temporalnexus

package
v1.32.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2025 License: MIT Imports: 14 Imported by: 6

Documentation

Overview

Package temporalnexus provides utilities for exposing Temporal constructs as Nexus Operations.

Nexus RPC is a modern open-source service framework for arbitrary-length operations whose lifetime may extend beyond a traditional RPC. Nexus was designed with durable execution in mind, as an underpinning to connect durable executions within and across namespaces, clusters and regions – with a clean API contract to streamline multi-team collaboration. Any service can be exposed as a set of sync or async Nexus operations – the latter provides an operation identity and a uniform interface to get the status of an operation or its result, receive a completion callback, or cancel the operation.

Temporal leverages the Nexus RPC protocol to facilitate calling across namespace and cluster boundaries.

See also:

Nexus over HTTP Spec: https://github.com/nexus-rpc/api/blob/main/SPEC.md

Nexus Go SDK: https://github.com/nexus-rpc/sdk-go

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) nexus.Link

ConvertLinkWorkflowEventToNexusLink converts a Link_WorkflowEvent type to Nexus Link.

NOTE: Experimental

func ConvertNexusLinkToLinkWorkflowEvent added in v1.29.0

func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_WorkflowEvent, error)

ConvertNexusLinkToLinkWorkflowEvent converts a Nexus Link to Link_WorkflowEvent.

NOTE: Experimental

func GetLogger added in v1.29.0

func GetLogger(ctx context.Context) log.Logger

GetLogger returns a logger to be used in a Nexus operation's context.

func GetMetricsHandler added in v1.29.0

func GetMetricsHandler(ctx context.Context) metrics.Handler

GetMetricsHandler returns a metrics handler to be used in a Nexus operation's context.

func MustNewWorkflowRunOperationWithOptions

func MustNewWorkflowRunOperationWithOptions[I, O any](options WorkflowRunOperationOptions[I, O]) nexus.Operation[I, O]

MustNewWorkflowRunOperationWithOptions maps an operation to a workflow run with the given options. Panics if invalid options are provided.

NOTE: Experimental

func NewSyncOperation

func NewSyncOperation[I any, O any](
	name string,
	handler func(context.Context, client.Client, I, nexus.StartOperationOptions) (O, error),
) nexus.Operation[I, O]

NewSyncOperation is a helper for creating a synchronous-only nexus.Operation from a given name and handler function. The handler is passed the client that the worker was created with. Sync operations are useful for exposing short-lived Temporal client requests, such as signals, queries, synchronous updates, listing workflows, etc.

NOTE: Experimental

Example
package main

import (
	"context"

	"github.com/nexus-rpc/sdk-go/nexus"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/temporalnexus"
	"go.temporal.io/sdk/worker"
)

// MyInput is the input type of both example operations. Its ID is forwarded
// to the client's QueryWorkflow and SignalWorkflow calls in main to select
// the target workflow.
type MyInput struct {
	ID string
}

// MyQueryOutput is the result type of the read-only operation; the query
// result is decoded into a value of this type.
type MyQueryOutput struct {
}

// main defines two synchronous Nexus operations backed by Temporal client
// calls, groups them into a Nexus service, and registers that service on a
// worker. Setup failures abort the program instead of being silently ignored.
func main() {
	// Read-only operation: queries the workflow selected by input.ID and
	// decodes the query result into the operation output.
	opRead := temporalnexus.NewSyncOperation("my-read-only-operation", func(ctx context.Context, c client.Client, input MyInput, opts nexus.StartOperationOptions) (MyQueryOutput, error) {
		var ret MyQueryOutput
		res, err := c.QueryWorkflow(ctx, input.ID, "", "some-query", nil)
		if err != nil {
			return ret, err
		}
		// Decode in its own statement: within a single return statement Go does
		// not specify whether ret is read before or after the call that fills it.
		if err := res.Get(&ret); err != nil {
			return ret, err
		}
		return ret, nil
	})

	// Operations don't have to return values.
	opWrite := temporalnexus.NewSyncOperation("my-write-operation", func(ctx context.Context, c client.Client, input MyInput, opts nexus.StartOperationOptions) (nexus.NoValue, error) {
		return nil, c.SignalWorkflow(ctx, input.ID, "", "some-signal", nil)
	})

	service := nexus.NewService("my-service")
	if err := service.Register(opRead, opWrite); err != nil {
		panic(err)
	}

	c, err := client.Dial(client.Options{
		HostPort:  "localhost:7233",
		Namespace: "my-namespace",
	})
	if err != nil {
		// Without a client there is nothing to build the worker on.
		panic(err)
	}
	defer c.Close()

	w := worker.New(c, "my-task-queue", worker.Options{})
	w.RegisterNexusService(service)
}
Output:

func NewWorkflowRunOperation

func NewWorkflowRunOperation[I, O any](
	name string,
	workflow func(workflow.Context, I) (O, error),
	getOptions func(context.Context, I, nexus.StartOperationOptions) (client.StartWorkflowOptions, error),
) nexus.Operation[I, O]

NewWorkflowRunOperation maps an operation to a workflow run.

NOTE: Experimental

Example
package main

import (
	"context"

	"github.com/nexus-rpc/sdk-go/nexus"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/temporalnexus"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
)

// MyOutput is the result type of MyHandlerWorkflow and therefore of the
// operation mapped to it.
type MyOutput struct {
}

// MyInput is the operation input, which is also the workflow input. Its ID
// is used in main as the workflow ID in the start options.
type MyInput struct {
	ID string
}

// MyHandlerWorkflow is the workflow that backs the example operation. It
// ignores both arguments and completes immediately with an empty MyOutput.
func MyHandlerWorkflow(_ workflow.Context, _ MyInput) (MyOutput, error) {
	var out MyOutput
	return out, nil
}

// main maps a Nexus operation to MyHandlerWorkflow, groups it into a Nexus
// service, and registers both the workflow and the service on a worker.
// Setup failures abort the program instead of being silently ignored.
func main() {
	op := temporalnexus.NewWorkflowRunOperation(
		"my-async-operation",
		MyHandlerWorkflow,
		func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
			return client.StartWorkflowOptions{
				// Workflow ID is required and must be deterministically generated from the input in order
				// for the operation to be idempotent as the request to start the operation may be retried.
				ID: input.ID,
			}, nil
		})

	service := nexus.NewService("my-service")
	if err := service.Register(op); err != nil {
		panic(err)
	}

	c, err := client.Dial(client.Options{
		HostPort:  "localhost:7233",
		Namespace: "my-namespace",
	})
	if err != nil {
		// Without a client there is nothing to build the worker on.
		panic(err)
	}
	defer c.Close()

	w := worker.New(c, "my-task-queue", worker.Options{})
	w.RegisterWorkflow(MyHandlerWorkflow)
	w.RegisterNexusService(service)
}
Output:

func NewWorkflowRunOperationWithOptions

func NewWorkflowRunOperationWithOptions[I, O any](options WorkflowRunOperationOptions[I, O]) (nexus.Operation[I, O], error)

NewWorkflowRunOperationWithOptions maps an operation to a workflow run with the given options. Returns an error if invalid options are provided.

NOTE: Experimental

Example
package main

import (
	"context"

	"github.com/nexus-rpc/sdk-go/nexus"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/temporalnexus"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
)

// MyWorkflowInput is the input type of MyHandlerWorkflowWithAlternativeInput.
// It is deliberately distinct from the operation input type MyInput.
type MyWorkflowInput struct {
}

// MyOutput is the result type shared by both example workflows and both
// example operations.
type MyOutput struct {
}

// MyInput is the input type of both example operations. Its ID is used in
// main as the workflow ID in the start options.
type MyInput struct {
	ID string
}

// MyHandlerWorkflow is the workflow whose input type matches the operation
// input. It ignores both arguments and completes immediately with an empty
// MyOutput.
func MyHandlerWorkflow(_ workflow.Context, _ MyInput) (MyOutput, error) {
	var out MyOutput
	return out, nil
}

// MyHandlerWorkflowWithAlternativeInput is a workflow whose input type
// (MyWorkflowInput) differs from the operation input. It ignores both
// arguments and completes immediately with an empty MyOutput.
func MyHandlerWorkflowWithAlternativeInput(_ workflow.Context, _ MyWorkflowInput) (MyOutput, error) {
	var out MyOutput
	return out, nil
}

// main builds two workflow-run operations with NewWorkflowRunOperationWithOptions,
// groups them into a Nexus service, and registers the workflows and the service
// on a worker. Invalid options and setup failures abort the program instead of
// being silently ignored.
func main() {
	// Alternative 1 - long form version of NewWorkflowRunOperation.
	opAlt1, err := temporalnexus.NewWorkflowRunOperationWithOptions(
		temporalnexus.WorkflowRunOperationOptions[MyInput, MyOutput]{
			Name:     "my-async-op-1",
			Workflow: MyHandlerWorkflow,
			GetOptions: func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
				return client.StartWorkflowOptions{
					// Workflow ID is required and must be deterministically generated from the input in order
					// for the operation to be idempotent as the request to start the operation may be retried.
					ID: input.ID,
				}, nil
			},
		})
	if err != nil {
		panic(err)
	}

	// Alternative 2 - start a workflow with alternative inputs.
	opAlt2, err := temporalnexus.NewWorkflowRunOperationWithOptions(
		temporalnexus.WorkflowRunOperationOptions[MyInput, MyOutput]{
			Name: "my-async-op-2",
			Handler: func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (temporalnexus.WorkflowHandle[MyOutput], error) {
				// Workflows started with this API must take a single input and return single output.
				// To start workflows with different signatures, use ExecuteUntypedWorkflow.
				return temporalnexus.ExecuteWorkflow(ctx, opts, client.StartWorkflowOptions{
					// Workflow ID is required and must be deterministically generated from the input in order
					// for the operation to be idempotent as the request to start the operation may be retried.
					ID: input.ID,
				}, MyHandlerWorkflowWithAlternativeInput, MyWorkflowInput{})
			},
		})
	if err != nil {
		panic(err)
	}

	service := nexus.NewService("my-service")
	if err := service.Register(opAlt1, opAlt2); err != nil {
		panic(err)
	}

	c, err := client.Dial(client.Options{
		HostPort:  "localhost:7233",
		Namespace: "my-namespace",
	})
	if err != nil {
		// Without a client there is nothing to build the worker on.
		panic(err)
	}
	defer c.Close()

	w := worker.New(c, "my-task-queue", worker.Options{})
	w.RegisterWorkflow(MyHandlerWorkflow)
	w.RegisterWorkflow(MyHandlerWorkflowWithAlternativeInput)
	w.RegisterNexusService(service)
}
Output:

Types

type WorkflowHandle

// WorkflowHandle is a readonly representation of a workflow run backing a
// Nexus operation.
type WorkflowHandle[T any] interface {
	// ID is the workflow's ID.
	ID() string
	// RunID is the workflow's run ID.
	RunID() string
	// contains filtered or unexported methods
}

WorkflowHandle is a readonly representation of a workflow run backing a Nexus operation. It's created via the ExecuteWorkflow and ExecuteUntypedWorkflow methods.

NOTE: Experimental

func ExecuteUntypedWorkflow

func ExecuteUntypedWorkflow[R any](
	ctx context.Context,
	nexusOptions nexus.StartOperationOptions,
	startWorkflowOptions client.StartWorkflowOptions,
	workflow any,
	args ...any,
) (WorkflowHandle[R], error)

ExecuteUntypedWorkflow starts a workflow by function reference or string name, linking the execution chain to a Nexus operation. Useful for invoking workflows that don't follow the single argument - single return type signature. See ExecuteWorkflow for more information.

NOTE: Experimental

func ExecuteWorkflow

func ExecuteWorkflow[I, O any, WF func(workflow.Context, I) (O, error)](
	ctx context.Context,
	nexusOptions nexus.StartOperationOptions,
	startWorkflowOptions client.StartWorkflowOptions,
	workflow WF,
	arg I,
) (WorkflowHandle[O], error)

ExecuteWorkflow starts a workflow run for a WorkflowRunOperationOptions Handler, linking the execution chain to a Nexus operation (subsequent runs started from continue-as-new and retries). Automatically propagates the callback and request ID from the nexus options to the workflow.

NOTE: Experimental

type WorkflowRunOperationOptions

// WorkflowRunOperationOptions configures an operation that is backed by a
// workflow run. Set either Workflow together with GetOptions, or Handler.
type WorkflowRunOperationOptions[I, O any] struct {
	// Operation name.
	Name string
	// Workflow function to map this operation to. The operation input maps directly to workflow input.
	// The workflow name is resolved as it would when using this function in client.ExecuteWorkflow.
	// GetOptions must be provided when setting this option. Mutually exclusive with Handler.
	Workflow func(workflow.Context, I) (O, error)
	// Options for starting the workflow. Must be set if Workflow is set. Mutually exclusive with Handler.
	// The options returned must include a workflow ID that is deterministically generated from the input in order
	// for the operation to be idempotent as the request to start the operation may be retried.
	// TaskQueue is optional and defaults to the current worker's task queue.
	GetOptions func(context.Context, I, nexus.StartOperationOptions) (client.StartWorkflowOptions, error)
	// Handler for starting a workflow with a different input than the operation. Mutually exclusive with Workflow
	// and GetOptions.
	Handler func(context.Context, I, nexus.StartOperationOptions) (WorkflowHandle[O], error)
}

WorkflowRunOperationOptions are options for NewWorkflowRunOperationWithOptions.

NOTE: Experimental

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL