Documentation ¶
Overview ¶
Package controller contains the K8s controller logic. It does not contain the actual workflow reconciliation. It is the entrypoint into the K8s-based Flyte controller.
Index ¶
- Constants
- func CalculateHoursToDelete(retentionPeriodHours, currentHourOfDay int) []string
- func CalculateHoursToKeep(retentionPeriodHours int, currentTime time.Time) []string
- func CompletedWorkflowsLabelSelector() *v1.LabelSelector
- func CompletedWorkflowsSelectorOutsideRetentionPeriod(retentionPeriodHours int, currentTime time.Time) *v1.LabelSelector
- func CreateControllerManager(ctx context.Context, cfg *config.Config, options manager.Options) (manager.Manager, error)
- func DeprecatedCompletedWorkflowsSelectorOutsideRetentionPeriod(retentionPeriodHours int, currentTime time.Time) *v1.LabelSelector
- func FinalizersIdentical(o1 v1.Object, o2 v1.Object) bool
- func FormatTimeForLabel(currentTime time.Time) string
- func HasCompletedLabel(w *v1alpha1.FlyteWorkflow) bool
- func HasFinalizer(meta v1.Object) bool
- func IgnoreCompletedWorkflowsLabelSelector() *v1.LabelSelector
- func IsDeleted(meta v1.Object) bool
- func NewDedupingBucketRateLimiter(limiter interfaces.Limiter) workqueue.RateLimiter
- func NewLimiter(r rate.Limit, b int) interfaces.Limiter
- func NewWorkQueue(ctx context.Context, cfg config.WorkqueueConfig, name string) (workqueue.RateLimitingInterface, error)
- func RecordSystemError(w *v1alpha1.FlyteWorkflow, err error) *v1alpha1.FlyteWorkflow
- func ResetFinalizers(meta v1.Object)
- func SetCompletedLabel(w *v1alpha1.FlyteWorkflow, currentTime time.Time)
- func SetDefinitionVersionIfEmpty(wf *v1alpha1.FlyteWorkflow, version v1alpha1.WorkflowDefinitionVersion)
- func SetFinalizerIfEmpty(meta v1.Object, finalizer string)
- func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []informers.SharedInformerOption
- func StartController(ctx context.Context, cfg *config.Config, defaultNamespace string, ...) error
- func StartControllerManager(ctx context.Context, mgr manager.Manager) error
- type BatchingWorkQueue
- func (b *BatchingWorkQueue) AddToSubQueue(item interface{})
- func (b *BatchingWorkQueue) AddToSubQueueAfter(item interface{}, duration time.Duration)
- func (b *BatchingWorkQueue) AddToSubQueueRateLimited(item interface{})
- func (b *BatchingWorkQueue) ShutdownAll()
- func (b *BatchingWorkQueue) Start(ctx context.Context)
- type CompositeWorkQueue
- type Controller
- type GarbageCollector
- type Handler
- type Propeller
- type ResourceLevelMonitor
- type SimpleWorkQueue
- func (s *SimpleWorkQueue) AddToSubQueue(item interface{})
- func (s *SimpleWorkQueue) AddToSubQueueAfter(item interface{}, duration time.Duration)
- func (s *SimpleWorkQueue) AddToSubQueueRateLimited(item interface{})
- func (s *SimpleWorkQueue) ShutdownAll()
- func (s *SimpleWorkQueue) Start(ctx context.Context)
- type WorkerPool
Constants ¶
const FinalizerKey = "flyte-finalizer"
Variables ¶
This section is empty.
Functions ¶
func CalculateHoursToDelete ¶
CalculateHoursToDelete calculates a list of all the hours that should be deleted, given the current hour of the day and the retention period in hours. Usually this is every hour of the 24-hour day except the hours inside the retention period and the current hour of the day.
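The arithmetic described above can be sketched as follows. This is a minimal illustration, not the package's implementation: the helper name hoursToDelete is hypothetical, and it assumes the hours to keep are the current hour plus the retention-period hours immediately before it, wrapping around midnight.

```go
package main

import (
	"fmt"
	"strconv"
)

// hoursToDelete is a hypothetical stand-in, not the package's implementation.
// It returns every hour of the day (0-23) that is neither the current hour
// nor one of the retentionHours hours immediately before it.
func hoursToDelete(retentionHours, currentHour int) []string {
	keep := make(map[int]bool, retentionHours+1)
	for back := 0; back <= retentionHours; back++ {
		// Wrap around midnight with modular arithmetic.
		keep[((currentHour-back)%24+24)%24] = true
	}

	out := make([]string, 0, 24)
	for h := 0; h < 24; h++ {
		if !keep[h] {
			out = append(out, strconv.Itoa(h))
		}
	}
	return out
}

func main() {
	// Retention window that stays within the same day.
	fmt.Println(hoursToDelete(6, 10))
	// Retention window that wraps around midnight.
	fmt.Println(hoursToDelete(6, 2))
}
```

With a 6-hour retention period, both calls leave 24 - 6 - 1 = 17 hours eligible for deletion.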
func CalculateHoursToKeep ¶
CalculateHoursToKeep calculates a list of all the hours that should be kept given the current time and the retention period in hours
func CompletedWorkflowsLabelSelector ¶
func CompletedWorkflowsLabelSelector() *v1.LabelSelector
CompletedWorkflowsLabelSelector creates a new LabelSelector that selects all workflows that have the completed Label
func CompletedWorkflowsSelectorOutsideRetentionPeriod ¶
func CompletedWorkflowsSelectorOutsideRetentionPeriod(retentionPeriodHours int, currentTime time.Time) *v1.LabelSelector
CompletedWorkflowsSelectorOutsideRetentionPeriod creates a new selector that selects all completed workflows and workflows with completed hour label outside the retention window.
func CreateControllerManager ¶
func DeprecatedCompletedWorkflowsSelectorOutsideRetentionPeriod ¶
func DeprecatedCompletedWorkflowsSelectorOutsideRetentionPeriod(retentionPeriodHours int, currentTime time.Time) *v1.LabelSelector
DeprecatedCompletedWorkflowsSelectorOutsideRetentionPeriod Deprecated
func FinalizersIdentical ¶
Currently we only compare the lengths of finalizers. If you add finalizers directly, these APIs will not work.
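The caveat above is easy to miss, so here is a minimal sketch of a length-only comparison. The object type is a hypothetical stand-in for a Kubernetes object's finalizer accessor, and finalizersIdentical only mirrors the documented behavior; it is not the package's code.

```go
package main

import "fmt"

// object is a hypothetical stand-in for the finalizer accessor of a
// Kubernetes object.
type object struct {
	finalizers []string
}

func (o *object) GetFinalizers() []string { return o.finalizers }

// finalizersIdentical mirrors the documented behavior: only the number of
// finalizers is compared, never their names.
func finalizersIdentical(a, b *object) bool {
	return len(a.GetFinalizers()) == len(b.GetFinalizers())
}

func main() {
	a := &object{finalizers: []string{"flyte-finalizer"}}
	b := &object{finalizers: []string{"some-other-finalizer"}}
	c := &object{}

	// Same length, different names: still reported as identical.
	fmt.Println(finalizersIdentical(a, b))
	// Different lengths: reported as different.
	fmt.Println(finalizersIdentical(a, c))
}
```

This is why swapping one finalizer for another by hand goes unnoticed: the count does not change.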
func FormatTimeForLabel ¶
FormatTimeForLabel returns a string representation of the time with the following properties: 1. It's safe to put as a label in k8s objects' metadata 2. The granularity is up to the hour only. 3. The format is YYYY-MM-DD.HH 4. Is always in UTC.
func HasCompletedLabel ¶
func HasCompletedLabel(w *v1alpha1.FlyteWorkflow) bool
func IgnoreCompletedWorkflowsLabelSelector ¶
func IgnoreCompletedWorkflowsLabelSelector() *v1.LabelSelector
IgnoreCompletedWorkflowsLabelSelector creates a label selector that will ignore all objects (in this case, workflows) that do not have a label key=workflowTerminationStatusKey with a value=workflowTerminatedValue.
func IsDeleted ¶
IsDeleted checks whether the deletion timestamp is set. The timestamp is set automatically when an object is deleted.
func NewDedupingBucketRateLimiter ¶ added in v1.14.0
func NewDedupingBucketRateLimiter(limiter interfaces.Limiter) workqueue.RateLimiter
func NewLimiter ¶ added in v1.14.0
func NewLimiter(r rate.Limit, b int) interfaces.Limiter
func NewWorkQueue ¶
func NewWorkQueue(ctx context.Context, cfg config.WorkqueueConfig, name string) (workqueue.RateLimitingInterface, error)
func RecordSystemError ¶
func RecordSystemError(w *v1alpha1.FlyteWorkflow, err error) *v1alpha1.FlyteWorkflow
Helper method to record system error in the workflow.
func SetCompletedLabel ¶
func SetCompletedLabel(w *v1alpha1.FlyteWorkflow, currentTime time.Time)
func SetDefinitionVersionIfEmpty ¶
func SetDefinitionVersionIfEmpty(wf *v1alpha1.FlyteWorkflow, version v1alpha1.WorkflowDefinitionVersion)
func SetFinalizerIfEmpty ¶
Sets a new finalizer in case the finalizer is empty
func SharedInformerOptions ¶
func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []informers.SharedInformerOption
SharedInformerOptions creates informer options to work with FlytePropeller Sharding
func StartController ¶
func StartController(ctx context.Context, cfg *config.Config, defaultNamespace string, mgr manager.Manager, scope *promutils.Scope) error
StartController creates a new FlytePropeller Controller and starts it
func StartControllerManager ¶
StartControllerManager Start controller runtime manager to start listening to resource changes. K8sPluginManager uses controller runtime to create informers for the CRDs being monitored by plugins. The informer EventHandler enqueues the owner workflow for reevaluation. These informer events allow propeller to detect workflow changes faster than the default sync interval for workflow CRDs.
Types ¶
type BatchingWorkQueue ¶
type BatchingWorkQueue struct {
	// workqueue is a rate limited work queue. This is used to queue work to be
	// processed instead of performing it as soon as a change happens. This
	// means we can ensure we only process a fixed amount of resources at a
	// time, and makes it easy to ensure we are never processing the same item
	// simultaneously in two different workers.
	workqueue.RateLimitingInterface
	// contains filtered or unexported fields
}
A BatchingWorkQueue consists of 2 queues and migrates items from sub-queue to parent queue as a batch at a specified interval
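To make the two-queue idea concrete, here is a simplified model. Everything in it is hypothetical: the batchingQueue type, its string items, and its slice-backed queues stand in for the real rate-limited work queues, and deduplication and rate limiting are left out.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// batchingQueue is a hypothetical, simplified model of the two-queue idea.
type batchingQueue struct {
	mu      sync.Mutex
	sub     []string // items waiting for the next batch
	primary []string // items visible to workers
}

// addToSubQueue parks an item until the next batch migration.
func (q *batchingQueue) addToSubQueue(item string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.sub = append(q.sub, item)
}

// flush moves the whole sub-queue into the primary queue as one batch and
// reports how many items were moved.
func (q *batchingQueue) flush() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	moved := len(q.sub)
	q.primary = append(q.primary, q.sub...)
	q.sub = nil
	return moved
}

// start flushes the sub-queue at a fixed interval until ctx is cancelled.
func (q *batchingQueue) start(ctx context.Context, interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				q.flush()
			}
		}
	}()
}

func main() {
	q := &batchingQueue{}
	q.addToSubQueue("wf-a")
	q.addToSubQueue("wf-b")
	// Flush by hand here so the output does not depend on timing.
	fmt.Println(q.flush(), len(q.primary), len(q.sub))
}
```

Batching trades a little latency for fewer wake-ups: many sub-object updates arriving within one interval reach the workers together.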
func (*BatchingWorkQueue) AddToSubQueue ¶
func (b *BatchingWorkQueue) AddToSubQueue(item interface{})
func (*BatchingWorkQueue) AddToSubQueueAfter ¶
func (b *BatchingWorkQueue) AddToSubQueueAfter(item interface{}, duration time.Duration)
func (*BatchingWorkQueue) AddToSubQueueRateLimited ¶
func (b *BatchingWorkQueue) AddToSubQueueRateLimited(item interface{})
func (*BatchingWorkQueue) ShutdownAll ¶
func (b *BatchingWorkQueue) ShutdownAll()
func (*BatchingWorkQueue) Start ¶
func (b *BatchingWorkQueue) Start(ctx context.Context)
type CompositeWorkQueue ¶
type CompositeWorkQueue interface {
	workqueue.RateLimitingInterface
	// Specialized interface that should be called to start the migration of work from SubQueue to primaryQueue
	Start(ctx context.Context)
	// Shuts down all the queues that are in the context
	ShutdownAll()
	// Adds the item explicitly to the subqueue
	AddToSubQueue(item interface{})
	// Adds the item explicitly to the subqueue, using a rate limiter
	AddToSubQueueRateLimited(item interface{})
	// Adds the item explicitly to the subqueue after some duration
	AddToSubQueueAfter(item interface{}, duration time.Duration)
}
A CompositeWorkQueue can be used in cases where the work is enqueued by two sources. It can be enqueued by either
- Informer for the Primary Object itself. In case of FlytePropeller, this is the workflow object
- Informer or any other process that enqueues the top-level object for re-evaluation in response to one of the sub-objects being ready. In the case of FlytePropeller, these are the "Node/Task" updates, which re-enqueue the workflow to be re-evaluated
func NewCompositeWorkQueue ¶
func NewCompositeWorkQueue(ctx context.Context, cfg config.CompositeQueueConfig, scope promutils.Scope) (CompositeWorkQueue, error)
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
Controller is the controller implementation for FlyteWorkflow resources
func New ¶
func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Interface, flytepropellerClientset clientset.Interface, flyteworkflowInformerFactory informers.SharedInformerFactory, informerFactory k8sInformers.SharedInformerFactory, kubeClient executors.Client, scope promutils.Scope) (*Controller, error)
New returns a new FlyteWorkflow controller
type GarbageCollector ¶
type GarbageCollector struct {
// contains filtered or unexported fields
}
GarbageCollector is an active background cleanup service, that deletes all workflows that are completed and older than the configured TTL
func NewGarbageCollector ¶
func NewGarbageCollector(cfg *config.Config, scope promutils.Scope, clk clock.WithTicker, namespaceClient corev1.NamespaceInterface, wfClient v1alpha1.FlyteworkflowV1alpha1Interface) (*GarbageCollector, error)
type Propeller ¶
type Propeller struct {
// contains filtered or unexported fields
}
Core Propeller structure that houses the Reconciliation loop for Flytepropeller
func NewPropellerHandler ¶
func NewPropellerHandler(_ context.Context, cfg *config.Config, store *storage.DataStore, wfStore workflowstore.FlyteWorkflow, executor executors.Workflow, scope promutils.Scope) *Propeller
NewPropellerHandler creates a new Propeller and initializes metrics
func (*Propeller) Handle ¶
Handle method is the entry point for the reconciler. It compares the actual state with the desired state and attempts to converge the two. It then updates the GetExecutionStatus block of the FlyteWorkflow resource with the current status of the resource.
The workflow to be worked on is identified by the given namespace and executionID (which is the name of the workflow). The return value should be an error in the case where we wish to retry this workflow.
Every FlyteWorkflow transitions through the following states:
    +--------+        +---------+        +------------+     +---------+
    |        |        |         |        |            |     |         |
    | Ready  +--------> Running +--------> Succeeding +-----> Success |
    |        |        |         |        |            |     |         |
    +--------+        +---------+        +------------+     +---------+
        |                  |
        |                  |
        |             +----v----+        +---------------------+        +--------+
        |             |         |        |     (optional)      |        |        |
        +-------------> Failing +--------> HandlingFailureNode +--------> Failed |
                      |         |        |                     |        |        |
                      +---------+        +---------------------+        +--------+
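The diagram above can be read as a small transition table. The sketch below is illustrative only: the phase type and its constants are hypothetical names taken from the box labels, not the v1alpha1 types, and it assumes that the "(optional)" marker means Failing may also move straight to Failed.

```go
package main

import "fmt"

// phase is a hypothetical type named after the boxes in the diagram.
type phase string

const (
	ready               phase = "Ready"
	running             phase = "Running"
	succeeding          phase = "Succeeding"
	success             phase = "Success"
	failing             phase = "Failing"
	handlingFailureNode phase = "HandlingFailureNode"
	failed              phase = "Failed"
)

// next lists the arrows of the diagram. Failing -> Failed is an assumption
// drawn from HandlingFailureNode being marked optional.
var next = map[phase][]phase{
	ready:               {running, failing},
	running:             {succeeding, failing},
	succeeding:          {success},
	failing:             {handlingFailureNode, failed},
	handlingFailureNode: {failed},
}

// canTransition reports whether the diagram has an arrow from one phase to another.
func canTransition(from, to phase) bool {
	for _, candidate := range next[from] {
		if candidate == to {
			return true
		}
	}
	return false
}

// isTerminal reports whether a phase has no outgoing arrows.
func isTerminal(p phase) bool {
	return len(next[p]) == 0
}

func main() {
	fmt.Println(canTransition(running, succeeding))
	fmt.Println(canTransition(success, running))
	fmt.Println(isTerminal(failed))
}
```

A table like this makes it cheap to assert in tests that a reconciliation round never moves a workflow backwards.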
func (*Propeller) Initialize ¶
Initialize initializes all downstream executors
func (*Propeller) TryMutateWorkflow ¶
func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error)
TryMutateWorkflow will try to mutate the workflow by traversing it and reconciling the desired and actual state. The desired state here is that the entire workflow is completed; the actual state is each node's current execution state.
type ResourceLevelMonitor ¶
type ResourceLevelMonitor struct {
	Scope promutils.Scope
	// Meta timer - this times each collection cycle to measure how long it takes to collect the levels GaugeVec below
	CollectorTimer promutils.StopWatch
	// contains filtered or unexported fields
}
ResourceLevelMonitor is responsible for emitting metrics that show the current number of Flyte workflows, by project and domain. It needs to be kicked off. The periodicity is not currently configurable because it seems unnecessary. It will also emit a timer measuring how long it takes to run each measurement cycle.
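The core of such a collection cycle is a group-by over the listed workflows. The following sketch shows only that counting step, with hypothetical workflow and levelKey types; listing workflows from the informer cache and publishing the counts as gauges are left out.

```go
package main

import "fmt"

// workflow is a hypothetical stand-in carrying only the fields needed here.
type workflow struct {
	project string
	domain  string
}

// levelKey identifies one gauge series: a (project, domain) pair.
type levelKey struct {
	project string
	domain  string
}

// countByProjectDomain tallies how many workflows exist per project and domain.
func countByProjectDomain(workflows []workflow) map[levelKey]int {
	counts := make(map[levelKey]int)
	for _, wf := range workflows {
		counts[levelKey{project: wf.project, domain: wf.domain}]++
	}
	return counts
}

func main() {
	counts := countByProjectDomain([]workflow{
		{project: "flytesnacks", domain: "development"},
		{project: "flytesnacks", domain: "development"},
		{project: "flytesnacks", domain: "production"},
	})
	fmt.Println(counts[levelKey{project: "flytesnacks", domain: "development"}])
	fmt.Println(counts[levelKey{project: "flytesnacks", domain: "production"}])
}
```

Using a struct as the map key avoids building and parsing composite string keys.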
func NewResourceLevelMonitor ¶
func NewResourceLevelMonitor(scope promutils.Scope, lister lister.FlyteWorkflowLister) *ResourceLevelMonitor
func (*ResourceLevelMonitor) RunCollector ¶
func (r *ResourceLevelMonitor) RunCollector(ctx context.Context)
type SimpleWorkQueue ¶
type SimpleWorkQueue struct {
	// workqueue is a rate limited work queue. This is used to queue work to be
	// processed instead of performing it as soon as a change happens. This
	// means we can ensure we only process a fixed amount of resources at a
	// time, and makes it easy to ensure we are never processing the same item
	// simultaneously in two different workers.
	workqueue.RateLimitingInterface
}
SimpleWorkQueue provides a simple RateLimitingInterface, but ensures that the compositeQueue interface works with a default queue.
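One way to satisfy a composite-queue interface with a single queue is to forward every sub-queue call to the embedded queue. The sketch below assumes that approach; the queue interface, sliceQueue, and simpleQueue are hypothetical, string-based stand-ins for the real workqueue types.

```go
package main

import "fmt"

// queue is a hypothetical, minimal stand-in for a work queue.
type queue interface {
	Add(item string)
	Len() int
}

// sliceQueue is a trivial queue backed by a slice.
type sliceQueue struct {
	items []string
}

func (q *sliceQueue) Add(item string) { q.items = append(q.items, item) }
func (q *sliceQueue) Len() int        { return len(q.items) }

// simpleQueue embeds a single queue and treats it as both primary queue and
// sub-queue.
type simpleQueue struct {
	queue
}

// AddToSubQueue forwards straight to the embedded queue, since there is no
// separate sub-queue to hold the item.
func (s *simpleQueue) AddToSubQueue(item string) { s.Add(item) }

// Start is a no-op: with one queue there is nothing to migrate.
func (s *simpleQueue) Start() {}

func main() {
	q := &simpleQueue{queue: &sliceQueue{}}
	q.Start()
	q.Add("wf-a")           // enqueued by the primary informer
	q.AddToSubQueue("wf-b") // enqueued in response to a sub-object update
	fmt.Println(q.Len())
}
```

Callers written against the composite interface then work unchanged whether or not batching is enabled.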
func (*SimpleWorkQueue) AddToSubQueue ¶
func (s *SimpleWorkQueue) AddToSubQueue(item interface{})
func (*SimpleWorkQueue) AddToSubQueueAfter ¶
func (s *SimpleWorkQueue) AddToSubQueueAfter(item interface{}, duration time.Duration)
func (*SimpleWorkQueue) AddToSubQueueRateLimited ¶
func (s *SimpleWorkQueue) AddToSubQueueRateLimited(item interface{})
func (*SimpleWorkQueue) ShutdownAll ¶
func (s *SimpleWorkQueue) ShutdownAll()
func (*SimpleWorkQueue) Start ¶
func (s *SimpleWorkQueue) Start(ctx context.Context)
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
func NewWorkerPool ¶
func NewWorkerPool(ctx context.Context, scope promutils.Scope, workQueue CompositeWorkQueue, handler Handler) *WorkerPool
func (*WorkerPool) Initialize ¶
func (w *WorkerPool) Initialize(ctx context.Context) error
func (*WorkerPool) Run ¶
func (w *WorkerPool) Run(ctx context.Context, threadiness int, synced ...cache.InformerSynced) error
Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers. It will block until ctx is cancelled, at which point it will shut down the workqueue and wait for workers to finish processing their current work items.
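The start, block, and drain pattern described above can be sketched with a sync.WaitGroup. This is a simplified model under stated assumptions: shutdown is signalled by cancelling the context, a buffered channel stands in for the work queue, and the run and demo helpers are hypothetical. Informer cache syncing is omitted.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// run starts threadiness workers that take items from queue, blocks until ctx
// is cancelled, then waits for every worker to finish its current item.
func run(ctx context.Context, threadiness int, queue <-chan string, handle func(string)) {
	var wg sync.WaitGroup
	for i := 0; i < threadiness; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case item := <-queue:
					handle(item)
				}
			}
		}()
	}

	<-ctx.Done() // block until shutdown is requested
	wg.Wait()    // let in-flight items complete
}

// demo enqueues three items and cancels the context once all are handled.
func demo() int64 {
	ctx, cancel := context.WithCancel(context.Background())
	queue := make(chan string, 3)
	for _, name := range []string{"wf-a", "wf-b", "wf-c"} {
		queue <- name
	}

	var handled int64
	run(ctx, 2, queue, func(string) {
		if atomic.AddInt64(&handled, 1) == 3 {
			cancel()
		}
	})
	return atomic.LoadInt64(&handled)
}

func main() {
	fmt.Println(demo())
}
```

Waiting on the WaitGroup after cancellation is what guarantees that no worker is interrupted in the middle of handling an item.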
Directories ¶
Path | Synopsis |
---|---|
config | Package config contains the core configuration for FlytePropeller. |
nodes | Package nodes contains the Core Nodes Executor implementation and a subpackage for every node kind. This module implements the core Nodes executor. |