README ¶
Goka
Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go. Goka aims to reduce the complexity of building highly scalable and highly available microservices.
Goka extends the concept of Kafka consumer groups by binding a state table to them and persisting them in Kafka. Goka provides sane defaults and a pluggable architecture.
Features
- Message Input and Output: Goka handles all the message input and output for you. You only have to provide one or more callback functions that handle messages from any of the Kafka topics you are interested in. You only ever have to deal with deserialized messages.
- Scaling: Goka automatically distributes the processing and state across multiple instances of a service. This enables effortless scaling when the load increases.
- Fault Tolerance: In case of a failure, Goka will redistribute the failed instance's workload and state across the remaining healthy instances. All state is safely stored in Kafka and messages are delivered with at-least-once semantics.
- Built-in Monitoring and Introspection: Goka provides a web interface for monitoring performance and querying values in the state.
- Modularity: Goka fosters a pluggable architecture which enables you to replace, for example, the storage layer or the Kafka communication layer.
Documentation
This README provides a brief, high-level overview of the ideas behind Goka. A more detailed introduction to the project can be found in this blog post.
Package API documentation is available at GoDoc and the Wiki provides several tips for configuring, extending, and deploying Goka applications.
Installation
You can install Goka by running the following command:
$ go get -u github.com/lovoo/goka
Concepts
Goka relies on Kafka for message passing, fault-tolerant state storage and workload partitioning.
- Emitters deliver key-value messages into Kafka. As an example, an emitter could be a database handler emitting the state changes into Kafka for other interested applications to consume.
- Processor is a set of callback functions that consume these emitted messages and perform state transformations upon their delivery. Processor groups are formed of one or more instances of a processor. Goka distributes the partitions of the input topics across all processor instances in a processor group. This enables effortless scaling and fault-tolerance. If a processor instance fails, its partitions and state are reassigned to the remaining healthy members of the processor group. Processors can also emit further messages into Kafka.
- Group table is the state of a processor group. It is a partitioned key-value table stored in Kafka that belongs to a single processor group. If a processor instance fails, the remaining instances will take over the group table partitions of the failed instance, recovering them from Kafka.
- Views are local caches of a complete group table. Views provide read-only access to the group tables and can be used to provide external services, for example through a gRPC interface.
- Local storage keeps a local copy of the group table partitions to speed up recovery and reduce memory utilization. By default, the local storage uses LevelDB, but in-memory map and Redis-based storage are also available.
Get Started
An example Goka application could look like the following.
An emitter emits a single message with key "some-key" and value "some-value" into the "example-stream" topic.
A processor processes the "example-stream" topic counting the number of messages delivered for "some-key".
The counter is persisted in the "example-group-table" topic.
To start dockerized Zookeeper and Kafka instances locally, execute make start with the Makefile in the examples folder.
	package main

	import (
		"fmt"
		"log"
		"os"
		"os/signal"
		"syscall"

		"github.com/lovoo/goka"
		"github.com/lovoo/goka/codec"
	)

	var (
		brokers             = []string{"localhost:9092"}
		topic   goka.Stream = "example-stream"
		group   goka.Group  = "example-group"
	)

	// runEmitter emits a single message and returns.
	func runEmitter() {
		emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
		if err != nil {
			log.Fatalf("error creating emitter: %v", err)
		}
		defer emitter.Finish()
		err = emitter.EmitSync("some-key", "some-value")
		if err != nil {
			log.Fatalf("error emitting message: %v", err)
		}
		fmt.Println("message emitted")
	}

	// runProcessor processes messages until ctrl-c is pressed.
	func runProcessor() {
		// The process callback is invoked for each message delivered from
		// the "example-stream" topic.
		cb := func(ctx goka.Context, msg interface{}) {
			var counter int64
			// ctx.Value() gets from the group table the value that is stored for
			// the message's key.
			if val := ctx.Value(); val != nil {
				counter = val.(int64)
			}
			counter++
			// SetValue stores the incremented counter in the group table under
			// the message's key.
			ctx.SetValue(counter)
			log.Printf("key = %s, counter = %v, msg = %v", ctx.Key(), counter, msg)
		}

		// Define a new processor group. The group defines all inputs, outputs, and
		// serialization formats. The group-table topic is "example-group-table".
		g := goka.DefineGroup(group,
			goka.Input(topic, new(codec.String), cb),
			goka.Persist(new(codec.Int64)),
		)

		p, err := goka.NewProcessor(brokers, g)
		if err != nil {
			log.Fatalf("error creating processor: %v", err)
		}
		go func() {
			if err = p.Start(); err != nil {
				log.Fatalf("error running processor: %v", err)
			}
		}()

		wait := make(chan os.Signal, 1)
		signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
		<-wait   // wait for SIGINT/SIGTERM
		p.Stop() // gracefully stop processor
	}

	func main() {
		runEmitter()   // emits one message and stops
		runProcessor() // press ctrl-c to stop
	}
Note that tables have to be configured in Kafka with log compaction. For details check the Wiki.
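To see why log compaction is enough to rebuild a table, the following sketch replays a small log and keeps only the latest value per key. It uses only the standard library; the record type and the compact function are hypothetical names made up for this illustration and are not part of Goka or Kafka. A nil value is treated as a delete marker, which is how compacted topics usually represent removed keys.

```go
package main

import "fmt"

// record is a hypothetical stand-in for one message in a table topic.
// A nil value acts as a delete marker (tombstone) for the key.
type record struct {
	key   string
	value []byte
}

// compact replays the log in order and keeps only the latest value of every
// key, which is the information a log-compacted topic retains.
func compact(log []record) map[string][]byte {
	table := make(map[string][]byte)
	for _, r := range log {
		if r.value == nil {
			delete(table, r.key) // tombstone removes the key
			continue
		}
		table[r.key] = r.value
	}
	return table
}

func main() {
	log := []record{
		{"some-key", []byte("1")},
		{"other-key", []byte("1")},
		{"some-key", []byte("2")},
		{"other-key", nil},
	}
	table := compact(log)
	fmt.Println(len(table), string(table["some-key"])) // prints "1 2"
}
```

Without compaction, older segments of the topic may be deleted by retention, and a recovering processor could then miss keys that have not been updated recently.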
How to contribute
Contributions are always welcome. Please fork the repo, create a pull request against master, and be sure tests pass. See the GitHub Flow for details.
Documentation ¶
Overview ¶
Package goka is a stateful stream processing library for Apache Kafka (version 0.9+) that eases the development of microservices. Goka extends the concept of consumer group with a group table, which represents the state of the group. A microservice modifies and serves the content of a table employing two complementary object types: processors and views.
Processors ¶
A processor is a set of callback functions that modify the group table when messages arrive and may also emit messages into other topics. Messages as well as rows in the group table are key-value pairs. Callbacks receive the arriving message and the row addressed by the message's key.
In Kafka, keys are used to partition topics. A Goka processor consumes from a set of co-partitioned topics (topics with the same number of partitions and the same key range). A group topic keeps track of the group table updates, allowing for recovery and rebalancing of processors: when multiple processor instances start in the same consumer group, the instances split the co-partitioned input topics and load the respective group table partitions from the group topic. A local disk storage minimizes recovery time by caching partitions of the group table.
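The effect of co-partitioning can be sketched with the standard library alone. The example below assumes that a key is assigned to a partition by hashing it with 32-bit FNV-1a and taking the result modulo the partition count; this mirrors the general idea of a hash partitioner and is not claimed to be the exact computation Goka or its Kafka client performs. The function partitionFor and the topic names in the output are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to a partition by hashing it with 32-bit FNV-1a
// and reducing the hash modulo the number of partitions.
func partitionFor(key string, numPartitions uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return h.Sum32() % numPartitions
}

func main() {
	const partitions = 8 // two hypothetical topics share this partition count
	for _, key := range []string{"user-1", "user-2", "user-3"} {
		clicks := partitionFor(key, partitions)
		profiles := partitionFor(key, partitions)
		// A topic with a different partition count is not guaranteed to
		// place the key in the same partition number.
		other := partitionFor(key, 6)
		fmt.Printf("%s: clicks[%d] profiles[%d] other[%d]\n", key, clicks, profiles, other)
	}
}
```

Because both topics use the same hash and the same partition count, a processor instance that owns partition n of one topic sees every message for the same keys in partition n of the other.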
Views ¶
A view is a materialized (i.e., persistent) cache of a group table. A view subscribes to the updates of all partitions of a group table and keeps local disk storage in sync with the group topic. With a view, one can easily serve up-to-date content of the group table via, for example, gRPC.
Index ¶
- func DefaultHasher() func() hash.Hash32
- func DefaultProcessorStoragePath(group Group) string
- func DefaultUpdate(s storage.Storage, partition int32, key string, value []byte) error
- func DefaultViewStoragePath() string
- func NewConstHasher(part uint32) hash.Hash32
- func NewMockController(t gomock.TestReporter) *gomock.Controller
- type Codec
- type Context
- type Edge
- func Input(topic Stream, c Codec, cb ProcessCallback) Edge
- func Inputs(topics Streams, c Codec, cb ProcessCallback) Edge
- func Join(topic Table, c Codec) Edge
- func Lookup(topic Table, c Codec) Edge
- func Loop(c Codec, cb ProcessCallback) Edge
- func Output(topic Stream, c Codec) Edge
- func Persist(c Codec) Edge
- type Edges
- type EmitHandler
- type Emitter
- type EmitterOption
- func WithEmitterClientID(clientID string) EmitterOption
- func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption
- func WithEmitterLogger(log logger.Logger) EmitterOption
- func WithEmitterProducerBuilder(pb kafka.ProducerBuilder) EmitterOption
- func WithEmitterTopicManagerBuilder(tmb kafka.TopicManagerBuilder) EmitterOption
- type Getter
- type Group
- type GroupGraph
- func (gg *GroupGraph) Group() Group
- func (gg *GroupGraph) GroupTable() Edge
- func (gg *GroupGraph) InputStreams() Edges
- func (gg *GroupGraph) JointTables() Edges
- func (gg *GroupGraph) LookupTables() Edges
- func (gg *GroupGraph) LoopStream() Edge
- func (gg *GroupGraph) OutputStreams() Edges
- func (gg *GroupGraph) Validate() error
- type InputStats
- type Iterator
- type KafkaMock
- func (km *KafkaMock) Consume(topic string, key string, msg []byte)
- func (km *KafkaMock) ConsumeData(topic string, key string, data []byte)
- func (km *KafkaMock) ConsumeProto(topic string, key string, msg proto.Message)
- func (km *KafkaMock) ConsumeString(topic string, key string, msg string)
- func (km *KafkaMock) ExpectAllEmitted(handler func(topic string, key string, value []byte))
- func (km *KafkaMock) ExpectEmit(topic string, key string, expecter func(value []byte))
- func (km *KafkaMock) Finish(fail bool)
- func (km *KafkaMock) ProcessorOptions() []ProcessorOption
- func (km *KafkaMock) ReplaceEmitHandler(emitter EmitHandler)
- func (km *KafkaMock) SetCodec(codec Codec) *KafkaMock
- func (km *KafkaMock) SetGroupTableCreator(creator func() (string, []byte))
- func (km *KafkaMock) SetValue(key string, value interface{})
- func (km *KafkaMock) ValueForKey(key string) interface{}
- type NilHandling
- type OutputStats
- type PartitionStats
- type PartitionStatus
- type ProcessCallback
- type Processor
- type ProcessorOption
- func WithClientID(clientID string) ProcessorOption
- func WithConsumerBuilder(cb kafka.ConsumerBuilder) ProcessorOption
- func WithHasher(hasher func() hash.Hash32) ProcessorOption
- func WithLogger(log logger.Logger) ProcessorOption
- func WithNilHandling(nh NilHandling) ProcessorOption
- func WithPartitionChannelSize(size int) ProcessorOption
- func WithProducerBuilder(pb kafka.ProducerBuilder) ProcessorOption
- func WithStorageBuilder(sb storage.Builder) ProcessorOption
- func WithTopicManagerBuilder(tmb kafka.TopicManagerBuilder) ProcessorOption
- func WithUpdateCallback(cb UpdateCallback) ProcessorOption
- type ProcessorStats
- type Stream
- type Streams
- type Table
- type Tester
- type UpdateCallback
- type View
- func (v *View) Evict(key string) error
- func (v *View) Get(key string) (interface{}, error)
- func (v *View) Has(key string) (bool, error)
- func (v *View) Iterator() (Iterator, error)
- func (v *View) IteratorWithRange(start, limit string) (Iterator, error)
- func (v *View) Recovered() bool
- func (v *View) Start() error
- func (v *View) Stats() *ViewStats
- func (v *View) Stop()
- func (v *View) Terminate() error
- func (v *View) Topic() string
- type ViewOption
- func WithViewCallback(cb UpdateCallback) ViewOption
- func WithViewClientID(clientID string) ViewOption
- func WithViewConsumerBuilder(cb kafka.ConsumerBuilder) ViewOption
- func WithViewHasher(hasher func() hash.Hash32) ViewOption
- func WithViewLogger(log logger.Logger) ViewOption
- func WithViewPartitionChannelSize(size int) ViewOption
- func WithViewRestartable() ViewOption
- func WithViewStorageBuilder(sb storage.Builder) ViewOption
- func WithViewTopicManagerBuilder(tmb kafka.TopicManagerBuilder) ViewOption
- type ViewStats
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DefaultHasher ¶
DefaultHasher returns an FNV hasher builder to assign keys to partitions.
func DefaultProcessorStoragePath ¶
DefaultProcessorStoragePath is the default path where processor state will be stored.
func DefaultUpdate ¶
DefaultUpdate is the default callback used to update the local storage from the table topic in Kafka. It is called for every message received during recovery of processors and during the normal operation of views. DefaultUpdate can be used in the function passed to WithUpdateCallback and WithViewCallback.
func DefaultViewStoragePath ¶
func DefaultViewStoragePath() string
DefaultViewStoragePath returns the default path where view state will be stored.
func NewConstHasher ¶
NewConstHasher creates a constant hasher that hashes any value to the same result, given by part.
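A constant hasher is mostly useful in tests, where every key should land in one known partition. The following is a minimal sketch of how such a hash.Hash32 could look; constHasher is a hypothetical type written for this illustration and is not Goka's implementation.

```go
package main

import (
	"fmt"
	"hash"
)

// constHasher is an illustrative hash.Hash32 that ignores its input and
// always reports the same 32-bit value.
type constHasher struct{ part uint32 }

// Write accepts and discards the data.
func (h *constHasher) Write(p []byte) (int, error) { return len(p), nil }

// Sum appends the constant value in big-endian order to b.
func (h *constHasher) Sum(b []byte) []byte {
	return append(b, byte(h.part>>24), byte(h.part>>16), byte(h.part>>8), byte(h.part))
}

func (h *constHasher) Reset()         {}
func (h *constHasher) Size() int      { return 4 }
func (h *constHasher) BlockSize() int { return 1 }

// Sum32 returns the constant regardless of what was written.
func (h *constHasher) Sum32() uint32 { return h.part }

// Compile-time check that constHasher satisfies hash.Hash32.
var _ hash.Hash32 = (*constHasher)(nil)

func main() {
	h := &constHasher{part: 3}
	h.Write([]byte("any-key"))
	fmt.Println(h.Sum32()) // prints 3
}
```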
func NewMockController ¶
func NewMockController(t gomock.TestReporter) *gomock.Controller
NewMockController returns a *gomock.Controller using a wrapped testing.T (or any other gomock.TestReporter) which panics on a Fatalf. This is necessary when using a mock in kafkamock. Otherwise it will freeze on an unexpected call.
Types ¶
type Codec ¶
	type Codec interface {
		Encode(value interface{}) (data []byte, err error)
		Decode(data []byte) (value interface{}, err error)
	}
Codec decodes and encodes from and to []byte
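As an illustration, a custom codec only needs the two methods of the interface. The sketch below redeclares the Codec interface locally so that it compiles without Goka, and stores a hypothetical user struct as JSON; the names user and userCodec are assumptions made for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Codec mirrors the interface shown above so the sketch is self-contained.
type Codec interface {
	Encode(value interface{}) (data []byte, err error)
	Decode(data []byte) (value interface{}, err error)
}

// user is a hypothetical value type stored in a table or sent in a stream.
type user struct {
	Name   string `json:"name"`
	Clicks int    `json:"clicks"`
}

// userCodec is a hypothetical codec that serializes *user values as JSON.
type userCodec struct{}

// Encode rejects anything that is not a *user instead of guessing.
func (userCodec) Encode(value interface{}) ([]byte, error) {
	u, ok := value.(*user)
	if !ok {
		return nil, fmt.Errorf("userCodec: expected *user, got %T", value)
	}
	return json.Marshal(u)
}

// Decode always returns a *user, so callbacks can type-assert safely.
func (userCodec) Decode(data []byte) (interface{}, error) {
	var u user
	if err := json.Unmarshal(data, &u); err != nil {
		return nil, fmt.Errorf("userCodec: %v", err)
	}
	return &u, nil
}

// Compile-time check that userCodec satisfies Codec.
var _ Codec = userCodec{}

func main() {
	var c Codec = userCodec{}
	data, err := c.Encode(&user{Name: "alice", Clicks: 2})
	if err != nil {
		panic(err)
	}
	value, err := c.Decode(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data), value.(*user).Clicks)
}
```

Returning the same concrete type from Decode that Encode accepts keeps the type assertions in callbacks predictable.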
type Context ¶
	type Context interface {
		// Topic returns the topic of input message.
		Topic() Stream

		// Key returns the key of the input message.
		Key() string

		// Value returns the value of the key in the group table.
		Value() interface{}

		// SetValue updates the value of the key in the group table.
		SetValue(value interface{})

		// Delete deletes a value from the group table. IMPORTANT: this deletes the
		// value associated with the key from both the local cache and the persisted
		// table in Kafka.
		Delete()

		// Timestamp returns the timestamp of the input message. If the timestamp is
		// invalid, a zero time will be returned.
		Timestamp() time.Time

		// Join returns the value of key in the copartitioned table.
		Join(topic Table) interface{}

		// Lookup returns the value of key in the view of table.
		Lookup(topic Table, key string) interface{}

		// Emit asynchronously writes a message into a topic.
		Emit(topic Stream, key string, value interface{})

		// Loopback asynchronously sends a message to another key of the group
		// table. Value passed to loopback is encoded via the codec given in the
		// Loop subscription.
		Loopback(key string, value interface{})

		// Fail stops execution and shuts down the processor
		Fail(err error)
	}
Context provides access to the processor's table and emit capabilities to arbitrary topics in Kafka. Upon arrival of a message from subscribed topics, the respective ProcessCallback is invoked with a context object along with the input message.
type Edge ¶
Edge represents a topic in Kafka and the corresponding codec to encode and decode the messages of that topic.
func Input ¶
func Input(topic Stream, c Codec, cb ProcessCallback) Edge
Input represents an edge of an input stream topic. The edge specifies the topic name, its codec and the ProcessCallback used to process it. The topic has to be copartitioned with any other input stream of the group and with the group table. The group starts reading the topic from the newest offset.
func Inputs ¶
func Inputs(topics Streams, c Codec, cb ProcessCallback) Edge
Inputs creates edges of multiple input streams sharing the same codec and callback.
func Join ¶
Join represents an edge of a copartitioned, log-compacted table topic. The edge specifies the topic name and the codec of the messages of the topic. The group starts reading the topic from the oldest offset. The processing of input streams is blocked until all partitions of the table are recovered.
func Lookup ¶
Lookup represents an edge of a non-copartitioned, log-compacted table topic. The edge specifies the topic name and the codec of the messages of the topic. The group starts reading the topic from the oldest offset. The processing of input streams is blocked until the table is fully recovered.
func Loop ¶
func Loop(c Codec, cb ProcessCallback) Edge
Loop represents the edge of the loopback topic of the group. The edge specifies the codec of the messages in the topic and the ProcessCallback to process the messages of the topic. Context.Loopback() is used to write messages into this topic from any callback of the group.
func Output ¶
Output represents an edge of an output stream topic. The edge specifies the topic name and the codec of the messages of the topic. Context.Emit() only emits messages into Output edges defined in the group graph. The topic does not have to be copartitioned with the input streams.
func Persist ¶
Persist represents the edge of the group table, which is log-compacted and copartitioned with the input streams. This edge specifies the codec of the messages in the topic, ie, the codec of the values of the table. The processing of input streams is blocked until all partitions of the group table are recovered.
type EmitHandler ¶
EmitHandler abstracts a function that can replace kafkamock's Emit function, for example to simulate producer errors.
type Emitter ¶
type Emitter struct {
// contains filtered or unexported fields
}
Emitter emits messages into a specific Kafka topic, first encoding the message with the given codec.
func NewEmitter ¶
func NewEmitter(brokers []string, topic Stream, codec Codec, options ...EmitterOption) (*Emitter, error)
NewEmitter creates a new emitter using passed brokers, topic, codec and possibly options.
type EmitterOption ¶
type EmitterOption func(*eoptions)
EmitterOption defines a configuration option to be used when creating an emitter.
func WithEmitterClientID ¶
func WithEmitterClientID(clientID string) EmitterOption
WithEmitterClientID defines the client ID used to identify with kafka.
func WithEmitterHasher ¶
func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption
WithEmitterHasher sets the hash function that assigns keys to partitions.
func WithEmitterLogger ¶
func WithEmitterLogger(log logger.Logger) EmitterOption
WithEmitterLogger sets the logger the emitter should use. By default, emitters use the standard library logger.
func WithEmitterProducerBuilder ¶
func WithEmitterProducerBuilder(pb kafka.ProducerBuilder) EmitterOption
WithEmitterProducerBuilder replaces the default producer builder.
func WithEmitterTopicManagerBuilder ¶
func WithEmitterTopicManagerBuilder(tmb kafka.TopicManagerBuilder) EmitterOption
WithEmitterTopicManagerBuilder replaces the default topic manager builder.
type Getter ¶
Getter functions return a value for a key or an error. If no value exists for the key, nil is returned without errors.
type Group ¶
type Group string
Group is the name of a consumer group in Kafka and represents a processor group in Goka. A processor group may have a group table and a group loopback stream. By default, the group table is named <group>-table and the loopback stream <group>-loop.
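The default naming rule can be written down in a few lines. This sketch redeclares Group locally and uses two hypothetical helpers, tableName and loopName, that simply apply the suffixes stated above; it does not call into Goka.

```go
package main

import "fmt"

// Group mirrors the type shown above so the sketch is self-contained.
type Group string

// tableName applies the default "<group>-table" rule (hypothetical helper).
func tableName(g Group) string { return string(g) + "-table" }

// loopName applies the default "<group>-loop" rule (hypothetical helper).
func loopName(g Group) string { return string(g) + "-loop" }

func main() {
	g := Group("example-group")
	fmt.Println(tableName(g)) // prints "example-group-table"
	fmt.Println(loopName(g))  // prints "example-group-loop"
}
```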
type GroupGraph ¶
type GroupGraph struct {
// contains filtered or unexported fields
}
GroupGraph is the specification of a processor group. It contains all input, output, and any other topic from which and into which the processor group may consume or produce events. Each of these links to Kafka is called an Edge.
func DefineGroup ¶
func DefineGroup(group Group, edges ...Edge) *GroupGraph
DefineGroup creates a group graph with a given group name and a list of edges.
func (*GroupGraph) GroupTable ¶
func (gg *GroupGraph) GroupTable() Edge
GroupTable returns the group table edge of the group.
func (*GroupGraph) InputStreams ¶
func (gg *GroupGraph) InputStreams() Edges
InputStreams returns all input stream edges of the group.
func (*GroupGraph) JointTables ¶
func (gg *GroupGraph) JointTables() Edges
JointTables returns all joint table edges of the group.
func (*GroupGraph) LookupTables ¶
func (gg *GroupGraph) LookupTables() Edges
LookupTables returns all lookup table edges of the group.
func (*GroupGraph) LoopStream ¶
func (gg *GroupGraph) LoopStream() Edge
LoopStream returns the loopback edge of the group.
func (*GroupGraph) OutputStreams ¶
func (gg *GroupGraph) OutputStreams() Edges
OutputStreams returns the output stream edges of the group.
func (*GroupGraph) Validate ¶
func (gg *GroupGraph) Validate() error
Validate validates the group graph and returns an error if invalid. Main validation checks are:
- at most one loopback stream edge is allowed
- at most one group table edge is allowed
- at least one input stream is required
- table and loopback topics cannot be used in any other edge.
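The four checks can be sketched over a simplified edge list. The types edgeKind and edge and the function validate below are hypothetical stand-ins written for this illustration; they are much simpler than Goka's Edge and GroupGraph, and the real validation may cover more cases.

```go
package main

import (
	"errors"
	"fmt"
)

// edgeKind and edge are simplified, hypothetical stand-ins for Goka's Edge.
type edgeKind int

const (
	inputEdge edgeKind = iota
	outputEdge
	tableEdge // group table
	loopEdge  // loopback stream
)

type edge struct {
	kind  edgeKind
	topic string
}

// validate applies the four checks listed above to a list of edges.
func validate(edges []edge) error {
	var inputs, tables, loops int
	reserved := make(map[string]bool) // topics used by table and loop edges
	for _, e := range edges {
		switch e.kind {
		case inputEdge:
			inputs++
		case tableEdge:
			tables++
			reserved[e.topic] = true
		case loopEdge:
			loops++
			reserved[e.topic] = true
		}
	}
	switch {
	case loops > 1:
		return errors.New("more than one loopback stream edge")
	case tables > 1:
		return errors.New("more than one group table edge")
	case inputs == 0:
		return errors.New("no input stream edge")
	}
	for _, e := range edges {
		if (e.kind == inputEdge || e.kind == outputEdge) && reserved[e.topic] {
			return fmt.Errorf("topic %q is reserved for the table or loopback edge", e.topic)
		}
	}
	return nil
}

func main() {
	ok := []edge{{inputEdge, "example-stream"}, {tableEdge, "example-group-table"}}
	bad := []edge{{tableEdge, "example-group-table"}}
	fmt.Println(validate(ok))  // prints "<nil>"
	fmt.Println(validate(bad)) // prints "no input stream edge"
}
```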
type InputStats ¶
InputStats represents the number of messages and the number of bytes consumed from a stream or table topic since the process started.
type Iterator ¶
	type Iterator interface {
		Next() bool
		Key() string
		Value() (interface{}, error)
		Release()
		Seek(key string) bool
	}
Iterator allows one to iterate over the keys of a view.
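The usual loop over such an iterator calls Next until it returns false and releases the iterator afterwards. The sketch below implements the interface over an in-memory map to make the loop runnable without Kafka; mapIterator is a hypothetical type, and the Seek behavior (position before the first key greater than or equal to the argument, so that the following Next yields it) is an assumption made for this example.

```go
package main

import (
	"fmt"
	"sort"
)

// Iterator mirrors the interface shown above so the sketch is self-contained.
type Iterator interface {
	Next() bool
	Key() string
	Value() (interface{}, error)
	Release()
	Seek(key string) bool
}

// mapIterator is a hypothetical iterator over a sorted snapshot of a map.
type mapIterator struct {
	keys   []string
	values map[string]interface{}
	pos    int // index of the current element, -1 before the first Next
}

func newMapIterator(m map[string]interface{}) *mapIterator {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return &mapIterator{keys: keys, values: m, pos: -1}
}

// Next advances to the next key and reports whether one exists.
func (it *mapIterator) Next() bool {
	if it.pos+1 >= len(it.keys) {
		return false
	}
	it.pos++
	return true
}

// Key and Value are only valid after Next has returned true.
func (it *mapIterator) Key() string                 { return it.keys[it.pos] }
func (it *mapIterator) Value() (interface{}, error) { return it.values[it.keys[it.pos]], nil }

// Release drops the snapshot; the iterator must not be used afterwards.
func (it *mapIterator) Release() { it.keys, it.values = nil, nil }

// Seek positions the iterator just before the first key >= key and reports
// whether such a key exists.
func (it *mapIterator) Seek(key string) bool {
	i := sort.SearchStrings(it.keys, key)
	it.pos = i - 1
	return i < len(it.keys)
}

func main() {
	var it Iterator = newMapIterator(map[string]interface{}{"a": 1, "b": 2, "c": 3})
	defer it.Release()
	it.Seek("b")
	for it.Next() {
		v, err := it.Value()
		if err != nil {
			panic(err)
		}
		fmt.Println(it.Key(), v) // prints "b 2", then "c 3"
	}
}
```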
type KafkaMock ¶
type KafkaMock struct {
// contains filtered or unexported fields
}
KafkaMock allows interacting with a test processor
func NewKafkaMock ¶
NewKafkaMock returns a new testprocessor mocking every external service
func (*KafkaMock) ConsumeData ¶
ConsumeData simulates a message with a byte slice payload and behaves the same as Consume. It is a helper function for consuming already marshalled data. It is used by ConsumeProto, which test cases call, as well as by any emit calls of the processor being tested.
func (*KafkaMock) ConsumeProto ¶
ConsumeProto simulates a message on kafka in a topic with a key.
func (*KafkaMock) ConsumeString ¶
ConsumeString simulates a message with a string payload.
func (*KafkaMock) ExpectAllEmitted ¶
ExpectAllEmitted calls passed expected-emit-handler function for all emitted values and clears the emitted values
func (*KafkaMock) ExpectEmit ¶
ExpectEmit ensures a message exists in passed topic and key. The message may be inspected/unmarshalled by a passed expecter function.
func (*KafkaMock) Finish ¶
Finish tells the kafkamock that no further emits are expected. Set fail to true if kafkamock is supposed to fail the test case when unchecked emits remain. The list of emits is cleared in either case. Finish should always be called at the end of a test case so that emits left over from prior test cases do not remain in the list and distort the test results.
func (*KafkaMock) ProcessorOptions ¶
func (km *KafkaMock) ProcessorOptions() []ProcessorOption
ProcessorOptions returns the options that must be passed to NewProcessor to use the Mock. It essentially replaces the consumer/producer/topicmanager with a mock. For convenience, the storage is also mocked. For example, a normal call to NewProcessor like this
	NewProcessor(brokers, group, subscriptions,
		option_a,
		option_b,
		option_c,
	)

would become in the unit test:

	kafkaMock := NewKafkaMock(t)
	NewProcessor(brokers, group, subscriptions,
		append(kafkaMock.ProcessorOptions(),
			option_a,
			option_b,
			option_c,
		)...,
	)
func (*KafkaMock) ReplaceEmitHandler ¶
func (km *KafkaMock) ReplaceEmitHandler(emitter EmitHandler)
ReplaceEmitHandler replaces the emitter.
func (*KafkaMock) SetGroupTableCreator ¶
SetGroupTableCreator sets a creator for the group table.
func (*KafkaMock) ValueForKey ¶
ValueForKey attempts to get a value from KafkaMock's storage.
type NilHandling ¶
type NilHandling int
NilHandling defines how nil messages should be handled by the processor.
	const (
		// NilIgnore drops any message with nil value.
		NilIgnore NilHandling = 0 + iota
		// NilProcess passes the nil value to ProcessCallback.
		NilProcess
		// NilDecode passes the nil value to decoder before calling ProcessCallback.
		NilDecode
	)
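The difference between the three modes is easiest to see as a small dispatch function. The sketch below redeclares the constants locally; handle, decode and process are hypothetical names standing in for the processor's internal routing, a codec's Decode and a ProcessCallback. It illustrates the documented behavior and is not Goka's actual code.

```go
package main

import "fmt"

// NilHandling and its constants mirror the declarations shown above.
type NilHandling int

const (
	NilIgnore NilHandling = 0 + iota
	NilProcess
	NilDecode
)

// handle routes a message payload according to the nil-handling mode.
func handle(nh NilHandling, data []byte,
	decode func([]byte) (interface{}, error),
	process func(interface{})) error {
	if data == nil {
		switch nh {
		case NilIgnore:
			return nil // drop the message
		case NilProcess:
			process(nil) // hand the nil value straight to the callback
			return nil
		}
		// NilDecode: fall through and let the decoder see the nil payload.
	}
	msg, err := decode(data)
	if err != nil {
		return err
	}
	process(msg)
	return nil
}

func main() {
	decode := func(b []byte) (interface{}, error) {
		if b == nil {
			return "decoded-nil", nil
		}
		return string(b), nil
	}
	for _, nh := range []NilHandling{NilIgnore, NilProcess, NilDecode} {
		called := false
		process := func(msg interface{}) {
			called = true
			fmt.Printf("mode %d: callback got %v\n", nh, msg)
		}
		if err := handle(nh, nil, decode, process); err != nil {
			panic(err)
		}
		if !called {
			fmt.Printf("mode %d: message dropped\n", nh)
		}
	}
}
```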
type OutputStats ¶
OutputStats represents the number of messages and the number of bytes emitted into a stream or table since the process started.
type PartitionStats ¶
	type PartitionStats struct {
		Now time.Time

		Table struct {
			Status  PartitionStatus
			Stalled bool

			Offset int64 // last offset processed or recovered
			Hwm    int64 // next offset to be written

			StartTime    time.Time
			RecoveryTime time.Time
		}
		Input  map[string]InputStats
		Output map[string]OutputStats
	}
PartitionStats represents metrics and measurements of a partition.
type PartitionStatus ¶
type PartitionStatus int
PartitionStatus is the status of the partition of a table (group table or joined table).
	const (
		// PartitionRecovering indicates the partition is recovering and the storage
		// is writing updates in bulk-mode (if the storage implementation supports it).
		PartitionRecovering PartitionStatus = iota
		// PartitionPreparing indicates the end of the bulk-mode. Depending on the storage
		// implementation, the Preparing phase may take long because the storage compacts its logs.
		PartitionPreparing
		// PartitionRunning indicates the partition is recovered and processing updates
		// in normal operation.
		PartitionRunning
	)
type ProcessCallback ¶
type ProcessCallback func(ctx Context, msg interface{})
ProcessCallback function is called for every message received by the processor.
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor is a set of stateful callback functions that, on the arrival of messages, modify the content of a table (the group table) and emit messages into other topics. Messages as well as rows in the group table are key-value pairs. A group is composed of multiple processor instances.
Example (Simplest) ¶
Example shows how to use a callback. For each partition of the topics, a new goroutine will be created. Topics should be co-partitioned (they should have the same number of partitions and be partitioned by the same key).
func NewProcessor ¶
func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption) (*Processor, error)
NewProcessor creates a processor instance in a group given the addresses of the Kafka brokers, a group graph (the group name together with its edges: topics, codecs, and callbacks), and a series of options.
func (*Processor) Get ¶
Get returns a read-only copy of a value from the group table if the respective partition is owned by the processor instance. Get can be called by multiple goroutines concurrently. Get can be only used with stateful processors (i.e., when the group table is enabled) and after Recovered returns true.
func (*Processor) Graph ¶
func (g *Processor) Graph() *GroupGraph
Graph returns the GroupGraph given at the creation of the processor.
func (*Processor) Recovered ¶
Recovered returns true when the processor has caught up with events from kafka.
func (*Processor) Start ¶
Start starts receiving messages from Kafka for the subscribed topics. For each partition, a recovery will be attempted.
func (*Processor) Stats ¶
func (g *Processor) Stats() *ProcessorStats
Stats returns a set of performance metrics of the processor.
type ProcessorOption ¶
type ProcessorOption func(*poptions)
ProcessorOption defines a configuration option to be used when creating a processor.
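Options of this shape follow the functional options pattern: each option is a function that mutates a private settings struct, and the constructor applies them on top of defaults. The sketch below shows the pattern with hypothetical names (settings, option, withClientID, withPartitionChannelSize, newSettings) and made-up default values; it does not reproduce Goka's unexported poptions type.

```go
package main

import "fmt"

// settings is a hypothetical private configuration struct.
type settings struct {
	clientID             string
	partitionChannelSize int
}

// option has the same shape as ProcessorOption: a function that edits settings.
type option func(*settings)

// withClientID returns an option that overrides the client ID.
func withClientID(id string) option {
	return func(s *settings) { s.clientID = id }
}

// withPartitionChannelSize returns an option that overrides the channel size.
func withPartitionChannelSize(n int) option {
	return func(s *settings) { s.partitionChannelSize = n }
}

// newSettings starts from illustrative defaults and applies options in order,
// so later options win over earlier ones.
func newSettings(opts ...option) *settings {
	s := &settings{clientID: "default-client", partitionChannelSize: 10}
	for _, o := range opts {
		o(s)
	}
	return s
}

func main() {
	s := newSettings(withPartitionChannelSize(0))
	fmt.Println(s.clientID, s.partitionChannelSize) // prints "default-client 0"
}
```

The pattern lets a constructor grow new settings without changing its signature, which is why the processor, view and emitter constructors all accept a variadic list of options.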
func WithClientID ¶
func WithClientID(clientID string) ProcessorOption
WithClientID defines the client ID used to identify with Kafka.
func WithConsumerBuilder ¶
func WithConsumerBuilder(cb kafka.ConsumerBuilder) ProcessorOption
WithConsumerBuilder replaces the default consumer builder.
func WithHasher ¶
func WithHasher(hasher func() hash.Hash32) ProcessorOption
WithHasher sets the hash function that assigns keys to partitions.
func WithLogger ¶
func WithLogger(log logger.Logger) ProcessorOption
WithLogger sets the logger the processor should use. By default, processors use the standard library logger.
func WithNilHandling ¶
func WithNilHandling(nh NilHandling) ProcessorOption
WithNilHandling configures how the processor should handle messages with nil value. By default the processor ignores nil messages.
func WithPartitionChannelSize ¶
func WithPartitionChannelSize(size int) ProcessorOption
WithPartitionChannelSize replaces the default partition channel size. This is mostly used for testing by setting it to 0 to have synchronous behavior of goka.
func WithProducerBuilder ¶
func WithProducerBuilder(pb kafka.ProducerBuilder) ProcessorOption
WithProducerBuilder replaces the default producer builder.
func WithStorageBuilder ¶
func WithStorageBuilder(sb storage.Builder) ProcessorOption
WithStorageBuilder defines a builder for the storage of each partition.
func WithTopicManagerBuilder ¶
func WithTopicManagerBuilder(tmb kafka.TopicManagerBuilder) ProcessorOption
WithTopicManagerBuilder replaces the default topic manager builder.
func WithUpdateCallback ¶
func WithUpdateCallback(cb UpdateCallback) ProcessorOption
WithUpdateCallback defines the callback called upon recovering a message from the log.
type ProcessorStats ¶
	type ProcessorStats struct {
		Group  map[int32]*PartitionStats
		Joined map[int32]map[string]*PartitionStats
		Lookup map[string]*ViewStats
	}
ProcessorStats represents the metrics of all partitions of the processor, including its group, joined tables and lookup tables.
type Stream ¶
type Stream string
Stream is the name of an event stream topic in Kafka, ie, a topic with cleanup.policy=delete
type Table ¶
type Table string
Table is the name of a table topic in Kafka, ie, a topic with cleanup.policy=compact
func GroupTable ¶
GroupTable returns the name of the group table of group.
type Tester ¶
	type Tester interface {
		Errorf(format string, args ...interface{})
		Fatalf(format string, args ...interface{})
		Fatal(a ...interface{})
	}
Tester abstracts the interface we assume from the test case. Will most likely be *testing.T
type UpdateCallback ¶
UpdateCallback is invoked upon arrival of a message for a table partition. The partition storage shall be updated in the callback.
type View ¶
type View struct {
// contains filtered or unexported fields
}
View is a materialized (i.e. persistent) cache of a group table.
func (*View) Evict ¶
Evict removes the given key only from the local cache. In order to delete a key from Kafka and other Views, context.Delete should be used on a Processor.
func (*View) Get ¶
Get returns the value for the key in the view, if exists. Nil if it doesn't. Get can be called by multiple goroutines concurrently. Get can only be called after Recovered returns true.
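Because Get is only valid once the view has recovered, callers typically wait for Recovered before the first read. The following sketch shows one way to do that with a timeout. It is written against a small hypothetical interface, recoverable, that covers the two methods involved, and uses a fake implementation so that it runs without Kafka; getWhenRecovered and fakeView are names invented for this example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// recoverable is a hypothetical interface covering the two View methods used here.
type recoverable interface {
	Recovered() bool
	Get(key string) (interface{}, error)
}

// getWhenRecovered polls Recovered until it reports true or the timeout
// elapses, and only then calls Get.
func getWhenRecovered(v recoverable, key string, timeout time.Duration) (interface{}, error) {
	deadline := time.Now().Add(timeout)
	for !v.Recovered() {
		if time.Now().After(deadline) {
			return nil, errors.New("view not recovered before timeout")
		}
		time.Sleep(10 * time.Millisecond)
	}
	return v.Get(key)
}

// fakeView stands in for a real view; it reports recovered after a fixed
// number of polls.
type fakeView struct {
	pollsLeft int
	data      map[string]interface{}
}

func (f *fakeView) Recovered() bool {
	if f.pollsLeft > 0 {
		f.pollsLeft--
		return false
	}
	return true
}

func (f *fakeView) Get(key string) (interface{}, error) { return f.data[key], nil }

func main() {
	v := &fakeView{pollsLeft: 3, data: map[string]interface{}{"some-key": int64(1)}}
	val, err := getWhenRecovered(v, "some-key", time.Second)
	fmt.Println(val, err) // prints "1 <nil>"
}
```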
func (*View) IteratorWithRange ¶
IteratorWithRange returns an iterator that iterates over the state of the View. This iterator is built using the given range.
type ViewOption ¶
type ViewOption func(*voptions)
ViewOption defines a configuration option to be used when creating a view.
func WithViewCallback ¶
func WithViewCallback(cb UpdateCallback) ViewOption
WithViewCallback defines the callback called upon recovering a message from the log.
func WithViewClientID ¶
func WithViewClientID(clientID string) ViewOption
WithViewClientID defines the client ID used to identify with Kafka.
func WithViewConsumerBuilder ¶
func WithViewConsumerBuilder(cb kafka.ConsumerBuilder) ViewOption
WithViewConsumerBuilder replaces the default view consumer builder.
func WithViewHasher ¶
func WithViewHasher(hasher func() hash.Hash32) ViewOption
WithViewHasher sets the hash function that assigns keys to partitions.
func WithViewLogger ¶
func WithViewLogger(log logger.Logger) ViewOption
WithViewLogger sets the logger the view should use. By default, views use the standard library logger.
func WithViewPartitionChannelSize ¶
func WithViewPartitionChannelSize(size int) ViewOption
WithViewPartitionChannelSize replaces the default partition channel size. This is mostly used for testing by setting it to 0 to have synchronous behavior of goka.
func WithViewRestartable ¶
func WithViewRestartable() ViewOption
WithViewRestartable specifies that the view can be restarted, even when Start() returns errors. If the view is restartable, the client must call Terminate() to release all resources, i.e., close the local storage.
func WithViewStorageBuilder ¶
func WithViewStorageBuilder(sb storage.Builder) ViewOption
WithViewStorageBuilder defines a builder for the storage of each partition.
func WithViewTopicManagerBuilder ¶
func WithViewTopicManagerBuilder(tmb kafka.TopicManagerBuilder) ViewOption
WithViewTopicManagerBuilder replaces the default topic manager.
type ViewStats ¶
type ViewStats struct {
Partitions map[int32]*PartitionStats
}
ViewStats represents the metrics of all partitions of a view.