es

package
v0.0.16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 9, 2021 License: Apache-2.0 Imports: 13 Imported by: 1

Documentation

Overview

Package eventsource provides event store structures and abstractions.

Package eventsource is a generated GoMock package.

Index

Constants

This section is empty.

Variables

View Source
var File_event_proto protoreflect.FileDescriptor

Functions

This section is empty.

Types

type Aggregate

type Aggregate interface {
	Apply(e *Event) error
	SetBase(base *AggregateBase)
	AggBase() *AggregateBase
	Reset()
}

Aggregate is an interface used for the aggregate models

type AggregateBase

type AggregateBase struct {
	// contains filtered or unexported fields
}

AggregateBase is the base of the aggregate which

func (*AggregateBase) CommittedEvents

func (a *AggregateBase) CommittedEvents() []*Event

CommittedEvents gets the committed event messages.

func (*AggregateBase) DecodeEventAs

func (a *AggregateBase) DecodeEventAs(eventData []byte, eventMsg interface{}) error

DecodeEventAs decodes provided in put eventData into the structure of eventMsg. THe eventMsg is expected to be a pointer to the event msg.

func (*AggregateBase) ID

func (a *AggregateBase) ID() string

ID gets the aggregate identifier.

func (*AggregateBase) LatestCommittedEvent

func (a *AggregateBase) LatestCommittedEvent() (*Event, bool)

LatestCommittedEvent gets the latest committed event message.

func (*AggregateBase) MarkEventsCommitted

func (a *AggregateBase) MarkEventsCommitted()

MarkEventsCommitted marks the aggregate events as committed. NOTE: Use this function carefully, as the event store wouldn't try to commit events, already marked as committed.

func (*AggregateBase) MustLatestCommittedEvent

func (a *AggregateBase) MustLatestCommittedEvent() *Event

MustLatestCommittedEvent gets the latest committed event message or panics.

func (*AggregateBase) Revision

func (a *AggregateBase) Revision() int64

Revision gets aggregate current revision.

func (*AggregateBase) SetEvent

func (a *AggregateBase) SetEvent(eventMsg EventMessage) error

SetEvent sets new event message into given aggregate.

func (*AggregateBase) SetID

func (a *AggregateBase) SetID(id string)

SetID sets aggregate id.

func (*AggregateBase) Timestamp

func (a *AggregateBase) Timestamp() int64

Timestamp gets the aggregate base timestamp.

func (*AggregateBase) Type

func (a *AggregateBase) Type() string

Type gets the aggregate type.

func (*AggregateBase) Version

func (a *AggregateBase) Version() int64

Version gets aggregate version.

type AggregateBaseSetter

type AggregateBaseSetter struct {
	// contains filtered or unexported fields
}

AggregateBaseSetter is a structure responsible for setting the aggregate base.

func NewAggregateBaseSetter

func NewAggregateBaseSetter(eventCodec, snapCodec codec.Codec, idGen IdGenerator) *AggregateBaseSetter

NewAggregateBaseSetter creates new aggregate setter.

func (*AggregateBaseSetter) SetAggregateBase

func (a *AggregateBaseSetter) SetAggregateBase(agg Aggregate, aggId, aggType string, version int64)

SetAggregateBase implements AggregateBaseSetter interface.

type AggregateFactory

type AggregateFactory interface {
	New(aggType string, aggVersion int64) Aggregate
}

AggregateFactory is a factory interface used to create new Aggregate models.

type Config

type Config struct {
	BufferSize int
}

Config is the configuration for the eventsource storage.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig sets up the default config for the event store.

func (*Config) Validate

func (c *Config) Validate() error

type Cursor

type Cursor interface {
	// GetAggregateStream gets the stream of aggregates.
	GetAggregateStream(withSnapshot bool) (<-chan *CursorAggregate, error)
}

Cursor is an interface used by the storages that enables listing the aggregates.

type CursorAggregate

type CursorAggregate struct {
	AggregateID string
	Snapshot    *Snapshot
	Events      []*Event
}

CursorAggregate is an aggregate events and snapshot taken by the cursor.

type Event

type Event struct {
	EventId       string `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"`
	EventType     string `protobuf:"bytes,2,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"`
	AggregateType string `protobuf:"bytes,3,opt,name=aggregate_type,json=aggregateType,proto3" json:"aggregate_type,omitempty"`
	AggregateId   string `protobuf:"bytes,4,opt,name=aggregate_id,json=aggregateId,proto3" json:"aggregate_id,omitempty"`
	EventData     []byte `protobuf:"bytes,5,opt,name=event_data,json=eventData,proto3" json:"event_data,omitempty"`
	Timestamp     int64  `protobuf:"varint,6,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	Revision      int64  `protobuf:"varint,7,opt,name=revision,proto3" json:"revision,omitempty"`
	// contains filtered or unexported fields
}

Event is the event source message model.

func (*Event) Copy

func (x *Event) Copy() *Event

Copy creates a copy of given event.

func (*Event) Descriptor deprecated

func (*Event) Descriptor() ([]byte, []int)

Deprecated: Use Event.ProtoReflect.Descriptor instead.

func (*Event) GetAggregateId

func (x *Event) GetAggregateId() string

func (*Event) GetAggregateType

func (x *Event) GetAggregateType() string

func (*Event) GetEventData

func (x *Event) GetEventData() []byte

func (*Event) GetEventId

func (x *Event) GetEventId() string

func (*Event) GetEventType

func (x *Event) GetEventType() string

func (*Event) GetRevision

func (x *Event) GetRevision() int64

func (*Event) GetTimestamp

func (x *Event) GetTimestamp() int64

func (*Event) ProtoMessage

func (*Event) ProtoMessage()

func (*Event) ProtoReflect

func (x *Event) ProtoReflect() protoreflect.Message

func (*Event) Reset

func (x *Event) Reset()

func (*Event) String

func (x *Event) String() string

func (*Event) Time

func (x *Event) Time() time.Time

type EventCodec

type EventCodec codec.Codec

EventCodec is the type wrapper over the codec.Codec used for event encoding in wire injection.

type EventHandler

type EventHandler interface {
	Handle(ctx context.Context, e *Event)
}

EventHandler is an interface used for handling events.

type EventMessage

type EventMessage interface {
	MessageType() string
}

EventMessage is an interface that defines event messages.

type EventStore

type EventStore interface {
	// LoadEvents loads all events for given aggregate.
	LoadEvents(ctx context.Context, aggregate Aggregate) error

	// LoadEventsWithSnapshot loads the latest snapshot with the events that happened after it.
	LoadEventsWithSnapshot(ctx context.Context, aggregate Aggregate) error

	// Commit commits the event changes done in given aggregate.
	Commit(ctx context.Context, aggregate Aggregate) error

	// SaveSnapshot saves the snapshot of given aggregate.
	SaveSnapshot(ctx context.Context, aggregate Aggregate) error

	// StreamEvents opens stream events that matches given request.
	StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)

	// StreamAggregates opens aggregate stream for given type and version.
	StreamAggregates(ctx context.Context, aggType string, aggVersion int64, factory AggregateFactory) (<-chan Aggregate, error)

	// StreamProjections streams the projections based on given aggregate type and version.
	StreamProjections(ctx context.Context, aggType string, aggVersion int64, factory ProjectionFactory) (<-chan Projection, error)

	// SetAggregateBase sets the AggregateBase within an aggregate.
	SetAggregateBase(agg Aggregate, aggId, aggType string, version int64)
}

EventStore is an interface used by the event store to load, commit and create snapshot on aggregates.

type IdGenerator

type IdGenerator interface {
	GenerateId() string
}

IdGenerator is the interface used by identity generators.

type MockEventStore

type MockEventStore struct {
	// contains filtered or unexported fields
}

MockEventStore is a mock of EventStore interface.

func NewMockEventStore

func NewMockEventStore(ctrl *gomock.Controller) *MockEventStore

NewMockEventStore creates a new mock instance.

func (*MockEventStore) Commit

func (m *MockEventStore) Commit(arg0 context.Context, arg1 Aggregate) error

Commit mocks base method.

func (*MockEventStore) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockEventStore) LoadEvents

func (m *MockEventStore) LoadEvents(arg0 context.Context, arg1 Aggregate) error

LoadEvents mocks base method.

func (*MockEventStore) LoadEventsWithSnapshot

func (m *MockEventStore) LoadEventsWithSnapshot(arg0 context.Context, arg1 Aggregate) error

LoadEventsWithSnapshot mocks base method.

func (*MockEventStore) SaveSnapshot

func (m *MockEventStore) SaveSnapshot(arg0 context.Context, arg1 Aggregate) error

SaveSnapshot mocks base method.

func (*MockEventStore) SetAggregateBase

func (m *MockEventStore) SetAggregateBase(arg0 Aggregate, arg1, arg2 string, arg3 int64)

SetAggregateBase mocks base method.

func (*MockEventStore) StreamAggregates

func (m *MockEventStore) StreamAggregates(arg0 context.Context, arg1 string, arg2 int64, arg3 AggregateFactory) (<-chan Aggregate, error)

StreamAggregates mocks base method.

func (*MockEventStore) StreamEvents

func (m *MockEventStore) StreamEvents(arg0 context.Context, arg1 *StreamEventsRequest) (<-chan *Event, error)

StreamEvents mocks base method.

func (*MockEventStore) StreamProjections

func (m *MockEventStore) StreamProjections(arg0 context.Context, arg1 string, arg2 int64, arg3 ProjectionFactory) (<-chan Projection, error)

StreamProjections mocks base method.

type MockEventStoreMockRecorder

type MockEventStoreMockRecorder struct {
	// contains filtered or unexported fields
}

MockEventStoreMockRecorder is the mock recorder for MockEventStore.

func (*MockEventStoreMockRecorder) Commit

func (mr *MockEventStoreMockRecorder) Commit(arg0, arg1 interface{}) *gomock.Call

Commit indicates an expected call of Commit.

func (*MockEventStoreMockRecorder) LoadEvents

func (mr *MockEventStoreMockRecorder) LoadEvents(arg0, arg1 interface{}) *gomock.Call

LoadEvents indicates an expected call of LoadEvents.

func (*MockEventStoreMockRecorder) LoadEventsWithSnapshot

func (mr *MockEventStoreMockRecorder) LoadEventsWithSnapshot(arg0, arg1 interface{}) *gomock.Call

LoadEventsWithSnapshot indicates an expected call of LoadEventsWithSnapshot.

func (*MockEventStoreMockRecorder) SaveSnapshot

func (mr *MockEventStoreMockRecorder) SaveSnapshot(arg0, arg1 interface{}) *gomock.Call

SaveSnapshot indicates an expected call of SaveSnapshot.

func (*MockEventStoreMockRecorder) SetAggregateBase

func (mr *MockEventStoreMockRecorder) SetAggregateBase(arg0, arg1, arg2, arg3 interface{}) *gomock.Call

SetAggregateBase indicates an expected call of SetAggregateBase.

func (*MockEventStoreMockRecorder) StreamAggregates

func (mr *MockEventStoreMockRecorder) StreamAggregates(arg0, arg1, arg2, arg3 interface{}) *gomock.Call

StreamAggregates indicates an expected call of StreamAggregates.

func (*MockEventStoreMockRecorder) StreamEvents

func (mr *MockEventStoreMockRecorder) StreamEvents(arg0, arg1 interface{}) *gomock.Call

StreamEvents indicates an expected call of StreamEvents.

func (*MockEventStoreMockRecorder) StreamProjections

func (mr *MockEventStoreMockRecorder) StreamProjections(arg0, arg1, arg2, arg3 interface{}) *gomock.Call

StreamProjections indicates an expected call of StreamProjections.

type Projection

type Projection interface {
	Apply(c codec.Codec, e *Event) error
}

Projection is an interface used to represent the query projeciton.

type ProjectionFactory

type ProjectionFactory interface {
	NewProjection(id string) Projection
}

ProjectionFactory is an interface used to create new projections.

type Snapshot

type Snapshot struct {
	AggregateId      string `json:"aggregate_id,omitempty"`
	AggregateType    string `json:"aggregate_type,omitempty"`
	AggregateVersion int64  `json:"aggregate_version,omitempty"`
	Revision         int64  `json:"revision,omitempty"`
	Timestamp        int64  `json:"timestamp,omitempty"`
	SnapshotData     []byte `json:"snapshot_data,omitempty"`
}

Snapshot is a structure that define basic fields of the aggregate snapshot.

type SnapshotCodec

type SnapshotCodec codec.Codec

SnapshotCodec is the type wrapper over the codec.Codec used for wire injection.

type Storage

type Storage interface {
	// SaveEvents all input events atomically in the storage.
	SaveEvents(ctx context.Context, es []*Event) error

	// ListEvents lists all events for given aggregate type with given id.
	ListEvents(ctx context.Context, aggId string, aggType string) ([]*Event, error)

	// SaveSnapshot stores a snapshot.
	SaveSnapshot(ctx context.Context, snap *Snapshot) error

	// GetSnapshot gets the snapshot of the aggregate with it's id, type and version.
	GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*Snapshot, error)

	// ListEventsAfterRevision gets the event stream for given aggregate id, type starting after given revision.
	ListEventsAfterRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*Event, error)

	// NewCursor creates a new cursor of given aggregate type and version.
	NewCursor(ctx context.Context, aggType string, aggVersion int64) (Cursor, error)

	// StreamEvents streams the events that matching given request.
	StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)

	// As allows drivers to expose driver-specific types.
	As(dst interface{}) error

	// ErrorCode gets the error code from the storage.
	ErrorCode(err error) cgerrors.ErrorCode
}

Storage is the interface used by the event store as a storage for its events and snapshots.

type Store

type Store struct {
	*AggregateBaseSetter
	// contains filtered or unexported fields
}

Store is the default implementation for the EventStore interface.

func New

func New(cfg *Config, eventCodec EventCodec, snapCodec SnapshotCodec, storage Storage) (*Store, error)

New creates new EventStore implementation.

func (*Store) Commit

func (e *Store) Commit(ctx context.Context, agg Aggregate) error

Commit commits provided aggregate events.

func (*Store) CopyWithStorage

func (e *Store) CopyWithStorage(storage Storage) *Store

CopyWithStorage creates a copy of the Store structure that has a different storage. This function could be used to create transaction implementations.

func (*Store) LoadEvents

func (e *Store) LoadEvents(ctx context.Context, agg Aggregate) error

LoadEvents gets the event stream and applies on provided aggregate.

func (*Store) LoadEventsWithSnapshot

func (e *Store) LoadEventsWithSnapshot(ctx context.Context, agg Aggregate) error

LoadEventsWithSnapshot gets the aggregate stream with the latest possible snapshot.

func (*Store) SaveSnapshot

func (e *Store) SaveSnapshot(ctx context.Context, agg Aggregate) error

SaveSnapshot stores the snapshot

func (*Store) StreamAggregates

func (e *Store) StreamAggregates(ctx context.Context, aggType string, aggVersion int64, factory AggregateFactory) (<-chan Aggregate, error)

StreamAggregates opens up the aggregate streaming channel. The channel would got closed when there is no more aggregate to read or when the context is done. Closing resulting channel would result with a panic.

func (*Store) StreamEvents

func (e *Store) StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)

StreamEvents opens an event stream that matches given request.

func (*Store) StreamProjections

func (e *Store) StreamProjections(ctx context.Context, aggType string, aggVersion int64, factory ProjectionFactory) (<-chan Projection, error)

StreamProjections streams the projection of given aggregate.

type StreamEventsRequest

type StreamEventsRequest struct {
	AggregateTypes    []string
	AggregateIDs      []string
	ExcludeEventTypes []string
	EventTypes        []string
	BuffSize          int
}

StreamEventsRequest is a request for the stream events query.

type UUIDGenerator

type UUIDGenerator struct{}

UUIDGenerator implements IdGenerator interface. Generates UUID V4 identifier.

func (UUIDGenerator) GenerateId

func (u UUIDGenerator) GenerateId() string

GenerateId generates identified. Implements IdGenerator interface.

Directories

Path Synopsis
esxsql module
esxsql_test Module
Package mockes is a generated GoMock package.
Package mockes is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL