Documentation ¶
Index ¶
- Constants
- func LookupPing(lookupType string, config map[string]interface{}) error
- func SinkPing(sinkType string, config map[string]any) error
- func SourcePing(sourceType string, config map[string]any) error
- type BatchOp
- type CacheOp
- type Collector
- type CompNode
- type CompressOp
- func (o CompressOp) AddInputCount()
- func (o *CompressOp) Exec(ctx api.StreamContext, errCh chan<- error)
- func (o CompressOp) GetInput() (chan<- any, string)
- func (o CompressOp) GetInputCount() int
- func (o CompressOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
- func (o *CompressOp) Worker(_ api.StreamContext, item any) []any
- type DataSinkNode
- type DataSourceNode
- type DecodeOp
- func (o DecodeOp) AddInputCount()
- func (o *DecodeOp) AttachSchema(ctx api.StreamContext, dataSource string, ...)
- func (o *DecodeOp) DetachSchema(ctx api.StreamContext, ruleId string)
- func (o *DecodeOp) Exec(ctx api.StreamContext, errCh chan<- error)
- func (o DecodeOp) GetInput() (chan<- any, string)
- func (o DecodeOp) GetInputCount() int
- func (o *DecodeOp) PayloadBatchDecodeWorker(ctx api.StreamContext, item any) []any
- func (o *DecodeOp) PayloadDecodeWorker(ctx api.StreamContext, item any) []any
- func (o DecodeOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
- func (o *DecodeOp) Worker(ctx api.StreamContext, item any) []any
- type DecompressOp
- func (o DecompressOp) AddInputCount()
- func (o *DecompressOp) Exec(ctx api.StreamContext, errCh chan<- error)
- func (o DecompressOp) GetInput() (chan<- any, string)
- func (o DecompressOp) GetInputCount() int
- func (o DecompressOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
- func (o *DecompressOp) Worker(_ api.StreamContext, item any) []any
- type DedupTriggerNode
- type Emitter
- type EncodeOp
- func (o EncodeOp) AddInputCount()
- func (o *EncodeOp) Exec(ctx api.StreamContext, errCh chan<- error)
- func (o EncodeOp) GetInput() (chan<- any, string)
- func (o EncodeOp) GetInputCount() int
- func (o EncodeOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
- func (o *EncodeOp) Worker(ctx api.StreamContext, item any) []any
- type EncryptNode
- func (o EncryptNode) AddInputCount()
- func (o *EncryptNode) Exec(ctx api.StreamContext, errCh chan<- error)
- func (o EncryptNode) GetInput() (chan<- any, string)
- func (o EncryptNode) GetInputCount() int
- func (o EncryptNode) SetBarrierHandler(bh checkpoint.BarrierHandler)
- func (o *EncryptNode) Worker(_ api.StreamContext, item any) []any
- type EventTimeTrigger
- type JoinAlignNode
- type LookupConf
- type LookupNode
- type MergeableTopo
- type MetricNode
- type OperatorNode
- type PriorityQueue
- type RateLimitOp
- func (o RateLimitOp) AddInputCount()
- func (o *RateLimitOp) AttachSchema(ctx api.StreamContext, dataSource string, ...)
- func (o *RateLimitOp) DetachSchema(ctx api.StreamContext, ruleId string)
- func (o *RateLimitOp) Exec(ctx api.StreamContext, errCh chan<- error)
- func (o RateLimitOp) GetInput() (chan<- any, string)
- func (o RateLimitOp) GetInputCount() int
- func (o RateLimitOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
- type SchemaNode
- type SinkConf
- type SinkNode
- func (o SinkNode) AddInputCount()
- func (s *SinkNode) Exec(ctx api.StreamContext, errCh chan<- error)
- func (o SinkNode) GetInput() (chan<- any, string)
- func (o SinkNode) GetInputCount() int
- func (o SinkNode) SetBarrierHandler(bh checkpoint.BarrierHandler)
- func (s *SinkNode) SetResendOutput(output chan<- any)
- type SourceInstanceNode
- type SourceNode
- func (o SourceNode) AddOutput(output chan<- any, name string) error
- func (o SourceNode) Broadcast(val any)
- func (o SourceNode) BroadcastCustomized(val any, broadcastFunc func(val any))
- func (o SourceNode) Close()
- func (o SourceNode) GetMetrics() []any
- func (o SourceNode) GetName() string
- func (o SourceNode) GetStreamContext() api.StreamContext
- func (m *SourceNode) Open(ctx api.StreamContext, ctrlCh chan<- error)
- func (o SourceNode) RemoveMetrics(ruleId string)
- func (o SourceNode) RemoveOutput(name string) error
- func (m *SourceNode) Rewind(ctx api.StreamContext) error
- func (m *SourceNode) Run(ctx api.StreamContext, ctrlCh chan<- error)
- func (o SourceNode) SetQos(qos def.Qos)
- type SwitchConfig
- type SwitchNode
- func (o SwitchNode) AddInputCount()
- func (n *SwitchNode) AddOutput(output chan<- interface{}, name string) error
- func (n *SwitchNode) Exec(ctx api.StreamContext, errCh chan<- error)
- func (n *SwitchNode) GetEmitter(outputIndex int) Emitter
- func (o SwitchNode) GetInput() (chan<- any, string)
- func (o SwitchNode) GetInputCount() int
- func (o SwitchNode) SetBarrierHandler(bh checkpoint.BarrierHandler)
- type TopNode
- type TransformOp
- func (o TransformOp) AddInputCount()
- func (t *TransformOp) Exec(ctx api.StreamContext, errCh chan<- error)
- func (o TransformOp) GetInput() (chan<- any, string)
- func (o TransformOp) GetInputCount() int
- func (o TransformOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
- func (t *TransformOp) Worker(ctx api.StreamContext, item any) []any
- type TriggerRequest
- type TupleList
- type UnFunc
- type UnOperation
- type UnaryOperator
- func (o UnaryOperator) AddInputCount()
- func (o *UnaryOperator) Exec(ctx api.StreamContext, errCh chan<- error)
- func (o UnaryOperator) GetInput() (chan<- any, string)
- func (o UnaryOperator) GetInputCount() int
- func (o UnaryOperator) SetBarrierHandler(bh checkpoint.BarrierHandler)
- func (o *UnaryOperator) SetOperation(op UnOperation)
- type WatermarkOp
- type WindowConfig
- type WindowOperator
- func (o WindowOperator) AddInputCount()
- func (o *WindowOperator) Close()
- func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error)
- func (o WindowOperator) GetInput() (chan<- any, string)
- func (o WindowOperator) GetInputCount() int
- func (o WindowOperator) SetBarrierHandler(bh checkpoint.BarrierHandler)
Constants ¶
const (
    WatermarkKey  = "$$wartermark"
    EventInputKey = "$$eventinputs"
    StreamWMKey   = "$$streamwms"
)
const (
    WindowInputsKey = "$$windowInputs"
    TriggerTimeKey  = "$$triggerTime"
    MsgCountKey     = "$$msgCount"
)
const BatchKey = "$$batchInputs"
const (
OffsetKey = "$$offset"
)
Variables ¶
This section is empty.
Functions ¶
func LookupPing ¶
func LookupPing(lookupType string, config map[string]interface{}) error
func SinkPing ¶
func SinkPing(sinkType string, config map[string]any) error
func SourcePing ¶
func SourcePing(sourceType string, config map[string]any) error
Types ¶
type BatchOp ¶
type BatchOp struct {
// contains filtered or unexported fields
}
func NewBatchOp ¶
func (BatchOp) AddInputCount ¶
func (o BatchOp) AddInputCount()
func (BatchOp) GetInputCount ¶
func (o BatchOp) GetInputCount() int
func (BatchOp) SetBarrierHandler ¶
func (o BatchOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
type CacheOp ¶
type CacheOp struct {
// contains filtered or unexported fields
}
CacheOp receives tuples and decides whether to send them through or save them to disk. It runs right before the sink.
Immutable: true
Input: any (mostly MessageTuple/MessageTupleList, may receive RawTuple after transformOp)
Special validation: one output only
func NewCacheOp ¶
func NewCacheOp(ctx api.StreamContext, name string, rOpt *def.RuleOption, sc *conf.SinkConf) (*CacheOp, error)
func (CacheOp) AddInputCount ¶
func (o CacheOp) AddInputCount()
func (*CacheOp) Exec ¶
func (s *CacheOp) Exec(ctx api.StreamContext, errCh chan<- error)
Exec ingests data and sends it through. If the output channel is full, it saves the data to the disk cache and starts the resend timer. Once all cached data has been sent, the timer is stopped.
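To make the send-through-or-cache behavior concrete, here is a small self-contained sketch (not the eKuiper implementation): a non-blocking send that falls back to an in-memory buffer standing in for the disk cache, drained on a resend timer. All names are illustrative.

// Illustrative sketch only: forward items without blocking; when the output is
// full, cache them (the slice stands in for the disk cache) and drain the
// cache on a timer once the channel has room again.
package example

import "time"

func runCache(in <-chan any, out chan<- any, resendInterval time.Duration) {
    var buffer []any
    ticker := time.NewTicker(resendInterval)
    defer ticker.Stop()
    for {
        select {
        case item, ok := <-in:
            if !ok {
                return
            }
            if len(buffer) == 0 {
                select {
                case out <- item: // channel has room: send through directly
                    continue
                default:
                }
            }
            // channel full (or items already cached): cache to keep ordering
            buffer = append(buffer, item)
        case <-ticker.C:
        drain:
            for len(buffer) > 0 {
                select {
                case out <- buffer[0]:
                    buffer = buffer[1:]
                default:
                    break drain // output still full, retry on the next tick
                }
            }
        }
    }
}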
func (CacheOp) GetInputCount ¶
func (o CacheOp) GetInputCount() int
func (CacheOp) SetBarrierHandler ¶
func (o CacheOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
type CompNode ¶
CompNode is a composite node, used for implicitly split nodes. For example, a sink node or source node may be implemented internally as a collection of connected nodes.
type CompressOp ¶
type CompressOp struct {
// contains filtered or unexported fields
}
func NewCompressOp ¶
func NewCompressOp(name string, rOpt *def.RuleOption, compressMethod string) (*CompressOp, error)
func (CompressOp) AddInputCount ¶
func (o CompressOp) AddInputCount()
func (*CompressOp) Exec ¶
func (o *CompressOp) Exec(ctx api.StreamContext, errCh chan<- error)
func (CompressOp) GetInputCount ¶
func (o CompressOp) GetInputCount() int
func (CompressOp) SetBarrierHandler ¶
func (o CompressOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
func (*CompressOp) Worker ¶
func (o *CompressOp) Worker(_ api.StreamContext, item any) []any
type DataSinkNode ¶
type DataSinkNode interface {
    TopNode
    MetricNode
    Collector
    Exec(api.StreamContext, chan<- error)
    GetStreamContext() api.StreamContext
    GetInputCount() int
    AddInputCount()
    SetQos(def.Qos)
    SetBarrierHandler(checkpoint.BarrierHandler)
}
type DataSourceNode ¶
type DataSourceNode interface {
    TopNode
    MetricNode
    Emitter
    Open(ctx api.StreamContext, errCh chan<- error)
}
type DecodeOp ¶
type DecodeOp struct {
// contains filtered or unexported fields
}
DecodeOp manages format decoding (employing the schema) and sending frequency (for batch decoding, e.g. a JSON array)
func NewDecodeOp ¶
func NewDecodeOp(ctx api.StreamContext, forPayload bool, name, StreamName string, rOpt *def.RuleOption, schema map[string]*ast.JsonStreamField, props map[string]any) (*DecodeOp, error)
func (DecodeOp) AddInputCount ¶
func (o DecodeOp) AddInputCount()
func (*DecodeOp) AttachSchema ¶
func (o *DecodeOp) AttachSchema(ctx api.StreamContext, dataSource string, schema map[string]*ast.JsonStreamField, isWildcard bool)
func (*DecodeOp) DetachSchema ¶
func (o *DecodeOp) DetachSchema(ctx api.StreamContext, ruleId string)
func (*DecodeOp) Exec ¶
func (o *DecodeOp) Exec(ctx api.StreamContext, errCh chan<- error)
Exec decode op receives raw data and converts it to messages
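As a rough illustration of the decode step, the sketch below turns raw JSON bytes into one message map, or a slice of maps for a batch (JSON array). It is a simplified stand-in; the real DecodeOp goes through the configured format converter and schema, and the function name here is an assumption.

// Illustrative sketch: decode raw JSON bytes into message maps.
// A JSON object yields one message; a JSON array yields one message per element.
package example

import (
    "encoding/json"
    "fmt"
)

func decodeRaw(raw []byte) ([]map[string]any, error) {
    if len(raw) > 0 && raw[0] == '[' { // batch decode: JSON array
        var msgs []map[string]any
        if err := json.Unmarshal(raw, &msgs); err != nil {
            return nil, fmt.Errorf("invalid json array: %w", err)
        }
        return msgs, nil
    }
    var msg map[string]any // single message
    if err := json.Unmarshal(raw, &msg); err != nil {
        return nil, fmt.Errorf("invalid json: %w", err)
    }
    return []map[string]any{msg}, nil
}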
func (DecodeOp) GetInputCount ¶
func (o DecodeOp) GetInputCount() int
func (*DecodeOp) PayloadBatchDecodeWorker ¶
func (o *DecodeOp) PayloadBatchDecodeWorker(ctx api.StreamContext, item any) []any
PayloadBatchDecodeWorker deals with payload like

{
  "ts": 123456,
  "batchField": [
    {"payloadField": "data", "otherField": 1},
    {"payloadField": "data2", "otherField": 2}
  ]
}

It merges all payload results into one

{
  "ts": 123456,
  // parsed fields are merged
  "parsedField": 1,
  "parsedField": 2,
  // other fields are also merged and keep the latest
  "otherField": 2
}

If the parse result is a list, it is also merged in
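A minimal sketch of the merge described above, assuming the parsed payloads arrive as plain maps; later values overwrite earlier ones so the latest value wins. The function name is illustrative, not part of the DecodeOp API.

// Illustrative sketch: merge a batch of parsed payload maps into one result,
// keeping the latest value for any field that appears more than once.
package example

func mergePayloads(base map[string]any, parsed []map[string]any) map[string]any {
    merged := make(map[string]any, len(base))
    for k, v := range base {
        merged[k] = v // fields outside the payload, e.g. "ts"
    }
    for _, p := range parsed {
        for k, v := range p {
            merged[k] = v // later payloads overwrite earlier ones
        }
    }
    return merged
}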
func (*DecodeOp) PayloadDecodeWorker ¶
func (o *DecodeOp) PayloadDecodeWorker(ctx api.StreamContext, item any) []any
PayloadDecodeWorker: each input has one message with the payload field to decode

{"payloadField": "data", "otherField": 1}

becomes

{
  // parsed fields
  "parsedField": 1,
  "parsedField2": 2,
  // keep the original field if it is in the schema
  "payloadField": "data",
  "otherField": 1
}

If the parse result is a list, it will also output a list
func (DecodeOp) SetBarrierHandler ¶
func (o DecodeOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
type DecompressOp ¶
type DecompressOp struct {
// contains filtered or unexported fields
}
func NewDecompressOp ¶
func NewDecompressOp(name string, rOpt *def.RuleOption, compressMethod string) (*DecompressOp, error)
func (DecompressOp) AddInputCount ¶
func (o DecompressOp) AddInputCount()
func (*DecompressOp) Exec ¶
func (o *DecompressOp) Exec(ctx api.StreamContext, errCh chan<- error)
func (DecompressOp) GetInputCount ¶
func (o DecompressOp) GetInputCount() int
func (DecompressOp) SetBarrierHandler ¶
func (o DecompressOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
func (*DecompressOp) Worker ¶
func (o *DecompressOp) Worker(_ api.StreamContext, item any) []any
type DedupTriggerNode ¶
type DedupTriggerNode struct {
// contains filtered or unexported fields
}
func NewDedupTriggerNode ¶
func NewDedupTriggerNode(name string, options *def.RuleOption, aliasName string, startField string, endField string, nowField string, expire int64) *DedupTriggerNode
func (DedupTriggerNode) AddInputCount ¶
func (o DedupTriggerNode) AddInputCount()
func (*DedupTriggerNode) Exec ¶
func (w *DedupTriggerNode) Exec(ctx api.StreamContext, errCh chan<- error)
func (DedupTriggerNode) GetInputCount ¶
func (o DedupTriggerNode) GetInputCount() int
func (DedupTriggerNode) SetBarrierHandler ¶
func (o DedupTriggerNode) SetBarrierHandler(bh checkpoint.BarrierHandler)
type EncodeOp ¶
type EncodeOp struct {
// contains filtered or unexported fields
}
EncodeOp converts a tuple to raw bytes according to the FORMAT property
Immutable: false
Input: any (mostly MessageTuple/SinkTupleList, may receive RawTuple after transformOp)
Output: RawTuple
func NewEncodeOp ¶
func NewEncodeOp(ctx api.StreamContext, name string, rOpt *def.RuleOption, sc *SinkConf) (*EncodeOp, error)
func (EncodeOp) AddInputCount ¶
func (o EncodeOp) AddInputCount()
func (*EncodeOp) Exec ¶
func (o *EncodeOp) Exec(ctx api.StreamContext, errCh chan<- error)
Exec encode op receives map/[]map and converts it to bytes. If it receives bytes, it just returns them.
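The pass-through-or-encode behavior can be sketched as follows, with json.Marshal standing in for whatever format converter is configured; the function name is illustrative.

// Illustrative sketch: encode a message (or batch) to raw bytes,
// passing through inputs that are already raw bytes.
package example

import "encoding/json"

func encode(item any) ([]byte, error) {
    switch v := item.(type) {
    case []byte:
        return v, nil // already raw, return as is
    default:
        return json.Marshal(v) // stand-in for the configured format converter
    }
}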
func (EncodeOp) GetInputCount ¶
func (o EncodeOp) GetInputCount() int
func (EncodeOp) SetBarrierHandler ¶
func (o EncodeOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
type EncryptNode ¶
type EncryptNode struct {
// contains filtered or unexported fields
}
EncryptNode encrypts raw bytes
Immutable: false
Input: RawTuple
Output: RawTuple
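For intuition, here is a self-contained sketch of encrypting raw bytes with AES-GCM. The actual cipher is chosen by the encryptMethod argument of NewEncryptOp and is not necessarily AES-GCM; the key handling here is illustrative only.

// Illustrative sketch: encrypt raw bytes with AES-GCM (nonce prepended).
package example

import (
    "crypto/aes"
    "crypto/cipher"
    "crypto/rand"
    "io"
)

func encrypt(key, raw []byte) ([]byte, error) {
    block, err := aes.NewCipher(key) // key must be 16, 24 or 32 bytes
    if err != nil {
        return nil, err
    }
    gcm, err := cipher.NewGCM(block)
    if err != nil {
        return nil, err
    }
    nonce := make([]byte, gcm.NonceSize())
    if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
        return nil, err
    }
    // Seal appends the ciphertext to the nonce so the output carries both.
    return gcm.Seal(nonce, nonce, raw, nil), nil
}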
func NewEncryptOp ¶
func NewEncryptOp(name string, rOpt *def.RuleOption, encryptMethod string) (*EncryptNode, error)
func (EncryptNode) AddInputCount ¶
func (o EncryptNode) AddInputCount()
func (*EncryptNode) Exec ¶
func (o *EncryptNode) Exec(ctx api.StreamContext, errCh chan<- error)
func (EncryptNode) GetInputCount ¶
func (o EncryptNode) GetInputCount() int
func (EncryptNode) SetBarrierHandler ¶
func (o EncryptNode) SetBarrierHandler(bh checkpoint.BarrierHandler)
func (*EncryptNode) Worker ¶
func (o *EncryptNode) Worker(_ api.StreamContext, item any) []any
type EventTimeTrigger ¶
type EventTimeTrigger struct {
// contains filtered or unexported fields
}
EventTimeTrigger scans the input tuples and finds the tuples that belong to the current window. The inputs are sorted by the watermark op.
func NewEventTimeTrigger ¶
func NewEventTimeTrigger(window *WindowConfig) (*EventTimeTrigger, error)
type JoinAlignNode ¶
type JoinAlignNode struct {
// contains filtered or unexported fields
}
JoinAlignNode will block the stream and buffer all the table tuples. Once buffered, it will combine the later input with the buffer. The input for a batch table MUST be *WindowTuples.
func NewJoinAlignNode ¶
func NewJoinAlignNode(name string, emitters []string, sizes []int, options *def.RuleOption) (*JoinAlignNode, error)
func (JoinAlignNode) AddInputCount ¶
func (o JoinAlignNode) AddInputCount()
func (*JoinAlignNode) Exec ¶
func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error)
func (JoinAlignNode) GetInputCount ¶
func (o JoinAlignNode) GetInputCount() int
func (JoinAlignNode) SetBarrierHandler ¶
func (o JoinAlignNode) SetBarrierHandler(bh checkpoint.BarrierHandler)
type LookupConf ¶
type LookupConf struct {
    Cache           bool              `json:"cache"`
    CacheTTL        cast.DurationConf `json:"cacheTtl"`
    CacheMissingKey bool              `json:"cacheMissingKey"`
}
type LookupNode ¶
type LookupNode struct {
// contains filtered or unexported fields
}
LookupNode will look up the data from the external source when receiving an event
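The cache options in LookupConf can be pictured with the sketch below: lookup results are cached for a TTL, and misses may be cached too so the external source is not queried repeatedly. The type, function names and lookup signature are assumptions for illustration, not the LookupNode API.

// Illustrative sketch: look up a key in an external source through a TTL cache,
// optionally caching misses as well.
package example

import "time"

type cacheEntry struct {
    val     any
    found   bool
    expires time.Time
}

type cachedLookup struct {
    ttl             time.Duration
    cacheMissingKey bool
    cache           map[string]cacheEntry
    lookup          func(key string) (any, bool) // query against the external source
}

func newCachedLookup(ttl time.Duration, cacheMissingKey bool, lookup func(string) (any, bool)) *cachedLookup {
    return &cachedLookup{ttl: ttl, cacheMissingKey: cacheMissingKey, cache: map[string]cacheEntry{}, lookup: lookup}
}

// Get returns the cached value while it is still fresh; otherwise it queries
// the external source and caches the result (and, optionally, the miss).
func (c *cachedLookup) Get(key string) (any, bool) {
    if e, ok := c.cache[key]; ok && time.Now().Before(e.expires) {
        return e.val, e.found
    }
    val, found := c.lookup(key)
    if found || c.cacheMissingKey {
        c.cache[key] = cacheEntry{val: val, found: found, expires: time.Now().Add(c.ttl)}
    }
    return val, found
}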
func NewLookupNode ¶
func (LookupNode) AddInputCount ¶
func (o LookupNode) AddInputCount()
func (*LookupNode) Exec ¶
func (n *LookupNode) Exec(ctx api.StreamContext, errCh chan<- error)
func (LookupNode) GetInputCount ¶
func (o LookupNode) GetInputCount() int
func (LookupNode) SetBarrierHandler ¶
func (o LookupNode) SetBarrierHandler(bh checkpoint.BarrierHandler)
type MergeableTopo ¶
type MergeableTopo interface {
    GetSource() DataSourceNode
    // MergeSrc adds the child topo as the source with the following operators
    MergeSrc(parentTopo *def.PrintableTopo)
    // LinkTopo adds a printable topo link from the parent topo to the child topo
    LinkTopo(parentTopo *def.PrintableTopo, parentJointName string)
    // SubMetrics returns the metrics of the sub nodes
    SubMetrics() ([]string, []any)
    // Close notifies the subtopo to deref
    Close(ctx api.StreamContext, ruleId string, runId int)
}
type MetricNode ¶
type OperatorNode ¶
type OperatorNode interface {
    DataSinkNode
    Emitter
    Broadcast(data interface{})
}
type PriorityQueue ¶
type PriorityQueue []*TriggerRequest
func (*PriorityQueue) Peek ¶
func (pq *PriorityQueue) Peek() *TriggerRequest
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() *TriggerRequest
Pop removes and returns the item with the highest priority from the priority queue
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x *TriggerRequest)
Push adds an item to the priority queue
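The Peek/Push/Pop trio behaves like a time-ordered queue. Below is a self-contained sketch built on container/heap over a hypothetical item type ordered by trigger time; it is not the internal TriggerRequest layout, whose fields are unexported.

// Illustrative sketch: a priority queue ordered by trigger time, built on container/heap.
package example

import (
    "container/heap"
    "time"
)

type triggerItem struct {
    at time.Time // when the trigger should fire
}

type triggerQueue []*triggerItem

func (q triggerQueue) Len() int           { return len(q) }
func (q triggerQueue) Less(i, j int) bool { return q[i].at.Before(q[j].at) }
func (q triggerQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *triggerQueue) Push(x any)        { *q = append(*q, x.(*triggerItem)) }
func (q *triggerQueue) Pop() any {
    old := *q
    n := len(old)
    item := old[n-1]
    *q = old[:n-1]
    return item
}

// peek returns the earliest trigger without removing it.
func peek(q triggerQueue) *triggerItem {
    if len(q) == 0 {
        return nil
    }
    return q[0]
}

// Usage: heap.Init(&q); heap.Push(&q, &triggerItem{at: t}); next := heap.Pop(&q).(*triggerItem)
var _ heap.Interface = (*triggerQueue)(nil)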
type RateLimitOp ¶
type RateLimitOp struct {
// contains filtered or unexported fields
}
RateLimitOp handles messages at a regular rate, ignoring messages that arrive too quickly and keeping only the most recent message (the default strategy). If a merge strategy is set, it sends through all messages together with the trigger signal and lets the strategy node handle the merge; otherwise, it sends the most recent message at trigger time.
Input: Raw
Output: Raw as it is
Concurrency: false
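The default keep-latest behavior corresponds to a sketch like the one below: incoming messages overwrite a held value, and a ticker emits whatever is held at each interval. Names are illustrative; this is not the RateLimitOp implementation.

// Illustrative sketch: emit at most one message per interval,
// keeping only the most recent message received between ticks.
package example

import "time"

func rateLimit(in <-chan any, out chan<- any, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    var latest any
    var has bool
    for {
        select {
        case item, ok := <-in:
            if !ok {
                return
            }
            latest, has = item, true // newer message replaces the held one
        case <-ticker.C:
            if has {
                out <- latest // emit the most recent message at trigger time
                has = false
            }
        }
    }
}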
func NewRateLimitOp ¶
func NewRateLimitOp(ctx api.StreamContext, name string, rOpt *def.RuleOption, schema map[string]*ast.JsonStreamField, props map[string]any) (*RateLimitOp, error)
func (RateLimitOp) AddInputCount ¶
func (o RateLimitOp) AddInputCount()
func (*RateLimitOp) AttachSchema ¶
func (o *RateLimitOp) AttachSchema(ctx api.StreamContext, dataSource string, schema map[string]*ast.JsonStreamField, isWildcard bool)
func (*RateLimitOp) DetachSchema ¶
func (o *RateLimitOp) DetachSchema(ctx api.StreamContext, ruleId string)
func (*RateLimitOp) Exec ¶
func (o *RateLimitOp) Exec(ctx api.StreamContext, errCh chan<- error)
Exec rate limit op deals with 3 merge strategies:
- latest
- merge by mergeField (when format and mergeField are set and there is no payload format)
- merge by merger (when format, payloadFormat and merger are set)
func (RateLimitOp) GetInputCount ¶
func (o RateLimitOp) GetInputCount() int
func (RateLimitOp) SetBarrierHandler ¶
func (o RateLimitOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
type SchemaNode ¶
type SchemaNode interface {
    // AttachSchema attaches the schema to the node. The parameters are ruleId, sourceName, schema, and whether it is a wildcard
    AttachSchema(api.StreamContext, string, map[string]*ast.JsonStreamField, bool)
    // DetachSchema detaches the schema from the node. The parameter is the ruleId
    DetachSchema(api.StreamContext, string)
}
type SinkConf ¶
type SinkConf struct {
    Concurrency    int               `json:"concurrency"`
    Omitempty      bool              `json:"omitIfEmpty"`
    SendSingle     bool              `json:"sendSingle"`
    DataTemplate   string            `json:"dataTemplate"`
    Format         string            `json:"format"`
    SchemaId       string            `json:"schemaId"`
    Delimiter      string            `json:"delimiter"`
    BufferLength   int               `json:"bufferLength"`
    Fields         []string          `json:"fields"`
    DataField      string            `json:"dataField"`
    BatchSize      int               `json:"batchSize"`
    LingerInterval cast.DurationConf `json:"lingerInterval"`
    Compression    string            `json:"compression"`
    Encryption     string            `json:"encryption"`
    HasHeader      bool              `json:"hasHeader"`
    conf.SinkConf
}
type SinkNode ¶
type SinkNode struct {
// contains filtered or unexported fields
}
SinkNode represents a sink node that collects data from the stream. It typically only connects and sends; it does not do any processing. This node is the skeleton: it refers to a sink instance to do the real work.
func NewBytesSinkNode ¶
func NewBytesSinkNode(ctx api.StreamContext, name string, sink api.BytesCollector, rOpt def.RuleOption, eoflimit int, sc *conf.SinkConf, isRetry bool) (*SinkNode, error)
NewBytesSinkNode creates a sink node that collects data from the stream. It also does some static validation.
func NewTupleSinkNode ¶
func NewTupleSinkNode(ctx api.StreamContext, name string, sink api.TupleCollector, rOpt def.RuleOption, eoflimit int, sc *conf.SinkConf, isRetry bool) (*SinkNode, error)
NewTupleSinkNode creates a sink node that collects data from the stream. It also does some static validation.
func (SinkNode) AddInputCount ¶
func (o SinkNode) AddInputCount()
func (SinkNode) GetInputCount ¶
func (o SinkNode) GetInputCount() int
func (SinkNode) SetBarrierHandler ¶
func (o SinkNode) SetBarrierHandler(bh checkpoint.BarrierHandler)
func (*SinkNode) SetResendOutput ¶
func (s *SinkNode) SetResendOutput(output chan<- any)
type SourceInstanceNode ¶
type SourceNode ¶
type SourceNode struct {
// contains filtered or unexported fields
}
SourceNode is a node that connects to an external source. The SourceNode is an all-in-one source node that supports connecting, decoding and more. The SourceConnectorNode is a node that only connects to the external source and does not decode.
func NewSourceNode ¶
func NewSourceNode(ctx api.StreamContext, name string, ss api.Source, props map[string]any, rOpt *def.RuleOption) (*SourceNode, error)
NewSourceNode creates a SourceNode
func (SourceNode) BroadcastCustomized ¶
func (o SourceNode) BroadcastCustomized(val any, broadcastFunc func(val any))
func (SourceNode) GetMetrics ¶
func (o SourceNode) GetMetrics() []any
func (SourceNode) GetStreamContext ¶
func (o SourceNode) GetStreamContext() api.StreamContext
func (*SourceNode) Open ¶
func (m *SourceNode) Open(ctx api.StreamContext, ctrlCh chan<- error)
Open will be invoked by topo. It starts reading data.
func (SourceNode) RemoveMetrics ¶
func (o SourceNode) RemoveMetrics(ruleId string)
func (SourceNode) RemoveOutput ¶
func (o SourceNode) RemoveOutput(name string) error
func (*SourceNode) Rewind ¶
func (m *SourceNode) Rewind(ctx api.StreamContext) error
func (*SourceNode) Run ¶
func (m *SourceNode) Run(ctx api.StreamContext, ctrlCh chan<- error)
Run: Subscribe could be a long-running function
type SwitchConfig ¶
type SwitchNode ¶
type SwitchNode struct {
// contains filtered or unexported fields
}
func NewSwitchNode ¶
func NewSwitchNode(name string, conf *SwitchConfig, options *def.RuleOption) (*SwitchNode, error)
func (SwitchNode) AddInputCount ¶
func (o SwitchNode) AddInputCount()
func (*SwitchNode) AddOutput ¶
func (n *SwitchNode) AddOutput(output chan<- interface{}, name string) error
AddOutput: SwitchNode overrides the defaultSinkNode's AddOutput to add the output to the outputNodes. SwitchNode itself has multiple outlets defined by the outputNodes. This default function will add the output to the first outlet.
func (*SwitchNode) Exec ¶
func (n *SwitchNode) Exec(ctx api.StreamContext, errCh chan<- error)
func (*SwitchNode) GetEmitter ¶
func (n *SwitchNode) GetEmitter(outputIndex int) Emitter
GetEmitter returns the nth emitter of the node. SwitchNode is the only node that has multiple emitters. In the planner graph, fromNodes is a multi-dimensional array; the switch node is the only node that can appear in the second dimension, and that dimension is the index.
func (SwitchNode) GetInputCount ¶
func (o SwitchNode) GetInputCount() int
func (SwitchNode) SetBarrierHandler ¶
func (o SwitchNode) SetBarrierHandler(bh checkpoint.BarrierHandler)
type TransformOp ¶
type TransformOp struct {
// contains filtered or unexported fields
}
TransformOp transforms the row/collection to sink tuples
Immutable: false
Change trigger frequency: true, by the sendSingle property
Input: Row/Collection
Output: MessageTuple, SinkTupleList, RawTuple
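The effect of sendSingle on trigger frequency can be sketched as follows: a collection either becomes one output item per row or a single list item. The function and types are illustrative, not the TransformOp API.

// Illustrative sketch: with sendSingle, each row becomes its own output item;
// otherwise the whole collection is emitted as one list.
package example

func transform(rows []map[string]any, sendSingle bool) []any {
    if sendSingle {
        out := make([]any, 0, len(rows))
        for _, r := range rows {
            out = append(out, r) // one sink tuple per row
        }
        return out
    }
    return []any{rows} // one sink tuple holding the whole list
}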
func NewTransformOp ¶
func NewTransformOp(name string, rOpt *def.RuleOption, sc *SinkConf, templates []string) (*TransformOp, error)
NewTransformOp creates a transform node. The sink conf should have been validated before.
func (TransformOp) AddInputCount ¶
func (o TransformOp) AddInputCount()
func (*TransformOp) Exec ¶
func (t *TransformOp) Exec(ctx api.StreamContext, errCh chan<- error)
func (TransformOp) GetInputCount ¶
func (o TransformOp) GetInputCount() int
func (TransformOp) SetBarrierHandler ¶
func (o TransformOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
func (*TransformOp) Worker ¶
func (t *TransformOp) Worker(ctx api.StreamContext, item any) []any
Worker does not need to process error and control messages
type TriggerRequest ¶
type TriggerRequest struct {
// contains filtered or unexported fields
}
type UnFunc ¶
type UnFunc func(api.StreamContext, interface{}) interface{}
UnFunc implements UnOperation as a func(api.StreamContext, interface{}) interface{}
func (UnFunc) Apply ¶
func (f UnFunc) Apply(ctx api.StreamContext, data interface{}) interface{}
Apply implements UnOperation.Apply method
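UnFunc is the usual Go adapter that lets a plain function satisfy an interface. A minimal self-contained illustration of the pattern with simplified types (omitting the xsql valuer parameters):

// Illustrative sketch of the function-as-interface adapter pattern behind UnFunc.
package example

type operation interface {
    Apply(data any) any
}

// opFunc adapts an ordinary function to the operation interface.
type opFunc func(data any) any

func (f opFunc) Apply(data any) any { return f(data) }

// Usage: pass a closure wherever an operation is expected.
var double operation = opFunc(func(data any) any {
    if n, ok := data.(int); ok {
        return n * 2
    }
    return data
})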
type UnOperation ¶
type UnOperation interface {
Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{}
}
UnOperation interface represents unary operations (e.g. Map, Filter, etc.)
type UnaryOperator ¶
type UnaryOperator struct {
// contains filtered or unexported fields
}
func New ¶
func New(name string, options *def.RuleOption) *UnaryOperator
New creates a *UnaryOperator value
func (UnaryOperator) AddInputCount ¶
func (o UnaryOperator) AddInputCount()
func (*UnaryOperator) Exec ¶
func (o *UnaryOperator) Exec(ctx api.StreamContext, errCh chan<- error)
Exec is the entry point for the executor
func (UnaryOperator) GetInputCount ¶
func (o UnaryOperator) GetInputCount() int
func (UnaryOperator) SetBarrierHandler ¶
func (o UnaryOperator) SetBarrierHandler(bh checkpoint.BarrierHandler)
func (*UnaryOperator) SetOperation ¶
func (o *UnaryOperator) SetOperation(op UnOperation)
SetOperation sets the executor operation
type WatermarkOp ¶
type WatermarkOp struct {
// contains filtered or unexported fields
}
WatermarkOp is used when event time is enabled. It aligns the event time of the input streams and sends out the data in time order together with watermarks.
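Conceptually, the overall watermark is the minimum of the per-stream watermarks, and buffered events are released in time order once they fall at or below it. A small sketch under those assumptions (not the WatermarkOp implementation; names are illustrative):

// Illustrative sketch: compute the overall watermark as the minimum of the
// per-stream watermarks and release buffered events up to that point in order.
package example

import "sort"

type event struct {
    stream string
    ts     int64
}

func watermark(streamWMs map[string]int64) int64 {
    first := true
    var wm int64
    for _, t := range streamWMs {
        if first || t < wm {
            wm, first = t, false
        }
    }
    return wm
}

// release returns the buffered events with ts <= wm, sorted by event time,
// together with the remaining buffer.
func release(buffer []event, wm int64) (ready, rest []event) {
    for _, e := range buffer {
        if e.ts <= wm {
            ready = append(ready, e)
        } else {
            rest = append(rest, e)
        }
    }
    sort.Slice(ready, func(i, j int) bool { return ready[i].ts < ready[j].ts })
    return ready, rest
}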
func NewWatermarkOp ¶
func NewWatermarkOp(name string, sendWatermark bool, streams []string, options *def.RuleOption) *WatermarkOp
func (WatermarkOp) AddInputCount ¶
func (o WatermarkOp) AddInputCount()
func (*WatermarkOp) Exec ¶
func (w *WatermarkOp) Exec(ctx api.StreamContext, errCh chan<- error)
func (WatermarkOp) GetInputCount ¶
func (o WatermarkOp) GetInputCount() int
func (WatermarkOp) SetBarrierHandler ¶
func (o WatermarkOp) SetBarrierHandler(bh checkpoint.BarrierHandler)
type WindowConfig ¶
type WindowConfig struct {
    TriggerCondition ast.Expr
    StateFuncs       []*ast.Call
    Type             ast.WindowType
    // For time window
    Length   time.Duration
    Interval time.Duration // If the interval is not set, it equals Length
    Delay    time.Duration
    // For count window
    CountLength   int
    CountInterval int
    RawInterval   int
    TimeUnit      ast.Token
}
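For a time window, the Length/Interval pair reads like a hopping window that degenerates to a tumbling window when Interval equals Length. A minimal sketch of computing the most recent window start covering an event timestamp, under that reading (names are illustrative):

// Illustrative sketch: the latest window start covering ts for a hopping
// window of the given length and interval (tumbling when interval == length).
package example

import "time"

func windowStart(ts time.Time, length, interval time.Duration) time.Time {
    if interval <= 0 {
        interval = length // if the interval is not set, it equals the length
    }
    start := ts.Truncate(interval)
    return start // the window covers [start, start+length)
}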
type WindowOperator ¶
type WindowOperator struct {
// contains filtered or unexported fields
}
func NewWindowOp ¶
func NewWindowOp(name string, w WindowConfig, options *def.RuleOption) (*WindowOperator, error)
func (WindowOperator) AddInputCount ¶
func (o WindowOperator) AddInputCount()
func (*WindowOperator) Close ¶
func (o *WindowOperator) Close()
func (*WindowOperator) Exec ¶
func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error)
Exec is the entry point for the executor
input: *xsql.Tuple from preprocessor
output: xsql.WindowTuplesSet
func (WindowOperator) GetInputCount ¶
func (o WindowOperator) GetInputCount() int
func (WindowOperator) SetBarrierHandler ¶
func (o WindowOperator) SetBarrierHandler(bh checkpoint.BarrierHandler)
Source Files ¶
- batch_op.go
- cache_op.go
- compress_op.go
- concurrent.go
- contract.go
- decode_op.go
- decompress_op.go
- dedup_trigger_op.go
- encode_op.go
- encrypt_op.go
- event_window_trigger.go
- join_align_node.go
- lookup_node.go
- node.go
- operations.go
- props.go
- rate_limit.go
- sink_node.go
- source_node.go
- switch_node.go
- transform_op.go
- watermark_op.go
- window_op.go