pipeline

package
v1.5.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 15, 2024 License: GPL-3.0 Imports: 8 Imported by: 0

README

流水线(Pipeline)

流水线是用来定义流程并执行相关操作的抽象。下图给出了流水线的一个简单抽象。

image

我们定义的流水线包含以下几种概念:

  1. 节点:封装某一种或者某一类操作,这个操作由使用流水线包的项目按照自己的需求自行实现, 我们也会逐渐添加一些预定义的节点供大家使用;
  2. 流程:节点的容器,规定了其内节点的执行顺序,预定义的流程包括顺序(seq)流程, 条件(if)流程,循环(loop)流程,遍历(range)流程,交换参数(bi)流程,也可按照需要自定义流程;
  3. 流水线:由流程和节点组成,其根流程本质上就是一个顺序流程, 其下可以包含各种其他流程或者节点。

结构如下图所示,整体采用了组合模式

image

定义流水线

定义流水线需要不断交错进行两个步骤,节点或流程开发和流水线编排。

节点或流程开发

代码定义了Component接口,从上面的类图中可以看到,无论是流程还是节点,本质上就是一个Component,需要定义自己的流程或者节点,仅需要实现Component接口

// "git.sxidc.com/go-tools/utils/pipeline/component"

type Component interface {
	GetType() string // 返回组件类型,如内部预定义的顺序流程的,返回seq
	GetName() string // 返回组件实例的名称,这个名称是在流水线编排中传递进来的
	Run(globalRunParams *GlobalRunParams, dynamicParams map[string]any) (any, error) // 执行逻辑,由于流水线使用了Go的协程,这里返回一个RunToken,需要Wait在RunToken上,以获取结果
}

为了简化开发,我们提供了一个Component的基类,定义一个自己的Component可以用下面更简单的方法

type FooComponent struct {
	component.BaseComponent
}

func (f *FooComponent) Run(globalRunParams *component.GlobalRunParams, dynamicParams map[string]any) (any, error) {
	return f.OnRun(globalRunParams, dynamicParams, 
		func(globalRunParams *component.GlobalRunParams, dynamicParams map[string]any) (any, error) { 
			// 完成运行逻辑
			// globalRunParams全局运行参数,运行流水线章节详细说明 
			// dynamicParams动态运行参数,运行流水线章节详细说明 
			// 返回值是本组件运行后的结果及出错信息
	})
}

组件的创建使用了Builder模式,需要定义一个实现了Builder接口的组件构建器,Builder接口如下所示:

// "git.sxidc.com/go-tools/utils/pipeline/component"

type Builder interface {
	ProductType() string // 返回制品类型,应当与上面实现的Component的GetType方法返回值一致
	Build(name string, buildParams map[string]any, runParams map[string]any) (Component, error) // 构建过程
}

但是我们定义的流程和节点还需要进行注册才能使用,注册需要调用如下函数:

// "git.sxidc.com/go-tools/utils/pipeline/component"

func RegisterComponentBuilders(builders ...Builder) error

如果要动态注销,需要调用下面的接口:

// "git.sxidc.com/go-tools/utils/pipeline/component"

func UnRegisterComponents(typeNames []string)

如果此时想单独测试自己编写的组件,可以使用下面的函数创建组件

// "git.sxidc.com/go-tools/utils/pipeline/component"

func BuildComponent(typeName string, name string, buildParams map[string]any, runParams map[string]any) (Component, error)
流水线编排

有两种编排流水线的方式:代码通过构造Definition结构编排流水线和通过YAML编排

通过Definition编排

Definition结构定义如下:

// "git.sxidc.com/go-tools/pipeline/utils"

type Definition struct {
	Name       string                `yaml:"name"` // 流水线名称
	Components []ComponentDefinition `yaml:"components"` // 组件定义
}

type ComponentDefinition struct {
    Type        string         `yaml:"type"` // 组件类型,确保是注册过的组件
    Name        string         `yaml:"name"` // 组件名称
    BuildParams map[string]any `yaml:"build_params"` // 组件构建参数,不同的组件需要的构建参数不同
    RunParams   map[string]any `yaml:"run_params"` // 构建时指定的静态运行参数
}

使用Definition需要填充对应的属性,然后运行Definition的方法即可创建流水线:

// "git.sxidc.com/go-tools/pipeline/utils"

func (def *Definition) NewPipeline() (*Pipeline, error)
通过YAML编排

通过yaml文件编排本质上还是通过Definition生成,编写的YAML文件格式从上面的Definition的tag中就能看到,下面给出一个测试中用到的YAML定义:

name: test
components:
  - type: seq
    name: seq-flow
    build_params:
      components:
        - type: println
          name: seq-flow-node
          run_params:
            content: seq-flow-node
  - type: println
    name: pipeline-node
    run_params:
      content: pipeline-node
  - type: if
    name: if-flow
    build_params:
      condition:
        type: seq
        name: condition_flow
        build_params:
          components:
            - type: bool
              name: bool
              run_params:
                op: pass
                value: true
      condition_true:
        type: seq
        name: true-flow
        build_params:
          components:
            - type: println
              name: if-node-true
              run_params:
                content: if-node-true
      condition_false:
        type: println
        name: if-node-false
        run_params:
          content: if-node-false
  - type: loop
    name: loop-flow
    build_params:
      condition:
        type: bool
        name: condition-node
        run_params:
          op: rand
      sub:
        type: seq
        name: sub-flow
        build_params:
          components:
            - type: println
              name: loop-sub-node
              run_params:
                content: loop-sub-node
  - type: range
    name: range-flow
    build_params:
      values:
        - range first
        - range second
      sub:
        type: println
        name: range-sub-node
        run_params:
          content: range-sub-node
  - type: bi
    name: bi-flow
    build_params:
      components:
        - type: println
          name: bi-flow-node
          run_params:
            content: bi-flow-node
    run_params:
      is_bi: true
      left_params: left
      right_params: right

后面章节会给出几种内置流程的构建参数和运行参数,可以对照查看上面的编排。YAML编排可以将内容写入文件,也可以是内存中保存的,有两个函数可以使用YAML编排创建流水线:

// "git.sxidc.com/go-tools/pipeline/utils"

// 通过YAML定义文件创建流水线
func NewPipelineFromYaml(yamlPath string) (*Pipeline, error)

// 通过YAML字符串创建流水线
func NewPipelineFromYamlStr(yamlStr string) (*Pipeline, error)

运行流水线

运行流水线,这里只需要调用Pipeline的方法即可

// "git.sxidc.com/go-tools/pipeline/utils"

func (p *Pipeline) Run(globalRunParams map[string]any, dynamicParams map[string]any) *component.RunToken

下面是一段样例代码

	token := p.Run(globalRunParams, dynamicParams)
	if token.Wait(); token.Err != nil {
		t.Fatal(token.Err)
	}

	fmt.Println(token.Result)

这里可以看到有两个参数,分别是全局运行参数和动态运行参数,这里对运行流水线的运行参数进行说明,运行流水线的参数有三种:

  1. 全局运行参数:该运行参数会在流水线中的所有流程和节点之间传递,也可以在运行时利用该参数传递组件之间共享的参数;
  2. 构建时静态运行参数:构建一个组件(流程或者节点)的时候,我们可以传递静态运行参数,该参数会在组件运行过程中被使用;
  3. 动态运行参数:用来在运行时指定的运行参数,如果同时指定了静态运行参数,则会覆盖静态运行参数

动态运行时参数有两种构造方式:一种是使用map直接构造,另一种是利用YAML构造,但不管使用哪种方法,动态运行时参数都是通过组件的名称进行查找的,如下是对应上面YAML编排示例的动态运行参数构造:

seq-flow:
  seq-flow-node:
    content: "!!!!seq-flow-node!!!!"
pipeline-node:
  content: "!!!!pipeline-node!!!!"
if-flow:
  condition_flow:
    bool:
      op: pass
      value: true
  true-flow:
    if-node-true:
      content: "!!!!if-node-true!!!!"
  if-node-false:
    content: "!!!!if-node-false!!!!"
loop-flow:
  sub-flow:
    loop-sub-node:
      content: "!!!!loop-sub-node!!!!"
range-flow:
  range-sub-node:
    content: "!!!!range-sub-node!!!!"
bi-flow:
  is_bi: true
  left_params: "!!!!left!!!!"
  right_params: "!!!!right!!!!"
  bi-flow-node:
    content: "!!!!bi-flow-node!!!!"

附:自定义的几种流程的构建参数和运行参数

顺序流程

流程类型:seq

构建参数:

components: # 包含的子组件定义数组
  - type: 任何的流程或者节点类型
    name: 该组件在流水线中的名称
    build_params: # 该类型组件的构建参数
      ...
    run_params: # 该类型组件的静态运行时参数
      ...

运行参数:无

条件流程

流程类型:if

构建参数:

condition: # 条件判断组件,返回结果必须为bool
  type: 任何的流程或者节点类型
  name: 该组件在流水线中的名称
  build_params: # 该类型组件的构建参数
    ...
  run_params: # 该类型组件的静态运行时参数
    ...
condition_true: # 条件为真时运行的组件
  type: 任何的流程或者节点类型
  name: 该组件在流水线中的名称
  build_params: # 该类型组件的构建参数
    ...
  run_params: # 该类型组件的静态运行时参数
    ...
condition_false: # 条件为假时运行的组件
  type: 任何的流程或者节点类型
  name: 该组件在流水线中的名称
  build_params: # 该类型组件的构建参数
    ...
  run_params: # 该类型组件的静态运行时参数
    ...

运行参数:无

循环流程

流程类型:loop

构建参数:

condition: # 条件判断组件,返回结果必须为bool
  type: 任何的流程或者节点类型
  name: 该组件在流水线中的名称
  build_params: # 该类型组件的构建参数
    ...
  run_params: # 该类型组件的静态运行时参数
    ...
sub: # 循环体组件
  type: 任何的流程或者节点类型
  name: 该组件在流水线中的名称
  build_params: # 该类型组件的构建参数
    ...
  run_params: # 该类型组件的静态运行时参数
    ...

运行参数:无

遍历流程

流程类型:range

构建参数:

values: # 需要遍历的值的数组
  - 遍历值
sub:
  type: 任何的流程或者节点类型
  name: 该组件在流水线中的名称
  build_params: # 该类型组件的构建参数
    ...
  run_params: # 该类型组件的静态运行时参数
    ...

运行参数:无

** 该流程会在子组件的动态运行参数中植入一个键为range_once_value的参数值,该值给出了当前range到的值,另外,还会植入一个键为range_index的参数值(int类型),给出遍历到的Index。

交换流程

流程类型:bi

构建参数:

components:
  - type: 任何的流程或者节点类型
    name: 该组件在流水线中的名称
    build_params: # 该类型组件的构建参数
      ...
    run_params: # 该类型组件的静态运行时参数
      ...

运行参数:

is_bi: 是否双向运行(交换参数)
left_params: 左参数,对应Go的any类型
right_params: 右参数,对应Go的any类型

** 该流程会在每轮运行时在动态运行参数中植入两个参数bi_left和bi_right,分别代表本轮执行的左参数和右参数

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func LoadDynamicParamsFromYaml

func LoadDynamicParamsFromYaml(yamlPath string) (map[string]any, error)

func LoadDynamicParamsFromYamlStr

func LoadDynamicParamsFromYamlStr(yamlStr string) (map[string]any, error)

Types

type ComponentDefinition

type ComponentDefinition struct {
	Type        string         `yaml:"type"`
	Name        string         `yaml:"name"`
	BuildParams map[string]any `yaml:"build_params"`
	RunParams   map[string]any `yaml:"run_params"`
}

func (*ComponentDefinition) Check

func (def *ComponentDefinition) Check() error

type Definition

type Definition struct {
	Name       string                `yaml:"name"`
	Components []ComponentDefinition `yaml:"components"`
}

func (*Definition) Check

func (def *Definition) Check() error

func (*Definition) NewPipeline

func (def *Definition) NewPipeline() (*Pipeline, error)

type Pipeline

type Pipeline struct {
	Flow *flow.Seq
}

func NewPipelineFromYaml

func NewPipelineFromYaml(yamlPath string) (*Pipeline, error)

func NewPipelineFromYamlStr

func NewPipelineFromYamlStr(yamlStr string) (*Pipeline, error)

func (*Pipeline) Run

func (p *Pipeline) Run(globalRunParams map[string]any, dynamicParams map[string]any) *RunToken

type RunToken

type RunToken struct {
	Result any
	Err    error
	// contains filtered or unexported fields
}

func NewRunToken

func NewRunToken() *RunToken

func (*RunToken) Done

func (token *RunToken) Done()

func (*RunToken) Wait

func (token *RunToken) Wait()

Directories

Path Synopsis
test

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL