flow

package module
v0.9.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 6, 2023 License: MIT Imports: 25 Imported by: 0

README

flow

实例代码

查看 examples

生成 echo 代码

cd $GOPATH/src
protoc -I. --gogo_out=:. --flow_out=:. github.com/vine-io/flow/examples/pb/hello.proto

启动 etcd

etcd

启动 server 和 client

go build -o _output/server examples/server/server.go
go build -o _output/client examples/client/client.go

_output/server
_output/client

下载 protoc-gen-flow

bash -c tool/install.sh

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	Root         = "/vine.io/flow"
	WorkflowPath = path.Join(Root, "wf")
	ErrAborted   = errors.New("be aborted")
)

Functions

func EchoToAPI

func EchoToAPI(echo Echo) *api.Echo

EchoToAPI returns a new instance *api.Echo based on the specified Echo interface implementation.

func EntityToAPI

func EntityToAPI(entity Entity) *api.Entity

EntityToAPI returns a new instance *api.Entity based on the specified Entity interface implementation.

func ExtractFields

func ExtractFields(t any) []string

func ExtractTypeField

func ExtractTypeField(t any) ([]byte, error)

func GetTypePkgName

func GetTypePkgName(p reflect.Type) string

GetTypePkgName returns the package path and kind for object based on reflect.Type.

func InjectTypeFields

func InjectTypeFields(t any, items, args map[string]string, entityData []byte) error

func IsCancel

func IsCancel(err error) bool

func IsRetriedErr

func IsRetriedErr(err error) bool

IsRetriedErr returns boolean value if the error is specified code.

func Load

func Load(tps ...any)

func NewOptions

func NewOptions(opts ...Option) *api.WorkflowOption

func ProvideWithName added in v0.9.1

func ProvideWithName(name string, value any) error

func Provides added in v0.9.1

func Provides(values ...any) error

func SetTypeEntityField

func SetTypeEntityField(t any, data []byte) error

func StepToAPI

func StepToAPI(step Step) *api.Step

StepToAPI returns a new instance *api.Step based on the specified Step interface implementation.

func StepToWorkStep

func StepToWorkStep(step Step, worker string) *api.WorkflowStep

StepToWorkStep returns a new instance *api.WorkflowStep based on the specified Step interface implementation.

Types

type CallPack

type CallPack struct {
	// contains filtered or unexported fields
}

func NewCall

func NewCall(ctx context.Context, chunk *api.PipeCallRequest) *CallPack

func (*CallPack) Destroy

func (p *CallPack) Destroy()

type CellStep added in v0.8.2

type CellStep struct {
	EmptyStep
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(cfg ClientConfig, attrs map[string]string) (*Client, error)

func (*Client) AbortWorkflow

func (c *Client) AbortWorkflow(ctx context.Context, wid string) error

func (*Client) Call

func (c *Client) Call(ctx context.Context, client, name string, data []byte, opts ...vclient.CallOption) ([]byte, error)

func (*Client) ExecuteWorkflow added in v0.5.0

func (c *Client) ExecuteWorkflow(ctx context.Context, spec *api.Workflow, watch bool) (WorkflowWatcher, error)

func (*Client) Id

func (c *Client) Id() string

func (*Client) InspectWorkflow

func (c *Client) InspectWorkflow(ctx context.Context, wid string) (*api.Workflow, error)

func (*Client) ListRegistry added in v0.2.0

func (c *Client) ListRegistry(ctx context.Context) ([]*api.Entity, []*api.Echo, []*api.Step, error)

func (*Client) ListWorkFlow

func (c *Client) ListWorkFlow(ctx context.Context) ([]*api.WorkflowSnapshot, error)

func (*Client) ListWorker added in v0.2.0

func (c *Client) ListWorker(ctx context.Context) ([]*api.Worker, error)

func (*Client) NewSession

func (c *Client) NewSession() (*PipeSession, error)

func (*Client) NewWorkflow added in v0.5.0

func (c *Client) NewWorkflow(opts ...Option) *WorkflowBuilder

func (*Client) PauseWorkflow

func (c *Client) PauseWorkflow(ctx context.Context, wid string) error

func (*Client) ResumeWorkflow

func (c *Client) ResumeWorkflow(ctx context.Context, wid string) error

func (*Client) Step

func (c *Client) Step(ctx context.Context, name string, action api.StepAction, items, args map[string]string, data []byte, opts ...vclient.CallOption) ([]byte, error)

func (*Client) WatchWorkflow

func (c *Client) WatchWorkflow(ctx context.Context, wid string, opts ...vclient.CallOption) (WorkflowWatcher, error)

type ClientConfig

type ClientConfig struct {
	// contains filtered or unexported fields
}

func NewConfig

func NewConfig(name, id, address string) ClientConfig

func (*ClientConfig) WithConn added in v0.2.2

func (c *ClientConfig) WithConn(conn vclient.Client) *ClientConfig

func (*ClientConfig) WithHeartbeat added in v0.2.2

func (c *ClientConfig) WithHeartbeat(t time.Duration) *ClientConfig

func (*ClientConfig) WithStore added in v0.4.3

func (c *ClientConfig) WithStore(store *ClientStore) *ClientConfig

func (*ClientConfig) WithTimeout added in v0.2.2

func (c *ClientConfig) WithTimeout(t time.Duration) *ClientConfig

type ClientPipe

type ClientPipe struct {
	Id string
	// contains filtered or unexported fields
}

func NewPipe

func NewPipe(id string, pr *Peer, stream PipeStream) *ClientPipe

func (*ClientPipe) Call

func (p *ClientPipe) Call(pack *CallPack) (<-chan []byte, <-chan error)

func (*ClientPipe) Close

func (p *ClientPipe) Close()

func (*ClientPipe) Start

func (p *ClientPipe) Start()

func (*ClientPipe) Step

func (p *ClientPipe) Step(pack *StepPack) (<-chan []byte, <-chan error)

type ClientStore

type ClientStore struct {
	// contains filtered or unexported fields
}

func NewClientStore

func NewClientStore() *ClientStore

func (*ClientStore) GetEcho

func (s *ClientStore) GetEcho(name string) (Echo, bool)

func (*ClientStore) GetEntity

func (s *ClientStore) GetEntity(kind string) (Entity, bool)

func (*ClientStore) GetStep

func (s *ClientStore) GetStep(name string) (Step, bool)

func (*ClientStore) Load added in v0.4.3

func (s *ClientStore) Load(ts ...any)

func (*ClientStore) PopulateEcho added in v0.9.1

func (s *ClientStore) PopulateEcho(name string) (Echo, error)

func (*ClientStore) PopulateStep added in v0.9.1

func (s *ClientStore) PopulateStep(name string) (Step, error)

func (*ClientStore) ProvideWithName added in v0.9.1

func (s *ClientStore) ProvideWithName(name string, value any) error

func (*ClientStore) Provides added in v0.9.1

func (s *ClientStore) Provides(values ...any) error

type Echo

type Echo interface {
	// Owner 所属 Entity 信息
	Owner() reflect.Type

	Call(ctx context.Context, data []byte) ([]byte, error)
	// Desc Echo 描述信息
	Desc() string
}

Echo 描述一个具体的请求

type EchoSet

type EchoSet struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewEchoSet

func NewEchoSet() *EchoSet

func (*EchoSet) Add

func (s *EchoSet) Add(echo *api.Echo)

func (*EchoSet) Contains

func (s *EchoSet) Contains(name string) bool

func (*EchoSet) Del

func (s *EchoSet) Del(echo *api.Echo)

func (*EchoSet) Get

func (s *EchoSet) Get(name string) (*api.Echo, bool)

func (*EchoSet) List added in v0.2.0

func (s *EchoSet) List() []*api.Echo

type Empty

type Empty struct{}

func (*Empty) Desc added in v0.9.2

func (e *Empty) Desc() string

func (*Empty) Marshal

func (e *Empty) Marshal() ([]byte, error)

func (*Empty) OwnerReferences

func (e *Empty) OwnerReferences() []*api.OwnerReference

func (*Empty) Unmarshal

func (e *Empty) Unmarshal(data []byte) error

type EmptyEcho

type EmptyEcho struct{}

func (*EmptyEcho) Call

func (e *EmptyEcho) Call(ctx context.Context, data []byte) ([]byte, error)

func (*EmptyEcho) Desc added in v0.9.2

func (e *EmptyEcho) Desc() string

func (*EmptyEcho) Owner added in v0.6.0

func (e *EmptyEcho) Owner() reflect.Type

type EmptyStep

type EmptyStep struct{}

func (*EmptyStep) Cancel

func (s *EmptyStep) Cancel(ctx *PipeSessionCtx) error

func (*EmptyStep) Commit

func (s *EmptyStep) Commit(ctx *PipeSessionCtx) error

func (*EmptyStep) Desc added in v0.9.2

func (s *EmptyStep) Desc() string

func (*EmptyStep) Owner added in v0.6.0

func (s *EmptyStep) Owner() reflect.Type

func (*EmptyStep) Prepare

func (s *EmptyStep) Prepare(ctx *PipeSessionCtx) error

func (*EmptyStep) Rollback

func (s *EmptyStep) Rollback(ctx *PipeSessionCtx) error

type Entity

type Entity interface {
	// OwnerReferences Entity 之间的依赖信息
	OwnerReferences() []*api.OwnerReference
	// Marshal Entity 序列化
	Marshal() ([]byte, error)
	// Unmarshal Entity 反序列化
	Unmarshal(data []byte) error
	// Desc Entity 说明
	Desc() string
}

Entity 描述工作流中的具体资源,是工作流中的执行单元

type EntitySet

type EntitySet struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewEntitySet

func NewEntitySet() *EntitySet

func (*EntitySet) Add

func (s *EntitySet) Add(entity *api.Entity)

func (*EntitySet) Contains

func (s *EntitySet) Contains(name string) bool

func (*EntitySet) Del

func (s *EntitySet) Del(entity *api.Entity)

func (*EntitySet) Get

func (s *EntitySet) Get(kind string) (*api.Entity, bool)

func (*EntitySet) List added in v0.2.0

func (s *EntitySet) List() []*api.Entity

type MemberPipeHandler

type MemberPipeHandler func(*api.PipeResponse) (*api.PipeRequest, error)

type MemberPipeStream

type MemberPipeStream struct {
	// contains filtered or unexported fields
}

func NewMemberPipeStream

func NewMemberPipeStream(ctx context.Context, id string, handler MemberPipeHandler) *MemberPipeStream

func (*MemberPipeStream) Close

func (s *MemberPipeStream) Close() error

func (*MemberPipeStream) Context

func (s *MemberPipeStream) Context() context.Context

func (*MemberPipeStream) Recv

func (s *MemberPipeStream) Recv() (*api.PipeRequest, error)

func (*MemberPipeStream) Send

func (s *MemberPipeStream) Send(req *api.PipeResponse) error

type Option

type Option func(option *api.WorkflowOption)

Option represents a configuration option for Workflow struct. It provides a way to modify the fields of the Workflow struct a flexible ways.

func WithId

func WithId(id string) Option

WithId sets the Wid field of *api.WorkflowOption to the specified value.

func WithMaxRetry

func WithMaxRetry(retry int32) Option

WithMaxRetry sets the MaxRetries field of *api.WorkflowOption to the specified value.

func WithMode

func WithMode(mode api.WorkflowMode) Option

WithMode sets the Mode field of *api.WorkflowOption to the specified value.

func WithName

func WithName(name string) Option

WithName sets the Name field of *api.WorkflowOption to the specified value.

type Peer

type Peer struct {
	Server string
	Client string
}

type PipeSession

type PipeSession struct {
	// contains filtered or unexported fields
}

func NewPipeSession

func NewPipeSession(c *Client) (*PipeSession, error)

func (*PipeSession) Close

func (s *PipeSession) Close()

func (*PipeSession) IsConnected

func (s *PipeSession) IsConnected() bool

type PipeSessionCtx

type PipeSessionCtx struct {
	context.Context
	// contains filtered or unexported fields
}

func NewSessionCtx

func NewSessionCtx(ctx context.Context, wid, step string, revision api.Revision, c *Client) *PipeSessionCtx

func (*PipeSessionCtx) Call

func (c *PipeSessionCtx) Call(ctx context.Context, target, echo string, data []byte, opts ...vclient.CallOption) ([]byte, error)

func (*PipeSessionCtx) Get

func (c *PipeSessionCtx) Get(ctx context.Context, key string, opts ...vclient.CallOption) ([]byte, error)

func (*PipeSessionCtx) Put

func (c *PipeSessionCtx) Put(ctx context.Context, key string, data any, opts ...vclient.CallOption) error

func (*PipeSessionCtx) Revision

func (c *PipeSessionCtx) Revision() *api.Revision

func (*PipeSessionCtx) Trace

func (c *PipeSessionCtx) Trace(ctx context.Context, text []byte, opts ...vclient.CallOption) error

func (*PipeSessionCtx) WorkflowID

func (c *PipeSessionCtx) WorkflowID() string

type PipeSet

type PipeSet struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewPipeSet

func NewPipeSet() *PipeSet

func (*PipeSet) Add

func (ps *PipeSet) Add(p *ClientPipe)

func (*PipeSet) Del

func (ps *PipeSet) Del(p *ClientPipe)

func (*PipeSet) Get

func (ps *PipeSet) Get(id string) (*ClientPipe, bool)

type PipeStream

type PipeStream interface {
	Context() context.Context
	Send(*api.PipeResponse) error
	Recv() (*api.PipeRequest, error)
	Close() error
}

type RpcServer

type RpcServer struct {
	// contains filtered or unexported fields
}

func NewRPCServer

func NewRPCServer(s vserver.Server, scheduler *Scheduler) (*RpcServer, error)

func (*RpcServer) AbortWorkflow

func (rs *RpcServer) AbortWorkflow(ctx context.Context, req *api.AbortWorkflowRequest, rsp *api.AbortWorkflowResponse) error

func (*RpcServer) Call

func (rs *RpcServer) Call(ctx context.Context, req *api.CallRequest, rsp *api.CallResponse) error

func (*RpcServer) Id

func (rs *RpcServer) Id() string

func (*RpcServer) InspectWorkflow

func (rs *RpcServer) InspectWorkflow(ctx context.Context, req *api.InspectWorkflowRequest, rsp *api.InspectWorkflowResponse) error

func (*RpcServer) ListRegistry added in v0.2.0

func (rs *RpcServer) ListRegistry(ctx context.Context, req *api.ListRegistryRequest, rsp *api.ListRegistryResponse) error

func (*RpcServer) ListWorker added in v0.2.0

func (rs *RpcServer) ListWorker(ctx context.Context, req *api.ListWorkerRequest, rsp *api.ListWorkerResponse) error

func (*RpcServer) ListWorkflow

func (rs *RpcServer) ListWorkflow(ctx context.Context, req *api.ListWorkflowRequest, rsp *api.ListWorkflowResponse) error

func (*RpcServer) PauseWorkflow

func (rs *RpcServer) PauseWorkflow(ctx context.Context, req *api.PauseWorkflowRequest, rsp *api.PauseWorkflowResponse) error

func (*RpcServer) Pipe

func (rs *RpcServer) Pipe(ctx context.Context, stream api.FlowRpc_PipeStream) error

func (*RpcServer) Register

func (rs *RpcServer) Register(ctx context.Context, req *api.RegisterRequest, rsp *api.RegisterResponse) error

func (*RpcServer) ResumeWorkflow

func (rs *RpcServer) ResumeWorkflow(ctx context.Context, req *api.ResumeWorkflowRequest, rsp *api.ResumeWorkflowResponse) error

func (*RpcServer) RunWorkflow

func (*RpcServer) Step

func (rs *RpcServer) Step(ctx context.Context, req *api.StepRequest, rsp *api.StepResponse) error

func (*RpcServer) StepGet

func (rs *RpcServer) StepGet(ctx context.Context, req *api.StepGetRequest, rsp *api.StepGetResponse) error

func (*RpcServer) StepPut

func (rs *RpcServer) StepPut(ctx context.Context, req *api.StepPutRequest, rsp *api.StepPutResponse) error

func (*RpcServer) StepTrace

func (rs *RpcServer) StepTrace(ctx context.Context, req *api.StepTraceRequest, rsp *api.StepTraceResponse) error

func (*RpcServer) Stop

func (rs *RpcServer) Stop() error

func (*RpcServer) WatchWorkflow

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(storage *clientv3.Client, size int) (*Scheduler, error)

func (*Scheduler) ExecuteWorkflow

func (s *Scheduler) ExecuteWorkflow(w *api.Workflow, ps *PipeSet) error

func (*Scheduler) GetRegistry added in v0.2.0

func (s *Scheduler) GetRegistry() (entities []*api.Entity, echoes []*api.Echo, steps []*api.Step)

func (*Scheduler) GetWorkers added in v0.2.0

func (s *Scheduler) GetWorkers(ctx context.Context) ([]*api.Worker, error)

func (*Scheduler) GetWorkflow

func (s *Scheduler) GetWorkflow(wid string) (*Workflow, bool)

func (*Scheduler) InspectWorkflow

func (s *Scheduler) InspectWorkflow(ctx context.Context, wid string) (*api.Workflow, error)

func (*Scheduler) IsClosed

func (s *Scheduler) IsClosed() bool

func (*Scheduler) Register

func (s *Scheduler) Register(worker *api.Worker, entities []*api.Entity, echoes []*api.Echo, steps []*api.Step) error

func (*Scheduler) StepGet

func (s *Scheduler) StepGet(ctx context.Context, wid, key string) ([]byte, error)

func (*Scheduler) StepPut

func (s *Scheduler) StepPut(ctx context.Context, wid, key, value string) error

func (*Scheduler) StepTrace

func (s *Scheduler) StepTrace(ctx context.Context, wid, step string, text []byte) error

func (*Scheduler) Stop

func (s *Scheduler) Stop(wait bool)

func (*Scheduler) WatchWorkflow

func (s *Scheduler) WatchWorkflow(ctx context.Context, wid string) (<-chan *api.WorkflowWatchResult, error)

func (*Scheduler) WorkflowSnapshots

func (s *Scheduler) WorkflowSnapshots() []*api.WorkflowSnapshot

type Step

type Step interface {
	// Owner Step 所属 Entity
	Owner() reflect.Type

	Prepare(ctx *PipeSessionCtx) error

	Commit(ctx *PipeSessionCtx) error

	Rollback(ctx *PipeSessionCtx) error

	Cancel(ctx *PipeSessionCtx) error
	// Desc Step 描述信息
	Desc() string
}

Step 表示具有原子性的复杂操作

type StepPack

type StepPack struct {
	// contains filtered or unexported fields
}

func NewStep

func NewStep(ctx context.Context, chunk *api.PipeStepRequest) *StepPack

func (*StepPack) Destroy

func (p *StepPack) Destroy()

type StepSet

type StepSet struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewStepSet

func NewStepSet() *StepSet

func (*StepSet) Add

func (s *StepSet) Add(step *api.Step)

func (*StepSet) Contains

func (s *StepSet) Contains(name string) bool

func (*StepSet) Del

func (s *StepSet) Del(step *api.Step)

func (*StepSet) Get

func (s *StepSet) Get(name string) (*api.Step, bool)

func (*StepSet) List added in v0.2.0

func (s *StepSet) List() []*api.Step

type Tag

type Tag struct {
	Name     string
	Kind     TagKind
	IsEntity bool
}

type TagKind added in v0.7.0

type TagKind int
const (
	TagKindCtx TagKind = iota + 1
	TagKindArgs
)

type TestStep added in v0.6.0

type TestStep struct {
	E     *Empty `flow:"entity"`
	A     string `flow:"ctx:a"`
	B     int32  `flow:"ctx:b"`
	C     string `flow:"ctx:c"`
	ArgsA string `flow:"args:a"`
	// contains filtered or unexported fields
}

func (*TestStep) Cancel added in v0.6.0

func (s *TestStep) Cancel(ctx *PipeSessionCtx) error

func (*TestStep) Commit added in v0.6.0

func (s *TestStep) Commit(ctx *PipeSessionCtx) error

func (*TestStep) Desc added in v0.9.2

func (s *TestStep) Desc() string

func (*TestStep) Owner added in v0.6.0

func (s *TestStep) Owner() reflect.Type

func (*TestStep) Prepare added in v0.6.0

func (s *TestStep) Prepare(ctx *PipeSessionCtx) error

func (*TestStep) Rollback added in v0.6.0

func (s *TestStep) Rollback(ctx *PipeSessionCtx) error

type Workflow

type Workflow struct {
	// protects for api.Workflow and snapshot
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewWorkflow

func NewWorkflow(spec *api.Workflow) *Workflow

func (*Workflow) Abort

func (w *Workflow) Abort()

func (*Workflow) Cancel

func (w *Workflow) Cancel()

func (*Workflow) Context

func (w *Workflow) Context() context.Context

func (*Workflow) Execute

func (w *Workflow) Execute(ps *PipeSet, client *clientv3.Client)

func (*Workflow) ID

func (w *Workflow) ID() string

func (*Workflow) Init

func (w *Workflow) Init(client *clientv3.Client) (err error)

func (*Workflow) Inspect

func (w *Workflow) Inspect(ctx context.Context, client *clientv3.Client) (*api.Workflow, error)

func (*Workflow) IsAbort

func (w *Workflow) IsAbort() bool

func (*Workflow) IsStop

func (w *Workflow) IsStop() bool

func (*Workflow) NewSnapshot

func (w *Workflow) NewSnapshot() *api.WorkflowSnapshot

func (*Workflow) NewWatcher

func (w *Workflow) NewWatcher(ctx context.Context, client *clientv3.Client) (<-chan *api.WorkflowWatchResult, error)

func (*Workflow) Pause

func (w *Workflow) Pause() bool

func (*Workflow) Resume

func (w *Workflow) Resume() bool

type WorkflowBuilder

type WorkflowBuilder struct {
	// contains filtered or unexported fields
}

WorkflowBuilder the builder pattern is used to separate the construction of a complex object of its representation so that the same construction process can create different representations.

func NewBuilder

func NewBuilder(opts ...Option) *WorkflowBuilder

NewBuilder returns a new instance of the WorkflowBuilder struct. The builder can be used to construct a Workflow struct with specific options.

func (*WorkflowBuilder) Build

func (b *WorkflowBuilder) Build() *api.Workflow

Build returns a new Workflow struct based on the current configuration of the WorkflowBuilder.

func (*WorkflowBuilder) Entities

func (b *WorkflowBuilder) Entities(entities ...Entity) *WorkflowBuilder

Entities adds a slice of Entity interface implementations to Workflow struct.

func (*WorkflowBuilder) Items

func (b *WorkflowBuilder) Items(items map[string]string) *WorkflowBuilder

Items adds a map of key-value pairs to the Workflow struct.

func (*WorkflowBuilder) Steps

func (b *WorkflowBuilder) Steps(steps ...*api.WorkflowStep) *WorkflowBuilder

Steps adds a slice of Step interface implementations to the Workflow struct.

type WorkflowStepBuilder added in v0.7.0

type WorkflowStepBuilder struct {
	// contains filtered or unexported fields
}

WorkflowStepBuilder the builder pattern is used to separate the construction of a complex object of its representation so that the same construction process can create different representations.

func NewStepBuilder added in v0.7.0

func NewStepBuilder(step Step, worker string) *WorkflowStepBuilder

func (*WorkflowStepBuilder) Arg added in v0.7.0

func (*WorkflowStepBuilder) Args added in v0.7.0

func (*WorkflowStepBuilder) Build added in v0.7.0

func (b *WorkflowStepBuilder) Build() *api.WorkflowStep

func (*WorkflowStepBuilder) ID added in v0.7.0

type WorkflowWatcher

type WorkflowWatcher interface {
	Next() (*api.WorkflowWatchResult, error)
}

Directories

Path Synopsis
cmd
protoc-gen-flow
protoc-gen-vine is a plugin for the Google protocol buffer compiler to generate Go code.
protoc-gen-vine is a plugin for the Google protocol buffer compiler to generate Go code.
examples
pb

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL