mod

package
v1.0.1-20230322 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 22, 2023 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ReasonSuccessAfterCanceled = "success after canceled"
	ReasonParentCancel         = "parent success but already be canceled"
)
View Source
const DefFailedReason = "force failed by watch dog because it execute too long"

Variables

View Source
var (
	// CommSync returns a CommandOptSetter that sets isSync to true on the CommandOption,
	// i.e. it switches the command to sync mode.
	// In sync mode the commander watches the dag instance's command until it has been executed.
	// Command execution time is usually very short, so async mode is normally enough;
	// use this option only when you want a sync call.
	CommSync = func() CommandOptSetter {
		return func(opt *CommandOption) {
			opt.isSync = true
		}
	}
	// CommSyncTimeout returns a CommandOptSetter that sets syncTimeout on the CommandOption.
	// A duration that is zero or negative is ignored, so the option keeps its previous value.
	// NOTE(review): presumably this bounds how long a sync call waits for the command — confirm against the commander.
	CommSyncTimeout = func(duration time.Duration) CommandOptSetter {
		return func(opt *CommandOption) {
			if duration > 0 {
				opt.syncTimeout = duration
			}
		}
	}
	// CommSyncInterval returns a CommandOptSetter that sets syncInterval on the CommandOption.
	// It only takes effect in sync mode, where it is the interval at which the dag instance is watched.
	// The default is 500ms; a duration that is zero or negative is ignored.
	CommSyncInterval = func(duration time.Duration) CommandOptSetter {
		return func(opt *CommandOption) {
			if duration > 0 {
				opt.syncInterval = duration
			}
		}
	}
)
View Source
var (
	ActionMap = map[string]run.Action{}
)

Functions

func SetCommander

func SetCommander(c Commander)

SetCommander

func SetExecutor

func SetExecutor(e Executor)

SetExecutor

func SetKeeper

func SetKeeper(e Keeper)

SetKeeper

func SetParser

func SetParser(e Parser)

SetParser

func SetStore

func SetStore(e Store)

SetStore

Types

type Closer

type Closer interface {
	Close()
}

Closer means the component needs to be closed.

type CommandOptSetter

type CommandOptSetter func(opt *CommandOption)

type CommandOption

type CommandOption struct {
	// contains filtered or unexported fields
}

CommandOption

type Commander

type Commander interface {
	RunDag(dagId string, specVar map[string]string) (*entity.DagInstance, error)
	RetryDagIns(dagInsId string, ops ...CommandOptSetter) error
	RetryTask(taskInsIds []string, ops ...CommandOptSetter) error
	CancelTask(taskInsIds []string, ops ...CommandOptSetter) error
}

Commander used to execute command

func GetCommander

func GetCommander() Commander

GetCommander

type DefCommander

type DefCommander struct {
}

DefCommander used to execute command

func (*DefCommander) CancelTask

func (c *DefCommander) CancelTask(taskInsIds []string, ops ...CommandOptSetter) error

CancelTask

func (*DefCommander) RetryDagIns

func (c *DefCommander) RetryDagIns(dagInsId string, ops ...CommandOptSetter) error

RetryDagIns

func (*DefCommander) RetryTask

func (c *DefCommander) RetryTask(taskInsIds []string, ops ...CommandOptSetter) error

RetryTask

func (*DefCommander) RunDag

func (c *DefCommander) RunDag(dagId string, specVars map[string]string) (*entity.DagInstance, error)

RunDag

type DefDispatcher

type DefDispatcher struct {
	// contains filtered or unexported fields
}

DefDispatcher

func NewDefDispatcher

func NewDefDispatcher() *DefDispatcher

NewDefDispatcher

func (*DefDispatcher) Close

func (d *DefDispatcher) Close()

Close component

func (*DefDispatcher) Do

func (d *DefDispatcher) Do() error

Do dispatch

func (*DefDispatcher) Init

func (d *DefDispatcher) Init()

Init

func (*DefDispatcher) WatchInitDags

func (d *DefDispatcher) WatchInitDags()

WatchInitDags

type DefExecutor

type DefExecutor struct {
	// contains filtered or unexported fields
}

DefExecutor

func NewDefExecutor

func NewDefExecutor(timeout time.Duration, workers int) *DefExecutor

NewDefExecutor

func (*DefExecutor) CancelTaskIns

func (e *DefExecutor) CancelTaskIns(taskInsIds []string) error

CancelTaskIns

func (*DefExecutor) Close

func (e *DefExecutor) Close()

Close

func (*DefExecutor) Init

func (e *DefExecutor) Init()

Init

func (*DefExecutor) Push

func (e *DefExecutor) Push(dagIns *entity.DagInstance, taskIns *entity.TaskInstance)

Push task to execute

type DefParser

type DefParser struct {
	// contains filtered or unexported fields
}

DefParser

func NewDefParser

func NewDefParser(workerNumber int, taskTimeout time.Duration) *DefParser

NewDefParser

func (*DefParser) Close

func (p *DefParser) Close()

Close

func (*DefParser) EntryTaskIns

func (p *DefParser) EntryTaskIns(taskIns *entity.TaskInstance)

EntryTaskIns

func (*DefParser) Init

func (p *DefParser) Init()

Init

func (*DefParser) InitialDagIns

func (p *DefParser) InitialDagIns(dagIns *entity.DagInstance)

InitialDagIns

type DefWatchDog

type DefWatchDog struct {
	// contains filtered or unexported fields
}

DefWatchDog

func NewDefWatchDog

func NewDefWatchDog(dagScheduledTimeout time.Duration) *DefWatchDog

NewDefWatchDog

func (*DefWatchDog) Close

func (wd *DefWatchDog) Close()

Close

func (*DefWatchDog) Init

func (wd *DefWatchDog) Init()

Init

type DistributedMutex

type DistributedMutex interface {
	Lock(ctx context.Context, ops ...LockOptionOp) error
	Unlock(ctx context.Context) error
}

DistributedMutex

type Executor

type Executor interface {
	Push(dagIns *entity.DagInstance, taskIns *entity.TaskInstance)
	CancelTaskIns(taskInsIds []string) error
}

Executor is used to execute task

func GetExecutor

func GetExecutor() Executor

GetExecutor

type Keeper

type Keeper interface {
	Closer
	IsLeader() bool
	IsAlive(workerKey string) (bool, error)
	AliveNodes() ([]string, error)
	WorkerKey() string
	WorkerNumber() int
	NewMutex(key string) DistributedMutex
}

Keeper

func GetKeeper

func GetKeeper() Keeper

GetKeeper

type ListDagInput

type ListDagInput struct {
}

ListDagInput

type ListDagInstanceInput

type ListDagInstanceInput struct {
	Worker     string
	DagID      string
	UpdatedEnd int64
	Status     []entity.DagInstanceStatus
	HasCmd     bool
	Limit      int64
	Offset     int64
}

ListDagInstanceInput

type ListTaskInstanceInput

// ListTaskInstanceInput holds the filter fields passed to Store.ListTaskInstance.
type ListTaskInstanceInput struct {
	IDs      []string
	DagInsID string
	Status   []entity.TaskInstanceStatus
	// Expired queries expired tasks (the query will calculate each task's timeout).
	Expired     bool
	SelectField []string
}

ListTaskInstanceInput

type LockOption

type LockOption struct {
	TTL               time.Duration
	ReentrantIdentity string
	SpinInterval      time.Duration
}

LockOption

func NewLockOption

func NewLockOption(ops []LockOptionOp) *LockOption

NewLockOption

type LockOptionOp

type LockOptionOp func(option *LockOption)

func LockTTL

func LockTTL(d time.Duration) LockOptionOp

LockTTL configured lock ttl, default values: 30s

func Reentrant

func Reentrant(identity string) LockOptionOp

Reentrant makes the lock reentrant for the given identity.

type MockCloser

type MockCloser struct {
	mock.Mock
}

MockCloser is an autogenerated mock type for the Closer type

func (*MockCloser) Close

func (_m *MockCloser) Close()

Close provides a mock function with given fields:

type MockExecutor

type MockExecutor struct {
	mock.Mock
}

MockExecutor is an autogenerated mock type for the Executor type

func (*MockExecutor) CancelTaskIns

func (_m *MockExecutor) CancelTaskIns(taskInsId []string) error

CancelTaskIns provides a mock function with given fields: taskInsId

func (*MockExecutor) Push

func (_m *MockExecutor) Push(data *entity.DagInstance, taskIns *entity.TaskInstance)

Push provides a mock function with given fields: data, taskIns

type MockKeeper

type MockKeeper struct {
	mock.Mock
}

MockKeeper is an autogenerated mock type for the Keeper type

func (*MockKeeper) AliveNodes

func (_m *MockKeeper) AliveNodes() ([]string, error)

AliveNodes provides a mock function with given fields:

func (*MockKeeper) Close

func (_m *MockKeeper) Close()

Close provides a mock function with given fields:

func (*MockKeeper) IsAlive

func (_m *MockKeeper) IsAlive(workerKey string) (bool, error)

func (*MockKeeper) IsLeader

func (_m *MockKeeper) IsLeader() bool

IsLeader provides a mock function with given fields:

func (*MockKeeper) NewMutex

func (_m *MockKeeper) NewMutex(key string) DistributedMutex

NewMutex provides a mock function with given fields: key

func (*MockKeeper) WorkerKey

func (_m *MockKeeper) WorkerKey() string

WorkerKey provides a mock function with given fields:

func (*MockKeeper) WorkerNumber

func (_m *MockKeeper) WorkerNumber() int

WorkerNumber provides a mock function with given fields:

type MockParser

type MockParser struct {
	mock.Mock
}

MockParser is an autogenerated mock type for the Parser type

func (*MockParser) EntryTaskIns

func (_m *MockParser) EntryTaskIns(taskIns *entity.TaskInstance)

EntryTaskIns provides a mock function with given fields: taskIns

func (*MockParser) InitialDagIns

func (_m *MockParser) InitialDagIns(dagIns *entity.DagInstance)

InitialDagIns provides a mock function with given fields: dagIns

type MockStore

type MockStore struct {
	mock.Mock
}

MockStore is an autogenerated mock type for the Store type

func (*MockStore) BatchCreatTaskIns

func (_m *MockStore) BatchCreatTaskIns(taskIns []*entity.TaskInstance) error

BatchCreatTaskIns provides a mock function with given fields: taskIns

func (*MockStore) BatchUpdateDagIns

func (_m *MockStore) BatchUpdateDagIns(dagIns []*entity.DagInstance) error

BatchUpdateDagIns provides a mock function with given fields: dagIns

func (*MockStore) BatchUpdateTaskIns

func (_m *MockStore) BatchUpdateTaskIns(taskIns []*entity.TaskInstance) error

BatchUpdateTaskIns provides a mock function with given fields: taskIns

func (*MockStore) Close

func (_m *MockStore) Close()

Close provides a mock function with given fields:

func (*MockStore) CreateDag

func (_m *MockStore) CreateDag(dag *entity.Dag) error

CreateDag provides a mock function with given fields: dag

func (*MockStore) CreateDagIns

func (_m *MockStore) CreateDagIns(dagIns *entity.DagInstance) error

CreateDagIns provides a mock function with given fields: dagIns

func (*MockStore) GetDag

func (_m *MockStore) GetDag(dagId string) (*entity.Dag, error)

GetDag provides a mock function with given fields: dagId

func (*MockStore) GetDagInstance

func (_m *MockStore) GetDagInstance(dagInsId string) (*entity.DagInstance, error)

GetDagInstance provides a mock function with given fields: dagInsId

func (*MockStore) GetTaskIns

func (_m *MockStore) GetTaskIns(taskIns string) (*entity.TaskInstance, error)

GetTaskIns provides a mock function with given fields: taskIns

func (*MockStore) ListDagInstance

func (_m *MockStore) ListDagInstance(input *ListDagInstanceInput) ([]*entity.DagInstance, error)

ListDagInstance provides a mock function with given fields: input

func (*MockStore) ListTaskInstance

func (_m *MockStore) ListTaskInstance(input *ListTaskInstanceInput) ([]*entity.TaskInstance, error)

ListTaskInstance provides a mock function with given fields: input

func (*MockStore) Marshal

func (_m *MockStore) Marshal(obj interface{}) ([]byte, error)

func (*MockStore) PatchDagIns

func (_m *MockStore) PatchDagIns(dagIns *entity.DagInstance, mustsPatchFields ...string) error

PatchDagIns provides a mock function with given fields: dagIns, mustsPatchFields

func (*MockStore) PatchTaskIns

func (_m *MockStore) PatchTaskIns(taskIns *entity.TaskInstance) error

PatchTaskIns provides a mock function with given fields: taskIns

func (*MockStore) Unmarshal

func (_m *MockStore) Unmarshal(bytes []byte, ptr interface{}) error

func (*MockStore) UpdateDag

func (_m *MockStore) UpdateDag(dag *entity.Dag) error

UpdateDag provides a mock function with given fields: dag

func (*MockStore) UpdateDagIns

func (_m *MockStore) UpdateDagIns(dagIns *entity.DagInstance) error

UpdateDagIns provides a mock function with given fields: dagIns

func (*MockStore) UpdateTaskIns

func (_m *MockStore) UpdateTaskIns(taskIns *entity.TaskInstance) error

UpdateTaskIns provides a mock function with given fields: taskIns

type MockTaskInfoGetter

type MockTaskInfoGetter struct {
	ID     string
	Depend []string
	Status entity.TaskInstanceStatus
}

func (*MockTaskInfoGetter) GetDepend

func (_m *MockTaskInfoGetter) GetDepend() []string

GetDepend provides a mock function with given fields:

func (*MockTaskInfoGetter) GetGraphID

func (_m *MockTaskInfoGetter) GetGraphID() string

GetGraphID provides a mock function with given fields:

func (*MockTaskInfoGetter) GetID

func (_m *MockTaskInfoGetter) GetID() string

GetID provides a mock function with given fields:

func (*MockTaskInfoGetter) GetStatus

func (_m *MockTaskInfoGetter) GetStatus() entity.TaskInstanceStatus

GetStatus provides a mock function with given fields:

type Parser

type Parser interface {
	InitialDagIns(dagIns *entity.DagInstance)
	EntryTaskIns(taskIns *entity.TaskInstance)
}

Parser is used to execute commands, initialize dag instances, and push task instances.

func GetParser

func GetParser() Parser

GetParser

type Store

type Store interface {
	Closer
	CreateDag(dag *entity.Dag) error
	CreateDagIns(dagIns *entity.DagInstance) error
	BatchCreatTaskIns(taskIns []*entity.TaskInstance) error
	PatchTaskIns(taskIns *entity.TaskInstance) error
	PatchDagIns(dagIns *entity.DagInstance, mustsPatchFields ...string) error
	UpdateDag(dagIns *entity.Dag) error
	UpdateDagIns(dagIns *entity.DagInstance) error
	UpdateTaskIns(taskIns *entity.TaskInstance) error
	BatchUpdateDagIns(dagIns []*entity.DagInstance) error
	BatchUpdateTaskIns(taskIns []*entity.TaskInstance) error
	GetTaskIns(taskIns string) (*entity.TaskInstance, error)
	GetDag(dagId string) (*entity.Dag, error)
	GetDagInstance(dagInsId string) (*entity.DagInstance, error)
	ListDagInstance(input *ListDagInstanceInput) ([]*entity.DagInstance, error)
	ListTaskInstance(input *ListTaskInstanceInput) ([]*entity.TaskInstance, error)
	Marshal(obj interface{}) ([]byte, error)
	Unmarshal(bytes []byte, ptr interface{}) error
}

Store used to persist obj

func GetStore

func GetStore() Store

GetStore

type TaskInfoGetter

type TaskInfoGetter interface {
	GetDepend() []string
	GetID() string
	GetGraphID() string
	GetStatus() entity.TaskInstanceStatus
}

TaskInfoGetter

func MapMockTasksToGetter

func MapMockTasksToGetter(taskIns []*MockTaskInfoGetter) (ret []TaskInfoGetter)

MapMockTasksToGetter

func MapTaskInsToGetter

func MapTaskInsToGetter(taskIns []*entity.TaskInstance) (ret []TaskInfoGetter)

MapTaskInsToGetter

func MapTasksToGetter

func MapTasksToGetter(taskIns []entity.Task) (ret []TaskInfoGetter)

MapTasksToGetter

type TaskNode

type TaskNode struct {
	TaskInsID string
	Status    entity.TaskInstanceStatus
	// contains filtered or unexported fields
}

TaskNode

func BuildRootNode

func BuildRootNode(tasks []TaskInfoGetter) (*TaskNode, error)

BuildRootNode

func MustBuildRootNode

func MustBuildRootNode(tasks []TaskInfoGetter) *TaskNode

MustBuildRootNode

func NewTaskNodeFromGetter

func NewTaskNodeFromGetter(instance TaskInfoGetter) *TaskNode

NewTaskNodeFromGetter

func (*TaskNode) AppendChild

func (t *TaskNode) AppendChild(task *TaskNode)

AppendChild

func (*TaskNode) AppendParent

func (t *TaskNode) AppendParent(task *TaskNode)

AppendParent

func (*TaskNode) CanBeExecuted

func (t *TaskNode) CanBeExecuted() bool

CanBeExecuted check whether task could be executed

func (*TaskNode) CanExecuteChild

func (t *TaskNode) CanExecuteChild() bool

CanExecuteChild

func (*TaskNode) ComputeStatus

func (t *TaskNode) ComputeStatus() (status TreeStatus, srcTaskInsId string)

ComputeStatus

func (*TaskNode) Executable

func (t *TaskNode) Executable() bool

Executable

func (*TaskNode) GetExecutableTaskIds

func (t *TaskNode) GetExecutableTaskIds() (executables []string)

GetExecutableTaskIds returns the unique IDs of the tasks that are executable.

func (*TaskNode) GetNextTaskIds

func (t *TaskNode) GetNextTaskIds(completedOrRetryTask *entity.TaskInstance) (executable []string, find bool)

GetNextTaskIds

func (*TaskNode) HasCycle

func (t *TaskNode) HasCycle() (cycleStart *TaskNode)

HasCycle

type TaskTree

type TaskTree struct {
	DagIns *entity.DagInstance
	Root   *TaskNode
}

TaskTree

type TreeStatus

type TreeStatus string
const (
	TreeStatusRunning TreeStatus = "running"
	TreeStatusSuccess TreeStatus = "success"
	TreeStatusFailed  TreeStatus = "failed"
	TreeStatusBlocked TreeStatus = "blocked"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL