Documentation ¶
Index ¶
- Variables
- type KafkaConsumerGroupFactory
- type KafkaConsumerGroupManager
- type KafkaConsumerHandler
- type SaramaConsumerHandler
- func (consumer *SaramaConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error
- func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (consumer *SaramaConsumerHandler) Setup(session sarama.ConsumerGroupSession) error
- type SaramaConsumerHandlerOption
- type SaramaConsumerLifecycleListener
Constants ¶
This section is empty.
Variables ¶
var GroupLockedError = fmt.Errorf("managed group lock failed: locked by a different token")
GroupLockedError is the error returned if a locked managed group is given a different token for access.
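Because GroupLockedError is a package-level sentinel value, callers would typically detect it with errors.Is rather than by comparing message strings. The sketch below redeclares the variable locally so that it compiles on its own. The lockGroup and isLockConflict helpers are hypothetical, and wrapping the sentinel with %w is an assumption; this page does not say which calls return the error.

```go
package main

import (
	"errors"
	"fmt"
)

// GroupLockedError mirrors the sentinel declared by the package. It is
// redeclared here only so that this sketch is self-contained.
var GroupLockedError = fmt.Errorf("managed group lock failed: locked by a different token")

// lockGroup is a hypothetical stand-in for an operation that fails with the
// sentinel when the group is held under a different token.
func lockGroup(holder, token string) error {
	if holder != "" && holder != token {
		// Wrapping with %w keeps the sentinel discoverable via errors.Is.
		return fmt.Errorf("group %q: %w", "example-group", GroupLockedError)
	}
	return nil
}

// isLockConflict reports whether err is, or wraps, GroupLockedError.
func isLockConflict(err error) bool {
	return errors.Is(err, GroupLockedError)
}
```

With these helpers, isLockConflict(lockGroup("token-a", "token-b")) is true, while a matching token or an unlocked group yields a nil error and therefore false.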
Functions ¶
This section is empty.
Types ¶
type KafkaConsumerGroupFactory ¶
type KafkaConsumerGroupFactory interface {
StartConsumerGroup(groupID string, topics []string, logger *zap.SugaredLogger, handler KafkaConsumerHandler, options ...SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error)
}
KafkaConsumerGroupFactory creates the ConsumerGroup and starts consuming the specified topics.
func NewConsumerGroupFactory ¶
func NewConsumerGroupFactory(addrs []string, config *sarama.Config) KafkaConsumerGroupFactory
type KafkaConsumerGroupManager ¶ added in v0.24.0
type KafkaConsumerGroupManager interface {
	Reconfigure(brokers []string, config *sarama.Config) error
	StartConsumerGroup(groupId string, topics []string, logger *zap.SugaredLogger, handler KafkaConsumerHandler, options ...SaramaConsumerHandlerOption) error
	CloseConsumerGroup(groupId string) error
	Errors(groupId string) <-chan error
	IsManaged(groupId string) bool
}
KafkaConsumerGroupManager keeps track of Sarama consumer groups and handles messages from control-protocol clients
func NewConsumerGroupManager ¶ added in v0.24.0
func NewConsumerGroupManager(logger *zap.Logger, serverHandler controlprotocol.ServerHandler, brokers []string, config *sarama.Config) KafkaConsumerGroupManager
NewConsumerGroupManager returns a new kafkaConsumerGroupManagerImpl as a KafkaConsumerGroupManager interface
type KafkaConsumerHandler ¶
type KafkaConsumerHandler interface {
	// When this function returns true, the consumer group offset is marked as consumed.
	// The returned error is enqueued in errors channel.
	Handle(context context.Context, message *sarama.ConsumerMessage) (bool, error)
	SetReady(partition int32, ready bool)
	GetConsumerGroup() string
}
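The contract of Handle (the boolean decides whether the offset is marked, and the error is reported separately) can be illustrated with a small sketch. To stay free of external dependencies, it uses a local ConsumerMessage stand-in instead of sarama.ConsumerMessage. The recordingHandler type and the drain driver are hypothetical names introduced for illustration only; they are not part of this package, and the policy of marking an unprocessable message while still returning an error is just one possible choice.

```go
package main

import (
	"context"
	"errors"
	"sync"
)

// ConsumerMessage is a stand-in for sarama.ConsumerMessage (assumption:
// only the fields used below are reproduced).
type ConsumerMessage struct {
	Partition int32
	Offset    int64
	Value     []byte
}

// recordingHandler is a hypothetical type whose method set mirrors
// KafkaConsumerHandler.
type recordingHandler struct {
	group string

	mu    sync.Mutex
	ready map[int32]bool
	seen  []string
}

func newRecordingHandler(group string) *recordingHandler {
	return &recordingHandler{group: group, ready: make(map[int32]bool)}
}

// Handle returns true when the offset should be marked as consumed.
func (h *recordingHandler) Handle(ctx context.Context, msg *ConsumerMessage) (bool, error) {
	if err := ctx.Err(); err != nil {
		return false, err // context is done: leave the offset unmarked
	}
	if len(msg.Value) == 0 {
		// Unprocessable payload: skip it by marking the offset, but report it.
		return true, errors.New("empty payload")
	}
	h.mu.Lock()
	h.seen = append(h.seen, string(msg.Value))
	h.mu.Unlock()
	return true, nil
}

// SetReady records the readiness reported for a partition.
func (h *recordingHandler) SetReady(partition int32, ready bool) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.ready[partition] = ready
}

// GetConsumerGroup returns the group ID this handler was created for.
func (h *recordingHandler) GetConsumerGroup() string { return h.group }

// drain is a hypothetical driver that applies the documented contract: it
// records the offsets Handle asks to mark and collects the returned errors.
func drain(h *recordingHandler, msgs []*ConsumerMessage) (marked []int64, errs []error) {
	ctx := context.Background()
	for _, m := range msgs {
		ok, err := h.Handle(ctx, m)
		if ok {
			marked = append(marked, m.Offset)
		}
		if err != nil {
			errs = append(errs, err)
		}
	}
	return marked, errs
}
```

In a real consumer, the handler would instead be passed to NewConsumerHandler or StartConsumerGroup, and the wrapper would take the place of drain.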
type SaramaConsumerHandler ¶
type SaramaConsumerHandler struct {
// contains filtered or unexported fields
}
SaramaConsumerHandler implements sarama.ConsumerGroupHandler and provides some glue code to simplify message handling. You must implement KafkaConsumerHandler and create a new SaramaConsumerHandler with it.
func NewConsumerHandler ¶
func NewConsumerHandler(logger *zap.SugaredLogger, handler KafkaConsumerHandler, errorsCh chan error, options ...SaramaConsumerHandlerOption) SaramaConsumerHandler
func (*SaramaConsumerHandler) Cleanup ¶
func (consumer *SaramaConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (*SaramaConsumerHandler) ConsumeClaim ¶
func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*SaramaConsumerHandler) Setup ¶
func (consumer *SaramaConsumerHandler) Setup(session sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim.
type SaramaConsumerHandlerOption ¶ added in v0.23.0
type SaramaConsumerHandlerOption func(*SaramaConsumerHandler)
func WithSaramaConsumerLifecycleListener ¶ added in v0.23.0
func WithSaramaConsumerLifecycleListener(listener SaramaConsumerLifecycleListener) SaramaConsumerHandlerOption
func WithTimeout ¶ added in v0.23.1
func WithTimeout(timeout time.Duration) SaramaConsumerHandlerOption
WithTimeout configures the request timeout. Default is set to 60s.
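SaramaConsumerHandlerOption has the shape of a functional option: a function that mutates the handler while it is being constructed. The sketch below shows how such options are commonly applied, using lowercase stand-in names (handler, option, withTimeout, newHandler). The timeout field name is an assumption, since the real struct's fields are unexported, and the real constructor may differ in detail.

```go
package main

import "time"

// defaultTimeout reflects the documented default of 60s.
const defaultTimeout = 60 * time.Second

// handler is a stand-in for SaramaConsumerHandler. The real fields are
// unexported, so "timeout" is an assumed name.
type handler struct {
	timeout time.Duration
}

// option mirrors the shape of SaramaConsumerHandlerOption.
type option func(*handler)

// withTimeout mirrors WithTimeout: it returns an option that overrides the
// default timeout.
func withTimeout(d time.Duration) option {
	return func(h *handler) { h.timeout = d }
}

// newHandler sets the defaults first and then applies each option in order,
// so a later option overrides an earlier one.
func newHandler(opts ...option) handler {
	h := handler{timeout: defaultTimeout}
	for _, opt := range opts {
		opt(&h)
	}
	return h
}
```

Under these assumptions, newHandler() keeps the 60s default and newHandler(withTimeout(30*time.Second)) uses 30s. The pattern lets new settings be added later without changing the constructor's signature.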
type SaramaConsumerLifecycleListener ¶ added in v0.23.0
type SaramaConsumerLifecycleListener interface {
	// Setup is invoked when the consumer is joining the session
	Setup(sess sarama.ConsumerGroupSession)
	// Cleanup is invoked when the consumer is leaving the session
	Cleanup(sess sarama.ConsumerGroupSession)
}
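A lifecycle listener can be as small as a counter of the sessions the consumer currently belongs to. The following sketch uses a local Session interface in place of sarama.ConsumerGroupSession so that it compiles without the Sarama dependency. The sessionCounter and fakeSession types are hypothetical and exist only for illustration.

```go
package main

import "sync/atomic"

// Session is a stand-in for sarama.ConsumerGroupSession (assumption: only
// the GenerationID accessor is reproduced).
type Session interface {
	GenerationID() int32
}

// sessionCounter is a hypothetical listener whose method set mirrors
// SaramaConsumerLifecycleListener. It tracks how many sessions are active
// and remembers the generation of the most recently joined one.
type sessionCounter struct {
	active         int64
	lastGeneration int32
}

// Setup is invoked when the consumer is joining the session.
func (c *sessionCounter) Setup(sess Session) {
	atomic.AddInt64(&c.active, 1)
	atomic.StoreInt32(&c.lastGeneration, sess.GenerationID())
}

// Cleanup is invoked when the consumer is leaving the session.
func (c *sessionCounter) Cleanup(sess Session) {
	atomic.AddInt64(&c.active, -1)
}

// Active returns the number of sessions joined but not yet left.
func (c *sessionCounter) Active() int64 {
	return atomic.LoadInt64(&c.active)
}

// fakeSession is a test double that only carries a generation ID.
type fakeSession int32

func (s fakeSession) GenerationID() int32 { return int32(s) }
```

A listener written against the real interface would be registered through WithSaramaConsumerLifecycleListener when the handler is created.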