patterns

package
v0.0.0-...-d288915 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 27, 2024 License: Apache-2.0 Imports: 8 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrCannotMergeTypes = errors.New("Cannot merge types")
View Source
var ErrIncorrectParameters = errors.New("Incorrect number of parameters given")
View Source
var ErrTaskWorkflowInvalid = errors.New("Could not determine workflow for task")

Functions

func EnqueueChainWorkflow

func EnqueueChainWorkflow(ctx context.Context, m *cereal.Manager, workflowName cereal.WorkflowName, instanceName string, parameters []interface{}) error

func EnqueueParallelWorkflow

func EnqueueParallelWorkflow(ctx context.Context, m *cereal.Manager, workflowName cereal.WorkflowName, instanceName string, subworkflows []string, parameters map[string]interface{}) error

func RegisterSingleTaskWorkflowExecutor

func RegisterSingleTaskWorkflowExecutor(mgr *cereal.Manager, workflowName cereal.WorkflowName,
	allowCancel bool, executor cereal.TaskExecutor, opts cereal.TaskExecutorOpts) error

Types

type ChainWorkflowExecutor

type ChainWorkflowExecutor struct {
	// contains filtered or unexported fields
}

func NewChainWorkflowExecutor

func NewChainWorkflowExecutor(executors ...cereal.WorkflowExecutor) (*ChainWorkflowExecutor, error)

func (*ChainWorkflowExecutor) OnCancel

func (*ChainWorkflowExecutor) OnStart

func (*ChainWorkflowExecutor) OnTaskComplete

type ChainWorkflowInstance

type ChainWorkflowInstance struct {
	// contains filtered or unexported fields
}

func GetChainWorkflowInstance

func GetChainWorkflowInstance(ctx context.Context, m *cereal.Manager, workflowName cereal.WorkflowName, instanceName string) (*ChainWorkflowInstance, error)

func ToChainWorkflowInstance

func ToChainWorkflowInstance(instance cereal.ImmutableWorkflowInstance) (*ChainWorkflowInstance, error)

func (*ChainWorkflowInstance) Err

func (instance *ChainWorkflowInstance) Err() error

func (*ChainWorkflowInstance) GetParameters

func (instance *ChainWorkflowInstance) GetParameters() (*ChainWorkflowParams, error)

func (*ChainWorkflowInstance) GetPayload

func (instance *ChainWorkflowInstance) GetPayload() (*ChainWorkflowPayload, error)

func (*ChainWorkflowInstance) GetResult

func (instance *ChainWorkflowInstance) GetResult() (*ChainWorkflowPayload, error)

func (*ChainWorkflowInstance) GetSubWorkflow

func (instance *ChainWorkflowInstance) GetSubWorkflow(idx int) (cereal.ImmutableWorkflowInstance, error)

func (*ChainWorkflowInstance) IsCancelled

func (instance *ChainWorkflowInstance) IsCancelled() bool

func (*ChainWorkflowInstance) IsRunning

func (instance *ChainWorkflowInstance) IsRunning() bool

func (*ChainWorkflowInstance) WithParams

func (instance *ChainWorkflowInstance) WithParams(params *ChainWorkflowParams)

func (*ChainWorkflowInstance) WithPayload

func (instance *ChainWorkflowInstance) WithPayload(payload *ChainWorkflowPayload)

func (*ChainWorkflowInstance) WithResult

func (instance *ChainWorkflowInstance) WithResult(result *ChainWorkflowPayload)

type ChainWorkflowParams

type ChainWorkflowParams struct {
	WorkflowParams []json.RawMessage
}

ChainWorkflowParams are the parameters the chain workflow expects. It contains an array of parameters, one for each of the subworkflows in the chain.

func ToChainWorkflowParameters

func ToChainWorkflowParameters(parameters []interface{}) (ChainWorkflowParams, error)

type ChainWorkflowPayload

type ChainWorkflowPayload struct {
	Cancelled bool
	State     []WorkflowState
}

ChainWorkflowPayload is the state of the chain workflow. It contains the state (payload, result, err, etc.) of each subworkflow in the chain.

func (*ChainWorkflowPayload) Finished

func (p *ChainWorkflowPayload) Finished() bool

func (*ChainWorkflowPayload) IsValid

func (p *ChainWorkflowPayload) IsValid() bool

type ChainWorkflowTaskParam

type ChainWorkflowTaskParam struct {
	XXX_ChainWorkflowIdx int64 `json:"__idx"`
}

ChainWorkflowTaskParam is a struct that is appended to any enqueued tasks. This allows us to know which subworkflow to call when the task completes. This also drives the requirement that the task parameters are either a struct or nil.

type ParallelWorkflowExecutor

type ParallelWorkflowExecutor struct {
	// contains filtered or unexported fields
}

func NewParallelWorkflowExecutor

func NewParallelWorkflowExecutor(executorForFunc ParallelWorkflowExecutorFor) *ParallelWorkflowExecutor

NewParallelWorkflowExecutor creates a parallel workflow executor. The executorForFunc is used to look up the workflow executor for a given subworkflow.

func (*ParallelWorkflowExecutor) OnCancel

func (*ParallelWorkflowExecutor) OnStart

func (*ParallelWorkflowExecutor) OnTaskComplete

type ParallelWorkflowExecutorFor

type ParallelWorkflowExecutorFor func(subworkflow string) (cereal.WorkflowExecutor, bool)

ParallelWorkflowExecutorFor is a function used to look up the workflow executor for a given subworkflow.

type ParallelWorkflowInstance

type ParallelWorkflowInstance struct {
	// contains filtered or unexported fields
}

func GetWorkflowInstance

func GetWorkflowInstance(ctx context.Context, m *cereal.Manager, workflowName cereal.WorkflowName, instanceName string) (*ParallelWorkflowInstance, error)

func (*ParallelWorkflowInstance) Err

func (instance *ParallelWorkflowInstance) Err() error

func (*ParallelWorkflowInstance) GetParameters

func (instance *ParallelWorkflowInstance) GetParameters() (*ParallelWorkflowParams, error)

func (*ParallelWorkflowInstance) GetPayload

func (instance *ParallelWorkflowInstance) GetPayload() (*ParallelWorkflowPayload, error)

func (*ParallelWorkflowInstance) GetResult

func (instance *ParallelWorkflowInstance) GetResult() (*ParallelWorkflowPayload, error)

func (*ParallelWorkflowInstance) GetSubWorkflow

func (instance *ParallelWorkflowInstance) GetSubWorkflow(workflowName string) (cereal.ImmutableWorkflowInstance, error)

func (*ParallelWorkflowInstance) IsRunning

func (instance *ParallelWorkflowInstance) IsRunning() bool

func (*ParallelWorkflowInstance) ListSubWorkflows

func (instance *ParallelWorkflowInstance) ListSubWorkflows() []string

type ParallelWorkflowParams

type ParallelWorkflowParams struct {
	SubworkflowKeys []string
	WorkflowParams  map[string]json.RawMessage
}

ParallelWorkflowParams are the parameters for the parallel workflow. It consists of the names of the subworkflows and their parameters.

func ToParallelWorkflowParameters

func ToParallelWorkflowParameters(subworkflows []string, parameters map[string]interface{}) (ParallelWorkflowParams, error)

type ParallelWorkflowPayload

type ParallelWorkflowPayload struct {
	State map[string]WorkflowState
}

ParallelWorkflowPayload keeps track of the state of all the subworkflows and is the payload and result of the parallel workflow.

func (ParallelWorkflowPayload) Finished

func (p ParallelWorkflowPayload) Finished() bool

type ParallelWorkflowTaskParam

type ParallelWorkflowTaskParam struct {
	XXX_ParallelWorkflowKey string `json:"__key"`
}

ParallelWorkflowTaskParam is metadata attached to any enqueued tasks. This allows the parallel workflow to figure out which subworkflow to deliver the task completed event to.

type SingleTaskWorkflowExecutor

type SingleTaskWorkflowExecutor struct {
	// contains filtered or unexported fields
}

A SingleTaskWorkflowExecutor handles running a workflow composed of a single task. The workflow enqueues the task on startup. The workflow fails if the task fails.

func NewSingleTaskWorkflowExecutor

func NewSingleTaskWorkflowExecutor(taskName cereal.TaskName, allowCancel bool) *SingleTaskWorkflowExecutor

func (*SingleTaskWorkflowExecutor) OnCancel

func (*SingleTaskWorkflowExecutor) OnStart

func (*SingleTaskWorkflowExecutor) OnTaskComplete

type WorkflowState

type WorkflowState struct {
	Payload        json.RawMessage
	Result         json.RawMessage
	Err            string
	EnqueuedTasks  int
	CompletedTasks int
	IsFinished     bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL