eventstore

package
v0.0.0-...-293a095 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 17, 2024 License: MIT Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RegisterEventMapper

func RegisterEventMapper(aggregateType AggregateType, eventType EventType, eventMapper func(Event) (Event, error))

Types

type Aggregate

// Aggregate carries the tenant, type, version, id, resource owner and
// sequence of an aggregate. It is referenced by EventBase.Aggregate.
//
// Every field is tagged `json:"-"`, so encoding/json skips all of them when
// an Aggregate is marshalled or unmarshalled.
type Aggregate struct {
	TenantId      TenantId         `json:"-"`
	Type          AggregateType    `json:"-"`
	Version       AggregateVersion `json:"-"`
	Id            AggregateId      `json:"-"`
	ResourceOwner string           `json:"-"`
	Sequence      uint64           `json:"-"`
}

func NewAggregate

func NewAggregate(
	tenantId TenantId,
	aggregateType AggregateType,
	aggregateVersion AggregateVersion,
	aggregateId AggregateId,
	resourceOwner string,
	sequence uint64,
) *Aggregate

type AggregateId

// AggregateId is a defined string type used for the Id field of Aggregate and
// the AggregateId field of ReadModel and WriteModel.
type AggregateId string

func (AggregateId) String

func (ai AggregateId) String() string

type AggregateType

// AggregateType is a defined string type naming a kind of aggregate. It is
// the key used by RegisterEventMapper, SubscribeAggregates and
// SubscribeEventTypes.
type AggregateType string

func (AggregateType) String

func (at AggregateType) String() string

type AggregateVersion

// AggregateVersion is a defined string type holding an aggregate's version.
// Its Int method returns (int, error) and its Validate method returns an
// error.
// NOTE(review): the accepted version format is not visible here — confirm
// against Validate before relying on a particular syntax.
type AggregateVersion string

func (AggregateVersion) Int

func (av AggregateVersion) Int() (int, error)

func (AggregateVersion) String

func (av AggregateVersion) String() string

func (AggregateVersion) Validate

func (av AggregateVersion) Validate() error

type Command

// Command is the interface accepted by (*EventStore).Push.
//
// The visible methods return the command's payload as an untyped value and
// the UniqueConstraint values attached to the command. The interface has
// further methods that are not shown here (filtered or unexported).
type Command interface {
	GetPayload() any
	GetUniqueConstraints() []*UniqueConstraint
	// contains filtered or unexported methods
}

type Event

// Event is the interface returned by (*EventStore).Push, passed to and
// returned from registered event mappers, and delivered on Subscription.Events.
//
// The visible methods report the event's position and creation time.
// UnmarshalPayload takes a destination pointer; presumably it decodes the
// event payload into ptr — confirm against the implementation.
// The interface has further methods that are not shown here (filtered or
// unexported).
type Event interface {
	GetPosition() decimal.Decimal
	GetCreatedAt() time.Time
	UnmarshalPayload(ptr any) error
	// contains filtered or unexported methods
}

func GenericEventMapper

func GenericEventMapper[T any, PT EventBaseSetter[T]](event Event) (Event, error)

type EventBase

// EventBase holds an event's aggregate, event type, raw payload bytes,
// creator, correlation id, causation id, position and creation time.
//
// CorrelationId and CausationId are *string and may therefore be nil.
// Every field is tagged `json:"-"`, so encoding/json skips all of them when
// an EventBase is marshalled or unmarshalled.
type EventBase struct {
	Aggregate     *Aggregate      `json:"-"`
	EventType     EventType       `json:"-"`
	Payload       []byte          `json:"-"`
	Creator       string          `json:"-"`
	CorrelationId *string         `json:"-"`
	CausationId   *string         `json:"-"`
	Position      decimal.Decimal `json:"-"`
	CreatedAt     time.Time       `json:"-"`
}

func EventBaseFromEvent

func EventBaseFromEvent(event Event) *EventBase

func (*EventBase) GetAggregate

func (eb *EventBase) GetAggregate() *Aggregate

func (*EventBase) GetCausationId

func (eb *EventBase) GetCausationId() *string

func (*EventBase) GetCorrelationId

func (eb *EventBase) GetCorrelationId() *string

func (*EventBase) GetCreatedAt

func (eb *EventBase) GetCreatedAt() time.Time

func (*EventBase) GetCreator

func (eb *EventBase) GetCreator() string

func (*EventBase) GetEventType

func (eb *EventBase) GetEventType() EventType

func (*EventBase) GetPosition

func (eb *EventBase) GetPosition() decimal.Decimal

func (*EventBase) SetCausationId

func (eb *EventBase) SetCausationId(causationId string) *EventBase

func (*EventBase) SetCorrelationId

func (eb *EventBase) SetCorrelationId(correlationId string) *EventBase

func (*EventBase) UnmarshalPayload

func (eb *EventBase) UnmarshalPayload(ptr any) error

type EventBaseSetter

// EventBaseSetter is the type constraint used for the PT type parameter of
// GenericEventMapper.
//
// A type satisfies it only if it is exactly *T, implements Event, and has a
// SetBaseEvent(*EventBase) method. Because of the *T type term, the interface
// can be used only as a constraint, not as an ordinary value type.
type EventBaseSetter[T any] interface {
	Event
	SetBaseEvent(*EventBase)
	*T
}

type EventStore

// EventStore is the type whose Push method accepts Commands and returns the
// resulting Events.
//
// NOTE(review): PushTimeout presumably bounds how long Push may run — the
// usage is not visible here; confirm against the Push implementation.
type EventStore struct {
	PushTimeout time.Duration
	// contains filtered or unexported fields
}

func (*EventStore) Push

func (es *EventStore) Push(ctx context.Context, commands ...Command) (events []Event, err error)

type EventType

// EventType is a defined string type naming a kind of event. It is used by
// EventBase.EventType, RegisterEventMapper and SubscribeEventTypes.
type EventType string

func (EventType) String

func (et EventType) String() string

type ReadModel

// ReadModel holds a tenant id, an aggregate id, a slice of events, a resource
// owner, a sequence, a position, and creation and update timestamps. All
// fields are exported. Compared with WriteModel it additionally carries
// Position and CreatedAt.
//
// NOTE(review): presumably AppendEvents adds to Events and Reduce folds them
// into the other fields — the method bodies are not visible here; confirm.
type ReadModel struct {
	TenantId      TenantId
	AggregateId   AggregateId
	Events        []Event
	ResourceOwner string
	Sequence      uint64
	Position      decimal.Decimal
	CreatedAt     time.Time
	UpdatedAt     time.Time
}

func (*ReadModel) AppendEvents

func (rm *ReadModel) AppendEvents(events ...Event)

func (*ReadModel) Reduce

func (rm *ReadModel) Reduce() error

type Subscription

// Subscription is the handle returned by SubscribeAggregates and
// SubscribeEventTypes. It exposes a channel of Event values and has an
// Unsubscribe method.
//
// NOTE(review): Events is presumably the eventQueue channel passed to the
// Subscribe* constructor — confirm against their implementations.
type Subscription struct {
	Events chan Event
	// contains filtered or unexported fields
}

func SubscribeAggregates

func SubscribeAggregates(eventQueue chan Event, aggregates ...AggregateType) *Subscription

SubscribeAggregates subscribes to all events on the given aggregates.

func SubscribeEventTypes

func SubscribeEventTypes(eventQueue chan Event, types map[AggregateType][]EventType) *Subscription

SubscribeEventTypes subscribes to the given event types. If no event types are provided, the subscription is for all events of the aggregate.

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe()

type TenantId

// TenantId is a defined string type used for the TenantId field of Aggregate,
// ReadModel and WriteModel.
type TenantId string

func (TenantId) String

func (ti TenantId) String() string

type UniqueConstraint

// UniqueConstraint describes a uniqueness constraint: its type, the value
// concerned, the action to take (a UniqueConstraintAction), an error message,
// and a flag marking it as global. Commands expose these through
// GetUniqueConstraints.
//
// The fields correspond one-to-one, in order, to the parameters of
// NewUniqueConstraint.
// NOTE(review): presumably IsGlobal widens the constraint beyond a single
// tenant and ErrorMessage is reported on violation — confirm in the store.
type UniqueConstraint struct {
	UniqueType   UniqueType
	UniqueValue  string
	Action       UniqueConstraintAction
	ErrorMessage string
	IsGlobal     bool
}

func NewUniqueConstraint

func NewUniqueConstraint(
	uniqueType UniqueType,
	uniqueValue string,
	uniqueConstraintAction UniqueConstraintAction,
	errorMessage string,
	isGlobal bool,
) *UniqueConstraint

type UniqueConstraintAction

// UniqueConstraintAction is a defined string type used for the Action field
// of UniqueConstraint.
type UniqueConstraintAction string

// Declared UniqueConstraintAction values: "add", "remove" and "tenant_remove".
// NOTE(review): presumably "tenant_remove" removes all constraints of a
// tenant — confirm against the code that interprets these actions.
const (
	UniqueConstraintActionAdd          UniqueConstraintAction = "add"
	UniqueConstraintActionRemove       UniqueConstraintAction = "remove"
	UniqueConstraintActionTenantRemove UniqueConstraintAction = "tenant_remove"
)

type UniqueType

// UniqueType is a defined string type used for the UniqueType field of
// UniqueConstraint.
type UniqueType string

func (UniqueType) String

func (ut UniqueType) String() string

type WriteModel

// WriteModel holds a tenant id, an aggregate id, a slice of events, a
// resource owner, a sequence and an update timestamp. All fields are
// exported. It has the same fields as ReadModel except Position and
// CreatedAt.
//
// NOTE(review): presumably AppendEvents adds to Events and Reduce folds them
// into the other fields — the method bodies are not visible here; confirm.
type WriteModel struct {
	TenantId      TenantId
	AggregateId   AggregateId
	Events        []Event
	ResourceOwner string
	Sequence      uint64
	UpdatedAt     time.Time
}

func (*WriteModel) AppendEvents

func (wm *WriteModel) AppendEvents(events ...Event)

func (*WriteModel) Reduce

func (wm *WriteModel) Reduce() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL