spec

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CtxExecutorChKeyPrefix         = "executor-done-chan"
	CtxExecutorChDataVersionPrefix = "executor-done-chan-data-version"
	EncryptedValueDisplay          = "********"
)
View Source
const (
	StoreTypeNFSProto = "file://"
)

Variables

This section is empty.

Functions

func MakeTaskExecutorCtxKey

func MakeTaskExecutorCtxKey(task *PipelineTask) string

Types

type ActionExecutorConfig

type ActionExecutorConfig struct {
	Kind    string            `json:"kind,omitempty"`
	Name    string            `json:"name,omitempty"`
	Options map[string]string `json:"options,omitempty"`
}

func (*ActionExecutorConfig) IsK8sKind

func (a *ActionExecutorConfig) IsK8sKind() bool

type ExecutorDoneChanData

type ExecutorDoneChanData struct {
	Data    interface{}
	Version string
}

type Field

type Field string
const (
	FieldID                   Field = "id"
	FieldPipelineSource       Field = "pipeline_source"
	FieldPipelineYmlName      Field = "pipeline_yml_name"
	FieldClusterName          Field = "cluster_name"
	FieldStatus               Field = "status"
	FieldType                 Field = "type"
	FieldTriggerMode          Field = "trigger_mode"
	FieldCronID               Field = "cron_id"
	FieldIsSnippet            Field = "is_snippet"
	FieldParentPipelineID     Field = "parent_pipeline_id"
	FieldParentTaskID         Field = "parent_task_id"
	FieldCostTimeSec          Field = "cost_time_sec"
	FieldTimeBegin            Field = "time_begin"
	FieldTimeEnd              Field = "time_end"
	FieldTimeCreated          Field = "time_created"
	FieldTimeUpdated          Field = "time_updated"
	FieldPipelineDefinitionID Field = "pipeline_definition_id"
	FieldIsEdge               Field = "is_edge"
	FieldEdgeReportStatus     Field = "edge_report_status"
)

type FlinkSparkConf

type FlinkSparkConf struct {
	// 该部分在 action 的 source 里声明
	Depend    string   `json:"depends,omitempty"`
	MainClass string   `json:"main_class,omitempty"`
	MainArgs  []string `json:"main_args,omitempty"`

	// flink/spark action 运行需要一个 jar resource(flink 为 jarID,spark 为 jarURL)
	// 该部分在运行期动态赋值
	JarResource string `json:"jarResource,omitempty"`
}

type Pipeline

type Pipeline struct {
	PipelineBase
	PipelineExtra
	Definition *definitiondb.PipelineDefinition
	Source     *sourcedb.PipelineSource
	Labels     map[string]string
}

Pipeline

func (*Pipeline) CanArchive

func (p *Pipeline) CanArchive() (bool, string)

func (*Pipeline) CanDelete

func (p *Pipeline) CanDelete() (bool, string)

func (*Pipeline) CanSkipRunningCheck

func (p *Pipeline) CanSkipRunningCheck() bool

CanSkipRunningCheck if pipeline bind queue and EnqueueCondition is skip running, pipeline can skip limit running

func (*Pipeline) DecodeV1UniquePipelineYmlName

func (p *Pipeline) DecodeV1UniquePipelineYmlName(name string) string

DecodeV1UniquePipelineYmlName 根据 GenerateV1UniquePipelineYmlName 生成规则,反解析得到 originName

func (*Pipeline) EnsureGC

func (p *Pipeline) EnsureGC()

EnsureGC without nil field

func (*Pipeline) GenIdentityInfo

func (p *Pipeline) GenIdentityInfo() *commonpb.IdentityInfo

func (*Pipeline) GenerateNormalLabelsForCreateV2

func (p *Pipeline) GenerateNormalLabelsForCreateV2() map[string]string

GenerateNormalLabelsForCreateV2 pipeline.createV2 有一些字段通过标签来传递,例如 commit

func (*Pipeline) GenerateV1UniquePipelineYmlName

func (p *Pipeline) GenerateV1UniquePipelineYmlName(originPipelineYmlPath string) string

GenerateV1UniquePipelineYmlName 为 v1 pipeline 返回 pipelineYmlName,该 name 在 source 下唯一 生成规则: AppID/DiceWorkspace/Branch/PipelineYmlPath 1) 100/PROD/master/ec/dws/itm/workflow/item_1d_df_process.workflow 2) 200/DEV/feature/dice/pipeline.yml

func (*Pipeline) GetCancelUserID

func (p *Pipeline) GetCancelUserID() string

func (*Pipeline) GetConfigManageNamespaces

func (p *Pipeline) GetConfigManageNamespaces() []string

func (*Pipeline) GetLabel

func (p *Pipeline) GetLabel(labelKey string) string

func (*Pipeline) GetOwnerOrRunUserID

func (p *Pipeline) GetOwnerOrRunUserID() string

GetOwnerOrRunUserID get userID to execute pipeline permissions for the task belong to the executor userID if it's triggered by manual, run user is used first to get owner userID,if not exists, to get run userID

func (*Pipeline) GetOwnerUserID

func (p *Pipeline) GetOwnerUserID() string

func (*Pipeline) GetPipelineAppliedResources

func (p *Pipeline) GetPipelineAppliedResources() apistructs.PipelineAppliedResources

GetPipelineAppliedResources return limited and min resource when pipeline run.

func (*Pipeline) GetPipelineQueueID

func (p *Pipeline) GetPipelineQueueID() (uint64, bool)

GetPipelineQueueID return pipeline queue id if exist, or 0.

func (*Pipeline) GetResourceGCTTL

func (p *Pipeline) GetResourceGCTTL() uint64

func (*Pipeline) GetRunUserID

func (p *Pipeline) GetRunUserID() string

func (*Pipeline) GetSubmitUserID

func (p *Pipeline) GetSubmitUserID() string

func (*Pipeline) GetUserID

func (p *Pipeline) GetUserID() string

GetUserID first to get owner userID,if not exists, to get run userID if neither exists, submit userID is used Not included internal UserID

func (*Pipeline) MergeLabels

func (p *Pipeline) MergeLabels() map[string]string

type PipelineArchive

type PipelineArchive struct {
	ID          uint64    `json:"id" xorm:"pk autoincr"`
	TimeCreated time.Time `json:"timeCreated" xorm:"created"`
	TimeUpdated time.Time `json:"timeUpdated" xorm:"updated"`

	PipelineID      uint64                    `json:"pipelineID"`
	PipelineSource  apistructs.PipelineSource `json:"pipelineSource"`
	PipelineYmlName string                    `json:"pipelineYmlName"`
	Status          apistructs.PipelineStatus `json:"status"`

	// DiceVersion record the dice version when archived,
	// it will impact `content` field unmarshal method
	DiceVersion string                 `json:"diceVersion"`
	Content     PipelineArchiveContent `json:"content" xorm:"json"`
}

PipelineArchive pipeline 归档表

func (*PipelineArchive) TableName

func (*PipelineArchive) TableName() string

type PipelineArchiveContent

type PipelineArchiveContent struct {
	Pipeline        Pipeline         `json:"pipeline"`
	PipelineLabels  []PipelineLabel  `json:"pipelineLabels"`
	PipelineStages  []PipelineStage  `json:"pipelineStages"`
	PipelineTasks   []PipelineTask   `json:"pipelineTasks"`
	PipelineReports []PipelineReport `json:"pipelineReports"`
}

PipelineArchiveContent contains: - pipelines - pipeline_labels - pipeline_stages - pipeline_tasks

type PipelineBase

type PipelineBase struct {
	ID uint64 `json:"id" xorm:"pk autoincr"`

	PipelineSource  apistructs.PipelineSource `json:"pipelineSource"`
	PipelineYmlName string                    `json:"pipelineYmlName"`

	ClusterName string `json:"clusterName,omitempty"`

	Status apistructs.PipelineStatus `json:"status,omitempty"`

	Type        apistructs.PipelineType        `json:"type,omitempty"`
	TriggerMode apistructs.PipelineTriggerMode `json:"triggerMode,omitempty"`

	// 定时相关信息
	// +optional
	CronID *uint64 `json:"cronID,omitempty"`

	// Snippet
	IsSnippet        bool    `json:"isSnippet"`
	ParentPipelineID *uint64 `json:"parentPipelineID,omitempty"`
	ParentTaskID     *uint64 `json:"parentTaskID,omitempty"`

	// CostTimeSec 总耗时(秒)
	CostTimeSec int64 `json:"costTimeSec,omitempty"` // pipeline 总耗时/秒
	// TimeBegin 执行开始时间
	TimeBegin *time.Time `json:"timeBegin,omitempty"` // 执行开始时间
	// TimeEnd 执行结束时间
	TimeEnd *time.Time `json:"timeEnd,omitempty"` // 执行结束时间

	TimeCreated *time.Time `json:"timeCreated,omitempty" xorm:"created"`
	TimeUpdated *time.Time `json:"timeUpdated,omitempty" xorm:"updated"`

	PipelineDefinitionID string                      `json:"pipelineDefinitionID"`
	IsEdge               bool                        `json:"isEdge"`
	EdgeReportStatus     apistructs.EdgeReportStatus `json:"edgeReportStatus"`
}

PipelineBase represents `pipeline_bases` table.

func (*PipelineBase) TableName

func (*PipelineBase) TableName() string

type PipelineBaseWithDefinition

type PipelineBaseWithDefinition struct {
	PipelineBase                    `xorm:"extends"`
	definitiondb.PipelineDefinition `xorm:"extends"`
	sourcedb.PipelineSource         `xorm:"extends"`
}

func (*PipelineBaseWithDefinition) TableName

func (*PipelineBaseWithDefinition) TableName() string

type PipelineCombosReq

type PipelineCombosReq struct {
	Branches []string `json:"branches"`
	Sources  []string `json:"sources"`
	YmlNames []string `json:"ymlNames"`
}

type PipelineConfig

type PipelineConfig struct {
	ID    uint64             `json:"id" xorm:"pk autoincr"`
	Type  PipelineConfigType `json:"type"`
	Value interface{}        `json:"value" xorm:"json"`
}

func (PipelineConfig) TableName

func (PipelineConfig) TableName() string

type PipelineConfigType

type PipelineConfigType string
var (
	PipelineConfigTypeActionExecutor PipelineConfigType = "action_executor"
)

type PipelineExtra

type PipelineExtra struct {
	PipelineID uint64 `json:"pipelineID,omitempty" xorm:"pk 'pipeline_id'"`

	// PipelineYml 流水线定义文件
	PipelineYml string `json:"pipelineYml"`

	// Extra 额外信息
	Extra PipelineExtraInfo `json:"extra" xorm:"json"`

	// NormalLabels 普通标签,仅展示,不可过滤
	NormalLabels map[string]string `json:"normalLabels" xorm:"json"`

	// Snapshot 运行时的快照
	Snapshot Snapshot `json:"snapshot" xorm:"json"`

	// CommitDetail 提交详情
	CommitDetail apistructs.CommitDetail `json:"commitDetail" xorm:"json"`

	// Progress 流水线整体执行进度,0-100
	// -1 表示未设置
	// progress 只存最终结果,若 >= 0,直接返回,无需再计算
	Progress int `json:"progress"`

	ExtraTimeCreated *time.Time `json:"timeCreated,omitempty" xorm:"created 'time_created'"`
	ExtraTimeUpdated *time.Time `json:"timeUpdated,omitempty" xorm:"updated 'time_updated'"`
}

PipelineExtra represents `pipeline_extras` table. `pipeline_extras` 与 `pipeline_bases` 一一对应

func (*PipelineExtra) GetCommitID

func (extra *PipelineExtra) GetCommitID() string

func (*PipelineExtra) GetOrgName

func (extra *PipelineExtra) GetOrgName() string

func (*PipelineExtra) TableName

func (*PipelineExtra) TableName() string

type PipelineExtraInfo

type PipelineExtraInfo struct {
	Namespace               string                       `json:"namespace"`
	NotPipelineControlledNs bool                         `json:"notPipelineControlledNs"`
	DiceWorkspace           apistructs.DiceWorkspace     `json:"diceWorkspace,omitempty"`
	PipelineYmlSource       apistructs.PipelineYmlSource `json:"pipelineYmlSource,omitempty"`
	SubmitUser              *basepb.PipelineUser         `json:"submitUser,omitempty"`
	RunUser                 *basepb.PipelineUser         `json:"runUser,omitempty"`
	CancelUser              *basepb.PipelineUser         `json:"cancelUser,omitempty"`
	OwnerUser               *basepb.PipelineUser         `json:"ownerUser,omitempty"`
	InternalClient          string                       `json:"internalClient,omitempty"`
	CronExpr                string                       `json:"cronExpr,omitempty"`
	CronTriggerTime         *time.Time                   `json:"cronTriggerTime,omitempty"` // 秒级精确,毫秒级误差请忽略,cron expr 精确度同样为秒级
	ShowMessage             *basepb.ShowMessage          `json:"showMessage,omitempty"`
	Messages                []string                     `json:"errors,omitempty"` // TODO ShowMessage 和 Message
	// Deprecated
	ConfigManageNamespaceOfSecretsDefault string `json:"configManageNamespaceOfSecretsDefault,omitempty"`
	// Deprecated
	ConfigManageNamespaceOfSecrets string            `json:"configManageNamespaceOfSecrets,omitempty"`
	ConfigManageNamespaces         []string          `json:"configManageNamespaces,omitempty"`
	IncomingSecrets                map[string]string `json:"incomingSecrets,omitempty"`

	CopyFromPipelineID *uint64            `json:"copyFromPipelineID,omitempty"` // 是否是从其他节点拷贝过来
	RerunFailedDetail  *RerunFailedDetail `json:"rerunFailedDetail,omitempty"`

	IsAutoRun      bool                     `json:"isAutoRun,omitempty"` // 创建后是否自动开始执行
	ShareVolumeID  string                   `json:"shareVolumeId,omitempty"`
	TaskWorkspaces []string                 `json:"taskWorkspaces,omitempty"` //工作目录,例如git
	StorageConfig  apistructs.StorageConfig `json:"storageConfig,omitempty"`  // 挂载设置

	CallbackURLs []string `json:"callbackURLs,omitempty"`

	Version string `json:"version,omitempty"` // 1.1, 1.0

	// 是否已经 完成 Reconciler GC
	CompleteReconcilerGC bool `json:"completeReconcilerGC"`

	// 是否已完成 Reconcile teardown
	CompleteReconcilerTeardown bool `json:"completeReconcilerTeardown"`

	// 用于保存自动转换前的 v1 pipelineYmlName(通过 V1 API 创建的流水线,通过该参数调用 gittar 获取内容)
	PipelineYmlNameV1 string `json:"pipelineYmlNameV1,omitempty"`

	// pipeline 运行时的输入参数
	RunPipelineParams []apistructs.PipelineRunParam `json:"runPipelineParams,omitempty"`

	// GC
	GC basepb.PipelineGC `json:"gc,omitempty"`

	// OutputDefines
	DefinedOutputs []apistructs.PipelineOutput `json:"definedOutputs,omitempty"`

	SnippetChain []uint64 `json:"snippetChain,omitempty"`

	QueueInfo *QueueInfo `json:"queueInfo,omitempty"`

	TaskOperates []*pipelinepb.PipelineTaskOperateRequest `json:"taskTaskOperates,omitempty"`

	ContainerInstanceProvider *apistructs.ContainerInstanceProvider `json:"containerInstanceProvider,omitempty"`

	Breakpoint *basepb.Breakpoint `json:"breakpoint,omitempty"`
}

type PipelineLabel

type PipelineLabel struct {
	ID uint64 `json:"id" xorm:"pk autoincr"`

	Type     apistructs.PipelineLabelType `json:"type,omitempty"`
	TargetID uint64                       `json:"targetID"`

	PipelineSource  apistructs.PipelineSource `json:"pipelineSource"`
	PipelineYmlName string                    `json:"pipelineYmlName"`

	Key   string `json:"key"`
	Value string `json:"value"`

	TimeCreated time.Time `json:"timeCreated" xorm:"created"`
	TimeUpdated time.Time `json:"timeUpdated" xorm:"updated"`
}

PipelineLabel 标签

func (PipelineLabel) TableName

func (p PipelineLabel) TableName() string

type PipelineOld

type PipelineOld struct {
	ID uint64 `json:"id,omitempty" xorm:"pk autoincr"`

	// 通过 source + pipelineYmlName 唯一定位
	Source apistructs.PipelineSource `json:"source,omitempty"`
	// 通过 v1 创建的 pipeline,自动生成唯一的 pipelineYmlName
	// 通过 v2 创建的 pipeline,由调用方保证
	PipelineYmlName string `json:"pipelineYmlName,omitempty"`
	PipelineYml     string `json:"pipelineYml,omitempty"`

	// 调度集群
	// +required
	ClusterName string `json:"clusterName,omitempty"`

	// 运行时相关信息
	Type        apistructs.PipelineType        `json:"type,omitempty"`
	TriggerMode apistructs.PipelineTriggerMode `json:"triggerMode,omitempty"`
	Snapshot    Snapshot                       `json:"snapshot,omitempty" xorm:"json"` // 快照
	Progress    float64                        `json:"progress,omitempty" xorm:"-"`    // pipeline 执行进度, eg: 0.8 表示 80%
	Status      apistructs.PipelineStatus      `json:"status,omitempty"`
	Extra       PipelineExtraInfo              `json:"extra,omitempty" xorm:"json"`

	// 时间
	CostTimeSec int64      `json:"costTimeSec,omitempty"`                // pipeline 总耗时/秒
	TimeBegin   *time.Time `json:"timeBegin,omitempty"`                  // 执行开始时间
	TimeEnd     *time.Time `json:"timeEnd,omitempty"`                    // 执行结束时间
	TimeCreated *time.Time `json:"timeCreated,omitempty" xorm:"created"` // 记录创建时间
	TimeUpdated *time.Time `json:"timeUpdated,omitempty" xorm:"updated"` // 记录更新时间

	// 定时相关信息
	// +optional
	CronID *uint64 `json:"cronID,omitempty"`

	// deprecated
	BasePipelineID uint64 `json:"basePipelineID,omitempty"` // 该字段用来分页展示时 group 分组,相同 BasePipelineID 的数据会被折叠成一条,通过执行记录来跳转

	// 应用相关信息
	// +optional
	OrgID           uint64 `json:"orgID,omitempty"`
	OrgName         string `json:"orgName,omitempty"` // tag schedule
	ProjectID       uint64 `json:"projectID,omitempty"`
	ProjectName     string `json:"projectName,omitempty"` // tag schedule
	ApplicationID   uint64 `json:"applicationID,omitempty"`
	ApplicationName string `json:"applicationName,omitempty"`

	// 分支相关信息
	// +optional
	PipelineYmlSource apistructs.PipelineYmlSource `json:"pipelineYmlSource,omitempty"` // yml 文件来源
	Branch            string                       `json:"branch,omitempty"`
	Commit            string                       `json:"commit,omitempty"`
	CommitDetail      apistructs.CommitDetail      `json:"commitDetail,omitempty" xorm:"json"`
}

PipelineOld

func (*PipelineOld) TableName

func (*PipelineOld) TableName() string

type PipelineReport

type PipelineReport struct {
	ID         uint64 `xorm:"pk autoincr"`
	PipelineID uint64
	Type       apistructs.PipelineReportType
	Meta       apistructs.PipelineReportMeta `xorm:"json"`
	CreatorID  string
	UpdaterID  string
	CreatedAt  time.Time `xorm:"created"`
	UpdatedAt  time.Time `xorm:"updated"`
}

PipelineBase represents `dice_pipeline_reports` table.

func (*PipelineReport) ConvertToPB

func (p *PipelineReport) ConvertToPB() (*pb.PipelineReport, error)

func (*PipelineReport) TableName

func (*PipelineReport) TableName() string

type PipelineStage

type PipelineStage struct {
	ID         uint64 `json:"id" xorm:"pk autoincr"`
	PipelineID uint64 `json:"pipelineID"`

	Name   string                    `json:"name"`
	Extra  PipelineStageExtra        `json:"extra" xorm:"json"`
	Status apistructs.PipelineStatus `json:"status"`

	CostTimeSec int64     `json:"costTimeSec"`
	TimeBegin   time.Time `json:"timeBegin"`                  // 执行开始时间
	TimeEnd     time.Time `json:"timeEnd"`                    // 执行结束时间
	TimeCreated time.Time `json:"timeCreated" xorm:"created"` // 记录创建时间
	TimeUpdated time.Time `json:"timeUpdated" xorm:"updated"` // 记录更新时间
}

func (*PipelineStage) Convert2DTO

func (ps *PipelineStage) Convert2DTO() *apistructs.PipelineStageDTO

func (*PipelineStage) TableName

func (ps *PipelineStage) TableName() string

type PipelineStageExtra

type PipelineStageExtra struct {
	PreStage   *PreStageSimple `json:"preStage,omitempty"`
	StageOrder int             `json:"stageOrder"` // 0,1,2,...
}

type PipelineStageWithTask

type PipelineStageWithTask struct {
	PipelineStage
	PipelineTasks []*PipelineTask
}

type PipelineTask

type PipelineTask struct {
	ID         uint64 `json:"id" xorm:"pk autoincr"`
	PipelineID uint64 `json:"pipelineID"`
	StageID    uint64 `json:"stageID"`

	Name         string                    `json:"name"`
	OpType       PipelineTaskOpType        `json:"opType"`         // Deprecated: get, put, task
	Type         string                    `json:"type,omitempty"` // git, buildpack, release, dice ... 当 OpType 为自定义任务时为空
	ExecutorKind PipelineTaskExecutorKind  `json:"executorKind"`   // scheduler, memory
	Status       apistructs.PipelineStatus `json:"status"`
	Extra        PipelineTaskExtra         `json:"extra" xorm:"json"`
	Context      PipelineTaskContext       `json:"context" xorm:"json"`
	Result       *taskresult.Result        `json:"result" xorm:"json"`
	Inspect      taskinspect.Inspect       `json:"inspect" xorm:"json"`

	IsSnippet             bool                                  `json:"isSnippet"`                         // 该节点是否是嵌套流水线节点
	SnippetPipelineID     *uint64                               `json:"snippetPipelineID"`                 // 嵌套的流水线 id
	SnippetPipelineDetail *apistructs.PipelineTaskSnippetDetail `json:"snippetPipelineDetail" xorm:"json"` // 嵌套的流水线详情

	CostTimeSec  int64     `json:"costTimeSec"`                // -1 表示暂无耗时信息, 0 表示确实是0s结束
	QueueTimeSec int64     `json:"queueTimeSec"`               // 等待调度的耗时, -1 暂无耗时信息, 0 表示确实是0s结束 TODO 赋值
	TimeBegin    time.Time `json:"timeBegin"`                  // 执行开始时间
	TimeEnd      time.Time `json:"timeEnd"`                    // 执行结束时间
	TimeCreated  time.Time `json:"timeCreated" xorm:"created"` // 记录创建时间
	TimeUpdated  time.Time `json:"timeUpdated" xorm:"updated"` // 记录更新时间

	IsEdge bool `json:"isEdge"`
}

func (*PipelineTask) CheckExecutorDoneChanDataVersion

func (pt *PipelineTask) CheckExecutorDoneChanDataVersion(actualVersion string) error

func (*PipelineTask) Convert2DTO

func (pt *PipelineTask) Convert2DTO() *apistructs.PipelineTaskDTO

func (*PipelineTask) Convert2PB

func (pt *PipelineTask) Convert2PB() *basepb.PipelineTaskDTO

func (*PipelineTask) ConvertTaskContainer2PB

func (pt *PipelineTask) ConvertTaskContainer2PB() []*basepb.TaskContainer

func (*PipelineTask) GenerateExecutorDoneChanDataVersion

func (pt *PipelineTask) GenerateExecutorDoneChanDataVersion() string

func (*PipelineTask) GetBigDataConf

func (pt *PipelineTask) GetBigDataConf() (apistructs.BigdataSpec, error)

func (*PipelineTask) GetExecutorName

func (pt *PipelineTask) GetExecutorName() PipelineTaskExecutorName

func (*PipelineTask) MergeErrors

func (pt *PipelineTask) MergeErrors() taskerror.OrderedErrors

func (*PipelineTask) MergeErrors2PB

func (pt *PipelineTask) MergeErrors2PB() []*basepb.ErrorResponse

func (*PipelineTask) MergeMetadata

func (pt *PipelineTask) MergeMetadata() metadata.Metadata

func (*PipelineTask) MergePBMetadata

func (pt *PipelineTask) MergePBMetadata() []*commonpb.MetadataField

func (*PipelineTask) MergeTaskParamDetailToDisplay

func (pt *PipelineTask) MergeTaskParamDetailToDisplay(action apistructs.ActionSpec, ymlTask PipelineTask, snapshot Snapshot) (params []*basepb.TaskParamDetail)

func (*PipelineTask) NodeName

func (pt *PipelineTask) NodeName() string

func (*PipelineTask) PrevNodeNames

func (pt *PipelineTask) PrevNodeNames() []string

func (*PipelineTask) ReleaseID

func (pt *PipelineTask) ReleaseID() string

func (*PipelineTask) RuntimeID

func (pt *PipelineTask) RuntimeID() string

func (*PipelineTask) TableName

func (*PipelineTask) TableName() string

type PipelineTaskContext

type PipelineTaskContext struct {
	InStorages  metadata.Metadata `json:"inStorages,omitempty"`
	OutStorages metadata.Metadata `json:"outStorages,omitempty"`

	CmsDiceFiles metadata.Metadata `json:"cmsDiceFiles,omitempty"`
}

func (*PipelineTaskContext) Dedup

func (c *PipelineTaskContext) Dedup()

type PipelineTaskExecutorKind

type PipelineTaskExecutorKind string
var (
	PipelineTaskExecutorKindScheduler PipelineTaskExecutorKind = "SCHEDULER"
	PipelineTaskExecutorKindMemory    PipelineTaskExecutorKind = "MEMORY"
	PipelineTaskExecutorKindAPITest   PipelineTaskExecutorKind = "APITEST"
	PipelineTaskExecutorKindWait      PipelineTaskExecutorKind = "WAIT"
	PipelineTaskExecutorKindK8sJob    PipelineTaskExecutorKind = "K8SJOB"
	PipelineTaskExecutorKindK8sFlink  PipelineTaskExecutorKind = "K8SFLINK"
	PipelineTaskExecutorKindK8sSpark  PipelineTaskExecutorKind = "K8SSPARK"
	PipelineTaskExecutorKindDocker    PipelineTaskExecutorKind = "DOCKER"
	PipelineTaskExecutorKindList                               = []PipelineTaskExecutorKind{PipelineTaskExecutorKindScheduler, PipelineTaskExecutorKindMemory, PipelineTaskExecutorKindAPITest, PipelineTaskExecutorKindWait, PipelineTaskExecutorKindK8sJob}
)

func (PipelineTaskExecutorKind) Check

func (that PipelineTaskExecutorKind) Check() bool

func (PipelineTaskExecutorKind) GenExecutorNameByClusterName

func (that PipelineTaskExecutorKind) GenExecutorNameByClusterName(clusterName string) PipelineTaskExecutorName

func (PipelineTaskExecutorKind) GetDefaultExecutorName

func (that PipelineTaskExecutorKind) GetDefaultExecutorName() PipelineTaskExecutorName

func (PipelineTaskExecutorKind) IsK8sKind

func (that PipelineTaskExecutorKind) IsK8sKind() bool

func (PipelineTaskExecutorKind) String

func (that PipelineTaskExecutorKind) String() string

type PipelineTaskExecutorName

type PipelineTaskExecutorName string
var (
	PipelineTaskExecutorNameEmpty            PipelineTaskExecutorName = ""
	PipelineTaskExecutorNameSchedulerDefault PipelineTaskExecutorName = "scheduler"
	PipelineTaskExecutorNameAPITestDefault   PipelineTaskExecutorName = "api-test"
	PipelineTaskExecutorNameWaitDefault      PipelineTaskExecutorName = "wait"
	PipelineTaskExecutorNameK8sJobDefault    PipelineTaskExecutorName = "k8s-job"
	PipelineTaskExecutorNameK8sFlinkDefault  PipelineTaskExecutorName = "k8s-flink"
	PipelineTaskExecutorNameK8sSparkDefault  PipelineTaskExecutorName = "k8s-spark"
	PipelineTaskExecutorNameDockerDefault    PipelineTaskExecutorName = "docker"
	PipelineTaskExecutorNameList                                      = []PipelineTaskExecutorName{PipelineTaskExecutorNameEmpty, PipelineTaskExecutorNameSchedulerDefault, PipelineTaskExecutorNameAPITestDefault, PipelineTaskExecutorNameWaitDefault, PipelineTaskExecutorNameK8sJobDefault}
)

func (PipelineTaskExecutorName) Check

func (that PipelineTaskExecutorName) Check() bool

func (PipelineTaskExecutorName) String

func (that PipelineTaskExecutorName) String() string

type PipelineTaskExtra

type PipelineTaskExtra struct {
	Namespace               string                     `json:"namespace,omitempty"`
	NotPipelineControlledNs bool                       `json:"notPipelineControlledNs,omitempty"`
	ExecutorName            PipelineTaskExecutorName   `json:"executorName,omitempty"`
	ClusterName             string                     `json:"clusterName,omitempty"`
	AllowFailure            bool                       `json:"allowFailure,omitempty"`
	Pause                   bool                       `json:"pause,omitempty"`
	Timeout                 time.Duration              `json:"timeout,omitempty"`
	PrivateEnvs             map[string]string          `json:"envs,omitempty"`       // PrivateEnvs 由 agent 注入 run 运行时,run 可见,容器内不可见
	PublicEnvs              map[string]string          `json:"publicEnvs,omitempty"` // PublicEnvs 注入容器,run 可见,容器内亦可见
	Labels                  map[string]string          `json:"labels,omitempty"`
	Image                   string                     `json:"image,omitempty"`
	Cmd                     string                     `json:"cmd,omitempty"`
	CmdArgs                 []string                   `json:"cmdArgs,omitempty"`
	Binds                   []apistructs.Bind          `json:"binds,omitempty"`
	TaskContainers          []apistructs.TaskContainer `json:"taskContainers"`
	// Volumes 创建 task 时的 volumes 快照
	// 若一开始 volume 无 volumeID,启动 task 后返回的 volumeID 不会在这里更新,只会更新到 task.Context.OutStorages 里
	Volumes         []metadata.MetadataField `json:"volumes,omitempty"` //
	PreFetcher      *apistructs.PreFetcher   `json:"preFetcher,omitempty"`
	RuntimeResource RuntimeResource          `json:"runtimeResource,omitempty"`
	UUID            string                   `json:"uuid"` // 用于查询日志等,pipeline 开始执行时才会赋值 // 对接多个 executor,不一定每个 executor 都能自定义 UUID,所以这个 uuid 实际上是目标系统的 uuid
	TimeBeginQueue  time.Time                `json:"timeBeginQueue"`
	TimeEndQueue    time.Time                `json:"timeEndQueue"`
	StageOrder      int                      `json:"stageOrder"` // 0,1,2,...

	// RunAfter indicates the tasks this task depends.
	RunAfter []string `json:"runAfter"`

	FlinkSparkConf FlinkSparkConf `json:"flinkSparkConf,omitempty"`

	Action pipelineyml.Action `json:"action,omitempty"`

	OpenapiOAuth2TokenPayload apistructs.OAuth2TokenPayload `json:"openapiOAuth2TokenPayload"`

	LoopOptions *apistructs.PipelineTaskLoopOptions `json:"loopOptions,omitempty"` // 开始执行后保证不为空

	AppliedResources apistructs.PipelineAppliedResources `json:"appliedResources,omitempty"`

	EncryptSecretKeys []string `json:"encryptSecretKeys"` // the encrypt envs' key list

	CurrentPolicy apistructs.Policy `json:"currentPolicy"` // task execution strategy

	ContainerInstanceProvider *apistructs.ContainerInstanceProvider `json:"containerInstanceProvider,omitempty"`

	Breakpoint *basepb.Breakpoint `json:"breakpoint,omitempty"`
}

type PipelineTaskOpType

type PipelineTaskOpType string

GenerateOperation

var (
	PipelineTaskOpTypeGet  PipelineTaskOpType = "get"
	PipelineTaskOpTypePut  PipelineTaskOpType = "put"
	PipelineTaskOpTypeTask PipelineTaskOpType = "task"
)

type PipelineWithStage

type PipelineWithStage struct {
	Pipeline
	PipelineStages []*PipelineStageWithTask
}

type PipelineWithStageAndTask

type PipelineWithStageAndTask struct {
	Pipeline
	PipelineStages []PipelineStage
	PipelineTasks  []PipelineTask
}

type PipelineWithTasks

type PipelineWithTasks struct {
	Pipeline *Pipeline
	Tasks    []*PipelineTask
}

func (*PipelineWithTasks) DoneTasks

func (p *PipelineWithTasks) DoneTasks() []string

type PreStageSimple

type PreStageSimple struct {
	ID     uint64                    `json:"id"`
	Status apistructs.PipelineStatus `json:"preStageStatus,omitempty"`
}

type QueueInfo

type QueueInfo struct {
	QueueID          uint64                          `json:"queueID"`
	CustomPriority   int64                           `json:"customPriority"`
	EnqueueCondition apistructs.EnqueueConditionType `json:"enqueueCondition"`
	// Pipeline priority changed history from initial to latest in queue
	PriorityChangeHistory []int64 `json:"priorityChangeHistory,omitempty"`
}

type RerunFailedDetail

type RerunFailedDetail struct {
	RerunPipelineID uint64            `json:"rerunPipelineID,omitempty"`
	StageIndex      int               `json:"stageIndex,omitempty"`
	SuccessTasks    map[string]uint64 `json:"successTasks,omitempty"`
	FailedTasks     map[string]uint64 `json:"failedTasks,omitempty"`
	NotExecuteTasks map[string]uint64 `json:"notExecuteTasks,omitempty"`
}

type RuntimeResource

type RuntimeResource struct {
	CPU       float64               `json:"cpu"`
	Memory    float64               `json:"memory"`
	Disk      float64               `json:"disk"`
	MaxCPU    float64               `json:"maxCPU"`
	MaxMemory float64               `json:"maxMemory"`
	Network   apistructs.PodNetwork `json:"network"`
}

func GenDefaultTaskResource

func GenDefaultTaskResource() RuntimeResource

type Snapshot

type Snapshot struct {
	PipelineYml     string            `json:"pipeline_yml,omitempty"` // 对占位符进行渲染
	Secrets         map[string]string `json:"secrets,omitempty"`
	PlatformSecrets map[string]string `json:"platformSecrets,omitempty"`
	CmsDiceFiles    map[string]string `json:"cmsDiceFiles,omitempty"`
	Envs            map[string]string `json:"envs,omitempty"`

	AnalyzedCrossCluster *bool `json:"analyzedCrossCluster,omitempty"`

	RunPipelineParams apistructs.PipelineRunParamsWithValue `json:"runPipelineParams,omitempty"` // 流水线运行时参数

	// IdentityInfo 身份信息
	IdentityInfo commonpb.IdentityInfo `json:"identityInfo" xorm:"json"`

	// OutputValues output 定义和从 task 里采集上来的值
	OutputValues []apistructs.PipelineOutputWithValue `json:"outputValues,omitempty"`

	// AppliedResources calculated by all actions
	AppliedResources apistructs.PipelineAppliedResources `json:"appliedResources,omitempty"`

	// BindQueue stores the binding queue info if have.
	BindQueue *pb.Queue `json:"bindQueue,omitempty"`

	// Events stores pipeline level k8s-like events
	Events []*apistructs.PipelineEvent `json:"events,omitempty"`

	// EncryptSecretKeys the encrypt envs' key list
	EncryptSecretKeys []string `json:"encryptSecretKeys"`
}

func (*Snapshot) FromDB

func (s *Snapshot) FromDB(b []byte) error

FromDB 兼容 Snapshot 老数据

func (*Snapshot) ToDB

func (s *Snapshot) ToDB() ([]byte, error)

type StoreType

type StoreType string
var (
	StoreTypeOSS             StoreType = "OSS"
	StoreTypeNFS             StoreType = "NFS"
	StoreTypeDiceVolumeNFS   StoreType = "dice-nfs-volume"
	StoreTypeDiceVolumeLocal StoreType = "dice-local-volume"
	StoreTypeDiceVolumeFake  StoreType = "dice-fake-volume"
	StoreTypeDiceCacheNFS    StoreType = "dice-cache-nfs-volume"
)

type TaskExecutorConfig

type TaskExecutorConfig struct {
	Kind        string            `json:"kind,omitempty"`
	Name        string            `json:"name,omitempty"`
	ClusterName string            `json:"clusterName,omitempty"`
	Options     map[string]string `json:"options,omitempty"`
}

type Volume

type Volume struct {
	HostPath      string `json:"hostPath"`
	ContainerPath string `json:"containerPath"`
	ReadOnly      bool   `json:"readOnly"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL