runtime

package
v2.0.0-beta.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 14, 2024 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PluginStatusRunning = "running"
	PluginStatusInit    = "initializing"
	PluginStatusErr     = "error"
	PluginStatusStop    = "stop"
)
View Source
const (
	TYPE_SOURCE = "source"
	TYPE_SINK   = "sink"
	TYPE_FUNC   = "func"
)
View Source
const (
	CMD_START = "start"
	CMD_STOP  = "stop"
)
View Source
const (
	REPLY_OK = "ok"
)

Variables

View Source
var PortbleConf = &PortableConfig{
	SendTimeout: 1000,
}

TODO setting configuration

Functions

func GetPluginInsManager

func GetPluginInsManager() *pluginInsManager

func GetPluginInsManager4Test

func GetPluginInsManager4Test() *pluginInsManager

Types

type Closable

type Closable interface {
	Close() error
}

type Command

type Command struct {
	Cmd string `json:"cmd"`
	Arg string `json:"arg"`
}

type Control

type Control struct {
	SymbolName string                 `json:"symbolName"`
	Meta       Meta                   `json:"meta"`
	PluginType string                 `json:"pluginType"`
	DataSource string                 `json:"dataSource,omitempty"`
	Config     map[string]interface{} `json:"config,omitempty"`
}

type ControlChannel

type ControlChannel interface {
	Handshake() error
	SendCmd(arg []byte) error
	Closable
}

func CreateControlChannel

func CreateControlChannel(pluginName string) (ControlChannel, error)

type DataInChannel

type DataInChannel interface {
	Recv() ([]byte, error)
	Closable
}

func CreateSinkAckChannel

func CreateSinkAckChannel(ctx api.StreamContext) (DataInChannel, error)

func CreateSourceChannel

func CreateSourceChannel(ctx api.StreamContext) (DataInChannel, error)

type DataOutChannel

type DataOutChannel interface {
	Send([]byte) error
	Closable
}

func CreateSinkChannel

func CreateSinkChannel(ctx api.StreamContext) (DataOutChannel, error)

type DataReqChannel

type DataReqChannel interface {
	Req([]byte) ([]byte, error)
	Closable
}

func CreateFunctionChannel

func CreateFunctionChannel(symbolName string) (DataReqChannel, error)

type FuncData

type FuncData struct {
	Func string      `json:"func"`
	Arg  interface{} `json:"arg"`
}

type FuncMeta

type FuncMeta struct {
	Meta
	FuncId int `json:"funcId"`
}

type FuncReply

type FuncReply struct {
	State  bool        `json:"state"`
	Result interface{} `json:"result"`
}

type Meta

type Meta struct {
	RuleId     string `json:"ruleId"`
	OpId       string `json:"opId"`
	InstanceId int    `json:"instanceId"`
}

type NanomsgReqChannel

type NanomsgReqChannel struct {
	sync.Mutex
	// contains filtered or unexported fields
}

NanomsgReqChannel shared by symbols

func (*NanomsgReqChannel) Close

func (r *NanomsgReqChannel) Close() error

func (*NanomsgReqChannel) Handshake

func (r *NanomsgReqChannel) Handshake() error

Handshake should only be called once

func (*NanomsgReqChannel) SendCmd

func (r *NanomsgReqChannel) SendCmd(arg []byte) error

type NanomsgReqRepChannel

type NanomsgReqRepChannel struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*NanomsgReqRepChannel) Close

func (r *NanomsgReqRepChannel) Close() error

func (*NanomsgReqRepChannel) Req

func (r *NanomsgReqRepChannel) Req(arg []byte) ([]byte, error)

type PluginIns

type PluginIns struct {
	sync.RWMutex

	Status *PluginStatus
	// contains filtered or unexported fields
}

PluginIns created at two scenarios 1. At runtime, plugin is created/updated: in order to be able to reload rules that already uses previous ins 2. At system start/restart Once created, never deleted until system shutdown

func NewPluginIns

func NewPluginIns(name string, ctrlChan ControlChannel, process *os.Process) *PluginIns

func NewPluginInsForTest

func NewPluginInsForTest(name string, ctrlChan ControlChannel) *PluginIns

func (*PluginIns) GetStatus

func (i *PluginIns) GetStatus() *PluginStatus

func (*PluginIns) StartSymbol

func (i *PluginIns) StartSymbol(ctx api.StreamContext, ctrl *Control) error

func (*PluginIns) Stop

func (i *PluginIns) Stop() error

Stop intentionally

func (*PluginIns) StopSymbol

func (i *PluginIns) StopSymbol(ctx api.StreamContext, ctrl *Control) error

type PluginMeta

type PluginMeta struct {
	Name        string  `json:"name"`
	Version     string  `json:"version"`
	Language    string  `json:"language"`
	Executable  string  `json:"executable"`
	VirtualType *string `json:"virtualEnvType,omitempty"`
	Env         *string `json:"env,omitempty"`
}

type PluginStatus

type PluginStatus struct {
	RefCount map[string]int `json:"refCount"`
	Status   string         `json:"status"`
	ErrMsg   string         `json:"errMsg"`
}

func NewPluginStatus

func NewPluginStatus() *PluginStatus

func (*PluginStatus) GetRuleRefCount

func (s *PluginStatus) GetRuleRefCount(rule string) int

func (*PluginStatus) StartRunning

func (s *PluginStatus) StartRunning()

func (*PluginStatus) StatusErr

func (s *PluginStatus) StatusErr(err error)

func (*PluginStatus) Stop

func (s *PluginStatus) Stop()

type PortableConfig

type PortableConfig struct {
	SendTimeout int64 `json:"sendTimeout"`
}

type PortableFunc

type PortableFunc struct {
	// contains filtered or unexported fields
}

PortableFunc each function symbol only has a singleton Each singleton are long-running go routine Currently, it is cached and never ended once created It is actually a wrapper of the data channel and can be fit to any plugin instance Thus, it is possible to hot reload, which is simply attach a new nng client to the same channel without changing the server(plugin runtime) side TODO think about ending a portable func when needed.

func NewPortableFunc

func NewPortableFunc(symbolName string, reg *PluginMeta) (_ *PortableFunc, e error)

func (*PortableFunc) Close

func (f *PortableFunc) Close() error

func (*PortableFunc) Exec

func (f *PortableFunc) Exec(ctx api.FunctionContext, args []any) (interface{}, bool)

func (*PortableFunc) IsAggregate

func (f *PortableFunc) IsAggregate() bool

func (*PortableFunc) Validate

func (f *PortableFunc) Validate(args []interface{}) error

type PortableSink

type PortableSink struct {
	// contains filtered or unexported fields
}

func NewPortableSink

func NewPortableSink(symbolName string, reg *PluginMeta) *PortableSink

func (*PortableSink) Close

func (ps *PortableSink) Close(ctx api.StreamContext) error

func (*PortableSink) Collect

func (ps *PortableSink) Collect(ctx api.StreamContext, item api.RawTuple) error

func (*PortableSink) Connect

func (*PortableSink) Provision

func (ps *PortableSink) Provision(ctx api.StreamContext, configs map[string]any) error

type PortableSource

type PortableSource struct {
	// contains filtered or unexported fields
}

func NewPortableSource

func NewPortableSource(symbolName string, reg *PluginMeta) *PortableSource

func (*PortableSource) Close

func (ps *PortableSource) Close(ctx api.StreamContext) error

func (*PortableSource) Configure

func (ps *PortableSource) Configure(topic string, props map[string]interface{}) error

func (*PortableSource) Connect

func (*PortableSource) Provision

func (ps *PortableSource) Provision(ctx api.StreamContext, configs map[string]any) error

func (*PortableSource) Subscribe

func (ps *PortableSource) Subscribe(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL