task

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2023 License: MIT Imports: 13 Imported by: 16

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrTaskBlocked = errors.New("the current task is blocked")

ErrTaskBlocked is not an error, but rather a control flow signal indicating that an orchestrator function has executed as far as it can and that it now needs to unload, dispatch any scheduled tasks, and commit its current execution progress to durable storage.

View Source
var ErrTaskCanceled = errors.New("the task was canceled") // CONSIDER: More specific info about the task

ErrTaskCanceled is used to indicate that a task was canceled. Tasks can be canceled, for example, when configured timeouts expire.

Functions

func NewTaskExecutor

func NewTaskExecutor(registry *TaskRegistry) backend.Executor

NewTaskExecutor returns a backend.Executor implementation that executes orchestrator and activity functions in-memory.

Types

type Activity

type Activity func(ctx ActivityContext) (any, error)

Activity is the functional interface for activity implementations.

type ActivityContext

type ActivityContext interface {
	GetInput(resultPtr any) error
	Context() context.Context
}

ActivityContext is the context parameter type for activity implementations.

type OrchestrationContext

type OrchestrationContext struct {
	ID             api.InstanceID
	Name           string
	IsReplaying    bool
	CurrentTimeUtc time.Time
	// contains filtered or unexported fields
}

OrchestrationContext is the parameter type for orchestrator functions.

func NewOrchestrationContext

func NewOrchestrationContext(registry *TaskRegistry, id api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) *OrchestrationContext

NewOrchestrationContext returns a new OrchestrationContext struct with the specified parameters.

func (*OrchestrationContext) CallActivity

func (ctx *OrchestrationContext) CallActivity(activity interface{}, input any) Task

CallActivity schedules an asynchronous invocation of an activity function. The [activity] parameter can be either the name of an activity as a string or can be a pointer to the function that implements the activity, in which case the name is obtained via reflection.

The [input] value must be marshalable to JSON.

func (*OrchestrationContext) ContinueAsNew

func (ctx *OrchestrationContext) ContinueAsNew(newInput any)

func (*OrchestrationContext) CreateTimer

func (ctx *OrchestrationContext) CreateTimer(delay time.Duration) Task

CreateTimer schedules a durable timer that expires after the specified delay.

func (*OrchestrationContext) GetInput

func (octx *OrchestrationContext) GetInput(v any) error

GetInput unmarshals the serialized orchestration input and stores it in [v].

func (*OrchestrationContext) WaitForSingleEvent

func (ctx *OrchestrationContext) WaitForSingleEvent(eventName string, timeout time.Duration) Task

WaitForSingleEvent creates a task that is completed only after an event named [eventName] is received by this orchestration or when the specified timeout expires.

The [timeout] parameter can be used to define a timeout for receiving the event. If the timeout expires before the named event is received, the task will be completed and will return a timeout error value ErrTaskCanceled when awaited. Otherwise, the awaited task will return the deserialized payload of the received event. A Duration value of zero or less results in no timeout.

Orchestrators can wait for the same event name multiple times, so waiting for multiple events with the same name is allowed. Each event received by an orchestrator will complete just one task returned by this method.

Note that event names are case-insensitive.

type Orchestrator

type Orchestrator func(ctx *OrchestrationContext) (any, error)

Orchestrator is the functional interface for orchestrator functions.

type Task

type Task interface {
	Await(v any) error
}

Task is an interface for asynchronous durable tasks. A task is conceptually similar to a future.

type TaskRegistry

type TaskRegistry struct {
	// contains filtered or unexported fields
}

TaskRegistry contains maps of names to corresponding orchestrator and activity functions.

func NewTaskRegistry

func NewTaskRegistry() *TaskRegistry

NewTaskRegistry returns a new TaskRegistry struct.

func (*TaskRegistry) AddActivity

func (r *TaskRegistry) AddActivity(a Activity) error

AddActivity adds an activity function to the registry. The name of the activity function is determined using reflection.

func (*TaskRegistry) AddActivityN

func (r *TaskRegistry) AddActivityN(name string, a Activity) error

AddActivityN adds an activity function to the registry with a specified name.

func (*TaskRegistry) AddOrchestrator

func (r *TaskRegistry) AddOrchestrator(o Orchestrator) error

AddOrchestrator adds an orchestrator function to the registry. The name of the orchestrator function is determined using reflection.

func (*TaskRegistry) AddOrchestratorN

func (r *TaskRegistry) AddOrchestratorN(name string, o Orchestrator) error

AddOrchestratorN adds an orchestrator function to the registry with a specified name.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL