Documentation ¶
Index ¶
- Variables
- func NewTaskExecutor(registry *TaskRegistry) backend.Executor
- type Activity
- type ActivityContext
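- type OrchestrationContext
- func NewOrchestrationContext(registry *TaskRegistry, id api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) *OrchestrationContext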
- func (ctx *OrchestrationContext) CallActivity(activity interface{}, input any) Task
- func (ctx *OrchestrationContext) ContinueAsNew(newInput any)
- func (ctx *OrchestrationContext) CreateTimer(delay time.Duration) Task
- func (octx *OrchestrationContext) GetInput(v any) error
- func (ctx *OrchestrationContext) WaitForSingleEvent(eventName string, timeout time.Duration) Task
- type Orchestrator
- type Task
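- type TaskRegistry
- func NewTaskRegistry() *TaskRegistry
- func (r *TaskRegistry) AddActivity(a Activity) error
- func (r *TaskRegistry) AddActivityN(name string, a Activity) error
- func (r *TaskRegistry) AddOrchestrator(o Orchestrator) error
- func (r *TaskRegistry) AddOrchestratorN(name string, o Orchestrator) error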
Constants ¶
This section is empty.
Variables ¶
var ErrTaskBlocked = errors.New("the current task is blocked")
ErrTaskBlocked is not an error, but rather a control flow signal indicating that an orchestrator function has executed as far as it can and that it now needs to unload, dispatch any scheduled tasks, and commit its current execution progress to durable storage.
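The control-flow role of such a sentinel can be illustrated with a minimal, self-contained sketch. It does not import this package: errTaskBlocked is a local stand-in for ErrTaskBlocked, and runPass and classify are hypothetical helpers that only model how a caller might tell "suspended" apart from "failed".

```go
package main

import (
	"errors"
	"fmt"
)

// errTaskBlocked is a local stand-in for the package's ErrTaskBlocked sentinel.
var errTaskBlocked = errors.New("the current task is blocked")

// errActivityFailed is a hypothetical genuine failure used for contrast.
var errActivityFailed = errors.New("activity failed")

// runPass simulates one execution pass of an orchestrator function
// (hypothetical helper). It reports the sentinel when tasks are still pending.
func runPass(pendingTasks int, failure error) error {
	if failure != nil {
		return failure
	}
	if pendingTasks > 0 {
		return fmt.Errorf("%d task(s) pending: %w", pendingTasks, errTaskBlocked)
	}
	return nil
}

// classify treats the sentinel as a "suspend" signal rather than a failure.
func classify(err error) string {
	switch {
	case err == nil:
		return "completed"
	case errors.Is(err, errTaskBlocked):
		return "suspended"
	default:
		return "failed"
	}
}

func main() {
	fmt.Println(classify(runPass(0, nil)))
	fmt.Println(classify(runPass(2, nil)))
	fmt.Println(classify(runPass(0, errActivityFailed)))
}
```

Using errors.Is rather than == keeps the check working even if the sentinel has been wrapped with additional context.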
var ErrTaskCanceled = errors.New("the task was canceled") // CONSIDER: More specific info about the task
ErrTaskCanceled is used to indicate that a task was canceled. Tasks can be canceled, for example, when configured timeouts expire.
Functions ¶
func NewTaskExecutor ¶
func NewTaskExecutor(registry *TaskRegistry) backend.Executor
NewTaskExecutor returns a backend.Executor implementation that executes orchestrator and activity functions in-memory.
Types ¶
type Activity ¶
type Activity func(ctx ActivityContext) (any, error)
Activity is the functional interface for activity implementations.
type ActivityContext ¶
ActivityContext is the context parameter type for activity implementations.
type OrchestrationContext ¶
type OrchestrationContext struct {
	ID             api.InstanceID
	Name           string
	IsReplaying    bool
	CurrentTimeUtc time.Time
	// contains filtered or unexported fields
}
OrchestrationContext is the parameter type for orchestrator functions.
func NewOrchestrationContext ¶
func NewOrchestrationContext(registry *TaskRegistry, id api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) *OrchestrationContext
NewOrchestrationContext returns a new OrchestrationContext struct with the specified parameters.
func (*OrchestrationContext) CallActivity ¶
func (ctx *OrchestrationContext) CallActivity(activity interface{}, input any) Task
CallActivity schedules an asynchronous invocation of an activity function. The [activity] parameter can be either the name of the activity as a string or a pointer to the function that implements it, in which case the name is obtained via reflection.
The [input] value must be marshalable to JSON.
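How a name can be derived from a function value is sketched below using only the standard library. This is an illustration, not the package's actual implementation: resolveName and SayHello are hypothetical, and trimming the package qualifier after the last dot is an assumed naming convention.

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
	"strings"
)

// SayHello is a placeholder function used only for this sketch.
func SayHello(input string) string { return "Hello, " + input }

// resolveName returns a task name for either a string or a function value.
// (hypothetical helper; the package's real lookup may differ)
func resolveName(activity any) (string, error) {
	if name, ok := activity.(string); ok {
		return name, nil
	}
	v := reflect.ValueOf(activity)
	if v.Kind() != reflect.Func {
		return "", fmt.Errorf("unsupported activity reference of type %T", activity)
	}
	// The runtime reports a package-qualified name such as "main.SayHello".
	full := runtime.FuncForPC(v.Pointer()).Name()
	// Assumption: keep only the part after the last dot.
	if i := strings.LastIndex(full, "."); i >= 0 {
		full = full[i+1:]
	}
	return full, nil
}

func main() {
	byFunc, _ := resolveName(SayHello)
	byName, _ := resolveName("SayHello")
	fmt.Println(byFunc == byName)
}
```

One consequence of reflection-based naming is that renaming the Go function changes the resolved name, whereas a string name stays stable across refactorings.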
func (*OrchestrationContext) ContinueAsNew ¶
func (ctx *OrchestrationContext) ContinueAsNew(newInput any)
func (*OrchestrationContext) CreateTimer ¶
func (ctx *OrchestrationContext) CreateTimer(delay time.Duration) Task
CreateTimer schedules a durable timer that expires after the specified delay.
func (*OrchestrationContext) GetInput ¶
func (octx *OrchestrationContext) GetInput(v any) error
GetInput unmarshals the serialized orchestration input and stores it in [v].
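The JSON round trip implied by CallActivity inputs and GetInput can be sketched with encoding/json alone. The orderInput type and the encodeInput, decodeInput, and parseOrder helpers are hypothetical; the sketch assumes that [v] must be a pointer, as json.Unmarshal requires.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// orderInput is a hypothetical input payload.
type orderInput struct {
	OrderID  string `json:"orderId"`
	Quantity int    `json:"quantity"`
}

// encodeInput serializes a value to JSON; values such as channels are rejected.
func encodeInput(v any) (string, error) {
	b, err := json.Marshal(v)
	return string(b), err
}

// decodeInput mimics GetInput-style decoding: v must be a non-nil pointer.
func decodeInput(raw string, v any) error {
	return json.Unmarshal([]byte(raw), v)
}

// parseOrder decodes a serialized input into a typed struct.
func parseOrder(raw string) (orderInput, error) {
	var in orderInput
	err := decodeInput(raw, &in)
	return in, err
}

func main() {
	raw, _ := encodeInput(orderInput{OrderID: "A-17", Quantity: 3})
	in, err := parseOrder(raw)
	fmt.Println(in.OrderID, in.Quantity, err)

	// A channel has no JSON representation, so marshaling fails.
	_, err = encodeInput(make(chan int))
	fmt.Println(err != nil)
}
```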
func (*OrchestrationContext) WaitForSingleEvent ¶
func (ctx *OrchestrationContext) WaitForSingleEvent(eventName string, timeout time.Duration) Task
WaitForSingleEvent creates a task that is completed only after an event named [eventName] is received by this orchestration or when the specified timeout expires.
The [timeout] parameter defines how long to wait for the event. If the timeout expires before the named event is received, the task completes and returns the error value ErrTaskCanceled when awaited. Otherwise, the awaited task returns the deserialized payload of the received event. A Duration value of zero or less results in no timeout.
Orchestrators can wait on the same event name multiple times. Each event received by an orchestrator completes exactly one task returned by this method.
Note that event names are case-insensitive.
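The matching rules described above (case-insensitive names, one task completed per event) can be modeled with a small table of pending waiters. This is a sketch under stated assumptions, not the package's implementation: the waiter and eventTable types are hypothetical, waiters are served in first-in, first-out order, and events that arrive with no waiter are simply reported as undelivered.

```go
package main

import (
	"fmt"
	"strings"
)

// waiter is a hypothetical stand-in for a pending event task.
type waiter struct {
	done    bool
	payload string
}

// eventTable tracks pending waiters per normalized (lowercased) event name.
type eventTable struct {
	pending map[string][]*waiter
}

func newEventTable() *eventTable {
	return &eventTable{pending: map[string][]*waiter{}}
}

// wait registers a new waiter for the given event name.
func (t *eventTable) wait(name string) *waiter {
	key := strings.ToLower(name)
	w := &waiter{}
	t.pending[key] = append(t.pending[key], w)
	return w
}

// deliver completes at most one waiter (the oldest, by assumption) and
// reports whether any waiter was pending for that name.
func (t *eventTable) deliver(name, payload string) bool {
	key := strings.ToLower(name)
	queue := t.pending[key]
	if len(queue) == 0 {
		return false
	}
	w := queue[0]
	t.pending[key] = queue[1:]
	w.done, w.payload = true, payload
	return true
}

func main() {
	tbl := newEventTable()
	first := tbl.wait("Approval")
	second := tbl.wait("APPROVAL")

	tbl.deliver("approval", "yes")
	fmt.Println(first.done, second.done)
}
```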
type Orchestrator ¶
type Orchestrator func(ctx *OrchestrationContext) (any, error)
Orchestrator is the functional interface for orchestrator functions.
type Task ¶
Task is an interface for asynchronous durable tasks. A task is conceptually similar to a future.
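The future analogy can be made concrete with a small single-assignment result holder. The Task interface's methods are not shown in this document, so everything below is hypothetical: the future type, its complete and await methods, and the errNotReady signal are illustrative names only.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// errNotReady is a hypothetical signal meaning "no result yet".
var errNotReady = errors.New("result not available yet")

// future is a hypothetical, single-assignment result holder.
type future struct {
	done   bool
	result []byte // JSON-encoded result, set once
	err    error
}

// complete records an outcome; later calls are ignored so the first one wins.
func (f *future) complete(v any, err error) {
	if f.done {
		return
	}
	f.done, f.err = true, err
	if err == nil {
		f.result, f.err = json.Marshal(v)
	}
}

// await copies the result into out, or reports that the future is unsettled.
func (f *future) await(out any) error {
	if !f.done {
		return errNotReady
	}
	if f.err != nil {
		return f.err
	}
	return json.Unmarshal(f.result, out)
}

func main() {
	f := &future{}
	var greeting string

	fmt.Println(f.await(&greeting) == errNotReady)

	f.complete("hello", nil)
	err := f.await(&greeting)
	fmt.Println(greeting, err)
}
```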
type TaskRegistry ¶
type TaskRegistry struct {
// contains filtered or unexported fields
}
TaskRegistry contains maps of names to corresponding orchestrator and activity functions.
func NewTaskRegistry ¶
func NewTaskRegistry() *TaskRegistry
NewTaskRegistry returns a new TaskRegistry struct.
func (*TaskRegistry) AddActivity ¶
func (r *TaskRegistry) AddActivity(a Activity) error
AddActivity adds an activity function to the registry. The name of the activity function is determined using reflection.
func (*TaskRegistry) AddActivityN ¶
func (r *TaskRegistry) AddActivityN(name string, a Activity) error
AddActivityN adds an activity function to the registry with a specified name.
func (*TaskRegistry) AddOrchestrator ¶
func (r *TaskRegistry) AddOrchestrator(o Orchestrator) error
AddOrchestrator adds an orchestrator function to the registry. The name of the orchestrator function is determined using reflection.
func (*TaskRegistry) AddOrchestratorN ¶
func (r *TaskRegistry) AddOrchestratorN(name string, o Orchestrator) error
AddOrchestratorN adds an orchestrator function to the registry with a specified name.
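The name-to-function mapping behind the explicitly named variants (AddActivityN, AddOrchestratorN) can be sketched with a plain map. This does not use the package's types: registry, activityFn, addActivityN, and lookup are hypothetical, and rejecting duplicate names is an assumption about why the Add methods return an error.

```go
package main

import "fmt"

// activityFn is a simplified, hypothetical activity signature.
type activityFn func(input string) (string, error)

// registry is a hypothetical stand-in that maps names to activity functions.
type registry struct {
	activities map[string]activityFn
}

func newRegistry() *registry {
	return &registry{activities: map[string]activityFn{}}
}

// addActivityN registers fn under an explicit name.
// Assumption: registering the same name twice is an error.
func (r *registry) addActivityN(name string, fn activityFn) error {
	if _, exists := r.activities[name]; exists {
		return fmt.Errorf("activity %q is already registered", name)
	}
	r.activities[name] = fn
	return nil
}

// lookup returns the function registered under name, if any.
func (r *registry) lookup(name string) (activityFn, bool) {
	fn, ok := r.activities[name]
	return fn, ok
}

// greet is a placeholder activity used only for this sketch.
func greet(input string) (string, error) {
	return "Hello, " + input, nil
}

func main() {
	r := newRegistry()
	if err := r.addActivityN("SayHello", greet); err != nil {
		fmt.Println("registration failed:", err)
		return
	}
	if fn, ok := r.lookup("SayHello"); ok {
		out, _ := fn("Go")
		fmt.Println(out)
	}
}
```

Explicit names decouple the registered name from the Go identifier, which can matter when functions are renamed or defined as anonymous closures.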