Documentation ¶
Overview ¶
Package eventsourcing contains the core implementation of an event-sourcing framework written in Go.
Index ¶
- Constants
- func DefaultTestStoreFilter() error
- func NewConcurrencyFault(aggregateKey string, eventSequence int64) error
- func NewDomainFault(aggregateKey string, faultCode string) error
- func NormalizeTypeName(name string) string
- func Retry(limit int, body func() error) error
- type Adapter
- type AdapterPositional
- type AdapterWithEvents
- type Aggregate
- type AggregateBase
- func (agg *AggregateBase) ApplyEvent(event Event)
- func (agg *AggregateBase) AutomaticWireup(subject interface{})
- func (agg *AggregateBase) Commit() error
- func (agg *AggregateBase) DefineReplayMethod(eventType EventType, replay func(Event))
- func (agg *AggregateBase) GetKey() string
- func (agg *AggregateBase) Handle(command Command) error
- func (agg *AggregateBase) Initialize(key string, registry EventRegistry, store EventStore, state StateFetchFunc)
- func (agg *AggregateBase) Refresh() error
- func (agg *AggregateBase) Run(callback func() error) error
- func (agg *AggregateBase) SequenceNumber() int64
- func (agg *AggregateBase) State() interface{}
- type CloseMiddleware
- type Command
- type CommandHandleFunc
- type CommandHandler
- type CommandRegistry
- type CommandType
- type CommitMiddleware
- type ConcurrencyFault
- type DomainFault
- type Event
- type EventConsumer
- type EventDefinition
- type EventDetector
- type EventFactory
- type EventHandler
- type EventHandlerBase
- type EventPublisher
- type EventRegistry
- type EventStore
- type EventStoreWithMiddleware
- type EventType
- type MiddlewareFactory
- type NextHandler
- type PublishedEvent
- type RefreshMiddleware
- type StateFetchFunc
- type StoreLoaderAdapter
- type StoreWriterAdapter
- type TestStore
- type TestStoreFilter
- type TestStoreHistoryItem
Constants ¶
const (
	// HandleMethodPrefix is a prefix used for command handler methods
	HandleMethodPrefix = "Handle"
	// ReplayMethodPrefix is the prefix used for event replay methods
	ReplayMethodPrefix = "Replay"
)
const (
// EventHandleMethodPrefix is the prefix for handler auto-wireup methods
EventHandleMethodPrefix = "Handle"
)
Variables ¶
This section is empty.
Functions ¶
func DefaultTestStoreFilter ¶
func DefaultTestStoreFilter() error
DefaultTestStoreFilter is an error filter that doesn't fail, which is the default.
func NewConcurrencyFault ¶
func NewConcurrencyFault(aggregateKey string, eventSequence int64) error
NewConcurrencyFault creates an error from the specified aggregate key and event sequence.
func NewDomainFault ¶
func NewDomainFault(aggregateKey string, faultCode string) error
NewDomainFault creates an error from the specified aggregate key and fault code.
func NormalizeTypeName ¶
func NormalizeTypeName(name string) string
NormalizeTypeName normalizes the name of an event type by removing the Go-supplied package name.
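As a rough illustration of what removing the package name can look like, the sketch below strips everything up to the last dot of a reflected type name. The helper stripPackage and the AccountOpened type are hypothetical; the package's own NormalizeTypeName may differ in detail (for example in how it treats pointers or casing).

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// AccountOpened is a hypothetical event type used only for this sketch.
type AccountOpened struct{}

// stripPackage is a hypothetical stand-in, not the library function. It drops
// any leading "*" and everything up to and including the last "." in a name.
func stripPackage(name string) string {
	name = strings.TrimLeft(name, "*")
	if idx := strings.LastIndex(name, "."); idx >= 0 {
		return name[idx+1:]
	}
	return name
}

func main() {
	// reflect reports type names qualified by their package.
	full := reflect.TypeOf(AccountOpened{}).String()
	fmt.Println(full, "->", stripPackage(full))
}
```

Stripping the qualifier keeps stored event names stable if a type later moves between packages, at the cost of requiring unique type names within a domain.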
Types ¶
type Adapter ¶
type Adapter interface {
	// GetKey fetches the aggregate key
	GetKey() string
}
Adapter is an interface that exposes state information about the aggregate being operated on.
type AdapterPositional ¶
type AdapterPositional interface {
	Adapter
	// SequenceNumber fetches the current sequence number
	SequenceNumber() int64
}
AdapterPositional is an adapter that can report where an aggregate is in its history.
type AdapterWithEvents ¶
type AdapterWithEvents interface {
	AdapterPositional
	// GetEventRegistry gets the event registry to use
	GetEventRegistry() EventRegistry
	// IsDirty returns true if the aggregate has uncommitted state.
	IsDirty() bool
}
AdapterWithEvents is a variant of Adapter that is required where components need to reason about event types in an abstract way (e.g. Event Reader adapters).
type Aggregate ¶
type Aggregate interface {
	// Initialize sets up the initial state of the aggregate.
	Initialize(key string, registry EventRegistry, store EventStore)
	// ApplyEvent applies an event that has occurred to the aggregate
	// instance to mutate its state. Events that are not recognized are
	// ignored, and all event application is fail-safe.
	ApplyEvent(Event)
	// Commit commits the state of the aggregate, persisting any
	// new events to the store.
	Commit() error
	// Refresh recovers the state of the aggregate from the underlying
	// store.
	Refresh() error
	// GetState gets the state of an aggregate
	GetState() interface{}
}
Aggregate is the interface for an event-sourced aggregate root. All common behaviours of an aggregate expected by the runtime are defined here.
type AggregateBase ¶
type AggregateBase struct {
// contains filtered or unexported fields
}
AggregateBase is an implementation of Aggregate that provides a lot of shared boilerplate code.
func (*AggregateBase) ApplyEvent ¶
func (agg *AggregateBase) ApplyEvent(event Event)
ApplyEvent applies an event that has occurred to the aggregate base instance to mutate its state. Events that are not recognized are ignored, and all event application should be fail-safe.
func (*AggregateBase) AutomaticWireup ¶
func (agg *AggregateBase) AutomaticWireup(subject interface{})
AutomaticWireup performs automatic detection of event replay methods, looking for applyEventName methods on the current type.
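Prefix-based detection of this kind is typically done with reflection. The sketch below lists the methods on a value whose names start with a given prefix. It assumes the "Replay" prefix from ReplayMethodPrefix in Constants is what drives detection; the account type and the replayTargets helper are hypothetical, and the actual wire-up logic in AggregateBase is not shown in this documentation.

```go
package main

import (
	"fmt"
	"reflect"
	"sort"
	"strings"
)

// replayPrefix mirrors the documented ReplayMethodPrefix constant.
const replayPrefix = "Replay"

// account is a hypothetical aggregate used only for this sketch.
type account struct{ balance int }

func (a *account) ReplayDeposited(amount int) { a.balance += amount }
func (a *account) ReplayWithdrawn(amount int) { a.balance -= amount }
func (a *account) Balance() int               { return a.balance }

// replayTargets returns the event names implied by the subject's exported
// methods that carry the replay prefix (the prefix itself is stripped).
func replayTargets(subject interface{}) []string {
	t := reflect.TypeOf(subject)
	var names []string
	for i := 0; i < t.NumMethod(); i++ {
		name := t.Method(i).Name
		if strings.HasPrefix(name, replayPrefix) && len(name) > len(replayPrefix) {
			names = append(names, strings.TrimPrefix(name, replayPrefix))
		}
	}
	sort.Strings(names)
	return names
}

func main() {
	// A pointer is passed because the methods use pointer receivers.
	fmt.Println(replayTargets(&account{}))
}
```

Passing the aggregate by value instead of by pointer would hide pointer-receiver methods from reflection, which is a common reason for methods not being discovered.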
func (*AggregateBase) Commit ¶
func (agg *AggregateBase) Commit() error
Commit commits the state of the aggregate, marking all events as having been accepted by a backing store. This does not itself cause persistence to occur.
func (*AggregateBase) DefineReplayMethod ¶
func (agg *AggregateBase) DefineReplayMethod(eventType EventType, replay func(Event))
DefineReplayMethod defines a method that replays events of a given event type.
func (*AggregateBase) GetKey ¶
func (agg *AggregateBase) GetKey() string
GetKey fetches the key of this aggregate instance.
func (*AggregateBase) Handle ¶
func (agg *AggregateBase) Handle(command Command) error
Handle processes a command against the aggregate.
func (*AggregateBase) Initialize ¶
func (agg *AggregateBase) Initialize(key string, registry EventRegistry, store EventStore, state StateFetchFunc)
Initialize sets the initial state of the AggregateBase and ensures we are in a suitable situation to start reasoning about the events that will happen.
func (*AggregateBase) Refresh ¶
func (agg *AggregateBase) Refresh() error
Refresh reloads the current state of the aggregate from the underlying store.
func (*AggregateBase) Run ¶
func (agg *AggregateBase) Run(callback func() error) error
Run performs a load, mutate, commit cycle on an aggregate
func (*AggregateBase) SequenceNumber ¶
func (agg *AggregateBase) SequenceNumber() int64
SequenceNumber gets the current sequence number of the aggregate.
func (*AggregateBase) State ¶
func (agg *AggregateBase) State() interface{}
State gets the current state of an aggregate for any process that is interested
type CloseMiddleware ¶
type CloseMiddleware func() error
CloseMiddleware shuts down a middleware, if present
type Command ¶
type Command interface { }
Command is an interface that describes the common attributes of commands.
type CommandHandleFunc ¶
CommandHandleFunc is a function that handles a command directly.
type CommandHandler ¶
type CommandHandler interface {
	// Handle a command, returning the resultant events (or an error)
	Handle(command Command) ([]Event, error)
}
CommandHandler is an interface that describes the operations available on an instance that can follow the command-handler pattern.
type CommandRegistry ¶
type CommandRegistry interface {
	// CreateCommand creates an instance of a command
	CreateCommand(CommandType) Command
	// Domain this registry contains commands for
	Domain() string
	// GetCommandType determines the CommandType of a Command
	GetCommandType(interface{}) (CommandType, bool)
	// RegisterCommand registers a command
	RegisterCommand(Command) CommandType
}
CommandRegistry defines a per-aggregate type registry of the commands that are known to a specific aggregate.
func NewStandardCommandRegistry ¶
func NewStandardCommandRegistry(domain string) CommandRegistry
NewStandardCommandRegistry creates an instance of a plain CommandRegistry that stores information about command types in an internal map. The string parameter is the name of the domain/bounded-context in which our commands live.
type CommandType ¶
type CommandType string
CommandType is a string alias that represents a command's type and can be used as a map key.
type CommitMiddleware ¶
type CommitMiddleware func(writer StoreWriterAdapter, next NextHandler) error
CommitMiddleware is middleware that handles commit operations, allowing for intercepting or other operations.
type ConcurrencyFault ¶
type ConcurrencyFault struct {
	AggregateKey  string `json:"aggregate_key"`
	EventSequence int64  `json:"event_sequence"`
}
ConcurrencyFault represents an error that occurred when updating an aggregate: specifically, we tried to insert events at an index that is already defined. The client likely needs to re-run the command to resolve the conflict, as another writer committed first.
func IsConcurrencyFault ¶
func IsConcurrencyFault(err error) (bool, *ConcurrencyFault)
IsConcurrencyFault determines if the specified error is a ConcurrencyFault
func (ConcurrencyFault) Error ¶
func (curr ConcurrencyFault) Error() string
Error returns the ConcurrencyFault formatted as a string to satisfy the error interface.
type DomainFault ¶
type DomainFault struct {
	// AggregateKey that had the fault
	AggregateKey string `json:"aggregate_key"`
	// FaultCode for the domain fault
	FaultCode string `json:"fault_code"`
}
DomainFault represents an error raised during a command, indicating that the command is invalid within the domain. This can be any application-relevant incident (e.g. attempting to overdraw a bank account).
func IsDomainFault ¶
func IsDomainFault(err error) (bool, *DomainFault)
IsDomainFault determines if the specified error is a DomainFault
func (DomainFault) Error ¶
func (curr DomainFault) Error() string
Error returns the DomainFault formatted as a string to satisfy the error interface.
type Event ¶
type Event interface { }
Event is an interface that describes common attributes of events.
type EventConsumer ¶
type EventConsumer interface {
	// Start consuming
	Start() error
	// Stop consuming
	Stop() error
	// AddHandler adds a handler to the set of handlers for this consumer.
	AddHandler(handler EventHandler)
}
EventConsumer is an interface that describes a consumer that allows multiple handlers to be attached, allowing events to be multiplexed to the handlers without needing to consume the same stream multiple times.
type EventDefinition ¶
type EventDefinition struct {
	// Detector is a function that determines if a specific runtime event
	// matches this event revision's type.
	Detector EventDetector
	// Factory method to create an instance of the event for this specific version.
	Factory EventFactory
}
EventDefinition defines the structure of an event.
type EventDetector ¶
type EventDetector func(interface{}) bool
An EventDetector is a function that determines if the streamed event is an instance of the specified event revision. True indicates a match, false indicates a mismatch.
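A detector for a particular revision can be as small as a type switch. The sketch below uses a local copy of the EventDetector signature. The depositedV1 and depositedV2 types are hypothetical revisions of the same event; how the library pairs detectors with factories in an EventDefinition is not shown here.

```go
package main

import "fmt"

// EventDetector matches the documented signature.
type EventDetector func(interface{}) bool

// depositedV1 and depositedV2 are hypothetical revisions of one event.
type depositedV1 struct{ Amount int }

type depositedV2 struct {
	Amount   int
	Currency string
}

// detectDepositedV2 reports whether the runtime value is the second
// revision, accepting both value and pointer forms.
func detectDepositedV2(event interface{}) bool {
	switch event.(type) {
	case depositedV2, *depositedV2:
		return true
	}
	return false
}

func main() {
	var detect EventDetector = detectDepositedV2
	fmt.Println(detect(depositedV2{Amount: 10, Currency: "EUR"}))
	fmt.Println(detect(depositedV1{Amount: 10}))
}
```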
type EventFactory ¶
type EventFactory func() Event
EventFactory is a function that creates an event instance of a given type, ready to work with.
type EventHandler ¶
type EventHandler interface {
	// Handle the specified event and apply any consequences.
	Handle(event PublishedEvent) error
}
EventHandler is an interface that handles events that have been delivered from a publishing source
type EventHandlerBase ¶
type EventHandlerBase struct {
// contains filtered or unexported fields
}
EventHandlerBase is a common base type for an event handler that takes events from a publishing source and handles them.
func (*EventHandlerBase) AutomaticWireup ¶
func (base *EventHandlerBase) AutomaticWireup(subject interface{})
AutomaticWireup performs automatic detection of consumer methods
func (*EventHandlerBase) Handle ¶
func (base *EventHandlerBase) Handle(event PublishedEvent) error
Handle processes an event
func (*EventHandlerBase) Initialize ¶
func (base *EventHandlerBase) Initialize(registry EventRegistry, self interface{})
Initialize the EventHandlerBase
type EventPublisher ¶
type EventPublisher interface {
	// Publish an event. When the method returns the event should be committed/guaranteed
	// to have been distributed.
	Publish(key string, sequence int64, event Event) error
}
EventPublisher is an interface that describes an event publisher sink that allows events to be distributed to other components.
type EventRegistry ¶
type EventRegistry interface {
	// CreateEvent creates an instance of an event
	CreateEvent(EventType) Event
	// Domain this registry contains events for
	Domain() string
	// GetEventType determines the EventType of an event
	GetEventType(interface{}) (EventType, bool)
	// RegisterEvent registers an event
	RegisterEvent(Event) EventType
}
EventRegistry defines a per-aggregate type registry of the events that are known to a specific aggregate.
func NewStandardEventRegistry ¶
func NewStandardEventRegistry(domain string) EventRegistry
NewStandardEventRegistry creates an instance of a plain EventRegistry that stores information about event types in an internal map. The string parameter is the name of the domain/bounded-context in which our events live.
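To make the idea of a map-backed registry concrete, here is a minimal sketch that records event types by name and creates fresh instances on request. The mapRegistry type and the Deposited event are hypothetical, and the standard registry's internal layout and naming rules are assumptions rather than documented behaviour.

```go
package main

import (
	"fmt"
	"reflect"
)

// EventType matches the documented string alias.
type EventType string

// Deposited is a hypothetical event used only for this sketch.
type Deposited struct{ Amount int }

// mapRegistry is a hypothetical registry keyed by the unqualified type name.
type mapRegistry struct {
	domain string
	types  map[EventType]reflect.Type
}

func newMapRegistry(domain string) *mapRegistry {
	return &mapRegistry{domain: domain, types: map[EventType]reflect.Type{}}
}

// typeOf returns the struct type behind a value or pointer, or nil.
func typeOf(event interface{}) reflect.Type {
	t := reflect.TypeOf(event)
	for t != nil && t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return t
}

// RegisterEvent records the event's type and returns its EventType.
func (r *mapRegistry) RegisterEvent(event interface{}) EventType {
	t := typeOf(event)
	if t == nil {
		return ""
	}
	eventType := EventType(t.Name())
	r.types[eventType] = t
	return eventType
}

// GetEventType reports the EventType of a value if it has been registered.
func (r *mapRegistry) GetEventType(event interface{}) (EventType, bool) {
	t := typeOf(event)
	if t == nil {
		return "", false
	}
	eventType := EventType(t.Name())
	_, ok := r.types[eventType]
	return eventType, ok
}

// CreateEvent returns a pointer to a new zero-valued instance, or nil when
// the type is unknown.
func (r *mapRegistry) CreateEvent(eventType EventType) interface{} {
	t, ok := r.types[eventType]
	if !ok {
		return nil
	}
	return reflect.New(t).Interface()
}

func main() {
	registry := newMapRegistry("bank")
	eventType := registry.RegisterEvent(Deposited{})
	fmt.Printf("%s %T\n", eventType, registry.CreateEvent(eventType))
}
```

Returning a pointer from CreateEvent lets a store decode persisted data directly into the new instance before it is replayed.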
type EventStore ¶
type EventStore interface {
	// CommitEvents stores any events for the specified aggregate that are uncommitted
	// at this point in time.
	CommitEvents(writer StoreWriterAdapter) error
	// Refresh refreshes the state of the specified aggregate from the underlying store
	Refresh(reader StoreLoaderAdapter) error
	// Close shuts down the storage driver.
	Close() error
}
EventStore defines the behaviours of a store that can load/save event streams for an aggregate.
type EventStoreWithMiddleware ¶
type EventStoreWithMiddleware interface {
	EventStore
	// Use a middleware
	Use(commit CommitMiddleware, refresh RefreshMiddleware, cleanup func() error)
	// HandleCleanup registers a cleanup/shutdown handler
	HandleCleanup(cleanup func() error)
	// HandleCommit registers middleware to handle commits
	HandleCommit(middleware CommitMiddleware)
	// HandleRefresh registers middleware to handle refreshes
	HandleRefresh(middleware RefreshMiddleware)
}
EventStoreWithMiddleware is an interface that describes an event-store with middleware support.
func NewMiddlewareWrapper ¶
func NewMiddlewareWrapper(inner EventStore) EventStoreWithMiddleware
NewMiddlewareWrapper is an event-store wrapper that provides the ability to insert middleware into the pipeline.
type EventType ¶
type EventType string
EventType is a string alias that represents the type of an event.
type MiddlewareFactory ¶
type MiddlewareFactory func() (CommitMiddleware, RefreshMiddleware, CloseMiddleware)
MiddlewareFactory is a middleware callback that provides all three middleware functions (commit, refresh and close).
type NextHandler ¶
type NextHandler func() error
NextHandler is a callback function that runs the next handler in a middleware chain.
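The way a NextHandler threads through a chain is easiest to see in a small composition function. The sketch below wraps a final store operation in a list of middleware so that the first one registered runs outermost. The keyed interface, the aggregateKey type and the chain helper are hypothetical simplifications; the real CommitMiddleware receives a StoreWriterAdapter, and the order in which the wrapper applies middleware is an assumption.

```go
package main

import "fmt"

// NextHandler matches the documented signature.
type NextHandler func() error

// keyed is a hypothetical, cut-down stand-in for StoreWriterAdapter.
type keyed interface{ GetKey() string }

// commitMiddleware mirrors the shape of CommitMiddleware with the simpler
// adapter type.
type commitMiddleware func(writer keyed, next NextHandler) error

// aggregateKey is a hypothetical adapter that only knows its own key.
type aggregateKey string

func (k aggregateKey) GetKey() string { return string(k) }

// chain wraps final in the given middleware, first entry outermost.
func chain(writer keyed, final NextHandler, middleware ...commitMiddleware) error {
	next := final
	for i := len(middleware) - 1; i >= 0; i-- {
		current, inner := middleware[i], next
		next = func() error { return current(writer, inner) }
	}
	return next()
}

// logging returns a middleware that records when it is entered and left.
func logging(name string, log *[]string) commitMiddleware {
	return func(writer keyed, next NextHandler) error {
		*log = append(*log, name+":before:"+writer.GetKey())
		err := next()
		*log = append(*log, name+":after")
		return err
	}
}

func main() {
	var log []string
	store := func() error { log = append(log, "store"); return nil }
	err := chain(aggregateKey("acct-1"), store, logging("outer", &log), logging("inner", &log))
	fmt.Println(log, err)
}
```

A middleware that returns without calling next short-circuits the chain, which is how interception (for example, rejecting a commit) can be implemented.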
type PublishedEvent ¶
type PublishedEvent struct {
	Domain   string      `json:"domain"`     // Domain the event belongs to
	Type     EventType   `json:"event_type"` // EventType
	Key      string      `json:"key"`        // Event key
	Sequence int64       `json:"sequence"`   // Sequence number
	Data     interface{} `json:"data"`       // Data
}
PublishedEvent is a record of an event that's published to a queue or sink
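The struct tags determine how such a record looks when encoded as JSON. The sketch below encodes a local copy of the struct with the standard encoding/json package. The encode helper and the sample values are hypothetical, and whether a given publisher actually uses JSON on the wire is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EventType matches the documented string alias.
type EventType string

// PublishedEvent is a local copy of the documented struct and its tags.
type PublishedEvent struct {
	Domain   string      `json:"domain"`
	Type     EventType   `json:"event_type"`
	Key      string      `json:"key"`
	Sequence int64       `json:"sequence"`
	Data     interface{} `json:"data"`
}

// encode is a hypothetical helper that renders the record as a JSON string.
func encode(event PublishedEvent) (string, error) {
	raw, err := json.Marshal(event)
	if err != nil {
		return "", err
	}
	return string(raw), nil
}

func main() {
	out, err := encode(PublishedEvent{
		Domain:   "bank",
		Type:     "Deposited",
		Key:      "acct-1",
		Sequence: 3,
		Data:     map[string]int{"amount": 25},
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
	// prints {"domain":"bank","event_type":"Deposited","key":"acct-1","sequence":3,"data":{"amount":25}}
}
```

Because Data is an interface{}, a consumer decoding this payload gets a generic map unless it first uses the event_type field to choose a concrete type.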
type RefreshMiddleware ¶
type RefreshMiddleware func(reader StoreLoaderAdapter, next NextHandler) error
RefreshMiddleware is middleware that handles refresh/load operations, allowing for interception or other operations
type StateFetchFunc ¶
type StateFetchFunc func() interface{}
StateFetchFunc is a function that returns the state-value.
type StoreLoaderAdapter ¶
type StoreLoaderAdapter interface {
	AdapterWithEvents
	// ReplayEvent applies an event that has already been persisted
	ReplayEvent(event Event)
	// RestoreSnapshot applies a snapshot state, if available
	RestoreSnapshot(sequence int64, state interface{}) error
}
StoreLoaderAdapter represents an adapter that can be used to modify an aggregate in response to a load/refresh operation
type StoreWriterAdapter ¶
type StoreWriterAdapter interface {
	AdapterWithEvents
	// GetUncommittedEvents gets the committed sequence number, and any
	// events that have been added since the last commit. This can be
	// used by a backing store to write data.
	GetUncommittedEvents() (int64, []Event)
	// GetState returns the state of the aggregate at its current
	// sequence/position, which may be required when snapshotting.
	GetState() interface{}
}
StoreWriterAdapter is an adapter interface that defines the inputs an aggregate gives to a store for writing/committing new events.
type TestStore ¶
type TestStore struct {
	History     []TestStoreHistoryItem
	ErrorFilter func() error
	// contains filtered or unexported fields
}
TestStore is our mock-store type
func NewTestStore ¶
func NewTestStore() *TestStore
NewTestStore creates a new EventStore instance that uses the Mock provider. This allows us to track and observe actions, and can be used to validate the correctness of other implementations or actions in unit tests.
func (*TestStore) CommitEvents ¶
func (store *TestStore) CommitEvents(writer StoreWriterAdapter) error
CommitEvents stores the events
func (*TestStore) Refresh ¶
func (store *TestStore) Refresh(reader StoreLoaderAdapter) error
Refresh recovers the state of an aggregate from a known state.
type TestStoreFilter ¶
type TestStoreFilter func() error
TestStoreFilter is a callback that decides whether or not to fail the next operation with an error
func TestStoreFailureFilter ¶
func TestStoreFailureFilter(err error) TestStoreFilter
TestStoreFailureFilter is an error filter that is used to ensure a store fails in the prescribed way.
type TestStoreHistoryItem ¶
type TestStoreHistoryItem struct {
	Key    string      // Key of the aggregate
	Offset int64       // Offset that this set of events is from
	Events []Event     // Events
	State  interface{} // State instance
}
TestStoreHistoryItem is the set of history items recorded by an event store.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
distribution | |
examples | |
stores | |
key-value | Package keyvalue contains a base implementation of much of the common logic required for a scale-out tablestore implementation of an event store. |
utilities | |