Documentation ¶
Index ¶
- Variables
- func ValidateEventsBeforeSave(events []Event) error
- type DeleteQuery
- type DeviceDocumentMetadata
- type ETagData
- type Event
- type EventStore
- type EventUnmarshaler
- type FactoryModelFunc
- type GetEventsQuery
- type Handler
- type Iter
- type LoadedEvent
- type LogDebugfFunc
- type Model
- type Projection
- func (p *Projection) Forget(queries []SnapshotQuery) (err error)
- func (p *Projection) Handle(ctx context.Context, iter Iter) error
- func (p *Projection) HandleWithReload(ctx context.Context, iter Iter) error
- func (p *Projection) Models(queries []SnapshotQuery, onModel func(m Model) (wantNext bool))
- func (p *Projection) Project(ctx context.Context, queries []SnapshotQuery) (err error)
- type SaveStatus
- type SnapshotQuery
- type VersionQuery
Constants ¶
This section is empty.
Variables ¶
var ErrNotSupported = errors.New("not supported")
ErrNotSupported is returned when the operation is not supported.
Functions ¶
func ValidateEventsBeforeSave ¶ added in v2.16.0
ValidateEventsBeforeSave validates events before saving them in the database. Prerequisites that must hold:
- AggregateID, GroupID and EventType are not empty
- The version of each event is greater by 1 than the version of the previous event
- Only the first event can be a snapshot
- All events have the same AggregateID and GroupID
- Timestamps are non-zero
- Timestamps are non-decreasing
Types ¶
type DeleteQuery ¶
type DeleteQuery struct {
GroupID string // filter by group ID, required
}
DeleteQuery selects documents with the given group ID for deletion.
type DeviceDocumentMetadata ¶ added in v2.16.0
type Event ¶
type Event = interface { Version() uint64 EventType() string AggregateID() string GroupID() string IsSnapshot() bool ServiceID() (string, bool) Timestamp() time.Time ETag() *ETagData Types() []string }
Event is an interface over an event created by the user.
type EventStore ¶
type EventStore interface { // Get events from the eventstore with timestamp larger than given value // If timestamp is <=0 then the argument is ignored. GetEvents(ctx context.Context, queries []GetEventsQuery, timestamp int64, eventHandler Handler) error // Save save events to eventstore. // AggregateID, GroupID and EventType are required. // All events within one Save operation shall have the same AggregateID and GroupID. // Versions shall be unique and ascend continually. // Only first event can be a snapshot. Save(ctx context.Context, events ...Event) (status SaveStatus, err error) LoadUpToVersion(ctx context.Context, queries []VersionQuery, eventHandler Handler) error LoadFromVersion(ctx context.Context, queries []VersionQuery, eventHandler Handler) error LoadFromSnapshot(ctx context.Context, queries []SnapshotQuery, eventHandler Handler) error RemoveUpToVersion(ctx context.Context, queries []VersionQuery) error Delete(ctx context.Context, queries []DeleteQuery) error LoadDeviceMetadataByServiceIDs(ctx context.Context, serviceIDs []string, limit int64) ([]DeviceDocumentMetadata, error) GetLatestDeviceETags(ctx context.Context, deviceID string, limit uint32) ([][]byte, error) Close(ctx context.Context) error Clear(ctx context.Context) error }
EventStore provides an interface over the eventstore. Multiple aggregates can be grouped by groupID, but the aggregateID of each aggregate must be unique across the whole DB.
type EventUnmarshaler ¶
type EventUnmarshaler = interface { Version() uint64 EventType() string AggregateID() string GroupID() string IsSnapshot() bool Timestamp() time.Time Unmarshal(v interface{}) error }
EventUnmarshaler provides access to an event's metadata and unmarshals the event into a caller-supplied value.
type FactoryModelFunc ¶
FactoryModelFunc creates user model.
type GetEventsQuery ¶
type GetEventsQuery struct { GroupID string // filter by group ID, optional AggregateID string // filter to certain aggregateID, optional Types []string // filter to certain event types, optional }
GetEventsQuery gets events with the given attributes. All filtering options are optional; if none are given, all events are returned.
type Iter ¶
type Iter = interface { Next(ctx context.Context) (EventUnmarshaler, bool) Err() error }
Iter provides an iterator over events from the eventstore or eventbus.
type LoadedEvent ¶
type LoadedEvent struct {
// contains filtered or unexported fields
}
func NewLoadedEvent ¶
func (LoadedEvent) AggregateID ¶
func (e LoadedEvent) AggregateID() string
func (LoadedEvent) EventType ¶
func (e LoadedEvent) EventType() string
func (LoadedEvent) GroupID ¶
func (e LoadedEvent) GroupID() string
func (LoadedEvent) IsSnapshot ¶
func (e LoadedEvent) IsSnapshot() bool
func (LoadedEvent) Timestamp ¶
func (e LoadedEvent) Timestamp() time.Time
func (LoadedEvent) Unmarshal ¶
func (e LoadedEvent) Unmarshal(v interface{}) error
func (LoadedEvent) Version ¶
func (e LoadedEvent) Version() uint64
type LogDebugfFunc ¶
type LogDebugfFunc func(fmt string, args ...interface{})
LogDebugfFunc logs debug messages.
type Model ¶
type Model interface { Handler }
Model is a user-defined model onto which events from the eventstore are projected.
type Projection ¶
type Projection struct { LogDebugfFunc LogDebugfFunc // contains filtered or unexported fields }
Projection projects events from eventstore to user model.
func NewProjection ¶
func NewProjection(store EventStore, factoryModel FactoryModelFunc, logDebugfFunc LogDebugfFunc) *Projection
NewProjection creates a projection over the eventstore.
func (*Projection) Forget ¶
func (p *Projection) Forget(queries []SnapshotQuery) (err error)
Forget drops the projection by query. Version in Query is ignored.
func (*Projection) Handle ¶
func (p *Projection) Handle(ctx context.Context, iter Iter) error
Handle updates the projection by events.
func (*Projection) HandleWithReload ¶
func (p *Projection) HandleWithReload(ctx context.Context, iter Iter) error
HandleWithReload updates the projection by events and reloads events if needed.
func (*Projection) Models ¶
func (p *Projection) Models(queries []SnapshotQuery, onModel func(m Model) (wantNext bool))
Models returns models from the projection.
func (*Projection) Project ¶
func (p *Projection) Project(ctx context.Context, queries []SnapshotQuery) (err error)
Project updates the projection from snapshots defined by query. Version in Query is ignored.
type SaveStatus ¶
type SaveStatus int
const ( Ok SaveStatus = 0 // events were stored ConcurrencyException SaveStatus = 1 // events with this version already exists SnapshotRequired SaveStatus = 2 // event store requires aggregated snapshot before applying new event; snapshot shall not contain this new event Fail SaveStatus = -1 // error occurred )
type SnapshotQuery ¶
type SnapshotQuery struct { GroupID string // filter by group ID AggregateID string // filter to certain aggregateID, groupID is required Types []string // filter to certain event types, optional }
SnapshotQuery used to load events from snapshot.
type VersionQuery ¶
type VersionQuery struct { GroupID string // required AggregateID string // required Version uint64 // required }
VersionQuery used to load events from version.