task

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 28, 2024 License: Apache-2.0 Imports: 12 Imported by: 16

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrTaskBlocked = errors.New("the current task is blocked")

ErrTaskBlocked is not an error, but rather a control flow signal indicating that an orchestrator function has executed as far as it can and that it now needs to unload, dispatch any scheduled tasks, and commit its current execution progress to durable storage.

View Source
var ErrTaskCanceled = errors.New("the task was canceled") // CONSIDER: More specific info about the task

ErrTaskCanceled is used to indicate that a task was canceled. Tasks can be canceled, for example, when configured timeouts expire.

Functions

func NewTaskExecutor

func NewTaskExecutor(registry *TaskRegistry) backend.Executor

NewTaskExecutor returns a backend.Executor implementation that executes orchestrator and activity functions in-memory.

func WithActivityInput added in v0.3.0

func WithActivityInput(input any) callActivityOption

WithActivityInput configures an input for an activity invocation. The specified input must be JSON serializable.

func WithRawActivityInput added in v0.3.0

func WithRawActivityInput(input string) callActivityOption

WithRawActivityInput configures a raw input for an activity invocation.

func WithRawSubOrchestratorInput added in v0.3.0

func WithRawSubOrchestratorInput(input string) subOrchestratorOption

WithRawSubOrchestratorInput is a functional option type for the CallSubOrchestrator orchestrator method that takes a raw input value.

func WithSubOrchestrationInstanceID added in v0.3.0

func WithSubOrchestrationInstanceID(instanceID string) subOrchestratorOption

WithSubOrchestrationInstanceID is a functional option type for the CallSubOrchestrator orchestrator method that specifies the instance ID of the sub-orchestration.

func WithSubOrchestratorInput added in v0.3.0

func WithSubOrchestratorInput(input any) subOrchestratorOption

WithSubOrchestratorInput is a functional option type for the CallSubOrchestrator orchestrator method that takes an input value and marshals it to JSON.

Types

type Activity

type Activity func(ctx ActivityContext) (any, error)

Activity is the functional interface for activity implementations.

type ActivityContext

type ActivityContext interface {
	GetInput(resultPtr any) error
	Context() context.Context
}

ActivityContext is the context parameter type for activity implementations.

type ContinueAsNewOption added in v0.2.1

type ContinueAsNewOption func(*OrchestrationContext)

ContinueAsNewOption is a functional option type for the ContinueAsNew orchestrator method.

func WithKeepUnprocessedEvents added in v0.2.1

func WithKeepUnprocessedEvents() ContinueAsNewOption

WithKeepUnprocessedEvents returns a ContinueAsNewOptions struct that instructs the runtime to carry forward any unprocessed external events to the new instance.

type OrchestrationContext

type OrchestrationContext struct {
	ID             api.InstanceID
	Name           string
	IsReplaying    bool
	CurrentTimeUtc time.Time
	// contains filtered or unexported fields
}

OrchestrationContext is the parameter type for orchestrator functions.

func NewOrchestrationContext

func NewOrchestrationContext(registry *TaskRegistry, id api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) *OrchestrationContext

NewOrchestrationContext returns a new OrchestrationContext struct with the specified parameters.

func (*OrchestrationContext) CallActivity

func (ctx *OrchestrationContext) CallActivity(activity interface{}, opts ...callActivityOption) Task

CallActivity schedules an asynchronous invocation of an activity function. The [activity] parameter can be either the name of an activity as a string or can be a pointer to the function that implements the activity, in which case the name is obtained via reflection.

func (*OrchestrationContext) CallSubOrchestrator added in v0.3.0

func (ctx *OrchestrationContext) CallSubOrchestrator(orchestrator interface{}, opts ...subOrchestratorOption) Task

func (*OrchestrationContext) ContinueAsNew

func (ctx *OrchestrationContext) ContinueAsNew(newInput any, options ...ContinueAsNewOption)

func (*OrchestrationContext) CreateTimer

func (ctx *OrchestrationContext) CreateTimer(delay time.Duration) Task

CreateTimer schedules a durable timer that expires after the specified delay.

func (*OrchestrationContext) GetInput

func (octx *OrchestrationContext) GetInput(v any) error

GetInput unmarshals the serialized orchestration input and stores it in [v].

func (*OrchestrationContext) WaitForSingleEvent

func (ctx *OrchestrationContext) WaitForSingleEvent(eventName string, timeout time.Duration) Task

WaitForSingleEvent creates a task that is completed only after an event named [eventName] is received by this orchestration or when the specified timeout expires.

The [timeout] parameter can be used to define a timeout for receiving the event. If the timeout expires before the named event is received, the task will be completed and will return a timeout error value ErrTaskCanceled when awaited. Otherwise, the awaited task will return the deserialized payload of the received event. A Duration value of zero returns a canceled task if the event isn't already available in the history. Use a negative Duration to wait indefinitely for the event to be received.

Orchestrators can wait for the same event name multiple times, so waiting for multiple events with the same name is allowed. Each event received by an orchestrator will complete just one task returned by this method.

Note that event names are case-insensitive.

type Orchestrator

type Orchestrator func(ctx *OrchestrationContext) (any, error)

Orchestrator is the functional interface for orchestrator functions.

type Task

type Task interface {
	Await(v any) error
}

Task is an interface for asynchronous durable tasks. A task is conceptually similar to a future.

type TaskRegistry

type TaskRegistry struct {
	// contains filtered or unexported fields
}

TaskRegistry contains maps of names to corresponding orchestrator and activity functions.

func NewTaskRegistry

func NewTaskRegistry() *TaskRegistry

NewTaskRegistry returns a new TaskRegistry struct.

func (*TaskRegistry) AddActivity

func (r *TaskRegistry) AddActivity(a Activity) error

AddActivity adds an activity function to the registry. The name of the activity function is determined using reflection.

func (*TaskRegistry) AddActivityN

func (r *TaskRegistry) AddActivityN(name string, a Activity) error

AddActivityN adds an activity function to the registry with a specified name.

func (*TaskRegistry) AddOrchestrator

func (r *TaskRegistry) AddOrchestrator(o Orchestrator) error

AddOrchestrator adds an orchestrator function to the registry. The name of the orchestrator function is determined using reflection.

func (*TaskRegistry) AddOrchestratorN

func (r *TaskRegistry) AddOrchestratorN(name string, o Orchestrator) error

AddOrchestratorN adds an orchestrator function to the registry with a specified name.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL