Documentation ¶
Overview ¶
This package provides a kafka mock that allows integration testing of goka processors.
Usage ¶
Simply append a tester option when creating the processor for testing. Usually it makes sense to move the processor creation to a function that accepts extra options. That way the test can use exactly the same processor setup.
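    // createProcessor creates the processor and defines its group graph.
    // Extra options (such as a tester) are passed through to the processor.
    func createProcessor(brokers []string, options ...goka.ProcessorOption) *goka.Processor {
        proc, err := goka.NewProcessor(brokers,
            goka.DefineGroup("group",
                // some group definitions
            ),
            options...,
        )
        if err != nil {
            panic(err) // handle the error as appropriate for your application
        }
        return proc
    }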
In the main function we would run the processor like this:
    func main() {
        proc := createProcessor([]string{"broker1:9092"})
        if err := proc.Run(context.Background()); err != nil {
            log.Fatalf("error running processor: %v", err)
        }
    }
And in the unit test something like:
    func TestProcessor(t *testing.T) {
        // create tester (named tt to avoid shadowing the tester package)
        tt := tester.New(t)
        // create the processor
        proc := createProcessor(nil, goka.WithTester(tt))

        // .. do extra initialization if necessary

        go proc.Run(context.Background())

        // execute the actual test
        tt.Consume("input-topic", "key", "value")

        // expected is whatever value the test expects in the table
        value := tt.TableValue("group-table", "key")
        if value != expected {
            t.Fatalf("got unexpected table value")
        }
    }
See https://github.com/lovoo/goka/tree/master/examples/testing for a full example.
Index ¶
- type Codec
- type EmitHandler
- type QueueTracker
- type Signal
- type State
- type T
- type Tester
- func (km *Tester) ClearValues()
- func (km *Tester) Consume(topic string, key string, msg interface{})
- func (km *Tester) ConsumeData(topic string, key string, data []byte)
- func (km *Tester) ConsumerBuilder() kafka.ConsumerBuilder
- func (km *Tester) EmitterProducerBuilder() kafka.ProducerBuilder
- func (km *Tester) NewQueueTracker(topic string) *QueueTracker
- func (km *Tester) ProducerBuilder() kafka.ProducerBuilder
- func (km *Tester) RegisterEmitter(topic goka.Stream, codec goka.Codec)
- func (km *Tester) RegisterGroupGraph(gg *goka.GroupGraph)
- func (km *Tester) RegisterView(table goka.Table, c goka.Codec)
- func (km *Tester) ReplaceEmitHandler(emitter EmitHandler)
- func (km *Tester) SetTableValue(table goka.Table, key string, value interface{})
- func (km *Tester) StorageBuilder() storage.Builder
- func (km *Tester) TableValue(table goka.Table, key string) interface{}
- func (km *Tester) TopicManagerBuilder() kafka.TopicManagerBuilder
Types ¶
type Codec ¶
    type Codec interface {
        Encode(value interface{}) (data []byte, err error)
        Decode(data []byte) (value interface{}, err error)
    }
Codec decodes and encodes from and to []byte
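As an illustration of the interface above, the following is a minimal sketch of a codec that stores values as JSON. The names `user` and `userCodec` are hypothetical and not part of this package, and the local `Codec` declaration only mirrors the documented interface so that the example compiles on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Codec mirrors the interface documented above so the sketch is self-contained.
type Codec interface {
	Encode(value interface{}) (data []byte, err error)
	Decode(data []byte) (value interface{}, err error)
}

// user is a hypothetical value type used only for this example.
type user struct {
	Name   string `json:"name"`
	Clicks int    `json:"clicks"`
}

// userCodec is a hypothetical Codec that serializes *user values as JSON.
type userCodec struct{}

// Encode marshals a *user into JSON and rejects any other type.
func (userCodec) Encode(value interface{}) ([]byte, error) {
	u, ok := value.(*user)
	if !ok {
		return nil, fmt.Errorf("userCodec: expected *user, got %T", value)
	}
	return json.Marshal(u)
}

// Decode unmarshals JSON bytes back into a *user.
func (userCodec) Decode(data []byte) (interface{}, error) {
	var u user
	if err := json.Unmarshal(data, &u); err != nil {
		return nil, fmt.Errorf("userCodec: %w", err)
	}
	return &u, nil
}

// Compile-time check that userCodec satisfies Codec.
var _ Codec = userCodec{}

func main() {
	var c Codec = userCodec{}

	data, err := c.Encode(&user{Name: "alice", Clicks: 3})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))

	decoded, err := c.Decode(data)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", *decoded.(*user))
}
```

Rejecting unexpected types in Encode, rather than marshalling whatever arrives, tends to surface wiring mistakes early in a test run.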
type EmitHandler ¶
EmitHandler abstracts a function that can replace kafkamock's Emit function, for example to simulate producer errors.
type QueueTracker ¶ added in v0.1.1
type QueueTracker struct {
// contains filtered or unexported fields
}
QueueTracker tracks message offsets for a topic, which makes checks such as 'expect message x to be in topic y' convenient in unit tests.
func (*QueueTracker) Hwm ¶ added in v0.1.1
func (mt *QueueTracker) Hwm() int64
Hwm returns the tracked queue's hwm value
func (*QueueTracker) Next ¶ added in v0.1.1
func (mt *QueueTracker) Next() (string, interface{}, bool)
Next returns the next message since the last time this function (or MoveToEnd) was called. It uses the known codec for the topic to decode the message.
func (*QueueTracker) NextOffset ¶ added in v0.1.1
func (mt *QueueTracker) NextOffset() int64
NextOffset returns the tracker's next offset
func (*QueueTracker) NextRaw ¶ added in v0.1.1
func (mt *QueueTracker) NextRaw() (string, []byte, bool)
NextRaw returns the next message similar to Next(), but without the decoding
func (*QueueTracker) Seek ¶ added in v0.1.1
func (mt *QueueTracker) Seek(offset int64)
Seek moves the index pointer of the queue tracker to the passed offset.
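To make the offset semantics described above concrete, here is a rough sketch of a tracker over an in-memory queue. It is not the package's implementation: `queue`, `message`, `tracker` and `newTracker` are hypothetical names, and the sketch assumes that the high-water mark equals the number of messages in the queue and that a new tracker starts at the current end.

```go
package main

import "fmt"

// message is a hypothetical key/value pair stored in the queue.
type message struct {
	key   string
	value []byte
}

// queue is a hypothetical in-memory stand-in for a topic.
type queue struct {
	messages []message
}

// push appends a message to the end of the queue.
func (q *queue) push(key string, value []byte) {
	q.messages = append(q.messages, message{key: key, value: value})
}

// tracker remembers the offset of the next message it will return.
type tracker struct {
	q          *queue
	nextOffset int64
}

// newTracker starts tracking from the current end of the queue.
func newTracker(q *queue) *tracker {
	return &tracker{q: q, nextOffset: int64(len(q.messages))}
}

// Hwm returns the offset one past the last message (assumed definition).
func (t *tracker) Hwm() int64 { return int64(len(t.q.messages)) }

// NextOffset returns the offset that the next read will use.
func (t *tracker) NextOffset() int64 { return t.nextOffset }

// Seek moves the tracker to the passed offset.
func (t *tracker) Seek(offset int64) { t.nextOffset = offset }

// NextRaw returns the next undecoded message, or false if none is available.
func (t *tracker) NextRaw() (string, []byte, bool) {
	if t.nextOffset < 0 || t.nextOffset >= t.Hwm() {
		return "", nil, false
	}
	m := t.q.messages[t.nextOffset]
	t.nextOffset++
	return m.key, m.value, true
}

func main() {
	q := &queue{}
	q.push("user-1", []byte("before tracker"))

	tr := newTracker(q) // starts at the current end of the queue
	q.push("user-2", []byte("after tracker"))

	if key, value, ok := tr.NextRaw(); ok {
		fmt.Printf("next: %s=%s\n", key, value)
	}

	tr.Seek(0) // rewind to read the message emitted before the tracker existed
	if key, value, ok := tr.NextRaw(); ok {
		fmt.Printf("after seek: %s=%s\n", key, value)
	}
	fmt.Println("hwm:", tr.Hwm(), "next offset:", tr.NextOffset())
}
```

Starting at the end of the queue means a test only sees messages produced after the tracker was created, unless it explicitly seeks backwards.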
type Signal ¶ added in v0.1.1
Signal allows synchronization on a state, waiting for that state and checking the current state
func (*Signal) SetState ¶ added in v0.1.1
SetState changes the state of the signal and notifies all goroutines waiting for the new state
func (*Signal) WaitForState ¶ added in v0.1.1
WaitForState returns a channel that closes when the signal reaches the passed state.
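The behaviour described for SetState and WaitForState can be approximated with a mutex and one list of waiting channels per state. The sketch below is only an illustration under that assumption; `stateSignal`, `state`, `stateStarting` and `stateRunning` are hypothetical names and the actual Signal type may be built differently.

```go
package main

import (
	"fmt"
	"sync"
)

// state is a hypothetical state identifier.
type state int

const (
	stateStarting state = iota
	stateRunning
)

// stateSignal is a hypothetical signal that lets goroutines wait for a state.
type stateSignal struct {
	mu      sync.Mutex
	current state
	waiters map[state][]chan struct{}
}

// newStateSignal creates a signal that starts in the given state.
func newStateSignal(initial state) *stateSignal {
	return &stateSignal{
		current: initial,
		waiters: make(map[state][]chan struct{}),
	}
}

// SetState changes the state and closes every channel waiting for it.
func (s *stateSignal) SetState(next state) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.current = next
	for _, ch := range s.waiters[next] {
		close(ch)
	}
	delete(s.waiters, next)
}

// WaitForState returns a channel that is closed once the signal reaches want.
// If the signal is already in that state, the channel is closed immediately.
func (s *stateSignal) WaitForState(want state) <-chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	ch := make(chan struct{})
	if s.current == want {
		close(ch)
		return ch
	}
	s.waiters[want] = append(s.waiters[want], ch)
	return ch
}

func main() {
	sig := newStateSignal(stateStarting)

	running := sig.WaitForState(stateRunning)
	go sig.SetState(stateRunning) // e.g. done by a component once it is ready

	<-running // blocks until the state has been reached
	fmt.Println("signal reached the running state")
}
```

Returning a channel instead of blocking lets callers combine the wait with a timeout or a context in a `select` statement.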
type T ¶
    type T interface {
        Errorf(format string, args ...interface{})
        Fatalf(format string, args ...interface{})
        Fatal(a ...interface{})
    }
T abstracts the interface we assume from the test case. In most cases this will be *testing.T.
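Because only these three methods are required, any type that implements them can stand in for *testing.T. The following sketch shows a recording fake; `recordingT` and `checkValue` are hypothetical names, and the local `T` declaration only mirrors the interface above. Unlike *testing.T, this fake does not stop the calling goroutine on Fatal or Fatalf; it only records the message.

```go
package main

import "fmt"

// T mirrors the interface documented above so the sketch is self-contained.
type T interface {
	Errorf(format string, args ...interface{})
	Fatalf(format string, args ...interface{})
	Fatal(a ...interface{})
}

// recordingT is a hypothetical T that records messages instead of failing a test.
type recordingT struct {
	errors []string
	fatals []string
}

// Errorf records a formatted non-fatal error.
func (r *recordingT) Errorf(format string, args ...interface{}) {
	r.errors = append(r.errors, fmt.Sprintf(format, args...))
}

// Fatalf records a formatted fatal error without stopping execution.
func (r *recordingT) Fatalf(format string, args ...interface{}) {
	r.fatals = append(r.fatals, fmt.Sprintf(format, args...))
}

// Fatal records a fatal error without stopping execution.
func (r *recordingT) Fatal(a ...interface{}) {
	r.fatals = append(r.fatals, fmt.Sprint(a...))
}

// Compile-time check that *recordingT satisfies T.
var _ T = (*recordingT)(nil)

// checkValue is a hypothetical helper that reports a mismatch through T.
// It compares with !=, so both values must be of comparable types.
func checkValue(t T, got, want interface{}) {
	if got != want {
		t.Errorf("got %v, want %v", got, want)
	}
}

func main() {
	rec := &recordingT{}
	checkValue(rec, 1, 1)     // equal, nothing recorded
	checkValue(rec, "a", "b") // mismatch, one error recorded
	fmt.Println(len(rec.errors), rec.errors)
}
```

A fake like this can be useful for testing assertion helpers themselves, since a failing assertion no longer fails the surrounding test.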
type Tester ¶
type Tester struct {
// contains filtered or unexported fields
}
Tester allows interacting with a test processor
func (*Tester) ConsumeData ¶
ConsumeData pushes a marshalled byte slice to a topic and a key
func (*Tester) ConsumerBuilder ¶
func (km *Tester) ConsumerBuilder() kafka.ConsumerBuilder
ConsumerBuilder returns the consumer builder when this tester is used as an option to a processor
func (*Tester) EmitterProducerBuilder ¶ added in v0.1.2
func (km *Tester) EmitterProducerBuilder() kafka.ProducerBuilder
EmitterProducerBuilder creates a producer builder used for Emitters. Emitters need to flush when emitting messages.
func (*Tester) NewQueueTracker ¶ added in v0.1.1
func (km *Tester) NewQueueTracker(topic string) *QueueTracker
NewQueueTracker creates a message tracker that starts tracking the messages from the end of the current queues
func (*Tester) ProducerBuilder ¶
func (km *Tester) ProducerBuilder() kafka.ProducerBuilder
ProducerBuilder returns the producer builder when this tester is used as an option to a processor
func (*Tester) RegisterEmitter ¶ added in v0.1.1
RegisterEmitter registers an emitter so that it works with the tester.
func (*Tester) RegisterGroupGraph ¶ added in v0.1.1
func (km *Tester) RegisterGroupGraph(gg *goka.GroupGraph)
RegisterGroupGraph is called by a processor when the tester is passed via `WithTester(..)`. This sets up the tester with the necessary consumer structure.
func (*Tester) RegisterView ¶ added in v0.1.2
RegisterView registers a view so that it works with the tester.
func (*Tester) ReplaceEmitHandler ¶
func (km *Tester) ReplaceEmitHandler(emitter EmitHandler)
ReplaceEmitHandler replaces the emitter.
func (*Tester) SetTableValue ¶ added in v0.1.1
SetTableValue sets a value in a processor's or view's table directly via storage.
func (*Tester) StorageBuilder ¶
StorageBuilder returns the storage builder when this tester is used as an option to a processor
func (*Tester) TableValue ¶ added in v0.1.1
TableValue attempts to get a value from any table that is used in the kafka mock.
func (*Tester) TopicManagerBuilder ¶
func (km *Tester) TopicManagerBuilder() kafka.TopicManagerBuilder
TopicManagerBuilder returns the topicmanager builder when this tester is used as an option to a processor