workflowstore

package
v1.12.0-b0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// PolicyInMemory provides an inmemory Workflow store which is useful for testing
	PolicyInMemory = "InMemory"
	// PolicyPassThrough just calls the underlying Clientset or the shared informer cache to get or write the workflow
	PolicyPassThrough = "PassThrough"
	// PolicyTrackTerminated tracks terminated workflows
	PolicyTrackTerminated = "TrackTerminated"
	// PolicyResourceVersionCache uses the resource version on the Workflow object, to determine if the inmemory copy
	// of the workflow is stale
	PolicyResourceVersionCache = "ResourceVersionCache"
)

Variables

View Source
var ErrStaleWorkflowError = fmt.Errorf("stale Workflow Found error")

ErrStaleWorkflowError signals that the local copy of workflow is Stale, i.e., a new version was written to the datastore, But the informer cache has not yet synced to the latest copy

View Source
var ErrWorkflowNotFound = fmt.Errorf("workflow not-found error")

ErrWorkflowNotFound indicates that the workflow does not exist and it is safe to ignore the event

View Source
var ErrWorkflowTerminated = fmt.Errorf("workflow has already been terminated")

ErrWorkflowTerminated indicates that the workflow being operated on has previously been stored in a terminal state.

View Source
var ErrWorkflowToLarge = fmt.Errorf("workflow too large")

ErrWorkflowToLarge is returned in cased an update operation fails because the Workflow object (CRD) has surpassed the Datastores supported limit.

Functions

func IsNotFound

func IsNotFound(err error) bool

IsNotFound returns true if the error is caused by ErrWorkflowNotFound

func IsWorkflowStale

func IsWorkflowStale(err error) bool

IsWorkflowStale returns true if the error is caused by ErrStaleWorkflowError

func IsWorkflowTerminated

func IsWorkflowTerminated(err error) bool

IsWorkflowTerminated returns true if the error is caused by ErrWorkflowTerminated

func IsWorkflowTooLarge

func IsWorkflowTooLarge(err error) bool

IsWorkflowTooLarge returns true if the error is caused by ErrWorkflowToLarge

func SetConfig

func SetConfig(cfg *Config) error

Types

type Config

type Config struct {
	Policy Policy `json:"policy" pflag:",Workflow Store Policy to initialize"`
}

Config for Workflow access in the controller. Various policies are available like - InMemory, PassThrough, TrackTerminated, ResourceVersionCache

func GetConfig

func GetConfig() *Config

func (Config) GetPFlagSet

func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet

GetPFlagSet will return strongly types pflags for all fields in Config and its nested types. The format of the flags is json-name.json-sub-name... etc.

type ExecutionStatsHolder added in v1.12.0

type ExecutionStatsHolder struct {
	// contains filtered or unexported fields
}

ExecutionStatsHolder manages a map of execution IDs to their ExecutionStats.

func NewExecutionStatsHolder added in v1.12.0

func NewExecutionStatsHolder() (*ExecutionStatsHolder, error)

NewExecutionStatsHolder creates a new ExecutionStatsHolder instance with an initialized map.

func (*ExecutionStatsHolder) AddOrUpdateEntry added in v1.12.0

func (esh *ExecutionStatsHolder) AddOrUpdateEntry(executionID string, executionStats SingleExecutionStats) error

AddOrUpdateEntry adds or updates an entry in the executions map.

func (*ExecutionStatsHolder) AggregateActiveValues added in v1.12.0

func (esh *ExecutionStatsHolder) AggregateActiveValues() (int, uint32, uint32, error)

Returns the aggregate of all active node and task counts in the map.

func (*ExecutionStatsHolder) LogAllActiveExecutions added in v1.12.0

func (esh *ExecutionStatsHolder) LogAllActiveExecutions(ctx context.Context)

func (*ExecutionStatsHolder) RemoveTerminatedExecutions added in v1.12.0

func (esh *ExecutionStatsHolder) RemoveTerminatedExecutions(ctx context.Context, workflows map[string]bool) error

RemoveTerminatedExecutions removes all terminated or deleted workflows from the executions map. This expects a set of strings for simplified lookup in the critical section.

type ExecutionStatsMonitor added in v1.12.0

type ExecutionStatsMonitor struct {
	Scope promutils.Scope

	// These are currently aggregated values across all active workflows
	ActiveNodeExecutions     prometheus.Gauge
	ActiveTaskExecutions     prometheus.Gauge
	ActiveWorkflowExecutions prometheus.Gauge
	// contains filtered or unexported fields
}

func NewExecutionStatsMonitor added in v1.12.0

func NewExecutionStatsMonitor(scope promutils.Scope, lister lister.FlyteWorkflowLister, activeExecutions *ExecutionStatsHolder) *ExecutionStatsMonitor

func (*ExecutionStatsMonitor) RunStatsMonitor added in v1.12.0

func (e *ExecutionStatsMonitor) RunStatsMonitor(ctx context.Context)

type FlyteWorkflow

type FlyteWorkflow interface {
	Get(ctx context.Context, namespace, name string) (*v1alpha1.FlyteWorkflow, error)
	UpdateStatus(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
		newWF *v1alpha1.FlyteWorkflow, err error)
	Update(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
		newWF *v1alpha1.FlyteWorkflow, err error)
}

FlyteWorkflow store interface provides an abstraction of accessing the actual FlyteWorkflow object.

func NewPassthroughWorkflowStore

func NewPassthroughWorkflowStore(_ context.Context, scope promutils.Scope, wfClient v1alpha12.FlyteworkflowV1alpha1Interface,
	flyteworkflowLister listers.FlyteWorkflowLister) FlyteWorkflow

func NewResourceVersionCachingStore

func NewResourceVersionCachingStore(_ context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) FlyteWorkflow

func NewTerminatedTrackingStore

func NewTerminatedTrackingStore(_ context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) (FlyteWorkflow, error)

type InmemoryWorkflowStore

type InmemoryWorkflowStore struct {
	// contains filtered or unexported fields
}

func NewInMemoryWorkflowStore

func NewInMemoryWorkflowStore() *InmemoryWorkflowStore

Returns an inmemory store, that will update the resource version for every update automatically. This is a good idea to test that resource version checks work, but does not represent that the object was actually updated or not The resource version is ALWAYS updated.

func (*InmemoryWorkflowStore) Create

func (*InmemoryWorkflowStore) Delete

func (i *InmemoryWorkflowStore) Delete(ctx context.Context, namespace, name string) error

func (*InmemoryWorkflowStore) Get

func (i *InmemoryWorkflowStore) Get(ctx context.Context, namespace, name string) (*v1alpha1.FlyteWorkflow, error)

func (*InmemoryWorkflowStore) Update

func (i *InmemoryWorkflowStore) Update(ctx context.Context, w *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
	newWF *v1alpha1.FlyteWorkflow, err error)

func (*InmemoryWorkflowStore) UpdateStatus

func (i *InmemoryWorkflowStore) UpdateStatus(ctx context.Context, w *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
	newWF *v1alpha1.FlyteWorkflow, err error)

type Policy

type Policy = string

type PriorityClass

type PriorityClass int
const (
	PriorityClassCritical PriorityClass = iota
	PriorityClassRegular
)

type SingleExecutionStats added in v1.12.0

type SingleExecutionStats struct {
	ActiveNodeCount uint32
	ActiveTaskCount uint32
}

SingleExecutionStats holds stats about a single workflow execution, such as active node and task counts.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL