eventsourcing

package module
v0.2.2

Warning: This package is not in the latest version of its module.
Published: Apr 16, 2024 License: MIT Imports: 6 Imported by: 1

README

Go Event Sourcing Library

This is a Go library for implementing the Event Sourcing pattern in your applications, built on Go's generics. Event Sourcing is a design pattern that captures changes to application state as a sequence of events, rather than storing only the current state.

The use of generics in this library provides several advantages:

  1. Type Safety: Generics allow for compile-time type checking, reducing the likelihood of runtime errors and increasing overall code safety.
  2. Code Reusability: With generics, you can write functions or types that are abstracted over types, meaning they can be reused with different types.
  3. Improved Performance: Generics can lead to performance improvements as they eliminate the need for type assertions and reflection.
  4. Clearer and Cleaner Code: Generics result in code that's easier to read, understand, and maintain.

The library includes key components such as Aggregates, Events, and an Event Store, all fundamental to the Event Sourcing architecture. By facilitating an efficient method of handling data changes, this library serves as a valuable tool for maintaining and auditing the complete history of data changes over time in your applications.

Features

The library aims to make implementing Event Sourcing in Go applications straightforward. Its key features are:

  1. Generics-Based Architecture: The library leverages Go's generics capabilities to provide type-safe and reusable components, resulting in clearer and cleaner code, improved performance, and reduced likelihood of runtime errors.

  2. Aggregate Management: It provides robust support for managing aggregates, entities with unique IDs whose state evolves over time through a sequence of events. Aggregates also have invariants that represent business rules to maintain consistency.

  3. Event Handling: The library offers comprehensive tools for creating and handling events, which are fundamental units of state change in the Event Sourcing pattern. It includes functionality to apply events to aggregates, track event metadata, and even marshal events to JSON.

  4. Event Store Interface: It provides a standard interface for an Event Store, the storage system for events, enabling the saving, loading, and retrieving of an aggregate's history. This interface can be implemented with various storage systems according to your application's needs.

  5. Version Management: It includes functionality for managing the version of an aggregate, which is incremented each time an event is applied. This feature aids in tracking the evolution of the aggregate over time.

These features come together to provide a solid foundation for implementing the Event Sourcing pattern in your Go applications, facilitating efficient data change handling and thorough auditing of changes over time.

Usage

To use this library, define your own aggregate states and event states that satisfy the AggregateState and EventState interfaces, respectively. You can then create events, apply them to aggregates, and store the aggregates in an event store.
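
As a rough illustration of that flow, here is a minimal, self-contained sketch. The AggregateState, Aggregate, and EventState declarations are local stand-ins that mirror the documented signatures so the snippet compiles on its own; TodoState, TaskAdded, and the apply helper are hypothetical names, and the real Event.Apply does more than this simplified version.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented signatures. In real code these
// come from the eventsourcing package; they are redefined here only so the
// sketch compiles without the library.
type AggregateState interface {
	Type() string
	Zero() AggregateState
}

type Aggregate[S AggregateState] struct {
	id      string
	version int
	state   S
}

func (a *Aggregate[S]) State() S     { return a.state }
func (a *Aggregate[S]) Version() int { return a.version }

type EventState[S AggregateState] interface {
	Type() string
	Apply(aggregate *Aggregate[S])
}

// TodoState is a hypothetical aggregate state for a to-do list.
type TodoState struct {
	Title string
	Tasks []string
}

func (s *TodoState) Type() string         { return "todo" }
func (s *TodoState) Zero() AggregateState { return &TodoState{} }

// TaskAdded is a hypothetical event state that appends one task.
type TaskAdded struct{ Task string }

func (e *TaskAdded) Type() string { return "todo.task_added" }
func (e *TaskAdded) Apply(a *Aggregate[*TodoState]) {
	a.State().Tasks = append(a.State().Tasks, e.Task)
}

// apply is a simplified stand-in for Event.Apply: it bumps the version and
// then lets the event state mutate the aggregate.
func apply[S AggregateState](a *Aggregate[S], e EventState[S]) {
	a.version++
	e.Apply(a)
}

func main() {
	todo := &Aggregate[*TodoState]{id: "todo-1", state: &TodoState{Title: "Groceries"}}
	apply[*TodoState](todo, &TaskAdded{Task: "milk"})
	apply[*TodoState](todo, &TaskAdded{Task: "eggs"})
	fmt.Println(todo.Version(), todo.State().Tasks) // 2 [milk eggs]
}
```

The sketch uses a pointer type (*TodoState) as the aggregate state so that an event state can mutate it through State(); whether your own states should be pointers depends on how you use the library.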

Usage Examples

For a hands-on illustration of how to apply this library, check out our to-do list example. This repository demonstrates a simple yet effective implementation of event sourcing in a practical application: managing a to-do list. You'll see how aggregate and event states are defined and utilized within a Go application, providing a clear blueprint for incorporating event sourcing into your projects.

In this example, you will learn how to:

  • Define aggregate states for your to-do items and event states for actions such as creating a to-do or adding a task.
  • Create and apply events to these aggregates, effectively demonstrating the event sourcing process.
  • Utilize the Command Query Responsibility Segregation (CQRS) pattern to separate the read and write operations of your application, enhancing its architecture and scalability.

This example serves as a practical guide to understanding and implementing event sourcing with our library. By exploring the repository, you can gain insights into structuring your application for event sourcing and managing state through events in a real-world scenario.

Main Components

Aggregate

Aggregate represents an entity in your system that is identified by a unique ID. It encapsulates state that evolves over time through a sequence of events.

type Aggregate[S AggregateState] struct {...}

Event

Event represents something that has occurred within your system. In the context of event sourcing, events are the source of truth and they determine the current state of an aggregate. Each event is associated with an aggregate and contains data that describes the state change.

type Event[S AggregateState] struct {...}

Event Store

EventStore is the storage interface for events. It provides methods to save, load, and retrieve the history of an aggregate.

type EventStore[S AggregateState] interface {...}

Key Interfaces

AggregateState

The AggregateState interface represents the state of an aggregate. It defines the Type method, which returns the type of the aggregate, and the Zero method, which returns a zero-valued instance of the aggregate state.

type AggregateState interface {...}

EventState

EventState is an interface representing an event state that can be applied to an aggregate. It has a Type method that returns the type of the event and an Apply method that applies the event to an aggregate.

type EventState[S AggregateState] interface {...}

Identifiable

Identifiable is an interface representing objects that can be identified by an Aggregate ID. It's used to ensure that the state of an event can provide an Aggregate ID when the event is applied to an aggregate.

type Identifiable interface {...}

InceptionRecorder

InceptionRecorder is an interface representing objects that can record the inception date of an aggregate. It's used to ensure that the state of an event can record the inception date of an aggregate when the event is applied to an aggregate.

type InceptionRecorder[S AggregateState] interface {...}

ModificationRecorder

ModificationRecorder is an interface representing objects that can record the modification date of an aggregate. It's used to ensure that the state of an event can record the modification date of an aggregate when the event is applied to an aggregate.

type ModificationRecorder[S AggregateState] interface {...}

StorageNamer Interface

The v0.2.0 release introduces a significant enhancement to the Go Event Sourcing Library: the StorageNamer interface. This interface provides a customizable way for aggregates to define their corresponding storage entities, such as database tables or collections. This feature is especially beneficial as your project scales up, offering a more adaptable approach to storage naming conventions.

Definition:

type StorageNamer interface {
    StorageName() string
}

Usage:

  • Implement the StorageNamer interface in your aggregate state if you require a custom storage name different from the default aggregate state type name.
  • If an aggregate state does not implement StorageNamer, the library defaults to using the aggregate state's type as the storage identifier.

Example:

Suppose you have an aggregate state UserState. By default, the storage name would be UserState. If you need a different storage name, say user_table, implement the StorageNamer interface as shown below:

type UserState struct {...}

func (u *UserState) StorageName() string {
    return "user_table"
}

func (u *UserState) Type() string {
    return "user"
}

This implementation allows UserState to explicitly specify user_table as its storage name.
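
The optional-interface pattern behind this can be sketched as follows. This is not the library's StorageName implementation: storageName is a hypothetical resolver, the interfaces are local copies of the documented ones, and the fallback to the value returned by Type() is an assumption, since the library's actual default may be derived differently.

```go
package main

import "fmt"

// Local copies of the documented interfaces, so the sketch is self-contained.
type AggregateState interface {
	Type() string
	Zero() AggregateState
}

type StorageNamer interface {
	StorageName() string
}

// storageName is a hypothetical resolver: it prefers StorageName() when the
// state implements StorageNamer and otherwise falls back to Type().
// The Type() fallback is an assumption made for this sketch.
func storageName(s AggregateState) string {
	if n, ok := s.(StorageNamer); ok {
		return n.StorageName()
	}
	return s.Type()
}

// UserState opts in to a custom storage name.
type UserState struct{}

func (u *UserState) Type() string         { return "user" }
func (u *UserState) Zero() AggregateState { return &UserState{} }
func (u *UserState) StorageName() string  { return "user_table" }

// OrderState (hypothetical) does not implement StorageNamer.
type OrderState struct{}

func (o *OrderState) Type() string         { return "order" }
func (o *OrderState) Zero() AggregateState { return &OrderState{} }

func main() {
	fmt.Println(storageName(&UserState{}))  // user_table
	fmt.Println(storageName(&OrderState{})) // order
}
```

Because the check is a type assertion at the call site, states that never implement StorageNamer keep working unchanged, which is what makes the interface optional.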

Benefits of Using StorageNamer

  • Explicit Naming: Offers a clear and explicit way to define storage names, enhancing readability and maintainability of your code.
  • Flexibility: Adapts to various storage backends and user requirements, making the library more versatile.
  • Backward Compatibility: Ensures existing codebases remain functional without modifications.

Implementation Considerations

  • This interface is optional; you can choose to implement it based on your project's requirements.
  • Ensure consistency in the naming conventions used across your project for clarity.

PostgreSQL Event Store Implementation

The Go Event Sourcing Library includes a PostgreSQL implementation of the Event Store interface, which lets you persist and query your events in a PostgreSQL database.

The PostgreSQL Event Store implementation is located in the pgeventstore package, and includes the following key functionalities:

  1. Database Setup and Initialization: Initialize the library with the environment variables or config that supply your database connection and schema name. It then establishes a connection to the PostgreSQL server and, if necessary, creates the schema and the tables for the specified aggregates.

  2. Event Marshaling: The library provides a mechanism to marshal your events into a format that can be stored in PostgreSQL. It takes care of null checks and type conversions, ensuring your data is safe and reliable.

  3. Event Store Interface Implementation: This includes the following operations:

    • Load: Retrieves the state of an aggregate, identified by its aggregate ID, at the requested version number.
    • History: Returns the history of changes to an aggregate, based on the aggregate ID, starting from a specific version.
    • Save: Persists the changes to an aggregate into the PostgreSQL database.

The implementation also includes functionalities like creating the necessary tables and indexes in your PostgreSQL database and handling batch inserts to optimize the storage of events.

The PostgreSQL Event Store implementation is designed to be highly scalable, capable of handling large volumes of events and providing fast query performance. It leverages PostgreSQL's capabilities for handling JSON data types, making it an ideal choice for Event Sourcing applications.

For the detailed code of this implementation, refer to the pgeventstore package in the source code. Be sure to initialize the library with your PostgreSQL server's URL and other necessary details before running your application.

Remember to carefully handle your database connections and transactions to ensure data consistency and reliability. The library supports transaction management, allowing you to handle operations atomically and safely.

Note: Always ensure that your PostgreSQL server is properly configured and optimized for your workload. Performance can vary based on server specifications, data volume, query complexity, and other factors.

Configuration

The PostgreSQL Event Store implementation requires certain config variables to be set for its proper functioning. These are:

  • EVENT_STORE_PG_URL: This should be the connection string for your PostgreSQL database. This must be a valid connection string that includes the username, password, host, port, and database name. For example, postgresql://user:pass@localhost:5432/mydatabase.

  • EVENT_STORE_SCHEMA: This variable represents the name of the schema under which your event tables will be created. If not provided, the implementation will default to using eventsourcing as the schema name.

  • EVENT_STORE_AGGREGATES: A comma-separated list of all aggregate tables to be created in your PostgreSQL database. These tables will hold your events.

For example, if you have user and order aggregates, you can set EVENT_STORE_AGGREGATES=user,order.
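
To make the three variables concrete, the sketch below reads them, applies the documented default schema name, and splits the aggregate list. It does not reproduce pgeventstore's own loading code: storeSettings and readSettings are hypothetical names, and treating a missing EVENT_STORE_PG_URL as an error is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strings"
)

// storeSettings is a hypothetical holder for the three documented variables.
type storeSettings struct {
	URL        string
	Schema     string
	Aggregates []string
}

// readSettings reads the documented variables through getenv, so it can be
// driven by os.Getenv in production and by a map in tests.
func readSettings(getenv func(string) string) (storeSettings, error) {
	s := storeSettings{
		URL:    getenv("EVENT_STORE_PG_URL"),
		Schema: getenv("EVENT_STORE_SCHEMA"),
	}
	if s.URL == "" {
		// Assumption: a missing connection string is treated as an error.
		return s, errors.New("EVENT_STORE_PG_URL is not set")
	}
	if s.Schema == "" {
		s.Schema = "eventsourcing" // documented default schema name
	}
	// Split the comma-separated list and drop empty entries.
	for _, name := range strings.Split(getenv("EVENT_STORE_AGGREGATES"), ",") {
		if name = strings.TrimSpace(name); name != "" {
			s.Aggregates = append(s.Aggregates, name)
		}
	}
	return s, nil
}

func main() {
	s, err := readSettings(os.Getenv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("schema=%s aggregates=%v\n", s.Schema, s.Aggregates)
}
```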

You can also initialize using pgeventstore.EventStorageConfig struct:

config := pgeventstore.EventStorageConfig{
    PostgresURL: "postgres://user:pass@localhost:5432/mydatabase?sslmode=disable",
    Schema:      "eventsourcing",
    Aggregates:  "user,order",
}

if err := pgeventstore.Init(config); err != nil {
    return err
}

The library uses the godotenv package for loading environment variables from a .env file. You can also set these environment variables manually in your deployment environment.

For security reasons, make sure that your database connection string (EVENT_STORE_PG_URL) is kept secure and not exposed in your code or version control system.

Remember to restart your application after changing your environment variables to ensure the changes take effect.

Contributing

Feel free to submit a pull request if you find any bugs or you want to make improvements to the library. For major changes, please open an issue first to discuss what you would like to change.

License

This project is licensed under the MIT License.


This Go library provides a solid foundation for working with Event Sourcing in Go. It is designed with flexibility in mind to allow it to be adapted to various application needs. We hope this library proves to be a valuable tool in your software development toolkit.

Documentation

Index

Constants

const LastVersion = 0

Variables

var (
	// ErrAggregateNotFound is returned when an aggregate is not found.
	ErrAggregateNotFound = errors.New("aggregate not found")
)

Functions

func StorageName added in v0.2.0

func StorageName(s AggregateState) string

Types

type Aggregate

type Aggregate[S AggregateState] struct {
	Invariants []func(*Aggregate[S]) error
	// contains filtered or unexported fields
}

Aggregate represents an aggregate in event sourcing.

func InitAggregate

func InitAggregate[S AggregateState](id string, version int, state S) *Aggregate[S]

InitAggregate initializes an aggregate with the provided id, version, and state.

func InitZeroAggregate

func InitZeroAggregate[S AggregateState](state S) *Aggregate[S]

InitZeroAggregate initializes an aggregate with the zero state.

func (*Aggregate[S]) Changes

func (a *Aggregate[S]) Changes() []*Event[S]

Changes returns all changes/events associated with the aggregate.

func (*Aggregate[S]) Check

func (a *Aggregate[S]) Check() error

Check evaluates the invariants of the aggregate and returns the first error encountered. It returns nil if every invariant passes.

func (*Aggregate[S]) CheckAll

func (a *Aggregate[S]) CheckAll() []error

CheckAll evaluates all invariants of the aggregate and returns an error for every invariant that fails. It returns nil if every invariant passes.
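
The difference between the two methods can be sketched with a simplified stand-in. Account, check, and checkAll below are hypothetical and only mimic the documented behavior; the library's invariants have the shape func(*Aggregate[S]) error rather than func(*Account) error.

```go
package main

import (
	"errors"
	"fmt"
)

// Account is a hypothetical stand-in for an aggregate carrying invariants.
type Account struct {
	Balance    int
	Owner      string
	Invariants []func(*Account) error
}

// check stops at the first failing invariant, like the documented Check.
func (a *Account) check() error {
	for _, inv := range a.Invariants {
		if err := inv(a); err != nil {
			return err
		}
	}
	return nil
}

// checkAll runs every invariant and collects all failures, like the
// documented CheckAll. It returns nil when nothing failed.
func (a *Account) checkAll() []error {
	var errs []error
	for _, inv := range a.Invariants {
		if err := inv(a); err != nil {
			errs = append(errs, err)
		}
	}
	return errs
}

var (
	errNegative = errors.New("balance must not be negative")
	errNoOwner  = errors.New("owner must be set")
)

func nonNegative(a *Account) error {
	if a.Balance < 0 {
		return errNegative
	}
	return nil
}

func hasOwner(a *Account) error {
	if a.Owner == "" {
		return errNoOwner
	}
	return nil
}

func main() {
	acc := &Account{Balance: -5, Invariants: []func(*Account) error{nonNegative, hasOwner}}
	fmt.Println(acc.check())         // balance must not be negative
	fmt.Println(len(acc.checkAll())) // 2
}
```

The fail-fast form suits guarding a command before saving, while the collecting form is more convenient when reporting every violated rule at once.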

func (*Aggregate[S]) Clone

func (a *Aggregate[S]) Clone() *Aggregate[S]

Clone creates a deep copy of the Aggregate.

func (*Aggregate[S]) ID

func (a *Aggregate[S]) ID() string

ID returns the ID of the aggregate.

func (*Aggregate[S]) MarshalJSON

func (a *Aggregate[S]) MarshalJSON() ([]byte, error)

MarshalJSON marshals the aggregate into JSON format.

func (*Aggregate[S]) Metadata

func (a *Aggregate[S]) Metadata() Metadata

Metadata returns the metadata of the aggregate.

func (*Aggregate[S]) Must

func (a *Aggregate[S]) Must(ii ...func(*Aggregate[S]) error)

Must replaces the current invariants with the provided ones.

func (*Aggregate[S]) MustAlso

func (a *Aggregate[S]) MustAlso(ii ...func(*Aggregate[S]) error)

MustAlso appends given invariants to the existing list of invariants.
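
As a small illustration of replace versus append semantics, the following sketch uses a hypothetical Cart type with must and mustAlso methods. It is a stand-in written for this example, not the library's code.

```go
package main

import "fmt"

// invariant is a hypothetical rule type; the library's invariants have the
// shape func(*Aggregate[S]) error.
type invariant func(*Cart) error

// Cart is a hypothetical stand-in for an aggregate.
type Cart struct {
	invariants []invariant
}

// must replaces the current invariants with the provided ones.
func (c *Cart) must(ii ...invariant) { c.invariants = ii }

// mustAlso appends the provided invariants to the existing list.
func (c *Cart) mustAlso(ii ...invariant) { c.invariants = append(c.invariants, ii...) }

func main() {
	noop := func(*Cart) error { return nil }
	c := &Cart{}
	c.must(noop, noop)
	c.mustAlso(noop)
	fmt.Println(len(c.invariants)) // 3
	c.must(noop)
	fmt.Println(len(c.invariants)) // 1
}
```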

func (*Aggregate[S]) State

func (a *Aggregate[S]) State() S

State returns the current state of the aggregate.

func (*Aggregate[S]) Type

func (a *Aggregate[S]) Type() string

Type returns the type of the aggregate.

func (*Aggregate[S]) Version

func (a *Aggregate[S]) Version() int

Version returns the version of the aggregate.

type AggregateState

type AggregateState interface {
	Type() string         // Returns the type of the aggregate state.
	Zero() AggregateState // Returns a zero value of the aggregate state.
}

AggregateState is an interface defining methods that should be implemented by any state that is part of an aggregate.

type AggregateStateEventMapper

type AggregateStateEventMapper[S AggregateState] interface {
	EventsMap() map[string]EventState[S]
}

type AggregateVersion

type AggregateVersion int

type Event

type Event[S AggregateState] struct {
	// contains filtered or unexported fields
}

Event represents an event in event sourcing.

func InitEvent

func InitEvent[S AggregateState](id string, occurredAt time.Time, state EventState[S], aggregate *Aggregate[S], data []byte, metadata []byte) (*Event[S], error)

InitEvent initializes an Event with provided id, occurredAt, state, aggregate, data, and metadata.

func NewEvent

func NewEvent[S AggregateState](event EventState[S], metadata map[string]any) *Event[S]

NewEvent creates a new event with the provided state and metadata.

func (*Event[S]) Aggregate

func (e *Event[S]) Aggregate() *Aggregate[S]

Aggregate returns the aggregate associated with the event.

func (*Event[S]) Apply

func (e *Event[S]) Apply(aggregate *Aggregate[S])

Apply applies the Event to an Aggregate. In the context of event sourcing, state changes are recorded as a sequence of events. When an event is applied to an aggregate, it changes the state of the aggregate. Here's a detailed breakdown of the steps:

  • It first increments the version of the Aggregate using the incrementVersion method. This helps in tracking the evolution of the aggregate. Each event applied to the aggregate increases its version number.

  • It then appends the Event itself to the changes slice of the Aggregate. This allows the aggregate to keep track of all events that have been applied to it.

  • It checks whether the state of the Event implements the Identifiable interface. If it does, it sets the ID of the Aggregate to the ID provided by the Event state's AggregateID method.

  • It applies the state of the Event to the Aggregate using the Apply method of the EventState. This is where the actual state change occurs.

  • Finally, it sets the Event's aggregate field to a clone of the updated Aggregate. This is crucial for keeping an accurate record of the state of the Aggregate at the time when the event was applied.

Together, these steps let the state of an Aggregate evolve over time through the application of Events.
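
The five steps listed above can be sketched as follows. All types are local, simplified stand-ins for the documented ones, and CounterState, Started, and Incremented are hypothetical. One deliberate simplification: clone here copies the struct and the changes slice but shares the state pointer, whereas the library documents Clone as a deep copy.

```go
package main

import "fmt"

type AggregateState interface {
	Type() string
	Zero() AggregateState
}

type Identifiable interface{ AggregateID() string }

type EventState[S AggregateState] interface {
	Type() string
	Apply(aggregate *Aggregate[S])
}

type Aggregate[S AggregateState] struct {
	id      string
	version int
	state   S
	changes []*Event[S]
}

type Event[S AggregateState] struct {
	state     EventState[S]
	aggregate *Aggregate[S]
}

// clone copies the aggregate struct and its changes slice. The state value
// is shared when S is a pointer type (a simplification for this sketch).
func (a *Aggregate[S]) clone() *Aggregate[S] {
	c := *a
	c.changes = make([]*Event[S], len(a.changes))
	copy(c.changes, a.changes)
	return &c
}

func (e *Event[S]) apply(a *Aggregate[S]) {
	a.version++                               // 1. increment the version
	a.changes = append(a.changes, e)          // 2. record the event
	if id, ok := e.state.(Identifiable); ok { // 3. adopt the ID if provided
		a.id = id.AggregateID()
	}
	e.state.Apply(a)        // 4. perform the actual state change
	e.aggregate = a.clone() // 5. keep a snapshot on the event
}

type CounterState struct{ N int }

func (s *CounterState) Type() string         { return "counter" }
func (s *CounterState) Zero() AggregateState { return &CounterState{} }

// Started carries the aggregate ID, so it implements Identifiable.
type Started struct{ ID string }

func (e *Started) Type() string                      { return "counter.started" }
func (e *Started) AggregateID() string               { return e.ID }
func (e *Started) Apply(a *Aggregate[*CounterState]) {}

type Incremented struct{ By int }

func (e *Incremented) Type() string                      { return "counter.incremented" }
func (e *Incremented) Apply(a *Aggregate[*CounterState]) { a.state.N += e.By }

func main() {
	agg := &Aggregate[*CounterState]{state: &CounterState{}}
	(&Event[*CounterState]{state: &Started{ID: "c-1"}}).apply(agg)
	(&Event[*CounterState]{state: &Incremented{By: 3}}).apply(agg)
	fmt.Println(agg.id, agg.version, agg.state.N, len(agg.changes)) // c-1 2 3 2
}
```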

func (*Event[S]) ID

func (e *Event[S]) ID() string

ID returns the ID of the event.

func (*Event[S]) MarshalJSON

func (e *Event[S]) MarshalJSON() ([]byte, error)

MarshalJSON marshals the event into JSON format.

func (*Event[S]) Metadata

func (e *Event[S]) Metadata() Metadata

Metadata returns the metadata associated with the event.

func (*Event[S]) OccurredAt

func (e *Event[S]) OccurredAt() time.Time

OccurredAt returns the time when the event occurred.

func (*Event[S]) State

func (e *Event[S]) State() EventState[S]

State returns the state of the event.

func (*Event[S]) Type

func (e *Event[S]) Type() string

Type returns the type of the event.

type EventState

type EventState[S AggregateState] interface {
	Type() string
	Apply(aggregate *Aggregate[S])
}

func ParseEvent

func ParseEvent[S AggregateState](t string, eventsMap map[string]EventState[S]) (EventState[S], error)

type EventStore

type EventStore[S AggregateState] interface {
	Save(ctx context.Context, tx Transaction, aggregate ...*Aggregate[S]) (err error)
	Load(ctx context.Context, tx Transaction, aggregateID string, version AggregateVersion) (*Aggregate[S], error)
	History(ctx context.Context, tx Transaction, aggregateID string, fromVersion int, limit int) ([]*Event[S], error)
}

EventStore represents a storage for events. It provides methods to save, load, and retrieve the history of an aggregate.

type HistoryParams

type HistoryParams struct {
	AggregateID      string
	AggregateVersion AggregateVersion
}

type Identifiable

type Identifiable interface {
	// AggregateID returns the ID of the aggregate that is associated with the identifiable object.
	AggregateID() string
}

Identifiable is an interface representing objects that can be identified by an Aggregate ID. In the context of this event sourcing system, it's used to ensure that the state of an event can provide an Aggregate ID when the event is applied to an aggregate.

type InceptionRecorder

type InceptionRecorder[S AggregateState] interface {
	// RecordInception marks the time at which the entity was brought into existence.
	RecordInception(inceptionTime *time.Time, aggregate *Aggregate[S])
}

InceptionRecorder represents an interface for entities that are capable of recording the time at which they were brought into existence.

type LoadParams

type LoadParams[S AggregateState] struct {
	AggregateID      string
	Aggregate        S
	AggregateVersion AggregateVersion
}

type Metadata

type Metadata map[string]any

func (Metadata) Merge

func (m Metadata) Merge(m2 map[string]any) Metadata

type ModificationRecorder

type ModificationRecorder[S AggregateState] interface {
	// RecordModification marks the time at which the entity was most recently modified.
	RecordModification(modificationTime *time.Time, aggregate *Aggregate[S])
}

ModificationRecorder represents an interface for entities that are capable of recording the time at which they were most recently modified.

type StorageNamer added in v0.2.0

type StorageNamer interface {
	StorageName() string
}

type Transaction

type Transaction interface {
	Commit() error
	Rollback() error
}
