Documentation ¶
Index ¶
- func EventData(event Command) ([]byte, error)
- func MapEventsToV1Events(events []Event) []*models.Event
- func WithResourceOwner(resourceOwner string) aggregateOpt
- type Aggregate
- type AggregateType
- type Asset
- type AssetAction
- type BaseEvent
- func (e *BaseEvent) Aggregate() Aggregate
- func (e *BaseEvent) CreationDate() time.Time
- func (e *BaseEvent) DataAsBytes() []byte
- func (e *BaseEvent) EditorService() string
- func (e *BaseEvent) EditorUser() string
- func (e *BaseEvent) PreviousAggregateSequence() uint64
- func (e *BaseEvent) PreviousAggregateTypeSequence() uint64
- func (e *BaseEvent) Sequence() uint64
- func (e *BaseEvent) Type() EventType
- type Columns
- type Command
- type Event
- type EventType
- type EventUniqueConstraint
- func NewAddEventUniqueConstraint(uniqueType, uniqueField, errMessage string) *EventUniqueConstraint
- func NewAddGlobalEventUniqueConstraint(uniqueType, uniqueField, errMessage string) *EventUniqueConstraint
- func NewRemoveEventUniqueConstraint(uniqueType, uniqueField string) *EventUniqueConstraint
- func NewRemoveGlobalEventUniqueConstraint(uniqueType, uniqueField string) *EventUniqueConstraint
- type Eventstore
- func (es *Eventstore) Filter(ctx context.Context, queryFactory *SearchQueryBuilder) ([]Event, error)
- func (es *Eventstore) FilterToQueryReducer(ctx context.Context, r QueryReducer) error
- func (es *Eventstore) FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, r reducer) error
- func (es *Eventstore) Health(ctx context.Context) error
- func (es *Eventstore) LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (uint64, error)
- func (es *Eventstore) NewInstance(ctx context.Context, instanceID string) error
- func (es *Eventstore) Push(ctx context.Context, cmds ...Command) ([]Event, error)
- func (es *Eventstore) RegisterFilterEventMapper(eventType EventType, mapper func(*repository.Event) (Event, error)) *Eventstore
- type QueryReducer
- type ReadModel
- type SearchQuery
- func (query *SearchQuery) AggregateIDs(ids ...string) *SearchQuery
- func (query *SearchQuery) AggregateTypes(types ...AggregateType) *SearchQuery
- func (query *SearchQuery) Builder() *SearchQueryBuilder
- func (query *SearchQuery) EventData(data map[string]interface{}) *SearchQuery
- func (query *SearchQuery) EventTypes(types ...EventType) *SearchQuery
- func (query *SearchQuery) ExcludedInstanceID(instanceIDs ...string) *SearchQuery
- func (query *SearchQuery) InstanceID(instanceID string) *SearchQuery
- func (query SearchQuery) Or() *SearchQuery
- func (query *SearchQuery) SequenceGreater(sequence uint64) *SearchQuery
- func (query *SearchQuery) SequenceLess(sequence uint64) *SearchQuery
- type SearchQueryBuilder
- func (builder *SearchQueryBuilder) AddQuery() *SearchQuery
- func (builder *SearchQueryBuilder) Columns(columns Columns) *SearchQueryBuilder
- func (builder *SearchQueryBuilder) InstanceID(instanceID string) *SearchQueryBuilder
- func (builder *SearchQueryBuilder) Limit(limit uint64) *SearchQueryBuilder
- func (builder *SearchQueryBuilder) Matches(event Event, existingLen int) (matches bool)
- func (builder *SearchQueryBuilder) OrderAsc() *SearchQueryBuilder
- func (builder *SearchQueryBuilder) OrderDesc() *SearchQueryBuilder
- func (builder *SearchQueryBuilder) ResourceOwner(resourceOwner string) *SearchQueryBuilder
- type Subscription
- type UniqueConstraintAction
- type Version
- type WriteModel
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MapEventsToV1Events ¶
func WithResourceOwner ¶
func WithResourceOwner(resourceOwner string) aggregateOpt
WithResourceOwner overwrites the resource owner of the aggregate. By default the resource owner is set by the context.
Types ¶
type Aggregate ¶
type Aggregate struct { //ID is the unique identifier of this aggregate ID string `json:"-"` //Type is the name of the aggregate. Type AggregateType `json:"-"` //ResourceOwner is the org this aggregate belongs to ResourceOwner string `json:"-"` //InstanceID is the instance this aggregate belongs to InstanceID string `json:"-"` //Version is the semver this aggregate represents Version Version `json:"-"` }
Aggregate is the basic implementation of Aggregater
func AggregateFromWriteModel ¶
func AggregateFromWriteModel( wm *WriteModel, typ AggregateType, version Version, ) *Aggregate
AggregateFromWriteModel maps the given WriteModel to an Aggregate
func NewAggregate ¶
func NewAggregate( ctx context.Context, id string, typ AggregateType, version Version, opts ...aggregateOpt, ) *Aggregate
NewAggregate is the default constructor of an aggregate. The opts overwrite values calculated by the given parameters.
type Asset ¶
type Asset struct { // ID is to refer to the asset ID string //Asset is the actual image Asset []byte //Action defines if asset should be added or removed Action AssetAction }
func NewAddAsset ¶
func NewRemoveAsset ¶
type BaseEvent ¶
type BaseEvent struct { EventType EventType `json:"-"` //User who created the event User string `json:"-"` //Service which created the event Service string `json:"-"` Data []byte `json:"-"` // contains filtered or unexported fields }
BaseEvent represents the minimum metadata of an event
func BaseEventFromRepo ¶
func BaseEventFromRepo(event *repository.Event) *BaseEvent
BaseEventFromRepo maps a stored event to a BaseEvent
func NewBaseEventForPush ¶
NewBaseEventForPush is the constructor for events which will be pushed into the eventstore. The resource owner of the aggregate is only used if it's the first event of this aggregate type; afterwards the resource owner of the first previous events is taken.
func (*BaseEvent) CreationDate ¶
CreationDate is the time the event is inserted into the eventstore.
func (*BaseEvent) DataAsBytes ¶
DataAsBytes returns the payload of the event. It represents the fields changed by the event.
func (*BaseEvent) EditorService ¶
EditorService implements Command
func (*BaseEvent) EditorUser ¶
EditorUser implements Command
func (*BaseEvent) PreviousAggregateSequence ¶
PreviousAggregateSequence implements EventReader
func (*BaseEvent) PreviousAggregateTypeSequence ¶
PreviousAggregateTypeSequence implements EventReader
type Columns ¶
type Columns repository.Columns
Columns defines which fields of the event are needed for the query
const ( //ColumnsEvent represents all fields of an event ColumnsEvent Columns = repository.ColumnsEvent // ColumnsMaxSequence represents the latest sequence of the filtered events ColumnsMaxSequence Columns = repository.ColumnsMaxSequence )
type Command ¶
type Command interface { //Aggregate is the metadata of an aggregate Aggregate() Aggregate // EditorService is the service who wants to push the event EditorService() string //EditorUser is the user who wants to push the event EditorUser() string //Type must return an event type which should be unique in the aggregate Type() EventType //Data returns the payload of the event. It represents the fields changed by the event // valid types are: // * nil (no payload), // * json byte array // * struct which can be marshalled to json // * pointer to struct which can be marshalled to json Data() interface{} //UniqueConstraints should be added for unique attributes of an event, if nil constraints will not be checked UniqueConstraints() []*EventUniqueConstraint }
Command is the intent to store an event in the eventstore.
type Event ¶
type Event interface { // EditorService is the service who pushed the event EditorService() string //EditorUser is the user who pushed the event EditorUser() string //Type is the type of the event Type() EventType Aggregate() Aggregate Sequence() uint64 CreationDate() time.Time //PreviousAggregateSequence returns the previous sequence of the aggregate root (e.g. for org.42508134) PreviousAggregateSequence() uint64 //PreviousAggregateTypeSequence returns the previous sequence of the aggregate type (e.g. for org) PreviousAggregateTypeSequence() uint64 //DataAsBytes returns the payload of the event. It represents the fields changed by the event DataAsBytes() []byte }
Event is a stored activity
type EventUniqueConstraint ¶
type EventUniqueConstraint struct { // UniqueType is the table name for the unique constraint UniqueType string //UniqueField is the unique key UniqueField string //Action defines if unique constraint should be added or removed Action UniqueConstraintAction //ErrorMessage defines the translation file key for the error message ErrorMessage string //IsGlobal defines if the unique constraint is globally unique or just within a single instance IsGlobal bool }
func NewAddEventUniqueConstraint ¶
func NewAddEventUniqueConstraint( uniqueType, uniqueField, errMessage string) *EventUniqueConstraint
func NewAddGlobalEventUniqueConstraint ¶
func NewAddGlobalEventUniqueConstraint( uniqueType, uniqueField, errMessage string) *EventUniqueConstraint
func NewRemoveEventUniqueConstraint ¶
func NewRemoveEventUniqueConstraint( uniqueType, uniqueField string) *EventUniqueConstraint
func NewRemoveGlobalEventUniqueConstraint ¶
func NewRemoveGlobalEventUniqueConstraint( uniqueType, uniqueField string) *EventUniqueConstraint
type Eventstore ¶
type Eventstore struct {
// contains filtered or unexported fields
}
Eventstore abstracts all functions needed to store valid events and filters the stored events
func NewEventstore ¶
func NewEventstore(repo repository.Repository) *Eventstore
func (*Eventstore) Filter ¶
func (es *Eventstore) Filter(ctx context.Context, queryFactory *SearchQueryBuilder) ([]Event, error)
Filter filters the stored events based on the searchQuery and maps the events to the defined event structs
func (*Eventstore) FilterToQueryReducer ¶
func (es *Eventstore) FilterToQueryReducer(ctx context.Context, r QueryReducer) error
FilterToQueryReducer filters the events based on the search query of the query function, appends all events to the reducer and calls its reduce function.
func (*Eventstore) FilterToReducer ¶
func (es *Eventstore) FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, r reducer) error
FilterToReducer filters the events based on the search query, appends all events to the reducer and calls its reduce function.
func (*Eventstore) Health ¶
func (es *Eventstore) Health(ctx context.Context) error
Health checks if the eventstore can properly work. It checks if the repository can serve load.
func (*Eventstore) LatestSequence ¶
func (es *Eventstore) LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (uint64, error)
LatestSequence filters the latest sequence for the given search query
func (*Eventstore) NewInstance ¶
func (es *Eventstore) NewInstance(ctx context.Context, instanceID string) error
func (*Eventstore) Push ¶
Push pushes the events in a single transaction. An event needs at least an aggregate.
func (*Eventstore) RegisterFilterEventMapper ¶
func (es *Eventstore) RegisterFilterEventMapper(eventType EventType, mapper func(*repository.Event) (Event, error)) *Eventstore
RegisterFilterEventMapper registers a function for mapping an eventstore event to an event
type QueryReducer ¶
type QueryReducer interface { //Query returns the SearchQueryFactory for the events needed in reducer Query() *SearchQueryBuilder // contains filtered or unexported methods }
type ReadModel ¶
type ReadModel struct { AggregateID string `json:"-"` ProcessedSequence uint64 `json:"-"` CreationDate time.Time `json:"-"` ChangeDate time.Time `json:"-"` Events []Event `json:"-"` ResourceOwner string `json:"-"` InstanceID string `json:"-"` }
ReadModel is the minimum representation of a read model. It implements a basic reducer. It might be saved in a database or in memory.
func (*ReadModel) AppendEvents ¶
AppendEvents adds all the events to the read model. The function doesn't compute the new state of the read model
type SearchQuery ¶
type SearchQuery struct {
// contains filtered or unexported fields
}
func (*SearchQuery) AggregateIDs ¶
func (query *SearchQuery) AggregateIDs(ids ...string) *SearchQuery
AggregateIDs filters for events with the given aggregate IDs
func (*SearchQuery) AggregateTypes ¶
func (query *SearchQuery) AggregateTypes(types ...AggregateType) *SearchQuery
AggregateTypes filters for events with the given aggregate types
func (*SearchQuery) Builder ¶
func (query *SearchQuery) Builder() *SearchQueryBuilder
Builder returns the SearchQueryBuilder of the sub query
func (*SearchQuery) EventData ¶
func (query *SearchQuery) EventData(data map[string]interface{}) *SearchQuery
EventData filters for events with the given event data. Use this call with care as it will be slower than the other filters.
func (*SearchQuery) EventTypes ¶
func (query *SearchQuery) EventTypes(types ...EventType) *SearchQuery
EventTypes filters for events with the given event types
func (*SearchQuery) ExcludedInstanceID ¶
func (query *SearchQuery) ExcludedInstanceID(instanceIDs ...string) *SearchQuery
ExcludedInstanceID filters for events not having the given instanceIDs
func (*SearchQuery) InstanceID ¶
func (query *SearchQuery) InstanceID(instanceID string) *SearchQuery
InstanceID filters for events with the given instanceID
func (SearchQuery) Or ¶
func (query SearchQuery) Or() *SearchQuery
Or creates a new sub query on the search query builder
func (*SearchQuery) SequenceGreater ¶
func (query *SearchQuery) SequenceGreater(sequence uint64) *SearchQuery
SequenceGreater filters for events with a sequence greater than the requested sequence
func (*SearchQuery) SequenceLess ¶
func (query *SearchQuery) SequenceLess(sequence uint64) *SearchQuery
SequenceLess filters for events with a sequence less than the requested sequence
type SearchQueryBuilder ¶
type SearchQueryBuilder struct {
// contains filtered or unexported fields
}
SearchQueryBuilder represents the builder for your filter. If invalid data are set, the filter will fail.
func NewSearchQueryBuilder ¶
func NewSearchQueryBuilder(columns Columns) *SearchQueryBuilder
NewSearchQueryBuilder creates a new builder for event filters. aggregateTypes must contain at least one aggregate type.
func (*SearchQueryBuilder) AddQuery ¶
func (builder *SearchQueryBuilder) AddQuery() *SearchQuery
AddQuery creates a new sub query. All fields in the sub query are AND-connected in the storage request. Multiple sub queries are OR-connected in the storage request.
func (*SearchQueryBuilder) Columns ¶
func (builder *SearchQueryBuilder) Columns(columns Columns) *SearchQueryBuilder
Columns defines which fields are set
func (*SearchQueryBuilder) InstanceID ¶
func (builder *SearchQueryBuilder) InstanceID(instanceID string) *SearchQueryBuilder
InstanceID defines the instanceID (system) of the events
func (*SearchQueryBuilder) Limit ¶
func (builder *SearchQueryBuilder) Limit(limit uint64) *SearchQueryBuilder
Limit defines how many events are returned maximally.
func (*SearchQueryBuilder) Matches ¶
func (builder *SearchQueryBuilder) Matches(event Event, existingLen int) (matches bool)
func (*SearchQueryBuilder) OrderAsc ¶
func (builder *SearchQueryBuilder) OrderAsc() *SearchQueryBuilder
OrderAsc changes the sorting order of the returned events to ascending
func (*SearchQueryBuilder) OrderDesc ¶
func (builder *SearchQueryBuilder) OrderDesc() *SearchQueryBuilder
OrderDesc changes the sorting order of the returned events to descending
func (*SearchQueryBuilder) ResourceOwner ¶
func (builder *SearchQueryBuilder) ResourceOwner(resourceOwner string) *SearchQueryBuilder
ResourceOwner defines the resource owner (org) of the events
type Subscription ¶
type Subscription struct { Events chan Event // contains filtered or unexported fields }
func SubscribeAggregates ¶
func SubscribeAggregates(eventQueue chan Event, aggregates ...AggregateType) *Subscription
SubscribeAggregates subscribes for all events on the given aggregates
func SubscribeEventTypes ¶
func SubscribeEventTypes(eventQueue chan Event, types map[AggregateType][]EventType) *Subscription
SubscribeEventTypes subscribes for the given event types. If no event types are provided, the subscription is for all events of the aggregate.
func (*Subscription) Unsubscribe ¶
func (s *Subscription) Unsubscribe()
type UniqueConstraintAction ¶
type UniqueConstraintAction int32
const ( UniqueConstraintAdd UniqueConstraintAction = iota UniqueConstraintRemove )
type Version ¶
type Version repository.Version
type WriteModel ¶
type WriteModel struct { AggregateID string `json:"-"` ProcessedSequence uint64 `json:"-"` Events []Event `json:"-"` ResourceOwner string `json:"-"` InstanceID string `json:"-"` ChangeDate time.Time `json:"-"` }
WriteModel is the minimum representation of a command side write model. It implements a basic reducer. Its purpose is to reduce events to create new ones.
func (*WriteModel) AppendEvents ¶
func (rm *WriteModel) AppendEvents(events ...Event)
AppendEvents adds all the events to the write model. The function doesn't compute the new state of the write model
func (*WriteModel) Reduce ¶
func (wm *WriteModel) Reduce() error
Reduce is the basic implementation of reducer. If this function is extended, the extending function should be the last step.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
mock | Package mock is a generated GoMock package. |
internal/repository/mock | Package mock is a generated GoMock package. |
mock | Package mock is a generated GoMock package. |
spooler/mock | Package mock is a generated GoMock package. |