Documentation ¶
Index ¶
- Constants
- func NewBaseComponentRuntime(name, fullname string, component schema.Component, seq int, ...) *baseComponentRuntime
- func NewInnerSolver(cp schema.Component, runtimeName string, config *runConfig) *innerSolver
- func NewParallelismManager(parallelism int) *parallelismManager
- func NewReferenceSolver(ws *schema.WorkflowSource) *referenceSolver
- func NewRunConfig(workflowSource *schema.WorkflowSource, mainFS *schema.FsMount, ...) *runConfig
- type BaseJob
- type BaseWorkflow
- type CacheCalculator
- type ConditionCalculator
- type CtxAndCancel
- type DagRuntime
- func (crt DagRuntime) CalculateCondition() (bool, error)
- func (drt *DagRuntime) CancellNotReadyComponent(subComponent schema.Component, reason string)
- func (drt *DagRuntime) CreateSubRuntimeAccordingView(view schema.ComponentView, name string) componentRuntime
- func (drt *DagRuntime) GetSubComponentArtifactPaths(componentName string, artName string) (string, error)
- func (drt *DagRuntime) GetSubComponentParameterValue(componentName string, paramName string) (interface{}, error)
- func (drt *DagRuntime) Listen()
- func (drt *DagRuntime) ProcessFailureOptions(event WorkflowEvent)
- func (drt *DagRuntime) ProcessFailureOptionsWithContinue(component schema.Component)
- func (drt *DagRuntime) ProcessFailureOptionsWithFailFast()
- func (drt *DagRuntime) Restart(dagView *schema.DagView)
- func (drt *DagRuntime) Resume(dagView *schema.DagView)
- func (drt *DagRuntime) Start()
- func (drt *DagRuntime) Stop()
- type DependencySolver
- type Job
- type LocalJob
- type PaddleFlowJob
- func (pfj *PaddleFlowJob) Cancelled() bool
- func (pfj *PaddleFlowJob) Check() (schema.JobStatus, error)
- func (pfj *PaddleFlowJob) Failed() bool
- func (pfj *PaddleFlowJob) Job() BaseJob
- func (pfj *PaddleFlowJob) JobID() string
- func (pfj *PaddleFlowJob) NotEnded() bool
- func (pfj *PaddleFlowJob) Skipped() bool
- func (pfj *PaddleFlowJob) Start() (string, error)
- func (pfj *PaddleFlowJob) Started() bool
- func (pfj *PaddleFlowJob) Stop() error
- func (pfj *PaddleFlowJob) Succeeded() bool
- func (pfj *PaddleFlowJob) Terminated() bool
- func (pfj *PaddleFlowJob) Update(cmd string, params map[string]string, envs map[string]string, ...)
- func (pfj *PaddleFlowJob) Validate() error
- func (pfj *PaddleFlowJob) Watch()
- type PathToModTime
- type RuntimeStatus
- type StepRuntime
- type WfEventType
- type WfEventValue
- type Workflow
- func (wf *Workflow) NewWorkflowRuntime() error
- func (wf *Workflow) Restart(entryPointView *schema.DagView, postProcessView schema.PostProcessView)
- func (wf *Workflow) Resume(entryPointView *schema.DagView, postProcessView schema.PostProcessView, ...)
- func (wf *Workflow) Start()
- func (wf *Workflow) Status() string
- func (wf *Workflow) Stop(force bool)
- type WorkflowCallbacks
- type WorkflowEvent
- type WorkflowRuntime
- func (wfr *WorkflowRuntime) IsCompleted() bool
- func (wfr *WorkflowRuntime) Listen()
- func (wfr *WorkflowRuntime) Restart(entryPointView *schema.DagView, postProcessView schema.PostProcessView)
- func (wfr *WorkflowRuntime) Resume(entryPointView *schema.DagView, postProcessView schema.PostProcessView, ...)
- func (wfr *WorkflowRuntime) Start()
- func (wfr *WorkflowRuntime) Status() string
- func (wfr *WorkflowRuntime) Stop(force bool) error
Constants ¶
const ( // 事件类型 WfEventNoraml WfEventType = "Normal" // 正常事件类型,包括状态更新等 WfEventWarn WfEventType = "Warnning" // 警告事件类型,不影响业务运行,但需要关注 WfEventError WfEventType = "Error" // 异常事件类型,影响业务运行,例如其他模块函数返回异常 // 事件值 WfEventFailureOptionsTriggered WfEventValue = "FailureOpitons" WfEventJobUpdate WfEventValue = "JobUpdate" WfEventRunUpdate WfEventValue = "RunUpdate" WfEventDagUpdate WfEventValue = "DagUpdate" WfEventJobSubmitErr WfEventValue = "JobSubmitErr" WfEventJobWatchErr WfEventValue = "JobWatchErr" WfEventJobStopErr WfEventValue = "JobStopErr" )
Variables ¶
This section is empty.
Functions ¶
func NewBaseComponentRuntime ¶ added in v0.14.3
func NewInnerSolver ¶ added in v0.14.3
func NewParallelismManager ¶ added in v0.14.3
func NewParallelismManager(parallelism int) *parallelismManager
func NewReferenceSolver ¶ added in v0.14.3
func NewReferenceSolver(ws *schema.WorkflowSource) *referenceSolver
func NewRunConfig ¶ added in v0.14.3
func NewRunConfig(workflowSource *schema.WorkflowSource, mainFS *schema.FsMount, userName, runID string, logger *logrus.Entry, callbacks WorkflowCallbacks, pplSource string) *runConfig
Types ¶
type BaseJob ¶
type BaseJob struct { ID string `json:"jobID"` Name string `json:"name"` // step名字,不同run的不同step,必须拥有不同名字 Command string `json:"command"` // 区别于step,是替换后的,可以直接运行 Parameters map[string]string `json:"parameters"` // 区别于step,是替换后的,可以直接运行 Artifacts schema.Artifacts `json:"artifacts"` // 区别于step,是替换后的,可以直接运行 Env map[string]string `json:"env"` StartTime string `json:"startTime"` EndTime string `json:"endTime"` Status schema.JobStatus `json:"status"` Message string `json:"message"` }
func NewBaseJob ¶
type BaseWorkflow ¶
type BaseWorkflow struct { Name string `json:"name,omitempty"` RunID string `json:"runId,omitempty"` Desc string `json:"desc,omitempty"` Params map[string]interface{} `json:"params,omitempty"` Extra map[string]string `json:"extra,omitempty"` // 可以存放一些ID,fsId,userId等 Source schema.WorkflowSource `json:"-"` // Yaml string // contains filtered or unexported fields }
func NewBaseWorkflow ¶
func NewBaseWorkflow(wfSource schema.WorkflowSource, runID string, params map[string]interface{}, extra map[string]string) BaseWorkflow
type CacheCalculator ¶
type CacheCalculator interface { // 计算第一层 fingerprint CalculateFirstFingerprint() (fingerprint string, err error) // 计算 第二层 fingerprint CalculateSecondFingerprint() (fingerprint string, err error) // contains filtered or unexported methods }
func NewCacheCalculator ¶
func NewCacheCalculator(job PaddleFlowJob, cacheConfig schema.Cache, logger *logrus.Entry, mainFs *schema.FsMount, extraFs []schema.FsMount) (CacheCalculator, error)
调用方应该保证在启用了 cache 功能的情况下才会调用NewCacheCalculator
func NewConservativeCacheCalculator ¶
func NewConservativeCacheCalculator(job PaddleFlowJob, cacheConfig schema.Cache, logger *logrus.Entry, mainFs *schema.FsMount, extraFs []schema.FsMount) (CacheCalculator, error)
调用方应该保证在启用了 cache 功能的情况下才会调用NewConservativeCacheCalculator
type ConditionCalculator ¶ added in v0.14.3
type ConditionCalculator struct {
// contains filtered or unexported fields
}
func NewConditionCalculator ¶ added in v0.14.3
func NewConditionCalculator(condition string) *ConditionCalculator
type CtxAndCancel ¶ added in v0.14.3
type CtxAndCancel struct {
// contains filtered or unexported fields
}
type DagRuntime ¶ added in v0.14.3
type DagRuntime struct { // 用于解析依赖参数模板 *DependencySolver // dagruntime 的全局唯一标识符 ID string // contains filtered or unexported fields }
func NewDagRuntime ¶ added in v0.14.3
func NewDagRuntime(name, fullName string, dag *schema.WorkflowSourceDag, seq int, ctx context.Context, failureOpitonsCtx context.Context, eventChannel chan<- WorkflowEvent, config *runConfig, parentDagID string) *DagRuntime
func (DagRuntime) CalculateCondition ¶ added in v0.14.3
func (*DagRuntime) CancellNotReadyComponent ¶ added in v0.14.3
func (drt *DagRuntime) CancellNotReadyComponent(subComponent schema.Component, reason string)
func (*DagRuntime) CreateSubRuntimeAccordingView ¶ added in v0.14.3
func (drt *DagRuntime) CreateSubRuntimeAccordingView(view schema.ComponentView, name string) componentRuntime
func (*DagRuntime) GetSubComponentArtifactPaths ¶ added in v0.14.3
func (drt *DagRuntime) GetSubComponentArtifactPaths(componentName string, artName string) (string, error)
func (*DagRuntime) GetSubComponentParameterValue ¶ added in v0.14.3
func (drt *DagRuntime) GetSubComponentParameterValue(componentName string, paramName string) (interface{}, error)
func (*DagRuntime) ProcessFailureOptions ¶ added in v0.14.3
func (drt *DagRuntime) ProcessFailureOptions(event WorkflowEvent)
func (*DagRuntime) ProcessFailureOptionsWithContinue ¶ added in v0.14.3
func (drt *DagRuntime) ProcessFailureOptionsWithContinue(component schema.Component)
func (*DagRuntime) ProcessFailureOptionsWithFailFast ¶ added in v0.14.3
func (drt *DagRuntime) ProcessFailureOptionsWithFailFast()
func (*DagRuntime) Restart ¶ added in v0.14.3
func (drt *DagRuntime) Restart(dagView *schema.DagView)
重新执行 TODO
func (*DagRuntime) Resume ¶ added in v0.14.3
func (drt *DagRuntime) Resume(dagView *schema.DagView)
func (*DagRuntime) Start ¶ added in v0.14.3
func (drt *DagRuntime) Start()
开始执行 runtime。不返回 error,直接通过 event 向上冒泡。
func (*DagRuntime) Stop ¶ added in v0.14.3
func (drt *DagRuntime) Stop()
type DependencySolver ¶ added in v0.14.3
type DependencySolver struct {
*DagRuntime
}
用于解析可能存在依赖关系的模版,如 parameter、artifact 字段,因为其有可能会引用上游节点,以及父节点(子节点)的相关信息。
func NewDependencySolver ¶ added in v0.14.3
func NewDependencySolver(dr *DagRuntime) *DependencySolver
func (DependencySolver) CalculateCondition ¶ added in v0.14.3
func (*DependencySolver) ResolveAfterDone ¶ added in v0.14.3
func (ds *DependencySolver) ResolveAfterDone() error
ResolveAfterDone: 当dag运行成功时调用,解析输出artifact 模版
func (*DependencySolver) ResolveBeforeRun ¶ added in v0.14.3
func (ds *DependencySolver) ResolveBeforeRun(subComponent schema.Component) error
type Job ¶
type Job interface { Job() BaseJob Update(cmd string, params map[string]string, envs map[string]string, artifacts *schema.Artifacts) Validate() error Start() (string, error) Stop() error Check() (schema.JobStatus, error) Watch() Started() bool Succeeded() bool Failed() bool Terminated() bool Skipped() bool NotEnded() bool JobID() string }
type LocalJob ¶
----------------------------------------------------------------------------
Local Process Job
----------------------------------------------------------------------------
type PaddleFlowJob ¶
----------------------------------------------------------------------------
K8S Job
----------------------------------------------------------------------------
func NewPaddleFlowJob ¶
func NewPaddleFlowJob(name, image string, eventChannel chan<- WorkflowEvent, mainFS *schema.FsMount, extraFS []schema.FsMount) *PaddleFlowJob
func NewPaddleFlowJobWithJobView ¶ added in v0.14.3
func NewPaddleFlowJobWithJobView(view *schema.JobView, image string, eventChannel chan<- WorkflowEvent, mainFS *schema.FsMount, extraFS []schema.FsMount) *PaddleFlowJob
func (*PaddleFlowJob) Cancelled ¶
func (pfj *PaddleFlowJob) Cancelled() bool
func (*PaddleFlowJob) Failed ¶
func (pfj *PaddleFlowJob) Failed() bool
func (*PaddleFlowJob) Job ¶
func (pfj *PaddleFlowJob) Job() BaseJob
func (*PaddleFlowJob) JobID ¶ added in v0.14.3
func (pfj *PaddleFlowJob) JobID() string
func (*PaddleFlowJob) NotEnded ¶
func (pfj *PaddleFlowJob) NotEnded() bool
func (*PaddleFlowJob) Skipped ¶
func (pfj *PaddleFlowJob) Skipped() bool
func (*PaddleFlowJob) Started ¶
func (pfj *PaddleFlowJob) Started() bool
func (*PaddleFlowJob) Succeeded ¶
func (pfj *PaddleFlowJob) Succeeded() bool
func (*PaddleFlowJob) Terminated ¶
func (pfj *PaddleFlowJob) Terminated() bool
type PathToModTime ¶ added in v0.14.3
type RuntimeStatus ¶ added in v0.14.3
var ( StatusRuntimeInit RuntimeStatus = schema.StatusJobInit StatusRuntimePending RuntimeStatus = schema.StatusJobPending StatusRuntimeRunning RuntimeStatus = schema.StatusJobRunning StatusRuntimeFailed RuntimeStatus = schema.StatusJobFailed StatusRuntimeSucceeded RuntimeStatus = schema.StatusJobSucceeded StatusRuntimeTerminating RuntimeStatus = schema.StatusJobTerminating StatusRuntimeTerminated RuntimeStatus = schema.StatusJobTerminated StatusRuntimeCancelled RuntimeStatus = schema.StatusJobCancelled StatusRuntimeSkipped RuntimeStatus = schema.StatusJobSkipped )
type StepRuntime ¶ added in v0.14.3
type StepRuntime struct { CacheRunID string CacheJobID string // contains filtered or unexported fields }
func NewStepRuntime ¶ added in v0.14.3
func NewStepRuntime(name, fullName string, step *schema.WorkflowSourceStep, seq int, ctx context.Context, failureOpitonsCtx context.Context, eventChannel chan<- WorkflowEvent, config *runConfig, ParentDagID string) *StepRuntime
func (StepRuntime) CalculateCondition ¶ added in v0.14.3
func (*StepRuntime) Listen ¶ added in v0.14.3
func (srt *StepRuntime) Listen()
func (*StepRuntime) Restart ¶ added in v0.14.3
func (srt *StepRuntime) Restart(view *schema.JobView)
Restart: 根据 jobView 来重启 step。如果 jobView 中的状态为 Succeeded,则直接返回,无需重启;如果 jobView 中的状态为 Running,则进入监听即可;否则创建一个新的 job 并开始调度执行。
func (*StepRuntime) Resume ¶ added in v0.14.3
func (srt *StepRuntime) Resume(view *schema.JobView)
func (*StepRuntime) Start ¶ added in v0.14.3
func (srt *StepRuntime) Start()
func (*StepRuntime) Stop ¶ added in v0.14.3
func (srt *StepRuntime) Stop()
type WfEventType ¶
type WfEventType string
type WfEventValue ¶
type WfEventValue string
type Workflow ¶
type Workflow struct { BaseWorkflow // contains filtered or unexported fields }
func NewWorkflow ¶
func NewWorkflow(wfSource schema.WorkflowSource, runID string, params map[string]interface{}, extra map[string]string, callbacks WorkflowCallbacks) (*Workflow, error)
实例化一个Workflow,并返回
func (*Workflow) NewWorkflowRuntime ¶ added in v0.14.3
初始化 workflow runtime
func (*Workflow) Restart ¶
func (wf *Workflow) Restart(entryPointView *schema.DagView, postProcessView schema.PostProcessView)
Restart 从 DB 中恢复重启 workflow。Restart 调用逻辑:1. NewWorkflow 2. SetWorkflowRuntime 3. Restart
type WorkflowCallbacks ¶
type WorkflowCallbacks struct { GetJobCb func(jobID string) (schema.JobView, error) UpdateRuntimeCb func(id string, event interface{}) (int64, bool) LogCacheCb func(req schema.LogRunCacheRequest) (string, error) ListCacheCb func(firstFp, fsID, source string) ([]models.RunCache, error) LogArtifactCb func(req schema.LogRunArtifactRequest) error }
type WorkflowEvent ¶
type WorkflowEvent struct { Type WfEventType Event WfEventValue Message string Extra map[string]interface{} }
func NewWorkflowEvent ¶
func NewWorkflowEvent(e WfEventValue, msg string, extra map[string]interface{}) *WorkflowEvent
实例化
type WorkflowRuntime ¶
type WorkflowRuntime struct { EventChan chan WorkflowEvent // contains filtered or unexported fields }
工作流运行时
func NewWorkflowRuntime ¶
func NewWorkflowRuntime(rc *runConfig) *WorkflowRuntime
func (*WorkflowRuntime) IsCompleted ¶
func (wfr *WorkflowRuntime) IsCompleted() bool
func (*WorkflowRuntime) Listen ¶
func (wfr *WorkflowRuntime) Listen()
func (*WorkflowRuntime) Restart ¶
func (wfr *WorkflowRuntime) Restart(entryPointView *schema.DagView, postProcessView schema.PostProcessView)
Restart: 重新运行
func (*WorkflowRuntime) Resume ¶ added in v0.14.3
func (wfr *WorkflowRuntime) Resume(entryPointView *schema.DagView, postProcessView schema.PostProcessView, runStatus string, stopForce bool)
func (*WorkflowRuntime) Status ¶
func (wfr *WorkflowRuntime) Status() string
func (*WorkflowRuntime) Stop ¶
func (wfr *WorkflowRuntime) Stop(force bool) error
Stop 停止 Workflow。Do not call ctx_cancel(), which will eventually be called when all steps have terminated. 这里不通过 cancel channel 去取消 Step 的原因是防止有多个地方向同一个 channel 传递东西,防止 runtime hang 住。