eventstore

package
v0.0.0-...-860e413
Warning

This package is not in the latest version of its module.

Published: Oct 21, 2022 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func EventData

func EventData(event Command) ([]byte, error)

func MapEventsToV1Events

func MapEventsToV1Events(events []Event) []*models.Event

func WithResourceOwner

func WithResourceOwner(resourceOwner string) aggregateOpt

WithResourceOwner overwrites the resource owner of the aggregate. By default, the resource owner is set by the context.

Types

type Aggregate

type Aggregate struct {
	//ID is the unique identifier of this aggregate
	ID string `json:"-"`
	//Type is the name of the aggregate.
	Type AggregateType `json:"-"`
	//ResourceOwner is the org this aggregate belongs to
	ResourceOwner string `json:"-"`
	//InstanceID is the instance this aggregate belongs to
	InstanceID string `json:"-"`
	//Version is the semver this aggregate represents
	Version Version `json:"-"`
}

Aggregate is the basic implementation of Aggregater

func AggregateFromWriteModel

func AggregateFromWriteModel(
	wm *WriteModel,
	typ AggregateType,
	version Version,
) *Aggregate

AggregateFromWriteModel maps the given WriteModel to an Aggregate

func NewAggregate

func NewAggregate(
	ctx context.Context,
	id string,
	typ AggregateType,
	version Version,
	opts ...aggregateOpt,
) *Aggregate

NewAggregate is the default constructor of an aggregate. The opts overwrite values calculated from the given parameters.

type AggregateType

type AggregateType repository.AggregateType

AggregateType is the object name

type Asset

type Asset struct {
	// ID is to refer to the asset
	ID string
	//Asset is the actual image
	Asset []byte
	//Action defines if asset should be added or removed
	Action AssetAction
}

func NewAddAsset

func NewAddAsset(
	id string,
	asset []byte) *Asset

func NewRemoveAsset

func NewRemoveAsset(
	id string) *Asset

type AssetAction

type AssetAction int32
const (
	AssetAdd AssetAction = iota
	AssetRemove
)

type BaseEvent

type BaseEvent struct {
	EventType EventType `json:"-"`

	//User who created the event
	User string `json:"-"`
	//Service which created the event
	Service string `json:"-"`
	Data    []byte `json:"-"`
	// contains filtered or unexported fields
}

BaseEvent represents the minimum metadata of an event

func BaseEventFromRepo

func BaseEventFromRepo(event *repository.Event) *BaseEvent

BaseEventFromRepo maps a stored event to a BaseEvent

func NewBaseEventForPush

func NewBaseEventForPush(ctx context.Context, aggregate *Aggregate, typ EventType) *BaseEvent

NewBaseEventForPush is the constructor for events which will be pushed into the eventstore. The resource owner of the aggregate is only used if it's the first event of this aggregate type; afterwards the resource owner of the previous events is taken.

func (*BaseEvent) Aggregate

func (e *BaseEvent) Aggregate() Aggregate

Aggregate represents the metadata of the event's aggregate

func (*BaseEvent) CreationDate

func (e *BaseEvent) CreationDate() time.Time

CreationDate is the time the event is inserted into the eventstore

func (*BaseEvent) DataAsBytes

func (e *BaseEvent) DataAsBytes() []byte

DataAsBytes returns the payload of the event. It represents the fields changed by the event

func (*BaseEvent) EditorService

func (e *BaseEvent) EditorService() string

EditorService implements Command

func (*BaseEvent) EditorUser

func (e *BaseEvent) EditorUser() string

EditorUser implements Command

func (*BaseEvent) PreviousAggregateSequence

func (e *BaseEvent) PreviousAggregateSequence() uint64

PreviousAggregateSequence implements EventReader

func (*BaseEvent) PreviousAggregateTypeSequence

func (e *BaseEvent) PreviousAggregateTypeSequence() uint64

PreviousAggregateTypeSequence implements EventReader

func (*BaseEvent) Sequence

func (e *BaseEvent) Sequence() uint64

Sequence is a unique, incrementing number of the event

func (*BaseEvent) Type

func (e *BaseEvent) Type() EventType

Type implements Command

type Columns

type Columns repository.Columns

Columns defines which fields of the event are needed for the query

const (
	//ColumnsEvent represents all fields of an event
	ColumnsEvent Columns = repository.ColumnsEvent
	// ColumnsMaxSequence represents the latest sequence of the filtered events
	ColumnsMaxSequence Columns = repository.ColumnsMaxSequence
	// ColumnsInstanceIDs represents the instance ids of the filtered events
	ColumnsInstanceIDs Columns = repository.ColumnsInstanceIDs
)

type Command

type Command interface {
	//Aggregate is the metadata of an aggregate
	Aggregate() Aggregate
	// EditorService is the service who wants to push the event
	EditorService() string
	//EditorUser is the user who wants to push the event
	EditorUser() string
	//Type must return an event type which should be unique in the aggregate
	Type() EventType
	//Data returns the payload of the event. It represents the fields changed by the event
	// valid types are:
	// * nil (no payload),
	// * json byte array
	// * struct which can be marshalled to json
	// * pointer to struct which can be marshalled to json
	Data() interface{}
	//UniqueConstraints should be added for unique attributes of an event, if nil constraints will not be checked
	UniqueConstraints() []*EventUniqueConstraint
}

Command is the intent to store an event into the eventstore
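
The valid return types of Data listed in the Command interface (nil, a JSON byte array, a struct, or a pointer to a struct) can be normalized to bytes roughly as follows. This is a sketch under assumptions: payloadBytes and nameChanged are hypothetical names introduced here, and the package's own EventData function may validate or marshal differently.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
)

// payloadBytes is a hypothetical helper that accepts the payload kinds
// listed for Command.Data and returns them as JSON bytes.
func payloadBytes(data interface{}) ([]byte, error) {
	switch v := data.(type) {
	case nil:
		// No payload.
		return nil, nil
	case []byte:
		// A byte slice must already contain valid JSON.
		if !json.Valid(v) {
			return nil, errors.New("payload is not valid JSON")
		}
		return v, nil
	}
	// Anything else must be a struct or a pointer to a struct.
	t := reflect.TypeOf(data)
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t.Kind() != reflect.Struct {
		return nil, errors.New("payload must be nil, JSON bytes, a struct, or a pointer to a struct")
	}
	return json.Marshal(data)
}

// nameChanged is a made-up event payload.
type nameChanged struct {
	Name string `json:"name"`
}

func main() {
	payloads := []interface{}{nil, []byte(`{"name":"a"}`), nameChanged{Name: "b"}, &nameChanged{Name: "c"}, 42}
	for _, p := range payloads {
		b, err := payloadBytes(p)
		fmt.Printf("%q %v\n", b, err)
	}
}
```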

type Event

type Event interface {
	// EditorService is the service who pushed the event
	EditorService() string
	//EditorUser is the user who pushed the event
	EditorUser() string
	//Type is the type of the event
	Type() EventType

	Aggregate() Aggregate

	Sequence() uint64
	CreationDate() time.Time
	//PreviousAggregateSequence returns the previous sequence of the aggregate root (e.g. for org.42508134)
	PreviousAggregateSequence() uint64
	//PreviousAggregateTypeSequence returns the previous sequence of the aggregate type (e.g. for org)
	PreviousAggregateTypeSequence() uint64
	//DataAsBytes returns the payload of the event. It represents the fields changed by the event
	DataAsBytes() []byte
}

Event is a stored activity

type EventType

type EventType repository.EventType

EventType is the description of the change

type EventUniqueConstraint

type EventUniqueConstraint struct {
	// UniqueType is the table name for the unique constraint
	UniqueType string
	//UniqueField is the unique key
	UniqueField string
	//Action defines if unique constraint should be added or removed
	Action UniqueConstraintAction
	//ErrorMessage defines the translation file key for the error message
	ErrorMessage string
	//IsGlobal defines if the unique constraint is globally unique or just within a single instance
	IsGlobal bool
}

func NewAddEventUniqueConstraint

func NewAddEventUniqueConstraint(
	uniqueType,
	uniqueField,
	errMessage string) *EventUniqueConstraint

func NewAddGlobalEventUniqueConstraint

func NewAddGlobalEventUniqueConstraint(
	uniqueType,
	uniqueField,
	errMessage string) *EventUniqueConstraint

func NewRemoveEventUniqueConstraint

func NewRemoveEventUniqueConstraint(
	uniqueType,
	uniqueField string) *EventUniqueConstraint

func NewRemoveGlobalEventUniqueConstraint

func NewRemoveGlobalEventUniqueConstraint(
	uniqueType,
	uniqueField string) *EventUniqueConstraint
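
The four constructors differ only in the Action and IsGlobal values they are expected to set. The following sketch shows two of them on a local copy of the struct; the field assignments are an assumption derived from the constructor names and field comments, and the "usernames" table name and message key are made-up illustration values.

```go
package main

import "fmt"

type UniqueConstraintAction int32

const (
	UniqueConstraintAdd UniqueConstraintAction = iota
	UniqueConstraintRemove
)

// EventUniqueConstraint is a local copy of the documented struct.
type EventUniqueConstraint struct {
	UniqueType   string
	UniqueField  string
	Action       UniqueConstraintAction
	ErrorMessage string
	IsGlobal     bool
}

// NewAddEventUniqueConstraint is assumed to create an instance-scoped add.
func NewAddEventUniqueConstraint(uniqueType, uniqueField, errMessage string) *EventUniqueConstraint {
	return &EventUniqueConstraint{
		UniqueType:   uniqueType,
		UniqueField:  uniqueField,
		ErrorMessage: errMessage,
		Action:       UniqueConstraintAdd,
	}
}

// NewRemoveGlobalEventUniqueConstraint is assumed to create a global remove.
func NewRemoveGlobalEventUniqueConstraint(uniqueType, uniqueField string) *EventUniqueConstraint {
	return &EventUniqueConstraint{
		UniqueType:  uniqueType,
		UniqueField: uniqueField,
		Action:      UniqueConstraintRemove,
		IsGlobal:    true,
	}
}

func main() {
	add := NewAddEventUniqueConstraint("usernames", "alice", "Errors.User.AlreadyExists")
	remove := NewRemoveGlobalEventUniqueConstraint("usernames", "alice")
	fmt.Printf("%+v\n%+v\n", *add, *remove)
}
```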

type Eventstore

type Eventstore struct {
	// contains filtered or unexported fields
}

Eventstore abstracts all functions needed to store valid events and to filter the stored events

func NewEventstore

func NewEventstore(repo repository.Repository) *Eventstore

func Start

func Start(sqlClient *sql.DB) (*Eventstore, error)

func (*Eventstore) Filter

func (es *Eventstore) Filter(ctx context.Context, queryFactory *SearchQueryBuilder) ([]Event, error)

Filter filters the stored events based on the searchQuery and maps the events to the defined event structs

func (*Eventstore) FilterToQueryReducer

func (es *Eventstore) FilterToQueryReducer(ctx context.Context, r QueryReducer) error

FilterToQueryReducer filters the events based on the search query of the query function, appends all events to the reducer and calls its reduce function

func (*Eventstore) FilterToReducer

func (es *Eventstore) FilterToReducer(ctx context.Context, searchQuery *SearchQueryBuilder, r reducer) error

FilterToReducer filters the events based on the search query, appends all events to the reducer and calls its reduce function

func (*Eventstore) Health

func (es *Eventstore) Health(ctx context.Context) error

Health checks if the eventstore can work properly. It checks if the repository can serve load.

func (*Eventstore) InstanceIDs

func (es *Eventstore) InstanceIDs(ctx context.Context, queryFactory *SearchQueryBuilder) ([]string, error)

InstanceIDs returns the instance ids found by the search query

func (*Eventstore) LatestSequence

func (es *Eventstore) LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (uint64, error)

LatestSequence filters the latest sequence for the given search query

func (*Eventstore) NewInstance

func (es *Eventstore) NewInstance(ctx context.Context, instanceID string) error

func (*Eventstore) Push

func (es *Eventstore) Push(ctx context.Context, cmds ...Command) ([]Event, error)

Push pushes the events in a single transaction. An event needs at least an aggregate.

func (*Eventstore) RegisterFilterEventMapper

func (es *Eventstore) RegisterFilterEventMapper(eventType EventType, mapper func(*repository.Event) (Event, error)) *Eventstore

RegisterFilterEventMapper registers a function for mapping an eventstore event to an event
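
A registry keyed by event type is one way such mappers can be stored and applied. The sketch below uses local stand-ins (repoEvent for repository.Event, a one-method Event interface, and a hypothetical registry type). How the real Eventstore treats event types without a registered mapper is not stated here; returning an error is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

type EventType string

// repoEvent stands in for repository.Event.
type repoEvent struct {
	Type EventType
	Data []byte
}

// Event is reduced to a single method for this sketch.
type Event interface{ Type() EventType }

// userAdded is a made-up concrete event.
type userAdded struct{ payload []byte }

func (userAdded) Type() EventType { return "user.added" }

type mapper func(*repoEvent) (Event, error)

// registry is a hypothetical holder of mappers keyed by event type.
type registry struct{ mappers map[EventType]mapper }

// Register stores the mapper and returns the registry so calls can be chained.
func (r *registry) Register(t EventType, m mapper) *registry {
	if r.mappers == nil {
		r.mappers = map[EventType]mapper{}
	}
	r.mappers[t] = m
	return r
}

// Map converts a stored event using the mapper registered for its type.
func (r *registry) Map(e *repoEvent) (Event, error) {
	m, ok := r.mappers[e.Type]
	if !ok {
		return nil, errors.New("no mapper registered for " + string(e.Type))
	}
	return m(e)
}

func main() {
	r := (&registry{}).Register("user.added", func(e *repoEvent) (Event, error) {
		return userAdded{payload: e.Data}, nil
	})
	ev, err := r.Map(&repoEvent{Type: "user.added", Data: []byte(`{}`)})
	fmt.Println(ev.Type(), err)
	_, err = r.Map(&repoEvent{Type: "org.added"})
	fmt.Println(err)
}
```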

type QueryReducer

type QueryReducer interface {

	//Query returns the SearchQueryBuilder for the events needed in reducer
	Query() *SearchQueryBuilder
	// contains filtered or unexported methods
}

type ReadModel

type ReadModel struct {
	AggregateID       string    `json:"-"`
	ProcessedSequence uint64    `json:"-"`
	CreationDate      time.Time `json:"-"`
	ChangeDate        time.Time `json:"-"`
	Events            []Event   `json:"-"`
	ResourceOwner     string    `json:"-"`
	InstanceID        string    `json:"-"`
}

ReadModel is the minimum representation of a read model. It implements a basic reducer. It might be saved in a database or in memory.

func (*ReadModel) AppendEvents

func (rm *ReadModel) AppendEvents(events ...Event) *ReadModel

AppendEvents adds all the events to the read model. The function doesn't compute the new state of the read model

func (*ReadModel) Reduce

func (rm *ReadModel) Reduce() error

Reduce is the basic implementation of reducer. If this function is extended, the call to this base implementation should be the last step of the extending function.
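
Extending Reduce usually means embedding ReadModel, handling the appended events first, and calling the embedded Reduce last. The sketch below uses trimmed stand-in types; that the base Reduce records the last sequence and clears the event buffer is an assumption, and loginCounter and the event type names are hypothetical.

```go
package main

import "fmt"

// Event is reduced to the two methods this sketch needs.
type Event interface {
	Sequence() uint64
	Type() string
}

type event struct {
	seq uint64
	typ string
}

func (e event) Sequence() uint64 { return e.seq }
func (e event) Type() string     { return e.typ }

// ReadModel is a trimmed stand-in for eventstore.ReadModel.
type ReadModel struct {
	ProcessedSequence uint64
	Events            []Event
}

// AppendEvents only buffers the events; it does not compute state.
func (rm *ReadModel) AppendEvents(events ...Event) *ReadModel {
	rm.Events = append(rm.Events, events...)
	return rm
}

// Reduce is assumed to record the last sequence and clear the buffer.
func (rm *ReadModel) Reduce() error {
	if len(rm.Events) > 0 {
		rm.ProcessedSequence = rm.Events[len(rm.Events)-1].Sequence()
	}
	rm.Events = nil
	return nil
}

// loginCounter is a hypothetical read model that counts login events.
type loginCounter struct {
	ReadModel
	Logins int
}

// Reduce handles the buffered events, then calls the base Reduce last,
// because the base implementation clears the buffer in this sketch.
func (c *loginCounter) Reduce() error {
	for _, e := range c.Events {
		if e.Type() == "user.login" {
			c.Logins++
		}
	}
	return c.ReadModel.Reduce()
}

func main() {
	c := &loginCounter{}
	c.AppendEvents(event{1, "user.login"}, event{2, "user.logout"}, event{3, "user.login"})
	if err := c.Reduce(); err != nil {
		fmt.Println("reduce failed:", err)
		return
	}
	fmt.Println(c.Logins, c.ProcessedSequence)
}
```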

type SearchQuery

type SearchQuery struct {
	// contains filtered or unexported fields
}

func (*SearchQuery) AggregateIDs

func (query *SearchQuery) AggregateIDs(ids ...string) *SearchQuery

AggregateIDs filters for events with the given aggregate IDs

func (*SearchQuery) AggregateTypes

func (query *SearchQuery) AggregateTypes(types ...AggregateType) *SearchQuery

AggregateTypes filters for events with the given aggregate types

func (*SearchQuery) Builder

func (query *SearchQuery) Builder() *SearchQueryBuilder

Builder returns the SearchQueryBuilder of the sub query

func (*SearchQuery) CreationDateAfter

func (query *SearchQuery) CreationDateAfter(time time.Time) *SearchQuery

CreationDateAfter filters for events which happened after the specified time

func (*SearchQuery) EventData

func (query *SearchQuery) EventData(data map[string]interface{}) *SearchQuery

EventData filters for events with the given event data. Use this call with care as it will be slower than the other filters.

func (*SearchQuery) EventTypes

func (query *SearchQuery) EventTypes(types ...EventType) *SearchQuery

EventTypes filters for events with the given event types

func (*SearchQuery) ExcludedInstanceID

func (query *SearchQuery) ExcludedInstanceID(instanceIDs ...string) *SearchQuery

ExcludedInstanceID filters for events not having the given instanceIDs

func (*SearchQuery) InstanceID

func (query *SearchQuery) InstanceID(instanceID string) *SearchQuery

InstanceID filters for events with the given instanceID

func (SearchQuery) Or

func (query SearchQuery) Or() *SearchQuery

Or creates a new sub query on the search query builder

func (*SearchQuery) SequenceGreater

func (query *SearchQuery) SequenceGreater(sequence uint64) *SearchQuery

SequenceGreater filters for events with a sequence greater than the requested sequence

func (*SearchQuery) SequenceLess

func (query *SearchQuery) SequenceLess(sequence uint64) *SearchQuery

SequenceLess filters for events with a sequence less than the requested sequence

type SearchQueryBuilder

type SearchQueryBuilder struct {
	// contains filtered or unexported fields
}

SearchQueryBuilder represents the builder for your filter. If invalid data is set, the filter will fail.

func NewSearchQueryBuilder

func NewSearchQueryBuilder(columns Columns) *SearchQueryBuilder

NewSearchQueryBuilder creates a new builder for event filters. aggregateTypes must contain at least one aggregate type.

func (*SearchQueryBuilder) AddQuery

func (builder *SearchQueryBuilder) AddQuery() *SearchQuery

AddQuery creates a new sub query. All fields in the sub query are AND-connected in the storage request. Multiple sub queries are OR-connected in the storage request.
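
The AND/OR semantics can be illustrated with an in-memory matcher: every filter set inside one sub query must match, and an event is selected if any sub query matches. The types and the matchesAny helper are hypothetical and only mirror the described behavior; the real builder translates the filters into a storage request.

```go
package main

import "fmt"

// event carries only the two fields this sketch filters on.
type event struct {
	AggregateType string
	EventType     string
}

// subQuery holds optional filters; an empty list means "not filtered".
type subQuery struct {
	aggregateTypes []string
	eventTypes     []string
}

func contains(list []string, s string) bool {
	for _, item := range list {
		if item == s {
			return true
		}
	}
	return false
}

// matches requires every set filter to match (AND).
func (q subQuery) matches(e event) bool {
	if len(q.aggregateTypes) > 0 && !contains(q.aggregateTypes, e.AggregateType) {
		return false
	}
	if len(q.eventTypes) > 0 && !contains(q.eventTypes, e.EventType) {
		return false
	}
	return true
}

// matchesAny selects the event if at least one sub query matches (OR).
func matchesAny(queries []subQuery, e event) bool {
	for _, q := range queries {
		if q.matches(e) {
			return true
		}
	}
	return false
}

func main() {
	queries := []subQuery{
		{aggregateTypes: []string{"user"}, eventTypes: []string{"user.added"}},
		{aggregateTypes: []string{"org"}},
	}
	fmt.Println(matchesAny(queries, event{"user", "user.added"}))
	fmt.Println(matchesAny(queries, event{"user", "user.removed"}))
	fmt.Println(matchesAny(queries, event{"org", "org.changed"}))
}
```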

func (*SearchQueryBuilder) Columns

func (builder *SearchQueryBuilder) Columns(columns Columns) *SearchQueryBuilder

Columns defines which fields are set

func (*SearchQueryBuilder) InstanceID

func (builder *SearchQueryBuilder) InstanceID(instanceID string) *SearchQueryBuilder

InstanceID defines the instanceID (system) of the events

func (*SearchQueryBuilder) Limit

func (builder *SearchQueryBuilder) Limit(limit uint64) *SearchQueryBuilder

Limit defines the maximum number of events returned.

func (*SearchQueryBuilder) Matches

func (builder *SearchQueryBuilder) Matches(event Event, existingLen int) (matches bool)

func (*SearchQueryBuilder) OrderAsc

func (builder *SearchQueryBuilder) OrderAsc() *SearchQueryBuilder

OrderAsc changes the sorting order of the returned events to ascending

func (*SearchQueryBuilder) OrderDesc

func (builder *SearchQueryBuilder) OrderDesc() *SearchQueryBuilder

OrderDesc changes the sorting order of the returned events to descending
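
Each builder method returns the builder itself, so settings can be chained. The following stand-in shows the pattern only: the field names, the Columns underlying type, and ascending order as the zero value are assumptions of this sketch, not documented behavior.

```go
package main

import "fmt"

// Columns is a stand-in; the real type aliases repository.Columns.
type Columns int32

const ColumnsEvent Columns = iota

// SearchQueryBuilder is a local stand-in with hypothetical field names.
type SearchQueryBuilder struct {
	columns       Columns
	limit         uint64
	desc          bool
	resourceOwner string
}

func NewSearchQueryBuilder(columns Columns) *SearchQueryBuilder {
	return &SearchQueryBuilder{columns: columns}
}

// Each setter mutates the builder and returns it to allow chaining.
func (b *SearchQueryBuilder) Limit(limit uint64) *SearchQueryBuilder {
	b.limit = limit
	return b
}

func (b *SearchQueryBuilder) OrderAsc() *SearchQueryBuilder {
	b.desc = false
	return b
}

func (b *SearchQueryBuilder) OrderDesc() *SearchQueryBuilder {
	b.desc = true
	return b
}

func (b *SearchQueryBuilder) ResourceOwner(resourceOwner string) *SearchQueryBuilder {
	b.resourceOwner = resourceOwner
	return b
}

func main() {
	b := NewSearchQueryBuilder(ColumnsEvent).
		ResourceOwner("org-1").
		OrderDesc().
		Limit(10)
	fmt.Printf("%+v\n", *b)
}
```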

func (*SearchQueryBuilder) ResourceOwner

func (builder *SearchQueryBuilder) ResourceOwner(resourceOwner string) *SearchQueryBuilder

ResourceOwner defines the resource owner (org) of the events

func (*SearchQueryBuilder) SetTx

func (builder *SearchQueryBuilder) SetTx(tx *sql.Tx) *SearchQueryBuilder

SetTx ensures that the eventstore library uses the existing transaction

type Subscription

type Subscription struct {
	Events chan Event
	// contains filtered or unexported fields
}

func SubscribeAggregates

func SubscribeAggregates(eventQueue chan Event, aggregates ...AggregateType) *Subscription

SubscribeAggregates subscribes for all events on the given aggregates

func SubscribeEventTypes

func SubscribeEventTypes(eventQueue chan Event, types map[AggregateType][]EventType) *Subscription

SubscribeEventTypes subscribes for the given event types. If no event types are provided, the subscription is for all events of the aggregate.

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe()
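
A subscription of this shape can be pictured as a channel registered with a fan-out component. The sketch below is not the package's implementation: the hub type, its notify method, the struct-typed Event, and the choice to drop events when the queue is full are all assumptions made to keep the example self-contained.

```go
package main

import (
	"fmt"
	"sync"
)

type AggregateType string

// Event is a struct here for brevity; the real Event is an interface.
type Event struct {
	Aggregate AggregateType
	Type      string
}

// hub is a hypothetical fan-out component holding active subscriptions.
type hub struct {
	mu   sync.Mutex
	subs map[*Subscription]struct{}
}

type Subscription struct {
	Events     chan Event
	aggregates map[AggregateType]bool
	hub        *hub
}

// SubscribeAggregates registers queue for all events of the given aggregates.
func (h *hub) SubscribeAggregates(queue chan Event, aggregates ...AggregateType) *Subscription {
	s := &Subscription{Events: queue, aggregates: map[AggregateType]bool{}, hub: h}
	for _, a := range aggregates {
		s.aggregates[a] = true
	}
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.subs == nil {
		h.subs = map[*Subscription]struct{}{}
	}
	h.subs[s] = struct{}{}
	return s
}

// Unsubscribe removes the subscription; the channel is left open here.
func (s *Subscription) Unsubscribe() {
	s.hub.mu.Lock()
	defer s.hub.mu.Unlock()
	delete(s.hub.subs, s)
}

// notify delivers e to every matching subscription without blocking.
func (h *hub) notify(e Event) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for s := range h.subs {
		if !s.aggregates[e.Aggregate] {
			continue
		}
		select {
		case s.Events <- e:
		default: // queue full: dropped in this sketch
		}
	}
}

func main() {
	h := &hub{}
	queue := make(chan Event, 4)
	sub := h.SubscribeAggregates(queue, "user")
	h.notify(Event{"user", "user.added"})
	h.notify(Event{"org", "org.added"})
	sub.Unsubscribe()
	h.notify(Event{"user", "user.removed"})
	fmt.Println("queued events:", len(queue))
}
```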

type UniqueConstraintAction

type UniqueConstraintAction int32
const (
	UniqueConstraintAdd UniqueConstraintAction = iota
	UniqueConstraintRemove
)

type Version

type Version repository.Version

type WriteModel

type WriteModel struct {
	AggregateID       string    `json:"-"`
	ProcessedSequence uint64    `json:"-"`
	Events            []Event   `json:"-"`
	ResourceOwner     string    `json:"-"`
	InstanceID        string    `json:"-"`
	ChangeDate        time.Time `json:"-"`
}

WriteModel is the minimum representation of a command side write model. It implements a basic reducer. Its purpose is to reduce events to create new ones.

func (*WriteModel) AppendEvents

func (rm *WriteModel) AppendEvents(events ...Event)

AppendEvents adds all the events to the write model. The function doesn't compute the new state of the write model

func (*WriteModel) Reduce

func (wm *WriteModel) Reduce() error

Reduce is the basic implementation of reducer. If this function is extended, the call to this base implementation should be the last step of the extending function.

Directories

Path                        Synopsis
mock                        Package mock is a generated GoMock package.
sql
v1
internal/repository/mock    Package mock is a generated GoMock package.
mock                        Package mock is a generated GoMock package.
sdk
spooler/mock                Package mock is a generated GoMock package.
