Documentation ¶
Index ¶
- Constants
- Variables
- type ActiveAction
- type BaseInfo
- type BaseInfoGetter
- type Check
- type Command
- type CommandName
- type Dag
- type DagInstance
- func (dagIns *DagInstance) Block(reason string)
- func (dagIns *DagInstance) CanModifyStatus() bool
- func (dagIns *DagInstance) Cancel(taskInsIds []string) error
- func (dagIns *DagInstance) Fail(reason string)
- func (dagIns *DagInstance) Retry(taskInsIds []string) error
- func (dagIns *DagInstance) Run()
- func (dagIns *DagInstance) Success()
- func (dagIns *DagInstance) VarsGetter() utils.KeyValueGetter
- func (dagIns *DagInstance) VarsIterator() utils.KeyValueIterator
- type DagInstanceHookFunc
- type DagInstanceLifecycleHook
- type DagInstanceStatus
- type DagInstanceVar
- type DagInstanceVars
- type DagStatus
- type DagVar
- type DagVars
- type MockBaseInfoGetter
- type Operator
- type PreChecks
- type ShareData
- func (d *ShareData) Get(key string) (string, bool)
- func (*ShareData) GormDataType() string
- func (d *ShareData) MarshalBSON() ([]byte, error)
- func (d *ShareData) MarshalJSON() ([]byte, error)
- func (s *ShareData) Scan(value interface{}) error
- func (d *ShareData) Set(key string, val string)
- func (d *ShareData) UnmarshalBSON(data []byte) error
- func (d *ShareData) UnmarshalJSON(data []byte) error
- func (s *ShareData) Value() (driver.Value, error)
- type SpecifiedVar
- type StringArray
- type StringMap
- type Task
- type TaskCondition
- type TaskConditionSource
- type TaskInstance
- func (t *TaskInstance) DoPreCheck(dagIns *DagInstance) (isActive bool, err error)
- func (t *TaskInstance) GetDepend() []string
- func (t *TaskInstance) GetGraphID() string
- func (t *TaskInstance) GetID() string
- func (t *TaskInstance) GetStatus() TaskInstanceStatus
- func (t *TaskInstance) InitialDep(ctx run.ExecuteContext, patch func(*TaskInstance) error, dagIns *DagInstance)
- func (t *TaskInstance) Run(params interface{}, act run.Action) (err error)
- func (t *TaskInstance) SetStatus(s TaskInstanceStatus) error
- func (t *TaskInstance) Trace(msg string, ops ...run.TraceOp)
- type TaskInstanceStatus
- type TraceInfo
- type TraceInfos
- type Trigger
Constants ¶
const ( CommandNameRetry = "retry" CommandNameCancel = "cancel" )
Variables ¶
var ( StoreMarshal func(interface{}) ([]byte, error) StoreUnmarshal func([]byte, interface{}) error )
Functions ¶
This section is empty.
Types ¶
type ActiveAction ¶
type ActiveAction string
const ( // skip the action when all conditions are met, otherwise execute it ActiveActionSkip ActiveAction = "skip" // block the action when all conditions are met, otherwise execute it ActiveActionBlock ActiveAction = "block" )
type BaseInfo ¶
type BaseInfo struct { ID string `yaml:"id" json:"id" bson:"_id" gorm:"primarykey"` CreatedAt int64 `yaml:"createdAt" json:"createdAt" bson:"createdAt"` UpdatedAt int64 `yaml:"updatedAt" json:"updatedAt" bson:"updatedAt"` }
BaseInfo
type Check ¶
type Check struct { Conditions []TaskCondition `yaml:"conditions,omitempty" json:"conditions,omitempty" bson:"conditions,omitempty"` Act ActiveAction `yaml:"act,omitempty" json:"act,omitempty" bson:"act,omitempty"` }
Check
func (*Check) IsMeet ¶
func (c *Check) IsMeet(dagIns *DagInstance) bool
IsMeet reports whether the check is met.
type Command ¶
type Command struct { Name CommandName TargetTaskInsIDs []string }
Command
func (Command) GormDataType ¶
type Dag ¶
type Dag struct { BaseInfo `yaml:",inline" json:",inline" bson:"inline"` Name string `yaml:"name,omitempty" json:"name,omitempty" bson:"name,omitempty"` Desc string `yaml:"desc,omitempty" json:"desc,omitempty" bson:"desc,omitempty"` Cron string `yaml:"cron,omitempty" json:"cron,omitempty" bson:"cron,omitempty"` Vars DagVars `yaml:"vars,omitempty" json:"vars,omitempty" bson:"vars,omitempty" gorm:"type:json"` Status DagStatus `yaml:"status,omitempty" json:"status,omitempty" bson:"status,omitempty" gorm:"type:string"` Tasks []Task `yaml:"tasks,omitempty" json:"tasks,omitempty" bson:"tasks,omitempty" gorm:"-"` }
Dag
type DagInstance ¶
type DagInstance struct { BaseInfo `bson:"inline"` DagID string `json:"dagId,omitempty" bson:"dagId,omitempty"` Trigger Trigger `json:"trigger,omitempty" bson:"trigger,omitempty" gorm:"type:string"` Worker string `json:"worker,omitempty" bson:"worker,omitempty"` Vars DagInstanceVars `json:"vars,omitempty" bson:"vars,omitempty" gorm:"type:json"` Status DagInstanceStatus `json:"status,omitempty" bson:"status,omitempty" gorm:"type:string"` Reason string `json:"reason,omitempty" bson:"reason,omitempty"` Cmd *Command `json:"cmd,omitempty" bson:"cmd,omitempty" gorm:"type:json"` }
DagInstance
func (*DagInstance) CanModifyStatus ¶
func (dagIns *DagInstance) CanModifyStatus() bool
CanModifyStatus indicates whether the dag instance can modify its status.
func (*DagInstance) Cancel ¶
func (dagIns *DagInstance) Cancel(taskInsIds []string) error
Cancel cancels the given task instances. It only sets a command; the command will be executed by the Parser.
func (*DagInstance) Retry ¶
func (dagIns *DagInstance) Retry(taskInsIds []string) error
Retry retries the given task instances. It only sets a command; the command will be executed by the Parser.
func (*DagInstance) VarsGetter ¶
func (dagIns *DagInstance) VarsGetter() utils.KeyValueGetter
VarsGetter
func (*DagInstance) VarsIterator ¶
func (dagIns *DagInstance) VarsIterator() utils.KeyValueIterator
VarsIterator
type DagInstanceHookFunc ¶
type DagInstanceHookFunc func(dagIns *DagInstance)
type DagInstanceLifecycleHook ¶
type DagInstanceLifecycleHook struct { BeforeRun DagInstanceHookFunc BeforeSuccess DagInstanceHookFunc BeforeFail DagInstanceHookFunc BeforeBlock DagInstanceHookFunc BeforeRetry DagInstanceHookFunc }
DagInstanceLifecycleHook
var (
HookDagInstance DagInstanceLifecycleHook
)
type DagInstanceStatus ¶
type DagInstanceStatus string
DagInstanceStatus
const ( DagInstanceStatusInit DagInstanceStatus = "init" DagInstanceStatusScheduled DagInstanceStatus = "scheduled" DagInstanceStatusRunning DagInstanceStatus = "running" DagInstanceStatusBlocked DagInstanceStatus = "blocked" DagInstanceStatusFailed DagInstanceStatus = "failed" DagInstanceStatusSuccess DagInstanceStatus = "success" )
type DagInstanceVar ¶
type DagInstanceVar struct {
Value string `json:"value,omitempty" bson:"value,omitempty"`
}
DagInstanceVar
type DagInstanceVars ¶
type DagInstanceVars map[string]DagInstanceVar
DagInstanceVars
func (DagInstanceVars) GormDataType ¶
func (DagInstanceVars) GormDataType() string
func (DagInstanceVars) Render ¶
func (vars DagInstanceVars) Render(p map[string]interface{}) (map[string]interface{}, error)
Render variables
func (*DagInstanceVars) Scan ¶
func (d *DagInstanceVars) Scan(value interface{}) error
Scan implements the sql.Scanner interface; it scans value into Jsonb.
type DagVar ¶
type DagVar struct { Desc string `yaml:"desc,omitempty" json:"desc,omitempty" bson:"desc,omitempty"` DefaultValue string `yaml:"defaultValue,omitempty" json:"defaultValue,omitempty" bson:"defaultValue,omitempty"` }
DagVar
type DagVars ¶
func (DagVars) GormDataType ¶
type MockBaseInfoGetter ¶
MockBaseInfoGetter is an autogenerated mock type for the BaseInfoGetter type
func (*MockBaseInfoGetter) GetBaseInfo ¶
func (_m *MockBaseInfoGetter) GetBaseInfo() *BaseInfo
GetBaseInfo provides a mock function with given fields:
type PreChecks ¶
func (PreChecks) GormDataType ¶
type ShareData ¶
type ShareData struct { // contains filtered or unexported fields }
ShareData can be read and written by all tasks, and it will be persisted. If you want higher performance and only need the data within the same task, you can use ExecuteContext's Context instead.
func (*ShareData) GormDataType ¶
func (*ShareData) MarshalBSON ¶
MarshalBSON used by mongo
func (*ShareData) MarshalJSON ¶
MarshalJSON used by json
func (*ShareData) UnmarshalBSON ¶
UnmarshalBSON used by mongo
func (*ShareData) UnmarshalJSON ¶
UnmarshalJSON used by json
type StringArray ¶
type StringArray []string
func (StringArray) GormDataType ¶
func (StringArray) GormDataType() string
func (*StringArray) Scan ¶
func (d *StringArray) Scan(value interface{}) error
Scan implements the sql.Scanner interface; it scans value into Jsonb.
type StringMap ¶
type StringMap map[string]interface{}
func (StringMap) GormDataType ¶
type Task ¶
type Task struct { ID string `yaml:"id,omitempty" json:"id,omitempty" bson:"id,omitempty" gorm:"primarykey"` Name string `yaml:"name,omitempty" json:"name,omitempty" bson:"name,omitempty"` DagID string `yaml:"dagId,omitempty" json:"dagId,omitempty" bson:"dagId,omitempty"` DependOn StringArray `yaml:"dependOn,omitempty" json:"dependOn,omitempty" bson:"dependOn,omitempty" gorm:"type:json"` ActionName string `yaml:"actionName,omitempty" json:"actionName,omitempty" bson:"actionName,omitempty"` TimeoutSecs int `yaml:"timeoutSecs,omitempty" json:"timeoutSecs,omitempty" bson:"timeoutSecs,omitempty"` Params StringMap `yaml:"params,omitempty" json:"params,omitempty" bson:"params,omitempty" gorm:"type:json"` PreChecks PreChecks `yaml:"preCheck,omitempty" json:"preCheck,omitempty" bson:"preCheck,omitempty" gorm:"type:json"` }
Task
type TaskCondition ¶
type TaskCondition struct { Source TaskConditionSource `yaml:"source,omitempty" json:"source,omitempty" bson:"source,omitempty"` Key string `yaml:"key,omitempty" json:"key,omitempty" bson:"key,omitempty"` Values []string `yaml:"values,omitempty" json:"values,omitempty" bson:"values,omitempty"` Op Operator `yaml:"op,omitempty" json:"op,omitempty" bson:"op,omitempty"` }
TaskCondition
func (*TaskCondition) IsMeet ¶
func (c *TaskCondition) IsMeet(dagIns *DagInstance) bool
IsMeet reports whether the condition is met.
type TaskConditionSource ¶
type TaskConditionSource string
const ( TaskConditionSourceVars TaskConditionSource = "vars" )
func (TaskConditionSource) BuildKvGetter ¶
func (t TaskConditionSource) BuildKvGetter(dagIns *DagInstance) utils.KeyValueGetter
BuildKvGetter
type TaskInstance ¶
type TaskInstance struct { BaseInfo `bson:"inline"` // Task's Id it should be unique in a dag instance TaskID string `json:"taskId,omitempty" bson:"taskId,omitempty"` DagInsID string `json:"dagInsId,omitempty" bson:"dagInsId,omitempty"` Name string `json:"name,omitempty" bson:"name,omitempty"` DependOn StringArray `json:"dependOn,omitempty" bson:"dependOn,omitempty" gorm:"type:json"` ActionName string `json:"actionName,omitempty" bson:"actionName,omitempty"` TimeoutSecs int `json:"timeoutSecs" bson:"timeoutSecs"` Params StringMap `json:"params,omitempty" bson:"params,omitempty" gorm:"type:json"` Traces TraceInfos `json:"traces,omitempty" bson:"traces,omitempty" gorm:"type:json"` Status TaskInstanceStatus `json:"status,omitempty" bson:"status,omitempty" gorm:"type:string"` Reason string `json:"reason,omitempty" bson:"reason,omitempty" gorm:"type:text"` PreChecks PreChecks `json:"preChecks,omitempty" bson:"preChecks,omitempty" gorm:"type:json"` // used to save changes Patch func(*TaskInstance) error `json:"-" bson:"-" gorm:"-"` Context run.ExecuteContext `json:"-" bson:"-" gorm:"-"` RelatedDagInstance *DagInstance `json:"-" bson:"-" gorm:"-"` // contains filtered or unexported fields }
TaskInstance
func (*TaskInstance) DoPreCheck ¶
func (t *TaskInstance) DoPreCheck(dagIns *DagInstance) (isActive bool, err error)
DoPreCheck
func (*TaskInstance) InitialDep ¶
func (t *TaskInstance) InitialDep(ctx run.ExecuteContext, patch func(*TaskInstance) error, dagIns *DagInstance)
InitialDep
func (*TaskInstance) Run ¶
func (t *TaskInstance) Run(params interface{}, act run.Action) (err error)
Run action
func (*TaskInstance) SetStatus ¶
func (t *TaskInstance) SetStatus(s TaskInstanceStatus) error
SetStatus will persist task instance
type TaskInstanceStatus ¶
type TaskInstanceStatus string
TaskInstanceStatus
const ( TaskInstanceStatusInit TaskInstanceStatus = "init" TaskInstanceStatusCanceled TaskInstanceStatus = "canceled" TaskInstanceStatusRunning TaskInstanceStatus = "running" TaskInstanceStatusEnding TaskInstanceStatus = "ending" TaskInstanceStatusFailed TaskInstanceStatus = "failed" TaskInstanceStatusRetrying TaskInstanceStatus = "retrying" TaskInstanceStatusSuccess TaskInstanceStatus = "success" TaskInstanceStatusBlocked TaskInstanceStatus = "blocked" TaskInstanceStatusSkipped TaskInstanceStatus = "skipped" )
type TraceInfo ¶
type TraceInfo struct { Time int64 `json:"time,omitempty" bson:"time,omitempty"` Message string `json:"message,omitempty" bson:"message,omitempty"` }
TraceInfo
type TraceInfos ¶
type TraceInfos []TraceInfo
func (*TraceInfos) Scan ¶
func (t *TraceInfos) Scan(value interface{}) error
Scan implements the sql.Scanner interface; it scans value into Jsonb.