flow

package module
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 7, 2023 License: MIT Imports: 24 Imported by: 0

README

flow

实例代码

查看 examples

Documentation

Index

Constants

View Source
const (
	EntityUnique   = "flow-entity-unique"
	EntityKind     = "flow-entity-kind"
	EntityDesc     = "flow-entity-desc"
	EntityWorker   = "flow-entity-worker"
	EntityEndpoint = "flow-entity-endpoint"
)

The metadata keys of an Entity.

View Source
const (
	EchoName     = "flow-echo-name"
	EchoID       = "flow-echo-id"
	EchoDesc     = "flow-echo-desc"
	EchoOwner    = "flow-echo-owner"
	EchoWorker   = "flow-echo-worker"
	EchoEndpoint = "flow-echo-endpoint"
)

The metadata keys of an Echo.

View Source
const (
	StepName     = "flow-step-name"
	StepId       = "flow-step-id"
	StepDesc     = "flow-step-desc"
	StepOwner    = "flow-step-owner"
	StepWorker   = "flow-step-worker"
	StepEndpoint = "flow-step-endpoint"
)

The metadata keys of a Step.

Variables

View Source
var (
	Root         = "/vine.io/flow"
	WorkflowPath = path.Join(Root, "wf")
	ErrAborted   = errors.New("be aborted")
)

Functions

func EchoToAPI

func EchoToAPI(echo Echo) *api.Echo

EchoToAPI returns a new *api.Echo instance based on the specified Echo interface implementation.

func EntityToAPI

func EntityToAPI(entity Entity) *api.Entity

EntityToAPI returns a new *api.Entity instance based on the specified Entity interface implementation.

func ExtractFields

func ExtractFields(t any) []string

func ExtractTypeField

func ExtractTypeField(t any) ([]byte, error)

func GetTypePkgName

func GetTypePkgName(p reflect.Type) string

GetTypePkgName returns the package path and kind of an object, based on its reflect.Type.

func InjectTypeFields

func InjectTypeFields(t any, items map[string][]byte, entityData []byte) error

func IsCancel

func IsCancel(err error) bool

func IsRetriedErr

func IsRetriedErr(err error) bool

IsRetriedErr reports whether the error has the specified code.

func Load

func Load(tps ...any)

func NewOptions

func NewOptions(opts ...Option) *api.WorkflowOption

func SetTypeEntityField

func SetTypeEntityField(t any, data []byte) error

func StepToAPI

func StepToAPI(step Step) *api.Step

StepToAPI returns a new *api.Step instance based on the specified Step interface implementation.

func StepToWorkStep

func StepToWorkStep(step Step) *api.WorkflowStep

StepToWorkStep returns a new *api.WorkflowStep instance based on the specified Step interface implementation.

Types

type CallPack

type CallPack struct {
	// contains filtered or unexported fields
}

func NewCall

func NewCall(ctx context.Context, chunk *api.PipeCallRequest) *CallPack

func (*CallPack) Destroy

func (p *CallPack) Destroy()

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(cfg ClientConfig, attrs map[string]string) (*Client, error)

func (*Client) AbortWorkflow

func (c *Client) AbortWorkflow(ctx context.Context, wid string) error

func (*Client) Call

func (c *Client) Call(ctx context.Context, client, name string, data []byte) ([]byte, error)

func (*Client) Id

func (c *Client) Id() string

func (*Client) InspectWorkflow

func (c *Client) InspectWorkflow(ctx context.Context, wid string) (*api.Workflow, error)

func (*Client) ListRegistry added in v0.2.0

func (c *Client) ListRegistry(ctx context.Context) ([]*api.Entity, []*api.Echo, []*api.Step, error)

func (*Client) ListWorkFlow

func (c *Client) ListWorkFlow(ctx context.Context) ([]*api.WorkflowSnapshot, error)

func (*Client) ListWorker added in v0.2.0

func (c *Client) ListWorker(ctx context.Context) ([]*api.Worker, error)

func (*Client) NewSession

func (c *Client) NewSession() (*PipeSession, error)

func (*Client) PauseWorkflow

func (c *Client) PauseWorkflow(ctx context.Context, wid string) error

func (*Client) ResumeWorkflow

func (c *Client) ResumeWorkflow(ctx context.Context, wid string) error

func (*Client) Step

func (c *Client) Step(ctx context.Context, name string, action api.StepAction, items map[string][]byte, data []byte) ([]byte, error)

func (*Client) WatchWorkflow

func (c *Client) WatchWorkflow(ctx context.Context, wid string) (WorkflowWatcher, error)

type ClientConfig

type ClientConfig struct {
	// contains filtered or unexported fields
}

func NewConfig

func NewConfig(name, id, address string) ClientConfig

func (*ClientConfig) WithConn added in v0.2.2

func (c *ClientConfig) WithConn(conn vclient.Client) *ClientConfig

func (*ClientConfig) WithHeartbeat added in v0.2.2

func (c *ClientConfig) WithHeartbeat(t time.Duration) *ClientConfig

func (*ClientConfig) WithTimeout added in v0.2.2

func (c *ClientConfig) WithTimeout(t time.Duration) *ClientConfig

type ClientPipe

type ClientPipe struct {
	Id string
	// contains filtered or unexported fields
}

func NewPipe

func NewPipe(id string, pr *Peer, stream PipeStream) *ClientPipe

func (*ClientPipe) Call

func (p *ClientPipe) Call(pack *CallPack) (<-chan []byte, <-chan error)

func (*ClientPipe) Close

func (p *ClientPipe) Close()

func (*ClientPipe) Start

func (p *ClientPipe) Start()

func (*ClientPipe) Step

func (p *ClientPipe) Step(pack *StepPack) (<-chan []byte, <-chan error)

type ClientStore

type ClientStore struct {
	// contains filtered or unexported fields
}

func NewClientStore

func NewClientStore() *ClientStore

func (*ClientStore) GetEcho

func (s *ClientStore) GetEcho(name string) (Echo, bool)

func (*ClientStore) GetEntity

func (s *ClientStore) GetEntity(kind string) (Entity, bool)

func (*ClientStore) GetStep

func (s *ClientStore) GetStep(name string) (Step, bool)

type Echo

type Echo interface {
	Metadata() map[string]string
	Call(ctx context.Context, req []byte) ([]byte, error)
}

Echo 描述一个具体的请求

type EchoSet

type EchoSet struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewEchoSet

func NewEchoSet() *EchoSet

func (*EchoSet) Add

func (s *EchoSet) Add(echo *api.Echo)

func (*EchoSet) Contains

func (s *EchoSet) Contains(name string) bool

func (*EchoSet) Del

func (s *EchoSet) Del(echo *api.Echo)

func (*EchoSet) Get

func (s *EchoSet) Get(name string) (*api.Echo, bool)

func (*EchoSet) List added in v0.2.0

func (s *EchoSet) List() []*api.Echo

type Empty

type Empty struct {
	Name string `json:"name"`
}

func (*Empty) Marshal

func (e *Empty) Marshal() ([]byte, error)

func (*Empty) Metadata

func (e *Empty) Metadata() map[string]string

func (*Empty) OwnerReferences

func (e *Empty) OwnerReferences() []*api.OwnerReference

func (*Empty) Unmarshal

func (e *Empty) Unmarshal(data []byte) error

type EmptyEcho

type EmptyEcho struct{}

func (*EmptyEcho) Call

func (e *EmptyEcho) Call(ctx context.Context, req []byte) ([]byte, error)

func (*EmptyEcho) Metadata

func (e *EmptyEcho) Metadata() map[string]string

type EmptyStep

type EmptyStep struct {
	Client string
	E      *Empty `flow:"entity"`
	A      string `flow:"name:a"`
	B      int32  `flow:"name:b"`
	C      string `flow:"name:c"`
}

func NewEmptyStep

func NewEmptyStep(e *Empty) *EmptyStep

func (*EmptyStep) Cancel

func (s *EmptyStep) Cancel(ctx *PipeSessionCtx) error

func (*EmptyStep) Commit

func (s *EmptyStep) Commit(ctx *PipeSessionCtx) error

func (*EmptyStep) Metadata

func (s *EmptyStep) Metadata() map[string]string

func (*EmptyStep) Prepare

func (s *EmptyStep) Prepare(ctx *PipeSessionCtx) error

func (*EmptyStep) Rollback

func (s *EmptyStep) Rollback(ctx *PipeSessionCtx) error

type Entity

type Entity interface {
	// Metadata Entity 属性信息
	Metadata() map[string]string
	// OwnerReferences Entity 之间的依赖信息
	OwnerReferences() []*api.OwnerReference
	// Marshal Entity 序列化
	Marshal() ([]byte, error)
	// Unmarshal Entity 反序列化
	Unmarshal(data []byte) error
}

Entity 描述工作流中的具体资源,是工作流中的执行单元

type EntitySet

type EntitySet struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewEntitySet

func NewEntitySet() *EntitySet

func (*EntitySet) Add

func (s *EntitySet) Add(entity *api.Entity)

func (*EntitySet) Contains

func (s *EntitySet) Contains(name string) bool

func (*EntitySet) Del

func (s *EntitySet) Del(entity *api.Entity)

func (*EntitySet) Get

func (s *EntitySet) Get(kind string) (*api.Entity, bool)

func (*EntitySet) List added in v0.2.0

func (s *EntitySet) List() []*api.Entity

type MemberPipeHandler

type MemberPipeHandler func(*api.PipeResponse) (*api.PipeRequest, error)

type MemberPipeStream

type MemberPipeStream struct {
	// contains filtered or unexported fields
}

func NewMemberPipeStream

func NewMemberPipeStream(ctx context.Context, id string, handler MemberPipeHandler) *MemberPipeStream

func (*MemberPipeStream) Close

func (s *MemberPipeStream) Close() error

func (*MemberPipeStream) Context

func (s *MemberPipeStream) Context() context.Context

func (*MemberPipeStream) Recv

func (s *MemberPipeStream) Recv() (*api.PipeRequest, error)

func (*MemberPipeStream) Send

func (s *MemberPipeStream) Send(req *api.PipeResponse) error

type Option

type Option func(option *api.WorkflowOption)

Option represents a configuration option for the Workflow struct. It provides a flexible way to modify the fields of the Workflow struct.

func WithId

func WithId(id string) Option

WithId sets the Wid field of *api.WorkflowOption to the specified value.

func WithMaxRetry

func WithMaxRetry(retry int32) Option

WithMaxRetry sets the MaxRetries field of *api.WorkflowOption to the specified value.

func WithMode

func WithMode(mode api.WorkflowMode) Option

WithMode sets the Mode field of *api.WorkflowOption to the specified value.

func WithName

func WithName(name string) Option

WithName sets the Name field of *api.WorkflowOption to the specified value.

type Peer

type Peer struct {
	Server string
	Client string
}

type PipeSession

type PipeSession struct {
	// contains filtered or unexported fields
}

func NewPipeSession

func NewPipeSession(c *Client) (*PipeSession, error)

func (*PipeSession) Close

func (s *PipeSession) Close()

func (*PipeSession) ExecuteWorkflow

func (s *PipeSession) ExecuteWorkflow(ctx context.Context, spec *api.Workflow, watch bool) (WorkflowWatcher, error)

func (*PipeSession) IsConnected

func (s *PipeSession) IsConnected() bool

func (*PipeSession) NewWorkflow

func (s *PipeSession) NewWorkflow(opts ...Option) *WorkflowBuilder

type PipeSessionCtx

type PipeSessionCtx struct {
	context.Context
	// contains filtered or unexported fields
}

func NewSessionCtx

func NewSessionCtx(ctx context.Context, wid, step string, revision api.Revision, c *Client) *PipeSessionCtx

func (*PipeSessionCtx) Call

func (c *PipeSessionCtx) Call(ctx context.Context, target, echo string, data []byte, opts ...vclient.CallOption) ([]byte, error)

func (*PipeSessionCtx) Get

func (c *PipeSessionCtx) Get(ctx context.Context, key string, opts ...vclient.CallOption) ([]byte, error)

func (*PipeSessionCtx) Put

func (c *PipeSessionCtx) Put(ctx context.Context, key string, data any, opts ...vclient.CallOption) error

func (*PipeSessionCtx) Revision

func (c *PipeSessionCtx) Revision() *api.Revision

func (*PipeSessionCtx) Trace

func (c *PipeSessionCtx) Trace(ctx context.Context, text []byte, opts ...vclient.CallOption) error

func (*PipeSessionCtx) WorkflowID

func (c *PipeSessionCtx) WorkflowID() string

type PipeSet

type PipeSet struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewPipeSet

func NewPipeSet() *PipeSet

func (*PipeSet) Add

func (ps *PipeSet) Add(p *ClientPipe)

func (*PipeSet) Del

func (ps *PipeSet) Del(p *ClientPipe)

func (*PipeSet) Get

func (ps *PipeSet) Get(id string) (*ClientPipe, bool)

type PipeStream

type PipeStream interface {
	Context() context.Context
	Send(*api.PipeResponse) error
	Recv() (*api.PipeRequest, error)
	Close() error
}

type RpcServer

type RpcServer struct {
	// contains filtered or unexported fields
}

func NewRPCServer

func NewRPCServer(s vserver.Server, scheduler *Scheduler) (*RpcServer, error)

func (*RpcServer) AbortWorkflow

func (rs *RpcServer) AbortWorkflow(ctx context.Context, req *api.AbortWorkflowRequest, rsp *api.AbortWorkflowResponse) error

func (*RpcServer) Call

func (rs *RpcServer) Call(ctx context.Context, req *api.CallRequest, rsp *api.CallResponse) error

func (*RpcServer) Id

func (rs *RpcServer) Id() string

func (*RpcServer) InspectWorkflow

func (rs *RpcServer) InspectWorkflow(ctx context.Context, req *api.InspectWorkflowRequest, rsp *api.InspectWorkflowResponse) error

func (*RpcServer) ListRegistry added in v0.2.0

func (rs *RpcServer) ListRegistry(ctx context.Context, req *api.ListRegistryRequest, rsp *api.ListRegistryResponse) error

func (*RpcServer) ListWorker added in v0.2.0

func (rs *RpcServer) ListWorker(ctx context.Context, req *api.ListWorkerRequest, rsp *api.ListWorkerResponse) error

func (*RpcServer) ListWorkflow

func (rs *RpcServer) ListWorkflow(ctx context.Context, req *api.ListWorkflowRequest, rsp *api.ListWorkflowResponse) error

func (*RpcServer) PauseWorkflow

func (rs *RpcServer) PauseWorkflow(ctx context.Context, req *api.PauseWorkflowRequest, rsp *api.PauseWorkflowResponse) error

func (*RpcServer) Pipe

func (rs *RpcServer) Pipe(ctx context.Context, stream api.FlowRpc_PipeStream) error

func (*RpcServer) Register

func (rs *RpcServer) Register(ctx context.Context, req *api.RegisterRequest, rsp *api.RegisterResponse) error

func (*RpcServer) ResumeWorkflow

func (rs *RpcServer) ResumeWorkflow(ctx context.Context, req *api.ResumeWorkflowRequest, rsp *api.ResumeWorkflowResponse) error

func (*RpcServer) RunWorkflow

func (*RpcServer) Step

func (rs *RpcServer) Step(ctx context.Context, req *api.StepRequest, rsp *api.StepResponse) error

func (*RpcServer) StepGet

func (rs *RpcServer) StepGet(ctx context.Context, req *api.StepGetRequest, rsp *api.StepGetResponse) error

func (*RpcServer) StepPut

func (rs *RpcServer) StepPut(ctx context.Context, req *api.StepPutRequest, rsp *api.StepPutResponse) error

func (*RpcServer) StepTrace

func (rs *RpcServer) StepTrace(ctx context.Context, req *api.StepTraceRequest, rsp *api.StepTraceResponse) error

func (*RpcServer) Stop

func (rs *RpcServer) Stop() error

func (*RpcServer) WatchWorkflow

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(storage *clientv3.Client, size int) (*Scheduler, error)

func (*Scheduler) ExecuteWorkflow

func (s *Scheduler) ExecuteWorkflow(w *api.Workflow, ps *PipeSet) error

func (*Scheduler) GetRegistry added in v0.2.0

func (s *Scheduler) GetRegistry() (entities []*api.Entity, echoes []*api.Echo, steps []*api.Step)

func (*Scheduler) GetWorkers added in v0.2.0

func (s *Scheduler) GetWorkers(ctx context.Context) ([]*api.Worker, error)

func (*Scheduler) GetWorkflow

func (s *Scheduler) GetWorkflow(wid string) (*Workflow, bool)

func (*Scheduler) InspectWorkflow

func (s *Scheduler) InspectWorkflow(ctx context.Context, wid string) (*api.Workflow, error)

func (*Scheduler) IsClosed

func (s *Scheduler) IsClosed() bool

func (*Scheduler) Register

func (s *Scheduler) Register(worker *api.Worker, entities []*api.Entity, echoes []*api.Echo, steps []*api.Step) error

func (*Scheduler) StepGet

func (s *Scheduler) StepGet(ctx context.Context, wid, key string) ([]byte, error)

func (*Scheduler) StepPut

func (s *Scheduler) StepPut(ctx context.Context, wid, key, value string) error

func (*Scheduler) StepTrace

func (s *Scheduler) StepTrace(ctx context.Context, wid, step string, text []byte) error

func (*Scheduler) Stop

func (s *Scheduler) Stop(wait bool)

func (*Scheduler) WatchWorkflow

func (s *Scheduler) WatchWorkflow(ctx context.Context, wid string) (<-chan *api.WorkflowWatchResult, error)

func (*Scheduler) WorkflowSnapshots

func (s *Scheduler) WorkflowSnapshots() []*api.WorkflowSnapshot

type Step

type Step interface {
	Metadata() map[string]string

	Prepare(ctx *PipeSessionCtx) error

	Commit(ctx *PipeSessionCtx) error

	Rollback(ctx *PipeSessionCtx) error

	Cancel(ctx *PipeSessionCtx) error
}

Step 表示具有原子性的复杂操作

type StepPack

type StepPack struct {
	// contains filtered or unexported fields
}

func NewStep

func NewStep(ctx context.Context, chunk *api.PipeStepRequest) *StepPack

func (*StepPack) Destroy

func (p *StepPack) Destroy()

type StepSet

type StepSet struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewStepSet

func NewStepSet() *StepSet

func (*StepSet) Add

func (s *StepSet) Add(step *api.Step)

func (*StepSet) Contains

func (s *StepSet) Contains(name string) bool

func (*StepSet) Del

func (s *StepSet) Del(step *api.Step)

func (*StepSet) Get

func (s *StepSet) Get(name string) (*api.Step, bool)

func (*StepSet) List added in v0.2.0

func (s *StepSet) List() []*api.Step

type Tag

type Tag struct {
	Name     string
	IsEntity bool
}

type Workflow

type Workflow struct {
	// protects for api.Workflow and snapshot
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewWorkflow

func NewWorkflow(spec *api.Workflow) *Workflow

func (*Workflow) Abort

func (w *Workflow) Abort()

func (*Workflow) Cancel

func (w *Workflow) Cancel()

func (*Workflow) Context

func (w *Workflow) Context() context.Context

func (*Workflow) Execute

func (w *Workflow) Execute(ps *PipeSet, client *clientv3.Client)

func (*Workflow) ID

func (w *Workflow) ID() string

func (*Workflow) Init

func (w *Workflow) Init(client *clientv3.Client) (err error)

func (*Workflow) Inspect

func (w *Workflow) Inspect(ctx context.Context, client *clientv3.Client) (*api.Workflow, error)

func (*Workflow) IsAbort

func (w *Workflow) IsAbort() bool

func (*Workflow) IsStop

func (w *Workflow) IsStop() bool

func (*Workflow) NewSnapshot

func (w *Workflow) NewSnapshot() *api.WorkflowSnapshot

func (*Workflow) NewWatcher

func (w *Workflow) NewWatcher(ctx context.Context, client *clientv3.Client) (<-chan *api.WorkflowWatchResult, error)

func (*Workflow) Pause

func (w *Workflow) Pause() bool

func (*Workflow) Resume

func (w *Workflow) Resume() bool

type WorkflowBuilder

type WorkflowBuilder struct {
	// contains filtered or unexported fields
}

WorkflowBuilder builds a Workflow using the builder pattern, which separates the construction of a complex object from its representation so that the same construction process can create different representations.

func NewBuilder

func NewBuilder(opts ...Option) *WorkflowBuilder

NewBuilder returns a new instance of the WorkflowBuilder struct. The builder can be used to construct a Workflow struct with specific options.

func (*WorkflowBuilder) Build

func (b *WorkflowBuilder) Build() *api.Workflow

Build returns a new Workflow struct based on the current configuration of the WorkflowBuilder.

func (*WorkflowBuilder) Entities

func (b *WorkflowBuilder) Entities(entities ...Entity) *WorkflowBuilder

Entities adds a slice of Entity interface implementations to the Workflow struct.

func (*WorkflowBuilder) Items

func (b *WorkflowBuilder) Items(items map[string][]byte) *WorkflowBuilder

Items adds a map of key-value pairs to the Workflow struct.

func (*WorkflowBuilder) Steps

func (b *WorkflowBuilder) Steps(steps ...*api.WorkflowStep) *WorkflowBuilder

Steps adds a slice of *api.WorkflowStep values to the Workflow struct. Use StepToWorkStep to convert a Step interface implementation into a *api.WorkflowStep.

type WorkflowWatcher

type WorkflowWatcher interface {
	Next() (*api.WorkflowWatchResult, error)
}

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL