eventstore

package
v0.0.0-...-579c8bd Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2024 | License: MIT | Imports: 19 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RegisterEventMapper

func RegisterEventMapper(aggregateType AggregateType, eventType EventType, eventMapper func(Event) (Event, error))

Types

type Aggregate

type Aggregate struct {
	TenantId  string        `json:"-"`
	Id        string        `json:"-"`
	Owner     string        `json:"-"`
	Type      AggregateType `json:"-"`
	Version   int           `json:"-"`
	Sequence  int64         `json:"-"`
	CreatedAt time.Time     `json:"-"`
}

type AggregateType

type AggregateType string

type AggregateVersion

type AggregateVersion string

func (AggregateVersion) Int

func (av AggregateVersion) Int() (error, int)

type Command

type Command interface {
	GetPayload() any
	// contains filtered or unexported methods
}

type Event

type Event interface {
	GetId() string
	GetPayloadBytes() []byte
	UnmarshalPayload(ptr any) error
	GetPosition() decimal.Decimal
	GetCreatedAt() time.Time
	// contains filtered or unexported methods
}

type EventBase

type EventBase struct {
	Aggregate     *Aggregate      `json:"-"`
	Id            string          `json:"-"`
	Type          EventType       `json:"-"`
	Payload       []byte          `json:"-"`
	Creator       *string         `json:"-"`
	CorrelationId *string         `json:"-"`
	CausationId   *string         `json:"-"`
	Position      decimal.Decimal `json:"-"`
	CreatedAt     time.Time       `json:"-"`
}

func EventBaseFromEvent

func EventBaseFromEvent(event Event) *EventBase

func (*EventBase) GetAggregate

func (eb *EventBase) GetAggregate() *Aggregate

func (*EventBase) GetCausationId

func (eb *EventBase) GetCausationId() *string

func (*EventBase) GetCorrelationId

func (eb *EventBase) GetCorrelationId() *string

func (*EventBase) GetCreatedAt

func (eb *EventBase) GetCreatedAt() time.Time

func (*EventBase) GetCreator

func (eb *EventBase) GetCreator() *string

func (*EventBase) GetId

func (eb *EventBase) GetId() string

func (*EventBase) GetPayloadBytes

func (eb *EventBase) GetPayloadBytes() []byte

func (*EventBase) GetPosition

func (eb *EventBase) GetPosition() decimal.Decimal

func (*EventBase) GetType

func (eb *EventBase) GetType() EventType

func (*EventBase) UnmarshalPayload

func (eb *EventBase) UnmarshalPayload(ptr any) error

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

func NewEventStore

func NewEventStore(db *database.Db) *EventStore

func (*EventStore) NewSession

func (es *EventStore) NewSession(ctx context.Context) (*Session, error)

func (*EventStore) Push

func (es *EventStore) Push(ctx context.Context, commands ...Command) ([]Event, error)

func (*EventStore) Reduce

func (es *EventStore) Reduce(ctx context.Context, reducer Reducer) error

type EventType

type EventType string

type Query

type Query struct {
	// contains filtered or unexported fields
}

type QueryBuilder

type QueryBuilder struct {
	// contains filtered or unexported fields
}

func NewQuery

func NewQuery() *QueryBuilder

func (*QueryBuilder) ToSQL

func (qb *QueryBuilder) ToSQL() (string, []any, error)

type ReadModel

type ReadModel struct {
	TenantId          string          `json:"-"`
	AggregateId       string          `json:"-"`
	AggregateSequence int64           `json:"-"`
	Owner             string          `json:"-"`
	Events            []Event         `json:"-"`
	Position          decimal.Decimal `json:"-"`
	CreatedAt         time.Time       `json:"-"`
	UpdatedAt         time.Time       `json:"-"`
}

func (*ReadModel) AppendEvents

func (rm *ReadModel) AppendEvents(events ...Event)

func (*ReadModel) Query

func (rm *ReadModel) Query() *QueryBuilder

func (*ReadModel) Reduce

func (rm *ReadModel) Reduce() error

type Reducer

type Reducer interface {
	Query() *QueryBuilder
	AppendEvents(events ...Event)
	Reduce() error
}

type Session

type Session struct {
	// contains filtered or unexported fields
}

func (*Session) Push

func (s *Session) Push(commands ...Command) *Session

func (*Session) Reduce

func (s *Session) Reduce(ctx context.Context, reducer Reducer) error

func (*Session) SaveChanges

func (s *Session) SaveChanges(ctx context.Context) ([]Event, error)

type Subscription

type Subscription struct {
	Events chan Event
	// contains filtered or unexported fields
}

func SubscribeAggregates

func SubscribeAggregates(eventQueue chan Event, aggregates ...AggregateType) *Subscription

func SubscribeEventTypes

func SubscribeEventTypes(eventQueue chan Event, types map[AggregateType][]EventType) *Subscription

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe()

type WriteModel

type WriteModel struct {
	TenantId          string          `json:"-"`
	AggregateId       string          `json:"-"`
	AggregateSequence int64           `json:"-"`
	Owner             string          `json:"-"`
	Events            []Event         `json:"-"`
	Position          decimal.Decimal `json:"-"`
	CreatedAt         time.Time       `json:"-"`
	UpdatedAt         time.Time       `json:"-"`
}

func (*WriteModel) AppendEvents

func (wm *WriteModel) AppendEvents(events ...Event)

func (*WriteModel) Query

func (wm *WriteModel) Query() *QueryBuilder

func (*WriteModel) Reduce

func (wm *WriteModel) Reduce() error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL