README ¶
KafkaTest
This package contains mocks intended to be used by users of this library for testing.
Empty mocks
If you require to implement your own mock functionality, you can use the empty mocks, which are created using moq
to implement the interfaces kafkatest.ConsumerGroup
, kafkatest.Producer
and Message
These interfaces expose the same methods as the real Producer and ConsumerGroup structs. You can instantiate the mocks like so:
consumer := kafkatest.ConsumerGroupMock{...}
producer := kafkatest.ProducerMock{...}
message := kafkatest.MessageMock{...}
Functional mocks
The previous mocks have been extended by implementing functionality that emulates a real Producer, Consumer and message; but without communicating with any real Kafka broker. If you require a functional mock to test how you interact with kafka, you can use these mocks (kafkatest.MessageConsumer
, kafaktest.MessageProducer
and kafkatest.Message
) like so:
consumer := kafkatest.NewMessageConsumer(true)
producer := kafkatest.NewMessageProducer(true)
message := kafkatest.NewMessage(data, offset)
Documentation ¶
Index ¶
- type IConsumerGroupMock
- func (mock *IConsumerGroupMock) Channels() *kafka.ConsumerGroupChannels
- func (mock *IConsumerGroupMock) ChannelsCalls() []struct{}
- func (mock *IConsumerGroupMock) Checker(ctx context.Context, state *health.CheckState) error
- func (mock *IConsumerGroupMock) CheckerCalls() []struct{ ... }
- func (mock *IConsumerGroupMock) Close(ctx context.Context) error
- func (mock *IConsumerGroupMock) CloseCalls() []struct{ ... }
- func (mock *IConsumerGroupMock) Initialise(ctx context.Context) error
- func (mock *IConsumerGroupMock) InitialiseCalls() []struct{ ... }
- func (mock *IConsumerGroupMock) IsInitialised() bool
- func (mock *IConsumerGroupMock) IsInitialisedCalls() []struct{}
- func (mock *IConsumerGroupMock) LogErrors(ctx context.Context)
- func (mock *IConsumerGroupMock) LogErrorsCalls() []struct{ ... }
- func (mock *IConsumerGroupMock) OnHealthUpdate(status string)
- func (mock *IConsumerGroupMock) OnHealthUpdateCalls() []struct{ ... }
- func (mock *IConsumerGroupMock) RegisterBatchHandler(ctx context.Context, batchHandler kafka.BatchHandler) error
- func (mock *IConsumerGroupMock) RegisterBatchHandlerCalls() []struct{ ... }
- func (mock *IConsumerGroupMock) RegisterHandler(ctx context.Context, h kafka.Handler) error
- func (mock *IConsumerGroupMock) RegisterHandlerCalls() []struct{ ... }
- func (mock *IConsumerGroupMock) Start() error
- func (mock *IConsumerGroupMock) StartCalls() []struct{}
- func (mock *IConsumerGroupMock) State() kafka.State
- func (mock *IConsumerGroupMock) StateCalls() []struct{}
- func (mock *IConsumerGroupMock) StateWait(state kafka.State)
- func (mock *IConsumerGroupMock) StateWaitCalls() []struct{ ... }
- func (mock *IConsumerGroupMock) Stop() error
- func (mock *IConsumerGroupMock) StopAndWait() error
- func (mock *IConsumerGroupMock) StopAndWaitCalls() []struct{}
- func (mock *IConsumerGroupMock) StopCalls() []struct{}
- type IProducerMock
- func (mock *IProducerMock) AddHeader(key string, value string)
- func (mock *IProducerMock) AddHeaderCalls() []struct{ ... }
- func (mock *IProducerMock) Channels() *kafka.ProducerChannels
- func (mock *IProducerMock) ChannelsCalls() []struct{}
- func (mock *IProducerMock) Checker(ctx context.Context, state *health.CheckState) error
- func (mock *IProducerMock) CheckerCalls() []struct{ ... }
- func (mock *IProducerMock) Close(ctx context.Context) error
- func (mock *IProducerMock) CloseCalls() []struct{ ... }
- func (mock *IProducerMock) Initialise(ctx context.Context) error
- func (mock *IProducerMock) InitialiseCalls() []struct{ ... }
- func (mock *IProducerMock) IsInitialised() bool
- func (mock *IProducerMock) IsInitialisedCalls() []struct{}
- func (mock *IProducerMock) LogErrors(ctx context.Context)
- func (mock *IProducerMock) LogErrorsCalls() []struct{ ... }
- func (mock *IProducerMock) Send(schema *avro.Schema, event interface{}) error
- func (mock *IProducerMock) SendCalls() []struct{ ... }
- type Message
- type MessageConsumer
- type MessageProducer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type IConsumerGroupMock ¶
type IConsumerGroupMock struct { // ChannelsFunc mocks the Channels method. ChannelsFunc func() *kafka.ConsumerGroupChannels // CheckerFunc mocks the Checker method. CheckerFunc func(ctx context.Context, state *health.CheckState) error // CloseFunc mocks the Close method. CloseFunc func(ctx context.Context) error // InitialiseFunc mocks the Initialise method. InitialiseFunc func(ctx context.Context) error // IsInitialisedFunc mocks the IsInitialised method. IsInitialisedFunc func() bool // LogErrorsFunc mocks the LogErrors method. LogErrorsFunc func(ctx context.Context) // OnHealthUpdateFunc mocks the OnHealthUpdate method. OnHealthUpdateFunc func(status string) // RegisterBatchHandlerFunc mocks the RegisterBatchHandler method. RegisterBatchHandlerFunc func(ctx context.Context, batchHandler kafka.BatchHandler) error // RegisterHandlerFunc mocks the RegisterHandler method. RegisterHandlerFunc func(ctx context.Context, h kafka.Handler) error // StartFunc mocks the Start method. StartFunc func() error // StateFunc mocks the State method. StateFunc func() kafka.State // StateWaitFunc mocks the StateWait method. StateWaitFunc func(state kafka.State) // StopFunc mocks the Stop method. StopFunc func() error // StopAndWaitFunc mocks the StopAndWait method. StopAndWaitFunc func() error // contains filtered or unexported fields }
IConsumerGroupMock is a mock implementation of kafka.IConsumerGroup.
func TestSomethingThatUsesIConsumerGroup(t *testing.T) { // make and configure a mocked kafka.IConsumerGroup mockedIConsumerGroup := &IConsumerGroupMock{ ChannelsFunc: func() *kafka.ConsumerGroupChannels { panic("mock out the Channels method") }, CheckerFunc: func(ctx context.Context, state *health.CheckState) error { panic("mock out the Checker method") }, CloseFunc: func(ctx context.Context) error { panic("mock out the Close method") }, InitialiseFunc: func(ctx context.Context) error { panic("mock out the Initialise method") }, IsInitialisedFunc: func() bool { panic("mock out the IsInitialised method") }, LogErrorsFunc: func(ctx context.Context) { panic("mock out the LogErrors method") }, OnHealthUpdateFunc: func(status string) { panic("mock out the OnHealthUpdate method") }, RegisterBatchHandlerFunc: func(ctx context.Context, batchHandler kafka.BatchHandler) error { panic("mock out the RegisterBatchHandler method") }, RegisterHandlerFunc: func(ctx context.Context, h kafka.Handler) error { panic("mock out the RegisterHandler method") }, StartFunc: func() error { panic("mock out the Start method") }, StateFunc: func() kafka.State { panic("mock out the State method") }, StateWaitFunc: func(state kafka.State) { panic("mock out the StateWait method") }, StopFunc: func() error { panic("mock out the Stop method") }, StopAndWaitFunc: func() error { panic("mock out the StopAndWait method") }, } // use mockedIConsumerGroup in code that requires kafka.IConsumerGroup // and then make assertions. }
func (*IConsumerGroupMock) Channels ¶
func (mock *IConsumerGroupMock) Channels() *kafka.ConsumerGroupChannels
Channels calls ChannelsFunc.
func (*IConsumerGroupMock) ChannelsCalls ¶
func (mock *IConsumerGroupMock) ChannelsCalls() []struct { }
ChannelsCalls gets all the calls that were made to Channels. Check the length with:
len(mockedIConsumerGroup.ChannelsCalls())
func (*IConsumerGroupMock) Checker ¶
func (mock *IConsumerGroupMock) Checker(ctx context.Context, state *health.CheckState) error
Checker calls CheckerFunc.
func (*IConsumerGroupMock) CheckerCalls ¶
func (mock *IConsumerGroupMock) CheckerCalls() []struct { Ctx context.Context State *health.CheckState }
CheckerCalls gets all the calls that were made to Checker. Check the length with:
len(mockedIConsumerGroup.CheckerCalls())
func (*IConsumerGroupMock) Close ¶
func (mock *IConsumerGroupMock) Close(ctx context.Context) error
Close calls CloseFunc.
func (*IConsumerGroupMock) CloseCalls ¶
func (mock *IConsumerGroupMock) CloseCalls() []struct { Ctx context.Context }
CloseCalls gets all the calls that were made to Close. Check the length with:
len(mockedIConsumerGroup.CloseCalls())
func (*IConsumerGroupMock) Initialise ¶
func (mock *IConsumerGroupMock) Initialise(ctx context.Context) error
Initialise calls InitialiseFunc.
func (*IConsumerGroupMock) InitialiseCalls ¶
func (mock *IConsumerGroupMock) InitialiseCalls() []struct { Ctx context.Context }
InitialiseCalls gets all the calls that were made to Initialise. Check the length with:
len(mockedIConsumerGroup.InitialiseCalls())
func (*IConsumerGroupMock) IsInitialised ¶
func (mock *IConsumerGroupMock) IsInitialised() bool
IsInitialised calls IsInitialisedFunc.
func (*IConsumerGroupMock) IsInitialisedCalls ¶
func (mock *IConsumerGroupMock) IsInitialisedCalls() []struct { }
IsInitialisedCalls gets all the calls that were made to IsInitialised. Check the length with:
len(mockedIConsumerGroup.IsInitialisedCalls())
func (*IConsumerGroupMock) LogErrors ¶
func (mock *IConsumerGroupMock) LogErrors(ctx context.Context)
LogErrors calls LogErrorsFunc.
func (*IConsumerGroupMock) LogErrorsCalls ¶
func (mock *IConsumerGroupMock) LogErrorsCalls() []struct { Ctx context.Context }
LogErrorsCalls gets all the calls that were made to LogErrors. Check the length with:
len(mockedIConsumerGroup.LogErrorsCalls())
func (*IConsumerGroupMock) OnHealthUpdate ¶
func (mock *IConsumerGroupMock) OnHealthUpdate(status string)
OnHealthUpdate calls OnHealthUpdateFunc.
func (*IConsumerGroupMock) OnHealthUpdateCalls ¶
func (mock *IConsumerGroupMock) OnHealthUpdateCalls() []struct { Status string }
OnHealthUpdateCalls gets all the calls that were made to OnHealthUpdate. Check the length with:
len(mockedIConsumerGroup.OnHealthUpdateCalls())
func (*IConsumerGroupMock) RegisterBatchHandler ¶
func (mock *IConsumerGroupMock) RegisterBatchHandler(ctx context.Context, batchHandler kafka.BatchHandler) error
RegisterBatchHandler calls RegisterBatchHandlerFunc.
func (*IConsumerGroupMock) RegisterBatchHandlerCalls ¶
func (mock *IConsumerGroupMock) RegisterBatchHandlerCalls() []struct { Ctx context.Context BatchHandler kafka.BatchHandler }
RegisterBatchHandlerCalls gets all the calls that were made to RegisterBatchHandler. Check the length with:
len(mockedIConsumerGroup.RegisterBatchHandlerCalls())
func (*IConsumerGroupMock) RegisterHandler ¶
func (mock *IConsumerGroupMock) RegisterHandler(ctx context.Context, h kafka.Handler) error
RegisterHandler calls RegisterHandlerFunc.
func (*IConsumerGroupMock) RegisterHandlerCalls ¶
func (mock *IConsumerGroupMock) RegisterHandlerCalls() []struct { Ctx context.Context H kafka.Handler }
RegisterHandlerCalls gets all the calls that were made to RegisterHandler. Check the length with:
len(mockedIConsumerGroup.RegisterHandlerCalls())
func (*IConsumerGroupMock) Start ¶
func (mock *IConsumerGroupMock) Start() error
Start calls StartFunc.
func (*IConsumerGroupMock) StartCalls ¶
func (mock *IConsumerGroupMock) StartCalls() []struct { }
StartCalls gets all the calls that were made to Start. Check the length with:
len(mockedIConsumerGroup.StartCalls())
func (*IConsumerGroupMock) State ¶
func (mock *IConsumerGroupMock) State() kafka.State
State calls StateFunc.
func (*IConsumerGroupMock) StateCalls ¶
func (mock *IConsumerGroupMock) StateCalls() []struct { }
StateCalls gets all the calls that were made to State. Check the length with:
len(mockedIConsumerGroup.StateCalls())
func (*IConsumerGroupMock) StateWait ¶ added in v3.1.0
func (mock *IConsumerGroupMock) StateWait(state kafka.State)
StateWait calls StateWaitFunc.
func (*IConsumerGroupMock) StateWaitCalls ¶ added in v3.1.0
func (mock *IConsumerGroupMock) StateWaitCalls() []struct { State kafka.State }
StateWaitCalls gets all the calls that were made to StateWait. Check the length with:
len(mockedIConsumerGroup.StateWaitCalls())
func (*IConsumerGroupMock) StopAndWait ¶
func (mock *IConsumerGroupMock) StopAndWait() error
StopAndWait calls StopAndWaitFunc.
func (*IConsumerGroupMock) StopAndWaitCalls ¶
func (mock *IConsumerGroupMock) StopAndWaitCalls() []struct { }
StopAndWaitCalls gets all the calls that were made to StopAndWait. Check the length with:
len(mockedIConsumerGroup.StopAndWaitCalls())
func (*IConsumerGroupMock) StopCalls ¶
func (mock *IConsumerGroupMock) StopCalls() []struct { }
StopCalls gets all the calls that were made to Stop. Check the length with:
len(mockedIConsumerGroup.StopCalls())
type IProducerMock ¶
type IProducerMock struct { // AddHeaderFunc mocks the AddHeader method. AddHeaderFunc func(key string, value string) // ChannelsFunc mocks the Channels method. ChannelsFunc func() *kafka.ProducerChannels // CheckerFunc mocks the Checker method. CheckerFunc func(ctx context.Context, state *health.CheckState) error // CloseFunc mocks the Close method. CloseFunc func(ctx context.Context) error // InitialiseFunc mocks the Initialise method. InitialiseFunc func(ctx context.Context) error // IsInitialisedFunc mocks the IsInitialised method. IsInitialisedFunc func() bool // LogErrorsFunc mocks the LogErrors method. LogErrorsFunc func(ctx context.Context) // SendFunc mocks the Send method. SendFunc func(schema *avro.Schema, event interface{}) error // contains filtered or unexported fields }
IProducerMock is a mock implementation of kafka.IProducer.
func TestSomethingThatUsesIProducer(t *testing.T) { // make and configure a mocked kafka.IProducer mockedIProducer := &IProducerMock{ AddHeaderFunc: func(key string, value string) { panic("mock out the AddHeader method") }, ChannelsFunc: func() *kafka.ProducerChannels { panic("mock out the Channels method") }, CheckerFunc: func(ctx context.Context, state *health.CheckState) error { panic("mock out the Checker method") }, CloseFunc: func(ctx context.Context) error { panic("mock out the Close method") }, InitialiseFunc: func(ctx context.Context) error { panic("mock out the Initialise method") }, IsInitialisedFunc: func() bool { panic("mock out the IsInitialised method") }, LogErrorsFunc: func(ctx context.Context) { panic("mock out the LogErrors method") }, SendFunc: func(schema *avro.Schema, event interface{}) error { panic("mock out the Send method") }, } // use mockedIProducer in code that requires kafka.IProducer // and then make assertions. }
func (*IProducerMock) AddHeader ¶ added in v3.4.0
func (mock *IProducerMock) AddHeader(key string, value string)
AddHeader calls AddHeaderFunc.
func (*IProducerMock) AddHeaderCalls ¶ added in v3.4.0
func (mock *IProducerMock) AddHeaderCalls() []struct { Key string Value string }
AddHeaderCalls gets all the calls that were made to AddHeader. Check the length with:
len(mockedIProducer.AddHeaderCalls())
func (*IProducerMock) Channels ¶
func (mock *IProducerMock) Channels() *kafka.ProducerChannels
Channels calls ChannelsFunc.
func (*IProducerMock) ChannelsCalls ¶
func (mock *IProducerMock) ChannelsCalls() []struct { }
ChannelsCalls gets all the calls that were made to Channels. Check the length with:
len(mockedIProducer.ChannelsCalls())
func (*IProducerMock) Checker ¶
func (mock *IProducerMock) Checker(ctx context.Context, state *health.CheckState) error
Checker calls CheckerFunc.
func (*IProducerMock) CheckerCalls ¶
func (mock *IProducerMock) CheckerCalls() []struct { Ctx context.Context State *health.CheckState }
CheckerCalls gets all the calls that were made to Checker. Check the length with:
len(mockedIProducer.CheckerCalls())
func (*IProducerMock) Close ¶
func (mock *IProducerMock) Close(ctx context.Context) error
Close calls CloseFunc.
func (*IProducerMock) CloseCalls ¶
func (mock *IProducerMock) CloseCalls() []struct { Ctx context.Context }
CloseCalls gets all the calls that were made to Close. Check the length with:
len(mockedIProducer.CloseCalls())
func (*IProducerMock) Initialise ¶
func (mock *IProducerMock) Initialise(ctx context.Context) error
Initialise calls InitialiseFunc.
func (*IProducerMock) InitialiseCalls ¶
func (mock *IProducerMock) InitialiseCalls() []struct { Ctx context.Context }
InitialiseCalls gets all the calls that were made to Initialise. Check the length with:
len(mockedIProducer.InitialiseCalls())
func (*IProducerMock) IsInitialised ¶
func (mock *IProducerMock) IsInitialised() bool
IsInitialised calls IsInitialisedFunc.
func (*IProducerMock) IsInitialisedCalls ¶
func (mock *IProducerMock) IsInitialisedCalls() []struct { }
IsInitialisedCalls gets all the calls that were made to IsInitialised. Check the length with:
len(mockedIProducer.IsInitialisedCalls())
func (*IProducerMock) LogErrors ¶
func (mock *IProducerMock) LogErrors(ctx context.Context)
LogErrors calls LogErrorsFunc.
func (*IProducerMock) LogErrorsCalls ¶
func (mock *IProducerMock) LogErrorsCalls() []struct { Ctx context.Context }
LogErrorsCalls gets all the calls that were made to LogErrors. Check the length with:
len(mockedIProducer.LogErrorsCalls())
func (*IProducerMock) Send ¶
func (mock *IProducerMock) Send(schema *avro.Schema, event interface{}) error
Send calls SendFunc.
func (*IProducerMock) SendCalls ¶
func (mock *IProducerMock) SendCalls() []struct { Schema *avro.Schema Event interface{} }
SendCalls gets all the calls that were made to Send. Check the length with:
len(mockedIProducer.SendCalls())
type Message ¶
type Message struct { *mock.MessageMock // contains filtered or unexported fields }
Message allows a mock message to return the configured data, and capture whether commit has been called.
func NewMessage ¶
NewMessage returns a new mock message containing the given data.
func (Message) IsCommitted ¶
func (internal Message) IsCommitted() bool
IsCommittedFunc returns true if the message offset was committed.
type MessageConsumer ¶
type MessageConsumer struct { *IConsumerGroupMock // contains filtered or unexported fields }
MessageConsumer is an extension of the moq ConsumerGroup, with channels and implementation of required functions to emulate a fully functional Kafka ConsumerGroup
func NewMessageConsumer ¶
func NewMessageConsumer(isInitialisedAtCreationTime bool) *MessageConsumer
NewMessageConsumer creates a testing consumer with new consumerGroupChannels. isInitialisedAtCreationTime determines if the consumer is initialised or not when it's created
func NewMessageConsumerWithChannels ¶
func NewMessageConsumerWithChannels(cgChannels *kafka.ConsumerGroupChannels, isInitialisedAtCreationTime bool) *MessageConsumer
NewMessageConsumerWithChannels creates a testing consumer with the provided consumerGroupChannels isInitialisedAtCreationTime determines if the consumer is initialised or not when it's created
type MessageProducer ¶
type MessageProducer struct { *IProducerMock // contains filtered or unexported fields }
MessageProducer is an extension of the moq Producer, with channels and implementation of required functions to emulate a fully functional kafka Producer.
func NewMessageProducer ¶
func NewMessageProducer(isInitialisedAtCreationTime bool) *MessageProducer
NewMessageProducer creates a testing producer with new producerChannels. isInitialisedAtCreationTime determines if the producer is initialised or not when it's created
func NewMessageProducerWithChannels ¶
func NewMessageProducerWithChannels(pChannels *kafka.ProducerChannels, isInitialisedAtCreationTime bool) *MessageProducer
NewMessageProducerWithChannels creates a testing producer with the provided producerChannels. isInitialisedAtCreationTime determines if the producer is initialised or not when it's created