Documentation ¶
Overview ¶
Package temporalnexus provides utilities for exposing Temporal constructs as Nexus Operations.
Nexus RPC is a modern open-source service framework for arbitrary-length operations whose lifetime may extend beyond a traditional RPC. Nexus was designed with durable execution in mind, as an underpinning to connect durable executions within and across namespaces, clusters and regions – with a clean API contract to streamline multi-team collaboration. Any service can be exposed as a set of sync or async Nexus operations – the latter provides an operation identity and a uniform interface to get the status of an operation or its result, receive a completion callback, or cancel the operation.
Temporal leverages the Nexus RPC protocol to facilitate calling across namespace and cluster boundaries.
See also:
Nexus over HTTP Spec: https://github.com/nexus-rpc/api/blob/main/SPEC.md
Nexus Go SDK: https://github.com/nexus-rpc/sdk-go
Index ¶
- func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) nexus.Link
- func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_WorkflowEvent, error)
- func GetLogger(ctx context.Context) log.Logger
- func GetMetricsHandler(ctx context.Context) metrics.Handler
- func MustNewWorkflowRunOperationWithOptions[I, O any](options WorkflowRunOperationOptions[I, O]) nexus.Operation[I, O]
- func NewSyncOperation[I any, O any](name string, ...) nexus.Operation[I, O]
- func NewWorkflowRunOperation[I, O any](name string, workflow func(workflow.Context, I) (O, error), ...) nexus.Operation[I, O]
- func NewWorkflowRunOperationWithOptions[I, O any](options WorkflowRunOperationOptions[I, O]) (nexus.Operation[I, O], error)
- type WorkflowHandle
- type WorkflowRunOperationOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConvertLinkWorkflowEventToNexusLink ¶ added in v1.29.0
func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) nexus.Link
ConvertLinkWorkflowEventToNexusLink converts a Link_WorkflowEvent to a Nexus Link.
NOTE: Experimental
func ConvertNexusLinkToLinkWorkflowEvent ¶ added in v1.29.0
func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_WorkflowEvent, error)
ConvertNexusLinkToLinkWorkflowEvent converts a Nexus Link to Link_WorkflowEvent.
NOTE: Experimental
func GetLogger ¶ added in v1.29.0
func GetLogger(ctx context.Context) log.Logger
GetLogger returns a logger to be used in a Nexus operation's context.
func GetMetricsHandler ¶ added in v1.29.0
func GetMetricsHandler(ctx context.Context) metrics.Handler
GetMetricsHandler returns a metrics handler to be used in a Nexus operation's context.
func MustNewWorkflowRunOperationWithOptions ¶
func MustNewWorkflowRunOperationWithOptions[I, O any](options WorkflowRunOperationOptions[I, O]) nexus.Operation[I, O]
MustNewWorkflowRunOperationWithOptions maps an operation to a workflow run with the given options. Panics if invalid options are provided.
NOTE: Experimental
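The relationship between an error-returning constructor and its panicking "Must" variant can be sketched with the standard library alone. Everything in the block below (newOperation, mustNewOperation, didPanic, and the rule that an empty name is invalid) is a hypothetical stand-in for illustration, not the SDK's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// newOperation is a hypothetical stand-in for an error-returning constructor.
// The "empty name is invalid" rule is an assumption made for this sketch.
func newOperation(name string) (string, error) {
	if name == "" {
		return "", errors.New("operation name is required")
	}
	return "operation:" + name, nil
}

// mustNewOperation wraps newOperation and panics instead of returning an error.
func mustNewOperation(name string) string {
	op, err := newOperation(name)
	if err != nil {
		panic(err)
	}
	return op
}

// didPanic reports whether calling f resulted in a panic.
func didPanic(f func()) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	f()
	return false
}

func main() {
	// Valid input: both variants yield the same value.
	fmt.Println(mustNewOperation("my-async-operation"))

	// Invalid input: the Must variant panics where the other returns an error.
	fmt.Println(didPanic(func() { mustNewOperation("") }))
}
```

A Must variant tends to suit package-level variable initialization, where options are fixed at compile time and a failure should stop the program at startup; the error-returning variant is the better fit when options are built at runtime.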
func NewSyncOperation ¶
func NewSyncOperation[I any, O any](
	name string,
	handler func(context.Context, client.Client, I, nexus.StartOperationOptions) (O, error),
) nexus.Operation[I, O]
NewSyncOperation is a helper for creating a synchronous-only nexus.Operation from a given name and handler function. The handler is passed the client that the worker was created with. Sync operations are useful for exposing short-lived Temporal client requests, such as signals, queries, synchronous updates, and listing workflows.
NOTE: Experimental
Example ¶
package main

import (
	"context"

	"github.com/nexus-rpc/sdk-go/nexus"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/temporalnexus"
	"go.temporal.io/sdk/worker"
)

type MyInput struct {
	ID string
}

type MyQueryOutput struct {
}

func main() {
	opRead := temporalnexus.NewSyncOperation("my-read-only-operation", func(ctx context.Context, c client.Client, input MyInput, opts nexus.StartOperationOptions) (MyQueryOutput, error) {
		var ret MyQueryOutput
		res, err := c.QueryWorkflow(ctx, input.ID, "", "some-query", nil)
		if err != nil {
			return ret, err
		}
		return ret, res.Get(&ret)
	})

	// Operations don't have to return values.
	opWrite := temporalnexus.NewSyncOperation("my-write-operation", func(ctx context.Context, c client.Client, input MyInput, opts nexus.StartOperationOptions) (nexus.NoValue, error) {
		return nil, c.SignalWorkflow(ctx, input.ID, "", "some-signal", nil)
	})

	service := nexus.NewService("my-service")
	_ = service.Register(opRead, opWrite)

	c, _ := client.Dial(client.Options{
		HostPort:  "localhost:7233",
		Namespace: "my-namespace",
	})
	w := worker.New(c, "my-task-queue", worker.Options{})
	w.RegisterNexusService(service)
}
Output:
func NewWorkflowRunOperation ¶
func NewWorkflowRunOperation[I, O any](
	name string,
	workflow func(workflow.Context, I) (O, error),
	getOptions func(context.Context, I, nexus.StartOperationOptions) (client.StartWorkflowOptions, error),
) nexus.Operation[I, O]
NewWorkflowRunOperation maps an operation to a workflow run.
NOTE: Experimental
Example ¶
package main

import (
	"context"

	"github.com/nexus-rpc/sdk-go/nexus"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/temporalnexus"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
)

type MyOutput struct {
}

type MyInput struct {
	ID string
}

func MyHandlerWorkflow(workflow.Context, MyInput) (MyOutput, error) {
	return MyOutput{}, nil
}

func main() {
	op := temporalnexus.NewWorkflowRunOperation(
		"my-async-operation",
		MyHandlerWorkflow,
		func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
			return client.StartWorkflowOptions{
				// Workflow ID is required and must be deterministically generated from the input in order
				// for the operation to be idempotent as the request to start the operation may be retried.
				ID: input.ID,
			}, nil
		})

	service := nexus.NewService("my-service")
	_ = service.Register(op)

	c, _ := client.Dial(client.Options{
		HostPort:  "localhost:7233",
		Namespace: "my-namespace",
	})
	w := worker.New(c, "my-task-queue", worker.Options{})
	w.RegisterWorkflow(MyHandlerWorkflow)
	w.RegisterNexusService(service)
}
Output:
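The example above uses input.ID directly as the workflow ID. When the input carries no natural unique identifier, one possible approach is to hash the encoded input so that a retried start request maps to the same ID. The sketch below uses only the standard library; workflowIDFor is a hypothetical helper that is not part of the SDK, and hashing the JSON encoding is an assumption made for illustration.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// MyInput mirrors the input type used in the example above.
type MyInput struct {
	ID string
}

// workflowIDFor is a hypothetical helper (not part of the SDK) that derives a
// stable workflow ID from an operation input.
func workflowIDFor(prefix string, input any) (string, error) {
	// encoding/json writes struct fields in declaration order and sorts map
	// keys, so equal inputs encode to identical bytes.
	b, err := json.Marshal(input)
	if err != nil {
		return "", fmt.Errorf("encode input: %w", err)
	}
	sum := sha256.Sum256(b)
	// Half of the digest keeps the ID short while remaining collision resistant.
	return prefix + "-" + hex.EncodeToString(sum[:16]), nil
}

func main() {
	first, _ := workflowIDFor("my-async-operation", MyInput{ID: "order-42"})
	retry, _ := workflowIDFor("my-async-operation", MyInput{ID: "order-42"})
	other, _ := workflowIDFor("my-async-operation", MyInput{ID: "order-43"})

	fmt.Println(first == retry) // a retried start request maps to the same ID
	fmt.Println(first == other) // a different input maps to a different ID
}
```

Values that vary between attempts, such as timestamps or random numbers, should stay out of the hashed input, since they would defeat the idempotency the ID is meant to provide.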
func NewWorkflowRunOperationWithOptions ¶
func NewWorkflowRunOperationWithOptions[I, O any](options WorkflowRunOperationOptions[I, O]) (nexus.Operation[I, O], error)
NewWorkflowRunOperationWithOptions maps an operation to a workflow run with the given options. Returns an error if invalid options are provided.
NOTE: Experimental
Example ¶
package main

import (
	"context"

	"github.com/nexus-rpc/sdk-go/nexus"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/temporalnexus"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
)

type MyWorkflowInput struct {
}

type MyOutput struct {
}

type MyInput struct {
	ID string
}

func MyHandlerWorkflow(workflow.Context, MyInput) (MyOutput, error) {
	return MyOutput{}, nil
}

func MyHandlerWorkflowWithAlternativeInput(workflow.Context, MyWorkflowInput) (MyOutput, error) {
	return MyOutput{}, nil
}

func main() {
	// Alternative 1 - long form version of NewWorkflowRunOperation.
	opAlt1, _ := temporalnexus.NewWorkflowRunOperationWithOptions(
		temporalnexus.WorkflowRunOperationOptions[MyInput, MyOutput]{
			Name:     "my-async-op-1",
			Workflow: MyHandlerWorkflow,
			GetOptions: func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
				return client.StartWorkflowOptions{
					// Workflow ID is required and must be deterministically generated from the input in order
					// for the operation to be idempotent as the request to start the operation may be retried.
					ID: input.ID,
				}, nil
			},
		})

	// Alternative 2 - start a workflow with alternative inputs.
	opAlt2, _ := temporalnexus.NewWorkflowRunOperationWithOptions(
		temporalnexus.WorkflowRunOperationOptions[MyInput, MyOutput]{
			Name: "my-async-op-2",
			Handler: func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (temporalnexus.WorkflowHandle[MyOutput], error) {
				// Workflows started with this API must take a single input and return single output.
				// To start workflows with different signatures, use ExecuteUntypedWorkflow.
				return temporalnexus.ExecuteWorkflow(ctx, opts, client.StartWorkflowOptions{
					// Workflow ID is required and must be deterministically generated from the input in order
					// for the operation to be idempotent as the request to start the operation may be retried.
					ID: input.ID,
				}, MyHandlerWorkflowWithAlternativeInput, MyWorkflowInput{})
			},
		})

	service := nexus.NewService("my-service")
	_ = service.Register(opAlt1, opAlt2)

	c, _ := client.Dial(client.Options{
		HostPort:  "localhost:7233",
		Namespace: "my-namespace",
	})
	w := worker.New(c, "my-task-queue", worker.Options{})
	w.RegisterWorkflow(MyHandlerWorkflow)
	w.RegisterWorkflow(MyHandlerWorkflowWithAlternativeInput)
	w.RegisterNexusService(service)
}
Output:
Types ¶
type WorkflowHandle ¶
type WorkflowHandle[T any] interface {
	// ID is the workflow's ID.
	ID() string
	// RunID is the workflow's run ID.
	RunID() string
	// contains filtered or unexported methods
}
WorkflowHandle is a read-only representation of a workflow run backing a Nexus operation. It is created via the ExecuteWorkflow and ExecuteUntypedWorkflow functions.
NOTE: Experimental
func ExecuteUntypedWorkflow ¶
func ExecuteUntypedWorkflow[R any](
	ctx context.Context,
	nexusOptions nexus.StartOperationOptions,
	startWorkflowOptions client.StartWorkflowOptions,
	workflow any,
	args ...any,
) (WorkflowHandle[R], error)
ExecuteUntypedWorkflow starts a workflow by function reference or string name, linking the execution chain to a Nexus operation. Useful for invoking workflows that don't follow the single-argument, single-return-type signature. See ExecuteWorkflow for more information.
NOTE: Experimental
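Accepting a workflow "by function reference or string name" implies that some name has to be derived when a function is passed. The sketch below shows one way such a lookup could work using only the standard library. workflowName is a hypothetical helper, the context-free signature of MyHandlerWorkflow is a simplification, and the resolution rule (short function name, with any package path removed) is an assumption for illustration rather than a statement of what the SDK does internally.

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
	"strings"
)

// MyHandlerWorkflow stands in for a workflow function. A real workflow takes
// a workflow.Context; it is omitted here to keep the sketch stdlib-only.
func MyHandlerWorkflow(input string) (string, error) {
	return input, nil
}

// workflowName is a hypothetical helper that accepts either a string name or
// a function reference and returns the name to start the workflow under.
func workflowName(wf any) (string, error) {
	if name, ok := wf.(string); ok {
		if name == "" {
			return "", fmt.Errorf("workflow name is empty")
		}
		return name, nil
	}
	v := reflect.ValueOf(wf)
	if !v.IsValid() || v.Kind() != reflect.Func || v.IsNil() {
		return "", fmt.Errorf("expected a function or string, got %T", wf)
	}
	// The runtime reports a package-qualified name such as "main.MyHandlerWorkflow".
	full := runtime.FuncForPC(v.Pointer()).Name()
	short := full[strings.LastIndex(full, ".")+1:]
	// Method values carry a "-fm" suffix in their runtime name.
	return strings.TrimSuffix(short, "-fm"), nil
}

func main() {
	byRef, _ := workflowName(MyHandlerWorkflow)
	byName, _ := workflowName("MyHandlerWorkflow")
	fmt.Println(byRef == byName)

	_, err := workflowName(42)
	fmt.Println(err != nil)
}
```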
func ExecuteWorkflow ¶
func ExecuteWorkflow[I, O any, WF func(workflow.Context, I) (O, error)](
	ctx context.Context,
	nexusOptions nexus.StartOperationOptions,
	startWorkflowOptions client.StartWorkflowOptions,
	workflow WF,
	arg I,
) (WorkflowHandle[O], error)
ExecuteWorkflow starts a workflow run for a WorkflowRunOperationOptions Handler, linking the execution chain (including subsequent runs started by continue-as-new and retries) to a Nexus operation. It automatically propagates the callback and request ID from the Nexus options to the workflow.
NOTE: Experimental
type WorkflowRunOperationOptions ¶
type WorkflowRunOperationOptions[I, O any] struct {
	// Operation name.
	Name string
	// Workflow function to map this operation to. The operation input maps directly to workflow input.
	// The workflow name is resolved as it would when using this function in client.ExecuteWorkflow.
	// GetOptions must be provided when setting this option. Mutually exclusive with Handler.
	Workflow func(workflow.Context, I) (O, error)
	// Options for starting the workflow. Must be set if Workflow is set. Mutually exclusive with Handler.
	// The options returned must include a workflow ID that is deterministically generated from the input in order
	// for the operation to be idempotent as the request to start the operation may be retried.
	// TaskQueue is optional and defaults to the current worker's task queue.
	GetOptions func(context.Context, I, nexus.StartOperationOptions) (client.StartWorkflowOptions, error)
	// Handler for starting a workflow with a different input than the operation. Mutually exclusive with Workflow
	// and GetOptions.
	Handler func(context.Context, I, nexus.StartOperationOptions) (WorkflowHandle[O], error)
}
WorkflowRunOperationOptions are options for NewWorkflowRunOperationWithOptions.
NOTE: Experimental
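The field comments describe two mutually exclusive ways to configure an operation: Workflow together with GetOptions, or Handler alone. The sketch below encodes those rules in a standalone validator to make the valid combinations explicit. The options type and validate function are hypothetical simplifications (booleans stand in for the function-typed fields), and rejecting a value with none of the fields set is an assumption; the SDK's own validation may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// options is a simplified, hypothetical stand-in for WorkflowRunOperationOptions.
// Each boolean records whether the corresponding field was set.
type options struct {
	HasWorkflow   bool
	HasGetOptions bool
	HasHandler    bool
}

// validate applies the rules stated in the field comments.
func validate(o options) error {
	if o.HasHandler {
		// Handler is mutually exclusive with Workflow and GetOptions.
		if o.HasWorkflow || o.HasGetOptions {
			return errors.New("handler is mutually exclusive with workflow and getOptions")
		}
		return nil
	}
	// Without Handler, Workflow and GetOptions must be provided together.
	// Treating "nothing set" as invalid is an assumption of this sketch.
	if !o.HasWorkflow || !o.HasGetOptions {
		return errors.New("workflow and getOptions must both be set when handler is not set")
	}
	return nil
}

func main() {
	cases := []options{
		{HasWorkflow: true, HasGetOptions: true}, // valid: Workflow with GetOptions
		{HasHandler: true},                       // valid: Handler alone
		{HasWorkflow: true},                      // invalid: GetOptions missing
		{HasWorkflow: true, HasHandler: true},    // invalid: both styles mixed
	}
	for _, c := range cases {
		fmt.Printf("%+v valid=%t\n", c, validate(c) == nil)
	}
}
```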