Documentation ¶
Overview ¶
Package mocks provides mocks that can be used for testing applications that use Sarama. The mock types provided by this package implement the interfaces Sarama exports, so you can use them for dependency injection in your tests.
All mock instances require you to set expectations on them before you can use them. These expectations determine how the mock behaves. If an expectation is not met, the mock makes your test fail.
NOTE: this package currently does not fall under the API stability guarantee of Sarama as it is still considered experimental.
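The expectation model described in this overview can be illustrated with a small, standard-library-only sketch. It is not Sarama's implementation: the `reporter` and `expectationMock` types and their methods are hypothetical names chosen for illustration. Each queued expectation decides the outcome of exactly one call, a call without an expectation is reported as a violation, and so is an expectation that is still unused when the mock is closed.

```go
package main

import "fmt"

// reporter plays the role of the error reporter that a mock writes violations to.
type reporter struct{ violations []string }

func (r *reporter) Errorf(format string, args ...interface{}) {
	r.violations = append(r.violations, fmt.Sprintf(format, args...))
}

// expectationMock is a hypothetical, simplified stand-in for a mock:
// each queued expectation decides the result of exactly one call.
type expectationMock struct {
	t            *reporter
	expectations []error // nil means "succeed", non-nil means "fail with this error"
}

// expect queues one expectation and returns the mock to allow chaining.
func (m *expectationMock) expect(result error) *expectationMock {
	m.expectations = append(m.expectations, result)
	return m
}

// send consumes the next expectation, or reports a violation if none is left.
func (m *expectationMock) send(msg string) error {
	if len(m.expectations) == 0 {
		m.t.Errorf("no more expectations set, but received message %q", msg)
		return fmt.Errorf("unexpected message %q", msg)
	}
	result := m.expectations[0]
	m.expectations = m.expectations[1:]
	return result
}

// close reports a violation if any expectation was never used.
func (m *expectationMock) close() {
	if n := len(m.expectations); n > 0 {
		m.t.Errorf("expected %d more call(s) before close", n)
	}
}

func main() {
	r := &reporter{}
	m := &expectationMock{t: r}
	m.expect(nil).expect(fmt.Errorf("broker unavailable"))

	fmt.Println(m.send("first"))  // uses the "succeed" expectation
	fmt.Println(m.send("second")) // uses the "fail" expectation
	fmt.Println(m.send("third"))  // no expectation left, so a violation is recorded
	m.close()
	fmt.Println(r.violations)
}
```

In a real test the reporter would typically be the `*testing.T` of the test, so a recorded violation fails the test.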
Index ¶
- Constants
- func NewTestConfig() *sarama.Config
- type AsyncProducer
- func (mp *AsyncProducer) AbortTxn() error
- func (mp *AsyncProducer) AddMessageToTxn(msg *sarama.ConsumerMessage, groupId string, metadata *string) error
- func (mp *AsyncProducer) AddOffsetsToTxn(offsets map[string][]*sarama.PartitionOffsetMetadata, groupId string) error
- func (mp *AsyncProducer) AsyncClose()
- func (mp *AsyncProducer) BeginTxn() error
- func (mp *AsyncProducer) Close() error
- func (mp *AsyncProducer) CommitTxn() error
- func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError
- func (mp *AsyncProducer) ExpectInputAndFail(err error) *AsyncProducer
- func (mp *AsyncProducer) ExpectInputAndSucceed() *AsyncProducer
- func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, err error) *AsyncProducer
- func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecker) *AsyncProducer
- func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndFail(cf MessageChecker, err error) *AsyncProducer
- func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndSucceed(cf MessageChecker) *AsyncProducer
- func (mp *AsyncProducer) Input() chan<- *sarama.ProducerMessage
- func (mp *AsyncProducer) IsTransactional() bool
- func (mp *AsyncProducer) Successes() <-chan *sarama.ProducerMessage
- func (mp *AsyncProducer) TxnStatus() sarama.ProducerTxnStatusFlag
- type Consumer
- func (c *Consumer) Close() error
- func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)
- func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset int64) *PartitionConsumer
- func (c *Consumer) HighWaterMarks() map[string]map[int32]int64
- func (c *Consumer) Partitions(topic string) ([]int32, error)
- func (c *Consumer) Pause(topicPartitions map[string][]int32)
- func (c *Consumer) PauseAll()
- func (c *Consumer) Resume(topicPartitions map[string][]int32)
- func (c *Consumer) ResumeAll()
- func (c *Consumer) SetTopicMetadata(metadata map[string][]int32)
- func (c *Consumer) Topics() ([]string, error)
- type ErrorReporter
- type MessageChecker
- type PartitionConsumer
- func (pc *PartitionConsumer) AsyncClose()
- func (pc *PartitionConsumer) Close() error
- func (pc *PartitionConsumer) Errors() <-chan *sarama.ConsumerError
- func (pc *PartitionConsumer) ExpectErrorsDrainedOnClose() *PartitionConsumer
- func (pc *PartitionConsumer) ExpectMessagesDrainedOnClose() *PartitionConsumer
- func (pc *PartitionConsumer) HighWaterMarkOffset() int64
- func (pc *PartitionConsumer) IsPaused() bool
- func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage
- func (pc *PartitionConsumer) Pause()
- func (pc *PartitionConsumer) Resume()
- func (pc *PartitionConsumer) YieldError(err error) *PartitionConsumer
- func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) *PartitionConsumer
- type SyncProducer
- func (sp *SyncProducer) AbortTxn() error
- func (sp *SyncProducer) AddMessageToTxn(msg *sarama.ConsumerMessage, groupId string, metadata *string) error
- func (sp *SyncProducer) AddOffsetsToTxn(offsets map[string][]*sarama.PartitionOffsetMetadata, groupId string) error
- func (sp *SyncProducer) BeginTxn() error
- func (sp *SyncProducer) Close() error
- func (sp *SyncProducer) CommitTxn() error
- func (sp *SyncProducer) ExpectSendMessageAndFail(err error) *SyncProducer
- func (sp *SyncProducer) ExpectSendMessageAndSucceed() *SyncProducer
- func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) *SyncProducer
- func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) *SyncProducer
- func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndFail(cf MessageChecker, err error) *SyncProducer
- func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndSucceed(cf MessageChecker) *SyncProducer
- func (sp *SyncProducer) IsTransactional() bool
- func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
- func (sp *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error
- func (sp *SyncProducer) TxnStatus() sarama.ProducerTxnStatusFlag
- type TopicConfig
- type ValueChecker
Constants ¶
const AnyOffset int64 = -1000
Functions ¶
func NewTestConfig ¶
func NewTestConfig() *sarama.Config
NewTestConfig returns a config meant to be used by tests. Due to inconsistencies between the request versions that clients send when using the default Kafka version and the response versions our mocks use, we default to the minimum Kafka version in most tests.
Types ¶
type AsyncProducer ¶
type AsyncProducer struct {
*TopicConfig
// contains filtered or unexported fields
}
AsyncProducer implements sarama's Producer interface for testing purposes. Before you can send messages to its Input channel, you have to set expectations so it knows how to handle the input; it returns an error if the number of messages received is greater than the number of expectations set. You can also set a function in each expectation so that the message is checked by this function and an error is returned if the match fails.
func NewAsyncProducer ¶
func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer
NewAsyncProducer instantiates a new Producer mock. The t argument should be the *testing.T instance of your test method. An error will be written to it if an expectation is violated. The config argument is validated and used to determine whether it should ack successes on the Successes channel and handle partitioning.
func (*AsyncProducer) AbortTxn ¶
func (mp *AsyncProducer) AbortTxn() error
func (*AsyncProducer) AddMessageToTxn ¶
func (mp *AsyncProducer) AddMessageToTxn(msg *sarama.ConsumerMessage, groupId string, metadata *string) error
func (*AsyncProducer) AddOffsetsToTxn ¶
func (mp *AsyncProducer) AddOffsetsToTxn(offsets map[string][]*sarama.PartitionOffsetMetadata, groupId string) error
func (*AsyncProducer) AsyncClose ¶
func (mp *AsyncProducer) AsyncClose()
AsyncClose corresponds with the AsyncClose method of sarama's Producer implementation. By closing a mock producer, you also tell it that no more input will be provided, so it will write an error to the test state if there are any remaining expectations.
func (*AsyncProducer) BeginTxn ¶
func (mp *AsyncProducer) BeginTxn() error
func (*AsyncProducer) Close ¶
func (mp *AsyncProducer) Close() error
Close corresponds with the Close method of sarama's Producer implementation. By closing a mock producer, you also tell it that no more input will be provided, so it will write an error to the test state if there are any remaining expectations.
func (*AsyncProducer) CommitTxn ¶
func (mp *AsyncProducer) CommitTxn() error
func (*AsyncProducer) Errors ¶
func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError
Errors corresponds with the Errors method of sarama's Producer implementation.
func (*AsyncProducer) ExpectInputAndFail ¶
func (mp *AsyncProducer) ExpectInputAndFail(err error) *AsyncProducer
ExpectInputAndFail sets an expectation on the mock producer that a message will be provided on the input channel. The mock producer will handle the message as if it failed to produce successfully. This means it will make a ProducerError available on the Errors channel.
func (*AsyncProducer) ExpectInputAndSucceed ¶
func (mp *AsyncProducer) ExpectInputAndSucceed() *AsyncProducer
ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided on the input channel. The mock producer will handle the message as if it is produced successfully, i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting is set to true.
func (*AsyncProducer) ExpectInputWithCheckerFunctionAndFail ¶
func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, err error) *AsyncProducer
ExpectInputWithCheckerFunctionAndFail sets an expectation on the mock producer that a message will be provided on the input channel. The mock producer will first call the given function to check the message value. If the function returns an error, that error is made available on the Errors channel; otherwise, the mock handles the message as if it failed to produce successfully, which means it makes a ProducerError available on the Errors channel.
func (*AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed ¶
func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecker) *AsyncProducer
ExpectInputWithCheckerFunctionAndSucceed sets an expectation on the mock producer that a message will be provided on the input channel. The mock producer will call the given function to check the message value. If the function returns an error, that error is made available on the Errors channel; otherwise, the mock handles the message as if it was produced successfully, i.e. it makes it available on the Successes channel if the Producer.Return.Successes setting is set to true.
func (*AsyncProducer) ExpectInputWithMessageCheckerFunctionAndFail ¶
func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndFail(cf MessageChecker, err error) *AsyncProducer
ExpectInputWithMessageCheckerFunctionAndFail sets an expectation on the mock producer that a message will be provided on the input channel. The mock producer will first call the given function to check the message. If the function returns an error, that error is made available on the Errors channel; otherwise, the mock handles the message as if it failed to produce successfully, which means it makes a ProducerError available on the Errors channel.
func (*AsyncProducer) ExpectInputWithMessageCheckerFunctionAndSucceed ¶
func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndSucceed(cf MessageChecker) *AsyncProducer
ExpectInputWithMessageCheckerFunctionAndSucceed sets an expectation on the mock producer that a message will be provided on the input channel. The mock producer will call the given function to check the message. If the function returns an error, that error is made available on the Errors channel; otherwise, the mock handles the message as if it was produced successfully, i.e. it makes it available on the Successes channel if the Producer.Return.Successes setting is set to true.
func (*AsyncProducer) Input ¶
func (mp *AsyncProducer) Input() chan<- *sarama.ProducerMessage
Input corresponds with the Input method of sarama's Producer implementation. You have to set expectations on the mock producer before writing messages to the Input channel, so it knows how to handle them. If there are no remaining expectations and a message is written to the Input channel, the mock producer will write an error to the test state object.
func (*AsyncProducer) IsTransactional ¶
func (mp *AsyncProducer) IsTransactional() bool
func (*AsyncProducer) Successes ¶
func (mp *AsyncProducer) Successes() <-chan *sarama.ProducerMessage
Successes corresponds with the Successes method of sarama's Producer implementation.
func (*AsyncProducer) TxnStatus ¶
func (mp *AsyncProducer) TxnStatus() sarama.ProducerTxnStatusFlag
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer implements sarama's Consumer interface for testing purposes. Before you can start consuming from this consumer, you have to register topic/partitions using ExpectConsumePartition, and set expectations on them.
func NewConsumer ¶
func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer
NewConsumer returns a new mock Consumer instance. The t argument should be the *testing.T instance of your test method. An error will be written to it if an expectation is violated. The config argument can be set to nil; if it is non-nil it is validated.
func (*Consumer) Close ¶
func (c *Consumer) Close() error
Close implements the Close method from the sarama.Consumer interface. It will close all registered PartitionConsumer instances.
func (*Consumer) ConsumePartition ¶
func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)
ConsumePartition implements the ConsumePartition method from the sarama.Consumer interface. Before you can start consuming a partition, you have to set expectations on it using ExpectConsumePartition. You can only consume a partition once per consumer.
func (*Consumer) ExpectConsumePartition ¶
func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset int64) *PartitionConsumer
ExpectConsumePartition registers a topic/partition so you can set expectations on it. The registered PartitionConsumer is returned, so you can set expectations on it using method chaining. Once a topic/partition is registered, you are expected to start consuming it using ConsumePartition. If that doesn't happen, an error is written to the error reporter once the mock consumer is closed. It also expects the message and error channels to be fed with YieldMessage and YieldError respectively, and to be fully consumed by the time the mock consumer is closed if ExpectMessagesDrainedOnClose or ExpectErrorsDrainedOnClose has been called.
func (*Consumer) Partitions ¶
func (c *Consumer) Partitions(topic string) ([]int32, error)
Partitions returns the list of partitions for the given topic, as registered with SetTopicMetadata.
func (*Consumer) SetTopicMetadata ¶
func (c *Consumer) SetTopicMetadata(metadata map[string][]int32)
SetTopicMetadata sets the cluster's topic/partition metadata, which will be returned by Topics() and Partitions().
type ErrorReporter ¶
type ErrorReporter interface {
Errorf(string, ...interface{})
}
ErrorReporter is a simple interface that includes the testing.T methods we use to report expectation violations when using the mock objects.
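Because the interface only requires Errorf, any type with that method can stand in for *testing.T. The following is a minimal sketch, assuming you want to collect violations rather than fail a test immediately; the interface is redeclared locally so the example compiles without importing Sarama, and `recordingReporter` is a hypothetical helper name.

```go
package main

import "fmt"

// ErrorReporter mirrors the interface shown above. It is redeclared here
// only so that this sketch is self-contained.
type ErrorReporter interface {
	Errorf(string, ...interface{})
}

// recordingReporter is a hypothetical helper that stores every reported
// violation instead of failing a test.
type recordingReporter struct {
	errors []string
}

// Errorf formats the violation and appends it to the recorded list.
func (r *recordingReporter) Errorf(format string, args ...interface{}) {
	r.errors = append(r.errors, fmt.Sprintf(format, args...))
}

// Compile-time check that recordingReporter satisfies ErrorReporter.
var _ ErrorReporter = (*recordingReporter)(nil)

func main() {
	r := &recordingReporter{}
	var reporter ErrorReporter = r
	reporter.Errorf("expectation %d was not met", 1)
	fmt.Println(r.errors)
}
```

A reporter like this can be useful when testing your own test helpers, since you can assert on the recorded violations afterwards.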
type MessageChecker ¶
type MessageChecker func(*sarama.ProducerMessage) error
MessageChecker is a function type to be set in each expectation of the producer mocks to check the message passed.
type PartitionConsumer ¶
type PartitionConsumer struct {
// contains filtered or unexported fields
}
PartitionConsumer implements sarama's PartitionConsumer interface for testing purposes. It is returned by the mock Consumer's ConsumePartition method, but only if it is registered first using the Consumer's ExpectConsumePartition method. Before consuming the Errors and Messages channels, you should specify what values will be provided on these channels using YieldMessage and YieldError.
func (*PartitionConsumer) AsyncClose ¶
func (pc *PartitionConsumer) AsyncClose()
AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
func (*PartitionConsumer) Close ¶
func (pc *PartitionConsumer) Close() error
Close implements the Close method from the sarama.PartitionConsumer interface. It will verify whether the partition consumer was actually started.
func (*PartitionConsumer) Errors ¶
func (pc *PartitionConsumer) Errors() <-chan *sarama.ConsumerError
Errors implements the Errors method from the sarama.PartitionConsumer interface.
func (*PartitionConsumer) ExpectErrorsDrainedOnClose ¶
func (pc *PartitionConsumer) ExpectErrorsDrainedOnClose() *PartitionConsumer
ExpectErrorsDrainedOnClose sets an expectation on the partition consumer that the errors channel will be fully drained when Close is called. If this expectation is not met, an error is reported to the error reporter.
func (*PartitionConsumer) ExpectMessagesDrainedOnClose ¶
func (pc *PartitionConsumer) ExpectMessagesDrainedOnClose() *PartitionConsumer
ExpectMessagesDrainedOnClose sets an expectation on the partition consumer that the messages channel will be fully drained when Close is called. If this expectation is not met, an error is reported to the error reporter.
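The opt-in nature of the drained-on-close checks can be illustrated with a simplified model. This is a sketch under stated assumptions, not Sarama's implementation: `partitionModel` and its methods are hypothetical, and the model assumes yielded messages wait in a buffered channel until the code under test reads them.

```go
package main

import "fmt"

// partitionModel is a hypothetical, simplified model of a mock partition
// consumer: yielded messages sit in a buffered channel until they are read.
type partitionModel struct {
	messages      chan string
	expectDrained bool
	violations    []string
}

func newPartitionModel(capacity int) *partitionModel {
	return &partitionModel{messages: make(chan string, capacity)}
}

// yield queues a message for the code under test to consume.
// It blocks if the buffer is full, so keep capacity >= number of yields.
func (p *partitionModel) yield(msg string) *partitionModel {
	p.messages <- msg
	return p
}

// expectDrainedOnClose opts in to the "fully drained on close" check.
func (p *partitionModel) expectDrainedOnClose() *partitionModel {
	p.expectDrained = true
	return p
}

// shutdown records a violation if the check is enabled and messages remain
// unread, then closes the channel.
func (p *partitionModel) shutdown() {
	if left := len(p.messages); p.expectDrained && left > 0 {
		p.violations = append(p.violations,
			fmt.Sprintf("expected messages to be drained on close, but %d remain", left))
	}
	close(p.messages)
}

func main() {
	p := newPartitionModel(4).expectDrainedOnClose()
	p.yield("a").yield("b")
	<-p.messages // the code under test reads only one of the two messages
	p.shutdown()
	fmt.Println(p.violations)
}
```

Without the opt-in call, the same unread message would pass silently, which matches the default behavior described for YieldMessage and YieldError.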
func (*PartitionConsumer) HighWaterMarkOffset ¶
func (pc *PartitionConsumer) HighWaterMarkOffset() int64
func (*PartitionConsumer) IsPaused ¶
func (pc *PartitionConsumer) IsPaused() bool
IsPaused implements the IsPaused method from the sarama.PartitionConsumer interface.
func (*PartitionConsumer) Messages ¶
func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage
Messages implements the Messages method from the sarama.PartitionConsumer interface.
func (*PartitionConsumer) Pause ¶
func (pc *PartitionConsumer) Pause()
Pause implements the Pause method from the sarama.PartitionConsumer interface.
func (*PartitionConsumer) Resume ¶
func (pc *PartitionConsumer) Resume()
Resume implements the Resume method from the sarama.PartitionConsumer interface.
func (*PartitionConsumer) YieldError ¶
func (pc *PartitionConsumer) YieldError(err error) *PartitionConsumer
YieldError will yield an error on the Errors channel of this partition consumer when it is consumed. By default, the mock consumer will not verify whether this error was consumed from the Errors channel, because there are legitimate reasons for this not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that the channel is empty on close.
func (*PartitionConsumer) YieldMessage ¶
func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) *PartitionConsumer
YieldMessage will yield a message on the Messages channel of this partition consumer when it is consumed. By default, the mock consumer will not verify whether this message was consumed from the Messages channel, because there are legitimate reasons for this not to happen. You can call ExpectMessagesDrainedOnClose so it will verify that the channel is empty on close.
type SyncProducer ¶
type SyncProducer struct {
*TopicConfig
// contains filtered or unexported fields
}
SyncProducer implements sarama's SyncProducer interface for testing purposes. Before you can use it, you have to set expectations on the mock SyncProducer to tell it how to handle calls to SendMessage, so you can easily test success and failure scenarios.
func NewSyncProducer ¶
func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer
NewSyncProducer instantiates a new SyncProducer mock. The t argument should be the *testing.T instance of your test method. An error will be written to it if an expectation is violated. The config argument is validated and used to handle partitioning.
func (*SyncProducer) AbortTxn ¶
func (sp *SyncProducer) AbortTxn() error
func (*SyncProducer) AddMessageToTxn ¶
func (sp *SyncProducer) AddMessageToTxn(msg *sarama.ConsumerMessage, groupId string, metadata *string) error
func (*SyncProducer) AddOffsetsToTxn ¶
func (sp *SyncProducer) AddOffsetsToTxn(offsets map[string][]*sarama.PartitionOffsetMetadata, groupId string) error
func (*SyncProducer) BeginTxn ¶
func (sp *SyncProducer) BeginTxn() error
func (*SyncProducer) Close ¶
func (sp *SyncProducer) Close() error
Close corresponds with the Close method of sarama's SyncProducer implementation. By closing a mock SyncProducer, you also tell it that no more SendMessage calls will follow, so it will write an error to the test state if there are any remaining expectations.
func (*SyncProducer) CommitTxn ¶
func (sp *SyncProducer) CommitTxn() error
func (*SyncProducer) ExpectSendMessageAndFail ¶
func (sp *SyncProducer) ExpectSendMessageAndFail(err error) *SyncProducer
ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be called. The mock producer will handle the message as if it failed to produce successfully, i.e. by returning the provided error.
func (*SyncProducer) ExpectSendMessageAndSucceed ¶
func (sp *SyncProducer) ExpectSendMessageAndSucceed() *SyncProducer
ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be called. The mock producer will handle the message as if it was produced successfully, i.e. by returning a valid partition, an offset, and a nil error.
func (*SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail ¶
func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) *SyncProducer
ExpectSendMessageWithCheckerFunctionAndFail sets an expectation on the mock producer that SendMessage will be called. The mock producer will first call the given function to check the message value. It will cascade the error of the function, if any, or handle the message as if it failed to produce successfully, i.e. by returning the provided error.
func (*SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed ¶
func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) *SyncProducer
ExpectSendMessageWithCheckerFunctionAndSucceed sets an expectation on the mock producer that SendMessage will be called. The mock producer will first call the given function to check the message value. It will cascade the error of the function, if any, or handle the message as if it was produced successfully, i.e. by returning a valid partition, an offset, and a nil error.
func (*SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndFail ¶
func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndFail(cf MessageChecker, err error) *SyncProducer
ExpectSendMessageWithMessageCheckerFunctionAndFail sets an expectation on the mock producer that SendMessage will be called. The mock producer will first call the given function to check the message. It will cascade the error of the function, if any, or handle the message as if it failed to produce successfully, i.e. by returning the provided error.
func (*SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndSucceed ¶
func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndSucceed(cf MessageChecker) *SyncProducer
ExpectSendMessageWithMessageCheckerFunctionAndSucceed sets an expectation on the mock producer that SendMessage will be called. The mock producer will first call the given function to check the message. It will cascade the error of the function, if any, or handle the message as if it was produced successfully, i.e. by returning a valid partition, an offset, and a nil error.
func (*SyncProducer) IsTransactional ¶
func (sp *SyncProducer) IsTransactional() bool
func (*SyncProducer) SendMessage ¶
func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
SendMessage corresponds with the SendMessage method of sarama's SyncProducer implementation. You have to set expectations on the mock producer before calling SendMessage, so it knows how to handle them. You can set a function in each expectation so that the message value is checked by this function and an error is returned if the match fails. If there are no remaining expectations when SendMessage is called, the mock producer will write an error to the test state object.
func (*SyncProducer) SendMessages ¶
func (sp *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error
SendMessages corresponds with the SendMessages method of sarama's SyncProducer implementation. You have to set expectations on the mock producer before calling SendMessages, so it knows how to handle them. If there are no remaining expectations when SendMessages is called, the mock producer will write an error to the test state object.
func (*SyncProducer) TxnStatus ¶
func (sp *SyncProducer) TxnStatus() sarama.ProducerTxnStatusFlag
type TopicConfig ¶
type TopicConfig struct {
// contains filtered or unexported fields
}
TopicConfig describes a mock topic structure for the mock producers’ partitioning needs.
func NewTopicConfig ¶
func NewTopicConfig() *TopicConfig
NewTopicConfig makes a configuration which defaults to 32 partitions for every topic.
func (*TopicConfig) SetDefaultPartitions ¶
func (pc *TopicConfig) SetDefaultPartitions(n int32)
SetDefaultPartitions sets the number of partitions any topic not explicitly configured otherwise (by SetPartitions) will have from the perspective of created partitioners.
func (*TopicConfig) SetPartitions ¶
func (pc *TopicConfig) SetPartitions(partitions map[string]int32)
SetPartitions sets the number of partitions the partitioners will see for specific topics. This only applies to messages produced after setting them.
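The lookup rule described for NewTopicConfig, SetDefaultPartitions, and SetPartitions can be modeled in a few lines. This is a sketch of the documented behavior only, not Sarama's TopicConfig: the `topicPartitions` type and its methods are hypothetical names, and the default of 32 is taken from the NewTopicConfig description.

```go
package main

import "fmt"

// topicPartitions is a hypothetical model of the lookup rule: a per-topic
// override wins, otherwise the default partition count applies.
type topicPartitions struct {
	defaultPartitions int32
	overrides         map[string]int32
}

// newTopicPartitions starts with 32 partitions for every topic.
func newTopicPartitions() *topicPartitions {
	return &topicPartitions{defaultPartitions: 32, overrides: map[string]int32{}}
}

// setDefault changes the count for topics without an explicit override.
func (tp *topicPartitions) setDefault(n int32) { tp.defaultPartitions = n }

// setPartitions records explicit per-topic partition counts.
func (tp *topicPartitions) setPartitions(partitions map[string]int32) {
	for topic, n := range partitions {
		tp.overrides[topic] = n
	}
}

// partitionsFor returns the count a partitioner would see for the topic.
func (tp *topicPartitions) partitionsFor(topic string) int32 {
	if n, ok := tp.overrides[topic]; ok {
		return n
	}
	return tp.defaultPartitions
}

func main() {
	tp := newTopicPartitions()
	tp.setPartitions(map[string]int32{"orders": 3})
	fmt.Println(tp.partitionsFor("orders"), tp.partitionsFor("payments"))

	tp.setDefault(8)
	fmt.Println(tp.partitionsFor("orders"), tp.partitionsFor("payments"))
}
```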
type ValueChecker ¶
type ValueChecker func(val []byte) error
ValueChecker is a function type to be set in each expectation of the producer mocks to check the value passed.
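A checker of this kind is typically a small closure that returns nil when the value is acceptable and an error otherwise. The sketch below assumes the checker receives the encoded message value as a byte slice and redeclares the type locally so it compiles without importing Sarama; `matchesPattern` is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"regexp"
)

// ValueChecker is redeclared locally (assumed to take the raw message value)
// so that this sketch is self-contained.
type ValueChecker func(val []byte) error

// matchesPattern is a hypothetical helper that builds a checker which
// accepts only values matching the given regular expression.
func matchesPattern(pattern string) ValueChecker {
	re := regexp.MustCompile(pattern)
	return func(val []byte) error {
		if !re.Match(val) {
			return fmt.Errorf("value %q does not match pattern %q", val, pattern)
		}
		return nil
	}
}

func main() {
	check := matchesPattern(`^order-[0-9]+$`)
	fmt.Println(check([]byte("order-42")))
	fmt.Println(check([]byte("refund-42")))
}
```

A checker built this way could then be passed to one of the WithCheckerFunction expectation methods listed above.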