Documentation ¶
Overview ¶
Package goka is a stateful stream processing library for Apache Kafka (version 0.9+) that eases the development of microservices. Goka extends the concept of consumer group with a group table, which represents the state of the group. A microservice modifies and serves the content of a table employing two complementary object types: processors and views.
Processors ¶
A processor is a set of callback functions that modify the group table when messages arrive and may also emit messages into other topics. Messages as well as rows in the group table are key-value pairs. Callbacks receive the arriving message and the row addressed by the message's key.
In Kafka, keys are used to partition topics. A goka processor consumes from a set of co-partitioned topics (topics with the same number of partitions and the same key range). A group topic keeps track of the group table updates, allowing for recovery and rebalancing of processors: When multiple processor instances start in the same consumer group, the instances split the co-partitioned input topics and load the respective group table partitions from the group topic. A local disk storage minimizes recovery time by caching partitions of group table.
Views ¶
A view is a materialized (ie, persistent) cache of a group table. A view subscribes for the updates of all partitions of a group table and keeps local disk storage in sync with the group topic. With a view, one can easily serve up-to-date content of the group table via, for example, gRPC.
Package goka is a generated GoMock package.
Package goka is a generated GoMock package.
Package goka is a generated GoMock package.
Index ¶
- Variables
- func DefaultConfig() *sarama.Config
- func DefaultConsumerGroupBuilder(brokers []string, group, clientID string) (sarama.ConsumerGroup, error)
- func DefaultHasher() func() hash.Hash32
- func DefaultProcessorStoragePath(group Group) string
- func DefaultRebalance(a Assignment)
- func DefaultSaramaConsumerBuilder(brokers []string, clientID string) (sarama.Consumer, error)
- func DefaultUpdate(s storage.Storage, partition int32, key string, value []byte) error
- func DefaultViewStoragePath() string
- func NewMockController(t gomock.TestReporter) *gomock.Controller
- func ReplaceGlobalConfig(config *sarama.Config)
- type Assignment
- type Backoff
- type BackoffBuilder
- type Broker
- type Codec
- type ConsumerGroupBuilder
- type Context
- type Edge
- func Input(topic Stream, c Codec, cb ProcessCallback) Edge
- func Inputs(topics Streams, c Codec, cb ProcessCallback) Edge
- func Join(topic Table, c Codec) Edge
- func Lookup(topic Table, c Codec) Edge
- func Loop(c Codec, cb ProcessCallback) Edge
- func Output(topic Stream, c Codec) Edge
- func Persist(c Codec) Edge
- type Edges
- type Emitter
- type EmitterOption
- func WithEmitterClientID(clientID string) EmitterOption
- func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption
- func WithEmitterLogger(log logger.Logger) EmitterOption
- func WithEmitterProducerBuilder(pb ProducerBuilder) EmitterOption
- func WithEmitterTester(t Tester) EmitterOption
- func WithEmitterTopicManagerBuilder(tmb TopicManagerBuilder) EmitterOption
- type Getter
- type Group
- type GroupGraph
- func (gg *GroupGraph) Group() Group
- func (gg *GroupGraph) GroupTable() Edge
- func (gg *GroupGraph) InputStreams() Edges
- func (gg *GroupGraph) JointTables() Edges
- func (gg *GroupGraph) LookupTables() Edges
- func (gg *GroupGraph) LoopStream() Edge
- func (gg *GroupGraph) OutputStreams() Edges
- func (gg *GroupGraph) Validate() error
- type InputStats
- type Iterator
- type MockAutoConsumer
- func (c *MockAutoConsumer) Close() error
- func (c *MockAutoConsumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)
- func (c *MockAutoConsumer) ExpectConsumePartition(topic string, partition int32, offset int64) *MockAutoPartitionConsumer
- func (c *MockAutoConsumer) HighWaterMarks() map[string]map[int32]int64
- func (c *MockAutoConsumer) Partitions(topic string) ([]int32, error)
- func (c *MockAutoConsumer) SetTopicMetadata(metadata map[string][]int32)
- func (c *MockAutoConsumer) Topics() ([]string, error)
- type MockAutoPartitionConsumer
- func (pc *MockAutoPartitionConsumer) AsyncClose()
- func (pc *MockAutoPartitionConsumer) Close() error
- func (pc *MockAutoPartitionConsumer) Errors() <-chan *sarama.ConsumerError
- func (pc *MockAutoPartitionConsumer) ExpectErrorsDrainedOnClose()
- func (pc *MockAutoPartitionConsumer) ExpectMessagesDrainedOnClose()
- func (pc *MockAutoPartitionConsumer) HighWaterMarkOffset() int64
- func (pc *MockAutoPartitionConsumer) Messages() <-chan *sarama.ConsumerMessage
- func (pc *MockAutoPartitionConsumer) YieldError(err error)
- func (pc *MockAutoPartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage)
- type MockBroker
- type MockBrokerMockRecorder
- type MockClient
- func (m *MockClient) Brokers() []*sarama.Broker
- func (m *MockClient) Close() error
- func (m *MockClient) Closed() bool
- func (m *MockClient) Config() *sarama.Config
- func (m *MockClient) Controller() (*sarama.Broker, error)
- func (m *MockClient) Coordinator(arg0 string) (*sarama.Broker, error)
- func (m *MockClient) EXPECT() *MockClientMockRecorder
- func (m *MockClient) GetOffset(arg0 string, arg1 int32, arg2 int64) (int64, error)
- func (m *MockClient) InSyncReplicas(arg0 string, arg1 int32) ([]int32, error)
- func (m *MockClient) InitProducerID() (*sarama.InitProducerIDResponse, error)
- func (m *MockClient) Leader(arg0 string, arg1 int32) (*sarama.Broker, error)
- func (m *MockClient) OfflineReplicas(arg0 string, arg1 int32) ([]int32, error)
- func (m *MockClient) Partitions(arg0 string) ([]int32, error)
- func (m *MockClient) RefreshController() (*sarama.Broker, error)
- func (m *MockClient) RefreshCoordinator(arg0 string) error
- func (m *MockClient) RefreshMetadata(arg0 ...string) error
- func (m *MockClient) Replicas(arg0 string, arg1 int32) ([]int32, error)
- func (m *MockClient) Topics() ([]string, error)
- func (m *MockClient) WritablePartitions(arg0 string) ([]int32, error)
- type MockClientMockRecorder
- func (mr *MockClientMockRecorder) Brokers() *gomock.Call
- func (mr *MockClientMockRecorder) Close() *gomock.Call
- func (mr *MockClientMockRecorder) Closed() *gomock.Call
- func (mr *MockClientMockRecorder) Config() *gomock.Call
- func (mr *MockClientMockRecorder) Controller() *gomock.Call
- func (mr *MockClientMockRecorder) Coordinator(arg0 interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) GetOffset(arg0, arg1, arg2 interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) InSyncReplicas(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) InitProducerID() *gomock.Call
- func (mr *MockClientMockRecorder) Leader(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) OfflineReplicas(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) Partitions(arg0 interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) RefreshController() *gomock.Call
- func (mr *MockClientMockRecorder) RefreshCoordinator(arg0 interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) RefreshMetadata(arg0 ...interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) Replicas(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) Topics() *gomock.Call
- func (mr *MockClientMockRecorder) WritablePartitions(arg0 interface{}) *gomock.Call
- type MockConsumerGroup
- func (cg *MockConsumerGroup) Close() error
- func (cg *MockConsumerGroup) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error
- func (cg *MockConsumerGroup) Errors() <-chan error
- func (cg *MockConsumerGroup) FailOnConsume(err error)
- func (cg *MockConsumerGroup) SendError(err error)
- func (cg *MockConsumerGroup) SendMessage(message *sarama.ConsumerMessage) <-chan struct{}
- func (cg *MockConsumerGroup) SendMessageWait(message *sarama.ConsumerMessage)
- type MockConsumerGroupClaim
- type MockConsumerGroupSession
- func (cgs *MockConsumerGroupSession) Claims() map[string][]int32
- func (cgs *MockConsumerGroupSession) Commit()
- func (cgs *MockConsumerGroupSession) Context() context.Context
- func (cgs *MockConsumerGroupSession) GenerationID() int32
- func (cgs *MockConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string)
- func (cgs *MockConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string)
- func (cgs *MockConsumerGroupSession) MemberID() string
- func (cgs *MockConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string)
- func (cgs *MockConsumerGroupSession) SendMessage(msg *sarama.ConsumerMessage)
- type MockProducer
- type MockProducerMockRecorder
- type MockStorage
- func (m *MockStorage) Close() error
- func (m *MockStorage) Delete(arg0 string) error
- func (m *MockStorage) EXPECT() *MockStorageMockRecorder
- func (m *MockStorage) Get(arg0 string) ([]byte, error)
- func (m *MockStorage) GetOffset(arg0 int64) (int64, error)
- func (m *MockStorage) Has(arg0 string) (bool, error)
- func (m *MockStorage) Iterator() (storage.Iterator, error)
- func (m *MockStorage) IteratorWithRange(arg0, arg1 []byte) (storage.Iterator, error)
- func (m *MockStorage) MarkRecovered() error
- func (m *MockStorage) Open() error
- func (m *MockStorage) Recovered() bool
- func (m *MockStorage) Set(arg0 string, arg1 []byte) error
- func (m *MockStorage) SetOffset(arg0 int64) error
- type MockStorageMockRecorder
- func (mr *MockStorageMockRecorder) Close() *gomock.Call
- func (mr *MockStorageMockRecorder) Delete(arg0 interface{}) *gomock.Call
- func (mr *MockStorageMockRecorder) Get(arg0 interface{}) *gomock.Call
- func (mr *MockStorageMockRecorder) GetOffset(arg0 interface{}) *gomock.Call
- func (mr *MockStorageMockRecorder) Has(arg0 interface{}) *gomock.Call
- func (mr *MockStorageMockRecorder) Iterator() *gomock.Call
- func (mr *MockStorageMockRecorder) IteratorWithRange(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockStorageMockRecorder) MarkRecovered() *gomock.Call
- func (mr *MockStorageMockRecorder) Open() *gomock.Call
- func (mr *MockStorageMockRecorder) Recovered() *gomock.Call
- func (mr *MockStorageMockRecorder) Set(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockStorageMockRecorder) SetOffset(arg0 interface{}) *gomock.Call
- type MockTopicManager
- func (m *MockTopicManager) Close() error
- func (m *MockTopicManager) EXPECT() *MockTopicManagerMockRecorder
- func (m *MockTopicManager) EnsureStreamExists(arg0 string, arg1 int) error
- func (m *MockTopicManager) EnsureTableExists(arg0 string, arg1 int) error
- func (m *MockTopicManager) EnsureTopicExists(arg0 string, arg1, arg2 int, arg3 map[string]string) error
- func (m *MockTopicManager) GetOffset(arg0 string, arg1 int32, arg2 int64) (int64, error)
- func (m *MockTopicManager) Partitions(arg0 string) ([]int32, error)
- type MockTopicManagerMockRecorder
- func (mr *MockTopicManagerMockRecorder) Close() *gomock.Call
- func (mr *MockTopicManagerMockRecorder) EnsureStreamExists(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockTopicManagerMockRecorder) EnsureTableExists(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockTopicManagerMockRecorder) EnsureTopicExists(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
- func (mr *MockTopicManagerMockRecorder) GetOffset(arg0, arg1, arg2 interface{}) *gomock.Call
- func (mr *MockTopicManagerMockRecorder) Partitions(arg0 interface{}) *gomock.Call
- type NilHandling
- type OutputStats
- type PartitionProcStats
- type PartitionProcessor
- type PartitionStatus
- type PartitionTable
- func (p *PartitionTable) CatchupForever(ctx context.Context, restartOnError bool) error
- func (p *PartitionTable) Close() error
- func (p *PartitionTable) CurrentState() PartitionStatus
- func (p *PartitionTable) Delete(key string) error
- func (p *PartitionTable) Get(key string) ([]byte, error)
- func (p *PartitionTable) GetOffset(defValue int64) (int64, error)
- func (p *PartitionTable) Has(key string) (bool, error)
- func (p *PartitionTable) IsRecovered() bool
- func (p *PartitionTable) RunStatsLoop(ctx context.Context)
- func (p *PartitionTable) Set(key string, value []byte) error
- func (p *PartitionTable) SetOffset(value int64) error
- func (p *PartitionTable) SetupAndRecover(ctx context.Context, restartOnError bool) error
- func (p *PartitionTable) TrackMessageWrite(ctx context.Context, length int)
- func (p *PartitionTable) WaitRecovered() chan struct{}
- type ProcessCallback
- type Processor
- func (g *Processor) Cleanup(session sarama.ConsumerGroupSession) error
- func (g *Processor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (g *Processor) Get(key string) (interface{}, error)
- func (g *Processor) Graph() *GroupGraph
- func (g *Processor) Recovered() bool
- func (g *Processor) Run(ctx context.Context) (rerr error)
- func (g *Processor) Setup(session sarama.ConsumerGroupSession) error
- func (g *Processor) Stats() *ProcessorStats
- func (g *Processor) StatsWithContext(ctx context.Context) *ProcessorStats
- func (g *Processor) Stop()
- func (g *Processor) WaitForReady()
- type ProcessorOption
- func WithBackoffBuilder(bb BackoffBuilder) ProcessorOption
- func WithBackoffResetTimeout(duration time.Duration) ProcessorOption
- func WithClientID(clientID string) ProcessorOption
- func WithConsumerGroupBuilder(cgb ConsumerGroupBuilder) ProcessorOption
- func WithConsumerSaramaBuilder(cgb SaramaConsumerBuilder) ProcessorOption
- func WithGroupGraphHook(hook func(gg *GroupGraph)) ProcessorOption
- func WithHasher(hasher func() hash.Hash32) ProcessorOption
- func WithLogger(log logger.Logger) ProcessorOption
- func WithNilHandling(nh NilHandling) ProcessorOption
- func WithPartitionChannelSize(size int) ProcessorOption
- func WithProducerBuilder(pb ProducerBuilder) ProcessorOption
- func WithRebalanceCallback(cb RebalanceCallback) ProcessorOption
- func WithStorageBuilder(sb storage.Builder) ProcessorOption
- func WithTester(t Tester) ProcessorOption
- func WithTopicManagerBuilder(tmb TopicManagerBuilder) ProcessorOption
- func WithUpdateCallback(cb UpdateCallback) ProcessorOption
- type ProcessorStats
- type Producer
- type ProducerBuilder
- type Promise
- type RebalanceCallback
- type RecoveryStats
- type SaramaConsumerBuilder
- type Signal
- type State
- type StateChangeObserver
- type Stream
- type Streams
- type Table
- type TableStats
- type Tester
- type TopicManager
- type TopicManagerBuilder
- type TopicManagerConfig
- type UpdateCallback
- type View
- func (v *View) CurrentState() ViewState
- func (v *View) Evict(key string) error
- func (v *View) Get(key string) (interface{}, error)
- func (v *View) Has(key string) (bool, error)
- func (v *View) Iterator() (Iterator, error)
- func (v *View) IteratorWithRange(start, limit string) (Iterator, error)
- func (v *View) ObserveStateChanges() *StateChangeObserver
- func (v *View) Recovered() bool
- func (v *View) Run(ctx context.Context) (rerr error)
- func (v *View) Stats(ctx context.Context) *ViewStats
- func (v *View) Topic() string
- func (v *View) WaitRunning() <-chan struct{}
- type ViewOption
- func WithViewAutoReconnect() ViewOption
- func WithViewBackoffBuilder(bb BackoffBuilder) ViewOption
- func WithViewBackoffResetTimeout(duration time.Duration) ViewOption
- func WithViewCallback(cb UpdateCallback) ViewOption
- func WithViewClientID(clientID string) ViewOption
- func WithViewConsumerSaramaBuilder(cgb SaramaConsumerBuilder) ViewOption
- func WithViewHasher(hasher func() hash.Hash32) ViewOption
- func WithViewLogger(log logger.Logger) ViewOption
- func WithViewRestartable() ViewOption
- func WithViewStorageBuilder(sb storage.Builder) ViewOption
- func WithViewTester(t Tester) ViewOption
- func WithViewTopicManagerBuilder(tmb TopicManagerBuilder) ViewOption
- type ViewState
- type ViewStats
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var CopartitioningStrategy = new(copartitioningStrategy)
CopartitioningStrategy is the rebalance strategy necessary to guarantee the copartitioning when consuming multiple input topics with multiple processor instances
var ( // ErrEmitterAlreadyClosed is returned when Emit is called after the emitter has been finished. ErrEmitterAlreadyClosed error = errors.New("emitter already closed") )
Functions ¶
func DefaultConfig ¶ added in v1.0.0
DefaultConfig creates a new config used by goka per default Use it to modify and pass to `goka.ReplaceGlobalConifg(...)` to modify goka's global config
func DefaultConsumerGroupBuilder ¶ added in v1.0.0
func DefaultConsumerGroupBuilder(brokers []string, group, clientID string) (sarama.ConsumerGroup, error)
DefaultConsumerGroupBuilder creates a Kafka consumer using the Sarama library.
func DefaultHasher ¶
DefaultHasher returns an FNV hasher builder to assign keys to partitions.
func DefaultProcessorStoragePath ¶
DefaultProcessorStoragePath is the default path where processor state will be stored.
func DefaultRebalance ¶ added in v0.1.3
func DefaultRebalance(a Assignment)
DefaultRebalance is the default callback when a new partition assignment is received. DefaultRebalance can be used in the function passed to WithRebalanceCallback.
func DefaultSaramaConsumerBuilder ¶ added in v1.0.0
DefaultSaramaConsumerBuilder creates a Kafka consumer using the Sarama library.
func DefaultUpdate ¶
DefaultUpdate is the default callback used to update the local storage with from the table topic in Kafka. It is called for every message received during recovery of processors and during the normal operation of views. DefaultUpdate can be used in the function passed to WithUpdateCallback and WithViewCallback.
func DefaultViewStoragePath ¶
func DefaultViewStoragePath() string
DefaultViewStoragePath returns the default path where view state will be stored.
func NewMockController ¶ added in v1.0.0
func NewMockController(t gomock.TestReporter) *gomock.Controller
NewMockController returns a *gomock.Controller using a wrapped testing.T (or whatever) which panics on a Fatalf. This is necessary when using a mock in kafkamock. Otherwise it will freeze on an unexpected call.
func ReplaceGlobalConfig ¶ added in v1.0.0
ReplaceGlobalConfig registeres a standard config used during building if no other config is specified
Types ¶
type Assignment ¶ added in v1.0.0
Assignment represents a partition:offset assignment for the current connection
type Backoff ¶ added in v1.0.0
Backoff is used for adding backoff capabilities to the restarting of failing partition tables.
func DefaultBackoffBuilder ¶ added in v1.0.0
DefaultBackoffBuilder returnes a simpleBackoff with 10 second steps
func NewSimpleBackoff ¶ added in v1.0.0
NewSimpleBackoff returns a simple backoff waiting the specified duration longer each iteration until reset.
type BackoffBuilder ¶ added in v1.0.0
BackoffBuilder creates a backoff
type Broker ¶ added in v1.0.0
type Broker interface { Addr() string Connected() (bool, error) CreateTopics(request *sarama.CreateTopicsRequest) (*sarama.CreateTopicsResponse, error) Open(conf *sarama.Config) error }
Broker is an interface for the sarama broker
type Codec ¶
type Codec interface { Encode(value interface{}) (data []byte, err error) Decode(data []byte) (value interface{}, err error) }
Codec decodes and encodes from and to []byte
type ConsumerGroupBuilder ¶ added in v1.0.0
type ConsumerGroupBuilder func(brokers []string, group, clientID string) (sarama.ConsumerGroup, error)
ConsumerGroupBuilder creates a `sarama.ConsumerGroup`
func ConsumerGroupBuilderWithConfig ¶ added in v1.0.0
func ConsumerGroupBuilderWithConfig(config *sarama.Config) ConsumerGroupBuilder
ConsumerGroupBuilderWithConfig creates a sarama consumergroup using passed config
type Context ¶
type Context interface { // Topic returns the topic of input message. Topic() Stream // Key returns the key of the input message. Key() string // Partition returns the partition of the input message. Partition() int32 // Offset returns the offset of the input message. Offset() int64 // Value returns the value of the key in the group table. // // This method might panic to initiate an immediate shutdown of the processor // to maintain data integrity. Do not recover from that panic or // the processor might deadlock. Value() interface{} // Headers returns the headers of the input message Headers() map[string][]byte // SetValue updates the value of the key in the group table. // It stores the value in the local cache and sends the // update to the Kafka topic representing the group table. // // This method might panic to initiate an immediate shutdown of the processor // to maintain data integrity. Do not recover from that panic or // the processor might deadlock. SetValue(value interface{}) // Delete deletes a value from the group table. IMPORTANT: this deletes the // value associated with the key from both the local cache and the persisted // table in Kafka. // // This method might panic to initiate an immediate shutdown of the processor // to maintain data integrity. Do not recover from that panic or // the processor might deadlock. Delete() // Timestamp returns the timestamp of the input message. If the timestamp is // invalid, a zero time will be returned. Timestamp() time.Time // Join returns the value of key in the copartitioned table. // // This method might panic to initiate an immediate shutdown of the processor // to maintain data integrity. Do not recover from that panic or // the processor might deadlock. Join(topic Table) interface{} // Lookup returns the value of key in the view of table. // // This method might panic to initiate an immediate shutdown of the processor // to maintain data integrity. Do not recover from that panic or // the processor might deadlock. Lookup(topic Table, key string) interface{} // Emit asynchronously writes a message into a topic. // // This method might panic to initiate an immediate shutdown of the processor // to maintain data integrity. Do not recover from that panic or // the processor might deadlock. Emit(topic Stream, key string, value interface{}) // Loopback asynchronously sends a message to another key of the group // table. Value passed to loopback is encoded via the codec given in the // Loop subscription. // // This method might panic to initiate an immediate shutdown of the processor // to maintain data integrity. Do not recover from that panic or // the processor might deadlock. Loopback(key string, value interface{}) // Fail stops execution and shuts down the processor // The callback is stopped immediately by panicking. Do not recover from that panic or // the processor might deadlock. Fail(err error) // Context returns the underlying context used to start the processor or a // subcontext. Context() context.Context }
Context provides access to the processor's table and emit capabilities to arbitrary topics in kafka. Upon arrival of a message from subscribed topics, the respective ConsumeCallback is invoked with a context object along with the input message. The context is only valid within the callback, do not store it or pass it to other goroutines.
Error handling ¶
Most methods of the context can fail due to different reasons, which are handled in different ways: Synchronous errors like * wrong codec for topic (a message cannot be marshalled or unmarshalled) * Emit to a topic without the Output definition in the group graph * Value/SetValue without defining Persist in the group graph * Join/Lookup without the definition in the group graph etc.. will result in a panic to stop the callback immediately and shutdown the processor. This is necessary to preserve integrity of the processor and avoid further actions. Do not recover from that panic, otherwise the goroutine will deadlock.
Retrying synchronous errors must be implemented by restarting the processor. If errors must be tolerated (which is not advisable because they're usually persistent), provide fail-tolerant versions of the producer, storage or codec as needed.
Asynchronous errors can occur when the callback has been finished, but e.g. sending a batched message to kafka fails due to connection errors or leader election in the cluster. Those errors still shutdown the processor but will not result in a panic in the callback.
type Edge ¶
Edge represents a topic in Kafka and the corresponding codec to encode and decode the messages of that topic.
func Input ¶
func Input(topic Stream, c Codec, cb ProcessCallback) Edge
Input represents an edge of an input stream topic. The edge specifies the topic name, its codec and the ProcessorCallback used to process it. The topic has to be copartitioned with any other input stream of the group and with the group table. The group starts reading the topic from the newest offset.
func Inputs ¶
func Inputs(topics Streams, c Codec, cb ProcessCallback) Edge
Inputs creates edges of multiple input streams sharing the same codec and callback.
func Join ¶
Join represents an edge of a copartitioned, log-compacted table topic. The edge specifies the topic name and the codec of the messages of the topic. The group starts reading the topic from the oldest offset. The processing of input streams is blocked until all partitions of the table are recovered.
func Lookup ¶
Lookup represents an edge of a non-copartitioned, log-compacted table topic. The edge specifies the topic name and the codec of the messages of the topic. The group starts reading the topic from the oldest offset. The processing of input streams is blocked until the table is fully recovered.
func Loop ¶
func Loop(c Codec, cb ProcessCallback) Edge
Loop represents the edge of the loopback topic of the group. The edge specifies the codec of the messages in the topic and ProcesCallback to process the messages of the topic. Context.Loopback() is used to write messages into this topic from any callback of the group.
func Output ¶
Output represents an edge of an output stream topic. The edge specifies the topic name and the codec of the messages of the topic. Context.Emit() only emits messages into Output edges defined in the group graph. The topic does not have to be copartitioned with the input streams.
func Persist ¶
Persist represents the edge of the group table, which is log-compacted and copartitioned with the input streams. Without Persist, calls to ctx.Value or ctx.SetValue in the consume callback will fail and lead to shutdown of the processor.
This edge specifies the codec of the messages in the topic, ie, the codec of the values of the table. The processing of input streams is blocked until all partitions of the group table are recovered.
The topic name is derived from the group name by appending "-table".
type Emitter ¶
type Emitter struct {
// contains filtered or unexported fields
}
Emitter emits messages into a specific Kafka topic, first encoding the message with the given codec.
func NewEmitter ¶
func NewEmitter(brokers []string, topic Stream, codec Codec, options ...EmitterOption) (*Emitter, error)
NewEmitter creates a new emitter using passed brokers, topic, codec and possibly options.
type EmitterOption ¶
EmitterOption defines a configuration option to be used when creating an emitter.
func WithEmitterClientID ¶
func WithEmitterClientID(clientID string) EmitterOption
WithEmitterClientID defines the client ID used to identify with kafka.
func WithEmitterHasher ¶
func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption
WithEmitterHasher sets the hash function that assigns keys to partitions.
func WithEmitterLogger ¶
func WithEmitterLogger(log logger.Logger) EmitterOption
WithEmitterLogger sets the logger the emitter should use. By default, emitters use the standard library logger.
func WithEmitterProducerBuilder ¶
func WithEmitterProducerBuilder(pb ProducerBuilder) EmitterOption
WithEmitterProducerBuilder replaces the default producer builder.
func WithEmitterTester ¶ added in v0.1.1
func WithEmitterTester(t Tester) EmitterOption
WithEmitterTester configures the emitter to use passed tester. This is used for component tests
func WithEmitterTopicManagerBuilder ¶
func WithEmitterTopicManagerBuilder(tmb TopicManagerBuilder) EmitterOption
WithEmitterTopicManagerBuilder replaces the default topic manager builder.
type Getter ¶
Getter functions return a value for a key or an error. If no value exists for the key, nil is returned without errors.
type Group ¶
type Group string
Group is the name of a consumer group in Kafka and represents a processor group in Goka. A processor group may have a group table and a group loopback stream. By default, the group table is named <group>-table and the loopback stream <group>-loop.
type GroupGraph ¶
type GroupGraph struct {
// contains filtered or unexported fields
}
GroupGraph is the specification of a processor group. It contains all input, output, and any other topic from which and into which the processor group may consume or produce events. Each of these links to Kafka is called Edge.
func DefineGroup ¶
func DefineGroup(group Group, edges ...Edge) *GroupGraph
DefineGroup creates a group graph with a given group name and a list of edges.
func (*GroupGraph) GroupTable ¶
func (gg *GroupGraph) GroupTable() Edge
GroupTable returns the group table edge of the group.
func (*GroupGraph) InputStreams ¶
func (gg *GroupGraph) InputStreams() Edges
InputStreams returns all input stream edges of the group.
func (*GroupGraph) JointTables ¶
func (gg *GroupGraph) JointTables() Edges
JointTables retuns all joint table edges of the group.
func (*GroupGraph) LookupTables ¶
func (gg *GroupGraph) LookupTables() Edges
LookupTables retuns all lookup table edges of the group.
func (*GroupGraph) LoopStream ¶
func (gg *GroupGraph) LoopStream() Edge
LoopStream returns the loopback edge of the group.
func (*GroupGraph) OutputStreams ¶
func (gg *GroupGraph) OutputStreams() Edges
OutputStreams returns the output stream edges of the group.
func (*GroupGraph) Validate ¶
func (gg *GroupGraph) Validate() error
Validate validates the group graph and returns an error if invalid. Main validation checks are: - at most one loopback stream edge is allowed - at most one group table edge is allowed - at least one input stream is required - table and loopback topics cannot be used in any other edge.
type InputStats ¶
type InputStats struct { Count uint Bytes int OffsetLag int64 LastOffset int64 Delay time.Duration }
InputStats represents the number of messages and the number of bytes consumed from a stream or table topic since the process started.
type Iterator ¶
type Iterator interface { // Next advances the iterator to the next KV-pair. Err should be called // after Next returns false to check whether the iteration finished // from exhaustion or was aborted due to an error. Next() bool // Err returns the error that stopped the iteration if any. Err() error // Return the key of the current item Key() string // Return the value of the current item // This value is already decoded with the view's codec (or nil, if it's nil) Value() (interface{}, error) // Release the iterator. After release, the iterator is not usable anymore Release() // Seek moves the iterator to the begining of a key-value pair sequence that // is greater or equal to the given key. It returns whether at least one of // such key-value pairs exist. Next must be called after seeking to access // the first pair. Seek(key string) bool }
Iterator allows one to iterate over the keys of a view.
type MockAutoConsumer ¶ added in v1.0.0
type MockAutoConsumer struct {
// contains filtered or unexported fields
}
MockAutoConsumer implements sarama's Consumer interface for testing purposes. Before you can start consuming from this consumer, you have to register topic/partitions using ExpectConsumePartition, and set expectations on them.
func NewMockAutoConsumer ¶ added in v1.0.0
func NewMockAutoConsumer(t *testing.T, config *sarama.Config) *MockAutoConsumer
NewMockAutoConsumer returns a new mock Consumer instance. The t argument should be the *testing.T instance of your test method. An error will be written to it if an expectation is violated. The config argument can be set to nil.
func (*MockAutoConsumer) Close ¶ added in v1.0.0
func (c *MockAutoConsumer) Close() error
Close implements the Close method from the sarama.Consumer interface. It will close all registered PartitionConsumer instances.
func (*MockAutoConsumer) ConsumePartition ¶ added in v1.0.0
func (c *MockAutoConsumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)
ConsumePartition implements the ConsumePartition method from the sarama.Consumer interface. Before you can start consuming a partition, you have to set expectations on it using ExpectConsumePartition. You can only consume a partition once per consumer.
func (*MockAutoConsumer) ExpectConsumePartition ¶ added in v1.0.0
func (c *MockAutoConsumer) ExpectConsumePartition(topic string, partition int32, offset int64) *MockAutoPartitionConsumer
ExpectConsumePartition will register a topic/partition, so you can set expectations on it. The registered PartitionConsumer will be returned, so you can set expectations on it using method chaining. Once a topic/partition is registered, you are expected to start consuming it using ConsumePartition. If that doesn't happen, an error will be written to the error reporter once the mock consumer is closed. It will also expect that the
func (*MockAutoConsumer) HighWaterMarks ¶ added in v1.0.0
func (c *MockAutoConsumer) HighWaterMarks() map[string]map[int32]int64
HighWaterMarks returns a map of high watermarks for each topic/partition
func (*MockAutoConsumer) Partitions ¶ added in v1.0.0
func (c *MockAutoConsumer) Partitions(topic string) ([]int32, error)
Partitions returns the list of parititons for the given topic, as registered with SetTopicMetadata
func (*MockAutoConsumer) SetTopicMetadata ¶ added in v1.0.0
func (c *MockAutoConsumer) SetTopicMetadata(metadata map[string][]int32)
SetTopicMetadata sets the clusters topic/partition metadata, which will be returned by Topics() and Partitions().
func (*MockAutoConsumer) Topics ¶ added in v1.0.0
func (c *MockAutoConsumer) Topics() ([]string, error)
Topics returns a list of topics, as registered with SetTopicMetadata
type MockAutoPartitionConsumer ¶ added in v1.0.0
type MockAutoPartitionConsumer struct {
// contains filtered or unexported fields
}
MockAutoPartitionConsumer implements sarama's PartitionConsumer interface for testing purposes. It is returned by the mock Consumers ConsumePartitionMethod, but only if it is registered first using the Consumer's ExpectConsumePartition method. Before consuming the Errors and Messages channel, you should specify what values will be provided on these channels using YieldMessage and YieldError.
func (*MockAutoPartitionConsumer) AsyncClose ¶ added in v1.0.0
func (pc *MockAutoPartitionConsumer) AsyncClose()
AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
func (*MockAutoPartitionConsumer) Close ¶ added in v1.0.0
func (pc *MockAutoPartitionConsumer) Close() error
Close implements the Close method from the sarama.PartitionConsumer interface. It will verify whether the partition consumer was actually started.
func (*MockAutoPartitionConsumer) Errors ¶ added in v1.0.0
func (pc *MockAutoPartitionConsumer) Errors() <-chan *sarama.ConsumerError
Errors implements the Errors method from the sarama.PartitionConsumer interface.
func (*MockAutoPartitionConsumer) ExpectErrorsDrainedOnClose ¶ added in v1.0.0
func (pc *MockAutoPartitionConsumer) ExpectErrorsDrainedOnClose()
ExpectErrorsDrainedOnClose sets an expectation on the partition consumer that the errors channel will be fully drained when Close is called. If this expectation is not met, an error is reported to the error reporter.
func (*MockAutoPartitionConsumer) ExpectMessagesDrainedOnClose ¶ added in v1.0.0
func (pc *MockAutoPartitionConsumer) ExpectMessagesDrainedOnClose()
ExpectMessagesDrainedOnClose sets an expectation on the partition consumer that the messages channel will be fully drained when Close is called. If this expectation is not met, an error is reported to the error reporter.
func (*MockAutoPartitionConsumer) HighWaterMarkOffset ¶ added in v1.0.0
func (pc *MockAutoPartitionConsumer) HighWaterMarkOffset() int64
HighWaterMarkOffset returns the highwatermark for the partition
func (*MockAutoPartitionConsumer) Messages ¶ added in v1.0.0
func (pc *MockAutoPartitionConsumer) Messages() <-chan *sarama.ConsumerMessage
Messages implements the Messages method from the sarama.PartitionConsumer interface.
func (*MockAutoPartitionConsumer) YieldError ¶ added in v1.0.0
func (pc *MockAutoPartitionConsumer) YieldError(err error)
YieldError will yield an error on the Errors channel of this partition consumer when it is consumed. By default, the mock consumer will not verify whether this error was consumed from the Errors channel, because there are legitimate reasons for this not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that the channel is empty on close.
func (*MockAutoPartitionConsumer) YieldMessage ¶ added in v1.0.0
func (pc *MockAutoPartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage)
YieldMessage will yield a messages Messages channel of this partition consumer when it is consumed. By default, the mock consumer will not verify whether this message was consumed from the Messages channel, because there are legitimate reasons forthis not to happen. ou can call ExpectMessagesDrainedOnClose so it will verify that the channel is empty on close.
type MockBroker ¶ added in v1.0.0
type MockBroker struct {
// contains filtered or unexported fields
}
MockBroker is a mock of Broker interface
func NewMockBroker ¶ added in v1.0.0
func NewMockBroker(ctrl *gomock.Controller) *MockBroker
NewMockBroker creates a new mock instance
func (*MockBroker) Connected ¶ added in v1.0.0
func (m *MockBroker) Connected() (bool, error)
Connected mocks base method
func (*MockBroker) CreateTopics ¶ added in v1.0.0
func (m *MockBroker) CreateTopics(arg0 *sarama.CreateTopicsRequest) (*sarama.CreateTopicsResponse, error)
CreateTopics mocks base method
func (*MockBroker) EXPECT ¶ added in v1.0.0
func (m *MockBroker) EXPECT() *MockBrokerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
type MockBrokerMockRecorder ¶ added in v1.0.0
type MockBrokerMockRecorder struct {
// contains filtered or unexported fields
}
MockBrokerMockRecorder is the mock recorder for MockBroker
func (*MockBrokerMockRecorder) Addr ¶ added in v1.0.0
func (mr *MockBrokerMockRecorder) Addr() *gomock.Call
Addr indicates an expected call of Addr
func (*MockBrokerMockRecorder) Connected ¶ added in v1.0.0
func (mr *MockBrokerMockRecorder) Connected() *gomock.Call
Connected indicates an expected call of Connected
func (*MockBrokerMockRecorder) CreateTopics ¶ added in v1.0.0
func (mr *MockBrokerMockRecorder) CreateTopics(arg0 interface{}) *gomock.Call
CreateTopics indicates an expected call of CreateTopics
func (*MockBrokerMockRecorder) Open ¶ added in v1.0.0
func (mr *MockBrokerMockRecorder) Open(arg0 interface{}) *gomock.Call
Open indicates an expected call of Open
type MockClient ¶ added in v1.0.0
type MockClient struct {
// contains filtered or unexported fields
}
MockClient is a mock of Client interface
func NewMockClient ¶ added in v1.0.0
func NewMockClient(ctrl *gomock.Controller) *MockClient
NewMockClient creates a new mock instance
func (*MockClient) Brokers ¶ added in v1.0.0
func (m *MockClient) Brokers() []*sarama.Broker
Brokers mocks base method
func (*MockClient) Close ¶ added in v1.0.0
func (m *MockClient) Close() error
Close mocks base method
func (*MockClient) Closed ¶ added in v1.0.0
func (m *MockClient) Closed() bool
Closed mocks base method
func (*MockClient) Config ¶ added in v1.0.0
func (m *MockClient) Config() *sarama.Config
Config mocks base method
func (*MockClient) Controller ¶ added in v1.0.0
func (m *MockClient) Controller() (*sarama.Broker, error)
Controller mocks base method
func (*MockClient) Coordinator ¶ added in v1.0.0
func (m *MockClient) Coordinator(arg0 string) (*sarama.Broker, error)
Coordinator mocks base method
func (*MockClient) EXPECT ¶ added in v1.0.0
func (m *MockClient) EXPECT() *MockClientMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockClient) InSyncReplicas ¶ added in v1.0.0
func (m *MockClient) InSyncReplicas(arg0 string, arg1 int32) ([]int32, error)
InSyncReplicas mocks base method
func (*MockClient) InitProducerID ¶ added in v1.0.0
func (m *MockClient) InitProducerID() (*sarama.InitProducerIDResponse, error)
InitProducerID mocks base method
func (*MockClient) OfflineReplicas ¶ added in v1.0.0
func (m *MockClient) OfflineReplicas(arg0 string, arg1 int32) ([]int32, error)
OfflineReplicas mocks base method
func (*MockClient) Partitions ¶ added in v1.0.0
func (m *MockClient) Partitions(arg0 string) ([]int32, error)
Partitions mocks base method
func (*MockClient) RefreshController ¶ added in v1.0.0
func (m *MockClient) RefreshController() (*sarama.Broker, error)
RefreshController mocks base method
func (*MockClient) RefreshCoordinator ¶ added in v1.0.0
func (m *MockClient) RefreshCoordinator(arg0 string) error
RefreshCoordinator mocks base method
func (*MockClient) RefreshMetadata ¶ added in v1.0.0
func (m *MockClient) RefreshMetadata(arg0 ...string) error
RefreshMetadata mocks base method
func (*MockClient) Replicas ¶ added in v1.0.0
func (m *MockClient) Replicas(arg0 string, arg1 int32) ([]int32, error)
Replicas mocks base method
func (*MockClient) Topics ¶ added in v1.0.0
func (m *MockClient) Topics() ([]string, error)
Topics mocks base method
func (*MockClient) WritablePartitions ¶ added in v1.0.0
func (m *MockClient) WritablePartitions(arg0 string) ([]int32, error)
WritablePartitions mocks base method
type MockClientMockRecorder ¶ added in v1.0.0
type MockClientMockRecorder struct {
// contains filtered or unexported fields
}
MockClientMockRecorder is the mock recorder for MockClient
func (*MockClientMockRecorder) Brokers ¶ added in v1.0.0
func (mr *MockClientMockRecorder) Brokers() *gomock.Call
Brokers indicates an expected call of Brokers
func (*MockClientMockRecorder) Close ¶ added in v1.0.0
func (mr *MockClientMockRecorder) Close() *gomock.Call
Close indicates an expected call of Close
func (*MockClientMockRecorder) Closed ¶ added in v1.0.0
func (mr *MockClientMockRecorder) Closed() *gomock.Call
Closed indicates an expected call of Closed
func (*MockClientMockRecorder) Config ¶ added in v1.0.0
func (mr *MockClientMockRecorder) Config() *gomock.Call
Config indicates an expected call of Config
func (*MockClientMockRecorder) Controller ¶ added in v1.0.0
func (mr *MockClientMockRecorder) Controller() *gomock.Call
Controller indicates an expected call of Controller
func (*MockClientMockRecorder) Coordinator ¶ added in v1.0.0
func (mr *MockClientMockRecorder) Coordinator(arg0 interface{}) *gomock.Call
Coordinator indicates an expected call of Coordinator
func (*MockClientMockRecorder) GetOffset ¶ added in v1.0.0
func (mr *MockClientMockRecorder) GetOffset(arg0, arg1, arg2 interface{}) *gomock.Call
GetOffset indicates an expected call of GetOffset
func (*MockClientMockRecorder) InSyncReplicas ¶ added in v1.0.0
func (mr *MockClientMockRecorder) InSyncReplicas(arg0, arg1 interface{}) *gomock.Call
InSyncReplicas indicates an expected call of InSyncReplicas
func (*MockClientMockRecorder) InitProducerID ¶ added in v1.0.0
func (mr *MockClientMockRecorder) InitProducerID() *gomock.Call
InitProducerID indicates an expected call of InitProducerID
func (*MockClientMockRecorder) Leader ¶ added in v1.0.0
func (mr *MockClientMockRecorder) Leader(arg0, arg1 interface{}) *gomock.Call
Leader indicates an expected call of Leader
func (*MockClientMockRecorder) OfflineReplicas ¶ added in v1.0.0
func (mr *MockClientMockRecorder) OfflineReplicas(arg0, arg1 interface{}) *gomock.Call
OfflineReplicas indicates an expected call of OfflineReplicas
func (*MockClientMockRecorder) Partitions ¶ added in v1.0.0
func (mr *MockClientMockRecorder) Partitions(arg0 interface{}) *gomock.Call
Partitions indicates an expected call of Partitions
func (*MockClientMockRecorder) RefreshController ¶ added in v1.0.0
func (mr *MockClientMockRecorder) RefreshController() *gomock.Call
RefreshController indicates an expected call of RefreshController
func (*MockClientMockRecorder) RefreshCoordinator ¶ added in v1.0.0
func (mr *MockClientMockRecorder) RefreshCoordinator(arg0 interface{}) *gomock.Call
RefreshCoordinator indicates an expected call of RefreshCoordinator
func (*MockClientMockRecorder) RefreshMetadata ¶ added in v1.0.0
func (mr *MockClientMockRecorder) RefreshMetadata(arg0 ...interface{}) *gomock.Call
RefreshMetadata indicates an expected call of RefreshMetadata
func (*MockClientMockRecorder) Replicas ¶ added in v1.0.0
func (mr *MockClientMockRecorder) Replicas(arg0, arg1 interface{}) *gomock.Call
Replicas indicates an expected call of Replicas
func (*MockClientMockRecorder) Topics ¶ added in v1.0.0
func (mr *MockClientMockRecorder) Topics() *gomock.Call
Topics indicates an expected call of Topics
func (*MockClientMockRecorder) WritablePartitions ¶ added in v1.0.0
func (mr *MockClientMockRecorder) WritablePartitions(arg0 interface{}) *gomock.Call
WritablePartitions indicates an expected call of WritablePartitions
type MockConsumerGroup ¶ added in v1.0.0
type MockConsumerGroup struct {
// contains filtered or unexported fields
}
MockConsumerGroup mocks the consumergroup
func NewMockConsumerGroup ¶ added in v1.0.0
func NewMockConsumerGroup(t *testing.T) *MockConsumerGroup
NewMockConsumerGroup creates a new consumer group
func (*MockConsumerGroup) Close ¶ added in v1.0.0
func (cg *MockConsumerGroup) Close() error
Close closes the consumergroup
func (*MockConsumerGroup) Consume ¶ added in v1.0.0
func (cg *MockConsumerGroup) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error
Consume starts consuming from the consumergroup
func (*MockConsumerGroup) Errors ¶ added in v1.0.0
func (cg *MockConsumerGroup) Errors() <-chan error
Errors returns the errors channel
func (*MockConsumerGroup) FailOnConsume ¶ added in v1.0.0
func (cg *MockConsumerGroup) FailOnConsume(err error)
FailOnConsume marks the consumer to fail on consume
func (*MockConsumerGroup) SendError ¶ added in v1.0.0
func (cg *MockConsumerGroup) SendError(err error)
SendError sends an error the consumergroup
func (*MockConsumerGroup) SendMessage ¶ added in v1.0.0
func (cg *MockConsumerGroup) SendMessage(message *sarama.ConsumerMessage) <-chan struct{}
SendMessage sends a message to the consumergroup returns a channel that will be closed when the message has been committed by the group
func (*MockConsumerGroup) SendMessageWait ¶ added in v1.0.0
func (cg *MockConsumerGroup) SendMessageWait(message *sarama.ConsumerMessage)
SendMessageWait sends a message to the consumergroup waiting for the message for being committed
type MockConsumerGroupClaim ¶ added in v1.0.0
type MockConsumerGroupClaim struct {
// contains filtered or unexported fields
}
MockConsumerGroupClaim mocks the consumergroupclaim
func NewMockConsumerGroupClaim ¶ added in v1.0.0
func NewMockConsumerGroupClaim(topic string, partition int32) *MockConsumerGroupClaim
NewMockConsumerGroupClaim creates a new mocksconsumergroupclaim
func (*MockConsumerGroupClaim) HighWaterMarkOffset ¶ added in v1.0.0
func (cgc *MockConsumerGroupClaim) HighWaterMarkOffset() int64
HighWaterMarkOffset returns the hwm offset
func (*MockConsumerGroupClaim) InitialOffset ¶ added in v1.0.0
func (cgc *MockConsumerGroupClaim) InitialOffset() int64
InitialOffset returns the initial offset
func (*MockConsumerGroupClaim) Messages ¶ added in v1.0.0
func (cgc *MockConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage
Messages returns the message channel that must be
func (*MockConsumerGroupClaim) Partition ¶ added in v1.0.0
func (cgc *MockConsumerGroupClaim) Partition() int32
Partition returns the partition
func (*MockConsumerGroupClaim) Topic ¶ added in v1.0.0
func (cgc *MockConsumerGroupClaim) Topic() string
Topic returns the current topic of the claim
type MockConsumerGroupSession ¶ added in v1.0.0
type MockConsumerGroupSession struct {
// contains filtered or unexported fields
}
MockConsumerGroupSession mocks the consumer group session used for testing
func (*MockConsumerGroupSession) Claims ¶ added in v1.0.0
func (cgs *MockConsumerGroupSession) Claims() map[string][]int32
Claims returns the number of partitions assigned in the group session for each topic
func (*MockConsumerGroupSession) Commit ¶ added in v1.0.1
func (cgs *MockConsumerGroupSession) Commit()
Commit the offset to the backend
func (*MockConsumerGroupSession) Context ¶ added in v1.0.0
func (cgs *MockConsumerGroupSession) Context() context.Context
Context returns the consumer group's context
func (*MockConsumerGroupSession) GenerationID ¶ added in v1.0.0
func (cgs *MockConsumerGroupSession) GenerationID() int32
GenerationID returns the generation ID of the group consumer
func (*MockConsumerGroupSession) MarkMessage ¶ added in v1.0.0
func (cgs *MockConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string)
MarkMessage marks the passed message as consumed
func (*MockConsumerGroupSession) MarkOffset ¶ added in v1.0.0
func (cgs *MockConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string)
MarkOffset marks the passed offset consumed in topic/partition
func (*MockConsumerGroupSession) MemberID ¶ added in v1.0.0
func (cgs *MockConsumerGroupSession) MemberID() string
MemberID returns the member ID TOOD: clarify what that actually means and whether we need to mock taht somehow
func (*MockConsumerGroupSession) ResetOffset ¶ added in v1.0.0
func (cgs *MockConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string)
ResetOffset resets the offset to be consumed from
func (*MockConsumerGroupSession) SendMessage ¶ added in v1.0.0
func (cgs *MockConsumerGroupSession) SendMessage(msg *sarama.ConsumerMessage)
SendMessage sends a message to the consumer
type MockProducer ¶ added in v1.0.0
type MockProducer struct {
// contains filtered or unexported fields
}
MockProducer is a mock of Producer interface
func NewMockProducer ¶ added in v1.0.0
func NewMockProducer(ctrl *gomock.Controller) *MockProducer
NewMockProducer creates a new mock instance
func (*MockProducer) Close ¶ added in v1.0.0
func (m *MockProducer) Close() error
Close mocks base method
func (*MockProducer) EXPECT ¶ added in v1.0.0
func (m *MockProducer) EXPECT() *MockProducerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockProducer) Emit ¶ added in v1.0.0
func (m *MockProducer) Emit(arg0, arg1 string, arg2 []byte) *Promise
Emit mocks base method
func (*MockProducer) EmitWithHeaders ¶ added in v1.0.1
func (m *MockProducer) EmitWithHeaders(arg0, arg1 string, arg2 []byte, arg3 map[string][]byte) *Promise
EmitWithHeaders mocks base method
type MockProducerMockRecorder ¶ added in v1.0.0
type MockProducerMockRecorder struct {
// contains filtered or unexported fields
}
MockProducerMockRecorder is the mock recorder for MockProducer
func (*MockProducerMockRecorder) Close ¶ added in v1.0.0
func (mr *MockProducerMockRecorder) Close() *gomock.Call
Close indicates an expected call of Close
func (*MockProducerMockRecorder) Emit ¶ added in v1.0.0
func (mr *MockProducerMockRecorder) Emit(arg0, arg1, arg2 interface{}) *gomock.Call
Emit indicates an expected call of Emit
func (*MockProducerMockRecorder) EmitWithHeaders ¶ added in v1.0.1
func (mr *MockProducerMockRecorder) EmitWithHeaders(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
EmitWithHeaders indicates an expected call of EmitWithHeaders
type MockStorage ¶ added in v1.0.0
type MockStorage struct {
// contains filtered or unexported fields
}
MockStorage is a mock of Storage interface
func NewMockStorage ¶ added in v1.0.0
func NewMockStorage(ctrl *gomock.Controller) *MockStorage
NewMockStorage creates a new mock instance
func (*MockStorage) Close ¶ added in v1.0.0
func (m *MockStorage) Close() error
Close mocks base method
func (*MockStorage) Delete ¶ added in v1.0.0
func (m *MockStorage) Delete(arg0 string) error
Delete mocks base method
func (*MockStorage) EXPECT ¶ added in v1.0.0
func (m *MockStorage) EXPECT() *MockStorageMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockStorage) Get ¶ added in v1.0.0
func (m *MockStorage) Get(arg0 string) ([]byte, error)
Get mocks base method
func (*MockStorage) GetOffset ¶ added in v1.0.0
func (m *MockStorage) GetOffset(arg0 int64) (int64, error)
GetOffset mocks base method
func (*MockStorage) Has ¶ added in v1.0.0
func (m *MockStorage) Has(arg0 string) (bool, error)
Has mocks base method
func (*MockStorage) Iterator ¶ added in v1.0.0
func (m *MockStorage) Iterator() (storage.Iterator, error)
Iterator mocks base method
func (*MockStorage) IteratorWithRange ¶ added in v1.0.0
func (m *MockStorage) IteratorWithRange(arg0, arg1 []byte) (storage.Iterator, error)
IteratorWithRange mocks base method
func (*MockStorage) MarkRecovered ¶ added in v1.0.0
func (m *MockStorage) MarkRecovered() error
MarkRecovered mocks base method
func (*MockStorage) Open ¶ added in v1.0.0
func (m *MockStorage) Open() error
Open mocks base method
func (*MockStorage) Recovered ¶ added in v1.0.0
func (m *MockStorage) Recovered() bool
Recovered mocks base method
func (*MockStorage) Set ¶ added in v1.0.0
func (m *MockStorage) Set(arg0 string, arg1 []byte) error
Set mocks base method
func (*MockStorage) SetOffset ¶ added in v1.0.0
func (m *MockStorage) SetOffset(arg0 int64) error
SetOffset mocks base method
type MockStorageMockRecorder ¶ added in v1.0.0
type MockStorageMockRecorder struct {
// contains filtered or unexported fields
}
MockStorageMockRecorder is the mock recorder for MockStorage
func (*MockStorageMockRecorder) Close ¶ added in v1.0.0
func (mr *MockStorageMockRecorder) Close() *gomock.Call
Close indicates an expected call of Close
func (*MockStorageMockRecorder) Delete ¶ added in v1.0.0
func (mr *MockStorageMockRecorder) Delete(arg0 interface{}) *gomock.Call
Delete indicates an expected call of Delete
func (*MockStorageMockRecorder) Get ¶ added in v1.0.0
func (mr *MockStorageMockRecorder) Get(arg0 interface{}) *gomock.Call
Get indicates an expected call of Get
func (*MockStorageMockRecorder) GetOffset ¶ added in v1.0.0
func (mr *MockStorageMockRecorder) GetOffset(arg0 interface{}) *gomock.Call
GetOffset indicates an expected call of GetOffset
func (*MockStorageMockRecorder) Has ¶ added in v1.0.0
func (mr *MockStorageMockRecorder) Has(arg0 interface{}) *gomock.Call
Has indicates an expected call of Has
func (*MockStorageMockRecorder) Iterator ¶ added in v1.0.0
func (mr *MockStorageMockRecorder) Iterator() *gomock.Call
Iterator indicates an expected call of Iterator
func (*MockStorageMockRecorder) IteratorWithRange ¶ added in v1.0.0
func (mr *MockStorageMockRecorder) IteratorWithRange(arg0, arg1 interface{}) *gomock.Call
IteratorWithRange indicates an expected call of IteratorWithRange
func (*MockStorageMockRecorder) MarkRecovered ¶ added in v1.0.0
func (mr *MockStorageMockRecorder) MarkRecovered() *gomock.Call
MarkRecovered indicates an expected call of MarkRecovered
func (*MockStorageMockRecorder) Open ¶ added in v1.0.0
func (mr *MockStorageMockRecorder) Open() *gomock.Call
Open indicates an expected call of Open
func (*MockStorageMockRecorder) Recovered ¶ added in v1.0.0
func (mr *MockStorageMockRecorder) Recovered() *gomock.Call
Recovered indicates an expected call of Recovered
func (*MockStorageMockRecorder) Set ¶ added in v1.0.0
func (mr *MockStorageMockRecorder) Set(arg0, arg1 interface{}) *gomock.Call
Set indicates an expected call of Set
func (*MockStorageMockRecorder) SetOffset ¶ added in v1.0.0
func (mr *MockStorageMockRecorder) SetOffset(arg0 interface{}) *gomock.Call
SetOffset indicates an expected call of SetOffset
type MockTopicManager ¶ added in v1.0.0
type MockTopicManager struct {
// contains filtered or unexported fields
}
MockTopicManager is a mock of TopicManager interface
func NewMockTopicManager ¶ added in v1.0.0
func NewMockTopicManager(ctrl *gomock.Controller) *MockTopicManager
NewMockTopicManager creates a new mock instance
func (*MockTopicManager) Close ¶ added in v1.0.0
func (m *MockTopicManager) Close() error
Close mocks base method
func (*MockTopicManager) EXPECT ¶ added in v1.0.0
func (m *MockTopicManager) EXPECT() *MockTopicManagerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockTopicManager) EnsureStreamExists ¶ added in v1.0.0
func (m *MockTopicManager) EnsureStreamExists(arg0 string, arg1 int) error
EnsureStreamExists mocks base method
func (*MockTopicManager) EnsureTableExists ¶ added in v1.0.0
func (m *MockTopicManager) EnsureTableExists(arg0 string, arg1 int) error
EnsureTableExists mocks base method
func (*MockTopicManager) EnsureTopicExists ¶ added in v1.0.0
func (m *MockTopicManager) EnsureTopicExists(arg0 string, arg1, arg2 int, arg3 map[string]string) error
EnsureTopicExists mocks base method
func (*MockTopicManager) Partitions ¶ added in v1.0.0
func (m *MockTopicManager) Partitions(arg0 string) ([]int32, error)
Partitions mocks base method
type MockTopicManagerMockRecorder ¶ added in v1.0.0
type MockTopicManagerMockRecorder struct {
// contains filtered or unexported fields
}
MockTopicManagerMockRecorder is the mock recorder for MockTopicManager
func (*MockTopicManagerMockRecorder) Close ¶ added in v1.0.0
func (mr *MockTopicManagerMockRecorder) Close() *gomock.Call
Close indicates an expected call of Close
func (*MockTopicManagerMockRecorder) EnsureStreamExists ¶ added in v1.0.0
func (mr *MockTopicManagerMockRecorder) EnsureStreamExists(arg0, arg1 interface{}) *gomock.Call
EnsureStreamExists indicates an expected call of EnsureStreamExists
func (*MockTopicManagerMockRecorder) EnsureTableExists ¶ added in v1.0.0
func (mr *MockTopicManagerMockRecorder) EnsureTableExists(arg0, arg1 interface{}) *gomock.Call
EnsureTableExists indicates an expected call of EnsureTableExists
func (*MockTopicManagerMockRecorder) EnsureTopicExists ¶ added in v1.0.0
func (mr *MockTopicManagerMockRecorder) EnsureTopicExists(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
EnsureTopicExists indicates an expected call of EnsureTopicExists
func (*MockTopicManagerMockRecorder) GetOffset ¶ added in v1.0.0
func (mr *MockTopicManagerMockRecorder) GetOffset(arg0, arg1, arg2 interface{}) *gomock.Call
GetOffset indicates an expected call of GetOffset
func (*MockTopicManagerMockRecorder) Partitions ¶ added in v1.0.0
func (mr *MockTopicManagerMockRecorder) Partitions(arg0 interface{}) *gomock.Call
Partitions indicates an expected call of Partitions
type NilHandling ¶
type NilHandling int
NilHandling defines how nil messages should be handled by the processor.
const ( // NilIgnore drops any message with nil value. NilIgnore NilHandling = 0 + iota // NilProcess passes the nil value to ProcessCallback. NilProcess // NilDecode passes the nil value to decoder before calling ProcessCallback. NilDecode )
type OutputStats ¶
OutputStats represents the number of messages and the number of bytes emitted into a stream or table since the process started.
type PartitionProcStats ¶ added in v1.0.0
type PartitionProcStats struct { Now time.Time TableStats *TableStats Joined map[string]*TableStats Input map[string]*InputStats Output map[string]*OutputStats }
PartitionProcStats represents metrics and measurements of a partition processor
type PartitionProcessor ¶ added in v1.0.0
type PartitionProcessor struct {
// contains filtered or unexported fields
}
PartitionProcessor handles message processing of one partition by serializing messages from different input topics. It also handles joined tables as well as lookup views (managed by `Processor`).
func (*PartitionProcessor) EnqueueMessage ¶ added in v1.0.0
func (pp *PartitionProcessor) EnqueueMessage(msg *sarama.ConsumerMessage)
EnqueueMessage enqueues a message in the partition processor's event channel for processing
func (*PartitionProcessor) Errors ¶ added in v1.0.0
func (pp *PartitionProcessor) Errors() <-chan error
Errors returns a channel or errors during consumption
func (*PartitionProcessor) Recovered ¶ added in v1.0.0
func (pp *PartitionProcessor) Recovered() bool
Recovered returns whether the processor is running (i.e. all joins, lookups and the table is recovered and it's consuming messages)
func (*PartitionProcessor) Setup ¶ added in v1.0.0
func (pp *PartitionProcessor) Setup(ctx context.Context) error
Setup initializes the processor after a rebalance
func (*PartitionProcessor) Stop ¶ added in v1.0.0
func (pp *PartitionProcessor) Stop() error
Stop stops the partition processor
type PartitionStatus ¶
type PartitionStatus int
PartitionStatus is the status of the partition of a table (group table or joined table).
const ( // PartitionStopped indicates the partition stopped and should not be used anymore. PartitionStopped PartitionStatus = iota // PartitionInitializing indicates that the underlying storage is initializing (e.g. opening leveldb files), // and has not actually started working yet. PartitionInitializing // PartitionConnecting indicates the partition trying to (re-)connect to Kafka PartitionConnecting // PartitionRecovering indicates the partition is recovering and the storage // is writing updates in bulk-mode (if the storage implementation supports it). PartitionRecovering // PartitionPreparing indicates the end of the bulk-mode. Depending on the storage // implementation, the Preparing phase may take long because the storage compacts its logs. PartitionPreparing // PartitionRunning indicates the partition is recovered and processing updates // in normal operation. PartitionRunning )
type PartitionTable ¶ added in v1.0.0
type PartitionTable struct {
// contains filtered or unexported fields
}
PartitionTable manages the usage of a table for one partition. It allows to setup and recover/catchup the table contents from kafka, allow updates via Get/Set/Delete accessors
func (*PartitionTable) CatchupForever ¶ added in v1.0.0
func (p *PartitionTable) CatchupForever(ctx context.Context, restartOnError bool) error
CatchupForever starts catching the partition table forever (until the context is cancelled). Option restartOnError allows the view to stay open/intact even in case of consumer errors
func (*PartitionTable) Close ¶ added in v1.0.0
func (p *PartitionTable) Close() error
Close closes the partition table
func (*PartitionTable) CurrentState ¶ added in v1.0.0
func (p *PartitionTable) CurrentState() PartitionStatus
CurrentState returns the partition's current status
func (*PartitionTable) Delete ¶ added in v1.0.0
func (p *PartitionTable) Delete(key string) error
Delete removes the passed key from the partition table by deleting from the underlying storage
func (*PartitionTable) Get ¶ added in v1.0.0
func (p *PartitionTable) Get(key string) ([]byte, error)
Get returns the value for passed key
func (*PartitionTable) GetOffset ¶ added in v1.0.0
func (p *PartitionTable) GetOffset(defValue int64) (int64, error)
GetOffset returns the magic offset value from storage
func (*PartitionTable) Has ¶ added in v1.0.0
func (p *PartitionTable) Has(key string) (bool, error)
Has returns whether the storage contains passed key
func (*PartitionTable) IsRecovered ¶ added in v1.0.0
func (p *PartitionTable) IsRecovered() bool
IsRecovered returns whether the partition table is recovered
func (*PartitionTable) RunStatsLoop ¶ added in v1.0.0
func (p *PartitionTable) RunStatsLoop(ctx context.Context)
RunStatsLoop starts the handler for stats requests. This loop runs detached from the recover/catchup mechanism so clients can always request stats even if the partition table is not running (like a processor table after it's recovered).
func (*PartitionTable) Set ¶ added in v1.0.0
func (p *PartitionTable) Set(key string, value []byte) error
Set sets a key value key in the partition table by modifying the underlying storage
func (*PartitionTable) SetOffset ¶ added in v1.0.0
func (p *PartitionTable) SetOffset(value int64) error
SetOffset sets the magic offset value in storage
func (*PartitionTable) SetupAndRecover ¶ added in v1.0.0
func (p *PartitionTable) SetupAndRecover(ctx context.Context, restartOnError bool) error
SetupAndRecover sets up the partition storage and recovers to HWM
func (*PartitionTable) TrackMessageWrite ¶ added in v1.0.0
func (p *PartitionTable) TrackMessageWrite(ctx context.Context, length int)
TrackMessageWrite updates the write stats to passed length
func (*PartitionTable) WaitRecovered ¶ added in v1.0.0
func (p *PartitionTable) WaitRecovered() chan struct{}
WaitRecovered returns a channel that closes when the partition table enters state `PartitionRunning`
type ProcessCallback ¶
type ProcessCallback func(ctx Context, msg interface{})
ProcessCallback function is called for every message received by the processor.
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor is a set of stateful callback functions that, on the arrival of messages, modify the content of a table (the group table) and emit messages into other topics. Messages as well as rows in the group table are key-value pairs. A group is composed by multiple processor instances.
func NewProcessor ¶
func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption) (*Processor, error)
NewProcessor creates a processor instance in a group given the address of Kafka brokers, the consumer group name, a list of subscriptions (topics, codecs, and callbacks), and series of options.
func (*Processor) Cleanup ¶ added in v1.0.0
func (g *Processor) Cleanup(session sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited but before the offsets are committed for the very last time.
func (*Processor) ConsumeClaim ¶ added in v1.0.0
func (g *Processor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.
func (*Processor) Get ¶
Get returns a read-only copy of a value from the group table if the respective partition is owned by the processor instace. Get can be called by multiple goroutines concurrently. Get can be only used with stateful processors (ie, when group table is enabled) and after Recovered returns true.
func (*Processor) Graph ¶
func (g *Processor) Graph() *GroupGraph
Graph returns the group graph of the processor.
func (*Processor) Recovered ¶
Recovered returns whether the processor is running, i.e. if the processor has recovered all lookups/joins/tables and is running
func (*Processor) Run ¶
Run starts the processor using passed context. The processor stops in case of errors or if the context is cancelled
func (*Processor) Setup ¶ added in v1.0.0
func (g *Processor) Setup(session sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim.
func (*Processor) Stats ¶
func (g *Processor) Stats() *ProcessorStats
Stats returns the aggregated stats for the processor including all partitions, tables, lookups and joins
func (*Processor) StatsWithContext ¶ added in v1.0.0
func (g *Processor) StatsWithContext(ctx context.Context) *ProcessorStats
StatsWithContext returns stats for the processor, see #Processor.Stats()
func (*Processor) Stop ¶ added in v1.0.0
func (g *Processor) Stop()
Stop stops the processor. This is semantically equivalent of closing the Context that was passed to Processor.Run(..). This method will return immediately, errors during running will be returned from teh Processor.Run(..)
func (*Processor) WaitForReady ¶ added in v1.0.0
func (g *Processor) WaitForReady()
WaitForReady waits until the processor is ready to consume messages (or is actually consuming messages) i.e., it is done catching up all partition tables, joins and lookup tables
type ProcessorOption ¶
type ProcessorOption func(*poptions, *GroupGraph)
ProcessorOption defines a configuration option to be used when creating a processor.
func WithBackoffBuilder ¶ added in v1.0.0
func WithBackoffBuilder(bb BackoffBuilder) ProcessorOption
WithBackoffBuilder replaced the default backoff.
func WithBackoffResetTimeout ¶ added in v1.0.0
func WithBackoffResetTimeout(duration time.Duration) ProcessorOption
WithBackoffResetTimeout defines the timeout when the backoff will be reset.
func WithClientID ¶
func WithClientID(clientID string) ProcessorOption
WithClientID defines the client ID used to identify with Kafka.
func WithConsumerGroupBuilder ¶ added in v1.0.0
func WithConsumerGroupBuilder(cgb ConsumerGroupBuilder) ProcessorOption
WithConsumerGroupBuilder replaces the default consumer group builder
func WithConsumerSaramaBuilder ¶ added in v1.0.0
func WithConsumerSaramaBuilder(cgb SaramaConsumerBuilder) ProcessorOption
WithConsumerSaramaBuilder replaces the default consumer group builder
func WithGroupGraphHook ¶ added in v0.1.2
func WithGroupGraphHook(hook func(gg *GroupGraph)) ProcessorOption
WithGroupGraphHook allows a function to obtain the group graph when a processor is started.
func WithHasher ¶
func WithHasher(hasher func() hash.Hash32) ProcessorOption
WithHasher sets the hash function that assigns keys to partitions.
func WithLogger ¶
func WithLogger(log logger.Logger) ProcessorOption
WithLogger sets the logger the processor should use. By default, processors use the standard library logger.
func WithNilHandling ¶
func WithNilHandling(nh NilHandling) ProcessorOption
WithNilHandling configures how the processor should handle messages with nil value. By default the processor ignores nil messages.
func WithPartitionChannelSize ¶
func WithPartitionChannelSize(size int) ProcessorOption
WithPartitionChannelSize replaces the default partition channel size. This is mostly used for testing by setting it to 0 to have synchronous behavior of goka.
func WithProducerBuilder ¶
func WithProducerBuilder(pb ProducerBuilder) ProcessorOption
WithProducerBuilder replaces the default producer builder.
func WithRebalanceCallback ¶ added in v0.1.3
func WithRebalanceCallback(cb RebalanceCallback) ProcessorOption
WithRebalanceCallback sets the callback for when a new partition assignment is received. By default, this is an empty function.
func WithStorageBuilder ¶
func WithStorageBuilder(sb storage.Builder) ProcessorOption
WithStorageBuilder defines a builder for the storage of each partition.
func WithTester ¶
func WithTester(t Tester) ProcessorOption
WithTester configures all external connections of a processor, ie, storage, consumer and producer
func WithTopicManagerBuilder ¶
func WithTopicManagerBuilder(tmb TopicManagerBuilder) ProcessorOption
WithTopicManagerBuilder replaces the default topic manager builder.
func WithUpdateCallback ¶
func WithUpdateCallback(cb UpdateCallback) ProcessorOption
WithUpdateCallback defines the callback called upon recovering a message from the log.
type ProcessorStats ¶
type ProcessorStats struct { Group map[int32]*PartitionProcStats Lookup map[string]*ViewStats }
ProcessorStats represents the metrics of all partitions of the processor, including its group, joined tables and lookup tables.
type Producer ¶ added in v1.0.0
type Producer interface { // Emit sends a message to topic. Emit(topic string, key string, value []byte) *Promise EmitWithHeaders(topic string, key string, value []byte, headers map[string][]byte) *Promise Close() error }
Producer abstracts the kafka producer
type ProducerBuilder ¶ added in v1.0.0
type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error)
ProducerBuilder create a Kafka producer.
func ProducerBuilderWithConfig ¶ added in v1.0.0
func ProducerBuilderWithConfig(config *sarama.Config) ProducerBuilder
ProducerBuilderWithConfig creates a Kafka consumer using the Sarama library.
type Promise ¶ added in v1.0.0
Promise as in https://en.wikipedia.org/wiki/Futures_and_promises
func (*Promise) Finish ¶ added in v1.0.0
func (p *Promise) Finish(msg *sarama.ProducerMessage, err error) *Promise
Finish finishes the promise by executing all callbacks and saving the message/error for late subscribers
func (*Promise) ThenWithMessage ¶ added in v1.0.0
func (p *Promise) ThenWithMessage(callback func(msg *sarama.ProducerMessage, err error)) *Promise
ThenWithMessage chains a callback to the Promise
type RebalanceCallback ¶ added in v0.1.3
type RebalanceCallback func(a Assignment)
RebalanceCallback is invoked when the processor receives a new partition assignment.
type RecoveryStats ¶ added in v1.0.0
type RecoveryStats struct { StartTime time.Time RecoveryTime time.Time Offset int64 // last offset processed or recovered Hwm int64 // next offset to be written }
RecoveryStats groups statistics during recovery
type SaramaConsumerBuilder ¶ added in v1.0.0
SaramaConsumerBuilder creates a `sarama.Consumer`
func SaramaConsumerBuilderWithConfig ¶ added in v1.0.0
func SaramaConsumerBuilderWithConfig(config *sarama.Config) SaramaConsumerBuilder
SaramaConsumerBuilderWithConfig creates a sarama consumer using passed config
type Signal ¶ added in v1.0.0
type Signal struct {
// contains filtered or unexported fields
}
Signal allows synchronization on a state, waiting for that state and checking the current state
func (*Signal) ObserveStateChange ¶ added in v1.0.0
func (s *Signal) ObserveStateChange() *StateChangeObserver
ObserveStateChange returns a channel that receives state changes. Note that the caller must take care of consuming that channel, otherwise the Signal will block upon state changes.
func (*Signal) SetState ¶ added in v1.0.0
SetState changes the state of the signal and notifies all goroutines waiting for the new state
func (*Signal) WaitForState ¶ added in v1.0.0
WaitForState returns a channel that closes when the signal reaches passed state.
func (*Signal) WaitForStateMin ¶ added in v1.0.0
WaitForStateMin returns a channel that will be closed, when the signal enters passed state or higher (states are ints, so we're just comparing ints here)
type State ¶ added in v1.0.0
type State int
State types a state of the Signal
const ( // PPStateIdle marks the partition processor as idling (not started yet) PPStateIdle State = iota // PPStateRecovering indicates a recovering partition processor PPStateRecovering // PPStateRunning indicates a running partition processor PPStateRunning // PPStateStopping indicates a stopped partition processor PPStateStopping )
const ( // ProcStateIdle indicates an idling partition processor (not started yet) ProcStateIdle State = iota // ProcStateStarting indicates a starting partition processor, i.e. before rebalance ProcStateStarting // ProcStateSetup indicates a partition processor during setup of a rebalance round ProcStateSetup // ProcStateRunning indicates a running partition processor ProcStateRunning // ProcStateStopping indicates a stopping partition processor ProcStateStopping )
type StateChangeObserver ¶ added in v1.0.0
type StateChangeObserver struct {
// contains filtered or unexported fields
}
StateChangeObserver wraps a channel that triggers when the signal's state changes
func (*StateChangeObserver) C ¶ added in v1.0.0
func (s *StateChangeObserver) C() <-chan State
C returns the channel to observer state changes
func (*StateChangeObserver) Stop ¶ added in v1.0.0
func (s *StateChangeObserver) Stop()
Stop stops the observer. Its update channel will be closed and
type Stream ¶
type Stream string
Stream is the name of an event stream topic in Kafka, ie, a topic with cleanup.policy=delete
type Streams ¶
type Streams []Stream
Streams is a slice of Stream names.
func StringsToStreams ¶ added in v1.0.0
StringsToStreams is a simple cast/conversion functions that allows to pass a slice of strings as a slice of Stream (Streams) Avoids the boilerplate loop over the string array that would be necessary otherwise.
Example ¶
inputTopics := []string{"input1", "input2", "input3", } // use it, e.g. in the Inputs-Edge in the group graph graph := DefineGroup("group", Inputs(StringsToStreams(inputTopics...), new(codec.String), func(ctx Context, msg interface{}) {}), ) _ = graph
Output:
type Table ¶
type Table string
Table is the name of a table topic in Kafka, ie, a topic with cleanup.policy=compact
func GroupTable ¶
GroupTable returns the name of the group table of group.
type TableStats ¶ added in v1.0.0
type TableStats struct { Stalled bool Status PartitionStatus Recovery *RecoveryStats Input *InputStats Writes *OutputStats }
TableStats represents stats for a table partition
type Tester ¶
type Tester interface { StorageBuilder() storage.Builder ProducerBuilder() ProducerBuilder ConsumerGroupBuilder() ConsumerGroupBuilder ConsumerBuilder() SaramaConsumerBuilder EmitterProducerBuilder() ProducerBuilder TopicManagerBuilder() TopicManagerBuilder RegisterGroupGraph(*GroupGraph) string RegisterEmitter(Stream, Codec) RegisterView(Table, Codec) string }
Tester interface to avoid import cycles when a processor needs to register to the tester.
type TopicManager ¶ added in v1.0.0
type TopicManager interface { // EnsureTableExists checks that a table (log-compacted topic) exists, or create one if possible EnsureTableExists(topic string, npar int) error // EnsureStreamExists checks that a stream topic exists, or create one if possible EnsureStreamExists(topic string, npar int) error // EnsureTopicExists checks that a topic exists, or create one if possible, // enforcing the given configuration EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error // Partitions returns the number of partitions of a topic, that are assigned to the running // instance, i.e. it doesn't represent all partitions of a topic. Partitions(topic string) ([]int32, error) GetOffset(topic string, partitionID int32, time int64) (int64, error) // Close closes the topic manager Close() error }
TopicManager provides an interface to create/check topics and their partitions
func DefaultTopicManagerBuilder ¶ added in v1.0.0
func DefaultTopicManagerBuilder(brokers []string) (TopicManager, error)
DefaultTopicManagerBuilder creates TopicManager using the Sarama library.
func NewTopicManager ¶ added in v1.0.0
func NewTopicManager(brokers []string, saramaConfig *sarama.Config, topicManagerConfig *TopicManagerConfig) (TopicManager, error)
NewTopicManager creates a new topic manager using the sarama library
type TopicManagerBuilder ¶ added in v1.0.0
type TopicManagerBuilder func(brokers []string) (TopicManager, error)
TopicManagerBuilder creates a TopicManager to check partition counts and create tables.
func TopicManagerBuilderWithConfig ¶ added in v1.0.0
func TopicManagerBuilderWithConfig(config *sarama.Config, tmConfig *TopicManagerConfig) TopicManagerBuilder
TopicManagerBuilderWithConfig creates TopicManager using the Sarama library.
func TopicManagerBuilderWithTopicManagerConfig ¶ added in v1.0.0
func TopicManagerBuilderWithTopicManagerConfig(tmConfig *TopicManagerConfig) TopicManagerBuilder
TopicManagerBuilderWithTopicManagerConfig creates TopicManager using the Sarama library.
type TopicManagerConfig ¶ added in v1.0.0
type TopicManagerConfig struct { Table struct { Replication int } Stream struct { Replication int Retention time.Duration } }
TopicManagerConfig contains the configuration to access the Zookeeper servers as well as the desired options of to create tables and stream topics.
func NewTopicManagerConfig ¶ added in v1.0.0
func NewTopicManagerConfig() *TopicManagerConfig
NewTopicManagerConfig provides a default configuration for auto-creation with replication factor of 1 and rentention time of 1 hour.
type UpdateCallback ¶
UpdateCallback is invoked upon arrival of a message for a table partition. The partition storage shall be updated in the callback.
type View ¶
type View struct {
// contains filtered or unexported fields
}
View is a materialized (i.e. persistent) cache of a group table.
Example ¶
This example shows how views are typically created and used in the most basic way.
// create a new view view, err := NewView([]string{"localhost:9092"}, "input-topic", new(codec.String)) if err != nil { log.Fatalf("error creating view: %v", err) } // provide a cancelable ctx, cancel := context.WithCancel(context.Background()) defer cancel() // start the view done := make(chan struct{}) go func() { defer close(done) err := view.Run(ctx) if err != nil { log.Fatalf("Error running view: %v", err) } }() // wait for the view to be recovered // Option A: by polling for !view.Recovered() { select { case <-ctx.Done(): return case <-time.After(time.Second): } } // Option B: by waiting for the signal <-view.WaitRunning() // retrieve a value from the view val, err := view.Get("some-key") if err != nil { log.Fatalf("Error getting item from view: %v", err) } if val != nil { // cast it to string // no need for type assertion, if it was not that type, the codec would've failed log.Printf("got value %s", val.(string)) } has, err := view.Has("some-key") if err != nil { log.Fatalf("Error getting item from view: %v", err) } _ = has // stop the view and wait for it to shut down before returning cancel() <-done
Output:
Example (Autoreconnect) ¶
// create a new view view, err := NewView([]string{"localhost:9092"}, "input-topic", new(codec.String), // Automatically reconnect in case of errors. This is useful for services where availability // is more important than the data being up to date in case of kafka connection issues. WithViewAutoReconnect(), // Reconnect uses a default backoff mechanism, that can be modified by providing // a custom backoff builder using // WithViewBackoffBuilder(customBackoffBuilder), // When the view is running successfully for some time, the backoff is reset. // This time range can be modified using // WithViewBackoffResetTimeout(3*time.Second), ) if err != nil { log.Fatalf("error creating view: %v", err) } ctx, cancel := context.WithCancel(context.Background()) // start the view done := make(chan struct{}) go func() { defer close(done) err := view.Run(ctx) if err != nil { log.Fatalf("Error running view: %v", err) } }() <-view.WaitRunning() // at this point we can safely use the view with Has/Get/Iterate, // even if the kafka connection is lost // Stop the view and wait for it to shutdown before returning cancel() <-done
Output:
func (*View) CurrentState ¶ added in v1.0.0
CurrentState returns the current ViewState of the view This is useful for polling e.g. when implementing health checks or metrics
func (*View) Evict ¶
Evict removes the given key only from the local cache. In order to delete a key from Kafka and other Views, context.Delete should be used on a Processor.
func (*View) Get ¶
Get returns the value for the key in the view, if exists. Nil if it doesn't. Get can be called by multiple goroutines concurrently. Get can only be called after Recovered returns true.
func (*View) IteratorWithRange ¶
IteratorWithRange returns an iterator that iterates over the state of the View. This iterator is build using the range.
func (*View) ObserveStateChanges ¶ added in v1.0.0
func (v *View) ObserveStateChanges() *StateChangeObserver
ObserveStateChanges returns a StateChangeObserver that allows to handle state changes of the view by reading from a channel. It is crucial to continuously read from that channel, otherwise the View might deadlock upon state changes. If the observer is not needed, the caller must call observer.Stop()
Example
view := goka.NewView(...) go view.Run(ctx) go func(){ obs := view.ObserveStateChanges() defer obs.Stop() for { select{ case state, ok := <-obs.C: // handle state (or closed channel) case <-ctx.Done(): } } }()
func (*View) Run ¶
Run starts consuming the view's topic and saving updates in the local persistent cache.
The view will shutdown in case of errors or when the context is closed. It can be initialized with autoreconnect
view := NewView(..., WithViewAutoReconnect())
which makes the view internally reconnect in case of errors. Then it will only stop by canceling the context (see example).
func (*View) WaitRunning ¶ added in v1.0.0
func (v *View) WaitRunning() <-chan struct{}
WaitRunning returns a channel that will be closed when the view enters the running state
type ViewOption ¶
ViewOption defines a configuration option to be used when creating a view.
func WithViewAutoReconnect ¶ added in v1.0.0
func WithViewAutoReconnect() ViewOption
WithViewAutoReconnect defines the view is reconnecting internally, so Run() does not return in case of connection errors. The view must be shutdown by cancelling the context passed to Run()
func WithViewBackoffBuilder ¶ added in v1.0.0
func WithViewBackoffBuilder(bb BackoffBuilder) ViewOption
WithViewBackoffBuilder replaced the default backoff.
func WithViewBackoffResetTimeout ¶ added in v1.0.0
func WithViewBackoffResetTimeout(duration time.Duration) ViewOption
WithViewBackoffResetTimeout defines the timeout when the backoff will be reset.
func WithViewCallback ¶
func WithViewCallback(cb UpdateCallback) ViewOption
WithViewCallback defines the callback called upon recovering a message from the log.
func WithViewClientID ¶
func WithViewClientID(clientID string) ViewOption
WithViewClientID defines the client ID used to identify with Kafka.
func WithViewConsumerSaramaBuilder ¶ added in v1.0.0
func WithViewConsumerSaramaBuilder(cgb SaramaConsumerBuilder) ViewOption
WithViewConsumerSaramaBuilder replaces the default sarama consumer builder
func WithViewHasher ¶
func WithViewHasher(hasher func() hash.Hash32) ViewOption
WithViewHasher sets the hash function that assigns keys to partitions.
func WithViewLogger ¶
func WithViewLogger(log logger.Logger) ViewOption
WithViewLogger sets the logger the view should use. By default, views use the standard library logger.
func WithViewRestartable ¶
func WithViewRestartable() ViewOption
WithViewRestartable is kept only for backwards compatibility. DEPRECATED: since the behavior has changed, this name is misleading and should be replaced by WithViewAutoReconnect().
func WithViewStorageBuilder ¶
func WithViewStorageBuilder(sb storage.Builder) ViewOption
WithViewStorageBuilder defines a builder for the storage of each partition.
func WithViewTester ¶ added in v0.1.2
func WithViewTester(t Tester) ViewOption
WithViewTester configures all external connections of a processor, ie, storage, consumer and producer
func WithViewTopicManagerBuilder ¶
func WithViewTopicManagerBuilder(tmb TopicManagerBuilder) ViewOption
WithViewTopicManagerBuilder replaces the default topic manager.
type ViewState ¶ added in v1.0.0
type ViewState int
ViewState represents the state of the view
const ( // ViewStateIdle - the view is not started yet ViewStateIdle ViewState = iota // ViewStateInitializing - the view (i.e. at least one partition) is initializing ViewStateInitializing // ViewStateConnecting - the view (i.e. at least one partition) is (re-)connecting ViewStateConnecting // ViewStateCatchUp - the view (i.e. at least one partition) is still catching up ViewStateCatchUp // ViewStateRunning - the view (i.e. all partitions) has caught up and is running ViewStateRunning )
type ViewStats ¶
type ViewStats struct {
Partitions map[int32]*TableStats
}
ViewStats represents the metrics of all partitions of a view.
Source Files ¶
- assignment.go
- broker.go
- builders.go
- codec.go
- config.go
- context.go
- copartition_strategy.go
- doc.go
- emitter.go
- errors.go
- graph.go
- iterator.go
- mockautoconsumers.go
- mockbuilder.go
- mockcontroller.go
- mocks.go
- mockssarama.go
- mockstorage.go
- once.go
- options.go
- partition_processor.go
- partition_table.go
- processor.go
- producer.go
- promise.go
- proxy.go
- signal.go
- simple_backoff.go
- stats.go
- topic_manager.go
- view.go
Directories ¶
Path | Synopsis |
---|---|
examples
|
|
internal
|
|
This package provides a kafka mock that allows integration testing of goka processors.
|
This package provides a kafka mock that allows integration testing of goka processors. |
web
|
|