Documentation ¶
Index ¶
- Constants
- Variables
- type ConsumerGroupOffsetsChecker
- type EventIndex
- type KafkaConsumerGroupFactory
- type KafkaConsumerGroupManager
- type KafkaConsumerGroupOffsetsChecker
- type KafkaConsumerHandler
- type ManagerEvent
- type NoopConsumerGroupOffsetsChecker
- type ReconfigureError
- type SaramaConsumerHandler
- func (consumer *SaramaConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error
- func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (consumer *SaramaConsumerHandler) Setup(session sarama.ConsumerGroupSession) error
- type SaramaConsumerHandlerOption
- type SaramaConsumerLifecycleListener
- type SubscriberStatus
- type SubscriberStatusMap
Constants ¶
const (
	OffsetCheckRetryTimeout  = 60 * time.Second
	OffsetCheckRetryInterval = 5 * time.Second
)
Variables ¶
var GroupLockedError = fmt.Errorf("managed group lock failed: locked by a different token")
GroupLockedError is the error returned if a locked managed group is given a different token for access
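Because GroupLockedError is a package-level sentinel value, callers can compare against it with `errors.Is`, which also matches errors that wrap it. The sketch below redeclares the variable so it compiles standalone; `lockGroup` and `isLockConflict` are hypothetical helpers, and whether the package wraps the sentinel before returning it is not stated on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// Redeclared from the documentation above so the sketch compiles on its own.
var GroupLockedError = fmt.Errorf("managed group lock failed: locked by a different token")

// lockGroup is a hypothetical stand-in for an operation that fails when the
// group is already locked by a different token.
func lockGroup(holder, token string) error {
	if holder != "" && holder != token {
		return fmt.Errorf("lock attempt rejected: %w", GroupLockedError)
	}
	return nil
}

// isLockConflict reports whether err is, or wraps, GroupLockedError.
func isLockConflict(err error) bool {
	return errors.Is(err, GroupLockedError)
}

func main() {
	fmt.Println(isLockConflict(lockGroup("token-a", "token-b"))) // prints true
	fmt.Println(isLockConflict(lockGroup("token-a", "token-a"))) // prints false
}
```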
Functions ¶
This section is empty.
Types ¶
type ConsumerGroupOffsetsChecker ¶ added in v0.24.6
type EventIndex ¶ added in v0.25.0
type EventIndex int
EventIndex is the type of Event used when sending ManagerEvent structs via the notifyChannels list
const (
	GroupCreated EventIndex = iota + 1
	GroupStopped
	GroupStarted
	GroupClosed
)
Events
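Since the constants start at `iota + 1`, the zero value of EventIndex does not correspond to any named event. The following sketch redeclares the type and constants to show the resulting numbering; `eventName` is a hypothetical helper, not something this package exports.

```go
package main

import "fmt"

// Redeclared from the documentation above so the sketch compiles on its own.
type EventIndex int

const (
	GroupCreated EventIndex = iota + 1
	GroupStopped
	GroupStarted
	GroupClosed
)

// eventName is a hypothetical helper that maps an EventIndex to a label.
// The zero value falls through to "unknown" because numbering starts at 1.
func eventName(e EventIndex) string {
	switch e {
	case GroupCreated:
		return "created"
	case GroupStopped:
		return "stopped"
	case GroupStarted:
		return "started"
	case GroupClosed:
		return "closed"
	default:
		return "unknown"
	}
}

func main() {
	var zero EventIndex
	fmt.Println(int(GroupCreated), int(GroupClosed)) // prints 1 4
	fmt.Println(eventName(zero))                     // prints unknown
}
```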
type KafkaConsumerGroupFactory ¶
type KafkaConsumerGroupFactory interface {
StartConsumerGroup(ctx context.Context, groupID string, topics []string, handler KafkaConsumerHandler, channelRef types.NamespacedName, options ...SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error)
}
KafkaConsumerGroupFactory creates the ConsumerGroup and starts consuming the specified topics
func NewConsumerGroupFactory ¶
func NewConsumerGroupFactory(addrs []string, config *sarama.Config, offsetsChecker ConsumerGroupOffsetsChecker, enqueue func(ref types.NamespacedName)) KafkaConsumerGroupFactory
type KafkaConsumerGroupManager ¶ added in v0.24.0
type KafkaConsumerGroupManager interface {
	Reconfigure(brokers []string, config *sarama.Config) *ReconfigureError
	StartConsumerGroup(ctx context.Context, groupId string, topics []string, handler KafkaConsumerHandler, ref types.NamespacedName, options ...SaramaConsumerHandlerOption) error
	CloseConsumerGroup(groupId string) error
	Errors(groupId string) <-chan error
	IsManaged(groupId string) bool
	IsStopped(groupId string) bool
	GetNotificationChannel() <-chan ManagerEvent
	ClearNotifications()
}
KafkaConsumerGroupManager keeps track of Sarama consumer groups and handles messages from control-protocol clients
func NewConsumerGroupManager ¶ added in v0.24.0
func NewConsumerGroupManager(logger *zap.Logger, serverHandler controlprotocol.ServerHandler, brokers []string, config *sarama.Config, offsetsChecker ConsumerGroupOffsetsChecker, enqueue func(ref types.NamespacedName)) KafkaConsumerGroupManager
NewConsumerGroupManager returns a new kafkaConsumerGroupManagerImpl as a KafkaConsumerGroupManager interface
type KafkaConsumerGroupOffsetsChecker ¶ added in v0.24.6
type KafkaConsumerGroupOffsetsChecker struct { }
func (*KafkaConsumerGroupOffsetsChecker) WaitForOffsetsInitialization ¶ added in v0.24.6
type KafkaConsumerHandler ¶
type KafkaConsumerHandler interface {
	// When this function returns true, the consumer group offset is marked as consumed.
	// The returned error is enqueued in errors channel.
	Handle(context context.Context, message *sarama.ConsumerMessage) (bool, error)
	SetReady(partition int32, ready bool)
	GetConsumerGroup() string
}
type ManagerEvent ¶ added in v0.25.0
type ManagerEvent struct {
	Event   EventIndex
	GroupId string
}
ManagerEvent is the struct used by the notification channel
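A receiver of the channel returned by GetNotificationChannel might track the most recent event per group, as in the sketch below. The types are redeclared so it compiles standalone, and `lastEvents` is a hypothetical helper. The sketch closes the channel to end the loop; whether the real manager ever closes its notification channel is not stated on this page.

```go
package main

import "fmt"

// Redeclared from the documentation above so the sketch compiles on its own.
type EventIndex int

const (
	GroupCreated EventIndex = iota + 1
	GroupStopped
	GroupStarted
	GroupClosed
)

type ManagerEvent struct {
	Event   EventIndex
	GroupId string
}

// lastEvents is a hypothetical helper that reads events until ch is closed
// and records the most recent event seen for each group ID.
func lastEvents(ch <-chan ManagerEvent) map[string]EventIndex {
	latest := make(map[string]EventIndex)
	for ev := range ch {
		latest[ev.GroupId] = ev.Event
	}
	return latest
}

func main() {
	ch := make(chan ManagerEvent, 3)
	ch <- ManagerEvent{Event: GroupCreated, GroupId: "group-a"}
	ch <- ManagerEvent{Event: GroupStopped, GroupId: "group-a"}
	ch <- ManagerEvent{Event: GroupCreated, GroupId: "group-b"}
	close(ch)

	latest := lastEvents(ch)
	fmt.Println(latest["group-a"] == GroupStopped, latest["group-b"] == GroupCreated) // prints true true
}
```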
type NoopConsumerGroupOffsetsChecker ¶ added in v0.24.6
type NoopConsumerGroupOffsetsChecker struct { }
func (*NoopConsumerGroupOffsetsChecker) WaitForOffsetsInitialization ¶ added in v0.24.6
type ReconfigureError ¶ added in v0.25.0
type ReconfigureError struct {
	MultiError error    // A MultiError with all ConsumerGroup failures.
	GroupIds   []string // The ConsumerGroup IDs which failed to stop/start.
}
ReconfigureError is a custom error type returned by the Reconfigure() function.
func (ReconfigureError) Error ¶ added in v0.25.0
func (r ReconfigureError) Error() string
Error implements the golang error interface.
type SaramaConsumerHandler ¶
type SaramaConsumerHandler struct {
// contains filtered or unexported fields
}
SaramaConsumerHandler implements sarama.ConsumerGroupHandler and provides some glue code to simplify message handling. You must implement KafkaConsumerHandler and create a new SaramaConsumerHandler with it.
func NewConsumerHandler ¶
func NewConsumerHandler(logger *zap.SugaredLogger, handler KafkaConsumerHandler, errorsCh chan error, options ...SaramaConsumerHandlerOption) SaramaConsumerHandler
func (*SaramaConsumerHandler) Cleanup ¶
func (consumer *SaramaConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*SaramaConsumerHandler) ConsumeClaim ¶
func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*SaramaConsumerHandler) Setup ¶
func (consumer *SaramaConsumerHandler) Setup(session sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
type SaramaConsumerHandlerOption ¶ added in v0.23.0
type SaramaConsumerHandlerOption func(*SaramaConsumerHandler)
func WithSaramaConsumerLifecycleListener ¶ added in v0.23.0
func WithSaramaConsumerLifecycleListener(listener SaramaConsumerLifecycleListener) SaramaConsumerHandlerOption
func WithTimeout ¶ added in v0.23.1
func WithTimeout(timeout time.Duration) SaramaConsumerHandlerOption
WithTimeout configures the request timeout. Default is set to 60s.
type SaramaConsumerLifecycleListener ¶ added in v0.23.0
type SaramaConsumerLifecycleListener interface {
	// Setup is invoked when the consumer is joining the session
	Setup(sess sarama.ConsumerGroupSession)
	// Cleanup is invoked when the consumer is leaving the session
	Cleanup(sess sarama.ConsumerGroupSession)
}
type SubscriberStatus ¶ added in v0.25.0
type SubscriberStatus struct {
	Stopped bool  // A stopped subscriber is active but suspended ("paused") and is not processing events
	Error   error // A subscriber with a non-nil error has failed
}
SubscriberStatus keeps track of the difference between active, failed, and stopped subscribers
type SubscriberStatusMap ¶ added in v0.25.0
type SubscriberStatusMap map[types.UID]SubscriberStatus
SubscriberStatusMap defines the map type which holds a collection of Subscribers by UID and their status
func (SubscriberStatusMap) FailedCount ¶ added in v0.25.0
func (s SubscriberStatusMap) FailedCount() int
FailedCount returns the count of subscribers represented by this map which have an error associated with them