Documentation ¶
Index ¶
- Variables
- func EnqueueChainWorkflow(ctx context.Context, m *cereal.Manager, workflowName cereal.WorkflowName, ...) error
- func EnqueueParallelWorkflow(ctx context.Context, m *cereal.Manager, workflowName cereal.WorkflowName, ...) error
- func RegisterSingleTaskWorkflowExecutor(mgr *cereal.Manager, workflowName cereal.WorkflowName, allowCancel bool, ...) error
- type ChainWorkflowExecutor
- func (m *ChainWorkflowExecutor) OnCancel(w cereal.WorkflowInstance, ev cereal.CancelEvent) cereal.Decision
- func (m *ChainWorkflowExecutor) OnStart(w cereal.WorkflowInstance, ev cereal.StartEvent) cereal.Decision
- func (m *ChainWorkflowExecutor) OnTaskComplete(w cereal.WorkflowInstance, ev cereal.TaskCompleteEvent) cereal.Decision
- type ChainWorkflowInstance
- func (instance *ChainWorkflowInstance) Err() error
- func (instance *ChainWorkflowInstance) GetParameters() (*ChainWorkflowParams, error)
- func (instance *ChainWorkflowInstance) GetPayload() (*ChainWorkflowPayload, error)
- func (instance *ChainWorkflowInstance) GetResult() (*ChainWorkflowPayload, error)
- func (instance *ChainWorkflowInstance) GetSubWorkflow(idx int) (cereal.ImmutableWorkflowInstance, error)
- func (instance *ChainWorkflowInstance) IsCancelled() bool
- func (instance *ChainWorkflowInstance) IsRunning() bool
- func (instance *ChainWorkflowInstance) WithParams(params *ChainWorkflowParams)
- func (instance *ChainWorkflowInstance) WithPayload(payload *ChainWorkflowPayload)
- func (instance *ChainWorkflowInstance) WithResult(result *ChainWorkflowPayload)
- type ChainWorkflowParams
- type ChainWorkflowPayload
- type ChainWorkflowTaskParam
- type ParallelWorkflowExecutor
- func (m *ParallelWorkflowExecutor) OnCancel(w cereal.WorkflowInstance, ev cereal.CancelEvent) cereal.Decision
- func (m *ParallelWorkflowExecutor) OnStart(w cereal.WorkflowInstance, ev cereal.StartEvent) cereal.Decision
- func (m *ParallelWorkflowExecutor) OnTaskComplete(w cereal.WorkflowInstance, ev cereal.TaskCompleteEvent) cereal.Decision
- type ParallelWorkflowExecutorFor
- type ParallelWorkflowInstance
- func (instance *ParallelWorkflowInstance) Err() error
- func (instance *ParallelWorkflowInstance) GetParameters() (*ParallelWorkflowParams, error)
- func (instance *ParallelWorkflowInstance) GetPayload() (*ParallelWorkflowPayload, error)
- func (instance *ParallelWorkflowInstance) GetResult() (*ParallelWorkflowPayload, error)
- func (instance *ParallelWorkflowInstance) GetSubWorkflow(workflowName string) (cereal.ImmutableWorkflowInstance, error)
- func (instance *ParallelWorkflowInstance) IsRunning() bool
- func (instance *ParallelWorkflowInstance) ListSubWorkflows() []string
- type ParallelWorkflowParams
- type ParallelWorkflowPayload
- type ParallelWorkflowTaskParam
- type SingleTaskWorkflowExecutor
- func (s *SingleTaskWorkflowExecutor) OnCancel(w cereal.WorkflowInstance, ev cereal.CancelEvent) cereal.Decision
- func (s *SingleTaskWorkflowExecutor) OnStart(w cereal.WorkflowInstance, ev cereal.StartEvent) cereal.Decision
- func (s *SingleTaskWorkflowExecutor) OnTaskComplete(w cereal.WorkflowInstance, ev cereal.TaskCompleteEvent) cereal.Decision
- type WorkflowState
Constants ¶
This section is empty.
Variables ¶
var ErrCannotMergeTypes = errors.New("Cannot merge types")
var ErrIncorrectParameters = errors.New("Incorrect number of parameters given")
var ErrTaskWorkflowInvalid = errors.New("Could not determine workflow for task")
Functions ¶
func EnqueueChainWorkflow ¶
func EnqueueParallelWorkflow ¶
func RegisterSingleTaskWorkflowExecutor ¶
func RegisterSingleTaskWorkflowExecutor(mgr *cereal.Manager, workflowName cereal.WorkflowName, allowCancel bool, executor cereal.TaskExecutor, opts cereal.TaskExecutorOpts) error
Types ¶
type ChainWorkflowExecutor ¶
type ChainWorkflowExecutor struct {
// contains filtered or unexported fields
}
func NewChainWorkflowExecutor ¶
func NewChainWorkflowExecutor(executors ...cereal.WorkflowExecutor) (*ChainWorkflowExecutor, error)
func (*ChainWorkflowExecutor) OnCancel ¶
func (m *ChainWorkflowExecutor) OnCancel(w cereal.WorkflowInstance, ev cereal.CancelEvent) cereal.Decision
func (*ChainWorkflowExecutor) OnStart ¶
func (m *ChainWorkflowExecutor) OnStart(w cereal.WorkflowInstance, ev cereal.StartEvent) cereal.Decision
func (*ChainWorkflowExecutor) OnTaskComplete ¶
func (m *ChainWorkflowExecutor) OnTaskComplete(w cereal.WorkflowInstance, ev cereal.TaskCompleteEvent) cereal.Decision
type ChainWorkflowInstance ¶
type ChainWorkflowInstance struct {
// contains filtered or unexported fields
}
func GetChainWorkflowInstance ¶
func GetChainWorkflowInstance(ctx context.Context, m *cereal.Manager, workflowName cereal.WorkflowName, instanceName string) (*ChainWorkflowInstance, error)
func ToChainWorkflowInstance ¶
func ToChainWorkflowInstance(instance cereal.ImmutableWorkflowInstance) (*ChainWorkflowInstance, error)
func (*ChainWorkflowInstance) Err ¶
func (instance *ChainWorkflowInstance) Err() error
func (*ChainWorkflowInstance) GetParameters ¶
func (instance *ChainWorkflowInstance) GetParameters() (*ChainWorkflowParams, error)
func (*ChainWorkflowInstance) GetPayload ¶
func (instance *ChainWorkflowInstance) GetPayload() (*ChainWorkflowPayload, error)
func (*ChainWorkflowInstance) GetResult ¶
func (instance *ChainWorkflowInstance) GetResult() (*ChainWorkflowPayload, error)
func (*ChainWorkflowInstance) GetSubWorkflow ¶
func (instance *ChainWorkflowInstance) GetSubWorkflow(idx int) (cereal.ImmutableWorkflowInstance, error)
func (*ChainWorkflowInstance) IsCancelled ¶
func (instance *ChainWorkflowInstance) IsCancelled() bool
func (*ChainWorkflowInstance) IsRunning ¶
func (instance *ChainWorkflowInstance) IsRunning() bool
func (*ChainWorkflowInstance) WithParams ¶
func (instance *ChainWorkflowInstance) WithParams(params *ChainWorkflowParams)
func (*ChainWorkflowInstance) WithPayload ¶
func (instance *ChainWorkflowInstance) WithPayload(payload *ChainWorkflowPayload)
func (*ChainWorkflowInstance) WithResult ¶
func (instance *ChainWorkflowInstance) WithResult(result *ChainWorkflowPayload)
type ChainWorkflowParams ¶
type ChainWorkflowParams struct {
WorkflowParams []json.RawMessage
}
ChainWorkflowParams are the parameters the chain workflow expects. It contains an array of parameters, one for each of the subworkflows in the chain.
func ToChainWorkflowParameters ¶
func ToChainWorkflowParameters(parameters []interface{}) (ChainWorkflowParams, error)
type ChainWorkflowPayload ¶
type ChainWorkflowPayload struct { Cancelled bool State []WorkflowState }
ChainWorkflowPayload is the state of the chain workflow. It contains the state (payload, result, err, etc) of each subworkflow in the chain.
func (*ChainWorkflowPayload) Finished ¶
func (p *ChainWorkflowPayload) Finished() bool
func (*ChainWorkflowPayload) IsValid ¶
func (p *ChainWorkflowPayload) IsValid() bool
type ChainWorkflowTaskParam ¶
type ChainWorkflowTaskParam struct {
XXX_ChainWorkflowIdx int64 `json:"__idx"`
}
ChainWorkflowTaskParam is a struct that is appended to any enqueued tasks. This allows us to know which subworkflow to call when the task completes. This also drives the requirement that the task parameters are either a struct or nil.
type ParallelWorkflowExecutor ¶
type ParallelWorkflowExecutor struct {
// contains filtered or unexported fields
}
func NewParallelWorkflowExecutor ¶
func NewParallelWorkflowExecutor(executorForFunc ParallelWorkflowExecutorFor) *ParallelWorkflowExecutor
NewParallelWorkflowExecutor creates a parallel workflow executor. The executorForFunc is used to lookup the workflow executor for a given subworkflow
func (*ParallelWorkflowExecutor) OnCancel ¶
func (m *ParallelWorkflowExecutor) OnCancel(w cereal.WorkflowInstance, ev cereal.CancelEvent) cereal.Decision
func (*ParallelWorkflowExecutor) OnStart ¶
func (m *ParallelWorkflowExecutor) OnStart(w cereal.WorkflowInstance, ev cereal.StartEvent) cereal.Decision
func (*ParallelWorkflowExecutor) OnTaskComplete ¶
func (m *ParallelWorkflowExecutor) OnTaskComplete(w cereal.WorkflowInstance, ev cereal.TaskCompleteEvent) cereal.Decision
type ParallelWorkflowExecutorFor ¶
type ParallelWorkflowExecutorFor func(subworkflow string) (cereal.WorkflowExecutor, bool)
ParallelWorkflowExecutorFor is a function used to look up the workflow executor for a given subworkflow
type ParallelWorkflowInstance ¶
type ParallelWorkflowInstance struct {
// contains filtered or unexported fields
}
func GetWorkflowInstance ¶
func GetWorkflowInstance(ctx context.Context, m *cereal.Manager, workflowName cereal.WorkflowName, instanceName string) (*ParallelWorkflowInstance, error)
func ToParallelWorkflowInstance ¶
func ToParallelWorkflowInstance(instance cereal.ImmutableWorkflowInstance) (*ParallelWorkflowInstance, error)
func (*ParallelWorkflowInstance) Err ¶
func (instance *ParallelWorkflowInstance) Err() error
func (*ParallelWorkflowInstance) GetParameters ¶
func (instance *ParallelWorkflowInstance) GetParameters() (*ParallelWorkflowParams, error)
func (*ParallelWorkflowInstance) GetPayload ¶
func (instance *ParallelWorkflowInstance) GetPayload() (*ParallelWorkflowPayload, error)
func (*ParallelWorkflowInstance) GetResult ¶
func (instance *ParallelWorkflowInstance) GetResult() (*ParallelWorkflowPayload, error)
func (*ParallelWorkflowInstance) GetSubWorkflow ¶
func (instance *ParallelWorkflowInstance) GetSubWorkflow(workflowName string) (cereal.ImmutableWorkflowInstance, error)
func (*ParallelWorkflowInstance) IsRunning ¶
func (instance *ParallelWorkflowInstance) IsRunning() bool
func (*ParallelWorkflowInstance) ListSubWorkflows ¶
func (instance *ParallelWorkflowInstance) ListSubWorkflows() []string
type ParallelWorkflowParams ¶
type ParallelWorkflowParams struct { SubworkflowKeys []string WorkflowParams map[string]json.RawMessage }
ParallelWorkflowParams are the parameters for the parallel workflow. It consists of the names of the subworkflows and their parameters.
func ToParallelWorkflowParameters ¶
func ToParallelWorkflowParameters(subworkflows []string, parameters map[string]interface{}) (ParallelWorkflowParams, error)
type ParallelWorkflowPayload ¶
type ParallelWorkflowPayload struct {
State map[string]WorkflowState
}
ParallelWorkflowPayload keeps track of the state of all the subworkflows and is the payload and result of the parallel workflow
func (ParallelWorkflowPayload) Finished ¶
func (p ParallelWorkflowPayload) Finished() bool
type ParallelWorkflowTaskParam ¶
type ParallelWorkflowTaskParam struct {
XXX_ParallelWorkflowKey string `json:"__key"`
}
ParallelWorkflowTaskParam is metadata attached to any enqueued tasks. This allows the parallel workflow to figure out which subworkflow to deliver the task completed event to.
type SingleTaskWorkflowExecutor ¶
type SingleTaskWorkflowExecutor struct {
// contains filtered or unexported fields
}
A SingleTaskWorkflowExecutor handles running a workflow composed of a single task. The workflow enqueues the task and startup. The workflow fails if the task fails.
func NewSingleTaskWorkflowExecutor ¶
func NewSingleTaskWorkflowExecutor(taskName cereal.TaskName, allowCancel bool) *SingleTaskWorkflowExecutor
func (*SingleTaskWorkflowExecutor) OnCancel ¶
func (s *SingleTaskWorkflowExecutor) OnCancel(w cereal.WorkflowInstance, ev cereal.CancelEvent) cereal.Decision
func (*SingleTaskWorkflowExecutor) OnStart ¶
func (s *SingleTaskWorkflowExecutor) OnStart(w cereal.WorkflowInstance, ev cereal.StartEvent) cereal.Decision
func (*SingleTaskWorkflowExecutor) OnTaskComplete ¶
func (s *SingleTaskWorkflowExecutor) OnTaskComplete(w cereal.WorkflowInstance, ev cereal.TaskCompleteEvent) cereal.Decision
type WorkflowState ¶
type WorkflowState struct { Payload json.RawMessage Result json.RawMessage Err string EnqueuedTasks int CompletedTasks int IsFinished bool }