workflow

package
v0.19.0
Published: Oct 27, 2024 License: MIT Imports: 20 Imported by: 29

Documentation

Constants

const (
	QueueDefault = core.QueueDefault
)

Variables

var (
	DefaultSubWorkflowRetryOptions = RetryOptions{

		MaxAttempts: 1,
	}

	DefaultSubWorkflowOptions = SubWorkflowOptions{
		RetryOptions: DefaultSubWorkflowRetryOptions,
	}
)
var Canceled = sync.Canceled
var DefaultActivityOptions = ActivityOptions{
	RetryOptions: DefaultRetryOptions,
}
var DefaultRetryOptions = RetryOptions{
	MaxAttempts:        3,
	BackoffCoefficient: 1,
}

Functions

func CanRetry added in v0.16.0

func CanRetry(err error) bool

CanRetry reports whether the given error is retryable.

func ContinueAsNew added in v0.15.0

func ContinueAsNew(ctx Context, args ...any) error

ContinueAsNew restarts the current workflow with the given arguments.

func Go

func Go(ctx Context, f func(ctx Context))

Go spawns a workflow "goroutine".

func Logger added in v0.0.9

func Logger(ctx Context) *slog.Logger

Logger returns the logger for the current workflow.

func NewError added in v0.16.0

func NewError(err error) error

NewError wraps the given error into a workflow error which will be automatically retried.

func NewPermanentError added in v0.16.0

func NewPermanentError(err error) error

NewPermanentError wraps the given error into a workflow error which will not be automatically retried.
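
The split between retryable and permanent errors can be sketched with the standard library alone. The block below is a minimal illustration, not the package's implementation: permanentError, newPermanent, canRetry, and both example errors are hypothetical names that only mimic the behavior described for NewPermanentError and CanRetry.

```go
package main

import (
	"errors"
	"fmt"
)

// permanentError is a hypothetical wrapper that marks an error as not retryable.
type permanentError struct{ err error }

func (e *permanentError) Error() string { return e.err.Error() }
func (e *permanentError) Unwrap() error { return e.err }

// newPermanent wraps err so that canRetry reports false for it.
func newPermanent(err error) error { return &permanentError{err: err} }

// canRetry reports whether no permanentError appears anywhere in err's chain.
func canRetry(err error) bool {
	var p *permanentError
	return !errors.As(err, &p)
}

var (
	// errTransient is a hypothetical retryable failure.
	errTransient = errors.New("connection reset")
	// errWrapped hides a permanent error one level down the chain.
	errWrapped = fmt.Errorf("step failed: %w", newPermanent(errTransient))
)

func main() {
	fmt.Println(canRetry(errTransient))               // true
	fmt.Println(canRetry(newPermanent(errTransient))) // false
	fmt.Println(canRetry(errWrapped))                 // false
}
```

Checking the whole error chain with errors.As means that an error wrapped further with %w keeps its permanent marking.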

func Now

func Now(ctx Context) time.Time

Now returns the current time.

func Replaying

func Replaying(ctx Context) bool

Replaying reports whether the current workflow execution is replaying.

func Select

func Select(ctx Context, cases ...SelectCase)

Select is the workflow-safe equivalent of the select statement.

func Sleep

func Sleep(ctx Context, d time.Duration) error

Sleep sleeps for the given duration.

func WithCancel

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

WithCancel returns a copy of parent with a new Done channel. The returned context's Done channel is closed when the returned cancel function is called or when the parent context's Done channel is closed, whichever happens first.

Canceling this context releases resources associated with it, so code should call cancel as soon as the operations running in this Context complete.

func WithCancelCause added in v0.19.0

func WithCancelCause(parent Context) (ctx Context, cancel CancelCauseFunc)

WithCancelCause behaves like WithCancel but returns a CancelCauseFunc instead of a CancelFunc. Calling cancel with a non-nil error (the "cause") records that error in ctx; it can then be retrieved using Cause(ctx). Calling cancel with nil sets the cause to Canceled.

Example use:

ctx, cancel := context.WithCancelCause(parent)
cancel(myError)
ctx.Err() // returns context.Canceled
context.Cause(ctx) // returns myError
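
The snippet above is written against the standard library's context package, whose wording this documentation follows. As a runnable sketch of that same behavior (using context as a stand-in, since this page does not show how a cause is read back from a workflow Context), the block below cancels with a cause and with nil. errQuota and cancelWithCause are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errQuota is a hypothetical cancellation cause.
var errQuota = errors.New("quota exceeded")

// cancelWithCause cancels a fresh context with the given cause and returns
// what ctx.Err() and context.Cause(ctx) report afterwards.
func cancelWithCause(cause error) (ctxErr, recorded error) {
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(cause)
	return ctx.Err(), context.Cause(ctx)
}

func main() {
	ctxErr, recorded := cancelWithCause(errQuota)
	fmt.Println(ctxErr)   // context canceled
	fmt.Println(recorded) // quota exceeded

	_, recorded = cancelWithCause(nil)
	fmt.Println(recorded) // context canceled
}
```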

func WithTimerName added in v0.19.0

func WithTimerName(name string) timerOption

Types

type Activity

type Activity = interface{}

type ActivityOptions

type ActivityOptions struct {
	// Queue defines the activity queue this activity will be delivered to. By default, this inherits
	// the queue from the workflow instance.
	Queue core.Queue

	// RetryOptions defines how to retry the activity in case of failure.
	RetryOptions RetryOptions
}

type CancelCauseFunc added in v0.19.0

type CancelCauseFunc = sync.CancelCauseFunc

A CancelCauseFunc behaves like a CancelFunc but additionally sets the cancellation cause. This cause can be retrieved by calling Cause on the canceled Context or on any of its derived Contexts.

If the context has already been canceled, CancelCauseFunc does not set the cause. For example, if childContext is derived from parentContext:

  • if parentContext is canceled with cause1 before childContext is canceled with cause2, then Cause(parentContext) == Cause(childContext) == cause1
  • if childContext is canceled with cause2 before parentContext is canceled with cause1, then Cause(parentContext) == cause1 and Cause(childContext) == cause2
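
The two orderings above can be checked with the standard library's context package, used here as a stand-in on the assumption that the workflow Context follows the same rules. The causes helper and both error values are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var (
	cause1 = errors.New("cause1")
	cause2 = errors.New("cause2")
)

// causes cancels a parent and a derived child in the requested order and
// returns the cause recorded on each.
func causes(parentFirst bool) (parentCause, childCause error) {
	parent, cancelParent := context.WithCancelCause(context.Background())
	child, cancelChild := context.WithCancelCause(parent)
	if parentFirst {
		cancelParent(cause1)
		cancelChild(cause2) // no effect: the child is already canceled
	} else {
		cancelChild(cause2)
		cancelParent(cause1) // the child keeps cause2
	}
	return context.Cause(parent), context.Cause(child)
}

func main() {
	fmt.Println(causes(true))  // cause1 cause1
	fmt.Println(causes(false)) // cause1 cause2
}
```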

type CancelFunc

type CancelFunc = sync.CancelFunc

type Channel

type Channel[T any] interface {
	// Send sends a value to the channel. If the channel is closed, this will panic.
	Send(ctx Context, v T)

	// SendNonblocking sends a value to the channel. This call is non-blocking and will return whether the
	// value could be sent. If the channel is closed, this will panic.
	SendNonblocking(v T) (ok bool)

	// Receive receives a value from the channel.
	Receive(ctx Context) (v T, ok bool)

	// ReceiveNonBlocking tries to receive a value from the channel. This call is non-blocking and will return
	// whether the value could be returned.
	ReceiveNonBlocking() (v T, ok bool)

	// Close closes the channel. This will cause all future send operations to panic.
	Close()

	// Len returns the number of elements currently in the channel.
	Len() int
}
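
The non-blocking calls have a familiar counterpart in plain Go: a select with a default case. The sketch below uses native channels rather than the Channel interface, so it only illustrates the semantics; trySend and tryReceive are hypothetical helpers.

```go
package main

import "fmt"

// trySend attempts to send v without blocking and reports whether it was sent.
func trySend[T any](ch chan T, v T) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// tryReceive attempts to receive without blocking. ok is false if no value
// was ready or the channel is closed.
func tryReceive[T any](ch chan T) (v T, ok bool) {
	select {
	case v, ok = <-ch:
		return v, ok
	default:
		return v, false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true
	fmt.Println(trySend(ch, 2)) // false: the buffer is full
	fmt.Println(len(ch))        // 1

	v, ok := tryReceive(ch)
	fmt.Println(v, ok) // 1 true
	_, ok = tryReceive(ch)
	fmt.Println(ok) // false: nothing left to receive
}
```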

func NewBufferedChannel

func NewBufferedChannel[T any](size int) Channel[T]

NewBufferedChannel creates a new buffered channel with the given size.

func NewChannel

func NewChannel[T any]() Channel[T]

NewChannel creates a new channel.

func NewSignalChannel

func NewSignalChannel[T any](ctx Context, name string) Channel[T]

NewSignalChannel returns a new signal channel.

type Context

type Context = sync.Context

func NewDisconnectedContext

func NewDisconnectedContext(ctx Context) Context

NewDisconnectedContext creates a new context that is disconnected from any parent context.

func WithValue added in v0.13.0

func WithValue(parent Context, key, val interface{}) Context

WithValue returns a copy of parent in which the value associated with key is val.

Use context Values only for request-scoped data that transits processes and APIs, not for passing optional parameters to functions.

The provided key must be comparable and should not be of type string or any other built-in type to avoid collisions between packages using context. Users of WithValue should define their own types for keys. To avoid allocating when assigning to an interface{}, context keys often have concrete type struct{}. Alternatively, exported context key variables' static type should be a pointer or interface.
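
The key-type advice can be sketched as follows. The example uses the standard library's context package as a stand-in for the workflow Context, on the assumption that the same guidance applies; tenantKey, withTenant, tenantFrom, and roundTrip are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// tenantKey is an unexported, zero-size key type, so no other package can
// construct a colliding key.
type tenantKey struct{}

// withTenant returns a copy of ctx carrying the tenant ID.
func withTenant(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, tenantKey{}, id)
}

// tenantFrom returns the tenant ID stored in ctx, if any.
func tenantFrom(ctx context.Context) (string, bool) {
	id, ok := ctx.Value(tenantKey{}).(string)
	return id, ok
}

// roundTrip stores id in a context and reads it back.
func roundTrip(id string) (string, bool) {
	return tenantFrom(withTenant(context.Background(), id))
}

func main() {
	fmt.Println(roundTrip("acme"))                // acme true
	fmt.Println(tenantFrom(context.Background())) // (empty string) false
}
```

Exposing typed accessors such as withTenant and tenantFrom, rather than the key itself, keeps the stored value's type in one place.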

type ContextPropagator added in v0.14.0

type ContextPropagator interface {
	// Inject injects values from context into metadata
	Inject(context.Context, *Metadata) error

	// Extract extracts values from metadata into context
	Extract(context.Context, *Metadata) (context.Context, error)

	// InjectFromWorkflow injects values from the workflow context into metadata
	InjectFromWorkflow(Context, *Metadata) error

	// ExtractToWorkflow extracts values from metadata into a workflow context
	ExtractToWorkflow(Context, *Metadata) (Context, error)
}

type Error added in v0.16.0

type Error = workflowerrors.Error

type Future

type Future[T any] interface {
	// Get returns the value if set, blocks otherwise
	Get(ctx Context) (T, error)
}

func CreateSubWorkflowInstance

func CreateSubWorkflowInstance[TResult any](ctx Context, options SubWorkflowOptions, workflow Workflow, args ...any) Future[TResult]

CreateSubWorkflowInstance creates a new sub-workflow instance of the given workflow.

func ExecuteActivity

func ExecuteActivity[TResult any](ctx Context, options ActivityOptions, activity Activity, args ...any) Future[TResult]

ExecuteActivity schedules the given activity to be executed.

func ScheduleTimer

func ScheduleTimer(ctx Context, delay time.Duration, opts ...timerOption) Future[any]

ScheduleTimer schedules a timer to fire after the given delay.

func SideEffect

func SideEffect[TResult any](ctx Context, f func(ctx Context) TResult) Future[TResult]

SideEffect executes the given function and returns a future that will be resolved with the result of the function.

In contrast to Activities, SideEffects are executed inline with the workflow code. They should only be used for short and inexpensive operations. For longer operations, consider using an Activity.

func SignalWorkflow added in v0.5.0

func SignalWorkflow[T any](ctx Context, instanceID string, name string, arg T) Future[any]

SignalWorkflow sends a signal to another running workflow instance.

func WithRetries added in v0.16.0

func WithRetries[T any](ctx Context, retryOptions RetryOptions, fn func(ctx Context, attempt int) Future[T]) Future[T]

WithRetries executes the given function with retries.
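
The shape of such a retry loop can be sketched synchronously. This is an illustration only: the real function works with Context and Future values, and the sketch assumes that attempt starts at 0 and that the maximum counts total calls, neither of which this page confirms. withRetries (lowercase), succeedOn, and errFlaky are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// errFlaky is a hypothetical transient failure.
var errFlaky = errors.New("flaky")

// withRetries calls fn until it succeeds or maxAttempts calls have been made.
// It returns the last result, the number of calls, and the last error.
func withRetries(maxAttempts int, fn func(attempt int) (string, error)) (string, int, error) {
	var (
		res   string
		err   error
		calls int
	)
	for attempt := 0; attempt < maxAttempts; attempt++ {
		calls++
		if res, err = fn(attempt); err == nil {
			break
		}
	}
	return res, calls, err
}

// succeedOn returns a function that fails until attempt reaches n.
func succeedOn(n int) func(int) (string, error) {
	return func(attempt int) (string, error) {
		if attempt < n {
			return "", errFlaky
		}
		return "ok", nil
	}
}

func main() {
	res, calls, err := withRetries(3, succeedOn(2))
	fmt.Println(res, calls, err) // ok 3 <nil>

	_, calls, err = withRetries(3, succeedOn(5))
	fmt.Println(calls, err) // 3 flaky
}
```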

type Instance

type Instance = core.WorkflowInstance

Instance represents a workflow instance.

func WorkflowInstance added in v0.0.3

func WorkflowInstance(ctx Context) *Instance

WorkflowInstance returns the current workflow instance.

type Metadata added in v0.4.0

type Metadata = metadata.WorkflowMetadata

Metadata represents the metadata of a workflow instance.

type PanicError added in v0.16.0

type PanicError = workflowerrors.PanicError

type Queue added in v0.19.0

type Queue = core.Queue

Queue represents a queue for workflow instances and activities.

type RetryOptions

type RetryOptions struct {
	// Maximum number of times to retry
	MaxAttempts int

	// Time to wait before first retry
	FirstRetryInterval time.Duration

	// Maximum delay for any individual retry attempt
	MaxRetryInterval time.Duration

	// Coefficient for calculating the next retry delay
	BackoffCoefficient float64

	// Timeout after which retries are aborted
	RetryTimeout time.Duration
}
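
One plausible reading of how FirstRetryInterval, BackoffCoefficient, and MaxRetryInterval combine is geometric backoff with a cap. The sketch below assumes that formula, along with zero-based retry numbering and a zero MaxRetryInterval meaning "no cap"; the package may compute delays differently. retryOptions (lowercase), retryDelay, delayMillis, and exampleOptions are hypothetical.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// retryOptions mirrors three fields of RetryOptions for illustration.
type retryOptions struct {
	FirstRetryInterval time.Duration
	MaxRetryInterval   time.Duration
	BackoffCoefficient float64
}

// retryDelay returns the wait before retry n (n = 0 is the first retry):
// FirstRetryInterval * BackoffCoefficient^n, capped at MaxRetryInterval
// when that field is set.
func retryDelay(o retryOptions, n int) time.Duration {
	d := time.Duration(float64(o.FirstRetryInterval) * math.Pow(o.BackoffCoefficient, float64(n)))
	if o.MaxRetryInterval > 0 && d > o.MaxRetryInterval {
		d = o.MaxRetryInterval
	}
	return d
}

// delayMillis is retryDelay expressed in whole milliseconds.
func delayMillis(o retryOptions, n int) int64 { return retryDelay(o, n).Milliseconds() }

// exampleOptions doubles the delay on each retry, up to five seconds.
var exampleOptions = retryOptions{
	FirstRetryInterval: time.Second,
	MaxRetryInterval:   5 * time.Second,
	BackoffCoefficient: 2,
}

func main() {
	for n := 0; n < 4; n++ {
		fmt.Println(n, retryDelay(exampleOptions, n)) // 0 1s, 1 2s, 2 4s, 3 5s
	}
}
```

Under this reading, a BackoffCoefficient of 1, as in DefaultRetryOptions, keeps the delay constant at FirstRetryInterval.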

type SelectCase

type SelectCase = sync.SelectCase

func Await

func Await[T any](f Future[T], handler func(Context, Future[T])) SelectCase

Await calls the provided handler when the given future is ready.

func Default

func Default(handler func(Context)) SelectCase

Default calls the provided handler if none of the other cases match.

func Receive

func Receive[T any](c Channel[T], handler func(ctx Context, v T, ok bool)) SelectCase

Receive calls the provided handler if the given channel can receive a value. The handler is passed the received value and an ok flag indicating whether a value was received or the channel was closed.

func Send added in v0.5.1

func Send[T any](c Channel[T], value *T, handler func(ctx Context)) SelectCase

Send calls the provided handler if the given value can be sent to the channel.

type Span added in v0.4.1

type Span interface {
	// End ends the span.
	End()
}

type SubWorkflowOptions

type SubWorkflowOptions struct {
	InstanceID string

	// Queue to use for the sub-workflow, if not set, the queue of the calling workflow will be used.
	Queue Queue

	RetryOptions RetryOptions
}

type WaitGroup added in v0.0.3

type WaitGroup = sync.WaitGroup

func NewWaitGroup added in v0.0.3

func NewWaitGroup() WaitGroup

NewWaitGroup creates a new WaitGroup instance.

type Workflow

type Workflow = interface{}

type WorkflowTracer added in v0.4.1

type WorkflowTracer struct {
}

func Tracer added in v0.4.1

func Tracer(ctx Context) *WorkflowTracer

Tracer creates the workflow tracer.

func (*WorkflowTracer) Start added in v0.4.1

func (wt *WorkflowTracer) Start(ctx Context, name string, opts ...trace.SpanStartOption) (Context, Span)

Start starts a new span.
