Documentation ¶
Index ¶
- Constants
- Variables
- func AllowedFields(fields ...string) func(*config.Config) error
- func Cap() int
- func Close(p Processor) error
- func ExtractFilterMetadata(filter interface{}) map[string]FilterProperty
- func Free() int
- func GetFilterMetadata() util.MapStr
- func GetPoolStats() interface{}
- func MutuallyExclusiveRequiredFields(fields ...string) func(*config.Config) error
- func NewConditionList(config []conditions.Config) ([]conditions.Condition, error)
- func NewFilterConditionList(config []conditions.Config) ([]conditions.Condition, error)
- func Reboot()
- func RegisterFilterConfigMetadata(name string, filter interface{})
- func RegisterFilterPlugin(name string, constructor FilterConstructor)
- func RegisterFilterPluginWithConfigMetadata(name string, constructor FilterConstructor, filter interface{})
- func RegisterProcessorPlugin(name string, constructor ProcessorConstructor)
- func Release()
- func ReleaseContext(ctx *Context)
- func RequireFields(fields ...string) func(*config.Config) error
- func Running() int
- func Submit(task *Task) error
- type Closer
- type Constructor
- type Context
- func (ctx *Context) AddFlowProcess(str string)
- func (ctx *Context) CancelTask()
- func (ctx *Context) Errors() []error
- func (ctx *Context) Exit()
- func (ctx *Context) Failed(err error)
- func (ctx *Context) Finished()
- func (ctx *Context) GetCreateTime() time.Time
- func (ctx *Context) GetEndTime() *time.Time
- func (ctx *Context) GetFlowProcess() []string
- func (ctx *Context) GetRequestProcess() []string
- func (ctx *Context) GetRunningState() RunningState
- func (ctx *Context) GetStartTime() *time.Time
- func (ctx *Context) HasError() bool
- func (ctx *Context) ID() string
- func (ctx *Context) IsCanceled() bool
- func (ctx *Context) IsExit() bool
- func (ctx *Context) IsFailed() bool
- func (ctx *Context) IsLoopReleased() bool
- func (ctx *Context) IsPause() bool
- func (ctx *Context) IsReleased() bool
- func (ctx *Context) Pause()
- func (ctx *Context) RecordError(err error)
- func (ctx *Context) ResetContext()
- func (ctx *Context) Restart()
- func (ctx *Context) Resume()
- func (ctx *Context) SetLoopReleased()
- func (ctx *Context) ShouldContinue() bool
- func (ctx *Context) Started()
- func (ctx *Context) Starting()
- func (ctx *Context) Stopped()
- func (ctx *Context) Stopping()
- type DAGConfig
- type DAGProcessor
- type Dag
- type Filter
- type FilterConstructor
- type FilterProperty
- type Filters
- type IfThenElseFilter
- type IfThenElseProcessor
- type Job
- type Logger
- type Namespace
- func (ns *Namespace) FilterPlugin() FilterConstructor
- func (ns *Namespace) ProcessorConstructors() map[string]ProcessorConstructor
- func (ns *Namespace) ProcessorPlugin() ProcessorConstructor
- func (ns *Namespace) RegisterFilter(name string, factory FilterConstructor) error
- func (ns *Namespace) RegisterProcessor(name string, factory ProcessorConstructor) error
- type Option
- func WithExpiryDuration(expiryDuration time.Duration) Option
- func WithLogger(logger Logger) Option
- func WithMaxBlockingTasks(maxBlockingTasks int) Option
- func WithNonblocking(nonblocking bool) Option
- func WithOptions(options Options) Option
- func WithPanicHandler(panicHandler func(interface{})) Option
- func WithPreAlloc(preAlloc bool) Option
- type Options
- type PipelineConfigV2
- type Pool
- func (p *Pool) Cap() int
- func (p *Pool) Free() int
- func (p *Pool) IsClosed() bool
- func (p *Pool) Reboot()
- func (p *Pool) Release()
- func (p *Pool) ReleaseTimeout(timeout time.Duration) error
- func (p *Pool) Running() int
- func (p *Pool) Submit(task *Task) error
- func (p *Pool) SubmitSimpleTask(f func()) error
- func (p *Pool) SubmitWithTag(task *Task) error
- func (p *Pool) Tune(size int)
- type PoolWithFunc
- func (p *PoolWithFunc) Cap() int
- func (p *PoolWithFunc) Free() int
- func (p *PoolWithFunc) Invoke(args interface{}) error
- func (p *PoolWithFunc) IsClosed() bool
- func (p *PoolWithFunc) Reboot()
- func (p *PoolWithFunc) Release()
- func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error
- func (p *PoolWithFunc) Running() int
- func (p *PoolWithFunc) Tune(size int)
- type Processor
- type ProcessorBase
- type ProcessorConstructor
- type Processors
- func (procs *Processors) AddProcessor(p Processor)
- func (procs *Processors) AddProcessors(p Processors)
- func (procs *Processors) All() []Processor
- func (procs *Processors) Close() error
- func (procs *Processors) Name() string
- func (procs *Processors) Process(ctx *Context) error
- func (procs *Processors) Release()
- func (procs *Processors) String() string
- type Releaser
- type RunningState
- type StateItem
- type Task
- type WhenFilter
- type WhenProcessor
Constants ¶
const ( // DefaultAntsPoolSize is the default capacity for a default goroutine pool. DefaultAntsPoolSize = math.MaxInt32 // DefaultCleanIntervalTime is the interval time to clean up goroutines. DefaultCleanIntervalTime = time.Second )
const ( // OPENED represents that the pool is opened. OPENED = iota // CLOSED represents that the pool is closed. CLOSED )
Variables ¶
var ( // ErrLackPoolFunc will be returned when invokers don't provide function for pool. ErrLackPoolFunc = errors.New("must provide function for pool") // ErrInvalidPoolExpiry will be returned when setting a negative number as the periodic duration to purge goroutines. ErrInvalidPoolExpiry = errors.New("invalid expiry for pool") // ErrPoolClosed will be returned when submitting task to a closed pool. ErrPoolClosed = errors.New("this pool has been closed") // ErrPoolOverload will be returned when the pool is full and no workers available. ErrPoolOverload = errors.New("too many goroutines blocked on submit or Nonblocking is set") // ErrInvalidPreAllocSize will be returned when trying to set up a negative capacity under PreAlloc mode. ErrInvalidPreAllocSize = errors.New("can not set up a negative capacity under PreAlloc mode") // ErrTimeout will be returned after the operations timed out. ErrTimeout = errors.New("operation timed out") )
Functions ¶
func AllowedFields ¶
AllowedFields checks that only allowed fields are used in the configuration.
func ExtractFilterMetadata ¶
func ExtractFilterMetadata(filter interface{}) map[string]FilterProperty
func GetFilterMetadata ¶
func GetPoolStats ¶
func GetPoolStats() interface{}
func MutuallyExclusiveRequiredFields ¶
MutuallyExclusiveRequiredFields checks that only one of the given fields is used at the same time. It is an error for none of the fields to be present.
func NewConditionList ¶
func NewConditionList(config []conditions.Config) ([]conditions.Condition, error)
NewConditionList takes a slice of Config objects and turns them into real Condition objects.
func NewFilterConditionList ¶
func NewFilterConditionList(config []conditions.Config) ([]conditions.Condition, error)
NewFilterConditionList takes a slice of Config objects and turns them into real Condition objects.
func RegisterFilterConfigMetadata ¶
func RegisterFilterConfigMetadata(name string, filter interface{})
func RegisterFilterPlugin ¶
func RegisterFilterPlugin(name string, constructor FilterConstructor)
func RegisterFilterPluginWithConfigMetadata ¶
func RegisterFilterPluginWithConfigMetadata(name string, constructor FilterConstructor, filter interface{})
func RegisterProcessorPlugin ¶
func RegisterProcessorPlugin(name string, constructor ProcessorConstructor)
func ReleaseContext ¶
func ReleaseContext(ctx *Context)
ReleaseContext may be called concurrently. It does not handle the context lifecycle; it only recycles the resources and marks the context as released, so the pipeline loop quits automatically.
func RequireFields ¶
RequireFields checks that the required fields are present in the configuration.
Types ¶
type Constructor ¶
type Constructor func(config *config.Config) (ProcessorBase, error)
type Context ¶
type Context struct { ParentContext *Context `json:"-"` context.Context `json:"-"` param.Parameters `json:"parameters,omitempty"` Config PipelineConfigV2 `json:"-"` // contains filtered or unexported fields }
func AcquireContext ¶
func AcquireContext(config PipelineConfigV2) *Context
func (*Context) AddFlowProcess ¶
func (*Context) CancelTask ¶
func (ctx *Context) CancelTask()
func (*Context) GetCreateTime ¶
func (*Context) GetEndTime ¶
func (*Context) GetFlowProcess ¶
func (*Context) GetRequestProcess ¶
func (*Context) GetRunningState ¶
func (ctx *Context) GetRunningState() RunningState
func (*Context) GetStartTime ¶
func (*Context) IsCanceled ¶
func (*Context) IsExit ¶
IsExit reports whether the pipeline has been manually stopped; a stopped pipeline will not run again until Restart is called.
func (*Context) IsLoopReleased ¶
func (*Context) IsReleased ¶
func (*Context) Pause ¶
func (ctx *Context) Pause()
Pause will pause the pipeline running loop until Resume called
func (*Context) RecordError ¶
func (*Context) ResetContext ¶
func (ctx *Context) ResetContext()
ResetContext only clears the context information; it does not modify state values.
func (*Context) SetLoopReleased ¶
func (ctx *Context) SetLoopReleased()
func (*Context) ShouldContinue ¶
ShouldContinue reports whether filters should continue to process.
type DAGConfig ¶
type DAGConfig struct { Enabled bool `config:"enabled"` Mode string `config:"mode"` ParallelProcessors []*config2.Config `config:"parallel"` FirstFinishedProcessors []*config2.Config `config:"first"` AfterJoinAllProcessors []*config2.Config `config:"join"` AfterAnyProcessors []*config2.Config `config:"end"` }
type DAGProcessor ¶
type DAGProcessor struct {
// contains filtered or unexported fields
}
func (DAGProcessor) Name ¶
func (this DAGProcessor) Name() string
func (DAGProcessor) Process ¶
func (this DAGProcessor) Process(c *Context) error
type Dag ¶
type Dag struct {
// contains filtered or unexported fields
}
Dag represents directed acyclic graph
type Filter ¶
type Filter interface { Name() string Filter(ctx *fasthttp.RequestCtx) }
func NewFilterConditionRule ¶
func NewFilterConditionRule( config conditions.Config, p Filter, ) (Filter, error)
NewFilterConditionRule returns a filter that will execute the provided filter if the condition is true.
type FilterConstructor ¶
func FilterConfigChecked ¶
func FilterConfigChecked( constr FilterConstructor, checks ...func(*config.Config) error, ) FilterConstructor
func NewFilterConditional ¶
func NewFilterConditional( ruleFactory FilterConstructor, ) FilterConstructor
NewFilterConditional returns a constructor suitable for registering when conditionals as a plugin.
type FilterProperty ¶
type Filters ¶
type Filters struct {
List []Filter `json:"list,omitempty"`
}
func NewFilterList ¶
func NewFilterList() *Filters
func (*Filters) AddFilters ¶
func (*Filters) Filter ¶
func (procs *Filters) Filter(ctx *fasthttp.RequestCtx)
type IfThenElseFilter ¶
type IfThenElseFilter struct {
// contains filtered or unexported fields
}
IfThenElseFilter executes one set of processors (then) if the condition is true and another set of processors (else) if the condition is false.
func NewIfElseThenFilter ¶
func NewIfElseThenFilter(cfg *config.Config) (*IfThenElseFilter, error)
NewIfElseThenFilter constructs a new IfThenElseFilter.
func (IfThenElseFilter) Filter ¶
func (p IfThenElseFilter) Filter(ctx *fasthttp.RequestCtx)
Filter checks the if condition and executes the processors attached to the then statement or the else statement based on the condition.
func (IfThenElseFilter) Name ¶
func (p IfThenElseFilter) Name() string
func (*IfThenElseFilter) String ¶
func (p *IfThenElseFilter) String() string
type IfThenElseProcessor ¶
type IfThenElseProcessor struct {
// contains filtered or unexported fields
}
IfThenElseProcessor executes one set of processors (then) if the condition is true and another set of processors (else) if the condition is false.
func NewIfElseThenProcessor ¶
func NewIfElseThenProcessor(cfg *config.Config) (*IfThenElseProcessor, error)
NewIfElseThenProcessor constructs a new IfThenElseProcessor.
func (IfThenElseProcessor) Name ¶
func (p IfThenElseProcessor) Name() string
func (IfThenElseProcessor) Process ¶
func (p IfThenElseProcessor) Process(ctx *Context) error
Process checks the if condition and executes the processors attached to the then statement or the else statement based on the condition.
func (*IfThenElseProcessor) String ¶
func (p *IfThenElseProcessor) String() string
type Job ¶
type Job struct {
// contains filtered or unexported fields
}
Job consists of one or more tasks. Each Job can run its tasks in order (sequential) or unordered.
type Logger ¶
type Logger interface { // Printf must have the same semantics as log.Printf. Printf(format string, args ...interface{}) }
Logger is used for logging formatted messages.
type Namespace ¶
func NewNamespace ¶
func NewNamespace() *Namespace
func (*Namespace) FilterPlugin ¶
func (ns *Namespace) FilterPlugin() FilterConstructor
func (*Namespace) ProcessorConstructors ¶
func (ns *Namespace) ProcessorConstructors() map[string]ProcessorConstructor
func (*Namespace) ProcessorPlugin ¶
func (ns *Namespace) ProcessorPlugin() ProcessorConstructor
func (*Namespace) RegisterFilter ¶
func (ns *Namespace) RegisterFilter(name string, factory FilterConstructor) error
func (*Namespace) RegisterProcessor ¶
func (ns *Namespace) RegisterProcessor(name string, factory ProcessorConstructor) error
type Option ¶
type Option func(opts *Options)
Option represents the optional function.
func WithExpiryDuration ¶
WithExpiryDuration sets up the interval time of cleaning up goroutines.
func WithMaxBlockingTasks ¶
WithMaxBlockingTasks sets up the maximum number of goroutines that are blocked when it reaches the capacity of pool.
func WithNonblocking ¶
WithNonblocking indicates that the pool will not block on submit: when there are no available workers, ErrPoolOverload is returned immediately.
func WithOptions ¶
WithOptions accepts the whole options config.
func WithPanicHandler ¶
func WithPanicHandler(panicHandler func(interface{})) Option
WithPanicHandler sets up panic handler.
func WithPreAlloc ¶
WithPreAlloc indicates whether memory for workers should be pre-allocated when the pool is initialized.
type Options ¶
type Options struct { // ExpiryDuration is a period for the scavenger goroutine to clean up those expired workers, // the scavenger scans all workers every `ExpiryDuration` and clean up those workers that haven't been // used for more than `ExpiryDuration`. ExpiryDuration time.Duration // PreAlloc indicates whether to make memory pre-allocation when initializing Pool. PreAlloc bool // Max number of goroutine blocking on pool.Submit. // 0 (default value) means no such limit. MaxBlockingTasks int // When Nonblocking is true, Pool.Submit will never be blocked. // ErrPoolOverload will be returned when Pool.Submit cannot be done at once. // When Nonblocking is true, MaxBlockingTasks is inoperative. Nonblocking bool // PanicHandler is used to handle panics from each worker goroutine. // if nil, panics will be thrown out again from worker goroutines. PanicHandler func(interface{}) // Logger is the customized logger for logging info, if it is not set, // default standard logger from log package is used. Logger Logger }
Options contains all options which will be applied when instantiating an ants pool.
type PipelineConfigV2 ¶
type PipelineConfigV2 struct { Name string `config:"name" json:"name,omitempty"` Enabled *bool `config:"enabled" json:"enabled,omitempty"` Singleton bool `config:"singleton" json:"singleton"` AutoStart bool `config:"auto_start" json:"auto_start"` KeepRunning bool `config:"keep_running" json:"keep_running"` RetryDelayInMs int `config:"retry_delay_in_ms" json:"retry_delay_in_ms"` MaxRunningInMs int `config:"max_running_in_ms" json:"max_running_in_ms"` Logging struct { Enabled bool `config:"enabled" json:"enabled"` } `config:"logging" json:"logging"` Processors []*config.Config `config:"processor" json:"-"` Labels map[string]interface{} `config:"labels" json:"labels"` Transient bool `config:"-" json:"transient"` }
func (PipelineConfigV2) Equals ¶
func (this PipelineConfigV2) Equals(target PipelineConfigV2) bool
func (PipelineConfigV2) ProcessorsEquals ¶
func (this PipelineConfigV2) ProcessorsEquals(target PipelineConfigV2) bool
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool accepts the tasks from client, it limits the total of goroutines to a given number by recycling goroutines.
func (*Pool) Free ¶
Free returns the amount of available goroutines to work, -1 indicates this pool is unlimited.
func (*Pool) Release ¶
func (p *Pool) Release()
Release closes this pool and releases the worker queue.
func (*Pool) ReleaseTimeout ¶
ReleaseTimeout is like Release but with a timeout; it waits for all workers to exit before timing out.
func (*Pool) SubmitSimpleTask ¶
Submit submits a task to this pool.
Note that you are allowed to call Pool.Submit() from the current Pool.Submit(), but what calls for special attention is that you will get blocked with the latest Pool.Submit() call once the current Pool runs out of its capacity, and to avoid this, you should instantiate a Pool with ants.WithNonblocking(true).
func (*Pool) SubmitWithTag ¶
type PoolWithFunc ¶
type PoolWithFunc struct {
// contains filtered or unexported fields
}
PoolWithFunc accepts the tasks from client, it limits the total of goroutines to a given number by recycling goroutines.
func NewPoolWithFunc ¶
func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error)
NewPoolWithFunc generates an instance of ants pool with a specific function.
func (*PoolWithFunc) Free ¶
func (p *PoolWithFunc) Free() int
Free returns the amount of available goroutines to work, -1 indicates this pool is unlimited.
func (*PoolWithFunc) Invoke ¶
func (p *PoolWithFunc) Invoke(args interface{}) error
Invoke submits a task to pool.
Note that you are allowed to call Pool.Invoke() from the current Pool.Invoke(), but what calls for special attention is that you will get blocked with the latest Pool.Invoke() call once the current Pool runs out of its capacity, and to avoid this, you should instantiate a PoolWithFunc with ants.WithNonblocking(true).
func (*PoolWithFunc) IsClosed ¶
func (p *PoolWithFunc) IsClosed() bool
IsClosed indicates whether the pool is closed.
func (*PoolWithFunc) Release ¶
func (p *PoolWithFunc) Release()
Release closes this pool and releases the worker queue.
func (*PoolWithFunc) ReleaseTimeout ¶
func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error
ReleaseTimeout is like Release but with a timeout; it waits for all workers to exit before timing out.
func (*PoolWithFunc) Running ¶
func (p *PoolWithFunc) Running() int
Running returns the amount of the currently running goroutines.
func (*PoolWithFunc) Tune ¶
func (p *PoolWithFunc) Tune(size int)
Tune changes the capacity of this pool. Note that it has no effect on an infinite or pre-allocated pool.
type Processor ¶
type Processor interface { ProcessorBase Process(s *Context) error }
func NewConditionRule ¶
func NewConditionRule( config conditions.Config, p Processor, ) (Processor, error)
NewConditionRule returns a processor that will execute the provided processor if the condition is true.
type ProcessorBase ¶
type ProcessorBase interface {
Name() string
}
type ProcessorConstructor ¶
func NewConditional ¶
func NewConditional( ruleFactory ProcessorConstructor, ) ProcessorConstructor
NewConditional returns a constructor suitable for registering when conditionals as a plugin.
func ProcessorConfigChecked ¶
func ProcessorConfigChecked( constr ProcessorConstructor, checks ...func(*config.Config) error, ) ProcessorConstructor
type Processors ¶
func NewPipeline ¶
func NewPipeline(cfg []*config.Config) (*Processors, error)
func NewPipelineList ¶
func NewPipelineList() *Processors
func (*Processors) AddProcessor ¶
func (procs *Processors) AddProcessor(p Processor)
func (*Processors) AddProcessors ¶
func (procs *Processors) AddProcessors(p Processors)
func (*Processors) All ¶
func (procs *Processors) All() []Processor
func (*Processors) Close ¶
func (procs *Processors) Close() error
func (*Processors) Name ¶
func (procs *Processors) Name() string
func (*Processors) Process ¶
func (procs *Processors) Process(ctx *Context) error
func (*Processors) Release ¶
func (procs *Processors) Release()
func (*Processors) String ¶
func (procs *Processors) String() string
type RunningState ¶
type RunningState string
const FAILED RunningState = "FAILED"
const FINISHED RunningState = "FINISHED"
const STARTED RunningState = "STARTED"
const STARTING RunningState = "STARTING"
const STOPPED RunningState = "STOPPED"
const STOPPING RunningState = "STOPPING"
func (RunningState) IsEnded ¶
func (s RunningState) IsEnded() bool
type WhenFilter ¶
type WhenFilter struct {
// contains filtered or unexported fields
}
WhenFilter is a tuple of condition plus a Processor.
func (WhenFilter) Filter ¶
func (r WhenFilter) Filter(ctx *fasthttp.RequestCtx)
Filter executes this WhenFilter.
func (WhenFilter) Name ¶
func (r WhenFilter) Name() string
func (*WhenFilter) String ¶
func (r *WhenFilter) String() string
type WhenProcessor ¶
type WhenProcessor struct {
// contains filtered or unexported fields
}
WhenProcessor is a tuple of condition plus a Processor.
func (WhenProcessor) Name ¶
func (r WhenProcessor) Name() string
func (WhenProcessor) Process ¶
func (r WhenProcessor) Process(ctx *Context) error
Process executes this WhenProcessor.
func (*WhenProcessor) String ¶
func (r *WhenProcessor) String() string
Source Files ¶
- async-runner.go
- checks.go
- conditionals.go
- config.go
- context.go
- dag.go
- dag_processor.go
- dsl.go
- filter.go
- filter_conditionals.go
- job.go
- options.go
- pool.go
- pool_func.go
- pools.go
- processor.go
- register.go
- runner.go
- sync-runner.go
- worker.go
- worker_array.go
- worker_func.go
- worker_loop_queue.go
- worker_stack.go