Versions in this module Expand all Collapse all v2 v2.1.0 Sep 15, 2022 v2.0.0 Sep 14, 2021 Changes in this version + var ErrConnFailedToAcquire = errors.New("goengine: unable to acquire projection lock") + var ErrNoProjectionRequired = errors.New("goengine: no projection acquisition required") + var ErrProjectionFailedToLock = errors.New("goengine: unable to acquire projection lock") + var ErrProjectionPreviouslyLocked = errors.New("goengine: unable to lock projection due to a previous lock being in place") + func AcquireConn(ctx context.Context, db *sql.DB) (*sql.Conn, error) + type AggregateProjector struct + func NewAggregateProjector(db *sql.DB, eventLoader EventStreamLoader, ...) (*AggregateProjector, error) + func (a *AggregateProjector) Run(ctx context.Context) error + func (a *AggregateProjector) RunAndListen(ctx context.Context, listener Listener) error + type AggregateProjectorStorage interface + LoadOutOfSync func(ctx context.Context, conn Queryer) (*sql.Rows, error) + PersistFailure func(conn Execer, notification *ProjectionNotification) error + type EventStreamLoader func(ctx context.Context, conn *sql.Conn, notification *ProjectionNotification, ...) (goengine.EventStream, error) + func AggregateProjectionEventStreamLoader(eventStore ReadOnlyEventStore, streamName goengine.StreamName, ...) EventStreamLoader + func StreamProjectionEventStreamLoader(eventStore ReadOnlyEventStore, streamName goengine.StreamName) EventStreamLoader + type Execer interface + ExecContext func(ctx context.Context, query string, args ...interface{}) (sql.Result, error) + type Listener interface + Listen func(ctx context.Context, trigger ProjectionTrigger) error + type MessageFactory interface + CreateEventStream func(rows *sql.Rows) (goengine.EventStream, error) + type Metrics interface + FinishNotificationProcessing func(notification *ProjectionNotification, success bool) + QueueNotification func(notification *ProjectionNotification) + ReceivedNotification func(isNotification bool) + StartNotificationProcessing func(notification *ProjectionNotification) + var NopMetrics Metrics = &nopMetrics{} + type NotificationQueue struct + func (nq *NotificationQueue) Empty() bool + func (nq *NotificationQueue) Next(ctx context.Context) (*ProjectionNotification, bool) + func (nq *NotificationQueue) Open() func() + func (nq *NotificationQueue) Queue(ctx context.Context, notification *ProjectionNotification) error + func (nq *NotificationQueue) ReQueue(ctx context.Context, notification *ProjectionNotification) error + type NotificationQueuer interface + Empty func() bool + Next func(context.Context) (*ProjectionNotification, bool) + Open func() func() + Queue func(context.Context, *ProjectionNotification) error + ReQueue func(context.Context, *ProjectionNotification) error + type PersistenceStrategy interface + CreateSchema func(tableName string) []string + EventColumnNames func() []string + GenerateTableName func(streamName goengine.StreamName) (string, error) + InsertColumnNames func() []string + PrepareData func([]goengine.Message) ([]interface{}, error) + PrepareSearch func(metadata.Matcher) ([]byte, []interface{}) + type ProcessHandler func(context.Context, *ProjectionNotification, ProjectionTrigger) error + type ProjectionErrorAction int + const ProjectionFail + const ProjectionIgnoreError + const ProjectionRetry + type ProjectionErrorCallback func(err error, notification *ProjectionNotification) ProjectionErrorAction + type ProjectionHandlerError struct + func NewProjectionHandlerError(err error) *ProjectionHandlerError + func (e *ProjectionHandlerError) Cause() error + func (e *ProjectionHandlerError) Error() string + type ProjectionNotification struct + AggregateID string + No int64 + ValidAfter time.Time + func (p *ProjectionNotification) MarshalEasyJSON(w *jwriter.Writer) + func (p *ProjectionNotification) UnmarshalEasyJSON(in *jlexer.Lexer) + func (p *ProjectionNotification) UnmarshalJSON(data []byte) error + type ProjectionNotificationProcessor struct + func NewBackgroundProcessor(queueProcessors, queueBuffer int, logger goengine.Logger, metrics Metrics, ...) (*ProjectionNotificationProcessor, error) + func (b *ProjectionNotificationProcessor) Execute(ctx context.Context, handler ProcessHandler, ...) error + func (b *ProjectionNotificationProcessor) Queue(ctx context.Context, notification *ProjectionNotification) error + func (b *ProjectionNotificationProcessor) Start(ctx context.Context, handler ProcessHandler) func() + type ProjectionRawState struct + Position int64 + ProjectionState []byte + type ProjectionState struct + Position int64 + ProjectionState interface{} + type ProjectionStateSerialization interface + DecodeState func(data []byte) (interface{}, error) + EncodeState func(obj interface{}) ([]byte, error) + Init func(ctx context.Context) (interface{}, error) + func GetProjectionStateSerialization(projection goengine.Projection) ProjectionStateSerialization + type ProjectionTrigger func(ctx context.Context, notification *ProjectionNotification) error + type ProjectorStorage interface + Acquire func(ctx context.Context, conn *sql.Conn, notification *ProjectionNotification) (ProjectorTransaction, int64, error) + type ProjectorTransaction interface + AcquireState func(ctx context.Context) (ProjectionState, error) + Close func() error + CommitState func(ProjectionState) error + type Queryer interface + QueryContext func(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) + type ReadOnlyEventStore interface + LoadWithConnection func(ctx context.Context, conn Queryer, streamName goengine.StreamName, ...) (goengine.EventStream, error) + type StreamProjector struct + func NewStreamProjector(db *sql.DB, eventLoader EventStreamLoader, ...) (*StreamProjector, error) + func (s *StreamProjector) Run(ctx context.Context) error + func (s *StreamProjector) RunAndListen(ctx context.Context, listener Listener) error + type StreamProjectorStorage interface + CreateProjection func(ctx context.Context, conn Execer) error Other modules containing this package github.com/hellofresh/goengine