Documentation ¶
Index ¶
- Constants
- Variables
- func Delay(until time.Time) *delayed
- func ErrorHandler(h Step, results *expr.Vars) *errHandler
- func ExclGateway(pp ...*GatewayPath) (*exclGateway, error)
- func ForkGateway() *forkGateway
- func GenericResourceNextCheck(useLimit bool, ptr, buffSize, total, limit uint, hasMore bool) bool
- func GetContextCallStack(ctx context.Context) []uint64
- func InclGateway(pp ...*GatewayPath) (*inclGateway, error)
- func JoinGateway(ss ...Step) *joinGateway
- func LoopBreak() *loopBreak
- func LoopContinue() *loopContinue
- func NewGenericStep(fn execFn) *genericStep
- func Prompt(ownerId uint64, ref string, payload *expr.Vars) *prompted
- func Resume() *resumed
- func SetContextCallStack(ctx context.Context, ss []uint64) context.Context
- func Termination() *termination
- type ExecRequest
- type ExecResponse
- type Frame
- type GatewayPath
- type GatewayPaths
- type Graph
- func (g *Graph) AddParent(c, p Step)
- func (g *Graph) AddStep(s Step, cc ...Step)
- func (g *Graph) Children(s Step) Steps
- func (g *Graph) Exec(context.Context, *ExecRequest) (ExecResponse, error)
- func (g *Graph) Len() int
- func (g *Graph) Orphans() (oo Steps)
- func (g *Graph) Parents(s Step) Steps
- func (g *Graph) StepByID(ID uint64) Step
- type Iterator
- type IteratorHandler
- type PendingPrompt
- type ResultEvaluator
- type ResumedPrompt
- type Session
- func (s *Session) AllPendingPrompts() (out []*PendingPrompt)
- func (s *Session) Cancel()
- func (s *Session) Delayed() bool
- func (s *Session) Error() error
- func (s *Session) Exec(ctx context.Context, step Step, scope *expr.Vars) error
- func (s *Session) ID() uint64
- func (s *Session) Idle() bool
- func (s *Session) Prompted() bool
- func (s *Session) Result() *expr.Vars
- func (s *Session) Resume(ctx context.Context, stateId uint64, input *expr.Vars) (*ResumedPrompt, error)
- func (s *Session) Status() SessionStatus
- func (s *Session) Stop()
- func (s *Session) Suspended() bool
- func (s *Session) UnsentPendingPrompts() (out []*PendingPrompt)
- func (s *Session) UserPendingPrompts(ownerId uint64) (out []*PendingPrompt)
- func (s *Session) Wait(ctx context.Context) error
- func (s *Session) WaitUntil(ctx context.Context, expected ...SessionStatus) error
- type SessionOpt
- func SetCallStack(id ...uint64) SessionOpt
- func SetDumpStacktraceOnPanic(dump bool) SessionOpt
- func SetHandler(fn StateChangeHandler) SessionOpt
- func SetLogger(log *zap.Logger) SessionOpt
- func SetWorkerIntervalSuspended(i time.Duration) SessionOpt
- func SetWorkerIntervalWaiter(i time.Duration) SessionOpt
- func SetWorkflowID(workflowID uint64) SessionOpt
- type SessionStatus
- type State
- type StateChangeHandler
- type Step
- type StepIdentifier
- type Steps
Constants ¶
const (
DefaultMaxIteratorBufferSize uint = 1000
)
Variables ¶
var (
MaxIteratorBufferSize uint = DefaultMaxIteratorBufferSize
)
Functions ¶
func ErrorHandler ¶
func ExclGateway ¶
func ExclGateway(pp ...*GatewayPath) (*exclGateway, error)
ExclGateway fn initializes exclusive gateway
func ForkGateway ¶
func ForkGateway() *forkGateway
ForkGateway fn initializes fork gateway No arguments are required; Graph Graph config is used to determine all possible fork paths on the fly
func GetContextCallStack ¶
func InclGateway ¶
func InclGateway(pp ...*GatewayPath) (*inclGateway, error)
InclGateway fn initializes inclusive gateway
func JoinGateway ¶
func JoinGateway(ss ...Step) *joinGateway
JoinGateway fn initializes join gateway with all paths that are expected to be partial
func LoopContinue ¶
func LoopContinue() *loopContinue
func NewGenericStep ¶
func NewGenericStep(fn execFn) *genericStep
func SetContextCallStack ¶
func Termination ¶
func Termination() *termination
Types ¶
type ExecRequest ¶
type ExecRequest struct { SessionID uint64 StateID uint64 // Current input received on session resume Input *expr.Vars // Current scope Scope *expr.Vars // Helps with gateway join/merge steps // that needs info about the step it's currently merging Parent Step }
ExecRequest is passed to Exec() functions and contains all information for step execution
type ExecResponse ¶
type ExecResponse interface{}
type Frame ¶
type Frame struct { CreatedAt time.Time `json:"createdAt"` SessionID uint64 `json:"sessionID"` StateID uint64 `json:"stateID"` Input *expr.Vars `json:"input"` Scope *expr.Vars `json:"scope"` Results *expr.Vars `json:"results"` ParentID uint64 `json:"parentID"` StepID uint64 `json:"stepID"` NextSteps []uint64 `json:"nextSteps"` // How much time from the 1st step to the start of this step in milliseconds ElapsedTime uint `json:"elapsedTime"` // How much time it took to execute this step in milliseconds StepTime uint `json:"stepTime"` Action string `json:"action,omitempty"` Error string `json:"error,omitempty"` }
type GatewayPath ¶
type GatewayPath struct {
// contains filtered or unexported fields
}
GatewayPath structure is used by subset of gateway nodes
It allows to evaluate test Expression to help determine the gateway if a certain path should be used or not
func NewGatewayPath ¶
func NewGatewayPath(s Step, t pathTester) (gwp *GatewayPath, err error)
NewGatewayPath validates Expression and returns initialized GatewayPath
type GatewayPaths ¶
type GatewayPaths []*GatewayPath
GatewayPath structure is used by subset of gateway nodes
It allows to evaluate test Expression to help determine the gateway if a certain path should be used or not
type Graph ¶
type Graph struct {
// contains filtered or unexported fields
}
list of Graph steps with relations
func (*Graph) Exec ¶
func (g *Graph) Exec(context.Context, *ExecRequest) (ExecResponse, error)
type Iterator ¶
type Iterator interface { // Is the given step this iterator step Is(Step) bool // Initialize iterator Start(context.Context, *expr.Vars) error // Break fn is called when loop is forcefully broken Break() Step Iterator() Step // Next is called before each iteration and returns // 1st step of the iteration branch and variables that are added to the scope Next(context.Context, *expr.Vars) (Step, *expr.Vars, error) }
Iterator can be returned from Exec fn as ExecResponse
It helps session's exec fn() to properly navigate through graph by calling is/break/iterator/next function
func GenericIterator ¶
func GenericIterator(iter, next, exit Step, h IteratorHandler) Iterator
GenericIterator creates a wrapper around IteratorHandler and returns genericIterator that implements Iterator interface
type IteratorHandler ¶
type PendingPrompt ¶
type ResultEvaluator ¶
type ResumedPrompt ¶
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
func NewSession ¶
func NewSession(ctx context.Context, g *Graph, oo ...SessionOpt) *Session
func (*Session) AllPendingPrompts ¶
func (s *Session) AllPendingPrompts() (out []*PendingPrompt)
AllPendingPrompts returns all pending prompts for all user
func (*Session) Status ¶
func (s *Session) Status() SessionStatus
func (*Session) Suspended ¶
Suspended returns true if the workflow has either delayed or prompted steps
func (*Session) UnsentPendingPrompts ¶
func (s *Session) UnsentPendingPrompts() (out []*PendingPrompt)
UnsentPendingPrompts returns unsent pending prompts for all user
func (*Session) UserPendingPrompts ¶
func (s *Session) UserPendingPrompts(ownerId uint64) (out []*PendingPrompt)
UserPendingPrompts prompts fn returns all owner's pending prompts on this session
type SessionOpt ¶
type SessionOpt func(*Session)
func SetCallStack ¶
func SetCallStack(id ...uint64) SessionOpt
func SetDumpStacktraceOnPanic ¶
func SetDumpStacktraceOnPanic(dump bool) SessionOpt
func SetHandler ¶
func SetHandler(fn StateChangeHandler) SessionOpt
func SetLogger ¶
func SetLogger(log *zap.Logger) SessionOpt
func SetWorkerIntervalSuspended ¶
func SetWorkerIntervalSuspended(i time.Duration) SessionOpt
func SetWorkerIntervalWaiter ¶
func SetWorkerIntervalWaiter(i time.Duration) SessionOpt
func SetWorkflowID ¶
func SetWorkflowID(workflowID uint64) SessionOpt
type SessionStatus ¶
type SessionStatus int
const ( SessionActive SessionStatus = iota SessionPrompted SessionDelayed SessionFailed SessionCompleted SessionCanceled )
func (SessionStatus) String ¶
func (s SessionStatus) String() string
type State ¶
type State struct {
// contains filtered or unexported fields
}
state holds information about Session ID
func (State) MakeRequest ¶
func (s State) MakeRequest() *ExecRequest
type StateChangeHandler ¶
type StateChangeHandler func(SessionStatus, *State, *Session) error
type Step ¶
type Step interface { ID() uint64 SetID(uint64) Exec(context.Context, *ExecRequest) (ExecResponse, error) }
type StepIdentifier ¶
type StepIdentifier struct {
// contains filtered or unexported fields
}
func (*StepIdentifier) ID ¶
func (i *StepIdentifier) ID() uint64
func (*StepIdentifier) SetID ¶
func (i *StepIdentifier) SetID(id uint64)