Documentation ¶
Index ¶
- Constants
- func NewProtoFromEvent(storeEvent Event) *esApi.Event
- type Aggregate
- type AggregateRegistry
- type AggregateStore
- type AggregateType
- type BaseAggregate
- func (a *BaseAggregate) AppendEvent(ctx context.Context, eventType EventType, eventData EventData) Event
- func (a *BaseAggregate) Deleted() bool
- func (a *BaseAggregate) Exists() bool
- func (a *BaseAggregate) ID() uuid.UUID
- func (a *BaseAggregate) IncrementVersion()
- func (a *BaseAggregate) SetDeleted(deleted bool)
- func (a *BaseAggregate) Type() AggregateType
- func (a *BaseAggregate) UncommittedEvents() []Event
- func (a *BaseAggregate) Version() uint64
- type BaseCommand
- type Command
- type CommandHandler
- type CommandHandlerMiddleware
- type CommandRegistry
- type CommandReply
- type CommandType
- type Event
- type EventBusConnector
- type EventBusConsumer
- type EventBusPublisher
- type EventData
- type EventHandler
- type EventHandlerMiddleware
- type EventMatcher
- type EventStore
- type EventStream
- type EventStreamReceiver
- type EventStreamSender
- type EventType
- type MetadataManager
- type Projection
- type Projector
- type Reactor
- type Repository
- type Role
- type Scope
- type StoreQuery
Constants ¶
const (
	AnyRole  Role  = "*"
	AnyScope Scope = "*"
)
Variables ¶
This section is empty.
Functions ¶
func NewProtoFromEvent ¶
func NewProtoFromEvent(storeEvent Event) *esApi.Event
NewProtoFromEvent converts an Event to its protobuf representation.
Types ¶
type Aggregate ¶
type Aggregate interface {
	CommandHandler

	// Type is the type of the aggregate.
	Type() AggregateType
	// ID is the id of the aggregate.
	ID() uuid.UUID
	// Version is the version of the aggregate.
	Version() uint64
	// SetDeleted sets the deleted flag of the aggregate to the given value.
	SetDeleted(bool)
	// Deleted indicates whether the aggregate resource has been deleted.
	Deleted() bool
	// UncommittedEvents returns outstanding events that need to be persisted. They are cleared on reading them.
	UncommittedEvents() []Event
	// ApplyEvent applies an Event on the aggregate.
	ApplyEvent(Event) error
	// IncrementVersion increments the version of the Aggregate.
	IncrementVersion()
	// Exists reports whether the version of the aggregate is > 0.
	Exists() bool
	// contains filtered or unexported methods
}
Aggregate is the interface definition for all aggregates
type AggregateRegistry ¶
type AggregateRegistry interface {
	RegisterAggregate(func() Aggregate)
	CreateAggregate(AggregateType) (Aggregate, error)
}
var DefaultAggregateRegistry AggregateRegistry
func NewAggregateRegistry ¶
func NewAggregateRegistry() AggregateRegistry
NewAggregateRegistry creates a new aggregate registry
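The two registry methods suggest a factory pattern: a constructor function is registered once and later used to create fresh aggregates by type. The following is a minimal sketch of that pattern, not the package's implementation. The types `aggregate`, `registry`, and `tenant` are hypothetical local stand-ins, and deriving the type by calling the factory once is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// AggregateType mirrors the documented string type.
type AggregateType string

// aggregate is a hypothetical, reduced stand-in for the Aggregate interface.
type aggregate interface {
	Type() AggregateType
}

// registry is a hypothetical stand-in for an AggregateRegistry implementation.
type registry struct {
	factories map[AggregateType]func() aggregate
}

func newRegistry() *registry {
	return &registry{factories: make(map[AggregateType]func() aggregate)}
}

// RegisterAggregate calls the factory once to learn its type (an assumption).
func (r *registry) RegisterAggregate(factory func() aggregate) {
	r.factories[factory().Type()] = factory
}

// CreateAggregate returns a fresh instance, or an error for unknown types.
func (r *registry) CreateAggregate(t AggregateType) (aggregate, error) {
	factory, ok := r.factories[t]
	if !ok {
		return nil, errors.New("aggregate type not registered: " + string(t))
	}
	return factory(), nil
}

// tenant is a hypothetical aggregate used only for this example.
type tenant struct{}

func (tenant) Type() AggregateType { return "Tenant" }

func main() {
	r := newRegistry()
	r.RegisterAggregate(func() aggregate { return tenant{} })

	a, err := r.CreateAggregate("Tenant")
	fmt.Println(a.Type(), err)

	_, err = r.CreateAggregate("User")
	fmt.Println(err)
}
```

Registering factories rather than instances means every call to `CreateAggregate` yields an independent aggregate that can be rebuilt from its own events.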
type AggregateStore ¶
type AggregateStore interface {
	// All returns the most recent version of all aggregates of a given type.
	All(context.Context, AggregateType) ([]Aggregate, error)
	// Get returns the most recent version of an aggregate.
	Get(context.Context, AggregateType, uuid.UUID) (Aggregate, error)
	// Update stores all in-flight events for an aggregate.
	Update(context.Context, Aggregate) error
}
AggregateStore handles storing and loading Aggregates
func NewAggregateManager ¶
func NewAggregateManager(aggregateRegistry AggregateRegistry, eventStoreClient esApi.EventStoreClient) AggregateStore
NewAggregateManager creates a new AggregateStore which loads and updates Aggregates using the given EventStore client.
type AggregateType ¶
type AggregateType string
AggregateType is the type of an aggregate, used as its unique identifier.
func (AggregateType) String ¶
func (t AggregateType) String() string
String returns the string representation of an AggregateType.
type BaseAggregate ¶
type BaseAggregate struct {
// contains filtered or unexported fields
}
BaseAggregate is the base implementation for all aggregates
func NewBaseAggregate ¶
func NewBaseAggregate(t AggregateType) *BaseAggregate
NewBaseAggregate creates an aggregate.
func (*BaseAggregate) AppendEvent ¶
func (a *BaseAggregate) AppendEvent(ctx context.Context, eventType EventType, eventData EventData) Event
AppendEvent appends an event to the events the aggregate was built upon.
func (*BaseAggregate) Deleted ¶
func (a *BaseAggregate) Deleted() bool
Deleted implements the Deleted method of the Aggregate interface.
func (*BaseAggregate) Exists ¶
func (a *BaseAggregate) Exists() bool
Exists reports whether the version of the aggregate is greater than 0.
func (*BaseAggregate) ID ¶
func (a *BaseAggregate) ID() uuid.UUID
ID implements the ID method of the Aggregate interface.
func (*BaseAggregate) IncrementVersion ¶
func (a *BaseAggregate) IncrementVersion()
IncrementVersion implements the IncrementVersion method of the Aggregate interface.
func (*BaseAggregate) SetDeleted ¶
func (a *BaseAggregate) SetDeleted(deleted bool)
SetDeleted implements the SetDeleted method of the Aggregate interface.
func (*BaseAggregate) Type ¶
func (a *BaseAggregate) Type() AggregateType
Type implements the Type method of the Aggregate interface.
func (*BaseAggregate) UncommittedEvents ¶
func (a *BaseAggregate) UncommittedEvents() []Event
UncommittedEvents implements the UncommittedEvents method of the Aggregate interface.
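The Aggregate interface states that uncommitted events are cleared when they are read. A minimal sketch of that read-and-clear behaviour follows. `baseAggregate` and `event` are hypothetical stand-ins (the real BaseAggregate has unexported fields), and the way `appendEvent` assigns versions is an assumption made only for illustration.

```go
package main

import "fmt"

// event is a hypothetical, reduced stand-in for Event.
type event struct {
	eventType string
	version   uint64
}

// baseAggregate is a hypothetical stand-in for BaseAggregate.
type baseAggregate struct {
	version     uint64
	uncommitted []event
}

// appendEvent queues an event for the next free version (assumed numbering)
// without changing the aggregate's current version.
func (a *baseAggregate) appendEvent(eventType string) event {
	e := event{eventType: eventType, version: a.version + uint64(len(a.uncommitted)) + 1}
	a.uncommitted = append(a.uncommitted, e)
	return e
}

// UncommittedEvents returns the queued events and clears the queue.
func (a *baseAggregate) UncommittedEvents() []event {
	events := a.uncommitted
	a.uncommitted = nil
	return events
}

// IncrementVersion bumps the version by one.
func (a *baseAggregate) IncrementVersion() { a.version++ }

// Exists reports whether the version is greater than 0.
func (a *baseAggregate) Exists() bool { return a.version > 0 }

func main() {
	a := &baseAggregate{}
	a.appendEvent("TenantCreated")
	a.appendEvent("TenantUpdated")

	fmt.Println(len(a.UncommittedEvents())) // first read returns both events
	fmt.Println(len(a.UncommittedEvents())) // second read finds the queue empty
	fmt.Println(a.Exists())
}
```

Because reading drains the queue, a caller that fetches the events is expected to persist them; fetching them twice would lose the second batch.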
func (*BaseAggregate) Version ¶
func (a *BaseAggregate) Version() uint64
Version implements the Version method of the Aggregate interface.
type BaseCommand ¶
type BaseCommand struct {
// contains filtered or unexported fields
}
BaseCommand is the base implementation for all commands
func NewBaseCommand ¶
func NewBaseCommand(id uuid.UUID, aggregateType AggregateType, commandType CommandType) *BaseCommand
NewBaseCommand creates a command.
func (*BaseCommand) AggregateID ¶
func (c *BaseCommand) AggregateID() uuid.UUID
AggregateID returns the ID of the aggregate that the command should be handled by.
func (*BaseCommand) AggregateType ¶
func (c *BaseCommand) AggregateType() AggregateType
AggregateType returns the type of the aggregate that the command can be handled by.
func (*BaseCommand) CommandType ¶
func (c *BaseCommand) CommandType() CommandType
CommandType returns the type of the command.
type Command ¶
type Command interface {
	// AggregateID returns the ID of the aggregate that the command should be
	// handled by.
	AggregateID() uuid.UUID
	// AggregateType returns the type of the aggregate that the command can be
	// handled by.
	AggregateType() AggregateType
	// CommandType returns the type of the command.
	CommandType() CommandType
	// SetData sets type specific additional data.
	SetData(*anypb.Any) error
}
Command is a domain command that is executed by a CommandHandler.
A command name should 1) be in present tense and 2) contain the intent (CreateTenant, AddRoleToUser).
The command should contain all the data needed when handling it as fields.
type CommandHandler ¶
type CommandHandler interface {
	// HandleCommand handles a command.
	HandleCommand(context.Context, Command) (*CommandReply, error)
}
CommandHandler is an interface that all handlers of commands must implement.
func UseCommandHandlerMiddleware ¶
func UseCommandHandlerMiddleware(h CommandHandler, middleware ...CommandHandlerMiddleware) CommandHandler
UseCommandHandlerMiddleware wraps a CommandHandler in one or more middlewares.
type CommandHandlerMiddleware ¶
type CommandHandlerMiddleware func(CommandHandler) CommandHandler
CommandHandlerMiddleware is a function that middlewares can implement to be able to chain.
type CommandRegistry ¶
type CommandRegistry interface {
	CommandHandler

	RegisterCommand(func(uuid.UUID) Command)
	CreateCommand(id uuid.UUID, commandType CommandType, data *anypb.Any) (Command, error)
	GetRegisteredCommandTypes() []CommandType
	SetHandler(handler CommandHandler, commandType CommandType)
}
var DefaultCommandRegistry CommandRegistry
func NewCommandRegistry ¶
func NewCommandRegistry() CommandRegistry
NewCommandRegistry creates a new command registry.
type CommandReply ¶
CommandReply is an internal representation of the gRPC struct that avoids constant parsing and rendering of UUIDs.
type CommandType ¶
type CommandType string
CommandType is the type of a command, used as its unique identifier.
func (CommandType) String ¶
func (c CommandType) String() string
String returns the string representation of a command type.
type Event ¶
type Event interface {
	// EventType is the type of the event.
	EventType() EventType
	// Timestamp of when the event was created.
	Timestamp() time.Time
	// AggregateType is the type of the aggregate that the event can be applied to.
	AggregateType() AggregateType
	// AggregateID is the id of the aggregate that the event should be applied to.
	AggregateID() uuid.UUID
	// AggregateVersion is the version of the aggregate.
	AggregateVersion() uint64
	// Event type specific event data.
	Data() EventData
	// Metadata is app-specific metadata, such as the originating user, recorded when this event was stored.
	Metadata() map[string]string
	// A string representation of the event.
	String() string
}
Event describes anything that has happened in the system. An event type name should be in past tense and contain the intent (TenantUpdated). The event should contain all the data needed when applying/handling it. The combination of AggregateType, AggregateID and AggregateVersion is unique.
func NewEvent ¶
func NewEvent(ctx context.Context, eventType EventType, data EventData, timestamp time.Time, aggregateType AggregateType, aggregateID uuid.UUID, aggregateVersion uint64) Event
NewEvent creates a new event.
func NewEventFromProto ¶
NewEventFromProto converts proto events to Event
type EventBusConnector ¶
type EventBusConnector interface {
	// Close closes the underlying connections
	Close() error
}
EventBusConnector can open and close connections.
type EventBusConsumer ¶
type EventBusConsumer interface {
	EventBusConnector

	// Matcher returns a new implementation specific matcher.
	Matcher() EventMatcher
	// AddHandler adds a handler for events matching one of the given EventMatchers.
	AddHandler(context.Context, EventHandler, ...EventMatcher) error
	// AddWorker behaves similarly to AddHandler but distributes events among the handlers with the same
	// work queue name according to the competing consumers pattern.
	AddWorker(context.Context, EventHandler, string, ...EventMatcher) error
}
EventBusConsumer notifies registered handlers on incoming events on the underlying message bus.
type EventBusPublisher ¶
type EventBusPublisher interface {
	EventBusConnector

	// PublishEvent publishes the event on the bus.
	PublishEvent(context.Context, Event) error
}
EventBusPublisher publishes events on the underlying message bus.
type EventData ¶
type EventData []byte
EventData is any additional data for an event. Internally this is represented by protojson.
func ToEventDataFromProto ¶
ToEventDataFromProto marshals m into EventData.
type EventHandler ¶
type EventHandler interface {
	// HandleEvent handles an event.
	HandleEvent(context.Context, Event) error
}
EventHandler is an interface that all handlers of events must implement.
func UseEventHandlerMiddleware ¶
func UseEventHandlerMiddleware(h EventHandler, middleware ...EventHandlerMiddleware) EventHandler
UseEventHandlerMiddleware wraps an EventHandler in one or more middlewares.
type EventHandlerMiddleware ¶
type EventHandlerMiddleware func(EventHandler) EventHandler
EventHandlerMiddleware is a function that middlewares can implement to be able to chain.
type EventMatcher ¶
type EventMatcher interface {
	// Any matches any event.
	Any() EventMatcher
	// MatchEventType matches a specific event type.
	MatchEventType(eventType EventType) EventMatcher
	// MatchAggregateType matches a specific aggregate type.
	MatchAggregateType(aggregateType AggregateType) EventMatcher
}
EventMatcher is an interface used to define what events should be consumed
type EventStore ¶
type EventStore interface {
	// Open connects to the bus
	Open(context.Context) error
	// Save appends all events in the event stream to the store.
	Save(context.Context, []Event) error
	// Load loads all events for the query from the store.
	Load(context.Context, *StoreQuery) (EventStreamReceiver, error)
	// LoadOr loads all events by combining the queries with the logical OR from the store.
	LoadOr(context.Context, []*StoreQuery) (EventStreamReceiver, error)
	// Close closes the underlying connections
	Close() error
}
EventStore is an interface for an event storage backend.
type EventStream ¶
type EventStream interface {
	EventStreamSender
	EventStreamReceiver
}
EventStream is an interface for handling asynchronous result sending/receiving from the EventStore
func NewEventStream ¶
func NewEventStream() EventStream
NewEventStream returns an implementation for the EventStream interface
type EventStreamReceiver ¶
type EventStreamSender ¶
type EventType ¶
type EventType string
EventType is the type of an event, used as its unique identifier.
type MetadataManager ¶
type MetadataManager interface {
	// GetContext returns a new context enriched with the metadata of this manager.
	GetContext() context.Context

	// GetMetadata returns the metadata of this manager.
	GetMetadata() map[string]string
	// SetMetadata sets the metadata of this manager.
	SetMetadata(map[string]string) MetadataManager

	// Get returns the information stored for the key.
	Get(string) (string, bool)
	GetObject(string, interface{}) error
	// Set stores information for the key.
	Set(string, string) MetadataManager
	SetObject(string, interface{}) error
}
MetadataManager is an interface for a storage of metadata. It can be used to easily store any metadata in the context of a call.
func NewMetadataManagerFromContext ¶
func NewMetadataManagerFromContext(ctx context.Context) MetadataManager
type Projection ¶
type Projection interface {
	// ID returns the ID of the Projection.
	ID() uuid.UUID
	// Version returns the version of the aggregate this Projection is based upon.
	Version() uint64
	// IncrementVersion increments the Version of the Projection.
	IncrementVersion()
}
Projection is the interface for projections.
type Projector ¶
type Projector[T Projection] interface {
	// NewProjection creates a new Projection of the type the Projector projects.
	NewProjection(uuid.UUID) T
	// Project updates the state of the projection according to the given event.
	Project(context.Context, Event, T) (T, error)
}
Projector is the interface for projectors.
type Reactor ¶
type Reactor interface {
	// HandleEvent handles a given event and, in reaction, sends 0..* Events through the given channel or returns an error.
	// Attention: The reactor is responsible for closing the channel if no further events will be sent to that channel.
	HandleEvent(ctx context.Context, event Event, events chan<- Event) error
}
Reactor is the interface for reactors.
type Repository ¶
type Repository[T Projection] interface {
	// ById returns a projection for an ID.
	ById(context.Context, uuid.UUID) (T, error)
	// All returns all projections in the repository.
	All(context.Context) ([]T, error)
	// Upsert saves a projection in the storage or replaces an existing one.
	Upsert(context.Context, T) error
	// Remove removes a projection by ID from the storage.
	Remove(context.Context, uuid.UUID) error
}
Repository is a repository for reading, storing, and removing projections.
type StoreQuery ¶
type StoreQuery struct {
	// Filter events by aggregate id
	AggregateId *uuid.UUID
	// Filter events for a specific aggregate type
	AggregateType *AggregateType
	// Filter events with a Version >= MinVersion
	MinVersion *uint64
	// Filter events with a Version <= MaxVersion
	MaxVersion *uint64
	// Filter events with a Timestamp >= MinTimestamp
	MinTimestamp *time.Time
	// Filter events with a Timestamp <= MaxTimestamp
	MaxTimestamp *time.Time
}
StoreQuery contains query information on how to retrieve events from an event store