cqrs

package module
v0.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2020 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrUnregisteredEventType ...
	ErrUnregisteredEventType = errors.New("unregistered event type")
)

Functions

func ApplyEvent added in v0.9.0

func ApplyEvent(aggregate Aggregate, track bool, event Event) error

ApplyEvent ...

func ApplyEvents added in v0.6.4

func ApplyEvents(aggregate Aggregate, track bool, events ...Event) error

ApplyEvents ...

func ApplyHistory added in v0.6.4

func ApplyHistory(aggregate Aggregate, events ...Event) error

ApplyHistory ...

Types

type Aggregate

type Aggregate interface {
	AggregateID() uuid.UUID
	AggregateType() AggregateType
	OriginalVersion() int
	CurrentVersion() int
	Changes() []Event
	TrackChange(...Event)
	ApplyEvent(Event) error
}

Aggregate is the aggregate of an event stream.

type AggregateConfig

type AggregateConfig interface {
	Register(AggregateType, AggregateFactory)
	New(AggregateType, uuid.UUID) (Aggregate, error)
	Factories() map[AggregateType]AggregateFactory
}

AggregateConfig ...

func NewAggregateConfig

func NewAggregateConfig() AggregateConfig

NewAggregateConfig ...

type AggregateFactory

type AggregateFactory func(uuid.UUID) Aggregate

AggregateFactory ...

type AggregateRepository

type AggregateRepository interface {
	Save(ctx context.Context, aggregate Aggregate) error
	Fetch(ctx context.Context, typ AggregateType, id uuid.UUID, version int) (Aggregate, error)
	FetchLatest(ctx context.Context, typ AggregateType, id uuid.UUID) (Aggregate, error)
	Remove(ctx context.Context, aggregate Aggregate) error
}

AggregateRepository ...

func NewAggregateRepository

func NewAggregateRepository(
	eventStore EventStore,
	aggregateCfg AggregateConfig,
	snapshots SnapshotRepository,
	options ...AggregateRepositoryOption,
) AggregateRepository

NewAggregateRepository ... snapshots is optional.

type AggregateRepositoryOption added in v0.4.4

type AggregateRepositoryOption func(*aggregateRepository)

AggregateRepositoryOption ...

func SnapshotEveryNth added in v0.4.4

func SnapshotEveryNth(n int) AggregateRepositoryOption

SnapshotEveryNth ...

type AggregateType

type AggregateType string

AggregateType is the type of an aggregate.

type BaseAggregate

type BaseAggregate struct {
	ID      uuid.UUID
	Type    AggregateType
	Version int
	// contains filtered or unexported fields
}

BaseAggregate is the base implementation for an aggregate.

func NewBaseAggregate

func NewBaseAggregate(typ AggregateType, id uuid.UUID) *BaseAggregate

NewBaseAggregate returns a new BaseAggregate.

func (*BaseAggregate) AggregateID

func (a *BaseAggregate) AggregateID() uuid.UUID

AggregateID returns the aggregate ID.

func (*BaseAggregate) AggregateType

func (a *BaseAggregate) AggregateType() AggregateType

AggregateType returns the type name of the aggregate.

func (*BaseAggregate) Changes

func (a *BaseAggregate) Changes() []Event

Changes returns the applied events.

func (*BaseAggregate) CurrentVersion

func (a *BaseAggregate) CurrentVersion() int

CurrentVersion returns the version of the aggregate after applying the changes.

func (*BaseAggregate) FlushChanges

func (a *BaseAggregate) FlushChanges()

FlushChanges ...

func (*BaseAggregate) NewEvent added in v0.4.3

func (a *BaseAggregate) NewEvent(typ EventType, data EventData) Event

NewEvent ...

func (*BaseAggregate) NewEventWithTime added in v0.4.3

func (a *BaseAggregate) NewEventWithTime(typ EventType, data EventData, time time.Time) Event

NewEventWithTime ...

func (*BaseAggregate) OriginalVersion

func (a *BaseAggregate) OriginalVersion() int

OriginalVersion returns the original version of the aggregate.

func (*BaseAggregate) TrackChange

func (a *BaseAggregate) TrackChange(events ...Event)

TrackChange adds applied events to the aggregate.

type BaseCommand

type BaseCommand struct {
	// contains filtered or unexported fields
}

BaseCommand is the base implementation of a command.

func NewBaseCommand

func NewBaseCommand(typ CommandType, aggregateType AggregateType, aggregateID uuid.UUID) BaseCommand

NewBaseCommand returns a new BaseCommand.

func (BaseCommand) AggregateID

func (cmd BaseCommand) AggregateID() uuid.UUID

AggregateID ...

func (BaseCommand) AggregateType

func (cmd BaseCommand) AggregateType() AggregateType

AggregateType ...

func (BaseCommand) CommandType

func (cmd BaseCommand) CommandType() CommandType

CommandType ...

type Command

type Command interface {
	CommandType() CommandType
	AggregateType() AggregateType
	AggregateID() uuid.UUID
}

Command is a command.

type CommandBus

type CommandBus interface {
	Dispatch(context.Context, Command) error
}

CommandBus is the command bus.

func NewCommandBus

func NewCommandBus(logger *log.Logger) CommandBus

NewCommandBus returns a new CommandBus.

func NewCommandBusWithConfig

func NewCommandBusWithConfig(config CommandConfig, logger *log.Logger) CommandBus

NewCommandBusWithConfig ...

type CommandConfig

type CommandConfig interface {
	Register(CommandType, CommandHandler)
	Handler(CommandType) (CommandHandler, error)
	Handlers() map[CommandType]CommandHandler
}

CommandConfig ...

func NewCommandConfig

func NewCommandConfig() CommandConfig

NewCommandConfig returns a new command configuration.

type CommandHandler

type CommandHandler interface {
	HandleCommand(context.Context, Command) error
}

CommandHandler handles commands.

type CommandType

type CommandType string

CommandType is a command type.

type Container added in v0.5.0

type Container interface {
	AggregateConfig() AggregateConfig
	EventConfig() EventConfig
	CommandConfig() CommandConfig
	EventBus() EventBus
	EventStore() EventStore
	CommandBus() CommandBus
	Snapshots() SnapshotRepository
	Aggregates() AggregateRepository
}

Container ...

func New

func New(
	logger *log.Logger,
	aggregateConfig AggregateConfig,
	eventConfig EventConfig,
	commandConfig CommandConfig,
	eventBus EventBus,
	eventStore EventStore,
	commandBus CommandBus,
	snapshots SnapshotRepository,
	aggregates AggregateRepository,
) Container

New ...

type Event

type Event interface {
	Type() EventType
	Data() EventData
	Time() time.Time

	AggregateType() AggregateType
	AggregateID() uuid.UUID
	Version() int
}

Event is an event.

func NewAggregateEvent

func NewAggregateEvent(typ EventType, data EventData, aggregateType AggregateType, aggregateID uuid.UUID, version int) Event

NewAggregateEvent creates a new aggregate event with time set to time.Now().

func NewAggregateEventWithTime added in v0.3.0

func NewAggregateEventWithTime(typ EventType, data EventData, time time.Time, aggregateType AggregateType, aggregateID uuid.UUID, version int) Event

NewAggregateEventWithTime creates a new aggregate event.

func NewEvent

func NewEvent(typ EventType, data EventData) Event

NewEvent creates a new event with time set to time.Now().

func NewEventWithTime added in v0.3.0

func NewEventWithTime(typ EventType, data EventData, time time.Time) Event

NewEventWithTime creates a new event.

type EventBus

type EventBus interface {
	EventPublisher
	EventSubscriber
}

EventBus is the event bus.

type EventConfig

type EventConfig interface {
	Register(EventType, EventData)
	// NewData creates an EventData instance for EventType.
	// The returned object is a struct.
	NewData(EventType) (EventData, error)
	Factories() map[EventType]EventDataFactory
}

EventConfig is the configuration for the events.

func NewEventConfig

func NewEventConfig() EventConfig

NewEventConfig returns a new event config.

type EventData

type EventData interface{}

EventData is the payload of an event.

type EventDataFactory

type EventDataFactory func() EventData

EventDataFactory ...

type EventPublisher

type EventPublisher interface {
	Publish(ctx context.Context, events ...Event) error
}

EventPublisher publishes events.

type EventStore

type EventStore interface {
	Save(ctx context.Context, originalVersion int, events ...Event) error
	Find(ctx context.Context, aggregateType AggregateType, aggregateID uuid.UUID, version int) (Event, error)
	Fetch(ctx context.Context, aggregateType AggregateType, aggregateID uuid.UUID, from int, to int) ([]Event, error)
	FetchAll(ctx context.Context, aggregateType AggregateType, aggregateID uuid.UUID) ([]Event, error)
	FetchFrom(ctx context.Context, aggregateType AggregateType, aggregateID uuid.UUID, from int) ([]Event, error)
	FetchTo(ctx context.Context, aggregateType AggregateType, aggregateID uuid.UUID, to int) ([]Event, error)
	RemoveAll(ctx context.Context, aggregateType AggregateType, aggregateID uuid.UUID) error
}

EventStore stores events in a database.

type EventStoreError

type EventStoreError struct {
	Err       error
	StoreName string
}

EventStoreError ...

func (EventStoreError) Error

func (err EventStoreError) Error() string

func (EventStoreError) Unwrap

func (err EventStoreError) Unwrap() error

Unwrap ...

type EventSubscriber

type EventSubscriber interface {
	Subscribe(ctx context.Context, types ...EventType) (<-chan Event, error)
}

EventSubscriber subscribes to events.

type EventType

type EventType string

EventType is the type of an event.

func (EventType) String

func (t EventType) String() string

type OptimisticConcurrencyError

type OptimisticConcurrencyError struct {
	LatestVersion   int
	ProvidedVersion int
}

OptimisticConcurrencyError ...

func (OptimisticConcurrencyError) Error

func (err OptimisticConcurrencyError) Error() string

type SnapshotConfig

type SnapshotConfig interface {
	IsDue(Aggregate) bool
}

SnapshotConfig ...

func NewSnapshotConfig

func NewSnapshotConfig(options ...SnapshotOption) SnapshotConfig

NewSnapshotConfig ...

type SnapshotError

type SnapshotError struct {
	Err       error
	StoreName string
}

SnapshotError ...

func (SnapshotError) Error

func (err SnapshotError) Error() string

func (SnapshotError) Unwrap

func (err SnapshotError) Unwrap() error

Unwrap ...

type SnapshotOption added in v0.2.0

type SnapshotOption func(*snapshotConfig)

SnapshotOption ...

func SnapshotInterval added in v0.2.0

func SnapshotInterval(typ AggregateType, every int) SnapshotOption

SnapshotInterval ...

type SnapshotRepository

type SnapshotRepository interface {
	Save(ctx context.Context, snap Aggregate) error
	Find(ctx context.Context, typ AggregateType, id uuid.UUID, version int) (Aggregate, error)
	Latest(ctx context.Context, typ AggregateType, id uuid.UUID) (Aggregate, error)
	MaxVersion(ctx context.Context, typ AggregateType, id uuid.UUID, maxVersion int) (Aggregate, error)
}

SnapshotRepository ...

type UnregisteredAggregateError

type UnregisteredAggregateError struct {
	AggregateType AggregateType
}

UnregisteredAggregateError is raised when an event type is not registered.

func (UnregisteredAggregateError) Error

func (err UnregisteredAggregateError) Error() string

type UnregisteredCommandError

type UnregisteredCommandError struct {
	CommandType CommandType
}

UnregisteredCommandError is raised when a command type is not registered.

func (UnregisteredCommandError) Error

func (err UnregisteredCommandError) Error() string

type UnregisteredEventError

type UnregisteredEventError struct {
	EventType EventType
}

UnregisteredEventError is raised when an event type is not registered.

func (UnregisteredEventError) Error

func (err UnregisteredEventError) Error() string

Directories

Path Synopsis
eventbus
eventstore
Package mock_cqrs is a generated GoMock package.
Package mock_cqrs is a generated GoMock package.
setup
Package mock_setup is a generated GoMock package.
Package mock_setup is a generated GoMock package.
snapshot

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL