light_flow

package module
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2024 License: MIT Imports: 12 Imported by: 0

README

light-flow

Introduction

light-flow is a task arrange framework.

Designed to provide the most efficient orchestration and execution strategy for tasks with dependencies.

Features

Efficient Task Planning: The framework allows you to define task dependencies, enabling the efficient planning and sequencing of steps based on their relationships

Context Connect And Isolation: Tasks can only access to the context of dependent tasks up to the root task. Modifications to the context by the current task will not affect disconnected tasks.

Rich Test Case: Test cases cover every public API and most scenarios.

Minimal Code Requirement: With this framework, you only need to write a minimal amount of code to define and execute your tasks.

Task Dependency Visualization:The framework provides a draw plugin to visualize the dependencies between tasks.

Installation

Install the framework by running the following command:

go get gitee.com/MetaphysicCoding/light-flow

Draw your flow

Step 1: Define Step Functions

Define the step functions that will be executed in the workflow.

Each step function should have a specific signature and return result and errors.

import (
	"fmt"
	flow "gitee.com/MetaphysicCoding/light-flow"
	"time"
)

func Step1(ctx *flow.Context) (any, error) {
    // If we have previously set the 'name' in input
    // then we can retrieve the 'name'
	name, exist := ctx.Get("name")
	if exist {
		fmt.Printf("step1 get 'name' from ctx: %s \n", name.(string))
	}
	ctx.Set("age", 18)
	return "finish", nil
}

func Step2(ctx *flow.Context) (any, error) {
    // If we have previously set the 'name'in the dependent steps, 
    // then we can retrieve the 'name'
	age, exist := ctx.Get("age")
	if exist {
		fmt.Printf("step2 get 'age' from ctx: %d \n", age.(int))
	}
	result, exist := ctx.GetStepResult("Step1")
	if exist {
		fmt.Printf("step2 get result from step1: %s \n", result.(string))
	}
	return nil, nil
}
Step 2: Register Work-Flow And Process

**After register, you could add current process in another work-flow. **

And you cloud merge another process into current process, usage see in advance usage

func init() {
	workflow := flow.RegisterFlow("MyFlow")
     // Processes of workflow are parallel
	process := workflow.AddProcess("MyProcess")
}
Step 3: Add Step And Define Dependencies
func init() {
	...
	process := workflow.AddProcess("MyProcess")
    // AddStep automatically uses "Step1" as step name
	process.AddStep(Step1)
    // AddStepWithAlias use alias "Step2" as the step name
    // Identify Step 1 as a dependency of Step 2
	process.AddStepWithAlias("Step2", Step2, Step1)
}
Step 4: Run Work-Flow
func main() {
    // Done flow run work-flow and block until all processes are completed.
    // If you want to run  asynchronously, you could use AsyncFlow in stead
    // AsyncFlow can stop and pause the workflow or process.
	features := flow.DoneFlow("MyFlow", map[string]any{"name": "foo"})
}
Step 5: Get Execute ResultI By Features
func main() {
	result := flow.DoneFlow("MyFlow", map[string]any{"name": "foo"})
	if result.Success() {
		return
	}
	for processName, feature := range result.Features() {
        // Exceptions may include Timeout、Panic、Error
		fmt.Printf("process [%s] failed exceptions: %v \n", processName, feature.Exceptions())
	}
}
Complete Example
package main

import (
	"fmt"
	flow "gitee.com/MetaphysicCoding/light-flow"
)

func Step1(ctx *flow.Context) (any, error) {
	name, exist := ctx.Get("name")
	if exist {
		fmt.Printf("step1 get 'name' from ctx: %s \n", name.(string))
	}
	ctx.Set("age", 18)
	return "finish", nil
}

func Step2(ctx *flow.Context) (any, error) {
	age, exist := ctx.Get("age")
	if exist {
		fmt.Printf("step2 get 'age' from ctx: %d \n", age.(int))
	}
	result, exist := ctx.GetStepResult("Step1")
	if exist {
		fmt.Printf("step2 get result from step1: %s \n", result.(string))
	}
	return nil, nil
}

func init() {
	workflow := flow.RegisterFlow("MyFlow")
	process := workflow.AddProcess("MyProcess")
	process.AddStep(Step1)
	process.AddStepWithAlias("Step2", Step2, Step1)
}

func main() {
	result := flow.DoneFlow("MyFlow", map[string]any{"name": "foo"})
	if result.Success() {
		return
	}
	for processName, feature := range result.Features() {
		fmt.Printf("process [%s] failed exceptions: %v \n", processName, feature.Exceptions())
	}
}

Advance Usage

Depends
  1. Depends indicates that the current step will not be executed until all dependent steps are completed.
  2. flow.Context use adjacency list as data structure. The current step can only access context from dependent steps up to the root step.
Context Connect And Isolation

We define dependencies like flow code.

workflow := flow.RegisterFlow("MyFlow")
process := workflow.AddProcess("MyProcess")
process.AddStep(TaskA)
process.AddStep(TaskB, TaskA)
process.AddStep(TaskC, TaskA)
process.AddStep(TaskD, TaskB, TaskC)
process.AddStep(TaskE, TaskC)

The dependency relationship is shown in the figure

Relation

TaskD can access the context of TaskB,TaskC,TaskA.

TaskE can access the context of TaskC,TaskA, but TaskE can't access the context of TaskB.

Note:  
Key first match in its own context, then matched in parents context, 
finally matched in global contxt.

You can use AddPriority to break the order 
Process Reuse And Merge

You can add a registered process to the work-flow and merge a registered process into the current process.

workflow := flow.RegisterFlow("MyFlow")
// Add a registered process called 'RegisterProcess' to the current workflow.
workflow.AddRegisterProcess("RegisterProcess")

process := workflow.AddProcess("MyProcess")
// Merge process called 'AnotherProcess' to the current process.
process.Merge("AnotherProcess")
// You can merge a process to the current process at any position, 
// and you can still add your own steps both before and after the merge operation.
process.AddStep(TaskA)

Merge will eliminates duplicate steps and merges the dependencies of duplicate steps after deduplication.

If merge creates a cycle, then Merge method will panic and it indicates which two steps form the cycle.

Documentation

Index

Constants

View Source
const (
	Before = "Before"
	After  = "After"
)

Variables

View Source
var (
	Pending      = &statusEnum{0, "Pending"}
	Running      = &statusEnum{0b1, "Running"}
	Pause        = &statusEnum{0b1 << 1, "Pause"}
	Success      = &statusEnum{0b1 << 15, "Success"}
	NormalMask   = &statusEnum{0b1<<16 - 1, "NormalMask"}
	Cancel       = &statusEnum{0b1 << 16, "Cancel"}
	Timeout      = &statusEnum{0b1 << 17, "Timeout"}
	Panic        = &statusEnum{0b1 << 18, "Panic"}
	Error        = &statusEnum{0b1 << 19, "Error"}
	Stop         = &statusEnum{0b1 << 20, "Stop"}
	CallbackFail = &statusEnum{0b1 << 21, "CallbackFail"}
	Failed       = &statusEnum{0b1 << 31, "Failed"}
	// AbnormalMask An abnormal step status will cause the cancellation of dependent unexecuted steps.
	AbnormalMask = &statusEnum{NormalMask.flag << 16, "AbnormalMask"}
)

these variable are used to indicate the status of the unit

Functions

func BuildRunFlow

func BuildRunFlow(name string, input map[string]any) *runFlow

func DoneArgs

func DoneArgs(name string, args ...any) resultI

func DoneFlow

func DoneFlow(name string, input map[string]any) resultI

func SetIdGenerator

func SetIdGenerator(method func() string)

Types

type FlowConfig

type FlowConfig struct {
	ProcessConfig `flow:"skip"`
	// contains filtered or unexported fields
}

func CreateDefaultConfig

func CreateDefaultConfig() *FlowConfig

func (*FlowConfig) AfterFlow

func (fc *FlowConfig) AfterFlow(must bool, callback func(*WorkFlow) (keepOn bool, err error)) *callback[*WorkFlow]

func (*FlowConfig) BeforeFlow

func (fc *FlowConfig) BeforeFlow(must bool, callback func(*WorkFlow) (keepOn bool, err error)) *callback[*WorkFlow]

type FlowController

type FlowController interface {
	Done() []*Future
	ListProcess() []string
	ProcessController(name string) controller
	// contains filtered or unexported methods
}

func AsyncArgs

func AsyncArgs(name string, args ...any) FlowController

func AsyncFlow

func AsyncFlow(name string, input map[string]any) FlowController

type FlowMeta

type FlowMeta struct {
	FlowConfig
	// contains filtered or unexported fields
}

func RegisterFlow

func RegisterFlow(name string) *FlowMeta

func (*FlowMeta) NoUseDefault

func (fm *FlowMeta) NoUseDefault() *FlowMeta

func (*FlowMeta) Process

func (fm *FlowMeta) Process(name string) *ProcessMeta

type Future

type Future struct {
	// contains filtered or unexported fields
}

func (*Future) Done

func (f *Future) Done()

Done method waits for the corresponding process to complete.

func (Future) Exceptions added in v0.0.5

func (s Future) Exceptions() []string

Exceptions return contain exception's message

func (Future) ExplainStatus added in v0.0.5

func (s Future) ExplainStatus() []string

ExplainStatus function explains the status represented by the provided bitmask. The function checks the status against predefined abnormal and normal flags, and returns a slice of strings containing the names of the matching flags. Parameter status is the bitmask representing the status. The returned slice contains the names of the matching flags in the layer they were found. If abnormal flags are found, normal flags will be ignored.

func (Future) GetId

func (bi Future) GetId() string

func (Future) GetName

func (bi Future) GetName() string

func (Future) Has added in v0.0.5

func (s Future) Has(enum *statusEnum) bool

func (Future) Normal added in v0.0.5

func (s Future) Normal() bool

Normal return true if not exception occur

func (Future) Success added in v0.0.5

func (s Future) Success() bool

Success return true if finish running and success

type Process added in v0.0.5

type Process struct {
	FlowId string
	// contains filtered or unexported fields
}

func (*Process) ContextName added in v0.0.5

func (pi *Process) ContextName() string

Fix ContextName return first step name

func (*Process) GetId added in v0.0.5

func (bi *Process) GetId() string

func (*Process) GetName added in v0.0.5

func (bi *Process) GetName() string

type ProcessConfig

type ProcessConfig struct {
	StepConfig
	ProcTimeout       time.Duration
	ProcNotUseDefault bool
	// contains filtered or unexported fields
}

func (*ProcessConfig) AfterProcess

func (pc *ProcessConfig) AfterProcess(must bool, callback func(*Process) (keepOn bool, err error)) *callback[*Process]

func (*ProcessConfig) AfterStep

func (pc *ProcessConfig) AfterStep(must bool, callback func(*Step) (keepOn bool, err error)) *callback[*Step]

func (*ProcessConfig) BeforeProcess

func (pc *ProcessConfig) BeforeProcess(must bool, callback func(*Process) (keepOn bool, err error)) *callback[*Process]

func (*ProcessConfig) BeforeStep

func (pc *ProcessConfig) BeforeStep(must bool, callback func(*Step) (keepOn bool, err error)) *callback[*Step]

func (*ProcessConfig) NotUseDefault

func (pc *ProcessConfig) NotUseDefault()

func (*ProcessConfig) ProcessTimeout

func (pc *ProcessConfig) ProcessTimeout(timeout time.Duration) *ProcessConfig

func (*ProcessConfig) StepsRetry

func (pc *ProcessConfig) StepsRetry(retry int) *ProcessConfig

func (*ProcessConfig) StepsTimeout

func (pc *ProcessConfig) StepsTimeout(timeout time.Duration) *ProcessConfig

type ProcessMeta

type ProcessMeta struct {
	ProcessConfig
	// contains filtered or unexported fields
}

func (*ProcessMeta) Merge

func (pm *ProcessMeta) Merge(name string)

Merge will not merge config, because has not effective design to not use merged config.

func (*ProcessMeta) NameStep added in v0.0.6

func (pm *ProcessMeta) NameStep(run func(ctx StepCtx) (any, error), name string, depends ...any) *StepMeta

func (*ProcessMeta) Step

func (pm *ProcessMeta) Step(run func(ctx StepCtx) (any, error), depends ...any) *StepMeta

func (*ProcessMeta) Tail

func (pm *ProcessMeta) Tail(run func(ctx StepCtx) (any, error), alias ...string) *StepMeta

type Step added in v0.0.5

type Step struct {
	StepCtx
	ProcessId string
	FlowId    string
	Start     time.Time
	End       time.Time
	Err       error
	// contains filtered or unexported fields
}

func (*Step) Error added in v0.0.5

func (si *Step) Error() error

func (Step) GetId added in v0.0.5

func (bi Step) GetId() string

func (Step) GetName added in v0.0.5

func (bi Step) GetName() string

type StepConfig

type StepConfig struct {
	StepTimeout time.Duration
	StepRetry   int
}

type StepCtx added in v0.0.5

type StepCtx interface {
	GetEndValues(key string) map[string]any
	GetResult(key string) (value any, exist bool)
	// contains filtered or unexported methods
}

type StepMeta

type StepMeta struct {
	StepConfig
	// contains filtered or unexported fields
}

func (*StepMeta) Next

func (meta *StepMeta) Next(run func(ctx StepCtx) (any, error), alias ...string) *StepMeta

func (*StepMeta) Priority

func (meta *StepMeta) Priority(priority map[string]any)

func (*StepMeta) Retry

func (meta *StepMeta) Retry(retry int) *StepMeta

func (*StepMeta) Same

func (meta *StepMeta) Same(run func(ctx StepCtx) (any, error), alias ...string) *StepMeta

func (*StepMeta) Timeout

func (meta *StepMeta) Timeout(timeout time.Duration) *StepMeta

type WorkFlow added in v0.0.5

type WorkFlow struct {
	// contains filtered or unexported fields
}

func (WorkFlow) GetId added in v0.0.5

func (bi WorkFlow) GetId() string

func (WorkFlow) GetName added in v0.0.5

func (bi WorkFlow) GetName() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL