reflex

package module
v0.0.0-...-361bf8e Latest
Published: Nov 4, 2024 License: MIT Imports: 15 Imported by: 60

README

Reflex

reflex /ˈriːflɛks/

  1. an action that is performed without conscious thought as a response to a stimulus. "the system is equipped with ninja-like reflexes"
  2. a thing which is determined by and reproduces the essential features or qualities of something else. "business logic is no more than a reflex of user actions"

Reflex provides an API for building distributed event notification streams.

Note: reflex is used in production at Luno but is still under active development; breaking changes are on the way.

Overview

logic := func(ctx context.Context, e *reflex.Event) error {
	log.Printf("Consumed event: %#v", e)
	return nil
}
consumer := reflex.NewConsumer("logging-consumer", logic)

spec := reflex.NewSpec(streamFunc, cursorStore, consumer)

for {
	err := reflex.Run(context.Background(), spec)
	log.Printf("Consume error: %v", err)
}

Consumer encapsulates the business logic triggered on each event. It has a name used for cursor persistence and metrics.

StreamFunc is the event source providing event streams from an offset (cursor).

CursorStore provides cursor persistence; GetCursor on start and SetCursor on successfully consumed events.

Spec combines the three elements above, which are required to stream and consume reflex events. It is passed to reflex.Run, which streams events from the source to the consumer and updates the cursor on success.

See StreamFunc and CursorStore implementations below.

Characteristics

Events are primarily state change notifications

  • Event.ID is the unique event identifier. It is used as the cursor.
  • Event.Type is an enum indicating the type of state change; e.g. UserCreated, TradeCompleted, EmailUpdated.
  • Event.ForeignID points to the associated (mutable) entity.
  • Event.Timestamp provides the time the event occurred.
  • Event.MetaData provides custom unstructured event metadata.
  • Event persistence implementations must provide exactly-once or at-least-once semantics.
  • It is designed for "Event Notification" rather than "Event Sourcing"; see this video.

The event store providing the StreamFunc API is inspired by Kafka partitions

  • It stores events as an immutable, ordered log.
  • It must be queryable by offset (cursor, event ID).
  • It must support independent concurrent stream queries with different offsets.
  • It must retain events for a reasonable amount of time (days).
  • It should be responsive: new events should stream within milliseconds.

Errors always result in the consumer getting stuck (failing fast)

  • On any error, the cursor will not be updated and the reflex.Run function will return.
  • In the case of transient errors (network, io, shutdown, etc.), simply calling Run again will eventually succeed.
  • In the case of logic or data errors, it is up to the user to either fix the bug or catch and ignore the error.
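
The fail-fast behaviour above implies that callers wrap Run in a retry loop. A minimal sketch, assuming a hypothetical run closure standing in for reflex.Run with a fixed Spec, and a hypothetical errTransient error; the backoff between attempts is an addition of this sketch, not something reflex prescribes:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errTransient is a hypothetical stand-in for a network or io error.
var errTransient = errors.New("transient failure")

// runForever keeps calling run until ctx is cancelled, pausing for backoff
// between attempts. run stands in for a closure around reflex.Run.
// It returns the number of attempts made.
func runForever(ctx context.Context, run func(context.Context) error, backoff time.Duration) int {
	attempts := 0
	for ctx.Err() == nil {
		attempts++
		err := run(ctx)
		fmt.Printf("attempt %d returned: %v\n", attempts, err)
		select {
		case <-ctx.Done(): // shutdown: stop retrying
		case <-time.After(backoff): // wait before the next attempt
		}
	}
	return attempts
}

// demoRetry fails twice with a transient error, then cancels the context
// during the third attempt to simulate a shutdown.
func demoRetry() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	calls := 0
	run := func(ctx context.Context) error {
		calls++
		if calls < 3 {
			return errTransient
		}
		cancel()
		return ctx.Err()
	}
	return runForever(ctx, run, time.Millisecond)
}

func main() {
	fmt.Println("attempts:", demoRetry())
}
```

Logic or data errors would make such a loop spin on the same event, which is why those need a code fix or an explicit decision to ignore the error.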

It is designed for micro-services

  • gRPC implementations are provided for StreamFunc.
  • This allows peer-to-peer event streaming without a central event bus.
  • It allows encapsulating events behind an API; #microservices_own_their_own_data

It is composable

  • CursorStore and StreamFunc are decoupled and data source/store agnostic.
  • This results in multiple types of Specs, including:
    • rsql CursorStore with gRPC StreamFunc (remote service events and local mysql cursors)
    • rsql CursorStore and rsql StreamFunc (local mysql events and cursors)
    • remote CursorStore with gRPC StreamFunc (remote service events and remote cursors)

Sources and implementations

The github.com/luno/reflex package provides the main framework API with types and interfaces.

The github.com/luno/reflex/rpatterns package provides patterns for common reflex use-cases.

The following packages provide reflex.StreamFunc event stream source implementations:

The following packages provide reflex.CursorStore cursor store implementations:

Documentation

Overview

Package reflex provides events (state change notifications) for mutable sql tables. It also supports streaming consumption of these events, either directly (locally) or via gRPC. It therefore aims to provide streams of sql events similar to other database streaming features, like DynamoDB Streams, except that these events do not contain any data, but are merely pointers.

Reflex events are pointers to state changes of mutable data.

event_id string        // unique id of the event
type enum              // state change type
foreign_id string      // id of the mutable datum
timestamp timestamp    // timestamp of the event

Events are inserted as part of sql transactions that modify the data. This ensures strong data consistency; exactly one event per state change.

The EventsTable wraps an event DB table allowing insertion of events. The canonical pattern is to define an EventsTable per mutable data table. Ex. users and user_events, payments and payment_events. Note that event insertion is generally defined on the data layer, not the business logic layer.

The datum (data entity) referred to and state change performed on it are implied by the event source and type. Ex. An event produced by the "user service" with type "UserCreated", or an event by the "auth service" with type "APIKeyVerified".

The EventsTable also provides event streams from an arbitrary point in the past. Reflex also supports exposing these event streams via gRPC.

Consumers consume the event stream keeping a cursor of the last event. This ensures at-least-once data consistency. Consumers therefore need to persist their cursor.

The CursorsTable wraps a DB table allowing getting and setting of cursors. The canonical pattern is to define one cursor table per service.

There are two ways to consume event streams:

  1. EventsTable + CursorsTable: The consumer logic has local access to both the events and cursors tables. Ex. User service sends an email on UserCreated.

  2. gRPC stream + CursorsTable: The consumer logic has access to a local CursorsTable, but requests the event stream from a remote service via gRPC. Ex. The Fraud service consumes PaymentCreated events from the payments service. It has its own DB and CursorsTable.

Index

Constants

This section is empty.

Variables

var (
	// ErrStopped is returned when an events table is stopped, usually
	// when the grpc server is stopped. Clients should check for this error
	// and reconnect.
	ErrStopped = errors.New("the event stream has been stopped", j.C("ERR_09290f5944cb8671"))

	// ErrHeadReached is returned when the events table reaches the end of
	// the stream of events.
	ErrHeadReached = errors.New("the event stream has reached the current head", j.C("ERR_b4b155d2a91cfcd0"))
)

Functions

func IsAnyType

func IsAnyType(source EventType, targets ...EventType) bool

IsAnyType returns true if the source reflex type equals any of the target types.

func IsExpected

func IsExpected(err error) bool

IsExpected returns true if the error is expected during normal streaming operation.

func IsFilterErr

func IsFilterErr(err error) bool

IsFilterErr returns true if the error occurred during Event filtering operations.

func IsHeadReachedErr

func IsHeadReachedErr(err error) bool

IsHeadReachedErr reports whether err is ErrHeadReached.

func IsStoppedErr

func IsStoppedErr(err error) bool

IsStoppedErr reports whether err is ErrStopped.

func IsType

func IsType(source, target EventType) bool

IsType returns true if the source reflex type equals the target type.

func Run

func Run(in context.Context, s Spec) error

Run executes the spec by streaming events from the current cursor, feeding each into the consumer and updating the cursor on success. It always returns a non-nil error. Cancel the context to return early.
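
The contract described for Run can be pictured with a simplified loop. This is only a sketch under stated assumptions, not the package's implementation: Event is a trimmed local stand-in, the event source is a plain slice, the cursor store is a map, and errHead is a hypothetical substitute for ErrHeadReached.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Event is a trimmed local stand-in for reflex.Event.
type Event struct{ ID string }

// errHead is a hypothetical stand-in for ErrHeadReached.
var errHead = errors.New("head reached")

// runOnce resumes after the stored cursor, feeds each event to consume and
// advances the cursor only on success. It always returns a non-nil error.
func runOnce(ctx context.Context, events []Event, cursors map[string]string,
	name string, consume func(context.Context, *Event) error) error {
	start := 0
	if cursor := cursors[name]; cursor != "" {
		for i, e := range events {
			if e.ID == cursor {
				start = i + 1
			}
		}
	}
	for i := start; i < len(events); i++ {
		if err := ctx.Err(); err != nil {
			return err
		}
		e := events[i]
		if err := consume(ctx, &e); err != nil {
			return err // the cursor is left untouched
		}
		cursors[name] = e.ID
	}
	return errHead
}

// demoRun consumes three events with a consumer that fails once on event 2.
// It returns the IDs seen by the consumer and the final cursor.
func demoRun() (string, string) {
	events := []Event{{ID: "1"}, {ID: "2"}, {ID: "3"}}
	cursors := map[string]string{}
	seen, failed := "", false
	consume := func(_ context.Context, e *Event) error {
		seen += e.ID
		if e.ID == "2" && !failed {
			failed = true
			return errors.New("temporary failure")
		}
		return nil
	}
	for {
		err := runOnce(context.Background(), events, cursors, "demo", consume)
		if errors.Is(err, errHead) {
			return seen, cursors["demo"]
		}
	}
}

func main() {
	seen, cursor := demoRun()
	fmt.Println("seen:", seen, "cursor:", cursor)
}
```

Event 2 reaches the consumer twice in this run, which illustrates the at-least-once delivery that the cursor-on-success rule produces.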

Types

type Consumable

type Consumable interface {
	// Consume blocks while events are streamed to consumer. It always returns a non-nil error.
	// Cancel the context to return early.
	Consume(context.Context, Consumer, ...StreamOption) error
}

Consumable is an interface for an object that provides a ConsumeFunc as a method named Consume.

Deprecated: Please use Spec.

func NewConsumable

func NewConsumable(sFn StreamFunc, cstore CursorStore, opts ...StreamOption) Consumable

NewConsumable creates a new Consumable.

Deprecated: Please use Run.

type ConsumeFunc

type ConsumeFunc func(context.Context, Consumer, ...StreamOption) error

ConsumeFunc is the main reflex consume interface. It blocks while events are streamed to consumer. It always returns a non-nil error. Cancel the context to return early.

Deprecated: Please use Spec.

type Consumer

type Consumer interface {
	Name() string
	Consume(context.Context, *Event) error
}

Consumer represents a piece of business logic that consumes events. It consists of a name and the consume logic. Consumer logic should be idempotent since reflex provides at-least-once event delivery.
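
Because delivery is at-least-once, the same event may reach Consume more than once. A minimal sketch of idempotent consume logic, assuming a hypothetical welcomeConsumer and a trimmed local Event type; the in-memory map stands in for whatever durable state a real consumer would check:

```go
package main

import (
	"context"
	"fmt"
)

// Event is a trimmed local stand-in for reflex.Event.
type Event struct {
	ID        string
	ForeignID string
}

// welcomeConsumer is a hypothetical consumer that must greet each user once.
type welcomeConsumer struct {
	greeted map[string]bool // in production this would be durable state
	sent    int
}

// Name returns the consumer name, as required by the Consumer interface.
func (c *welcomeConsumer) Name() string { return "welcome-consumer" }

// Consume greets the user referenced by the event, unless already greeted.
func (c *welcomeConsumer) Consume(_ context.Context, e *Event) error {
	if c.greeted[e.ForeignID] {
		return nil // duplicate delivery: nothing left to do
	}
	c.greeted[e.ForeignID] = true
	c.sent++
	return nil
}

// demoIdempotent delivers the same event twice and returns the greeting count.
func demoIdempotent() int {
	c := &welcomeConsumer{greeted: map[string]bool{}}
	e := &Event{ID: "7", ForeignID: "user-1"}
	for i := 0; i < 2; i++ {
		_ = c.Consume(context.Background(), e)
	}
	return c.sent
}

func main() {
	fmt.Println("greetings sent:", demoIdempotent())
}
```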

func NewConsumer

func NewConsumer(name string, fn ConsumerFunc,
	opts ...ConsumerOption,
) Consumer

NewConsumer returns a new instrumented consumer of events.

type ConsumerError

type ConsumerError struct {
	ID           string
	ConsumerName string
	EventID      string
	Message      string
	CreatedAt    time.Time
	UpdatedAt    time.Time
	Status       ErrorStatus
}

ConsumerError is a record of a reflex event consumer error.

func (*ConsumerError) EventIDInt

func (e *ConsumerError) EventIDInt() int64

EventIDInt returns the event id as an int64 or 0 if it is not an integer.

func (*ConsumerError) IDInt

func (e *ConsumerError) IDInt() int64

IDInt returns the error id as an int64 or 0 if it is not an integer.

func (*ConsumerError) IsEventIDInt

func (e *ConsumerError) IsEventIDInt() bool

IsEventIDInt returns true if the event id is an integer.

func (*ConsumerError) IsIDInt

func (e *ConsumerError) IsIDInt() bool

IsIDInt returns true if the error id is an integer.

type ConsumerFunc

type ConsumerFunc func(context.Context, *Event) error

type ConsumerOption

type ConsumerOption func(*consumer)

ConsumerOption will change the behaviour of the consumer

func WithConsumerActivityTTL

func WithConsumerActivityTTL(ttl time.Duration) ConsumerOption

WithConsumerActivityTTL provides an option to set the consumer activity metric ttl; i.e. if no events are consumed within the `ttl` duration, the consumer is considered inactive.

func WithConsumerActivityTTLFunc

func WithConsumerActivityTTLFunc(ttl func() time.Duration) ConsumerOption

WithConsumerActivityTTLFunc is similar to WithConsumerActivityTTL but accepts a function that returns the TTL.

func WithConsumerLagAlert

func WithConsumerLagAlert(d time.Duration) ConsumerOption

WithConsumerLagAlert provides an option to set the consumer lag alert threshold.

func WithConsumerLagAlertGauge

func WithConsumerLagAlertGauge(g prometheus.Gauge) ConsumerOption

WithConsumerLagAlertGauge provides an option to set the consumer lag alert gauge. Handy for custom alert metadata as labels.

func WithEventFilter

func WithEventFilter(flt EventFilter) ConsumerOption

WithEventFilter provides an option to specify which Event values a consumer is interested in. For uninteresting events Consume is never called, and a skipped metric is incremented.

func WithFilterIncludeTypes

func WithFilterIncludeTypes(evts ...EventType) ConsumerOption

WithFilterIncludeTypes provides an option to specify which EventTypes a consumer is interested in. For uninteresting events Consume is never called, and a skipped metric is incremented.

func WithRecoverFunction

func WithRecoverFunction(rfn RecoveryFunc) ConsumerOption

WithRecoverFunction provides an option to specify a recovery function that is called when consuming an event returns an error. The function can change the returned error or eliminate it by returning nil. It can also be used for notification and/or recording of events that failed to process successfully.

func WithoutConsumerActivityTTL

func WithoutConsumerActivityTTL() ConsumerOption

WithoutConsumerActivityTTL provides an option to disable the consumer activity metric ttl.

func WithoutConsumerLag

func WithoutConsumerLag() ConsumerOption

WithoutConsumerLag provides an option to disable the consumer lag alert.

type CursorStore

type CursorStore interface {
	// GetCursor returns the consumer's cursor. It returns an empty string if no cursor exists.
	GetCursor(ctx context.Context, consumerName string) (string, error)

	// SetCursor stores the consumer's cursor. Note that some implementations may buffer writes.
	SetCursor(ctx context.Context, consumerName string, cursor string) error

	// Flush writes any buffered cursors to the underlying store.
	Flush(ctx context.Context) error
}

CursorStore is an interface used to persist consumer offsets in a stream.
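
For tests or experiments, the interface can be satisfied by a small in-memory store. A minimal sketch: memCursorStore is a hypothetical type, and the CursorStore interface is repeated locally (mirroring the definition above) so that the example compiles on its own.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// CursorStore mirrors the interface documented above.
type CursorStore interface {
	GetCursor(ctx context.Context, consumerName string) (string, error)
	SetCursor(ctx context.Context, consumerName string, cursor string) error
	Flush(ctx context.Context) error
}

// memCursorStore is a hypothetical in-memory CursorStore. Cursors are lost
// on restart, so it is only suitable for tests.
type memCursorStore struct {
	mu      sync.Mutex
	cursors map[string]string
}

// Compile-time check that memCursorStore satisfies CursorStore.
var _ CursorStore = (*memCursorStore)(nil)

func newMemCursorStore() *memCursorStore {
	return &memCursorStore{cursors: make(map[string]string)}
}

// GetCursor returns the stored cursor, or an empty string if none exists.
func (s *memCursorStore) GetCursor(_ context.Context, consumerName string) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.cursors[consumerName], nil
}

// SetCursor stores the cursor immediately; nothing is buffered.
func (s *memCursorStore) SetCursor(_ context.Context, consumerName string, cursor string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.cursors[consumerName] = cursor
	return nil
}

// Flush is a no-op because writes are never buffered.
func (s *memCursorStore) Flush(context.Context) error { return nil }

// demoCursorStore reads a cursor before and after setting it.
func demoCursorStore() (before, after string, err error) {
	ctx := context.Background()
	store := newMemCursorStore()
	if before, err = store.GetCursor(ctx, "demo"); err != nil {
		return "", "", err
	}
	if err = store.SetCursor(ctx, "demo", "42"); err != nil {
		return "", "", err
	}
	after, err = store.GetCursor(ctx, "demo")
	return before, after, err
}

func main() {
	before, after, err := demoCursorStore()
	fmt.Printf("before=%q after=%q err=%v\n", before, after, err)
}
```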

type ErrorInsertFunc

type ErrorInsertFunc func(ctx context.Context, consumerName string, eventID string, errMsg string) error

ErrorInsertFunc abstracts the insertion of a consumer error into a sql table.

type ErrorStatus

type ErrorStatus int

ErrorStatus is the current status of a consumer error.

const (
	UnknownEventError ErrorStatus = 0
	// EventErrorRecorded - New errors should be saved in this state [initial]
	EventErrorRecorded ErrorStatus = 1
)

func (ErrorStatus) ReflexType

func (e ErrorStatus) ReflexType() int

func (ErrorStatus) ShiftStatus

func (e ErrorStatus) ShiftStatus() int

type Event

type Event struct {
	ID        string
	Type      EventType
	ForeignID string
	Timestamp time.Time
	MetaData  []byte
	Trace     []byte
}

Event is the reflex event. It is an immutable notification event that indicates that a change of some type relating to a foreign entity happened at a specific time. It may also contain metadata relating to the change.

func (*Event) ForeignIDInt

func (e *Event) ForeignIDInt() int64

ForeignIDInt returns the foreign id as an int64 or 0 if it is not an integer.

func (*Event) IDInt

func (e *Event) IDInt() int64

IDInt returns the event id as an int64 or 0 if it is not an integer.

func (*Event) IsForeignIDInt

func (e *Event) IsForeignIDInt() bool

IsForeignIDInt returns true if the foreign id is an integer.

func (*Event) IsIDInt

func (e *Event) IsIDInt() bool

IsIDInt returns true if the event id is an integer.
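
The "0 if it is not an integer" convention means the Int accessors cannot distinguish the id "0" from a non-numeric id, which is where the Is...Int checks come in. A minimal sketch of that behaviour, assuming base-10 parsing with strconv.ParseInt; the helpers idInt and isIDInt are hypothetical and the package's own parsing may differ:

```go
package main

import (
	"fmt"
	"strconv"
)

// idInt returns id as an int64, or 0 if it is not an integer.
func idInt(id string) int64 {
	i, err := strconv.ParseInt(id, 10, 64)
	if err != nil {
		return 0
	}
	return i
}

// isIDInt reports whether id parses as an integer.
func isIDInt(id string) bool {
	_, err := strconv.ParseInt(id, 10, 64)
	return err == nil
}

func main() {
	for _, id := range []string{"42", "0", "abc"} {
		fmt.Printf("id=%q int=%d isInt=%v\n", id, idInt(id), isIDInt(id))
	}
}
```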

type EventFilter

type EventFilter func(event *Event) (bool, error)

EventFilter takes an Event and returns true if it should be processed or false if it should not. It can return an error if it fails to determine whether the event should be processed. The func is expected to return promptly: apart from small in-memory transforms or extractions, it should not perform I/O or significant API calls (especially remote ones), since the only data it needs should be on the event itself.
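
A filter that respects these constraints only inspects fields of the event. A minimal sketch, assuming a hypothetical evenForeignIDs filter (for example, to split work between two consumers) and local stand-ins for Event and EventFilter:

```go
package main

import (
	"fmt"
	"strconv"
)

// Event is a trimmed local stand-in for reflex.Event.
type Event struct {
	ID        string
	ForeignID string
}

// EventFilter mirrors the type documented above.
type EventFilter func(event *Event) (bool, error)

// evenForeignIDs is a hypothetical filter that only allows events whose
// ForeignID is an even integer. It errors if the ForeignID is not an integer.
func evenForeignIDs(e *Event) (bool, error) {
	id, err := strconv.ParseInt(e.ForeignID, 10, 64)
	if err != nil {
		return false, fmt.Errorf("foreign id %q is not an integer: %w", e.ForeignID, err)
	}
	return id%2 == 0, nil
}

// keptIDs applies flt to each event and concatenates the IDs that pass.
func keptIDs(flt EventFilter, events []Event) (string, error) {
	kept := ""
	for i := range events {
		ok, err := flt(&events[i])
		if err != nil {
			return kept, err
		}
		if ok {
			kept += events[i].ID
		}
	}
	return kept, nil
}

// demoFilter returns the IDs kept from a valid batch, and whether a
// non-integer ForeignID produced an error.
func demoFilter() (string, bool) {
	events := []Event{
		{ID: "a", ForeignID: "1"}, {ID: "b", ForeignID: "2"},
		{ID: "c", ForeignID: "3"}, {ID: "d", ForeignID: "4"},
	}
	kept, _ := keptIDs(evenForeignIDs, events)
	_, err := keptIDs(evenForeignIDs, []Event{{ID: "e", ForeignID: "x"}})
	return kept, err != nil
}

func main() {
	kept, failed := demoFilter()
	fmt.Println("kept:", kept, "error on bad id:", failed)
}
```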

type EventType

type EventType interface {
	// ReflexType returns the type as an int.
	ReflexType() int
}

EventType is an interface for enums that act as reflex event types.
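
An integer enum satisfies this interface with a one-line method. A minimal sketch, assuming a hypothetical UserEventType enum; sameType and anyType are local helpers written from the documented descriptions of IsType and IsAnyType (comparison by ReflexType value), not the package's code:

```go
package main

import "fmt"

// EventType mirrors the interface documented above.
type EventType interface {
	ReflexType() int
}

// UserEventType is a hypothetical enum of user state changes.
type UserEventType int

const (
	UserEventUnknown UserEventType = 0
	UserCreated      UserEventType = 1
	EmailUpdated     UserEventType = 2
)

// ReflexType implements EventType by returning the enum's integer value.
func (t UserEventType) ReflexType() int { return int(t) }

// sameType reports whether two event types have the same integer value.
func sameType(source, target EventType) bool {
	return source.ReflexType() == target.ReflexType()
}

// anyType reports whether source matches any of the targets.
func anyType(source EventType, targets ...EventType) bool {
	for _, t := range targets {
		if sameType(source, t) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(sameType(UserCreated, EmailUpdated))
	fmt.Println(anyType(EmailUpdated, UserCreated, EmailUpdated))
}
```

Comparing by integer value means that enums from different services can collide if they share values, so the event source still matters when interpreting a type.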

type RecoveryFunc

type RecoveryFunc func(ctx context.Context, ev *Event, consumer Consumer, err error) error

RecoveryFunc is a function that can be added as a ConsumerOption, using WithRecoverFunction, to handle errors returned by a consumer function. The handling can simply record the error. Since it takes the error in and returns an error, it can also return nil to "recover" from the error (additional work may be needed to do any actual recovery), return the same error if it could not be handled, or return a different error.
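
The three outcomes (recover, pass through, replace) can be seen in a small recovery function. A minimal sketch, assuming a hypothetical errNotFound sentinel and a hypothetical nopConsumer; Event, Consumer and RecoveryFunc are local stand-ins mirroring the definitions on this page:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Event is a trimmed local stand-in for reflex.Event.
type Event struct{ ID string }

// Consumer and RecoveryFunc mirror the types documented on this page.
type Consumer interface {
	Name() string
	Consume(context.Context, *Event) error
}

type RecoveryFunc func(ctx context.Context, ev *Event, consumer Consumer, err error) error

// errNotFound is a hypothetical error for an entity that no longer exists.
var errNotFound = errors.New("entity not found")

// skipNotFound records and swallows errNotFound; other errors pass through.
func skipNotFound(skipped *[]string) RecoveryFunc {
	return func(_ context.Context, ev *Event, c Consumer, err error) error {
		if errors.Is(err, errNotFound) {
			*skipped = append(*skipped, c.Name()+":"+ev.ID)
			return nil // recovered: the cursor may advance past this event
		}
		return err // not handled: the consumer stays stuck on this event
	}
}

// nopConsumer is a hypothetical consumer used only to supply a name.
type nopConsumer struct{}

func (nopConsumer) Name() string                          { return "demo" }
func (nopConsumer) Consume(context.Context, *Event) error { return nil }

// demoRecovery returns whether errNotFound was swallowed, whether another
// error passed through unchanged, and the recorded skip entries.
func demoRecovery() (bool, bool, []string) {
	var skipped []string
	rfn := skipNotFound(&skipped)
	ctx, ev := context.Background(), &Event{ID: "9"}
	other := errors.New("database unavailable")
	recovered := rfn(ctx, ev, nopConsumer{}, errNotFound) == nil
	passed := rfn(ctx, ev, nopConsumer{}, other) == other
	return recovered, passed, skipped
}

func main() {
	fmt.Println(demoRecovery())
}
```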

type ResetterCtx

type ResetterCtx interface {
	Reset(context.Context) error
}

ResetterCtx is an optional interface that a consumer can implement indicating that it is stateful and requires resetting at the start of each Run.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server provides stream, consume and graceful shutdown functionality for use in a gRPC server.

func NewServer

func NewServer() *Server

NewServer returns a new server.

func (*Server) Stop

func (s *Server) Stop()

Stop stops serving gRPC stream and consume methods, which then return ErrStopped. It should be used for graceful shutdown. It panics if called more than once.

func (*Server) Stream

func (s *Server) Stream(sFn StreamFunc, req *reflexpb.StreamRequest, sspb streamServerPB) error

Stream streams events for a gRPC stream method. It always returns a non-nil error. It returns ErrStopped if the server is stopped. Note that back pressure is achieved by gRPC Streams' 64KB send and receive buffers. Note that gRPC does not guarantee buffered messages being sent on the wire, see https://github.com/grpc/grpc-go/issues/2159

type Spec

type Spec struct {
	// contains filtered or unexported fields
}

Spec specifies all the elements required to stream and consume reflex events for a specific purpose. StreamFunc is the source of the events. Consumer is the business logic consuming the events. CursorStore persists a cursor of consumed events. As long as the elements do not change the consumer is guaranteed at-least-once delivery of all events in the stream.

func NewSpec

func NewSpec(stream StreamFunc, cStore CursorStore, consumer Consumer,
	opts ...StreamOption,
) Spec

NewSpec returns a new Spec.

func (Spec) Name

func (req Spec) Name() string

Name returns the name of the spec which is the name of the consumer.

func (Spec) Stop

func (req Spec) Stop() error

Stop stops the spec's consumer.

type Stopper

type Stopper interface {
	Stop() error
}

Stopper is an optional interface that a consumer can implement indicating that it has clean up work to do at the end of each Run.
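
Optional interfaces such as Stopper and ResetterCtx are typically discovered with a type assertion. The sketch below shows that general Go pattern; the ordering (reset, run, stop) and the runWithLifecycle helper are assumptions made for illustration, not a description of how reflex.Run is implemented.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Stopper and ResetterCtx mirror the optional interfaces documented above.
type Stopper interface {
	Stop() error
}

type ResetterCtx interface {
	Reset(context.Context) error
}

// statefulConsumer is a hypothetical consumer that records lifecycle calls.
type statefulConsumer struct{ calls []string }

func (c *statefulConsumer) Reset(context.Context) error {
	c.calls = append(c.calls, "reset")
	return nil
}

func (c *statefulConsumer) Stop() error {
	c.calls = append(c.calls, "stop")
	return nil
}

// runWithLifecycle calls Reset before body and Stop after it, but only if
// the consumer implements the corresponding optional interface.
func runWithLifecycle(ctx context.Context, consumer interface{}, body func() error) error {
	if r, ok := consumer.(ResetterCtx); ok {
		if err := r.Reset(ctx); err != nil {
			return err
		}
	}
	err := body()
	if s, ok := consumer.(Stopper); ok {
		if stopErr := s.Stop(); err == nil {
			err = stopErr
		}
	}
	return err
}

// demoLifecycle returns the order of calls made on a stateful consumer.
func demoLifecycle() string {
	c := &statefulConsumer{}
	_ = runWithLifecycle(context.Background(), c, func() error {
		c.calls = append(c.calls, "run")
		return nil
	})
	return strings.Join(c.calls, ",")
}

func main() {
	fmt.Println(demoLifecycle())
}
```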

type StreamClient

type StreamClient interface {
	// Recv blocks until the next event is found. Either the event or error is non-nil.
	Recv() (*Event, error)
}

StreamClient is a stream interface providing subsequent events on calls to Recv.

type StreamClientPB

type StreamClientPB interface {
	Recv() (*reflexpb.Event, error)
}

StreamClientPB defines a common interface for reflex stream gRPC generated implementations.

type StreamFunc

type StreamFunc func(ctx context.Context, after string, opts ...StreamOption) (StreamClient, error)

StreamFunc is the main reflex stream interface that all implementations should provide. It returns a long-lived StreamClient that will stream events from the source.

func NewMockStream

func NewMockStream(events []*Event, endErr error) StreamFunc

NewMockStream returns a StreamFunc that just returns the list of events provided; stream options have no effect on a mock stream. It is meant purely for testing.
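
A slice-backed stream shows what a StreamClient has to provide. This is a sketch under stated assumptions rather than the NewMockStream implementation: the types are local stand-ins, the stream function omits the StreamOption parameters, and honouring the after cursor is a choice made by this sketch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Event is a trimmed local stand-in for reflex.Event.
type Event struct{ ID string }

// StreamClient mirrors the interface documented above.
type StreamClient interface {
	Recv() (*Event, error)
}

// sliceStream serves events from a slice, then returns endErr forever.
type sliceStream struct {
	events []*Event
	next   int
	endErr error
}

// Recv returns the next event, or endErr once the slice is exhausted.
func (s *sliceStream) Recv() (*Event, error) {
	if s.next >= len(s.events) {
		return nil, s.endErr
	}
	e := s.events[s.next]
	s.next++
	return e, nil
}

// newSliceStream returns a simplified stream function (no stream options)
// that starts after the event whose ID equals after, if there is one.
func newSliceStream(events []*Event, endErr error) func(context.Context, string) (StreamClient, error) {
	return func(_ context.Context, after string) (StreamClient, error) {
		start := 0
		for i, e := range events {
			if after != "" && e.ID == after {
				start = i + 1
			}
		}
		return &sliceStream{events: events[start:], endErr: endErr}, nil
	}
}

// demoStream streams after event 1 and returns the IDs received, plus
// whether the stream ended with the configured end error.
func demoStream() (string, bool) {
	errEnd := errors.New("end of mock stream")
	fn := newSliceStream([]*Event{{ID: "1"}, {ID: "2"}, {ID: "3"}}, errEnd)
	sc, err := fn(context.Background(), "1")
	if err != nil {
		return "", false
	}
	ids := ""
	for {
		e, err := sc.Recv()
		if err != nil {
			return ids, errors.Is(err, errEnd)
		}
		ids += e.ID
	}
}

func main() {
	fmt.Println(demoStream())
}
```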

func WrapStreamPB

func WrapStreamPB(wrap func(context.Context, *reflexpb.StreamRequest) (
	StreamClientPB, error),
) StreamFunc

WrapStreamPB wraps a gRPC client's stream method and returns a StreamFunc.

type StreamOption

type StreamOption func(*StreamOptions)

StreamOption defines a functional option that configures StreamOptions.

func WithStreamFromHead

func WithStreamFromHead() StreamOption

WithStreamFromHead provides an option to stream only new events from the head of events table. Note this overrides the "after" parameter.

func WithStreamLag

func WithStreamLag(d time.Duration) StreamOption

WithStreamLag provides an option to stream events only after they are older than a duration.

func WithStreamToHead

func WithStreamToHead() StreamOption

WithStreamToHead provides an option to return ErrHeadReached as soon as no more events are available. This is useful for testing or back-fills.

type StreamOptions

type StreamOptions struct {
	// Lag defines the duration after an event is created before it becomes
	// eligible for streaming.
	Lag time.Duration

	// StreamFromHead defines that the initial event be retrieved
	// from the head of the events table.
	StreamFromHead bool

	// StreamToHead defines that ErrHeadReached be returned as soon
	// as no more events are available.
	StreamToHead bool
}

StreamOptions provide options sent to the event stream source.

func ResolveOptions

func ResolveOptions(options ...StreamOption) StreamOptions

ResolveOptions converts a list of options to the StreamOptions struct.
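
StreamOption follows the functional options pattern: each option is a function that mutates a StreamOptions value, and resolving applies them in order. A minimal sketch with local copies of the two types; withLag, withToHead and resolve are hypothetical helpers that play the roles described for WithStreamLag, WithStreamToHead and ResolveOptions:

```go
package main

import (
	"fmt"
	"time"
)

// StreamOptions and StreamOption mirror the types documented above.
type StreamOptions struct {
	Lag            time.Duration
	StreamFromHead bool
	StreamToHead   bool
}

type StreamOption func(*StreamOptions)

// withLag returns an option that sets the streaming lag.
func withLag(d time.Duration) StreamOption {
	return func(o *StreamOptions) { o.Lag = d }
}

// withToHead returns an option that stops the stream at the current head.
func withToHead() StreamOption {
	return func(o *StreamOptions) { o.StreamToHead = true }
}

// resolve applies the options in order to a zero-valued StreamOptions.
func resolve(options ...StreamOption) StreamOptions {
	var o StreamOptions
	for _, opt := range options {
		opt(&o)
	}
	return o
}

// demoResolve combines a two second lag with stream-to-head.
func demoResolve() StreamOptions {
	return resolve(withLag(2*time.Second), withToHead())
}

func main() {
	fmt.Printf("%+v\n", demoResolve())
}
```

Since options are applied in order, a later option overrides an earlier one that sets the same field.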

Directories

Path Synopsis
_example
internal/db
Package db implements common DB functions for the reflex example servers.
internal
Package rblob leverages the gocloud.dev/blob package and provides a reflex stream for events persisted in a bucket of strictly ordered append-only log of flat files.
Package rsql provides reflex event stream and cursor table implementations for mysql.
