Documentation ¶
Index ¶
- Constants
- Variables
- func IsNotFound(err error) bool
- func IsWorkflowStale(err error) bool
- func IsWorkflowTerminated(err error) bool
- func IsWorkflowTooLarge(err error) bool
- func SetConfig(cfg *Config) error
- type Config
- type ExecutionStatsHolder
- func (esh *ExecutionStatsHolder) AddOrUpdateEntry(executionID string, executionStats SingleExecutionStats) error
- func (esh *ExecutionStatsHolder) AggregateActiveValues() (int, uint32, uint32, error)
- func (esh *ExecutionStatsHolder) LogAllActiveExecutions(ctx context.Context)
- func (esh *ExecutionStatsHolder) RemoveTerminatedExecutions(ctx context.Context, workflows map[string]bool) error
- type ExecutionStatsMonitor
- type FlyteWorkflow
- func NewPassthroughWorkflowStore(_ context.Context, scope promutils.Scope, ...) FlyteWorkflow
- func NewResourceVersionCachingStore(_ context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) FlyteWorkflow
- func NewTerminatedTrackingStore(_ context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) (FlyteWorkflow, error)
- func NewWorkflowStore(ctx context.Context, cfg *Config, lister v1alpha1.FlyteWorkflowLister, ...) (FlyteWorkflow, error)
- type InmemoryWorkflowStore
- func (i *InmemoryWorkflowStore) Create(ctx context.Context, w *v1alpha1.FlyteWorkflow) error
- func (i *InmemoryWorkflowStore) Delete(ctx context.Context, namespace, name string) error
- func (i *InmemoryWorkflowStore) Get(ctx context.Context, namespace, name string) (*v1alpha1.FlyteWorkflow, error)
- func (i *InmemoryWorkflowStore) Update(ctx context.Context, w *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (newWF *v1alpha1.FlyteWorkflow, err error)
- func (i *InmemoryWorkflowStore) UpdateStatus(ctx context.Context, w *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (newWF *v1alpha1.FlyteWorkflow, err error)
- type Policy
- type PriorityClass
- type SingleExecutionStats
Constants ¶
const ( // PolicyInMemory provides an inmemory Workflow store which is useful for testing PolicyInMemory = "InMemory" // PolicyPassThrough just calls the underlying Clientset or the shared informer cache to get or write the workflow PolicyPassThrough = "PassThrough" // PolicyTrackTerminated tracks terminated workflows PolicyTrackTerminated = "TrackTerminated" // PolicyResourceVersionCache uses the resource version on the Workflow object, to determine if the inmemory copy // of the workflow is stale PolicyResourceVersionCache = "ResourceVersionCache" )
Variables ¶
var ErrStaleWorkflowError = fmt.Errorf("stale Workflow Found error")
ErrStaleWorkflowError signals that the local copy of the workflow is stale, i.e., a new version was written to the datastore, but the informer cache has not yet synced to the latest copy.
var ErrWorkflowNotFound = fmt.Errorf("workflow not-found error")
ErrWorkflowNotFound indicates that the workflow does not exist and it is safe to ignore the event
var ErrWorkflowTerminated = fmt.Errorf("workflow has already been terminated")
ErrWorkflowTerminated indicates that the workflow being operated on has previously been stored in a terminal state.
var ErrWorkflowToLarge = fmt.Errorf("workflow too large")
ErrWorkflowToLarge is returned in case an update operation fails because the Workflow object (CRD) has surpassed the datastore's supported limit.
Functions ¶
func IsNotFound ¶
IsNotFound returns true if the error is caused by ErrWorkflowNotFound
func IsWorkflowStale ¶
IsWorkflowStale returns true if the error is caused by ErrStaleWorkflowError
func IsWorkflowTerminated ¶
IsWorkflowTerminated returns true if the error is caused by ErrWorkflowTerminated
func IsWorkflowTooLarge ¶
IsWorkflowTooLarge returns true if the error is caused by ErrWorkflowToLarge
Types ¶
type Config ¶
type Config struct {
Policy Policy `json:"policy" pflag:",Workflow Store Policy to initialize"`
}
Config for Workflow access in the controller. The available policies are InMemory, PassThrough, TrackTerminated, and ResourceVersionCache.
type ExecutionStatsHolder ¶ added in v1.12.0
type ExecutionStatsHolder struct {
// contains filtered or unexported fields
}
ExecutionStatsHolder manages a map of execution IDs to their ExecutionStats.
func NewExecutionStatsHolder ¶ added in v1.12.0
func NewExecutionStatsHolder() (*ExecutionStatsHolder, error)
NewExecutionStatsHolder creates a new ExecutionStatsHolder instance with an initialized map.
func (*ExecutionStatsHolder) AddOrUpdateEntry ¶ added in v1.12.0
func (esh *ExecutionStatsHolder) AddOrUpdateEntry(executionID string, executionStats SingleExecutionStats) error
AddOrUpdateEntry adds or updates an entry in the executions map.
func (*ExecutionStatsHolder) AggregateActiveValues ¶ added in v1.12.0
func (esh *ExecutionStatsHolder) AggregateActiveValues() (int, uint32, uint32, error)
AggregateActiveValues returns the aggregate of all active node and task counts in the map.
func (*ExecutionStatsHolder) LogAllActiveExecutions ¶ added in v1.12.0
func (esh *ExecutionStatsHolder) LogAllActiveExecutions(ctx context.Context)
func (*ExecutionStatsHolder) RemoveTerminatedExecutions ¶ added in v1.12.0
func (esh *ExecutionStatsHolder) RemoveTerminatedExecutions(ctx context.Context, workflows map[string]bool) error
RemoveTerminatedExecutions removes all terminated or deleted workflows from the executions map. This expects a set of strings for simplified lookup in the critical section.
type ExecutionStatsMonitor ¶ added in v1.12.0
type ExecutionStatsMonitor struct { Scope promutils.Scope // These are currently aggregated values across all active workflows ActiveNodeExecutions prometheus.Gauge ActiveTaskExecutions prometheus.Gauge ActiveWorkflowExecutions prometheus.Gauge // contains filtered or unexported fields }
func NewExecutionStatsMonitor ¶ added in v1.12.0
func NewExecutionStatsMonitor(scope promutils.Scope, lister lister.FlyteWorkflowLister, activeExecutions *ExecutionStatsHolder) *ExecutionStatsMonitor
func (*ExecutionStatsMonitor) RunStatsMonitor ¶ added in v1.12.0
func (e *ExecutionStatsMonitor) RunStatsMonitor(ctx context.Context)
type FlyteWorkflow ¶
type FlyteWorkflow interface { Get(ctx context.Context, namespace, name string) (*v1alpha1.FlyteWorkflow, error) UpdateStatus(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) ( newWF *v1alpha1.FlyteWorkflow, err error) Update(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) ( newWF *v1alpha1.FlyteWorkflow, err error) }
The FlyteWorkflow store interface provides an abstraction for accessing the actual FlyteWorkflow object.
func NewPassthroughWorkflowStore ¶
func NewPassthroughWorkflowStore(_ context.Context, scope promutils.Scope, wfClient v1alpha12.FlyteworkflowV1alpha1Interface, flyteworkflowLister listers.FlyteWorkflowLister) FlyteWorkflow
func NewResourceVersionCachingStore ¶
func NewResourceVersionCachingStore(_ context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) FlyteWorkflow
func NewTerminatedTrackingStore ¶
func NewTerminatedTrackingStore(_ context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) (FlyteWorkflow, error)
func NewWorkflowStore ¶
func NewWorkflowStore(ctx context.Context, cfg *Config, lister v1alpha1.FlyteWorkflowLister, workflows flyteworkflowv1alpha1.FlyteworkflowV1alpha1Interface, scope promutils.Scope) (FlyteWorkflow, error)
type InmemoryWorkflowStore ¶
type InmemoryWorkflowStore struct {
// contains filtered or unexported fields
}
func NewInMemoryWorkflowStore ¶
func NewInMemoryWorkflowStore() *InmemoryWorkflowStore
NewInMemoryWorkflowStore returns an in-memory store that automatically updates the resource version on every update. This is useful for testing that resource version checks work, but it does not indicate whether the object was actually changed: the resource version is ALWAYS updated.
func (*InmemoryWorkflowStore) Create ¶
func (i *InmemoryWorkflowStore) Create(ctx context.Context, w *v1alpha1.FlyteWorkflow) error
func (*InmemoryWorkflowStore) Delete ¶
func (i *InmemoryWorkflowStore) Delete(ctx context.Context, namespace, name string) error
func (*InmemoryWorkflowStore) Get ¶
func (i *InmemoryWorkflowStore) Get(ctx context.Context, namespace, name string) (*v1alpha1.FlyteWorkflow, error)
func (*InmemoryWorkflowStore) Update ¶
func (i *InmemoryWorkflowStore) Update(ctx context.Context, w *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) ( newWF *v1alpha1.FlyteWorkflow, err error)
func (*InmemoryWorkflowStore) UpdateStatus ¶
func (i *InmemoryWorkflowStore) UpdateStatus(ctx context.Context, w *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) ( newWF *v1alpha1.FlyteWorkflow, err error)
type PriorityClass ¶
type PriorityClass int
const ( PriorityClassCritical PriorityClass = iota PriorityClassRegular )
type SingleExecutionStats ¶ added in v1.12.0
SingleExecutionStats holds stats about a single workflow execution, such as active node and task counts.