Documentation ¶
Index ¶
- func PartitionedCompetingWorkers[K eventsourcing.ID](ctx context.Context, logger *slog.Logger, lockerFactory LockerFactory, ...) ([]*worker.RunWorker, error)
- func PartitionedWorkers[K eventsourcing.ID](ctx context.Context, logger *slog.Logger, lockerFactory LockerFactory, ...) ([]*worker.RunWorker, error)
- func Project[K eventsourcing.ID](logger *slog.Logger, lockerFactory LockerFactory, esRepo EventsRepository[K], ...) *worker.RunWorker
- func StartGrpcServer[K eventsourcing.ID](ctx context.Context, address string, repo EventsRepository[K]) error
- type Cancel
- type CatchUpCallback
- type CatchUpOptions
- type Checkpoints
- type Consumer
- type ConsumerHandler
- type ConsumerOption
- type ConsumerOptions
- type ConsumerTopic
- type Event
- type EventsRepository
- type GetEventsReply
- type GrpcRepository
- type GrpcServer
- type HandleEventsReply
- type LockerFactory
- type Message
- type MessageHandlerFunc
- type MessageKind
- type Meta
- type Option
- type Player
- type Projection
- type ResumeKey
- type Start
- type StartOption
- type SubscriberFactory
- type Token
- type WaitLockerFactory
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func PartitionedCompetingWorkers ¶ added in v0.29.0
func PartitionedCompetingWorkers[K eventsourcing.ID]( ctx context.Context, logger *slog.Logger, lockerFactory LockerFactory, subscriberFactory SubscriberFactory[K], topic string, partitions uint32, esRepo EventsRepository[K], projection Projection[K], splits int, resumeStore store.KVStore, ) ([]*worker.RunWorker, error)
PartitionedCompetingWorkers creates workers that will run depending if a lock was acquired or not.
If a locker is provided it is possible to balance workers between the several server instances using a worker.Balancer
func PartitionedWorkers ¶ added in v0.29.0
func PartitionedWorkers[K eventsourcing.ID]( ctx context.Context, logger *slog.Logger, lockerFactory LockerFactory, subscriberFactory SubscriberFactory[K], topic string, partitions uint32, esRepo EventsRepository[K], projection Projection[K], splits int, splitIds []int, resumeStore store.KVStore, ) ([]*worker.RunWorker, error)
PartitionedWorkers creates workers that will always run because a balancer locker is not provided. This assumes that the balancing will be done by the message broker.
func Project ¶ added in v0.32.0
func Project[K eventsourcing.ID]( logger *slog.Logger, lockerFactory LockerFactory, esRepo EventsRepository[K], subscriber Consumer[K], projection Projection[K], resumeStore store.KVStore, catchupSplits int, ) *worker.RunWorker
NewProjector creates a subscriber to an event stream and process all events.
It will check if it needs to do a catch up. If so, it will try acquire a lock and run a projection catchup. If it is unable to acquire the lock because it is held by another process, it will wait for its release. In the end it will fire up the subscribers. All this will happen in a separate go routine allowing the service to completely start up.
After a successfully projection creation, subsequent start up will no longer execute the catch up function. This can be used to migrate projections, where a completely new projection will be populated.
The catch up function should replay all events from all event stores needed for this
func StartGrpcServer ¶ added in v0.31.0
func StartGrpcServer[K eventsourcing.ID](ctx context.Context, address string, repo EventsRepository[K]) error
Types ¶
type CatchUpCallback ¶ added in v0.29.0
type CatchUpOptions ¶ added in v0.34.0
type CatchUpOptions struct { StartOffset time.Duration CatchUpWindow time.Duration UpdateResumeInterval time.Duration AggregateKinds []eventsourcing.Kind // Metadata filters on top of metadata. Every key of the map is ANDed with every OR of the values // eg: [{"geo": "EU"}, {"geo": "USA"}, {"membership": "prime"}] equals to: geo IN ("EU", "USA") AND membership = "prime" Metadata store.MetadataFilter }
type Checkpoints ¶ added in v0.39.0
type Checkpoints[K eventsourcing.ID] struct { // contains filtered or unexported fields }
func NewCheckpoints ¶ added in v0.39.0
func NewCheckpoints[K eventsourcing.ID](store store.KVStore, txRunner store.Tx) *Checkpoints[K]
type Consumer ¶ added in v0.15.0
type Consumer[K eventsourcing.ID] interface { Topic() string // returns the subscriber Positions. The first Position should be 1 StartConsumer(ctx context.Context, startTime *time.Time, projectionName string, handle ConsumerHandler[K], options ...ConsumerOption[K]) error }
type ConsumerHandler ¶ added in v0.34.0
type ConsumerOption ¶
type ConsumerOption[K eventsourcing.ID] func(*ConsumerOptions[K])
func WithAckWait ¶ added in v0.25.0
func WithAckWait[K eventsourcing.ID](ackWait time.Duration) ConsumerOption[K]
func WithFilter ¶
func WithFilter[K eventsourcing.ID](filter func(e *sink.Message[K]) bool) ConsumerOption[K]
type ConsumerOptions ¶
type ConsumerTopic ¶ added in v0.34.0
type Event ¶ added in v0.30.0
type Event[K eventsourcing.ID] struct { ID eventid.EventID AggregateID K AggregateVersion uint32 AggregateKind eventsourcing.Kind Kind eventsourcing.Kind Body encoding.Base64 Metadata eventsourcing.Metadata CreatedAt time.Time }
func FromEvent ¶ added in v0.30.0
func FromEvent[K eventsourcing.ID](e *eventsourcing.Event[K]) *Event[K]
func FromMessage ¶ added in v0.30.0
func FromMessage[K eventsourcing.ID](m *sink.Message[K]) *Event[K]
type EventsRepository ¶ added in v0.32.0
type GetEventsReply ¶ added in v0.39.0
type GetEventsReply[K eventsourcing.ID] struct { Events []*eventsourcing.Event[K] Error error }
type GrpcRepository ¶ added in v0.31.0
type GrpcRepository[K eventsourcing.ID, PK eventsourcing.IDPt[K]] struct { // contains filtered or unexported fields }
func NewGrpcRepository ¶ added in v0.31.0
func NewGrpcRepository[K eventsourcing.ID, PK eventsourcing.IDPt[K]](address string) GrpcRepository[K, PK]
type GrpcServer ¶ added in v0.31.0
type GrpcServer[K eventsourcing.ID] struct { // contains filtered or unexported fields }
func (*GrpcServer[K]) GetEvents ¶ added in v0.31.0
func (s *GrpcServer[K]) GetEvents(ctx context.Context, r *pb.GetEventsRequest) (*pb.GetEventsReply, error)
type HandleEventsReply ¶ added in v0.39.0
type LockerFactory ¶ added in v0.15.0
type Message ¶ added in v0.39.0
type Message[K eventsourcing.ID] struct { Meta Meta Message *sink.Message[K] }
type MessageHandlerFunc ¶ added in v0.30.0
type MessageKind ¶ added in v0.39.0
type MessageKind string
var ( MessageKindCatchup MessageKind = "catchup" MessageKindSwitch MessageKind = "switch" MessageKindLive MessageKind = "live" )
type Meta ¶ added in v0.31.0
type Meta struct { Name string Kind MessageKind Partition uint32 Sequence uint64 }
type Option ¶ added in v0.31.0
type Option[K eventsourcing.ID] func(*Player[K])
func WithBatchSize ¶ added in v0.31.0
func WithBatchSize[K eventsourcing.ID](batchSize int) Option[K]
func WithCustomFilter ¶ added in v0.31.0
func WithCustomFilter[K eventsourcing.ID](fn func(events *eventsourcing.Event[K]) bool) Option[K]
type Player ¶ added in v0.31.0
type Player[K eventsourcing.ID] struct { // contains filtered or unexported fields }
func NewPlayer ¶ added in v0.32.0
func NewPlayer[K eventsourcing.ID](repository EventsRepository[K], options ...Option[K]) Player[K]
NewPlayer instantiates a new Player.
type Projection ¶ added in v0.32.0
type Projection[K eventsourcing.ID] interface { Name() string CatchUpOptions() CatchUpOptions Handle(ctx context.Context, e Message[K]) error }
type ResumeKey ¶ added in v0.25.0
type ResumeKey struct {
// contains filtered or unexported fields
}
ResumeKey is used to retrieve the last event id to replay messages directly from the event store.
func NewResumeKey ¶ added in v0.34.0
func (ResumeKey) Projection ¶ added in v0.29.0
type StartOption ¶ added in v0.31.0
type StartOption struct {
// contains filtered or unexported fields
}
func StartAt ¶ added in v0.31.0
func StartAt(sequence uint64) StartOption
func StartBeginning ¶ added in v0.31.0
func StartBeginning() StartOption
func StartEnd ¶ added in v0.31.0
func StartEnd() StartOption
func (StartOption) AfterSeq ¶ added in v0.31.0
func (so StartOption) AfterSeq() uint64
func (StartOption) StartFrom ¶ added in v0.31.0
func (so StartOption) StartFrom() Start
type SubscriberFactory ¶ added in v0.29.0
type Token ¶ added in v0.29.0
func ParseToken ¶ added in v0.29.0
type WaitLockerFactory ¶ added in v0.29.0
type WaitLockerFactory func(lockName string) dist.WaitLocker