Documentation ¶
Overview ¶
Package kafka provides consumer abstractions and base functionality with included tracing capabilities.
Index ¶
- Constants
- func ClaimMessage(ctx context.Context, msg *sarama.ConsumerMessage, d encoding.DecodeRawFunc, ...) (async.Message, error)
- func DefaultSaramaConfig(name string) (*sarama.Config, error)
- func MessageStatusCountInc(status, group, topic string)
- func TopicPartitionOffsetDiffGaugeSet(group, topic string, partition int32, high, offset int64)
- type ConsumerConfig
- type OptionFunc
- func Buffer(buf int) OptionFunc
- func Decoder(dec encoding.DecodeRawFunc) OptionFunc
- func DecoderJSON() OptionFunc
- func Start(offset int64) OptionFunc
- func StartFromNewest() OptionFunc
- func StartFromOldest() OptionFunc
- func Timeout(timeout time.Duration) OptionFunc
- func Version(version string) OptionFunc
- func WithDurationOffset(since time.Duration, timeExtractor TimeExtractor) OptionFunc
- type TimeExtractor
Constants ¶
const (
	// MessageReceived is used to label the Prometheus Message Status counter.
	MessageReceived = "received"
	// MessageClaimErrors is used to label the Prometheus Message Status counter.
	MessageClaimErrors = "claim-errors"
	// MessageDecoded is used to label the Prometheus Message Status counter.
	MessageDecoded = "decoded"
)
Variables ¶
This section is empty.
Functions ¶
func ClaimMessage ¶
func ClaimMessage(ctx context.Context, msg *sarama.ConsumerMessage, d encoding.DecodeRawFunc, sess sarama.ConsumerGroupSession) (async.Message, error)
ClaimMessage transforms a sarama.ConsumerMessage to an async.Message.
func DefaultSaramaConfig ¶
func DefaultSaramaConfig(name string) (*sarama.Config, error)
DefaultSaramaConfig creates a sarama config object with the default configuration set up.
func MessageStatusCountInc ¶
func MessageStatusCountInc(status, group, topic string)
MessageStatusCountInc increments the messageStatus counter for a certain status.
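As a rough illustration of how a status-labelled counter behaves, the sketch below keeps one count per (status, group, topic) combination. It is not the package's implementation: `statusCounter`, `inc`, and `get` are hypothetical names, and a plain map stands in for the Prometheus counter. Only the three status strings come from the constants above.

```go
package main

import (
	"fmt"
	"sync"
)

// Status labels copied from the package constants.
const (
	messageReceived    = "received"
	messageClaimErrors = "claim-errors"
	messageDecoded     = "decoded"
)

// labels identifies one counter series (hypothetical type).
type labels struct{ status, group, topic string }

// statusCounter is a hypothetical in-memory stand-in for a labelled counter.
type statusCounter struct {
	mu     sync.Mutex
	counts map[labels]int
}

func newStatusCounter() *statusCounter {
	return &statusCounter{counts: make(map[labels]int)}
}

// inc adds one to the series identified by the three labels.
func (c *statusCounter) inc(status, group, topic string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[labels{status, group, topic}]++
}

// get returns the current value of a series (zero if never incremented).
func (c *statusCounter) get(status, group, topic string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[labels{status, group, topic}]
}

func main() {
	c := newStatusCounter()
	c.inc(messageReceived, "orders-group", "orders")
	c.inc(messageDecoded, "orders-group", "orders")
	c.inc(messageReceived, "orders-group", "orders")
	fmt.Println(c.get(messageReceived, "orders-group", "orders"))
}
```

Each distinct label combination is tracked separately, so a claim error on one topic does not affect the counts of another.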
func TopicPartitionOffsetDiffGaugeSet ¶
func TopicPartitionOffsetDiffGaugeSet(group, topic string, partition int32, high, offset int64)
TopicPartitionOffsetDiffGaugeSet creates a new Gauge that measures partition offsets.
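The `high` and `offset` parameters suggest the gauge tracks how far a consumer trails the partition's high watermark. The sketch below shows that arithmetic only. `offsetDiff` is a hypothetical helper, and both the subtraction and the clamping to zero are assumptions, not behaviour confirmed by this page.

```go
package main

import "fmt"

// offsetDiff returns how far a consumed offset trails the high watermark.
// Clamping negative results to zero is an assumption made for this sketch.
func offsetDiff(high, offset int64) int64 {
	if d := high - offset; d > 0 {
		return d
	}
	return 0
}

func main() {
	fmt.Println(offsetDiff(1500, 1420)) // prints 80
}
```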
Types ¶
type ConsumerConfig ¶
type ConsumerConfig struct {
	Brokers               []string
	Buffer                int
	DecoderFunc           encoding.DecodeRawFunc
	DurationBasedConsumer bool
	DurationOffset        time.Duration
	TimeExtractor         func(*sarama.ConsumerMessage) (time.Time, error)
	SaramaConfig          *sarama.Config
}
ConsumerConfig is the common configuration of patron kafka consumers.
type OptionFunc ¶
type OptionFunc func(*ConsumerConfig) error
OptionFunc definition for configuring the consumer in a functional way.
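The functional-options pattern behind OptionFunc can be sketched without the real dependencies. In the example below, `consumerConfig`, `optionFunc`, `buffer`, and `applyOptions` are hypothetical stand-ins that mirror the shape of the types on this page. The negative-value check is an assumption, not documented behaviour of Buffer.

```go
package main

import (
	"errors"
	"fmt"
)

// consumerConfig is a trimmed, hypothetical stand-in for ConsumerConfig.
type consumerConfig struct {
	Buffer int
}

// optionFunc mirrors the shape of OptionFunc: it mutates the config or returns an error.
type optionFunc func(*consumerConfig) error

// buffer is a hypothetical option; rejecting negative values is an assumption.
func buffer(buf int) optionFunc {
	return func(c *consumerConfig) error {
		if buf < 0 {
			return errors.New("buffer must be non-negative")
		}
		c.Buffer = buf
		return nil
	}
}

// applyOptions runs each option in order and stops at the first error.
func applyOptions(c *consumerConfig, oo ...optionFunc) error {
	for _, o := range oo {
		if err := o(c); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	cfg := &consumerConfig{}
	if err := applyOptions(cfg, buffer(1000)); err != nil {
		fmt.Println("invalid option:", err)
		return
	}
	fmt.Println("buffer:", cfg.Buffer)
}
```

Because every option returns an error, an invalid value can be rejected when the consumer is constructed rather than surfacing later at runtime.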
func Buffer ¶
func Buffer(buf int) OptionFunc
Buffer option for adjusting the incoming messages buffer.
func Decoder ¶
func Decoder(dec encoding.DecodeRawFunc) OptionFunc
Decoder option for injecting a specific decoder implementation.
func Start ¶
func Start(offset int64) OptionFunc
Start option for adjusting the starting offset.
func StartFromNewest ¶
func StartFromNewest() OptionFunc
StartFromNewest option for adjusting the starting offset to newest.
func StartFromOldest ¶
func StartFromOldest() OptionFunc
StartFromOldest option for adjusting the starting offset to oldest.
func Timeout ¶
func Timeout(timeout time.Duration) OptionFunc
Timeout option for adjusting the timeout of the connection.
func Version ¶
func Version(version string) OptionFunc
Version option for setting the Kafka version.
func WithDurationOffset ¶ added in v0.42.0
func WithDurationOffset(since time.Duration, timeExtractor TimeExtractor) OptionFunc
WithDurationOffset allows creating a consumer that starts from a given duration in the past (the since argument). It accepts a function indicating how to extract the time from a Kafka message.
type TimeExtractor ¶ added in v0.42.0
type TimeExtractor func(*sarama.ConsumerMessage) (time.Time, error)
TimeExtractor defines a function extracting a time from a Kafka message.
Directories ¶
Path | Synopsis |
---|---|
group | Package group provides a consumer group implementation. |
simple | Package simple provides a simple consumer implementation without consumer groups. |