config

package
v1.13.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package config contains the core configuration for FlytePropeller. This configuration can be added under the “propeller” section.

Example config:

----------------

propeller:
   rawoutput-prefix: s3://my-container/test/
   metadata-prefix: metadata/propeller/sandbox
   workers: 4
   workflow-reeval-duration: 10s
   downstream-eval-duration: 5s
   limit-namespace: "all"
   prof-port: 11254
   metrics-prefix: flyte
   enable-admin-launcher: true
   max-ttl-hours: 1
   gc-interval: 500m
   queue:
     type: batch
     queue:
       type: bucket
       rate: 1000
       capacity: 10000
     sub-queue:
       type: bucket
       rate: 1000
       capacity: 10000
   # This config assumes using `make start` in flytesnacks repo to startup a DinD k3s container
   kube-config: "$HOME/kubeconfig/k3s/k3s.yaml"
   publish-k8s-events: true
   workflowStore:
     policy: "ResourceVersionCache"

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MustRegisterSubSection

func MustRegisterSubSection(subSectionKey string, section config.Config) config.Section

MustRegisterSubSection can be used to configure any subsections of the propeller configuration.

Types

type ArrayNodeConfig added in v1.12.0

// ArrayNodeConfig groups the ArrayNode settings: the eventing version, the default parallelism
// behavior for array nodes, and whether subNode log links are overridden with the ones configured
// for the map plugin.
type ArrayNodeConfig struct {
	EventVersion               int                 `json:"event-version" pflag:",ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new"`
	DefaultParallelismBehavior ParallelismBehavior `json:"default-parallelism-behavior" pflag:",Default parallelism behavior for array nodes"`
	UseMapPluginLogs           bool                `json:"use-map-plugin-logs" pflag:",Override subNode log links with those configured for the map plugin logs"`
}

type CompositeQueueConfig

// CompositeQueueConfig describes the composite work queue: which composite type to use, the
// workflow workqueue (Queue), a second workqueue (Sub), and the interval for which downstream
// updates are buffered (BatchingInterval).
//
// NOTE(review): the struct tags of Sub and BatchSize are elided in this listing, so their JSON
// keys and flag descriptions cannot be read here. Sub is presumably the `sub-queue` key shown in
// the package's example config — confirm against the source file.
type CompositeQueueConfig struct {
	Type             CompositeQueueType `json:"type" pflag:",Type of composite queue to use for the WorkQueue"`
	Queue            WorkqueueConfig    `json:"queue,omitempty" pflag:",Workflow workqueue configuration, affects the way the work is consumed from the queue."`
	Sub              WorkqueueConfig    `` /* 130-byte string literal not displayed */
	BatchingInterval config.Duration    `json:"batching-interval" pflag:",Duration for which downstream updates are buffered"`
	BatchSize        int                `` /* 135-byte string literal not displayed */
}

CompositeQueueConfig contains configuration for the controller queue and the downstream resource queue

type CompositeQueueType

// CompositeQueueType names the type of composite queue to use for the WorkQueue. It is an alias
// of string, so plain string values are accepted wherever it is used.
type CompositeQueueType = string

// Recognized CompositeQueueType values.
const (
	CompositeQueueSimple CompositeQueueType = "simple"
	CompositeQueueBatch  CompositeQueueType = "batch"
)

type Config

type Config struct {
	// NOTE(review): EnableAdminLauncher, MaxWorkflowRetries, MaxTTLInHours and GCInterval carry pflag
	// tags without the leading comma that the other visible pflag tags in this struct start with —
	// confirm the text is still picked up as the flag description rather than as a default value.
	//
	// MaxDatasetSizeBytes is marked deprecated by its own flag text, which points to
	// storage.limits.maxDownloadMBs instead. MasterURL has no pflag tag.
	//
	// Several tags (MetadataPrefix, DefaultRawOutputPrefix, EnableGrpcLatencyMetrics, MaxStreakLength,
	// NodeExecutionWorkerCount) are elided in this listing; consult the source file for their keys.
	KubeConfigPath           string                  `json:"kube-config" pflag:",Path to kubernetes client config file."`
	MasterURL                string                  `json:"master"`
	Workers                  int                     `json:"workers" pflag:",Number of threads to process workflows"`
	WorkflowReEval           config.Duration         `json:"workflow-reeval-duration" pflag:",Frequency of re-evaluating workflows"`
	DownstreamEval           config.Duration         `json:"downstream-eval-duration" pflag:",Frequency of re-evaluating downstream tasks"`
	LimitNamespace           string                  `json:"limit-namespace" pflag:",Namespaces to watch for this propeller"`
	ProfilerPort             config.Port             `json:"prof-port" pflag:",Profiler port"`
	MetadataPrefix           string                  `` /* 244-byte string literal not displayed */
	DefaultRawOutputPrefix   string                  `` /* 138-byte string literal not displayed */
	Queue                    CompositeQueueConfig    `json:"queue,omitempty" pflag:",Workflow workqueue configuration, affects the way the work is consumed from the queue."`
	MetricsPrefix            string                  `json:"metrics-prefix" pflag:",An optional prefix for all published metrics."`
	MetricKeys               []string                `json:"metrics-keys" pflag:",Metrics labels applied to prometheus metrics emitted by the service."`
	EnableAdminLauncher      bool                    `json:"enable-admin-launcher" pflag:"Enable remote Workflow launcher to Admin"`
	MaxWorkflowRetries       int                     `json:"max-workflow-retries" pflag:"Maximum number of retries per workflow"`
	MaxTTLInHours            int                     `json:"max-ttl-hours" pflag:"Maximum number of hours a completed workflow should be retained. Number between 1-23 hours"`
	GCInterval               config.Duration         `json:"gc-interval" pflag:"Run periodic GC every 30 minutes"`
	LeaderElection           LeaderElectionConfig    `json:"leader-election,omitempty" pflag:",Config for leader election."`
	PublishK8sEvents         bool                    `json:"publish-k8s-events" pflag:",Enable events publishing to K8s events API."`
	MaxDatasetSizeBytes      int64                   `json:"max-output-size-bytes" pflag:",Deprecated! Use storage.limits.maxDownloadMBs instead"`
	EnableGrpcLatencyMetrics bool                    `` /* 136-byte string literal not displayed */
	KubeConfig               KubeClientConfig        `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"`
	NodeConfig               NodeConfig              `json:"node-config,omitempty" pflag:",config for a workflow node"`
	MaxStreakLength          int                     `` /* 152-byte string literal not displayed */
	EventConfig              EventConfig             `json:"event-config,omitempty" pflag:",Configures execution event behavior."`
	IncludeShardKeyLabel     []string                `json:"include-shard-key-label" pflag:",Include the specified shard key label in the k8s FlyteWorkflow CRD label selector"`
	ExcludeShardKeyLabel     []string                `json:"exclude-shard-key-label" pflag:",Exclude the specified shard key label from the k8s FlyteWorkflow CRD label selector"`
	IncludeProjectLabel      []string                `json:"include-project-label" pflag:",Include the specified project label in the k8s FlyteWorkflow CRD label selector"`
	ExcludeProjectLabel      []string                `json:"exclude-project-label" pflag:",Exclude the specified project label from the k8s FlyteWorkflow CRD label selector"`
	IncludeDomainLabel       []string                `json:"include-domain-label" pflag:",Include the specified domain label in the k8s FlyteWorkflow CRD label selector"`
	ExcludeDomainLabel       []string                `json:"exclude-domain-label" pflag:",Exclude the specified domain label from the k8s FlyteWorkflow CRD label selector"`
	ClusterID                string                  `json:"cluster-id" pflag:",Unique cluster id running this flytepropeller instance with which to annotate execution events"`
	CreateFlyteWorkflowCRD   bool                    `json:"create-flyteworkflow-crd" pflag:",Enable creation of the FlyteWorkflow CRD on startup"`
	NodeExecutionWorkerCount int                     `` /* 126-byte string literal not displayed */
	ArrayNode                ArrayNodeConfig         `json:"array-node-config,omitempty" pflag:",Configuration for array nodes"`
	LiteralOffloadingConfig  LiteralOffloadingConfig `json:"literal-offloading-config" pflag:",config used for literal offloading."`
}

Config that uses the flytestdlib Config module to generate command-line flags and load config files. This configuration is the base configuration to start propeller. NOTE: when adding new fields, do not mark them as "omitempty" if it's desirable to read the value from env variables.

func GetConfig

func GetConfig() *Config

GetConfig extracts the Configuration from the global config module in flytestdlib and returns the corresponding type-casted object.

func (Config) GetPFlagSet

func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet

GetPFlagSet will return strongly typed pflags for all fields in Config and its nested types. The format of the flags is json-name.json-sub-name... etc.

type DefaultDeadlines

// DefaultDeadlines groups the default timeout values for nodes and workflows. The two "active"
// deadlines (node and workflow) include the time spent queued. The tag text of
// DefaultNodeExecutionDeadline is elided in this listing, so its exact meaning is not visible here.
type DefaultDeadlines struct {
	DefaultNodeExecutionDeadline  config.Duration `` /* 133-byte string literal not displayed */
	DefaultNodeActiveDeadline     config.Duration `json:"node-active-deadline" pflag:",Default value of node timeout that includes the time spent queued."`
	DefaultWorkflowActiveDeadline config.Duration `json:"workflow-active-deadline" pflag:",Default value of workflow timeout that includes the time spent queued."`
}

DefaultDeadlines contains default values for timeouts

type EventConfig

// EventConfig configures execution event behavior: RawOutputPolicy selects how output data is
// passed along in execution events. The tag text of FallbackToOutputReference is elided in this
// listing, so its description is not visible here.
type EventConfig struct {
	RawOutputPolicy           RawOutputPolicy `json:"raw-output-policy" pflag:",How output data should be passed along in execution events."`
	FallbackToOutputReference bool            `` /* 152-byte string literal not displayed */
	// ErrorOnAlreadyExists is only meant to be overridden for certain node types that have different
	// eventing behavior, such as ArrayNode. Its json tag is "-".
	ErrorOnAlreadyExists bool `json:"-"`
}

type KubeClientConfig

type KubeClientConfig struct {
	// QPS indicates the maximum QPS to the master from this client.
	// If it's zero, the created RESTClient will use DefaultQPS: 5.
	// NOTE(review): the pflag tag starts with "-" where the other fields leave that slot empty —
	// confirm the intended effect on flag generation.
	QPS float32 `json:"qps" pflag:"-,Max QPS to the master for requests to KubeAPI. 0 defaults to 5."`
	// Burst is the maximum burst for throttle.
	// If it's zero, the created RESTClient will use DefaultBurst: 10.
	Burst int `json:"burst" pflag:",Max burst rate for throttle. 0 defaults to 10"`
	// Timeout is the maximum length of time to wait before giving up on a server request.
	// A value of zero means no timeout.
	Timeout config.Duration `json:"timeout" pflag:",Max duration allowed for every request to KubeAPI before giving up. 0 implies no timeout."`
}

KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.

type LeaderElectionConfig

type LeaderElectionConfig struct {
	// Enabled enables or disables leader election.
	Enabled bool `json:"enabled" pflag:",Enables/Disables leader election."`

	// LockConfigMap determines the namespace/name of the configmap that leader election will use for
	// holding the leader lock.
	LockConfigMap types.NamespacedName `json:"lock-config-map" pflag:",ConfigMap namespace/name to use for resource lock."`

	// LeaseDuration is the duration that non-leader candidates will wait to force acquire leadership.
	// This is measured against the time of the last observed ack.
	LeaseDuration config.Duration `` /* 157-byte string literal not displayed */

	// RenewDeadline is the duration that the acting master will retry refreshing leadership before giving up.
	RenewDeadline config.Duration `json:"renew-deadline" pflag:",Duration that the acting master will retry refreshing leadership before giving up."`

	// RetryPeriod is the duration the LeaderElector clients should wait between tries of actions.
	RetryPeriod config.Duration `json:"retry-period" pflag:",Duration the LeaderElector clients should wait between tries of actions."`
}

LeaderElectionConfig contains leader election configuration.

type LiteralOffloadingConfig added in v1.13.2

// LiteralOffloadingConfig holds the settings used for literal offloading: an Enabled flag, the
// minimum SDK versions able to read offloaded literals, and two size thresholds expressed in MB.
type LiteralOffloadingConfig struct {
	// Enabled carries no json or pflag tag, unlike the other fields of this struct.
	Enabled bool
	// SupportedSDKVersions maps flytekit and union SDK names to the minimum supported version that
	// can handle reading offloaded literals.
	SupportedSDKVersions map[string]string `` /* 145-byte string literal not displayed */
	// MinSizeInMBForOffloading is the size of a literal, in MB, at which to trigger offloading.
	// The original note gives the default as 10 MB — TODO confirm against the default config.
	MinSizeInMBForOffloading int64 `json:"min-size-in-mb-for-offloading" pflag:",Size of a literal at which to trigger offloading"`
	// MaxSizeInMBForOffloading is the fail-fast threshold: the size of a literal, in MB, at which
	// to fail fast.
	MaxSizeInMBForOffloading int64 `json:"max-size-in-mb-for-offloading" pflag:",Size of a literal at which to fail fast"`
}

func (LiteralOffloadingConfig) GetSupportedSDKVersion added in v1.13.2

func (l LiteralOffloadingConfig) GetSupportedSDKVersion(sdk string) string

GetSupportedSDKVersion returns the least supported version for the provided SDK.

func (LiteralOffloadingConfig) IsSupportedSDKVersion added in v1.13.2

func (l LiteralOffloadingConfig) IsSupportedSDKVersion(ctx context.Context, sdk string, versionString string) bool

IsSupportedSDKVersion returns true if the provided SDK and version are supported by the literal offloading config.

type NodeConfig

type NodeConfig struct {
	// NOTE(review): MaxNodeRetriesOnSystemFailures and DefaultMaxAttempts put a value ("2" and "3")
	// before the comma in their pflag tags, where the other visible tags leave that slot empty —
	// presumably the flag default; confirm against the pflags generator.
	//
	// The tags of InterruptibleFailureThreshold and EnableCRDebugMetadata are elided in this listing.
	DefaultDeadlines               DefaultDeadlines `json:"default-deadlines,omitempty" pflag:",Default value for timeouts"`
	MaxNodeRetriesOnSystemFailures int64            `json:"max-node-retries-system-failures" pflag:"2,Maximum number of retries per node for node failure due to infra issues"`
	InterruptibleFailureThreshold  int32            `` /* 213-byte string literal not displayed */
	DefaultMaxAttempts             int32            `json:"default-max-attempts" pflag:"3,Default maximum number of attempts for a node"`
	IgnoreRetryCause               bool             `json:"ignore-retry-cause" pflag:",Ignore retry cause and count all attempts toward a node's max attempts"`
	EnableCRDebugMetadata          bool             `` /* 172-byte string literal not displayed */
}

NodeConfig contains configuration that is useful for every node execution

type ParallelismBehavior added in v1.12.0

type ParallelismBehavior = string

ParallelismBehavior defines how ArrayNode should handle subNode parallelism by default

const (
	// ParallelismBehaviorHybrid means that ArrayNode will adhere to the parallelism defined in the
	// ArrayNode exactly: `nil` will use the workflow parallelism, and 0 will have unlimited
	// parallelism.
	ParallelismBehaviorHybrid ParallelismBehavior = "hybrid"

	// ParallelismBehaviorUnlimited means that ArrayNode subNodes will be evaluated with unlimited
	// parallelism for both nil and 0. If an explicit parallelism (i.e. neither nil nor 0) is set,
	// then ArrayNode will adhere to that value.
	ParallelismBehaviorUnlimited ParallelismBehavior = "unlimited"

	// ParallelismBehaviorWorkflow means that ArrayNode subNodes will be evaluated using the
	// configured workflow parallelism for both nil and 0. If an explicit parallelism (i.e. neither
	// nil nor 0) is set, then ArrayNode will adhere to that value.
	ParallelismBehaviorWorkflow ParallelismBehavior = "workflow"
)

type RawOutputPolicy

type RawOutputPolicy = string

Defines how output data should be passed along in execution events.

const (
	// RawOutputPolicyReference only sends output data as a URI referencing where outputs have been
	// uploaded.
	RawOutputPolicyReference RawOutputPolicy = "reference"
	// RawOutputPolicyInline sends raw output data in events.
	RawOutputPolicyInline RawOutputPolicy = "inline"
)

type WorkqueueConfig

type WorkqueueConfig struct {
	// Refer to https://github.com/kubernetes/client-go/tree/master/util/workqueue
	//
	// Type selects the RateLimiter used for the WorkQueue. BaseDelay and MaxDelay are the base and
	// maximum backoff delays for failure; Rate is the bucket refill rate per second and Capacity is
	// the bucket capacity as a number of items.
	// NOTE(review): presumably the delay pair only matters for the "expfailure" type and the bucket
	// pair for the "bucket" type — confirm against the queue construction code.
	Type      WorkqueueType   `json:"type" pflag:",Type of RateLimiter to use for the WorkQueue"`
	BaseDelay config.Duration `json:"base-delay" pflag:",base backoff delay for failure"`
	MaxDelay  config.Duration `json:"max-delay" pflag:",Max backoff delay for failure"`
	Rate      int64           `json:"rate" pflag:",Bucket Refill rate per second"`
	Capacity  int             `json:"capacity" pflag:",Bucket capacity as number of items"`
}

WorkqueueConfig has the configuration to configure a workqueue. We may want to generalize this in a package like k8sutils

type WorkqueueType

// WorkqueueType names the type of RateLimiter to use for the WorkQueue. It is an alias of string,
// so plain string values are accepted wherever it is used.
type WorkqueueType = string

// Recognized WorkqueueType values.
const (
	WorkqueueTypeDefault                       WorkqueueType = "default"
	WorkqueueTypeBucketRateLimiter             WorkqueueType = "bucket"
	WorkqueueTypeExponentialFailureRateLimiter WorkqueueType = "expfailure"
	WorkqueueTypeMaxOfRateLimiter              WorkqueueType = "maxof"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL