eventsourcing

package
v0.3.6-dev51 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2022 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AnyRole  Role  = "*"
	AnyScope Scope = "*"
)

Variables

This section is empty.

Functions

func NewProtoFromEvent

func NewProtoFromEvent(storeEvent Event) *esApi.Event

NewProtoFromEvent converts Event to proto events

Types

type Aggregate

type Aggregate interface {
	CommandHandler

	// Type is the type of the aggregate that the event can be applied to.
	Type() AggregateType
	// ID is the id of the aggregate that the event should be applied to.
	ID() uuid.UUID
	// Version is the version of the aggregate.
	Version() uint64
	// SetDeleted sets the deleted flag of the aggregate to true
	SetDeleted(bool)
	// Deleted indicates whether the aggregate resource has been deleted
	Deleted() bool
	// UncommittedEvents returns outstanding events that need to persisted. They are cleared on reading them.
	UncommittedEvents() []Event
	// ApplyEvent applies an Event on the aggregate.
	ApplyEvent(Event) error
	// IncrementVersion increments the version of the Aggregate.
	IncrementVersion()
	// Exists returns if the version of the aggregate is >0
	Exists() bool
	// contains filtered or unexported methods
}

Aggregate is the interface definition for all aggregates

type AggregateRegistry

type AggregateRegistry interface {
	RegisterAggregate(func() Aggregate)
	CreateAggregate(AggregateType) (Aggregate, error)
}
var DefaultAggregateRegistry AggregateRegistry

func NewAggregateRegistry

func NewAggregateRegistry() AggregateRegistry

NewAggregateRegistry creates a new aggregate registry

type AggregateStore

type AggregateStore interface {
	// Get returns the most recent version of all aggregate of a given type.
	All(context.Context, AggregateType) ([]Aggregate, error)

	// Get returns the most recent version of an aggregate.
	Get(context.Context, AggregateType, uuid.UUID) (Aggregate, error)

	// Update stores all in-flight events for an aggregate.
	Update(context.Context, Aggregate) error
}

AggregateStore handles storing and loading Aggregates

func NewAggregateManager

func NewAggregateManager(aggregateRegistry AggregateRegistry, eventStoreClient esApi.EventStoreClient) AggregateStore

NewAggregateManager creates a new AggregateHandler which loads/updates Aggregates with the given EventStore.

type AggregateType

type AggregateType string

AggregateType is the type of an aggregate, used as its unique identifier.

func (AggregateType) String

func (t AggregateType) String() string

String returns the string representation of an AggregateType.

type BaseAggregate

type BaseAggregate struct {
	// contains filtered or unexported fields
}

BaseAggregate is the base implementation for all aggregates

func NewBaseAggregate

func NewBaseAggregate(t AggregateType) *BaseAggregate

NewBaseAggregate creates an aggregate.

func (*BaseAggregate) AppendEvent

func (a *BaseAggregate) AppendEvent(ctx context.Context, eventType EventType, eventData EventData) Event

AppendEvent appends an event to the events the aggregate was build upon.

func (*BaseAggregate) Deleted

func (a *BaseAggregate) Deleted() bool

Deleted implements the Deleted method of the Aggregate interface.

func (*BaseAggregate) Exists

func (a *BaseAggregate) Exists() bool

Exists returns if the version of the aggregate is >0

func (*BaseAggregate) ID

func (a *BaseAggregate) ID() uuid.UUID

ID implements the ID method of the Aggregate interface.

func (*BaseAggregate) IncrementVersion

func (a *BaseAggregate) IncrementVersion()

IncrementVersion implements the IncrementVersion method of the Aggregate interface.

func (*BaseAggregate) SetDeleted

func (a *BaseAggregate) SetDeleted(deleted bool)

SetDeleted implements the SetDeleted method of the Aggregate interface.

func (*BaseAggregate) Type

func (a *BaseAggregate) Type() AggregateType

Type implements the Type method of the Aggregate interface.

func (*BaseAggregate) UncommittedEvents

func (a *BaseAggregate) UncommittedEvents() []Event

UncommittedEvents implements the UncommittedEvents method of the Aggregate interface.

func (*BaseAggregate) Version

func (a *BaseAggregate) Version() uint64

Version implements the Version method of the Aggregate interface.

type BaseCommand

type BaseCommand struct {
	// contains filtered or unexported fields
}

BaseCommand is the base implementation for all commands

func NewBaseCommand

func NewBaseCommand(id uuid.UUID, aggregateType AggregateType, commandType CommandType) *BaseCommand

NewBaseCommand creates a command.

func (*BaseCommand) AggregateID

func (c *BaseCommand) AggregateID() uuid.UUID

AggregateID returns the ID of the aggregate that the command should be handled by.

func (*BaseCommand) AggregateType

func (c *BaseCommand) AggregateType() AggregateType

AggregateType returns the type of the aggregate that the command can be handled by.

func (*BaseCommand) CommandType

func (c *BaseCommand) CommandType() CommandType

CommandType returns the type of the command.

type Command

type Command interface {
	// AggregateID returns the ID of the aggregate that the command should be
	// handled by.
	AggregateID() uuid.UUID

	// AggregateType returns the type of the aggregate that the command can be
	// handled by.
	AggregateType() AggregateType

	// CommandType returns the type of the command.
	CommandType() CommandType

	// SetData sets type specific additional data.
	SetData(*anypb.Any) error

	// Policies returns the Role/Scope/Resource combination allowed to execute.
	Policies(ctx context.Context) []Policy
}

Command is a domain command that is executed by a CommandHandler.

A command name should 1) be in present tense and 2) contain the intent (CreateTenant, AddRoleToUser).

The command should contain all the data needed when handling it as fields.

type CommandHandler

type CommandHandler interface {
	// HandleCommand handles a command.
	HandleCommand(context.Context, Command) (*CommandReply, error)
}

CommandHandler is an interface that all handlers of commands must implement.

func UseCommandHandlerMiddleware

func UseCommandHandlerMiddleware(h CommandHandler, middleware ...CommandHandlerMiddleware) CommandHandler

UseCommandHandlerMiddleware wraps a CommandHandler in one or more middlewares.

type CommandHandlerMiddleware

type CommandHandlerMiddleware func(CommandHandler) CommandHandler

CommandHandlerMiddleware is a function that middlewares can implement to be able to chain.

type CommandRegistry

type CommandRegistry interface {
	CommandHandler
	RegisterCommand(func(uuid.UUID) Command)
	CreateCommand(id uuid.UUID, commandType CommandType, data *anypb.Any) (Command, error)
	GetRegisteredCommandTypes() []CommandType
	SetHandler(handler CommandHandler, commandType CommandType)
}
var DefaultCommandRegistry CommandRegistry

func NewCommandRegistry

func NewCommandRegistry() CommandRegistry

newCommandRegistry creates a new command registry

type CommandReply

type CommandReply struct {
	Id      uuid.UUID
	Version uint64
}

CommandReply is a internal representation of the gRPC struct to avoid constant parsing and rendering of UUIDs

type CommandType

type CommandType string

CommandType is the type of a command, used as its unique identifier.

func (CommandType) String

func (c CommandType) String() string

String returns the string representation of a command type.

type Event

type Event interface {
	// EventType is the type of the event.
	EventType() EventType
	// Timestamp of when the event was created.
	Timestamp() time.Time
	// AggregateType is the type of the aggregate that the event can be applied to.
	AggregateType() AggregateType
	// AggregateID is the id of the aggregate that the event should be applied to.
	AggregateID() uuid.UUID
	// AggregateVersion is the version of the aggregate.
	AggregateVersion() uint64
	// Event type specific event data.
	Data() EventData
	// Metadata is app-specific metadata originating user etc. when this event has been stored.
	Metadata() map[string]string
	// A string representation of the event.
	String() string
}

Event describes anything that has happened in the system. An event type name should be in past tense and contain the intent (TenantUpdated). The event should contain all the data needed when applying/handling it. The combination of AggregateType, AggregateID and AggregateVersion is unique.

func NewEvent

func NewEvent(ctx context.Context, eventType EventType, data EventData, timestamp time.Time,
	aggregateType AggregateType, aggregateID uuid.UUID, aggregateVersion uint64) Event

NewEvent creates a new event.

func NewEventFromProto

func NewEventFromProto(protoEvent *esApi.Event) (Event, error)

NewEventFromProto converts proto events to Event

func NewEventWithMetadata

func NewEventWithMetadata(eventType EventType, data EventData, timestamp time.Time,
	aggregateType AggregateType, aggregateID uuid.UUID, aggregateVersion uint64, metadata map[string]string) Event

NewEvent creates a new event with metadata attached.

type EventBusConnector

type EventBusConnector interface {
	// Close closes the underlying connections
	Close() error
}

EventBusConnector can open and close connections.

type EventBusConsumer

type EventBusConsumer interface {
	EventBusConnector
	// Matcher returns a new implementation specific matcher.
	Matcher() EventMatcher
	// AddHandler adds a handler for events matching one of the given EventMatcher.
	AddHandler(context.Context, EventHandler, ...EventMatcher) error
	// AddWorker behave similar to AddHandler but distributes events among the handlers with the same
	// work queue name according to the competing consumers pattern.
	AddWorker(context.Context, EventHandler, string, ...EventMatcher) error
}

EventBusConsumer notifies registered handlers on incoming events on the underlying message bus.

type EventBusPublisher

type EventBusPublisher interface {
	EventBusConnector
	// PublishEvent publishes the event on the bus.
	PublishEvent(context.Context, Event) error
}

EventBusPublisher publishes events on the underlying message bus.

type EventData

type EventData []byte

EventData is any additional data for an event. Internally this is represented by protojson.

func ToEventDataFromProto

func ToEventDataFromProto(m proto.Message) EventData

ToEventDataFromProto marshalls m into EventData.

func (EventData) ToProto

func (d EventData) ToProto(m proto.Message) error

ToProto unmarshals the contents the EventData into m.

func (EventData) Unmarshal added in v0.3.5

func (d EventData) Unmarshal() (proto.Message, error)

Unmarshal deserializes the EventData into a proto message using a type resolved from the type URL.

type EventHandler

type EventHandler interface {
	// HandleEvent handles an event.
	HandleEvent(context.Context, Event) error
}

EventHandler is an interface that all handlers of events must implement.

func UseEventHandlerMiddleware

func UseEventHandlerMiddleware(h EventHandler, middleware ...EventHandlerMiddleware) EventHandler

UseEventHandlerMiddleware wraps an EventHandler in one or more middlewares.

type EventHandlerMiddleware

type EventHandlerMiddleware func(EventHandler) EventHandler

EventHandlerMiddleware is a function that middlewares can implement to be able to chain.

type EventMatcher

type EventMatcher interface {
	// Any matches any event.
	Any() EventMatcher
	// MatchEventType matches a specific event type.
	MatchEventType(eventType EventType) EventMatcher
	// MatchAggregate matches a specific aggregate type.
	MatchAggregateType(aggregateType AggregateType) EventMatcher
}

EventMatcher is an interface used to define what events should be consumed

type EventStore

type EventStore interface {
	// Open connects to the bus
	Open(context.Context) error

	// Save appends all events in the event stream to the store.
	Save(context.Context, []Event) error

	// Load loads all events for the query from the store.
	Load(context.Context, *StoreQuery) (EventStreamReceiver, error)

	// LoadOr loads all events by combining the queries with the logical OR from the store.
	LoadOr(context.Context, []*StoreQuery) (EventStreamReceiver, error)

	// Close closes the underlying connections
	Close() error
}

EventStore is an interface for an event storage backend.

type EventStream

type EventStream interface {
	EventStreamSender
	EventStreamReceiver
}

EventStream is an interface for handling asynchronous result sending/receiving from the EventStore

func NewEventStream

func NewEventStream() EventStream

NewEventStream returns an implementation for the EventStream interface

type EventStreamReceiver

type EventStreamReceiver interface {
	Receive() (Event, error)
}

type EventStreamSender

type EventStreamSender interface {
	Send(Event)
	Error(error)
	Done()
}

type EventType

type EventType string

EventType is the type of an event, used as its unique identifier.

func (EventType) String

func (t EventType) String() string

String returns the string representation of an EventType.

type MetadataManager

type MetadataManager interface {
	// GetContext returns a new context enriched with the metadata of this manager.
	GetContext() context.Context

	// GetMetadata returns the metadata of this manager.
	GetMetadata() map[string]string

	// SetMetadata sets the metadata of this manager.
	SetMetadata(map[string]string) MetadataManager

	// Get returns the information stored for the key.
	Get(string) (string, bool)
	GetObject(string, interface{}) error

	// Set stores information for the key.
	Set(string, string) MetadataManager
	SetObject(string, interface{}) error
}

MetadataManager is an interface for a storage of metadata. It can be used to easily store any metadata in the context of a call.

func NewMetadataManagerFromContext

func NewMetadataManagerFromContext(ctx context.Context) MetadataManager

type Policy

type Policy interface {
	// Role returns the role this policy accepts.
	Role() Role
	// Scope returns the scope this policy accepts.
	Scope() Scope

	// WithRole sets the role this policy accepts to the given value.
	WithRole(Role) Policy
	// WithScope sets the scope this policy accepts to the given value.
	WithScope(Scope) Policy

	// AcceptsRole checks if the policy accepts the given role.
	AcceptsRole(Role) bool
	// AcceptsScope checks if the policy accepts the given scope.
	AcceptsScope(Scope) bool

	// String returns a string representation of the policy.
	String() string
}

func NewPolicy

func NewPolicy() Policy

NewPolicy creates a new policy which accepts anything.

type Projection

type Projection interface {
	// ID returns the ID of the Projection.
	ID() uuid.UUID
	// Version returns the version of the aggregate this Projection is based upon.
	Version() uint64
	// IncrementVersion increments the Version of the Projection.
	IncrementVersion()
}

Projection is the interface for projections.

type Projector

type Projector interface {
	// NewProjection creates a new Projection of the type the Projector projects.
	NewProjection(uuid.UUID) Projection

	// Project updates the state of the projection according to the given event.
	Project(context.Context, Event, Projection) (Projection, error)
}

Projector is the interface for projectors.

type Reactor

type Reactor interface {
	// HandleEvent handles a given event send 0..* Events through the given channel in reaction or an error.
	// Attention: The reactor is responsible for closing the channel if no further events will be send to that channel.
	HandleEvent(ctx context.Context, event Event, events chan<- Event) error
}

Reactor is the interface for reactors.

type ReadOnlyRepository

type ReadOnlyRepository interface {
	// ById returns a projection for an ID.
	ById(context.Context, uuid.UUID) (Projection, error)

	// All returns all projections in the repository.
	All(context.Context) ([]Projection, error)
}

ReadOnlyRepository is a repository for reading projections.

type Repository

type Repository interface {
	ReadOnlyRepository
	WriteOnlyRepository
}

Repository is a repository for reading and writing projections.

type Role

type Role string

Role is the name of a user's role.

func (Role) String

func (r Role) String() string

type Scope

type Scope string

Scope is the scope of a role.

func (Scope) String

func (s Scope) String() string

type StoreQuery

type StoreQuery struct {
	// Filter events by aggregate id
	AggregateId *uuid.UUID
	// Filter events for a specific aggregate type
	AggregateType *AggregateType
	// Filter events with a Version >= MinVersion
	MinVersion *uint64
	// Filter events with a Version <= MaxVersion
	MaxVersion *uint64
	// Filter events with a Timestamp >= MinTimestamp
	MinTimestamp *time.Time
	// Filter events with a Timestamp <= MaxTimestamp
	MaxTimestamp *time.Time
}

StoreQuery contains query information on how to retrieve events from an event store

type WriteOnlyRepository

type WriteOnlyRepository interface {
	// Upsert saves a projection in the storage or replaces an existing one.
	Upsert(context.Context, Projection) error

	// Remove removes a projection by ID from the storage.
	Remove(context.Context, uuid.UUID) error
}

WriteOnlyRepository is a repository for writing projections.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL