Documentation ¶
Index ¶
- type ConsumerGroup
- type ConsumerGroupCreator
- type ConsumerGroupHandler
- func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error
- func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error
- func (h *ConsumerGroupHandler) Reserve(ctx context.Context) error
- func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error
- type KafkaConsumer
- func (k *KafkaConsumer) Description() string
- func (k *KafkaConsumer) Gather(acc telegraf.Accumulator) error
- func (k *KafkaConsumer) Init() error
- func (k *KafkaConsumer) SampleConfig() string
- func (k *KafkaConsumer) SetParser(parser parsers.Parser)
- func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error
- func (k *KafkaConsumer) Stop()
- type Message
- type SaramaCreator
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumerGroup ¶ added in v1.17.3
type ConsumerGroupCreator ¶ added in v1.17.3
type ConsumerGroupHandler ¶ added in v1.17.3
type ConsumerGroupHandler struct { MaxMessageLen int TopicTag string // contains filtered or unexported fields }
ConsumerGroupHandler is a sarama.ConsumerGroupHandler implementation.
func NewConsumerGroupHandler ¶ added in v1.17.3
func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parser parsers.Parser) *ConsumerGroupHandler
func (*ConsumerGroupHandler) Cleanup ¶ added in v1.17.3
func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error
Cleanup stops the internal goroutine and is called after all ConsumeClaim functions have completed.
func (*ConsumerGroupHandler) ConsumeClaim ¶ added in v1.17.3
func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim is called once each claim in a goroutine and must be thread-safe. Should run until the claim is closed.
func (*ConsumerGroupHandler) Handle ¶ added in v1.17.3
func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error
Handle processes a message and if successful saves it to be acknowledged after delivery.
func (*ConsumerGroupHandler) Reserve ¶ added in v1.17.3
func (h *ConsumerGroupHandler) Reserve(ctx context.Context) error
Reserve blocks until there is an available slot for a new message.
func (*ConsumerGroupHandler) Setup ¶ added in v1.17.3
func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error
Setup is called once when a new session is opened. It setups up the handler and begins processing delivered messages.
type KafkaConsumer ¶ added in v1.17.3
type KafkaConsumer struct { Brokers []string `toml:"brokers"` ConsumerGroup string `toml:"consumer_group"` MaxMessageLen int `toml:"max_message_len"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"` Offset string `toml:"offset"` BalanceStrategy string `toml:"balance_strategy"` Topics []string `toml:"topics"` TopicTag string `toml:"topic_tag"` kafka.ReadConfig Log telegraf.Logger `toml:"-"` ConsumerCreator ConsumerGroupCreator `toml:"-"` // contains filtered or unexported fields }
func (*KafkaConsumer) Description ¶ added in v1.17.3
func (k *KafkaConsumer) Description() string
func (*KafkaConsumer) Gather ¶ added in v1.17.3
func (k *KafkaConsumer) Gather(acc telegraf.Accumulator) error
func (*KafkaConsumer) Init ¶ added in v1.17.3
func (k *KafkaConsumer) Init() error
func (*KafkaConsumer) SampleConfig ¶ added in v1.17.3
func (k *KafkaConsumer) SampleConfig() string
func (*KafkaConsumer) SetParser ¶ added in v1.17.3
func (k *KafkaConsumer) SetParser(parser parsers.Parser)
func (*KafkaConsumer) Start ¶ added in v1.17.3
func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error
func (*KafkaConsumer) Stop ¶ added in v1.17.3
func (k *KafkaConsumer) Stop()
type Message ¶ added in v1.17.3
type Message struct {
// contains filtered or unexported fields
}
Message is an aggregate type binding the Kafka message and the session so that offsets can be updated.
type SaramaCreator ¶ added in v1.17.3
type SaramaCreator struct{}
func (*SaramaCreator) Create ¶ added in v1.17.3
func (*SaramaCreator) Create(brokers []string, group string, config *sarama.Config) (ConsumerGroup, error)