ego

package module
v3.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 3, 2025 License: MIT Imports: 24 Imported by: 0

README

eGo

build Go Reference GitHub go.mod Go version GitHub Release codecov

eGo is a minimal library that help build event-sourcing and CQRS application through a simple interface, and it allows developers to describe their commands, events and states are defined using google protocol buffers . Under the hood, ego leverages Go-Akt to scale out and guarantee performant, reliable persistence.

Table of Content

Installation

go get github.com/tochemey/ego

Versioning

The version system adopted in eGo deviates a bit from the standard semantic versioning system. The version format is as follows:

  • The MAJOR part of the version will stay at v3 for the meantime.
  • The MINOR part of the version will cater for any new features, breaking changes with a note on the breaking changes.
  • The PATCH part of the version will cater for dependencies upgrades, bug fixes, security patches and co.

The versioning will remain like v3.x.x until further notice.

Features

Event Sourced Behavior

The EventSourcedBehavior is crucial for maintaining data consistency, especially in distributed systems. It defines how to handle the various commands (requests to perform actions) that are always directed at the event sourced entity. In eGo commands sent to the EventSourcedBehavior are processed in order. When a command is processed, it may result in the generation of events, which are then stored in an event store. Every event persisted has a revision number and timestamp that can help track it. The EventSourcedBehavior in eGo is responsible for defining how to handle events that are the result of command handlers. The end result of events handling is to build the new state of the event sourced entity. When running in cluster mode, aggregate root are sharded.

  • Commands handler: The command handlers define how to handle each incoming command, which validations must be applied, and finally, which events will be persisted if any. When there is no event to be persisted a nil can be returned as a no-op. Command handlers are the meat of the event sourced actor. They encode the business rules of your event sourced actor and act as a guardian of the event sourced entity consistency. The command handler must first validate that the incoming command can be applied to the current model state. Any decision should be solely based on the data passed in the commands and the state of the Behavior. In case of successful validation, one or more events expressing the mutations are persisted. Once the events are persisted, they are applied to the state producing a new valid state.
  • Events handler: The event handlers are used to mutate the state of the event sourced entity by applying the events to it. Event handlers must be pure functions as they will be used when instantiating the event sourced entity and replaying the event store.
Howto

To define an event sourced entity, one needs to:

  1. define the state of the event sourced entity using google protocol buffers message
  2. define the various commands that will be handled by the event sourced entity
  3. define the various events that are result of the command handlers and that will be handled by the event sourced entity to return the new state of the event sourced entity
  4. define and make sure the events store is properly implemented.
  5. implement the EventSourcedBehavior interface.
  6. call the Entity method of eGo engine
Events Stream

Every event handled by event sourced entity are pushed to an events stream. That enables real-time processing of events without having to interact with the events store. Just use Subscribe method of Engine and start iterating through the messages and cast every message to the Event.

Projection

One can add a projection to the eGo engine to help build a read model. Projections in eGo rely on an offset store to track how far they have consumed events persisted by the write model. The offset used in eGo is a timestamp-based offset. One can also:

  • remove a given projection: this will stop the projection and remove it from the system
  • check the status of a given projection
Events Store

One can implement a custom events store. See EventsStore. There are some pre-built events stores one can use out of the box. See Contrib

Offsets Store

One can implement a custom offsets store. See OffsetStore. There are some pre-built offsets stores one can use out of the box. See Contrib

Durable State Behavior

The DurableStateBehavior represents a type of Actor that persists its full state after processing each command instead of using event sourcing. This type of Actor keeps its current state in memory during command handling and based upon the command response persists its full state into a durable store. The store can be a SQL or NoSQL database. The whole concept is given the current state of the actor and a command produce a new state with a higher version as shown in this diagram: (State, Command) => State DurableStateBehavior reacts to commands which result in a new version of the actor state. Only the latest version of the actor state is persisted to the durable store. There is no concept of history regarding the actor state since this is not an event sourced actor. However, one can rely on the version number of the actor state and exactly know how the actor state has evolved overtime. DurableStateBehavior version number are numerically incremented by the command handler which means it is imperative that the newer version of the state is greater than the current version by one. DurableStateBehavior will attempt to recover its state whenever available from the durable state. During a normal shutdown process, it will persist its current state to the durable store prior to shutting down. This behavior help maintain some consistency across the actor state evolution.

Durable Store

One can implement a custom state store. See Durable Store. There are some pre-built durable stores one can use out of the box. See Contrib

Howto

To define a durable state entity, one needs to:

  1. define the state of the entity using google protocol buffers message
  2. define the various commands that will be handled by the entity
  3. define and make sure the durable state store is properly implemented.
  4. implements the DurableStateBehavior interface
  5. start eGo engine with the option durable store using WithStateStore
  6. call the DurableStateEntity method of eGo engine
Events Stream

DurableStateBehavior full state is pushed to an events stream. That enables real-time processing of state without having to interact with the state store. Just use Subscribe method of Engine and start iterating through the messages and cast every message to the DurableState.

Cluster

The cluster mode heavily relies on Go-Akt clustering. To enable clustering one need to use WithCluster option when creating the eGo engine.

Testkit

eGo comes bundle with in-memory datastore that can be found in the testkit package. This can help play with eGo.

Mocks

eGo ships in some mocks that can help mock the data stores for unit tests purpose.

Examples

Check the examples

Sample

package main

import (
	"context"
	"errors"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/google/uuid"
	"google.golang.org/protobuf/proto"

	"github.com/tochemey/ego/v3"
	samplepb "github.com/tochemey/ego/v3/example/pbs/sample/pb/v1"
	"github.com/tochemey/ego/v3/testkit"
)

func main() {
	// create the go context
	ctx := context.Background()
	// create the event store
	eventStore := testkit.NewEventsStore()
	// connect the event store
	_ = eventStore.Connect(ctx)
	// create the ego engine
	engine := ego.NewEngine("Sample", eventStore)
	// start ego engine
	_ = engine.Start(ctx)
	// create a persistence id
	entityID := uuid.NewString()
	// create an entity behavior with a given id
	behavior := NewAccountBehavior(entityID)
	// create an entity
	_ = engine.Entity(ctx, behavior)

	// send some commands to the pid
	var command proto.Message
	// create an account
	command = &samplepb.CreateAccount{
		AccountId:      entityID,
		AccountBalance: 500.00,
	}
	// send the command to the actor. Please don't ignore the error in production grid code
	reply, _, _ := engine.SendCommand(ctx, entityID, command, time.Minute)
	account := reply.(*samplepb.Account)
	log.Printf("current balance on opening: %v", account.GetAccountBalance())

	// send another command to credit the balance
	command = &samplepb.CreditAccount{
		AccountId: entityID,
		Balance:   250,
	}

	reply, _, _ = engine.SendCommand(ctx, entityID, command, time.Minute)
	account = reply.(*samplepb.Account)
	log.Printf("current balance after a credit of 250: %v", account.GetAccountBalance())

	// capture ctrl+c
	interruptSignal := make(chan os.Signal, 1)
	signal.Notify(interruptSignal, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
	<-interruptSignal

	// disconnect the event store
	_ = eventStore.Disconnect(ctx)
	// stop the actor system
	_ = engine.Stop(ctx)
	os.Exit(0)
}

// AccountBehavior implements EventSourcedBehavior
type AccountBehavior struct {
	id string
}

// make sure that AccountBehavior is a true persistence behavior
var _ ego.EventSourcedBehavior = &AccountBehavior{}

// NewAccountBehavior creates an instance of AccountBehavior
func NewAccountBehavior(id string) *AccountBehavior {
	return &AccountBehavior{id: id}
}

// ID returns the id
func (a *AccountBehavior) ID() string {
	return a.id
}

// InitialState returns the initial state
func (a *AccountBehavior) InitialState() ego.State {
	return ego.State(new(samplepb.Account))
}

// HandleCommand handles every command that is sent to the persistent behavior
func (a *AccountBehavior) HandleCommand(_ context.Context, command ego.Command, _ ego.State) (events []ego.Event, err error) {
	switch cmd := command.(type) {
	case *samplepb.CreateAccount:
		// TODO in production grid app validate the command using the prior state
		return []ego.Event{
			&samplepb.AccountCreated{
				AccountId:      cmd.GetAccountId(),
				AccountBalance: cmd.GetAccountBalance(),
			},
		}, nil

	case *samplepb.CreditAccount:
		// TODO in production grid app validate the command using the prior state
		return []ego.Event{
			&samplepb.AccountCredited{
				AccountId:      cmd.GetAccountId(),
				AccountBalance: cmd.GetBalance(),
			},
		}, nil

	default:
		return nil, errors.New("unhandled command")
	}
}

// HandleEvent handles every event emitted
func (a *AccountBehavior) HandleEvent(_ context.Context, event ego.Event, priorState ego.State) (state ego.State, err error) {
	switch evt := event.(type) {
	case *samplepb.AccountCreated:
		return &samplepb.Account{
			AccountId:      evt.GetAccountId(),
			AccountBalance: evt.GetAccountBalance(),
		}, nil

	case *samplepb.AccountCredited:
		account := priorState.(*samplepb.Account)
		bal := account.GetAccountBalance() + evt.GetAccountBalance()
		return &samplepb.Account{
			AccountId:      evt.GetAccountId(),
			AccountBalance: bal,
		}, nil

	default:
		return nil, errors.New("unhandled event")
	}
}

Contribution

kindly follow the instructions in the contribution doc

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrEngineNotStarted is returned when the eGo engine has not started
	ErrEngineNotStarted = errors.New("eGo engine has not started")
	// ErrUndefinedEntityID is returned when sending a command to an undefined entity
	ErrUndefinedEntityID = errors.New("eGo entity id is not defined")
	// ErrCommandReplyUnmarshalling is returned when unmarshalling command reply failed
	ErrCommandReplyUnmarshalling = errors.New("failed to parse command reply")
	// ErrDurableStateStoreRequired is returned when the eGo engine durable store is not set
	ErrDurableStateStoreRequired = errors.New("durable state store is required")
)

Functions

This section is empty.

Types

type Command

type Command proto.Message

type DurableStateBehavior added in v3.2.0

type DurableStateBehavior interface {
	// ID defines the id that will be used in the event journal.
	// This helps track the entity in the events store.
	ID() string
	// InitialState returns the durable state actor initial state.
	// This is set as the initial state when there are no snapshots found the entity
	InitialState() State
	// HandleCommand processes every command sent to the DurableStateBehavior. One needs to use the command, the priorVersion and the priorState sent to produce a newState and newVersion.
	// This defines how to handle each incoming command, which validations must be applied, and finally, whether a resulting state will be persisted depending upon the response.
	// They encode the business rules of your durable state actor and act as a guardian of the actor consistency.
	// The command handler must first validate that the incoming command can be applied to the current model state.
	// Any decision should be solely based on the data passed in the command, the priorVersion and the priorState.
	// In case of successful validation and processing , the new state will be stored in the durable store depending upon response.
	// The actor state will be updated with the newState only if the newVersion is 1 more than the already existing state.
	HandleCommand(ctx context.Context, command Command, priorVersion uint64, priorState State) (newState State, newVersion uint64, err error)
}

DurableStateBehavior represents a type of Actor that persists its full state after processing each command instead of using event sourcing. This type of Actor keeps its current state in memory during command handling and based upon the command response persists its full state into a durable store. The store can be a SQL or NoSQL database. The whole concept is given the current state of the actor and a command produce a new state with a higher version as shown in this diagram: (State, Command) => State DurableStateBehavior reacts to commands which result in a new version of the actor state. Only the latest version of the actor state is persisted to the durable store. There is no concept of history regarding the actor state since this is not an event sourced actor. However, one can rely on the version number of the actor state and exactly know how the actor state has evolved overtime. State actor version number are numerically incremented by the command handler which means it is imperative that the newer version of the state is greater than the current version by one.

DurableStateBehavior will attempt to recover its state whenever available from the durable state. During a normal shutdown process, it will persist its current state to the durable store prior to shutting down. This behavior help maintain some consistency across the actor state evolution.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine represents the engine that empowers the various entities

func NewEngine

func NewEngine(name string, eventsStore persistence.EventsStore, opts ...Option) *Engine

NewEngine creates and initializes a new instance of the eGo engine.

This function constructs an engine with the specified name and event store, applying any additional configuration options. The engine serves as the core for managing event-sourced entities, durable state entities, and projections.

Parameters:

  • name: A unique identifier for the engine instance.
  • eventsStore: The event store responsible for persisting and retrieving events.
  • opts: Optional configurations to customize engine behavior.

Returns:

  • A pointer to the newly created Engine instance.

func (*Engine) AddProjection

func (engine *Engine) AddProjection(ctx context.Context, name string, handler projection.Handler, offsetStore offsetstore.OffsetStore, opts ...projection.Option) error

AddProjection registers a new projection with the eGo engine and starts its execution.

The projection processes events from the events store applying the specified handler to manage state updates based on incoming events. The provided offset store ensures the projection maintains its processing position across restarts.

Key behavior:

  • Projections once created, will persist for the entire lifespan of the running eGo system.

Parameters:

  • ctx: Execution context used for cancellation and deadlines.
  • name: A unique identifier for the projection.
  • handler: The event handler responsible for processing events and updating the projection state.
  • offsetStore: The storage mechanism for tracking the last processed event, ensuring resumability.
  • opts: Optional configuration settings that modify projection behavior.

Returns an error if the projection fails to start due to misconfiguration or underlying system issues.

func (*Engine) DurableStateEntity added in v3.2.0

func (engine *Engine) DurableStateEntity(ctx context.Context, behavior DurableStateBehavior, opts ...SpawnOption) error

DurableStateEntity creates an entity that persists its full state in a durable store without maintaining historical event records.

Unlike an event-sourced entity, a durable state entity does not track past state changes as a sequence of events. Instead, it directly stores and updates its current state in a durable store.

Key Behavior:

  • Commands: The entity receives non-persistent commands that are validated before being processed.
  • Validation: This can range from simple field checks to complex interactions with external services.
  • State Updates: If validation succeeds, a new state is derived from the command, persisted, and then applied to update the entity’s state.
  • Persistence: The latest state is always stored, ensuring that only the most recent version is retained.
  • Recovery: Upon restart, the entity reloads its last persisted state, rather than replaying a sequence of past events.
  • Shutdown Handling: During a normal shutdown, the entity ensures that its current state is persisted before termination.

To interact with the entity, use SendCommand to send commands and update the durable state.

Parameters:

  • ctx: Execution context for controlling the entity’s lifecycle.
  • behavior: Defines the entity’s behavior, including command handling and state transitions.
  • opts: Additional spawning options to configure entity behavior.

Returns an error if the entity fails to initialize or encounters an issue during execution.

func (*Engine) Entity

func (engine *Engine) Entity(ctx context.Context, behavior EventSourcedBehavior, opts ...SpawnOption) error

Entity creates an event-sourced entity that persists its state by storing a history of events.

This entity follows the event sourcing pattern, where changes to its state are driven by events rather than direct mutations. It processes incoming commands, validates them, generates corresponding events upon successful validation, and persists those events before applying them to update its state.

Key Behavior:

  • Commands: The entity receives commands (non-persistent messages) that are validated before being processed.
  • Validation: This can range from simple field checks to interactions with external services.
  • Events: If validation succeeds, events are derived from the command, persisted in the event store, and then used to update the entity’s state.
  • Recovery: During recovery, only persisted events are replayed to rebuild the entity’s state, ensuring deterministic behavior.
  • Command vs. Event: Commands may be rejected if they are invalid, while events—once persisted—cannot fail during replay.

If no new events are generated, the entity simply returns its current state. To interact with the entity, use SendCommand to send commands to a durable event-sourced entity.

Parameters:

  • ctx: Execution context for controlling the lifecycle of the entity.
  • behavior: Defines the entity’s event-sourced behavior, including command handling and state transitions.
  • opts: Additional spawning options to configure entity behavior.

Returns an error if the entity fails to initialize or encounters an issue during execution.

func (*Engine) IsProjectionRunning added in v3.1.1

func (engine *Engine) IsProjectionRunning(ctx context.Context, name string) (bool, error)

IsProjectionRunning checks whether the specified projection is currently active and running.

This function returns `true` if the projection identified by `name` is running. However, callers should always check the returned error to ensure the result is valid, as an error may indicate an inability to determine the projection's status.

Parameters:

  • ctx: Execution context for managing timeouts and cancellations.
  • name: The unique identifier of the projection.

Returns:

  • A boolean indicating whether the projection is running (`true` if active, `false` otherwise).
  • An error if the status check fails, which may result in a false negative.

func (*Engine) RemoveProjection added in v3.1.1

func (engine *Engine) RemoveProjection(ctx context.Context, name string) error

RemoveProjection stops and removes the specified projection from the engine.

This function gracefully shuts down the projection identified by `name` and removes it from the system. Any in-progress processing will be stopped, and the projection will no longer receive events.

Parameters:

  • ctx: Execution context for managing cancellation and timeouts.
  • name: The unique identifier of the projection to be removed.

Returns:

  • An error if the projection fails to stop or does not exist; otherwise, nil.

func (*Engine) SendCommand

func (engine *Engine) SendCommand(ctx context.Context, entityID string, cmd Command, timeout time.Duration) (resultingState State, revision uint64, err error)

SendCommand sends a command to the specified entity and processes its response.

This function dispatches a command to an entity identified by `entityID`. The entity validates the command, applies any necessary state changes, and persists the resulting state if applicable. The function returns the updated state, a revision number, or an error if the operation fails.

Behavior:

  • If the command is successfully processed and results in a state update, the new state and its revision number are returned.
  • If no state update occurs (i.e., no event is persisted or the command does not trigger a change), `nil` is returned.
  • If an error occurs during processing, the function returns a non-nil error.

Parameters:

  • ctx: Execution context for handling timeouts and cancellations.
  • entityID: The unique identifier of the target entity.
  • cmd: The command to be processed by the entity.
  • timeout: The duration within which the command must be processed before timing out.

Returns:

  • resultingState: The updated state of the entity after handling the command, or `nil` if no state change occurred.
  • revision: A monotonically increasing revision number representing the persisted state version.
  • err: An error if the command processing fails.

func (*Engine) Start

func (engine *Engine) Start(ctx context.Context) error

Start initializes and starts eGo engine.

This function launches the engine, enabling it to manage event-sourced entities, durable state entities, and projections. It ensures that all necessary components are initialized and ready to process commands and events.

Parameters:

  • ctx: Execution context for managing startup behavior and handling cancellations.

Returns:

  • An error if the engine fails to start due to misconfiguration or system issues; otherwise, nil.

func (*Engine) Started added in v3.1.1

func (engine *Engine) Started() bool

Started returns true when the eGo engine has started

func (*Engine) Stop

func (engine *Engine) Stop(ctx context.Context) error

Stop gracefully shuts down the eGo engine.

This function ensures that all running projections, entities, and processes managed by the engine are properly stopped before termination. It waits for ongoing operations to complete or time out based on the provided context.

Parameters:

  • ctx: Execution context for managing cancellation and timeouts.

Returns:

  • An error if the shutdown process encounters issues; otherwise, nil.

func (*Engine) Subscribe

func (engine *Engine) Subscribe() (eventstream.Subscriber, error)

Subscribe creates an events subscriber

type Event

type Event proto.Message

type EventSourcedBehavior added in v3.2.0

type EventSourcedBehavior interface {
	// ID defines the id that will be used in the event journal.
	// This helps track the entity in the events store.
	ID() string
	// InitialState returns the event sourced actor initial state.
	// This is set as the initial state when there are no snapshots found the entity
	InitialState() State
	// HandleCommand helps handle commands received by the event sourced actor. The command handlers define how to handle each incoming command,
	// which validations must be applied, and finally, which events will be persisted if any. When there is no event to be persisted a nil can
	// be returned as a no-op. Command handlers are the meat of the event sourced actor.
	// They encode the business rules of your event sourced actor and act as a guardian of the event sourced actor consistency.
	// The command must first validate that the incoming command can be applied to the current model state.
	//  Any decision should be solely based on the data passed in the commands and the state of the Behavior.
	// In case of successful validation, one or more events expressing the mutations are persisted.
	// Once the events are persisted, they are applied to the state producing a new valid state.
	// Every event emitted are processed one after the other in the same order they were emitted to guarantee consistency.
	// It is at the discretion of the application developer to know in which order a given command should return the list of events
	// This is really powerful when a command needs to return two events. For instance, an OpenAccount command can result in two events: one is AccountOpened and the second is AccountCredited
	HandleCommand(ctx context.Context, command Command, priorState State) (events []Event, err error)
	// HandleEvent handle events emitted by the command handlers. The event handlers are used to mutate the state of the event sourced actor by applying the events to it.
	// Event handlers must be pure functions as they will be used when instantiating the event sourced actor and replaying the event journal.
	HandleEvent(ctx context.Context, event Event, priorState State) (state State, err error)
}

EventSourcedBehavior defines an event sourced behavior when modeling a CQRS EventSourcedBehavior.

type Option

type Option interface {
	// Apply sets the Option value of a config.
	Apply(e *Engine)
}

Option is the interface that applies a configuration option.

func WithCluster

func WithCluster(provider discovery.Provider, partitionCount uint64, minimumPeersQuorum uint16, host string, remotingPort, gossipPort, peersPort int) Option

WithCluster enables cluster mode

func WithLogger

func WithLogger(logger log.Logger) Option

WithLogger sets the logger

func WithStateStore added in v3.2.0

func WithStateStore(stateStore persistence.StateStore) Option

WithStateStore sets the durable store. This is necessary when creating a durable state entity

type OptionFunc

type OptionFunc func(e *Engine)

OptionFunc implements the Option interface.

func (OptionFunc) Apply

func (f OptionFunc) Apply(e *Engine)

Apply applies the options to Engine

type SpawnOption added in v3.3.2

type SpawnOption interface {
	// Apply sets the Option value of a config.
	Apply(config *spawnConfig)
}

SpawnOption is the interface that applies to

func WithPassivateAfter added in v3.3.2

func WithPassivateAfter(after time.Duration) SpawnOption

WithPassivateAfter sets a custom duration after which an idle actor will be passivated. Passivation allows the actor system to free up resources by stopping actors that have been inactive for the specified duration. If the actor receives a message before this timeout, the passivation timer is reset.

type State

type State proto.Message

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL