Documentation ¶
Overview ¶
This package provides a kafka mock that allows integration testing of goka processors.
Usage ¶
Simply append a tester option when creating the processor for testing. Usually it makes sense to move the processor creation to a function that accepts extra options. That way the test can use exactly the same processor setup.
// creates the processor defining its group graph func createProcessor(brokers []string, options ...goka.ProcessorOption) *goka.Processor{ return goka.NewProcessor(brokers, goka.DefineGroup("group", // some group definitions options..., ), ) }
In the main function we would run the processor like this:
func main(){ proc := createProcessor([]string{"broker1:9092"}) proc.Run(ctx.Background()) }
And in the unit test something like:
func TestProcessor(t *testing.T){ // create tester tester := tester.New(t) // create the processor proc := createProcessor(nil, goka.WithTester(tester)) // .. do extra initialization if necessary go proc.Run(ctx.Background()) // execute the actual test tester.Consume("input-topic", "key", "value") value := tester.TableValue("group-table", "key") if value != expected{ t.Fatalf("got unexpected table value") } }
See https://gitlab.com/signoz-public/goka/tree/master/examples/testing for a full example
Index ¶
- type MockTopicManager
- func (tm *MockTopicManager) Close() error
- func (tm *MockTopicManager) EnsureStreamExists(topic string, npar int) error
- func (tm *MockTopicManager) EnsureTableExists(topic string, npar int) error
- func (tm *MockTopicManager) EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error
- func (tm *MockTopicManager) GetOffset(topicName string, partitionID int32, time int64) (int64, error)
- func (tm *MockTopicManager) Partitions(topic string) ([]int32, error)
- type QueueTracker
- type T
- type Tester
- func (tt *Tester) ClearValues()
- func (tt *Tester) Consume(topic string, key string, msg interface{})
- func (tt *Tester) ConsumerBuilder() goka.SaramaConsumerBuilder
- func (tt *Tester) ConsumerGroupBuilder() goka.ConsumerGroupBuilder
- func (tt *Tester) EmitterProducerBuilder() goka.ProducerBuilder
- func (tt *Tester) NewQueueTracker(topic string) *QueueTracker
- func (tt *Tester) ProducerBuilder() goka.ProducerBuilder
- func (tt *Tester) RegisterEmitter(topic goka.Stream, codec goka.Codec)
- func (tt *Tester) RegisterGroupGraph(gg *goka.GroupGraph) string
- func (tt *Tester) RegisterView(table goka.Table, c goka.Codec) string
- func (tt *Tester) SetTableValue(table goka.Table, key string, value interface{})
- func (tt *Tester) StorageBuilder() storage.Builder
- func (tt *Tester) TableValue(table goka.Table, key string) interface{}
- func (tt *Tester) TopicManagerBuilder() goka.TopicManagerBuilder
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type MockTopicManager ¶
type MockTopicManager struct { DefaultNumPartitions int DefaultReplicationFactor int // contains filtered or unexported fields }
MockTopicManager mimicks the behavior of the real topic manager
func NewMockTopicManager ¶
func NewMockTopicManager(tt *Tester, defaultNumPartitions int, defaultReplFactor int) *MockTopicManager
NewMockTopicManager creates a new topic manager mock
func (*MockTopicManager) Close ¶
func (tm *MockTopicManager) Close() error
Close has no action on the mock
func (*MockTopicManager) EnsureStreamExists ¶
func (tm *MockTopicManager) EnsureStreamExists(topic string, npar int) error
EnsureStreamExists ensures a stream exists
func (*MockTopicManager) EnsureTableExists ¶
func (tm *MockTopicManager) EnsureTableExists(topic string, npar int) error
EnsureTableExists ensures a table exists
func (*MockTopicManager) EnsureTopicExists ¶
func (tm *MockTopicManager) EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error
EnsureTopicExists ensures a topic exists
func (*MockTopicManager) GetOffset ¶
func (tm *MockTopicManager) GetOffset(topicName string, partitionID int32, time int64) (int64, error)
GetOffset returns the offset closest to the passed time (or exactly time, if the offsets are empty)
func (*MockTopicManager) Partitions ¶
func (tm *MockTopicManager) Partitions(topic string) ([]int32, error)
Partitions returns all partitions for a topic
type QueueTracker ¶
type QueueTracker struct {
// contains filtered or unexported fields
}
QueueTracker tracks message offsets for each topic for convenient 'expect message x to be in topic y' in unit tests
func (*QueueTracker) Hwm ¶
func (mt *QueueTracker) Hwm() int64
Hwm returns the tracked queue's hwm value
func (*QueueTracker) Next ¶
func (mt *QueueTracker) Next() (string, interface{}, bool)
Next returns the next message since the last time this function was called (or MoveToEnd) It uses the known codec for the topic to decode the message
func (*QueueTracker) NextOffset ¶
func (mt *QueueTracker) NextOffset() int64
NextOffset returns the tracker's next offset
func (*QueueTracker) NextRaw ¶
func (mt *QueueTracker) NextRaw() (string, []byte, bool)
NextRaw returns the next message similar to Next(), but without the decoding
func (*QueueTracker) Seek ¶
func (mt *QueueTracker) Seek(offset int64)
Seek moves the index pointer of the queue tracker to passed offset
type T ¶
type T interface { Errorf(format string, args ...interface{}) Fatalf(format string, args ...interface{}) Fatal(a ...interface{}) }
T abstracts the interface we assume from the test case. Will most likely be T
type Tester ¶
type Tester struct {
// contains filtered or unexported fields
}
Tester mimicks kafka for complex highlevel testing of single or multiple processors/views/emitters
func (*Tester) ClearValues ¶
func (tt *Tester) ClearValues()
ClearValues clears all table values in all storages
func (*Tester) Consume ¶
Consume pushes a message for topic/key to be consumed by all processors/views whoever is using it being registered to the Tester
func (*Tester) ConsumerBuilder ¶
func (tt *Tester) ConsumerBuilder() goka.SaramaConsumerBuilder
ConsumerBuilder creates a consumerbuilder that builds consumers for passed clientID
func (*Tester) ConsumerGroupBuilder ¶
func (tt *Tester) ConsumerGroupBuilder() goka.ConsumerGroupBuilder
ConsumerGroupBuilder builds a builder. The builder returns the consumergroup for passed client-ID if it was expected by registering the processor to the Tester
func (*Tester) EmitterProducerBuilder ¶
func (tt *Tester) EmitterProducerBuilder() goka.ProducerBuilder
EmitterProducerBuilder creates a producer builder used for Emitters. Emitters need to flush when emitting messages.
func (*Tester) NewQueueTracker ¶
func (tt *Tester) NewQueueTracker(topic string) *QueueTracker
NewQueueTracker creates a new queue tracker
func (*Tester) ProducerBuilder ¶
func (tt *Tester) ProducerBuilder() goka.ProducerBuilder
func (*Tester) RegisterEmitter ¶
RegisterEmitter registers an emitter to be working with the tester.
func (*Tester) RegisterGroupGraph ¶
func (tt *Tester) RegisterGroupGraph(gg *goka.GroupGraph) string
RegisterGroupGraph is called by a processor when the tester is passed via `WithTester(..)`. This will setup the tester with the neccessary consumer structure
func (*Tester) RegisterView ¶
RegisterView registers a new view to the tester
func (*Tester) SetTableValue ¶
SetTableValue sets a value in a processor's or view's table direcly via storage This method blocks until all expected clients are running, so make sure to call it *after* you have started all processors/views, otherwise it'll deadlock.
func (*Tester) StorageBuilder ¶
StorageBuilder builds inmemory storages
func (*Tester) TableValue ¶
TableValue attempts to get a value from any table that is used in the tester
func (*Tester) TopicManagerBuilder ¶
func (tt *Tester) TopicManagerBuilder() goka.TopicManagerBuilder