Versions in this module Expand all Collapse all v1 v1.0.0 Sep 7, 2022 Changes in this version + type Client struct + func NewClient(addrs []string, config *Config) (*Client, error) + func (c *Client) ClusterConfig() *Config + type Config struct + Group struct{ ... } + func NewConfig() *Config + func (c *Config) Validate() error + type Consumer struct + func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error) + func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Consumer, error) + func (c *Consumer) Close() (err error) + func (c *Consumer) CommitOffsets() error + func (c *Consumer) Errors() <-chan error + func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 + func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) + func (c *Consumer) MarkOffsets(s *OffsetStash) + func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) + func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage + func (c *Consumer) Notifications() <-chan *Notification + func (c *Consumer) Partitions() <-chan PartitionConsumer + func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) + func (c *Consumer) ResetOffsets(s *OffsetStash) + func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) + func (c *Consumer) Subscriptions() map[string][]int32 + type ConsumerMode uint8 + const ConsumerModeMultiplex + const ConsumerModePartitions + type Error struct + Ctx string + type Notification struct + Claimed map[string][]int32 + Current map[string][]int32 + Released map[string][]int32 + Type NotificationType + type NotificationType uint8 + const RebalanceError + const RebalanceOK + const RebalanceStart + const UnknownNotification + func (t NotificationType) String() string + type OffsetStash struct + func NewOffsetStash() *OffsetStash + func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string) + func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) + func (s *OffsetStash) Offsets() map[string]int64 + func (s *OffsetStash) ResetOffset(msg *sarama.ConsumerMessage, metadata string) + func (s *OffsetStash) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) + type PartitionConsumer interface + InitialOffset func() int64 + MarkOffset func(offset int64, metadata string) + Partition func() int32 + ResetOffset func(offset int64, metadata string) + Topic func() string + type Strategy string + const StrategyRange + const StrategyRoundRobin