Versions in this module Expand all Collapse all v0 v0.2.0 Dec 1, 2022 v0.1.0 Dec 1, 2022 Changes in this version + var CopartitioningStrategy = new(copartitioningStrategy) + var ErrEmitterAlreadyClosed error = errors.New("emitter already closed") + var ErrVisitAborted = errors.New("VisitAll aborted due to context cancel or rebalance") + var StrictCopartitioningStrategy = &copartitioningStrategy + func Debug(gokaDebug, saramaDebug bool) + func DefaultConfig() *sarama.Config + func DefaultConsumerGroupBuilder(brokers []string, group, clientID string) (sarama.ConsumerGroup, error) + func DefaultHasher() func() hash.Hash32 + func DefaultProcessorStoragePath(group Group) string + func DefaultRebalance(a Assignment) + func DefaultSaramaConsumerBuilder(brokers []string, clientID string) (sarama.Consumer, error) + func DefaultUpdate(ctx UpdateContext, s storage.Storage, key string, value []byte) error + func DefaultViewStoragePath() string + func NewMockController(t gomock.TestReporter) *gomock.Controller + func NewPromiseWithFinisher() (*Promise, PromiseFinisher) + func ReplaceGlobalConfig(config *sarama.Config) + func ResetSuffixes() + func SetLoopSuffix(suffix string) + func SetSaramaLogger(logger Logger) + func SetTableSuffix(suffix string) + type Assignment map[int32]int64 + type Backoff interface + Duration func() time.Duration + Reset func() + func DefaultBackoffBuilder() (Backoff, error) + func NewSimpleBackoff(step time.Duration, max time.Duration) Backoff + type BackoffBuilder func() (Backoff, error) + type Broker interface + Addr func() string + Connected func() (bool, error) + CreateTopics func(request *sarama.CreateTopicsRequest) (*sarama.CreateTopicsResponse, error) + Open func(conf *sarama.Config) error + type Codec interface + Decode func(data []byte) (value interface{}, err error) + Encode func(value interface{}) (data []byte, err error) + type ConsumerGroupBuilder func(brokers []string, group, clientID string) (sarama.ConsumerGroup, error) + func ConsumerGroupBuilderWithConfig(config *sarama.Config) ConsumerGroupBuilder + type Context interface + Context func() context.Context + DeferCommit func() func(error) + Delete func(options ...ContextOption) + Emit func(topic Stream, key string, value interface{}, options ...ContextOption) + Fail func(err error) + Group func() Group + Headers func() Headers + Join func(topic Table) interface{} + Key func() string + Lookup func(topic Table, key string) interface{} + Loopback func(key string, value interface{}, options ...ContextOption) + Offset func() int64 + Partition func() int32 + SetValue func(value interface{}, options ...ContextOption) + Timestamp func() time.Time + Topic func() Stream + Value func() interface{} + type ContextOption func(*ctxOptions) + func WithCtxEmitHeaders(headers Headers) ContextOption + type DefaultUpdateContext struct + func (ctx DefaultUpdateContext) Headers() Headers + func (ctx DefaultUpdateContext) Offset() int64 + func (ctx DefaultUpdateContext) Partition() int32 + func (ctx DefaultUpdateContext) Topic() Stream + type Edge interface + Codec func() Codec + String func() string + Topic func() string + func Input(topic Stream, c Codec, cb ProcessCallback) Edge + func Inputs(topics Streams, c Codec, cb ProcessCallback) Edge + func Join(topic Table, c Codec) Edge + func Lookup(topic Table, c Codec) Edge + func Loop(c Codec, cb ProcessCallback) Edge + func Output(topic Stream, c Codec) Edge + func Persist(c Codec) Edge + func Visitor(name string, cb ProcessCallback) Edge + type Edges []Edge + func (e Edges) Topics() []string + type Emitter struct + func NewEmitter(brokers []string, topic Stream, codec Codec, options ...EmitterOption) (*Emitter, error) + func (e *Emitter) Emit(key string, msg interface{}) (*Promise, error) + func (e *Emitter) EmitSync(key string, msg interface{}) error + func (e *Emitter) EmitSyncWithHeaders(key string, msg interface{}, headers Headers) error + func (e *Emitter) EmitWithHeaders(key string, msg interface{}, headers Headers) (*Promise, error) + func (e *Emitter) Finish() error + type EmitterOption func(*eoptions, Stream, Codec) + func WithEmitterClientID(clientID string) EmitterOption + func WithEmitterDefaultHeaders(headers Headers) EmitterOption + func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption + func WithEmitterLogger(l Logger) EmitterOption + func WithEmitterProducerBuilder(pb ProducerBuilder) EmitterOption + func WithEmitterTester(t Tester) EmitterOption + func WithEmitterTopicManagerBuilder(tmb TopicManagerBuilder) EmitterOption + type Getter func(string) (interface{}, error) + type Group string + type GroupGraph struct + func DefineGroup(group Group, edges ...Edge) *GroupGraph + func (gg *GroupGraph) AllEdges() Edges + func (gg *GroupGraph) Group() Group + func (gg *GroupGraph) GroupTable() Edge + func (gg *GroupGraph) InputStreams() Edges + func (gg *GroupGraph) JointTables() Edges + func (gg *GroupGraph) LookupTables() Edges + func (gg *GroupGraph) LoopStream() Edge + func (gg *GroupGraph) OutputStreams() Edges + func (gg *GroupGraph) Validate() error + type Headers map[string][]byte + func HeadersFromSarama(saramaHeaders []*sarama.RecordHeader) Headers + func (h Headers) Merged(headersList ...Headers) Headers + func (h Headers) ToSarama() []sarama.RecordHeader + func (h Headers) ToSaramaPtr() []*sarama.RecordHeader + type InputStats struct + Bytes int + Count uint + Delay time.Duration + LastOffset int64 + OffsetLag int64 + type Iterator interface + Err func() error + Key func() string + Next func() bool + Release func() + Seek func(key string) bool + Value func() (interface{}, error) + type Logger interface + Print func(...interface{}) + Printf func(string, ...interface{}) + Println func(...interface{}) + func DefaultLogger() Logger + type MockAutoConsumer struct + func NewMockAutoConsumer(t *testing.T, config *sarama.Config) *MockAutoConsumer + func (c *MockAutoConsumer) Close() error + func (c *MockAutoConsumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) + func (c *MockAutoConsumer) ExpectConsumePartition(topic string, partition int32, offset int64) *MockAutoPartitionConsumer + func (c *MockAutoConsumer) HighWaterMarks() map[string]map[int32]int64 + func (c *MockAutoConsumer) Partitions(topic string) ([]int32, error) + func (c *MockAutoConsumer) Pause(topicPartitions map[string][]int32) + func (c *MockAutoConsumer) PauseAll() + func (c *MockAutoConsumer) Resume(topicPartitions map[string][]int32) + func (c *MockAutoConsumer) ResumeAll() + func (c *MockAutoConsumer) SetTopicMetadata(metadata map[string][]int32) + func (c *MockAutoConsumer) Topics() ([]string, error) + type MockAutoPartitionConsumer struct + func (pc *MockAutoPartitionConsumer) AsyncClose() + func (pc *MockAutoPartitionConsumer) Close() error + func (pc *MockAutoPartitionConsumer) Errors() <-chan *sarama.ConsumerError + func (pc *MockAutoPartitionConsumer) ExpectErrorsDrainedOnClose() + func (pc *MockAutoPartitionConsumer) ExpectMessagesDrainedOnClose() + func (pc *MockAutoPartitionConsumer) HighWaterMarkOffset() int64 + func (pc *MockAutoPartitionConsumer) IsPaused() bool + func (pc *MockAutoPartitionConsumer) Messages() <-chan *sarama.ConsumerMessage + func (pc *MockAutoPartitionConsumer) Pause() + func (pc *MockAutoPartitionConsumer) Resume() + func (pc *MockAutoPartitionConsumer) YieldError(err error) + func (pc *MockAutoPartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) + type MockBroker struct + func NewMockBroker(ctrl *gomock.Controller) *MockBroker + func (m *MockBroker) Addr() string + func (m *MockBroker) Connected() (bool, error) + func (m *MockBroker) CreateTopics(arg0 *sarama.CreateTopicsRequest) (*sarama.CreateTopicsResponse, error) + func (m *MockBroker) EXPECT() *MockBrokerMockRecorder + func (m *MockBroker) Open(arg0 *sarama.Config) error + type MockBrokerMockRecorder struct + func (mr *MockBrokerMockRecorder) Addr() *gomock.Call + func (mr *MockBrokerMockRecorder) Connected() *gomock.Call + func (mr *MockBrokerMockRecorder) CreateTopics(arg0 interface{}) *gomock.Call + func (mr *MockBrokerMockRecorder) Open(arg0 interface{}) *gomock.Call + type MockClient struct + func NewMockClient(ctrl *gomock.Controller) *MockClient + func (m *MockClient) Broker(arg0 int32) (*sarama.Broker, error) + func (m *MockClient) Brokers() []*sarama.Broker + func (m *MockClient) Close() error + func (m *MockClient) Closed() bool + func (m *MockClient) Config() *sarama.Config + func (m *MockClient) Controller() (*sarama.Broker, error) + func (m *MockClient) Coordinator(arg0 string) (*sarama.Broker, error) + func (m *MockClient) EXPECT() *MockClientMockRecorder + func (m *MockClient) GetOffset(arg0 string, arg1 int32, arg2 int64) (int64, error) + func (m *MockClient) InSyncReplicas(arg0 string, arg1 int32) ([]int32, error) + func (m *MockClient) InitProducerID() (*sarama.InitProducerIDResponse, error) + func (m *MockClient) Leader(arg0 string, arg1 int32) (*sarama.Broker, error) + func (m *MockClient) OfflineReplicas(arg0 string, arg1 int32) ([]int32, error) + func (m *MockClient) Partitions(arg0 string) ([]int32, error) + func (m *MockClient) RefreshBrokers(arg0 []string) error + func (m *MockClient) RefreshController() (*sarama.Broker, error) + func (m *MockClient) RefreshCoordinator(arg0 string) error + func (m *MockClient) RefreshMetadata(arg0 ...string) error + func (m *MockClient) Replicas(arg0 string, arg1 int32) ([]int32, error) + func (m *MockClient) Topics() ([]string, error) + func (m *MockClient) WritablePartitions(arg0 string) ([]int32, error) + type MockClientMockRecorder struct + func (mr *MockClientMockRecorder) Broker(arg0 interface{}) *gomock.Call + func (mr *MockClientMockRecorder) Brokers() *gomock.Call + func (mr *MockClientMockRecorder) Close() *gomock.Call + func (mr *MockClientMockRecorder) Closed() *gomock.Call + func (mr *MockClientMockRecorder) Config() *gomock.Call + func (mr *MockClientMockRecorder) Controller() *gomock.Call + func (mr *MockClientMockRecorder) Coordinator(arg0 interface{}) *gomock.Call + func (mr *MockClientMockRecorder) GetOffset(arg0, arg1, arg2 interface{}) *gomock.Call + func (mr *MockClientMockRecorder) InSyncReplicas(arg0, arg1 interface{}) *gomock.Call + func (mr *MockClientMockRecorder) InitProducerID() *gomock.Call + func (mr *MockClientMockRecorder) Leader(arg0, arg1 interface{}) *gomock.Call + func (mr *MockClientMockRecorder) OfflineReplicas(arg0, arg1 interface{}) *gomock.Call + func (mr *MockClientMockRecorder) Partitions(arg0 interface{}) *gomock.Call + func (mr *MockClientMockRecorder) RefreshBrokers(arg0 interface{}) *gomock.Call + func (mr *MockClientMockRecorder) RefreshController() *gomock.Call + func (mr *MockClientMockRecorder) RefreshCoordinator(arg0 interface{}) *gomock.Call + func (mr *MockClientMockRecorder) RefreshMetadata(arg0 ...interface{}) *gomock.Call + func (mr *MockClientMockRecorder) Replicas(arg0, arg1 interface{}) *gomock.Call + func (mr *MockClientMockRecorder) Topics() *gomock.Call + func (mr *MockClientMockRecorder) WritablePartitions(arg0 interface{}) *gomock.Call + type MockClusterAdmin struct + func NewMockClusterAdmin(ctrl *gomock.Controller) *MockClusterAdmin + func (m *MockClusterAdmin) AlterClientQuotas(arg0 []sarama.QuotaEntityComponent, arg1 sarama.ClientQuotasOp, arg2 bool) error + func (m *MockClusterAdmin) AlterConfig(arg0 sarama.ConfigResourceType, arg1 string, arg2 map[string]*string, ...) error + func (m *MockClusterAdmin) AlterPartitionReassignments(arg0 string, arg1 [][]int32) error + func (m *MockClusterAdmin) Close() error + func (m *MockClusterAdmin) Controller() (*sarama.Broker, error) + func (m *MockClusterAdmin) CreateACL(arg0 sarama.Resource, arg1 sarama.Acl) error + func (m *MockClusterAdmin) CreateACLs(acls []*sarama.ResourceAcls) error + func (m *MockClusterAdmin) CreatePartitions(arg0 string, arg1 int32, arg2 [][]int32, arg3 bool) error + func (m *MockClusterAdmin) CreateTopic(arg0 string, arg1 *sarama.TopicDetail, arg2 bool) error + func (m *MockClusterAdmin) DeleteACL(arg0 sarama.AclFilter, arg1 bool) ([]sarama.MatchingAcl, error) + func (m *MockClusterAdmin) DeleteConsumerGroup(arg0 string) error + func (m *MockClusterAdmin) DeleteConsumerGroupOffset(arg0, arg1 string, arg2 int32) error + func (m *MockClusterAdmin) DeleteRecords(arg0 string, arg1 map[int32]int64) error + func (m *MockClusterAdmin) DeleteTopic(arg0 string) error + func (m *MockClusterAdmin) DeleteUserScramCredentials(arg0 []sarama.AlterUserScramCredentialsDelete) ([]*sarama.AlterUserScramCredentialsResult, error) + func (m *MockClusterAdmin) DescribeClientQuotas(arg0 []sarama.QuotaFilterComponent, arg1 bool) ([]sarama.DescribeClientQuotasEntry, error) + func (m *MockClusterAdmin) DescribeCluster() ([]*sarama.Broker, int32, error) + func (m *MockClusterAdmin) DescribeConfig(arg0 sarama.ConfigResource) ([]sarama.ConfigEntry, error) + func (m *MockClusterAdmin) DescribeConsumerGroups(arg0 []string) ([]*sarama.GroupDescription, error) + func (m *MockClusterAdmin) DescribeLogDirs(arg0 []int32) (map[int32][]sarama.DescribeLogDirsResponseDirMetadata, error) + func (m *MockClusterAdmin) DescribeTopics(arg0 []string) ([]*sarama.TopicMetadata, error) + func (m *MockClusterAdmin) DescribeUserScramCredentials(arg0 []string) ([]*sarama.DescribeUserScramCredentialsResult, error) + func (m *MockClusterAdmin) EXPECT() *MockClusterAdminMockRecorder + func (m *MockClusterAdmin) IncrementalAlterConfig(arg0 sarama.ConfigResourceType, arg1 string, ...) error + func (m *MockClusterAdmin) ListAcls(arg0 sarama.AclFilter) ([]sarama.ResourceAcls, error) + func (m *MockClusterAdmin) ListConsumerGroupOffsets(arg0 string, arg1 map[string][]int32) (*sarama.OffsetFetchResponse, error) + func (m *MockClusterAdmin) ListConsumerGroups() (map[string]string, error) + func (m *MockClusterAdmin) ListPartitionReassignments(arg0 string, arg1 []int32) (map[string]map[int32]*sarama.PartitionReplicaReassignmentsStatus, error) + func (m *MockClusterAdmin) ListTopics() (map[string]sarama.TopicDetail, error) + func (m *MockClusterAdmin) UpsertUserScramCredentials(arg0 []sarama.AlterUserScramCredentialsUpsert) ([]*sarama.AlterUserScramCredentialsResult, error) + type MockClusterAdminMockRecorder struct + func (mr *MockClusterAdminMockRecorder) AlterClientQuotas(arg0, arg1, arg2 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) AlterConfig(arg0, arg1, arg2, arg3 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) AlterPartitionReassignments(arg0, arg1 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) Close() *gomock.Call + func (mr *MockClusterAdminMockRecorder) Controller() *gomock.Call + func (mr *MockClusterAdminMockRecorder) CreateACL(arg0, arg1 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) CreatePartitions(arg0, arg1, arg2, arg3 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) CreateTopic(arg0, arg1, arg2 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) DeleteACL(arg0, arg1 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) DeleteConsumerGroup(arg0 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) DeleteConsumerGroupOffset(arg0, arg1, arg2 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) DeleteRecords(arg0, arg1 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) DeleteTopic(arg0 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) DeleteUserScramCredentials(arg0 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) DescribeClientQuotas(arg0, arg1 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) DescribeCluster() *gomock.Call + func (mr *MockClusterAdminMockRecorder) DescribeConfig(arg0 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) DescribeConsumerGroups(arg0 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) DescribeLogDirs(arg0 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) DescribeTopics(arg0 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) DescribeUserScramCredentials(arg0 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) IncrementalAlterConfig(arg0, arg1, arg2, arg3 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) ListAcls(arg0 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) ListConsumerGroupOffsets(arg0, arg1 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) ListConsumerGroups() *gomock.Call + func (mr *MockClusterAdminMockRecorder) ListPartitionReassignments(arg0, arg1 interface{}) *gomock.Call + func (mr *MockClusterAdminMockRecorder) ListTopics() *gomock.Call + func (mr *MockClusterAdminMockRecorder) UpsertUserScramCredentials(arg0 interface{}) *gomock.Call + type MockConsumerGroup struct + func NewMockConsumerGroup(t *testing.T) *MockConsumerGroup + func (cg *MockConsumerGroup) Close() error + func (cg *MockConsumerGroup) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error + func (cg *MockConsumerGroup) Errors() <-chan error + func (cg *MockConsumerGroup) FailOnConsume(err error) + func (cg *MockConsumerGroup) Pause(partitions map[string][]int32) + func (cg *MockConsumerGroup) PauseAll() + func (cg *MockConsumerGroup) Resume(partitions map[string][]int32) + func (cg *MockConsumerGroup) ResumeAll() + func (cg *MockConsumerGroup) SendError(err error) + func (cg *MockConsumerGroup) SendMessage(message *sarama.ConsumerMessage) <-chan struct{} + func (cg *MockConsumerGroup) SendMessageWait(message *sarama.ConsumerMessage) + type MockConsumerGroupClaim struct + func NewMockConsumerGroupClaim(topic string, partition int32) *MockConsumerGroupClaim + func (cgc *MockConsumerGroupClaim) HighWaterMarkOffset() int64 + func (cgc *MockConsumerGroupClaim) InitialOffset() int64 + func (cgc *MockConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage + func (cgc *MockConsumerGroupClaim) Partition() int32 + func (cgc *MockConsumerGroupClaim) Topic() string + type MockConsumerGroupSession struct + func (cgs *MockConsumerGroupSession) Claims() map[string][]int32 + func (cgs *MockConsumerGroupSession) Commit() + func (cgs *MockConsumerGroupSession) Context() context.Context + func (cgs *MockConsumerGroupSession) GenerationID() int32 + func (cgs *MockConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string) + func (cgs *MockConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) + func (cgs *MockConsumerGroupSession) MemberID() string + func (cgs *MockConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) + func (cgs *MockConsumerGroupSession) SendMessage(msg *sarama.ConsumerMessage) + type MockProducer struct + func NewMockProducer(ctrl *gomock.Controller) *MockProducer + func (m *MockProducer) Close() error + func (m *MockProducer) EXPECT() *MockProducerMockRecorder + func (m *MockProducer) Emit(arg0, arg1 string, arg2 []byte) *Promise + func (m *MockProducer) EmitWithHeaders(arg0, arg1 string, arg2 []byte, arg3 Headers) *Promise + type MockProducerMockRecorder struct + func (mr *MockProducerMockRecorder) Close() *gomock.Call + func (mr *MockProducerMockRecorder) Emit(arg0, arg1, arg2 interface{}) *gomock.Call + func (mr *MockProducerMockRecorder) EmitWithHeaders(arg0, arg1, arg2, arg3 interface{}) *gomock.Call + type MockStorage struct + func NewMockStorage(ctrl *gomock.Controller) *MockStorage + func (m *MockStorage) Close() error + func (m *MockStorage) Delete(arg0 string) error + func (m *MockStorage) EXPECT() *MockStorageMockRecorder + func (m *MockStorage) Get(arg0 string) ([]byte, error) + func (m *MockStorage) GetOffset(arg0 int64) (int64, error) + func (m *MockStorage) Has(arg0 string) (bool, error) + func (m *MockStorage) Iterator() (storage.Iterator, error) + func (m *MockStorage) IteratorWithRange(arg0, arg1 []byte) (storage.Iterator, error) + func (m *MockStorage) MarkRecovered() error + func (m *MockStorage) Open() error + func (m *MockStorage) Set(arg0 string, arg1 []byte) error + func (m *MockStorage) SetOffset(arg0 int64) error + type MockStorageMockRecorder struct + func (mr *MockStorageMockRecorder) Close() *gomock.Call + func (mr *MockStorageMockRecorder) Delete(arg0 interface{}) *gomock.Call + func (mr *MockStorageMockRecorder) Get(arg0 interface{}) *gomock.Call + func (mr *MockStorageMockRecorder) GetOffset(arg0 interface{}) *gomock.Call + func (mr *MockStorageMockRecorder) Has(arg0 interface{}) *gomock.Call + func (mr *MockStorageMockRecorder) Iterator() *gomock.Call + func (mr *MockStorageMockRecorder) IteratorWithRange(arg0, arg1 interface{}) *gomock.Call + func (mr *MockStorageMockRecorder) MarkRecovered() *gomock.Call + func (mr *MockStorageMockRecorder) Open() *gomock.Call + func (mr *MockStorageMockRecorder) Set(arg0, arg1 interface{}) *gomock.Call + func (mr *MockStorageMockRecorder) SetOffset(arg0 interface{}) *gomock.Call + type MockTopicManager struct + func NewMockTopicManager(ctrl *gomock.Controller) *MockTopicManager + func (m *MockTopicManager) Close() error + func (m *MockTopicManager) EXPECT() *MockTopicManagerMockRecorder + func (m *MockTopicManager) EnsureStreamExists(arg0 string, arg1 int) error + func (m *MockTopicManager) EnsureTableExists(arg0 string, arg1 int) error + func (m *MockTopicManager) EnsureTopicExists(arg0 string, arg1, arg2 int, arg3 map[string]string) error + func (m *MockTopicManager) GetOffset(arg0 string, arg1 int32, arg2 int64) (int64, error) + func (m *MockTopicManager) Partitions(arg0 string) ([]int32, error) + type MockTopicManagerMockRecorder struct + func (mr *MockTopicManagerMockRecorder) Close() *gomock.Call + func (mr *MockTopicManagerMockRecorder) EnsureStreamExists(arg0, arg1 interface{}) *gomock.Call + func (mr *MockTopicManagerMockRecorder) EnsureTableExists(arg0, arg1 interface{}) *gomock.Call + func (mr *MockTopicManagerMockRecorder) EnsureTopicExists(arg0, arg1, arg2, arg3 interface{}) *gomock.Call + func (mr *MockTopicManagerMockRecorder) GetOffset(arg0, arg1, arg2 interface{}) *gomock.Call + func (mr *MockTopicManagerMockRecorder) Partitions(arg0 interface{}) *gomock.Call + type NilHandling int + const NilDecode + const NilIgnore + const NilProcess + type OutputStats struct + Bytes int + Count uint + type PPRunMode int + type PartitionProcStats struct + Input map[string]*InputStats + Joined map[string]*TableStats + Now time.Time + Output map[string]*OutputStats + TableStats *TableStats + type PartitionProcessor struct + func (pp *PartitionProcessor) Recovered() bool + func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error + func (pp *PartitionProcessor) Stop() error + func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta interface{}, visited *int64) error + type PartitionStatus int + const PartitionConnecting + const PartitionInitializing + const PartitionPreparing + const PartitionRecovering + const PartitionRunning + const PartitionStopped + type PartitionTable struct + func (p *PartitionTable) CatchupForever(ctx context.Context, restartOnError bool) error + func (p *PartitionTable) Close() error + func (p *PartitionTable) CurrentState() PartitionStatus + func (p *PartitionTable) Delete(key string) error + func (p *PartitionTable) Get(key string) ([]byte, error) + func (p *PartitionTable) GetOffset(defValue int64) (int64, error) + func (p *PartitionTable) Has(key string) (bool, error) + func (p *PartitionTable) IsRecovered() bool + func (p *PartitionTable) Iterator() (storage.Iterator, error) + func (p *PartitionTable) IteratorWithRange(start []byte, limit []byte) (storage.Iterator, error) + func (p *PartitionTable) RunStatsLoop(ctx context.Context) + func (p *PartitionTable) Set(key string, value []byte) error + func (p *PartitionTable) SetOffset(value int64) error + func (p *PartitionTable) SetupAndRecover(ctx context.Context, restartOnError bool) error + func (p *PartitionTable) TrackMessageWrite(ctx context.Context, length int) + func (p *PartitionTable) WaitRecovered() <-chan struct{} + type ProcessCallback func(ctx Context, msg interface{}) + type Processor struct + func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption) (*Processor, error) + func (g *Processor) Cleanup(session sarama.ConsumerGroupSession) error + func (g *Processor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error + func (g *Processor) Get(key string) (interface{}, error) + func (g *Processor) Graph() *GroupGraph + func (g *Processor) PauseAll() + func (g *Processor) Recovered() bool + func (g *Processor) ResumeAll() + func (g *Processor) Run(ctx context.Context) (rerr error) + func (g *Processor) Setup(session sarama.ConsumerGroupSession) error + func (g *Processor) StateReader() StateReader + func (g *Processor) Stats() *ProcessorStats + func (g *Processor) StatsWithContext(ctx context.Context) *ProcessorStats + func (g *Processor) Stop() + func (g *Processor) VisitAll(ctx context.Context, name string, meta interface{}) error + func (g *Processor) VisitAllWithStats(ctx context.Context, name string, meta interface{}) (int64, error) + func (g *Processor) WaitForReady() + func (g *Processor) WaitForReadyContext(ctx context.Context) error + type ProcessorOption func(*poptions, *GroupGraph) + func WithBackoffBuilder(bb BackoffBuilder) ProcessorOption + func WithBackoffResetTimeout(duration time.Duration) ProcessorOption + func WithClientID(clientID string) ProcessorOption + func WithConsumerGroupBuilder(cgb ConsumerGroupBuilder) ProcessorOption + func WithConsumerSaramaBuilder(cgb SaramaConsumerBuilder) ProcessorOption + func WithGroupGraphHook(hook func(gg *GroupGraph)) ProcessorOption + func WithHasher(hasher func() hash.Hash32) ProcessorOption + func WithHotStandby() ProcessorOption + func WithLogger(l Logger) ProcessorOption + func WithNilHandling(nh NilHandling) ProcessorOption + func WithPartitionChannelSize(size int) ProcessorOption + func WithProducerBuilder(pb ProducerBuilder) ProcessorOption + func WithProducerDefaultHeaders(hdr Headers) ProcessorOption + func WithRebalanceCallback(cb RebalanceCallback) ProcessorOption + func WithRecoverAhead() ProcessorOption + func WithStorageBuilder(sb storage.Builder) ProcessorOption + func WithTester(t Tester) ProcessorOption + func WithTopicManagerBuilder(tmb TopicManagerBuilder) ProcessorOption + func WithUpdateCallback(cb UpdateCallback) ProcessorOption + type ProcessorStats struct + Group map[int32]*PartitionProcStats + Lookup map[string]*ViewStats + type Producer interface + Close func() error + Emit func(topic string, key string, value []byte) *Promise + EmitWithHeaders func(topic string, key string, value []byte, headers Headers) *Promise + func DefaultProducerBuilder(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) + func NewProducer(brokers []string, config *sarama.Config) (Producer, error) + type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) + func ProducerBuilderWithConfig(config *sarama.Config) ProducerBuilder + type Promise struct + func NewPromise() *Promise + func (p *Promise) Then(callback func(err error)) *Promise + func (p *Promise) ThenWithMessage(callback func(msg *sarama.ProducerMessage, err error)) *Promise + type PromiseFinisher func(msg *sarama.ProducerMessage, err error) *Promise + type RebalanceCallback func(a Assignment) + type RecoveryStats struct + Hwm int64 + Offset int64 + RecoveryTime time.Time + StartTime time.Time + type SaramaConsumerBuilder func(brokers []string, clientID string) (sarama.Consumer, error) + func SaramaConsumerBuilderWithConfig(config *sarama.Config) SaramaConsumerBuilder + type Signal struct + func NewSignal(states ...State) *Signal + func (s *Signal) IsState(state State) bool + func (s *Signal) ObserveStateChange() *StateChangeObserver + func (s *Signal) SetState(state State) *Signal + func (s *Signal) State() State + func (s *Signal) WaitForState(state State) <-chan struct{} + func (s *Signal) WaitForStateMin(state State) <-chan struct{} + type State int + const PPStateIdle + const PPStateRecovering + const PPStateRunning + const PPStateStopped + const PPStateStopping + const ProcStateIdle + const ProcStateRunning + const ProcStateSetup + const ProcStateStarting + const ProcStateStopping + type StateChangeObserver struct + func (s *StateChangeObserver) C() <-chan State + func (s *StateChangeObserver) Stop() + type StateReader interface + IsState func(State) bool + ObserveStateChange func() *StateChangeObserver + State func() State + WaitForState func(state State) <-chan struct{} + WaitForStateMin func(state State) <-chan struct{} + type Stream string + type Streams []Stream + func StringsToStreams(strings ...string) Streams + type TMConfigMismatchBehavior int + const TMConfigMismatchBehaviorFail + const TMConfigMismatchBehaviorIgnore + const TMConfigMismatchBehaviorWarn + type Table string + func GroupTable(group Group) Table + type TableStats struct + Input *InputStats + Recovery *RecoveryStats + RunMode PPRunMode + Stalled bool + Status PartitionStatus + Writes *OutputStats + type Tester interface + ConsumerBuilder func() SaramaConsumerBuilder + ConsumerGroupBuilder func() ConsumerGroupBuilder + EmitterProducerBuilder func() ProducerBuilder + ProducerBuilder func() ProducerBuilder + RegisterEmitter func(Stream, Codec) + RegisterGroupGraph func(*GroupGraph) string + RegisterView func(Table, Codec) string + StorageBuilder func() storage.Builder + TopicManagerBuilder func() TopicManagerBuilder + type TopicManager interface + Close func() error + EnsureStreamExists func(topic string, npar int) error + EnsureTableExists func(topic string, npar int) error + EnsureTopicExists func(topic string, npar, rfactor int, config map[string]string) error + GetOffset func(topic string, partitionID int32, time int64) (int64, error) + Partitions func(topic string) ([]int32, error) + func DefaultTopicManagerBuilder(brokers []string) (TopicManager, error) + func NewTopicManager(brokers []string, saramaConfig *sarama.Config, ...) (TopicManager, error) + type TopicManagerBuilder func(brokers []string) (TopicManager, error) + func TopicManagerBuilderWithConfig(config *sarama.Config, tmConfig *TopicManagerConfig) TopicManagerBuilder + func TopicManagerBuilderWithTopicManagerConfig(tmConfig *TopicManagerConfig) TopicManagerBuilder + type TopicManagerConfig struct + CreateTopicTimeout time.Duration + Logger logger + MismatchBehavior TMConfigMismatchBehavior + Stream struct{ ... } + Table struct{ ... } + func NewTopicManagerConfig() *TopicManagerConfig + type UpdateCallback func(ctx UpdateContext, s storage.Storage, key string, value []byte) error + type UpdateContext interface + Headers func() Headers + Offset func() int64 + Partition func() int32 + Topic func() Stream + type View struct + func NewView(brokers []string, topic Table, codec Codec, options ...ViewOption) (*View, error) + func (v *View) CurrentState() ViewState + func (v *View) Evict(key string) error + func (v *View) Get(key string) (interface{}, error) + func (v *View) Has(key string) (bool, error) + func (v *View) Iterator() (Iterator, error) + func (v *View) IteratorWithRange(start, limit string) (Iterator, error) + func (v *View) ObserveStateChanges() *StateChangeObserver + func (v *View) Recovered() bool + func (v *View) Run(ctx context.Context) (rerr error) + func (v *View) Stats(ctx context.Context) *ViewStats + func (v *View) Topic() string + func (v *View) WaitRunning() <-chan struct{} + type ViewOption func(*voptions, Table, Codec) + func WithViewAutoReconnect() ViewOption + func WithViewBackoffBuilder(bb BackoffBuilder) ViewOption + func WithViewBackoffResetTimeout(duration time.Duration) ViewOption + func WithViewCallback(cb UpdateCallback) ViewOption + func WithViewClientID(clientID string) ViewOption + func WithViewConsumerSaramaBuilder(cgb SaramaConsumerBuilder) ViewOption + func WithViewHasher(hasher func() hash.Hash32) ViewOption + func WithViewLogger(l Logger) ViewOption + func WithViewRestartable() ViewOption + func WithViewStorageBuilder(sb storage.Builder) ViewOption + func WithViewTester(t Tester) ViewOption + func WithViewTopicManagerBuilder(tmb TopicManagerBuilder) ViewOption + type ViewState int + const ViewStateCatchUp + const ViewStateConnecting + const ViewStateIdle + const ViewStateInitializing + const ViewStateRunning + type ViewStats struct + Partitions map[int32]*TableStats