libdag

package module
v0.0.0-...-1bc2417 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 25, 2019 License: Apache-2.0 Imports: 7 Imported by: 0

README

libdag

一个简单的dag实现库。

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Contains

func Contains(src []string, target string) bool

Types

type CONTROL_CODE

type CONTROL_CODE int
const (
	EXECUTION_CONTINUE CONTROL_CODE = iota
	EXECUTION_SKIP_FOLLOWER_TASK
	EXECUTION_STOP_JOB
)

type ConfigManager

type ConfigManager interface {
	GetConfig(configKey string) (*DagConfig, error)
}

* 配置管理器

type DagConfig

type DagConfig struct {
	Name   string                 `yaml:"name"`
	Input  []string               `yaml:"input"`
	Output []string               `yaml:"output"`
	Nodes  map[string]*NodeConfig `yaml:"nodes"`
}

func ParseConfig

func ParseConfig(configData []byte) (*DagConfig, error)

type DagGraph

type DagGraph struct {
	Nodes map[string]*DagGraphNode
}

邻接表

func NewGraph

func NewGraph() *DagGraph

func ParseGraph

func ParseGraph(dagConf *DagConfig) (*DagGraph, error)

func (*DagGraph) AddEdge

func (this *DagGraph) AddEdge(from string, to string)

建立边

func (*DagGraph) AddNode

func (this *DagGraph) AddNode(name string)

增加节点

func (*DagGraph) Toposort

func (this *DagGraph) Toposort() ([]*DagGraphNode, error)

拓扑排序

type DagGraphNode

type DagGraphNode struct {
	Name     string
	Indegree int        //入度
	Outputs  *list.List //输出节点
}

type DagNodeHandler

type DagNodeHandler interface {
	Init(parentCtx context.Context, params map[string]interface{}) error //初始化方法
	Process(parentCtx context.Context, input map[string]interface{}) (map[string]interface{}, error)
}

* dag节点处理器

type HandlerManager

type HandlerManager interface {
	CreateHandler(handlerName string) (DagNodeHandler, error)
}

* dag节点管理器

type Job

type Job struct {
	Key  string
	Name string
	// contains filtered or unexported fields
}

* 一个计算任务

func NewJob

func NewJob(jobKey string, jobContext *JobContext) (*Job, error)

func (*Job) ParseJobInputs

func (this *Job) ParseJobInputs(input map[string]interface{}) error

func (*Job) ParseJobOutputs

func (this *Job) ParseJobOutputs() (map[string]interface{}, error)

func (*Job) Run

func (this *Job) Run(parentCtx context.Context, input map[string]interface{}) (output map[string]interface{}, err error)
    执行dag job。
	每个dag task节点都有一个waitGroup,用于监听前序节点执行状态

	|task| ----------> |task| ----> |task| ----> |task|
	            \               /
                 ----> |task| ----> |task|
                /
    |task| ----------> |task| ----> |task| ----> |task|

func (*Job) Schedule

func (this *Job) Schedule(parentCtx context.Context) error

type JobContext

type JobContext struct {
	sync.RWMutex
	Config          *DagConfig
	GlobalVals      *sync.Map
	HandlerRegistry HandlerManager
}

func NewJobContext

func NewJobContext(config *DagConfig, registry HandlerManager) *JobContext

func (*JobContext) GetVals

func (this *JobContext) GetVals(paramNames []string) (map[string]interface{}, error)

func (*JobContext) UpdateVals

func (this *JobContext) UpdateVals(params map[string]interface{})

type NodeConfig

type NodeConfig struct {
	Name      string                 `yaml:"name"`
	Labels    []string               `yaml:"labels"`
	Critical  bool                   `yaml:"critical"`
	Processor string                 `yaml:"processor"`
	Params    map[string]interface{} `yaml:"params"`
	Input     map[string]string      `yaml:"input"`
	Output    map[string]string      `yaml:"output"`
}

type Task

type Task struct {
	Key  string
	Name string
	// contains filtered or unexported fields
}

func NewTask

func NewTask(node *NodeConfig, job *Job) (*Task, error)

func (*Task) ParseTaskInputs

func (this *Task) ParseTaskInputs(context *JobContext) (map[string]interface{}, error)

func (*Task) ParseTaskOutputs

func (this *Task) ParseTaskOutputs(output map[string]interface{}, context *JobContext) error

func (*Task) RealRun

func (this *Task) RealRun(parentCtx context.Context, jobContext *JobContext)

* 执行任务

func (*Task) Run

func (this *Task) Run(parentCtx context.Context, jobContext *JobContext, callback func())

func (*Task) WaitPre

func (this *Task) WaitPre()

* 等待前序节点完成

type YamlConfig

type YamlConfig struct {
	DagConfig *DagConfig `yaml:"dag_config"`
}

Directories

Path Synopsis
utils

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL