Documentation ¶
Index ¶
- Variables
- func AllowCheckForNoChanges(appSource, loggedInUserId string) bool
- func EnrichEventWithMetadata(event *Event, span *opentracing.Span, tenant, userId string)
- func EnrichEventWithMetadataExtended(event *Event, span opentracing.Span, mtd EventMetadata)
- func GetAggregateObjectID(aggregateID, tenant string, aggregateType AggregateType) string
- func GetAggregateWithIdObjectID(aggregateID string, aggregateType AggregateType) string
- func GetAggregateWithTenantAndIdObjectID(aggregateID string, aggregateType AggregateType, tenant string) string
- func GetTempAggregateWithTenantAndIdObjectID(aggregateID string, aggregateType AggregateType, tenant string) string
- func GetTenantFromAggregate(aggregateID string, aggregateType AggregateType) string
- func IsAggregateNotFound(aggregate Aggregate) bool
- func IsEventStoreErrorCodeResourceAlreadyExists(err error) bool
- func IsEventStoreErrorCodeResourceNotFound(err error) bool
- func IsEventStoreErrorCodeWrongExpectedVersion(err error) bool
- func LoadAggregate(ctx context.Context, eventStore AggregateStore, agg Aggregate, ...) error
- type Aggregate
- type AggregateBase
- func (a *AggregateBase) Apply(event Event) error
- func (a *AggregateBase) ApplyAll(events []Event) error
- func (a *AggregateBase) ClearUncommittedEvents()
- func (a *AggregateBase) GetAppliedEvents() []Event
- func (a *AggregateBase) GetID() string
- func (a *AggregateBase) GetStreamMetadata() *esdb.StreamMetadata
- func (a *AggregateBase) GetTenant() string
- func (a *AggregateBase) GetType() AggregateType
- func (a *AggregateBase) GetUncommittedEvents() []Event
- func (a *AggregateBase) GetVersion() int64
- func (a *AggregateBase) IsTemporal() bool
- func (a *AggregateBase) IsWithAppliedEvents() bool
- func (a *AggregateBase) Load(events []Event) error
- func (a *AggregateBase) PrepareStreamMetadata() esdb.StreamMetadata
- func (a *AggregateBase) RaiseEvent(event Event) error
- func (a *AggregateBase) SetAppliedEvents(events []Event)
- func (a *AggregateBase) SetID(suffix string) *AggregateBase
- func (a *AggregateBase) SetStreamMetadata(streamMetadata *esdb.StreamMetadata)
- func (a *AggregateBase) SetTemporal(temporal bool)
- func (a *AggregateBase) SetType(aggregateType AggregateType)
- func (a *AggregateBase) SetVersion(version int64)
- func (a *AggregateBase) String() string
- func (a *AggregateBase) ToSnapshot()
- func (a *AggregateBase) WithAppliedEvents()
- type AggregateRoot
- type AggregateStore
- type AggregateType
- type Apply
- type BaseCommand
- type Command
- type CommonIdAggregate
- type CommonTenantIdAggregate
- func (a *CommonTenantIdAggregate) HandleGRPCRequest(ctx context.Context, request any, params map[string]any) (any, error)
- func (a CommonTenantIdAggregate) NotFound() bool
- func (a *CommonTenantIdAggregate) SetWhen(when func(event Event) error)
- func (a *CommonTenantIdAggregate) When(event Event) error
- type CommonTenantIdTempAggregate
- func LoadCommonTempAggregateWithTenantAndId(ctx context.Context, eventStore AggregateStore, aggregateType AggregateType, ...) (*CommonTenantIdTempAggregate, error)
- func NewCommonTempAggregate(aggregateType AggregateType) *CommonTenantIdTempAggregate
- func NewCommonTempAggregateWithTenantAndId(aggregateType AggregateType, tenant, id string) *CommonTenantIdTempAggregate
- func (a *CommonTenantIdTempAggregate) HandleGRPCRequest(ctx context.Context, request any, params map[string]any) (any, error)
- func (a *CommonTenantIdTempAggregate) IsTemporal() bool
- func (a CommonTenantIdTempAggregate) NotFound() bool
- func (a *CommonTenantIdTempAggregate) PrepareStreamMetadata() esdb.StreamMetadata
- func (a *CommonTenantIdTempAggregate) SetWhen(when func(event Event) error)
- func (a *CommonTenantIdTempAggregate) When(event Event) error
- type Event
- func (e *Event) GetAggregateID() string
- func (e *Event) GetAggregateType() AggregateType
- func (e *Event) GetData() []byte
- func (e *Event) GetEventID() string
- func (e *Event) GetEventType() string
- func (e *Event) GetJsonData(data interface{}) error
- func (e *Event) GetJsonMetadata(metaData interface{}) error
- func (e *Event) GetMetadata() []byte
- func (e *Event) GetString() string
- func (e *Event) GetTimeStamp() time.Time
- func (e *Event) GetVersion() int64
- func (e Event) MarshalJSON() ([]byte, error)
- func (e *Event) SetAggregateType(aggregateType AggregateType)
- func (e *Event) SetData(data []byte) *Event
- func (e *Event) SetJsonData(data interface{}) error
- func (e *Event) SetMetadata(metaData interface{}) error
- func (e *Event) SetVersion(aggregateVersion int64)
- func (e *Event) String() string
- func (e *Event) ToEventData() esdb.EventData
- type EventMetadata
- type EventStore
- type EventType
- type InvalidEventTypeError
- type Load
- type LoadAggregateOptions
- type Projection
- type RecordedBaseEvent
- type When
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidEventType = InvalidEventTypeError{} ErrAggregateNotFound = errors.New("aggregate not found") ErrInvalidCommandType = errors.New("invalid command type") ErrInvalidRequestType = errors.New("invalid request type") ErrInvalidAggregate = errors.New("invalid aggregate") ErrInvalidAggregateID = errors.New("invalid aggregate id") ErrInvalidEventVersion = errors.New("invalid event version") )
Functions ¶
func AllowCheckForNoChanges ¶
func EnrichEventWithMetadata ¶
Deprecated: use EnrichEventWithMetadataExtended instead.
func EnrichEventWithMetadataExtended ¶
func EnrichEventWithMetadataExtended(event *Event, span opentracing.Span, mtd EventMetadata)
func GetAggregateObjectID ¶
func GetAggregateObjectID(aggregateID, tenant string, aggregateType AggregateType) string
func GetAggregateWithIdObjectID ¶
func GetAggregateWithIdObjectID(aggregateID string, aggregateType AggregateType) string
func GetAggregateWithTenantAndIdObjectID ¶
func GetAggregateWithTenantAndIdObjectID(aggregateID string, aggregateType AggregateType, tenant string) string
func GetTempAggregateWithTenantAndIdObjectID ¶
func GetTempAggregateWithTenantAndIdObjectID(aggregateID string, aggregateType AggregateType, tenant string) string
func GetTenantFromAggregate ¶
func GetTenantFromAggregate(aggregateID string, aggregateType AggregateType) string
func IsAggregateNotFound ¶
func LoadAggregate ¶
func LoadAggregate(ctx context.Context, eventStore AggregateStore, agg Aggregate, options LoadAggregateOptions) error
Types ¶
type AggregateBase ¶
type AggregateBase struct { ID string Tenant string Version int64 AppliedEvents []Event UncommittedEvents []Event Type AggregateType // contains filtered or unexported fields }
AggregateBase is the base aggregate; it contains all the main necessary fields.
func NewAggregateBase ¶
func NewAggregateBase(when when) *AggregateBase
func (*AggregateBase) Apply ¶
func (a *AggregateBase) Apply(event Event) error
Apply pushes the event onto the aggregate's uncommitted events using the When method.
func (*AggregateBase) ApplyAll ¶
func (a *AggregateBase) ApplyAll(events []Event) error
func (*AggregateBase) ClearUncommittedEvents ¶
func (a *AggregateBase) ClearUncommittedEvents()
ClearUncommittedEvents clears the uncommitted Events of the AggregateBase.
func (*AggregateBase) GetAppliedEvents ¶
func (a *AggregateBase) GetAppliedEvents() []Event
GetAppliedEvents returns the applied Events of the AggregateBase.
func (*AggregateBase) GetStreamMetadata ¶
func (a *AggregateBase) GetStreamMetadata() *esdb.StreamMetadata
func (*AggregateBase) GetTenant ¶
func (a *AggregateBase) GetTenant() string
GetTenant returns the Tenant of the AggregateBase.
func (*AggregateBase) GetType ¶
func (a *AggregateBase) GetType() AggregateType
GetType returns the AggregateType of the AggregateBase.
func (*AggregateBase) GetUncommittedEvents ¶
func (a *AggregateBase) GetUncommittedEvents() []Event
GetUncommittedEvents returns the uncommitted Events of the AggregateBase.
func (*AggregateBase) GetVersion ¶
func (a *AggregateBase) GetVersion() int64
GetVersion returns the version of the AggregateBase.
func (*AggregateBase) IsTemporal ¶
func (a *AggregateBase) IsTemporal() bool
func (*AggregateBase) IsWithAppliedEvents ¶
func (a *AggregateBase) IsWithAppliedEvents() bool
func (*AggregateBase) Load ¶
func (a *AggregateBase) Load(events []Event) error
Load adds existing events from the event store to the aggregate using the When interface method.
func (*AggregateBase) PrepareStreamMetadata ¶
func (a *AggregateBase) PrepareStreamMetadata() esdb.StreamMetadata
func (*AggregateBase) RaiseEvent ¶
func (a *AggregateBase) RaiseEvent(event Event) error
RaiseEvent pushes the event onto the aggregate's applied events using the When method; it is used when loading directly from the event store.
func (*AggregateBase) SetAppliedEvents ¶
func (a *AggregateBase) SetAppliedEvents(events []Event)
SetAppliedEvents sets the applied Events of the AggregateBase.
func (*AggregateBase) SetID ¶
func (a *AggregateBase) SetID(suffix string) *AggregateBase
SetID sets the ID of the AggregateBase.
func (*AggregateBase) SetStreamMetadata ¶
func (a *AggregateBase) SetStreamMetadata(streamMetadata *esdb.StreamMetadata)
func (*AggregateBase) SetTemporal ¶
func (a *AggregateBase) SetTemporal(temporal bool)
func (*AggregateBase) SetType ¶
func (a *AggregateBase) SetType(aggregateType AggregateType)
SetType sets the AggregateType of the AggregateBase.
func (*AggregateBase) SetVersion ¶
func (a *AggregateBase) SetVersion(version int64)
SetVersion sets the version of the AggregateBase.
func (*AggregateBase) String ¶
func (a *AggregateBase) String() string
func (*AggregateBase) ToSnapshot ¶
func (a *AggregateBase) ToSnapshot()
func (*AggregateBase) WithAppliedEvents ¶
func (a *AggregateBase) WithAppliedEvents()
type AggregateRoot ¶
type AggregateRoot interface { GetUncommittedEvents() []Event GetTenant() string GetID() string SetID(id string) *AggregateBase GetVersion() int64 SetVersion(version int64) ClearUncommittedEvents() ToSnapshot() SetType(aggregateType AggregateType) GetType() AggregateType SetAppliedEvents(events []Event) GetAppliedEvents() []Event RaiseEvent(event Event) error String() string IsWithAppliedEvents() bool IsTemporal() bool SetStreamMetadata(streamMetadata *esdb.StreamMetadata) GetStreamMetadata() *esdb.StreamMetadata PrepareStreamMetadata() esdb.StreamMetadata Load Apply }
AggregateRoot contains all methods of AggregateBase
type AggregateStore ¶
type AggregateStore interface { // Load loads the most recent version of an aggregate to provided into params aggregate with a type and id. Load(ctx context.Context, aggregate Aggregate) error // LoadVersion loads the most recent version of an aggregate to provided into params aggregate with a type and id. LoadVersion(ctx context.Context, aggregate Aggregate) error // Save saves the uncommitted events for an aggregate. Save(ctx context.Context, aggregate Aggregate) error // Exists check aggregate exists by id. Exists(ctx context.Context, streamID string) error // UpdateStreamMetadata updates the stream metadata for the aggregate. UpdateStreamMetadata(ctx context.Context, streamID string, streamMetadata esdb.StreamMetadata) error }
AggregateStore is responsible for loading and saving aggregates.
type BaseCommand ¶
type BaseCommand struct { ObjectID string `json:"objectID" validate:"required"` Tenant string `json:"tenant" validate:"required"` LoggedInUserId string `json:"loggedInUserId"` AppSource string `json:"appSource"` }
func NewBaseCommand ¶
func NewBaseCommand(objectID, tenant, loggedInUserId string) BaseCommand
func (BaseCommand) GetAppSource ¶
func (c BaseCommand) GetAppSource() string
func (BaseCommand) GetLoggedInUserId ¶
func (c BaseCommand) GetLoggedInUserId() string
func (BaseCommand) GetObjectID ¶
func (c BaseCommand) GetObjectID() string
func (BaseCommand) GetTenant ¶
func (c BaseCommand) GetTenant() string
func (BaseCommand) WithAppSource ¶
func (c BaseCommand) WithAppSource(appSource string) BaseCommand
type CommonIdAggregate ¶
type CommonIdAggregate struct { *AggregateBase // contains filtered or unexported fields }
func LoadCommonAggregateWithId ¶
func LoadCommonAggregateWithId(ctx context.Context, eventStore AggregateStore, aggregateType AggregateType, objectID string) (*CommonIdAggregate, error)
func NewCommonAggregateWithId ¶
func NewCommonAggregateWithId(aggregateType AggregateType, id string) *CommonIdAggregate
func (*CommonIdAggregate) HandleGRPCRequest ¶
func (CommonIdAggregate) NotFound ¶
func (a CommonIdAggregate) NotFound() bool
func (*CommonIdAggregate) SetWhen ¶
func (a *CommonIdAggregate) SetWhen(when func(event Event) error)
func (*CommonIdAggregate) When ¶
func (a *CommonIdAggregate) When(event Event) error
type CommonTenantIdAggregate ¶
type CommonTenantIdAggregate struct { *AggregateBase // contains filtered or unexported fields }
func NewCommonAggregate ¶
func NewCommonAggregate(aggregateType AggregateType) *CommonTenantIdAggregate
func NewCommonAggregateWithTenantAndId ¶
func NewCommonAggregateWithTenantAndId(aggregateType AggregateType, tenant, id string) *CommonTenantIdAggregate
func (*CommonTenantIdAggregate) HandleGRPCRequest ¶
func (CommonTenantIdAggregate) NotFound ¶
func (a CommonTenantIdAggregate) NotFound() bool
func (*CommonTenantIdAggregate) SetWhen ¶
func (a *CommonTenantIdAggregate) SetWhen(when func(event Event) error)
func (*CommonTenantIdAggregate) When ¶
func (a *CommonTenantIdAggregate) When(event Event) error
type CommonTenantIdTempAggregate ¶
type CommonTenantIdTempAggregate struct { *AggregateBase // contains filtered or unexported fields }
func LoadCommonTempAggregateWithTenantAndId ¶
func LoadCommonTempAggregateWithTenantAndId(ctx context.Context, eventStore AggregateStore, aggregateType AggregateType, tenant, objectID string) (*CommonTenantIdTempAggregate, error)
func NewCommonTempAggregate ¶
func NewCommonTempAggregate(aggregateType AggregateType) *CommonTenantIdTempAggregate
func NewCommonTempAggregateWithTenantAndId ¶
func NewCommonTempAggregateWithTenantAndId(aggregateType AggregateType, tenant, id string) *CommonTenantIdTempAggregate
func (*CommonTenantIdTempAggregate) HandleGRPCRequest ¶
func (*CommonTenantIdTempAggregate) IsTemporal ¶
func (a *CommonTenantIdTempAggregate) IsTemporal() bool
func (CommonTenantIdTempAggregate) NotFound ¶
func (a CommonTenantIdTempAggregate) NotFound() bool
func (*CommonTenantIdTempAggregate) PrepareStreamMetadata ¶
func (a *CommonTenantIdTempAggregate) PrepareStreamMetadata() esdb.StreamMetadata
func (*CommonTenantIdTempAggregate) SetWhen ¶
func (a *CommonTenantIdTempAggregate) SetWhen(when func(event Event) error)
func (*CommonTenantIdTempAggregate) When ¶
func (a *CommonTenantIdTempAggregate) When(event Event) error
type Event ¶
type Event struct { EventID string EventType string Data []byte Timestamp time.Time AggregateType AggregateType AggregateID string Version int64 Metadata []byte }
Event is an internal representation of an event, returned when the Aggregate uses NewEvent to create a new event. The events loaded from the db are represented by each DB's internal event type, implementing Event.
func NewBaseEvent ¶
NewBaseEvent is the base Event constructor; it returns an Event with configured EventID, Aggregate properties and Timestamp.
func NewEventFromRecorded ¶
func NewEventFromRecorded(event *esdb.RecordedEvent) Event
func ToAggregateEvent ¶
TODO
func (*Event) GetAggregateID ¶
GetAggregateID is the ID of the Aggregate that the Event belongs to
func (*Event) GetAggregateType ¶
func (e *Event) GetAggregateType() AggregateType
GetAggregateType is the AggregateType that the Event can be applied to.
func (*Event) GetEventType ¶
GetEventType returns the EventType of the event.
func (*Event) GetJsonData ¶
GetJsonData unmarshals the JSON data attached to the Event.
func (*Event) GetJsonMetadata ¶
GetJsonMetadata unmarshals the app-specific metadata serialized as JSON for the Event.
func (*Event) GetMetadata ¶
GetMetadata is app-specific metadata such as request ID, originating user etc.
func (*Event) GetTimeStamp ¶
GetTimeStamp returns the timestamp of the Event.
func (*Event) GetVersion ¶
GetVersion is the version of the Aggregate after the Event has been applied.
func (Event) MarshalJSON ¶
func (*Event) SetAggregateType ¶
func (e *Event) SetAggregateType(aggregateType AggregateType)
SetAggregateType sets the AggregateType that the Event can be applied to.
func (*Event) SetJsonData ¶
SetJsonData serializes the given value to JSON and sets it as the data attached to the Event.
func (*Event) SetMetadata ¶
SetMetadata adds app-specific metadata, serialized as JSON, to the Event.
func (*Event) SetVersion ¶
SetVersion sets the version of the Aggregate.
func (*Event) ToEventData ¶
type EventMetadata ¶
type EventStore ¶
type EventStore interface { // SaveEvents appends all events in the event stream to the store. SaveEvents(ctx context.Context, streamID string, events []Event) error // LoadEvents loads all events for the aggregate id from the store. LoadEvents(ctx context.Context, streamID string) ([]Event, error) }
EventStore is an interface for an event sourcing event store.
type EventType ¶
type EventType string
EventType is the type of any event, used as its unique identifier.
type InvalidEventTypeError ¶
type InvalidEventTypeError struct {
EventType string
}
func (InvalidEventTypeError) Error ¶
func (e InvalidEventTypeError) Error() string
type LoadAggregateOptions ¶
func NewLoadAggregateOptions ¶
func NewLoadAggregateOptions() *LoadAggregateOptions
func NewLoadAggregateOptionsWithRequired ¶
func NewLoadAggregateOptionsWithRequired() *LoadAggregateOptions
func (*LoadAggregateOptions) WithSkipLoadEvents ¶
func (o *LoadAggregateOptions) WithSkipLoadEvents() *LoadAggregateOptions
type Projection ¶
Projection's When method processes Events the same way an Aggregate's does, for interacting with the read database.
type RecordedBaseEvent ¶
type RecordedBaseEvent struct { // Event's id. EventID string // Event's type. EventType string // Event's content type. ContentType string // The stream that event belongs to. StreamID string // The event's revision number. EventNumber uint64 // The event's transaction log position. Position esdb.Position // When the event was created. CreatedDate time.Time }
func NewRecordedBaseEventFromRecorded ¶
func NewRecordedBaseEventFromRecorded(recorded *esdb.RecordedEvent) RecordedBaseEvent