Versions in this module Expand all Collapse all v0 v0.2.0 Dec 1, 2022 v0.1.0 Dec 1, 2022 Changes in this version + type EmitOption func(*emitOption) + func WithHeaders(headers goka.Headers) EmitOption + type MockTopicManager struct + func NewMockTopicManager(tt *Tester, defaultNumPartitions int, defaultReplFactor int) *MockTopicManager + func (tm *MockTopicManager) Close() error + func (tm *MockTopicManager) EnsureStreamExists(topic string, npar int) error + func (tm *MockTopicManager) EnsureTableExists(topic string, npar int) error + func (tm *MockTopicManager) EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error + func (tm *MockTopicManager) GetOffset(topicName string, partitionID int32, time int64) (int64, error) + func (tm *MockTopicManager) Partitions(topic string) ([]int32, error) + type QueueTracker struct + func (mt *QueueTracker) Hwm() int64 + func (mt *QueueTracker) Next() (string, interface{}, bool) + func (mt *QueueTracker) NextOffset() int64 + func (mt *QueueTracker) NextRaw() (string, []byte, bool) + func (mt *QueueTracker) NextRawWithHeaders() (goka.Headers, string, []byte, bool) + func (mt *QueueTracker) NextWithHeaders() (goka.Headers, string, interface{}, bool) + func (mt *QueueTracker) Seek(offset int64) + type T interface + Errorf func(format string, args ...interface{}) + Fatal func(a ...interface{}) + Fatalf func(format string, args ...interface{}) + type Tester struct + func New(t T) *Tester + func (tt *Tester) Catchup() + func (tt *Tester) ClearValues() + func (tt *Tester) Consume(topic string, key string, msg interface{}, options ...EmitOption) + func (tt *Tester) ConsumerBuilder() goka.SaramaConsumerBuilder + func (tt *Tester) ConsumerGroupBuilder() goka.ConsumerGroupBuilder + func (tt *Tester) EmitterProducerBuilder() goka.ProducerBuilder + func (tt *Tester) GetTableKeys(table goka.Table) []string + func (tt *Tester) NewQueueTracker(topic string) *QueueTracker + func (tt *Tester) ProducerBuilder() goka.ProducerBuilder + func (tt *Tester) RegisterEmitter(topic goka.Stream, codec goka.Codec) + func (tt *Tester) RegisterGroupGraph(gg *goka.GroupGraph) string + func (tt *Tester) RegisterView(table goka.Table, c goka.Codec) string + func (tt *Tester) SetTableValue(table goka.Table, key string, value interface{}) + func (tt *Tester) StorageBuilder() storage.Builder + func (tt *Tester) TableValue(table goka.Table, key string) interface{} + func (tt *Tester) TopicManagerBuilder() goka.TopicManagerBuilder