eventsourcing

package
v0.0.0-...-bd477af Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 16, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

README

Ergonats Event Sourcing

Ergonats provides a number of building block components for creating an event-sourced application that works seamlessly within an Ergo supervision tree.

Aggregates

An aggregate validates and potentially rejects command requests. If the request passes validation, then the aggregate can emit events to the stream.

An aggregate determines its own internal state by sequentially applying the appropriate events.

To create your own aggregate, all you need to do is create an aggregate struct:

type BankAccountAggregate struct {
	es.Aggregate
}

and then implement the aggregate behavior interface:

InitAggregate(process *AggregateProcess, args ...etf.Term) (AggregateOptions, error)
ApplyEvent(process *AggregateProcess, state AggregateState, event cloudevents.Event) (AggregateState, error)
HandleCommand(process *AggregateProcess, state AggregateState, cmd Command) ([]cloudevents.Event, error)
  • InitAggregate - parameters are passed to the process during the init phase and your aggregate responds with a set of AggregateOptions.
  • ApplyEvent - given an existing state and a cloud event, returns a new state generation
  • HandleCommand - given an existing state and a command request, returns either an error or a list of events to be emitted.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DeleteState

func DeleteState(nc *nats.Conn, opts *AggregateOptions, key string) error

func NewCloudEvent

func NewCloudEvent(eventType string, entityKey string, rawData interface{}) cloudevents.Event

func StoreState

func StoreState(nc *nats.Conn, opts *AggregateOptions, key string, state AggregateState) error

Types

type Aggregate

type Aggregate struct {
	ergonats.PullConsumer
}

func (*Aggregate) HandleMessage

func (a *Aggregate) HandleMessage(process *ergonats.PullConsumerProcess, msg jetstream.Msg) error

func (*Aggregate) InitPullConsumer

func (a *Aggregate) InitPullConsumer(
	process *ergonats.PullConsumerProcess,
	args ...etf.Term) (*ergonats.PullConsumerOptions, error)

type AggregateBehavior

type AggregateBehavior interface {
	ergonats.PullConsumerBehavior

	InitAggregate(process *AggregateProcess, args ...etf.Term) (AggregateOptions, error)
	ApplyEvent(process *AggregateProcess, state AggregateState, event cloudevents.Event) (*AggregateState, error)
	HandleCommand(process *AggregateProcess, state AggregateState, cmd Command) ([]cloudevents.Event, error)
}

type AggregateMiddleware

type AggregateMiddleware interface {
	ExecMiddleware(*AggregateState, *Command) error
}

type AggregateOptions

type AggregateOptions struct {
	Logger                 *slog.Logger
	Connection             *nats.Conn
	JsDomain               string
	ServiceVersion         string
	CommandSubjectPrefix   string
	EventSubjectPrefix     string
	StreamName             string
	AcceptedCommands       []string
	StateStoreBucketName   string
	StateStoreMaxValueSize int
	StateStoreMaxBytes     int
	AggregateName          string
	Middleware             []AggregateMiddleware
}

type AggregateProcess

type AggregateProcess struct {
	ergonats.PullConsumerProcess
	// contains filtered or unexported fields
}

type AggregateState

type AggregateState struct {
	Version uint64          `json:"version"`
	Key     string          `json:"key"`
	Data    json.RawMessage `json:"data,omitempty"`
}

func LoadState

func LoadState(nc *nats.Conn, opts *AggregateOptions, key string) (*AggregateState, error)

type Command

type Command struct {
	Type     string            `json:"type"`
	Data     []byte            `json:"-"`
	Metadata map[string]string `json:"-"`
}

type CommandReply

type CommandReply struct {
	Accepted bool   `json:"accepted"`
	Message  string `json:"message"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL