Documentation ¶
Index ¶
- Variables
- func NoOpServiceTaskConsumerFn(_ context.Context, _ string) error
- func NoOpWorkFlowProcessMappingFn(_ context.Context, _ *model.Workflow, _ *model.Process) (uint64, error)
- func WithEmbargo(embargo int) *publishEmbargoOption
- type NamespaceKvs
- type Nats
- func (s *Nats) CheckProcessTaskDeprecation(ctx context.Context, workflow *model.Workflow, processName string) error
- func (s *Nats) CloseUserTask(ctx context.Context, trackingID string) error
- func (s *Nats) Conn() common.NatsConn
- func (s *Nats) CreateExecution(ctx context.Context, execution *model.Execution) (*model.Execution, error)
- func (s *Nats) CreateJob(ctx context.Context, job *model.WorkflowState) (string, error)
- func (s *Nats) CreateProcessInstance(ctx context.Context, executionId string, parentProcessID string, ...) (*model.ProcessInstance, error)
- func (s *Nats) DeleteJob(ctx context.Context, trackingID string) error
- func (s *Nats) DeleteNamespace(ctx context.Context, ns string) error
- func (s *Nats) DeprecateTaskSpec(ctx context.Context, uid []string) error
- func (s *Nats) DestroyProcessInstance(ctx context.Context, state *model.WorkflowState, pi *model.ProcessInstance, ...) error
- func (s *Nats) EnsureServiceTaskConsumer(ctx context.Context, uid string) error
- func (s *Nats) GetElement(ctx context.Context, state *model.WorkflowState) (*model.Element, error)
- func (s *Nats) GetExecutableWorkflowIds(ctx context.Context) ([]string, error)
- func (s *Nats) GetExecution(ctx context.Context, executionID string) (*model.Execution, error)
- func (s *Nats) GetGatewayInstance(ctx context.Context, gatewayInstanceID string) (*model.Gateway, error)
- func (s *Nats) GetGatewayInstanceID(state *model.WorkflowState) (string, string, error)
- func (s *Nats) GetJob(ctx context.Context, trackingID string) (*model.WorkflowState, error)
- func (s *Nats) GetLatestVersion(ctx context.Context, workflowName string) (string, error)
- func (s *Nats) GetOldState(ctx context.Context, id string) (*model.WorkflowState, error)
- func (s *Nats) GetProcessHistory(ctx context.Context, processInstanceId string, ...)
- func (s *Nats) GetProcessIdFor(ctx context.Context, startEventMessageName string) (string, error)
- func (s *Nats) GetProcessInstance(ctx context.Context, processInstanceID string) (*model.ProcessInstance, error)
- func (s *Nats) GetProcessInstanceStatus(ctx context.Context, id string, wch chan<- *model.WorkflowState, ...)
- func (s *Nats) GetTaskSpecByUID(ctx context.Context, uid string) (*model.TaskSpec, error)
- func (s *Nats) GetTaskSpecUID(ctx context.Context, name string) (string, error)
- func (s *Nats) GetTaskSpecUsage(ctx context.Context, uid []string) (*model.TaskSpecUsageReport, error)
- func (s *Nats) GetTaskSpecUsageByName(ctx context.Context, name string) (*model.TaskSpecUsageReport, error)
- func (s *Nats) GetTaskSpecVersions(ctx context.Context, name string) (*model.TaskSpecVersions, error)
- func (s *Nats) GetUserTaskIDs(ctx context.Context, owner string) (*model.UserTasks, error)
- func (s *Nats) GetWorkflow(ctx context.Context, workflowID string) (*model.Workflow, error)
- func (s *Nats) GetWorkflowNameFor(ctx context.Context, processName string) (string, error)
- func (s *Nats) GetWorkflowVersions(ctx context.Context, workflowName string, wch chan<- *model.WorkflowVersion, ...)
- func (s *Nats) HasValidProcess(ctx context.Context, processInstanceId, executionId string) (*model.ProcessInstance, *model.Execution, error)
- func (s *Nats) Heartbeat(ctx context.Context, req *model.HeartbeatRequest) error
- func (s *Nats) KvsFor(ctx context.Context, ns string) (*NamespaceKvs, error)
- func (s *Nats) ListExecutableProcesses(ctx context.Context, wch chan<- *model.ListExecutableProcessesItem, ...)
- func (s *Nats) ListExecutionProcesses(ctx context.Context, id string) ([]string, error)
- func (s *Nats) ListExecutions(ctx context.Context, workflowName string, wch chan<- *model.ListExecutionItem, ...)
- func (s *Nats) ListTaskSpecUIDs(ctx context.Context, deprecated bool) ([]string, error)
- func (s *Nats) ListWorkflows(ctx context.Context, res chan<- *model.ListWorkflowResponse, errs chan<- error)
- func (s *Nats) Log(ctx context.Context, req *model.LogRequest) error
- func (s *Nats) OwnerID(ctx context.Context, name string) (string, error)
- func (s *Nats) OwnerName(ctx context.Context, id string) (string, error)
- func (s *Nats) ProcessServiceTasks(ctx context.Context, wf *model.Workflow, svcTaskConsFn ServiceTaskConsumerFn, ...) error
- func (s *Nats) PublishMessage(ctx context.Context, name string, key string, vars []byte) error
- func (s *Nats) PublishWorkflowState(ctx context.Context, stateName string, state *model.WorkflowState, ...) error
- func (s *Nats) PutTaskSpec(ctx context.Context, spec *model.TaskSpec) (string, error)
- func (s *Nats) RecordHistory(ctx context.Context, state *model.WorkflowState, ...) error
- func (s *Nats) RecordHistoryActivityComplete(ctx context.Context, state *model.WorkflowState) error
- func (s *Nats) RecordHistoryActivityExecute(ctx context.Context, state *model.WorkflowState) error
- func (s *Nats) RecordHistoryProcessAbort(ctx context.Context, state *model.WorkflowState) error
- func (s *Nats) RecordHistoryProcessComplete(ctx context.Context, state *model.WorkflowState) error
- func (s *Nats) RecordHistoryProcessSpawn(ctx context.Context, state *model.WorkflowState, newProcessInstanceID string) error
- func (s *Nats) RecordHistoryProcessStart(ctx context.Context, state *model.WorkflowState) error
- func (s *Nats) SaveState(ctx context.Context, id string, state *model.WorkflowState) error
- func (s *Nats) SetAbort(processor services.AbortFunc)
- func (s *Nats) SetCompleteActivity(processor services.CompleteActivityFunc)
- func (s *Nats) SetCompleteActivityProcessor(processor services.CompleteActivityProcessorFunc)
- func (s *Nats) SetCompleteJobProcessor(processor services.CompleteJobProcessorFunc)
- func (s *Nats) SetEventProcessor(processor services.EventProcessorFunc)
- func (s *Nats) SetLaunchFunc(processor services.LaunchFunc)
- func (s *Nats) SetMessageProcessor(processor services.MessageProcessorFunc)
- func (s *Nats) SetTraversalProvider(provider services.TraversalFunc)
- func (s *Nats) Shutdown()
- func (s *Nats) StartProcessing(ctx context.Context) error
- func (s *Nats) StoreWorkflow(ctx context.Context, wf *model.Workflow) (string, error)
- func (s *Nats) XDestroyProcessInstance(ctx context.Context, state *model.WorkflowState) error
- type PublishOpt
- type ServiceTaskConsumerFn
- type WorkflowProcessMappingFn
Constants ¶
This section is empty.
Variables ¶
var NatsConfig string
NatsConfig holds the current nats configuration for SHAR.
Functions ¶
func NoOpServiceTaskConsumerFn ¶ added in v1.1.927
NoOpServiceTaskConsumerFn no op service task consumer fn
func NoOpWorkFlowProcessMappingFn ¶ added in v1.1.927
func NoOpWorkFlowProcessMappingFn(_ context.Context, _ *model.Workflow, _ *model.Process) (uint64, error)
NoOpWorkFlowProcessMappingFn no op workflow to process mapping fn
func WithEmbargo ¶
func WithEmbargo(embargo int) *publishEmbargoOption
WithEmbargo allows the specification of an embargo time on a workflow state message
Types ¶
type NamespaceKvs ¶ added in v1.1.797
type NamespaceKvs struct {
// contains filtered or unexported fields
}
NamespaceKvs defines all of the key value stores shar needs to operate
type Nats ¶
type Nats struct {
// contains filtered or unexported fields
}
Nats contains the engine functions that communicate with NATS.
func New ¶
func New(conn *nats.Conn, txConn *nats.Conn, storageType jetstream.StorageType, concurrency int, allowOrphanServiceTasks bool, telCfg telemetry.Config) (*Nats, error)
New creates a new instance of the NATS communication layer.
func (*Nats) CheckProcessTaskDeprecation ¶ added in v1.1.670
func (s *Nats) CheckProcessTaskDeprecation(ctx context.Context, workflow *model.Workflow, processName string) error
CheckProcessTaskDeprecation checks if all the tasks in a process have not been deprecated.
func (*Nats) CloseUserTask ¶
CloseUserTask removes a completed user task.
func (*Nats) CreateExecution ¶ added in v1.0.645
func (s *Nats) CreateExecution(ctx context.Context, execution *model.Execution) (*model.Execution, error)
CreateExecution given a workflow, starts a new execution and returns its ID
func (*Nats) CreateProcessInstance ¶
func (s *Nats) CreateProcessInstance(ctx context.Context, executionId string, parentProcessID string, parentElementID string, processName string, workflowName string, workflowId string) (*model.ProcessInstance, error)
CreateProcessInstance creates a new instance of a process and attaches it to the workflow instance.
func (*Nats) DeleteNamespace ¶ added in v1.1.927
DeleteNamespace deletes the key-value store for the specified namespace in SHAR. It iterates over all the key-value stores and deletes them one by one. The function returns nil if all key-value stores are successfully deleted.
func (*Nats) DeprecateTaskSpec ¶ added in v1.1.670
DeprecateTaskSpec deprecates one or more task specs by ID.
func (*Nats) DestroyProcessInstance ¶
func (s *Nats) DestroyProcessInstance(ctx context.Context, state *model.WorkflowState, pi *model.ProcessInstance, execution *model.Execution) error
DestroyProcessInstance deletes a process instance and removes the workflow instance dependent on all process instances being satisfied.
func (*Nats) EnsureServiceTaskConsumer ¶ added in v1.1.670
EnsureServiceTaskConsumer creates or updates a service task consumer.
func (*Nats) GetElement ¶
GetElement gets the definition for the current element given a workflow state.
func (*Nats) GetExecutableWorkflowIds ¶ added in v1.1.670
GetExecutableWorkflowIds returns a list of all workflow Ids that contain executable processes
func (*Nats) GetExecution ¶ added in v1.0.645
GetExecution retrieves an execution given its ID.
func (*Nats) GetGatewayInstance ¶
func (s *Nats) GetGatewayInstance(ctx context.Context, gatewayInstanceID string) (*model.Gateway, error)
GetGatewayInstance - returns a gateway instance from the KV store.
func (*Nats) GetGatewayInstanceID ¶
GetGatewayInstanceID - returns a gateawy instance ID and a satisfying route to that gateway.
func (*Nats) GetLatestVersion ¶
GetLatestVersion queries the workflow versions table for the latest entry
func (*Nats) GetOldState ¶
GetOldState gets a task state given its tracking ID.
func (*Nats) GetProcessHistory ¶
func (s *Nats) GetProcessHistory(ctx context.Context, processInstanceId string, wch chan<- *model.ProcessHistoryEntry, errs chan<- error)
GetProcessHistory fetches the history object for a process.
func (*Nats) GetProcessIdFor ¶ added in v1.1.725
GetProcessIdFor retrieves the processId that a begun by a message start event
func (*Nats) GetProcessInstance ¶
func (s *Nats) GetProcessInstance(ctx context.Context, processInstanceID string) (*model.ProcessInstance, error)
GetProcessInstance returns a process instance for a given process ID
func (*Nats) GetProcessInstanceStatus ¶
func (s *Nats) GetProcessInstanceStatus(ctx context.Context, id string, wch chan<- *model.WorkflowState, errs chan<- error)
GetProcessInstanceStatus returns a list of workflow statuses for the specified process instance ID.
func (*Nats) GetTaskSpecByUID ¶ added in v1.0.623
GetTaskSpecByUID fetches a task spec from the database.
func (*Nats) GetTaskSpecUID ¶ added in v1.0.623
GetTaskSpecUID fetches
func (*Nats) GetTaskSpecUsage ¶ added in v1.1.670
func (s *Nats) GetTaskSpecUsage(ctx context.Context, uid []string) (*model.TaskSpecUsageReport, error)
GetTaskSpecUsage returns the usage report for a list of task specs.
func (*Nats) GetTaskSpecUsageByName ¶ added in v1.1.670
func (s *Nats) GetTaskSpecUsageByName(ctx context.Context, name string) (*model.TaskSpecUsageReport, error)
GetTaskSpecUsageByName produces a report of running and executable places where the task spec is in use.
func (*Nats) GetTaskSpecVersions ¶ added in v1.1.670
func (s *Nats) GetTaskSpecVersions(ctx context.Context, name string) (*model.TaskSpecVersions, error)
GetTaskSpecVersions fetches the versions of a given task spec name
func (*Nats) GetUserTaskIDs ¶
GetUserTaskIDs gets a list of tasks given an owner.
func (*Nats) GetWorkflow ¶
GetWorkflow - retrieves a workflow model given its ID
func (*Nats) GetWorkflowNameFor ¶ added in v1.0.645
GetWorkflowNameFor - get the worflow name a process is associated with
func (*Nats) GetWorkflowVersions ¶
func (s *Nats) GetWorkflowVersions(ctx context.Context, workflowName string, wch chan<- *model.WorkflowVersion, errs chan<- error)
GetWorkflowVersions - returns a list of versions for a given workflow.
func (*Nats) HasValidProcess ¶
func (s *Nats) HasValidProcess(ctx context.Context, processInstanceId, executionId string) (*model.ProcessInstance, *model.Execution, error)
HasValidProcess - checks for a valid process and instance for a workflow process and instance ids
func (*Nats) KvsFor ¶ added in v1.1.797
KvsFor retrieves the shar KVs for a given namespace. If they do not exist for a namespace, it will initialise them and store them in a map for future lookup.
func (*Nats) ListExecutableProcesses ¶ added in v1.1.1032
func (s *Nats) ListExecutableProcesses(ctx context.Context, wch chan<- *model.ListExecutableProcessesItem, errs chan<- error)
ListExecutableProcesses returns a list of all the executable processes in SHAR. It retrieves the current SHAR namespace from the context and fetches the workflow versions for that namespace from the key-value store. It then iterates through each workflow version and loads the corresponding workflow. For each process in the workflow, it creates a ListExecutableProcessesItem object and populates it with the process name, workflow name, and the executable start parameters obtained from the workflow's start events. It sends each ListExecutableProcessesItem object to the wch channel.
Parameters: - ctx: The context containing the SHAR namespace. - wch: The channel for sending the list of executable processes. - errs: The channel for sending any errors that occur.
Returns: Nothing. Errors are sent to the errs channel if encountered.
func (*Nats) ListExecutionProcesses ¶ added in v1.0.645
ListExecutionProcesses gets the current processIDs for an execution.
func (*Nats) ListExecutions ¶ added in v1.0.645
func (s *Nats) ListExecutions(ctx context.Context, workflowName string, wch chan<- *model.ListExecutionItem, errs chan<- error)
ListExecutions returns a list of running workflows and versions given a workflow Name
func (*Nats) ListTaskSpecUIDs ¶ added in v1.1.725
ListTaskSpecUIDs lists UIDs of active (and optionally deprecated) tasks specs.
func (*Nats) ListWorkflows ¶
func (s *Nats) ListWorkflows(ctx context.Context, res chan<- *model.ListWorkflowResponse, errs chan<- error)
ListWorkflows returns a list of all the workflows in SHAR.
func (*Nats) ProcessServiceTasks ¶ added in v1.1.927
func (s *Nats) ProcessServiceTasks(ctx context.Context, wf *model.Workflow, svcTaskConsFn ServiceTaskConsumerFn, wfProcessMappingFn WorkflowProcessMappingFn) error
ProcessServiceTasks iterates over service tasks in the processes of a given workflow setting, validating them and setting their uid into their element definitions
func (*Nats) PublishMessage ¶
PublishMessage publishes a workflow message.
func (*Nats) PublishWorkflowState ¶
func (s *Nats) PublishWorkflowState(ctx context.Context, stateName string, state *model.WorkflowState, opts ...PublishOpt) error
PublishWorkflowState publishes a SHAR state object to a given subject
func (*Nats) PutTaskSpec ¶ added in v1.0.623
PutTaskSpec writes a task spec to the database.
func (*Nats) RecordHistory ¶ added in v1.1.927
func (s *Nats) RecordHistory(ctx context.Context, state *model.WorkflowState, historyType model.ProcessHistoryType) error
RecordHistory records into the history KV.
func (*Nats) RecordHistoryActivityComplete ¶
RecordHistoryActivityComplete records the activity completion into the history object.
func (*Nats) RecordHistoryActivityExecute ¶
RecordHistoryActivityExecute records the activity execute into the history object.
func (*Nats) RecordHistoryProcessAbort ¶
RecordHistoryProcessAbort records the process aborting into the history object.
func (*Nats) RecordHistoryProcessComplete ¶
RecordHistoryProcessComplete records the process completion into the history object.
func (*Nats) RecordHistoryProcessSpawn ¶
func (s *Nats) RecordHistoryProcessSpawn(ctx context.Context, state *model.WorkflowState, newProcessInstanceID string) error
RecordHistoryProcessSpawn records the process spawning a new process into the history object.
func (*Nats) RecordHistoryProcessStart ¶
RecordHistoryProcessStart records the process start into the history object.
func (*Nats) SetCompleteActivity ¶
func (s *Nats) SetCompleteActivity(processor services.CompleteActivityFunc)
SetCompleteActivity sets the callback which generates complete activity events.
func (*Nats) SetCompleteActivityProcessor ¶
func (s *Nats) SetCompleteActivityProcessor(processor services.CompleteActivityProcessorFunc)
SetCompleteActivityProcessor sets the callback fired when an activity completes.
func (*Nats) SetCompleteJobProcessor ¶
func (s *Nats) SetCompleteJobProcessor(processor services.CompleteJobProcessorFunc)
SetCompleteJobProcessor sets the callback for completed tasks.
func (*Nats) SetEventProcessor ¶
func (s *Nats) SetEventProcessor(processor services.EventProcessorFunc)
SetEventProcessor sets the callback for processing workflow activities.
func (*Nats) SetLaunchFunc ¶
func (s *Nats) SetLaunchFunc(processor services.LaunchFunc)
SetLaunchFunc sets the callback used to start child workflows.
func (*Nats) SetMessageProcessor ¶
func (s *Nats) SetMessageProcessor(processor services.MessageProcessorFunc)
SetMessageProcessor sets the callback used to create new workflow instances based on a timer.
func (*Nats) SetTraversalProvider ¶
func (s *Nats) SetTraversalProvider(provider services.TraversalFunc)
SetTraversalProvider sets the callback used to handle traversals.
func (*Nats) StartProcessing ¶
StartProcessing begins listening to all the message processing queues.
func (*Nats) StoreWorkflow ¶
StoreWorkflow stores a workflow definition and returns a unique ID
func (*Nats) XDestroyProcessInstance ¶ added in v1.0.645
XDestroyProcessInstance terminates a running process instance with a cancellation reason and error
type PublishOpt ¶
type PublishOpt interface {
Apply(n *publishOptions)
}
PublishOpt represents an option that can be used when publishing a workflow state
type ServiceTaskConsumerFn ¶ added in v1.1.927
ServiceTaskConsumerFn defines the type of a function that ensures existence of a service task consumer