Documentation ¶
Index ¶
- Constants
- Variables
- func SetCommander(c Commander)
- func SetExecutor(e Executor)
- func SetKeeper(e Keeper)
- func SetParser(e Parser)
- func SetStore(e Store)
- type Closer
- type CommandOptSetter
- type CommandOption
- type Commander
- type DefCommander
- func (c *DefCommander) CancelTask(taskInsIds []string, ops ...CommandOptSetter) error
- func (c *DefCommander) RetryDagIns(dagInsId string, ops ...CommandOptSetter) error
- func (c *DefCommander) RetryTask(taskInsIds []string, ops ...CommandOptSetter) error
- func (c *DefCommander) RunDag(dagId string, specVars map[string]string) (*entity.DagInstance, error)
- type DefDispatcher
- type DefExecutor
- type DefParser
- type DefWatchDog
- type DistributedMutex
- type Executor
- type Keeper
- type ListDagInput
- type ListDagInstanceInput
- type ListTaskInstanceInput
- type LockOption
- type LockOptionOp
- type MockCloser
- type MockExecutor
- type MockKeeper
- func (_m *MockKeeper) AliveNodes() ([]string, error)
- func (_m *MockKeeper) Close()
- func (_m *MockKeeper) IsAlive(workerKey string) (bool, error)
- func (_m *MockKeeper) IsLeader() bool
- func (_m *MockKeeper) NewMutex(key string) DistributedMutex
- func (_m *MockKeeper) WorkerKey() string
- func (_m *MockKeeper) WorkerNumber() int
- type MockParser
- type MockStore
- func (_m *MockStore) BatchCreatTaskIns(taskIns []*entity.TaskInstance) error
- func (_m *MockStore) BatchUpdateDagIns(dagIns []*entity.DagInstance) error
- func (_m *MockStore) BatchUpdateTaskIns(taskIns []*entity.TaskInstance) error
- func (_m *MockStore) Close()
- func (_m *MockStore) CreateDag(dag *entity.Dag) error
- func (_m *MockStore) CreateDagIns(dagIns *entity.DagInstance) error
- func (_m *MockStore) GetDag(dagId string) (*entity.Dag, error)
- func (_m *MockStore) GetDagInstance(dagInsId string) (*entity.DagInstance, error)
- func (_m *MockStore) GetTaskIns(taskIns string) (*entity.TaskInstance, error)
- func (_m *MockStore) ListDagInstance(input *ListDagInstanceInput) ([]*entity.DagInstance, error)
- func (_m *MockStore) ListTaskInstance(input *ListTaskInstanceInput) ([]*entity.TaskInstance, error)
- func (_m *MockStore) Marshal(obj interface{}) ([]byte, error)
- func (_m *MockStore) PatchDagIns(dagIns *entity.DagInstance, mustsPatchFields ...string) error
- func (_m *MockStore) PatchTaskIns(taskIns *entity.TaskInstance) error
- func (_m *MockStore) Unmarshal(bytes []byte, ptr interface{}) error
- func (_m *MockStore) UpdateDag(dag *entity.Dag) error
- func (_m *MockStore) UpdateDagIns(dagIns *entity.DagInstance) error
- func (_m *MockStore) UpdateTaskIns(taskIns *entity.TaskInstance) error
- type MockTaskInfoGetter
- type Parser
- type Store
- type TaskInfoGetter
- type TaskNode
- func (t *TaskNode) AppendChild(task *TaskNode)
- func (t *TaskNode) AppendParent(task *TaskNode)
- func (t *TaskNode) CanBeExecuted() bool
- func (t *TaskNode) CanExecuteChild() bool
- func (t *TaskNode) ComputeStatus() (status TreeStatus, srcTaskInsId string)
- func (t *TaskNode) Executable() bool
- func (t *TaskNode) GetExecutableTaskIds() (executables []string)
- func (t *TaskNode) GetNextTaskIds(completedOrRetryTask *entity.TaskInstance) (executable []string, find bool)
- func (t *TaskNode) HasCycle() (cycleStart *TaskNode)
- type TaskTree
- type TreeStatus
Constants ¶
const ( ReasonSuccessAfterCanceled = "success after canceled" ReasonParentCancel = "parent success but already be canceled" )
const DefFailedReason = "force failed by watch dog because it execute too long"
Variables ¶
var ( // CommSync makes the commander watch the dag instance's command until that command has been executed. // Command execution is usually very short, so async mode is normally enough, // but use this option if you want a synchronous call. CommSync = func() CommandOptSetter { return func(opt *CommandOption) { opt.isSync = true } } // CommSyncTimeout sets the timeout for sync-mode calls; // a non-positive duration is ignored and the existing value is kept. CommSyncTimeout = func(duration time.Duration) CommandOptSetter { return func(opt *CommandOption) { if duration > 0 { opt.syncTimeout = duration } } } // CommSyncInterval only takes effect in sync mode; it is the interval at which the dag instance is watched. // The default is 500ms; a non-positive duration is ignored. CommSyncInterval = func(duration time.Duration) CommandOptSetter { return func(opt *CommandOption) { if duration > 0 { opt.syncInterval = duration } } } )
var (
ActionMap = map[string]run.Action{}
)
Functions ¶
Types ¶
type CommandOptSetter ¶
type CommandOptSetter func(opt *CommandOption)
type CommandOption ¶
type CommandOption struct {
// contains filtered or unexported fields
}
CommandOption holds the options that CommandOptSetter functions configure.
type Commander ¶
type Commander interface { RunDag(dagId string, specVar map[string]string) (*entity.DagInstance, error) RetryDagIns(dagInsId string, ops ...CommandOptSetter) error RetryTask(taskInsIds []string, ops ...CommandOptSetter) error CancelTask(taskInsIds []string, ops ...CommandOptSetter) error }
Commander is used to execute commands.
type DefCommander ¶
type DefCommander struct { }
DefCommander is used to execute commands.
func (*DefCommander) CancelTask ¶
func (c *DefCommander) CancelTask(taskInsIds []string, ops ...CommandOptSetter) error
CancelTask
func (*DefCommander) RetryDagIns ¶
func (c *DefCommander) RetryDagIns(dagInsId string, ops ...CommandOptSetter) error
RetryDagIns
func (*DefCommander) RetryTask ¶
func (c *DefCommander) RetryTask(taskInsIds []string, ops ...CommandOptSetter) error
RetryTask
func (*DefCommander) RunDag ¶
func (c *DefCommander) RunDag(dagId string, specVars map[string]string) (*entity.DagInstance, error)
RunDag
type DefDispatcher ¶
type DefDispatcher struct {
// contains filtered or unexported fields
}
DefDispatcher
type DefExecutor ¶
type DefExecutor struct {
// contains filtered or unexported fields
}
DefExecutor
func NewDefExecutor ¶
func NewDefExecutor(timeout time.Duration, workers int) *DefExecutor
NewDefExecutor
func (*DefExecutor) CancelTaskIns ¶
func (e *DefExecutor) CancelTaskIns(taskInsIds []string) error
CancelTaskIns
func (*DefExecutor) Push ¶
func (e *DefExecutor) Push(dagIns *entity.DagInstance, taskIns *entity.TaskInstance)
Push pushes a task to be executed.
type DefParser ¶
type DefParser struct {
// contains filtered or unexported fields
}
DefParser
func NewDefParser ¶
NewDefParser
func (*DefParser) EntryTaskIns ¶
func (p *DefParser) EntryTaskIns(taskIns *entity.TaskInstance)
EntryTaskIns
func (*DefParser) InitialDagIns ¶
func (p *DefParser) InitialDagIns(dagIns *entity.DagInstance)
InitialDagIns
type DefWatchDog ¶
type DefWatchDog struct {
// contains filtered or unexported fields
}
DefWatchDog
func NewDefWatchDog ¶
func NewDefWatchDog(dagScheduledTimeout time.Duration) *DefWatchDog
NewDefWatchDog
type DistributedMutex ¶
type DistributedMutex interface { Lock(ctx context.Context, ops ...LockOptionOp) error Unlock(ctx context.Context) error }
DistributedMutex
type Executor ¶
type Executor interface { Push(dagIns *entity.DagInstance, taskIns *entity.TaskInstance) CancelTaskIns(taskInsIds []string) error }
Executor is used to execute tasks.
type Keeper ¶
type Keeper interface { Closer IsLeader() bool IsAlive(workerKey string) (bool, error) AliveNodes() ([]string, error) WorkerKey() string WorkerNumber() int NewMutex(key string) DistributedMutex }
Keeper
type ListDagInstanceInput ¶
type ListDagInstanceInput struct { Worker string DagID string UpdatedEnd int64 Status []entity.DagInstanceStatus HasCmd bool Limit int64 Offset int64 }
ListDagInstanceInput
type ListTaskInstanceInput ¶
type ListTaskInstanceInput struct { IDs []string DagInsID string Status []entity.TaskInstanceStatus // query expired tasks(it will calculate task's timeout) Expired bool SelectField []string }
ListTaskInstanceInput
type LockOption ¶
LockOption
type LockOptionOp ¶
type LockOptionOp func(option *LockOption)
func LockTTL ¶
func LockTTL(d time.Duration) LockOptionOp
LockTTL configures the lock TTL; the default value is 30s.
type MockCloser ¶
MockCloser is an autogenerated mock type for the Closer type
func (*MockCloser) Close ¶
func (_m *MockCloser) Close()
Close provides a mock function with given fields:
type MockExecutor ¶
MockExecutor is an autogenerated mock type for the Executor type
func (*MockExecutor) CancelTaskIns ¶
func (_m *MockExecutor) CancelTaskIns(taskInsId []string) error
CancelTaskIns provides a mock function with given fields: taskInsId
func (*MockExecutor) Push ¶
func (_m *MockExecutor) Push(data *entity.DagInstance, taskIns *entity.TaskInstance)
Push provides a mock function with given fields: data, taskIns
type MockKeeper ¶
MockKeeper is an autogenerated mock type for the Keeper type
func (*MockKeeper) AliveNodes ¶
func (_m *MockKeeper) AliveNodes() ([]string, error)
AliveNodes provides a mock function with given fields:
func (*MockKeeper) Close ¶
func (_m *MockKeeper) Close()
Close provides a mock function with given fields:
func (*MockKeeper) IsLeader ¶
func (_m *MockKeeper) IsLeader() bool
IsLeader provides a mock function with given fields:
func (*MockKeeper) NewMutex ¶
func (_m *MockKeeper) NewMutex(key string) DistributedMutex
NewMutex provides a mock function with given fields: key
func (*MockKeeper) WorkerKey ¶
func (_m *MockKeeper) WorkerKey() string
WorkerKey provides a mock function with given fields:
func (*MockKeeper) WorkerNumber ¶
func (_m *MockKeeper) WorkerNumber() int
WorkerNumber provides a mock function with given fields:
type MockParser ¶
MockParser is an autogenerated mock type for the Parser type
func (*MockParser) EntryTaskIns ¶
func (_m *MockParser) EntryTaskIns(taskIns *entity.TaskInstance)
EntryTaskIns provides a mock function with given fields: taskIns
func (*MockParser) InitialDagIns ¶
func (_m *MockParser) InitialDagIns(dagIns *entity.DagInstance)
InitialDagIns provides a mock function with given fields: dagIns
type MockStore ¶
MockStore is an autogenerated mock type for the Store type
func (*MockStore) BatchCreatTaskIns ¶
func (_m *MockStore) BatchCreatTaskIns(taskIns []*entity.TaskInstance) error
BatchCreatTaskIns provides a mock function with given fields: taskIns
func (*MockStore) BatchUpdateDagIns ¶
func (_m *MockStore) BatchUpdateDagIns(dagIns []*entity.DagInstance) error
BatchUpdateDagIns provides a mock function with given fields: dagIns
func (*MockStore) BatchUpdateTaskIns ¶
func (_m *MockStore) BatchUpdateTaskIns(taskIns []*entity.TaskInstance) error
BatchUpdateTaskIns provides a mock function with given fields: taskIns
func (*MockStore) Close ¶
func (_m *MockStore) Close()
Close provides a mock function with given fields:
func (*MockStore) CreateDagIns ¶
func (_m *MockStore) CreateDagIns(dagIns *entity.DagInstance) error
CreateDagIns provides a mock function with given fields: dagIns
func (*MockStore) GetDagInstance ¶
func (_m *MockStore) GetDagInstance(dagInsId string) (*entity.DagInstance, error)
GetDagInstance provides a mock function with given fields: dagInsId
func (*MockStore) GetTaskIns ¶
func (_m *MockStore) GetTaskIns(taskIns string) (*entity.TaskInstance, error)
GetTaskIns provides a mock function with given fields: taskIns
func (*MockStore) ListDagInstance ¶
func (_m *MockStore) ListDagInstance(input *ListDagInstanceInput) ([]*entity.DagInstance, error)
ListDagInstance provides a mock function with given fields: input
func (*MockStore) ListTaskInstance ¶
func (_m *MockStore) ListTaskInstance(input *ListTaskInstanceInput) ([]*entity.TaskInstance, error)
ListTaskInstance provides a mock function with given fields: input
func (*MockStore) PatchDagIns ¶
func (_m *MockStore) PatchDagIns(dagIns *entity.DagInstance, mustsPatchFields ...string) error
PatchDagIns provides a mock function with given fields: dagIns, mustsPatchFields
func (*MockStore) PatchTaskIns ¶
func (_m *MockStore) PatchTaskIns(taskIns *entity.TaskInstance) error
PatchTaskIns provides a mock function with given fields: taskIns
func (*MockStore) UpdateDagIns ¶
func (_m *MockStore) UpdateDagIns(dagIns *entity.DagInstance) error
UpdateDagIns provides a mock function with given fields: dagIns
func (*MockStore) UpdateTaskIns ¶
func (_m *MockStore) UpdateTaskIns(taskIns *entity.TaskInstance) error
UpdateTaskIns provides a mock function with given fields: taskIns
type MockTaskInfoGetter ¶
type MockTaskInfoGetter struct { ID string Depend []string Status entity.TaskInstanceStatus }
func (*MockTaskInfoGetter) GetDepend ¶
func (_m *MockTaskInfoGetter) GetDepend() []string
GetDepend provides a mock function with given fields:
func (*MockTaskInfoGetter) GetGraphID ¶
func (_m *MockTaskInfoGetter) GetGraphID() string
GetGraphID provides a mock function with given fields:
func (*MockTaskInfoGetter) GetID ¶
func (_m *MockTaskInfoGetter) GetID() string
GetID provides a mock function with given fields:
func (*MockTaskInfoGetter) GetStatus ¶
func (_m *MockTaskInfoGetter) GetStatus() entity.TaskInstanceStatus
GetStatus provides a mock function with given fields:
type Parser ¶
type Parser interface { InitialDagIns(dagIns *entity.DagInstance) EntryTaskIns(taskIns *entity.TaskInstance) }
Parser is used to execute commands, initialize dag instances, and push task instances.
type Store ¶
type Store interface { Closer CreateDag(dag *entity.Dag) error CreateDagIns(dagIns *entity.DagInstance) error BatchCreatTaskIns(taskIns []*entity.TaskInstance) error PatchTaskIns(taskIns *entity.TaskInstance) error PatchDagIns(dagIns *entity.DagInstance, mustsPatchFields ...string) error UpdateDag(dagIns *entity.Dag) error UpdateDagIns(dagIns *entity.DagInstance) error UpdateTaskIns(taskIns *entity.TaskInstance) error BatchUpdateDagIns(dagIns []*entity.DagInstance) error BatchUpdateTaskIns(taskIns []*entity.TaskInstance) error GetTaskIns(taskIns string) (*entity.TaskInstance, error) GetDag(dagId string) (*entity.Dag, error) GetDagInstance(dagInsId string) (*entity.DagInstance, error) ListDagInstance(input *ListDagInstanceInput) ([]*entity.DagInstance, error) ListTaskInstance(input *ListTaskInstanceInput) ([]*entity.TaskInstance, error) Marshal(obj interface{}) ([]byte, error) Unmarshal(bytes []byte, ptr interface{}) error }
Store is used to persist objects.
type TaskInfoGetter ¶
type TaskInfoGetter interface { GetDepend() []string GetID() string GetGraphID() string GetStatus() entity.TaskInstanceStatus }
TaskInfoGetter
func MapMockTasksToGetter ¶
func MapMockTasksToGetter(taskIns []*MockTaskInfoGetter) (ret []TaskInfoGetter)
MapMockTasksToGetter
func MapTaskInsToGetter ¶
func MapTaskInsToGetter(taskIns []*entity.TaskInstance) (ret []TaskInfoGetter)
MapTaskInsToGetter
func MapTasksToGetter ¶
func MapTasksToGetter(taskIns []entity.Task) (ret []TaskInfoGetter)
MapTasksToGetter
type TaskNode ¶
type TaskNode struct { TaskInsID string Status entity.TaskInstanceStatus // contains filtered or unexported fields }
TaskNode
func NewTaskNodeFromGetter ¶
func NewTaskNodeFromGetter(instance TaskInfoGetter) *TaskNode
NewTaskNodeFromGetter
func (*TaskNode) CanBeExecuted ¶
CanBeExecuted checks whether the task can be executed.
func (*TaskNode) ComputeStatus ¶
func (t *TaskNode) ComputeStatus() (status TreeStatus, srcTaskInsId string)
ComputeStatus
func (*TaskNode) GetExecutableTaskIds ¶
GetExecutableTaskIds returns the ids of the executable tasks (presumably de-duplicated; confirm against the implementation).
func (*TaskNode) GetNextTaskIds ¶
func (t *TaskNode) GetNextTaskIds(completedOrRetryTask *entity.TaskInstance) (executable []string, find bool)
GetNextTaskIds
type TreeStatus ¶
type TreeStatus string
const ( TreeStatusRunning TreeStatus = "running" TreeStatusSuccess TreeStatus = "success" TreeStatusFailed TreeStatus = "failed" TreeStatusBlocked TreeStatus = "blocked" )