Documentation ¶
Overview ¶
Package config contains the core configuration for FlytePropeller. This configuration can be added under the "propeller" section.
Example config:
----------------
propeller:
  rawoutput-prefix: s3://my-container/test/
  metadata-prefix: metadata/propeller/sandbox
  workers: 4
  workflow-reeval-duration: 10s
  downstream-eval-duration: 5s
  limit-namespace: "all"
  prof-port: 11254
  metrics-prefix: flyte
  enable-admin-launcher: true
  max-ttl-hours: 1
  gc-interval: 500m
  queue:
    type: batch
    queue:
      type: bucket
      rate: 1000
      capacity: 10000
    sub-queue:
      type: bucket
      rate: 1000
      capacity: 10000
  # This config assumes using `make start` in flytesnacks repo to startup a DinD k3s container
  kube-config: "$HOME/kubeconfig/k3s/k3s.yaml"
  publish-k8s-events: true
  workflowStore:
    policy: "ResourceVersionCache"
Index ¶
- func MustRegisterSubSection(subSectionKey string, section config.Config) config.Section
- type ArrayNodeConfig
- type CompositeQueueConfig
- type CompositeQueueType
- type Config
- type DefaultDeadlines
- type EventConfig
- type KubeClientConfig
- type LeaderElectionConfig
- type LiteralOffloadingConfig
- type NodeConfig
- type ParallelismBehavior
- type RawOutputPolicy
- type WorkqueueConfig
- type WorkqueueType
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ArrayNodeConfig ¶ added in v1.12.0
type ArrayNodeConfig struct { EventVersion int `json:"event-version" pflag:",ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new"` DefaultParallelismBehavior ParallelismBehavior `json:"default-parallelism-behavior" pflag:",Default parallelism behavior for array nodes"` UseMapPluginLogs bool `json:"use-map-plugin-logs" pflag:",Override subNode log links with those configured for the map plugin logs"` }
type CompositeQueueConfig ¶
type CompositeQueueConfig struct { Type CompositeQueueType `json:"type" pflag:",Type of composite queue to use for the WorkQueue"` Queue WorkqueueConfig `json:"queue,omitempty" pflag:",Workflow workqueue configuration, affects the way the work is consumed from the queue."` Sub WorkqueueConfig `` /* 130-byte string literal not displayed */ BatchingInterval config.Duration `json:"batching-interval" pflag:",Duration for which downstream updates are buffered"` BatchSize int `` /* 135-byte string literal not displayed */ }
CompositeQueueConfig contains configuration for the controller queue and the downstream resource queue
type CompositeQueueType ¶
type CompositeQueueType = string
const ( CompositeQueueSimple CompositeQueueType = "simple" CompositeQueueBatch CompositeQueueType = "batch" )
type Config ¶
type Config struct { KubeConfigPath string `json:"kube-config" pflag:",Path to kubernetes client config file."` MasterURL string `json:"master"` Workers int `json:"workers" pflag:",Number of threads to process workflows"` WorkflowReEval config.Duration `json:"workflow-reeval-duration" pflag:",Frequency of re-evaluating workflows"` DownstreamEval config.Duration `json:"downstream-eval-duration" pflag:",Frequency of re-evaluating downstream tasks"` LimitNamespace string `json:"limit-namespace" pflag:",Namespaces to watch for this propeller"` ProfilerPort config.Port `json:"prof-port" pflag:",Profiler port"` MetadataPrefix string `` /* 244-byte string literal not displayed */ DefaultRawOutputPrefix string `` /* 138-byte string literal not displayed */ Queue CompositeQueueConfig `json:"queue,omitempty" pflag:",Workflow workqueue configuration, affects the way the work is consumed from the queue."` MetricsPrefix string `json:"metrics-prefix" pflag:",An optional prefix for all published metrics."` MetricKeys []string `json:"metrics-keys" pflag:",Metrics labels applied to prometheus metrics emitted by the service."` EnableAdminLauncher bool `json:"enable-admin-launcher" pflag:"Enable remote Workflow launcher to Admin"` MaxWorkflowRetries int `json:"max-workflow-retries" pflag:"Maximum number of retries per workflow"` MaxTTLInHours int `json:"max-ttl-hours" pflag:"Maximum number of hours a completed workflow should be retained. Number between 1-23 hours"` GCInterval config.Duration `json:"gc-interval" pflag:"Run periodic GC every 30 minutes"` LeaderElection LeaderElectionConfig `json:"leader-election,omitempty" pflag:",Config for leader election."` PublishK8sEvents bool `json:"publish-k8s-events" pflag:",Enable events publishing to K8s events API."` MaxDatasetSizeBytes int64 `json:"max-output-size-bytes" pflag:",Deprecated! 
Use storage.limits.maxDownloadMBs instead"` EnableGrpcLatencyMetrics bool `` /* 136-byte string literal not displayed */ KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"` NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"` MaxStreakLength int `` /* 152-byte string literal not displayed */ EventConfig EventConfig `json:"event-config,omitempty" pflag:",Configures execution event behavior."` IncludeShardKeyLabel []string `json:"include-shard-key-label" pflag:",Include the specified shard key label in the k8s FlyteWorkflow CRD label selector"` ExcludeShardKeyLabel []string `json:"exclude-shard-key-label" pflag:",Exclude the specified shard key label from the k8s FlyteWorkflow CRD label selector"` IncludeProjectLabel []string `json:"include-project-label" pflag:",Include the specified project label in the k8s FlyteWorkflow CRD label selector"` ExcludeProjectLabel []string `json:"exclude-project-label" pflag:",Exclude the specified project label from the k8s FlyteWorkflow CRD label selector"` IncludeDomainLabel []string `json:"include-domain-label" pflag:",Include the specified domain label in the k8s FlyteWorkflow CRD label selector"` ExcludeDomainLabel []string `json:"exclude-domain-label" pflag:",Exclude the specified domain label from the k8s FlyteWorkflow CRD label selector"` ClusterID string `json:"cluster-id" pflag:",Unique cluster id running this flytepropeller instance with which to annotate execution events"` CreateFlyteWorkflowCRD bool `json:"create-flyteworkflow-crd" pflag:",Enable creation of the FlyteWorkflow CRD on startup"` NodeExecutionWorkerCount int `` /* 126-byte string literal not displayed */ ArrayNode ArrayNodeConfig `json:"array-node-config,omitempty" pflag:",Configuration for array nodes"` LiteralOffloadingConfig LiteralOffloadingConfig `json:"literal-offloading-config" pflag:",config used for literal offloading."` }
Config that uses the flytestdlib Config module to generate commandline and load config files. This configuration is the base configuration to start propeller. NOTE: when adding new fields, do not mark them as "omitempty" if it's desirable to read the value from env variables.
type DefaultDeadlines ¶
type DefaultDeadlines struct { DefaultNodeExecutionDeadline config.Duration `` /* 133-byte string literal not displayed */ DefaultNodeActiveDeadline config.Duration `json:"node-active-deadline" pflag:",Default value of node timeout that includes the time spent queued."` DefaultWorkflowActiveDeadline config.Duration `json:"workflow-active-deadline" pflag:",Default value of workflow timeout that includes the time spent queued."` }
DefaultDeadlines contains default values for timeouts
type EventConfig ¶
type EventConfig struct { RawOutputPolicy RawOutputPolicy `json:"raw-output-policy" pflag:",How output data should be passed along in execution events."` FallbackToOutputReference bool `` /* 152-byte string literal not displayed */ // only meant to be overridden for certain node types that have different eventing behavior such as ArrayNode ErrorOnAlreadyExists bool `json:"-"` }
type KubeClientConfig ¶
type KubeClientConfig struct { // QPS indicates the maximum QPS to the master from this client. // If it's zero, the created RESTClient will use DefaultQPS: 5 QPS float32 `json:"qps" pflag:"-,Max QPS to the master for requests to KubeAPI. 0 defaults to 5."` // Maximum burst for throttle. // If it's zero, the created RESTClient will use DefaultBurst: 10. Burst int `json:"burst" pflag:",Max burst rate for throttle. 0 defaults to 10"` // The maximum length of time to wait before giving up on a server request. A value of zero means no timeout. Timeout config.Duration `json:"timeout" pflag:",Max duration allowed for every request to KubeAPI before giving up. 0 implies no timeout."` }
KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.
type LeaderElectionConfig ¶
type LeaderElectionConfig struct { // Enable or disable leader election. Enabled bool `json:"enabled" pflag:",Enables/Disables leader election."` // Determines the name of the configmap that leader election will use for holding the leader lock. LockConfigMap types.NamespacedName `json:"lock-config-map" pflag:",ConfigMap namespace/name to use for resource lock."` // Duration that non-leader candidates will wait to force acquire leadership. This is measured against time of last // observed ack LeaseDuration config.Duration `` /* 157-byte string literal not displayed */ // RenewDeadline is the duration that the acting master will retry refreshing leadership before giving up. RenewDeadline config.Duration `json:"renew-deadline" pflag:",Duration that the acting master will retry refreshing leadership before giving up."` // RetryPeriod is the duration the LeaderElector clients should wait between tries of actions. RetryPeriod config.Duration `json:"retry-period" pflag:",Duration the LeaderElector clients should wait between tries of actions."` }
LeaderElectionConfig contains leader election configuration.
type LiteralOffloadingConfig ¶ added in v1.13.2
type LiteralOffloadingConfig struct { Enabled bool // Maps flytekit and union SDK names to minimum supported version that can handle reading offloaded literals. SupportedSDKVersions map[string]string `` /* 145-byte string literal not displayed */ // Default, 10Mbs. Determines the size of a literal at which to trigger offloading MinSizeInMBForOffloading int64 `json:"min-size-in-mb-for-offloading" pflag:",Size of a literal at which to trigger offloading"` // Fail fast threshold MaxSizeInMBForOffloading int64 `json:"max-size-in-mb-for-offloading" pflag:",Size of a literal at which to fail fast"` }
func (LiteralOffloadingConfig) GetSupportedSDKVersion ¶ added in v1.13.2
func (l LiteralOffloadingConfig) GetSupportedSDKVersion(sdk string) string
GetSupportedSDKVersion returns the minimum supported version for the provided SDK.
func (LiteralOffloadingConfig) IsSupportedSDKVersion ¶ added in v1.13.2
func (l LiteralOffloadingConfig) IsSupportedSDKVersion(ctx context.Context, sdk string, versionString string) bool
IsSupportedSDKVersion returns true if the provided SDK and version are supported by the literal offloading config.
type NodeConfig ¶
type NodeConfig struct { DefaultDeadlines DefaultDeadlines `json:"default-deadlines,omitempty" pflag:",Default value for timeouts"` MaxNodeRetriesOnSystemFailures int64 `json:"max-node-retries-system-failures" pflag:"2,Maximum number of retries per node for node failure due to infra issues"` InterruptibleFailureThreshold int32 `` /* 213-byte string literal not displayed */ DefaultMaxAttempts int32 `json:"default-max-attempts" pflag:"3,Default maximum number of attempts for a node"` IgnoreRetryCause bool `json:"ignore-retry-cause" pflag:",Ignore retry cause and count all attempts toward a node's max attempts"` EnableCRDebugMetadata bool `` /* 172-byte string literal not displayed */ }
NodeConfig contains configuration that is useful for every node execution
type ParallelismBehavior ¶ added in v1.12.0
type ParallelismBehavior = string
ParallelismBehavior defines how ArrayNode should handle subNode parallelism by default
const ( // ParallelismBehaviorHybrid means that ArrayNode will adhere to the parallelism defined in the // ArrayNode exactly. This means `nil` will use the workflow parallelism, and 0 will have // unlimited parallelism. ParallelismBehaviorHybrid ParallelismBehavior = "hybrid" // ParallelismBehaviorUnlimited means that ArrayNode subNodes will be evaluated with unlimited // parallelism for both nil and 0. If a non-default (ie. nil / zero) parallelism is set, then // ArrayNode will adhere to that value. ParallelismBehaviorUnlimited ParallelismBehavior = "unlimited" // ParallelismBehaviorWorkflow means that ArrayNode subNodes will be evaluated using the // configured workflow parallelism for both nil and 0. If a non-default (ie. nil / zero) // parallelism is set, then ArrayNode will adhere to that value. ParallelismBehaviorWorkflow ParallelismBehavior = "workflow" )
type RawOutputPolicy ¶
type RawOutputPolicy = string
RawOutputPolicy defines how output data should be passed along in execution events.
const ( // Only send output data as a URI referencing where outputs have been uploaded RawOutputPolicyReference RawOutputPolicy = "reference" // Send raw output data in events. RawOutputPolicyInline RawOutputPolicy = "inline" )
type WorkqueueConfig ¶
type WorkqueueConfig struct { // Refer to https://github.com/kubernetes/client-go/tree/master/util/workqueue Type WorkqueueType `json:"type" pflag:",Type of RateLimiter to use for the WorkQueue"` BaseDelay config.Duration `json:"base-delay" pflag:",base backoff delay for failure"` MaxDelay config.Duration `json:"max-delay" pflag:",Max backoff delay for failure"` Rate int64 `json:"rate" pflag:",Bucket Refill rate per second"` Capacity int `json:"capacity" pflag:",Bucket capacity as number of items"` }
WorkqueueConfig has the configuration to configure a workqueue. We may want to generalize this in a package like k8sutils.
type WorkqueueType ¶
type WorkqueueType = string
const ( WorkqueueTypeDefault WorkqueueType = "default" WorkqueueTypeBucketRateLimiter WorkqueueType = "bucket" WorkqueueTypeExponentialFailureRateLimiter WorkqueueType = "expfailure" WorkqueueTypeMaxOfRateLimiter WorkqueueType = "maxof" )