Documentation ¶
Overview ¶
Package eventhorizon is a CQRS/ES toolkit.
Index ¶
- Variables
- func RegisterAggregate(factory func(UUID) Aggregate)
- func RegisterCommand(factory func() Command)
- func RegisterEvent(factory func() Event)
- type Aggregate
- type AggregateBase
- type AggregateCommandHandler
- type AggregateRecord
- type AggregateType
- type Command
- type CommandBus
- type CommandFieldError
- type CommandHandler
- type CommandType
- type Event
- type EventBus
- type EventHandler
- type EventHandlerType
- type EventObserver
- type EventRecord
- type EventSourcingRepository
- type EventStore
- type EventType
- type ReadRepository
- type Repository
- type Saga
- type SagaBase
- type SagaType
- type UUID
Constants ¶
This section is empty.
Variables ¶
var ErrAggregateAlreadySet = errors.New("aggregate is already set")
ErrAggregateAlreadySet is when an aggregate is already registered for a command.
var ErrAggregateNotFound = errors.New("no aggregate for command")
ErrAggregateNotFound is when no aggregate can be found.
var ErrAggregateNotRegistered = errors.New("aggregate not registered")
ErrAggregateNotRegistered is when no aggregate factory was registered.
var ErrCommandNotRegistered = errors.New("command not registered")
ErrCommandNotRegistered is when no command factory was registered.
var ErrCouldNotSaveModel = errors.New("could not save model")
ErrCouldNotSaveModel is when a model could not be found.
var ErrEventNotRegistered = errors.New("event not registered")
ErrEventNotRegistered is when no event factory was registered.
var ErrHandlerAlreadySet = errors.New("handler is already set")
ErrHandlerAlreadySet is when a handler is already registered for a command.
var ErrHandlerNotFound = errors.New("no handlers for command")
ErrHandlerNotFound is when no handler can be found.
var ErrInvalidEventBus = errors.New("invalid event bus")
ErrInvalidEventBus is when a dispatcher is created with a nil event bus.
var ErrInvalidEventStore = errors.New("invalid event store")
ErrInvalidEventStore is when a dispatcher is created with a nil event store.
var ErrMismatchedEventType = errors.New("mismatched event type and aggregate type")
ErrMismatchedEventType occurs when loaded events from ID does not match aggregate type.
var ErrModelNotFound = errors.New("could not find model")
ErrModelNotFound is when a model could not be found.
var ErrNilRepository = errors.New("repository is nil")
ErrNilRepository is when a dispatcher is created with a nil repository.
var ErrNoEventsToAppend = errors.New("no events to append")
ErrNoEventsToAppend is when no events are available to append.
Functions ¶
func RegisterAggregate ¶
RegisterAggregate registers an aggregate factory for a type. The factory is used to create concrete aggregate types when loading from the database.
An example would be:
RegisterAggregate(func(id UUID) Aggregate { return &MyAggregate{id} })
func RegisterCommand ¶
func RegisterCommand(factory func() Command)
RegisterCommand registers an command factory for a type. The factory is used to create concrete command types.
An example would be:
RegisterCommand(func() Command { return &MyCommand{} })
func RegisterEvent ¶
func RegisterEvent(factory func() Event)
RegisterEvent registers an event factory for a type. The factory is used to create concrete event types when loading from the database.
An example would be:
RegisterEvent(func() Event { return &MyEvent{} })
Types ¶
type Aggregate ¶
type Aggregate interface { // AggregateID returns the id of the aggregate. AggregateID() UUID // AggregateType returns the type name of the aggregate. // AggregateType() string AggregateType() AggregateType // Version returns the version of the aggregate. Version() int // IncrementVersion increments the aggregate version. IncrementVersion() // HandleCommand handles a command and stores events. // TODO: Rename to Handle() HandleCommand(Command) error // ApplyEvent applies an event to the aggregate by setting its values. // TODO: Rename to Apply() ApplyEvent(Event) // StoreEvent stores an event as uncommitted. // TODO: Rename to Store() StoreEvent(Event) // GetUncommittedEvents gets all uncommitted events for storing. // TODO: Rename to UncommitedEvents() GetUncommittedEvents() []Event // ClearUncommittedEvents clears all uncommitted events after storing. // TODO: Rename to ClearUncommitted() ClearUncommittedEvents() }
Aggregate is an interface representing a versioned data entity created from events. It receives commands and generates evens that are stored.
The aggregate is created/loaded and saved by the Repository inside the Dispatcher. A domain specific aggregate can either imlement the full interface, or more commonly embed *AggregateBase to take care of the common methods.
func CreateAggregate ¶
func CreateAggregate(aggregateType AggregateType, id UUID) (Aggregate, error)
CreateAggregate creates an aggregate of a type with an ID using the factory registered with RegisterAggregate.
type AggregateBase ¶
type AggregateBase struct {
// contains filtered or unexported fields
}
AggregateBase is a CQRS aggregate base to embed in domain specific aggregates.
A typical aggregate example:
type UserAggregate struct { *eventhorizon.AggregateBase name string }
The embedded aggregate is then initialized by the factory function in the callback repository.
func NewAggregateBase ¶
func NewAggregateBase(id UUID) *AggregateBase
NewAggregateBase creates an aggregate.
func (*AggregateBase) AggregateID ¶
func (a *AggregateBase) AggregateID() UUID
AggregateID returns the ID of the aggregate.
func (*AggregateBase) ClearUncommittedEvents ¶
func (a *AggregateBase) ClearUncommittedEvents()
ClearUncommittedEvents clears all uncommitted events after storing.
func (*AggregateBase) GetUncommittedEvents ¶
func (a *AggregateBase) GetUncommittedEvents() []Event
GetUncommittedEvents gets all uncommitted events for storing.
func (*AggregateBase) IncrementVersion ¶
func (a *AggregateBase) IncrementVersion()
IncrementVersion increments the aggregate version.
func (*AggregateBase) StoreEvent ¶
func (a *AggregateBase) StoreEvent(event Event)
StoreEvent stores an event until as uncommitted.
func (*AggregateBase) Version ¶
func (a *AggregateBase) Version() int
Version returns the version of the aggregate.
type AggregateCommandHandler ¶
type AggregateCommandHandler struct {
// contains filtered or unexported fields
}
AggregateCommandHandler dispatches commands to registered aggregates.
The dispatch process is as follows: 1. The handler receives a command 2. An aggregate is created or rebuilt from previous events by the repository 3. The aggregate's command handler is called 4. The aggregate stores events in response to the command 5. The new events are stored in the event store by the repository 6. The events are published to the event bus when stored by the event store
func NewAggregateCommandHandler ¶
func NewAggregateCommandHandler(repository Repository) (*AggregateCommandHandler, error)
NewAggregateCommandHandler creates a new AggregateCommandHandler.
func (*AggregateCommandHandler) HandleCommand ¶
func (h *AggregateCommandHandler) HandleCommand(command Command) error
HandleCommand handles a command with the registered aggregate. Returns ErrAggregateNotFound if no aggregate could be found.
func (*AggregateCommandHandler) SetAggregate ¶
func (h *AggregateCommandHandler) SetAggregate(aggregateType AggregateType, commandType CommandType) error
SetAggregate sets an aggregate as handler for a command.
type AggregateRecord ¶
type AggregateRecord interface { AggregateID() UUID Version() int EventRecords() []EventRecord }
AggregateRecord is a stored record of an aggregate in form of its events. NOTE: Not currently used.
type Command ¶
type Command interface { // AggregateID returns the ID of the aggregate that the command should be // handled by. AggregateID() UUID // AggregateType returns the type of the aggregate that the command can be // handled by. AggregateType() AggregateType // CommandType returns the type of the command. CommandType() CommandType }
Command is a domain command that is sent to a Dispatcher.
A command name should 1) be in present tense and 2) contain the intent (MoveCustomer vs CorrectCustomerAddress).
The command should contain all the data needed when handling it as fields. These fields can take an optional "eh" tag, which adds properties. For now only "optional" is a valid tag: `eh:"optional"`.
func CreateCommand ¶
func CreateCommand(commandType CommandType) (Command, error)
CreateCommand creates an command of a type with an ID using the factory registered with RegisterCommand.
type CommandBus ¶
type CommandBus interface { // HandleCommand handles a command on the event bus. HandleCommand(Command) error // SetHandler registers a handler with a command. SetHandler(CommandHandler, CommandType) error }
CommandBus is an interface defining an event bus for distributing events.
type CommandFieldError ¶
type CommandFieldError struct {
Field string
}
CommandFieldError is returned by Dispatch when a field is incorrect.
func (CommandFieldError) Error ¶
func (c CommandFieldError) Error() string
type CommandHandler ¶
CommandHandler is an interface that all handlers of commands should implement.
type CommandType ¶
type CommandType string
CommandType is the type of a command, used as its unique identifier.
type Event ¶
type Event interface { // AggregateID returns the ID of the aggregate that the event should be // applied to. AggregateID() UUID // AggregateType returns the type of the aggregate that the event can be // applied to. AggregateType() AggregateType // EventType returns the type of the event. EventType() EventType }
Event is a domain event describing a change that has happened to an aggregate.
An event struct and type name should:
- Be in past tense (CustomerMoved)
- Contain the intent (CustomerMoved vs CustomerAddressCorrected).
The event should contain all the data needed when applying/handling it.
func CreateEvent ¶
CreateEvent creates an event of a type with an ID using the factory registered with RegisterEvent.
type EventBus ¶
type EventBus interface { // PublishEvent publishes an event on the event bus. // Only one handler of each handler type that is registered for the event // will receive it. // All the observers will receive the event. PublishEvent(Event) // AddHandler adds a handler for an event. // TODO: Use a pattern instead of event for what to handle. AddHandler(EventHandler, EventType) // AddObserver adds an observer. // TODO: Add pattern for what to observe. AddObserver(EventObserver) }
EventBus is an interface defining an event bus for distributing events.
type EventHandler ¶
type EventHandler interface { // HandleEvent handles an event. HandleEvent(Event) // HandlerType returns the type of the handler. HandlerType() EventHandlerType }
EventHandler is a handler of events. Only one handler of the same type will receive an event.
type EventHandlerType ¶
type EventHandlerType string
EventHandlerType is the type of an event handler. Used to serve only handle an event by one handler of each type.
type EventObserver ¶
type EventObserver interface { // Notify is notifed about an event. Notify(Event) }
EventObserver is an observer of events. All observers will receive an event.
type EventRecord ¶
type EventRecord interface { // Version of the aggregate for this event (after it has been applied). Version() int // Timestamp of when the event was created. Timestamp() time.Time // The specific event and its data. Event() Event // A string representation of the event. String() string }
EventRecord is a single event with metadata such as the type and timestamp.
type EventSourcingRepository ¶
type EventSourcingRepository struct {
// contains filtered or unexported fields
}
EventSourcingRepository is an aggregate repository using event sourcing. It uses an event store for loading and saving events used to build the aggregate.
func NewEventSourcingRepository ¶
func NewEventSourcingRepository(eventStore EventStore, eventBus EventBus) (*EventSourcingRepository, error)
NewEventSourcingRepository creates a repository that will use an event store and bus.
func (*EventSourcingRepository) Load ¶
func (r *EventSourcingRepository) Load(aggregateType AggregateType, id UUID) (Aggregate, error)
Load loads an aggregate from the event store. It does so by creating a new aggregate of the type with the ID and then applies all events to it, thus making it the most current version of the aggregate.
func (*EventSourcingRepository) Save ¶
func (r *EventSourcingRepository) Save(aggregate Aggregate) error
Save saves all uncommitted events from an aggregate to the event store.
type EventStore ¶
type EventStore interface { // Save appends all events in the event stream to the store. Save(events []Event, originalVersion int) error // Load loads all events for the aggregate id from the store. Load(AggregateType, UUID) ([]EventRecord, error) }
EventStore is an interface for an event sourcing event store.
type EventType ¶
type EventType string
EventType is the type of an event, used as its unique identifier.
type ReadRepository ¶
type ReadRepository interface { // Save saves a read model with id to the repository. Save(UUID, interface{}) error // Find returns one read model with using an id. Find(UUID) (interface{}, error) // FindAll returns all read models in the repository. FindAll() ([]interface{}, error) // Remove removes a read model with id from the repository. Remove(UUID) error }
ReadRepository is a storage for read models.
type Repository ¶
type Repository interface { // Load loads the most recent version of an aggregate with a type and id. Load(AggregateType, UUID) (Aggregate, error) // Save saves the uncommittend events for an aggregate. Save(Aggregate) error }
Repository is a repository responsible for loading and saving aggregates.
type Saga ¶
type Saga interface { // SagaType returns the type of the saga. SagaType() SagaType // RunSaga handles an event in the saga that can return commands. RunSaga(event Event) []Command }
Saga is an interface for a CQRS saga that listens to events and generate commands. It is used for any long lived transaction and can be used to react on multiple events.
type SagaBase ¶
type SagaBase struct {
// contains filtered or unexported fields
}
SagaBase is a CQRS saga base to embed in domain specific sagas.
A typical saga example:
type OrderSaga struct { *eventhorizon.SagaBase amount int }
The implementing saga must set itself as the saga in the saga base.
func NewSagaBase ¶
func NewSagaBase(commandBus CommandBus, saga Saga) *SagaBase
NewSagaBase creates a new SagaBase.
func (*SagaBase) HandleEvent ¶
HandleEvent implements the HandleEvent method of the EventHandler interface.
func (*SagaBase) HandlerType ¶
func (s *SagaBase) HandlerType() EventHandlerType
HandlerType implements the HandlerType method of the EventHandler interface.
type UUID ¶
type UUID string
UUID is a unique identifier, based on the UUID spec. It must be exactly 16 bytes long.
func ParseUUID ¶
ParseUUID parses a UUID from a string representation. ParseUUID creates a UUID object from given hex string representation. The function accepts UUID string in following formats:
ParseUUID("6ba7b814-9dad-11d1-80b4-00c04fd430c8") ParseUUID("{6ba7b814-9dad-11d1-80b4-00c04fd430c8}") ParseUUID("urn:uuid:6ba7b814-9dad-11d1-80b4-00c04fd430c8")
func (UUID) MarshalJSON ¶
MarshalJSON turns UUID into a json.Marshaller.
func (*UUID) UnmarshalJSON ¶
UnmarshalJSON turns *UUID into a json.Unmarshaller.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
commandbus
|
|
eventbus
|
|
eventstore
|
|
examples
|
|
mongodb
Package mongodb contains an example of a CQRS/ES app using the MongoDB adapter.
|
Package mongodb contains an example of a CQRS/ES app using the MongoDB adapter. |
simple
Package simple contains a simple runnable example of a CQRS/ES app.
|
Package simple contains a simple runnable example of a CQRS/ES app. |
readrepository
|
|