goka

package module
v1.0.2
Published: Sep 4, 2020 License: BSD-3-Clause Imports: 21 Imported by: 216

README

Goka

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go. Goka aims to reduce the complexity of building highly scalable and highly available microservices.

Goka extends the concept of Kafka consumer groups by binding a state table to them and persisting them in Kafka. Goka provides sane defaults and a pluggable architecture.

Features

  • Message Input and Output

    Goka handles all the message input and output for you. You only have to provide one or more callback functions that handle messages from any of the Kafka topics you are interested in. You only ever have to deal with deserialized messages.

  • Scaling

    Goka automatically distributes the processing and state across multiple instances of a service. This enables effortless scaling when the load increases.

  • Fault Tolerance

    In case of a failure, Goka will redistribute the failed instance's workload and state across the remaining healthy instances. All state is safely stored in Kafka and messages delivered with at-least-once semantics.

  • Built-in Monitoring and Introspection

    Goka provides a web interface for monitoring performance and querying values in the state.

  • Modularity

    Goka fosters a pluggable architecture which enables you to replace for example the storage layer or the Kafka communication layer.

Documentation

This README provides a brief, high-level overview of the ideas behind Goka. A more detailed introduction to the project can be found in this blog post.

Package API documentation is available at GoDoc and the Wiki provides several tips for configuring, extending, and deploying Goka applications.

Installation

You can install Goka by running the following command:

$ go get -u github.com/lovoo/goka

Configuration

Goka relies on Sarama to perform the actual communication with Kafka, which offers many configuration settings. The config is documented here.

In most cases, you need to modify the config, e.g. to set the Kafka Version.

cfg := goka.DefaultConfig()
cfg.Version = sarama.V2_4_0_0
goka.ReplaceGlobalConfig(cfg)

This makes all goka components use the updated config.

If you do need specific configuration for different components, you need to pass customized builders to the component's constructor, e.g.

cfg := goka.DefaultConfig()
// modify the config with component-specific settings

// use the config by creating a builder that overrides the global config
goka.NewProcessor(// ...,
	goka.WithConsumerGroupBuilder(
		goka.ConsumerGroupBuilderWithConfig(cfg),
	),
	// ...
)

Concepts

Goka relies on Kafka for message passing, fault-tolerant state storage and workload partitioning.

  • Emitters deliver key-value messages into Kafka. As an example, an emitter could be a database handler emitting the state changes into Kafka for other interested applications to consume.

  • Processor is a set of callback functions that consume and perform state transformations upon delivery of these emitted messages. Processor groups are formed of one or more instances of a processor. Goka distributes the partitions of the input topics across all processor instances in a processor group. This enables effortless scaling and fault-tolerance. If a processor instance fails, its partitions and state are reassigned to the remaining healthy members of the processor group. Processors can also emit further messages into Kafka.

  • Group table is the state of a processor group. It is a partitioned key-value table stored in Kafka that belongs to a single processor group. If a processor instance fails, the remaining instances will take over the group table partitions of the failed instance recovering them from Kafka.

  • Views are local caches of a complete group table. Views provide read-only access to the group tables and can be used to provide external services for example through a gRPC interface.

  • Local storage keeps a local copy of the group table partitions to speed up recovery and reduce memory utilization. By default, the local storage uses LevelDB, but in-memory map and Redis-based storage are also available.

Get Started

An example Goka application could look like the following. An emitter emits a message with key "some-key" and value "some-value" into the "example-stream" topic once every second. A processor processes the "example-stream" topic, counting the number of messages delivered for "some-key". The counter is persisted in the "example-group-table" topic. To start dockerized Zookeeper and Kafka instances locally, execute make start with the Makefile in the examples folder.

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

var (
	brokers             = []string{"localhost:9092"}
	topic   goka.Stream = "example-stream"
	group   goka.Group  = "example-group"
)

// Emit messages forever every second
func runEmitter() {
	emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
	if err != nil {
		log.Fatalf("error creating emitter: %v", err)
	}
	defer emitter.Finish()
	for {
		time.Sleep(1 * time.Second)
		err = emitter.EmitSync("some-key", "some-value")
		if err != nil {
			log.Fatalf("error emitting message: %v", err)
		}
	}
}

// process messages until ctrl-c is pressed
func runProcessor() {
	// process callback is invoked for each message delivered from
	// "example-stream" topic.
	cb := func(ctx goka.Context, msg interface{}) {
		var counter int64
		// ctx.Value() gets from the group table the value that is stored for
		// the message's key.
		if val := ctx.Value(); val != nil {
			counter = val.(int64)
		}
		counter++
		// SetValue stores the incremented counter in the group table for
		// the message's key.
		ctx.SetValue(counter)
		log.Printf("key = %s, counter = %v, msg = %v", ctx.Key(), counter, msg)
	}

	// Define a new processor group. The group defines all inputs, outputs, and
	// serialization formats. The group-table topic is "example-group-table".
	g := goka.DefineGroup(group,
		goka.Input(topic, new(codec.String), cb),
		goka.Persist(new(codec.Int64)),
	)

	p, err := goka.NewProcessor(brokers, g)
	if err != nil {
		log.Fatalf("error creating processor: %v", err)
	}
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan bool)
	go func() {
		defer close(done)
		if err = p.Run(ctx); err != nil {
			log.Fatalf("error running processor: %v", err)
		} else {
			log.Printf("Processor shutdown cleanly")
		}
	}()

	wait := make(chan os.Signal, 1)
	signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
	<-wait   // wait for SIGINT/SIGTERM
	cancel() // gracefully stop processor
	<-done
}

func main() {
	go runEmitter() // emits one message per second until the program exits
	runProcessor()  // press ctrl-c to stop
}

A very similar example is also in 1-simplest. Just run go run examples/1-simplest/main.go.

Note that tables have to be configured in Kafka with log compaction. For details check the Wiki.

How to contribute

Contributions are always welcome. Please fork the repo, create a pull request against master, and be sure tests pass. See the GitHub Flow for details.

Documentation

Overview

Package goka is a stateful stream processing library for Apache Kafka (version 0.9+) that eases the development of microservices. Goka extends the concept of a consumer group with a group table, which represents the state of the group. A microservice modifies and serves the content of a table employing two complementary object types: processors and views.

Processors

A processor is a set of callback functions that modify the group table when messages arrive and may also emit messages into other topics. Messages as well as rows in the group table are key-value pairs. Callbacks receive the arriving message and the row addressed by the message's key.
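The processing model described above can be pictured with a small in-memory simulation. This is only a sketch that does not use goka or Kafka: the `message`, `groupTable`, `countCallback`, and `process` names are hypothetical, and a plain map stands in for the group table.

```go
package main

import "fmt"

// message is a key-value pair, as delivered from an input topic.
type message struct {
	key   string
	value string
}

// groupTable stands in for the group table: one row per key.
type groupTable map[string]int64

// countCallback receives the arriving message together with the row
// addressed by the message's key, and returns the updated row.
func countCallback(row int64, msg message) int64 {
	return row + 1
}

// process applies the callback to every message in arrival order and
// writes the result back to the row of the message's key.
func process(table groupTable, msgs []message) {
	for _, m := range msgs {
		table[m.key] = countCallback(table[m.key], m)
	}
}

func main() {
	table := groupTable{}
	process(table, []message{{"a", "x"}, {"b", "y"}, {"a", "z"}})
	fmt.Println(table["a"], table["b"]) // prints "2 1"
}
```

In a real processor, the row lookup and the write-back correspond to ctx.Value() and ctx.SetValue() inside the callback.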

In Kafka, keys are used to partition topics. A goka processor consumes from a set of co-partitioned topics (topics with the same number of partitions and the same key range). A group topic keeps track of the group table updates, allowing for recovery and rebalancing of processors: When multiple processor instances start in the same consumer group, the instances split the co-partitioned input topics and load the respective group table partitions from the group topic. A local disk storage minimizes recovery time by caching partitions of the group table.

Views

A view is a materialized (i.e., persistent) cache of a group table. A view subscribes to the updates of all partitions of a group table and keeps local disk storage in sync with the group topic. With a view, one can easily serve up-to-date content of the group table via, for example, gRPC.

Package goka is a generated GoMock package.



Constants

This section is empty.

Variables

var CopartitioningStrategy = new(copartitioningStrategy)

CopartitioningStrategy is the rebalance strategy necessary to guarantee the copartitioning when consuming multiple input topics with multiple processor instances

var (
	// ErrEmitterAlreadyClosed is returned when Emit is called after the emitter has been finished.
	ErrEmitterAlreadyClosed error = errors.New("emitter already closed")
)

Functions

func DefaultConfig added in v1.0.0

func DefaultConfig() *sarama.Config

DefaultConfig creates a new config used by goka by default. Modify it and pass it to `goka.ReplaceGlobalConfig(...)` to change goka's global config.

func DefaultConsumerGroupBuilder added in v1.0.0

func DefaultConsumerGroupBuilder(brokers []string, group, clientID string) (sarama.ConsumerGroup, error)

DefaultConsumerGroupBuilder creates a Kafka consumer group using the Sarama library.

func DefaultHasher

func DefaultHasher() func() hash.Hash32

DefaultHasher returns an FNV hasher builder to assign keys to partitions.
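To illustrate how a hasher builder of this shape can assign keys to partitions, here is a minimal sketch using only the standard library. It assumes the FNV-1a 32-bit variant and a simple modulo mapping; `newHasher` and `partitionFor` are hypothetical names, and the exact mapping used by goka and Sarama may differ.

```go
package main

import (
	"fmt"
	"hash"
	"hash/fnv"
)

// newHasher has the same shape as the value returned by DefaultHasher:
// a builder that produces a fresh 32-bit hasher. FNV-1a is an assumption.
func newHasher() hash.Hash32 { return fnv.New32a() }

// partitionFor maps a key to one of numPartitions partitions
// (hypothetical helper, not part of goka).
func partitionFor(hasher func() hash.Hash32, key string, numPartitions int32) int32 {
	h := hasher()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p // keep the result in [0, numPartitions)
	}
	return p
}

func main() {
	for _, key := range []string{"some-key", "other-key"} {
		fmt.Printf("%s -> partition %d\n", key, partitionFor(newHasher, key, 8))
	}
}
```

Because the mapping depends only on the key and the partition count, the same key lands on the same partition number in every topic with that partition count, which is what co-partitioning relies on.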

func DefaultProcessorStoragePath

func DefaultProcessorStoragePath(group Group) string

DefaultProcessorStoragePath is the default path where processor state will be stored.

func DefaultRebalance added in v0.1.3

func DefaultRebalance(a Assignment)

DefaultRebalance is the default callback when a new partition assignment is received. DefaultRebalance can be used in the function passed to WithRebalanceCallback.

func DefaultSaramaConsumerBuilder added in v1.0.0

func DefaultSaramaConsumerBuilder(brokers []string, clientID string) (sarama.Consumer, error)

DefaultSaramaConsumerBuilder creates a Kafka consumer using the Sarama library.

func DefaultUpdate

func DefaultUpdate(s storage.Storage, partition int32, key string, value []byte) error

DefaultUpdate is the default callback used to update the local storage with messages from the table topic in Kafka. It is called for every message received during recovery of processors and during the normal operation of views. DefaultUpdate can be used in the function passed to WithUpdateCallback and WithViewCallback.

func DefaultViewStoragePath

func DefaultViewStoragePath() string

DefaultViewStoragePath returns the default path where view state will be stored.

func NewMockController added in v1.0.0

func NewMockController(t gomock.TestReporter) *gomock.Controller

NewMockController returns a *gomock.Controller using a wrapped testing.T (or whatever) which panics on a Fatalf. This is necessary when using a mock in kafkamock. Otherwise it will freeze on an unexpected call.

func ReplaceGlobalConfig added in v1.0.0

func ReplaceGlobalConfig(config *sarama.Config)

ReplaceGlobalConfig registers a standard config used during building if no other config is specified

Types

type Assignment added in v1.0.0

type Assignment map[int32]int64

Assignment represents a partition:offset assignment for the current connection

type Backoff added in v1.0.0

type Backoff interface {
	Duration() time.Duration
	Reset()
}

Backoff is used for adding backoff capabilities to the restarting of failing partition tables.

func DefaultBackoffBuilder added in v1.0.0

func DefaultBackoffBuilder() (Backoff, error)

DefaultBackoffBuilder returns a simpleBackoff with 10 second steps

func NewSimpleBackoff added in v1.0.0

func NewSimpleBackoff(step time.Duration) Backoff

NewSimpleBackoff returns a simple backoff waiting the specified duration longer each iteration until reset.

type BackoffBuilder added in v1.0.0

type BackoffBuilder func() (Backoff, error)

BackoffBuilder creates a backoff

type Broker added in v1.0.0

type Broker interface {
	Addr() string
	Connected() (bool, error)
	CreateTopics(request *sarama.CreateTopicsRequest) (*sarama.CreateTopicsResponse, error)
	Open(conf *sarama.Config) error
}

Broker is an interface for the sarama broker

type Codec

type Codec interface {
	Encode(value interface{}) (data []byte, err error)
	Decode(data []byte) (value interface{}, err error)
}

Codec decodes and encodes from and to []byte
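As an illustration, a JSON codec for a custom value type could look like the sketch below. The `user` type and `userCodec` are hypothetical; the codec simply has the Encode/Decode method set of the Codec interface and uses encoding/json from the standard library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// user is a hypothetical value type stored in a topic or table.
type user struct {
	Name   string `json:"name"`
	Clicks int    `json:"clicks"`
}

// userCodec has the Encode/Decode method set of the Codec interface above.
type userCodec struct{}

// Encode serializes a *user to JSON and rejects any other type.
func (userCodec) Encode(value interface{}) ([]byte, error) {
	u, ok := value.(*user)
	if !ok {
		return nil, fmt.Errorf("userCodec: expected *user, got %T", value)
	}
	return json.Marshal(u)
}

// Decode parses JSON into a *user.
func (userCodec) Decode(data []byte) (interface{}, error) {
	var u user
	if err := json.Unmarshal(data, &u); err != nil {
		return nil, fmt.Errorf("userCodec: %w", err)
	}
	return &u, nil
}

func main() {
	c := userCodec{}
	data, err := c.Encode(&user{Name: "alice", Clicks: 3})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data)) // prints {"name":"alice","clicks":3}

	v, err := c.Decode(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(v.(*user).Clicks) // prints 3
}
```

Returning an error for an unexpected type, instead of panicking inside the codec, leaves the decision about how to fail to the caller.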

type ConsumerGroupBuilder added in v1.0.0

type ConsumerGroupBuilder func(brokers []string, group, clientID string) (sarama.ConsumerGroup, error)

ConsumerGroupBuilder creates a `sarama.ConsumerGroup`

func ConsumerGroupBuilderWithConfig added in v1.0.0

func ConsumerGroupBuilderWithConfig(config *sarama.Config) ConsumerGroupBuilder

ConsumerGroupBuilderWithConfig creates a sarama consumergroup using passed config

type Context

type Context interface {
	// Topic returns the topic of input message.
	Topic() Stream

	// Key returns the key of the input message.
	Key() string

	// Partition returns the partition of the input message.
	Partition() int32

	// Offset returns the offset of the input message.
	Offset() int64

	// Value returns the value of the key in the group table.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	Value() interface{}

	// Headers returns the headers of the input message
	Headers() map[string][]byte

	// SetValue updates the value of the key in the group table.
	// It stores the value in the local cache and sends the
	// update to the Kafka topic representing the group table.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	SetValue(value interface{})

	// Delete deletes a value from the group table. IMPORTANT: this deletes the
	// value associated with the key from both the local cache and the persisted
	// table in Kafka.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	Delete()

	// Timestamp returns the timestamp of the input message. If the timestamp is
	// invalid, a zero time will be returned.
	Timestamp() time.Time

	// Join returns the value of key in the copartitioned table.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	Join(topic Table) interface{}

	// Lookup returns the value of key in the view of table.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	Lookup(topic Table, key string) interface{}

	// Emit asynchronously writes a message into a topic.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	Emit(topic Stream, key string, value interface{})

	// Loopback asynchronously sends a message to another key of the group
	// table. Value passed to loopback is encoded via the codec given in the
	// Loop subscription.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	Loopback(key string, value interface{})

	// Fail stops execution and shuts down the processor
	// The callback is stopped immediately by panicking. Do not recover from that panic or
	// the processor might deadlock.
	Fail(err error)

	// Context returns the underlying context used to start the processor or a
	// subcontext.
	Context() context.Context
}

Context provides access to the processor's table and emit capabilities to arbitrary topics in kafka. Upon arrival of a message from subscribed topics, the respective ConsumeCallback is invoked with a context object along with the input message. The context is only valid within the callback; do not store it or pass it to other goroutines.

Error handling

Most methods of the context can fail for different reasons, which are handled in different ways. Synchronous errors such as

  • wrong codec for topic (a message cannot be marshalled or unmarshalled),

  • Emit to a topic without the Output definition in the group graph,

  • Value/SetValue without defining Persist in the group graph,

  • Join/Lookup without the definition in the group graph, etc.

will result in a panic that stops the callback immediately and shuts down the processor. This is necessary to preserve integrity of the processor and avoid further actions. Do not recover from that panic, otherwise the goroutine will deadlock.

Retrying synchronous errors must be implemented by restarting the processor. If errors must be tolerated (which is not advisable because they're usually persistent), provide fail-tolerant versions of the producer, storage or codec as needed.

Asynchronous errors can occur after the callback has finished, e.g. when sending a batched message to kafka fails due to connection errors or leader election in the cluster. Those errors still shut down the processor but will not result in a panic in the callback.

type Edge

type Edge interface {
	String() string
	Topic() string
	Codec() Codec
}

Edge represents a topic in Kafka and the corresponding codec to encode and decode the messages of that topic.

func Input

func Input(topic Stream, c Codec, cb ProcessCallback) Edge

Input represents an edge of an input stream topic. The edge specifies the topic name, its codec and the ProcessCallback used to process it. The topic has to be copartitioned with any other input stream of the group and with the group table. The group starts reading the topic from the newest offset.

func Inputs

func Inputs(topics Streams, c Codec, cb ProcessCallback) Edge

Inputs creates edges of multiple input streams sharing the same codec and callback.

func Join

func Join(topic Table, c Codec) Edge

Join represents an edge of a copartitioned, log-compacted table topic. The edge specifies the topic name and the codec of the messages of the topic. The group starts reading the topic from the oldest offset. The processing of input streams is blocked until all partitions of the table are recovered.

func Lookup

func Lookup(topic Table, c Codec) Edge

Lookup represents an edge of a non-copartitioned, log-compacted table topic. The edge specifies the topic name and the codec of the messages of the topic. The group starts reading the topic from the oldest offset. The processing of input streams is blocked until the table is fully recovered.

func Loop

func Loop(c Codec, cb ProcessCallback) Edge

Loop represents the edge of the loopback topic of the group. The edge specifies the codec of the messages in the topic and the ProcessCallback to process the messages of the topic. Context.Loopback() is used to write messages into this topic from any callback of the group.

func Output

func Output(topic Stream, c Codec) Edge

Output represents an edge of an output stream topic. The edge specifies the topic name and the codec of the messages of the topic. Context.Emit() only emits messages into Output edges defined in the group graph. The topic does not have to be copartitioned with the input streams.

func Persist

func Persist(c Codec) Edge

Persist represents the edge of the group table, which is log-compacted and copartitioned with the input streams. Without Persist, calls to ctx.Value or ctx.SetValue in the consume callback will fail and lead to shutdown of the processor.

This edge specifies the codec of the messages in the topic, i.e., the codec of the values of the table. The processing of input streams is blocked until all partitions of the group table are recovered.

The topic name is derived from the group name by appending "-table".

type Edges

type Edges []Edge

Edges is a slice of edge objects.

func (Edges) Topics

func (e Edges) Topics() []string

Topics returns the names of the topics of the edges.

type Emitter

type Emitter struct {
	// contains filtered or unexported fields
}

Emitter emits messages into a specific Kafka topic, first encoding the message with the given codec.

func NewEmitter

func NewEmitter(brokers []string, topic Stream, codec Codec, options ...EmitterOption) (*Emitter, error)

NewEmitter creates a new emitter using passed brokers, topic, codec and possibly options.

func (*Emitter) Emit

func (e *Emitter) Emit(key string, msg interface{}) (*Promise, error)

Emit sends a message for passed key using the emitter's codec.

func (*Emitter) EmitSync

func (e *Emitter) EmitSync(key string, msg interface{}) error

EmitSync sends a message to passed topic and key.

func (*Emitter) EmitWithHeaders added in v1.0.1

func (e *Emitter) EmitWithHeaders(key string, msg interface{}, headers map[string][]byte) (*Promise, error)

EmitWithHeaders sends a message with the given headers for the passed key using the emitter's codec.

func (*Emitter) Finish

func (e *Emitter) Finish() error

Finish waits until the emitter is finished producing all pending messages.

type EmitterOption

type EmitterOption func(*eoptions, Stream, Codec)

EmitterOption defines a configuration option to be used when creating an emitter.

func WithEmitterClientID

func WithEmitterClientID(clientID string) EmitterOption

WithEmitterClientID defines the client ID used to identify with kafka.

func WithEmitterHasher

func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption

WithEmitterHasher sets the hash function that assigns keys to partitions.

func WithEmitterLogger

func WithEmitterLogger(log logger.Logger) EmitterOption

WithEmitterLogger sets the logger the emitter should use. By default, emitters use the standard library logger.

func WithEmitterProducerBuilder

func WithEmitterProducerBuilder(pb ProducerBuilder) EmitterOption

WithEmitterProducerBuilder replaces the default producer builder.

func WithEmitterTester added in v0.1.1

func WithEmitterTester(t Tester) EmitterOption

WithEmitterTester configures the emitter to use passed tester. This is used for component tests

func WithEmitterTopicManagerBuilder

func WithEmitterTopicManagerBuilder(tmb TopicManagerBuilder) EmitterOption

WithEmitterTopicManagerBuilder replaces the default topic manager builder.

type Getter

type Getter func(string) (interface{}, error)

Getter functions return a value for a key or an error. If no value exists for the key, nil is returned without errors.
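The contract for missing keys (nil value, nil error) is easy to get wrong, so here is a small sketch of a Getter backed by an in-memory map. `mapGetter` is a hypothetical helper that is not part of goka.

```go
package main

import "fmt"

// Getter repeats the type listed above.
type Getter func(string) (interface{}, error)

// mapGetter returns a Getter backed by an in-memory map
// (hypothetical helper for illustration).
func mapGetter(rows map[string]interface{}) Getter {
	return func(key string) (interface{}, error) {
		v, ok := rows[key]
		if !ok {
			return nil, nil // missing key: nil value, no error
		}
		return v, nil
	}
}

func main() {
	get := mapGetter(map[string]interface{}{"some-key": int64(3)})

	v, err := get("some-key")
	fmt.Println(v, err) // prints "3 <nil>"

	v, err = get("unknown")
	fmt.Println(v, err) // prints "<nil> <nil>"
}
```

Callers therefore need to check both the error and whether the returned value is nil before using a type assertion.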

type Group

type Group string

Group is the name of a consumer group in Kafka and represents a processor group in Goka. A processor group may have a group table and a group loopback stream. By default, the group table is named <group>-table and the loopback stream <group>-loop.
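The default naming convention can be written down in a few lines. `tableName` and `loopName` are hypothetical helpers for illustration only and are not claimed to be part of goka's API.

```go
package main

import "fmt"

// Group repeats the type listed above.
type Group string

// tableName applies the default group table naming convention
// (hypothetical helper).
func tableName(g Group) string { return string(g) + "-table" }

// loopName applies the default loopback stream naming convention
// (hypothetical helper).
func loopName(g Group) string { return string(g) + "-loop" }

func main() {
	g := Group("example-group")
	fmt.Println(tableName(g)) // prints "example-group-table"
	fmt.Println(loopName(g))  // prints "example-group-loop"
}
```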

type GroupGraph

type GroupGraph struct {
	// contains filtered or unexported fields
}

GroupGraph is the specification of a processor group. It contains all input, output, and any other topic from which and into which the processor group may consume or produce events. Each of these links to Kafka is called Edge.

func DefineGroup

func DefineGroup(group Group, edges ...Edge) *GroupGraph

DefineGroup creates a group graph with a given group name and a list of edges.

func (*GroupGraph) Group

func (gg *GroupGraph) Group() Group

Group returns the group name.

func (*GroupGraph) GroupTable

func (gg *GroupGraph) GroupTable() Edge

GroupTable returns the group table edge of the group.

func (*GroupGraph) InputStreams

func (gg *GroupGraph) InputStreams() Edges

InputStreams returns all input stream edges of the group.

func (*GroupGraph) JointTables

func (gg *GroupGraph) JointTables() Edges

JointTables returns all joint table edges of the group.

func (*GroupGraph) LookupTables

func (gg *GroupGraph) LookupTables() Edges

LookupTables returns all lookup table edges of the group.

func (*GroupGraph) LoopStream

func (gg *GroupGraph) LoopStream() Edge

LoopStream returns the loopback edge of the group.

func (*GroupGraph) OutputStreams

func (gg *GroupGraph) OutputStreams() Edges

OutputStreams returns the output stream edges of the group.

func (*GroupGraph) Validate

func (gg *GroupGraph) Validate() error

Validate validates the group graph and returns an error if it is invalid. The main validation checks are:

  • at most one loopback stream edge is allowed

  • at most one group table edge is allowed

  • at least one input stream is required

  • table and loopback topics cannot be used in any other edge.
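The four checks named above can be sketched on a simplified edge model. This is not goka's implementation: the `edge` and `edgeKind` types and the `validate` function are hypothetical, and the real method may perform further checks.

```go
package main

import (
	"errors"
	"fmt"
)

// edgeKind and edge are a hypothetical, simplified model of group graph edges.
type edgeKind int

const (
	kindInput edgeKind = iota
	kindOutput
	kindLoop
	kindTable
)

type edge struct {
	kind  edgeKind
	topic string
}

// validate applies the four checks named in the text above.
func validate(edges []edge) error {
	var inputs, loops, tables int
	reserved := map[string]bool{} // topics owned by table and loop edges
	for _, e := range edges {
		switch e.kind {
		case kindInput:
			inputs++
		case kindLoop:
			loops++
			reserved[e.topic] = true
		case kindTable:
			tables++
			reserved[e.topic] = true
		}
	}
	switch {
	case loops > 1:
		return errors.New("more than one loopback stream edge")
	case tables > 1:
		return errors.New("more than one group table edge")
	case inputs == 0:
		return errors.New("no input stream")
	}
	for _, e := range edges {
		if (e.kind == kindInput || e.kind == kindOutput) && reserved[e.topic] {
			return fmt.Errorf("topic %q is reserved for the group table or loopback", e.topic)
		}
	}
	return nil
}

func main() {
	ok := []edge{{kindInput, "example-stream"}, {kindTable, "example-group-table"}}
	fmt.Println(validate(ok)) // prints "<nil>"

	bad := []edge{{kindInput, "example-group-table"}, {kindTable, "example-group-table"}}
	fmt.Println(validate(bad) != nil) // prints "true"
}
```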

type InputStats

type InputStats struct {
	Count      uint
	Bytes      int
	OffsetLag  int64
	LastOffset int64
	Delay      time.Duration
}

InputStats represents the number of messages and the number of bytes consumed from a stream or table topic since the process started.

type Iterator

type Iterator interface {
	// Next advances the iterator to the next KV-pair. Err should be called
	// after Next returns false to check whether the iteration finished
	// from exhaustion or was aborted due to an error.
	Next() bool
	// Err returns the error that stopped the iteration if any.
	Err() error
	// Return the key of the current item
	Key() string
	// Return the value of the current item
	// This value is already decoded with the view's codec (or nil, if it's nil)
	Value() (interface{}, error)
	// Release the iterator. After release, the iterator is not usable anymore
	Release()
	// Seek moves the iterator to the beginning of a key-value pair sequence that
	// is greater than or equal to the given key. It returns whether at least one of
	// such key-value pairs exist. Next must be called after seeking to access
	// the first pair.
	Seek(key string) bool
}

Iterator allows one to iterate over the keys of a view.
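The calling pattern implied by the comments (Seek, then Next in a loop, then Err, and finally Release) can be tried out against a small in-memory implementation. `memIterator`, `newMemIterator`, and `keysFrom` are hypothetical names for this sketch, which does not reflect how goka's storage-backed iterators are implemented.

```go
package main

import (
	"fmt"
	"sort"
)

// Iterator repeats the interface listed above.
type Iterator interface {
	Next() bool
	Err() error
	Key() string
	Value() (interface{}, error)
	Release()
	Seek(key string) bool
}

// memIterator is a hypothetical in-memory Iterator over sorted keys.
type memIterator struct {
	keys []string
	rows map[string]interface{}
	pos  int // index of the current item; -1 before the first Next
}

func newMemIterator(rows map[string]interface{}) Iterator {
	keys := make([]string, 0, len(rows))
	for k := range rows {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return &memIterator{keys: keys, rows: rows, pos: -1}
}

func (it *memIterator) Next() bool {
	if it.pos+1 >= len(it.keys) {
		return false
	}
	it.pos++
	return true
}

func (it *memIterator) Err() error                  { return nil }
func (it *memIterator) Key() string                 { return it.keys[it.pos] }
func (it *memIterator) Value() (interface{}, error) { return it.rows[it.keys[it.pos]], nil }
func (it *memIterator) Release()                    { it.keys, it.rows = nil, nil }

// Seek positions the iterator just before the first key >= the given key,
// so that the following Next moves onto that pair.
func (it *memIterator) Seek(key string) bool {
	i := sort.SearchStrings(it.keys, key)
	it.pos = i - 1
	return i < len(it.keys)
}

// keysFrom collects all keys >= start and always releases the iterator.
func keysFrom(it Iterator, start string) ([]string, error) {
	defer it.Release()
	var out []string
	if !it.Seek(start) {
		return out, it.Err()
	}
	for it.Next() {
		out = append(out, it.Key())
	}
	return out, it.Err() // distinguish exhaustion from an aborted iteration
}

func main() {
	it := newMemIterator(map[string]interface{}{"a": 1, "b": 2, "c": 3})
	keys, err := keysFrom(it, "b")
	fmt.Println(keys, err) // prints "[b c] <nil>"
}
```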

type MockAutoConsumer added in v1.0.0

type MockAutoConsumer struct {
	// contains filtered or unexported fields
}

MockAutoConsumer implements sarama's Consumer interface for testing purposes. Before you can start consuming from this consumer, you have to register topic/partitions using ExpectConsumePartition, and set expectations on them.

func NewMockAutoConsumer added in v1.0.0

func NewMockAutoConsumer(t *testing.T, config *sarama.Config) *MockAutoConsumer

NewMockAutoConsumer returns a new mock Consumer instance. The t argument should be the *testing.T instance of your test method. An error will be written to it if an expectation is violated. The config argument can be set to nil.

func (*MockAutoConsumer) Close added in v1.0.0

func (c *MockAutoConsumer) Close() error

Close implements the Close method from the sarama.Consumer interface. It will close all registered PartitionConsumer instances.

func (*MockAutoConsumer) ConsumePartition added in v1.0.0

func (c *MockAutoConsumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)

ConsumePartition implements the ConsumePartition method from the sarama.Consumer interface. Before you can start consuming a partition, you have to set expectations on it using ExpectConsumePartition. You can only consume a partition once per consumer.

func (*MockAutoConsumer) ExpectConsumePartition added in v1.0.0

func (c *MockAutoConsumer) ExpectConsumePartition(topic string, partition int32, offset int64) *MockAutoPartitionConsumer

ExpectConsumePartition will register a topic/partition, so you can set expectations on it. The registered PartitionConsumer will be returned, so you can set expectations on it using method chaining. Once a topic/partition is registered, you are expected to start consuming it using ConsumePartition. If that doesn't happen, an error will be written to the error reporter once the mock consumer is closed. It will also expect that the

func (*MockAutoConsumer) HighWaterMarks added in v1.0.0

func (c *MockAutoConsumer) HighWaterMarks() map[string]map[int32]int64

HighWaterMarks returns a map of high watermarks for each topic/partition

func (*MockAutoConsumer) Partitions added in v1.0.0

func (c *MockAutoConsumer) Partitions(topic string) ([]int32, error)

Partitions returns the list of partitions for the given topic, as registered with SetTopicMetadata

func (*MockAutoConsumer) SetTopicMetadata added in v1.0.0

func (c *MockAutoConsumer) SetTopicMetadata(metadata map[string][]int32)

SetTopicMetadata sets the cluster's topic/partition metadata, which will be returned by Topics() and Partitions().

func (*MockAutoConsumer) Topics added in v1.0.0

func (c *MockAutoConsumer) Topics() ([]string, error)

Topics returns a list of topics, as registered with SetTopicMetadata

type MockAutoPartitionConsumer added in v1.0.0

type MockAutoPartitionConsumer struct {
	// contains filtered or unexported fields
}

MockAutoPartitionConsumer implements sarama's PartitionConsumer interface for testing purposes. It is returned by the mock Consumer's ConsumePartition method, but only if it is registered first using the Consumer's ExpectConsumePartition method. Before consuming the Errors and Messages channels, you should specify what values will be provided on these channels using YieldMessage and YieldError.

func (*MockAutoPartitionConsumer) AsyncClose added in v1.0.0

func (pc *MockAutoPartitionConsumer) AsyncClose()

AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.

func (*MockAutoPartitionConsumer) Close added in v1.0.0

func (pc *MockAutoPartitionConsumer) Close() error

Close implements the Close method from the sarama.PartitionConsumer interface. It will verify whether the partition consumer was actually started.

func (*MockAutoPartitionConsumer) Errors added in v1.0.0

func (pc *MockAutoPartitionConsumer) Errors() <-chan *sarama.ConsumerError

Errors implements the Errors method from the sarama.PartitionConsumer interface.

func (*MockAutoPartitionConsumer) ExpectErrorsDrainedOnClose added in v1.0.0

func (pc *MockAutoPartitionConsumer) ExpectErrorsDrainedOnClose()

ExpectErrorsDrainedOnClose sets an expectation on the partition consumer that the errors channel will be fully drained when Close is called. If this expectation is not met, an error is reported to the error reporter.

func (*MockAutoPartitionConsumer) ExpectMessagesDrainedOnClose added in v1.0.0

func (pc *MockAutoPartitionConsumer) ExpectMessagesDrainedOnClose()

ExpectMessagesDrainedOnClose sets an expectation on the partition consumer that the messages channel will be fully drained when Close is called. If this expectation is not met, an error is reported to the error reporter.

func (*MockAutoPartitionConsumer) HighWaterMarkOffset added in v1.0.0

func (pc *MockAutoPartitionConsumer) HighWaterMarkOffset() int64

HighWaterMarkOffset returns the highwatermark for the partition

func (*MockAutoPartitionConsumer) Messages added in v1.0.0

func (pc *MockAutoPartitionConsumer) Messages() <-chan *sarama.ConsumerMessage

Messages implements the Messages method from the sarama.PartitionConsumer interface.

func (*MockAutoPartitionConsumer) YieldError added in v1.0.0

func (pc *MockAutoPartitionConsumer) YieldError(err error)

YieldError will yield an error on the Errors channel of this partition consumer when it is consumed. By default, the mock consumer will not verify whether this error was consumed from the Errors channel, because there are legitimate reasons for this not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that the channel is empty on close.

func (*MockAutoPartitionConsumer) YieldMessage added in v1.0.0

func (pc *MockAutoPartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage)

YieldMessage will yield a message on the Messages channel of this partition consumer when it is consumed. By default, the mock consumer will not verify whether this message was consumed from the Messages channel, because there are legitimate reasons for this not to happen. You can call ExpectMessagesDrainedOnClose so it will verify that the channel is empty on close.
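The yield-then-verify behaviour described above can be illustrated with a small standard-library sketch. It is not the mock's real implementation: `yieldingConsumer`, its string payload and the buffered channel are assumptions made for illustration, and only the method names echo the ones documented here.

```go
package main

import "fmt"

// yieldingConsumer is a hypothetical stand-in for a mock partition consumer:
// messages are queued up front and handed out through a channel.
type yieldingConsumer struct {
	messages      chan string
	expectDrained bool
}

func newYieldingConsumer(capacity int) *yieldingConsumer {
	return &yieldingConsumer{messages: make(chan string, capacity)}
}

// YieldMessage queues a message that a later read from Messages will return.
func (c *yieldingConsumer) YieldMessage(msg string) { c.messages <- msg }

// Messages exposes the queued messages as a receive-only channel.
func (c *yieldingConsumer) Messages() <-chan string { return c.messages }

// ExpectMessagesDrainedOnClose turns on the drain check performed by Close.
func (c *yieldingConsumer) ExpectMessagesDrainedOnClose() { c.expectDrained = true }

// Close reports an error if the drain expectation is set and messages remain.
func (c *yieldingConsumer) Close() error {
	if c.expectDrained && len(c.messages) > 0 {
		return fmt.Errorf("%d message(s) left unconsumed", len(c.messages))
	}
	return nil
}

func main() {
	c := newYieldingConsumer(4)
	c.ExpectMessagesDrainedOnClose()
	c.YieldMessage("a")
	c.YieldMessage("b")
	fmt.Println("consumed:", <-c.Messages()) // only one of the two is read
	fmt.Println("close:", c.Close())
}
```

Without the call to ExpectMessagesDrainedOnClose, Close in this sketch would succeed even with messages left in the channel, which mirrors the default described above.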

type MockBroker added in v1.0.0

type MockBroker struct {
	// contains filtered or unexported fields
}

MockBroker is a mock of Broker interface

func NewMockBroker added in v1.0.0

func NewMockBroker(ctrl *gomock.Controller) *MockBroker

NewMockBroker creates a new mock instance

func (*MockBroker) Addr added in v1.0.0

func (m *MockBroker) Addr() string

Addr mocks base method

func (*MockBroker) Connected added in v1.0.0

func (m *MockBroker) Connected() (bool, error)

Connected mocks base method

func (*MockBroker) CreateTopics added in v1.0.0

CreateTopics mocks base method

func (*MockBroker) EXPECT added in v1.0.0

func (m *MockBroker) EXPECT() *MockBrokerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockBroker) Open added in v1.0.0

func (m *MockBroker) Open(arg0 *sarama.Config) error

Open mocks base method

type MockBrokerMockRecorder added in v1.0.0

type MockBrokerMockRecorder struct {
	// contains filtered or unexported fields
}

MockBrokerMockRecorder is the mock recorder for MockBroker

func (*MockBrokerMockRecorder) Addr added in v1.0.0

func (mr *MockBrokerMockRecorder) Addr() *gomock.Call

Addr indicates an expected call of Addr

func (*MockBrokerMockRecorder) Connected added in v1.0.0

func (mr *MockBrokerMockRecorder) Connected() *gomock.Call

Connected indicates an expected call of Connected

func (*MockBrokerMockRecorder) CreateTopics added in v1.0.0

func (mr *MockBrokerMockRecorder) CreateTopics(arg0 interface{}) *gomock.Call

CreateTopics indicates an expected call of CreateTopics

func (*MockBrokerMockRecorder) Open added in v1.0.0

func (mr *MockBrokerMockRecorder) Open(arg0 interface{}) *gomock.Call

Open indicates an expected call of Open

type MockClient added in v1.0.0

type MockClient struct {
	// contains filtered or unexported fields
}

MockClient is a mock of Client interface

func NewMockClient added in v1.0.0

func NewMockClient(ctrl *gomock.Controller) *MockClient

NewMockClient creates a new mock instance

func (*MockClient) Brokers added in v1.0.0

func (m *MockClient) Brokers() []*sarama.Broker

Brokers mocks base method

func (*MockClient) Close added in v1.0.0

func (m *MockClient) Close() error

Close mocks base method

func (*MockClient) Closed added in v1.0.0

func (m *MockClient) Closed() bool

Closed mocks base method

func (*MockClient) Config added in v1.0.0

func (m *MockClient) Config() *sarama.Config

Config mocks base method

func (*MockClient) Controller added in v1.0.0

func (m *MockClient) Controller() (*sarama.Broker, error)

Controller mocks base method

func (*MockClient) Coordinator added in v1.0.0

func (m *MockClient) Coordinator(arg0 string) (*sarama.Broker, error)

Coordinator mocks base method

func (*MockClient) EXPECT added in v1.0.0

func (m *MockClient) EXPECT() *MockClientMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockClient) GetOffset added in v1.0.0

func (m *MockClient) GetOffset(arg0 string, arg1 int32, arg2 int64) (int64, error)

GetOffset mocks base method

func (*MockClient) InSyncReplicas added in v1.0.0

func (m *MockClient) InSyncReplicas(arg0 string, arg1 int32) ([]int32, error)

InSyncReplicas mocks base method

func (*MockClient) InitProducerID added in v1.0.0

func (m *MockClient) InitProducerID() (*sarama.InitProducerIDResponse, error)

InitProducerID mocks base method

func (*MockClient) Leader added in v1.0.0

func (m *MockClient) Leader(arg0 string, arg1 int32) (*sarama.Broker, error)

Leader mocks base method

func (*MockClient) OfflineReplicas added in v1.0.0

func (m *MockClient) OfflineReplicas(arg0 string, arg1 int32) ([]int32, error)

OfflineReplicas mocks base method

func (*MockClient) Partitions added in v1.0.0

func (m *MockClient) Partitions(arg0 string) ([]int32, error)

Partitions mocks base method

func (*MockClient) RefreshController added in v1.0.0

func (m *MockClient) RefreshController() (*sarama.Broker, error)

RefreshController mocks base method

func (*MockClient) RefreshCoordinator added in v1.0.0

func (m *MockClient) RefreshCoordinator(arg0 string) error

RefreshCoordinator mocks base method

func (*MockClient) RefreshMetadata added in v1.0.0

func (m *MockClient) RefreshMetadata(arg0 ...string) error

RefreshMetadata mocks base method

func (*MockClient) Replicas added in v1.0.0

func (m *MockClient) Replicas(arg0 string, arg1 int32) ([]int32, error)

Replicas mocks base method

func (*MockClient) Topics added in v1.0.0

func (m *MockClient) Topics() ([]string, error)

Topics mocks base method

func (*MockClient) WritablePartitions added in v1.0.0

func (m *MockClient) WritablePartitions(arg0 string) ([]int32, error)

WritablePartitions mocks base method

type MockClientMockRecorder added in v1.0.0

type MockClientMockRecorder struct {
	// contains filtered or unexported fields
}

MockClientMockRecorder is the mock recorder for MockClient

func (*MockClientMockRecorder) Brokers added in v1.0.0

func (mr *MockClientMockRecorder) Brokers() *gomock.Call

Brokers indicates an expected call of Brokers

func (*MockClientMockRecorder) Close added in v1.0.0

func (mr *MockClientMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close

func (*MockClientMockRecorder) Closed added in v1.0.0

func (mr *MockClientMockRecorder) Closed() *gomock.Call

Closed indicates an expected call of Closed

func (*MockClientMockRecorder) Config added in v1.0.0

func (mr *MockClientMockRecorder) Config() *gomock.Call

Config indicates an expected call of Config

func (*MockClientMockRecorder) Controller added in v1.0.0

func (mr *MockClientMockRecorder) Controller() *gomock.Call

Controller indicates an expected call of Controller

func (*MockClientMockRecorder) Coordinator added in v1.0.0

func (mr *MockClientMockRecorder) Coordinator(arg0 interface{}) *gomock.Call

Coordinator indicates an expected call of Coordinator

func (*MockClientMockRecorder) GetOffset added in v1.0.0

func (mr *MockClientMockRecorder) GetOffset(arg0, arg1, arg2 interface{}) *gomock.Call

GetOffset indicates an expected call of GetOffset

func (*MockClientMockRecorder) InSyncReplicas added in v1.0.0

func (mr *MockClientMockRecorder) InSyncReplicas(arg0, arg1 interface{}) *gomock.Call

InSyncReplicas indicates an expected call of InSyncReplicas

func (*MockClientMockRecorder) InitProducerID added in v1.0.0

func (mr *MockClientMockRecorder) InitProducerID() *gomock.Call

InitProducerID indicates an expected call of InitProducerID

func (*MockClientMockRecorder) Leader added in v1.0.0

func (mr *MockClientMockRecorder) Leader(arg0, arg1 interface{}) *gomock.Call

Leader indicates an expected call of Leader

func (*MockClientMockRecorder) OfflineReplicas added in v1.0.0

func (mr *MockClientMockRecorder) OfflineReplicas(arg0, arg1 interface{}) *gomock.Call

OfflineReplicas indicates an expected call of OfflineReplicas

func (*MockClientMockRecorder) Partitions added in v1.0.0

func (mr *MockClientMockRecorder) Partitions(arg0 interface{}) *gomock.Call

Partitions indicates an expected call of Partitions

func (*MockClientMockRecorder) RefreshController added in v1.0.0

func (mr *MockClientMockRecorder) RefreshController() *gomock.Call

RefreshController indicates an expected call of RefreshController

func (*MockClientMockRecorder) RefreshCoordinator added in v1.0.0

func (mr *MockClientMockRecorder) RefreshCoordinator(arg0 interface{}) *gomock.Call

RefreshCoordinator indicates an expected call of RefreshCoordinator

func (*MockClientMockRecorder) RefreshMetadata added in v1.0.0

func (mr *MockClientMockRecorder) RefreshMetadata(arg0 ...interface{}) *gomock.Call

RefreshMetadata indicates an expected call of RefreshMetadata

func (*MockClientMockRecorder) Replicas added in v1.0.0

func (mr *MockClientMockRecorder) Replicas(arg0, arg1 interface{}) *gomock.Call

Replicas indicates an expected call of Replicas

func (*MockClientMockRecorder) Topics added in v1.0.0

func (mr *MockClientMockRecorder) Topics() *gomock.Call

Topics indicates an expected call of Topics

func (*MockClientMockRecorder) WritablePartitions added in v1.0.0

func (mr *MockClientMockRecorder) WritablePartitions(arg0 interface{}) *gomock.Call

WritablePartitions indicates an expected call of WritablePartitions

type MockConsumerGroup added in v1.0.0

type MockConsumerGroup struct {
	// contains filtered or unexported fields
}

MockConsumerGroup mocks the consumergroup

func NewMockConsumerGroup added in v1.0.0

func NewMockConsumerGroup(t *testing.T) *MockConsumerGroup

NewMockConsumerGroup creates a new consumer group

func (*MockConsumerGroup) Close added in v1.0.0

func (cg *MockConsumerGroup) Close() error

Close closes the consumergroup

func (*MockConsumerGroup) Consume added in v1.0.0

func (cg *MockConsumerGroup) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error

Consume starts consuming from the consumergroup

func (*MockConsumerGroup) Errors added in v1.0.0

func (cg *MockConsumerGroup) Errors() <-chan error

Errors returns the errors channel

func (*MockConsumerGroup) FailOnConsume added in v1.0.0

func (cg *MockConsumerGroup) FailOnConsume(err error)

FailOnConsume marks the consumer to fail on consume

func (*MockConsumerGroup) SendError added in v1.0.0

func (cg *MockConsumerGroup) SendError(err error)

SendError sends an error to the consumer group.

func (*MockConsumerGroup) SendMessage added in v1.0.0

func (cg *MockConsumerGroup) SendMessage(message *sarama.ConsumerMessage) <-chan struct{}

SendMessage sends a message to the consumer group and returns a channel that is closed when the message has been committed by the group.

func (*MockConsumerGroup) SendMessageWait added in v1.0.0

func (cg *MockConsumerGroup) SendMessageWait(message *sarama.ConsumerMessage)

SendMessageWait sends a message to the consumer group and waits until the message has been committed.
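The "channel closed on commit" contract of SendMessage can be sketched with the standard library alone. Everything below is hypothetical: `mockGroup`, the offset-keyed map and `isClosed` are illustrative names, and the sketch is single-goroutine only (no locking), unlike a real mock.

```go
package main

import "fmt"

// mockGroup is a hypothetical, much simplified consumer-group mock.
type mockGroup struct {
	pending map[int64]chan struct{} // offset -> channel closed on commit
}

func newMockGroup() *mockGroup {
	return &mockGroup{pending: make(map[int64]chan struct{})}
}

// SendMessage registers the message offset and returns a channel that is
// closed once that offset has been committed.
func (g *mockGroup) SendMessage(offset int64) <-chan struct{} {
	done := make(chan struct{})
	g.pending[offset] = done
	return done
}

// Commit marks the offset as committed and releases any waiter.
func (g *mockGroup) Commit(offset int64) {
	if done, ok := g.pending[offset]; ok {
		close(done)
		delete(g.pending, offset)
	}
}

// isClosed reports whether the channel has been closed, without blocking.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	g := newMockGroup()
	done := g.SendMessage(42)
	fmt.Println("committed before Commit:", isClosed(done))
	g.Commit(42)
	fmt.Println("committed after Commit:", isClosed(done))
}
```

A blocking variant in the spirit of SendMessageWait would simply receive from the returned channel (`<-done`) instead of polling it.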

type MockConsumerGroupClaim added in v1.0.0

type MockConsumerGroupClaim struct {
	// contains filtered or unexported fields
}

MockConsumerGroupClaim mocks the consumergroupclaim

func NewMockConsumerGroupClaim added in v1.0.0

func NewMockConsumerGroupClaim(topic string, partition int32) *MockConsumerGroupClaim

NewMockConsumerGroupClaim creates a new mock consumer group claim

func (*MockConsumerGroupClaim) HighWaterMarkOffset added in v1.0.0

func (cgc *MockConsumerGroupClaim) HighWaterMarkOffset() int64

HighWaterMarkOffset returns the hwm offset

func (*MockConsumerGroupClaim) InitialOffset added in v1.0.0

func (cgc *MockConsumerGroupClaim) InitialOffset() int64

InitialOffset returns the initial offset

func (*MockConsumerGroupClaim) Messages added in v1.0.0

func (cgc *MockConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage

Messages returns the message channel that must be

func (*MockConsumerGroupClaim) Partition added in v1.0.0

func (cgc *MockConsumerGroupClaim) Partition() int32

Partition returns the partition

func (*MockConsumerGroupClaim) Topic added in v1.0.0

func (cgc *MockConsumerGroupClaim) Topic() string

Topic returns the current topic of the claim

type MockConsumerGroupSession added in v1.0.0

type MockConsumerGroupSession struct {
	// contains filtered or unexported fields
}

MockConsumerGroupSession mocks the consumer group session used for testing

func (*MockConsumerGroupSession) Claims added in v1.0.0

func (cgs *MockConsumerGroupSession) Claims() map[string][]int32

Claims returns the partitions assigned in the group session for each topic

func (*MockConsumerGroupSession) Commit added in v1.0.1

func (cgs *MockConsumerGroupSession) Commit()

Commit the offset to the backend

func (*MockConsumerGroupSession) Context added in v1.0.0

func (cgs *MockConsumerGroupSession) Context() context.Context

Context returns the consumer group's context

func (*MockConsumerGroupSession) GenerationID added in v1.0.0

func (cgs *MockConsumerGroupSession) GenerationID() int32

GenerationID returns the generation ID of the group consumer

func (*MockConsumerGroupSession) MarkMessage added in v1.0.0

func (cgs *MockConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string)

MarkMessage marks the passed message as consumed

func (*MockConsumerGroupSession) MarkOffset added in v1.0.0

func (cgs *MockConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string)

MarkOffset marks the passed offset consumed in topic/partition

func (*MockConsumerGroupSession) MemberID added in v1.0.0

func (cgs *MockConsumerGroupSession) MemberID() string

MemberID returns the member ID. TODO: clarify what that actually means and whether we need to mock that somehow.

func (*MockConsumerGroupSession) ResetOffset added in v1.0.0

func (cgs *MockConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string)

ResetOffset resets the offset to be consumed from

func (*MockConsumerGroupSession) SendMessage added in v1.0.0

func (cgs *MockConsumerGroupSession) SendMessage(msg *sarama.ConsumerMessage)

SendMessage sends a message to the consumer

type MockProducer added in v1.0.0

type MockProducer struct {
	// contains filtered or unexported fields
}

MockProducer is a mock of Producer interface

func NewMockProducer added in v1.0.0

func NewMockProducer(ctrl *gomock.Controller) *MockProducer

NewMockProducer creates a new mock instance

func (*MockProducer) Close added in v1.0.0

func (m *MockProducer) Close() error

Close mocks base method

func (*MockProducer) EXPECT added in v1.0.0

EXPECT returns an object that allows the caller to indicate expected use

func (*MockProducer) Emit added in v1.0.0

func (m *MockProducer) Emit(arg0, arg1 string, arg2 []byte) *Promise

Emit mocks base method

func (*MockProducer) EmitWithHeaders added in v1.0.1

func (m *MockProducer) EmitWithHeaders(arg0, arg1 string, arg2 []byte, arg3 map[string][]byte) *Promise

EmitWithHeaders mocks base method

type MockProducerMockRecorder added in v1.0.0

type MockProducerMockRecorder struct {
	// contains filtered or unexported fields
}

MockProducerMockRecorder is the mock recorder for MockProducer

func (*MockProducerMockRecorder) Close added in v1.0.0

func (mr *MockProducerMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close

func (*MockProducerMockRecorder) Emit added in v1.0.0

func (mr *MockProducerMockRecorder) Emit(arg0, arg1, arg2 interface{}) *gomock.Call

Emit indicates an expected call of Emit

func (*MockProducerMockRecorder) EmitWithHeaders added in v1.0.1

func (mr *MockProducerMockRecorder) EmitWithHeaders(arg0, arg1, arg2, arg3 interface{}) *gomock.Call

EmitWithHeaders indicates an expected call of EmitWithHeaders

type MockStorage added in v1.0.0

type MockStorage struct {
	// contains filtered or unexported fields
}

MockStorage is a mock of Storage interface

func NewMockStorage added in v1.0.0

func NewMockStorage(ctrl *gomock.Controller) *MockStorage

NewMockStorage creates a new mock instance

func (*MockStorage) Close added in v1.0.0

func (m *MockStorage) Close() error

Close mocks base method

func (*MockStorage) Delete added in v1.0.0

func (m *MockStorage) Delete(arg0 string) error

Delete mocks base method

func (*MockStorage) EXPECT added in v1.0.0

func (m *MockStorage) EXPECT() *MockStorageMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockStorage) Get added in v1.0.0

func (m *MockStorage) Get(arg0 string) ([]byte, error)

Get mocks base method

func (*MockStorage) GetOffset added in v1.0.0

func (m *MockStorage) GetOffset(arg0 int64) (int64, error)

GetOffset mocks base method

func (*MockStorage) Has added in v1.0.0

func (m *MockStorage) Has(arg0 string) (bool, error)

Has mocks base method

func (*MockStorage) Iterator added in v1.0.0

func (m *MockStorage) Iterator() (storage.Iterator, error)

Iterator mocks base method

func (*MockStorage) IteratorWithRange added in v1.0.0

func (m *MockStorage) IteratorWithRange(arg0, arg1 []byte) (storage.Iterator, error)

IteratorWithRange mocks base method

func (*MockStorage) MarkRecovered added in v1.0.0

func (m *MockStorage) MarkRecovered() error

MarkRecovered mocks base method

func (*MockStorage) Open added in v1.0.0

func (m *MockStorage) Open() error

Open mocks base method

func (*MockStorage) Recovered added in v1.0.0

func (m *MockStorage) Recovered() bool

Recovered mocks base method

func (*MockStorage) Set added in v1.0.0

func (m *MockStorage) Set(arg0 string, arg1 []byte) error

Set mocks base method

func (*MockStorage) SetOffset added in v1.0.0

func (m *MockStorage) SetOffset(arg0 int64) error

SetOffset mocks base method

type MockStorageMockRecorder added in v1.0.0

type MockStorageMockRecorder struct {
	// contains filtered or unexported fields
}

MockStorageMockRecorder is the mock recorder for MockStorage

func (*MockStorageMockRecorder) Close added in v1.0.0

func (mr *MockStorageMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close

func (*MockStorageMockRecorder) Delete added in v1.0.0

func (mr *MockStorageMockRecorder) Delete(arg0 interface{}) *gomock.Call

Delete indicates an expected call of Delete

func (*MockStorageMockRecorder) Get added in v1.0.0

func (mr *MockStorageMockRecorder) Get(arg0 interface{}) *gomock.Call

Get indicates an expected call of Get

func (*MockStorageMockRecorder) GetOffset added in v1.0.0

func (mr *MockStorageMockRecorder) GetOffset(arg0 interface{}) *gomock.Call

GetOffset indicates an expected call of GetOffset

func (*MockStorageMockRecorder) Has added in v1.0.0

func (mr *MockStorageMockRecorder) Has(arg0 interface{}) *gomock.Call

Has indicates an expected call of Has

func (*MockStorageMockRecorder) Iterator added in v1.0.0

func (mr *MockStorageMockRecorder) Iterator() *gomock.Call

Iterator indicates an expected call of Iterator

func (*MockStorageMockRecorder) IteratorWithRange added in v1.0.0

func (mr *MockStorageMockRecorder) IteratorWithRange(arg0, arg1 interface{}) *gomock.Call

IteratorWithRange indicates an expected call of IteratorWithRange

func (*MockStorageMockRecorder) MarkRecovered added in v1.0.0

func (mr *MockStorageMockRecorder) MarkRecovered() *gomock.Call

MarkRecovered indicates an expected call of MarkRecovered

func (*MockStorageMockRecorder) Open added in v1.0.0

func (mr *MockStorageMockRecorder) Open() *gomock.Call

Open indicates an expected call of Open

func (*MockStorageMockRecorder) Recovered added in v1.0.0

func (mr *MockStorageMockRecorder) Recovered() *gomock.Call

Recovered indicates an expected call of Recovered

func (*MockStorageMockRecorder) Set added in v1.0.0

func (mr *MockStorageMockRecorder) Set(arg0, arg1 interface{}) *gomock.Call

Set indicates an expected call of Set

func (*MockStorageMockRecorder) SetOffset added in v1.0.0

func (mr *MockStorageMockRecorder) SetOffset(arg0 interface{}) *gomock.Call

SetOffset indicates an expected call of SetOffset

type MockTopicManager added in v1.0.0

type MockTopicManager struct {
	// contains filtered or unexported fields
}

MockTopicManager is a mock of TopicManager interface

func NewMockTopicManager added in v1.0.0

func NewMockTopicManager(ctrl *gomock.Controller) *MockTopicManager

NewMockTopicManager creates a new mock instance

func (*MockTopicManager) Close added in v1.0.0

func (m *MockTopicManager) Close() error

Close mocks base method

func (*MockTopicManager) EXPECT added in v1.0.0

EXPECT returns an object that allows the caller to indicate expected use

func (*MockTopicManager) EnsureStreamExists added in v1.0.0

func (m *MockTopicManager) EnsureStreamExists(arg0 string, arg1 int) error

EnsureStreamExists mocks base method

func (*MockTopicManager) EnsureTableExists added in v1.0.0

func (m *MockTopicManager) EnsureTableExists(arg0 string, arg1 int) error

EnsureTableExists mocks base method

func (*MockTopicManager) EnsureTopicExists added in v1.0.0

func (m *MockTopicManager) EnsureTopicExists(arg0 string, arg1, arg2 int, arg3 map[string]string) error

EnsureTopicExists mocks base method

func (*MockTopicManager) GetOffset added in v1.0.0

func (m *MockTopicManager) GetOffset(arg0 string, arg1 int32, arg2 int64) (int64, error)

GetOffset mocks base method

func (*MockTopicManager) Partitions added in v1.0.0

func (m *MockTopicManager) Partitions(arg0 string) ([]int32, error)

Partitions mocks base method

type MockTopicManagerMockRecorder added in v1.0.0

type MockTopicManagerMockRecorder struct {
	// contains filtered or unexported fields
}

MockTopicManagerMockRecorder is the mock recorder for MockTopicManager

func (*MockTopicManagerMockRecorder) Close added in v1.0.0

Close indicates an expected call of Close

func (*MockTopicManagerMockRecorder) EnsureStreamExists added in v1.0.0

func (mr *MockTopicManagerMockRecorder) EnsureStreamExists(arg0, arg1 interface{}) *gomock.Call

EnsureStreamExists indicates an expected call of EnsureStreamExists

func (*MockTopicManagerMockRecorder) EnsureTableExists added in v1.0.0

func (mr *MockTopicManagerMockRecorder) EnsureTableExists(arg0, arg1 interface{}) *gomock.Call

EnsureTableExists indicates an expected call of EnsureTableExists

func (*MockTopicManagerMockRecorder) EnsureTopicExists added in v1.0.0

func (mr *MockTopicManagerMockRecorder) EnsureTopicExists(arg0, arg1, arg2, arg3 interface{}) *gomock.Call

EnsureTopicExists indicates an expected call of EnsureTopicExists

func (*MockTopicManagerMockRecorder) GetOffset added in v1.0.0

func (mr *MockTopicManagerMockRecorder) GetOffset(arg0, arg1, arg2 interface{}) *gomock.Call

GetOffset indicates an expected call of GetOffset

func (*MockTopicManagerMockRecorder) Partitions added in v1.0.0

func (mr *MockTopicManagerMockRecorder) Partitions(arg0 interface{}) *gomock.Call

Partitions indicates an expected call of Partitions

type NilHandling

type NilHandling int

NilHandling defines how nil messages should be handled by the processor.

const (
	// NilIgnore drops any message with nil value.
	NilIgnore NilHandling = 0 + iota
	// NilProcess passes the nil value to ProcessCallback.
	NilProcess
	// NilDecode passes the nil value to decoder before calling ProcessCallback.
	NilDecode
)
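A minimal sketch of how the three modes could be dispatched follows. The `NilHandling` type and constants are redeclared locally so the example is self-contained, and `handle`, `decode` and `process` are hypothetical names; this is an illustration of the documented semantics, not the processor's actual code.

```go
package main

import "fmt"

// NilHandling and its constants are redeclared here only to keep the sketch
// self-contained; the values match the declaration above.
type NilHandling int

const (
	NilIgnore NilHandling = iota
	NilProcess
	NilDecode
)

// handle is a hypothetical dispatcher for one incoming value.
func handle(nh NilHandling, value []byte,
	decode func([]byte) (interface{}, error),
	process func(interface{})) error {
	if value == nil {
		switch nh {
		case NilIgnore:
			return nil // drop the message
		case NilProcess:
			process(nil) // hand the nil straight to the callback
			return nil
		case NilDecode:
			// continue to the decoder below
		}
	}
	msg, err := decode(value)
	if err != nil {
		return err
	}
	process(msg)
	return nil
}

func main() {
	decode := func(b []byte) (interface{}, error) {
		if b == nil {
			return "decoded-nil", nil
		}
		return string(b), nil
	}
	for _, nh := range []NilHandling{NilIgnore, NilProcess, NilDecode} {
		calls := 0
		process := func(msg interface{}) {
			calls++
			fmt.Printf("mode=%d callback got %v\n", nh, msg)
		}
		if err := handle(nh, nil, decode, process); err != nil {
			fmt.Println("error:", err)
		}
		fmt.Printf("mode=%d callback calls=%d\n", nh, calls)
	}
}
```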

type OutputStats

type OutputStats struct {
	Count uint
	Bytes int
}

OutputStats represents the number of messages and the number of bytes emitted into a stream or table since the process started.

type PartitionProcStats added in v1.0.0

type PartitionProcStats struct {
	Now time.Time

	TableStats *TableStats

	Joined map[string]*TableStats

	Input  map[string]*InputStats
	Output map[string]*OutputStats
}

PartitionProcStats represents metrics and measurements of a partition processor

type PartitionProcessor added in v1.0.0

type PartitionProcessor struct {
	// contains filtered or unexported fields
}

PartitionProcessor handles message processing of one partition by serializing messages from different input topics. It also handles joined tables as well as lookup views (managed by `Processor`).

func (*PartitionProcessor) EnqueueMessage added in v1.0.0

func (pp *PartitionProcessor) EnqueueMessage(msg *sarama.ConsumerMessage)

EnqueueMessage enqueues a message in the partition processor's event channel for processing

func (*PartitionProcessor) Errors added in v1.0.0

func (pp *PartitionProcessor) Errors() <-chan error

Errors returns a channel of errors that occur during consumption

func (*PartitionProcessor) Recovered added in v1.0.0

func (pp *PartitionProcessor) Recovered() bool

Recovered returns whether the processor is running (i.e. all joins, lookups and the table are recovered and it is consuming messages)

func (*PartitionProcessor) Setup added in v1.0.0

func (pp *PartitionProcessor) Setup(ctx context.Context) error

Setup initializes the processor after a rebalance

func (*PartitionProcessor) Stop added in v1.0.0

func (pp *PartitionProcessor) Stop() error

Stop stops the partition processor

type PartitionStatus

type PartitionStatus int

PartitionStatus is the status of the partition of a table (group table or joined table).

const (
	// PartitionStopped indicates the partition stopped and should not be used anymore.
	PartitionStopped PartitionStatus = iota
	// PartitionInitializing indicates that the underlying storage is initializing (e.g. opening leveldb files),
	// and has not actually started working yet.
	PartitionInitializing
	// PartitionConnecting indicates the partition trying to (re-)connect to Kafka
	PartitionConnecting
	// PartitionRecovering indicates the partition is recovering and the storage
	// is writing updates in bulk-mode (if the storage implementation supports it).
	PartitionRecovering
	// PartitionPreparing indicates the end of the bulk-mode. Depending on the storage
	// implementation, the Preparing phase may take long because the storage compacts its logs.
	PartitionPreparing
	// PartitionRunning indicates the partition is recovered and processing updates
	// in normal operation.
	PartitionRunning
)
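For logging or monitoring it can help to map these states to readable names. The sketch below redeclares the constants locally and adds a hypothetical helper, `statusName`; whether the package itself offers an equivalent is not stated here, so treat the helper as an assumption.

```go
package main

import "fmt"

// PartitionStatus and its constants are redeclared here only to keep the
// sketch self-contained; the order matches the declaration above.
type PartitionStatus int

const (
	PartitionStopped PartitionStatus = iota
	PartitionInitializing
	PartitionConnecting
	PartitionRecovering
	PartitionPreparing
	PartitionRunning
)

// statusName is a hypothetical helper that returns a readable name.
func statusName(s PartitionStatus) string {
	switch s {
	case PartitionStopped:
		return "stopped"
	case PartitionInitializing:
		return "initializing"
	case PartitionConnecting:
		return "connecting"
	case PartitionRecovering:
		return "recovering"
	case PartitionPreparing:
		return "preparing"
	case PartitionRunning:
		return "running"
	default:
		return fmt.Sprintf("unknown(%d)", int(s))
	}
}

func main() {
	for s := PartitionStopped; s <= PartitionRunning; s++ {
		fmt.Printf("%d = %s\n", s, statusName(s))
	}
}
```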

type PartitionTable added in v1.0.0

type PartitionTable struct {
	// contains filtered or unexported fields
}

PartitionTable manages the usage of a table for one partition. It sets up the table, recovers and catches up its contents from Kafka, and allows updates via the Get/Set/Delete accessors.

func (*PartitionTable) CatchupForever added in v1.0.0

func (p *PartitionTable) CatchupForever(ctx context.Context, restartOnError bool) error

CatchupForever keeps the partition table caught up until the context is cancelled. The restartOnError option allows the view to stay open/intact even in case of consumer errors.

func (*PartitionTable) Close added in v1.0.0

func (p *PartitionTable) Close() error

Close closes the partition table

func (*PartitionTable) CurrentState added in v1.0.0

func (p *PartitionTable) CurrentState() PartitionStatus

CurrentState returns the partition's current status

func (*PartitionTable) Delete added in v1.0.0

func (p *PartitionTable) Delete(key string) error

Delete removes the passed key from the partition table by deleting from the underlying storage

func (*PartitionTable) Get added in v1.0.0

func (p *PartitionTable) Get(key string) ([]byte, error)

Get returns the value for passed key

func (*PartitionTable) GetOffset added in v1.0.0

func (p *PartitionTable) GetOffset(defValue int64) (int64, error)

GetOffset returns the magic offset value from storage

func (*PartitionTable) Has added in v1.0.0

func (p *PartitionTable) Has(key string) (bool, error)

Has returns whether the storage contains passed key

func (*PartitionTable) IsRecovered added in v1.0.0

func (p *PartitionTable) IsRecovered() bool

IsRecovered returns whether the partition table is recovered

func (*PartitionTable) RunStatsLoop added in v1.0.0

func (p *PartitionTable) RunStatsLoop(ctx context.Context)

RunStatsLoop starts the handler for stats requests. This loop runs detached from the recover/catchup mechanism so clients can always request stats even if the partition table is not running (like a processor table after it's recovered).

func (*PartitionTable) Set added in v1.0.0

func (p *PartitionTable) Set(key string, value []byte) error

Set sets a key-value pair in the partition table by modifying the underlying storage

func (*PartitionTable) SetOffset added in v1.0.0

func (p *PartitionTable) SetOffset(value int64) error

SetOffset sets the magic offset value in storage

func (*PartitionTable) SetupAndRecover added in v1.0.0

func (p *PartitionTable) SetupAndRecover(ctx context.Context, restartOnError bool) error

SetupAndRecover sets up the partition storage and recovers to HWM

func (*PartitionTable) TrackMessageWrite added in v1.0.0

func (p *PartitionTable) TrackMessageWrite(ctx context.Context, length int)

TrackMessageWrite updates the write stats to passed length

func (*PartitionTable) WaitRecovered added in v1.0.0

func (p *PartitionTable) WaitRecovered() chan struct{}

WaitRecovered returns a channel that closes when the partition table enters state `PartitionRunning`

type ProcessCallback

type ProcessCallback func(ctx Context, msg interface{})

ProcessCallback function is called for every message received by the processor.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor is a set of stateful callback functions that, on the arrival of messages, modify the content of a table (the group table) and emit messages into other topics. Messages as well as rows in the group table are key-value pairs. A group is composed of multiple processor instances.

func NewProcessor

func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption) (*Processor, error)

NewProcessor creates a processor instance in a group given the addresses of Kafka brokers, a group graph (which carries the consumer group name and the list of subscriptions: topics, codecs, and callbacks), and a series of options.

func (*Processor) Cleanup added in v1.0.0

func (g *Processor) Cleanup(session sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited but before the offsets are committed for the very last time.

func (*Processor) ConsumeClaim added in v1.0.0

func (g *Processor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.
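The loop contract described for ConsumeClaim (process until the Messages() channel is closed, then return) maps onto a plain `for ... range` over a channel. The sketch below is standard-library only; `consumeLoop` and its string messages are hypothetical simplifications of a real claim handler.

```go
package main

import "fmt"

// consumeLoop is a hypothetical handler loop: it processes messages until
// the channel is closed and returns how many messages it handled.
func consumeLoop(messages <-chan string, handle func(string)) int {
	handled := 0
	for msg := range messages { // the loop ends when the channel is closed
		handle(msg)
		handled++
	}
	return handled
}

func main() {
	messages := make(chan string, 3)
	messages <- "a"
	messages <- "b"
	close(messages) // stands in for the claim's channel being closed

	n := consumeLoop(messages, func(m string) { fmt.Println("handled", m) })
	fmt.Println("loop exited after", n, "messages")
}
```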

func (*Processor) Get

func (g *Processor) Get(key string) (interface{}, error)

Get returns a read-only copy of a value from the group table if the respective partition is owned by the processor instance. Get can be called by multiple goroutines concurrently. Get can only be used with stateful processors (i.e., when the group table is enabled) and after Recovered returns true.

func (*Processor) Graph

func (g *Processor) Graph() *GroupGraph

Graph returns the group graph of the processor.

func (*Processor) Recovered

func (g *Processor) Recovered() bool

Recovered returns whether the processor is running, i.e. whether it has recovered all lookups/joins/tables and is processing messages

func (*Processor) Run

func (g *Processor) Run(ctx context.Context) (rerr error)

Run starts the processor using passed context. The processor stops in case of errors or if the context is cancelled

func (*Processor) Setup added in v1.0.0

func (g *Processor) Setup(session sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim.

func (*Processor) Stats

func (g *Processor) Stats() *ProcessorStats

Stats returns the aggregated stats for the processor including all partitions, tables, lookups and joins

func (*Processor) StatsWithContext added in v1.0.0

func (g *Processor) StatsWithContext(ctx context.Context) *ProcessorStats

StatsWithContext returns stats for the processor, see #Processor.Stats()

func (*Processor) Stop added in v1.0.0

func (g *Processor) Stop()

Stop stops the processor. This is semantically equivalent to cancelling the Context that was passed to Processor.Run(..). This method returns immediately; errors that occur while running are returned from Processor.Run(..).

func (*Processor) WaitForReady added in v1.0.0

func (g *Processor) WaitForReady()

WaitForReady waits until the processor is ready to consume messages (or is actually consuming messages), i.e., until it is done catching up all partition tables, joins and lookup tables.

type ProcessorOption

type ProcessorOption func(*poptions, *GroupGraph)

ProcessorOption defines a configuration option to be used when creating a processor.
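Options of this kind follow the functional-options pattern. The sketch below illustrates the pattern with stand-in types; `options`, `option`, `withClientID`, `withChannelSize` and the default values are all hypothetical and do not reflect Goka's internal option struct.

```go
package main

import "fmt"

// options is a hypothetical stand-in for an unexported option struct.
type options struct {
	clientID    string
	channelSize int
}

// option mutates the options, in the same spirit as ProcessorOption.
type option func(*options)

func withClientID(id string) option {
	return func(o *options) { o.clientID = id }
}

func withChannelSize(n int) option {
	return func(o *options) { o.channelSize = n }
}

// newOptions starts from (made-up) defaults and applies the options in order,
// so later options override earlier ones.
func newOptions(opts ...option) *options {
	o := &options{clientID: "default-client", channelSize: 10}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	o := newOptions(withClientID("my-service"), withChannelSize(0))
	fmt.Println(o.clientID, o.channelSize)
}
```

The benefit of this design is that new options can be added without changing the constructor's signature.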

func WithBackoffBuilder added in v1.0.0

func WithBackoffBuilder(bb BackoffBuilder) ProcessorOption

WithBackoffBuilder replaces the default backoff.

func WithBackoffResetTimeout added in v1.0.0

func WithBackoffResetTimeout(duration time.Duration) ProcessorOption

WithBackoffResetTimeout defines the timeout when the backoff will be reset.

func WithClientID

func WithClientID(clientID string) ProcessorOption

WithClientID defines the client ID used to identify with Kafka.

func WithConsumerGroupBuilder added in v1.0.0

func WithConsumerGroupBuilder(cgb ConsumerGroupBuilder) ProcessorOption

WithConsumerGroupBuilder replaces the default consumer group builder

func WithConsumerSaramaBuilder added in v1.0.0

func WithConsumerSaramaBuilder(cgb SaramaConsumerBuilder) ProcessorOption

WithConsumerSaramaBuilder replaces the default sarama consumer builder.

func WithGroupGraphHook added in v0.1.2

func WithGroupGraphHook(hook func(gg *GroupGraph)) ProcessorOption

WithGroupGraphHook allows a function to obtain the group graph when a processor is started.

func WithHasher

func WithHasher(hasher func() hash.Hash32) ProcessorOption

WithHasher sets the hash function that assigns keys to partitions.
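To illustrate what a hasher of type `func() hash.Hash32` is used for, the sketch below maps keys to partitions with FNV-1a from the standard library. The `partitionFor` and `defaultHasher` helpers and the plain modulo scheme are assumptions made for this example; the exact computation inside Goka and Sarama may differ.

```go
package main

import (
	"fmt"
	"hash"
	"hash/fnv"
)

// defaultHasher is a hypothetical hasher with the signature WithHasher expects.
func defaultHasher() hash.Hash32 {
	return fnv.New32a()
}

// partitionFor is a hypothetical helper: it hashes the key with a fresh
// hash.Hash32 and maps the result onto [0, numPartitions).
func partitionFor(hasher func() hash.Hash32, key string, numPartitions int32) int32 {
	h := hasher()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int32(h.Sum32() % uint32(numPartitions))
}

func main() {
	for _, key := range []string{"user-1", "user-2", "user-3"} {
		fmt.Printf("%s -> partition %d\n", key, partitionFor(defaultHasher, key, 8))
	}
}
```

Whatever hasher is chosen, processors, views and emitters working on the same topics need to agree on it, otherwise keys end up in different partitions.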

func WithLogger

func WithLogger(log logger.Logger) ProcessorOption

WithLogger sets the logger the processor should use. By default, processors use the standard library logger.

func WithNilHandling

func WithNilHandling(nh NilHandling) ProcessorOption

WithNilHandling configures how the processor should handle messages with nil value. By default the processor ignores nil messages.

func WithPartitionChannelSize

func WithPartitionChannelSize(size int) ProcessorOption

WithPartitionChannelSize replaces the default partition channel size. This is mostly used for testing by setting it to 0 to have synchronous behavior of goka.

func WithProducerBuilder

func WithProducerBuilder(pb ProducerBuilder) ProcessorOption

WithProducerBuilder replaces the default producer builder.

func WithRebalanceCallback added in v0.1.3

func WithRebalanceCallback(cb RebalanceCallback) ProcessorOption

WithRebalanceCallback sets the callback for when a new partition assignment is received. By default, this is an empty function.

func WithStorageBuilder

func WithStorageBuilder(sb storage.Builder) ProcessorOption

WithStorageBuilder defines a builder for the storage of each partition.

func WithTester

func WithTester(t Tester) ProcessorOption

WithTester configures all external connections of a processor, ie, storage, consumer and producer

func WithTopicManagerBuilder

func WithTopicManagerBuilder(tmb TopicManagerBuilder) ProcessorOption

WithTopicManagerBuilder replaces the default topic manager builder.

func WithUpdateCallback

func WithUpdateCallback(cb UpdateCallback) ProcessorOption

WithUpdateCallback defines the callback called upon recovering a message from the log.

type ProcessorStats

type ProcessorStats struct {
	Group  map[int32]*PartitionProcStats
	Lookup map[string]*ViewStats
}

ProcessorStats represents the metrics of all partitions of the processor, including its group, joined tables and lookup tables.

type Producer added in v1.0.0

type Producer interface {
	// Emit sends a message to topic.
	Emit(topic string, key string, value []byte) *Promise
	EmitWithHeaders(topic string, key string, value []byte, headers map[string][]byte) *Promise
	Close() error
}

Producer abstracts the kafka producer

func DefaultProducerBuilder added in v1.0.0

func DefaultProducerBuilder(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error)

DefaultProducerBuilder creates a Kafka producer using the Sarama library.

func NewProducer added in v1.0.0

func NewProducer(brokers []string, config *sarama.Config) (Producer, error)

NewProducer creates new kafka producer for passed brokers.

type ProducerBuilder added in v1.0.0

type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error)

ProducerBuilder creates a Kafka producer.

func ProducerBuilderWithConfig added in v1.0.0

func ProducerBuilderWithConfig(config *sarama.Config) ProducerBuilder

ProducerBuilderWithConfig creates a Kafka producer using the Sarama library and the passed config.

type Promise added in v1.0.0

type Promise struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Promise as in https://en.wikipedia.org/wiki/Futures_and_promises

func NewPromise added in v1.0.0

func NewPromise() *Promise

NewPromise creates a new Promise

func (*Promise) Finish added in v1.0.0

func (p *Promise) Finish(msg *sarama.ProducerMessage, err error) *Promise

Finish finishes the promise by executing all callbacks and saving the message/error for late subscribers

func (*Promise) Then added in v1.0.0

func (p *Promise) Then(callback func(err error)) *Promise

Then chains a callback to the Promise

func (*Promise) ThenWithMessage added in v1.0.0

func (p *Promise) ThenWithMessage(callback func(msg *sarama.ProducerMessage, err error)) *Promise

ThenWithMessage chains a callback to the Promise
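The interplay of Then and Finish, including the handling of late subscribers, can be sketched as follows. The `promise` type here is a simplified stand-in and not Goka's implementation; in particular, it only carries an error and omits the `*sarama.ProducerMessage`.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// promise is a simplified, hypothetical stand-in for a Promise.
type promise struct {
	mu        sync.Mutex
	finished  bool
	err       error
	callbacks []func(err error)
}

// Then registers a callback. If the promise has already finished,
// the callback runs immediately with the stored error.
func (p *promise) Then(cb func(err error)) *promise {
	p.mu.Lock()
	if p.finished {
		err := p.err
		p.mu.Unlock()
		cb(err)
		return p
	}
	p.callbacks = append(p.callbacks, cb)
	p.mu.Unlock()
	return p
}

// Finish stores the result for late subscribers and runs all callbacks
// registered so far.
func (p *promise) Finish(err error) *promise {
	p.mu.Lock()
	p.finished, p.err = true, err
	cbs := p.callbacks
	p.callbacks = nil
	p.mu.Unlock()
	for _, cb := range cbs {
		cb(err)
	}
	return p
}

func main() {
	p := &promise{}
	p.Then(func(err error) { fmt.Println("early subscriber:", err) })
	p.Finish(errors.New("emit failed"))
	p.Then(func(err error) { fmt.Println("late subscriber:", err) })
}
```

With the real API, the typical use is to chain Then onto the *Promise returned by Producer.Emit in order to learn whether the message was delivered.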

type RebalanceCallback added in v0.1.3

type RebalanceCallback func(a Assignment)

RebalanceCallback is invoked when the processor receives a new partition assignment.

type RecoveryStats added in v1.0.0

type RecoveryStats struct {
	StartTime    time.Time
	RecoveryTime time.Time

	Offset int64 // last offset processed or recovered
	Hwm    int64 // next offset to be written
}

RecoveryStats groups statistics during recovery

type SaramaConsumerBuilder added in v1.0.0

type SaramaConsumerBuilder func(brokers []string, clientID string) (sarama.Consumer, error)

SaramaConsumerBuilder creates a `sarama.Consumer`

func SaramaConsumerBuilderWithConfig added in v1.0.0

func SaramaConsumerBuilderWithConfig(config *sarama.Config) SaramaConsumerBuilder

SaramaConsumerBuilderWithConfig creates a sarama consumer using passed config

type Signal added in v1.0.0

type Signal struct {
	// contains filtered or unexported fields
}

Signal allows synchronization on a state, waiting for that state and checking the current state

func NewSignal added in v1.0.0

func NewSignal(states ...State) *Signal

NewSignal creates a new Signal based on the states

func (*Signal) IsState added in v1.0.0

func (s *Signal) IsState(state State) bool

IsState returns if the signal is in the requested state

func (*Signal) ObserveStateChange added in v1.0.0

func (s *Signal) ObserveStateChange() *StateChangeObserver

ObserveStateChange returns a channel that receives state changes. Note that the caller must take care of consuming that channel, otherwise the Signal will block upon state changes.

func (*Signal) SetState added in v1.0.0

func (s *Signal) SetState(state State) *Signal

SetState changes the state of the signal and notifies all goroutines waiting for the new state

func (*Signal) State added in v1.0.0

func (s *Signal) State() State

State returns the current state

func (*Signal) WaitForState added in v1.0.0

func (s *Signal) WaitForState(state State) chan struct{}

WaitForState returns a channel that closes when the signal reaches passed state.
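The idea of waiting on a state through a channel that gets closed can be sketched with a small stand-in type. The `signal` type below is hypothetical and much simpler than Goka's Signal (no allowed-state list, no observers); it only shows the closing-channel mechanism.

```go
package main

import (
	"fmt"
	"sync"
)

type state int

// signal is a hypothetical, simplified stand-in for a Signal.
type signal struct {
	mu      sync.Mutex
	current state
	waiters map[state][]chan struct{}
}

func newSignal() *signal {
	return &signal{waiters: map[state][]chan struct{}{}}
}

// waitForState returns a channel that is closed once the signal is in
// the wanted state. If it already is, the channel is closed right away.
func (s *signal) waitForState(want state) chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	ch := make(chan struct{})
	if s.current == want {
		close(ch)
		return ch
	}
	s.waiters[want] = append(s.waiters[want], ch)
	return ch
}

// setState switches the state and releases everyone waiting for it.
func (s *signal) setState(next state) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.current = next
	for _, ch := range s.waiters[next] {
		close(ch)
	}
	delete(s.waiters, next)
}

func main() {
	const running state = 1
	sig := newSignal()
	ready := sig.waitForState(running)
	go sig.setState(running)
	<-ready
	fmt.Println("signal reached running state")
}
```

Because the returned channel is closed rather than written to, any number of goroutines can wait on it, and it composes with `select` and context cancellation.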

func (*Signal) WaitForStateMin added in v1.0.0

func (s *Signal) WaitForStateMin(state State) chan struct{}

WaitForStateMin returns a channel that will be closed when the signal enters the passed state or a higher one (states are ints, so this is a plain integer comparison).

type State added in v1.0.0

type State int

State types a state of the Signal

const (
	// PPStateIdle marks the partition processor as idling (not started yet)
	PPStateIdle State = iota
	// PPStateRecovering indicates a recovering partition processor
	PPStateRecovering
	// PPStateRunning indicates a running partition processor
	PPStateRunning
	// PPStateStopping indicates a stopped partition processor
	PPStateStopping
)
const (
	// ProcStateIdle indicates an idling partition processor (not started yet)
	ProcStateIdle State = iota
	// ProcStateStarting indicates a starting partition processor, i.e. before rebalance
	ProcStateStarting
	// ProcStateSetup indicates a partition processor during setup of a rebalance round
	ProcStateSetup
	// ProcStateRunning indicates a running partition processor
	ProcStateRunning
	// ProcStateStopping indicates a stopping partition processor
	ProcStateStopping
)

type StateChangeObserver added in v1.0.0

type StateChangeObserver struct {
	// contains filtered or unexported fields
}

StateChangeObserver wraps a channel that triggers when the signal's state changes

func (*StateChangeObserver) C added in v1.0.0

func (s *StateChangeObserver) C() <-chan State

C returns the channel to observe state changes

func (*StateChangeObserver) Stop added in v1.0.0

func (s *StateChangeObserver) Stop()

Stop stops the observer. Its update channel will be closed and

type Stream

type Stream string

Stream is the name of an event stream topic in Kafka, ie, a topic with cleanup.policy=delete

type Streams

type Streams []Stream

Streams is a slice of Stream names.

func StringsToStreams added in v1.0.0

func StringsToStreams(strings ...string) Streams

StringsToStreams is a simple conversion function that allows passing a slice of strings as a slice of Stream (Streams). It avoids the boilerplate loop over the string slice that would otherwise be necessary.

Example
inputTopics := []string{"input1",
	"input2",
	"input3",
}

// use it, e.g. in the Inputs-Edge in the group graph
graph := DefineGroup("group",
	Inputs(StringsToStreams(inputTopics...), new(codec.String), func(ctx Context, msg interface{}) {}),
)
_ = graph
Output:

type Table

type Table string

Table is the name of a table topic in Kafka, ie, a topic with cleanup.policy=compact

func GroupTable

func GroupTable(group Group) Table

GroupTable returns the name of the group table of group.

type TableStats added in v1.0.0

type TableStats struct {
	Stalled bool

	Status PartitionStatus

	Recovery *RecoveryStats

	Input  *InputStats
	Writes *OutputStats
}

TableStats represents stats for a table partition

type Tester

type Tester interface {
	StorageBuilder() storage.Builder
	ProducerBuilder() ProducerBuilder
	ConsumerGroupBuilder() ConsumerGroupBuilder
	ConsumerBuilder() SaramaConsumerBuilder
	EmitterProducerBuilder() ProducerBuilder
	TopicManagerBuilder() TopicManagerBuilder
	RegisterGroupGraph(*GroupGraph) string
	RegisterEmitter(Stream, Codec)
	RegisterView(Table, Codec) string
}

Tester interface to avoid import cycles when a processor needs to register to the tester.

type TopicManager added in v1.0.0

type TopicManager interface {
	// EnsureTableExists checks that a table (log-compacted topic) exists, or create one if possible
	EnsureTableExists(topic string, npar int) error
	// EnsureStreamExists checks that a stream topic exists, or create one if possible
	EnsureStreamExists(topic string, npar int) error
	// EnsureTopicExists checks that a topic exists, or create one if possible,
	// enforcing the given configuration
	EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error

	// Partitions returns the number of partitions of a topic, that are assigned to the running
	// instance, i.e. it doesn't represent all partitions of a topic.
	Partitions(topic string) ([]int32, error)

	GetOffset(topic string, partitionID int32, time int64) (int64, error)

	// Close closes the topic manager
	Close() error
}

TopicManager provides an interface to create/check topics and their partitions

func DefaultTopicManagerBuilder added in v1.0.0

func DefaultTopicManagerBuilder(brokers []string) (TopicManager, error)

DefaultTopicManagerBuilder creates TopicManager using the Sarama library.

func NewTopicManager added in v1.0.0

func NewTopicManager(brokers []string, saramaConfig *sarama.Config, topicManagerConfig *TopicManagerConfig) (TopicManager, error)

NewTopicManager creates a new topic manager using the sarama library

type TopicManagerBuilder added in v1.0.0

type TopicManagerBuilder func(brokers []string) (TopicManager, error)

TopicManagerBuilder creates a TopicManager to check partition counts and create tables.

func TopicManagerBuilderWithConfig added in v1.0.0

func TopicManagerBuilderWithConfig(config *sarama.Config, tmConfig *TopicManagerConfig) TopicManagerBuilder

TopicManagerBuilderWithConfig creates TopicManager using the Sarama library.

func TopicManagerBuilderWithTopicManagerConfig added in v1.0.0

func TopicManagerBuilderWithTopicManagerConfig(tmConfig *TopicManagerConfig) TopicManagerBuilder

TopicManagerBuilderWithTopicManagerConfig creates TopicManager using the Sarama library.

type TopicManagerConfig added in v1.0.0

type TopicManagerConfig struct {
	Table struct {
		Replication int
	}
	Stream struct {
		Replication int
		Retention   time.Duration
	}
}

TopicManagerConfig contains the configuration to access the Zookeeper servers as well as the desired options for creating tables and stream topics.

func NewTopicManagerConfig added in v1.0.0

func NewTopicManagerConfig() *TopicManagerConfig

NewTopicManagerConfig provides a default configuration for auto-creation with a replication factor of 1 and a retention time of 1 hour.

type UpdateCallback

type UpdateCallback func(s storage.Storage, partition int32, key string, value []byte) error

UpdateCallback is invoked upon arrival of a message for a table partition. The partition storage shall be updated in the callback.
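A callback of this shape can be sketched against a minimal in-memory store. Everything below is a stand-in: `kvStore` and `memStore` are hypothetical and far smaller than the real storage.Storage interface, and treating a nil value as a delete (a tombstone in a compacted topic) is an assumption about the desired behavior, not a statement about Goka's default callback.

```go
package main

import "fmt"

// kvStore is a hypothetical, minimal stand-in for a partition storage.
type kvStore interface {
	Set(key string, value []byte) error
	Delete(key string) error
}

// memStore keeps everything in a map; it exists only for this sketch.
type memStore map[string][]byte

func (m memStore) Set(key string, value []byte) error { m[key] = value; return nil }
func (m memStore) Delete(key string) error            { delete(m, key); return nil }

// applyUpdate has the same parameter list as an UpdateCallback, except that
// it takes the stand-in store. A nil value removes the key (assumption).
func applyUpdate(s kvStore, partition int32, key string, value []byte) error {
	if value == nil {
		return s.Delete(key)
	}
	return s.Set(key, value)
}

func main() {
	store := memStore{}
	_ = applyUpdate(store, 0, "user-1", []byte("42"))
	fmt.Println("after set:", len(store), "key(s)")
	_ = applyUpdate(store, 0, "user-1", nil)
	fmt.Println("after tombstone:", len(store), "key(s)")
}
```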

type View

type View struct {
	// contains filtered or unexported fields
}

View is a materialized (i.e. persistent) cache of a group table.

Example

This example shows how views are typically created and used in the most basic way.

// create a new view
view, err := NewView([]string{"localhost:9092"},
	"input-topic",
	new(codec.String))

if err != nil {
	log.Fatalf("error creating view: %v", err)
}

// provide a cancelable context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// start the view
done := make(chan struct{})
go func() {
	defer close(done)
	err := view.Run(ctx)
	if err != nil {
		log.Fatalf("Error running view: %v", err)
	}
}()

// wait for the view to be recovered

// Option A: by polling
for !view.Recovered() {
	select {
	case <-ctx.Done():
		return
	case <-time.After(time.Second):
	}
}

// Option B: by waiting for the signal
<-view.WaitRunning()

// retrieve a value from the view
val, err := view.Get("some-key")
if err != nil {
	log.Fatalf("Error getting item from view: %v", err)
}

if val != nil {
	// cast it to string; the assertion cannot fail here, because the codec
	// would already have failed to decode a value of any other type
	log.Printf("got value %s", val.(string))
}

has, err := view.Has("some-key")
if err != nil {
	log.Fatalf("Error getting item from view: %v", err)
}

_ = has

// stop the view and wait for it to shut down before returning
cancel()
<-done
Output:

Example (Autoreconnect)
// create a new view
view, err := NewView([]string{"localhost:9092"},
	"input-topic",
	new(codec.String),

	// Automatically reconnect in case of errors. This is useful for services where availability
	// is more important than the data being up to date in case of kafka connection issues.
	WithViewAutoReconnect(),

	// Reconnect uses a default backoff mechanism, that can be modified by providing
	// a custom backoff builder using
	// WithViewBackoffBuilder(customBackoffBuilder),

	// When the view is running successfully for some time, the backoff is reset.
	// This time range can be modified using
	// WithViewBackoffResetTimeout(3*time.Second),
)

if err != nil {
	log.Fatalf("error creating view: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())
// start the view
done := make(chan struct{})
go func() {
	defer close(done)
	err := view.Run(ctx)
	if err != nil {
		log.Fatalf("Error running view: %v", err)
	}
}()

<-view.WaitRunning()
// at this point we can safely use the view with Has/Get/Iterate,
// even if the kafka connection is lost

// Stop the view and wait for it to shutdown before returning
cancel()
<-done
Output:

func NewView

func NewView(brokers []string, topic Table, codec Codec, options ...ViewOption) (*View, error)

NewView creates a new View object from a group.

func (*View) CurrentState added in v1.0.0

func (v *View) CurrentState() ViewState

CurrentState returns the current ViewState of the view. This is useful for polling, e.g. when implementing health checks or metrics.
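As a rough idea of how a polled state could feed a health check, the sketch below maps a state to an HTTP status code. The `viewState` type and its constants are local stand-ins that mirror the documented ViewState values, and `healthStatus` is a hypothetical helper; treating only the running state as healthy is an assumption that may be too strict for some services.

```go
package main

import (
	"fmt"
	"net/http"
)

// viewState and its constants are stand-ins mirroring the documented ViewState.
type viewState int

const (
	stateIdle viewState = iota
	stateInitializing
	stateConnecting
	stateCatchUp
	stateRunning
)

// healthStatus is a hypothetical helper that maps a state to the status code
// a readiness probe could return.
func healthStatus(s viewState) int {
	if s == stateRunning {
		return http.StatusOK
	}
	return http.StatusServiceUnavailable
}

func main() {
	for _, s := range []viewState{stateIdle, stateCatchUp, stateRunning} {
		fmt.Printf("state %d -> HTTP %d\n", s, healthStatus(s))
	}
}
```

In a real service, the argument would come from view.CurrentState() inside an HTTP handler.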

func (*View) Evict

func (v *View) Evict(key string) error

Evict removes the given key only from the local cache. In order to delete a key from Kafka and other Views, context.Delete should be used on a Processor.

func (*View) Get

func (v *View) Get(key string) (interface{}, error)

Get returns the value for the key in the view if it exists, and nil otherwise. Get can be called by multiple goroutines concurrently. Get can only be called after Recovered returns true.

func (*View) Has

func (v *View) Has(key string) (bool, error)

Has checks whether a value for passed key exists in the view.

func (*View) Iterator

func (v *View) Iterator() (Iterator, error)

Iterator returns an iterator that iterates over the state of the View.

func (*View) IteratorWithRange

func (v *View) IteratorWithRange(start, limit string) (Iterator, error)

IteratorWithRange returns an iterator that iterates over the state of the View. This iterator is built using the range.
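What a range over a key-ordered store selects can be sketched with a sorted slice. The `keysInRange` helper is hypothetical, and the half-open interval (start inclusive, limit exclusive) is an assumption for this example; the actual bounds depend on the storage backend in use and should be verified there.

```go
package main

import (
	"fmt"
	"sort"
)

// keysInRange is a hypothetical helper that returns all keys k from a sorted
// slice with start <= k < limit.
func keysInRange(sorted []string, start, limit string) []string {
	lo := sort.SearchStrings(sorted, start) // first index with key >= start
	hi := sort.SearchStrings(sorted, limit) // first index with key >= limit
	if hi < lo {
		return nil
	}
	return sorted[lo:hi]
}

func main() {
	keys := []string{"user-1", "user-2", "user-3", "user-4"}
	fmt.Println(keysInRange(keys, "user-2", "user-4"))
}
```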

func (*View) ObserveStateChanges added in v1.0.0

func (v *View) ObserveStateChanges() *StateChangeObserver

ObserveStateChanges returns a StateChangeObserver that allows handling state changes of the view by reading from a channel. It is crucial to continuously read from that channel, otherwise the View might deadlock upon state changes. If the observer is no longer needed, the caller must call observer.Stop().

Example

view, err := goka.NewView(...)
if err != nil {
  // handle error
}
go view.Run(ctx)

go func(){
  obs := view.ObserveStateChanges()
  defer obs.Stop()
  for {
    select{
      case state, ok := <-obs.C():
        if !ok {
          // channel closed, observer was stopped
          return
        }
        // handle state
        _ = state
      case <-ctx.Done():
        return
    }
  }
}()

func (*View) Recovered

func (v *View) Recovered() bool

Recovered returns true when the view has caught up with events from kafka.

func (*View) Run

func (v *View) Run(ctx context.Context) (rerr error)

Run starts consuming the view's topic and saving updates in the local persistent cache.

The view will shut down in case of errors or when the context is cancelled. It can be initialized with auto-reconnect

view, err := NewView(..., WithViewAutoReconnect())

which makes the view reconnect internally in case of errors. It will then only stop when the context is cancelled (see example).

func (*View) Stats

func (v *View) Stats(ctx context.Context) *ViewStats

Stats returns a set of performance metrics of the view.

func (*View) Topic

func (v *View) Topic() string

Topic returns the view's topic

func (*View) WaitRunning added in v1.0.0

func (v *View) WaitRunning() <-chan struct{}

WaitRunning returns a channel that will be closed when the view enters the running state

type ViewOption

type ViewOption func(*voptions, Table, Codec)

ViewOption defines a configuration option to be used when creating a view.

func WithViewAutoReconnect added in v1.0.0

func WithViewAutoReconnect() ViewOption

WithViewAutoReconnect makes the view reconnect internally, so Run() does not return in case of connection errors. The view must be shut down by cancelling the context passed to Run().

func WithViewBackoffBuilder added in v1.0.0

func WithViewBackoffBuilder(bb BackoffBuilder) ViewOption

WithViewBackoffBuilder replaces the default backoff.

func WithViewBackoffResetTimeout added in v1.0.0

func WithViewBackoffResetTimeout(duration time.Duration) ViewOption

WithViewBackoffResetTimeout defines the timeout when the backoff will be reset.

func WithViewCallback

func WithViewCallback(cb UpdateCallback) ViewOption

WithViewCallback defines the callback called upon recovering a message from the log.

func WithViewClientID

func WithViewClientID(clientID string) ViewOption

WithViewClientID defines the client ID used to identify with Kafka.

func WithViewConsumerSaramaBuilder added in v1.0.0

func WithViewConsumerSaramaBuilder(cgb SaramaConsumerBuilder) ViewOption

WithViewConsumerSaramaBuilder replaces the default sarama consumer builder

func WithViewHasher

func WithViewHasher(hasher func() hash.Hash32) ViewOption

WithViewHasher sets the hash function that assigns keys to partitions.

func WithViewLogger

func WithViewLogger(log logger.Logger) ViewOption

WithViewLogger sets the logger the view should use. By default, views use the standard library logger.

func WithViewRestartable

func WithViewRestartable() ViewOption

WithViewRestartable is kept only for backwards compatibility. DEPRECATED: since the behavior has changed, this name is misleading and should be replaced by WithViewAutoReconnect().

func WithViewStorageBuilder

func WithViewStorageBuilder(sb storage.Builder) ViewOption

WithViewStorageBuilder defines a builder for the storage of each partition.

func WithViewTester added in v0.1.2

func WithViewTester(t Tester) ViewOption

WithViewTester configures all external connections of a view, i.e., storage, consumer and producer.

func WithViewTopicManagerBuilder

func WithViewTopicManagerBuilder(tmb TopicManagerBuilder) ViewOption

WithViewTopicManagerBuilder replaces the default topic manager.

type ViewState added in v1.0.0

type ViewState int

ViewState represents the state of the view

const (
	// ViewStateIdle  - the view is not started yet
	ViewStateIdle ViewState = iota
	// ViewStateInitializing - the view (i.e. at least one partition) is initializing
	ViewStateInitializing
	// ViewStateConnecting - the view (i.e. at least one partition) is (re-)connecting
	ViewStateConnecting
	// ViewStateCatchUp - the view (i.e. at least one partition) is still catching up
	ViewStateCatchUp
	// ViewStateRunning - the view (i.e. all partitions) has caught up and is running
	ViewStateRunning
)

type ViewStats

type ViewStats struct {
	Partitions map[int32]*TableStats
}

ViewStats represents the metrics of all partitions of a view.

Directories

Path Synopsis
examples
internal
This package provides a kafka mock that allows integration testing of goka processors.
web
