kafkatest

package
v3.4.0
Published: Sep 13, 2022 License: MIT Imports: 7 Imported by: 3

README

KafkaTest

This package contains mocks that users of this library can use in their own tests.

Empty mocks

If you need to implement your own mock behaviour, you can use the empty mocks, which are generated with moq and implement the interfaces kafka.IConsumerGroup, kafka.IProducer and Message.

These interfaces expose the same methods as the real Producer and ConsumerGroup structs. You can instantiate the mocks like so:

consumer := kafkatest.IConsumerGroupMock{...}
producer := kafkatest.IProducerMock{...}
message := mock.MessageMock{...}
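Mocks generated by moq dispatch each method to the matching ...Func field, and calling a method whose field is nil panics, so a test only needs to set the functions it expects to be exercised. The sketch below reproduces that behaviour with a hand-written stand-in; starter, starterMock, runOnce and panics are hypothetical names for illustration and are not part of kafkatest.

```go
package main

import (
	"errors"
	"fmt"
)

// starter is a hypothetical two-method interface standing in for a larger one.
type starter interface {
	Start() error
	Stop() error
}

// starterMock mirrors the shape of a moq-generated mock: one func field per method.
type starterMock struct {
	StartFunc func() error
	StopFunc  func() error
}

// Start delegates to StartFunc and panics if the test did not set it.
func (m *starterMock) Start() error {
	if m.StartFunc == nil {
		panic("starterMock.StartFunc: method is nil but starter.Start was just called")
	}
	return m.StartFunc()
}

// Stop delegates to StopFunc and panics if the test did not set it.
func (m *starterMock) Stop() error {
	if m.StopFunc == nil {
		panic("starterMock.StopFunc: method is nil but starter.Stop was just called")
	}
	return m.StopFunc()
}

// runOnce is the code under test; it only ever calls Start.
func runOnce(s starter) error {
	if err := s.Start(); err != nil {
		return fmt.Errorf("start failed: %w", err)
	}
	return nil
}

// panics reports whether calling f panics.
func panics(f func()) (didPanic bool) {
	defer func() {
		if recover() != nil {
			didPanic = true
		}
	}()
	f()
	return false
}

var errBoom = errors.New("boom")

func main() {
	// Only StartFunc is configured because runOnce never calls Stop.
	m := &starterMock{StartFunc: func() error { return errBoom }}
	err := runOnce(m)
	fmt.Println("error propagated:", errors.Is(err, errBoom))
	fmt.Println("unset Stop panics:", panics(func() { _ = m.Stop() }))
}
```

Leaving unused functions unset keeps the test short and makes an unexpected call fail loudly instead of silently returning zero values.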

Functional mocks

The previous mocks have been extended with functionality that emulates a real Producer, ConsumerGroup and Message without communicating with any Kafka broker. If you need a functional mock to test how your code interacts with Kafka, you can use these mocks (kafkatest.MessageConsumer, kafkatest.MessageProducer and kafkatest.Message) like so:

consumer := kafkatest.NewMessageConsumer(true)
producer := kafkatest.NewMessageProducer(true)
message := kafkatest.NewMessage(data, offset)
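To illustrate what "functional" means in practice, the following sketch builds a stand-in producer whose Send writes to an in-memory channel that the test then reads, with no broker involved. Every name in it (producerChannels, Output, fakeProducer, greet, next) is an assumption made for the example; the real kafkatest.MessageProducer and kafka.ProducerChannels may be structured differently.

```go
package main

import (
	"fmt"
	"time"
)

// receiveTimeout bounds how long the test waits for a produced payload.
const receiveTimeout = time.Second

// producerChannels is a hypothetical stand-in for the channels a producer exposes.
type producerChannels struct {
	Output chan []byte
}

// fakeProducer emulates a producer in memory: Send pushes onto Output.
type fakeProducer struct {
	channels *producerChannels
}

// newFakeProducer buffers Output so a handful of sends do not block the caller.
func newFakeProducer() *fakeProducer {
	return &fakeProducer{channels: &producerChannels{Output: make(chan []byte, 10)}}
}

func (p *fakeProducer) Channels() *producerChannels { return p.channels }

func (p *fakeProducer) Send(payload []byte) error {
	p.channels.Output <- payload
	return nil
}

// greet is the code under test: it produces one greeting per name.
func greet(p *fakeProducer, names ...string) error {
	for _, n := range names {
		if err := p.Send([]byte("hello " + n)); err != nil {
			return err
		}
	}
	return nil
}

// next returns the next produced payload, or false if none arrives in time.
func next(p *fakeProducer) (string, bool) {
	select {
	case b := <-p.Channels().Output:
		return string(b), true
	case <-time.After(receiveTimeout):
		return "", false
	}
}

func main() {
	p := newFakeProducer()
	if err := greet(p, "ada", "bob"); err != nil {
		fmt.Println("greet failed:", err)
		return
	}
	for i := 0; i < 2; i++ {
		msg, ok := next(p)
		fmt.Println(msg, ok)
	}
}
```

Reading from the channel with a timeout keeps a failing test from hanging when the code under test never produces anything.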

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type IConsumerGroupMock

type IConsumerGroupMock struct {
	// ChannelsFunc mocks the Channels method.
	ChannelsFunc func() *kafka.ConsumerGroupChannels

	// CheckerFunc mocks the Checker method.
	CheckerFunc func(ctx context.Context, state *health.CheckState) error

	// CloseFunc mocks the Close method.
	CloseFunc func(ctx context.Context) error

	// InitialiseFunc mocks the Initialise method.
	InitialiseFunc func(ctx context.Context) error

	// IsInitialisedFunc mocks the IsInitialised method.
	IsInitialisedFunc func() bool

	// LogErrorsFunc mocks the LogErrors method.
	LogErrorsFunc func(ctx context.Context)

	// OnHealthUpdateFunc mocks the OnHealthUpdate method.
	OnHealthUpdateFunc func(status string)

	// RegisterBatchHandlerFunc mocks the RegisterBatchHandler method.
	RegisterBatchHandlerFunc func(ctx context.Context, batchHandler kafka.BatchHandler) error

	// RegisterHandlerFunc mocks the RegisterHandler method.
	RegisterHandlerFunc func(ctx context.Context, h kafka.Handler) error

	// StartFunc mocks the Start method.
	StartFunc func() error

	// StateFunc mocks the State method.
	StateFunc func() kafka.State

	// StateWaitFunc mocks the StateWait method.
	StateWaitFunc func(state kafka.State)

	// StopFunc mocks the Stop method.
	StopFunc func() error

	// StopAndWaitFunc mocks the StopAndWait method.
	StopAndWaitFunc func() error
	// contains filtered or unexported fields
}

IConsumerGroupMock is a mock implementation of kafka.IConsumerGroup.

func TestSomethingThatUsesIConsumerGroup(t *testing.T) {

	// make and configure a mocked kafka.IConsumerGroup
	mockedIConsumerGroup := &IConsumerGroupMock{
		ChannelsFunc: func() *kafka.ConsumerGroupChannels {
			panic("mock out the Channels method")
		},
		CheckerFunc: func(ctx context.Context, state *health.CheckState) error {
			panic("mock out the Checker method")
		},
		CloseFunc: func(ctx context.Context) error {
			panic("mock out the Close method")
		},
		InitialiseFunc: func(ctx context.Context) error {
			panic("mock out the Initialise method")
		},
		IsInitialisedFunc: func() bool {
			panic("mock out the IsInitialised method")
		},
		LogErrorsFunc: func(ctx context.Context) {
			panic("mock out the LogErrors method")
		},
		OnHealthUpdateFunc: func(status string) {
			panic("mock out the OnHealthUpdate method")
		},
		RegisterBatchHandlerFunc: func(ctx context.Context, batchHandler kafka.BatchHandler) error {
			panic("mock out the RegisterBatchHandler method")
		},
		RegisterHandlerFunc: func(ctx context.Context, h kafka.Handler) error {
			panic("mock out the RegisterHandler method")
		},
		StartFunc: func() error {
			panic("mock out the Start method")
		},
		StateFunc: func() kafka.State {
			panic("mock out the State method")
		},
		StateWaitFunc: func(state kafka.State) {
			panic("mock out the StateWait method")
		},
		StopFunc: func() error {
			panic("mock out the Stop method")
		},
		StopAndWaitFunc: func() error {
			panic("mock out the StopAndWait method")
		},
	}

	// use mockedIConsumerGroup in code that requires kafka.IConsumerGroup
	// and then make assertions.

}

func (*IConsumerGroupMock) Channels

func (mock *IConsumerGroupMock) Channels() *kafka.ConsumerGroupChannels

Channels calls ChannelsFunc.

func (*IConsumerGroupMock) ChannelsCalls

func (mock *IConsumerGroupMock) ChannelsCalls() []struct {
}

ChannelsCalls gets all the calls that were made to Channels. Check the length with:

len(mockedIConsumerGroup.ChannelsCalls())

func (*IConsumerGroupMock) Checker

func (mock *IConsumerGroupMock) Checker(ctx context.Context, state *health.CheckState) error

Checker calls CheckerFunc.

func (*IConsumerGroupMock) CheckerCalls

func (mock *IConsumerGroupMock) CheckerCalls() []struct {
	Ctx   context.Context
	State *health.CheckState
}

CheckerCalls gets all the calls that were made to Checker. Check the length with:

len(mockedIConsumerGroup.CheckerCalls())

func (*IConsumerGroupMock) Close

func (mock *IConsumerGroupMock) Close(ctx context.Context) error

Close calls CloseFunc.

func (*IConsumerGroupMock) CloseCalls

func (mock *IConsumerGroupMock) CloseCalls() []struct {
	Ctx context.Context
}

CloseCalls gets all the calls that were made to Close. Check the length with:

len(mockedIConsumerGroup.CloseCalls())

func (*IConsumerGroupMock) Initialise

func (mock *IConsumerGroupMock) Initialise(ctx context.Context) error

Initialise calls InitialiseFunc.

func (*IConsumerGroupMock) InitialiseCalls

func (mock *IConsumerGroupMock) InitialiseCalls() []struct {
	Ctx context.Context
}

InitialiseCalls gets all the calls that were made to Initialise. Check the length with:

len(mockedIConsumerGroup.InitialiseCalls())

func (*IConsumerGroupMock) IsInitialised

func (mock *IConsumerGroupMock) IsInitialised() bool

IsInitialised calls IsInitialisedFunc.

func (*IConsumerGroupMock) IsInitialisedCalls

func (mock *IConsumerGroupMock) IsInitialisedCalls() []struct {
}

IsInitialisedCalls gets all the calls that were made to IsInitialised. Check the length with:

len(mockedIConsumerGroup.IsInitialisedCalls())

func (*IConsumerGroupMock) LogErrors

func (mock *IConsumerGroupMock) LogErrors(ctx context.Context)

LogErrors calls LogErrorsFunc.

func (*IConsumerGroupMock) LogErrorsCalls

func (mock *IConsumerGroupMock) LogErrorsCalls() []struct {
	Ctx context.Context
}

LogErrorsCalls gets all the calls that were made to LogErrors. Check the length with:

len(mockedIConsumerGroup.LogErrorsCalls())

func (*IConsumerGroupMock) OnHealthUpdate

func (mock *IConsumerGroupMock) OnHealthUpdate(status string)

OnHealthUpdate calls OnHealthUpdateFunc.

func (*IConsumerGroupMock) OnHealthUpdateCalls

func (mock *IConsumerGroupMock) OnHealthUpdateCalls() []struct {
	Status string
}

OnHealthUpdateCalls gets all the calls that were made to OnHealthUpdate. Check the length with:

len(mockedIConsumerGroup.OnHealthUpdateCalls())

func (*IConsumerGroupMock) RegisterBatchHandler

func (mock *IConsumerGroupMock) RegisterBatchHandler(ctx context.Context, batchHandler kafka.BatchHandler) error

RegisterBatchHandler calls RegisterBatchHandlerFunc.

func (*IConsumerGroupMock) RegisterBatchHandlerCalls

func (mock *IConsumerGroupMock) RegisterBatchHandlerCalls() []struct {
	Ctx          context.Context
	BatchHandler kafka.BatchHandler
}

RegisterBatchHandlerCalls gets all the calls that were made to RegisterBatchHandler. Check the length with:

len(mockedIConsumerGroup.RegisterBatchHandlerCalls())

func (*IConsumerGroupMock) RegisterHandler

func (mock *IConsumerGroupMock) RegisterHandler(ctx context.Context, h kafka.Handler) error

RegisterHandler calls RegisterHandlerFunc.

func (*IConsumerGroupMock) RegisterHandlerCalls

func (mock *IConsumerGroupMock) RegisterHandlerCalls() []struct {
	Ctx context.Context
	H   kafka.Handler
}

RegisterHandlerCalls gets all the calls that were made to RegisterHandler. Check the length with:

len(mockedIConsumerGroup.RegisterHandlerCalls())
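Because each recorded call keeps its arguments, a test can pull out the handler that the code under test registered and invoke it directly, without running a consumer loop. A minimal sketch with stand-in types follows; handler, registerCall, groupMock, setup and deliver are hypothetical names, and the handler signature is simplified compared with kafka.Handler.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// handler is a simplified, hypothetical handler signature.
type handler func(ctx context.Context, data []byte) error

// registerCall holds the arguments of one RegisterHandler call.
type registerCall struct {
	Ctx context.Context
	H   handler
}

// groupMock is a stand-in that records the handlers registered on it.
type groupMock struct {
	RegisterHandlerFunc func(ctx context.Context, h handler) error
	calls               []registerCall
}

// RegisterHandler records the call and then delegates to RegisterHandlerFunc.
func (m *groupMock) RegisterHandler(ctx context.Context, h handler) error {
	m.calls = append(m.calls, registerCall{Ctx: ctx, H: h})
	return m.RegisterHandlerFunc(ctx, h)
}

func (m *groupMock) RegisterHandlerCalls() []registerCall { return m.calls }

// setup is the code under test: it registers a handler that stores upper-cased payloads.
func setup(ctx context.Context, g *groupMock, seen *[]string) error {
	return g.RegisterHandler(ctx, func(_ context.Context, data []byte) error {
		*seen = append(*seen, strings.ToUpper(string(data)))
		return nil
	})
}

// deliver wires a mock into setup, then feeds payload to the captured handler.
func deliver(payload string) ([]string, error) {
	ctx := context.Background()
	g := &groupMock{RegisterHandlerFunc: func(context.Context, handler) error { return nil }}
	var seen []string
	if err := setup(ctx, g, &seen); err != nil {
		return nil, err
	}
	h := g.RegisterHandlerCalls()[0].H // the handler the code under test registered
	if err := h(ctx, []byte(payload)); err != nil {
		return nil, err
	}
	return seen, nil
}

func main() {
	seen, err := deliver("ping")
	fmt.Println(seen, err)
}
```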

func (*IConsumerGroupMock) Start

func (mock *IConsumerGroupMock) Start() error

Start calls StartFunc.

func (*IConsumerGroupMock) StartCalls

func (mock *IConsumerGroupMock) StartCalls() []struct {
}

StartCalls gets all the calls that were made to Start. Check the length with:

len(mockedIConsumerGroup.StartCalls())

func (*IConsumerGroupMock) State

func (mock *IConsumerGroupMock) State() kafka.State

State calls StateFunc.

func (*IConsumerGroupMock) StateCalls

func (mock *IConsumerGroupMock) StateCalls() []struct {
}

StateCalls gets all the calls that were made to State. Check the length with:

len(mockedIConsumerGroup.StateCalls())

func (*IConsumerGroupMock) StateWait added in v3.1.0

func (mock *IConsumerGroupMock) StateWait(state kafka.State)

StateWait calls StateWaitFunc.

func (*IConsumerGroupMock) StateWaitCalls added in v3.1.0

func (mock *IConsumerGroupMock) StateWaitCalls() []struct {
	State kafka.State
}

StateWaitCalls gets all the calls that were made to StateWait. Check the length with:

len(mockedIConsumerGroup.StateWaitCalls())

func (*IConsumerGroupMock) Stop

func (mock *IConsumerGroupMock) Stop() error

Stop calls StopFunc.

func (*IConsumerGroupMock) StopAndWait

func (mock *IConsumerGroupMock) StopAndWait() error

StopAndWait calls StopAndWaitFunc.

func (*IConsumerGroupMock) StopAndWaitCalls

func (mock *IConsumerGroupMock) StopAndWaitCalls() []struct {
}

StopAndWaitCalls gets all the calls that were made to StopAndWait. Check the length with:

len(mockedIConsumerGroup.StopAndWaitCalls())

func (*IConsumerGroupMock) StopCalls

func (mock *IConsumerGroupMock) StopCalls() []struct {
}

StopCalls gets all the calls that were made to Stop. Check the length with:

len(mockedIConsumerGroup.StopCalls())

type IProducerMock

type IProducerMock struct {
	// AddHeaderFunc mocks the AddHeader method.
	AddHeaderFunc func(key string, value string)

	// ChannelsFunc mocks the Channels method.
	ChannelsFunc func() *kafka.ProducerChannels

	// CheckerFunc mocks the Checker method.
	CheckerFunc func(ctx context.Context, state *health.CheckState) error

	// CloseFunc mocks the Close method.
	CloseFunc func(ctx context.Context) error

	// InitialiseFunc mocks the Initialise method.
	InitialiseFunc func(ctx context.Context) error

	// IsInitialisedFunc mocks the IsInitialised method.
	IsInitialisedFunc func() bool

	// LogErrorsFunc mocks the LogErrors method.
	LogErrorsFunc func(ctx context.Context)

	// SendFunc mocks the Send method.
	SendFunc func(schema *avro.Schema, event interface{}) error
	// contains filtered or unexported fields
}

IProducerMock is a mock implementation of kafka.IProducer.

func TestSomethingThatUsesIProducer(t *testing.T) {

	// make and configure a mocked kafka.IProducer
	mockedIProducer := &IProducerMock{
		AddHeaderFunc: func(key string, value string) {
			panic("mock out the AddHeader method")
		},
		ChannelsFunc: func() *kafka.ProducerChannels {
			panic("mock out the Channels method")
		},
		CheckerFunc: func(ctx context.Context, state *health.CheckState) error {
			panic("mock out the Checker method")
		},
		CloseFunc: func(ctx context.Context) error {
			panic("mock out the Close method")
		},
		InitialiseFunc: func(ctx context.Context) error {
			panic("mock out the Initialise method")
		},
		IsInitialisedFunc: func() bool {
			panic("mock out the IsInitialised method")
		},
		LogErrorsFunc: func(ctx context.Context) {
			panic("mock out the LogErrors method")
		},
		SendFunc: func(schema *avro.Schema, event interface{}) error {
			panic("mock out the Send method")
		},
	}

	// use mockedIProducer in code that requires kafka.IProducer
	// and then make assertions.

}

func (*IProducerMock) AddHeader added in v3.4.0

func (mock *IProducerMock) AddHeader(key string, value string)

AddHeader calls AddHeaderFunc.

func (*IProducerMock) AddHeaderCalls added in v3.4.0

func (mock *IProducerMock) AddHeaderCalls() []struct {
	Key   string
	Value string
}

AddHeaderCalls gets all the calls that were made to AddHeader. Check the length with:

len(mockedIProducer.AddHeaderCalls())

func (*IProducerMock) Channels

func (mock *IProducerMock) Channels() *kafka.ProducerChannels

Channels calls ChannelsFunc.

func (*IProducerMock) ChannelsCalls

func (mock *IProducerMock) ChannelsCalls() []struct {
}

ChannelsCalls gets all the calls that were made to Channels. Check the length with:

len(mockedIProducer.ChannelsCalls())

func (*IProducerMock) Checker

func (mock *IProducerMock) Checker(ctx context.Context, state *health.CheckState) error

Checker calls CheckerFunc.

func (*IProducerMock) CheckerCalls

func (mock *IProducerMock) CheckerCalls() []struct {
	Ctx   context.Context
	State *health.CheckState
}

CheckerCalls gets all the calls that were made to Checker. Check the length with:

len(mockedIProducer.CheckerCalls())

func (*IProducerMock) Close

func (mock *IProducerMock) Close(ctx context.Context) error

Close calls CloseFunc.

func (*IProducerMock) CloseCalls

func (mock *IProducerMock) CloseCalls() []struct {
	Ctx context.Context
}

CloseCalls gets all the calls that were made to Close. Check the length with:

len(mockedIProducer.CloseCalls())

func (*IProducerMock) Initialise

func (mock *IProducerMock) Initialise(ctx context.Context) error

Initialise calls InitialiseFunc.

func (*IProducerMock) InitialiseCalls

func (mock *IProducerMock) InitialiseCalls() []struct {
	Ctx context.Context
}

InitialiseCalls gets all the calls that were made to Initialise. Check the length with:

len(mockedIProducer.InitialiseCalls())

func (*IProducerMock) IsInitialised

func (mock *IProducerMock) IsInitialised() bool

IsInitialised calls IsInitialisedFunc.

func (*IProducerMock) IsInitialisedCalls

func (mock *IProducerMock) IsInitialisedCalls() []struct {
}

IsInitialisedCalls gets all the calls that were made to IsInitialised. Check the length with:

len(mockedIProducer.IsInitialisedCalls())

func (*IProducerMock) LogErrors

func (mock *IProducerMock) LogErrors(ctx context.Context)

LogErrors calls LogErrorsFunc.

func (*IProducerMock) LogErrorsCalls

func (mock *IProducerMock) LogErrorsCalls() []struct {
	Ctx context.Context
}

LogErrorsCalls gets all the calls that were made to LogErrors. Check the length with:

len(mockedIProducer.LogErrorsCalls())

func (*IProducerMock) Send

func (mock *IProducerMock) Send(schema *avro.Schema, event interface{}) error

Send calls SendFunc.

func (*IProducerMock) SendCalls

func (mock *IProducerMock) SendCalls() []struct {
	Schema *avro.Schema
	Event  interface{}
}

SendCalls gets all the calls that were made to Send. Check the length with:

len(mockedIProducer.SendCalls())
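Beyond checking the length, each recorded entry carries the arguments of that call, so a test can assert on what was sent and in which order. The sketch below shows the recording mechanism with a hand-written stand-in; senderMock and publishAll are hypothetical, and the event is simplified to a string instead of an Avro schema plus event value.

```go
package main

import (
	"fmt"
	"sync"
)

// senderMock is a hypothetical stand-in shaped like a moq-generated mock.
type senderMock struct {
	SendFunc func(event string) error

	mu    sync.RWMutex
	calls []struct{ Event string }
}

// Send records the call and then delegates to SendFunc.
func (m *senderMock) Send(event string) error {
	m.mu.Lock()
	m.calls = append(m.calls, struct{ Event string }{Event: event})
	m.mu.Unlock()
	return m.SendFunc(event)
}

// SendCalls returns a copy of every call made to Send, in call order.
func (m *senderMock) SendCalls() []struct{ Event string } {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return append([]struct{ Event string }(nil), m.calls...)
}

// publishAll is the code under test: it stops at the first failed send.
func publishAll(s interface{ Send(string) error }, events ...string) error {
	for _, e := range events {
		if err := s.Send(e); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	m := &senderMock{SendFunc: func(string) error { return nil }}
	if err := publishAll(m, "created", "updated"); err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	for i, c := range m.SendCalls() {
		fmt.Println(i, c.Event)
	}
}
```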

type Message

type Message struct {
	*mock.MessageMock
	// contains filtered or unexported fields
}

Message allows a mock message to return the configured data, and capture whether commit has been called.

func NewMessage

func NewMessage(data []byte, offset int64) *Message

NewMessage returns a new mock message containing the given data.

func (Message) IsCommitted

func (internal Message) IsCommitted() bool

IsCommitted returns true if the message offset was committed.

func (Message) IsMarked

func (internal Message) IsMarked() bool

IsMarked returns true if the message was marked as consumed.
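A typical use is to hand a mock message to a handler and then check whether the handler committed it. The sketch below imitates that idea with a stand-in type; fakeMessage, its methods and handle are assumptions made for the example and do not reproduce the kafkatest implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeMessage is a hypothetical in-memory message that remembers
// whether it was marked or committed.
type fakeMessage struct {
	data      []byte
	offset    int64
	marked    bool
	committed bool
}

func newFakeMessage(data []byte, offset int64) *fakeMessage {
	return &fakeMessage{data: data, offset: offset}
}

func (m *fakeMessage) GetData() []byte { return m.data }
func (m *fakeMessage) Offset() int64   { return m.offset }
func (m *fakeMessage) Mark()           { m.marked = true }

// Commit also marks the message; that coupling is an assumption of this sketch.
func (m *fakeMessage) Commit() { m.marked, m.committed = true, true }

func (m *fakeMessage) IsMarked() bool    { return m.marked }
func (m *fakeMessage) IsCommitted() bool { return m.committed }

// handle is the code under test: it commits only non-empty messages.
func handle(m *fakeMessage) error {
	if len(m.GetData()) == 0 {
		return errors.New("empty message")
	}
	m.Commit()
	return nil
}

func main() {
	ok := newFakeMessage([]byte("payload"), 1)
	empty := newFakeMessage(nil, 2)

	fmt.Println(handle(ok), ok.IsCommitted())
	fmt.Println(handle(empty), empty.IsCommitted())
}
```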

type MessageConsumer

type MessageConsumer struct {
	*IConsumerGroupMock
	// contains filtered or unexported fields
}

MessageConsumer is an extension of the moq ConsumerGroup mock, with channels and implementations of the required functions to emulate a fully functional Kafka ConsumerGroup.

func NewMessageConsumer

func NewMessageConsumer(isInitialisedAtCreationTime bool) *MessageConsumer

NewMessageConsumer creates a testing consumer with new consumerGroupChannels. isInitialisedAtCreationTime determines whether the consumer is already initialised when it is created.
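The flag lets a test start from either state, for example to exercise code that waits for initialisation. One plausible way to model this, assumed here rather than taken from the kafkatest source, is a channel that is closed once the consumer is ready; fakeConsumer and its members are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeConsumer is a hypothetical stand-in; the Initialised channel is closed
// once the consumer is ready, so any number of goroutines can wait on it.
type fakeConsumer struct {
	Initialised chan struct{}
	once        sync.Once
}

// newFakeConsumer optionally marks the consumer as initialised straight away.
func newFakeConsumer(isInitialisedAtCreationTime bool) *fakeConsumer {
	c := &fakeConsumer{Initialised: make(chan struct{})}
	if isInitialisedAtCreationTime {
		c.Initialise()
	}
	return c
}

// Initialise closes the channel exactly once, however often it is called.
func (c *fakeConsumer) Initialise() {
	c.once.Do(func() { close(c.Initialised) })
}

// IsInitialised reports whether the channel has been closed, without blocking.
func (c *fakeConsumer) IsInitialised() bool {
	select {
	case <-c.Initialised:
		return true
	default:
		return false
	}
}

func main() {
	ready := newFakeConsumer(true)
	pending := newFakeConsumer(false)
	fmt.Println(ready.IsInitialised(), pending.IsInitialised())

	pending.Initialise() // simulate a successful late initialisation
	fmt.Println(pending.IsInitialised())
}
```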

func NewMessageConsumerWithChannels

func NewMessageConsumerWithChannels(cgChannels *kafka.ConsumerGroupChannels, isInitialisedAtCreationTime bool) *MessageConsumer

NewMessageConsumerWithChannels creates a testing consumer with the provided consumerGroupChannels. isInitialisedAtCreationTime determines whether the consumer is already initialised when it is created.

type MessageProducer

type MessageProducer struct {
	*IProducerMock
	// contains filtered or unexported fields
}

MessageProducer is an extension of the moq Producer, with channels and implementation of required functions to emulate a fully functional kafka Producer.

func NewMessageProducer

func NewMessageProducer(isInitialisedAtCreationTime bool) *MessageProducer

NewMessageProducer creates a testing producer with new producerChannels. isInitialisedAtCreationTime determines whether the producer is already initialised when it is created.

func NewMessageProducerWithChannels

func NewMessageProducerWithChannels(pChannels *kafka.ProducerChannels, isInitialisedAtCreationTime bool) *MessageProducer

NewMessageProducerWithChannels creates a testing producer with the provided producerChannels. isInitialisedAtCreationTime determines whether the producer is already initialised when it is created.
