flow

package module
v0.0.0-...-1aad9ac Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2024 License: Apache-2.0 Imports: 6 Imported by: 0

README

🔩 Go Flow

Go Flow is a lightweight workflow framework based on state machines, designed to simplify the creation and management of workflows in Go.

✨ Getting Started

go get github.com/basenana/go-flow
Sample Task

You can create a workflow directly from functions:

builder := flow.NewFlowBuilder("sample-flow-01")

builder.Task(flow.NewFuncTask("task-1", func(ctx context.Context) error {
    fmt.Println("do something in task 1")
    return nil
}))
builder.Task(flow.NewFuncTask("task-2", func(ctx context.Context) error {
    fmt.Println("do something in task 2")
    return nil
}))
builder.Task(flow.NewFuncTask("task-3", func(ctx context.Context) error {
    fmt.Println("do something in task 3")
    return nil
}))

sampleFlow := builder.Finish()

runner := flow.NewRunner(sampleFlow)
_ = runner.Start(context.TODO())

Observer

You can customize the observer to track the status of flows and tasks, enabling operations such as persistence and logging.

type storageObserver struct {
	flow map[string]*flow.Flow
	task map[string]flow.Task

	sync.Mutex
}

func (s *storageObserver) Handle(event flow.UpdateEvent) {
	s.Lock()
	defer s.Unlock()

	fmt.Printf("update flow %s", event.Flow.ID)
	s.flow[event.Flow.ID] = event.Flow

	if event.Task != nil {
		fmt.Printf("update flow %s task %s", event.Flow.ID, event.Task.GetName())
		s.task[event.Task.GetName()] = event.Task
	}
}

var _ flow.Observer = &storageObserver{}

Register the observer in the builder:

builder := flow.NewFlowBuilder("sample-flow-01").Observer(&storageObserver{
		flow: make(map[string]*flow.Flow),
		task: make(map[string]flow.Task),
	})
DAG

For more complex tasks, workflows based on DAGs are also supported:

builder := flow.NewFlowBuilder("dag-flow-01").Coordinator(flow.NewDAGCoordinator())

task1 := flow.NewFuncTask("task-1", func(ctx context.Context) error {
    fmt.Println("do something in task 1")
    return nil
})
builder.Task(flow.WithDirector(task1, flow.NextTask{
    OnSucceed: "task-3",
    OnFailed:  "task-fail",
}))

builder.Task(flow.NewFuncTask("task-2", func(ctx context.Context) error {
    fmt.Println("do something in task 2")
    return nil
}))
builder.Task(flow.NewFuncTask("task-3", func(ctx context.Context) error {
    fmt.Println("do something in task 3")
    return nil
}))

builder.Task(flow.NewFuncTask("task-fail", func(ctx context.Context) error {
    fmt.Println("ops")
    return nil
}))

dagFlow := builder.Finish()

runner := flow.NewRunner(dagFlow)
_ = runner.Start(context.TODO())

🧩 Extensible

My Task

You can also define your own Task types and corresponding Executors:

type MyTask struct {
	flow.BasicTask
	parameters map[string]string
}

func NewMyTask(name string, parameters map[string]string) flow.Task {
	return &MyTask{BasicTask: flow.BasicTask{Name: name}, parameters: parameters}
}

type MyExecutor struct{}

var _ flow.Executor = &MyExecutor{}

func (m *MyExecutor) Setup(ctx context.Context) error {
	return nil
}

func (m *MyExecutor) Exec(ctx context.Context, flow *flow.Flow, task flow.Task) error {
	myTask, ok := task.(*MyTask)
	if !ok {
		return fmt.Errorf("not my task")
	}

	fmt.Printf("exec my task: %s\n", myTask.Name)
	return nil
}

func (m *MyExecutor) Teardown(ctx context.Context) error {
	return nil
}

Load the custom Tasks into the Flow:

builder := flow.NewFlowBuilder("my-flow-01").Executor(&MyExecutor{})

builder.Task(NewMyTask("my-task-1", make(map[string]string)))
builder.Task(NewMyTask("my-task-2", make(map[string]string)))
builder.Task(NewMyTask("my-task-3", make(map[string]string)))

myFlow := builder.Finish()

runner := flow.NewRunner(myFlow)
_ = runner.Start(context.TODO())

Documentation

Index

Constants

View Source
const (
	InitializingStatus = "initializing"
	RunningStatus      = "running"
	PausingStatus      = "pausing"
	SucceedStatus      = "succeed"
	FailedStatus       = "failed"
	ErrorStatus        = "error"
	PausedStatus       = "paused"
	CanceledStatus     = "canceled"

	TriggerEvent       = "flow.execute.trigger"
	ExecuteFinishEvent = "flow.execute.finish"
	ExecuteFailedEvent = "flow.execute.failed"
	ExecuteErrorEvent  = "flow.execute.error"
	ExecutePauseEvent  = "flow.execute.pause"
	ExecutePausedEvent = "flow.execute.paused"
	ExecuteResumeEvent = "flow.execute.resume"
	ExecuteCancelEvent = "flow.execute.cancel"

	FailAndInterrupt FailOperation = "interrupt"
	FailAndPause     FailOperation = "pause"
	FailButContinue  FailOperation = "continue"
)

Variables

This section is empty.

Functions

func IsFinishedStatus

func IsFinishedStatus(sts string) bool

Types

type BasicTask

type BasicTask struct {
	Name    string
	Status  string
	Message string
}

func (*BasicTask) GetMessage

func (t *BasicTask) GetMessage() string

func (*BasicTask) GetName

func (t *BasicTask) GetName() string

func (*BasicTask) GetStatus

func (t *BasicTask) GetStatus() string

func (*BasicTask) SetMessage

func (t *BasicTask) SetMessage(msg string)

func (*BasicTask) SetStatus

func (t *BasicTask) SetStatus(status string)

type Builder

type Builder struct {
	// contains filtered or unexported fields
}

func NewFlowBuilder

func NewFlowBuilder(id string) *Builder

func (*Builder) Coordinator

func (b *Builder) Coordinator(coordinator Coordinator) *Builder

func (*Builder) Executor

func (b *Builder) Executor(executor Executor) *Builder

func (*Builder) Finish

func (b *Builder) Finish() *Flow

func (*Builder) Observer

func (b *Builder) Observer(observer Observer) *Builder

func (*Builder) Task

func (b *Builder) Task(task Task) *Builder

type Coordinator

type Coordinator interface {
	NewTask(task Task)
	UpdateTask(task Task)
	NextBatch(ctx context.Context) ([]Task, error)
	HandleFail(task Task, err error) FailOperation
}

type DAGCoordinator

type DAGCoordinator struct {
	// contains filtered or unexported fields
}

func NewDAGCoordinator

func NewDAGCoordinator() *DAGCoordinator

func (*DAGCoordinator) HandleFail

func (g *DAGCoordinator) HandleFail(task Task, err error) FailOperation

func (*DAGCoordinator) NewTask

func (g *DAGCoordinator) NewTask(task Task)

func (*DAGCoordinator) NextBatch

func (g *DAGCoordinator) NextBatch(ctx context.Context) ([]Task, error)

func (*DAGCoordinator) UpdateTask

func (g *DAGCoordinator) UpdateTask(task Task)

type Executor

type Executor interface {
	Setup(ctx context.Context) error
	Exec(ctx context.Context, flow *Flow, task Task) error
	Teardown(ctx context.Context) error
}

type FSM

type FSM struct {
	// contains filtered or unexported fields
}

func NewFSM

func NewFSM(status string) *FSM

func (*FSM) Do

func (m *FSM) Do(handler fsmEventHandler) *FSM

func (*FSM) Event

func (m *FSM) Event(event statusEvent) (err error)

func (*FSM) From

func (m *FSM) From(statues []string) *FSM

func (*FSM) To

func (m *FSM) To(status string) *FSM

func (*FSM) When

func (m *FSM) When(event string) *FSM

type FailOperation

type FailOperation string

type Flow

type Flow struct {
	ID      string
	Status  string
	Message string
	// contains filtered or unexported fields
}

func (*Flow) SetStatus

func (f *Flow) SetStatus(status string, message string)

func (*Flow) SetTaskStatue

func (f *Flow) SetTaskStatue(task Task, status, msg string)

type FunctionTask

type FunctionTask struct {
	*BasicTask
	// contains filtered or unexported fields
}

func (*FunctionTask) Run

func (f *FunctionTask) Run(ctx context.Context) error

type NextTask

type NextTask struct {
	OnSucceed string
	OnFailed  string
}

type Observer

type Observer interface {
	Handle(event UpdateEvent)
}

type Runnable

type Runnable interface {
	Run(ctx context.Context) error
}

type Runner

type Runner struct {
	*Flow
	// contains filtered or unexported fields
}

func NewRunner

func NewRunner(f *Flow) *Runner

func (*Runner) Cancel

func (r *Runner) Cancel() error

func (*Runner) Pause

func (r *Runner) Pause() error

func (*Runner) Resume

func (r *Runner) Resume() error

func (*Runner) Start

func (r *Runner) Start(ctx context.Context) (err error)

type StringSet

type StringSet map[string]struct{}

func NewStringSet

func NewStringSet(strList ...string) StringSet

func (StringSet) Del

func (s StringSet) Del(str string)

func (StringSet) Has

func (s StringSet) Has(str string) bool

func (StringSet) Insert

func (s StringSet) Insert(strList ...string)

func (StringSet) Len

func (s StringSet) Len() int

func (StringSet) List

func (s StringSet) List() (result []string)

type Task

type Task interface {
	GetName() string
	GetStatus() string
	SetStatus(string)
	GetMessage() string
	SetMessage(string)
}

func NewFuncTask

func NewFuncTask(name string, runFn func(ctx context.Context) error) Task

func WithDirector

func WithDirector(task Task, nextTask NextTask) Task

type UpdateEvent

type UpdateEvent struct {
	Flow *Flow
	Task Task
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL