Documentation ¶
Overview ¶
Package es provides event store structures and abstractions.
Index ¶
- Variables
- type Aggregate
- type AggregateBase
- func (a *AggregateBase) CommittedEvents() []*Event
- func (a *AggregateBase) DecodeEventAs(eventData []byte, eventMsg interface{}) error
- func (a *AggregateBase) ID() string
- func (a *AggregateBase) LatestCommittedEvent() (*Event, bool)
- func (a *AggregateBase) MarkEventsCommitted()
- func (a *AggregateBase) MustLatestCommittedEvent() *Event
- func (a *AggregateBase) NewEvent(id string, msg EventMessage, timestamp time.Time) (*Event, error)
- func (a *AggregateBase) Revision() int64
- func (a *AggregateBase) SetEvent(eventMsg EventMessage) error
- func (a *AggregateBase) SetID(id string)
- func (a *AggregateBase) Timestamp() int64
- func (a *AggregateBase) Type() string
- func (a *AggregateBase) UncommittedEvents() []*Event
- func (a *AggregateBase) Version() int64
- type AggregateBaseSetter
- type Config
- type Event
- func (x *Event) Copy() *Event
- func (*Event) Descriptor() ([]byte, []int)deprecated
- func (x *Event) GetAggregateId() string
- func (x *Event) GetAggregateType() string
- func (x *Event) GetEventData() []byte
- func (x *Event) GetEventId() string
- func (x *Event) GetEventType() string
- func (x *Event) GetRevision() int64
- func (x *Event) GetTimestamp() int64
- func (*Event) ProtoMessage()
- func (x *Event) ProtoReflect() protoreflect.Message
- func (x *Event) Reset()
- func (x *Event) String() string
- func (x *Event) Time() time.Time
- type EventCodec
- type EventHandlingFailed
- func (*EventHandlingFailed) Descriptor() ([]byte, []int)deprecated
- func (x *EventHandlingFailed) GetErr() string
- func (x *EventHandlingFailed) GetErrCode() int32
- func (x *EventHandlingFailed) GetHandlerName() string
- func (*EventHandlingFailed) ProtoMessage()
- func (x *EventHandlingFailed) ProtoReflect() protoreflect.Message
- func (x *EventHandlingFailed) Reset()
- func (x *EventHandlingFailed) String() string
- type EventHandlingFinished
- func (*EventHandlingFinished) Descriptor() ([]byte, []int)deprecated
- func (x *EventHandlingFinished) GetHandlerName() string
- func (*EventHandlingFinished) ProtoMessage()
- func (x *EventHandlingFinished) ProtoReflect() protoreflect.Message
- func (x *EventHandlingFinished) Reset()
- func (x *EventHandlingFinished) String() string
- type EventHandlingStarted
- func (*EventHandlingStarted) Descriptor() ([]byte, []int)deprecated
- func (x *EventHandlingStarted) GetHandlerName() string
- func (*EventHandlingStarted) ProtoMessage()
- func (x *EventHandlingStarted) ProtoReflect() protoreflect.Message
- func (x *EventHandlingStarted) Reset()
- func (x *EventHandlingStarted) String() string
- type EventMessage
- type EventStore
- type EventUnhandled
- type IdGenerator
- type Snapshot
- type SnapshotCodec
- type Storage
- type StorageBase
- type Store
- func (e *Store) Commit(ctx context.Context, agg Aggregate) error
- func (e *Store) LoadEvents(ctx context.Context, agg Aggregate) error
- func (e *Store) LoadEventsWithSnapshot(ctx context.Context, agg Aggregate) error
- func (e *Store) SaveSnapshot(ctx context.Context, agg Aggregate) error
- func (e *Store) StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)
- func (e *Store) WithStorage(base StorageBase) *Store
- type StreamEventsRequest
- type TxStorage
- type UUIDGenerator
Constants ¶
This section is empty.
Variables ¶
var File_event_proto protoreflect.FileDescriptor
Functions ¶
This section is empty.
Types ¶
type Aggregate ¶
type Aggregate interface { Apply(e *Event) error SetBase(base *AggregateBase) AggBase() *AggregateBase Reset() }
Aggregate is an interface used for the aggregate models
type AggregateBase ¶
type AggregateBase struct {
// contains filtered or unexported fields
}
AggregateBase is the base of the aggregate which
func (*AggregateBase) CommittedEvents ¶
func (a *AggregateBase) CommittedEvents() []*Event
CommittedEvents gets the committed event messages.
func (*AggregateBase) DecodeEventAs ¶
func (a *AggregateBase) DecodeEventAs(eventData []byte, eventMsg interface{}) error
DecodeEventAs decodes provided in put eventData into the structure of eventMsg. THe eventMsg is expected to be a pointer to the event msg.
func (*AggregateBase) LatestCommittedEvent ¶
func (a *AggregateBase) LatestCommittedEvent() (*Event, bool)
LatestCommittedEvent gets the latest committed event message.
func (*AggregateBase) MarkEventsCommitted ¶
func (a *AggregateBase) MarkEventsCommitted()
MarkEventsCommitted marks the aggregate events as committed. NOTE: Use this function carefully, as the event store wouldn't try to commit events, already marked as committed.
func (*AggregateBase) MustLatestCommittedEvent ¶
func (a *AggregateBase) MustLatestCommittedEvent() *Event
MustLatestCommittedEvent gets the latest committed event message or panics.
func (*AggregateBase) NewEvent ¶ added in v0.0.22
func (a *AggregateBase) NewEvent(id string, msg EventMessage, timestamp time.Time) (*Event, error)
NewEvent creates a new event with given identifier, at given timestamp and with given message. An event is added to uncommitted events of the aggregate base. NOTE: Created event is not applied to given aggregate base.
func (*AggregateBase) Revision ¶
func (a *AggregateBase) Revision() int64
Revision gets aggregate current revision.
func (*AggregateBase) SetEvent ¶
func (a *AggregateBase) SetEvent(eventMsg EventMessage) error
SetEvent sets new event message into given aggregate.
func (*AggregateBase) Timestamp ¶
func (a *AggregateBase) Timestamp() int64
Timestamp gets the aggregate base timestamp.
func (*AggregateBase) UncommittedEvents ¶ added in v0.0.17
func (a *AggregateBase) UncommittedEvents() []*Event
UncommittedEvents gets the slice of uncommitted event messages.
func (*AggregateBase) Version ¶
func (a *AggregateBase) Version() int64
Version gets aggregate version.
type AggregateBaseSetter ¶
type AggregateBaseSetter struct {
// contains filtered or unexported fields
}
AggregateBaseSetter is a structure responsible for setting the aggregate base.
func NewAggregateBaseSetter ¶
func NewAggregateBaseSetter(eventCodec, snapCodec codec.Codec, idGen IdGenerator) *AggregateBaseSetter
NewAggregateBaseSetter creates new aggregate setter.
func (*AggregateBaseSetter) SetAggregateBase ¶
func (a *AggregateBaseSetter) SetAggregateBase(agg Aggregate, aggId, aggType string, version int64)
SetAggregateBase implements AggregateBaseSetter interface.
type Config ¶
type Config struct {
BufferSize int
}
Config is the configuration for the eventsource storage.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig sets up the default config for the event store.
type Event ¶
type Event struct { EventId string `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"` EventType string `protobuf:"bytes,2,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"` AggregateType string `protobuf:"bytes,3,opt,name=aggregate_type,json=aggregateType,proto3" json:"aggregate_type,omitempty"` AggregateId string `protobuf:"bytes,4,opt,name=aggregate_id,json=aggregateId,proto3" json:"aggregate_id,omitempty"` EventData []byte `protobuf:"bytes,5,opt,name=event_data,json=eventData,proto3" json:"event_data,omitempty"` Timestamp int64 `protobuf:"varint,6,opt,name=timestamp,proto3" json:"timestamp,omitempty"` Revision int64 `protobuf:"varint,7,opt,name=revision,proto3" json:"revision,omitempty"` // contains filtered or unexported fields }
Event is the event source message model.
func (*Event) Descriptor
deprecated
func (*Event) GetAggregateId ¶
func (*Event) GetAggregateType ¶
func (*Event) GetEventData ¶
func (*Event) GetEventId ¶
func (*Event) GetEventType ¶
func (*Event) GetRevision ¶
func (*Event) GetTimestamp ¶
func (*Event) ProtoMessage ¶
func (*Event) ProtoMessage()
func (*Event) ProtoReflect ¶
func (x *Event) ProtoReflect() protoreflect.Message
type EventCodec ¶
EventCodec is the type wrapper over the codec.Codec used for event encoding in wire injection.
type EventHandlingFailed ¶ added in v0.0.20
type EventHandlingFailed struct { HandlerName string `protobuf:"bytes,1,opt,name=handler_name,json=handlerName,proto3" json:"handler_name,omitempty"` Err string `protobuf:"bytes,2,opt,name=err,proto3" json:"err,omitempty"` ErrCode int32 `protobuf:"varint,3,opt,name=err_code,json=errCode,proto3" json:"err_code,omitempty"` // contains filtered or unexported fields }
EventHandlingFailed is an event message occurred on a failure when handling given event.
func (*EventHandlingFailed) Descriptor
deprecated
added in
v0.0.20
func (*EventHandlingFailed) Descriptor() ([]byte, []int)
Deprecated: Use EventHandlingFailed.ProtoReflect.Descriptor instead.
func (*EventHandlingFailed) GetErr ¶ added in v0.0.20
func (x *EventHandlingFailed) GetErr() string
func (*EventHandlingFailed) GetErrCode ¶ added in v0.0.20
func (x *EventHandlingFailed) GetErrCode() int32
func (*EventHandlingFailed) GetHandlerName ¶ added in v0.0.20
func (x *EventHandlingFailed) GetHandlerName() string
func (*EventHandlingFailed) ProtoMessage ¶ added in v0.0.20
func (*EventHandlingFailed) ProtoMessage()
func (*EventHandlingFailed) ProtoReflect ¶ added in v0.0.20
func (x *EventHandlingFailed) ProtoReflect() protoreflect.Message
func (*EventHandlingFailed) Reset ¶ added in v0.0.20
func (x *EventHandlingFailed) Reset()
func (*EventHandlingFailed) String ¶ added in v0.0.20
func (x *EventHandlingFailed) String() string
type EventHandlingFinished ¶ added in v0.0.20
type EventHandlingFinished struct { HandlerName string `protobuf:"bytes,1,opt,name=handler_name,json=handlerName,proto3" json:"handler_name,omitempty"` // contains filtered or unexported fields }
EventHandlingFinished is an event message occurred when given handler just finished successfully handling an event.
func (*EventHandlingFinished) Descriptor
deprecated
added in
v0.0.20
func (*EventHandlingFinished) Descriptor() ([]byte, []int)
Deprecated: Use EventHandlingFinished.ProtoReflect.Descriptor instead.
func (*EventHandlingFinished) GetHandlerName ¶ added in v0.0.20
func (x *EventHandlingFinished) GetHandlerName() string
func (*EventHandlingFinished) ProtoMessage ¶ added in v0.0.20
func (*EventHandlingFinished) ProtoMessage()
func (*EventHandlingFinished) ProtoReflect ¶ added in v0.0.20
func (x *EventHandlingFinished) ProtoReflect() protoreflect.Message
func (*EventHandlingFinished) Reset ¶ added in v0.0.20
func (x *EventHandlingFinished) Reset()
func (*EventHandlingFinished) String ¶ added in v0.0.20
func (x *EventHandlingFinished) String() string
type EventHandlingStarted ¶ added in v0.0.20
type EventHandlingStarted struct { HandlerName string `protobuf:"bytes,1,opt,name=handler_name,json=handlerName,proto3" json:"handler_name,omitempty"` // contains filtered or unexported fields }
EventHandlingStarted is an event message occurred when given handler just started handling an event.
func (*EventHandlingStarted) Descriptor
deprecated
added in
v0.0.20
func (*EventHandlingStarted) Descriptor() ([]byte, []int)
Deprecated: Use EventHandlingStarted.ProtoReflect.Descriptor instead.
func (*EventHandlingStarted) GetHandlerName ¶ added in v0.0.20
func (x *EventHandlingStarted) GetHandlerName() string
func (*EventHandlingStarted) ProtoMessage ¶ added in v0.0.20
func (*EventHandlingStarted) ProtoMessage()
func (*EventHandlingStarted) ProtoReflect ¶ added in v0.0.20
func (x *EventHandlingStarted) ProtoReflect() protoreflect.Message
func (*EventHandlingStarted) Reset ¶ added in v0.0.20
func (x *EventHandlingStarted) Reset()
func (*EventHandlingStarted) String ¶ added in v0.0.20
func (x *EventHandlingStarted) String() string
type EventMessage ¶
type EventMessage interface {
MessageType() string
}
EventMessage is an interface that defines event messages.
type EventStore ¶
type EventStore interface { // LoadEvents loads all events for given aggregate. LoadEvents(ctx context.Context, aggregate Aggregate) error // LoadEventsWithSnapshot loads the latest snapshot with the events that happened after it. LoadEventsWithSnapshot(ctx context.Context, aggregate Aggregate) error // Commit commits the event changes done in given aggregate. Commit(ctx context.Context, aggregate Aggregate) error // SaveSnapshot saves the snapshot of given aggregate. SaveSnapshot(ctx context.Context, aggregate Aggregate) error // StreamEvents opens stream events that matches given request. StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error) // SetAggregateBase sets the AggregateBase within an aggregate. SetAggregateBase(agg Aggregate, aggId, aggType string, version int64) }
EventStore is an interface used by the event store to load, commit and create snapshot on aggregates.
type EventUnhandled ¶ added in v0.0.20
type EventUnhandled struct {
// contains filtered or unexported fields
}
EventUnhandled is an event message which states that an event is marked as unhandled.
func (*EventUnhandled) Descriptor
deprecated
added in
v0.0.20
func (*EventUnhandled) Descriptor() ([]byte, []int)
Deprecated: Use EventUnhandled.ProtoReflect.Descriptor instead.
func (*EventUnhandled) ProtoMessage ¶ added in v0.0.20
func (*EventUnhandled) ProtoMessage()
func (*EventUnhandled) ProtoReflect ¶ added in v0.0.20
func (x *EventUnhandled) ProtoReflect() protoreflect.Message
func (*EventUnhandled) Reset ¶ added in v0.0.20
func (x *EventUnhandled) Reset()
func (*EventUnhandled) String ¶ added in v0.0.20
func (x *EventUnhandled) String() string
type IdGenerator ¶
type IdGenerator interface {
GenerateId() string
}
IdGenerator is the interface used by identity generators.
type Snapshot ¶
type Snapshot struct { AggregateId string `json:"aggregate_id,omitempty"` AggregateType string `json:"aggregate_type,omitempty"` AggregateVersion int64 `json:"aggregate_version,omitempty"` Revision int64 `json:"revision,omitempty"` Timestamp int64 `json:"timestamp,omitempty"` SnapshotData []byte `json:"snapshot_data,omitempty"` }
Snapshot is a structure that define basic fields of the aggregate snapshot.
type SnapshotCodec ¶
SnapshotCodec is the type wrapper over the codec.Codec used for wire injection.
type Storage ¶
type Storage interface { BeginTx(ctx context.Context) (TxStorage, error) StorageBase }
Storage is a transaction beginner.
type StorageBase ¶ added in v0.0.18
type StorageBase interface { // SaveEvents all input events atomically in the storage. SaveEvents(ctx context.Context, es []*Event) error // ListEvents lists all events for given aggregate type with given id. ListEvents(ctx context.Context, aggId string, aggType string) ([]*Event, error) // SaveSnapshot stores a snapshot. SaveSnapshot(ctx context.Context, snap *Snapshot) error // GetSnapshot gets the snapshot of the aggregate with it's id, type and version. GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*Snapshot, error) // ListEventsAfterRevision gets the event stream for given aggregate id, type starting after given revision. ListEventsAfterRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*Event, error) // StreamEvents streams the events that matching given request. StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error) // As allows drivers to expose driver-specific types. As(dst interface{}) error // ErrorCode gets the error code from the storage. ErrorCode(err error) cgerrors.ErrorCode }
StorageBase is the interface used by the event store as a storage for its events and snapshots.
type Store ¶
type Store struct { *AggregateBaseSetter // contains filtered or unexported fields }
Store is the default implementation for the EventStore interface.
func New ¶
func New(cfg *Config, eventCodec EventCodec, snapCodec SnapshotCodec, storage StorageBase) (*Store, error)
New creates new EventStore implementation.
func (*Store) LoadEvents ¶
LoadEvents gets the event stream and applies on provided aggregate.
func (*Store) LoadEventsWithSnapshot ¶
LoadEventsWithSnapshot gets the aggregate stream with the latest possible snapshot.
func (*Store) SaveSnapshot ¶
SaveSnapshot stores the snapshot
func (*Store) StreamEvents ¶
StreamEvents opens an event stream that matches given request.
func (*Store) WithStorage ¶ added in v0.0.20
func (e *Store) WithStorage(base StorageBase) *Store
WithStorage creates a copy of the event store with given storage base.
type StreamEventsRequest ¶
type StreamEventsRequest struct { // AggregateTypes streams the events for selected aggregate types. AggregateTypes []string // AggregateIDs is the filter that streams events for selected aggregate ids. AggregateIDs []string // ExcludeEventTypes is the filter that provides a stream with excluded event types. ExcludeEventTypes []string // EventTypes is the filter that gets only selected event types. EventTypes []string // BuffSize defines the size of the stream channel buffer. BuffSize int }
StreamEventsRequest is a request for the stream events query.
type TxStorage ¶ added in v0.0.18
type TxStorage interface { StorageBase Commit(ctx context.Context) error Rollback(ctx context.Context) error }
TxStorage is the interface that describes a storage base with a started transaction.
type UUIDGenerator ¶
type UUIDGenerator struct{}
UUIDGenerator implements IdGenerator interface. Generates UUID V4 identifier.
func (UUIDGenerator) GenerateId ¶
func (u UUIDGenerator) GenerateId() string
GenerateId generates identified. Implements IdGenerator interface.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
esxsql
module
|
|
esxsql_test
Module
|
|
Package mockes is a generated GoMock package.
|
Package mockes is a generated GoMock package. |