Documentation ¶
Overview ¶
Package eventstore contains the necessary abstractions and types to represents and interact with an Event Store.
Index ¶
- Variables
- func ContextMetadata(ctx context.Context, metadata eventually.Metadata) context.Context
- func MapEvents(events []Event) []eventually.Event
- type Appender
- type ConflictError
- type ContextAware
- type Event
- type EventStream
- type Fused
- type Select
- type SequenceNumberGetter
- type Store
- type StoreSuite
- type Streamer
- type VersionCheck
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var SelectFromBeginning = Select{From: 0}
SelectFromBeginning is a Select operator instance that will select the entirety of the desired Event Stream.
Functions ¶
func ContextMetadata ¶
ContextMetadata extends a provided context.Context instance with the provided eventually.Metadata map. When using the ContextAware eventstore.Appender extension, the provided Metadata map will be applied to all events passed during an Append() method call.
Types ¶
type Appender ¶
type Appender interface { // Append inserts the specified Domain Events into the Event Stream specified // by the current instance, returning the new version of the Event Stream. // // A version can be specified to enable an Optimistic Concurrency check // on append, by using the expected version of the Event Stream prior // to appending the new Events. // // Alternatively, VersionCheckAny can be used if no Optimistic Concurrency check // should be carried out. // // An instance of ConflictError will be returned if the optimistic locking // version check fails against the current version of the Event Stream. Append(ctx context.Context, id stream.ID, versionCheck VersionCheck, events ...eventually.Event) (int64, error) }
Appender is an Event Store trait that provides the ability to append new Domain Events to an Event Stream.
Implementations of this interface should be synchronous, returning from the call only when either the Events have been correctly saved on the Event Store, or if an error occurred.
type ConflictError ¶
ConflictError is an error returned by an Event Store when appending some events using an expected Event Stream version that does not match the current state of the Event Stream.
func (ConflictError) Error ¶
func (err ConflictError) Error() string
type ContextAware ¶
type ContextAware struct {
Appender
}
ContextAware is an eventstore.Appender extension that uses the eventually.Metadata map provided in the context.Context used during an Append() call to extend the metadata of each event being appended.
Use ContextMetadata in conjunction with this type to make use of this feature.
func NewContextAware ¶
func NewContextAware(appender Appender) ContextAware
NewContextAware extends the provided eventstore.Appender instance with a ContextAware version.
func (ContextAware) Append ¶
func (ca ContextAware) Append( ctx context.Context, id stream.ID, versionCheck VersionCheck, events ...eventually.Event, ) (int64, error)
Append applies the eventually.Metadata map from the context.Context to all the events specified, if such map has been provided using eventstore.ContextMetadata.
The extended events are then appended to the Event Store using the base eventstore.Appender instance provided during initialization.
type Event ¶
type Event struct { eventually.Event // Stream is the identifier of the Event Stream this Event // belongs to. Stream stream.ID // Version is the version of this Event, used for Optimistic Locking // and detecting or avoiding concurrency conflict scenarios. Version int64 // Sequence Number is the index of the Event in the Event Store, // used for ordered streaming. SequenceNumber int64 }
Event represents an Event Message that has been persisted into the Event Store.
func StreamToSlice ¶
func StreamToSlice(ctx context.Context, f func(context.Context, EventStream) error) ([]Event, error)
StreamToSlice synchronously exhausts an EventStream to an Event slice, and returns an error if the EventStream origin, passed here as a closure, fails with an error.
type Fused ¶
Fused is a convenience type to fuse multiple Event Store interfaces where you might need to extend the functionality of the Store only partially.
E.g. You might want to extend the functionality of the Append() method, but keep the Streamer methods the same.
If the extension wrapper does not support the Streamer interface, you cannot use the extension wrapper instance as an Event Store in certain cases (e.g. the Aggregate Repository).
Using a Fused instance you can fuse both instances together, and use it with the rest of the library ecosystem.
Example ¶
package main import ( "github.com/get-eventually/go-eventually/aggregate" "github.com/get-eventually/go-eventually/eventstore" "github.com/get-eventually/go-eventually/eventstore/inmemory" "github.com/get-eventually/go-eventually/extension/correlation" ) func main() { eventStore := inmemory.NewEventStore() correlatedEventStore := correlation.EventStoreWrapper{ Appender: eventStore, Generator: func() string { return "test-id" }, } aggregate.NewRepository(aggregate.Type{}, eventstore.Fused{ Appender: correlatedEventStore, Streamer: eventStore, }) }
Output:
type Select ¶
type Select struct {
From int64
}
Select is used to effectively select a slice of the Event Stream, by referencing to either the Event Stream version (in case of Stream) or Event Store sequence number (for StreamByType and StreamAll).
type SequenceNumberGetter ¶
type SequenceNumberGetter interface { // LatestSequenceNumber should return the latest, global Sequence Number // registered by the Event Store. LatestSequenceNumber(ctx context.Context) (int64, error) }
SequenceNumberGetter is an Event Store trait that is used to interact with sequence numbers (e.g. queries).
type StoreSuite ¶
StoreSuite is a full testing suite for an eventstore.Store instance.
func NewStoreSuite ¶
func NewStoreSuite(factory func() Store) *StoreSuite
NewStoreSuite creates a new Event Store testing suite using the provided eventstore.Store type.
func (*StoreSuite) SetupTest ¶
func (ss *StoreSuite) SetupTest()
SetupTest creates a new, fresh Event Store instance for each test in the suite.
func (*StoreSuite) TestStream ¶
func (ss *StoreSuite) TestStream()
TestStream tests all the eventstore.Streamer functions using the provided Event Store instance.
type Streamer ¶
type Streamer interface { // Stream opens one or more Event Streams as specified by the provided Event Stream target. Stream(ctx context.Context, es EventStream, target stream.Target, selectt Select) error }
Streamer is the Event Store trait that deals with opening Event Streams from a certain version, or sequence number. Streamer should return all the committed Events (after the `from` bound) in the Event Store at the time of invocation, instead of opening a long-running subscription channel.
Implementations of this interface should be synchronous, returning from the call only when all the Events have been streamed into the provided Event Stream, and close the channel.
Event Stream channel is provided in input as inversion of dependency, in order to allow to callers to choose the desired buffering on the channel, matching the caller concurrency properties.
type VersionCheck ¶
type VersionCheck int64
VersionCheck is used to specify the expected version of an Event Stream before Append-ing new Events. Useful for optimistic locking.
Use VersionCheckAny if you're not interested in optimistic locking and conflict resolution.
const VersionCheckAny VersionCheck = iota - 1
VersionCheckAny can be used when calling Append() to disregard any check on the Event Stream version, when you just want to insert some events in the Event Store.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package inmemory provides an in-memory implementation of an Event Store, and some other utilities that use an in-memory backend.
|
Package inmemory provides an in-memory implementation of an Event Store, and some other utilities that use an in-memory backend. |
postgres
module
|
|
Package stream exports types for dealing with Event Streams, such as Event Stream ids and Event Streams targets.
|
Package stream exports types for dealing with Event Streams, such as Event Stream ids and Event Streams targets. |