eventsourcing

Version: v0.0.0-...-a320c80
Note: this package is not in the latest version of its module.
Published: Nov 13, 2023 License: MIT Imports: 5 Imported by: 0

README

Event Sourcing in Go

The goal of this project is to implement the event sourcing pattern with minimal language requirements and minimal use of reflection, while keeping the API as simple as possible. In the future I plan to write articles that explain in detail how this library works.

Articles

Read these articles if you want a solid understanding of Event Sourcing.

Brief excursion

Let's split Event Sourcing into two parts: the event and the aggregate cluster.

  • An event is something that happened in the past. Events indicate that something within the domain has changed. They contain all the information needed to transform the state of the domain from one version to the next.
  • An aggregate cluster (or event stream) is a cluster of associated objects treated as a single unit. Every aggregate has its own event stream, so every event must be stored together with an identifier for its aggregate. This ID is often called AggregateId.
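
To make the "one stream per aggregate" idea concrete, here is a minimal sketch that splits a flat event log into streams keyed by aggregate ID. StoredEvent and GroupByAggregate are hypothetical names invented for this illustration; they are not part of the library.

```go
package main

import "fmt"

// StoredEvent is a hypothetical, simplified event record used only in this
// sketch; it is not the library's event type.
type StoredEvent struct {
	AggregateID string // identifies the stream the event belongs to
	Version     int    // position of the event inside its stream
	Reason      string // what happened, e.g. "created"
}

// GroupByAggregate splits a flat event log into one stream per aggregate,
// preserving the original order of the events inside each stream.
func GroupByAggregate(log []StoredEvent) map[string][]StoredEvent {
	streams := make(map[string][]StoredEvent)
	for _, evt := range log {
		streams[evt.AggregateID] = append(streams[evt.AggregateID], evt)
	}
	return streams
}

func main() {
	log := []StoredEvent{
		{AggregateID: "payment-1", Version: 1, Reason: "created"},
		{AggregateID: "payment-2", Version: 1, Reason: "created"},
		{AggregateID: "payment-1", Version: 2, Reason: "confirmed"},
	}
	for id, stream := range GroupByAggregate(log) {
		fmt.Println(id, len(stream))
	}
}
```

Because every event carries its aggregate ID, a store can keep all events in one table and still load a single stream on demand.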

Each aggregate cluster has its own Transition(Event) function that determines how an event is applied to the aggregate. For example, suppose we have a PaymentAggregate cluster and the events PaymentAggregateCreated, PaymentAggregateConfirmed, and PaymentAggregateRefunded.

Each event has a Reason field that describes why the aggregation happens. The Reason field tells us exactly which transition to perform.

func (pa *PaymentAggregator) Transition(evt event.Eventer) error {
	switch evt.GetReason() {
	case PaymentAggregateReasonCreated:
		return pa.onCreated(evt)
	case PaymentAggregateReasonConfirmed:
		return pa.onConfirmed(evt)
	case PaymentAggregateReasonRefunded:
		return pa.onRefunded(evt)
	}
	return errors.New("undefined event type")
}
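
Rebuilding an aggregate then comes down to feeding its events through Transition in order. The sketch below shows that replay loop using only the standard library. Event, Payment, and Replay are simplified stand-ins invented for this example, not the library's types.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Event is a hypothetical stand-in for event.Eventer, reduced to the two
// pieces of data this sketch needs.
type Event struct {
	Reason  string
	Payload []byte
}

// Payment is a simplified aggregate used only in this sketch.
type Payment struct {
	Status string
	Amount int
}

// Transition dispatches on the event reason, like the switch shown above.
func (p *Payment) Transition(evt Event) error {
	switch evt.Reason {
	case "created":
		var payload struct{ Amount int }
		if err := json.Unmarshal(evt.Payload, &payload); err != nil {
			return err
		}
		p.Status, p.Amount = "created", payload.Amount
		return nil
	case "confirmed":
		p.Status = "confirmed"
		return nil
	}
	return errors.New("undefined event type")
}

// Replay rebuilds the state by applying the events in order. It stops at
// the first event that cannot be applied.
func Replay(p *Payment, events []Event) error {
	for i, evt := range events {
		if err := p.Transition(evt); err != nil {
			return fmt.Errorf("event %d (%q): %w", i, evt.Reason, err)
		}
	}
	return nil
}

func main() {
	p := &Payment{}
	err := Replay(p, []Event{
		{Reason: "created", Payload: []byte(`{"Amount":100}`)},
		{Reason: "confirmed"},
	})
	fmt.Println(p.Status, p.Amount, err)
}
```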

Let's implement an aggregate cluster

In our example we will implement PaymentAggregator, which is responsible for payment aggregation. Every aggregate structure should embed *eventsourcing.AggregateCluster (composition) in order to implement the event.Aggregator interface.

type PaymentAggregator struct {
	*eventsourcing.AggregateCluster // composition
	// General.
	PaymentID              string
	PaymentStatus          string
	PaymentAmount          int
	PaymentAvailableAmount int
	PaymentRefundAmount    int
}

The aggregator should provide an event.Transition function with the signature func(event Eventer) error. This function is called every time the aggregate cluster performs a transition. We also have to define the event reasons, one for each event.

const (
	// Reason for paymentCreatedEvent.
	PaymentAggregateReasonCreated = "created"
	// Reason for paymentConfirmedEvent.
	PaymentAggregateReasonConfirmed = "confirmed"
	// Reason for paymentRefundedEvent.
	PaymentAggregateReasonRefunded = "refunded"
)

func (pa *PaymentAggregator) Transition(evt event.Eventer) error {
	switch evt.GetReason() {
	case PaymentAggregateReasonCreated:
		return pa.onCreated(evt)
	case PaymentAggregateReasonConfirmed:
		return pa.onConfirmed(evt)
	case PaymentAggregateReasonRefunded:
		return pa.onRefunded(evt)
	}
	// An unknown reason cannot be applied, so return an error.
	return errors.New("undefined event type")
}

Let's define events

Since we have three event reasons, we need to implement three events: "created", "confirmed", and "refunded".

type paymentCreatedEvent struct {
	PaymentID              string
	PaymentStatus          string
	PaymentAmount          int
	PaymentAvailableAmount int
}

func (pa *PaymentAggregator) onCreated(evt event.Eventer) error {
	var payload paymentCreatedEvent
	if err := json.Unmarshal(evt.GetPayload(), &payload); err != nil {
		return err
	}
	pa.PaymentID = payload.PaymentID
	pa.PaymentStatus = payload.PaymentStatus
	pa.PaymentAmount = payload.PaymentAmount
	pa.PaymentAvailableAmount = payload.PaymentAvailableAmount
	return nil
}

type paymentConfirmedEvent struct {
	PaymentStatus string
}

func (pa *PaymentAggregator) onConfirmed(evt event.Eventer) error {
	var payload paymentConfirmedEvent
	if err := json.Unmarshal(evt.GetPayload(), &payload); err != nil {
		return err
	}
	pa.PaymentStatus = payload.PaymentStatus
	return nil
}

type paymentRefundedEvent struct {
	PaymentRefundAmount int
}

func (pa *PaymentAggregator) onRefunded(evt event.Eventer) error {
	var payload paymentRefundedEvent
	if err := json.Unmarshal(evt.GetPayload(), &payload); err != nil {
		return err
	}
	// Validate before mutating so a rejected event leaves the state untouched.
	if payload.PaymentRefundAmount > pa.PaymentAmount {
		return errors.New("refund amount is greater than the entire payment amount")
	}
	pa.PaymentRefundAmount = payload.PaymentRefundAmount
	pa.PaymentAvailableAmount = pa.PaymentAmount - pa.PaymentRefundAmount
	return nil
}

Let's compose it together and apply events

func main() {
	agg := &PaymentAggregator{}
	agg.AggregateCluster = eventsourcing.New(agg, agg.Transition, eventsourcing.UUIDGenerator)

	// Create a paymentCreatedEvent that will be saved later.
	createdEvent, err := event.New(PaymentAggregateReasonCreated, paymentCreatedEvent{
		PaymentID:              "id_0",
		PaymentStatus:          "created",
		PaymentAmount:          100,
		PaymentAvailableAmount: 100,
	})
	if err != nil {
		panic(err)
	}

	// Apply createdEvent to the PaymentAggregator cluster to derive its state.
	if err := agg.Apply(createdEvent); err != nil {
		panic(err)
	}

	// Save committed events into the database...
}
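
The saving step is left open above. One possible shape for it is sketched below, under the assumption that freshly applied events show up in ListUncommittedEvents and that Commit marks a single event as persisted. The Aggregate interface, the Saver hook, and fakeAggregate are hypothetical stand-ins with simplified types; only the two method names come from AggregateCluster.

```go
package main

import (
	"errors"
	"fmt"
)

// Event is a hypothetical stand-in for event.Eventer.
type Event struct {
	Reason string
}

// Aggregate lists the two methods this sketch relies on. The names follow
// AggregateCluster, but the types are simplified assumptions.
type Aggregate interface {
	ListUncommittedEvents() []Event
	Commit(evt Event) error
}

// Saver is a hypothetical persistence hook, e.g. a database insert.
type Saver func(evt Event) error

// Flush saves every uncommitted event and commits it only after the save
// succeeded, so a failed save leaves the event pending for a retry.
func Flush(agg Aggregate, save Saver) (int, error) {
	saved := 0
	for _, evt := range agg.ListUncommittedEvents() {
		if err := save(evt); err != nil {
			return saved, fmt.Errorf("save %q: %w", evt.Reason, err)
		}
		if err := agg.Commit(evt); err != nil {
			return saved, fmt.Errorf("commit %q: %w", evt.Reason, err)
		}
		saved++
	}
	return saved, nil
}

// fakeAggregate is an in-memory Aggregate used to exercise Flush.
type fakeAggregate struct {
	pending []Event
}

func (f *fakeAggregate) ListUncommittedEvents() []Event {
	return append([]Event(nil), f.pending...) // copy, safe to iterate over
}

func (f *fakeAggregate) Commit(evt Event) error {
	for i, p := range f.pending {
		if p == evt {
			f.pending = append(f.pending[:i], f.pending[i+1:]...)
			return nil
		}
	}
	return errors.New("event is not pending")
}

func main() {
	agg := &fakeAggregate{pending: []Event{{Reason: "created"}, {Reason: "confirmed"}}}
	n, err := Flush(agg, func(Event) error { return nil })
	fmt.Println(n, len(agg.pending), err)
}
```

Committing only after a successful save keeps the in-memory list and the database from drifting apart when a write fails halfway through.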

Eventstore

At the moment only PostgreSQL is supported out of the box. Note: the table structure must be exactly as defined in eventstore/postgresql/migrate.go.

You can implement your own repository (for MySQL, EventStore DB, etc.) by implementing the eventstore.Repository interface.
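
The methods of eventstore.Repository are not listed on this page, so the following is only a rough sketch of what a custom store could look like. The Repository interface, Event, and ErrDuplicate below are assumptions made for this example; check the real interface before writing an implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrDuplicate mirrors the idea behind ErrEventDuplication for this sketch.
var ErrDuplicate = errors.New("event already exists")

// Event is a hypothetical stand-in for a stored event.
type Event struct {
	AggregateID string
	Version     int
	Reason      string
}

// Repository is an assumed shape, NOT the real eventstore.Repository.
type Repository interface {
	Save(evt Event) error
	Load(aggregateID string) []Event
}

// MemoryRepository keeps every stream in a map guarded by a mutex.
type MemoryRepository struct {
	mu      sync.Mutex
	streams map[string][]Event
}

// NewMemoryRepository returns an empty in-memory store.
func NewMemoryRepository() *MemoryRepository {
	return &MemoryRepository{streams: make(map[string][]Event)}
}

// Save appends the event unless its (aggregate, version) pair is taken.
func (r *MemoryRepository) Save(evt Event) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, stored := range r.streams[evt.AggregateID] {
		if stored.Version == evt.Version {
			return ErrDuplicate
		}
	}
	r.streams[evt.AggregateID] = append(r.streams[evt.AggregateID], evt)
	return nil
}

// Load returns a copy of the stream so callers cannot mutate the store.
func (r *MemoryRepository) Load(aggregateID string) []Event {
	r.mu.Lock()
	defer r.mu.Unlock()
	return append([]Event(nil), r.streams[aggregateID]...)
}

// Compile-time check that MemoryRepository satisfies the assumed interface.
var _ Repository = (*MemoryRepository)(nil)

func main() {
	repo := NewMemoryRepository()
	first := Event{AggregateID: "id_0", Version: 1, Reason: "created"}
	fmt.Println(repo.Save(first))
	fmt.Println(repo.Save(first)) // same version again is rejected
	fmt.Println(len(repo.Load("id_0")))
}
```

Rejecting a repeated (aggregate, version) pair is what a unique constraint would do in a SQL table, and it guards against two writers appending to the same stream at once.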

Event serialization

By default all events are serialized as JSON. At the moment two formats are supported: json and bson. Both implement the event.Serializer interface. The MatchedSerializers variable (a map) maps each SerializerType to its serializer implementation.
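
A registry like this can be sketched with a plain map. The Serializer interface, the SerializerType constant, and Lookup below are assumptions for illustration, since the exact signatures of event.Serializer are not shown on this page. Only JSON is included because it is in the standard library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SerializerType names a payload format. The type and its constant are
// assumptions made for this sketch.
type SerializerType string

const JSONSerializer SerializerType = "json"

// Serializer is an assumed shape, NOT the real event.Serializer.
type Serializer interface {
	Marshal(v interface{}) ([]byte, error)
	Unmarshal(data []byte, v interface{}) error
}

// jsonSerializer delegates to encoding/json.
type jsonSerializer struct{}

func (jsonSerializer) Marshal(v interface{}) ([]byte, error) {
	return json.Marshal(v)
}

func (jsonSerializer) Unmarshal(data []byte, v interface{}) error {
	return json.Unmarshal(data, v)
}

// serializers plays the role of the MatchedSerializers map.
var serializers = map[SerializerType]Serializer{
	JSONSerializer: jsonSerializer{},
}

// Lookup returns the serializer registered for typ, or an error when the
// format is unknown.
func Lookup(typ SerializerType) (Serializer, error) {
	s, ok := serializers[typ]
	if !ok {
		return nil, fmt.Errorf("unsupported serializer type %q", typ)
	}
	return s, nil
}

func main() {
	s, err := Lookup(JSONSerializer)
	if err != nil {
		panic(err)
	}
	data, err := s.Marshal(map[string]string{"PaymentStatus": "confirmed"})
	fmt.Println(string(data), err)
}
```

Adding another format then means implementing the two methods and registering the value in the map.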

Documentation

Constants

This section is empty.

Variables

var ErrEventDuplication = errors.New("event duplication, event is already exist")

Functions

func NanoidGenerator

func NanoidGenerator(alphabet string, size int) string

func UUIDGenerator

func UUIDGenerator(_ string, _ int) string

Types

type AggregateCluster

type AggregateCluster struct {
	// contains filtered or unexported fields
}

func New

func New(agg event.Aggregator, transition event.Transition, idgenfn IDGenerator) *AggregateCluster

func (*AggregateCluster) Apply

func (r *AggregateCluster) Apply(evt event.Eventer) error

Apply applies an event that has not been committed yet. The event's Id, Type, and Version are replaced with the current AggregateCluster's Id, Type, and Version.

func (*AggregateCluster) ApplyCommitted

func (r *AggregateCluster) ApplyCommitted(evt event.Eventer) error

ApplyCommitted applies an event that has already been committed. The AggregateCluster's Id, Type, and Version are replaced with the event's Id, Type, and Version.
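
The direction in which identity is copied is the key difference between the two methods. The sketch below illustrates only that part, with hypothetical Event and Cluster types. It leaves out the Transition call, and it does not change the version on Apply because this page does not say whether the real implementation does.

```go
package main

import "fmt"

// Event and Cluster are hypothetical stand-ins used only in this sketch.
type Event struct {
	ID      string
	Type    string
	Version int
	Reason  string
}

type Cluster struct {
	ID          string
	Type        string
	Version     int
	uncommitted []Event
}

// Apply stamps a new event with the cluster's identity and queues it as
// uncommitted: the cluster is the source of truth.
func (c *Cluster) Apply(evt Event) Event {
	evt.ID, evt.Type, evt.Version = c.ID, c.Type, c.Version
	c.uncommitted = append(c.uncommitted, evt)
	return evt
}

// ApplyCommitted adopts the identity recorded on a stored event: the event
// is the source of truth.
func (c *Cluster) ApplyCommitted(evt Event) {
	c.ID, c.Type, c.Version = evt.ID, evt.Type, evt.Version
}

func main() {
	fresh := &Cluster{ID: "id_0", Type: "payment", Version: 1}
	stamped := fresh.Apply(Event{Reason: "created"})
	fmt.Printf("%+v\n", stamped)

	// Restoring from storage starts with an empty cluster.
	restored := &Cluster{}
	restored.ApplyCommitted(stamped)
	fmt.Println(restored.ID, restored.Type, restored.Version)
}
```

In short, Apply is for events produced now by the running program, while ApplyCommitted is for events loaded back from the event store.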

func (*AggregateCluster) Commit

func (r *AggregateCluster) Commit(evt event.Eventer) error

Commit commits the event and deletes it from the committed events list.

func (*AggregateCluster) GetId

func (r *AggregateCluster) GetId() string

func (*AggregateCluster) GetType

func (r *AggregateCluster) GetType() string

func (*AggregateCluster) GetVersion

func (r *AggregateCluster) GetVersion() event.Version

func (*AggregateCluster) ListCommittedEvents

func (r *AggregateCluster) ListCommittedEvents() []event.Eventer

ListCommittedEvents returns a list of already committed events.

func (*AggregateCluster) ListUncommittedEvents

func (r *AggregateCluster) ListUncommittedEvents() []event.Eventer

ListUncommittedEvents returns a list of events that have not been committed yet.

func (*AggregateCluster) SetId

func (r *AggregateCluster) SetId(id string)

func (*AggregateCluster) SetType

func (r *AggregateCluster) SetType(typ string)

func (*AggregateCluster) SetVersion

func (r *AggregateCluster) SetVersion(version event.Version)

type IDGenerator

type IDGenerator func(alphabet string, size int) string

IDGenerator is a function that generates a random string. It uses only characters from the given alphabet and produces a string of the given size.
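
Any function with this signature can be passed to New. As an illustration, here is a small generator built on crypto/rand. RandomID is a hypothetical name, and the sketch assumes an ASCII alphabet because it indexes the string by byte.

```go
package main

import (
	"crypto/rand"
	"fmt"
	"math/big"
)

// IDGenerator matches the signature documented above.
type IDGenerator func(alphabet string, size int) string

// RandomID draws size characters from alphabet using crypto/rand.
// It indexes the alphabet by byte, so it assumes ASCII characters, and it
// returns an empty string for an empty alphabet or a non-positive size.
func RandomID(alphabet string, size int) string {
	if alphabet == "" || size <= 0 {
		return ""
	}
	limit := big.NewInt(int64(len(alphabet)))
	id := make([]byte, size)
	for i := range id {
		n, err := rand.Int(rand.Reader, limit)
		if err != nil {
			panic(err) // the system random source failed
		}
		id[i] = alphabet[n.Int64()]
	}
	return string(id)
}

// Compile-time check that RandomID has the IDGenerator signature.
var _ IDGenerator = RandomID

func main() {
	fmt.Println(RandomID("0123456789abcdef", 12))
}
```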

Directories

Path Synopsis
examples
