Documentation ¶
Overview ¶
Package reflex provides events (state change notifications) for mutable sql tables. It also supports streaming consumption of these events, either locally or via gRPC. It therefore aims to provide streams of sql events similar to other database streaming features, like DynamoDB Streams, except that these events do not contain any data, but are merely pointers.
Reflex events are pointers to state changes of mutable data.
	event_id   string    // unique id of the event
	type       enum      // state change type
	foreign_id string    // id of the mutable datum
	timestamp  timestamp // timestamp of the event
Events are inserted as part of sql transactions that modify the data. This ensures strong data consistency; exactly one event per state change.
The EventsTable wraps an event DB table allowing insertion of events. The canonical pattern is to define an EventsTable per mutable data table, e.g. users and user_events, payments and payment_events. Note that event insertion is generally defined on the data layer, not the business logic layer.
The datum (data entity) referred to and state change performed on it are implied by the event source and type. Ex. An event produced by the "user service" with type "UserCreated", or an event by the "auth service" with type "APIKeyVerified".
The EventsTable also provides event streams from an arbitrary point in the past. Reflex also supports exposing these event streams via gRPC.
Consumers consume the event stream, keeping a cursor of the last event consumed. This ensures at-least-once delivery, provided that consumers persist their cursor.
The CursorsTable wraps a DB table allowing getting and setting of cursors. The canonical pattern is to define one cursor table per service.
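The cursor mechanism can be illustrated with a minimal sketch. It does not use the reflex API: the `event` and `cursors` types, the `consumeFrom` helper and the integer event IDs are all assumptions made for illustration. The cursor only advances after an event is handled successfully, so a failed event is delivered again on the next run.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// event is a local stand-in for a reflex event pointer (not the library type).
type event struct {
	ID        string
	ForeignID string
}

// cursors is a hypothetical in-memory cursor table keyed by consumer name.
type cursors map[string]string

// consumeFrom delivers every event after the stored cursor to fn and only
// advances the cursor once fn succeeds, so a failed event is redelivered.
// It assumes integer event IDs in ascending order.
func consumeFrom(events []event, c cursors, name string, fn func(event) error) error {
	after := 0 // An empty cursor means "start from the beginning".
	if cur := c[name]; cur != "" {
		n, err := strconv.Atoi(cur)
		if err != nil {
			return err
		}
		after = n
	}
	for _, e := range events {
		id, err := strconv.Atoi(e.ID)
		if err != nil {
			return err
		}
		if id <= after {
			continue // Already consumed.
		}
		if err := fn(e); err != nil {
			return err // The cursor still points at the last successful event.
		}
		c[name] = e.ID
	}
	return nil
}

// runDemo simulates a failure while handling event 2 followed by a restart,
// and returns the event IDs in the order they were delivered.
func runDemo() []string {
	events := []event{{"1", "u1"}, {"2", "u2"}, {"3", "u3"}}
	c := cursors{}
	var delivered []string
	failOnce := true
	fn := func(e event) error {
		delivered = append(delivered, e.ID)
		if e.ID == "2" && failOnce {
			failOnce = false
			return errors.New("transient failure")
		}
		return nil
	}
	_ = consumeFrom(events, c, "mailer", fn) // Stops at event 2.
	_ = consumeFrom(events, c, "mailer", fn) // Resumes after event 1.
	return delivered
}

func main() {
	fmt.Println(runDemo()) // Event 2 appears twice.
}
```

Event 2 is delivered twice in this run, which is why consumer logic should be idempotent.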
There are two ways to consume event streams:
EventsTable + CursorsTable: The consumer logic has local access to both the events and cursors tables. Ex. User service sends an email on UserCreated.
gRPC stream + CursorsTable: The consumer logic has access to a local CursorsTable, but requests the event stream from a remote service via gRPC. Ex. The Fraud service consumes PaymentCreated events from the payments service. It has its own DB and CursorsTable.
Index ¶
- Variables
- func IsAnyType(source EventType, targets ...EventType) bool
- func IsExpected(err error) bool
- func IsFilterErr(err error) bool
- func IsHeadReachedErr(err error) bool
- func IsStoppedErr(err error) bool
- func IsType(source, target EventType) bool
- func Run(in context.Context, s Spec) error
- type Consumable
- type ConsumeFunc
- type Consumer
- type ConsumerError
- type ConsumerFunc
- type ConsumerOption
- func WithConsumerActivityTTL(ttl time.Duration) ConsumerOption
- func WithConsumerActivityTTLFunc(ttl func() time.Duration) ConsumerOption
- func WithConsumerLagAlert(d time.Duration) ConsumerOption
- func WithConsumerLagAlertGauge(g prometheus.Gauge) ConsumerOption
- func WithEventFilter(flt EventFilter) ConsumerOption
- func WithFilterIncludeTypes(evts ...EventType) ConsumerOption
- func WithRecoverFunction(rfn RecoveryFunc) ConsumerOption
- func WithoutConsumerActivityTTL() ConsumerOption
- func WithoutConsumerLag() ConsumerOption
- type CursorStore
- type ErrorInsertFunc
- type ErrorStatus
- type Event
- type EventFilter
- type EventType
- type RecoveryFunc
- type ResetterCtx
- type Server
- type Spec
- type Stopper
- type StreamClient
- type StreamClientPB
- type StreamFunc
- type StreamOption
- type StreamOptions
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrStopped is returned when an events table is stopped, usually
	// when the grpc server is stopped. Clients should check for this error
	// and reconnect.
	ErrStopped = errors.New("the event stream has been stopped", j.C("ERR_09290f5944cb8671"))

	// ErrHeadReached is returned when the events table reaches the end of
	// the stream of events.
	ErrHeadReached = errors.New("the event stream has reached the current head", j.C("ERR_b4b155d2a91cfcd0"))
)
Functions ¶
func IsExpected ¶
IsExpected returns true if the error is expected during normal streaming operation.
func IsFilterErr ¶
IsFilterErr returns true if the error occurred during Event filtering operations.
func IsHeadReachedErr ¶
IsHeadReachedErr checks whether err is an ErrHeadReached.
func IsStoppedErr ¶
IsStoppedErr checks whether err is an ErrStopped.
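A caller typically reacts differently to each of these errors. The sketch below is one possible way to do that with the standard library only. The sentinel errors are local stand-ins (the real values are constructed with a different errors package, as shown under Variables), and the `action` type and `classify` function are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the package's sentinel errors, built with the standard
// library purely for illustration.
var (
	errStopped     = errors.New("the event stream has been stopped")
	errHeadReached = errors.New("the event stream has reached the current head")
)

// action is what a hypothetical caller does after a stream returns an error.
type action string

const (
	reconnect action = "reconnect"
	finish    action = "finish"
	fail      action = "fail"
)

// classify maps a stream error to an action, seeing through wrapped errors.
func classify(err error) action {
	switch {
	case errors.Is(err, errStopped):
		return reconnect // The server shut down; connect again.
	case errors.Is(err, errHeadReached):
		return finish // A back-fill has caught up with the head.
	default:
		return fail
	}
}

// classifyDemo returns the actions for a wrapped stop error, a head-reached
// error and an unrelated error.
func classifyDemo() [3]action {
	wrapped := fmt.Errorf("recv: %w", errStopped)
	return [3]action{
		classify(wrapped),
		classify(errHeadReached),
		classify(errors.New("boom")),
	}
}

func main() {
	fmt.Println(classifyDemo())
}
```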
Types ¶
type Consumable ¶
type Consumable interface {
	// Consume blocks while events are streamed to consumer. It always returns a non-nil error.
	// Cancel the context to return early.
	Consume(context.Context, Consumer, ...StreamOption) error
}
Consumable is an interface for an object that provides a ConsumeFunc with the name Consume.
Deprecated: Please use Spec.
func NewConsumable ¶
func NewConsumable(sFn StreamFunc, cstore CursorStore, opts ...StreamOption) Consumable
NewConsumable creates a new Consumable.
Deprecated: Please use Run.
type ConsumeFunc ¶
type ConsumeFunc func(context.Context, Consumer, ...StreamOption) error
ConsumeFunc is the main reflex consume interface. It blocks while events are streamed to consumer. It always returns a non-nil error. Cancel the context to return early. Deprecated: Please use Spec.
type Consumer ¶
Consumer represents a piece of business logic that consumes events. It consists of a name and the consume logic. Consumer logic should be idempotent since reflex provides at-least-once event delivery.
func NewConsumer ¶
func NewConsumer(name string, fn ConsumerFunc, opts ...ConsumerOption) Consumer
NewConsumer returns a new instrumented consumer of events.
type ConsumerError ¶
type ConsumerError struct {
	ID           string
	ConsumerName string
	EventID      string
	Message      string
	CreatedAt    time.Time
	UpdatedAt    time.Time
	Status       ErrorStatus
}
ConsumerError is a record of a reflex event consumer error.
func (*ConsumerError) EventIDInt ¶
func (e *ConsumerError) EventIDInt() int64
EventIDInt returns the event id as an int64 or 0 if it is not an integer.
func (*ConsumerError) IDInt ¶
func (e *ConsumerError) IDInt() int64
IDInt returns the error id as an int64 or 0 if it is not an integer.
func (*ConsumerError) IsEventIDInt ¶
func (e *ConsumerError) IsEventIDInt() bool
IsEventIDInt returns true if the event id is an integer.
func (*ConsumerError) IsIDInt ¶
func (e *ConsumerError) IsIDInt() bool
IsIDInt returns true if the error id is an integer.
type ConsumerOption ¶
type ConsumerOption func(*consumer)
ConsumerOption changes the behaviour of the consumer.
func WithConsumerActivityTTL ¶
func WithConsumerActivityTTL(ttl time.Duration) ConsumerOption
WithConsumerActivityTTL provides an option to set the consumer activity metric ttl; i.e. if no events are consumed within the `ttl` duration, the consumer is considered inactive.
func WithConsumerActivityTTLFunc ¶
func WithConsumerActivityTTLFunc(ttl func() time.Duration) ConsumerOption
WithConsumerActivityTTLFunc is similar to WithConsumerActivityTTL but accepts a function that returns the TTL.
func WithConsumerLagAlert ¶
func WithConsumerLagAlert(d time.Duration) ConsumerOption
WithConsumerLagAlert provides an option to set the consumer lag alert threshold.
func WithConsumerLagAlertGauge ¶
func WithConsumerLagAlertGauge(g prometheus.Gauge) ConsumerOption
WithConsumerLagAlertGauge provides an option to set the consumer lag alert gauge. Handy for custom alert metadata as labels.
func WithEventFilter ¶
func WithEventFilter(flt EventFilter) ConsumerOption
WithEventFilter provides an option to specify which Event values a consumer is interested in. For uninteresting events Consume is never called, and a skipped metric is incremented.
func WithFilterIncludeTypes ¶
func WithFilterIncludeTypes(evts ...EventType) ConsumerOption
WithFilterIncludeTypes provides an option to specify which EventTypes a consumer is interested in. For uninteresting events Consume is never called, and a skipped metric is incremented.
func WithRecoverFunction ¶
func WithRecoverFunction(rfn RecoveryFunc) ConsumerOption
WithRecoverFunction provides an option to specify a recovery function that is called when consuming an event returns an error. The function can change the error returned or even eliminate it by returning nil. It can also be used for notification and/or recording of events that failed to process successfully.
func WithoutConsumerActivityTTL ¶
func WithoutConsumerActivityTTL() ConsumerOption
WithoutConsumerActivityTTL provides an option to disable the consumer activity metric ttl.
func WithoutConsumerLag ¶
func WithoutConsumerLag() ConsumerOption
WithoutConsumerLag provides an option to disable the consumer lag alert.
type CursorStore ¶
type CursorStore interface {
	// GetCursor returns the consumer's cursor. It returns an empty string if no cursor exists.
	GetCursor(ctx context.Context, consumerName string) (string, error)

	// SetCursor stores the consumer's cursor. Note that some implementations may buffer writes.
	SetCursor(ctx context.Context, consumerName string, cursor string) error

	// Flush writes any buffered cursors to the underlying store.
	Flush(ctx context.Context) error
}
CursorStore is an interface used to persist consumer offsets in a stream.
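To make the buffering note concrete, here is a minimal in-memory sketch with the same three methods. The `memCursorStore` type is hypothetical and not part of the package; in this sketch `SetCursor` only buffers, and a cursor becomes visible to `GetCursor` after `Flush`. Real implementations may choose differently.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// memCursorStore is a hypothetical in-memory store with the same method set
// as CursorStore. SetCursor only buffers; Flush moves buffered cursors into
// the "stored" map, which stands in for the underlying table.
type memCursorStore struct {
	mu       sync.Mutex
	buffered map[string]string
	stored   map[string]string
}

func newMemCursorStore() *memCursorStore {
	return &memCursorStore{
		buffered: map[string]string{},
		stored:   map[string]string{},
	}
}

// GetCursor returns the flushed cursor, or an empty string if none exists.
func (s *memCursorStore) GetCursor(_ context.Context, consumerName string) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.stored[consumerName], nil
}

// SetCursor buffers the cursor without writing it to the store.
func (s *memCursorStore) SetCursor(_ context.Context, consumerName string, cursor string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.buffered[consumerName] = cursor
	return nil
}

// Flush writes all buffered cursors to the store and clears the buffer.
func (s *memCursorStore) Flush(_ context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for name, cur := range s.buffered {
		s.stored[name] = cur
		delete(s.buffered, name)
	}
	return nil
}

// cursorDemo returns the cursor read before and after Flush.
func cursorDemo() (string, string) {
	ctx := context.Background()
	s := newMemCursorStore()
	_ = s.SetCursor(ctx, "mailer", "42")
	before, _ := s.GetCursor(ctx, "mailer")
	_ = s.Flush(ctx)
	after, _ := s.GetCursor(ctx, "mailer")
	return before, after
}

func main() {
	before, after := cursorDemo()
	fmt.Printf("before flush: %q, after flush: %q\n", before, after)
}
```

With a buffering store, a crash before `Flush` loses the latest cursor, so the consumer sees some events again after a restart. That is consistent with at-least-once delivery.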
type ErrorInsertFunc ¶
type ErrorInsertFunc func(ctx context.Context, consumerName string, eventID string, errMsg string) error
ErrorInsertFunc abstracts the insertion of a consumer error into a sql table.
type ErrorStatus ¶
type ErrorStatus int
ErrorStatus is the current status of a consumer error.
const (
	UnknownEventError ErrorStatus = 0
	// EventErrorRecorded - New errors should be saved in this state [initial]
	EventErrorRecorded ErrorStatus = 1
)
func (ErrorStatus) ReflexType ¶
func (e ErrorStatus) ReflexType() int
func (ErrorStatus) ShiftStatus ¶
func (e ErrorStatus) ShiftStatus() int
type Event ¶
type Event struct {
	ID        string
	Type      EventType
	ForeignID string
	Timestamp time.Time
	MetaData  []byte
	Trace     []byte
}
Event is the reflex event. It is an immutable notification event that indicates that a change of some type relating to a foreign entity happened at a specific time. It may also contain metadata relating to the change.
func (*Event) ForeignIDInt ¶
ForeignIDInt returns the foreign id as an int64 or 0 if it is not an integer.
func (*Event) IsForeignIDInt ¶
IsForeignIDInt returns true if the foreign id is an integer.
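Because ForeignIDInt returns 0 both for the id "0" and for a non-integer id, the two helpers are best used together. The sketch below mirrors the documented behaviour with `strconv.ParseInt`; the function names are local stand-ins and the parsing approach is an assumption, not the package's code.

```go
package main

import (
	"fmt"
	"strconv"
)

// foreignIDInt mirrors the documented behaviour: the id as an int64, or 0
// if it is not an integer.
func foreignIDInt(foreignID string) int64 {
	n, err := strconv.ParseInt(foreignID, 10, 64)
	if err != nil {
		return 0
	}
	return n
}

// isForeignIDInt reports whether the id parses as a base-10 int64.
func isForeignIDInt(foreignID string) bool {
	_, err := strconv.ParseInt(foreignID, 10, 64)
	return err == nil
}

func main() {
	for _, id := range []string{"1234", "0", "user-7"} {
		// Check isForeignIDInt first: a result of 0 alone is ambiguous.
		fmt.Println(id, isForeignIDInt(id), foreignIDInt(id))
	}
}
```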
type EventFilter ¶
EventFilter takes an Event and returns true if it should be processed or false if it should be skipped. It can return an error if it fails to determine whether the event should be processed. The function is expected to return promptly: beyond small in-memory transforms or extractions it should not perform I/O or significant API calls (especially remote ones), since the only data it needs should be on the event itself.
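As an illustration of a cheap, in-memory filter, the following sketch allows events by type and counts the rest as skipped. The `eventFilter` signature is assumed from the description above, and `event`, `includeTypes` and `process` are hypothetical names; none of this is the package's own code.

```go
package main

import (
	"errors"
	"fmt"
)

// event is a local stand-in holding only the fields the filter needs.
type event struct {
	ID   string
	Type int
}

// eventFilter is an assumed shape based on the description: it reports
// whether the event should be processed, or an error if it cannot decide.
type eventFilter func(e *event) (bool, error)

// includeTypes returns a filter that allows only the given types. It reads
// nothing but the event itself, so it returns promptly.
func includeTypes(types ...int) eventFilter {
	allowed := make(map[int]bool, len(types))
	for _, t := range types {
		allowed[t] = true
	}
	return func(e *event) (bool, error) {
		if e == nil {
			return false, errors.New("nil event")
		}
		return allowed[e.Type], nil
	}
}

// process calls fn for allowed events and counts the rest as skipped.
func process(events []*event, flt eventFilter, fn func(*event)) (int, error) {
	skipped := 0
	for _, e := range events {
		ok, err := flt(e)
		if err != nil {
			return skipped, err
		}
		if !ok {
			skipped++
			continue
		}
		fn(e)
	}
	return skipped, nil
}

// filterDemo returns the handled event IDs and the number of skipped events.
func filterDemo() ([]string, int) {
	events := []*event{{"1", 1}, {"2", 2}, {"3", 1}}
	var handled []string
	skipped, _ := process(events, includeTypes(1), func(e *event) {
		handled = append(handled, e.ID)
	})
	return handled, skipped
}

func main() {
	handled, skipped := filterDemo()
	fmt.Println(handled, skipped)
}
```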
type EventType ¶
type EventType interface {
	// ReflexType returns the type as an int.
	ReflexType() int
}
EventType is an interface for enums that act as reflex event types.
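A typical way to satisfy this interface is an integer enum per events table. The sketch below uses a local copy of the interface and a hypothetical `userEventType` enum. The `isAnyType` helper compares by integer value, which is an assumption about how such a comparison could work rather than the package's implementation.

```go
package main

import "fmt"

// eventType is a local copy of the EventType interface.
type eventType interface {
	ReflexType() int
}

// userEventType is a hypothetical enum for a users events table.
type userEventType int

const (
	userEventUnknown userEventType = 0
	userEventCreated userEventType = 1
	userEventUpdated userEventType = 2
)

// ReflexType returns the enum value as an int.
func (t userEventType) ReflexType() int { return int(t) }

// isAnyType reports whether source has the same integer value as any target.
// Comparing by value is an assumption made for this sketch.
func isAnyType(source eventType, targets ...eventType) bool {
	for _, t := range targets {
		if source.ReflexType() == t.ReflexType() {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(isAnyType(userEventCreated, userEventCreated, userEventUpdated))
	fmt.Println(isAnyType(userEventUnknown, userEventCreated))
}
```

Since the type is only an int, two enums from different services can share the same value. The event source is what disambiguates them, as noted in the overview.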
type RecoveryFunc ¶
RecoveryFunc is a function that can be added as a ConsumerOption, using WithRecoverFunction, to handle errors returned by a consumer function. The handling can simply record the error. Because it takes in the error and returns an error, it can also return nil to "recover" from the error (additional work may be needed to do any actual recovery), return the same error if it could not be handled, or return a different error.
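The three outcomes (swallow, pass through, replace) can be sketched as follows. The `recoveryFunc` signature here is a simplified assumption, since the actual type definition is not shown on this page, and `withRecovery`, `errMalformed` and `event` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// event is a local stand-in for the reflex event.
type event struct{ ID string }

// recoveryFunc is a simplified, assumed shape: it receives the failed event
// and the consumer's error, and returns the error to report (or nil).
type recoveryFunc func(ctx context.Context, e *event, err error) error

// errMalformed is a hypothetical error that can never succeed on retry.
var errMalformed = errors.New("malformed payload")

// withRecovery wraps a consume function so that its errors pass through rfn.
func withRecovery(fn func(context.Context, *event) error, rfn recoveryFunc) func(context.Context, *event) error {
	return func(ctx context.Context, e *event) error {
		err := fn(ctx, e)
		if err == nil {
			return nil
		}
		return rfn(ctx, e, err)
	}
}

// recoveryDemo consumes three events and returns the IDs recorded by the
// recovery function together with the error reported for each event.
func recoveryDemo() ([]string, []error) {
	var recorded []string
	rfn := func(_ context.Context, e *event, err error) error {
		if errors.Is(err, errMalformed) {
			recorded = append(recorded, e.ID) // Record it and move on.
			return nil
		}
		return err // Not handled: return the same error.
	}
	consume := withRecovery(func(_ context.Context, e *event) error {
		switch e.ID {
		case "1":
			return errMalformed
		case "2":
			return errors.New("db unavailable")
		}
		return nil
	}, rfn)

	var errs []error
	for _, id := range []string{"1", "2", "3"} {
		errs = append(errs, consume(context.Background(), &event{ID: id}))
	}
	return recorded, errs
}

func main() {
	recorded, errs := recoveryDemo()
	fmt.Println(recorded, errs)
}
```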
type ResetterCtx ¶
ResetterCtx is an optional interface that a consumer can implement indicating that it is stateful and requires resetting at the start of each Run.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server provides stream, consume and graceful shutdown functionality for use in a gRPC server.
func (*Server) Stop ¶
func (s *Server) Stop()
Stop stops serving gRPC stream and consume methods returning ErrStopped. It should be used for graceful shutdown. It panics if called more than once.
func (*Server) Stream ¶
func (s *Server) Stream(sFn StreamFunc, req *reflexpb.StreamRequest, sspb streamServerPB) error
Stream streams events for a gRPC stream method. It always returns a non-nil error. It returns ErrStopped if the server is stopped. Note that back pressure is achieved by gRPC Streams' 64KB send and receive buffers. Note that gRPC does not guarantee buffered messages being sent on the wire, see https://github.com/grpc/grpc-go/issues/2159
type Spec ¶
type Spec struct {
// contains filtered or unexported fields
}
Spec specifies all the elements required to stream and consume reflex events for a specific purpose. StreamFunc is the source of the events. Consumer is the business logic consuming the events. CursorStore persists a cursor of consumed events. As long as the elements do not change the consumer is guaranteed at-least-once delivery of all events in the stream.
func NewSpec ¶
func NewSpec(stream StreamFunc, cStore CursorStore, consumer Consumer, opts ...StreamOption) Spec
NewSpec returns a new Spec.
type Stopper ¶
type Stopper interface {
Stop() error
}
Stopper is an optional interface that a consumer can implement indicating that it has clean up work to do at the end of each Run.
type StreamClient ¶
type StreamClient interface {
	// Recv blocks until the next event is found. Either the event or error is non-nil.
	Recv() (*Event, error)
}
StreamClient is a stream interface providing subsequent events on calls to Recv.
type StreamClientPB ¶
StreamClientPB defines a common interface for reflex stream gRPC generated implementations.
type StreamFunc ¶
type StreamFunc func(ctx context.Context, after string, opts ...StreamOption) (StreamClient, error)
StreamFunc is the main reflex stream interface that all implementations should provide. It returns a long-lived StreamClient that will stream events from the source.
func NewMockStream ¶
func NewMockStream(events []*Event, endErr error) StreamFunc
NewMockStream returns a StreamFunc for testing. Stream options have no effect on a mock stream; it simply returns the list of events provided.
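For tests that need a stream client without a database, a slice-backed client is often enough. The sketch below shows one way such a client might look. It is not NewMockStream's implementation: `sliceStream` and `drain` are hypothetical names, and returning the end error once the slice is exhausted is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// event is a local stand-in for the reflex event.
type event struct{ ID string }

// sliceStream is a hypothetical stream client that replays a fixed slice.
// endErr must be non-nil so that Recv always returns an event or an error.
type sliceStream struct {
	events []*event
	endErr error
}

// Recv returns the next event, or endErr once the slice is exhausted.
func (s *sliceStream) Recv() (*event, error) {
	if len(s.events) == 0 {
		return nil, s.endErr
	}
	e := s.events[0]
	s.events = s.events[1:]
	return e, nil
}

// drain reads until Recv returns an error and reports the IDs seen.
func drain(s *sliceStream) ([]string, error) {
	var ids []string
	for {
		e, err := s.Recv()
		if err != nil {
			return ids, err
		}
		ids = append(ids, e.ID)
	}
}

// mockDemo drains a two-event stream and returns the IDs and the end error text.
func mockDemo() ([]string, string) {
	s := &sliceStream{
		events: []*event{{ID: "1"}, {ID: "2"}},
		endErr: errors.New("end of test stream"),
	}
	ids, err := drain(s)
	return ids, err.Error()
}

func main() {
	ids, msg := mockDemo()
	fmt.Println(ids, msg)
}
```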
func WrapStreamPB ¶
func WrapStreamPB(wrap func(context.Context, *reflexpb.StreamRequest) (StreamClientPB, error)) StreamFunc
WrapStreamPB wraps a gRPC client's stream method and returns a StreamFunc.
type StreamOption ¶
type StreamOption func(*StreamOptions)
StreamOption defines a functional option that configures StreamOptions.
func WithStreamFromHead ¶
func WithStreamFromHead() StreamOption
WithStreamFromHead provides an option to stream only new events from the head of the events table. Note this overrides the "after" parameter.
func WithStreamLag ¶
func WithStreamLag(d time.Duration) StreamOption
WithStreamLag provides an option to stream events only after they are older than a duration.
func WithStreamToHead ¶
func WithStreamToHead() StreamOption
WithStreamToHead provides an option to return ErrHeadReached as soon as no more events are available. This is useful for testing or back-fills.
type StreamOptions ¶
type StreamOptions struct {
	// Lag defines the duration after an event is created before it becomes
	// eligible for streaming.
	Lag time.Duration

	// StreamFromHead defines that the initial event be retrieved
	// from the head of the events table.
	StreamFromHead bool

	// StreamToHead defines that ErrHeadReached be returned as soon
	// as no more events are available.
	StreamToHead bool
}
StreamOptions provide options sent to the event stream source.
func ResolveOptions ¶
func ResolveOptions(options ...StreamOption) StreamOptions
ResolveOptions converts a list of options to the StreamOptions struct.
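StreamOption follows the functional options pattern: each option is a function that mutates a StreamOptions value. The sketch below reproduces the pattern with local lower-case copies of the types. The bodies of the option constructors and of `resolveOptions` are assumptions about how such functions are usually written, not the package's source.

```go
package main

import (
	"fmt"
	"time"
)

// streamOptions is a local copy of the StreamOptions struct.
type streamOptions struct {
	Lag            time.Duration
	StreamFromHead bool
	StreamToHead   bool
}

// streamOption has the same shape as StreamOption.
type streamOption func(*streamOptions)

// withStreamLag sets the lag before events become eligible for streaming.
func withStreamLag(d time.Duration) streamOption {
	return func(o *streamOptions) { o.Lag = d }
}

// withStreamToHead asks the stream to stop once no more events are available.
func withStreamToHead() streamOption {
	return func(o *streamOptions) { o.StreamToHead = true }
}

// resolveOptions applies each option in order to a zero-valued struct.
func resolveOptions(options ...streamOption) streamOptions {
	var res streamOptions
	for _, opt := range options {
		opt(&res)
	}
	return res
}

func main() {
	opts := resolveOptions(withStreamLag(5*time.Second), withStreamToHead())
	fmt.Printf("%+v\n", opts)
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.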
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
_example | |
internal/db | Package db implements common DB functions for the reflex example servers. |
internal | |
| Package rblob leverages the gocloud.dev/blob package and provides a reflex stream for events persisted in a bucket of strictly ordered append-only log of flat files. |
| Package rsql provides reflex event stream and cursor table implementations for mysql. |