Documentation ¶
Overview ¶
Package persistence provides abstractions for data persistence.
Index ¶
- Variables
- type AggregateMetaData
- type AggregateRepository
- type Batch
- type ConflictError
- type DataStore
- type DataStoreSet
- type Event
- type EventRepository
- type EventResult
- type OffsetRepository
- type Operation
- type OperationVisitor
- type Persister
- type ProcessInstance
- type ProcessRepository
- type Provider
- type QueueMessage
- type QueueRepository
- type RemoveProcessInstance
- type RemoveQueueMessage
- type Result
- type SaveAggregateMetaData
- type SaveEvent
- type SaveOffset
- type SaveProcessInstance
- type SaveQueueMessage
- type UnknownMessageError
Constants ¶
This section is empty.
Variables ¶
var ErrDataStoreClosed = errors.New("data store is closed")
ErrDataStoreClosed is returned when performing any persistence operation on a closed data-store.
var ErrDataStoreLocked = errors.New("data store is locked")
ErrDataStoreLocked indicates that an application's data store cannot be opened because it is locked by another engine instance.
Functions ¶
This section is empty.
Types ¶
type AggregateMetaData ¶
type AggregateMetaData struct {
	// HandlerKey is the identity key of the aggregate message handler.
	HandlerKey string

	// InstanceID is the aggregate instance ID.
	InstanceID string

	// Revision is the instance's current version, used to enforce optimistic
	// concurrency control.
	Revision uint64

	// InstanceExists is true if the instance currently exists.
	//
	// When an aggregate instance is destroyed, its meta-data is retained but
	// this flag is set to false.
	InstanceExists bool

	// LastEventID is the ID of the most recent event message recorded against
	// the instance.
	LastEventID string

	// BarrierEventID is the ID of the event message to use as the "barrier
	// message" when loading the instance's historical events.
	//
	// It is updated when the instance is destroyed to avoid loading any events
	// prior to that point.
	BarrierEventID string
}
AggregateMetaData contains meta-data about an aggregate instance.
type AggregateRepository ¶
type AggregateRepository interface {
	// LoadAggregateMetaData loads the meta-data for an aggregate instance.
	//
	// hk is the aggregate handler's identity key, id is the instance ID.
	LoadAggregateMetaData(
		ctx context.Context,
		hk, id string,
	) (AggregateMetaData, error)
}
AggregateRepository is an interface for reading aggregate state.
type Batch ¶
type Batch []Operation
Batch is a set of operations that are committed to the data store atomically using a Persister.
func (Batch) AcceptVisitor ¶
func (b Batch) AcceptVisitor(ctx context.Context, v OperationVisitor) error
AcceptVisitor visits each operation in the batch.
func (Batch) MustValidate ¶
func (b Batch) MustValidate()
MustValidate panics if the batch contains any operations that operate on the same entity.
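One way to picture the "same entity" check is a set of keys, one per operation. The sketch below is an illustration under stated assumptions, not the package's implementation: the real Operation interface has unexported methods that are not shown here, so the entityKey method, the lower-case types and the key format are all hypothetical.

```go
package main

import "fmt"

// operation is a hypothetical stand-in for persistence.Operation.
// entityKey is an assumed method name; the real unexported methods are not
// documented here.
type operation interface {
	entityKey() string
}

// saveOffset and saveProcess are reduced stand-ins for two operation types.
type saveOffset struct{ ApplicationKey string }

type saveProcess struct{ HandlerKey, InstanceID string }

func (op saveOffset) entityKey() string { return "offset/" + op.ApplicationKey }

func (op saveProcess) entityKey() string {
	return "process/" + op.HandlerKey + "/" + op.InstanceID
}

// batch mirrors the shape of persistence.Batch.
type batch []operation

// validate returns an error if two operations in the batch share an entity key.
func (b batch) validate() error {
	seen := make(map[string]struct{}, len(b))
	for _, op := range b {
		k := op.entityKey()
		if _, ok := seen[k]; ok {
			return fmt.Errorf("batch contains multiple operations for entity %q", k)
		}
		seen[k] = struct{}{}
	}
	return nil
}

// mustValidate panics if validate reports a problem.
func (b batch) mustValidate() {
	if err := b.validate(); err != nil {
		panic(err)
	}
}
```

Splitting the check into validate and mustValidate keeps the panic at the edge, which makes the rule easy to test without recovering from a panic.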
type ConflictError ¶
type ConflictError struct {
	// Cause is the operation that caused the conflict.
	Cause Operation
}
ConflictError is an error indicating one or more operations within a batch caused an optimistic concurrency conflict.
func (ConflictError) Error ¶
func (e ConflictError) Error() string
type DataStore ¶
type DataStore interface {
	AggregateRepository
	EventRepository
	OffsetRepository
	ProcessRepository
	QueueRepository
	Persister

	// Close closes the data store.
	//
	// Closing a data-store causes any future calls to Persist() to return
	// ErrDataStoreClosed.
	//
	// The behavior of read operations on a closed data-store is
	// implementation-defined.
	//
	// In general use it is expected that all pending calls to Persist() will
	// have finished before a data-store is closed. Close() may block until any
	// in-flight calls to Persist() return, or may prevent any such calls from
	// succeeding.
	Close() error
}
DataStore is an interface used by the engine to persist and retrieve data for a specific application.
type DataStoreSet ¶
type DataStoreSet struct {
	Provider Provider
	// contains filtered or unexported fields
}
DataStoreSet is a collection of data-stores for several applications.
func (*DataStoreSet) Close ¶
func (s *DataStoreSet) Close() error
Close closes all data-stores in the set.
type Event ¶
type Event struct {
	Offset   uint64
	Envelope *envelopespec.Envelope
}
Event is a persisted event message.
type EventRepository ¶
type EventRepository interface {
	// NextEventOffset returns the next "unused" offset.
	NextEventOffset(ctx context.Context) (uint64, error)

	// LoadEventsByType loads events that match a specific set of message types.
	//
	// f is the set of message types to include in the result. The keys of f are
	// the "portable type name" produced when the events are marshaled.
	//
	// o specifies the (inclusive) lower-bound of the offset range to include in
	// the results.
	LoadEventsByType(
		ctx context.Context,
		f map[string]struct{},
		o uint64,
	) (EventResult, error)

	// LoadEventsBySource loads the events produced by a specific handler.
	//
	// hk is the handler's identity key.
	//
	// id is the instance ID, which must be empty if the handler type does not
	// use instances.
	//
	// m is the ID of a "barrier" message. If supplied, the results are limited
	// to events with higher offsets than the barrier message. If the message
	// cannot be found, UnknownMessageError is returned.
	LoadEventsBySource(
		ctx context.Context,
		hk, id, m string,
	) (EventResult, error)
}
EventRepository is an interface for reading event messages.
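The filter passed to LoadEventsByType is a set keyed by portable type name, combined with an inclusive lower bound on the offset. The sketch below applies the same two rules to an in-memory slice. storedEvent, typeSet and filterByType are hypothetical names, and the real repository works on envelopes rather than this reduced struct.

```go
package main

// storedEvent is a hypothetical, reduced stand-in for a persisted event.
type storedEvent struct {
	Offset       uint64
	PortableName string
}

// typeSet builds the map[string]struct{} set expected as the f argument.
func typeSet(names ...string) map[string]struct{} {
	f := make(map[string]struct{}, len(names))
	for _, n := range names {
		f[n] = struct{}{}
	}
	return f
}

// filterByType keeps events whose type is in f and whose offset is at least o.
func filterByType(events []storedEvent, f map[string]struct{}, o uint64) []storedEvent {
	var out []storedEvent
	for _, ev := range events {
		if ev.Offset < o {
			continue // below the inclusive lower bound
		}
		if _, ok := f[ev.PortableName]; ok {
			out = append(out, ev)
		}
	}
	return out
}
```

Using struct{} as the map value makes the set membership intent explicit and stores no per-key payload.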
type EventResult ¶
type EventResult interface {
	// Next returns the next event in the result.
	//
	// It returns false if there are no more events in the result.
	Next(ctx context.Context) (Event, bool, error)

	// Close closes the cursor.
	Close() error
}
EventResult is the result of a query made using an EventRepository.
EventResult values are not safe for concurrent use.
type OffsetRepository ¶
type OffsetRepository interface {
	// LoadOffset loads the offset associated with a specific application.
	//
	// ak is the application's identity key.
	LoadOffset(ctx context.Context, ak string) (uint64, error)
}
OffsetRepository is an interface for reading event stream offsets associated with remote applications.
type Operation ¶
type Operation interface {
	// AcceptVisitor calls the appropriate visit method on the given visitor.
	AcceptVisitor(context.Context, OperationVisitor) error
	// contains filtered or unexported methods
}
Operation is a persistence operation that can be performed as part of an atomic batch.
type OperationVisitor ¶
type OperationVisitor interface {
	VisitSaveAggregateMetaData(context.Context, SaveAggregateMetaData) error
	VisitSaveEvent(context.Context, SaveEvent) error
	VisitSaveProcessInstance(context.Context, SaveProcessInstance) error
	VisitRemoveProcessInstance(context.Context, RemoveProcessInstance) error
	VisitSaveQueueMessage(context.Context, SaveQueueMessage) error
	VisitRemoveQueueMessage(context.Context, RemoveQueueMessage) error
	VisitSaveOffset(context.Context, SaveOffset) error
}
OperationVisitor visits persistence operations.
type Persister ¶
type Persister interface {
	// Persist commits a batch of operations atomically.
	//
	// If any one of the operations causes an optimistic concurrency conflict
	// the entire batch is aborted and a ConflictError is returned.
	Persist(context.Context, Batch) (Result, error)
}
A Persister is an interface for atomically committing batches of operations to the data store.
type ProcessInstance ¶
type ProcessInstance struct {
	// HandlerKey is the identity key of the process message handler.
	HandlerKey string

	// InstanceID is the process instance ID.
	InstanceID string

	// Revision is the instance's current version, used to enforce optimistic
	// concurrency control.
	Revision uint64

	// Packet contains the binary representation of the process state.
	Packet marshaler.Packet
}
ProcessInstance contains the state of a process instance.
type ProcessRepository ¶
type ProcessRepository interface {
	// LoadProcessInstance loads a process instance.
	//
	// hk is the process handler's identity key, id is the instance ID.
	LoadProcessInstance(
		ctx context.Context,
		hk, id string,
	) (ProcessInstance, error)
}
ProcessRepository is an interface for reading process state.
type Provider ¶
type Provider interface {
	// Open returns a data-store for a specific application.
	//
	// k is the identity key of the application.
	//
	// Data stores are opened for exclusive use. If another engine instance has
	// already opened this application's data-store, ErrDataStoreLocked is
	// returned.
	Open(ctx context.Context, k string) (DataStore, error)
}
Provider is an interface used by the engine to obtain application-specific DataStore instances.
type QueueMessage ¶
type QueueMessage struct {
	Revision      uint64
	FailureCount  uint
	NextAttemptAt time.Time
	Envelope      *envelopespec.Envelope
}
QueueMessage is a message persisted in the message queue.
type QueueRepository ¶
type QueueRepository interface {
	// LoadQueueMessages loads the next n messages from the queue.
	LoadQueueMessages(ctx context.Context, n int) ([]QueueMessage, error)
}
QueueRepository is an interface for reading queued messages.
type RemoveProcessInstance ¶
type RemoveProcessInstance struct {
	// Instance is the instance to remove.
	//
	// Instance.Revision must be the revision of the process instance as
	// currently persisted, otherwise an optimistic concurrency conflict occurs
	// and the entire batch of operations is rejected.
	Instance ProcessInstance
}
RemoveProcessInstance is an Operation that removes a process instance.
The instance's pending timeout messages are removed from the message queue.
func (RemoveProcessInstance) AcceptVisitor ¶
func (op RemoveProcessInstance) AcceptVisitor(ctx context.Context, v OperationVisitor) error
AcceptVisitor calls v.VisitRemoveProcessInstance().
type RemoveQueueMessage ¶
type RemoveQueueMessage struct {
	// Message is the message to remove from the queue.
	//
	// The message's revision field must be the revision of the message as
	// currently persisted, otherwise an optimistic concurrency conflict occurs
	// and the entire batch of operations is rejected.
	Message QueueMessage
}
RemoveQueueMessage is an Operation that removes a message from the queue.
func (RemoveQueueMessage) AcceptVisitor ¶
func (op RemoveQueueMessage) AcceptVisitor(ctx context.Context, v OperationVisitor) error
AcceptVisitor calls v.VisitRemoveQueueMessage().
type Result ¶
type Result struct {
	// EventOffsets contains the offsets of the events saved within the batch,
	// keyed by their message ID.
	EventOffsets map[string]uint64
}
Result is the result of a successfully persisted batch of operations.
type SaveAggregateMetaData ¶
type SaveAggregateMetaData struct {
	// MetaData is the meta-data to persist.
	//
	// MetaData.Revision must be the revision of the aggregate instance as
	// currently persisted, otherwise an optimistic concurrency conflict occurs
	// and the entire batch of operations is rejected.
	MetaData AggregateMetaData
}
SaveAggregateMetaData is an Operation that creates or updates meta-data about an aggregate instance.
func (SaveAggregateMetaData) AcceptVisitor ¶
func (op SaveAggregateMetaData) AcceptVisitor(ctx context.Context, v OperationVisitor) error
AcceptVisitor calls v.VisitSaveAggregateMetaData().
type SaveEvent ¶
type SaveEvent struct {
	// Envelope is the envelope containing the event to persist.
	Envelope *envelopespec.Envelope
}
SaveEvent is an Operation that persists an event message.
func (SaveEvent) AcceptVisitor ¶
func (op SaveEvent) AcceptVisitor(ctx context.Context, v OperationVisitor) error
AcceptVisitor calls v.VisitSaveEvent().
type SaveOffset ¶
type SaveOffset struct {
	// ApplicationKey is the identity key of the source application.
	ApplicationKey string

	// CurrentOffset must be the offset currently associated with this
	// application, otherwise an optimistic concurrency conflict occurs and the
	// entire batch of operations is rejected.
	CurrentOffset uint64

	// NextOffset is the next offset to consume from this application.
	NextOffset uint64
}
SaveOffset is an Operation that persists the offset of the next event to be consumed from a specific application.
func (SaveOffset) AcceptVisitor ¶
func (op SaveOffset) AcceptVisitor(ctx context.Context, v OperationVisitor) error
AcceptVisitor calls v.VisitSaveOffset().
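The CurrentOffset and NextOffset pair amounts to a compare-and-swap: the write succeeds only if the stored value still equals what the caller last read. The sketch below shows that rule against an in-memory map. offsetStore and apply are hypothetical, treating an unknown application as offset zero is an assumption, and the map is not safe for concurrent use.

```go
package main

import "fmt"

// SaveOffset mirrors the documented struct.
type SaveOffset struct {
	ApplicationKey string
	CurrentOffset  uint64
	NextOffset     uint64
}

// offsetStore is a hypothetical in-memory store keyed by application key.
type offsetStore struct {
	offsets map[string]uint64
}

func newOffsetStore() *offsetStore {
	return &offsetStore{offsets: map[string]uint64{}}
}

// apply stores op.NextOffset only if the stored offset equals
// op.CurrentOffset. An application with no stored offset is treated as
// being at offset zero, which is an assumption of this sketch.
func (s *offsetStore) apply(op SaveOffset) error {
	stored := s.offsets[op.ApplicationKey]
	if stored != op.CurrentOffset {
		return fmt.Errorf(
			"conflict for %q: stored offset is %d, operation expected %d",
			op.ApplicationKey, stored, op.CurrentOffset,
		)
	}
	s.offsets[op.ApplicationKey] = op.NextOffset
	return nil
}
```

If two consumers read the same offset and both try to advance it, only the first apply succeeds; the second sees a stored value that no longer matches its CurrentOffset.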
type SaveProcessInstance ¶
type SaveProcessInstance struct {
	// Instance is the instance to persist.
	//
	// Instance.Revision must be the revision of the process instance as
	// currently persisted, otherwise an optimistic concurrency conflict occurs
	// and the entire batch of operations is rejected.
	Instance ProcessInstance
}
SaveProcessInstance is an Operation that creates or updates a process instance.
func (SaveProcessInstance) AcceptVisitor ¶
func (op SaveProcessInstance) AcceptVisitor(ctx context.Context, v OperationVisitor) error
AcceptVisitor calls v.VisitSaveProcessInstance().
type SaveQueueMessage ¶
type SaveQueueMessage struct {
	// Message is the message to persist to the queue.
	//
	// The message's revision field must be the revision of the message as
	// currently persisted, otherwise an optimistic concurrency conflict occurs
	// and the entire batch of operations is rejected.
	Message QueueMessage
}
SaveQueueMessage is an Operation that saves a message to the queue, or updates a message that is already on the queue.
func (SaveQueueMessage) AcceptVisitor ¶
func (op SaveQueueMessage) AcceptVisitor(ctx context.Context, v OperationVisitor) error
AcceptVisitor calls v.VisitSaveQueueMessage().
type UnknownMessageError ¶
type UnknownMessageError struct {
	MessageID string
}
UnknownMessageError is the error returned when a message referenced by its ID does not exist.
func (UnknownMessageError) Error ¶
func (e UnknownMessageError) Error() string
Error returns a string representation of UnknownMessageError.
Directories ¶
Path | Synopsis |
---|---|
boltpersistence | Package boltpersistence is a BoltDB (bbolt) persistence provider. |
internal | |
providertest | Package providertest contains a common test suite for persistence.Provider implementations. |
memorypersistence | Package memorypersistence is an in-memory persistence provider. |
sqlpersistence | Package sqlpersistence is an SQL-based persistence provider with drivers for several popular SQL database systems. |
mysql | Package mysql is a MySQL driver for the SQL persistence provider. |
postgres | Package postgres is a PostgreSQL driver for the SQL persistence provider. |
sqlite | Package sqlite is an SQLite v3 driver for the SQL persistence provider. |