Documentation ¶
Index ¶
- Constants
- Variables
- type Cache
- type CacheTuple
- type DefaultStatManager
- func (sm *DefaultStatManager) GetMetrics() []interface{}
- func (sm *DefaultStatManager) IncTotalExceptions()
- func (sm *DefaultStatManager) IncTotalRecordsIn()
- func (sm *DefaultStatManager) IncTotalRecordsOut()
- func (sm *DefaultStatManager) ProcessTimeEnd()
- func (sm *DefaultStatManager) ProcessTimeStart()
- func (sm *DefaultStatManager) SetBufferLength(l int64)
- type DynamicChannelBuffer
- type LinkedQueue
- type MetricGroup
- type OperatorNode
- type PrometheusMetrics
- type PrometheusStatManager
- type SinkNode
- func (o SinkNode) AddInputCount()
- func (m *SinkNode) AddOutput(_ chan<- interface{}, name string) error
- func (m *SinkNode) Broadcast(_ interface{}) error
- func (o SinkNode) GetInput() (chan<- interface{}, string)
- func (o SinkNode) GetInputCount() int
- func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error)
- func (m *SinkNode) SaveCache()
- func (o SinkNode) SetBarrierHandler(bh checkpoints.BarrierHandler)
- type SourceNode
- func (o SourceNode) AddOutput(output chan<- interface{}, name string) error
- func (o SourceNode) Broadcast(val interface{}) error
- func (o SourceNode) GetMetrics() (result [][]interface{})
- func (o SourceNode) GetName() string
- func (o SourceNode) GetStreamContext() api.StreamContext
- func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error)
- func (o SourceNode) SetConcurrency(concurr int)
- func (o SourceNode) SetQos(qos api.Qos)
- type StatManager
- type TupleList
- type UnFunc
- type UnOperation
- type UnaryOperator
- func (o UnaryOperator) AddInputCount()
- func (o *UnaryOperator) Exec(ctx api.StreamContext, errCh chan<- error)
- func (o UnaryOperator) GetInput() (chan<- interface{}, string)
- func (o UnaryOperator) GetInputCount() int
- func (o UnaryOperator) SetBarrierHandler(bh checkpoints.BarrierHandler)
- func (o *UnaryOperator) SetOperation(op UnOperation)
- type WatermarkGenerator
- type WatermarkTuple
- type WindowConfig
- type WindowOperator
- func (o WindowOperator) AddInputCount()
- func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error)
- func (o WindowOperator) GetInput() (chan<- interface{}, string)
- func (o WindowOperator) GetInputCount() int
- func (o *WindowOperator) GetMetrics() [][]interface{}
- func (o WindowOperator) SetBarrierHandler(bh checkpoints.BarrierHandler)
Constants ¶
View Source
const BufferLength = "buffer_length"
View Source
const ExceptionsTotal = "exceptions_total"
View Source
const LastInvocation = "last_invocation"
View Source
const MSG_COUNT_KEY = "$$msgCount"
View Source
const OFFSET_KEY = "$$offset"
View Source
const ProcessLatencyUs = "process_latency_us"
View Source
const RecordsInTotal = "records_in_total"
View Source
const RecordsOutTotal = "records_out_total"
View Source
const TRIGGER_TIME_KEY = "$$triggerTime"
View Source
const WATERMARK_KEY = "$$wartermark"
View Source
const WINDOW_INPUTS_KEY = "$$windowInputs"
Variables ¶
View Source
var (
MetricNames = []string{RecordsInTotal, RecordsOutTotal, ExceptionsTotal, ProcessLatencyUs, BufferLength, LastInvocation}
)
Functions ¶
This section is empty.
Types ¶
type Cache ¶
type Cache struct { Out chan *CacheTuple Complete chan int // contains filtered or unexported fields }
func NewCheckpointbasedCache ¶
func NewCheckpointbasedCache(in <-chan interface{}, limit int, tch <-chan struct{}, errCh chan<- error, ctx api.StreamContext) *Cache
func NewTimebasedCache ¶
type CacheTuple ¶
type CacheTuple struct {
// contains filtered or unexported fields
}
type DefaultStatManager ¶
type DefaultStatManager struct {
// contains filtered or unexported fields
}
The stat manager is not thread-safe. Make sure it is used in only one instance.
func (*DefaultStatManager) GetMetrics ¶
func (sm *DefaultStatManager) GetMetrics() []interface{}
func (*DefaultStatManager) IncTotalExceptions ¶
func (sm *DefaultStatManager) IncTotalExceptions()
func (*DefaultStatManager) IncTotalRecordsIn ¶
func (sm *DefaultStatManager) IncTotalRecordsIn()
func (*DefaultStatManager) IncTotalRecordsOut ¶
func (sm *DefaultStatManager) IncTotalRecordsOut()
func (*DefaultStatManager) ProcessTimeEnd ¶
func (sm *DefaultStatManager) ProcessTimeEnd()
func (*DefaultStatManager) ProcessTimeStart ¶
func (sm *DefaultStatManager) ProcessTimeStart()
func (*DefaultStatManager) SetBufferLength ¶
func (sm *DefaultStatManager) SetBufferLength(l int64)
type DynamicChannelBuffer ¶
type DynamicChannelBuffer struct { In chan api.SourceTuple Out chan api.SourceTuple // contains filtered or unexported fields }
func NewDynamicChannelBuffer ¶
func NewDynamicChannelBuffer() *DynamicChannelBuffer
func (*DynamicChannelBuffer) Close ¶
func (b *DynamicChannelBuffer) Close()
func (*DynamicChannelBuffer) GetLength ¶
func (b *DynamicChannelBuffer) GetLength() int
func (*DynamicChannelBuffer) SetLimit ¶
func (b *DynamicChannelBuffer) SetLimit(limit int)
type LinkedQueue ¶
func (*LinkedQueue) String ¶
func (l *LinkedQueue) String() string
type MetricGroup ¶
type MetricGroup struct { TotalRecordsIn *prometheus.CounterVec TotalRecordsOut *prometheus.CounterVec TotalExceptions *prometheus.CounterVec ProcessLatency *prometheus.GaugeVec BufferLength *prometheus.GaugeVec }
type OperatorNode ¶
type OperatorNode interface { api.Operator Broadcast(data interface{}) error GetStreamContext() api.StreamContext GetInputCount() int AddInputCount() SetQos(api.Qos) SetBarrierHandler(checkpoints.BarrierHandler) }
type PrometheusMetrics ¶
type PrometheusMetrics struct {
// contains filtered or unexported fields
}
func GetPrometheusMetrics ¶
func GetPrometheusMetrics() *PrometheusMetrics
func (*PrometheusMetrics) GetMetricsGroup ¶
func (m *PrometheusMetrics) GetMetricsGroup(opType string) *MetricGroup
type PrometheusStatManager ¶
type PrometheusStatManager struct { DefaultStatManager // contains filtered or unexported fields }
func (*PrometheusStatManager) IncTotalExceptions ¶
func (sm *PrometheusStatManager) IncTotalExceptions()
func (*PrometheusStatManager) IncTotalRecordsIn ¶
func (sm *PrometheusStatManager) IncTotalRecordsIn()
func (*PrometheusStatManager) IncTotalRecordsOut ¶
func (sm *PrometheusStatManager) IncTotalRecordsOut()
func (*PrometheusStatManager) ProcessTimeEnd ¶
func (sm *PrometheusStatManager) ProcessTimeEnd()
func (*PrometheusStatManager) SetBufferLength ¶
func (sm *PrometheusStatManager) SetBufferLength(l int64)
type SinkNode ¶
type SinkNode struct {
// contains filtered or unexported fields
}
func NewSinkNode ¶
func NewSinkNodeWithSink ¶
Only for mock sinks; do not use it in production.
func (SinkNode) AddInputCount ¶
func (o SinkNode) AddInputCount()
func (SinkNode) GetInputCount ¶
func (o SinkNode) GetInputCount() int
func (SinkNode) SetBarrierHandler ¶
func (o SinkNode) SetBarrierHandler(bh checkpoints.BarrierHandler)
type SourceNode ¶
type SourceNode struct {
// contains filtered or unexported fields
}
func NewSourceNode ¶
func NewSourceNode(name string, options map[string]string) *SourceNode
func NewSourceNodeWithSource ¶
Only for mock sources; do not use it in production.
func (SourceNode) GetMetrics ¶
func (o SourceNode) GetMetrics() (result [][]interface{})
func (SourceNode) GetStreamContext ¶
func (o SourceNode) GetStreamContext() api.StreamContext
func (*SourceNode) Open ¶
func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error)
func (SourceNode) SetConcurrency ¶
func (o SourceNode) SetConcurrency(concurr int)
SetConcurrency sets the concurrency level for the operation
type StatManager ¶
type StatManager interface { IncTotalRecordsIn() IncTotalRecordsOut() IncTotalExceptions() ProcessTimeStart() ProcessTimeEnd() SetBufferLength(l int64) GetMetrics() []interface{} }
func NewStatManager ¶
func NewStatManager(opType string, ctx api.StreamContext) (StatManager, error)
type UnFunc ¶
type UnFunc func(api.StreamContext, interface{}) interface{}
UnFunc implements UnOperation as type func(api.StreamContext, interface{}) interface{}
func (UnFunc) Apply ¶
func (f UnFunc) Apply(ctx api.StreamContext, data interface{}) interface{}
Apply implements UnOperation.Apply method
type UnOperation ¶
type UnOperation interface {
Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{}
}
UnOperation interface represents unary operations (i.e. Map, Filter, etc)
type UnaryOperator ¶
type UnaryOperator struct {
// contains filtered or unexported fields
}
func New ¶
func New(name string, bufferLength int) *UnaryOperator
New creates a *UnaryOperator value.
func (UnaryOperator) AddInputCount ¶
func (o UnaryOperator) AddInputCount()
func (*UnaryOperator) Exec ¶
func (o *UnaryOperator) Exec(ctx api.StreamContext, errCh chan<- error)
Exec is the entry point for the executor
func (UnaryOperator) GetInputCount ¶
func (o UnaryOperator) GetInputCount() int
func (UnaryOperator) SetBarrierHandler ¶
func (o UnaryOperator) SetBarrierHandler(bh checkpoints.BarrierHandler)
func (*UnaryOperator) SetOperation ¶
func (o *UnaryOperator) SetOperation(op UnOperation)
SetOperation sets the executor operation
type WatermarkGenerator ¶
type WatermarkGenerator struct {
// contains filtered or unexported fields
}
func NewWatermarkGenerator ¶
func NewWatermarkGenerator(window *WindowConfig, l int64, s []string, stream chan<- interface{}) (*WatermarkGenerator, error)
type WatermarkTuple ¶
type WatermarkTuple struct {
Timestamp int64
}
func (*WatermarkTuple) GetTimestamp ¶
func (t *WatermarkTuple) GetTimestamp() int64
func (*WatermarkTuple) IsWatermark ¶
func (t *WatermarkTuple) IsWatermark() bool
type WindowConfig ¶
type WindowConfig struct { Type xsql.WindowType Length int Interval int // If Interval is not set, it is equal to Length }
type WindowOperator ¶
type WindowOperator struct {
// contains filtered or unexported fields
}
func NewWindowOp ¶
func NewWindowOp(name string, w WindowConfig, isEventTime bool, lateTolerance int64, streams []string, bufferLength int) (*WindowOperator, error)
func (WindowOperator) AddInputCount ¶
func (o WindowOperator) AddInputCount()
func (*WindowOperator) Exec ¶
func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error)
Exec is the entry point for the executor. Input: *xsql.Tuple from the preprocessor. Output: xsql.WindowTuplesSet.
func (WindowOperator) GetInputCount ¶
func (o WindowOperator) GetInputCount() int
func (*WindowOperator) GetMetrics ¶
func (o *WindowOperator) GetMetrics() [][]interface{}
func (WindowOperator) SetBarrierHandler ¶
func (o WindowOperator) SetBarrierHandler(bh checkpoints.BarrierHandler)
Click to show internal directories.
Click to hide internal directories.