light_flow

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2024 License: MIT Imports: 11 Imported by: 0

README

light-flow

Introduction

light-flow is a task arrange framework.

Designed to provide the most efficient orchestration and execution strategy for tasks with dependencies.

Features

Efficient Task Planning: The framework allows you to define task dependencies, enabling the efficient planning and sequencing of steps based on their relationships

Context Connect And Isolation: Tasks can only access to the context of dependent tasks up to the root task. Modifications to the context by the current task will not affect disconnected tasks.

Rich Test Case: Test cases cover every public API and most scenarios.

Minimal Code Requirement: With this framework, you only need to write a minimal amount of code to define and execute your tasks.

Task Dependency Visualization:The framework provides a draw plugin to visualize the dependencies between tasks.

Installation

Install the framework by running the following command:

go get gitee.com/MetaphysicCoding/light-flow

Draw your flow

Step 1: Define Step Functions

Define the step functions that will be executed in the workflow.

Each step function should have a specific signature and return result and errors.

import (
	"fmt"
	flow "gitee.com/MetaphysicCoding/light-flow"
	"time"
)

func Step1(ctx *flow.Context) (any, error) {
    // If we have previously set the 'name' in input
    // then we can retrieve the 'name'
	name, exist := ctx.Get("name")
	if exist {
		fmt.Printf("step1 get 'name' from ctx: %s \n", name.(string))
	}
	ctx.Set("age", 18)
	return "finish", nil
}

func Step2(ctx *flow.Context) (any, error) {
    // If we have previously set the 'name'in the dependent steps, 
    // then we can retrieve the 'name'
	age, exist := ctx.Get("age")
	if exist {
		fmt.Printf("step2 get 'age' from ctx: %d \n", age.(int))
	}
	result, exist := ctx.GetStepResult("Step1")
	if exist {
		fmt.Printf("step2 get result from step1: %s \n", result.(string))
	}
	return nil, nil
}
Step 2: Register Work-Flow And Process

**After register, you could add current process in another work-flow. **

And you cloud merge another process into current process, usage see in advance usage

func init() {
	workflow := flow.RegisterFlow("MyFlow")
     // Processes of workflow are parallel
	process := workflow.AddProcess("MyProcess")
}
Step 3: Add Step And Define Dependencies
func init() {
	...
	process := workflow.AddProcess("MyProcess")
    // AddStep automatically uses "Step1" as step name
	process.AddStep(Step1)
    // AddStepWithAlias use alias "Step2" as the step name
    // Identify Step 1 as a dependency of Step 2
	process.AddStepWithAlias("Step2", Step2, Step1)
}
Step 4: Run Work-Flow
func main() {
    // Done flow run work-flow and block until all processes are completed.
    // If you want to run  asynchronously, you could use AsyncFlow in stead
    // AsyncFlow can stop and pause the workflow or process.
	features := flow.DoneFlow("MyFlow", map[string]any{"name": "foo"})
}
Step 5: Get Execute ResultI By Features
func main() {
	result := flow.DoneFlow("MyFlow", map[string]any{"name": "foo"})
	if result.Success() {
		return
	}
	for processName, feature := range result.Features() {
        // Exceptions may include Timeout、Panic、Error
		fmt.Printf("process [%s] failed exceptions: %v \n", processName, feature.Exceptions())
	}
}
Complete Example
package main

import (
	"fmt"
	flow "gitee.com/MetaphysicCoding/light-flow"
)

func Step1(ctx *flow.Context) (any, error) {
	name, exist := ctx.Get("name")
	if exist {
		fmt.Printf("step1 get 'name' from ctx: %s \n", name.(string))
	}
	ctx.Set("age", 18)
	return "finish", nil
}

func Step2(ctx *flow.Context) (any, error) {
	age, exist := ctx.Get("age")
	if exist {
		fmt.Printf("step2 get 'age' from ctx: %d \n", age.(int))
	}
	result, exist := ctx.GetStepResult("Step1")
	if exist {
		fmt.Printf("step2 get result from step1: %s \n", result.(string))
	}
	return nil, nil
}

func init() {
	workflow := flow.RegisterFlow("MyFlow")
	process := workflow.AddProcess("MyProcess")
	process.AddStep(Step1)
	process.AddStepWithAlias("Step2", Step2, Step1)
}

func main() {
	result := flow.DoneFlow("MyFlow", map[string]any{"name": "foo"})
	if result.Success() {
		return
	}
	for processName, feature := range result.Features() {
		fmt.Printf("process [%s] failed exceptions: %v \n", processName, feature.Exceptions())
	}
}

Advance Usage

Depends
  1. Depends indicates that the current step will not be executed until all dependent steps are completed.
  2. flow.Context use adjacency list as data structure. The current step can only access context from dependent steps up to the root step.
Context Connect And Isolation

We define dependencies like flow code.

workflow := flow.RegisterFlow("MyFlow")
process := workflow.AddProcess("MyProcess")
process.AddStep(TaskA)
process.AddStep(TaskB, TaskA)
process.AddStep(TaskC, TaskA)
process.AddStep(TaskD, TaskB, TaskC)
process.AddStep(TaskE, TaskC)

The dependency relationship is shown in the figure

Relation

TaskD can access the context of TaskB,TaskC,TaskA.

TaskE can access the context of TaskC,TaskA, but TaskE can't access the context of TaskB.

Note:  
Key first match in its own context, then matched in parents context, 
finally matched in global contxt.

You can use AddPriority to break the order 
Process Reuse And Merge

You can add a registered process to the work-flow and merge a registered process into the current process.

workflow := flow.RegisterFlow("MyFlow")
// Add a registered process called 'RegisterProcess' to the current workflow.
workflow.AddRegisterProcess("RegisterProcess")

process := workflow.AddProcess("MyProcess")
// Merge process called 'AnotherProcess' to the current process.
process.Merge("AnotherProcess")
// You can merge a process to the current process at any position, 
// and you can still add your own steps both before and after the merge operation.
process.AddStep(TaskA)

Merge will eliminates duplicate steps and merges the dependencies of duplicate steps after deduplication.

If merge creates a cycle, then Merge method will panic and it indicates which two steps form the cycle.

Documentation

Index

Constants

View Source
const (
	Before = "Before"
	After  = "After"
)

Variables

View Source
var (
	End     = &StatusEnum{0b1, "End"}
	Head    = &StatusEnum{0b1 << 1, "Head"}
	HasNext = &StatusEnum{0b1 << 2, "HasNext"}
	Merged  = &StatusEnum{0b1 << 3, "Merged"}
)

these constants are used to indicate the position of the process

View Source
var (
	Pending      = &StatusEnum{0, "Pending"}
	Running      = &StatusEnum{0b1, "Running"}
	Pause        = &StatusEnum{0b1 << 1, "Pause"}
	Success      = &StatusEnum{0b1 << 15, "Success"}
	NormalMask   = &StatusEnum{0b1<<16 - 1, "NormalMask"}
	Cancel       = &StatusEnum{0b1 << 16, "Cancel"}
	Timeout      = &StatusEnum{0b1 << 17, "Timeout"}
	Panic        = &StatusEnum{0b1 << 18, "Panic"}
	Error        = &StatusEnum{0b1 << 19, "Error"}
	Stop         = &StatusEnum{0b1 << 20, "Stop"}
	CallbackFail = &StatusEnum{0b1 << 21, "CallbackFail"}
	Failed       = &StatusEnum{0b1 << 31, "Failed"}
	// AbnormalMask An abnormal step status will cause the cancellation of dependent unexecuted steps.
	AbnormalMask = &StatusEnum{NormalMask.flag << 16, "AbnormalMask"}
)

these variable are used to indicate the status of the unit

Functions

func BuildRunFlow

func BuildRunFlow(name string, input map[string]any) *runFlow

func CopyProperties

func CopyProperties(src, dst interface{})

func CopyPropertiesSkipNotEmpty

func CopyPropertiesSkipNotEmpty(src, dst interface{})

CopyPropertiesSkipNotEmpty will skip field contain`skip` tag value

func CreateStruct

func CreateStruct[T any](src any) (target T)

func GetFuncName

func GetFuncName(f interface{}) string

GetFuncName function retrieves the stepName of a provided function. If the provided function is anonymous function, it panics.

func GetStructName

func GetStructName(obj any) string

func SetIdGenerator

func SetIdGenerator(method func() string)

Types

type BasicInfoI

type BasicInfoI interface {
	StatusI

	GetId() string
	GetName() string
	// contains filtered or unexported methods
}

type Context

type Context interface {
	ContextName() string
	Get(key string) (value any, exist bool)
	GetAll(key string) map[string]any
	GetResult(key string) (value any, exist bool)
	Set(key string, value any)
}

type Controller

type Controller interface {
	Resume()
	Pause()
	Stop()
}

type FlowConfig

type FlowConfig struct {
	Handler[*FlowInfo] `flow:"skip"`
	ProcessConfig      `flow:"skip"`
}

func CreateDefaultConfig

func CreateDefaultConfig() *FlowConfig

func (*FlowConfig) AfterFlow

func (fc *FlowConfig) AfterFlow(must bool, callback func(*FlowInfo) (keepOn bool, err error)) *callback[*FlowInfo]

func (*FlowConfig) BeforeFlow

func (fc *FlowConfig) BeforeFlow(must bool, callback func(*FlowInfo) (keepOn bool, err error)) *callback[*FlowInfo]

type FlowController

type FlowController interface {
	Controller
	ResultI
	Done() []*Future
	ListProcess() []string
	ProcessController(name string) Controller
}

func AsyncArgs

func AsyncArgs(name string, args ...any) FlowController

func AsyncFlow

func AsyncFlow(name string, input map[string]any) FlowController

type FlowInfo

type FlowInfo struct {
	// contains filtered or unexported fields
}

func (FlowInfo) ContextName

func (vc FlowInfo) ContextName() string

func (FlowInfo) Get

func (vc FlowInfo) Get(key string) (value any, exist bool)

func (FlowInfo) GetAll

func (vc FlowInfo) GetAll(key string) map[string]any

func (FlowInfo) GetId

func (bi FlowInfo) GetId() string

func (FlowInfo) GetName

func (bi FlowInfo) GetName() string

func (FlowInfo) GetResult

func (vc FlowInfo) GetResult(key string) (value any, exist bool)

func (FlowInfo) Set

func (vc FlowInfo) Set(key string, value any)

type FlowMeta

type FlowMeta struct {
	FlowConfig
	// contains filtered or unexported fields
}

func RegisterFlow

func RegisterFlow(name string) *FlowMeta

func (*FlowMeta) CloneProcess

func (fm *FlowMeta) CloneProcess(name string) *ProcessMeta

func (*FlowMeta) NoUseDefault

func (fm *FlowMeta) NoUseDefault() *FlowMeta

func (*FlowMeta) Process

func (fm *FlowMeta) Process(name string) *ProcessMeta

type Future

type Future struct {
	*Status
	// contains filtered or unexported fields
}

func (*Future) Done

func (f *Future) Done()

Done method waits for the corresponding process to complete.

func (Future) GetId

func (bi Future) GetId() string

func (Future) GetName

func (bi Future) GetName() string

type Handler

type Handler[T BasicInfoI] struct {
	// contains filtered or unexported fields
}

type ProcessConfig

type ProcessConfig struct {
	StepConfig
	ProcTimeout       time.Duration
	ProcNotUseDefault bool
	// contains filtered or unexported fields
}

func (*ProcessConfig) AfterProcess

func (pc *ProcessConfig) AfterProcess(must bool, callback func(*ProcessInfo) (keepOn bool, err error)) *callback[*ProcessInfo]

func (*ProcessConfig) AfterStep

func (pc *ProcessConfig) AfterStep(must bool, callback func(*StepInfo) (keepOn bool, err error)) *callback[*StepInfo]

func (*ProcessConfig) BeforeProcess

func (pc *ProcessConfig) BeforeProcess(must bool, callback func(*ProcessInfo) (keepOn bool, err error)) *callback[*ProcessInfo]

func (*ProcessConfig) BeforeStep

func (pc *ProcessConfig) BeforeStep(must bool, callback func(*StepInfo) (keepOn bool, err error)) *callback[*StepInfo]

func (*ProcessConfig) NotUseDefault

func (pc *ProcessConfig) NotUseDefault()

func (*ProcessConfig) ProcessTimeout

func (pc *ProcessConfig) ProcessTimeout(timeout time.Duration) *ProcessConfig

func (*ProcessConfig) StepsRetry

func (pc *ProcessConfig) StepsRetry(retry int) *ProcessConfig

func (*ProcessConfig) StepsTimeout

func (pc *ProcessConfig) StepsTimeout(timeout time.Duration) *ProcessConfig

type ProcessInfo

type ProcessInfo struct {
	FlowId string
	// contains filtered or unexported fields
}

func (ProcessInfo) ContextName

func (vc ProcessInfo) ContextName() string

func (ProcessInfo) Get

func (vc ProcessInfo) Get(key string) (value any, exist bool)

func (ProcessInfo) GetAll

func (vc ProcessInfo) GetAll(key string) map[string]any

func (*ProcessInfo) GetId

func (bi *ProcessInfo) GetId() string

func (*ProcessInfo) GetName

func (bi *ProcessInfo) GetName() string

func (ProcessInfo) GetResult

func (vc ProcessInfo) GetResult(key string) (value any, exist bool)

func (ProcessInfo) Set

func (vc ProcessInfo) Set(key string, value any)

type ProcessMeta

type ProcessMeta struct {
	ProcessConfig
	// contains filtered or unexported fields
}

func (*ProcessMeta) AliasStep

func (pm *ProcessMeta) AliasStep(run func(ctx Context) (any, error), alias string, depends ...any) *StepMeta

func (*ProcessMeta) Merge

func (pm *ProcessMeta) Merge(name string)

Merge will not merge config, because has not effective design to not use merged config.

func (*ProcessMeta) Step

func (pm *ProcessMeta) Step(run func(ctx Context) (any, error), depends ...any) *StepMeta

func (*ProcessMeta) Tail

func (pm *ProcessMeta) Tail(run func(ctx Context) (any, error), alias string) *StepMeta

type ResultI

type ResultI interface {
	BasicInfoI
	Futures() []*Future
	Fails() []*Future
}

func DoneArgs

func DoneArgs(name string, args ...any) ResultI

func DoneFlow

func DoneFlow(name string, input map[string]any) ResultI

type Set

type Set[T comparable] struct {
	// contains filtered or unexported fields
}

func CreateSetBySliceFunc

func CreateSetBySliceFunc[T any, K comparable](src []T, transfer func(T) K) *Set[K]

func NewRoutineUnsafeSet

func NewRoutineUnsafeSet[T comparable]() *Set[T]

func (*Set[T]) Add

func (s *Set[T]) Add(item T)

func (*Set[T]) Contains

func (s *Set[T]) Contains(item T) bool

func (*Set[T]) Remove

func (s *Set[T]) Remove(item T)

func (*Set[T]) Size

func (s *Set[T]) Size() int

func (*Set[T]) Slice

func (s *Set[T]) Slice() []T

type Status

type Status int64

func (*Status) Append

func (s *Status) Append(enum *StatusEnum) bool

func (*Status) Contain

func (s *Status) Contain(enum *StatusEnum) bool

func (*Status) Exceptions

func (s *Status) Exceptions() []string

Exceptions return contain exception's message

func (*Status) ExplainStatus

func (s *Status) ExplainStatus() []string

ExplainStatus function explains the status represented by the provided bitmask. The function checks the status against predefined abnormal and normal flags, and returns a slice of strings containing the names of the matching flags. Parameter status is the bitmask representing the status. The returned slice contains the names of the matching flags in the layer they were found. If abnormal flags are found, normal flags will be ignored.

func (*Status) Normal

func (s *Status) Normal() bool

Normal return true if not exception occur

func (*Status) Pop

func (s *Status) Pop(enum *StatusEnum) bool

Pop function pops a status bit from the specified address. The function checks if the specified status bit exists in the current value. If it exists, it removes the status bit, and returns true indicating successful removal of the status bit. Otherwise, it returns false.

func (*Status) Success

func (s *Status) Success() bool

Success return true if finish running and success

type StatusEnum

type StatusEnum struct {
	// contains filtered or unexported fields
}

func (*StatusEnum) Contained

func (s *StatusEnum) Contained(explain ...string) bool

func (*StatusEnum) Message

func (s *StatusEnum) Message() string

type StatusI

type StatusI interface {
	Contain(enum *StatusEnum) bool
	Success() bool
	Exceptions() []string
}

type StepConfig

type StepConfig struct {
	StepTimeout time.Duration
	StepRetry   int
}

type StepInfo

type StepInfo struct {
	ProcessId string
	FlowId    string
	Start     time.Time
	End       time.Time
	Err       error
	// contains filtered or unexported fields
}

func (StepInfo) ContextName

func (vc StepInfo) ContextName() string

func (*StepInfo) Error

func (si *StepInfo) Error() error

func (StepInfo) Get

func (vc StepInfo) Get(key string) (value any, exist bool)

func (StepInfo) GetAll

func (vc StepInfo) GetAll(key string) map[string]any

func (StepInfo) GetId

func (bi StepInfo) GetId() string

func (StepInfo) GetName

func (bi StepInfo) GetName() string

func (StepInfo) GetResult

func (vc StepInfo) GetResult(key string) (value any, exist bool)

func (StepInfo) Set

func (vc StepInfo) Set(key string, value any)

type StepMeta

type StepMeta struct {
	StepConfig
	// contains filtered or unexported fields
}

func (*StepMeta) Next

func (meta *StepMeta) Next(run func(ctx Context) (any, error), alias ...string) *StepMeta

func (*StepMeta) Priority

func (meta *StepMeta) Priority(priority map[string]any)

func (*StepMeta) Retry

func (meta *StepMeta) Retry(retry int) *StepMeta

func (*StepMeta) Same

func (meta *StepMeta) Same(run func(ctx Context) (any, error), alias ...string) *StepMeta

func (*StepMeta) Timeout

func (meta *StepMeta) Timeout(timeout time.Duration) *StepMeta

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL