Documentation ¶
Index ¶
- func RemoveSubTopo(name string)
- type SrcSubTopo
- func (s *SrcSubTopo) AddOperator(inputs []node.Emitter, operator node.OperatorNode) *SrcSubTopo
- func (s *SrcSubTopo) AddOutput(output chan<- interface{}, name string) error
- func (s *SrcSubTopo) AddSrc(src node.DataSourceNode) *SrcSubTopo
- func (s *SrcSubTopo) Close(ctx api.StreamContext, ruleId string, runId int)
- func (s *SrcSubTopo) EnableCheckpoint(sources *[]checkpoint.StreamTask, ops *[]checkpoint.NonSourceTask)
- func (s *SrcSubTopo) GetMetrics() []any
- func (s *SrcSubTopo) GetName() string
- func (s *SrcSubTopo) GetSource() node.DataSourceNode
- func (s *SrcSubTopo) LinkTopo(parentTopo *def.PrintableTopo, parentJointName string)
- func (s *SrcSubTopo) MergeSrc(parentTopo *def.PrintableTopo)
- func (s *SrcSubTopo) Open(ctx api.StreamContext, parentErrCh chan<- error)
- func (s *SrcSubTopo) OpsCount() int
- func (s *SrcSubTopo) RemoveMetrics(ruleId string)
- func (s *SrcSubTopo) RemoveOutput(name string) error
- func (s *SrcSubTopo) StoreSchema(ruleID, dataSource string, schema map[string]*ast.JsonStreamField, ...)
- func (s *SrcSubTopo) SubMetrics() (keys []string, values []any)
- type Topo
- func (s *Topo) AddOperator(inputs []node.Emitter, operator node.OperatorNode) *Topo
- func (s *Topo) AddSink(inputs []node.Emitter, snk node.DataSinkNode) *Topo
- func (s *Topo) AddSinkAlterOperator(sink *node.SinkNode, operator node.OperatorNode) *Topo
- func (s *Topo) AddSrc(src node.DataSourceNode) *Topo
- func (s *Topo) Cancel()
- func (s *Topo) EnableTracer(isEnabled bool, strategy kctx.TraceStrategy)
- func (s *Topo) GetContext() api.StreamContext
- func (s *Topo) GetCoordinator() *checkpoint.Coordinator
- func (s *Topo) GetMetrics() (keys []string, values []any)
- func (s *Topo) GetMetricsV2() map[string]map[string]any
- func (s *Topo) GetName() string
- func (s *Topo) GetStreams() []string
- func (s *Topo) GetTopo() *def.PrintableTopo
- func (s *Topo) HasOpen() bool
- func (s *Topo) IsTraceEnabled() bool
- func (s *Topo) Open() <-chan error
- func (s *Topo) RemoveMetrics()
- func (s *Topo) ResetStreamOffset(name string, input map[string]interface{}) error
- func (s *Topo) SetStreams(streams []string)
- func (s *Topo) WaitClose()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RemoveSubTopo ¶
func RemoveSubTopo(name string)
Types ¶
type SrcSubTopo ¶
type SrcSubTopo struct {
// contains filtered or unexported fields
}
SrcSubTopo implements node.SourceNode.
func GetOrCreateSubTopo ¶ added in v2.0.1
func GetOrCreateSubTopo(name string) (*SrcSubTopo, bool)
func (*SrcSubTopo) AddOperator ¶
func (s *SrcSubTopo) AddOperator(inputs []node.Emitter, operator node.OperatorNode) *SrcSubTopo
AddOperator adds an internal operator to the subtopo.
func (*SrcSubTopo) AddOutput ¶
func (s *SrcSubTopo) AddOutput(output chan<- interface{}, name string) error
func (*SrcSubTopo) AddSrc ¶
func (s *SrcSubTopo) AddSrc(src node.DataSourceNode) *SrcSubTopo
func (*SrcSubTopo) Close ¶
func (s *SrcSubTopo) Close(ctx api.StreamContext, ruleId string, runId int)
func (*SrcSubTopo) EnableCheckpoint ¶
func (s *SrcSubTopo) EnableCheckpoint(sources *[]checkpoint.StreamTask, ops *[]checkpoint.NonSourceTask)
func (*SrcSubTopo) GetMetrics ¶
func (s *SrcSubTopo) GetMetrics() []any
func (*SrcSubTopo) GetName ¶
func (s *SrcSubTopo) GetName() string
func (*SrcSubTopo) GetSource ¶
func (s *SrcSubTopo) GetSource() node.DataSourceNode
func (*SrcSubTopo) LinkTopo ¶
func (s *SrcSubTopo) LinkTopo(parentTopo *def.PrintableTopo, parentJointName string)
func (*SrcSubTopo) MergeSrc ¶
func (s *SrcSubTopo) MergeSrc(parentTopo *def.PrintableTopo)
func (*SrcSubTopo) Open ¶
func (s *SrcSubTopo) Open(ctx api.StreamContext, parentErrCh chan<- error)
func (*SrcSubTopo) OpsCount ¶
func (s *SrcSubTopo) OpsCount() int
func (*SrcSubTopo) RemoveMetrics ¶
func (s *SrcSubTopo) RemoveMetrics(ruleId string)
RemoveMetrics is called when the rule is deleted.
func (*SrcSubTopo) RemoveOutput ¶
func (s *SrcSubTopo) RemoveOutput(name string) error
func (*SrcSubTopo) StoreSchema ¶
func (s *SrcSubTopo) StoreSchema(ruleID, dataSource string, schema map[string]*ast.JsonStreamField, isWildCard bool)
func (*SrcSubTopo) SubMetrics ¶
func (s *SrcSubTopo) SubMetrics() (keys []string, values []any)
type Topo ¶
type Topo struct {
// contains filtered or unexported fields
}
Topo is the runtime DAG for a rule. It only runs once. If the rule restarts, another Topo is created.
func NewWithNameAndOptions ¶
func NewWithNameAndOptions(name string, options *def.RuleOption) (*Topo, error)
func (*Topo) AddOperator ¶
func (*Topo) AddSinkAlterOperator ¶
func (*Topo) Cancel ¶
func (s *Topo) Cancel()
Cancel may be called multiple times, so it must be idempotent.
func (*Topo) EnableTracer ¶
func (s *Topo) EnableTracer(isEnabled bool, strategy kctx.TraceStrategy)
func (*Topo) GetContext ¶
func (s *Topo) GetContext() api.StreamContext
func (*Topo) GetCoordinator ¶
func (s *Topo) GetCoordinator() *checkpoint.Coordinator
func (*Topo) GetMetrics ¶
func (*Topo) GetStreams ¶
func (*Topo) GetTopo ¶
func (s *Topo) GetTopo() *def.PrintableTopo
func (*Topo) IsTraceEnabled ¶
func (*Topo) RemoveMetrics ¶
func (s *Topo) RemoveMetrics()
func (*Topo) ResetStreamOffset ¶
func (*Topo) SetStreams ¶
Click to show internal directories.
Click to hide internal directories.