flow

package
v1.96.4
Warning

This package is not in the latest version of its module.

Published: Jun 14, 2024 License: Apache-2.0 Imports: 13 Imported by: 40

Documentation

Overview

Package flow provides utilities to construct a directed acyclic computational graph that is then executed and monitored with maximum parallelism.
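The scheduling idea behind the overview can be sketched in a few lines of standard-library Go. This is only an illustration, not the package's implementation: the `task` type, `runGraph`, and `demo` are hypothetical names, and the sketch assumes the input is already acyclic and that dependents of a failed task are simply never started.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strings"
)

// task is a hypothetical stand-in for a unit of work with named dependencies.
type task struct {
	fn   func(ctx context.Context) error
	deps []string
}

// runGraph starts every task as soon as all of its dependencies have
// succeeded and returns the sorted names of the tasks that succeeded.
// Only the calling goroutine touches the bookkeeping maps.
func runGraph(ctx context.Context, tasks map[string]task) []string {
	type result struct {
		name string
		err  error
	}
	results := make(chan result)
	started := map[string]bool{}
	succeeded := map[string]bool{}
	running := 0

	// startReady launches each not-yet-started task whose dependencies succeeded.
	startReady := func() {
		for name, t := range tasks {
			ready := !started[name]
			for _, dep := range t.deps {
				ready = ready && succeeded[dep]
			}
			if !ready {
				continue
			}
			started[name] = true
			running++
			go func(name string, fn func(context.Context) error) {
				results <- result{name, fn(ctx)}
			}(name, t.fn)
		}
	}

	startReady()
	for running > 0 {
		r := <-results
		running--
		if r.err == nil {
			succeeded[r.name] = true
			startReady()
		}
	}

	names := make([]string, 0, len(succeeded))
	for name := range succeeded {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

// demo runs a small diamond-shaped graph plus one failing branch.
func demo() string {
	ok := func(context.Context) error { return nil }
	fail := func(context.Context) error { return errors.New("boom") }
	tasks := map[string]task{
		"infra":   {fn: ok},
		"dns":     {fn: ok, deps: []string{"infra"}},
		"workers": {fn: ok, deps: []string{"infra"}},
		"addons":  {fn: ok, deps: []string{"dns", "workers"}},
		"broken":  {fn: fail},
		"cleanup": {fn: ok, deps: []string{"broken"}},
	}
	return strings.Join(runGraph(context.Background(), tasks), ",")
}

func main() {
	fmt.Println(demo())
}
```

In this sketch "dns" and "workers" become runnable at the same moment, once "infra" has succeeded, while "cleanup" never starts because "broken" fails.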

Constants

This section is empty.

Variables

var (
	// ContextWithTimeout is context.WithTimeout. Exposed for testing.
	ContextWithTimeout = context.WithTimeout
)

Functions

func Causes

func Causes(err error) *multierror.Error

Causes reports the causes of all Task errors of the given Flow error.

func Errors

func Errors(err error) *multierror.Error

Errors reports all wrapped Task errors of the given Flow error.

func MakeDescription added in v1.76.0

func MakeDescription(stats *Stats) string

MakeDescription returns a description based on the stats.

func WasCanceled

func WasCanceled(err error) bool

WasCanceled determines whether the given flow error was caused by cancellation.

Types

type ErrorCleaner

type ErrorCleaner func(context.Context, string)

ErrorCleaner is called when a task that errored during the previous reconciliation phase completes successfully.

type Flow

type Flow struct {
	// contains filtered or unexported fields
}

Flow is a validated executable Graph.

func (*Flow) Len

func (f *Flow) Len() int

Len retrieves the number of tasks in a Flow.

func (*Flow) Name

func (f *Flow) Name() string

Name retrieves the name of a flow.

func (*Flow) Run

func (f *Flow) Run(ctx context.Context, opts Opts) error

Run starts an execution of a Flow. It blocks until the Flow has finished and returns the error, if any.

type Graph

type Graph struct {
	// contains filtered or unexported fields
}

Graph is a builder for a Flow.

func NewGraph

func NewGraph(name string) *Graph

NewGraph returns a new Graph with the given name.

func (*Graph) Add

func (g *Graph) Add(task Task) TaskID

Add adds the given Task to the graph. This panics if
- There is already a Task present with the same name
- One of the dependencies of the Task is not present
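The two panic conditions can be illustrated with a simplified builder. This is a sketch under assumptions: `graph`, `add`, and `tryAdd` are hypothetical names, task names stand in for the real Task and TaskID types, and the panic messages are invented for the example.

```go
package main

import "fmt"

// graph is a hypothetical, simplified builder that only tracks task names.
type graph struct {
	tasks map[string][]string // task name -> dependency names
}

// add registers a task and panics on a duplicate name or a missing dependency.
func (g *graph) add(name string, deps ...string) string {
	if _, ok := g.tasks[name]; ok {
		panic(fmt.Sprintf("task %q already exists", name))
	}
	for _, dep := range deps {
		if _, ok := g.tasks[dep]; !ok {
			panic(fmt.Sprintf("task %q depends on unknown task %q", name, dep))
		}
	}
	g.tasks[name] = deps
	return name
}

// tryAdd reports the panic message of add, or "" if add succeeded.
func tryAdd(g *graph, name string, deps ...string) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	g.add(name, deps...)
	return ""
}

func main() {
	g := &graph{tasks: map[string][]string{}}
	fmt.Println(tryAdd(g, "infra"))          // succeeds, prints an empty line
	fmt.Println(tryAdd(g, "infra"))          // duplicate name
	fmt.Println(tryAdd(g, "dns", "network")) // missing dependency
}
```

Because a dependency must already be present when a task is added, this sketch cannot be used to build a cycle: a task can only point at tasks added before it.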

func (*Graph) Compile

func (g *Graph) Compile() *Flow

Compile compiles the graph into an executable Flow.

func (*Graph) Name

func (g *Graph) Name() string

Name returns the name of a graph.

type Opts

type Opts struct {
	// Log is used to log any output during flow execution.
	Log logr.Logger
	// ProgressReporter is used to report the progress during flow execution.
	ProgressReporter ProgressReporter
	// ErrorCleaner is used to clean up a previously failed task.
	ErrorCleaner func(ctx context.Context, taskID string)
	// ErrorContext is used to store any error related context.
	ErrorContext *errorsutils.ErrorContext
}

Opts are options for a Flow execution. If they are not set, they are left blank and don't affect the Flow.

type ProgressReporter

type ProgressReporter interface {
	// Start starts the progress reporter.
	Start(context.Context) error
	// Stop stops the progress reporter.
	Stop()
	// Report reports the progress using the current statistics.
	Report(context.Context, *Stats)
}

ProgressReporter is used to report the current progress of a flow.

func NewDelayingProgressReporter added in v1.10.2

func NewDelayingProgressReporter(clock clock.Clock, reporterFn ProgressReporterFn, period time.Duration) ProgressReporter

NewDelayingProgressReporter returns a new progress reporter with the given function and the configured period. A period of `0` will lead to immediate reports as soon as flow tasks are completed.

func NewImmediateProgressReporter added in v1.10.2

func NewImmediateProgressReporter(reporterFn ProgressReporterFn) ProgressReporter

NewImmediateProgressReporter returns a new progress reporter with the given function.

type ProgressReporterFn added in v1.10.2

type ProgressReporterFn func(context.Context, *Stats)

ProgressReporterFn is continuously called on progress in a flow.

type RecoverFn

type RecoverFn func(ctx context.Context, err error) error

RecoverFn is a function that can recover an error.

type Stats

type Stats struct {
	FlowName  string
	All       TaskIDs
	Succeeded TaskIDs
	Failed    TaskIDs
	Running   TaskIDs
	Skipped   TaskIDs
	Pending   TaskIDs
}

Stats are the statistics of a Flow execution.

func InitialStats

func InitialStats(flowName string, all TaskIDs) *Stats

InitialStats creates a new Stats object with the given set of initial TaskIDs. The initial TaskIDs are added to all TaskIDs as well as to the pending ones.

func (*Stats) Copy

func (s *Stats) Copy() *Stats

Copy deeply copies a Stats object.

func (*Stats) ProgressPercent

func (s *Stats) ProgressPercent() int32

ProgressPercent retrieves the progress of a Flow execution in percent.
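The documentation does not state how the percentage is derived. One plausible reading, shown here purely as an assumption, is the share of succeeded tasks among all tasks, rounded down, with an empty flow counted as complete. `progressPercent` is a hypothetical helper that takes plain counts instead of a Stats value.

```go
package main

import "fmt"

// progressPercent is a hypothetical helper: it assumes progress is the share
// of succeeded tasks among all tasks, using integer division (rounds down).
func progressPercent(succeeded, all int) int32 {
	if all == 0 {
		// Assumption: a flow without tasks counts as fully complete.
		return 100
	}
	return int32(100 * succeeded / all)
}

func main() {
	fmt.Println(progressPercent(1, 3)) // 33
	fmt.Println(progressPercent(4, 4)) // 100
}
```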

type Task

type Task struct {
	Name         string
	Fn           TaskFn
	SkipIf       bool
	Dependencies TaskIDs
}

Task is a unit of work. It has a name, a payload function and a set of dependencies. A Task is only started once all its dependencies have been completed successfully.

func (*Task) Spec

func (t *Task) Spec() *TaskSpec

Spec returns the TaskSpec of a task.

type TaskFn

type TaskFn func(ctx context.Context) error

TaskFn is a payload function of a task.

func Parallel

func Parallel(fns ...TaskFn) TaskFn

Parallel runs the given TaskFns in parallel, collecting their errors in a multierror.
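A combinator of this shape could look roughly as follows. This is a sketch, not the package's code: `taskFn` and `parallel` are local stand-ins, and the standard library's `errors.Join` (Go 1.20 or later) is swapped in for the multierror type the package returns.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// taskFn is a local stand-in for the package's TaskFn type.
type taskFn func(ctx context.Context) error

// parallel runs all fns concurrently, waits for every one of them,
// and joins the collected errors (nil if none failed).
func parallel(fns ...taskFn) taskFn {
	return func(ctx context.Context) error {
		var (
			wg   sync.WaitGroup
			mu   sync.Mutex
			errs []error
		)
		for _, fn := range fns {
			wg.Add(1)
			go func(fn taskFn) {
				defer wg.Done()
				if err := fn(ctx); err != nil {
					mu.Lock()
					errs = append(errs, err)
					mu.Unlock()
				}
			}(fn)
		}
		wg.Wait()
		return errors.Join(errs...)
	}
}

// countErrors runs two failing and one succeeding function and reports
// how many errors were collected.
func countErrors() int {
	ok := func(context.Context) error { return nil }
	fail := func(context.Context) error { return errors.New("failed") }
	err := parallel(fail, ok, fail)(context.Background())
	joined, isJoined := err.(interface{ Unwrap() []error })
	if !isJoined {
		return 0
	}
	return len(joined.Unwrap())
}

func main() {
	fmt.Println(countErrors())
}
```

Unlike an exit-on-error variant, this sketch always waits for every function, so a single failure does not hide errors from the others.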

func ParallelExitOnError

func ParallelExitOnError(fns ...TaskFn) TaskFn

ParallelExitOnError runs the given TaskFns in parallel and stops execution as soon as one TaskFn returns an error.

func ParallelN added in v1.86.0

func ParallelN(n int, fns ...TaskFn) TaskFn

ParallelN returns a function that runs the given TaskFns in parallel by spawning N workers, collecting their errors in a multierror. If N <= 0, then N will be defaulted to len(fns).
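The worker-pool behaviour described above can be sketched with a shared channel. The names `taskFn`, `parallelN`, and `peakConcurrency` are hypothetical, and `errors.Join` again stands in for a multierror; the actual implementation may differ.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// taskFn is a local stand-in for the package's TaskFn type.
type taskFn func(ctx context.Context) error

// parallelN runs fns on n workers; n <= 0 defaults to len(fns).
func parallelN(n int, fns ...taskFn) taskFn {
	if n <= 0 {
		n = len(fns)
	}
	return func(ctx context.Context) error {
		var (
			wg   sync.WaitGroup
			mu   sync.Mutex
			errs []error
		)
		work := make(chan taskFn)
		for i := 0; i < n; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				// Each worker pulls functions until the channel is closed.
				for fn := range work {
					if err := fn(ctx); err != nil {
						mu.Lock()
						errs = append(errs, err)
						mu.Unlock()
					}
				}
			}()
		}
		for _, fn := range fns {
			work <- fn
		}
		close(work)
		wg.Wait()
		return errors.Join(errs...)
	}
}

// peakConcurrency runs total short functions on n workers and reports the
// highest number of functions observed running at the same time.
func peakConcurrency(n, total int) int {
	var mu sync.Mutex
	current, peak := 0, 0
	fn := func(context.Context) error {
		mu.Lock()
		current++
		if current > peak {
			peak = current
		}
		mu.Unlock()
		time.Sleep(5 * time.Millisecond)
		mu.Lock()
		current--
		mu.Unlock()
		return nil
	}
	fns := make([]taskFn, total)
	for i := range fns {
		fns[i] = fn
	}
	_ = parallelN(n, fns...)(context.Background())
	return peak
}

func main() {
	fmt.Println(peakConcurrency(2, 6) <= 2)
}
```

The unbuffered channel means a function is only handed out when a worker is free, which is what bounds the concurrency at n.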

func Sequential

func Sequential(fns ...TaskFn) TaskFn

Sequential runs the given TaskFns sequentially.
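A sequential combinator is straightforward to sketch. The documentation does not say what happens after a failure; the version below assumes execution stops at the first error. `taskFn`, `sequential`, and `trace` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// taskFn is a local stand-in for the package's TaskFn type.
type taskFn func(ctx context.Context) error

// sequential runs fns in order and, by assumption, stops at the first error.
func sequential(fns ...taskFn) taskFn {
	return func(ctx context.Context) error {
		for _, fn := range fns {
			if err := fn(ctx); err != nil {
				return err
			}
		}
		return nil
	}
}

// trace records which steps ran when the second of three steps fails.
func trace() string {
	var steps []string
	step := func(name string, err error) taskFn {
		return func(context.Context) error {
			steps = append(steps, name)
			return err
		}
	}
	_ = sequential(
		step("a", nil),
		step("b", errors.New("stop")),
		step("c", nil),
	)(context.Background())
	return strings.Join(steps, ",")
}

func main() {
	fmt.Println(trace())
}
```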

func (TaskFn) Recover

func (t TaskFn) Recover(recoverFn RecoverFn) TaskFn

Recover creates a new TaskFn that recovers an error with the given RecoverFn.
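One way to read this is as a wrapper that hands a task's error to the recovery function and returns whatever that function decides. The sketch below assumes the recovery function is only called when the task fails; `taskFn`, `recoverFn`, `withRecover`, `ignoreNotFound`, and the two sentinel errors are hypothetical names for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins for the package's TaskFn and RecoverFn types.
type taskFn func(ctx context.Context) error
type recoverFn func(ctx context.Context, err error) error

// withRecover wraps t so that a non-nil error is passed to r,
// whose return value becomes the result of the task.
func (t taskFn) withRecover(r recoverFn) taskFn {
	return func(ctx context.Context) error {
		if err := t(ctx); err != nil {
			return r(ctx, err)
		}
		return nil
	}
}

var (
	errNotFound = errors.New("not found")
	errOther    = errors.New("other")
)

// ignoreNotFound treats errNotFound as success and passes other errors on.
func ignoreNotFound(_ context.Context, err error) error {
	if errors.Is(err, errNotFound) {
		return nil
	}
	return err
}

// run executes a task that returns err through the recovery wrapper.
func run(err error) error {
	t := taskFn(func(context.Context) error { return err })
	return t.withRecover(ignoreNotFound)(context.Background())
}

func main() {
	fmt.Println(run(errNotFound), run(errOther))
}
```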

func (TaskFn) RetryUntilTimeout

func (t TaskFn) RetryUntilTimeout(interval, timeout time.Duration) TaskFn

RetryUntilTimeout returns a TaskFn that is retried until the timeout is reached.
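The retry behaviour can be approximated with `context.WithTimeout` and a wait between attempts. This is a sketch under assumptions, not the package's implementation: `retryUntilTimeout` and `attempt` are hypothetical, and the wording of the final error is invented.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// taskFn is a local stand-in for the package's TaskFn type.
type taskFn func(ctx context.Context) error

// retryUntilTimeout calls fn every interval until it succeeds
// or the timeout has elapsed.
func retryUntilTimeout(fn taskFn, interval, timeout time.Duration) taskFn {
	return func(ctx context.Context) error {
		ctx, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()
		for {
			err := fn(ctx)
			if err == nil {
				return nil
			}
			select {
			case <-ctx.Done():
				return fmt.Errorf("gave up after %s: %w", timeout, err)
			case <-time.After(interval):
			}
		}
	}
}

// attempt runs a function that fails the first `failures` times and reports
// how often it was called and whether the retry finally succeeded.
func attempt(failures, timeoutMillis int) (calls int, ok bool) {
	fn := func(context.Context) error {
		calls++
		if calls <= failures {
			return errors.New("not ready")
		}
		return nil
	}
	timeout := time.Duration(timeoutMillis) * time.Millisecond
	err := retryUntilTimeout(fn, time.Millisecond, timeout)(context.Background())
	return calls, err == nil
}

func main() {
	fmt.Println(attempt(2, 1000))
}
```

Wrapping the last error with `%w` keeps it inspectable through `errors.Is` and `errors.As` after the timeout.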

func (TaskFn) Timeout

func (t TaskFn) Timeout(timeout time.Duration) TaskFn

Timeout returns a TaskFn that is bound to a context which times out.

func (TaskFn) ToRecoverFn

func (t TaskFn) ToRecoverFn() RecoverFn

ToRecoverFn converts the TaskFn to a RecoverFn that ignores the incoming error.

type TaskID

type TaskID string

TaskID is an id of a task.

func (TaskID) TaskIDs

func (t TaskID) TaskIDs() []TaskID

TaskIDs retrieves this TaskID as a singleton slice.

type TaskIDSlice

type TaskIDSlice []TaskID

TaskIDSlice is a slice of TaskIDs.

func (TaskIDSlice) Len

func (t TaskIDSlice) Len() int

func (TaskIDSlice) Less

func (t TaskIDSlice) Less(i1, i2 int) bool

func (TaskIDSlice) Swap

func (t TaskIDSlice) Swap(i1, i2 int)

func (TaskIDSlice) TaskIDs

func (t TaskIDSlice) TaskIDs() []TaskID

TaskIDs returns this as a slice of TaskIDs.

type TaskIDer

type TaskIDer interface {
	// TaskIDs reports all TaskIDs of this TaskIDer.
	TaskIDs() []TaskID
}

TaskIDer can produce a slice of TaskIDs. Default implementations of this are TaskIDs, TaskID and TaskIDSlice.

type TaskIDs

type TaskIDs map[TaskID]struct{}

TaskIDs is a set of TaskID.

func NewTaskIDs

func NewTaskIDs(ids ...TaskIDer) TaskIDs

NewTaskIDs returns a new set of TaskIDs initialized to contain all TaskIDs of the given TaskIDers.

func (TaskIDs) Copy

func (t TaskIDs) Copy() TaskIDs

Copy makes a deep copy of this TaskIDs.

func (TaskIDs) Delete

func (t TaskIDs) Delete(iders ...TaskIDer) TaskIDs

Delete deletes the TaskIDs of all TaskIDers from this TaskIDs.

func (TaskIDs) Has

func (t TaskIDs) Has(id TaskID) bool

Has checks if the given TaskID is present in this set.

func (TaskIDs) Insert

func (t TaskIDs) Insert(iders ...TaskIDer) TaskIDs

Insert inserts the TaskIDs of all TaskIDers into this TaskIDs.

func (TaskIDs) InsertIf added in v0.34.0

func (t TaskIDs) InsertIf(condition bool, iders ...TaskIDer) TaskIDs

InsertIf inserts the TaskIDs of all TaskIDers into this TaskIDs if the given condition evaluates to true.

func (TaskIDs) Len

func (t TaskIDs) Len() int

Len returns the number of TaskIDs this contains.

func (TaskIDs) List

func (t TaskIDs) List() TaskIDSlice

List returns the elements of this in an ordered slice.
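The set operations documented for TaskIDs follow the usual Go idiom of a map with empty-struct values. The sketch below re-declares simplified local types to show that idiom; `taskID`, `taskIDs`, and the lowercase methods are hypothetical, and they accept plain IDs rather than the TaskIDer interface.

```go
package main

import (
	"fmt"
	"sort"
)

// Local, simplified stand-ins for TaskID and TaskIDs.
type taskID string
type taskIDs map[taskID]struct{}

// insert adds the given ids and returns the set to allow chaining.
func (t taskIDs) insert(ids ...taskID) taskIDs {
	for _, id := range ids {
		t[id] = struct{}{}
	}
	return t
}

// insertIf adds the given ids only if condition is true.
func (t taskIDs) insertIf(condition bool, ids ...taskID) taskIDs {
	if condition {
		return t.insert(ids...)
	}
	return t
}

// has reports whether id is in the set.
func (t taskIDs) has(id taskID) bool {
	_, ok := t[id]
	return ok
}

// list returns the elements sorted in ascending order.
func (t taskIDs) list() []taskID {
	out := make([]taskID, 0, len(t))
	for id := range t {
		out = append(out, id)
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

func main() {
	deps := taskIDs{}.insert("infra", "dns").insertIf(false, "backup").insert("dns")
	fmt.Println(deps.list(), deps.has("backup")) // prints "[dns infra] false"
}
```

A conditional insert like this keeps dependency lists readable when a task is optional, since the caller does not need an `if` around the builder chain.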

func (TaskIDs) StringList

func (t TaskIDs) StringList() []string

StringList returns the elements of this in an ordered string slice.

func (TaskIDs) TaskIDs

func (t TaskIDs) TaskIDs() []TaskID

TaskIDs retrieves all TaskIDs as an unsorted slice.

func (TaskIDs) UnsortedList

func (t TaskIDs) UnsortedList() TaskIDSlice

UnsortedList returns the elements of this in an unordered slice.

func (TaskIDs) UnsortedStringList

func (t TaskIDs) UnsortedStringList() []string

UnsortedStringList returns the elements of this in an unordered string slice.

type TaskSpec

type TaskSpec struct {
	Fn           TaskFn
	Skip         bool
	Dependencies TaskIDs
}

TaskSpec is the functional body of a Task, consisting only of the payload function and the dependencies of the Task.

type Tasks

type Tasks map[TaskID]*TaskSpec

Tasks is a mapping from TaskID to TaskSpec.

Directories

Path Synopsis
Package mock is a generated GoMock package.