wfengine

package
v1.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 19, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Description : workflow engine v3.1
Author      : dayunzhangyunfeng@didiglobal.com
Date        : 2021-05-14

*

Description : workflow v3.0 with branch
Author		: dayunzhangyunfeng@didiglobal.com
Date		: 2021-05-14

Index

Constants

View Source
// YES and NO are the string flag values "Y" and "N".
// NOTE(review): the NE comment on this page describes y/n results, so these
// are presumably what the Condition comparisons return — confirm in the source.
const (
	YES = "Y"
	NO  = "N"
)
View Source
// String prefixes for scene-module, workflow and version entries.
// NOTE(review): going by the names these are Redis key prefixes, and the
// trailing "_app_" suggests an application ID is appended — confirm against
// the code that builds the keys.
const (
	RedisKeySceneModule = "scene_module_app_"
	RedisKeyWorkflow    = "workflow_app_"
	RedisKeyVersion     = "version_app_"
)
View Source
// Action type names and branch-key helper strings.
// NOTE(review): the ActionType* values presumably are the legal contents of
// Action.ActionType (JSON key "action_type"); BranchKeyJoiner is presumably
// the separator used when composing branch keys — confirm both in the source.
const (
	ActionTypeTask    = "task"
	ActionTypeCond    = "condition"
	ActionTypeFlow    = "flow"
	ActionTypeTimeout = "timeout"
	BranchKeyDefault  = "default"
	BranchKeyJoiner   = "_"
)
View Source
// ErrNoUnknown is the untyped numeric constant 1000.
// NOTE(review): presumably the error number reported for unclassified
// failures — where it is used is not visible on this page; confirm.
const (
	ErrNoUnknown = 1000
)

Variables

This section is empty.

Functions

func GetLatestVersionFromFile

func GetLatestVersionFromFile(path string) (string, error)

func LoadSceneModuleMapFromFile

func LoadSceneModuleMapFromFile(path string) (map[int64]*SceneModule, error)

func LoadWorkflowFromApollo

func LoadWorkflowFromApollo(configParams map[string]string) (map[int64]*Workflow, error)

func LoadWorkflowFromFile

func LoadWorkflowFromFile(dirPath string, smMap map[int64]*SceneModule) (map[int64]*Workflow, error)

Types

type Action

// Action describes a single action entry of a workflow. WorkflowChart and
// WorkflowBranch both hold actions in a map[string]*Action.
// Each field carries a json tag naming its serialized key.
type Action struct {
	// NOTE(review): presumably one of the ActionType* constants
	// ("task", "condition", "flow", "timeout") — confirm.
	ActionType     string   `json:"action_type"`
	ActionId       string   `json:"action_id"`
	ActionName     string   `json:"action_name"`
	Params         []*Param `json:"params"`
	// NOTE(review): the Next*/Prev* slices presumably reference other actions
	// by ActionId, with NextConditions paired to NextActionIds — confirm.
	NextActionIds  []string `json:"next_action_ids"`
	NextConditions []string `json:"next_conditions"`
	PrevActionIds  []string `json:"prev_action_ids"`
	// NOTE(review): the time unit of Timeout is not visible here — confirm.
	Timeout        int64    `json:"timeout"`
	TimeoutAsync   bool     `json:"timeout_async"`
	TimeoutDynamic bool     `json:"timeout_dynamic"`
	// NOTE(review): presumably the Workflow.Id referenced when ActionType is
	// "flow" — confirm.
	RefWorkflowId  int64    `json:"ref_workflow_id"`
	Description    string   `json:"description"`
}

func (*Action) Detach

func (a *Action) Detach(prevAction *Action)

type CondExecutors

// CondExecutors bundles a *reflect.Value and a *sync.Map. It is obtained via
// GetCondExecutors and exposes Execute and RegisterCondExecutor.
// NOTE(review): presumably InnerExecutor wraps the built-in Condition value
// and OuterExecutors stores executors added through RegisterCondExecutor,
// keyed by condition name — confirm in the source.
type CondExecutors struct {
	InnerExecutor  *reflect.Value
	OuterExecutors *sync.Map
}

func GetCondExecutors

func GetCondExecutors() *CondExecutors

func (CondExecutors) Execute

func (c CondExecutors) Execute(actionName string, paramValues []interface{}) (string, error)

func (CondExecutors) RegisterCondExecutor

func (c CondExecutors) RegisterCondExecutor(conditionName string, cdt interface{})

type Condition

// Condition is an empty struct that serves as the receiver for the
// comparison methods EQ, NE, GT, GE, LT, LE and SW.
type Condition struct{}

func (*Condition) EQ

func (c *Condition) EQ(itra, itrb interface{}) (string, error)

*

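EQ compares two values for equality. The original comment states that it returns 1 when they are equal and 0 otherwise; note, however, that the method returns a string and the NE comment describes y/n results, so confirm the actual return values against the source.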

func (*Condition) GE

func (c *Condition) GE(itra, itrb interface{}) (string, error)

func (*Condition) GT

func (c *Condition) GT(itra, itrb interface{}) (string, error)

func (*Condition) LE

func (c *Condition) LE(itra, itrb interface{}) (string, error)

func (*Condition) LT

func (c *Condition) LT(itra, itrb interface{}) (string, error)

func (*Condition) NE

func (c *Condition) NE(itra, itrb interface{}) (string, error)

*

NE compares two values for inequality: it returns y when they are not equal and n otherwise.

func (*Condition) SW

func (c *Condition) SW(itra interface{}) (string, error)

type FlowSelector

// FlowSelector chooses a workflow for a scene module.
type FlowSelector interface {
	// SelectWorkflowId takes the strategy context and the scene module and
	// returns an int64 workflow ID, a string and an error.
	// NOTE(review): the string result is presumably the selected group or
	// bucket name — confirm against the implementations.
	SelectWorkflowId(sc *model.StrategyContext, sceneModule *SceneModule) (int64, string, error)
}

There are three flow selectors:

1: random
2: custom
3: apollo (apollo platform in didi)

type GroupSelector

// GroupSelector embeds the FlowSelector interface and declares its own
// SelectWorkflowId method on *GroupSelector.
// NOTE(review): the embedded FlowSelector is nil unless set by the caller;
// whether anything relies on it is not visible here — confirm.
type GroupSelector struct {
	FlowSelector
}

func (*GroupSelector) SelectWorkflowId

func (a *GroupSelector) SelectWorkflowId(sc *model.StrategyContext, sceneModule *SceneModule) (int64, string, error)

Apollo-based traffic splitting: if an error occurs, the default bucket is used and the error information is returned as well.

type IModelBase

// IModelBase is the interface of the objects produced by
// ModuleObjBase.NewObj.
type IModelBase interface {
	// DoAction receives a context and the strategy context and returns an
	// untyped result.
	DoAction(context.Context, *model.StrategyContext) interface{}
	// OnTimeout receives the same arguments as DoAction and returns nothing.
	// NOTE(review): presumably invoked when an action exceeds its timeout —
	// confirm in the engine code.
	OnTimeout(context.Context, *model.StrategyContext)
	// SetName and GetName set and return a name string.
	SetName(string)
	GetName() string
}

type ModelBase

// ModelBase embeds IModelBase and adds a Name field. DoAction, OnTimeout,
// SetName and GetName are declared on *ModelBase.
// NOTE(review): SetName/GetName presumably write and read Name — confirm.
type ModelBase struct {
	IModelBase
	Name string
}

func (*ModelBase) DoAction

func (m *ModelBase) DoAction(context.Context, *model.StrategyContext) interface{}

func (*ModelBase) GetName

func (m *ModelBase) GetName() string

func (*ModelBase) OnTimeout

func (m *ModelBase) OnTimeout(context.Context, *model.StrategyContext)

func (*ModelBase) SetName

func (m *ModelBase) SetName(name string)

type ModuleObjBase

// ModuleObjBase is the factory interface passed to the NewWorkflowEngine*
// constructors.
type ModuleObjBase interface {
	// NewObj returns an IModelBase for the given module name.
	NewObj(moduleName string) IModelBase
}

type Param

// Param is a name/value/type triple of strings; Action.Params holds a slice
// of *Param.
// NOTE(review): the set of accepted Type strings is not visible here —
// confirm in the source.
type Param struct {
	Name  string `json:"name"`
	Value string `json:"value"`
	Type  string `json:"type"`
}

type RandomSelector

// RandomSelector embeds the FlowSelector interface and declares its own
// SelectWorkflowId method on *RandomSelector.
// NOTE(review): the embedded FlowSelector is nil unless set by the caller;
// whether anything relies on it is not visible here — confirm.
type RandomSelector struct {
	FlowSelector
}

func (*RandomSelector) SelectWorkflowId

func (r *RandomSelector) SelectWorkflowId(sc *model.StrategyContext, sceneModule *SceneModule) (int64, string, error)

Online random traffic splitting.

type SceneModule

// SceneModule describes one scene module. The loader and constructor
// functions pass these around as map[int64]*SceneModule.
// Each field carries a json tag naming its serialized key.
type SceneModule struct {
	Id                     int64            `json:"id"`
	Name                   string           `json:"name"`
	AppId                  int64            `json:"appid"`
	BucketType             int              `json:"bucket_type"`
	// NOTE(review): presumably maps a slot number to a workflow ID — confirm.
	SlotMap                map[int]int64    `json:"slots"`
	UpdateTime             time.Time        `json:"update_time"`
	// NOTE(review): presumably the key into WorkflowEngine.FlowSelectors
	// (a map[int]FlowSelector) — confirm.
	FlowType               int              `json:"flow_type"`
	// NOTE(review): presumably maps a group name to a workflow ID and backs
	// GetWorkflowId(groupName) — confirm.
	GroupWorkflowMap       map[string]int64 `json:"group_workflows"`
	DefaultWorkflowId      int64            `json:"default_workflow_id"`
	DispatchExperimentName string           `json:"dispatch_experiment_name"`
}

func (*SceneModule) GetWorkflowId

func (this *SceneModule) GetWorkflowId(groupName string) (int64, error)

type TimeWaiter

// TimeWaiter exposes no exported fields. It is created with NewTimeWaiter
// and used through its AddTimeout, Done and Wait methods.
// NOTE(review): the time unit of the int64 timeouts and the meaning of
// Wait's bool result are not visible here — confirm in the source.
type TimeWaiter struct {
	// contains filtered or unexported fields
}

func NewTimeWaiter

func NewTimeWaiter(timeOut int64) *TimeWaiter

func (*TimeWaiter) AddTimeout

func (t *TimeWaiter) AddTimeout(timeout int64)

func (*TimeWaiter) Done

func (t *TimeWaiter) Done()

func (*TimeWaiter) Wait

func (t *TimeWaiter) Wait() bool

type Workflow

// Workflow describes one workflow. The loader and constructor functions pass
// these around as map[int64]*Workflow.
// Each field carries a json tag naming its serialized key.
type Workflow struct {
	Id          int64           `json:"id"`
	DimensionId int64           `json:"dimension_id"`
	SceneId     int64           `json:"scene_id"`
	// NOTE(review): FlowChart is a string while FlowCharts is a
	// *WorkflowChart; presumably the latter is parsed from the former via
	// NewWorkflowChart — confirm.
	FlowChart   string          `json:"flow_chart"`
	FlowCharts  *WorkflowChart  `json:"flow_charts"`
	FlowBranch  *WorkflowBranch `json:"flow_branch"`
	// NOTE(review): an int rather than a bool; presumably 0/1 — confirm.
	IsDefault   int             `json:"is_default"`
	Range1      string          `json:"range1"`
	Range2      string          `json:"range2"`
	Remark      string          `json:"remark"`
	UpdateTime  time.Time       `json:"update_time"`
	GroupName   string          `json:"group_name"`
}

func (*Workflow) GetWorkflowChart

func (w *Workflow) GetWorkflowChart() *WorkflowChart

type WorkflowBranch

// WorkflowBranch holds branch data for a workflow (see Workflow.FlowBranch):
// a list of branch strings, a string-to-int map and a map of actions.
// NOTE(review): presumably SortedBranch lists branch keys in order,
// CurrentBranch maps a branch key to a position, and ActionMap is keyed by
// Action.ActionId — confirm all three in the source.
type WorkflowBranch struct {
	SortedBranch  []string           `json:"sorted_branch"`
	CurrentBranch map[string]int     `json:"current_branch"`
	ActionMap     map[string]*Action `json:"actions"`
}

type WorkflowChart

// WorkflowChart holds the first and last action IDs, a condition flag and a
// map of actions. It is built from a string by NewWorkflowChart.
// NOTE(review): ActionMap is presumably keyed by Action.ActionId — confirm.
type WorkflowChart struct {
	FirstActionId string             `json:"first_action_id"`
	LastActionId  string             `json:"last_action_id"`
	// NOTE(review): the field is spelled "HashCondition" but its JSON key is
	// "has_condition" — likely a typo for HasCondition. Renaming the exported
	// field would break callers, so it is only flagged here.
	HashCondition bool               `json:"has_condition"`
	ActionMap     map[string]*Action `json:"actions"`
}

func NewWorkflowChart

func NewWorkflowChart(flowChartStr string) (*WorkflowChart, error)

func (*WorkflowChart) CreateWaitMap

func (w *WorkflowChart) CreateWaitMap() (map[string]*sync.WaitGroup, map[string]*TimeWaiter)

type WorkflowEngine

// WorkflowEngine is the engine type returned by the NewWorkflowEngine*
// constructors. Apart from FlowSelectors its fields are unexported.
type WorkflowEngine struct {
	// FlowSelectors maps an int key to a FlowSelector.
	// NOTE(review): the key presumably corresponds to SceneModule.FlowType —
	// confirm.
	FlowSelectors map[int]FlowSelector `json:"flow_selectors"`
	// contains filtered or unexported fields
}

func NewWorkflowEngine

func NewWorkflowEngine(sceneModuleMap map[int64]*SceneModule, workflowMap map[int64]*Workflow, version string, moduleObj ModuleObjBase) (*WorkflowEngine, error)

func NewWorkflowEngineFromConfig

func NewWorkflowEngineFromConfig(moduleObj ModuleObjBase, configParams map[string]string) (*WorkflowEngine, error)

func NewWorkflowEngineFromFile

func NewWorkflowEngineFromFile(moduleObj ModuleObjBase, configPath string) (*WorkflowEngine, error)

func NewWorkflowEngineFromKV

func NewWorkflowEngineFromKV(moduleObj ModuleObjBase, sceneModuleMapString, workflowMapStr, version string) (*WorkflowEngine, error)

func (*WorkflowEngine) GetVersion

func (w *WorkflowEngine) GetVersion() string

func (*WorkflowEngine) GetWorkflow

func (w *WorkflowEngine) GetWorkflow(workflowId int64) (*Workflow, bool)

func (*WorkflowEngine) RegisterCondExecutor

func (w *WorkflowEngine) RegisterCondExecutor(conditionName string, executor interface{})

func (*WorkflowEngine) Run

func (*WorkflowEngine) SelectWorkflow

func (w *WorkflowEngine) SelectWorkflow(ctx context.Context, sc *model.StrategyContext) (*Workflow, error)

func (*WorkflowEngine) SetCustomFlowSelector

func (w *WorkflowEngine) SetCustomFlowSelector(flowSelector FlowSelector)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL