README
Event Horizon
Event Horizon is a CQRS/ES toolkit for Go.
Event Horizon is used in at least one production system but should not be considered stable just yet!
CQRS stands for Command Query Responsibility Segregation and is a technique where object access (the Query part) and modification (the Command part) are separated from each other. This helps in designing complex data models where the actions can be totally independent from the data output.
ES stands for Event Sourcing and is a technique where all events that have happened in a system are recorded, and all future actions are based on the events instead of a single data model. The main benefit of adding Event Sourcing is traceability of changes, which can be used, for example, in audit logging. Additionally, "incorrect" events that happened in the past (for example due to a bug) can be edited, which will make the current data "correct", as that is based on the events.
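As a rough illustration of the idea (this is not Event Horizon's API), the sketch below derives the current state purely by replaying recorded events. The `Deposited`, `Withdrawn`, and `replay` names are hypothetical and exist only for this example.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for a recorded domain event.
type Event interface{}

// Deposited and Withdrawn are illustrative events for a bank account.
type Deposited struct{ Amount int }
type Withdrawn struct{ Amount int }

// replay derives the current balance from the event history alone;
// no separate "current state" record is consulted.
func replay(events []Event) int {
	balance := 0
	for _, e := range events {
		switch ev := e.(type) {
		case Deposited:
			balance += ev.Amount
		case Withdrawn:
			balance -= ev.Amount
		}
	}
	return balance
}

func main() {
	history := []Event{Deposited{100}, Withdrawn{30}, Deposited{5}}
	fmt.Println(replay(history)) // prints 75
}
```

Because the balance is computed from the history, correcting a past event and replaying again yields corrected current data.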
Read more about CQRS/ES from one of the major authors/contributors on the subject: http://codebetter.com/gregyoung/2010/02/16/cqrs-task-based-uis-event-sourcing-agh/
Other material on CQRS/ES:
- http://martinfowler.com/bliki/CQRS.html
- http://cqrs.nu
- https://groups.google.com/forum/#!forum/dddcqrs
Inspired by the following libraries/examples:
- https://github.com/edumentab/cqrs-starter-kit
- https://github.com/pjvds/go-cqrs
- http://www.codeproject.com/Articles/555855/Introduction-to-CQRS
- https://github.com/qandidate-labs/broadway
Suggestions are welcome!
Usage
See the example folder for a basic usage example to get you started.
Changes
2015-01-20
Added CommandBus that routes commands to handlers. This is for upcoming Saga support. The dispatcher is now renamed to AggregateCommandHandler and must be added to the CommandBus. At the moment, commands have to be registered both in the handler and on the bus; this may change in the future.
Added MongoDB ReadRepository implementation. Use with "-tags mongo", same as the MongoDB event store.
2015-01-14
Added Repository that creates/loads and saves aggregates. This needed additional methods in the Aggregate interface.
Removed the reflection-based dispatcher: the code performed worse and was harder to test. There was also a bit too much magic going on. If you would like it back, open an issue for further discussion.
Renamed Repository to ReadRepository to better adhere to CQRS standards and to free the name for an Aggregate/Saga repository in development.
2015-01-12
Added an EventStore implementation for MongoDB. It currently uses one document per aggregate, with all events stored as an array, to make the best of MongoDB's lack of transactions. Adding events still takes two operations, but at least there is a check that the version has not been changed by another operation in between. If you want to use the MongoDB event store, add "-tags mongo" to your project build.
2015-01-07
As of this version, commands and events are recommended to be passed around as pointers instead of values, as the previous versions did. Passing as values may still work, but is not tested at the moment. It should not require many changes in applications using Event Horizon: simply pass all commands and events with & before them or create them as *XXXCommand; see the examples and tests for usage. There are also some other API changes to method names, mostly using "handler" as a more common term.
License
Event Horizon is licensed under Apache License 2.0
Documentation
Overview ¶
Package eventhorizon is a CQRS/ES toolkit.
Index ¶
- Variables
- type Aggregate
- type AggregateBase
- type AggregateCommandHandler
- type AggregateRecord
- type CallbackRepository
- type Command
- type CommandBus
- type CommandFieldError
- type CommandHandler
- type Event
- type EventBus
- type EventHandler
- type EventRecord
- type EventStore
- type InternalCommandBus
- type InternalEventBus
- type MemoryEventStore
- type MemoryReadRepository
- type MongoEventStore
- func (s *MongoEventStore) Clear() error
- func (s *MongoEventStore) Close() error
- func (s *MongoEventStore) Load(id string) ([]Event, error)
- func (s *MongoEventStore) RegisterEventType(event Event, factory func() Event) error
- func (s *MongoEventStore) Save(events []Event) error
- func (s *MongoEventStore) SetDB(db string)
- type MongoReadRepository
- func (r *MongoReadRepository) Clear() error
- func (r *MongoReadRepository) Close() error
- func (r *MongoReadRepository) Find(id string) (interface{}, error)
- func (r *MongoReadRepository) FindAll() ([]interface{}, error)
- func (r *MongoReadRepository) FindCustom(callback func(*mgo.Collection) *mgo.Query) ([]interface{}, error)
- func (r *MongoReadRepository) Remove(id string) error
- func (r *MongoReadRepository) Save(id string, model interface{}) error
- func (r *MongoReadRepository) SetDB(db string)
- func (r *MongoReadRepository) SetModel(factory func() interface{})
- type PostgresEventStore
- type PostgresReadRepository
- func (r *PostgresReadRepository) Clear() error
- func (r *PostgresReadRepository) Close() error
- func (r *PostgresReadRepository) Find(id string) (interface{}, error)
- func (r *PostgresReadRepository) FindAll() ([]interface{}, error)
- func (r *PostgresReadRepository) Remove(id string) error
- func (r *PostgresReadRepository) Save(id string, model interface{}) error
- func (r *PostgresReadRepository) SetModel(factory func() interface{})
- type RabbitMQCommandBus
- func (b *RabbitMQCommandBus) Close() error
- func (b *RabbitMQCommandBus) HandleCommand(command Command) error
- func (b *RabbitMQCommandBus) PublishCommand(command Command) error
- func (b *RabbitMQCommandBus) RegisterCommandType(command Command, factory func() Command) error
- func (b *RabbitMQCommandBus) SetHandler(handler CommandHandler, command Command) error
- type RabbitMQEventBus
- func (b *RabbitMQEventBus) AddGlobalHandler(handler EventHandler)
- func (b *RabbitMQEventBus) AddHandler(handler EventHandler, event Event)
- func (b *RabbitMQEventBus) AddLocalHandler(handler EventHandler)
- func (b *RabbitMQEventBus) Close() error
- func (b *RabbitMQEventBus) PublishEvent(event Event)
- func (b *RabbitMQEventBus) RegisterEventType(event Event, factory func() Event) error
- type ReadRepository
- type RedisEventBus
- func (b *RedisEventBus) AddGlobalHandler(handler EventHandler)
- func (b *RedisEventBus) AddHandler(handler EventHandler, event Event)
- func (b *RedisEventBus) AddLocalHandler(handler EventHandler)
- func (b *RedisEventBus) Close() error
- func (b *RedisEventBus) PublishEvent(event Event)
- func (b *RedisEventBus) RegisterEventType(event Event, factory func() Event) error
- type RemoteCommandBus
- type RemoteEventBus
- type RemoteEventStore
- type RemoteHandler
- type RemoteReadRepository
- type Repository
- type TraceEventStore
Constants ¶
This section is empty.
Variables ¶
var ErrAggregateAlreadyRegistered = errors.New("aggregate is already registered")
Error returned when an aggregate is already registered.
var ErrAggregateAlreadySet = errors.New("aggregate is already set")
Error returned when an aggregate is already registered for a command.
var ErrAggregateNotFound = errors.New("no aggregate for command")
Error returned when no aggregate can be found.
var ErrAggregateNotRegistered = errors.New("aggregate is not registered")
Error returned when an aggregate is not registered.
var ErrCouldNotClearDB = errors.New("could not clear database")
ErrCouldNotClearDB returned when the database could not be cleared.
var ErrCouldNotCreateTables = errors.New("could not create tables")
ErrCouldNotCreateTables returned when necessary tables could not be created.
var ErrCouldNotDialDB = errors.New("could not dial database")
ErrCouldNotDialDB returned when the database could not be dialed.
var ErrCouldNotLoadAggregate = errors.New("could not load aggregate")
ErrCouldNotLoadAggregate returned when an aggregate could not be loaded.
var ErrCouldNotMarshalEvent = errors.New("could not marshal event")
ErrCouldNotMarshalEvent returned when an event could not be marshaled into BSON.
var ErrCouldNotSaveAggregate = errors.New("could not save aggregate")
ErrCouldNotSaveAggregate returned when an aggregate could not be saved.
var ErrCouldNotSaveEvent = errors.New("could not save event")
ErrCouldNotSaveEvent returned when an event could not be saved.
var ErrCouldNotSaveModel = errors.New("could not save model")
Error returned when a model could not be saved.
var ErrCouldNotUnmarshalEvent = errors.New("could not unmarshal event")
ErrCouldNotUnmarshalEvent returned when an event could not be unmarshaled into a concrete type.
var ErrEventNotRegistered = errors.New("event not registered")
ErrEventNotRegistered returned when an event is not registered.
var ErrHandlerAlreadySet = errors.New("handler is already set")
ErrHandlerAlreadySet returned when a handler is already registered for a command.
var ErrHandlerNotFound = errors.New("no handlers for command")
ErrHandlerNotFound returned when no handler can be found.
var ErrInvalidEvent = errors.New("invalid event")
ErrInvalidEvent returned when an event does not implement the Event interface.
var ErrModelNotFound = errors.New("could not find model")
Error returned when a model could not be found.
var ErrModelNotSet = errors.New("model not set")
ErrModelNotSet returned when a model is not set on a read repository.
var ErrNilEventStore = errors.New("event store is nil")
Error returned when a dispatcher is created with a nil event store.
var ErrNilRepository = errors.New("repository is nil")
Error returned when a dispatcher is created with a nil repository.
var ErrNoDBSession = errors.New("no database session")
ErrNoDBSession returned when no database session is set.
var ErrNoEventStoreDefined = errors.New("no event store defined")
ErrNoEventStoreDefined returned if no event store has been defined.
var ErrNoEventsFound = errors.New("could not find events")
ErrNoEventsFound returned when no events are found.
var ErrNoEventsToAppend = errors.New("no events to append")
ErrNoEventsToAppend returned when no events are available to append.
Functions ¶
This section is empty.
Types ¶
type Aggregate ¶
type Aggregate interface {
	// AggregateID returns the id of the aggregate.
	AggregateID() string
	// AggregateType returns the type name of the aggregate.
	AggregateType() string
	// Version returns the version of the aggregate.
	Version() int
	// IncrementVersion increments the aggregate version.
	IncrementVersion()
	// HandleCommand handles a command and stores events.
	HandleCommand(Command) error
	// ApplyEvent applies an event to the aggregate by setting its values.
	ApplyEvent(events Event)
	// StoreEvent stores an event as uncommitted.
	StoreEvent(Event)
	// GetUncommittedEvents gets all uncommitted events for storing.
	GetUncommittedEvents() []Event
	// ClearUncommittedEvents clears all uncommitted events after storing.
	ClearUncommittedEvents()
}
Aggregate is an interface representing a versioned data entity created from events. It receives commands and generates events that are stored.
The aggregate is created/loaded and saved by the Repository inside the Dispatcher. A domain specific aggregate can either implement the full interface, or more commonly embed *AggregateBase to take care of the common methods.
type AggregateBase ¶
type AggregateBase struct {
// contains filtered or unexported fields
}
AggregateBase is a CQRS aggregate base to embed in domain specific aggregates.
A typical aggregate example:
type UserAggregate struct {
	*eventhorizon.AggregateBase
	name string
}
The embedded aggregate is then initialized by the factory function in the callback repository.
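To make the embedding pattern concrete, here is a minimal self-contained sketch. It does not import Event Horizon: `base` is a hypothetical stand-in that mirrors the AggregateBase methods listed in this documentation, `Event` and `Command` are reduced to empty interfaces, and the `RenameUser`/`UserRenamed` types are invented for illustration. The apply-then-increment loop in `main` is an assumption about how a repository might use these methods.

```go
package main

import "fmt"

// Event and Command are simplified stand-ins for this sketch.
type (
	Event   interface{}
	Command interface{}
)

// base is a hypothetical stand-in for *eventhorizon.AggregateBase.
type base struct {
	id          string
	version     int
	uncommitted []Event
}

func newBase(id string) *base                 { return &base{id: id} }
func (b *base) AggregateID() string           { return b.id }
func (b *base) Version() int                  { return b.version }
func (b *base) IncrementVersion()             { b.version++ }
func (b *base) StoreEvent(e Event)            { b.uncommitted = append(b.uncommitted, e) }
func (b *base) GetUncommittedEvents() []Event { return b.uncommitted }
func (b *base) ClearUncommittedEvents()       { b.uncommitted = nil }

// RenameUser is an illustrative command; UserRenamed is the resulting event.
type RenameUser struct{ Name string }
type UserRenamed struct{ Name string }

// UserAggregate embeds the base and adds only the domain-specific methods.
type UserAggregate struct {
	*base
	name string
}

func (a *UserAggregate) AggregateType() string { return "User" }

// HandleCommand checks the command type and stores the resulting event.
func (a *UserAggregate) HandleCommand(c Command) error {
	cmd, ok := c.(*RenameUser)
	if !ok {
		return fmt.Errorf("unknown command %T", c)
	}
	a.StoreEvent(&UserRenamed{Name: cmd.Name})
	return nil
}

// ApplyEvent updates the aggregate's state from an event.
func (a *UserAggregate) ApplyEvent(e Event) {
	if ev, ok := e.(*UserRenamed); ok {
		a.name = ev.Name
	}
}

func main() {
	user := &UserAggregate{base: newBase("user-1")}
	if err := user.HandleCommand(&RenameUser{Name: "Ada"}); err != nil {
		fmt.Println("error:", err)
		return
	}
	// Assumed flow: apply each stored event, bump the version, then clear.
	for _, e := range user.GetUncommittedEvents() {
		user.ApplyEvent(e)
		user.IncrementVersion()
	}
	user.ClearUncommittedEvents()
	fmt.Println(user.AggregateID(), user.name, user.Version())
}
```

The domain type only has to supply AggregateType, HandleCommand, and ApplyEvent; everything else is promoted from the embedded base.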
func NewAggregateBase ¶
func NewAggregateBase(id string) *AggregateBase
NewAggregateBase creates an aggregate.
func (*AggregateBase) AggregateID ¶
func (a *AggregateBase) AggregateID() string
AggregateID returns the ID of the aggregate.
func (*AggregateBase) ClearUncommittedEvents ¶
func (a *AggregateBase) ClearUncommittedEvents()
ClearUncommittedEvents clears all uncommitted events after storing.
func (*AggregateBase) GetUncommittedEvents ¶
func (a *AggregateBase) GetUncommittedEvents() []Event
GetUncommittedEvents gets all uncommitted events for storing.
func (*AggregateBase) IncrementVersion ¶
func (a *AggregateBase) IncrementVersion()
IncrementVersion increments the aggregate version.
func (*AggregateBase) StoreEvent ¶
func (a *AggregateBase) StoreEvent(event Event)
StoreEvent stores an event as uncommitted.
func (*AggregateBase) Version ¶
func (a *AggregateBase) Version() int
Version returns the version of the aggregate.
type AggregateCommandHandler ¶
type AggregateCommandHandler struct {
// contains filtered or unexported fields
}
AggregateCommandHandler dispatches commands to registered aggregates.
The dispatch process is as follows:
1. The handler receives a command
2. An aggregate is created or rebuilt from previous events by the repository
3. The aggregate's command handler is called
4. The aggregate stores events in response to the command
5. The new events are stored in the event store by the repository
6. The events are published to the event bus when stored by the event store
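The dispatch steps can be sketched end to end with a few local types. This is a simplified illustration under stated assumptions, not the library's implementation: `counter`, `Add`, `Added`, `repo`, and `handle` are hypothetical names, and a single `repo` struct plays the role of repository, event store, and event bus.

```go
package main

import (
	"errors"
	"fmt"
)

// Event and Command are simplified stand-ins for this sketch.
type (
	Event   interface{}
	Command interface{}
)

// Add is an illustrative command; Added is the event it produces.
type Add struct {
	ID string
	N  int
}
type Added struct{ N int }

// counter is a hypothetical aggregate holding a running total.
type counter struct {
	id          string
	total       int
	uncommitted []Event
}

// HandleCommand stores an event in response to the command (steps 3 and 4).
func (c *counter) HandleCommand(cmd Command) error {
	add, ok := cmd.(*Add)
	if !ok {
		return errors.New("unknown command")
	}
	c.uncommitted = append(c.uncommitted, &Added{N: add.N})
	return nil
}

// ApplyEvent sets the aggregate's values from an event.
func (c *counter) ApplyEvent(e Event) {
	if a, ok := e.(*Added); ok {
		c.total += a.N
	}
}

// repo plays the role of repository, event store, and event bus at once.
type repo struct {
	events    map[string][]Event
	published []Event
}

// load rebuilds the aggregate from its previous events (step 2).
func (r *repo) load(id string) *counter {
	c := &counter{id: id}
	for _, e := range r.events[id] {
		c.ApplyEvent(e)
	}
	return c
}

// save stores the new events and records them as published (steps 5 and 6).
func (r *repo) save(c *counter) {
	r.events[c.id] = append(r.events[c.id], c.uncommitted...)
	r.published = append(r.published, c.uncommitted...)
	c.uncommitted = nil
}

// handle receives a command and runs the whole process (step 1).
func handle(r *repo, cmd *Add) error {
	agg := r.load(cmd.ID)
	if err := agg.HandleCommand(cmd); err != nil {
		return err
	}
	r.save(agg)
	return nil
}

func main() {
	r := &repo{events: map[string][]Event{}}
	for _, n := range []int{2, 3} {
		if err := handle(r, &Add{ID: "c1", N: n}); err != nil {
			fmt.Println("error:", err)
		}
	}
	fmt.Println(r.load("c1").total, len(r.published)) // prints 5 2
}
```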
func NewAggregateCommandHandler ¶
func NewAggregateCommandHandler(repository Repository) (*AggregateCommandHandler, error)
NewAggregateCommandHandler creates a new AggregateCommandHandler.
func (*AggregateCommandHandler) HandleCommand ¶
func (h *AggregateCommandHandler) HandleCommand(command Command) error
HandleCommand handles a command with the registered aggregate. Returns ErrAggregateNotFound if no aggregate could be found.
func (*AggregateCommandHandler) SetAggregate ¶
func (h *AggregateCommandHandler) SetAggregate(aggregate Aggregate, command Command) error
SetAggregate sets an aggregate as handler for a command.
type AggregateRecord ¶
type AggregateRecord interface {
	AggregateID() string
	Version() int
	EventRecords() []EventRecord
}
AggregateRecord is a stored record of an aggregate in form of its events.
type CallbackRepository ¶
type CallbackRepository struct {
// contains filtered or unexported fields
}
CallbackRepository is an aggregate repository using factory functions.
func NewCallbackRepository ¶
func NewCallbackRepository(eventStore EventStore) (*CallbackRepository, error)
NewCallbackRepository creates a repository and associates it with an event store.
func (*CallbackRepository) Load ¶
func (r *CallbackRepository) Load(aggregateType string, id string) (Aggregate, error)
Load loads an aggregate by creating it and applying all events.
func (*CallbackRepository) RegisterAggregate ¶
func (r *CallbackRepository) RegisterAggregate(aggregate Aggregate, callback func(string) Aggregate) error
RegisterAggregate registers an aggregate factory for a type. The factory is used to create concrete aggregate types when loading from the database.
An example would be:
repository.RegisterAggregate(&UserAggregate{}, func(id string) eventhorizon.Aggregate {
	return &UserAggregate{AggregateBase: eventhorizon.NewAggregateBase(id)}
})
func (*CallbackRepository) Save ¶
func (r *CallbackRepository) Save(aggregate Aggregate) error
Save saves all uncommitted events from an aggregate.
type Command ¶
Command is a domain command that is sent to a Dispatcher.
A command name should 1) be in present tense and 2) contain the intent (MoveCustomer vs CorrectCustomerAddress).
The command should contain all the data needed when handling it as fields. These fields can take an optional "eh" tag, which adds properties. For now only "optional" is a valid tag: `eh:"optional"`.
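One way such a tag could be interpreted is sketched below: every field without `eh:"optional"` must be non-zero, otherwise a field error is returned. This is an assumption about the behaviour, not the library's actual validation code; `checkFields` and `CreateUser` are hypothetical, and the local `CommandFieldError` only mirrors the documented type (its message text is invented).

```go
package main

import (
	"fmt"
	"reflect"
)

// CommandFieldError mirrors the documented type; the message text is assumed.
type CommandFieldError struct {
	Field string
}

func (c CommandFieldError) Error() string { return "missing field: " + c.Field }

// CreateUser is an illustrative command with one optional field.
type CreateUser struct {
	Name     string
	Nickname string `eh:"optional"`
}

// checkFields returns a CommandFieldError for the first field that is
// unset (zero) and not tagged as optional.
func checkFields(cmd interface{}) error {
	v := reflect.Indirect(reflect.ValueOf(cmd))
	if v.Kind() != reflect.Struct {
		return fmt.Errorf("command must be a struct or pointer to struct")
	}
	t := v.Type()
	for i := 0; i < v.NumField(); i++ {
		field := t.Field(i)
		if field.Tag.Get("eh") == "optional" {
			continue // optional fields may be left empty
		}
		if v.Field(i).IsZero() {
			return CommandFieldError{Field: field.Name}
		}
	}
	return nil
}

func main() {
	fmt.Println(checkFields(&CreateUser{Name: "Ada"}))      // no error
	fmt.Println(checkFields(&CreateUser{Nickname: "ada"})) // Name is missing
}
```

Treating the zero value as "missing" is a simplification: it cannot distinguish an unset integer from a deliberate 0.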
type CommandBus ¶
type CommandBus interface {
	// PublishCommand publishes a command on the command bus.
	PublishCommand(Command) error
	// HandleCommand handles a command on the command bus.
	HandleCommand(Command) error
	// SetHandler registers a handler with a command.
	SetHandler(CommandHandler, Command) error
}
CommandBus is an interface defining a command bus for distributing commands.
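A minimal sketch of how a bus might route commands to handlers follows. It is self-contained and makes several assumptions: `CommandHandler` is taken to have a single `HandleCommand(Command) error` method (its definition is not shown here), commands are keyed by their Go type via reflection, and `simpleBus` and `handlerFunc` are hypothetical names. The two error values reuse the messages documented under Variables.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// Command is a simplified stand-in for this sketch.
type Command interface{}

// CommandHandler is assumed to look like this.
type CommandHandler interface {
	HandleCommand(Command) error
}

var (
	ErrHandlerAlreadySet = errors.New("handler is already set")
	ErrHandlerNotFound   = errors.New("no handlers for command")
)

// simpleBus routes each command type to exactly one handler.
type simpleBus struct {
	handlers map[reflect.Type]CommandHandler
}

func newSimpleBus() *simpleBus {
	return &simpleBus{handlers: map[reflect.Type]CommandHandler{}}
}

// SetHandler registers a handler for the type of the given command.
func (b *simpleBus) SetHandler(h CommandHandler, c Command) error {
	t := reflect.TypeOf(c)
	if _, ok := b.handlers[t]; ok {
		return ErrHandlerAlreadySet
	}
	b.handlers[t] = h
	return nil
}

// HandleCommand looks up the handler by command type and invokes it.
func (b *simpleBus) HandleCommand(c Command) error {
	h, ok := b.handlers[reflect.TypeOf(c)]
	if !ok {
		return ErrHandlerNotFound
	}
	return h.HandleCommand(c)
}

// handlerFunc adapts a plain function to the CommandHandler interface.
type handlerFunc func(Command) error

func (f handlerFunc) HandleCommand(c Command) error { return f(c) }

// Ping is an illustrative command.
type Ping struct{}

func main() {
	bus := newSimpleBus()
	h := handlerFunc(func(c Command) error {
		fmt.Printf("handled %T\n", c)
		return nil
	})
	fmt.Println(bus.SetHandler(h, &Ping{}))
	fmt.Println(bus.HandleCommand(&Ping{}))
	fmt.Println(bus.HandleCommand("unregistered"))
}
```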
func NewInternalCommandBus ¶
func NewInternalCommandBus() CommandBus
NewInternalCommandBus creates an InternalCommandBus.
type CommandFieldError ¶
type CommandFieldError struct {
Field string
}
CommandFieldError is returned by Dispatch when a field is incorrect.
func (CommandFieldError) Error ¶
func (c CommandFieldError) Error() string
type CommandHandler ¶
CommandHandler is an interface that all handlers of commands should implement.
type Event ¶
Event is a domain event describing a change that has happened to an aggregate.
An event name should 1) be in past tense and 2) contain the intent (CustomerMoved vs CustomerAddressCorrected).
The event should contain all the data needed when applying/handling it.
type EventBus ¶
type EventBus interface {
	// PublishEvent publishes an event on the event bus.
	PublishEvent(Event)
	// AddHandler adds a handler for a specific local event.
	AddHandler(EventHandler, Event)
	// AddLocalHandler adds a handler for local events.
	AddLocalHandler(EventHandler)
	// AddGlobalHandler adds a handler for global (remote) events.
	AddGlobalHandler(EventHandler)
}
EventBus is an interface defining an event bus for distributing events.
type EventHandler ¶
type EventHandler interface {
	// HandleEvent handles an event.
	HandleEvent(Event)
}
EventHandler is an interface that all handlers of events should implement.
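The relationship between an event bus and its handlers can be illustrated with a small in-process sketch. It assumes an `Event` that exposes an `EventType() string` method (the real Event definition is not shown here), and `localBus`, `recorder`, and `UserCreated` are hypothetical names. Global (remote) handlers are left out.

```go
package main

import "fmt"

// Event is assumed to expose a type name for routing.
type Event interface {
	EventType() string
}

// EventHandler matches the interface documented above.
type EventHandler interface {
	HandleEvent(Event)
}

// localBus notifies handlers registered for a specific event type,
// followed by handlers registered for all local events.
type localBus struct {
	byType map[string][]EventHandler
	local  []EventHandler
}

func newLocalBus() *localBus {
	return &localBus{byType: map[string][]EventHandler{}}
}

func (b *localBus) AddHandler(h EventHandler, e Event) {
	b.byType[e.EventType()] = append(b.byType[e.EventType()], h)
}

func (b *localBus) AddLocalHandler(h EventHandler) {
	b.local = append(b.local, h)
}

func (b *localBus) PublishEvent(e Event) {
	for _, h := range b.byType[e.EventType()] {
		h.HandleEvent(e)
	}
	for _, h := range b.local {
		h.HandleEvent(e)
	}
}

// recorder is a handler that remembers the types of the events it saw.
type recorder struct{ seen []string }

func (r *recorder) HandleEvent(e Event) { r.seen = append(r.seen, e.EventType()) }

// UserCreated and UserRemoved are illustrative events.
type UserCreated struct{}
type UserRemoved struct{}

func (*UserCreated) EventType() string { return "UserCreated" }
func (*UserRemoved) EventType() string { return "UserRemoved" }

func main() {
	bus := newLocalBus()
	specific, all := &recorder{}, &recorder{}
	bus.AddHandler(specific, &UserCreated{})
	bus.AddLocalHandler(all)

	bus.PublishEvent(&UserCreated{})
	bus.PublishEvent(&UserRemoved{})
	fmt.Println(specific.seen, all.seen)
}
```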
type EventRecord ¶
EventRecord is a single event record with a timestamp.
type EventStore ¶
type EventStore interface {
	// Save appends all events in the event stream to the store.
	Save([]Event) error
	// Load loads all events for the aggregate id from the store.
	Load(string) ([]Event, error)
}
EventStore is an interface for an event sourcing event store.
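For orientation, a map-backed type satisfying the two methods above might look like the following sketch. It assumes an `Event` with an `AggregateID() string` method so events can be grouped per aggregate (the real Event definition is not shown here); `mapStore` and `UserCreated` are hypothetical. The error values reuse the messages listed under Variables, and the sketch is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// Event is assumed to know which aggregate it belongs to.
type Event interface {
	AggregateID() string
}

var (
	ErrNoEventsToAppend = errors.New("no events to append")
	ErrNoEventsFound    = errors.New("could not find events")
)

// mapStore keeps the event stream of each aggregate in a map.
type mapStore struct {
	events map[string][]Event
}

func newMapStore() *mapStore {
	return &mapStore{events: map[string][]Event{}}
}

// Save appends each event to the stream of its aggregate.
func (s *mapStore) Save(events []Event) error {
	if len(events) == 0 {
		return ErrNoEventsToAppend
	}
	for _, e := range events {
		id := e.AggregateID()
		s.events[id] = append(s.events[id], e)
	}
	return nil
}

// Load returns the stored stream for an aggregate id.
func (s *mapStore) Load(id string) ([]Event, error) {
	events, ok := s.events[id]
	if !ok {
		return nil, ErrNoEventsFound
	}
	return events, nil
}

// UserCreated is an illustrative event.
type UserCreated struct{ ID, Name string }

func (e *UserCreated) AggregateID() string { return e.ID }

func main() {
	store := newMapStore()
	_ = store.Save([]Event{&UserCreated{ID: "u1", Name: "Ada"}})
	events, err := store.Load("u1")
	fmt.Println(len(events), err)
	_, err = store.Load("missing")
	fmt.Println(err)
}
```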
type InternalCommandBus ¶
type InternalCommandBus struct {
// contains filtered or unexported fields
}
InternalCommandBus is a command bus that handles commands with the registered CommandHandlers.
func (*InternalCommandBus) HandleCommand ¶
func (b *InternalCommandBus) HandleCommand(command Command) error
HandleCommand handles a command with a handler capable of handling it.
func (*InternalCommandBus) PublishCommand ¶
func (b *InternalCommandBus) PublishCommand(command Command) error
PublishCommand publishes a command to the internal command bus.
func (*InternalCommandBus) SetHandler ¶
func (b *InternalCommandBus) SetHandler(handler CommandHandler, command Command) error
SetHandler adds a handler for a specific command.
type InternalEventBus ¶
type InternalEventBus struct {
// contains filtered or unexported fields
}
InternalEventBus is an event bus that notifies registered EventHandlers of published events.
func NewInternalEventBus ¶
func NewInternalEventBus() *InternalEventBus
NewInternalEventBus creates an InternalEventBus.
func (*InternalEventBus) AddGlobalHandler ¶
func (b *InternalEventBus) AddGlobalHandler(handler EventHandler)
AddGlobalHandler adds a handler for global (remote) events.
func (*InternalEventBus) AddHandler ¶
func (b *InternalEventBus) AddHandler(handler EventHandler, event Event)
AddHandler adds a handler for a specific local event.
func (*InternalEventBus) AddLocalHandler ¶
func (b *InternalEventBus) AddLocalHandler(handler EventHandler)
AddLocalHandler adds a handler for local events.
func (*InternalEventBus) PublishEvent ¶
func (b *InternalEventBus) PublishEvent(event Event)
PublishEvent publishes an event to all handlers capable of handling it.
type MemoryEventStore ¶
type MemoryEventStore struct {
// contains filtered or unexported fields
}
MemoryEventStore implements EventStore as an in memory structure.
func NewMemoryEventStore ¶
func NewMemoryEventStore(eventBus EventBus) *MemoryEventStore
NewMemoryEventStore creates a new MemoryEventStore.
func (*MemoryEventStore) Load ¶
func (s *MemoryEventStore) Load(id string) ([]Event, error)
Load loads all events for the aggregate id from the memory store. Returns ErrNoEventsFound if no events can be found.
func (*MemoryEventStore) Save ¶
func (s *MemoryEventStore) Save(events []Event) error
Save appends all events in the event stream to the memory store.
type MemoryReadRepository ¶
type MemoryReadRepository struct {
// contains filtered or unexported fields
}
MemoryReadRepository implements an in memory repository of read models.
func NewMemoryReadRepository ¶
func NewMemoryReadRepository() *MemoryReadRepository
NewMemoryReadRepository creates a new MemoryReadRepository.
func (*MemoryReadRepository) Find ¶
func (r *MemoryReadRepository) Find(id string) (interface{}, error)
Find returns one read model by id. Returns ErrModelNotFound if no model could be found.
func (*MemoryReadRepository) FindAll ¶
func (r *MemoryReadRepository) FindAll() ([]interface{}, error)
FindAll returns all read models in the repository.
func (*MemoryReadRepository) Remove ¶
func (r *MemoryReadRepository) Remove(id string) error
Remove removes a read model with id from the repository. Returns ErrModelNotFound if no model could be found.
func (*MemoryReadRepository) Save ¶
func (r *MemoryReadRepository) Save(id string, model interface{}) error
Save saves a read model with id to the repository.
type MongoEventStore ¶
type MongoEventStore struct {
// contains filtered or unexported fields
}
MongoEventStore implements an EventStore for MongoDB.
func NewMongoEventStore ¶
func NewMongoEventStore(eventBus EventBus, url, database string) (*MongoEventStore, error)
NewMongoEventStore creates a new MongoEventStore.
func NewMongoEventStoreWithSession ¶
func NewMongoEventStoreWithSession(eventBus EventBus, session *mgo.Session, database string) (*MongoEventStore, error)
NewMongoEventStoreWithSession creates a new MongoEventStore with a session.
func (*MongoEventStore) Clear ¶
func (s *MongoEventStore) Clear() error
Clear clears the event storage.
func (*MongoEventStore) Close ¶
func (s *MongoEventStore) Close() error
Close closes the database session.
func (*MongoEventStore) Load ¶
func (s *MongoEventStore) Load(id string) ([]Event, error)
Load loads all events for the aggregate id from the database. Returns ErrNoEventsFound if no events can be found.
func (*MongoEventStore) RegisterEventType ¶
func (s *MongoEventStore) RegisterEventType(event Event, factory func() Event) error
RegisterEventType registers an event factory for an event type. The factory is used to create concrete event types when loading from the database.
An example would be:
eventStore.RegisterEventType(&MyEvent{}, func() Event { return &MyEvent{} })
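Why a factory is needed can be seen in a small sketch: stored data carries only a type name and a payload, so the store needs a way to produce an empty concrete value to decode into. The code below is an illustration under assumptions, not the MongoDB store's code: it uses JSON from the standard library instead of BSON, assumes `Event` exposes `EventType() string`, and `registry` and `decode` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Event is assumed to expose a type name used as the registry key.
type Event interface {
	EventType() string
}

var ErrEventNotRegistered = errors.New("event not registered")

// registry maps event type names to factories for concrete types.
type registry struct {
	factories map[string]func() Event
}

func newRegistry() *registry {
	return &registry{factories: map[string]func() Event{}}
}

// RegisterEventType remembers the factory under the event's type name.
func (r *registry) RegisterEventType(event Event, factory func() Event) error {
	r.factories[event.EventType()] = factory
	return nil
}

// decode creates an empty concrete event and unmarshals the payload into it.
func (r *registry) decode(eventType string, data []byte) (Event, error) {
	factory, ok := r.factories[eventType]
	if !ok {
		return nil, ErrEventNotRegistered
	}
	event := factory()
	if err := json.Unmarshal(data, event); err != nil {
		return nil, err
	}
	return event, nil
}

// MyEvent is an illustrative event.
type MyEvent struct{ Message string }

func (*MyEvent) EventType() string { return "MyEvent" }

func main() {
	r := newRegistry()
	_ = r.RegisterEventType(&MyEvent{}, func() Event { return &MyEvent{} })

	event, err := r.decode("MyEvent", []byte(`{"Message":"hello"}`))
	fmt.Printf("%#v %v\n", event, err)

	_, err = r.decode("Unknown", nil)
	fmt.Println(err)
}
```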
func (*MongoEventStore) Save ¶
func (s *MongoEventStore) Save(events []Event) error
Save appends all events in the event stream to the database.
func (*MongoEventStore) SetDB ¶
func (s *MongoEventStore) SetDB(db string)
SetDB sets the database session.
type MongoReadRepository ¶
type MongoReadRepository struct {
// contains filtered or unexported fields
}
MongoReadRepository implements a MongoDB repository of read models.
func NewMongoReadRepository ¶
func NewMongoReadRepository(url, database, collection string) (*MongoReadRepository, error)
NewMongoReadRepository creates a new MongoReadRepository.
func NewMongoReadRepositoryWithSession ¶
func NewMongoReadRepositoryWithSession(session *mgo.Session, database, collection string) (*MongoReadRepository, error)
NewMongoReadRepositoryWithSession creates a new MongoReadRepository with a session.
func (*MongoReadRepository) Clear ¶
func (r *MongoReadRepository) Clear() error
Clear clears the read model database.
func (*MongoReadRepository) Close ¶
func (r *MongoReadRepository) Close() error
Close closes a database session.
func (*MongoReadRepository) Find ¶
func (r *MongoReadRepository) Find(id string) (interface{}, error)
Find returns one read model by id. Returns ErrModelNotFound if no model could be found.
func (*MongoReadRepository) FindAll ¶
func (r *MongoReadRepository) FindAll() ([]interface{}, error)
FindAll returns all read models in the repository.
func (*MongoReadRepository) FindCustom ¶
func (r *MongoReadRepository) FindCustom(callback func(*mgo.Collection) *mgo.Query) ([]interface{}, error)
FindCustom uses a callback to specify a custom query.
func (*MongoReadRepository) Remove ¶
func (r *MongoReadRepository) Remove(id string) error
Remove removes a read model with id from the repository. Returns ErrModelNotFound if no model could be found.
func (*MongoReadRepository) Save ¶
func (r *MongoReadRepository) Save(id string, model interface{}) error
Save saves a read model with id to the repository.
func (*MongoReadRepository) SetDB ¶
func (r *MongoReadRepository) SetDB(db string)
SetDB sets the database session and database.
func (*MongoReadRepository) SetModel ¶
func (r *MongoReadRepository) SetModel(factory func() interface{})
SetModel sets a factory function that creates concrete model types.
type PostgresEventStore ¶
type PostgresEventStore struct {
// contains filtered or unexported fields
}
PostgresEventStore implements an EventStore for Postgres.
func NewPostgresEventStore ¶
func NewPostgresEventStore(eventBus EventBus, conn string) (*PostgresEventStore, error)
NewPostgresEventStore creates a new PostgresEventStore.
func (*PostgresEventStore) Clear ¶
func (s *PostgresEventStore) Clear() error
Clear clears the postgres storage.
func (*PostgresEventStore) Close ¶
func (s *PostgresEventStore) Close() error
Close closes the postgres db connection.
func (*PostgresEventStore) Load ¶
func (s *PostgresEventStore) Load(id string) ([]Event, error)
Load loads all events for the aggregate id from the store.
func (*PostgresEventStore) RegisterEventType ¶
func (s *PostgresEventStore) RegisterEventType(event Event, factory func() Event) error
RegisterEventType registers an event factory for an event type. The factory is used to create concrete event types when loading from the database.
An example would be:
eventStore.RegisterEventType(&MyEvent{}, func() Event { return &MyEvent{} })
func (*PostgresEventStore) Save ¶
func (s *PostgresEventStore) Save(events []Event) error
Save appends all events in the event stream to the store.
type PostgresReadRepository ¶
type PostgresReadRepository struct {
// contains filtered or unexported fields
}
PostgresReadRepository implements a Postgres repository of read models.
func NewPostgresReadRepository ¶
func NewPostgresReadRepository(conn, table string) (*PostgresReadRepository, error)
NewPostgresReadRepository creates a new PostgresReadRepository.
func (*PostgresReadRepository) Clear ¶
func (r *PostgresReadRepository) Clear() error
Clear clears the read model table.
func (*PostgresReadRepository) Close ¶
func (r *PostgresReadRepository) Close() error
Close closes the postgres db connection.
func (*PostgresReadRepository) Find ¶
func (r *PostgresReadRepository) Find(id string) (interface{}, error)
Find returns one read model by id.
func (*PostgresReadRepository) FindAll ¶
func (r *PostgresReadRepository) FindAll() ([]interface{}, error)
FindAll returns all read models in the repository.
func (*PostgresReadRepository) Remove ¶
func (r *PostgresReadRepository) Remove(id string) error
Remove removes a read model with id from the repository.
func (*PostgresReadRepository) Save ¶
func (r *PostgresReadRepository) Save(id string, model interface{}) error
Save saves a read model with id to the repository.
func (*PostgresReadRepository) SetModel ¶
func (r *PostgresReadRepository) SetModel(factory func() interface{})
SetModel sets a factory function that creates concrete model types.
type RabbitMQCommandBus ¶
type RabbitMQCommandBus struct {
// contains filtered or unexported fields
}
RabbitMQCommandBus implements CommandBus using RabbitMQ.
func NewRabbitMQCommandBus ¶
func NewRabbitMQCommandBus(amqpURI, app, tag string) (*RabbitMQCommandBus, error)
NewRabbitMQCommandBus creates a new RabbitMQ command bus. amqpURI is the URI of the RabbitMQ server. app provides a namespace for this application, allowing multiple command buses to run on one RabbitMQ server without conflicting with each other. tag is used as the RabbitMQ consumer tag for this bus.
func (*RabbitMQCommandBus) Close ¶
func (b *RabbitMQCommandBus) Close() error
Close closes the command bus, closing the rabbitmq connection.
func (*RabbitMQCommandBus) HandleCommand ¶
func (b *RabbitMQCommandBus) HandleCommand(command Command) error
HandleCommand handles a command, dispatching it to the proper handlers.
func (*RabbitMQCommandBus) PublishCommand ¶
func (b *RabbitMQCommandBus) PublishCommand(command Command) error
PublishCommand publishes a command to the commands exchange.
func (*RabbitMQCommandBus) RegisterCommandType ¶
func (b *RabbitMQCommandBus) RegisterCommandType(command Command, factory func() Command) error
RegisterCommandType registers a command factory for a specific command.
func (*RabbitMQCommandBus) SetHandler ¶
func (b *RabbitMQCommandBus) SetHandler(handler CommandHandler, command Command) error
SetHandler sets a handler for a specific command.
type RabbitMQEventBus ¶
type RabbitMQEventBus struct {
// contains filtered or unexported fields
}
RabbitMQEventBus implements EventBus using RabbitMQ.
func NewRabbitMQEventBus ¶
func NewRabbitMQEventBus(amqpURI, app, tag string) (*RabbitMQEventBus, error)
NewRabbitMQEventBus creates a new RabbitMQ event bus. amqpURI is the URI of the RabbitMQ server. app provides a namespace for this application, allowing multiple event buses to run on one RabbitMQ server without conflicting with each other. tag is used as the RabbitMQ consumer tag for this bus.
func (*RabbitMQEventBus) AddGlobalHandler ¶
func (b *RabbitMQEventBus) AddGlobalHandler(handler EventHandler)
AddGlobalHandler adds a handler for global (remote) events.
func (*RabbitMQEventBus) AddHandler ¶
func (b *RabbitMQEventBus) AddHandler(handler EventHandler, event Event)
AddHandler adds a handler for a specific local event.
func (*RabbitMQEventBus) AddLocalHandler ¶
func (b *RabbitMQEventBus) AddLocalHandler(handler EventHandler)
AddLocalHandler adds a handler for local events.
func (*RabbitMQEventBus) Close ¶
func (b *RabbitMQEventBus) Close() error
Close closes the event bus, closing the rabbitmq connection.
func (*RabbitMQEventBus) PublishEvent ¶
func (b *RabbitMQEventBus) PublishEvent(event Event)
PublishEvent publishes an event on the RabbitMQ event bus.
func (*RabbitMQEventBus) RegisterEventType ¶
func (b *RabbitMQEventBus) RegisterEventType(event Event, factory func() Event) error
RegisterEventType registers an event factory for a specific event.
type ReadRepository ¶
type ReadRepository interface {
	// Save saves a read model with id to the repository.
	Save(string, interface{}) error
	// Find returns one read model by id.
	Find(string) (interface{}, error)
	// FindAll returns all read models in the repository.
	FindAll() ([]interface{}, error)
	// Remove removes a read model with id from the repository.
	Remove(string) error
}
ReadRepository is a storage for read models.
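A map-backed type with the same four methods gives a feel for the contract. This is a sketch under assumptions rather than the MemoryReadRepository source: `mapRepository` and `UserView` are hypothetical, `ErrModelNotFound` reuses the message listed under Variables, and no locking is done, so it is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

var ErrModelNotFound = errors.New("could not find model")

// mapRepository stores read models by id in a map.
type mapRepository struct {
	models map[string]interface{}
}

func newMapRepository() *mapRepository {
	return &mapRepository{models: map[string]interface{}{}}
}

// Save stores or overwrites the read model with the given id.
func (r *mapRepository) Save(id string, model interface{}) error {
	r.models[id] = model
	return nil
}

// Find returns the read model with the given id.
func (r *mapRepository) Find(id string) (interface{}, error) {
	model, ok := r.models[id]
	if !ok {
		return nil, ErrModelNotFound
	}
	return model, nil
}

// FindAll returns all read models in unspecified order.
func (r *mapRepository) FindAll() ([]interface{}, error) {
	all := make([]interface{}, 0, len(r.models))
	for _, model := range r.models {
		all = append(all, model)
	}
	return all, nil
}

// Remove deletes the read model with the given id.
func (r *mapRepository) Remove(id string) error {
	if _, ok := r.models[id]; !ok {
		return ErrModelNotFound
	}
	delete(r.models, id)
	return nil
}

// UserView is an illustrative read model.
type UserView struct{ Name string }

func main() {
	repo := newMapRepository()
	_ = repo.Save("u1", &UserView{Name: "Ada"})
	model, err := repo.Find("u1")
	fmt.Println(model.(*UserView).Name, err)
	fmt.Println(repo.Remove("u1"), repo.Remove("u1"))
}
```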
type RedisEventBus ¶
type RedisEventBus struct {
// contains filtered or unexported fields
}
RedisEventBus is an event bus that notifies registered EventHandlers of published events.
func NewRedisEventBus ¶
func NewRedisEventBus(appID, server, password string) (*RedisEventBus, error)
NewRedisEventBus creates a RedisEventBus for remote events.
func NewRedisEventBusWithPool ¶
func NewRedisEventBusWithPool(appID string, pool *redis.Pool) (*RedisEventBus, error)
NewRedisEventBusWithPool creates a RedisEventBus for remote events.
func (*RedisEventBus) AddGlobalHandler ¶
func (b *RedisEventBus) AddGlobalHandler(handler EventHandler)
AddGlobalHandler adds a handler for global (remote) events.
func (*RedisEventBus) AddHandler ¶
func (b *RedisEventBus) AddHandler(handler EventHandler, event Event)
AddHandler adds a handler for a specific local event.
func (*RedisEventBus) AddLocalHandler ¶
func (b *RedisEventBus) AddLocalHandler(handler EventHandler)
AddLocalHandler adds a handler for local events.
func (*RedisEventBus) Close ¶
func (b *RedisEventBus) Close() error
Close exits the receive goroutine by unsubscribing from all channels.
func (*RedisEventBus) PublishEvent ¶
func (b *RedisEventBus) PublishEvent(event Event)
PublishEvent publishes an event to all handlers capable of handling it.
func (*RedisEventBus) RegisterEventType ¶
func (b *RedisEventBus) RegisterEventType(event Event, factory func() Event) error
RegisterEventType registers an event factory for an event type. The factory is used to create concrete event types when receiving from subscriptions.
An example would be:
eventBus.RegisterEventType(&MyEvent{}, func() Event { return &MyEvent{} })
type RemoteCommandBus ¶
type RemoteCommandBus interface {
	CommandBus
	RegisterCommandType(command Command, factory func() Command) error
	Close() error
}
RemoteCommandBus is a command bus that uses a networked service.
type RemoteEventBus ¶
type RemoteEventBus interface {
	EventBus
	RemoteHandler
	// Close cleans up any connections
	Close() error
}
RemoteEventBus is an EventBus that uses a networked service.
type RemoteEventStore ¶
type RemoteEventStore interface {
	EventStore
	RemoteHandler
	// Close the event store.
	Close() error
	// Clear deletes all data from the event store.
	Clear() error
}
RemoteEventStore is a store that is remote, requiring serialization and closing.
type RemoteHandler ¶
type RemoteHandler interface {
	// Register a function to create a new event from an event
	RegisterEventType(Event, func() Event) error
}
RemoteHandler enables deserializing remote objects to events.
type RemoteReadRepository ¶
type RemoteReadRepository interface {
	ReadRepository
	SetModel(factory func() interface{})
	Close() error
	Clear() error
}
RemoteReadRepository is a read repository that uses a networked service.
type Repository ¶
type Repository interface {
	// Load loads an aggregate with a type and id.
	Load(string, string) (Aggregate, error)
	// Save saves an aggregate's uncommitted events.
	Save(Aggregate) error
}
Repository is a repository responsible for loading and saving aggregates.
type TraceEventStore ¶
type TraceEventStore struct {
// contains filtered or unexported fields
}
TraceEventStore wraps an EventStore and adds debug tracing.
func NewTraceEventStore ¶
func NewTraceEventStore(eventStore EventStore) *TraceEventStore
NewTraceEventStore creates a new TraceEventStore.
func (*TraceEventStore) GetTrace ¶
func (s *TraceEventStore) GetTrace() []Event
GetTrace returns the events that happened during the tracing.
func (*TraceEventStore) Load ¶
func (s *TraceEventStore) Load(id string) ([]Event, error)
Load loads all events for the aggregate id from the base store. Returns ErrNoEventStoreDefined if no event store could be found.
func (*TraceEventStore) ResetTrace ¶
func (s *TraceEventStore) ResetTrace()
ResetTrace resets the trace.
func (*TraceEventStore) Save ¶
func (s *TraceEventStore) Save(events []Event) error
Save appends all events to the base store and traces them if enabled.
func (*TraceEventStore) StartTracing ¶
func (s *TraceEventStore) StartTracing()
StartTracing starts the tracing of events.
func (*TraceEventStore) StopTracing ¶
func (s *TraceEventStore) StopTracing()
StopTracing stops the tracing of events.
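The wrapping idea behind a tracing store can be sketched as a decorator around any EventStore. The code is illustrative only: `traceStore` and `nullStore` are hypothetical, `Event` is reduced to an empty interface, and returning ErrNoEventStoreDefined from Save as well as Load is an assumption (this documentation only states it for Load).

```go
package main

import (
	"errors"
	"fmt"
)

// Event is a simplified stand-in for this sketch.
type Event interface{}

// EventStore matches the documented interface.
type EventStore interface {
	Save([]Event) error
	Load(string) ([]Event, error)
}

var ErrNoEventStoreDefined = errors.New("no event store defined")

// traceStore forwards to a base store and records saved events while tracing.
type traceStore struct {
	base    EventStore
	tracing bool
	trace   []Event
}

func (s *traceStore) StartTracing()     { s.tracing = true }
func (s *traceStore) StopTracing()      { s.tracing = false }
func (s *traceStore) GetTrace() []Event { return s.trace }
func (s *traceStore) ResetTrace()       { s.trace = nil }

// Save records the events if tracing is on, then delegates to the base store.
func (s *traceStore) Save(events []Event) error {
	if s.base == nil {
		return ErrNoEventStoreDefined
	}
	if s.tracing {
		s.trace = append(s.trace, events...)
	}
	return s.base.Save(events)
}

// Load delegates to the base store.
func (s *traceStore) Load(id string) ([]Event, error) {
	if s.base == nil {
		return nil, ErrNoEventStoreDefined
	}
	return s.base.Load(id)
}

// nullStore is a base store that accepts everything and stores nothing.
type nullStore struct{}

func (nullStore) Save([]Event) error           { return nil }
func (nullStore) Load(string) ([]Event, error) { return nil, nil }

func main() {
	store := &traceStore{base: nullStore{}}
	store.StartTracing()
	_ = store.Save([]Event{"first", "second"})
	store.StopTracing()
	_ = store.Save([]Event{"untraced"})
	fmt.Println(store.GetTrace())
}
```

This kind of wrapper is mainly useful in tests, where the trace can be compared against the events a command is expected to produce.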
Source Files
- aggregate.go
- commandbus.go
- commandbus_rabbitmq.go
- commandhandler.go
- eventbus.go
- eventbus_rabbitmq.go
- eventbus_redis.go
- eventhorizon.go
- eventstore.go
- eventstore_memory.go
- eventstore_mongo.go
- eventstore_postgres.go
- eventstore_remote.go
- eventstore_tracing.go
- mongodb.go
- readrepository.go
- readrepository_memory.go
- readrepository_mongo.go
- readrepository_postgres.go
- readrepository_remote.go
- remote.go
- repository.go
Directories
Path | Synopsis |
---|---|
examples | |
delegation | Package example contains a simple runnable example of a CQRS/ES app. |