kafkaclient

package
v2.0.0-beta.0+incompat... Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2018 License: ISC Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// TestBrokerAddress is the address used to connect to the testing broker.
	// This defaults to 127.0.0.1:9092, but can be overwritten if desired.
	TestBrokerAddress = "127.0.0.1:9092"

	// TestTimeoutMultiplier can be used to increase the default timeouts during
	// test runs when waiting for time-sensitive values to be returned. It
	// defaults to a multiplier of 1.
	//
	// This is specifically useful on slower environments like a CI server.
	TestTimeoutMultiplier = 1
)

Functions

func NewConsumer

func NewConsumer(options ...func(*streamconfig.Consumer)) (stream.Consumer, error)

NewConsumer returns a new Kafka consumer.

func NewProducer

func NewProducer(options ...func(*streamconfig.Producer)) (stream.Producer, error)

NewProducer returns a new Kafka producer.

func TestConsumer

func TestConsumer(tb testing.TB, topicAndGroup string, options ...func(c *streamconfig.Consumer)) (stream.Consumer, func())

TestConsumer returns a new kafka consumer to be used in test cases. It also returns a function that should be deferred to clean up resources.

You pass the topic and group name of the consumer as a single argument.

func TestConsumerConfig

func TestConsumerConfig(tb testing.TB, topicAndGroup string, options ...func(c *streamconfig.Consumer)) []func(c *streamconfig.Consumer)

TestConsumerConfig returns sane default options to use during testing of the kafkaclient consumer implementation.

func TestMessageFromConsumer

func TestMessageFromConsumer(tb testing.TB, consumer stream.Consumer) streammsg.Message

TestMessageFromConsumer returns a single message, consumed from the provided consumer. It has a built-in timeout mechanism to prevent the test from getting stuck.

func TestMessageFromTopic

func TestMessageFromTopic(tb testing.TB, topic string) streammsg.Message

TestMessageFromTopic returns a single message, consumed from the provided topic. It has a built-in timeout mechanism to prevent the test from getting stuck.

func TestMessagesFromTopic

func TestMessagesFromTopic(tb testing.TB, topic string) []streammsg.Message

TestMessagesFromTopic returns all messages in a topic.

func TestOffsets

func TestOffsets(tb testing.TB, message streammsg.Message) []kafka.TopicPartition

TestOffsets returns a list of `kafka.TopicPartition`s.

func TestProduceMessages

func TestProduceMessages(tb testing.TB, topic string, values ...interface{})

TestProduceMessages accepts a string to use as the topic, and an arbitrary number of argument to generate messages on the provided Kafka topic.

The provided extra arguments can be of several different types:

* `string` – The value is used as the kafka message value.

* `[]string` – The first value is used as the kafka message key, the second as the message value, all other values are ignored.

  • `streammsg.Message` – The value (and, if applicable, the key) are set on a new `kafka.Message`.

* `*kafka.Message` – The message is delivered to Kafka as-is. If `kafka.TopicPartition` is empty, the passed in topic value is used instead.

func TestProducer

func TestProducer(tb testing.TB, topic string, options ...func(c *streamconfig.Producer)) (stream.Producer, func())

TestProducer returns a new kafka consumer to be used in test cases. It also returns a function that should be deferred to clean up resources.

You pass the topic and group name of the consumer as a single argument.

func TestProducerConfig

func TestProducerConfig(tb testing.TB, topic string, options ...func(c *streamconfig.Producer)) []func(c *streamconfig.Producer)

TestProducerConfig returns sane default options to use during testing of the kafkaclient producer implementation.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer implements the stream.Consumer interface for the Kafka client.

func (*Consumer) Ack

func (c *Consumer) Ack(m streammsg.Message) error

Ack acknowledges that a message was processed

func (*Consumer) Close

func (c *Consumer) Close() (err error)

Close closes the consumer connection.

func (*Consumer) Config

func (c *Consumer) Config() streamconfig.Consumer

Config returns a read-only representation of the consumer configuration.

func (*Consumer) Messages

func (c *Consumer) Messages() <-chan streammsg.Message

Messages returns the read channel for the messages that are returned by the stream.

func (*Consumer) Nack

func (c *Consumer) Nack(m streammsg.Message) error

Nack is a no-op implementation to satisfy the `stream.Consumer` interface. We don't need an actual implementation, since not acknowledging a message will eventually result in the message being redelivered.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer implements the stream.Producer interface for the Kafka client.

func (*Producer) Close

func (p *Producer) Close() (err error)

Close closes the producer connection. This function blocks until all messages still in the channel have been processed, and the channel is properly closed.

func (*Producer) Config

func (p *Producer) Config() streamconfig.Producer

Config returns a read-only representation of the producer configuration.

func (*Producer) Messages

func (p *Producer) Messages() chan<- streammsg.Message

Messages returns the write channel for messages to be produced.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL