sql

package
v0.11.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 25, 2021 License: MIT Imports: 15 Imported by: 6

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrConnFailedToAcquire occurs when a connection cannot be acquired within the timelimit
	ErrConnFailedToAcquire = errors.New("goengine: unable to acquire projection lock")
	// ErrProjectionFailedToLock occurs when the projector cannot acquire the projection lock
	ErrProjectionFailedToLock = errors.New("goengine: unable to acquire projection lock")
	// ErrProjectionPreviouslyLocked occurs when a projection was lock was acquired but a previous lock is still in place
	ErrProjectionPreviouslyLocked = errors.New("goengine: unable to lock projection due to a previous lock being in place")
	// ErrNoProjectionRequired occurs when a notification was being acquired but the projection was already at the indicated position
	ErrNoProjectionRequired = errors.New("goengine: no projection acquisition required")
)

Functions

func AcquireConn

func AcquireConn(ctx context.Context, db *sql.DB) (*sql.Conn, error)

AcquireConn will return a new connection

Types

type AggregateProjector

type AggregateProjector struct {
	sync.Mutex
	// contains filtered or unexported fields
}

AggregateProjector is a postgres projector used to execute a projection per aggregate instance against an event stream

func NewAggregateProjector

func NewAggregateProjector(
	db *sql.DB,
	eventLoader EventStreamLoader,
	resolver goengine.MessagePayloadResolver,
	projection goengine.Projection,
	projectorStorage AggregateProjectorStorage,
	projectionErrorHandler ProjectionErrorCallback,
	logger goengine.Logger,
	metrics Metrics,
	retryDelay time.Duration,
) (*AggregateProjector, error)

NewAggregateProjector creates a new projector for a projection

func (*AggregateProjector) Run

Run executes the projection and manages the state of the projection

func (*AggregateProjector) RunAndListen

func (a *AggregateProjector) RunAndListen(ctx context.Context, listener Listener) error

RunAndListen executes the projection and listens to any changes to the event store

type AggregateProjectorStorage

type AggregateProjectorStorage interface {
	ProjectorStorage

	LoadOutOfSync(ctx context.Context, conn Queryer) (*sql.Rows, error)

	PersistFailure(conn Execer, notification *ProjectionNotification) error
}

AggregateProjectorStorage the storage interface that will persist and load the projection state

type EventStreamLoader

type EventStreamLoader func(ctx context.Context, conn *sql.Conn, notification *ProjectionNotification, position int64) (goengine.EventStream, error)

EventStreamLoader loads a event stream based on the provided notification and state

func AggregateProjectionEventStreamLoader

func AggregateProjectionEventStreamLoader(eventStore ReadOnlyEventStore, streamName goengine.StreamName, aggregateTypeName string) EventStreamLoader

AggregateProjectionEventStreamLoader returns a EventStreamLoader for the AggregateProjector

func StreamProjectionEventStreamLoader

func StreamProjectionEventStreamLoader(eventStore ReadOnlyEventStore, streamName goengine.StreamName) EventStreamLoader

StreamProjectionEventStreamLoader returns a EventStreamLoader for the StreamProjector

type Execer

type Execer interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}

Execer a interface used to execute a query on a sql.DB, sql.Conn or sql.Tx

type Listener

type Listener interface {
	// Listen starts listening to the event stream and call the trigger when a event was appended
	Listen(ctx context.Context, trigger ProjectionTrigger) error
}

Listener listens to a event stream and triggers a notification when a event was appended

type MessageFactory

type MessageFactory interface {
	// CreateEventStream reconstructs the message from the provided rows
	CreateEventStream(rows *sql.Rows) (goengine.EventStream, error)
}

MessageFactory reconstruct messages from the database

type Metrics

type Metrics interface {
	// ReceivedNotification sends the metric to keep count of notifications received by goengine
	ReceivedNotification(isNotification bool)
	// QueueNotification is called when a notification is queued.
	// It saves start time for an event on aggregate when it's queued
	QueueNotification(notification *ProjectionNotification)
	// StartNotificationProcessing is called when a notification processing is started
	// It saves start time for an event on aggregate when it's picked to be processed by background processor
	StartNotificationProcessing(notification *ProjectionNotification)
	// FinishNotificationProcessing is called when a notification processing is finished
	// It actually sends metrics calculating duration for which a notification spends in queue and then processed by background processor
	FinishNotificationProcessing(notification *ProjectionNotification, success bool)
}

Metrics a structured metrics interface

var NopMetrics Metrics = &nopMetrics{}

NopMetrics is default Metrics handler in case nil is passed

type NotificationQueue

type NotificationQueue struct {
	// contains filtered or unexported fields
}

NotificationQueue implements a smart queue

func (*NotificationQueue) Empty

func (nq *NotificationQueue) Empty() bool

Empty returns whether the queue is empty

func (*NotificationQueue) Next

Next yields the next notification on the queue or stopped when processor has stopped

func (*NotificationQueue) Open

func (nq *NotificationQueue) Open() func()

Open enables the queue for business

func (*NotificationQueue) Queue

func (nq *NotificationQueue) Queue(ctx context.Context, notification *ProjectionNotification) error

Queue sends a notification to the queue

func (*NotificationQueue) ReQueue

func (nq *NotificationQueue) ReQueue(ctx context.Context, notification *ProjectionNotification) error

ReQueue sends a notification to the queue after setting the ValidAfter property

type NotificationQueuer

type NotificationQueuer interface {
	Open() func()

	Empty() bool
	Next(context.Context) (*ProjectionNotification, bool)

	Queue(context.Context, *ProjectionNotification) error
	ReQueue(context.Context, *ProjectionNotification) error
}

NotificationQueuer describes a smart queue for projection notifications

type PersistenceStrategy

type PersistenceStrategy interface {
	CreateSchema(tableName string) []string
	// EventColumnsNames represent the event store columns selected from the event stream table. Used by PrepareSearch
	EventColumnNames() []string
	// InsertColumnNames represent the ordered event store columns that are used to insert data into the event stream.
	InsertColumnNames() []string
	PrepareData([]goengine.Message) ([]interface{}, error)
	PrepareSearch(metadata.Matcher) ([]byte, []interface{})
	GenerateTableName(streamName goengine.StreamName) (string, error)
}

PersistenceStrategy interface describes strategy of persisting messages in the database

type ProcessHandler

ProcessHandler is a func used to trigger a notification but with the addition of providing a Trigger func so the original notification can trigger other notifications

type ProjectionErrorAction

type ProjectionErrorAction int

ProjectionErrorAction a type containing the action that the projector should take after an error

const (
	// ProjectionFail indicated that the projection failed and cannot be recovered
	// This means that a human as a service is needed
	ProjectionFail ProjectionErrorAction = iota
	// ProjectionRetry indicated that the notification should be retried
	// This can be used in combination with retry mechanism in the ProjectorErrorCallback
	ProjectionRetry ProjectionErrorAction = iota
	// ProjectionIgnoreError indicated that the projection failed but the failure can be ignored
	// This can be used when the ProjectorErrorCallback recovered the system from the error
	ProjectionIgnoreError ProjectionErrorAction = iota
)

type ProjectionErrorCallback

type ProjectionErrorCallback func(err error, notification *ProjectionNotification) ProjectionErrorAction

ProjectionErrorCallback is a function used to determin what action to take based on a failed projection

type ProjectionHandlerError

type ProjectionHandlerError struct {
	// contains filtered or unexported fields
}

ProjectionHandlerError an error indicating that a projection handler failed

func NewProjectionHandlerError

func NewProjectionHandlerError(err error) *ProjectionHandlerError

NewProjectionHandlerError return a ProjectionHandlerError with the cause being the provided error

func (*ProjectionHandlerError) Cause

func (e *ProjectionHandlerError) Cause() error

Cause returns the actual projection errors. This also adds support for github.com/pkg/errors.Cause

func (*ProjectionHandlerError) Error

func (e *ProjectionHandlerError) Error() string

Error return the error message

type ProjectionNotification

type ProjectionNotification struct {
	No          int64     `json:"no"`
	AggregateID string    `json:"aggregate_id"`
	ValidAfter  time.Time `json:"valid_after"`
}

ProjectionNotification is a representation of the data provided by database notify

func (*ProjectionNotification) MarshalEasyJSON

func (p *ProjectionNotification) MarshalEasyJSON(w *jwriter.Writer)

MarshalEasyJSON supports easyjson.Marshaler interface

func (*ProjectionNotification) UnmarshalEasyJSON

func (p *ProjectionNotification) UnmarshalEasyJSON(in *jlexer.Lexer)

UnmarshalEasyJSON supports easyjson.Unmarshaler interface

func (*ProjectionNotification) UnmarshalJSON

func (p *ProjectionNotification) UnmarshalJSON(data []byte) error

UnmarshalJSON supports json.Unmarshaler interface

type ProjectionNotificationProcessor

type ProjectionNotificationProcessor struct {
	// contains filtered or unexported fields
}

ProjectionNotificationProcessor provides a way to Trigger a notification using a set of background processes.

func NewBackgroundProcessor

func NewBackgroundProcessor(
	queueProcessors,
	queueBuffer int,
	logger goengine.Logger,
	metrics Metrics,
	notificationQueue NotificationQueuer,
) (*ProjectionNotificationProcessor, error)

NewBackgroundProcessor create a new projectionNotificationProcessor

func (*ProjectionNotificationProcessor) Execute

Execute starts the background worker and wait for the notification to be executed

func (*ProjectionNotificationProcessor) Queue

Queue puts the notification on the queue to be processed

func (*ProjectionNotificationProcessor) Start

func (b *ProjectionNotificationProcessor) Start(ctx context.Context, handler ProcessHandler) func()

Start starts the background processes that will call the ProcessHandler based on the notification queued by Exec

type ProjectionRawState

type ProjectionRawState struct {
	Position        int64
	ProjectionState []byte
}

ProjectionRawState the raw projection projectionState returned by ProjectorStorage.Acquire

type ProjectionState

type ProjectionState struct {
	Position        int64
	ProjectionState interface{}
}

ProjectionState is a projection projectionState

type ProjectionStateSerialization

type ProjectionStateSerialization interface {
	// Init initializes the state
	Init(ctx context.Context) (interface{}, error)

	// DecodeState reconstitute the projection state based on the provided state data
	DecodeState(data []byte) (interface{}, error)

	// EncodeState encode the given object for storage
	EncodeState(obj interface{}) ([]byte, error)
}

ProjectionStateSerialization is an interface describing how a projection state can be initialized, serialized/encoded anf deserialized/decoded

func GetProjectionStateSerialization

func GetProjectionStateSerialization(projection goengine.Projection) ProjectionStateSerialization

GetProjectionStateSerialization returns a ProjectionStateSerialization based on the provided projection

type ProjectionTrigger

type ProjectionTrigger func(ctx context.Context, notification *ProjectionNotification) error

ProjectionTrigger triggers the notification for processing

type ProjectorStorage

type ProjectorStorage interface {
	// Acquire this function is used to acquire the projection and it's projectionState
	// A projection can only be acquired once and must be released using the returned func
	Acquire(ctx context.Context, conn *sql.Conn, notification *ProjectionNotification) (ProjectorTransaction, int64, error)
}

ProjectorStorage is an interface for handling the projection storage

type ProjectorTransaction

type ProjectorTransaction interface {
	AcquireState(ctx context.Context) (ProjectionState, error)
	CommitState(ProjectionState) error

	Close() error
}

ProjectorTransaction is a transaction type object returned by the ProjectorStorage

type Queryer

type Queryer interface {
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}

Queryer an interface used to query a sql.DB, sql.Conn or sql.Tx

type ReadOnlyEventStore

type ReadOnlyEventStore interface {
	// LoadWithConnection returns a eventstream based on the provided constraints using the provided Queryer
	LoadWithConnection(ctx context.Context, conn Queryer, streamName goengine.StreamName, fromNumber int64, count *uint, metadataMatcher metadata.Matcher) (goengine.EventStream, error)
}

ReadOnlyEventStore an interface describing a readonly event store that supports providing a SQL conn

type StreamProjector

type StreamProjector struct {
	sync.Mutex
	// contains filtered or unexported fields
}

StreamProjector is a postgres projector used to execute a projection against an event stream.

func NewStreamProjector

func NewStreamProjector(
	db *sql.DB,
	eventLoader EventStreamLoader,
	resolver goengine.MessagePayloadResolver,
	projection goengine.Projection,
	projectorStorage StreamProjectorStorage,
	projectionErrorHandler ProjectionErrorCallback,
	logger goengine.Logger,
) (*StreamProjector, error)

NewStreamProjector creates a new projector for a projection

func (*StreamProjector) Run

func (s *StreamProjector) Run(ctx context.Context) error

Run executes the projection and manages the state of the projection

func (*StreamProjector) RunAndListen

func (s *StreamProjector) RunAndListen(ctx context.Context, listener Listener) error

RunAndListen executes the projection and listens to any changes to the event store

type StreamProjectorStorage

type StreamProjectorStorage interface {
	ProjectorStorage

	CreateProjection(ctx context.Context, conn Execer) error
}

StreamProjectorStorage the storage interface that will persist and load the projection state

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL