Documentation ¶
Overview ¶
Package eventsource provides event store structures and abstractions.
Package eventsource is a generated GoMock package.
Index ¶
- Variables
- type Aggregate
- type AggregateBase
- func (a *AggregateBase) CommittedEvents() []*Event
- func (a *AggregateBase) DecodeEventAs(eventData []byte, eventMsg interface{}) error
- func (a *AggregateBase) ID() string
- func (a *AggregateBase) LatestCommittedEvent() (*Event, bool)
- func (a *AggregateBase) MarkEventsCommitted()
- func (a *AggregateBase) MustLatestCommittedEvent() *Event
- func (a *AggregateBase) Revision() int64
- func (a *AggregateBase) SetEvent(eventMsg EventMessage) error
- func (a *AggregateBase) SetID(id string)
- func (a *AggregateBase) Timestamp() int64
- func (a *AggregateBase) Type() string
- func (a *AggregateBase) Version() int64
- type AggregateBaseSetter
- type AggregateFactory
- type Config
- type Cursor
- type CursorAggregate
- type Event
- func (x *Event) Copy() *Event
- func (*Event) Descriptor() ([]byte, []int)deprecated
- func (x *Event) GetAggregateId() string
- func (x *Event) GetAggregateType() string
- func (x *Event) GetEventData() []byte
- func (x *Event) GetEventId() string
- func (x *Event) GetEventType() string
- func (x *Event) GetRevision() int64
- func (x *Event) GetTimestamp() int64
- func (*Event) ProtoMessage()
- func (x *Event) ProtoReflect() protoreflect.Message
- func (x *Event) Reset()
- func (x *Event) String() string
- func (x *Event) Time() time.Time
- type EventCodec
- type EventHandler
- type EventMessage
- type EventStore
- type IdGenerator
- type MockEventStore
- func (m *MockEventStore) Commit(arg0 context.Context, arg1 Aggregate) error
- func (m *MockEventStore) EXPECT() *MockEventStoreMockRecorder
- func (m *MockEventStore) LoadEvents(arg0 context.Context, arg1 Aggregate) error
- func (m *MockEventStore) LoadEventsWithSnapshot(arg0 context.Context, arg1 Aggregate) error
- func (m *MockEventStore) SaveSnapshot(arg0 context.Context, arg1 Aggregate) error
- func (m *MockEventStore) SetAggregateBase(arg0 Aggregate, arg1, arg2 string, arg3 int64)
- func (m *MockEventStore) StreamAggregates(arg0 context.Context, arg1 string, arg2 int64, arg3 AggregateFactory) (<-chan Aggregate, error)
- func (m *MockEventStore) StreamEvents(arg0 context.Context, arg1 *StreamEventsRequest) (<-chan *Event, error)
- func (m *MockEventStore) StreamProjections(arg0 context.Context, arg1 string, arg2 int64, arg3 ProjectionFactory) (<-chan Projection, error)
- type MockEventStoreMockRecorder
- func (mr *MockEventStoreMockRecorder) Commit(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) LoadEvents(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) LoadEventsWithSnapshot(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) SaveSnapshot(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) SetAggregateBase(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) StreamAggregates(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) StreamEvents(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) StreamProjections(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
- type Projection
- type ProjectionFactory
- type Snapshot
- type SnapshotCodec
- type Storage
- type Store
- func (e *Store) Commit(ctx context.Context, agg Aggregate) error
- func (e *Store) CopyWithStorage(storage Storage) *Store
- func (e *Store) LoadEvents(ctx context.Context, agg Aggregate) error
- func (e *Store) LoadEventsWithSnapshot(ctx context.Context, agg Aggregate) error
- func (e *Store) SaveSnapshot(ctx context.Context, agg Aggregate) error
- func (e *Store) StreamAggregates(ctx context.Context, aggType string, aggVersion int64, ...) (<-chan Aggregate, error)
- func (e *Store) StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)
- func (e *Store) StreamProjections(ctx context.Context, aggType string, aggVersion int64, ...) (<-chan Projection, error)
- type StreamEventsRequest
- type UUIDGenerator
Constants ¶
This section is empty.
Variables ¶
var File_event_proto protoreflect.FileDescriptor
Functions ¶
This section is empty.
Types ¶
type Aggregate ¶
type Aggregate interface { Apply(e *Event) error SetBase(base *AggregateBase) AggBase() *AggregateBase Reset() }
Aggregate is an interface used for the aggregate models
type AggregateBase ¶
type AggregateBase struct {
// contains filtered or unexported fields
}
AggregateBase is the base of the aggregate.
func (*AggregateBase) CommittedEvents ¶
func (a *AggregateBase) CommittedEvents() []*Event
CommittedEvents gets the committed event messages.
func (*AggregateBase) DecodeEventAs ¶
func (a *AggregateBase) DecodeEventAs(eventData []byte, eventMsg interface{}) error
DecodeEventAs decodes the provided input eventData into the structure of eventMsg. The eventMsg is expected to be a pointer to the event message.
func (*AggregateBase) LatestCommittedEvent ¶
func (a *AggregateBase) LatestCommittedEvent() (*Event, bool)
LatestCommittedEvent gets the latest committed event message.
func (*AggregateBase) MarkEventsCommitted ¶
func (a *AggregateBase) MarkEventsCommitted()
MarkEventsCommitted marks the aggregate events as committed. NOTE: Use this function carefully, as the event store will not try to commit events that are already marked as committed.
func (*AggregateBase) MustLatestCommittedEvent ¶
func (a *AggregateBase) MustLatestCommittedEvent() *Event
MustLatestCommittedEvent gets the latest committed event message or panics.
func (*AggregateBase) Revision ¶
func (a *AggregateBase) Revision() int64
Revision gets aggregate current revision.
func (*AggregateBase) SetEvent ¶
func (a *AggregateBase) SetEvent(eventMsg EventMessage) error
SetEvent sets new event message into given aggregate.
func (*AggregateBase) Timestamp ¶
func (a *AggregateBase) Timestamp() int64
Timestamp gets the aggregate base timestamp.
func (*AggregateBase) Version ¶
func (a *AggregateBase) Version() int64
Version gets aggregate version.
type AggregateBaseSetter ¶
type AggregateBaseSetter struct {
// contains filtered or unexported fields
}
AggregateBaseSetter is a structure responsible for setting the aggregate base.
func NewAggregateBaseSetter ¶
func NewAggregateBaseSetter(eventCodec, snapCodec codec.Codec, idGen IdGenerator) *AggregateBaseSetter
NewAggregateBaseSetter creates new aggregate setter.
func (*AggregateBaseSetter) SetAggregateBase ¶
func (a *AggregateBaseSetter) SetAggregateBase(agg Aggregate, aggId, aggType string, version int64)
SetAggregateBase implements AggregateBaseSetter interface.
type AggregateFactory ¶
AggregateFactory is a factory interface used to create new Aggregate models.
type Config ¶
type Config struct {
BufferSize int
}
Config is the configuration for the eventsource storage.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig sets up the default config for the event store.
type Cursor ¶
type Cursor interface { // GetAggregateStream gets the stream of aggregates. GetAggregateStream(withSnapshot bool) (<-chan *CursorAggregate, error) }
Cursor is an interface used by the storages that enables listing the aggregates.
type CursorAggregate ¶
CursorAggregate holds the events and snapshot of an aggregate taken by the cursor.
type Event ¶
type Event struct { EventId string `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"` EventType string `protobuf:"bytes,2,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"` AggregateType string `protobuf:"bytes,3,opt,name=aggregate_type,json=aggregateType,proto3" json:"aggregate_type,omitempty"` AggregateId string `protobuf:"bytes,4,opt,name=aggregate_id,json=aggregateId,proto3" json:"aggregate_id,omitempty"` EventData []byte `protobuf:"bytes,5,opt,name=event_data,json=eventData,proto3" json:"event_data,omitempty"` Timestamp int64 `protobuf:"varint,6,opt,name=timestamp,proto3" json:"timestamp,omitempty"` Revision int64 `protobuf:"varint,7,opt,name=revision,proto3" json:"revision,omitempty"` // contains filtered or unexported fields }
Event is the event source message model.
func (*Event) Descriptor
deprecated
func (*Event) GetAggregateId ¶
func (*Event) GetAggregateType ¶
func (*Event) GetEventData ¶
func (*Event) GetEventId ¶
func (*Event) GetEventType ¶
func (*Event) GetRevision ¶
func (*Event) GetTimestamp ¶
func (*Event) ProtoMessage ¶
func (*Event) ProtoMessage()
func (*Event) ProtoReflect ¶
func (x *Event) ProtoReflect() protoreflect.Message
type EventCodec ¶
EventCodec is the type wrapper over the codec.Codec used for event encoding in wire injection.
type EventHandler ¶
EventHandler is an interface used for handling events.
type EventMessage ¶
type EventMessage interface {
MessageType() string
}
EventMessage is an interface that defines event messages.
type EventStore ¶
type EventStore interface { // LoadEvents loads all events for given aggregate. LoadEvents(ctx context.Context, aggregate Aggregate) error // LoadEventsWithSnapshot loads the latest snapshot with the events that happened after it. LoadEventsWithSnapshot(ctx context.Context, aggregate Aggregate) error // Commit commits the event changes done in given aggregate. Commit(ctx context.Context, aggregate Aggregate) error // SaveSnapshot saves the snapshot of given aggregate. SaveSnapshot(ctx context.Context, aggregate Aggregate) error // StreamEvents opens stream events that matches given request. StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error) // StreamAggregates opens aggregate stream for given type and version. StreamAggregates(ctx context.Context, aggType string, aggVersion int64, factory AggregateFactory) (<-chan Aggregate, error) // StreamProjections streams the projections based on given aggregate type and version. StreamProjections(ctx context.Context, aggType string, aggVersion int64, factory ProjectionFactory) (<-chan Projection, error) // SetAggregateBase sets the AggregateBase within an aggregate. SetAggregateBase(agg Aggregate, aggId, aggType string, version int64) }
EventStore is an interface used by the event store to load, commit and create snapshots of aggregates.
type IdGenerator ¶
type IdGenerator interface {
GenerateId() string
}
IdGenerator is the interface used by identity generators.
type MockEventStore ¶
type MockEventStore struct {
// contains filtered or unexported fields
}
MockEventStore is a mock of EventStore interface.
func NewMockEventStore ¶
func NewMockEventStore(ctrl *gomock.Controller) *MockEventStore
NewMockEventStore creates a new mock instance.
func (*MockEventStore) Commit ¶
func (m *MockEventStore) Commit(arg0 context.Context, arg1 Aggregate) error
Commit mocks base method.
func (*MockEventStore) EXPECT ¶
func (m *MockEventStore) EXPECT() *MockEventStoreMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockEventStore) LoadEvents ¶
func (m *MockEventStore) LoadEvents(arg0 context.Context, arg1 Aggregate) error
LoadEvents mocks base method.
func (*MockEventStore) LoadEventsWithSnapshot ¶
func (m *MockEventStore) LoadEventsWithSnapshot(arg0 context.Context, arg1 Aggregate) error
LoadEventsWithSnapshot mocks base method.
func (*MockEventStore) SaveSnapshot ¶
func (m *MockEventStore) SaveSnapshot(arg0 context.Context, arg1 Aggregate) error
SaveSnapshot mocks base method.
func (*MockEventStore) SetAggregateBase ¶
func (m *MockEventStore) SetAggregateBase(arg0 Aggregate, arg1, arg2 string, arg3 int64)
SetAggregateBase mocks base method.
func (*MockEventStore) StreamAggregates ¶
func (m *MockEventStore) StreamAggregates(arg0 context.Context, arg1 string, arg2 int64, arg3 AggregateFactory) (<-chan Aggregate, error)
StreamAggregates mocks base method.
func (*MockEventStore) StreamEvents ¶
func (m *MockEventStore) StreamEvents(arg0 context.Context, arg1 *StreamEventsRequest) (<-chan *Event, error)
StreamEvents mocks base method.
func (*MockEventStore) StreamProjections ¶
func (m *MockEventStore) StreamProjections(arg0 context.Context, arg1 string, arg2 int64, arg3 ProjectionFactory) (<-chan Projection, error)
StreamProjections mocks base method.
type MockEventStoreMockRecorder ¶
type MockEventStoreMockRecorder struct {
// contains filtered or unexported fields
}
MockEventStoreMockRecorder is the mock recorder for MockEventStore.
func (*MockEventStoreMockRecorder) Commit ¶
func (mr *MockEventStoreMockRecorder) Commit(arg0, arg1 interface{}) *gomock.Call
Commit indicates an expected call of Commit.
func (*MockEventStoreMockRecorder) LoadEvents ¶
func (mr *MockEventStoreMockRecorder) LoadEvents(arg0, arg1 interface{}) *gomock.Call
LoadEvents indicates an expected call of LoadEvents.
func (*MockEventStoreMockRecorder) LoadEventsWithSnapshot ¶
func (mr *MockEventStoreMockRecorder) LoadEventsWithSnapshot(arg0, arg1 interface{}) *gomock.Call
LoadEventsWithSnapshot indicates an expected call of LoadEventsWithSnapshot.
func (*MockEventStoreMockRecorder) SaveSnapshot ¶
func (mr *MockEventStoreMockRecorder) SaveSnapshot(arg0, arg1 interface{}) *gomock.Call
SaveSnapshot indicates an expected call of SaveSnapshot.
func (*MockEventStoreMockRecorder) SetAggregateBase ¶
func (mr *MockEventStoreMockRecorder) SetAggregateBase(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
SetAggregateBase indicates an expected call of SetAggregateBase.
func (*MockEventStoreMockRecorder) StreamAggregates ¶
func (mr *MockEventStoreMockRecorder) StreamAggregates(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
StreamAggregates indicates an expected call of StreamAggregates.
func (*MockEventStoreMockRecorder) StreamEvents ¶
func (mr *MockEventStoreMockRecorder) StreamEvents(arg0, arg1 interface{}) *gomock.Call
StreamEvents indicates an expected call of StreamEvents.
func (*MockEventStoreMockRecorder) StreamProjections ¶
func (mr *MockEventStoreMockRecorder) StreamProjections(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
StreamProjections indicates an expected call of StreamProjections.
type Projection ¶
Projection is an interface used to represent the query projection.
type ProjectionFactory ¶
type ProjectionFactory interface {
NewProjection(id string) Projection
}
ProjectionFactory is an interface used to create new projections.
type Snapshot ¶
type Snapshot struct { AggregateId string `json:"aggregate_id,omitempty"` AggregateType string `json:"aggregate_type,omitempty"` AggregateVersion int64 `json:"aggregate_version,omitempty"` Revision int64 `json:"revision,omitempty"` Timestamp int64 `json:"timestamp,omitempty"` SnapshotData []byte `json:"snapshot_data,omitempty"` }
Snapshot is a structure that defines the basic fields of the aggregate snapshot.
type SnapshotCodec ¶
SnapshotCodec is the type wrapper over the codec.Codec used for wire injection.
type Storage ¶
type Storage interface { // SaveEvents all input events atomically in the storage. SaveEvents(ctx context.Context, es []*Event) error // ListEvents lists all events for given aggregate type with given id. ListEvents(ctx context.Context, aggId string, aggType string) ([]*Event, error) // SaveSnapshot stores a snapshot. SaveSnapshot(ctx context.Context, snap *Snapshot) error // GetSnapshot gets the snapshot of the aggregate with it's id, type and version. GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*Snapshot, error) // ListEventsAfterRevision gets the event stream for given aggregate id, type starting after given revision. ListEventsAfterRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*Event, error) // NewCursor creates a new cursor of given aggregate type and version. NewCursor(ctx context.Context, aggType string, aggVersion int64) (Cursor, error) // StreamEvents streams the events that matching given request. StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error) // As allows drivers to expose driver-specific types. As(dst interface{}) error // ErrorCode gets the error code from the storage. ErrorCode(err error) cgerrors.ErrorCode }
Storage is the interface used by the event store as a storage for its events and snapshots.
type Store ¶
type Store struct { *AggregateBaseSetter // contains filtered or unexported fields }
Store is the default implementation for the EventStore interface.
func New ¶
func New(cfg *Config, eventCodec EventCodec, snapCodec SnapshotCodec, storage Storage) (*Store, error)
New creates new EventStore implementation.
func (*Store) CopyWithStorage ¶
CopyWithStorage creates a copy of the Store structure that has a different storage. This function could be used to create transaction implementations.
func (*Store) LoadEvents ¶
LoadEvents gets the event stream and applies on provided aggregate.
func (*Store) LoadEventsWithSnapshot ¶
LoadEventsWithSnapshot gets the aggregate stream with the latest possible snapshot.
func (*Store) SaveSnapshot ¶
SaveSnapshot stores the snapshot of the given aggregate.
func (*Store) StreamAggregates ¶
func (e *Store) StreamAggregates(ctx context.Context, aggType string, aggVersion int64, factory AggregateFactory) (<-chan Aggregate, error)
StreamAggregates opens up the aggregate streaming channel. The channel is closed when there are no more aggregates to read or when the context is done. Closing the resulting channel would result in a panic.
func (*Store) StreamEvents ¶
StreamEvents opens an event stream that matches given request.
func (*Store) StreamProjections ¶
func (e *Store) StreamProjections(ctx context.Context, aggType string, aggVersion int64, factory ProjectionFactory) (<-chan Projection, error)
StreamProjections streams the projection of given aggregate.
type StreamEventsRequest ¶
type StreamEventsRequest struct { AggregateTypes []string AggregateIDs []string ExcludeEventTypes []string EventTypes []string BuffSize int }
StreamEventsRequest is a request for the stream events query.
type UUIDGenerator ¶
type UUIDGenerator struct{}
UUIDGenerator implements IdGenerator interface. Generates UUID V4 identifier.
func (UUIDGenerator) GenerateId ¶
func (u UUIDGenerator) GenerateId() string
GenerateId generates an identifier. Implements IdGenerator interface.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
esxsql
module
|
|
esxsql_test
Module
|
|
Package mockes is a generated GoMock package.
|
Package mockes is a generated GoMock package. |