Documentation ¶
Index ¶
- Constants
- Variables
- func InitConfig(conf config.Section)
- type CheckpointsTuningConfig
- type Config
- type ConfigWebhookDefaults
- type ConfigWebsocketDefaults
- type DBSerializable
- type Deliver
- type DispatchStatus
- type Dispatcher
- type DispatcherFactory
- type DistributionMode
- type ErrorHandlingType
- type Event
- type EventBatch
- type EventCommon
- type EventStreamActions
- type EventStreamCheckpoint
- type EventStreamDefaults
- type EventStreamSpec
- type EventStreamSpecFields
- type EventStreamStatistics
- type EventStreamStatus
- type EventStreamType
- type GenericEventStream
- func (ges *GenericEventStream) ESFields() *EventStreamSpecFields
- func (ges *GenericEventStream) ESType() EventStreamType
- func (ges *GenericEventStream) IsNil() bool
- func (ges *GenericEventStream) SetID(s string)
- func (ges *GenericEventStream) WebSocketConf() *WebSocketConfig
- func (ges *GenericEventStream) WebhookConf() *WebhookConfig
- func (ges *GenericEventStream) WithRuntimeStatus(status EventStreamStatus, stats *EventStreamStatistics) *GenericEventStream
- type IDValidator
- type LifecyclePhase
- type Manager
- type Persistence
- type Runtime
- type SourceInstruction
- type WebSocketConfig
- type WebhookConfig
Constants ¶
const ( ConfigTLSConfigName = "name" ConfigCheckpointsAsynchronous = "asynchronous" ConfigCheckpointsUnmatchedEventThreshold = "unmatchedEventThreshold" ConfigDisablePrivateIPs = "disablePrivateIPs" ConfigWebhooksDefaultTLSConfig = "tlsConfigName" ConfigWebSocketsDistributionMode = "distributionMode" ConfigDefaultsErrorHandling = "errorHandling" ConfigDefaultsBatchSize = "batchSize" ConfigDefaultsBatchTimeout = "batchTimeout" ConfigDefaultsRetryTimeout = "retryTimeout" ConfigDefaultsBlockedRetryDelay = "blockedRetryDelay" )
const MessageTypeEventBatch = "event_batch"
Variables ¶
var ( EventStreamTypeWebhook = fftypes.FFEnumValue("estype", "webhook") EventStreamTypeWebSocket = fftypes.FFEnumValue("estype", "websocket") )
var ( ErrorHandlingTypeBlock = fftypes.FFEnumValue("ehtype", "block") ErrorHandlingTypeSkip = fftypes.FFEnumValue("ehtype", "skip") )
var ( DispatchStatusDispatching = fftypes.FFEnumValue("edstatus", "dispatching") DispatchStatusRetrying = fftypes.FFEnumValue("edstatus", "retrying") DispatchStatusBlocked = fftypes.FFEnumValue("edstatus", "blocked") DispatchStatusComplete = fftypes.FFEnumValue("edstatus", "complete") DispatchStatusSkipped = fftypes.FFEnumValue("edstatus", "skipped") )
var ( EventStreamStatusStarted = fftypes.FFEnumValue("esstatus", "started") EventStreamStatusStopped = fftypes.FFEnumValue("esstatus", "stopped") EventStreamStatusDeleted = fftypes.FFEnumValue("esstatus", "deleted") EventStreamStatusStopping = fftypes.FFEnumValue("esstatus", "stopping") // not persisted EventStreamStatusStoppingDeleted = fftypes.FFEnumValue("esstatus", "stopping_deleted") // not persisted EventStreamStatusUnknown = fftypes.FFEnumValue("esstatus", "unknown") // not persisted )
var ( DistributionModeBroadcast = fftypes.FFEnumValue("distmode", "broadcast") DistributionModeLoadBalance = fftypes.FFEnumValue("distmode", "load_balance") )
var CheckpointFilters = &ffapi.QueryFields{ "id": &ffapi.StringField{}, "created": &ffapi.TimeField{}, "updated": &ffapi.TimeField{}, "sequenceid": &ffapi.StringField{}, }
var CheckpointsConfig config.Section
var DefaultsConfig config.Section
var GenericEventStreamFilters = &ffapi.QueryFields{ "id": &ffapi.StringField{}, "created": &ffapi.TimeField{}, "updated": &ffapi.TimeField{}, "name": &ffapi.StringField{}, "status": &ffapi.StringField{}, "type": &ffapi.StringField{}, "topicfilter": &ffapi.StringField{}, "initialsequenceid": &ffapi.StringField{}, }
var RetrySection config.Section
var RootConfig config.Section
var TLSConfigs config.ArraySection
var WebSocketsDefaultsConfig config.Section
var WebhookDefaultsConfig config.Section
Functions ¶
func InitConfig ¶
Due to how arrays work currently in the config system, this can only be initialized in one section for the whole process.
Types ¶
type CheckpointsTuningConfig ¶
type Config ¶
type Config[CT EventStreamSpec, DT any] struct { TLSConfigs map[string]*fftls.Config `ffstruct:"EventStreamConfig" json:"tlsConfigs,omitempty"` Retry *retry.Retry `ffstruct:"EventStreamConfig" json:"retry,omitempty"` DisablePrivateIPs bool `ffstruct:"EventStreamConfig" json:"disabledPrivateIPs"` Checkpoints CheckpointsTuningConfig `ffstruct:"EventStreamConfig" json:"checkpoints"` Defaults EventStreamDefaults `ffstruct:"EventStreamConfig" json:"defaults,omitempty"` // Allow plugging in additional types (important that the embedding code adds the FFEnum doc entry for the EventStreamType) AdditionalDispatchers map[EventStreamType]DispatcherFactory[CT, DT] `json:"-"` }
func GenerateConfig ¶
func GenerateConfig[CT EventStreamSpec, DT any](ctx context.Context) *Config[CT, DT]
Optional function to generate config directly from YAML configuration using the config package. You can also generate the configuration programmatically
type ConfigWebhookDefaults ¶
type ConfigWebhookDefaults struct {
ffresty.HTTPConfig
}
type ConfigWebsocketDefaults ¶
type ConfigWebsocketDefaults struct {
DefaultDistributionMode DistributionMode `ffstruct:"EventStreamConfig" json:"distributionMode"`
}
type DBSerializable ¶
Let's us check that the config serializes
type Deliver ¶
type Deliver[DT any] func(events []*Event[DT]) SourceInstruction
type DispatchStatus ¶
type Dispatcher ¶ added in v1.4.3
type DispatcherFactory ¶ added in v1.4.3
type DispatcherFactory[CT EventStreamSpec, DT any] interface { Validate(ctx context.Context, conf *Config[CT, DT], spec CT, tlsConfigs map[string]*tls.Config, phase LifecyclePhase) error NewDispatcher(ctx context.Context, conf *Config[CT, DT], spec CT) Dispatcher[DT] }
DispatcherFactory is the interface to plug in a custom dispatcher, for example to provide local in-process processing of events (in addition to remote WebSocket/Webhook consumption). Generics: - CT is the Configuration Type - the custom extensions to the configuration schema - DT is the Data Type - the payload type that will be delivered to the application
type DistributionMode ¶
type ErrorHandlingType ¶
type Event ¶
type Event[DataType any] struct { EventCommon // Data can be anything to deliver for the event - must be JSON marshalable. // Will be flattened into the struct. // Can define topic and/or sequenceId, but these will overridden with EventCommon strings in the JSON serialization. Data *DataType `json:"-"` }
func (Event[DataType]) MarshalJSON ¶
func (*Event[DataType]) UnmarshalJSON ¶
type EventBatch ¶
type EventBatch[DataType any] struct { wsserver.BatchHeader Type string `json:"type"` // always MessageTypeEventBatch (for consistent WebSocket flow control) Events []*Event[DataType] `json:"events"` // an array of events allows efficient batch acknowledgment }
func (*EventBatch[DataType]) GetBatchHeader ¶ added in v1.4.6
func (eb *EventBatch[DataType]) GetBatchHeader() *wsserver.BatchHeader
type EventCommon ¶
type EventCommon struct { Topic string `json:"topic,omitempty"` // describes the sub-stream of events (optional) allowing sever-side event filtering (regexp) SequenceID string `json:"sequenceId"` // deterministic ID for the event, that must be alpha-numerically orderable within the stream (numbers must be left-padded hex/decimal strings for ordering) }
type EventStreamActions ¶
type EventStreamCheckpoint ¶
type EventStreamCheckpoint struct { ID *string `ffstruct:"EventStreamCheckpoint" json:"id"` Created *fftypes.FFTime `ffstruct:"EventStreamCheckpoint" json:"created"` Updated *fftypes.FFTime `ffstruct:"EventStreamCheckpoint" json:"updated"` SequenceID *string `ffstruct:"EventStreamCheckpoint" json:"sequenceId,omitempty"` }
func (*EventStreamCheckpoint) GetID ¶ added in v1.4.1
func (esc *EventStreamCheckpoint) GetID() string
func (*EventStreamCheckpoint) SetCreated ¶ added in v1.4.1
func (esc *EventStreamCheckpoint) SetCreated(t *fftypes.FFTime)
func (*EventStreamCheckpoint) SetUpdated ¶ added in v1.4.1
func (esc *EventStreamCheckpoint) SetUpdated(t *fftypes.FFTime)
type EventStreamDefaults ¶
type EventStreamDefaults struct { ErrorHandling ErrorHandlingType `ffstruct:"EventStreamDefaults" json:"errorHandling"` BatchSize int `ffstruct:"EventStreamDefaults" json:"batchSize"` BatchTimeout fftypes.FFDuration `ffstruct:"EventStreamDefaults" json:"batchTimeout"` RetryTimeout fftypes.FFDuration `ffstruct:"EventStreamDefaults" json:"retryTimeout"` BlockedRetryDelay fftypes.FFDuration `ffstruct:"EventStreamDefaults" json:"blockedRetryDelay"` WebSocketDefaults ConfigWebsocketDefaults `ffstruct:"EventStreamDefaults" json:"webSockets,omitempty"` WebhookDefaults ConfigWebhookDefaults `ffstruct:"EventStreamDefaults" json:"webhooks,omitempty"` }
type EventStreamSpec ¶
type EventStreamSpec interface { dbsql.Resource SetID(s string) ESFields() *EventStreamSpecFields ESType() EventStreamType // separated from fields to allow choice on restrictions WebhookConf() *WebhookConfig // can return nil if Webhooks not supported WebSocketConf() *WebSocketConfig // can return nil if WebSockets not supported IsNil() bool // needed as quirk of using interfaces with generics }
type EventStreamSpecFields ¶ added in v1.4.4
type EventStreamSpecFields struct { Name *string `ffstruct:"eventstream" json:"name,omitempty"` Status *EventStreamStatus `ffstruct:"eventstream" json:"status,omitempty"` InitialSequenceID *string `ffstruct:"eventstream" json:"initialSequenceId,omitempty"` TopicFilter *string `ffstruct:"eventstream" json:"topicFilter,omitempty"` ErrorHandling *ErrorHandlingType `ffstruct:"eventstream" json:"errorHandling"` BatchSize *int `ffstruct:"eventstream" json:"batchSize"` BatchTimeout *fftypes.FFDuration `ffstruct:"eventstream" json:"batchTimeout"` RetryTimeout *fftypes.FFDuration `ffstruct:"eventstream" json:"retryTimeout"` BlockedRetryDelay *fftypes.FFDuration `ffstruct:"eventstream" json:"blockedRetryDelay"` // contains filtered or unexported fields }
type EventStreamStatistics ¶
type EventStreamStatistics struct { StartTime *fftypes.FFTime `ffstruct:"EventStreamStatistics" json:"startTime"` LastDispatchTime *fftypes.FFTime `ffstruct:"EventStreamStatistics" json:"lastDispatchTime"` LastDispatchAttempts int `ffstruct:"EventStreamStatistics" json:"lastDispatchAttempts,omitempty"` LastDispatchFailure string `ffstruct:"EventStreamStatistics" json:"lastDispatchFailure,omitempty"` LastDispatchStatus DispatchStatus `ffstruct:"EventStreamStatistics" json:"lastDispatchComplete"` HighestDetected string `ffstruct:"EventStreamStatistics" json:"highestDetected"` HighestDispatched string `ffstruct:"EventStreamStatistics" json:"highestDispatched"` Checkpoint string `ffstruct:"EventStreamStatistics" json:"checkpoint"` }
type EventStreamStatus ¶
type EventStreamType ¶
type GenericEventStream ¶ added in v1.4.4
type GenericEventStream struct { dbsql.ResourceBase Type *EventStreamType `ffstruct:"eventstream" json:"type,omitempty" ffenum:"estype"` EventStreamSpecFields Webhook *WebhookConfig `ffstruct:"eventstream" json:"webhook,omitempty"` WebSocket *WebSocketConfig `ffstruct:"eventstream" json:"websocket,omitempty"` Statistics *EventStreamStatistics `ffstruct:"EventStream" json:"statistics,omitempty"` }
This is a base object, and set of filters, that you can use if: - You are happy exposing all the built-in types of consumer (webhooks/websockets) - You do not need to extend the configuration in any way - You are happy using UUIDs for your IDs per dbsql.ResourceBase semantics
A pre-built persistence library is provided, and sample migrations, that work with this structure.
The design of the generic is such that you can start with the generic structure, and then move to your own structure later if you want to add more fields.
When you are ready to extend, you need to:
- Copy the GenericEventStream source into your own repo, and rename it appropriately. Then you can add your extra configuration fields to it.
- Copy the EventStreams() and Checkpoints() CRUD factories into your own repo, and extend them with additional columns etc. as you see fit.
func (*GenericEventStream) ESFields ¶ added in v1.4.4
func (ges *GenericEventStream) ESFields() *EventStreamSpecFields
func (*GenericEventStream) ESType ¶ added in v1.4.4
func (ges *GenericEventStream) ESType() EventStreamType
func (*GenericEventStream) IsNil ¶ added in v1.4.4
func (ges *GenericEventStream) IsNil() bool
func (*GenericEventStream) SetID ¶ added in v1.4.4
func (ges *GenericEventStream) SetID(s string)
func (*GenericEventStream) WebSocketConf ¶ added in v1.4.4
func (ges *GenericEventStream) WebSocketConf() *WebSocketConfig
func (*GenericEventStream) WebhookConf ¶ added in v1.4.4
func (ges *GenericEventStream) WebhookConf() *WebhookConfig
func (*GenericEventStream) WithRuntimeStatus ¶ added in v1.4.4
func (ges *GenericEventStream) WithRuntimeStatus(status EventStreamStatus, stats *EventStreamStatistics) *GenericEventStream
type LifecyclePhase ¶ added in v1.4.3
type LifecyclePhase int
const ( LifecyclePhasePreInsertValidation LifecyclePhase = iota // on user-supplied context, prior to inserting to DB LifecyclePhaseStarting // while initializing for startup (so all defaults should be resolved) )
type Manager ¶
type Manager[CT EventStreamSpec] interface { UpsertStream(ctx context.Context, nameOrID string, esSpec CT) (bool, error) GetStreamByID(ctx context.Context, id string, opts ...dbsql.GetOption) (CT, error) GetStreamByNameOrID(ctx context.Context, nameOrID string, opts ...dbsql.GetOption) (CT, error) ListStreams(ctx context.Context, filter ffapi.Filter) ([]CT, *ffapi.FilterResult, error) StopStream(ctx context.Context, nameOrID string) error StartStream(ctx context.Context, nameOrID string) error ResetStream(ctx context.Context, nameOrID string, sequenceID *string, preStartCallbacks ...func(ctx context.Context, spec CT) error) error DeleteStream(ctx context.Context, nameOrID string) error Close(ctx context.Context) }
func NewEventStreamManager ¶
type Persistence ¶
type Persistence[CT EventStreamSpec] interface { EventStreams() dbsql.CRUD[CT] Checkpoints() dbsql.CRUD[*EventStreamCheckpoint] IDValidator() IDValidator Close() }
func NewGenericEventStreamPersistence ¶ added in v1.4.4
func NewGenericEventStreamPersistence(db *dbsql.Database, idValidator IDValidator) Persistence[*GenericEventStream]
NewGenericEventStreamPersistence is a helper that builds persistence with no extra config Users of this package can use this in cases where they do not have any additional configuration that needs to be persisted, and are happy using dbsql.ResourceBase for IDs.
type Runtime ¶
type Runtime[ConfigType EventStreamSpec, DataType any] interface { // Generate a new unique resource ID (such as a UUID) NewID() string // Return a COPY of the config object with runtime status and statistics (runtime enrichment) WithRuntimeStatus(spec ConfigType, status EventStreamStatus, stats *EventStreamStatistics) ConfigType // Type specific config validation goes here Validate(ctx context.Context, config ConfigType) error // The run function should execute in a loop detecting events until instructed to stop: // - The Run function should block when no events are available // - Must detect if the context is closed (see below) // - The Deliver function will block if the stream is blocked: // - Blocked means the previous batch is being processed, and the current batch is full // - If the stream stops, the Exit instruction will be returned from deliver // - The supplied context will be cancelled as well on exit, so should be used: // 1. In any blocking i/o functions // 2. To wake any sleeps early, such as batch polling scenarios // - If the function returns without an Exit instruction, it will be restarted from the last checkpoint Run(ctx context.Context, spec ConfigType, checkpointSequenceID string, deliver Deliver[DataType]) error }
Runtime is the required implementation extension for the EventStream common utility Generics: - ConfigType is the Configuration Type - the custom extensions to the configuration schema - DataType is the Data Type - the payload type that will be delivered to the application
type SourceInstruction ¶
type SourceInstruction int
const ( Continue SourceInstruction = iota Exit )
type WebSocketConfig ¶
type WebSocketConfig struct {
DistributionMode *DistributionMode `ffstruct:"wsconfig" json:"distributionMode,omitempty"`
}
func (*WebSocketConfig) Scan ¶
func (wc *WebSocketConfig) Scan(src interface{}) error
Store in DB as JSON
type WebhookConfig ¶
type WebhookConfig struct { URL *string `ffstruct:"whconfig" json:"url,omitempty"` Method *string `ffstruct:"whconfig" json:"method,omitempty"` TLSConfigName *string `ffstruct:"whconfig" json:"tlsConfigName,omitempty"` HTTP *ffresty.HTTPConfig `ffstruct:"whconfig" json:"http,omitempty"` // contains filtered or unexported fields }
func (*WebhookConfig) Scan ¶
func (wc *WebhookConfig) Scan(src interface{}) error
Store in DB as JSON