kafka

package
v0.54.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 2, 2021 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package kafka provides consumer abstractions and base functionality with included tracing capabilities.

Index

Constants

View Source
const (

	// MessageReceived is used to label the Prometheus Message Status counter.
	MessageReceived = "received"
	// MessageClaimErrors is used to label the Prometheus Message Status counter.
	MessageClaimErrors = "claim-errors"
	// MessageDecoded is used to label the Prometheus Message Status counter.
	MessageDecoded = "decoded"
)

Variables

This section is empty.

Functions

func ClaimMessage

ClaimMessage transforms a sarama.ConsumerMessage to an async.Message.

func DefaultSaramaConfig

func DefaultSaramaConfig(name string) (*sarama.Config, error)

DefaultSaramaConfig function creates a sarama config object with the default configuration set up.

func MessageStatusCountInc

func MessageStatusCountInc(status, group, topic string)

MessageStatusCountInc increments the messageStatus counter for a certain status.

func TopicPartitionOffsetDiffGaugeSet

func TopicPartitionOffsetDiffGaugeSet(group, topic string, partition int32, high, offset int64)

TopicPartitionOffsetDiffGaugeSet creates a new Gauge that measures partition offsets.

Types

type ConsumerConfig

type ConsumerConfig struct {
	Brokers               []string
	Buffer                int
	DecoderFunc           encoding.DecodeRawFunc
	DurationBasedConsumer bool
	DurationOffset        time.Duration
	TimeExtractor         func(*sarama.ConsumerMessage) (time.Time, error)
	SaramaConfig          *sarama.Config
}

ConsumerConfig is the common configuration of patron kafka consumers.

type OptionFunc

type OptionFunc func(*ConsumerConfig) error

OptionFunc definition for configuring the consumer in a functional way.

func Buffer

func Buffer(buf int) OptionFunc

Buffer option for adjusting the incoming messages buffer.

func Decoder

func Decoder(dec encoding.DecodeRawFunc) OptionFunc

Decoder option for injecting a specific decoder implementation.

func DecoderJSON

func DecoderJSON() OptionFunc

DecoderJSON option for injecting json decoder.

func Start

func Start(offset int64) OptionFunc

Start option for adjusting the the starting offset.

func StartFromNewest

func StartFromNewest() OptionFunc

StartFromNewest option for adjusting the starting offset to newest.

func StartFromOldest

func StartFromOldest() OptionFunc

StartFromOldest option for adjusting the starting offset to oldest.

func Timeout

func Timeout(timeout time.Duration) OptionFunc

Timeout option for adjusting the timeout of the connection.

func Version

func Version(version string) OptionFunc

Version option for setting the Kafka version.

Directories

Path Synopsis
Package group provides a consumer group implementation.
Package group provides a consumer group implementation.
Package simple provides a simple consumer implementation without consumer groups.
Package simple provides a simple consumer implementation without consumer groups.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL