Documentation ¶
Index ¶
- func NewConsumer(options ...streamconfig.Option) (stream.Consumer, error)
- func NewProducer(options ...streamconfig.Option) (stream.Producer, error)
- func TestConsumer(tb testing.TB, topicAndGroup string, options ...streamconfig.Option) (stream.Consumer, func())
- func TestConsumerConfig(tb testing.TB, topicAndGroup string, options ...streamconfig.Option) []streamconfig.Option
- func TestConsumerWithAssignments(tb testing.TB, topicAndGroup string, options ...streamconfig.Option) (stream.Consumer, func())
- func TestMessageFromTopic(tb testing.TB, topic string) stream.Message
- func TestMessagesFromTopic(tb testing.TB, topic string) []stream.Message
- func TestOffsets(tb testing.TB, message stream.Message) []kafka.TopicPartition
- func TestProduceMessages(tb testing.TB, topic string, values ...interface{})
- func TestProducer(tb testing.TB, topic string, options ...streamconfig.Option) (stream.Producer, func())
- func TestProducerConfig(tb testing.TB, topic string, options ...streamconfig.Option) []streamconfig.Option
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewConsumer ¶
func NewConsumer(options ...streamconfig.Option) (stream.Consumer, error)
NewConsumer returns a new Kafka consumer.
func NewProducer ¶
func NewProducer(options ...streamconfig.Option) (stream.Producer, error)
NewProducer returns a new Kafka producer.
func TestConsumer ¶
func TestConsumer(tb testing.TB, topicAndGroup string, options ...streamconfig.Option) (stream.Consumer, func())
TestConsumer returns a new Kafka consumer to be used in test cases. It also returns a function that should be deferred to clean up resources.
The `topicAndGroup` argument is a single string that is used as both the topic name and the group name of the consumer.
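The constructor-plus-cleanup shape described above can be sketched with the standard library alone. The `fakeConsumer` type and the `newTestConsumer` helper are hypothetical stand-ins, not part of this package; they only illustrate how one string can serve as both names and how the returned function is meant to be deferred.

```go
package main

import "fmt"

// fakeConsumer is a hypothetical stand-in for stream.Consumer.
type fakeConsumer struct {
	topic  string
	group  string
	closed bool
}

// newTestConsumer mimics the shape of TestConsumer: it returns a consumer
// and a cleanup function. Using topicAndGroup for both fields is an
// assumption made for illustration.
func newTestConsumer(topicAndGroup string) (*fakeConsumer, func()) {
	c := &fakeConsumer{topic: topicAndGroup, group: topicAndGroup}
	return c, func() { c.closed = true }
}

func main() {
	consumer, closer := newTestConsumer("orders")
	defer closer() // release resources when the surrounding function returns

	fmt.Println(consumer.topic, consumer.group)
}
```

In a real test the deferred function would run when the test function returns, so the consumer is released even if an assertion fails earlier.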
func TestConsumerConfig ¶
func TestConsumerConfig(tb testing.TB, topicAndGroup string, options ...streamconfig.Option) []streamconfig.Option
TestConsumerConfig returns sane default options to use during testing of the kafkaclient consumer implementation.
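One plausible way to combine default test options with caller-supplied ones is the functional-options pattern, sketched below. Everything here (`config`, `option`, `withTopic`, `withGroupID`, `testConfig`, `build`) is hypothetical and standard-library only; whether this package places its defaults before or after the caller's options is an assumption.

```go
package main

import "fmt"

// config is a hypothetical stand-in for the consumer configuration.
type config struct {
	Topic   string
	GroupID string
}

// option is a hypothetical stand-in for streamconfig.Option.
type option func(*config)

func withTopic(t string) option   { return func(c *config) { c.Topic = t } }
func withGroupID(g string) option { return func(c *config) { c.GroupID = g } }

// testConfig returns the defaults followed by the caller's options, so a
// caller-supplied option overrides a default that sets the same field.
func testConfig(topicAndGroup string, options ...option) []option {
	defaults := []option{withTopic(topicAndGroup), withGroupID(topicAndGroup)}
	return append(defaults, options...)
}

// build applies the options in order to an empty config.
func build(options ...option) config {
	var c config
	for _, o := range options {
		o(&c)
	}
	return c
}

func main() {
	c := build(testConfig("orders", withGroupID("custom-group"))...)
	fmt.Printf("%+v\n", c)
}
```

Returning the option slice rather than a finished client lets the same defaults be passed on to a constructor such as `NewConsumer`.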
func TestConsumerWithAssignments ¶
func TestConsumerWithAssignments(tb testing.TB, topicAndGroup string, options ...streamconfig.Option) (stream.Consumer, func())
TestConsumerWithAssignments is the same as `TestConsumer`, except that it waits for topic assignments to finish. If no topic assignment happens within a hard-coded period of 5 seconds, an error is triggered.
func TestMessageFromTopic ¶
func TestMessageFromTopic(tb testing.TB, topic string) stream.Message
TestMessageFromTopic returns a single message, consumed from the provided topic. It has a built-in timeout mechanism to prevent the test from getting stuck.
func TestMessagesFromTopic ¶
func TestMessagesFromTopic(tb testing.TB, topic string) []stream.Message
TestMessagesFromTopic returns all messages in a topic.
func TestOffsets ¶
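func TestOffsets(tb testing.TB, message stream.Message) []kafka.TopicPartition
TestOffsets returns a list of `kafka.TopicPartition`s for the provided message.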
func TestProduceMessages ¶
func TestProduceMessages(tb testing.TB, topic string, values ...interface{})
TestProduceMessages accepts a string to use as the topic, and an arbitrary number of arguments from which it generates messages on the provided Kafka topic.
The provided extra arguments can be of several different types:
* `string` – The value is used as the Kafka message value.
* `[]string` – The first value is used as the Kafka message key and the second as the message value; all other values are ignored.
* `stream.Message` – The value (and, if applicable, the key) is set on a new `kafka.Message`.
* `*kafka.Message` – The message is delivered to Kafka as-is. If `kafka.TopicPartition` is empty, the passed-in topic value is used instead.
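The mapping in the list above is naturally expressed as a type switch. The following is a minimal sketch using only the standard library: `message`, `streamMessage`, and `toMessage` are hypothetical stand-ins for `kafka.Message`, `stream.Message`, and whatever conversion this package performs. Rejecting a `[]string` with fewer than two elements, and rejecting unsupported types with an error, are assumptions.

```go
package main

import "fmt"

// message is a hypothetical stand-in for kafka.Message.
type message struct {
	Topic string
	Key   []byte
	Value []byte
}

// streamMessage is a hypothetical stand-in for stream.Message.
type streamMessage struct {
	Key   []byte
	Value []byte
}

// toMessage converts one variadic value into a message for the given topic.
func toMessage(topic string, v interface{}) (*message, error) {
	switch t := v.(type) {
	case string:
		return &message{Topic: topic, Value: []byte(t)}, nil
	case []string:
		if len(t) < 2 {
			return nil, fmt.Errorf("expected key and value, got %d element(s)", len(t))
		}
		// Elements beyond the second are ignored.
		return &message{Topic: topic, Key: []byte(t[0]), Value: []byte(t[1])}, nil
	case streamMessage:
		return &message{Topic: topic, Key: t.Key, Value: t.Value}, nil
	case *message:
		// Delivered as-is; only fill in the topic when none is set.
		if t.Topic == "" {
			t.Topic = topic
		}
		return t, nil
	default:
		return nil, fmt.Errorf("unsupported value type %T", v)
	}
}

func main() {
	values := []interface{}{
		"hello",
		[]string{"key", "value", "ignored"},
		streamMessage{Value: []byte("from stream")},
		&message{Topic: "other", Value: []byte("as-is")},
	}
	for _, v := range values {
		m, err := toMessage("events", v)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s key=%q value=%q\n", m.Topic, m.Key, m.Value)
	}
}
```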
func TestProducer ¶
func TestProducer(tb testing.TB, topic string, options ...streamconfig.Option) (stream.Producer, func())
TestProducer returns a new Kafka producer to be used in test cases. It also returns a function that should be deferred to clean up resources.
You pass the name of the topic to produce to as the `topic` argument.
func TestProducerConfig ¶
func TestProducerConfig(tb testing.TB, topic string, options ...streamconfig.Option) []streamconfig.Option
TestProducerConfig returns sane default options to use during testing of the kafkaclient producer implementation.
Types ¶
This section is empty.