Documentation
¶
Index ¶
- Constants
- Variables
- func EchoToAPI(echo Echo) *api.Echo
- func EntityToAPI(entity Entity) *api.Entity
- func ExtractFields(t any) []string
- func ExtractTypeField(t any) ([]byte, error)
- func GetTypePkgName(p reflect.Type) string
- func InjectTypeFields(t any, items map[string][]byte, entityData []byte) error
- func IsCancel(err error) bool
- func IsRetriedErr(err error) bool
- func Load(tps ...any)
- func NewOptions(opts ...Option) *api.WorkflowOption
- func SetTypeEntityField(t any, data []byte) error
- func StepToAPI(step Step) *api.Step
- func StepToWorkStep(step Step) *api.WorkflowStep
- type CallPack
- type Client
- func (c *Client) AbortWorkflow(ctx context.Context, wid string) error
- func (c *Client) Call(ctx context.Context, client, name string, data []byte) ([]byte, error)
- func (c *Client) Id() string
- func (c *Client) InspectWorkflow(ctx context.Context, wid string) (*api.Workflow, error)
- func (c *Client) ListRegistry(ctx context.Context) ([]*api.Entity, []*api.Echo, []*api.Step, error)
- func (c *Client) ListWorkFlow(ctx context.Context) ([]*api.WorkflowSnapshot, error)
- func (c *Client) ListWorker(ctx context.Context) ([]*api.Worker, error)
- func (c *Client) NewSession() (*PipeSession, error)
- func (c *Client) PauseWorkflow(ctx context.Context, wid string) error
- func (c *Client) ResumeWorkflow(ctx context.Context, wid string) error
- func (c *Client) Step(ctx context.Context, name string, action api.StepAction, ...) ([]byte, error)
- func (c *Client) WatchWorkflow(ctx context.Context, wid string) (WorkflowWatcher, error)
- type ClientConfig
- type ClientPipe
- type ClientStore
- type Echo
- type EchoSet
- type Empty
- type EmptyEcho
- type EmptyStep
- type Entity
- type EntitySet
- type MemberPipeHandler
- type MemberPipeStream
- type Option
- type Peer
- type PipeSession
- type PipeSessionCtx
- func (c *PipeSessionCtx) Call(ctx context.Context, data []byte) ([]byte, error)
- func (c *PipeSessionCtx) Get(ctx context.Context, key string) ([]byte, error)
- func (c *PipeSessionCtx) Put(ctx context.Context, key string, data any) error
- func (c *PipeSessionCtx) Revision() *api.Revision
- func (c *PipeSessionCtx) Trace(ctx context.Context, text []byte) error
- func (c *PipeSessionCtx) WorkflowID() string
- type PipeSet
- type PipeStream
- type RpcServer
- func (rs *RpcServer) AbortWorkflow(ctx context.Context, req *api.AbortWorkflowRequest, ...) error
- func (rs *RpcServer) Call(ctx context.Context, req *api.CallRequest, rsp *api.CallResponse) error
- func (rs *RpcServer) Id() string
- func (rs *RpcServer) InspectWorkflow(ctx context.Context, req *api.InspectWorkflowRequest, ...) error
- func (rs *RpcServer) ListRegistry(ctx context.Context, req *api.ListRegistryRequest, ...) error
- func (rs *RpcServer) ListWorker(ctx context.Context, req *api.ListWorkerRequest, rsp *api.ListWorkerResponse) error
- func (rs *RpcServer) ListWorkflow(ctx context.Context, req *api.ListWorkflowRequest, ...) error
- func (rs *RpcServer) PauseWorkflow(ctx context.Context, req *api.PauseWorkflowRequest, ...) error
- func (rs *RpcServer) Pipe(ctx context.Context, stream api.FlowRpc_PipeStream) error
- func (rs *RpcServer) Register(ctx context.Context, req *api.RegisterRequest, rsp *api.RegisterResponse) error
- func (rs *RpcServer) ResumeWorkflow(ctx context.Context, req *api.ResumeWorkflowRequest, ...) error
- func (rs *RpcServer) RunWorkflow(ctx context.Context, req *api.RunWorkflowRequest, ...) error
- func (rs *RpcServer) Step(ctx context.Context, req *api.StepRequest, rsp *api.StepResponse) error
- func (rs *RpcServer) StepGet(ctx context.Context, req *api.StepGetRequest, rsp *api.StepGetResponse) error
- func (rs *RpcServer) StepPut(ctx context.Context, req *api.StepPutRequest, rsp *api.StepPutResponse) error
- func (rs *RpcServer) StepTrace(ctx context.Context, req *api.StepTraceRequest, rsp *api.StepTraceResponse) error
- func (rs *RpcServer) Stop() error
- func (rs *RpcServer) WatchWorkflow(ctx context.Context, req *api.WatchWorkflowRequest, ...) error
- type Scheduler
- func (s *Scheduler) ExecuteWorkflow(w *api.Workflow, ps *PipeSet) error
- func (s *Scheduler) GetRegistry() (entities []*api.Entity, echoes []*api.Echo, steps []*api.Step)
- func (s *Scheduler) GetWorkers(ctx context.Context) ([]string, error)
- func (s *Scheduler) GetWorkflow(wid string) (*Workflow, bool)
- func (s *Scheduler) InspectWorkflow(ctx context.Context, wid string) (*api.Workflow, error)
- func (s *Scheduler) IsClosed() bool
- func (s *Scheduler) Register(worker string, entities []*api.Entity, echoes []*api.Echo, steps []*api.Step) error
- func (s *Scheduler) StepGet(ctx context.Context, wid, key string) ([]byte, error)
- func (s *Scheduler) StepPut(ctx context.Context, wid, key, value string) error
- func (s *Scheduler) StepTrace(ctx context.Context, wid, step string, text []byte) error
- func (s *Scheduler) Stop(wait bool)
- func (s *Scheduler) WatchWorkflow(ctx context.Context, wid string) (<-chan *api.WorkflowWatchResult, error)
- func (s *Scheduler) WorkflowSnapshots() []*api.WorkflowSnapshot
- type Step
- type StepPack
- type StepSet
- type Tag
- type Workflow
- func (w *Workflow) Abort()
- func (w *Workflow) Cancel()
- func (w *Workflow) Context() context.Context
- func (w *Workflow) Execute(ps *PipeSet, client *clientv3.Client)
- func (w *Workflow) ID() string
- func (w *Workflow) Init(client *clientv3.Client) (err error)
- func (w *Workflow) Inspect(ctx context.Context, client *clientv3.Client) (*api.Workflow, error)
- func (w *Workflow) IsAbort() bool
- func (w *Workflow) IsStop() bool
- func (w *Workflow) NewSnapshot() *api.WorkflowSnapshot
- func (w *Workflow) NewWatcher(ctx context.Context, client *clientv3.Client) (<-chan *api.WorkflowWatchResult, error)
- func (w *Workflow) Pause() bool
- func (w *Workflow) Resume() bool
- type WorkflowBuilder
- type WorkflowWatcher
Constants ¶
const ( EntityUnique = "flow-entity-unique" EntityKind = "flow-entity-kind" EntityDesc = "flow-entity-desc" EntityWorker = "flow-entity-worker" EntityEndpoint = "flow-entity-endpoint" )
the metadata of Entity
const ( EchoName = "flow-echo-name" EchoID = "flow-echo-id" EchoDesc = "flow-echo-desc" EchoOwner = "flow-echo-owner" EchoWorker = "flow-echo-worker" EchoEndpoint = "flow-echo-endpoint" )
the metadata of echo
const ( StepName = "flow-step-name" StepId = "flow-step-id" StepDesc = "flow-step-desc" StepOwner = "flow-step-owner" StepWorker = "flow-step-worker" StepEndpoint = "flow-step-endpoint" )
the metadata of Step
Variables ¶
Functions ¶
func EchoToAPI ¶
EchoToAPI returns a new instance *api.Echo based on the specified Echo interface implementation.
func EntityToAPI ¶
EntityToAPI returns a new instance *api.Entity based on the specified Entity interface implementation.
func ExtractFields ¶
func ExtractTypeField ¶
func GetTypePkgName ¶
GetTypePkgName returns the package path and kind for object based on reflect.Type.
func InjectTypeFields ¶
func IsRetriedErr ¶
IsRetriedErr returns boolean value if the error is specified code.
func NewOptions ¶
func NewOptions(opts ...Option) *api.WorkflowOption
func SetTypeEntityField ¶
func StepToAPI ¶
StepToAPI returns a new instance *api.Step based on the specified Step interface implementation.
func StepToWorkStep ¶
func StepToWorkStep(step Step) *api.WorkflowStep
StepToWorkStep returns a new instance *api.WorkflowStep based on the specified Step interface implementation.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func NewClient ¶
func NewClient(cfg ClientConfig) (*Client, error)
func (*Client) AbortWorkflow ¶
func (*Client) InspectWorkflow ¶
func (*Client) ListRegistry ¶ added in v0.2.0
func (*Client) ListWorkFlow ¶
func (*Client) ListWorker ¶ added in v0.2.0
func (*Client) NewSession ¶
func (c *Client) NewSession() (*PipeSession, error)
func (*Client) PauseWorkflow ¶
func (*Client) ResumeWorkflow ¶
func (*Client) WatchWorkflow ¶
type ClientConfig ¶
type ClientConfig struct {
// contains filtered or unexported fields
}
func NewConfig ¶
func NewConfig(name, id, address string) ClientConfig
type ClientPipe ¶
type ClientPipe struct { Id string // contains filtered or unexported fields }
func NewPipe ¶
func NewPipe(id string, pr *Peer, stream PipeStream) *ClientPipe
func (*ClientPipe) Close ¶
func (p *ClientPipe) Close()
func (*ClientPipe) Start ¶
func (p *ClientPipe) Start()
type ClientStore ¶
type ClientStore struct {
// contains filtered or unexported fields
}
func NewClientStore ¶
func NewClientStore() *ClientStore
type Echo ¶
type Echo interface { Metadata() map[string]string Call(ctx context.Context, req []byte) ([]byte, error) }
Echo 描述一个具体的请求
type EchoSet ¶
func NewEchoSet ¶
func NewEchoSet() *EchoSet
type Empty ¶
type Empty struct {
Name string `json:"name"`
}
func (*Empty) OwnerReferences ¶
func (e *Empty) OwnerReferences() []*api.OwnerReference
type EmptyStep ¶
type EmptyStep struct { Client string E *Empty `flow:"entity"` A string `flow:"name:a"` B int32 `flow:"name:b"` C string `flow:"name:c"` }
func NewEmptyStep ¶
func (*EmptyStep) Cancel ¶
func (s *EmptyStep) Cancel(ctx *PipeSessionCtx) error
func (*EmptyStep) Commit ¶
func (s *EmptyStep) Commit(ctx *PipeSessionCtx) error
func (*EmptyStep) Prepare ¶
func (s *EmptyStep) Prepare(ctx *PipeSessionCtx) error
func (*EmptyStep) Rollback ¶
func (s *EmptyStep) Rollback(ctx *PipeSessionCtx) error
type Entity ¶
type Entity interface { // Metadata Entity 属性信息 Metadata() map[string]string // OwnerReferences Entity 之间的依赖信息 OwnerReferences() []*api.OwnerReference // Marshal Entity 序列化 Marshal() ([]byte, error) // Unmarshal Entity 反序列化 Unmarshal(data []byte) error }
Entity 描述工作流中的具体资源,是工作流中的执行单元
type EntitySet ¶
func NewEntitySet ¶
func NewEntitySet() *EntitySet
type MemberPipeHandler ¶
type MemberPipeHandler func(*api.PipeResponse) (*api.PipeRequest, error)
type MemberPipeStream ¶
type MemberPipeStream struct {
// contains filtered or unexported fields
}
func NewMemberPipeStream ¶
func NewMemberPipeStream(ctx context.Context, id string, handler MemberPipeHandler) *MemberPipeStream
func (*MemberPipeStream) Close ¶
func (s *MemberPipeStream) Close() error
func (*MemberPipeStream) Context ¶
func (s *MemberPipeStream) Context() context.Context
func (*MemberPipeStream) Recv ¶
func (s *MemberPipeStream) Recv() (*api.PipeRequest, error)
func (*MemberPipeStream) Send ¶
func (s *MemberPipeStream) Send(req *api.PipeResponse) error
type Option ¶
type Option func(option *api.WorkflowOption)
Option represents a configuration option for Workflow struct. It provides a way to modify the fields of the Workflow struct a flexible ways.
func WithMaxRetry ¶
WithMaxRetry sets the MaxRetries field of *api.WorkflowOption to the specified value.
func WithMode ¶
func WithMode(mode api.WorkflowMode) Option
WithMode sets the Mode field of *api.WorkflowOption to the specified value.
type PipeSession ¶
type PipeSession struct {
// contains filtered or unexported fields
}
func NewPipeSession ¶
func NewPipeSession(c *Client) (*PipeSession, error)
func (*PipeSession) Close ¶
func (s *PipeSession) Close()
func (*PipeSession) ExecuteWorkflow ¶
func (s *PipeSession) ExecuteWorkflow(ctx context.Context, spec *api.Workflow, watch bool) (WorkflowWatcher, error)
func (*PipeSession) IsConnected ¶
func (s *PipeSession) IsConnected() bool
func (*PipeSession) NewWorkflow ¶
func (s *PipeSession) NewWorkflow(opts ...Option) *WorkflowBuilder
type PipeSessionCtx ¶
func NewSessionCtx ¶
func (*PipeSessionCtx) Revision ¶
func (c *PipeSessionCtx) Revision() *api.Revision
func (*PipeSessionCtx) Trace ¶
func (c *PipeSessionCtx) Trace(ctx context.Context, text []byte) error
func (*PipeSessionCtx) WorkflowID ¶
func (c *PipeSessionCtx) WorkflowID() string
type PipeSet ¶
func NewPipeSet ¶
func NewPipeSet() *PipeSet
func (*PipeSet) Add ¶
func (ps *PipeSet) Add(p *ClientPipe)
func (*PipeSet) Del ¶
func (ps *PipeSet) Del(p *ClientPipe)
type PipeStream ¶
type PipeStream interface { Context() context.Context Send(*api.PipeResponse) error Recv() (*api.PipeRequest, error) Close() error }
type RpcServer ¶
type RpcServer struct {
// contains filtered or unexported fields
}
func (*RpcServer) AbortWorkflow ¶
func (rs *RpcServer) AbortWorkflow(ctx context.Context, req *api.AbortWorkflowRequest, rsp *api.AbortWorkflowResponse) error
func (*RpcServer) Call ¶
func (rs *RpcServer) Call(ctx context.Context, req *api.CallRequest, rsp *api.CallResponse) error
func (*RpcServer) InspectWorkflow ¶
func (rs *RpcServer) InspectWorkflow(ctx context.Context, req *api.InspectWorkflowRequest, rsp *api.InspectWorkflowResponse) error
func (*RpcServer) ListRegistry ¶ added in v0.2.0
func (rs *RpcServer) ListRegistry(ctx context.Context, req *api.ListRegistryRequest, rsp *api.ListRegistryResponse) error
func (*RpcServer) ListWorker ¶ added in v0.2.0
func (rs *RpcServer) ListWorker(ctx context.Context, req *api.ListWorkerRequest, rsp *api.ListWorkerResponse) error
func (*RpcServer) ListWorkflow ¶
func (rs *RpcServer) ListWorkflow(ctx context.Context, req *api.ListWorkflowRequest, rsp *api.ListWorkflowResponse) error
func (*RpcServer) PauseWorkflow ¶
func (rs *RpcServer) PauseWorkflow(ctx context.Context, req *api.PauseWorkflowRequest, rsp *api.PauseWorkflowResponse) error
func (*RpcServer) Register ¶
func (rs *RpcServer) Register(ctx context.Context, req *api.RegisterRequest, rsp *api.RegisterResponse) error
func (*RpcServer) ResumeWorkflow ¶
func (rs *RpcServer) ResumeWorkflow(ctx context.Context, req *api.ResumeWorkflowRequest, rsp *api.ResumeWorkflowResponse) error
func (*RpcServer) RunWorkflow ¶
func (rs *RpcServer) RunWorkflow(ctx context.Context, req *api.RunWorkflowRequest, stream api.FlowRpc_RunWorkflowStream) error
func (*RpcServer) Step ¶
func (rs *RpcServer) Step(ctx context.Context, req *api.StepRequest, rsp *api.StepResponse) error
func (*RpcServer) StepGet ¶
func (rs *RpcServer) StepGet(ctx context.Context, req *api.StepGetRequest, rsp *api.StepGetResponse) error
func (*RpcServer) StepPut ¶
func (rs *RpcServer) StepPut(ctx context.Context, req *api.StepPutRequest, rsp *api.StepPutResponse) error
func (*RpcServer) StepTrace ¶
func (rs *RpcServer) StepTrace(ctx context.Context, req *api.StepTraceRequest, rsp *api.StepTraceResponse) error
func (*RpcServer) WatchWorkflow ¶
func (rs *RpcServer) WatchWorkflow(ctx context.Context, req *api.WatchWorkflowRequest, stream api.FlowRpc_WatchWorkflowStream) error
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶
func (*Scheduler) ExecuteWorkflow ¶
func (*Scheduler) GetRegistry ¶ added in v0.2.0
func (*Scheduler) GetWorkers ¶ added in v0.2.0
func (*Scheduler) InspectWorkflow ¶
func (*Scheduler) WatchWorkflow ¶
func (*Scheduler) WorkflowSnapshots ¶
func (s *Scheduler) WorkflowSnapshots() []*api.WorkflowSnapshot
type Step ¶
type Step interface { Metadata() map[string]string Prepare(ctx *PipeSessionCtx) error Commit(ctx *PipeSessionCtx) error Rollback(ctx *PipeSessionCtx) error Cancel(ctx *PipeSessionCtx) error }
Step 表示具有原子性的复杂操作
type StepSet ¶
func NewStepSet ¶
func NewStepSet() *StepSet
type Workflow ¶
type Workflow struct { // protects for api.Workflow and snapshot sync.RWMutex // contains filtered or unexported fields }
func NewWorkflow ¶
func (*Workflow) NewSnapshot ¶
func (w *Workflow) NewSnapshot() *api.WorkflowSnapshot
func (*Workflow) NewWatcher ¶
type WorkflowBuilder ¶
type WorkflowBuilder struct {
// contains filtered or unexported fields
}
WorkflowBuilder the builder pattern is used to separate the construction of a complex object of its representation so that the same construction process can create different representations.
func NewBuilder ¶
func NewBuilder(opts ...Option) *WorkflowBuilder
NewBuilder returns a new instance of the WorkflowBuilder struct. The builder can be used to construct a Workflow struct with specific options.
func (*WorkflowBuilder) Build ¶
func (b *WorkflowBuilder) Build() *api.Workflow
Build returns a new Workflow struct based on the current configuration of the WorkflowBuilder.
func (*WorkflowBuilder) Entities ¶
func (b *WorkflowBuilder) Entities(entities ...Entity) *WorkflowBuilder
Entities adds a slice of Entity interface implementations to Workflow struct.
func (*WorkflowBuilder) Items ¶
func (b *WorkflowBuilder) Items(items map[string][]byte) *WorkflowBuilder
Items adds a map of key-value pairs to the Workflow struct.
func (*WorkflowBuilder) Steps ¶
func (b *WorkflowBuilder) Steps(steps ...*api.WorkflowStep) *WorkflowBuilder
Steps adds a slice of Step interface implementations to the Workflow struct.
type WorkflowWatcher ¶
type WorkflowWatcher interface {
Next() (*api.WorkflowWatchResult, error)
}