Documentation ¶
Index ¶
- func GetFromCompositeWithBlobReplicator(ctx context.Context, parentDigest, childDigest digest.Digest, ...) buffer.Buffer
- func GetWithBlobReplicator(ctx context.Context, digest digest.Digest, initialBackend blobstore.BlobAccess, ...) buffer.Buffer
- func NewReplicatorServer(replicator BlobReplicator) replicator_pb.ReplicatorServer
- type BlobReplicator
- func NewConcurrencyLimitingBlobReplicator(base BlobReplicator, sink blobstore.BlobAccess, semaphore *semaphore.Weighted) BlobReplicator
- func NewDeduplicatingBlobReplicator(base BlobReplicator, sink blobstore.BlobAccess, ...) BlobReplicator
- func NewLocalBlobReplicator(source, sink blobstore.BlobAccess) BlobReplicator
- func NewMetricsBlobReplicator(replicator BlobReplicator, clock clock.Clock, storageTypeName string) BlobReplicator
- func NewNoopBlobReplicator(source blobstore.BlobAccess) BlobReplicator
- func NewQueuedBlobReplicator(source blobstore.BlobAccess, base BlobReplicator, ...) BlobReplicator
- func NewRemoteBlobReplicator(source blobstore.BlobAccess, client grpc.ClientConnInterface) BlobReplicator
- type BlobReplicatorSelector
- type NestedBlobReplicator
Functions ¶
func GetFromCompositeWithBlobReplicator ¶
func GetFromCompositeWithBlobReplicator(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer, initialBackend blobstore.BlobAccess, successiveBackends BlobReplicatorSelector) buffer.Buffer
GetFromCompositeWithBlobReplicator is the counterpart of GetWithBlobReplicator for composite objects: it is a common implementation of BlobAccess.GetFromComposite() that applies the same fallback strategy.
func GetWithBlobReplicator ¶
func GetWithBlobReplicator(ctx context.Context, digest digest.Digest, initialBackend blobstore.BlobAccess, successiveBackends BlobReplicatorSelector) buffer.Buffer
GetWithBlobReplicator is a common implementation of BlobAccess.Get() for backends that first read from a single backend and fall back to calling into a BlobReplicator upon failure. This pattern is used by backends such as MirroredBlobAccess and ReadCachingBlobAccess.
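The fallback pattern described above can be illustrated with a minimal, self-contained sketch. It does not use the real bb-storage types: getter, replicator, selector, errNotFound, getWithReplicator and newMirror are hypothetical stand-ins, and plain byte slices replace buffer.Buffer. Only the shape of selector is taken from BlobReplicatorSelector; the behavior shown is an assumption about how such a fallback might look.

```go
package main

import (
	"errors"
	"fmt"
)

// getter is a hypothetical, simplified stand-in for blobstore.BlobAccess.Get().
type getter func(digest string) ([]byte, error)

// replicator is a hypothetical stand-in for BlobReplicator.ReplicateSingle().
type replicator func(digest string) ([]byte, error)

// selector mirrors the shape of BlobReplicatorSelector: given the error
// observed on the initial backend, it returns either a replicator to
// fall back to, or an error to hand back to the caller.
type selector func(observedErr error) (replicator, error)

var errNotFound = errors.New("blob not found")

// getWithReplicator reads from the initial backend and only consults
// the selector when that read fails.
func getWithReplicator(digest string, initial getter, sel selector) ([]byte, error) {
	data, err := initial(digest)
	if err == nil {
		return data, nil
	}
	r, selErr := sel(err)
	if selErr != nil {
		return nil, selErr
	}
	return r(digest)
}

// newMirror builds a getter for backend a and a selector that, on a
// "not found" error, copies the blob from backend b into a.
func newMirror(a, b map[string][]byte) (getter, selector) {
	initial := func(d string) ([]byte, error) {
		if v, ok := a[d]; ok {
			return v, nil
		}
		return nil, errNotFound
	}
	sel := func(observedErr error) (replicator, error) {
		if !errors.Is(observedErr, errNotFound) {
			// Other errors are not repaired by replication in this sketch.
			return nil, observedErr
		}
		return func(d string) ([]byte, error) {
			v, ok := b[d]
			if !ok {
				return nil, errNotFound
			}
			a[d] = v // replicate into the backend that lacked the blob
			return v, nil
		}, nil
	}
	return initial, sel
}

func main() {
	a := map[string][]byte{}
	b := map[string][]byte{"abc": []byte("hello")}
	initial, sel := newMirror(a, b)
	data, err := getWithReplicator("abc", initial, sel)
	fmt.Println(string(data), err, len(a))
}
```

In this sketch the selector decides per error whether replication is an appropriate response, which is why it receives the observed error rather than being a fixed replicator.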
func NewReplicatorServer ¶
func NewReplicatorServer(replicator BlobReplicator) replicator_pb.ReplicatorServer
NewReplicatorServer creates a gRPC stub for the Replicator service that forwards all calls to the provided BlobReplicator.
Types ¶
type BlobReplicator ¶
type BlobReplicator interface {
	// Replicate a single object between backends, while at the same
	// time giving a handle back to it.
	ReplicateSingle(ctx context.Context, digest digest.Digest) buffer.Buffer
	// Replicate a single composite object between backends, while
	// at the same time giving a handle back to one of its children.
	ReplicateComposite(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer
	// Replicate a set of objects between backends.
	ReplicateMultiple(ctx context.Context, digests digest.Set) error
}
BlobReplicator provides the strategy that is used by MirroredBlobAccess to replicate objects between storage backends. This strategy is called into when MirroredBlobAccess detects that a certain object is only present in one of the two backends.
func NewConcurrencyLimitingBlobReplicator ¶
func NewConcurrencyLimitingBlobReplicator(base BlobReplicator, sink blobstore.BlobAccess, semaphore *semaphore.Weighted) BlobReplicator
NewConcurrencyLimitingBlobReplicator creates a decorator for BlobReplicator that uses a semaphore to limit the number of concurrent replication requests. This can be used to prevent excessive network congestion.
The semaphore.Weighted type retains the original request order, meaning that starvation is prevented.
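As an illustration of the decorator idea, the sketch below limits the number of concurrent calls into a wrapped function. It is not the bb-storage implementation: replicateFunc, limitConcurrency and peakConcurrency are hypothetical names, and a buffered channel is used as a stand-in for semaphore.Weighted so that the example only needs the standard library. Unlike semaphore.Weighted, this stand-in makes no ordering promise.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// replicateFunc is a hypothetical stand-in for a BlobReplicator method.
type replicateFunc func(digest string) error

// limitConcurrency wraps base so that at most limit calls run at once.
// A buffered channel stands in for semaphore.Weighted here.
func limitConcurrency(base replicateFunc, limit int) replicateFunc {
	slots := make(chan struct{}, limit)
	return func(digest string) error {
		slots <- struct{}{}        // acquire a slot, blocking if all are taken
		defer func() { <-slots }() // release the slot when done
		return base(digest)
	}
}

// peakConcurrency runs n calls through a limited replicator and reports
// the highest number of base calls that were in flight at the same time.
func peakConcurrency(n, limit int) int64 {
	var inFlight, peak int64
	base := func(string) error {
		cur := atomic.AddInt64(&inFlight, 1)
		for {
			old := atomic.LoadInt64(&peak)
			if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
				break
			}
		}
		time.Sleep(time.Millisecond) // simulate network transfer time
		atomic.AddInt64(&inFlight, -1)
		return nil
	}
	limited := limitConcurrency(base, limit)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = limited(fmt.Sprintf("blob-%d", i))
		}(i)
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println("peak concurrency:", peakConcurrency(20, 3))
}
```

Because the limit is applied in a wrapper, the underlying replicator does not need to know about it, which matches the decorator style used throughout this package.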
func NewDeduplicatingBlobReplicator ¶
func NewDeduplicatingBlobReplicator(base BlobReplicator, sink blobstore.BlobAccess, sinkDigestKeyFormat digest.KeyFormat) BlobReplicator
NewDeduplicatingBlobReplicator creates a decorator for BlobReplicator that ensures that blobs are not replicated redundantly. Replication requests for the same blob are merged. To deal with potential race conditions, this replicator double checks whether the sink already contains a blob before copying.
In order to guarantee responsiveness for all callers, this replicator decomposes requests for multiple blobs into one request per blob. To prevent callers from stalling the replication process, it also doesn't stream data back to the caller as it is being replicated. This means that blobs are fully replicated from the source to the sink, prior to letting the caller read the data from the sink at its own pace.
This replicator has been designed to reduce the amount of traffic against the source to an absolute minimum, at the cost of generating more traffic against the sink. It is recommended to use this replicator when the sink is an instance of LocalBlobAccess that is embedded into the same process, and blobs are expected to be consumed locally.
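The merging of requests and the double check against the sink might look roughly like the following sketch. It is an assumption-laden simplification, not the bb-storage code: dedupReplicator, call, sinkHas, copyBlob and countCopies are hypothetical names, digests are plain strings, and the sink is an in-memory map.

```go
package main

import (
	"fmt"
	"sync"
)

// call tracks one in-flight replication that later callers can wait on.
type call struct {
	done chan struct{}
	err  error
}

// dedupReplicator is a hypothetical, simplified sketch of request merging.
type dedupReplicator struct {
	mu       sync.Mutex
	inFlight map[string]*call
	sinkHas  func(digest string) bool  // reports whether the sink holds the blob
	copyBlob func(digest string) error // copies the blob from source to sink
}

// replicate ensures the blob is in the sink, copying it at most once
// even when many callers ask for the same digest concurrently.
func (r *dedupReplicator) replicate(digest string) error {
	r.mu.Lock()
	if c, ok := r.inFlight[digest]; ok {
		// Another caller is already replicating this blob: wait for it.
		r.mu.Unlock()
		<-c.done
		return c.err
	}
	c := &call{done: make(chan struct{})}
	r.inFlight[digest] = c
	r.mu.Unlock()

	// Double check the sink, as an earlier request may have finished
	// just before this one was registered.
	if !r.sinkHas(digest) {
		c.err = r.copyBlob(digest)
	}

	r.mu.Lock()
	delete(r.inFlight, digest)
	r.mu.Unlock()
	close(c.done)
	return c.err
}

// countCopies issues concurrent requests for one digest and returns how
// many times the blob was actually copied.
func countCopies(callers int) int {
	var sinkMu sync.Mutex
	sink := map[string]bool{}
	copies := 0
	r := &dedupReplicator{
		inFlight: map[string]*call{},
		sinkHas: func(d string) bool {
			sinkMu.Lock()
			defer sinkMu.Unlock()
			return sink[d]
		},
		copyBlob: func(d string) error {
			sinkMu.Lock()
			defer sinkMu.Unlock()
			sink[d] = true
			copies++
			return nil
		},
	}
	var wg sync.WaitGroup
	for i := 0; i < callers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = r.replicate("abc")
		}()
	}
	wg.Wait()
	return copies
}

func main() {
	fmt.Println("copies performed:", countCopies(16))
}
```

Note how the sketch trades extra existence checks against the sink for fewer reads from the source, which is the same trade-off the text above describes.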
func NewLocalBlobReplicator ¶
func NewLocalBlobReplicator(source, sink blobstore.BlobAccess) BlobReplicator
NewLocalBlobReplicator creates a BlobReplicator that can be used to let MirroredBlobAccess repair inconsistencies between backends directly.
This replicator tends to be sufficient for the Action Cache (AC), but for the Content Addressable Storage (CAS) it may be inefficient. If MirroredBlobAccess is used by many clients, each having a high concurrency, this replicator may cause redundant replications and load spikes. A separate replication daemon (bb_replicator) should be used for such setups.
func NewMetricsBlobReplicator ¶
func NewMetricsBlobReplicator(replicator BlobReplicator, clock clock.Clock, storageTypeName string) BlobReplicator
NewMetricsBlobReplicator creates a wrapper around BlobReplicator that adds Prometheus metrics for monitoring replication operations.
func NewNoopBlobReplicator ¶
func NewNoopBlobReplicator(source blobstore.BlobAccess) BlobReplicator
NewNoopBlobReplicator creates a BlobReplicator that can be used to access a single source without replication.
It is useful for the BlobAccess variants where replication is optional.
func NewQueuedBlobReplicator ¶
func NewQueuedBlobReplicator(source blobstore.BlobAccess, base BlobReplicator, existenceCache *digest.ExistenceCache) BlobReplicator
NewQueuedBlobReplicator creates a decorator for BlobReplicator that serializes and deduplicates requests. It can be used to place a limit on the amount of replication traffic.
TODO: The current implementation is a bit simplistic, in that it does not guarantee fairness. Should all requests be processed in FIFO order? Alternatively, should we replicate objects with most waiters first?
func NewRemoteBlobReplicator ¶
func NewRemoteBlobReplicator(source blobstore.BlobAccess, client grpc.ClientConnInterface) BlobReplicator
NewRemoteBlobReplicator creates a BlobReplicator that forwards requests to a remote gRPC service. This service may be used to deduplicate and queue replication actions globally.
type BlobReplicatorSelector ¶
type BlobReplicatorSelector func(observedErr error) (BlobReplicator, error)
BlobReplicatorSelector is called into by GetWithBlobReplicator to obtain a BlobReplicator that is used after a failure has been observed.
type NestedBlobReplicator ¶
type NestedBlobReplicator struct {
// contains filtered or unexported fields
}
NestedBlobReplicator is a helper type for BlobReplicator that can be used to copy nested hierarchies of objects stored in the Content Addressable Storage (CAS). In the case of the REv2 protocol, these are Action, Directory and Tree messages.
func NewNestedBlobReplicator ¶
func NewNestedBlobReplicator(replicator BlobReplicator, digestKeyFormat digest.KeyFormat, maximumMessageSizeBytes int) *NestedBlobReplicator
NewNestedBlobReplicator creates a new NestedBlobReplicator whose queue of objects to be replicated is initially empty.
func (*NestedBlobReplicator) EnqueueAction ¶
func (nr *NestedBlobReplicator) EnqueueAction(actionDigest digest.Digest)
EnqueueAction enqueues an REv2 Action to be replicated. The referenced input root and Command message will be replicated as well.
func (*NestedBlobReplicator) EnqueueDirectory ¶
func (nr *NestedBlobReplicator) EnqueueDirectory(directoryDigest digest.Digest)
EnqueueDirectory enqueues an REv2 Directory to be replicated. Any referenced file or child Directory message will be replicated as well, recursively.
func (*NestedBlobReplicator) EnqueueTree ¶
func (nr *NestedBlobReplicator) EnqueueTree(treeDigest digest.Digest)
EnqueueTree enqueues an REv2 Tree to be replicated. Any referenced file will be replicated as well.
func (*NestedBlobReplicator) Replicate ¶
func (nr *NestedBlobReplicator) Replicate(ctx context.Context) error
Replicate replicates all objects that have been enqueued. This method continues to run until all enqueued objects have been replicated. It is safe to call this method from multiple goroutines to increase parallelism.
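To show how several goroutines can drain one shared queue while newly discovered children are added to it, here is a self-contained sketch. It is only an illustration of the enqueue-then-replicate workflow under stated assumptions: nestedQueue, enqueue, replicate and replicateAll are hypothetical names, digests are strings, and a map of parent to children stands in for parsing Action, Directory and Tree messages.

```go
package main

import (
	"fmt"
	"sync"
)

// nestedQueue is a hypothetical sketch of a work queue in which
// processing one object may reveal further objects to process.
type nestedQueue struct {
	mu       sync.Mutex
	cond     *sync.Cond
	pending  []string
	seen     map[string]bool
	active   int                          // objects currently being processed
	children func(digest string) []string // stands in for copying and parsing an object
}

func newNestedQueue(children func(string) []string) *nestedQueue {
	q := &nestedQueue{seen: map[string]bool{}, children: children}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// enqueueLocked adds d unless it was queued before. q.mu must be held.
func (q *nestedQueue) enqueueLocked(d string) {
	if !q.seen[d] {
		q.seen[d] = true
		q.pending = append(q.pending, d)
	}
}

func (q *nestedQueue) enqueue(d string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.enqueueLocked(d)
	q.cond.Broadcast()
}

// replicate processes objects until the queue is empty and no other
// goroutine is still working on an object that could add children.
func (q *nestedQueue) replicate() {
	q.mu.Lock()
	defer q.mu.Unlock()
	for {
		for len(q.pending) == 0 && q.active > 0 {
			q.cond.Wait()
		}
		if len(q.pending) == 0 {
			return
		}
		d := q.pending[0]
		q.pending = q.pending[1:]
		q.active++
		q.mu.Unlock()
		kids := q.children(d) // done without the lock so workers run in parallel
		q.mu.Lock()
		for _, k := range kids {
			q.enqueueLocked(k)
		}
		q.active--
		q.cond.Broadcast()
	}
}

// replicateAll drains the graph reachable from root using the given
// number of workers and returns how many distinct objects were visited.
func replicateAll(root string, graph map[string][]string, workers int) int {
	q := newNestedQueue(func(d string) []string { return graph[d] })
	q.enqueue(root)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			q.replicate()
		}()
	}
	wg.Wait()
	return len(q.seen)
}

func main() {
	graph := map[string][]string{"root": {"a", "b"}, "a": {"c"}, "b": {"c"}}
	fmt.Println("objects visited:", replicateAll("root", graph, 4))
}
```

The active counter is what lets an idle worker tell "the queue is momentarily empty" apart from "all work is finished", so no worker returns while another might still enqueue children.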