pipeline

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2024 License: AGPL-3.0 Imports: 21 Imported by: 61

Documentation

Index

Constants

View Source
const (
	// DefaultAntsPoolSize is the default capacity for a default goroutine pool.
	DefaultAntsPoolSize = math.MaxInt32

	// DefaultCleanIntervalTime is the interval time to clean up goroutines.
	DefaultCleanIntervalTime = time.Second
)
View Source
const (
	// OPENED represents that the pool is opened.
	OPENED = iota

	// CLOSED represents that the pool is closed.
	CLOSED
)

Variables

View Source
var (

	// ErrLackPoolFunc will be returned when invokers don't provide function for pool.
	ErrLackPoolFunc = errors.New("must provide function for pool")

	// ErrInvalidPoolExpiry will be returned when setting a negative number as the periodic duration to purge goroutines.
	ErrInvalidPoolExpiry = errors.New("invalid expiry for pool")

	// ErrPoolClosed will be returned when submitting task to a closed pool.
	ErrPoolClosed = errors.New("this pool has been closed")

	// ErrPoolOverload will be returned when the pool is full and no workers available.
	ErrPoolOverload = errors.New("too many goroutines blocked on submit or Nonblocking is set")

	// ErrInvalidPreAllocSize will be returned when trying to set up a negative capacity under PreAlloc mode.
	ErrInvalidPreAllocSize = errors.New("can not set up a negative capacity under PreAlloc mode")

	// ErrTimeout will be returned after the operations timed out.
	ErrTimeout = errors.New("operation timed out")
)

Functions

func AllowedFields

func AllowedFields(fields ...string) func(*config.Config) error

AllowedFields checks that only allowed fields are used in the configuration.

func Cap

func Cap() int

Cap returns the capacity of this default pool.

func Close

func Close(p Processor) error

func ExtractFilterMetadata

func ExtractFilterMetadata(filter interface{}) map[string]FilterProperty

func Free

func Free() int

Free returns the available goroutines to work.

func GetFilterMetadata

func GetFilterMetadata() util.MapStr

func GetPoolStats

func GetPoolStats() interface{}

func MutuallyExclusiveRequiredFields

func MutuallyExclusiveRequiredFields(fields ...string) func(*config.Config) error

MutuallyExclusiveRequiredFields checks that only one of the given fields is used at the same time. It is an error for none of the fields to be present.

func NewConditionList

func NewConditionList(config []conditions.Config) ([]conditions.Condition, error)

NewConditionList takes a slice of Config objects and turns them into real Condition objects.

func NewFilterConditionList

func NewFilterConditionList(config []conditions.Config) ([]conditions.Condition, error)

NewFilterConditionList takes a slice of Config objects and turns them into real Condition objects.

func Reboot

func Reboot()

Reboot reboots the default pool.

func RegisterFilterConfigMetadata

func RegisterFilterConfigMetadata(name string, filter interface{})

func RegisterFilterPlugin

func RegisterFilterPlugin(name string, constructor FilterConstructor)

func RegisterFilterPluginWithConfigMetadata

func RegisterFilterPluginWithConfigMetadata(name string, constructor FilterConstructor, filter interface{})

func RegisterProcessorPlugin

func RegisterProcessorPlugin(name string, constructor ProcessorConstructor)

func Release

func Release()

Release closes the default pool.

func ReleaseContext

func ReleaseContext(ctx *Context)

ReleaseContext may be called concurrently. It does not handle the context lifecycle; it only recycles the resources and marks the context as released, so that the pipeline loop quits automatically.

func RequireFields

func RequireFields(fields ...string) func(*config.Config) error

RequireFields checks that the required fields are present in the configuration.

func Running

func Running() int

Running returns the number of the currently running goroutines.

func Submit

func Submit(task *Task) error

Submit submits a task to pool.

Types

type Closer

type Closer interface {
	Close() error
}

type Constructor

type Constructor func(config *config.Config) (ProcessorBase, error)

type Context

type Context struct {
	ParentContext *Context `json:"-"`

	context.Context  `json:"-"`
	param.Parameters `json:"parameters,omitempty"`

	Config PipelineConfigV2 `json:"-"`
	// contains filtered or unexported fields
}

func AcquireContext

func AcquireContext(config PipelineConfigV2) *Context

func (*Context) AddFlowProcess

func (ctx *Context) AddFlowProcess(str string)

func (*Context) CancelTask

func (ctx *Context) CancelTask()

func (*Context) Errors

func (ctx *Context) Errors() []error

func (*Context) Exit

func (ctx *Context) Exit()

Exit tells pipeline to exit

func (*Context) Failed

func (ctx *Context) Failed(err error)

func (*Context) Finished

func (ctx *Context) Finished()

func (*Context) GetCreateTime

func (ctx *Context) GetCreateTime() time.Time

func (*Context) GetEndTime

func (ctx *Context) GetEndTime() *time.Time

func (*Context) GetFlowProcess

func (ctx *Context) GetFlowProcess() []string

func (*Context) GetRequestProcess

func (ctx *Context) GetRequestProcess() []string

func (*Context) GetRunningState

func (ctx *Context) GetRunningState() RunningState

func (*Context) GetStartTime

func (ctx *Context) GetStartTime() *time.Time

func (*Context) HasError

func (ctx *Context) HasError() bool

func (*Context) ID

func (ctx *Context) ID() string

func (*Context) IsCanceled

func (ctx *Context) IsCanceled() bool

func (*Context) IsExit

func (ctx *Context) IsExit() bool

IsExit means the pipeline has been manually stopped and will not run again until Restart is called.

func (*Context) IsFailed

func (ctx *Context) IsFailed() bool

func (*Context) IsLoopReleased

func (ctx *Context) IsLoopReleased() bool

func (*Context) IsPause

func (ctx *Context) IsPause() bool

func (*Context) IsReleased

func (ctx *Context) IsReleased() bool

func (*Context) Pause

func (ctx *Context) Pause()

Pause will pause the pipeline running loop until Resume called

func (*Context) RecordError

func (ctx *Context) RecordError(err error)

func (*Context) ResetContext

func (ctx *Context) ResetContext()

ResetContext only clears the context information; it doesn't modify state values.

func (*Context) Restart

func (ctx *Context) Restart()

Restart marks the pipeline as ready to run

func (*Context) Resume

func (ctx *Context) Resume()

Resume recovers pipeline from Pause

func (*Context) SetLoopReleased

func (ctx *Context) SetLoopReleased()

func (*Context) ShouldContinue

func (ctx *Context) ShouldContinue() bool

ShouldContinue reports whether filters should continue to process.

func (*Context) Started

func (ctx *Context) Started()

func (*Context) Starting

func (ctx *Context) Starting()

func (*Context) Stopped

func (ctx *Context) Stopped()

func (*Context) Stopping

func (ctx *Context) Stopping()

Stopping marks only STARTING/STARTED pipelines as STOPPING. Other states occur when the pipeline has already finished, so there is no need to mark it again.

type DAGConfig

type DAGConfig struct {
	Enabled                 bool              `config:"enabled"`
	Mode                    string            `config:"mode"`
	ParallelProcessors      []*config2.Config `config:"parallel"`
	FirstFinishedProcessors []*config2.Config `config:"first"`
	AfterJoinAllProcessors  []*config2.Config `config:"join"`
	AfterAnyProcessors      []*config2.Config `config:"end"`
}

type DAGProcessor

type DAGProcessor struct {
	// contains filtered or unexported fields
}

func (DAGProcessor) Name

func (this DAGProcessor) Name() string

func (DAGProcessor) Process

func (this DAGProcessor) Process(c *Context) error

type Dag

type Dag struct {
	// contains filtered or unexported fields
}

Dag represents directed acyclic graph

func NewDAG

func NewDAG(mode string) *Dag

NewDAG creates a new DAG.

func (*Dag) Any

func (dag *Dag) Any(tasks ...func()) *anyResult

func (*Dag) Parse

func (dag *Dag) Parse(dsl string) *Dag

func (*Dag) Pipeline

func (dag *Dag) Pipeline(tasks ...Processor) *pipelineResult

Pipeline executes tasks sequentially

func (*Dag) Run

func (dag *Dag) Run(ctx *Context)

Run starts the tasks. It will block until all functions are done.

func (*Dag) RunAsync

func (dag *Dag) RunAsync(ctx *Context, onComplete func())

RunAsync executes Run on another goroutine

func (*Dag) Spawns

func (dag *Dag) Spawns(tasks ...Processor) *spawnsResult

type Filter

type Filter interface {
	Name() string
	Filter(ctx *fasthttp.RequestCtx)
}

func NewFilterConditionRule

func NewFilterConditionRule(
	config conditions.Config,
	p Filter,
) (Filter, error)

NewFilterConditionRule returns a processor that will execute the provided processor if the condition is true.

type FilterConstructor

type FilterConstructor func(config *config.Config) (Filter, error)

func FilterConfigChecked

func FilterConfigChecked(
	constr FilterConstructor,
	checks ...func(*config.Config) error,
) FilterConstructor

func NewFilterConditional

func NewFilterConditional(
	ruleFactory FilterConstructor,
) FilterConstructor

NewFilterConditional returns a constructor suitable for registering when conditionals as a plugin.

type FilterProperty

type FilterProperty struct {
	Type         string      `config:"type" json:"type,omitempty"`
	SubType      string      `config:"sub_type" json:"sub_type,omitempty"`
	DefaultValue interface{} `config:"default_value" json:"default_value,omitempty"`
}

type Filters

type Filters struct {
	List []Filter `json:"list,omitempty"`
}

func NewFilter

func NewFilter(cfg []*config.Config) (*Filters, error)

func NewFilterList

func NewFilterList() *Filters

func (*Filters) AddFilter

func (procs *Filters) AddFilter(p Filter)

func (*Filters) AddFilters

func (procs *Filters) AddFilters(p Filters)

func (*Filters) All

func (procs *Filters) All() []Filter

func (*Filters) Close

func (procs *Filters) Close() error

func (*Filters) Filter

func (procs *Filters) Filter(ctx *fasthttp.RequestCtx)

func (*Filters) Name

func (procs *Filters) Name() string

func (*Filters) String

func (procs *Filters) String() string

type IfThenElseFilter

type IfThenElseFilter struct {
	// contains filtered or unexported fields
}

IfThenElseFilter executes one set of processors (then) if the condition is true and another set of processors (else) if the condition is false.

func NewIfElseThenFilter

func NewIfElseThenFilter(cfg *config.Config) (*IfThenElseFilter, error)

NewIfElseThenFilter constructs a new IfThenElseFilter.

func (IfThenElseFilter) Filter

func (p IfThenElseFilter) Filter(ctx *fasthttp.RequestCtx)

Filter checks the if condition and executes the processors attached to the then statement or the else statement based on the condition.

func (IfThenElseFilter) Name

func (p IfThenElseFilter) Name() string

func (*IfThenElseFilter) String

func (p *IfThenElseFilter) String() string

type IfThenElseProcessor

type IfThenElseProcessor struct {
	// contains filtered or unexported fields
}

IfThenElseProcessor executes one set of processors (then) if the condition is true and another set of processors (else) if the condition is false.

func NewIfElseThenProcessor

func NewIfElseThenProcessor(cfg *config.Config) (*IfThenElseProcessor, error)

NewIfElseThenProcessor constructs a new IfThenElseProcessor.

func (IfThenElseProcessor) Name

func (p IfThenElseProcessor) Name() string

func (IfThenElseProcessor) Process

func (p IfThenElseProcessor) Process(ctx *Context) error

Process checks the if condition and executes the processors attached to the then statement or the else statement based on the condition.

func (*IfThenElseProcessor) String

func (p *IfThenElseProcessor) String() string

type Job

type Job struct {
	// contains filtered or unexported fields
}

Job - Each job consists of one or more tasks. Each Job can run its tasks in order (sequential) or unordered.

type Logger

type Logger interface {
	// Printf must have the same semantics as log.Printf.
	Printf(format string, args ...interface{})
}

Logger is used for logging formatted messages.

type Namespace

type Namespace struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewNamespace

func NewNamespace() *Namespace

func (*Namespace) FilterPlugin

func (ns *Namespace) FilterPlugin() FilterConstructor

func (*Namespace) ProcessorConstructors

func (ns *Namespace) ProcessorConstructors() map[string]ProcessorConstructor

func (*Namespace) ProcessorPlugin

func (ns *Namespace) ProcessorPlugin() ProcessorConstructor

func (*Namespace) RegisterFilter

func (ns *Namespace) RegisterFilter(name string, factory FilterConstructor) error

func (*Namespace) RegisterProcessor

func (ns *Namespace) RegisterProcessor(name string, factory ProcessorConstructor) error

type Option

type Option func(opts *Options)

Option represents the optional function.

func WithExpiryDuration

func WithExpiryDuration(expiryDuration time.Duration) Option

WithExpiryDuration sets up the interval time of cleaning up goroutines.

func WithLogger

func WithLogger(logger Logger) Option

WithLogger sets up a customized logger.

func WithMaxBlockingTasks

func WithMaxBlockingTasks(maxBlockingTasks int) Option

WithMaxBlockingTasks sets up the maximum number of goroutines that are blocked when it reaches the capacity of pool.

func WithNonblocking

func WithNonblocking(nonblocking bool) Option

WithNonblocking indicates whether the pool should never block on Submit; when it is set and there are no available workers, ErrPoolOverload is returned instead of blocking.

func WithOptions

func WithOptions(options Options) Option

WithOptions accepts the whole options config.

func WithPanicHandler

func WithPanicHandler(panicHandler func(interface{})) Option

WithPanicHandler sets up panic handler.

func WithPreAlloc

func WithPreAlloc(preAlloc bool) Option

WithPreAlloc indicates whether it should malloc for workers.

type Options

type Options struct {
	// ExpiryDuration is a period for the scavenger goroutine to clean up those expired workers,
	// the scavenger scans all workers every `ExpiryDuration` and clean up those workers that haven't been
	// used for more than `ExpiryDuration`.
	ExpiryDuration time.Duration

	// PreAlloc indicates whether to make memory pre-allocation when initializing Pool.
	PreAlloc bool

	// Max number of goroutine blocking on pool.Submit.
	// 0 (default value) means no such limit.
	MaxBlockingTasks int

	// When Nonblocking is true, Pool.Submit will never be blocked.
	// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
	// When Nonblocking is true, MaxBlockingTasks is inoperative.
	Nonblocking bool

	// PanicHandler is used to handle panics from each worker goroutine.
	// if nil, panics will be thrown out again from worker goroutines.
	PanicHandler func(interface{})

	// Logger is the customized logger for logging info, if it is not set,
	// default standard logger from log package is used.
	Logger Logger
}

Options contains all options which will be applied when instantiating an ants pool.

type PipelineConfigV2

type PipelineConfigV2 struct {
	Name           string `config:"name" json:"name,omitempty"`
	Enabled        *bool  `config:"enabled" json:"enabled,omitempty"`
	Singleton      bool   `config:"singleton" json:"singleton"`
	AutoStart      bool   `config:"auto_start" json:"auto_start"`
	KeepRunning    bool   `config:"keep_running" json:"keep_running"`
	RetryDelayInMs int    `config:"retry_delay_in_ms" json:"retry_delay_in_ms"`
	MaxRunningInMs int    `config:"max_running_in_ms" json:"max_running_in_ms"`
	Logging        struct {
		Enabled bool `config:"enabled" json:"enabled"`
	} `config:"logging" json:"logging"`
	Processors []*config.Config       `config:"processor" json:"-"`
	Labels     map[string]interface{} `config:"labels" json:"labels"`

	Transient bool `config:"-" json:"transient"`
}

func (PipelineConfigV2) Equals

func (this PipelineConfigV2) Equals(target PipelineConfigV2) bool

func (PipelineConfigV2) ProcessorsEquals

func (this PipelineConfigV2) ProcessorsEquals(target PipelineConfigV2) bool

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool accepts the tasks from client, it limits the total of goroutines to a given number by recycling goroutines.

func NewPool

func NewPool(size int, options ...Option) (*Pool, error)

NewPool generates an instance of ants pool.

func NewPoolWithTag

func NewPoolWithTag(tag string, size int, options ...Option) (*Pool, error)

func (*Pool) Cap

func (p *Pool) Cap() int

Cap returns the capacity of this pool.

func (*Pool) Free

func (p *Pool) Free() int

Free returns the amount of available goroutines to work, -1 indicates this pool is unlimited.

func (*Pool) IsClosed

func (p *Pool) IsClosed() bool

IsClosed indicates whether the pool is closed.

func (*Pool) Reboot

func (p *Pool) Reboot()

Reboot reboots a closed pool.

func (*Pool) Release

func (p *Pool) Release()

Release closes this pool and releases the worker queue.

func (*Pool) ReleaseTimeout

func (p *Pool) ReleaseTimeout(timeout time.Duration) error

ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.

func (*Pool) Running

func (p *Pool) Running() int

Running returns the amount of the currently running goroutines.

func (*Pool) Submit

func (p *Pool) Submit(task *Task) error

func (*Pool) SubmitSimpleTask

func (p *Pool) SubmitSimpleTask(f func()) error

SubmitSimpleTask submits a plain function as a task to this pool.

Note that you are allowed to call Pool.Submit() from the current Pool.Submit(), but what calls for special attention is that you will get blocked with the latest Pool.Submit() call once the current Pool runs out of its capacity, and to avoid this, you should instantiate a Pool with ants.WithNonblocking(true).

func (*Pool) SubmitWithTag

func (p *Pool) SubmitWithTag(task *Task) error

func (*Pool) Tune

func (p *Pool) Tune(size int)

Tune changes the capacity of this pool; note that it has no effect on an infinite or pre-allocation pool.

type PoolWithFunc

type PoolWithFunc struct {
	// contains filtered or unexported fields
}

PoolWithFunc accepts the tasks from client, it limits the total of goroutines to a given number by recycling goroutines.

func NewPoolWithFunc

func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error)

NewPoolWithFunc generates an instance of ants pool with a specific function.

func (*PoolWithFunc) Cap

func (p *PoolWithFunc) Cap() int

Cap returns the capacity of this pool.

func (*PoolWithFunc) Free

func (p *PoolWithFunc) Free() int

Free returns the amount of available goroutines to work, -1 indicates this pool is unlimited.

func (*PoolWithFunc) Invoke

func (p *PoolWithFunc) Invoke(args interface{}) error

Invoke submits a task to pool.

Note that you are allowed to call Pool.Invoke() from the current Pool.Invoke(), but what calls for special attention is that you will get blocked with the latest Pool.Invoke() call once the current Pool runs out of its capacity, and to avoid this, you should instantiate a PoolWithFunc with ants.WithNonblocking(true).

func (*PoolWithFunc) IsClosed

func (p *PoolWithFunc) IsClosed() bool

IsClosed indicates whether the pool is closed.

func (*PoolWithFunc) Reboot

func (p *PoolWithFunc) Reboot()

Reboot reboots a closed pool.

func (*PoolWithFunc) Release

func (p *PoolWithFunc) Release()

Release closes this pool and releases the worker queue.

func (*PoolWithFunc) ReleaseTimeout

func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error

ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.

func (*PoolWithFunc) Running

func (p *PoolWithFunc) Running() int

Running returns the amount of the currently running goroutines.

func (*PoolWithFunc) Tune

func (p *PoolWithFunc) Tune(size int)

Tune changes the capacity of this pool; note that it has no effect on an infinite or pre-allocation pool.

type Processor

type Processor interface {
	ProcessorBase
	Process(s *Context) error
}

func NewConditionRule

func NewConditionRule(
	config conditions.Config,
	p Processor,
) (Processor, error)

NewConditionRule returns a processor that will execute the provided processor if the condition is true.

func NewDAGProcessor

func NewDAGProcessor(c *config2.Config) (Processor, error)

type ProcessorBase

type ProcessorBase interface {
	Name() string
}

type ProcessorConstructor

type ProcessorConstructor func(config *config.Config) (Processor, error)

func NewConditional

func NewConditional(
	ruleFactory ProcessorConstructor,
) ProcessorConstructor

NewConditional returns a constructor suitable for registering when conditionals as a plugin.

func ProcessorConfigChecked

func ProcessorConfigChecked(
	constr ProcessorConstructor,
	checks ...func(*config.Config) error,
) ProcessorConstructor

type Processors

type Processors struct {
	SkipCatchError bool // skip catch internal error
	List           []Processor
}

func NewPipeline

func NewPipeline(cfg []*config.Config) (*Processors, error)

func NewPipelineList

func NewPipelineList() *Processors

func (*Processors) AddProcessor

func (procs *Processors) AddProcessor(p Processor)

func (*Processors) AddProcessors

func (procs *Processors) AddProcessors(p Processors)

func (*Processors) All

func (procs *Processors) All() []Processor

func (*Processors) Close

func (procs *Processors) Close() error

func (*Processors) Name

func (procs *Processors) Name() string

func (*Processors) Process

func (procs *Processors) Process(ctx *Context) error

func (*Processors) Release

func (procs *Processors) Release()

func (*Processors) String

func (procs *Processors) String() string

type Releaser

type Releaser interface {
	Release() error
}

type RunningState

type RunningState string
const FAILED RunningState = "FAILED"
const FINISHED RunningState = "FINISHED"
const STARTED RunningState = "STARTED"
const STARTING RunningState = "STARTING"
const STOPPED RunningState = "STOPPED"
const STOPPING RunningState = "STOPPING"

func (RunningState) IsEnded

func (s RunningState) IsEnded() bool

type StateItem

type StateItem struct {
	Steps             int64
	State             RunningState
	ContextParameters util.MapStr
	ExitErr           error
	ProcessErrs       []error
}

type Task

type Task struct {
	Handler func(ctx *Context, v ...interface{})
	Params  []interface{}
	Context *Context
}

type WhenFilter

type WhenFilter struct {
	// contains filtered or unexported fields
}

WhenFilter is a tuple of condition plus a Processor.

func (WhenFilter) Filter

func (r WhenFilter) Filter(ctx *fasthttp.RequestCtx)

Filter executes this WhenFilter.

func (WhenFilter) Name

func (r WhenFilter) Name() string

func (*WhenFilter) String

func (r *WhenFilter) String() string

type WhenProcessor

type WhenProcessor struct {
	// contains filtered or unexported fields
}

WhenProcessor is a tuple of condition plus a Processor.

func (WhenProcessor) Name

func (r WhenProcessor) Name() string

func (WhenProcessor) Process

func (r WhenProcessor) Process(ctx *Context) error

Process executes this WhenProcessor.

func (*WhenProcessor) String

func (r *WhenProcessor) String() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL