client

package
v0.0.0-...-ebb4f00 Latest Latest
Published: Nov 9, 2023 License: Apache-2.0 Imports: 54 Imported by: 0

Documentation

Overview

Package client contains a high-level remote execution client library.

Constants

const (
	// DefaultMaxBatchSize is the maximum size of a batch to upload with BatchWriteBlobs. We set it to slightly
	// below 4 MB, because that is the limit of a message size in gRPC
	DefaultMaxBatchSize = 4*1024*1024 - 1024

	// DefaultMaxBatchDigests is a suggested approximate limit based on current RBE implementation.
	// Above that, BatchUpdateBlobs calls start to exceed a typical one-minute timeout.
	DefaultMaxBatchDigests = 4000

	// DefaultMaxQueryBatchDigests is a suggested limit for the number of items in a batch for a missing blobs query.
	DefaultMaxQueryBatchDigests = 10_000

	// DefaultDirMode is the mode used to create directories.
	DefaultDirMode = 0777

	// DefaultExecutableMode is the mode used to create executable files.
	DefaultExecutableMode = 0777

	// DefaultRegularMode is the mode used to create non-executable files.
	DefaultRegularMode = 0644
)
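
The limits above bound a batch both by total bytes and by number of digests. As a rough illustration, the sketch below groups blob sizes greedily under both limits. `splitIntoBatches` is a hypothetical helper that is not part of this package, the two constants are local copies of the values listed above, and the sketch ignores per-entry proto overhead that a real batch request would carry.

```go
package main

import "fmt"

const (
	maxBatchSize    = 4*1024*1024 - 1024 // local copy of DefaultMaxBatchSize (4193280 bytes)
	maxBatchDigests = 4000               // local copy of DefaultMaxBatchDigests
)

// splitIntoBatches is a hypothetical helper: it greedily groups blob sizes so
// that each batch stays within both the byte limit and the digest-count limit.
// Blobs larger than maxBytes are returned separately, since they cannot be
// sent in any batch and would need a streaming upload instead.
func splitIntoBatches(sizes []int64, maxBytes int64, maxCount int) (batches [][]int64, tooLarge []int64) {
	var cur []int64
	var curBytes int64
	for _, s := range sizes {
		if s > maxBytes {
			tooLarge = append(tooLarge, s)
			continue
		}
		// Start a new batch if adding this blob would break either limit.
		if len(cur) > 0 && (curBytes+s > maxBytes || len(cur) >= maxCount) {
			batches = append(batches, cur)
			cur, curBytes = nil, 0
		}
		cur = append(cur, s)
		curBytes += s
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches, tooLarge
}

func main() {
	sizes := []int64{3 << 20, 2 << 20, 1 << 20, 5 << 20}
	batches, tooLarge := splitIntoBatches(sizes, maxBatchSize, maxBatchDigests)
	fmt.Println(len(batches), len(tooLarge)) // prints "2 1"
}
```
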
const DefaultCASConcurrency = 500

DefaultCASConcurrency is the default maximum number of concurrent upload and download operations.

const DefaultCompressedBytestreamThreshold = -1

DefaultCompressedBytestreamThreshold is the default threshold, in bytes, for transferring blobs compressed on ByteStream.Write RPCs.

const DefaultMaxConcurrentRequests = 25

DefaultMaxConcurrentRequests specifies the default maximum number of concurrent requests on a single connection that the gRPC balancer can perform.

const DefaultMaxConcurrentStreams = 25

DefaultMaxConcurrentStreams specifies the default threshold value at which the gRPC balancer should create new sub-connections.

const DefaultUnifiedDownloadBufferSize = 10000

DefaultUnifiedDownloadBufferSize is the default UnifiedDownloadBufferSize.

const DefaultUnifiedDownloadTickDuration = UnifiedDownloadTickDuration(50 * time.Millisecond)

DefaultUnifiedDownloadTickDuration is the default UnifiedDownloadTickDuration.

const DefaultUnifiedUploadBufferSize = 10000

DefaultUnifiedUploadBufferSize is the default UnifiedUploadBufferSize.

const DefaultUnifiedUploadTickDuration = UnifiedUploadTickDuration(50 * time.Millisecond)

DefaultUnifiedUploadTickDuration is the default UnifiedUploadTickDuration.

const (

	// HomeDirMacro is replaced by the current user's home dir in the CredFile dial parameter.
	HomeDirMacro = "${HOME}"
)
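
As an illustration of the macro, the following sketch substitutes `${HOME}` in a credentials path. `expandHome` is a hypothetical helper and the home directory is passed in explicitly; how the package itself performs the substitution is not shown in this documentation, so treat this as an assumption about the intent only.

```go
package main

import (
	"fmt"
	"strings"
)

const homeDirMacro = "${HOME}" // local copy of HomeDirMacro

// expandHome is a hypothetical helper that replaces the macro with the given
// home directory. Real code would typically obtain home from os.UserHomeDir.
func expandHome(path, home string) string {
	return strings.ReplaceAll(path, homeDirMacro, home)
}

func main() {
	fmt.Println(expandHome("${HOME}/.config/creds.json", "/home/alice"))
	// prints "/home/alice/.config/creds.json"
}
```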

Variables

var DefaultRPCTimeouts = map[string]time.Duration{
	"default":          20 * time.Second,
	"GetCapabilities":  5 * time.Second,
	"BatchUpdateBlobs": time.Minute,
	"BatchReadBlobs":   time.Minute,
	"GetTree":          time.Minute,

	"Execute":       0,
	"WaitExecution": 0,
}

DefaultRPCTimeouts contains the default timeout of various RPC calls to RBE.

var ErrEmptySegment = errors.New("empty segment in resoure name")

ErrEmptySegment indicates an attempt to construct a resource name with an empty segment.

Functions

func NewCompressedWriteBuffer

func NewCompressedWriteBuffer(w io.Writer) (io.WriteCloser, chan error, error)

NewCompressedWriteBuffer wraps an io.Writer so that compressed contents written to the returned io.WriteCloser are decompressed before being written to w.

func OperationStatus

func OperationStatus(op *oppb.Operation) *status.Status

OperationStatus returns an operation error status, if it is present, and nil otherwise.

Types

type Action

type Action struct {
	// Args are the command-line arguments to start the process. The first argument is the process
	// name, and the rest are its arguments.
	Args []string
	// EnvVars are the variables to add to the process's environment.
	EnvVars map[string]string
	// InputRoot and InputFiles contain the details of the input tree, in remote execution format.
	// They should normally be constructed through the PackageTree function.
	InputRoot  digest.Digest
	InputFiles map[digest.Digest][]byte
	// OutputFiles is a list of output files requested (full paths).
	OutputFiles []string
	// OutputDirs is a list of output directories requested (full paths).
	OutputDirs []string
	// DockerImage is a docker:// URL to the docker image in which execution will take place.
	DockerImage string
	// Timeout is the maximum execution time for the action. Note that it's not an overall timeout on
	// the process, since there may be additional time for transferring files, waiting for a worker to
	// become available, or other overhead.
	//
	// If 0, the server's default timeout is used.
	Timeout time.Duration
	// DoNotCache, if true, indicates that the result of this action should never be cached. It
	// implies SkipCache.
	DoNotCache bool
	// SkipCache, if true, indicates that this action should be executed even if there is a copy of
	// its result in the action cache that could be used instead.
	SkipCache bool
}

Action encodes the full details of an action to be sent to the remote execution service for execution. It corresponds roughly, but not exactly, to the Action proto used by the Remote Execution API.

type AuthType

type AuthType int

AuthType indicates the type of authentication being used.

const (
	// UnknownAuth refers to unknown authentication type.
	UnknownAuth AuthType = iota

	// NoAuth refers to no authentication when connecting to the RBE service.
	NoAuth

	// ExternalTokenAuth refers to an externally specified auth token used to connect to the RBE service.
	ExternalTokenAuth

	// CredsFileAuth refers to a JSON credentials file used to connect to the RBE service.
	CredsFileAuth

	// ApplicationDefaultCredsAuth refers to Google Application Default Credentials that are
	// used to connect to the RBE service.
	ApplicationDefaultCredsAuth

	// GCECredsAuth refers to GCE machine credentials that are
	// used to connect to the RBE service.
	GCECredsAuth
)

func Dial

func Dial(ctx context.Context, endpoint string, params DialParams) (*grpc.ClientConn, AuthType, error)

Dial dials a given endpoint and returns the grpc connection that is established.

func DialRaw

func DialRaw(ctx context.Context, params DialParams) (*grpc.ClientConn, AuthType, error)

DialRaw dials a remote execution service and returns the grpc connection that is established. TODO(olaola): remove this overload when all clients use Dial.

func (AuthType) String

func (a AuthType) String() string

String returns a human readable form of authentication used to connect to RBE.

type CASConcurrency

type CASConcurrency int

CASConcurrency is the number of simultaneous requests that will be issued for CAS upload and download operations.

func (CASConcurrency) Apply

func (cy CASConcurrency) Apply(c *Client)

Apply sets the CASConcurrency flag on a client.
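
The Apply methods in this package follow an option pattern: each option type knows how to set its own value on a Client, and the constructors accept a variadic list of options. The sketch below shows the pattern in isolation. All types here (`client`, `opt`, `casConcurrency`, `chunkMaxSize`, `newClient`) are simplified local stand-ins, not the package's real definitions.

```go
package main

import "fmt"

// client is a simplified stand-in for the package's Client.
type client struct {
	casConcurrency int
	chunkMaxSize   int
}

// opt is a stand-in for the option interface accepted by the constructors.
type opt interface {
	Apply(c *client)
}

type casConcurrency int

// Apply sets the concurrency value on the client.
func (cy casConcurrency) Apply(c *client) { c.casConcurrency = int(cy) }

type chunkMaxSize int

// Apply sets the maximum chunk size on the client.
func (s chunkMaxSize) Apply(c *client) { c.chunkMaxSize = int(s) }

// newClient starts from defaults and lets each option overwrite its field.
func newClient(opts ...opt) *client {
	c := &client{casConcurrency: 500} // 500 mirrors DefaultCASConcurrency
	for _, o := range opts {
		o.Apply(c)
	}
	return c
}

func main() {
	c := newClient(casConcurrency(100), chunkMaxSize(1<<20))
	fmt.Println(c.casConcurrency, c.chunkMaxSize) // prints "100 1048576"
}
```

One consequence of this design is that later options win when the same option type is passed more than once.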

type ChunkMaxSize

type ChunkMaxSize int

ChunkMaxSize is the maximum chunk size to use in ByteStream wrappers.

func (ChunkMaxSize) Apply

func (s ChunkMaxSize) Apply(c *Client)

Apply sets the client's maximum chunk size to s.

type Client

type Client struct {
	// InstanceName is the instance name for the targeted remote execution instance; e.g. for Google
	// RBE: "projects/<foo>/instances/default_instance".
	// It should NOT be used to construct resource names, but rather only for reusing the instance name as is.
	// Use the ResourceName method to create correctly formatted resource names.
	InstanceName string

	// Retrier is the Retrier that is used for RPCs made by this client.
	//
	// These fields are logically "protected" and are intended for use by extensions of Client.
	Retrier       *Retrier
	Connection    *grpc.ClientConn
	CASConnection *grpc.ClientConn // Can be different from Connection if a separate CAS endpoint is provided.
	// StartupCapabilities denotes whether to load ServerCapabilities on startup.
	StartupCapabilities StartupCapabilities
	// LegacyExecRootRelativeOutputs denotes whether outputs are relative to the exec root.
	LegacyExecRootRelativeOutputs LegacyExecRootRelativeOutputs
	// ChunkMaxSize is the maximum chunk size to use for CAS uploads/downloads.
	ChunkMaxSize ChunkMaxSize
	// CompressedBytestreamThreshold is the threshold in bytes for which blobs are read and written
	// compressed. Use 0 for all writes being compressed, and a negative number for all operations being
	// uncompressed.
	CompressedBytestreamThreshold CompressedBytestreamThreshold
	// UploadCompressionPredicate is a function called to decide whether a blob should be compressed for upload.
	UploadCompressionPredicate UploadCompressionPredicate
	// MaxBatchDigests is the maximum number of digests to batch in upload and download operations.
	MaxBatchDigests MaxBatchDigests
	// MaxQueryBatchDigests is the maximum number of digests to batch in CAS query operations.
	MaxQueryBatchDigests MaxQueryBatchDigests
	// MaxBatchSize is the maximum size in bytes of a batch request for batch operations.
	MaxBatchSize MaxBatchSize
	// DirMode is the mode used to create directories.
	DirMode os.FileMode
	// ExecutableMode is the mode used to create executable files.
	ExecutableMode os.FileMode
	// RegularMode is the mode used to create non-executable files.
	RegularMode os.FileMode
	// UtilizeLocality specifies whether the client downloads files utilizing disk access locality.
	UtilizeLocality UtilizeLocality
	// UnifiedUploads specifies whether the client uploads files in the background.
	UnifiedUploads UnifiedUploads
	// UnifiedUploadBufferSize specifies when the unified upload daemon flushes the pending requests.
	UnifiedUploadBufferSize UnifiedUploadBufferSize
	// UnifiedUploadTickDuration specifies how often the unified upload daemon flushes the pending requests.
	UnifiedUploadTickDuration UnifiedUploadTickDuration
	// UnifiedDownloads specifies whether the client downloads files in the background.
	UnifiedDownloads UnifiedDownloads
	// UnifiedDownloadBufferSize specifies when the unified download daemon flushes the pending requests.
	UnifiedDownloadBufferSize UnifiedDownloadBufferSize
	// UnifiedDownloadTickDuration specifies how often the unified download daemon flushes the pending requests.
	UnifiedDownloadTickDuration UnifiedDownloadTickDuration
	// TreeSymlinkOpts controls how symlinks are handled when constructing a tree.
	TreeSymlinkOpts *TreeSymlinkOpts
	// contains filtered or unexported fields
}

Client is a client to several services, including remote execution and services used in conjunction with remote execution. A Client must be constructed by calling Dial() or NewClient() rather than attempting to assemble it directly.

Unless specified otherwise, and provided the fields are not modified, a Client is safe for concurrent use.

func NewClient

func NewClient(ctx context.Context, instanceName string, params DialParams, opts ...Opt) (*Client, error)

NewClient connects to a remote execution service and returns a client suitable for higher-level functionality.

func NewClientFromConnection

func NewClientFromConnection(ctx context.Context, instanceName string, conn, casConn *grpc.ClientConn, opts ...Opt) (*Client, error)

NewClientFromConnection creates a client from gRPC connections to a remote execution service and a cas service.

func (*Client) BatchDownloadBlobs

func (c *Client) BatchDownloadBlobs(ctx context.Context, dgs []digest.Digest) (map[digest.Digest][]byte, error)

BatchDownloadBlobs downloads a number of blobs from the CAS to memory. They must collectively be below the maximum total size for a batch read, which is about 4 MB (see MaxBatchSize). Digests must be computed in advance by the caller. In case multiple errors occur during the blob read, the last error will be returned.

func (*Client) BatchDownloadBlobsWithStats

func (c *Client) BatchDownloadBlobsWithStats(ctx context.Context, dgs []digest.Digest) (map[digest.Digest]CompressedBlobInfo, error)

func (*Client) BatchReadBlobs

func (c *Client) BatchReadBlobs(ctx context.Context, req *repb.BatchReadBlobsRequest) (res *repb.BatchReadBlobsResponse, err error)

BatchReadBlobs wraps the underlying call with specific client options. NOTE that its retry logic ignores the per-blob errors embedded in the response. It is recommended to use BatchDownloadBlobs instead.

func (*Client) BatchUpdateBlobs

func (c *Client) BatchUpdateBlobs(ctx context.Context, req *repb.BatchUpdateBlobsRequest) (res *repb.BatchUpdateBlobsResponse, err error)

BatchUpdateBlobs wraps the underlying call with specific client options. NOTE that its retry logic ignores the per-blob errors embedded in the response; you probably want to use BatchWriteBlobs() instead.

func (*Client) BatchWriteBlobs

func (c *Client) BatchWriteBlobs(ctx context.Context, blobs map[digest.Digest][]byte) error

BatchWriteBlobs (over)writes specified blobs to the CAS, regardless of whether they already exist.

The collective size must be below the maximum total size for a batch upload, which is about 4 MB (see MaxBatchSize). In case multiple errors occur during the blob upload, the last error is returned.

func (*Client) CallWithTimeout

func (c *Client) CallWithTimeout(ctx context.Context, rpcName string, f func(ctx context.Context) error) error

CallWithTimeout executes the given function f with a context that times out after an RPC timeout.

This method is logically "protected" and is intended for use by extensions of Client.

func (*Client) CancelOperation

func (c *Client) CancelOperation(ctx context.Context, req *oppb.CancelOperationRequest) (res *emptypb.Empty, err error)

CancelOperation wraps the underlying call with specific client options.

func (*Client) CheckActionCache

func (c *Client) CheckActionCache(ctx context.Context, acDg *repb.Digest) (*repb.ActionResult, error)

CheckActionCache queries remote action cache, returning an ActionResult or nil if it doesn't exist.

func (*Client) CheckCapabilities

func (c *Client) CheckCapabilities(ctx context.Context) (err error)

CheckCapabilities verifies that this client can work with the remote server in terms of API version and digest function. It sets some client parameters according to remote server preferences, like MaxBatchSize.

func (*Client) Close

func (c *Client) Close() error

Close closes the underlying gRPC connection(s).

func (*Client) ComputeMerkleTree

func (c *Client) ComputeMerkleTree(ctx context.Context, execRoot, workingDir, remoteWorkingDir string, is *command.InputSpec, cache filemetadata.Cache) (root digest.Digest, inputs []*uploadinfo.Entry, stats *TreeStats, err error)

ComputeMerkleTree packages an InputSpec into uploadable inputs, returned as a slice of uploadinfo.Entry.

func (*Client) ComputeOutputsToUpload

func (c *Client) ComputeOutputsToUpload(execRoot, workingDir string, paths []string, cache filemetadata.Cache, sb command.SymlinkBehaviorType, nodeProperties map[string]*cpb.NodeProperties) (map[digest.Digest]*uploadinfo.Entry, *repb.ActionResult, error)

ComputeOutputsToUpload transforms the provided local output paths into uploadable Chunkers. The paths have to be relative to execRoot. It also populates the remote ActionResult, packaging output directories as trees where required.

func (*Client) DeleteOperation

func (c *Client) DeleteOperation(ctx context.Context, req *oppb.DeleteOperationRequest) (res *emptypb.Empty, err error)

DeleteOperation wraps the underlying call with specific client options.

func (*Client) DownloadActionOutputs

func (c *Client) DownloadActionOutputs(ctx context.Context, resPb *repb.ActionResult, outDir string, cache filemetadata.Cache) (*MovedBytesMetadata, error)

DownloadActionOutputs downloads the output files and directories in the given action result. It returns the number of logical and real bytes downloaded, which may be different from the sum of sizes of the files due to deduplication and compression.

func (*Client) DownloadDirectory

func (c *Client) DownloadDirectory(ctx context.Context, d digest.Digest, outDir string, cache filemetadata.Cache) (map[string]*TreeOutput, *MovedBytesMetadata, error)

DownloadDirectory downloads the entire directory of the given digest. It returns the number of logical and real bytes downloaded, which may be different from the sum of sizes of the files due to deduplication and compression.

func (*Client) DownloadFiles

func (c *Client) DownloadFiles(ctx context.Context, outDir string, outputs map[digest.Digest]*TreeOutput) (*MovedBytesMetadata, error)

DownloadFiles downloads the output files under |outDir|. It returns the number of logical and real bytes downloaded, which may be different from the sum of sizes of the files due to deduplication and compression.

func (*Client) DownloadOutputs

func (c *Client) DownloadOutputs(ctx context.Context, outs map[string]*TreeOutput, outDir string, cache filemetadata.Cache) (*MovedBytesMetadata, error)

DownloadOutputs downloads the specified outputs. It returns the number of logical and real bytes downloaded, which may be different from the sum of sizes of the files due to deduplication and compression.

func (*Client) Execute

func (c *Client) Execute(ctx context.Context, req *repb.ExecuteRequest) (res regrpc.Execution_ExecuteClient, err error)

Execute wraps the underlying call with specific client options. The wrapper is here for completeness to provide access to the low-level RPCs. Prefer using higher-level ExecuteAndWait instead, as it includes retries/timeouts handling.

func (*Client) ExecuteAction

func (c *Client) ExecuteAction(ctx context.Context, ac *Action) (*repb.ActionResult, error)

ExecuteAction performs all of the steps necessary to execute an action, including checking the cache if applicable, uploading necessary protos and inputs to the CAS, queueing the action, and waiting for the result.

ExecuteAction may block for a long time while the action is in progress. Currently, two-phase queue-wait is not supported; the token necessary to query the job is not provided to users.

This method MAY return a non-nil ActionResult along with a non-nil error if the action failed. The ActionResult may include, for example, the stdout/stderr digest from the attempt.

ExecuteAction is a convenience method which wraps both PrepAction and ExecuteAndWait, along with other steps such as uploading extra inputs and parsing Operation protos.

func (*Client) ExecuteAndWait

func (c *Client) ExecuteAndWait(ctx context.Context, req *repb.ExecuteRequest) (op *oppb.Operation, err error)

ExecuteAndWait calls Execute on the underlying client and WaitExecution if necessary. It returns the completed operation or an error.

The retry logic is complicated. Assuming retries are enabled, we want the retry to call WaitExecution if there's an Operation "in progress", and to call Execute otherwise. In practice that means:

  1. If an error occurs before the first operation is returned, or after the final operation is returned (i.e. the one with op.Done==true), retry by calling Execute again.
  2. Otherwise, retry by calling WaitExecution with the last operation name.

In addition, we want the retrier to trigger based on certain operation statuses as well as on explicit errors. (The shouldRetry function knows which statuses.) We do this by mapping statuses, if present, to errors inside the closure and then throwing away such "fake" errors outside the closure (if we ran out of retries or if there was never a retrier enabled). The exception is deadline-exceeded statuses, which we never give to the retrier (and hence will always propagate directly to the caller).
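
The two numbered rules above reduce to a small decision on the last operation observed. The sketch below models only that decision. `operation` is a simplified stand-in for the long-running Operation proto and `nextCall` is a hypothetical helper; the status-to-error mapping described in the last paragraph is not modeled.

```go
package main

import "fmt"

// operation is a simplified stand-in for the long-running Operation proto.
type operation struct {
	Name string
	Done bool
}

// nextCall picks the RPC that a retry should issue, given the last operation
// received so far (nil if none was received).
func nextCall(last *operation) string {
	// Rule 1: nothing in progress (no operation yet, or the final one was
	// already returned), so start over with Execute.
	if last == nil || last.Done {
		return "Execute"
	}
	// Rule 2: an operation is in progress, so reattach to it by name.
	return "WaitExecution(" + last.Name + ")"
}

func main() {
	fmt.Println(nextCall(nil))                                            // Execute
	fmt.Println(nextCall(&operation{Name: "operations/123"}))             // WaitExecution(operations/123)
	fmt.Println(nextCall(&operation{Name: "operations/123", Done: true})) // Execute
}
```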

func (*Client) ExecuteAndWaitProgress

func (c *Client) ExecuteAndWaitProgress(ctx context.Context, req *repb.ExecuteRequest, progress func(metadata *repb.ExecuteOperationMetadata)) (op *oppb.Operation, err error)

ExecuteAndWaitProgress calls Execute on the underlying client and WaitExecution if necessary. It returns the completed operation or an error. The supplied callback function is called for each message received to update the state of the remote action.

func (*Client) FindMissingBlobs

func (c *Client) FindMissingBlobs(ctx context.Context, req *repb.FindMissingBlobsRequest) (res *repb.FindMissingBlobsResponse, err error)

FindMissingBlobs wraps the underlying call with specific client options.

func (*Client) FlattenActionOutputs

func (c *Client) FlattenActionOutputs(ctx context.Context, ar *repb.ActionResult) (map[string]*TreeOutput, error)

FlattenActionOutputs collects and flattens all the outputs of an action. It downloads the output directory metadata, if required, but not the leaf file blobs.

func (*Client) FlattenTree

func (c *Client) FlattenTree(tree *repb.Tree, rootPath string) (map[string]*TreeOutput, error)

FlattenTree takes a Tree message and calculates the relative paths of all the files to the tree root. Note that only files/symlinks/empty directories are included in the returned map, not the intermediate directories. Directories containing only other directories will be omitted.
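
To make the inclusion rule concrete, the sketch below flattens a small nested structure. `dir`, `flattenTree`, and `flattenInto` are simplified local stand-ins (symlinks are left out), not the package's types; the point is only that files and empty directories appear in the result while intermediate directories do not.

```go
package main

import (
	"fmt"
	"path"
	"sort"
)

// dir is a simplified stand-in for a directory node in a Tree message.
type dir struct {
	Files []string
	Dirs  map[string]*dir
}

// flattenTree returns a map from path (prefixed by root) to a flag that is
// true for empty directories and false for files.
func flattenTree(d *dir, root string) map[string]bool {
	out := make(map[string]bool)
	flattenInto(d, root, out)
	return out
}

func flattenInto(d *dir, prefix string, out map[string]bool) {
	if len(d.Files) == 0 && len(d.Dirs) == 0 {
		out[prefix] = true // empty directory: recorded as a leaf
		return
	}
	for _, f := range d.Files {
		out[path.Join(prefix, f)] = false
	}
	// Intermediate directories are only traversed, never recorded.
	for name, sub := range d.Dirs {
		flattenInto(sub, path.Join(prefix, name), out)
	}
}

func main() {
	tree := &dir{
		Files: []string{"a.txt"},
		Dirs: map[string]*dir{
			"sub": {Dirs: map[string]*dir{
				"empty": {},
				"deep":  {Files: []string{"b.txt"}},
			}},
		},
	}
	out := flattenTree(tree, "out")
	paths := make([]string, 0, len(out))
	for p := range out {
		paths = append(paths, p)
	}
	sort.Strings(paths)
	fmt.Println(paths) // [out/a.txt out/sub/deep/b.txt out/sub/empty]
}
```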

func (*Client) GetActionResult

func (c *Client) GetActionResult(ctx context.Context, req *repb.GetActionResultRequest) (res *repb.ActionResult, err error)

GetActionResult wraps the underlying call with specific client options.

func (*Client) GetBackendCapabilities

func (c *Client) GetBackendCapabilities(ctx context.Context, conn *grpc.ClientConn, req *repb.GetCapabilitiesRequest) (res *repb.ServerCapabilities, err error)

GetBackendCapabilities returns the capabilities for a specific server connection (either the main connection or the CAS connection).

func (*Client) GetCapabilities

func (c *Client) GetCapabilities(ctx context.Context) (res *repb.ServerCapabilities, err error)

GetCapabilities returns the capabilities for the targeted servers. If the CAS URL was set differently to the execution server then the CacheCapabilities will be determined from that; ExecutionCapabilities will always come from the main URL.

func (*Client) GetCapabilitiesForInstance

func (c *Client) GetCapabilitiesForInstance(ctx context.Context, instance string) (res *repb.ServerCapabilities, err error)

GetCapabilitiesForInstance returns the capabilities for the targeted servers. If the CAS URL was set differently to the execution server then the CacheCapabilities will be determined from that; ExecutionCapabilities will always come from the main URL.

func (*Client) GetDirectoryTree

func (c *Client) GetDirectoryTree(ctx context.Context, d *repb.Digest) (result []*repb.Directory, err error)

GetDirectoryTree returns the entire directory tree rooted at the given digest (which must target a Directory stored in the CAS).

func (*Client) GetOperation

func (c *Client) GetOperation(ctx context.Context, req *oppb.GetOperationRequest) (res *oppb.Operation, err error)

GetOperation wraps the underlying call with specific client options.

func (*Client) GetTree

GetTree wraps the underlying call with specific client options. The wrapper is here for completeness to provide access to the low-level RPCs. Prefer using higher-level GetDirectoryTree instead, as it includes retries/timeouts handling.

func (*Client) IsCasNG

func (c *Client) IsCasNG() bool

IsCasNG returns true if casng feature flag is turned on.

func (*Client) ListOperations

func (c *Client) ListOperations(ctx context.Context, req *oppb.ListOperationsRequest) (res *oppb.ListOperationsResponse, err error)

ListOperations wraps the underlying call with specific client options.

func (*Client) MissingBlobs

func (c *Client) MissingBlobs(ctx context.Context, digests []digest.Digest) ([]digest.Digest, error)

MissingBlobs queries the CAS to determine if it has the specified blobs. Returns a slice of missing blobs.

func (*Client) NgUpload

func (c *Client) NgUpload(ctx context.Context, reqs ...casng.UploadRequest) ([]digest.Digest, casng.Stats, error)

NgUpload delegates to Upload of the casng package.

func (*Client) NgUploadTree

func (c *Client) NgUploadTree(ctx context.Context, execRoot impath.Absolute, workingDir, remoteWorkingDir impath.Relative, reqs ...casng.UploadRequest) (rootDigest digest.Digest, uploaded []digest.Digest, stats casng.Stats, err error)

NgUploadTree delegates to UploadTree of the casng package.

func (*Client) PrepAction

func (c *Client) PrepAction(ctx context.Context, ac *Action) (*repb.Digest, *repb.ActionResult, error)

PrepAction constructs the Command and Action protos, checks the action cache if appropriate, and uploads the action if the cache was not checked or if there was no cache hit. If successful, PrepAction returns the digest of the Action and a (possibly nil) pointer to an ActionResult representing the result of the cache check, if any.

func (*Client) QueryWriteStatus

func (c *Client) QueryWriteStatus(ctx context.Context, req *bspb.QueryWriteStatusRequest) (res *bspb.QueryWriteStatusResponse, err error)

QueryWriteStatus wraps the underlying call with specific client options.

func (*Client) RPCOpts

func (c *Client) RPCOpts() []grpc.CallOption

RPCOpts returns the default RPC options that should be used for calls made with this client.

This method is logically "protected" and is intended for use by extensions of Client.

func (*Client) Read

func (c *Client) Read(ctx context.Context, req *bspb.ReadRequest) (res bsgrpc.ByteStream_ReadClient, err error)

Read wraps the underlying call with specific client options. The wrapper is here for completeness to provide access to the low-level RPCs. Prefer using higher-level functions such as ReadBlob(ToFile) instead, as they include retries/timeouts handling.

func (*Client) ReadBlob

func (c *Client) ReadBlob(ctx context.Context, d digest.Digest) ([]byte, *MovedBytesMetadata, error)

ReadBlob fetches a blob from the CAS into a byte slice. Returns the size of the blob and the amount of bytes moved through the wire.

func (*Client) ReadBlobRange

func (c *Client) ReadBlobRange(ctx context.Context, d digest.Digest, offset, limit int64) ([]byte, *MovedBytesMetadata, error)

ReadBlobRange fetches a partial blob from the CAS into a byte slice, starting from offset bytes and including at most limit bytes (or no limit if limit==0). The offset must be non-negative and no greater than the size of the entire blob. The limit must not be negative, but offset+limit may be greater than the size of the entire blob.
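
The offset and limit rules can be checked against a local byte slice. The sketch below applies the same constraints in memory; `readRange` is a hypothetical helper and does not contact a CAS, so it only illustrates which argument combinations are valid and what they select.

```go
package main

import (
	"errors"
	"fmt"
)

// readRange applies the documented offset/limit rules to an in-memory blob.
func readRange(blob []byte, offset, limit int64) ([]byte, error) {
	size := int64(len(blob))
	if offset < 0 || offset > size {
		return nil, errors.New("offset must be within [0, size]")
	}
	if limit < 0 {
		return nil, errors.New("limit must not be negative")
	}
	end := size
	// limit == 0 means "no limit"; a limit past the end is clamped.
	if limit > 0 && limit < size-offset {
		end = offset + limit
	}
	return blob[offset:end], nil
}

func main() {
	blob := []byte("hello world")
	a, _ := readRange(blob, 6, 0)   // no limit: reads to the end
	b, _ := readRange(blob, 0, 5)   // first five bytes
	c, _ := readRange(blob, 6, 100) // offset+limit beyond the blob is allowed
	_, err := readRange(blob, 12, 0)
	fmt.Printf("%s %s %s %v\n", a, b, c, err != nil) // world hello world true
}
```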

func (*Client) ReadBlobToFile

func (c *Client) ReadBlobToFile(ctx context.Context, d digest.Digest, fpath string) (*MovedBytesMetadata, error)

ReadBlobToFile fetches a blob with a provided digest name from the CAS, saving it into a file. It returns the number of bytes read.

func (*Client) ReadBytes

func (c *Client) ReadBytes(ctx context.Context, name string) ([]byte, error)

ReadBytes fetches a resource's contents into a byte slice.

ReadBytes panics with ErrTooLarge if an attempt is made to read a resource with contents too large to fit into a byte array.

func (*Client) ReadProto

func (c *Client) ReadProto(ctx context.Context, d digest.Digest, msg proto.Message) (*MovedBytesMetadata, error)

ReadProto reads a blob from the CAS and unmarshals it into the given message. Returns the size of the proto and the amount of bytes moved through the wire.

func (*Client) ReadResourceTo

func (c *Client) ReadResourceTo(ctx context.Context, name string, w io.Writer) (int64, error)

ReadResourceTo writes a resource's contents to a Writer.

func (*Client) ReadResourceToFile

func (c *Client) ReadResourceToFile(ctx context.Context, name, fpath string) (int64, error)

ReadResourceToFile fetches a resource's contents, saving it into a file.

The provided resource name must be a child resource of this client's instance, e.g. '/blobs/abc-123/45' (NOT 'projects/foo/bar/baz').

The number of bytes read is returned.

func (*Client) ResourceName

func (c *Client) ResourceName(segments ...string) (string, error)

ResourceName constructs a correctly formatted resource name as defined in the spec. No keyword validation is performed since the semantics of the path are defined by the server. See: https://github.com/bazelbuild/remote-apis/blob/cb8058798964f0adf6dbab2f4c2176ae2d653447/build/bazel/remote/execution/v2/remote_execution.proto#L223
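
A minimal sketch of this kind of name construction follows. It assumes the instance name is prepended when non-empty and that segments are joined with "/"; `resourceName` and `errEmptySegment` are local stand-ins written for illustration, not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errEmptySegment is a local stand-in for ErrEmptySegment.
var errEmptySegment = errors.New("empty segment in resource name")

// resourceName joins the instance name and segments with "/", rejecting
// empty segments. No validation of keywords such as "blobs" is attempted.
func resourceName(instance string, segments ...string) (string, error) {
	parts := make([]string, 0, len(segments)+1)
	if instance != "" {
		parts = append(parts, instance)
	}
	for _, s := range segments {
		if s == "" {
			return "", errEmptySegment
		}
		parts = append(parts, s)
	}
	return strings.Join(parts, "/"), nil
}

func main() {
	name, err := resourceName("projects/foo/instances/default_instance", "blobs", "abc123", "45")
	fmt.Println(name, err)
	// projects/foo/instances/default_instance/blobs/abc123/45 <nil>

	_, err = resourceName("inst", "blobs", "", "45")
	fmt.Println(err) // empty segment in resource name
}
```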

func (*Client) ResourceNameCompressedWrite

func (c *Client) ResourceNameCompressedWrite(hash string, sizeBytes int64) string

ResourceNameCompressedWrite generates a valid write resource name. TODO(rubensf): Converge compressor to proto in https://github.com/bazelbuild/remote-apis/pull/168 once that gets merged in.

func (*Client) ResourceNameWrite

func (c *Client) ResourceNameWrite(hash string, sizeBytes int64) string

ResourceNameWrite generates a valid write resource name.
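
For orientation, the Remote Execution API describes ByteStream upload names of the form `{instance}/uploads/{uuid}/blobs/{hash}/{size}`, with a `compressed-blobs/{compressor}` variant for compressed uploads. The sketch below formats both. `writeName` and `compressedWriteName` are hypothetical helpers, the UUID is passed in rather than generated, and the use of "zstd" as the compressor is an assumption for illustration.

```go
package main

import "fmt"

// writeName formats an uncompressed upload resource name.
func writeName(instance, uuid, hash string, sizeBytes int64) string {
	return fmt.Sprintf("%s/uploads/%s/blobs/%s/%d", instance, uuid, hash, sizeBytes)
}

// compressedWriteName formats a compressed upload resource name. The hash and
// size refer to the uncompressed blob; "zstd" is assumed as the compressor.
func compressedWriteName(instance, uuid, hash string, sizeBytes int64) string {
	return fmt.Sprintf("%s/uploads/%s/compressed-blobs/zstd/%s/%d", instance, uuid, hash, sizeBytes)
}

func main() {
	const id = "00000000-0000-0000-0000-000000000000" // placeholder UUID
	fmt.Println(writeName("inst", id, "abc123", 45))
	fmt.Println(compressedWriteName("inst", id, "abc123", 45))
}
```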

func (*Client) RunBackgroundTasks

func (c *Client) RunBackgroundTasks(ctx context.Context)

RunBackgroundTasks starts background goroutines for the client.

func (*Client) SupportsActionPlatformProperties

func (c *Client) SupportsActionPlatformProperties() bool

SupportsActionPlatformProperties returns whether the server's RE API version supports the `Action.platform_properties` field.

func (*Client) SupportsCommandOutputPaths

func (c *Client) SupportsCommandOutputPaths() bool

SupportsCommandOutputPaths returns whether the server's RE API version supports the `Command.output_paths` field.

func (*Client) UpdateActionResult

func (c *Client) UpdateActionResult(ctx context.Context, req *repb.UpdateActionResultRequest) (res *repb.ActionResult, err error)

UpdateActionResult wraps the underlying call with specific client options.

func (*Client) UploadIfMissing

func (c *Client) UploadIfMissing(ctx context.Context, entries ...*uploadinfo.Entry) ([]digest.Digest, int64, error)

UploadIfMissing writes the missing blobs from those specified to the CAS.

The blobs are first matched against existing ones and only the missing blobs are written. Returns a slice of missing digests that were written and the sum of total bytes moved, which may be different from logical bytes moved (i.e. sum of digest sizes) due to compression.
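
The check-then-write flow can be sketched against a toy in-memory store. Everything below (`memCAS`, `digestOf`, `missing`, `uploadIfMissing`) is a local stand-in written for illustration; it uses SHA-256 and performs no compression, so bytes moved always equal logical bytes here, unlike the real method.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// memCAS is a toy in-memory content-addressable store keyed by "hash/size".
type memCAS map[string][]byte

// digestOf returns a "hash/size" key for a blob, using SHA-256.
func digestOf(b []byte) string {
	sum := sha256.Sum256(b)
	return fmt.Sprintf("%s/%d", hex.EncodeToString(sum[:]), len(b))
}

// missing returns the digests that the store does not hold.
func (c memCAS) missing(digests []string) []string {
	var out []string
	for _, d := range digests {
		if _, ok := c[d]; !ok {
			out = append(out, d)
		}
	}
	return out
}

// uploadIfMissing deduplicates the inputs, asks the store which digests are
// missing, and writes only those. It returns the written digests and the
// number of bytes written.
func (c memCAS) uploadIfMissing(blobs ...[]byte) (written []string, bytesMoved int64) {
	byDigest := make(map[string][]byte)
	var order []string
	for _, b := range blobs {
		d := digestOf(b)
		if _, seen := byDigest[d]; !seen {
			byDigest[d] = b
			order = append(order, d)
		}
	}
	for _, d := range c.missing(order) {
		c[d] = byDigest[d]
		written = append(written, d)
		bytesMoved += int64(len(byDigest[d]))
	}
	return written, bytesMoved
}

func main() {
	cas := memCAS{}
	cas.uploadIfMissing([]byte("already there"))
	written, moved := cas.uploadIfMissing([]byte("already there"), []byte("new"), []byte("new"))
	fmt.Println(len(written), moved) // prints "1 3"
}
```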

func (*Client) WaitExecution

func (c *Client) WaitExecution(ctx context.Context, req *repb.WaitExecutionRequest) (res regrpc.Execution_ExecuteClient, err error)

WaitExecution wraps the underlying call with specific client options. The wrapper is here for completeness to provide access to the low-level RPCs. Prefer using higher-level ExecuteAndWait instead, as it includes retries/timeouts handling.

func (*Client) Write

func (c *Client) Write(ctx context.Context) (res bsgrpc.ByteStream_WriteClient, err error)

Write wraps the underlying call with specific client options. The wrapper is here for completeness to provide access to the low-level RPCs. Prefer using higher-level functions such as WriteBlob(s) instead, as they include retries/timeouts handling.

func (*Client) WriteBlob

func (c *Client) WriteBlob(ctx context.Context, blob []byte) (digest.Digest, error)

WriteBlob (over)writes a blob to the CAS regardless of whether it already exists.

func (*Client) WriteBlobs

func (c *Client) WriteBlobs(ctx context.Context, blobs map[digest.Digest][]byte) error

WriteBlobs is a proxy method for UploadIfMissing that facilitates specifying a map of digest-to-blob. It's intended for use with PackageTree.

TODO(olaola): rethink the API of this layer:

  * Do we want to allow []byte uploads, or require the user to construct Chunkers?
  * How should the API consistently distinguish whether GetMissing is used or not?
  * Should BatchWrite be a public method at all?

func (*Client) WriteBytes

func (c *Client) WriteBytes(ctx context.Context, name string, data []byte) error

WriteBytes uploads a byte slice.

func (*Client) WriteBytesAtRemoteOffset

func (c *Client) WriteBytesAtRemoteOffset(ctx context.Context, name string, data []byte, doNotFinalize bool, initialOffset int64) (int64, error)

WriteBytesAtRemoteOffset uploads a byte slice with a given resource name to the CAS at an arbitrary offset; retries still resend from initialOffset. As of 2023-02-08, ByteStream.WriteRequest.FinishWrite and an arbitrary offset are supported for uploads with a LogStream resource name. If doNotFinalize is set to true, ByteStream.WriteRequest.FinishWrite will be set to false.

func (*Client) WriteProto

func (c *Client) WriteProto(ctx context.Context, msg proto.Message) (digest.Digest, error)

WriteProto is a proxy method for WriteBlob that allows specifying a proto to write.

type CompressedBlobInfo

type CompressedBlobInfo struct {
	CompressedSize int64
	Data           []byte
}

CompressedBlobInfo is primarily used to store stats about compressed blob size in addition to the actual blob data.

type CompressedBytestreamThreshold

type CompressedBytestreamThreshold int64

CompressedBytestreamThreshold is the threshold for compressing blobs when writing/reading. See comment in related field on the Client struct.

func (CompressedBytestreamThreshold) Apply

Apply sets the client's compressed bytestream threshold to s.

type DialParams

type DialParams struct {
	// Service contains the address of remote execution service.
	Service string

	// CASService contains the address of the CAS service, if it is separate from
	// the remote execution service.
	CASService string

	// UseApplicationDefault indicates that the default credentials should be used.
	UseApplicationDefault bool

	// UseComputeEngine indicates that the default CE credentials should be used.
	UseComputeEngine bool

	// UseExternalAuthToken indicates whether an externally specified auth token should be used.
	// If set to true, ExternalPerRPCCreds should also be non-nil.
	UseExternalAuthToken bool

	// ExternalPerRPCCreds refers to the per RPC credentials that should be used for each RPC.
	ExternalPerRPCCreds *PerRPCCreds

	// CredFile is the JSON file that contains the credentials for RPCs.
	CredFile string

	// ActAsAccount is the service account to act as when making RPC calls.
	ActAsAccount string

	// NoSecurity is true if there is no security: no credentials are configured
	// (NoAuth is implied) and grpc.WithInsecure() is passed in. Should only be
	// used in test code.
	NoSecurity bool

	// NoAuth is true if TLS is enabled (NoSecurity is false) but the client does
	// not need to authenticate with the server.
	NoAuth bool

	// TransportCredsOnly is true if it's the caller's responsibility to set per-RPC credentials
	// on individual calls. This overrides ActAsAccount, UseApplicationDefault, and UseComputeEngine.
	// This is not the same as NoSecurity, as transport credentials will still be set.
	TransportCredsOnly bool

	// TLSCACertFile is the PEM file that contains TLS root certificates.
	TLSCACertFile string

	// TLSServerName overrides the server name sent in TLS, if set to a non-empty string.
	TLSServerName string

	// DialOpts defines the set of gRPC DialOptions to apply, in addition to any used internally.
	DialOpts []grpc.DialOption

	// MaxConcurrentRequests specifies the maximum number of concurrent RPCs on a single connection.
	MaxConcurrentRequests uint32

	// MaxConcurrentStreams specifies the maximum number of concurrent stream RPCs on a single connection.
	MaxConcurrentStreams uint32

	// TLSClientAuthCert specifies the public key in PEM format for using mTLS auth to connect to the RBE service.
	//
	// If this is specified, TLSClientAuthKey must also be specified.
	TLSClientAuthCert string

	// TLSClientAuthKey specifies the private key for using mTLS auth to connect to the RBE service.
	//
	// If this is specified, TLSClientAuthCert must also be specified.
	TLSClientAuthKey string
}

DialParams contains all the parameters that Dial needs.
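The field comments above state two pairing rules: UseExternalAuthToken expects a non-nil ExternalPerRPCCreds, and TLSClientAuthCert and TLSClientAuthKey must be given together. A minimal sketch of checking them before dialing follows. The dialParams struct is a local stand-in with a subset of fields, and validate is a hypothetical helper, not part of this package; the library may enforce these rules differently.

```go
package main

import (
	"errors"
	"fmt"
)

// dialParams is a local stand-in mirroring a few DialParams fields.
// It is not the library type.
type dialParams struct {
	UseExternalAuthToken bool
	HasExternalCreds     bool // stands in for ExternalPerRPCCreds != nil
	TLSClientAuthCert    string
	TLSClientAuthKey     string
}

// validate is a hypothetical helper that checks the pairing rules
// described in the DialParams field comments.
func validate(p dialParams) error {
	if p.UseExternalAuthToken && !p.HasExternalCreds {
		return errors.New("UseExternalAuthToken is set but ExternalPerRPCCreds is nil")
	}
	// Exactly one of cert/key being set is an error.
	if (p.TLSClientAuthCert == "") != (p.TLSClientAuthKey == "") {
		return errors.New("TLSClientAuthCert and TLSClientAuthKey must be set together")
	}
	return nil
}

func main() {
	// The file names below are placeholders.
	fmt.Println(validate(dialParams{TLSClientAuthCert: "client.pem"}))
	fmt.Println(validate(dialParams{TLSClientAuthCert: "client.pem", TLSClientAuthKey: "client.key"}))
}
```

Running such a check early surfaces a misconfiguration as a clear message instead of a later connection failure.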

type DirMode

type DirMode os.FileMode

DirMode is the mode used to create directories.

func (DirMode) Apply

func (m DirMode) Apply(c *Client)

Apply sets the client's DirMode to m.

type ExecutableMode

type ExecutableMode os.FileMode

ExecutableMode is the mode used to create executable files.

func (ExecutableMode) Apply

func (m ExecutableMode) Apply(c *Client)

Apply sets the client's ExecutableMode to m.

type InitError

type InitError struct {
	// Err refers to the underlying client initialization error.
	Err error

	// AuthUsed stores the type of authentication used to connect to RBE.
	AuthUsed AuthType
}

InitError is used to wrap the error returned when initializing a new client to also indicate the type of authentication used.

func (*InitError) Error

func (ce *InitError) Error() string

Error returns a string error that includes information about the type of auth used to connect to RBE.

type LegacyExecRootRelativeOutputs

type LegacyExecRootRelativeOutputs bool

LegacyExecRootRelativeOutputs controls whether the client uses legacy behavior of treating output paths as relative to the exec root instead of the working directory.

func (LegacyExecRootRelativeOutputs) Apply

Apply sets the LegacyExecRootRelativeOutputs flag on a client.

type MaxBatchDigests

type MaxBatchDigests int

MaxBatchDigests is the maximum number of digests to batch in upload and download operations.

func (MaxBatchDigests) Apply

func (s MaxBatchDigests) Apply(c *Client)

Apply sets the client's maximum number of batch digests to s.

type MaxBatchSize

type MaxBatchSize int64

MaxBatchSize is the maximum size in bytes of a batch request for batch operations.

func (MaxBatchSize) Apply

func (s MaxBatchSize) Apply(c *Client)

Apply sets the client's maximum batch size to s.
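To illustrate how MaxBatchSize and MaxBatchDigests could interact, here is a minimal greedy packing sketch. It is not the library's batching algorithm: blob and makeBatches are hypothetical, and the sketch ignores per-entry message overhead. Blobs larger than the size limit are set aside, on the assumption that they would be sent individually over ByteStream.

```go
package main

import "fmt"

// blob is a hypothetical stand-in for a digest plus its size in bytes.
type blob struct {
	Hash string
	Size int64
}

// makeBatches greedily packs blobs, in order, into batches whose total size
// does not exceed maxSize and whose length does not exceed maxDigests.
// Blobs that can never fit in a batch are returned separately.
func makeBatches(blobs []blob, maxSize int64, maxDigests int) (batches [][]blob, oversized []blob) {
	var cur []blob
	var curSize int64
	for _, b := range blobs {
		if b.Size > maxSize {
			oversized = append(oversized, b)
			continue
		}
		// Close the current batch if adding b would break either limit.
		if len(cur) > 0 && (curSize+b.Size > maxSize || len(cur) >= maxDigests) {
			batches = append(batches, cur)
			cur, curSize = nil, 0
		}
		cur = append(cur, b)
		curSize += b.Size
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches, oversized
}

func main() {
	blobs := []blob{{"a", 3}, {"b", 3}, {"c", 3}, {"d", 10}, {"e", 1}}
	batches, oversized := makeBatches(blobs, 6, 2)
	fmt.Println(len(batches), len(oversized))
}
```

With real values, maxSize would be on the order of DefaultMaxBatchSize and maxDigests on the order of DefaultMaxBatchDigests.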

type MaxQueryBatchDigests

type MaxQueryBatchDigests int

MaxQueryBatchDigests is the maximum number of digests to batch in query operations.

func (MaxQueryBatchDigests) Apply

func (s MaxQueryBatchDigests) Apply(c *Client)

Apply sets the client's maximum number of query batch digests to s.

type MovedBytesMetadata

type MovedBytesMetadata struct {
	// Requested is the sum of the sizes in bytes for all the uncompressed
	// blobs needed by the execution. It includes bytes that might have
	// been deduped and thus not passed through the wire.
	Requested int64
	// LogicalMoved is the sum of the sizes in bytes of the uncompressed
	// versions of the blobs passed through the wire. It does not include
	// bytes for blobs that were de-duped.
	LogicalMoved int64
	// RealMoved is the sum of sizes in bytes for all blobs passed
	// through the wire in the format they were passed through (e.g.
	// compressed).
	RealMoved int64
	// Cached is the amount of logical bytes that we did not have to move
	// through the wire because they were de-duped.
	Cached int64
}

MovedBytesMetadata represents the bytes moved in CAS related requests.
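The four counters are easiest to read side by side on a small worked case. The sketch below is an interpretation of the field comments, not the library's accounting code: movedBytes mirrors the struct, while transfer and account are hypothetical. Under this reading, Requested equals LogicalMoved plus Cached.

```go
package main

import "fmt"

// movedBytes mirrors the fields of MovedBytesMetadata.
type movedBytes struct {
	Requested    int64
	LogicalMoved int64
	RealMoved    int64
	Cached       int64
}

// transfer is a hypothetical record of one blob needed by an execution.
type transfer struct {
	Size     int64 // uncompressed size in bytes
	WireSize int64 // bytes actually sent, e.g. the compressed size
	Deduped  bool  // true if the blob did not have to be moved
}

// account sums the counters as described in the field comments.
func account(transfers []transfer) movedBytes {
	var m movedBytes
	for _, tr := range transfers {
		m.Requested += tr.Size
		if tr.Deduped {
			m.Cached += tr.Size
			continue
		}
		m.LogicalMoved += tr.Size
		m.RealMoved += tr.WireSize
	}
	return m
}

func main() {
	m := account([]transfer{
		{Size: 100, WireSize: 40},  // moved compressed
		{Size: 50, WireSize: 50},   // moved uncompressed
		{Size: 30, Deduped: true},  // already present remotely
	})
	fmt.Printf("%+v\n", m)
}
```

In this example the execution needs 180 bytes, 150 of them are moved logically, only 90 bytes cross the wire, and 30 are saved by de-duplication.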

type Opt

type Opt interface {
	Apply(*Client)
}

Opt is an option that can be passed to Dial in order to configure the behaviour of the client.
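Most types in this package implement Opt by converting a small named type into a setter on the Client. The sketch below reproduces that pattern with local stand-ins so that it runs on its own: client, opt, and newClient are hypothetical, and the default for useBatchOps is an assumption. Only the 4000 default comes from DefaultMaxBatchDigests.

```go
package main

import "fmt"

// client is a local stand-in for the library's Client.
type client struct {
	maxBatchDigests int
	useBatchOps     bool
}

// opt mirrors the shape of the Opt interface.
type opt interface {
	Apply(*client)
}

// maxBatchDigests is an option backed by an int.
type maxBatchDigests int

func (s maxBatchDigests) Apply(c *client) { c.maxBatchDigests = int(s) }

// useBatchOps is an option backed by a bool.
type useBatchOps bool

func (u useBatchOps) Apply(c *client) { c.useBatchOps = bool(u) }

// newClient is a hypothetical constructor: it starts from defaults and
// applies the options in order, so later options win.
func newClient(opts ...opt) *client {
	c := &client{maxBatchDigests: 4000, useBatchOps: true}
	for _, o := range opts {
		o.Apply(c)
	}
	return c
}

func main() {
	c := newClient(maxBatchDigests(100), useBatchOps(false))
	fmt.Println(c.maxBatchDigests, c.useBatchOps)
}
```

The benefit of this design is that a plain value such as MaxBatchDigests(100) is itself the option, so call sites stay short and new settings do not change the constructor signature.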

type PerRPCCreds

type PerRPCCreds struct {
	Creds credentials.PerRPCCredentials
}

PerRPCCreds sets per-call options that will be set on all RPCs to the underlying connection.

func (*PerRPCCreds) Apply

func (p *PerRPCCreds) Apply(c *Client)

Apply saves the per-RPC creds in the Client.

type RPCTimeouts

type RPCTimeouts map[string]time.Duration

RPCTimeouts is an Opt that sets the per-RPC deadline. The keys are RPC names. The "default" key, if present, is the default timeout. Zero values are valid and indicate no timeout.

func (RPCTimeouts) Apply

func (d RPCTimeouts) Apply(c *Client)

Apply applies the timeouts to a Client. It overrides the provided values, but doesn't remove/alter any other present values.
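The merge and lookup rules described above can be sketched as follows. This is a stand-alone illustration, not the library's code: rpcTimeouts, apply, and timeoutFor are local names, the RPC names used as keys are only examples, and treating a missing entry with no "default" as unbounded is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// rpcTimeouts is a local stand-in with the same underlying type as RPCTimeouts.
type rpcTimeouts map[string]time.Duration

// apply copies src into dst: keys present in src are overridden,
// all other keys in dst are left untouched.
func apply(dst, src rpcTimeouts) {
	for name, d := range src {
		dst[name] = d
	}
}

// timeoutFor looks up the timeout for an RPC name, falling back to the
// "default" key. The boolean is false when no deadline should be set,
// which covers both an explicit zero and a missing entry.
func timeoutFor(timeouts rpcTimeouts, rpc string) (time.Duration, bool) {
	d, ok := timeouts[rpc]
	if !ok {
		d, ok = timeouts["default"]
	}
	return d, ok && d > 0
}

func main() {
	timeouts := rpcTimeouts{"default": 20 * time.Second, "Execute": 0}
	apply(timeouts, rpcTimeouts{"default": 5 * time.Second, "GetCapabilities": time.Second})
	for _, rpc := range []string{"GetCapabilities", "Execute", "Read"} {
		d, bounded := timeoutFor(timeouts, rpc)
		fmt.Println(rpc, d, bounded)
	}
}
```

Here "Execute" keeps its zero value after the merge and therefore runs without a deadline, while "Read" has no entry of its own and picks up the overridden default.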

type RegularMode

type RegularMode os.FileMode

RegularMode is the mode used to create non-executable files.

func (RegularMode) Apply

func (m RegularMode) Apply(c *Client)

Apply sets the client's RegularMode to m.

type Retrier

type Retrier struct {
	Backoff     retry.BackoffPolicy
	ShouldRetry retry.ShouldRetry
}

Retrier is the retry policy applied to all client requests.

func RetryTransient

func RetryTransient() *Retrier

RetryTransient is a default retry policy for transient status codes.

func (*Retrier) Apply

func (r *Retrier) Apply(c *Client)

Apply sets the client's retrier function to r.

func (*Retrier) Do

func (r *Retrier) Do(ctx context.Context, f func() error) error

Do executes f() with retries. It can be called with a nil receiver; in that case no retries are done (just a passthrough call to f()).
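The nil-receiver behavior is worth seeing in code, since calling a method on a nil pointer looks like a bug at first glance. The sketch below uses a simplified stand-in: the real Retrier holds a retry.BackoffPolicy and a retry.ShouldRetry, whereas this retrier only has an attempt limit and a predicate and never sleeps between attempts. The countCalls helper is hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// retrier is a simplified stand-in for Retrier.
type retrier struct {
	maxAttempts int
	shouldRetry func(error) bool
}

// do runs f, retrying while shouldRetry approves and attempts remain.
// With a nil receiver it calls f exactly once and returns its result.
func (r *retrier) do(ctx context.Context, f func() error) error {
	if r == nil {
		return f()
	}
	for attempt := 1; ; attempt++ {
		if err := ctx.Err(); err != nil {
			return err
		}
		err := f()
		if err == nil || !r.shouldRetry(err) || attempt >= r.maxAttempts {
			return err
		}
	}
}

var errTransient = errors.New("transient failure")

// countCalls runs an operation that fails the given number of times before
// succeeding, and reports how many times it was invoked.
func countCalls(r *retrier, failures int) (int, error) {
	calls := 0
	err := r.do(context.Background(), func() error {
		calls++
		if calls <= failures {
			return errTransient
		}
		return nil
	})
	return calls, err
}

func main() {
	var none *retrier // nil: passthrough, no retries
	fmt.Println(countCalls(none, 2))

	r := &retrier{maxAttempts: 5, shouldRetry: func(err error) bool {
		return errors.Is(err, errTransient)
	}}
	fmt.Println(countCalls(r, 2))
}
```

Allowing a nil receiver means callers can leave the retrier unset and still route every request through the same code path.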

type StartupCapabilities

type StartupCapabilities bool

StartupCapabilities controls whether the client should attempt to fetch the remote server capabilities on New. If set to true, some configuration such as MaxBatchSize is set according to the remote server capabilities instead of using the provided values.

func (StartupCapabilities) Apply

func (s StartupCapabilities) Apply(c *Client)

Apply sets the StartupCapabilities flag on a client.

type StatusError

type StatusError struct {
	// contains filtered or unexported fields
}

StatusError is the same as status.Error except it includes the error details in the error message.

func StatusDetailedError

func StatusDetailedError(st *status.Status) *StatusError

StatusDetailedError creates a StatusError from Status, which is the same as st.Err() except it includes the error details in the error message.

func (*StatusError) Error

func (e *StatusError) Error() string

func (*StatusError) GRPCStatus

func (e *StatusError) GRPCStatus() *status.Status

GRPCStatus returns the Status represented by e.

func (*StatusError) Is

func (e *StatusError) Is(target error) bool

Is implements errors.Is functionality. A StatusError is equivalent to the target if their codes and messages are identical.
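An Is method of this kind lets the standard errors.Is match errors by value rather than by identity. The sketch below shows the mechanism with a local stand-in: statusError carries an integer code, whereas the real StatusError wraps a *status.Status. Ignoring the details field in the comparison follows from "code and message" above, but how the real method treats other target types is not shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// statusError is a simplified stand-in for StatusError.
type statusError struct {
	code    int
	message string
	details string
}

func (e *statusError) Error() string {
	return fmt.Sprintf("code %d: %s (%s)", e.code, e.message, e.details)
}

// Is reports whether target is a *statusError with the same code and
// message. The details are deliberately not compared.
func (e *statusError) Is(target error) bool {
	other, ok := target.(*statusError)
	if !ok || other == nil {
		return false
	}
	return e.code == other.code && e.message == other.message
}

// equivalent is a thin wrapper around errors.Is, which walks the wrap
// chain of err and consults each Is method it finds.
func equivalent(err, target error) bool {
	return errors.Is(err, target)
}

func main() {
	a := &statusError{code: 5, message: "blob not found", details: "shard 1"}
	b := &statusError{code: 5, message: "blob not found", details: "shard 2"}
	wrapped := fmt.Errorf("download failed: %w", a)
	fmt.Println(equivalent(wrapped, b))
}
```

Because errors.Is unwraps, the match also works when the status error has been wrapped with additional context.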

type TreeOutput

type TreeOutput struct {
	Digest           digest.Digest
	Path             string
	IsExecutable     bool
	IsEmptyDirectory bool
	SymlinkTarget    string
	NodeProperties   *repb.NodeProperties
}

TreeOutput represents a leaf output node in a nested directory structure (a file, a symlink, or an empty directory).

type TreeStats

type TreeStats struct {
	// The total number of input files.
	InputFiles int
	// The total number of input directories.
	InputDirectories int
	// The total number of input symlinks.
	InputSymlinks int
	// The overall number of bytes from all the inputs.
	TotalInputBytes int64
}

TreeStats contains various stats/metadata of the constructed Merkle tree. Note that these stats count the overall input tree, even if some parts of it are not unique. For example, if a file "foo" of 10 bytes occurs 5 times in the tree, it will be counted as 5 InputFiles and 50 TotalInputBytes.
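The "foo" example above can be reproduced with a few lines. This sketch only illustrates the counting rule: treeStats mirrors two of the fields, while inputFile and countStats are hypothetical and unrelated to how the package actually builds the Merkle tree.

```go
package main

import "fmt"

// treeStats mirrors two fields of TreeStats.
type treeStats struct {
	InputFiles      int
	TotalInputBytes int64
}

// inputFile is a hypothetical description of one file occurrence in the tree.
type inputFile struct {
	Path string
	Size int64
}

// countStats counts every occurrence, without de-duplicating identical content.
func countStats(files []inputFile) treeStats {
	var s treeStats
	for _, f := range files {
		s.InputFiles++
		s.TotalInputBytes += f.Size
	}
	return s
}

func main() {
	var files []inputFile
	for i := 0; i < 5; i++ {
		// The same 10-byte file placed in five different directories.
		files = append(files, inputFile{Path: fmt.Sprintf("dir%d/foo", i), Size: 10})
	}
	s := countStats(files)
	fmt.Println(s.InputFiles, s.TotalInputBytes) // prints "5 50"
}
```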

type TreeSymlinkOpts

type TreeSymlinkOpts struct {
	// By default, a symlink is converted into its targeted file.
	// If true, preserve the symlink.
	Preserved bool
	// If true, the symlink target (if not dangling) is followed.
	FollowsTarget bool
	// If true, overrides Preserved=true for symlinks that point outside the
	// exec root, converting them into their targeted files while preserving
	// symlinks that point to files within the exec root.  Has no effect if
	// Preserved=false, as all symlinks are materialized.
	MaterializeOutsideExecRoot bool
}

TreeSymlinkOpts controls how symlinks are handled when constructing a tree.
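The interaction between Preserved and MaterializeOutsideExecRoot can be summarized as a small decision function. This is a reading of the field comments, not the package's implementation: treeSymlinkOpts mirrors two of the fields and keepAsSymlink is a hypothetical helper. FollowsTarget is left out because it does not affect whether the link itself is kept.

```go
package main

import "fmt"

// treeSymlinkOpts mirrors two fields of TreeSymlinkOpts.
type treeSymlinkOpts struct {
	Preserved                  bool
	MaterializeOutsideExecRoot bool
}

// keepAsSymlink reports whether a symlink stays a symlink in the tree
// (true) or is converted into its target file (false).
func keepAsSymlink(o treeSymlinkOpts, targetInsideExecRoot bool) bool {
	if !o.Preserved {
		// Default behavior: every symlink is materialized.
		return false
	}
	if o.MaterializeOutsideExecRoot && !targetInsideExecRoot {
		// The override applies only to links that leave the exec root.
		return false
	}
	return true
}

func main() {
	opts := treeSymlinkOpts{Preserved: true, MaterializeOutsideExecRoot: true}
	fmt.Println(keepAsSymlink(opts, true))  // target inside the exec root
	fmt.Println(keepAsSymlink(opts, false)) // target outside the exec root
}
```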

func DefaultTreeSymlinkOpts

func DefaultTreeSymlinkOpts() *TreeSymlinkOpts

DefaultTreeSymlinkOpts returns a default TreeSymlinkOpts object.

func (*TreeSymlinkOpts) Apply

func (o *TreeSymlinkOpts) Apply(c *Client)

Apply sets the client's TreeSymlinkOpts.

type UnifiedDownloadBufferSize

type UnifiedDownloadBufferSize int

UnifiedDownloadBufferSize tunes when the daemon for UnifiedDownloads flushes the pending requests.

func (UnifiedDownloadBufferSize) Apply

func (s UnifiedDownloadBufferSize) Apply(c *Client)

Apply sets the client's UnifiedDownloadBufferSize.

type UnifiedDownloadTickDuration

type UnifiedDownloadTickDuration time.Duration

UnifiedDownloadTickDuration tunes how often the daemon for UnifiedDownloads flushes the pending requests.

func (UnifiedDownloadTickDuration) Apply

func (s UnifiedDownloadTickDuration) Apply(c *Client)

Apply sets the client's UnifiedDownloadTickDuration.

type UnifiedDownloads

type UnifiedDownloads bool

UnifiedDownloads specifies whether the client downloads files in the background, unifying operations between different actions.

func (UnifiedDownloads) Apply

func (s UnifiedDownloads) Apply(c *Client)

Apply sets the client's UnifiedDownloads. Note: it is unsafe to change this property when connections are ongoing.

type UnifiedUploadBufferSize

type UnifiedUploadBufferSize int

UnifiedUploadBufferSize tunes when the daemon for UnifiedUploads flushes the pending requests.

func (UnifiedUploadBufferSize) Apply

func (s UnifiedUploadBufferSize) Apply(c *Client)

Apply sets the client's UnifiedUploadBufferSize.

type UnifiedUploadTickDuration

type UnifiedUploadTickDuration time.Duration

UnifiedUploadTickDuration tunes how often the daemon for UnifiedUploads flushes the pending requests.

func (UnifiedUploadTickDuration) Apply

func (s UnifiedUploadTickDuration) Apply(c *Client)

Apply sets the client's UnifiedUploadTickDuration.

type UnifiedUploads

type UnifiedUploads bool

UnifiedUploads specifies whether the client uploads files in the background, unifying operations between different actions.

func (UnifiedUploads) Apply

func (s UnifiedUploads) Apply(c *Client)

Apply sets the client's UnifiedUploads.
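A rough mental model for the buffer size and tick duration settings is a buffer that flushes either when it fills up or when a timer fires. The sketch below is only that model, under the assumption that reaching the buffer size triggers a flush; it is not the package's daemon. uploadBuffer is hypothetical, and the timer is replaced by an explicit tick call so the behavior is deterministic.

```go
package main

import "fmt"

// uploadBuffer is a hypothetical model of a unifying upload daemon.
type uploadBuffer struct {
	size    int        // plays the role of UnifiedUploadBufferSize
	pending []string   // requests waiting to be sent
	flushed [][]string // batches that have been sent
}

// add queues a request and flushes once the buffer is full.
func (b *uploadBuffer) add(req string) {
	b.pending = append(b.pending, req)
	if len(b.pending) >= b.size {
		b.flush()
	}
}

// tick models one UnifiedUploadTickDuration elapsing.
func (b *uploadBuffer) tick() {
	b.flush()
}

// flush sends all pending requests as one batch, if there are any.
func (b *uploadBuffer) flush() {
	if len(b.pending) == 0 {
		return
	}
	b.flushed = append(b.flushed, b.pending)
	b.pending = nil
}

func main() {
	b := &uploadBuffer{size: 3}
	for _, req := range []string{"a", "b", "c", "d"} {
		b.add(req)
	}
	b.tick()
	fmt.Println(b.flushed)
}
```

In this model a larger buffer or longer tick gives more opportunity to merge requests from different actions, at the cost of added latency for each individual request.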

type UploadCompressionPredicate

type UploadCompressionPredicate func(*uploadinfo.Entry) bool

An UploadCompressionPredicate determines whether to compress a blob on upload. Note that the CompressedBytestreamThreshold takes priority over this (i.e. if the blob to be uploaded is smaller than the threshold, this will not be called).
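The priority rule can be sketched as a two-stage check: the size threshold first, then the predicate. Everything here is a stand-in: entry replaces uploadinfo.Entry, and shouldCompress is hypothetical. Treating a negative threshold as "compression disabled" and a nil predicate as "always compress" are assumptions of this sketch, not documented behavior.

```go
package main

import "fmt"

// entry is a local stand-in for uploadinfo.Entry with illustrative fields.
type entry struct {
	Size              int64
	AlreadyCompressed bool
}

// predicate has the same shape as UploadCompressionPredicate.
type predicate func(*entry) bool

// shouldCompress applies the threshold before consulting the predicate,
// so the predicate is never called for blobs below the threshold.
func shouldCompress(e *entry, threshold int64, pred predicate) bool {
	// Assumption: a negative threshold turns compression off entirely.
	if threshold < 0 || e.Size < threshold {
		return false
	}
	// Assumption: without a predicate, every large enough blob is compressed.
	if pred == nil {
		return true
	}
	return pred(e)
}

func main() {
	skipCompressed := func(e *entry) bool { return !e.AlreadyCompressed }
	fmt.Println(shouldCompress(&entry{Size: 10}, 1024, skipCompressed))
	fmt.Println(shouldCompress(&entry{Size: 4096}, 1024, skipCompressed))
	fmt.Println(shouldCompress(&entry{Size: 4096, AlreadyCompressed: true}, 1024, skipCompressed))
}
```

A predicate like skipCompressed is a typical use: it avoids spending CPU on data that is unlikely to shrink further.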

func (UploadCompressionPredicate) Apply

func (cc UploadCompressionPredicate) Apply(c *Client)

Apply sets the client's compression predicate.

type UseBatchCompression

type UseBatchCompression bool

UseBatchCompression is currently set to true when the server has SupportedBatchUpdateCompressors capability and supports ZSTD compression.

func (UseBatchCompression) Apply

func (u UseBatchCompression) Apply(c *Client)

Apply sets the batchCompression flag on a client.

type UseBatchOps

type UseBatchOps bool

UseBatchOps can be set to true to use batch CAS operations when uploading multiple blobs, or false to always use individual ByteStream requests.

func (UseBatchOps) Apply

func (u UseBatchOps) Apply(c *Client)

Apply sets the UseBatchOps flag on a client.

type UseCASNG

type UseCASNG bool

UseCASNG is a feature flag for the casng package.

func (UseCASNG) Apply

func (o UseCASNG) Apply(c *Client)

Apply sets the feature flag value in the Client.

type UtilizeLocality

type UtilizeLocality bool

UtilizeLocality specifies whether the client downloads files utilizing disk access locality.

func (UtilizeLocality) Apply

func (s UtilizeLocality) Apply(c *Client)

Apply sets the client's UtilizeLocality.
