Documentation ¶
Index ¶
- Constants
- func CASPutProto(ctx context.Context, blobAccess BlobAccess, message proto.Message, ...) (digest.Digest, error)
- func GetReducedActionDigest(digestFunction digest.Function, action *remoteexecution.Action) (digest.Digest, error)
- func VisitTopologicallySortedTree[TArgument any](r io.Reader, digestFunction digest.Function, maximumDirectorySizeBytes int, ...) error
- type BlobAccess
- func NewActionResultExpiringBlobAccess(blobAccess BlobAccess, clock clock.Clock, maximumMessageSizeBytes int, ...) BlobAccess
- func NewActionResultTimestampInjectingBlobAccess(blobAccess BlobAccess, clock clock.Clock, maximumMessageSizeBytes int) BlobAccess
- func NewAuthorizingBlobAccess(base BlobAccess, ...) BlobAccess
- func NewDemultiplexingBlobAccess(getBackend DemultiplexedBlobAccessGetter) BlobAccess
- func NewEmptyBlobInjectingBlobAccess(base BlobAccess) BlobAccess
- func NewErrorBlobAccess(err error) BlobAccess
- func NewExistenceCachingBlobAccess(base BlobAccess, existenceCache *digest.ExistenceCache) BlobAccess
- func NewHierarchicalInstanceNamesBlobAccess(base BlobAccess) BlobAccess
- func NewMetricsBlobAccess(blobAccess BlobAccess, clock clock.Clock, storageType, backendType string) BlobAccess
- func NewReadCanaryingBlobAccess(source, replica BlobAccess, clock clock.Clock, ...) BlobAccess
- func NewReferenceExpandingBlobAccess(indirectContentAddressableStorage, contentAddressableStorage BlobAccess, ...) BlobAccess
- func NewZIPReadingBlobAccess(capabilitiesProvider capabilities.Provider, ...) BlobAccess
- type DemultiplexedBlobAccessGetter
- type ReadBufferFactory
- type ReadWriterAt
- type TreeDirectoryVisitor
- type ZIPWritingBlobAccess
- func (ba *ZIPWritingBlobAccess) Finalize() error
- func (ba *ZIPWritingBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (digest.Set, error)
- func (ba *ZIPWritingBlobAccess) Get(ctx context.Context, blobDigest digest.Digest) buffer.Buffer
- func (ba *ZIPWritingBlobAccess) GetFromComposite(ctx context.Context, parentDigest, childDigest digest.Digest, ...) buffer.Buffer
- func (ba *ZIPWritingBlobAccess) Put(ctx context.Context, blobDigest digest.Digest, b buffer.Buffer) error
Constants ¶
const (
	TreeRootFieldNumber     protowire.Number = 1
	TreeChildrenFieldNumber protowire.Number = 2
)
The Protobuf field numbers of the REv2 Tree's "root" and "children" fields. These are used in combination with util.VisitProtoBytesFields() to be able to process REv2 Tree objects in a streaming manner.
const RecommendedFindMissingDigestsCount = 10000
RecommendedFindMissingDigestsCount corresponds to the maximum number of digests that is safe to provide to BlobAccess.FindMissing() without running into size limits of underlying protocols.
Assuming that a typical REv2 Digest message is ~70 bytes in size, this will cause us to generate gRPC calls with messages of up to ~700 KB in size. This seems like a safe limit, because most gRPC implementations limit the maximum message size to a small number of megabytes (4 MB for Java, 16 MB for Go).
TODO: Would it make sense to replace BlobAccess.FindMissing() with a streaming API that abstracts away the maximum digests count? That way we can ensure proper batching of digests, even when sharding is used, ExistenceCachingBlobAccess has a high hit rate, etc.
It would be nice if such an API also supported decomposition of large objects natively. See the "Future work" section in ADR#3 for details: https://github.com/buildbarn/bb-adrs/blob/master/0003-cas-decomposition.md#future-work
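As a rough sketch of how a caller could honour this limit today, the function below splits a list of digests into batches of at most RecommendedFindMissingDigestsCount before calling FindMissing(). The function name is illustrative, and the SetBuilder, Set.Items() and EmptySet helpers from the pkg/digest package are assumed to behave as described there.

package example

import (
	"context"

	"github.com/buildbarn/bb-storage/pkg/blobstore"
	"github.com/buildbarn/bb-storage/pkg/digest"
)

// findMissingBatched calls FindMissing() repeatedly, never providing
// more than RecommendedFindMissingDigestsCount digests per call, and
// returns the union of all missing digests.
func findMissingBatched(ctx context.Context, blobAccess blobstore.BlobAccess, digests []digest.Digest) (digest.Set, error) {
	missing := digest.NewSetBuilder()
	for len(digests) > 0 {
		batchSize := len(digests)
		if batchSize > blobstore.RecommendedFindMissingDigestsCount {
			batchSize = blobstore.RecommendedFindMissingDigestsCount
		}
		batch := digest.NewSetBuilder()
		for _, d := range digests[:batchSize] {
			batch.Add(d)
		}
		digests = digests[batchSize:]

		batchMissing, err := blobAccess.FindMissing(ctx, batch.Build())
		if err != nil {
			return digest.EmptySet, err
		}
		for _, d := range batchMissing.Items() {
			missing.Add(d)
		}
	}
	return missing.Build(), nil
}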
Variables ¶
This section is empty.
Functions ¶
func CASPutProto ¶
func CASPutProto(ctx context.Context, blobAccess BlobAccess, message proto.Message, digestFunction digest.Function) (digest.Digest, error)
CASPutProto is a helper function for storing Protobuf messages in the Content Addressable Storage (CAS). It computes the digest of the message and stores it under that key. The digest is then returned, so that the object may be referenced.
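A minimal usage sketch, with an illustrative function name: it stores an REv2 Command in the CAS and then stores an Action that references it. The digest.Function is assumed to be obtained elsewhere (e.g., from the request's instance name), and digest.Digest's GetProto() is assumed to yield the corresponding REv2 Digest message.

package example

import (
	"context"

	remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
	"github.com/buildbarn/bb-storage/pkg/blobstore"
	"github.com/buildbarn/bb-storage/pkg/digest"
)

// storeCommandAndAction stores an REv2 Command in the CAS and then
// stores an Action referencing it, returning the Action's digest.
func storeCommandAndAction(ctx context.Context, cas blobstore.BlobAccess, digestFunction digest.Function, command *remoteexecution.Command, inputRootDigest *remoteexecution.Digest) (digest.Digest, error) {
	commandDigest, err := blobstore.CASPutProto(ctx, cas, command, digestFunction)
	if err != nil {
		return digest.Digest{}, err
	}
	// Reference the stored Command from an Action and store that too.
	return blobstore.CASPutProto(ctx, cas, &remoteexecution.Action{
		CommandDigest:   commandDigest.GetProto(),
		InputRootDigest: inputRootDigest,
	}, digestFunction)
}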
func GetReducedActionDigest ¶
func GetReducedActionDigest(digestFunction digest.Function, action *remoteexecution.Action) (digest.Digest, error)
GetReducedActionDigest computes the digest of an Initial Size Class Cache (ISCC) or File System Access Cache (FSAC) object that corresponds to a given Action.
By only considering the Action's command digest and the platform properties when generating the digest, actions with equal command line arguments and environment variables will have the same ISCC/FSAC digest, even if their input roots differ. This should be an adequate heuristic for grouping actions with similar performance characteristics.
func VisitTopologicallySortedTree ¶
func VisitTopologicallySortedTree[TArgument any](r io.Reader, digestFunction digest.Function, maximumDirectorySizeBytes int, rootArgument *TArgument, visitDirectory TreeDirectoryVisitor[TArgument]) error
VisitTopologicallySortedTree iterates over all Directory messages contained in an REv2 Tree object. For each directory, the visitor is capable of tracking state, which it can also build up when parent directories are visited.
This function expects the REv2 Tree object to be topologically sorted, with parents being stored before children. As a result, directories are also visited in topological order.
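The sketch below prints the path of every directory in a Tree, with directoryPath as illustrative per-directory state. It assumes that childArguments[i] corresponds to d.Directories[i] and that 1 MiB is a sufficient size limit for a single Directory message.

package example

import (
	"fmt"
	"io"
	"path"

	remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
	"github.com/buildbarn/bb-storage/pkg/blobstore"
	"github.com/buildbarn/bb-storage/pkg/digest"
)

// directoryPath is the per-directory state built up during the walk:
// the path at which each directory occurs within the tree.
type directoryPath struct {
	path string
}

// printTreePaths prints the path of every directory in a Tree read
// from r, assuming the Tree is topologically sorted.
func printTreePaths(r io.Reader, digestFunction digest.Function) error {
	root := directoryPath{path: "."}
	return blobstore.VisitTopologicallySortedTree(
		r,
		digestFunction,
		/* maximumDirectorySizeBytes = */ 1<<20,
		&root,
		func(d *remoteexecution.Directory, argument *directoryPath, childArguments []*directoryPath) error {
			fmt.Println(argument.path)
			// Propagate state to children before they are visited.
			for i, child := range d.Directories {
				childArguments[i].path = path.Join(argument.path, child.Name)
			}
			return nil
		})
}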
Types ¶
type BlobAccess ¶
type BlobAccess interface {
	capabilities.Provider

	Get(ctx context.Context, digest digest.Digest) buffer.Buffer
	GetFromComposite(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer
	Put(ctx context.Context, digest digest.Digest, b buffer.Buffer) error
	FindMissing(ctx context.Context, digests digest.Set) (digest.Set, error)
}
BlobAccess is an abstraction for a data store that can be used to hold an Action Cache (AC), Content Addressable Storage (CAS), or any other data store that uses keys in the form of digests.
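As a small illustration of the call pattern, the sketch below fetches one object into memory. It assumes buffer.Buffer's ToByteSlice() method; note that Get() itself does not return an error, as errors only surface once the returned buffer is consumed.

package example

import (
	"context"

	"github.com/buildbarn/bb-storage/pkg/blobstore"
	"github.com/buildbarn/bb-storage/pkg/digest"
)

// readBlob fetches a single object into memory, returning an error if
// the object is absent, corrupted, or larger than maximumSizeBytes.
func readBlob(ctx context.Context, blobAccess blobstore.BlobAccess, blobDigest digest.Digest, maximumSizeBytes int) ([]byte, error) {
	return blobAccess.Get(ctx, blobDigest).ToByteSlice(maximumSizeBytes)
}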
func NewActionResultExpiringBlobAccess ¶
func NewActionResultExpiringBlobAccess(blobAccess BlobAccess, clock clock.Clock, maximumMessageSizeBytes int, minimumTimestamp time.Time, minimumValidity, maximumValidityJitter time.Duration) BlobAccess
NewActionResultExpiringBlobAccess creates a decorator for an Action Cache (AC) that automatically expires ActionResult objects after a certain amount of time has passed. This forces targets to be rebuilt periodically.
The expiration time of an ActionResult is computed by considering the 'worker_completed_timestamp' field in ExecutedActionMetadata. Jitter is added to the expiration time to amortize rebuilds. The process for determining the amount of jitter is deterministic, meaning that it is safe to use this decorator in a distributed setting.
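A construction sketch with illustrative parameter values: results expire roughly one week after worker completion, with up to one day of deterministic jitter. clock.SystemClock is assumed to be available from the pkg/clock package.

package example

import (
	"time"

	"github.com/buildbarn/bb-storage/pkg/blobstore"
	"github.com/buildbarn/bb-storage/pkg/clock"
)

// newExpiringAC decorates an Action Cache so that results become
// invalid roughly one week after the worker completed them.
func newExpiringAC(base blobstore.BlobAccess) blobstore.BlobAccess {
	return blobstore.NewActionResultExpiringBlobAccess(
		base,
		clock.SystemClock,
		/* maximumMessageSizeBytes = */ 2*1024*1024,
		/* minimumTimestamp = */ time.Unix(0, 0),
		/* minimumValidity = */ 7*24*time.Hour,
		/* maximumValidityJitter = */ 24*time.Hour)
}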
func NewActionResultTimestampInjectingBlobAccess ¶
func NewActionResultTimestampInjectingBlobAccess(blobAccess BlobAccess, clock clock.Clock, maximumMessageSizeBytes int) BlobAccess
NewActionResultTimestampInjectingBlobAccess creates a decorator for an Action Cache (AC) that sets the execution_metadata.worker_completed_timestamp field to the current time for every ActionResult message written through it, if that field is not set already.
This decorator is necessary to make ActionResultExpiringBlobAccess work reliably, as it depends on this field being set. Not all clients set this field.
func NewAuthorizingBlobAccess ¶
func NewAuthorizingBlobAccess(base BlobAccess, getAuthorizer, putAuthorizer, findMissingAuthorizer auth.Authorizer) BlobAccess
NewAuthorizingBlobAccess creates a new BlobAccess which guards blob accesses by checks with Authorizers. Calls to GetCapabilities() are not checked, for the reason that the exact logic for this differs between the Action Cache (AC) and Content Addressable Storage (CAS).
func NewDemultiplexingBlobAccess ¶
func NewDemultiplexingBlobAccess(getBackend DemultiplexedBlobAccessGetter) BlobAccess
NewDemultiplexingBlobAccess creates a BlobAccess that demultiplexes requests based on the instance names. This can be used to let bb-storage serve as a proxy in front of multiple clusters (e.g., as a single on-premise cache).
For every request, a call is made to a DemultiplexedBlobAccessGetter callback that provides the backend to use and may rewrite the instance name on the outgoing request.
func NewEmptyBlobInjectingBlobAccess ¶
func NewEmptyBlobInjectingBlobAccess(base BlobAccess) BlobAccess
NewEmptyBlobInjectingBlobAccess is a decorator for BlobAccess that causes it to directly process any requests for blobs of size zero. Get() operations immediately return an empty buffer, while Put() operations for such buffers are ignored.
Bazel never attempts to read the empty blob from the Content Addressable Storage, which by itself is harmless. However, it also never attempts to write the empty blob. This is problematic, as it may cause implementations of GetActionResult() and input root population that are unaware of this convention to fail.
This problem remained undetected for a long time, because running at least one build action through bb_worker has a high probability of creating the empty blob in storage explicitly.
The consensus within the Remote APIs working group has been to give the empty blob a special meaning: the system must behave as if this blob is always present.
More details: https://github.com/bazelbuild/bazel/issues/11063
func NewErrorBlobAccess ¶
func NewErrorBlobAccess(err error) BlobAccess
NewErrorBlobAccess creates a BlobAccess that returns a fixed error response. Such an implementation is useful for adding explicit rejection of oversized requests or disabling storage entirely.
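For example, a backend that rejects every operation could be declared as follows (the error message is illustrative):

package example

import (
	"github.com/buildbarn/bb-storage/pkg/blobstore"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// disabledAC is a BlobAccess that fails every Get(), Put() and
// FindMissing() call, effectively disabling the data store.
var disabledAC = blobstore.NewErrorBlobAccess(
	status.Error(codes.PermissionDenied, "This cluster does not serve the Action Cache"))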
func NewExistenceCachingBlobAccess ¶
func NewExistenceCachingBlobAccess(base BlobAccess, existenceCache *digest.ExistenceCache) BlobAccess
NewExistenceCachingBlobAccess creates a decorator for BlobAccess that adds caching to the FindMissing() operation.
Clients such as Bazel tend to frequently call ContentAddressableStorage.FindMissingBlobs() with overlapping sets of digests. They don't seem to have a local cache of which digests they queried recently. This decorator adds such a cache.
This decorator may be useful to run on instances that act as frontends for a mirrored/sharding storage pool, as it may reduce the load observed on the storage pool.
func NewHierarchicalInstanceNamesBlobAccess ¶
func NewHierarchicalInstanceNamesBlobAccess(base BlobAccess) BlobAccess
NewHierarchicalInstanceNamesBlobAccess creates a decorator for BlobAccess that falls back to reading objects from parent instance names. This can be used to let non-empty instance names inherit their contents from their parents. Because this BlobAccess reads blobs in descending order of specificity, it respects potential overriding, which makes it suitable for the Action Cache (AC). It should not be used for the Content Addressable Storage (CAS), where checks in ascending order of specificity are preferred to maximise sharing.
func NewMetricsBlobAccess ¶
func NewMetricsBlobAccess(blobAccess BlobAccess, clock clock.Clock, storageType, backendType string) BlobAccess
NewMetricsBlobAccess creates an adapter for BlobAccess that adds basic instrumentation in the form of Prometheus metrics.
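A construction sketch; clock.SystemClock is assumed to be available from the pkg/clock package, and the "cas"/"grpc" label values are illustrative.

package example

import (
	"github.com/buildbarn/bb-storage/pkg/blobstore"
	"github.com/buildbarn/bb-storage/pkg/clock"
)

// newInstrumentedCAS wraps a CAS backend so that call counts and
// latencies are exported as Prometheus metrics, labelled with the
// given storage and backend type strings.
func newInstrumentedCAS(base blobstore.BlobAccess) blobstore.BlobAccess {
	return blobstore.NewMetricsBlobAccess(base, clock.SystemClock, "cas", "grpc")
}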
func NewReadCanaryingBlobAccess ¶
func NewReadCanaryingBlobAccess(source, replica BlobAccess, clock clock.Clock, evictionSet eviction.Set[string], maximumCacheSize int, maximumCacheDuration time.Duration, replicaErrorLogger util.ErrorLogger) BlobAccess
NewReadCanaryingBlobAccess creates a decorator for two BlobAccess instances. One acts as the source of truth, while the other one acts as a read-only replica. This backend may be used to guarantee availability in case the replica goes offline.
By default, all requests are sent to the source. For read requests (Get and FindMissing), this backend periodically sends a single canary request to the replica. Upon success, all subsequent read requests are sent to the replica as well. Upon failure, all requests will continue to go to the source. Only infrastructure errors are considered failures.
State is tracked for each instance name separately. This ensures that if the replica uses features like AuthorizingBlobAccess or DemultiplexingBlobAccess, this backend still behaves in a meaningful way.
func NewReferenceExpandingBlobAccess ¶
func NewReferenceExpandingBlobAccess(indirectContentAddressableStorage, contentAddressableStorage BlobAccess, httpClient *http.Client, s3Client cloud_aws.S3Client, gcsClient cloud_gcp.StorageClient, maximumMessageSizeBytes int) BlobAccess
NewReferenceExpandingBlobAccess takes an Indirect Content Addressable Storage (ICAS) backend and converts it to a Content Addressable Storage (CAS) backend. Any object requested through this BlobAccess will cause its reference to be loaded from the ICAS, followed by fetching its data from the referenced location.
func NewZIPReadingBlobAccess ¶
func NewZIPReadingBlobAccess(capabilitiesProvider capabilities.Provider, readBufferFactory ReadBufferFactory, digestKeyFormat digest.KeyFormat, filesList []*zip.File) BlobAccess
NewZIPReadingBlobAccess creates a BlobAccess that is capable of reading objects from a ZIP archive. Depending on whether the contained files are compressed, they may be either randomly or sequentially accessible.
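A sketch of reading a single object back out of an archive on disk. The capabilities.Provider is assumed to be supplied by the caller, and digest.KeyWithoutInstance is assumed to be an appropriate key format for CAS objects.

package example

import (
	"archive/zip"
	"context"

	"github.com/buildbarn/bb-storage/pkg/blobstore"
	"github.com/buildbarn/bb-storage/pkg/capabilities"
	"github.com/buildbarn/bb-storage/pkg/digest"
)

// readFromZIP opens a previously written archive and reads one object
// from it into memory.
func readFromZIP(ctx context.Context, provider capabilities.Provider, archivePath string, blobDigest digest.Digest, maximumSizeBytes int) ([]byte, error) {
	r, err := zip.OpenReader(archivePath)
	if err != nil {
		return nil, err
	}
	defer r.Close()

	ba := blobstore.NewZIPReadingBlobAccess(provider, blobstore.CASReadBufferFactory, digest.KeyWithoutInstance, r.File)
	return ba.Get(ctx, blobDigest).ToByteSlice(maximumSizeBytes)
}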
type DemultiplexedBlobAccessGetter ¶
type DemultiplexedBlobAccessGetter func(i digest.InstanceName) (BlobAccess, string, digest.InstanceNamePatcher, error)
DemultiplexedBlobAccessGetter is a callback that is provided to instances of DemultiplexingBlobAccess to resolve instance names to backends to which requests need to be forwarded.
For every backend, a name must be provided as well. This name is used as part of error messages. The name must be unique, as it is also used as a key to identify backends. An InstanceNamePatcher can also be returned to adjust the instance name for outgoing requests.
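A sketch of such a callback that routes instance names starting with "eu/" to a separate backend. The InstanceNamePatchers are assumed to be constructed elsewhere (e.g., with digest.NewInstanceNamePatcher), and the backend names are illustrative.

package example

import (
	"strings"

	"github.com/buildbarn/bb-storage/pkg/blobstore"
	"github.com/buildbarn/bb-storage/pkg/digest"
)

// newRoutingGetter returns a getter that forwards requests for
// instance names starting with "eu/" to euBackend and all other
// requests to defaultBackend.
func newRoutingGetter(euBackend, defaultBackend blobstore.BlobAccess, euPatcher, defaultPatcher digest.InstanceNamePatcher) blobstore.DemultiplexedBlobAccessGetter {
	return func(i digest.InstanceName) (blobstore.BlobAccess, string, digest.InstanceNamePatcher, error) {
		if strings.HasPrefix(i.String(), "eu/") {
			return euBackend, "eu", euPatcher, nil
		}
		return defaultBackend, "default", defaultPatcher, nil
	}
}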
type ReadBufferFactory ¶
type ReadBufferFactory interface {
	// NewBufferFromByteSlice creates a buffer from a byte slice.
	NewBufferFromByteSlice(digest digest.Digest, data []byte, dataIntegrityCallback buffer.DataIntegrityCallback) buffer.Buffer
	// NewBufferFromReader creates a buffer from a reader.
	NewBufferFromReader(digest digest.Digest, r io.ReadCloser, dataIntegrityCallback buffer.DataIntegrityCallback) buffer.Buffer
	// NewBufferFromReaderAt creates a buffer from a reader that
	// provides random access.
	NewBufferFromReaderAt(digest digest.Digest, r buffer.ReadAtCloser, sizeBytes int64, dataIntegrityCallback buffer.DataIntegrityCallback) buffer.Buffer
}
ReadBufferFactory is passed to many implementations of BlobAccess so that the same BlobAccess implementation can be used for the Content Addressable Storage (CAS), the Action Cache (AC), and the Indirect Content Addressable Storage (ICAS). This interface provides functions for buffer creation.
var ACReadBufferFactory ReadBufferFactory = acReadBufferFactory{}
ACReadBufferFactory is capable of creating buffers for objects stored in the Action Cache (AC).
var CASReadBufferFactory ReadBufferFactory = casReadBufferFactory{}
CASReadBufferFactory is capable of creating buffers for objects stored in the Content Addressable Storage (CAS).
var FSACReadBufferFactory ReadBufferFactory = fsacReadBufferFactory{}
FSACReadBufferFactory is capable of creating identifiers and buffers for objects stored in the File System Access Cache (FSAC).
var ICASReadBufferFactory ReadBufferFactory = icasReadBufferFactory{}
ICASReadBufferFactory is capable of creating identifiers and buffers for objects stored in the Indirect Content Addressable Storage (ICAS).
var ISCCReadBufferFactory ReadBufferFactory = isccReadBufferFactory{}
ISCCReadBufferFactory is capable of creating identifiers and buffers for objects stored in the Initial Size Class Cache (ISCC).
func NewValidationCachingReadBufferFactory ¶
func NewValidationCachingReadBufferFactory(base ReadBufferFactory, existenceCache *digest.ExistenceCache) ReadBufferFactory
NewValidationCachingReadBufferFactory creates a decorator for ReadBufferFactory that disables data integrity checking for repeated requests for the same object. This may be a necessity for supporting efficient random access to blobs.
Information on which blobs have been accessed previously is tracked in a digest.ExistenceCache. This means that an upper bound can be placed on the maximum amount of time integrity checking is disabled.
type ReadWriterAt ¶
ReadWriterAt is a file that can be randomly read and written.
type TreeDirectoryVisitor ¶
type TreeDirectoryVisitor[TArgument any] func(d *remoteexecution.Directory, argument *TArgument, childArguments []*TArgument) error
TreeDirectoryVisitor is a callback type that is invoked by VisitTopologicallySortedTree for each Directory message contained in the REv2 Tree object.
type ZIPWritingBlobAccess ¶
type ZIPWritingBlobAccess struct {
	capabilities.Provider
	// contains filtered or unexported fields
}
ZIPWritingBlobAccess is an implementation of BlobAccess that stores all objects in a ZIP archive. The resulting ZIP archives can be read using NewZIPReadingBlobAccess().
func NewZIPWritingBlobAccess ¶
func NewZIPWritingBlobAccess(capabilitiesProvider capabilities.Provider, readBufferFactory ReadBufferFactory, digestKeyFormat digest.KeyFormat, rw ReadWriterAt) *ZIPWritingBlobAccess
NewZIPWritingBlobAccess creates a new BlobAccess that stores all objects in a ZIP archive. In its initial state, the resulting ZIP file will be empty.
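A round-trip sketch that writes a set of objects to an archive on disk and finalizes it. It assumes that an *os.File satisfies ReadWriterAt, that buffer.NewValidatedBufferFromByteSlice creates a buffer whose contents are trusted to match the given digest, and that the capabilities.Provider is supplied by the caller.

package example

import (
	"context"
	"os"

	"github.com/buildbarn/bb-storage/pkg/blobstore"
	"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
	"github.com/buildbarn/bb-storage/pkg/capabilities"
	"github.com/buildbarn/bb-storage/pkg/digest"
)

// writeZIPArchive stores every blob in the provided map in a new ZIP
// archive at archivePath. Each digest must match its data.
func writeZIPArchive(ctx context.Context, provider capabilities.Provider, archivePath string, blobs map[digest.Digest][]byte) error {
	f, err := os.Create(archivePath)
	if err != nil {
		return err
	}
	defer f.Close()

	ba := blobstore.NewZIPWritingBlobAccess(provider, blobstore.CASReadBufferFactory, digest.KeyWithoutInstance, f)
	for blobDigest, data := range blobs {
		if err := ba.Put(ctx, blobDigest, buffer.NewValidatedBufferFromByteSlice(data)); err != nil {
			return err
		}
	}
	// Append the central directory; no further Put() calls are allowed.
	return ba.Finalize()
}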
func (*ZIPWritingBlobAccess) Finalize ¶
func (ba *ZIPWritingBlobAccess) Finalize() error
Finalize the ZIP archive by appending a central directory to the underlying file. Once called, it is no longer possible to call Put().
func (*ZIPWritingBlobAccess) FindMissing ¶
func (ba *ZIPWritingBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (digest.Set, error)
FindMissing reports which objects are absent from a ZIP archive, given a set of digests.
func (*ZIPWritingBlobAccess) Get ¶
func (ba *ZIPWritingBlobAccess) Get(ctx context.Context, blobDigest digest.Digest) buffer.Buffer
Get the contents of an object that was successfully stored in the ZIP archive through a previous call to Put().
func (*ZIPWritingBlobAccess) GetFromComposite ¶
func (ba *ZIPWritingBlobAccess) GetFromComposite(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer
GetFromComposite fetches an object that is contained within a composite object that was successfully stored in the ZIP archive through a previous call to Put().
Source Files ¶
- ac_read_buffer_factory.go
- action_result_expiring_blob_access.go
- action_result_timestamp_injecting_blob_access.go
- authorizing_blob_access.go
- blob_access.go
- cas_read_buffer_factory.go
- demultiplexing_blob_access.go
- empty_blob_injecting_blob_access.go
- error_blob_access.go
- existence_caching_blob_access.go
- fsac_read_buffer_factory.go
- hierarchical_instance_names_blob_access.go
- icas_read_buffer_factory.go
- iscc_read_buffer_factory.go
- metrics_blob_access.go
- read_buffer_factory.go
- read_canarying_blob_access.go
- reference_expanding_blob_access.go
- validation_caching_read_buffer_factory.go
- visit_topologically_sorted_tree.go
- zip_reading_blob_access.go
- zip_writing_blob_access.go