pipeline

package
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2021 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetAllRegisteredJoints

func GetAllRegisteredJoints() map[string]interface{}

func RegisterPipeJoint

func RegisterPipeJoint(joint interface{})

func RegisterPipeJointWithName

func RegisterPipeJointWithName(jointName string, joint interface{})

Types

type ComplexProcessor

type ComplexProcessor interface {
	Processor
}

type Context

type Context struct {
	Parameters

	SequenceID   int64       `json:"sequence"`
	IsSimulate   bool        `json:"is_simulate"`
	IgnoreBroken bool        `json:"ignore_broken"`
	Payload      interface{} `json:"-"`

	PipelineID string
	// contains filtered or unexported fields
}

func UnMarshall

func UnMarshall(b []byte) Context

func (*Context) End

func (context *Context) End(msg interface{})

End breaks all pipelines, but the end phase is not included.

func (*Context) Exit

func (context *Context) Exit(msg interface{})

Exit tells the pipeline to exit.

func (*Context) IsEnd

func (context *Context) IsEnd() bool

IsEnd indicates whether the pipeline process has ended; ended means no more processes will be executed.

func (*Context) IsExit

func (context *Context) IsExit() bool

IsExit means all pipelines will be broken and execution jumps outside; even the end phase will not be executed.

func (*Context) IsPause

func (context *Context) IsPause() bool

func (*Context) Marshall

func (context *Context) Marshall() []byte

func (*Context) Pause

func (context *Context) Pause()

func (*Context) Resume

func (context *Context) Resume()

type Filter

type Filter interface {
	Joint
	Filter([]byte) error
}

func GetFilterJointInstance

func GetFilterJointInstance(cfg *ProcessorConfig) Filter

type Input

type Input interface {
	Joint
	Open() error
	Close() error
	Read() ([]byte, error)
}

func GetInputJointInstance

func GetInputJointInstance(cfg *ProcessorConfig) Input

type Joint

type Joint interface {
	Name() string
}

type JointType

type JointType string
const FILTER JointType = "FILTER"
const INPUT JointType = "INPUT"
const OUTPUT JointType = "OUTPUT"
const PROCESSOR JointType = "PROCESSOR"

type Output

type Output interface {
	Joint
	Open() error
	Close() error
	Write([]byte) error
}

func GetOutputJointInstance

func GetOutputJointInstance(cfg *ProcessorConfig) Output

type ParaKey

type ParaKey string

type Parameters

type Parameters struct {
	Data map[string]interface{} `json:"data,omitempty"`
	// contains filtered or unexported fields
}

func (*Parameters) Get

func (para *Parameters) Get(key ParaKey) interface{}

func (*Parameters) GetArray

func (para *Parameters) GetArray(key ParaKey) ([]interface{}, bool)

GetArray returns an array whose items are of type interface{}.

func (*Parameters) GetBool

func (para *Parameters) GetBool(key ParaKey, defaultV bool) bool

func (*Parameters) GetBytes

func (para *Parameters) GetBytes(key ParaKey) ([]byte, bool)

func (*Parameters) GetInt

func (para *Parameters) GetInt(key ParaKey, defaultV int) (int, bool)

func (*Parameters) GetInt64

func (para *Parameters) GetInt64(key ParaKey, defaultV int64) (int64, bool)

func (*Parameters) GetInt64OrDefault

func (para *Parameters) GetInt64OrDefault(key ParaKey, defaultV int64) int64

func (*Parameters) GetIntOrDefault

func (para *Parameters) GetIntOrDefault(key ParaKey, defaultV int) int

func (*Parameters) GetMap

func (para *Parameters) GetMap(key ParaKey) (map[string]interface{}, bool)

func (*Parameters) GetOrDefault

func (para *Parameters) GetOrDefault(key ParaKey, val interface{}) interface{}

func (*Parameters) GetString

func (para *Parameters) GetString(key ParaKey) (string, bool)

func (*Parameters) GetStringArray

func (para *Parameters) GetStringArray(key ParaKey) ([]string, bool)

func (*Parameters) GetStringMap

func (para *Parameters) GetStringMap(key ParaKey) (result map[string]string, ok bool)

func (*Parameters) GetStringOrDefault

func (para *Parameters) GetStringOrDefault(key ParaKey, val string) string

func (*Parameters) GetTime

func (para *Parameters) GetTime(key ParaKey) (time.Time, bool)

func (*Parameters) Has

func (para *Parameters) Has(key ParaKey) bool

func (*Parameters) MustGet

func (para *Parameters) MustGet(key ParaKey) interface{}

func (*Parameters) MustGetArray

func (para *Parameters) MustGetArray(key ParaKey) []interface{}

func (*Parameters) MustGetBytes

func (para *Parameters) MustGetBytes(key ParaKey) []byte

func (*Parameters) MustGetInt

func (para *Parameters) MustGetInt(key ParaKey) int

MustGetInt returns 0 if the key was not found.

func (*Parameters) MustGetInt64

func (para *Parameters) MustGetInt64(key ParaKey) int64

func (*Parameters) MustGetMap

func (para *Parameters) MustGetMap(key ParaKey) map[string]interface{}

func (*Parameters) MustGetString

func (para *Parameters) MustGetString(key ParaKey) string

func (*Parameters) MustGetStringArray

func (para *Parameters) MustGetStringArray(key ParaKey) []string

func (*Parameters) MustGetTime

func (para *Parameters) MustGetTime(key ParaKey) time.Time

func (*Parameters) Set

func (para *Parameters) Set(key ParaKey, value interface{})

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline(name string) *Pipeline

func NewPipelineFromConfig

func NewPipelineFromConfig(name string, config *PipelineConfig, context *Context) *Pipeline

func (*Pipeline) Context

func (pipe *Pipeline) Context(s *Context) *Pipeline

func (*Pipeline) CurrentProcessor

func (pipe *Pipeline) CurrentProcessor() string

func (*Pipeline) End

func (pipe *Pipeline) End(s Processor) *Pipeline

func (*Pipeline) Error

func (pipe *Pipeline) Error(s Processor) *Pipeline

func (*Pipeline) Filter

func (pipe *Pipeline) Filter(s Filter) *Pipeline

func (*Pipeline) GetContext

func (pipe *Pipeline) GetContext() *Context

func (*Pipeline) GetID

func (pipe *Pipeline) GetID() string

func (*Pipeline) Input

func (pipe *Pipeline) Input(s Input) *Pipeline

func (*Pipeline) Join

func (pipe *Pipeline) Join(s Processor) *Pipeline

func (*Pipeline) Output

func (pipe *Pipeline) Output(s Output) *Pipeline

func (*Pipeline) Pause

func (pipe *Pipeline) Pause() *Pipeline

func (*Pipeline) Resume

func (pipe *Pipeline) Resume() *Pipeline

func (*Pipeline) Run

func (pipe *Pipeline) Run() *Context

func (*Pipeline) Run1

func (pipe *Pipeline) Run1() *Pipeline

func (*Pipeline) Start

func (pipe *Pipeline) Start(s Processor) *Pipeline

func (*Pipeline) Start1

func (pipe *Pipeline) Start1() *Pipeline

func (*Pipeline) Stop

func (pipe *Pipeline) Stop() *Pipeline

type PipelineConfig

type PipelineConfig struct {
	ID   string `gorm:"not null;unique;primary_key" json:"id,omitempty" index:"id"`
	Name string `json:"name,omitempty" config:"name"`

	//TODO remove
	StartProcessor *ProcessorConfig   `json:"start,omitempty" config:"start"`
	Processors     []*ProcessorConfig `json:"process,omitempty" config:"process"`
	EndProcessor   *ProcessorConfig   `json:"end,omitempty" config:"end"`
	ErrorProcessor *ProcessorConfig   `json:"error,omitempty" config:"error"`

	Input   *ProcessorConfig   `json:"input,omitempty" config:"input"`
	Filters []*ProcessorConfig `json:"filters,omitempty" config:"filters"`
	Output  *ProcessorConfig   `json:"output,omitempty" config:"output"`

	Created time.Time `json:"created,omitempty"`
	Updated time.Time `json:"updated,omitempty"`
	Tags    []string  `json:"tags,omitempty" config:"tags"`
}

PipelineConfig is the config for each pipeline; a pipeline may have more than one processor.

func GetStaticPipelineConfig

func GetStaticPipelineConfig(pipelineID string) PipelineConfig

type Processor

type Processor interface {
	Joint
	Process(s *Context) error
}

func GetJointInstance

func GetJointInstance(cfg *ProcessorConfig) Processor

type ProcessorConfig

type ProcessorConfig struct {
	Name       string                 `json:"joint" config:"joint"`                     //the joint name
	Parameters map[string]interface{} `json:"parameters,omitempty" config:"parameters"` //kv parameters for this joint
	Enabled    bool                   `json:"enabled" config:"enabled"`
}

ProcessorConfig is the config for each joint.

type RunningState

type RunningState string
const FINISHED RunningState = "FINISHED"
const PAUSED RunningState = "PAUSED"
const STARTED RunningState = "STARTED"
const STOPPED RunningState = "STOPPED"

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL