mocks

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 7, 2024 License: MIT Imports: 5 Imported by: 0

README

sarama/mocks

The mocks subpackage includes mock implementations that implement the interfaces of the major sarama types. You can use them to test your sarama applications using dependency injection.

The following mock objects are available:

The mocks allow you to set expectations on them. When you close the mocks, the expectations will be verified, and the results will be reported to the *testing.T object you provided when creating the mock.

Documentation

Overview

Package mocks provides mocks that can be used for testing applications that use Sarama. The mock types provided by this package implement the interfaces Sarama exports, so you can use them for dependency injection in your tests.

All mock instances require you to set expectations on them before you can use them. It will determine how the mock will behave. If an expectation is not met, it will make your test fail.

NOTE: this package currently does not fall under the API stability guarantee of Sarama as it is still considered experimental.

Index

Constants

View Source
const AnyOffset int64 = -1000

Variables

This section is empty.

Functions

func NewTestConfig

func NewTestConfig() *sarama.Config

NewTestConfig returns a config meant to be used by tests. Due to inconsistencies with the request versions the clients send using the default Kafka version and the response versions our mocks use, we default to the minimum Kafka version in most tests

Types

type AsyncProducer

type AsyncProducer struct {
	*TopicConfig
	// contains filtered or unexported fields
}

AsyncProducer implements sarama's Producer interface for testing purposes. Before you can send messages to it's Input channel, you have to set expectations so it knows how to handle the input; it returns an error if the number of messages received is bigger then the number of expectations set. You can also set a function in each expectation so that the message is checked by this function and an error is returned if the match fails.

func NewAsyncProducer

func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer

NewAsyncProducer instantiates a new Producer mock. The t argument should be the *testing.T instance of your test method. An error will be written to it if an expectation is violated. The config argument is validated and used to determine whether it should ack successes on the Successes channel and handle partitioning.

func (*AsyncProducer) AbortTxn

func (mp *AsyncProducer) AbortTxn() error

func (*AsyncProducer) AddMessageToTxn

func (mp *AsyncProducer) AddMessageToTxn(msg *sarama.ConsumerMessage, groupId string, metadata *string) error

func (*AsyncProducer) AddOffsetsToTxn

func (mp *AsyncProducer) AddOffsetsToTxn(offsets map[string][]*sarama.PartitionOffsetMetadata, groupId string) error

func (*AsyncProducer) AsyncClose

func (mp *AsyncProducer) AsyncClose()

AsyncClose corresponds with the AsyncClose method of sarama's Producer implementation. By closing a mock producer, you also tell it that no more input will be provided, so it will write an error to the test state if there's any remaining expectations.

func (*AsyncProducer) BeginTxn

func (mp *AsyncProducer) BeginTxn() error

func (*AsyncProducer) Close

func (mp *AsyncProducer) Close() error

Close corresponds with the Close method of sarama's Producer implementation. By closing a mock producer, you also tell it that no more input will be provided, so it will write an error to the test state if there's any remaining expectations.

func (*AsyncProducer) CommitTxn

func (mp *AsyncProducer) CommitTxn() error

func (*AsyncProducer) Errors

func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError

Errors corresponds with the Errors method of sarama's Producer implementation.

func (*AsyncProducer) ExpectInputAndFail

func (mp *AsyncProducer) ExpectInputAndFail(err error) *AsyncProducer

ExpectInputAndFail sets an expectation on the mock producer that a message will be provided on the input channel. The mock producer will handle the message as if it failed to produce successfully. This means it will make a ProducerError available on the Errors channel.

func (*AsyncProducer) ExpectInputAndSucceed

func (mp *AsyncProducer) ExpectInputAndSucceed() *AsyncProducer

ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided on the input channel. The mock producer will handle the message as if it is produced successfully, i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting is set to true.

func (*AsyncProducer) ExpectInputWithCheckerFunctionAndFail

func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, err error) *AsyncProducer

ExpectInputWithCheckerFunctionAndFail sets an expectation on the mock producer that a message will be provided on the input channel. The mock producer will first call the given function to check the message value. If an error is returned it will be made available on the Errors channel otherwise the mock will handle the message as if it failed to produce successfully. This means it will make a ProducerError available on the Errors channel.

func (*AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed

func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecker) *AsyncProducer

ExpectInputWithCheckerFunctionAndSucceed sets an expectation on the mock producer that a message will be provided on the input channel. The mock producer will call the given function to check the message value. If an error is returned it will be made available on the Errors channel otherwise the mock will handle the message as if it produced successfully, i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting is set to true.

func (*AsyncProducer) ExpectInputWithMessageCheckerFunctionAndFail

func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndFail(cf MessageChecker, err error) *AsyncProducer

ExpectInputWithMessageCheckerFunctionAndFail sets an expectation on the mock producer that a message will be provided on the input channel. The mock producer will first call the given function to check the message. If an error is returned it will be made available on the Errors channel otherwise the mock will handle the message as if it failed to produce successfully. This means it will make a ProducerError available on the Errors channel.

func (*AsyncProducer) ExpectInputWithMessageCheckerFunctionAndSucceed

func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndSucceed(cf MessageChecker) *AsyncProducer

ExpectInputWithMessageCheckerFunctionAndSucceed sets an expectation on the mock producer that a message will be provided on the input channel. The mock producer will call the given function to check the message. If an error is returned it will be made available on the Errors channel otherwise the mock will handle the message as if it produced successfully, i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting is set to true.

func (*AsyncProducer) Input

func (mp *AsyncProducer) Input() chan<- *sarama.ProducerMessage

Input corresponds with the Input method of sarama's Producer implementation. You have to set expectations on the mock producer before writing messages to the Input channel, so it knows how to handle them. If there is no more remaining expectations and a messages is written to the Input channel, the mock producer will write an error to the test state object.

func (*AsyncProducer) IsTransactional

func (mp *AsyncProducer) IsTransactional() bool

func (*AsyncProducer) Successes

func (mp *AsyncProducer) Successes() <-chan *sarama.ProducerMessage

Successes corresponds with the Successes method of sarama's Producer implementation.

func (*AsyncProducer) TxnStatus

func (mp *AsyncProducer) TxnStatus() sarama.ProducerTxnStatusFlag

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer implements sarama's Consumer interface for testing purposes. Before you can start consuming from this consumer, you have to register topic/partitions using ExpectConsumePartition, and set expectations on them.

func NewConsumer

func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer

NewConsumer returns a new mock Consumer instance. The t argument should be the *testing.T instance of your test method. An error will be written to it if an expectation is violated. The config argument can be set to nil; if it is non-nil it is validated.

func (*Consumer) Close

func (c *Consumer) Close() error

Close implements the Close method from the sarama.Consumer interface. It will close all registered PartitionConsumer instances.

func (*Consumer) ConsumePartition

func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)

ConsumePartition implements the ConsumePartition method from the sarama.Consumer interface. Before you can start consuming a partition, you have to set expectations on it using ExpectConsumePartition. You can only consume a partition once per consumer.

func (*Consumer) ExpectConsumePartition

func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset int64) *PartitionConsumer

ExpectConsumePartition will register a topic/partition, so you can set expectations on it. The registered PartitionConsumer will be returned, so you can set expectations on it using method chaining. Once a topic/partition is registered, you are expected to start consuming it using ConsumePartition. If that doesn't happen, an error will be written to the error reporter once the mock consumer is closed. It also expects that the message and error channels be written with YieldMessage and YieldError accordingly, and be fully consumed once the mock consumer is closed if ExpectMessagesDrainedOnClose or ExpectErrorsDrainedOnClose have been called.

func (*Consumer) HighWaterMarks

func (c *Consumer) HighWaterMarks() map[string]map[int32]int64

func (*Consumer) Partitions

func (c *Consumer) Partitions(topic string) ([]int32, error)

Partitions returns the list of parititons for the given topic, as registered with SetTopicMetadata

func (*Consumer) Pause

func (c *Consumer) Pause(topicPartitions map[string][]int32)

Pause implements Consumer.

func (*Consumer) PauseAll

func (c *Consumer) PauseAll()

PauseAll implements Consumer.

func (*Consumer) Resume

func (c *Consumer) Resume(topicPartitions map[string][]int32)

Resume implements Consumer.

func (*Consumer) ResumeAll

func (c *Consumer) ResumeAll()

ResumeAll implements Consumer.

func (*Consumer) SetTopicMetadata

func (c *Consumer) SetTopicMetadata(metadata map[string][]int32)

SetTopicMetadata sets the clusters topic/partition metadata, which will be returned by Topics() and Partitions().

func (*Consumer) Topics

func (c *Consumer) Topics() ([]string, error)

Topics returns a list of topics, as registered with SetTopicMetadata

type ErrorReporter

type ErrorReporter interface {
	Errorf(string, ...interface{})
}

ErrorReporter is a simple interface that includes the testing.T methods we use to report expectation violations when using the mock objects.

type MessageChecker

type MessageChecker func(*sarama.ProducerMessage) error

MessageChecker is a function type to be set in each expectation of the producer mocks to check the message passed.

type PartitionConsumer

type PartitionConsumer struct {
	// contains filtered or unexported fields
}

PartitionConsumer implements sarama's PartitionConsumer interface for testing purposes. It is returned by the mock Consumers ConsumePartitionMethod, but only if it is registered first using the Consumer's ExpectConsumePartition method. Before consuming the Errors and Messages channel, you should specify what values will be provided on these channels using YieldMessage and YieldError.

func (*PartitionConsumer) AsyncClose

func (pc *PartitionConsumer) AsyncClose()

AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.

func (*PartitionConsumer) Close

func (pc *PartitionConsumer) Close() error

Close implements the Close method from the sarama.PartitionConsumer interface. It will verify whether the partition consumer was actually started.

func (*PartitionConsumer) Errors

func (pc *PartitionConsumer) Errors() <-chan *sarama.ConsumerError

Errors implements the Errors method from the sarama.PartitionConsumer interface.

func (*PartitionConsumer) ExpectErrorsDrainedOnClose

func (pc *PartitionConsumer) ExpectErrorsDrainedOnClose() *PartitionConsumer

ExpectErrorsDrainedOnClose sets an expectation on the partition consumer that the errors channel will be fully drained when Close is called. If this expectation is not met, an error is reported to the error reporter.

func (*PartitionConsumer) ExpectMessagesDrainedOnClose

func (pc *PartitionConsumer) ExpectMessagesDrainedOnClose() *PartitionConsumer

ExpectMessagesDrainedOnClose sets an expectation on the partition consumer that the messages channel will be fully drained when Close is called. If this expectation is not met, an error is reported to the error reporter.

func (*PartitionConsumer) HighWaterMarkOffset

func (pc *PartitionConsumer) HighWaterMarkOffset() int64

func (*PartitionConsumer) IsPaused

func (pc *PartitionConsumer) IsPaused() bool

IsPaused implements the IsPaused method from the sarama.PartitionConsumer interface.

func (*PartitionConsumer) Messages

func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage

Messages implements the Messages method from the sarama.PartitionConsumer interface.

func (*PartitionConsumer) Pause

func (pc *PartitionConsumer) Pause()

Pause implements the Pause method from the sarama.PartitionConsumer interface.

func (*PartitionConsumer) Resume

func (pc *PartitionConsumer) Resume()

Resume implements the Resume method from the sarama.PartitionConsumer interface.

func (*PartitionConsumer) YieldError

func (pc *PartitionConsumer) YieldError(err error) *PartitionConsumer

YieldError will yield an error on the Errors channel of this partition consumer when it is consumed. By default, the mock consumer will not verify whether this error was consumed from the Errors channel, because there are legitimate reasons for this not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that the channel is empty on close.

func (*PartitionConsumer) YieldMessage

YieldMessage will yield a messages Messages channel of this partition consumer when it is consumed. By default, the mock consumer will not verify whether this message was consumed from the Messages channel, because there are legitimate reasons forthis not to happen. ou can call ExpectMessagesDrainedOnClose so it will verify that the channel is empty on close.

type SyncProducer

type SyncProducer struct {
	*TopicConfig
	// contains filtered or unexported fields
}

SyncProducer implements sarama's SyncProducer interface for testing purposes. Before you can use it, you have to set expectations on the mock SyncProducer to tell it how to handle calls to SendMessage, so you can easily test success and failure scenarios.

func NewSyncProducer

func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer

NewSyncProducer instantiates a new SyncProducer mock. The t argument should be the *testing.T instance of your test method. An error will be written to it if an expectation is violated. The config argument is validated and used to handle partitioning.

func (*SyncProducer) AbortTxn

func (sp *SyncProducer) AbortTxn() error

func (*SyncProducer) AddMessageToTxn

func (sp *SyncProducer) AddMessageToTxn(msg *sarama.ConsumerMessage, groupId string, metadata *string) error

func (*SyncProducer) AddOffsetsToTxn

func (sp *SyncProducer) AddOffsetsToTxn(offsets map[string][]*sarama.PartitionOffsetMetadata, groupId string) error

func (*SyncProducer) BeginTxn

func (sp *SyncProducer) BeginTxn() error

func (*SyncProducer) Close

func (sp *SyncProducer) Close() error

Close corresponds with the Close method of sarama's SyncProducer implementation. By closing a mock syncproducer, you also tell it that no more SendMessage calls will follow, so it will write an error to the test state if there's any remaining expectations.

func (*SyncProducer) CommitTxn

func (sp *SyncProducer) CommitTxn() error

func (*SyncProducer) ExpectSendMessageAndFail

func (sp *SyncProducer) ExpectSendMessageAndFail(err error) *SyncProducer

ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be called. The mock producer will handle the message as if it failed to produce successfully, i.e. by returning the provided error.

func (*SyncProducer) ExpectSendMessageAndSucceed

func (sp *SyncProducer) ExpectSendMessageAndSucceed() *SyncProducer

ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be called. The mock producer will handle the message as if it produced successfully, i.e. by returning a valid partition, and offset, and a nil error.

func (*SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail

func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) *SyncProducer

ExpectSendMessageWithCheckerFunctionAndFail sets an expectation on the mock producer that SendMessage will be called. The mock producer will first call the given function to check the message value. It will cascade the error of the function, if any, or handle the message as if it failed to produce successfully, i.e. by returning the provided error.

func (*SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed

func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) *SyncProducer

ExpectSendMessageWithCheckerFunctionAndSucceed sets an expectation on the mock producer that SendMessage will be called. The mock producer will first call the given function to check the message value. It will cascade the error of the function, if any, or handle the message as if it produced successfully, i.e. by returning a valid partition, and offset, and a nil error.

func (*SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndFail

func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndFail(cf MessageChecker, err error) *SyncProducer

ExpectSendMessageWithMessageCheckerFunctionAndFail sets an expectation on the mock producer that SendMessage will be called. The mock producer will first call the given function to check the message. It will cascade the error of the function, if any, or handle the message as if it failed to produce successfully, i.e. by returning the provided error.

func (*SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndSucceed

func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndSucceed(cf MessageChecker) *SyncProducer

ExpectSendMessageWithMessageCheckerFunctionAndSucceed sets an expectation on the mock producer that SendMessage will be called. The mock producer will first call the given function to check the message. It will cascade the error of the function, if any, or handle the message as if it produced successfully, i.e. by returning a valid partition, and offset, and a nil error.

func (*SyncProducer) IsTransactional

func (sp *SyncProducer) IsTransactional() bool

func (*SyncProducer) SendMessage

func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)

SendMessage corresponds with the SendMessage method of sarama's SyncProducer implementation. You have to set expectations on the mock producer before calling SendMessage, so it knows how to handle them. You can set a function in each expectation so that the message value checked by this function and an error is returned if the match fails. If there is no more remaining expectation when SendMessage is called, the mock producer will write an error to the test state object.

func (*SyncProducer) SendMessages

func (sp *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error

SendMessages corresponds with the SendMessages method of sarama's SyncProducer implementation. You have to set expectations on the mock producer before calling SendMessages, so it knows how to handle them. If there is no more remaining expectations when SendMessages is called, the mock producer will write an error to the test state object.

func (*SyncProducer) TxnStatus

func (sp *SyncProducer) TxnStatus() sarama.ProducerTxnStatusFlag

type TopicConfig

type TopicConfig struct {
	// contains filtered or unexported fields
}

TopicConfig describes a mock topic structure for the mock producers’ partitioning needs.

func NewTopicConfig

func NewTopicConfig() *TopicConfig

NewTopicConfig makes a configuration which defaults to 32 partitions for every topic.

func (*TopicConfig) SetDefaultPartitions

func (pc *TopicConfig) SetDefaultPartitions(n int32)

SetDefaultPartitions sets the number of partitions any topic not explicitly configured otherwise (by SetPartitions) will have from the perspective of created partitioners.

func (*TopicConfig) SetPartitions

func (pc *TopicConfig) SetPartitions(partitions map[string]int32)

SetPartitions sets the number of partitions the partitioners will see for specific topics. This only applies to messages produced after setting them.

type ValueChecker

type ValueChecker func(val []byte) error

ValueChecker is a function type to be set in each expectation of the producer mocks to check the value passed.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL