Documentation ¶
Index ¶
- func CopyMap(m map[string]interface{}) map[string]interface{}
- func FindStatusInSlice(slice []Status, val Status) (int, bool)
- func ProjectorAlreadyInitialized() string
- func ProjectorFromWasAlreadyCalled() string
- func ProjectorNoHandler() string
- func ProjectorStateNotInitialised() string
- type Aggregate
- type BaseAggregate
- func (a BaseAggregate) AggregateID() uuid.UUID
- func (a *BaseAggregate) Apply(event DomainEvent)
- func (a *BaseAggregate) CallEventHandler(event interface{}, metadata map[string]interface{})
- func (a *BaseAggregate) FromHistory(events DomainEventIterator) error
- func (a *BaseAggregate) PopEvents() []DomainEvent
- func (a *BaseAggregate) RecordThat(event interface{}, metadata map[string]interface{})
- func (a BaseAggregate) Version() int
- type Client
- type DomainEvent
- func (e DomainEvent) AggregateID() uuid.UUID
- func (e DomainEvent) AggregateType() string
- func (e DomainEvent) CreatedAt() time.Time
- func (e DomainEvent) Metadata() map[string]interface{}
- func (e DomainEvent) Name() string
- func (e DomainEvent) Number() int
- func (e DomainEvent) Payload() interface{}
- func (e DomainEvent) SingleMetadata(key string) (interface{}, bool)
- func (e DomainEvent) UUID() uuid.UUID
- func (e DomainEvent) Version() int
- func (e DomainEvent) WithAddedMetadata(name string, value interface{}) DomainEvent
- func (e DomainEvent) WithAggregateType(aType string) DomainEvent
- func (e DomainEvent) WithNumber(number int) DomainEvent
- func (e DomainEvent) WithUUID(uuid uuid.UUID) DomainEvent
- func (e DomainEvent) WithVersion(v int) DomainEvent
- type DomainEventAction
- type DomainEventIterator
- type DomainEventMiddleware
- type EventHandler
- type EventStore
- func (es *EventStore) AppendMiddleware(action DomainEventAction, middleware DomainEventMiddleware)
- func (es *EventStore) AppendTo(ctx context.Context, streamName string, events []DomainEvent) error
- func (es *EventStore) CreateStream(ctx context.Context, streamName string) error
- func (es *EventStore) DeleteStream(ctx context.Context, streamName string) error
- func (es *EventStore) FetchAllStreamNames(ctx context.Context) ([]string, error)
- func (es *EventStore) HasStream(ctx context.Context, streamName string) (bool, error)
- func (es *EventStore) Install(ctx context.Context) error
- func (es *EventStore) Load(ctx context.Context, streamName string, fromNumber, count int, ...) (DomainEventIterator, error)
- func (es *EventStore) MergeAndLoad(ctx context.Context, count int, streams ...LoadStreamParameter) (DomainEventIterator, error)
- type FieldType
- type HandlersCache
- type LoadStreamParameter
- type MetadataMatch
- type MetadataMatcher
- type MetadataOperator
- type MiddlewareIterator
- func (m MiddlewareIterator) Close()
- func (m MiddlewareIterator) Current() (*DomainEvent, error)
- func (m MiddlewareIterator) Error() error
- func (m MiddlewareIterator) IsEmpty() (bool, error)
- func (m MiddlewareIterator) Next() bool
- func (m MiddlewareIterator) Rewind()
- func (m MiddlewareIterator) ToList() ([]DomainEvent, error)
- type PersistenceStrategy
- type ProjectionManager
- type ProjectionNotFound
- type Projector
- func (q *Projector) Delete(ctx context.Context, deleteEmittedEvents bool) error
- func (q *Projector) Emit(ctx context.Context, event DomainEvent) error
- func (q *Projector) FromAll() *Projector
- func (q *Projector) FromStream(streamName string, matcher MetadataMatcher) *Projector
- func (q *Projector) FromStreams(streams ...StreamProjection) *Projector
- func (q *Projector) Init(handler func() interface{}) *Projector
- func (q *Projector) LinkTo(ctx context.Context, streamName string, event DomainEvent) error
- func (q Projector) Name() string
- func (q *Projector) Reset(ctx context.Context) error
- func (q *Projector) Run(ctx context.Context, keepRunning bool) error
- func (q Projector) State() interface{}
- func (q Projector) Status() Status
- func (q *Projector) Stop(ctx context.Context) error
- func (q *Projector) When(handlers map[string]EventHandler) *Projector
- func (q *Projector) WhenAny(handler EventHandler) *Projector
- type Query
- func (q *Query) FromAll() *Query
- func (q *Query) FromStream(streamName string, matcher MetadataMatcher) *Query
- func (q *Query) FromStreams(streams ...StreamProjection) *Query
- func (q *Query) Init(handler func() interface{}) *Query
- func (q *Query) Reset()
- func (q *Query) Run(ctx context.Context) error
- func (q Query) State() interface{}
- func (q *Query) Stop()
- func (q *Query) When(handlers map[string]func(state interface{}, event DomainEvent) interface{}) *Query
- func (q *Query) WhenAny(handler func(state interface{}, event DomainEvent) interface{}) *Query
- type ReadModel
- type ReadModelProjector
- func (q *ReadModelProjector) Delete(ctx context.Context, deleteProjection bool) error
- func (q *ReadModelProjector) FromAll() *ReadModelProjector
- func (q *ReadModelProjector) FromStream(streamName string, matcher MetadataMatcher) *ReadModelProjector
- func (q *ReadModelProjector) FromStreams(streams ...StreamProjection) *ReadModelProjector
- func (q *ReadModelProjector) Init(handler func() interface{}) *ReadModelProjector
- func (q ReadModelProjector) Name() string
- func (q *ReadModelProjector) Reset(ctx context.Context) error
- func (q *ReadModelProjector) Run(ctx context.Context, keepRunning bool) error
- func (q ReadModelProjector) State() interface{}
- func (q ReadModelProjector) Status() Status
- func (q *ReadModelProjector) Stop(ctx context.Context) error
- func (q *ReadModelProjector) When(handlers map[string]EventHandler) *ReadModelProjector
- func (q *ReadModelProjector) WhenAny(handler EventHandler) *ReadModelProjector
- type Repository
- type Status
- type StreamAlreadyExist
- type StreamNotFound
- type StreamProjection
- type TypeCache
- type TypeRegistry
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CopyMap ¶
CopyMap is a helper function to copy a map to create immutable events with all methods
func FindStatusInSlice ¶
FindStatusInSlice returns if the given status is in the given status slice
func ProjectorAlreadyInitialized ¶
func ProjectorAlreadyInitialized() string
ProjectorAlreadyInitialized panics if you call Init on a Projection twice
func ProjectorFromWasAlreadyCalled ¶
func ProjectorFromWasAlreadyCalled() string
ProjectorFromWasAlreadyCalled panics if you call more than one of the available From Methods (FromStream, FromAll, FromStreams)
func ProjectorNoHandler ¶ added in v1.0.0
func ProjectorNoHandler() string
ProjectorNoHandler panics if you run a Projection without defining a Handler
func ProjectorStateNotInitialised ¶
func ProjectorStateNotInitialised() string
ProjectorStateNotInitialised panics if you don't call Init to initialise the ProjectionState
Types ¶
type Aggregate ¶
type Aggregate interface { PopEvents() []DomainEvent AggregateID() uuid.UUID FromHistory(events DomainEventIterator) error }
Aggregate defines the basic methods required by any implementation
type BaseAggregate ¶
type BaseAggregate struct {
// contains filtered or unexported fields
}
BaseAggregate provides basic functionality to simplify Aggregate handling, like creation, recreation and loading of Aggregates
func NewAggregate ¶
func NewAggregate(source interface{}) BaseAggregate
NewAggregate is a Constructor Function to create a new BaseAggregate
func (BaseAggregate) AggregateID ¶
func (a BaseAggregate) AggregateID() uuid.UUID
AggregateID is a unique identifier for an Aggregate instance All Aggregate Events are grouped by this UUID
func (*BaseAggregate) Apply ¶
func (a *BaseAggregate) Apply(event DomainEvent)
Apply adds an Event that is already wrapped in a DomainEvent to the stack. With this method you have more control over the wrapped DomainEvent
func (*BaseAggregate) CallEventHandler ¶
func (a *BaseAggregate) CallEventHandler(event interface{}, metadata map[string]interface{})
CallEventHandler is an internal method that calls the related handler method of the Aggregate after a new Event was recorded. The method has to follow the schema When{EventName}(event EventStruct) -> an example: func (f *FooAggregate) WhenFooEvent(e FooEvent, metadata map[string]interface{})
func (*BaseAggregate) FromHistory ¶
func (a *BaseAggregate) FromHistory(events DomainEventIterator) error
FromHistory recreate the latest state of an existing Aggregate by its recorded Events
func (*BaseAggregate) PopEvents ¶
func (a *BaseAggregate) PopEvents() []DomainEvent
PopEvents return and clear all new and not persisted events of the stack
func (*BaseAggregate) RecordThat ¶
func (a *BaseAggregate) RecordThat(event interface{}, metadata map[string]interface{})
RecordThat adds a new Event to the EventStream of an Aggregate instance. {event} represents a basic struct with the Event payload. {metadata} is a map with additional information and can be used to filter the EventStream in Projections or Queries
func (BaseAggregate) Version ¶
func (a BaseAggregate) Version() int
Version returns the current Version of the Aggregate
type Client ¶
type Client interface { // Conn is the underlying Storage Connection like pgx.Pool for Postgres Conn() interface{} // Exists the given collection (table) Exists(ctx context.Context, collection string) (bool, error) // Delete the given collection (table) Delete(ctx context.Context, collection string) error // Reset (truncate) the given collection (table) Reset(ctx context.Context, collection string) error // Insert a new item into the collection, the map key represent the storage column Insert(ctx context.Context, collection string, values map[string]interface{}) error // Remove all items from the collection by the given identifiers, the map key represent the storage column Remove(ctx context.Context, collection string, identifiers map[string]interface{}) error // Update all matching items from the collection with the new values, the map key represent the storage column Update(ctx context.Context, collection string, values map[string]interface{}, identifiers map[string]interface{}) error }
Client is a Helper to execute simple commands on a ReadModel
type DomainEvent ¶
type DomainEvent struct {
// contains filtered or unexported fields
}
DomainEvent represents a wrapper for DomainEvents to add meta information. It is used to simplify the handling of adding and recreating DomainEvents
func NewDomainEvent ¶
func NewDomainEvent(aggregateID uuid.UUID, payload interface{}, metadata map[string]interface{}, createdAt time.Time) DomainEvent
NewDomainEvent creates a new Event and sets default values like Metadata. It also creates the EventID
func (DomainEvent) AggregateID ¶
func (e DomainEvent) AggregateID() uuid.UUID
AggregateID of the related Aggregate
func (DomainEvent) AggregateType ¶
func (e DomainEvent) AggregateType() string
AggregateType of the Event
func (DomainEvent) CreatedAt ¶
func (e DomainEvent) CreatedAt() time.Time
CreatedAt is the creation DateTime of the Event
func (DomainEvent) Metadata ¶
func (e DomainEvent) Metadata() map[string]interface{}
Metadata of the Event like the related AggregateID
func (DomainEvent) Number ¶
func (e DomainEvent) Number() int
Number of the event in the complete EventStream (over multiple aggregates)
func (DomainEvent) Payload ¶
func (e DomainEvent) Payload() interface{}
Payload is the original recorded DomainEventStruct
func (DomainEvent) SingleMetadata ¶
func (e DomainEvent) SingleMetadata(key string) (interface{}, bool)
SingleMetadata from the Metadata Map
func (DomainEvent) Version ¶
func (e DomainEvent) Version() int
Version returns the version of the Event. It is set and modified as an integer. Postgres as persistence layer reads the value as float64, so it has to be converted in these scenarios
func (DomainEvent) WithAddedMetadata ¶
func (e DomainEvent) WithAddedMetadata(name string, value interface{}) DomainEvent
WithAddedMetadata create a copy of the event with the given additional Metadata
func (DomainEvent) WithAggregateType ¶
func (e DomainEvent) WithAggregateType(aType string) DomainEvent
WithAggregateType create a copy of the event with the given AggregateType
func (DomainEvent) WithNumber ¶
func (e DomainEvent) WithNumber(number int) DomainEvent
WithNumber create a copy of the event with the given number Is used to update the Number after adding to the EventStream or to recreate an existing Event
func (DomainEvent) WithUUID ¶
func (e DomainEvent) WithUUID(uuid uuid.UUID) DomainEvent
WithUUID creates a copy of the event with the given EventID. Used to recreate an existing event from the underlying persistence storage
func (DomainEvent) WithVersion ¶
func (e DomainEvent) WithVersion(v int) DomainEvent
WithVersion create a copy of the event with the given Version
type DomainEventAction ¶
type DomainEventAction = string
const ( PreAppend DomainEventAction = "PRE_APPEND" Appended DomainEventAction = "APPENDED" Loaded DomainEventAction = "LOADED" )
type DomainEventIterator ¶
type DomainEventIterator interface { // Next turns the cursor to the next DomainEvent Next() bool // Current returns the current selected DomainEvent in the List or a related error Current() (*DomainEvent, error) // Error returns the latest error Error() error // Rewind the iterator cursor Rewind() // Close removes all fetched events and resets the cursor Close() // IsEmpty checks for any DomainEvent in the Iterator IsEmpty() (bool, error) // Converts the Iterator to an list of DomainEvents ToList() ([]DomainEvent, error) }
DomainEventIterator is a lazy loading Iterator. It fetches DomainEvents in batches of 1000 until all events are loaded
type DomainEventMiddleware ¶
type DomainEventMiddleware = func(ctx context.Context, event DomainEvent) (DomainEvent, error)
type EventHandler ¶
type EventHandler = func(state interface{}, event DomainEvent) interface{}
EventHandler process a single Event and returns the new ProjectionState The First argument is the current state of the projection The Second argument is the loaded event
type EventStore ¶
type EventStore struct {
// contains filtered or unexported fields
}
EventStore represent the connection to our EventStream with the selected PersistenceStrategy
func NewEventStore ¶
func NewEventStore(strategy PersistenceStrategy) *EventStore
NewEventStore creates a BasicEventStore with the selected PersistenceStrategy
func (*EventStore) AppendMiddleware ¶
func (es *EventStore) AppendMiddleware(action DomainEventAction, middleware DomainEventMiddleware)
Append a middleware to one of the existing Actions PreAppend, Appended, Loaded
func (*EventStore) AppendTo ¶
func (es *EventStore) AppendTo(ctx context.Context, streamName string, events []DomainEvent) error
AppendTo appends a list of events to the given EventStream
func (*EventStore) CreateStream ¶
func (es *EventStore) CreateStream(ctx context.Context, streamName string) error
CreateStream creates a new EventStream
func (*EventStore) DeleteStream ¶
func (es *EventStore) DeleteStream(ctx context.Context, streamName string) error
DeleteStream deletes an existing EventStream
func (*EventStore) FetchAllStreamNames ¶
func (es *EventStore) FetchAllStreamNames(ctx context.Context) ([]string, error)
FetchAllStreamNames returns a list of all existing EventStreams
func (*EventStore) Install ¶
func (es *EventStore) Install(ctx context.Context) error
Install creates the EventStreams and Projections Table if not Exists
func (*EventStore) Load ¶
func (es *EventStore) Load(ctx context.Context, streamName string, fromNumber, count int, matcher MetadataMatcher) (DomainEventIterator, error)
Load Events from the given EventStream from the given Number, restrictable by a count (Limit) and custom filter (MetadataMatcher)
func (*EventStore) MergeAndLoad ¶
func (es *EventStore) MergeAndLoad(ctx context.Context, count int, streams ...LoadStreamParameter) (DomainEventIterator, error)
MergeAndLoad can load Events from multiple Streams, merged and sorted in historical order. The result can also be restricted by count and MetadataMatcher
type FieldType ¶
type FieldType string
Enum for possible categories of fields to filter EventStreams
type HandlersCache ¶
type LoadStreamParameter ¶
type LoadStreamParameter struct { StreamName string FromNumber int Matcher MetadataMatcher }
type MetadataMatch ¶
type MetadataMatch struct { // Field name to filter // For MessagePropertyField possible values are "event_name", "created_at", "uuid" // For MetadataField its the name of the filtered metadata like "_aggregate_id" Field string // Value to filter Value interface{} // Operation to execute like EqualsOperator // Example for MetadataField and EqualsOperator checks if MetadataMatch.Value = event.metadata[MetadataMatch.Field] Operation MetadataOperator // FieldType to filter FieldType FieldType }
MetadataMatch is a struct to filter an EventStream by Metadata or EventProperties Like EventName, AggregateID, Version, CreatedAt
type MetadataMatcher ¶
type MetadataMatcher = []MetadataMatch
MetadataMatcher alias of a List of MetadataMatch
type MetadataOperator ¶
type MetadataOperator string
Enum for possible Operators to filter EventStreams
const ( EqualsOperator MetadataOperator = "=" NotEqualsOperator MetadataOperator = "!=" GreaterThanOperator MetadataOperator = ">" GreaterThanEqualsOperator MetadataOperator = ">=" InOperator MetadataOperator = "in" NotInOperator MetadataOperator = "nin" LowerThanOperator MetadataOperator = "<" LowerThanEuqalsOperator MetadataOperator = "<=" RegexOperator MetadataOperator = "regex" )
type MiddlewareIterator ¶
type MiddlewareIterator struct {
// contains filtered or unexported fields
}
func NewMiddlewareIterator ¶
func NewMiddlewareIterator(ctx context.Context, iterator DomainEventIterator, middleware []DomainEventMiddleware) MiddlewareIterator
func (MiddlewareIterator) Close ¶
func (m MiddlewareIterator) Close()
func (MiddlewareIterator) Current ¶
func (m MiddlewareIterator) Current() (*DomainEvent, error)
func (MiddlewareIterator) Error ¶
func (m MiddlewareIterator) Error() error
func (MiddlewareIterator) IsEmpty ¶
func (m MiddlewareIterator) IsEmpty() (bool, error)
func (MiddlewareIterator) Next ¶
func (m MiddlewareIterator) Next() bool
func (MiddlewareIterator) Rewind ¶
func (m MiddlewareIterator) Rewind()
func (MiddlewareIterator) ToList ¶
func (m MiddlewareIterator) ToList() ([]DomainEvent, error)
type PersistenceStrategy ¶
type PersistenceStrategy interface { // FetchAllStreamNames returns a list of all existing EventStreams FetchAllStreamNames(ctx context.Context) ([]string, error) // CreateEventStreamsTable creates a DB Table / Collection to manage all existing EventStreams CreateEventStreamsTable(context.Context) error // CreateProjectionsTable creates a DB Table / Collection to manage all existing Projections CreateProjectionsTable(context.Context) error // AddStreamToStreamsTable adds a new EventStream to the EventStreams Table / Collection AddStreamToStreamsTable(ctx context.Context, streamName string) error // RemoveStreamFromStreamsTable removes a EventStream from the EventStreams Table / Collection RemoveStreamFromStreamsTable(ctx context.Context, streamName string) error // DeleteStream deletes the EventStream from the EventStreams Table / Collection and deletes the related EventStream Table / Collection DeleteStream(ctx context.Context, streamName string) error // CreateSchema creates a new EventStream Table / Collection which is used to persist all related Events CreateSchema(ctx context.Context, streamName string) error // DropSchema removes a EventStream Table / Collection with all included Events DropSchema(ctx context.Context, streamName string) error // HasStream return if a EventStream with the given name exists HasStream(ctx context.Context, streamName string) (bool, error) // AppendTo appends multiple events to the given EventStream AppendTo(ctx context.Context, streamName string, events []DomainEvent) error // Load Events from the given EventStream from the given Number, restrictable by a count (Limit) and custom filter (MetadataMatcher) Load(ctx context.Context, streamName string, fromNumber int, count int, matcher MetadataMatcher) (DomainEventIterator, error) // MergeAndLoad can load Events from multiple Stream merged and sorted by the historical order // The Result could also be restriced by count and MetadataMatcher MergeAndLoad(ctx context.Context, count 
int, streams ...LoadStreamParameter) (DomainEventIterator, error) }
PersistenceStrategy defines an Interface needed for an underlying PersistentStorage Current implementations are Postgres and InMemory
type ProjectionManager ¶
type ProjectionManager interface { // FetchProjectionStatus returns the active status of the given projection FetchProjectionStatus(ctx context.Context, projectionName string) (Status, error) // CreateProjection creates a new projections entry in the projections table CreateProjection(ctx context.Context, projectionName string, state interface{}, status Status) error // DeleteProjection deletes a projection entry from the projections table DeleteProjection(ctx context.Context, projectionName string) error // ResetProjection resets state and positions from the given projection ResetProjection(ctx context.Context, projectionName string, state interface{}) error // PersistProjection persists the current state and position of the given projection PersistProjection(ctx context.Context, projectionName string, state interface{}, streamPositions map[string]int) error // LoadProjection loads latest state and positions of the given projection LoadProjection(ctx context.Context, projectionName string) (map[string]int, interface{}, error) // UpdateProjectionStatus updates the status of a given projection UpdateProjectionStatus(ctx context.Context, projectionName string, status Status) error // ProjectionExists returns if a projection with the given name exists ProjectionExists(ctx context.Context, projectionName string) (bool, error) }
ProjectionManager manages the Projections Table / Collection and has multiple implementations for different persistence layers
type ProjectionNotFound ¶
type ProjectionNotFound struct {
Name string
}
ProjectionNotFound is returned if you try to delete / reset or load a non-existing Projection
func (ProjectionNotFound) Error ¶
func (e ProjectionNotFound) Error() string
type Projector ¶
type Projector struct {
// contains filtered or unexported fields
}
Projector creates a persisted projection of one or multiple streams
func NewProjector ¶
func NewProjector(name string, eventStore *EventStore, manager ProjectionManager) Projector
NewProjector creates a new Projector to configure and run a new projection. Define your preferred persistence storage with the ProjectionManager (at this time only Postgres is supported :-D)
func (*Projector) Delete ¶
Delete removes the Projection from the Projections table / collection and, if deleteEmittedEvents is true, also deletes the related Emit-EventStream if it exists
func (*Projector) Emit ¶
func (q *Projector) Emit(ctx context.Context, event DomainEvent) error
Emit creates a new EventStream with the name of the Projection And append the event to this new EventStream
func (*Projector) FromStream ¶
func (q *Projector) FromStream(streamName string, matcher MetadataMatcher) *Projector
FromStream read events from a single EventStream
func (*Projector) FromStreams ¶
func (q *Projector) FromStreams(streams ...StreamProjection) *Projector
FromStreams read events from multiple EventStreams
func (Projector) State ¶
func (q Projector) State() interface{}
State returns the current Projection State
func (*Projector) Stop ¶
Stop the Projection and persist the current state and EventStream positions
func (*Projector) When ¶
func (q *Projector) When(handlers map[string]EventHandler) *Projector
When defines multiple handlers. You can create one handler for one event. Events without a handler will not be processed
func (*Projector) WhenAny ¶
func (q *Projector) WhenAny(handler EventHandler) *Projector
WhenAny defines a single handler for all possible Events
type Query ¶
type Query struct { Status Status // contains filtered or unexported fields }
Query custom information from your EventStream. Queries are not persisted, they provide the latest state after running
func (*Query) FromStream ¶
func (q *Query) FromStream(streamName string, matcher MetadataMatcher) *Query
FromStream read events from a single EventStream
func (*Query) FromStreams ¶
func (q *Query) FromStreams(streams ...StreamProjection) *Query
FromStreams read events from multiple EventStreams
func (*Query) When ¶
func (q *Query) When(handlers map[string]func(state interface{}, event DomainEvent) interface{}) *Query
When defines multiple handlers. You can create one handler for one event. Events without a handler will not be processed
func (*Query) WhenAny ¶
func (q *Query) WhenAny(handler func(state interface{}, event DomainEvent) interface{}) *Query
WhenAny defines a single handler for all possible Events
type ReadModel ¶
type ReadModel interface { // Init your ReadModel, for example create the DB Table Init(ctx context.Context) error // Check if your ReadModel was already initialized, for example if DB Table already exists IsInitialized(ctx context.Context) (bool, error) // Reset your ReadModel Reset(ctx context.Context) error // Delete your ReadModel Delete(ctx context.Context) error // Stack add a new command to you ReadModel Stack(method string, args ...map[string]interface{}) // Persist the current State of your ReadModel, executes all stacked commands Persist(ctx context.Context) error }
ReadModel is a custom ReadModel of your DomainEvents and can be represented and persisted in many different forms, for example as a DB table in your database or as cached files. An example implementation can be found in example/read_model.go
type ReadModelProjector ¶
type ReadModelProjector struct { ReadModel ReadModel // contains filtered or unexported fields }
ReadModelProjector creates a custom ReadModel over one or multiple streams
func NewReadModelProjector ¶
func NewReadModelProjector(name string, readModel ReadModel, eventStore *EventStore, manager ProjectionManager) ReadModelProjector
NewReadModelProjector for the given ReadModel implementation, EventStore and ProjectionManager Find an example for a ReadModel in example/read_model.go
func (*ReadModelProjector) Delete ¶
func (q *ReadModelProjector) Delete(ctx context.Context, deleteProjection bool) error
Delete the ReadModelProjection from the Projections table / collection and if deleteProjection is true it also runs the Delete Method of your ReadModel
func (*ReadModelProjector) FromAll ¶
func (q *ReadModelProjector) FromAll() *ReadModelProjector
FromAll read events from all existing EventStreams
func (*ReadModelProjector) FromStream ¶
func (q *ReadModelProjector) FromStream(streamName string, matcher MetadataMatcher) *ReadModelProjector
FromStream read events from a single EventStream
func (*ReadModelProjector) FromStreams ¶
func (q *ReadModelProjector) FromStreams(streams ...StreamProjection) *ReadModelProjector
FromStreams read events from multiple EventStreams
func (*ReadModelProjector) Init ¶
func (q *ReadModelProjector) Init(handler func() interface{}) *ReadModelProjector
Init the state, define the type and/or prefill it with data
func (ReadModelProjector) Name ¶
func (q ReadModelProjector) Name() string
Name returns the current ReadModelProjection Name
func (*ReadModelProjector) Reset ¶
func (q *ReadModelProjector) Reset(ctx context.Context) error
Reset the ReadModelProjection state and EventStream positions Run also the Reset Method of your ReadModel
func (*ReadModelProjector) Run ¶
func (q *ReadModelProjector) Run(ctx context.Context, keepRunning bool) error
Run the ReadModelProjection
func (ReadModelProjector) State ¶
func (q ReadModelProjector) State() interface{}
State returns the current ReadModelProjection State
func (ReadModelProjector) Status ¶
func (q ReadModelProjector) Status() Status
Status returns the current ReadModelProjection Status
func (*ReadModelProjector) Stop ¶
func (q *ReadModelProjector) Stop(ctx context.Context) error
Stop the ReadModelProjection and persist the current state and EventStream positions
func (*ReadModelProjector) When ¶
func (q *ReadModelProjector) When(handlers map[string]EventHandler) *ReadModelProjector
When defines multiple handlers. You can create one handler for one event. Events without a handler will not be processed
func (*ReadModelProjector) WhenAny ¶
func (q *ReadModelProjector) WhenAny(handler EventHandler) *ReadModelProjector
WhenAny defines a single handler for all possible Events
type Repository ¶
type Repository struct { // Stream releated to your AggregateType Stream string // contains filtered or unexported fields }
Repository for an AggregateType
func NewRepository ¶
func NewRepository(streamName string, eventStore *EventStore) Repository
NewRepository creates a Repository
func (Repository) GetAggregate ¶
func (r Repository) GetAggregate(ctx context.Context, aggregateID uuid.UUID) (DomainEventIterator, error)
GetAggregate returns a list of all persisted events of a single Aggregate, grouped by the AggregateID, in historical order
func (Repository) SaveAggregate ¶
func (r Repository) SaveAggregate(ctx context.Context, aggregate Aggregate) error
SaveAggregate persist all new Events to the EventStore
type StreamAlreadyExist ¶
type StreamAlreadyExist struct {
Stream string
}
StreamAlreadyExist is returned when you create an already existing EventStream
func (StreamAlreadyExist) Error ¶
func (e StreamAlreadyExist) Error() string
type StreamNotFound ¶
type StreamNotFound struct {
Stream string
}
StreamNotFound is returned if you try to delete / reset or load a non-existing EventStream
func (StreamNotFound) Error ¶
func (e StreamNotFound) Error() string
type StreamProjection ¶
type StreamProjection struct { // StreamName of the EventStream StreamName string // Matcher a optional list of custom filters Matcher MetadataMatcher }
StreamProjection is used if you want to project over different Streams with different Filters
type TypeRegistry ¶
type TypeRegistry interface { GetHandlers(interface{}) HandlersCache GetTypeByName(string) (reflect.Type, bool) RegisterAggregate(aggregate interface{}, events ...interface{}) RegisterEvents(events ...interface{}) RegisterType(interface{}) }
TypeRegistry registers all existing Aggregates and Events for dynamic Type Assertions and Conversions
func NewTypeRegistry ¶
func NewTypeRegistry() TypeRegistry
NewTypeRegistry constructs a new TypeRegistry