Documentation ¶
Index ¶
- type Closable
- type Collector
- type DatetimeRange
- type DefaultSourceTuple
- func NewDefaultRawTuple(raw []byte, meta map[string]interface{}, ts time.Time) *DefaultSourceTuple
- func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple
- func NewDefaultSourceTupleWithTime(message map[string]interface{}, meta map[string]interface{}, ...) *DefaultSourceTuple
- type Emitter
- type Function
- type FunctionContext
- type GraphNode
- type Logger
- type LookupSource
- type MessageClient
- type Operator
- type PrintableTopo
- type Qos
- type RawTuple
- type ResendSink
- type RestartStrategy
- type Rewindable
- type Rule
- type RuleGraph
- type RuleOption
- type Sink
- type Source
- type SourceConnector
- type SourceMeta
- type SourceTuple
- type Store
- type StreamContext
- type Subscriber
- type TopNode
- type TopicChannel
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Closable ¶
type Closable interface {
Close(ctx StreamContext) error
}
type DatetimeRange ¶
type DefaultSourceTuple ¶
type DefaultSourceTuple struct { Mess map[string]interface{} `json:"message"` M map[string]interface{} `json:"meta"` Time time.Time `json:"timestamp"` // contains filtered or unexported fields }
func NewDefaultRawTuple ¶
func NewDefaultRawTuple(raw []byte, meta map[string]interface{}, ts time.Time) *DefaultSourceTuple
NewDefaultRawTuple creates a new DefaultSourceTuple with raw data. Use this when extending a source connector.
func NewDefaultSourceTuple ¶
func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple
NewDefaultSourceTuple creates a new DefaultSourceTuple with message and metadata. Use this when extending an all-in-one source.
func NewDefaultSourceTupleWithTime ¶
func NewDefaultSourceTupleWithTime(message map[string]interface{}, meta map[string]interface{}, timestamp time.Time) *DefaultSourceTuple
func (*DefaultSourceTuple) Message ¶
func (t *DefaultSourceTuple) Message() map[string]interface{}
func (*DefaultSourceTuple) Meta ¶
func (t *DefaultSourceTuple) Meta() map[string]interface{}
func (*DefaultSourceTuple) Raw ¶
func (t *DefaultSourceTuple) Raw() []byte
func (*DefaultSourceTuple) Timestamp ¶
func (t *DefaultSourceTuple) Timestamp() time.Time
type Function ¶
type Function interface { // Validate The argument is a list of xsql.Expr Validate(args []interface{}) error // Exec Execute the function, return the result and if execution is successful. // If execution fails, return the error and false. Exec(args []interface{}, ctx FunctionContext) (interface{}, bool) // IsAggregate If this function is an aggregate function. Each parameter of an aggregate function will be a slice IsAggregate() bool }
type FunctionContext ¶
type FunctionContext interface { StreamContext GetFuncId() int }
type Logger ¶
type Logger interface { Debug(args ...interface{}) Info(args ...interface{}) Warn(args ...interface{}) Error(args ...interface{}) Debugln(args ...interface{}) Infoln(args ...interface{}) Warnln(args ...interface{}) Errorln(args ...interface{}) Debugf(format string, args ...interface{}) Infof(format string, args ...interface{}) Warnf(format string, args ...interface{}) Errorf(format string, args ...interface{}) }
type LookupSource ¶
type LookupSource interface { // Open creates the connection to the external data source Open(ctx StreamContext) error // Configure Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties // read from the yaml Configure(datasource string, props map[string]interface{}) error // Lookup receive lookup values to construct the query and return query results Lookup(ctx StreamContext, fields []string, keys []string, values []interface{}) ([]SourceTuple, error) Closable }
type MessageClient ¶
type MessageClient interface { Subscribe(c StreamContext, subChan []TopicChannel, messageErrors chan error, params map[string]interface{}) error Publish(c StreamContext, topic string, message []byte, params map[string]interface{}) error Ping() error }
type PrintableTopo ¶
type ResendSink ¶
type ResendSink interface { Sink // CollectResend Called when the sink cache resend is triggered CollectResend(ctx StreamContext, data interface{}) error }
type RestartStrategy ¶
type Rewindable ¶
type Rule ¶
type Rule struct { Triggered bool `json:"triggered"` Id string `json:"id,omitempty"` Name string `json:"name,omitempty"` // The display name of a rule Sql string `json:"sql,omitempty"` Graph *RuleGraph `json:"graph,omitempty"` Actions []map[string]interface{} `json:"actions,omitempty"` Options *RuleOption `json:"options,omitempty"` }
Rule is the definition of the business logic. Sql and Graph are mutually exclusive, and at least one of them should be set.
func GetDefaultRule ¶
func (*Rule) IsLongRunningScheduleRule ¶
func (*Rule) IsScheduleRule ¶
type RuleGraph ¶
type RuleGraph struct { Nodes map[string]*GraphNode `json:"nodes"` Topo *PrintableTopo `json:"topo"` }
type RuleOption ¶
type RuleOption struct { Debug bool `json:"debug" yaml:"debug"` LogFilename string `json:"logFilename" yaml:"logFilename"` IsEventTime bool `json:"isEventTime" yaml:"isEventTime"` LateTol int64 `json:"lateTolerance" yaml:"lateTolerance"` Concurrency int `json:"concurrency" yaml:"concurrency"` BufferLength int `json:"bufferLength" yaml:"bufferLength"` SendMetaToSink bool `json:"sendMetaToSink" yaml:"sendMetaToSink"` SendError bool `json:"sendError" yaml:"sendError"` Qos Qos `json:"qos" yaml:"qos"` CheckpointInterval int `json:"checkpointInterval" yaml:"checkpointInterval"` RestartStrategy *RestartStrategy `json:"restartStrategy" yaml:"restartStrategy"` Cron string `json:"cron" yaml:"cron"` Duration string `json:"duration" yaml:"duration"` CronDatetimeRange []DatetimeRange `json:"cronDatetimeRange" yaml:"cronDatetimeRange"` }
type Sink ¶
type Sink interface { // Open Should be sync function for normal case. The container will run it in go func Open(ctx StreamContext) error // Configure Called during initialization. Configure the sink with the properties from rule action definition Configure(props map[string]interface{}) error // Collect Called when each row of data has transferred to this sink Collect(ctx StreamContext, data interface{}) error Closable }
type Source ¶
type Source interface { // Open Should be sync function for normal case. The container will run it in go func Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error) // Configure Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties // read from the yaml Configure(datasource string, props map[string]interface{}) error Closable }
type SourceConnector ¶
type SourceConnector interface { Source Connect(ctx StreamContext) error Subscriber }
type SourceMeta ¶
type SourceMeta struct { SourceName string `json:"sourceName"` // the name of the stream or table SourceType string `json:"sourceType"` // stream or table }
SourceMeta is the metadata of a source node. It describes which existing stream/table to refer to. It is part of the Props in the GraphNode and is optional.
type SourceTuple ¶
type StreamContext ¶
type StreamContext interface { context.Context GetLogger() Logger GetRuleId() string GetOpId() string GetInstanceId() int GetRootPath() string WithMeta(ruleId string, opId string, store Store) StreamContext WithInstance(instanceId int) StreamContext WithCancel() (StreamContext, context.CancelFunc) SetError(e error) // IncrCounter State handling IncrCounter(key string, amount int) error GetCounter(key string) (int, error) PutState(key string, value interface{}) error GetState(key string) (interface{}, error) DeleteState(key string) error // ParseTemplate parse the template string with the given data ParseTemplate(template string, data interface{}) (string, error) // ParseJsonPath parse the jsonPath string with the given data ParseJsonPath(jsonPath string, data interface{}) (interface{}, error) // TransformOutput Transform output according to the properties including dataTemplate, sendSingle, fields // TransformOutput first transform data through the dataTemplate property,and then select data based on the fields property // It is recommended that you do not configure both the dataTemplate property and the fields property. // The second parameter is whether the data is transformed or just return as its json format. TransformOutput(data interface{}) ([]byte, bool, error) // Decode is set in the source according to the format. // It decodes byte array into map or map slice. Decode(data []byte) (map[string]interface{}, error) DecodeIntoList(data []byte) ([]map[string]interface{}, error) }
type Subscriber ¶
type Subscriber interface {
Subscribe(ctx StreamContext) error
}
type TopicChannel ¶
type TopicChannel struct { // Topic for subscriber to filter on if any Topic string // Messages is the returned message channel for the subscriber Messages chan<- interface{} }
TopicChannel is the data structure for a subscriber.
Click to show internal directories.
Click to hide internal directories.