task

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrTaskBlocked = errors.New("the current task is blocked")

ErrTaskBlocked is not an error, but rather a control flow signal indicating that an orchestrator function has executed as far as it can and that it now needs to unload, dispatch any scheduled tasks, and commit its current execution progress to durable storage.

View Source
var ErrTaskCanceled = errors.New("the task was canceled") // CONSIDER: More specific info about the task

ErrTaskCanceled is used to indicate that a task was canceled. Tasks can be canceled, for example, when configured timeouts expire.

Functions

func NewTaskExecutor

func NewTaskExecutor(registry *TaskRegistry) backend.Executor

NewTaskExecutor returns a backend.Executor implementation that executes orchestrator and activity functions in-memory.

func WithActivityInput

func WithActivityInput(input any) callActivityOption

WithActivityInput configures an input for an activity invocation. The specified input must be JSON serializable.

func WithActivityRetryPolicy

func WithActivityRetryPolicy(policy *RetryPolicy) callActivityOption

func WithRawActivityInput

func WithRawActivityInput(input string) callActivityOption

WithRawActivityInput configures a raw input for an activity invocation.

func WithRawSubOrchestratorInput

func WithRawSubOrchestratorInput(input string) subOrchestratorOption

WithRawSubOrchestratorInput is a functional option type for the CallSubOrchestrator orchestrator method that takes a raw input value.

func WithSubOrchestrationInstanceID

func WithSubOrchestrationInstanceID(instanceID string) subOrchestratorOption

WithSubOrchestrationInstanceID is a functional option type for the CallSubOrchestrator orchestrator method that specifies the instance ID of the sub-orchestration.

func WithSubOrchestrationRetryPolicy

func WithSubOrchestrationRetryPolicy(policy *RetryPolicy) subOrchestratorOption

func WithSubOrchestratorInput

func WithSubOrchestratorInput(input any) subOrchestratorOption

WithSubOrchestratorInput is a functional option type for the CallSubOrchestrator orchestrator method that takes an input value and marshals it to JSON.

Types

type Activity

type Activity func(ctx ActivityContext) (any, error)

Activity is the functional interface for activity implementations.

type ActivityContext

type ActivityContext interface {
	GetInput(resultPtr any) error
	Context() context.Context
}

ActivityContext is the context parameter type for activity implementations.

type ContinueAsNewOption

type ContinueAsNewOption func(*OrchestrationContext)

ContinueAsNewOption is a functional option type for the ContinueAsNew orchestrator method.

func WithKeepUnprocessedEvents

func WithKeepUnprocessedEvents() ContinueAsNewOption

WithKeepUnprocessedEvents returns a ContinueAsNewOptions struct that instructs the runtime to carry forward any unprocessed external events to the new instance.

type OrchestrationContext

type OrchestrationContext struct {
	ID             api.InstanceID
	Name           string
	IsReplaying    bool
	CurrentTimeUtc time.Time
	// contains filtered or unexported fields
}

OrchestrationContext is the parameter type for orchestrator functions.

func NewOrchestrationContext

func NewOrchestrationContext(registry *TaskRegistry, id api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) *OrchestrationContext

NewOrchestrationContext returns a new OrchestrationContext struct with the specified parameters.

func (*OrchestrationContext) CallActivity

func (ctx *OrchestrationContext) CallActivity(activity interface{}, opts ...callActivityOption) Task

CallActivity schedules an asynchronous invocation of an activity function. The [activity] parameter can be either the name of an activity as a string or can be a pointer to the function that implements the activity, in which case the name is obtained via reflection.

func (*OrchestrationContext) CallSubOrchestrator

func (ctx *OrchestrationContext) CallSubOrchestrator(orchestrator interface{}, opts ...subOrchestratorOption) Task

func (*OrchestrationContext) ContinueAsNew

func (ctx *OrchestrationContext) ContinueAsNew(newInput any, options ...ContinueAsNewOption)

func (*OrchestrationContext) CreateTimer

func (ctx *OrchestrationContext) CreateTimer(delay time.Duration) Task

CreateTimer schedules a durable timer that expires after the specified delay.

func (*OrchestrationContext) GetInput

func (octx *OrchestrationContext) GetInput(v any) error

GetInput unmarshals the serialized orchestration input and stores it in [v].

func (*OrchestrationContext) SetCustomStatus

func (octx *OrchestrationContext) SetCustomStatus(cs string)

func (*OrchestrationContext) WaitForSingleEvent

func (ctx *OrchestrationContext) WaitForSingleEvent(eventName string, timeout time.Duration) Task

WaitForSingleEvent creates a task that is completed only after an event named [eventName] is received by this orchestration or when the specified timeout expires.

The [timeout] parameter can be used to define a timeout for receiving the event. If the timeout expires before the named event is received, the task will be completed and will return a timeout error value ErrTaskCanceled when awaited. Otherwise, the awaited task will return the deserialized payload of the received event. A Duration value of zero returns a canceled task if the event isn't already available in the history. Use a negative Duration to wait indefinitely for the event to be received.

Orchestrators can wait for the same event name multiple times, so waiting for multiple events with the same name is allowed. Each event received by an orchestrator will complete just one task returned by this method.

Note that event names are case-insensitive.

type Orchestrator

type Orchestrator func(ctx *OrchestrationContext) (any, error)

Orchestrator is the functional interface for orchestrator functions.

type RetryPolicy

type RetryPolicy struct {
	// Max number of attempts to try the activity call, first execution inclusive
	MaxAttempts int
	// Timespan to wait for the first retry
	InitialRetryInterval time.Duration
	// Used to determine rate of increase of back-off
	BackoffCoefficient float64
	// Max timespan to wait for a retry
	MaxRetryInterval time.Duration
	// Total timeout across all the retries performed
	RetryTimeout time.Duration
	// Optional function to control if retries should proceed
	Handle func(error) bool
}

func (*RetryPolicy) Validate

func (policy *RetryPolicy) Validate() error

type Task

type Task interface {
	Await(v any) error
}

Task is an interface for asynchronous durable tasks. A task is conceptually similar to a future.

type TaskRegistry

type TaskRegistry struct {
	// contains filtered or unexported fields
}

TaskRegistry contains maps of names to corresponding orchestrator and activity functions.

func NewTaskRegistry

func NewTaskRegistry() *TaskRegistry

NewTaskRegistry returns a new TaskRegistry struct.

func (*TaskRegistry) AddActivity

func (r *TaskRegistry) AddActivity(a Activity) error

AddActivity adds an activity function to the registry. The name of the activity function is determined using reflection.

func (*TaskRegistry) AddActivityN

func (r *TaskRegistry) AddActivityN(name string, a Activity) error

AddActivityN adds an activity function to the registry with a specified name.

func (*TaskRegistry) AddOrchestrator

func (r *TaskRegistry) AddOrchestrator(o Orchestrator) error

AddOrchestrator adds an orchestrator function to the registry. The name of the orchestrator function is determined using reflection.

func (*TaskRegistry) AddOrchestratorN

func (r *TaskRegistry) AddOrchestratorN(name string, o Orchestrator) error

AddOrchestratorN adds an orchestrator function to the registry with a specified name.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL