Documentation
¶
Index ¶
- Constants
- Variables
- type ConsumerGroupOffsetsChecker
- type KafkaConsumerGroupFactory
- type KafkaConsumerGroupOffsetsChecker
- type KafkaConsumerHandler
- type NoopConsumerGroupOffsetsChecker
- type SaramaConsumerHandler
- func (consumer *SaramaConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error
- func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (consumer *SaramaConsumerHandler) Setup(session sarama.ConsumerGroupSession) error
- type SaramaConsumerHandlerOption
- type SaramaConsumerLifecycleListener
Constants ¶
const ( OffsetCheckRetryTimeout = 60 * time.Second OffsetCheckRetryInterval = 5 * time.Second )
Variables ¶
var GroupLockedError = fmt.Errorf("managed group lock failed: locked by a different token")
GroupLockedError is the error returned if the a locked managed group is given a different token for access
Functions ¶
This section is empty.
Types ¶
type ConsumerGroupOffsetsChecker ¶ added in v0.24.6
type KafkaConsumerGroupFactory ¶
type KafkaConsumerGroupFactory interface {
StartConsumerGroup(ctx context.Context, groupID string, topics []string, handler KafkaConsumerHandler, channelRef types.NamespacedName, options ...SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error)
}
KafkaConsumerGroupFactory creates the ConsumerGroup and start consuming the specified topic
func NewConsumerGroupFactory ¶
func NewConsumerGroupFactory(addrs []string, config *sarama.Config, offsetsChecker ConsumerGroupOffsetsChecker, enqueue func(ref types.NamespacedName)) KafkaConsumerGroupFactory
type KafkaConsumerGroupOffsetsChecker ¶ added in v0.24.6
type KafkaConsumerGroupOffsetsChecker struct { }
func (*KafkaConsumerGroupOffsetsChecker) WaitForOffsetsInitialization ¶ added in v0.24.6
type KafkaConsumerHandler ¶
type KafkaConsumerHandler interface { // When this function returns true, the consumer group offset is marked as consumed. // The returned error is enqueued in errors channel. Handle(context context.Context, message *sarama.ConsumerMessage) (bool, error) SetReady(partition int32, ready bool) GetConsumerGroup() string }
type NoopConsumerGroupOffsetsChecker ¶ added in v0.24.6
type NoopConsumerGroupOffsetsChecker struct { }
func (*NoopConsumerGroupOffsetsChecker) WaitForOffsetsInitialization ¶ added in v0.24.6
type SaramaConsumerHandler ¶
type SaramaConsumerHandler struct {
// contains filtered or unexported fields
}
ConsumerHandler implements sarama.ConsumerGroupHandler and provides some glue code to simplify message handling You must implement KafkaConsumerHandler and create a new SaramaConsumerHandler with it
func NewConsumerHandler ¶
func NewConsumerHandler(logger *zap.SugaredLogger, handler KafkaConsumerHandler, errorsCh chan error, options ...SaramaConsumerHandlerOption) SaramaConsumerHandler
func (*SaramaConsumerHandler) Cleanup ¶
func (consumer *SaramaConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*SaramaConsumerHandler) ConsumeClaim ¶
func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*SaramaConsumerHandler) Setup ¶
func (consumer *SaramaConsumerHandler) Setup(session sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
type SaramaConsumerHandlerOption ¶ added in v0.23.0
type SaramaConsumerHandlerOption func(*SaramaConsumerHandler)
func WithSaramaConsumerLifecycleListener ¶ added in v0.23.0
func WithSaramaConsumerLifecycleListener(listener SaramaConsumerLifecycleListener) SaramaConsumerHandlerOption
func WithTimeout ¶ added in v0.23.1
func WithTimeout(timeout time.Duration) SaramaConsumerHandlerOption
WithTimeout configures the request timeout. Default is set to 60s.
type SaramaConsumerLifecycleListener ¶ added in v0.23.0
type SaramaConsumerLifecycleListener interface { // Setup is invoked when the consumer is joining the session Setup(sess sarama.ConsumerGroupSession) // Cleanup is invoked when the consumer is leaving the session Cleanup(sess sarama.ConsumerGroupSession) }