Documentation ¶
Index ¶
- Constants
- func CheckReg(str, pattern string) bool
- func CheckScalarResource(res string) error
- func ConcatenatePVCName(fsID string) string
- func ConcatenatePVName(namespace, fsID string) string
- func GetBindSource(fsID string) string
- func ID(userName, fsName string) string
- func IsEmptyResource(resourceInfo ResourceInfo) bool
- func IsImmutableJobStatus(status JobStatus) bool
- func IsValidFsMetaDriver(metaDriver string) bool
- func ProcessStepCacheByMap(cache *Cache, globalCacheMap map[string]interface{}, ...) error
- func ProcessStepFsMount(fsMountList *[]FsMount, globalFsMountList []interface{}) error
- func RunYaml2Map(runYaml []byte) (map[string]interface{}, error)
- func ValidateResource(resourceInfo ResourceInfo, scalarResourcesType []string) error
- func ValidateResourceItem(res string) error
- func ValidateScalarResourceInfo(scalarResources ScalarResourcesType, scalarResourcesType []string) error
- type ActionType
- type Artifacts
- type Cache
- type ClientOptions
- type Cluster
- type Component
- type ComponentView
- type Conf
- func (c *Conf) Framework() Framework
- func (c *Conf) GetAllFileSystem() []FileSystem
- func (c *Conf) GetAnnotations() map[string]string
- func (c *Conf) GetArgs() []string
- func (c *Conf) GetClusterID() string
- func (c *Conf) GetCommand() string
- func (c *Conf) GetEnv() map[string]string
- func (c *Conf) GetEnvSubset(prefix string) map[string]string
- func (c *Conf) GetEnvValue(key string) string
- func (c *Conf) GetExtraFS() []FileSystem
- func (c *Conf) GetFS() string
- func (c *Conf) GetFileSystem() FileSystem
- func (c *Conf) GetFlavour() string
- func (c *Conf) GetImage() string
- func (c *Conf) GetJobExecutorReplicas() string
- func (c *Conf) GetJobMode() string
- func (c *Conf) GetJobReplicas() string
- func (c *Conf) GetLabels() map[string]string
- func (c *Conf) GetName() string
- func (c *Conf) GetNamespace() string
- func (c *Conf) GetPSCommand() string
- func (c *Conf) GetPSFlavour() string
- func (c *Conf) GetPSReplicas() string
- func (c *Conf) GetPriority() string
- func (c *Conf) GetQueueID() string
- func (c *Conf) GetQueueName() string
- func (c *Conf) GetRestartPolicy() string
- func (c *Conf) GetUserName() string
- func (c *Conf) GetWorkerCommand() string
- func (c *Conf) GetWorkerFlavour() string
- func (c *Conf) GetWorkerReplicas() string
- func (c *Conf) GetYamlPath() string
- func (c *Conf) SetAnnotations(k, v string)
- func (c *Conf) SetClusterID(id string)
- func (c *Conf) SetEnv(name, value string)
- func (c *Conf) SetFS(fsID string)
- func (c *Conf) SetFlavour(flavourKey string)
- func (c *Conf) SetLabels(k, v string)
- func (c *Conf) SetNamespace(ns string)
- func (c *Conf) SetPSFlavour(flavourKey string)
- func (c *Conf) SetPriority(pc string)
- func (c *Conf) SetQueueID(id string)
- func (c *Conf) SetQueueName(queueName string)
- func (c *Conf) SetUserName(userName string)
- func (c *Conf) SetWorkerFlavour(flavourKey string)
- func (c *Conf) Type() JobType
- type DagView
- func (d DagView) GetComponentName() string
- func (d DagView) GetDeps() string
- func (d DagView) GetEndTime() string
- func (d DagView) GetMsg() string
- func (d DagView) GetName() string
- func (d DagView) GetParentDagID() string
- func (d DagView) GetSeq() int
- func (d DagView) GetStartTime() string
- func (d DagView) GetStatus() JobStatus
- func (d *DagView) SetDeps(deps string)
- type FailureOptions
- type FileSystem
- type Flavour
- type Framework
- type FrameworkVersion
- type FsMount
- type FsOptions
- type FsScope
- type JobLogInfo
- type JobLogRequest
- type JobStatus
- type JobType
- type JobView
- func (j JobView) GetComponentName() string
- func (j JobView) GetDeps() string
- func (j JobView) GetEndTime() string
- func (j JobView) GetMsg() string
- func (j JobView) GetName() string
- func (j JobView) GetParentDagID() string
- func (j JobView) GetSeq() int
- func (j JobView) GetStartTime() string
- func (j JobView) GetStatus() JobStatus
- func (j *JobView) SetDeps(deps string)
- type LogInfo
- type LogRunArtifactRequest
- type LogRunCacheRequest
- type Member
- type MemberRole
- type NodeQuotaInfo
- type PFJobConf
- type Parser
- func (p *Parser) IsDag(comp map[string]interface{}) bool
- func (p *Parser) ParseCache(cacheMap map[string]interface{}, cache *Cache) error
- func (p *Parser) ParseComponents(entryPoints map[string]interface{}) (map[string]Component, error)
- func (p *Parser) ParseDag(params map[string]interface{}, dagComp *WorkflowSourceDag) error
- func (p *Parser) ParseFsMount(fsMap map[string]interface{}, fs *FsMount) error
- func (p *Parser) ParseFsOptions(fsMap map[string]interface{}, fs *FsOptions) error
- func (p *Parser) ParseFsScope(fsMap map[string]interface{}, fs *FsScope) error
- func (p *Parser) ParseStep(params map[string]interface{}, step *WorkflowSourceStep) error
- func (p *Parser) ParseWorkflowSource(bodyMap map[string]interface{}, wfs *WorkflowSource) error
- func (p *Parser) TransJsonMap2Yaml(jsonMap map[string]interface{}) error
- type PostProcessView
- type QuotaSummary
- type Reference
- type ResourceInfo
- type ResourceName
- type RunOptions
- type RuntimeView
- type ScalarResourcesType
- type TaskLogInfo
- type TaskStatus
- type WorkflowSource
- func (wfs *WorkflowSource) GetComponentByFullName(fullName string) (Component, error)
- func (wfs *WorkflowSource) GetCompsMapAndRelName(components map[string]Component, absoluteName string) (map[string]Component, string, bool)
- func (wfs *WorkflowSource) GetDisabled() []string
- func (wfs *WorkflowSource) GetFsMounts() ([]FsMount, error)
- func (wfs *WorkflowSource) IsDisabled(componentName string) (bool, error)
- func (wfs *WorkflowSource) ProcessRuntimeComponents(components map[string]Component, componentType string, ...) error
- func (wfs *WorkflowSource) TransToRunYamlRaw() (runYamlRaw string, err error)
- func (wfs *WorkflowSource) UnmarshalJSON(data []byte) error
- type WorkflowSourceDag
- func (d *WorkflowSourceDag) DeepCopy() Component
- func (d *WorkflowSourceDag) GetArtifactPath(artName string) (string, error)
- func (d *WorkflowSourceDag) GetArtifacts() Artifacts
- func (d *WorkflowSourceDag) GetCondition() string
- func (d *WorkflowSourceDag) GetDeps() []string
- func (d *WorkflowSourceDag) GetInputArtifactPath(artName string) (string, error)
- func (d *WorkflowSourceDag) GetLoopArgument() interface{}
- func (d *WorkflowSourceDag) GetLoopArgumentLength() int
- func (d *WorkflowSourceDag) GetName() string
- func (d *WorkflowSourceDag) GetOutputArtifactPath(artName string) (string, error)
- func (d *WorkflowSourceDag) GetParameterValue(paramName string) (interface{}, error)
- func (d *WorkflowSourceDag) GetParameters() map[string]interface{}
- func (d *WorkflowSourceDag) GetSubComponet(subComponentName string) (Component, bool)
- func (d *WorkflowSourceDag) GetType() string
- func (d *WorkflowSourceDag) InitInputArtifacts()
- func (d *WorkflowSourceDag) InitOutputArtifacts()
- func (d *WorkflowSourceDag) InitParameters()
- func (d *WorkflowSourceDag) UpdateCondition(condition string)
- func (d *WorkflowSourceDag) UpdateDeps(deps string)
- func (d *WorkflowSourceDag) UpdateLoopArguemt(loopArgument interface{})
- func (d *WorkflowSourceDag) UpdateName(name string)
- type WorkflowSourceStep
- func (s *WorkflowSourceStep) DeepCopy() Component
- func (s *WorkflowSourceStep) GetArtifactPath(artName string) (string, error)
- func (s *WorkflowSourceStep) GetArtifacts() Artifacts
- func (s *WorkflowSourceStep) GetCondition() string
- func (s *WorkflowSourceStep) GetDeps() []string
- func (s *WorkflowSourceStep) GetInputArtifactPath(artName string) (string, error)
- func (s *WorkflowSourceStep) GetLoopArgument() interface{}
- func (s *WorkflowSourceStep) GetLoopArgumentLength() int
- func (s *WorkflowSourceStep) GetName() string
- func (s *WorkflowSourceStep) GetOutputArtifactPath(artName string) (string, error)
- func (s *WorkflowSourceStep) GetParameterValue(paramName string) (interface{}, error)
- func (s *WorkflowSourceStep) GetParameters() map[string]interface{}
- func (s *WorkflowSourceStep) GetType() string
- func (s *WorkflowSourceStep) InitInputArtifacts()
- func (s *WorkflowSourceStep) InitOutputArtifacts()
- func (s *WorkflowSourceStep) InitParameters()
- func (s *WorkflowSourceStep) UpdateCondition(condition string)
- func (s *WorkflowSourceStep) UpdateDeps(deps string)
- func (s *WorkflowSourceStep) UpdateLoopArguemt(loopArgument interface{})
- func (s *WorkflowSourceStep) UpdateName(name string)
Constants ¶
const ( LocalType = "Local" KubernetesType = "Kubernetes" )
const ( PFSTypeLocal = "local" PVNameTemplate = "pfs-$(pfs.fs.id)-$(namespace)-pv" PVCNameTemplate = "pfs-$(pfs.fs.id)-pvc" FSIDFormat = "$(pfs.fs.id)" NameSpaceFormat = "$(namespace)" PFSID = "pfs.fs.id" PFSInfo = "pfs.fs.info" PFSCache = "pfs.fs.cache" PFSServer = "pfs.server" PFSClusterID = "pfs.cluster.id" FusePodMntDir = "/home/paddleflow/mnt" FsMetaMemory = "mem" FsMetaDisk = "disk" FuseKeyFsInfo = "fs-info" LabelKeyFsID = "fsID" LabelKeyCacheID = "cacheID" LabelKeyNodeName = "nodename" LabelKeyUsedSize = "usedSize" AnnotationKeyCacheDir = "cacheDir" AnnotationKeyMTime = "modifiedTime" AnnotationKeyMountPrefix = "mount-" EnvKeyMountPodName = "POD_NAME" EnvKeyNamespace = "NAMESPACE" MountPodNamespace = "paddleflow" )
const ( EnvJobType = "PF_JOB_TYPE" EnvJobQueueName = "PF_JOB_QUEUE_NAME" EnvJobQueueID = "PF_JOB_QUEUE_ID" EnvJobClusterName = "PF_JOB_CLUSTER_NAME" EnvJobClusterID = "PF_JOB_CLUSTER_ID" EnvJobNamespace = "PF_JOB_NAMESPACE" EnvJobUserName = "PF_USER_NAME" EnvJobFsID = "PF_FS_ID" EnvJobPVCName = "PF_JOB_PVC_NAME" EnvJobPriority = "PF_JOB_PRIORITY" EnvJobMode = "PF_JOB_MODE" EnvJobFramework = "PF_JOB_FRAMEWORK" // EnvJobYamlPath Additional configuration for a specific job EnvJobYamlPath = "PF_JOB_YAML_PATH" EnvIsCustomYaml = "PF_IS_CUSTOM_YAML" // EnvJobWorkDir The working directory of the job, `null` means command without a working directory EnvJobWorkDir = "PF_WORK_DIR" EnvMountPath = "PF_MOUNT_PATH" EnvJobRestartPolicy = "PF_JOB_RESTART_POLICY" // EnvJobModePS env EnvJobModePS = "PS" EnvJobPSPort = "PF_JOB_PS_PORT" EnvJobPServerReplicas = "PF_JOB_PSERVER_REPLICAS" EnvJobPServerFlavour = "PF_JOB_PSERVER_FLAVOUR" EnvJobPServerCommand = "PF_JOB_PSERVER_COMMAND" EnvJobWorkerReplicas = "PF_JOB_WORKER_REPLICAS" EnvJobWorkerFlavour = "PF_JOB_WORKER_FLAVOUR" EnvJobWorkerCommand = "PF_JOB_WORKER_COMMAND" // EnvJobModeCollective env EnvJobModeCollective = "Collective" EnvJobReplicas = "PF_JOB_REPLICAS" EnvJobFlavour = "PF_JOB_FLAVOUR" // EnvJobModePod env reuse EnvJobReplicas and EnvJobFlavour EnvJobModePod = "Pod" // spark job env EnvJobSparkMainFile = "PF_JOB_SPARK_MAIN_FILE" EnvJobSparkMainClass = "PF_JOB_SPARK_MAIN_CLASS" EnvJobSparkArguments = "PF_JOB_SPARK_ARGUMENTS" EnvJobDriverFlavour = "PF_JOB_DRIVER_FLAVOUR" EnvJobExecutorReplicas = "PF_JOB_EXECUTOR_REPLICAS" EnvJobExecutorFlavour = "PF_JOB_EXECUTOR_FLAVOUR" // TODO move to framework TypeVcJob JobType = "vcjob" TypeSparkJob JobType = "spark" TypePaddleJob JobType = "paddlejob" TypePodJob JobType = "pod" StatusJobInit JobStatus = "init" StatusJobPending JobStatus = "pending" StatusJobRunning JobStatus = "running" StatusJobFailed JobStatus = "failed" StatusJobSucceeded JobStatus = "succeeded" StatusJobTerminating JobStatus = "terminating" StatusJobTerminated JobStatus = "terminated" StatusJobCancelled JobStatus = "cancelled" StatusJobSkipped JobStatus = "skipped" StatusTaskPending TaskStatus = "pending" StatusTaskRunning TaskStatus = "running" StatusTaskSucceeded TaskStatus = "succeeded" StatusTaskFailed TaskStatus = "failed" RoleMaster MemberRole = "master" RoleWorker MemberRole = "worker" RoleDriver MemberRole = "driver" RoleExecutor MemberRole = "executor" RolePServer MemberRole = "pserver" RolePWorker MemberRole = "pworker" TypeSingle JobType = "single" TypeDistributed JobType = "distributed" TypeWorkflow JobType = "workflow" FrameworkSpark Framework = "spark" FrameworkMPI Framework = "mpi" FrameworkTF Framework = "tensorflow" FrameworkPytorch Framework = "pytorch" FrameworkPaddle Framework = "paddle" FrameworkMXNet Framework = "mxnet" FrameworkRay Framework = "ray" FrameworkStandalone Framework = "standalone" ListenerTypeJob = "job" ListenerTypeTask = "task" ListenerTypeQueue = "queue" // job priority EnvJobVeryLowPriority = "VERY_LOW" EnvJobLowPriority = "LOW" EnvJobNormalPriority = "NORMAL" EnvJobHighPriority = "HIGH" EnvJobVeryHighPriority = "VERY_HIGH" // priority class PriorityClassVeryLow = "very-low" PriorityClassLow = "low" PriorityClassNormal = "normal" PriorityClassHigh = "high" PriorityClassVeryHigh = "very-high" JobOwnerLabel = "owner" JobOwnerValue = "paddleflow" JobIDLabel = "paddleflow-job-id" JobTTLSeconds = "padleflow/job-ttl-seconds" JobLabelFramework = "paddleflow-job-framework" VolcanoJobNameLabel = "volcano.sh/job-name" QueueLabelKey = "volcano.sh/queue-name" SparkAPPJobNameLabel = "sparkoperator.k8s.io/app-name" JobPrefix = "job" DefaultSchedulerName = "volcano" DefaultFSMountPath = "/home/paddleflow/storage/mnt" // EnvPaddleParaJob defines env for Paddle Para Job EnvPaddleParaJob = "PF_PADDLE_PARA_JOB" EnvPaddleParaPriority = "PF_PADDLE_PARA_PRIORITY" EnvPaddleParaConfigHostFile = "PF_PADDLE_PARA_CONFIG_FILE" // PaddleParaVolumeName defines config for Paddle Para Pod PaddleParaVolumeName = "paddle-para-conf-volume" PaddleParaAnnotationKeyJobName = "paddle-para/job-name" PaddleParaAnnotationKeyPriority = "paddle-para/priority" PaddleParaEnvJobName = "FLAGS_job_name" PaddleParaEnvGPUConfigFile = "GPU_CONFIG_FILE" PaddleParaGPUConfigFilePath = "/opt/paddle/para/gpu_config.json" // RayJob keywords EnvRayJobEntryPoint = "RAY_JOB_ENTRY_POINT" EnvRayJobRuntimeEnv = "RAY_JOB_RUNTIME_ENV" EnvRayJobEnableAutoScaling = "RAY_JOB_ENABLE_AUTOSCALING" EnvRayJobAutoScalingMode = "RAY_JOB_AUTOSCALING_MODE" EnvRayJobAutoScalingTimeout = "RAY_JOB_AUTOSCALING_IDLE_TIMEOUT" EnvRayJobHeaderFlavour = "RAY_JOB_HEADER_FLAVOUR" EnvRayJobHeaderImage = "RAY_JOB_HEADER_IMAGE" EnvRayJobHeaderPriority = "RAY_JOB_HEADER_PRIORITY" EnvRayJobHeaderPreStop = "RAY_JOB_HEADER_PRE_STOP" EnvRayJobHeaderStartParamsPrefix = "RAY_JOB_HEADER_START_PARAMS_" EnvRayJobWorkerGroupName = "RAY_JOB_WORKER_GROUP_NAME" EnvRayJobWorkerFlavour = "RAY_JOB_WORKER_FLAVOUR" EnvRayJobWorkerImage = "RAY_JOB_WORKER_IMAGE" EnvRayJobWorkerPriority = "RAY_JOB_WORKER_PRIORITY" EnvRayJobWorkerReplicas = "RAY_JOB_WORKER_REPLICAS" EnvRayJobWorkerMinReplicas = "RAY_JOB_WORKER_MIN_REPLICAS" EnvRayJobWorkerMaxReplicas = "RAY_JOB_WORKER_MAX_REPLICAS" EnvRayJobWorkerStartParamsPrefix = "RAY_JOB_WORKER_START_PARAMS_" )
const ( StatusQueueCreating = "creating" StatusQueueOpen = "open" StatusQueueUpdating = "updating" StatusQueueClosing = "closing" StatusQueueClosed = "closed" TypeElasticQuota = "elasticQuota" TypeVolcanoCapabilityQuota = "volcanoCapabilityQuota" )
const ( ArtifactTypeInput = "input" ArtifactTypeOutput = "output" EntryPointsStr = "entry_points" CacheAttributeEnable = "enable" CacheAttributeMaxExpiredTime = "max_expired_time" CacheAttributeFsScope = "fs_scope" FailureStrategyFailFast = "fail_fast" FailureStrategyContinue = "continue" EnvDockerEnv = "dockerEnv" FsPrefix = "fs-" CompTypeComponents = "components" CompTypeEntryPoints = "entryPoints" CompTypePostProcess = "postProcess" )
Variables ¶
This section is empty.
Functions ¶
func CheckScalarResource ¶
func ConcatenatePVCName ¶ added in v0.14.3
func ConcatenatePVName ¶ added in v0.14.3
func GetBindSource ¶
func IsEmptyResource ¶
func IsEmptyResource(resourceInfo ResourceInfo) bool
IsEmptyResource return true when cpu or mem is nil
func IsImmutableJobStatus ¶
func IsValidFsMetaDriver ¶
func ProcessStepCacheByMap ¶ added in v0.14.3
func ProcessStepFsMount ¶ added in v0.14.3
func RunYaml2Map ¶ added in v0.14.3
将yaml解析为map
func ValidateResource ¶ added in v0.14.3
func ValidateResource(resourceInfo ResourceInfo, scalarResourcesType []string) error
ValidateResource validate resource info
func ValidateResourceItem ¶ added in v0.14.3
ValidateResourceItem check resource for cpu or memory
func ValidateScalarResourceInfo ¶
func ValidateScalarResourceInfo(scalarResources ScalarResourcesType, scalarResourcesType []string) error
ValidateScalarResourceInfo validate scalar resource info
Types ¶
type ActionType ¶
type ActionType string
const ( Create ActionType = "create" Update ActionType = "update" Delete ActionType = "delete" Terminate ActionType = "terminate" )
type Artifacts ¶
type Artifacts struct { Input map[string]string `yaml:"input" json:"input"` Output map[string]string `yaml:"output" json:"output"` }
func (*Artifacts) ValidateOutputMapByList ¶
type ClientOptions ¶
ClientOptions used to build rest config.
type Cluster ¶
type Cluster struct { ID string Name string Type string // ClientOpt defines client config for cluster ClientOpt ClientOptions }
type Component ¶ added in v0.14.3
type Component interface { GetDeps() []string GetArtifacts() Artifacts GetArtifactPath(artName string) (string, error) GetInputArtifactPath(artName string) (string, error) GetOutputArtifactPath(artName string) (string, error) GetParameters() map[string]interface{} GetParameterValue(paramName string) (interface{}, error) GetCondition() string GetLoopArgument() interface{} GetLoopArgumentLength() int GetType() string GetName() string // 下面几个Update 函数在进行模板替换的时候会用到 UpdateCondition(string) UpdateLoopArguemt(interface{}) UpdateName(name string) UpdateDeps(deps string) InitInputArtifacts() InitOutputArtifacts() InitParameters() // 用于 deepCopy, 避免复用时出现问题 DeepCopy() Component }
Component包括Dag和Step,有Struct WorkflowSourceStep 和 WorkflowSourceDag实现了该接口
type ComponentView ¶ added in v0.14.3
type Conf ¶
type Conf struct { Name string `json:"name"` // 存储资源 FileSystem FileSystem `json:"fs,omitempty"` ExtraFileSystem []FileSystem `json:"extraFS,omitempty"` // 计算资源 Flavour Flavour `json:"flavour,omitempty"` Priority string `json:"priority"` ClusterID string `json:"clusterID"` QueueID string `json:"queueID"` QueueName string `json:"queueName,omitempty"` // 运行时需要的参数 Labels map[string]string `json:"labels"` Annotations map[string]string `json:"annotations"` Env map[string]string `json:"env,omitempty"` Command string `json:"command,omitempty"` Image string `json:"image"` Port int `json:"port,omitempty"` Args []string `json:"args,omitempty"` }
func (*Conf) GetAllFileSystem ¶ added in v0.14.3
func (c *Conf) GetAllFileSystem() []FileSystem
GetAllFileSystem combine FileSystem and ExtraFileSystem to a slice
func (*Conf) GetAnnotations ¶ added in v0.14.5
func (*Conf) GetClusterID ¶
func (*Conf) GetCommand ¶
func (*Conf) GetEnvSubset ¶ added in v0.14.5
func (*Conf) GetEnvValue ¶ added in v0.14.5
func (*Conf) GetExtraFS ¶ added in v0.14.3
func (c *Conf) GetExtraFS() []FileSystem
func (*Conf) GetFileSystem ¶ added in v0.14.3
func (c *Conf) GetFileSystem() FileSystem
func (*Conf) GetFlavour ¶
func (*Conf) GetJobExecutorReplicas ¶
func (*Conf) GetJobMode ¶
func (*Conf) GetJobReplicas ¶
func (*Conf) GetNamespace ¶
func (*Conf) GetPSCommand ¶
func (*Conf) GetPSFlavour ¶
func (*Conf) GetPSReplicas ¶
func (*Conf) GetPriority ¶
func (*Conf) GetQueueID ¶
func (*Conf) GetQueueName ¶
func (*Conf) GetRestartPolicy ¶ added in v0.14.5
func (*Conf) GetUserName ¶
func (*Conf) GetWorkerCommand ¶
func (*Conf) GetWorkerFlavour ¶
func (*Conf) GetWorkerReplicas ¶
func (*Conf) GetYamlPath ¶
func (*Conf) SetAnnotations ¶
func (*Conf) SetClusterID ¶
func (*Conf) SetFlavour ¶
func (*Conf) SetNamespace ¶
func (*Conf) SetPSFlavour ¶
func (*Conf) SetPriority ¶
func (*Conf) SetQueueID ¶
func (*Conf) SetQueueName ¶
SetQueueName set queue name
func (*Conf) SetUserName ¶
func (*Conf) SetWorkerFlavour ¶
type DagView ¶ added in v0.14.3
type DagView struct { PK int64 `json:"-"` DagID string `json:"id"` Name string `json:"name"` Type string `json:"type"` DagName string `json:"dagName"` ParentDagID string `json:"parentDagID"` LoopSeq int `json:"-"` Deps string `json:"deps"` Parameters map[string]string `json:"parameters"` Artifacts Artifacts `json:"artifacts"` StartTime string `json:"startTime"` EndTime string `json:"endTime"` Status JobStatus `json:"status"` Message string `json:"message"` EntryPoints map[string][]ComponentView `json:"entryPoints"` }
func (DagView) GetComponentName ¶ added in v0.14.3
func (DagView) GetEndTime ¶ added in v0.14.5
func (DagView) GetParentDagID ¶ added in v0.14.3
func (DagView) GetStartTime ¶ added in v0.14.5
type FailureOptions ¶
type FailureOptions struct {
Strategy string `yaml:"strategy" json:"strategy"`
}
type FileSystem ¶
type FileSystem struct { ID string `json:"id,omitempty"` Name string `json:"name"` Type string `json:"type"` HostPath string `json:"hostPath,omitempty"` MountPath string `json:"mountPath,omitempty"` SubPath string `json:"subPath,omitempty"` ReadOnly bool `json:"readOnly,omitempty"` }
FileSystem indicate PaddleFlow
type Flavour ¶
type Flavour struct { ResourceInfo `yaml:",inline"` Name string `json:"name" yaml:"name"` }
Flavour is a set of resources that can be used to run a job.
type FrameworkVersion ¶ added in v0.14.5
type FrameworkVersion struct { Framework string `json:"framework"` APIVersion string `json:"apiVersion"` }
func NewFrameworkVersion ¶ added in v0.14.5
func NewFrameworkVersion(framework, apiVersion string) FrameworkVersion
func (*FrameworkVersion) String ¶ added in v0.14.5
func (f *FrameworkVersion) String() string
type JobLogInfo ¶
type JobLogInfo struct { JobID string `json:"jobID"` TaskList []TaskLogInfo `json:"taskList"` }
type JobLogRequest ¶
type JobView ¶
type JobView struct { PK int64 `json:"-"` JobID string `json:"jobID"` Name string `json:"name"` Type string `json:"type"` StepName string `json:"stepName"` ParentDagID string `json:"parentDagID"` LoopSeq int `json:"-"` Command string `json:"command"` Parameters map[string]string `json:"parameters"` Env map[string]string `json:"env"` ExtraFS []FsMount `json:"extraFS"` StartTime string `json:"startTime"` EndTime string `json:"endTime"` Status JobStatus `json:"status"` Deps string `json:"deps"` DockerEnv string `json:"dockerEnv"` Artifacts Artifacts `json:"artifacts"` Cache Cache `json:"cache"` JobMessage string `json:"jobMessage"` CacheRunID string `json:"cacheRunID"` CacheJobID string `json:"cacheJobID"` }
JobView is view of job info responded to user, while Job is for pipeline and job engine to process
func (JobView) GetComponentName ¶ added in v0.14.3
func (JobView) GetEndTime ¶ added in v0.14.5
func (JobView) GetParentDagID ¶ added in v0.14.3
func (JobView) GetStartTime ¶ added in v0.14.5
type LogRunArtifactRequest ¶
type LogRunArtifactRequest struct { Md5 string `json:"md5"` RunID string `json:"runID"` FsID string `json:"fsID"` FsName string `json:"fsname"` UserName string `json:"username"` ArtifactPath string `json:"artifactPath"` Step string `json:"step"` JobID string `json:"jobID"` Type string `json:"type"` ArtifactName string `json:"artifactName"` Meta string `json:"meta"` }
type LogRunCacheRequest ¶
type LogRunCacheRequest struct { FirstFp string `json:"firstFp"` SecondFp string `json:"secondFp"` Source string `json:"source"` RunID string `json:"runID"` Step string `json:"step"` JobID string `json:"jobID"` FsID string `json:"fsID"` FsName string `json:"fsname"` UserName string `json:"username"` ExpiredTime string `json:"expiredTime"` Strategy string `json:"strategy"` }
type Member ¶ added in v0.14.5
type Member struct { ID string `json:"id"` Replicas int `json:"replicas"` Role MemberRole `json:"role"` Conf `json:",inline"` }
type MemberRole ¶
type MemberRole string
type NodeQuotaInfo ¶
type PFJobConf ¶
type PFJobConf interface { GetName() string GetEnv() map[string]string GetEnvValue(key string) string GetEnvSubset(prefix string) map[string]string GetCommand() string GetImage() string GetFileSystem() FileSystem GetExtraFS() []FileSystem GetArgs() []string GetPriority() string SetPriority(string) GetQueueName() string GetQueueID() string GetClusterID() string GetUserName() string // Deprecated GetFS() string SetFS(string) GetYamlPath() string GetNamespace() string GetJobMode() string GetFlavour() string GetPSFlavour() string GetWorkerFlavour() string SetQueueID(string) SetClusterID(string) SetNamespace(string) SetEnv(string, string) SetLabels(string, string) SetAnnotations(string, string) Type() JobType Framework() Framework }
type Parser ¶ added in v0.14.3
type Parser struct { }
func (*Parser) ParseCache ¶ added in v0.14.3
func (*Parser) ParseComponents ¶ added in v0.14.3
func (*Parser) ParseDag ¶ added in v0.14.3
func (p *Parser) ParseDag(params map[string]interface{}, dagComp *WorkflowSourceDag) error
该函数用于给生成给WorkflowSourceDag的各个字段赋值,但不会进行默认值填充,不会进行全局参数对局部参数的替换
func (*Parser) ParseFsMount ¶ added in v0.14.3
func (*Parser) ParseFsOptions ¶ added in v0.14.3
func (*Parser) ParseFsScope ¶ added in v0.14.3
func (*Parser) ParseStep ¶ added in v0.14.3
func (p *Parser) ParseStep(params map[string]interface{}, step *WorkflowSourceStep) error
func (*Parser) ParseWorkflowSource ¶ added in v0.14.3
func (p *Parser) ParseWorkflowSource(bodyMap map[string]interface{}, wfs *WorkflowSource) error
该函数将请求体解析成WorkflowSource, 该函数未完成全局替换操作
func (*Parser) TransJsonMap2Yaml ¶ added in v0.14.3
type PostProcessView ¶
type QuotaSummary ¶
type Reference ¶ added in v0.14.3
type Reference struct {
Component string `yaml:"component" json:"component"`
}
type ResourceInfo ¶
type ResourceInfo struct { CPU string `json:"cpu" yaml:"cpu"` Mem string `json:"mem" yaml:"mem"` ScalarResources ScalarResourcesType `json:"scalarResources,omitempty" yaml:"scalarResources,omitempty"` }
ResourceInfo is a struct that contains the information of a resource.
func (ResourceInfo) ToMap ¶ added in v0.14.3
func (r ResourceInfo) ToMap() map[string]string
type ResourceName ¶
type ResourceName string
ResourceName is the name identifying various resources in a ResourceList.
type RunOptions ¶ added in v0.14.3
type RuntimeView ¶
type RuntimeView map[string][]ComponentView
RuntimeView is view of run responded to user, while workflowRuntime is for pipeline engine to process
func (*RuntimeView) UnmarshalJSON ¶ added in v0.14.5
func (rv *RuntimeView) UnmarshalJSON(data []byte) error
type ScalarResourcesType ¶
type ScalarResourcesType map[ResourceName]string
ScalarResourcesType is the type of scalar resources
type TaskLogInfo ¶
type TaskStatus ¶
type TaskStatus string
type WorkflowSource ¶
type WorkflowSource struct { Name string `yaml:"name" json:"name"` DockerEnv string `yaml:"docker_env" json:"dockerEnv"` EntryPoints WorkflowSourceDag `yaml:"entry_points" json:"entryPoints"` Components map[string]Component `yaml:"components" json:"components"` Cache Cache `yaml:"cache" json:"cache"` Parallelism int `yaml:"parallelism" json:"parallelism"` Disabled string `yaml:"disabled" json:"disabled"` FailureOptions FailureOptions `yaml:"failure_options" json:"failureOptions"` PostProcess map[string]*WorkflowSourceStep `yaml:"post_process" json:"postProcess"` FsOptions FsOptions `yaml:"fs_options" json:"fsOptions"` }
func GetWorkflowSource ¶ added in v0.14.3
func GetWorkflowSource(runYaml []byte) (WorkflowSource, error)
该函数除了将yaml解析为wfs,还进行了全局参数替换操作
func GetWorkflowSourceByMap ¶ added in v0.14.3
func GetWorkflowSourceByMap(yamlMap map[string]interface{}) (WorkflowSource, error)
由Map解析得到一个Wfs,该Map中的key需要是下划线格式
func (*WorkflowSource) GetComponentByFullName ¶ added in v0.14.3
func (wfs *WorkflowSource) GetComponentByFullName(fullName string) (Component, error)
func (*WorkflowSource) GetCompsMapAndRelName ¶ added in v0.14.3
func (wfs *WorkflowSource) GetCompsMapAndRelName(components map[string]Component, absoluteName string) (map[string]Component, string, bool)
递归的检查Absolute Name对应的Component是否存在,并返回该Comp的所有同级别节点,和它的Relative Name
func (*WorkflowSource) GetDisabled ¶
func (wfs *WorkflowSource) GetDisabled() []string
func (*WorkflowSource) GetFsMounts ¶ added in v0.14.3
func (wfs *WorkflowSource) GetFsMounts() ([]FsMount, error)
func (*WorkflowSource) IsDisabled ¶
func (wfs *WorkflowSource) IsDisabled(componentName string) (bool, error)
func (*WorkflowSource) ProcessRuntimeComponents ¶ added in v0.14.3
func (wfs *WorkflowSource) ProcessRuntimeComponents(components map[string]Component, componentType string, yamlMap map[string]interface{}, componentsMap map[string]interface{}) error
对Step的DockerEnv、Cache进行全局替换
func (*WorkflowSource) TransToRunYamlRaw ¶ added in v0.14.3
func (wfs *WorkflowSource) TransToRunYamlRaw() (runYamlRaw string, err error)
func (*WorkflowSource) UnmarshalJSON ¶ added in v0.14.5
func (wfs *WorkflowSource) UnmarshalJSON(data []byte) error
type WorkflowSourceDag ¶ added in v0.14.3
type WorkflowSourceDag struct { Name string `yaml:"-" json:"name"` Type string `yaml:"-" json:"type"` LoopArgument interface{} `yaml:"loop_argument" json:"loopArgument"` Condition string `yaml:"condition" json:"condition"` Parameters map[string]interface{} `yaml:"parameters" json:"parameters"` Deps string `yaml:"deps" json:"deps"` Artifacts Artifacts `yaml:"artifacts" json:"artifacts"` EntryPoints map[string]Component `yaml:"entry_points" json:"entryPoints"` }
func (*WorkflowSourceDag) DeepCopy ¶ added in v0.14.3
func (d *WorkflowSourceDag) DeepCopy() Component
func (*WorkflowSourceDag) GetArtifactPath ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetArtifactPath(artName string) (string, error)
获取 artifact 的路径
func (*WorkflowSourceDag) GetArtifacts ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetArtifacts() Artifacts
func (*WorkflowSourceDag) GetCondition ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetCondition() string
func (*WorkflowSourceDag) GetDeps ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetDeps() []string
func (*WorkflowSourceDag) GetInputArtifactPath ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetInputArtifactPath(artName string) (string, error)
获取 输入artifact的存储路径
func (*WorkflowSourceDag) GetLoopArgument ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetLoopArgument() interface{}
func (*WorkflowSourceDag) GetLoopArgumentLength ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetLoopArgumentLength() int
func (*WorkflowSourceDag) GetName ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetName() string
func (*WorkflowSourceDag) GetOutputArtifactPath ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetOutputArtifactPath(artName string) (string, error)
获取输出artifact的存储路径
func (*WorkflowSourceDag) GetParameterValue ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetParameterValue(paramName string) (interface{}, error)
获取指定 parameter 的值
func (*WorkflowSourceDag) GetParameters ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetParameters() map[string]interface{}
func (*WorkflowSourceDag) GetSubComponet ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetSubComponet(subComponentName string) (Component, bool)
func (*WorkflowSourceDag) GetType ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetType() string
func (*WorkflowSourceDag) InitInputArtifacts ¶ added in v0.14.3
func (d *WorkflowSourceDag) InitInputArtifacts()
func (*WorkflowSourceDag) InitOutputArtifacts ¶ added in v0.14.3
func (d *WorkflowSourceDag) InitOutputArtifacts()
func (*WorkflowSourceDag) InitParameters ¶ added in v0.14.3
func (d *WorkflowSourceDag) InitParameters()
func (*WorkflowSourceDag) UpdateCondition ¶ added in v0.14.3
func (d *WorkflowSourceDag) UpdateCondition(condition string)
func (*WorkflowSourceDag) UpdateDeps ¶ added in v0.14.3
func (d *WorkflowSourceDag) UpdateDeps(deps string)
func (*WorkflowSourceDag) UpdateLoopArguemt ¶ added in v0.14.3
func (d *WorkflowSourceDag) UpdateLoopArguemt(loopArgument interface{})
func (*WorkflowSourceDag) UpdateName ¶ added in v0.14.3
func (d *WorkflowSourceDag) UpdateName(name string)
type WorkflowSourceStep ¶
type WorkflowSourceStep struct { Name string `yaml:"-" json:"name"` LoopArgument interface{} `yaml:"loop_argument" json:"loopArgument"` Condition string `yaml:"condition" json:"condition"` Parameters map[string]interface{} `yaml:"parameters" json:"parameters"` Command string `yaml:"command" json:"command"` Deps string `yaml:"deps" json:"deps"` Artifacts Artifacts `yaml:"artifacts" json:"artifacts"` Env map[string]string `yaml:"env" json:"env"` DockerEnv string `yaml:"docker_env" json:"dockerEnv"` Cache Cache `yaml:"cache" json:"cache"` Reference Reference `yaml:"reference" json:"reference"` ExtraFS []FsMount `yaml:"extra_fs" json:"extraFS"` }
func (*WorkflowSourceStep) DeepCopy ¶ added in v0.14.3
func (s *WorkflowSourceStep) DeepCopy() Component
func (*WorkflowSourceStep) GetArtifactPath ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetArtifactPath(artName string) (string, error)
获取 artifact 的路径
func (*WorkflowSourceStep) GetArtifacts ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetArtifacts() Artifacts
func (*WorkflowSourceStep) GetCondition ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetCondition() string
func (*WorkflowSourceStep) GetDeps ¶
func (s *WorkflowSourceStep) GetDeps() []string
func (*WorkflowSourceStep) GetInputArtifactPath ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetInputArtifactPath(artName string) (string, error)
获取 输入artifact的存储路径
func (*WorkflowSourceStep) GetLoopArgument ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetLoopArgument() interface{}
func (*WorkflowSourceStep) GetLoopArgumentLength ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetLoopArgumentLength() int
func (*WorkflowSourceStep) GetName ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetName() string
func (*WorkflowSourceStep) GetOutputArtifactPath ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetOutputArtifactPath(artName string) (string, error)
获取输出artifact的存储路径
func (*WorkflowSourceStep) GetParameterValue ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetParameterValue(paramName string) (interface{}, error)
获取指定 parameter 的值
func (*WorkflowSourceStep) GetParameters ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetParameters() map[string]interface{}
func (*WorkflowSourceStep) GetType ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetType() string
func (*WorkflowSourceStep) InitInputArtifacts ¶ added in v0.14.3
func (s *WorkflowSourceStep) InitInputArtifacts()
func (*WorkflowSourceStep) InitOutputArtifacts ¶ added in v0.14.3
func (s *WorkflowSourceStep) InitOutputArtifacts()
func (*WorkflowSourceStep) InitParameters ¶ added in v0.14.3
func (s *WorkflowSourceStep) InitParameters()
func (*WorkflowSourceStep) UpdateCondition ¶ added in v0.14.3
func (s *WorkflowSourceStep) UpdateCondition(condition string)
func (*WorkflowSourceStep) UpdateDeps ¶ added in v0.14.3
func (s *WorkflowSourceStep) UpdateDeps(deps string)
func (*WorkflowSourceStep) UpdateLoopArguemt ¶ added in v0.14.3
func (s *WorkflowSourceStep) UpdateLoopArguemt(loopArgument interface{})
func (*WorkflowSourceStep) UpdateName ¶ added in v0.14.3
func (s *WorkflowSourceStep) UpdateName(name string)