Documentation ¶
Overview ¶
Package goka is a stateful stream processing library for Apache Kafka (version 0.9+) that eases the development of microservices. Goka extends the concept of a consumer group with a group table, which represents the state of the group. A microservice modifies and serves the content of a table by employing two complementary object types: processors and views.
Processors ¶
A processor is a set of callback functions that modify the group table when messages arrive and may also emit messages into other topics. Messages as well as rows in the group table are key-value pairs. Callbacks receive the arriving message and the row addressed by the message's key.
In Kafka, keys are used to partition topics. A goka processor consumes from a set of co-partitioned topics (topics with the same number of partitions and the same key range). A group topic keeps track of the group table updates, allowing for recovery and rebalancing of processors: when multiple processor instances start in the same consumer group, the instances split the co-partitioned input topics and load the respective group table partitions from the group topic. A local disk storage minimizes recovery time by caching partitions of the group table.
Views ¶
A view is a materialized (i.e., persistent) cache of a group table. A view subscribes to the updates of all partitions of a group table and keeps its local disk storage in sync with the group topic. With a view, one can easily serve up-to-date content of the group table via, for example, gRPC.
Index ¶
- func DefaultUpdate(s storage.Storage, partition int32, key string, value []byte) error
- func NewMockController(t gomock.TestReporter) *gomock.Controller
- type Codec
- type Context
- type Edge
- func Input(topic Stream, c Codec, cb ProcessCallback) Edge
- func Inputs(topics Streams, c Codec, cb ProcessCallback) Edge
- func Join(topic Table, c Codec) Edge
- func Lookup(topic Table, c Codec) Edge
- func Loop(c Codec, cb ProcessCallback) Edge
- func Output(topic Stream, c Codec) Edge
- func Persist(c Codec) Edge
- type Edges
- type EmitHandler
- type Emitter
- type EmitterOption
- type Errors
- type Getter
- type Group
- type GroupGraph
- func (gg *GroupGraph) Group() Group
- func (gg *GroupGraph) GroupTable() Edge
- func (gg *GroupGraph) InputStreams() Edges
- func (gg *GroupGraph) JointTables() Edges
- func (gg *GroupGraph) LookupTables() Edges
- func (gg *GroupGraph) LoopStream() Edge
- func (gg *GroupGraph) OutputStreams() Edges
- func (gg *GroupGraph) Validate() error
- type KafkaMock
- func (km *KafkaMock) ConsumeProto(topic string, key string, msg proto.Message)
- func (km *KafkaMock) ConsumeString(topic string, key string, msg string)
- func (km *KafkaMock) ExpectAllEmitted(handler func(topic string, key string, value []byte))
- func (km *KafkaMock) ExpectEmit(topic string, key string, expecter func(value []byte))
- func (km *KafkaMock) Finish(fail bool)
- func (km *KafkaMock) ProcessorOptions() []ProcessorOption
- func (km *KafkaMock) ReplaceEmitHandler(emitter EmitHandler)
- func (km *KafkaMock) SetGroupTableCreator(creator func() (string, []byte))
- func (km *KafkaMock) SetValue(key string, value interface{})
- func (km *KafkaMock) ValueForKey(key string) interface{}
- type ProcessCallback
- type Processor
- type ProcessorOption
- func WithClientID(clientID string) ProcessorOption
- func WithConsumer(c kafka.Consumer) ProcessorOption
- func WithKafkaMetrics(registry metrics.Registry) ProcessorOption
- func WithPartitionChannelSize(size int) ProcessorOption
- func WithProducer(p kafka.Producer) ProcessorOption
- func WithStorageBuilder(sb StorageBuilder) ProcessorOption
- func WithStoragePath(storagePath string) ProcessorOption
- func WithStorageSnapshotInterval(interval time.Duration) ProcessorOption
- func WithTopicManager(tm kafka.TopicManager) ProcessorOption
- func WithUpdateCallback(cb UpdateCallback) ProcessorOption
- type StorageBuilder
- type Stream
- type Streams
- type Table
- type Tester
- type UpdateCallback
- type View
- type ViewOption
- func WithViewCallback(cb UpdateCallback) ViewOption
- func WithViewConsumer(c kafka.Consumer) ViewOption
- func WithViewKafkaMetrics(registry metrics.Registry) ViewOption
- func WithViewPartitionChannelSize(size int) ViewOption
- func WithViewStorageBuilder(sb StorageBuilder) ViewOption
- func WithViewStoragePath(storagePath string) ViewOption
- func WithViewStorageSnapshotInterval(interval time.Duration) ViewOption
- func WithViewTopicManager(tm kafka.TopicManager) ViewOption
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DefaultUpdate ¶
func DefaultUpdate(s storage.Storage, partition int32, key string, value []byte) error
DefaultUpdate is the default callback used to update the local storage with data from the table topic in Kafka. It is called for every message received during recovery of processors and during the normal operation of views. DefaultUpdate can be used in the function passed to WithUpdateCallback and WithViewCallback.
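For instance, a custom update callback can wrap DefaultUpdate to add logging during recovery; a minimal sketch, assuming brokers and a group graph g are defined elsewhere (the logging is purely illustrative):

    // logUpdate logs every recovered key before delegating to DefaultUpdate.
    logUpdate := func(s storage.Storage, partition int32, key string, value []byte) error {
        log.Printf("updating partition %d, key %s", partition, key)
        return DefaultUpdate(s, partition, key, value)
    }
    p, err := NewProcessor(brokers, g, WithUpdateCallback(logUpdate))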
func NewMockController ¶
func NewMockController(t gomock.TestReporter) *gomock.Controller
NewMockController returns a *gomock.Controller using a wrapped testing.T (or another Tester) which panics on Fatalf. This is necessary when using a mock with KafkaMock; otherwise it would freeze on an unexpected call.
Types ¶
type Codec ¶
type Codec interface {
    Encode(value interface{}) (data []byte, err error)
    Decode(data []byte) (value interface{}, err error)
}
Codec decodes and encodes from and to []byte.
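A codec for plain strings can serve as a minimal illustration; this stringCodec is a hypothetical example, not part of the package:

    // stringCodec encodes and decodes Go strings as raw bytes.
    type stringCodec struct{}

    func (c *stringCodec) Encode(value interface{}) ([]byte, error) {
        s, ok := value.(string)
        if !ok {
            return nil, fmt.Errorf("expected string, got %T", value)
        }
        return []byte(s), nil
    }

    func (c *stringCodec) Decode(data []byte) (interface{}, error) {
        return string(data), nil
    }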
type Context ¶
type Context interface {
    // Topic returns the topic of the input message.
    Topic() Stream
    // Key returns the key of the input message.
    Key() string
    // Value returns the value of the key in the group table.
    Value() interface{}
    // SetValue updates the value of the key in the group table.
    SetValue(value interface{})
    // Join returns the value of key in the copartitioned table.
    Join(topic Table) interface{}
    // Lookup returns the value of key in the view of table.
    Lookup(topic Table, key string) interface{}
    // Emit asynchronously writes a message into a topic.
    Emit(topic Stream, key string, value interface{})
    // Loopback asynchronously sends a message to another key of the group
    // table. Value passed to loopback is encoded via the codec given in the
    // Loop subscription.
    Loopback(key string, value interface{})
    // Fail stops execution and shuts down the processor.
    Fail(err error)
}
Context provides access to the processor's table and emit capabilities to arbitrary topics in kafka. Upon arrival of a message from subscribed topics, the respective ProcessCallback is invoked with a context object along with the input message.
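As an illustration, a counting callback might read the current table value for the message's key, increment it, and emit the new count downstream; a sketch assuming the group table stores int64 counts with a suitable codec (the output topic name is made up):

    consume := func(ctx Context, msg interface{}) {
        var count int64
        if v := ctx.Value(); v != nil {
            count = v.(int64) // value as decoded by the group table codec
        }
        count++
        ctx.SetValue(count)                  // update the group table
        ctx.Emit("counts", ctx.Key(), count) // emit the new count
    }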
type Edge ¶
func Input ¶
func Input(topic Stream, c Codec, cb ProcessCallback) Edge
Input returns a subscription for a co-partitioned stream topic. The processor subscribing for a stream topic will start reading from the newest offset of the partition.
func Inputs ¶
func Inputs(topics Streams, c Codec, cb ProcessCallback) Edge
Inputs creates Edges for multiple input streams sharing the same codec and callback.
func Join ¶
func Join(topic Table, c Codec) Edge
Join returns a subscription for a co-partitioned, log-compacted table topic. The processor subscribing for a table topic will start reading from the oldest offset of the partition.
func Loop ¶
func Loop(c Codec, cb ProcessCallback) Edge
Loop defines a consume callback on the loop topic.
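A loop edge is typically paired with Context.Loopback to reprocess a message under a different key; a minimal sketch, reusing the hypothetical stringCodec from the Codec section (topic names and callbacks are illustrative):

    // reroute sends every input message to another key via the loop topic.
    reroute := func(ctx Context, msg interface{}) {
        ctx.Loopback("other-key", msg)
    }
    // handle processes the rerouted message under its new key.
    handle := func(ctx Context, msg interface{}) {
        ctx.SetValue(msg)
    }
    g := DefineGroup("loop-group",
        Input("input-topic", new(stringCodec), reroute),
        Loop(new(stringCodec), handle),
        Persist(new(stringCodec)),
    )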
type EmitHandler ¶
EmitHandler abstracts a function that allows overwriting KafkaMock's Emit function to simulate producer errors.
type Emitter ¶
type Emitter struct {
// contains filtered or unexported fields
}
func NewEmitter ¶
func NewEmitter(brokers []string, topic Stream, codec Codec, options ...EmitterOption) (*Emitter, error)
NewEmitter creates a new emitter for the given brokers, topic, and codec, optionally applying the passed options.
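A minimal usage sketch; the Emitter's send methods are not listed in this section, so the EmitSync call below is an assumption to be checked against the actual method set:

    emitter, err := NewEmitter([]string{"localhost:9092"}, "output-topic", new(stringCodec))
    if err != nil {
        log.Fatalln(err)
    }
    // EmitSync is assumed to send one message and wait for confirmation.
    if err := emitter.EmitSync("some-key", "hello"); err != nil {
        log.Fatalln(err)
    }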
type EmitterOption ¶
type EmitterOption func(*eoptions)
EmitterOption defines a configuration option to be used when creating an emitter.
func WithEmitterClientID ¶
func WithEmitterClientID(clientID string) EmitterOption
WithEmitterClientID defines the client ID used to identify with kafka.
func WithEmitterKafkaMetrics ¶
func WithEmitterKafkaMetrics(registry metrics.Registry) EmitterOption
WithEmitterKafkaMetrics sets a go-metrics registry to collect kafka metrics. The collected metric points are those of sarama; see https://godoc.org/github.com/Shopify/sarama
func WithEmitterProducer ¶
func WithEmitterProducer(p kafka.Producer) EmitterOption
WithEmitterProducer replaces goka's default producer. Mainly for testing.
func WithEmitterTopicManager ¶
func WithEmitterTopicManager(tm kafka.TopicManager) EmitterOption
WithEmitterTopicManager defines a topic manager.
type Errors ¶
type Errors struct {
// contains filtered or unexported fields
}
Errors represents a list of errors triggered during the execution of a goka view/processor. Normally, the first error leads to stopping the processor/view, but more errors might occur during shutdown.
type Getter ¶
Getter functions return the value for a key, or an error. If no value exists for the key, nil is returned without an error.
type GroupGraph ¶
type GroupGraph struct {
// contains filtered or unexported fields
}
func DefineGroup ¶
func DefineGroup(group Group, edges ...Edge) *GroupGraph
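DefineGroup wires edges into a group graph; a sketch combining the edge functions from above, reusing the hypothetical stringCodec (topic names and the callback are illustrative):

    consume := func(ctx Context, msg interface{}) {
        ctx.SetValue(msg) // store the message as the table value
    }
    g := DefineGroup("example-group",
        Input("input-topic", new(stringCodec), consume), // process input messages
        Persist(new(stringCodec)),                       // enable the group table
        Output("output-topic", new(stringCodec)),        // declare a topic for ctx.Emit
    )
    if err := g.Validate(); err != nil {
        log.Fatalln(err)
    }
    p, err := NewProcessor(brokers, g)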
func (*GroupGraph) Group ¶
func (gg *GroupGraph) Group() Group
func (*GroupGraph) GroupTable ¶
func (gg *GroupGraph) GroupTable() Edge
func (*GroupGraph) InputStreams ¶
func (gg *GroupGraph) InputStreams() Edges
func (*GroupGraph) JointTables ¶
func (gg *GroupGraph) JointTables() Edges
func (*GroupGraph) LookupTables ¶
func (gg *GroupGraph) LookupTables() Edges
func (*GroupGraph) LoopStream ¶
func (gg *GroupGraph) LoopStream() Edge
func (*GroupGraph) OutputStreams ¶
func (gg *GroupGraph) OutputStreams() Edges
func (*GroupGraph) Validate ¶
func (gg *GroupGraph) Validate() error
type KafkaMock ¶
type KafkaMock struct {
// contains filtered or unexported fields
}
KafkaMock allows interacting with a test processor.
func NewKafkaMock ¶
NewKafkaMock returns a new KafkaMock, mocking every external service of the test processor.
func (*KafkaMock) ConsumeProto ¶
func (km *KafkaMock) ConsumeProto(topic string, key string, msg proto.Message)
ConsumeProto simulates a message arriving from Kafka on the given topic with the given key.
func (*KafkaMock) ConsumeString ¶
func (km *KafkaMock) ConsumeString(topic string, key string, msg string)
func (*KafkaMock) ExpectAllEmitted ¶
func (km *KafkaMock) ExpectAllEmitted(handler func(topic string, key string, value []byte))
ExpectAllEmitted calls the passed handler for every emitted value and clears the list of emitted values.
func (*KafkaMock) ExpectEmit ¶
func (km *KafkaMock) ExpectEmit(topic string, key string, expecter func(value []byte))
ExpectEmit ensures that a message was emitted to the given topic with the given key. The message may be inspected or unmarshalled by the passed expecter function.
func (*KafkaMock) Finish ¶
func (km *KafkaMock) Finish(fail bool)
Finish marks the KafkaMock to expect no further emits. Set fail to true if the KafkaMock should fail the test case when emits remain. The list of emits is cleared in either case. Finish should always be called at the end of a test case to make sure no emits of prior test cases are stuck in the list and mess with the test results.
func (*KafkaMock) ProcessorOptions ¶
func (km *KafkaMock) ProcessorOptions() []ProcessorOption
ProcessorOptions returns the options that must be passed to NewProcessor to use the mock. It essentially replaces the consumer, producer, and topic manager with mocks. For convenience, the storage is also mocked. For example, a normal call to NewProcessor like this

    NewProcessor(brokers, group, subscriptions,
        option_a,
        option_b,
        option_c,
    )

would become the following in a unit test:

    kafkaMock := NewKafkaMock(t)
    NewProcessor(brokers, group, subscriptions,
        append(kafkaMock.ProcessorOptions(),
            option_a,
            option_b,
            option_c,
        )...,
    )
func (*KafkaMock) ReplaceEmitHandler ¶
func (km *KafkaMock) ReplaceEmitHandler(emitter EmitHandler)
func (*KafkaMock) SetGroupTableCreator ¶
func (km *KafkaMock) SetGroupTableCreator(creator func() (string, []byte))
func (*KafkaMock) ValueForKey ¶
func (km *KafkaMock) ValueForKey(key string) interface{}
ValueForKey attempts to get a value from KafkaMock's storage.
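Taken together, a unit test can drive a processor entirely through the mock; a rough sketch using the group graph g from the DefineGroup example (the exact startup sequence may differ):

    km := NewKafkaMock(t)
    p, err := NewProcessor(nil, g, km.ProcessorOptions()...)
    if err != nil {
        t.Fatal(err)
    }
    go p.Start() // run against the mocked consumer/producer/storage

    // simulate an input message and inspect the resulting table value
    km.ConsumeString("input-topic", "some-key", "hello")
    if v := km.ValueForKey("some-key"); v == nil {
        t.Errorf("expected a value for some-key")
    }

    // fail the test if any emits remain unverified
    km.Finish(true)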
type ProcessCallback ¶
type ProcessCallback func(ctx Context, msg interface{})
ProcessCallback function is called for every message received by the processor.
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor is a set of stateful callback functions that, on the arrival of messages, modify the content of a table (the group table) and emit messages into other topics. Messages as well as rows in the group table are key-value pairs. A group is composed of multiple processor instances.
Example (Simplest) ¶
Example shows how to use a callback. For each partition of the topics, a new goroutine will be created. Topics should be co-partitioned (they should have the same number of partitions and be partitioned by the same key).
var (
    brokers        = []string{"127.0.0.1:9092"}
    group   Group  = "group"
    topic   Stream = "topic"
)

consume := func(ctx Context, m interface{}) {
    fmt.Printf("Hello world: %v", m)
}

c, err := NewProcessor(brokers, DefineGroup(group, Input(topic, rawCodec, consume)))
if err != nil {
    log.Fatalln(err)
}

// start consumer with a goroutine (blocks)
go func() {
    err := c.Start()
    panic(err)
}()

// wait for bad things to happen
wait := make(chan os.Signal, 1)
signal.Notify(wait, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
<-wait
c.Stop()
Output:
func NewProcessor ¶
func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption) (*Processor, error)
NewProcessor creates a processor instance in a group given the address of Kafka brokers, the consumer group name, a list of subscriptions (topics, codecs, and callbacks), and a series of options.
func (*Processor) Get ¶
Get returns a read-only copy of a value from the group table if the respective partition is owned by the processor instance. Get can only be used with stateful processors (i.e., when the group table is enabled).
func (*Processor) Registry ¶
func (g *Processor) Registry() metrics.Registry
Registry returns the go-metrics registry used by the processor.
type ProcessorOption ¶
type ProcessorOption func(*poptions)
ProcessorOption defines a configuration option to be used when creating a processor.
func WithClientID ¶
func WithClientID(clientID string) ProcessorOption
WithClientID defines the client ID used to identify with kafka.
func WithConsumer ¶
func WithConsumer(c kafka.Consumer) ProcessorOption
WithConsumer replaces goka's default consumer. Mainly for testing.
func WithKafkaMetrics ¶
func WithKafkaMetrics(registry metrics.Registry) ProcessorOption
WithKafkaMetrics sets a go-metrics registry to collect kafka metrics. The collected metric points are those of sarama; see https://godoc.org/github.com/Shopify/sarama
func WithPartitionChannelSize ¶
func WithPartitionChannelSize(size int) ProcessorOption
WithPartitionChannelSize replaces the default partition channel size. This is mostly used for testing by setting it to 0 to have synchronous behavior of goka.
func WithProducer ¶
func WithProducer(p kafka.Producer) ProcessorOption
WithProducer replaces goka's default producer. Mainly for testing.
func WithStorageBuilder ¶
func WithStorageBuilder(sb StorageBuilder) ProcessorOption
WithStorageBuilder defines a builder for the storage of each partition.
func WithStoragePath ¶
func WithStoragePath(storagePath string) ProcessorOption
WithStoragePath defines the base path for the local storage on disk
func WithStorageSnapshotInterval ¶
func WithStorageSnapshotInterval(interval time.Duration) ProcessorOption
WithStorageSnapshotInterval sets the interval at which the storage snapshots to disk (if the storage supports snapshotting at all). A greater interval means fewer writes to disk but more memory usage; a smaller interval means more writes to disk but less memory usage.
func WithTopicManager ¶
func WithTopicManager(tm kafka.TopicManager) ProcessorOption
WithTopicManager defines a topic manager.
func WithUpdateCallback ¶
func WithUpdateCallback(cb UpdateCallback) ProcessorOption
WithUpdateCallback defines the callback called upon recovering a message from the log.
type StorageBuilder ¶
type StorageBuilder func(topic string, partition int32, codec Codec, reg metrics.Registry) (storage.Storage, error)
StorageBuilder creates a local storage (a persistent cache) for a topic table. StorageBuilder creates one storage for each partition of the topic.
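A custom builder can, for example, derive a partition-specific path; a sketch where storage.New stands in for the constructor of an actual storage implementation (its real signature may differ):

    builder := func(topic string, partition int32, codec Codec, reg metrics.Registry) (storage.Storage, error) {
        path := fmt.Sprintf("/data/goka/%s.%d", topic, partition)
        return storage.New(path, codec, reg) // hypothetical constructor
    }
    p, err := NewProcessor(brokers, g, WithStorageBuilder(builder))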
type Table ¶
type Table string
func GroupTable ¶
GroupTable returns the name of the group table of group.
type Tester ¶
type Tester interface {
    Errorf(format string, args ...interface{})
    Fatalf(format string, args ...interface{})
    Fatal(a ...interface{})
}
Tester abstracts the interface we assume from the test case. It will most likely be *testing.T.
type UpdateCallback ¶
UpdateCallback is invoked upon arrival of a message for a table partition. The partition storage shall be updated in the callback.
type View ¶
type View struct {
// contains filtered or unexported fields
}
View is a materialized (i.e. persistent) cache of a group table.
Example (Simple) ¶
var (
    brokers       = []string{"localhost:9092"}
    group   Group = "group-name"
)

sr, err := NewView(brokers, GroupTable(group), nil)
if err != nil {
    panic(err)
}

errs := sr.Start()
if errs != nil {
    panic(errs)
}
Output:
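Once the view is running, its content can back a query endpoint; a sketch assuming the View exposes a Get method (not listed in this section):

    http.HandleFunc("/value", func(w http.ResponseWriter, r *http.Request) {
        val, err := sr.Get(r.URL.Query().Get("key")) // Get is assumed
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        fmt.Fprintf(w, "%v", val)
    })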
func (*View) Registry ¶
func (v *View) Registry() metrics.Registry
Registry returns the go-metrics registry used by the view.
type ViewOption ¶
type ViewOption func(*voptions)
ViewOption defines a configuration option to be used when creating a view.
func WithViewCallback ¶
func WithViewCallback(cb UpdateCallback) ViewOption
WithViewCallback defines the callback called upon recovering a message from the log.
func WithViewConsumer ¶
func WithViewConsumer(c kafka.Consumer) ViewOption
WithViewConsumer replaces goka's default view consumer. Mainly for testing.
func WithViewKafkaMetrics ¶
func WithViewKafkaMetrics(registry metrics.Registry) ViewOption
WithViewKafkaMetrics sets a go-metrics registry to collect kafka metrics. The collected metric points are those of sarama; see https://godoc.org/github.com/Shopify/sarama
func WithViewPartitionChannelSize ¶
func WithViewPartitionChannelSize(size int) ViewOption
WithViewPartitionChannelSize replaces the default partition channel size. This is mostly used for testing by setting it to 0 to have synchronous behavior of goka.
func WithViewStorageBuilder ¶
func WithViewStorageBuilder(sb StorageBuilder) ViewOption
WithViewStorageBuilder defines a builder for the storage of each partition.
func WithViewStoragePath ¶
func WithViewStoragePath(storagePath string) ViewOption
WithViewStoragePath defines the base path for the local storage on disk
func WithViewStorageSnapshotInterval ¶
func WithViewStorageSnapshotInterval(interval time.Duration) ViewOption
WithViewStorageSnapshotInterval sets the interval at which the storage snapshots to disk (if the storage supports snapshotting at all). A greater interval means fewer writes to disk but more memory usage; a smaller interval means more writes to disk but less memory usage.
func WithViewTopicManager ¶
func WithViewTopicManager(tm kafka.TopicManager) ViewOption
WithViewTopicManager defines a topic manager.