Documentation ¶
Index ¶
- Variables
- func NewSaramaConsumerGroupClaimMock(ch chan *sarama.ConsumerMessage) *mock.SaramaConsumerGroupClaimMock
- func NewSaramaConsumerGroupSessionMock(memberID, topic string, numPartitions int) (*mock.SaramaConsumerGroupSessionMock, context.CancelFunc)
- func OptionalHeaders(h Headers) func(o *Options) error
- func SaramaBrokerGenerator(topic string) func(addr string) interfaces.SaramaBroker
- type Consumer
- type ConsumerConfig
- type Headers
- type IConsumerGroupMock
- func (mock *IConsumerGroupMock) Channels() *kafka.ConsumerGroupChannels
- func (mock *IConsumerGroupMock) ChannelsCalls() []struct{}
- func (mock *IConsumerGroupMock) Checker(ctx context.Context, state *health.CheckState) error
- func (mock *IConsumerGroupMock) CheckerCalls() []struct{ ... }
- func (mock *IConsumerGroupMock) Close(ctx context.Context, optFuncs ...kafka.OptFunc) error
- func (mock *IConsumerGroupMock) CloseCalls() []struct{ ... }
- func (mock *IConsumerGroupMock) Initialise(ctx context.Context) error
- func (mock *IConsumerGroupMock) InitialiseCalls() []struct{ ... }
- func (mock *IConsumerGroupMock) IsInitialised() bool
- func (mock *IConsumerGroupMock) IsInitialisedCalls() []struct{}
- func (mock *IConsumerGroupMock) LogErrors(ctx context.Context)
- func (mock *IConsumerGroupMock) LogErrorsCalls() []struct{ ... }
- func (mock *IConsumerGroupMock) OnHealthUpdate(status string)
- func (mock *IConsumerGroupMock) OnHealthUpdateCalls() []struct{ ... }
- func (mock *IConsumerGroupMock) RegisterBatchHandler(ctx context.Context, batchHandler kafka.BatchHandler) error
- func (mock *IConsumerGroupMock) RegisterBatchHandlerCalls() []struct{ ... }
- func (mock *IConsumerGroupMock) RegisterHandler(ctx context.Context, h kafka.Handler) error
- func (mock *IConsumerGroupMock) RegisterHandlerCalls() []struct{ ... }
- func (mock *IConsumerGroupMock) Start() error
- func (mock *IConsumerGroupMock) StartCalls() []struct{}
- func (mock *IConsumerGroupMock) State() kafka.State
- func (mock *IConsumerGroupMock) StateCalls() []struct{}
- func (mock *IConsumerGroupMock) StateWait(state kafka.State)
- func (mock *IConsumerGroupMock) StateWaitCalls() []struct{ ... }
- func (mock *IConsumerGroupMock) Stop() error
- func (mock *IConsumerGroupMock) StopAndWait() error
- func (mock *IConsumerGroupMock) StopAndWaitCalls() []struct{}
- func (mock *IConsumerGroupMock) StopCalls() []struct{}
- type IProducerMock
- func (mock *IProducerMock) AddHeader(key string, value string)
- func (mock *IProducerMock) AddHeaderCalls() []struct{ ... }
- func (mock *IProducerMock) Channels() *kafka.ProducerChannels
- func (mock *IProducerMock) ChannelsCalls() []struct{}
- func (mock *IProducerMock) Checker(ctx context.Context, state *health.CheckState) error
- func (mock *IProducerMock) CheckerCalls() []struct{ ... }
- func (mock *IProducerMock) Close(ctx context.Context) error
- func (mock *IProducerMock) CloseCalls() []struct{ ... }
- func (mock *IProducerMock) Initialise(ctx context.Context) error
- func (mock *IProducerMock) InitialiseCalls() []struct{ ... }
- func (mock *IProducerMock) IsInitialised() bool
- func (mock *IProducerMock) IsInitialisedCalls() []struct{}
- func (mock *IProducerMock) LogErrors(ctx context.Context)
- func (mock *IProducerMock) LogErrorsCalls() []struct{ ... }
- func (mock *IProducerMock) Send(schema *avro.Schema, event interface{}) error
- func (mock *IProducerMock) SendCalls() []struct{ ... }
- type Message
- type Options
- type Producer
- type ProducerConfig
Constants ¶
This section is empty.
Variables ¶
var DefaultConsumerConfig = &ConsumerConfig{ NumPartitions: 30, ChannelBufferSize: 30, InitAtCreation: true, }
var DefaultProducerConfig = &ProducerConfig{ ChannelBufferSize: 30, InitAtCreation: true, }
Functions ¶
func NewSaramaConsumerGroupClaimMock ¶ added in v3.9.0
func NewSaramaConsumerGroupClaimMock(ch chan *sarama.ConsumerMessage) *mock.SaramaConsumerGroupClaimMock
func NewSaramaConsumerGroupSessionMock ¶ added in v3.9.0
func NewSaramaConsumerGroupSessionMock(memberID, topic string, numPartitions int) (*mock.SaramaConsumerGroupSessionMock, context.CancelFunc)
NewSaramaConsumerGroupSessionMock returns a new sarama consuemr group session mock with the provided number of partitions it also returns a func to cancel the context, to be used when a session is ending
func OptionalHeaders ¶ added in v3.5.0
func SaramaBrokerGenerator ¶ added in v3.9.0
func SaramaBrokerGenerator(topic string) func(addr string) interfaces.SaramaBroker
Types ¶
type Consumer ¶ added in v3.9.0
type Consumer struct { Mock *IConsumerGroupMock // Implements all moq functions so users can validate calls // contains filtered or unexported fields }
Consumer is an extension of the moq ConsumerGroup with implementation of required functions and Sarama mocks to emulate a fully functional Kafka ConsumerGroup
func NewConsumer ¶ added in v3.9.0
func NewConsumer(ctx context.Context, cgConfig *kafka.ConsumerGroupConfig, cfg *ConsumerConfig) (*Consumer, error)
NewConsumer creates a testing consumer for testing. It behaves like a real consuemr-group, without network communication
func (*Consumer) QueueMessage ¶ added in v3.9.0
QueueMessage will put the provided message to the testing consumption queue, so that it is consumed when the consumer is ready to do so. This emulates a message being received by a kafka broker, which is kept until a consumer consumes it.
func (*Consumer) RebalanceCluster ¶ added in v3.9.0
RebalanceCluster emulates a serer-side rebalance, which will cancel any active session
type ConsumerConfig ¶ added in v3.9.0
type IConsumerGroupMock ¶
type IConsumerGroupMock struct { // ChannelsFunc mocks the Channels method. ChannelsFunc func() *kafka.ConsumerGroupChannels // CheckerFunc mocks the Checker method. CheckerFunc func(ctx context.Context, state *health.CheckState) error // CloseFunc mocks the Close method. CloseFunc func(ctx context.Context, optFuncs ...kafka.OptFunc) error // InitialiseFunc mocks the Initialise method. InitialiseFunc func(ctx context.Context) error // IsInitialisedFunc mocks the IsInitialised method. IsInitialisedFunc func() bool // LogErrorsFunc mocks the LogErrors method. LogErrorsFunc func(ctx context.Context) // OnHealthUpdateFunc mocks the OnHealthUpdate method. OnHealthUpdateFunc func(status string) // RegisterBatchHandlerFunc mocks the RegisterBatchHandler method. RegisterBatchHandlerFunc func(ctx context.Context, batchHandler kafka.BatchHandler) error // RegisterHandlerFunc mocks the RegisterHandler method. RegisterHandlerFunc func(ctx context.Context, h kafka.Handler) error // StartFunc mocks the Start method. StartFunc func() error // StateFunc mocks the State method. StateFunc func() kafka.State // StateWaitFunc mocks the StateWait method. StateWaitFunc func(state kafka.State) // StopFunc mocks the Stop method. StopFunc func() error // StopAndWaitFunc mocks the StopAndWait method. StopAndWaitFunc func() error // contains filtered or unexported fields }
IConsumerGroupMock is a mock implementation of kafka.IConsumerGroup.
func TestSomethingThatUsesIConsumerGroup(t *testing.T) { // make and configure a mocked kafka.IConsumerGroup mockedIConsumerGroup := &IConsumerGroupMock{ ChannelsFunc: func() *kafka.ConsumerGroupChannels { panic("mock out the Channels method") }, CheckerFunc: func(ctx context.Context, state *health.CheckState) error { panic("mock out the Checker method") }, CloseFunc: func(ctx context.Context, optFuncs ...kafka.OptFunc) error { panic("mock out the Close method") }, InitialiseFunc: func(ctx context.Context) error { panic("mock out the Initialise method") }, IsInitialisedFunc: func() bool { panic("mock out the IsInitialised method") }, LogErrorsFunc: func(ctx context.Context) { panic("mock out the LogErrors method") }, OnHealthUpdateFunc: func(status string) { panic("mock out the OnHealthUpdate method") }, RegisterBatchHandlerFunc: func(ctx context.Context, batchHandler kafka.BatchHandler) error { panic("mock out the RegisterBatchHandler method") }, RegisterHandlerFunc: func(ctx context.Context, h kafka.Handler) error { panic("mock out the RegisterHandler method") }, StartFunc: func() error { panic("mock out the Start method") }, StateFunc: func() kafka.State { panic("mock out the State method") }, StateWaitFunc: func(state kafka.State) { panic("mock out the StateWait method") }, StopFunc: func() error { panic("mock out the Stop method") }, StopAndWaitFunc: func() error { panic("mock out the StopAndWait method") }, } // use mockedIConsumerGroup in code that requires kafka.IConsumerGroup // and then make assertions. }
func (*IConsumerGroupMock) Channels ¶
func (mock *IConsumerGroupMock) Channels() *kafka.ConsumerGroupChannels
Channels calls ChannelsFunc.
func (*IConsumerGroupMock) ChannelsCalls ¶
func (mock *IConsumerGroupMock) ChannelsCalls() []struct { }
ChannelsCalls gets all the calls that were made to Channels. Check the length with:
len(mockedIConsumerGroup.ChannelsCalls())
func (*IConsumerGroupMock) Checker ¶
func (mock *IConsumerGroupMock) Checker(ctx context.Context, state *health.CheckState) error
Checker calls CheckerFunc.
func (*IConsumerGroupMock) CheckerCalls ¶
func (mock *IConsumerGroupMock) CheckerCalls() []struct { Ctx context.Context State *health.CheckState }
CheckerCalls gets all the calls that were made to Checker. Check the length with:
len(mockedIConsumerGroup.CheckerCalls())
func (*IConsumerGroupMock) CloseCalls ¶
func (mock *IConsumerGroupMock) CloseCalls() []struct { Ctx context.Context OptFuncs []kafka.OptFunc }
CloseCalls gets all the calls that were made to Close. Check the length with:
len(mockedIConsumerGroup.CloseCalls())
func (*IConsumerGroupMock) Initialise ¶
func (mock *IConsumerGroupMock) Initialise(ctx context.Context) error
Initialise calls InitialiseFunc.
func (*IConsumerGroupMock) InitialiseCalls ¶
func (mock *IConsumerGroupMock) InitialiseCalls() []struct { Ctx context.Context }
InitialiseCalls gets all the calls that were made to Initialise. Check the length with:
len(mockedIConsumerGroup.InitialiseCalls())
func (*IConsumerGroupMock) IsInitialised ¶
func (mock *IConsumerGroupMock) IsInitialised() bool
IsInitialised calls IsInitialisedFunc.
func (*IConsumerGroupMock) IsInitialisedCalls ¶
func (mock *IConsumerGroupMock) IsInitialisedCalls() []struct { }
IsInitialisedCalls gets all the calls that were made to IsInitialised. Check the length with:
len(mockedIConsumerGroup.IsInitialisedCalls())
func (*IConsumerGroupMock) LogErrors ¶
func (mock *IConsumerGroupMock) LogErrors(ctx context.Context)
LogErrors calls LogErrorsFunc.
func (*IConsumerGroupMock) LogErrorsCalls ¶
func (mock *IConsumerGroupMock) LogErrorsCalls() []struct { Ctx context.Context }
LogErrorsCalls gets all the calls that were made to LogErrors. Check the length with:
len(mockedIConsumerGroup.LogErrorsCalls())
func (*IConsumerGroupMock) OnHealthUpdate ¶
func (mock *IConsumerGroupMock) OnHealthUpdate(status string)
OnHealthUpdate calls OnHealthUpdateFunc.
func (*IConsumerGroupMock) OnHealthUpdateCalls ¶
func (mock *IConsumerGroupMock) OnHealthUpdateCalls() []struct { Status string }
OnHealthUpdateCalls gets all the calls that were made to OnHealthUpdate. Check the length with:
len(mockedIConsumerGroup.OnHealthUpdateCalls())
func (*IConsumerGroupMock) RegisterBatchHandler ¶
func (mock *IConsumerGroupMock) RegisterBatchHandler(ctx context.Context, batchHandler kafka.BatchHandler) error
RegisterBatchHandler calls RegisterBatchHandlerFunc.
func (*IConsumerGroupMock) RegisterBatchHandlerCalls ¶
func (mock *IConsumerGroupMock) RegisterBatchHandlerCalls() []struct { Ctx context.Context BatchHandler kafka.BatchHandler }
RegisterBatchHandlerCalls gets all the calls that were made to RegisterBatchHandler. Check the length with:
len(mockedIConsumerGroup.RegisterBatchHandlerCalls())
func (*IConsumerGroupMock) RegisterHandler ¶
RegisterHandler calls RegisterHandlerFunc.
func (*IConsumerGroupMock) RegisterHandlerCalls ¶
func (mock *IConsumerGroupMock) RegisterHandlerCalls() []struct { Ctx context.Context H kafka.Handler }
RegisterHandlerCalls gets all the calls that were made to RegisterHandler. Check the length with:
len(mockedIConsumerGroup.RegisterHandlerCalls())
func (*IConsumerGroupMock) Start ¶
func (mock *IConsumerGroupMock) Start() error
Start calls StartFunc.
func (*IConsumerGroupMock) StartCalls ¶
func (mock *IConsumerGroupMock) StartCalls() []struct { }
StartCalls gets all the calls that were made to Start. Check the length with:
len(mockedIConsumerGroup.StartCalls())
func (*IConsumerGroupMock) State ¶
func (mock *IConsumerGroupMock) State() kafka.State
State calls StateFunc.
func (*IConsumerGroupMock) StateCalls ¶
func (mock *IConsumerGroupMock) StateCalls() []struct { }
StateCalls gets all the calls that were made to State. Check the length with:
len(mockedIConsumerGroup.StateCalls())
func (*IConsumerGroupMock) StateWait ¶ added in v3.1.0
func (mock *IConsumerGroupMock) StateWait(state kafka.State)
StateWait calls StateWaitFunc.
func (*IConsumerGroupMock) StateWaitCalls ¶ added in v3.1.0
func (mock *IConsumerGroupMock) StateWaitCalls() []struct { State kafka.State }
StateWaitCalls gets all the calls that were made to StateWait. Check the length with:
len(mockedIConsumerGroup.StateWaitCalls())
func (*IConsumerGroupMock) StopAndWait ¶
func (mock *IConsumerGroupMock) StopAndWait() error
StopAndWait calls StopAndWaitFunc.
func (*IConsumerGroupMock) StopAndWaitCalls ¶
func (mock *IConsumerGroupMock) StopAndWaitCalls() []struct { }
StopAndWaitCalls gets all the calls that were made to StopAndWait. Check the length with:
len(mockedIConsumerGroup.StopAndWaitCalls())
func (*IConsumerGroupMock) StopCalls ¶
func (mock *IConsumerGroupMock) StopCalls() []struct { }
StopCalls gets all the calls that were made to Stop. Check the length with:
len(mockedIConsumerGroup.StopCalls())
type IProducerMock ¶
type IProducerMock struct { // AddHeaderFunc mocks the AddHeader method. AddHeaderFunc func(key string, value string) // ChannelsFunc mocks the Channels method. ChannelsFunc func() *kafka.ProducerChannels // CheckerFunc mocks the Checker method. CheckerFunc func(ctx context.Context, state *health.CheckState) error // CloseFunc mocks the Close method. CloseFunc func(ctx context.Context) error // InitialiseFunc mocks the Initialise method. InitialiseFunc func(ctx context.Context) error // IsInitialisedFunc mocks the IsInitialised method. IsInitialisedFunc func() bool // LogErrorsFunc mocks the LogErrors method. LogErrorsFunc func(ctx context.Context) // SendFunc mocks the Send method. SendFunc func(schema *avro.Schema, event interface{}) error // contains filtered or unexported fields }
IProducerMock is a mock implementation of kafka.IProducer.
func TestSomethingThatUsesIProducer(t *testing.T) { // make and configure a mocked kafka.IProducer mockedIProducer := &IProducerMock{ AddHeaderFunc: func(key string, value string) { panic("mock out the AddHeader method") }, ChannelsFunc: func() *kafka.ProducerChannels { panic("mock out the Channels method") }, CheckerFunc: func(ctx context.Context, state *health.CheckState) error { panic("mock out the Checker method") }, CloseFunc: func(ctx context.Context) error { panic("mock out the Close method") }, InitialiseFunc: func(ctx context.Context) error { panic("mock out the Initialise method") }, IsInitialisedFunc: func() bool { panic("mock out the IsInitialised method") }, LogErrorsFunc: func(ctx context.Context) { panic("mock out the LogErrors method") }, SendFunc: func(schema *avro.Schema, event interface{}) error { panic("mock out the Send method") }, } // use mockedIProducer in code that requires kafka.IProducer // and then make assertions. }
func (*IProducerMock) AddHeader ¶ added in v3.4.0
func (mock *IProducerMock) AddHeader(key string, value string)
AddHeader calls AddHeaderFunc.
func (*IProducerMock) AddHeaderCalls ¶ added in v3.4.0
func (mock *IProducerMock) AddHeaderCalls() []struct { Key string Value string }
AddHeaderCalls gets all the calls that were made to AddHeader. Check the length with:
len(mockedIProducer.AddHeaderCalls())
func (*IProducerMock) Channels ¶
func (mock *IProducerMock) Channels() *kafka.ProducerChannels
Channels calls ChannelsFunc.
func (*IProducerMock) ChannelsCalls ¶
func (mock *IProducerMock) ChannelsCalls() []struct { }
ChannelsCalls gets all the calls that were made to Channels. Check the length with:
len(mockedIProducer.ChannelsCalls())
func (*IProducerMock) Checker ¶
func (mock *IProducerMock) Checker(ctx context.Context, state *health.CheckState) error
Checker calls CheckerFunc.
func (*IProducerMock) CheckerCalls ¶
func (mock *IProducerMock) CheckerCalls() []struct { Ctx context.Context State *health.CheckState }
CheckerCalls gets all the calls that were made to Checker. Check the length with:
len(mockedIProducer.CheckerCalls())
func (*IProducerMock) Close ¶
func (mock *IProducerMock) Close(ctx context.Context) error
Close calls CloseFunc.
func (*IProducerMock) CloseCalls ¶
func (mock *IProducerMock) CloseCalls() []struct { Ctx context.Context }
CloseCalls gets all the calls that were made to Close. Check the length with:
len(mockedIProducer.CloseCalls())
func (*IProducerMock) Initialise ¶
func (mock *IProducerMock) Initialise(ctx context.Context) error
Initialise calls InitialiseFunc.
func (*IProducerMock) InitialiseCalls ¶
func (mock *IProducerMock) InitialiseCalls() []struct { Ctx context.Context }
InitialiseCalls gets all the calls that were made to Initialise. Check the length with:
len(mockedIProducer.InitialiseCalls())
func (*IProducerMock) IsInitialised ¶
func (mock *IProducerMock) IsInitialised() bool
IsInitialised calls IsInitialisedFunc.
func (*IProducerMock) IsInitialisedCalls ¶
func (mock *IProducerMock) IsInitialisedCalls() []struct { }
IsInitialisedCalls gets all the calls that were made to IsInitialised. Check the length with:
len(mockedIProducer.IsInitialisedCalls())
func (*IProducerMock) LogErrors ¶
func (mock *IProducerMock) LogErrors(ctx context.Context)
LogErrors calls LogErrorsFunc.
func (*IProducerMock) LogErrorsCalls ¶
func (mock *IProducerMock) LogErrorsCalls() []struct { Ctx context.Context }
LogErrorsCalls gets all the calls that were made to LogErrors. Check the length with:
len(mockedIProducer.LogErrorsCalls())
func (*IProducerMock) Send ¶
func (mock *IProducerMock) Send(schema *avro.Schema, event interface{}) error
Send calls SendFunc.
func (*IProducerMock) SendCalls ¶
func (mock *IProducerMock) SendCalls() []struct { Schema *avro.Schema Event interface{} }
SendCalls gets all the calls that were made to Send. Check the length with:
len(mockedIProducer.SendCalls())
type Message ¶
type Message struct { *mock.MessageMock // contains filtered or unexported fields }
Message allows a mock message to return the configured data, and capture whether commit has been called.
func NewMessage ¶
NewMessage returns a new mock message containing the given data.
func (Message) IsCommitted ¶
func (internal Message) IsCommitted() bool
IsCommittedFunc returns true if the message offset was committed.
type Producer ¶ added in v3.9.0
type Producer struct { Mock *IProducerMock // Implements all moq functions so users can validate calls // contains filtered or unexported fields }
Producer is an extension of the moq Producer with implementation of required functions and Sarama mocks to emulate a fully functional kafka Producer.
func NewProducer ¶ added in v3.9.0
func NewProducer(ctx context.Context, pConfig *kafka.ProducerConfig, cfg *ProducerConfig) (*Producer, error)
NewProducer creates a testing producer for testing. It behaves like a real producer, without network communication
func (*Producer) WaitForMessageSent ¶ added in v3.9.0
func (p *Producer) WaitForMessageSent(schema *avro.Schema, event interface{}, timeout time.Duration) error
WaitForMessageSent waits for a new message being sent to Kafka, with a timeout according to the provided value If a message is sent, it unmarshals it into the provided 'event', using the provided avro 'schema'.
func (*Producer) WaitNoMessageSent ¶ added in v3.9.0
WaitNoMessageSent waits until the timeWindow elapses. If during the time window the closer channel is closed, or a message is sent to the sarama message channel, then an error is returned