api

package
v1.4.0-beta.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2024 License: Apache-2.0 Imports: 3 Imported by: 33

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Closable

type Closable interface {
	Close(ctx StreamContext) error
}

type Collector

type Collector interface {
	GetInput() (chan<- interface{}, string)
}

type DatetimeRange

type DatetimeRange struct {
	Begin          string `json:"begin" yaml:"begin"`
	End            string `json:"end" yaml:"end"`
	BeginTimestamp int64  `json:"beginTimestamp"`
	EndTimestamp   int64  `json:"endTimestamp"`
}

type DefaultSourceTuple

type DefaultSourceTuple struct {
	Mess map[string]interface{} `json:"message"`
	M    map[string]interface{} `json:"meta"`
	Time time.Time              `json:"timestamp"`
	// contains filtered or unexported fields
}

func NewDefaultRawTuple

func NewDefaultRawTuple(raw []byte, meta map[string]interface{}, ts time.Time) *DefaultSourceTuple

NewDefaultRawTuple creates a new DefaultSourceTuple with raw data. Use this when extend source connector

func NewDefaultSourceTuple

func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple

NewDefaultSourceTuple creates a new DefaultSourceTuple with message and metadata. Use this when extend all in one source.

func NewDefaultSourceTupleWithTime

func NewDefaultSourceTupleWithTime(message map[string]interface{}, meta map[string]interface{}, timestamp time.Time) *DefaultSourceTuple

func (*DefaultSourceTuple) Message

func (t *DefaultSourceTuple) Message() map[string]interface{}

func (*DefaultSourceTuple) Meta

func (t *DefaultSourceTuple) Meta() map[string]interface{}

func (*DefaultSourceTuple) Raw

func (t *DefaultSourceTuple) Raw() []byte

func (*DefaultSourceTuple) Timestamp

func (t *DefaultSourceTuple) Timestamp() time.Time

type Emitter

type Emitter interface {
	AddOutput(chan<- interface{}, string) error
}

type Function

type Function interface {
	// Validate The argument is a list of xsql.Expr
	Validate(args []interface{}) error
	// Exec Execute the function, return the result and if execution is successful.
	// If execution fails, return the error and false.
	Exec(args []interface{}, ctx FunctionContext) (interface{}, bool)
	// IsAggregate If this function is an aggregate function. Each parameter of an aggregate function will be a slice
	IsAggregate() bool
}

type FunctionContext

type FunctionContext interface {
	StreamContext
	GetFuncId() int
}

type GraphNode

type GraphNode struct {
	Type     string                 `json:"type"`
	NodeType string                 `json:"nodeType"`
	Props    map[string]interface{} `json:"props"`
	// UI is a placeholder for ui properties
	UI map[string]interface{} `json:"ui"`
}

type Logger

type Logger interface {
	Debug(args ...interface{})
	Info(args ...interface{})
	Warn(args ...interface{})
	Error(args ...interface{})
	Debugln(args ...interface{})
	Infoln(args ...interface{})
	Warnln(args ...interface{})
	Errorln(args ...interface{})
	Debugf(format string, args ...interface{})
	Infof(format string, args ...interface{})
	Warnf(format string, args ...interface{})
	Errorf(format string, args ...interface{})
}

type LookupSource

type LookupSource interface {
	// Open creates the connection to the external data source
	Open(ctx StreamContext) error
	// Configure Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
	// read from the yaml
	Configure(datasource string, props map[string]interface{}) error
	// Lookup receive lookup values to construct the query and return query results
	Lookup(ctx StreamContext, fields []string, keys []string, values []interface{}) ([]SourceTuple, error)
	Closable
}

type MessageClient

type MessageClient interface {
	Subscribe(c StreamContext, subChan []TopicChannel, messageErrors chan error, params map[string]interface{}) error
	Publish(c StreamContext, topic string, message []byte, params map[string]interface{}) error
	Ping() error
}

type Operator

type Operator interface {
	Emitter
	Collector
	Exec(StreamContext, chan<- error)
	GetName() string
	GetMetrics() []any
}

type PrintableTopo

type PrintableTopo struct {
	Sources []string                 `json:"sources"`
	Edges   map[string][]interface{} `json:"edges"`
}

type Qos

type Qos int
const (
	AtMostOnce Qos = iota
	AtLeastOnce
	ExactlyOnce
)

type RawTuple

type RawTuple interface {
	Raw() []byte
}

type ResendSink

type ResendSink interface {
	Sink
	// CollectResend Called when the sink cache resend is triggered
	CollectResend(ctx StreamContext, data interface{}) error
}

type RestartStrategy

type RestartStrategy struct {
	Attempts     int     `json:"attempts" yaml:"attempts"`
	Delay        int     `json:"delay" yaml:"delay"`
	Multiplier   float64 `json:"multiplier" yaml:"multiplier"`
	MaxDelay     int     `json:"maxDelay" yaml:"maxDelay"`
	JitterFactor float64 `json:"jitterFactor" yaml:"jitterFactor"`
}

type Rewindable

type Rewindable interface {
	GetOffset() (interface{}, error)
	Rewind(offset interface{}) error
	ResetOffset(input map[string]interface{}) error
}

type Rule

type Rule struct {
	Triggered bool                     `json:"triggered"`
	Id        string                   `json:"id,omitempty"`
	Name      string                   `json:"name,omitempty"` // The display name of a rule
	Sql       string                   `json:"sql,omitempty"`
	Graph     *RuleGraph               `json:"graph,omitempty"`
	Actions   []map[string]interface{} `json:"actions,omitempty"`
	Options   *RuleOption              `json:"options,omitempty"`
}

Rule the definition of the business logic Sql and Graph are mutually exclusive, at least one of them should be set

func GetDefaultRule

func GetDefaultRule(name, sql string) *Rule

func (*Rule) IsLongRunningScheduleRule

func (r *Rule) IsLongRunningScheduleRule() bool

func (*Rule) IsScheduleRule

func (r *Rule) IsScheduleRule() bool

type RuleGraph

type RuleGraph struct {
	Nodes map[string]*GraphNode `json:"nodes"`
	Topo  *PrintableTopo        `json:"topo"`
}

type RuleOption

type RuleOption struct {
	Debug              bool             `json:"debug" yaml:"debug"`
	LogFilename        string           `json:"logFilename" yaml:"logFilename"`
	IsEventTime        bool             `json:"isEventTime" yaml:"isEventTime"`
	LateTol            int64            `json:"lateTolerance" yaml:"lateTolerance"`
	Concurrency        int              `json:"concurrency" yaml:"concurrency"`
	BufferLength       int              `json:"bufferLength" yaml:"bufferLength"`
	SendMetaToSink     bool             `json:"sendMetaToSink" yaml:"sendMetaToSink"`
	SendError          bool             `json:"sendError" yaml:"sendError"`
	Qos                Qos              `json:"qos" yaml:"qos"`
	CheckpointInterval int              `json:"checkpointInterval" yaml:"checkpointInterval"`
	RestartStrategy    *RestartStrategy `json:"restartStrategy" yaml:"restartStrategy"`
	Cron               string           `json:"cron" yaml:"cron"`
	Duration           string           `json:"duration" yaml:"duration"`
	CronDatetimeRange  []DatetimeRange  `json:"cronDatetimeRange" yaml:"cronDatetimeRange"`
}

type Sink

type Sink interface {
	// Open Should be sync function for normal case. The container will run it in go func
	Open(ctx StreamContext) error
	// Configure Called during initialization. Configure the sink with the properties from rule action definition
	Configure(props map[string]interface{}) error
	// Collect Called when each row of data has transferred to this sink
	Collect(ctx StreamContext, data interface{}) error
	Closable
}

type Source

type Source interface {
	// Open Should be sync function for normal case. The container will run it in go func
	Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
	// Configure Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
	// read from the yaml
	Configure(datasource string, props map[string]interface{}) error
	Closable
}

type SourceConnector

type SourceConnector interface {
	Source
	Connect(ctx StreamContext) error
	Subscriber
}

type SourceMeta

type SourceMeta struct {
	SourceName string `json:"sourceName"` // the name of the stream or table
	SourceType string `json:"sourceType"` // stream or table
}

SourceMeta is the meta data of a source node. It describes what existed stream/table to refer to. It is part of the Props in the GraphNode and it is optional

type SourceTuple

type SourceTuple interface {
	Message() map[string]interface{}
	Meta() map[string]interface{}
	Timestamp() time.Time
}

type Store

type Store interface {
	SaveState(checkpointId int64, opId string, state map[string]interface{}) error
	// SaveCheckpoint saves the whole checkpoint state into storage
	SaveCheckpoint(checkpointId int64) error
	GetOpState(opId string) (*sync.Map, error)
	Clean() error
}

type StreamContext

type StreamContext interface {
	context.Context
	GetLogger() Logger
	GetRuleId() string
	GetOpId() string
	GetInstanceId() int
	GetRootPath() string
	WithMeta(ruleId string, opId string, store Store) StreamContext
	WithInstance(instanceId int) StreamContext
	WithCancel() (StreamContext, context.CancelFunc)
	SetError(e error)
	// IncrCounter State handling
	IncrCounter(key string, amount int) error
	GetCounter(key string) (int, error)
	PutState(key string, value interface{}) error
	GetState(key string) (interface{}, error)
	DeleteState(key string) error
	// ParseTemplate parse the template string with the given data
	ParseTemplate(template string, data interface{}) (string, error)
	// ParseJsonPath parse the jsonPath string with the given data
	ParseJsonPath(jsonPath string, data interface{}) (interface{}, error)
	// TransformOutput Transform output according to the properties including dataTemplate, sendSingle, fields
	// TransformOutput first transform data through the dataTemplate property,and then select data based on the fields property
	// It is recommended that you do not configure both the dataTemplate property and the fields property.
	// The second parameter is whether the data is transformed or just return as its json format.
	TransformOutput(data interface{}) ([]byte, bool, error)
	// Decode is set in the source according to the format.
	// It decodes byte array into map or map slice.
	Decode(data []byte) (map[string]interface{}, error)

	DecodeIntoList(data []byte) ([]map[string]interface{}, error)
}

type Subscriber

type Subscriber interface {
	Subscribe(ctx StreamContext) error
}

type TopNode

type TopNode interface {
	GetName() string
}

type TopicChannel

type TopicChannel struct {
	// Topic for subscriber to filter on if any
	Topic string
	// Messages is the returned message channel for the subscriber
	Messages chan<- interface{}
}

TopicChannel is the data structure for subscriber

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL