Documentation ¶
Overview ¶
Package controller contains the K8s controller logic. This does not contain the actual workflow re-conciliation. It is then entrypoint into the K8s based Flyte controller.
Index ¶
- Constants
- func CalculateHoursToDelete(retentionPeriodHours, currentHourOfDay int) []string
- func CompletedWorkflowsLabelSelector() *v1.LabelSelector
- func CompletedWorkflowsSelectorOutsideRetentionPeriod(retentionPeriodHours int, currentTime time.Time) *v1.LabelSelector
- func FinalizersIdentical(o1 v1.Object, o2 v1.Object) bool
- func HasCompletedLabel(w *v1alpha1.FlyteWorkflow) bool
- func HasFinalizer(meta v1.Object) bool
- func IgnoreCompletedWorkflowsLabelSelector() *v1.LabelSelector
- func IsDeleted(meta v1.Object) bool
- func NewWorkQueue(ctx context.Context, cfg config.WorkqueueConfig, name string) (workqueue.RateLimitingInterface, error)
- func RecordSystemError(w *v1alpha1.FlyteWorkflow, err error) *v1alpha1.FlyteWorkflow
- func ResetFinalizers(meta v1.Object)
- func SetCompletedLabel(w *v1alpha1.FlyteWorkflow, currentTime time.Time)
- func SetFinalizerIfEmpty(meta v1.Object, finalizer string)
- func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []informers.SharedInformerOption
- func StartController(ctx context.Context, cfg *config.Config, defaultNamespace string) error
- type BatchingWorkQueue
- func (b *BatchingWorkQueue) AddToSubQueue(item interface{})
- func (b *BatchingWorkQueue) AddToSubQueueAfter(item interface{}, duration time.Duration)
- func (b *BatchingWorkQueue) AddToSubQueueRateLimited(item interface{})
- func (b *BatchingWorkQueue) ShutdownAll()
- func (b *BatchingWorkQueue) Start(ctx context.Context)
- type CompositeWorkQueue
- type Controller
- type GarbageCollector
- type Handler
- type Propeller
- type ResourceLevelMonitor
- type SimpleWorkQueue
- func (s *SimpleWorkQueue) AddToSubQueue(item interface{})
- func (s *SimpleWorkQueue) AddToSubQueueAfter(item interface{}, duration time.Duration)
- func (s *SimpleWorkQueue) AddToSubQueueRateLimited(item interface{})
- func (s *SimpleWorkQueue) ShutdownAll()
- func (s *SimpleWorkQueue) Start(ctx context.Context)
- type WorkerPool
Constants ¶
const FinalizerKey = "flyte-finalizer"
Variables ¶
This section is empty.
Functions ¶
func CalculateHoursToDelete ¶
Calculates a list of all the hours that should be deleted given the current hour of the day and the retentionperiod in hours Usually this is a list of all hours out of the 24 hours in the day - retention period - the current hour of the day
func CompletedWorkflowsLabelSelector ¶
func CompletedWorkflowsLabelSelector() *v1.LabelSelector
Creates a new LabelSelector that selects all workflows that have the completed Label
func CompletedWorkflowsSelectorOutsideRetentionPeriod ¶
func CompletedWorkflowsSelectorOutsideRetentionPeriod(retentionPeriodHours int, currentTime time.Time) *v1.LabelSelector
Creates a new selector that selects all completed workflows and workflows with completed hour label outside of the retention window
func FinalizersIdentical ¶
Currently we only compare the lengths of finalizers. If you add finalizers directly these API;'s will not work
func HasCompletedLabel ¶
func HasCompletedLabel(w *v1alpha1.FlyteWorkflow) bool
func IgnoreCompletedWorkflowsLabelSelector ¶
func IgnoreCompletedWorkflowsLabelSelector() *v1.LabelSelector
This function creates a label selector, that will ignore all objects (in this case workflow) that DOES NOT have a label key=workflowTerminationStatusKey with a value=workflowTerminatedValue
func IsDeleted ¶
Check if the deletion timestamp is set, this is set automatically when an object is deleted
func NewWorkQueue ¶
func NewWorkQueue(ctx context.Context, cfg config.WorkqueueConfig, name string) (workqueue.RateLimitingInterface, error)
func RecordSystemError ¶ added in v0.7.0
func RecordSystemError(w *v1alpha1.FlyteWorkflow, err error) *v1alpha1.FlyteWorkflow
Helper method to record system error in the workflow.
func SetCompletedLabel ¶
func SetCompletedLabel(w *v1alpha1.FlyteWorkflow, currentTime time.Time)
func SetFinalizerIfEmpty ¶
Sets a new finalizer in case the finalizer is empty
func SharedInformerOptions ¶ added in v0.16.24
func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []informers.SharedInformerOption
SharedInformerOptions creates informer options to work with FlytePropeller Sharding
Types ¶
type BatchingWorkQueue ¶
type BatchingWorkQueue struct { // workqueue is a rate limited work queue. This is used to queue work to be // processed instead of performing it as soon as a change happens. This // means we can ensure we only process a fixed amount of resources at a // time, and makes it easy to ensure we are never processing the same item // simultaneously in two different workers. workqueue.RateLimitingInterface // contains filtered or unexported fields }
A BatchingWorkQueue consists of 2 queues and migrates items from sub-queue to parent queue as a batch at a specified interval
func (*BatchingWorkQueue) AddToSubQueue ¶
func (b *BatchingWorkQueue) AddToSubQueue(item interface{})
func (*BatchingWorkQueue) AddToSubQueueAfter ¶
func (b *BatchingWorkQueue) AddToSubQueueAfter(item interface{}, duration time.Duration)
func (*BatchingWorkQueue) AddToSubQueueRateLimited ¶
func (b *BatchingWorkQueue) AddToSubQueueRateLimited(item interface{})
func (*BatchingWorkQueue) ShutdownAll ¶
func (b *BatchingWorkQueue) ShutdownAll()
func (*BatchingWorkQueue) Start ¶
func (b *BatchingWorkQueue) Start(ctx context.Context)
type CompositeWorkQueue ¶
type CompositeWorkQueue interface { workqueue.RateLimitingInterface // Specialized interface that should be called to start the migration of work from SubQueue to primaryQueue Start(ctx context.Context) // Shutsdown all the queues that are in the context ShutdownAll() // Adds the item explicitly to the subqueue AddToSubQueue(item interface{}) // Adds the item explicitly to the subqueue, using a rate limiter AddToSubQueueRateLimited(item interface{}) // Adds the item explicitly to the subqueue after some duration AddToSubQueueAfter(item interface{}, duration time.Duration) }
A CompositeWorkQueue can be used in cases where the work is enqueued by two sources. It can be enqueued by either
- Informer for the Primary Object itself. In case of FlytePropeller, this is the workflow object
- Informer or any other process that enqueues the top-level object for re-evaluation in response to one of the sub-objects being ready. In the case of FlytePropeller this is the "Node/Task" updates, will re-enqueue the workflow to be re-evaluated
func NewCompositeWorkQueue ¶
func NewCompositeWorkQueue(ctx context.Context, cfg config.CompositeQueueConfig, scope promutils.Scope) (CompositeWorkQueue, error)
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
Controller is the controller implementation for FlyteWorkflow resources
func New ¶
func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Interface, flytepropellerClientset clientset.Interface, flyteworkflowInformerFactory informers.SharedInformerFactory, kubeClient executors.Client, scope promutils.Scope) (*Controller, error)
New returns a new FlyteWorkflow controller
type GarbageCollector ¶
type GarbageCollector struct {
// contains filtered or unexported fields
}
Garbage collector is an active background cleanup service, that deletes all workflows that are completed and older than the configured TTL
func NewGarbageCollector ¶
func NewGarbageCollector(cfg *config.Config, scope promutils.Scope, clk clock.Clock, namespaceClient corev1.NamespaceInterface, wfClient v1alpha1.FlyteworkflowV1alpha1Interface) (*GarbageCollector, error)
type Propeller ¶
type Propeller struct {
// contains filtered or unexported fields
}
Core Propeller structure that houses the Reconciliation loop for Flytepropeller
func NewPropellerHandler ¶
func NewPropellerHandler(_ context.Context, cfg *config.Config, wfStore workflowstore.FlyteWorkflow, executor executors.Workflow, scope promutils.Scope) *Propeller
NewPropellerHandler creates a new Propeller and initializes metrics
func (*Propeller) Handle ¶
Handle method is the entry point for the reconciler. It compares the actual state with the desired, and attempts to converge the two. It then updates the GetExecutionStatus block of the FlyteWorkflow resource with the current status of the resource. Every FlyteWorkflow transitions through the following
The Workflow to be worked on is identified for the given namespace and executionID (which is the name of the workflow) The return value should be an error, in the case, we wish to retry this workflow <pre>
+--------+ +---------+ +------------+ +---------+ | | | | | | | | | Ready +--------> Running +--------> Succeeding +-----> Success | | | | | | | | | +--------+ +---------+ +------------+ +---------+ | | | | | +----v----+ +---------------------+ +--------+ | | | | (optional) | | | +-------------> Failing +--------> HandlingFailureNode +--------> Failed | | | | | | | +---------+ +---------------------+ +--------+
</pre>
func (*Propeller) Initialize ¶
Initializes all downstream executors
func (*Propeller) TryMutateWorkflow ¶ added in v0.7.0
func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error)
TryMutateWorkflow will try to mutate the workflow by traversing it and reconciling the desired and actual state. The desired state here is the entire workflow is completed, actual state is each nodes current execution state.
type ResourceLevelMonitor ¶ added in v0.7.0
type ResourceLevelMonitor struct { Scope promutils.Scope // Meta timer - this times each collection cycle to measure how long it takes to collect the levels GaugeVec below CollectorTimer promutils.StopWatch // contains filtered or unexported fields }
ResourceLevelMonitor is responsible for emitting metrics that show the current number of Flyte workflows, by project and domain. It needs to be kicked off. The periodicity is not currently configurable because it seems unnecessary. It will also a timer measuring how long it takes to run each measurement cycle.
func NewResourceLevelMonitor ¶ added in v0.7.0
func NewResourceLevelMonitor(scope promutils.Scope, lister lister.FlyteWorkflowLister) *ResourceLevelMonitor
func (*ResourceLevelMonitor) RunCollector ¶ added in v0.7.0
func (r *ResourceLevelMonitor) RunCollector(ctx context.Context)
type SimpleWorkQueue ¶
type SimpleWorkQueue struct { // workqueue is a rate limited work queue. This is used to queue work to be // processed instead of performing it as soon as a change happens. This // means we can ensure we only process a fixed amount of resources at a // time, and makes it easy to ensure we are never processing the same item // simultaneously in two different workers. workqueue.RateLimitingInterface }
SimpleWorkQueue provides a simple RateLimitingInterface, but ensures that the compositeQueue interface works with a default queue.
func (*SimpleWorkQueue) AddToSubQueue ¶
func (s *SimpleWorkQueue) AddToSubQueue(item interface{})
func (*SimpleWorkQueue) AddToSubQueueAfter ¶
func (s *SimpleWorkQueue) AddToSubQueueAfter(item interface{}, duration time.Duration)
func (*SimpleWorkQueue) AddToSubQueueRateLimited ¶
func (s *SimpleWorkQueue) AddToSubQueueRateLimited(item interface{})
func (*SimpleWorkQueue) ShutdownAll ¶
func (s *SimpleWorkQueue) ShutdownAll()
func (*SimpleWorkQueue) Start ¶
func (s *SimpleWorkQueue) Start(ctx context.Context)
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
func NewWorkerPool ¶
func NewWorkerPool(ctx context.Context, scope promutils.Scope, workQueue CompositeWorkQueue, handler Handler) *WorkerPool
func (*WorkerPool) Initialize ¶
func (w *WorkerPool) Initialize(ctx context.Context) error
func (*WorkerPool) Run ¶
func (w *WorkerPool) Run(ctx context.Context, threadiness int, synced ...cache.InformerSynced) error
Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers. It will block until stopCh is closed, at which point it will shutdown the workqueue and wait for workers to finish processing their current work items.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package config contains the core configuration for FlytePropeller.
|
Package config contains the core configuration for FlytePropeller. |
Package nodes contains the Core Nodes Executor implementation and a subpackage for every node kind This module implements the core Nodes executor.
|
Package nodes contains the Core Nodes Executor implementation and a subpackage for every node kind This module implements the core Nodes executor. |