Documentation ¶
Index ¶
- type ConsumerGroup
- type ConsumerGroupCreator
- type ConsumerGroupHandler
- func NewConsumerGroupHandler(edge edge.Edge, log logger.Logger) *ConsumerGroupHandler
- func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error
- func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error
- func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error
- type Input
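- type SaramaCreator
- func (*SaramaCreator) Create(brokers []string, group string, cfg *sarama.Config) (ConsumerGroup, error)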
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumerGroup ¶
type ConsumerGroupCreator ¶
type ConsumerGroupHandler ¶
type ConsumerGroupHandler struct {
	MaxMessageLen int
	TopicTag      string
	// contains filtered or unexported fields
}
ConsumerGroupHandler is a sarama.ConsumerGroupHandler implementation.
func NewConsumerGroupHandler ¶
func NewConsumerGroupHandler(edge edge.Edge, log logger.Logger) *ConsumerGroupHandler
func (*ConsumerGroupHandler) Cleanup ¶
func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error
Cleanup stops the internal goroutine and is called after all ConsumeClaim functions have completed.
func (*ConsumerGroupHandler) ConsumeClaim ¶
func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim is called once for each claim, each in its own goroutine, so it must be thread-safe. It should run until the claim is closed.
func (*ConsumerGroupHandler) Handle ¶
func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error
Handle processes a message and, if successful, saves it so that it can be acknowledged after delivery.
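One way to picture "saved to be acknowledged after delivery" is a small, mutex-guarded table of pending offsets with an upper bound. The sketch below is an assumption about the general technique, not the handler's actual internals; `pending`, `add`, `delivered`, and `errFull` are hypothetical names, and tying the bound to a setting such as `MaxUndeliveredMessages` is likewise an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errFull is returned when the tracker already holds its maximum number
// of undelivered messages.
var errFull = errors.New("too many undelivered messages")

// pending is a hypothetical tracker for messages that were handed off
// but have not yet been confirmed as delivered.
type pending struct {
	mu      sync.Mutex
	limit   int
	nextID  int
	offsets map[int]int64 // tracking ID -> message offset
}

func newPending(limit int) *pending {
	return &pending{limit: limit, offsets: make(map[int]int64)}
}

// add records an offset and returns a tracking ID, or errFull when the
// limit has been reached.
func (p *pending) add(offset int64) (int, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.offsets) >= p.limit {
		return 0, errFull
	}
	p.nextID++
	p.offsets[p.nextID] = offset
	return p.nextID, nil
}

// delivered removes the entry for id and returns the offset that can now
// be acknowledged. The boolean is false if id is unknown.
func (p *pending) delivered(id int) (int64, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	off, ok := p.offsets[id]
	delete(p.offsets, id)
	return off, ok
}

func main() {
	p := newPending(2)
	id, _ := p.add(41)
	off, ok := p.delivered(id)
	fmt.Println(off, ok)
}
```

In a real consumer, the offset returned by `delivered` would be the point at which the message is marked on the session.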
func (*ConsumerGroupHandler) Setup ¶
func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error
Setup is called once when a new session is opened. It sets up the handler and begins processing delivered messages.
type Input ¶
type Input struct {
	Brokers                []string      `toml:"brokers"`
	ConsumerGroup          string        `toml:"consumer_group"`
	MaxMessageLen          int           `toml:"max_message_len"`
	MaxUndeliveredMessages int           `toml:"max_undelivered_messages"`
	MaxProcessingTime      time.Duration `toml:"max_processing_time"`
	Offset                 string        `toml:"offset"`
	BalanceStrategy        string        `toml:"balance_strategy"`
	Topics                 []string      `toml:"topics"`
	TopicTag               string        `toml:"topic_tag"`
	ConsumerFetchDefault   int64         `toml:"consumer_fetch_default"`
	ConnectionStrategy     string        `toml:"connection_strategy"`

	kafka.ReadConfig
	kafka.Logger

	Log             logger.Logger        `toml:"-"`
	ConsumerCreator ConsumerGroupCreator `toml:"-"`
	// contains filtered or unexported fields
}
type SaramaCreator ¶
type SaramaCreator struct{}
func (*SaramaCreator) Create ¶
func (*SaramaCreator) Create(brokers []string, group string, cfg *sarama.Config) (ConsumerGroup, error)
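Since `Input` exposes a `ConsumerCreator` field, a creator with the same `Create` shape could presumably be swapped in for tests. The sketch below illustrates that idea with local stand-ins only: `config`, `group`, `creator`, `fakeGroup`, and `fakeCreator` are all hypothetical, and the real `ConsumerGroup` and `ConsumerGroupCreator` definitions are not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// config is a hypothetical stand-in for *sarama.Config.
type config struct{ ClientID string }

// group is a hypothetical stand-in for the ConsumerGroup type.
type group interface{ Close() error }

// creator mirrors the shape of the Create method documented above.
type creator interface {
	Create(brokers []string, groupID string, cfg *config) (group, error)
}

// fakeGroup records whether it has been closed.
type fakeGroup struct{ closed bool }

func (g *fakeGroup) Close() error {
	g.closed = true
	return nil
}

// fakeCreator returns an in-memory group and remembers its arguments,
// so a test can check what the caller asked for without a broker.
type fakeCreator struct {
	brokers []string
	groupID string
}

func (c *fakeCreator) Create(brokers []string, groupID string, cfg *config) (group, error) {
	if len(brokers) == 0 {
		return nil, errors.New("no brokers configured")
	}
	c.brokers, c.groupID = brokers, groupID
	return &fakeGroup{}, nil
}

func main() {
	var c creator = &fakeCreator{}
	g, err := c.Create([]string{"localhost:9092"}, "example-group", &config{})
	fmt.Println(g != nil, err)
}
```

Keeping group construction behind an interface like this lets the rest of the input be exercised without network access.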