queues

package
v0.6.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 3, 2024 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrClientNil is returned when the temporal client is nil.
	ErrClientNil = errors.New("client is nil")

	// ErrWorkerNil is returned when the worker is nil.
	ErrWorkerNil = errors.New("worker is nil")

	// ErrChildWorkflowExecutionAttempt is returned when attempting to execute a child workflow without the parent.
	ErrChildWorkflowExecutionAttempt = errors.New("attempting to execute child workflow directly. use ExecuteWorkflow instead")

	// ErrExternalWorkflowSignalAttempt is returned when attempting to signal an external workflow from within a workflow.
	ErrExternalWorkflowSignalAttempt = errors.New("attempting to signal external workflow directly. use SignalExternalWorkflow instead")
)

Functions

func DefaultPrefix

func DefaultPrefix() string

DefaultPrefix gets the default prefix.

func NewWorkerOptions

func NewWorkerOptions(opts ...WorkerOption) worker.Options

NewWorkerOptions creates a new worker.Options struct with the given options applied.

Example usage:
workerOptions := NewWorkerOptions(
	WithWorkerOptionOnFatalError(func(err error) {
		// Handle fatal error
	}),
	WithWorkerOptionDisableEagerActivities(true),
	WithWorkerOptionMaxConcurrentEagerActivityExecutionSize(10),
	WithWorkerOptionDisableRegistrationAliasing(false),
	WithWorkerOptionBuildID("my-build-id"),
	WithWorkerOptionUseBuildIDForVersioning(true),
)

func SetDefaultPrefix

func SetDefaultPrefix(val string)

SetDefaultPrefix sets the default prefix.

Types

type ChildWorkflowFuture

type ChildWorkflowFuture workflow.ChildWorkflowFuture

aliases for types that are used from other packages.

type Name

type Name string

Name is the name of the queue.

func (Name) String

func (q Name) String() string

type Query added in v0.5.0

type Query string

Query is a string alias intended for defining groups of workflow queries. We could have created an alias for the Signal type, but for some weird reason, it was causing temporal to panic when marshalling the type to JSON.

func (Query) MarshalJSON added in v0.5.0

func (q Query) MarshalJSON() ([]byte, error)

func (Query) String added in v0.5.0

func (q Query) String() string

func (*Query) UnmarshalJSON added in v0.5.0

func (q *Query) UnmarshalJSON(data []byte) error

type Queue

type Queue interface {
	// Name gets the name of the queue as a Name.
	Name() Name

	// String returns a string representation of the queue.
	// NOTE(review): presumably the queue name as a plain string — confirm against the implementation.
	String() string

	// Prefix gets the prefix of the queue as string.
	Prefix() string

	// WorkflowID sanitizes the workflow ID given the workflows.Options.
	WorkflowID(opts wrk.Options) string

	// ExecuteWorkflow executes a workflow given the context, workflows.Options, workflow function or function name, and
	// optional payload. Let's say we have a queue called "default"; we can pass in either the workflow function or the
	// function name.
	//
	//  q := queues.New(queues.WithName("default"), queues.WithClient(client))
	//  q.ExecuteWorkflow(
	//    ctx,
	//    workflows.NewOptions(
	//      workflows.WithBlock("healthz"),
	//      workflows.WithBlockID(uuid.New().String()),
	//    ),
	//    WorkflowFn, // or "WorkflowFunctionName"
	//    payload...,    // optional.
	//  )
	ExecuteWorkflow(ctx context.Context, opts wrk.Options, fn any, payload ...any) (WorkflowRun, error)

	// ExecuteChildWorkflow executes a child workflow given the parent workflow context, workflows.Options, workflow
	// function or function name and optional payload. It must be executed from within a workflow.
	//
	//  future, err := q.ExecuteChildWorkflow(
	//    ctx,
	//    workflows.NewOptions(
	//      workflows.WithParent(ctx), // This is important. It tells the queue that this is a child workflow.
	//      workflows.WithBlock("healthz"),
	//      workflows.WithBlockID(uuid.New().String()),
	//    ),
	//    WorkflowFn,    // or "WorkflowFunctionName"
	//    payload...,    // optional.
	//  )
	ExecuteChildWorkflow(ctx workflow.Context, opts wrk.Options, fn any, payload ...any) (ChildWorkflowFuture, error)

	// SignalWorkflow signals a workflow given the context, workflows.Options, signal and payload (which may be nil).
	//
	// NOTE(review): the workflows.WithParent(ctx) line in the example looks copied from ExecuteChildWorkflow; ctx here
	// is a context.Context, not a workflow.Context — confirm whether the option belongs in this example.
	//
	//  if err := q.SignalWorkflow(
	//    ctx,
	//    workflows.NewOptions(
	//      workflows.WithParent(ctx), // This is important. It tells the queue that this is a child workflow.
	//      workflows.WithBlock("healthz"),
	//      workflows.WithBlockID(uuid.New().String()),
	//    ),
	//    "signal-name",
	//    payload,    // or nil
	//  ); err != nil {
	//    // handle error
	//  }
	SignalWorkflow(ctx context.Context, opts wrk.Options, signal Signal, payload any) error

	// SignalWithStartWorkflow takes the context, workflows.Options, signal, signal argument (which may be nil), workflow
	// function or function name, and optional workflow payload, and returns a WorkflowRun.
	// NOTE(review): presumably this signals the workflow and starts it if it is not already running — confirm.
	//
	// NOTE(review): the workflows.WithParent(ctx) line in the example looks copied from ExecuteChildWorkflow; ctx here
	// is a context.Context, not a workflow.Context — confirm whether the option belongs in this example.
	//
	//  run, err := q.SignalWithStartWorkflow(
	//    ctx,
	//    workflows.NewOptions(
	//      workflows.WithParent(ctx), // This is important. It tells the queue that this is a child workflow.
	//      workflows.WithBlock("healthz"),
	//      workflows.WithBlockID(uuid.New().String()),
	//    ),
	//    "signal-name",
	//    arg,    // or nil
	//    WorkflowFn, // or "WorkflowFunctionName"
	//    payload..., // optional.
	//  )
	SignalWithStartWorkflow(ctx context.Context, opts wrk.Options, signal Signal, args any, fn any, payload ...any) (WorkflowRun, error)

	// SignalExternalWorkflow signals a workflow given the workflow context, workflows.Options, signal and signal
	// argument (which may be nil). It takes a workflow.Context, so it is meant to be called from within a workflow.
	//
	//  future, err := q.SignalExternalWorkflow(
	//    ctx,
	//    workflows.NewOptions(
	//      workflows.WithParent(ctx), // This is important. It tells the queue that this is a child workflow.
	//      workflows.WithBlock("healthz"),
	//      workflows.WithBlockID(uuid.New().String()),
	//    ),
	//    "signal-name",
	//    payload,    // or nil
	//  )
	SignalExternalWorkflow(ctx workflow.Context, opts wrk.Options, signal Signal, args any) (WorkflowFuture, error)

	// QueryWorkflow queries a workflow given the workflow opts, query name and optional arguments.
	//
	//  result, err := q.QueryWorkflow(
	//    ctx,
	//    workflows.NewOptions(
	//      workflows.WithWorkflowID("my-workflow-id"),
	//    ),
	//    Query("query-name"),
	//    arg1, arg2, // Optional arguments passed to the query function.
	//  )
	//
	//  if err != nil {
	//    // handle error
	//  }
	//  // Decode the result.
	QueryWorkflow(ctx context.Context, opts wrk.Options, query Query, args ...any) (converter.EncodedValue, error)

	// CreateWorker configures the worker for the queue.
	//
	// This function configures the worker responsible for executing registered workflows and activities. It uses a
	// builder pattern, with helper functions prefixed by queues.WithWorkerOption{Option}, where {Option} corresponds to
	// a field in Temporal's worker.Options. This allows configuring worker behavior at runtime, such as setting maximum
	// concurrent tasks, enabling sessions etc. The worker is a singleton, meaning only one worker can be created per
	// queue. Call this function *before* registering workflows and activities (RegisterWorkflow and RegisterActivity)
	// to ensure correct association.
	//
	//	q := queues.New(queues.WithName("my-queue"), queues.WithClient(client))
	//	q.CreateWorker(
	//		queues.WithWorkerOptionEnableSessionWorker(true), // Enable session worker.
	//	)
	CreateWorker(opts ...WorkerOption)

	// Start starts the worker against the queue. CreateWorker must be called before calling this function.
	Start(context.Context) error

	// Stop takes a context and returns an error.
	// NOTE(review): presumably stops the worker against the queue; how it differs from Shutdown is not visible
	// here — confirm against the implementation.
	Stop(context.Context) error

	// Shutdown shuts down the worker against the queue.
	Shutdown(context.Context) error

	// RegisterWorkflow registers a workflow against the queue. It is a wrapper around the worker.RegisterWorkflow.
	RegisterWorkflow(any)

	// RegisterActivity registers an activity against the queue. It is a wrapper around the worker.RegisterActivity.
	RegisterActivity(any)
}

Queue defines the queue interface.

func New

func New(opts ...QueueOption) Queue

New creates a new queue with the given opts. For a queue named "default", we would define it as follows:

var DefaultQueue = queues.New(
  queues.WithName("default"),
  queues.WithClient(client),
  queues.WithWorkflowMaxAttempts(1),
)

type QueueOption

type QueueOption func(Queue)

QueueOption is the option for a queue.

func WithClient

func WithClient(c client.Client) QueueOption

WithClient sets the client for the queue.

func WithName

func WithName(name string) QueueOption

WithName sets the queue name and the prefix for the workflow ID.

func WithWorkflowMaxAttempts

func WithWorkflowMaxAttempts(attempts int32) QueueOption

WithWorkflowMaxAttempts sets the maximum number of attempts for all the workflows in the queue. The default value is 0 i.e. RetryForever.

type Queues

type Queues map[Name]Queue

Queues is a map of queues.

type Signal added in v0.5.0

type Signal string

Signal is a string alias intended for defining groups of workflow signals, "register" , "send_welcome_email" etc. It ensures consistency and code clarity. The Signal type provides methods for conversion and serialization, promoting good developer experience.

func (Signal) MarshalJSON added in v0.5.0

func (s Signal) MarshalJSON() ([]byte, error)

func (Signal) String added in v0.5.0

func (s Signal) String() string

func (*Signal) UnmarshalJSON added in v0.5.0

func (s *Signal) UnmarshalJSON(data []byte) error

type WorkerOption

type WorkerOption func(*worker.Options)

WorkerOption is a function that configures a worker.Options struct.

func WithWorkerOptionBackgroundActivityContext

func WithWorkerOptionBackgroundActivityContext(ctx context.Context) WorkerOption

WithWorkerOptionBackgroundActivityContext sets the root context for all activities.

func WithWorkerOptionBuildID

func WithWorkerOptionBuildID(buildID string) WorkerOption

WithWorkerOptionBuildID assigns a BuildID to this worker. NOTE: Experimental.

func WithWorkerOptionDeadlockDetectionTimeout

func WithWorkerOptionDeadlockDetectionTimeout(timeout time.Duration) WorkerOption

WithWorkerOptionDeadlockDetectionTimeout sets the maximum amount of time that a workflow task will be allowed to run. The default value is 1 second.

func WithWorkerOptionDefaultHeartbeatThrottleInterval

func WithWorkerOptionDefaultHeartbeatThrottleInterval(interval time.Duration) WorkerOption

WithWorkerOptionDefaultHeartbeatThrottleInterval sets the default amount of time between sending each pending heartbeat to the server. The default value is 30 seconds.

func WithWorkerOptionDisableEagerActivities

func WithWorkerOptionDisableEagerActivities(disable bool) WorkerOption

WithWorkerOptionDisableEagerActivities disables eager activities. The default value is false.

func WithWorkerOptionDisableRegistrationAliasing

func WithWorkerOptionDisableRegistrationAliasing(disable bool) WorkerOption

WithWorkerOptionDisableRegistrationAliasing prevents workflow and activity functions registered with custom names from being called with their function references. The default value is false.

func WithWorkerOptionDisableWorkflowWorker

func WithWorkerOptionDisableWorkflowWorker(disable bool) WorkerOption

WithWorkerOptionDisableWorkflowWorker disables the workflow worker for this worker. The default value is false.

func WithWorkerOptionEnableLoggingInReplay

func WithWorkerOptionEnableLoggingInReplay(enable bool) WorkerOption

WithWorkerOptionEnableLoggingInReplay enables logging in replay mode. This is only useful for debugging purposes. The default value is false.

func WithWorkerOptionEnableSessionWorker

func WithWorkerOptionEnableSessionWorker(enable bool) WorkerOption

WithWorkerOptionEnableSessionWorker enables running session workers for activities within a session. The default value is false.

func WithWorkerOptionIdentity

func WithWorkerOptionIdentity(identity string) WorkerOption

WithWorkerOptionIdentity sets the identity for the worker, overwriting the client-level Identity value.

func WithWorkerOptionInterceptors

func WithWorkerOptionInterceptors(interceptors []interceptor.WorkerInterceptor) WorkerOption

WithWorkerOptionInterceptors sets the interceptors to apply to the worker.

func WithWorkerOptionLocalActivityWorkerOnly

func WithWorkerOptionLocalActivityWorkerOnly(localOnly bool) WorkerOption

WithWorkerOptionLocalActivityWorkerOnly sets the worker to only handle workflow tasks and local activities. The default value is false.

func WithWorkerOptionMaxConcurrentActivityExecutionSize

func WithWorkerOptionMaxConcurrentActivityExecutionSize(size int) WorkerOption

WithWorkerOptionMaxConcurrentActivityExecutionSize sets the maximum concurrent activity executions this worker can have. The zero value of this uses the default value (1000).

func WithWorkerOptionMaxConcurrentActivityTaskPollers

func WithWorkerOptionMaxConcurrentActivityTaskPollers(count int) WorkerOption

WithWorkerOptionMaxConcurrentActivityTaskPollers sets the maximum number of goroutines that will concurrently poll the temporal-server to retrieve activity tasks. The default value is 2.

func WithWorkerOptionMaxConcurrentEagerActivityExecutionSize

func WithWorkerOptionMaxConcurrentEagerActivityExecutionSize(size int) WorkerOption

WithWorkerOptionMaxConcurrentEagerActivityExecutionSize sets the maximum number of eager activities that can be running. The default value of 0 means unlimited.

func WithWorkerOptionMaxConcurrentLocalActivityExecutionSize

func WithWorkerOptionMaxConcurrentLocalActivityExecutionSize(size int) WorkerOption

WithWorkerOptionMaxConcurrentLocalActivityExecutionSize sets the maximum concurrent local activity executions this worker can have. The zero value of this uses the default value (1000).

func WithWorkerOptionMaxConcurrentSessionExecutionSize

func WithWorkerOptionMaxConcurrentSessionExecutionSize(size int) WorkerOption

WithWorkerOptionMaxConcurrentSessionExecutionSize sets the maximum number of concurrently running sessions the resource supports. The default value is 1000.

func WithWorkerOptionMaxConcurrentWorkflowTaskExecutionSize

func WithWorkerOptionMaxConcurrentWorkflowTaskExecutionSize(size int) WorkerOption

WithWorkerOptionMaxConcurrentWorkflowTaskExecutionSize sets the maximum concurrent workflow task executions this worker can have. The zero value of this uses the default value (1000). This value cannot be 1.

func WithWorkerOptionMaxConcurrentWorkflowTaskPollers

func WithWorkerOptionMaxConcurrentWorkflowTaskPollers(count int) WorkerOption

WithWorkerOptionMaxConcurrentWorkflowTaskPollers sets the maximum number of goroutines that will concurrently poll the temporal-server to retrieve workflow tasks. The default value is 2. This value cannot be 1.

func WithWorkerOptionMaxHeartbeatThrottleInterval

func WithWorkerOptionMaxHeartbeatThrottleInterval(interval time.Duration) WorkerOption

WithWorkerOptionMaxHeartbeatThrottleInterval sets the maximum amount of time between sending each pending heartbeat to the server. The default value is 60 seconds.

func WithWorkerOptionOnFatalError

func WithWorkerOptionOnFatalError(fn func(error)) WorkerOption

WithWorkerOptionOnFatalError sets the callback invoked on fatal error.

func WithWorkerOptionStickyScheduleToStartTimeout

func WithWorkerOptionStickyScheduleToStartTimeout(timeout time.Duration) WorkerOption

WithWorkerOptionStickyScheduleToStartTimeout sets the sticky schedule to start timeout. The default value is 5 seconds.

func WithWorkerOptionTaskQueueActivitiesPerSecond

func WithWorkerOptionTaskQueueActivitiesPerSecond(rate float64) WorkerOption

WithWorkerOptionTaskQueueActivitiesPerSecond sets the rate limiting on number of activities that can be executed per second for the entire task queue.

This is managed by the server. The zero value of this uses the default value (100,000).

func WithWorkerOptionUseBuildIDForVersioning

func WithWorkerOptionUseBuildIDForVersioning(use bool) WorkerOption

WithWorkerOptionUseBuildIDForVersioning opts this worker into the Worker Versioning feature. NOTE: Experimental.

func WithWorkerOptionWorkerActivitiesPerSecond

func WithWorkerOptionWorkerActivitiesPerSecond(rate float64) WorkerOption

WithWorkerOptionWorkerActivitiesPerSecond sets the rate limiting on number of activities that can be executed per second per worker. This can be used to limit resources used by the worker. The zero value of this uses the default value (100,000).

func WithWorkerOptionWorkerLocalActivitiesPerSecond

func WithWorkerOptionWorkerLocalActivitiesPerSecond(rate float64) WorkerOption

WithWorkerOptionWorkerLocalActivitiesPerSecond sets the rate limiting on number of local activities that can be executed per second per worker.

This can be used to limit resources used by the worker. The zero value of this uses the default value (100,000).

func WithWorkerOptionWorkerStopTimeout

func WithWorkerOptionWorkerStopTimeout(timeout time.Duration) WorkerOption

WithWorkerOptionWorkerStopTimeout sets the worker graceful stop timeout. The default value is 0 seconds.

func WithWorkerOptionWorkflowPanicPolicy

func WithWorkerOptionWorkflowPanicPolicy(policy worker.WorkflowPanicPolicy) WorkerOption

WithWorkerOptionWorkflowPanicPolicy sets how the workflow worker deals with non-deterministic history events and panics. The default value is BlockWorkflow.

type WorkflowFuture

type WorkflowFuture workflow.Future

aliases for types that are used from other packages.

type WorkflowRun

type WorkflowRun client.WorkflowRun

aliases for types that are used from other packages.

type WorkflowSignal

type WorkflowSignal string

WorkflowSignal is a string alias intended for defining groups of workflow signals.

Deprecated: Use Signal instead.

func (WorkflowSignal) MarshalJSON

func (s WorkflowSignal) MarshalJSON() ([]byte, error)

func (WorkflowSignal) String

func (s WorkflowSignal) String() string

func (*WorkflowSignal) UnmarshalJSON

func (s *WorkflowSignal) UnmarshalJSON(data []byte) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL