projection

package
v0.38.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 19, 2023 License: MIT Imports: 26 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func PartitionedCompetingWorkers added in v0.29.0

func PartitionedCompetingWorkers[K eventsourcing.ID](
	ctx context.Context,
	logger *slog.Logger,
	lockerFactory LockerFactory,
	subscriberFactory SubscriberFactory[K],
	topic string, partitions uint32,
	esRepo EventsRepository[K],
	projection Projection[K],
	splits int,
	resumeStore store.KVStore,
) ([]*worker.RunWorker, error)

PartitionedCompetingWorkers creates workers that will run depending if a lock was acquired or not.

If a locker is provided it is possible to balance workers between the several server instances using a worker.Balancer

func PartitionedWorkers added in v0.29.0

func PartitionedWorkers[K eventsourcing.ID](
	ctx context.Context,
	logger *slog.Logger,
	lockerFactory LockerFactory,
	subscriberFactory SubscriberFactory[K],
	topic string, partitions uint32,
	esRepo EventsRepository[K],
	projection Projection[K],
	splits int,
	resumeStore store.KVStore,
) ([]*worker.RunWorker, error)

PartitionedWorkers creates workers that will always run because a balancer locker is not provided. This assumes that the balancing will be done by the message broker.

func Project added in v0.32.0

func Project[K eventsourcing.ID](
	logger *slog.Logger,
	lockerFactory LockerFactory,
	esRepo EventsRepository[K],
	subscriber Consumer[K],
	projection Projection[K],
	splits int,
	resumeStore store.KVStore,
) *worker.RunWorker

NewProjector creates a subscriber to an event stream and process all events.

It will check if it needs to do a catch up. If so, it will try acquire a lock and run a projection catchup. If it is unable to acquire the lock because it is held by another process, it will wait for its release. In the end it will fire up the subscribers. All this will happen in a separate go routine allowing the service to completely start up.

After a successfully projection creation, subsequent start up will no longer execute the catch up function. This can be used to migrate projections, where a completely new projection will be populated.

The catch up function should replay all events from all event stores needed for this

func StartGrpcServer added in v0.31.0

func StartGrpcServer[K eventsourcing.ID](ctx context.Context, address string, repo EventsRepository[K]) error

Types

type Cancel added in v0.31.0

type Cancel func()

type CatchUpCallback added in v0.29.0

type CatchUpCallback func(context.Context, eventid.EventID) (eventid.EventID, error)

type CatchUpOptions added in v0.34.0

type CatchUpOptions struct {
	StartOffset   time.Duration
	CatchUpWindow time.Duration

	AggregateKinds []eventsourcing.Kind
	// Metadata filters on top of metadata. Every key of the map is ANDed with every OR of the values
	// eg: [{"geo": "EU"}, {"geo": "USA"}, {"membership": "prime"}] equals to:  geo IN ("EU", "USA") AND membership = "prime"
	Metadata store.MetadataFilter
}

type Consumer added in v0.15.0

type Consumer[K eventsourcing.ID] interface {
	TopicPartitions() (string, []uint32)
	// returns the subscriber Positions. The first Position should be 1
	Positions(ctx context.Context) (map[uint32]SubscriberPosition, error)
	StartConsumer(ctx context.Context, subPos map[uint32]SubscriberPosition, projectionName string, handle ConsumerHandler[K], options ...ConsumerOption[K]) error
}

type ConsumerHandler added in v0.34.0

type ConsumerHandler[K eventsourcing.ID] func(ctx context.Context, e *sink.Message[K], partition uint32, seq uint64) error

type ConsumerOption

type ConsumerOption[K eventsourcing.ID] func(*ConsumerOptions[K])

func WithAckWait added in v0.25.0

func WithAckWait[K eventsourcing.ID](ackWait time.Duration) ConsumerOption[K]

func WithFilter

func WithFilter[K eventsourcing.ID](filter func(e *sink.Message[K]) bool) ConsumerOption[K]

type ConsumerOptions

type ConsumerOptions[K eventsourcing.ID] struct {
	Filter  func(e *sink.Message[K]) bool
	AckWait time.Duration
}

type ConsumerTopic added in v0.34.0

type ConsumerTopic struct {
	Topic      string
	Partitions []uint32
}

type Event added in v0.30.0

type Event[K eventsourcing.ID] struct {
	ID               eventid.EventID
	AggregateID      K
	AggregateVersion uint32
	AggregateKind    eventsourcing.Kind
	Kind             eventsourcing.Kind
	Body             encoding.Base64
	IdempotencyKey   string
	Metadata         eventsourcing.Metadata
	CreatedAt        time.Time
}

func FromEvent added in v0.30.0

func FromEvent[K eventsourcing.ID](e *eventsourcing.Event[K]) *Event[K]

func FromMessage added in v0.30.0

func FromMessage[K eventsourcing.ID](m *sink.Message[K]) *Event[K]

type EventsRepository added in v0.32.0

type EventsRepository[K eventsourcing.ID] interface {
	GetEvents(ctx context.Context, afterEventID eventid.EventID, untilEventID eventid.EventID, batchSize int, filter store.Filter) ([]*eventsourcing.Event[K], error)
}

type GrpcRepository added in v0.31.0

type GrpcRepository[K eventsourcing.ID, PK eventsourcing.IDPt[K]] struct {
	// contains filtered or unexported fields
}

func NewGrpcRepository added in v0.31.0

func NewGrpcRepository[K eventsourcing.ID, PK eventsourcing.IDPt[K]](address string) GrpcRepository[K, PK]

func (GrpcRepository[K, PK]) GetEvents added in v0.31.0

func (c GrpcRepository[K, PK]) GetEvents(ctx context.Context, after, until eventid.EventID, limit int, filter store.Filter) ([]*eventsourcing.Event[K], error)

type GrpcServer added in v0.31.0

type GrpcServer[K eventsourcing.ID] struct {
	// contains filtered or unexported fields
}

func (*GrpcServer[K]) GetEvents added in v0.31.0

func (s *GrpcServer[K]) GetEvents(ctx context.Context, r *pb.GetEventsRequest) (*pb.GetEventsReply, error)

type LockerFactory added in v0.15.0

type LockerFactory func(lockName string) lock.Locker

type MessageHandlerFunc added in v0.30.0

type MessageHandlerFunc[K eventsourcing.ID] func(ctx context.Context, e *sink.Message[K]) error

type Option added in v0.31.0

type Option[K eventsourcing.ID] func(*Player[K])

func WithBatchSize added in v0.31.0

func WithBatchSize[K eventsourcing.ID](batchSize int) Option[K]

func WithCustomFilter added in v0.31.0

func WithCustomFilter[K eventsourcing.ID](fn func(events *eventsourcing.Event[K]) bool) Option[K]

type Player added in v0.31.0

type Player[K eventsourcing.ID] struct {
	// contains filtered or unexported fields
}

func NewPlayer added in v0.32.0

func NewPlayer[K eventsourcing.ID](repository EventsRepository[K], options ...Option[K]) Player[K]

NewPlayer instantiates a new Player.

func (Player[K]) Replay added in v0.31.0

func (p Player[K]) Replay(ctx context.Context, handler MessageHandlerFunc[K], afterEventID, untilEventID eventid.EventID, filters ...store.FilterOption) (eventid.EventID, error)

type Projection added in v0.32.0

type Projection[K eventsourcing.ID] interface {
	Name() string
	CatchUpOptions() CatchUpOptions
	Handle(ctx context.Context, e *sink.Message[K]) error
}

type ResumeKey added in v0.25.0

type ResumeKey struct {
	// contains filtered or unexported fields
}

ResumeKey is used to retrieve the last event id to replay messages directly from the event store.

func NewResumeKey added in v0.34.0

func NewResumeKey(projectionName, topic string, partition uint32) (_ ResumeKey, er error)

func (ResumeKey) Partition added in v0.34.0

func (r ResumeKey) Partition() uint32

func (ResumeKey) Projection added in v0.29.0

func (r ResumeKey) Projection() string

func (ResumeKey) String added in v0.25.0

func (r ResumeKey) String() string

func (ResumeKey) Topic added in v0.25.0

func (r ResumeKey) Topic() string

type Start added in v0.31.0

type Start int
const (
	END Start = iota
	BEGINNING
	SEQUENCE
)

type StartOption added in v0.31.0

type StartOption struct {
	// contains filtered or unexported fields
}

func StartAt added in v0.31.0

func StartAt(sequence uint64) StartOption

func StartBeginning added in v0.31.0

func StartBeginning() StartOption

func StartEnd added in v0.31.0

func StartEnd() StartOption

func (StartOption) AfterSeq added in v0.31.0

func (so StartOption) AfterSeq() uint64

func (StartOption) StartFrom added in v0.31.0

func (so StartOption) StartFrom() Start

type SubscriberFactory added in v0.29.0

type SubscriberFactory[K eventsourcing.ID] func(context.Context, ResumeKey) Consumer[K]

type SubscriberPosition added in v0.34.0

type SubscriberPosition struct {
	EventID  eventid.EventID
	Position uint64
}

type Token added in v0.29.0

type Token struct {
	// contains filtered or unexported fields
}

func NewCatchupToken added in v0.34.0

func NewCatchupToken(eventID eventid.EventID) Token

func NewConsumerToken added in v0.34.0

func NewConsumerToken(sequence uint64) Token

func ParseToken added in v0.29.0

func ParseToken(s string) (_ Token, e error)

func (Token) CatchupEventID added in v0.34.0

func (t Token) CatchupEventID() eventid.EventID

func (Token) ConsumerSequence added in v0.34.0

func (t Token) ConsumerSequence() uint64

func (Token) IsEmpty added in v0.29.0

func (t Token) IsEmpty() bool

func (Token) IsZero added in v0.29.0

func (t Token) IsZero() bool

func (Token) Kind added in v0.29.0

func (t Token) Kind() TokenKind

func (Token) String added in v0.29.0

func (t Token) String() string

type TokenKind added in v0.29.0

type TokenKind string
const (
	CatchUpToken  TokenKind = "catchup"
	ConsumerToken TokenKind = "consumer"
)

type WaitLockerFactory added in v0.29.0

type WaitLockerFactory func(lockName string) lock.WaitLocker

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL