Documentation ¶
Index ¶
- Variables
- func AcquireConn(ctx context.Context, db *sql.DB) (*sql.Conn, error)
- type AggregateProjector
- type AggregateProjectorStorage
- type EventStreamLoader
- type Execer
- type Listener
- type MessageFactory
- type Metrics
- type NotificationQueue
- func (nq *NotificationQueue) Empty() bool
- func (nq *NotificationQueue) Next(ctx context.Context) (*ProjectionNotification, bool)
- func (nq *NotificationQueue) Open() func()
- func (nq *NotificationQueue) Queue(ctx context.Context, notification *ProjectionNotification) error
- func (nq *NotificationQueue) ReQueue(ctx context.Context, notification *ProjectionNotification) error
- type NotificationQueuer
- type PersistenceStrategy
- type ProcessHandler
- type ProjectionErrorAction
- type ProjectionErrorCallback
- type ProjectionHandlerError
- type ProjectionNotification
- type ProjectionNotificationProcessor
- func (b *ProjectionNotificationProcessor) Execute(ctx context.Context, handler ProcessHandler, ...) error
- func (b *ProjectionNotificationProcessor) Queue(ctx context.Context, notification *ProjectionNotification) error
- func (b *ProjectionNotificationProcessor) Start(ctx context.Context, handler ProcessHandler) func()
- type ProjectionRawState
- type ProjectionState
- type ProjectionStateSerialization
- type ProjectionTrigger
- type ProjectorStorage
- type ProjectorTransaction
- type Queryer
- type ReadOnlyEventStore
- type StreamProjector
- type StreamProjectorStorage
Constants ¶
This section is empty.
Variables ¶
var ( // ErrConnFailedToAcquire occurs when a connection cannot be acquired within the timelimit ErrConnFailedToAcquire = errors.New("goengine: unable to acquire projection lock") // ErrProjectionFailedToLock occurs when the projector cannot acquire the projection lock ErrProjectionFailedToLock = errors.New("goengine: unable to acquire projection lock") // ErrProjectionPreviouslyLocked occurs when a projection was lock was acquired but a previous lock is still in place ErrProjectionPreviouslyLocked = errors.New("goengine: unable to lock projection due to a previous lock being in place") // ErrNoProjectionRequired occurs when a notification was being acquired but the projection was already at the indicated position ErrNoProjectionRequired = errors.New("goengine: no projection acquisition required") )
Functions ¶
Types ¶
type AggregateProjector ¶
AggregateProjector is a postgres projector used to execute a projection per aggregate instance against an event stream
func NewAggregateProjector ¶
func NewAggregateProjector( db *sql.DB, eventLoader EventStreamLoader, resolver goengine.MessagePayloadResolver, projection goengine.Projection, projectorStorage AggregateProjectorStorage, projectionErrorHandler ProjectionErrorCallback, logger goengine.Logger, metrics Metrics, retryDelay time.Duration, ) (*AggregateProjector, error)
NewAggregateProjector creates a new projector for a projection
func (*AggregateProjector) Run ¶
func (a *AggregateProjector) Run(ctx context.Context) error
Run executes the projection and manages the state of the projection
func (*AggregateProjector) RunAndListen ¶
func (a *AggregateProjector) RunAndListen(ctx context.Context, listener Listener) error
RunAndListen executes the projection and listens to any changes to the event store
type AggregateProjectorStorage ¶
type AggregateProjectorStorage interface { ProjectorStorage LoadOutOfSync(ctx context.Context, conn Queryer) (*sql.Rows, error) PersistFailure(conn Execer, notification *ProjectionNotification) error }
AggregateProjectorStorage the storage interface that will persist and load the projection state
type EventStreamLoader ¶
type EventStreamLoader func(ctx context.Context, conn *sql.Conn, notification *ProjectionNotification, position int64) (goengine.EventStream, error)
EventStreamLoader loads an event stream based on the provided notification and state
func AggregateProjectionEventStreamLoader ¶
func AggregateProjectionEventStreamLoader(eventStore ReadOnlyEventStore, streamName goengine.StreamName, aggregateTypeName string) EventStreamLoader
AggregateProjectionEventStreamLoader returns a EventStreamLoader for the AggregateProjector
func StreamProjectionEventStreamLoader ¶
func StreamProjectionEventStreamLoader(eventStore ReadOnlyEventStore, streamName goengine.StreamName) EventStreamLoader
StreamProjectionEventStreamLoader returns a EventStreamLoader for the StreamProjector
type Execer ¶
type Execer interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}
Execer a interface used to execute a query on a sql.DB, sql.Conn or sql.Tx
type Listener ¶
type Listener interface { // Listen starts listening to the event stream and call the trigger when an event was appended Listen(ctx context.Context, trigger ProjectionTrigger) error }
Listener listens to an event stream and triggers a notification when an event was appended
type MessageFactory ¶
type MessageFactory interface { // CreateEventStream reconstructs the message from the provided rows CreateEventStream(rows *sql.Rows) (goengine.EventStream, error) }
MessageFactory reconstruct messages from the database
type Metrics ¶
type Metrics interface { // ReceivedNotification sends the metric to keep count of notifications received by goengine ReceivedNotification(isNotification bool) // QueueNotification is called when a notification is queued. // It saves start time for an event on aggregate when it's queued QueueNotification(notification *ProjectionNotification) // StartNotificationProcessing is called when a notification processing is started // It saves start time for an event on aggregate when it's picked to be processed by background processor StartNotificationProcessing(notification *ProjectionNotification) // FinishNotificationProcessing is called when a notification processing is finished // It actually sends metrics calculating duration for which a notification spends in queue and then processed by background processor FinishNotificationProcessing(notification *ProjectionNotification, success bool) }
Metrics a structured metrics interface
var NopMetrics Metrics = &nopMetrics{}
NopMetrics is default Metrics handler in case nil is passed
type NotificationQueue ¶
type NotificationQueue struct {
// contains filtered or unexported fields
}
NotificationQueue implements a smart queue
func (*NotificationQueue) Empty ¶
func (nq *NotificationQueue) Empty() bool
Empty returns whether the queue is empty
func (*NotificationQueue) Next ¶
func (nq *NotificationQueue) Next(ctx context.Context) (*ProjectionNotification, bool)
Next yields the next notification on the queue or stopped when processor has stopped
func (*NotificationQueue) Open ¶
func (nq *NotificationQueue) Open() func()
Open enables the queue for business
func (*NotificationQueue) Queue ¶
func (nq *NotificationQueue) Queue(ctx context.Context, notification *ProjectionNotification) error
Queue sends a notification to the queue
func (*NotificationQueue) ReQueue ¶
func (nq *NotificationQueue) ReQueue(ctx context.Context, notification *ProjectionNotification) error
ReQueue sends a notification to the queue after setting the ValidAfter property
type NotificationQueuer ¶
type NotificationQueuer interface { Open() func() Empty() bool Next(context.Context) (*ProjectionNotification, bool) Queue(context.Context, *ProjectionNotification) error ReQueue(context.Context, *ProjectionNotification) error }
NotificationQueuer describes a smart queue for projection notifications
type PersistenceStrategy ¶
type PersistenceStrategy interface { CreateSchema(tableName string) []string // EventColumnNames represent the event store columns selected from the event stream table. Used by PrepareSearch EventColumnNames() []string // InsertColumnNames represent the ordered event store columns that are used to insert data into the event stream. InsertColumnNames() []string PrepareData([]goengine.Message) ([]interface{}, error) PrepareSearch(metadata.Matcher) ([]byte, []interface{}) GenerateTableName(streamName goengine.StreamName) (string, error) }
PersistenceStrategy interface describes strategy of persisting messages in the database
type ProcessHandler ¶
type ProcessHandler func(context.Context, *ProjectionNotification, ProjectionTrigger) error
ProcessHandler is a func used to trigger a notification but with the addition of providing a Trigger func so the original notification can trigger other notifications
type ProjectionErrorAction ¶
type ProjectionErrorAction int
ProjectionErrorAction a type containing the action that the projector should take after an error
const ( // ProjectionFail indicated that the projection failed and cannot be recovered // This means that a human as a service is needed ProjectionFail ProjectionErrorAction = iota // ProjectionRetry indicated that the notification should be retried // This can be used in combination with retry mechanism in the ProjectorErrorCallback ProjectionRetry ProjectionErrorAction = iota // ProjectionIgnoreError indicated that the projection failed but the failure can be ignored // This can be used when the ProjectorErrorCallback recovered the system from the error ProjectionIgnoreError ProjectionErrorAction = iota )
type ProjectionErrorCallback ¶
type ProjectionErrorCallback func(err error, notification *ProjectionNotification) ProjectionErrorAction
ProjectionErrorCallback is a function used to determine what action to take based on a failed projection
type ProjectionHandlerError ¶
type ProjectionHandlerError struct {
// contains filtered or unexported fields
}
ProjectionHandlerError an error indicating that a projection handler failed
func NewProjectionHandlerError ¶
func NewProjectionHandlerError(err error) *ProjectionHandlerError
NewProjectionHandlerError return a ProjectionHandlerError with the cause being the provided error
func (*ProjectionHandlerError) Cause ¶
func (e *ProjectionHandlerError) Cause() error
Cause returns the actual projection errors. This also adds support for github.com/pkg/errors.Cause
func (*ProjectionHandlerError) Error ¶
func (e *ProjectionHandlerError) Error() string
Error return the error message
type ProjectionNotification ¶
type ProjectionNotification struct { No int64 `json:"no"` AggregateID string `json:"aggregate_id"` ValidAfter time.Time `json:"valid_after"` }
ProjectionNotification is a representation of the data provided by database notify
func (*ProjectionNotification) MarshalEasyJSON ¶
func (p *ProjectionNotification) MarshalEasyJSON(w *jwriter.Writer)
MarshalEasyJSON supports easyjson.Marshaler interface
func (*ProjectionNotification) UnmarshalEasyJSON ¶
func (p *ProjectionNotification) UnmarshalEasyJSON(in *jlexer.Lexer)
UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (*ProjectionNotification) UnmarshalJSON ¶
func (p *ProjectionNotification) UnmarshalJSON(data []byte) error
UnmarshalJSON supports json.Unmarshaler interface
type ProjectionNotificationProcessor ¶
type ProjectionNotificationProcessor struct {
// contains filtered or unexported fields
}
ProjectionNotificationProcessor provides a way to Trigger a notification using a set of background processes.
func NewBackgroundProcessor ¶
func NewBackgroundProcessor( queueProcessors, queueBuffer int, logger goengine.Logger, metrics Metrics, notificationQueue NotificationQueuer, ) (*ProjectionNotificationProcessor, error)
NewBackgroundProcessor create a new projectionNotificationProcessor
func (*ProjectionNotificationProcessor) Execute ¶
func (b *ProjectionNotificationProcessor) Execute(ctx context.Context, handler ProcessHandler, notification *ProjectionNotification) error
Execute starts the background worker and wait for the notification to be executed
func (*ProjectionNotificationProcessor) Queue ¶
func (b *ProjectionNotificationProcessor) Queue(ctx context.Context, notification *ProjectionNotification) error
Queue puts the notification on the queue to be processed
func (*ProjectionNotificationProcessor) Start ¶
func (b *ProjectionNotificationProcessor) Start(ctx context.Context, handler ProcessHandler) func()
Start starts the background processes that will call the ProcessHandler based on the notification queued by Exec
type ProjectionRawState ¶
ProjectionRawState the raw projection projectionState returned by ProjectorStorage.Acquire
type ProjectionState ¶
type ProjectionState struct { Position int64 ProjectionState interface{} }
ProjectionState is a projection projectionState
type ProjectionStateSerialization ¶
type ProjectionStateSerialization interface { // Init initializes the state Init(ctx context.Context) (interface{}, error) // DecodeState reconstitute the projection state based on the provided state data DecodeState(data []byte) (interface{}, error) // EncodeState encode the given object for storage EncodeState(obj interface{}) ([]byte, error) }
ProjectionStateSerialization is an interface describing how a projection state can be initialized, serialized/encoded anf deserialized/decoded
func GetProjectionStateSerialization ¶
func GetProjectionStateSerialization(projection goengine.Projection) ProjectionStateSerialization
GetProjectionStateSerialization returns a ProjectionStateSerialization based on the provided projection
type ProjectionTrigger ¶
type ProjectionTrigger func(ctx context.Context, notification *ProjectionNotification) error
ProjectionTrigger triggers the notification for processing
type ProjectorStorage ¶
type ProjectorStorage interface { // Acquire this function is used to acquire the projection and its projectionState. // A projection can only be acquired once and must be released using the returned func Acquire(ctx context.Context, conn *sql.Conn, notification *ProjectionNotification) (ProjectorTransaction, int64, error) }
ProjectorStorage is an interface for handling the projection storage
type ProjectorTransaction ¶
type ProjectorTransaction interface { AcquireState(ctx context.Context) (ProjectionState, error) CommitState(ProjectionState) error Close() error }
ProjectorTransaction is a transaction type object returned by the ProjectorStorage
type Queryer ¶
type Queryer interface {
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}
Queryer an interface used to query a sql.DB, sql.Conn or sql.Tx
type ReadOnlyEventStore ¶
type ReadOnlyEventStore interface { // LoadWithConnection returns an eventstream based on the provided constraints using the provided Queryer LoadWithConnection(ctx context.Context, conn Queryer, streamName goengine.StreamName, fromNumber int64, count *uint, metadataMatcher metadata.Matcher) (goengine.EventStream, error) }
ReadOnlyEventStore an interface describing a readonly event store that supports providing a SQL conn
type StreamProjector ¶
StreamProjector is a postgres projector used to execute a projection against an event stream.
func NewStreamProjector ¶
func NewStreamProjector( db *sql.DB, eventLoader EventStreamLoader, resolver goengine.MessagePayloadResolver, projection goengine.Projection, projectorStorage StreamProjectorStorage, projectionErrorHandler ProjectionErrorCallback, logger goengine.Logger, ) (*StreamProjector, error)
NewStreamProjector creates a new projector for a projection
func (*StreamProjector) Run ¶
func (s *StreamProjector) Run(ctx context.Context) error
Run executes the projection and manages the state of the projection
func (*StreamProjector) RunAndListen ¶
func (s *StreamProjector) RunAndListen(ctx context.Context, listener Listener) error
RunAndListen executes the projection and listens to any changes to the event store
type StreamProjectorStorage ¶
type StreamProjectorStorage interface { ProjectorStorage CreateProjection(ctx context.Context, conn Execer) error }
StreamProjectorStorage the storage interface that will persist and load the projection state