eventstore

package
v0.1.0-alpha.4

This package is not in the latest version of its module.
Published: Nov 21, 2021 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package eventstore contains the abstractions and types needed to represent and interact with an Event Store.

Variables

var SelectFromBeginning = Select{From: 0}

SelectFromBeginning is a Select operator instance that will select the entirety of the desired Event Stream.

Functions

func ContextMetadata

func ContextMetadata(ctx context.Context, metadata eventually.Metadata) context.Context

ContextMetadata extends a provided context.Context instance with the provided eventually.Metadata map. When using the ContextAware eventstore.Appender extension, the provided Metadata map will be applied to all events passed during an Append() method call.

func MapEvents

func MapEvents(events []Event) []eventually.Event

MapEvents maps a list of persisted eventstore.Event to only their internal Event data.

Types

type Appender

type Appender interface {
	// Append inserts the specified Domain Events into the Event Stream specified
	// by the current instance, returning the new version of the Event Stream.
	//
	// A version can be specified to enable an Optimistic Concurrency check
	// on append, by using the expected version of the Event Stream prior
	// to appending the new Events.
	//
	// Alternatively, VersionCheckAny can be used if no Optimistic Concurrency check
	// should be carried out.
	//
	// An instance of ConflictError will be returned if the optimistic locking
	// version check fails against the current version of the Event Stream.
	Append(ctx context.Context, id stream.ID, versionCheck VersionCheck, events ...eventually.Event) (int64, error)
}

Appender is an Event Store trait that provides the ability to append new Domain Events to an Event Stream.

Implementations of this interface should be synchronous, returning from the call only when either the Events have been correctly saved on the Event Store, or if an error occurred.

type ConflictError

type ConflictError struct {
	Expected int64
	Actual   int64
}

ConflictError is an error returned by an Event Store when appending some events using an expected Event Stream version that does not match the current state of the Event Stream.

func (ConflictError) Error

func (err ConflictError) Error() string
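Since ConflictError is a plain struct with a value receiver, callers can detect it with errors.As even when it has been wrapped. The sketch below redeclares the type locally so it runs on its own; the Error() message format and the isConflict helper are assumptions made for this example, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// ConflictError is a local stand-in mirroring eventstore.ConflictError.
type ConflictError struct {
	Expected int64
	Actual   int64
}

// Error uses an assumed message format; the real one may differ.
func (err ConflictError) Error() string {
	return fmt.Sprintf("conflict: expected version %d, actual %d", err.Expected, err.Actual)
}

// isConflict is a hypothetical helper that reports whether err is,
// or wraps, a ConflictError, and returns the extracted value.
func isConflict(err error) (ConflictError, bool) {
	var conflict ConflictError
	ok := errors.As(err, &conflict)
	return conflict, ok
}

func main() {
	err := fmt.Errorf("append failed: %w", ConflictError{Expected: 3, Actual: 5})

	if c, ok := isConflict(err); ok {
		// A typical reaction is to reload the stream and retry the command.
		fmt.Printf("reload and retry: expected %d, actual %d\n", c.Expected, c.Actual)
	}
}
```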

type ContextAware

type ContextAware struct {
	Appender
}

ContextAware is an eventstore.Appender extension that uses the eventually.Metadata map provided in the context.Context used during an Append() call to extend the metadata of each event being appended.

Use ContextMetadata in conjunction with this type to make use of this feature.

func NewContextAware

func NewContextAware(appender Appender) ContextAware

NewContextAware extends the provided eventstore.Appender instance with a ContextAware version.

func (ContextAware) Append

func (ca ContextAware) Append(
	ctx context.Context,
	id stream.ID,
	versionCheck VersionCheck,
	events ...eventually.Event,
) (int64, error)

Append applies the eventually.Metadata map from the context.Context to all the events specified, if such map has been provided using eventstore.ContextMetadata.

The extended events are then appended to the Event Store using the base eventstore.Appender instance provided during initialization.

type Event

type Event struct {
	eventually.Event

	// Stream is the identifier of the Event Stream this Event
	// belongs to.
	Stream stream.ID

	// Version is the version of this Event, used for Optimistic Locking
	// and detecting or avoiding concurrency conflict scenarios.
	Version int64

	// Sequence Number is the index of the Event in the Event Store,
	// used for ordered streaming.
	SequenceNumber int64
}

Event represents an Event Message that has been persisted into the Event Store.

func StreamToSlice

func StreamToSlice(ctx context.Context, f func(context.Context, EventStream) error) ([]Event, error)

StreamToSlice synchronously exhausts an EventStream to an Event slice, and returns an error if the EventStream origin, passed here as a closure, fails with an error.

type EventStream

type EventStream chan<- Event

EventStream is a stream of persisted Events.

type Fused

type Fused struct {
	Appender
	Streamer
}

Fused is a convenience type to fuse multiple Event Store interfaces where you might need to extend the functionality of the Store only partially.

E.g. You might want to extend the functionality of the Append() method, but keep the Streamer methods the same.

If the extension wrapper does not support the Streamer interface, you cannot use the extension wrapper instance as an Event Store in certain cases (e.g. the Aggregate Repository).

Using a Fused instance you can fuse both instances together, and use it with the rest of the library ecosystem.

Example
package main

import (
	"github.com/get-eventually/go-eventually/aggregate"
	"github.com/get-eventually/go-eventually/eventstore"
	"github.com/get-eventually/go-eventually/eventstore/inmemory"
	"github.com/get-eventually/go-eventually/extension/correlation"
)

func main() {
	eventStore := inmemory.NewEventStore()
	correlatedEventStore := correlation.EventStoreWrapper{
		Appender:  eventStore,
		Generator: func() string { return "test-id" },
	}

	aggregate.NewRepository(aggregate.Type{}, eventstore.Fused{
		Appender: correlatedEventStore,
		Streamer: eventStore,
	})
}

type Select

type Select struct {
	From int64
}

Select is used to select a slice of the Event Stream, by referring to either the Event Stream version (in case of Stream) or the Event Store sequence number (for StreamByType and StreamAll).

type SequenceNumberGetter

type SequenceNumberGetter interface {
	// LatestSequenceNumber should return the latest, global Sequence Number
	// registered by the Event Store.
	LatestSequenceNumber(ctx context.Context) (int64, error)
}

SequenceNumberGetter is an Event Store trait that is used to interact with sequence numbers (e.g. queries).

type Store

type Store interface {
	Appender
	Streamer
}

Store represents an Event Store.

type StoreSuite

type StoreSuite struct {
	suite.Suite
	// contains filtered or unexported fields
}

StoreSuite is a full testing suite for an eventstore.Store instance.

func NewStoreSuite

func NewStoreSuite(factory func() Store) *StoreSuite

NewStoreSuite creates a new Event Store testing suite using the provided eventstore.Store type.

func (*StoreSuite) SetupTest

func (ss *StoreSuite) SetupTest()

SetupTest creates a new, fresh Event Store instance for each test in the suite.

func (*StoreSuite) TestStream

func (ss *StoreSuite) TestStream()

TestStream tests all the eventstore.Streamer functions using the provided Event Store instance.

type Streamer

type Streamer interface {
	// Stream opens one or more Event Streams as specified by the provided Event Stream target.
	Stream(ctx context.Context, es EventStream, target stream.Target, selectt Select) error
}

Streamer is the Event Store trait that deals with opening Event Streams from a certain version or sequence number. Streamer should return all the Events committed to the Event Store at the time of invocation (after the `from` bound), instead of opening a long-running subscription channel.

Implementations of this interface should be synchronous, returning from the call only when all the Events have been streamed into the provided Event Stream, and should close the channel before returning.

The Event Stream channel is provided as an input, as a form of dependency inversion, so that callers can choose the buffering of the channel that best matches their own concurrency properties.

type VersionCheck

type VersionCheck int64

VersionCheck is used to specify the expected version of an Event Stream before appending new Events. Useful for optimistic locking.

Use VersionCheckAny if you're not interested in optimistic locking and conflict resolution.

const VersionCheckAny VersionCheck = iota - 1

VersionCheckAny can be used when calling Append() to disregard any check on the Event Stream version, when you just want to insert some events in the Event Store.

Directories

Path      Synopsis
inmemory  Package inmemory provides an in-memory implementation of an Event Store, and some other utilities that use an in-memory backend.
postgres  module
stream    Package stream exports types for dealing with Event Streams, such as Event Stream ids and Event Stream targets.
