Documentation
¶
Index ¶
- Constants
- func HandleCallback(ctx context.Context, req CallbackRequest, wf WorkflowState, s *State, ...) (interface{}, error)
- func Resume(ctx context.Context, wf WorkflowState, s *State, save Checkpoint) error
- func Validate(s Section) error
- func Walk(s Stmt, f func(s Stmt) bool) (bool, error)
- type BreakStmt
- type CallbackRequest
- type Checkpoint
- type CheckpointType
- type ContinueStmt
- type Empty
- type ErrExec
- type Event
- type ForStmt
- type GoStmt
- type Handler
- type ReflectHandler
- func (h *ReflectHandler) Handle(ctx context.Context, req CallbackRequest, input interface{}) (interface{}, error)
- func (h ReflectHandler) MarshalJSON() ([]byte, error)
- func (h ReflectHandler) Schemas() (in *jsonschema.Schema, out *jsonschema.Schema, err error)
- func (t *ReflectHandler) Setup(ctx context.Context, req CallbackRequest) (string, error)
- func (t *ReflectHandler) Teardown(ctx context.Context, req CallbackRequest, handled bool) error
- func (h *ReflectHandler) Type() string
- type ReturnStmt
- type Section
- type State
- type Stmt
- type StmtStep
- type Stop
- type SwitchCase
- type SwitchStmt
- type Thread
- type ThreadStatus
- type Threads
- type WG
- type WaitCondStmt
- type WaitEvent
- type WaitEventStatus
- type WaitEventsStmt
- type WorkflowState
- type WorkflowStatus
Constants ¶
const HandlerTypeReflect = "reflect"
const MainThread = "_main_"
Variables ¶
This section is empty.
Functions ¶
func HandleCallback ¶ added in v0.9.0
func HandleCallback(ctx context.Context, req CallbackRequest, wf WorkflowState, s *State, input interface{}) (interface{}, error)
Handle incoming event. This func will only execute callback handler and update the state After this function completes successfully - workflow should be resumed using Resume()
func Resume ¶ added in v0.9.0
func Resume(ctx context.Context, wf WorkflowState, s *State, save Checkpoint) error
Resume will continue workflow, executing steps in a process. Resume may fail in the middle of the processing. To avoid data loss and out-of-order duplicated step execution - save() will be called to checkpoint current state of the workflow.
Types ¶
type BreakStmt ¶ added in v0.5.0
type BreakStmt struct { }
func (BreakStmt) MarshalJSON ¶ added in v0.9.9
type CallbackRequest ¶ added in v0.5.0
type Checkpoint ¶ added in v0.7.0
type Checkpoint func(ctx context.Context, t CheckpointType) error
Checkpoint is used to save workflow state while it's being processed You may want to checkpoint/save your workflow only for specific checkpoint types to increase performance and avoid unnecessary saves.
type CheckpointType ¶ added in v0.12.0
type CheckpointType string
Type of checkpoint event
const ( // Step was executed successfully. You probably want to save your workflow after this event CheckpointAfterStep CheckpointType = "afterStep" CheckpointAfterSetup CheckpointType = "afterSetup" CheckpointAfterTeardown CheckpointType = "afterTeardown" CheckpointAfterResume CheckpointType = "afterResume" CheckpointAfterCondition CheckpointType = "afterCondition" )
type ContinueStmt ¶ added in v0.10.0
type ContinueStmt struct { }
func (ContinueStmt) MarshalJSON ¶ added in v0.10.0
func (s ContinueStmt) MarshalJSON() ([]byte, error)
func (ContinueStmt) Resume ¶ added in v0.10.0
func (s ContinueStmt) Resume(ctx *resumeContext) (*Stop, error)
type ErrExec ¶ added in v0.11.0
type ErrExec struct { Step string // contains filtered or unexported fields }
type Event ¶ added in v0.9.9
type Event struct { Callback CallbackRequest Handler Handler Stmt Stmt }
func (Event) MarshalJSON ¶ added in v0.9.9
type ForStmt ¶ added in v0.5.0
func (ForStmt) MarshalJSON ¶ added in v0.9.9
type GoStmt ¶ added in v0.5.0
type GoStmt struct { // ID is needed to identify threads when there are many threads running with the same name. // (for example they were created in a loop) ID func() string `json:"-"` Name string Stmt Stmt }
func (GoStmt) MarshalJSON ¶ added in v0.9.9
type Handler ¶ added in v0.5.0
type Handler interface { Type() string Setup(ctx context.Context, req CallbackRequest) (string, error) Handle(ctx context.Context, req CallbackRequest, input interface{}) (interface{}, error) Teardown(ctx context.Context, req CallbackRequest, handled bool) error }
func FindHandler ¶ added in v0.8.6
func FindHandler(req CallbackRequest, sec Stmt) (Handler, error)
type ReflectHandler ¶ added in v0.13.0
type ReflectHandler struct {
Handler interface{} `json:"-"`
}
This is an example of how to create your custom events
func (*ReflectHandler) Handle ¶ added in v0.13.0
func (h *ReflectHandler) Handle(ctx context.Context, req CallbackRequest, input interface{}) (interface{}, error)
code that will be executed when event is received
func (ReflectHandler) MarshalJSON ¶ added in v0.13.0
func (h ReflectHandler) MarshalJSON() ([]byte, error)
func (ReflectHandler) Schemas ¶ added in v0.13.0
func (h ReflectHandler) Schemas() (in *jsonschema.Schema, out *jsonschema.Schema, err error)
func (*ReflectHandler) Setup ¶ added in v0.13.0
func (t *ReflectHandler) Setup(ctx context.Context, req CallbackRequest) (string, error)
when we will start listening for this event - Setup() will be called for us to setup this event on external services
func (*ReflectHandler) Teardown ¶ added in v0.13.0
func (t *ReflectHandler) Teardown(ctx context.Context, req CallbackRequest, handled bool) error
when we will stop listening for this event - Teardown() will be called for us to remove this event on external services
func (*ReflectHandler) Type ¶ added in v0.13.0
func (h *ReflectHandler) Type() string
type ReturnStmt ¶ added in v0.5.0
type ReturnStmt struct { }
func (ReturnStmt) MarshalJSON ¶ added in v0.9.9
func (s ReturnStmt) MarshalJSON() ([]byte, error)
func (ReturnStmt) Resume ¶ added in v0.5.0
func (s ReturnStmt) Resume(ctx *resumeContext) (*Stop, error)
type Section ¶ added in v0.5.0
type Section []Stmt
Section is similar to code block {} with a list of statements.
type State ¶ added in v0.5.0
type State struct { ID string // id of workflow instance Workflow string // name of workflow definition. Used to choose proper state type to unmarshal & resume on Status WorkflowStatus // current status Threads Threads PC int }
func (*State) UpdateWorkflowsStatus ¶ added in v0.13.1
func (s *State) UpdateWorkflowsStatus()
type Stmt ¶ added in v0.5.0
type Stmt interface { // Resume continues execution of the process, based on resumeContext // If ctx.Running == true Resume should execute the statement or continue execution after. // If ctx.Running = false Resume should not execute the statement, but recursively search for the statement that needs to be resumed. // If it needs to be resumed - don't execute it, but continue execution from this statement. // // If stmt not found *Stop will be nil and ctx.Running will be false // If stmt is found, but process has finished - *Stop will be nil and ctx.Running will be true // Otherwise Resume should always return *Stop or err != nil Resume(ctx *resumeContext) (*Stop, error) }
Stmt is async statement definition that should support workflow resuming & search.
type StmtStep ¶ added in v0.5.0
func (StmtStep) MarshalJSON ¶ added in v0.9.9
type Stop ¶ added in v0.5.0
type Stop struct { Step string // execute step WaitEvents *WaitEventsStmt // wait for Events Return bool // stop this thread Cond string // wait for cond }
Stop tells us that syncronous part of the workflow has finished. It means we either:
type SwitchCase ¶ added in v0.5.0
func Default ¶ added in v0.5.0
func Default(name string, sec Stmt) SwitchCase
func (SwitchCase) MarshalJSON ¶ added in v0.9.9
func (s SwitchCase) MarshalJSON() ([]byte, error)
type SwitchStmt ¶ added in v0.5.0
type SwitchStmt struct {
Cases []SwitchCase
}
func Switch ¶ added in v0.5.0
func Switch(ss ...SwitchCase) *SwitchStmt
execute statements based on condition
func (*SwitchStmt) Else ¶ added in v0.9.5
func (s *SwitchStmt) Else(name string, sec ...Stmt) *SwitchStmt
func (*SwitchStmt) ElseIf ¶ added in v0.9.5
func (s *SwitchStmt) ElseIf(cond bool, name string, sec ...Stmt) *SwitchStmt
func (SwitchStmt) MarshalJSON ¶ added in v0.9.9
func (s SwitchStmt) MarshalJSON() ([]byte, error)
func (*SwitchStmt) Resume ¶ added in v0.5.0
func (s *SwitchStmt) Resume(ctx *resumeContext) (*Stop, error)
type ThreadStatus ¶ added in v0.5.0
type ThreadStatus string
const ( ThreadExecuting ThreadStatus = "Executing" // next step for this thread is to execute "CurStep" step ThreadResuming ThreadStatus = "Resuming" // next step for this thread is to continue from "CurStep" step ThreadWaitingEvent ThreadStatus = "WaitingEvent" // thread is waiting for "CurStep" wait condition and will be resumed via OnCallback() ThreadWaitingCondition ThreadStatus = "WaitingCondition" // thread is waiting for condition to happen. i.e. it's waiting for other thread to update some data )
type WaitCondStmt ¶ added in v0.9.9
type WaitCondStmt struct { Name string Cond bool Handler func(ctx context.Context) `json:"-"` // executed when cond is true. }
func FindWaitingStep ¶ added in v0.9.6
func FindWaitingStep(name string, sec Stmt) (WaitCondStmt, error)
func WaitFor ¶ added in v0.5.0
func WaitFor(label string, cond bool, handler func(ctx context.Context)) WaitCondStmt
Wait statement wait for condition to be true.
func (WaitCondStmt) MarshalJSON ¶ added in v0.9.9
func (s WaitCondStmt) MarshalJSON() ([]byte, error)
func (WaitCondStmt) Resume ¶ added in v0.9.9
func (f WaitCondStmt) Resume(ctx *resumeContext) (*Stop, error)
type WaitEvent ¶ added in v0.5.0
type WaitEvent struct { Type string Req CallbackRequest Status WaitEventStatus Handled bool Error string }
type WaitEventStatus ¶ added in v0.6.0
type WaitEventStatus string
Event statuses are needed to make sure that the process of setting up, tearing down and error handling do not interfere with each other. So if setup or teardown of one event fails - we don't retry other events
const ( EventPendingSetup WaitEventStatus = "PendingSetup" // event is just created EventSetup WaitEventStatus = "Setup" // event was successfully setup EventPendingTeardown WaitEventStatus = "PendingTeardown" // event was successfully setup EventSetupError WaitEventStatus = "SetupError" // there was an error during setup EventTeardownError WaitEventStatus = "TeardownError" // there was an error during teardown )
type WaitEventsStmt ¶ added in v0.9.9
func Wait ¶ added in v0.5.0
func Wait(name string, ss ...Event) WaitEventsStmt
Wait for multiple events exclusively
func (WaitEventsStmt) MarshalJSON ¶ added in v0.9.9
func (s WaitEventsStmt) MarshalJSON() ([]byte, error)
func (WaitEventsStmt) Resume ¶ added in v0.9.9
func (s WaitEventsStmt) Resume(ctx *resumeContext) (*Stop, error)
type WorkflowState ¶ added in v0.5.0
type WorkflowState interface { // Definition func may be called multiple times so it should be idempotent. // All actions should done in callbacks or steps. Definition() Section }
WorkflowState should be a Go struct supporting JSON unmarshalling into it. When process is resumed - current state is unmarshalled into it and then Definition() is called. This is needed to eliminate lasy parameters and conditions i.e. instead of 'If( func() bool { return s.IsAvailable})' we can write 'If(s.IsAvailable)'.
type WorkflowStatus ¶ added in v0.5.0
type WorkflowStatus string
const ( WorkflowResuming WorkflowStatus = "Resuming" // at least 1 thread is not waiting WorkflowWaiting WorkflowStatus = "Waiting" // all threads are waiting WorkflowFinished WorkflowStatus = "Finished" )