Documentation ¶
Overview ¶
Package client contains a high-level remote execution client library.
Index ¶
- Constants
- Variables
- func NewCompressedWriteBuffer(w io.Writer) (io.WriteCloser, chan error, error)
- func OperationStatus(op *oppb.Operation) *status.Status
- type Action
- type AuthType
- type CASConcurrency
- type ChunkMaxSize
- type Client
- func (c *Client) BatchDownloadBlobs(ctx context.Context, dgs []digest.Digest) (map[digest.Digest][]byte, error)
- func (c *Client) BatchDownloadBlobsWithStats(ctx context.Context, dgs []digest.Digest) (map[digest.Digest]CompressedBlobInfo, error)
- func (c *Client) BatchReadBlobs(ctx context.Context, req *repb.BatchReadBlobsRequest) (res *repb.BatchReadBlobsResponse, err error)
- func (c *Client) BatchUpdateBlobs(ctx context.Context, req *repb.BatchUpdateBlobsRequest) (res *repb.BatchUpdateBlobsResponse, err error)
- func (c *Client) BatchWriteBlobs(ctx context.Context, blobs map[digest.Digest][]byte) error
- func (c *Client) CallWithTimeout(ctx context.Context, rpcName string, f func(ctx context.Context) error) error
- func (c *Client) CancelOperation(ctx context.Context, req *oppb.CancelOperationRequest) (res *emptypb.Empty, err error)
- func (c *Client) CheckActionCache(ctx context.Context, acDg *repb.Digest) (*repb.ActionResult, error)
- func (c *Client) CheckCapabilities(ctx context.Context) (err error)
- func (c *Client) Close() error
- func (c *Client) ComputeMerkleTree(ctx context.Context, execRoot, workingDir, remoteWorkingDir string, ...) (root digest.Digest, inputs []*uploadinfo.Entry, stats *TreeStats, err error)
- func (c *Client) ComputeOutputsToUpload(execRoot, workingDir string, paths []string, cache filemetadata.Cache, ...) (map[digest.Digest]*uploadinfo.Entry, *repb.ActionResult, error)
- func (c *Client) DeleteOperation(ctx context.Context, req *oppb.DeleteOperationRequest) (res *emptypb.Empty, err error)
- func (c *Client) DownloadActionOutputs(ctx context.Context, resPb *repb.ActionResult, outDir string, ...) (*MovedBytesMetadata, error)
- func (c *Client) DownloadDirectory(ctx context.Context, d digest.Digest, outDir string, cache filemetadata.Cache) (map[string]*TreeOutput, *MovedBytesMetadata, error)
- func (c *Client) DownloadFiles(ctx context.Context, outDir string, outputs map[digest.Digest]*TreeOutput) (*MovedBytesMetadata, error)
- func (c *Client) DownloadOutputs(ctx context.Context, outs map[string]*TreeOutput, outDir string, ...) (*MovedBytesMetadata, error)
- func (c *Client) Execute(ctx context.Context, req *repb.ExecuteRequest) (res regrpc.Execution_ExecuteClient, err error)
- func (c *Client) ExecuteAction(ctx context.Context, ac *Action) (*repb.ActionResult, error)
- func (c *Client) ExecuteAndWait(ctx context.Context, req *repb.ExecuteRequest) (op *oppb.Operation, err error)
- func (c *Client) ExecuteAndWaitProgress(ctx context.Context, req *repb.ExecuteRequest, ...) (op *oppb.Operation, err error)
- func (c *Client) FindMissingBlobs(ctx context.Context, req *repb.FindMissingBlobsRequest) (res *repb.FindMissingBlobsResponse, err error)
- func (c *Client) FlattenActionOutputs(ctx context.Context, ar *repb.ActionResult) (map[string]*TreeOutput, error)
- func (c *Client) FlattenTree(tree *repb.Tree, rootPath string) (map[string]*TreeOutput, error)
- func (c *Client) GetActionResult(ctx context.Context, req *repb.GetActionResultRequest) (res *repb.ActionResult, err error)
- func (c *Client) GetBackendCapabilities(ctx context.Context, conn *grpc.ClientConn, req *repb.GetCapabilitiesRequest) (res *repb.ServerCapabilities, err error)
- func (c *Client) GetCapabilities(ctx context.Context) (res *repb.ServerCapabilities, err error)
- func (c *Client) GetCapabilitiesForInstance(ctx context.Context, instance string) (res *repb.ServerCapabilities, err error)
- func (c *Client) GetDirectoryTree(ctx context.Context, d *repb.Digest) (result []*repb.Directory, err error)
- func (c *Client) GetOperation(ctx context.Context, req *oppb.GetOperationRequest) (res *oppb.Operation, err error)
- func (c *Client) GetTree(ctx context.Context, req *repb.GetTreeRequest) (res regrpc.ContentAddressableStorage_GetTreeClient, err error)
- func (c *Client) IsCasNG() bool
- func (c *Client) ListOperations(ctx context.Context, req *oppb.ListOperationsRequest) (res *oppb.ListOperationsResponse, err error)
- func (c *Client) MissingBlobs(ctx context.Context, digests []digest.Digest) ([]digest.Digest, error)
- func (c *Client) NgUpload(ctx context.Context, reqs ...casng.UploadRequest) ([]digest.Digest, casng.Stats, error)
- func (c *Client) NgUploadTree(ctx context.Context, execRoot impath.Absolute, ...) (rootDigest digest.Digest, uploaded []digest.Digest, stats casng.Stats, ...)
- func (c *Client) PrepAction(ctx context.Context, ac *Action) (*repb.Digest, *repb.ActionResult, error)
- func (c *Client) QueryWriteStatus(ctx context.Context, req *bspb.QueryWriteStatusRequest) (res *bspb.QueryWriteStatusResponse, err error)
- func (c *Client) RPCOpts() []grpc.CallOption
- func (c *Client) Read(ctx context.Context, req *bspb.ReadRequest) (res bsgrpc.ByteStream_ReadClient, err error)
- func (c *Client) ReadBlob(ctx context.Context, d digest.Digest) ([]byte, *MovedBytesMetadata, error)
- func (c *Client) ReadBlobRange(ctx context.Context, d digest.Digest, offset, limit int64) ([]byte, *MovedBytesMetadata, error)
- func (c *Client) ReadBlobToFile(ctx context.Context, d digest.Digest, fpath string) (*MovedBytesMetadata, error)
- func (c *Client) ReadBytes(ctx context.Context, name string) ([]byte, error)
- func (c *Client) ReadProto(ctx context.Context, d digest.Digest, msg proto.Message) (*MovedBytesMetadata, error)
- func (c *Client) ReadResourceTo(ctx context.Context, name string, w io.Writer) (int64, error)
- func (c *Client) ReadResourceToFile(ctx context.Context, name, fpath string) (int64, error)
- func (c *Client) ResourceName(segments ...string) (string, error)
- func (c *Client) ResourceNameCompressedWrite(hash string, sizeBytes int64) string
- func (c *Client) ResourceNameWrite(hash string, sizeBytes int64) string
- func (c *Client) RunBackgroundTasks(ctx context.Context)
- func (c *Client) SupportsActionPlatformProperties() bool
- func (c *Client) SupportsCommandOutputPaths() bool
- func (c *Client) UpdateActionResult(ctx context.Context, req *repb.UpdateActionResultRequest) (res *repb.ActionResult, err error)
- func (c *Client) UploadIfMissing(ctx context.Context, entries ...*uploadinfo.Entry) ([]digest.Digest, int64, error)
- func (c *Client) WaitExecution(ctx context.Context, req *repb.WaitExecutionRequest) (res regrpc.Execution_ExecuteClient, err error)
- func (c *Client) Write(ctx context.Context) (res bsgrpc.ByteStream_WriteClient, err error)
- func (c *Client) WriteBlob(ctx context.Context, blob []byte) (digest.Digest, error)
- func (c *Client) WriteBlobs(ctx context.Context, blobs map[digest.Digest][]byte) error
- func (c *Client) WriteBytes(ctx context.Context, name string, data []byte) error
- func (c *Client) WriteBytesAtRemoteOffset(ctx context.Context, name string, data []byte, doNotFinalize bool, ...) (int64, error)
- func (c *Client) WriteProto(ctx context.Context, msg proto.Message) (digest.Digest, error)
- type CompressedBlobInfo
- type CompressedBytestreamThreshold
- type DialParams
- type DirMode
- type ExecutableMode
- type InitError
- type LegacyExecRootRelativeOutputs
- type MaxBatchDigests
- type MaxBatchSize
- type MaxQueryBatchDigests
- type MovedBytesMetadata
- type Opt
- type PerRPCCreds
- type RPCTimeouts
- type RegularMode
- type Retrier
- type StartupCapabilities
- type StatusError
- type TreeOutput
- type TreeStats
- type TreeSymlinkOpts
- type UnifiedDownloadBufferSize
- type UnifiedDownloadTickDuration
- type UnifiedDownloads
- type UnifiedUploadBufferSize
- type UnifiedUploadTickDuration
- type UnifiedUploads
- type UploadCompressionPredicate
- type UseBatchCompression
- type UseBatchOps
- type UseCASNG
- type UtilizeLocality
Constants ¶
const ( // DefaultMaxBatchSize is the maximum size of a batch to upload with BatchWriteBlobs. We set it to slightly // below 4 MB, because that is the limit of a message size in gRPC DefaultMaxBatchSize = 4*1024*1024 - 1024 // DefaultMaxBatchDigests is a suggested approximate limit based on current RBE implementation. // Above that BatchUpdateBlobs calls start to exceed a typical minute timeout. DefaultMaxBatchDigests = 4000 // DefaultMaxQueryBatchDigests is a suggested limit for the number of items in a batch for a missing blobs query. DefaultMaxQueryBatchDigests = 10_000 // DefaultDirMode is mode used to create directories. DefaultDirMode = 0777 // DefaultExecutableMode is mode used to create executable files. DefaultExecutableMode = 0777 // DefaultRegularMode is mode used to create non-executable files. DefaultRegularMode = 0644 )
const DefaultCASConcurrency = 500
DefaultCASConcurrency is the default maximum number of concurrent upload and download operations.
const DefaultCompressedBytestreamThreshold = -1
DefaultCompressedBytestreamThreshold is the default threshold, in bytes, for transferring blobs compressed on ByteStream.Write RPCs.
const DefaultMaxConcurrentRequests = 25
DefaultMaxConcurrentRequests specifies the default maximum number of concurrent requests on a single connection that the GRPC balancer can perform.
const DefaultMaxConcurrentStreams = 25
DefaultMaxConcurrentStreams specifies the default threshold value at which the GRPC balancer should create new sub-connections.
const DefaultUnifiedDownloadBufferSize = 10000
DefaultUnifiedDownloadBufferSize is the default UnifiedDownloadBufferSize.
const DefaultUnifiedDownloadTickDuration = UnifiedDownloadTickDuration(50 * time.Millisecond)
DefaultUnifiedDownloadTickDuration is the default UnifiedDownloadTickDuration.
const DefaultUnifiedUploadBufferSize = 10000
DefaultUnifiedUploadBufferSize is the default UnifiedUploadBufferSize.
const DefaultUnifiedUploadTickDuration = UnifiedUploadTickDuration(50 * time.Millisecond)
DefaultUnifiedUploadTickDuration is the default UnifiedUploadTickDuration.
const (
// HomeDirMacro is replaced by the current user's home dir in the CredFile dial parameter.
HomeDirMacro = "${HOME}"
)
Variables ¶
var DefaultRPCTimeouts = map[string]time.Duration{ "default": 20 * time.Second, "GetCapabilities": 5 * time.Second, "BatchUpdateBlobs": time.Minute, "BatchReadBlobs": time.Minute, "GetTree": time.Minute, "Execute": 0, "WaitExecution": 0, }
DefaultRPCTimeouts contains the default timeout of various RPC calls to RBE.
var ErrEmptySegment = errors.New("empty segment in resoure name")
ErrEmptySegment indicates an attempt to construct a resource name with an empty segment.
Functions ¶
func NewCompressedWriteBuffer ¶
NewCompressedWriteBuffer wraps an io.Writer, accepting compressed contents and writing the decompressed contents to it.
Types ¶
type Action ¶
type Action struct { // Args are the command-line arguments to start the process. The first argument is the process // name, and the rest are its arguments. Args []string // EnvVars are the variables to add to the process's environment. EnvVars map[string]string // InputRoot and InputFiles contain the details of the input tree, in remote execution format. // They should normally be constructed through the PackageTree function. InputRoot digest.Digest InputFiles map[digest.Digest][]byte // OutputFiles is a list of output files requested (full paths). OutputFiles []string // OutputDirs is a list of output directories requested (full paths). OutputDirs []string // DockerImage is a docker:// URL to the docker image in which execution will take place. DockerImage string // Timeout is the maximum execution time for the action. Note that it's not an overall timeout on // the process, since there may be additional time for transferring files, waiting for a worker to // become available, or other overhead. // // If 0, the server's default timeout is used. Timeout time.Duration // DoNotCache, if true, indicates that the result of this action should never be cached. It // implies SkipCache. DoNotCache bool // SkipCache, if true, indicates that this action should be executed even if there is a copy of // its result in the action cache that could be used instead. SkipCache bool }
Action encodes the full details of an action to be sent to the remote execution service for execution. It corresponds roughly, but not exactly, to the Action proto used by the Remote Execution API.
type AuthType ¶
type AuthType int
AuthType indicates the type of authentication being used.
const ( // UnknownAuth refers to unknown authentication type. UnknownAuth AuthType = iota // NoAuth refers to no authentication when connecting to the RBE service. NoAuth // ExternalTokenAuth is used to connect to the RBE service. ExternalTokenAuth // CredsFileAuth refers to a JSON credentials file used to connect to the RBE service. CredsFileAuth // ApplicationDefaultCredsAuth refers to Google Application default credentials that are // used to connect to the RBE service. ApplicationDefaultCredsAuth // GCECredsAuth refers to GCE machine credentials that are // used to connect to the RBE service. GCECredsAuth )
func Dial ¶
func Dial(ctx context.Context, endpoint string, params DialParams) (*grpc.ClientConn, AuthType, error)
Dial dials a given endpoint and returns the grpc connection that is established.
func DialRaw ¶
func DialRaw(ctx context.Context, params DialParams) (*grpc.ClientConn, AuthType, error)
DialRaw dials a remote execution service and returns the grpc connection that is established. TODO(olaola): remove this overload when all clients use Dial.
type CASConcurrency ¶
type CASConcurrency int
CASConcurrency is the number of simultaneous requests that will be issued for CAS upload and download operations.
func (CASConcurrency) Apply ¶
func (cy CASConcurrency) Apply(c *Client)
Apply sets the CASConcurrency flag on a client.
type ChunkMaxSize ¶
type ChunkMaxSize int
ChunkMaxSize is maximum chunk size to use in Bytestream wrappers.
func (ChunkMaxSize) Apply ¶
func (s ChunkMaxSize) Apply(c *Client)
Apply sets the client's maximal chunk size s.
type Client ¶
type Client struct { // InstanceName is the instance name for the targeted remote execution instance; e.g. for Google // RBE: "projects/<foo>/instances/default_instance". // It should NOT be used to construct resource names, but rather only for reusing the instance name as is. // Use the ResourceName method to create correctly formatted resource names. InstanceName string // Retrier is the Retrier that is used for RPCs made by this client. // // These fields are logically "protected" and are intended for use by extensions of Client. Retrier *Retrier Connection *grpc.ClientConn CASConnection *grpc.ClientConn // Can be different from Connection if a separate CAS endpoint is provided. // StartupCapabilities denotes whether to load ServerCapabilities on startup. StartupCapabilities StartupCapabilities // LegacyExecRootRelativeOutputs denotes whether outputs are relative to the exec root. LegacyExecRootRelativeOutputs LegacyExecRootRelativeOutputs // ChunkMaxSize is maximum chunk size to use for CAS uploads/downloads. ChunkMaxSize ChunkMaxSize // CompressedBytestreamThreshold is the threshold in bytes for which blobs are read and written // compressed. Use 0 for all writes being compressed, and a negative number for all operations being // uncompressed. CompressedBytestreamThreshold CompressedBytestreamThreshold // UploadCompressionPredicate is a function called to decide whether a blob should be compressed for upload. UploadCompressionPredicate UploadCompressionPredicate // MaxBatchDigests is maximum amount of digests to batch in upload and download operations. MaxBatchDigests MaxBatchDigests // MaxQueryBatchDigests is maximum amount of digests to batch in CAS query operations. MaxQueryBatchDigests MaxQueryBatchDigests // MaxBatchSize is maximum size in bytes of a batch request for batch operations. MaxBatchSize MaxBatchSize // DirMode is mode used to create directories. DirMode os.FileMode // ExecutableMode is mode used to create executable files. 
ExecutableMode os.FileMode // RegularMode is mode used to create non-executable files. RegularMode os.FileMode // UtilizeLocality is to specify whether client downloads files utilizing disk access locality. UtilizeLocality UtilizeLocality // UnifiedUploads specifies whether the client uploads files in the background. UnifiedUploads UnifiedUploads // UnifiedUploadBufferSize specifies when the unified upload daemon flushes the pending requests. UnifiedUploadBufferSize UnifiedUploadBufferSize // UnifiedUploadTickDuration specifies how often the unified upload daemon flushes the pending requests. UnifiedUploadTickDuration UnifiedUploadTickDuration // UnifiedDownloads specifies whether the client downloads files in the background. UnifiedDownloads UnifiedDownloads // UnifiedDownloadBufferSize specifies when the unified download daemon flushes the pending requests. UnifiedDownloadBufferSize UnifiedDownloadBufferSize // UnifiedDownloadTickDuration specifies how often the unified download daemon flushes the pending requests. UnifiedDownloadTickDuration UnifiedDownloadTickDuration // TreeSymlinkOpts controls how symlinks are handled when constructing a tree. TreeSymlinkOpts *TreeSymlinkOpts // contains filtered or unexported fields }
Client is a client to several services, including remote execution and services used in conjunction with remote execution. A Client must be constructed by calling Dial() or NewClient() rather than attempting to assemble it directly.
Unless specified otherwise, and provided the fields are not modified, a Client is safe for concurrent use.
func NewClient ¶
func NewClient(ctx context.Context, instanceName string, params DialParams, opts ...Opt) (*Client, error)
NewClient connects to a remote execution service and returns a client suitable for higher-level functionality.
func NewClientFromConnection ¶
func NewClientFromConnection(ctx context.Context, instanceName string, conn, casConn *grpc.ClientConn, opts ...Opt) (*Client, error)
NewClientFromConnection creates a client from gRPC connections to a remote execution service and a cas service.
func (*Client) BatchDownloadBlobs ¶
func (c *Client) BatchDownloadBlobs(ctx context.Context, dgs []digest.Digest) (map[digest.Digest][]byte, error)
BatchDownloadBlobs downloads a number of blobs from the CAS to memory. They must collectively be below the maximum total size for a batch read, which is about 4 MB (see MaxBatchSize). Digests must be computed in advance by the caller. In case multiple errors occur during the blob read, the last error will be returned.
func (*Client) BatchDownloadBlobsWithStats ¶
func (*Client) BatchReadBlobs ¶
func (c *Client) BatchReadBlobs(ctx context.Context, req *repb.BatchReadBlobsRequest) (res *repb.BatchReadBlobsResponse, err error)
BatchReadBlobs wraps the underlying call with specific client options. NOTE that its retry logic ignores the per-blob errors embedded in the response. It is recommended to use BatchDownloadBlobs instead.
func (*Client) BatchUpdateBlobs ¶
func (c *Client) BatchUpdateBlobs(ctx context.Context, req *repb.BatchUpdateBlobsRequest) (res *repb.BatchUpdateBlobsResponse, err error)
BatchUpdateBlobs wraps the underlying call with specific client options. NOTE that its retry logic ignores the per-blob errors embedded in the response; you probably want to use BatchWriteBlobs() instead.
func (*Client) BatchWriteBlobs ¶
BatchWriteBlobs (over)writes specified blobs to the CAS, regardless if they already exist.
The collective size must be below the maximum total size for a batch upload, which is about 4 MB (see MaxBatchSize). In case multiple errors occur during the blob upload, the last error is returned.
func (*Client) CallWithTimeout ¶
func (c *Client) CallWithTimeout(ctx context.Context, rpcName string, f func(ctx context.Context) error) error
CallWithTimeout executes the given function f with a context that times out after an RPC timeout.
This method is logically "protected" and is intended for use by extensions of Client.
func (*Client) CancelOperation ¶
func (c *Client) CancelOperation(ctx context.Context, req *oppb.CancelOperationRequest) (res *emptypb.Empty, err error)
CancelOperation wraps the underlying call with specific client options.
func (*Client) CheckActionCache ¶
func (c *Client) CheckActionCache(ctx context.Context, acDg *repb.Digest) (*repb.ActionResult, error)
CheckActionCache queries remote action cache, returning an ActionResult or nil if it doesn't exist.
func (*Client) CheckCapabilities ¶
CheckCapabilities verifies that this client can work with the remote server in terms of API version and digest function. It sets some client parameters according to remote server preferences, like MaxBatchSize.
func (*Client) ComputeMerkleTree ¶
func (c *Client) ComputeMerkleTree(ctx context.Context, execRoot, workingDir, remoteWorkingDir string, is *command.InputSpec, cache filemetadata.Cache) (root digest.Digest, inputs []*uploadinfo.Entry, stats *TreeStats, err error)
ComputeMerkleTree packages an InputSpec into uploadable inputs, returned as a slice of uploadinfo.Entry pointers.
func (*Client) ComputeOutputsToUpload ¶
func (c *Client) ComputeOutputsToUpload(execRoot, workingDir string, paths []string, cache filemetadata.Cache, sb command.SymlinkBehaviorType, nodeProperties map[string]*cpb.NodeProperties) (map[digest.Digest]*uploadinfo.Entry, *repb.ActionResult, error)
ComputeOutputsToUpload transforms the provided local output paths into uploadable entries (uploadinfo.Entry values keyed by digest). The paths have to be relative to execRoot. It also populates the remote ActionResult, packaging output directories as trees where required.
func (*Client) DeleteOperation ¶
func (c *Client) DeleteOperation(ctx context.Context, req *oppb.DeleteOperationRequest) (res *emptypb.Empty, err error)
DeleteOperation wraps the underlying call with specific client options.
func (*Client) DownloadActionOutputs ¶
func (c *Client) DownloadActionOutputs(ctx context.Context, resPb *repb.ActionResult, outDir string, cache filemetadata.Cache) (*MovedBytesMetadata, error)
DownloadActionOutputs downloads the output files and directories in the given action result. It returns the number of logical and real bytes downloaded, which may be different from the sum of the sizes of the files due to deduplication and compression.
func (*Client) DownloadDirectory ¶
func (c *Client) DownloadDirectory(ctx context.Context, d digest.Digest, outDir string, cache filemetadata.Cache) (map[string]*TreeOutput, *MovedBytesMetadata, error)
DownloadDirectory downloads the entire directory of the given digest. It returns the number of logical and real bytes downloaded, which may be different from the sum of the sizes of the files due to deduplication and compression.
func (*Client) DownloadFiles ¶
func (c *Client) DownloadFiles(ctx context.Context, outDir string, outputs map[digest.Digest]*TreeOutput) (*MovedBytesMetadata, error)
DownloadFiles downloads the output files under |outDir|. It returns the number of logical and real bytes downloaded, which may be different from the sum of the sizes of the files due to deduplication and compression.
func (*Client) DownloadOutputs ¶
func (c *Client) DownloadOutputs(ctx context.Context, outs map[string]*TreeOutput, outDir string, cache filemetadata.Cache) (*MovedBytesMetadata, error)
DownloadOutputs downloads the specified outputs. It returns the number of logical and real bytes downloaded, which may be different from the sum of the sizes of the files due to deduplication and compression.
func (*Client) Execute ¶
func (c *Client) Execute(ctx context.Context, req *repb.ExecuteRequest) (res regrpc.Execution_ExecuteClient, err error)
Execute wraps the underlying call with specific client options. The wrapper is here for completeness to provide access to the low-level RPCs. Prefer using higher-level ExecuteAndWait instead, as it includes retries/timeouts handling.
func (*Client) ExecuteAction ¶
ExecuteAction performs all of the steps necessary to execute an action, including checking the cache if applicable, uploading necessary protos and inputs to the CAS, queueing the action, and waiting for the result.
ExecuteAction may block for a long time while the action is in progress. Currently, two-phase queue-wait is not supported; the token necessary to query the job is not provided to users.
This method MAY return a non-nil ActionResult along with a non-nil error if the action failed. The ActionResult may include, for example, the stdout/stderr digest from the attempt.
ExecuteAction is a convenience method which wraps both PrepAction and ExecuteAndWait, along with other steps such as uploading extra inputs and parsing Operation protos.
func (*Client) ExecuteAndWait ¶
func (c *Client) ExecuteAndWait(ctx context.Context, req *repb.ExecuteRequest) (op *oppb.Operation, err error)
ExecuteAndWait calls Execute on the underlying client and WaitExecution if necessary. It returns the completed operation or an error.
The retry logic is complicated. Assuming retries are enabled, we want the retry to call WaitExecution if there's an Operation "in progress", and to call Execute otherwise. In practice that means:
- If an error occurs before the first operation is returned, or after the final operation is returned (i.e. the one with op.Done==true), retry by calling Execute again.
- Otherwise, retry by calling WaitExecution with the last operation name.
In addition, we want the retrier to trigger based on certain operation statuses as well as on explicit errors. (The shouldRetry function knows which statuses.) We do this by mapping statuses, if present, to errors inside the closure and then throwing away such "fake" errors outside the closure (if we ran out of retries or if there was never a retrier enabled). The exception is deadline-exceeded statuses, which we never give to the retrier (and hence will always propagate directly to the caller).
func (*Client) ExecuteAndWaitProgress ¶
func (c *Client) ExecuteAndWaitProgress(ctx context.Context, req *repb.ExecuteRequest, progress func(metadata *repb.ExecuteOperationMetadata)) (op *oppb.Operation, err error)
ExecuteAndWaitProgress calls Execute on the underlying client and WaitExecution if necessary. It returns the completed operation or an error. The supplied callback function is called for each message received to update the state of the remote action.
func (*Client) FindMissingBlobs ¶
func (c *Client) FindMissingBlobs(ctx context.Context, req *repb.FindMissingBlobsRequest) (res *repb.FindMissingBlobsResponse, err error)
FindMissingBlobs wraps the underlying call with specific client options.
func (*Client) FlattenActionOutputs ¶
func (c *Client) FlattenActionOutputs(ctx context.Context, ar *repb.ActionResult) (map[string]*TreeOutput, error)
FlattenActionOutputs collects and flattens all the outputs of an action. It downloads the output directory metadata, if required, but not the leaf file blobs.
func (*Client) FlattenTree ¶
FlattenTree takes a Tree message and calculates the relative paths of all the files to the tree root. Note that only files/symlinks/empty directories are included in the returned slice, not the intermediate directories. Directories containing only other directories will be omitted.
func (*Client) GetActionResult ¶
func (c *Client) GetActionResult(ctx context.Context, req *repb.GetActionResultRequest) (res *repb.ActionResult, err error)
GetActionResult wraps the underlying call with specific client options.
func (*Client) GetBackendCapabilities ¶
func (c *Client) GetBackendCapabilities(ctx context.Context, conn *grpc.ClientConn, req *repb.GetCapabilitiesRequest) (res *repb.ServerCapabilities, err error)
GetBackendCapabilities returns the capabilities for a specific server connection (either the main connection or the CAS connection).
func (*Client) GetCapabilities ¶
GetCapabilities returns the capabilities for the targeted servers. If the CAS URL was set differently to the execution server then the CacheCapabilities will be determined from that; ExecutionCapabilities will always come from the main URL.
func (*Client) GetCapabilitiesForInstance ¶
func (c *Client) GetCapabilitiesForInstance(ctx context.Context, instance string) (res *repb.ServerCapabilities, err error)
GetCapabilitiesForInstance returns the capabilities for the targeted servers. If the CAS URL was set differently to the execution server then the CacheCapabilities will be determined from that; ExecutionCapabilities will always come from the main URL.
func (*Client) GetDirectoryTree ¶
func (c *Client) GetDirectoryTree(ctx context.Context, d *repb.Digest) (result []*repb.Directory, err error)
GetDirectoryTree returns the entire directory tree rooted at the given digest (which must target a Directory stored in the CAS).
func (*Client) GetOperation ¶
func (c *Client) GetOperation(ctx context.Context, req *oppb.GetOperationRequest) (res *oppb.Operation, err error)
GetOperation wraps the underlying call with specific client options.
func (*Client) GetTree ¶
func (c *Client) GetTree(ctx context.Context, req *repb.GetTreeRequest) (res regrpc.ContentAddressableStorage_GetTreeClient, err error)
GetTree wraps the underlying call with specific client options. The wrapper is here for completeness to provide access to the low-level RPCs. Prefer using higher-level GetDirectoryTree instead, as it includes retries/timeouts handling.
func (*Client) ListOperations ¶
func (c *Client) ListOperations(ctx context.Context, req *oppb.ListOperationsRequest) (res *oppb.ListOperationsResponse, err error)
ListOperations wraps the underlying call with specific client options.
func (*Client) MissingBlobs ¶
func (c *Client) MissingBlobs(ctx context.Context, digests []digest.Digest) ([]digest.Digest, error)
MissingBlobs queries the CAS to determine if it has the specified blobs. Returns a slice of missing blobs.
func (*Client) NgUpload ¶
func (c *Client) NgUpload(ctx context.Context, reqs ...casng.UploadRequest) ([]digest.Digest, casng.Stats, error)
NgUpload delegates to Upload of the casng package.
func (*Client) NgUploadTree ¶
func (c *Client) NgUploadTree(ctx context.Context, execRoot impath.Absolute, workingDir, remoteWorkingDir impath.Relative, reqs ...casng.UploadRequest) (rootDigest digest.Digest, uploaded []digest.Digest, stats casng.Stats, err error)
NgUploadTree delegates to UploadTree of the casng package.
func (*Client) PrepAction ¶
func (c *Client) PrepAction(ctx context.Context, ac *Action) (*repb.Digest, *repb.ActionResult, error)
PrepAction constructs the Command and Action protos, checks the action cache if appropriate, and uploads the action if the cache was not checked or if there was no cache hit. If successful, PrepAction returns the digest of the Action and a (possibly nil) pointer to an ActionResult representing the result of the cache check, if any.
func (*Client) QueryWriteStatus ¶
func (c *Client) QueryWriteStatus(ctx context.Context, req *bspb.QueryWriteStatusRequest) (res *bspb.QueryWriteStatusResponse, err error)
QueryWriteStatus wraps the underlying call with specific client options.
func (*Client) RPCOpts ¶
func (c *Client) RPCOpts() []grpc.CallOption
RPCOpts returns the default RPC options that should be used for calls made with this client.
This method is logically "protected" and is intended for use by extensions of Client.
func (*Client) Read ¶
func (c *Client) Read(ctx context.Context, req *bspb.ReadRequest) (res bsgrpc.ByteStream_ReadClient, err error)
Read wraps the underlying call with specific client options. The wrapper is here for completeness to provide access to the low-level RPCs. Prefer using higher-level functions such as ReadBlob(ToFile) instead, as they include retries/timeouts handling.
func (*Client) ReadBlob ¶
func (c *Client) ReadBlob(ctx context.Context, d digest.Digest) ([]byte, *MovedBytesMetadata, error)
ReadBlob fetches a blob from the CAS into a byte slice. Returns the size of the blob and the amount of bytes moved through the wire.
func (*Client) ReadBlobRange ¶
func (c *Client) ReadBlobRange(ctx context.Context, d digest.Digest, offset, limit int64) ([]byte, *MovedBytesMetadata, error)
ReadBlobRange fetches a partial blob from the CAS into a byte slice, starting from offset bytes and including at most limit bytes (or no limit if limit==0). The offset must be non-negative and no greater than the size of the entire blob. The limit must not be negative, but offset+limit may be greater than the size of the entire blob.
func (*Client) ReadBlobToFile ¶
func (c *Client) ReadBlobToFile(ctx context.Context, d digest.Digest, fpath string) (*MovedBytesMetadata, error)
ReadBlobToFile fetches a blob with a provided digest name from the CAS, saving it into a file. It returns a MovedBytesMetadata describing the bytes read.
func (*Client) ReadBytes ¶
ReadBytes fetches a resource's contents into a byte slice.
ReadBytes panics with ErrTooLarge if an attempt is made to read a resource with contents too large to fit into a byte array.
func (*Client) ReadProto ¶
func (c *Client) ReadProto(ctx context.Context, d digest.Digest, msg proto.Message) (*MovedBytesMetadata, error)
ReadProto reads a blob from the CAS and unmarshals it into the given message. Returns the size of the proto and the amount of bytes moved through the wire.
func (*Client) ReadResourceTo ¶
ReadResourceTo writes a resource's contents to a Writer.
func (*Client) ReadResourceToFile ¶
ReadResourceToFile fetches a resource's contents, saving it into a file.
The provided resource name must be a child resource of this client's instance, e.g. '/blobs/abc-123/45' (NOT 'projects/foo/bar/baz').
The number of bytes read is returned.
func (*Client) ResourceName ¶
ResourceName constructs a correctly formatted resource name as defined in the spec. No keyword validation is performed since the semantics of the path are defined by the server. See: https://github.com/bazelbuild/remote-apis/blob/cb8058798964f0adf6dbab2f4c2176ae2d653447/build/bazel/remote/execution/v2/remote_execution.proto#L223
func (*Client) ResourceNameCompressedWrite ¶
ResourceNameCompressedWrite generates a valid write resource name. TODO(rubensf): Converge compressor to proto in https://github.com/bazelbuild/remote-apis/pull/168 once that gets merged in.
func (*Client) ResourceNameWrite ¶
ResourceNameWrite generates a valid write resource name.
func (*Client) RunBackgroundTasks ¶
RunBackgroundTasks starts background goroutines for the client.
func (*Client) SupportsActionPlatformProperties ¶
SupportsActionPlatformProperties returns whether the server's RE API version supports the `Action.platform_properties` field.
func (*Client) SupportsCommandOutputPaths ¶
SupportsCommandOutputPaths returns whether the server's RE API version supports the `Command.output_paths` field.
func (*Client) UpdateActionResult ¶
func (c *Client) UpdateActionResult(ctx context.Context, req *repb.UpdateActionResultRequest) (res *repb.ActionResult, err error)
UpdateActionResult wraps the underlying call with specific client options.
func (*Client) UploadIfMissing ¶
func (c *Client) UploadIfMissing(ctx context.Context, entries ...*uploadinfo.Entry) ([]digest.Digest, int64, error)
UploadIfMissing writes the missing blobs from those specified to the CAS.
The blobs are first matched against existing ones and only the missing blobs are written. Returns a slice of missing digests that were written and the sum of total bytes moved, which may be different from logical bytes moved (i.e. sum of digest sizes) due to compression.
func (*Client) WaitExecution ¶
func (c *Client) WaitExecution(ctx context.Context, req *repb.WaitExecutionRequest) (res regrpc.Execution_ExecuteClient, err error)
WaitExecution wraps the underlying call with specific client options. The wrapper is here for completeness to provide access to the low-level RPCs. Prefer using higher-level ExecuteAndWait instead, as it includes retries/timeouts handling.
func (*Client) Write ¶
Write wraps the underlying call with specific client options. The wrapper is here for completeness to provide access to the low-level RPCs. Prefer using higher-level functions such as WriteBlob(s) instead, as they include retries/timeouts handling.
func (*Client) WriteBlob ¶
WriteBlob (over)writes a blob to the CAS regardless of whether it already exists.
func (*Client) WriteBlobs ¶
WriteBlobs is a proxy method for UploadIfMissing that facilitates specifying a map of digest-to-blob. It's intended for use with PackageTree. TODO(olaola): rethink the API of this layer: * Do we want to allow []byte uploads, or require the user to construct Chunkers? * How to consistently distinguish in the API between should we use GetMissing or not? * Should BatchWrite be a public method at all?
func (*Client) WriteBytes ¶
WriteBytes uploads a byte slice.
func (*Client) WriteBytesAtRemoteOffset ¶
func (c *Client) WriteBytesAtRemoteOffset(ctx context.Context, name string, data []byte, doNotFinalize bool, initialOffset int64) (int64, error)
WriteBytesAtRemoteOffset uploads a byte slice with a given resource name to the CAS at an arbitrary offset; note that retries still resend from the initial offset. As of 2023-02-08, ByteStream.WriteRequest.FinishWrite and an arbitrary offset are supported for uploads with a LogStream resource name. If doNotFinalize is set to true, ByteStream.WriteRequest.FinishWrite will be set to false.
type CompressedBlobInfo ¶
CompressedBlobInfo is primarily used to store stats about compressed blob size in addition to the actual blob data.
type CompressedBytestreamThreshold ¶
type CompressedBytestreamThreshold int64
CompressedBytestreamThreshold is the threshold for compressing blobs when writing/reading. See comment in related field on the Client struct.
func (CompressedBytestreamThreshold) Apply ¶
func (s CompressedBytestreamThreshold) Apply(c *Client)
Apply sets the client's compressed bytestream threshold to s.
type DialParams ¶
type DialParams struct { // Service contains the address of remote execution service. Service string // CASService contains the address of the CAS service, if it is separate from // the remote execution service. CASService string // UseApplicationDefault indicates that the default credentials should be used. UseApplicationDefault bool // UseComputeEngine indicates that the default CE credentials should be used. UseComputeEngine bool // UseExternalAuthToken indicates whether an externally specified auth token should be used. // If set to true, ExternalPerRPCCreds should also be non-nil. UseExternalAuthToken bool // ExternalPerRPCCreds refers to the per RPC credentials that should be used for each RPC. ExternalPerRPCCreds *PerRPCCreds // CredFile is the JSON file that contains the credentials for RPCs. CredFile string // ActAsAccount is the service account to act as when making RPC calls. ActAsAccount string // NoSecurity is true if there is no security: no credentials are configured // (NoAuth is implied) and grpc.WithInsecure() is passed in. Should only be // used in test code. NoSecurity bool // NoAuth is true if TLS is enabled (NoSecurity is false) but the client does // not need to authenticate with the server. NoAuth bool // TransportCredsOnly is true if it's the caller's responsibility to set per-RPC credentials // on individual calls. This overrides ActAsAccount, UseApplicationDefault, and UseComputeEngine. // This is not the same as NoSecurity, as transport credentials will still be set. TransportCredsOnly bool // TLSCACertFile is the PEM file that contains TLS root certificates. TLSCACertFile string // TLSServerName overrides the server name sent in TLS, if set to a non-empty string. TLSServerName string // DialOpts defines the set of gRPC DialOptions to apply, in addition to any used internally. DialOpts []grpc.DialOption // MaxConcurrentRequests specifies the maximum number of concurrent RPCs on a single connection. 
MaxConcurrentRequests uint32 // MaxConcurrentStreams specifies the maximum number of concurrent stream RPCs on a single connection. MaxConcurrentStreams uint32 // TLSClientAuthCert specifies the public key in PEM format for using mTLS auth to connect to the RBE service. // // If this is specified, TLSClientAuthKey must also be specified. TLSClientAuthCert string // TLSClientAuthKey specifies the private key for using mTLS auth to connect to the RBE service. // // If this is specified, TLSClientAuthCert must also be specified. TLSClientAuthKey string }
DialParams contains all the parameters that Dial needs.
type ExecutableMode ¶
ExecutableMode is mode used to create executable files.
func (ExecutableMode) Apply ¶
func (m ExecutableMode) Apply(c *Client)
Apply sets the client's ExecutableMode to m.
type InitError ¶
type InitError struct { // Err refers to the underlying client initialization error. Err error // AuthUsed stores the type of authentication used to connect to RBE. AuthUsed AuthType }
InitError is used to wrap the error returned when initializing a new client to also indicate the type of authentication used.
type LegacyExecRootRelativeOutputs ¶
type LegacyExecRootRelativeOutputs bool
LegacyExecRootRelativeOutputs controls whether the client uses legacy behavior of treating output paths as relative to the exec root instead of the working directory.
func (LegacyExecRootRelativeOutputs) Apply ¶
func (l LegacyExecRootRelativeOutputs) Apply(c *Client)
Apply sets the LegacyExecRootRelativeOutputs flag on a client.
type MaxBatchDigests ¶
type MaxBatchDigests int
MaxBatchDigests is the maximum number of digests to batch in upload and download operations.
func (MaxBatchDigests) Apply ¶
func (s MaxBatchDigests) Apply(c *Client)
Apply sets the client's maximal batch digests to s.
type MaxBatchSize ¶
type MaxBatchSize int64
MaxBatchSize is the maximum size in bytes of a batch request for batch operations.
func (MaxBatchSize) Apply ¶
func (s MaxBatchSize) Apply(c *Client)
Apply sets the client's maximum batch size to s.
type MaxQueryBatchDigests ¶
type MaxQueryBatchDigests int
MaxQueryBatchDigests is the maximum number of digests to batch in query operations.
func (MaxQueryBatchDigests) Apply ¶
func (s MaxQueryBatchDigests) Apply(c *Client)
Apply sets the client's maximal query batch digests to s.
type MovedBytesMetadata ¶
type MovedBytesMetadata struct { // Requested is the sum of the sizes in bytes for all the uncompressed // blobs needed by the execution. It includes bytes that might have // been deduped and thus not passed through the wire. Requested int64 // LogicalMoved is the sum of the sizes in bytes of the uncompressed // versions of the blobs passed through the wire. It does not include // bytes for blobs that were de-duped. LogicalMoved int64 // RealMoved is the sum of sizes in bytes for all blobs passed // through the wire in the format they were passed through (eg // compressed). RealMoved int64 // Cached is amount of logical bytes that we did not have to move // through the wire because they were de-duped. Cached int64 }
MovedBytesMetadata represents the bytes moved in CAS related requests.
type Opt ¶
type Opt interface {
Apply(*Client)
}
Opt is an option that can be passed to Dial in order to configure the behaviour of the client.
type PerRPCCreds ¶
type PerRPCCreds struct {
Creds credentials.PerRPCCredentials
}
PerRPCCreds sets per-call options that will be set on all RPCs to the underlying connection.
func (*PerRPCCreds) Apply ¶
func (p *PerRPCCreds) Apply(c *Client)
Apply saves the per-RPC creds in the Client.
type RPCTimeouts ¶
RPCTimeouts is an Opt that sets the per-RPC deadline. The keys are RPC names. The "default" key, if present, is the default timeout. 0 values are valid and indicate no timeout.
func (RPCTimeouts) Apply ¶
func (d RPCTimeouts) Apply(c *Client)
Apply applies the timeouts to a Client. It overrides the provided values, but doesn't remove/alter any other present values.
type RegularMode ¶
RegularMode is mode used to create non-executable files.
func (RegularMode) Apply ¶
func (m RegularMode) Apply(c *Client)
Apply sets the client's RegularMode to m.
type Retrier ¶
type Retrier struct { Backoff retry.BackoffPolicy ShouldRetry retry.ShouldRetry }
Retrier applied to all client requests.
func RetryTransient ¶
func RetryTransient() *Retrier
RetryTransient is a default retry policy for transient status codes.
type StartupCapabilities ¶
type StartupCapabilities bool
StartupCapabilities controls whether the client should attempt to fetch the remote server capabilities on New. If set to true, some configuration such as MaxBatchSize is set according to the remote server capabilities instead of using the provided values.
func (StartupCapabilities) Apply ¶
func (s StartupCapabilities) Apply(c *Client)
Apply sets the StartupCapabilities flag on a client.
type StatusError ¶
type StatusError struct {
// contains filtered or unexported fields
}
StatusError is the same as status.Error except it includes the error details in the error message.
func StatusDetailedError ¶
func StatusDetailedError(st *status.Status) *StatusError
StatusDetailedError creates a StatusError from Status, which is the same as st.Err() except it includes the error details in the error message.
func (*StatusError) Error ¶
func (e *StatusError) Error() string
func (*StatusError) GRPCStatus ¶
func (e *StatusError) GRPCStatus() *status.Status
GRPCStatus returns the Status represented by e.
func (*StatusError) Is ¶
func (e *StatusError) Is(target error) bool
Is implements errors.Is functionality. A StatusError is equivalent if the code and message are identical.
type TreeOutput ¶
type TreeOutput struct { Digest digest.Digest Path string IsExecutable bool IsEmptyDirectory bool SymlinkTarget string NodeProperties *repb.NodeProperties }
TreeOutput represents a leaf output node in a nested directory structure (a file, a symlink, or an empty directory).
type TreeStats ¶
type TreeStats struct { // The total number of input files. InputFiles int // The total number of input directories. InputDirectories int // The total number of input symlinks InputSymlinks int // The overall number of bytes from all the inputs. TotalInputBytes int64 }
TreeStats contains various stats/metadata of the constructed Merkle tree. Note that these stats count the overall input tree, even if some parts of it are not unique. For example, if a file "foo" of 10 bytes occurs 5 times in the tree, it will be counted as 5 InputFiles and 50 TotalInputBytes.
type TreeSymlinkOpts ¶
type TreeSymlinkOpts struct { // By default, a symlink is converted into its targeted file. // If true, preserve the symlink. Preserved bool // If true, the symlink target (if not dangling) is followed. FollowsTarget bool // If true, overrides Preserved=true for symlinks that point outside the // exec root, converting them into their targeted files while preserving // symlinks that point to files within the exec root. Has no effect if // Preserved=false, as all symlinks are materialized. MaterializeOutsideExecRoot bool }
TreeSymlinkOpts controls how symlinks are handled when constructing a tree.
func DefaultTreeSymlinkOpts ¶
func DefaultTreeSymlinkOpts() *TreeSymlinkOpts
DefaultTreeSymlinkOpts returns a default TreeSymlinkOpts object.
func (*TreeSymlinkOpts) Apply ¶
func (o *TreeSymlinkOpts) Apply(c *Client)
Apply sets the client's TreeSymlinkOpts.
type UnifiedDownloadBufferSize ¶
type UnifiedDownloadBufferSize int
UnifiedDownloadBufferSize is to tune when the daemon for UnifiedDownloads flushes the pending requests.
func (UnifiedDownloadBufferSize) Apply ¶
func (s UnifiedDownloadBufferSize) Apply(c *Client)
Apply sets the client's UnifiedDownloadBufferSize.
type UnifiedDownloadTickDuration ¶
UnifiedDownloadTickDuration is to tune how often the daemon for UnifiedDownloads flushes the pending requests.
func (UnifiedDownloadTickDuration) Apply ¶
func (s UnifiedDownloadTickDuration) Apply(c *Client)
Apply sets the client's UnifiedDownloadTickDuration.
type UnifiedDownloads ¶
type UnifiedDownloads bool
UnifiedDownloads is to specify whether client downloads files in the background, unifying operations between different actions.
func (UnifiedDownloads) Apply ¶
func (s UnifiedDownloads) Apply(c *Client)
Apply sets the client's UnifiedDownloads. Note: it is unsafe to change this property when connections are ongoing.
type UnifiedUploadBufferSize ¶
type UnifiedUploadBufferSize int
UnifiedUploadBufferSize is to tune when the daemon for UnifiedUploads flushes the pending requests.
func (UnifiedUploadBufferSize) Apply ¶
func (s UnifiedUploadBufferSize) Apply(c *Client)
Apply sets the client's UnifiedUploadBufferSize.
type UnifiedUploadTickDuration ¶
UnifiedUploadTickDuration is to tune how often the daemon for UnifiedUploads flushes the pending requests.
func (UnifiedUploadTickDuration) Apply ¶
func (s UnifiedUploadTickDuration) Apply(c *Client)
Apply sets the client's UnifiedUploadTickDuration.
type UnifiedUploads ¶
type UnifiedUploads bool
UnifiedUploads is to specify whether client uploads files in the background, unifying operations between different actions.
func (UnifiedUploads) Apply ¶
func (s UnifiedUploads) Apply(c *Client)
Apply sets the client's UnifiedUploads.
type UploadCompressionPredicate ¶
type UploadCompressionPredicate func(*uploadinfo.Entry) bool
An UploadCompressionPredicate determines whether to compress a blob on upload. Note that the CompressedBytestreamThreshold takes priority over this (i.e. if the blob to be uploaded is smaller than the threshold, this will not be called).
func (UploadCompressionPredicate) Apply ¶
func (cc UploadCompressionPredicate) Apply(c *Client)
Apply sets the client's compression predicate.
type UseBatchCompression ¶
type UseBatchCompression bool
UseBatchCompression is currently set to true when the server has SupportedBatchUpdateCompressors capability and supports ZSTD compression.
func (UseBatchCompression) Apply ¶
func (u UseBatchCompression) Apply(c *Client)
Apply sets the batchCompression flag on a client.
type UseBatchOps ¶
type UseBatchOps bool
UseBatchOps can be set to true to use batch CAS operations when uploading multiple blobs, or false to always use individual ByteStream requests.
func (UseBatchOps) Apply ¶
func (u UseBatchOps) Apply(c *Client)
Apply sets the UseBatchOps flag on a client.
type UtilizeLocality ¶
type UtilizeLocality bool
UtilizeLocality is to specify whether client downloads files utilizing disk access locality.
func (UtilizeLocality) Apply ¶
func (s UtilizeLocality) Apply(c *Client)
Apply sets the client's UtilizeLocality.