Documentation ¶
Overview ¶
Package lro provides a client for managing long-running operations (LROs) using Google Cloud Spanner. LROs are a pattern in resource-driven design where an operation takes an extended period of time to complete. This can be due to a number of factors, such as the size of the operation, the amount of data involved, or the availability of resources.
The lro package provides a number of features for managing LROs, including:
- Creating LROs: The CreateOperation method creates a new LRO and stores it in Spanner.
- Getting LROs: The GetOperation method gets an LRO from Spanner.
- Updating LROs: The SetSuccessful and SetFailed methods update the status of an LRO in Spanner.
- Waiting for LROs to finish: The WaitOperation method returns the LRO when it is done or when a specified timeout is reached.
More details on LROs are available at: https://google.aip.dev/151
This package supports both standard LROs and resumable LROs, which use Google Cloud Workflows to poll the relevant operation(s).
Index ¶
- Constants
- Variables
- func UnmarshalOperation(operation *longrunningpb.Operation, response, metadata proto.Message) error
- func WaitOperation(ctx context.Context, operation string, service OperationsService, ...) (*longrunningpb.Operation, error)
- type Client
- type ClientOption
- type ClientOptions
- type ErrInvalidNextToken
- type ErrNotFound
- type ErrWaitDeadlineExceeded
- type InvalidOperationName
- type Operation
- func (o *Operation[T]) Delete() error
- func (o *Operation[T]) Done(response proto.Message) error
- func (o *Operation[T]) Error(error error) error
- func (o *Operation[T]) GetOperation() (*longrunningpb.Operation, error)
- func (o *Operation[T]) InDevMode() bool
- func (o *Operation[T]) Name() string
- func (o *Operation[T]) ResumePoint() string
- func (o *Operation[T]) ReturnRPC() (*longrunningpb.Operation, error)
- func (o *Operation[T]) SetMetadata(metadata proto.Message) (*longrunningpb.Operation, error)
- func (o *Operation[T]) SetState(state *T)
- func (o *Operation[T]) State() *T
- func (o *Operation[T]) Wait(opts ...WaitOption) error
- type OperationOption
- type OperationOptions
- type OperationsService
- type SpannerConfig
- type WaitConfig
- type WaitOption
- func WithAsync(resumePoint string) WaitOption
- func WithChildGetOperationEndpoint(endpoint string) WaitOption
- func WithChildOperations(operations ...string) WaitOption
- func WithPollFrequency(pollFrequency time.Duration) WaitOption
- func WithService(service OperationsService) WaitOption
- func WithSleep(sleep time.Duration) WaitOption
- func WithTimeout(timeout time.Duration) WaitOption
Examples ¶
Constants ¶
const (
	// OperationColumnName is the column name used in Spanner to store LROs.
	OperationColumnName = "Operation"
	// StateColumnName is the column name used in Spanner to store states (if used).
	StateColumnName = "State"
	// ResumePointColumnName is the column name used in Spanner to store the point to resume from.
	ResumePointColumnName = "ResumePoint"
)
const OperationIdHeaderKey = "x-alis-operation-id"
OperationIdHeaderKey is used to indicate that the LRO already exists and does not need to be created.
Variables ¶
var EOF = errors.New("EOF")
EOF is the error returned when no more entities (such as child operations) are available. Functions should return EOF only to signal a graceful end of a read.
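A sentinel error of this kind is usually checked with errors.Is inside a read loop. The sketch below is self-contained and does not call this package: the childIterator type, its Next method, and the collect helper are hypothetical, and the local EOF variable only mirrors the one declared above so that the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// EOF mirrors the package-level sentinel; it is redeclared here only so
// that this sketch compiles without importing the package.
var EOF = errors.New("EOF")

// childIterator is a hypothetical iterator over child operation names.
type childIterator struct {
	names []string
	pos   int
}

// Next returns the next name, or EOF once the names are exhausted.
func (it *childIterator) Next() (string, error) {
	if it.pos >= len(it.names) {
		return "", EOF
	}
	name := it.names[it.pos]
	it.pos++
	return name, nil
}

// collect drains the iterator, treating EOF as a normal end of input
// and any other error as a failure.
func collect(it *childIterator) ([]string, error) {
	var out []string
	for {
		name, err := it.Next()
		if errors.Is(err, EOF) {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		out = append(out, name)
	}
}

func main() {
	it := &childIterator{names: []string{"operations/123", "operations/456"}}
	names, err := collect(it)
	fmt.Println(names, err)
}
```

Using errors.Is rather than == keeps the check working if the sentinel is ever wrapped with additional context.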
Functions ¶
func UnmarshalOperation ¶ added in v1.9.3
func UnmarshalOperation(operation *longrunningpb.Operation, response, metadata proto.Message) error
UnmarshalOperation retrieves the underlying long-running operation (LRO) and unmarshals its response and metadata into the provided protocol buffer messages.
Parameters:
- operation: The long-running operation (LRO) whose response and metadata should be unmarshalled.
- response: The protocol buffer message into which the response of the LRO should be unmarshalled. Can be nil.
- metadata: The protocol buffer message into which the metadata of the LRO should be unmarshalled. Can be nil.
Returns:
- An error if the operation is not done, the operation resulted in an error, or there was an issue unmarshalling the response or metadata. Nil otherwise.
func WaitOperation ¶ added in v0.5.1
func WaitOperation(ctx context.Context, operation string, service OperationsService, timeout time.Duration) (*longrunningpb.Operation, error)
WaitOperation waits for the operation to complete.
Arguments:
- operation: The full LRO resource name, of the format 'operations/*'
- service: The service from which the operation originates
- timeout: The period after which the method times out and returns an error.
Before using this method, consider using the op.Wait() method with the lro.WithService() option to use a custom LRO client.
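The general shape of waiting on an operation (check, sleep, check again, give up after a timeout) can be sketched with the standard library alone. This is not the package's implementation: pollUntilDone, errDeadline, and the two demo helpers are hypothetical names, and the isDone callback stands in for a call to a GetOperation method.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errDeadline is a hypothetical stand-in for a wait-deadline error.
var errDeadline = errors.New("wait deadline exceeded")

// pollUntilDone calls isDone immediately and then once per interval until it
// reports true, returns an error, or the timeout elapses.
func pollUntilDone(ctx context.Context, timeout, interval time.Duration, isDone func(context.Context) (bool, error)) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		done, err := isDone(ctx)
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		select {
		case <-ctx.Done():
			return errDeadline
		case <-ticker.C:
			// Next poll.
		}
	}
}

// pollThreeTimes simulates an operation that is done on the third check.
func pollThreeTimes() (int, error) {
	calls := 0
	err := pollUntilDone(context.Background(), time.Second, 10*time.Millisecond,
		func(context.Context) (bool, error) {
			calls++
			return calls >= 3, nil
		})
	return calls, err
}

// pollNeverDone simulates an operation that never completes.
func pollNeverDone() error {
	return pollUntilDone(context.Background(), 30*time.Millisecond, 10*time.Millisecond,
		func(context.Context) (bool, error) { return false, nil })
}

func main() {
	calls, err := pollThreeTimes()
	fmt.Println(calls, err)
	fmt.Println(pollNeverDone())
}
```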
Types ¶
type Client ¶ added in v0.0.2
type Client struct {
// contains filtered or unexported fields
}
func NewClient ¶
func NewClient(ctx context.Context, spannerConfig *SpannerConfig, opts ...ClientOption) (*Client, error)
NewClient creates a new Client for managing long-running operations (LROs).
It initializes a client with the provided Spanner and Workflows configurations. Spanner is used to store LRO state, and Workflows is used to execute asynchronous wait operations.
The following environment variables can be used to configure the client:
- ALIS_OS_PROJECT: The Google Cloud project ID.
- ALIS_REGION: The Google Cloud region.
- ALIS_RUN_HASH: The Cloud Run hash used for the internal gateway.
Use any of the client options WithLocation, WithProject, or WithResumeHost to override the defaults.
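The precedence described here (environment defaults first, explicit options last) follows the usual functional-options pattern. The sketch below illustrates that pattern only. It is an assumption about the general approach, not the package's code: the options struct, withProject, withLocation, and resolveFrom are hypothetical names, and only the two environment variable names come from the list above.

```go
package main

import (
	"fmt"
	"os"
)

// options is a hypothetical stand-in for the package's unexported client options.
type options struct {
	project  string
	location string
}

// option mutates options, in the style of a functional option.
type option func(*options)

func withProject(p string) option  { return func(o *options) { o.project = p } }
func withLocation(l string) option { return func(o *options) { o.location = l } }

// resolveFrom reads defaults through getenv and then applies the explicit
// overrides in the order given, so later options win.
func resolveFrom(getenv func(string) string, opts ...option) options {
	o := options{
		project:  getenv("ALIS_OS_PROJECT"),
		location: getenv("ALIS_REGION"),
	}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	// With the real environment: unset variables simply yield empty defaults.
	o := resolveFrom(os.Getenv, withLocation("us-central1"))
	fmt.Printf("project=%q location=%q\n", o.project, o.location)
}
```

Passing the lookup function as a parameter keeps the resolution logic testable without touching the process environment.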
func (*Client) Close ¶ added in v1.1.0
func (c *Client) Close()
Close closes the underlying spanner.Client instance.
func (*Client) GetOperation ¶ added in v0.0.2
func (c *Client) GetOperation(ctx context.Context, req *longrunningpb.GetOperationRequest, opts ...grpc.CallOption) (*longrunningpb.Operation, error)
GetOperation retrieves an LRO from the database.
func (*Client) UnmarshalOperation ¶ added in v1.3.0
func (c *Client) UnmarshalOperation(ctx context.Context, operation string, response, metadata proto.Message) error
UnmarshalOperation retrieves the underlying LRO and unmarshals its response and metadata into the provided messages. It takes four arguments:
- ctx: Context
- operation: The resource name of the operation in the format `operations/*`
- response: The message into which the underlying response of the LRO should be unmarshalled.
- metadata: The message into which the underlying metadata of the LRO should be unmarshalled.
type ClientOption ¶ added in v1.9.0
type ClientOption func(*ClientOptions)
ClientOption is a functional option for the NewClient method.
func WithLocation ¶ added in v1.11.0
func WithLocation(location string) ClientOption
WithLocation sets the default location, which allows one to override the region as per the ALIS_REGION env.
func WithProject ¶ added in v1.11.0
func WithProject(project string) ClientOption
WithProject sets the default Google Cloud Project, which allows one to override the project as per the ALIS_OS_PROJECT env.
func WithResumeHost ¶ added in v1.11.2
func WithResumeHost(host string) ClientOption
WithResumeHost enables Google Cloud Workflows integration for handling resumable Long-Running Operations (LROs). The default host is the relevant internal gateway inferred from the ALIS_RUN_HASH env. Use this method to override the host. Example host: https://internal-gateway-....run.app
type ClientOptions ¶ added in v1.9.0
type ClientOptions struct {
// contains filtered or unexported fields
}
type ErrInvalidNextToken ¶ added in v0.8.0
type ErrInvalidNextToken struct {
// contains filtered or unexported fields
}
func (ErrInvalidNextToken) Error ¶ added in v0.8.0
func (e ErrInvalidNextToken) Error() string
type ErrNotFound ¶
type ErrNotFound struct {
Operation string // unavailable locations
}
ErrNotFound is returned when the requested operation does not exist.
func (ErrNotFound) Error ¶
func (e ErrNotFound) Error() string
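Because ErrNotFound is a struct type rather than a sentinel value, callers would typically detect it with errors.As. The sketch below is self-contained: the ErrNotFound type is redeclared locally to mirror the exported shape above, its message text is an assumption, and lookup and missingOperation are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotFound mirrors the exported struct above. The message format used in
// Error is an assumption made for this sketch.
type ErrNotFound struct {
	Operation string
}

func (e ErrNotFound) Error() string {
	return fmt.Sprintf("operation %q not found", e.Operation)
}

// lookup is a hypothetical function that fails with a wrapped ErrNotFound.
func lookup(name string) error {
	return fmt.Errorf("lookup failed: %w", ErrNotFound{Operation: name})
}

// missingOperation reports the operation name if err is, or wraps, an ErrNotFound.
func missingOperation(err error) (string, bool) {
	var nf ErrNotFound
	if errors.As(err, &nf) {
		return nf.Operation, true
	}
	return "", false
}

func main() {
	name, ok := missingOperation(lookup("operations/123"))
	fmt.Println(name, ok)
}
```

The sketch assumes the error is returned by value; if a library returns a pointer instead, the errors.As target would need to be a *ErrNotFound variable.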
type ErrWaitDeadlineExceeded ¶ added in v0.3.0
type ErrWaitDeadlineExceeded struct {
// contains filtered or unexported fields
}
ErrWaitDeadlineExceeded is returned when a wait exceeds the specified, or default, timeout.
func (ErrWaitDeadlineExceeded) Error ¶ added in v0.3.0
func (e ErrWaitDeadlineExceeded) Error() string
type InvalidOperationName ¶
type InvalidOperationName struct {
Name string // unavailable locations
}
func (InvalidOperationName) Error ¶
func (e InvalidOperationName) Error() string
type Operation ¶ added in v1.3.0
type Operation[T any] struct {
	// contains filtered or unexported fields
}
Operation is an object used to manage the lifecycle of a Google long-running operation.
func NewOperation ¶ added in v1.3.6
func NewOperation[T any](ctx context.Context, client *Client, opts ...OperationOption) (*Operation[T], error)
NewOperation creates a new Operation object used to simplify the management of the underlying LRO. The default behaviour of this function is to create a new underlying LRO. However, if the provided ctx contains a value for the x-alis-operation-id key, the function instantiates an Operation to manage the LRO identified by the x-alis-operation-id. The same applies if the WithExistingOperation option is used.
Example (non-resumable):
// No need to create a custom State object; simply use the 'any' type.
op, _ := lro.NewOperation[any](ctx, client)
Example (resumable lro with internal state):
// In this case, define your own state object, which can be used to transfer state between async wait operations.
type MyState struct{}
op, _ := lro.NewOperation[MyState](ctx, client)
// Retrieve the state object.
state := op.State()
Example ¶
// Create a global context object.
ctx := context.Background()

// Create the client (preferably only once, at a global level in the init function of your package/service).
client, _ := NewClient(ctx, &SpannerConfig{})

// A simple illustration of using an LRO.
exampleRpcMethod := func(ctx context.Context, req *proto.Message) (*longrunningpb.Operation, error) {
	// 1. Create an Operation object used to manage LRO activities.
	type MyState struct{}
	op, err := NewOperation[MyState](ctx, client)
	if err != nil {
		return nil, err
	}

	// 2. We'll ship the business logic to a goroutine to run as a background task.
	go func() error {
		// If relevant, get the ResumePoint.
		switch op.ResumePoint() {
		case "resumePoint1":
			goto point1
		}

		{
			// This is where we will wait...

			// Scenario 1: Simply wait for 30 seconds.
			op.Wait(WithSleep(30 * time.Second))

			// Scenario 2: Make one or more hits to methods returning LROs, and wait for these to complete.
			op.Wait(WithChildOperations([]string{"operations/123", "operations/456"}...))

			// Scenario 3: Customise the poll frequency and timeout.
			op.Wait(WithChildOperations([]string{"operations/123", "operations/456"}...), WithTimeout(10*time.Minute), WithPollFrequency(30*time.Second))

			// Scenario 4: If the underlying operations are from another product, you need to use a different client to poll
			// the relevant GetOperation methods. The service you connect to needs to satisfy the LRO interface as defined by the google.longrunning package.
			var conn grpc.ClientConnInterface // create a connection to the relevant gRPC server
			myLroClient := longrunningpb.NewOperationsClient(conn)
			op.Wait(WithChildOperations([]string{"operations/123", "operations/456"}...), WithService(myLroClient))

			// Scenario 5: If, however, the operations from another product do not implement the google.longrunning service, you could use
			// any service that implements a GetOperation() method, therefore satisfying the OperationsService interface defined in this package.
			var myProductClient OperationsService
			op.Wait(WithChildOperations([]string{"operations/123", "operations/456"}...), WithService(myProductClient))

			// Scenario 6: Wait asynchronously for a longer time.
			op.SetState(&MyState{}) // Explicitly set the state before waiting asynchronously.
			op.Wait(WithSleep(24*time.Hour), WithAsync("resumePoint1"))

			// Since we explicitly configure the Async option, we need to exit this method now.
			return nil
		}

	point1:
		// Once resumed, get the last available State and do some cool stuff.
		state := op.State()
		_ = state

		// And finally mark the LRO as complete.
		var res proto.Message // replace with actual Response definition.
		return op.Done(res)
	}()

	// 3. Return the LRO to the caller.
	return op.ReturnRPC()
}

_, _ = exampleRpcMethod(ctx, nil)
Output:
func (*Operation[T]) Done ¶ added in v1.3.0
func (o *Operation[T]) Done(response proto.Message) error
Done marks the operation as done with a success response.
func (*Operation[T]) GetOperation ¶ added in v1.9.3
func (o *Operation[T]) GetOperation() (*longrunningpb.Operation, error)
GetOperation retrieves the underlying longrunningpb.Operation.
func (*Operation[T]) InDevMode ¶ added in v1.11.7
func (o *Operation[T]) InDevMode() bool
InDevMode returns true if the Operation is running locally.
func (*Operation[T]) Name ¶ added in v1.11.0
func (o *Operation[T]) Name() string
Name returns the name of the underlying Operation resource.
func (*Operation[T]) ResumePoint ¶ added in v1.11.0
func (o *Operation[T]) ResumePoint() string
ResumePoint retrieves the latest point from which to resume an asynchronous operation.
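The resume-point idea can be illustrated without Spanner or Workflows: a method that is invoked twice, saving state and a resume point on the first pass and skipping ahead on the second. The sketch below is a simplified, in-memory illustration under stated assumptions. The record and state types, the handle function, and the "afterWait" label are all hypothetical and do not reflect how the package stores or restores anything.

```go
package main

import "fmt"

// state is a hypothetical payload carried across the asynchronous wait.
type state struct {
	Attempts int
}

// record is a hypothetical in-memory stand-in for a stored LRO.
type record struct {
	resumePoint string
	state       *state
	done        bool
}

// handle runs the method once. On first entry it saves state and a resume
// point and returns early; when re-entered it continues from the saved point.
func handle(r *record) {
	switch r.resumePoint {
	case "afterWait":
		// Resumed: pick up the saved state and finish the work.
		r.state.Attempts++
		r.done = true
		return
	}

	// First entry: save state, record where to resume, and exit.
	r.state = &state{Attempts: 1}
	r.resumePoint = "afterWait"
}

func main() {
	r := &record{}

	handle(r) // first invocation exits before the work is complete
	fmt.Println(r.done, r.resumePoint)

	handle(r) // a later invocation, for example triggered by a callback
	fmt.Println(r.done, r.state.Attempts)
}
```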
func (*Operation[T]) ReturnRPC ¶ added in v1.9.2
func (o *Operation[T]) ReturnRPC() (*longrunningpb.Operation, error)
ReturnRPC returns the underlying longrunningpb.Operation for further processing or sending to other APIs.
func (*Operation[T]) SetMetadata ¶ added in v1.3.0
func (o *Operation[T]) SetMetadata(metadata proto.Message) (*longrunningpb.Operation, error)
SetMetadata sets the metadata for the operation.
func (*Operation[T]) SetState ¶ added in v1.11.0
func (o *Operation[T]) SetState(state *T)
SetState saves the current state with the LRO resource.
func (*Operation[T]) State ¶ added in v1.11.0
func (o *Operation[T]) State() *T
State returns the state of the current LRO.
func (*Operation[T]) Wait ¶ added in v1.11.0
func (o *Operation[T]) Wait(opts ...WaitOption) error
Wait blocks until the operation is marked as done or the configured timeout is reached.
By default, Wait will wait for up to 7 minutes, polling every 3 seconds. These values can be configured by providing WaitOption.
A slice of child Operation names can be provided as a WaitOption. In this case, Wait will block until the parent operation and all child operations are marked as done, or until the timeout is reached.
If the operation is not done when the timeout is reached, Wait will return an ErrWaitDeadlineExceeded error.
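The way defaults and options combine can be sketched with the functional-options pattern, using the error-returning option shape that WaitOption has. This is a minimal illustration, not the package's code: waitConfig, its fields, the lowercase option constructors, and the validation of non-positive durations are all assumptions. Only the 7 minute and 3 second defaults come from the text above.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitConfig is a hypothetical stand-in for the unexported fields of WaitConfig.
type waitConfig struct {
	timeout       time.Duration
	pollFrequency time.Duration
}

// waitOption mirrors the documented shape of WaitOption: it may reject its input.
type waitOption func(*waitConfig) error

func withTimeout(d time.Duration) waitOption {
	return func(c *waitConfig) error {
		if d <= 0 {
			return errors.New("timeout must be positive") // validation is an assumption
		}
		c.timeout = d
		return nil
	}
}

func withPollFrequency(d time.Duration) waitOption {
	return func(c *waitConfig) error {
		if d <= 0 {
			return errors.New("poll frequency must be positive") // validation is an assumption
		}
		c.pollFrequency = d
		return nil
	}
}

// newWaitConfig starts from the documented defaults and applies options in order,
// stopping at the first option that returns an error.
func newWaitConfig(opts ...waitOption) (waitConfig, error) {
	c := waitConfig{timeout: 7 * time.Minute, pollFrequency: 3 * time.Second}
	for _, opt := range opts {
		if err := opt(&c); err != nil {
			return waitConfig{}, err
		}
	}
	return c, nil
}

func main() {
	c, err := newWaitConfig(withTimeout(10*time.Minute), withPollFrequency(30*time.Second))
	fmt.Println(c.timeout, c.pollFrequency, err)
}
```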
----
Example 1:
// Simply wait for 30 seconds.
op.Wait(WithSleep(30 * time.Second))
Example 2:
// Make one or more hits to methods returning LROs, and wait for these to complete.
op.Wait(WithChildOperations([]string{"operations/123", "operations/456"}...))
Example 3:
// Customise the poll frequency and timeout.
op.Wait(WithChildOperations([]string{"operations/123", "operations/456"}...), WithTimeout(10*time.Minute), WithPollFrequency(30*time.Second))
Example 4:
// If the underlying operations are from another product, you need to use a different client to poll
// the relevant GetOperation methods. The service you connect to needs to satisfy the LRO interface as defined by the google.longrunning package.
var conn grpc.ClientConnInterface // create a connection to the relevant gRPC server
myLroClient := longrunningpb.NewOperationsClient(conn)
op.Wait(WithChildOperations([]string{"operations/123", "operations/456"}...), WithService(myLroClient))
Example 5:
// If, however, the operations from another product do not implement the google.longrunning service, you could use
// any service that implements a GetOperation() method, therefore satisfying the OperationsService interface defined in this package.
var myProductClient OperationsService
op.Wait(WithChildOperations([]string{"operations/123", "operations/456"}...), WithService(myProductClient))
Example 6:
// Wait asynchronously for a longer time.
op.SetState(&MyState{}) // Explicitly set the state before waiting asynchronously.
op.Wait(WithSleep(24*time.Hour), WithAsync("resumePoint1"))
return nil // a `return` statement usually follows when waiting asynchronously
type OperationOption ¶ added in v1.11.0
type OperationOption func(*OperationOptions)
OperationOption is a functional option for the NewOperation method.
func WithCallbackFn ¶ added in v1.11.0
func WithCallbackFn(fn func(ctx context.Context)) OperationOption
WithCallbackFn sets the function to call back when running an Async method locally.
func WithExistingOperation ¶ added in v1.10.0
func WithExistingOperation(name string) OperationOption
WithExistingOperation allows one to instantiate a new Operation object from an existing LRO. If this is provided, the underlying NewOperation method will neither create a new Operation resource nor infer one from the ctx.
func WithResumeMethod ¶ added in v1.11.0
func WithResumeMethod(name string) OperationOption
WithResumeMethod sets the fully qualified name of the method to be resumed.
In most scenarios this is not required, since the method name is made available automatically by the gRPC server via grpc.Method(). Example: "/myorg.co.jobs.v1.JobsService/GenerateClientReports"
type OperationOptions ¶ added in v1.11.0
type OperationOptions struct {
// contains filtered or unexported fields
}
type OperationsService ¶ added in v1.11.0
type OperationsService interface {
GetOperation(ctx context.Context, in *longrunningpb.GetOperationRequest, opts ...grpc.CallOption) (*longrunningpb.Operation, error)
}
OperationsService is an interface that wraps the GetOperation method. This allows us to use the GetOperation method of the service from which the operation originated, which should implement this interface if it produces long-running operations.
type SpannerConfig ¶ added in v1.3.0
type SpannerConfig struct {
	// Project
	// The Project where the Database is deployed
	Project string
	// Spanner Instance
	// The instance name of the database, for example 'primary'
	Instance string
	// Spanner Database
	// The database name, for example 'myorganisation-myproduct'
	Database string
	// contains filtered or unexported fields
}
SpannerConfig is used to configure the underlying Google Cloud Spanner client.
type WaitConfig ¶ added in v1.11.0
type WaitConfig struct {
// contains filtered or unexported fields
}
WaitConfig is used to store the waiting configurations specified as functional WaitOption(s) in the Wait() and WaitAsync() methods.
type WaitOption ¶ added in v1.9.0
type WaitOption func(*WaitConfig) error
WaitOption is a functional option for WaitConfig.
func WithAsync ¶ added in v1.11.0
func WithAsync(resumePoint string) WaitOption
WithAsync allows one to exit the method and wait asynchronously.
func WithChildGetOperationEndpoint ¶ added in v1.11.0
func WithChildGetOperationEndpoint(endpoint string) WaitOption
WithChildGetOperationEndpoint is used for local development
func WithChildOperations ¶ added in v1.11.0
func WithChildOperations(operations ...string) WaitOption
WithChildOperations specifies operations for which to wait. Format: ["operations/123", "operations/456", "operations/789"]
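Since the parameter is variadic, names can be passed individually, or an existing slice can be spread with a trailing "...". The sketch below shows both call forms using childNames, a hypothetical stand-in with the same parameter shape. It does not call this package.

```go
package main

import "fmt"

// childNames is a hypothetical stand-in with the same variadic parameter
// shape as WithChildOperations; it simply returns the names it received.
func childNames(operations ...string) []string {
	return operations
}

func main() {
	// Pass names one by one.
	a := childNames("operations/123", "operations/456")

	// Or spread an existing slice with a trailing "...".
	names := []string{"operations/123", "operations/456", "operations/789"}
	b := childNames(names...)

	fmt.Println(len(a), len(b))
}
```

Passing the slice without the trailing "..." would not compile, because a []string is not assignable to a single string parameter.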
func WithPollFrequency ¶ added in v1.9.0
func WithPollFrequency(pollFrequency time.Duration) WaitOption
WithPollFrequency specifies a constant interval at which to poll the underlying child operations being waited on.
func WithService ¶ added in v1.11.0
func WithService(service OperationsService) WaitOption
WithService allows one to override the underlying Operations client used to poll the child operations
func WithSleep ¶ added in v1.11.0
func WithSleep(sleep time.Duration) WaitOption
WithSleep specifies a constant duration for which to wait.
func WithTimeout ¶ added in v1.9.0
func WithTimeout(timeout time.Duration) WaitOption
WithTimeout specifies a constant duration after which the Wait method will return an ErrWaitDeadlineExceeded error.