eventstore

package
v0.0.0-...-81f6bf7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 20, 2024 License: Apache-2.0 Imports: 14 Imported by: 46

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidEventType    = InvalidEventTypeError{}
	ErrAggregateNotFound   = errors.New("aggregate not found")
	ErrInvalidCommandType  = errors.New("invalid command type")
	ErrInvalidRequestType  = errors.New("invalid request type")
	ErrInvalidAggregate    = errors.New("invalid aggregate")
	ErrInvalidAggregateID  = errors.New("invalid aggregate id")
	ErrInvalidEventVersion = errors.New("invalid event version")
)

Functions

func AllowCheckForNoChanges

func AllowCheckForNoChanges(appSource, loggedInUserId string) bool

func EnrichEventWithMetadata

func EnrichEventWithMetadata(event *Event, span *opentracing.Span, tenant, userId string)

Deprecated, use EnrichEventWithMetadataExtended instead

func EnrichEventWithMetadataExtended

func EnrichEventWithMetadataExtended(event *Event, span opentracing.Span, mtd EventMetadata)

func GetAggregateObjectID

func GetAggregateObjectID(aggregateID, tenant string, aggregateType AggregateType) string

func GetAggregateWithIdObjectID

func GetAggregateWithIdObjectID(aggregateID string, aggregateType AggregateType) string

func GetAggregateWithTenantAndIdObjectID

func GetAggregateWithTenantAndIdObjectID(aggregateID string, aggregateType AggregateType, tenant string) string

func GetTempAggregateWithTenantAndIdObjectID

func GetTempAggregateWithTenantAndIdObjectID(aggregateID string, aggregateType AggregateType, tenant string) string

func GetTenantFromAggregate

func GetTenantFromAggregate(aggregateID string, aggregateType AggregateType) string

func IsAggregateNotFound

func IsAggregateNotFound(aggregate Aggregate) bool

func IsEventStoreErrorCodeResourceAlreadyExists

func IsEventStoreErrorCodeResourceAlreadyExists(err error) bool

func IsEventStoreErrorCodeResourceNotFound

func IsEventStoreErrorCodeResourceNotFound(err error) bool

func IsEventStoreErrorCodeWrongExpectedVersion

func IsEventStoreErrorCodeWrongExpectedVersion(err error) bool

func LoadAggregate

func LoadAggregate(ctx context.Context, eventStore AggregateStore, agg Aggregate, options LoadAggregateOptions) error

Types

type Aggregate

type Aggregate interface {
	When
	AggregateRoot
	HandleGRPCRequest(ctx context.Context, request any, params map[string]any) (any, error)
}

type AggregateBase

type AggregateBase struct {
	ID                string
	Tenant            string
	Version           int64
	AppliedEvents     []Event
	UncommittedEvents []Event
	Type              AggregateType
	// contains filtered or unexported fields
}

AggregateBase base aggregate contains all main necessary fields

func NewAggregateBase

func NewAggregateBase(when when) *AggregateBase

func (*AggregateBase) Apply

func (a *AggregateBase) Apply(event Event) error

Apply push event to aggregate uncommitted events using When method

func (*AggregateBase) ApplyAll

func (a *AggregateBase) ApplyAll(events []Event) error

func (*AggregateBase) ClearUncommittedEvents

func (a *AggregateBase) ClearUncommittedEvents()

ClearUncommittedEvents clear AggregateBase uncommitted Event's

func (*AggregateBase) GetAppliedEvents

func (a *AggregateBase) GetAppliedEvents() []Event

GetAppliedEvents get AggregateBase applied Event's

func (*AggregateBase) GetID

func (a *AggregateBase) GetID() string

GetID get AggregateBase ID

func (*AggregateBase) GetStreamMetadata

func (a *AggregateBase) GetStreamMetadata() *esdb.StreamMetadata

func (*AggregateBase) GetTenant

func (a *AggregateBase) GetTenant() string

GetTenant get AggregateBase Tenant

func (*AggregateBase) GetType

func (a *AggregateBase) GetType() AggregateType

GetType get AggregateBase AggregateType

func (*AggregateBase) GetUncommittedEvents

func (a *AggregateBase) GetUncommittedEvents() []Event

GetUncommittedEvents get AggregateBase uncommitted Event's

func (*AggregateBase) GetVersion

func (a *AggregateBase) GetVersion() int64

GetVersion get AggregateBase version

func (*AggregateBase) IsTemporal

func (a *AggregateBase) IsTemporal() bool

func (*AggregateBase) IsWithAppliedEvents

func (a *AggregateBase) IsWithAppliedEvents() bool

func (*AggregateBase) Load

func (a *AggregateBase) Load(events []Event) error

Load add existing events from event store to aggregate using When interface method

func (*AggregateBase) PrepareStreamMetadata

func (a *AggregateBase) PrepareStreamMetadata() esdb.StreamMetadata

func (*AggregateBase) RaiseEvent

func (a *AggregateBase) RaiseEvent(event Event) error

RaiseEvent push event to aggregate applied events using When method, used for load directly from eventstore

func (*AggregateBase) SetAppliedEvents

func (a *AggregateBase) SetAppliedEvents(events []Event)

SetAppliedEvents set AggregateBase applied Event's

func (*AggregateBase) SetID

func (a *AggregateBase) SetID(suffix string) *AggregateBase

SetID set AggregateBase ID

func (*AggregateBase) SetStreamMetadata

func (a *AggregateBase) SetStreamMetadata(streamMetadata *esdb.StreamMetadata)

func (*AggregateBase) SetTemporal

func (a *AggregateBase) SetTemporal(temporal bool)

func (*AggregateBase) SetType

func (a *AggregateBase) SetType(aggregateType AggregateType)

SetType set AggregateBase AggregateType

func (*AggregateBase) SetVersion

func (a *AggregateBase) SetVersion(version int64)

SetVersion set AggregateBase version

func (*AggregateBase) String

func (a *AggregateBase) String() string

func (*AggregateBase) ToSnapshot

func (a *AggregateBase) ToSnapshot()

func (*AggregateBase) WithAppliedEvents

func (a *AggregateBase) WithAppliedEvents()

type AggregateRoot

type AggregateRoot interface {
	GetUncommittedEvents() []Event
	GetTenant() string
	GetID() string
	SetID(id string) *AggregateBase
	GetVersion() int64
	SetVersion(version int64)
	ClearUncommittedEvents()
	ToSnapshot()
	SetType(aggregateType AggregateType)
	GetType() AggregateType
	SetAppliedEvents(events []Event)
	GetAppliedEvents() []Event
	RaiseEvent(event Event) error
	String() string
	IsWithAppliedEvents() bool
	IsTemporal() bool
	SetStreamMetadata(streamMetadata *esdb.StreamMetadata)
	GetStreamMetadata() *esdb.StreamMetadata
	PrepareStreamMetadata() esdb.StreamMetadata
	Load
	Apply
}

AggregateRoot contains all methods of AggregateBase

type AggregateStore

type AggregateStore interface {
	// Load loads the most recent version of an aggregate to provided  into params aggregate with a type and id.
	Load(ctx context.Context, aggregate Aggregate) error

	// LoadVersion loads the most recent version of an aggregate to provided  into params aggregate with a type and id.
	LoadVersion(ctx context.Context, aggregate Aggregate) error

	// Save saves the uncommitted events for an aggregate.
	Save(ctx context.Context, aggregate Aggregate) error

	// Exists check aggregate exists by id.
	Exists(ctx context.Context, streamID string) error

	// UpdateStreamMetadata updates the stream metadata for the aggregate.
	UpdateStreamMetadata(ctx context.Context, streamID string, streamMetadata esdb.StreamMetadata) error
}

AggregateStore is responsible for loading and saving aggregates.

type AggregateType

type AggregateType string

AggregateType type of the Aggregate

type Apply

type Apply interface {
	Apply(event Event) error
	ApplyAll(events []Event) error
}

Apply process Aggregate Event

type BaseCommand

type BaseCommand struct {
	ObjectID       string `json:"objectID" validate:"required"`
	Tenant         string `json:"tenant" validate:"required"`
	LoggedInUserId string `json:"loggedInUserId"`
	AppSource      string `json:"appSource"`
}

func NewBaseCommand

func NewBaseCommand(objectID, tenant, loggedInUserId string) BaseCommand

func (BaseCommand) GetAppSource

func (c BaseCommand) GetAppSource() string

func (BaseCommand) GetLoggedInUserId

func (c BaseCommand) GetLoggedInUserId() string

func (BaseCommand) GetObjectID

func (c BaseCommand) GetObjectID() string

func (BaseCommand) GetTenant

func (c BaseCommand) GetTenant() string

func (BaseCommand) WithAppSource

func (c BaseCommand) WithAppSource(appSource string) BaseCommand

type Command

type Command interface {
	GetObjectID() string
	GetTenant() string
}

type CommonIdAggregate

type CommonIdAggregate struct {
	*AggregateBase
	// contains filtered or unexported fields
}

func LoadCommonAggregateWithId

func LoadCommonAggregateWithId(ctx context.Context, eventStore AggregateStore, aggregateType AggregateType, objectID string) (*CommonIdAggregate, error)

func NewCommonAggregateWithId

func NewCommonAggregateWithId(aggregateType AggregateType, id string) *CommonIdAggregate

func (*CommonIdAggregate) HandleGRPCRequest

func (a *CommonIdAggregate) HandleGRPCRequest(ctx context.Context, request any, params map[string]any) (any, error)

func (CommonIdAggregate) NotFound

func (a CommonIdAggregate) NotFound() bool

func (*CommonIdAggregate) SetWhen

func (a *CommonIdAggregate) SetWhen(when func(event Event) error)

func (*CommonIdAggregate) When

func (a *CommonIdAggregate) When(event Event) error

type CommonTenantIdAggregate

type CommonTenantIdAggregate struct {
	*AggregateBase
	// contains filtered or unexported fields
}

func NewCommonAggregate

func NewCommonAggregate(aggregateType AggregateType) *CommonTenantIdAggregate

func NewCommonAggregateWithTenantAndId

func NewCommonAggregateWithTenantAndId(aggregateType AggregateType, tenant, id string) *CommonTenantIdAggregate

func (*CommonTenantIdAggregate) HandleGRPCRequest

func (a *CommonTenantIdAggregate) HandleGRPCRequest(ctx context.Context, request any, params map[string]any) (any, error)

func (CommonTenantIdAggregate) NotFound

func (a CommonTenantIdAggregate) NotFound() bool

func (*CommonTenantIdAggregate) SetWhen

func (a *CommonTenantIdAggregate) SetWhen(when func(event Event) error)

func (*CommonTenantIdAggregate) When

func (a *CommonTenantIdAggregate) When(event Event) error

type CommonTenantIdTempAggregate

type CommonTenantIdTempAggregate struct {
	*AggregateBase
	// contains filtered or unexported fields
}

func LoadCommonTempAggregateWithTenantAndId

func LoadCommonTempAggregateWithTenantAndId(ctx context.Context, eventStore AggregateStore, aggregateType AggregateType, tenant, objectID string) (*CommonTenantIdTempAggregate, error)

func NewCommonTempAggregate

func NewCommonTempAggregate(aggregateType AggregateType) *CommonTenantIdTempAggregate

func NewCommonTempAggregateWithTenantAndId

func NewCommonTempAggregateWithTenantAndId(aggregateType AggregateType, tenant, id string) *CommonTenantIdTempAggregate

func (*CommonTenantIdTempAggregate) HandleGRPCRequest

func (a *CommonTenantIdTempAggregate) HandleGRPCRequest(ctx context.Context, request any, params map[string]any) (any, error)

func (*CommonTenantIdTempAggregate) IsTemporal

func (a *CommonTenantIdTempAggregate) IsTemporal() bool

func (CommonTenantIdTempAggregate) NotFound

func (a CommonTenantIdTempAggregate) NotFound() bool

func (*CommonTenantIdTempAggregate) PrepareStreamMetadata

func (a *CommonTenantIdTempAggregate) PrepareStreamMetadata() esdb.StreamMetadata

func (*CommonTenantIdTempAggregate) SetWhen

func (a *CommonTenantIdTempAggregate) SetWhen(when func(event Event) error)

func (*CommonTenantIdTempAggregate) When

func (a *CommonTenantIdTempAggregate) When(event Event) error

type Event

type Event struct {
	EventID       string
	EventType     string
	Data          []byte
	Timestamp     time.Time
	AggregateType AggregateType
	AggregateID   string
	Version       int64
	Metadata      []byte
}

Event is an internal representation of an event, returned when the Aggregate uses NewEvent to create a new event. The events loaded from the db is represented by each DBs internal event type, implementing Event.

func NewBaseEvent

func NewBaseEvent(aggregate Aggregate, eventType string) Event

NewBaseEvent new base Event constructor with configured EventID, Aggregate properties and Timestamp.

func NewEventFromRecorded

func NewEventFromRecorded(event *esdb.RecordedEvent) Event

func ToAggregateEvent

func ToAggregateEvent(aggregate Aggregate, eventData baseEvent.BaseEvent) (Event, error)

TODO

func (*Event) GetAggregateID

func (e *Event) GetAggregateID() string

GetAggregateID is the ID of the Aggregate that the Event belongs to

func (*Event) GetAggregateType

func (e *Event) GetAggregateType() AggregateType

GetAggregateType is the AggregateType that the Event can be applied to.

func (*Event) GetData

func (e *Event) GetData() []byte

GetData The data attached to the Event serialized to bytes.

func (*Event) GetEventID

func (e *Event) GetEventID() string

GetEventID get EventID of the Event.

func (*Event) GetEventType

func (e *Event) GetEventType() string

GetEventType returns the EventType of the event.

func (*Event) GetJsonData

func (e *Event) GetJsonData(data interface{}) error

GetJsonData json unmarshal data attached to the Event.

func (*Event) GetJsonMetadata

func (e *Event) GetJsonMetadata(metaData interface{}) error

GetJsonMetadata unmarshal app-specific metadata serialized as json for the Event.

func (*Event) GetMetadata

func (e *Event) GetMetadata() []byte

GetMetadata is app-specific metadata such as request ID, originating user etc.

func (*Event) GetString

func (e *Event) GetString() string

GetString A string representation of the Event.

func (*Event) GetTimeStamp

func (e *Event) GetTimeStamp() time.Time

GetTimeStamp get timestamp of the Event.

func (*Event) GetVersion

func (e *Event) GetVersion() int64

GetVersion is the version of the Aggregate after the Event has been applied.

func (Event) MarshalJSON

func (e Event) MarshalJSON() ([]byte, error)

func (*Event) SetAggregateType

func (e *Event) SetAggregateType(aggregateType AggregateType)

SetAggregateType set the AggregateType that the Event can be applied to.

func (*Event) SetData

func (e *Event) SetData(data []byte) *Event

SetData add the data attached to the Event serialized to bytes.

func (*Event) SetJsonData

func (e *Event) SetJsonData(data interface{}) error

SetJsonData serialize to json and set data attached to the Event.

func (*Event) SetMetadata

func (e *Event) SetMetadata(metaData interface{}) error

SetMetadata add app-specific metadata serialized as json for the Event.

func (*Event) SetVersion

func (e *Event) SetVersion(aggregateVersion int64)

SetVersion set the version of the Aggregate.

func (*Event) String

func (e *Event) String() string

func (*Event) ToEventData

func (e *Event) ToEventData() esdb.EventData

type EventMetadata

type EventMetadata struct {
	Tenant string `json:"tenant"`
	UserId string `json:"user-id"`
	App    string `json:"app"`
}

type EventStore

type EventStore interface {
	// SaveEvents appends all events in the event stream to the store.
	SaveEvents(ctx context.Context, streamID string, events []Event) error

	// LoadEvents loads all events for the aggregate id from the store.
	LoadEvents(ctx context.Context, streamID string) ([]Event, error)
}

EventStore is an interface for an event sourcing event store.

type EventType

type EventType string

EventType is the type of any event, used as its unique identifier.

type InvalidEventTypeError

type InvalidEventTypeError struct {
	EventType string
}

func (InvalidEventTypeError) Error

func (e InvalidEventTypeError) Error() string

type Load

type Load interface {
	Load(events []Event) error
}

Load create Aggregate state from Event's.

type LoadAggregateOptions

type LoadAggregateOptions struct {
	Required       bool
	SkipLoadEvents bool
}

func NewLoadAggregateOptions

func NewLoadAggregateOptions() *LoadAggregateOptions

func NewLoadAggregateOptionsWithRequired

func NewLoadAggregateOptionsWithRequired() *LoadAggregateOptions

func (*LoadAggregateOptions) WithSkipLoadEvents

func (o *LoadAggregateOptions) WithSkipLoadEvents() *LoadAggregateOptions

type Projection

type Projection interface {
	When(ctx context.Context, evt Event) error
}

Projection When method works and process Event's like Aggregate's for interacting with read database.

type RecordedBaseEvent

type RecordedBaseEvent struct {
	// Event's id.
	EventID string
	// Event's type.
	EventType string
	// Event's content type.
	ContentType string
	// The stream that event belongs to.
	StreamID string
	// The event's revision number.
	EventNumber uint64
	// The event's transaction log position.
	Position esdb.Position
	// When the event was created.
	CreatedDate time.Time
}

func NewRecordedBaseEventFromRecorded

func NewRecordedBaseEventFromRecorded(recorded *esdb.RecordedEvent) RecordedBaseEvent

type When

type When interface {
	When(event Event) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL