lro

v1.11.11 (Latest)

Warning: This package is not in the latest version of its module.

Published: Dec 6, 2024 License: Apache-2.0 Imports: 26 Imported by: 0

README

Long-Running Operations (LROs)

This Go package provides a client for managing long-running operations (LROs) using Google Cloud Spanner and Google Cloud Workflows.

Background

Long-running operations (LROs) are a pattern in resource-driven design where an operation takes an extended period of time to complete. This can be due to a number of factors, such as the size of the operation, the amount of data involved, or the availability of resources.

LROs are often used for tasks such as:

  • Copying large amounts of data
  • Processing large amounts of data
  • Running complex queries
  • Executing long-running tasks

LROs can be implemented in a number of ways, but the most common approach is to use a client-server model. In this model, the client initiates the operation and the server performs the operation in the background. The client can then poll the server to check the status of the operation until it is complete.

More details on LROs are available at: https://google.aip.dev/151

This package makes managing LROs on your server super simple. 😎

Requirements

This package is part of the Alis Build Platform and has the following requirements:

  • A Google Cloud Spanner database, with permissions granted to the Alis Build Platform service account.
  • The Managed Operations feature enabled within the Alis Build VS Code extension, which provisions the required Spanner table as well as the underlying Google Cloud Workflows resource within your deployment.

Features

  • Simplified LRO Management: Create, retrieve, update, and wait for LROs with ease.
  • Spanner Integration: Persistently store LRO state in Google Cloud Spanner for reliability.
  • Resumable LROs: Leverage Google Cloud Workflows to handle asynchronous waiting and resume operations seamlessly.
  • Developer-Friendly: Intuitive API and clear examples for smooth integration into your Go projects.

Key Concepts:

  • Operation: Represents an LRO and provides methods for managing its lifecycle.
  • State: Store and retrieve custom state associated with an LRO, enabling you to resume operations from where they left off.
  • Wait: Block until an operation is complete, with options for timeouts, polling intervals, and waiting on child operations.
  • Asynchronous Wait: Delegate long waits to Google Cloud Workflows, freeing up your application resources.

Getting Started:

  1. Installation:

    go get go.alis.build/lro
    
  2. Initialization:

    package main
    
    import (
        "context"
        "log"
    
        "go.alis.build/lro"
    )
    
    func main() {
        ctx := context.Background()
    
        // Configure Spanner
        spannerConfig := &lro.SpannerConfig{
            Project:  "your-gcp-project",
            Instance: "your-spanner-instance",
            Database: "your-spanner-database",
        }
    
        // Create a new LRO client (ideally once, at service start-up)
        client, err := lro.NewClient(ctx, spannerConfig)
        if err != nil {
            log.Fatalf("creating LRO client: %v", err)
        }
        defer client.Close()
    
        // ... your LRO management logic ...
    }
    
  3. Example Usage:

    // Create a new LRO
    op, err := lro.NewOperation[any](ctx, client) 
    if err != nil {
        // Handle error
    }
    
    // ... perform long-running task ...
    
    // Mark the operation as done. yourResponse stands for any proto.Message,
    // typically a pointer to your generated response type.
    err = op.Done(yourResponse)
    if err != nil {
        // Handle error
    }
    

Documentation

Overview

Package lro provides a client for managing long-running operations (LROs) using Google Cloud Spanner. LROs are a pattern in resource-driven design where an operation takes an extended period of time to complete. This can be due to a number of factors, such as the size of the operation, the amount of data involved, or the availability of resources.

The lro package provides a number of features for managing LROs, including:

  • Creating LROs: The NewOperation function creates a new LRO and stores it in Spanner.
  • Getting LROs: The Client.GetOperation method retrieves an LRO from Spanner.
  • Updating LROs: The Operation.Done and Operation.Error methods mark an LRO as done with a success response or an error, respectively.
  • Waiting for LROs to finish: The WaitOperation function returns the LRO when it is done or when a specified timeout is reached.

More details on LROs are available at: https://google.aip.dev/151

This package supports both standard LROs and resumable LROs, which use Google Cloud Workflows to poll the relevant operation(s).

Constants

const (
	// OperationColumnName is the column name used in Spanner to store LROs.
	OperationColumnName = "Operation"
	// StateColumnName is the column name used in Spanner to store states (if used).
	StateColumnName = "State"
	// ResumePointColumnName is the column name used in Spanner to store the point from which to resume.
	ResumePointColumnName = "ResumePoint"
)
const OperationIdHeaderKey = "x-alis-operation-id"

OperationIdHeaderKey is used to indicate that the LRO already exists and does not need to be created.
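A rough sketch of how a server might decide between reusing and creating an LRO based on this header. The map stands in for incoming gRPC metadata (which is also a `map[string][]string`); `resolveOperation`, the ID generator, and the assumption that the header carries a bare ID rather than a full `operations/*` name are all hypothetical and not taken from the package.

```go
package main

import (
	"fmt"
	"strings"
)

// operationIDHeaderKey mirrors the documented OperationIdHeaderKey value.
const operationIDHeaderKey = "x-alis-operation-id"

// resolveOperation decides whether to reuse an existing LRO or create a new one.
// ASSUMPTION: the header carries an ID and names take the form "operations/{id}".
// newID is a hypothetical ID generator used only when no header is present.
func resolveOperation(headers map[string][]string, newID func() string) (name string, existing bool) {
	if vals := headers[operationIDHeaderKey]; len(vals) > 0 && vals[0] != "" {
		id := strings.TrimPrefix(vals[0], "operations/")
		return "operations/" + id, true
	}
	return "operations/" + newID(), false
}

func main() {
	gen := func() string { return "new-456" }
	fmt.Println(resolveOperation(map[string][]string{operationIDHeaderKey: {"123"}}, gen)) // operations/123 true
	fmt.Println(resolveOperation(map[string][]string{}, gen))                              // operations/new-456 false
}
```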

Variables

var EOF = errors.New("EOF")

EOF is the error returned when no more entities (such as child operations) are available. Functions should return EOF only to signal a graceful end of input.
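Callers typically loop until EOF and treat it as normal termination rather than a failure. The sketch below assumes a hypothetical `childIterator`; only the `EOF` sentinel mirrors this package, and the comparison uses the standard `errors.Is`.

```go
package main

import (
	"errors"
	"fmt"
)

// EOF mirrors the package's sentinel: a graceful end of input, not a failure.
var EOF = errors.New("EOF")

// childIterator is a hypothetical iterator over child operation names.
type childIterator struct {
	names []string
	pos   int
}

// Next returns the next child name, or EOF once all children are consumed.
func (it *childIterator) Next() (string, error) {
	if it.pos >= len(it.names) {
		return "", EOF
	}
	name := it.names[it.pos]
	it.pos++
	return name, nil
}

// collect drains the iterator, treating EOF as normal termination and any
// other error as a real failure.
func collect(it *childIterator) ([]string, error) {
	var out []string
	for {
		name, err := it.Next()
		if errors.Is(err, EOF) {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		out = append(out, name)
	}
}

func main() {
	names, err := collect(&childIterator{names: []string{"operations/123", "operations/456"}})
	fmt.Println(names, err) // [operations/123 operations/456] <nil>
}
```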

Functions

func UnmarshalOperation added in v1.9.3

func UnmarshalOperation(operation *longrunningpb.Operation, response, metadata proto.Message) error

UnmarshalOperation unmarshals the response and metadata of the provided long-running operation (LRO) into the provided protocol buffer messages.

Parameters:

  • operation: The longrunningpb.Operation whose response and metadata should be unmarshalled.
  • response: The protocol buffer message into which the response of the LRO should be unmarshalled. Can be nil.
  • metadata: The protocol buffer message into which the metadata of the LRO should be unmarshalled. Can be nil.

Returns:

  • An error if the operation is not done, the operation resulted in an error, or there was an issue unmarshalling the response or metadata. Nil otherwise.

func WaitOperation added in v0.5.1

func WaitOperation(ctx context.Context, operation string, service OperationsService, timeout time.Duration,
) (*longrunningpb.Operation, error)

WaitOperation waits for the operation to complete and returns it.

Arguments:

  • operation: The full LRO resource name, of the format 'operations/*'
  • service: The service from which the operation originates
  • timeout: The period after which the function times out and returns an error.

Before using this function, consider using the Operation.Wait method with the WithChildOperations and WithService options, which poll child operations through a custom Operations client.

Types

type Client added in v0.0.2

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(ctx context.Context, spannerConfig *SpannerConfig, opts ...ClientOption) (*Client, error)

NewClient creates a new Client for managing long-running operations (LROs).

It initializes a client with the provided Spanner and Workflows configurations. Spanner is used to store LRO state, and Workflows is used to execute asynchronous wait operations.

The following environment variables can be used to configure the client:

  • ALIS_OS_PROJECT: The Google Cloud project ID.
  • ALIS_REGION: The Google Cloud region.
  • ALIS_RUN_HASH: The Cloud Run hash used for the internal gateway.

Use any of the client options WithLocation, WithProject or WithResumeHost to override the defaults.

func (*Client) Close added in v1.1.0

func (c *Client) Close()

Close closes the underlying spanner.Client instance.

func (*Client) GetOperation added in v0.0.2

GetOperation retrieves an LRO from the database.

func (*Client) UnmarshalOperation added in v1.3.0

func (c *Client) UnmarshalOperation(ctx context.Context, operation string, response, metadata proto.Message) error

UnmarshalOperation retrieves the underlying LRO and unmarshals its response and metadata into the provided messages. It takes the following arguments:

  • ctx: Context
  • operation: The resource name of the operation in the format `operations/*`
  • response: The message into which the underlying response of the LRO should be unmarshalled.
  • metadata: The message into which the metadata of the LRO should be unmarshalled.

type ClientOption added in v1.9.0

type ClientOption func(*ClientOptions)

ClientOption is a functional option for the NewClient method.

func WithLocation added in v1.11.0

func WithLocation(location string) ClientOption

WithLocation sets the default location, overriding the region set by the ALIS_REGION environment variable.

func WithProject added in v1.11.0

func WithProject(project string) ClientOption

WithProject sets the default Google Cloud project, overriding the project set by the ALIS_OS_PROJECT environment variable.

func WithResumeHost added in v1.11.2

func WithResumeHost(host string) ClientOption

WithResumeHost enables Google Cloud Workflows integration for handling resumable Long-Running Operations (LROs). The default host is the relevant internal gateway inferred from the ALIS_RUN_HASH env. Use this method to override the host. Example host: https://internal-gateway-....run.app

type ClientOptions added in v1.9.0

type ClientOptions struct {
	// contains filtered or unexported fields
}

type ErrInvalidNextToken added in v0.8.0

type ErrInvalidNextToken struct {
	// contains filtered or unexported fields
}

func (ErrInvalidNextToken) Error added in v0.8.0

func (e ErrInvalidNextToken) Error() string

type ErrNotFound

type ErrNotFound struct {
	Operation string // name of the operation that could not be found
}

ErrNotFound is returned when the requested operation does not exist.

func (ErrNotFound) Error

func (e ErrNotFound) Error() string

type ErrWaitDeadlineExceeded added in v0.3.0

type ErrWaitDeadlineExceeded struct {
	// contains filtered or unexported fields
}

ErrWaitDeadlineExceeded is returned when WaitOperation or Operation.Wait exceeds the specified, or default, timeout.

func (ErrWaitDeadlineExceeded) Error added in v0.3.0

func (e ErrWaitDeadlineExceeded) Error() string

type InvalidOperationName

type InvalidOperationName struct {
	Name string // the operation name that failed validation
}

func (InvalidOperationName) Error

func (e InvalidOperationName) Error() string

type Operation added in v1.3.0

type Operation[T any] struct {
	// contains filtered or unexported fields
}

Operation is an object used to manage the lifecycle of a Google long-running operation.

func NewOperation added in v1.3.6

func NewOperation[T any](ctx context.Context, client *Client, opts ...OperationOption) (*Operation[T], error)

NewOperation creates a new Operation object used to simplify the management of the underlying LRO. By default, this function creates a new underlying LRO. However, if the provided ctx contains a value for the x-alis-operation-id key, the function instantiates an Operation to manage the LRO identified by that value. The same applies if the WithExistingOperation option is used.

Example (non-resumable):

// No need to create a custom State object, simply add 'any' type
op, _ := lro.NewOperation[any](ctx, client)

Example (resumable lro with internal state):

// In this case, define your own state object which could be used to transfer state between async wait operations.
type MyState struct{}
op, _ := lro.NewOperation[MyState](ctx, client)

// Retrieve the state object.
state := op.State()
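To illustrate how a typed state can survive an asynchronous wait, here is a hypothetical in-memory analogue of Operation[T]. The stored bytes play the role of the Spanner State column, and JSON encoding is an assumption about serialisation. Unlike the documented `SetState(state *T)` and `State() *T`, this sketch returns errors so that encoding failures are visible.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// operation is a hypothetical analogue of Operation[T]; stored stands in for
// the Spanner "State" column.
type operation[T any] struct {
	stored []byte
}

// SetState serialises and stores the state.
func (o *operation[T]) SetState(state *T) error {
	b, err := json.Marshal(state)
	if err != nil {
		return err
	}
	o.stored = b
	return nil
}

// State decodes the last stored state, or returns nil if none was saved.
func (o *operation[T]) State() (*T, error) {
	if o.stored == nil {
		return nil, nil
	}
	var s T
	if err := json.Unmarshal(o.stored, &s); err != nil {
		return nil, err
	}
	return &s, nil
}

// MyState is an example state carried across an asynchronous wait.
type MyState struct {
	Processed int
	Cursor    string
}

func main() {
	op := &operation[MyState]{}
	_ = op.SetState(&MyState{Processed: 10, Cursor: "page-2"})
	s, _ := op.State()
	fmt.Println(s.Processed, s.Cursor) // 10 page-2
}
```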
Example
// Create a global context object.
ctx := context.Background()

// create client (preferably only once at a global level in the init function of your package/service)
client, _ := NewClient(ctx, &SpannerConfig{})

// A simple illustration of using an LRO
exampleRpcMethod := func(ctx context.Context, req proto.Message) (*longrunningpb.Operation, error) {
	// 1. Create an Operation object used to manage LRO activities
	type MyState struct{}
	op, err := NewOperation[MyState](ctx, client)
	if err != nil {
		return nil, err
	}

	// 2. We'll ship the business logic to a go routine to run as a background task
	go func() error {
		// If relevant get the ResumePoint
		switch op.ResumePoint() {
		case "resumePoint1":
			goto point1
		}

		{
			// This is where we will wait...
			// Scenario 1: simply wait for 30 seconds.
			op.Wait(WithSleep(30 * time.Second))

			// Scenario 2: Make one or more hits to methods returning LROs, and wait for these to complete.
			op.Wait(WithChildOperations([]string{"operations/123", "operations/456"}...))

			// Scenario 3: Customise the poll frequency and timeout
			op.Wait(WithChildOperations([]string{"operations/123", "operations/456"}...), WithTimeout(10*time.Minute), WithPollFrequency(30*time.Second))

			// Scenario 4: If the underlying operations are from another product, you would need to use a different client to poll
			// the relevant GetOperation methods.  The service you connect to needs to satisfy the LRO interface as defined by google.longrunning package
			var conn grpc.ClientConnInterface // create a connection to the relevant gRPC server
			myLroClient := longrunningpb.NewOperationsClient(conn)
			op.Wait(WithChildOperations([]string{"operations/123", "operations/456"}...), WithService(myLroClient))

			// Scenario 5: If, however, the operations from another product do not implement the google.longrunning service, you could use
			// any service that implements a GetOperation() method, thereby satisfying the OperationsService interface defined in this package.
			var myProductClient OperationsService
			op.Wait(WithChildOperations([]string{"operations/123", "operations/456"}...), WithService(myProductClient))

			// Scenario 6: Wait asynchronously for a longer time.
			op.SetState(&MyState{}) // Explicitly set the state before waiting asynchronously
			op.Wait(WithSleep(24*time.Hour), WithAsync("resumePoint1"))
			// since we explicitly configure the Async option, we need to exit this method now.
			return nil
		}

	point1:
		// Once resumed, get the last available State and do some cool stuff.
		state := op.State()
		_ = state

		// And finally mark the LRO as complete.
		var res proto.Message // replace with actual Response definition.
		return op.Done(res)
	}()

	// 3. Return the LRO to the caller
	return op.ReturnRPC()
}
_, _ = exampleRpcMethod(ctx, nil)

func (*Operation[T]) Delete added in v1.3.0

func (o *Operation[T]) Delete() error

Delete deletes the LRO, including its auxiliary Spanner columns.

func (*Operation[T]) Done added in v1.3.0

func (o *Operation[T]) Done(response proto.Message) error

Done marks the operation as done with a success response.

func (*Operation[T]) Error added in v1.3.0

func (o *Operation[T]) Error(error error) error

Error marks the operation as done with an error.

func (*Operation[T]) GetOperation added in v1.9.3

func (o *Operation[T]) GetOperation() (*longrunningpb.Operation, error)

GetOperation retrieves the underlying longrunningpb.Operation.

func (*Operation[T]) InDevMode added in v1.11.7

func (o *Operation[T]) InDevMode() bool

InDevMode returns true if the Operation is running locally.

func (*Operation[T]) Name added in v1.11.0

func (o *Operation[T]) Name() string

Name returns the name of the underlying Operation resource.

func (*Operation[T]) ResumePoint added in v1.11.0

func (o *Operation[T]) ResumePoint() string

ResumePoint retrieves the latest point from which to resume an asynchronous operation.

func (*Operation[T]) ReturnRPC added in v1.9.2

func (o *Operation[T]) ReturnRPC() (*longrunningpb.Operation, error)

ReturnRPC returns the underlying longrunningpb.Operation for further processing or sending to other APIs.

func (*Operation[T]) SetMetadata added in v1.3.0

func (o *Operation[T]) SetMetadata(metadata proto.Message) (*longrunningpb.Operation, error)

SetMetadata sets the metadata for the operation.

func (*Operation[T]) SetState added in v1.11.0

func (o *Operation[T]) SetState(state *T)

SetState saves the current state with the LRO resource.

func (*Operation[T]) State added in v1.11.0

func (o *Operation[T]) State() *T

State returns the state of the current LRO.

func (*Operation[T]) Wait added in v1.11.0

func (o *Operation[T]) Wait(opts ...WaitOption) error

Wait blocks until the operation is marked as done or the configured timeout is reached.

By default, Wait will wait for up to 7 minutes, polling every 3 seconds. These values can be configured by providing WaitOption.

A slice of child Operation names can be provided as a WaitOption. In this case, Wait will block until the parent operation and all child operations are marked as done, or until the timeout is reached.

If the operation is not done when the timeout is reached, Wait will return an ErrWaitDeadlineExceeded error.

----

Example 1:

// Simply wait for 30 seconds.
op.Wait(WithSleep(30 * time.Second))

Example 2:

// Make one or more hits to methods returning LROs, and wait for these to complete.
op.Wait(WithChildOperations([]string{"operations/123", "operations/456"}...))

Example 3:

// Customise the poll frequency and timeout
op.Wait(WithChildOperations([]string{"operations/123", "operations/456"}...), WithTimeout(10*time.Minute), WithPollFrequency(30*time.Second))

Example 4:

// If the underlying operations are from another product, you would need to use a different client to poll
// the relevant GetOperation methods.  The service you connect to needs to satisfy the LRO interface as defined by google.longrunning package
var conn grpc.ClientConnInterface // create a connection to the relevant gRPC server
myLroClient := longrunningpb.NewOperationsClient(conn)
op.Wait(WithChildOperations([]string{"operations/123", "operations/456"}...), WithService(myLroClient))

Example 5:

// If, however, the operations from another product do not implement the google.longrunning service, you could use
// any service that implements a GetOperation() method, thereby satisfying the [OperationsService] interface defined in this package.
var myProductClient OperationsService
op.Wait(WithChildOperations([]string{"operations/123", "operations/456"}...), WithService(myProductClient))

Example 6:

// Wait asynchronously for a longer time.
op.SetState(&MyState{}) // Explicitly set the state before waiting asynchronously
op.Wait(WithSleep(24*time.Hour), WithAsync("resumePoint1"))
return nil  // a `return` statement usually follows when waiting asynchronously

type OperationOption added in v1.11.0

type OperationOption func(*OperationOptions)

OperationOption is a functional option for the NewOperation function.

func WithCallbackFn added in v1.11.0

func WithCallbackFn(fn func(ctx context.Context)) OperationOption

WithCallbackFn sets the function to call back when running an Async method locally.

func WithExistingOperation added in v1.10.0

func WithExistingOperation(name string) OperationOption

WithExistingOperation allows one to instantiate a new Operation object from an existing LRO. If this option is provided, NewOperation will neither create a new Operation resource nor infer one from the ctx.

func WithResumeMethod added in v1.11.0

func WithResumeMethod(name string) OperationOption

WithResumeMethod sets the fully qualified name of the method to be resumed.

In most scenarios this is not required, since the gRPC server makes the method name available automatically via grpc.Method(ctx). Example: "/myorg.co.jobs.v1.JobsService/GenerateClientReports"

type OperationOptions added in v1.11.0

type OperationOptions struct {
	// contains filtered or unexported fields
}

type OperationsService added in v1.11.0

type OperationsService interface {
	GetOperation(ctx context.Context, in *longrunningpb.GetOperationRequest, opts ...grpc.CallOption) (*longrunningpb.Operation, error)
}

OperationsService is an interface that wraps the GetOperation method. This allows the package to use the GetOperation method of the service from which an operation originated; any service that produces long-running operations should implement this interface.

type SpannerConfig added in v1.3.0

type SpannerConfig struct {
	// Project
	// The Project where the Database is deployed
	Project string
	// Spanner Instance
	// The instance name of the database, for example 'primary'
	Instance string
	// Spanner Database
	// The database name, for example 'myorganisation-myproduct'
	Database string
	// contains filtered or unexported fields
}

SpannerConfig is used to configure the underlying Google Cloud Spanner client.

type WaitConfig added in v1.11.0

type WaitConfig struct {
	// contains filtered or unexported fields
}

WaitConfig stores the wait configuration specified as functional WaitOption(s) in the Wait() method.

type WaitOption added in v1.9.0

type WaitOption func(*WaitConfig) error

WaitOption is a functional option for WaitConfig.

func WithAsync added in v1.11.0

func WithAsync(resumePoint string) WaitOption

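WithAsync allows one to exit the method and wait asynchronously. The resumePoint identifies where execution should continue once the wait completes; it is returned by Operation.ResumePoint when the method is resumed.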

func WithChildGetOperationEndpoint added in v1.11.0

func WithChildGetOperationEndpoint(endpoint string) WaitOption

WithChildGetOperationEndpoint is used for local development.

func WithChildOperations added in v1.11.0

func WithChildOperations(operations ...string) WaitOption

WithChildOperations specifies operations for which to wait. Format: ["operations/123", "operations/456", "operations/789"]

func WithPollFrequency added in v1.9.0

func WithPollFrequency(pollFrequency time.Duration) WaitOption

WithPollFrequency specifies a constant duration to use when polling the underlying child operations to wait for.

func WithService added in v1.11.0

func WithService(service OperationsService) WaitOption

WithService allows one to override the underlying Operations client used to poll the child operations.

func WithSleep added in v1.11.0

func WithSleep(sleep time.Duration) WaitOption

WithSleep specifies a constant duration for which to wait.

func WithTimeout added in v1.9.0

func WithTimeout(timeout time.Duration) WaitOption

WithTimeout specifies a constant duration after which the Wait method returns an ErrWaitDeadlineExceeded error.

Directories

internal
