Documentation ¶
Overview ¶
Package cqrs provides a CQRS and Event Sourcing framework written in go influenced by the cqrs journey guide
For a full guide visit http://github.com/andrewwebber/cqrs
import "github.com/andrewwebber/cqrs" func NewAccount(firstName string, lastName string, emailAddress string, passwordHash []byte, initialBalance float64) *Account { account := new(Account) account.EventSourceBased = cqrs.NewEventSourceBased(account) event := AccountCreatedEvent{firstName, lastName, emailAddress, passwordHash, initialBalance} account.Update(event) return account }
Index ¶
- Constants
- Variables
- func DeliverCQRSError(correlationID string, err error, repo EventSourcingRepository)
- type ByCreated
- type CQRSErrorEvent
- type Command
- type CommandDispatchManager
- type CommandDispatcher
- type CommandHandler
- type CommandPublisher
- type CommandReceiver
- type CommandReceiverOptions
- type CommandTransactedAccept
- type EventSourceBased
- func (s *EventSourceBased) CallEventHandler(event interface{})
- func (s *EventSourceBased) Events() []interface{}
- func (s *EventSourceBased) ID() string
- func (s *EventSourceBased) SetID(id string)
- func (s *EventSourceBased) SetVersion(version int)
- func (s *EventSourceBased) Update(versionedEvent interface{})
- func (s *EventSourceBased) Version() int
- type EventSourced
- type EventSourcingRepository
- type EventStreamRepository
- type HandlersCache
- type InMemoryCommandBus
- type InMemoryEventBus
- type InMemoryEventStreamRepository
- func (r *InMemoryEventStreamRepository) AllIntegrationEventsEverPublished() ([]VersionedEvent, error)
- func (r *InMemoryEventStreamRepository) Get(id string) ([]VersionedEvent, error)
- func (r *InMemoryEventStreamRepository) GetIntegrationEventsByCorrelationID(correlationID string) ([]VersionedEvent, error)
- func (r *InMemoryEventStreamRepository) Save(id string, newEvents []VersionedEvent) error
- func (r *InMemoryEventStreamRepository) SaveIntegrationEvent(event VersionedEvent) error
- type MapBasedCommandDispatcher
- type MapBasedVersionedEventDispatcher
- type TypeCache
- type TypeRegistry
- type VersionedEvent
- type VersionedEventDispatchManager
- type VersionedEventDispatcher
- type VersionedEventHandler
- type VersionedEventPublicationLogger
- type VersionedEventPublisher
- type VersionedEventReceiver
- type VersionedEventReceiverOptions
- type VersionedEventTransactedAccept
Constants ¶
const CQRSErrorEventType = "cqrs.CQRSErrorEvent"
Variables ¶
var DebugTypeRegistry = false
DebugTypeRegistry if set to true logs type resolution failures
var ErrConcurrencyWhenSavingEvents = errors.New("concurrency error saving event")
ErrConcurrencyWhenSavingEvents is raised when a concurrency error has occured when saving events
Functions ¶
func DeliverCQRSError ¶
func DeliverCQRSError(correlationID string, err error, repo EventSourcingRepository)
Types ¶
type ByCreated ¶
type ByCreated []VersionedEvent
ByCreated is an alias for sorting VersionedEvents by the create field
type CQRSErrorEvent ¶
type CQRSErrorEvent struct {
Message string
}
CQRSErrorEvent is a generic event raised within the CQRS framework
type Command ¶
type Command struct { MessageID string `json:"messageID"` CorrelationID string `json:"correlationID"` CommandType string `json:"commandType"` Created time.Time `json:"time"` Body interface{} }
Command represents an actor intention to alter the state of the system
func CreateCommand ¶
func CreateCommand(body interface{}) Command
CreateCommand is a helper for creating a new command object with populated default properties
func CreateCommandWithCorrelationID ¶
CreateCommandWithCorrelationID is a helper for creating a new command object with populated default properties
type CommandDispatchManager ¶
type CommandDispatchManager struct {
// contains filtered or unexported fields
}
CommandDispatchManager is responsible for coordinating receiving messages from command receivers and dispatching them to the command dispatcher.
func NewCommandDispatchManager ¶
func NewCommandDispatchManager(receiver CommandReceiver, registry TypeRegistry) *CommandDispatchManager
NewCommandDispatchManager is a constructor for the CommandDispatchManager
func (*CommandDispatchManager) Listen ¶
func (m *CommandDispatchManager) Listen(stop <-chan bool, exclusive bool) error
Listen starts a listen loop processing channels related to new incoming events, errors and stop listening requests
func (*CommandDispatchManager) RegisterCommandHandler ¶
func (m *CommandDispatchManager) RegisterCommandHandler(command interface{}, handler CommandHandler)
RegisterCommandHandler allows a caller to register a command handler given a command of the specified type being received
func (*CommandDispatchManager) RegisterGlobalHandler ¶
func (m *CommandDispatchManager) RegisterGlobalHandler(handler CommandHandler)
RegisterGlobalHandler allows a caller to register a wildcard command handler call on any command received
type CommandDispatcher ¶
type CommandDispatcher interface { DispatchCommand(Command) error RegisterCommandHandler(event interface{}, handler CommandHandler) RegisterGlobalHandler(handler CommandHandler) }
CommandDispatcher is responsible for routing commands from the command manager to call handlers responsible for processing received commands
type CommandHandler ¶
CommandHandler is a function that takes a command
type CommandPublisher ¶
CommandPublisher is responsilbe for publishing commands
type CommandReceiver ¶
type CommandReceiver interface {
ReceiveCommands(CommandReceiverOptions) error
}
CommandReceiver is responsible for receiving commands
type CommandReceiverOptions ¶
type CommandReceiverOptions struct { TypeRegistry TypeRegistry Close chan chan error Error chan error ReceiveCommand chan CommandTransactedAccept Exclusive bool }
CommandReceiverOptions is an initalization structure to communicate to and from a command receiver go routine
type CommandTransactedAccept ¶
CommandTransactedAccept is the message routed from a command receiver to the command manager. Sometimes command receivers designed with reliable delivery require acknowledgements after a message has been received. The success channel here allows for such acknowledgements
type EventSourceBased ¶
type EventSourceBased struct {
// contains filtered or unexported fields
}
EventSourceBased provider a base class for aggregate times wishing to contain basis helper functionality for event sourcing
func NewEventSourceBased ¶
func NewEventSourceBased(source interface{}) EventSourceBased
NewEventSourceBased constructor
func NewEventSourceBasedWithID ¶
func NewEventSourceBasedWithID(source interface{}, id string) EventSourceBased
NewEventSourceBasedWithID constructor
func (*EventSourceBased) CallEventHandler ¶
func (s *EventSourceBased) CallEventHandler(event interface{})
CallEventHandler routes an event to an aggregate's event handler
func (*EventSourceBased) Events ¶
func (s *EventSourceBased) Events() []interface{}
Events returns a slice of newly created events since last deserialization
func (*EventSourceBased) SetID ¶
func (s *EventSourceBased) SetID(id string)
SetID sets the aggregate's ID
func (*EventSourceBased) SetVersion ¶
func (s *EventSourceBased) SetVersion(version int)
SetVersion sets the aggregate's Version
func (*EventSourceBased) Update ¶
func (s *EventSourceBased) Update(versionedEvent interface{})
Update should be called to change the state of an aggregate type
func (*EventSourceBased) Version ¶
func (s *EventSourceBased) Version() int
Version provider the aggregate's Version
type EventSourced ¶
type EventSourced interface { ID() string SetID(string) Version() int SetVersion(int) Events() []interface{} CallEventHandler(event interface{}) }
EventSourced providers an interface for event sourced aggregate types
type EventSourcingRepository ¶
type EventSourcingRepository interface { GetEventStreamRepository() EventStreamRepository GetTypeRegistry() TypeRegistry Save(EventSourced, string) error Get(string, EventSourced) error }
EventSourcingRepository is a repository for event source based aggregates
func NewRepository ¶
func NewRepository(eventStreamRepository EventStreamRepository, registry TypeRegistry) EventSourcingRepository
NewRepository constructs an EventSourcingRepository
func NewRepositoryWithPublisher ¶
func NewRepositoryWithPublisher(eventStreamRepository EventStreamRepository, publisher VersionedEventPublisher, registry TypeRegistry) EventSourcingRepository
NewRepositoryWithPublisher constructs an EventSourcingRepository with a VersionedEventPublisher to dispatch events once persisted to the EventStreamRepository
type EventStreamRepository ¶
type EventStreamRepository interface { VersionedEventPublicationLogger Save(string, []VersionedEvent) error Get(string) ([]VersionedEvent, error) }
EventStreamRepository is a persistance layer for events associated with aggregates by ID
type HandlersCache ¶
HandlersCache is a map of types to functions that will be used to route event sourcing events
type InMemoryCommandBus ¶
type InMemoryCommandBus struct {
// contains filtered or unexported fields
}
InMemoryCommandBus provides an inmemory implementation of the CommandPublisher CommandReceiver interfaces
func NewInMemoryCommandBus ¶
func NewInMemoryCommandBus() *InMemoryCommandBus
NewInMemoryCommandBus constructor
func (*InMemoryCommandBus) PublishCommands ¶
func (bus *InMemoryCommandBus) PublishCommands(commands []Command) error
PublishCommands publishes Commands to the Command bus
func (*InMemoryCommandBus) ReceiveCommands ¶
func (bus *InMemoryCommandBus) ReceiveCommands(options CommandReceiverOptions) error
ReceiveCommands starts a go routine that monitors incoming Commands and routes them to a receiver channel specified within the options
type InMemoryEventBus ¶
type InMemoryEventBus struct {
// contains filtered or unexported fields
}
InMemoryEventBus provides an inmemory implementation of the VersionedEventPublisher VersionedEventReceiver interfaces
func NewInMemoryEventBus ¶
func NewInMemoryEventBus() *InMemoryEventBus
NewInMemoryEventBus constructor
func (*InMemoryEventBus) PublishEvents ¶
func (bus *InMemoryEventBus) PublishEvents(events []VersionedEvent) error
PublishEvents publishes events to the event bus
func (*InMemoryEventBus) ReceiveEvents ¶
func (bus *InMemoryEventBus) ReceiveEvents(options VersionedEventReceiverOptions) error
ReceiveEvents starts a go routine that monitors incoming events and routes them to a receiver channel specified within the options
type InMemoryEventStreamRepository ¶
type InMemoryEventStreamRepository struct {
// contains filtered or unexported fields
}
InMemoryEventStreamRepository provides an inmemory event sourcing repository
func NewInMemoryEventStreamRepository ¶
func NewInMemoryEventStreamRepository() *InMemoryEventStreamRepository
NewInMemoryEventStreamRepository constructor
func (*InMemoryEventStreamRepository) AllIntegrationEventsEverPublished ¶
func (r *InMemoryEventStreamRepository) AllIntegrationEventsEverPublished() ([]VersionedEvent, error)
AllIntegrationEventsEverPublished returns all events ever published
func (*InMemoryEventStreamRepository) Get ¶
func (r *InMemoryEventStreamRepository) Get(id string) ([]VersionedEvent, error)
Get retrieves events assoicated with an event sourced object by ID
func (*InMemoryEventStreamRepository) GetIntegrationEventsByCorrelationID ¶
func (r *InMemoryEventStreamRepository) GetIntegrationEventsByCorrelationID(correlationID string) ([]VersionedEvent, error)
GetIntegrationEventsByCorrelationID returns all integration events with a matching correlationID
func (*InMemoryEventStreamRepository) Save ¶
func (r *InMemoryEventStreamRepository) Save(id string, newEvents []VersionedEvent) error
Save persists an event sourced object into the repository
func (*InMemoryEventStreamRepository) SaveIntegrationEvent ¶
func (r *InMemoryEventStreamRepository) SaveIntegrationEvent(event VersionedEvent) error
SaveIntegrationEvent persists an integration event
type MapBasedCommandDispatcher ¶
type MapBasedCommandDispatcher struct {
// contains filtered or unexported fields
}
MapBasedCommandDispatcher is a simple implementation of the command dispatcher. Using a map it registered command handlers to command types
func NewMapBasedCommandDispatcher ¶
func NewMapBasedCommandDispatcher() *MapBasedCommandDispatcher
NewMapBasedCommandDispatcher is a constructor for the MapBasedVersionedCommandDispatcher
func (*MapBasedCommandDispatcher) DispatchCommand ¶
func (m *MapBasedCommandDispatcher) DispatchCommand(command Command) error
DispatchCommand executes all command handlers registered for the given command type
func (*MapBasedCommandDispatcher) RegisterCommandHandler ¶
func (m *MapBasedCommandDispatcher) RegisterCommandHandler(command interface{}, handler CommandHandler)
RegisterCommandHandler allows a caller to register a command handler given a command of the specified type being received
func (*MapBasedCommandDispatcher) RegisterGlobalHandler ¶
func (m *MapBasedCommandDispatcher) RegisterGlobalHandler(handler CommandHandler)
RegisterGlobalHandler allows a caller to register a wildcard command handler call on any command received
type MapBasedVersionedEventDispatcher ¶
type MapBasedVersionedEventDispatcher struct {
// contains filtered or unexported fields
}
MapBasedVersionedEventDispatcher is a simple implementation of the versioned event dispatcher. Using a map it registered event handlers to event types
func NewVersionedEventDispatcher ¶
func NewVersionedEventDispatcher() *MapBasedVersionedEventDispatcher
NewVersionedEventDispatcher is a constructor for the MapBasedVersionedEventDispatcher
func (*MapBasedVersionedEventDispatcher) DispatchEvent ¶
func (m *MapBasedVersionedEventDispatcher) DispatchEvent(event VersionedEvent) error
DispatchEvent executes all event handlers registered for the given event type
func (*MapBasedVersionedEventDispatcher) RegisterEventHandler ¶
func (m *MapBasedVersionedEventDispatcher) RegisterEventHandler(event interface{}, handler VersionedEventHandler)
RegisterEventHandler allows a caller to register an event handler given an event of the specified type being received
func (*MapBasedVersionedEventDispatcher) RegisterGlobalHandler ¶
func (m *MapBasedVersionedEventDispatcher) RegisterGlobalHandler(handler VersionedEventHandler)
RegisterGlobalHandler allows a caller to register a wildcard event handler call on any event received
type TypeRegistry ¶
type TypeRegistry interface { GetHandlers(interface{}) HandlersCache GetTypeByName(string) (reflect.Type, bool) RegisterAggregate(aggregate interface{}, events ...interface{}) RegisterEvents(events ...interface{}) RegisterType(interface{}) }
TypeRegistry providers a helper registry for mapping event types and handlers after performance json serializaton
func NewTypeRegistry ¶
func NewTypeRegistry() TypeRegistry
NewTypeRegistry constructs a new TypeRegistry
type VersionedEvent ¶
type VersionedEvent struct { ID string `json:"id"` CorrelationID string `json:"correlationID"` SourceID string `json:"sourceID"` Version int `json:"version"` EventType string `json:"eventType"` Created time.Time `json:"time"` Event interface{} }
VersionedEvent represents an event in the past for an aggregate
type VersionedEventDispatchManager ¶
type VersionedEventDispatchManager struct {
// contains filtered or unexported fields
}
VersionedEventDispatchManager is responsible for coordinating receiving messages from event receivers and dispatching them to the event dispatcher.
func NewVersionedEventDispatchManager ¶
func NewVersionedEventDispatchManager(receiver VersionedEventReceiver, registry TypeRegistry) *VersionedEventDispatchManager
NewVersionedEventDispatchManager is a constructor for the VersionedEventDispatchManager
func (*VersionedEventDispatchManager) Listen ¶
func (m *VersionedEventDispatchManager) Listen(stop <-chan bool, exclusive bool) error
Listen starts a listen loop processing channels related to new incoming events, errors and stop listening requests
func (*VersionedEventDispatchManager) RegisterEventHandler ¶
func (m *VersionedEventDispatchManager) RegisterEventHandler(event interface{}, handler VersionedEventHandler)
RegisterEventHandler allows a caller to register an event handler given an event of the specified type being received
func (*VersionedEventDispatchManager) RegisterGlobalHandler ¶
func (m *VersionedEventDispatchManager) RegisterGlobalHandler(handler VersionedEventHandler)
RegisterGlobalHandler allows a caller to register a wildcard event handler call on any event received
type VersionedEventDispatcher ¶
type VersionedEventDispatcher interface { DispatchEvent(VersionedEvent) error RegisterEventHandler(event interface{}, handler VersionedEventHandler) RegisterGlobalHandler(handler VersionedEventHandler) }
VersionedEventDispatcher is responsible for routing events from the event manager to call handlers responsible for processing received events
type VersionedEventHandler ¶
type VersionedEventHandler func(VersionedEvent) error
VersionedEventHandler is a function that takes a versioned event
type VersionedEventPublicationLogger ¶
type VersionedEventPublicationLogger interface { SaveIntegrationEvent(VersionedEvent) error AllIntegrationEventsEverPublished() ([]VersionedEvent, error) GetIntegrationEventsByCorrelationID(correlationID string) ([]VersionedEvent, error) }
VersionedEventPublicationLogger is responsible to retreiving all events ever published to facilitate readmodel reconstruction
type VersionedEventPublisher ¶
type VersionedEventPublisher interface {
PublishEvents([]VersionedEvent) error
}
VersionedEventPublisher is responsible for publishing events that have been saved to the event store\repository
type VersionedEventReceiver ¶
type VersionedEventReceiver interface {
ReceiveEvents(VersionedEventReceiverOptions) error
}
VersionedEventReceiver is responsible for receiving globally published events
type VersionedEventReceiverOptions ¶
type VersionedEventReceiverOptions struct { TypeRegistry TypeRegistry Close chan chan error Error chan error ReceiveEvent chan VersionedEventTransactedAccept Exclusive bool }
VersionedEventReceiverOptions is an initalization structure to communicate to and from an event receiver go routine
type VersionedEventTransactedAccept ¶
type VersionedEventTransactedAccept struct { Event VersionedEvent ProcessedSuccessfully chan bool }
VersionedEventTransactedAccept is the message routed from an event receiver to the event manager. Sometimes event receivers designed with reliable delivery require acknowledgements after a message has been received. The success channel here allows for such acknowledgements
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package couchbase provides an event sourcing implementation in couchbase for the CQRS and Event Sourcing framework Current version: experimental
|
Package couchbase provides an event sourcing implementation in couchbase for the CQRS and Event Sourcing framework Current version: experimental |
Package example provides the example scenario utilizing couchbase and rabbitmq infastructure components func TestEventSourcingWithCouchbase(t *testing.T) { persistance, error := couchbase.NewEventStreamRepository("http://localhost:8091/") if error != nil { t.Fatal(error) } RunScenario(t, persistance) } func RunScenario(t *testing.T, persistance cqrs.EventStreamRepository) { bus := rabbit.NewEventBus("amqp://guest:guest@localhost:5672/", "example_test", "testing.example") repository := cqrs.NewRepositoryWithPublisher(persistance, bus) ...
|
Package example provides the example scenario utilizing couchbase and rabbitmq infastructure components func TestEventSourcingWithCouchbase(t *testing.T) { persistance, error := couchbase.NewEventStreamRepository("http://localhost:8091/") if error != nil { t.Fatal(error) } RunScenario(t, persistance) } func RunScenario(t *testing.T, persistance cqrs.EventStreamRepository) { bus := rabbit.NewEventBus("amqp://guest:guest@localhost:5672/", "example_test", "testing.example") repository := cqrs.NewRepositoryWithPublisher(persistance, bus) ... |
Package rabbit provides an event and command bus for the CQRS and Event Sourcing framework Current version: experimental
|
Package rabbit provides an event and command bus for the CQRS and Event Sourcing framework Current version: experimental |
Package rethinkdb provides an event sourcing implementation in rethinkdb for the CQRS and Event Sourcing framework Current version: experimental
|
Package rethinkdb provides an event sourcing implementation in rethinkdb for the CQRS and Event Sourcing framework Current version: experimental |