dgoflow

command module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 6, 2024 License: MIT, MulanPSL-2.0 Imports: 8 Imported by: 0

README

<<<<<<< HEAD

Go-Flow   Tweet

GoDoc Build Go Report Card

Gopher staring_at flow

A Golang based high performance, scalable and distributed workflow framework

It allows to programmatically author distributed workflow as Directed Acyclic Graph (DAG) of tasks. GoFlow executes your tasks on an array of workers by uniformly distributing the loads

Stability and Compatibility

Status: The library is currently undergoing heavy development with frequent, breaking API changes.

☝️ Important Note: Current major version is zero (v0.x.x) to accommodate rapid development and fast iteration. The public API could change without a major version update before v1.0.0 release.

Install It

Install GoFlow

go mod init myflow
go get github.com/s8sg/goflow@master

Write First Flow

Library to Build Flow github.com/s8sg/goflow/flow/v1

GoDoc

Make a flow.go file

package main

import (
	"fmt"
	goflow "github.com/s8sg/goflow/v1"
	flow "github.com/s8sg/goflow/flow/v1"
)

// Workload function
func doSomething(data []byte, option map[string][]string) ([]byte, error) {
	return []byte(fmt.Sprintf("you said \"%s\"", string(data))), nil
}

// Define provide definition of the workflow
func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
    dag := workflow.Dag()
    dag.Node("test", doSomething)
    return nil
}

func main() {
    fs := &goflow.FlowService{
        Port:                8080,
        RedisURL:            "localhost:6379",
        OpenTraceUrl:        "localhost:5775",
        WorkerConcurrency:   5,
        EnableMonitoring:    true,
    }
    fs.Register("myflow", DefineWorkflow)
    fs.Start()
}

Start() runs a HTTP Server that listen on the provided Port. It also runs a flow worker that handles the workload

Run It

Start goflow stack

docker-compose up

This will start the required services

  • redis
  • jaeger
  • dashboard

Run the Flow

go build -o goflow
./goflow

Invoke It

Using curl
curl -d hallo localhost:8080/flow/myflow
Using Client

Using the goflow client you can request the flow directly. The requests are always async and gets queued for the workers to pick up

fs := &goflow.FlowService{
    RedisURL: "localhost:6379",
}
fs.Execute("myflow", &goflow.Request{
    Body: []byte("hallo")
})
Using Dashboard

Dashboard visualize the flow and provides observability Dashboard

Scale It

GoFlow scale horizontally, you can distribute the load by just adding more instances

Worker Mode

Alternatively you can start your GoFlow in worker mode. As a worker, GoFlow only handles the workload instead of running an HTTP server. If required you can only scale the workers

fs := &goflow.FlowService{
    RedisURL:            "localhost:6379",
    OpenTraceUrl:        "localhost:5775",
    WorkerConcurrency:   5,
}
fs.Register("myflow", DefineWorkflow)
fs.StartWorker()
Register Multiple Flow

Register() allows user to bind multiple flows onto single flow service. This way one instance of server/worker can be used for more than one flows

fs.Register("createUser", DefineCreateUserFlow)
fs.Register("deleteUser", DefineDeleteUserFlow)

Creating More Complex DAG

The initial example is a single vertex DAG. Single vertex DAG are great for synchronous task

Using GoFlow's DAG construct one can achieve more complex compositions with multiple vertexes and connect them using edges.

Multi Nodes

A multi-vertex flow is always asynchronous in nature where each nodes gets distributed across the workers

Below is an example of a simple multi vertex flow to validate a KYC image of a user and mark the user according to the result. This is a asynchronous flow with three steps Async Flow

func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
    dag := f.Dag()
    dag.Node("get-kyc-image", getPresignedURLForImage)
    dag.Node("face-detect", detectFace)
    dag.Node("mark-profile", markProfileBasedOnStatus)
    dag.Edge("get-kyc-image", "face-detect")
    dag.Edge("face-detect", "mark-profile")
    return nil
}
Branching

Branching are great for parallelizing independent workloads in separate branches

Branching can be achieved with simple vertex and edges. GoFlow provides a special operator Aggregator to aggregate result of multiple branch on a converging node

We are extending our earlier example to include a new requirement to match the face with existing data and we are performing the operation in parallel to reduce time Branching

func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
    dag := f.Dag()
    dag.Node("get-kyc-image", getPresignedURLForImage)
    dag.Node("face-detect", detectFace)
    dag.Node("face-match", matchFace)
    // Here mark-profile depends on the result from face-detect and face-match, 
    // we are using a aggregator to create unified results
    dag.Node("mark-profile", markProfileBasedOnStatus, flow.Aggregator(func(responses map[string][]byte) ([]byte, error) {
       status := validateResults(responses["face-detect"],  responses["face-match"])
       return []byte(status), nil
    }))
    dag.Edge("get-kyc-image", "face-detect")
    dag.Edge("get-kyc-image", "face-match")
    dag.Edge("face-detect", "mark-profile")
    dag.Edge("face-match", "mark-profile")
    return nil
}
Subdag

Subdag allows to reuse existing DAG by embedding it into DAG with wider functionality

SubDag is available as a GoFlow DAG construct which takes a separate DAG as an input and composite it within a vertex, where the vertex completion depends on the embedded DAG's completion

func (currentDag *Dag) SubDag(vertex string, dag *Dag)

Say we have a separate flow that needs the same set of steps to validate a user. With our earlier example we can separate out the validation process into subdag and put it in a library that can be shared across different flows Subdag

func KycImageValidationDag() *flow.Dag {
    dag := flow.NewDag()
    dag.Node("verify-url", s3DocExists)
    dag.Node("face-detect", detectFace)
    dag.Node("face-match", matchFace)
    dag.Node("generate-result", func(data []byte, option map[string][]string) ([]byte, error) {
                 return data, nil
            }, 
            flow.Aggregator(func(responses map[string][]byte) ([]byte, error) {
                status := validateResults(responses["face-detect"],  responses["face-match"])
                status = "failure"
                if status {
                   status = "success"
                }
                return []byte(status), nil
            }
    ))
    dag.Edge("verify-url", "face-detect")
    dag.Edge("verify-url", "face-match")
    dag.Edge("face-detect", "generate-result")
    dag.Edge("face-match", "generate-result")
    return dag
}

Our existing flow embeds the KycImageValidation DAG

func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
    dag := f.Dag()
    dag.Node("get-image", getPresignedURLForImage)
    dag.SubDag("verify-image", common.KycImageValidationDag)
    dag.Node("mark-profile", markProfileBasedOnStatus)
    dag.Edge("get-image", "verify-image")
    dag.Edge("verify-image", "mark-profile")
    return nil
}
Conditional Branching

Conditional branching is a great way to choose different execution path dynamically

GoFlow provides a DAG component called ConditionalBranch. ConditionalBranch creates a vertex that composites different conditional branches as an individual subdags, each identified with a unique key resemble the condition

func (currentDag *Dag) ConditionalBranch(vertex string, conditions []string, condition sdk.Condition,
    options ...BranchOption) (conditiondags map[string]*Dag)

Condition is a special handler that allows user to dynamically choose one or more execution path based on the result from earlier node and return a set of condition Keys

User gets the condition branches as a response where each branch specific dags are mapped against the specific condition. User can farther define each branch using the DAG constructs

Below is the updated example with a conditional Branch where we are trying to call face-match only when face-detect passes Conditional

func KycImageValidationDag() *flow.Dag {
    dag := flow.NewDag()
    dag.Node("verify-url", s3DocExists)
    dag.Node("face-detect", detectFace)
    // here face match happen only when face-detect is success
    branches = dag.ConditionalBranch("handle-face-detect-response", []string{"pass"}, func(response []byte) []string {
        response := ParseFaceDetectResponse(response)
        if response[0] == "pass" { return []string{"pass"}  }
        return []string{}
    })

    // On the pass branch we are performing the `face-match` . If condition `pass` 
    // is not matched execution of next node `generate-result` is continued

    branches["pass"].Node("face-match", matchFace)
    dag.Node("generate-result", generateResult)
    dag.Edge("verify-url", "face-detect")
    dag.Edge("face-detect", "handle-face-detect-response")
    dag.Edge("handle-face-detect-response", "generate-result")
    return dag
}

You can also have multiple conditional branch in a workflow and different nodes corresponding to each branch

Below is the updated example with two conditional Branches where we are trying to call face-match or create-user based on response from previous node Conditional

func KycImageValidationDag() *flow.Dag {
    dag := flow.NewDag()
    dag.Node("verify-url", s3DocExists)
    dag.Node("face-detect", detectFace)
    // here face match happen only when face-detect is success
    // otherwise create-user is called
    branches = dag.ConditionalBranch("handle-face-detect-response", []string{"pass", "fail"}, 
        func(response []byte) []string {
           response := ParseFaceDetectResponse(response)
           if response.isSuccess() { return []string{"pass"}  }
           return []string{"fail"}
    })
    // On the pass branch we are performing the `face-match`
    branches["pass"].Node("face-match", matchFace)
    // on the fail branch we are performing `create-user`
    branches["fail"].Node("create-user", createUser)
  
    dag.Node("generate-result", generateResult)
    dag.Edge("verify-url", "face-detect")
    dag.Edge("face-detect", "handle-face-detect-response")
    dag.Edge("handle-face-detect-response", "generate-result")
    return dag
}
Foreach Branching

Foreach branching allows user to iteratively perform a certain set of task for a range of values

GoFlow provides a DAG component called ForEachBranch. ForEachBranch creates a vertex composites of a subdag that defines the flow within the iteration

func (currentDag *Dag) ForEachBranch(vertex string, foreach sdk.ForEach, options ...BranchOption) (dag *Dag)

ForEach is a special handler that allows user to dynamically return a set of key and values. For each of the items in the returned set, the user defined dag will get executed

User gets the foreach branch as a response and can define the flow using the DAG constructs

We are updating our flow to execute over a set of user that has been listed for possible fraud Foreach

func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
    dag := f.Dag()
    dag.Node("get-users", getListedUsers)
    verifyDag = dag.ForEachBranch("for-each-user-verify", func(data []byte) map[string][]byte {
       users := ParseUsersList(data)
       forEachSet := make(map[string][]byte)
       for _, user := range users {
           forEachSet[user.id] = []byte(user.GetKycImageUrl())
       }
       return forEachSet
    })
    verifyDag.SubDag("verify-image", KycImageValidationDag)
    verifyDag.Node("mark-profile", markProfileBasedOnStatus)
    verifyDag.Edge("verify-image", "mark-profile")

    dag.Edge("get-users", "for-each-user-verify")
    return nil
}

=======

dgoflow

介绍

{以下是 Gitee 平台说明,您可以替换此简介 Gitee 是 OSCHINA 推出的基于 Git 的代码托管平台(同时支持 SVN)。专为开发者提供稳定、高效、安全的云端软件开发协作平台 无论是个人、团队、或是企业,都能够用 Gitee 实现代码托管、项目管理、协作开发。企业项目请看 https://gitee.com/enterprises}

软件架构

软件架构说明

安装教程
  1. xxxx
  2. xxxx
  3. xxxx
使用说明
  1. xxxx
  2. xxxx
  3. xxxx
参与贡献
  1. Fork 本仓库
  2. 新建 Feat_xxx 分支
  3. 提交代码
  4. 新建 Pull Request
特技
  1. 使用 Readme_XXX.md 来支持不同的语言,例如 Readme_en.md, Readme_zh.md
  2. Gitee 官方博客 blog.gitee.com
  3. 你可以 https://gitee.com/explore 这个地址来了解 Gitee 上的优秀开源项目
  4. GVP 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目
  5. Gitee 官方提供的使用手册 https://gitee.com/help
  6. Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 https://gitee.com/gitee-stars/

b8d476b447d55d50b5729957b99e3522e0681e24

Documentation

The Go Gopher

There is no documentation for this package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL