Documentation ¶
Overview ¶
dataflow package is an implementation combining workflow/tasks mananging concpets most popular in data science. this project was most inspired by nextflow and SciPipe a really awesome collection of tools from scientific-workflows, bioinformatics-pipelines, workflow-engines, most importantly is implemented in go!
Index ¶
- Variables
- func DurationPtr(t time.Duration) *time.Duration
- func Exec(tasks ...Task) bool
- func ExecWithError(tasks ...Task) error
- func HeapProcess(inputChan <-chan WorkFunction, options *Settings) <-chan OrderedOutput
- type Actuator
- type BaseActuator
- type GoroutinePool
- type Job
- type JobManager
- type Operator
- type Options
- type OrderedOutput
- type Pool
- type Processor
- type Result
- type Settings
- type Task
- type TimedActuator
- type WorkFunction
- type Worker
Constants ¶
This section is empty.
Variables ¶
var ErrorTimeOut = fmt.Errorf("TimeOut")
ErrorTimeOut is the error when executes tasks timeout
var ( // ErrorUsingActuator is the error when goroutine pool has exception ErrorUsingActuator = fmt.Errorf("ErrorUsingActuator") )
Functions ¶
func DurationPtr ¶
DurationPtr helps to make a duration ptr
func Exec ¶
Exec simply runs the tasks concurrently True will be returned is all tasks complete successfully otherwise false will be returned
func ExecWithError ¶
ExecWithError simply runs the tasks concurrently nil will be returned is all tasks complete successfully otherwise custom error will be returned
func HeapProcess ¶
func HeapProcess(inputChan <-chan WorkFunction, options *Settings) <-chan OrderedOutput
Process processes work function based on input. It Accepts an WorkFunction read channel, work function and concurrent go routine pool size. It Returns an interface{} channel.
Types ¶
type Actuator ¶
type Actuator struct {
// contains filtered or unexported fields
}
Actuator is the base struct
func NewActuator ¶
NewActuator creates an Actuator instance
func (*Actuator) ExecWithContext ¶
ExecWithContext is used to run tasks concurrently Return nil when tasks are all completed successfully, or return error when some exception happen such as timeout
func (*Actuator) GetTimeout ¶
GetTimeout return the timeout set before
type BaseActuator ¶
type BaseActuator interface { Exec(tasks ...Task) error ExecWithContext(ctx context.Context, tasks ...Task) error }
BaseActuator is the actuator interface
type GoroutinePool ¶
type GoroutinePool interface { Submit(f func()) error Release() }
GoroutinePool is the base routine pool interface User can use custom goroutine pool by implementing this interface
type JobManager ¶
JobManager is a actuator which has a worker pool
func NewJobManager ¶
func NewJobManager(workerNum int, opt ...*Options) *JobManager
NewJobManager creates an JobManager instance
func (*JobManager) Exec ¶
func (c *JobManager) Exec(tasks ...Task) error
Exec is used to run tasks concurrently
func (*JobManager) ExecWithContext ¶
func (c *JobManager) ExecWithContext(ctx context.Context, tasks ...Task) error
ExecWithContext uses goroutine pool to run tasks concurrently Return nil when tasks are all completed successfully, or return error when some exception happen such as timeout
func (*JobManager) GetTimeout ¶
func (c *JobManager) GetTimeout() *time.Duration
GetTimeout return the timeout set before
func (*JobManager) WithPool ¶
func (c *JobManager) WithPool(pool GoroutinePool) *JobManager
WithPool will support for using custom goroutine pool
type Operator ¶
type Operator interface {
Operation() (interface{}, error)
}
Interface is a type that performs an operation on itself, returning any error.
type OrderedOutput ¶
type OrderedOutput struct { Value interface{} Remaining func() int }
OrderedOutput is the output channel type from Process
type Pool ¶
type Pool struct { JobQueue chan Job // contains filtered or unexported fields }
func NewPool ¶
Will make pool of gorouting workers. numWorkers - how many workers will be created for this pool queueLen - how many jobs can we accept until we block
Returned object contains JobQueue reference, which you can use to send job to pool.
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
The Processor type manages a number of concurrent Processes.
func NewProcessor ¶
Return a new Processor to operate the function f over the number of threads specified taking input from queue and placing the result in buffer. Threads is limited by GOMAXPROCS, if threads is greater GOMAXPROCS or less than 1 then threads is set to GOMAXPROCS.
type TimedActuator ¶
type TimedActuator interface { BaseActuator GetTimeout() *time.Duration // contains filtered or unexported methods }
TimedActuator is the actuator interface within timeout method