Documentation ¶
Index ¶
- Constants
- Variables
- func GetPluginInsManager() *pluginInsManager
- func GetPluginInsManager4Test() *pluginInsManager
- type Closable
- type Command
- type Control
- type ControlChannel
- type DataInChannel
- type DataOutChannel
- type DataReqChannel
- type FuncData
- type FuncMeta
- type FuncReply
- type Meta
- type NanomsgReqChannel
- type NanomsgReqRepChannel
- type PluginIns
- type PluginMeta
- type PluginStatus
- type PortableConfig
- type PortableFunc
- type PortableSink
- func (ps *PortableSink) Close(ctx api.StreamContext) error
- func (ps *PortableSink) Collect(ctx api.StreamContext, item api.RawTuple) error
- func (ps *PortableSink) Connect(ctx api.StreamContext, _ api.StatusChangeHandler) error
- func (ps *PortableSink) Provision(ctx api.StreamContext, configs map[string]any) error
- type PortableSource
- func (ps *PortableSource) Close(ctx api.StreamContext) error
- func (ps *PortableSource) Configure(topic string, props map[string]interface{}) error
- func (ps *PortableSource) Connect(ctx api.StreamContext, _ api.StatusChangeHandler) error
- func (ps *PortableSource) Provision(ctx api.StreamContext, configs map[string]any) error
- func (ps *PortableSource) Subscribe(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest) error
Constants ¶
const ( PluginStatusRunning = "running" PluginStatusInit = "initializing" PluginStatusErr = "error" PluginStatusStop = "stop" )
const ( TYPE_SOURCE = "source" TYPE_SINK = "sink" TYPE_FUNC = "func" )
const ( CMD_START = "start" CMD_STOP = "stop" )
const (
REPLY_OK = "ok"
)
Variables ¶
var PortbleConf = &PortableConfig{
SendTimeout: 1000,
}
TODO setting configuration
Functions ¶
func GetPluginInsManager ¶
func GetPluginInsManager() *pluginInsManager
func GetPluginInsManager4Test ¶
func GetPluginInsManager4Test() *pluginInsManager
Types ¶
type ControlChannel ¶
func CreateControlChannel ¶
func CreateControlChannel(pluginName string) (ControlChannel, error)
type DataInChannel ¶
func CreateSinkAckChannel ¶
func CreateSinkAckChannel(ctx api.StreamContext) (DataInChannel, error)
func CreateSourceChannel ¶
func CreateSourceChannel(ctx api.StreamContext) (DataInChannel, error)
type DataOutChannel ¶
func CreateSinkChannel ¶
func CreateSinkChannel(ctx api.StreamContext) (DataOutChannel, error)
type DataReqChannel ¶
func CreateFunctionChannel ¶
func CreateFunctionChannel(symbolName string) (DataReqChannel, error)
type FuncReply ¶
type FuncReply struct { State bool `json:"state"` Result interface{} `json:"result"` }
type NanomsgReqChannel ¶
NanomsgReqChannel shared by symbols
func (*NanomsgReqChannel) Close ¶
func (r *NanomsgReqChannel) Close() error
func (*NanomsgReqChannel) Handshake ¶
func (r *NanomsgReqChannel) Handshake() error
Handshake should only be called once
func (*NanomsgReqChannel) SendCmd ¶
func (r *NanomsgReqChannel) SendCmd(arg []byte) error
type NanomsgReqRepChannel ¶
func (*NanomsgReqRepChannel) Close ¶
func (r *NanomsgReqRepChannel) Close() error
type PluginIns ¶
type PluginIns struct { sync.RWMutex Status *PluginStatus // contains filtered or unexported fields }
PluginIns created at two scenarios 1. At runtime, plugin is created/updated: in order to be able to reload rules that already uses previous ins 2. At system start/restart Once created, never deleted until system shutdown
func NewPluginIns ¶
func NewPluginIns(name string, ctrlChan ControlChannel, process *os.Process) *PluginIns
func NewPluginInsForTest ¶
func NewPluginInsForTest(name string, ctrlChan ControlChannel) *PluginIns
func (*PluginIns) GetStatus ¶
func (i *PluginIns) GetStatus() *PluginStatus
func (*PluginIns) StartSymbol ¶
func (i *PluginIns) StartSymbol(ctx api.StreamContext, ctrl *Control) error
func (*PluginIns) StopSymbol ¶
func (i *PluginIns) StopSymbol(ctx api.StreamContext, ctrl *Control) error
type PluginMeta ¶
type PluginStatus ¶
type PluginStatus struct { RefCount map[string]int `json:"refCount"` Status string `json:"status"` ErrMsg string `json:"errMsg"` }
func NewPluginStatus ¶
func NewPluginStatus() *PluginStatus
func (*PluginStatus) GetRuleRefCount ¶
func (s *PluginStatus) GetRuleRefCount(rule string) int
func (*PluginStatus) StartRunning ¶
func (s *PluginStatus) StartRunning()
func (*PluginStatus) StatusErr ¶
func (s *PluginStatus) StatusErr(err error)
func (*PluginStatus) Stop ¶
func (s *PluginStatus) Stop()
type PortableConfig ¶
type PortableConfig struct {
SendTimeout int64 `json:"sendTimeout"`
}
type PortableFunc ¶
type PortableFunc struct {
// contains filtered or unexported fields
}
PortableFunc each function symbol only has a singleton Each singleton are long-running go routine Currently, it is cached and never ended once created It is actually a wrapper of the data channel and can be fit to any plugin instance Thus, it is possible to hot reload, which is simply attach a new nng client to the same channel without changing the server(plugin runtime) side TODO think about ending a portable func when needed.
func NewPortableFunc ¶
func NewPortableFunc(symbolName string, reg *PluginMeta) (_ *PortableFunc, e error)
func (*PortableFunc) Close ¶
func (f *PortableFunc) Close() error
func (*PortableFunc) Exec ¶
func (f *PortableFunc) Exec(ctx api.FunctionContext, args []any) (interface{}, bool)
func (*PortableFunc) IsAggregate ¶
func (f *PortableFunc) IsAggregate() bool
func (*PortableFunc) Validate ¶
func (f *PortableFunc) Validate(args []interface{}) error
type PortableSink ¶
type PortableSink struct {
// contains filtered or unexported fields
}
func NewPortableSink ¶
func NewPortableSink(symbolName string, reg *PluginMeta) *PortableSink
func (*PortableSink) Close ¶
func (ps *PortableSink) Close(ctx api.StreamContext) error
func (*PortableSink) Collect ¶
func (ps *PortableSink) Collect(ctx api.StreamContext, item api.RawTuple) error
func (*PortableSink) Connect ¶
func (ps *PortableSink) Connect(ctx api.StreamContext, _ api.StatusChangeHandler) error
func (*PortableSink) Provision ¶
func (ps *PortableSink) Provision(ctx api.StreamContext, configs map[string]any) error
type PortableSource ¶
type PortableSource struct {
// contains filtered or unexported fields
}
func NewPortableSource ¶
func NewPortableSource(symbolName string, reg *PluginMeta) *PortableSource
func (*PortableSource) Close ¶
func (ps *PortableSource) Close(ctx api.StreamContext) error
func (*PortableSource) Configure ¶
func (ps *PortableSource) Configure(topic string, props map[string]interface{}) error
func (*PortableSource) Connect ¶
func (ps *PortableSource) Connect(ctx api.StreamContext, _ api.StatusChangeHandler) error
func (*PortableSource) Provision ¶
func (ps *PortableSource) Provision(ctx api.StreamContext, configs map[string]any) error
func (*PortableSource) Subscribe ¶
func (ps *PortableSource) Subscribe(ctx api.StreamContext, ingest api.TupleIngest, ingestError api.ErrorIngest) error