Documentation ¶
Overview ¶
This package provides a kafka mock that allows integration testing of goka processors.
Usage ¶
Simply append a tester option when creating the processor for testing. Usually it makes sense to move the processor creation to a function that accepts extra options. That way the test can use exactly the same processor setup.
// creates the processor defining its group graph func createProcessor(brokers []string, options ...goka.ProcessorOption) *goka.Processor{ return goka.NewProcessor(brokers, goka.DefineGroup("group", // some group definitions options..., ), ) }
In the main function we would run the processor like this:
func main(){ proc := createProcessor([]string{"broker1:9092"}) proc.Run(ctx.Background()) }
And in the unit test something like:
func TestProcessor(t *testing.T){ // create tester tester := tester.New(t) // create the processor proc := createProcessor(nil, goka.WithTester(tester)) // .. do extra initialization if necessary go proc.Run(ctx.Background()) // execute the actual test tester.Consume("input-topic", "key", "value") value := tester.TableValue("group-table", "key") if value != expected{ t.Fatalf("got unexpected table value") } }
See https://github.com/moment-technology/goka/tree/master/examples/testing for a full example
Index ¶
- type EmitOption
- type MockTopicManager
- func (tm *MockTopicManager) Close() error
- func (tm *MockTopicManager) EnsureStreamExists(topic string, npar int) error
- func (tm *MockTopicManager) EnsureTableExists(topic string, npar int) error
- func (tm *MockTopicManager) EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error
- func (tm *MockTopicManager) GetOffset(topicName string, partitionID int32, time int64) (int64, error)
- func (tm *MockTopicManager) Partitions(topic string) ([]int32, error)
- type QueueTracker
- func (mt *QueueTracker) Hwm() int64
- func (mt *QueueTracker) Next() (string, interface{}, bool)
- func (mt *QueueTracker) NextOffset() int64
- func (mt *QueueTracker) NextRaw() (string, []byte, bool)
- func (mt *QueueTracker) NextRawWithHeaders() (goka.Headers, string, []byte, bool)
- func (mt *QueueTracker) NextWithHeaders() (goka.Headers, string, interface{}, bool)
- func (mt *QueueTracker) Seek(offset int64)
- type T
- type Tester
- func (tt *Tester) Catchup()
- func (tt *Tester) ClearValues()
- func (tt *Tester) Consume(topic string, key string, msg interface{}, options ...EmitOption)
- func (tt *Tester) ConsumerBuilder() goka.SaramaConsumerBuilder
- func (tt *Tester) ConsumerGroupBuilder() goka.ConsumerGroupBuilder
- func (tt *Tester) EmitterProducerBuilder() goka.ProducerBuilder
- func (tt *Tester) GetTableKeys(table goka.Table) []string
- func (tt *Tester) NewQueueTracker(topic string) *QueueTracker
- func (tt *Tester) ProducerBuilder() goka.ProducerBuilder
- func (tt *Tester) RegisterEmitter(topic goka.Stream, codec goka.Codec)
- func (tt *Tester) RegisterGroupGraph(gg *goka.GroupGraph) string
- func (tt *Tester) RegisterView(table goka.Table, c goka.Codec) string
- func (tt *Tester) SetTableValue(table goka.Table, key string, value interface{})
- func (tt *Tester) StorageBuilder() storage.Builder
- func (tt *Tester) TableValue(table goka.Table, key string) interface{}
- func (tt *Tester) TopicManagerBuilder() goka.TopicManagerBuilder
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type EmitOption ¶
type EmitOption func(*emitOption)
EmitOption defines a configuration option for emitting messages
func WithHeaders ¶
func WithHeaders(headers goka.Headers) EmitOption
WithHeaders sets kafka headers to use when emitting to kafka
type MockTopicManager ¶
type MockTopicManager struct {
// contains filtered or unexported fields
}
MockTopicManager mimicks the behavior of the real topic manager
func NewMockTopicManager ¶
func NewMockTopicManager(tt *Tester, defaultNumPartitions int, defaultReplFactor int) *MockTopicManager
NewMockTopicManager creates a new topic manager mock
func (*MockTopicManager) Close ¶
func (tm *MockTopicManager) Close() error
Close has no action on the mock
func (*MockTopicManager) EnsureStreamExists ¶
func (tm *MockTopicManager) EnsureStreamExists(topic string, npar int) error
EnsureStreamExists ensures a stream exists
func (*MockTopicManager) EnsureTableExists ¶
func (tm *MockTopicManager) EnsureTableExists(topic string, npar int) error
EnsureTableExists ensures a table exists
func (*MockTopicManager) EnsureTopicExists ¶
func (tm *MockTopicManager) EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error
EnsureTopicExists ensures a topic exists
func (*MockTopicManager) GetOffset ¶
func (tm *MockTopicManager) GetOffset(topicName string, partitionID int32, time int64) (int64, error)
GetOffset returns the offset closest to the passed time (or exactly time, if the offsets are empty)
func (*MockTopicManager) Partitions ¶
func (tm *MockTopicManager) Partitions(topic string) ([]int32, error)
Partitions returns all partitions for a topic
type QueueTracker ¶
type QueueTracker struct {
// contains filtered or unexported fields
}
QueueTracker tracks message offsets for each topic for convenient 'expect message x to be in topic y' in unit tests
func (*QueueTracker) Hwm ¶
func (mt *QueueTracker) Hwm() int64
Hwm returns the tracked queue's hwm value
func (*QueueTracker) Next ¶
func (mt *QueueTracker) Next() (string, interface{}, bool)
Next returns the next message since the last time this function was called (or MoveToEnd) It uses the known codec for the topic to decode the message
func (*QueueTracker) NextOffset ¶
func (mt *QueueTracker) NextOffset() int64
NextOffset returns the tracker's next offset
func (*QueueTracker) NextRaw ¶
func (mt *QueueTracker) NextRaw() (string, []byte, bool)
NextRaw returns the next message similar to Next(), but without the decoding
func (*QueueTracker) NextRawWithHeaders ¶
NextRawWithHeaders returns the next message similar to Next(), but without the decoding
func (*QueueTracker) NextWithHeaders ¶
func (mt *QueueTracker) NextWithHeaders() (goka.Headers, string, interface{}, bool)
NextWithHeaders returns the next message since the last time this function was called (or MoveToEnd). This includes headers It uses the known codec for the topic to decode the message
func (*QueueTracker) Seek ¶
func (mt *QueueTracker) Seek(offset int64)
Seek moves the index pointer of the queue tracker to passed offset
type T ¶
type T interface { Errorf(format string, args ...interface{}) Fatalf(format string, args ...interface{}) Fatal(a ...interface{}) }
T abstracts the interface we assume from the test case. Will most likely be T
type Tester ¶
type Tester struct {
// contains filtered or unexported fields
}
Tester mimicks kafka for complex highlevel testing of single or multiple processors/views/emitters
func (*Tester) Catchup ¶
func (tt *Tester) Catchup()
Catchup waits until all pending messages are consumed by all processors/views. Calling this is very rarely necessary, normal calls to `Consume` include waiting for catchup. One specific use case this is necessary for, is the Visitor-tool of processors.
func (*Tester) ClearValues ¶
func (tt *Tester) ClearValues()
ClearValues clears all table values in all storages
func (*Tester) Consume ¶
func (tt *Tester) Consume(topic string, key string, msg interface{}, options ...EmitOption)
Consume pushes a message for topic/key to be consumed by all processors/views whoever is using it being registered to the Tester
func (*Tester) ConsumerBuilder ¶
func (tt *Tester) ConsumerBuilder() goka.SaramaConsumerBuilder
ConsumerBuilder creates a consumerbuilder that builds consumers for passed clientID
func (*Tester) ConsumerGroupBuilder ¶
func (tt *Tester) ConsumerGroupBuilder() goka.ConsumerGroupBuilder
ConsumerGroupBuilder builds a builder. The builder returns the consumergroup for passed client-ID if it was expected by registering the processor to the Tester
func (*Tester) EmitterProducerBuilder ¶
func (tt *Tester) EmitterProducerBuilder() goka.ProducerBuilder
EmitterProducerBuilder creates a producer builder used for Emitters. Emitters need to flush when emitting messages.
func (*Tester) GetTableKeys ¶
GetTableKeys returns a Table's keys.
func (*Tester) NewQueueTracker ¶
func (tt *Tester) NewQueueTracker(topic string) *QueueTracker
NewQueueTracker creates a new queue tracker
func (*Tester) ProducerBuilder ¶
func (tt *Tester) ProducerBuilder() goka.ProducerBuilder
func (*Tester) RegisterEmitter ¶
RegisterEmitter registers an emitter to be working with the tester.
func (*Tester) RegisterGroupGraph ¶
func (tt *Tester) RegisterGroupGraph(gg *goka.GroupGraph) string
RegisterGroupGraph is called by a processor when the tester is passed via `WithTester(..)`. This will setup the tester with the neccessary consumer structure
func (*Tester) RegisterView ¶
RegisterView registers a new view to the tester
func (*Tester) SetTableValue ¶
SetTableValue sets a value in a processor's or view's table direcly via storage This method blocks until all expected clients are running, so make sure to call it *after* you have started all processors/views, otherwise it'll deadlock.
func (*Tester) StorageBuilder ¶
StorageBuilder builds inmemory storages
func (*Tester) TableValue ¶
TableValue attempts to get a value from any table that is used in the tester
func (*Tester) TopicManagerBuilder ¶
func (tt *Tester) TopicManagerBuilder() goka.TopicManagerBuilder