workflow

package
v0.0.4
Published: Nov 7, 2024 License: BSD-3-Clause Imports: 26 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Execute

func Execute(name string, gid string, data []byte) error

Execute is the same as ExecuteCtx, but uses context.Background and returns only the error.

Deprecated: use ExecuteCtx instead.

func Execute2

func Execute2(name string, gid string, data []byte) ([]byte, error)

Execute2 is the same as Execute, but the workflow function can return a result.

Deprecated: use ExecuteCtx instead.

func ExecuteByQS

func ExecuteByQS(qs url.Values, body []byte) error

ExecuteByQS is like Execute, but name and gid are obtained from qs.

Deprecated: use ExecuteCtx instead.

func ExecuteCtx

func ExecuteCtx(ctx context.Context, name string, gid string, data []byte) ([]byte, error)

ExecuteCtx executes the workflow registered under name with the given gid and data. If no workflow with that gid exists, a new workflow is created and executed. If a workflow with that gid already exists, its execution is resumed.
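The create-or-resume behavior keyed by gid can be illustrated with a small standard-library sketch. It is not the package's implementation: the `progress` type, the `run` method, and `errTemporary` are hypothetical names, and the in-memory map stands in for state that a real engine would keep outside the process.

```go
package main

import (
	"errors"
	"fmt"
)

// errTemporary is a hypothetical error used to simulate an interrupted run.
var errTemporary = errors.New("temporary failure")

// progress is a hypothetical in-memory stand-in for per-gid workflow state:
// it maps a gid to the number of steps already completed.
type progress map[string]int

// run executes steps for gid, starting after the last recorded step.
// An unknown gid starts at step 0 (the "create" case); a known gid
// continues where it stopped (the "resume" case).
func (p progress) run(gid string, steps []func() error) error {
	for i := p[gid]; i < len(steps); i++ {
		if err := steps[i](); err != nil {
			return fmt.Errorf("gid %s, step %d: %w", gid, i, err)
		}
		p[gid] = i + 1 // record the step so a later call skips it
	}
	return nil
}

func main() {
	firstStepRuns := 0
	failOnce := true
	steps := []func() error{
		func() error { firstStepRuns++; return nil },
		func() error {
			if failOnce {
				failOnce = false
				return errTemporary
			}
			return nil
		},
	}

	p := progress{}
	fmt.Println("first call:", p.run("gid-1", steps))  // creates, stops at step 1
	fmt.Println("second call:", p.run("gid-1", steps)) // resumes at step 1
	fmt.Println("step 0 executions:", firstStepRuns)
}
```

The second call with the same gid does not repeat step 0, which is the property that makes it safe to call again after a timeout or crash.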

func GrpcError2DtmError

func GrpcError2DtmError(err error) error

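GrpcError2DtmError translates a gRPC error to a dtm error. As noted under Options, code Aborted maps to ErrFailure and code FailedPrecondition maps to ErrOngoing.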

func HTTPResp2DtmError

func HTTPResp2DtmError(resp *http.Response) ([]byte, error)

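HTTPResp2DtmError checks the response for a dtm error and returns it. As noted under Options, status code 409 maps to ErrFailure and status code 425 maps to ErrOngoing.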
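The status-code mapping stated in the Options comments (409 to ErrFailure, 425 to ErrOngoing) can be sketched with the standard library alone. This is an illustration, not the package's code: `errFailure`, `errOngoing`, `classify`, `kind`, and `fakeResp` are hypothetical names, and passing every other status code through without an error is a simplifying assumption.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// Hypothetical stand-ins for the dtm sentinel errors.
var (
	errFailure = errors.New("FAILURE")
	errOngoing = errors.New("ONGOING")
)

// classify reads the response body and maps the status code to a sentinel
// error: 409 => errFailure, 425 => errOngoing. Other codes return no error
// here, which is an assumption made to keep the sketch short.
func classify(resp *http.Response) ([]byte, error) {
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	switch resp.StatusCode {
	case http.StatusConflict: // 409
		return body, fmt.Errorf("%s: %w", body, errFailure)
	case http.StatusTooEarly: // 425
		return body, fmt.Errorf("%s: %w", body, errOngoing)
	}
	return body, nil
}

// kind names the sentinel wrapped by err, for printing and testing.
func kind(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errFailure):
		return "failure"
	case errors.Is(err, errOngoing):
		return "ongoing"
	}
	return "other"
}

// fakeResp builds a response in memory so the sketch needs no network.
func fakeResp(code int, body string) *http.Response {
	return &http.Response{StatusCode: code, Body: io.NopCloser(strings.NewReader(body))}
}

func main() {
	for _, code := range []int{200, 409, 425} {
		_, err := classify(fakeResp(code, "msg"))
		fmt.Println(code, kind(err))
	}
}
```

Wrapping the sentinel with `%w` keeps the response body in the error message while still letting callers test the category with `errors.Is`.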

func InitGrpc

func InitGrpc(grpcDtm string, clientHost string, grpcServer *grpc.Server)

InitGrpc initializes the workflow engine to use gRPC. Param grpcDtm specifies the dtm address. Param clientHost specifies the client host that dtm calls back if a workflow times out. Param grpcServer specifies the gRPC server.

func InitHTTP

func InitHTTP(httpDtm string, callback string)

InitHTTP initializes the workflow engine to use HTTP. Param httpDtm specifies the dtm address. Param callback specifies the URL that dtm calls back if a workflow times out.

func Interceptor

func Interceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error

Interceptor is the gRPC unary client middleware that lets a workflow capture the results of gRPC calls.

func NewRespBodyFromBytes

func NewRespBodyFromBytes(body []byte) io.ReadCloser

NewRespBodyFromBytes creates an io.ReadCloser from a byte slice that is suitable for use as an http response body.
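A helper like this is typically needed when code reads a response body to inspect it and must then put the bytes back for later readers. The sketch below shows that pattern with the standard library; `bodyFromBytes` and `peek` are hypothetical names, and building the reader with `io.NopCloser` is an assumed equivalent rather than the package's own code.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

// bodyFromBytes wraps a byte slice as an io.ReadCloser whose Close does nothing.
func bodyFromBytes(b []byte) io.ReadCloser {
	return io.NopCloser(bytes.NewReader(b))
}

// peek reads the whole body, then restores it so that later readers
// see the same bytes again.
func peek(resp *http.Response) ([]byte, error) {
	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	resp.Body.Close()
	resp.Body = bodyFromBytes(data)
	return data, nil
}

func main() {
	resp := &http.Response{StatusCode: http.StatusOK, Body: bodyFromBytes([]byte("hello"))}

	first, err := peek(resp)
	if err != nil {
		fmt.Println("peek failed:", err)
		return
	}
	second, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	fmt.Println(string(first), string(second))
}
```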

func Register

func Register(name string, handler WfFunc, custom ...func(wf *Workflow)) error

Register registers a workflow under the specified name.

func Register2

func Register2(name string, handler WfFunc2, custom ...func(wf *Workflow)) error

Register2 is the same as Register, but the workflow function can return a result.

func SetProtocolForTest

func SetProtocolForTest(protocol string)

SetProtocolForTest changes the protocol directly. It is intended for use in tests only.

Types

type Options

type Options struct {

	// Default == HTTPResp2DtmError : Code 409 => ErrFailure; Code 425 => ErrOngoing
	HTTPResp2DtmError func(*http.Response) ([]byte, error)

	// Default == GrpcError2DtmError: Code Aborted => ErrFailure; Code FailedPrecondition => ErrOngoing
	GRPCError2DtmError func(error) error

	// CompensateErrorBranch specifies whether a branch that returns ErrFailure should be compensated on rollback.
	// For most idempotent branches, no compensation is needed.
	// But for a request that timed out, the caller cannot know whether the request succeeded, so the compensation should be called.
	CompensateErrorBranch bool
}

Options is for specifying workflow options

type WfFunc

type WfFunc func(wf *Workflow, data []byte) error

WfFunc is the type for workflow function

type WfFunc2

type WfFunc2 func(wf *Workflow, data []byte) ([]byte, error)

WfFunc2 is the type for workflow function with return value

type WfPhase2Func

type WfPhase2Func func(bb *dtmcli.BranchBarrier) error

WfPhase2Func is the type of a phase 2 function. Param bb is a BranchBarrier, which is described at http://d.dtm.pub/practice/barrier.html

type Workflow

type Workflow struct {
	// The name of the workflow
	Name    string
	Options Options
	*dtmimp.TransBase
	// contains filtered or unexported fields
}

Workflow is the type for a workflow

func (*Workflow) Do

func (wf *Workflow) Do(fn func(bb *dtmcli.BranchBarrier) ([]byte, error)) ([]byte, error)

Do performs an action whose result is recorded.

func (*Workflow) DoXa

func (wf *Workflow) DoXa(dbConf dtmcli.DBConf, fn func(db *sql.DB) ([]byte, error)) ([]byte, error)

DoXa begins a local XA transaction. After the workflow function returns, XA commit or rollback is called.

func (*Workflow) NewBranch

func (wf *Workflow) NewBranch() *Workflow

NewBranch will start a new branch transaction

func (*Workflow) NewBranchCtx

func (wf *Workflow) NewBranchCtx() context.Context

NewBranchCtx calls NewBranch and returns a workflow context.

func (*Workflow) NewRequest

func (wf *Workflow) NewRequest() *resty.Request

NewRequest returns a new resty request whose progress is recorded.

func (*Workflow) OnCommit

func (wf *Workflow) OnCommit(fn WfPhase2Func) *Workflow

OnCommit sets the callback that runs for the current branch when a commit happens. If you are writing a TCC transaction, write the confirm operation here.

func (*Workflow) OnFinish

func (wf *Workflow) OnFinish(fn func(bb *dtmcli.BranchBarrier, isRollback bool) error) *Workflow

OnFinish sets a single callback for both OnCommit and OnRollback. The isRollback argument tells the callback which of the two happened.

func (*Workflow) OnRollback

func (wf *Workflow) OnRollback(compensate WfPhase2Func) *Workflow

OnRollback sets the callback that runs for the current branch when a rollback happens. If you are writing a saga transaction, write the compensation here. If you are writing a TCC transaction, write the cancel operation here.
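The interplay between per-branch phase 2 callbacks and the CompensateErrorBranch option can be sketched without the library. Everything here is hypothetical (`branch`, `finish`, `noop`), and running compensations in reverse registration order is an assumption based on the usual saga convention, not something this page states.

```go
package main

import "fmt"

// branch is a hypothetical record of one workflow step: whether its action
// failed, plus the callbacks registered for phase 2.
type branch struct {
	name       string
	failed     bool
	onCommit   func() error
	onRollback func() error
}

// finish runs the phase 2 callbacks and returns the order in which they ran.
// On commit it walks branches in order. On rollback it walks them in reverse
// (an assumed convention) and compensates a failed branch only when
// compensateFailed is true.
func finish(branches []branch, rollback, compensateFailed bool) ([]string, error) {
	var ran []string
	if !rollback {
		for _, b := range branches {
			if b.onCommit == nil {
				continue
			}
			if err := b.onCommit(); err != nil {
				return ran, err
			}
			ran = append(ran, "commit "+b.name)
		}
		return ran, nil
	}
	for i := len(branches) - 1; i >= 0; i-- {
		b := branches[i]
		if b.onRollback == nil || (b.failed && !compensateFailed) {
			continue
		}
		if err := b.onRollback(); err != nil {
			return ran, err
		}
		ran = append(ran, "rollback "+b.name)
	}
	return ran, nil
}

// noop is a placeholder callback that always succeeds.
func noop() error { return nil }

func main() {
	branches := []branch{
		{name: "transfer-out", onRollback: noop},
		{name: "transfer-in", failed: true, onRollback: noop},
	}

	ran, _ := finish(branches, true, false)
	fmt.Println("failed branch skipped:", ran)

	ran, _ = finish(branches, true, true)
	fmt.Println("failed branch compensated:", ran)
}
```

With `compensateFailed` set to false only the branch that succeeded is compensated. Setting it to true also compensates the branch that returned a failure, which matters when the failure was a timeout and the request may in fact have taken effect.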
