Documentation ¶
Index ¶
- Constants
- func MakeTaskExecutorCtxKey(task *PipelineTask) string
- type ActionExecutorConfig
- type ExecutorDoneChanData
- type Field
- type FlinkSparkConf
- type Pipeline
- func (p *Pipeline) CanArchive() (bool, string)
- func (p *Pipeline) CanDelete() (bool, string)
- func (p *Pipeline) CanSkipRunningCheck() bool
- func (p *Pipeline) DecodeV1UniquePipelineYmlName(name string) string
- func (p *Pipeline) EnsureGC()
- func (p *Pipeline) GenIdentityInfo() *commonpb.IdentityInfo
- func (p *Pipeline) GenerateNormalLabelsForCreateV2() map[string]string
- func (p *Pipeline) GenerateV1UniquePipelineYmlName(originPipelineYmlPath string) string
- func (p *Pipeline) GetCancelUserID() string
- func (p *Pipeline) GetConfigManageNamespaces() []string
- func (p *Pipeline) GetLabel(labelKey string) string
- func (p *Pipeline) GetOwnerOrRunUserID() string
- func (p *Pipeline) GetOwnerUserID() string
- func (p *Pipeline) GetPipelineAppliedResources() apistructs.PipelineAppliedResources
- func (p *Pipeline) GetPipelineQueueID() (uint64, bool)
- func (p *Pipeline) GetResourceGCTTL() uint64
- func (p *Pipeline) GetRunUserID() string
- func (p *Pipeline) GetSubmitUserID() string
- func (p *Pipeline) GetUserID() string
- func (p *Pipeline) MergeLabels() map[string]string
- type PipelineArchive
- type PipelineArchiveContent
- type PipelineBase
- type PipelineBaseWithDefinition
- type PipelineCombosReq
- type PipelineConfig
- type PipelineConfigType
- type PipelineExtra
- type PipelineExtraInfo
- type PipelineLabel
- type PipelineOld
- type PipelineReport
- type PipelineStage
- type PipelineStageExtra
- type PipelineStageWithTask
- type PipelineTask
- func (pt *PipelineTask) CheckExecutorDoneChanDataVersion(actualVersion string) error
- func (pt *PipelineTask) Convert2DTO() *apistructs.PipelineTaskDTO
- func (pt *PipelineTask) Convert2PB() *basepb.PipelineTaskDTO
- func (pt *PipelineTask) ConvertTaskContainer2PB() []*basepb.TaskContainer
- func (pt *PipelineTask) GenerateExecutorDoneChanDataVersion() string
- func (pt *PipelineTask) GetBigDataConf() (apistructs.BigdataSpec, error)
- func (pt *PipelineTask) GetExecutorName() PipelineTaskExecutorName
- func (pt *PipelineTask) MergeErrors() taskerror.OrderedErrors
- func (pt *PipelineTask) MergeErrors2PB() []*basepb.ErrorResponse
- func (pt *PipelineTask) MergeMetadata() metadata.Metadata
- func (pt *PipelineTask) MergePBMetadata() []*commonpb.MetadataField
- func (pt *PipelineTask) MergeTaskParamDetailToDisplay(action apistructs.ActionSpec, ymlTask PipelineTask, snapshot Snapshot) (params []*basepb.TaskParamDetail)
- func (pt *PipelineTask) NodeName() string
- func (pt *PipelineTask) PrevNodeNames() []string
- func (pt *PipelineTask) ReleaseID() string
- func (pt *PipelineTask) RuntimeID() string
- func (*PipelineTask) TableName() string
- type PipelineTaskContext
- type PipelineTaskExecutorKind
- func (that PipelineTaskExecutorKind) Check() bool
- func (that PipelineTaskExecutorKind) GenExecutorNameByClusterName(clusterName string) PipelineTaskExecutorName
- func (that PipelineTaskExecutorKind) GetDefaultExecutorName() PipelineTaskExecutorName
- func (that PipelineTaskExecutorKind) IsK8sKind() bool
- func (that PipelineTaskExecutorKind) String() string
- type PipelineTaskExecutorName
- type PipelineTaskExtra
- type PipelineTaskOpType
- type PipelineWithStage
- type PipelineWithStageAndTask
- type PipelineWithTasks
- type PreStageSimple
- type QueueInfo
- type RerunFailedDetail
- type RuntimeResource
- type Snapshot
- type StoreType
- type TaskExecutorConfig
- type Volume
Constants ¶
const ( CtxExecutorChKeyPrefix = "executor-done-chan" CtxExecutorChDataVersionPrefix = "executor-done-chan-data-version" EncryptedValueDisplay = "********" )
const (
StoreTypeNFSProto = "file://"
)
Variables ¶
This section is empty.
Functions ¶
func MakeTaskExecutorCtxKey ¶
func MakeTaskExecutorCtxKey(task *PipelineTask) string
Types ¶
type ActionExecutorConfig ¶
type ActionExecutorConfig struct { Kind string `json:"kind,omitempty"` Name string `json:"name,omitempty"` Options map[string]string `json:"options,omitempty"` }
func (*ActionExecutorConfig) IsK8sKind ¶
func (a *ActionExecutorConfig) IsK8sKind() bool
type ExecutorDoneChanData ¶
type ExecutorDoneChanData struct { Data interface{} Version string }
type Field ¶
type Field string
const ( FieldID Field = "id" FieldPipelineSource Field = "pipeline_source" FieldPipelineYmlName Field = "pipeline_yml_name" FieldClusterName Field = "cluster_name" FieldStatus Field = "status" FieldType Field = "type" FieldTriggerMode Field = "trigger_mode" FieldCronID Field = "cron_id" FieldIsSnippet Field = "is_snippet" FieldParentPipelineID Field = "parent_pipeline_id" FieldParentTaskID Field = "parent_task_id" FieldCostTimeSec Field = "cost_time_sec" FieldTimeBegin Field = "time_begin" FieldTimeEnd Field = "time_end" FieldTimeCreated Field = "time_created" FieldTimeUpdated Field = "time_updated" FieldPipelineDefinitionID Field = "pipeline_definition_id" FieldIsEdge Field = "is_edge" FieldEdgeReportStatus Field = "edge_report_status" )
type FlinkSparkConf ¶
type FlinkSparkConf struct { // 该部分在 action 的 source 里声明 Depend string `json:"depends,omitempty"` MainClass string `json:"main_class,omitempty"` MainArgs []string `json:"main_args,omitempty"` // flink/spark action 运行需要一个 jar resource(flink 为 jarID,spark 为 jarURL) // 该部分在运行期动态赋值 JarResource string `json:"jarResource,omitempty"` }
type Pipeline ¶
type Pipeline struct { PipelineBase PipelineExtra Definition *definitiondb.PipelineDefinition Source *sourcedb.PipelineSource Labels map[string]string }
Pipeline
func (*Pipeline) CanArchive ¶
func (*Pipeline) CanSkipRunningCheck ¶
CanSkipRunningCheck if pipeline bind queue and EnqueueCondition is skip running, pipeline can skip limit running
func (*Pipeline) DecodeV1UniquePipelineYmlName ¶
DecodeV1UniquePipelineYmlName 根据 GenerateV1UniquePipelineYmlName 生成规则,反解析得到 originName
func (*Pipeline) GenIdentityInfo ¶
func (p *Pipeline) GenIdentityInfo() *commonpb.IdentityInfo
func (*Pipeline) GenerateNormalLabelsForCreateV2 ¶
GenerateNormalLabelsForCreateV2 pipeline.createV2 有一些字段通过标签来传递,例如 commit
func (*Pipeline) GenerateV1UniquePipelineYmlName ¶
GenerateV1UniquePipelineYmlName 为 v1 pipeline 返回 pipelineYmlName,该 name 在 source 下唯一 生成规则: AppID/DiceWorkspace/Branch/PipelineYmlPath 1) 100/PROD/master/ec/dws/itm/workflow/item_1d_df_process.workflow 2) 200/DEV/feature/dice/pipeline.yml
func (*Pipeline) GetCancelUserID ¶
func (*Pipeline) GetConfigManageNamespaces ¶
func (*Pipeline) GetOwnerOrRunUserID ¶
GetOwnerOrRunUserID get userID to execute pipeline permissions for the task belong to the executor userID if it's triggered by manual, run user is used first to get owner userID,if not exists, to get run userID
func (*Pipeline) GetOwnerUserID ¶
func (*Pipeline) GetPipelineAppliedResources ¶
func (p *Pipeline) GetPipelineAppliedResources() apistructs.PipelineAppliedResources
GetPipelineAppliedResources return limited and min resource when pipeline run.
func (*Pipeline) GetPipelineQueueID ¶
GetPipelineQueueID return pipeline queue id if exist, or 0.
func (*Pipeline) GetResourceGCTTL ¶
func (*Pipeline) GetRunUserID ¶
func (*Pipeline) GetSubmitUserID ¶
func (*Pipeline) GetUserID ¶
GetUserID first to get owner userID,if not exists, to get run userID if neither exists, submit userID is used Not included internal UserID
func (*Pipeline) MergeLabels ¶
type PipelineArchive ¶
type PipelineArchive struct { ID uint64 `json:"id" xorm:"pk autoincr"` TimeCreated time.Time `json:"timeCreated" xorm:"created"` TimeUpdated time.Time `json:"timeUpdated" xorm:"updated"` PipelineID uint64 `json:"pipelineID"` PipelineSource apistructs.PipelineSource `json:"pipelineSource"` PipelineYmlName string `json:"pipelineYmlName"` Status apistructs.PipelineStatus `json:"status"` // DiceVersion record the dice version when archived, // it will impact `content` field unmarshal method DiceVersion string `json:"diceVersion"` Content PipelineArchiveContent `json:"content" xorm:"json"` }
PipelineArchive pipeline 归档表
func (*PipelineArchive) TableName ¶
func (*PipelineArchive) TableName() string
type PipelineArchiveContent ¶
type PipelineArchiveContent struct { Pipeline Pipeline `json:"pipeline"` PipelineLabels []PipelineLabel `json:"pipelineLabels"` PipelineStages []PipelineStage `json:"pipelineStages"` PipelineTasks []PipelineTask `json:"pipelineTasks"` PipelineReports []PipelineReport `json:"pipelineReports"` }
PipelineArchiveContent contains: - pipelines - pipeline_labels - pipeline_stages - pipeline_tasks
type PipelineBase ¶
type PipelineBase struct { ID uint64 `json:"id" xorm:"pk autoincr"` PipelineSource apistructs.PipelineSource `json:"pipelineSource"` PipelineYmlName string `json:"pipelineYmlName"` ClusterName string `json:"clusterName,omitempty"` Status apistructs.PipelineStatus `json:"status,omitempty"` Type apistructs.PipelineType `json:"type,omitempty"` TriggerMode apistructs.PipelineTriggerMode `json:"triggerMode,omitempty"` // 定时相关信息 // +optional CronID *uint64 `json:"cronID,omitempty"` // Snippet IsSnippet bool `json:"isSnippet"` ParentPipelineID *uint64 `json:"parentPipelineID,omitempty"` ParentTaskID *uint64 `json:"parentTaskID,omitempty"` // CostTimeSec 总耗时(秒) CostTimeSec int64 `json:"costTimeSec,omitempty"` // pipeline 总耗时/秒 // TimeBegin 执行开始时间 TimeBegin *time.Time `json:"timeBegin,omitempty"` // 执行开始时间 // TimeEnd 执行结束时间 TimeEnd *time.Time `json:"timeEnd,omitempty"` // 执行结束时间 TimeCreated *time.Time `json:"timeCreated,omitempty" xorm:"created"` TimeUpdated *time.Time `json:"timeUpdated,omitempty" xorm:"updated"` PipelineDefinitionID string `json:"pipelineDefinitionID"` IsEdge bool `json:"isEdge"` EdgeReportStatus apistructs.EdgeReportStatus `json:"edgeReportStatus"` }
PipelineBase represents `pipeline_bases` table.
func (*PipelineBase) TableName ¶
func (*PipelineBase) TableName() string
type PipelineBaseWithDefinition ¶
type PipelineBaseWithDefinition struct { PipelineBase `xorm:"extends"` definitiondb.PipelineDefinition `xorm:"extends"` sourcedb.PipelineSource `xorm:"extends"` }
func (*PipelineBaseWithDefinition) TableName ¶
func (*PipelineBaseWithDefinition) TableName() string
type PipelineCombosReq ¶
type PipelineConfig ¶
type PipelineConfig struct { ID uint64 `json:"id" xorm:"pk autoincr"` Type PipelineConfigType `json:"type"` Value interface{} `json:"value" xorm:"json"` }
func (PipelineConfig) TableName ¶
func (PipelineConfig) TableName() string
type PipelineConfigType ¶
type PipelineConfigType string
var (
PipelineConfigTypeActionExecutor PipelineConfigType = "action_executor"
)
type PipelineExtra ¶
type PipelineExtra struct { PipelineID uint64 `json:"pipelineID,omitempty" xorm:"pk 'pipeline_id'"` // PipelineYml 流水线定义文件 PipelineYml string `json:"pipelineYml"` // Extra 额外信息 Extra PipelineExtraInfo `json:"extra" xorm:"json"` // NormalLabels 普通标签,仅展示,不可过滤 NormalLabels map[string]string `json:"normalLabels" xorm:"json"` // Snapshot 运行时的快照 Snapshot Snapshot `json:"snapshot" xorm:"json"` // CommitDetail 提交详情 CommitDetail apistructs.CommitDetail `json:"commitDetail" xorm:"json"` // Progress 流水线整体执行进度,0-100 // -1 表示未设置 // progress 只存最终结果,若 >= 0,直接返回,无需再计算 Progress int `json:"progress"` ExtraTimeCreated *time.Time `json:"timeCreated,omitempty" xorm:"created 'time_created'"` ExtraTimeUpdated *time.Time `json:"timeUpdated,omitempty" xorm:"updated 'time_updated'"` }
PipelineExtra represents `pipeline_extras` table. `pipeline_extras` 与 `pipeline_bases` 一一对应
func (*PipelineExtra) GetCommitID ¶
func (extra *PipelineExtra) GetCommitID() string
func (*PipelineExtra) GetOrgName ¶
func (extra *PipelineExtra) GetOrgName() string
func (*PipelineExtra) TableName ¶
func (*PipelineExtra) TableName() string
type PipelineExtraInfo ¶
type PipelineExtraInfo struct { Namespace string `json:"namespace"` NotPipelineControlledNs bool `json:"notPipelineControlledNs"` DiceWorkspace apistructs.DiceWorkspace `json:"diceWorkspace,omitempty"` PipelineYmlSource apistructs.PipelineYmlSource `json:"pipelineYmlSource,omitempty"` SubmitUser *basepb.PipelineUser `json:"submitUser,omitempty"` RunUser *basepb.PipelineUser `json:"runUser,omitempty"` CancelUser *basepb.PipelineUser `json:"cancelUser,omitempty"` OwnerUser *basepb.PipelineUser `json:"ownerUser,omitempty"` InternalClient string `json:"internalClient,omitempty"` CronExpr string `json:"cronExpr,omitempty"` CronTriggerTime *time.Time `json:"cronTriggerTime,omitempty"` // 秒级精确,毫秒级误差请忽略,cron expr 精确度同样为秒级 ShowMessage *basepb.ShowMessage `json:"showMessage,omitempty"` Messages []string `json:"errors,omitempty"` // TODO ShowMessage 和 Message // Deprecated ConfigManageNamespaceOfSecretsDefault string `json:"configManageNamespaceOfSecretsDefault,omitempty"` // Deprecated ConfigManageNamespaceOfSecrets string `json:"configManageNamespaceOfSecrets,omitempty"` ConfigManageNamespaces []string `json:"configManageNamespaces,omitempty"` IncomingSecrets map[string]string `json:"incomingSecrets,omitempty"` CopyFromPipelineID *uint64 `json:"copyFromPipelineID,omitempty"` // 是否是从其他节点拷贝过来 RerunFailedDetail *RerunFailedDetail `json:"rerunFailedDetail,omitempty"` IsAutoRun bool `json:"isAutoRun,omitempty"` // 创建后是否自动开始执行 TaskWorkspaces []string `json:"taskWorkspaces,omitempty"` //工作目录,例如git StorageConfig apistructs.StorageConfig `json:"storageConfig,omitempty"` // 挂载设置 CallbackURLs []string `json:"callbackURLs,omitempty"` Version string `json:"version,omitempty"` // 1.1, 1.0 // 是否已经 完成 Reconciler GC CompleteReconcilerGC bool `json:"completeReconcilerGC"` // 是否已完成 Reconcile teardown CompleteReconcilerTeardown bool `json:"completeReconcilerTeardown"` // 用于保存自动转换前的 v1 pipelineYmlName(通过 V1 API 创建的流水线,通过该参数调用 gittar 获取内容) PipelineYmlNameV1 string `json:"pipelineYmlNameV1,omitempty"` // pipeline 运行时的输入参数 RunPipelineParams []apistructs.PipelineRunParam `json:"runPipelineParams,omitempty"` // GC GC basepb.PipelineGC `json:"gc,omitempty"` // OutputDefines DefinedOutputs []apistructs.PipelineOutput `json:"definedOutputs,omitempty"` SnippetChain []uint64 `json:"snippetChain,omitempty"` QueueInfo *QueueInfo `json:"queueInfo,omitempty"` TaskOperates []*pipelinepb.PipelineTaskOperateRequest `json:"taskTaskOperates,omitempty"` ContainerInstanceProvider *apistructs.ContainerInstanceProvider `json:"containerInstanceProvider,omitempty"` Breakpoint *basepb.Breakpoint `json:"breakpoint,omitempty"` }
type PipelineLabel ¶
type PipelineLabel struct { ID uint64 `json:"id" xorm:"pk autoincr"` Type apistructs.PipelineLabelType `json:"type,omitempty"` TargetID uint64 `json:"targetID"` PipelineSource apistructs.PipelineSource `json:"pipelineSource"` PipelineYmlName string `json:"pipelineYmlName"` Key string `json:"key"` Value string `json:"value"` TimeCreated time.Time `json:"timeCreated" xorm:"created"` TimeUpdated time.Time `json:"timeUpdated" xorm:"updated"` }
PipelineLabel 标签
func (PipelineLabel) TableName ¶
func (p PipelineLabel) TableName() string
type PipelineOld ¶
type PipelineOld struct { ID uint64 `json:"id,omitempty" xorm:"pk autoincr"` // 通过 source + pipelineYmlName 唯一定位 Source apistructs.PipelineSource `json:"source,omitempty"` // 通过 v1 创建的 pipeline,自动生成唯一的 pipelineYmlName // 通过 v2 创建的 pipeline,由调用方保证 PipelineYmlName string `json:"pipelineYmlName,omitempty"` PipelineYml string `json:"pipelineYml,omitempty"` // 调度集群 // +required ClusterName string `json:"clusterName,omitempty"` // 运行时相关信息 Type apistructs.PipelineType `json:"type,omitempty"` TriggerMode apistructs.PipelineTriggerMode `json:"triggerMode,omitempty"` Snapshot Snapshot `json:"snapshot,omitempty" xorm:"json"` // 快照 Progress float64 `json:"progress,omitempty" xorm:"-"` // pipeline 执行进度, eg: 0.8 表示 80% Status apistructs.PipelineStatus `json:"status,omitempty"` Extra PipelineExtraInfo `json:"extra,omitempty" xorm:"json"` // 时间 CostTimeSec int64 `json:"costTimeSec,omitempty"` // pipeline 总耗时/秒 TimeBegin *time.Time `json:"timeBegin,omitempty"` // 执行开始时间 TimeEnd *time.Time `json:"timeEnd,omitempty"` // 执行结束时间 TimeCreated *time.Time `json:"timeCreated,omitempty" xorm:"created"` // 记录创建时间 TimeUpdated *time.Time `json:"timeUpdated,omitempty" xorm:"updated"` // 记录更新时间 // 定时相关信息 // +optional CronID *uint64 `json:"cronID,omitempty"` // deprecated BasePipelineID uint64 `json:"basePipelineID,omitempty"` // 该字段用来分页展示时 group 分组,相同 BasePipelineID 的数据会被折叠成一条,通过执行记录来跳转 // 应用相关信息 // +optional OrgID uint64 `json:"orgID,omitempty"` OrgName string `json:"orgName,omitempty"` // tag schedule ProjectID uint64 `json:"projectID,omitempty"` ProjectName string `json:"projectName,omitempty"` // tag schedule ApplicationID uint64 `json:"applicationID,omitempty"` ApplicationName string `json:"applicationName,omitempty"` // 分支相关信息 // +optional PipelineYmlSource apistructs.PipelineYmlSource `json:"pipelineYmlSource,omitempty"` // yml 文件来源 Branch string `json:"branch,omitempty"` Commit string `json:"commit,omitempty"` CommitDetail apistructs.CommitDetail `json:"commitDetail,omitempty" xorm:"json"` }
PipelineOld
func (*PipelineOld) TableName ¶
func (*PipelineOld) TableName() string
type PipelineReport ¶
type PipelineReport struct { ID uint64 `xorm:"pk autoincr"` PipelineID uint64 Type apistructs.PipelineReportType Meta apistructs.PipelineReportMeta `xorm:"json"` CreatorID string UpdaterID string CreatedAt time.Time `xorm:"created"` UpdatedAt time.Time `xorm:"updated"` }
PipelineBase represents `dice_pipeline_reports` table.
func (*PipelineReport) ConvertToPB ¶
func (p *PipelineReport) ConvertToPB() (*pb.PipelineReport, error)
func (*PipelineReport) TableName ¶
func (*PipelineReport) TableName() string
type PipelineStage ¶
type PipelineStage struct { ID uint64 `json:"id" xorm:"pk autoincr"` PipelineID uint64 `json:"pipelineID"` Name string `json:"name"` Extra PipelineStageExtra `json:"extra" xorm:"json"` Status apistructs.PipelineStatus `json:"status"` CostTimeSec int64 `json:"costTimeSec"` TimeBegin time.Time `json:"timeBegin"` // 执行开始时间 TimeEnd time.Time `json:"timeEnd"` // 执行结束时间 TimeCreated time.Time `json:"timeCreated" xorm:"created"` // 记录创建时间 TimeUpdated time.Time `json:"timeUpdated" xorm:"updated"` // 记录更新时间 }
func (*PipelineStage) Convert2DTO ¶
func (ps *PipelineStage) Convert2DTO() *apistructs.PipelineStageDTO
func (*PipelineStage) TableName ¶
func (ps *PipelineStage) TableName() string
type PipelineStageExtra ¶
type PipelineStageExtra struct { PreStage *PreStageSimple `json:"preStage,omitempty"` StageOrder int `json:"stageOrder"` // 0,1,2,... }
type PipelineStageWithTask ¶
type PipelineStageWithTask struct { PipelineStage PipelineTasks []*PipelineTask }
type PipelineTask ¶
type PipelineTask struct { ID uint64 `json:"id" xorm:"pk autoincr"` PipelineID uint64 `json:"pipelineID"` StageID uint64 `json:"stageID"` Name string `json:"name"` OpType PipelineTaskOpType `json:"opType"` // Deprecated: get, put, task Type string `json:"type,omitempty"` // git, buildpack, release, dice ... 当 OpType 为自定义任务时为空 ExecutorKind PipelineTaskExecutorKind `json:"executorKind"` // scheduler, memory Status apistructs.PipelineStatus `json:"status"` Extra PipelineTaskExtra `json:"extra" xorm:"json"` Context PipelineTaskContext `json:"context" xorm:"json"` Result *taskresult.Result `json:"result" xorm:"json"` Inspect taskinspect.Inspect `json:"inspect" xorm:"json"` IsSnippet bool `json:"isSnippet"` // 该节点是否是嵌套流水线节点 SnippetPipelineID *uint64 `json:"snippetPipelineID"` // 嵌套的流水线 id SnippetPipelineDetail *apistructs.PipelineTaskSnippetDetail `json:"snippetPipelineDetail" xorm:"json"` // 嵌套的流水线详情 CostTimeSec int64 `json:"costTimeSec"` // -1 表示暂无耗时信息, 0 表示确实是0s结束 QueueTimeSec int64 `json:"queueTimeSec"` // 等待调度的耗时, -1 暂无耗时信息, 0 表示确实是0s结束 TODO 赋值 TimeBegin time.Time `json:"timeBegin"` // 执行开始时间 TimeEnd time.Time `json:"timeEnd"` // 执行结束时间 TimeCreated time.Time `json:"timeCreated" xorm:"created"` // 记录创建时间 TimeUpdated time.Time `json:"timeUpdated" xorm:"updated"` // 记录更新时间 IsEdge bool `json:"isEdge"` }
func (*PipelineTask) CheckExecutorDoneChanDataVersion ¶
func (pt *PipelineTask) CheckExecutorDoneChanDataVersion(actualVersion string) error
func (*PipelineTask) Convert2DTO ¶
func (pt *PipelineTask) Convert2DTO() *apistructs.PipelineTaskDTO
func (*PipelineTask) Convert2PB ¶
func (pt *PipelineTask) Convert2PB() *basepb.PipelineTaskDTO
func (*PipelineTask) ConvertTaskContainer2PB ¶
func (pt *PipelineTask) ConvertTaskContainer2PB() []*basepb.TaskContainer
func (*PipelineTask) GenerateExecutorDoneChanDataVersion ¶
func (pt *PipelineTask) GenerateExecutorDoneChanDataVersion() string
func (*PipelineTask) GetBigDataConf ¶
func (pt *PipelineTask) GetBigDataConf() (apistructs.BigdataSpec, error)
func (*PipelineTask) GetExecutorName ¶
func (pt *PipelineTask) GetExecutorName() PipelineTaskExecutorName
func (*PipelineTask) MergeErrors ¶
func (pt *PipelineTask) MergeErrors() taskerror.OrderedErrors
func (*PipelineTask) MergeErrors2PB ¶
func (pt *PipelineTask) MergeErrors2PB() []*basepb.ErrorResponse
func (*PipelineTask) MergeMetadata ¶
func (pt *PipelineTask) MergeMetadata() metadata.Metadata
func (*PipelineTask) MergePBMetadata ¶
func (pt *PipelineTask) MergePBMetadata() []*commonpb.MetadataField
func (*PipelineTask) MergeTaskParamDetailToDisplay ¶
func (pt *PipelineTask) MergeTaskParamDetailToDisplay(action apistructs.ActionSpec, ymlTask PipelineTask, snapshot Snapshot) (params []*basepb.TaskParamDetail)
func (*PipelineTask) NodeName ¶
func (pt *PipelineTask) NodeName() string
func (*PipelineTask) PrevNodeNames ¶
func (pt *PipelineTask) PrevNodeNames() []string
func (*PipelineTask) ReleaseID ¶
func (pt *PipelineTask) ReleaseID() string
func (*PipelineTask) RuntimeID ¶
func (pt *PipelineTask) RuntimeID() string
func (*PipelineTask) TableName ¶
func (*PipelineTask) TableName() string
type PipelineTaskContext ¶
type PipelineTaskContext struct { InStorages metadata.Metadata `json:"inStorages,omitempty"` OutStorages metadata.Metadata `json:"outStorages,omitempty"` CmsDiceFiles metadata.Metadata `json:"cmsDiceFiles,omitempty"` }
func (*PipelineTaskContext) Dedup ¶
func (c *PipelineTaskContext) Dedup()
type PipelineTaskExecutorKind ¶
type PipelineTaskExecutorKind string
var ( PipelineTaskExecutorKindScheduler PipelineTaskExecutorKind = "SCHEDULER" PipelineTaskExecutorKindMemory PipelineTaskExecutorKind = "MEMORY" PipelineTaskExecutorKindAPITest PipelineTaskExecutorKind = "APITEST" PipelineTaskExecutorKindWait PipelineTaskExecutorKind = "WAIT" PipelineTaskExecutorKindK8sJob PipelineTaskExecutorKind = "K8SJOB" PipelineTaskExecutorKindK8sFlink PipelineTaskExecutorKind = "K8SFLINK" PipelineTaskExecutorKindK8sSpark PipelineTaskExecutorKind = "K8SSPARK" PipelineTaskExecutorKindDocker PipelineTaskExecutorKind = "DOCKER" PipelineTaskExecutorKindList = []PipelineTaskExecutorKind{PipelineTaskExecutorKindScheduler, PipelineTaskExecutorKindMemory, PipelineTaskExecutorKindAPITest, PipelineTaskExecutorKindWait, PipelineTaskExecutorKindK8sJob} )
func (PipelineTaskExecutorKind) Check ¶
func (that PipelineTaskExecutorKind) Check() bool
func (PipelineTaskExecutorKind) GenExecutorNameByClusterName ¶
func (that PipelineTaskExecutorKind) GenExecutorNameByClusterName(clusterName string) PipelineTaskExecutorName
func (PipelineTaskExecutorKind) GetDefaultExecutorName ¶
func (that PipelineTaskExecutorKind) GetDefaultExecutorName() PipelineTaskExecutorName
func (PipelineTaskExecutorKind) IsK8sKind ¶
func (that PipelineTaskExecutorKind) IsK8sKind() bool
func (PipelineTaskExecutorKind) String ¶
func (that PipelineTaskExecutorKind) String() string
type PipelineTaskExecutorName ¶
type PipelineTaskExecutorName string
var ( PipelineTaskExecutorNameEmpty PipelineTaskExecutorName = "" PipelineTaskExecutorNameSchedulerDefault PipelineTaskExecutorName = "scheduler" PipelineTaskExecutorNameAPITestDefault PipelineTaskExecutorName = "api-test" PipelineTaskExecutorNameWaitDefault PipelineTaskExecutorName = "wait" PipelineTaskExecutorNameK8sJobDefault PipelineTaskExecutorName = "k8s-job" PipelineTaskExecutorNameK8sFlinkDefault PipelineTaskExecutorName = "k8s-flink" PipelineTaskExecutorNameK8sSparkDefault PipelineTaskExecutorName = "k8s-spark" PipelineTaskExecutorNameDockerDefault PipelineTaskExecutorName = "docker" PipelineTaskExecutorNameList = []PipelineTaskExecutorName{PipelineTaskExecutorNameEmpty, PipelineTaskExecutorNameSchedulerDefault, PipelineTaskExecutorNameAPITestDefault, PipelineTaskExecutorNameWaitDefault, PipelineTaskExecutorNameK8sJobDefault} )
func (PipelineTaskExecutorName) Check ¶
func (that PipelineTaskExecutorName) Check() bool
func (PipelineTaskExecutorName) String ¶
func (that PipelineTaskExecutorName) String() string
type PipelineTaskExtra ¶
type PipelineTaskExtra struct { Namespace string `json:"namespace,omitempty"` NotPipelineControlledNs bool `json:"notPipelineControlledNs,omitempty"` ExecutorName PipelineTaskExecutorName `json:"executorName,omitempty"` ClusterName string `json:"clusterName,omitempty"` AllowFailure bool `json:"allowFailure,omitempty"` Pause bool `json:"pause,omitempty"` Timeout time.Duration `json:"timeout,omitempty"` PrivateEnvs map[string]string `json:"envs,omitempty"` // PrivateEnvs 由 agent 注入 run 运行时,run 可见,容器内不可见 PublicEnvs map[string]string `json:"publicEnvs,omitempty"` // PublicEnvs 注入容器,run 可见,容器内亦可见 Labels map[string]string `json:"labels,omitempty"` Image string `json:"image,omitempty"` Cmd string `json:"cmd,omitempty"` CmdArgs []string `json:"cmdArgs,omitempty"` Binds []apistructs.Bind `json:"binds,omitempty"` TaskContainers []apistructs.TaskContainer `json:"taskContainers"` // Volumes 创建 task 时的 volumes 快照 // 若一开始 volume 无 volumeID,启动 task 后返回的 volumeID 不会在这里更新,只会更新到 task.Context.OutStorages 里 Volumes []metadata.MetadataField `json:"volumes,omitempty"` // PreFetcher *apistructs.PreFetcher `json:"preFetcher,omitempty"` RuntimeResource RuntimeResource `json:"runtimeResource,omitempty"` UUID string `json:"uuid"` // 用于查询日志等,pipeline 开始执行时才会赋值 // 对接多个 executor,不一定每个 executor 都能自定义 UUID,所以这个 uuid 实际上是目标系统的 uuid TimeBeginQueue time.Time `json:"timeBeginQueue"` TimeEndQueue time.Time `json:"timeEndQueue"` StageOrder int `json:"stageOrder"` // 0,1,2,... // RunAfter indicates the tasks this task depends. RunAfter []string `json:"runAfter"` FlinkSparkConf FlinkSparkConf `json:"flinkSparkConf,omitempty"` Action pipelineyml.Action `json:"action,omitempty"` OpenapiOAuth2TokenPayload apistructs.OAuth2TokenPayload `json:"openapiOAuth2TokenPayload"` LoopOptions *apistructs.PipelineTaskLoopOptions `json:"loopOptions,omitempty"` // 开始执行后保证不为空 AppliedResources apistructs.PipelineAppliedResources `json:"appliedResources,omitempty"` EncryptSecretKeys []string `json:"encryptSecretKeys"` // the encrypt envs' key list CurrentPolicy apistructs.Policy `json:"currentPolicy"` // task execution strategy ContainerInstanceProvider *apistructs.ContainerInstanceProvider `json:"containerInstanceProvider,omitempty"` Breakpoint *basepb.Breakpoint `json:"breakpoint,omitempty"` }
type PipelineTaskOpType ¶
type PipelineTaskOpType string
GenerateOperation
var ( PipelineTaskOpTypeGet PipelineTaskOpType = "get" PipelineTaskOpTypePut PipelineTaskOpType = "put" PipelineTaskOpTypeTask PipelineTaskOpType = "task" )
type PipelineWithStage ¶
type PipelineWithStage struct { Pipeline PipelineStages []*PipelineStageWithTask }
type PipelineWithStageAndTask ¶
type PipelineWithStageAndTask struct { Pipeline PipelineStages []PipelineStage PipelineTasks []PipelineTask }
type PipelineWithTasks ¶
type PipelineWithTasks struct { Pipeline *Pipeline Tasks []*PipelineTask }
func (*PipelineWithTasks) DoneTasks ¶
func (p *PipelineWithTasks) DoneTasks() []string
type PreStageSimple ¶
type PreStageSimple struct { ID uint64 `json:"id"` Status apistructs.PipelineStatus `json:"preStageStatus,omitempty"` }
type QueueInfo ¶
type QueueInfo struct { QueueID uint64 `json:"queueID"` CustomPriority int64 `json:"customPriority"` EnqueueCondition apistructs.EnqueueConditionType `json:"enqueueCondition"` // Pipeline priority changed history from initial to latest in queue PriorityChangeHistory []int64 `json:"priorityChangeHistory,omitempty"` }
type RerunFailedDetail ¶
type RerunFailedDetail struct { RerunPipelineID uint64 `json:"rerunPipelineID,omitempty"` StageIndex int `json:"stageIndex,omitempty"` SuccessTasks map[string]uint64 `json:"successTasks,omitempty"` FailedTasks map[string]uint64 `json:"failedTasks,omitempty"` NotExecuteTasks map[string]uint64 `json:"notExecuteTasks,omitempty"` }
type RuntimeResource ¶
type RuntimeResource struct { CPU float64 `json:"cpu"` Memory float64 `json:"memory"` Disk float64 `json:"disk"` MaxCPU float64 `json:"maxCPU"` MaxMemory float64 `json:"maxMemory"` Network apistructs.PodNetwork `json:"network"` }
func GenDefaultTaskResource ¶
func GenDefaultTaskResource() RuntimeResource
type Snapshot ¶
type Snapshot struct { PipelineYml string `json:"pipeline_yml,omitempty"` // 对占位符进行渲染 Secrets map[string]string `json:"secrets,omitempty"` PlatformSecrets map[string]string `json:"platformSecrets,omitempty"` CmsDiceFiles map[string]string `json:"cmsDiceFiles,omitempty"` Envs map[string]string `json:"envs,omitempty"` AnalyzedCrossCluster *bool `json:"analyzedCrossCluster,omitempty"` RunPipelineParams apistructs.PipelineRunParamsWithValue `json:"runPipelineParams,omitempty"` // 流水线运行时参数 // IdentityInfo 身份信息 IdentityInfo commonpb.IdentityInfo `json:"identityInfo" xorm:"json"` // OutputValues output 定义和从 task 里采集上来的值 OutputValues []apistructs.PipelineOutputWithValue `json:"outputValues,omitempty"` // AppliedResources calculated by all actions AppliedResources apistructs.PipelineAppliedResources `json:"appliedResources,omitempty"` // BindQueue stores the binding queue info if have. BindQueue *pb.Queue `json:"bindQueue,omitempty"` // Events stores pipeline level k8s-like events Events []*apistructs.PipelineEvent `json:"events,omitempty"` // EncryptSecretKeys the encrypt envs' key list EncryptSecretKeys []string `json:"encryptSecretKeys"` }