workflow

package
v1.11.0
Warning

This package is not in the latest version of its module.

Published: Aug 14, 2024 License: Apache-2.0 Imports: 15 Imported by: 15

Documentation

Overview

Copyright 2024 The Dapr Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ActivityInput

func ActivityInput(input any) callActivityOption

ActivityInput is an option to pass a JSON-serializable input when calling an activity.

func ActivityRawInput

func ActivityRawInput(input string) callActivityOption

ActivityRawInput is an option to pass a raw string as an input when calling an activity. Per the signature, the input is a string, not a byte slice.
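The difference between the JSON-serializable and raw variants can be sketched with the standard library alone. The helpers `encodeInput` and `encodeRawInput` and the `order` type below are hypothetical stand-ins, not SDK functions. The sketch assumes that the JSON variant marshals the value before sending it and that the raw variant forwards the string unchanged.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// order is a hypothetical input type used only for illustration.
type order struct {
	ID  string `json:"id"`
	Qty int    `json:"qty"`
}

// encodeInput mimics a JSON-serializable option: the value is marshalled first.
func encodeInput(v any) (string, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// encodeRawInput mimics a raw option: the string is passed through untouched.
func encodeRawInput(s string) string {
	return s
}

func main() {
	structured, err := encodeInput(order{ID: "a1", Qty: 2})
	if err != nil {
		panic(err)
	}
	fmt.Println(structured)

	// A plain string gains JSON quotes when marshalled...
	quoted, err := encodeInput("hello")
	if err != nil {
		panic(err)
	}
	fmt.Println(quoted)

	// ...but is left as-is on the raw path.
	fmt.Println(encodeRawInput("hello"))
}
```

Under these assumptions, the raw variants suit payloads that are already encoded, while the JSON variants suit ordinary Go values.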

func ChildWorkflowInput

func ChildWorkflowInput(input any) callChildWorkflowOption

ChildWorkflowInput is an option to provide a JSON-serializable input when calling a child workflow.

func ChildWorkflowInstanceID

func ChildWorkflowInstanceID(instanceID string) callChildWorkflowOption

ChildWorkflowInstanceID is an option to provide an instance id when calling a child workflow.

func ChildWorkflowRawInput

func ChildWorkflowRawInput(input string) callChildWorkflowOption

ChildWorkflowRawInput is an option to provide a raw string input when calling a child workflow.

func NewTaskSlice

func NewTaskSlice(length int) []task.Task

NewTaskSlice returns a slice of tasks which can be executed in parallel.

func WithDaprClient

func WithDaprClient(input dapr.Client) clientOption

WithDaprClient is an option to supply a custom dapr.Client to the workflow client.

func WithEventPayload

func WithEventPayload(data any) api.RaiseEventOptions

WithEventPayload is an option to send a payload with an event to a workflow.

func WithFetchPayloads

func WithFetchPayloads(fetchPayloads bool) api.FetchOrchestrationMetadataOptions

WithFetchPayloads is an option to control whether a workflow's payloads are returned along with its metadata.

func WithInput

func WithInput(input any) api.NewOrchestrationOptions

WithInput is an option to pass an input when scheduling a new workflow.

func WithInstanceID

func WithInstanceID(id string) api.NewOrchestrationOptions

WithInstanceID is an option to set an InstanceID when scheduling a new workflow.

func WithOutput

func WithOutput(data any) api.TerminateOptions

WithOutput is an option to define an output when terminating a workflow.

func WithRawEventData

func WithRawEventData(data string) api.RaiseEventOptions

WithRawEventData is an option to send a raw string with an event to a workflow.

func WithRawInput

func WithRawInput(input string) api.NewOrchestrationOptions

WithRawInput is an option to pass a raw string as an input when scheduling a new workflow.

func WithRawOutput

func WithRawOutput(data string) api.TerminateOptions

WithRawOutput is an option to define a raw string output when terminating a workflow.

func WithStartTime

func WithStartTime(time time.Time) api.NewOrchestrationOptions

WithStartTime is an option to set the start time when scheduling a new workflow.

func WorkerWithDaprClient

func WorkerWithDaprClient(input dapr.Client) workerOption

WorkerWithDaprClient allows you to specify a custom dapr.Client for the worker.
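The functions above all return option values that are passed to variadic parameters such as `opts ...clientOption`. This is commonly called the functional options pattern. The sketch below shows the general shape using only the standard library. The `settings` struct, the `option` type, and the lowercase helpers are hypothetical and are not the SDK's unexported types, and the empty-ID check is an illustrative assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// settings is a hypothetical configuration struct built up by options.
type settings struct {
	instanceID string
	input      string
}

// option is a function that mutates settings and may reject bad values.
type option func(*settings) error

// withInstanceID is a hypothetical option that sets the instance ID.
func withInstanceID(id string) option {
	return func(s *settings) error {
		if id == "" {
			return errors.New("instance ID must not be empty")
		}
		s.instanceID = id
		return nil
	}
}

// withRawInput is a hypothetical option that stores the input unchanged.
func withRawInput(in string) option {
	return func(s *settings) error {
		s.input = in
		return nil
	}
}

// apply runs each option in order and stops at the first error.
func apply(opts ...option) (settings, error) {
	var s settings
	for _, o := range opts {
		if err := o(&s); err != nil {
			return settings{}, err
		}
	}
	return s, nil
}

func main() {
	s, err := apply(withInstanceID("order-42"), withRawInput("hello"))
	fmt.Println(s.instanceID, s.input, err)
}
```

The pattern lets callers supply only the options they need and lets a package add new options without changing existing signatures.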

Types

type Activity

type Activity func(ctx ActivityContext) (any, error)

type ActivityContext

type ActivityContext struct {
	// contains filtered or unexported fields
}

func (*ActivityContext) Context

func (wfac *ActivityContext) Context() context.Context

func (*ActivityContext) GetInput

func (wfac *ActivityContext) GetInput(v interface{}) error

type Client added in v1.11.0

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(opts ...clientOption) (*Client, error)

NewClient returns a workflow client.

func (*Client) FetchWorkflowMetadata added in v1.11.0

func (c *Client) FetchWorkflowMetadata(ctx context.Context, id string, opts ...api.FetchOrchestrationMetadataOptions) (*Metadata, error)

FetchWorkflowMetadata returns the metadata for the workflow with the given instance ID, or an error.

func (*Client) PurgeWorkflow added in v1.11.0

func (c *Client) PurgeWorkflow(ctx context.Context, id string) error

PurgeWorkflow purges a given workflow and returns an error if the operation fails. NOTE: The workflow must be in a terminated or completed state.

func (*Client) RaiseEvent added in v1.11.0

func (c *Client) RaiseEvent(ctx context.Context, id, eventName string, opts ...api.RaiseEventOptions) error

RaiseEvent raises an event on a given workflow and returns an error if the operation fails.

func (*Client) ResumeWorkflow added in v1.11.0

func (c *Client) ResumeWorkflow(ctx context.Context, id, reason string) error

ResumeWorkflow resumes a suspended workflow and returns an error if the operation fails.

func (*Client) ScheduleNewWorkflow added in v1.11.0

func (c *Client) ScheduleNewWorkflow(ctx context.Context, workflow string, opts ...api.NewOrchestrationOptions) (id string, err error)

ScheduleNewWorkflow starts a workflow and returns its instance ID, or an error.

func (*Client) SuspendWorkflow added in v1.11.0

func (c *Client) SuspendWorkflow(ctx context.Context, id, reason string) error

SuspendWorkflow pauses a given workflow and returns an error if the operation fails.

func (*Client) TerminateWorkflow added in v1.11.0

func (c *Client) TerminateWorkflow(ctx context.Context, id string, opts ...api.TerminateOptions) error

TerminateWorkflow stops a given workflow and returns an error if the operation fails.

func (*Client) WaitForWorkflowCompletion added in v1.11.0

func (c *Client) WaitForWorkflowCompletion(ctx context.Context, id string, opts ...api.FetchOrchestrationMetadataOptions) (*Metadata, error)

WaitForWorkflowCompletion blocks until the specified workflow completes and returns its metadata, or an error.

func (*Client) WaitForWorkflowStart added in v1.11.0

func (c *Client) WaitForWorkflowStart(ctx context.Context, id string, opts ...api.FetchOrchestrationMetadataOptions) (*Metadata, error)

WaitForWorkflowStart waits for a given workflow to start and returns its metadata, or an error.

type FailureDetails

type FailureDetails struct {
	Type           string          `json:"type"`
	Message        string          `json:"message"`
	StackTrace     string          `json:"stackTrace"`
	InnerFailure   *FailureDetails `json:"innerFailure"`
	IsNonRetriable bool            `json:"IsNonRetriable"`
}
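Because `InnerFailure` points to another `FailureDetails`, failures can form a chain. The sketch below copies the struct from the definition above and adds a hypothetical helper, `rootCause`, which is not part of the package, to walk to the innermost failure. The failure types and messages are invented sample data.

```go
package main

import "fmt"

// FailureDetails is copied from the type definition above.
type FailureDetails struct {
	Type           string          `json:"type"`
	Message        string          `json:"message"`
	StackTrace     string          `json:"stackTrace"`
	InnerFailure   *FailureDetails `json:"innerFailure"`
	IsNonRetriable bool            `json:"IsNonRetriable"`
}

// rootCause follows InnerFailure links until the innermost failure is reached.
// It returns nil when given nil. This helper is hypothetical.
func rootCause(fd *FailureDetails) *FailureDetails {
	if fd == nil {
		return nil
	}
	for fd.InnerFailure != nil {
		fd = fd.InnerFailure
	}
	return fd
}

func main() {
	// Sample chain invented for illustration.
	fd := &FailureDetails{
		Type:    "ActivityError",
		Message: "charge failed",
		InnerFailure: &FailureDetails{
			Type:    "TimeoutError",
			Message: "gateway timed out",
		},
	}
	root := rootCause(fd)
	fmt.Println(root.Type, root.Message)
}
```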

type Metadata

type Metadata struct {
	InstanceID             string          `json:"id"`
	Name                   string          `json:"name"`
	RuntimeStatus          Status          `json:"status"`
	CreatedAt              time.Time       `json:"createdAt"`
	LastUpdatedAt          time.Time       `json:"lastUpdatedAt"`
	SerializedInput        string          `json:"serializedInput"`
	SerializedOutput       string          `json:"serializedOutput"`
	SerializedCustomStatus string          `json:"serializedCustomStatus"`
	FailureDetails         *FailureDetails `json:"failureDetails"`
}

type Status

type Status int

const (
	StatusRunning Status = iota
	StatusCompleted
	StatusContinuedAsNew
	StatusFailed
	StatusCanceled
	StatusTerminated
	StatusPending
	StatusSuspended
	StatusUnknown
)

func (Status) String

func (s Status) String() string

String returns the runtime status as a string.
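The constants are declared with `iota`, so their numeric values follow declaration order, starting at 0 for `StatusRunning`. The sketch below reproduces the declaration and adds a hypothetical helper, `canPurge`, which is not part of the package. The helper only encodes the note on PurgeWorkflow that a workflow must be terminated or completed before it can be purged. The engine's actual precondition may be broader.

```go
package main

import "fmt"

// Status and its constants are copied from the declaration above.
type Status int

const (
	StatusRunning Status = iota
	StatusCompleted
	StatusContinuedAsNew
	StatusFailed
	StatusCanceled
	StatusTerminated
	StatusPending
	StatusSuspended
	StatusUnknown
)

// canPurge is a hypothetical helper that mirrors the documented note:
// only completed or terminated workflows are treated as purgeable here.
func canPurge(s Status) bool {
	return s == StatusCompleted || s == StatusTerminated
}

func main() {
	// Numeric values implied by iota.
	fmt.Println(int(StatusRunning), int(StatusTerminated), int(StatusUnknown))
	fmt.Println(canPurge(StatusCompleted), canPurge(StatusRunning))
}
```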

type Workflow

type Workflow func(ctx *WorkflowContext) (any, error)

type WorkflowContext

type WorkflowContext struct {
	// contains filtered or unexported fields
}

func (*WorkflowContext) CallActivity

func (wfc *WorkflowContext) CallActivity(activity interface{}, opts ...callActivityOption) task.Task

CallActivity returns a completable task for a given activity. You must call Await(output any) on the returned Task to block the workflow and wait for the task to complete. The value passed to the Await method must be a pointer or can be nil to ignore the returned value. Alternatively, tasks can be awaited using the task.WhenAll or task.WhenAny methods, allowing the workflow to block and wait for multiple tasks at the same time.

func (*WorkflowContext) CallChildWorkflow

func (wfc *WorkflowContext) CallChildWorkflow(workflow interface{}, opts ...callChildWorkflowOption) task.Task

CallChildWorkflow returns a completable task for a given workflow. You must call Await(output any) on the returned Task to block the workflow and wait for the task to complete. The value passed to the Await method must be a pointer or can be nil to ignore the returned value. Alternatively, tasks can be awaited using the task.WhenAll or task.WhenAny methods, allowing the workflow to block and wait for multiple tasks at the same time.

func (*WorkflowContext) ContinueAsNew

func (wfc *WorkflowContext) ContinueAsNew(newInput any, keepEvents bool)

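ContinueAsNew configures the workflow to restart with newInput as its input. The keepEvents argument controls whether unprocessed external events are carried over to the new run.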
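Continue-as-new is typically used for long-running loops: each iteration ends the current run and starts a fresh one with a new input. The toy model below shows that control flow with the standard library only. The `step` and `runToCompletion` functions are hypothetical, and the loop merely plays the role of the engine. It does not model history or events.

```go
package main

import "fmt"

// step is a hypothetical workflow body. It returns the input for the next run
// and whether the workflow asked to continue as new.
func step(counter int) (next int, continueAsNew bool) {
	if counter >= 3 {
		return counter, false // finished: no restart requested
	}
	return counter + 1, true // restart with an incremented input
}

// runToCompletion plays the role of the engine: each restart begins a fresh
// run whose only carried-over state is the new input.
func runToCompletion(input int) (final int, runs int) {
	for {
		runs++
		next, again := step(input)
		if !again {
			return next, runs
		}
		input = next
	}
}

func main() {
	final, runs := runToCompletion(0)
	fmt.Println(final, runs)
}
```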

func (*WorkflowContext) CreateTimer

func (wfc *WorkflowContext) CreateTimer(duration time.Duration) task.Task

CreateTimer returns a completable task that blocks for a given duration. You must call Await(output any) on the returned Task to block the workflow and wait for the task to complete. The value passed to the Await method must be a pointer or can be nil to ignore the returned value. Alternatively, tasks can be awaited using the task.WhenAll or task.WhenAny methods, allowing the workflow to block and wait for multiple tasks at the same time.

func (*WorkflowContext) CurrentUTCDateTime

func (wfc *WorkflowContext) CurrentUTCDateTime() time.Time

CurrentUTCDateTime returns the current workflow time as UTC. Note that this should be used instead of `time.Now()`, which is not compatible with workflow replays.

func (*WorkflowContext) GetInput

func (wfc *WorkflowContext) GetInput(v interface{}) error

GetInput deserializes the workflow input from the context into v, which should be a pointer to the expected type.

func (*WorkflowContext) InstanceID

func (wfc *WorkflowContext) InstanceID() string

InstanceID returns the ID of the currently executing workflow.

func (*WorkflowContext) IsReplaying

func (wfc *WorkflowContext) IsReplaying() bool

IsReplaying returns whether the workflow is replaying.

func (*WorkflowContext) Name

func (wfc *WorkflowContext) Name() string

Name returns the name string from the workflow context.

func (*WorkflowContext) WaitForExternalEvent

func (wfc *WorkflowContext) WaitForExternalEvent(eventName string, timeout time.Duration) task.Task

WaitForExternalEvent returns a completable task that waits for a given event to be received. You must call Await(output any) on the returned Task to block the workflow and wait for the task to complete. The value passed to the Await method must be a pointer or can be nil to ignore the returned value. Alternatively, tasks can be awaited using the task.WhenAll or task.WhenAny methods, allowing the workflow to block and wait for multiple tasks at the same time.

type WorkflowState

type WorkflowState struct {
	Metadata api.OrchestrationMetadata
}

func (*WorkflowState) RuntimeStatus

func (wfs *WorkflowState) RuntimeStatus() Status

RuntimeStatus returns the status from a workflow state.

type WorkflowWorker

type WorkflowWorker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(opts ...workerOption) (*WorkflowWorker, error)

NewWorker returns a worker that can interface with the workflow engine.

func (*WorkflowWorker) RegisterActivity

func (ww *WorkflowWorker) RegisterActivity(a Activity) error

RegisterActivity adds an activity function to the registry.

func (*WorkflowWorker) RegisterWorkflow

func (ww *WorkflowWorker) RegisterWorkflow(w Workflow) error

RegisterWorkflow adds a workflow function to the registry.

func (*WorkflowWorker) Shutdown

func (ww *WorkflowWorker) Shutdown() error

Shutdown stops the worker.

func (*WorkflowWorker) Start

func (ww *WorkflowWorker) Start() error

Start initialises a non-blocking worker to handle workflows and activities registered prior to this being called.
