Documentation ¶
Index ¶
- Constants
- func LookupPing(lookupType string, config map[string]interface{}) error
- func SinkPing(sinkType string, config map[string]interface{}) error
- func SourcePing(sourceType string, config map[string]interface{}) error
- type BatchOp
- type DataSourceNode
- type DecodeOp
- func (o DecodeOp) AddInputCount()
- func (o *DecodeOp) AttachSchema(ctx api.StreamContext, dataSource string, ...)
- func (o *DecodeOp) DetachSchema(ruleId string)
- func (o *DecodeOp) Exec(ctx api.StreamContext, errCh chan<- error)
- func (o DecodeOp) GetInput() (chan<- interface{}, string)
- func (o DecodeOp) GetInputCount() int
- func (o DecodeOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
- func (o *DecodeOp) Worker(item any) []any
- type DecompressOp
- func (o DecompressOp) AddInputCount()
- func (o *DecompressOp) Exec(ctx api.StreamContext, errCh chan<- error)
- func (o DecompressOp) GetInput() (chan<- interface{}, string)
- func (o DecompressOp) GetInputCount() int
- func (o DecompressOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
- func (o *DecompressOp) Worker(item any) []any
- type DynamicChannelBuffer
- type EventTimeTrigger
- type JoinAlignNode
- type LookupConf
- type LookupNode
- type MergeableTopo
- type OperatorNode
- type SchemaNode
- type SinkConf
- type SinkNode
- func (o SinkNode) AddInputCount()
- func (m *SinkNode) AddOutput(_ chan<- interface{}, name string) error
- func (m *SinkNode) Broadcast(_ interface{})
- func (o SinkNode) GetInput() (chan<- interface{}, string)
- func (o SinkNode) GetInputCount() int
- func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error)
- func (o SinkNode) SetBarrierHandler(bh checkpoint.BarrierHandler)
- type SourceConnectorNode
- func (o SourceConnectorNode) AddOutput(output chan<- interface{}, name string) error
- func (o SourceConnectorNode) Broadcast(val interface{})
- func (o SourceConnectorNode) GetMetrics() []any
- func (o SourceConnectorNode) GetName() string
- func (o SourceConnectorNode) GetStreamContext() api.StreamContext
- func (m *SourceConnectorNode) Open(ctx api.StreamContext, ctrlCh chan<- error)
- func (o SourceConnectorNode) RemoveMetrics(ruleId string)
- func (m *SourceConnectorNode) Run(ctx api.StreamContext, ctrlCh chan<- error)
- func (o SourceConnectorNode) SetQos(qos api.Qos)
- type SourceInstanceNode
- type SourceNode
- func (o SourceNode) AddOutput(output chan<- interface{}, name string) error
- func (o SourceNode) Broadcast(val interface{})
- func (o SourceNode) GetMetrics() []any
- func (o SourceNode) GetName() string
- func (m *SourceNode) GetSource() api.Source
- func (o SourceNode) GetStreamContext() api.StreamContext
- func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error)
- func (o SourceNode) RemoveMetrics(ruleId string)
- func (m *SourceNode) SetProps(props map[string]interface{})
- func (o SourceNode) SetQos(qos api.Qos)
- type SwitchConfig
- type SwitchNode
- func (o SwitchNode) AddInputCount()
- func (n *SwitchNode) AddOutput(output chan<- interface{}, name string) error
- func (n *SwitchNode) Exec(ctx api.StreamContext, errCh chan<- error)
- func (n *SwitchNode) GetEmitter(outputIndex int) api.Emitter
- func (o SwitchNode) GetInput() (chan<- interface{}, string)
- func (o SwitchNode) GetInputCount() int
- func (o SwitchNode) SetBarrierHandler(bh checkpoint.BarrierHandler)
- type TupleList
- type UnFunc
- type UnOperation
- type UnaryOperator
- func (o UnaryOperator) AddInputCount()
- func (o *UnaryOperator) Exec(ctx api.StreamContext, errCh chan<- error)
- func (o UnaryOperator) GetInput() (chan<- interface{}, string)
- func (o UnaryOperator) GetInputCount() int
- func (o UnaryOperator) SetBarrierHandler(bh checkpoint.BarrierHandler)
- func (o *UnaryOperator) SetOperation(op UnOperation)
- type WatermarkOp
- type WindowConfig
- type WindowOperator
Constants ¶
const ( WatermarkKey = "$$wartermark" EventInputKey = "$$eventinputs" StreamWMKey = "$$streamwms" )
const ( WindowInputsKey = "$$windowInputs" TriggerTimeKey = "$$triggerTime" MsgCountKey = "$$msgCount" )
const BatchKey = "$$batchInputs"
const OffsetKey = "$$offset"
Variables ¶
This section is empty.
Functions ¶
func LookupPing ¶
func SourcePing ¶
Types ¶
type BatchOp ¶
type BatchOp struct {
// contains filtered or unexported fields
}
func NewBatchOp ¶
func (BatchOp) AddInputCount ¶
func (o BatchOp) AddInputCount()
func (BatchOp) GetInputCount ¶
func (o BatchOp) GetInputCount() int
func (BatchOp) SetBarrierHandler ¶
func (o BatchOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
type DataSourceNode ¶
type DecodeOp ¶
type DecodeOp struct {
// contains filtered or unexported fields
}
func NewDecodeOp ¶
func (DecodeOp) AddInputCount ¶
func (o DecodeOp) AddInputCount()
func (*DecodeOp) AttachSchema ¶
func (o *DecodeOp) AttachSchema(ctx api.StreamContext, dataSource string, schema map[string]*ast.JsonStreamField, isWildcard bool)
func (*DecodeOp) DetachSchema ¶
func (*DecodeOp) Exec ¶
func (o *DecodeOp) Exec(ctx api.StreamContext, errCh chan<- error)
Exec decode op receives raw data and converts it to message
func (DecodeOp) GetInputCount ¶
func (o DecodeOp) GetInputCount() int
func (DecodeOp) SetBarrierHandler ¶
func (o DecodeOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
type DecompressOp ¶
type DecompressOp struct {
// contains filtered or unexported fields
}
func NewDecompressOp ¶
func NewDecompressOp(name string, rOpt *api.RuleOption, compressMethod string) (*DecompressOp, error)
func (DecompressOp) AddInputCount ¶
func (o DecompressOp) AddInputCount()
func (*DecompressOp) Exec ¶
func (o *DecompressOp) Exec(ctx api.StreamContext, errCh chan<- error)
func (DecompressOp) GetInputCount ¶
func (o DecompressOp) GetInputCount() int
func (DecompressOp) SetBarrierHandler ¶
func (o DecompressOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
func (*DecompressOp) Worker ¶
func (o *DecompressOp) Worker(item any) []any
type DynamicChannelBuffer ¶
type DynamicChannelBuffer struct { In chan api.SourceTuple Out chan api.SourceTuple // contains filtered or unexported fields }
func NewDynamicChannelBuffer ¶
func NewDynamicChannelBuffer() *DynamicChannelBuffer
func (*DynamicChannelBuffer) Close ¶
func (b *DynamicChannelBuffer) Close()
func (*DynamicChannelBuffer) GetLength ¶
func (b *DynamicChannelBuffer) GetLength() int
func (*DynamicChannelBuffer) SetLimit ¶
func (b *DynamicChannelBuffer) SetLimit(limit int)
type EventTimeTrigger ¶
type EventTimeTrigger struct {
// contains filtered or unexported fields
}
EventTimeTrigger scans the input tuples and find out the tuples in the current window The inputs are sorted by watermark op
func NewEventTimeTrigger ¶
func NewEventTimeTrigger(window *WindowConfig) (*EventTimeTrigger, error)
type JoinAlignNode ¶
type JoinAlignNode struct {
// contains filtered or unexported fields
}
JoinAlignNode will block the stream and buffer all the table tuples. Once buffered, it will combine the later input with the buffer The input for batch table MUST be *WindowTuples
func NewJoinAlignNode ¶
func NewJoinAlignNode(name string, emitters []string, options *api.RuleOption) (*JoinAlignNode, error)
func (JoinAlignNode) AddInputCount ¶
func (o JoinAlignNode) AddInputCount()
func (*JoinAlignNode) Exec ¶
func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error)
func (JoinAlignNode) GetInputCount ¶
func (o JoinAlignNode) GetInputCount() int
func (JoinAlignNode) SetBarrierHandler ¶
func (o JoinAlignNode) SetBarrierHandler(bh checkpoint.BarrierHandler)
type LookupConf ¶
type LookupNode ¶
type LookupNode struct {
// contains filtered or unexported fields
}
LookupNode will look up the data from the external source when receiving an event
func NewLookupNode ¶
func (LookupNode) AddInputCount ¶
func (o LookupNode) AddInputCount()
func (*LookupNode) Exec ¶
func (n *LookupNode) Exec(ctx api.StreamContext, errCh chan<- error)
func (LookupNode) GetInputCount ¶
func (o LookupNode) GetInputCount() int
func (LookupNode) SetBarrierHandler ¶
func (o LookupNode) SetBarrierHandler(bh checkpoint.BarrierHandler)
type MergeableTopo ¶
type MergeableTopo interface { GetSource() DataSourceNode // MergeSrc Add child topo as the source with following operators MergeSrc(parentTopo *api.PrintableTopo) // LinkTopo Add printable topo link from the parent topo to the child topo LinkTopo(parentTopo *api.PrintableTopo, parentJointName string) // SubMetrics return the metrics of the sub nodes SubMetrics() ([]string, []any) // Close notifies subtopo to deref Close(ruleId string) }
type OperatorNode ¶
type OperatorNode interface { api.Operator Broadcast(data interface{}) GetStreamContext() api.StreamContext GetInputCount() int AddInputCount() SetQos(api.Qos) SetBarrierHandler(checkpoint.BarrierHandler) RemoveMetrics(name string) }
type SchemaNode ¶
type SchemaNode interface { // AttachSchema attach the schema to the node. The parameters are ruleId, sourceName, schema, whether is wildcard AttachSchema(api.StreamContext, string, map[string]*ast.JsonStreamField, bool) // DetachSchema detach the schema from the node. The parameters are ruleId DetachSchema(string) }
type SinkConf ¶
type SinkConf struct { Concurrency int `json:"concurrency"` Omitempty bool `json:"omitIfEmpty"` SendSingle bool `json:"sendSingle"` DataTemplate string `json:"dataTemplate"` Format string `json:"format"` SchemaId string `json:"schemaId"` Delimiter string `json:"delimiter"` BufferLength int `json:"bufferLength"` Fields []string `json:"fields"` DataField string `json:"dataField"` BatchSize int `json:"batchSize"` LingerInterval int `json:"lingerInterval"` conf.SinkConf }
type SinkNode ¶
type SinkNode struct {
// contains filtered or unexported fields
}
func NewSinkNode ¶
func NewSinkNodeWithSink ¶
NewSinkNodeWithSink Only for mock source, do not use it in production
func (SinkNode) AddInputCount ¶
func (o SinkNode) AddInputCount()
func (*SinkNode) Broadcast ¶
func (m *SinkNode) Broadcast(_ interface{})
Broadcast Override defaultNode
func (SinkNode) GetInputCount ¶
func (o SinkNode) GetInputCount() int
func (SinkNode) SetBarrierHandler ¶
func (o SinkNode) SetBarrierHandler(bh checkpoint.BarrierHandler)
type SourceConnectorNode ¶
type SourceConnectorNode struct {
// contains filtered or unexported fields
}
SourceConnectorNode is a node that connects to an external source The SourceNode is an all-in-one source node that support connect and decode and more. The SourceConnectorNode is a node that only connects to external source and does not decode.
func NewSourceConnectorNode ¶
func NewSourceConnectorNode(name string, ss api.SourceConnector, dataSource string, props map[string]any, rOpt *api.RuleOption) (*SourceConnectorNode, error)
NewSourceConnectorNode creates a SourceConnectorNode
func (SourceConnectorNode) GetMetrics ¶
func (o SourceConnectorNode) GetMetrics() []any
func (SourceConnectorNode) GetStreamContext ¶
func (o SourceConnectorNode) GetStreamContext() api.StreamContext
func (*SourceConnectorNode) Open ¶
func (m *SourceConnectorNode) Open(ctx api.StreamContext, ctrlCh chan<- error)
Open will be invoked by topo. It starts reading data.
func (SourceConnectorNode) RemoveMetrics ¶
func (o SourceConnectorNode) RemoveMetrics(ruleId string)
func (*SourceConnectorNode) Run ¶
func (m *SourceConnectorNode) Run(ctx api.StreamContext, ctrlCh chan<- error)
type SourceInstanceNode ¶
type SourceNode ¶
type SourceNode struct { IsWildcard bool IsSchemaless bool // contains filtered or unexported fields }
func NewSourceNode ¶
func NewSourceNode(name string, st ast.StreamType, op UnOperation, options *ast.Options, rOptions *api.RuleOption, isWildcard, isSchemaless bool, schema map[string]*ast.JsonStreamField) *SourceNode
func (SourceNode) GetMetrics ¶
func (o SourceNode) GetMetrics() []any
func (*SourceNode) GetSource ¶
func (m *SourceNode) GetSource() api.Source
func (SourceNode) GetStreamContext ¶
func (o SourceNode) GetStreamContext() api.StreamContext
func (*SourceNode) Open ¶
func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error)
func (SourceNode) RemoveMetrics ¶
func (o SourceNode) RemoveMetrics(ruleId string)
func (*SourceNode) SetProps ¶
func (m *SourceNode) SetProps(props map[string]interface{})
type SwitchConfig ¶
type SwitchNode ¶
type SwitchNode struct {
// contains filtered or unexported fields
}
func NewSwitchNode ¶
func NewSwitchNode(name string, conf *SwitchConfig, options *api.RuleOption) (*SwitchNode, error)
func (SwitchNode) AddInputCount ¶
func (o SwitchNode) AddInputCount()
func (*SwitchNode) AddOutput ¶
func (n *SwitchNode) AddOutput(output chan<- interface{}, name string) error
AddOutput SwitchNode overrides the defaultSinkNode's AddOutput to add output to the outputNodes SwitchNode itself has multiple outlets defined by the outputNodes. This default function will add the output to the first outlet
func (*SwitchNode) Exec ¶
func (n *SwitchNode) Exec(ctx api.StreamContext, errCh chan<- error)
func (*SwitchNode) GetEmitter ¶
func (n *SwitchNode) GetEmitter(outputIndex int) api.Emitter
GetEmitter returns the nth emitter of the node. SwtichNode is the only node that has multiple emitters In planner graph, fromNodes is a multi-dim array, switch node is the only node that could be in the second dim The dim is the index
func (SwitchNode) GetInputCount ¶
func (o SwitchNode) GetInputCount() int
func (SwitchNode) SetBarrierHandler ¶
func (o SwitchNode) SetBarrierHandler(bh checkpoint.BarrierHandler)
type UnFunc ¶
type UnFunc func(api.StreamContext, interface{}) interface{}
UnFunc implements UnOperation as type func (context.Context, interface{})
func (UnFunc) Apply ¶
func (f UnFunc) Apply(ctx api.StreamContext, data interface{}) interface{}
Apply implements UnOperation.Apply method
type UnOperation ¶
type UnOperation interface {
Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{}
}
UnOperation interface represents unary operations (i.e. Map, Filter, etc)
type UnaryOperator ¶
type UnaryOperator struct {
// contains filtered or unexported fields
}
func New ¶
func New(name string, options *api.RuleOption) *UnaryOperator
New NewUnary creates *UnaryOperator value
func (UnaryOperator) AddInputCount ¶
func (o UnaryOperator) AddInputCount()
func (*UnaryOperator) Exec ¶
func (o *UnaryOperator) Exec(ctx api.StreamContext, errCh chan<- error)
Exec is the entry point for the executor
func (UnaryOperator) GetInputCount ¶
func (o UnaryOperator) GetInputCount() int
func (UnaryOperator) SetBarrierHandler ¶
func (o UnaryOperator) SetBarrierHandler(bh checkpoint.BarrierHandler)
func (*UnaryOperator) SetOperation ¶
func (o *UnaryOperator) SetOperation(op UnOperation)
SetOperation sets the executor operation
type WatermarkOp ¶
type WatermarkOp struct {
// contains filtered or unexported fields
}
WatermarkOp is used when event time is enabled. It is used to align the event time of the input streams It sends out the data in time order with watermark.
func NewWatermarkOp ¶
func NewWatermarkOp(name string, sendWatermark bool, streams []string, options *api.RuleOption) *WatermarkOp
func (WatermarkOp) AddInputCount ¶
func (o WatermarkOp) AddInputCount()
func (*WatermarkOp) Exec ¶
func (w *WatermarkOp) Exec(ctx api.StreamContext, errCh chan<- error)
func (WatermarkOp) GetInputCount ¶
func (o WatermarkOp) GetInputCount() int
func (WatermarkOp) SetBarrierHandler ¶
func (o WatermarkOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
type WindowConfig ¶
type WindowOperator ¶
type WindowOperator struct {
// contains filtered or unexported fields
}
func NewWindowOp ¶
func NewWindowOp(name string, w WindowConfig, options *api.RuleOption) (*WindowOperator, error)
func (WindowOperator) AddInputCount ¶
func (o WindowOperator) AddInputCount()
func (*WindowOperator) Exec ¶
func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error)
Exec is the entry point for the executor input: *xsql.Tuple from preprocessor output: xsql.WindowTuplesSet
func (WindowOperator) GetInputCount ¶
func (o WindowOperator) GetInputCount() int
func (WindowOperator) SetBarrierHandler ¶
func (o WindowOperator) SetBarrierHandler(bh checkpoint.BarrierHandler)