eventsourcing

Version: v0.0.0-...-eeebc8f
Published: Mar 20, 2018 License: MIT Imports: 5 Imported by: 0

README

eventsourcing

A framework for implementing the event-sourcing pattern easily in Go.

Installation

To install this package, please use gopkg.in instead of GitHub:

  go get gopkg.in/go-gadgets/eventsourcing.v0

Features

The features of this framework are:

  • Low-ceremony:
    • The counter example is less than 150 lines of code, including snapshot support, Mongo persistence and a web-server API.
  • Pluggable event-store engines:
    • DynamoDB
    • MongoDB
    • In-Memory
    • Middleware support
      • Ability to mutate store/load operations with custom functions for any store
      • Snapshotting
        • DynamoDB
        • MongoDB
        • In-Memory
        • Redis
      • Logging (with Logrus)
  • Quick-Start helper types:
    • The AggregateBase type allows for fast creation of aggregates and uses reflection in order to wire-up event replay methods.
  • Simple structure annotations:
    • Just use the json:"name" tag on your aggregates/events to persist fields, without worrying about your underlying storage engine.

What is Event-Sourcing?

Event-Sourcing is an architectural pattern in which the state of an entity in your application is modelled as a series of events, each of which mutates the state. For example, we may store the history of a bank account:

Aggregate Key   Sequence   Event Data
123456          1          Account Created
123456          2          Deposit ($50)
123456          3          Withdrawal ($25)

If we now had to consider a bank account withdrawal, we would:

  • Create an empty (Sequence-0) bank account entity.
  • Fetch events from the event-store and apply them to the model
    • Repeat until we have completely rehydrated the state.
  • Check the 'Balance' property of the model
  • Write the new event.
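
The steps above can be sketched in plain Go without the framework. This is a minimal illustration only: the Event, Account, Rehydrate and Withdraw names are hypothetical and are not part of this package's API.

```go
package main

import (
	"errors"
	"fmt"
)

// Event is a stored fact about the account. Amounts are in whole dollars.
// (Hypothetical type, for illustration only.)
type Event struct {
	Sequence int64
	Kind     string // "created", "deposit" or "withdrawal"
	Amount   int
}

// Account is the model rebuilt from the event stream.
type Account struct {
	Sequence int64
	Balance  int
}

// Apply mutates the account for a single event. It makes no decisions.
func (a *Account) Apply(e Event) {
	switch e.Kind {
	case "deposit":
		a.Balance += e.Amount
	case "withdrawal":
		a.Balance -= e.Amount
	}
	a.Sequence = e.Sequence
}

// Rehydrate starts from an empty (sequence-0) account and applies every event.
func Rehydrate(history []Event) Account {
	var acct Account
	for _, e := range history {
		acct.Apply(e)
	}
	return acct
}

// Withdraw checks the rehydrated balance and returns the new event to append.
func Withdraw(history []Event, amount int) (Event, error) {
	acct := Rehydrate(history)
	if amount > acct.Balance {
		return Event{}, errors.New("insufficient funds")
	}
	return Event{Sequence: acct.Sequence + 1, Kind: "withdrawal", Amount: amount}, nil
}

func main() {
	history := []Event{
		{Sequence: 1, Kind: "created"},
		{Sequence: 2, Kind: "deposit", Amount: 50},
		{Sequence: 3, Kind: "withdrawal", Amount: 25},
	}
	fmt.Println(Rehydrate(history).Balance) // 25
	_, err := Withdraw(history, 40)
	fmt.Println(err) // insufficient funds
}
```

The decision (is there enough balance?) lives in Withdraw, while Apply only records what already happened.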

At any given point in time, we can track and identify the state of an entity. It's also possible to understand exactly the sequence of events that led to an outcome being selected.

Creating Your Aggregate-Root

An aggregate (root) is an entity that's defined by the series of events that happen to it. In this simple example (found under /examples/counter within this repository), we'll look at an aggregate that counts the times it's incremented:

var registry eventsourcing.EventRegistry

func init() {
	registry = eventsourcing.NewStandardEventRegistry("ExampleDomain")
	registry.RegisterEvent(IncrementEvent{})
}

type CounterAggregate struct {
	eventsourcing.AggregateBase 
	Count int 
}

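// IncrementCommand is a command that requests an increment of the counter.
type IncrementCommand struct {
}

// IncrementEvent is an event that moves the counter up.
type IncrementEvent struct {
}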

// Initialize the aggregate
func (agg *CounterAggregate) Initialize(key string, store eventsourcing.EventStore, state eventsourcing.StateFetchFunc) {
	agg.AggregateBase.Initialize(key, registry, store, state)
	agg.AutomaticWireup(agg)
}

// HandleIncrementCommand handles an increment command from the bus.
func (agg *CounterAggregate) HandleIncrementCommand(command IncrementCommand) ([]eventsourcing.Event, error) {
	// Insert domain rules here.

	// Raise the events
	return []eventsourcing.Event{
		IncrementEvent{},
	}, nil
}

// ReplayIncrementEvent updates the counter by adding one.
func (agg *CounterAggregate) ReplayIncrementEvent(event IncrementEvent) {
	agg.Count++
}

In this example we have:

  • A registry
    • A registry identifies the types of events that apply to a model, acting as a helper for mapping stored events back to real types.
  • CounterAggregate
    • Our aggregate-root type, which leverages the eventsourcing.AggregateBase type for implementing some common functionality.
  • IncrementEvent
    • An event that, when replayed, bumps the count up.

Note the use of AutomaticWireup(agg): this is a helper function that scans a type using reflection and configures:

  • Replay Functions
    • Used to recover the present state of an aggregate from the event stream.
    • Use methods that match the Replay<EventTypeName>(event <EventTypeName>) method signature.
  • Command Handlers
    • Used to execute commands against the model.
    • Use methods that match the Handle<CommandTypeName>(command <CommandTypeName>) ([]eventsourcing.Event, error) method signature.
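
To make the naming convention concrete, here is a rough sketch of how a reflection scan for Replay<EventTypeName> methods could work. It is not the package's actual implementation: wireReplay, replay and the Counter type are hypothetical stand-ins, and only replay methods (not command handlers) are covered.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// IncrementEvent and Counter are stand-ins for the README's types.
type IncrementEvent struct{}

type Counter struct{ Count int }

// ReplayIncrementEvent matches the Replay<EventTypeName>(event) convention.
func (c *Counter) ReplayIncrementEvent(event IncrementEvent) { c.Count++ }

// wireReplay scans subject for methods named Replay<X> that take exactly one
// argument whose type is named X, and returns them keyed by event type name.
func wireReplay(subject interface{}) map[string]reflect.Value {
	handlers := map[string]reflect.Value{}
	v := reflect.ValueOf(subject)
	t := v.Type()
	for i := 0; i < t.NumMethod(); i++ {
		m := t.Method(i)
		name := strings.TrimPrefix(m.Name, "Replay")
		// m.Type includes the receiver as its first input.
		if name == "" || name == m.Name || m.Type.NumIn() != 2 || m.Type.In(1).Name() != name {
			continue
		}
		handlers[name] = v.Method(i)
	}
	return handlers
}

// replay dispatches an event to its handler; unknown events are ignored.
func replay(handlers map[string]reflect.Value, event interface{}) {
	name := reflect.TypeOf(event).Name()
	if h, ok := handlers[name]; ok {
		h.Call([]reflect.Value{reflect.ValueOf(event)})
	}
}

func main() {
	c := &Counter{}
	handlers := wireReplay(c)
	replay(handlers, IncrementEvent{})
	replay(handlers, IncrementEvent{})
	fmt.Println(c.Count) // 2
}
```

The scan happens once per aggregate instance, after which replay is a map lookup plus one reflective call per event.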

To run this code, we can leverage a memory-based store:

package main

import (
	"github.com/gin-gonic/gin"
	"github.com/go-gadgets/eventsourcing/stores/memory"
)

func main() {
	gin.SetMode(gin.ReleaseMode)
	store := memory.NewStore()

	r := gin.Default()
	r.POST("/:name/increment", func(c *gin.Context) {
		name := c.Param("name")
		agg := CounterAggregate{}
		agg.Initialize(name, store, func() interface{} { return &agg })

		errCommand := agg.Handle(IncrementCommand{})

		if errCommand != nil {
			c.JSON(500, errCommand.Error())
			return
		}

		// Show the count
		c.JSON(200, gin.H{
			"count": agg.Count,
		})

	})
  
	r.Run() // Listen and serve on 0.0.0.0:8080
}

Questions & Answers

Can I call external methods in my Replay functions?

Don't! Stop! Replay methods should act atomically upon the aggregate and only the aggregate - not calling out to anything else that could impact the decision or control flow. This is a mandatory element for reliable event-sourcing:

  • Events represent something that has happened: you should not make decisions about this.
  • If you rely on externalities, the aggregate state could be different between two refreshes/loads.

You should call any externalities in your Command handling functions, and then once you're satisfied that the model can mutate, you raise the events.
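
A small sketch of that split, written without the framework. Product, RateChanged and RateSource are hypothetical names chosen for illustration; the point is only that the command handler talks to the outside world and captures the result in the event, while the replay method copies data from the event and nothing else.

```go
package main

import (
	"errors"
	"fmt"
)

// RateChanged records the rate that was decided when the command ran.
type RateChanged struct{ Rate int }

// Product is a hypothetical aggregate.
type Product struct{ Rate int }

// RateSource is a hypothetical external dependency (e.g. a pricing service).
type RateSource func() (int, error)

// HandleRefreshRate is the command side: it may call out, validate and fail.
// The externally fetched value is captured inside the event.
func (p *Product) HandleRefreshRate(fetch RateSource) ([]interface{}, error) {
	rate, err := fetch()
	if err != nil {
		return nil, err
	}
	if rate <= 0 {
		return nil, errors.New("rate must be positive")
	}
	return []interface{}{RateChanged{Rate: rate}}, nil
}

// ReplayRateChanged is the replay side: it only copies data from the event,
// so every load of the same history yields the same state.
func (p *Product) ReplayRateChanged(e RateChanged) { p.Rate = e.Rate }

func main() {
	p := &Product{}
	events, err := p.HandleRefreshRate(func() (int, error) { return 7, nil })
	if err != nil {
		panic(err)
	}
	for _, e := range events {
		p.ReplayRateChanged(e.(RateChanged))
	}
	fmt.Println(p.Rate) // 7
}
```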

When my Aggregate has a long life, operations are slow - Why?

When loading events from the backing stores, all prior events must be loaded and processed in order to 'catch up' and execute the command against the latest state. In the case of long-lived aggregates, this can be one hell of a long history and will be accordingly slow. We recommend that you:

  • Use Snapshotting
    • All built-in providers support snapshotting, allowing the state of an aggregate to be cached every N events. This allows for faster replay, since only the last N events at most need to be fetched from the backing store.
  • Spread the Events Among Aggregates
    • Aggregates should be numerous. This allows for scaling through sharding with most providers for backing storage.
    • If you've got a single large aggregate that's your entire application, something needs to change there.

If you follow these practices, you'll get great performance - and the ability to scale.
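
To see why snapshotting bounds the replay cost, consider this toy in-memory sketch. The store and snapshot types are hypothetical and far simpler than the built-in providers; each event is assumed to be a single counter increment.

```go
package main

import "fmt"

// snapshot caches the counter state as of a given sequence number.
type snapshot struct {
	Sequence int64
	Count    int
}

// store is a toy in-memory store: one increment event per sequence number.
type store struct {
	events   int64 // number of events written so far
	interval int64 // take a snapshot every `interval` events
	snap     snapshot
}

// Append writes one increment event and snapshots on every interval boundary.
func (s *store) Append() {
	s.events++
	if s.events%s.interval == 0 {
		s.snap = snapshot{Sequence: s.events, Count: int(s.events)}
	}
}

// Load restores the snapshot, then replays only the events after it.
// It returns the rebuilt count and how many events had to be replayed.
func (s *store) Load() (count int, replayed int64) {
	count = s.snap.Count
	for seq := s.snap.Sequence + 1; seq <= s.events; seq++ {
		count++ // replay one increment event
		replayed++
	}
	return count, replayed
}

func main() {
	s := &store{interval: 100}
	for i := 0; i < 1050; i++ {
		s.Append()
	}
	count, replayed := s.Load()
	fmt.Println(count, replayed) // 1050 50
}
```

Without the snapshot, the same load would replay all 1050 events.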

Why are you using Reflection?

There are a few spots where reflection/marshalling of types from generic structures is required (e.g. turning BSON/JSON back into a structure or vice-versa). Some other areas which leverage reflection can be avoided if you're prepared to do a little bit of extra leg-work:

  • AggregateBase
    • The AggregateBase can be used with the AutomaticWireup method to dynamically register event handler methods and command handlers.
    • If you don't call .AutomaticWireup, you can:
      • For each event type, call agg.DefineReplayMethod(eventType, replay func(Event)) to manually define the replay method for that event type.

In short, if you're keen to avoid those reflection calls, you can, but there's a price to pay in terms of code effort, and if you're using a real-world backing store it generally won't be a good trade.
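
For comparison, manual registration might look roughly like the following. To keep the sketch self-contained, base, Event and EventType are simplified stand-ins for the package's AggregateBase, Event and EventType; only the DefineReplayMethod signature is taken from the documentation below, and the internal map and apply helper are assumptions.

```go
package main

import "fmt"

// Event, EventType and base are simplified stand-ins for the package's
// Event, EventType and AggregateBase; only the registration idea is shown.
type Event interface{}
type EventType string

type base struct {
	replay map[EventType]func(Event)
}

// DefineReplayMethod mirrors the documented signature on AggregateBase.
func (b *base) DefineReplayMethod(eventType EventType, replay func(Event)) {
	if b.replay == nil {
		b.replay = map[EventType]func(Event){}
	}
	b.replay[eventType] = replay
}

// apply looks up the handler by event type; unknown types are ignored.
func (b *base) apply(eventType EventType, event Event) {
	if fn, ok := b.replay[eventType]; ok {
		fn(event)
	}
}

type IncrementEvent struct{}

type Counter struct {
	base
	Count int
}

// newCounter wires the replay method by hand instead of scanning with reflect.
func newCounter() *Counter {
	c := &Counter{}
	c.DefineReplayMethod("IncrementEvent", func(e Event) {
		if _, ok := e.(IncrementEvent); ok {
			c.Count++
		}
	})
	return c
}

func main() {
	c := newCounter()
	c.apply("IncrementEvent", IncrementEvent{})
	fmt.Println(c.Count) // 1
}
```

The extra code per event type (a closure plus a type assertion) is the "price" mentioned above.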

Where is the Event-Bus Concept?

This is something that most (possibly all) event-sourcing implementations out there do poorly, it seems. The challenge is that document databases used for storing events have one of two limitations:

  • Maximum document size (e.g. some frameworks store all events as an array on a single document, which puts a hard limit on history length)
    • Nothing stops you writing a Mongo-store variant that implements this pattern here.
  • Non-Transactionality of the bus (i.e. Even though it got into MongoDB, did it get published/acted on elsewhere?)

If you can accept these shortcomings, there is a publisher middleware that allows a distribution publisher to be attached to the event storage process. However, for applications where transactional integrity is paramount it is recommended that you:

  • Use your event store's native back-end to propagate events to subscribers via an intermediary.
    • Mongo - Tail the oplog on your database into Kafka.
    • DynamoDB - Enable Kinesis streams and attach handlers for distribution to those.
  • Design aggregate events such that one command triggers one event in the general case, or that commands are retryable in the case of not all events getting published to the store (e.g. of 10 events, only the first 5 were written in a batch)

This ultimately reduces the complexity required in the package, but also means that you'll get a more reliable delivery of events to targets.

What About Command-Buses?

Making a generic command-bus is essentially an exercise in reflection-abuse in Go, so instead the library is currently focused on making BYO-bus as easy as possible. The preferred pattern for this project is that your model gets exposed as a service (e.g. HTTP-REST or similar) and then people interact with that, without reference to the fact that under the hood you are using event-sourcing.

How do I do Consistent Reads?

Use the aggregate's Run method, passing a callback: agg.Run(func() error { ... }). During the callback the aggregate will be revived to the latest/current state. Be mindful of using this excessively, though, and instead bias towards using projections unless there is a distinct and genuine reason to hit your event-store with read commands.

Documentation

Overview

Package eventsourcing contains the core implementation of an event-sourcing framework written in Go.

Index

Constants

const (
	// HandleMethodPrefix is a prefix used for command handler methods
	HandleMethodPrefix = "Handle"

	// ReplayMethodPrefix is the prefix used for event replay methods
	ReplayMethodPrefix = "Replay"
)

const (
	// EventHandleMethodPrefix is the prefix for handler auto-wireup methods
	EventHandleMethodPrefix = "Handle"
)

Variables

This section is empty.

Functions

func DefaultTestStoreFilter

func DefaultTestStoreFilter() error

DefaultTestStoreFilter is an error filter that doesn't fail, which is the default.

func NewConcurrencyFault

func NewConcurrencyFault(aggregateKey string, eventSequence int64) error

NewConcurrencyFault creates a concurrency fault error for the specified aggregate key and event sequence.

func NewDomainFault

func NewDomainFault(aggregateKey string, faultCode string) error

NewDomainFault creates an error from the specified fault code

func NormalizeTypeName

func NormalizeTypeName(name string) string

NormalizeTypeName normalizes the type name of an event by removing the Go-supplied package name.

func Retry

func Retry(limit int, body func() error) error

Retry retries a block of code, until it hits a limit or the concurrency fault does not occur.
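
The documented behaviour can be approximated as follows. This is a sketch of the idea, not the package's source: retry and concurrencyFault are local stand-ins for Retry and ConcurrencyFault, and the assumption that non-concurrency errors are returned immediately is mine.

```go
package main

import (
	"errors"
	"fmt"
)

// concurrencyFault is a stand-in for the package's ConcurrencyFault.
type concurrencyFault struct {
	AggregateKey  string
	EventSequence int64
}

func (f concurrencyFault) Error() string {
	return fmt.Sprintf("concurrency fault: %s@%d", f.AggregateKey, f.EventSequence)
}

// retry runs body up to limit times, retrying only on a concurrency fault.
// Any other result (nil or a different error) is returned immediately.
func retry(limit int, body func() error) error {
	var err error
	for attempt := 0; attempt < limit; attempt++ {
		err = body()
		var fault concurrencyFault
		if !errors.As(err, &fault) {
			return err
		}
	}
	return err
}

func main() {
	calls := 0
	err := retry(5, func() error {
		calls++
		if calls < 3 {
			// Simulate another writer having claimed the sequence first.
			return concurrencyFault{AggregateKey: "123456", EventSequence: 4}
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}
```

In practice the body would reload the aggregate and re-run the command, so that each attempt decides against the latest state.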

Types

type Adapter

type Adapter interface {
	// GetKey fetches the aggregate key
	GetKey() string
}

Adapter is an interface that exposes state information about the aggregate being operated on.

type AdapterPositional

type AdapterPositional interface {
	Adapter

	// SequenceNumber fetches the current sequence number
	SequenceNumber() int64
}

AdapterPositional is an adapter which can introspect about where an aggregate is at in terms of its history.

type AdapterWithEvents

type AdapterWithEvents interface {
	AdapterPositional

	// GetEventRegistry gets the event registry to use
	GetEventRegistry() EventRegistry

	// IsDirty returns true if the aggregate has uncommitted state.
	IsDirty() bool
}

AdapterWithEvents is a variant of Adapter that is required where components need to reason about event types in an abstract way (e.g. event reader adapters).

type Aggregate

type Aggregate interface {
	// Initialize sets up the initial state of the aggregate.
	Initialize(key string, registry EventRegistry, store EventStore)

	// ApplyEvent applies an event that has occurred to the aggregate
	// instance to mutate its state. Events that are not recognized are
	// ignored, and all event application is fail-safe.
	ApplyEvent(Event)

	// Commit commits the state of the aggregate, persisting any
	// new events to the store.
	Commit() error

	// Refresh recovers the state of the aggregate from the underlying
	// store.
	Refresh() error

	// GetState gets the state of an aggregate
	GetState() interface{}
}

Aggregate is the interface for an event-sourced aggregate root. All common behaviours of an aggregate expected by the runtime are defined here.

type AggregateBase

type AggregateBase struct {
	// contains filtered or unexported fields
}

AggregateBase is an implementation of Aggregate that provides a lot of shared boilerplate code.

func (*AggregateBase) ApplyEvent

func (agg *AggregateBase) ApplyEvent(event Event)

ApplyEvent applies an event that has occurred to the aggregate base instance to mutate its state. Events that are not recognized are ignored, and all event application should be fail-safe.

func (*AggregateBase) AutomaticWireup

func (agg *AggregateBase) AutomaticWireup(subject interface{})

AutomaticWireup performs automatic detection of event replay methods and command handlers, looking for Replay<EventTypeName> and Handle<CommandTypeName> methods on the subject type.

func (*AggregateBase) Commit

func (agg *AggregateBase) Commit() error

Commit commits the state of the aggregate, marking all events as having been accepted by a backing store. This does not itself cause persistence to occur.

func (*AggregateBase) DefineReplayMethod

func (agg *AggregateBase) DefineReplayMethod(eventType EventType, replay func(Event))

DefineReplayMethod defines a method that replays events of a given event type.

func (*AggregateBase) GetKey

func (agg *AggregateBase) GetKey() string

GetKey fetches the key of this aggregate instance.

func (*AggregateBase) Handle

func (agg *AggregateBase) Handle(command Command) error

Handle processes a command against the aggregate.

func (*AggregateBase) Initialize

func (agg *AggregateBase) Initialize(key string, registry EventRegistry, store EventStore, state StateFetchFunc)

Initialize sets the initial state of the AggregateBase and ensures we are in a suitable situation to start reasoning about the events that will happen.

func (*AggregateBase) Refresh

func (agg *AggregateBase) Refresh() error

Refresh reloads the current state of the aggregate from the underlying store.

func (*AggregateBase) Run

func (agg *AggregateBase) Run(callback func() error) error

Run performs a load, mutate, commit cycle on an aggregate

func (*AggregateBase) SequenceNumber

func (agg *AggregateBase) SequenceNumber() int64

SequenceNumber gets the current sequence number of the aggregate.

func (*AggregateBase) State

func (agg *AggregateBase) State() interface{}

State gets the current state of an aggregate for any process that is interested

type CloseMiddleware

type CloseMiddleware func() error

CloseMiddleware shuts down a middleware, if present

type Command

type Command interface {
}

Command is an interface that describes the common attributes of commands.

type CommandHandleFunc

type CommandHandleFunc func(command Command) ([]Event, error)

CommandHandleFunc is a function that handles a command directly.

type CommandHandler

type CommandHandler interface {
	// Handle a command, returning the resultant events (or an error)
	Handle(command Command) ([]Event, error)
}

CommandHandler is an interface that describes the operations available on an instance that can follow the command-handler pattern.

type CommandRegistry

type CommandRegistry interface {
	// CreateCommand creates an instance of a command
	CreateCommand(CommandType) Command

	// Domain this registry contains commands for
	Domain() string

	// GetCommandType determines the CommandType of a Command
	GetCommandType(interface{}) (CommandType, bool)

	// RegisterCommand registers a command
	RegisterCommand(Command) CommandType
}

CommandRegistry defines a per-aggregate type registry of the commands that are known to a specific aggregate.

func NewStandardCommandRegistry

func NewStandardCommandRegistry(domain string) CommandRegistry

NewStandardCommandRegistry creates an instance of a plain CommandRegistry that stores information about command types in an internal map. The string parameter is the name of the domain/bounded-context in which our commands live.

type CommandType

type CommandType string

CommandType is a string-alias that represents a command's type, which can be used in maps.

type CommitMiddleware

type CommitMiddleware func(writer StoreWriterAdapter, next NextHandler) error

CommitMiddleware is middleware that handles commit operations, allowing for intercepting or other operations.

type ConcurrencyFault

type ConcurrencyFault struct {
	AggregateKey  string `json:"aggregate_key"`
	EventSequence int64  `json:"event_sequence"`
}

ConcurrencyFault represents an error that occurred when updating an aggregate: specifically that we have tried to insert events at an index that is already defined. This means the client likely needs to re-run the command to break the deadlock, as someone else executed first.

func IsConcurrencyFault

func IsConcurrencyFault(err error) (bool, *ConcurrencyFault)

IsConcurrencyFault determines if the specified error is a ConcurrencyFault

func (ConcurrencyFault) Error

func (curr ConcurrencyFault) Error() string

Error returns the ConcurrencyFault formatted as a string to meet the Error interface.

type DomainFault

type DomainFault struct {
	// AggregateKey that had the fault
	AggregateKey string `json:"aggregate_key"`

	// FaultCode for the domain fault
	FaultCode string `json:"fault_code"`
}

DomainFault represents an error that has arisen during a command that indicates the command is invalid within the domain. This can be any application-relevant incident (e.g. attempting to overdraw a bank account).

func IsDomainFault

func IsDomainFault(err error) (bool, *DomainFault)

IsDomainFault determines if the specified error is a DomainFault

func (DomainFault) Error

func (curr DomainFault) Error() string

Error returns the DomainFault formatted as a string to meet the Error interface.

type Event

type Event interface {
}

Event is an interface that describes common attributes of events.

type EventConsumer

type EventConsumer interface {
	// Start consuming
	Start() error

	// Stop consuming
	Stop() error

	// AddHandler adds a handler to the set of handlers for this consumer.
	AddHandler(handler EventHandler)
}

EventConsumer is an interface that describes a consumer that allows multiple handlers to be attached, allowing events to be multiplexed to the handlers without needing to consume the same stream multiple times.

type EventDefinition

type EventDefinition struct {
	// Detector is a function that determines if a specific runtime event
	// matches this event revision's type.
	Detector EventDetector

	// Factory method to create an instance of the event for this specific version.
	Factory EventFactory
}

EventDefinition defines the structure of an event.

type EventDetector

type EventDetector func(interface{}) bool

An EventDetector is a function that determines if the streamed event is an instance of the specified event revision. True indicates a match, false indicates a mis-match.

type EventFactory

type EventFactory func() Event

EventFactory is a function that creates an event instance of a given type, ready to work with.

type EventHandler

type EventHandler interface {
	// Handle the specified event and apply any consequences.
	Handle(event PublishedEvent) error
}

EventHandler is an interface that handles events that have been delivered from a publishing source

type EventHandlerBase

type EventHandlerBase struct {
	// contains filtered or unexported fields
}

EventHandlerBase is a common base type for an event handler that takes events from a publishing source and handles them.

func (*EventHandlerBase) AutomaticWireup

func (base *EventHandlerBase) AutomaticWireup(subject interface{})

AutomaticWireup performs automatic detection of consumer methods

func (*EventHandlerBase) Handle

func (base *EventHandlerBase) Handle(event PublishedEvent) error

Handle processes an event

func (*EventHandlerBase) Initialize

func (base *EventHandlerBase) Initialize(registry EventRegistry, self interface{})

Initialize the EventHandlerBase

type EventPublisher

type EventPublisher interface {
	// Publish an event. When the method returns the event should be committed/guaranteed
	// to have been distributed.
	Publish(key string, sequence int64, event Event) error
}

EventPublisher is an interface that describes an event publisher sink that allows events to be distributed to other components.

type EventRegistry

type EventRegistry interface {
	// CreateEvent creates an instance of an event
	CreateEvent(EventType) Event

	// Domain this registry contains events for
	Domain() string

	// GetEventType determines the EventType of an event
	GetEventType(interface{}) (EventType, bool)

	// RegisterEvent registers an event
	RegisterEvent(Event) EventType
}

EventRegistry defines a per-aggregate type registry of the events that are known to a specific aggregate.

func NewStandardEventRegistry

func NewStandardEventRegistry(domain string) EventRegistry

NewStandardEventRegistry creates an instance of a plain EventRegistry that stores information about event types in an internal map. The string parameter is the name of the domain/bounded-context in which our events live.

type EventStore

type EventStore interface {
	// CommitEvents stores any events for the specified aggregate that are uncommitted
	// at this point in time.
	CommitEvents(writer StoreWriterAdapter) error

	// Refresh refreshes the state of the specified aggregate from the underlying store
	Refresh(reader StoreLoaderAdapter) error

	// Close shuts down the storage driver.
	Close() error
}

EventStore defines the behaviours of a store that can load/save event streams for an aggregate.

type EventStoreWithMiddleware

type EventStoreWithMiddleware interface {
	EventStore

	// Use a middleware
	Use(commit CommitMiddleware, refresh RefreshMiddleware, cleanup func() error)

	// HandleCleanup registers a cleanup/shutdown handler
	HandleCleanup(cleanup func() error)

	// HandleCommit registers middleware to handle commits
	HandleCommit(middleware CommitMiddleware)

	// HandleRefresh registers middleware to handle refreshes
	HandleRefresh(middleware RefreshMiddleware)
}

EventStoreWithMiddleware is an interface that describes an event-store with middleware support.

func NewMiddlewareWrapper

func NewMiddlewareWrapper(inner EventStore) EventStoreWithMiddleware

NewMiddlewareWrapper is an event-store wrapper that provides the ability to insert middleware into the pipeline.

type EventType

type EventType string

EventType is a string alias that represents the type of an event.

type MiddlewareFactory

type MiddlewareFactory func() (CommitMiddleware, RefreshMiddleware, CloseMiddleware)

MiddlewareFactory is a middleware callback that provides all 3 items.

type NextHandler

type NextHandler func() error

NextHandler is a callback function that runs the next handler in a middleware chain.
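
A compact sketch of how such a chain can be composed. The types here are simplified stand-ins (the adapter argument of CommitMiddleware and RefreshMiddleware is omitted), and the ordering shown, first registered runs outermost, is an assumption rather than a documented guarantee of this package.

```go
package main

import "fmt"

// nextHandler and middleware are simplified stand-ins for NextHandler and
// CommitMiddleware; the adapter argument is omitted to keep the sketch small.
type nextHandler func() error
type middleware func(next nextHandler) error

// chain composes middlewares so the first one listed runs outermost and
// the final handler (e.g. the real store write) runs innermost.
func chain(final nextHandler, mws ...middleware) nextHandler {
	handler := final
	for i := len(mws) - 1; i >= 0; i-- {
		mw, next := mws[i], handler
		handler = func() error { return mw(next) }
	}
	return handler
}

// logging returns a middleware that records when it is entered and left.
func logging(name string, trace *[]string) middleware {
	return func(next nextHandler) error {
		*trace = append(*trace, name+":before")
		err := next()
		*trace = append(*trace, name+":after")
		return err
	}
}

func main() {
	var trace []string
	commit := chain(
		func() error { trace = append(trace, "store"); return nil },
		logging("a", &trace), logging("b", &trace),
	)
	_ = commit()
	fmt.Println(trace) // [a:before b:before store b:after a:after]
}
```

A middleware that returns without calling next short-circuits the chain, which is how a snapshot middleware could skip a full load.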

type PublishedEvent

type PublishedEvent struct {
	Domain   string      `json:"domain"`     // Domain the event belongs to
	Type     EventType   `json:"event_type"` // EventType
	Key      string      `json:"key"`        // Event key
	Sequence int64       `json:"sequence"`   // Sequence number
	Data     interface{} `json:"data"`       // Data
}

PublishedEvent is a record of an event that's published to a queue or sink

type RefreshMiddleware

type RefreshMiddleware func(reader StoreLoaderAdapter, next NextHandler) error

RefreshMiddleware is middleware that handles refresh/load operations, allowing for interception or other operations

type StateFetchFunc

type StateFetchFunc func() interface{}

StateFetchFunc is a function that returns the state-value.

type StoreLoaderAdapter

type StoreLoaderAdapter interface {
	AdapterWithEvents

	// ReplayEvent applies an event that has already been persisted
	ReplayEvent(event Event)

	// RestoreSnapshot applies a snapshot state, if available
	RestoreSnapshot(sequence int64, state interface{}) error
}

StoreLoaderAdapter represents an adapter that can be used to modify an aggregate in response to a load/refresh operation

type StoreWriterAdapter

type StoreWriterAdapter interface {
	AdapterWithEvents

	// GetUncommittedEvents gets the committed sequence number, and any
	// events that have been added since the last commit. This can be
	// used by a backing store to write data.
	GetUncommittedEvents() (int64, []Event)

	// GetState returns the state of the aggregate in its current
	// sequence/position, which may be required when snapshotting.
	GetState() interface{}
}

StoreWriterAdapter is an adapter interface that defines the inputs an aggregate gives to a store for writing/committing new events.

type TestStore

type TestStore struct {
	History     []TestStoreHistoryItem
	ErrorFilter func() error
	// contains filtered or unexported fields
}

TestStore is our mock-store type

func NewTestStore

func NewTestStore() *TestStore

NewTestStore creates a new EventStore instance that uses the Mock provider. This allows us to track and observe actions, and can be used to validate the correctness of other implementations or actions in unit tests.

func (*TestStore) Close

func (store *TestStore) Close() error

Close the test store

func (*TestStore) CommitEvents

func (store *TestStore) CommitEvents(writer StoreWriterAdapter) error

CommitEvents stores the events

func (*TestStore) Refresh

func (store *TestStore) Refresh(reader StoreLoaderAdapter) error

Refresh recovers the state of an aggregate from a known state.

func (*TestStore) When

func (store *TestStore) When(key string, offset int64, events []Event, state interface{})

When sets up a configured state that can be used for refreshes.

type TestStoreFilter

type TestStoreFilter func() error

TestStoreFilter is a callback that decides whether or not to fail the next operation with an error

func TestStoreFailureFilter

func TestStoreFailureFilter(err error) TestStoreFilter

TestStoreFailureFilter is an error filter that is used to ensure a store fails in the prescribed way.

type TestStoreHistoryItem

type TestStoreHistoryItem struct {
	Key    string      // Key of the aggregate
	Offset int64       // Offset that this set of events is from
	Events []Event     // Events
	State  interface{} // State instance
}

TestStoreHistoryItem is the set of history items recorded by an event store.

Directories

Path           Synopsis
distribution
examples
stores
key-value      Package keyvalue contains a base implementation of much of the common logic required for a scale-out tablestore implementation of an event store.
utilities
