Documentation ¶
Index ¶
- func GCPodName(wfr string) string
- func GetExecutionContext(wfr *v1alpha1.WorkflowRun) *v1alpha1.ExecutionContext
- func IsTrivial(wf *v1alpha1.Workflow, stage string) bool
- func IsWorkflowRunTerminated(wfr *v1alpha1.WorkflowRun) bool
- func NextStages(wf *v1alpha1.Workflow, wfr *v1alpha1.WorkflowRun) []string
- func ParseTime(t string) (time.Duration, error)
- type AttemptAction
- type EventUpdater
- type GCProcessor
- type LimitedQueues
- type LimitedSortedQueue
- type Node
- type Operator
- type ParallelismController
- type PodEventWatcher
- type TimeoutProcessor
- type WorkloadProcessor
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetExecutionContext ¶ added in v0.9.3
func GetExecutionContext(wfr *v1alpha1.WorkflowRun) *v1alpha1.ExecutionContext
GetExecutionContext gets execution context from WorkflowRun, if not found, use the default context in workflow controller configuration.
func IsWorkflowRunTerminated ¶ added in v0.9.3
func IsWorkflowRunTerminated(wfr *v1alpha1.WorkflowRun) bool
IsWorkflowRunTerminated judges whether the WorkflowRun has be terminated. Return true if terminated, otherwise return false.
func NextStages ¶
func NextStages(wf *v1alpha1.Workflow, wfr *v1alpha1.WorkflowRun) []string
NextStages determine next stages that can be started to execute. It returns stages that are not started yet but have all depended stages finished.
Types ¶
type AttemptAction ¶ added in v1.0.0
type AttemptAction string
AttemptAction defines the action while try to run a new workflowRun
const ( // AttemptActionStart represents the WorkflowRun can start to run AttemptActionStart AttemptAction = "Start" // AttemptActionQueued represents the WorkflowRun is queued AttemptActionQueued AttemptAction = "Queued" // AttemptActionFailed represents the WorkflowRun will fail directly due to queue full AttemptActionFailed AttemptAction = "Failed" )
type EventUpdater ¶ added in v1.1.0
type EventUpdater interface {
UpdateEvents(stage string, events []v1alpha1.StageEvent) error
}
EventUpdater update events to the stage status of corresponding workflowRun's status. Event with same name will be override.
func NewEventUpdater ¶ added in v1.1.0
func NewEventUpdater(client clientset.Interface, wfrNamespace, wfrName string) EventUpdater
NewEventUpdater creates an event updater
type GCProcessor ¶
type GCProcessor struct {
// contains filtered or unexported fields
}
GCProcessor processes garbage collection for WorkflowRun objects.
func NewGCProcessor ¶
func NewGCProcessor(client clientset.Interface, enabled bool) *GCProcessor
NewGCProcessor create new GC processor.
func (*GCProcessor) Add ¶
func (p *GCProcessor) Add(wfr *v1alpha1.WorkflowRun)
Add WorkflowRun object to GC processor, it will firstly judge whether the WorkflowRun object needs GC, if it's true, it will perform GC on it in the right time.
type LimitedQueues ¶
type LimitedQueues struct { // Maximum queue size, it indicates maximum number of WorkflowRuns to retain for each Workflow. MaxQueueSize int // Workflow queue map. It use Workflow name and namespace as the key, and manage Queue for each // Workflow. Queues map[string]*LimitedSortedQueue // k8s client used to clean old WorkflowRun Client clientset.Interface }
LimitedQueues manages WorkflowRun queue for each Workflow. Queue for each Workflow is limited to a given maximum size, if new WorkflowRun created, the oldest one would be removed.
func NewLimitedQueues ¶
func NewLimitedQueues(client clientset.Interface, maxSize int) *LimitedQueues
NewLimitedQueues creates a limited queues for WorkflowRuns, and start auto scan.
func (*LimitedQueues) AddOrRefresh ¶
func (w *LimitedQueues) AddOrRefresh(wfr *v1alpha1.WorkflowRun)
AddOrRefresh adds a WorkflowRun to its corresponding queue, if the queue size exceed the maximum size, the oldest one would be deleted. And if the WorkflowRun already exists in the queue, its 'refresh' time field would be refreshed.
func (*LimitedQueues) AutoScan ¶
func (w *LimitedQueues) AutoScan()
AutoScan scans all WorkflowRuns in the queues regularly, remove abnormal ones with old enough refresh time.
func (*LimitedQueues) Refresh ¶ added in v0.9.3
func (w *LimitedQueues) Refresh(wfr *v1alpha1.WorkflowRun)
Refresh refreshes the WorkflowRun in the queue, the refresh time would be updated.
type LimitedSortedQueue ¶
type LimitedSortedQueue struct {
// contains filtered or unexported fields
}
LimitedSortedQueue is a sorted fixed length queue implemented with single linked list. Note that each queue would have a sentinel node to assist the implementation, it's a dummy node, and won't be counted in the queue size. So an empty queue would have head pointed to dummy node, with queue size 0.
func NewQueue ¶
func NewQueue(key string, max int) *LimitedSortedQueue
NewQueue creates a limited sorted queue.
func (*LimitedSortedQueue) Pop ¶
func (q *LimitedSortedQueue) Pop() *Node
Pop pops up a WorkflowRun object from the queue, it's the oldest one that will be popped.
func (*LimitedSortedQueue) PushOrRefresh ¶
func (q *LimitedSortedQueue) PushOrRefresh(wfr *v1alpha1.WorkflowRun)
PushOrRefresh pushes a WorkflowRun object to the queue, it will be inserted in the right place to keep the queue sorted by creation time. If the object already existed in the queue, its refresh time would be updated.
func (*LimitedSortedQueue) Refresh ¶ added in v0.9.3
func (q *LimitedSortedQueue) Refresh(wfr *v1alpha1.WorkflowRun) bool
Refresh updates refresh time of WorkflowRun in the queue, if the WorkflowRun found in the queue and update successfully, return true, otherwise return false.
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node represents a WorkflowRun in the queue. The 'next' link to next node in the queue, and the 'refresh' stands the last time this node is refreshed.
'refresh' here is used to deal with a rarely occurred case: when one WorkflowRun got deleted in etcd, but workflow controller didn't get notified. Workflow controller would perform resync with etcd regularly, (5 minutes by default), during resync every WorkflowRun in the queue would be refreshed, it one WorkflowRun is deleted in etcd abnormally, it wouldn't get refreshed in the queue, so we can judge by the refresh time for this case.
When we found a node that hasn't be refreshed for a long time (for example, twice the resync period), then we remove this node from the queue.
type Operator ¶
type Operator interface { // Get Recorder GetRecorder() record.EventRecorder // Get WorkflowRun instance. GetWorkflowRun() *v1alpha1.WorkflowRun // Update WorkflowRun, mainly the status. Update() error // Update stage status. UpdateStageStatus(stage string, status *v1alpha1.Status) // Update stage pod info. UpdateStagePodInfo(stage string, podInfo *v1alpha1.PodInfo) // Update stage outputs, they are key-value results from stage execution UpdateStageOutputs(stage string, keyValues []v1alpha1.KeyValue) // Decide overall status of the WorkflowRun from stage status. OverallStatus() (*v1alpha1.Status, error) // Garbage collection on the WorkflowRun based on GC policy configured // in Workflow Controller. Pod and data on PV would be cleaned. // - 'lastTry' indicates whether this is the last time to perform GC, // if set to true, the WorkflowRun status will be marked as cleaned regardless // whether the GC action succeeded or not. // - 'wfrDeletion' indicates whether the GC is performed because of WorkflowRun deleted. GC(lastTry, wfrDeletion bool) error // Run next stages in the Workflow and resolve overall status. Reconcile() error // ResolveGlobalVariables resolves global variables from workflow. ResolveGlobalVariables() }
Operator is used to perform operations on a WorkflowRun instance, such as update status, run next stages, garbage collection, etc.
func NewOperator ¶
func NewOperator(clusterClient kubernetes.Interface, client clientset.Interface, wfr interface{}, namespace string) (Operator, error)
NewOperator create a new operator.
wfr can be passed as a workflowRun's name or a workflowRun object. if a name is passed, this func will not get the related workflow and use it to initialize the Operator; but if a object is passed, will do that.
And operator returned by passing a workflowRun name can not invoke the operator's some methods as follows: InitStagesStatus, Update, OverallStatus and Reconcile.
type ParallelismController ¶ added in v1.0.0
type ParallelismController interface { // AttemptNew tries to run a new WorkflowRun, and returns the corresponding action. AttemptNew(ns, wf, wfr string) AttemptAction // MarkFinished mark a WorkflowRun execution finished MarkFinished(ns, wf, wfr string) }
ParallelismController is an interface to manage parallelism of WorkflowRun executions
func NewParallelismController ¶ added in v1.0.0
func NewParallelismController(parallelismConfig *controller.ParallelismConfig) ParallelismController
NewParallelismController creates a ParallelismController
type PodEventWatcher ¶ added in v1.1.0
type PodEventWatcher interface { // Invoke Work in background goroutine, this is a blocking method. Work(stage, podNamespace, podName string) }
PodEventWatcher watches Kubernetes events for a pod and records Warning type events to WorkflowRun status.
type TimeoutProcessor ¶
type TimeoutProcessor struct {
// contains filtered or unexported fields
}
TimeoutProcessor manages timeout of WorkflowRun.
func NewTimeoutProcessor ¶
func NewTimeoutProcessor(client clientset.Interface) *TimeoutProcessor
NewTimeoutProcessor creates a timeout manager and run it.
func (*TimeoutProcessor) AddIfNotExist ¶ added in v1.1.0
func (m *TimeoutProcessor) AddIfNotExist(wfr *v1alpha1.WorkflowRun) error
AddIfNotExist adds a WorkflowRun to the timeout manager if it is not exist.
func (*TimeoutProcessor) Run ¶
func (m *TimeoutProcessor) Run(interval time.Duration)
Run will check timeout of managed WorkflowRun and process items that have expired their time.
type WorkloadProcessor ¶ added in v0.9.3
type WorkloadProcessor struct {
// contains filtered or unexported fields
}
WorkloadProcessor processes stage workload. There are kinds of workload supported: pod, delegation. With pod, Cyclone would create a pod to run the stage. With delegation, Cyclone would send a POST request to the given URL in the workload spec.
func NewWorkloadProcessor ¶ added in v0.9.3
func NewWorkloadProcessor(clusterClient kubernetes.Interface, client clientset.Interface, wf *v1alpha1.Workflow, wfr *v1alpha1.WorkflowRun, stage *v1alpha1.Stage, wfrOperator Operator) *WorkloadProcessor
NewWorkloadProcessor ...
func (*WorkloadProcessor) Process ¶ added in v0.9.3
func (p *WorkloadProcessor) Process() error
Process processes the stage according to workload type.