util

package
v0.0.0-...-13f83cf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 2, 2024 License: Apache-2.0 Imports: 63 Imported by: 26

Documentation

Index

Constants

View Source
const (
	// LabelKeyScheduledWorkflowEnabled is a label on a ScheduledWorkflow.
	// It captures whether the ScheduledWorkflow is enabled.
	LabelKeyScheduledWorkflowEnabled = constants.FullName + "/enabled"
	// LabelKeyScheduledWorkflowStatus is a label on a ScheduledWorkflow.
	// It captures the status of the scheduled workflow.
	LabelKeyScheduledWorkflowStatus = constants.FullName + "/status"

	// MaxParameterBytes is the maximum size, in bytes, of the parameter
	// column in the package/pipeline DB.
	MaxParameterBytes = 10000

	// LabelKeyWorkflowEpoch is a label on a Workflow.
	// It captures the epoch at which the workflow was scheduled.
	LabelKeyWorkflowEpoch = constants.FullName + "/workflowEpoch"
	// LabelKeyWorkflowIndex is a label on a Workflow.
	// It captures the creation index of the workflow within its ScheduledWorkflow.
	LabelKeyWorkflowIndex = constants.FullName + "/workflowIndex"
	// LabelKeyWorkflowIsOwnedByScheduledWorkflow is a label on a Workflow.
	// It captures whether the workflow is owned by a ScheduledWorkflow.
	LabelKeyWorkflowIsOwnedByScheduledWorkflow = constants.FullName + "/isOwnedByScheduledWorkflow"
	// LabelKeyWorkflowScheduledWorkflowName is a label on a Workflow.
	// It captures the name of the owning ScheduledWorkflow.
	LabelKeyWorkflowScheduledWorkflowName = constants.FullName + "/scheduledWorkflowName"

	// LabelKeyWorkflowRunId and LabelKeyWorkflowPersistedFinalState are label
	// keys under the fixed "pipeline/" prefix (they do not use constants.FullName).
	// NOTE(review): presumably the run ID of a workflow and the marker read by
	// PersistedFinalState — confirm against the callers.
	LabelKeyWorkflowRunId               = "pipeline/runid"
	LabelKeyWorkflowPersistedFinalState = "pipeline/persistedFinalState"

	// AnnotationKeyRunName is a Workflow annotation key.
	// It captures the name of the Run.
	AnnotationKeyRunName = "pipelines.kubeflow.org/run_name"

	// AnnotationKeyIstioSidecarInject is the "sidecar.istio.io/inject"
	// annotation key; the two AnnotationValueIstioSidecarInject* constants are
	// the "true" / "false" string values defined for it here.
	AnnotationKeyIstioSidecarInject           = "sidecar.istio.io/inject"
	AnnotationValueIstioSidecarInjectEnabled  = "true"
	AnnotationValueIstioSidecarInjectDisabled = "false"

	// LabelKeyCacheEnabled is a workflow label key.
	// It captures whether this step will be selected by the cache service.
	// To disable/enable the cache for a single run, this label needs to be
	// added to every step under that run.
	LabelKeyCacheEnabled = "pipelines.kubeflow.org/cache_enabled"
)
View Source
const (
	// Values of ScheduledWorkflowType. SWFv1 and SWFv2 carry the same version
	// suffixes ("v1beta1", "v2beta1") as ApiVersionV1 and ApiVersionV2 below.
	// NOTE(review): presumably these are the values returned by
	// ScheduledWorkflow.GetVersion — confirm.
	SWFv1      ScheduledWorkflowType = "v1beta1"
	SWFv2      ScheduledWorkflowType = "v2beta1"
	SWFlegacy  ScheduledWorkflowType = "legacy"
	SWFunknown ScheduledWorkflowType = "Unknown"

	// ApiVersionV1 and ApiVersionV2 are the "kubeflow.org/<version>" API
	// version strings; SwfKind is the kind string "ScheduledWorkflow".
	ApiVersionV1 = "kubeflow.org/v1beta1"
	ApiVersionV2 = "kubeflow.org/v2beta1"
	SwfKind      = "ScheduledWorkflow"
)
View Source
const (
	// API_CODE_NOT_FOUND is the untyped numeric code 404.
	// NOTE(review): presumably mirrors the HTTP 404 status and is compared
	// against API error codes — confirm against the callers.
	API_CODE_NOT_FOUND = 404
)
View Source
const (
	// IndexExpression is the literal placeholder string "[[Index]]".
	// NOTE(review): presumably one of the special strings substituted by
	// ParameterFormatter.Format — confirm.
	IndexExpression = "[[Index]]"
)

Variables

This section is empty.

Functions

func AnyStringPtr

func AnyStringPtr(val interface{}) *string

func ArchiveTgz

func ArchiveTgz(files map[string]string) (string, error)

ArchiveTgz takes a map of files, with the name as key and the content as value, and tars and gzips them into a tgz content string. Nested files and directories are not supported.

func BoolNilOrValue

func BoolNilOrValue(b *bool) string

func BoolPointer

func BoolPointer(b bool) *bool

func BooleanPointer

func BooleanPointer(b bool) *bool

BooleanPointer converts a bool to a bool pointer.

func DateTimePointer

func DateTimePointer(t strfmt.DateTime) *strfmt.DateTime

func ExtractErrorForCLI

func ExtractErrorForCLI(err error, isDebugMode bool) error

func ExtractMasterIPAndPort

func ExtractMasterIPAndPort(config *rest.Config) string

func ExtractTgz

func ExtractTgz(tgzContent string) (map[string]string, error)

ExtractTgz extracts a list of files from a tgz content. The output is a map with file name as key and content as value. Nested files and directories are not supported.

func FormatInt64ForLabel

func FormatInt64ForLabel(epoch int64) string

func FormatTimeForLogging

func FormatTimeForLogging(epoch int64) string

FormatTimeForLogging formats an epoch for logging purposes.

func GetKubernetesClientFromClientConfig

func GetKubernetesClientFromClientConfig(clientConfig clientcmd.ClientConfig) (
	*kubernetes.Clientset, *rest.Config, string, error,
)

func GetMetricValue

func GetMetricValue(collector prometheus.Collector) float64

GetMetricValue gets the metric value from a registered Collector.

func GetRpcConnection

func GetRpcConnection(address string) (*grpc.ClientConn, error)

func GetTerminatePatch

func GetTerminatePatch(execType ExecutionType) interface{}

func HasCustomCode

func HasCustomCode(err error, code CustomCode) bool

func Int32Pointer

func Int32Pointer(i int32) *int32

func Int64NilOrValue

func Int64NilOrValue(i *int64) string

func Int64Pointer

func Int64Pointer(i int64) *int64

func IsNotFound

func IsNotFound(err error) bool

IsNotFound returns whether an error indicates that a Kubernetes resource was "not found". This does not identify UserError with codes.NotFound errors, use IsUserErrorCodeMatch instead.

func IsUserErrorCodeMatch

func IsUserErrorCodeMatch(err error, code codes.Code) bool

IsUserErrorCodeMatch returns whether the error is a user error with specified code.

func LogError

func LogError(err error)

func MarshalJsonOrFail

func MarshalJsonOrFail(v interface{}) []byte

func MarshalJsonWithError

func MarshalJsonWithError(v interface{}) ([]byte, error)

Converts an object into []byte array

func MarshalParameters

func MarshalParameters(execType ExecutionType, params SpecParameters) (string, error)

Marshal parameters to a JSON-encoded string. This also checks that the result is not longer than a limit.

func MarshalParametersPipelineRun

func MarshalParametersPipelineRun(params SpecParameters) (string, error)

func MarshalParametersWorkflow

func MarshalParametersWorkflow(params SpecParameters) (string, error)

Marshal parameters to a JSON-encoded string. This also checks that the result is not longer than a limit.

func MetaV1TimePointer

func MetaV1TimePointer(t metav1.Time) *metav1.Time

func Metav1TimePointer

func Metav1TimePointer(t metav1.Time) *metav1.Time

Metav1TimePointer converts a metav1.Time to a pointer.

func ParseTimeOrFatal

func ParseTimeOrFatal(value string) time.Time

func RetrieveInt64FromLabel

func RetrieveInt64FromLabel(epoch string) (int64, error)

RetrieveInt64FromLabel converts a string label value into an epoch.

func RetrievePodName

func RetrievePodName(wf workflowapi.Workflow, node workflowapi.NodeStatus) string

Derives the Pod name from a given workflowapi.Workflow and workflowapi.NodeStatus. This is a workaround for an upstream breaking change with node.ID and node.Name mismatches, see https://github.com/argoproj/argo-workflows/issues/10107#issuecomment-1536113642

func SetExecutionType

func SetExecutionType(newType ExecutionType)

Setter of the executionType

func StringNilOrValue

func StringNilOrValue(s *string) string

func StringPointer

func StringPointer(s string) *string

func TerminateIfError

func TerminateIfError(err error)

TerminateIfError checks whether the error is nil and terminates if it is not.

func TimePointer

func TimePointer(t time.Time) *time.Time

func ToAnyStringPointer

func ToAnyStringPointer(s *string) *workflowapi.AnyString

func ToError

func ToError(s *status.Status) error

Converts google.rpc.Status to an error.

func ToGRPCError

func ToGRPCError(err error) error

func ToGRPCStatus

func ToGRPCStatus(err error) *status1.Status

func ToInt64Pointer

func ToInt64Pointer(t *metav1.Time) *int64

func ToRpcStatus

func ToRpcStatus(e error) *status.Status

Converts an error to google.rpc.Status.

func ToStringPointer

func ToStringPointer(a *workflowapi.AnyString) *string

func Truncate

func Truncate(s string, size float64) string

Truncate the provided string up to provided size

func UInt32Pointer

func UInt32Pointer(i uint32) *uint32

func UnmarshalJsonOrFail

func UnmarshalJsonOrFail(data string, v interface{})

func UnmarshalJsonWithError

func UnmarshalJsonWithError(data interface{}, v *interface{}) error

Converts a []byte array into an interface

func WaitForAPIAvailable

func WaitForAPIAvailable(initializeTimeout time.Duration, basePath string, apiAddress string) error

func Wrap

func Wrap(err error, message string) error

func Wrapf

func Wrapf(err error, format string, args ...interface{}) error

Types

type APICode

type APICode int

type ClientParameters

type ClientParameters struct {
	// QPS is a float64 rather than a float32 so that it can be bound
	// directly with flag.Float64Var().
	// NOTE(review): presumably the client-side queries-per-second limit — confirm.
	QPS float64
	// Burst is an integer client parameter.
	// NOTE(review): presumably the burst allowance paired with QPS — confirm.
	Burst int
}

ClientParameters contains parameters needed when creating a client

type CustomCode

// CustomCode is a uint32 code that is passed to NewCustomError and
// NewCustomErrorf and checked with HasCustomCode.
type CustomCode uint32
// The defined CustomCode values. The numeric values are explicit (0-3).
// NOTE(review): the names suggest transient vs. permanent failures, a
// not-found condition and a generic fallback — confirm how callers treat each.
const (
	CUSTOM_CODE_TRANSIENT CustomCode = 0
	CUSTOM_CODE_PERMANENT CustomCode = 1
	CUSTOM_CODE_NOT_FOUND CustomCode = 2
	CUSTOM_CODE_GENERIC   CustomCode = 3
)

type CustomError

type CustomError struct {
	// contains filtered or unexported fields
}

func NewCustomError

func NewCustomError(err error, code CustomCode, format string, a ...interface{}) *CustomError

func NewCustomErrorf

func NewCustomErrorf(code CustomCode, format string, a ...interface{}) *CustomError

func (*CustomError) Error

func (e *CustomError) Error() string

type ExecutionClient

type ExecutionClient interface {
	// Execution returns an ExecutionInterface for the given namespace.
	Execution(namespace string) ExecutionInterface
	// Compare takes an old and a new object and returns a bool.
	// NOTE(review): whether true means "equal" or "changed" is not visible
	// here — confirm against the implementations.
	Compare(old, new interface{}) bool
}

ExecutionClient is used to get an ExecutionInterface scoped to a specific namespace.

func NewExecutionClientOrFatal

func NewExecutionClientOrFatal(execType ExecutionType, initConnectionTimeout time.Duration, clientParams ClientParameters) ExecutionClient

Create an ExecutionClient for the specified ExecutionType

type ExecutionInformer

type ExecutionInformer interface {
	// ExecutionInformerEventHandler is embedded, so every ExecutionInformer
	// also provides AddEventHandler.
	ExecutionInformerEventHandler
	// HasSynced returns the HasSynced function of the informer.
	// That function returns true if the shared informer's store has been
	// informed by at least one full LIST of the authoritative state
	// of the informer's object collection.  This is unrelated to "resync".
	HasSynced() func() bool
	// Get uses the Lister interface to get a specific ExecutionSpec under a
	// namespace. The second return value indicates if no ExecutionSpec is found.
	Get(namespace string, name string) (ExecutionSpec, bool, error)
	// List returns all ExecutionSpecs that match the label selector.
	List(labels *labels.Selector) (ExecutionSpecList, error)
	// InformerFactoryStart initializes the informer. It takes a receive-only
	// stop channel.
	InformerFactoryStart(stopCh <-chan struct{})
}

func NewExecutionInformerOrFatal

func NewExecutionInformerOrFatal(execType ExecutionType, namespace string,
	initConnectionTimeout time.Duration, clientParams ClientParameters,
) ExecutionInformer

Create an ExecutionInformer for the specified ExecutionType.

type ExecutionInformerEventHandler

type ExecutionInformerEventHandler interface {
	// AddEventHandler takes a cache.ResourceEventHandler.
	// NOTE(review): presumably registers it with the underlying informer — confirm.
	AddEventHandler(funcs cache.ResourceEventHandler)
}

Mini version of the ExecutionSpec informer that only contains the functions needed in the current code base. ExecutionInformerEventHandler only has the AddEventHandler function; ExecutionInformer has all the functions needed in the current code base.

type ExecutionInterface

type ExecutionInterface interface {
	// Create creates an ExecutionSpec.
	Create(ctx context.Context, execution ExecutionSpec, opts v1.CreateOptions) (ExecutionSpec, error)
	// Update updates an ExecutionSpec.
	Update(ctx context.Context, execution ExecutionSpec, opts v1.UpdateOptions) (ExecutionSpec, error)
	// Delete deletes the ExecutionSpec with the given name.
	Delete(ctx context.Context, name string, opts v1.DeleteOptions) error
	// DeleteCollection deletes a collection of ExecutionSpecs.
	DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error
	// Get retrieves the ExecutionSpec with the given name.
	Get(ctx context.Context, name string, opts v1.GetOptions) (ExecutionSpec, error)
	// List retrieves a list of ExecutionSpecs.
	List(ctx context.Context, opts v1.ListOptions) (*ExecutionSpecList, error)
	// Patch patches the ExecutionSpec with the given name.
	Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (ExecutionSpec, error)
}

ExecutionInterface has methods to work with Execution resources.

type ExecutionSpec

type ExecutionSpec interface {
	// ExecutionType returns the ExecutionType of this ExecutionSpec.
	ExecutionType() ExecutionType

	// SetServiceAccount sets the service account to run the ExecutionSpec.
	SetServiceAccount(serviceAccount string)

	// OverrideParameters overrides some of the parameters.
	OverrideParameters(desiredParams map[string]string)

	// SetAnnotationsToAllTemplatesIfKeyNotExist sets annotations on all templates in a Workflow
	// if the annotation key does not exist.
	SetAnnotationsToAllTemplatesIfKeyNotExist(key string, value string)

	// SetLabels and SetAnnotations each take a single key/value pair.
	// NOTE(review): presumably applied to the ExecutionSpec's own metadata
	// (as opposed to its templates) — confirm against the implementations.
	SetLabels(key string, value string)
	SetAnnotations(key string, value string)

	// ReplaceUID takes an id and may return an error.
	// NOTE(review): what exactly is replaced is not visible here — confirm.
	ReplaceUID(id string) error
	// SetPodMetadataLabels takes a single key/value pair.
	// NOTE(review): presumably a label on the pod metadata — confirm.
	SetPodMetadataLabels(key string, value string)

	// ServiceAccount returns the ServiceAccountName.
	ServiceAccount() string

	// ExecutionStatus returns the ExecutionStatus, which can be used to
	// access status related information.
	ExecutionStatus() ExecutionStatus

	// SpecParameters returns a SpecParameters which represents all parameters.
	// The key is the parameter's name and the value is the SpecParameter which
	// contains default and value.
	SpecParameters() SpecParameters

	// SetSpecParameters overrides the existing SpecParameters, which means the
	// whole data structure is replaced with the new one.
	SetSpecParameters(newParams SpecParameters)

	// GenerateRetryExecution creates an ExecutionSpec for retry, and also
	// returns a list of failed pods in the existing ExecutionSpec.
	GenerateRetryExecution() (ExecutionSpec, []string, error)

	// ToStringForStore converts the ExecutionSpec to a JSON string.
	ToStringForStore() string

	// Version returns an opaque value that represents the internal version of
	// this object that can be used by clients to determine when objects have changed.
	Version() string

	// SetVersion takes a version string.
	// NOTE(review): presumably sets the value returned by Version — confirm.
	SetVersion(version string)

	// ExecutionName returns the name of the ExecutionSpec.
	// It has the Execution prefix to avoid a name conflict with the underlying data struct.
	ExecutionName() string

	// SetExecutionName sets the name of the ExecutionSpec.
	SetExecutionName(name string)

	// ExecutionNamespace returns the namespace of the ExecutionSpec.
	// It has the Execution prefix to avoid a name conflict with the underlying data struct.
	ExecutionNamespace() string

	// SetExecutionNamespace takes a namespace string.
	// NOTE(review): presumably sets the value returned by ExecutionNamespace — confirm.
	SetExecutionNamespace(namespace string)

	// ExecutionUID returns the UID of the ExecutionSpec.
	// It has the Execution prefix to avoid a name conflict with the underlying data struct.
	ExecutionUID() string

	// ExecutionObjectMeta returns the ObjectMeta.
	ExecutionObjectMeta() *metav1.ObjectMeta

	// ExecutionTypeMeta returns the TypeMeta.
	ExecutionTypeMeta() *metav1.TypeMeta

	// ScheduledWorkflowUUIDAsStringOrEmpty returns the ScheduledWorkflowUUID from OwnerReferences.
	ScheduledWorkflowUUIDAsStringOrEmpty() string

	// PersistedFinalState reports whether the workflow final state has been persisted.
	PersistedFinalState() bool

	// IsTerminating reports whether the ExecutionSpec was terminated and has not finished yet.
	IsTerminating() bool

	// ScheduledAtInSecOr0 returns the schedule time from the label, in seconds.
	ScheduledAtInSecOr0() int64

	// GetExecutionSpec copies the ExecutionSpec and removes the ExecutionStatus.
	// To prevent collisions, it clears the name and sets GenerateName to the
	// first 200 runes of the previous name.
	GetExecutionSpec() ExecutionSpec

	// Validate validates the ExecutionSpec.
	Validate(lint, ignoreEntrypoint bool) error

	// Decompress decompresses the ExecutionSpec. In most cases, this
	// decompresses information in the status.
	Decompress() error

	// CanRetry checks if the ExecutionSpec allows retry, and returns an error if not.
	CanRetry() error

	// ToStringForSchedule converts the Spec to a JSON string for ScheduleWorkflow.
	ToStringForSchedule() string

	// SetCannonicalLabels sets the labels for ScheduleWorkflow.
	// The method name is misspelled ("Cannonical"); it is part of the public
	// interface, so it is kept as is.
	SetCannonicalLabels(name string, nextScheduledEpoch int64, index int64)

	// SetOwnerReferences sets the OwnerReferences from a ScheduledWorkflow.
	SetOwnerReferences(schedule *swfapi.ScheduledWorkflow)
}

Abstract interface to encapsulate the resources needed by the underlying execution runtime, i.e. Workflow is for Argo, PipelineRun is for Tekton, etc. Status-related information goes to the ExecutionStatus interface. TODO: add more methods to make ExecutionSpec fully represent Workflow. In the initial phase, gradually add methods without breaking the existing functions. Later on, support for other execution runtimes could be added too.

func NewExecutionSpec

func NewExecutionSpec(bytes []byte) (ExecutionSpec, error)

Convert YAML in bytes into ExecutionSpec instance

func NewExecutionSpecFromInterface

func NewExecutionSpecFromInterface(execType ExecutionType, obj interface{}) (ExecutionSpec, error)

Construct an ExecutionSpec based on the data struct. Use this to leverage the existing Workflow creation for Argo. Support for other runtimes needs to be added when their implementations are added.

func NewExecutionSpecJSON

func NewExecutionSpecJSON(execType ExecutionType, bytes []byte) (ExecutionSpec, error)

Convert JSON in bytes into an ExecutionSpec instance. If the data contains the TypeMeta info, then there is no need to specify the ExecutionType. Explicitly specify it for now.

func ScheduleSpecToExecutionSpec

func ScheduleSpecToExecutionSpec(
	execType ExecutionType, wfr *swfapi.WorkflowResource,
) (ExecutionSpec, error)

Unmarshal Spec from ScheduleWorkflow to ExecutionSpec. The returned ExecutionSpec only contains Spec information, and has empty values for the metadata part.

type ExecutionSpecList

type ExecutionSpecList []ExecutionSpec

type ExecutionStatus

type ExecutionStatus interface {
	// FindObjectStoreArtifactKeyOrEmpty loops through all node running statuses and looks up the first
	// S3 artifact with the specified nodeID and artifactName. Returns empty if nothing is found.
	FindObjectStoreArtifactKeyOrEmpty(nodeID string, artifactName string) string

	// Condition returns information about the current phase: a high-level
	// summary of where the Execution is in its lifecycle.
	Condition() common.ExecutionPhase

	// FinishedAt returns the UNIX time at which the execution finished.
	// If the Execution is not finished, it returns 0.
	FinishedAt() int64

	// FinishedAtTime returns FinishedAt in Time format.
	FinishedAtTime() v1.Time

	// StartedAtTime returns StartedAt in Time format.
	StartedAtTime() v1.Time

	// IsInFinalState reports whether the workflow is in a final state.
	IsInFinalState() bool

	// Message returns details about the ExecutionSpec's current condition.
	Message() string

	// CollectionMetrics was in metrics_reporter.go. It was moved here because it
	// accesses the orchestration-engine-specific data struct; this encapsulates
	// that data struct and provides an abstract function.
	// It returns a slice of run metrics and a slice of errors.
	CollectionMetrics(retrieveArtifact RetrieveArtifact) ([]*api.RunMetric, []error)

	// HasMetrics reports whether the ExecutionStatus contains any finished node.
	HasMetrics() bool

	// HasNodes reports whether any node status exists.
	HasNodes() bool
	// NodeStatuses returns the node statuses. The NodeStatus data struct
	// could be extended if needed.
	NodeStatuses() map[string]NodeStatus
}

Abstract interface to encapsulate the resources of the execution runtime specifically for status information. This interface is mainly to access the status related information

type ExecutionType

// ExecutionType is a string naming the kind of execution resource.
// It is read with CurrentExecutionType and changed with SetExecutionType.
type ExecutionType string
// The defined ExecutionType values: "Workflow" (Argo), "PipelineRun"
// (Tekton) and "Unknown".
const (
	ArgoWorkflow      ExecutionType = "Workflow"
	TektonPipelineRun ExecutionType = "PipelineRun"
	Unknown           ExecutionType = "Unknown"
)

func CurrentExecutionType

func CurrentExecutionType() ExecutionType

Getter of the executionType

type FakeTime

type FakeTime struct {
	// contains filtered or unexported fields
}

func (*FakeTime) Now

func (f *FakeTime) Now() time.Time

type FakeUUIDGenerator

type FakeUUIDGenerator struct {
	// contains filtered or unexported fields
}

FakeUUIDGenerator is a fake implementation of the UUIDGeneratorInterface used for testing. It always generates the UUID and error provided during instantiation.

func (*FakeUUIDGenerator) NewRandom

func (f *FakeUUIDGenerator) NewRandom() (uuid.UUID, error)

type MetricsChan

type MetricsChan chan prometheus.Metric

type NodeStatus

type NodeStatus struct {
	// ID and DisplayName identify the node.
	ID          string
	DisplayName string
	// State is the node's state as a string.
	State       string
	// StartTime, CreateTime and FinishTime are int64 timestamps.
	// NOTE(review): presumably UNIX epoch seconds, like
	// ExecutionStatus.FinishedAt — confirm the unit.
	StartTime   int64
	CreateTime  int64
	FinishTime  int64
	// Children is a list of strings.
	// NOTE(review): presumably the IDs of the child nodes — confirm.
	Children    []string
}

Data struct to represent Node status

type ParameterFormatter

type ParameterFormatter struct {
	// contains filtered or unexported fields
}

ParameterFormatter is an object that substitutes specific strings in workflow parameters by information about the workflow execution (time at which the workflow was started, time at which the workflow was scheduled, etc.)

func NewRunParameterFormatter

func NewRunParameterFormatter(runUUID string, runAt int64) *ParameterFormatter

NewRunParameterFormatter returns a new ParameterFormatter to substitute run macros.

func NewSWFParameterFormatter

func NewSWFParameterFormatter(runUUID string, scheduledEpoch int64, nowEpoch int64,
	index int64,
) *ParameterFormatter

NewSWFParameterFormatter returns a new ParameterFormatter to substitute recurring run macros.

func (*ParameterFormatter) Format

func (p *ParameterFormatter) Format(s string) string

Format substitutes special strings in the provided string.

func (*ParameterFormatter) FormatWorkflowParameters

func (p *ParameterFormatter) FormatWorkflowParameters(
	parameters map[string]string,
) map[string]string

type PipelineRun

type PipelineRun struct {
	// The embedded *pipelineapi.PipelineRun promotes its fields and methods
	// onto this wrapper.
	*pipelineapi.PipelineRun
	// Status is serialized under the JSON key "status" and omitted when empty.
	// NOTE(review): if the embedded PipelineRun also has a Status field, this
	// outer field takes precedence in selectors — confirm that is intended.
	// +optional
	Status TektonStatus `json:"status,omitempty"`
}

PipelineRun is a type to help manipulate PipelineRun objects.

func NewPipelineRun

func NewPipelineRun(pr *pipelineapi.PipelineRun) *PipelineRun

NewPipelineRun creates a PipelineRun.

func NewPipelineRunFromBytes

func NewPipelineRunFromBytes(bytes []byte) (*PipelineRun, error)

func NewPipelineRunFromBytesJSON

func NewPipelineRunFromBytesJSON(bytes []byte) (*PipelineRun, error)

func NewPipelineRunFromInterface

func NewPipelineRunFromInterface(obj interface{}) (*PipelineRun, error)

func NewPipelineRunFromScheduleWorkflowSpecBytesJSON

func NewPipelineRunFromScheduleWorkflowSpecBytesJSON(bytes []byte) (*PipelineRun, error)

func (*PipelineRun) CanRetry

func (pr *PipelineRun) CanRetry() error

Always can retry

func (*PipelineRun) CollectionMetrics

func (pr *PipelineRun) CollectionMetrics(retrieveArtifact RetrieveArtifact) ([]*api.RunMetric, []error)

func (*PipelineRun) Condition

func (pr *PipelineRun) Condition() exec.ExecutionPhase

func (*PipelineRun) Decompress

func (pr *PipelineRun) Decompress() error

no compression/decompression in tekton

func (*PipelineRun) ExecutionName

func (pr *PipelineRun) ExecutionName() string

func (*PipelineRun) ExecutionNamespace

func (pr *PipelineRun) ExecutionNamespace() string

func (*PipelineRun) ExecutionObjectMeta

func (pr *PipelineRun) ExecutionObjectMeta() *metav1.ObjectMeta

func (*PipelineRun) ExecutionStatus

func (pr *PipelineRun) ExecutionStatus() ExecutionStatus

func (*PipelineRun) ExecutionType

func (pr *PipelineRun) ExecutionType() ExecutionType

func (*PipelineRun) ExecutionTypeMeta

func (pr *PipelineRun) ExecutionTypeMeta() *metav1.TypeMeta

func (*PipelineRun) ExecutionUID

func (pr *PipelineRun) ExecutionUID() string

func (*PipelineRun) FindObjectStoreArtifactKeyOrEmpty

func (pr *PipelineRun) FindObjectStoreArtifactKeyOrEmpty(nodeID string, artifactName string) string

FindObjectStoreArtifactKeyOrEmpty loops through all node running statuses and looks up the first S3 artifact with the specified nodeID and artifactName. Returns empty if nothing is found.

func (*PipelineRun) FindTaskRunByPodName

func (pr *PipelineRun) FindTaskRunByPodName(podName string) (*pipelineapi.PipelineRunTaskRunStatus, string)

FindTaskRunByPodName loops through all workflow task runs and looks one up by the pod name.

func (*PipelineRun) FinishedAt

func (pr *PipelineRun) FinishedAt() int64

func (*PipelineRun) FinishedAtTime

func (pr *PipelineRun) FinishedAtTime() metav1.Time

func (*PipelineRun) GenerateRetryExecution

func (pr *PipelineRun) GenerateRetryExecution() (ExecutionSpec, []string, error)

func (*PipelineRun) GetExecutionSpec

func (pr *PipelineRun) GetExecutionSpec() ExecutionSpec

func (*PipelineRun) GetWorkflowParametersAsMap

func (w *PipelineRun) GetWorkflowParametersAsMap() map[string]string

func (*PipelineRun) HasMetrics

func (pr *PipelineRun) HasMetrics() bool

func (*PipelineRun) HasNodes

func (pr *PipelineRun) HasNodes() bool

func (*PipelineRun) HasScheduledWorkflowAsParent

func (pr *PipelineRun) HasScheduledWorkflowAsParent() bool

func (*PipelineRun) IsInFinalState

func (pr *PipelineRun) IsInFinalState() bool

IsInFinalState whether the workflow is in a final state.

func (*PipelineRun) IsTerminating

func (pr *PipelineRun) IsTerminating() bool

func (*PipelineRun) IsV2Compatible

func (pr *PipelineRun) IsV2Compatible() bool

IsV2Compatible whether the workflow is a v2 compatible pipeline.

func (*PipelineRun) Message

func (pr *PipelineRun) Message() string

func (*PipelineRun) NodeStatuses

func (pr *PipelineRun) NodeStatuses() map[string]NodeStatus

func (*PipelineRun) OverrideName

func (pr *PipelineRun) OverrideName(name string)

OverrideName sets the name of a Workflow.

func (*PipelineRun) OverrideParameters

func (pr *PipelineRun) OverrideParameters(desiredParams map[string]string)

OverrideParameters overrides some of the parameters of a Workflow.

func (*PipelineRun) PersistedFinalState

func (pr *PipelineRun) PersistedFinalState() bool

PersistedFinalState reports whether the workflow final state has been persisted.

func (*PipelineRun) ReplaceOrignalPipelineRunName

func (pr *PipelineRun) ReplaceOrignalPipelineRunName(name string) error

func (*PipelineRun) ReplaceUID

func (pr *PipelineRun) ReplaceUID(id string) error

func (*PipelineRun) ScheduledAtInSecOr0

func (pr *PipelineRun) ScheduledAtInSecOr0() int64

func (*PipelineRun) ScheduledWorkflowUUIDAsStringOrEmpty

func (pr *PipelineRun) ScheduledWorkflowUUIDAsStringOrEmpty() string

func (*PipelineRun) ServiceAccount

func (pr *PipelineRun) ServiceAccount() string

func (*PipelineRun) SetAnnotations

func (pr *PipelineRun) SetAnnotations(key string, value string)

func (*PipelineRun) SetAnnotationsToAllTemplatesIfKeyNotExist

func (pr *PipelineRun) SetAnnotationsToAllTemplatesIfKeyNotExist(key string, value string)

SetAnnotationsToAllTemplatesIfKeyNotExist sets annotations on all templates in a Workflow if the annotation key does not exist

func (*PipelineRun) SetCannonicalLabels

func (pr *PipelineRun) SetCannonicalLabels(name string, nextScheduledEpoch int64, index int64)

func (*PipelineRun) SetExecutionName

func (pr *PipelineRun) SetExecutionName(name string)

func (*PipelineRun) SetExecutionNamespace

func (pr *PipelineRun) SetExecutionNamespace(namespace string)

func (*PipelineRun) SetLabels

func (pr *PipelineRun) SetLabels(key string, value string)

func (*PipelineRun) SetLabelsToAllTemplates

func (pr *PipelineRun) SetLabelsToAllTemplates(key string, value string)

SetLabelsToAllTemplates sets labels on all templates in a Workflow.

func (*PipelineRun) SetOwnerReferences

func (pr *PipelineRun) SetOwnerReferences(schedule *swfapi.ScheduledWorkflow)

SetOwnerReferences sets owner references on a Workflow.

func (*PipelineRun) SetPodMetadataLabels

func (pr *PipelineRun) SetPodMetadataLabels(key string, value string)

func (*PipelineRun) SetServiceAccount

func (pr *PipelineRun) SetServiceAccount(serviceAccount string)

SetServiceAccount Set the service account to run the workflow.

func (*PipelineRun) SetSpecParameters

func (pr *PipelineRun) SetSpecParameters(params SpecParameters)

func (*PipelineRun) SetVersion

func (pr *PipelineRun) SetVersion(version string)

func (*PipelineRun) SpecParameters

func (pr *PipelineRun) SpecParameters() SpecParameters

func (*PipelineRun) StartedAtTime

func (pr *PipelineRun) StartedAtTime() metav1.Time

func (*PipelineRun) ToStringForSchedule

func (pr *PipelineRun) ToStringForSchedule() string

func (*PipelineRun) ToStringForStore

func (pr *PipelineRun) ToStringForStore() string

func (*PipelineRun) Validate

func (w *PipelineRun) Validate(lint, ignoreEntrypoint bool) error

func (*PipelineRun) VerifyParameters

func (pr *PipelineRun) VerifyParameters(desiredParams map[string]string) error

func (*PipelineRun) Version

func (pr *PipelineRun) Version() string

type PipelineRunClient

type PipelineRunClient struct {
	// contains filtered or unexported fields
}

PipelineRunClient is an implementation of the ExecutionClient interface.

func (*PipelineRunClient) Compare

func (prc *PipelineRunClient) Compare(old, new interface{}) bool

func (*PipelineRunClient) Execution

func (prc *PipelineRunClient) Execution(namespace string) ExecutionInterface

type PipelineRunInformer

type PipelineRunInformer struct {
	// contains filtered or unexported fields
}

func (*PipelineRunInformer) AddEventHandler

func (pri *PipelineRunInformer) AddEventHandler(funcs cache.ResourceEventHandler)

func (*PipelineRunInformer) Get

func (pri *PipelineRunInformer) Get(namespace string, name string) (ExecutionSpec, bool, error)

func (*PipelineRunInformer) HasSynced

func (pri *PipelineRunInformer) HasSynced() func() bool

func (*PipelineRunInformer) InformerFactoryStart

func (pri *PipelineRunInformer) InformerFactoryStart(stopCh <-chan struct{})

func (*PipelineRunInformer) List

type PipelineRunInterface

type PipelineRunInterface struct {
	// contains filtered or unexported fields
}

func (*PipelineRunInterface) Create

func (*PipelineRunInterface) Delete

func (pri *PipelineRunInterface) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error

func (*PipelineRunInterface) DeleteCollection

func (pri *PipelineRunInterface) DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error

func (*PipelineRunInterface) Get

func (*PipelineRunInterface) List

func (*PipelineRunInterface) Patch

func (pri *PipelineRunInterface) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (ExecutionSpec, error)

func (*PipelineRunInterface) Update

type RealTime

type RealTime struct{}

func (*RealTime) Now

func (r *RealTime) Now() time.Time

type RetrieveArtifact

type RetrieveArtifact func(request *api.ReadArtifactRequest) (*api.ReadArtifactResponse, error)

type ScheduledWorkflow

type ScheduledWorkflow struct {
	// The embedded *swfapi.ScheduledWorkflow is the wrapped object; its
	// fields and methods are promoted onto this helper type.
	*swfapi.ScheduledWorkflow
}

ScheduledWorkflow is a type to help manipulate ScheduledWorkflow objects.

func NewScheduledWorkflow

func NewScheduledWorkflow(swf *swfapi.ScheduledWorkflow) *ScheduledWorkflow

NewScheduledWorkflow creates an instance of ScheduledWorkflow.

func (*ScheduledWorkflow) ConditionSummary

func (s *ScheduledWorkflow) ConditionSummary() string

func (*ScheduledWorkflow) CronOrEmpty

func (s *ScheduledWorkflow) CronOrEmpty() string

func (*ScheduledWorkflow) CronScheduleEndTimeInSecOrNull

func (s *ScheduledWorkflow) CronScheduleEndTimeInSecOrNull() *int64

func (*ScheduledWorkflow) CronScheduleStartTimeInSecOrNull

func (s *ScheduledWorkflow) CronScheduleStartTimeInSecOrNull() *int64

func (*ScheduledWorkflow) Get

Get converts this object to a swfapi.ScheduledWorkflow.

func (*ScheduledWorkflow) GetVersion

func (s *ScheduledWorkflow) GetVersion() ScheduledWorkflowType

func (*ScheduledWorkflow) IntervalSecondOr0

func (s *ScheduledWorkflow) IntervalSecondOr0() int64

func (*ScheduledWorkflow) MaxConcurrencyOr0

func (s *ScheduledWorkflow) MaxConcurrencyOr0() int64

func (*ScheduledWorkflow) NoCatchupOrFalse

func (s *ScheduledWorkflow) NoCatchupOrFalse() bool

func (*ScheduledWorkflow) ParametersAsString

func (s *ScheduledWorkflow) ParametersAsString() (string, error)

func (*ScheduledWorkflow) PeriodicScheduleEndTimeInSecOrNull

func (s *ScheduledWorkflow) PeriodicScheduleEndTimeInSecOrNull() *int64

func (*ScheduledWorkflow) PeriodicScheduleStartTimeInSecOrNull

func (s *ScheduledWorkflow) PeriodicScheduleStartTimeInSecOrNull() *int64

func (*ScheduledWorkflow) ToStringForStore

func (s *ScheduledWorkflow) ToStringForStore() string

type ScheduledWorkflowType

type ScheduledWorkflowType string

type SpecParameter

type SpecParameter struct {
	// Name is the parameter's name.
	Name string
	// TODO: need to revisit `Default` to see if this is needed
	// https://github.com/kubeflow/pipelines/pull/7766#discussion_r905345651
	//
	// Default and Value are pointers, so each may be nil.
	// NOTE(review): presumably nil means "not set" — confirm.
	Default *string
	Value   *string
}

SpecParameter represents the value of a Parameter, containing Name, Default and Value.

type SpecParameters

type SpecParameters []SpecParameter

SpecParameters represents the Parameters as a list of SpecParameter values.

func UnmarshParametersPipelineRun

func UnmarshParametersPipelineRun(paramsString string) (SpecParameters, error)

func UnmarshParametersWorkflow

func UnmarshParametersWorkflow(paramsString string) (SpecParameters, error)

func UnmarshalParameters

func UnmarshalParameters(execType ExecutionType, paramsString string) (SpecParameters, error)

UnmarshalParameters unmarshals parameters from a JSON-encoded string and converts them to SpecParameters.

type TektonStatus

// TektonStatus embeds a *pipelineapi.PipelineRunStatus and adds two optional
// string-keyed maps of per-TaskRun and per-Run statuses, serialized as
// "taskRuns" and "runs" and omitted from JSON when empty.
type TektonStatus struct {
	*pipelineapi.PipelineRunStatus
	// +optional
	TaskRuns map[string]*pipelineapi.PipelineRunTaskRunStatus `json:"taskRuns,omitempty"`
	// +optional
	Runs map[string]*pipelineapi.PipelineRunRunStatus `json:"runs,omitempty"`
}

type TimeInterface

// TimeInterface abstracts the source of time. NewRealTime, NewFakeTime and
// NewFakeTimeForEpoch all return a TimeInterface.
type TimeInterface interface {
	// Now returns a time.Time; presumably the current time as seen by the
	// implementation (real or fake) — confirm against the implementations.
	Now() time.Time
}

func NewFakeTime

func NewFakeTime(now time.Time) TimeInterface

func NewFakeTimeForEpoch

func NewFakeTimeForEpoch() TimeInterface

func NewRealTime

func NewRealTime() TimeInterface

type UUIDGenerator

type UUIDGenerator struct{}

UUIDGenerator is the concrete implementation of the UUIDGeneratorInterface used to generate UUIDs in production deployments.

func NewUUIDGenerator

func NewUUIDGenerator() *UUIDGenerator

func (*UUIDGenerator) NewRandom

func (r *UUIDGenerator) NewRandom() (uuid.UUID, error)

type UUIDGeneratorInterface

// UUIDGeneratorInterface is the interface for generating UUIDs. UUIDGenerator
// is documented as the concrete production implementation, and
// NewFakeUUIDGeneratorOrFatal returns an implementation with a fixed result.
type UUIDGeneratorInterface interface {
	// NewRandom returns a uuid.UUID, or an error.
	NewRandom() (uuid.UUID, error)
}

func NewFakeUUIDGeneratorOrFatal

func NewFakeUUIDGeneratorOrFatal(uuidStringToReturn string, errToReturn error) UUIDGeneratorInterface

NewFakeUUIDGeneratorOrFatal creates a UUIDGenerator that always returns the UUID and error provided as parameters.

type UserError

type UserError struct {
	// contains filtered or unexported fields
}

func NewAlreadyExistError

func NewAlreadyExistError(messageFormat string, a ...interface{}) *UserError

func NewBadRequestError

func NewBadRequestError(err error, externalFormat string, a ...interface{}) *UserError

func NewFailedPreconditionError

func NewFailedPreconditionError(err error, externalFormat string, a ...interface{}) *UserError

func NewInternalServerError

func NewInternalServerError(err error, internalMessageFormat string,
	a ...interface{},
) *UserError

func NewInvalidInputError

func NewInvalidInputError(messageFormat string, a ...interface{}) *UserError

func NewInvalidInputErrorWithDetails

func NewInvalidInputErrorWithDetails(err error, externalMessage string) *UserError

func NewNotFoundError

func NewNotFoundError(err error, externalMessageFormat string,
	a ...interface{},
) *UserError

func NewPermissionDeniedError

func NewPermissionDeniedError(err error, externalFormat string, a ...interface{}) *UserError

func NewResourceNotFoundError

func NewResourceNotFoundError(resourceType string, resourceName string) *UserError

func NewResourcesNotFoundError

func NewResourcesNotFoundError(resourceTypesFormat string, resourceNames ...interface{}) *UserError

func NewUnauthenticatedError

func NewUnauthenticatedError(err error, externalFormat string, a ...interface{}) *UserError

func NewUnavailableServerError

func NewUnavailableServerError(err error, messageFormat string, a ...interface{}) *UserError

func NewUnknownApiVersionError

func NewUnknownApiVersionError(a string, o interface{}) *UserError

func NewUserError

func NewUserError(err error, internalMessage string, externalMessage string) *UserError

func NewUserErrorWithSingleMessage

func NewUserErrorWithSingleMessage(err error, message string) *UserError

func (*UserError) Cause

func (e *UserError) Cause() error

func (*UserError) Error

func (e *UserError) Error() string

func (*UserError) ExternalMessage

func (e *UserError) ExternalMessage() string

func (*UserError) ExternalStatusCode

func (e *UserError) ExternalStatusCode() codes.Code

func (*UserError) GRPCStatus

func (e *UserError) GRPCStatus() *status1.Status

GRPCStatus converts the UserError to a gRPC status. Required by https://pkg.go.dev/google.golang.org/grpc/status#FromError

func (*UserError) Log

func (e *UserError) Log()

func (*UserError) String

func (e *UserError) String() string

func (*UserError) Unwrap

func (e *UserError) Unwrap() error

type Workflow

// Workflow wraps a *workflowapi.Workflow by embedding it, so the embedded
// object's fields and methods are promoted onto this type.
type Workflow struct {
	*workflowapi.Workflow
}

Workflow is a type to help manipulate Workflow objects.

func NewWorkflow

func NewWorkflow(workflow *workflowapi.Workflow) *Workflow

NewWorkflow creates a Workflow.

func NewWorkflowFromBytes

func NewWorkflowFromBytes(bytes []byte) (*Workflow, error)

func NewWorkflowFromBytesJSON

func NewWorkflowFromBytesJSON(bytes []byte) (*Workflow, error)

func NewWorkflowFromInterface

func NewWorkflowFromInterface(obj interface{}) (*Workflow, error)

func NewWorkflowFromScheduleWorkflowSpecBytesJSON

func NewWorkflowFromScheduleWorkflowSpecBytesJSON(bytes []byte) (*Workflow, error)

func (*Workflow) CanRetry

func (w *Workflow) CanRetry() error

func (*Workflow) CollectionMetrics

func (w *Workflow) CollectionMetrics(retrieveArtifact RetrieveArtifact) ([]*api.RunMetric, []error)

func (*Workflow) Condition

func (w *Workflow) Condition() exec.ExecutionPhase

func (*Workflow) Decompress

func (w *Workflow) Decompress() error

func (*Workflow) ExecutionName

func (w *Workflow) ExecutionName() string

func (*Workflow) ExecutionNamespace

func (w *Workflow) ExecutionNamespace() string

func (*Workflow) ExecutionObjectMeta

func (w *Workflow) ExecutionObjectMeta() *metav1.ObjectMeta

func (*Workflow) ExecutionStatus

func (w *Workflow) ExecutionStatus() ExecutionStatus

ExecutionStatus is part of the ExecutionSpec interface: it returns an ExecutionStatus, which can be used to access status-related information.

func (*Workflow) ExecutionType

func (w *Workflow) ExecutionType() ExecutionType

Get ExecutionType: ArgoWorkflow

func (*Workflow) ExecutionTypeMeta

func (w *Workflow) ExecutionTypeMeta() *metav1.TypeMeta

func (*Workflow) ExecutionUID

func (w *Workflow) ExecutionUID() string

func (*Workflow) FindObjectStoreArtifactKeyOrEmpty

func (w *Workflow) FindObjectStoreArtifactKeyOrEmpty(nodeName string, artifactName string) string

FindObjectStoreArtifactKeyOrEmpty loops through all node running statuses and looks up the first S3 artifact with the specified nodeName and artifactName. Returns empty if nothing is found.

func (*Workflow) FinishedAt

func (w *Workflow) FinishedAt() int64

func (*Workflow) FinishedAtTime

func (w *Workflow) FinishedAtTime() metav1.Time

func (*Workflow) GenerateRetryExecution

func (w *Workflow) GenerateRetryExecution() (ExecutionSpec, []string, error)

func (*Workflow) Get

func (w *Workflow) Get() *workflowapi.Workflow

Get converts this object to a workflowapi.Workflow.

func (*Workflow) GetExecutionSpec

func (w *Workflow) GetExecutionSpec() ExecutionSpec

func (*Workflow) GetWorkflowParametersAsMap

func (w *Workflow) GetWorkflowParametersAsMap() map[string]string

func (*Workflow) HasMetrics

func (w *Workflow) HasMetrics() bool

func (*Workflow) HasNodes

func (w *Workflow) HasNodes() bool

func (*Workflow) HasScheduledWorkflowAsParent

func (w *Workflow) HasScheduledWorkflowAsParent() bool

func (*Workflow) IsInFinalState

func (w *Workflow) IsInFinalState() bool

IsInFinalState reports whether the workflow is in a final state.

func (*Workflow) IsTerminating

func (w *Workflow) IsTerminating() bool

func (*Workflow) IsV2Compatible

func (w *Workflow) IsV2Compatible() bool

IsV2Compatible reports whether the workflow is a v2-compatible pipeline.

func (*Workflow) Message

func (w *Workflow) Message() string

func (*Workflow) NodeStatuses

func (w *Workflow) NodeStatuses() map[string]NodeStatus

func (*Workflow) OverrideParameters

func (w *Workflow) OverrideParameters(desiredParams map[string]string)

OverrideParameters overrides some of the parameters of a Workflow.

func (*Workflow) PatchTemplateOutputArtifacts

func (w *Workflow) PatchTemplateOutputArtifacts()

PatchTemplateOutputArtifacts marks auto-added artifacts as optional. Otherwise most older workflows will start failing after upgrade to Argo 2.3. TODO: Fix the components to explicitly declare the artifacts they really output.

func (*Workflow) PersistedFinalState

func (w *Workflow) PersistedFinalState() bool

PersistedFinalState reports whether the workflow's final state has been persisted.

func (*Workflow) ReplaceUID

func (w *Workflow) ReplaceUID(id string) error

func (*Workflow) ScheduledAtInSecOr0

func (w *Workflow) ScheduledAtInSecOr0() int64

func (*Workflow) ScheduledWorkflowUUIDAsStringOrEmpty

func (w *Workflow) ScheduledWorkflowUUIDAsStringOrEmpty() string

func (*Workflow) ServiceAccount

func (w *Workflow) ServiceAccount() string

func (*Workflow) SetAnnotations

func (w *Workflow) SetAnnotations(key string, value string)

func (*Workflow) SetAnnotationsToAllTemplatesIfKeyNotExist

func (w *Workflow) SetAnnotationsToAllTemplatesIfKeyNotExist(key string, value string)

SetAnnotationsToAllTemplatesIfKeyNotExist sets annotations on all templates in a Workflow if the annotation key does not exist

func (*Workflow) SetCannonicalLabels

func (w *Workflow) SetCannonicalLabels(name string, nextScheduledEpoch int64, index int64)

func (*Workflow) SetExecutionName

func (w *Workflow) SetExecutionName(name string)

SetExecutionName sets the name of a Workflow.

func (*Workflow) SetExecutionNamespace

func (w *Workflow) SetExecutionNamespace(namespace string)

func (*Workflow) SetLabels

func (w *Workflow) SetLabels(key string, value string)

func (*Workflow) SetLabelsToAllTemplates

func (w *Workflow) SetLabelsToAllTemplates(key string, value string)

SetLabelsToAllTemplates sets labels on all templates in a Workflow.

func (*Workflow) SetOwnerReferences

func (w *Workflow) SetOwnerReferences(schedule *swfapi.ScheduledWorkflow)

SetOwnerReferences sets owner references on a Workflow.

func (*Workflow) SetPodMetadataLabels

func (w *Workflow) SetPodMetadataLabels(key string, value string)

func (*Workflow) SetServiceAccount

func (w *Workflow) SetServiceAccount(serviceAccount string)

SetServiceAccount sets the service account used to run the workflow.

func (*Workflow) SetSpecParameters

func (w *Workflow) SetSpecParameters(params SpecParameters)

func (*Workflow) SetVersion

func (w *Workflow) SetVersion(version string)

func (*Workflow) SpecParameters

func (w *Workflow) SpecParameters() SpecParameters

func (*Workflow) StartedAtTime

func (w *Workflow) StartedAtTime() metav1.Time

func (*Workflow) ToStringForSchedule

func (w *Workflow) ToStringForSchedule() string

TODO: merge with ToStringForStore()

func (*Workflow) ToStringForStore

func (w *Workflow) ToStringForStore() string

func (*Workflow) Validate

func (w *Workflow) Validate(lint, ignoreEntrypoint bool) error

func (*Workflow) VerifyParameters

func (w *Workflow) VerifyParameters(desiredParams map[string]string) error

func (*Workflow) Version

func (w *Workflow) Version() string

type WorkflowClient

type WorkflowClient struct {
	// contains filtered or unexported fields
}

WorkflowClient is an implementation of ExecutionClientInterface.

func (*WorkflowClient) Compare

func (wc *WorkflowClient) Compare(old, new interface{}) bool

func (*WorkflowClient) Execution

func (wc *WorkflowClient) Execution(namespace string) ExecutionInterface

type WorkflowFormatter

type WorkflowFormatter struct {
	// contains filtered or unexported fields
}

func NewWorkflowFormatter

func NewWorkflowFormatter(uuid UUIDGeneratorInterface, scheduledAtInSec int64,
	nowInSec int64,
) *WorkflowFormatter

func (*WorkflowFormatter) Format

func (p *WorkflowFormatter) Format(workflow *v1alpha1.Workflow) error

type WorkflowInformer

type WorkflowInformer struct {
	// contains filtered or unexported fields
}

func (*WorkflowInformer) AddEventHandler

func (wfi *WorkflowInformer) AddEventHandler(funcs cache.ResourceEventHandler)

func (*WorkflowInformer) Get

func (wfi *WorkflowInformer) Get(namespace string, name string) (ExecutionSpec, bool, error)

func (*WorkflowInformer) HasSynced

func (wfi *WorkflowInformer) HasSynced() func() bool

func (*WorkflowInformer) InformerFactoryStart

func (wfi *WorkflowInformer) InformerFactoryStart(stopCh <-chan struct{})

func (*WorkflowInformer) List

func (wfi *WorkflowInformer) List(labels *labels.Selector) (ExecutionSpecList, error)

type WorkflowInterface

type WorkflowInterface struct {
	// contains filtered or unexported fields
}

func (*WorkflowInterface) Create

func (*WorkflowInterface) Delete

func (wfi *WorkflowInterface) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error

func (*WorkflowInterface) DeleteCollection

func (wfi *WorkflowInterface) DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error

func (*WorkflowInterface) Get

func (*WorkflowInterface) List

func (*WorkflowInterface) Patch

func (wfi *WorkflowInterface) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (ExecutionSpec, error)

func (*WorkflowInterface) Update

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL