Documentation ¶
Index ¶
- Variables
- func NewConsumer(options ...func(*streamconfig.Consumer)) (stream.Consumer, error)
- func NewProducer(options ...func(*streamconfig.Producer)) (stream.Producer, error)
- func TestConsumer(tb testing.TB, topicAndGroup string, options ...func(c *streamconfig.Consumer)) (stream.Consumer, func())
- func TestConsumerConfig(tb testing.TB, topicAndGroup string, options ...func(c *streamconfig.Consumer)) []func(c *streamconfig.Consumer)
- func TestMessageFromConsumer(tb testing.TB, consumer stream.Consumer) streammsg.Message
- func TestMessageFromTopic(tb testing.TB, topic string) streammsg.Message
- func TestMessagesFromTopic(tb testing.TB, topic string) []streammsg.Message
- func TestOffsets(tb testing.TB, message streammsg.Message) []kafka.TopicPartition
- func TestProduceMessages(tb testing.TB, topic string, values ...interface{})
- func TestProducer(tb testing.TB, topic string, options ...func(c *streamconfig.Producer)) (stream.Producer, func())
- func TestProducerConfig(tb testing.TB, topic string, options ...func(c *streamconfig.Producer)) []func(c *streamconfig.Producer)
- type Consumer
- type Producer
Constants ¶
This section is empty.
Variables ¶
var ( // TestBrokerAddress is the address used to connect to the testing broker. // This defaults to 127.0.0.1:9092, but can be overwritten if desired. TestBrokerAddress = "127.0.0.1:9092" // TestTimeoutMultiplier can be used to increase the default timeouts during // test runs when waiting for time-sensitive values to be returned. It // defaults to a multiplier of 1. // // This is specifically useful on slower environments like a CI server. TestTimeoutMultiplier = 1 )
Functions ¶
func NewConsumer ¶
func NewConsumer(options ...func(*streamconfig.Consumer)) (stream.Consumer, error)
NewConsumer returns a new Kafka consumer.
func NewProducer ¶
func NewProducer(options ...func(*streamconfig.Producer)) (stream.Producer, error)
NewProducer returns a new Kafka producer.
func TestConsumer ¶
func TestConsumer(tb testing.TB, topicAndGroup string, options ...func(c *streamconfig.Consumer)) (stream.Consumer, func())
TestConsumer returns a new kafka consumer to be used in test cases. It also returns a function that should be deferred to clean up resources.
You pass the topic and group name of the consumer as a single argument.
func TestConsumerConfig ¶
func TestConsumerConfig(tb testing.TB, topicAndGroup string, options ...func(c *streamconfig.Consumer)) []func(c *streamconfig.Consumer)
TestConsumerConfig returns sane default options to use during testing of the kafkaclient consumer implementation.
func TestMessageFromConsumer ¶
TestMessageFromConsumer returns a single message, consumed from the provided consumer. It has a built-in timeout mechanism to prevent the test from getting stuck.
func TestMessageFromTopic ¶
TestMessageFromTopic returns a single message, consumed from the provided topic. It has a built-in timeout mechanism to prevent the test from getting stuck.
func TestMessagesFromTopic ¶
TestMessagesFromTopic returns all messages in a topic.
func TestOffsets ¶
TestOffsets returns a list of `kafka.TopicPartition`s.
func TestProduceMessages ¶
TestProduceMessages accepts a string to use as the topic, and an arbitrary number of argument to generate messages on the provided Kafka topic.
The provided extra arguments can be of several different types:
* `string` – The value is used as the kafka message value.
* `[]string` – The first value is used as the kafka message key, the second as the message value, all other values are ignored.
- `streammsg.Message` – The value (and, if applicable, the key) are set on a new `kafka.Message`.
* `*kafka.Message` – The message is delivered to Kafka as-is. If `kafka.TopicPartition` is empty, the passed in topic value is used instead.
func TestProducer ¶
func TestProducer(tb testing.TB, topic string, options ...func(c *streamconfig.Producer)) (stream.Producer, func())
TestProducer returns a new kafka consumer to be used in test cases. It also returns a function that should be deferred to clean up resources.
You pass the topic and group name of the consumer as a single argument.
func TestProducerConfig ¶
func TestProducerConfig(tb testing.TB, topic string, options ...func(c *streamconfig.Producer)) []func(c *streamconfig.Producer)
TestProducerConfig returns sane default options to use during testing of the kafkaclient producer implementation.
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer implements the stream.Consumer interface for the Kafka client.
func (*Consumer) Config ¶
func (c *Consumer) Config() streamconfig.Consumer
Config returns a read-only representation of the consumer configuration.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer implements the stream.Producer interface for the Kafka client.
func (*Producer) Close ¶
Close closes the producer connection. This function blocks until all messages still in the channel have been processed, and the channel is properly closed.
func (*Producer) Config ¶
func (p *Producer) Config() streamconfig.Producer
Config returns a read-only representation of the producer configuration.