node

package
v2.0.0-alpha.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2024 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
const (
	WatermarkKey  = "$$wartermark"
	EventInputKey = "$$eventinputs"
	StreamWMKey   = "$$streamwms"
)
View Source
const (
	WindowInputsKey = "$$windowInputs"
	TriggerTimeKey  = "$$triggerTime"
	MsgCountKey     = "$$msgCount"
)
View Source
const BatchKey = "$$batchInputs"

Variables

This section is empty.

Functions

func SourcePing

func SourcePing(sourceType string, config map[string]interface{}) error

Types

type BatchOp

type BatchOp struct {
	// contains filtered or unexported fields
}

func NewBatchOp

func NewBatchOp(name string, rOpt *def.RuleOption, batchSize, lingerInterval int) (*BatchOp, error)

func (BatchOp) AddInputCount

func (o BatchOp) AddInputCount()

func (*BatchOp) Exec

func (b *BatchOp) Exec(ctx api.StreamContext, errCh chan<- error)

func (BatchOp) GetInput

func (o BatchOp) GetInput() (chan<- interface{}, string)

func (BatchOp) GetInputCount

func (o BatchOp) GetInputCount() int

func (BatchOp) SetBarrierHandler

func (o BatchOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

type Collector

type Collector interface {
	GetInput() (chan<- interface{}, string)
}

type CompressOp

type CompressOp struct {
	// contains filtered or unexported fields
}

func NewCompressOp

func NewCompressOp(name string, rOpt *def.RuleOption, compressMethod string) (*CompressOp, error)

func (CompressOp) AddInputCount

func (o CompressOp) AddInputCount()

func (*CompressOp) Exec

func (o *CompressOp) Exec(ctx api.StreamContext, errCh chan<- error)

func (CompressOp) GetInput

func (o CompressOp) GetInput() (chan<- interface{}, string)

func (CompressOp) GetInputCount

func (o CompressOp) GetInputCount() int

func (CompressOp) SetBarrierHandler

func (o CompressOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

func (*CompressOp) Worker

func (o *CompressOp) Worker(_ api.StreamContext, item any) []any

type DataSinkNode

type DataSinkNode interface {
	TopNode
	MetricNode
	Collector
	Exec(api.StreamContext, chan<- error)
	GetStreamContext() api.StreamContext
	GetInputCount() int
	AddInputCount()
	SetQos(def.Qos)
	SetBarrierHandler(checkpoint.BarrierHandler)
}

type DataSourceNode

type DataSourceNode interface {
	TopNode
	MetricNode
	Emitter
	Open(ctx api.StreamContext, errCh chan<- error)
}

type DecodeOp

type DecodeOp struct {
	// contains filtered or unexported fields
}

func NewDecodeOp

func NewDecodeOp(name, StreamName string, ruleId string, rOpt *def.RuleOption, options *ast.Options, isWildcard, isSchemaless bool, schema map[string]*ast.JsonStreamField) (*DecodeOp, error)

func (DecodeOp) AddInputCount

func (o DecodeOp) AddInputCount()

func (*DecodeOp) AttachSchema

func (o *DecodeOp) AttachSchema(ctx api.StreamContext, dataSource string, schema map[string]*ast.JsonStreamField, isWildcard bool)

func (*DecodeOp) DetachSchema

func (o *DecodeOp) DetachSchema(ruleId string)

func (*DecodeOp) Exec

func (o *DecodeOp) Exec(ctx api.StreamContext, errCh chan<- error)

Exec runs the decode op: it receives raw data and converts it to a message.

func (DecodeOp) GetInput

func (o DecodeOp) GetInput() (chan<- interface{}, string)

func (DecodeOp) GetInputCount

func (o DecodeOp) GetInputCount() int

func (DecodeOp) SetBarrierHandler

func (o DecodeOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

func (*DecodeOp) Worker

func (o *DecodeOp) Worker(ctx api.StreamContext, item any) []any

type DecompressOp

type DecompressOp struct {
	// contains filtered or unexported fields
}

func NewDecompressOp

func NewDecompressOp(name string, rOpt *def.RuleOption, compressMethod string) (*DecompressOp, error)

func (DecompressOp) AddInputCount

func (o DecompressOp) AddInputCount()

func (*DecompressOp) Exec

func (o *DecompressOp) Exec(ctx api.StreamContext, errCh chan<- error)

func (DecompressOp) GetInput

func (o DecompressOp) GetInput() (chan<- interface{}, string)

func (DecompressOp) GetInputCount

func (o DecompressOp) GetInputCount() int

func (DecompressOp) SetBarrierHandler

func (o DecompressOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

func (*DecompressOp) Worker

func (o *DecompressOp) Worker(_ api.StreamContext, item any) []any

type DedupTriggerNode

type DedupTriggerNode struct {
	// contains filtered or unexported fields
}

func NewDedupTriggerNode

func NewDedupTriggerNode(name string, options *def.RuleOption, aliasName string, startField string, endField string, nowField string, expire int64) *DedupTriggerNode

func (DedupTriggerNode) AddInputCount

func (o DedupTriggerNode) AddInputCount()

func (*DedupTriggerNode) Exec

func (w *DedupTriggerNode) Exec(ctx api.StreamContext, errCh chan<- error)

func (DedupTriggerNode) GetInput

func (o DedupTriggerNode) GetInput() (chan<- interface{}, string)

func (DedupTriggerNode) GetInputCount

func (o DedupTriggerNode) GetInputCount() int

func (DedupTriggerNode) SetBarrierHandler

func (o DedupTriggerNode) SetBarrierHandler(bh checkpoint.BarrierHandler)

type Emitter

type Emitter interface {
	AddOutput(chan<- interface{}, string) error
}

type EncodeOp

type EncodeOp struct {
	// contains filtered or unexported fields
}

func NewEncodeOp

func NewEncodeOp(name string, rOpt *def.RuleOption, sc *SinkConf) (*EncodeOp, error)

func (EncodeOp) AddInputCount

func (o EncodeOp) AddInputCount()

func (*EncodeOp) Exec

func (o *EncodeOp) Exec(ctx api.StreamContext, errCh chan<- error)

Exec runs the encode op: it receives map/[]map and converts it to bytes. If it receives bytes, it just returns them.

func (EncodeOp) GetInput

func (o EncodeOp) GetInput() (chan<- interface{}, string)

func (EncodeOp) GetInputCount

func (o EncodeOp) GetInputCount() int

func (EncodeOp) SetBarrierHandler

func (o EncodeOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

func (*EncodeOp) Worker

func (o *EncodeOp) Worker(ctx api.StreamContext, item any) []any

type EncryptNode

type EncryptNode struct {
	// contains filtered or unexported fields
}

func NewEncryptOp

func NewEncryptOp(name string, rOpt *def.RuleOption, encryptMethod string) (*EncryptNode, error)

func (EncryptNode) AddInputCount

func (o EncryptNode) AddInputCount()

func (*EncryptNode) Exec

func (o *EncryptNode) Exec(ctx api.StreamContext, errCh chan<- error)

func (EncryptNode) GetInput

func (o EncryptNode) GetInput() (chan<- interface{}, string)

func (EncryptNode) GetInputCount

func (o EncryptNode) GetInputCount() int

func (EncryptNode) SetBarrierHandler

func (o EncryptNode) SetBarrierHandler(bh checkpoint.BarrierHandler)

func (*EncryptNode) Worker

func (o *EncryptNode) Worker(_ api.StreamContext, item any) []any

type EventTimeTrigger

type EventTimeTrigger struct {
	// contains filtered or unexported fields
}

EventTimeTrigger scans the input tuples and finds the tuples in the current window. The inputs are sorted by the watermark op.

func NewEventTimeTrigger

func NewEventTimeTrigger(window *WindowConfig) (*EventTimeTrigger, error)

type JoinAlignNode

type JoinAlignNode struct {
	// contains filtered or unexported fields
}

JoinAlignNode will block the stream and buffer all the table tuples. Once buffered, it will combine the later input with the buffer. The input for batch table MUST be *WindowTuples.

func NewJoinAlignNode

func NewJoinAlignNode(name string, emitters []string, options *def.RuleOption) (*JoinAlignNode, error)

func (JoinAlignNode) AddInputCount

func (o JoinAlignNode) AddInputCount()

func (*JoinAlignNode) Exec

func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error)

func (JoinAlignNode) GetInput

func (o JoinAlignNode) GetInput() (chan<- interface{}, string)

func (JoinAlignNode) GetInputCount

func (o JoinAlignNode) GetInputCount() int

func (JoinAlignNode) SetBarrierHandler

func (o JoinAlignNode) SetBarrierHandler(bh checkpoint.BarrierHandler)

type LookupConf

type LookupConf struct {
	Cache           bool `json:"cache"`
	CacheTTL        int  `json:"cacheTtl"`
	CacheMissingKey bool `json:"cacheMissingKey"`
}

type LookupNode

type LookupNode struct {
	// contains filtered or unexported fields
}

LookupNode will look up the data from the external source when receiving an event

func NewLookupNode

func NewLookupNode(name string, fields []string, keys []string, joinType ast.JoinType, vals []ast.Expr, srcOptions *ast.Options, options *def.RuleOption) (*LookupNode, error)

func (LookupNode) AddInputCount

func (o LookupNode) AddInputCount()

func (*LookupNode) Exec

func (n *LookupNode) Exec(ctx api.StreamContext, errCh chan<- error)

func (LookupNode) GetInput

func (o LookupNode) GetInput() (chan<- interface{}, string)

func (LookupNode) GetInputCount

func (o LookupNode) GetInputCount() int

func (LookupNode) SetBarrierHandler

func (o LookupNode) SetBarrierHandler(bh checkpoint.BarrierHandler)

type MergeableTopo

type MergeableTopo interface {
	GetSource() DataSourceNode
	// MergeSrc Add child topo as the source with following operators
	MergeSrc(parentTopo *def.PrintableTopo)
	// LinkTopo Add printable topo link from the parent topo to the child topo
	LinkTopo(parentTopo *def.PrintableTopo, parentJointName string)
	// SubMetrics return the metrics of the sub nodes
	SubMetrics() ([]string, []any)
	// Close notifies subtopo to deref
	Close(ruleId string)
}

type MetricNode

type MetricNode interface {
	GetMetrics() []any
	RemoveMetrics(ruleId string)
}

type OperatorNode

type OperatorNode interface {
	DataSinkNode
	Emitter
	Broadcast(data interface{})
}

type PriorityQueue

type PriorityQueue []*TriggerRequest

func (*PriorityQueue) Peek

func (pq *PriorityQueue) Peek() *TriggerRequest

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() *TriggerRequest

Pop removes and returns the item with the highest priority from the priority queue

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x *TriggerRequest)

Push adds an item to the priority queue

type SchemaNode

type SchemaNode interface {
	// AttachSchema attaches the schema to the node. The parameters are the stream context, the source name, the schema, and whether it is a wildcard
	AttachSchema(api.StreamContext, string, map[string]*ast.JsonStreamField, bool)
	// DetachSchema detaches the schema from the node. The parameter is the ruleId
	DetachSchema(string)
}

type SinkConf

type SinkConf struct {
	Concurrency    int      `json:"concurrency"`
	Omitempty      bool     `json:"omitIfEmpty"`
	SendSingle     bool     `json:"sendSingle"`
	DataTemplate   string   `json:"dataTemplate"`
	Format         string   `json:"format"`
	SchemaId       string   `json:"schemaId"`
	Delimiter      string   `json:"delimiter"`
	BufferLength   int      `json:"bufferLength"`
	Fields         []string `json:"fields"`
	DataField      string   `json:"dataField"`
	BatchSize      int      `json:"batchSize"`
	LingerInterval int      `json:"lingerInterval"`
	Compression    string   `json:"compression"`
	Encryption     string   `json:"encryption"`
	conf.SinkConf
}

func ParseConf

func ParseConf(logger api.Logger, props map[string]any) (*SinkConf, error)

type SinkNode

type SinkNode struct {
	// contains filtered or unexported fields
}

SinkNode represents a sink node that collects data from the stream. It typically only connects and sends; it does not do any processing. This node is the skeleton: it refers to a sink instance to do the real work.

func NewBytesSinkNode

func NewBytesSinkNode(ctx api.StreamContext, name string, sink api.BytesCollector, rOpt *def.RuleOption, eoflimit int) (*SinkNode, error)

NewBytesSinkNode creates a sink node that collects data from the stream. It does some static validation.

func NewTupleSinkNode

func NewTupleSinkNode(ctx api.StreamContext, name string, sink api.TupleCollector, rOpt *def.RuleOption, eoflimit int) (*SinkNode, error)

NewTupleSinkNode creates a sink node that collects data from the stream. It does some static validation.

func (SinkNode) AddInputCount

func (o SinkNode) AddInputCount()

func (*SinkNode) Exec

func (s *SinkNode) Exec(ctx api.StreamContext, errCh chan<- error)

func (SinkNode) GetInput

func (o SinkNode) GetInput() (chan<- interface{}, string)

func (SinkNode) GetInputCount

func (o SinkNode) GetInputCount() int

func (SinkNode) SetBarrierHandler

func (o SinkNode) SetBarrierHandler(bh checkpoint.BarrierHandler)

type SourceInstanceNode

type SourceInstanceNode interface {
	GetSource() api.Source
}

type SourceNode

type SourceNode struct {
	// contains filtered or unexported fields
}

SourceNode is a node that connects to an external source. The SourceNode is an all-in-one source node that supports connecting, decoding, and more. The SourceConnectorNode is a node that only connects to an external source and does not decode.

func NewSourceNode

func NewSourceNode(ctx api.StreamContext, name string, ss api.Source, props map[string]any, rOpt *def.RuleOption) (*SourceNode, error)

NewSourceNode creates a SourceNode.

func (SourceNode) AddOutput

func (o SourceNode) AddOutput(output chan<- interface{}, name string) error

func (SourceNode) Broadcast

func (o SourceNode) Broadcast(val interface{})

func (SourceNode) GetMetrics

func (o SourceNode) GetMetrics() []any

func (SourceNode) GetName

func (o SourceNode) GetName() string

func (SourceNode) GetStreamContext

func (o SourceNode) GetStreamContext() api.StreamContext

func (*SourceNode) Open

func (m *SourceNode) Open(ctx api.StreamContext, ctrlCh chan<- error)

Open will be invoked by topo. It starts reading data.

func (SourceNode) RemoveMetrics

func (o SourceNode) RemoveMetrics(ruleId string)

func (*SourceNode) Run

func (m *SourceNode) Run(ctx api.StreamContext, ctrlCh chan<- error)

Run Subscribe could be a long-running function

func (SourceNode) SetQos

func (o SourceNode) SetQos(qos def.Qos)

type SwitchConfig

type SwitchConfig struct {
	Cases            []ast.Expr
	StopAtFirstMatch bool
}

type SwitchNode

type SwitchNode struct {
	// contains filtered or unexported fields
}

func NewSwitchNode

func NewSwitchNode(name string, conf *SwitchConfig, options *def.RuleOption) (*SwitchNode, error)

func (SwitchNode) AddInputCount

func (o SwitchNode) AddInputCount()

func (*SwitchNode) AddOutput

func (n *SwitchNode) AddOutput(output chan<- interface{}, name string) error

AddOutput SwitchNode overrides the defaultSinkNode's AddOutput to add output to the outputNodes. SwitchNode itself has multiple outlets defined by the outputNodes. This default function will add the output to the first outlet.

func (*SwitchNode) Exec

func (n *SwitchNode) Exec(ctx api.StreamContext, errCh chan<- error)

func (*SwitchNode) GetEmitter

func (n *SwitchNode) GetEmitter(outputIndex int) Emitter

GetEmitter returns the nth emitter of the node. SwitchNode is the only node that has multiple emitters. In the planner graph, fromNodes is a multi-dim array, and the switch node is the only node that could be in the second dim. The dim is the index.

func (SwitchNode) GetInput

func (o SwitchNode) GetInput() (chan<- interface{}, string)

func (SwitchNode) GetInputCount

func (o SwitchNode) GetInputCount() int

func (SwitchNode) SetBarrierHandler

func (o SwitchNode) SetBarrierHandler(bh checkpoint.BarrierHandler)

type TopNode

type TopNode interface {
	GetName() string
}

type TransformOp

type TransformOp struct {
	// contains filtered or unexported fields
}

func NewTransformOp

func NewTransformOp(name string, rOpt *def.RuleOption, sc *SinkConf) (*TransformOp, error)

NewTransformOp creates a transform node. The sink conf should have been validated before.

func (TransformOp) AddInputCount

func (o TransformOp) AddInputCount()

func (*TransformOp) Exec

func (t *TransformOp) Exec(ctx api.StreamContext, errCh chan<- error)

func (TransformOp) GetInput

func (o TransformOp) GetInput() (chan<- interface{}, string)

func (TransformOp) GetInputCount

func (o TransformOp) GetInputCount() int

func (TransformOp) SetBarrierHandler

func (o TransformOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

func (*TransformOp) Worker

func (t *TransformOp) Worker(ctx api.StreamContext, item any) []any

Worker does not need to process error and control messages.

type TriggerRequest

type TriggerRequest struct {
	// contains filtered or unexported fields
}

type TupleList

type TupleList struct {
	// contains filtered or unexported fields
}

func NewTupleList

func NewTupleList(tuples []*xsql.Tuple, windowSize int) (TupleList, error)

type UnFunc

type UnFunc func(api.StreamContext, interface{}) interface{}

UnFunc implements UnOperation as type func(api.StreamContext, interface{}) interface{}

func (UnFunc) Apply

func (f UnFunc) Apply(ctx api.StreamContext, data interface{}) interface{}

Apply implements UnOperation.Apply method

type UnOperation

type UnOperation interface {
	Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{}
}

UnOperation interface represents unary operations (i.e. Map, Filter, etc)

type UnaryOperator

type UnaryOperator struct {
	// contains filtered or unexported fields
}

func New

func New(name string, options *def.RuleOption) *UnaryOperator

New creates a *UnaryOperator value.

func (UnaryOperator) AddInputCount

func (o UnaryOperator) AddInputCount()

func (*UnaryOperator) Exec

func (o *UnaryOperator) Exec(ctx api.StreamContext, errCh chan<- error)

Exec is the entry point for the executor

func (UnaryOperator) GetInput

func (o UnaryOperator) GetInput() (chan<- interface{}, string)

func (UnaryOperator) GetInputCount

func (o UnaryOperator) GetInputCount() int

func (UnaryOperator) SetBarrierHandler

func (o UnaryOperator) SetBarrierHandler(bh checkpoint.BarrierHandler)

func (*UnaryOperator) SetOperation

func (o *UnaryOperator) SetOperation(op UnOperation)

SetOperation sets the executor operation

type WatermarkOp

type WatermarkOp struct {
	// contains filtered or unexported fields
}

WatermarkOp is used when event time is enabled. It is used to align the event time of the input streams. It sends out the data in time order with watermark.

func NewWatermarkOp

func NewWatermarkOp(name string, sendWatermark bool, streams []string, options *def.RuleOption) *WatermarkOp

func (WatermarkOp) AddInputCount

func (o WatermarkOp) AddInputCount()

func (*WatermarkOp) Exec

func (w *WatermarkOp) Exec(ctx api.StreamContext, errCh chan<- error)

func (WatermarkOp) GetInput

func (o WatermarkOp) GetInput() (chan<- interface{}, string)

func (WatermarkOp) GetInputCount

func (o WatermarkOp) GetInputCount() int

func (WatermarkOp) SetBarrierHandler

func (o WatermarkOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

type WindowConfig

type WindowConfig struct {
	TriggerCondition ast.Expr
	StateFuncs       []*ast.Call
	Type             ast.WindowType
	Length           int64
	Interval         int64 // If the interval is not set, it is equal to Length
	Delay            int64
	RawInterval      int
	TimeUnit         ast.Token
}

type WindowOperator

type WindowOperator struct {
	// contains filtered or unexported fields
}

func NewWindowOp

func NewWindowOp(name string, w WindowConfig, options *def.RuleOption) (*WindowOperator, error)

func (WindowOperator) AddInputCount

func (o WindowOperator) AddInputCount()

func (*WindowOperator) Exec

func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error)

Exec is the entry point for the executor. Input: *xsql.Tuple from preprocessor. Output: xsql.WindowTuplesSet.

func (WindowOperator) GetInput

func (o WindowOperator) GetInput() (chan<- interface{}, string)

func (WindowOperator) GetInputCount

func (o WindowOperator) GetInputCount() int

func (WindowOperator) SetBarrierHandler

func (o WindowOperator) SetBarrierHandler(bh checkpoint.BarrierHandler)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL