Documentation ¶
Index ¶
- Variables
- func DefaultPrefix() string
- func NewWorkerOptions(opts ...WorkerOption) worker.Options
- func SetDefaultPrefix(val string)
- type ChildWorkflowFuture
- type Name
- type Query
- type Queue
- type QueueOption
- type Queues
- type Signal
- type WorkerOption
- func WithWorkerOptionBackgroundActivityContext(ctx context.Context) WorkerOption
- func WithWorkerOptionBuildID(buildID string) WorkerOption
- func WithWorkerOptionDeadlockDetectionTimeout(timeout time.Duration) WorkerOption
- func WithWorkerOptionDefaultHeartbeatThrottleInterval(interval time.Duration) WorkerOption
- func WithWorkerOptionDisableEagerActivities(disable bool) WorkerOption
- func WithWorkerOptionDisableRegistrationAliasing(disable bool) WorkerOption
- func WithWorkerOptionDisableWorkflowWorker(disable bool) WorkerOption
- func WithWorkerOptionEnableLoggingInReplay(enable bool) WorkerOption
- func WithWorkerOptionEnableSessionWorker(enable bool) WorkerOption
- func WithWorkerOptionIdentity(identity string) WorkerOption
- func WithWorkerOptionInterceptors(interceptors []interceptor.WorkerInterceptor) WorkerOption
- func WithWorkerOptionLocalActivityWorkerOnly(localOnly bool) WorkerOption
- func WithWorkerOptionMaxConcurrentActivityExecutionSize(size int) WorkerOption
- func WithWorkerOptionMaxConcurrentActivityTaskPollers(count int) WorkerOption
- func WithWorkerOptionMaxConcurrentEagerActivityExecutionSize(size int) WorkerOption
- func WithWorkerOptionMaxConcurrentLocalActivityExecutionSize(size int) WorkerOption
- func WithWorkerOptionMaxConcurrentSessionExecutionSize(size int) WorkerOption
- func WithWorkerOptionMaxConcurrentWorkflowTaskExecutionSize(size int) WorkerOption
- func WithWorkerOptionMaxConcurrentWorkflowTaskPollers(count int) WorkerOption
- func WithWorkerOptionMaxHeartbeatThrottleInterval(interval time.Duration) WorkerOption
- func WithWorkerOptionOnFatalError(fn func(error)) WorkerOption
- func WithWorkerOptionStickyScheduleToStartTimeout(timeout time.Duration) WorkerOption
- func WithWorkerOptionTaskQueueActivitiesPerSecond(rate float64) WorkerOption
- func WithWorkerOptionUseBuildIDForVersioning(use bool) WorkerOption
- func WithWorkerOptionWorkerActivitiesPerSecond(rate float64) WorkerOption
- func WithWorkerOptionWorkerLocalActivitiesPerSecond(rate float64) WorkerOption
- func WithWorkerOptionWorkerStopTimeout(timeout time.Duration) WorkerOption
- func WithWorkerOptionWorkflowPanicPolicy(policy worker.WorkflowPanicPolicy) WorkerOption
- type WorkflowFuture
- type WorkflowRun
- type WorkflowSignal
Constants ¶
This section is empty.
Variables ¶
var ( // ErrClientNil is returned when the temporal client is nil. ErrClientNil = errors.New("client is nil") ErrWorkerNil = errors.New("worker is nil") // ErrChildWorkflowExecutionAttempt is returned when attempting to execute a child workflow without the parent. ErrChildWorkflowExecutionAttempt = errors.New("attempting to execute child workflow directly. use ExecuteWorkflow instead") // ErrExternalWorkflowSignalAttempt is returned when attempting to signal an external workflow from within a workflow. ErrExternalWorkflowSignalAttempt = errors.New("attempting to signal external workflow directly. use SignalExternalWorkflow instead") )
Functions ¶
func NewWorkerOptions ¶
func NewWorkerOptions(opts ...WorkerOption) worker.Options
NewWorkerOptions creates a new worker.Options struct with the given options applied.
Example usage: workerOptions := NewWorkerOptions( WithWorkerOptionOnFatalError(func(err error) { // Handle fatal error }), WithWorkerOptionDisableEagerActivities(true), WithWorkerOptionMaxConcurrentEagerActivityExecutionSize(10), WithWorkerOptionDisableRegistrationAliasing(false), WithWorkerOptionBuildID("my-build-id"), WithWorkerOptionUseBuildIDForVersioning(true), )
Types ¶
type ChildWorkflowFuture ¶
type ChildWorkflowFuture workflow.ChildWorkflowFuture
aliases for types that are used from other packages.
type Query ¶ added in v0.5.0
type Query string
Query is a string alias intended for defining groups of workflow queries. We could have created an alias for the Signal type, but for some weird reason, it was causing temporal to panic when marshalling the type to JSON.
func (Query) MarshalJSON ¶ added in v0.5.0
func (*Query) UnmarshalJSON ¶ added in v0.5.0
type Queue ¶
type Queue interface { // Name gets the name of the queue as string. Name() Name String() string // Prefix gets the prefix of the queue as string. Prefix() string // WorkflowID sanitizes the workflow ID given the workflows.Options. WorkflowID(opts wrk.Options) string // ExecuteWorkflow executes a workflow given the context, workflows.Options, workflow function or function name, and // optional payload. Let's say, we have a queue called "default", we can either pass in the workflow function or the // function name. // // q := queues.New(queues.WithName("default"), queues.WithClient(client)) // q.ExecuteWorkflow( // ctx, // workflows.NewOptions( // workflows.WithBlock("healthz"), // workflows.WithBlockID(uuid.New().String()), // ), // WorkflowFn, // or "WorkflowFunctionName" // payload..., // optional. // ) ExecuteWorkflow(ctx context.Context, opts wrk.Options, fn any, payload ...any) (WorkflowRun, error) // ExecuteChildWorkflow executes a child workflow given the parent workflow context, workflows.Options, workflow // function or function name and optional payload. It must be executed from within a workflow. // // future, err := q.ExecuteChildWorkflow( // ctx, // workflows.NewOptions( // workflows.WithParent(ctx), // This is important. It tells the queue that this is a child workflow. // workflows.WithBlock("healthz"), // workflows.WithBlockID(uuid.New().String()), // ), // WorkflowFn, // or "WorkflowFunctionName" // payload..., // optional. // ) ExecuteChildWorkflow(ctx workflow.Context, opts wrk.Options, fn any, payload ...any) (ChildWorkflowFuture, error) // SignalWorkflow signals a workflow given the workflow ID, signal name and optional payload. // // if err := q.SignalWorkflow( // ctx, // workflows.NewOptions( // workflows.WithParent(ctx), // This is important. It tells the queue that this is a child workflow. 
// workflows.WithBlock("healthz"), // workflows.WithBlockID(uuid.New().String()), // ), // "signal-name", // payload, // or nil // ); err != nil { // // handle error // } SignalWorkflow(ctx context.Context, opts wrk.Options, signal Signal, payload any) error // SignalWithStartWorkflow signals a workflow given the workflow ID, signal name and optional payload. // // run, err := q.SignalWithStartWorkflow( // ctx, // workflows.NewOptions( // workflows.WithParent(ctx), // This is important. It tells the queue that this is a child workflow. // workflows.WithBlock("healthz"), // workflows.WithBlockID(uuid.New().String()), // ), // "signal-name", // arg, // or nil // WorkflowFn, // or "WorkflowFunctionName" // payload..., // optional. // ) SignalWithStartWorkflow(ctx context.Context, opts wrk.Options, signal Signal, args any, fn any, payload ...any) (WorkflowRun, error) // SignalExternalWorkflow signals a workflow given the workflow ID, signal name and optional payload. // // future, err := q.SignalExternalWorkflow( // ctx, // workflows.NewOptions( // workflows.WithParent(ctx), // This is important. It tells the queue that this is a child workflow. // workflows.WithBlock("healthz"), // workflows.WithBlockID(uuid.New().String()), // ), // "signal-name", // payload, // or nil // ) SignalExternalWorkflow(ctx workflow.Context, opts wrk.Options, signal Signal, args any) (WorkflowFuture, error) // QueryWorkflow queries a workflow given the workflow opts, query name and optional arguments. // // result, err := q.QueryWorkflow( // ctx, // workflows.NewOptions( // workflows.WithWorkflowID("my-workflow-id"), // ), // Query("query-name"), // arg1, arg2, // Optional arguments passed to the query function. // ) // // if err != nil { // // handle error // } // // Decode the result. QueryWorkflow(ctx context.Context, opts wrk.Options, query Query, args ...any) (converter.EncodedValue, error) // CreateWorker configures the worker for the queue. 
// // This function configures the worker responsible for executing registered workflows and activities. It uses a // builder pattern, with helper functions prefixed by queues.WithWorkerOption{Option}, where {Option} corresponds to // a field in Temporal's worker.Option. This allows configuring worker behavior at runtime, such as setting maximum // concurrent tasks, enabling sessions etc. The worker is a singleton, meaning only one worker can be created per // queue. Call this function *before* registering workflows and activities (queues.RegisterWorkflow and // queues.RegisterActivity) to ensure correct association. // // q := queues.New(queues.WithName("my-queue"), queues.WithClient(client)) // q.CreateWorker( // queues.WithWorkerOptionEnableSessionWorker(true), // Enable session worker. // ) CreateWorker(opts ...WorkerOption) // Start starts the worker against the queue. CreateWorker must be called before calling this function. Start(context.Context) error Stop(context.Context) error // Shutdown shuts down the worker against the queue. Shutdown(context.Context) error // RegisterWorkflow registers a workflow against the queue. It is a wrapper around the worker.RegisterWorkflow. RegisterWorkflow(any) // RegisterActivity registers an activity against the queue. It is wrapper around the worker.RegisterActivity. RegisterActivity(any) }
Queue defines the queue interface.
func New ¶
func New(opts ...QueueOption) Queue
New creates a new queue with the given opts. For a queue named "default", we would define it as follows:
var DefaultQueue = queues.New( queues.WithName("default"), queues.WithClient(client), queues.WithWorkflowMaxAttempts(1), )
type QueueOption ¶
type QueueOption func(Queue)
QueueOption is the option for a queue.
func WithClient ¶
func WithClient(c client.Client) QueueOption
WithClient sets the client for the queue.
func WithName ¶
func WithName(name string) QueueOption
WithName sets the queue name and the prefix for the workflow ID.
func WithWorkflowMaxAttempts ¶
func WithWorkflowMaxAttempts(attempts int32) QueueOption
WithWorkflowMaxAttempts sets the maximum number of attempts for all the workflows in the queue. The default value is 0 i.e. RetryForever.
type Signal ¶ added in v0.5.0
type Signal string
Signal is a string alias intended for defining groups of workflow signals, e.g. "register", "send_welcome_email", etc. It ensures consistency and code clarity. The Signal type provides methods for conversion and serialization, promoting good developer experience.
func (Signal) MarshalJSON ¶ added in v0.5.0
func (*Signal) UnmarshalJSON ¶ added in v0.5.0
type WorkerOption ¶
WorkerOption is a function that configures a worker.Options struct.
func WithWorkerOptionBackgroundActivityContext ¶
func WithWorkerOptionBackgroundActivityContext(ctx context.Context) WorkerOption
WithWorkerOptionBackgroundActivityContext sets the root context for all activities.
func WithWorkerOptionBuildID ¶
func WithWorkerOptionBuildID(buildID string) WorkerOption
WithWorkerOptionBuildID assigns a BuildID to this worker. NOTE: Experimental.
func WithWorkerOptionDeadlockDetectionTimeout ¶
func WithWorkerOptionDeadlockDetectionTimeout(timeout time.Duration) WorkerOption
WithWorkerOptionDeadlockDetectionTimeout sets the maximum amount of time that a workflow task will be allowed to run. The default value is 1 second.
func WithWorkerOptionDefaultHeartbeatThrottleInterval ¶
func WithWorkerOptionDefaultHeartbeatThrottleInterval(interval time.Duration) WorkerOption
WithWorkerOptionDefaultHeartbeatThrottleInterval sets the default amount of time between sending each pending heartbeat to the server. The default value is 30 seconds.
func WithWorkerOptionDisableEagerActivities ¶
func WithWorkerOptionDisableEagerActivities(disable bool) WorkerOption
WithWorkerOptionDisableEagerActivities disables eager activities. The default value is false.
func WithWorkerOptionDisableRegistrationAliasing ¶
func WithWorkerOptionDisableRegistrationAliasing(disable bool) WorkerOption
WithWorkerOptionDisableRegistrationAliasing prevents workflow and activity functions that are registered with custom names from being called by their function references. The default value is false.
func WithWorkerOptionDisableWorkflowWorker ¶
func WithWorkerOptionDisableWorkflowWorker(disable bool) WorkerOption
WithWorkerOptionDisableWorkflowWorker disables the workflow worker for this worker. The default value is false.
func WithWorkerOptionEnableLoggingInReplay ¶
func WithWorkerOptionEnableLoggingInReplay(enable bool) WorkerOption
WithWorkerOptionEnableLoggingInReplay enables logging in replay mode. This is only useful for debugging purposes. The default value is false.
func WithWorkerOptionEnableSessionWorker ¶
func WithWorkerOptionEnableSessionWorker(enable bool) WorkerOption
WithWorkerOptionEnableSessionWorker enables running session workers for activities within a session. The default value is false.
func WithWorkerOptionIdentity ¶
func WithWorkerOptionIdentity(identity string) WorkerOption
WithWorkerOptionIdentity sets the identity for the worker, overwriting the client-level Identity value.
func WithWorkerOptionInterceptors ¶
func WithWorkerOptionInterceptors(interceptors []interceptor.WorkerInterceptor) WorkerOption
WithWorkerOptionInterceptors sets the interceptors to apply to the worker.
func WithWorkerOptionLocalActivityWorkerOnly ¶
func WithWorkerOptionLocalActivityWorkerOnly(localOnly bool) WorkerOption
WithWorkerOptionLocalActivityWorkerOnly sets the worker to only handle workflow tasks and local activities. The default value is false.
func WithWorkerOptionMaxConcurrentActivityExecutionSize ¶
func WithWorkerOptionMaxConcurrentActivityExecutionSize(size int) WorkerOption
WithWorkerOptionMaxConcurrentActivityExecutionSize sets the maximum concurrent activity executions this worker can have. The zero value of this uses the default value (1000).
func WithWorkerOptionMaxConcurrentActivityTaskPollers ¶
func WithWorkerOptionMaxConcurrentActivityTaskPollers(count int) WorkerOption
WithWorkerOptionMaxConcurrentActivityTaskPollers sets the maximum number of goroutines that will concurrently poll the temporal-server to retrieve activity tasks. The default value is 2.
func WithWorkerOptionMaxConcurrentEagerActivityExecutionSize ¶
func WithWorkerOptionMaxConcurrentEagerActivityExecutionSize(size int) WorkerOption
WithWorkerOptionMaxConcurrentEagerActivityExecutionSize sets the maximum number of eager activities that can be running. The default value of 0 means unlimited.
func WithWorkerOptionMaxConcurrentLocalActivityExecutionSize ¶
func WithWorkerOptionMaxConcurrentLocalActivityExecutionSize(size int) WorkerOption
WithWorkerOptionMaxConcurrentLocalActivityExecutionSize sets the maximum concurrent local activity executions this worker can have. The zero value of this uses the default value (1000).
func WithWorkerOptionMaxConcurrentSessionExecutionSize ¶
func WithWorkerOptionMaxConcurrentSessionExecutionSize(size int) WorkerOption
WithWorkerOptionMaxConcurrentSessionExecutionSize sets the maximum number of concurrently running sessions the resource supports. The default value is 1000.
func WithWorkerOptionMaxConcurrentWorkflowTaskExecutionSize ¶
func WithWorkerOptionMaxConcurrentWorkflowTaskExecutionSize(size int) WorkerOption
WithWorkerOptionMaxConcurrentWorkflowTaskExecutionSize sets the maximum concurrent workflow task executions this worker can have. The zero value of this uses the default value (1000). This value cannot be 1.
func WithWorkerOptionMaxConcurrentWorkflowTaskPollers ¶
func WithWorkerOptionMaxConcurrentWorkflowTaskPollers(count int) WorkerOption
WithWorkerOptionMaxConcurrentWorkflowTaskPollers sets the maximum number of goroutines that will concurrently poll the temporal-server to retrieve workflow tasks. The default value is 2. This value cannot be 1.
func WithWorkerOptionMaxHeartbeatThrottleInterval ¶
func WithWorkerOptionMaxHeartbeatThrottleInterval(interval time.Duration) WorkerOption
WithWorkerOptionMaxHeartbeatThrottleInterval sets the maximum amount of time between sending each pending heartbeat to the server. The default value is 60 seconds.
func WithWorkerOptionOnFatalError ¶
func WithWorkerOptionOnFatalError(fn func(error)) WorkerOption
WithWorkerOptionOnFatalError sets the callback invoked on fatal error.
func WithWorkerOptionStickyScheduleToStartTimeout ¶
func WithWorkerOptionStickyScheduleToStartTimeout(timeout time.Duration) WorkerOption
WithWorkerOptionStickyScheduleToStartTimeout sets the sticky schedule to start timeout. The default value is 5 seconds.
func WithWorkerOptionTaskQueueActivitiesPerSecond ¶
func WithWorkerOptionTaskQueueActivitiesPerSecond(rate float64) WorkerOption
WithWorkerOptionTaskQueueActivitiesPerSecond sets the rate limiting on number of activities that can be executed per second for the entire task queue.
This is managed by the server. The zero value of this uses the default value (100,000).
func WithWorkerOptionUseBuildIDForVersioning ¶
func WithWorkerOptionUseBuildIDForVersioning(use bool) WorkerOption
WithWorkerOptionUseBuildIDForVersioning opts this worker into the Worker Versioning feature. NOTE: Experimental.
func WithWorkerOptionWorkerActivitiesPerSecond ¶
func WithWorkerOptionWorkerActivitiesPerSecond(rate float64) WorkerOption
WithWorkerOptionWorkerActivitiesPerSecond sets the rate limiting on number of activities that can be executed per second per worker. This can be used to limit resources used by the worker. The zero value of this uses the default value (100,000).
func WithWorkerOptionWorkerLocalActivitiesPerSecond ¶
func WithWorkerOptionWorkerLocalActivitiesPerSecond(rate float64) WorkerOption
WithWorkerOptionWorkerLocalActivitiesPerSecond sets the rate limiting on number of local activities that can be executed per second per worker.
This can be used to limit resources used by the worker. The zero value of this uses the default value (100,000).
func WithWorkerOptionWorkerStopTimeout ¶
func WithWorkerOptionWorkerStopTimeout(timeout time.Duration) WorkerOption
WithWorkerOptionWorkerStopTimeout sets the worker graceful stop timeout. The default value is 0 seconds.
func WithWorkerOptionWorkflowPanicPolicy ¶
func WithWorkerOptionWorkflowPanicPolicy(policy worker.WorkflowPanicPolicy) WorkerOption
WithWorkerOptionWorkflowPanicPolicy sets how the workflow worker deals with non-deterministic history events and panics. The default value is BlockWorkflow.
type WorkflowFuture ¶
aliases for types that are used from other packages.
type WorkflowRun ¶
type WorkflowRun client.WorkflowRun
aliases for types that are used from other packages.
type WorkflowSignal ¶
type WorkflowSignal string
WorkflowSignal is a string alias intended for defining groups of workflow signals.
Deprecated: Use Signal instead.
func (WorkflowSignal) MarshalJSON ¶
func (s WorkflowSignal) MarshalJSON() ([]byte, error)
func (WorkflowSignal) String ¶
func (s WorkflowSignal) String() string
func (*WorkflowSignal) UnmarshalJSON ¶
func (s *WorkflowSignal) UnmarshalJSON(data []byte) error