kafka

package
v0.69.0
Published: May 17, 2022 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package kafka provides consumer abstractions and base functionality with built-in tracing.

Index

Constants

const (

	// MessageReceived is used to label the Prometheus Message Status counter.
	MessageReceived = "received"
	// MessageClaimErrors is used to label the Prometheus Message Status counter.
	MessageClaimErrors = "claim-errors"
	// MessageDecoded is used to label the Prometheus Message Status counter.
	MessageDecoded = "decoded"
)

Variables

This section is empty.

Functions

func ClaimMessage

ClaimMessage transforms a sarama.ConsumerMessage to an async.Message.

func MessageStatusCountInc

func MessageStatusCountInc(status, group, topic string)

MessageStatusCountInc increments the messageStatus counter for the given status, group, and topic.
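As a rough illustration of how a counter labeled by status, group, and topic behaves, the sketch below uses a plain map in place of the Prometheus counter. The statusCounter type and its methods are hypothetical stand-ins and not part of this package; only the three status strings come from the constants above.

```go
package main

import (
	"fmt"
	"sync"
)

// Status labels copied from the package constants.
const (
	MessageReceived    = "received"
	MessageClaimErrors = "claim-errors"
	MessageDecoded     = "decoded"
)

// labelKey is a hypothetical stand-in for a Prometheus label set.
type labelKey struct {
	status, group, topic string
}

// statusCounter is a hypothetical in-memory substitute for the
// counter that the real package increments.
type statusCounter struct {
	mu     sync.Mutex
	counts map[labelKey]int
}

func newStatusCounter() *statusCounter {
	return &statusCounter{counts: make(map[labelKey]int)}
}

// Inc adds one to the series identified by status, group, and topic.
func (c *statusCounter) Inc(status, group, topic string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[labelKey{status, group, topic}]++
}

// Get returns the current value of one series (zero if never incremented).
func (c *statusCounter) Get(status, group, topic string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[labelKey{status, group, topic}]
}

func main() {
	c := newStatusCounter()
	c.Inc(MessageReceived, "orders-group", "orders")
	c.Inc(MessageReceived, "orders-group", "orders")
	c.Inc(MessageDecoded, "orders-group", "orders")
	fmt.Println(c.Get(MessageReceived, "orders-group", "orders"))
	fmt.Println(c.Get(MessageDecoded, "orders-group", "orders"))
	fmt.Println(c.Get(MessageClaimErrors, "orders-group", "orders"))
}
```

Each distinct combination of status, group, and topic is its own series, so a consumer can report received, decoded, and failed messages independently per topic.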

func TopicPartitionOffsetDiffGaugeSet

func TopicPartitionOffsetDiffGaugeSet(group, topic string, partition int32, high, offset int64)

TopicPartitionOffsetDiffGaugeSet sets the gauge that tracks, per group, topic, and partition, the gap between the high offset and the current offset.
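The sketch below assumes the gauge records the difference between the partition's high offset and the consumer's current offset. That reading is inferred from the function name and its parameters, not confirmed by the text above. The partitionKey type, the gauge map, and setOffsetDiff are hypothetical stand-ins for the Prometheus gauge.

```go
package main

import "fmt"

// partitionKey is a hypothetical stand-in for the gauge's label set.
type partitionKey struct {
	group, topic string
	partition    int32
}

// gauge is a hypothetical in-memory substitute for a Prometheus gauge vector.
var gauge = map[partitionKey]float64{}

// setOffsetDiff stores high minus offset for one partition.
// The subtraction is an assumption about what the real function records.
func setOffsetDiff(group, topic string, partition int32, high, offset int64) {
	gauge[partitionKey{group, topic, partition}] = float64(high - offset)
}

func main() {
	setOffsetDiff("orders-group", "orders", 0, 1500, 1420)
	setOffsetDiff("orders-group", "orders", 1, 900, 900)
	fmt.Println(gauge[partitionKey{"orders-group", "orders", 0}])
	fmt.Println(gauge[partitionKey{"orders-group", "orders", 1}])
}
```

Under this assumption, a value of zero means the consumer has caught up with the partition, and a growing value means it is falling behind.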

Types

type ConsumerConfig

type ConsumerConfig struct {
	Brokers                 []string
	Buffer                  int
	DecoderFunc             encoding.DecodeRawFunc
	DurationBasedConsumer   bool
	DurationOffset          time.Duration
	TimeExtractor           func(*sarama.ConsumerMessage) (time.Time, error)
	TimestampBasedConsumer  bool
	TimestampOffset         int64
	SaramaConfig            *sarama.Config
	LatestOffsetReachedChan chan<- struct{}
}

ConsumerConfig is the common configuration of patron kafka consumers.

type OptionFunc

type OptionFunc func(*ConsumerConfig) error

OptionFunc is a functional option for configuring the consumer.

func Buffer

func Buffer(buf int) OptionFunc

Buffer sets the size of the incoming message buffer.

func Decoder

func Decoder(dec encoding.DecodeRawFunc) OptionFunc

Decoder injects a specific decoder implementation.

func DecoderJSON

func DecoderJSON() OptionFunc

DecoderJSON injects the JSON decoder.
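A decoder of the kind these options inject might look like the sketch below. It assumes that encoding.DecodeRawFunc has the shape func(data []byte, v interface{}) error, which is not shown on this page. The decodeRawFunc type, jsonDecode, and the order struct are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeRawFunc is assumed to match the shape of encoding.DecodeRawFunc.
type decodeRawFunc func(data []byte, v interface{}) error

// jsonDecode is a hypothetical JSON decoder with that shape.
func jsonDecode(data []byte, v interface{}) error {
	return json.Unmarshal(data, v)
}

// order is a hypothetical message payload.
type order struct {
	ID     string `json:"id"`
	Amount int    `json:"amount"`
}

func main() {
	var dec decodeRawFunc = jsonDecode

	var o order
	if err := dec([]byte(`{"id":"A-1","amount":3}`), &o); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(o.ID, o.Amount)
}
```

Any function with the same shape, for example one wrapping a different serialization format, could be supplied in place of jsonDecode.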

func Start

func Start(offset int64) OptionFunc

Start sets the starting offset.

func StartFromNewest

func StartFromNewest() OptionFunc

StartFromNewest sets the starting offset to the newest message.

func StartFromOldest

func StartFromOldest() OptionFunc

StartFromOldest sets the starting offset to the oldest message.
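Sarama represents "newest" and "oldest" as the sentinel offsets -1 and -2 (sarama.OffsetNewest and sarama.OffsetOldest). The sketch below mirrors those values with local constants to show how a starting offset could be interpreted. describeStart is a hypothetical helper, and how this package stores the value internally is not stated here.

```go
package main

import "fmt"

// These values mirror sarama.OffsetNewest and sarama.OffsetOldest.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// describeStart is a hypothetical helper that explains what a
// starting offset means: a sentinel or an explicit position.
func describeStart(offset int64) string {
	switch {
	case offset == offsetNewest:
		return "newest"
	case offset == offsetOldest:
		return "oldest"
	case offset >= 0:
		return fmt.Sprintf("explicit offset %d", offset)
	default:
		return "invalid"
	}
}

func main() {
	for _, o := range []int64{offsetNewest, offsetOldest, 42, -7} {
		fmt.Println(describeStart(o))
	}
}
```

Starting from the oldest offset replays everything still retained on the partition, while starting from the newest skips the backlog and delivers only messages produced after the consumer starts.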

func Timeout

func Timeout(timeout time.Duration) OptionFunc

Timeout sets the connection timeout.

func Version

func Version(version string) OptionFunc

Version sets the Kafka version.

Directories

Path     Synopsis
group    Package group provides a consumer group implementation.
simple   Package simple provides a simple consumer implementation without consumer groups.
