kafka

package
v0.17.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2019 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// RequiredAcksAll waits for all in-sync replicas to commit before responding.
	RequiredAcksAll int = -1
	// RequiredAcksNone waits for no acknowledgements.
	RequiredAcksNone = 0
	// RequiredAcksLocal waits for only the local commit to succeed before responding.
	RequiredAcksLocal = 1
)

Constant values that represent the required acks setting for produced messages. These map to the sarama.RequiredAcks constants

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	ClientConfig
	SaramaClient sarama.Client
	// contains filtered or unexported fields
}

Client wraps a sarama client and Kafka configuration and can be used to create producers and consumers

func (Client) Close

func (c Client) Close(ctx context.Context)

Close the underlying Kafka client and stop the Kafka broker metrics gathering task. If an error occurs closing the client, the error is logged.

func (Client) NewConsumer

func (c Client) NewConsumer(config ConsumerConfig, logger *zap.Logger) (ConsumerIface, error)

NewConsumer sets up a high-level Kafka consumer that can be used for reading all partitions of a given topic a given offset. NewConsumer also sets up Prometheus metrics with the default registerer and configures schema registry if set in the configuration.

func (Client) NewProducer

func (c Client) NewProducer(config ProducerConfig, logger *zap.Logger, returnMessages bool) (ProducerIface, error)

NewProducer creates a sarama producer from a client. If the returnMessages flag is true, messages from the producer will be produced on the Success or Errors channel depending on the outcome of the produced message. This method also registers producer metrics with the default Prometheus registerer. Note that this method has the side effect of setting the compression level and codec on the provided client's underlying configuration.

type ClientConfig added in v0.10.0

type ClientConfig struct {
	Broker       string
	ClientID     string
	TLSCaCrtPath string
	TLSCrtPath   string
	TLSKeyPath   string
	Verbose      bool
	KafkaVersion string
}

ClientConfig contains connection settings and configuration for communicating with a Kafka cluster

func (ClientConfig) NewClient added in v0.10.0

func (c ClientConfig) NewClient(ctx context.Context) (Client, error)

NewClient creates a Sarama client from configuration and starts a periodic task for capturing Kafka broker metrics. The client is instantiated with configuration from the ClientConfiguration and the following options are turned on: * The Consumer returns errors * The Producer returns successes * The Producer returns errors

func (*ClientConfig) RegisterFlags added in v0.10.0

func (c *ClientConfig) RegisterFlags(flags *pflag.FlagSet)

Registers Kafka client flags with pflags

type ClientIface added in v0.7.0

type ClientIface interface {
	NewConsumer(config ConsumerConfig, logger *zap.Logger) (ConsumerIface, error)
	NewProducer(config ProducerConfig, logger *zap.Logger, returnMessages bool) (ProducerIface, error)
}

ClientIface is an interface for creating consumers and producers

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a high-level Kafka consumer that can consume off all partitions on a given topic, tracks metrics, provides optional logging, and unmarshals messages before passing them off to a handler.

func (Consumer) Close

func (c Consumer) Close()

Close Sarama consumer and client

func (Consumer) ConsumeTopic

func (c Consumer) ConsumeTopic(
	ctx context.Context,
	handler MessageHandler,
	topic string,
	offsets PartitionOffsets,
	readResult chan PartitionOffsets,
	catchupWg *sync.WaitGroup,
	exitAfterCaughtUp bool,
) error

ConsumeTopic consumes a particular Kafka topic from startOffset to endOffset or from startOffset to forever

This function will create consumers for all partitions in a topic and read from the given offset on each partition to the latest offset when the consumer was started, then notify the caller via catchupWg. If exitAfterCaughtUp is true, the consumer will exit after it reads message at the latest offset when it started up. When all partition consumers are closed, it will send the last offset read on each partition through the readResult channel.

func (Consumer) ConsumeTopicFromBeginning

func (c Consumer) ConsumeTopicFromBeginning(
	ctx context.Context,
	handler MessageHandler,
	topic string,
	readResult chan PartitionOffsets,
	catchupWg *sync.WaitGroup,
	exitAfterCaughtUp bool,
) error

ConsumeTopicFromBeginning starts Kafka consumers on all partitions in a given topic from the message with the oldest offset.

func (Consumer) ConsumeTopicFromLatest

func (c Consumer) ConsumeTopicFromLatest(
	ctx context.Context,
	handler MessageHandler,
	topic string,
	readResult chan PartitionOffsets,
) error

ConsumeTopicFromLatest starts Kafka consumers on all partitions in a given topic from the message with the latest offset.

type ConsumerConfig added in v0.10.0

type ConsumerConfig struct {
	JSONEnabled    bool
	SchemaRegistry *SchemaRegistryConfig
}

ConsumerConfig contains consumer-specific configuration including whether or not to use JSON deserialization and schema registry configuration.

func NewDefaultConsumerConfig added in v0.12.1

func NewDefaultConsumerConfig() ConsumerConfig

NewDefaultConsumerConfig constructs and returns a usable ConsumerConfig with default settings

func (*ConsumerConfig) RegisterFlags added in v0.10.0

func (c *ConsumerConfig) RegisterFlags(flags *pflag.FlagSet)

Registers high-level consumer flags with pflags

type ConsumerIface

type ConsumerIface interface {
	ConsumeTopic(ctx context.Context, handler MessageHandler, topic string, offsets PartitionOffsets, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error
	ConsumeTopicFromBeginning(ctx context.Context, handler MessageHandler, topic string, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error
	ConsumeTopicFromLatest(ctx context.Context, handler MessageHandler, topic string, readResult chan PartitionOffsets) error
	Close()
}

ConsumerIface is an interface for consuming messages from a Kafka topic

type ConsumerMetrics added in v0.10.0

type ConsumerMetrics struct {
	MessagesProcessed     *prometheus.GaugeVec
	MessageErrors         *prometheus.GaugeVec
	MessageProcessingTime *prometheus.SummaryVec
	ErrorsProcessed       *prometheus.GaugeVec
}

ConsumerMetrics is a collection of Prometheus metrics for tracking a Kafka consumer's performance

func RegisterConsumerMetrics added in v0.10.0

func RegisterConsumerMetrics(registerer prometheus.Registerer) ConsumerMetrics

RegisterConsumerMetrics registers Kafka consumer metrics with the provided registerer and returns a struct containing consumer Prometheus metrics.

type MessageHandler

type MessageHandler interface {
	HandleMessage(ctx context.Context, msg *sarama.ConsumerMessage, unmarshaler MessageUnmarshaler) error
}

MessageHandler defines an interface for handling new messages received by the Kafka consumer

type MessageUnmarshaler

type MessageUnmarshaler interface {
	UnmarshalMessage(ctx context.Context, msg *sarama.ConsumerMessage, target interface{}) error
}

MessageUnmarshaler defines an interface for unmarshaling messages received from Kafka to Go types

type MockClient added in v0.7.0

type MockClient struct {
	mock.Mock
}

MockClient implements ClientIface for testing purposes

func (*MockClient) NewConsumer added in v0.7.0

func (m *MockClient) NewConsumer(config ConsumerConfig, logger *zap.Logger) (ConsumerIface, error)

NewConsumer creates a new mock consumer

func (*MockClient) NewProducer added in v0.7.0

func (m *MockClient) NewProducer(config ProducerConfig, logger *zap.Logger, returnMessages bool) (ProducerIface, error)

NewProducer creates a new mock producer

type MockKafkaConsumer

type MockKafkaConsumer struct {
	mock.Mock
	sync.Mutex
	// contains filtered or unexported fields
}

MockKafkaConsumer implements KafkaConsumerIface for testing purposes

func (*MockKafkaConsumer) Close

func (m *MockKafkaConsumer) Close()

Close mocks the Kafka consumer Close method

func (*MockKafkaConsumer) ConsumeTopic

func (m *MockKafkaConsumer) ConsumeTopic(ctx context.Context, handler MessageHandler, topic string, offsets PartitionOffsets, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error

ConsumeTopic mocks the Kafka consumer ConsumeTopic method

func (*MockKafkaConsumer) ConsumeTopicFromBeginning

func (m *MockKafkaConsumer) ConsumeTopicFromBeginning(ctx context.Context, handler MessageHandler, topic string, readResult chan PartitionOffsets, catchupWg *sync.WaitGroup, exitAfterCaughtUp bool) error

ConsumeTopicFromBeginning mocks the Kafka consumer ConsumeTopicFromBeginning method

func (*MockKafkaConsumer) ConsumeTopicFromLatest

func (m *MockKafkaConsumer) ConsumeTopicFromLatest(ctx context.Context, handler MessageHandler, topic string, readResult chan PartitionOffsets) error

ConsumeTopicFromLatest mocks the Kafka consumer ConsumeTopicFromLatest method

func (*MockKafkaConsumer) EmitReadResult

func (m *MockKafkaConsumer) EmitReadResult(offsets PartitionOffsets)

EmitReadResult allows tests to send values through the readResult channel passed into the mock consumer.

type MockProducer added in v0.7.0

type MockProducer struct {
	mock.Mock
	Messages chan *sarama.ProducerMessage

	Done chan bool
	// contains filtered or unexported fields
}

MockProducer implements the producer interface for testing and includes channels for mocking out messages

func NewMockProducer added in v0.7.0

func NewMockProducer() *MockProducer

NewMockProducer creates a new mock producer

func (*MockProducer) Errors added in v0.7.0

func (m *MockProducer) Errors() chan *sarama.ProducerError

Errors returns the channel on which messages that could not be published will be returned

func (*MockProducer) RunProducer added in v0.7.0

func (m *MockProducer) RunProducer(messages chan *sarama.ProducerMessage, done chan bool)

RunProducer mocks the RunProducer call. This method has the side effect of setting the Messages and Done channels on the mock producer to the channel that was passed into the method.

func (*MockProducer) Successes added in v0.7.0

func (m *MockProducer) Successes() chan *sarama.ProducerMessage

Successes returns the channel on which successfully published messages will be returned

type PartitionOffsets

type PartitionOffsets map[int32]int64

PartitionOffsets is a mapping of partition ID to an offset to which a consumer read on that partition

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer is a wrapped Sarama producer that tracks producer metrics and provides optional logging.

func (Producer) Errors added in v0.5.0

func (p Producer) Errors() chan *sarama.ProducerError

Errors returns the channel on which messages that could not be published will be returned

func (Producer) RunProducer

func (p Producer) RunProducer(messages chan *sarama.ProducerMessage, done chan bool)

RunProducer wraps the sarama AsyncProducer and adds metrics and optional logging to the producer. To stop the producer, close the messages channel; when the producer is shutdown the done channel will be closed. If the messages channel is unbuffered, each message sent to the producer is guaranteed to at least have been attempted to be produced to Kafka.

func (Producer) Successes added in v0.5.0

func (p Producer) Successes() chan *sarama.ProducerMessage

Successes returns the channel on which successfully published messages will be returned

type ProducerConfig added in v0.10.0

type ProducerConfig struct {
	ProducerCompressionCodec string
	ProducerCompressionLevel int
	ProducerRequiredAcks     int
}

ProducerConfig contains producer-specific configuration information

func (*ProducerConfig) RegisterFlags added in v0.10.0

func (c *ProducerConfig) RegisterFlags(flags *pflag.FlagSet)

RegisterFlags registers producer flags with pflags

type ProducerIface added in v0.7.0

type ProducerIface interface {
	RunProducer(messages chan *sarama.ProducerMessage, done chan bool)
	Successes() chan *sarama.ProducerMessage
	Errors() chan *sarama.ProducerError
}

ProducerIface is an interface for producing Kafka messages

type ProducerMetrics added in v0.10.0

type ProducerMetrics struct {
	MessagesProduced *prometheus.GaugeVec
	ErrorsProduced   *prometheus.GaugeVec
}

ProducerMetrics is a collection of Prometheus metrics for tracking a Kafka producer's performance

func RegisterProducerMetrics added in v0.10.0

func RegisterProducerMetrics(registerer prometheus.Registerer) ProducerMetrics

RegisterProducerMetrics registers Kafka producer metrics with the provided registerer and returns a struct containing gauges for the number of messages and errors produced.

type SchemaRegistryConfig

type SchemaRegistryConfig struct {
	SchemaRegistryURL string
	// contains filtered or unexported fields
}

SchemaRegistryConfig defines the necessary configuration for interacting with Schema Registry

func (*SchemaRegistryConfig) RegisterFlags

func (src *SchemaRegistryConfig) RegisterFlags(flags *pflag.FlagSet)

RegisterFlags registers Kafka flags with pflags

func (*SchemaRegistryConfig) UnmarshalMessage

func (src *SchemaRegistryConfig) UnmarshalMessage(
	ctx context.Context,
	msg *sarama.ConsumerMessage,
	target interface{},
) error

UnmarshalMessage Implements the KafkaMessageUnmarshaler interface. Decodes an Avro message into a Go struct type, specifically an Avro message from Kafka. Avro schemas are fetched from Kafka schema registry. To use this function, tag each field of the target struct with a `kafka` tag whose value indicates which key on the Avro message to set as the value.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL