Documentation ¶
Index ¶
- func GetAllRegisteredJoints() map[string]interface{}
- func RegisterPipeJoint(joint interface{})
- func RegisterPipeJointWithName(jointName string, joint interface{})
- type ComplexProcessor
- type Context
- func (context *Context) End(msg interface{})
- func (context *Context) Exit(msg interface{})
- func (context *Context) IsEnd() bool
- func (context *Context) IsExit() bool
- func (context *Context) IsPause() bool
- func (context *Context) Marshall() []byte
- func (context *Context) Pause()
- func (context *Context) Resume()
- type Filter
- type Input
- type Joint
- type JointType
- type Output
- type ParaKey
- type Parameters
- func (para *Parameters) Get(key ParaKey) interface{}
- func (para *Parameters) GetArray(key ParaKey) ([]interface{}, bool)
- func (para *Parameters) GetBool(key ParaKey, defaultV bool) bool
- func (para *Parameters) GetBytes(key ParaKey) ([]byte, bool)
- func (para *Parameters) GetInt(key ParaKey, defaultV int) (int, bool)
- func (para *Parameters) GetInt64(key ParaKey, defaultV int64) (int64, bool)
- func (para *Parameters) GetInt64OrDefault(key ParaKey, defaultV int64) int64
- func (para *Parameters) GetIntOrDefault(key ParaKey, defaultV int) int
- func (para *Parameters) GetMap(key ParaKey) (map[string]interface{}, bool)
- func (para *Parameters) GetOrDefault(key ParaKey, val interface{}) interface{}
- func (para *Parameters) GetString(key ParaKey) (string, bool)
- func (para *Parameters) GetStringArray(key ParaKey) ([]string, bool)
- func (para *Parameters) GetStringMap(key ParaKey) (result map[string]string, ok bool)
- func (para *Parameters) GetStringOrDefault(key ParaKey, val string) string
- func (para *Parameters) GetTime(key ParaKey) (time.Time, bool)
- func (para *Parameters) Has(key ParaKey) bool
- func (para *Parameters) MustGet(key ParaKey) interface{}
- func (para *Parameters) MustGetArray(key ParaKey) []interface{}
- func (para *Parameters) MustGetBytes(key ParaKey) []byte
- func (para *Parameters) MustGetInt(key ParaKey) int
- func (para *Parameters) MustGetInt64(key ParaKey) int64
- func (para *Parameters) MustGetMap(key ParaKey) map[string]interface{}
- func (para *Parameters) MustGetString(key ParaKey) string
- func (para *Parameters) MustGetStringArray(key ParaKey) []string
- func (para *Parameters) MustGetTime(key ParaKey) time.Time
- func (para *Parameters) Set(key ParaKey, value interface{})
- type Pipeline
- func (pipe *Pipeline) Context(s *Context) *Pipeline
- func (pipe *Pipeline) CurrentProcessor() string
- func (pipe *Pipeline) End(s Processor) *Pipeline
- func (pipe *Pipeline) Error(s Processor) *Pipeline
- func (pipe *Pipeline) Filter(s Filter) *Pipeline
- func (pipe *Pipeline) GetContext() *Context
- func (pipe *Pipeline) GetID() string
- func (pipe *Pipeline) Input(s Input) *Pipeline
- func (pipe *Pipeline) Join(s Processor) *Pipeline
- func (pipe *Pipeline) Output(s Output) *Pipeline
- func (pipe *Pipeline) Pause() *Pipeline
- func (pipe *Pipeline) Resume() *Pipeline
- func (pipe *Pipeline) Run() *Context
- func (pipe *Pipeline) Run1() *Pipeline
- func (pipe *Pipeline) Start(s Processor) *Pipeline
- func (pipe *Pipeline) Start1() *Pipeline
- func (pipe *Pipeline) Stop() *Pipeline
- type PipelineConfig
- type Processor
- type ProcessorConfig
- type RunningState
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetAllRegisteredJoints ¶
func GetAllRegisteredJoints() map[string]interface{}
func RegisterPipeJoint ¶
func RegisterPipeJoint(joint interface{})
func RegisterPipeJointWithName ¶
func RegisterPipeJointWithName(jointName string, joint interface{})
Types ¶
type ComplexProcessor ¶
type ComplexProcessor interface { Processor }
type Context ¶
type Context struct { Parameters SequenceID int64 `json:"sequence"` IsSimulate bool `json:"is_simulate"` IgnoreBroken bool `json:"ignore_broken"` Payload interface{} `json:"-"` PipelineID string // contains filtered or unexported fields }
func UnMarshall ¶
func (*Context) End ¶
func (context *Context) End(msg interface{})
End breaks all pipelines, but the end phase is not included.
func (*Context) IsEnd ¶
IsEnd indicates whether the pipe process has ended; ended means no more processes will be executed.
type Filter ¶
func GetFilterJointInstance ¶
func GetFilterJointInstance(cfg *ProcessorConfig) Filter
type Input ¶
func GetInputJointInstance ¶
func GetInputJointInstance(cfg *ProcessorConfig) Input
type JointType ¶
type JointType string
const FILTER JointType = "FILTER"
const INPUT JointType = "INPUT"
const OUTPUT JointType = "OUTPUT"
const PROCESSOR JointType = "PROCESSOR"
type Output ¶
func GetOutputJointInstance ¶
func GetOutputJointInstance(cfg *ProcessorConfig) Output
type Parameters ¶
type Parameters struct { Data map[string]interface{} `json:"data,omitempty"` // contains filtered or unexported fields }
func (*Parameters) Get ¶
func (para *Parameters) Get(key ParaKey) interface{}
func (*Parameters) GetArray ¶
func (para *Parameters) GetArray(key ParaKey) ([]interface{}, bool)
GetArray returns an array whose items are of type interface{}.
func (*Parameters) GetInt64 ¶
func (para *Parameters) GetInt64(key ParaKey, defaultV int64) (int64, bool)
func (*Parameters) GetInt64OrDefault ¶
func (para *Parameters) GetInt64OrDefault(key ParaKey, defaultV int64) int64
func (*Parameters) GetIntOrDefault ¶
func (para *Parameters) GetIntOrDefault(key ParaKey, defaultV int) int
func (*Parameters) GetMap ¶
func (para *Parameters) GetMap(key ParaKey) (map[string]interface{}, bool)
func (*Parameters) GetOrDefault ¶
func (para *Parameters) GetOrDefault(key ParaKey, val interface{}) interface{}
func (*Parameters) GetStringArray ¶
func (para *Parameters) GetStringArray(key ParaKey) ([]string, bool)
func (*Parameters) GetStringMap ¶
func (para *Parameters) GetStringMap(key ParaKey) (result map[string]string, ok bool)
func (*Parameters) GetStringOrDefault ¶
func (para *Parameters) GetStringOrDefault(key ParaKey, val string) string
func (*Parameters) Has ¶
func (para *Parameters) Has(key ParaKey) bool
func (*Parameters) MustGet ¶
func (para *Parameters) MustGet(key ParaKey) interface{}
func (*Parameters) MustGetArray ¶
func (para *Parameters) MustGetArray(key ParaKey) []interface{}
func (*Parameters) MustGetBytes ¶
func (para *Parameters) MustGetBytes(key ParaKey) []byte
func (*Parameters) MustGetInt ¶
func (para *Parameters) MustGetInt(key ParaKey) int
MustGetInt returns 0 if the key was not found.
func (*Parameters) MustGetInt64 ¶
func (para *Parameters) MustGetInt64(key ParaKey) int64
func (*Parameters) MustGetMap ¶
func (para *Parameters) MustGetMap(key ParaKey) map[string]interface{}
func (*Parameters) MustGetString ¶
func (para *Parameters) MustGetString(key ParaKey) string
func (*Parameters) MustGetStringArray ¶
func (para *Parameters) MustGetStringArray(key ParaKey) []string
func (*Parameters) MustGetTime ¶
func (para *Parameters) MustGetTime(key ParaKey) time.Time
func (*Parameters) Set ¶
func (para *Parameters) Set(key ParaKey, value interface{})
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
func NewPipeline ¶
func NewPipelineFromConfig ¶
func NewPipelineFromConfig(name string, config *PipelineConfig, context *Context) *Pipeline
func (*Pipeline) CurrentProcessor ¶
func (*Pipeline) GetContext ¶
type PipelineConfig ¶
type PipelineConfig struct { ID string `gorm:"not null;unique;primary_key" json:"id,omitempty" index:"id"` Name string `json:"name,omitempty" config:"name"` //TODO remove StartProcessor *ProcessorConfig `json:"start,omitempty" config:"start"` Processors []*ProcessorConfig `json:"process,omitempty" config:"process"` EndProcessor *ProcessorConfig `json:"end,omitempty" config:"end"` ErrorProcessor *ProcessorConfig `json:"error,omitempty" config:"error"` Input *ProcessorConfig `json:"input,omitempty" config:"input"` Filters []*ProcessorConfig `json:"filters,omitempty" config:"filters"` Output *ProcessorConfig `json:"output,omitempty" config:"output"` Created time.Time `json:"created,omitempty"` Updated time.Time `json:"updated,omitempty"` Tags []string `json:"tags,omitempty" config:"tags"` }
PipelineConfig is the config for each pipeline; a pipeline may have more than one processor.
func GetStaticPipelineConfig ¶
func GetStaticPipelineConfig(pipelineID string) PipelineConfig
type Processor ¶
func GetJointInstance ¶
func GetJointInstance(cfg *ProcessorConfig) Processor
type ProcessorConfig ¶
type ProcessorConfig struct { Name string `json:"joint" config:"joint"` //the joint name Parameters map[string]interface{} `json:"parameters,omitempty" config:"parameters"` //kv parameters for this joint Enabled bool `json:"enabled" config:"enabled"` }
ProcessorConfig is the config for each joint.
type RunningState ¶
type RunningState string
const FINISHED RunningState = "FINISHED"
const PAUSED RunningState = "PAUSED"
const STARTED RunningState = "STARTED"
const STOPPED RunningState = "STOPPED"
Click to show internal directories.
Click to hide internal directories.