Documentation
¶
Index ¶
- Variables
- func EchoToAPI(echo Echo) *api.Echo
- func EntityToAPI(entity Entity) *api.Entity
- func ExtractFields(t any) []string
- func ExtractTypeField(t any) ([]byte, error)
- func GetTypePkgName(p reflect.Type) string
- func InjectTypeFields(t any, items, args map[string]string, entityData []byte) error
- func IsCancel(err error) bool
- func IsRetriedErr(err error) bool
- func Load(tps ...any)
- func NewOptions(opts ...Option) *api.WorkflowOption
- func SetTypeEntityField(t any, data []byte) error
- func StepToAPI(step Step) *api.Step
- func StepToWorkStep(step Step, worker string) *api.WorkflowStep
- type CallPack
- type Client
- func (c *Client) AbortWorkflow(ctx context.Context, wid string) error
- func (c *Client) Call(ctx context.Context, client, name string, data []byte, ...) ([]byte, error)
- func (c *Client) ExecuteWorkflow(ctx context.Context, spec *api.Workflow, watch bool) (WorkflowWatcher, error)
- func (c *Client) Id() string
- func (c *Client) InspectWorkflow(ctx context.Context, wid string) (*api.Workflow, error)
- func (c *Client) ListRegistry(ctx context.Context) ([]*api.Entity, []*api.Echo, []*api.Step, error)
- func (c *Client) ListWorkFlow(ctx context.Context) ([]*api.WorkflowSnapshot, error)
- func (c *Client) ListWorker(ctx context.Context) ([]*api.Worker, error)
- func (c *Client) NewSession() (*PipeSession, error)
- func (c *Client) NewWorkflow(opts ...Option) *WorkflowBuilder
- func (c *Client) PauseWorkflow(ctx context.Context, wid string) error
- func (c *Client) ResumeWorkflow(ctx context.Context, wid string) error
- func (c *Client) Step(ctx context.Context, name string, action api.StepAction, ...) ([]byte, error)
- func (c *Client) WatchWorkflow(ctx context.Context, wid string, opts ...vclient.CallOption) (WorkflowWatcher, error)
- type ClientConfig
- type ClientPipe
- type ClientStore
- type Echo
- type EchoSet
- type Empty
- type EmptyEcho
- type EmptyStep
- type Entity
- type EntitySet
- type MemberPipeHandler
- type MemberPipeStream
- type Option
- type Peer
- type PipeSession
- type PipeSessionCtx
- func (c *PipeSessionCtx) Call(ctx context.Context, target, echo string, data []byte, ...) ([]byte, error)
- func (c *PipeSessionCtx) Get(ctx context.Context, key string, opts ...vclient.CallOption) ([]byte, error)
- func (c *PipeSessionCtx) Put(ctx context.Context, key string, data any, opts ...vclient.CallOption) error
- func (c *PipeSessionCtx) Revision() *api.Revision
- func (c *PipeSessionCtx) Trace(ctx context.Context, text []byte, opts ...vclient.CallOption) error
- func (c *PipeSessionCtx) WorkflowID() string
- type PipeSet
- type PipeStream
- type RpcServer
- func (rs *RpcServer) AbortWorkflow(ctx context.Context, req *api.AbortWorkflowRequest, ...) error
- func (rs *RpcServer) Call(ctx context.Context, req *api.CallRequest, rsp *api.CallResponse) error
- func (rs *RpcServer) Id() string
- func (rs *RpcServer) InspectWorkflow(ctx context.Context, req *api.InspectWorkflowRequest, ...) error
- func (rs *RpcServer) ListRegistry(ctx context.Context, req *api.ListRegistryRequest, ...) error
- func (rs *RpcServer) ListWorker(ctx context.Context, req *api.ListWorkerRequest, rsp *api.ListWorkerResponse) error
- func (rs *RpcServer) ListWorkflow(ctx context.Context, req *api.ListWorkflowRequest, ...) error
- func (rs *RpcServer) PauseWorkflow(ctx context.Context, req *api.PauseWorkflowRequest, ...) error
- func (rs *RpcServer) Pipe(ctx context.Context, stream api.FlowRpc_PipeStream) error
- func (rs *RpcServer) Register(ctx context.Context, req *api.RegisterRequest, rsp *api.RegisterResponse) error
- func (rs *RpcServer) ResumeWorkflow(ctx context.Context, req *api.ResumeWorkflowRequest, ...) error
- func (rs *RpcServer) RunWorkflow(ctx context.Context, req *api.RunWorkflowRequest, ...) error
- func (rs *RpcServer) Step(ctx context.Context, req *api.StepRequest, rsp *api.StepResponse) error
- func (rs *RpcServer) StepGet(ctx context.Context, req *api.StepGetRequest, rsp *api.StepGetResponse) error
- func (rs *RpcServer) StepPut(ctx context.Context, req *api.StepPutRequest, rsp *api.StepPutResponse) error
- func (rs *RpcServer) StepTrace(ctx context.Context, req *api.StepTraceRequest, rsp *api.StepTraceResponse) error
- func (rs *RpcServer) Stop() error
- func (rs *RpcServer) WatchWorkflow(ctx context.Context, req *api.WatchWorkflowRequest, ...) error
- type Scheduler
- func (s *Scheduler) ExecuteWorkflow(w *api.Workflow, ps *PipeSet) error
- func (s *Scheduler) GetRegistry() (entities []*api.Entity, echoes []*api.Echo, steps []*api.Step)
- func (s *Scheduler) GetWorkers(ctx context.Context) ([]*api.Worker, error)
- func (s *Scheduler) GetWorkflow(wid string) (*Workflow, bool)
- func (s *Scheduler) InspectWorkflow(ctx context.Context, wid string) (*api.Workflow, error)
- func (s *Scheduler) IsClosed() bool
- func (s *Scheduler) Register(worker *api.Worker, entities []*api.Entity, echoes []*api.Echo, ...) error
- func (s *Scheduler) StepGet(ctx context.Context, wid, key string) ([]byte, error)
- func (s *Scheduler) StepPut(ctx context.Context, wid, key, value string) error
- func (s *Scheduler) StepTrace(ctx context.Context, wid, step string, text []byte) error
- func (s *Scheduler) Stop(wait bool)
- func (s *Scheduler) WatchWorkflow(ctx context.Context, wid string) (<-chan *api.WorkflowWatchResult, error)
- func (s *Scheduler) WorkflowSnapshots() []*api.WorkflowSnapshot
- type Step
- type StepPack
- type StepSet
- type Tag
- type TagKind
- type TestStep
- type Workflow
- func (w *Workflow) Abort()
- func (w *Workflow) Cancel()
- func (w *Workflow) Context() context.Context
- func (w *Workflow) Execute(ps *PipeSet, client *clientv3.Client)
- func (w *Workflow) ID() string
- func (w *Workflow) Init(client *clientv3.Client) (err error)
- func (w *Workflow) Inspect(ctx context.Context, client *clientv3.Client) (*api.Workflow, error)
- func (w *Workflow) IsAbort() bool
- func (w *Workflow) IsStop() bool
- func (w *Workflow) NewSnapshot() *api.WorkflowSnapshot
- func (w *Workflow) NewWatcher(ctx context.Context, client *clientv3.Client) (<-chan *api.WorkflowWatchResult, error)
- func (w *Workflow) Pause() bool
- func (w *Workflow) Resume() bool
- type WorkflowBuilder
- type WorkflowStepBuilder
- type WorkflowWatcher
Constants ¶
This section is empty.
Variables ¶
Functions ¶
func EchoToAPI ¶
EchoToAPI returns a new instance *api.Echo based on the specified Echo interface implementation.
func EntityToAPI ¶
EntityToAPI returns a new instance *api.Entity based on the specified Entity interface implementation.
func ExtractFields ¶
func ExtractTypeField ¶
func GetTypePkgName ¶
GetTypePkgName returns the package path and kind for object based on reflect.Type.
func InjectTypeFields ¶
func IsRetriedErr ¶
IsRetriedErr returns boolean value if the error is specified code.
func NewOptions ¶
func NewOptions(opts ...Option) *api.WorkflowOption
func SetTypeEntityField ¶
func StepToAPI ¶
StepToAPI returns a new instance *api.Step based on the specified Step interface implementation.
func StepToWorkStep ¶
func StepToWorkStep(step Step, worker string) *api.WorkflowStep
StepToWorkStep returns a new instance *api.WorkflowStep based on the specified Step interface implementation.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) AbortWorkflow ¶
func (*Client) ExecuteWorkflow ¶ added in v0.5.0
func (*Client) InspectWorkflow ¶
func (*Client) ListRegistry ¶ added in v0.2.0
func (*Client) ListWorkFlow ¶
func (*Client) ListWorker ¶ added in v0.2.0
func (*Client) NewSession ¶
func (c *Client) NewSession() (*PipeSession, error)
func (*Client) NewWorkflow ¶ added in v0.5.0
func (c *Client) NewWorkflow(opts ...Option) *WorkflowBuilder
func (*Client) PauseWorkflow ¶
func (*Client) ResumeWorkflow ¶
func (*Client) WatchWorkflow ¶
func (c *Client) WatchWorkflow(ctx context.Context, wid string, opts ...vclient.CallOption) (WorkflowWatcher, error)
type ClientConfig ¶
type ClientConfig struct {
// contains filtered or unexported fields
}
func NewConfig ¶
func NewConfig(name, id, address string) ClientConfig
func (*ClientConfig) WithConn ¶ added in v0.2.2
func (c *ClientConfig) WithConn(conn vclient.Client) *ClientConfig
func (*ClientConfig) WithHeartbeat ¶ added in v0.2.2
func (c *ClientConfig) WithHeartbeat(t time.Duration) *ClientConfig
func (*ClientConfig) WithStore ¶ added in v0.4.3
func (c *ClientConfig) WithStore(store *ClientStore) *ClientConfig
func (*ClientConfig) WithTimeout ¶ added in v0.2.2
func (c *ClientConfig) WithTimeout(t time.Duration) *ClientConfig
type ClientPipe ¶
type ClientPipe struct { Id string // contains filtered or unexported fields }
func NewPipe ¶
func NewPipe(id string, pr *Peer, stream PipeStream) *ClientPipe
func (*ClientPipe) Close ¶
func (p *ClientPipe) Close()
func (*ClientPipe) Start ¶
func (p *ClientPipe) Start()
type ClientStore ¶
type ClientStore struct {
// contains filtered or unexported fields
}
func NewClientStore ¶
func NewClientStore() *ClientStore
func (*ClientStore) Load ¶ added in v0.4.3
func (s *ClientStore) Load(ts ...any)
type Echo ¶
type Echo interface { // Owner 所属 Entity 信息 Owner() reflect.Type Call(ctx context.Context, data []byte) ([]byte, error) // String Echo 描述信息 String() string }
Echo 描述一个具体的请求
type EchoSet ¶
func NewEchoSet ¶
func NewEchoSet() *EchoSet
type Empty ¶
type Empty struct {
Name string `json:"name"`
}
func (*Empty) OwnerReferences ¶
func (e *Empty) OwnerReferences() []*api.OwnerReference
type EmptyStep ¶
type EmptyStep struct{}
func (*EmptyStep) Cancel ¶
func (s *EmptyStep) Cancel(ctx *PipeSessionCtx) error
func (*EmptyStep) Commit ¶
func (s *EmptyStep) Commit(ctx *PipeSessionCtx) error
func (*EmptyStep) Prepare ¶
func (s *EmptyStep) Prepare(ctx *PipeSessionCtx) error
func (*EmptyStep) Rollback ¶
func (s *EmptyStep) Rollback(ctx *PipeSessionCtx) error
type Entity ¶
type Entity interface { // OwnerReferences Entity 之间的依赖信息 OwnerReferences() []*api.OwnerReference // Marshal Entity 序列化 Marshal() ([]byte, error) // Unmarshal Entity 反序列化 Unmarshal(data []byte) error // String Entity 说明 String() string }
Entity 描述工作流中的具体资源,是工作流中的执行单元
type EntitySet ¶
func NewEntitySet ¶
func NewEntitySet() *EntitySet
type MemberPipeHandler ¶
type MemberPipeHandler func(*api.PipeResponse) (*api.PipeRequest, error)
type MemberPipeStream ¶
type MemberPipeStream struct {
// contains filtered or unexported fields
}
func NewMemberPipeStream ¶
func NewMemberPipeStream(ctx context.Context, id string, handler MemberPipeHandler) *MemberPipeStream
func (*MemberPipeStream) Close ¶
func (s *MemberPipeStream) Close() error
func (*MemberPipeStream) Context ¶
func (s *MemberPipeStream) Context() context.Context
func (*MemberPipeStream) Recv ¶
func (s *MemberPipeStream) Recv() (*api.PipeRequest, error)
func (*MemberPipeStream) Send ¶
func (s *MemberPipeStream) Send(req *api.PipeResponse) error
type Option ¶
type Option func(option *api.WorkflowOption)
Option represents a configuration option for Workflow struct. It provides a way to modify the fields of the Workflow struct a flexible ways.
func WithMaxRetry ¶
WithMaxRetry sets the MaxRetries field of *api.WorkflowOption to the specified value.
func WithMode ¶
func WithMode(mode api.WorkflowMode) Option
WithMode sets the Mode field of *api.WorkflowOption to the specified value.
type PipeSession ¶
type PipeSession struct {
// contains filtered or unexported fields
}
func NewPipeSession ¶
func NewPipeSession(c *Client) (*PipeSession, error)
func (*PipeSession) Close ¶
func (s *PipeSession) Close()
func (*PipeSession) IsConnected ¶
func (s *PipeSession) IsConnected() bool
type PipeSessionCtx ¶
func NewSessionCtx ¶
func (*PipeSessionCtx) Call ¶
func (c *PipeSessionCtx) Call(ctx context.Context, target, echo string, data []byte, opts ...vclient.CallOption) ([]byte, error)
func (*PipeSessionCtx) Get ¶
func (c *PipeSessionCtx) Get(ctx context.Context, key string, opts ...vclient.CallOption) ([]byte, error)
func (*PipeSessionCtx) Put ¶
func (c *PipeSessionCtx) Put(ctx context.Context, key string, data any, opts ...vclient.CallOption) error
func (*PipeSessionCtx) Revision ¶
func (c *PipeSessionCtx) Revision() *api.Revision
func (*PipeSessionCtx) Trace ¶
func (c *PipeSessionCtx) Trace(ctx context.Context, text []byte, opts ...vclient.CallOption) error
func (*PipeSessionCtx) WorkflowID ¶
func (c *PipeSessionCtx) WorkflowID() string
type PipeSet ¶
func NewPipeSet ¶
func NewPipeSet() *PipeSet
func (*PipeSet) Add ¶
func (ps *PipeSet) Add(p *ClientPipe)
func (*PipeSet) Del ¶
func (ps *PipeSet) Del(p *ClientPipe)
type PipeStream ¶
type PipeStream interface { Context() context.Context Send(*api.PipeResponse) error Recv() (*api.PipeRequest, error) Close() error }
type RpcServer ¶
type RpcServer struct {
// contains filtered or unexported fields
}
func (*RpcServer) AbortWorkflow ¶
func (rs *RpcServer) AbortWorkflow(ctx context.Context, req *api.AbortWorkflowRequest, rsp *api.AbortWorkflowResponse) error
func (*RpcServer) Call ¶
func (rs *RpcServer) Call(ctx context.Context, req *api.CallRequest, rsp *api.CallResponse) error
func (*RpcServer) InspectWorkflow ¶
func (rs *RpcServer) InspectWorkflow(ctx context.Context, req *api.InspectWorkflowRequest, rsp *api.InspectWorkflowResponse) error
func (*RpcServer) ListRegistry ¶ added in v0.2.0
func (rs *RpcServer) ListRegistry(ctx context.Context, req *api.ListRegistryRequest, rsp *api.ListRegistryResponse) error
func (*RpcServer) ListWorker ¶ added in v0.2.0
func (rs *RpcServer) ListWorker(ctx context.Context, req *api.ListWorkerRequest, rsp *api.ListWorkerResponse) error
func (*RpcServer) ListWorkflow ¶
func (rs *RpcServer) ListWorkflow(ctx context.Context, req *api.ListWorkflowRequest, rsp *api.ListWorkflowResponse) error
func (*RpcServer) PauseWorkflow ¶
func (rs *RpcServer) PauseWorkflow(ctx context.Context, req *api.PauseWorkflowRequest, rsp *api.PauseWorkflowResponse) error
func (*RpcServer) Register ¶
func (rs *RpcServer) Register(ctx context.Context, req *api.RegisterRequest, rsp *api.RegisterResponse) error
func (*RpcServer) ResumeWorkflow ¶
func (rs *RpcServer) ResumeWorkflow(ctx context.Context, req *api.ResumeWorkflowRequest, rsp *api.ResumeWorkflowResponse) error
func (*RpcServer) RunWorkflow ¶
func (rs *RpcServer) RunWorkflow(ctx context.Context, req *api.RunWorkflowRequest, stream api.FlowRpc_RunWorkflowStream) error
func (*RpcServer) Step ¶
func (rs *RpcServer) Step(ctx context.Context, req *api.StepRequest, rsp *api.StepResponse) error
func (*RpcServer) StepGet ¶
func (rs *RpcServer) StepGet(ctx context.Context, req *api.StepGetRequest, rsp *api.StepGetResponse) error
func (*RpcServer) StepPut ¶
func (rs *RpcServer) StepPut(ctx context.Context, req *api.StepPutRequest, rsp *api.StepPutResponse) error
func (*RpcServer) StepTrace ¶
func (rs *RpcServer) StepTrace(ctx context.Context, req *api.StepTraceRequest, rsp *api.StepTraceResponse) error
func (*RpcServer) WatchWorkflow ¶
func (rs *RpcServer) WatchWorkflow(ctx context.Context, req *api.WatchWorkflowRequest, stream api.FlowRpc_WatchWorkflowStream) error
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶
func (*Scheduler) ExecuteWorkflow ¶
func (*Scheduler) GetRegistry ¶ added in v0.2.0
func (*Scheduler) GetWorkers ¶ added in v0.2.0
func (*Scheduler) InspectWorkflow ¶
func (*Scheduler) WatchWorkflow ¶
func (*Scheduler) WorkflowSnapshots ¶
func (s *Scheduler) WorkflowSnapshots() []*api.WorkflowSnapshot
type Step ¶
type Step interface { // Owner Step 所属 Entity Owner() reflect.Type Prepare(ctx *PipeSessionCtx) error Commit(ctx *PipeSessionCtx) error Rollback(ctx *PipeSessionCtx) error Cancel(ctx *PipeSessionCtx) error // String Step 描述信息 String() string }
Step 表示具有原子性的复杂操作
type StepSet ¶
func NewStepSet ¶
func NewStepSet() *StepSet
type TestStep ¶ added in v0.6.0
type TestStep struct { E *Empty `flow:"entity"` A string `flow:"ctx:a"` B int32 `flow:"ctx:b"` C string `flow:"ctx:c"` ArgsA string `flow:"args:a"` // contains filtered or unexported fields }
func (*TestStep) Cancel ¶ added in v0.6.0
func (s *TestStep) Cancel(ctx *PipeSessionCtx) error
func (*TestStep) Commit ¶ added in v0.6.0
func (s *TestStep) Commit(ctx *PipeSessionCtx) error
func (*TestStep) Prepare ¶ added in v0.6.0
func (s *TestStep) Prepare(ctx *PipeSessionCtx) error
func (*TestStep) Rollback ¶ added in v0.6.0
func (s *TestStep) Rollback(ctx *PipeSessionCtx) error
type Workflow ¶
type Workflow struct { // protects for api.Workflow and snapshot sync.RWMutex // contains filtered or unexported fields }
func NewWorkflow ¶
func (*Workflow) NewSnapshot ¶
func (w *Workflow) NewSnapshot() *api.WorkflowSnapshot
func (*Workflow) NewWatcher ¶
type WorkflowBuilder ¶
type WorkflowBuilder struct {
// contains filtered or unexported fields
}
WorkflowBuilder the builder pattern is used to separate the construction of a complex object of its representation so that the same construction process can create different representations.
func NewBuilder ¶
func NewBuilder(opts ...Option) *WorkflowBuilder
NewBuilder returns a new instance of the WorkflowBuilder struct. The builder can be used to construct a Workflow struct with specific options.
func (*WorkflowBuilder) Build ¶
func (b *WorkflowBuilder) Build() *api.Workflow
Build returns a new Workflow struct based on the current configuration of the WorkflowBuilder.
func (*WorkflowBuilder) Entities ¶
func (b *WorkflowBuilder) Entities(entities ...Entity) *WorkflowBuilder
Entities adds a slice of Entity interface implementations to Workflow struct.
func (*WorkflowBuilder) Items ¶
func (b *WorkflowBuilder) Items(items map[string]string) *WorkflowBuilder
Items adds a map of key-value pairs to the Workflow struct.
func (*WorkflowBuilder) Steps ¶
func (b *WorkflowBuilder) Steps(steps ...*api.WorkflowStep) *WorkflowBuilder
Steps adds a slice of Step interface implementations to the Workflow struct.
type WorkflowStepBuilder ¶ added in v0.7.0
type WorkflowStepBuilder struct {
// contains filtered or unexported fields
}
WorkflowStepBuilder the builder pattern is used to separate the construction of a complex object of its representation so that the same construction process can create different representations.
func NewStepBuilder ¶ added in v0.7.0
func NewStepBuilder(step Step, worker string) *WorkflowStepBuilder
func (*WorkflowStepBuilder) Arg ¶ added in v0.7.0
func (b *WorkflowStepBuilder) Arg(k string, v any) *WorkflowStepBuilder
func (*WorkflowStepBuilder) Args ¶ added in v0.7.0
func (b *WorkflowStepBuilder) Args(args map[string]any) *WorkflowStepBuilder
func (*WorkflowStepBuilder) Build ¶ added in v0.7.0
func (b *WorkflowStepBuilder) Build() *api.WorkflowStep
func (*WorkflowStepBuilder) ID ¶ added in v0.7.0
func (b *WorkflowStepBuilder) ID(id string) *WorkflowStepBuilder
type WorkflowWatcher ¶
type WorkflowWatcher interface {
Next() (*api.WorkflowWatchResult, error)
}
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
protoc-gen-flow
protoc-gen-vine is a plugin for the Google protocol buffer compiler to generate Go code.
|
protoc-gen-vine is a plugin for the Google protocol buffer compiler to generate Go code. |
examples
|
|