Documentation ¶
Index ¶
- func Contains(src []string, target string) bool
- type CONTROL_CODE
- type ConfigManager
- type DagConfig
- type DagGraph
- type DagGraphNode
- type DagNodeHandler
- type HandlerManager
- type Job
- func (this *Job) ParseJobInputs(input map[string]interface{}) error
- func (this *Job) ParseJobOutputs() (map[string]interface{}, error)
- func (this *Job) Run(parentCtx context.Context, input map[string]interface{}) (output map[string]interface{}, err error)
- func (this *Job) Schedule(parentCtx context.Context) error
- type JobContext
- type NodeConfig
- type Task
- func (this *Task) ParseTaskInputs(context *JobContext) (map[string]interface{}, error)
- func (this *Task) ParseTaskOutputs(output map[string]interface{}, context *JobContext) error
- func (this *Task) RealRun(parentCtx context.Context, jobContext *JobContext)
- func (this *Task) Run(parentCtx context.Context, jobContext *JobContext, callback func())
- func (this *Task) WaitPre()
- type YamlConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type CONTROL_CODE ¶
type CONTROL_CODE int
const (
	EXECUTION_CONTINUE CONTROL_CODE = iota
	EXECUTION_SKIP_FOLLOWER_TASK
	EXECUTION_STOP_JOB
)
type ConfigManager ¶
配置管理器
type DagConfig ¶
type DagConfig struct {
	Name   string                 `yaml:"name"`
	Input  []string               `yaml:"input"`
	Output []string               `yaml:"output"`
	Nodes  map[string]*NodeConfig `yaml:"nodes"`
}
func ParseConfig ¶
type DagGraphNode ¶
type DagNodeHandler ¶
type DagNodeHandler interface {
	Init(parentCtx context.Context, params map[string]interface{}) error // 初始化方法
	Process(parentCtx context.Context, input map[string]interface{}) (map[string]interface{}, error)
}
dag节点处理器
type HandlerManager ¶
type HandlerManager interface {
CreateHandler(handlerName string) (DagNodeHandler, error)
}
dag节点管理器
type Job ¶
一个计算任务
func (*Job) ParseJobInputs ¶
func (*Job) ParseJobOutputs ¶
func (*Job) Run ¶
func (this *Job) Run(parentCtx context.Context, input map[string]interface{}) (output map[string]interface{}, err error)
执行dag job。
每个dag task节点都有一个waitGroup，用于监听前序节点执行状态。

    |task| ----------> |task| ----> |task| ----> |task|
            \                             /
             ----> |task| ----> |task|
            /
    |task| ----------> |task| ----> |task| ----> |task|
type JobContext ¶
type JobContext struct {
	sync.RWMutex
	Config          *DagConfig
	GlobalVals      *sync.Map
	HandlerRegistry HandlerManager
}
func NewJobContext ¶
func NewJobContext(config *DagConfig, registry HandlerManager) *JobContext
func (*JobContext) GetVals ¶
func (this *JobContext) GetVals(paramNames []string) (map[string]interface{}, error)
func (*JobContext) UpdateVals ¶
func (this *JobContext) UpdateVals(params map[string]interface{})
type NodeConfig ¶
type Task ¶
func (*Task) ParseTaskInputs ¶
func (this *Task) ParseTaskInputs(context *JobContext) (map[string]interface{}, error)
func (*Task) ParseTaskOutputs ¶
func (this *Task) ParseTaskOutputs(output map[string]interface{}, context *JobContext) error
func (*Task) RealRun ¶
func (this *Task) RealRun(parentCtx context.Context, jobContext *JobContext)
执行任务
type YamlConfig ¶
type YamlConfig struct {
DagConfig *DagConfig `yaml:"dag_config"`
}
Click to show internal directories.
Click to hide internal directories.