Documentation ¶
Overview ¶
This package provides a kafka mock that allows integration testing of goka processors.
Usage ¶
Simply append a tester option when creating the processor for testing. Usually it makes sense to move the processor creation to a function that accepts extra options. That way the test can use exactly the same processor setup.
	// createProcessor creates the processor and defines its group graph.
	func createProcessor(brokers []string, options ...goka.ProcessorOption) (*goka.Processor, error) {
		return goka.NewProcessor(brokers,
			goka.DefineGroup("group",
				// some group definitions
			),
			options...,
		)
	}

In the main function we would run the processor like this:

	func main() {
		proc, err := createProcessor([]string{"broker1:9092"})
		if err != nil {
			log.Fatalf("error creating processor: %v", err)
		}
		if err := proc.Run(context.Background()); err != nil {
			log.Fatalf("error running processor: %v", err)
		}
	}

And in the unit test something like:

	func TestProcessor(t *testing.T) {
		// create tester (named gkt so it does not shadow the tester package)
		gkt := tester.New(t)
		// create the processor
		proc, err := createProcessor(nil, goka.WithTester(gkt))
		if err != nil {
			t.Fatalf("error creating processor: %v", err)
		}
		// .. do extra initialization if necessary
		go proc.Run(context.Background())
		// execute the actual test
		gkt.Consume("input-topic", "key", "value")
		value := gkt.TableValue("group-table", "key")
		// expected is the value the processor under test should have stored
		if value != expected {
			t.Fatalf("got unexpected table value: %v", value)
		}
	}
See https://github.com/lovoo/goka/tree/master/examples/testing for a full example
Index ¶
- type EmitOption
- type MockTopicManager
- func (tm *MockTopicManager) Close() error
- func (tm *MockTopicManager) EnsureStreamExists(topic string, npar int) error
- func (tm *MockTopicManager) EnsureTableExists(topic string, npar int) error
- func (tm *MockTopicManager) EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error
- func (tm *MockTopicManager) GetOffset(topicName string, partitionID int32, time int64) (int64, error)
- func (tm *MockTopicManager) Partitions(topic string) ([]int32, error)
- type QueueTracker
- func (mt *QueueTracker) Hwm() int64
- func (mt *QueueTracker) Next() (string, interface{}, bool)
- func (mt *QueueTracker) NextOffset() int64
- func (mt *QueueTracker) NextRaw() (string, []byte, bool)
- func (mt *QueueTracker) NextRawWithHeaders() (goka.Headers, string, []byte, bool)
- func (mt *QueueTracker) NextWithHeaders() (goka.Headers, string, interface{}, bool)
- func (mt *QueueTracker) Seek(offset int64)
- type T
- type Tester
- func (tt *Tester) Catchup()
- func (tt *Tester) ClearValues()
- func (tt *Tester) Consume(topic string, key string, msg interface{}, options ...EmitOption)
- func (tt *Tester) ConsumerBuilder() goka.SaramaConsumerBuilder
- func (tt *Tester) ConsumerGroupBuilder() goka.ConsumerGroupBuilder
- func (tt *Tester) EmitterProducerBuilder() goka.ProducerBuilder
- func (tt *Tester) GetTableKeys(table goka.Table) []string
- func (tt *Tester) NewQueueTracker(topic string) *QueueTracker
- func (tt *Tester) ProducerBuilder() goka.ProducerBuilder
- func (tt *Tester) RegisterEmitter(topic goka.Stream, codec goka.Codec)
- func (tt *Tester) RegisterGroupGraph(gg *goka.GroupGraph) string
- func (tt *Tester) RegisterView(table goka.Table, c goka.Codec) string
- func (tt *Tester) SetTableValue(table goka.Table, key string, value interface{})
- func (tt *Tester) StorageBuilder() storage.Builder
- func (tt *Tester) TableValue(table goka.Table, key string) interface{}
- func (tt *Tester) TopicManagerBuilder() goka.TopicManagerBuilder
Types ¶
type EmitOption ¶ added in v1.1.0
type EmitOption func(*emitOption)
EmitOption defines a configuration option for emitting messages
func WithHeaders ¶ added in v1.1.0
func WithHeaders(headers goka.Headers) EmitOption
WithHeaders sets kafka headers to use when emitting to kafka
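EmitOption follows the functional-options pattern: each option is a function that mutates an unexported settings struct, and a variadic parameter such as the one on Consume applies them in order. The sketch below is a minimal, standalone model of that pattern and not the package's actual code. The names headers, emitSettings, option, withHeaders and applyOptions are hypothetical stand-ins, and headers assumes that goka.Headers is a map from header name to raw bytes.

```go
package main

import "fmt"

// headers stands in for goka.Headers (assumed here to be a map of
// header names to raw byte values).
type headers map[string][]byte

// emitSettings is a hypothetical stand-in for the unexported emitOption type.
type emitSettings struct {
	headers headers
}

// option plays the role of EmitOption: a function that mutates the settings.
type option func(*emitSettings)

// withHeaders plays the role of WithHeaders.
func withHeaders(h headers) option {
	return func(s *emitSettings) {
		s.headers = h
	}
}

// applyOptions starts from empty settings and applies each option in order,
// the way a variadic "options ...EmitOption" parameter can be consumed.
func applyOptions(opts ...option) emitSettings {
	var s emitSettings
	for _, opt := range opts {
		opt(&s)
	}
	return s
}

func main() {
	s := applyOptions(withHeaders(headers{"trace-id": []byte("abc")}))
	fmt.Println(string(s.headers["trace-id"]))
}
```

Callers that pass no options get the zero-value settings, which is why new options can be added later without changing existing call sites.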
type MockTopicManager ¶ added in v1.0.0
type MockTopicManager struct {
// contains filtered or unexported fields
}
MockTopicManager mimics the behavior of the real topic manager
func NewMockTopicManager ¶ added in v1.0.0
func NewMockTopicManager(tt *Tester, defaultNumPartitions int, defaultReplFactor int) *MockTopicManager
NewMockTopicManager creates a new topic manager mock
func (*MockTopicManager) Close ¶ added in v1.0.0
func (tm *MockTopicManager) Close() error
Close has no action on the mock
func (*MockTopicManager) EnsureStreamExists ¶ added in v1.0.0
func (tm *MockTopicManager) EnsureStreamExists(topic string, npar int) error
EnsureStreamExists ensures a stream exists
func (*MockTopicManager) EnsureTableExists ¶ added in v1.0.0
func (tm *MockTopicManager) EnsureTableExists(topic string, npar int) error
EnsureTableExists ensures a table exists
func (*MockTopicManager) EnsureTopicExists ¶ added in v1.0.0
func (tm *MockTopicManager) EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error
EnsureTopicExists ensures a topic exists
func (*MockTopicManager) GetOffset ¶ added in v1.0.0
func (tm *MockTopicManager) GetOffset(topicName string, partitionID int32, time int64) (int64, error)
GetOffset returns the offset closest to the passed time (or exactly time, if the offsets are empty)
func (*MockTopicManager) Partitions ¶ added in v1.0.0
func (tm *MockTopicManager) Partitions(topic string) ([]int32, error)
Partitions returns all partitions for a topic
type QueueTracker ¶ added in v0.1.1
type QueueTracker struct {
// contains filtered or unexported fields
}
QueueTracker tracks message offsets for each topic for convenient 'expect message x to be in topic y' in unit tests
func (*QueueTracker) Hwm ¶ added in v0.1.1
func (mt *QueueTracker) Hwm() int64
Hwm returns the tracked queue's hwm value
func (*QueueTracker) Next ¶ added in v0.1.1
func (mt *QueueTracker) Next() (string, interface{}, bool)
Next returns the next message since the last time this function was called (or MoveToEnd). It uses the known codec for the topic to decode the message.
func (*QueueTracker) NextOffset ¶ added in v0.1.1
func (mt *QueueTracker) NextOffset() int64
NextOffset returns the tracker's next offset
func (*QueueTracker) NextRaw ¶ added in v0.1.1
func (mt *QueueTracker) NextRaw() (string, []byte, bool)
NextRaw returns the next message similar to Next(), but without the decoding
func (*QueueTracker) NextRawWithHeaders ¶ added in v1.1.0
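func (mt *QueueTracker) NextRawWithHeaders() (goka.Headers, string, []byte, bool)
NextRawWithHeaders returns the next message together with its headers, similar to Next(), but without the decoding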
func (*QueueTracker) NextWithHeaders ¶ added in v1.1.0
func (mt *QueueTracker) NextWithHeaders() (goka.Headers, string, interface{}, bool)
NextWithHeaders returns the next message, including its headers, since the last time this function was called (or MoveToEnd). It uses the known codec for the topic to decode the message.
func (*QueueTracker) Seek ¶ added in v0.1.1
func (mt *QueueTracker) Seek(offset int64)
Seek moves the index pointer of the queue tracker to passed offset
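The tracker semantics described above (a read pointer per tracker, a high-water mark per topic, and Seek to reposition) can be pictured with a small standalone model. This is a simplified sketch and not the package's implementation: queue, tracker, newTracker, nextRaw and seek are hypothetical names, and the sketch assumes a new tracker starts at the current high-water mark so that it only sees messages pushed afterwards.

```go
package main

import "fmt"

// message is one entry in the modelled topic log.
type message struct {
	key   string
	value []byte
}

// queue models a single topic as an append-only log.
type queue struct {
	messages []message
}

// push appends a message; its offset is its index in the log.
func (q *queue) push(key string, value []byte) {
	q.messages = append(q.messages, message{key: key, value: value})
}

// hwm is the offset the next pushed message would receive.
func (q *queue) hwm() int64 { return int64(len(q.messages)) }

// tracker keeps its own read pointer into a shared queue.
type tracker struct {
	q    *queue
	next int64
}

// newTracker starts at the current high-water mark (an assumption of
// this sketch), so messages already in the queue are skipped.
func newTracker(q *queue) *tracker {
	return &tracker{q: q, next: q.hwm()}
}

// nextRaw returns the next undecoded message, or false if none is pending.
func (t *tracker) nextRaw() (string, []byte, bool) {
	if t.next >= t.q.hwm() {
		return "", nil, false
	}
	m := t.q.messages[t.next]
	t.next++
	return m.key, m.value, true
}

// seek moves the read pointer to the passed offset.
func (t *tracker) seek(offset int64) { t.next = offset }

func main() {
	q := &queue{}
	q.push("old", []byte("before tracker"))
	tr := newTracker(q)
	q.push("new", []byte("after tracker"))

	key, value, ok := tr.nextRaw()
	fmt.Println(key, string(value), ok)

	tr.seek(0) // rewind to also read the message pushed before tracking began
	key, value, ok = tr.nextRaw()
	fmt.Println(key, string(value), ok)
}
```

In a real test the same idea is used in the other direction: create the tracker for an output topic, feed input with Consume, then call Next until it reports that nothing is pending.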
type T ¶
type T interface {
	Errorf(format string, args ...interface{})
	Fatalf(format string, args ...interface{})
	Fatal(a ...interface{})
}
T abstracts the interface we assume from the test case. Will most likely be *testing.T
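Because T is a small interface rather than a concrete type, anything that provides these three methods can stand in for the test case. The sketch below copies the interface and pairs it with a hypothetical recorder type and a hypothetical expectEqual helper to show how failures can be captured instead of aborting a test. Neither name is part of the package, and unlike a real test case the recorder's Fatal and Fatalf do not stop execution.

```go
package main

import "fmt"

// T is copied from the interface documented above.
type T interface {
	Errorf(format string, args ...interface{})
	Fatalf(format string, args ...interface{})
	Fatal(a ...interface{})
}

// recorder is a hypothetical T implementation that collects failure
// messages. Its Fatal methods only record; they do not stop execution.
type recorder struct {
	failures []string
}

func (r *recorder) Errorf(format string, args ...interface{}) {
	r.failures = append(r.failures, fmt.Sprintf(format, args...))
}

func (r *recorder) Fatalf(format string, args ...interface{}) {
	r.failures = append(r.failures, fmt.Sprintf(format, args...))
}

func (r *recorder) Fatal(a ...interface{}) {
	r.failures = append(r.failures, fmt.Sprint(a...))
}

// expectEqual is a hypothetical helper that reports mismatches through T.
func expectEqual(t T, got, want string) {
	if got != want {
		t.Errorf("got %q, want %q", got, want)
	}
}

func main() {
	rec := &recorder{}
	expectEqual(rec, "a", "a") // matches, nothing recorded
	expectEqual(rec, "a", "b") // mismatch, one failure recorded
	fmt.Println(len(rec.failures), "failure(s) recorded")
}
```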
type Tester ¶
type Tester struct {
// contains filtered or unexported fields
}
Tester mimics kafka for complex high-level testing of single or multiple processors/views/emitters
func (*Tester) Catchup ¶ added in v1.1.0
func (tt *Tester) Catchup()
Catchup waits until all pending messages are consumed by all processors/views. Calling this is very rarely necessary, because normal calls to `Consume` already wait for catchup. One specific use case where it is necessary is the Visitor-tool of processors.
func (*Tester) ClearValues ¶
func (tt *Tester) ClearValues()
ClearValues clears all table values in all storages
func (*Tester) Consume ¶
func (tt *Tester) Consume(topic string, key string, msg interface{}, options ...EmitOption)
Consume pushes a message for topic/key to be consumed by all processors/views that use the topic and are registered to the Tester
func (*Tester) ConsumerBuilder ¶
func (tt *Tester) ConsumerBuilder() goka.SaramaConsumerBuilder
ConsumerBuilder creates a consumerbuilder that builds consumers for passed clientID
func (*Tester) ConsumerGroupBuilder ¶ added in v1.0.0
func (tt *Tester) ConsumerGroupBuilder() goka.ConsumerGroupBuilder
ConsumerGroupBuilder builds a builder. The builder returns the consumer group for the passed client ID if it was expected by registering the processor to the Tester
func (*Tester) EmitterProducerBuilder ¶ added in v0.1.2
func (tt *Tester) EmitterProducerBuilder() goka.ProducerBuilder
EmitterProducerBuilder creates a producer builder used for Emitters. Emitters need to flush when emitting messages.
func (*Tester) GetTableKeys ¶ added in v1.1.0
func (tt *Tester) GetTableKeys(table goka.Table) []string
GetTableKeys returns a Table's keys.
func (*Tester) NewQueueTracker ¶ added in v0.1.1
func (tt *Tester) NewQueueTracker(topic string) *QueueTracker
NewQueueTracker creates a new queue tracker
func (*Tester) ProducerBuilder ¶
func (tt *Tester) ProducerBuilder() goka.ProducerBuilder
func (*Tester) RegisterEmitter ¶ added in v0.1.1
func (tt *Tester) RegisterEmitter(topic goka.Stream, codec goka.Codec)
RegisterEmitter registers an emitter to work with the tester.
func (*Tester) RegisterGroupGraph ¶ added in v0.1.1
func (tt *Tester) RegisterGroupGraph(gg *goka.GroupGraph) string
RegisterGroupGraph is called by a processor when the tester is passed via `WithTester(..)`. This will set up the tester with the necessary consumer structure
func (*Tester) RegisterView ¶ added in v0.1.2
func (tt *Tester) RegisterView(table goka.Table, c goka.Codec) string
RegisterView registers a new view to the tester
func (*Tester) SetTableValue ¶ added in v0.1.1
func (tt *Tester) SetTableValue(table goka.Table, key string, value interface{})
SetTableValue sets a value in a processor's or view's table directly via storage. This method blocks until all expected clients are running, so make sure to call it *after* you have started all processors/views, otherwise it'll deadlock.
func (*Tester) StorageBuilder ¶
func (tt *Tester) StorageBuilder() storage.Builder
StorageBuilder builds in-memory storages
func (*Tester) TableValue ¶ added in v0.1.1
func (tt *Tester) TableValue(table goka.Table, key string) interface{}
TableValue attempts to get a value from any table that is used in the tester
func (*Tester) TopicManagerBuilder ¶
func (tt *Tester) TopicManagerBuilder() goka.TopicManagerBuilder