tester

package
v0.0.0-...-034e452
Published: Sep 9, 2024 License: BSD-3-Clause Imports: 12 Imported by: 0

Documentation

Overview

This package provides a Kafka mock that allows integration testing of goka processors.

Usage

Simply append a tester option when creating the processor for testing. Usually it makes sense to move the processor creation to a function that accepts extra options. That way the test can use exactly the same processor setup.

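// createProcessor creates the processor and defines its group graph.
// It uses the standard "log" package in addition to goka.
func createProcessor(brokers []string, options ...goka.ProcessorOption) *goka.Processor {
	// The processor options are arguments of NewProcessor, not of DefineGroup.
	proc, err := goka.NewProcessor(brokers,
		goka.DefineGroup("group",
			// some group definitions
		),
		options...,
	)
	if err != nil {
		log.Fatalf("error creating processor: %v", err)
	}
	return proc
}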

In the main function we would run the processor like this:

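func main() {
	proc := createProcessor([]string{"broker1:9092"})
	// Run blocks until the context is cancelled or the processor fails.
	if err := proc.Run(context.Background()); err != nil {
		log.Fatalf("error running processor: %v", err)
	}
}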

And in the unit test something like:

func TestProcessor(t *testing.T) {
	// create the tester; the variable is named gkt so it does not shadow the tester package
	gkt := tester.New(t)
	// create the processor
	proc := createProcessor(nil, goka.WithTester(gkt))

	// .. do extra initialization if necessary

	go proc.Run(context.Background())

	// execute the actual test
	gkt.Consume("input-topic", "key", "value")

	// expected is a placeholder for the value the test expects in the table
	value := gkt.TableValue("group-table", "key")
	if value != expected {
		t.Fatalf("got unexpected table value")
	}
}

See https://github.com/moment-technology/goka/tree/master/examples/testing for a full example

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type EmitOption

type EmitOption func(*emitOption)

EmitOption defines a configuration option for emitting messages

func WithHeaders

func WithHeaders(headers goka.Headers) EmitOption

WithHeaders sets the Kafka headers to use when emitting to Kafka
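EmitOption follows the functional-options pattern: each option is a function that mutates a private settings struct. The sketch below illustrates that pattern only; it is not the package's implementation. The fields of emitOption are unexported and not shown in this documentation, so the headers field and the applyOptions helper are hypothetical, and Headers is a local stand-in that assumes goka.Headers maps header names to raw byte values.

```go
package main

import "fmt"

// Headers is a local stand-in for goka.Headers (assumption: a map from
// header name to raw value).
type Headers map[string][]byte

// emitOption is hypothetical: the real struct's fields are unexported
// and not shown in this documentation.
type emitOption struct {
	headers Headers
}

// EmitOption has the documented shape: a function that mutates *emitOption.
type EmitOption func(*emitOption)

// WithHeaders returns an option that stores the headers for one emit call.
func WithHeaders(headers Headers) EmitOption {
	return func(opts *emitOption) {
		opts.headers = headers
	}
}

// applyOptions (hypothetical helper) folds a variadic option list, like the
// one accepted by Consume, into a single settings value.
func applyOptions(options ...EmitOption) emitOption {
	var opts emitOption
	for _, o := range options {
		o(&opts)
	}
	return opts
}

func main() {
	opts := applyOptions(WithHeaders(Headers{"trace-id": []byte("abc123")}))
	fmt.Printf("%s\n", opts.headers["trace-id"])
}
```

Going by the signatures listed on this page, a test would pass the real option as the trailing argument of Consume, for example tester.WithHeaders(goka.Headers{...}).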

type MockTopicManager

type MockTopicManager struct {
	// contains filtered or unexported fields
}

MockTopicManager mimics the behavior of the real topic manager

func NewMockTopicManager

func NewMockTopicManager(tt *Tester, defaultNumPartitions int, defaultReplFactor int) *MockTopicManager

NewMockTopicManager creates a new topic manager mock

func (*MockTopicManager) Close

func (tm *MockTopicManager) Close() error

Close has no action on the mock

func (*MockTopicManager) EnsureStreamExists

func (tm *MockTopicManager) EnsureStreamExists(topic string, npar int) error

EnsureStreamExists ensures a stream exists

func (*MockTopicManager) EnsureTableExists

func (tm *MockTopicManager) EnsureTableExists(topic string, npar int) error

EnsureTableExists ensures a table exists

func (*MockTopicManager) EnsureTopicExists

func (tm *MockTopicManager) EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error

EnsureTopicExists ensures a topic exists

func (*MockTopicManager) GetOffset

func (tm *MockTopicManager) GetOffset(topicName string, partitionID int32, time int64) (int64, error)

GetOffset returns the offset closest to the passed time (or exactly time, if the offsets are empty)

func (*MockTopicManager) Partitions

func (tm *MockTopicManager) Partitions(topic string) ([]int32, error)

Partitions returns all partitions for a topic

type QueueTracker

type QueueTracker struct {
	// contains filtered or unexported fields
}

QueueTracker tracks message offsets for each topic, which makes checks like 'expect message x to be in topic y' convenient in unit tests

func (*QueueTracker) Hwm

func (mt *QueueTracker) Hwm() int64

Hwm returns the tracked queue's high-water mark (hwm)

func (*QueueTracker) Next

func (mt *QueueTracker) Next() (string, interface{}, bool)

Next returns the next message since the last time this function (or MoveToEnd) was called. It uses the known codec for the topic to decode the message.

func (*QueueTracker) NextOffset

func (mt *QueueTracker) NextOffset() int64

NextOffset returns the tracker's next offset

func (*QueueTracker) NextRaw

func (mt *QueueTracker) NextRaw() (string, []byte, bool)

NextRaw returns the next message similar to Next(), but without the decoding

func (*QueueTracker) NextRawWithHeaders

func (mt *QueueTracker) NextRawWithHeaders() (goka.Headers, string, []byte, bool)

NextRawWithHeaders returns the next message including its headers, similar to NextWithHeaders(), but without the decoding

func (*QueueTracker) NextWithHeaders

func (mt *QueueTracker) NextWithHeaders() (goka.Headers, string, interface{}, bool)

NextWithHeaders returns the next message, including its headers, since the last time this function (or MoveToEnd) was called. It uses the known codec for the topic to decode the message.

func (*QueueTracker) Seek

func (mt *QueueTracker) Seek(offset int64)

Seek moves the index pointer of the queue tracker to the passed offset
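The QueueTracker methods describe a read cursor over a topic's messages. The toy model below is a sketch of that contract, not the package's code: toyTracker and message are hypothetical types, and it assumes that the high-water mark equals the number of messages written so far and that offsets start at 0.

```go
package main

import "fmt"

// message is a hypothetical raw record in a topic.
type message struct {
	key   string
	value []byte
}

// toyTracker is a hypothetical stand-in for QueueTracker: a cursor over an
// append-only slice that plays the role of the topic.
type toyTracker struct {
	queue *[]message // shared topic log
	next  int64      // offset of the next message to return
}

// Hwm returns the offset one past the last message (assumption).
func (t *toyTracker) Hwm() int64 { return int64(len(*t.queue)) }

// NextOffset returns the offset the next read will use.
func (t *toyTracker) NextOffset() int64 { return t.next }

// Seek moves the cursor to the passed offset.
func (t *toyTracker) Seek(offset int64) { t.next = offset }

// NextRaw returns the next undecoded message, or false if none is left.
func (t *toyTracker) NextRaw() (string, []byte, bool) {
	if t.next < 0 || t.next >= t.Hwm() {
		return "", nil, false
	}
	m := (*t.queue)[t.next]
	t.next++
	return m.key, m.value, true
}

func main() {
	topic := []message{{"a", []byte("1")}, {"b", []byte("2")}}
	tr := &toyTracker{queue: &topic}

	// Drain everything that is currently in the topic.
	for {
		key, value, ok := tr.NextRaw()
		if !ok {
			break
		}
		fmt.Printf("%s=%s\n", key, value)
	}

	// Rewind and read the first message again.
	tr.Seek(0)
	key, _, _ := tr.NextRaw()
	fmt.Println(key, tr.NextOffset(), tr.Hwm())
}
```

In a real test the tracker would come from NewQueueTracker on the Tester, and Next would additionally decode the value with the topic's codec.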

type T

type T interface {
	Errorf(format string, args ...interface{})
	Fatalf(format string, args ...interface{})
	Fatal(a ...interface{})
}

T abstracts the interface we assume from the test case. It will most likely be *testing.T.
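Because New accepts any value that satisfies T, a test double can stand in for *testing.T, for example to assert that a misbehaving processor is reported as a failure. The sketch below copies the interface from this page into a standalone program; recordingT is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"testing"
)

// T is copied from the interface listed above.
type T interface {
	Errorf(format string, args ...interface{})
	Fatalf(format string, args ...interface{})
	Fatal(a ...interface{})
}

// Compile-time check that *testing.T satisfies T.
var _ T = (*testing.T)(nil)

// recordingT is a hypothetical test double that records failure messages
// instead of aborting the test.
type recordingT struct {
	failures []string
}

func (r *recordingT) Errorf(format string, args ...interface{}) {
	r.failures = append(r.failures, fmt.Sprintf(format, args...))
}

func (r *recordingT) Fatalf(format string, args ...interface{}) {
	r.failures = append(r.failures, fmt.Sprintf(format, args...))
}

func (r *recordingT) Fatal(a ...interface{}) {
	r.failures = append(r.failures, fmt.Sprint(a...))
}

func main() {
	rec := &recordingT{}
	var t T = rec
	t.Errorf("unexpected value %q", "x")
	t.Fatal("stopping")
	fmt.Println(len(rec.failures), "failures recorded")
}
```

Unlike *testing.T, this double does not stop execution on Fatal or Fatalf, so code that relies on the abort will keep running after a recorded failure.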

type Tester

type Tester struct {
	// contains filtered or unexported fields
}

Tester mimics Kafka for complex high-level testing of single or multiple processors/views/emitters

func New

func New(t T) *Tester

New creates a new tester instance

func (*Tester) Catchup

func (tt *Tester) Catchup()

Catchup waits until all pending messages are consumed by all processors/views. Calling this is rarely necessary, because normal calls to `Consume` already wait for catchup. One specific use case where it is necessary is the Visitor tool of processors.

func (*Tester) ClearValues

func (tt *Tester) ClearValues()

ClearValues clears all table values in all storages

func (*Tester) Consume

func (tt *Tester) Consume(topic string, key string, msg interface{}, options ...EmitOption)

Consume pushes a message for topic/key to be consumed by all processors/views that use the topic and are registered with the Tester

func (*Tester) ConsumerBuilder

func (tt *Tester) ConsumerBuilder() goka.SaramaConsumerBuilder

ConsumerBuilder creates a consumer builder that builds consumers for the passed client ID

func (*Tester) ConsumerGroupBuilder

func (tt *Tester) ConsumerGroupBuilder() goka.ConsumerGroupBuilder

ConsumerGroupBuilder returns a builder. The builder returns the consumer group for the passed client ID if that group is expected, that is, if its processor was registered with the Tester

func (*Tester) EmitterProducerBuilder

func (tt *Tester) EmitterProducerBuilder() goka.ProducerBuilder

EmitterProducerBuilder creates a producer builder used for Emitters. Emitters need to flush when emitting messages.

func (*Tester) GetTableKeys

func (tt *Tester) GetTableKeys(table goka.Table) []string

GetTableKeys returns a Table's keys.

func (*Tester) NewQueueTracker

func (tt *Tester) NewQueueTracker(topic string) *QueueTracker

NewQueueTracker creates a new queue tracker

func (*Tester) ProducerBuilder

func (tt *Tester) ProducerBuilder() goka.ProducerBuilder

func (*Tester) RegisterEmitter

func (tt *Tester) RegisterEmitter(topic goka.Stream, codec goka.Codec)

RegisterEmitter registers an emitter to be working with the tester.

func (*Tester) RegisterGroupGraph

func (tt *Tester) RegisterGroupGraph(gg *goka.GroupGraph) string

RegisterGroupGraph is called by a processor when the tester is passed via `WithTester(..)`. This will set up the tester with the necessary consumer structure

func (*Tester) RegisterView

func (tt *Tester) RegisterView(table goka.Table, c goka.Codec) string

RegisterView registers a new view to the tester

func (*Tester) SetTableValue

func (tt *Tester) SetTableValue(table goka.Table, key string, value interface{})

SetTableValue sets a value in a processor's or view's table directly via storage. This method blocks until all expected clients are running, so make sure to call it *after* you have started all processors/views; otherwise it will deadlock.

func (*Tester) StorageBuilder

func (tt *Tester) StorageBuilder() storage.Builder

StorageBuilder builds in-memory storages

func (*Tester) TableValue

func (tt *Tester) TableValue(table goka.Table, key string) interface{}

TableValue attempts to get a value from any table that is used in the tester

func (*Tester) TopicManagerBuilder

func (tt *Tester) TopicManagerBuilder() goka.TopicManagerBuilder
