Documentation ¶
Overview ¶
Package eventhorizon is a CQRS/ES toolkit.
Index ¶
- Constants
- Variables
- func MarshalContext(ctx context.Context) map[string]interface{}
- func MinVersionFromContext(ctx context.Context) (int, bool)
- func NamespaceFromContext(ctx context.Context) string
- func NewContextWithMinVersion(ctx context.Context, minVersion int) context.Context
- func NewContextWithMinVersionWait(ctx context.Context, minVersion int) (c context.Context, cancel func())
- func NewContextWithNamespace(ctx context.Context, namespace string) context.Context
- func RegisterAggregate(factory func(UUID) Aggregate)
- func RegisterCommand(factory func() Command)
- func RegisterContextMarshaler(f ContextMarshalFunc)
- func RegisterContextUnmarshaler(f ContextUnmarshalFunc)
- func RegisterEventData(eventType EventType, factory func() EventData)
- func UnmarshalContext(vals map[string]interface{}) context.Context
- func UnregisterCommand(commandType CommandType)
- func UnregisterEventData(eventType EventType)
- type Aggregate
- type AggregateBase
- func (a *AggregateBase) AggregateID() UUID
- func (a *AggregateBase) AggregateType() AggregateType
- func (a *AggregateBase) ClearUncommittedEvents()
- func (a *AggregateBase) IncrementVersion()
- func (a *AggregateBase) StoreEvent(eventType EventType, data EventData) Event
- func (a *AggregateBase) UncommittedEvents() []Event
- func (a *AggregateBase) Version() int
- type AggregateCommandHandler
- type AggregateType
- type ApplyEventError
- type Command
- type CommandBus
- type CommandFieldError
- type CommandHandler
- type CommandType
- type ContextMarshalFunc
- type ContextUnmarshalFunc
- type Event
- type EventBus
- type EventData
- type EventHandler
- type EventHandlerType
- type EventHandlingStrategy
- type EventObserver
- type EventPublisher
- type EventSourcingRepository
- type EventStore
- type EventStoreError
- type EventStoreMaintainer
- type EventType
- type ReadRepo
- type ReadWriteRepo
- type RepoError
- type Repository
- type UUID
- type Versionable
- type WriteRepo
Constants ¶
const DefaultMinVersionDeadline = 10 * time.Second
DefaultMinVersionDeadline is the deadline to use when creating a min version context that waits.
const DefaultNamespace = "default"
DefaultNamespace is the namespace to use if not set in the context.
Variables ¶
var ErrAggregateAlreadySet = errors.New("aggregate is already set")
ErrAggregateAlreadySet is when an aggregate is already registered for a command.
var ErrAggregateNotFound = errors.New("no aggregate for command")
ErrAggregateNotFound is when no aggregate can be found.
var ErrAggregateNotRegistered = errors.New("aggregate not registered")
ErrAggregateNotRegistered is when no aggregate factory was registered.
var ErrCommandNotRegistered = errors.New("command not registered")
ErrCommandNotRegistered is when no command factory was registered.
var ErrCouldNotSaveModel = errors.New("could not save model")
ErrCouldNotSaveModel is when a model could not be saved.
var ErrEventDataNotRegistered = errors.New("event data not registered")
ErrEventDataNotRegistered is when no event data factory was registered.
var ErrHandlerAlreadySet = errors.New("handler is already set")
ErrHandlerAlreadySet is when a handler is already registered for a command.
var ErrHandlerNotFound = errors.New("no handlers for command")
ErrHandlerNotFound is when no handler can be found.
var ErrIncorrectEventVersion = errors.New("mismatching event version")
ErrIncorrectEventVersion is when an event is for an other version of the aggregate.
var ErrIncorrectModelVersion = errors.New("incorrect model version")
ErrIncorrectModelVersion is when a model has an incorrect version.
var ErrInvalidEvent = errors.New("invalid event")
ErrInvalidEvent is when an event does not implement the Event interface.
var ErrInvalidEventBus = errors.New("invalid event bus")
ErrInvalidEventBus is when a dispatcher is created with a nil event bus.
var ErrInvalidEventStore = errors.New("invalid event store")
ErrInvalidEventStore is when a dispatcher is created with a nil event store.
var ErrMismatchedEventType = errors.New("mismatched event type and aggregate type")
ErrMismatchedEventType occurs when loaded events from ID does not match aggregate type.
var ErrModelHasNoVersion = errors.New("model has no version")
ErrModelHasNoVersion is when a model has no version number.
var ErrModelNotFound = errors.New("could not find model")
ErrModelNotFound is when a model could not be found.
var ErrNilRepository = errors.New("repository is nil")
ErrNilRepository is when a dispatcher is created with a nil repository.
var ErrNoEventsToAppend = errors.New("no events to append")
ErrNoEventsToAppend is when no events are available to append.
Functions ¶
func MarshalContext ¶
MarshalContext marshals a context into a map.
func MinVersionFromContext ¶
MinVersionFromContext returns the min version from the context.
func NamespaceFromContext ¶
NamespaceFromContext returns the namespace from the context, or the default namespace.
func NewContextWithMinVersion ¶
NewContextWithMinVersion returns the context with min version set.
func NewContextWithMinVersionWait ¶
func NewContextWithMinVersionWait(ctx context.Context, minVersion int) (c context.Context, cancel func())
NewContextWithMinVersionWait returns the context with min version and a default deadline set.
func NewContextWithNamespace ¶
NewContextWithNamespace sets the namespace to use in the context. The namespace is used to determine which database.
func RegisterAggregate ¶
RegisterAggregate registers an aggregate factory for a type. The factory is used to create concrete aggregate types when loading from the database.
An example would be:
RegisterAggregate(func(id UUID) Aggregate { return &MyAggregate{id} })
func RegisterCommand ¶
func RegisterCommand(factory func() Command)
RegisterCommand registers an command factory for a type. The factory is used to create concrete command types.
An example would be:
RegisterCommand(func() Command { return &MyCommand{} })
func RegisterContextMarshaler ¶
func RegisterContextMarshaler(f ContextMarshalFunc)
RegisterContextMarshaler registers a marshaler function used by MarshalContext.
func RegisterContextUnmarshaler ¶
func RegisterContextUnmarshaler(f ContextUnmarshalFunc)
RegisterContextUnmarshaler registers a marshaler function used by UnmarshalContext.
func RegisterEventData ¶
RegisterEventData registers an event data factory for a type. The factory is used to create concrete event data structs when loading from the database.
An example would be:
RegisterEventData(MyEventType, func() Event { return &MyEventData{} })
func UnmarshalContext ¶
UnmarshalContext unmarshals a context from a map.
func UnregisterCommand ¶
func UnregisterCommand(commandType CommandType)
UnregisterCommand removes the registration of the command factory for a type. This is mainly useful in mainenance situations where the command type needs to be switched at runtime.
func UnregisterEventData ¶
func UnregisterEventData(eventType EventType)
UnregisterEventData removes the registration of the event data factory for a type. This is mainly useful in mainenance situations where the event data needs to be switched in a migrations.
Types ¶
type Aggregate ¶
type Aggregate interface { // AggregateType returns the type name of the aggregate. // AggregateType() string AggregateType() AggregateType // AggregateID returns the id of the aggregate. AggregateID() UUID // Version returns the version of the aggregate. Version() int // Increment version increments the version of the aggregate. It should be // called after an event has been successfully applied. IncrementVersion() // CommandHandler is used to handle commands. CommandHandler // StoreEvent creates and stores a new event as uncommitted for the aggregate. StoreEvent(EventType, EventData) Event // UncommittedEvents gets all uncommitted events to commit to the store. UncommittedEvents() []Event // ClearUncommittedEvents clears all uncommitted events after committing to // the store. ClearUncommittedEvents() // ApplyEvent applies an event on the aggregate by setting its values. // If there are no errors the version shoudl be incremented by calling // IncrementVersion. ApplyEvent(context.Context, Event) error }
Aggregate is an interface representing a versioned data entity created from events. It receives commands and generates evens that are stored.
The aggregate is created/loaded and saved by the Repository inside the Dispatcher. A domain specific aggregate can either implement the full interface, or more commonly embed *AggregateBase to take care of the common methods.
func CreateAggregate ¶
func CreateAggregate(aggregateType AggregateType, id UUID) (Aggregate, error)
CreateAggregate creates an aggregate of a type with an ID using the factory registered with RegisterAggregate.
type AggregateBase ¶
type AggregateBase struct {
// contains filtered or unexported fields
}
AggregateBase is a CQRS aggregate base to embed in domain specific aggregates.
A typical aggregate example:
type UserAggregate struct { *eventhorizon.AggregateBase name string }
Using a new function to create aggregates and setting up the aggregate base is recommended:
func NewUserAggregate(id eh.UUID) *InvitationAggregate { return &UserAggregate{ AggregateBase: eh.NewAggregateBase(UserAggregateType, id), } }
The aggregate must also be registered, in this case:
func init() { eh.RegisterAggregate(func(id eh.UUID) eh.Aggregate { return NewUserAggregate(id) }) }
The aggregate must return an error if the event can not be applied, or nil to signal success which will increment the version.
func (a *Aggregate) ApplyEvent(event Event) error { switch event.EventType() { case AddUserEvent: // Apply the event data to the aggregate. } }
See the examples folder for a complete use case.
func NewAggregateBase ¶
func NewAggregateBase(aggregateType AggregateType, id UUID) *AggregateBase
NewAggregateBase creates an aggregate.
func (*AggregateBase) AggregateID ¶
func (a *AggregateBase) AggregateID() UUID
AggregateID implements the AggregateID method of the Aggregate interface.
func (*AggregateBase) AggregateType ¶
func (a *AggregateBase) AggregateType() AggregateType
AggregateType implements the AggregateType method of the Aggregate interface.
func (*AggregateBase) ClearUncommittedEvents ¶
func (a *AggregateBase) ClearUncommittedEvents()
ClearUncommittedEvents implements the ClearUncommittedEvents method of the Aggregate interface.
func (*AggregateBase) IncrementVersion ¶
func (a *AggregateBase) IncrementVersion()
IncrementVersion increments the version of the aggregate and should be called after an event has been applied successfully in ApplyEvent.
func (*AggregateBase) StoreEvent ¶
func (a *AggregateBase) StoreEvent(eventType EventType, data EventData) Event
StoreEvent implements the StoreEvent method of the Aggregate interface.
func (*AggregateBase) UncommittedEvents ¶
func (a *AggregateBase) UncommittedEvents() []Event
UncommittedEvents implements the UncommittedEvents method of the Aggregate interface.
func (*AggregateBase) Version ¶
func (a *AggregateBase) Version() int
Version implements the Version method of the Aggregate interface.
type AggregateCommandHandler ¶
type AggregateCommandHandler struct {
// contains filtered or unexported fields
}
AggregateCommandHandler dispatches commands to registered aggregates.
The dispatch process is as follows: 1. The handler receives a command 2. An aggregate is created or rebuilt from previous events by the repository 3. The aggregate's command handler is called 4. The aggregate stores events in response to the command 5. The new events are stored in the event store by the repository 6. The events are published to the event bus when stored by the event store
func NewAggregateCommandHandler ¶
func NewAggregateCommandHandler(repository Repository) (*AggregateCommandHandler, error)
NewAggregateCommandHandler creates a new AggregateCommandHandler.
func (*AggregateCommandHandler) HandleCommand ¶
func (h *AggregateCommandHandler) HandleCommand(ctx context.Context, cmd Command) error
HandleCommand handles a command with the registered aggregate. Returns ErrAggregateNotFound if no aggregate could be found.
func (*AggregateCommandHandler) SetAggregate ¶
func (h *AggregateCommandHandler) SetAggregate(aggregateType AggregateType, cmdType CommandType) error
SetAggregate sets an aggregate as handler for a command.
type ApplyEventError ¶
type ApplyEventError struct { // Event is the event that caused the error. Event Event // Err is the error that happened when applying the event. Err error }
ApplyEventError is when an event could not be applied. It contains the error and the event that caused it.
func (ApplyEventError) Error ¶
func (a ApplyEventError) Error() string
Error implements the Error method of the error interface.
type Command ¶
type Command interface { // AggregateID returns the ID of the aggregate that the command should be // handled by. AggregateID() UUID // AggregateType returns the type of the aggregate that the command can be // handled by. AggregateType() AggregateType // CommandType returns the type of the command. CommandType() CommandType }
Command is a domain command that is sent to a Dispatcher.
A command name should 1) be in present tense and 2) contain the intent (MoveCustomer vs CorrectCustomerAddress).
The command should contain all the data needed when handling it as fields. These fields can take an optional "eh" tag, which adds properties. For now only "optional" is a valid tag: `eh:"optional"`.
func CreateCommand ¶
func CreateCommand(commandType CommandType) (Command, error)
CreateCommand creates an command of a type with an ID using the factory registered with RegisterCommand.
type CommandBus ¶
type CommandBus interface { CommandHandler // SetHandler registers a handler for a command. SetHandler(CommandHandler, CommandType) error }
CommandBus is a command handler that delegates command handling to other handlers based on the command type.
type CommandFieldError ¶
type CommandFieldError struct {
Field string
}
CommandFieldError is returned by Dispatch when a field is incorrect.
func (CommandFieldError) Error ¶
func (c CommandFieldError) Error() string
type CommandHandler ¶
CommandHandler is an interface that all handlers of commands should implement.
type CommandType ¶
type CommandType string
CommandType is the type of a command, used as its unique identifier.
type ContextMarshalFunc ¶
ContextMarshalFunc is a function that marshalls any context values to a map, used for sending context on the wire.
type ContextUnmarshalFunc ¶
ContextUnmarshalFunc is a function that marshalls any context values to a map, used for sending context on the wire.
type Event ¶
type Event interface { // EventType returns the type of the event. EventType() EventType // The data attached to the event. Data() EventData // Timestamp of when the event was created. Timestamp() time.Time // AggregateType returns the type of the aggregate that the event can be // applied to. AggregateType() AggregateType // AggregateID returns the ID of the aggregate that the event should be // applied to. AggregateID() UUID // Version of the aggregate for this event (after it has been applied). Version() int // A string representation of the event. String() string }
Event is a domain event describing a change that has happened to an aggregate.
An event struct and type name should:
- Be in past tense (CustomerMoved)
- Contain the intent (CustomerMoved vs CustomerAddressCorrected).
The event should contain all the data needed when applying/handling it.
func NewEventForAggregate ¶
func NewEventForAggregate(eventType EventType, data EventData, aggregateType AggregateType, aggregateID UUID, version int) Event
NewEventForAggregate creates a new event with a type and data, setting its timestamp. It also sets the aggregate data on it.
type EventBus ¶
type EventBus interface { EventHandler // AddHandler adds a handler for an event. AddHandler(EventHandler, EventType) // SetPublisher sets the publisher to use for publishing the event after all // handlers have been run. SetPublisher(EventPublisher) // SetHandlingStrategy will set the strategy to use for handling events. SetHandlingStrategy(EventHandlingStrategy) }
EventBus is an event handler that handles events with the correct subhandlers after which it publishes the event using the publisher.
type EventData ¶
type EventData interface{}
EventData is any additional data for an event.
func CreateEventData ¶
CreateEventData creates an event data of a type using the factory registered with RegisterEventData.
type EventHandler ¶
type EventHandler interface { // HandleEvent handles an event. HandleEvent(context.Context, Event) error // HandlerType returns the type of the handler. HandlerType() EventHandlerType }
EventHandler is a handler of events. Only one handler of the same type will receive an event.
type EventHandlerType ¶
type EventHandlerType string
EventHandlerType is the type of an event handler. Used to serve only handle an event by one handler of each type.
type EventHandlingStrategy ¶
type EventHandlingStrategy int
EventHandlingStrategy is the strategy to use when handling events.
const ( // SimpleEventHandlingStrategy will handle events in the same goroutine and // wait for them to complete before handling the next. SimpleEventHandlingStrategy EventHandlingStrategy = iota // AsyncEventHandlingStrategy will handle events concurrently in their own // goroutines and not wait for them to finish. AsyncEventHandlingStrategy )
type EventObserver ¶
type EventObserver interface { // Notify is notifed about an event. Notify(context.Context, Event) error }
EventObserver is an observer of events. All observers will receive an event.
type EventPublisher ¶
type EventPublisher interface { // PublishEvent publishes the event to all observers. PublishEvent(context.Context, Event) error // AddObserver adds an observer. AddObserver(EventObserver) // SetHandlingStrategy will set the strategy to use for handling events. SetHandlingStrategy(EventHandlingStrategy) }
EventPublisher is a publisher of events to observers.
type EventSourcingRepository ¶
type EventSourcingRepository struct {
// contains filtered or unexported fields
}
EventSourcingRepository is an aggregate repository using event sourcing. It uses an event store for loading and saving events used to build the aggregate.
func NewEventSourcingRepository ¶
func NewEventSourcingRepository(eventStore EventStore, eventBus EventBus) (*EventSourcingRepository, error)
NewEventSourcingRepository creates a repository that will use an event store and bus.
func (*EventSourcingRepository) Load ¶
func (r *EventSourcingRepository) Load(ctx context.Context, aggregateType AggregateType, id UUID) (Aggregate, error)
Load loads an aggregate from the event store. It does so by creating a new aggregate of the type with the ID and then applies all events to it, thus making it the most current version of the aggregate.
type EventStore ¶
type EventStore interface { // Save appends all events in the event stream to the store. Save(ctx context.Context, events []Event, originalVersion int) error // Load loads all events for the aggregate id from the store. Load(context.Context, AggregateType, UUID) ([]Event, error) }
EventStore is an interface for an event sourcing event store.
type EventStoreError ¶
type EventStoreError struct { // Err is the error. Err error // BaseErr is an optional underlying error, for example from the DB driver. BaseErr error // Namespace is the namespace for the error. Namespace string }
EventStoreError is an error in the event store, with the namespace.
func (EventStoreError) Error ¶
func (e EventStoreError) Error() string
Error implements the Error method of the errors.Error interface.
type EventStoreMaintainer ¶
type EventStoreMaintainer interface { EventStore // Replace an event, the version must match. Useful for maintenance actions. // Returns ErrAggregateNotFound if there is no aggregate. Replace(context.Context, Event) error // RenameEvent renames all instances of the event type. RenameEvent(ctx context.Context, from, to EventType) error }
EventStoreMaintainer is an interface for a maintainer of an EventStore. NOTE: Should not be used in apps, useful for migration tools etc.
type EventType ¶
type EventType string
EventType is the type of an event, used as its unique identifier.
type ReadRepo ¶
type ReadRepo interface { // Parent returns the parent read repository, if there is one. // Useful for iterating a wrapped set of repositories to get a specific one. Parent() ReadRepo // Find returns a read model with an id. Find(context.Context, UUID) (interface{}, error) // FindAll returns all read models in the repository. FindAll(context.Context) ([]interface{}, error) }
ReadRepo is a storage for read models.
type ReadWriteRepo ¶
ReadWriteRepo is a combined read and write repo, mainly useful for testing.
type RepoError ¶
type RepoError struct { // Err is the error. Err error // BaseErr is an optional underlying error, for example from the DB driver. BaseErr error // Namespace is the namespace for the error. Namespace string }
RepoError is an error in the read repository, with the namespace.
type Repository ¶
type Repository interface { // Load loads the most recent version of an aggregate with a type and id. Load(context.Context, AggregateType, UUID) (Aggregate, error) // Save saves the uncommittend events for an aggregate. Save(context.Context, Aggregate) error }
Repository is a repository responsible for loading and saving aggregates.
type UUID ¶
type UUID string
UUID is a unique identifier, based on the UUID spec. It must be exactly 16 bytes long.
func ParseUUID ¶
ParseUUID parses a UUID from a string representation. ParseUUID creates a UUID object from given hex string representation. The function accepts UUID string in following formats:
ParseUUID("6ba7b814-9dad-11d1-80b4-00c04fd430c8") ParseUUID("{6ba7b814-9dad-11d1-80b4-00c04fd430c8}") ParseUUID("urn:uuid:6ba7b814-9dad-11d1-80b4-00c04fd430c8")
func (UUID) MarshalJSON ¶
MarshalJSON turns UUID into a json.Marshaller.
func (*UUID) UnmarshalJSON ¶
UnmarshalJSON turns *UUID into a json.Unmarshaller.
type Versionable ¶
type Versionable interface { // AggregateVersion returns the aggregate version that a read model represents. AggregateVersion() int }
Versionable is a read model that has a version number saved, used by version.ReadRepo.FindMinVersion().
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
commandbus
|
|
commandhandler
|
|
eventbus
|
|
eventhandler
|
|
eventstore
|
|
examples
|
|
mongodb
Package mongodb contains an example of a CQRS/ES app using the MongoDB adapter.
|
Package mongodb contains an example of a CQRS/ES app using the MongoDB adapter. |
simple
Package simple contains a simple runnable example of a CQRS/ES app.
|
Package simple contains a simple runnable example of a CQRS/ES app. |
publisher
|
|
repo
|
|