Documentation ¶
Index ¶
- Constants
- func CalculateHoursToDelete(retentionPeriodHours, currentHourOfDay int) []string
- func CompletedWorkflowsLabelSelector() *v1.LabelSelector
- func CompletedWorkflowsSelectorOutsideRetentionPeriod(retentionPeriodHours int, currentTime time.Time) *v1.LabelSelector
- func FinalizersIdentical(o1 v1.Object, o2 v1.Object) bool
- func HasCompletedLabel(w *v1alpha1.FlyteWorkflow) bool
- func HasFinalizer(meta v1.Object) bool
- func IgnoreCompletedWorkflowsLabelSelector() *v1.LabelSelector
- func IsDeleted(meta v1.Object) bool
- func NewWorkQueue(ctx context.Context, cfg config.WorkqueueConfig, name string) (workqueue.RateLimitingInterface, error)
- func ResetFinalizers(meta v1.Object)
- func SetCompletedLabel(w *v1alpha1.FlyteWorkflow, currentTime time.Time)
- func SetFinalizerIfEmpty(meta v1.Object, finalizer string)
- type BatchingWorkQueue
- func (b *BatchingWorkQueue) AddToSubQueue(item interface{})
- func (b *BatchingWorkQueue) AddToSubQueueAfter(item interface{}, duration time.Duration)
- func (b *BatchingWorkQueue) AddToSubQueueRateLimited(item interface{})
- func (b *BatchingWorkQueue) ShutdownAll()
- func (b *BatchingWorkQueue) Start(ctx context.Context)
- type CompositeWorkQueue
- type Controller
- type GarbageCollector
- type Handler
- type Propeller
- type SimpleWorkQueue
- func (s *SimpleWorkQueue) AddToSubQueue(item interface{})
- func (s *SimpleWorkQueue) AddToSubQueueAfter(item interface{}, duration time.Duration)
- func (s *SimpleWorkQueue) AddToSubQueueRateLimited(item interface{})
- func (s *SimpleWorkQueue) ShutdownAll()
- func (s *SimpleWorkQueue) Start(ctx context.Context)
- type WorkerPool
Constants ¶
const FinalizerKey = "flyte-finalizer"
const (
ResourceVersion contextutils.Key = "rv_ver"
)
Variables ¶
This section is empty.
Functions ¶
func CalculateHoursToDelete ¶
Calculates the list of hours that should be deleted, given the current hour of the day and the retention period in hours. Usually this is every hour of the 24-hour day except the current hour and the hours that fall within the retention period before it.
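The rule described above can be sketched as follows. This is a minimal illustration, not the package's implementation: `hoursToDelete` is a hypothetical helper, and treating the retention window as the current hour plus the `retentionPeriodHours` hours before it (wrapping around midnight) is an assumption drawn from the description.

```go
package main

import (
	"fmt"
	"strconv"
)

// hoursToDelete is a hypothetical stand-in, not the package's function.
// It returns every hour of the day (0-23) that is neither the current hour
// nor one of the retentionPeriodHours hours immediately before it.
func hoursToDelete(retentionPeriodHours, currentHourOfDay int) []string {
	// Mark the hours that must be kept, wrapping around midnight.
	keep := make(map[int]bool)
	for back := 0; back <= retentionPeriodHours && back < 24; back++ {
		keep[((currentHourOfDay-back)%24+24)%24] = true
	}

	// Everything that is not kept is eligible for deletion.
	hours := make([]string, 0, 24)
	for h := 0; h < 24; h++ {
		if !keep[h] {
			hours = append(hours, strconv.Itoa(h))
		}
	}
	return hours
}

func main() {
	// Window does not cross midnight: hours 4 through 10 are kept.
	fmt.Println(hoursToDelete(6, 10))
	// Window crosses midnight: hours 20 through 23 and 0 through 2 are kept.
	fmt.Println(hoursToDelete(6, 2))
}
```

The hours are returned as strings because, under this reading, they are meant to be compared against label values.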
func CompletedWorkflowsLabelSelector ¶
func CompletedWorkflowsLabelSelector() *v1.LabelSelector
Creates a new LabelSelector that selects all workflows that have the completed label
func CompletedWorkflowsSelectorOutsideRetentionPeriod ¶
func CompletedWorkflowsSelectorOutsideRetentionPeriod(retentionPeriodHours int, currentTime time.Time) *v1.LabelSelector
Creates a new selector that selects all completed workflows and workflows with completed hour label outside of the retention window
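One way to picture what such a selector matches is sketched below with plain label maps instead of a real LabelSelector. The label keys and values (`termination-status`, `terminated`, `completed-time`) are placeholders invented for this sketch; the package's actual keys are not shown on this page and may differ.

```go
package main

import "fmt"

// Placeholder label keys and values, assumed for illustration only.
const (
	completedKey   = "termination-status"
	completedValue = "terminated"
	hourKey        = "completed-time"
)

// outsideRetention reports whether labels describe a completed workflow
// whose completion-hour label is one of hoursToDelete.
func outsideRetention(labels map[string]string, hoursToDelete []string) bool {
	// The workflow must carry the completed label.
	if labels[completedKey] != completedValue {
		return false
	}
	// The workflow must also carry an hour label that is up for deletion.
	hour, ok := labels[hourKey]
	if !ok {
		return false
	}
	for _, h := range hoursToDelete {
		if h == hour {
			return true
		}
	}
	return false
}

func main() {
	hoursToDelete := []string{"0", "1", "2", "3"}

	old := map[string]string{completedKey: completedValue, hourKey: "2"}
	recent := map[string]string{completedKey: completedValue, hourKey: "9"}
	running := map[string]string{}

	fmt.Println(outsideRetention(old, hoursToDelete))
	fmt.Println(outsideRetention(recent, hoursToDelete))
	fmt.Println(outsideRetention(running, hoursToDelete))
}
```

Both conditions must hold: a workflow that is still running is never selected, whatever its other labels say.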
func FinalizersIdentical ¶
Currently we only compare the lengths of the finalizer lists. If you add finalizers directly, these APIs will not work
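The caveat is easier to see with a small example. The sketch below uses a hypothetical helper, `sameFinalizerCount`, over plain string slices; it mirrors the documented length-only comparison and is not the package's code.

```go
package main

import "fmt"

// sameFinalizerCount is a hypothetical helper that mirrors the documented
// behavior: it compares only the number of finalizers, not their values.
func sameFinalizerCount(a, b []string) bool {
	return len(a) == len(b)
}

func main() {
	managed := []string{"flyte-finalizer"}
	foreign := []string{"example.com/other-finalizer"}

	// Same length, different contents: still reported as identical.
	fmt.Println(sameFinalizerCount(managed, foreign))
	// Different length: reported as different.
	fmt.Println(sameFinalizerCount(managed, nil))
}
```

A length-only check is cheap, but it cannot detect a finalizer that was swapped for another one, which is why finalizers added outside these helpers can confuse it.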
func HasCompletedLabel ¶
func HasCompletedLabel(w *v1alpha1.FlyteWorkflow) bool
func IgnoreCompletedWorkflowsLabelSelector ¶
func IgnoreCompletedWorkflowsLabelSelector() *v1.LabelSelector
This function creates a label selector that will ignore all objects (in this case, workflows) that do not have a label with key=workflowTerminationStatusKey and value=workflowTerminatedValue
func IsDeleted ¶
Checks whether the deletion timestamp is set. It is set automatically when an object is deleted
func NewWorkQueue ¶
func NewWorkQueue(ctx context.Context, cfg config.WorkqueueConfig, name string) (workqueue.RateLimitingInterface, error)
func SetCompletedLabel ¶
func SetCompletedLabel(w *v1alpha1.FlyteWorkflow, currentTime time.Time)
func SetFinalizerIfEmpty ¶
Sets a new finalizer in case the finalizer is empty
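A minimal sketch of the set-if-empty behavior, using a hypothetical `object` struct in place of a real Kubernetes object. The only assumption is that "empty" means the object currently has no finalizers at all.

```go
package main

import "fmt"

// object is a hypothetical stand-in for the finalizer accessors of a
// Kubernetes object.
type object struct {
	finalizers []string
}

// setFinalizerIfEmpty adds finalizer only when the object has no finalizers.
func setFinalizerIfEmpty(o *object, finalizer string) {
	if len(o.finalizers) == 0 {
		o.finalizers = []string{finalizer}
	}
}

func main() {
	// No finalizers yet: the new one is set.
	fresh := &object{}
	setFinalizerIfEmpty(fresh, "flyte-finalizer")
	fmt.Println(fresh.finalizers)

	// A finalizer already exists: the list is left untouched.
	existing := &object{finalizers: []string{"example.com/other"}}
	setFinalizerIfEmpty(existing, "flyte-finalizer")
	fmt.Println(existing.finalizers)
}
```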
Types ¶
type BatchingWorkQueue ¶
type BatchingWorkQueue struct {
	// workqueue is a rate limited work queue. This is used to queue work to be
	// processed instead of performing it as soon as a change happens. This
	// means we can ensure we only process a fixed amount of resources at a
	// time, and makes it easy to ensure we are never processing the same item
	// simultaneously in two different workers.
	workqueue.RateLimitingInterface
	// contains filtered or unexported fields
}
A BatchingWorkQueue consists of 2 queues and migrates items from sub-queue to parent queue as a batch at a specified interval
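The two-queue idea can be modelled in a few lines. The sketch below is a simplified, hypothetical model (`batchingQueue` and its methods are not the package's types): items accumulate in a sub-queue and are moved to the primary queue in one batch. Deduplicating items inside a batch is an assumption of this sketch, and a real queue would call `migrate` from a ticker loop at the configured interval rather than by hand.

```go
package main

import (
	"fmt"
	"sync"
)

// batchingQueue is a hypothetical, simplified model of a sub-queue that is
// periodically drained into a primary queue.
type batchingQueue struct {
	mu      sync.Mutex
	seen    map[string]bool // items already waiting in the sub-queue
	sub     []string        // pending items, in arrival order
	primary []string        // items ready for workers
}

func newBatchingQueue() *batchingQueue {
	return &batchingQueue{seen: make(map[string]bool)}
}

// addToSubQueue records item once per batch (deduplication is assumed).
func (q *batchingQueue) addToSubQueue(item string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !q.seen[item] {
		q.seen[item] = true
		q.sub = append(q.sub, item)
	}
}

// migrate moves everything collected so far to the primary queue as one
// batch and returns how many items were moved.
func (q *batchingQueue) migrate() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	n := len(q.sub)
	q.primary = append(q.primary, q.sub...)
	q.sub = nil
	q.seen = make(map[string]bool)
	return n
}

func main() {
	q := newBatchingQueue()
	q.addToSubQueue("ns/wf-a")
	q.addToSubQueue("ns/wf-b")
	q.addToSubQueue("ns/wf-a") // repeated update within the same batch

	fmt.Println(q.migrate(), q.primary)
}
```

Batching in this way trades a little latency for fewer re-evaluations when many sub-object updates arrive for the same workflow in a short time.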
func (*BatchingWorkQueue) AddToSubQueue ¶
func (b *BatchingWorkQueue) AddToSubQueue(item interface{})
func (*BatchingWorkQueue) AddToSubQueueAfter ¶
func (b *BatchingWorkQueue) AddToSubQueueAfter(item interface{}, duration time.Duration)
func (*BatchingWorkQueue) AddToSubQueueRateLimited ¶
func (b *BatchingWorkQueue) AddToSubQueueRateLimited(item interface{})
func (*BatchingWorkQueue) ShutdownAll ¶
func (b *BatchingWorkQueue) ShutdownAll()
func (*BatchingWorkQueue) Start ¶
func (b *BatchingWorkQueue) Start(ctx context.Context)
type CompositeWorkQueue ¶
type CompositeWorkQueue interface {
	workqueue.RateLimitingInterface
	// Specialized interface that should be called to start the migration of work from SubQueue to primaryQueue
	Start(ctx context.Context)
	// Shuts down all the queues that are in the context
	ShutdownAll()
	// Adds the item explicitly to the subqueue
	AddToSubQueue(item interface{})
	// Adds the item explicitly to the subqueue, using a rate limiter
	AddToSubQueueRateLimited(item interface{})
	// Adds the item explicitly to the subqueue after some duration
	AddToSubQueueAfter(item interface{}, duration time.Duration)
}
A CompositeWorkQueue can be used in cases where the work is enqueued by two sources. It can be enqueued by either:
- The informer for the primary object itself. In the case of FlytePropeller, this is the workflow object.
- An informer or any other process that enqueues the top-level object for re-evaluation in response to one of the sub-objects being ready. In the case of FlytePropeller, these are the "Node/Task" updates, which re-enqueue the workflow to be re-evaluated.
func NewCompositeWorkQueue ¶
func NewCompositeWorkQueue(ctx context.Context, cfg config.CompositeQueueConfig, scope promutils.Scope) (CompositeWorkQueue, error)
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
Controller is the controller implementation for FlyteWorkflow resources
func New ¶
func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Interface, flytepropellerClientset clientset.Interface, flyteworkflowInformerFactory informers.SharedInformerFactory, kubeClient executors.Client, scope promutils.Scope) (*Controller, error)
New returns a new FlyteWorkflow controller
type GarbageCollector ¶
type GarbageCollector struct {
// contains filtered or unexported fields
}
GarbageCollector is an active background cleanup service that deletes all workflows that are completed and older than the configured TTL
func NewGarbageCollector ¶
func NewGarbageCollector(cfg *config.Config, scope promutils.Scope, clk clock.Clock, namespaceClient corev1.NamespaceInterface, wfClient v1alpha1.FlyteworkflowV1alpha1Interface, namespace string) (*GarbageCollector, error)
type Propeller ¶
type Propeller struct {
// contains filtered or unexported fields
}
func NewPropellerHandler ¶
func (*Propeller) Handle ¶
The reconciler compares the actual state with the desired state and attempts to converge the two. It then updates the GetExecutionStatus block of the FlyteWorkflow resource with the current status of the resource.
The workflow to be worked on is identified by the given namespace and executionID (which is the name of the workflow). The return value should be an error when we wish to retry this workflow.
Every FlyteWorkflow transitions through the following states:
<pre>
    +--------+        +--------+        +--------+     +--------+
    |        |        |        |        |        |     |        |
    | Ready  +--------> Running+--------> Succeeding---> Success|
    |        |        |        |        |        |     |        |
    +--------+        +--------+        +---------     +--------+
        |                  |
        |                  |
        |             +----v---+        +--------+
        |             |        |        |        |
        +-------------> Failing+--------> Failed |
                      |        |        |        |
                      +--------+        +--------+
</pre>
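The state diagram above can be read as a transition table. The sketch below encodes it with a hypothetical `phase` type and constants named after the boxes in the diagram; these are not the package's own phase identifiers.

```go
package main

import "fmt"

// phase is a hypothetical type naming the boxes in the diagram.
type phase string

const (
	ready      phase = "Ready"
	running    phase = "Running"
	succeeding phase = "Succeeding"
	success    phase = "Success"
	failing    phase = "Failing"
	failed     phase = "Failed"
)

// transitions lists, for each phase, the phases the diagram's arrows lead to.
var transitions = map[phase][]phase{
	ready:      {running, failing},
	running:    {succeeding, failing},
	succeeding: {success},
	failing:    {failed},
}

// canTransition reports whether the diagram has an arrow from one phase to another.
func canTransition(from, to phase) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

// isTerminal reports whether a phase has no outgoing arrows.
func isTerminal(p phase) bool {
	return len(transitions[p]) == 0
}

func main() {
	fmt.Println(canTransition(ready, running))
	fmt.Println(canTransition(ready, success))
	fmt.Println(isTerminal(failed))
}
```

Success and Failed are the only phases without outgoing arrows, so a workflow in either of them needs no further reconciliation under this reading.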
type SimpleWorkQueue ¶
type SimpleWorkQueue struct {
	// workqueue is a rate limited work queue. This is used to queue work to be
	// processed instead of performing it as soon as a change happens. This
	// means we can ensure we only process a fixed amount of resources at a
	// time, and makes it easy to ensure we are never processing the same item
	// simultaneously in two different workers.
	workqueue.RateLimitingInterface
}
SimpleWorkQueue provides a simple RateLimitingInterface, but ensures that the CompositeWorkQueue interface works with a default queue.
func (*SimpleWorkQueue) AddToSubQueue ¶
func (s *SimpleWorkQueue) AddToSubQueue(item interface{})
func (*SimpleWorkQueue) AddToSubQueueAfter ¶
func (s *SimpleWorkQueue) AddToSubQueueAfter(item interface{}, duration time.Duration)
func (*SimpleWorkQueue) AddToSubQueueRateLimited ¶
func (s *SimpleWorkQueue) AddToSubQueueRateLimited(item interface{})
func (*SimpleWorkQueue) ShutdownAll ¶
func (s *SimpleWorkQueue) ShutdownAll()
func (*SimpleWorkQueue) Start ¶
func (s *SimpleWorkQueue) Start(ctx context.Context)
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
func NewWorkerPool ¶
func NewWorkerPool(ctx context.Context, scope promutils.Scope, workQueue CompositeWorkQueue, handler Handler) *WorkerPool
func (*WorkerPool) Initialize ¶
func (w *WorkerPool) Initialize(ctx context.Context) error
func (*WorkerPool) Run ¶
func (w *WorkerPool) Run(ctx context.Context, threadiness int, synced ...cache.InformerSynced) error
Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers. It will block until ctx is cancelled, at which point it will shut down the workqueue and wait for workers to finish processing their current work items.