Documentation ¶
Index ¶
- type Consumer
- func (c *Consumer[K]) Cleanup(session sarama.ConsumerGroupSession) error
- func (c *Consumer[K]) ClearSubPos()
- func (c *Consumer[K]) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (c *Consumer[K]) Setup(session sarama.ConsumerGroupSession) (er error)
- func (c *Consumer[K]) SubPos() map[int32]int64
- type SubOption
- type Subscriber
- func NewSubscriberWithBrokers[K eventsourcing.ID, PK eventsourcing.IDPt[K]](ctx context.Context, logger *slog.Logger, brokers []string, topic string, ...) (*Subscriber[K], error)
- func NewSubscriberWithClient[K eventsourcing.ID, PK eventsourcing.IDPt[K]](logger *slog.Logger, client sarama.Client, topic string, ...) (*Subscriber[K], error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer[K eventsourcing.ID] struct {
	// contains filtered or unexported fields
}
Consumer represents a Sarama consumer group consumer.
func (*Consumer[K]) Cleanup ¶
func (c *Consumer[K]) Cleanup(session sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (*Consumer[K]) ClearSubPos ¶ added in v0.39.1
func (c *Consumer[K]) ClearSubPos()
func (*Consumer[K]) ConsumeClaim ¶
func (c *Consumer[K]) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.
type SubOption ¶
type SubOption[K eventsourcing.ID] func(*Subscriber[K])
func WithMsgCodec ¶
func WithMsgCodec[K eventsourcing.ID](codec sink.Codec[K]) SubOption[K]
type Subscriber ¶
type Subscriber[K eventsourcing.ID] struct {
	// contains filtered or unexported fields
}
func NewSubscriberWithBrokers ¶
func NewSubscriberWithBrokers[K eventsourcing.ID, PK eventsourcing.IDPt[K]](ctx context.Context, logger *slog.Logger, brokers []string, topic string, config *sarama.Config) (*Subscriber[K], error)
func NewSubscriberWithClient ¶
func NewSubscriberWithClient[K eventsourcing.ID, PK eventsourcing.IDPt[K]](logger *slog.Logger, client sarama.Client, topic string, options ...SubOption[K]) (*Subscriber[K], error)
func (*Subscriber[K]) AddShutdownHook ¶
func (s *Subscriber[K]) AddShutdownHook(hook func())
func (*Subscriber[K]) Shutdown ¶
func (s *Subscriber[K]) Shutdown()
func (*Subscriber[K]) StartConsumer ¶
func (s *Subscriber[K]) StartConsumer(ctx context.Context, startTime *time.Time, projName string, handler projection.ConsumerHandler[K], options ...projection.ConsumerOption[K]) error
func (*Subscriber[K]) Topic ¶ added in v0.39.0
func (s *Subscriber[K]) Topic() (topic string)
Click to show internal directories.
Click to hide internal directories.