goka

package module
v0.0.0-...-42652a5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 22, 2022 License: BSD-3-Clause Imports: 23 Imported by: 0

README

Goka

License Unit Tests/System Tests GoDoc Go Report Card

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go. Goka aims to reduce the complexity of building highly scalable and highly available microservices.

Goka extends the concept of Kafka consumer groups by binding a state table to them and persisting them in Kafka. Goka provides sane defaults and a pluggable architecture.

Features

  • Message Input and Output

    Goka handles all the message input and output for you. You only have to provide one or more callback functions that handle messages from any of the Kafka topics you are interested in. You only ever have to deal with deserialized messages.

  • Scaling

    Goka automatically distributes the processing and state across multiple instances of a service. This enables effortless scaling when the load increases.

  • Fault Tolerance

    In case of a failure, Goka will redistribute the failed instance's workload and state across the remaining healthy instances. All state is safely stored in Kafka and messages delivered with at-least-once semantics.

  • Built-in Monitoring and Introspection

    Goka provides a web interface for monitoring performance and querying values in the state.

  • Modularity

    Goka fosters a pluggable architecture which enables you to replace for example the storage layer or the Kafka communication layer.

Documentation

This README provides a brief, high level overview of the ideas behind Goka. A more detailed introduction of the project can be found in this blog post.

Package API documentation is available at GoDoc and the Wiki provides several tips for configuring, extending, and deploying Goka applications.

Installation

You can install Goka by running the following command:

$ go get -u github.com/iGGUss/goka

Configuration

Goka relies on Sarama to perform the actual communication with Kafka, which offers many configuration settings. The config is documented here.

In most cases, you need to modify the config, e.g. to set the Kafka Version.

cfg := goka.DefaultConfig()
cfg.Version = sarama.V2_4_0_0
goka.ReplaceGlobalConfig(cfg)

This makes all goka components use the updated config.

If you do need specific configuration for different components, you need to pass customized builders to the component's constructor, e.g.

cfg := goka.DefaultConfig()
// modify the config with component-specific settings


// use the config by creating a builder which allows to override global config
goka.NewProcessor(// ...,
	goka.WithConsumerGroupBuilder(
		goka.ConsumerGroupBuilderWithConfig(cfg),
	),
	// ...
)

Concepts

Goka relies on Kafka for message passing, fault-tolerant state storage and workload partitioning.

  • Emitters deliver key-value messages into Kafka. As an example, an emitter could be a database handler emitting the state changes into Kafka for other interested applications to consume.

  • Processor is a set of callback functions that consume and perform state transformations upon delivery of these emitted messages. Processor groups are formed of one or more instances of a processor. Goka distributes the partitions of the input topics across all processor instances in a processor group. This enables effortless scaling and fault-tolerance. If a processor instance fails, its partitions and state are reassigned to the remaining healthy members of the processor group. Processors can also emit further messages into Kafka.

  • Group table is the state of a processor group. It is a partitioned key-value table stored in Kafka that belongs to a single processor group. If a processor instance fails, the remaining instances will take over the group table partitions of the failed instance recovering them from Kafka.

  • Views are local caches of a complete group table. Views provide read-only access to the group tables and can be used to provide external services for example through a gRPC interface.

  • Local storage keeps a local copy of the group table partitions to speedup recovery and reduce memory utilization. By default, the local storage uses LevelDB, but in-memory map and Redis-based storage are also available.

Get Started

An example Goka application could look like the following. An emitter emits a single message with key "some-key" and value "some-value" into the "example-stream" topic. A processor processes the "example-stream" topic counting the number of messages delivered for "some-key". The counter is persisted in the "example-group-table" topic. To locally start a dockerized Zookeeper and Kafka instances, execute make start with the Makefile in the examples folder.

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/iGGUss/goka"
	"github.com/iGGUss/goka/codec"
)

var (
	brokers             = []string{"localhost:9092"}
	topic   goka.Stream = "example-stream"
	group   goka.Group  = "example-group"
)

// Emit messages forever every second
func runEmitter() {
	emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
	if err != nil {
		log.Fatalf("error creating emitter: %v", err)
	}
	defer emitter.Finish()
	for {
		time.Sleep(1 * time.Second)
		err = emitter.EmitSync("some-key", "some-value")
		if err != nil {
			log.Fatalf("error emitting message: %v", err)
		}
	}
}

// process messages until ctrl-c is pressed
func runProcessor() {
	// process callback is invoked for each message delivered from
	// "example-stream" topic.
	cb := func(ctx goka.Context, msg interface{}) {
		var counter int64
		// ctx.Value() gets from the group table the value that is stored for
		// the message's key.
		if val := ctx.Value(); val != nil {
			counter = val.(int64)
		}
		counter++
		// SetValue stores the incremented counter in the group table for in
		// the message's key.
		ctx.SetValue(counter)
		log.Printf("key = %s, counter = %v, msg = %v", ctx.Key(), counter, msg)
	}

	// Define a new processor group. The group defines all inputs, outputs, and
	// serialization formats. The group-table topic is "example-group-table".
	g := goka.DefineGroup(group,
		goka.Input(topic, new(codec.String), cb),
		goka.Persist(new(codec.Int64)),
	)

	p, err := goka.NewProcessor(brokers, g)
	if err != nil {
		log.Fatalf("error creating processor: %v", err)
	}
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan bool)
	go func() {
		defer close(done)
		if err = p.Run(ctx); err != nil {
			log.Fatalf("error running processor: %v", err)
		} else {
			log.Printf("Processor shutdown cleanly")
		}
	}()

	wait := make(chan os.Signal, 1)
	signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
	<-wait   // wait for SIGINT/SIGTERM
	cancel() // gracefully stop processor
	<-done
}

func main() {
	go runEmitter() // emits one message and stops
	runProcessor()  // press ctrl-c to stop
}

A very similar example is also in 1-simplest. Just run go run examples/1-simplest/main.go.

Note that tables have to be configured in Kafka with log compaction. For details check the Wiki.

How to contribute

Contributions are always welcome. Please fork the repo, create a pull request against master, and be sure tests pass. See the GitHub Flow for details.

Documentation

Overview

Package goka is a stateful stream processing library for Apache Kafka (version 0.9+) that eases the development of microservices. Goka extends the concept of consumer group with a group table, which represents the state of the group. A microservice modifies and serves the content of a table employing two complementary object types: processors and views.

Processors

A processor is a set of callback functions that modify the group table when messages arrive and may also emit messages into other topics. Messages as well as rows in the group table are key-value pairs. Callbacks receive the arriving message and the row addressed by the message's key.

In Kafka, keys are used to partition topics. A goka processor consumes from a set of co-partitioned topics (topics with the same number of partitions and the same key range). A group topic keeps track of the group table updates, allowing for recovery and rebalancing of processors: When multiple processor instances start in the same consumer group, the instances split the co-partitioned input topics and load the respective group table partitions from the group topic. A local disk storage minimizes recovery time by caching partitions of group table.

Views

A view is a materialized (ie, persistent) cache of a group table. A view subscribes for the updates of all partitions of a group table and keeps local disk storage in sync with the group topic. With a view, one can easily serve up-to-date content of the group table via, for example, gRPC.

Package goka is a generated GoMock package.

Package goka is a generated GoMock package.

Package goka is a generated GoMock package.

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// CopartitioningStrategy is the rebalance strategy necessary to guarantee the copartitioning
	// when consuming multiple input topics with multiple processor instances.
	// This strategy tolerates different sets of topics per member of consumer group to allow
	// rolling upgrades of processors.
	//
	// Note that the topic inconcistency needs to be only temporarily, otherwise not all topic partitions will be consumed as in
	// the following example:
	// Assume having topics X and Y, each with partitions [0,1,2]
	// MemberA requests topic X
	// MemberB requests topics X and Y, because topic Y was newly added to the processor.
	//
	// Then the strategy will plan as follows:
	// MemberA: X: [0,1]
	// MemberB: X: [2], Y:[2]
	//
	// That means partitions [0,1] from topic Y are not being consumed.
	// So the assumption is that memberA will be redeployed so that after a second rebalance
	// both members consume both topics and all partitions.
	//
	// If you do not use rolling upgrades, i.e. replace all members of a group simultaneously, it is
	// safe to use the StrictCopartitioningStrategy
	CopartitioningStrategy = new(copartitioningStrategy)

	// StrictCopartitioningStrategy behaves like the copartitioning strategy but it will fail if two members of a consumer group
	// request a different set of topics, which might indicate a bug or a reused consumer group name.
	StrictCopartitioningStrategy = &copartitioningStrategy{
		failOnInconsistentTopics: true,
	}
)
View Source
var (
	// ErrEmitterAlreadyClosed is returned when Emit is called after the emitter has been finished.
	ErrEmitterAlreadyClosed error = errors.New("emitter already closed")
)
View Source
var ErrVisitAborted = errors.New("VisitAll aborted due to context cancel or rebalance")

ErrVisitAborted indicates a call to VisitAll could not finish due to rebalance or processor shutdown

Functions

func Debug

func Debug(gokaDebug, saramaDebug bool)

Debug enables or disables debug logging using the global logger. The goka debugging setting is applied to any custom loggers in goka components (Processors, Views, Emitters).

func DefaultConfig

func DefaultConfig() *sarama.Config

DefaultConfig creates a new config used by goka per default Use it to modify and pass to `goka.ReplaceGlobalConifg(...)` to modify goka's global config

func DefaultConsumerGroupBuilder

func DefaultConsumerGroupBuilder(brokers []string, group, clientID string) (sarama.ConsumerGroup, error)

DefaultConsumerGroupBuilder creates a Kafka consumer using the Sarama library.

func DefaultHasher

func DefaultHasher() func() hash.Hash32

DefaultHasher returns an FNV hasher builder to assign keys to partitions.

func DefaultProcessorStoragePath

func DefaultProcessorStoragePath(group Group) string

DefaultProcessorStoragePath is the default path where processor state will be stored.

func DefaultRebalance

func DefaultRebalance(a Assignment)

DefaultRebalance is the default callback when a new partition assignment is received. DefaultRebalance can be used in the function passed to WithRebalanceCallback.

func DefaultSaramaConsumerBuilder

func DefaultSaramaConsumerBuilder(brokers []string, clientID string) (sarama.Consumer, error)

DefaultSaramaConsumerBuilder creates a Kafka consumer using the Sarama library.

func DefaultUpdate

func DefaultUpdate(ctx UpdateContext, s storage.Storage, key string, value []byte) error

DefaultUpdate is the default callback used to update the local storage with from the table topic in Kafka. It is called for every message received during recovery of processors and during the normal operation of views. DefaultUpdate can be used in the function passed to WithUpdateCallback and WithViewCallback.

func DefaultViewStoragePath

func DefaultViewStoragePath() string

DefaultViewStoragePath returns the default path where view state will be stored.

func NewMockController

func NewMockController(t gomock.TestReporter) *gomock.Controller

NewMockController returns a *gomock.Controller using a wrapped testing.T (or whatever) which panics on a Fatalf. This is necessary when using a mock in kafkamock. Otherwise it will freeze on an unexpected call.

func NewPromiseWithFinisher

func NewPromiseWithFinisher() (*Promise, PromiseFinisher)

NewPromiseWithFinisher creates a new Promise and a separate finish method. This is necessary if the promise is used outside of goka package.

func ReplaceGlobalConfig

func ReplaceGlobalConfig(config *sarama.Config)

ReplaceGlobalConfig registeres a standard config used during building if no other config is specified

func ResetSuffixes

func ResetSuffixes()

ResetSuffixes reset both `loopSuffix` and `tableSuffix` to their default value. This function is helpful when there are multiple testcases, so you can do clean-up any change for suffixes.

func SetLoopSuffix

func SetLoopSuffix(suffix string)

SetLoopSuffix changes `loopSuffix` which is a suffix for loop topic of group. Use it to modify loop topic's suffix to otherwise in case you cannot use the default suffix.

func SetSaramaLogger

func SetSaramaLogger(logger Logger)

func SetTableSuffix

func SetTableSuffix(suffix string)

SetTableSuffix changes `tableSuffix` which is a suffix for table topic. Use it to modify table's suffix to otherwise in case you cannot use the default suffix.

Types

type Assignment

type Assignment map[int32]int64

Assignment represents a partition:offset assignment for the current connection

type Backoff

type Backoff interface {
	Duration() time.Duration
	Reset()
}

Backoff is used for adding backoff capabilities to the restarting of failing partition tables.

func DefaultBackoffBuilder

func DefaultBackoffBuilder() (Backoff, error)

DefaultBackoffBuilder returnes a simpleBackoff with 10 seconds step increase and 2 minutes max wait

func NewSimpleBackoff

func NewSimpleBackoff(step time.Duration, max time.Duration) Backoff

NewSimpleBackoff returns a simple backoff waiting the specified duration longer each iteration until reset.

type BackoffBuilder

type BackoffBuilder func() (Backoff, error)

BackoffBuilder creates a backoff

type Broker

type Broker interface {
	Addr() string
	Connected() (bool, error)
	CreateTopics(request *sarama.CreateTopicsRequest) (*sarama.CreateTopicsResponse, error)
	Open(conf *sarama.Config) error
}

Broker is an interface for the sarama broker

type Codec

type Codec interface {
	Encode(value interface{}) (data []byte, err error)
	Decode(data []byte) (value interface{}, err error)
}

Codec decodes and encodes from and to []byte

type ConsumerGroupBuilder

type ConsumerGroupBuilder func(brokers []string, group, clientID string) (sarama.ConsumerGroup, error)

ConsumerGroupBuilder creates a `sarama.ConsumerGroup`

func ConsumerGroupBuilderWithConfig

func ConsumerGroupBuilderWithConfig(config *sarama.Config) ConsumerGroupBuilder

ConsumerGroupBuilderWithConfig creates a sarama consumergroup using passed config

type Context

type Context interface {
	// Topic returns the topic of input message.
	Topic() Stream

	// Key returns the key of the input message.
	Key() string

	// Partition returns the partition of the input message.
	Partition() int32

	// Offset returns the offset of the input message.
	Offset() int64

	// Group returns the group of the input message
	Group() Group

	// Value returns the value of the key in the group table.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	Value() interface{}
	ValueForKey(key string) interface{}

	// Headers returns the headers of the input message
	Headers() Headers

	// SetValue updates the value of the key in the group table.
	// It stores the value in the local cache and sends the
	// update to the Kafka topic representing the group table.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	SetValue(value interface{}, options ...ContextOption)
	SetValueForKey(key string, value interface{})

	// Delete deletes a value from the group table. IMPORTANT: this deletes the
	// value associated with the key from both the local cache and the persisted
	// table in Kafka.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	Delete(options ...ContextOption)

	// Timestamp returns the timestamp of the input message. If the timestamp is
	// invalid, a zero time will be returned.
	Timestamp() time.Time

	// Join returns the value of key in the copartitioned table.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	Join(topic Table) interface{}

	// Lookup returns the value of key in the view of table.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	Lookup(topic Table, key string) interface{}

	// Emit asynchronously writes a message into a topic.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	Emit(topic Stream, key string, value interface{}, options ...ContextOption)

	// Loopback asynchronously sends a message to another key of the group
	// table. Value passed to loopback is encoded via the codec given in the
	// Loop subscription.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	Loopback(key string, value interface{}, options ...ContextOption)

	// Fail stops execution and shuts down the processor
	// The callback is stopped immediately by panicking. Do not recover from that panic or
	// the processor might deadlock.
	Fail(err error)

	// Context returns the underlying context used to start the processor or a
	// subcontext. Returned context.Context can safely be passed to asynchronous code and goroutines.
	Context() context.Context

	// DeferCommit makes the callback omit the final commit when the callback returns.
	// It returns a function that *must* be called eventually to mark the message processing as finished.
	// If the function is not called, the processor might reprocess the message in future.
	// Note when calling DeferCommit multiple times, all returned functions must be called.
	// *Important*: the context where `DeferCommit` is called, is only safe to use within this callback,
	// never pass it into asynchronous code or goroutines.
	DeferCommit() func(error)
}

Context provides access to the processor's table and emit capabilities to arbitrary topics in kafka. Upon arrival of a message from subscribed topics, the respective ConsumeCallback is invoked with a context object along with the input message. The context is only valid within the callback, do not store it or pass it to other goroutines.

Error handling

Most methods of the context can fail due to different reasons, which are handled in different ways: Synchronous errors like * wrong codec for topic (a message cannot be marshalled or unmarshalled) * Emit to a topic without the Output definition in the group graph * Value/SetValue without defining Persist in the group graph * Join/Lookup without the definition in the group graph etc.. will result in a panic to stop the callback immediately and shutdown the processor. This is necessary to preserve integrity of the processor and avoid further actions. Do not recover from that panic, otherwise the goroutine will deadlock.

Retrying synchronous errors must be implemented by restarting the processor. If errors must be tolerated (which is not advisable because they're usually persistent), provide fail-tolerant versions of the producer, storage or codec as needed.

Asynchronous errors can occur when the callback has been finished, but e.g. sending a batched message to kafka fails due to connection errors or leader election in the cluster. Those errors still shutdown the processor but will not result in a panic in the callback.

type ContextOption

type ContextOption func(*ctxOptions)

ContextOption defines a configuration option to be used when performing operations on a context

func WithCtxEmitHeaders

func WithCtxEmitHeaders(headers Headers) ContextOption

WithCtxEmitHeaders sets kafka headers to use when emitting to kafka

type DefaultUpdateContext

type DefaultUpdateContext struct {
	// contains filtered or unexported fields
}

DefaultUpdateContext implements the UpdateContext interface.

func (DefaultUpdateContext) Headers

func (ctx DefaultUpdateContext) Headers() Headers

Headers returns the headers of the input message.

func (DefaultUpdateContext) Offset

func (ctx DefaultUpdateContext) Offset() int64

Offset returns the offset of the input message.

func (DefaultUpdateContext) Partition

func (ctx DefaultUpdateContext) Partition() int32

Partition returns the partition of the input message.

func (DefaultUpdateContext) Topic

func (ctx DefaultUpdateContext) Topic() Stream

Topic returns the topic of input message.

type Edge

type Edge interface {
	String() string
	Topic() string
	Codec() Codec
}

Edge represents a topic in Kafka and the corresponding codec to encode and decode the messages of that topic.

func Input

func Input(topic Stream, c Codec, cb ProcessCallback) Edge

Input represents an edge of an input stream topic. The edge specifies the topic name, its codec and the ProcessorCallback used to process it. The topic has to be copartitioned with any other input stream of the group and with the group table. The group starts reading the topic from the newest offset.

func Inputs

func Inputs(topics Streams, c Codec, cb ProcessCallback) Edge

Inputs creates edges of multiple input streams sharing the same codec and callback.

func Join

func Join(topic Table, c Codec) Edge

Join represents an edge of a copartitioned, log-compacted table topic. The edge specifies the topic name and the codec of the messages of the topic. The group starts reading the topic from the oldest offset. The processing of input streams is blocked until all partitions of the table are recovered.

func Lookup

func Lookup(topic Table, c Codec) Edge

Lookup represents an edge of a non-copartitioned, log-compacted table topic. The edge specifies the topic name and the codec of the messages of the topic. The group starts reading the topic from the oldest offset. The processing of input streams is blocked until the table is fully recovered.

func Loop

func Loop(c Codec, cb ProcessCallback) Edge

Loop represents the edge of the loopback topic of the group. The edge specifies the codec of the messages in the topic and ProcesCallback to process the messages of the topic. Context.Loopback() is used to write messages into this topic from any callback of the group.

func Output

func Output(topic Stream, c Codec) Edge

Output represents an edge of an output stream topic. The edge specifies the topic name and the codec of the messages of the topic. Context.Emit() only emits messages into Output edges defined in the group graph. The topic does not have to be copartitioned with the input streams.

func Persist

func Persist(c Codec) Edge

Persist represents the edge of the group table, which is log-compacted and copartitioned with the input streams. Without Persist, calls to ctx.Value or ctx.SetValue in the consume callback will fail and lead to shutdown of the processor.

This edge specifies the codec of the messages in the topic, ie, the codec of the values of the table. The processing of input streams is blocked until all partitions of the group table are recovered.

The topic name is derived from the group name by appending "-table".

func Visitor

func Visitor(name string, cb ProcessCallback) Edge

Visitor adds a visitor edge to the processor. This allows to iterate over the whole processor state while running. Note that this can block rebalance or processor shutdown. EXPERIMENTAL! This feature is not fully tested and might trigger unknown bugs. Be careful!

type Edges

type Edges []Edge

Edges is a slice of edge objects.

func (Edges) Topics

func (e Edges) Topics() []string

Topics returns the names of the topics of the edges.

type Emitter

type Emitter struct {
	// contains filtered or unexported fields
}

Emitter emits messages into a specific Kafka topic, first encoding the message with the given codec.

func NewEmitter

func NewEmitter(brokers []string, topic Stream, codec Codec, options ...EmitterOption) (*Emitter, error)

NewEmitter creates a new emitter using passed brokers, topic, codec and possibly options.

func (*Emitter) Emit

func (e *Emitter) Emit(key string, msg interface{}) (*Promise, error)

Emit sends a message for passed key using the emitter's codec.

func (*Emitter) EmitSync

func (e *Emitter) EmitSync(key string, msg interface{}) error

EmitSync sends a message to passed topic and key.

func (*Emitter) EmitSyncWithHeaders

func (e *Emitter) EmitSyncWithHeaders(key string, msg interface{}, headers Headers) error

EmitSyncWithHeaders sends a message with the given headers to passed topic and key.

func (*Emitter) EmitWithHeaders

func (e *Emitter) EmitWithHeaders(key string, msg interface{}, headers Headers) (*Promise, error)

EmitWithHeaders sends a message with the given headers for the passed key using the emitter's codec.

func (*Emitter) Finish

func (e *Emitter) Finish() error

Finish waits until the emitter is finished producing all pending messages.

type EmitterOption

type EmitterOption func(*eoptions, Stream, Codec)

EmitterOption defines a configuration option to be used when creating an emitter.

func WithEmitterClientID

func WithEmitterClientID(clientID string) EmitterOption

WithEmitterClientID defines the client ID used to identify with kafka.

func WithEmitterDefaultHeaders

func WithEmitterDefaultHeaders(headers Headers) EmitterOption

WithEmitterDefaultHeaders configures the emitter with default headers which are included with every emit.

func WithEmitterHasher

func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption

WithEmitterHasher sets the hash function that assigns keys to partitions.

func WithEmitterLogger

func WithEmitterLogger(l Logger) EmitterOption

WithEmitterLogger sets the logger the emitter should use. By default, emitters use the standard library logger.

func WithEmitterProducerBuilder

func WithEmitterProducerBuilder(pb ProducerBuilder) EmitterOption

WithEmitterProducerBuilder replaces the default producer builder.

func WithEmitterTester

func WithEmitterTester(t Tester) EmitterOption

WithEmitterTester configures the emitter to use passed tester. This is used for component tests

func WithEmitterTopicManagerBuilder

func WithEmitterTopicManagerBuilder(tmb TopicManagerBuilder) EmitterOption

WithEmitterTopicManagerBuilder replaces the default topic manager builder.

type Getter

type Getter func(string) (interface{}, error)

Getter functions return a value for a key or an error. If no value exists for the key, nil is returned without errors.

type Group

type Group string

Group is the name of a consumer group in Kafka and represents a processor group in Goka. A processor group may have a group table and a group loopback stream. By default, the group table is named <group>-table and the loopback stream <group>-loop.

type GroupGraph

type GroupGraph struct {
	// contains filtered or unexported fields
}

GroupGraph is the specification of a processor group. It contains all input, output, and any other topic from which and into which the processor group may consume or produce events. Each of these links to Kafka is called Edge.

func DefineGroup

func DefineGroup(group Group, edges ...Edge) *GroupGraph

DefineGroup creates a group graph with a given group name and a list of edges.

func (*GroupGraph) AllEdges

func (gg *GroupGraph) AllEdges() Edges

AllEdges returns a list of all edges for the group graph. This allows to modify a graph by cloning it's edges into a new one.

var existing Graph
edges := existiting.AllEdges()
// modify edges as required
// recreate the modifiedg raph
newGraph := DefineGroup(existing.Groug(), edges...)

func (*GroupGraph) Group

func (gg *GroupGraph) Group() Group

Group returns the group name.

func (*GroupGraph) GroupTable

func (gg *GroupGraph) GroupTable() Edge

GroupTable returns the group table edge of the group.

func (*GroupGraph) InputStreams

func (gg *GroupGraph) InputStreams() Edges

InputStreams returns all input stream edges of the group.

func (*GroupGraph) JointTables

func (gg *GroupGraph) JointTables() Edges

JointTables retuns all joint table edges of the group.

func (*GroupGraph) LookupTables

func (gg *GroupGraph) LookupTables() Edges

LookupTables retuns all lookup table edges of the group.

func (*GroupGraph) LoopStream

func (gg *GroupGraph) LoopStream() Edge

LoopStream returns the loopback edge of the group.

func (*GroupGraph) OutputStreams

func (gg *GroupGraph) OutputStreams() Edges

OutputStreams returns the output stream edges of the group.

func (*GroupGraph) Validate

func (gg *GroupGraph) Validate() error

Validate validates the group graph and returns an error if invalid. Main validation checks are: - at most one loopback stream edge is allowed - at most one group table edge is allowed - at least one input stream is required - table and loopback topics cannot be used in any other edge.

type Headers

type Headers map[string][]byte

Headers represents custom message headers with a convenient interface.

func HeadersFromSarama

func HeadersFromSarama(saramaHeaders []*sarama.RecordHeader) Headers

HeadersFromSarama converts sarama headers to goka's type.

func (Headers) Merged

func (h Headers) Merged(headersList ...Headers) Headers

Merged returns a new instance with all headers merged. Later keys override earlier ones. Handles a nil receiver and nil arguments without panics. If all headers are empty, nil is returned to allow using directly in emit functions.

func (Headers) ToSarama

func (h Headers) ToSarama() []sarama.RecordHeader

ToSarama converts the headers to a slice of sarama.RecordHeader. If called on a nil receiver returns nil.

func (Headers) ToSaramaPtr

func (h Headers) ToSaramaPtr() []*sarama.RecordHeader

ToSaramaPtr converts the headers to a slice of pointers to sarama.RecordHeader. If called on a nil receiver returns nil.

type InputStats

type InputStats struct {
	Count      uint
	Bytes      int
	OffsetLag  int64
	LastOffset int64
	Delay      time.Duration
}

InputStats represents the number of messages and the number of bytes consumed from a stream or table topic since the process started.

type Iterator

type Iterator interface {
	// Next advances the iterator to the next KV-pair. Err should be called
	// after Next returns false to check whether the iteration finished
	// from exhaustion or was aborted due to an error.
	Next() bool
	// Err returns the error that stopped the iteration if any.
	Err() error
	// Return the key of the current item
	Key() string
	// Return the value of the current item
	// This value is already decoded with the view's codec (or nil, if it's nil)
	Value() (interface{}, error)
	// Release the iterator. After release, the iterator is not usable anymore
	Release()
	// Seek moves the iterator to the begining of a key-value pair sequence that
	// is greater or equal to the given key. It returns whether at least one of
	// such key-value pairs exist. If true is returned, Key/Value must be called
	// immediately to get the first item. Calling Next immediately after a successful
	// seek will effectively skip an item in the iterator.
	Seek(key string) bool
}

Iterator allows one to iterate over the keys of a view.

type Logger

type Logger interface {
	// Print will simply print the params
	Print(...interface{})

	// Print will simply print the params
	Println(...interface{})

	// Printf will be used for informational messages. These can be thought of
	// having an 'Info'-level in a structured logger.
	Printf(string, ...interface{})
}

Logger is the interface Goka and its subpackages use for logging.

func DefaultLogger

func DefaultLogger() Logger

Default returns the standard library logger

type MockAutoConsumer

type MockAutoConsumer struct {
	// contains filtered or unexported fields
}

MockAutoConsumer implements sarama's Consumer interface for testing purposes. Before you can start consuming from this consumer, you have to register topic/partitions using ExpectConsumePartition, and set expectations on them.

func NewMockAutoConsumer

func NewMockAutoConsumer(t *testing.T, config *sarama.Config) *MockAutoConsumer

NewMockAutoConsumer returns a new mock Consumer instance. The t argument should be the *testing.T instance of your test method. An error will be written to it if an expectation is violated. The config argument can be set to nil.

func (*MockAutoConsumer) Close

func (c *MockAutoConsumer) Close() error

Close implements the Close method from the sarama.Consumer interface. It will close all registered PartitionConsumer instances.

func (*MockAutoConsumer) ConsumePartition

func (c *MockAutoConsumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)

ConsumePartition implements the ConsumePartition method from the sarama.Consumer interface. Before you can start consuming a partition, you have to set expectations on it using ExpectConsumePartition. You can only consume a partition once per consumer.

func (*MockAutoConsumer) ExpectConsumePartition

func (c *MockAutoConsumer) ExpectConsumePartition(topic string, partition int32, offset int64) *MockAutoPartitionConsumer

ExpectConsumePartition will register a topic/partition, so you can set expectations on it. The registered PartitionConsumer will be returned, so you can set expectations on it using method chaining. Once a topic/partition is registered, you are expected to start consuming it using ConsumePartition. If that doesn't happen, an error will be written to the error reporter once the mock consumer is closed. It will also expect that the

func (*MockAutoConsumer) HighWaterMarks

func (c *MockAutoConsumer) HighWaterMarks() map[string]map[int32]int64

HighWaterMarks returns a map of high watermarks for each topic/partition

func (*MockAutoConsumer) Partitions

func (c *MockAutoConsumer) Partitions(topic string) ([]int32, error)

Partitions returns the list of partitions for the given topic, as registered with SetTopicMetadata

func (*MockAutoConsumer) Pause

func (c *MockAutoConsumer) Pause(topicPartitions map[string][]int32)

func (*MockAutoConsumer) PauseAll

func (c *MockAutoConsumer) PauseAll()

func (*MockAutoConsumer) Resume

func (c *MockAutoConsumer) Resume(topicPartitions map[string][]int32)

func (*MockAutoConsumer) ResumeAll

func (c *MockAutoConsumer) ResumeAll()

func (*MockAutoConsumer) SetTopicMetadata

func (c *MockAutoConsumer) SetTopicMetadata(metadata map[string][]int32)

SetTopicMetadata sets the clusters topic/partition metadata, which will be returned by Topics() and Partitions().

func (*MockAutoConsumer) Topics

func (c *MockAutoConsumer) Topics() ([]string, error)

Topics returns a list of topics, as registered with SetTopicMetadata

type MockAutoPartitionConsumer

type MockAutoPartitionConsumer struct {
	// contains filtered or unexported fields
}

MockAutoPartitionConsumer implements sarama's PartitionConsumer interface for testing purposes. It is returned by the mock Consumers ConsumePartitionMethod, but only if it is registered first using the Consumer's ExpectConsumePartition method. Before consuming the Errors and Messages channel, you should specify what values will be provided on these channels using YieldMessage and YieldError.

func (*MockAutoPartitionConsumer) AsyncClose

func (pc *MockAutoPartitionConsumer) AsyncClose()

AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.

func (*MockAutoPartitionConsumer) Close

func (pc *MockAutoPartitionConsumer) Close() error

Close implements the Close method from the sarama.PartitionConsumer interface. It will verify whether the partition consumer was actually started.

func (*MockAutoPartitionConsumer) Errors

func (pc *MockAutoPartitionConsumer) Errors() <-chan *sarama.ConsumerError

Errors implements the Errors method from the sarama.PartitionConsumer interface.

func (*MockAutoPartitionConsumer) ExpectErrorsDrainedOnClose

func (pc *MockAutoPartitionConsumer) ExpectErrorsDrainedOnClose()

ExpectErrorsDrainedOnClose sets an expectation on the partition consumer that the errors channel will be fully drained when Close is called. If this expectation is not met, an error is reported to the error reporter.

func (*MockAutoPartitionConsumer) ExpectMessagesDrainedOnClose

func (pc *MockAutoPartitionConsumer) ExpectMessagesDrainedOnClose()

ExpectMessagesDrainedOnClose sets an expectation on the partition consumer that the messages channel will be fully drained when Close is called. If this expectation is not met, an error is reported to the error reporter.

func (*MockAutoPartitionConsumer) HighWaterMarkOffset

func (pc *MockAutoPartitionConsumer) HighWaterMarkOffset() int64

HighWaterMarkOffset returns the highwatermark for the partition

func (*MockAutoPartitionConsumer) IsPaused

func (pc *MockAutoPartitionConsumer) IsPaused() bool

IsPaused indicates if this partition consumer is paused or not

func (*MockAutoPartitionConsumer) Messages

func (pc *MockAutoPartitionConsumer) Messages() <-chan *sarama.ConsumerMessage

Messages implements the Messages method from the sarama.PartitionConsumer interface.

func (*MockAutoPartitionConsumer) Pause

func (pc *MockAutoPartitionConsumer) Pause()

Pause suspends fetching from this partition. Future calls to the broker will not return any records from these partition until it have been resumed using Resume(). Note that this method does not affect partition subscription. In particular, it does not cause a group rebalance when automatic assignment is used.

func (*MockAutoPartitionConsumer) Resume

func (pc *MockAutoPartitionConsumer) Resume()

Resume resumes this partition which have been paused with Pause(). New calls to the broker will return records from these partitions if there are any to be fetched. If the partition was not previously paused, this method is a no-op.

func (*MockAutoPartitionConsumer) YieldError

func (pc *MockAutoPartitionConsumer) YieldError(err error)

YieldError will yield an error on the Errors channel of this partition consumer when it is consumed. By default, the mock consumer will not verify whether this error was consumed from the Errors channel, because there are legitimate reasons for this not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that the channel is empty on close.

func (*MockAutoPartitionConsumer) YieldMessage

func (pc *MockAutoPartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage)

YieldMessage will yield a messages Messages channel of this partition consumer when it is consumed. By default, the mock consumer will not verify whether this message was consumed from the Messages channel, because there are legitimate reasons forthis not to happen. ou can call ExpectMessagesDrainedOnClose so it will verify that the channel is empty on close.

type MockBroker

type MockBroker struct {
	// contains filtered or unexported fields
}

MockBroker is a mock of Broker interface

func NewMockBroker

func NewMockBroker(ctrl *gomock.Controller) *MockBroker

NewMockBroker creates a new mock instance

func (*MockBroker) Addr

func (m *MockBroker) Addr() string

Addr mocks base method

func (*MockBroker) Connected

func (m *MockBroker) Connected() (bool, error)

Connected mocks base method

func (*MockBroker) CreateTopics

CreateTopics mocks base method

func (*MockBroker) EXPECT

func (m *MockBroker) EXPECT() *MockBrokerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockBroker) Open

func (m *MockBroker) Open(arg0 *sarama.Config) error

Open mocks base method

type MockBrokerMockRecorder

type MockBrokerMockRecorder struct {
	// contains filtered or unexported fields
}

MockBrokerMockRecorder is the mock recorder for MockBroker

func (*MockBrokerMockRecorder) Addr

func (mr *MockBrokerMockRecorder) Addr() *gomock.Call

Addr indicates an expected call of Addr

func (*MockBrokerMockRecorder) Connected

func (mr *MockBrokerMockRecorder) Connected() *gomock.Call

Connected indicates an expected call of Connected

func (*MockBrokerMockRecorder) CreateTopics

func (mr *MockBrokerMockRecorder) CreateTopics(arg0 interface{}) *gomock.Call

CreateTopics indicates an expected call of CreateTopics

func (*MockBrokerMockRecorder) Open

func (mr *MockBrokerMockRecorder) Open(arg0 interface{}) *gomock.Call

Open indicates an expected call of Open

type MockClient

type MockClient struct {
	// contains filtered or unexported fields
}

MockClient is a mock of Client interface

func NewMockClient

func NewMockClient(ctrl *gomock.Controller) *MockClient

NewMockClient creates a new mock instance

func (*MockClient) Broker

func (m *MockClient) Broker(arg0 int32) (*sarama.Broker, error)

Broker mocks base method

func (*MockClient) Brokers

func (m *MockClient) Brokers() []*sarama.Broker

Brokers mocks base method

func (*MockClient) Close

func (m *MockClient) Close() error

Close mocks base method

func (*MockClient) Closed

func (m *MockClient) Closed() bool

Closed mocks base method

func (*MockClient) Config

func (m *MockClient) Config() *sarama.Config

Config mocks base method

func (*MockClient) Controller

func (m *MockClient) Controller() (*sarama.Broker, error)

Controller mocks base method

func (*MockClient) Coordinator

func (m *MockClient) Coordinator(arg0 string) (*sarama.Broker, error)

Coordinator mocks base method

func (*MockClient) EXPECT

func (m *MockClient) EXPECT() *MockClientMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockClient) GetOffset

func (m *MockClient) GetOffset(arg0 string, arg1 int32, arg2 int64) (int64, error)

GetOffset mocks base method

func (*MockClient) InSyncReplicas

func (m *MockClient) InSyncReplicas(arg0 string, arg1 int32) ([]int32, error)

InSyncReplicas mocks base method

func (*MockClient) InitProducerID

func (m *MockClient) InitProducerID() (*sarama.InitProducerIDResponse, error)

InitProducerID mocks base method

func (*MockClient) Leader

func (m *MockClient) Leader(arg0 string, arg1 int32) (*sarama.Broker, error)

Leader mocks base method

func (*MockClient) OfflineReplicas

func (m *MockClient) OfflineReplicas(arg0 string, arg1 int32) ([]int32, error)

OfflineReplicas mocks base method

func (*MockClient) Partitions

func (m *MockClient) Partitions(arg0 string) ([]int32, error)

Partitions mocks base method

func (*MockClient) RefreshBrokers

func (m *MockClient) RefreshBrokers(arg0 []string) error

RefreshBrokers mocks base method

func (*MockClient) RefreshController

func (m *MockClient) RefreshController() (*sarama.Broker, error)

RefreshController mocks base method

func (*MockClient) RefreshCoordinator

func (m *MockClient) RefreshCoordinator(arg0 string) error

RefreshCoordinator mocks base method

func (*MockClient) RefreshMetadata

func (m *MockClient) RefreshMetadata(arg0 ...string) error

RefreshMetadata mocks base method

func (*MockClient) Replicas

func (m *MockClient) Replicas(arg0 string, arg1 int32) ([]int32, error)

Replicas mocks base method

func (*MockClient) Topics

func (m *MockClient) Topics() ([]string, error)

Topics mocks base method

func (*MockClient) WritablePartitions

func (m *MockClient) WritablePartitions(arg0 string) ([]int32, error)

WritablePartitions mocks base method

type MockClientMockRecorder

type MockClientMockRecorder struct {
	// contains filtered or unexported fields
}

MockClientMockRecorder is the mock recorder for MockClient

func (*MockClientMockRecorder) Broker

func (mr *MockClientMockRecorder) Broker(arg0 interface{}) *gomock.Call

Broker indicates an expected call of Broker

func (*MockClientMockRecorder) Brokers

func (mr *MockClientMockRecorder) Brokers() *gomock.Call

Brokers indicates an expected call of Brokers

func (*MockClientMockRecorder) Close

func (mr *MockClientMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close

func (*MockClientMockRecorder) Closed

func (mr *MockClientMockRecorder) Closed() *gomock.Call

Closed indicates an expected call of Closed

func (*MockClientMockRecorder) Config

func (mr *MockClientMockRecorder) Config() *gomock.Call

Config indicates an expected call of Config

func (*MockClientMockRecorder) Controller

func (mr *MockClientMockRecorder) Controller() *gomock.Call

Controller indicates an expected call of Controller

func (*MockClientMockRecorder) Coordinator

func (mr *MockClientMockRecorder) Coordinator(arg0 interface{}) *gomock.Call

Coordinator indicates an expected call of Coordinator

func (*MockClientMockRecorder) GetOffset

func (mr *MockClientMockRecorder) GetOffset(arg0, arg1, arg2 interface{}) *gomock.Call

GetOffset indicates an expected call of GetOffset

func (*MockClientMockRecorder) InSyncReplicas

func (mr *MockClientMockRecorder) InSyncReplicas(arg0, arg1 interface{}) *gomock.Call

InSyncReplicas indicates an expected call of InSyncReplicas

func (*MockClientMockRecorder) InitProducerID

func (mr *MockClientMockRecorder) InitProducerID() *gomock.Call

InitProducerID indicates an expected call of InitProducerID

func (*MockClientMockRecorder) Leader

func (mr *MockClientMockRecorder) Leader(arg0, arg1 interface{}) *gomock.Call

Leader indicates an expected call of Leader

func (*MockClientMockRecorder) OfflineReplicas

func (mr *MockClientMockRecorder) OfflineReplicas(arg0, arg1 interface{}) *gomock.Call

OfflineReplicas indicates an expected call of OfflineReplicas

func (*MockClientMockRecorder) Partitions

func (mr *MockClientMockRecorder) Partitions(arg0 interface{}) *gomock.Call

Partitions indicates an expected call of Partitions

func (*MockClientMockRecorder) RefreshBrokers

func (mr *MockClientMockRecorder) RefreshBrokers(arg0 interface{}) *gomock.Call

RefreshBrokers indicates an expected call of RefreshBrokers

func (*MockClientMockRecorder) RefreshController

func (mr *MockClientMockRecorder) RefreshController() *gomock.Call

RefreshController indicates an expected call of RefreshController

func (*MockClientMockRecorder) RefreshCoordinator

func (mr *MockClientMockRecorder) RefreshCoordinator(arg0 interface{}) *gomock.Call

RefreshCoordinator indicates an expected call of RefreshCoordinator

func (*MockClientMockRecorder) RefreshMetadata

func (mr *MockClientMockRecorder) RefreshMetadata(arg0 ...interface{}) *gomock.Call

RefreshMetadata indicates an expected call of RefreshMetadata

func (*MockClientMockRecorder) Replicas

func (mr *MockClientMockRecorder) Replicas(arg0, arg1 interface{}) *gomock.Call

Replicas indicates an expected call of Replicas

func (*MockClientMockRecorder) Topics

func (mr *MockClientMockRecorder) Topics() *gomock.Call

Topics indicates an expected call of Topics

func (*MockClientMockRecorder) WritablePartitions

func (mr *MockClientMockRecorder) WritablePartitions(arg0 interface{}) *gomock.Call

WritablePartitions indicates an expected call of WritablePartitions

type MockClusterAdmin

type MockClusterAdmin struct {
	// contains filtered or unexported fields
}

MockClusterAdmin is a mock of ClusterAdmin interface

func NewMockClusterAdmin

func NewMockClusterAdmin(ctrl *gomock.Controller) *MockClusterAdmin

NewMockClusterAdmin creates a new mock instance

func (*MockClusterAdmin) AlterClientQuotas

func (m *MockClusterAdmin) AlterClientQuotas(arg0 []sarama.QuotaEntityComponent, arg1 sarama.ClientQuotasOp, arg2 bool) error

AlterClientQuotas mocks base method

func (*MockClusterAdmin) AlterConfig

func (m *MockClusterAdmin) AlterConfig(arg0 sarama.ConfigResourceType, arg1 string, arg2 map[string]*string, arg3 bool) error

AlterConfig mocks base method

func (*MockClusterAdmin) AlterPartitionReassignments

func (m *MockClusterAdmin) AlterPartitionReassignments(arg0 string, arg1 [][]int32) error

AlterPartitionReassignments mocks base method

func (*MockClusterAdmin) Close

func (m *MockClusterAdmin) Close() error

Close mocks base method

func (*MockClusterAdmin) Controller

func (m *MockClusterAdmin) Controller() (*sarama.Broker, error)

Controller mocks base method

func (*MockClusterAdmin) CreateACL

func (m *MockClusterAdmin) CreateACL(arg0 sarama.Resource, arg1 sarama.Acl) error

CreateACL mocks base method

func (*MockClusterAdmin) CreateACLs

func (m *MockClusterAdmin) CreateACLs(acls []*sarama.ResourceAcls) error

CreateACLs is a mock function creating access control lists (ACLs) which are bound to specific resources.

func (*MockClusterAdmin) CreatePartitions

func (m *MockClusterAdmin) CreatePartitions(arg0 string, arg1 int32, arg2 [][]int32, arg3 bool) error

CreatePartitions mocks base method

func (*MockClusterAdmin) CreateTopic

func (m *MockClusterAdmin) CreateTopic(arg0 string, arg1 *sarama.TopicDetail, arg2 bool) error

CreateTopic mocks base method

func (*MockClusterAdmin) DeleteACL

func (m *MockClusterAdmin) DeleteACL(arg0 sarama.AclFilter, arg1 bool) ([]sarama.MatchingAcl, error)

DeleteACL mocks base method

func (*MockClusterAdmin) DeleteConsumerGroup

func (m *MockClusterAdmin) DeleteConsumerGroup(arg0 string) error

DeleteConsumerGroup mocks base method

func (*MockClusterAdmin) DeleteConsumerGroupOffset

func (m *MockClusterAdmin) DeleteConsumerGroupOffset(arg0, arg1 string, arg2 int32) error

DeleteConsumerGroupOffset mocks base method

func (*MockClusterAdmin) DeleteRecords

func (m *MockClusterAdmin) DeleteRecords(arg0 string, arg1 map[int32]int64) error

DeleteRecords mocks base method

func (*MockClusterAdmin) DeleteTopic

func (m *MockClusterAdmin) DeleteTopic(arg0 string) error

DeleteTopic mocks base method

func (*MockClusterAdmin) DeleteUserScramCredentials

DeleteUserScramCredentials mocks base method

func (*MockClusterAdmin) DescribeClientQuotas

func (m *MockClusterAdmin) DescribeClientQuotas(arg0 []sarama.QuotaFilterComponent, arg1 bool) ([]sarama.DescribeClientQuotasEntry, error)

DescribeClientQuotas mocks base method

func (*MockClusterAdmin) DescribeCluster

func (m *MockClusterAdmin) DescribeCluster() ([]*sarama.Broker, int32, error)

DescribeCluster mocks base method

func (*MockClusterAdmin) DescribeConfig

func (m *MockClusterAdmin) DescribeConfig(arg0 sarama.ConfigResource) ([]sarama.ConfigEntry, error)

DescribeConfig mocks base method

func (*MockClusterAdmin) DescribeConsumerGroups

func (m *MockClusterAdmin) DescribeConsumerGroups(arg0 []string) ([]*sarama.GroupDescription, error)

DescribeConsumerGroups mocks base method

func (*MockClusterAdmin) DescribeLogDirs

func (m *MockClusterAdmin) DescribeLogDirs(arg0 []int32) (map[int32][]sarama.DescribeLogDirsResponseDirMetadata, error)

DescribeLogDirs mocks base method

func (*MockClusterAdmin) DescribeTopics

func (m *MockClusterAdmin) DescribeTopics(arg0 []string) ([]*sarama.TopicMetadata, error)

DescribeTopics mocks base method

func (*MockClusterAdmin) DescribeUserScramCredentials

func (m *MockClusterAdmin) DescribeUserScramCredentials(arg0 []string) ([]*sarama.DescribeUserScramCredentialsResult, error)

DescribeUserScramCredentials mocks base method

func (*MockClusterAdmin) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MockClusterAdmin) IncrementalAlterConfig

func (m *MockClusterAdmin) IncrementalAlterConfig(arg0 sarama.ConfigResourceType, arg1 string, arg2 map[string]sarama.IncrementalAlterConfigsEntry, arg3 bool) error

IncrementalAlterConfig mocks base method

func (*MockClusterAdmin) ListAcls

func (m *MockClusterAdmin) ListAcls(arg0 sarama.AclFilter) ([]sarama.ResourceAcls, error)

ListAcls mocks base method

func (*MockClusterAdmin) ListConsumerGroupOffsets

func (m *MockClusterAdmin) ListConsumerGroupOffsets(arg0 string, arg1 map[string][]int32) (*sarama.OffsetFetchResponse, error)

ListConsumerGroupOffsets mocks base method

func (*MockClusterAdmin) ListConsumerGroups

func (m *MockClusterAdmin) ListConsumerGroups() (map[string]string, error)

ListConsumerGroups mocks base method

func (*MockClusterAdmin) ListPartitionReassignments

func (m *MockClusterAdmin) ListPartitionReassignments(arg0 string, arg1 []int32) (map[string]map[int32]*sarama.PartitionReplicaReassignmentsStatus, error)

ListPartitionReassignments mocks base method

func (*MockClusterAdmin) ListTopics

func (m *MockClusterAdmin) ListTopics() (map[string]sarama.TopicDetail, error)

ListTopics mocks base method

func (*MockClusterAdmin) UpsertUserScramCredentials

UpsertUserScramCredentials mocks base method

type MockClusterAdminMockRecorder

type MockClusterAdminMockRecorder struct {
	// contains filtered or unexported fields
}

MockClusterAdminMockRecorder is the mock recorder for MockClusterAdmin

func (*MockClusterAdminMockRecorder) AlterClientQuotas

func (mr *MockClusterAdminMockRecorder) AlterClientQuotas(arg0, arg1, arg2 interface{}) *gomock.Call

AlterClientQuotas indicates an expected call of AlterClientQuotas

func (*MockClusterAdminMockRecorder) AlterConfig

func (mr *MockClusterAdminMockRecorder) AlterConfig(arg0, arg1, arg2, arg3 interface{}) *gomock.Call

AlterConfig indicates an expected call of AlterConfig

func (*MockClusterAdminMockRecorder) AlterPartitionReassignments

func (mr *MockClusterAdminMockRecorder) AlterPartitionReassignments(arg0, arg1 interface{}) *gomock.Call

AlterPartitionReassignments indicates an expected call of AlterPartitionReassignments

func (*MockClusterAdminMockRecorder) Close

Close indicates an expected call of Close

func (*MockClusterAdminMockRecorder) Controller

func (mr *MockClusterAdminMockRecorder) Controller() *gomock.Call

Controller indicates an expected call of Controller

func (*MockClusterAdminMockRecorder) CreateACL

func (mr *MockClusterAdminMockRecorder) CreateACL(arg0, arg1 interface{}) *gomock.Call

CreateACL indicates an expected call of CreateACL

func (*MockClusterAdminMockRecorder) CreatePartitions

func (mr *MockClusterAdminMockRecorder) CreatePartitions(arg0, arg1, arg2, arg3 interface{}) *gomock.Call

CreatePartitions indicates an expected call of CreatePartitions

func (*MockClusterAdminMockRecorder) CreateTopic

func (mr *MockClusterAdminMockRecorder) CreateTopic(arg0, arg1, arg2 interface{}) *gomock.Call

CreateTopic indicates an expected call of CreateTopic

func (*MockClusterAdminMockRecorder) DeleteACL

func (mr *MockClusterAdminMockRecorder) DeleteACL(arg0, arg1 interface{}) *gomock.Call

DeleteACL indicates an expected call of DeleteACL

func (*MockClusterAdminMockRecorder) DeleteConsumerGroup

func (mr *MockClusterAdminMockRecorder) DeleteConsumerGroup(arg0 interface{}) *gomock.Call

DeleteConsumerGroup indicates an expected call of DeleteConsumerGroup

func (*MockClusterAdminMockRecorder) DeleteConsumerGroupOffset

func (mr *MockClusterAdminMockRecorder) DeleteConsumerGroupOffset(arg0, arg1, arg2 interface{}) *gomock.Call

DeleteConsumerGroupOffset indicates an expected call of DeleteConsumerGroupOffset

func (*MockClusterAdminMockRecorder) DeleteRecords

func (mr *MockClusterAdminMockRecorder) DeleteRecords(arg0, arg1 interface{}) *gomock.Call

DeleteRecords indicates an expected call of DeleteRecords

func (*MockClusterAdminMockRecorder) DeleteTopic

func (mr *MockClusterAdminMockRecorder) DeleteTopic(arg0 interface{}) *gomock.Call

DeleteTopic indicates an expected call of DeleteTopic

func (*MockClusterAdminMockRecorder) DeleteUserScramCredentials

func (mr *MockClusterAdminMockRecorder) DeleteUserScramCredentials(arg0 interface{}) *gomock.Call

DeleteUserScramCredentials indicates an expected call of DeleteUserScramCredentials

func (*MockClusterAdminMockRecorder) DescribeClientQuotas

func (mr *MockClusterAdminMockRecorder) DescribeClientQuotas(arg0, arg1 interface{}) *gomock.Call

DescribeClientQuotas indicates an expected call of DescribeClientQuotas

func (*MockClusterAdminMockRecorder) DescribeCluster

func (mr *MockClusterAdminMockRecorder) DescribeCluster() *gomock.Call

DescribeCluster indicates an expected call of DescribeCluster

func (*MockClusterAdminMockRecorder) DescribeConfig

func (mr *MockClusterAdminMockRecorder) DescribeConfig(arg0 interface{}) *gomock.Call

DescribeConfig indicates an expected call of DescribeConfig

func (*MockClusterAdminMockRecorder) DescribeConsumerGroups

func (mr *MockClusterAdminMockRecorder) DescribeConsumerGroups(arg0 interface{}) *gomock.Call

DescribeConsumerGroups indicates an expected call of DescribeConsumerGroups

func (*MockClusterAdminMockRecorder) DescribeLogDirs

func (mr *MockClusterAdminMockRecorder) DescribeLogDirs(arg0 interface{}) *gomock.Call

DescribeLogDirs indicates an expected call of DescribeLogDirs

func (*MockClusterAdminMockRecorder) DescribeTopics

func (mr *MockClusterAdminMockRecorder) DescribeTopics(arg0 interface{}) *gomock.Call

DescribeTopics indicates an expected call of DescribeTopics

func (*MockClusterAdminMockRecorder) DescribeUserScramCredentials

func (mr *MockClusterAdminMockRecorder) DescribeUserScramCredentials(arg0 interface{}) *gomock.Call

DescribeUserScramCredentials indicates an expected call of DescribeUserScramCredentials

func (*MockClusterAdminMockRecorder) IncrementalAlterConfig

func (mr *MockClusterAdminMockRecorder) IncrementalAlterConfig(arg0, arg1, arg2, arg3 interface{}) *gomock.Call

IncrementalAlterConfig indicates an expected call of IncrementalAlterConfig

func (*MockClusterAdminMockRecorder) ListAcls

func (mr *MockClusterAdminMockRecorder) ListAcls(arg0 interface{}) *gomock.Call

ListAcls indicates an expected call of ListAcls

func (*MockClusterAdminMockRecorder) ListConsumerGroupOffsets

func (mr *MockClusterAdminMockRecorder) ListConsumerGroupOffsets(arg0, arg1 interface{}) *gomock.Call

ListConsumerGroupOffsets indicates an expected call of ListConsumerGroupOffsets

func (*MockClusterAdminMockRecorder) ListConsumerGroups

func (mr *MockClusterAdminMockRecorder) ListConsumerGroups() *gomock.Call

ListConsumerGroups indicates an expected call of ListConsumerGroups

func (*MockClusterAdminMockRecorder) ListPartitionReassignments

func (mr *MockClusterAdminMockRecorder) ListPartitionReassignments(arg0, arg1 interface{}) *gomock.Call

ListPartitionReassignments indicates an expected call of ListPartitionReassignments

func (*MockClusterAdminMockRecorder) ListTopics

func (mr *MockClusterAdminMockRecorder) ListTopics() *gomock.Call

ListTopics indicates an expected call of ListTopics

func (*MockClusterAdminMockRecorder) UpsertUserScramCredentials

func (mr *MockClusterAdminMockRecorder) UpsertUserScramCredentials(arg0 interface{}) *gomock.Call

UpsertUserScramCredentials indicates an expected call of UpsertUserScramCredentials

type MockConsumerGroup

type MockConsumerGroup struct {
	// contains filtered or unexported fields
}

MockConsumerGroup mocks the consumergroup

func NewMockConsumerGroup

func NewMockConsumerGroup(t *testing.T) *MockConsumerGroup

NewMockConsumerGroup creates a new consumer group

func (*MockConsumerGroup) Close

func (cg *MockConsumerGroup) Close() error

Close closes the consumergroup

func (*MockConsumerGroup) Consume

func (cg *MockConsumerGroup) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error

Consume starts consuming from the consumergroup

func (*MockConsumerGroup) Errors

func (cg *MockConsumerGroup) Errors() <-chan error

Errors returns the errors channel

func (*MockConsumerGroup) FailOnConsume

func (cg *MockConsumerGroup) FailOnConsume(err error)

FailOnConsume marks the consumer to fail on consume

func (*MockConsumerGroup) Pause

func (cg *MockConsumerGroup) Pause(partitions map[string][]int32)

func (*MockConsumerGroup) PauseAll

func (cg *MockConsumerGroup) PauseAll()

func (*MockConsumerGroup) Resume

func (cg *MockConsumerGroup) Resume(partitions map[string][]int32)

func (*MockConsumerGroup) ResumeAll

func (cg *MockConsumerGroup) ResumeAll()

func (*MockConsumerGroup) SendError

func (cg *MockConsumerGroup) SendError(err error)

SendError sends an error the consumergroup

func (*MockConsumerGroup) SendMessage

func (cg *MockConsumerGroup) SendMessage(message *sarama.ConsumerMessage) <-chan struct{}

SendMessage sends a message to the consumergroup returns a channel that will be closed when the message has been committed by the group

func (*MockConsumerGroup) SendMessageWait

func (cg *MockConsumerGroup) SendMessageWait(message *sarama.ConsumerMessage)

SendMessageWait sends a message to the consumergroup waiting for the message for being committed

type MockConsumerGroupClaim

type MockConsumerGroupClaim struct {
	// contains filtered or unexported fields
}

MockConsumerGroupClaim mocks the consumergroupclaim

func NewMockConsumerGroupClaim

func NewMockConsumerGroupClaim(topic string, partition int32) *MockConsumerGroupClaim

NewMockConsumerGroupClaim creates a new mocksconsumergroupclaim

func (*MockConsumerGroupClaim) HighWaterMarkOffset

func (cgc *MockConsumerGroupClaim) HighWaterMarkOffset() int64

HighWaterMarkOffset returns the hwm offset

func (*MockConsumerGroupClaim) InitialOffset

func (cgc *MockConsumerGroupClaim) InitialOffset() int64

InitialOffset returns the initial offset

func (*MockConsumerGroupClaim) Messages

func (cgc *MockConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage

Messages returns the message channel that must be

func (*MockConsumerGroupClaim) Partition

func (cgc *MockConsumerGroupClaim) Partition() int32

Partition returns the partition

func (*MockConsumerGroupClaim) Topic

func (cgc *MockConsumerGroupClaim) Topic() string

Topic returns the current topic of the claim

type MockConsumerGroupSession

type MockConsumerGroupSession struct {
	// contains filtered or unexported fields
}

MockConsumerGroupSession mocks the consumer group session used for testing

func (*MockConsumerGroupSession) Claims

func (cgs *MockConsumerGroupSession) Claims() map[string][]int32

Claims returns the number of partitions assigned in the group session for each topic

func (*MockConsumerGroupSession) Commit

func (cgs *MockConsumerGroupSession) Commit()

Commit the offset to the backend

func (*MockConsumerGroupSession) Context

func (cgs *MockConsumerGroupSession) Context() context.Context

Context returns the consumer group's context

func (*MockConsumerGroupSession) GenerationID

func (cgs *MockConsumerGroupSession) GenerationID() int32

GenerationID returns the generation ID of the group consumer

func (*MockConsumerGroupSession) MarkMessage

func (cgs *MockConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string)

MarkMessage marks the passed message as consumed

func (*MockConsumerGroupSession) MarkOffset

func (cgs *MockConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string)

MarkOffset marks the passed offset consumed in topic/partition

func (*MockConsumerGroupSession) MemberID

func (cgs *MockConsumerGroupSession) MemberID() string

MemberID returns the member ID TODO: clarify what that actually means and whether we need to mock that somehow

func (*MockConsumerGroupSession) ResetOffset

func (cgs *MockConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string)

ResetOffset resets the offset to be consumed from

func (*MockConsumerGroupSession) SendMessage

func (cgs *MockConsumerGroupSession) SendMessage(msg *sarama.ConsumerMessage)

SendMessage sends a message to the consumer

type MockProducer

type MockProducer struct {
	// contains filtered or unexported fields
}

MockProducer is a mock of Producer interface

func NewMockProducer

func NewMockProducer(ctrl *gomock.Controller) *MockProducer

NewMockProducer creates a new mock instance

func (*MockProducer) Close

func (m *MockProducer) Close() error

Close mocks base method

func (*MockProducer) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MockProducer) Emit

func (m *MockProducer) Emit(arg0, arg1 string, arg2 []byte) *Promise

Emit mocks base method

func (*MockProducer) EmitWithHeaders

func (m *MockProducer) EmitWithHeaders(arg0, arg1 string, arg2 []byte, arg3 Headers) *Promise

EmitWithHeaders mocks base method

type MockProducerMockRecorder

type MockProducerMockRecorder struct {
	// contains filtered or unexported fields
}

MockProducerMockRecorder is the mock recorder for MockProducer

func (*MockProducerMockRecorder) Close

func (mr *MockProducerMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close

func (*MockProducerMockRecorder) Emit

func (mr *MockProducerMockRecorder) Emit(arg0, arg1, arg2 interface{}) *gomock.Call

Emit indicates an expected call of Emit

func (*MockProducerMockRecorder) EmitWithHeaders

func (mr *MockProducerMockRecorder) EmitWithHeaders(arg0, arg1, arg2, arg3 interface{}) *gomock.Call

EmitWithHeaders indicates an expected call of EmitWithHeaders

type MockStorage

type MockStorage struct {
	// contains filtered or unexported fields
}

MockStorage is a mock of Storage interface

func NewMockStorage

func NewMockStorage(ctrl *gomock.Controller) *MockStorage

NewMockStorage creates a new mock instance

func (*MockStorage) Close

func (m *MockStorage) Close() error

Close mocks base method

func (*MockStorage) Delete

func (m *MockStorage) Delete(arg0 string) error

Delete mocks base method

func (*MockStorage) EXPECT

func (m *MockStorage) EXPECT() *MockStorageMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockStorage) Get

func (m *MockStorage) Get(arg0 string) ([]byte, error)

Get mocks base method

func (*MockStorage) GetOffset

func (m *MockStorage) GetOffset(arg0 int64) (int64, error)

GetOffset mocks base method

func (*MockStorage) Has

func (m *MockStorage) Has(arg0 string) (bool, error)

Has mocks base method

func (*MockStorage) Iterator

func (m *MockStorage) Iterator() (storage.Iterator, error)

Iterator mocks base method

func (*MockStorage) IteratorWithRange

func (m *MockStorage) IteratorWithRange(arg0, arg1 []byte) (storage.Iterator, error)

IteratorWithRange mocks base method

func (*MockStorage) MarkRecovered

func (m *MockStorage) MarkRecovered() error

MarkRecovered mocks base method

func (*MockStorage) Open

func (m *MockStorage) Open() error

Open mocks base method

func (*MockStorage) Set

func (m *MockStorage) Set(arg0 string, arg1 []byte) error

Set mocks base method

func (*MockStorage) SetOffset

func (m *MockStorage) SetOffset(arg0 int64) error

SetOffset mocks base method

type MockStorageMockRecorder

type MockStorageMockRecorder struct {
	// contains filtered or unexported fields
}

MockStorageMockRecorder is the mock recorder for MockStorage

func (*MockStorageMockRecorder) Close

func (mr *MockStorageMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close

func (*MockStorageMockRecorder) Delete

func (mr *MockStorageMockRecorder) Delete(arg0 interface{}) *gomock.Call

Delete indicates an expected call of Delete

func (*MockStorageMockRecorder) Get

func (mr *MockStorageMockRecorder) Get(arg0 interface{}) *gomock.Call

Get indicates an expected call of Get

func (*MockStorageMockRecorder) GetOffset

func (mr *MockStorageMockRecorder) GetOffset(arg0 interface{}) *gomock.Call

GetOffset indicates an expected call of GetOffset

func (*MockStorageMockRecorder) Has

func (mr *MockStorageMockRecorder) Has(arg0 interface{}) *gomock.Call

Has indicates an expected call of Has

func (*MockStorageMockRecorder) Iterator

func (mr *MockStorageMockRecorder) Iterator() *gomock.Call

Iterator indicates an expected call of Iterator

func (*MockStorageMockRecorder) IteratorWithRange

func (mr *MockStorageMockRecorder) IteratorWithRange(arg0, arg1 interface{}) *gomock.Call

IteratorWithRange indicates an expected call of IteratorWithRange

func (*MockStorageMockRecorder) MarkRecovered

func (mr *MockStorageMockRecorder) MarkRecovered() *gomock.Call

MarkRecovered indicates an expected call of MarkRecovered

func (*MockStorageMockRecorder) Open

func (mr *MockStorageMockRecorder) Open() *gomock.Call

Open indicates an expected call of Open

func (*MockStorageMockRecorder) Set

func (mr *MockStorageMockRecorder) Set(arg0, arg1 interface{}) *gomock.Call

Set indicates an expected call of Set

func (*MockStorageMockRecorder) SetOffset

func (mr *MockStorageMockRecorder) SetOffset(arg0 interface{}) *gomock.Call

SetOffset indicates an expected call of SetOffset

type MockTopicManager

type MockTopicManager struct {
	// contains filtered or unexported fields
}

MockTopicManager is a mock of TopicManager interface

func NewMockTopicManager

func NewMockTopicManager(ctrl *gomock.Controller) *MockTopicManager

NewMockTopicManager creates a new mock instance

func (*MockTopicManager) Close

func (m *MockTopicManager) Close() error

Close mocks base method

func (*MockTopicManager) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MockTopicManager) EnsureStreamExists

func (m *MockTopicManager) EnsureStreamExists(arg0 string, arg1 int) error

EnsureStreamExists mocks base method

func (*MockTopicManager) EnsureTableExists

func (m *MockTopicManager) EnsureTableExists(arg0 string, arg1 int) error

EnsureTableExists mocks base method

func (*MockTopicManager) EnsureTopicExists

func (m *MockTopicManager) EnsureTopicExists(arg0 string, arg1, arg2 int, arg3 map[string]string) error

EnsureTopicExists mocks base method

func (*MockTopicManager) GetOffset

func (m *MockTopicManager) GetOffset(arg0 string, arg1 int32, arg2 int64) (int64, error)

GetOffset mocks base method

func (*MockTopicManager) Partitions

func (m *MockTopicManager) Partitions(arg0 string) ([]int32, error)

Partitions mocks base method

type MockTopicManagerMockRecorder

type MockTopicManagerMockRecorder struct {
	// contains filtered or unexported fields
}

MockTopicManagerMockRecorder is the mock recorder for MockTopicManager

func (*MockTopicManagerMockRecorder) Close

Close indicates an expected call of Close

func (*MockTopicManagerMockRecorder) EnsureStreamExists

func (mr *MockTopicManagerMockRecorder) EnsureStreamExists(arg0, arg1 interface{}) *gomock.Call

EnsureStreamExists indicates an expected call of EnsureStreamExists

func (*MockTopicManagerMockRecorder) EnsureTableExists

func (mr *MockTopicManagerMockRecorder) EnsureTableExists(arg0, arg1 interface{}) *gomock.Call

EnsureTableExists indicates an expected call of EnsureTableExists

func (*MockTopicManagerMockRecorder) EnsureTopicExists

func (mr *MockTopicManagerMockRecorder) EnsureTopicExists(arg0, arg1, arg2, arg3 interface{}) *gomock.Call

EnsureTopicExists indicates an expected call of EnsureTopicExists

func (*MockTopicManagerMockRecorder) GetOffset

func (mr *MockTopicManagerMockRecorder) GetOffset(arg0, arg1, arg2 interface{}) *gomock.Call

GetOffset indicates an expected call of GetOffset

func (*MockTopicManagerMockRecorder) Partitions

func (mr *MockTopicManagerMockRecorder) Partitions(arg0 interface{}) *gomock.Call

Partitions indicates an expected call of Partitions

type NilHandling

type NilHandling int

NilHandling defines how nil messages should be handled by the processor.

const (
	// NilIgnore drops any message with nil value.
	NilIgnore NilHandling = 0 + iota
	// NilProcess passes the nil value to ProcessCallback.
	NilProcess
	// NilDecode passes the nil value to decoder before calling ProcessCallback.
	NilDecode
)

type OutputStats

type OutputStats struct {
	Count uint
	Bytes int
}

OutputStats represents the number of messages and the number of bytes emitted into a stream or table since the process started.

type PPRunMode

type PPRunMode int

PPRunMode configures how the partition processor participates as part of the processor

type PartitionProcStats

type PartitionProcStats struct {
	Now time.Time

	TableStats *TableStats

	Joined map[string]*TableStats

	Input  map[string]*InputStats
	Output map[string]*OutputStats
}

PartitionProcStats represents metrics and measurements of a partition processor

type PartitionProcessor

type PartitionProcessor struct {
	// contains filtered or unexported fields
}

PartitionProcessor handles message processing of one partition by serializing messages from different input topics. It also handles joined tables as well as lookup views (managed by `Processor`).

func (*PartitionProcessor) Recovered

func (pp *PartitionProcessor) Recovered() bool

Recovered returns whether the processor is running (i.e. all joins, lookups and the table is recovered and it's consuming messages)

func (*PartitionProcessor) Start

func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error

Start initializes the partition processor * recover the table * recover all join tables * run the join-tables in catchup mode * start the processor processing loop to receive messages This method takes two contexts, as it does two distinct phases:

  • setting up the partition (loading table, joins etc.), after which it returns. This needs a separate context to allow terminatin the setup phase
  • starting the message-processing-loop of the actual processor. This will keep running after `Start` returns, so it uses the second context.

func (*PartitionProcessor) Stop

func (pp *PartitionProcessor) Stop() error

Stop stops the partition processor

func (*PartitionProcessor) VisitValues

func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta interface{}, visited *int64) error

VisitValues iterates over all values in the table and calls the "visit"-callback for the passed name. Optional parameter value can be set, which will just be forwarded to the visitor-function the function returns after all items of the table have been added to the channel.

type PartitionStatus

type PartitionStatus int

PartitionStatus is the status of the partition of a table (group table or joined table).

const (
	// PartitionStopped indicates the partition stopped and should not be used anymore.
	PartitionStopped PartitionStatus = iota
	// PartitionInitializing indicates that the underlying storage is initializing (e.g. opening leveldb files),
	// and has not actually started working yet.
	PartitionInitializing
	// PartitionConnecting indicates the partition trying to (re-)connect to Kafka
	PartitionConnecting
	// PartitionRecovering indicates the partition is recovering and the storage
	// is writing updates in bulk-mode (if the storage implementation supports it).
	PartitionRecovering
	// PartitionPreparing indicates the end of the bulk-mode. Depending on the storage
	// implementation, the Preparing phase may take long because the storage compacts its logs.
	PartitionPreparing
	// PartitionRunning indicates the partition is recovered and processing updates
	// in normal operation.
	PartitionRunning
)

type PartitionTable

type PartitionTable struct {
	// contains filtered or unexported fields
}

PartitionTable manages the usage of a table for one partition. It allows to setup and recover/catchup the table contents from kafka, allow updates via Get/Set/Delete accessors

func (*PartitionTable) CatchupForever

func (p *PartitionTable) CatchupForever(ctx context.Context, restartOnError bool) error

CatchupForever starts catching the partition table forever (until the context is cancelled). Option restartOnError allows the view to stay open/intact even in case of consumer errors

func (*PartitionTable) Close

func (p *PartitionTable) Close() error

Close closes the partition table

func (*PartitionTable) CurrentState

func (p *PartitionTable) CurrentState() PartitionStatus

CurrentState returns the partition's current status

func (*PartitionTable) Delete

func (p *PartitionTable) Delete(key string) error

Delete removes the passed key from the partition table by deleting from the underlying storage

func (*PartitionTable) Get

func (p *PartitionTable) Get(key string) ([]byte, error)

Get returns the value for passed key

func (*PartitionTable) GetOffset

func (p *PartitionTable) GetOffset(defValue int64) (int64, error)

GetOffset returns the magic offset value from storage

func (*PartitionTable) Has

func (p *PartitionTable) Has(key string) (bool, error)

Has returns whether the storage contains passed key

func (*PartitionTable) IsRecovered

func (p *PartitionTable) IsRecovered() bool

IsRecovered returns whether the partition table is recovered

func (*PartitionTable) Iterator

func (p *PartitionTable) Iterator() (storage.Iterator, error)

Iterator returns an iterator on the table's storage. If the partition_table is not in a running state, it will return an error to prevent serving incomplete data

func (*PartitionTable) IteratorWithRange

func (p *PartitionTable) IteratorWithRange(start []byte, limit []byte) (storage.Iterator, error)

IteratorWithRange returns an iterator on the table's storage for passed range. If the partition_table is not in a running state, it will return an error to prevent serving incomplete data

func (*PartitionTable) RunStatsLoop

func (p *PartitionTable) RunStatsLoop(ctx context.Context)

RunStatsLoop starts the handler for stats requests. This loop runs detached from the recover/catchup mechanism so clients can always request stats even if the partition table is not running (like a processor table after it's recovered).

func (*PartitionTable) Set

func (p *PartitionTable) Set(key string, value []byte) error

Set sets a key value key in the partition table by modifying the underlying storage

func (*PartitionTable) SetOffset

func (p *PartitionTable) SetOffset(value int64) error

SetOffset sets the magic offset value in storage

func (*PartitionTable) SetupAndRecover

func (p *PartitionTable) SetupAndRecover(ctx context.Context, restartOnError bool) error

SetupAndRecover sets up the partition storage and recovers to HWM

func (*PartitionTable) TrackMessageWrite

func (p *PartitionTable) TrackMessageWrite(ctx context.Context, length int)

TrackMessageWrite updates the write stats to passed length

func (*PartitionTable) WaitRecovered

func (p *PartitionTable) WaitRecovered() <-chan struct{}

WaitRecovered returns a channel that closes when the partition table enters state `PartitionRunning`

type ProcessCallback

type ProcessCallback func(ctx Context, msg interface{})

ProcessCallback function is called for every message received by the processor.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor is a set of stateful callback functions that, on the arrival of messages, modify the content of a table (the group table) and emit messages into other topics. Messages as well as rows in the group table are key-value pairs. A group is composed by multiple processor instances.

func NewProcessor

func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption) (*Processor, error)

NewProcessor creates a processor instance in a group given the address of Kafka brokers, the consumer group name, a list of subscriptions (topics, codecs, and callbacks), and series of options.

func (*Processor) Cleanup

func (g *Processor) Cleanup(session sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited but before the offsets are committed for the very last time.

func (*Processor) ConsumeClaim

func (g *Processor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.

func (*Processor) Get

func (g *Processor) Get(key string) (interface{}, error)

Get returns a read-only copy of a value from the group table if the respective partition is owned by the processor instace. Get can be called by multiple goroutines concurrently. Get can be only used with stateful processors (ie, when group table is enabled) and after Recovered returns true.

func (*Processor) Graph

func (g *Processor) Graph() *GroupGraph

Graph returns the group graph of the processor.

func (*Processor) Recovered

func (g *Processor) Recovered() bool

Recovered returns whether the processor is running, i.e. if the processor has recovered all lookups/joins/tables and is running

func (*Processor) Run

func (g *Processor) Run(ctx context.Context) (rerr error)

Run starts the processor using passed context. The processor stops in case of errors or if the context is cancelled

func (*Processor) Setup

func (g *Processor) Setup(session sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim.

func (*Processor) StateReader

func (g *Processor) StateReader() StateReader

StateReader returns a read only interface of the processors state.

func (*Processor) Stats

func (g *Processor) Stats() *ProcessorStats

Stats returns the aggregated stats for the processor including all partitions, tables, lookups and joins

func (*Processor) StatsWithContext

func (g *Processor) StatsWithContext(ctx context.Context) *ProcessorStats

StatsWithContext returns stats for the processor, see #Processor.Stats()

func (*Processor) Stop

func (g *Processor) Stop()

Stop stops the processor. This is semantically equivalent of closing the Context that was passed to Processor.Run(..). This method will return immediately, errors during running will be returned from Processor.Run(..)

func (*Processor) VisitAll

func (g *Processor) VisitAll(ctx context.Context, name string, meta interface{}) error

VisitAll visits all values from the processor table.

func (*Processor) VisitAllWithStats

func (g *Processor) VisitAllWithStats(ctx context.Context, name string, meta interface{}) (int64, error)

VisitAllWithStats visits all keys in parallel by passing the visit request to all partitions. The optional argument "meta" will be forwarded to the visit-function of each key of the table. The function returns when * all values have been visited or * the context is cancelled or * the processor rebalances/shuts down Return parameters: * number of visited items * error

func (*Processor) WaitForReady

func (g *Processor) WaitForReady()

WaitForReady waits until the processor is ready to consume messages (or is actually consuming messages) i.e., it is done catching up all partition tables, joins and lookup tables

func (*Processor) WaitForReadyContext

func (g *Processor) WaitForReadyContext(ctx context.Context) error

WaitForReadyContext is context aware option of WaitForReady. It either waits until the processor is ready or until context is canceled. If the return was caused by context it will return context reported error.

type ProcessorOption

type ProcessorOption func(*poptions, *GroupGraph)

ProcessorOption defines a configuration option to be used when creating a processor.

func WithBackoffBuilder

func WithBackoffBuilder(bb BackoffBuilder) ProcessorOption

WithBackoffBuilder replaced the default backoff.

func WithBackoffResetTimeout

func WithBackoffResetTimeout(duration time.Duration) ProcessorOption

WithBackoffResetTimeout defines the timeout when the backoff will be reset.

func WithClientID

func WithClientID(clientID string) ProcessorOption

WithClientID defines the client ID used to identify with Kafka.

func WithConsumerGroupBuilder

func WithConsumerGroupBuilder(cgb ConsumerGroupBuilder) ProcessorOption

WithConsumerGroupBuilder replaces the default consumer group builder

func WithConsumerSaramaBuilder

func WithConsumerSaramaBuilder(cgb SaramaConsumerBuilder) ProcessorOption

WithConsumerSaramaBuilder replaces the default consumer group builder

func WithGroupGraphHook

func WithGroupGraphHook(hook func(gg *GroupGraph)) ProcessorOption

WithGroupGraphHook allows a function to obtain the group graph when a processor is started.

func WithHasher

func WithHasher(hasher func() hash.Hash32) ProcessorOption

WithHasher sets the hash function that assigns keys to partitions.

func WithHotStandby

func WithHotStandby() ProcessorOption

WithHotStandby configures the processor to keep partitions up to date which are not part of the current generation's assignment. This allows fast processor failover since all partitions are hot in other processor instances, but it requires more resources (in particular network and disk). If this option is used, the option `WithRecoverAhead` should also be added to avoid unnecessary delays.

func WithLogger

func WithLogger(l Logger) ProcessorOption

WithLogger sets the logger the processor should use. By default, processors use the standard library logger.

func WithNilHandling

func WithNilHandling(nh NilHandling) ProcessorOption

WithNilHandling configures how the processor should handle messages with nil value. By default the processor ignores nil messages.

func WithPartitionChannelSize

func WithPartitionChannelSize(size int) ProcessorOption

WithPartitionChannelSize replaces the default partition channel size. This is mostly used for testing by setting it to 0 to have synchronous behavior of goka.

func WithProducerBuilder

func WithProducerBuilder(pb ProducerBuilder) ProcessorOption

WithProducerBuilder replaces the default producer builder.

func WithProducerDefaultHeaders

func WithProducerDefaultHeaders(hdr Headers) ProcessorOption

WithProducerDefaultHeaders configures the producer with default headers which are included with every emit.

func WithRebalanceCallback

func WithRebalanceCallback(cb RebalanceCallback) ProcessorOption

WithRebalanceCallback sets the callback for when a new partition assignment is received. By default, this is an empty function.

func WithRecoverAhead

func WithRecoverAhead() ProcessorOption

WithRecoverAhead configures the processor to recover joins and the processor table ahead of joining the group. This reduces the processing delay that occurs when adding new instances to groups with high-volume-joins/tables. If the processor does not use joins or a table, it does not have any effect.

func WithStorageBuilder

func WithStorageBuilder(sb storage.Builder) ProcessorOption

WithStorageBuilder defines a builder for the storage of each partition.

func WithTester

func WithTester(t Tester) ProcessorOption

WithTester configures all external connections of a processor, ie, storage, consumer and producer

func WithTopicManagerBuilder

func WithTopicManagerBuilder(tmb TopicManagerBuilder) ProcessorOption

WithTopicManagerBuilder replaces the default topic manager builder.

func WithUpdateCallback

func WithUpdateCallback(cb UpdateCallback) ProcessorOption

WithUpdateCallback defines the callback called upon recovering a message from the log.

type ProcessorStats

type ProcessorStats struct {
	Group  map[int32]*PartitionProcStats
	Lookup map[string]*ViewStats
}

ProcessorStats represents the metrics of all partitions of the processor, including its group, joined tables and lookup tables.

type Producer

type Producer interface {
	// Emit sends a message to topic.
	Emit(topic string, key string, value []byte) *Promise
	EmitWithHeaders(topic string, key string, value []byte, headers Headers) *Promise
	Close() error
}

Producer abstracts the kafka producer

func DefaultProducerBuilder

func DefaultProducerBuilder(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error)

DefaultProducerBuilder creates a Kafka producer using the Sarama library.

func NewProducer

func NewProducer(brokers []string, config *sarama.Config) (Producer, error)

NewProducer creates new kafka producer for passed brokers.

type ProducerBuilder

type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error)

ProducerBuilder create a Kafka producer.

func ProducerBuilderWithConfig

func ProducerBuilderWithConfig(config *sarama.Config) ProducerBuilder

ProducerBuilderWithConfig creates a Kafka consumer using the Sarama library.

type Promise

type Promise struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Promise as in https://en.wikipedia.org/wiki/Futures_and_promises

func NewPromise

func NewPromise() *Promise

NewPromise creates a new Promise

func (*Promise) Then

func (p *Promise) Then(callback func(err error)) *Promise

Then chains a callback to the Promise

func (*Promise) ThenWithMessage

func (p *Promise) ThenWithMessage(callback func(msg *sarama.ProducerMessage, err error)) *Promise

ThenWithMessage chains a callback to the Promise

type PromiseFinisher

type PromiseFinisher func(msg *sarama.ProducerMessage, err error) *Promise

PromiseFinisher finishes a promise

type RebalanceCallback

type RebalanceCallback func(a Assignment)

RebalanceCallback is invoked when the processor receives a new partition assignment.

type RecoveryStats

type RecoveryStats struct {
	StartTime    time.Time
	RecoveryTime time.Time

	Offset int64 // last offset processed or recovered
	Hwm    int64 // next offset to be written
}

RecoveryStats groups statistics during recovery

type SaramaConsumerBuilder

type SaramaConsumerBuilder func(brokers []string, clientID string) (sarama.Consumer, error)

SaramaConsumerBuilder creates a `sarama.Consumer`

func SaramaConsumerBuilderWithConfig

func SaramaConsumerBuilderWithConfig(config *sarama.Config) SaramaConsumerBuilder

SaramaConsumerBuilderWithConfig creates a sarama consumer using passed config

type Signal

type Signal struct {
	// contains filtered or unexported fields
}

Signal allows synchronization on a state, waiting for that state and checking the current state

func NewSignal

func NewSignal(states ...State) *Signal

NewSignal creates a new Signal based on the states

func (*Signal) IsState

func (s *Signal) IsState(state State) bool

IsState returns if the signal is in the requested state

func (*Signal) ObserveStateChange

func (s *Signal) ObserveStateChange() *StateChangeObserver

ObserveStateChange returns a channel that receives state changes. Note that the caller must take care of consuming that channel, otherwise the Signal will block upon state changes.

func (*Signal) SetState

func (s *Signal) SetState(state State) *Signal

SetState changes the state of the signal and notifies all goroutines waiting for the new state

func (*Signal) State

func (s *Signal) State() State

State returns the current state

func (*Signal) WaitForState

func (s *Signal) WaitForState(state State) <-chan struct{}

WaitForState returns a channel that closes when the signal reaches passed state.

func (*Signal) WaitForStateMin

func (s *Signal) WaitForStateMin(state State) <-chan struct{}

WaitForStateMin returns a channel that will be closed, when the signal enters passed state or higher (states are ints, so we're just comparing ints here)

type State

type State int

State types a state of the Signal

const (
	// PPStateIdle marks the partition processor as idling (not started yet)
	PPStateIdle State = iota
	// PPStateRecovering indicates a recovering partition processor
	PPStateRecovering
	// PPStateRunning indicates a running partition processor
	PPStateRunning
	// PPStateStopping indicates a stopping partition processor
	PPStateStopping
	// PPStateStopped indicates a stopped partition processor
	PPStateStopped
)
const (
	// ProcStateIdle indicates an idling partition processor (not started yet)
	ProcStateIdle State = iota
	// ProcStateStarting indicates a starting partition processor, i.e. before rebalance
	ProcStateStarting
	// ProcStateSetup indicates a partition processor during setup of a rebalance round
	ProcStateSetup
	// ProcStateRunning indicates a running partition processor
	ProcStateRunning
	// ProcStateStopping indicates a stopping partition processor
	ProcStateStopping
)

type StateChangeObserver

type StateChangeObserver struct {
	// contains filtered or unexported fields
}

StateChangeObserver wraps a channel that triggers when the signal's state changes

func (*StateChangeObserver) C

func (s *StateChangeObserver) C() <-chan State

C returns the channel to observer state changes

func (*StateChangeObserver) Stop

func (s *StateChangeObserver) Stop()

Stop stops the observer. Its update channel will be closed and

type StateReader

type StateReader interface {
	State() State
	IsState(State) bool
	WaitForStateMin(state State) <-chan struct{}
	WaitForState(state State) <-chan struct{}
	ObserveStateChange() *StateChangeObserver
}

StateReader is a read only abstraction of a Signal to expose the current state.

type Stream

type Stream string

Stream is the name of an event stream topic in Kafka, ie, a topic with cleanup.policy=delete

type Streams

type Streams []Stream

Streams is a slice of Stream names.

func StringsToStreams

func StringsToStreams(strings ...string) Streams

StringsToStreams is a simple cast/conversion functions that allows to pass a slice of strings as a slice of Stream (Streams) Avoids the boilerplate loop over the string array that would be necessary otherwise.

Example
inputTopics := []string{
	"input1",
	"input2",
	"input3",
}

// use it, e.g. in the Inputs-Edge in the group graph
graph := DefineGroup("group",
	Inputs(StringsToStreams(inputTopics...), new(codec.String), func(ctx Context, msg interface{}) {}),
)
_ = graph
Output:

type TMConfigMismatchBehavior

type TMConfigMismatchBehavior int

TMConfigMismatchBehavior configures how configuration mismatches of a topic (replication, num partitions, compaction) should be treated

const (
	// TMConfigMismatchBehaviorIgnore ignore wrong config values
	TMConfigMismatchBehaviorIgnore TMConfigMismatchBehavior = 0

	// TMConfigMismatchBehaviorWarn warns if the topic is configured differently than requested
	TMConfigMismatchBehaviorWarn TMConfigMismatchBehavior = 1

	// TMConfigMismatchBehaviorFail makes checking the topic fail, if the configuration different than requested
	TMConfigMismatchBehaviorFail TMConfigMismatchBehavior = 2
)

type Table

type Table string

Table is the name of a table topic in Kafka, ie, a topic with cleanup.policy=compact

func GroupTable

func GroupTable(group Group) Table

GroupTable returns the name of the group table of group.

type TableStats

type TableStats struct {
	Stalled bool

	Status PartitionStatus

	RunMode PPRunMode

	Recovery *RecoveryStats

	Input  *InputStats
	Writes *OutputStats
}

TableStats represents stats for a table partition

type Tester

type Tester interface {
	StorageBuilder() storage.Builder
	ProducerBuilder() ProducerBuilder
	ConsumerGroupBuilder() ConsumerGroupBuilder
	ConsumerBuilder() SaramaConsumerBuilder
	EmitterProducerBuilder() ProducerBuilder
	TopicManagerBuilder() TopicManagerBuilder
	RegisterGroupGraph(*GroupGraph) string
	RegisterEmitter(Stream, Codec)
	RegisterView(Table, Codec) string
}

Tester interface to avoid import cycles when a processor needs to register to the tester.

type TopicManager

type TopicManager interface {
	// EnsureTableExists checks that a table (log-compacted topic) exists, or create one if possible
	EnsureTableExists(topic string, npar int) error
	// EnsureStreamExists checks that a stream topic exists, or create one if possible
	EnsureStreamExists(topic string, npar int) error
	// EnsureTopicExists checks that a topic exists, or create one if possible,
	// enforcing the given configuration
	EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error

	// Partitions returns the number of partitions of a topic, that are assigned to the running
	// instance, i.e. it doesn't represent all partitions of a topic.
	Partitions(topic string) ([]int32, error)

	GetOffset(topic string, partitionID int32, time int64) (int64, error)

	// Close closes the topic manager
	Close() error
}

TopicManager provides an interface to create/check topics and their partitions

func DefaultTopicManagerBuilder

func DefaultTopicManagerBuilder(brokers []string) (TopicManager, error)

DefaultTopicManagerBuilder creates TopicManager using the Sarama library.

func NewTopicManager

func NewTopicManager(brokers []string, saramaConfig *sarama.Config, topicManagerConfig *TopicManagerConfig) (TopicManager, error)

NewTopicManager creates a new topic manager using the sarama library

type TopicManagerBuilder

type TopicManagerBuilder func(brokers []string) (TopicManager, error)

TopicManagerBuilder creates a TopicManager to check partition counts and create tables.

func TopicManagerBuilderWithConfig

func TopicManagerBuilderWithConfig(config *sarama.Config, tmConfig *TopicManagerConfig) TopicManagerBuilder

TopicManagerBuilderWithConfig creates TopicManager using the Sarama library.

func TopicManagerBuilderWithTopicManagerConfig

func TopicManagerBuilderWithTopicManagerConfig(tmConfig *TopicManagerConfig) TopicManagerBuilder

TopicManagerBuilderWithTopicManagerConfig creates TopicManager using the Sarama library.

type TopicManagerConfig

type TopicManagerConfig struct {
	Logger logger
	Table  struct {
		Replication int
		// CleanupPolicy allows to overwrite the default cleanup policy for streams.
		// Defaults to 'compact' if not set
		CleanupPolicy string
	}
	Stream struct {
		Replication int
		Retention   time.Duration
		// CleanupPolicy allows to overwrite the default cleanup policy for streams.
		// Defaults to 'delete' if not set
		CleanupPolicy string
	}

	// CreateTopicTimeout timeout for the topic manager to wait for the topic being created.
	// Set to 0 to turn off checking topic creation.
	// Defaults to 10 seconds
	CreateTopicTimeout time.Duration

	// TMConfigMismatchBehavior configures how configuration mismatches of a topic (replication, num partitions, compaction) should be
	// treated
	MismatchBehavior TMConfigMismatchBehavior
}

TopicManagerConfig contains options of to create tables and stream topics.

func NewTopicManagerConfig

func NewTopicManagerConfig() *TopicManagerConfig

NewTopicManagerConfig provides a default configuration for auto-creation with replication factor of 2 and rentention time of 1 hour. Use this function rather than creating TopicManagerConfig from scratch to initialize the config with reasonable defaults

type UpdateCallback

type UpdateCallback func(ctx UpdateContext, s storage.Storage, key string, value []byte) error

UpdateCallback is invoked upon arrival of a message for a table partition.

type UpdateContext

type UpdateContext interface {
	// Topic returns the topic of input message.
	Topic() Stream

	// Partition returns the partition of the input message.
	Partition() int32

	// Offset returns the offset of the input message.
	Offset() int64

	// Headers returns the headers of the input message.
	//
	// It is recommended to lazily evaluate the headers to reduce overhead per message
	// when headers are not used.
	Headers() Headers
}

UpdateContext defines the interface for UpdateCallback arguments.

type View

type View struct {
	// contains filtered or unexported fields
}

View is a materialized (i.e. persistent) cache of a group table.

Example

This example shows how views are typically created and used in the most basic way.

// create a new view
view, err := NewView([]string{"localhost:9092"},
	"input-topic",
	new(codec.String))
if err != nil {
	log.Fatalf("error creating view: %v", err)
}

// provide a cancelable
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// start the view
done := make(chan struct{})
go func() {
	defer close(done)
	err := view.Run(ctx)
	if err != nil {
		log.Fatalf("Error running view: %v", err)
	}
}()

// wait for the view to be recovered

// Option A: by polling
for !view.Recovered() {
	select {
	case <-ctx.Done():
		return
	case <-time.After(time.Second):
	}
}

// Option B: by waiting for the signal
<-view.WaitRunning()

// retrieve a value from the view
val, err := view.Get("some-key")
if err != nil {
	log.Fatalf("Error getting item from view: %v", err)
}

if val != nil {
	// cast it to string
	// no need for type assertion, if it was not that type, the codec would've failed
	log.Printf("got value %s", val.(string))
}

has, err := view.Has("some-key")
if err != nil {
	log.Fatalf("Error getting item from view: %v", err)
}

_ = has

// stop the view and wait for it to shut down before returning
cancel()
<-done
Output:

Example (Autoreconnect)
// create a new view
view, err := NewView([]string{"localhost:9092"},
	"input-topic",
	new(codec.String),

	// Automatically reconnect in case of errors. This is useful for services where availability
	// is more important than the data being up to date in case of kafka connection issues.
	WithViewAutoReconnect(),

	// Reconnect uses a default backoff mechanism, that can be modified by providing
	// a custom backoff builder using
	// WithViewBackoffBuilder(customBackoffBuilder),

	// When the view is running successfully for some time, the backoff is reset.
	// This time range can be modified using
	// WithViewBackoffResetTimeout(3*time.Second),
)
if err != nil {
	log.Fatalf("error creating view: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())
// start the view
done := make(chan struct{})
go func() {
	defer close(done)
	err := view.Run(ctx)
	if err != nil {
		log.Fatalf("Error running view: %v", err)
	}
}()

<-view.WaitRunning()
// at this point we can safely use the view with Has/Get/Iterate,
// even if the kafka connection is lost

// Stop the view and wait for it to shutdown before returning
cancel()
<-done
Output:

func NewView

func NewView(brokers []string, topic Table, codec Codec, options ...ViewOption) (*View, error)

NewView creates a new View object from a group.

func (*View) CurrentState

func (v *View) CurrentState() ViewState

CurrentState returns the current ViewState of the view This is useful for polling e.g. when implementing health checks or metrics

func (*View) Evict

func (v *View) Evict(key string) error

Evict removes the given key only from the local cache. In order to delete a key from Kafka and other Views, context.Delete should be used on a Processor.

func (*View) Get

func (v *View) Get(key string) (interface{}, error)

Get returns the value for the key in the view, if exists. Nil if it doesn't. Get can be called by multiple goroutines concurrently. Get can only be called after Recovered returns true.

func (*View) Has

func (v *View) Has(key string) (bool, error)

Has checks whether a value for passed key exists in the view.

func (*View) Iterator

func (v *View) Iterator() (Iterator, error)

Iterator returns an iterator that iterates over the state of the View.

func (*View) IteratorWithRange

func (v *View) IteratorWithRange(start, limit string) (Iterator, error)

IteratorWithRange returns an iterator that iterates over the state of the View. This iterator is build using the range.

func (*View) ObserveStateChanges

func (v *View) ObserveStateChanges() *StateChangeObserver

ObserveStateChanges returns a StateChangeObserver that allows to handle state changes of the view by reading from a channel. It is crucial to continuously read from that channel, otherwise the View might deadlock upon state changes. If the observer is not needed, the caller must call observer.Stop()

Example

view := goka.NewView(...)
go view.Run(ctx)

go func(){
  obs := view.ObserveStateChanges()
  defer obs.Stop()
  for {
    select{
      case state, ok := <-obs.C:
        // handle state (or closed channel)
      case <-ctx.Done():
    }
  }
}()

func (*View) Recovered

func (v *View) Recovered() bool

Recovered returns true when the view has caught up with events from kafka.

func (*View) Run

func (v *View) Run(ctx context.Context) (rerr error)

Run starts consuming the view's topic and saving updates in the local persistent cache.

The view will shutdown in case of errors or when the context is closed. It can be initialized with autoreconnect

view := NewView(..., WithViewAutoReconnect())

which makes the view internally reconnect in case of errors. Then it will only stop by canceling the context (see example).

func (*View) Stats

func (v *View) Stats(ctx context.Context) *ViewStats

Stats returns a set of performance metrics of the view.

func (*View) Topic

func (v *View) Topic() string

Topic returns the view's topic

func (*View) WaitRunning

func (v *View) WaitRunning() <-chan struct{}

WaitRunning returns a channel that will be closed when the view enters the running state

type ViewOption

type ViewOption func(*voptions, Table, Codec)

ViewOption defines a configuration option to be used when creating a view.

func WithViewAutoReconnect

func WithViewAutoReconnect() ViewOption

WithViewAutoReconnect defines the view is reconnecting internally, so Run() does not return in case of connection errors. The view must be shutdown by cancelling the context passed to Run()

func WithViewBackoffBuilder

func WithViewBackoffBuilder(bb BackoffBuilder) ViewOption

WithViewBackoffBuilder replaced the default backoff.

func WithViewBackoffResetTimeout

func WithViewBackoffResetTimeout(duration time.Duration) ViewOption

WithViewBackoffResetTimeout defines the timeout when the backoff will be reset.

func WithViewCallback

func WithViewCallback(cb UpdateCallback) ViewOption

WithViewCallback defines the callback called upon recovering a message from the log.

func WithViewClientID

func WithViewClientID(clientID string) ViewOption

WithViewClientID defines the client ID used to identify with Kafka.

func WithViewConsumerSaramaBuilder

func WithViewConsumerSaramaBuilder(cgb SaramaConsumerBuilder) ViewOption

WithViewConsumerSaramaBuilder replaces the default sarama consumer builder

func WithViewHasher

func WithViewHasher(hasher func() hash.Hash32) ViewOption

WithViewHasher sets the hash function that assigns keys to partitions.

func WithViewLogger

func WithViewLogger(l Logger) ViewOption

WithViewLogger sets the logger the view should use. By default, views use the standard library logger.

func WithViewRestartable

func WithViewRestartable() ViewOption

WithViewRestartable is kept only for backwards compatibility. DEPRECATED: since the behavior has changed, this name is misleading and should be replaced by WithViewAutoReconnect().

func WithViewStorageBuilder

func WithViewStorageBuilder(sb storage.Builder) ViewOption

WithViewStorageBuilder defines a builder for the storage of each partition.

func WithViewTester

func WithViewTester(t Tester) ViewOption

WithViewTester configures all external connections of a processor, ie, storage, consumer and producer

func WithViewTopicManagerBuilder

func WithViewTopicManagerBuilder(tmb TopicManagerBuilder) ViewOption

WithViewTopicManagerBuilder replaces the default topic manager.

type ViewState

type ViewState int

ViewState represents the state of the view

const (
	// ViewStateIdle  - the view is not started yet
	ViewStateIdle ViewState = iota
	// ViewStateInitializing - the view (i.e. at least one partition) is initializing
	ViewStateInitializing
	// ViewStateConnecting - the view (i.e. at least one partition) is (re-)connecting
	ViewStateConnecting
	// ViewStateCatchUp - the view (i.e. at least one partition) is still catching up
	ViewStateCatchUp
	// ViewStateRunning - the view (i.e. all partitions) has caught up and is running
	ViewStateRunning
)

type ViewStats

type ViewStats struct {
	Partitions map[int32]*TableStats
}

ViewStats represents the metrics of all partitions of a view.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL