Documentation ¶
Overview ¶
Package rsql provides reflex event stream and cursor table implementations for mysql.
Index ¶
- Variables
- func DisableCache()
- func FillGaps(dbc *sql.DB, gapTable gapTable)
- func GetLatestIDForTesting(ctx context.Context, _ *testing.T, dbc *sql.DB, eventTable, idField string) (int64, error)
- func GetNextEventsForTesting(ctx context.Context, _ *testing.T, dbc *sql.DB, table *EventsTable, ...) ([]*reflex.Event, error)
- func IsDuplicateErrorInsertion(err error) bool
- func StopFillingGaps(ctx context.Context, gapTable gapTable)
- func TestCursorsTable(t *testing.T, dbc *sql.DB, table CursorsTable)
- func TestEventsTable(t *testing.T, dbc *sql.DB, table *EventsTable)
- func TestEventsTableInt(t *testing.T, dbc *sql.DB, table *EventsTableInt)
- func TestEventsTableWithID(t *testing.T, dbc *sql.DB, table *EventsTable, foreignID string)
- type CursorType
- type CursorsOption
- func WithCursorAsyncDisabled() CursorsOption
- func WithCursorAsyncPeriod(d time.Duration) CursorsOption
- func WithCursorCursorField(field string) CursorsOption
- func WithCursorIDField(field string) CursorsOption
- func WithCursorSetCounter(f func()) CursorsOption
- func WithCursorStrings() CursorsOption
- func WithCursorTimeField(field string) CursorsOption
- func WithTestCursorSleep(_ testing.TB, f func(time.Duration)) CursorsOption
- type CursorsTable
- type ErrorEventInserter
- type ErrorInserter
- type ErrorsOption
- func WithErrorCounter(counter func(consumer string)) ErrorsOption
- func WithErrorCreatedAtField(field string) ErrorsOption
- func WithErrorEventConsumerField(field string) ErrorsOption
- func WithErrorEventIDField(field string) ErrorsOption
- func WithErrorEventInserter(inserter ErrorEventInserter) ErrorsOption
- func WithErrorIDField(field string) ErrorsOption
- func WithErrorInserter(inserter ErrorInserter) ErrorsOption
- func WithErrorMsgField(field string) ErrorsOption
- func WithErrorRecordOnly() ErrorsOption
- func WithErrorStatusField(field string) ErrorsOption
- func WithErrorTableName(name string) ErrorsOption
- func WithErrorUpdatedAtField(field string) ErrorsOption
- type ErrorsTable
- type EventsNotifier
- type EventsOption
- func WithEventForeignIDField(field string) EventsOption
- func WithEventIDField(field string) EventsOption
- func WithEventMetadataField(field string) EventsOption
- func WithEventTimeField(field string) EventsOption
- func WithEventTraceField(field string) EventsOption
- func WithEventTypeField(field string) EventsOption
- func WithEventsBackoff(d time.Duration) EventsOption
- func WithEventsCacheEnabled() EventsOption (deprecated)
- func WithEventsInMemNotifier() EventsOption
- func WithEventsInserter(inserter inserter) EventsOption
- func WithEventsLoader(loader loader) EventsOption
- func WithEventsNotifier(notifier EventsNotifier) EventsOption
- func WithIncludeNoopEvents() EventsOption
- func WithoutEventsCache() EventsOption
- type EventsTable
- func (t *EventsTable) Clone(opts ...EventsOption) *EventsTable
- func (t *EventsTable) Insert(ctx context.Context, tx *sql.Tx, foreignID string, typ reflex.EventType) (NotifyFunc, error)
- func (t *EventsTable) InsertWithMetadata(ctx context.Context, tx *sql.Tx, foreignID string, typ reflex.EventType, ...) (NotifyFunc, error)
- func (t *EventsTable) ListenGaps(f GapListenFunc)
- func (t *EventsTable) StopGapListener(ctx context.Context)
- func (t *EventsTable) Stream(ctx context.Context, dbc *sql.DB, after string, opts ...reflex.StreamOption) reflex.StreamClient
- func (t *EventsTable) ToStream(dbc *sql.DB, opts1 ...reflex.StreamOption) reflex.StreamFunc
- type EventsTableInt
- func (e *EventsTableInt) Clone(opts ...EventsOption) *EventsTableInt
- func (e *EventsTableInt) Insert(ctx context.Context, tx *sql.Tx, foreignID int64, typ reflex.EventType) (NotifyFunc, error)
- func (e *EventsTableInt) InsertWithMetadata(ctx context.Context, tx *sql.Tx, foreignID int64, typ reflex.EventType, ...) (NotifyFunc, error)
- type Gap
- type GapListenFunc
- type NotifyFunc
- type StreamWatcher
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrConsecEvent occurs when the difference between the ids of two consecutive events is not 1.
	ErrConsecEvent = errors.New("non-consecutive event ids", j.C("ERR_bc3dcacb92b9761f"))
	// ErrInvalidIntID occurs when a non-int value is specified for an integer id
	ErrInvalidIntID = errors.New("invalid id, only int supported", j.C("ERR_82d0368b5478d378"))
)
Functions ¶
func DisableCache ¶
func DisableCache()
DisableCache results in all loading going straight through to the underlying loader.
func FillGaps ¶
FillGaps registers the default gap filler with the events table. It inserts noops into the events table when gaps are detected. Both EventsTable and EventsTableInt satisfy the gapTable internal interface.
Usage:
	var events = rsql.NewEventsTable()
	...
	rsql.FillGaps(dbc, events)
func GetLatestIDForTesting ¶
func GetLatestIDForTesting(ctx context.Context, _ *testing.T, dbc *sql.DB, eventTable, idField string) (int64, error)
GetLatestIDForTesting fetches the latest event id from the event table
func GetNextEventsForTesting ¶
func GetNextEventsForTesting(ctx context.Context, _ *testing.T, dbc *sql.DB, table *EventsTable, after int64, lag time.Duration) ([]*reflex.Event, error)
GetNextEventsForTesting fetches a bunch of events from the event table
func StopFillingGaps ¶
StopFillingGaps stops any goroutine started from FillGaps. If FillGaps has not been called, then this will block until ctx is cancelled or its deadline is reached.
func TestCursorsTable ¶
func TestCursorsTable(t *testing.T, dbc *sql.DB, table CursorsTable)
TestCursorsTable runs assertions on the cursor table to check it has a valid schema
func TestEventsTable ¶
func TestEventsTable(t *testing.T, dbc *sql.DB, table *EventsTable)
TestEventsTable provides a helper function to test event tables.
func TestEventsTableInt ¶
func TestEventsTableInt(t *testing.T, dbc *sql.DB, table *EventsTableInt)
TestEventsTableInt provides a helper function to test event tables with int foreign id columns.
func TestEventsTableWithID ¶
TestEventsTableWithID provides a helper function to test event tables.
Types ¶
type CursorType ¶
type CursorType int
CursorType is either int or string
func (CursorType) Cast ¶
func (t CursorType) Cast(cursor string) (interface{}, error)
Cast returns the cursor cast to the type.
type CursorsOption ¶
type CursorsOption func(*ctable)
CursorsOption are the configurations for the cursor table
func WithCursorAsyncDisabled ¶
func WithCursorAsyncDisabled() CursorsOption
WithCursorAsyncDisabled provides an option to disable async writes.
func WithCursorAsyncPeriod ¶
func WithCursorAsyncPeriod(d time.Duration) CursorsOption
WithCursorAsyncPeriod provides an option to configure the async write period. It defaults to 5 seconds.
func WithCursorCursorField ¶
func WithCursorCursorField(field string) CursorsOption
WithCursorCursorField provides an option to configure the cursor field. It defaults to 'last_event_id'.
func WithCursorIDField ¶
func WithCursorIDField(field string) CursorsOption
WithCursorIDField provides an option to configure the cursor ID field. It defaults to 'id'.
func WithCursorSetCounter ¶
func WithCursorSetCounter(f func()) CursorsOption
WithCursorSetCounter provides an option to set the cursor DB set cursor metric. It defaults to prometheus metrics.
func WithCursorStrings ¶
func WithCursorStrings() CursorsOption
WithCursorStrings provides an option to configure the cursor type to string. It defaults to int.
func WithCursorTimeField ¶
func WithCursorTimeField(field string) CursorsOption
WithCursorTimeField provides an option to configure the cursor time field. It defaults to 'updated_at'.
func WithTestCursorSleep ¶
func WithTestCursorSleep(_ testing.TB, f func(time.Duration)) CursorsOption
WithTestCursorSleep replaces the sleep function for testing.
type CursorsTable ¶
type CursorsTable interface {
	GetCursor(ctx context.Context, dbc *sql.DB, consumerID string) (string, error)
	SetCursor(ctx context.Context, dbc *sql.DB, consumerID string, cursor string) error
	Flush(ctx context.Context) error
	Clone(ol ...CursorsOption) CursorsTable
	ToStore(dbc *sql.DB, ol ...CursorsOption) reflex.CursorStore
}
CursorsTable provides an interface to an event consumer cursors db table.
func NewCursorsTable ¶
func NewCursorsTable(name string, options ...CursorsOption) CursorsTable
NewCursorsTable returns a new CursorsTable implementation.
type ErrorEventInserter ¶
type ErrorEventInserter func(ctx context.Context, tx *sql.Tx, foreignID string, typ reflex.EventType, metadata []byte) (NotifyFunc, error)
ErrorEventInserter abstracts the insertion of an event into a sql table including providing a notification capability.
type ErrorInserter ¶
type ErrorInserter func(ctx context.Context, tx *sql.Tx, consumer, eventID, errMsg string, errStatus reflex.ErrorStatus) (string, error)
ErrorInserter abstracts the insertion of an error into a sql table.
type ErrorsOption ¶
type ErrorsOption func(*ErrorsTable)
ErrorsOption defines a functional option to configure new error tables.
func WithErrorCounter ¶
func WithErrorCounter(counter func(consumer string)) ErrorsOption
WithErrorCounter provides an option to set the error counter which counts the errors successfully recorded to the error table.
func WithErrorCreatedAtField ¶
func WithErrorCreatedAtField(field string) ErrorsOption
WithErrorCreatedAtField provides an option to set the error DB created at timestamp field. It defaults to 'created_at'.
func WithErrorEventConsumerField ¶
func WithErrorEventConsumerField(field string) ErrorsOption
WithErrorEventConsumerField provides an option to set the error DB consumer field. It defaults to 'consumer'.
func WithErrorEventIDField ¶
func WithErrorEventIDField(field string) ErrorsOption
WithErrorEventIDField provides an option to set the event DB eventID field. It defaults to 'event_id'.
func WithErrorEventInserter ¶
func WithErrorEventInserter(inserter ErrorEventInserter) ErrorsOption
WithErrorEventInserter provides an option to set the error event inserter which inserts error into a sql table. The default inserter would be the EventsTable.InsertWithMetadata function of a given EventsTable instance.
func WithErrorIDField ¶
func WithErrorIDField(field string) ErrorsOption
WithErrorIDField provides an option to set the error DB ID field. This is useful for tables which implement custom error loaders. It defaults to 'id'.
func WithErrorInserter ¶
func WithErrorInserter(inserter ErrorInserter) ErrorsOption
WithErrorInserter provides an option to set the error inserter which inserts error into a sql table. The default inserter would be generated from the rsql.makeDefaultErrorInserter function parameterised with the errTableSchema of the given ErrorsTable.
func WithErrorMsgField ¶
func WithErrorMsgField(field string) ErrorsOption
WithErrorMsgField provides an option to set the error DB msg field. It defaults to 'msg'.
func WithErrorRecordOnly ¶
func WithErrorRecordOnly() ErrorsOption
WithErrorRecordOnly provides an option to set that we only record errors and not enable streaming of those errors.
func WithErrorStatusField ¶
func WithErrorStatusField(field string) ErrorsOption
WithErrorStatusField provides an option to set the error status field. It defaults to 'status'.
func WithErrorTableName ¶
func WithErrorTableName(name string) ErrorsOption
WithErrorTableName provides an option to set the name of the consumer error table. It defaults to 'consumer_errors'.
func WithErrorUpdatedAtField ¶
func WithErrorUpdatedAtField(field string) ErrorsOption
WithErrorUpdatedAtField provides an option to set the error DB updated at timestamp field. It defaults to 'updated_at'.
type ErrorsTable ¶
type ErrorsTable struct {
// contains filtered or unexported fields
}
ErrorsTable provides reflex consumer event errors insertion and streaming for a sql db table.
func NewErrorsTable ¶
func NewErrorsTable(opts ...ErrorsOption) *ErrorsTable
NewErrorsTable returns a new event consumer errors table.
func (*ErrorsTable) ToErrorInsertFunc ¶
func (t *ErrorsTable) ToErrorInsertFunc(dbc *sql.DB) reflex.ErrorInsertFunc
type EventsNotifier ¶
type EventsNotifier interface {
	// StreamWatcher is passed as the default StreamWatcher every time stream()
	// is called on the EventsTable.
	StreamWatcher
	// Notify is called by reflex every time an event is inserted into the
	// EventsTable.
	Notify()
}
EventsNotifier provides a way to receive notifications when an event is inserted in an EventsTable, and a way to trigger an EventsTable's StreamClients when there are new events available.
type EventsOption ¶
type EventsOption func(*EventsTable)
EventsOption defines a functional option to configure new event tables.
func WithEventForeignIDField ¶
func WithEventForeignIDField(field string) EventsOption
WithEventForeignIDField provides an option to set the event DB foreignID field. It defaults to 'foreign_id'.
func WithEventIDField ¶
func WithEventIDField(field string) EventsOption
WithEventIDField provides an option to set the event DB ID field. This is useful for tables which implement custom event loaders. It defaults to 'id'.
func WithEventMetadataField ¶
func WithEventMetadataField(field string) EventsOption
WithEventMetadataField provides an option to set the event DB metadata field. It is disabled by default; i.e. it defaults to the empty string ''.
func WithEventTimeField ¶
func WithEventTimeField(field string) EventsOption
WithEventTimeField provides an option to set the event DB timestamp field. It defaults to 'timestamp'.
func WithEventTraceField ¶
func WithEventTraceField(field string) EventsOption
WithEventTraceField provides an option to persist an opentelemetry trace through the events stream
func WithEventTypeField ¶
func WithEventTypeField(field string) EventsOption
WithEventTypeField provides an option to set the event DB type field. It defaults to 'type'.
func WithEventsBackoff ¶
func WithEventsBackoff(d time.Duration) EventsOption
WithEventsBackoff provides an option to set the backoff period between polling the DB for new events. It defaults to 10s.
func WithEventsCacheEnabled
deprecated
func WithEventsCacheEnabled() EventsOption
WithEventsCacheEnabled provides an option to enable the read-through cache on the events table.
Deprecated: Cache enabled by default.
func WithEventsInMemNotifier ¶
func WithEventsInMemNotifier() EventsOption
WithEventsInMemNotifier provides an option that enables an in-memory notifier.
Note: This can have a significant impact on database load if the cache is disabled since all consumers might query the database on every event.
func WithEventsInserter ¶
func WithEventsInserter(inserter inserter) EventsOption
WithEventsInserter provides an option to set the event inserter which inserts event into a sql table. The default inserter is configured with the WithEventsXField options.
func WithEventsLoader ¶
func WithEventsLoader(loader loader) EventsOption
WithEventsLoader provides an option to set the base event loader function. The base event loader returns the next available events and the associated next cursor after the previous cursor, or an error. The default loader is configured with the WithEventsXField options.
func WithEventsNotifier ¶
func WithEventsNotifier(notifier EventsNotifier) EventsOption
WithEventsNotifier provides an option to receive event notifications and trigger StreamClients when new events are available.
func WithIncludeNoopEvents ¶
func WithIncludeNoopEvents() EventsOption
WithIncludeNoopEvents provides an option to enable the streaming of noop events. Noop events are not streamed by default.
func WithoutEventsCache ¶
func WithoutEventsCache() EventsOption
WithoutEventsCache provides an option to disable the read-through cache on the events table.
type EventsTable ¶
type EventsTable struct {
// contains filtered or unexported fields
}
EventsTable provides reflex event insertion and streaming for a sql db table.
func NewEventsTable ¶
func NewEventsTable(name string, opts ...EventsOption) *EventsTable
NewEventsTable returns a new events table.
func (*EventsTable) Clone ¶
func (t *EventsTable) Clone(opts ...EventsOption) *EventsTable
Clone returns a new events table generated from the config of t with the new options applied. Note that non-config fields are not copied, so things like the cache and inmemnotifier are not shared.
func (*EventsTable) Insert ¶
func (t *EventsTable) Insert(ctx context.Context, tx *sql.Tx, foreignID string, typ reflex.EventType, ) (NotifyFunc, error)
Insert inserts an event into the EventsTable and returns a function that can be optionally called to notify the table's EventsNotifier of the change. The intended pattern for this function is:
	notify, err := eTable.Insert(ctx, tx, ...)
	if err != nil {
		return err
	}
	defer notify()
	return doWorkAndCommit(tx)
func (*EventsTable) InsertWithMetadata ¶
func (t *EventsTable) InsertWithMetadata(ctx context.Context, tx *sql.Tx, foreignID string, typ reflex.EventType, metadata []byte, ) (NotifyFunc, error)
InsertWithMetadata inserts an event with metadata into the EventsTable. Note metadata is disabled by default, enable with WithEventMetadataField option.
func (*EventsTable) ListenGaps ¶
func (t *EventsTable) ListenGaps(f GapListenFunc)
ListenGaps adds f to a slice of functions that are called when a gap is detected. On first call, it starts a goroutine that serves these functions.
func (*EventsTable) StopGapListener ¶
func (t *EventsTable) StopGapListener(ctx context.Context)
func (*EventsTable) Stream ¶
func (t *EventsTable) Stream(ctx context.Context, dbc *sql.DB, after string, opts ...reflex.StreamOption, ) reflex.StreamClient
Stream implements reflex.StreamFunc and returns a StreamClient that streams events from the db. It is only safe for a single goroutine to use.
func (*EventsTable) ToStream ¶
func (t *EventsTable) ToStream(dbc *sql.DB, opts1 ...reflex.StreamOption) reflex.StreamFunc
ToStream returns a reflex StreamFunc interface of this EventsTable.
type EventsTableInt ¶
type EventsTableInt struct {
*EventsTable
}
EventsTableInt wraps reflex EventsTable and provides typed int64 foreign id inserts.
func NewEventsTableInt ¶
func NewEventsTableInt(name string, options ...EventsOption) *EventsTableInt
NewEventsTableInt returns an event table which uses integers for the foreign IDs
func (*EventsTableInt) Clone ¶
func (e *EventsTableInt) Clone(opts ...EventsOption) *EventsTableInt
Clone works as EventsTable.Clone.
func (*EventsTableInt) Insert ¶
func (e *EventsTableInt) Insert(ctx context.Context, tx *sql.Tx, foreignID int64, typ reflex.EventType, ) (NotifyFunc, error)
Insert works as EventsTable.Insert except that foreign id is an int64.
func (*EventsTableInt) InsertWithMetadata ¶
func (e *EventsTableInt) InsertWithMetadata(ctx context.Context, tx *sql.Tx, foreignID int64, typ reflex.EventType, metadata []byte, ) (NotifyFunc, error)
InsertWithMetadata works as EventsTable.InsertWithMetadata except that foreign id is an int64.
type Gap ¶
Gap represents a gap in monotonically incrementing event IDs. The gap lies after Prev and before Next, so if Prev+1==Next, then there is no gap.
type GapListenFunc ¶
type GapListenFunc func(Gap)
type NotifyFunc ¶
type NotifyFunc func()
NotifyFunc notifies an events table's underlying EventsNotifier.
type StreamWatcher ¶
type StreamWatcher interface {
	// C returns a channel that blocks until the next event is available in the
	// StreamWatcher's EventsTable. C will be called every time a StreamClient
	// reaches the head of an events table.
	C() <-chan struct{}
}
StreamWatcher provides the ability to trigger the streamer when new events are available.