Documentation ¶
Index ¶
- Variables
- func NewTaskExecutor(registry *TaskRegistry) backend.Executor
- func WithActivityInput(input any) callActivityOption
- func WithActivityRetryPolicy(policy *RetryPolicy) callActivityOption
- func WithRawActivityInput(input string) callActivityOption
- func WithRawSubOrchestratorInput(input string) subOrchestratorOption
- func WithSubOrchestrationInstanceID(instanceID string) subOrchestratorOption
- func WithSubOrchestrationRetryPolicy(policy *RetryPolicy) subOrchestratorOption
- func WithSubOrchestratorInput(input any) subOrchestratorOption
- type Activity
- type ActivityContext
- type ContinueAsNewOption
- type OrchestrationContext
- func (ctx *OrchestrationContext) CallActivity(activity interface{}, opts ...callActivityOption) Task
- func (ctx *OrchestrationContext) CallSubOrchestrator(orchestrator interface{}, opts ...subOrchestratorOption) Task
- func (ctx *OrchestrationContext) ContinueAsNew(newInput any, options ...ContinueAsNewOption)
- func (ctx *OrchestrationContext) CreateTimer(delay time.Duration) Task
- func (octx *OrchestrationContext) GetInput(v any) error
- func (octx *OrchestrationContext) SetCustomStatus(cs string)
- func (ctx *OrchestrationContext) WaitForSingleEvent(eventName string, timeout time.Duration) Task
- type Orchestrator
- type RetryPolicy
- type Task
- type TaskRegistry
Variables ¶
var ErrTaskBlocked = errors.New("the current task is blocked")
ErrTaskBlocked is not an error, but rather a control flow signal indicating that an orchestrator function has executed as far as it can and that it now needs to unload, dispatch any scheduled tasks, and commit its current execution progress to durable storage.
var ErrTaskCanceled = errors.New("the task was canceled") // CONSIDER: More specific info about the task
ErrTaskCanceled is used to indicate that a task was canceled. Tasks can be canceled, for example, when configured timeouts expire.
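Because both variables are sentinel values, callers would typically compare against them with errors.Is rather than by message text. The following is a minimal sketch of that pattern, assuming locally declared stand-ins (errTaskBlocked, errTaskCanceled, errBoom) and illustrative outcome labels; none of these names come from the package itself.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the package's sentinel values, declared locally
// so the sketch compiles without the real module.
var (
	errTaskBlocked  = errors.New("the current task is blocked")
	errTaskCanceled = errors.New("the task was canceled")
	errBoom         = errors.New("boom") // an unrelated failure
)

// withContext wraps err with a message while keeping it matchable by errors.Is.
func withContext(msg string, err error) error {
	return fmt.Errorf("%s: %w", msg, err)
}

// classify reports how a caller might treat an error returned while running
// an orchestrator step. The labels are illustrative only.
func classify(err error) string {
	switch {
	case err == nil:
		return "completed"
	case errors.Is(err, errTaskBlocked):
		// Not a failure: the function has run as far as it can for now.
		return "yield"
	case errors.Is(err, errTaskCanceled):
		return "canceled"
	default:
		return "failed"
	}
}

func main() {
	fmt.Println(classify(nil))
	fmt.Println(classify(errTaskBlocked))
	fmt.Println(classify(withContext("awaiting approval", errTaskCanceled)))
	fmt.Println(classify(errBoom))
}
```

Matching with errors.Is keeps the check working even when the sentinel has been wrapped with extra context.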
Functions ¶
func NewTaskExecutor ¶
func NewTaskExecutor(registry *TaskRegistry) backend.Executor
NewTaskExecutor returns a backend.Executor implementation that executes orchestrator and activity functions in-memory.
func WithActivityInput ¶
func WithActivityInput(input any) callActivityOption
WithActivityInput configures an input for an activity invocation. The specified input must be JSON serializable.
func WithActivityRetryPolicy ¶
func WithActivityRetryPolicy(policy *RetryPolicy) callActivityOption
func WithRawActivityInput ¶
func WithRawActivityInput(input string) callActivityOption
WithRawActivityInput configures a raw input for an activity invocation.
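The difference between the serialized and raw input options can be illustrated with a small functional-options sketch. The types callOptions and callOption and the helpers withInput, withRawInput, and apply are hypothetical stand-ins: the package's own option types are unexported and their internals are not shown in this reference.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// callOptions and callOption are hypothetical stand-ins for unexported
// option plumbing; they are not the package's real types.
type callOptions struct {
	rawInput string
}

type callOption func(*callOptions) error

// withInput marshals the value to JSON, so it fails for values that cannot be
// serialized (channels, functions, and so on).
func withInput(input any) callOption {
	return func(o *callOptions) error {
		b, err := json.Marshal(input)
		if err != nil {
			return fmt.Errorf("input is not JSON serializable: %w", err)
		}
		o.rawInput = string(b)
		return nil
	}
}

// withRawInput stores the string as-is; the caller owns the encoding.
func withRawInput(input string) callOption {
	return func(o *callOptions) error {
		o.rawInput = input
		return nil
	}
}

// apply runs each option in order and returns the resulting payload.
func apply(opts ...callOption) (string, error) {
	var o callOptions
	for _, opt := range opts {
		if err := opt(&o); err != nil {
			return "", err
		}
	}
	return o.rawInput, nil
}

func main() {
	fromValue, _ := apply(withInput(map[string]int{"count": 3}))
	fromRaw, _ := apply(withRawInput(`{"count":3}`))
	_, err := apply(withInput(make(chan int)))
	fmt.Println(fromValue, fromRaw, err != nil)
}
```

In this sketch both calls produce the same payload; the raw form simply skips the marshalling step, which is useful when the input is already encoded.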
func WithRawSubOrchestratorInput ¶
func WithRawSubOrchestratorInput(input string) subOrchestratorOption
WithRawSubOrchestratorInput is a functional option for the CallSubOrchestrator orchestrator method that takes a raw input value.
func WithSubOrchestrationInstanceID ¶
func WithSubOrchestrationInstanceID(instanceID string) subOrchestratorOption
WithSubOrchestrationInstanceID is a functional option for the CallSubOrchestrator orchestrator method that specifies the instance ID of the sub-orchestration.
func WithSubOrchestrationRetryPolicy ¶
func WithSubOrchestrationRetryPolicy(policy *RetryPolicy) subOrchestratorOption
func WithSubOrchestratorInput ¶
func WithSubOrchestratorInput(input any) subOrchestratorOption
WithSubOrchestratorInput is a functional option for the CallSubOrchestrator orchestrator method that takes an input value and marshals it to JSON.
Types ¶
type Activity ¶
type Activity func(ctx ActivityContext) (any, error)
Activity is the functional interface for activity implementations.
type ActivityContext ¶
ActivityContext is the context parameter type for activity implementations.
type ContinueAsNewOption ¶
type ContinueAsNewOption func(*OrchestrationContext)
ContinueAsNewOption is a functional option type for the ContinueAsNew orchestrator method.
func WithKeepUnprocessedEvents ¶
func WithKeepUnprocessedEvents() ContinueAsNewOption
WithKeepUnprocessedEvents returns a ContinueAsNewOption that instructs the runtime to carry forward any unprocessed external events to the new instance.
type OrchestrationContext ¶
type OrchestrationContext struct {
	ID             api.InstanceID
	Name           string
	IsReplaying    bool
	CurrentTimeUtc time.Time
	// contains filtered or unexported fields
}
OrchestrationContext is the parameter type for orchestrator functions.
func NewOrchestrationContext ¶
func NewOrchestrationContext(registry *TaskRegistry, id api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) *OrchestrationContext
NewOrchestrationContext returns a new OrchestrationContext struct with the specified parameters.
func (*OrchestrationContext) CallActivity ¶
func (ctx *OrchestrationContext) CallActivity(activity interface{}, opts ...callActivityOption) Task
CallActivity schedules an asynchronous invocation of an activity function. The [activity] parameter can be either the name of an activity as a string or a pointer to the function that implements the activity, in which case the name is obtained via reflection.
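Obtaining a function's name via reflection can be sketched with the standard reflect and runtime packages. This is an illustration of the general technique, not the package's own lookup code: resolveName and SayHello are hypothetical, and trimming the package path to get a short name is an assumption.

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
	"strings"
)

// SayHello is a hypothetical activity-shaped function used only to have
// something to look up.
func SayHello(input string) (string, error) {
	return "Hello, " + input, nil
}

// resolveName accepts either a string name or a function value, mirroring the
// two forms described above. Trimming everything up to the last dot is an
// assumption about what a short name looks like.
func resolveName(task any) (string, error) {
	if name, ok := task.(string); ok {
		return name, nil
	}
	v := reflect.ValueOf(task)
	if v.Kind() != reflect.Func {
		return "", fmt.Errorf("expected a string or a function, got %T", task)
	}
	full := runtime.FuncForPC(v.Pointer()).Name() // fully qualified, e.g. "main.SayHello"
	if i := strings.LastIndex(full, "."); i >= 0 {
		full = full[i+1:]
	}
	return full, nil
}

func main() {
	byFunc, _ := resolveName(SayHello)
	byName, _ := resolveName("SayHello")
	_, err := resolveName(42)
	fmt.Println(byFunc, byName, err != nil)
}
```

Passing the function value avoids typos in string names, at the cost of tying the registered name to the Go identifier.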
func (*OrchestrationContext) CallSubOrchestrator ¶
func (ctx *OrchestrationContext) CallSubOrchestrator(orchestrator interface{}, opts ...subOrchestratorOption) Task
func (*OrchestrationContext) ContinueAsNew ¶
func (ctx *OrchestrationContext) ContinueAsNew(newInput any, options ...ContinueAsNewOption)
func (*OrchestrationContext) CreateTimer ¶
func (ctx *OrchestrationContext) CreateTimer(delay time.Duration) Task
CreateTimer schedules a durable timer that expires after the specified delay.
func (*OrchestrationContext) GetInput ¶
func (octx *OrchestrationContext) GetInput(v any) error
GetInput unmarshals the serialized orchestration input and stores it in [v].
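Since the result is stored in [v], the argument is presumably a pointer, as with encoding/json. The sketch below assumes the serialized input is JSON, which is consistent with the JSON requirement on inputs elsewhere in this package; getInput and order are hypothetical stand-ins rather than the method's real implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// order is a hypothetical input type.
type order struct {
	ID    string `json:"id"`
	Count int    `json:"count"`
}

// getInput is a hypothetical stand-in that decodes a serialized input into v.
// As with json.Unmarshal, v must be a non-nil pointer.
func getInput(serialized string, v any) error {
	return json.Unmarshal([]byte(serialized), v)
}

func main() {
	var o order
	if err := getInput(`{"id":"a1","count":2}`, &o); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(o.ID, o.Count)

	// Passing the value instead of a pointer is reported as an error.
	fmt.Println(getInput(`{}`, o) != nil)
}
```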
func (*OrchestrationContext) SetCustomStatus ¶
func (octx *OrchestrationContext) SetCustomStatus(cs string)
func (*OrchestrationContext) WaitForSingleEvent ¶
func (ctx *OrchestrationContext) WaitForSingleEvent(eventName string, timeout time.Duration) Task
WaitForSingleEvent creates a task that is completed only after an event named [eventName] is received by this orchestration or when the specified timeout expires.
The [timeout] parameter bounds how long the task waits for the event. If the timeout expires before the named event is received, the task completes and returns ErrTaskCanceled when awaited. Otherwise, the awaited task returns the deserialized payload of the received event. A Duration of zero returns a canceled task if the event isn't already available in the history. Use a negative Duration to wait indefinitely for the event to be received.
Orchestrators can wait for the same event name multiple times. Each event received by an orchestrator completes just one task returned by this method.
Note that event names are case-insensitive.
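The matching rules above (case-insensitive names, one event completing exactly one waiting task) can be modeled with a small in-memory sketch. Everything here is hypothetical: mailbox and waiter are not package types, timeouts are left out, and the oldest-waiter-first ordering and the buffering of early events are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// waiter is a hypothetical stand-in for a pending event task.
type waiter struct {
	done    bool
	payload string
}

// mailbox matches incoming events to waiters by case-insensitive name.
type mailbox struct {
	waiters  map[string][]*waiter
	buffered map[string][]string
}

func newMailbox() *mailbox {
	return &mailbox{
		waiters:  make(map[string][]*waiter),
		buffered: make(map[string][]string),
	}
}

// wait registers interest in an event; an already-buffered event completes
// the new waiter immediately.
func (m *mailbox) wait(name string) *waiter {
	key := strings.ToLower(name)
	w := &waiter{}
	if q := m.buffered[key]; len(q) > 0 {
		w.done, w.payload = true, q[0]
		m.buffered[key] = q[1:]
		return w
	}
	m.waiters[key] = append(m.waiters[key], w)
	return w
}

// raise delivers one event, completing at most one waiter (the oldest).
// With no waiter present, the payload is buffered for a later wait call.
func (m *mailbox) raise(name, payload string) {
	key := strings.ToLower(name)
	if q := m.waiters[key]; len(q) > 0 {
		q[0].done, q[0].payload = true, payload
		m.waiters[key] = q[1:]
		return
	}
	m.buffered[key] = append(m.buffered[key], payload)
}

func main() {
	m := newMailbox()
	first := m.wait("Approval")
	second := m.wait("approval")
	m.raise("APPROVAL", "yes")
	fmt.Println(first.done, first.payload, second.done) // true yes false
}
```

A second event with the same name would be needed to complete the second waiter.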
type Orchestrator ¶
type Orchestrator func(ctx *OrchestrationContext) (any, error)
Orchestrator is the functional interface for orchestrator functions.
type RetryPolicy ¶
type RetryPolicy struct {
	// Max number of attempts to try the activity call, first execution inclusive
	MaxAttempts int
	// Timespan to wait for the first retry
	InitialRetryInterval time.Duration
	// Used to determine rate of increase of back-off
	BackoffCoefficient float64
	// Max timespan to wait for a retry
	MaxRetryInterval time.Duration
	// Total timeout across all the retries performed
	RetryTimeout time.Duration
	// Optional function to control if retries should proceed
	Handle func(error) bool
}
func (*RetryPolicy) Validate ¶
func (policy *RetryPolicy) Validate() error
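To see how the timing fields of RetryPolicy might interact, the sketch below computes a retry schedule. It assumes exponential growth (InitialRetryInterval multiplied by BackoffCoefficient for each further retry) capped at MaxRetryInterval; the field comments suggest this reading, but the exact formula is an assumption. The retryPolicy type and delays function are local stand-ins, and RetryTimeout and Handle are left out.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// retryPolicy copies the timing fields of RetryPolicy; it is a local stand-in.
type retryPolicy struct {
	MaxAttempts          int
	InitialRetryInterval time.Duration
	BackoffCoefficient   float64
	MaxRetryInterval     time.Duration
}

// delays returns the wait before each retry, assuming exponential growth
// capped at MaxRetryInterval. MaxAttempts counts the first execution, so a
// policy with N attempts allows N-1 retries.
func delays(p retryPolicy) []time.Duration {
	var out []time.Duration
	for retry := 0; retry < p.MaxAttempts-1; retry++ {
		factor := math.Pow(p.BackoffCoefficient, float64(retry))
		d := time.Duration(float64(p.InitialRetryInterval) * factor)
		if p.MaxRetryInterval > 0 && d > p.MaxRetryInterval {
			d = p.MaxRetryInterval
		}
		out = append(out, d)
	}
	return out
}

func main() {
	p := retryPolicy{
		MaxAttempts:          5,
		InitialRetryInterval: time.Second,
		BackoffCoefficient:   2,
		MaxRetryInterval:     5 * time.Second,
	}
	fmt.Println(delays(p)) // [1s 2s 4s 5s]
}
```

Under these assumptions the fourth delay would be 8s uncapped, so the cap reduces it to 5s.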
type Task ¶
Task is an interface for asynchronous durable tasks. A task is conceptually similar to a future.
type TaskRegistry ¶
type TaskRegistry struct {
// contains filtered or unexported fields
}
TaskRegistry contains maps of names to corresponding orchestrator and activity functions.
func NewTaskRegistry ¶
func NewTaskRegistry() *TaskRegistry
NewTaskRegistry returns a new TaskRegistry struct.
func (*TaskRegistry) AddActivity ¶
func (r *TaskRegistry) AddActivity(a Activity) error
AddActivity adds an activity function to the registry. The name of the activity function is determined using reflection.
func (*TaskRegistry) AddActivityN ¶
func (r *TaskRegistry) AddActivityN(name string, a Activity) error
AddActivityN adds an activity function to the registry with a specified name.
func (*TaskRegistry) AddOrchestrator ¶
func (r *TaskRegistry) AddOrchestrator(o Orchestrator) error
AddOrchestrator adds an orchestrator function to the registry. The name of the orchestrator function is determined using reflection.
func (*TaskRegistry) AddOrchestratorN ¶
func (r *TaskRegistry) AddOrchestratorN(name string, o Orchestrator) error
AddOrchestratorN adds an orchestrator function to the registry with a specified name.
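The registry idea, a map from names to functions with registration methods that can fail, can be sketched as follows. The registry type, the activityFn signature, and the specific failure cases (empty or duplicate names) are assumptions for illustration; the reference above states only that the Add methods return an error, not when.

```go
package main

import (
	"errors"
	"fmt"
)

// activityFn is a simplified, hypothetical activity signature.
type activityFn func(input string) (string, error)

// registry is a simplified stand-in for a name-to-function map.
type registry struct {
	activities map[string]activityFn
}

func newRegistry() *registry {
	return &registry{activities: make(map[string]activityFn)}
}

// addActivityN registers fn under name. Rejecting empty and duplicate names
// is an assumption about why a registration call might return an error.
func (r *registry) addActivityN(name string, fn activityFn) error {
	if name == "" {
		return errors.New("activity name must not be empty")
	}
	if _, exists := r.activities[name]; exists {
		return fmt.Errorf("activity named %q is already registered", name)
	}
	r.activities[name] = fn
	return nil
}

// lookup returns the function registered under name, if any.
func (r *registry) lookup(name string) (activityFn, bool) {
	fn, ok := r.activities[name]
	return fn, ok
}

func greet(input string) (string, error) {
	return "Hello, " + input, nil
}

func main() {
	r := newRegistry()
	fmt.Println(r.addActivityN("Greet", greet) == nil) // first registration succeeds
	fmt.Println(r.addActivityN("Greet", greet) != nil) // duplicate is rejected
	if fn, ok := r.lookup("Greet"); ok {
		out, _ := fn("world")
		fmt.Println(out)
	}
}
```

Checking the returned error at startup surfaces naming conflicts before any orchestration runs.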