Documentation ¶
Overview ¶
Package goka is a stateful stream processing library for Apache Kafka (version 0.9+) that eases the development of microservices. Goka extends the concept of consumer group with a group table, which represents the state of the group. A microservice modifies and serves the content of a table employing two complementary object types: processors and views.
Processors ¶
A processor is a set of callback functions that modify the group table when messages arrive and may also emit messages into other topics. Messages as well as rows in the group table are key-value pairs. Callbacks receive the arriving message and the row addressed by the message's key.
In Kafka, keys are used to partition topics. A goka processor consumes from a set of co-partitioned topics (topics with the same number of partitions and the same key range). A group topic keeps track of the group table updates, allowing for recovery and rebalancing of processors: when multiple processor instances start in the same consumer group, the instances split the co-partitioned input topics and load the respective group table partitions from the group topic. Local disk storage minimizes recovery time by caching partitions of the group table.
Views ¶
A view is a materialized (i.e., persistent) cache of a group table. A view subscribes to the updates of all partitions of a group table and keeps local disk storage in sync with the group topic. With a view, one can easily serve up-to-date content of the group table via, for example, gRPC.
Package goka is a generated GoMock package.
Index ¶
- Variables
- func Debug(gokaDebug, saramaDebug bool)
- func DefaultConfig() *sarama.Config
- func DefaultConsumerGroupBuilder(brokers []string, group, clientID string) (sarama.ConsumerGroup, error)
- func DefaultHasher() func() hash.Hash32
- func DefaultProcessorStoragePath(group Group) string
- func DefaultRebalance(a Assignment)
- func DefaultSaramaConsumerBuilder(brokers []string, clientID string) (sarama.Consumer, error)
- func DefaultUpdate(ctx UpdateContext, s storage.Storage, key string, value []byte) error
- func DefaultViewStoragePath() string
- func NewMockController(t gomock.TestReporter) *gomock.Controller
- func NewPromiseWithFinisher() (*Promise, PromiseFinisher)
- func ReplaceGlobalConfig(config *sarama.Config)
- func ResetSuffixes()
- func SetLoopSuffix(suffix string)
- func SetSaramaLogger(logger Logger)
- func SetTableSuffix(suffix string)
- type Assignment
- type Backoff
- type BackoffBuilder
- type Broker
- type Codec
- type ConsumerGroupBuilder
- type Context
- type ContextOption
- type DefaultUpdateContext
- type Edge
- func Input(topic Stream, c Codec, cb ProcessCallback) Edge
- func Inputs(topics Streams, c Codec, cb ProcessCallback) Edge
- func Join(topic Table, c Codec) Edge
- func Lookup(topic Table, c Codec) Edge
- func Loop(c Codec, cb ProcessCallback) Edge
- func Output(topic Stream, c Codec) Edge
- func Persist(c Codec) Edge
- func Visitor(name string, cb ProcessCallback) Edge
- type Edges
- type Emitter
- func (e *Emitter) Emit(key string, msg interface{}) (*Promise, error)
- func (e *Emitter) EmitSync(key string, msg interface{}) error
- func (e *Emitter) EmitSyncWithHeaders(key string, msg interface{}, headers Headers) error
- func (e *Emitter) EmitWithHeaders(key string, msg interface{}, headers Headers) (*Promise, error)
- func (e *Emitter) Finish() error
- type EmitterOption
- func WithEmitterClientID(clientID string) EmitterOption
- func WithEmitterDefaultHeaders(headers Headers) EmitterOption
- func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption
- func WithEmitterLogger(l Logger) EmitterOption
- func WithEmitterProducerBuilder(pb ProducerBuilder) EmitterOption
- func WithEmitterTester(t Tester) EmitterOption
- func WithEmitterTopicManagerBuilder(tmb TopicManagerBuilder) EmitterOption
- type Getter
- type Group
- type GroupGraph
- func (gg *GroupGraph) AllEdges() Edges
- func (gg *GroupGraph) Group() Group
- func (gg *GroupGraph) GroupTable() Edge
- func (gg *GroupGraph) InputStreams() Edges
- func (gg *GroupGraph) JointTables() Edges
- func (gg *GroupGraph) LookupTables() Edges
- func (gg *GroupGraph) LoopStream() Edge
- func (gg *GroupGraph) OutputStreams() Edges
- func (gg *GroupGraph) Validate() error
- type Headers
- type InputStats
- type Iterator
- type Logger
- type MockAutoConsumer
- func (c *MockAutoConsumer) Close() error
- func (c *MockAutoConsumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)
- func (c *MockAutoConsumer) ExpectConsumePartition(topic string, partition int32, offset int64) *MockAutoPartitionConsumer
- func (c *MockAutoConsumer) HighWaterMarks() map[string]map[int32]int64
- func (c *MockAutoConsumer) Partitions(topic string) ([]int32, error)
- func (c *MockAutoConsumer) Pause(topicPartitions map[string][]int32)
- func (c *MockAutoConsumer) PauseAll()
- func (c *MockAutoConsumer) Resume(topicPartitions map[string][]int32)
- func (c *MockAutoConsumer) ResumeAll()
- func (c *MockAutoConsumer) SetTopicMetadata(metadata map[string][]int32)
- func (c *MockAutoConsumer) Topics() ([]string, error)
- type MockAutoPartitionConsumer
- func (pc *MockAutoPartitionConsumer) AsyncClose()
- func (pc *MockAutoPartitionConsumer) Close() error
- func (pc *MockAutoPartitionConsumer) Errors() <-chan *sarama.ConsumerError
- func (pc *MockAutoPartitionConsumer) ExpectErrorsDrainedOnClose()
- func (pc *MockAutoPartitionConsumer) ExpectMessagesDrainedOnClose()
- func (pc *MockAutoPartitionConsumer) HighWaterMarkOffset() int64
- func (pc *MockAutoPartitionConsumer) IsPaused() bool
- func (pc *MockAutoPartitionConsumer) Messages() <-chan *sarama.ConsumerMessage
- func (pc *MockAutoPartitionConsumer) Pause()
- func (pc *MockAutoPartitionConsumer) Resume()
- func (pc *MockAutoPartitionConsumer) YieldError(err error)
- func (pc *MockAutoPartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage)
- type MockBroker
- type MockBrokerMockRecorder
- type MockClient
- func (m *MockClient) Broker(arg0 int32) (*sarama.Broker, error)
- func (m *MockClient) Brokers() []*sarama.Broker
- func (m *MockClient) Close() error
- func (m *MockClient) Closed() bool
- func (m *MockClient) Config() *sarama.Config
- func (m *MockClient) Controller() (*sarama.Broker, error)
- func (m *MockClient) Coordinator(arg0 string) (*sarama.Broker, error)
- func (m *MockClient) EXPECT() *MockClientMockRecorder
- func (m *MockClient) GetOffset(arg0 string, arg1 int32, arg2 int64) (int64, error)
- func (m *MockClient) InSyncReplicas(arg0 string, arg1 int32) ([]int32, error)
- func (m *MockClient) InitProducerID() (*sarama.InitProducerIDResponse, error)
- func (m *MockClient) Leader(arg0 string, arg1 int32) (*sarama.Broker, error)
- func (m *MockClient) OfflineReplicas(arg0 string, arg1 int32) ([]int32, error)
- func (m *MockClient) Partitions(arg0 string) ([]int32, error)
- func (m *MockClient) RefreshBrokers(arg0 []string) error
- func (m *MockClient) RefreshController() (*sarama.Broker, error)
- func (m *MockClient) RefreshCoordinator(arg0 string) error
- func (m *MockClient) RefreshMetadata(arg0 ...string) error
- func (m *MockClient) Replicas(arg0 string, arg1 int32) ([]int32, error)
- func (m *MockClient) Topics() ([]string, error)
- func (m *MockClient) WritablePartitions(arg0 string) ([]int32, error)
- type MockClientMockRecorder
- func (mr *MockClientMockRecorder) Broker(arg0 interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) Brokers() *gomock.Call
- func (mr *MockClientMockRecorder) Close() *gomock.Call
- func (mr *MockClientMockRecorder) Closed() *gomock.Call
- func (mr *MockClientMockRecorder) Config() *gomock.Call
- func (mr *MockClientMockRecorder) Controller() *gomock.Call
- func (mr *MockClientMockRecorder) Coordinator(arg0 interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) GetOffset(arg0, arg1, arg2 interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) InSyncReplicas(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) InitProducerID() *gomock.Call
- func (mr *MockClientMockRecorder) Leader(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) OfflineReplicas(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) Partitions(arg0 interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) RefreshBrokers(arg0 interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) RefreshController() *gomock.Call
- func (mr *MockClientMockRecorder) RefreshCoordinator(arg0 interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) RefreshMetadata(arg0 ...interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) Replicas(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockClientMockRecorder) Topics() *gomock.Call
- func (mr *MockClientMockRecorder) WritablePartitions(arg0 interface{}) *gomock.Call
- type MockClusterAdmin
- func (m *MockClusterAdmin) AlterClientQuotas(arg0 []sarama.QuotaEntityComponent, arg1 sarama.ClientQuotasOp, arg2 bool) error
- func (m *MockClusterAdmin) AlterConfig(arg0 sarama.ConfigResourceType, arg1 string, arg2 map[string]*string, ...) error
- func (m *MockClusterAdmin) AlterPartitionReassignments(arg0 string, arg1 [][]int32) error
- func (m *MockClusterAdmin) Close() error
- func (m *MockClusterAdmin) Controller() (*sarama.Broker, error)
- func (m *MockClusterAdmin) CreateACL(arg0 sarama.Resource, arg1 sarama.Acl) error
- func (m *MockClusterAdmin) CreateACLs(acls []*sarama.ResourceAcls) error
- func (m *MockClusterAdmin) CreatePartitions(arg0 string, arg1 int32, arg2 [][]int32, arg3 bool) error
- func (m *MockClusterAdmin) CreateTopic(arg0 string, arg1 *sarama.TopicDetail, arg2 bool) error
- func (m *MockClusterAdmin) DeleteACL(arg0 sarama.AclFilter, arg1 bool) ([]sarama.MatchingAcl, error)
- func (m *MockClusterAdmin) DeleteConsumerGroup(arg0 string) error
- func (m *MockClusterAdmin) DeleteConsumerGroupOffset(arg0, arg1 string, arg2 int32) error
- func (m *MockClusterAdmin) DeleteRecords(arg0 string, arg1 map[int32]int64) error
- func (m *MockClusterAdmin) DeleteTopic(arg0 string) error
- func (m *MockClusterAdmin) DeleteUserScramCredentials(arg0 []sarama.AlterUserScramCredentialsDelete) ([]*sarama.AlterUserScramCredentialsResult, error)
- func (m *MockClusterAdmin) DescribeClientQuotas(arg0 []sarama.QuotaFilterComponent, arg1 bool) ([]sarama.DescribeClientQuotasEntry, error)
- func (m *MockClusterAdmin) DescribeCluster() ([]*sarama.Broker, int32, error)
- func (m *MockClusterAdmin) DescribeConfig(arg0 sarama.ConfigResource) ([]sarama.ConfigEntry, error)
- func (m *MockClusterAdmin) DescribeConsumerGroups(arg0 []string) ([]*sarama.GroupDescription, error)
- func (m *MockClusterAdmin) DescribeLogDirs(arg0 []int32) (map[int32][]sarama.DescribeLogDirsResponseDirMetadata, error)
- func (m *MockClusterAdmin) DescribeTopics(arg0 []string) ([]*sarama.TopicMetadata, error)
- func (m *MockClusterAdmin) DescribeUserScramCredentials(arg0 []string) ([]*sarama.DescribeUserScramCredentialsResult, error)
- func (m *MockClusterAdmin) EXPECT() *MockClusterAdminMockRecorder
- func (m *MockClusterAdmin) IncrementalAlterConfig(arg0 sarama.ConfigResourceType, arg1 string, ...) error
- func (m *MockClusterAdmin) ListAcls(arg0 sarama.AclFilter) ([]sarama.ResourceAcls, error)
- func (m *MockClusterAdmin) ListConsumerGroupOffsets(arg0 string, arg1 map[string][]int32) (*sarama.OffsetFetchResponse, error)
- func (m *MockClusterAdmin) ListConsumerGroups() (map[string]string, error)
- func (m *MockClusterAdmin) ListPartitionReassignments(arg0 string, arg1 []int32) (map[string]map[int32]*sarama.PartitionReplicaReassignmentsStatus, error)
- func (m *MockClusterAdmin) ListTopics() (map[string]sarama.TopicDetail, error)
- func (m *MockClusterAdmin) UpsertUserScramCredentials(arg0 []sarama.AlterUserScramCredentialsUpsert) ([]*sarama.AlterUserScramCredentialsResult, error)
- type MockClusterAdminMockRecorder
- func (mr *MockClusterAdminMockRecorder) AlterClientQuotas(arg0, arg1, arg2 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) AlterConfig(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) AlterPartitionReassignments(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) Close() *gomock.Call
- func (mr *MockClusterAdminMockRecorder) Controller() *gomock.Call
- func (mr *MockClusterAdminMockRecorder) CreateACL(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) CreatePartitions(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) CreateTopic(arg0, arg1, arg2 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) DeleteACL(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) DeleteConsumerGroup(arg0 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) DeleteConsumerGroupOffset(arg0, arg1, arg2 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) DeleteRecords(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) DeleteTopic(arg0 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) DeleteUserScramCredentials(arg0 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) DescribeClientQuotas(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) DescribeCluster() *gomock.Call
- func (mr *MockClusterAdminMockRecorder) DescribeConfig(arg0 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) DescribeConsumerGroups(arg0 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) DescribeLogDirs(arg0 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) DescribeTopics(arg0 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) DescribeUserScramCredentials(arg0 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) IncrementalAlterConfig(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) ListAcls(arg0 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) ListConsumerGroupOffsets(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) ListConsumerGroups() *gomock.Call
- func (mr *MockClusterAdminMockRecorder) ListPartitionReassignments(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockClusterAdminMockRecorder) ListTopics() *gomock.Call
- func (mr *MockClusterAdminMockRecorder) UpsertUserScramCredentials(arg0 interface{}) *gomock.Call
- type MockConsumerGroup
- func (cg *MockConsumerGroup) Close() error
- func (cg *MockConsumerGroup) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error
- func (cg *MockConsumerGroup) Errors() <-chan error
- func (cg *MockConsumerGroup) FailOnConsume(err error)
- func (cg *MockConsumerGroup) Pause(partitions map[string][]int32)
- func (cg *MockConsumerGroup) PauseAll()
- func (cg *MockConsumerGroup) Resume(partitions map[string][]int32)
- func (cg *MockConsumerGroup) ResumeAll()
- func (cg *MockConsumerGroup) SendError(err error)
- func (cg *MockConsumerGroup) SendMessage(message *sarama.ConsumerMessage) <-chan struct{}
- func (cg *MockConsumerGroup) SendMessageWait(message *sarama.ConsumerMessage)
- type MockConsumerGroupClaim
- type MockConsumerGroupSession
- func (cgs *MockConsumerGroupSession) Claims() map[string][]int32
- func (cgs *MockConsumerGroupSession) Commit()
- func (cgs *MockConsumerGroupSession) Context() context.Context
- func (cgs *MockConsumerGroupSession) GenerationID() int32
- func (cgs *MockConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string)
- func (cgs *MockConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string)
- func (cgs *MockConsumerGroupSession) MemberID() string
- func (cgs *MockConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string)
- func (cgs *MockConsumerGroupSession) SendMessage(msg *sarama.ConsumerMessage)
- type MockProducer
- type MockProducerMockRecorder
- type MockStorage
- func (m *MockStorage) Close() error
- func (m *MockStorage) Delete(arg0 string) error
- func (m *MockStorage) EXPECT() *MockStorageMockRecorder
- func (m *MockStorage) Get(arg0 string) ([]byte, error)
- func (m *MockStorage) GetOffset(arg0 int64) (int64, error)
- func (m *MockStorage) Has(arg0 string) (bool, error)
- func (m *MockStorage) Iterator() (storage.Iterator, error)
- func (m *MockStorage) IteratorWithRange(arg0, arg1 []byte) (storage.Iterator, error)
- func (m *MockStorage) MarkRecovered() error
- func (m *MockStorage) Open() error
- func (m *MockStorage) Set(arg0 string, arg1 []byte) error
- func (m *MockStorage) SetOffset(arg0 int64) error
- type MockStorageMockRecorder
- func (mr *MockStorageMockRecorder) Close() *gomock.Call
- func (mr *MockStorageMockRecorder) Delete(arg0 interface{}) *gomock.Call
- func (mr *MockStorageMockRecorder) Get(arg0 interface{}) *gomock.Call
- func (mr *MockStorageMockRecorder) GetOffset(arg0 interface{}) *gomock.Call
- func (mr *MockStorageMockRecorder) Has(arg0 interface{}) *gomock.Call
- func (mr *MockStorageMockRecorder) Iterator() *gomock.Call
- func (mr *MockStorageMockRecorder) IteratorWithRange(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockStorageMockRecorder) MarkRecovered() *gomock.Call
- func (mr *MockStorageMockRecorder) Open() *gomock.Call
- func (mr *MockStorageMockRecorder) Set(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockStorageMockRecorder) SetOffset(arg0 interface{}) *gomock.Call
- type MockTopicManager
- func (m *MockTopicManager) Close() error
- func (m *MockTopicManager) EXPECT() *MockTopicManagerMockRecorder
- func (m *MockTopicManager) EnsureStreamExists(arg0 string, arg1 int) error
- func (m *MockTopicManager) EnsureTableExists(arg0 string, arg1 int) error
- func (m *MockTopicManager) EnsureTopicExists(arg0 string, arg1, arg2 int, arg3 map[string]string) error
- func (m *MockTopicManager) GetOffset(arg0 string, arg1 int32, arg2 int64) (int64, error)
- func (m *MockTopicManager) Partitions(arg0 string) ([]int32, error)
- type MockTopicManagerMockRecorder
- func (mr *MockTopicManagerMockRecorder) Close() *gomock.Call
- func (mr *MockTopicManagerMockRecorder) EnsureStreamExists(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockTopicManagerMockRecorder) EnsureTableExists(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockTopicManagerMockRecorder) EnsureTopicExists(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
- func (mr *MockTopicManagerMockRecorder) GetOffset(arg0, arg1, arg2 interface{}) *gomock.Call
- func (mr *MockTopicManagerMockRecorder) Partitions(arg0 interface{}) *gomock.Call
- type NilHandling
- type OutputStats
- type PPRunMode
- type PartitionProcStats
- type PartitionProcessor
- type PartitionStatus
- type PartitionTable
- func (p *PartitionTable) CatchupForever(ctx context.Context, restartOnError bool) error
- func (p *PartitionTable) Close() error
- func (p *PartitionTable) CurrentState() PartitionStatus
- func (p *PartitionTable) Delete(key string) error
- func (p *PartitionTable) Get(key string) ([]byte, error)
- func (p *PartitionTable) GetOffset(defValue int64) (int64, error)
- func (p *PartitionTable) Has(key string) (bool, error)
- func (p *PartitionTable) IsRecovered() bool
- func (p *PartitionTable) Iterator() (storage.Iterator, error)
- func (p *PartitionTable) IteratorWithRange(start []byte, limit []byte) (storage.Iterator, error)
- func (p *PartitionTable) RunStatsLoop(ctx context.Context)
- func (p *PartitionTable) Set(key string, value []byte) error
- func (p *PartitionTable) SetOffset(value int64) error
- func (p *PartitionTable) SetupAndRecover(ctx context.Context, restartOnError bool) error
- func (p *PartitionTable) TrackMessageWrite(ctx context.Context, length int)
- func (p *PartitionTable) WaitRecovered() <-chan struct{}
- type ProcessCallback
- type Processor
- func (g *Processor) Cleanup(session sarama.ConsumerGroupSession) error
- func (g *Processor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (g *Processor) Get(key string) (interface{}, error)
- func (g *Processor) Graph() *GroupGraph
- func (g *Processor) PauseAll()
- func (g *Processor) Recovered() bool
- func (g *Processor) ResumeAll()
- func (g *Processor) Run(ctx context.Context) (rerr error)
- func (g *Processor) Setup(session sarama.ConsumerGroupSession) error
- func (g *Processor) StateReader() StateReader
- func (g *Processor) Stats() *ProcessorStats
- func (g *Processor) StatsWithContext(ctx context.Context) *ProcessorStats
- func (g *Processor) Stop()
- func (g *Processor) VisitAll(ctx context.Context, name string, meta interface{}) error
- func (g *Processor) VisitAllWithStats(ctx context.Context, name string, meta interface{}) (int64, error)
- func (g *Processor) WaitForReady()
- func (g *Processor) WaitForReadyContext(ctx context.Context) error
- type ProcessorOption
- func WithBackoffBuilder(bb BackoffBuilder) ProcessorOption
- func WithBackoffResetTimeout(duration time.Duration) ProcessorOption
- func WithClientID(clientID string) ProcessorOption
- func WithConsumerGroupBuilder(cgb ConsumerGroupBuilder) ProcessorOption
- func WithConsumerSaramaBuilder(cgb SaramaConsumerBuilder) ProcessorOption
- func WithGroupGraphHook(hook func(gg *GroupGraph)) ProcessorOption
- func WithHasher(hasher func() hash.Hash32) ProcessorOption
- func WithHotStandby() ProcessorOption
- func WithLogger(l Logger) ProcessorOption
- func WithNilHandling(nh NilHandling) ProcessorOption
- func WithPartitionChannelSize(size int) ProcessorOption
- func WithProducerBuilder(pb ProducerBuilder) ProcessorOption
- func WithProducerDefaultHeaders(hdr Headers) ProcessorOption
- func WithRebalanceCallback(cb RebalanceCallback) ProcessorOption
- func WithRecoverAhead() ProcessorOption
- func WithStorageBuilder(sb storage.Builder) ProcessorOption
- func WithTester(t Tester) ProcessorOption
- func WithTopicManagerBuilder(tmb TopicManagerBuilder) ProcessorOption
- func WithUpdateCallback(cb UpdateCallback) ProcessorOption
- type ProcessorStats
- type Producer
- type ProducerBuilder
- type Promise
- type PromiseFinisher
- type RebalanceCallback
- type RecoveryStats
- type SaramaConsumerBuilder
- type Signal
- func (s *Signal) IsState(state State) bool
- func (s *Signal) ObserveStateChange() *StateChangeObserver
- func (s *Signal) SetState(state State) *Signal
- func (s *Signal) State() State
- func (s *Signal) WaitForState(state State) <-chan struct{}
- func (s *Signal) WaitForStateMin(state State) <-chan struct{}
- type State
- type StateChangeObserver
- type StateReader
- type Stream
- type Streams
- type TMConfigMismatchBehavior
- type Table
- type TableStats
- type Tester
- type TopicManager
- type TopicManagerBuilder
- type TopicManagerConfig
- type UpdateCallback
- type UpdateContext
- type View
- func (v *View) CurrentState() ViewState
- func (v *View) Evict(key string) error
- func (v *View) Get(key string) (interface{}, error)
- func (v *View) Has(key string) (bool, error)
- func (v *View) Iterator() (Iterator, error)
- func (v *View) IteratorWithRange(start, limit string) (Iterator, error)
- func (v *View) ObserveStateChanges() *StateChangeObserver
- func (v *View) Recovered() bool
- func (v *View) Run(ctx context.Context) (rerr error)
- func (v *View) Stats(ctx context.Context) *ViewStats
- func (v *View) Topic() string
- func (v *View) WaitRunning() <-chan struct{}
- type ViewOption
- func WithViewAutoReconnect() ViewOption
- func WithViewBackoffBuilder(bb BackoffBuilder) ViewOption
- func WithViewBackoffResetTimeout(duration time.Duration) ViewOption
- func WithViewCallback(cb UpdateCallback) ViewOption
- func WithViewClientID(clientID string) ViewOption
- func WithViewConsumerSaramaBuilder(cgb SaramaConsumerBuilder) ViewOption
- func WithViewHasher(hasher func() hash.Hash32) ViewOption
- func WithViewLogger(l Logger) ViewOption
- func WithViewRestartable() ViewOption
- func WithViewStorageBuilder(sb storage.Builder) ViewOption
- func WithViewTester(t Tester) ViewOption
- func WithViewTopicManagerBuilder(tmb TopicManagerBuilder) ViewOption
- type ViewState
- type ViewStats
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// CopartitioningStrategy is the rebalance strategy necessary to guarantee the copartitioning
	// when consuming multiple input topics with multiple processor instances.
	// This strategy tolerates different sets of topics per member of consumer group to allow
	// rolling upgrades of processors.
	//
	// Note that the topic inconsistency must only be temporary, otherwise not all topic partitions
	// will be consumed, as in the following example:
	// Assume having topics X and Y, each with partitions [0,1,2]
	// MemberA requests topic X
	// MemberB requests topics X and Y, because topic Y was newly added to the processor.
	//
	// Then the strategy will plan as follows:
	// MemberA: X: [0,1]
	// MemberB: X: [2], Y:[2]
	//
	// That means partitions [0,1] from topic Y are not being consumed.
	// So the assumption is that memberA will be redeployed so that after a second rebalance
	// both members consume both topics and all partitions.
	//
	// If you do not use rolling upgrades, i.e. replace all members of a group simultaneously, it is
	// safe to use the StrictCopartitioningStrategy
	CopartitioningStrategy = new(copartitioningStrategy)

	// StrictCopartitioningStrategy behaves like the copartitioning strategy but it will fail if two members of a consumer group
	// request a different set of topics, which might indicate a bug or a reused consumer group name.
	StrictCopartitioningStrategy = &copartitioningStrategy{
		failOnInconsistentTopics: true,
	}
)
var (
	// ErrEmitterAlreadyClosed is returned when Emit is called after the emitter has been finished.
	ErrEmitterAlreadyClosed error = errors.New("emitter already closed")
)
var ErrVisitAborted = errors.New("VisitAll aborted due to context cancel or rebalance")
ErrVisitAborted indicates that a call to VisitAll could not finish due to a rebalance or processor shutdown.
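The example plan in the CopartitioningStrategy comment can be reproduced with a small simulation. This is a simplified model under stated assumptions, not goka's implementation: it assumes the partitions are split once across all members (in sorted member order, earlier members taking the remainder) and that each member then receives the same partitions for every topic it requested. `plan` and `copartitionedPlan` are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
)

// plan maps member -> topic -> assigned partitions.
type plan map[string]map[string][]int32

// copartitionedPlan splits the partitions among all members once and hands
// each member the same partitions for every topic that member subscribed to.
// Simplified model for illustration only.
func copartitionedPlan(subscriptions map[string][]string, partitions []int32) plan {
	result := plan{}
	if len(subscriptions) == 0 {
		return result
	}

	members := make([]string, 0, len(subscriptions))
	for m := range subscriptions {
		members = append(members, m)
	}
	sort.Strings(members) // deterministic member order

	base := len(partitions) / len(members)
	extra := len(partitions) % len(members)
	start := 0
	for i, m := range members {
		size := base
		if i < extra {
			size++ // the first members take the remainder
		}
		share := partitions[start : start+size]
		start += size

		result[m] = map[string][]int32{}
		for _, topic := range subscriptions[m] {
			// copy so that topics do not share a backing array
			result[m][topic] = append([]int32(nil), share...)
		}
	}
	return result
}

func main() {
	p := copartitionedPlan(map[string][]string{
		"MemberA": {"X"},
		"MemberB": {"X", "Y"},
	}, []int32{0, 1, 2})
	fmt.Println("MemberA:", p["MemberA"])
	fmt.Println("MemberB:", p["MemberB"])
}
```

Under these assumptions MemberA is assigned X:[0,1] and MemberB is assigned X:[2] and Y:[2], so partitions 0 and 1 of topic Y stay unconsumed until MemberA also requests topic Y.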
Functions ¶
func Debug ¶
func Debug(gokaDebug, saramaDebug bool)
Debug enables or disables debug logging using the global logger. The goka debugging setting is applied to any custom loggers in goka components (Processors, Views, Emitters).
func DefaultConfig ¶
DefaultConfig creates a new config with the defaults used by goka. Modify it and pass it to `goka.ReplaceGlobalConfig(...)` to change goka's global config.
func DefaultConsumerGroupBuilder ¶
func DefaultConsumerGroupBuilder(brokers []string, group, clientID string) (sarama.ConsumerGroup, error)
DefaultConsumerGroupBuilder creates a Kafka consumer group using the Sarama library.
func DefaultHasher ¶
DefaultHasher returns an FNV hasher builder to assign keys to partitions.
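How a hasher builder of type `func() hash.Hash32` can assign keys to partitions is sketched below. Two details are assumptions, since the text above only says "FNV": the sketch uses the 32-bit FNV-1a variant from the standard library, and it maps the hash to a partition by taking it modulo the partition count. `newHasher` and `partitionFor` are hypothetical names.

```go
package main

import (
	"fmt"
	"hash"
	"hash/fnv"
)

// newHasher has the same shape as the value returned by DefaultHasher.
// Assumption: 32-bit FNV-1a.
func newHasher() hash.Hash32 {
	return fnv.New32a()
}

// partitionFor maps a key to one of numPartitions partitions.
// Assumption: signed hash modulo partition count, negative results flipped.
func partitionFor(hasher func() hash.Hash32, key string, numPartitions int32) int32 {
	h := hasher()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

func main() {
	for _, key := range []string{"user-1", "user-2", "user-3"} {
		fmt.Printf("%s -> partition %d\n", key, partitionFor(newHasher, key, 8))
	}
}
```

Because the mapping depends only on the key and the partition count, two topics with the same number of partitions place a given key in the same partition, which is what co-partitioning relies on.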
func DefaultProcessorStoragePath ¶
DefaultProcessorStoragePath is the default path where processor state will be stored.
func DefaultRebalance ¶
func DefaultRebalance(a Assignment)
DefaultRebalance is the default callback when a new partition assignment is received. DefaultRebalance can be used in the function passed to WithRebalanceCallback.
func DefaultSaramaConsumerBuilder ¶
DefaultSaramaConsumerBuilder creates a Kafka consumer using the Sarama library.
func DefaultUpdate ¶
DefaultUpdate is the default callback used to update the local storage with messages from the table topic in Kafka. It is called for every message received during recovery of processors and during the normal operation of views. DefaultUpdate can be used in the function passed to WithUpdateCallback and WithViewCallback.
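The idea of replaying the table topic into local storage can be sketched with an in-memory map. This is an illustration, not goka's code: `store` and `applyUpdate` are hypothetical, and treating a nil value as a deletion marker is an assumption based on how compacted Kafka topics usually represent deletes.

```go
package main

import "fmt"

// store is a hypothetical in-memory stand-in for the local storage.
type store map[string][]byte

// applyUpdate applies one message from the table topic to the local store.
// Assumption: a nil value is a tombstone that deletes the key.
func applyUpdate(s store, key string, value []byte) {
	if value == nil {
		delete(s, key)
		return
	}
	s[key] = value
}

func main() {
	s := store{}
	applyUpdate(s, "alice", []byte("1"))
	applyUpdate(s, "bob", []byte("7"))
	applyUpdate(s, "alice", []byte("2")) // a later update overwrites the row
	applyUpdate(s, "bob", nil)           // a tombstone removes the row
	fmt.Println(len(s), string(s["alice"]))
}
```

A custom callback passed to WithUpdateCallback or WithViewCallback would typically do extra work, such as collecting metrics, and then delegate to DefaultUpdate.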
func DefaultViewStoragePath ¶
func DefaultViewStoragePath() string
DefaultViewStoragePath returns the default path where view state will be stored.
func NewMockController ¶
func NewMockController(t gomock.TestReporter) *gomock.Controller
NewMockController returns a *gomock.Controller using a wrapped testing.T (or any other gomock.TestReporter) that panics on Fatalf. This is necessary when using a mock in kafkamock; otherwise it will freeze on an unexpected call.
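The wrapping technique can be sketched without gomock. The `testReporter` interface below mirrors the two methods of gomock's TestReporter (Errorf and Fatalf); `panicReporter`, `recordingReporter`, and `fatalPanics` are hypothetical names, and whether goka's wrapper forwards to the inner Fatalf before panicking is an assumption.

```go
package main

import "fmt"

// testReporter mirrors the method set of gomock's TestReporter.
type testReporter interface {
	Errorf(format string, args ...interface{})
	Fatalf(format string, args ...interface{})
}

// panicReporter wraps another reporter and panics on Fatalf, so a failure
// raised from any goroutine stops the test instead of leaving it hanging.
type panicReporter struct {
	testReporter
}

// Fatalf forwards to the wrapped reporter and then panics.
func (p panicReporter) Fatalf(format string, args ...interface{}) {
	p.testReporter.Fatalf(format, args...)
	panic(fmt.Sprintf(format, args...))
}

// recordingReporter is a hypothetical reporter that only records messages.
type recordingReporter struct {
	messages []string
}

func (r *recordingReporter) Errorf(format string, args ...interface{}) {
	r.messages = append(r.messages, fmt.Sprintf(format, args...))
}

func (r *recordingReporter) Fatalf(format string, args ...interface{}) {
	r.messages = append(r.messages, fmt.Sprintf(format, args...))
}

// fatalPanics reports whether calling Fatalf on r panics.
func fatalPanics(r testReporter) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	r.Fatalf("unexpected call to %s", "Emit")
	return false
}

func main() {
	inner := &recordingReporter{}
	fmt.Println("plain reporter panics:", fatalPanics(inner))
	fmt.Println("wrapped reporter panics:", fatalPanics(panicReporter{inner}))
}
```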
func NewPromiseWithFinisher ¶
func NewPromiseWithFinisher() (*Promise, PromiseFinisher)
NewPromiseWithFinisher creates a new Promise and a separate finish method. This is necessary if the promise is used outside of the goka package.
func ReplaceGlobalConfig ¶
ReplaceGlobalConfig registers a standard config that is used during building if no other config is specified.
func ResetSuffixes ¶
func ResetSuffixes()
ResetSuffixes resets both `loopSuffix` and `tableSuffix` to their default values. This function is helpful when there are multiple test cases, so you can clean up any changes made to the suffixes.
func SetLoopSuffix ¶
func SetLoopSuffix(suffix string)
SetLoopSuffix changes `loopSuffix`, which is the suffix of a group's loop topic. Use it to set a different loop topic suffix in case you cannot use the default one.
func SetSaramaLogger ¶
func SetSaramaLogger(logger Logger)
func SetTableSuffix ¶
func SetTableSuffix(suffix string)
SetTableSuffix changes `tableSuffix`, which is the suffix of a table topic. Use it to set a different table topic suffix in case you cannot use the default one.
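The effect of the three suffix functions can be modeled with two package-level variables. This is a sketch, not goka's code: all names are hypothetical lower-case counterparts, and the default values `-table` and `-loop` are assumptions that the text above does not state.

```go
package main

import "fmt"

// Assumed defaults; the documentation above does not state them.
const (
	defaultTableSuffix = "-table"
	defaultLoopSuffix  = "-loop"
)

var (
	tableSuffix = defaultTableSuffix
	loopSuffix  = defaultLoopSuffix
)

// setTableSuffix and setLoopSuffix change the package-wide suffixes.
func setTableSuffix(suffix string) { tableSuffix = suffix }
func setLoopSuffix(suffix string)  { loopSuffix = suffix }

// resetSuffixes restores both suffixes to their defaults.
func resetSuffixes() {
	tableSuffix = defaultTableSuffix
	loopSuffix = defaultLoopSuffix
}

// tableTopic and loopTopic derive topic names from a group name.
func tableTopic(group string) string { return group + tableSuffix }
func loopTopic(group string) string  { return group + loopSuffix }

func main() {
	fmt.Println(tableTopic("clicks"), loopTopic("clicks"))

	setTableSuffix(".state")
	setLoopSuffix(".loop")
	fmt.Println(tableTopic("clicks"), loopTopic("clicks"))

	resetSuffixes() // e.g. at the end of a test case
	fmt.Println(tableTopic("clicks"), loopTopic("clicks"))
}
```

Because the suffixes are global state, a test that changes them affects every later test in the same process, which is why a reset at the end of each test case is useful.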
Types ¶
type Assignment ¶
Assignment represents a partition:offset assignment for the current connection
type Backoff ¶
Backoff is used for adding backoff capabilities to the restarting of failing partition tables.
func DefaultBackoffBuilder ¶
DefaultBackoffBuilder returns a simpleBackoff with a 10-second step increase and a 2-minute maximum wait.
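One plausible reading of "step increase" and "max wait" is a wait time that grows linearly with each failed attempt and is capped at the maximum. The sketch below illustrates that reading only; `stepBackoff`, its methods, and `newStepBackoff` are hypothetical and are not taken from goka's simpleBackoff.

```go
package main

import (
	"fmt"
	"time"
)

// stepBackoff is a hypothetical backoff that grows by step per attempt
// and never exceeds max.
type stepBackoff struct {
	step     time.Duration
	max      time.Duration
	attempts int
}

// newStepBackoff uses the values named above: 10 seconds step, 2 minutes max.
func newStepBackoff() *stepBackoff {
	return &stepBackoff{step: 10 * time.Second, max: 2 * time.Minute}
}

// Duration records one more attempt and returns how long to wait before it.
func (b *stepBackoff) Duration() time.Duration {
	b.attempts++
	d := time.Duration(b.attempts) * b.step
	if d > b.max {
		d = b.max
	}
	return d
}

// Reset clears the attempt counter, for example after a period without failures.
func (b *stepBackoff) Reset() {
	b.attempts = 0
}

func main() {
	b := newStepBackoff()
	for i := 1; i <= 14; i++ {
		fmt.Printf("attempt %d: wait %s\n", i, b.Duration())
	}
	b.Reset()
	fmt.Println("after reset:", b.Duration())
}
```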
type Broker ¶
type Broker interface {
	Addr() string
	Connected() (bool, error)
	CreateTopics(request *sarama.CreateTopicsRequest) (*sarama.CreateTopicsResponse, error)
	Open(conf *sarama.Config) error
}
Broker is an interface for the sarama broker
type Codec ¶
type Codec interface {
	Encode(value interface{}) (data []byte, err error)
	Decode(data []byte) (value interface{}, err error)
}
Codec decodes and encodes from and to []byte
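A codec for a custom value type can be written against this interface using only the standard library. In the sketch below the `Codec` interface is copied from the definition above so the example compiles on its own; `user` and `userCodec` are hypothetical, and JSON is just one possible wire format.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Codec is a local copy of the interface shown above.
type Codec interface {
	Encode(value interface{}) (data []byte, err error)
	Decode(data []byte) (value interface{}, err error)
}

// user is a hypothetical value type stored in a table or sent in a stream.
type user struct {
	Name   string `json:"name"`
	Clicks int    `json:"clicks"`
}

// userCodec encodes and decodes *user values as JSON.
type userCodec struct{}

// Encode rejects values that are not *user and marshals the rest to JSON.
func (userCodec) Encode(value interface{}) ([]byte, error) {
	u, ok := value.(*user)
	if !ok {
		return nil, fmt.Errorf("userCodec: expected *user, got %T", value)
	}
	return json.Marshal(u)
}

// Decode unmarshals JSON into a new *user.
func (userCodec) Decode(data []byte) (interface{}, error) {
	var u user
	if err := json.Unmarshal(data, &u); err != nil {
		return nil, fmt.Errorf("userCodec: %w", err)
	}
	return &u, nil
}

// Compile-time check that userCodec satisfies Codec.
var _ Codec = userCodec{}

func main() {
	var c Codec = userCodec{}
	data, err := c.Encode(&user{Name: "alice", Clicks: 3})
	if err != nil {
		panic(err)
	}
	value, err := c.Decode(data)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s -> %+v\n", data, *value.(*user))
}
```

Returning a pointer from Decode and accepting the same pointer type in Encode keeps the two directions symmetric, so a value read from a table can be modified and written back without conversion.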
type ConsumerGroupBuilder ¶
type ConsumerGroupBuilder func(brokers []string, group, clientID string) (sarama.ConsumerGroup, error)
ConsumerGroupBuilder creates a `sarama.ConsumerGroup`
func ConsumerGroupBuilderWithConfig ¶
func ConsumerGroupBuilderWithConfig(config *sarama.Config) ConsumerGroupBuilder
ConsumerGroupBuilderWithConfig creates a sarama consumer group using the passed config.
type Context ¶
type Context interface {
	// Topic returns the topic of input message.
	Topic() Stream

	// Key returns the key of the input message.
	Key() string

	// Partition returns the partition of the input message.
	Partition() int32

	// Offset returns the offset of the input message.
	Offset() int64

	// Group returns the group of the input message
	Group() Group

	// Value returns the value of the key in the group table.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	Value() interface{}

	// Headers returns the headers of the input message
	Headers() Headers

	// SetValue updates the value of the key in the group table.
	// It stores the value in the local cache and sends the
	// update to the Kafka topic representing the group table.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	SetValue(value interface{}, options ...ContextOption)

	// Delete deletes a value from the group table. IMPORTANT: this deletes the
	// value associated with the key from both the local cache and the persisted
	// table in Kafka.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	Delete(options ...ContextOption)

	// Timestamp returns the timestamp of the input message. If the timestamp is
	// invalid, a zero time will be returned.
	Timestamp() time.Time

	// Join returns the value of key in the copartitioned table.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	Join(topic Table) interface{}

	// Lookup returns the value of key in the view of table.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	Lookup(topic Table, key string) interface{}

	// Emit asynchronously writes a message into a topic.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	Emit(topic Stream, key string, value interface{}, options ...ContextOption)

	// Loopback asynchronously sends a message to another key of the group
	// table. Value passed to loopback is encoded via the codec given in the
	// Loop subscription.
	//
	// This method might panic to initiate an immediate shutdown of the processor
	// to maintain data integrity. Do not recover from that panic or
	// the processor might deadlock.
	Loopback(key string, value interface{}, options ...ContextOption)

	// Fail stops execution and shuts down the processor
	// The callback is stopped immediately by panicking. Do not recover from that panic or
	// the processor might deadlock.
	Fail(err error)

	// Context returns the underlying context used to start the processor or a
	// subcontext. Returned context.Context can safely be passed to asynchronous code and goroutines.
	Context() context.Context

	// DeferCommit makes the callback omit the final commit when the callback returns.
	// It returns a function that *must* be called eventually to mark the message processing as finished.
	// If the function is not called, the processor might reprocess the message in future.
	// Note when calling DeferCommit multiple times, all returned functions must be called.
	// *Important*: the context where `DeferCommit` is called, is only safe to use within this callback,
	// never pass it into asynchronous code or goroutines.
	DeferCommit() func(error)
}
Context provides access to the processor's table and the ability to emit to arbitrary topics in Kafka. Upon arrival of a message from subscribed topics, the respective ConsumeCallback is invoked with a context object along with the input message. The context is only valid within the callback. Do not store it or pass it to other goroutines.
Error handling ¶
Most methods of the context can fail for different reasons, which are handled in different ways. Synchronous errors like
* wrong codec for topic (a message cannot be marshalled or unmarshalled)
* Emit to a topic without the Output definition in the group graph
* Value/SetValue without defining Persist in the group graph
* Join/Lookup without the definition in the group graph
etc. will result in a panic that stops the callback immediately and shuts down the processor. This is necessary to preserve the integrity of the processor and avoid further actions. Do not recover from that panic, otherwise the goroutine will deadlock.
Retrying synchronous errors must be implemented by restarting the processor. If errors must be tolerated (which is not advisable because they're usually persistent), provide fail-tolerant versions of the producer, storage or codec as needed.
Asynchronous errors can occur after the callback has finished, e.g. when sending a batched message to Kafka fails due to connection errors or leader election in the cluster. Those errors still shut down the processor but will not result in a panic in the callback.
type ContextOption ¶
type ContextOption func(*ctxOptions)
ContextOption defines a configuration option to be used when performing operations on a context
func WithCtxEmitHeaders ¶
func WithCtxEmitHeaders(headers Headers) ContextOption
WithCtxEmitHeaders sets kafka headers to use when emitting to kafka
type DefaultUpdateContext ¶
type DefaultUpdateContext struct {
// contains filtered or unexported fields
}
DefaultUpdateContext implements the UpdateContext interface.
func (DefaultUpdateContext) Headers ¶
func (ctx DefaultUpdateContext) Headers() Headers
Headers returns the headers of the input message.
func (DefaultUpdateContext) Offset ¶
func (ctx DefaultUpdateContext) Offset() int64
Offset returns the offset of the input message.
func (DefaultUpdateContext) Partition ¶
func (ctx DefaultUpdateContext) Partition() int32
Partition returns the partition of the input message.
func (DefaultUpdateContext) Topic ¶
func (ctx DefaultUpdateContext) Topic() Stream
Topic returns the topic of input message.
type Edge ¶
Edge represents a topic in Kafka and the corresponding codec to encode and decode the messages of that topic.
func Input ¶
func Input(topic Stream, c Codec, cb ProcessCallback) Edge
Input represents an edge of an input stream topic. The edge specifies the topic name, its codec and the ProcessCallback used to process it. The topic has to be copartitioned with any other input stream of the group and with the group table. The group starts reading the topic from the newest offset.
func Inputs ¶
func Inputs(topics Streams, c Codec, cb ProcessCallback) Edge
Inputs creates edges of multiple input streams sharing the same codec and callback.
func Join ¶
Join represents an edge of a copartitioned, log-compacted table topic. The edge specifies the topic name and the codec of the messages of the topic. The group starts reading the topic from the oldest offset. The processing of input streams is blocked until all partitions of the table are recovered.
func Lookup ¶
Lookup represents an edge of a non-copartitioned, log-compacted table topic. The edge specifies the topic name and the codec of the messages of the topic. The group starts reading the topic from the oldest offset. The processing of input streams is blocked until the table is fully recovered.
func Loop ¶
func Loop(c Codec, cb ProcessCallback) Edge
Loop represents the edge of the loopback topic of the group. The edge specifies the codec of the messages in the topic and the ProcessCallback to process the messages of the topic. Context.Loopback() is used to write messages into this topic from any callback of the group.
func Output ¶
Output represents an edge of an output stream topic. The edge specifies the topic name and the codec of the messages of the topic. Context.Emit() only emits messages into Output edges defined in the group graph. The topic does not have to be copartitioned with the input streams.
func Persist ¶
Persist represents the edge of the group table, which is log-compacted and copartitioned with the input streams. Without Persist, calls to ctx.Value or ctx.SetValue in the consume callback will fail and lead to shutdown of the processor.
This edge specifies the codec of the messages in the topic, ie, the codec of the values of the table. The processing of input streams is blocked until all partitions of the group table are recovered.
The topic name is derived from the group name by appending "-table".
func Visitor ¶
func Visitor(name string, cb ProcessCallback) Edge
Visitor adds a visitor edge to the processor. This allows iterating over the whole processor state while running. Note that this can block rebalance or processor shutdown. EXPERIMENTAL! This feature is not fully tested and might trigger unknown bugs. Be careful!
type Emitter ¶
type Emitter struct {
// contains filtered or unexported fields
}
Emitter emits messages into a specific Kafka topic, first encoding the message with the given codec.
func NewEmitter ¶
func NewEmitter(brokers []string, topic Stream, codec Codec, options ...EmitterOption) (*Emitter, error)
NewEmitter creates a new emitter using passed brokers, topic, codec and possibly options.
func (*Emitter) EmitSyncWithHeaders ¶
EmitSyncWithHeaders sends a message with the given headers to passed topic and key.
func (*Emitter) EmitWithHeaders ¶
EmitWithHeaders sends a message with the given headers for the passed key using the emitter's codec.
type EmitterOption ¶
EmitterOption defines a configuration option to be used when creating an emitter.
func WithEmitterClientID ¶
func WithEmitterClientID(clientID string) EmitterOption
WithEmitterClientID defines the client ID used to identify with kafka.
func WithEmitterDefaultHeaders ¶
func WithEmitterDefaultHeaders(headers Headers) EmitterOption
WithEmitterDefaultHeaders configures the emitter with default headers which are included with every emit.
func WithEmitterHasher ¶
func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption
WithEmitterHasher sets the hash function that assigns keys to partitions.
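As a rough illustration of what a hasher of type `func() hash.Hash32` is used for, the sketch below maps a key to a partition with FNV-1a. The helper `partitionFor` and the modulo mapping are assumptions made for this example; the mapping actually applied by the producer may differ.

```go
package main

import (
	"fmt"
	"hash"
	"hash/fnv"
)

// newHasher has the shape expected by WithEmitterHasher: func() hash.Hash32.
// FNV-1a is used here only as an example choice.
func newHasher() hash.Hash32 { return fnv.New32a() }

// partitionFor is a hypothetical helper (not a goka function) that maps a key
// to a partition in [0, numPartitions). numPartitions must be positive.
func partitionFor(hasher func() hash.Hash32, key string, numPartitions int32) int32 {
	h := hasher()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

func main() {
	for _, key := range []string{"user-1", "user-2", "user-3"} {
		fmt.Printf("%s -> partition %d\n", key, partitionFor(newHasher, key, 8))
	}
}
```

Emitters and processors that should agree on where a key lives need the same hasher; otherwise messages for one key could land in a partition that a different processor instance owns.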
func WithEmitterLogger ¶
func WithEmitterLogger(l Logger) EmitterOption
WithEmitterLogger sets the logger the emitter should use. By default, emitters use the standard library logger.
func WithEmitterProducerBuilder ¶
func WithEmitterProducerBuilder(pb ProducerBuilder) EmitterOption
WithEmitterProducerBuilder replaces the default producer builder.
func WithEmitterTester ¶
func WithEmitterTester(t Tester) EmitterOption
WithEmitterTester configures the emitter to use the passed tester. This is used for component tests.
func WithEmitterTopicManagerBuilder ¶
func WithEmitterTopicManagerBuilder(tmb TopicManagerBuilder) EmitterOption
WithEmitterTopicManagerBuilder replaces the default topic manager builder.
type Getter ¶
Getter functions return a value for a key or an error. If no value exists for the key, nil is returned without errors.
type Group ¶
type Group string
Group is the name of a consumer group in Kafka and represents a processor group in Goka. A processor group may have a group table and a group loopback stream. By default, the group table is named <group>-table and the loopback stream <group>-loop.
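The default naming convention can be written down in a few lines. This is a minimal sketch with a local `Group` type; `tableName` and `loopName` are hypothetical helpers written for this example, not functions taken from the package.

```go
package main

import "fmt"

// Group mirrors the string-based group type shown above.
type Group string

// tableName returns the default group table topic for a group (hypothetical helper).
func tableName(g Group) string { return string(g) + "-table" }

// loopName returns the default loopback stream topic for a group (hypothetical helper).
func loopName(g Group) string { return string(g) + "-loop" }

func main() {
	g := Group("user-clicks")
	fmt.Println(tableName(g)) // prints "user-clicks-table"
	fmt.Println(loopName(g))  // prints "user-clicks-loop"
}
```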
type GroupGraph ¶
type GroupGraph struct {
// contains filtered or unexported fields
}
GroupGraph is the specification of a processor group. It contains all input, output, and any other topic from which and into which the processor group may consume or produce events. Each of these links to Kafka is called Edge.
func DefineGroup ¶
func DefineGroup(group Group, edges ...Edge) *GroupGraph
DefineGroup creates a group graph with a given group name and a list of edges.
func (*GroupGraph) AllEdges ¶
func (gg *GroupGraph) AllEdges() Edges
AllEdges returns a list of all edges for the group graph. This allows modifying a graph by cloning its edges into a new one.
var existing *GroupGraph
edges := existing.AllEdges()
// modify edges as required
// recreate the modified graph
newGraph := DefineGroup(existing.Group(), edges...)
func (*GroupGraph) GroupTable ¶
func (gg *GroupGraph) GroupTable() Edge
GroupTable returns the group table edge of the group.
func (*GroupGraph) InputStreams ¶
func (gg *GroupGraph) InputStreams() Edges
InputStreams returns all input stream edges of the group.
func (*GroupGraph) JointTables ¶
func (gg *GroupGraph) JointTables() Edges
JointTables returns all joint table edges of the group.
func (*GroupGraph) LookupTables ¶
func (gg *GroupGraph) LookupTables() Edges
LookupTables returns all lookup table edges of the group.
func (*GroupGraph) LoopStream ¶
func (gg *GroupGraph) LoopStream() Edge
LoopStream returns the loopback edge of the group.
func (*GroupGraph) OutputStreams ¶
func (gg *GroupGraph) OutputStreams() Edges
OutputStreams returns the output stream edges of the group.
func (*GroupGraph) Validate ¶
func (gg *GroupGraph) Validate() error
Validate validates the group graph and returns an error if invalid. Main validation checks are:
- at most one loopback stream edge is allowed
- at most one group table edge is allowed
- at least one input stream is required
- table and loopback topics cannot be used in any other edge.
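The checks listed above can be sketched over a simplified edge model. The `edge` and `edgeKind` types and the `validate` function are hypothetical and exist only for this example; the real implementation works on the package's own Edge types and may differ in detail.

```go
package main

import (
	"errors"
	"fmt"
)

// edgeKind and edge are simplified stand-ins for the package's edge types.
type edgeKind int

const (
	inputEdge edgeKind = iota
	loopEdge
	tableEdge
	outputEdge
)

type edge struct {
	kind  edgeKind
	topic string
}

// validate applies the four checks described above to a list of edges.
func validate(edges []edge) error {
	var inputs, loops, tables int
	reserved := map[string]bool{} // topics claimed by loopback or table edges
	for _, e := range edges {
		switch e.kind {
		case inputEdge:
			inputs++
		case loopEdge:
			loops++
			reserved[e.topic] = true
		case tableEdge:
			tables++
			reserved[e.topic] = true
		}
	}
	switch {
	case loops > 1:
		return errors.New("more than one loopback stream edge")
	case tables > 1:
		return errors.New("more than one group table edge")
	case inputs == 0:
		return errors.New("no input stream edge")
	}
	for _, e := range edges {
		if e.kind != loopEdge && e.kind != tableEdge && reserved[e.topic] {
			return fmt.Errorf("topic %q is reserved for the table or loopback edge", e.topic)
		}
	}
	return nil
}

func main() {
	ok := []edge{{inputEdge, "clicks"}, {tableEdge, "g-table"}, {outputEdge, "alerts"}}
	bad := []edge{{inputEdge, "clicks"}, {tableEdge, "g-table"}, {outputEdge, "g-table"}}
	fmt.Println(validate(ok))
	fmt.Println(validate(bad))
}
```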
type Headers ¶
Headers represents custom message headers with a convenient interface.
func HeadersFromSarama ¶
func HeadersFromSarama(saramaHeaders []*sarama.RecordHeader) Headers
HeadersFromSarama converts sarama headers to goka's type.
func (Headers) Merged ¶
Merged returns a new instance with all headers merged. Later keys override earlier ones. Handles a nil receiver and nil arguments without panics. If all headers are empty, nil is returned to allow using directly in emit functions.
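The merge semantics described here (later keys win, nil-safe, nil result when everything is empty) can be sketched as follows. The local `headers` type and its map-of-bytes layout are assumptions made for this example, and `merged` is a hypothetical reimplementation rather than the package's code.

```go
package main

import "fmt"

// headers is a local stand-in type; the map layout is an assumption.
type headers map[string][]byte

// merged combines the receiver with the given headers. Later values override
// earlier ones, and nil is returned if no header was present at all.
func (h headers) merged(others ...headers) headers {
	var out headers
	for _, src := range append([]headers{h}, others...) {
		for k, v := range src { // ranging over a nil map is a no-op
			if out == nil {
				out = make(headers)
			}
			out[k] = v
		}
	}
	return out
}

func main() {
	base := headers{"trace": []byte("a"), "source": []byte("api")}
	extra := headers{"trace": []byte("b")}
	m := base.merged(nil, extra)
	fmt.Printf("%s %s\n", m["trace"], m["source"]) // prints "b api"

	var empty headers
	fmt.Println(empty.merged(nil) == nil) // prints "true"
}
```

Returning nil instead of an empty map means the result can be passed on as "no headers" without an extra length check.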
func (Headers) ToSarama ¶
func (h Headers) ToSarama() []sarama.RecordHeader
ToSarama converts the headers to a slice of sarama.RecordHeader. If called on a nil receiver returns nil.
func (Headers) ToSaramaPtr ¶
func (h Headers) ToSaramaPtr() []*sarama.RecordHeader
ToSaramaPtr converts the headers to a slice of pointers to sarama.RecordHeader. If called on a nil receiver returns nil.
type InputStats ¶
type InputStats struct {
	Count      uint
	Bytes      int
	OffsetLag  int64
	LastOffset int64
	Delay      time.Duration
}
InputStats represents the number of messages and the number of bytes consumed from a stream or table topic since the process started.
type Iterator ¶
type Iterator interface {
	// Next advances the iterator to the next KV-pair. Err should be called
	// after Next returns false to check whether the iteration finished
	// from exhaustion or was aborted due to an error.
	Next() bool

	// Err returns the error that stopped the iteration if any.
	Err() error

	// Return the key of the current item
	Key() string

	// Return the value of the current item
	// This value is already decoded with the view's codec (or nil, if it's nil)
	Value() (interface{}, error)

	// Release the iterator. After release, the iterator is not usable anymore
	Release()

	// Seek moves the iterator to the beginning of a key-value pair sequence that
	// is greater or equal to the given key. It returns whether at least one of
	// such key-value pairs exist. If true is returned, Key/Value must be called
	// immediately to get the first item. Calling Next immediately after a successful
	// seek will effectively skip an item in the iterator.
	Seek(key string) bool
}
Iterator allows one to iterate over the keys of a view.
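The Seek contract is easy to get wrong, so here is a minimal sketch using an in-memory iterator. `sliceIterator` and `keysFrom` are hypothetical and cover only the Next, Key and Seek methods; they are not the package's storage-backed iterator. The loop reads the current item right after a successful Seek and only then calls Next.

```go
package main

import (
	"fmt"
	"sort"
)

type kv struct{ key, value string }

// sliceIterator is a hypothetical in-memory iterator over key-sorted pairs.
type sliceIterator struct {
	items []kv
	pos   int // index of the current item; -1 before the first Next
}

func newSliceIterator(items []kv) *sliceIterator {
	sorted := append([]kv(nil), items...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].key < sorted[j].key })
	return &sliceIterator{items: sorted, pos: -1}
}

// Next advances to the next pair and reports whether one exists.
func (it *sliceIterator) Next() bool {
	if it.pos+1 >= len(it.items) {
		return false
	}
	it.pos++
	return true
}

// Key returns the key of the current pair.
func (it *sliceIterator) Key() string { return it.items[it.pos].key }

// Seek positions the iterator on the first pair whose key is >= key.
func (it *sliceIterator) Seek(key string) bool {
	i := sort.Search(len(it.items), func(i int) bool { return it.items[i].key >= key })
	if i == len(it.items) {
		return false
	}
	it.pos = i
	return true
}

// keysFrom collects all keys >= start. After a successful Seek the iterator
// already points at the first match, so Key is read before Next is called.
func keysFrom(it *sliceIterator, start string) []string {
	var keys []string
	for ok := it.Seek(start); ok; ok = it.Next() {
		keys = append(keys, it.Key())
	}
	return keys
}

func main() {
	it := newSliceIterator([]kv{{"c", "3"}, {"a", "1"}, {"b", "2"}})
	fmt.Println(keysFrom(it, "b")) // prints "[b c]"
}
```

Writing the loop as `for it.Seek(start) && it.Next()` style code would skip the first matching item, which is the pitfall the Seek documentation warns about.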
type Logger ¶
type Logger interface {
	// Print will simply print the params
	Print(...interface{})

	// Println will simply print the params
	Println(...interface{})

	// Printf will be used for informational messages. These can be thought of
	// having an 'Info'-level in a structured logger.
	Printf(string, ...interface{})
}
Logger is the interface Goka and its subpackages use for logging.
type MockAutoConsumer ¶
type MockAutoConsumer struct {
// contains filtered or unexported fields
}
MockAutoConsumer implements sarama's Consumer interface for testing purposes. Before you can start consuming from this consumer, you have to register topic/partitions using ExpectConsumePartition, and set expectations on them.
func NewMockAutoConsumer ¶
func NewMockAutoConsumer(t *testing.T, config *sarama.Config) *MockAutoConsumer
NewMockAutoConsumer returns a new mock Consumer instance. The t argument should be the *testing.T instance of your test method. An error will be written to it if an expectation is violated. The config argument can be set to nil.
func (*MockAutoConsumer) Close ¶
func (c *MockAutoConsumer) Close() error
Close implements the Close method from the sarama.Consumer interface. It will close all registered PartitionConsumer instances.
func (*MockAutoConsumer) ConsumePartition ¶
func (c *MockAutoConsumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)
ConsumePartition implements the ConsumePartition method from the sarama.Consumer interface. Before you can start consuming a partition, you have to set expectations on it using ExpectConsumePartition. You can only consume a partition once per consumer.
func (*MockAutoConsumer) ExpectConsumePartition ¶
func (c *MockAutoConsumer) ExpectConsumePartition(topic string, partition int32, offset int64) *MockAutoPartitionConsumer
ExpectConsumePartition will register a topic/partition, so you can set expectations on it. The registered PartitionConsumer will be returned, so you can set expectations on it using method chaining. Once a topic/partition is registered, you are expected to start consuming it using ConsumePartition. If that doesn't happen, an error will be written to the error reporter once the mock consumer is closed. It will also expect that the
func (*MockAutoConsumer) HighWaterMarks ¶
func (c *MockAutoConsumer) HighWaterMarks() map[string]map[int32]int64
HighWaterMarks returns a map of high watermarks for each topic/partition
func (*MockAutoConsumer) Partitions ¶
func (c *MockAutoConsumer) Partitions(topic string) ([]int32, error)
Partitions returns the list of partitions for the given topic, as registered with SetTopicMetadata
func (*MockAutoConsumer) Pause ¶
func (c *MockAutoConsumer) Pause(topicPartitions map[string][]int32)
func (*MockAutoConsumer) PauseAll ¶
func (c *MockAutoConsumer) PauseAll()
func (*MockAutoConsumer) Resume ¶
func (c *MockAutoConsumer) Resume(topicPartitions map[string][]int32)
func (*MockAutoConsumer) ResumeAll ¶
func (c *MockAutoConsumer) ResumeAll()
func (*MockAutoConsumer) SetTopicMetadata ¶
func (c *MockAutoConsumer) SetTopicMetadata(metadata map[string][]int32)
SetTopicMetadata sets the cluster's topic/partition metadata, which will be returned by Topics() and Partitions().
func (*MockAutoConsumer) Topics ¶
func (c *MockAutoConsumer) Topics() ([]string, error)
Topics returns a list of topics, as registered with SetTopicMetadata
type MockAutoPartitionConsumer ¶
type MockAutoPartitionConsumer struct {
// contains filtered or unexported fields
}
MockAutoPartitionConsumer implements sarama's PartitionConsumer interface for testing purposes. It is returned by the mock Consumer's ConsumePartition method, but only if it is registered first using the Consumer's ExpectConsumePartition method. Before consuming the Errors and Messages channels, you should specify what values will be provided on these channels using YieldMessage and YieldError.
func (*MockAutoPartitionConsumer) AsyncClose ¶
func (pc *MockAutoPartitionConsumer) AsyncClose()
AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
func (*MockAutoPartitionConsumer) Close ¶
func (pc *MockAutoPartitionConsumer) Close() error
Close implements the Close method from the sarama.PartitionConsumer interface. It will verify whether the partition consumer was actually started.
func (*MockAutoPartitionConsumer) Errors ¶
func (pc *MockAutoPartitionConsumer) Errors() <-chan *sarama.ConsumerError
Errors implements the Errors method from the sarama.PartitionConsumer interface.
func (*MockAutoPartitionConsumer) ExpectErrorsDrainedOnClose ¶
func (pc *MockAutoPartitionConsumer) ExpectErrorsDrainedOnClose()
ExpectErrorsDrainedOnClose sets an expectation on the partition consumer that the errors channel will be fully drained when Close is called. If this expectation is not met, an error is reported to the error reporter.
func (*MockAutoPartitionConsumer) ExpectMessagesDrainedOnClose ¶
func (pc *MockAutoPartitionConsumer) ExpectMessagesDrainedOnClose()
ExpectMessagesDrainedOnClose sets an expectation on the partition consumer that the messages channel will be fully drained when Close is called. If this expectation is not met, an error is reported to the error reporter.
func (*MockAutoPartitionConsumer) HighWaterMarkOffset ¶
func (pc *MockAutoPartitionConsumer) HighWaterMarkOffset() int64
HighWaterMarkOffset returns the high watermark for the partition.
func (*MockAutoPartitionConsumer) IsPaused ¶
func (pc *MockAutoPartitionConsumer) IsPaused() bool
IsPaused indicates if this partition consumer is paused or not
func (*MockAutoPartitionConsumer) Messages ¶
func (pc *MockAutoPartitionConsumer) Messages() <-chan *sarama.ConsumerMessage
Messages implements the Messages method from the sarama.PartitionConsumer interface.
func (*MockAutoPartitionConsumer) Pause ¶
func (pc *MockAutoPartitionConsumer) Pause()
Pause suspends fetching from this partition. Future calls to the broker will not return any records from this partition until it has been resumed using Resume(). Note that this method does not affect partition subscription. In particular, it does not cause a group rebalance when automatic assignment is used.
func (*MockAutoPartitionConsumer) Resume ¶
func (pc *MockAutoPartitionConsumer) Resume()
Resume resumes this partition if it has been paused with Pause(). New calls to the broker will return records from this partition if there are any to be fetched. If the partition was not previously paused, this method is a no-op.
func (*MockAutoPartitionConsumer) YieldError ¶
func (pc *MockAutoPartitionConsumer) YieldError(err error)
YieldError will yield an error on the Errors channel of this partition consumer when it is consumed. By default, the mock consumer will not verify whether this error was consumed from the Errors channel, because there are legitimate reasons for this not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that the channel is empty on close.
func (*MockAutoPartitionConsumer) YieldMessage ¶
func (pc *MockAutoPartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage)
YieldMessage will yield a message on the Messages channel of this partition consumer when it is consumed. By default, the mock consumer will not verify whether this message was consumed from the Messages channel, because there are legitimate reasons for this not to happen. You can call ExpectMessagesDrainedOnClose so it will verify that the channel is empty on close.
type MockBroker ¶
type MockBroker struct {
// contains filtered or unexported fields
}
MockBroker is a mock of Broker interface
func NewMockBroker ¶
func NewMockBroker(ctrl *gomock.Controller) *MockBroker
NewMockBroker creates a new mock instance
func (*MockBroker) Connected ¶
func (m *MockBroker) Connected() (bool, error)
Connected mocks base method
func (*MockBroker) CreateTopics ¶
func (m *MockBroker) CreateTopics(arg0 *sarama.CreateTopicsRequest) (*sarama.CreateTopicsResponse, error)
CreateTopics mocks base method
func (*MockBroker) EXPECT ¶
func (m *MockBroker) EXPECT() *MockBrokerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
type MockBrokerMockRecorder ¶
type MockBrokerMockRecorder struct {
// contains filtered or unexported fields
}
MockBrokerMockRecorder is the mock recorder for MockBroker
func (*MockBrokerMockRecorder) Addr ¶
func (mr *MockBrokerMockRecorder) Addr() *gomock.Call
Addr indicates an expected call of Addr
func (*MockBrokerMockRecorder) Connected ¶
func (mr *MockBrokerMockRecorder) Connected() *gomock.Call
Connected indicates an expected call of Connected
func (*MockBrokerMockRecorder) CreateTopics ¶
func (mr *MockBrokerMockRecorder) CreateTopics(arg0 interface{}) *gomock.Call
CreateTopics indicates an expected call of CreateTopics
func (*MockBrokerMockRecorder) Open ¶
func (mr *MockBrokerMockRecorder) Open(arg0 interface{}) *gomock.Call
Open indicates an expected call of Open
type MockClient ¶
type MockClient struct {
// contains filtered or unexported fields
}
MockClient is a mock of Client interface
func NewMockClient ¶
func NewMockClient(ctrl *gomock.Controller) *MockClient
NewMockClient creates a new mock instance
func (*MockClient) Broker ¶
func (m *MockClient) Broker(arg0 int32) (*sarama.Broker, error)
Broker mocks base method
func (*MockClient) Brokers ¶
func (m *MockClient) Brokers() []*sarama.Broker
Brokers mocks base method
func (*MockClient) Controller ¶
func (m *MockClient) Controller() (*sarama.Broker, error)
Controller mocks base method
func (*MockClient) Coordinator ¶
func (m *MockClient) Coordinator(arg0 string) (*sarama.Broker, error)
Coordinator mocks base method
func (*MockClient) EXPECT ¶
func (m *MockClient) EXPECT() *MockClientMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockClient) InSyncReplicas ¶
func (m *MockClient) InSyncReplicas(arg0 string, arg1 int32) ([]int32, error)
InSyncReplicas mocks base method
func (*MockClient) InitProducerID ¶
func (m *MockClient) InitProducerID() (*sarama.InitProducerIDResponse, error)
InitProducerID mocks base method
func (*MockClient) OfflineReplicas ¶
func (m *MockClient) OfflineReplicas(arg0 string, arg1 int32) ([]int32, error)
OfflineReplicas mocks base method
func (*MockClient) Partitions ¶
func (m *MockClient) Partitions(arg0 string) ([]int32, error)
Partitions mocks base method
func (*MockClient) RefreshBrokers ¶
func (m *MockClient) RefreshBrokers(arg0 []string) error
RefreshBrokers mocks base method
func (*MockClient) RefreshController ¶
func (m *MockClient) RefreshController() (*sarama.Broker, error)
RefreshController mocks base method
func (*MockClient) RefreshCoordinator ¶
func (m *MockClient) RefreshCoordinator(arg0 string) error
RefreshCoordinator mocks base method
func (*MockClient) RefreshMetadata ¶
func (m *MockClient) RefreshMetadata(arg0 ...string) error
RefreshMetadata mocks base method
func (*MockClient) Replicas ¶
func (m *MockClient) Replicas(arg0 string, arg1 int32) ([]int32, error)
Replicas mocks base method
func (*MockClient) Topics ¶
func (m *MockClient) Topics() ([]string, error)
Topics mocks base method
func (*MockClient) WritablePartitions ¶
func (m *MockClient) WritablePartitions(arg0 string) ([]int32, error)
WritablePartitions mocks base method
type MockClientMockRecorder ¶
type MockClientMockRecorder struct {
// contains filtered or unexported fields
}
MockClientMockRecorder is the mock recorder for MockClient
func (*MockClientMockRecorder) Broker ¶
func (mr *MockClientMockRecorder) Broker(arg0 interface{}) *gomock.Call
Broker indicates an expected call of Broker
func (*MockClientMockRecorder) Brokers ¶
func (mr *MockClientMockRecorder) Brokers() *gomock.Call
Brokers indicates an expected call of Brokers
func (*MockClientMockRecorder) Close ¶
func (mr *MockClientMockRecorder) Close() *gomock.Call
Close indicates an expected call of Close
func (*MockClientMockRecorder) Closed ¶
func (mr *MockClientMockRecorder) Closed() *gomock.Call
Closed indicates an expected call of Closed
func (*MockClientMockRecorder) Config ¶
func (mr *MockClientMockRecorder) Config() *gomock.Call
Config indicates an expected call of Config
func (*MockClientMockRecorder) Controller ¶
func (mr *MockClientMockRecorder) Controller() *gomock.Call
Controller indicates an expected call of Controller
func (*MockClientMockRecorder) Coordinator ¶
func (mr *MockClientMockRecorder) Coordinator(arg0 interface{}) *gomock.Call
Coordinator indicates an expected call of Coordinator
func (*MockClientMockRecorder) GetOffset ¶
func (mr *MockClientMockRecorder) GetOffset(arg0, arg1, arg2 interface{}) *gomock.Call
GetOffset indicates an expected call of GetOffset
func (*MockClientMockRecorder) InSyncReplicas ¶
func (mr *MockClientMockRecorder) InSyncReplicas(arg0, arg1 interface{}) *gomock.Call
InSyncReplicas indicates an expected call of InSyncReplicas
func (*MockClientMockRecorder) InitProducerID ¶
func (mr *MockClientMockRecorder) InitProducerID() *gomock.Call
InitProducerID indicates an expected call of InitProducerID
func (*MockClientMockRecorder) Leader ¶
func (mr *MockClientMockRecorder) Leader(arg0, arg1 interface{}) *gomock.Call
Leader indicates an expected call of Leader
func (*MockClientMockRecorder) OfflineReplicas ¶
func (mr *MockClientMockRecorder) OfflineReplicas(arg0, arg1 interface{}) *gomock.Call
OfflineReplicas indicates an expected call of OfflineReplicas
func (*MockClientMockRecorder) Partitions ¶
func (mr *MockClientMockRecorder) Partitions(arg0 interface{}) *gomock.Call
Partitions indicates an expected call of Partitions
func (*MockClientMockRecorder) RefreshBrokers ¶
func (mr *MockClientMockRecorder) RefreshBrokers(arg0 interface{}) *gomock.Call
RefreshBrokers indicates an expected call of RefreshBrokers
func (*MockClientMockRecorder) RefreshController ¶
func (mr *MockClientMockRecorder) RefreshController() *gomock.Call
RefreshController indicates an expected call of RefreshController
func (*MockClientMockRecorder) RefreshCoordinator ¶
func (mr *MockClientMockRecorder) RefreshCoordinator(arg0 interface{}) *gomock.Call
RefreshCoordinator indicates an expected call of RefreshCoordinator
func (*MockClientMockRecorder) RefreshMetadata ¶
func (mr *MockClientMockRecorder) RefreshMetadata(arg0 ...interface{}) *gomock.Call
RefreshMetadata indicates an expected call of RefreshMetadata
func (*MockClientMockRecorder) Replicas ¶
func (mr *MockClientMockRecorder) Replicas(arg0, arg1 interface{}) *gomock.Call
Replicas indicates an expected call of Replicas
func (*MockClientMockRecorder) Topics ¶
func (mr *MockClientMockRecorder) Topics() *gomock.Call
Topics indicates an expected call of Topics
func (*MockClientMockRecorder) WritablePartitions ¶
func (mr *MockClientMockRecorder) WritablePartitions(arg0 interface{}) *gomock.Call
WritablePartitions indicates an expected call of WritablePartitions
type MockClusterAdmin ¶
type MockClusterAdmin struct {
// contains filtered or unexported fields
}
MockClusterAdmin is a mock of ClusterAdmin interface
func NewMockClusterAdmin ¶
func NewMockClusterAdmin(ctrl *gomock.Controller) *MockClusterAdmin
NewMockClusterAdmin creates a new mock instance
func (*MockClusterAdmin) AlterClientQuotas ¶
func (m *MockClusterAdmin) AlterClientQuotas(arg0 []sarama.QuotaEntityComponent, arg1 sarama.ClientQuotasOp, arg2 bool) error
AlterClientQuotas mocks base method
func (*MockClusterAdmin) AlterConfig ¶
func (m *MockClusterAdmin) AlterConfig(arg0 sarama.ConfigResourceType, arg1 string, arg2 map[string]*string, arg3 bool) error
AlterConfig mocks base method
func (*MockClusterAdmin) AlterPartitionReassignments ¶
func (m *MockClusterAdmin) AlterPartitionReassignments(arg0 string, arg1 [][]int32) error
AlterPartitionReassignments mocks base method
func (*MockClusterAdmin) Controller ¶
func (m *MockClusterAdmin) Controller() (*sarama.Broker, error)
Controller mocks base method
func (*MockClusterAdmin) CreateACLs ¶
func (m *MockClusterAdmin) CreateACLs(acls []*sarama.ResourceAcls) error
CreateACLs is a mock function creating access control lists (ACLs) which are bound to specific resources.
func (*MockClusterAdmin) CreatePartitions ¶
func (m *MockClusterAdmin) CreatePartitions(arg0 string, arg1 int32, arg2 [][]int32, arg3 bool) error
CreatePartitions mocks base method
func (*MockClusterAdmin) CreateTopic ¶
func (m *MockClusterAdmin) CreateTopic(arg0 string, arg1 *sarama.TopicDetail, arg2 bool) error
CreateTopic mocks base method
func (*MockClusterAdmin) DeleteACL ¶
func (m *MockClusterAdmin) DeleteACL(arg0 sarama.AclFilter, arg1 bool) ([]sarama.MatchingAcl, error)
DeleteACL mocks base method
func (*MockClusterAdmin) DeleteConsumerGroup ¶
func (m *MockClusterAdmin) DeleteConsumerGroup(arg0 string) error
DeleteConsumerGroup mocks base method
func (*MockClusterAdmin) DeleteConsumerGroupOffset ¶
func (m *MockClusterAdmin) DeleteConsumerGroupOffset(arg0, arg1 string, arg2 int32) error
DeleteConsumerGroupOffset mocks base method
func (*MockClusterAdmin) DeleteRecords ¶
func (m *MockClusterAdmin) DeleteRecords(arg0 string, arg1 map[int32]int64) error
DeleteRecords mocks base method
func (*MockClusterAdmin) DeleteTopic ¶
func (m *MockClusterAdmin) DeleteTopic(arg0 string) error
DeleteTopic mocks base method
func (*MockClusterAdmin) DeleteUserScramCredentials ¶
func (m *MockClusterAdmin) DeleteUserScramCredentials(arg0 []sarama.AlterUserScramCredentialsDelete) ([]*sarama.AlterUserScramCredentialsResult, error)
DeleteUserScramCredentials mocks base method
func (*MockClusterAdmin) DescribeClientQuotas ¶
func (m *MockClusterAdmin) DescribeClientQuotas(arg0 []sarama.QuotaFilterComponent, arg1 bool) ([]sarama.DescribeClientQuotasEntry, error)
DescribeClientQuotas mocks base method
func (*MockClusterAdmin) DescribeCluster ¶
func (m *MockClusterAdmin) DescribeCluster() ([]*sarama.Broker, int32, error)
DescribeCluster mocks base method
func (*MockClusterAdmin) DescribeConfig ¶
func (m *MockClusterAdmin) DescribeConfig(arg0 sarama.ConfigResource) ([]sarama.ConfigEntry, error)
DescribeConfig mocks base method
func (*MockClusterAdmin) DescribeConsumerGroups ¶
func (m *MockClusterAdmin) DescribeConsumerGroups(arg0 []string) ([]*sarama.GroupDescription, error)
DescribeConsumerGroups mocks base method
func (*MockClusterAdmin) DescribeLogDirs ¶
func (m *MockClusterAdmin) DescribeLogDirs(arg0 []int32) (map[int32][]sarama.DescribeLogDirsResponseDirMetadata, error)
DescribeLogDirs mocks base method
func (*MockClusterAdmin) DescribeTopics ¶
func (m *MockClusterAdmin) DescribeTopics(arg0 []string) ([]*sarama.TopicMetadata, error)
DescribeTopics mocks base method
func (*MockClusterAdmin) DescribeUserScramCredentials ¶
func (m *MockClusterAdmin) DescribeUserScramCredentials(arg0 []string) ([]*sarama.DescribeUserScramCredentialsResult, error)
DescribeUserScramCredentials mocks base method
func (*MockClusterAdmin) EXPECT ¶
func (m *MockClusterAdmin) EXPECT() *MockClusterAdminMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockClusterAdmin) IncrementalAlterConfig ¶
func (m *MockClusterAdmin) IncrementalAlterConfig(arg0 sarama.ConfigResourceType, arg1 string, arg2 map[string]sarama.IncrementalAlterConfigsEntry, arg3 bool) error
IncrementalAlterConfig mocks base method
func (*MockClusterAdmin) ListAcls ¶
func (m *MockClusterAdmin) ListAcls(arg0 sarama.AclFilter) ([]sarama.ResourceAcls, error)
ListAcls mocks base method
func (*MockClusterAdmin) ListConsumerGroupOffsets ¶
func (m *MockClusterAdmin) ListConsumerGroupOffsets(arg0 string, arg1 map[string][]int32) (*sarama.OffsetFetchResponse, error)
ListConsumerGroupOffsets mocks base method
func (*MockClusterAdmin) ListConsumerGroups ¶
func (m *MockClusterAdmin) ListConsumerGroups() (map[string]string, error)
ListConsumerGroups mocks base method
func (*MockClusterAdmin) ListPartitionReassignments ¶
func (m *MockClusterAdmin) ListPartitionReassignments(arg0 string, arg1 []int32) (map[string]map[int32]*sarama.PartitionReplicaReassignmentsStatus, error)
ListPartitionReassignments mocks base method
func (*MockClusterAdmin) ListTopics ¶
func (m *MockClusterAdmin) ListTopics() (map[string]sarama.TopicDetail, error)
ListTopics mocks base method
func (*MockClusterAdmin) UpsertUserScramCredentials ¶
func (m *MockClusterAdmin) UpsertUserScramCredentials(arg0 []sarama.AlterUserScramCredentialsUpsert) ([]*sarama.AlterUserScramCredentialsResult, error)
UpsertUserScramCredentials mocks base method
type MockClusterAdminMockRecorder ¶
type MockClusterAdminMockRecorder struct {
// contains filtered or unexported fields
}
MockClusterAdminMockRecorder is the mock recorder for MockClusterAdmin
func (*MockClusterAdminMockRecorder) AlterClientQuotas ¶
func (mr *MockClusterAdminMockRecorder) AlterClientQuotas(arg0, arg1, arg2 interface{}) *gomock.Call
AlterClientQuotas indicates an expected call of AlterClientQuotas
func (*MockClusterAdminMockRecorder) AlterConfig ¶
func (mr *MockClusterAdminMockRecorder) AlterConfig(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
AlterConfig indicates an expected call of AlterConfig
func (*MockClusterAdminMockRecorder) AlterPartitionReassignments ¶
func (mr *MockClusterAdminMockRecorder) AlterPartitionReassignments(arg0, arg1 interface{}) *gomock.Call
AlterPartitionReassignments indicates an expected call of AlterPartitionReassignments
func (*MockClusterAdminMockRecorder) Close ¶
func (mr *MockClusterAdminMockRecorder) Close() *gomock.Call
Close indicates an expected call of Close
func (*MockClusterAdminMockRecorder) Controller ¶
func (mr *MockClusterAdminMockRecorder) Controller() *gomock.Call
Controller indicates an expected call of Controller
func (*MockClusterAdminMockRecorder) CreateACL ¶
func (mr *MockClusterAdminMockRecorder) CreateACL(arg0, arg1 interface{}) *gomock.Call
CreateACL indicates an expected call of CreateACL
func (*MockClusterAdminMockRecorder) CreatePartitions ¶
func (mr *MockClusterAdminMockRecorder) CreatePartitions(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
CreatePartitions indicates an expected call of CreatePartitions
func (*MockClusterAdminMockRecorder) CreateTopic ¶
func (mr *MockClusterAdminMockRecorder) CreateTopic(arg0, arg1, arg2 interface{}) *gomock.Call
CreateTopic indicates an expected call of CreateTopic
func (*MockClusterAdminMockRecorder) DeleteACL ¶
func (mr *MockClusterAdminMockRecorder) DeleteACL(arg0, arg1 interface{}) *gomock.Call
DeleteACL indicates an expected call of DeleteACL
func (*MockClusterAdminMockRecorder) DeleteConsumerGroup ¶
func (mr *MockClusterAdminMockRecorder) DeleteConsumerGroup(arg0 interface{}) *gomock.Call
DeleteConsumerGroup indicates an expected call of DeleteConsumerGroup
func (*MockClusterAdminMockRecorder) DeleteConsumerGroupOffset ¶
func (mr *MockClusterAdminMockRecorder) DeleteConsumerGroupOffset(arg0, arg1, arg2 interface{}) *gomock.Call
DeleteConsumerGroupOffset indicates an expected call of DeleteConsumerGroupOffset
func (*MockClusterAdminMockRecorder) DeleteRecords ¶
func (mr *MockClusterAdminMockRecorder) DeleteRecords(arg0, arg1 interface{}) *gomock.Call
DeleteRecords indicates an expected call of DeleteRecords
func (*MockClusterAdminMockRecorder) DeleteTopic ¶
func (mr *MockClusterAdminMockRecorder) DeleteTopic(arg0 interface{}) *gomock.Call
DeleteTopic indicates an expected call of DeleteTopic
func (*MockClusterAdminMockRecorder) DeleteUserScramCredentials ¶
func (mr *MockClusterAdminMockRecorder) DeleteUserScramCredentials(arg0 interface{}) *gomock.Call
DeleteUserScramCredentials indicates an expected call of DeleteUserScramCredentials
func (*MockClusterAdminMockRecorder) DescribeClientQuotas ¶
func (mr *MockClusterAdminMockRecorder) DescribeClientQuotas(arg0, arg1 interface{}) *gomock.Call
DescribeClientQuotas indicates an expected call of DescribeClientQuotas
func (*MockClusterAdminMockRecorder) DescribeCluster ¶
func (mr *MockClusterAdminMockRecorder) DescribeCluster() *gomock.Call
DescribeCluster indicates an expected call of DescribeCluster
func (*MockClusterAdminMockRecorder) DescribeConfig ¶
func (mr *MockClusterAdminMockRecorder) DescribeConfig(arg0 interface{}) *gomock.Call
DescribeConfig indicates an expected call of DescribeConfig
func (*MockClusterAdminMockRecorder) DescribeConsumerGroups ¶
func (mr *MockClusterAdminMockRecorder) DescribeConsumerGroups(arg0 interface{}) *gomock.Call
DescribeConsumerGroups indicates an expected call of DescribeConsumerGroups
func (*MockClusterAdminMockRecorder) DescribeLogDirs ¶
func (mr *MockClusterAdminMockRecorder) DescribeLogDirs(arg0 interface{}) *gomock.Call
DescribeLogDirs indicates an expected call of DescribeLogDirs
func (*MockClusterAdminMockRecorder) DescribeTopics ¶
func (mr *MockClusterAdminMockRecorder) DescribeTopics(arg0 interface{}) *gomock.Call
DescribeTopics indicates an expected call of DescribeTopics
func (*MockClusterAdminMockRecorder) DescribeUserScramCredentials ¶
func (mr *MockClusterAdminMockRecorder) DescribeUserScramCredentials(arg0 interface{}) *gomock.Call
DescribeUserScramCredentials indicates an expected call of DescribeUserScramCredentials
func (*MockClusterAdminMockRecorder) IncrementalAlterConfig ¶
func (mr *MockClusterAdminMockRecorder) IncrementalAlterConfig(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
IncrementalAlterConfig indicates an expected call of IncrementalAlterConfig
func (*MockClusterAdminMockRecorder) ListAcls ¶
func (mr *MockClusterAdminMockRecorder) ListAcls(arg0 interface{}) *gomock.Call
ListAcls indicates an expected call of ListAcls
func (*MockClusterAdminMockRecorder) ListConsumerGroupOffsets ¶
func (mr *MockClusterAdminMockRecorder) ListConsumerGroupOffsets(arg0, arg1 interface{}) *gomock.Call
ListConsumerGroupOffsets indicates an expected call of ListConsumerGroupOffsets
func (*MockClusterAdminMockRecorder) ListConsumerGroups ¶
func (mr *MockClusterAdminMockRecorder) ListConsumerGroups() *gomock.Call
ListConsumerGroups indicates an expected call of ListConsumerGroups
func (*MockClusterAdminMockRecorder) ListPartitionReassignments ¶
func (mr *MockClusterAdminMockRecorder) ListPartitionReassignments(arg0, arg1 interface{}) *gomock.Call
ListPartitionReassignments indicates an expected call of ListPartitionReassignments
func (*MockClusterAdminMockRecorder) ListTopics ¶
func (mr *MockClusterAdminMockRecorder) ListTopics() *gomock.Call
ListTopics indicates an expected call of ListTopics
func (*MockClusterAdminMockRecorder) UpsertUserScramCredentials ¶
func (mr *MockClusterAdminMockRecorder) UpsertUserScramCredentials(arg0 interface{}) *gomock.Call
UpsertUserScramCredentials indicates an expected call of UpsertUserScramCredentials
type MockConsumerGroup ¶
type MockConsumerGroup struct {
// contains filtered or unexported fields
}
MockConsumerGroup mocks the consumergroup
func NewMockConsumerGroup ¶
func NewMockConsumerGroup(t *testing.T) *MockConsumerGroup
NewMockConsumerGroup creates a new consumer group
func (*MockConsumerGroup) Close ¶
func (cg *MockConsumerGroup) Close() error
Close closes the consumergroup
func (*MockConsumerGroup) Consume ¶
func (cg *MockConsumerGroup) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error
Consume starts consuming from the consumergroup
func (*MockConsumerGroup) Errors ¶
func (cg *MockConsumerGroup) Errors() <-chan error
Errors returns the errors channel
func (*MockConsumerGroup) FailOnConsume ¶
func (cg *MockConsumerGroup) FailOnConsume(err error)
FailOnConsume marks the consumer to fail on consume
func (*MockConsumerGroup) Pause ¶
func (cg *MockConsumerGroup) Pause(partitions map[string][]int32)
func (*MockConsumerGroup) PauseAll ¶
func (cg *MockConsumerGroup) PauseAll()
func (*MockConsumerGroup) Resume ¶
func (cg *MockConsumerGroup) Resume(partitions map[string][]int32)
func (*MockConsumerGroup) ResumeAll ¶
func (cg *MockConsumerGroup) ResumeAll()
func (*MockConsumerGroup) SendError ¶
func (cg *MockConsumerGroup) SendError(err error)
SendError sends an error to the consumergroup
func (*MockConsumerGroup) SendMessage ¶
func (cg *MockConsumerGroup) SendMessage(message *sarama.ConsumerMessage) <-chan struct{}
SendMessage sends a message to the consumergroup and returns a channel that will be closed when the message has been committed by the group
func (*MockConsumerGroup) SendMessageWait ¶
func (cg *MockConsumerGroup) SendMessageWait(message *sarama.ConsumerMessage)
SendMessageWait sends a message to the consumergroup and waits until the message has been committed
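The difference between SendMessage and SendMessageWait comes down to who waits on the "committed" channel. The following is a minimal standard-library sketch of that pattern only; `message`, `fakeGroup`, `send`, `sendWait`, and `count` are hypothetical stand-ins and not part of goka or sarama.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for *sarama.ConsumerMessage.
type message struct{ Key, Value string }

// fakeGroup is a hypothetical, heavily simplified consumer group.
type fakeGroup struct {
	mu        sync.Mutex
	committed []string
}

// send hands the message to a goroutine and returns a channel that is
// closed once the message counts as committed in this sketch.
func (g *fakeGroup) send(msg message) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		g.mu.Lock()
		g.committed = append(g.committed, msg.Key)
		g.mu.Unlock()
		close(done) // signals the commit to every waiter
	}()
	return done
}

// sendWait is the blocking variant: it returns only after the commit.
func (g *fakeGroup) sendWait(msg message) { <-g.send(msg) }

// count returns how many messages have been committed so far.
func (g *fakeGroup) count() int {
	g.mu.Lock()
	defer g.mu.Unlock()
	return len(g.committed)
}

func main() {
	g := &fakeGroup{}
	done := g.send(message{Key: "user-1", Value: "click"})
	<-done // caller decides when to wait
	g.sendWait(message{Key: "user-2", Value: "view"})
	fmt.Println("committed:", g.count())
}
```

Returning the channel lets a test send several messages first and wait on all of them later, while the waiting variant keeps simple tests sequential.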
type MockConsumerGroupClaim ¶
type MockConsumerGroupClaim struct {
// contains filtered or unexported fields
}
MockConsumerGroupClaim mocks the consumergroupclaim
func NewMockConsumerGroupClaim ¶
func NewMockConsumerGroupClaim(topic string, partition int32) *MockConsumerGroupClaim
NewMockConsumerGroupClaim creates a new MockConsumerGroupClaim
func (*MockConsumerGroupClaim) HighWaterMarkOffset ¶
func (cgc *MockConsumerGroupClaim) HighWaterMarkOffset() int64
HighWaterMarkOffset returns the hwm offset
func (*MockConsumerGroupClaim) InitialOffset ¶
func (cgc *MockConsumerGroupClaim) InitialOffset() int64
InitialOffset returns the initial offset
func (*MockConsumerGroupClaim) Messages ¶
func (cgc *MockConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage
Messages returns the message channel that must be
func (*MockConsumerGroupClaim) Partition ¶
func (cgc *MockConsumerGroupClaim) Partition() int32
Partition returns the partition
func (*MockConsumerGroupClaim) Topic ¶
func (cgc *MockConsumerGroupClaim) Topic() string
Topic returns the current topic of the claim
type MockConsumerGroupSession ¶
type MockConsumerGroupSession struct {
// contains filtered or unexported fields
}
MockConsumerGroupSession mocks the consumer group session used for testing
func (*MockConsumerGroupSession) Claims ¶
func (cgs *MockConsumerGroupSession) Claims() map[string][]int32
Claims returns the partitions assigned in the group session for each topic
func (*MockConsumerGroupSession) Commit ¶
func (cgs *MockConsumerGroupSession) Commit()
Commit the offset to the backend
func (*MockConsumerGroupSession) Context ¶
func (cgs *MockConsumerGroupSession) Context() context.Context
Context returns the consumer group's context
func (*MockConsumerGroupSession) GenerationID ¶
func (cgs *MockConsumerGroupSession) GenerationID() int32
GenerationID returns the generation ID of the group consumer
func (*MockConsumerGroupSession) MarkMessage ¶
func (cgs *MockConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string)
MarkMessage marks the passed message as consumed
func (*MockConsumerGroupSession) MarkOffset ¶
func (cgs *MockConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string)
MarkOffset marks the passed offset consumed in topic/partition
func (*MockConsumerGroupSession) MemberID ¶
func (cgs *MockConsumerGroupSession) MemberID() string
MemberID returns the member ID. TODO: clarify what that actually means and whether we need to mock that somehow
func (*MockConsumerGroupSession) ResetOffset ¶
func (cgs *MockConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string)
ResetOffset resets the offset to be consumed from
func (*MockConsumerGroupSession) SendMessage ¶
func (cgs *MockConsumerGroupSession) SendMessage(msg *sarama.ConsumerMessage)
SendMessage sends a message to the consumer
type MockProducer ¶
type MockProducer struct {
// contains filtered or unexported fields
}
MockProducer is a mock of Producer interface
func NewMockProducer ¶
func NewMockProducer(ctrl *gomock.Controller) *MockProducer
NewMockProducer creates a new mock instance
func (*MockProducer) EXPECT ¶
func (m *MockProducer) EXPECT() *MockProducerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockProducer) Emit ¶
func (m *MockProducer) Emit(arg0, arg1 string, arg2 []byte) *Promise
Emit mocks base method
func (*MockProducer) EmitWithHeaders ¶
func (m *MockProducer) EmitWithHeaders(arg0, arg1 string, arg2 []byte, arg3 Headers) *Promise
EmitWithHeaders mocks base method
type MockProducerMockRecorder ¶
type MockProducerMockRecorder struct {
// contains filtered or unexported fields
}
MockProducerMockRecorder is the mock recorder for MockProducer
func (*MockProducerMockRecorder) Close ¶
func (mr *MockProducerMockRecorder) Close() *gomock.Call
Close indicates an expected call of Close
func (*MockProducerMockRecorder) Emit ¶
func (mr *MockProducerMockRecorder) Emit(arg0, arg1, arg2 interface{}) *gomock.Call
Emit indicates an expected call of Emit
func (*MockProducerMockRecorder) EmitWithHeaders ¶
func (mr *MockProducerMockRecorder) EmitWithHeaders(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
EmitWithHeaders indicates an expected call of EmitWithHeaders
type MockStorage ¶
type MockStorage struct {
// contains filtered or unexported fields
}
MockStorage is a mock of Storage interface
func NewMockStorage ¶
func NewMockStorage(ctrl *gomock.Controller) *MockStorage
NewMockStorage creates a new mock instance
func (*MockStorage) Delete ¶
func (m *MockStorage) Delete(arg0 string) error
Delete mocks base method
func (*MockStorage) EXPECT ¶
func (m *MockStorage) EXPECT() *MockStorageMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockStorage) Get ¶
func (m *MockStorage) Get(arg0 string) ([]byte, error)
Get mocks base method
func (*MockStorage) GetOffset ¶
func (m *MockStorage) GetOffset(arg0 int64) (int64, error)
GetOffset mocks base method
func (*MockStorage) Has ¶
func (m *MockStorage) Has(arg0 string) (bool, error)
Has mocks base method
func (*MockStorage) Iterator ¶
func (m *MockStorage) Iterator() (storage.Iterator, error)
Iterator mocks base method
func (*MockStorage) IteratorWithRange ¶
func (m *MockStorage) IteratorWithRange(arg0, arg1 []byte) (storage.Iterator, error)
IteratorWithRange mocks base method
func (*MockStorage) MarkRecovered ¶
func (m *MockStorage) MarkRecovered() error
MarkRecovered mocks base method
func (*MockStorage) Set ¶
func (m *MockStorage) Set(arg0 string, arg1 []byte) error
Set mocks base method
func (*MockStorage) SetOffset ¶
func (m *MockStorage) SetOffset(arg0 int64) error
SetOffset mocks base method
type MockStorageMockRecorder ¶
type MockStorageMockRecorder struct {
// contains filtered or unexported fields
}
MockStorageMockRecorder is the mock recorder for MockStorage
func (*MockStorageMockRecorder) Close ¶
func (mr *MockStorageMockRecorder) Close() *gomock.Call
Close indicates an expected call of Close
func (*MockStorageMockRecorder) Delete ¶
func (mr *MockStorageMockRecorder) Delete(arg0 interface{}) *gomock.Call
Delete indicates an expected call of Delete
func (*MockStorageMockRecorder) Get ¶
func (mr *MockStorageMockRecorder) Get(arg0 interface{}) *gomock.Call
Get indicates an expected call of Get
func (*MockStorageMockRecorder) GetOffset ¶
func (mr *MockStorageMockRecorder) GetOffset(arg0 interface{}) *gomock.Call
GetOffset indicates an expected call of GetOffset
func (*MockStorageMockRecorder) Has ¶
func (mr *MockStorageMockRecorder) Has(arg0 interface{}) *gomock.Call
Has indicates an expected call of Has
func (*MockStorageMockRecorder) Iterator ¶
func (mr *MockStorageMockRecorder) Iterator() *gomock.Call
Iterator indicates an expected call of Iterator
func (*MockStorageMockRecorder) IteratorWithRange ¶
func (mr *MockStorageMockRecorder) IteratorWithRange(arg0, arg1 interface{}) *gomock.Call
IteratorWithRange indicates an expected call of IteratorWithRange
func (*MockStorageMockRecorder) MarkRecovered ¶
func (mr *MockStorageMockRecorder) MarkRecovered() *gomock.Call
MarkRecovered indicates an expected call of MarkRecovered
func (*MockStorageMockRecorder) Open ¶
func (mr *MockStorageMockRecorder) Open() *gomock.Call
Open indicates an expected call of Open
func (*MockStorageMockRecorder) Set ¶
func (mr *MockStorageMockRecorder) Set(arg0, arg1 interface{}) *gomock.Call
Set indicates an expected call of Set
func (*MockStorageMockRecorder) SetOffset ¶
func (mr *MockStorageMockRecorder) SetOffset(arg0 interface{}) *gomock.Call
SetOffset indicates an expected call of SetOffset
type MockTopicManager ¶
type MockTopicManager struct {
// contains filtered or unexported fields
}
MockTopicManager is a mock of TopicManager interface
func NewMockTopicManager ¶
func NewMockTopicManager(ctrl *gomock.Controller) *MockTopicManager
NewMockTopicManager creates a new mock instance
func (*MockTopicManager) EXPECT ¶
func (m *MockTopicManager) EXPECT() *MockTopicManagerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockTopicManager) EnsureStreamExists ¶
func (m *MockTopicManager) EnsureStreamExists(arg0 string, arg1 int) error
EnsureStreamExists mocks base method
func (*MockTopicManager) EnsureTableExists ¶
func (m *MockTopicManager) EnsureTableExists(arg0 string, arg1 int) error
EnsureTableExists mocks base method
func (*MockTopicManager) EnsureTopicExists ¶
func (m *MockTopicManager) EnsureTopicExists(arg0 string, arg1, arg2 int, arg3 map[string]string) error
EnsureTopicExists mocks base method
func (*MockTopicManager) Partitions ¶
func (m *MockTopicManager) Partitions(arg0 string) ([]int32, error)
Partitions mocks base method
type MockTopicManagerMockRecorder ¶
type MockTopicManagerMockRecorder struct {
// contains filtered or unexported fields
}
MockTopicManagerMockRecorder is the mock recorder for MockTopicManager
func (*MockTopicManagerMockRecorder) Close ¶
func (mr *MockTopicManagerMockRecorder) Close() *gomock.Call
Close indicates an expected call of Close
func (*MockTopicManagerMockRecorder) EnsureStreamExists ¶
func (mr *MockTopicManagerMockRecorder) EnsureStreamExists(arg0, arg1 interface{}) *gomock.Call
EnsureStreamExists indicates an expected call of EnsureStreamExists
func (*MockTopicManagerMockRecorder) EnsureTableExists ¶
func (mr *MockTopicManagerMockRecorder) EnsureTableExists(arg0, arg1 interface{}) *gomock.Call
EnsureTableExists indicates an expected call of EnsureTableExists
func (*MockTopicManagerMockRecorder) EnsureTopicExists ¶
func (mr *MockTopicManagerMockRecorder) EnsureTopicExists(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
EnsureTopicExists indicates an expected call of EnsureTopicExists
func (*MockTopicManagerMockRecorder) GetOffset ¶
func (mr *MockTopicManagerMockRecorder) GetOffset(arg0, arg1, arg2 interface{}) *gomock.Call
GetOffset indicates an expected call of GetOffset
func (*MockTopicManagerMockRecorder) Partitions ¶
func (mr *MockTopicManagerMockRecorder) Partitions(arg0 interface{}) *gomock.Call
Partitions indicates an expected call of Partitions
type NilHandling ¶
type NilHandling int
NilHandling defines how nil messages should be handled by the processor.
const (
	// NilIgnore drops any message with nil value.
	NilIgnore NilHandling = 0 + iota
	// NilProcess passes the nil value to ProcessCallback.
	NilProcess
	// NilDecode passes the nil value to decoder before calling ProcessCallback.
	NilDecode
)
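The three modes differ only in what happens when a message value is nil. The sketch below mirrors them with local stand-ins to make the branching concrete; `nilHandling`, `dispatch`, and the `decode`/`callback` hooks are hypothetical and do not reflect how goka implements this internally.

```go
package main

import "fmt"

// nilHandling mirrors the three documented modes. It is a local
// stand-in for illustration, not the goka type.
type nilHandling int

const (
	nilIgnore  nilHandling = iota // drop nil values
	nilProcess                    // pass nil straight to the callback
	nilDecode                     // run nil through the decoder first
)

// dispatch shows what a processor-like loop could do with a raw value.
// decode and callback are hypothetical hooks supplied by the caller.
func dispatch(mode nilHandling, raw []byte, decode func([]byte) interface{}, callback func(interface{})) {
	if raw == nil {
		switch mode {
		case nilIgnore:
			return // message is dropped, callback never runs
		case nilProcess:
			callback(nil) // decoder is skipped
			return
		}
		// nilDecode continues on the normal path below.
	}
	callback(decode(raw))
}

func main() {
	decode := func(raw []byte) interface{} { return fmt.Sprintf("decoded %d bytes", len(raw)) }
	show := func(v interface{}) { fmt.Printf("callback got %v\n", v) }
	for _, mode := range []nilHandling{nilIgnore, nilProcess, nilDecode} {
		dispatch(mode, nil, decode, show)
	}
}
```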
type OutputStats ¶
OutputStats represents the number of messages and the number of bytes emitted into a stream or table since the process started.
type PPRunMode ¶
type PPRunMode int
PPRunMode configures how the partition processor participates as part of the processor
type PartitionProcStats ¶
type PartitionProcStats struct {
	Now time.Time
	TableStats *TableStats
	Joined map[string]*TableStats
	Input map[string]*InputStats
	Output map[string]*OutputStats
}
PartitionProcStats represents metrics and measurements of a partition processor
type PartitionProcessor ¶
type PartitionProcessor struct {
// contains filtered or unexported fields
}
PartitionProcessor handles message processing of one partition by serializing messages from different input topics. It also handles joined tables as well as lookup views (managed by `Processor`).
func (*PartitionProcessor) Recovered ¶
func (pp *PartitionProcessor) Recovered() bool
Recovered returns whether the processor is running (i.e. all joins, lookups and the table are recovered and it is consuming messages)
func (*PartitionProcessor) Start ¶
func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error
Start initializes the partition processor:
* recover the table
* recover all join tables
* run the join-tables in catchup mode
* start the processor processing loop to receive messages
This method takes two contexts, as it does two distinct phases:
- setting up the partition (loading table, joins etc.), after which it returns. This needs a separate context to allow terminating the setup phase
- starting the message-processing-loop of the actual processor. This will keep running after `Start` returns, so it uses the second context.
func (*PartitionProcessor) Stop ¶
func (pp *PartitionProcessor) Stop() error
Stop stops the partition processor
func (*PartitionProcessor) VisitValues ¶
func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta interface{}, visited *int64) error
VisitValues iterates over all values in the table and calls the "visit"-callback for the passed name. The optional meta parameter is forwarded unchanged to the visitor function. The function returns after all items of the table have been added to the channel.
type PartitionStatus ¶
type PartitionStatus int
PartitionStatus is the status of the partition of a table (group table or joined table).
const (
	// PartitionStopped indicates the partition stopped and should not be used anymore.
	PartitionStopped PartitionStatus = iota
	// PartitionInitializing indicates that the underlying storage is initializing (e.g. opening leveldb files),
	// and has not actually started working yet.
	PartitionInitializing
	// PartitionConnecting indicates the partition trying to (re-)connect to Kafka
	PartitionConnecting
	// PartitionRecovering indicates the partition is recovering and the storage
	// is writing updates in bulk-mode (if the storage implementation supports it).
	PartitionRecovering
	// PartitionPreparing indicates the end of the bulk-mode. Depending on the storage
	// implementation, the Preparing phase may take long because the storage compacts its logs.
	PartitionPreparing
	// PartitionRunning indicates the partition is recovered and processing updates
	// in normal operation.
	PartitionRunning
)
type PartitionTable ¶
type PartitionTable struct {
// contains filtered or unexported fields
}
PartitionTable manages the usage of a table for one partition. It sets up and recovers (catches up) the table contents from Kafka and allows updates via the Get/Set/Delete accessors.
func (*PartitionTable) CatchupForever ¶
func (p *PartitionTable) CatchupForever(ctx context.Context, restartOnError bool) error
CatchupForever keeps the partition table catching up until the context is cancelled. Option restartOnError allows the view to stay open/intact even in case of consumer errors
func (*PartitionTable) Close ¶
func (p *PartitionTable) Close() error
Close closes the partition table
func (*PartitionTable) CurrentState ¶
func (p *PartitionTable) CurrentState() PartitionStatus
CurrentState returns the partition's current status
func (*PartitionTable) Delete ¶
func (p *PartitionTable) Delete(key string) error
Delete removes the passed key from the partition table by deleting from the underlying storage
func (*PartitionTable) Get ¶
func (p *PartitionTable) Get(key string) ([]byte, error)
Get returns the value for passed key
func (*PartitionTable) GetOffset ¶
func (p *PartitionTable) GetOffset(defValue int64) (int64, error)
GetOffset returns the magic offset value from storage
func (*PartitionTable) Has ¶
func (p *PartitionTable) Has(key string) (bool, error)
Has returns whether the storage contains passed key
func (*PartitionTable) IsRecovered ¶
func (p *PartitionTable) IsRecovered() bool
IsRecovered returns whether the partition table is recovered
func (*PartitionTable) Iterator ¶
func (p *PartitionTable) Iterator() (storage.Iterator, error)
Iterator returns an iterator on the table's storage. If the partition_table is not in a running state, it will return an error to prevent serving incomplete data
func (*PartitionTable) IteratorWithRange ¶
IteratorWithRange returns an iterator on the table's storage for passed range. If the partition_table is not in a running state, it will return an error to prevent serving incomplete data
func (*PartitionTable) RunStatsLoop ¶
func (p *PartitionTable) RunStatsLoop(ctx context.Context)
RunStatsLoop starts the handler for stats requests. This loop runs detached from the recover/catchup mechanism so clients can always request stats even if the partition table is not running (like a processor table after it's recovered).
func (*PartitionTable) Set ¶
func (p *PartitionTable) Set(key string, value []byte) error
Set sets a key-value pair in the partition table by modifying the underlying storage
func (*PartitionTable) SetOffset ¶
func (p *PartitionTable) SetOffset(value int64) error
SetOffset sets the magic offset value in storage
func (*PartitionTable) SetupAndRecover ¶
func (p *PartitionTable) SetupAndRecover(ctx context.Context, restartOnError bool) error
SetupAndRecover sets up the partition storage and recovers to HWM
func (*PartitionTable) TrackMessageWrite ¶
func (p *PartitionTable) TrackMessageWrite(ctx context.Context, length int)
TrackMessageWrite updates the write stats to passed length
func (*PartitionTable) WaitRecovered ¶
func (p *PartitionTable) WaitRecovered() <-chan struct{}
WaitRecovered returns a channel that closes when the partition table enters state `PartitionRunning`
type ProcessCallback ¶
type ProcessCallback func(ctx Context, msg interface{})
ProcessCallback function is called for every message received by the processor.
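To make the callback shape concrete, here is a counting callback written against a deliberately tiny context. This is a sketch under stated assumptions: `rowContext` and `memContext` are hypothetical stand-ins for the real callback context, backed by an in-memory map instead of a group table.

```go
package main

import "fmt"

// rowContext is a hypothetical, minimal stand-in for the callback
// context: it exposes the message key and the row stored under it.
type rowContext interface {
	Key() string
	Value() interface{}
	SetValue(v interface{})
}

// memContext backs rowContext with an in-memory map.
type memContext struct {
	key   string
	table map[string]interface{}
}

func (c *memContext) Key() string            { return c.key }
func (c *memContext) Value() interface{}     { return c.table[c.key] }
func (c *memContext) SetValue(v interface{}) { c.table[c.key] = v }

// countCallback has the same shape as a process callback: it receives
// a context and the message, and updates the row for the message key.
func countCallback(ctx rowContext, msg interface{}) {
	count := 0
	if v, ok := ctx.Value().(int); ok {
		count = v // row already exists
	}
	ctx.SetValue(count + 1)
}

func main() {
	table := map[string]interface{}{}
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		countCallback(&memContext{key: key, table: table}, "click")
	}
	fmt.Println(table["user-1"], table["user-2"])
}
```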
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor is a set of stateful callback functions that, on the arrival of messages, modify the content of a table (the group table) and emit messages into other topics. Messages as well as rows in the group table are key-value pairs. A group is composed by multiple processor instances.
func NewProcessor ¶
func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption) (*Processor, error)
NewProcessor creates a processor instance in a group given the address of Kafka brokers, a group graph describing the consumer group name and its subscriptions (topics, codecs, and callbacks), and a series of options.
func (*Processor) Cleanup ¶
func (g *Processor) Cleanup(session sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited but before the offsets are committed for the very last time.
func (*Processor) ConsumeClaim ¶
func (g *Processor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.
func (*Processor) Get ¶
Get returns a read-only copy of a value from the group table if the respective partition is owned by the processor instance. Get can be called by multiple goroutines concurrently. Get can be only used with stateful processors (ie, when group table is enabled) and after Recovered returns true.
func (*Processor) Graph ¶
func (g *Processor) Graph() *GroupGraph
Graph returns the group graph of the processor.
func (*Processor) Recovered ¶
Recovered returns whether the processor is running, i.e. if the processor has recovered all lookups/joins/tables and is running
func (*Processor) Run ¶
Run starts the processor using passed context. The processor stops in case of errors or if the context is cancelled
func (*Processor) Setup ¶
func (g *Processor) Setup(session sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim.
func (*Processor) StateReader ¶
func (g *Processor) StateReader() StateReader
StateReader returns a read-only interface of the processor's state.
func (*Processor) Stats ¶
func (g *Processor) Stats() *ProcessorStats
Stats returns the aggregated stats for the processor including all partitions, tables, lookups and joins
func (*Processor) StatsWithContext ¶
func (g *Processor) StatsWithContext(ctx context.Context) *ProcessorStats
StatsWithContext returns stats for the processor, see #Processor.Stats()
func (*Processor) Stop ¶
func (g *Processor) Stop()
Stop stops the processor. This is semantically equivalent to cancelling the Context that was passed to Processor.Run(..). This method returns immediately; errors that occur while running are returned from Processor.Run(..)
func (*Processor) VisitAllWithStats ¶
func (g *Processor) VisitAllWithStats(ctx context.Context, name string, meta interface{}) (int64, error)
VisitAllWithStats visits all keys in parallel by passing the visit request to all partitions. The optional argument "meta" will be forwarded to the visit-function of each key of the table. The function returns when
* all values have been visited or
* the context is cancelled or
* the processor rebalances/shuts down
Return parameters:
* number of visited items
* error
func (*Processor) WaitForReady ¶
func (g *Processor) WaitForReady()
WaitForReady waits until the processor is ready to consume messages (or is actually consuming messages) i.e., it is done catching up all partition tables, joins and lookup tables
func (*Processor) WaitForReadyContext ¶
WaitForReadyContext is the context-aware variant of WaitForReady. It waits until the processor is ready or until the context is canceled. If the return was caused by the context, it returns the error reported by the context.
type ProcessorOption ¶
type ProcessorOption func(*poptions, *GroupGraph)
ProcessorOption defines a configuration option to be used when creating a processor.
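ProcessorOption follows the functional-options pattern: each `With...` function returns a closure that mutates an options struct. The sketch below reproduces that pattern in isolation; `options`, `graph`, `option`, `withClientID`, `withChannelSize`, and `newOptions` are hypothetical and the default values are invented for the example.

```go
package main

import "fmt"

// options and graph are hypothetical stand-ins for the unexported
// option struct and the group graph.
type options struct {
	clientID    string
	channelSize int
}

type graph struct{ group string }

// option has the same two-argument shape as the documented type.
type option func(*options, *graph)

func withClientID(id string) option {
	return func(o *options, _ *graph) { o.clientID = id }
}

func withChannelSize(size int) option {
	return func(o *options, _ *graph) { o.channelSize = size }
}

// newOptions starts from defaults (invented for this sketch) and
// applies the given options in order, so later options win.
func newOptions(g *graph, opts ...option) *options {
	o := &options{clientID: "default-" + g.group, channelSize: 10}
	for _, opt := range opts {
		opt(o, g)
	}
	return o
}

func main() {
	g := &graph{group: "clicks"}
	fmt.Printf("%+v\n", *newOptions(g))
	fmt.Printf("%+v\n", *newOptions(g, withClientID("svc-a"), withChannelSize(0)))
}
```

Passing the graph to every option lets an option derive its value from the group definition, which a single-argument option could not do.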
func WithBackoffBuilder ¶
func WithBackoffBuilder(bb BackoffBuilder) ProcessorOption
WithBackoffBuilder replaces the default backoff.
func WithBackoffResetTimeout ¶
func WithBackoffResetTimeout(duration time.Duration) ProcessorOption
WithBackoffResetTimeout defines the timeout when the backoff will be reset.
func WithClientID ¶
func WithClientID(clientID string) ProcessorOption
WithClientID defines the client ID used to identify with Kafka.
func WithConsumerGroupBuilder ¶
func WithConsumerGroupBuilder(cgb ConsumerGroupBuilder) ProcessorOption
WithConsumerGroupBuilder replaces the default consumer group builder
func WithConsumerSaramaBuilder ¶
func WithConsumerSaramaBuilder(cgb SaramaConsumerBuilder) ProcessorOption
WithConsumerSaramaBuilder replaces the default Sarama consumer builder.
func WithGroupGraphHook ¶
func WithGroupGraphHook(hook func(gg *GroupGraph)) ProcessorOption
WithGroupGraphHook allows a function to obtain the group graph when a processor is started.
func WithHasher ¶
func WithHasher(hasher func() hash.Hash32) ProcessorOption
WithHasher sets the hash function that assigns keys to partitions.
func WithHotStandby ¶
func WithHotStandby() ProcessorOption
WithHotStandby configures the processor to keep partitions up to date which are not part of the current generation's assignment. This allows fast processor failover since all partitions are hot in other processor instances, but it requires more resources (in particular network and disk). If this option is used, the option `WithRecoverAhead` should also be added to avoid unnecessary delays.
func WithLogger ¶
func WithLogger(l Logger) ProcessorOption
WithLogger sets the logger the processor should use. By default, processors use the standard library logger.
func WithNilHandling ¶
func WithNilHandling(nh NilHandling) ProcessorOption
WithNilHandling configures how the processor should handle messages with nil value. By default the processor ignores nil messages.
func WithPartitionChannelSize ¶
func WithPartitionChannelSize(size int) ProcessorOption
WithPartitionChannelSize replaces the default partition channel size. This is mostly used for testing by setting it to 0 to have synchronous behavior of goka.
func WithProducerBuilder ¶
func WithProducerBuilder(pb ProducerBuilder) ProcessorOption
WithProducerBuilder replaces the default producer builder.
func WithProducerDefaultHeaders ¶
func WithProducerDefaultHeaders(hdr Headers) ProcessorOption
WithProducerDefaultHeaders configures the producer with default headers which are included with every emit.
func WithRebalanceCallback ¶
func WithRebalanceCallback(cb RebalanceCallback) ProcessorOption
WithRebalanceCallback sets the callback for when a new partition assignment is received. By default, this is an empty function.
func WithRecoverAhead ¶
func WithRecoverAhead() ProcessorOption
WithRecoverAhead configures the processor to recover joins and the processor table ahead of joining the group. This reduces the processing delay that occurs when adding new instances to groups with high-volume-joins/tables. If the processor does not use joins or a table, it does not have any effect.
func WithStorageBuilder ¶
func WithStorageBuilder(sb storage.Builder) ProcessorOption
WithStorageBuilder defines a builder for the storage of each partition.
func WithTester ¶
func WithTester(t Tester) ProcessorOption
WithTester configures all external connections of a processor, ie, storage, consumer and producer
func WithTopicManagerBuilder ¶
func WithTopicManagerBuilder(tmb TopicManagerBuilder) ProcessorOption
WithTopicManagerBuilder replaces the default topic manager builder.
func WithUpdateCallback ¶
func WithUpdateCallback(cb UpdateCallback) ProcessorOption
WithUpdateCallback defines the callback called upon recovering a message from the log.
type ProcessorStats ¶
type ProcessorStats struct {
	Group  map[int32]*PartitionProcStats
	Lookup map[string]*ViewStats
}
ProcessorStats represents the metrics of all partitions of the processor, including its group, joined tables and lookup tables.
type Producer ¶
type Producer interface {
	// Emit sends a message to topic.
	Emit(topic string, key string, value []byte) *Promise
	EmitWithHeaders(topic string, key string, value []byte, headers Headers) *Promise
	Close() error
}
Producer abstracts the Kafka producer.
type ProducerBuilder ¶
type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error)
ProducerBuilder creates a Kafka producer.
func ProducerBuilderWithConfig ¶
func ProducerBuilderWithConfig(config *sarama.Config) ProducerBuilder
ProducerBuilderWithConfig creates a Kafka producer using the Sarama library.
type Promise ¶
Promise as in https://en.wikipedia.org/wiki/Futures_and_promises
func (*Promise) ThenWithMessage ¶
func (p *Promise) ThenWithMessage(callback func(msg *sarama.ProducerMessage, err error)) *Promise
ThenWithMessage chains a callback to the Promise
type PromiseFinisher ¶
type PromiseFinisher func(msg *sarama.ProducerMessage, err error) *Promise
PromiseFinisher finishes a promise
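The idea of chaining callbacks to a promise and finishing it later can be sketched with the standard library alone. The `promise` type below is a hypothetical, simplified stand-in: it carries only an error, whereas the callbacks documented above also receive the produced message.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// promise is a hypothetical, minimal promise that resolves with an error.
type promise struct {
	mu        sync.Mutex
	finished  bool
	err       error
	callbacks []func(err error)
}

// then registers a callback. If the promise is already finished,
// the callback runs immediately with the stored result.
func (p *promise) then(cb func(err error)) *promise {
	p.mu.Lock()
	if !p.finished {
		p.callbacks = append(p.callbacks, cb)
		p.mu.Unlock()
		return p
	}
	err := p.err
	p.mu.Unlock()
	cb(err)
	return p
}

// finish resolves the promise and runs all registered callbacks once.
func (p *promise) finish(err error) *promise {
	p.mu.Lock()
	p.finished = true
	p.err = err
	cbs := p.callbacks
	p.callbacks = nil
	p.mu.Unlock()
	for _, cb := range cbs {
		cb(err)
	}
	return p
}

func main() {
	p := &promise{}
	p.then(func(err error) { fmt.Println("emit finished, err:", err) })
	p.finish(errors.New("broker unavailable"))
}
```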
type RebalanceCallback ¶
type RebalanceCallback func(a Assignment)
RebalanceCallback is invoked when the processor receives a new partition assignment.
type RecoveryStats ¶
type RecoveryStats struct {
	StartTime    time.Time
	RecoveryTime time.Time
	Offset       int64 // last offset processed or recovered
	Hwm          int64 // next offset to be written
}
RecoveryStats groups statistics during recovery
type SaramaConsumerBuilder ¶
SaramaConsumerBuilder creates a `sarama.Consumer`
func SaramaConsumerBuilderWithConfig ¶
func SaramaConsumerBuilderWithConfig(config *sarama.Config) SaramaConsumerBuilder
SaramaConsumerBuilderWithConfig creates a sarama consumer using passed config
type Signal ¶
type Signal struct {
// contains filtered or unexported fields
}
Signal allows synchronization on a state, waiting for that state and checking the current state
func (*Signal) ObserveStateChange ¶
func (s *Signal) ObserveStateChange() *StateChangeObserver
ObserveStateChange returns a channel that receives state changes. Note that the caller must take care of consuming that channel, otherwise the Signal will block upon state changes.
func (*Signal) SetState ¶
SetState changes the state of the signal and notifies all goroutines waiting for the new state
func (*Signal) WaitForState ¶
WaitForState returns a channel that closes when the signal reaches passed state.
func (*Signal) WaitForStateMin ¶
WaitForStateMin returns a channel that will be closed, when the signal enters passed state or higher (states are ints, so we're just comparing ints here)
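The difference between waiting for an exact state and waiting for a minimum state can be shown with a small channel-based sketch. The `signal` type below is a hypothetical re-implementation for illustration only; goka's Signal may be implemented differently.

```go
package main

import (
	"fmt"
	"sync"
)

type state int

// waiter is closed when the signal reaches its target state
// (or any higher state, if orHigher is set).
type waiter struct {
	done     chan struct{}
	target   state
	orHigher bool
}

func (w *waiter) matches(s state) bool {
	return s == w.target || (w.orHigher && s >= w.target)
}

// signal is a hypothetical, minimal state signal.
type signal struct {
	mu      sync.Mutex
	current state
	waiters []*waiter
}

// setState stores the new state and closes the channels of matching waiters.
func (s *signal) setState(st state) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.current = st
	remaining := s.waiters[:0]
	for _, w := range s.waiters {
		if w.matches(st) {
			close(w.done)
			continue
		}
		remaining = append(remaining, w)
	}
	s.waiters = remaining
}

func (s *signal) wait(target state, orHigher bool) <-chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	w := &waiter{done: make(chan struct{}), target: target, orHigher: orHigher}
	if w.matches(s.current) {
		close(w.done) // already in the requested state
		return w.done
	}
	s.waiters = append(s.waiters, w)
	return w.done
}

func (s *signal) waitForState(target state) <-chan struct{}    { return s.wait(target, false) }
func (s *signal) waitForStateMin(target state) <-chan struct{} { return s.wait(target, true) }

func main() {
	s := &signal{}
	atLeastTwo := s.waitForStateMin(2)
	go s.setState(3)
	<-atLeastTwo
	fmt.Println("signal reached state 2 or higher")
}
```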
type State ¶
type State int
State types a state of the Signal
const (
	// PPStateIdle marks the partition processor as idling (not started yet)
	PPStateIdle State = iota
	// PPStateRecovering indicates a recovering partition processor
	PPStateRecovering
	// PPStateRunning indicates a running partition processor
	PPStateRunning
	// PPStateStopping indicates a stopping partition processor
	PPStateStopping
	// PPStateStopped indicates a stopped partition processor
	PPStateStopped
)
const (
	// ProcStateIdle indicates an idling partition processor (not started yet)
	ProcStateIdle State = iota
	// ProcStateStarting indicates a starting partition processor, i.e. before rebalance
	ProcStateStarting
	// ProcStateSetup indicates a partition processor during setup of a rebalance round
	ProcStateSetup
	// ProcStateRunning indicates a running partition processor
	ProcStateRunning
	// ProcStateStopping indicates a stopping partition processor
	ProcStateStopping
)
type StateChangeObserver ¶
type StateChangeObserver struct {
// contains filtered or unexported fields
}
StateChangeObserver wraps a channel that triggers when the signal's state changes
func (*StateChangeObserver) C ¶
func (s *StateChangeObserver) C() <-chan State
C returns the channel on which to observe state changes.
func (*StateChangeObserver) Stop ¶
func (s *StateChangeObserver) Stop()
Stop stops the observer and closes its update channel.
type StateReader ¶
type StateReader interface {
	State() State
	IsState(State) bool
	WaitForStateMin(state State) <-chan struct{}
	WaitForState(state State) <-chan struct{}
	ObserveStateChange() *StateChangeObserver
}
StateReader is a read only abstraction of a Signal to expose the current state.
type Stream ¶
type Stream string
Stream is the name of an event stream topic in Kafka, ie, a topic with cleanup.policy=delete
type Streams ¶
type Streams []Stream
Streams is a slice of Stream names.
func StringsToStreams ¶
StringsToStreams is a simple conversion function that allows passing a slice of strings as a slice of Stream (Streams). It avoids the boilerplate loop over the string slice that would otherwise be necessary.
Example ¶
inputTopics := []string{
	"input1",
	"input2",
	"input3",
}

// use it, e.g. in the Inputs-Edge in the group graph
graph := DefineGroup("group",
	Inputs(StringsToStreams(inputTopics...), new(codec.String), func(ctx Context, msg interface{}) {}),
)
_ = graph
type TMConfigMismatchBehavior ¶
type TMConfigMismatchBehavior int
TMConfigMismatchBehavior configures how configuration mismatches of a topic (replication, num partitions, compaction) should be treated
const (
	// TMConfigMismatchBehaviorIgnore ignores wrong config values
	TMConfigMismatchBehaviorIgnore TMConfigMismatchBehavior = 0
	// TMConfigMismatchBehaviorWarn warns if the topic is configured differently than requested
	TMConfigMismatchBehaviorWarn TMConfigMismatchBehavior = 1
	// TMConfigMismatchBehaviorFail makes checking the topic fail if the configuration differs from the requested one
	TMConfigMismatchBehaviorFail TMConfigMismatchBehavior = 2
)
type Table ¶
type Table string
Table is the name of a table topic in Kafka, ie, a topic with cleanup.policy=compact
func GroupTable ¶
GroupTable returns the name of the group table of group.
type TableStats ¶
type TableStats struct {
	Stalled  bool
	Status   PartitionStatus
	RunMode  PPRunMode
	Recovery *RecoveryStats
	Input    *InputStats
	Writes   *OutputStats
}
TableStats represents stats for a table partition
type Tester ¶
type Tester interface {
	StorageBuilder() storage.Builder
	ProducerBuilder() ProducerBuilder
	ConsumerGroupBuilder() ConsumerGroupBuilder
	ConsumerBuilder() SaramaConsumerBuilder
	EmitterProducerBuilder() ProducerBuilder
	TopicManagerBuilder() TopicManagerBuilder
	RegisterGroupGraph(*GroupGraph) string
	RegisterEmitter(Stream, Codec)
	RegisterView(Table, Codec) string
}
Tester interface to avoid import cycles when a processor needs to register to the tester.
type TopicManager ¶
type TopicManager interface {
	// EnsureTableExists checks that a table (log-compacted topic) exists, or create one if possible
	EnsureTableExists(topic string, npar int) error
	// EnsureStreamExists checks that a stream topic exists, or create one if possible
	EnsureStreamExists(topic string, npar int) error
	// EnsureTopicExists checks that a topic exists, or create one if possible,
	// enforcing the given configuration
	EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error
	// Partitions returns the number of partitions of a topic, that are assigned to the running
	// instance, i.e. it doesn't represent all partitions of a topic.
	Partitions(topic string) ([]int32, error)
	GetOffset(topic string, partitionID int32, time int64) (int64, error)
	// Close closes the topic manager
	Close() error
}
TopicManager provides an interface to create/check topics and their partitions
func DefaultTopicManagerBuilder ¶
func DefaultTopicManagerBuilder(brokers []string) (TopicManager, error)
DefaultTopicManagerBuilder creates TopicManager using the Sarama library.
func NewTopicManager ¶
func NewTopicManager(brokers []string, saramaConfig *sarama.Config, topicManagerConfig *TopicManagerConfig) (TopicManager, error)
NewTopicManager creates a new topic manager using the sarama library
type TopicManagerBuilder ¶
type TopicManagerBuilder func(brokers []string) (TopicManager, error)
TopicManagerBuilder creates a TopicManager to check partition counts and create tables.
func TopicManagerBuilderWithConfig ¶
func TopicManagerBuilderWithConfig(config *sarama.Config, tmConfig *TopicManagerConfig) TopicManagerBuilder
TopicManagerBuilderWithConfig creates TopicManager using the Sarama library.
func TopicManagerBuilderWithTopicManagerConfig ¶
func TopicManagerBuilderWithTopicManagerConfig(tmConfig *TopicManagerConfig) TopicManagerBuilder
TopicManagerBuilderWithTopicManagerConfig creates TopicManager using the Sarama library.
type TopicManagerConfig ¶
type TopicManagerConfig struct {
	Logger logger
	Table  struct {
		Replication int
		// CleanupPolicy allows overwriting the default cleanup policy for tables.
		// Defaults to 'compact' if not set
		CleanupPolicy string
	}
	Stream struct {
		Replication int
		Retention   time.Duration
		// CleanupPolicy allows overwriting the default cleanup policy for streams.
		// Defaults to 'delete' if not set
		CleanupPolicy string
	}
	// CreateTopicTimeout is the time the topic manager waits for a topic to be created.
	// Set to 0 to turn off checking topic creation.
	// Defaults to 10 seconds
	CreateTopicTimeout time.Duration
	// TMConfigMismatchBehavior configures how configuration mismatches of a topic (replication, num partitions, compaction) should be
	// treated
	MismatchBehavior TMConfigMismatchBehavior
}
TopicManagerConfig contains the options used to create table and stream topics.
func NewTopicManagerConfig ¶
func NewTopicManagerConfig() *TopicManagerConfig
NewTopicManagerConfig provides a default configuration for auto-creation with a replication factor of 2 and a retention time of 1 hour. Use this function rather than creating a TopicManagerConfig from scratch, so that the config is initialized with reasonable defaults.
type UpdateCallback ¶
UpdateCallback is invoked upon arrival of a message for a table partition.
type UpdateContext ¶
type UpdateContext interface {
	// Topic returns the topic of input message.
	Topic() Stream
	// Partition returns the partition of the input message.
	Partition() int32
	// Offset returns the offset of the input message.
	Offset() int64
	// Headers returns the headers of the input message.
	//
	// It is recommended to lazily evaluate the headers to reduce overhead per message
	// when headers are not used.
	Headers() Headers
}
UpdateContext defines the interface for UpdateCallback arguments.
type View ¶
type View struct {
// contains filtered or unexported fields
}
View is a materialized (i.e. persistent) cache of a group table.
Example ¶
This example shows how views are typically created and used in the most basic way.
// create a new view
view, err := NewView([]string{"localhost:9092"},
	"input-topic",
	new(codec.String))
if err != nil {
	log.Fatalf("error creating view: %v", err)
}

// provide a cancelable context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// start the view
done := make(chan struct{})
go func() {
	defer close(done)
	err := view.Run(ctx)
	if err != nil {
		log.Fatalf("Error running view: %v", err)
	}
}()

// wait for the view to be recovered

// Option A: by polling
for !view.Recovered() {
	select {
	case <-ctx.Done():
		return
	case <-time.After(time.Second):
	}
}

// Option B: by waiting for the signal
<-view.WaitRunning()

// retrieve a value from the view
val, err := view.Get("some-key")
if err != nil {
	log.Fatalf("Error getting item from view: %v", err)
}

if val != nil {
	// cast it to string
	// no need to check the type assertion: if it was not that type, the codec would've failed
	log.Printf("got value %s", val.(string))
}

has, err := view.Has("some-key")
if err != nil {
	log.Fatalf("Error getting item from view: %v", err)
}

_ = has

// stop the view and wait for it to shut down before returning
cancel()
<-done
Example (Autoreconnect) ¶
// create a new view
view, err := NewView([]string{"localhost:9092"},
	"input-topic",
	new(codec.String),

	// Automatically reconnect in case of errors. This is useful for services where availability
	// is more important than the data being up to date in case of kafka connection issues.
	WithViewAutoReconnect(),

	// Reconnect uses a default backoff mechanism, that can be modified by providing
	// a custom backoff builder using
	// WithViewBackoffBuilder(customBackoffBuilder),

	// When the view is running successfully for some time, the backoff is reset.
	// This time range can be modified using
	// WithViewBackoffResetTimeout(3*time.Second),
)
if err != nil {
	log.Fatalf("error creating view: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())

// start the view
done := make(chan struct{})
go func() {
	defer close(done)
	err := view.Run(ctx)
	if err != nil {
		log.Fatalf("Error running view: %v", err)
	}
}()

<-view.WaitRunning()
// at this point we can safely use the view with Has/Get/Iterate,
// even if the kafka connection is lost

// Stop the view and wait for it to shutdown before returning
cancel()
<-done
func (*View) CurrentState ¶
CurrentState returns the current ViewState of the view. This is useful for polling, e.g. when implementing health checks or metrics.
func (*View) Evict ¶
Evict removes the given key only from the local cache. In order to delete a key from Kafka and other Views, context.Delete should be used on a Processor.
func (*View) Get ¶
Get returns the value for the key in the view if it exists, and nil if it doesn't. Get can be called by multiple goroutines concurrently. Get can only be called after Recovered returns true.
func (*View) IteratorWithRange ¶
IteratorWithRange returns an iterator that iterates over the state of the View. The iterator is built using the given range.
func (*View) ObserveStateChanges ¶
func (v *View) ObserveStateChanges() *StateChangeObserver
ObserveStateChanges returns a StateChangeObserver that allows to handle state changes of the view by reading from a channel. It is crucial to continuously read from that channel, otherwise the View might deadlock upon state changes. If the observer is not needed, the caller must call observer.Stop()
Example
view, err := goka.NewView(...)
if err != nil {
	// handle error
}
go view.Run(ctx)

go func() {
	obs := view.ObserveStateChanges()
	defer obs.Stop()
	for {
		select {
		case state, ok := <-obs.C():
			if !ok {
				return // channel was closed
			}
			_ = state // handle state
		case <-ctx.Done():
			return
		}
	}
}()
func (*View) Run ¶
Run starts consuming the view's topic and saving updates in the local persistent cache.
The view will shut down in case of errors or when the context is canceled. It can be initialized with autoreconnect
view := NewView(..., WithViewAutoReconnect())
which makes the view internally reconnect in case of errors. Then it will only stop by canceling the context (see example).
func (*View) WaitRunning ¶
func (v *View) WaitRunning() <-chan struct{}
WaitRunning returns a channel that will be closed when the view enters the running state
type ViewOption ¶
ViewOption defines a configuration option to be used when creating a view.
func WithViewAutoReconnect ¶
func WithViewAutoReconnect() ViewOption
WithViewAutoReconnect makes the view reconnect internally, so Run() does not return in case of connection errors. The view must be shut down by cancelling the context passed to Run().
func WithViewBackoffBuilder ¶
func WithViewBackoffBuilder(bb BackoffBuilder) ViewOption
WithViewBackoffBuilder replaces the default backoff builder.
func WithViewBackoffResetTimeout ¶
func WithViewBackoffResetTimeout(duration time.Duration) ViewOption
WithViewBackoffResetTimeout defines the timeout when the backoff will be reset.
func WithViewCallback ¶
func WithViewCallback(cb UpdateCallback) ViewOption
WithViewCallback defines the callback called upon recovering a message from the log.
func WithViewClientID ¶
func WithViewClientID(clientID string) ViewOption
WithViewClientID defines the client ID used to identify with Kafka.
func WithViewConsumerSaramaBuilder ¶
func WithViewConsumerSaramaBuilder(cgb SaramaConsumerBuilder) ViewOption
WithViewConsumerSaramaBuilder replaces the default sarama consumer builder
func WithViewHasher ¶
func WithViewHasher(hasher func() hash.Hash32) ViewOption
WithViewHasher sets the hash function that assigns keys to partitions.
func WithViewLogger ¶
func WithViewLogger(l Logger) ViewOption
WithViewLogger sets the logger the view should use. By default, views use the standard library logger.
func WithViewRestartable ¶
func WithViewRestartable() ViewOption
WithViewRestartable is kept only for backwards compatibility. DEPRECATED: since the behavior has changed, this name is misleading and should be replaced by WithViewAutoReconnect().
func WithViewStorageBuilder ¶
func WithViewStorageBuilder(sb storage.Builder) ViewOption
WithViewStorageBuilder defines a builder for the storage of each partition.
func WithViewTester ¶
func WithViewTester(t Tester) ViewOption
WithViewTester configures all external connections of a view, i.e. storage, consumer and producer.
func WithViewTopicManagerBuilder ¶
func WithViewTopicManagerBuilder(tmb TopicManagerBuilder) ViewOption
WithViewTopicManagerBuilder replaces the default topic manager.
type ViewState ¶
type ViewState int
ViewState represents the state of the view
const (
	// ViewStateIdle - the view is not started yet
	ViewStateIdle ViewState = iota
	// ViewStateInitializing - the view (i.e. at least one partition) is initializing
	ViewStateInitializing
	// ViewStateConnecting - the view (i.e. at least one partition) is (re-)connecting
	ViewStateConnecting
	// ViewStateCatchUp - the view (i.e. at least one partition) is still catching up
	ViewStateCatchUp
	// ViewStateRunning - the view (i.e. all partitions) has caught up and is running
	ViewStateRunning
)
type ViewStats ¶
type ViewStats struct {
Partitions map[int32]*TableStats
}
ViewStats represents the metrics of all partitions of a view.
Source Files ¶
- assignment.go
- broker.go
- builders.go
- codec.go
- config.go
- context.go
- copartition_strategy.go
- doc.go
- emitter.go
- errors.go
- graph.go
- headers.go
- iterator.go
- logger.go
- mockautoconsumers.go
- mockbuilder.go
- mockcontroller.go
- mocks.go
- mockssarama.go
- mockstorage.go
- once.go
- options.go
- partition_processor.go
- partition_table.go
- processor.go
- producer.go
- promise.go
- proxy.go
- signal.go
- simple_backoff.go
- stats.go
- topic_manager.go
- view.go
Directories ¶
Path | Synopsis |
---|---|
examples | |
 | This package provides a kafka mock that allows integration testing of goka processors. |
web | |