controller

package
v0.13.16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 24, 2021 License: Apache-2.0 Imports: 53 Imported by: 1

Documentation

Overview

Package controller contains the K8s controller logic. This does not contain the actual workflow re-conciliation. It is then entrypoint into the K8s based Flyte controller.

Index

Constants

View Source
const FinalizerKey = "flyte-finalizer"

Variables

This section is empty.

Functions

func CalculateHoursToDelete

func CalculateHoursToDelete(retentionPeriodHours, currentHourOfDay int) []string

Calculates a list of all the hours that should be deleted given the current hour of the day and the retentionperiod in hours Usually this is a list of all hours out of the 24 hours in the day - retention period - the current hour of the day

func CompletedWorkflowsLabelSelector

func CompletedWorkflowsLabelSelector() *v1.LabelSelector

Creates a new LabelSelector that selects all workflows that have the completed Label

func CompletedWorkflowsSelectorOutsideRetentionPeriod

func CompletedWorkflowsSelectorOutsideRetentionPeriod(retentionPeriodHours int, currentTime time.Time) *v1.LabelSelector

Creates a new selector that selects all completed workflows and workflows with completed hour label outside of the retention window

func FinalizersIdentical

func FinalizersIdentical(o1 v1.Object, o2 v1.Object) bool

Currently we only compare the lengths of finalizers. If you add finalizers directly these API;'s will not work

func HasCompletedLabel

func HasCompletedLabel(w *v1alpha1.FlyteWorkflow) bool

func HasFinalizer

func HasFinalizer(meta v1.Object) bool

Check if any finalizer is set

func IgnoreCompletedWorkflowsLabelSelector

func IgnoreCompletedWorkflowsLabelSelector() *v1.LabelSelector

This function creates a label selector, that will ignore all objects (in this case workflow) that DOES NOT have a label key=workflowTerminationStatusKey with a value=workflowTerminatedValue

func IsDeleted

func IsDeleted(meta v1.Object) bool

Check if the deletion timestamp is set, this is set automatically when an object is deleted

func RecordSystemError added in v0.7.0

func RecordSystemError(w *v1alpha1.FlyteWorkflow, err error) *v1alpha1.FlyteWorkflow

Helper method to record system error in the workflow.

func ResetFinalizers

func ResetFinalizers(meta v1.Object)

Reset all the finalizers on the object

func SetCompletedLabel

func SetCompletedLabel(w *v1alpha1.FlyteWorkflow, currentTime time.Time)

func SetFinalizerIfEmpty

func SetFinalizerIfEmpty(meta v1.Object, finalizer string)

Sets a new finalizer in case the finalizer is empty

Types

type BatchingWorkQueue

type BatchingWorkQueue struct {
	// workqueue is a rate limited work queue. This is used to queue work to be
	// processed instead of performing it as soon as a change happens. This
	// means we can ensure we only process a fixed amount of resources at a
	// time, and makes it easy to ensure we are never processing the same item
	// simultaneously in two different workers.
	workqueue.RateLimitingInterface
	// contains filtered or unexported fields
}

A BatchingWorkQueue consists of 2 queues and migrates items from sub-queue to parent queue as a batch at a specified interval

func (*BatchingWorkQueue) AddToSubQueue

func (b *BatchingWorkQueue) AddToSubQueue(item interface{})

func (*BatchingWorkQueue) AddToSubQueueAfter

func (b *BatchingWorkQueue) AddToSubQueueAfter(item interface{}, duration time.Duration)

func (*BatchingWorkQueue) AddToSubQueueRateLimited

func (b *BatchingWorkQueue) AddToSubQueueRateLimited(item interface{})

func (*BatchingWorkQueue) ShutdownAll

func (b *BatchingWorkQueue) ShutdownAll()

func (*BatchingWorkQueue) Start

func (b *BatchingWorkQueue) Start(ctx context.Context)

type CompositeWorkQueue

type CompositeWorkQueue interface {
	workqueue.RateLimitingInterface
	// Specialized interface that should be called to start the migration of work from SubQueue to primaryQueue
	Start(ctx context.Context)
	// Shutsdown all the queues that are in the context
	ShutdownAll()
	// Adds the item explicitly to the subqueue
	AddToSubQueue(item interface{})
	// Adds the item explicitly to the subqueue, using a rate limiter
	AddToSubQueueRateLimited(item interface{})
	// Adds the item explicitly to the subqueue after some duration
	AddToSubQueueAfter(item interface{}, duration time.Duration)
}

A CompositeWorkQueue can be used in cases where the work is enqueued by two sources. It can be enqueued by either

  1. Informer for the Primary Object itself. In case of FlytePropeller, this is the workflow object
  2. Informer or any other process that enqueues the top-level object for re-evaluation in response to one of the sub-objects being ready. In the case of FlytePropeller this is the "Node/Task" updates, will re-enqueue the workflow to be re-evaluated

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller is the controller implementation for FlyteWorkflow resources

func New

func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Interface, flytepropellerClientset clientset.Interface,
	flyteworkflowInformerFactory informers.SharedInformerFactory, kubeClient executors.Client, scope promutils.Scope) (*Controller, error)

NewController returns a new FlyteWorkflow controller

func (*Controller) Run

func (c *Controller) Run(ctx context.Context) error

Runs either as a leader -if configured- or as a standalone process.

type GarbageCollector

type GarbageCollector struct {
	// contains filtered or unexported fields
}

Garbage collector is an active background cleanup service, that deletes all workflows that are completed and older than the configured TTL

func NewGarbageCollector

func NewGarbageCollector(cfg *config.Config, scope promutils.Scope, clk clock.Clock, namespaceClient corev1.NamespaceInterface, wfClient v1alpha1.FlyteworkflowV1alpha1Interface) (*GarbageCollector, error)

func (*GarbageCollector) StartGC

func (g *GarbageCollector) StartGC(ctx context.Context) error

Use this method to start a background garbage collection routine. Use the context to signal an exit signal

type Handler

type Handler interface {
	// Initialize the Handler
	Initialize(ctx context.Context) error
	// Handle method that should handle the object and try to converge the desired and the actual state
	Handle(ctx context.Context, namespace, key string) error
}

type Propeller

type Propeller struct {
	// contains filtered or unexported fields
}

Core Propeller structure that houses the Reconciliation loop for Flytepropeller

func NewPropellerHandler

func NewPropellerHandler(_ context.Context, cfg *config.Config, wfStore workflowstore.FlyteWorkflow, executor executors.Workflow, scope promutils.Scope) *Propeller

NewPropellerHandler creates a new Propeller and initializes metrics

func (*Propeller) Handle

func (p *Propeller) Handle(ctx context.Context, namespace, name string) error

Handle method is the entry point for the reconciler. It compares the actual state with the desired, and attempts to converge the two. It then updates the GetExecutionStatus block of the FlyteWorkflow resource with the current status of the resource. Every FlyteWorkflow transitions through the following

The Workflow to be worked on is identified for the given namespace and executionID (which is the name of the workflow) The return value should be an error, in the case, we wish to retry this workflow <pre>

+--------+        +---------+        +------------+     +---------+
|        |        |         |        |            |     |         |
| Ready  +--------> Running +--------> Succeeding +-----> Success |
|        |        |         |        |            |     |         |
+--------+        +---------+        +------------+     +---------+
    |                  |
    |                  |
    |             +----v----+        +---------------------+        +--------+
    |             |         |        |     (optional)      |        |        |
    +-------------> Failing +--------> HandlingFailureNode +--------> Failed |
                  |         |        |                     |        |        |
                  +---------+        +---------------------+        +--------+

</pre>

func (*Propeller) Initialize

func (p *Propeller) Initialize(ctx context.Context) error

Initializes all downstream executors

func (*Propeller) TryMutateWorkflow added in v0.7.0

func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error)

TryMutateWorkflow will try to mutate the workflow by traversing it and reconciling the desired and actual state. The desired state here is the entire workflow is completed, actual state is each nodes current execution state.

type ResourceLevelMonitor added in v0.7.0

type ResourceLevelMonitor struct {
	Scope promutils.Scope

	// Meta timer - this times each collection cycle to measure how long it takes to collect the levels GaugeVec below
	CollectorTimer promutils.StopWatch
	// contains filtered or unexported fields
}

This object is responsible for emitting metrics that show the current number of Flyte workflows, cut by project and domain. It needs to be kicked off. The periodicity is not currently configurable because it seems unnecessary. It will also a timer measuring how long it takes to run each measurement cycle.

func NewResourceLevelMonitor added in v0.7.0

func NewResourceLevelMonitor(scope promutils.Scope, lister lister.FlyteWorkflowLister) *ResourceLevelMonitor

func (*ResourceLevelMonitor) RunCollector added in v0.7.0

func (r *ResourceLevelMonitor) RunCollector(ctx context.Context)

type SimpleWorkQueue

type SimpleWorkQueue struct {
	// workqueue is a rate limited work queue. This is used to queue work to be
	// processed instead of performing it as soon as a change happens. This
	// means we can ensure we only process a fixed amount of resources at a
	// time, and makes it easy to ensure we are never processing the same item
	// simultaneously in two different workers.
	workqueue.RateLimitingInterface
}

SimpleWorkQueue provides a simple RateLimitingInterface, but ensures that the compositeQueue interface works with a default queue.

func (*SimpleWorkQueue) AddToSubQueue

func (s *SimpleWorkQueue) AddToSubQueue(item interface{})

func (*SimpleWorkQueue) AddToSubQueueAfter

func (s *SimpleWorkQueue) AddToSubQueueAfter(item interface{}, duration time.Duration)

func (*SimpleWorkQueue) AddToSubQueueRateLimited

func (s *SimpleWorkQueue) AddToSubQueueRateLimited(item interface{})

func (*SimpleWorkQueue) ShutdownAll

func (s *SimpleWorkQueue) ShutdownAll()

func (*SimpleWorkQueue) Start

func (s *SimpleWorkQueue) Start(ctx context.Context)

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

func NewWorkerPool

func NewWorkerPool(ctx context.Context, scope promutils.Scope, workQueue CompositeWorkQueue, handler Handler) *WorkerPool

func (*WorkerPool) Initialize

func (w *WorkerPool) Initialize(ctx context.Context) error

func (*WorkerPool) Run

func (w *WorkerPool) Run(ctx context.Context, threadiness int, synced ...cache.InformerSynced) error

Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers. It will block until stopCh is closed, at which point it will shutdown the workqueue and wait for workers to finish processing their current work items.

Directories

Path Synopsis
Package config contains the core configuration for FlytePropeller.
Package config contains the core configuration for FlytePropeller.
Core Nodes Executor implementation This module implements the core Nodes executor.
Core Nodes Executor implementation This module implements the core Nodes executor.
end

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL