Versions in this module Expand all Collapse all v2 v2.0.1 Aug 9, 2023 v2.0.0 Jun 23, 2023 Changes in this version + func NewRecordContext(record kafka.Record) context.Context + func RecordFromContext(ctx context.Context) kafka.Record + type Builder interface + Build func(ctx BuilderContext) (Topology, error) + Describe func() string + NewKSubTopologyBuilder func(kind SubTopologyKind) SubTopologyBuilder + RemoveSubTopology func(builder SubTopologyBuilder) + Reset func(ctx BuilderContext) error + SubTopologies func() SubTopologyBuilders + type BuilderContext interface + Admin func() kafka.Admin + ApplicationId func() string + Logger func() log.Logger + MetricsReporter func() metrics.Reporter + ProducerBuilder func() kafka.ProducerBuilder + StoreRegistry func() stores.Registry + func NewBuilderContext(appId string, registry stores.Registry, producer kafka.ProducerBuilder, ...) BuilderContext + type ChangeLogger interface + Log func(ctx context.Context, key, value []byte) error + type ChangelogSyncer interface + Stop func() error + Sync func(ctx context.Context, synced chan struct{}) error + type ChangelogSyncerBuilder interface + Build func(ctx SubTopologyContext, store stores.Store) (ChangelogSyncer, error) + BuildLogger func(ctx SubTopologyContext, store string) (ChangeLogger, error) + Internal func() bool + Setup func(ctx SubTopologyBuilderContext) error + Topic func() string + type CloseableNode interface + Close func() error + type DefaultNode struct + func (n *DefaultNode) AddEdge(node Node) + func (n *DefaultNode) Edges() []Node + func (n *DefaultNode) Err(message string) error + func (n *DefaultNode) Forward(ctx context.Context, kIn, vIn interface{}, cont bool) (interface{}, interface{}, bool, error) + func (n *DefaultNode) ForwardAll(ctx context.Context, kvs []KeyValPair, cont bool) (kOut interface{}, vOut interface{}, next bool, err error) + func (n *DefaultNode) Id() NodeId + func (n *DefaultNode) Ignore() (interface{}, interface{}, bool, error) + func (n *DefaultNode) IgnoreAndWrapErrWith(err error, message string) (interface{}, interface{}, bool, error) + func (n *DefaultNode) IgnoreWithError(err error) (interface{}, interface{}, bool, error) + func (n *DefaultNode) NameAs(name string) + func (n *DefaultNode) NodeName() string + func (n *DefaultNode) ReadsFrom() []string + func (n *DefaultNode) SetId(id NodeId) + func (n *DefaultNode) Setup(ctx SubTopologySetupContext) error + func (n *DefaultNode) String() string + func (n *DefaultNode) WrapErr(err error) error + func (n *DefaultNode) WrapErrWith(err error, message string) error + func (n *DefaultNode) WritesAt() []string + type Edge struct + func NewEdge(parent, node NodeBuilder) Edge + func (e Edge) Node() NodeBuilder + func (e Edge) Parent() NodeBuilder + type InitableNode interface + Init func(ctx NodeContext) error + type KeyValPair struct + Key interface{} + Value interface{} + type LoggableStateStore interface + type LoggableStoreBuilder interface + Build func(ctx SubTopologyContext) (StateStore, error) + Changelog func() ChangelogSyncerBuilder + KeyEncoder func() encoding.Encoder + Name func() string + NameFormatter func(ctx SubTopologyContext) StateStoreNameFunc + ValEncoder func() encoding.Encoder + type Node interface + AddEdge func(node Node) + Edges func() []Node + Run func(ctx base.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error) + type NodeBuilder interface + Build func(ctx SubTopologyContext) (Node, error) + SetId func(id NodeId) + Setup func(ctx SubTopologySetupContext) error + type NodeContext interface + Store func(name string) StateStore + func NewNodeContext(parent SubTopologyContext, subTopology SubTopology) NodeContext + type NodeId struct + func NewNodeId(id int, path, subTopologyID string) NodeId + func (id NodeId) Id() int + func (id NodeId) String() string + func (id NodeId) SubTopologyId() string + type NodeInfo interface + Id func() NodeId + NameAs func(name string) + NodeName func() string + ReadsFrom func() []string + Type func() Type + WritesAt func() []string + type RecodeContext interface + type Sink interface + Close func() error + Encoder func() SinkEncoder + Topic func() string + type SinkBuilder interface + AutoCreate func() bool + Topic func() string + type SinkEncoder struct + Key encoding.Encoder + Value encoding.Encoder + type Source interface + AutoCreate func() bool + CoPartitionedWith func() Source + Encoder func() SourceEncoder + InitialOffset func() kafka.Offset + Internal func() bool + RePartitionedAs func() Source + ShouldCoPartitionedWith func(source Source) + Topic func() string + TopicConfigs func() kafka.TopicConfig + type SourceEncoder struct + Key encoding.Encoder + Value encoding.Encoder + type State struct + Store LoggableStateStore + Type StateType + type StateBuilder struct + Store LoggableStateStore + Type StateType + type StateStore interface + Flush func() error + ResetCache func() + type StateStoreNameFunc func(store string) string + type StateStoreProvider interface + Store func(ctx SubTopologyContext) LoggableStateStore + type StateType string + const StateReadOnly + const StateReadWrite + type SubTopology interface + Close func() error + Id func() SubTopologyId + Init func(ctx SubTopologyContext) error + Nodes func() []Node + Sinks func() []Sink + Source func(topic string) Source + StateStores func() map[string]StateStore + Store func(name string) StateStore + type SubTopologyBuilder interface + AddEdge func(parent, node NodeBuilder) + AddNode func(builder NodeBuilder) + AddNodeWithEdge func(node, edge NodeBuilder) + AddSource func(source Source) + AddStore func(builder LoggableStoreBuilder) + Build func(ctx SubTopologyContext) (SubTopology, error) + Edges func() []Edge + Id func() SubTopologyId + Kind func() SubTopologyKind + MergeSubTopology func(subTp SubTopologyBuilder) + NodeSource func(node NodeBuilder) Source + Nodes func() []NodeBuilder + Parent func(node NodeBuilder) NodeBuilder + RemoveAll func() + RemoveNode func(node NodeBuilder) + SetId func(SubTopologyId) + Setup func(ctx SubTopologySetupContext) error + Sources func() []Source + StateStores func() map[string]LoggableStoreBuilder + type SubTopologyBuilderContext interface + MaxPartitionCount func() int32 + type SubTopologyBuilders []SubTopologyBuilder + func (b SubTopologyBuilders) SourceTopics() []string + func (b SubTopologyBuilders) SourceTopicsFor(kind SubTopologyKind) []string + func (b SubTopologyBuilders) Topics() []string + type SubTopologyContext interface + Logger func() log.Logger + Partition func() int32 + PartitionConsumer func() kafka.PartitionConsumer + Producer func() kafka.Producer + TopicMeta func() map[string]*kafka.Topic + func NewSubTopologyContext(parent context.Context, partition int32, builderCtx BuilderContext, ...) SubTopologyContext + type SubTopologyId struct + func NewSubTopologyId(id int, name string) SubTopologyId + func (id SubTopologyId) Id() int + func (id SubTopologyId) Name() string + func (id SubTopologyId) String() string + type SubTopologyKind string + const KindGlobalTable + const KindStream + const KindTable + type SubTopologySetupContext interface + MaxPartitionCount func() int32 + TopicMeta func() map[string]*kafka.TopicConfig + func NewSubTopologySetupContext(builderCtx BuilderContext, topicMeta map[string]*kafka.TopicConfig, ...) SubTopologySetupContext + type Topology interface + Describe func() string + GlobalTableTopologies func() SubTopologyBuilders + SourceByTopic func(topic string) Source + StreamTopologies func() SubTopologyBuilders + SubTopologies func() []SubTopologyBuilder + SubTopology func(id SubTopologyId) SubTopologyBuilder + SubTopologyByTopic func(topic string) SubTopologyBuilder + type Type struct + Attrs map[string]string + Name string + type Visualizer interface + AddTopology func(node Builder) + Visualize func() (string, error) + func NewTopologyVisualizer() Visualizer