Documentation ¶
Overview ¶
Package flow provides utilities to construct a directed acyclic computational graph that is then executed and monitored with maximum parallelism.
Index ¶
- Variables
- func Causes(err error) *multierror.Error
- func Errors(err error) *multierror.Error
- func WasCanceled(err error) bool
- type ErrorCleaner
- type Flow
- type Graph
- type Opts
- type ProgressReporter
- type ProgressReporterFn
- type RecoverFn
- type Stats
- type Task
- type TaskFn
- func (t TaskFn) DoIf(condition bool) TaskFn
- func (t TaskFn) Recover(recoverFn RecoverFn) TaskFn
- func (t TaskFn) RetryUntilTimeout(interval, timeout time.Duration) TaskFn
- func (t TaskFn) SkipIf(condition bool) TaskFn
- func (t TaskFn) Timeout(timeout time.Duration) TaskFn
- func (t TaskFn) ToRecoverFn() RecoverFn
- type TaskID
- type TaskIDSlice
- type TaskIDer
- type TaskIDs
- func (t TaskIDs) Copy() TaskIDs
- func (t TaskIDs) Delete(iders ...TaskIDer) TaskIDs
- func (t TaskIDs) Has(id TaskID) bool
- func (t TaskIDs) Insert(iders ...TaskIDer) TaskIDs
- func (t TaskIDs) InsertIf(condition bool, iders ...TaskIDer) TaskIDs
- func (t TaskIDs) Len() int
- func (t TaskIDs) List() TaskIDSlice
- func (t TaskIDs) StringList() []string
- func (t TaskIDs) TaskIDs() []TaskID
- func (t TaskIDs) UnsortedList() TaskIDSlice
- func (t TaskIDs) UnsortedStringList() []string
- type TaskSpec
- type Tasks
Variables ¶
var (
	// ContextWithTimeout is context.WithTimeout. Exposed for testing.
	ContextWithTimeout = context.WithTimeout
)
Functions ¶
func Causes ¶
func Causes(err error) *multierror.Error
Causes reports the causes of all Task errors of the given Flow error.
func Errors ¶
func Errors(err error) *multierror.Error
Errors reports all wrapped Task errors of the given Flow error.
func WasCanceled ¶
WasCanceled determines whether the given flow error was caused by cancellation.
Types ¶
type ErrorCleaner ¶
ErrorCleaner is called when a task that failed during the previous reconciliation phase completes successfully.
type Flow ¶
type Flow struct {
// contains filtered or unexported fields
}
Flow is a validated executable Graph.
type Graph ¶
type Graph struct {
// contains filtered or unexported fields
}
Graph is a builder for a Flow.
func (*Graph) Add ¶
Add adds the given Task to the graph. This panics if:
- There is already a Task present with the same name
- One of the dependencies of the Task is not present
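The two panic conditions can be illustrated with a minimal stand-in builder. The `miniGraph` and `miniTask` types are hypothetical and are not the package's Graph or Task; they only mirror the documented behaviour, and the string return value of `Add` is an assumption made for the sketch.

```go
package main

import "fmt"

// miniTask is a hypothetical stand-in for a Task: a name plus dependency names.
type miniTask struct {
	Name         string
	Dependencies []string
}

// miniGraph is a hypothetical stand-in for a Graph builder.
type miniGraph struct {
	tasks map[string]miniTask
}

func newMiniGraph() *miniGraph {
	return &miniGraph{tasks: map[string]miniTask{}}
}

// Add panics on a duplicate name or an unknown dependency,
// mirroring the two conditions documented for (*Graph).Add.
func (g *miniGraph) Add(t miniTask) string {
	if _, ok := g.tasks[t.Name]; ok {
		panic(fmt.Sprintf("task with name %q already exists", t.Name))
	}
	for _, dep := range t.Dependencies {
		if _, ok := g.tasks[dep]; !ok {
			panic(fmt.Sprintf("task %q depends on unknown task %q", t.Name, dep))
		}
	}
	g.tasks[t.Name] = t
	return t.Name
}

// addPanics reports whether adding t to g panics.
func addPanics(g *miniGraph, t miniTask) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	g.Add(t)
	return false
}

func main() {
	g := newMiniGraph()
	g.Add(miniTask{Name: "create-network"})
	g.Add(miniTask{Name: "create-machines", Dependencies: []string{"create-network"}})

	// Duplicate name: prints true.
	fmt.Println(addPanics(g, miniTask{Name: "create-network"}))
	// Unknown dependency: prints true.
	fmt.Println(addPanics(g, miniTask{Name: "deploy", Dependencies: []string{"missing"}}))
}
```

Because dependencies must already be present when a task is added, a graph built this way cannot contain a cycle.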
type Opts ¶
type Opts struct {
	Logger           logrus.FieldLogger
	ProgressReporter ProgressReporter
	ErrorCleaner     func(ctx context.Context, taskID string)
	ErrorContext     *utilerrors.ErrorContext
	Context          context.Context
}
Opts are options for a Flow execution. If they are not set, they are left blank and don't affect the Flow.
type ProgressReporter ¶
type ProgressReporter interface {
	// Start starts the progress reporter.
	Start(context.Context) error
	// Stop stops the progress reporter.
	Stop()
	// Report reports the progress using the current statistics.
	Report(context.Context, *Stats)
}
ProgressReporter is used to report the current progress of a flow.
func NewDelayingProgressReporter ¶
func NewDelayingProgressReporter(reporterFn ProgressReporterFn, period time.Duration) ProgressReporter
NewDelayingProgressReporter returns a new progress reporter with the given function and the configured period. A period of `0` will lead to immediate reports as soon as flow tasks are completed.
func NewImmediateProgressReporter ¶
func NewImmediateProgressReporter(reporterFn ProgressReporterFn) ProgressReporter
NewImmediateProgressReporter returns a new progress reporter with the given function.
type ProgressReporterFn ¶
ProgressReporterFn is continuously called on progress in a flow.
type Stats ¶
type Stats struct {
	FlowName  string
	All       TaskIDs
	Succeeded TaskIDs
	Failed    TaskIDs
	Running   TaskIDs
	Pending   TaskIDs
}
Stats are the statistics of a Flow execution.
func InitialStats ¶
InitialStats creates a new Stats object with the given set of initial TaskIDs. The initial TaskIDs are added to all TaskIDs as well as to the pending ones.
func (*Stats) ProgressPercent ¶
ProgressPercent retrieves the progress of a Flow execution in percent.
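One plausible way to derive a percentage from the statistics is the share of succeeded tasks among all tasks. The sketch below uses a hypothetical `miniStats` type holding counts only. Both the formula and the treatment of an empty flow as 100% are assumptions, not confirmed by this section.

```go
package main

import "fmt"

// miniStats is a hypothetical, reduced stand-in for Stats that tracks counts only.
type miniStats struct {
	All       int
	Succeeded int
}

// progressPercent assumes progress is the integer share of succeeded tasks
// among all tasks, and that a flow without tasks counts as complete.
func progressPercent(s miniStats) int {
	if s.All == 0 {
		return 100
	}
	return (100 * s.Succeeded) / s.All
}

func main() {
	fmt.Println(progressPercent(miniStats{All: 4, Succeeded: 1})) // prints 25
	fmt.Println(progressPercent(miniStats{All: 0}))               // prints 100
}
```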
type Task ¶
Task is a unit of work. It has a name, a payload function and a set of dependencies. A Task is only started once all its dependencies have been completed successfully.
type TaskFn ¶
TaskFn is a payload function of a task.
EmptyTaskFn is a TaskFn that does nothing (returns nil).
func Parallel ¶
Parallel runs the given TaskFns in parallel, collecting their errors in a multierror.
func ParallelExitOnError ¶
ParallelExitOnError runs the given TaskFns in parallel and stops execution as soon as one TaskFn returns an error.
func Sequential ¶
Sequential runs the given TaskFns sequentially.
func (TaskFn) DoIf ¶
DoIf returns a TaskFn that will be executed if the condition is true when it is called. Otherwise, it will do nothing when called.
func (TaskFn) Recover ¶
Recover creates a new TaskFn that recovers an error with the given RecoverFn.
func (TaskFn) RetryUntilTimeout ¶
RetryUntilTimeout returns a TaskFn that is retried until the timeout is reached.
func (TaskFn) SkipIf ¶
SkipIf returns a TaskFn that does nothing if the condition is true, otherwise the function will be executed once called.
func (TaskFn) ToRecoverFn ¶
ToRecoverFn converts the TaskFn to a RecoverFn that ignores the incoming error.
type TaskIDSlice ¶
type TaskIDSlice []TaskID
TaskIDSlice is a slice of TaskIDs.
func (TaskIDSlice) Len ¶
func (t TaskIDSlice) Len() int
func (TaskIDSlice) Less ¶
func (t TaskIDSlice) Less(i1, i2 int) bool
func (TaskIDSlice) Swap ¶
func (t TaskIDSlice) Swap(i1, i2 int)
func (TaskIDSlice) TaskIDs ¶
func (t TaskIDSlice) TaskIDs() []TaskID
TaskIDs returns this as a slice of TaskIDs.
type TaskIDer ¶
type TaskIDer interface {
	// TaskIDs reports all TaskIDs of this TaskIDer.
	TaskIDs() []TaskID
}
TaskIDer can produce a slice of TaskIDs. Default implementations are TaskIDs, TaskID, and TaskIDSlice.
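How three different types can satisfy the same interface is sketched below with local re-declarations. The type names follow the documentation, but the underlying `string` type of TaskID and all method bodies are assumptions. The `collect` helper is hypothetical, similar in spirit to NewTaskIDs.

```go
package main

import "fmt"

// TaskID's underlying type is assumed to be string.
type TaskID string

// TaskIDSlice is a slice of TaskIDs.
type TaskIDSlice []TaskID

// TaskIDs is a set of TaskID.
type TaskIDs map[TaskID]struct{}

// TaskIDer can produce a slice of TaskIDs.
type TaskIDer interface {
	TaskIDs() []TaskID
}

// A single ID yields a one-element slice.
func (t TaskID) TaskIDs() []TaskID { return []TaskID{t} }

// A slice yields itself.
func (t TaskIDSlice) TaskIDs() []TaskID { return t }

// A set yields its members in unspecified order.
func (t TaskIDs) TaskIDs() []TaskID {
	out := make([]TaskID, 0, len(t))
	for id := range t {
		out = append(out, id)
	}
	return out
}

// collect gathers the IDs of all given TaskIDers into one set (hypothetical helper).
func collect(iders ...TaskIDer) TaskIDs {
	set := TaskIDs{}
	for _, ider := range iders {
		for _, id := range ider.TaskIDs() {
			set[id] = struct{}{}
		}
	}
	return set
}

func main() {
	set := collect(TaskID("a"), TaskIDSlice{"b", "c"}, TaskIDs{"c": {}, "d": {}})
	fmt.Println(len(set)) // prints 4
}
```

Accepting the interface instead of a concrete type lets callers mix single IDs, slices, and sets in one variadic call.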
type TaskIDs ¶
type TaskIDs map[TaskID]struct{}
TaskIDs is a set of TaskID.
func NewTaskIDs ¶
NewTaskIDs returns a new set of TaskIDs initialized to contain all TaskIDs of the given TaskIDers.
func (TaskIDs) InsertIf ¶
InsertIf inserts the TaskIDs of all TaskIDers into this TaskIDs if the given condition evaluates to true.
func (TaskIDs) List ¶
func (t TaskIDs) List() TaskIDSlice
List returns the elements of this in an ordered slice.
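Conditional insertion and ordered listing on a map-backed set can be sketched as follows. The lowercase `taskID` and `taskIDs` types are hypothetical, the methods take plain IDs instead of TaskIDers for brevity, and lexicographic order is an assumption about what "ordered" means here.

```go
package main

import (
	"fmt"
	"sort"
)

// taskID and taskIDs are hypothetical stand-ins for TaskID and TaskIDs.
type taskID string
type taskIDs map[taskID]struct{}

// insert adds the given IDs to the set and returns the set for chaining.
func (t taskIDs) insert(ids ...taskID) taskIDs {
	for _, id := range ids {
		t[id] = struct{}{}
	}
	return t
}

// insertIf adds the given IDs only when condition is true.
func (t taskIDs) insertIf(condition bool, ids ...taskID) taskIDs {
	if condition {
		return t.insert(ids...)
	}
	return t
}

// list returns the members sorted lexicographically.
func (t taskIDs) list() []taskID {
	out := make([]taskID, 0, len(t))
	for id := range t {
		out = append(out, id)
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

func main() {
	ids := taskIDs{}.
		insert("deploy", "bootstrap").
		insertIf(false, "skipped").
		insertIf(true, "cleanup")

	fmt.Println(ids.list()) // prints [bootstrap cleanup deploy]
}
```

Sorting matters because Go map iteration order is unspecified; an unsorted listing may differ between runs.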
func (TaskIDs) StringList ¶
StringList returns the elements of this in an ordered string slice.
func (TaskIDs) UnsortedList ¶
func (t TaskIDs) UnsortedList() TaskIDSlice
UnsortedList returns the elements of this in an unordered slice.
func (TaskIDs) UnsortedStringList ¶
UnsortedStringList returns the elements of this in an unordered string slice.