package blobstore

Documentation

Constants

const (
	TreeRootFieldNumber     protowire.Number = 1
	TreeChildrenFieldNumber protowire.Number = 2
)

The Protobuf field numbers of the REv2 Tree's "root" and "children" fields. These are used in combination with util.VisitProtoBytesFields() to be able to process REv2 Tree objects in a streaming manner.
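As a standalone illustration (not util.VisitProtoBytesFields() itself, whose signature is not reproduced here), the sketch below steps through a serialized Tree one length-delimited field at a time using only google.golang.org/protobuf/encoding/protowire and dispatches on these two field numbers:

package example

import (
	"fmt"

	"github.com/buildbarn/bb-storage/pkg/blobstore"
	"google.golang.org/protobuf/encoding/protowire"
)

// visitTreeFields walks the wire-format encoding of an REv2 Tree
// without unmarshaling the message as a whole.
func visitTreeFields(data []byte) error {
	for len(data) > 0 {
		fieldNumber, wireType, n := protowire.ConsumeTag(data)
		if n < 0 {
			return protowire.ParseError(n)
		}
		data = data[n:]
		if wireType != protowire.BytesType {
			return fmt.Errorf("unexpected wire type %d", wireType)
		}
		// Both fields hold serialized Directory messages.
		directory, n := protowire.ConsumeBytes(data)
		if n < 0 {
			return protowire.ParseError(n)
		}
		data = data[n:]
		switch fieldNumber {
		case blobstore.TreeRootFieldNumber:
			fmt.Printf("root directory: %d bytes\n", len(directory))
		case blobstore.TreeChildrenFieldNumber:
			fmt.Printf("child directory: %d bytes\n", len(directory))
		}
	}
	return nil
}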

const RecommendedFindMissingDigestsCount = 10000

RecommendedFindMissingDigestsCount corresponds to the maximum number of digests that is safe to provide to BlobAccess.FindMissing() without running into size limits of underlying protocols.

Assuming that a typical REv2 Digest message is ~70 bytes in size, this will cause us to generate gRPC calls with messages of up to ~700 KB in size. This seems like a safe limit, because most gRPC implementations limit the maximum message size to a small number of megabytes (4 MB for Java, 16 MB for Go).

TODO: Would it make sense to replace BlobAccess.FindMissing() with a streaming API that abstracts away the maximum digests count? That way we can ensure proper batching of digests, even when sharding is used, ExistenceCachingBlobAccess has a high hit rate, etc.

It would be nice if such an API also supported decomposition of large objects natively. See the "Future work" section in ADR#3 for details: https://github.com/buildbarn/bb-adrs/blob/master/0003-cas-decomposition.md#future-work
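Until then, batching is the caller's responsibility. The following is a hedged sketch of splitting a large digest.Set into FindMissing() calls of at most RecommendedFindMissingDigestsCount digests each; it assumes digest.Set.Items(), digest.NewSetBuilder(), and digest.EmptySet from bb-storage's pkg/digest.

package example

import (
	"context"

	"github.com/buildbarn/bb-storage/pkg/blobstore"
	"github.com/buildbarn/bb-storage/pkg/digest"
)

func findMissingBatched(ctx context.Context, blobAccess blobstore.BlobAccess, digests digest.Set) (digest.Set, error) {
	missing := digest.NewSetBuilder()
	items := digests.Items()
	for start := 0; start < len(items); start += blobstore.RecommendedFindMissingDigestsCount {
		end := start + blobstore.RecommendedFindMissingDigestsCount
		if end > len(items) {
			end = len(items)
		}
		// Query one batch that stays within the recommended limit.
		batch := digest.NewSetBuilder()
		for _, d := range items[start:end] {
			batch.Add(d)
		}
		batchMissing, err := blobAccess.FindMissing(ctx, batch.Build())
		if err != nil {
			return digest.EmptySet, err
		}
		for _, d := range batchMissing.Items() {
			missing.Add(d)
		}
	}
	return missing.Build(), nil
}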

Variables

This section is empty.

Functions

func CASPutProto

func CASPutProto(ctx context.Context, blobAccess BlobAccess, message proto.Message, digestFunction digest.Function) (digest.Digest, error)

CASPutProto is a helper function for storing Protobuf messages in the Content Addressable Storage (CAS). It computes the digest of the message and stores it under that key. The digest is then returned, so that the object may be referenced.
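A brief usage sketch follows. The instance name "example" and the choice of SHA-256 are illustrative, and digest.MustNewFunction is assumed to be the constructor provided by bb-storage's pkg/digest.

package example

import (
	"context"

	remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
	"github.com/buildbarn/bb-storage/pkg/blobstore"
	"github.com/buildbarn/bb-storage/pkg/digest"
)

func storeAction(ctx context.Context, blobAccess blobstore.BlobAccess, action *remoteexecution.Action) (digest.Digest, error) {
	digestFunction := digest.MustNewFunction("example", remoteexecution.DigestFunction_SHA256)
	// The returned digest can subsequently be placed in an
	// ExecuteRequest or stored elsewhere.
	return blobstore.CASPutProto(ctx, blobAccess, action, digestFunction)
}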

func GetReducedActionDigest

func GetReducedActionDigest(digestFunction digest.Function, action *remoteexecution.Action) (digest.Digest, error)

GetReducedActionDigest computes the digest of an Initial Size Class Cache (ISCC) or File System Access Cache (FSAC) object that corresponds to a given Action.

By only considering the Action's command digest and the platform properties when generating the digest, actions with equal command line arguments and environment variables will have the same ISCC/FSAC digest, even if their input roots differ. This should be an adequate heuristic for grouping actions with similar performance characteristics.

func VisitTopologicallySortedTree

func VisitTopologicallySortedTree[TArgument any](r io.Reader, digestFunction digest.Function, maximumDirectorySizeBytes int, rootArgument *TArgument, visitDirectory TreeDirectoryVisitor[TArgument]) error

VisitTopologicallySortedTree iterates over all Directory messages contained in an REv2 Tree object. For each directory, the visitor is capable of tracking state, which it can also build up when parent directories are visited.

This function expects the REv2 Tree object to be topologically sorted, with parents being stored before children. As a result, directories are also visited in topological order.
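A hedged sketch of a visitor that reconstructs each directory's path is shown below. It assumes childArguments is ordered consistently with d.Directories, so a directory can assign paths to its children before they are themselves visited.

package example

import (
	"fmt"
	"io"

	remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
	"github.com/buildbarn/bb-storage/pkg/blobstore"
	"github.com/buildbarn/bb-storage/pkg/digest"
)

func printTreePaths(r io.Reader, digestFunction digest.Function) error {
	rootPath := "."
	return blobstore.VisitTopologicallySortedTree(
		r,
		digestFunction,
		/* maximumDirectorySizeBytes = */ 1<<20,
		&rootPath,
		func(d *remoteexecution.Directory, path *string, childPaths []*string) error {
			fmt.Println(*path)
			// Propagate state downward: give each child its full path
			// before it is visited.
			for i, node := range d.Directories {
				*childPaths[i] = *path + "/" + node.Name
			}
			return nil
		})
}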

Types

type BlobAccess

type BlobAccess interface {
	capabilities.Provider

	Get(ctx context.Context, digest digest.Digest) buffer.Buffer
	GetFromComposite(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer
	Put(ctx context.Context, digest digest.Digest, b buffer.Buffer) error
	FindMissing(ctx context.Context, digests digest.Set) (digest.Set, error)
}

BlobAccess is an abstraction for a data store that can be used to hold an Action Cache (AC), Content Addressable Storage (CAS), or any other data store that uses keys in the form of digests.
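A minimal usage sketch: fetching a small object as a byte slice. ToByteSlice is bb-storage's pkg/buffer accessor, assumed here to enforce the given size limit and to perform the buffer layer's integrity checking.

package example

import (
	"context"

	"github.com/buildbarn/bb-storage/pkg/blobstore"
	"github.com/buildbarn/bb-storage/pkg/digest"
)

func readSmallBlob(ctx context.Context, blobAccess blobstore.BlobAccess, d digest.Digest) ([]byte, error) {
	// Refuse to materialize objects larger than 4 MB in memory.
	return blobAccess.Get(ctx, d).ToByteSlice(4 * 1024 * 1024)
}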

func NewActionResultExpiringBlobAccess

func NewActionResultExpiringBlobAccess(blobAccess BlobAccess, clock clock.Clock, maximumMessageSizeBytes int, minimumTimestamp time.Time, minimumValidity, maximumValidityJitter time.Duration) BlobAccess

NewActionResultExpiringBlobAccess creates a decorator for an Action Cache (AC) that automatically expires ActionResult objects after a certain amount of time has passed. This forces targets to be rebuilt periodically.

The expiration time of an ActionResult is computed by considering the 'worker_completed_timestamp' field in ExecutedActionMetadata. Jitter is added to the expiration time to amortize rebuilds. The process for determining the amount of jitter is deterministic, meaning that it is safe to use this decorator in a distributed setting.
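The exact jitter formula is not documented here. Purely as an illustration of the property described above, the sketch below derives jitter from a stable hash of the action digest, so that independent replicas compute the same expiration time for the same entry.

package example

import (
	"hash/fnv"
	"time"

	"github.com/buildbarn/bb-storage/pkg/digest"
)

func deterministicJitter(d digest.Digest, maximumValidityJitter time.Duration) time.Duration {
	if maximumValidityJitter <= 0 {
		return 0
	}
	h := fnv.New64a()
	h.Write([]byte(d.String()))
	// Same digest -> same jitter on every replica.
	return time.Duration(h.Sum64() % uint64(maximumValidityJitter))
}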

func NewActionResultTimestampInjectingBlobAccess

func NewActionResultTimestampInjectingBlobAccess(blobAccess BlobAccess, clock clock.Clock, maximumMessageSizeBytes int) BlobAccess

NewActionResultTimestampInjectingBlobAccess creates a decorator for an Action Cache (AC) that, for each ActionResult message written through it, sets the execution_metadata.worker_completed_timestamp field to the current time if it is not already set.

This decorator is necessary to make ActionResultExpiringBlobAccess work reliably, as it depends on this field being set. Not all clients set this field.

func NewAuthorizingBlobAccess

func NewAuthorizingBlobAccess(base BlobAccess, getAuthorizer, putAuthorizer, findMissingAuthorizer auth.Authorizer) BlobAccess

NewAuthorizingBlobAccess creates a new BlobAccess that guards Get(), Put(), and FindMissing() calls with the corresponding Authorizers. Calls to GetCapabilities() are not checked, because the exact logic for this differs between the Action Cache (AC) and Content Addressable Storage (CAS).

func NewDemultiplexingBlobAccess

func NewDemultiplexingBlobAccess(getBackend DemultiplexedBlobAccessGetter) BlobAccess

NewDemultiplexingBlobAccess creates a BlobAccess that demultiplexes requests based on the instance names. This can be used to let bb-storage serve as a proxy in front of multiple clusters (e.g., as a single on-premise cache).

For every request, a call is made to a DemultiplexedBlobAccessGetter callback that provides the appropriate backend and mutates the instance name on the outgoing request.

func NewEmptyBlobInjectingBlobAccess

func NewEmptyBlobInjectingBlobAccess(base BlobAccess) BlobAccess

NewEmptyBlobInjectingBlobAccess is a decorator for BlobAccess that causes it to directly process any requests for blobs of size zero. Get() operations immediately return an empty buffer, while Put() operations for such buffers are ignored.

Bazel never attempts to read the empty blob from the Content Addressable Storage, which by itself is harmless. In addition to that, it never attempts to write the empty blob. This is problematic, as it may cause unaware implementations of GetActionResult() and input root population to fail.

This problem remained undetected for a long time, because running at least one build action through bb_worker has a high probability of creating the empty blob in storage explicitly.

The consensus within the Remote APIs working group has been to give the empty blob a special meaning: the system must behave as if this blob is always present.

More details: https://github.com/bazelbuild/bazel/issues/11063

func NewErrorBlobAccess

func NewErrorBlobAccess(err error) BlobAccess

NewErrorBlobAccess creates a BlobAccess that returns a fixed error response. Such an implementation is useful for adding explicit rejection of oversized requests or disabling storage entirely.
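For instance (an illustrative sketch, not part of the package):

package example

import (
	"github.com/buildbarn/bb-storage/pkg/blobstore"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// readOnlyAC rejects every operation, e.g. to keep a replica's Action
// Cache read-only.
var readOnlyAC = blobstore.NewErrorBlobAccess(
	status.Error(codes.PermissionDenied, "this replica does not accept AC writes"))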

func NewExistenceCachingBlobAccess

func NewExistenceCachingBlobAccess(base BlobAccess, existenceCache *digest.ExistenceCache) BlobAccess

NewExistenceCachingBlobAccess creates a decorator for BlobAccess that adds caching to the FindMissing() operation.

Clients such as Bazel tend to frequently call ContentAddressableStorage.FindMissingBlobs() with overlapping sets of digests. They don't seem to have a local cache of which digests they queried recently. This decorator adds such a cache.

This decorator may be useful to run on instances that act as frontends for a mirrored/sharding storage pool, as it may reduce the load observed on the storage pool.

func NewHierarchicalInstanceNamesBlobAccess

func NewHierarchicalInstanceNamesBlobAccess(base BlobAccess) BlobAccess

NewHierarchicalInstanceNamesBlobAccess creates a decorator for BlobAccess that falls back to reading objects from parent instance names. This can be used to let non-empty instance names inherit their contents from parent instance names. This BlobAccess reads blobs in descending order of specificity, which is useful for the AC because it respects potential overriding. It should not be used for the CAS, where checks in ascending order of specificity are preferred to maximise sharing.

func NewMetricsBlobAccess

func NewMetricsBlobAccess(blobAccess BlobAccess, clock clock.Clock, storageType, backendType string) BlobAccess

NewMetricsBlobAccess creates an adapter for BlobAccess that adds basic instrumentation in the form of Prometheus metrics.

func NewReadCanaryingBlobAccess

func NewReadCanaryingBlobAccess(source, replica BlobAccess, clock clock.Clock, evictionSet eviction.Set[string], maximumCacheSize int, maximumCacheDuration time.Duration, replicaErrorLogger util.ErrorLogger) BlobAccess

NewReadCanaryingBlobAccess creates a decorator for two BlobAccess instances. One acts as the source of truth, while the other one acts as a read-only replica. This backend may be used to guarantee availability in case the replica goes offline.

By default, all requests are sent to the source. For read requests (Get and FindMissing), this backend periodically sends a single canary request to the replica. Upon success, all subsequent read requests are sent to the replica as well. Upon failure, all requests will continue to go to the source. Only infrastructure errors are considered failures.

State is tracked for each instance name separately. This ensures that if the replica uses features like AuthorizingBlobAccess or DemultiplexingBlobAccess, this backend still behaves in a meaningful way.

func NewReferenceExpandingBlobAccess

func NewReferenceExpandingBlobAccess(indirectContentAddressableStorage, contentAddressableStorage BlobAccess, httpClient *http.Client, s3Client cloud_aws.S3Client, gcsClient cloud_gcp.StorageClient, maximumMessageSizeBytes int) BlobAccess

NewReferenceExpandingBlobAccess takes an Indirect Content Addressable Storage (ICAS) backend and converts it to a Content Addressable Storage (CAS) backend. Any object requested through this BlobAccess will cause its reference to be loaded from the ICAS, followed by fetching its data from the referenced location.

func NewZIPReadingBlobAccess

func NewZIPReadingBlobAccess(capabilitiesProvider capabilities.Provider, readBufferFactory ReadBufferFactory, digestKeyFormat digest.KeyFormat, filesList []*zip.File) BlobAccess

NewZIPReadingBlobAccess creates a BlobAccess that is capable of reading objects from a ZIP archive. Depending on whether the contained files are compressed, they may either be randomly or sequentially accessible.

type DemultiplexedBlobAccessGetter

type DemultiplexedBlobAccessGetter func(i digest.InstanceName) (BlobAccess, string, digest.InstanceNamePatcher, error)

DemultiplexedBlobAccessGetter is a callback that is provided to instances of DemultiplexingBlobAccess to resolve instance names to backends to which requests need to be forwarded.

For every backend, a name must be provided as well. This name is used as part of error messages. The name must be unique, as it is also used as a key to identify backends. An InstanceNamePatcher can also be returned to adjust the instance name for outgoing requests.
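A hedged sketch of a getter that routes instance names beginning with "eu" or "us" to different backends and strips that prefix on outgoing requests. digest.NewInstanceNamePatcher, digest.MustNewInstanceName, digest.EmptyInstanceName, and digest.NoopInstanceNamePatcher are assumed from bb-storage's pkg/digest.

package example

import (
	"strings"

	"github.com/buildbarn/bb-storage/pkg/blobstore"
	"github.com/buildbarn/bb-storage/pkg/digest"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func newRegionGetter(eu, us blobstore.BlobAccess) blobstore.DemultiplexedBlobAccessGetter {
	backends := map[string]blobstore.BlobAccess{"eu": eu, "us": us}
	return func(i digest.InstanceName) (blobstore.BlobAccess, string, digest.InstanceNamePatcher, error) {
		region, _, _ := strings.Cut(i.String(), "/")
		backend, ok := backends[region]
		if !ok {
			return nil, "", digest.NoopInstanceNamePatcher, status.Error(codes.InvalidArgument, "unknown region")
		}
		// Forward "eu/foo" to the eu backend as just "foo".
		return backend, region, digest.NewInstanceNamePatcher(
			digest.MustNewInstanceName(region),
			digest.EmptyInstanceName), nil
	}
}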

type ReadBufferFactory

type ReadBufferFactory interface {
	// NewBufferFromByteSlice creates a buffer from a byte slice.
	NewBufferFromByteSlice(digest digest.Digest, data []byte, dataIntegrityCallback buffer.DataIntegrityCallback) buffer.Buffer
	// NewBufferFromReader creates a buffer from a reader.
	NewBufferFromReader(digest digest.Digest, r io.ReadCloser, dataIntegrityCallback buffer.DataIntegrityCallback) buffer.Buffer
	// NewBufferFromReaderAt creates a buffer from a reader that
	// provides random access.
	NewBufferFromReaderAt(digest digest.Digest, r buffer.ReadAtCloser, sizeBytes int64, dataIntegrityCallback buffer.DataIntegrityCallback) buffer.Buffer
}

ReadBufferFactory is passed to many implementations of BlobAccess to be able to use the same BlobAccess implementation for the Content Addressable Storage (CAS), the Action Cache (AC), and the Indirect Content Addressable Storage (ICAS). This interface provides functions for buffer creation.
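A hedged sketch of how a storage backend typically uses a ReadBufferFactory: bytes read from its medium are wrapped in a buffer that validates them against the digest, with a callback through which corrupted entries can be evicted. The evict parameter is a hypothetical hook of this example.

package example

import (
	"github.com/buildbarn/bb-storage/pkg/blobstore"
	"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
	"github.com/buildbarn/bb-storage/pkg/digest"
)

func newValidatingBuffer(factory blobstore.ReadBufferFactory, d digest.Digest, data []byte, evict func()) buffer.Buffer {
	return factory.NewBufferFromByteSlice(d, data,
		func(dataIsValid bool) {
			// The buffer layer reports whether the data matched the
			// digest; corrupted entries can then be removed.
			if !dataIsValid {
				evict()
			}
		})
}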

var ACReadBufferFactory ReadBufferFactory = acReadBufferFactory{}

ACReadBufferFactory is capable of creating buffers for objects stored in the Action Cache (AC).

var CASReadBufferFactory ReadBufferFactory = casReadBufferFactory{}

CASReadBufferFactory is capable of creating buffers for objects stored in the Content Addressable Storage (CAS).

var FSACReadBufferFactory ReadBufferFactory = fsacReadBufferFactory{}

FSACReadBufferFactory is capable of creating identifiers and buffers for objects stored in the File System Access Cache (FSAC).

var ICASReadBufferFactory ReadBufferFactory = icasReadBufferFactory{}

ICASReadBufferFactory is capable of creating identifiers and buffers for objects stored in the Indirect Content Addressable Storage (ICAS).

var ISCCReadBufferFactory ReadBufferFactory = isccReadBufferFactory{}

ISCCReadBufferFactory is capable of creating identifiers and buffers for objects stored in the Initial Size Class Cache (ISCC).

func NewValidationCachingReadBufferFactory

func NewValidationCachingReadBufferFactory(base ReadBufferFactory, existenceCache *digest.ExistenceCache) ReadBufferFactory

NewValidationCachingReadBufferFactory creates a decorator for ReadBufferFactory that disables data integrity checking for repeated requests for the same object. This may be a necessity for supporting efficient random access to blobs.

Information on which blobs have been accessed previously is tracked in a digest.ExistenceCache. This means that an upper bound can be placed on the maximum amount of time integrity checking is disabled.

type ReadWriterAt

type ReadWriterAt interface {
	io.ReaderAt
	io.WriterAt
}

ReadWriterAt is a file that can be randomly read and written.

type TreeDirectoryVisitor

type TreeDirectoryVisitor[TArgument any] func(d *remoteexecution.Directory, argument *TArgument, childArguments []*TArgument) error

TreeDirectoryVisitor is a callback type that is invoked by VisitTopologicallySortedTree for each Directory message contained in the REv2 Tree object.

type ZIPWritingBlobAccess

type ZIPWritingBlobAccess struct {
	capabilities.Provider
	// contains filtered or unexported fields
}

ZIPWritingBlobAccess is an implementation of BlobAccess that stores all objects in a ZIP archive. The resulting ZIP archives can be read using NewZIPReadingBlobAccess().

func NewZIPWritingBlobAccess

func NewZIPWritingBlobAccess(capabilitiesProvider capabilities.Provider, readBufferFactory ReadBufferFactory, digestKeyFormat digest.KeyFormat, rw ReadWriterAt) *ZIPWritingBlobAccess

NewZIPWritingBlobAccess creates a new BlobAccess that stores all objects in a ZIP archive. In its initial state, the resulting ZIP file will be empty.

func (*ZIPWritingBlobAccess) Finalize

func (ba *ZIPWritingBlobAccess) Finalize() error

Finalize the ZIP archive by appending a central directory to the underlying file. Once called, it is no longer possible to call Put().

func (*ZIPWritingBlobAccess) FindMissing

func (ba *ZIPWritingBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (digest.Set, error)

FindMissing reports which objects are absent from a ZIP archive, given a set of digests.

func (*ZIPWritingBlobAccess) Get

func (ba *ZIPWritingBlobAccess) Get(ctx context.Context, blobDigest digest.Digest) buffer.Buffer

Get the contents of an object that was successfully stored in the ZIP archive through a previous call to Put().

func (*ZIPWritingBlobAccess) GetFromComposite

func (ba *ZIPWritingBlobAccess) GetFromComposite(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer

GetFromComposite fetches an object that is contained within a composite object that was successfully stored in the ZIP archive through a previous call to Put().

func (*ZIPWritingBlobAccess) Put

func (ba *ZIPWritingBlobAccess) Put(ctx context.Context, blobDigest digest.Digest, b buffer.Buffer) error

Put a new object in the ZIP archive.
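A hedged end-to-end sketch: store an object in a ZIP archive backed by a temporary file, finalize it, and reopen it for reading. The capabilitiesProvider argument is left abstract, and buffer.NewValidatedBufferFromByteSlice is assumed from bb-storage's pkg/buffer.

package example

import (
	"archive/zip"
	"context"
	"os"

	"github.com/buildbarn/bb-storage/pkg/blobstore"
	"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
	"github.com/buildbarn/bb-storage/pkg/capabilities"
	"github.com/buildbarn/bb-storage/pkg/digest"
)

func roundTrip(ctx context.Context, capabilitiesProvider capabilities.Provider, blobDigest digest.Digest, data []byte) (blobstore.BlobAccess, error) {
	f, err := os.CreateTemp("", "cas-*.zip")
	if err != nil {
		return nil, err
	}
	// os.File implements io.ReaderAt and io.WriterAt, so it satisfies
	// ReadWriterAt.
	w := blobstore.NewZIPWritingBlobAccess(capabilitiesProvider, blobstore.CASReadBufferFactory, digest.KeyWithoutInstance, f)
	if err := w.Put(ctx, blobDigest, buffer.NewValidatedBufferFromByteSlice(data)); err != nil {
		return nil, err
	}
	// Append the central directory; no further Put() calls are allowed.
	if err := w.Finalize(); err != nil {
		return nil, err
	}
	info, err := f.Stat()
	if err != nil {
		return nil, err
	}
	zr, err := zip.NewReader(f, info.Size())
	if err != nil {
		return nil, err
	}
	return blobstore.NewZIPReadingBlobAccess(capabilitiesProvider, blobstore.CASReadBufferFactory, digest.KeyWithoutInstance, zr.File), nil
}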
