Documentation ¶
Overview ¶
Package eventhorizon is a CQRS/ES toolkit.
Index ¶
- Constants
- Variables
- func AggregateTypeFromContext(ctx context.Context) string
- func CheckCommand(cmd Command) error
- func MarshalContext(ctx context.Context) map[string]interface{}
- func MinVersionFromContext(ctx context.Context) (int, bool)
- func NamespaceFromContext(ctx context.Context) string
- func NewContextWithMinVersion(ctx context.Context, minVersion int) context.Context
- func NewContextWithMinVersionWait(ctx context.Context, minVersion int) (c context.Context, cancel func())
- func NewContextWithNamespace(ctx context.Context, namespace string) context.Context
- func NewContextWithNamespaceAndType(ctx context.Context, namespace, aggregateType string) context.Context
- func RegisterAggregate(factory func(string) Aggregate)
- func RegisterCommand(factory func() Command)
- func RegisterContextMarshaler(f ContextMarshalFunc)
- func RegisterContextUnmarshaler(f ContextUnmarshalFunc)
- func RegisterEventData(eventType EventType, factory func() EventData)
- func UnmarshalContext(vals map[string]interface{}) context.Context
- func UnregisterCommand(commandType CommandType)
- func UnregisterEventData(eventType EventType)
- type Aggregate
- type AggregateStore
- type AggregateType
- type Command
- type CommandFieldError
- type CommandHandler
- type CommandHandlerFunc
- type CommandHandlerMiddleware
- type CommandType
- type ContextMarshalFunc
- type ContextUnmarshalFunc
- type Entity
- type Event
- type EventBus
- type EventBusError
- type EventData
- type EventHandler
- type EventHandlerFunc
- type EventHandlerMiddleware
- type EventHandlerType
- type EventMatcher
- type EventStore
- type EventStoreError
- type EventStoreMaintainer
- type EventType
- type Iter
- type ReadRepo
- type ReadWriteRepo
- type RepoError
- type Snapshot
- type SnapshotStore
- type SnapshotStoreError
- type Versionable
- type WriteRepo
Constants ¶
const DefaultAggregateType = "event_default"
const DefaultMinVersionDeadline = 10 * time.Second
DefaultMinVersionDeadline is the deadline to use when creating a min version context that waits.
const DefaultNamespace = "default"
DefaultNamespace is the namespace to use if not set in the context.
Variables ¶
var ErrAggregateNotFound = errors.New("aggregate not found")
ErrAggregateNotFound is when no aggregate can be found.
var ErrAggregateNotRegistered = errors.New("aggregate not registered")
ErrAggregateNotRegistered is when no aggregate factory was registered.
var ErrCommandNotRegistered = errors.New("command not registered")
ErrCommandNotRegistered is when no command factory was registered.
var ErrCouldNotSaveEntity = errors.New("could not save entity")
ErrCouldNotSaveEntity is when an entity could not be saved.
var ErrEntityHasNoVersion = errors.New("entity has no version")
ErrEntityHasNoVersion is when an entity has no version number.
var ErrEntityNotFound = errors.New("could not find entity")
ErrEntityNotFound is when an entity could not be found.
var ErrEventDataNotRegistered = errors.New("event data not registered")
ErrEventDataNotRegistered is when no event data factory was registered.
var ErrIncorrectEntityVersion = errors.New("incorrect entity version")
ErrIncorrectEntityVersion is when an entity has an incorrect version.
var ErrIncorrectEventVersion = errors.New("mismatching event version")
ErrIncorrectEventVersion is when an event is for another version of the aggregate.
var ErrInvalidEvent = errors.New("invalid event")
ErrInvalidEvent is when an event does not implement the Event interface.
var ErrMissingEntityID = errors.New("missing entity ID")
ErrMissingEntityID is when an entity has no ID.
var ErrNoEventsToAppend = errors.New("no events to append")
ErrNoEventsToAppend is when no events are available to append.
Functions ¶
func AggregateTypeFromContext ¶ added in v0.5.1
func MarshalContext ¶
MarshalContext marshals a context into a map.
func MinVersionFromContext ¶
MinVersionFromContext returns the min version from the context.
func NamespaceFromContext ¶
NamespaceFromContext returns the namespace from the context, or the default namespace.
func NewContextWithMinVersion ¶
NewContextWithMinVersion returns the context with min version set.
func NewContextWithMinVersionWait ¶
func NewContextWithMinVersionWait(ctx context.Context, minVersion int) (c context.Context, cancel func())
NewContextWithMinVersionWait returns the context with min version and a default deadline set.
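One way to picture the waiting variant is a context value combined with a timeout. The sketch below is self-contained and does not call the package: the key type `minVersionKey`, the constant `defaultDeadline`, and the lowercase helper names are hypothetical stand-ins, and the package's actual implementation may differ.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// minVersionKey is a hypothetical private context key (an assumption, not the package's key).
type minVersionKey struct{}

// defaultDeadline mirrors the documented DefaultMinVersionDeadline value.
const defaultDeadline = 10 * time.Second

// withMinVersion stores the minimum version in the context.
func withMinVersion(ctx context.Context, minVersion int) context.Context {
	return context.WithValue(ctx, minVersionKey{}, minVersion)
}

// withMinVersionWait stores the minimum version and adds the default deadline.
func withMinVersionWait(ctx context.Context, minVersion int) (context.Context, context.CancelFunc) {
	return context.WithTimeout(withMinVersion(ctx, minVersion), defaultDeadline)
}

// minVersionFrom reports the stored minimum version, if one was set.
func minVersionFrom(ctx context.Context) (int, bool) {
	v, ok := ctx.Value(minVersionKey{}).(int)
	return v, ok
}

// example builds a waiting context and reads back what was stored in it.
func example() (version int, ok, hasDeadline bool) {
	ctx, cancel := withMinVersionWait(context.Background(), 3)
	defer cancel() // always release the timer behind the deadline

	version, ok = minVersionFrom(ctx)
	_, hasDeadline = ctx.Deadline()
	return version, ok, hasDeadline
}

func main() {
	fmt.Println(example())
}
```

Returning the cancel function alongside the context lets the caller release the deadline timer as soon as the read completes.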
func NewContextWithNamespace ¶
NewContextWithNamespace sets the namespace to use in the context. The namespace is used to determine which database.
func NewContextWithNamespaceAndType ¶ added in v0.5.1
func RegisterAggregate ¶
RegisterAggregate registers an aggregate factory for a type. The factory is used to create concrete aggregate types when loading from the database.
An example would be:
RegisterAggregate(func(id string) Aggregate { return &MyAggregate{id} })
func RegisterCommand ¶
func RegisterCommand(factory func() Command)
RegisterCommand registers a command factory for a type. The factory is used to create concrete command types.
An example would be:
RegisterCommand(func() Command { return &MyCommand{} })
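The register/create pattern described here can be sketched as a map of factories keyed by command type. This is a simplified stand-alone illustration, not the package's code: the `Command` interface is reduced to a single method, `InviteGuest` is a hypothetical command, and details such as how duplicate registrations are treated are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// CommandType and Command are reduced local stand-ins for the documented types.
type CommandType string

type Command interface {
	CommandType() CommandType
}

// ErrCommandNotRegistered mirrors the documented error value.
var ErrCommandNotRegistered = errors.New("command not registered")

var (
	mu        sync.RWMutex
	factories = map[CommandType]func() Command{}
)

// registerCommand asks the factory for one instance to learn its type,
// then stores the factory under that type.
func registerCommand(factory func() Command) {
	cmd := factory()
	mu.Lock()
	defer mu.Unlock()
	factories[cmd.CommandType()] = factory
}

// createCommand builds a fresh command of the given type, or reports
// that no factory was registered for it.
func createCommand(t CommandType) (Command, error) {
	mu.RLock()
	defer mu.RUnlock()
	factory, ok := factories[t]
	if !ok {
		return nil, ErrCommandNotRegistered
	}
	return factory(), nil
}

// InviteGuest is a hypothetical command used only for this example.
type InviteGuest struct{ Name string }

func (*InviteGuest) CommandType() CommandType { return "InviteGuest" }

func main() {
	registerCommand(func() Command { return &InviteGuest{} })

	cmd, err := createCommand("InviteGuest")
	fmt.Println(cmd.CommandType(), err)

	_, err = createCommand("Unknown")
	fmt.Println(err)
}
```

Because the factory returns a new value on every call, each created command starts from a clean zero state.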
func RegisterContextMarshaler ¶
func RegisterContextMarshaler(f ContextMarshalFunc)
RegisterContextMarshaler registers a marshaler function used by MarshalContext.
func RegisterContextUnmarshaler ¶
func RegisterContextUnmarshaler(f ContextUnmarshalFunc)
RegisterContextUnmarshaler registers an unmarshaler function used by UnmarshalContext.
func RegisterEventData ¶
RegisterEventData registers an event data factory for a type. The factory is used to create concrete event data structs when loading from the database.
An example would be:
RegisterEventData(MyEventType, func() EventData { return &MyEventData{} })
func UnmarshalContext ¶
UnmarshalContext unmarshals a context from a map.
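A round trip through marshal and unmarshal hooks might look like the following sketch. The definitions of `ContextMarshalFunc` and `ContextUnmarshalFunc` are not shown in this page, so the function shapes used here are assumptions, as are the `namespaceKey` type and the `"namespace"` map key; the code is self-contained and does not call the package.

```go
package main

import (
	"context"
	"fmt"
)

// Assumed hook shapes; the page lists the type names but not their definitions.
type marshalFunc func(ctx context.Context, vals map[string]interface{})
type unmarshalFunc func(ctx context.Context, vals map[string]interface{}) context.Context

var (
	marshalers   []marshalFunc
	unmarshalers []unmarshalFunc
)

// namespaceKey and namespaceField are hypothetical names for this sketch.
type namespaceKey struct{}

const namespaceField = "namespace"

func init() {
	// Register one hook pair that carries a namespace across the wire.
	marshalers = append(marshalers, func(ctx context.Context, vals map[string]interface{}) {
		if ns, ok := ctx.Value(namespaceKey{}).(string); ok {
			vals[namespaceField] = ns
		}
	})
	unmarshalers = append(unmarshalers, func(ctx context.Context, vals map[string]interface{}) context.Context {
		if ns, ok := vals[namespaceField].(string); ok {
			return context.WithValue(ctx, namespaceKey{}, ns)
		}
		return ctx
	})
}

// marshalContext runs every registered marshaler against a fresh map.
func marshalContext(ctx context.Context) map[string]interface{} {
	vals := map[string]interface{}{}
	for _, f := range marshalers {
		f(ctx, vals)
	}
	return vals
}

// unmarshalContext rebuilds a context by running every registered unmarshaler.
func unmarshalContext(vals map[string]interface{}) context.Context {
	ctx := context.Background()
	for _, f := range unmarshalers {
		ctx = f(ctx, vals)
	}
	return ctx
}

// roundTrip stores ns in a context, marshals it, unmarshals it, and reads ns back.
func roundTrip(ns string) string {
	ctx := context.WithValue(context.Background(), namespaceKey{}, ns)
	restored := unmarshalContext(marshalContext(ctx))
	got, _ := restored.Value(namespaceKey{}).(string)
	return got
}

func main() {
	fmt.Println(roundTrip("tenant-a"))
}
```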
func UnregisterCommand ¶
func UnregisterCommand(commandType CommandType)
UnregisterCommand removes the registration of the command factory for a type. This is mainly useful in maintenance situations where the command type needs to be switched at runtime.
func UnregisterEventData ¶
func UnregisterEventData(eventType EventType)
UnregisterEventData removes the registration of the event data factory for a type. This is mainly useful in maintenance situations where the event data needs to be switched during a migration.
Types ¶
type Aggregate ¶
type Aggregate interface {
	// Entity provides the ID of the aggregate.
	Entity

	// AggregateType returns the type name of the aggregate.
	// AggregateType() string
	AggregateType() AggregateType

	// CommandHandler is used to handle commands.
	CommandHandler
}
Aggregate is an interface representing a versioned data entity created from events. It receives commands and generates events that are stored.
The aggregate is created/loaded and saved by the Repository inside the Dispatcher. A domain specific aggregate can either implement the full interface, or more commonly embed *AggregateBase to take care of the common methods.
func CreateAggregate ¶
func CreateAggregate(aggregateType AggregateType, id string) (Aggregate, error)
CreateAggregate creates an aggregate of a type with an ID using the factory registered with RegisterAggregate.
type AggregateStore ¶
type AggregateStore interface {
	// Load loads the most recent version of an aggregate with a type and id.
	Load(context.Context, AggregateType, string) (Aggregate, error)

	// Save saves the uncommitted events for an aggregate.
	Save(context.Context, Aggregate) error
}
AggregateStore is responsible for loading and saving aggregates.
type Command ¶
type Command interface {
	// AggregateID returns the ID of the aggregate that the command should be
	// handled by.
	AggregateID() string

	// AggregateType returns the type of the aggregate that the command can be
	// handled by.
	AggregateType() AggregateType

	// CommandType returns the type of the command.
	CommandType() CommandType
}
Command is a domain command that is sent to a Dispatcher.
A command name should 1) be in present tense and 2) contain the intent (MoveCustomer vs CorrectCustomerAddress).
The command should contain all the data needed when handling it as fields. These fields can take an optional "eh" tag, which adds properties. For now only "optional" is a valid tag: `eh:"optional"`.
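To illustrate the `eh:"optional"` tag, the sketch below walks a command's fields with reflection and reports the first required field left at its zero value. It is an assumption about how such a check could work, not the package's CheckCommand implementation; `fieldError`, `checkFields`, and `RenameGuest` are hypothetical names.

```go
package main

import (
	"fmt"
	"reflect"
)

// fieldError is a local stand-in for the documented CommandFieldError.
type fieldError struct{ Field string }

func (e fieldError) Error() string { return "missing field: " + e.Field }

// RenameGuest is a hypothetical command with one optional field.
type RenameGuest struct {
	ID   string
	Name string
	Note string `eh:"optional"`
}

// checkFields returns a fieldError for the first exported field that is
// left at its zero value and is not tagged as optional.
func checkFields(cmd interface{}) error {
	v := reflect.Indirect(reflect.ValueOf(cmd))
	if v.Kind() != reflect.Struct {
		return nil // nothing to check on non-struct values
	}
	t := v.Type()
	for i := 0; i < v.NumField(); i++ {
		f := t.Field(i)
		if f.PkgPath != "" {
			continue // unexported field
		}
		if f.Tag.Get("eh") == "optional" {
			continue
		}
		if v.Field(i).IsZero() {
			return fieldError{Field: f.Name}
		}
	}
	return nil
}

func main() {
	fmt.Println(checkFields(RenameGuest{ID: "1", Name: "Ada"}))
	fmt.Println(checkFields(&RenameGuest{ID: "1"}))
}
```

A plain zero-value test treats `false` and `0` as missing, so a real check would likely need special handling for booleans and numbers.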
func CreateCommand ¶
func CreateCommand(commandType CommandType) (Command, error)
CreateCommand creates a command of a type using the factory registered with RegisterCommand.
type CommandFieldError ¶
type CommandFieldError struct {
Field string
}
CommandFieldError is returned by Dispatch when a field is incorrect.
func (CommandFieldError) Error ¶
func (c CommandFieldError) Error() string
type CommandHandler ¶
CommandHandler is an interface that all handlers of commands should implement.
func UseCommandHandlerMiddleware ¶
func UseCommandHandlerMiddleware(h CommandHandler, middleware ...CommandHandlerMiddleware) CommandHandler
UseCommandHandlerMiddleware wraps a CommandHandler in one or more middleware.
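Wrapping a handler in several middleware can be sketched as follows. The types are local stand-ins modeled on the signatures listed in this page (with `Command` reduced to an empty interface), and the wrapping order, with the first middleware outermost, is an assumption rather than documented behavior; `useMiddleware`, `trace`, and `run` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins for the documented types.
type Command interface{}

type CommandHandler interface {
	HandleCommand(context.Context, Command) error
}

type CommandHandlerFunc func(context.Context, Command) error

func (f CommandHandlerFunc) HandleCommand(ctx context.Context, cmd Command) error {
	return f(ctx, cmd)
}

type CommandHandlerMiddleware func(CommandHandler) CommandHandler

// useMiddleware wraps h so that the first middleware listed runs outermost
// (assumed order).
func useMiddleware(h CommandHandler, middleware ...CommandHandlerMiddleware) CommandHandler {
	for i := len(middleware) - 1; i >= 0; i-- {
		h = middleware[i](h)
	}
	return h
}

// trace records when the wrapped handler is entered and left.
func trace(name string, log *[]string) CommandHandlerMiddleware {
	return func(next CommandHandler) CommandHandler {
		return CommandHandlerFunc(func(ctx context.Context, cmd Command) error {
			*log = append(*log, name+":before")
			err := next.HandleCommand(ctx, cmd)
			*log = append(*log, name+":after")
			return err
		})
	}
}

// run chains two tracing middleware around a base handler and returns the call log.
func run() []string {
	var log []string
	base := CommandHandlerFunc(func(ctx context.Context, cmd Command) error {
		log = append(log, "handler")
		return nil
	})
	h := useMiddleware(base, trace("a", &log), trace("b", &log))
	_ = h.HandleCommand(context.Background(), "some command")
	return log
}

func main() {
	fmt.Println(run())
}
```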
type CommandHandlerFunc ¶
CommandHandlerFunc is a function that can be used as a command handler.
func (CommandHandlerFunc) HandleCommand ¶
func (h CommandHandlerFunc) HandleCommand(ctx context.Context, cmd Command) error
HandleCommand implements the HandleCommand method of the CommandHandler.
type CommandHandlerMiddleware ¶
type CommandHandlerMiddleware func(CommandHandler) CommandHandler
CommandHandlerMiddleware is a function that middlewares can implement to be able to chain.
type CommandType ¶
type CommandType string
CommandType is the type of a command, used as its unique identifier.
type ContextMarshalFunc ¶
ContextMarshalFunc is a function that marshals any context values to a map, used for sending context on the wire.
type ContextUnmarshalFunc ¶
ContextUnmarshalFunc is a function that unmarshals any context values from a map, used when receiving context from the wire.
type Entity ¶
type Entity interface {
	// EntityID returns the ID of the entity.
	EntityID() string
}
Entity is an item which is identified by an ID.
From http://cqrs.nu/Faq: "Entities or reference types are characterized by having an identity that's not tied to their attribute values. All attributes in an entity can change and it's still "the same" entity. Conversely, two entities might be equivalent in all their attributes, but will still be distinct."
type Event ¶
type Event interface {
	ID() string

	// EventType returns the type of the event.
	EventType() EventType
	// The data attached to the event.
	Data() EventData
	// Timestamp of when the event was created.
	Timestamp() time.Time

	// AggregateType returns the type of the aggregate that the event can be
	// applied to.
	AggregateType() AggregateType
	// AggregateID returns the ID of the aggregate that the event should be
	// applied to.
	AggregateID() string
	// Version of the aggregate for this event (after it has been applied).
	Version() int

	// A string representation of the event.
	String() string
}
Event is a domain event describing a change that has happened to an aggregate.
An event struct and type name should:
- Be in past tense (CustomerMoved)
- Contain the intent (CustomerMoved vs CustomerAddressCorrected).
The event should contain all the data needed when applying/handling it.
func NewEventForAggregate ¶
func NewEventForAggregate(eventType EventType, data EventData, timestamp time.Time, aggregateType AggregateType, aggregateID string, version int) Event
NewEventForAggregate creates a new event with a type and data, setting its timestamp. It also sets the aggregate data on it.
type EventBus ¶
type EventBus interface {
	// PublishEvent publishes the event on the bus.
	PublishEvent(context.Context, Event) error

	// AddHandler adds a handler for an event. Panics if either the matcher
	// or handler is nil or the handler is already added.
	AddHandler(EventMatcher, EventHandler)

	// AddObserver adds an observer. Panics if the observer is nil or the observer
	// is already added.
	AddObserver(EventMatcher, EventHandler)

	// Errors returns an error channel where async handling errors are sent.
	Errors() <-chan EventBusError
}
EventBus sends published events to one of each handler type and all observers. That means that if the same handler is registered on multiple nodes only one of them will receive the event. In contrast all observers registered on multiple nodes will receive the event. Events are not guaranteed to be handled or observed in order.
type EventBusError ¶ added in v0.5.1
EventBusError is an async error containing the error returned from a handler or observer and the event that it happened on.
func (EventBusError) Error ¶ added in v0.5.1
func (e EventBusError) Error() string
Error implements the Error method of the error interface.
type EventData ¶
type EventData interface{}
EventData is any additional data for an event.
func CreateEventData ¶
CreateEventData creates an event data of a type using the factory registered with RegisterEventData.
type EventHandler ¶
type EventHandler interface {
	// HandlerType is the type of the handler.
	HandlerType() EventHandlerType

	// HandleEvent handles an event.
	HandleEvent(context.Context, Event) error
}
EventHandler is a handler of events. If registered on a bus as a handler only one handler of the same type will receive each event. If registered on a bus as an observer all handlers of the same type will receive each event.
func UseEventHandlerMiddleware ¶
func UseEventHandlerMiddleware(h EventHandler, middleware ...EventHandlerMiddleware) EventHandler
UseEventHandlerMiddleware wraps an EventHandler in one or more middleware.
type EventHandlerFunc ¶
EventHandlerFunc is a function that can be used as an event handler.
func (EventHandlerFunc) HandleEvent ¶
func (h EventHandlerFunc) HandleEvent(ctx context.Context, e Event) error
HandleEvent implements the HandleEvent method of the EventHandler.
func (EventHandlerFunc) HandlerType ¶ added in v0.5.1
func (h EventHandlerFunc) HandlerType() EventHandlerType
HandlerType implements the HandlerType method of the EventHandler.
type EventHandlerMiddleware ¶
type EventHandlerMiddleware func(EventHandler) EventHandler
EventHandlerMiddleware is a function that middlewares can implement to be able to chain.
type EventHandlerType ¶ added in v0.5.1
type EventHandlerType string
EventHandlerType is the type of an event handler, used as its unique identifier.
type EventMatcher ¶
EventMatcher is a func that can match an event against a criterion.
func MatchAggregate ¶
func MatchAggregate(t AggregateType) EventMatcher
MatchAggregate matches a specific aggregate type, nil events never match.
func MatchAnyEventOf ¶
func MatchAnyEventOf(types ...EventType) EventMatcher
MatchAnyEventOf matches if the event is of any of the given event types.
func MatchAnyOf ¶
func MatchAnyOf(matchers ...EventMatcher) EventMatcher
MatchAnyOf matches if any of several matchers matches.
func MatchEvent ¶
func MatchEvent(t EventType) EventMatcher
MatchEvent matches a specific event type, nil events never match.
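The matchers compose naturally if `EventMatcher` is a predicate over events. The sketch below assumes the shape `func(Event) bool`, which this page does not show, and reduces `Event` to a single method; all lowercase names and `simpleEvent` are hypothetical.

```go
package main

import "fmt"

// Reduced local stand-ins for the documented types.
type EventType string

type Event interface {
	EventType() EventType
}

// EventMatcher is assumed to be a predicate over events.
type EventMatcher func(Event) bool

// matchEvent matches one specific event type; nil events never match.
func matchEvent(t EventType) EventMatcher {
	return func(e Event) bool {
		return e != nil && e.EventType() == t
	}
}

// matchAnyOf matches if at least one of the given matchers matches.
func matchAnyOf(matchers ...EventMatcher) EventMatcher {
	return func(e Event) bool {
		for _, m := range matchers {
			if m(e) {
				return true
			}
		}
		return false
	}
}

// matchAnyEventOf matches if the event has one of the given types.
func matchAnyEventOf(types ...EventType) EventMatcher {
	return func(e Event) bool {
		if e == nil {
			return false
		}
		for _, t := range types {
			if e.EventType() == t {
				return true
			}
		}
		return false
	}
}

// simpleEvent is a hypothetical event that only carries its type.
type simpleEvent EventType

func (s simpleEvent) EventType() EventType { return EventType(s) }

func main() {
	m := matchAnyOf(matchEvent("GuestInvited"), matchEvent("GuestAccepted"))
	fmt.Println(m(simpleEvent("GuestAccepted")))
	fmt.Println(m(simpleEvent("GuestDeclined")))
	fmt.Println(matchAnyEventOf("GuestInvited")(nil))
}
```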
type EventStore ¶
type EventStore interface {
	// Save appends all events in the event stream to the store.
	Save(ctx context.Context, events []Event, originalVersion int) error

	// Load loads all events for the aggregate id from the store.
	Load(context.Context, string) ([]Event, context.Context, error)
}
EventStore is an interface for an event sourcing event store.
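The `originalVersion` argument suggests an optimistic concurrency check on append. The in-memory sketch below shows one plausible reading: an append succeeds only if the stored stream is at `originalVersion` and the new events continue the version sequence. The exact rules, the choice of error for each failure, and the simplified `event` struct are assumptions; contexts are omitted for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of two documented error values.
var (
	errNoEventsToAppend      = errors.New("no events to append")
	errIncorrectEventVersion = errors.New("mismatching event version")
)

// event is a reduced stand-in for the documented Event interface.
type event struct {
	AggregateID string
	Version     int
}

// memoryStore keeps one event stream per aggregate ID.
type memoryStore struct {
	streams map[string][]event
}

func newMemoryStore() *memoryStore {
	return &memoryStore{streams: map[string][]event{}}
}

// Save appends events if they belong to one aggregate, continue the version
// sequence after originalVersion, and the stored stream is at originalVersion.
func (s *memoryStore) Save(events []event, originalVersion int) error {
	if len(events) == 0 {
		return errNoEventsToAppend
	}
	id := events[0].AggregateID
	for i, e := range events {
		if e.AggregateID != id || e.Version != originalVersion+1+i {
			return errIncorrectEventVersion
		}
	}
	if len(s.streams[id]) != originalVersion {
		return errIncorrectEventVersion // another writer got there first
	}
	s.streams[id] = append(s.streams[id], events...)
	return nil
}

// Load returns all stored events for an aggregate ID.
func (s *memoryStore) Load(id string) []event {
	return s.streams[id]
}

func main() {
	s := newMemoryStore()
	fmt.Println(s.Save([]event{{"a", 1}, {"a", 2}}, 0))
	fmt.Println(s.Save([]event{{"a", 2}}, 1)) // stale writer
	fmt.Println(len(s.Load("a")))
}
```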
type EventStoreError ¶
type EventStoreError struct {
	// Err is the error.
	Err error
	// BaseErr is an optional underlying error, for example from the DB driver.
	BaseErr error
	// Namespace is the namespace for the error.
	Namespace string
	// AggregateType
	AggregateType string
}
EventStoreError is an error in the event store, with the namespace.
func (EventStoreError) Error ¶
func (e EventStoreError) Error() string
Error implements the Error method of the error interface.
type EventStoreMaintainer ¶
type EventStoreMaintainer interface {
	EventStore

	// Replace an event, the version must match. Useful for maintenance actions.
	// Returns ErrAggregateNotFound if there is no aggregate.
	Replace(context.Context, Event) error

	// RenameEvent renames all instances of the event type.
	RenameEvent(ctx context.Context, from, to EventType) error
}
EventStoreMaintainer is an interface for a maintainer of an EventStore. NOTE: Should not be used in apps, useful for migration tools etc.
type EventType ¶
type EventType string
EventType is the type of an event, used as its unique identifier.
type Iter ¶
type Iter interface {
	Next() bool
	Value() interface{}
	// Close must be called after the last Next() to retrieve error if any
	Close() error
}
Iter is a stateful iterator: each call to Next() readies the next value, which can then be retrieved with Value(). It enables incremental object retrieval from repos that support it. You must call Close() on each Iter even when results were delivered without apparent error.
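A typical consumption loop for such an iterator is sketched below, including the Close() call after the last Next(). The `Iter` interface is copied from the definition above; `sliceIter` and `collect` are hypothetical helpers written for this example.

```go
package main

import "fmt"

// Iter is a local copy of the documented interface.
type Iter interface {
	Next() bool
	Value() interface{}
	Close() error
}

// sliceIter is a hypothetical Iter backed by an in-memory slice.
type sliceIter struct {
	items []interface{}
	pos   int // number of items already readied by Next
}

// Next readies the next item and reports whether one was available.
func (it *sliceIter) Next() bool {
	if it.pos >= len(it.items) {
		return false
	}
	it.pos++
	return true
}

// Value returns the item readied by the last successful Next, or nil
// if Next has not been called yet.
func (it *sliceIter) Value() interface{} {
	if it.pos == 0 {
		return nil
	}
	return it.items[it.pos-1]
}

// Close has nothing to release here and never fails.
func (it *sliceIter) Close() error { return nil }

// collect drains the iterator and always calls Close to surface any error.
func collect(it Iter) ([]interface{}, error) {
	var out []interface{}
	for it.Next() {
		out = append(out, it.Value())
	}
	if err := it.Close(); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	items, err := collect(&sliceIter{items: []interface{}{"a", "b"}})
	fmt.Println(items, err)
}
```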
type ReadRepo ¶
type ReadRepo interface {
	// Parent returns the parent read repository, if there is one.
	// Useful for iterating a wrapped set of repositories to get a specific one.
	Parent() ReadRepo

	// Find returns an entity for an ID.
	Find(context.Context, string) (Entity, error)

	// FindAll returns all entities in the repository.
	FindAll(context.Context) ([]Entity, error)
}
ReadRepo is a read repository for entities.
type ReadWriteRepo ¶
ReadWriteRepo is a combined read and write repo, mainly useful for testing.
type RepoError ¶
type RepoError struct {
	// Err is the error.
	Err error
	// BaseErr is an optional underlying error, for example from the DB driver.
	BaseErr error
	// Namespace is the namespace for the error.
	Namespace string
	AggregateType string
}
RepoError is an error in the read repository, with the namespace.
type Snapshot ¶ added in v0.6.0
type Snapshot interface {
	RawDataI() interface{}
	Version() int
	AggregateType() AggregateType
	AggregateId() string
}
type SnapshotStore ¶ added in v0.6.0
type SnapshotStoreError ¶ added in v0.6.0
type SnapshotStoreError struct {
	// Err is the error.
	Err error
	// BaseErr is an optional underlying error, for example from the DB driver.
	BaseErr error
	// Namespace is the namespace for the error.
	Namespace string
	// AggregateType
	AggregateType string
}
func (SnapshotStoreError) Error ¶ added in v0.6.0
func (e SnapshotStoreError) Error() string
Error implements the Error method of the error interface.
type Versionable ¶
type Versionable interface {
	// AggregateVersion returns the version of the item.
	AggregateVersion() int
}
Versionable is an item that has a version number, used by version.ReadRepo.FindMinVersion().
Directories ¶
Path | Synopsis |
---|---|
aggregatestore | |
commandhandler | |
eventhandler | |
examples | |
guestlist/memory | Package memory contains an example of a CQRS/ES app using memory as DB. |
guestlist/mongodb | Package mongodb contains an example of a CQRS/ES app using the MongoDB adapter. |
middleware | |
snapshotstore | |