Documentation ¶
Overview ¶
Package flow provides utilities to construct a directed acyclic computational graph that is then executed and monitored with maximum parallelism.
Index ¶
- Variables
- func Causes(err error) *multierror.Error
- func Errors(err error) *multierror.Error
- func MakeDescription(stats *Stats) string
- func RegisterMetrics(r prometheus.Registerer)
- func WasCanceled(err error) bool
- type ErrorCleaner
- type Flow
- type Graph
- type Opts
- type ProgressReporter
- type ProgressReporterFn
- type RecoverFn
- type Stats
- type Task
- type TaskFn
- type TaskID
- type TaskIDSlice
- type TaskIDer
- type TaskIDs
- func (t TaskIDs) Copy() TaskIDs
- func (t TaskIDs) Delete(iders ...TaskIDer) TaskIDs
- func (t TaskIDs) Has(id TaskID) bool
- func (t TaskIDs) Insert(iders ...TaskIDer) TaskIDs
- func (t TaskIDs) InsertIf(condition bool, iders ...TaskIDer) TaskIDs
- func (t TaskIDs) Len() int
- func (t TaskIDs) List() TaskIDSlice
- func (t TaskIDs) StringList() []string
- func (t TaskIDs) TaskIDs() []TaskID
- func (t TaskIDs) UnsortedList() TaskIDSlice
- func (t TaskIDs) UnsortedStringList() []string
- type TaskSpec
- type Tasks
Constants ¶
This section is empty.
Variables ¶
var ( // ContextWithTimeout is context.WithTimeout. Exposed for testing. ContextWithTimeout = context.WithTimeout )
Functions ¶
func Causes ¶
func Causes(err error) *multierror.Error
Causes reports the causes of all Task errors of the given Flow error.
func Errors ¶
func Errors(err error) *multierror.Error
Errors reports all wrapped Task errors of the given Flow error.
func MakeDescription ¶
MakeDescription returns a description based on the stats.
func RegisterMetrics ¶
func RegisterMetrics(r prometheus.Registerer)
RegisterMetrics registers the metrics for the flow library on the passed registry. This function can only be called once. If this function is not called, no metrics are collected in this package.
func WasCanceled ¶
WasCanceled determines whether the given flow error was caused by cancellation.
Types ¶
type ErrorCleaner ¶
ErrorCleaner is called when a task which errored during the previous reconciliation phase completes with success
type Flow ¶
type Flow struct {
// contains filtered or unexported fields
}
Flow is a validated executable Graph.
type Graph ¶
type Graph struct { // Clock is used to retrieve the current time. Clock clock.Clock // contains filtered or unexported fields }
Graph is a builder for a Flow.
func (*Graph) Add ¶
Add adds the given Task to the graph. This panics if - There is already a Task present with the same name - One of the dependencies of the Task is not present
type Opts ¶
type Opts struct { // Log is used to log any output during flow execution. Log logr.Logger // ProgressReporter is used to report the progress during flow execution. ProgressReporter ProgressReporter // ErrorCleaner is used to clean up a previously failed task. ErrorCleaner func(ctx context.Context, taskID string) // ErrorContext is used to store any error related context. ErrorContext *errorsutils.ErrorContext }
Opts are options for a Flow execution. If they are not set, they are left blank and don't affect the Flow.
type ProgressReporter ¶
type ProgressReporter interface { // Start starts the progress reporter. Start(context.Context) error // Stop stops the progress reporter. Stop() // Report reports the progress using the current statistics. Report(context.Context, *Stats) }
ProgressReporter is used to report the current progress of a flow.
func NewDelayingProgressReporter ¶
func NewDelayingProgressReporter(clock clock.Clock, reporterFn ProgressReporterFn, period time.Duration) ProgressReporter
NewDelayingProgressReporter returns a new progress reporter with the given function and the configured period. A period of `0` will lead to immediate reports as soon as flow tasks are completed.
func NewImmediateProgressReporter ¶
func NewImmediateProgressReporter(reporterFn ProgressReporterFn) ProgressReporter
NewImmediateProgressReporter returns a new progress reporter with the given function.
type ProgressReporterFn ¶
ProgressReporterFn is continuously called on progress in a flow.
type Stats ¶
type Stats struct { FlowName string All TaskIDs Succeeded TaskIDs Failed TaskIDs Running TaskIDs Skipped TaskIDs Pending TaskIDs }
Stats are the statistics of a Flow execution.
func InitialStats ¶
InitialStats creates a new Stats object with the given set of initial TaskIDs. The initial TaskIDs are added to all TaskIDs as well as to the pending ones.
func (*Stats) ProgressPercent ¶
ProgressPercent retrieves the progress of a Flow execution in percent.
type Task ¶
Task is a unit of work. It has a name, a payload function and a set of dependencies. A is only started once all its dependencies have been completed successfully.
type TaskFn ¶
TaskFn is a payload function of a task.
func Parallel ¶
Parallel runs the given TaskFns in parallel, collecting their errors in a multierror.
func ParallelExitOnError ¶
ParallelExitOnError runs the given TaskFns in parallel and stops execution as soon as one TaskFn returns an error.
func ParallelN ¶
ParallelN returns a function that runs the given TaskFns in parallel by spawning N workers, collecting their errors in a multierror. If N <= 0, then N will be defaulted to len(fns).
func Sequential ¶
Sequential runs the given TaskFns sequentially.
func (TaskFn) Recover ¶
Recover creates a new TaskFn that recovers an error with the given RecoverFn.
func (TaskFn) RetryUntilTimeout ¶
RetryUntilTimeout returns a TaskFn that is retried until the timeout is reached.
func (TaskFn) ToRecoverFn ¶
ToRecoverFn converts the TaskFn to a RecoverFn that ignores the incoming error.
type TaskIDSlice ¶
type TaskIDSlice []TaskID
TaskIDSlice is a slice of TaskIDs.
func (TaskIDSlice) Len ¶
func (t TaskIDSlice) Len() int
func (TaskIDSlice) Less ¶
func (t TaskIDSlice) Less(i1, i2 int) bool
func (TaskIDSlice) Swap ¶
func (t TaskIDSlice) Swap(i1, i2 int)
func (TaskIDSlice) TaskIDs ¶
func (t TaskIDSlice) TaskIDs() []TaskID
TaskIDs returns this as a slice of TaskIDs.
type TaskIDer ¶
type TaskIDer interface { // TaskIDs reports all TaskIDs of this TaskIDer. TaskIDs() []TaskID }
TaskIDer can produce a slice of TaskIDs. Default implementations of this are TaskIDs, TaskID and TaskIDSlice
type TaskIDs ¶
type TaskIDs map[TaskID]struct{}
TaskIDs is a set of TaskID.
func NewTaskIDs ¶
NewTaskIDs returns a new set of TaskIDs initialized to contain all TaskIDs of the given TaskIDers.
func (TaskIDs) InsertIf ¶
InsertIf inserts the TaskIDs of all TaskIDers into this TaskIDs if the given condition evaluates to true.
func (TaskIDs) List ¶
func (t TaskIDs) List() TaskIDSlice
List returns the elements of this in an ordered slice.
func (TaskIDs) StringList ¶
StringList returns the elements of this in an ordered string slice.
func (TaskIDs) UnsortedList ¶
func (t TaskIDs) UnsortedList() TaskIDSlice
UnsortedList returns the elements of this in an unordered slice.
func (TaskIDs) UnsortedStringList ¶
UnsortedStringList returns the elements of this in an unordered string slice.