Documentation ¶
Overview ¶
Package kafka provides consumer abstractions and base functionality with built-in tracing.
Index ¶
- Constants
- func ClaimMessage(ctx context.Context, msg *sarama.ConsumerMessage, d encoding.DecodeRawFunc, ...) (async.Message, error)
- func MessageStatusCountInc(status, group, topic string)
- func TopicPartitionOffsetDiffGaugeSet(group, topic string, partition int32, high, offset int64)
- type ConsumerConfig
- type OptionFunc
- func Buffer(buf int) OptionFunc
- func Decoder(dec encoding.DecodeRawFunc) OptionFunc
- func DecoderJSON() OptionFunc
- func Start(offset int64) OptionFunc
- func StartFromNewest() OptionFunc
- func StartFromOldest() OptionFunc
- func Timeout(timeout time.Duration) OptionFunc
- func Version(version string) OptionFunc
Constants ¶
const (
	// MessageReceived is used to label the Prometheus Message Status counter.
	MessageReceived = "received"
	// MessageClaimErrors is used to label the Prometheus Message Status counter.
	MessageClaimErrors = "claim-errors"
	// MessageDecoded is used to label the Prometheus Message Status counter.
	MessageDecoded = "decoded"
)
Variables ¶
This section is empty.
Functions ¶
func ClaimMessage ¶
func ClaimMessage(ctx context.Context, msg *sarama.ConsumerMessage, d encoding.DecodeRawFunc, sess sarama.ConsumerGroupSession) (async.Message, error)
ClaimMessage transforms a sarama.ConsumerMessage to an async.Message.
func MessageStatusCountInc ¶
func MessageStatusCountInc(status, group, topic string)
MessageStatusCountInc increments the message status counter for the given status, consumer group, and topic.
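To illustrate how a counter labeled by status, group, and topic behaves, here is a minimal sketch that uses an in-memory map instead of Prometheus. The `statusCounter` type and its methods are hypothetical stand-ins, not part of this package; only the three status constants are taken from the documentation above.

```go
package main

import (
	"fmt"
	"sync"
)

// Status labels copied from the package constants.
const (
	MessageReceived    = "received"
	MessageClaimErrors = "claim-errors"
	MessageDecoded     = "decoded"
)

// labels identifies one counter series.
// Hypothetical stand-in for Prometheus label values.
type labels struct {
	status, group, topic string
}

// statusCounter is a hypothetical in-memory stand-in for a labeled counter.
type statusCounter struct {
	mu     sync.Mutex
	counts map[labels]int
}

func newStatusCounter() *statusCounter {
	return &statusCounter{counts: make(map[labels]int)}
}

// Inc adds one to the series selected by the three labels.
func (c *statusCounter) Inc(status, group, topic string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[labels{status, group, topic}]++
}

// Get returns the current value of one series (zero if never incremented).
func (c *statusCounter) Get(status, group, topic string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[labels{status, group, topic}]
}

func main() {
	c := newStatusCounter()
	c.Inc(MessageReceived, "orders-group", "orders")
	c.Inc(MessageDecoded, "orders-group", "orders")
	c.Inc(MessageReceived, "orders-group", "orders")

	fmt.Println(c.Get(MessageReceived, "orders-group", "orders"))
	fmt.Println(c.Get(MessageClaimErrors, "orders-group", "orders"))
}
```

Each distinct combination of status, group, and topic is its own series, so counting "received" messages does not affect the "claim-errors" series for the same topic.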
func TopicPartitionOffsetDiffGaugeSet ¶
func TopicPartitionOffsetDiffGaugeSet(group, topic string, partition int32, high, offset int64)
TopicPartitionOffsetDiffGaugeSet creates a new Gauge that measures partition offsets.
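The function name and its `high` and `offset` parameters suggest that the gauge tracks how far a consumer is behind the partition's high-water mark. The sketch below assumes the recorded value is `high - offset`; that formula is an assumption, and the `offsetDiffGauge` type is a hypothetical in-memory stand-in rather than the package's Prometheus gauge.

```go
package main

import "fmt"

// partitionKey identifies one gauge series.
// Hypothetical stand-in for Prometheus label values.
type partitionKey struct {
	group, topic string
	partition    int32
}

// offsetDiffGauge is a hypothetical in-memory stand-in for a labeled gauge.
type offsetDiffGauge map[partitionKey]float64

// Set records high-offset for the given partition.
// Using this difference as the gauge value is an assumption
// drawn from the function name, not confirmed by the documentation.
func (g offsetDiffGauge) Set(group, topic string, partition int32, high, offset int64) {
	g[partitionKey{group, topic, partition}] = float64(high - offset)
}

func main() {
	g := offsetDiffGauge{}
	// The partition's high-water mark is 120 and the consumer is at offset 100.
	g.Set("orders-group", "orders", 0, 120, 100)
	fmt.Println(g[partitionKey{"orders-group", "orders", 0}])
}
```

Unlike a counter, a gauge is overwritten on every call, so the stored value always reflects the most recent observation for that partition.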
Types ¶
type ConsumerConfig ¶
type ConsumerConfig struct {
	Brokers                 []string
	Buffer                  int
	DecoderFunc             encoding.DecodeRawFunc
	DurationBasedConsumer   bool
	DurationOffset          time.Duration
	TimeExtractor           func(*sarama.ConsumerMessage) (time.Time, error)
	TimestampBasedConsumer  bool
	TimestampOffset         int64
	SaramaConfig            *sarama.Config
	LatestOffsetReachedChan chan<- struct{}
}
ConsumerConfig is the common configuration of patron kafka consumers.
type OptionFunc ¶
type OptionFunc func(*ConsumerConfig) error
OptionFunc is a functional option for configuring the consumer. Each option receives the ConsumerConfig and returns an error if it cannot be applied.
func Decoder ¶
func Decoder(dec encoding.DecodeRawFunc) OptionFunc
Decoder injects a specific decoder implementation.
func StartFromNewest ¶
func StartFromNewest() OptionFunc
StartFromNewest sets the starting offset to the newest offset.
func StartFromOldest ¶
func StartFromOldest() OptionFunc
StartFromOldest sets the starting offset to the oldest offset.
func Timeout ¶
func Timeout(timeout time.Duration) OptionFunc
Timeout sets the connection timeout.
Directories ¶
Path | Synopsis |
---|---|
group | Package group provides a consumer group implementation. |
simple | Package simple provides a simple consumer implementation without consumer groups. |