tester

package
v1.1.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 16, 2024 License: BSD-3-Clause Imports: 12 Imported by: 1

Documentation

Overview

This package provides a kafka mock that allows integration testing of goka processors.

Usage

Simply append a tester option when creating the processor for testing. Usually it makes sense to move the processor creation to a function that accepts extra options. That way the test can use exactly the same processor setup.

// creates the processor defining its group graph
func createProcessor(brokers []string, options ...goka.ProcessorOption) *goka.Processor{
  return goka.NewProcessor(brokers, goka.DefineGroup("group",
                          // some group definitions
                          options...,
                          ),
  )
}

In the main function we would run the processor like this:

func main(){
  proc := createProcessor([]string{"broker1:9092"})
  proc.Run(ctx.Background())
}

And in the unit test something like:

func TestProcessor(t *testing.T){
  // create tester
  tester := tester.New(t)
  // create the processor
  proc := createProcessor(nil, goka.WithTester(tester))

  // .. do extra initialization if necessary

  go proc.Run(ctx.Background())

  // execute the actual test
  tester.Consume("input-topic", "key", "value")

  value := tester.TableValue("group-table", "key")
  if value != expected{
    t.Fatalf("got unexpected table value")
  }
}

See https://github.com/lovoo/goka/tree/master/examples/testing for a full example

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type EmitOption added in v1.1.0

type EmitOption func(*emitOption)

EmitOption defines a configuration option for emitting messages

func WithHeaders added in v1.1.0

func WithHeaders(headers goka.Headers) EmitOption

WithHeaders sets kafka headers to use when emitting to kafka

type MockTopicManager added in v1.0.0

type MockTopicManager struct {
	// contains filtered or unexported fields
}

MockTopicManager mimicks the behavior of the real topic manager

func NewMockTopicManager added in v1.0.0

func NewMockTopicManager(tt *Tester, defaultNumPartitions int, defaultReplFactor int) *MockTopicManager

NewMockTopicManager creates a new topic manager mock

func (*MockTopicManager) Close added in v1.0.0

func (tm *MockTopicManager) Close() error

Close has no action on the mock

func (*MockTopicManager) EnsureStreamExists added in v1.0.0

func (tm *MockTopicManager) EnsureStreamExists(topic string, npar int) error

EnsureStreamExists ensures a stream exists

func (*MockTopicManager) EnsureTableExists added in v1.0.0

func (tm *MockTopicManager) EnsureTableExists(topic string, npar int) error

EnsureTableExists ensures a table exists

func (*MockTopicManager) EnsureTopicExists added in v1.0.0

func (tm *MockTopicManager) EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error

EnsureTopicExists ensures a topic exists

func (*MockTopicManager) GetOffset added in v1.0.0

func (tm *MockTopicManager) GetOffset(topicName string, partitionID int32, time int64) (int64, error)

GetOffset returns the offset closest to the passed time (or exactly time, if the offsets are empty)

func (*MockTopicManager) Partitions added in v1.0.0

func (tm *MockTopicManager) Partitions(topic string) ([]int32, error)

Partitions returns all partitions for a topic

type QueueTracker added in v0.1.1

type QueueTracker struct {
	// contains filtered or unexported fields
}

QueueTracker tracks message offsets for each topic for convenient 'expect message x to be in topic y' in unit tests

func (*QueueTracker) Hwm added in v0.1.1

func (mt *QueueTracker) Hwm() int64

Hwm returns the tracked queue's hwm value

func (*QueueTracker) Next added in v0.1.1

func (mt *QueueTracker) Next() (string, interface{}, bool)

Next returns the next message since the last time this function was called (or MoveToEnd) It uses the known codec for the topic to decode the message

func (*QueueTracker) NextOffset added in v0.1.1

func (mt *QueueTracker) NextOffset() int64

NextOffset returns the tracker's next offset

func (*QueueTracker) NextRaw added in v0.1.1

func (mt *QueueTracker) NextRaw() (string, []byte, bool)

NextRaw returns the next message similar to Next(), but without the decoding

func (*QueueTracker) NextRawWithHeaders added in v1.1.0

func (mt *QueueTracker) NextRawWithHeaders() (goka.Headers, string, []byte, bool)

NextRawWithHeaders returns the next message similar to Next(), but without the decoding

func (*QueueTracker) NextWithHeaders added in v1.1.0

func (mt *QueueTracker) NextWithHeaders() (goka.Headers, string, interface{}, bool)

NextWithHeaders returns the next message since the last time this function was called (or MoveToEnd). This includes headers It uses the known codec for the topic to decode the message

func (*QueueTracker) Seek added in v0.1.1

func (mt *QueueTracker) Seek(offset int64)

Seek moves the index pointer of the queue tracker to passed offset

type T

type T interface {
	Errorf(format string, args ...interface{})
	Fatalf(format string, args ...interface{})
	Fatal(a ...interface{})
}

T abstracts the interface we assume from the test case. Will most likely be T

type Tester

type Tester struct {
	// contains filtered or unexported fields
}

Tester mimicks kafka for complex highlevel testing of single or multiple processors/views/emitters

func New

func New(t T) *Tester

New creates a new tester instance

func (*Tester) Catchup added in v1.1.0

func (tt *Tester) Catchup()

Catchup waits until all pending messages are consumed by all processors/views. Calling this is very rarely necessary, normal calls to `Consume` include waiting for catchup. One specific use case this is necessary for, is the Visitor-tool of processors.

func (*Tester) ClearValues

func (tt *Tester) ClearValues()

ClearValues clears all table values in all storages

func (*Tester) Consume

func (tt *Tester) Consume(topic string, key string, msg interface{}, options ...EmitOption)

Consume pushes a message for topic/key to be consumed by all processors/views whoever is using it being registered to the Tester

func (*Tester) ConsumerBuilder

func (tt *Tester) ConsumerBuilder() goka.SaramaConsumerBuilder

ConsumerBuilder creates a consumerbuilder that builds consumers for passed clientID

func (*Tester) ConsumerGroupBuilder added in v1.0.0

func (tt *Tester) ConsumerGroupBuilder() goka.ConsumerGroupBuilder

ConsumerGroupBuilder builds a builder. The builder returns the consumergroup for passed client-ID if it was expected by registering the processor to the Tester

func (*Tester) EmitterProducerBuilder added in v0.1.2

func (tt *Tester) EmitterProducerBuilder() goka.ProducerBuilder

EmitterProducerBuilder creates a producer builder used for Emitters. Emitters need to flush when emitting messages.

func (*Tester) GetTableKeys added in v1.1.0

func (tt *Tester) GetTableKeys(table goka.Table) []string

GetTableKeys returns a Table's keys.

func (*Tester) NewQueueTracker added in v0.1.1

func (tt *Tester) NewQueueTracker(topic string) *QueueTracker

NewQueueTracker creates a new queue tracker

func (*Tester) ProducerBuilder

func (tt *Tester) ProducerBuilder() goka.ProducerBuilder

func (*Tester) RegisterEmitter added in v0.1.1

func (tt *Tester) RegisterEmitter(topic goka.Stream, codec goka.Codec)

RegisterEmitter registers an emitter to be working with the tester.

func (*Tester) RegisterGroupGraph added in v0.1.1

func (tt *Tester) RegisterGroupGraph(gg *goka.GroupGraph) string

RegisterGroupGraph is called by a processor when the tester is passed via `WithTester(..)`. This will setup the tester with the neccessary consumer structure

func (*Tester) RegisterView added in v0.1.2

func (tt *Tester) RegisterView(table goka.Table, c goka.Codec) string

RegisterView registers a new view to the tester

func (*Tester) SetTableValue added in v0.1.1

func (tt *Tester) SetTableValue(table goka.Table, key string, value interface{})

SetTableValue sets a value in a processor's or view's table direcly via storage This method blocks until all expected clients are running, so make sure to call it *after* you have started all processors/views, otherwise it'll deadlock.

func (*Tester) StorageBuilder

func (tt *Tester) StorageBuilder() storage.Builder

StorageBuilder builds inmemory storages

func (*Tester) TableValue added in v0.1.1

func (tt *Tester) TableValue(table goka.Table, key string) interface{}

TableValue attempts to get a value from any table that is used in the tester

func (*Tester) TopicManagerBuilder

func (tt *Tester) TopicManagerBuilder() goka.TopicManagerBuilder

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL