controller

package
v1.14.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 17, 2024 License: Apache-2.0 Imports: 70 Imported by: 0

Documentation

Overview

Package controller contains the K8s controller logic. This does not contain the actual workflow re-conciliation. It is then entrypoint into the K8s based Flyte controller.

Index

Constants

View Source
const FinalizerKey = "flyte-finalizer"

Variables

This section is empty.

Functions

func CalculateHoursToDelete

func CalculateHoursToDelete(retentionPeriodHours, currentHourOfDay int) []string

CalculateHoursToDelete calculates a list of all the hours that should be deleted given the current hour of the day and the retention period in hours. Usually this is a list of all hours out of the 24 hours in the day - retention period - the current hour of the day

func CalculateHoursToKeep

func CalculateHoursToKeep(retentionPeriodHours int, currentTime time.Time) []string

CalculateHoursToKeep calculates a list of all the hours that should be kept given the current time and the retention period in hours

func CompletedWorkflowsLabelSelector

func CompletedWorkflowsLabelSelector() *v1.LabelSelector

CompletedWorkflowsLabelSelector creates a new LabelSelector that selects all workflows that have the completed Label

func CompletedWorkflowsSelectorOutsideRetentionPeriod

func CompletedWorkflowsSelectorOutsideRetentionPeriod(retentionPeriodHours int, currentTime time.Time) *v1.LabelSelector

CompletedWorkflowsSelectorOutsideRetentionPeriod creates a new selector that selects all completed workflows and workflows with completed hour label outside the retention window.

func CreateControllerManager

func CreateControllerManager(ctx context.Context, cfg *config.Config, options manager.Options) (manager.Manager, error)

func DeprecatedCompletedWorkflowsSelectorOutsideRetentionPeriod

func DeprecatedCompletedWorkflowsSelectorOutsideRetentionPeriod(retentionPeriodHours int, currentTime time.Time) *v1.LabelSelector

DeprecatedCompletedWorkflowsSelectorOutsideRetentionPeriod Deprecated

func FinalizersIdentical

func FinalizersIdentical(o1 v1.Object, o2 v1.Object) bool

Currently we only compare the lengths of finalizers. If you add finalizers directly these API;'s will not work

func FormatTimeForLabel

func FormatTimeForLabel(currentTime time.Time) string

FormatTimeForLabel returns a string representation of the time with the following properties: 1. It's safe to put as a label in k8s objects' metadata 2. The granularity is up to the hour only. 3. The format is YYYY-MM-DD.HH 4. Is always in UTC.

func HasCompletedLabel

func HasCompletedLabel(w *v1alpha1.FlyteWorkflow) bool

func HasFinalizer

func HasFinalizer(meta v1.Object) bool

Check if any finalizer is set

func IgnoreCompletedWorkflowsLabelSelector

func IgnoreCompletedWorkflowsLabelSelector() *v1.LabelSelector

IgnoreCompletedWorkflowsLabelSelector this function creates a label selector, that will ignore all objects (in this case workflow) that DOES NOT have a label key=workflowTerminationStatusKey with a value=workflowTerminatedValue

func IsDeleted

func IsDeleted(meta v1.Object) bool

Check if the deletion timestamp is set, this is set automatically when an object is deleted

func NewDedupingBucketRateLimiter added in v1.14.0

func NewDedupingBucketRateLimiter(limiter interfaces.Limiter) workqueue.RateLimiter

func NewLimiter added in v1.14.0

func NewLimiter(r rate.Limit, b int) interfaces.Limiter

func RecordSystemError

func RecordSystemError(w *v1alpha1.FlyteWorkflow, err error) *v1alpha1.FlyteWorkflow

Helper method to record system error in the workflow.

func ResetFinalizers

func ResetFinalizers(meta v1.Object)

Reset all the finalizers on the object

func SetCompletedLabel

func SetCompletedLabel(w *v1alpha1.FlyteWorkflow, currentTime time.Time)

func SetDefinitionVersionIfEmpty

func SetDefinitionVersionIfEmpty(wf *v1alpha1.FlyteWorkflow, version v1alpha1.WorkflowDefinitionVersion)

func SetFinalizerIfEmpty

func SetFinalizerIfEmpty(meta v1.Object, finalizer string)

Sets a new finalizer in case the finalizer is empty

func SharedInformerOptions

func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []informers.SharedInformerOption

SharedInformerOptions creates informer options to work with FlytePropeller Sharding

func StartController

func StartController(ctx context.Context, cfg *config.Config, defaultNamespace string, mgr manager.Manager, scope *promutils.Scope) error

StartController creates a new FlytePropeller Controller and starts it

func StartControllerManager

func StartControllerManager(ctx context.Context, mgr manager.Manager) error

StartControllerManager Start controller runtime manager to start listening to resource changes. K8sPluginManager uses controller runtime to create informers for the CRDs being monitored by plugins. The informer EventHandler enqueues the owner workflow for reevaluation. These informer events allow propeller to detect workflow changes faster than the default sync interval for workflow CRDs.

Types

type BatchingWorkQueue

type BatchingWorkQueue struct {
	// workqueue is a rate limited work queue. This is used to queue work to be
	// processed instead of performing it as soon as a change happens. This
	// means we can ensure we only process a fixed amount of resources at a
	// time, and makes it easy to ensure we are never processing the same item
	// simultaneously in two different workers.
	workqueue.RateLimitingInterface
	// contains filtered or unexported fields
}

A BatchingWorkQueue consists of 2 queues and migrates items from sub-queue to parent queue as a batch at a specified interval

func (*BatchingWorkQueue) AddToSubQueue

func (b *BatchingWorkQueue) AddToSubQueue(item interface{})

func (*BatchingWorkQueue) AddToSubQueueAfter

func (b *BatchingWorkQueue) AddToSubQueueAfter(item interface{}, duration time.Duration)

func (*BatchingWorkQueue) AddToSubQueueRateLimited

func (b *BatchingWorkQueue) AddToSubQueueRateLimited(item interface{})

func (*BatchingWorkQueue) ShutdownAll

func (b *BatchingWorkQueue) ShutdownAll()

func (*BatchingWorkQueue) Start

func (b *BatchingWorkQueue) Start(ctx context.Context)

type CompositeWorkQueue

type CompositeWorkQueue interface {
	workqueue.RateLimitingInterface
	// Specialized interface that should be called to start the migration of work from SubQueue to primaryQueue
	Start(ctx context.Context)
	// Shutsdown all the queues that are in the context
	ShutdownAll()
	// Adds the item explicitly to the subqueue
	AddToSubQueue(item interface{})
	// Adds the item explicitly to the subqueue, using a rate limiter
	AddToSubQueueRateLimited(item interface{})
	// Adds the item explicitly to the subqueue after some duration
	AddToSubQueueAfter(item interface{}, duration time.Duration)
}

A CompositeWorkQueue can be used in cases where the work is enqueued by two sources. It can be enqueued by either

  1. Informer for the Primary Object itself. In case of FlytePropeller, this is the workflow object
  2. Informer or any other process that enqueues the top-level object for re-evaluation in response to one of the sub-objects being ready. In the case of FlytePropeller this is the "Node/Task" updates, will re-enqueue the workflow to be re-evaluated

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller is the controller implementation for FlyteWorkflow resources

func New

func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Interface, flytepropellerClientset clientset.Interface,
	flyteworkflowInformerFactory informers.SharedInformerFactory, informerFactory k8sInformers.SharedInformerFactory,
	kubeClient executors.Client, scope promutils.Scope) (*Controller, error)

New returns a new FlyteWorkflow controller

func (*Controller) Run

func (c *Controller) Run(ctx context.Context) error

Run either as a leader -if configured- or as a standalone process.

type GarbageCollector

type GarbageCollector struct {
	// contains filtered or unexported fields
}

GarbageCollector is an active background cleanup service, that deletes all workflows that are completed and older than the configured TTL

func (*GarbageCollector) StartGC

func (g *GarbageCollector) StartGC(ctx context.Context) error

StartGC starts a background garbage collection routine. Use the context to signal an exit signal

type Handler

type Handler interface {
	// Initialize the Handler
	Initialize(ctx context.Context) error
	// Handle method that should handle the object and try to converge the desired and the actual state
	Handle(ctx context.Context, namespace, key string) error
}

type Propeller

type Propeller struct {
	// contains filtered or unexported fields
}

Core Propeller structure that houses the Reconciliation loop for Flytepropeller

func NewPropellerHandler

func NewPropellerHandler(_ context.Context, cfg *config.Config, store *storage.DataStore, wfStore workflowstore.FlyteWorkflow, executor executors.Workflow, scope promutils.Scope) *Propeller

NewPropellerHandler creates a new Propeller and initializes metrics

func (*Propeller) Handle

func (p *Propeller) Handle(ctx context.Context, namespace, name string) error

Handle method is the entry point for the reconciler. It compares the actual state with the desired, and attempts to converge the two. It then updates the GetExecutionStatus block of the FlyteWorkflow resource with the current status of the resource. Every FlyteWorkflow transitions through the following

The Workflow to be worked on is identified for the given namespace and executionID (which is the name of the workflow) The return value should be an error, in the case, we wish to retry this workflow <pre>

+--------+        +---------+        +------------+     +---------+
|        |        |         |        |            |     |         |
| Ready  +--------> Running +--------> Succeeding +-----> Success |
|        |        |         |        |            |     |         |
+--------+        +---------+        +------------+     +---------+
    |                  |
    |                  |
    |             +----v----+        +---------------------+        +--------+
    |             |         |        |     (optional)      |        |        |
    +-------------> Failing +--------> HandlingFailureNode +--------> Failed |
                  |         |        |                     |        |        |
                  +---------+        +---------------------+        +--------+

</pre>

func (*Propeller) Initialize

func (p *Propeller) Initialize(ctx context.Context) error

Initialize initializes all downstream executors

func (*Propeller) TryMutateWorkflow

func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error)

TryMutateWorkflow will try to mutate the workflow by traversing it and reconciling the desired and actual state. The desired state here is the entire workflow is completed, actual state is each nodes current execution state.

type ResourceLevelMonitor

type ResourceLevelMonitor struct {
	Scope promutils.Scope

	// Meta timer - this times each collection cycle to measure how long it takes to collect the levels GaugeVec below
	CollectorTimer promutils.StopWatch
	// contains filtered or unexported fields
}

ResourceLevelMonitor is responsible for emitting metrics that show the current number of Flyte workflows, by project and domain. It needs to be kicked off. The periodicity is not currently configurable because it seems unnecessary. It will also a timer measuring how long it takes to run each measurement cycle.

func NewResourceLevelMonitor

func NewResourceLevelMonitor(scope promutils.Scope, lister lister.FlyteWorkflowLister) *ResourceLevelMonitor

func (*ResourceLevelMonitor) RunCollector

func (r *ResourceLevelMonitor) RunCollector(ctx context.Context)

type SimpleWorkQueue

type SimpleWorkQueue struct {
	// workqueue is a rate limited work queue. This is used to queue work to be
	// processed instead of performing it as soon as a change happens. This
	// means we can ensure we only process a fixed amount of resources at a
	// time, and makes it easy to ensure we are never processing the same item
	// simultaneously in two different workers.
	workqueue.RateLimitingInterface
}

SimpleWorkQueue provides a simple RateLimitingInterface, but ensures that the compositeQueue interface works with a default queue.

func (*SimpleWorkQueue) AddToSubQueue

func (s *SimpleWorkQueue) AddToSubQueue(item interface{})

func (*SimpleWorkQueue) AddToSubQueueAfter

func (s *SimpleWorkQueue) AddToSubQueueAfter(item interface{}, duration time.Duration)

func (*SimpleWorkQueue) AddToSubQueueRateLimited

func (s *SimpleWorkQueue) AddToSubQueueRateLimited(item interface{})

func (*SimpleWorkQueue) ShutdownAll

func (s *SimpleWorkQueue) ShutdownAll()

func (*SimpleWorkQueue) Start

func (s *SimpleWorkQueue) Start(ctx context.Context)

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

func NewWorkerPool

func NewWorkerPool(ctx context.Context, scope promutils.Scope, workQueue CompositeWorkQueue, handler Handler) *WorkerPool

func (*WorkerPool) Initialize

func (w *WorkerPool) Initialize(ctx context.Context) error

func (*WorkerPool) Run

func (w *WorkerPool) Run(ctx context.Context, threadiness int, synced ...cache.InformerSynced) error

Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers. It will block until stopCh is closed, at which point it will shutdown the workqueue and wait for workers to finish processing their current work items.

Directories

Path Synopsis
Package config contains the core configuration for FlytePropeller.
Package config contains the core configuration for FlytePropeller.
Package nodes contains the Core Nodes Executor implementation and a subpackage for every node kind This module implements the core Nodes executor.
Package nodes contains the Core Nodes Executor implementation and a subpackage for every node kind This module implements the core Nodes executor.
end

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL