eventsource

package
v2.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 16, 2021 License: MIT Imports: 9 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exposed by the eventsource package.
var (
	// ErrDeleted is returned by the Aggregate.On() method to signal that the object has been deleted.
	ErrDeleted = errors.New("not found (was deleted)")
	// ErrNoHistory is returned by Repository.Load() when no history exists for the given aggregate ID.
	ErrNoHistory = errors.New("no history found")
	// ErrNotificationFailed is returned by Commit() if the notification service fails.
	ErrNotificationFailed = errors.New("Failed to send notification")
)

Functions

func GetType

func GetType(e Event) reflect.Type

GetType returns the type of the given input value or, if the input is a pointer, the type of the pointed-to object.

func GetTypeName

func GetTypeName(e Event) string

GetTypeName returns the name of the type of the given input value or, if the input is a pointer, the name of the type of the pointed-to object.

func NewULID

func NewULID() string

NewULID returns a Universally Unique Lexicographically Sortable Identifier

Types

type Aggregate

// Aggregate is an object whose state changes can be recorded to and replayed
// from an event source.
type Aggregate interface {
	// On applies event to the aggregate, updating its state. Per the
	// ErrDeleted documentation, On returns ErrDeleted to signal that the
	// object has been deleted.
	On(ctx context.Context, event Event) error
	// SetAggregateID sets the ID of the aggregate.
	SetAggregateID(id string)
}

Aggregate is an interface representing an object whose state changes can be recorded to and replayed from an event source.

type AggregatorMock

// AggregatorMock is a mock of the Aggregate interface; it provides On and
// SetAggregateID methods.
type AggregatorMock struct {
	// Mock is the underlying mock object. Unlike the other mocks in this
	// package it is a named field, not an embedded one.
	// NOTE(review): presumably testify's mock.Mock — confirm against the imports.
	Mock *mock.Mock
}

AggregatorMock is a mock

func CreateAggregatorMock

func CreateAggregatorMock() *AggregatorMock

CreateAggregatorMock returns an aggregatorMock

func (AggregatorMock) On

func (o AggregatorMock) On(ctx context.Context, event Event) error

On is a mock

func (AggregatorMock) SetAggregateID

func (o AggregatorMock) SetAggregateID(id string)

SetAggregateID is not implemented

type BaseEvent

// BaseEvent holds the metadata fields exposed through the Event interface:
// aggregate ID, user ID, sequence ID and timestamp.
type BaseEvent struct {
	// AggregateID is the ID of the aggregate the event relates to.
	AggregateID string `json:"aggregateId"`
	// UserID is the ID of the user associated with the event
	// (presumably the user who caused it — TODO confirm).
	UserID      string `json:"userId"`
	// SequenceID orders the event relative to other events
	// (presumably a ULID, see NewULID — TODO confirm).
	SequenceID  string `json:"sequenceId"`
	// Timestamp is the time of the event as an integer; the unit is not
	// visible here — TODO confirm.
	Timestamp   int64  `json:"timestamp"`
}

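BaseEvent holds the metadata fields common to events (aggregate ID, user ID, sequence ID and timestamp) and provides the accessor and setter methods listed in the Event interface.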

func (BaseEvent) GetAggregateID

func (e BaseEvent) GetAggregateID() string

GetAggregateID ...

func (BaseEvent) GetSequenceID

func (e BaseEvent) GetSequenceID() string

GetSequenceID ...

func (BaseEvent) GetTimestamp

func (e BaseEvent) GetTimestamp() int64

GetTimestamp ...

func (BaseEvent) GetUserID

func (e BaseEvent) GetUserID() string

GetUserID ...

func (*BaseEvent) SetSequenceID

func (e *BaseEvent) SetSequenceID(sequenceID string)

SetSequenceID ...

func (*BaseEvent) SetTimestamp

func (e *BaseEvent) SetTimestamp(timestamp int64)

SetTimestamp ...

type Event

// Event is the interface of the values that a Repository saves and loads.
// It exposes an event's metadata and allows its sequence ID and timestamp
// to be set.
type Event interface {
	// GetAggregateID returns the ID of the aggregate the event relates to.
	GetAggregateID() string
	// GetUserID returns the user ID associated with the event.
	GetUserID() string
	// GetSequenceID returns the event's sequence ID.
	GetSequenceID() string
	// GetTimestamp returns the event's timestamp.
	GetTimestamp() int64
	// SetSequenceID sets the event's sequence ID.
	SetSequenceID(string)
	// SetTimestamp sets the event's timestamp.
	SetTimestamp(int64)
}

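Event is the interface of the values saved to and loaded from a Repository. It exposes the aggregate ID, user ID, sequence ID and timestamp of an event, and allows the sequence ID and timestamp to be set.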

type NotificationService

// NotificationService represents a service which can emit notifications when
// records are saved to the event source.
type NotificationService interface {
	// Send emits a notification for record.
	//
	// Deprecated: use SendWithContext instead.
	Send(record Record) error
	// SendWithContext emits a notification for record, taking a context.
	SendWithContext(ctx context.Context, record Record) error
}

NotificationService represents a service which can emit notifications when records are saved to the event source

type NotificationServiceMock

// NotificationServiceMock is a mock of the NotificationService interface; it
// provides Send and SendWithContext methods.
type NotificationServiceMock struct {
	// Embedded mock object; its methods are promoted onto NotificationServiceMock.
	// NOTE(review): presumably testify's mock.Mock — confirm against the imports.
	*mock.Mock
}

func CreateNotificationServiceMock

func CreateNotificationServiceMock() *NotificationServiceMock

CreateNotificationServiceMock creates a notification service mock

func (NotificationServiceMock) Send added in v2.1.0

func (ns NotificationServiceMock) Send(record Record) error

func (NotificationServiceMock) SendWithContext added in v2.5.0

func (ns NotificationServiceMock) SendWithContext(ctx context.Context, record Record) error

type QueryOption

// QueryOption is a function used to set store-specific query options such as
// limit or sorting. It receives the options value as an untyped interface{};
// the concrete type is defined by the store in use.
type QueryOption func(opt interface{})

QueryOption is used for setting store-specific options like limit or sorting. The available options can be found in each of the stores.

type Record

// Record is a store row holding one marshalled event together with its
// metadata.
type Record struct {
	// AggregateID is the ID of the aggregate the record relates to.
	AggregateID string `json:"aggregateId"`
	// SequenceID is the sequence ID of the record.
	SequenceID  string `json:"sequenceId"`
	// Timestamp is the record's timestamp; the unit is not visible here — TODO confirm.
	Timestamp   int64  `json:"timestamp"`
	// Type is the type of the event, as retrieved by reflect.TypeOf(event).
	Type        string `json:"type"`
	// Data contains the marshalled Event.
	Data        []byte `json:"data"`
	// UserID is the user ID associated with the record.
	UserID      string `json:"userId"`
}

Record is a store row. The Data field contains the marshalled Event, and Type is the type of event retrieved by reflect.TypeOf(event).

type Repository

// Repository is an interface representing the actual event source.
type Repository interface {
	// Store returns the store.
	Store() Store

	// Save saves one or more events to the repository.
	Save(ctx context.Context, events ...Event) error

	// SaveTransaction saves one or more events to the repository, within a
	// transaction. The transaction is returned to the caller.
	SaveTransaction(ctx context.Context, events ...Event) (StoreTransaction, error)

	// Load loads events from the repository for the given aggregate ID. For
	// each event e, aggr.On(e) is called to update the state of aggr. When
	// done, aggr has been "fast forwarded" to the current state.
	Load(ctx context.Context, id string, aggr Aggregate) (deleted bool, err error)

	// LoadEvents gets all events matching the given query options (defined in
	// the store). Query options can be used for filtering by sequence ID
	// (see https://github.com/oklog/ulid) or for options like limit and offset.
	LoadEvents(ctx context.Context, opts ...QueryOption) (events []Event, err error)

	// GetEventsBySequenceID gets all events with a sequence ID newer than the
	// given ID (see https://github.com/oklog/ulid). It returns at most limit
	// records; if limit is 0, the number of records returned is not limited.
	//
	// Deprecated: Use LoadEvents(ctx, store.BySequenceId(...))
	GetEventsBySequenceID(ctx context.Context, sequenceID string, opts ...QueryOption) (events []Event, err error)

	// GetEventsBySequenceIDAndType is the same as GetEventsBySequenceID, but
	// only returns events of the same type as the one provided in the
	// eventType parameter.
	//
	// Deprecated: Use LoadEvents(ctx, store.BySequenceId(...), store.ByType(...))
	GetEventsBySequenceIDAndType(ctx context.Context, sequenceID string, eventType Event, opts ...QueryOption) (events []Event, err error)

	// GetEventsByTimestamp gets all events newer than the given timestamp. It
	// returns at most limit records; if limit is 0, the number of records
	// returned is not limited.
	//
	// Deprecated: Use LoadEvents(ctx, store.ByTimestamp(...))
	GetEventsByTimestamp(ctx context.Context, timestamp int64, opts ...QueryOption) (events []Event, err error)

	// AddNotificationService adds a notification service to the repository.
	AddNotificationService(service NotificationService)

	// UnmarshalRecords unmarshals records to events using the repository.
	UnmarshalRecords(records []Record) (events []Event, err error)
}

Repository is an interface representing the actual event source.

func NewRepository

func NewRepository(store Store, serializer Serializer) Repository

NewRepository returns a new repository

type RepositoryMock

// RepositoryMock is a mock of the Repository interface.
type RepositoryMock struct {
	// Embedded mock object; its methods are promoted onto RepositoryMock.
	// NOTE(review): presumably testify's mock.Mock — confirm against the imports.
	*mock.Mock
}

RepositoryMock is a mock

func CreateRepositoryMock

func CreateRepositoryMock() *RepositoryMock

CreateRepositoryMock returns a repositoryMock

func (RepositoryMock) AddNotificationService added in v2.1.0

func (r RepositoryMock) AddNotificationService(service NotificationService)

AddNotificationService is a mock

func (RepositoryMock) GetEventsBySequenceID

func (r RepositoryMock) GetEventsBySequenceID(ctx context.Context, sequenceID string, opts ...QueryOption) ([]Event, error)

GetEventsBySequenceID is a mock

func (RepositoryMock) GetEventsBySequenceIDAndType

func (r RepositoryMock) GetEventsBySequenceIDAndType(ctx context.Context, sequenceID string, eventType Event, opts ...QueryOption) ([]Event, error)

GetEventsBySequenceIDAndType is a mock

func (RepositoryMock) GetEventsByTimestamp

func (r RepositoryMock) GetEventsByTimestamp(ctx context.Context, timestamp int64, opts ...QueryOption) ([]Event, error)

GetEventsByTimestamp is a mock

func (RepositoryMock) Load

func (r RepositoryMock) Load(ctx context.Context, id string, aggr Aggregate) (deleted bool, err error)

Load is a mock

func (RepositoryMock) LoadEvents added in v2.2.0

func (r RepositoryMock) LoadEvents(ctx context.Context, opts ...QueryOption) ([]Event, error)

LoadEvents is a mock

func (RepositoryMock) Save

func (r RepositoryMock) Save(ctx context.Context, events ...Event) error

Save is a mock

func (RepositoryMock) SaveTransaction

func (r RepositoryMock) SaveTransaction(ctx context.Context, events ...Event) (StoreTransaction, error)

SaveTransaction is a mock

func (RepositoryMock) Store

func (r RepositoryMock) Store() Store

Store is a mock

func (RepositoryMock) UnmarshalRecords added in v2.3.0

func (r RepositoryMock) UnmarshalRecords(records []Record) ([]Event, error)

UnmarshalRecords is a mock

type Serializer

// Serializer converts between events and their stored byte representation.
// Implement it if events need to be saved using a new storage format.
type Serializer interface {
	// Unmarshal decodes data into an event; eventType names the type of the
	// event to produce.
	Unmarshal(data []byte, eventType string) (event Event, err error)
	// Marshal encodes event into its byte representation.
	Marshal(event Event) (data []byte, err error)
}

Serializer is an interface that should be implemented if events need to be saved using a new storage format.

type SerializerMock

// SerializerMock is a mock of the Serializer interface.
type SerializerMock struct {
	// Embedded mock object; its methods are promoted onto SerializerMock.
	// NOTE(review): presumably testify's mock.Mock — confirm against the imports.
	*mock.Mock
}

SerializerMock is a mock

func CreateSerializerMock

func CreateSerializerMock() *SerializerMock

CreateSerializerMock returns a serializerMock

func (SerializerMock) Marshal

func (o SerializerMock) Marshal(event Event) (data []byte, err error)

Marshal returns the JSON encoding of event.

func (SerializerMock) Unmarshal

func (o SerializerMock) Unmarshal(data []byte, eventType string) (event Event, err error)

Unmarshal parses the JSON-encoded data and returns an event

type Store

// Store is the interface implemented by the data stores that can be used as
// back end for the event source.
type Store interface {
	// NewTransaction returns a StoreTransaction for the given records.
	NewTransaction(ctx context.Context, records ...Record) (StoreTransaction, error)
	// LoadByAggregate loads the records for the given aggregate ID, subject to
	// the given query options.
	LoadByAggregate(ctx context.Context, aggregateID string, opts ...QueryOption) ([]Record, error)
	// Load loads records according to the given query options.
	Load(ctx context.Context, opts ...QueryOption) ([]Record, error)

	// LoadBySequenceID loads records selected by sequence ID.
	//
	// Deprecated: Use Load(ctx, store.BySequenceID(...))
	LoadBySequenceID(ctx context.Context, sequenceID string, opts ...QueryOption) (record []Record, err error)

	// LoadBySequenceIDAndType loads records selected by sequence ID and event
	// type.
	//
	// Deprecated: Use Load(ctx, store.BySequenceID(...), store.ByType(...))
	LoadBySequenceIDAndType(ctx context.Context, sequenceID string, eventType string, opts ...QueryOption) (records []Record, err error)

	// LoadByTimestamp loads records selected by timestamp.
	//
	// Deprecated: Use Load(ctx, store.ByTimestamp(...))
	LoadByTimestamp(ctx context.Context, timestamp int64, opts ...QueryOption) (record []Record, err error)
}

Store is the interface implemented by the data stores that can be used as back end for the event source.

type StoreMock

// StoreMock is a mock of the Store interface.
type StoreMock struct {
	// Embedded mock object; its methods are promoted onto StoreMock.
	// NOTE(review): presumably testify's mock.Mock — confirm against the imports.
	*mock.Mock
}

StoreMock is a mock

func CreateStoreMock

func CreateStoreMock() *StoreMock

CreateStoreMock returns a storeMock

func (StoreMock) Load added in v2.2.0

func (o StoreMock) Load(ctx context.Context, opts ...QueryOption) (record []Record, err error)

Load is a mock

func (StoreMock) LoadByAggregate

func (o StoreMock) LoadByAggregate(ctx context.Context, aggregateID string, opts ...QueryOption) (record []Record, err error)

LoadByAggregate is a mock

func (StoreMock) LoadBySequenceID

func (o StoreMock) LoadBySequenceID(ctx context.Context, sequenceID string, opts ...QueryOption) (record []Record, err error)

LoadBySequenceID is a mock

func (StoreMock) LoadBySequenceIDAndType

func (o StoreMock) LoadBySequenceIDAndType(ctx context.Context, sequenceID string, eventType string, opts ...QueryOption) (record []Record, err error)

LoadBySequenceIDAndType is a mock

func (StoreMock) LoadByTimestamp

func (o StoreMock) LoadByTimestamp(ctx context.Context, timestamp int64, opts ...QueryOption) (record []Record, err error)

LoadByTimestamp is a mock

func (StoreMock) NewTransaction

func (o StoreMock) NewTransaction(ctx context.Context, records ...Record) (StoreTransaction, error)

NewTransaction is a mock

type StoreTransaction

// StoreTransaction encapsulates a write operation to a Store, allowing the
// caller to roll back the operation.
type StoreTransaction interface {
	// Commit commits the write operation.
	// NOTE(review): ErrNotificationFailed is documented as returned by
	// Commit() when the notification service fails — presumably this method;
	// confirm against the implementation.
	Commit() error
	// Rollback rolls back the write operation.
	Rollback() error
	// GetRecords returns the records of the transaction
	// (presumably those passed to Store.NewTransaction — TODO confirm).
	GetRecords() []Record
}

StoreTransaction encapsulates a write operation to a Store, allowing the caller to roll back the operation.

type StoreTransactionMock

// StoreTransactionMock is a mock of the StoreTransaction interface.
type StoreTransactionMock struct {
	// Embedded mock object; its methods are promoted onto StoreTransactionMock.
	// NOTE(review): presumably testify's mock.Mock — confirm against the imports.
	*mock.Mock
}

StoreTransactionMock is a mock

func CreateStoreTransactionMock

func CreateStoreTransactionMock() *StoreTransactionMock

CreateStoreTransactionMock returns a store transaction mock

func (StoreTransactionMock) Commit

func (o StoreTransactionMock) Commit() error

Commit is a mock

func (StoreTransactionMock) GetRecords added in v2.3.0

func (o StoreTransactionMock) GetRecords() []Record

GetRecords is a mock

func (StoreTransactionMock) Rollback

func (o StoreTransactionMock) Rollback() error

Rollback is a mock

Directories

Path Synopsis
notification
sns
serializers
stores

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL