workflow

package
v1.1.1171
Warning

This package is not in the latest version of its module.

Published: Jun 14, 2024 License: MIT Imports: 46 Imported by: 0

Documentation

Constants

const (
	AbortTypeActivity    = iota // AbortTypeActivity signifies an activity is being aborted
	AbortTypeServiceTask = iota // AbortTypeServiceTask signifies a service task is being aborted
)
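
Both constants sit in one const block, so iota advances once per line and the two abort types receive distinct values. The following is a minimal sketch that redeclares the constants locally rather than importing the package; the describe helper is hypothetical and exists only for illustration.

```go
package main

import "fmt"

// AbortType mirrors the package declaration: type AbortType int.
type AbortType int

// Local copies of the constants, written exactly as listed above.
const (
	AbortTypeActivity    = iota // 0
	AbortTypeServiceTask = iota // 1: iota advances once per line in a const block
)

// describe is a hypothetical helper, not part of the package.
func describe(t AbortType) string {
	switch t {
	case AbortTypeActivity:
		return "activity"
	case AbortTypeServiceTask:
		return "service task"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(AbortTypeActivity, AbortTypeServiceTask)
	fmt.Println(describe(AbortTypeServiceTask))
}
```

Because the constants are untyped in this listing, they can be used wherever an AbortType is expected without a conversion.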

Variables

This section is empty.

Functions

func NoOpServiceTaskConsumerFn

func NoOpServiceTaskConsumerFn(_ context.Context, _ string) error

NoOpServiceTaskConsumerFn is a no-op service task consumer function.

func NoOpWorkFlowProcessMappingFn

func NoOpWorkFlowProcessMappingFn(_ context.Context, _ *model.Workflow, _ *model.Process) (uint64, error)

NoOpWorkFlowProcessMappingFn is a no-op workflow to process mapping function.

func WithEmbargo

func WithEmbargo(embargo int) *publishEmbargoOption

WithEmbargo allows the specification of an embargo time on a workflow state message

func WithHeaders

func WithHeaders(headers map[string]string) *publishHeadersOption

WithHeaders allows the addition of extra headers to a workflow state message

Types

type AbortType

type AbortType int

AbortType represents the type of termination being handled by the abort function

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine contains the workflow processing functions

func New

func New(natsService *natz.NatsService, operations *Operations, options *option.ServerOptions) (*Engine, error)

New returns an instance of the core workflow engine.

func (*Engine) Compensate

func (s *Engine) Compensate(ctx context.Context, state *model.WorkflowState) error

Compensate is a method of the Engine struct that performs the compensation process for a given workflow state. It retrieves the necessary information and history from the workflow and processes it to determine the compensation steps. It then creates a compensation plan and publishes each step to a designated subject for further processing. Finally, it updates the state of the workflow to indicate that the compensation is in progress. It returns an error if any step of the compensation process encounters an issue.
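
The sequence described above (derive steps from history, publish each step, then mark the state as compensating) can be sketched roughly as follows. This is not the engine's implementation: historyEntry, step, buildPlan, compensate and the two callbacks are all hypothetical, and walking the history newest-first is an assumption borrowed from the usual saga convention rather than something this page states.

```go
package main

import (
	"errors"
	"fmt"
)

// historyEntry is a hypothetical, simplified stand-in for a process history entry.
type historyEntry struct {
	ElementID   string
	Compensable bool
}

// step is a hypothetical compensation step.
type step struct{ ElementID string }

// errBroker is a sample failure used in the demonstration below.
var errBroker = errors.New("broker down")

// buildPlan walks the history newest-first (an assumption) and keeps
// only the entries flagged as compensable.
func buildPlan(history []historyEntry) []step {
	var plan []step
	for i := len(history) - 1; i >= 0; i-- {
		if history[i].Compensable {
			plan = append(plan, step{ElementID: history[i].ElementID})
		}
	}
	return plan
}

// compensate publishes every step and only then marks the state as
// compensating; the first failure aborts the sequence.
func compensate(history []historyEntry, publish func(step) error, markInProgress func() error) error {
	for _, s := range buildPlan(history) {
		if err := publish(s); err != nil {
			return fmt.Errorf("publish compensation step %q: %w", s.ElementID, err)
		}
	}
	return markInProgress()
}

func main() {
	history := []historyEntry{
		{ElementID: "reserve-stock", Compensable: true},
		{ElementID: "log-order", Compensable: false},
		{ElementID: "charge-card", Compensable: true},
	}
	var log []string
	err := compensate(history,
		func(s step) error { log = append(log, s.ElementID); return nil },
		func() error { log = append(log, "state:compensating"); return nil },
	)
	fmt.Println(log, err)

	err = compensate(history,
		func(step) error { return errBroker },
		func() error { return nil },
	)
	fmt.Println(err)
}
```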

func (*Engine) GetGatewayInstance

func (s *Engine) GetGatewayInstance(ctx context.Context, gatewayInstanceID string) (*model.Gateway, error)

GetGatewayInstance - returns a gateway instance from the KV store.

func (*Engine) GetGatewayInstanceID

func (s *Engine) GetGatewayInstanceID(state *model.WorkflowState) (string, string, error)

GetGatewayInstanceID - returns a gateway instance ID and a satisfying route to that gateway.

func (*Engine) Shutdown

func (s *Engine) Shutdown()

Shutdown signals the engine to stop processing.

func (*Engine) Start

func (c *Engine) Start(ctx context.Context) error

Start sets up the activity and job processors and starts the engine processing workflows.

func (*Engine) StartProcessing

func (s *Engine) StartProcessing(ctx context.Context) error

StartProcessing begins listening to all the message processing queues.

type Operations added in v1.1.1131

type Operations struct {
	// contains filtered or unexported fields
}

Operations provides methods for executing and managing workflow processes, such as canceling process instances, completing tasks, retrieving workflow-related information, and managing workflow execution.

func NewOperations added in v1.1.1131

func NewOperations(natsService *natz.NatsService) (*Operations, error)

NewOperations constructs a new Operations instance

func (*Operations) CancelProcessInstance added in v1.1.1131

func (c *Operations) CancelProcessInstance(ctx context.Context, state *model.WorkflowState) error

CancelProcessInstance cancels a workflow instance with a reason.

func (*Operations) CheckProcessTaskDeprecation added in v1.1.1131

func (s *Operations) CheckProcessTaskDeprecation(ctx context.Context, workflow *model.Workflow, processName string) error

CheckProcessTaskDeprecation checks that none of the tasks in a process have been deprecated.

func (*Operations) CompleteManualTask added in v1.1.1131

func (c *Operations) CompleteManualTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error

CompleteManualTask completes a manual workflow task

func (*Operations) CompleteSendMessageTask added in v1.1.1131

func (c *Operations) CompleteSendMessageTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error

CompleteSendMessageTask completes a send message task

func (*Operations) CompleteServiceTask added in v1.1.1131

func (c *Operations) CompleteServiceTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error

CompleteServiceTask completes a workflow service task

func (*Operations) CompleteUserTask added in v1.1.1131

func (c *Operations) CompleteUserTask(ctx context.Context, job *model.WorkflowState, newvars []byte) error

CompleteUserTask completes and closes a user task with variables

func (*Operations) CreateExecution added in v1.1.1131

func (s *Operations) CreateExecution(ctx context.Context, execution *model.Execution) (*model.Execution, error)

CreateExecution, given a workflow, starts a new execution and returns it.

func (*Operations) CreateJob added in v1.1.1131

func (s *Operations) CreateJob(ctx context.Context, job *model.WorkflowState) (string, error)

CreateJob stores a workflow task state for user tasks.

func (*Operations) CreateProcessInstance added in v1.1.1131

func (s *Operations) CreateProcessInstance(ctx context.Context, executionId string, parentProcessID string, parentElementID string, processName string, workflowName string, workflowId string) (*model.ProcessInstance, error)

CreateProcessInstance creates a new instance of a process and attaches it to the workflow instance.

func (*Operations) DeleteJob added in v1.1.1131

func (s *Operations) DeleteJob(ctx context.Context, trackingID string) error

DeleteJob removes a workflow task state.

func (*Operations) DeleteNamespace added in v1.1.1131

func (s *Operations) DeleteNamespace(ctx context.Context, ns string) error

DeleteNamespace deletes the key-value store for the specified namespace in SHAR. It iterates over all the key-value stores and deletes them one by one. The function returns nil if all key-value stores are successfully deleted.

func (*Operations) DeprecateTaskSpec added in v1.1.1131

func (s *Operations) DeprecateTaskSpec(ctx context.Context, uid []string) error

DeprecateTaskSpec deprecates one or more task specs by ID.

func (*Operations) DestroyProcessInstance added in v1.1.1131

func (s *Operations) DestroyProcessInstance(ctx context.Context, state *model.WorkflowState, processInstanceId string, executionId string) error

DestroyProcessInstance deletes a process instance and removes the workflow instance once all of its process instances are satisfied.

func (*Operations) EnsureServiceTaskConsumer added in v1.1.1131

func (s *Operations) EnsureServiceTaskConsumer(ctx context.Context, uid string) error

EnsureServiceTaskConsumer creates or updates a service task consumer.

func (*Operations) GetCompensationInputVariables added in v1.1.1131

func (s *Operations) GetCompensationInputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)

GetCompensationInputVariables is a method of the Operations struct that retrieves the original input variables for a specific process instance and tracking ID. It returns the variables in byte array format.

func (*Operations) GetCompensationOutputVariables added in v1.1.1131

func (s *Operations) GetCompensationOutputVariables(ctx context.Context, processInstanceId string, trackingId string) ([]byte, error)

GetCompensationOutputVariables is a method of the Operations struct that retrieves the original output variables of a compensation history entry for a specific process instance and tracking ID. It returns the output variables as a byte array.

func (*Operations) GetElement added in v1.1.1131

func (s *Operations) GetElement(ctx context.Context, state *model.WorkflowState) (*model.Element, error)

GetElement gets the definition for the current element given a workflow state.

func (*Operations) GetExecutableWorkflowIds added in v1.1.1131

func (s *Operations) GetExecutableWorkflowIds(ctx context.Context) ([]string, error)

GetExecutableWorkflowIds returns a list of all workflow IDs that contain executable processes.

func (*Operations) GetExecution added in v1.1.1131

func (s *Operations) GetExecution(ctx context.Context, executionID string) (*model.Execution, error)

GetExecution retrieves an execution given its ID.

func (*Operations) GetJob added in v1.1.1131

func (s *Operations) GetJob(ctx context.Context, trackingID string) (*model.WorkflowState, error)

GetJob gets a workflow task state.

func (*Operations) GetLatestVersion added in v1.1.1131

func (s *Operations) GetLatestVersion(ctx context.Context, workflowName string) (string, error)

GetLatestVersion queries the workflow versions table for the latest entry

func (*Operations) GetOldState added in v1.1.1131

func (s *Operations) GetOldState(ctx context.Context, id string) (*model.WorkflowState, error)

GetOldState gets a task state given its tracking ID.

func (*Operations) GetProcessHistory added in v1.1.1131

func (s *Operations) GetProcessHistory(ctx context.Context, processInstanceId string, wch chan<- *model.ProcessHistoryEntry, errs chan<- error)

GetProcessHistory fetches the history object for a process.

func (*Operations) GetProcessHistoryItem added in v1.1.1131

func (s *Operations) GetProcessHistoryItem(ctx context.Context, processInstanceID string, trackingID string, historyType model.ProcessHistoryType) (*model.ProcessHistoryEntry, error)

GetProcessHistoryItem retrieves a process history entry based on the given process instance ID, tracking ID, and history type. If the entry is successfully retrieved, it is unmarshaled into a model.ProcessHistoryEntry object and returned. If an error occurs during the retrieval or unmarshaling process, an error is returned.

func (*Operations) GetProcessIdFor added in v1.1.1131

func (s *Operations) GetProcessIdFor(ctx context.Context, startEventMessageName string) (string, error)

GetProcessIdFor retrieves the ID of the process that is begun by a message start event.

func (*Operations) GetProcessInstance added in v1.1.1131

func (s *Operations) GetProcessInstance(ctx context.Context, processInstanceID string) (*model.ProcessInstance, error)

GetProcessInstance returns a process instance for a given process instance ID.

func (*Operations) GetProcessInstanceStatus added in v1.1.1131

func (s *Operations) GetProcessInstanceStatus(ctx context.Context, id string, wch chan<- *model.WorkflowState, errs chan<- error)

GetProcessInstanceStatus returns a list of workflow statuses for the specified process instance ID.

func (*Operations) GetTaskSpecByUID added in v1.1.1131

func (s *Operations) GetTaskSpecByUID(ctx context.Context, uid string) (*model.TaskSpec, error)

GetTaskSpecByUID fetches a task spec from the database.

func (*Operations) GetTaskSpecUID added in v1.1.1131

func (s *Operations) GetTaskSpecUID(ctx context.Context, name string) (string, error)

GetTaskSpecUID fetches the UID of a task spec given its name.

func (*Operations) GetTaskSpecUsage added in v1.1.1131

func (s *Operations) GetTaskSpecUsage(ctx context.Context, uid []string) (*model.TaskSpecUsageReport, error)

GetTaskSpecUsage returns the usage report for a list of task specs.

func (*Operations) GetTaskSpecUsageByName added in v1.1.1131

func (s *Operations) GetTaskSpecUsageByName(ctx context.Context, name string) (*model.TaskSpecUsageReport, error)

GetTaskSpecUsageByName produces a report of running and executable places where the task spec is in use.

func (*Operations) GetTaskSpecVersions added in v1.1.1131

func (s *Operations) GetTaskSpecVersions(ctx context.Context, name string) (*model.TaskSpecVersions, error)

GetTaskSpecVersions fetches the versions of a given task spec name

func (*Operations) GetUserTaskIDs added in v1.1.1131

func (s *Operations) GetUserTaskIDs(ctx context.Context, owner string) (*model.UserTasks, error)

GetUserTaskIDs gets a list of tasks given an owner.

func (*Operations) GetWorkflow added in v1.1.1131

func (s *Operations) GetWorkflow(ctx context.Context, workflowID string) (*model.Workflow, error)

GetWorkflow - retrieves a workflow model given its ID

func (*Operations) GetWorkflowNameFor added in v1.1.1131

func (s *Operations) GetWorkflowNameFor(ctx context.Context, processName string) (string, error)

GetWorkflowNameFor - gets the name of the workflow that a process is associated with.

func (*Operations) GetWorkflowVersions added in v1.1.1131

func (s *Operations) GetWorkflowVersions(ctx context.Context, workflowName string, wch chan<- *model.WorkflowVersion, errs chan<- error)

GetWorkflowVersions - returns a list of versions for a given workflow.

func (*Operations) HandleWorkflowError added in v1.1.1131

func (c *Operations) HandleWorkflowError(ctx context.Context, errorCode string, message string, inVars []byte, job *model.WorkflowState) error

HandleWorkflowError handles a workflow error by looking up the error definitions in the workflow, determining the appropriate action to take, and publishing the necessary workflow state updates. It returns an error if there was an issue retrieving the workflow definition, or if the workflow doesn't support the specified error code.
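
The lookup step, including the failure for an unsupported error code, might look something like the sketch below. The workflowError type, the findError function and the errUnsupportedCode sentinel are hypothetical names chosen for this example; the package's actual types and error values are not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// workflowError is a hypothetical stand-in for an error definition
// declared in a workflow.
type workflowError struct {
	Code string
	Name string
}

// errUnsupportedCode is a hypothetical sentinel for unknown error codes.
var errUnsupportedCode = errors.New("workflow does not support error code")

// findError returns the definition matching code, or a wrapped
// errUnsupportedCode when the workflow declares no such code.
func findError(defs []workflowError, code string) (workflowError, error) {
	for _, d := range defs {
		if d.Code == code {
			return d, nil
		}
	}
	return workflowError{}, fmt.Errorf("%w: %q", errUnsupportedCode, code)
}

func main() {
	defs := []workflowError{{Code: "101", Name: "OutOfStock"}}

	if d, err := findError(defs, "101"); err == nil {
		fmt.Println("handling", d.Name)
	}

	_, err := findError(defs, "999")
	fmt.Println(errors.Is(err, errUnsupportedCode))
}
```

Wrapping the sentinel with %w lets a caller distinguish an unsupported code from other failures using errors.Is.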

func (*Operations) HasValidExecution added in v1.1.1131

func (s *Operations) HasValidExecution(ctx context.Context, executionId string) (*model.Execution, error)

HasValidExecution checks to see whether an execution exists for the executionId

func (*Operations) HasValidProcess added in v1.1.1131

func (s *Operations) HasValidProcess(ctx context.Context, processInstanceId, executionId string) (*model.ProcessInstance, *model.Execution, error)

HasValidProcess - checks that a valid process instance and execution exist for the given process instance and execution IDs.

func (*Operations) Heartbeat added in v1.1.1131

func (s *Operations) Heartbeat(ctx context.Context, req *model.HeartbeatRequest) error

Heartbeat saves a client status to the client KV.

func (*Operations) Launch added in v1.1.1131

func (c *Operations) Launch(ctx context.Context, processName string, vars []byte) (string, string, error)

Launch starts a new instance of a workflow and returns an execution ID.

func (*Operations) LaunchWithParent added in v1.1.1131

func (c *Operations) LaunchWithParent(ctx context.Context, processName string, ID common.TrackingID, vrs []byte, parentpiID string, parentElID string) (string, string, error)

LaunchWithParent contains the underlying logic to start a workflow. It is also called to spawn new instances of child workflows.

func (*Operations) ListExecutableProcesses added in v1.1.1131

func (s *Operations) ListExecutableProcesses(ctx context.Context, wch chan<- *model.ListExecutableProcessesItem, errs chan<- error)

ListExecutableProcesses returns a list of all the executable processes in SHAR. It retrieves the current SHAR namespace from the context and fetches the workflow versions for that namespace from the key-value store. It then iterates through each workflow version and loads the corresponding workflow. For each process in the workflow, it creates a ListExecutableProcessesItem object and populates it with the process name, workflow name, and the executable start parameters obtained from the workflow's start events. It sends each ListExecutableProcessesItem object to the wch channel.

Parameters:
- ctx: The context containing the SHAR namespace.
- wch: The channel for sending the list of executable processes.
- errs: The channel for sending any errors that occur.

Returns: Nothing. Errors are sent to the errs channel if encountered.
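
Several methods on Operations share this shape: results are streamed on one channel and errors on another. A caller might drain both as in the sketch below. Everything here is a stand-in: item replaces model.ListExecutableProcessesItem, produce replaces the real method (and omits its ctx parameter for brevity), and the assumption that the producer closes the result channel when it finishes is not confirmed by this page.

```go
package main

import (
	"errors"
	"fmt"
)

// item is a hypothetical stand-in for model.ListExecutableProcessesItem.
type item struct {
	ProcessName  string
	WorkflowName string
}

// errDemo is a sample failure used in the demonstration below.
var errDemo = errors.New("kv unavailable")

// produce is a hypothetical stand-in for a streaming method such as
// ListExecutableProcesses. ASSUMPTION: it closes wch once it is finished
// and reports a failure with a single send on errs.
func produce(items []item, fail error, wch chan<- *item, errs chan<- error) {
	defer close(wch)
	for i := range items {
		wch <- &items[i]
	}
	if fail != nil {
		errs <- fail
	}
}

// collect drains both channels until wch is closed or an error arrives.
func collect(items []item, fail error) ([]item, error) {
	wch := make(chan *item)
	errs := make(chan error, 1) // buffered so the producer never blocks on errs
	go produce(items, fail, wch, errs)

	var out []item
	for {
		select {
		case it, ok := <-wch:
			if !ok {
				// wch is closed: pick up an error sent just before the close.
				select {
				case err := <-errs:
					return out, err
				default:
					return out, nil
				}
			}
			out = append(out, *it)
		case err := <-errs:
			return out, err
		}
	}
}

func main() {
	items := []item{{"Order", "Shop"}, {"Refund", "Shop"}}

	got, err := collect(items, nil)
	fmt.Println(len(got), err)

	_, err = collect(items, errDemo)
	fmt.Println(err)
}
```

If the real producer does not close its channel, the consumer needs another completion signal, such as context cancellation.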

func (*Operations) ListExecutionProcesses added in v1.1.1131

func (s *Operations) ListExecutionProcesses(ctx context.Context, id string) ([]string, error)

ListExecutionProcesses gets the current processIDs for an execution.

func (*Operations) ListExecutions added in v1.1.1131

func (s *Operations) ListExecutions(ctx context.Context, workflowName string, wch chan<- *model.ListExecutionItem, errs chan<- error)

ListExecutions returns a list of running workflows and versions given a workflow name.

func (*Operations) ListTaskSpecUIDs added in v1.1.1131

func (s *Operations) ListTaskSpecUIDs(ctx context.Context, deprecated bool) ([]string, error)

ListTaskSpecUIDs lists UIDs of active (and optionally deprecated) task specs.

func (*Operations) ListWorkflows added in v1.1.1131

func (s *Operations) ListWorkflows(ctx context.Context, res chan<- *model.ListWorkflowResponse, errs chan<- error)

ListWorkflows returns a list of all the workflows in SHAR.

func (*Operations) LoadWorkflow added in v1.1.1131

func (c *Operations) LoadWorkflow(ctx context.Context, model *model.Workflow) (string, error)

LoadWorkflow loads a model.Workflow describing a workflow into the engine, ready for execution.

func (*Operations) Log added in v1.1.1131

func (s *Operations) Log(ctx context.Context, req *model.LogRequest) error

Log publishes LogRequest to WorkflowTelemetry Logs subject

func (*Operations) OwnerID added in v1.1.1131

func (s *Operations) OwnerID(ctx context.Context, name string) (string, error)

OwnerID gets a unique identifier for a task owner.

func (*Operations) OwnerName added in v1.1.1131

func (s *Operations) OwnerName(ctx context.Context, id string) (string, error)

OwnerName retrieves an owner name given an ID.

func (*Operations) ProcessServiceTasks added in v1.1.1131

func (s *Operations) ProcessServiceTasks(ctx context.Context, wf *model.Workflow, svcTaskConsFn ServiceTaskConsumerFn, wfProcessMappingFn WorkflowProcessMappingFn) error

ProcessServiceTasks iterates over the service tasks in the processes of a given workflow, validating them and setting their UIDs in their element definitions.

func (*Operations) PublishMsg added in v1.1.1131

func (s *Operations) PublishMsg(ctx context.Context, subject string, sharMsg proto.Message) error

PublishMsg publishes a workflow message.

func (*Operations) PublishWorkflowState added in v1.1.1131

func (s *Operations) PublishWorkflowState(ctx context.Context, stateName string, state *model.WorkflowState, opts ...PublishOpt) error

PublishWorkflowState publishes a SHAR state object to a given subject

func (*Operations) PutTaskSpec added in v1.1.1131

func (s *Operations) PutTaskSpec(ctx context.Context, spec *model.TaskSpec) (string, error)

PutTaskSpec writes a task spec to the database.

func (*Operations) RecordHistory added in v1.1.1131

func (s *Operations) RecordHistory(ctx context.Context, state *model.WorkflowState, historyType model.ProcessHistoryType) error

RecordHistory records into the history KV.

func (*Operations) RecordHistoryActivityComplete added in v1.1.1131

func (s *Operations) RecordHistoryActivityComplete(ctx context.Context, state *model.WorkflowState) error

RecordHistoryActivityComplete records the activity completion into the history object.

func (*Operations) RecordHistoryActivityExecute added in v1.1.1131

func (s *Operations) RecordHistoryActivityExecute(ctx context.Context, state *model.WorkflowState) error

RecordHistoryActivityExecute records the activity execute into the history object.

func (*Operations) RecordHistoryCompensationCheckpoint added in v1.1.1131

func (s *Operations) RecordHistoryCompensationCheckpoint(ctx context.Context, state *model.WorkflowState) error

RecordHistoryCompensationCheckpoint records a compensation checkpoint into the history object.

func (*Operations) RecordHistoryJobAbort added in v1.1.1131

func (s *Operations) RecordHistoryJobAbort(ctx context.Context, state *model.WorkflowState) error

RecordHistoryJobAbort records the job abort into the history object.

func (*Operations) RecordHistoryJobComplete added in v1.1.1131

func (s *Operations) RecordHistoryJobComplete(ctx context.Context, state *model.WorkflowState) error

RecordHistoryJobComplete records the job completion into the history object.

func (*Operations) RecordHistoryJobExecute added in v1.1.1131

func (s *Operations) RecordHistoryJobExecute(ctx context.Context, state *model.WorkflowState) error

RecordHistoryJobExecute records the job execute into the history object.

func (*Operations) RecordHistoryProcessAbort added in v1.1.1131

func (s *Operations) RecordHistoryProcessAbort(ctx context.Context, state *model.WorkflowState) error

RecordHistoryProcessAbort records the process aborting into the history object.

func (*Operations) RecordHistoryProcessComplete added in v1.1.1131

func (s *Operations) RecordHistoryProcessComplete(ctx context.Context, state *model.WorkflowState) error

RecordHistoryProcessComplete records the process completion into the history object.

func (*Operations) RecordHistoryProcessSpawn added in v1.1.1131

func (s *Operations) RecordHistoryProcessSpawn(ctx context.Context, state *model.WorkflowState, newProcessInstanceID string) error

RecordHistoryProcessSpawn records the process spawning a new process into the history object.

func (*Operations) RecordHistoryProcessStart added in v1.1.1131

func (s *Operations) RecordHistoryProcessStart(ctx context.Context, state *model.WorkflowState) error

RecordHistoryProcessStart records the process start into the history object.

func (*Operations) SaveState added in v1.1.1131

func (s *Operations) SaveState(ctx context.Context, id string, state *model.WorkflowState) error

SaveState saves the task state.

func (*Operations) SignalFatalError added in v1.1.1131

func (s *Operations) SignalFatalError(ctx context.Context, state *model.WorkflowState, log *slog.Logger)

SignalFatalError publishes a FatalError message on death of a process in a workflow

func (*Operations) StartJob added in v1.1.1131

func (s *Operations) StartJob(ctx context.Context, subject string, job *model.WorkflowState, el *model.Element, v []byte, opts ...PublishOpt) error

StartJob launches a user/service task

func (*Operations) StoreWorkflow added in v1.1.1131

func (s *Operations) StoreWorkflow(ctx context.Context, wf *model.Workflow) (string, error)

StoreWorkflow stores a workflow definition and returns a unique ID

func (*Operations) XDestroyProcessInstance added in v1.1.1131

func (s *Operations) XDestroyProcessInstance(ctx context.Context, state *model.WorkflowState) error

XDestroyProcessInstance terminates a running process instance with a cancellation reason and error

type PublishOpt

type PublishOpt interface {
	Apply(n *publishOptions)
}

PublishOpt represents an option that can be used when publishing a workflow state
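
PublishOpt, WithEmbargo and WithHeaders together follow the functional options pattern: each option is a small value whose Apply method mutates a shared options struct. The sketch below shows how such options could be applied. The fields of publishOptions and of the two option types are unexported and not documented here, so the fields used below are assumptions, and collect is a hypothetical helper.

```go
package main

import "fmt"

// publishOptions is a stand-in; the real struct's fields are not shown
// in this documentation, so these two are assumptions.
type publishOptions struct {
	embargo int
	headers map[string]string
}

// PublishOpt matches the interface declared above.
type PublishOpt interface {
	Apply(n *publishOptions)
}

// publishEmbargoOption carries an embargo value (field name assumed).
type publishEmbargoOption struct{ value int }

func (o *publishEmbargoOption) Apply(n *publishOptions) { n.embargo = o.value }

// publishHeadersOption carries extra headers (field name assumed).
type publishHeadersOption struct{ value map[string]string }

func (o *publishHeadersOption) Apply(n *publishOptions) { n.headers = o.value }

// WithEmbargo and WithHeaders use the signatures listed under Functions.
func WithEmbargo(embargo int) *publishEmbargoOption {
	return &publishEmbargoOption{value: embargo}
}

func WithHeaders(headers map[string]string) *publishHeadersOption {
	return &publishHeadersOption{value: headers}
}

// collect is a hypothetical helper showing how a publisher could fold
// its variadic options into one struct before sending a message.
func collect(opts ...PublishOpt) publishOptions {
	var n publishOptions
	for _, o := range opts {
		o.Apply(&n)
	}
	return n
}

func main() {
	n := collect(WithEmbargo(5), WithHeaders(map[string]string{"trace": "abc"}))
	fmt.Println(n.embargo, n.headers["trace"])
}
```

Methods such as PublishWorkflowState and StartJob accept opts ...PublishOpt, so a call that passes no options simply leaves the defaults in place.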

type ServiceTaskConsumerFn

type ServiceTaskConsumerFn func(ctx context.Context, id string) error

ServiceTaskConsumerFn defines the type of a function that ensures existence of a service task consumer

type WorkflowProcessMappingFn

type WorkflowProcessMappingFn func(ctx context.Context, wf *model.Workflow, i *model.Process) (uint64, error)

WorkflowProcessMappingFn defines the type of a function that creates a workflow to process mapping
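
Because these are plain function types, a caller such as ProcessServiceTasks can be given either a real implementation or one of the NoOp helpers. The sketch below illustrates that substitution with local stand-ins: Workflow and Process replace the model types, ensureConsumers and rejectingConsumer are hypothetical, and the no-op functions are assumed to return zero values and a nil error.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Workflow and Process are empty stand-ins for model.Workflow and model.Process.
type (
	Workflow struct{}
	Process  struct{}
)

// The two function types, copied from the declarations above with the
// stand-in types substituted.
type ServiceTaskConsumerFn func(ctx context.Context, id string) error
type WorkflowProcessMappingFn func(ctx context.Context, wf *Workflow, i *Process) (uint64, error)

// noOpConsumer and noOpMapping mimic the NoOp helpers.
// ASSUMPTION: the real helpers return zero values and a nil error.
func noOpConsumer(_ context.Context, _ string) error { return nil }

func noOpMapping(_ context.Context, _ *Workflow, _ *Process) (uint64, error) { return 0, nil }

// errRejected is a sample failure used in the demonstration below.
var errRejected = errors.New("consumer rejected")

// rejectingConsumer is a hypothetical consumer function that always fails.
func rejectingConsumer(_ context.Context, _ string) error { return errRejected }

// ensureConsumers is a hypothetical caller that invokes fn once per
// task UID and stops at the first failure.
func ensureConsumers(ctx context.Context, uids []string, fn ServiceTaskConsumerFn) error {
	for _, uid := range uids {
		if err := fn(ctx, uid); err != nil {
			return fmt.Errorf("ensure consumer for %q: %w", uid, err)
		}
	}
	return nil
}

func main() {
	ctx := context.Background()
	uids := []string{"task-a", "task-b"}

	fmt.Println(ensureConsumers(ctx, uids, noOpConsumer))
	fmt.Println(ensureConsumers(ctx, uids, rejectingConsumer))

	var mapFn WorkflowProcessMappingFn = noOpMapping
	id, err := mapFn(ctx, &Workflow{}, &Process{})
	fmt.Println(id, err)
}
```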
