eventhorizon

Version: v0.0.1
Warning

This package is not in the latest version of its module.

Published: Jan 20, 2018 License: Apache-2.0 Imports: 8 Imported by: 0

README

Event Horizon

Event Horizon is a CQRS/ES toolkit for Go.

Event Horizon is used in at least one production system but may not be considered stable just yet!

CQRS stands for Command Query Responsibility Segregation and is a technique where object access (the Query part) and modification (the Command part) are separated from each other. This helps in designing complex data models where the actions can be totally independent from the data output.

ES stands for Event Sourcing and is a technique where all events that have happened in a system are recorded, and all future actions are based on the events instead of a single data model. The main benefit of adding Event Sourcing is traceability of changes which can be used for example in audit logging. Additionally, "incorrect" events that happened in the past (for example due to a bug) can be compensated for with an event which will make the current data "correct", as that is based on the events.
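The compensation idea can be illustrated with a minimal sketch. The `Event` struct, the event names and `Balance` are hypothetical and exist only for this example; they are not part of Event Horizon.

```go
package main

import "fmt"

// Event is a hypothetical, simplified event used only in this sketch.
type Event struct {
	Type   string
	Amount int
}

// Balance rebuilds the current state by replaying every recorded event.
func Balance(events []Event) int {
	total := 0
	for _, e := range events {
		switch e.Type {
		case "Deposited":
			total += e.Amount
		case "Withdrawn", "DepositCorrected":
			total -= e.Amount
		}
	}
	return total
}

func main() {
	events := []Event{
		{Type: "Deposited", Amount: 100},
		{Type: "Deposited", Amount: 500}, // recorded by mistake, should have been 50
		// The faulty event is never edited; a later event compensates for it.
		{Type: "DepositCorrected", Amount: 450},
	}
	fmt.Println(Balance(events)) // prints: 150
}
```

The faulty deposit stays in the history, which keeps the audit trail intact, while the replayed state ends up correct.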

Read more about CQRS/ES from one of the major authors/contributors on the subject: http://codebetter.com/gregyoung/2010/02/16/cqrs-task-based-uis-event-sourcing-agh/

Other material on CQRS/ES:

Inspired by the following libraries/examples:

Suggestions are welcome!

Usage

See the example folder for a basic usage example to get you started.

Storage and messaging implementations

There are simple in-memory implementations of all components in the toolkit (event store, read repository, event bus, command bus). Most of these are meant for testing and development; the command bus (and in some cases the event bus) could, however, fulfill the needs of a production system.

In addition, there are MongoDB implementations of the event store and a simple read repository, and a Redis implementation of the event bus.

There is also experimental support for AWS DynamoDB as an event store. Support for an event bus using AWS SQS is also planned but not started.

Get Involved

License

Event Horizon is licensed under Apache License 2.0

http://www.apache.org/licenses/LICENSE-2.0

Documentation

Overview

Package eventhorizon is a CQRS/ES toolkit.

Constants

const DefaultMinVersionDeadline = 10 * time.Second

DefaultMinVersionDeadline is the deadline to use when creating a min version context that waits.

const DefaultNamespace = "default"

DefaultNamespace is the namespace to use if not set in the context.

Variables

var ErrAggregateNotFound = errors.New("aggregate not found")

ErrAggregateNotFound is when no aggregate can be found.

var ErrAggregateNotRegistered = errors.New("aggregate not registered")

ErrAggregateNotRegistered is when no aggregate factory was registered.

var ErrCommandNotRegistered = errors.New("command not registered")

ErrCommandNotRegistered is when no command factory was registered.

var ErrCouldNotSaveEntity = errors.New("could not save entity")

ErrCouldNotSaveEntity is when an entity could not be saved.

var ErrEntityHasNoVersion = errors.New("entity has no version")

ErrEntityHasNoVersion is when an entity has no version number.

var ErrEntityNotFound = errors.New("could not find entity")

ErrEntityNotFound is when an entity could not be found.

var ErrEventDataNotRegistered = errors.New("event data not registered")

ErrEventDataNotRegistered is when no event data factory was registered.

var ErrIncorrectEntityVersion = errors.New("incorrect entity version")

ErrIncorrectEntityVersion is when an entity has an incorrect version.

var ErrIncorrectEventVersion = errors.New("mismatching event version")

ErrIncorrectEventVersion is when an event is for another version of the aggregate.

var ErrInvalidEvent = errors.New("invalid event")

ErrInvalidEvent is when an event does not implement the Event interface.

var ErrMissingEntityID = errors.New("missing entity ID")

ErrMissingEntityID is when an entity has no ID.

var ErrNoEventsToAppend = errors.New("no events to append")

ErrNoEventsToAppend is when no events are available to append.

Functions

func CheckCommand

func CheckCommand(cmd Command) error

CheckCommand checks a command for errors.

func MarshalContext

func MarshalContext(ctx context.Context) map[string]interface{}

MarshalContext marshals a context into a map.

func MinVersionFromContext

func MinVersionFromContext(ctx context.Context) (int, bool)

MinVersionFromContext returns the min version from the context.

func NamespaceFromContext

func NamespaceFromContext(ctx context.Context) string

NamespaceFromContext returns the namespace from the context, or the default namespace.

func NewContextWithMinVersion

func NewContextWithMinVersion(ctx context.Context, minVersion int) context.Context

NewContextWithMinVersion returns the context with min version set.

func NewContextWithMinVersionWait

func NewContextWithMinVersionWait(ctx context.Context, minVersion int) (c context.Context, cancel func())

NewContextWithMinVersionWait returns the context with min version and a default deadline set.
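As a rough illustration of how the min version helpers could fit together, the sketch below reimplements them locally with the standard context package. The key type and the function bodies are assumptions; only the signatures and the 10 second default deadline come from this page.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// minVersionKey is an unexported key type, a common way to keep context
// values from colliding. The key used by the real package is an assumption.
type minVersionKey struct{}

// defaultMinVersionDeadline mirrors DefaultMinVersionDeadline above.
const defaultMinVersionDeadline = 10 * time.Second

func newContextWithMinVersion(ctx context.Context, minVersion int) context.Context {
	return context.WithValue(ctx, minVersionKey{}, minVersion)
}

func minVersionFromContext(ctx context.Context) (int, bool) {
	v, ok := ctx.Value(minVersionKey{}).(int)
	return v, ok
}

// newContextWithMinVersionWait also bounds how long a reader may wait.
func newContextWithMinVersionWait(ctx context.Context, minVersion int) (context.Context, func()) {
	c, cancel := context.WithTimeout(newContextWithMinVersion(ctx, minVersion), defaultMinVersionDeadline)
	return c, cancel
}

// inspect reports what a reader would see on such a context.
func inspect() (minVersion int, ok bool, hasDeadline bool) {
	ctx, cancel := newContextWithMinVersionWait(context.Background(), 3)
	defer cancel()
	minVersion, ok = minVersionFromContext(ctx)
	_, hasDeadline = ctx.Deadline()
	return minVersion, ok, hasDeadline
}

func main() {
	fmt.Println(inspect()) // prints: 3 true true
}
```

The returned cancel function should always be called, typically with defer, so that the timer behind the deadline is released.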

func NewContextWithNamespace

func NewContextWithNamespace(ctx context.Context, namespace string) context.Context

NewContextWithNamespace sets the namespace to use in the context. The namespace is used to determine which database to use.
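The sketch below shows one way a storage layer might use the namespace, falling back to the default when none is set. The key type, the `store` type and `demo` are hypothetical; the real adapters may map namespaces to databases differently.

```go
package main

import (
	"context"
	"fmt"
)

type namespaceKey struct{} // assumed key type, not taken from the package

const defaultNamespace = "default" // mirrors DefaultNamespace above

func newContextWithNamespace(ctx context.Context, namespace string) context.Context {
	return context.WithValue(ctx, namespaceKey{}, namespace)
}

func namespaceFromContext(ctx context.Context) string {
	if ns, ok := ctx.Value(namespaceKey{}).(string); ok {
		return ns
	}
	return defaultNamespace
}

// store is a hypothetical storage layer that keeps one data set per namespace.
type store struct {
	data map[string][]string
}

func (s *store) add(ctx context.Context, item string) {
	ns := namespaceFromContext(ctx)
	s.data[ns] = append(s.data[ns], item)
}

// demo writes one item without a namespace and one for a tenant.
func demo() map[string][]string {
	s := &store{data: map[string][]string{}}
	s.add(context.Background(), "a")
	s.add(newContextWithNamespace(context.Background(), "tenant-1"), "b")
	return s.data
}

func main() {
	fmt.Println(demo()) // prints: map[default:[a] tenant-1:[b]]
}
```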

func RegisterAggregate

func RegisterAggregate(factory func(UUID) Aggregate)

RegisterAggregate registers an aggregate factory for a type. The factory is used to create concrete aggregate types when loading from the database.

An example would be:

RegisterAggregate(func(id UUID) Aggregate { return &MyAggregate{id} })

func RegisterCommand

func RegisterCommand(factory func() Command)

RegisterCommand registers a command factory for a type. The factory is used to create concrete command types.

An example would be:

RegisterCommand(func() Command { return &MyCommand{} })

func RegisterContextMarshaler

func RegisterContextMarshaler(f ContextMarshalFunc)

RegisterContextMarshaler registers a marshaler function used by MarshalContext.

func RegisterContextUnmarshaler

func RegisterContextUnmarshaler(f ContextUnmarshalFunc)

RegisterContextUnmarshaler registers an unmarshaler function used by UnmarshalContext.

func RegisterEventData

func RegisterEventData(eventType EventType, factory func() EventData)

RegisterEventData registers an event data factory for a type. The factory is used to create concrete event data structs when loading from the database.

An example would be:

RegisterEventData(MyEventType, func() EventData { return &MyEventData{} })
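The register/create/unregister functions on this page follow a factory registry pattern. Below is a minimal sketch of such a registry for event data, written against local stand-in types so that it runs on its own. The map, the mutex and the `InviteAccepted` names are assumptions, and the real registry may behave differently, for example on duplicate registration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// EventType and EventData are local stand-ins for the types on this page.
type EventType string
type EventData interface{}

var errEventDataNotRegistered = errors.New("event data not registered")

var (
	mu        sync.RWMutex
	factories = map[EventType]func() EventData{}
)

func registerEventData(t EventType, factory func() EventData) {
	mu.Lock()
	defer mu.Unlock()
	factories[t] = factory
}

func unregisterEventData(t EventType) {
	mu.Lock()
	defer mu.Unlock()
	delete(factories, t)
}

// createEventData returns a fresh value, ready to be filled in when an
// event is loaded from storage.
func createEventData(t EventType) (EventData, error) {
	mu.RLock()
	defer mu.RUnlock()
	factory, ok := factories[t]
	if !ok {
		return nil, errEventDataNotRegistered
	}
	return factory(), nil
}

// InviteAccepted and InviteAcceptedData are hypothetical domain types.
const InviteAccepted EventType = "InviteAccepted"

type InviteAcceptedData struct{ Name string }

func main() {
	registerEventData(InviteAccepted, func() EventData { return &InviteAcceptedData{} })
	data, err := createEventData(InviteAccepted)
	fmt.Printf("%T %v\n", data, err)

	unregisterEventData(InviteAccepted)
	_, err = createEventData(InviteAccepted)
	fmt.Println(err)
}
```

Returning a pointer from the factory lets a decoder fill in the struct in place.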

func UnmarshalContext

func UnmarshalContext(vals map[string]interface{}) context.Context

UnmarshalContext unmarshals a context from a map.
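To make the marshaling hooks more concrete, here is a sketch of a round trip through locally defined equivalents of MarshalContext and UnmarshalContext. The slices of registered functions, the `userKey` type and the "user" map key are assumptions made for this example.

```go
package main

import (
	"context"
	"fmt"
)

type contextMarshalFunc func(context.Context, map[string]interface{})
type contextUnmarshalFunc func(context.Context, map[string]interface{}) context.Context

var (
	marshalers   []contextMarshalFunc
	unmarshalers []contextUnmarshalFunc
)

// marshalContext lets every registered marshaler copy its values into the map.
func marshalContext(ctx context.Context) map[string]interface{} {
	vals := map[string]interface{}{}
	for _, f := range marshalers {
		f(ctx, vals)
	}
	return vals
}

// unmarshalContext rebuilds a context by passing it through every unmarshaler.
func unmarshalContext(vals map[string]interface{}) context.Context {
	ctx := context.Background()
	for _, f := range unmarshalers {
		ctx = f(ctx, vals)
	}
	return ctx
}

// userKey and the "user" map key are hypothetical application values.
type userKey struct{}

func init() {
	marshalers = append(marshalers, func(ctx context.Context, vals map[string]interface{}) {
		if u, ok := ctx.Value(userKey{}).(string); ok {
			vals["user"] = u
		}
	})
	unmarshalers = append(unmarshalers, func(ctx context.Context, vals map[string]interface{}) context.Context {
		if u, ok := vals["user"].(string); ok {
			return context.WithValue(ctx, userKey{}, u)
		}
		return ctx
	})
}

// roundTrip sends a user through marshal and unmarshal, as a bus might do
// before and after putting an event on the wire.
func roundTrip(user string) string {
	sent := context.WithValue(context.Background(), userKey{}, user)
	received := unmarshalContext(marshalContext(sent))
	u, _ := received.Value(userKey{}).(string)
	return u
}

func main() {
	fmt.Println(roundTrip("alice")) // prints: alice
}
```

If the map is serialized as JSON in between, numeric values come back as float64, so unmarshalers should not assume the original Go type survives.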

func UnregisterCommand

func UnregisterCommand(commandType CommandType)

UnregisterCommand removes the registration of the command factory for a type. This is mainly useful in maintenance situations where the command type needs to be switched at runtime.

func UnregisterEventData

func UnregisterEventData(eventType EventType)

UnregisterEventData removes the registration of the event data factory for a type. This is mainly useful in maintenance situations where the event data needs to be switched during a migration.

Types

type Aggregate

type Aggregate interface {
	// Entity provides the ID of the aggregate.
	Entity

	// AggregateType returns the type name of the aggregate.
	// AggregateType() string
	AggregateType() AggregateType

	// CommandHandler is used to handle commands.
	CommandHandler
}

Aggregate is an interface representing a versioned data entity created from events. It receives commands and generates events that are stored.

The aggregate is created/loaded and saved by the Repository inside the Dispatcher. A domain specific aggregate can either implement the full interface, or more commonly embed *AggregateBase to take care of the common methods.
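The following sketch shows the general shape of an aggregate that receives a command and produces an event. It does its own bookkeeping instead of embedding *AggregateBase, and every type in it (`Command`, `Event`, `AcceptInvite`, `InviteAggregate`) is a trimmed-down or hypothetical stand-in rather than the package's own definition.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Command and Event are trimmed-down stand-ins for the interfaces on this page.
type Command interface{ CommandType() string }

type Event struct {
	Type    string
	Version int
}

// AcceptInvite is a hypothetical command.
type AcceptInvite struct{}

func (AcceptInvite) CommandType() string { return "AcceptInvite" }

var errAlreadyAccepted = errors.New("invite already accepted")

// InviteAggregate is a hypothetical aggregate.
type InviteAggregate struct {
	version     int
	accepted    bool
	uncommitted []Event
}

// HandleCommand validates the command against current state and, if it is
// allowed, records the resulting event.
func (a *InviteAggregate) HandleCommand(ctx context.Context, cmd Command) error {
	switch cmd.(type) {
	case AcceptInvite:
		if a.accepted {
			return errAlreadyAccepted
		}
		a.record("InviteAccepted")
		return nil
	default:
		return fmt.Errorf("unknown command: %s", cmd.CommandType())
	}
}

// record applies a new event and keeps it until a store saves it.
func (a *InviteAggregate) record(eventType string) {
	e := Event{Type: eventType, Version: a.version + 1}
	a.apply(e)
	a.uncommitted = append(a.uncommitted, e)
}

// apply is the only place where state changes, so replaying stored events
// through it rebuilds the aggregate.
func (a *InviteAggregate) apply(e Event) {
	if e.Type == "InviteAccepted" {
		a.accepted = true
	}
	a.version = e.Version
}

// acceptTwice sends the same command twice and returns both results.
func acceptTwice(a *InviteAggregate) (first, second error) {
	ctx := context.Background()
	first = a.HandleCommand(ctx, AcceptInvite{})
	second = a.HandleCommand(ctx, AcceptInvite{})
	return first, second
}

func main() {
	a := &InviteAggregate{}
	first, second := acceptTwice(a)
	fmt.Println(first, second, a.version, len(a.uncommitted))
}
```

Commands can be rejected, while events describe something that has already happened and are only applied.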

func CreateAggregate

func CreateAggregate(aggregateType AggregateType, id UUID) (Aggregate, error)

CreateAggregate creates an aggregate of a type with an ID using the factory registered with RegisterAggregate.

type AggregateStore

type AggregateStore interface {
	// Load loads the most recent version of an aggregate with a type and id.
	Load(context.Context, AggregateType, UUID) (Aggregate, error)

	// Save saves the uncommitted events for an aggregate.
	Save(context.Context, Aggregate) error
}

AggregateStore is responsible for loading and saving aggregates.

type AggregateType

type AggregateType string

AggregateType is the type of an aggregate.

type Command

type Command interface {
	// AggregateID returns the ID of the aggregate that the command should be
	// handled by.
	AggregateID() UUID

	// AggregateType returns the type of the aggregate that the command can be
	// handled by.
	AggregateType() AggregateType

	// CommandType returns the type of the command.
	CommandType() CommandType
}

Command is a domain command that is sent to a Dispatcher.

A command name should 1) be in present tense and 2) contain the intent (MoveCustomer vs CorrectCustomerAddress).

The command should contain all the data needed when handling it as fields. These fields can take an optional "eh" tag, which adds properties. For now only "optional" is a valid tag: `eh:"optional"`.
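A field check driven by the "eh" tag could look roughly like the sketch below, which uses the standard reflect package. It is not the package's CheckCommand: `checkFields`, `fieldError` and `InviteGuest` are hypothetical, and treating every zero value as missing is an assumption (the real check may exempt some kinds).

```go
package main

import (
	"fmt"
	"reflect"
)

// fieldError plays the role of CommandFieldError in this sketch.
type fieldError struct{ Field string }

func (e fieldError) Error() string { return "missing field: " + e.Field }

// checkFields returns an error for the first exported field that is unset
// and not tagged `eh:"optional"`.
func checkFields(cmd interface{}) error {
	v := reflect.Indirect(reflect.ValueOf(cmd))
	if v.Kind() != reflect.Struct {
		return nil
	}
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if f.PkgPath != "" {
			continue // unexported field
		}
		if f.Tag.Get("eh") == "optional" {
			continue
		}
		if v.Field(i).IsZero() {
			return fieldError{Field: f.Name}
		}
	}
	return nil
}

// InviteGuest is a hypothetical command payload.
type InviteGuest struct {
	GuestID string
	Name    string
	Note    string `eh:"optional"`
}

func main() {
	fmt.Println(checkFields(&InviteGuest{GuestID: "42", Name: "Ada"})) // prints: <nil>
	fmt.Println(checkFields(&InviteGuest{GuestID: "42"}))              // prints: missing field: Name
}
```

Note is left empty in both calls and never triggers an error, because the tag marks it as optional.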

func CreateCommand

func CreateCommand(commandType CommandType) (Command, error)

CreateCommand creates a command of a type using the factory registered with RegisterCommand.

type CommandFieldError

type CommandFieldError struct {
	Field string
}

CommandFieldError is returned by Dispatch when a field is incorrect.

func (CommandFieldError) Error

func (c CommandFieldError) Error() string

type CommandHandler

type CommandHandler interface {
	HandleCommand(context.Context, Command) error
}

CommandHandler is an interface that all handlers of commands should implement.

type CommandHandlerFunc

type CommandHandlerFunc func(context.Context, Command) error

CommandHandlerFunc is a function that can be used as a command handler.

func (CommandHandlerFunc) HandleCommand

func (h CommandHandlerFunc) HandleCommand(ctx context.Context, cmd Command) error

HandleCommand implements the HandleCommand method of the CommandHandler.
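CommandHandlerFunc is the same adapter idea as http.HandlerFunc in the standard library: a plain function gains the method it needs to satisfy an interface. The sketch below redefines the types locally to show how that makes wrapping handlers cheap. `counting`, `ping` and `dispatch` are hypothetical, and the method body is the natural implementation rather than a copy of the package source.

```go
package main

import (
	"context"
	"fmt"
)

// Command is trimmed to the one method this sketch needs.
type Command interface{ CommandType() string }

type CommandHandler interface {
	HandleCommand(context.Context, Command) error
}

type CommandHandlerFunc func(context.Context, Command) error

// HandleCommand calls the function itself, which is what lets a plain
// function satisfy CommandHandler.
func (h CommandHandlerFunc) HandleCommand(ctx context.Context, cmd Command) error {
	return h(ctx, cmd)
}

// counting is a hypothetical middleware that counts commands before
// passing them on to the next handler.
func counting(next CommandHandler, seen map[string]int) CommandHandler {
	return CommandHandlerFunc(func(ctx context.Context, cmd Command) error {
		seen[cmd.CommandType()]++
		return next.HandleCommand(ctx, cmd)
	})
}

type ping struct{}

func (ping) CommandType() string { return "Ping" }

// dispatch sends n Ping commands through the wrapped handler.
func dispatch(n int) map[string]int {
	seen := map[string]int{}
	noop := CommandHandlerFunc(func(context.Context, Command) error { return nil })
	h := counting(noop, seen)
	for i := 0; i < n; i++ {
		_ = h.HandleCommand(context.Background(), ping{})
	}
	return seen
}

func main() {
	fmt.Println(dispatch(3)) // prints: map[Ping:3]
}
```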

type CommandType

type CommandType string

CommandType is the type of a command, used as its unique identifier.

type ContextMarshalFunc

type ContextMarshalFunc func(context.Context, map[string]interface{})

ContextMarshalFunc is a function that marshals any context values to a map, used for sending context on the wire.

type ContextUnmarshalFunc

type ContextUnmarshalFunc func(context.Context, map[string]interface{}) context.Context

ContextUnmarshalFunc is a function that unmarshals any context values from a map, used when receiving context from the wire.

type Entity

type Entity interface {
	// EntityID returns the ID of the entity.
	EntityID() UUID
}

Entity is an item which is identified by an ID.

From http://cqrs.nu/Faq: "Entities or reference types are characterized by having an identity that's not tied to their attribute values. All attributes in an entity can change and it's still "the same" entity. Conversely, two entities might be equivalent in all their attributes, but will still be distinct."

type Event

type Event interface {
	// EventType returns the type of the event.
	EventType() EventType
	// The data attached to the event.
	Data() EventData
	// Timestamp of when the event was created.
	Timestamp() time.Time

	// AggregateType returns the type of the aggregate that the event can be
	// applied to.
	AggregateType() AggregateType
	// AggregateID returns the ID of the aggregate that the event should be
	// applied to.
	AggregateID() UUID
	// Version of the aggregate for this event (after it has been applied).
	Version() int

	// A string representation of the event.
	String() string
}

Event is a domain event describing a change that has happened to an aggregate.

An event struct and type name should:

  1. Be in past tense (CustomerMoved)
  2. Contain the intent (CustomerMoved vs CustomerAddressCorrected).

The event should contain all the data needed when applying/handling it.

func NewEvent

func NewEvent(eventType EventType, data EventData, timestamp time.Time) Event

NewEvent creates a new event with a type and data, setting its timestamp.

func NewEventForAggregate

func NewEventForAggregate(eventType EventType, data EventData, timestamp time.Time,
	aggregateType AggregateType, aggregateID UUID, version int) Event

NewEventForAggregate creates a new event with a type and data, setting its timestamp. It also sets the aggregate data on it.

type EventBus

type EventBus interface {
	EventHandler

	// AddHandler adds a handler for an event.
	AddHandler(EventHandler, EventType)

	// SetPublisher sets the publisher to use for publishing the event after all
	// handlers have been run.
	SetPublisher(EventPublisher)
}

EventBus is an event handler that handles events with the correct subhandlers after which it publishes the event using the publisher.

type EventData

type EventData interface{}

EventData is any additional data for an event.

func CreateEventData

func CreateEventData(eventType EventType) (EventData, error)

CreateEventData creates an event data of a type using the factory registered with RegisterEventData.

type EventHandler

type EventHandler interface {
	// HandleEvent handles an event.
	HandleEvent(context.Context, Event) error

	// HandlerType returns the type of the handler.
	HandlerType() EventHandlerType
}

EventHandler is a handler of events. Only one handler of the same type will receive an event.

type EventHandlerType

type EventHandlerType string

EventHandlerType is the type of an event handler. It is used to ensure that an event is handled by only one handler of each type.

type EventObserver

type EventObserver interface {
	// Notify is notified about an event.
	Notify(context.Context, Event)
}

EventObserver is an observer of events. All observers will receive an event.

type EventPublisher

type EventPublisher interface {
	// PublishEvent publishes the event to all observers.
	PublishEvent(context.Context, Event) error

	// AddObserver adds an observer.
	AddObserver(EventObserver)
}

EventPublisher is a publisher of events to observers.
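The order described for the event bus (handlers first, then publication to observers) can be sketched in a few lines. This is a single-process toy with hypothetical types; it leaves out contexts and handler types, so it does not model the "one handler of each type" rule that matters when several instances share a bus. Stopping on the first handler error is also an assumption.

```go
package main

import "fmt"

// Event is a trimmed-down stand-in for the Event interface.
type Event struct{ Type string }

type handlerFunc func(Event) error
type observerFunc func(Event)

// bus is a hypothetical in-process event bus.
type bus struct {
	handlers  map[string][]handlerFunc
	observers []observerFunc
}

func newBus() *bus { return &bus{handlers: map[string][]handlerFunc{}} }

func (b *bus) AddHandler(h handlerFunc, eventType string) {
	b.handlers[eventType] = append(b.handlers[eventType], h)
}

func (b *bus) AddObserver(o observerFunc) { b.observers = append(b.observers, o) }

// HandleEvent runs the handlers registered for the event type, then
// publishes the event to every observer.
func (b *bus) HandleEvent(e Event) error {
	for _, h := range b.handlers[e.Type] {
		if err := h(e); err != nil {
			return err // assumption: a failing handler stops publication
		}
	}
	for _, o := range b.observers {
		o(e)
	}
	return nil
}

func main() {
	var log []string
	b := newBus()
	b.AddHandler(func(e Event) error {
		log = append(log, "handled "+e.Type)
		return nil
	}, "InviteAccepted")
	b.AddObserver(func(e Event) { log = append(log, "observed "+e.Type) })

	_ = b.HandleEvent(Event{Type: "InviteAccepted"})
	_ = b.HandleEvent(Event{Type: "InviteDeclined"}) // no handler, still observed
	fmt.Println(log)
}
```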

type EventStore

type EventStore interface {
	// Save appends all events in the event stream to the store.
	Save(ctx context.Context, events []Event, originalVersion int) error

	// Load loads all events for the aggregate id from the store.
	Load(context.Context, UUID) ([]Event, error)
}

EventStore is an interface for an event sourcing event store.
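The originalVersion argument to Save suggests an optimistic concurrency check. The sketch below shows one plausible in-memory store built on that idea. It omits the context argument for brevity, assumes versions start at 1 and increase by one per event, and `errVersionConflict` is a hypothetical error; none of this is taken from the package's own memory store.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Event is a trimmed-down stand-in for the Event interface.
type Event struct {
	AggregateID string
	Type        string
	Version     int
}

var (
	errNoEventsToAppend      = errors.New("no events to append")
	errIncorrectEventVersion = errors.New("mismatching event version")
	errVersionConflict       = errors.New("version conflict") // hypothetical
)

type memoryStore struct {
	mu      sync.Mutex
	streams map[string][]Event
}

func newMemoryStore() *memoryStore { return &memoryStore{streams: map[string][]Event{}} }

// Save appends events only if the stream is still at originalVersion.
func (s *memoryStore) Save(events []Event, originalVersion int) error {
	if len(events) == 0 {
		return errNoEventsToAppend
	}
	id := events[0].AggregateID
	for i, e := range events {
		if e.AggregateID != id || e.Version != originalVersion+i+1 {
			return errIncorrectEventVersion
		}
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.streams[id]) != originalVersion {
		return errVersionConflict
	}
	s.streams[id] = append(s.streams[id], events...)
	return nil
}

// Load returns a copy of the stream so callers cannot modify stored events.
func (s *memoryStore) Load(id string) []Event {
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([]Event(nil), s.streams[id]...)
}

func main() {
	s := newMemoryStore()
	fmt.Println(s.Save([]Event{{AggregateID: "a1", Type: "InviteCreated", Version: 1}}, 0))
	// A second writer that still believes the aggregate is at version 0 is rejected.
	fmt.Println(s.Save([]Event{{AggregateID: "a1", Type: "InviteAccepted", Version: 1}}, 0))
	fmt.Println(len(s.Load("a1")))
}
```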

type EventStoreError

type EventStoreError struct {
	// Err is the error.
	Err error
	// BaseErr is an optional underlying error, for example from the DB driver.
	BaseErr error
	// Namespace is the namespace for the error.
	Namespace string
}

EventStoreError is an error in the event store, with the namespace.

func (EventStoreError) Error

func (e EventStoreError) Error() string

Error implements the Error method of the error interface.
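Because EventStoreError wraps one of the sentinel errors listed under Variables, callers can inspect the Err field to decide how to react. The sketch below redefines the struct locally; the output format of Error and the `isNotFound` helper are assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

var errAggregateNotFound = errors.New("aggregate not found")

// EventStoreError mirrors the struct documented above.
type EventStoreError struct {
	Err       error
	BaseErr   error
	Namespace string
}

// Error's exact format in the package is not shown on this page; this one
// is an assumption.
func (e EventStoreError) Error() string {
	if e.BaseErr != nil {
		return fmt.Sprintf("%s: %s (%s)", e.Err, e.BaseErr, e.Namespace)
	}
	return fmt.Sprintf("%s (%s)", e.Err, e.Namespace)
}

// isNotFound reports whether err is an EventStoreError carrying the
// not-found sentinel.
func isNotFound(err error) bool {
	esErr, ok := err.(EventStoreError)
	return ok && esErr.Err == errAggregateNotFound
}

func main() {
	var err error = EventStoreError{Err: errAggregateNotFound, Namespace: "default"}
	fmt.Println(err)             // prints: aggregate not found (default)
	fmt.Println(isNotFound(err)) // prints: true
}
```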

type EventStoreMaintainer

type EventStoreMaintainer interface {
	EventStore

	// Replace an event, the version must match. Useful for maintenance actions.
	// Returns ErrAggregateNotFound if there is no aggregate.
	Replace(context.Context, Event) error

	// RenameEvent renames all instances of the event type.
	RenameEvent(ctx context.Context, from, to EventType) error
}

EventStoreMaintainer is an interface for a maintainer of an EventStore. NOTE: Should not be used in apps, useful for migration tools etc.

type EventType

type EventType string

EventType is the type of an event, used as its unique identifier.

type Iter

type Iter interface {
	Next() bool
	Value() interface{}
	// Close must be called after the last Next() to retrieve error if any
	Close() error
}

Iter is a stateful iterator. Each call to Next() readies the next value, which can then be retrieved with Value(). This enables incremental object retrieval from repos that support it. You must call Close() on every Iter, even when results were delivered without apparent error.
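A typical consumption loop for this interface might look like the sketch below. `sliceIter` is a hypothetical slice-backed implementation and `collect` a hypothetical helper; the interface itself is copied from above.

```go
package main

import "fmt"

type Iter interface {
	Next() bool
	Value() interface{}
	Close() error
}

// sliceIter is a hypothetical Iter backed by a slice.
type sliceIter struct {
	items []interface{}
	cur   interface{}
}

func (it *sliceIter) Next() bool {
	if len(it.items) == 0 {
		return false
	}
	it.cur, it.items = it.items[0], it.items[1:]
	return true
}

func (it *sliceIter) Value() interface{} { return it.cur }

func (it *sliceIter) Close() error { return nil }

// collect drains the iterator and always calls Close, because that is
// where an error from the iteration would surface.
func collect(it Iter) ([]interface{}, error) {
	var out []interface{}
	for it.Next() {
		out = append(out, it.Value())
	}
	if err := it.Close(); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	items, err := collect(&sliceIter{items: []interface{}{"a", "b", "c"}})
	fmt.Println(items, err) // prints: [a b c] <nil>
}
```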

type ReadRepo

type ReadRepo interface {
	// Parent returns the parent read repository, if there is one.
	// Useful for iterating a wrapped set of repositories to get a specific one.
	Parent() ReadRepo

	// Find returns an entity for an ID.
	Find(context.Context, UUID) (Entity, error)

	// FindAll returns all entities in the repository.
	FindAll(context.Context) ([]Entity, error)
}

ReadRepo is a read repository for entities.
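The Parent method makes it possible to unwrap layered repositories until a specific implementation is found. A sketch of that walk, with the interface trimmed to Parent and two hypothetical implementations (`memoryRepo` and `cacheRepo`):

```go
package main

import "fmt"

// ReadRepo is trimmed to the one method this sketch needs.
type ReadRepo interface {
	Parent() ReadRepo
}

// memoryRepo is a hypothetical innermost repository with no parent.
type memoryRepo struct{ name string }

func (memoryRepo) Parent() ReadRepo { return nil }

// cacheRepo is a hypothetical wrapper around another repository.
type cacheRepo struct{ inner ReadRepo }

func (c cacheRepo) Parent() ReadRepo { return c.inner }

// findMemoryRepo walks the Parent chain until it reaches a memoryRepo.
func findMemoryRepo(r ReadRepo) (memoryRepo, bool) {
	for r != nil {
		if m, ok := r.(memoryRepo); ok {
			return m, true
		}
		r = r.Parent()
	}
	return memoryRepo{}, false
}

func main() {
	repo := cacheRepo{inner: cacheRepo{inner: memoryRepo{name: "guests"}}}
	m, ok := findMemoryRepo(repo)
	fmt.Println(m.name, ok) // prints: guests true
}
```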

type ReadWriteRepo

type ReadWriteRepo interface {
	ReadRepo
	WriteRepo
}

ReadWriteRepo is a combined read and write repo, mainly useful for testing.

type RepoError

type RepoError struct {
	// Err is the error.
	Err error
	// BaseErr is an optional underlying error, for example from the DB driver.
	BaseErr error
	// Namespace is the namespace for the error.
	Namespace string
}

RepoError is an error in the read repository, with the namespace.

func (RepoError) Error

func (e RepoError) Error() string

Error implements the Error method of the error interface.

type UUID

type UUID string

UUID is a unique identifier, based on the UUID spec. It must be exactly 16 bytes long.

func NewUUID

func NewUUID() UUID

NewUUID creates a new UUID of type v4.

func ParseUUID

func ParseUUID(s string) (UUID, error)

ParseUUID parses a UUID from its hex string representation. The function accepts UUID strings in the following formats:

ParseUUID("6ba7b814-9dad-11d1-80b4-00c04fd430c8")
ParseUUID("{6ba7b814-9dad-11d1-80b4-00c04fd430c8}")
ParseUUID("urn:uuid:6ba7b814-9dad-11d1-80b4-00c04fd430c8")
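To show what accepting these three formats involves, the sketch below reduces each of them to the plain 8-4-4-4-12 form and validates the result. `canonicalUUID` and `errInvalidUUID` are hypothetical; the actual parsing in ParseUUID may differ, for example in which additional forms it accepts.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

var errInvalidUUID = errors.New("invalid UUID string") // hypothetical error

// canonicalUUID strips the optional "urn:uuid:" prefix or surrounding braces
// and checks the remaining 8-4-4-4-12 hex layout.
func canonicalUUID(s string) (string, error) {
	s = strings.TrimPrefix(s, "urn:uuid:")
	if strings.HasPrefix(s, "{") && strings.HasSuffix(s, "}") {
		s = s[1 : len(s)-1]
	}
	if len(s) != 36 {
		return "", errInvalidUUID
	}
	for i := 0; i < len(s); i++ {
		c := s[i]
		switch i {
		case 8, 13, 18, 23:
			if c != '-' {
				return "", errInvalidUUID
			}
		default:
			isHex := (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F')
			if !isHex {
				return "", errInvalidUUID
			}
		}
	}
	return s, nil
}

func main() {
	inputs := []string{
		"6ba7b814-9dad-11d1-80b4-00c04fd430c8",
		"{6ba7b814-9dad-11d1-80b4-00c04fd430c8}",
		"urn:uuid:6ba7b814-9dad-11d1-80b4-00c04fd430c8",
	}
	for _, in := range inputs {
		id, err := canonicalUUID(in)
		fmt.Println(id, err)
	}
}
```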

func (UUID) MarshalJSON

func (id UUID) MarshalJSON() ([]byte, error)

MarshalJSON turns UUID into a json.Marshaler.

func (UUID) String

func (id UUID) String() string

String implements the Stringer interface for UUID.

func (*UUID) UnmarshalJSON

func (id *UUID) UnmarshalJSON(data []byte) error

UnmarshalJSON turns *UUID into a json.Unmarshaler.

type Versionable

type Versionable interface {
	// AggregateVersion returns the version of the item.
	AggregateVersion() int
}

Versionable is an item that has a version number, used by version.ReadRepo.FindMinVersion().

type WriteRepo

type WriteRepo interface {
	// Save saves an entity in the storage.
	Save(context.Context, Entity) error

	// Remove removes an entity by ID from the storage.
	Remove(context.Context, UUID) error
}

WriteRepo is a write repository for entities.

Directories

Path Synopsis
aggregatestore
commandhandler
bus
eventbus
eventhandler
eventstore
examples
guestlist/memory
Package memory contains an example of a CQRS/ES app using memory as DB.
guestlist/mongodb
Package mongodb contains an example of a CQRS/ES app using the MongoDB adapter.
publisher
gcp
repo
