Documentation ¶
Index ¶
- Constants
- func CheckReg(str, pattern string) bool
- func CheckScalarResource(res string) error
- func ConcatenatePVCName(fsID string) string
- func ConcatenatePVName(namespace, fsID string) string
- func GetBindSource(fsID string) string
- func ID(userName, fsName string) string
- func IsEmptyResource(resourceInfo ResourceInfo) bool
- func IsImmutableJobStatus(status JobStatus) bool
- func IsValidFsMetaDriver(metaDriver string) bool
- func ProcessStepCacheByMap(cache *Cache, globalCacheMap map[string]interface{}, ...) error
- func ProcessStepFsMount(fsMountList *[]FsMount, globalFsMountList []interface{}) error
- func RunYaml2Map(runYaml []byte) (map[string]interface{}, error)
- func ValidateResource(resourceInfo ResourceInfo, scalarResourcesType []string) error
- func ValidateResourceItem(res string) error
- func ValidateScalarResourceInfo(scalarResources ScalarResourcesType, scalarResourcesType []string) error
- type ActionType
- type Artifacts
- type Cache
- type ClientOptions
- type Cluster
- type Component
- type ComponentView
- type Conf
- func (c *Conf) Framework() Framework
- func (c *Conf) GetAllFileSystem() []FileSystem
- func (c *Conf) GetAnnotations() map[string]string
- func (c *Conf) GetArgs() []string
- func (c *Conf) GetClusterID() string
- func (c *Conf) GetCommand() string
- func (c *Conf) GetEnv() map[string]string
- func (c *Conf) GetEnvSubset(prefix string) map[string]string
- func (c *Conf) GetEnvValue(key string) string
- func (c *Conf) GetExtraFS() []FileSystem
- func (c *Conf) GetFileSystem() FileSystem
- func (c *Conf) GetFlavour() string
- func (c *Conf) GetImage() string
- func (c *Conf) GetJobMode() string
- func (c *Conf) GetLabels() map[string]string
- func (c *Conf) GetLimitFlavour() string
- func (c *Conf) GetName() string
- func (c *Conf) GetNamespace() string
- func (c *Conf) GetPriority() string
- func (c *Conf) GetProcessedFileSystem() []FileSystem
- func (c *Conf) GetQueueID() string
- func (c *Conf) GetQueueName() string
- func (c *Conf) GetRestartPolicy() string
- func (c *Conf) GetUserName() string
- func (c *Conf) SetAnnotations(k, v string)
- func (c *Conf) SetClusterID(id string)
- func (c *Conf) SetEnv(name, value string)
- func (c *Conf) SetFlavour(flavourKey string)
- func (c *Conf) SetLabels(k, v string)
- func (c *Conf) SetNamespace(ns string)
- func (c *Conf) SetPriority(pc string)
- func (c *Conf) SetProcessedFileSystem(fs []FileSystem)
- func (c *Conf) SetQueueID(id string)
- func (c *Conf) SetQueueName(queueName string)
- func (c *Conf) Type() JobType
- type DagView
- func (d DagView) GetComponentName() string
- func (d DagView) GetDeps() string
- func (d DagView) GetEndTime() string
- func (d DagView) GetMsg() string
- func (d DagView) GetName() string
- func (d DagView) GetParentDagID() string
- func (d DagView) GetSeq() int
- func (d DagView) GetStartTime() string
- func (d DagView) GetStatus() JobStatus
- func (d *DagView) SetDeps(deps string)
- type FailureOptions
- type FileSystem
- type Flavour
- type Framework
- type FrameworkVersion
- type FsMount
- type FsOptions
- type FsScope
- type JobLogInfo
- type JobLogRequest
- type JobStatus
- type JobType
- type JobView
- func (j JobView) GetComponentName() string
- func (j JobView) GetDeps() string
- func (j JobView) GetEndTime() string
- func (j JobView) GetMsg() string
- func (j JobView) GetName() string
- func (j JobView) GetParentDagID() string
- func (j JobView) GetSeq() int
- func (j JobView) GetStartTime() string
- func (j JobView) GetStatus() JobStatus
- func (j *JobView) SetDeps(deps string)
- type LogInfo
- type LogRunArtifactRequest
- type LogRunCacheRequest
- type Member
- type MemberRole
- type MixedLogRequest
- type NodeQuotaInfo
- type PFJobConf
- type Parser
- func (p *Parser) IsDag(comp map[string]interface{}) bool
- func (p *Parser) ParseCache(cacheMap map[string]interface{}, cache *Cache) error
- func (p *Parser) ParseComponents(entryPoints map[string]interface{}) (map[string]Component, error)
- func (p *Parser) ParseDag(params map[string]interface{}, dagComp *WorkflowSourceDag) error
- func (p *Parser) ParseFsMount(fsMap map[string]interface{}, fs *FsMount) error
- func (p *Parser) ParseFsOptions(fsMap map[string]interface{}, fs *FsOptions) error
- func (p *Parser) ParseFsScope(fsMap map[string]interface{}, fs *FsScope) error
- func (p *Parser) ParseStep(params map[string]interface{}, step *WorkflowSourceStep) error
- func (p *Parser) ParseWorkflowSource(bodyMap map[string]interface{}, wfs *WorkflowSource) error
- func (p *Parser) TransJsonMap2Yaml(jsonMap map[string]interface{}) error
- type PostProcessView
- type QuotaSummary
- type Reference
- type ResourceInfo
- type ResourceName
- type RunOptions
- type RuntimeView
- type ScalarResourcesType
- type TaskLogInfo
- type TaskStatus
- type WorkflowSource
- func (wfs *WorkflowSource) GetComponentByFullName(fullName string) (Component, error)
- func (wfs *WorkflowSource) GetCompsMapAndRelName(components map[string]Component, absoluteName string) (map[string]Component, string, bool)
- func (wfs *WorkflowSource) GetDisabled() []string
- func (wfs *WorkflowSource) GetFsMounts() ([]FsMount, error)
- func (wfs *WorkflowSource) IsDisabled(componentName string) (bool, error)
- func (wfs *WorkflowSource) ProcessRuntimeComponents(components map[string]Component, componentType string, ...) error
- func (wfs *WorkflowSource) TransToRunYamlRaw() (runYamlRaw string, err error)
- func (wfs *WorkflowSource) UnmarshalJSON(data []byte) error
- type WorkflowSourceDag
- func (d *WorkflowSourceDag) DeepCopy() Component
- func (d *WorkflowSourceDag) GetArtifactPath(artName string) (string, error)
- func (d *WorkflowSourceDag) GetArtifacts() Artifacts
- func (d *WorkflowSourceDag) GetCondition() string
- func (d *WorkflowSourceDag) GetDeps() []string
- func (d *WorkflowSourceDag) GetInputArtifactPath(artName string) (string, error)
- func (d *WorkflowSourceDag) GetLoopArgument() interface{}
- func (d *WorkflowSourceDag) GetLoopArgumentLength() int
- func (d *WorkflowSourceDag) GetName() string
- func (d *WorkflowSourceDag) GetOutputArtifactPath(artName string) (string, error)
- func (d *WorkflowSourceDag) GetParameterValue(paramName string) (interface{}, error)
- func (d *WorkflowSourceDag) GetParameters() map[string]interface{}
- func (d *WorkflowSourceDag) GetSubComponet(subComponentName string) (Component, bool)
- func (d *WorkflowSourceDag) GetType() string
- func (d *WorkflowSourceDag) InitInputArtifacts()
- func (d *WorkflowSourceDag) InitOutputArtifacts()
- func (d *WorkflowSourceDag) InitParameters()
- func (d *WorkflowSourceDag) UpdateCondition(condition string)
- func (d *WorkflowSourceDag) UpdateDeps(deps string)
- func (d *WorkflowSourceDag) UpdateLoopArguemt(loopArgument interface{})
- func (d *WorkflowSourceDag) UpdateName(name string)
- type WorkflowSourceStep
- func (s *WorkflowSourceStep) DeepCopy() Component
- func (s *WorkflowSourceStep) GetArtifactPath(artName string) (string, error)
- func (s *WorkflowSourceStep) GetArtifacts() Artifacts
- func (s *WorkflowSourceStep) GetCondition() string
- func (s *WorkflowSourceStep) GetDeps() []string
- func (s *WorkflowSourceStep) GetInputArtifactPath(artName string) (string, error)
- func (s *WorkflowSourceStep) GetLoopArgument() interface{}
- func (s *WorkflowSourceStep) GetLoopArgumentLength() int
- func (s *WorkflowSourceStep) GetName() string
- func (s *WorkflowSourceStep) GetOutputArtifactPath(artName string) (string, error)
- func (s *WorkflowSourceStep) GetParameterValue(paramName string) (interface{}, error)
- func (s *WorkflowSourceStep) GetParameters() map[string]interface{}
- func (s *WorkflowSourceStep) GetType() string
- func (s *WorkflowSourceStep) InitInputArtifacts()
- func (s *WorkflowSourceStep) InitOutputArtifacts()
- func (s *WorkflowSourceStep) InitParameters()
- func (s *WorkflowSourceStep) UpdateCondition(condition string)
- func (s *WorkflowSourceStep) UpdateDeps(deps string)
- func (s *WorkflowSourceStep) UpdateLoopArguemt(loopArgument interface{})
- func (s *WorkflowSourceStep) UpdateName(name string)
Constants ¶
const ( LocalType = "Local" KubernetesType = "Kubernetes" K3SType = "K3S" )
const ( PFSTypeLocal = "local" PVNameTemplate = "pfs-$(pfs.fs.id)-$(namespace)-pv" PVCNameTemplate = "pfs-$(pfs.fs.id)-pvc" FSIDFormat = "$(pfs.fs.id)" NameSpaceFormat = "$(namespace)" PFSID = "pfs.fs.id" PFSInfo = "pfs.fs.info" PFSCache = "pfs.fs.cache" PFSServer = "pfs.server" PFSClusterID = "pfs.cluster.id" FusePodMntDir = "/home/paddleflow/mnt" FsMetaMemory = "mem" FsMetaDisk = "disk" FuseKeyFsInfo = "fs-info" LabelKeyFsID = "fsID" LabelKeyCacheID = "cacheID" LabelKeyNodeName = "nodename" LabelKeyUsedSize = "usedSize" AnnotationKeyCacheDir = "cacheDir" AnnotationKeyMTime = "modifiedTime" AnnotationKeyMountPrefix = "mount-" EnvKeyMountPodName = "POD_NAME" EnvKeyNamespace = "NAMESPACE" MountPodNamespace = "paddleflow" )
const ( EnvJobType = "PF_JOB_TYPE" EnvJobQueueName = "PF_JOB_QUEUE_NAME" EnvJobQueueID = "PF_JOB_QUEUE_ID" EnvJobClusterName = "PF_JOB_CLUSTER_NAME" EnvJobClusterID = "PF_JOB_CLUSTER_ID" EnvJobNamespace = "PF_JOB_NAMESPACE" EnvJobUserName = "PF_USER_NAME" EnvJobFsID = "PF_FS_ID" EnvJobPVCName = "PF_JOB_PVC_NAME" EnvJobPriority = "PF_JOB_PRIORITY" EnvJobMode = "PF_JOB_MODE" EnvJobFramework = "PF_JOB_FRAMEWORK" // EnvJobYamlPath Additional configuration for a specific job EnvJobYamlPath = "PF_JOB_YAML_PATH" EnvIsCustomYaml = "PF_IS_CUSTOM_YAML" // EnvJobWorkDir The working directory of the job, `null` means command without a working directory EnvJobWorkDir = "PF_WORK_DIR" EnvMountPath = "PF_MOUNT_PATH" EnvJobRestartPolicy = "PF_JOB_RESTART_POLICY" EnvEnableJobQueueSync = "PF_JOB_QUEUE_SYNC" // EnvJobModePS env EnvJobModePS = "PS" EnvJobPSPort = "PF_JOB_PS_PORT" EnvJobPServerReplicas = "PF_JOB_PSERVER_REPLICAS" EnvJobPServerFlavour = "PF_JOB_PSERVER_FLAVOUR" EnvJobPServerCommand = "PF_JOB_PSERVER_COMMAND" EnvJobWorkerReplicas = "PF_JOB_WORKER_REPLICAS" EnvJobWorkerFlavour = "PF_JOB_WORKER_FLAVOUR" EnvJobWorkerCommand = "PF_JOB_WORKER_COMMAND" // EnvJobModeCollective env EnvJobModeCollective = "Collective" EnvJobReplicas = "PF_JOB_REPLICAS" EnvJobFlavour = "PF_JOB_FLAVOUR" EnvJobLimitFlavour = "PF_JOB_LIMIT_FLAVOUR" EnvJobLimitFlavourNone = "NONE" // EnvJobModePod env reuse EnvJobReplicas and EnvJobFlavour EnvJobModePod = "Pod" // spark job env EnvJobSparkMainFile = "PF_JOB_SPARK_MAIN_FILE" EnvJobSparkMainClass = "PF_JOB_SPARK_MAIN_CLASS" EnvJobSparkArguments = "PF_JOB_SPARK_ARGUMENTS" EnvJobDriverFlavour = "PF_JOB_DRIVER_FLAVOUR" EnvJobExecutorReplicas = "PF_JOB_EXECUTOR_REPLICAS" EnvJobExecutorFlavour = "PF_JOB_EXECUTOR_FLAVOUR" // TODO move to framework TypeVcJob JobType = "vcjob" TypeSparkJob JobType = "spark" TypePaddleJob JobType = "paddlejob" TypePodJob JobType = "pod" TypeDeployment JobType = "deployment" StatusJobInit JobStatus = "init" StatusJobPending 
JobStatus = "pending" StatusJobRunning JobStatus = "running" StatusJobFailed JobStatus = "failed" StatusJobSucceeded JobStatus = "succeeded" StatusJobTerminating JobStatus = "terminating" StatusJobTerminated JobStatus = "terminated" StatusJobCancelled JobStatus = "cancelled" StatusJobSkipped JobStatus = "skipped" StatusTaskPending TaskStatus = "pending" StatusTaskRunning TaskStatus = "running" StatusTaskSucceeded TaskStatus = "succeeded" StatusTaskFailed TaskStatus = "failed" RoleMaster MemberRole = "master" RoleWorker MemberRole = "worker" RoleDriver MemberRole = "driver" RoleExecutor MemberRole = "executor" RolePServer MemberRole = "pserver" RolePWorker MemberRole = "pworker" TypeSingle JobType = "single" TypeDistributed JobType = "distributed" TypeWorkflow JobType = "workflow" FrameworkSpark Framework = "spark" FrameworkMPI Framework = "mpi" FrameworkTF Framework = "tensorflow" FrameworkPytorch Framework = "pytorch" FrameworkPaddle Framework = "paddle" FrameworkMXNet Framework = "mxnet" FrameworkRay Framework = "ray" FrameworkStandalone Framework = "standalone" ListenerTypeJob = "job" ListenerTypeTask = "task" ListenerTypeQueue = "queue" ListenerTypeNode = "node" ListenerTypeNodeTask = "nodeTask" EnvPFNodeLabels = "PF_NODE_LABELS" EnvPFTaskLabels = "PF_TASK_LABELS" EnvPFResourceFilter = "PF_NODE_RESOURCES_FILTER" PFNodeLabels = "resource-isolation-type" // job priority EnvJobVeryLowPriority = "VERY_LOW" EnvJobLowPriority = "LOW" EnvJobNormalPriority = "NORMAL" EnvJobHighPriority = "HIGH" EnvJobVeryHighPriority = "VERY_HIGH" // priority class PriorityClassVeryLow = "very-low" PriorityClassLow = "low" PriorityClassNormal = "normal" PriorityClassHigh = "high" PriorityClassVeryHigh = "very-high" JobOwnerLabel = "owner" JobOwnerValue = "paddleflow" JobIDLabel = "paddleflow-job-id" JobTTLSeconds = "padleflow/job-ttl-seconds" JobLabelFramework = "paddleflow-job-framework" VolcanoJobNameLabel = "volcano.sh/job-name" QueueLabelKey = "volcano.sh/queue-name" 
SparkAPPJobNameLabel = "sparkoperator.k8s.io/app-name" JobPrefix = "job" DefaultSchedulerName = "volcano" DefaultFSMountPath = "/home/paddleflow/storage/mnt" // EnvPaddleParaJob defines env for Paddle Para Job EnvPaddleParaJob = "PF_PADDLE_PARA_JOB" EnvPaddleParaPriority = "PF_PADDLE_PARA_PRIORITY" EnvPaddleParaConfigHostFile = "PF_PADDLE_PARA_CONFIG_FILE" // PaddleParaVolumeName defines config for Paddle Para Pod PaddleParaVolumeName = "paddle-para-conf-volume" PaddleParaAnnotationKeyJobName = "paddle-para/job-name" PaddleParaAnnotationKeyPriority = "paddle-para/priority" PaddleParaEnvJobName = "FLAGS_job_name" PaddleParaEnvGPUConfigFile = "GPU_CONFIG_FILE" PaddleParaGPUConfigFilePath = "/opt/paddle/para/gpu_config.json" // RayJob keywords EnvRayJobEntryPoint = "RAY_JOB_ENTRY_POINT" EnvRayJobRuntimeEnv = "RAY_JOB_RUNTIME_ENV" EnvRayJobEnableAutoScaling = "RAY_JOB_ENABLE_AUTOSCALING" EnvRayJobAutoScalingMode = "RAY_JOB_AUTOSCALING_MODE" EnvRayJobAutoScalingTimeout = "RAY_JOB_AUTOSCALING_IDLE_TIMEOUT" EnvRayJobHeaderFlavour = "RAY_JOB_HEADER_FLAVOUR" EnvRayJobHeaderImage = "RAY_JOB_HEADER_IMAGE" EnvRayJobHeaderPriority = "RAY_JOB_HEADER_PRIORITY" EnvRayJobHeaderPreStop = "RAY_JOB_HEADER_PRE_STOP" EnvRayJobHeaderStartParamsPrefix = "RAY_JOB_HEADER_START_PARAMS_" EnvRayJobWorkerGroupName = "RAY_JOB_WORKER_GROUP_NAME" EnvRayJobWorkerFlavour = "RAY_JOB_WORKER_FLAVOUR" EnvRayJobWorkerImage = "RAY_JOB_WORKER_IMAGE" EnvRayJobWorkerPriority = "RAY_JOB_WORKER_PRIORITY" EnvRayJobWorkerReplicas = "RAY_JOB_WORKER_REPLICAS" EnvRayJobWorkerMinReplicas = "RAY_JOB_WORKER_MIN_REPLICAS" EnvRayJobWorkerMaxReplicas = "RAY_JOB_WORKER_MAX_REPLICAS" EnvRayJobWorkerStartParamsPrefix = "RAY_JOB_WORKER_START_PARAMS_" // ENVK3SNodeName makesure schedule without volcano ENVK3SNodeName = "K3S_NODE_NAME" )
const ( StatusQueueCreating = "creating" StatusQueueOpen = "open" StatusQueueUpdating = "updating" StatusQueueClosing = "closing" StatusQueueClosed = "closed" TypeElasticQuota = "elasticQuota" TypeVolcanoCapabilityQuota = "volcanoCapabilityQuota" )
const ( ArtifactTypeInput = "input" ArtifactTypeOutput = "output" EntryPointsStr = "entry_points" CacheAttributeEnable = "enable" CacheAttributeMaxExpiredTime = "max_expired_time" CacheAttributeFsScope = "fs_scope" FailureStrategyFailFast = "fail_fast" FailureStrategyContinue = "continue" EnvDockerEnv = "dockerEnv" FsPrefix = "fs-" CompTypeComponents = "components" CompTypeEntryPoints = "entryPoints" CompTypePostProcess = "postProcess" )
Variables ¶
This section is empty.
Functions ¶
func CheckScalarResource ¶
func ConcatenatePVCName ¶ added in v0.14.3
func ConcatenatePVName ¶ added in v0.14.3
func GetBindSource ¶
func IsEmptyResource ¶
func IsEmptyResource(resourceInfo ResourceInfo) bool
IsEmptyResource returns true when cpu or mem is nil.
func IsImmutableJobStatus ¶
func IsValidFsMetaDriver ¶
func ProcessStepCacheByMap ¶ added in v0.14.3
func ProcessStepFsMount ¶ added in v0.14.3
func RunYaml2Map ¶ added in v0.14.3
RunYaml2Map parses the YAML into a map.
func ValidateResource ¶ added in v0.14.3
func ValidateResource(resourceInfo ResourceInfo, scalarResourcesType []string) error
ValidateResource validates the resource info.
func ValidateResourceItem ¶ added in v0.14.3
ValidateResourceItem checks a resource value for cpu or memory.
func ValidateScalarResourceInfo ¶
func ValidateScalarResourceInfo(scalarResources ScalarResourcesType, scalarResourcesType []string) error
ValidateScalarResourceInfo validates the scalar resource info.
Types ¶
type ActionType ¶
type ActionType string
const ( Create ActionType = "create" Update ActionType = "update" Delete ActionType = "delete" Terminate ActionType = "terminate" )
type Artifacts ¶
type Artifacts struct { Input map[string]string `yaml:"input" json:"input"` Output map[string]string `yaml:"output" json:"output"` }
func (*Artifacts) ValidateOutputMapByList ¶
type ClientOptions ¶
ClientOptions used to build rest config.
type Cluster ¶
type Cluster struct { ID string Name string Type string // ClientOpt defines client config for cluster ClientOpt ClientOptions }
type Component ¶ added in v0.14.3
type Component interface { GetDeps() []string GetArtifacts() Artifacts GetArtifactPath(artName string) (string, error) GetInputArtifactPath(artName string) (string, error) GetOutputArtifactPath(artName string) (string, error) GetParameters() map[string]interface{} GetParameterValue(paramName string) (interface{}, error) GetCondition() string GetLoopArgument() interface{} GetLoopArgumentLength() int GetType() string GetName() string // 下面几个Update 函数在进行模板替换的时候会用到 UpdateCondition(string) UpdateLoopArguemt(interface{}) UpdateName(name string) UpdateDeps(deps string) InitInputArtifacts() InitOutputArtifacts() InitParameters() // 用于 deepCopy, 避免复用时出现问题 DeepCopy() Component }
Component covers both Dag and Step; the structs WorkflowSourceStep and WorkflowSourceDag implement this interface.
type ComponentView ¶ added in v0.14.3
type Conf ¶
type Conf struct { Name string `json:"name"` // 存储资源 FileSystem FileSystem `json:"fs,omitempty"` ExtraFileSystem []FileSystem `json:"extraFS,omitempty"` // 计算资源 Flavour Flavour `json:"flavour,omitempty"` LimitFlavour Flavour `json:"limitFlavour,omitempty"` Priority string `json:"priority"` ClusterID string `json:"clusterID"` QueueID string `json:"queueID"` QueueName string `json:"queueName,omitempty"` // 运行时需要的参数 Labels map[string]string `json:"labels"` Annotations map[string]string `json:"annotations"` Env map[string]string `json:"env,omitempty"` Command string `json:"command,omitempty"` Image string `json:"image"` Port int `json:"port,omitempty"` Args []string `json:"args,omitempty"` // contains filtered or unexported fields }
func (*Conf) GetAllFileSystem ¶ added in v0.14.3
func (c *Conf) GetAllFileSystem() []FileSystem
GetAllFileSystem combines FileSystem and ExtraFileSystem into a single slice.
func (*Conf) GetAnnotations ¶ added in v0.14.5
func (*Conf) GetClusterID ¶
func (*Conf) GetCommand ¶
func (*Conf) GetEnvSubset ¶ added in v0.14.5
func (*Conf) GetEnvValue ¶ added in v0.14.5
func (*Conf) GetExtraFS ¶ added in v0.14.3
func (c *Conf) GetExtraFS() []FileSystem
func (*Conf) GetFileSystem ¶ added in v0.14.3
func (c *Conf) GetFileSystem() FileSystem
func (*Conf) GetFlavour ¶
func (*Conf) GetJobMode ¶
func (*Conf) GetLimitFlavour ¶ added in v0.14.6
func (*Conf) GetNamespace ¶
func (*Conf) GetPriority ¶
func (*Conf) GetProcessedFileSystem ¶ added in v0.14.6
func (c *Conf) GetProcessedFileSystem() []FileSystem
func (*Conf) GetQueueID ¶
func (*Conf) GetQueueName ¶
func (*Conf) GetRestartPolicy ¶ added in v0.14.5
func (*Conf) GetUserName ¶
func (*Conf) SetAnnotations ¶
func (*Conf) SetClusterID ¶
func (*Conf) SetFlavour ¶
func (*Conf) SetNamespace ¶
func (*Conf) SetPriority ¶
func (*Conf) SetProcessedFileSystem ¶ added in v0.14.6
func (c *Conf) SetProcessedFileSystem(fs []FileSystem)
func (*Conf) SetQueueID ¶
func (*Conf) SetQueueName ¶
SetQueueName sets the queue name.
type DagView ¶ added in v0.14.3
type DagView struct { PK int64 `json:"-"` DagID string `json:"id"` Name string `json:"name"` Type string `json:"type"` DagName string `json:"dagName"` ParentDagID string `json:"parentDagID"` LoopSeq int `json:"-"` Deps string `json:"deps"` Parameters map[string]string `json:"parameters"` Artifacts Artifacts `json:"artifacts"` StartTime string `json:"startTime"` EndTime string `json:"endTime"` Status JobStatus `json:"status"` Message string `json:"message"` EntryPoints map[string][]ComponentView `json:"entryPoints"` }
func (DagView) GetComponentName ¶ added in v0.14.3
func (DagView) GetEndTime ¶ added in v0.14.5
func (DagView) GetParentDagID ¶ added in v0.14.3
func (DagView) GetStartTime ¶ added in v0.14.5
type FailureOptions ¶
type FailureOptions struct {
Strategy string `yaml:"strategy" json:"strategy"`
}
type FileSystem ¶
type FileSystem struct { ID string `json:"id,omitempty"` Name string `json:"name"` Type string `json:"type"` HostPath string `json:"hostPath,omitempty"` MountPath string `json:"mountPath,omitempty"` SubPath string `json:"subPath,omitempty"` ReadOnly bool `json:"readOnly,omitempty"` }
FileSystem indicates a PaddleFlow file system.
type Flavour ¶
type Flavour struct { ResourceInfo `yaml:",inline"` Name string `json:"name" yaml:"name"` }
Flavour is a set of resources that can be used to run a job.
type FrameworkVersion ¶ added in v0.14.5
type FrameworkVersion struct { Framework string `json:"framework"` APIVersion string `json:"apiVersion"` }
func NewFrameworkVersion ¶ added in v0.14.5
func NewFrameworkVersion(framework, apiVersion string) FrameworkVersion
func (*FrameworkVersion) String ¶ added in v0.14.5
func (f *FrameworkVersion) String() string
type JobLogInfo ¶
type JobLogInfo struct { JobID string `json:"jobID"` TaskList []TaskLogInfo `json:"taskList"` ResourceName string `json:"name"` Resourcetype string `json:"type"` Events []string `json:"eventList"` }
type JobLogRequest ¶
type JobView ¶
type JobView struct { PK int64 `json:"-"` JobID string `json:"jobID"` Name string `json:"name"` Type string `json:"type"` StepName string `json:"stepName"` ParentDagID string `json:"parentDagID"` LoopSeq int `json:"-"` Command string `json:"command"` Parameters map[string]string `json:"parameters"` Env map[string]string `json:"env"` ExtraFS []FsMount `json:"extraFS"` StartTime string `json:"startTime"` EndTime string `json:"endTime"` Status JobStatus `json:"status"` Deps string `json:"deps"` DockerEnv string `json:"dockerEnv"` Artifacts Artifacts `json:"artifacts"` Cache Cache `json:"cache"` JobMessage string `json:"jobMessage"` CacheRunID string `json:"cacheRunID"` CacheJobID string `json:"cacheJobID"` }
JobView is the view of job info returned to the user, while Job is for the pipeline and job engine to process.
func (JobView) GetComponentName ¶ added in v0.14.3
func (JobView) GetEndTime ¶ added in v0.14.5
func (JobView) GetParentDagID ¶ added in v0.14.3
func (JobView) GetStartTime ¶ added in v0.14.5
type LogRunArtifactRequest ¶
type LogRunArtifactRequest struct { Md5 string `json:"md5"` RunID string `json:"runID"` FsID string `json:"fsID"` FsName string `json:"fsname"` UserName string `json:"username"` ArtifactPath string `json:"artifactPath"` Step string `json:"step"` JobID string `json:"jobID"` Type string `json:"type"` ArtifactName string `json:"artifactName"` Meta string `json:"meta"` }
type LogRunCacheRequest ¶
type LogRunCacheRequest struct { FirstFp string `json:"firstFp"` SecondFp string `json:"secondFp"` Source string `json:"source"` RunID string `json:"runID"` Step string `json:"step"` JobID string `json:"jobID"` FsID string `json:"fsID"` FsName string `json:"fsname"` UserName string `json:"username"` ExpiredTime string `json:"expiredTime"` Strategy string `json:"strategy"` }
type Member ¶ added in v0.14.5
type Member struct { ID string `json:"id"` Replicas int `json:"replicas"` Role MemberRole `json:"role"` Conf `json:",inline"` }
type MemberRole ¶
type MemberRole string
type MixedLogRequest ¶ added in v0.14.6
type MixedLogRequest struct { Name string Namespace string ResourceType string Framework string LineLimit string SizeLimit int64 IsReadFromTail bool }
MixedLogRequest can request job logs, or k8s pod/deployment events and logs.
type NodeQuotaInfo ¶
type PFJobConf ¶
type PFJobConf interface { GetName() string GetEnv() map[string]string GetEnvValue(key string) string GetEnvSubset(prefix string) map[string]string GetCommand() string GetImage() string GetFileSystem() FileSystem GetExtraFS() []FileSystem GetArgs() []string GetPriority() string SetPriority(string) GetQueueName() string GetQueueID() string GetClusterID() string GetUserName() string GetNamespace() string GetFlavour() string GetLimitFlavour() string SetQueueID(string) SetClusterID(string) SetNamespace(string) SetEnv(string, string) SetLabels(string, string) SetAnnotations(string, string) Type() JobType Framework() Framework }
type Parser ¶ added in v0.14.3
type Parser struct { }
func (*Parser) ParseCache ¶ added in v0.14.3
func (*Parser) ParseComponents ¶ added in v0.14.3
func (*Parser) ParseDag ¶ added in v0.14.3
func (p *Parser) ParseDag(params map[string]interface{}, dagComp *WorkflowSourceDag) error
ParseDag assigns values to the fields of a WorkflowSourceDag; it does not fill in default values and does not replace local parameters with global parameters.
func (*Parser) ParseFsMount ¶ added in v0.14.3
func (*Parser) ParseFsOptions ¶ added in v0.14.3
func (*Parser) ParseFsScope ¶ added in v0.14.3
func (*Parser) ParseStep ¶ added in v0.14.3
func (p *Parser) ParseStep(params map[string]interface{}, step *WorkflowSourceStep) error
func (*Parser) ParseWorkflowSource ¶ added in v0.14.3
func (p *Parser) ParseWorkflowSource(bodyMap map[string]interface{}, wfs *WorkflowSource) error
ParseWorkflowSource parses the request body into a WorkflowSource; it does not perform the global replacement.
func (*Parser) TransJsonMap2Yaml ¶ added in v0.14.3
type PostProcessView ¶
type QuotaSummary ¶
type Reference ¶ added in v0.14.3
type Reference struct {
Component string `yaml:"component" json:"component"`
}
type ResourceInfo ¶
type ResourceInfo struct { CPU string `json:"cpu" yaml:"cpu"` Mem string `json:"mem" yaml:"mem"` ScalarResources ScalarResourcesType `json:"scalarResources,omitempty" yaml:"scalarResources,omitempty"` }
ResourceInfo is a struct that contains the information of a resource.
func (ResourceInfo) ToMap ¶ added in v0.14.3
func (r ResourceInfo) ToMap() map[string]string
type ResourceName ¶
type ResourceName string
ResourceName is the name identifying various resources in a ResourceList.
type RunOptions ¶ added in v0.14.3
type RuntimeView ¶
type RuntimeView map[string][]ComponentView
RuntimeView is the view of a run returned to the user, while workflowRuntime is for the pipeline engine to process.
func (*RuntimeView) UnmarshalJSON ¶ added in v0.14.5
func (rv *RuntimeView) UnmarshalJSON(data []byte) error
type ScalarResourcesType ¶
type ScalarResourcesType map[ResourceName]string
ScalarResourcesType is the type of scalar resources
type TaskLogInfo ¶
type TaskStatus ¶
type TaskStatus string
type WorkflowSource ¶
type WorkflowSource struct { Name string `yaml:"name" json:"name"` DockerEnv string `yaml:"docker_env" json:"dockerEnv"` EntryPoints WorkflowSourceDag `yaml:"entry_points" json:"entryPoints"` Components map[string]Component `yaml:"components" json:"components"` Cache Cache `yaml:"cache" json:"cache"` Parallelism int `yaml:"parallelism" json:"parallelism"` Disabled string `yaml:"disabled" json:"disabled"` FailureOptions FailureOptions `yaml:"failure_options" json:"failureOptions"` PostProcess map[string]*WorkflowSourceStep `yaml:"post_process" json:"postProcess"` FsOptions FsOptions `yaml:"fs_options" json:"fsOptions"` }
func GetWorkflowSource ¶ added in v0.14.3
func GetWorkflowSource(runYaml []byte) (WorkflowSource, error)
GetWorkflowSource parses the YAML into a WorkflowSource and, in addition, performs the global parameter replacement.
func GetWorkflowSourceByMap ¶ added in v0.14.3
func GetWorkflowSourceByMap(yamlMap map[string]interface{}) (WorkflowSource, error)
GetWorkflowSourceByMap builds a WorkflowSource from a map; the keys of the map must be in underscore (snake_case) format.
func (*WorkflowSource) GetComponentByFullName ¶ added in v0.14.3
func (wfs *WorkflowSource) GetComponentByFullName(fullName string) (Component, error)
func (*WorkflowSource) GetCompsMapAndRelName ¶ added in v0.14.3
func (wfs *WorkflowSource) GetCompsMapAndRelName(components map[string]Component, absoluteName string) (map[string]Component, string, bool)
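GetCompsMapAndRelName recursively checks whether the Component identified by the absolute name exists, and returns all sibling nodes at that Component's level together with its relative name.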
func (*WorkflowSource) GetDisabled ¶
func (wfs *WorkflowSource) GetDisabled() []string
func (*WorkflowSource) GetFsMounts ¶ added in v0.14.3
func (wfs *WorkflowSource) GetFsMounts() ([]FsMount, error)
func (*WorkflowSource) IsDisabled ¶
func (wfs *WorkflowSource) IsDisabled(componentName string) (bool, error)
func (*WorkflowSource) ProcessRuntimeComponents ¶ added in v0.14.3
func (wfs *WorkflowSource) ProcessRuntimeComponents(components map[string]Component, componentType string, yamlMap map[string]interface{}, componentsMap map[string]interface{}) error
ProcessRuntimeComponents applies the global replacement to the DockerEnv and Cache of Steps.
func (*WorkflowSource) TransToRunYamlRaw ¶ added in v0.14.3
func (wfs *WorkflowSource) TransToRunYamlRaw() (runYamlRaw string, err error)
func (*WorkflowSource) UnmarshalJSON ¶ added in v0.14.5
func (wfs *WorkflowSource) UnmarshalJSON(data []byte) error
type WorkflowSourceDag ¶ added in v0.14.3
type WorkflowSourceDag struct { Name string `yaml:"-" json:"name"` Type string `yaml:"-" json:"type"` LoopArgument interface{} `yaml:"loop_argument" json:"loopArgument"` Condition string `yaml:"condition" json:"condition"` Parameters map[string]interface{} `yaml:"parameters" json:"parameters"` Deps string `yaml:"deps" json:"deps"` Artifacts Artifacts `yaml:"artifacts" json:"artifacts"` EntryPoints map[string]Component `yaml:"entry_points" json:"entryPoints"` }
func (*WorkflowSourceDag) DeepCopy ¶ added in v0.14.3
func (d *WorkflowSourceDag) DeepCopy() Component
func (*WorkflowSourceDag) GetArtifactPath ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetArtifactPath(artName string) (string, error)
GetArtifactPath returns the path of the artifact.
func (*WorkflowSourceDag) GetArtifacts ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetArtifacts() Artifacts
func (*WorkflowSourceDag) GetCondition ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetCondition() string
func (*WorkflowSourceDag) GetDeps ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetDeps() []string
func (*WorkflowSourceDag) GetInputArtifactPath ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetInputArtifactPath(artName string) (string, error)
GetInputArtifactPath returns the storage path of the input artifact.
func (*WorkflowSourceDag) GetLoopArgument ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetLoopArgument() interface{}
func (*WorkflowSourceDag) GetLoopArgumentLength ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetLoopArgumentLength() int
func (*WorkflowSourceDag) GetName ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetName() string
func (*WorkflowSourceDag) GetOutputArtifactPath ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetOutputArtifactPath(artName string) (string, error)
GetOutputArtifactPath returns the storage path of the output artifact.
func (*WorkflowSourceDag) GetParameterValue ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetParameterValue(paramName string) (interface{}, error)
GetParameterValue returns the value of the specified parameter.
func (*WorkflowSourceDag) GetParameters ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetParameters() map[string]interface{}
func (*WorkflowSourceDag) GetSubComponet ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetSubComponet(subComponentName string) (Component, bool)
func (*WorkflowSourceDag) GetType ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetType() string
func (*WorkflowSourceDag) InitInputArtifacts ¶ added in v0.14.3
func (d *WorkflowSourceDag) InitInputArtifacts()
func (*WorkflowSourceDag) InitOutputArtifacts ¶ added in v0.14.3
func (d *WorkflowSourceDag) InitOutputArtifacts()
func (*WorkflowSourceDag) InitParameters ¶ added in v0.14.3
func (d *WorkflowSourceDag) InitParameters()
func (*WorkflowSourceDag) UpdateCondition ¶ added in v0.14.3
func (d *WorkflowSourceDag) UpdateCondition(condition string)
func (*WorkflowSourceDag) UpdateDeps ¶ added in v0.14.3
func (d *WorkflowSourceDag) UpdateDeps(deps string)
func (*WorkflowSourceDag) UpdateLoopArguemt ¶ added in v0.14.3
func (d *WorkflowSourceDag) UpdateLoopArguemt(loopArgument interface{})
func (*WorkflowSourceDag) UpdateName ¶ added in v0.14.3
func (d *WorkflowSourceDag) UpdateName(name string)
type WorkflowSourceStep ¶
type WorkflowSourceStep struct { Name string `yaml:"-" json:"name"` LoopArgument interface{} `yaml:"loop_argument" json:"loopArgument"` Condition string `yaml:"condition" json:"condition"` Parameters map[string]interface{} `yaml:"parameters" json:"parameters"` Command string `yaml:"command" json:"command"` Deps string `yaml:"deps" json:"deps"` Artifacts Artifacts `yaml:"artifacts" json:"artifacts"` Env map[string]string `yaml:"env" json:"env"` DockerEnv string `yaml:"docker_env" json:"dockerEnv"` Cache Cache `yaml:"cache" json:"cache"` Reference Reference `yaml:"reference" json:"reference"` ExtraFS []FsMount `yaml:"extra_fs" json:"extraFS"` }
func (*WorkflowSourceStep) DeepCopy ¶ added in v0.14.3
func (s *WorkflowSourceStep) DeepCopy() Component
func (*WorkflowSourceStep) GetArtifactPath ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetArtifactPath(artName string) (string, error)
GetArtifactPath returns the path of the artifact.
func (*WorkflowSourceStep) GetArtifacts ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetArtifacts() Artifacts
func (*WorkflowSourceStep) GetCondition ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetCondition() string
func (*WorkflowSourceStep) GetDeps ¶
func (s *WorkflowSourceStep) GetDeps() []string
func (*WorkflowSourceStep) GetInputArtifactPath ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetInputArtifactPath(artName string) (string, error)
GetInputArtifactPath returns the storage path of the input artifact.
func (*WorkflowSourceStep) GetLoopArgument ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetLoopArgument() interface{}
func (*WorkflowSourceStep) GetLoopArgumentLength ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetLoopArgumentLength() int
func (*WorkflowSourceStep) GetName ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetName() string
func (*WorkflowSourceStep) GetOutputArtifactPath ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetOutputArtifactPath(artName string) (string, error)
GetOutputArtifactPath returns the storage path of the output artifact.
func (*WorkflowSourceStep) GetParameterValue ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetParameterValue(paramName string) (interface{}, error)
GetParameterValue returns the value of the specified parameter.
func (*WorkflowSourceStep) GetParameters ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetParameters() map[string]interface{}
func (*WorkflowSourceStep) GetType ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetType() string
func (*WorkflowSourceStep) InitInputArtifacts ¶ added in v0.14.3
func (s *WorkflowSourceStep) InitInputArtifacts()
func (*WorkflowSourceStep) InitOutputArtifacts ¶ added in v0.14.3
func (s *WorkflowSourceStep) InitOutputArtifacts()
func (*WorkflowSourceStep) InitParameters ¶ added in v0.14.3
func (s *WorkflowSourceStep) InitParameters()
func (*WorkflowSourceStep) UpdateCondition ¶ added in v0.14.3
func (s *WorkflowSourceStep) UpdateCondition(condition string)
func (*WorkflowSourceStep) UpdateDeps ¶ added in v0.14.3
func (s *WorkflowSourceStep) UpdateDeps(deps string)
func (*WorkflowSourceStep) UpdateLoopArguemt ¶ added in v0.14.3
func (s *WorkflowSourceStep) UpdateLoopArguemt(loopArgument interface{})
func (*WorkflowSourceStep) UpdateName ¶ added in v0.14.3
func (s *WorkflowSourceStep) UpdateName(name string)