Documentation ¶
Index ¶
- func NewRecordContext(record kafka.Record) context.Context
- func RecordFromContext(ctx context.Context) kafka.Record
- type Builder
- type BuilderContext
- type ChangeLogger
- type ChangelogSyncer
- type ChangelogSyncerBuilder
- type CloseableNode
- type DefaultNode
- func (n *DefaultNode) AddEdge(node Node)
- func (n *DefaultNode) Edges() []Node
- func (n *DefaultNode) Err(message string) error
- func (n *DefaultNode) Forward(ctx context.Context, kIn, vIn interface{}, cont bool) (interface{}, interface{}, bool, error)
- func (n *DefaultNode) ForwardAll(ctx context.Context, kvs []KeyValPair, cont bool) (kOut interface{}, vOut interface{}, next bool, err error)
- func (n *DefaultNode) Id() NodeId
- func (n *DefaultNode) Ignore() (interface{}, interface{}, bool, error)
- func (n *DefaultNode) IgnoreAndWrapErrWith(err error, message string) (interface{}, interface{}, bool, error)
- func (n *DefaultNode) IgnoreWithError(err error) (interface{}, interface{}, bool, error)
- func (n *DefaultNode) NameAs(name string)
- func (n *DefaultNode) NodeName() string
- func (n *DefaultNode) ReadsFrom() []string
- func (n *DefaultNode) SetId(id NodeId)
- func (n *DefaultNode) Setup(ctx SubTopologySetupContext) error
- func (n *DefaultNode) String() string
- func (n *DefaultNode) WrapErr(err error) error
- func (n *DefaultNode) WrapErrWith(err error, message string) error
- func (n *DefaultNode) WritesAt() []string
- type Edge
- type InitableNode
- type KeyValPair
- type LoggableStateStore
- type LoggableStoreBuilder
- type Node
- type NodeBuilder
- type NodeContext
- type NodeId
- type NodeInfo
- type RecodeContext
- type Sink
- type SinkBuilder
- type SinkEncoder
- type Source
- type SourceEncoder
- type State
- type StateBuilder
- type StateStore
- type StateStoreNameFunc
- type StateStoreProvider
- type StateType
- type SubTopology
- type SubTopologyBuilder
- type SubTopologyBuilderContext
- type SubTopologyBuilders
- type SubTopologyContext
- type SubTopologyId
- type SubTopologyKind
- type SubTopologySetupContext
- type Topology
- type Type
- type Visualizer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Builder ¶
type Builder interface { NewKSubTopologyBuilder(kind SubTopologyKind) SubTopologyBuilder RemoveSubTopology(builder SubTopologyBuilder) SubTopologies() SubTopologyBuilders Build(ctx BuilderContext) (Topology, error) Describe() string Reset(ctx BuilderContext) error }
type BuilderContext ¶
type BuilderContext interface { StoreRegistry() stores.Registry ProducerBuilder() kafka.ProducerBuilder Admin() kafka.Admin ApplicationId() string Logger() log.Logger MetricsReporter() metrics.Reporter }
func NewBuilderContext ¶
type ChangeLogger ¶
type ChangelogSyncer ¶
type ChangelogSyncerBuilder ¶
type ChangelogSyncerBuilder interface { // Setup sets up the changelog by creating changelog topics and offset stores Setup(ctx SubTopologyBuilderContext) error Build(ctx SubTopologyContext, store stores.Store) (ChangelogSyncer, error) BuildLogger(ctx SubTopologyContext, store string) (ChangeLogger, error) Internal() bool Topic() string }
type CloseableNode ¶
type DefaultNode ¶
type DefaultNode struct {
// contains filtered or unexported fields
}
func (*DefaultNode) AddEdge ¶
func (n *DefaultNode) AddEdge(node Node)
func (*DefaultNode) Edges ¶
func (n *DefaultNode) Edges() []Node
func (*DefaultNode) Err ¶
func (n *DefaultNode) Err(message string) error
func (*DefaultNode) ForwardAll ¶
func (n *DefaultNode) ForwardAll(ctx context.Context, kvs []KeyValPair, cont bool) (kOut interface{}, vOut interface{}, next bool, err error)
func (*DefaultNode) Id ¶
func (n *DefaultNode) Id() NodeId
func (*DefaultNode) Ignore ¶
func (n *DefaultNode) Ignore() (interface{}, interface{}, bool, error)
func (*DefaultNode) IgnoreAndWrapErrWith ¶
func (n *DefaultNode) IgnoreAndWrapErrWith(err error, message string) (interface{}, interface{}, bool, error)
func (*DefaultNode) IgnoreWithError ¶
func (n *DefaultNode) IgnoreWithError(err error) (interface{}, interface{}, bool, error)
func (*DefaultNode) NameAs ¶
func (n *DefaultNode) NameAs(name string)
func (*DefaultNode) NodeName ¶
func (n *DefaultNode) NodeName() string
func (*DefaultNode) ReadsFrom ¶
func (n *DefaultNode) ReadsFrom() []string
func (*DefaultNode) SetId ¶
func (n *DefaultNode) SetId(id NodeId)
func (*DefaultNode) Setup ¶
func (n *DefaultNode) Setup(ctx SubTopologySetupContext) error
func (*DefaultNode) String ¶
func (n *DefaultNode) String() string
func (*DefaultNode) WrapErr ¶
func (n *DefaultNode) WrapErr(err error) error
func (*DefaultNode) WrapErrWith ¶
func (n *DefaultNode) WrapErrWith(err error, message string) error
func (*DefaultNode) WritesAt ¶
func (n *DefaultNode) WritesAt() []string
type Edge ¶
type Edge struct {
// contains filtered or unexported fields
}
func NewEdge ¶
func NewEdge(parent, node NodeBuilder) Edge
func (Edge) Node ¶
func (e Edge) Node() NodeBuilder
func (Edge) Parent ¶
func (e Edge) Parent() NodeBuilder
type InitableNode ¶
type InitableNode interface { Node // Init is called once the Node build is completed and before message processing starts. // Please refer to SubTopology.Init() Init(ctx NodeContext) error }
type KeyValPair ¶
type KeyValPair struct { Key interface{} Value interface{} }
type LoggableStateStore ¶
type LoggableStateStore interface { StateStore ChangeLogger }
type LoggableStoreBuilder ¶
type LoggableStoreBuilder interface { Name() string NameFormatter(ctx SubTopologyContext) StateStoreNameFunc KeyEncoder() encoding.Encoder ValEncoder() encoding.Encoder Build(ctx SubTopologyContext) (StateStore, error) Changelog() ChangelogSyncerBuilder }
type NodeBuilder ¶
type NodeBuilder interface { NodeInfo SetId(id NodeId) // Setup is called once when the stream app is starting (SubTopologyBuilder.Setup()) // E.g. usage: create changelog topics for the node Setup(ctx SubTopologySetupContext) error // Build is called on every Consumer PartitionAssignEvent and is responsible for creating a new instance of // the node Build(ctx SubTopologyContext) (Node, error) }
type NodeContext ¶
type NodeContext interface { SubTopologyContext Store(name string) StateStore }
func NewNodeContext ¶
func NewNodeContext(parent SubTopologyContext, subTopology SubTopology) NodeContext
type NodeId ¶
type NodeId struct {
// contains filtered or unexported fields
}
func (NodeId) SubTopologyId ¶
type RecodeContext ¶
type Sink ¶
type Sink interface { Node Encoder() SinkEncoder Topic() string // Close closes the source buffers Close() error }
type SinkBuilder ¶
type SinkBuilder interface { NodeBuilder Topic() string AutoCreate() bool }
type SinkEncoder ¶
type Source ¶
type Source interface { Node NodeBuilder Encoder() SourceEncoder Topic() string ShouldCoPartitionedWith(source Source) TopicConfigs() kafka.TopicConfig CoPartitionedWith() Source RePartitionedAs() Source AutoCreate() bool Internal() bool InitialOffset() kafka.Offset }
type SourceEncoder ¶
type State ¶
type State struct { Type StateType Store LoggableStateStore }
type StateBuilder ¶
type StateBuilder struct { Type StateType Store LoggableStateStore }
type StateStore ¶
type StateStore interface { stores.Store // Flush flushes the records in buffer to stores.Store Flush() error ResetCache() ChangelogSyncer }
type StateStoreNameFunc ¶
type StateStoreProvider ¶
type StateStoreProvider interface {
Store(ctx SubTopologyContext) LoggableStateStore
}
type SubTopology ¶
type SubTopology interface { Id() SubTopologyId Sinks() []Sink Source(topic string) Source Nodes() []Node Init(ctx SubTopologyContext) error Close() error Store(name string) StateStore StateStores() map[string]StateStore }
type SubTopologyBuilder ¶
type SubTopologyBuilder interface { Id() SubTopologyId SetId(SubTopologyId) AddNode(builder NodeBuilder) AddNodeWithEdge(node, edge NodeBuilder) Parent(node NodeBuilder) NodeBuilder NodeSource(node NodeBuilder) Source Setup(ctx SubTopologySetupContext) error Build(ctx SubTopologyContext) (SubTopology, error) AddEdge(parent, node NodeBuilder) RemoveNode(node NodeBuilder) RemoveAll() Edges() []Edge Sources() []Source MergeSubTopology(subTp SubTopologyBuilder) Nodes() []NodeBuilder AddStore(builder LoggableStoreBuilder) StateStores() map[string]LoggableStoreBuilder AddSource(source Source) Kind() SubTopologyKind }
type SubTopologyBuilderContext ¶
type SubTopologyBuilderContext interface { BuilderContext MaxPartitionCount() int32 }
type SubTopologyBuilders ¶
type SubTopologyBuilders []SubTopologyBuilder
func (SubTopologyBuilders) SourceTopics ¶
func (b SubTopologyBuilders) SourceTopics() []string
func (SubTopologyBuilders) SourceTopicsFor ¶
func (b SubTopologyBuilders) SourceTopicsFor(kind SubTopologyKind) []string
func (SubTopologyBuilders) Topics ¶
func (b SubTopologyBuilders) Topics() []string
type SubTopologyContext ¶
type SubTopologyContext interface { BuilderContext context.Context PartitionConsumer() kafka.PartitionConsumer Partition() int32 Producer() kafka.Producer Logger() log.Logger TopicMeta() map[string]*kafka.Topic }
func NewSubTopologyContext ¶
func NewSubTopologyContext( parent context.Context, partition int32, builderCtx BuilderContext, producer kafka.Producer, partitionConsumer kafka.PartitionConsumer, logger log.Logger, topicMeta map[string]*kafka.Topic, ) SubTopologyContext
type SubTopologyId ¶
type SubTopologyId struct {
// contains filtered or unexported fields
}
func NewSubTopologyId ¶
func NewSubTopologyId(id int, name string) SubTopologyId
func (SubTopologyId) Id ¶
func (id SubTopologyId) Id() int
func (SubTopologyId) Name ¶
func (id SubTopologyId) Name() string
func (SubTopologyId) String ¶
func (id SubTopologyId) String() string
type SubTopologyKind ¶
type SubTopologyKind string
const ( KindStream SubTopologyKind = `stream` KindTable SubTopologyKind = `table` KindGlobalTable SubTopologyKind = `global-table` )
type SubTopologySetupContext ¶
type SubTopologySetupContext interface { BuilderContext MaxPartitionCount() int32 TopicMeta() map[string]*kafka.TopicConfig }
func NewSubTopologySetupContext ¶
func NewSubTopologySetupContext( builderCtx BuilderContext, topicMeta map[string]*kafka.TopicConfig, maxPartitions int32, ) SubTopologySetupContext
type Topology ¶
type Topology interface { SubTopologies() []SubTopologyBuilder SubTopology(id SubTopologyId) SubTopologyBuilder SubTopologyByTopic(topic string) SubTopologyBuilder StreamTopologies() SubTopologyBuilders SourceByTopic(topic string) Source GlobalTableTopologies() SubTopologyBuilders Describe() string }
type Visualizer ¶
func NewTopologyVisualizer ¶
func NewTopologyVisualizer() Visualizer
Source Files ¶
Click to show internal directories.
Click to hide internal directories.