local

package
v0.0.0-...-07366c6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 23, 2025 License: Apache-2.0 Imports: 29 Imported by: 2

Documentation

Index

Constants

View Source
const (
	// BlockDeviceBackedLocationRecordSize is the size of a single
	// serialized LocationRecord in bytes. In serialized form, a
	// LocationRecord contains the following fields:
	//
	// - Epoch ID                     4 bytes
	// - Blocks from last             2 bytes
	// - Key:                        32 bytes
	// - Hash table probing attempt   4 bytes
	// - Blob offset                  8 bytes
	// - Blob length                  8 bytes
	// - Record checksum              8 bytes
	//                        Total: 66 bytes
	BlockDeviceBackedLocationRecordSize = 4 + 2 + sha256.Size + 4 + 8 + 8 + 8
)

Variables

View Source
var ErrLocationRecordInvalid = errors.New("Location record invalid")

ErrLocationRecordInvalid is an error code that may be returned by LocationRecordArray.Get() to indicate that the LocationRecord stored at a given index is invalid.

Entries are invalid if they have never been set, or when they point to a location that is no longer valid. The latter can happen when BlockList.PopFront() is called.

This error should never be returned to the user. It should be caught by consumers of LocationRecordArray, such as HashingKeyLocationMap.

Functions

func NewFlatBlobAccess

func NewFlatBlobAccess(keyLocationMap KeyLocationMap, locationBlobMap LocationBlobMap, digestKeyFormat digest.KeyFormat, lock *sync.RWMutex, storageType string, capabilitiesProvider capabilities.Provider) blobstore.BlobAccess

NewFlatBlobAccess creates a BlobAccess that forwards all calls to KeyLocationMap and LocationBlobMap backend. It's called 'flat', because it assumes all objects are stored in a flat namespace. It either ignores the REv2 instance name in digests entirely, or it strongly partitions objects by instance name. It does not introduce any hierarchy.

func NewHierarchicalCASBlobAccess

func NewHierarchicalCASBlobAccess(keyLocationMap KeyLocationMap, locationBlobMap LocationBlobMap, lock *sync.RWMutex, capabilitiesProvider capabilities.Provider) blobstore.BlobAccess

NewHierarchicalCASBlobAccess creates a BlobAccess that uses a KeyLocationMap and a LocationBlobMap as backing stores.

The BlobAccess returned by this function can be thought of as being an alternative to FlatBlobAccess, with one big difference: it keeps track of which REv2 instance name prefixes are permitted to access an object. It does this by writing multiple entries into the key-location map:

  • One canonical entry that always points to the newest copy of an object. This entry's key does not contain an instance name.
  • One or more lookup entries, whose keys contain an instance name prefix. These are only synchronized with the canonical entry when the lookup entry points to an object that needs to be refreshed.

As the name implies, this implementation should only be used for the Content Addressable Storage (CAS). This is because writes for objects that already exist for a different REv2 instance name don't cause any new data to be ingested. This makes this implementation unsuitable for mutable data sets.

Types

type Block

type Block interface {
	Get(digest digest.Digest, offsetBytes, sizeBytes int64, dataIntegrityCallback buffer.DataIntegrityCallback) buffer.Buffer
	HasSpace(sizeBytes int64) bool
	Put(sizeBytes int64) BlockPutWriter
	Release()
}

Block of storage that contains a sequence of blobs. Buffers returned by Get() must remain valid, even if Release() is called.

type BlockAllocator

type BlockAllocator interface {
	// Used to allocate a fresh block of data. The location at which
	// this block is stored is returned, both to allow the caller to
	// store this information as part of persistent state and to
	// detect recycling of blocks that were used previously.
	//
	// If persistent storage is not supported, nil may be returned.
	NewBlock() (Block, *pb.BlockLocation, error)

	// Used to obtain a block of data at an explicit location. This is
	// called when attempting to reuse previous persistent state.
	//
	// This function may fail if no free block at this location
	// exists, or if persistent storage is not provided.
	NewBlockAtLocation(location *pb.BlockLocation, writeOffsetBytes int64) (Block, bool)
}

BlockAllocator is used by BlockList to allocate large blocks of storage (in-memory or on-disk) at a time. These blocks are then filled with blobs that are stored without any padding in between.

The methods provided this interface are not thread-safe. Exclusive locking must be used when allocating blocks.

func NewBlockDeviceBackedBlockAllocator

func NewBlockDeviceBackedBlockAllocator(blockDevice blockdevice.BlockDevice, readBufferFactory blobstore.ReadBufferFactory, sectorSizeBytes int, blockSectorCount int64, blockCount int, storageType string) BlockAllocator

NewBlockDeviceBackedBlockAllocator implements a BlockAllocator that can be used by implementations of BlockList to store data. Blocks created by this allocator are backed by a single BlockDevice. The BlockDevice is partitioned into equally sized blocks that are stored consecutively.

Blocks are initially handed out by increasing offset. Later on, the least recently released blocks are reused. This adds wear leveling to the system.

This implementation also ensures that writes against underlying storage are all performed at sector boundaries and sizes. This ensures that no unnecessary reads are performed.

func NewInMemoryBlockAllocator

func NewInMemoryBlockAllocator(blockSize int) BlockAllocator

NewInMemoryBlockAllocator creates a block allocator that stores its blocks directly in memory, being backed by a simple byte slice. The byte slice is already fully allocated. It does not grow to the desired size lazily.

type BlockList

type BlockList interface {
	BlockReferenceResolver

	// PopFront removes the oldest block from the beginning of the
	// BlockList.
	PopFront()

	// PushBack adds a new block to the end of the BlockList.
	PushBack() error

	// Get a blob from a given block in the BlockList.
	Get(blockIndex int, digest digest.Digest, offsetBytes, sizeBytes int64, dataIntegrityCallback buffer.DataIntegrityCallback) buffer.Buffer

	// HasSpace returns whether a given block in the BlockList is
	// capable of storing an additional blob of a given size.
	HasSpace(blockIndex int, sizeBytes int64) bool

	// Put a new blob in a given block in the BlockList.
	Put(blockIndex int, sizeBytes int64) BlockListPutWriter
}

BlockList keeps track of a list of blocks that are handed out by an underlying BlockAllocator. For every block, BlockList tracks how much space in the block is consumed.

BlockList is only partially thread-safe. The BlockReferenceResolver methods and BlockList.Get() can be invoked in parallel (e.g., under a read lock), while BlockList.PopFront(), BlockList.PushBack(), BlockList.HasSpace(), BlockList.Put() and BlockListPutFinalizer must run exclusively (e.g., under a write lock). BlockListPutWriter is safe to call without holding any locks.

func NewVolatileBlockList

func NewVolatileBlockList(blockAllocator BlockAllocator) BlockList

NewVolatileBlockList creates a BlockList that is suitable for non-persistent data stores.

type BlockListGrowthPolicy

type BlockListGrowthPolicy interface {
	ShouldGrowNewBlocks(currentBlocks, newBlocks int) bool
	ShouldGrowCurrentBlocks(currentBlocks int) bool
}

BlockListGrowthPolicy is used by OldCurrentNewLocationBlobMap to determine whether the number of Blocks in the underlying BlockList is allowed to grow.

func NewImmutableBlockListGrowthPolicy

func NewImmutableBlockListGrowthPolicy(currentBlocks, newBlocks int) BlockListGrowthPolicy

NewImmutableBlockListGrowthPolicy creates a BlockListGrowthPolicy that is suitable for data stores that hold objects that are immutable, such as the Content Addressable Storage (CAS).

This policy permits new objects to be written to multiple Blocks, which is good for ensuring that data is spread out evenly. This amortizes the cost of refreshing these objects in the future.

It also allows the number of "new" blocks to exceed the configured maximum in case the number of "current" blocks is low, increasing the spread of data even further.

func NewMutableBlockListGrowthPolicy

func NewMutableBlockListGrowthPolicy(currentBlocks int) BlockListGrowthPolicy

NewMutableBlockListGrowthPolicy creates a BlockListGrowthPolicy that is suitable for data stores that hold objects that are mutable, such as the Action Cache (AC). Calls such as UpdateActionResult() are expected to replace existing entries.

This policy only permits new objects to be written to the latest Block. This ensures that updating the corresponding entry in the KeyLocationMap is guaranteed to succeed.

type BlockListPutFinalizer

type BlockListPutFinalizer = BlockPutFinalizer

BlockListPutFinalizer is returned by BlockListPutWriter after writing of data has finished. This function returns the offset at which the blob was stored. There is no guarantee that this data can be extracted again, as BlockList.PopFront() may have been called in the meantime.

type BlockListPutWriter

type BlockListPutWriter = BlockPutWriter

BlockListPutWriter is a callback that is returned by BlockList.Put(). It can be used to store data corresponding to a blob in space that has been allocated. It is safe to call this function without holding any locks.

This function blocks until all data contained in the Buffer has been processed or an error occurs. A BlockListPutFinalizer is returned that the caller must invoke while locked.

type BlockPutFinalizer

type BlockPutFinalizer func() (int64, error)

BlockPutFinalizer is returned by BlockPutWriter after writing of data has finished. This function returns the offset at which the blob was stored.

type BlockPutWriter

type BlockPutWriter func(b buffer.Buffer) BlockPutFinalizer

BlockPutWriter is a callback that is returned by Block.Put(). It can be used to store data corresponding to a blob in space that has been allocated. It is safe to call this function without holding any locks.

This function blocks until all data contained in the Buffer has been processed or an error occurs. A BlockPutFinalizer is returned that the caller must invoke while locked.

type BlockReference

type BlockReference struct {
	// A version number of the layout of a BlockList. This version
	// number can be constructed by BlockList any way it sees fit.
	EpochID uint32

	// The number to subtract from the index of the last block
	// associated with this version of the layout of the BlockList.
	// More concretely, zero corresponds to the last block in that
	// epoch, one corresponds to the second-to-last block, etc..
	BlocksFromLast uint16
}

BlockReference holds a stable reference to a block in a BlockList.

BlockList uses simple integer indices to refer to blocks. This is useful, because these can be bounds checked and compared for (in)equality, which is a requirement for HashingKeyLocationMap and OldNewCurrentLocationBlobMap to function properly.

What is problematic about these integer indices is that they become invalidated every time BlockList.PopFront() is called. This means that it's safe to use them as references to blocks within the context of a single operation while locks are held, but not to store them to refer to blocks later on (e.g., as done by LocationRecordArray).

The goal of BlockReference is to act as a stable identifier for a block. Conversion functions are provided by BlockReferenceResolver to translate between BlockReferences and integer indices.

type BlockReferenceResolver

type BlockReferenceResolver interface {
	// BlockReferenceToBlockIndex converts a BlockReference to an
	// integer index. The boolean value of this function indicates
	// whether the conversion is successful. Conversions may fail if
	// the epoch ID is too far in the past or in the future, or when
	// BlocksFromLast refers to a block that has already been
	// released.
	BlockReferenceToBlockIndex(blockReference BlockReference) (int, uint64, bool)

	// BlockIndexToBlockReference converts an integer index of a
	// block to a BlockReference. The BlockReference will use the
	// latest epoch ID.
	//
	// It is invalid to call this function with a block index that
	// is out of bounds.
	BlockIndexToBlockReference(blockIndex int) (BlockReference, uint64)
}

BlockReferenceResolver is a helper type that can be used to convert between BlockReferences and integer indices of blocks managed by a BlockList.

Both methods provided by this interface return a 64-bit hash seed. This value can be used by implementations of LocationRecordArray as a seed for checksum validation of individual entries. The idea behind using this value, as opposed to using some constant, is that it automatically causes entries for uncommitted changes to be discarded upon restart. Restarts after crashes may cause the same epoch ID to be reused, but the hash seed will differ.

type DataSyncer

type DataSyncer func() error

DataSyncer is a callback that needs PeriodicSyncer.ProcessBlockPut() calls into to request that the contents of blocks are synchronized to disk.

Synchronizing these is a requirements to ensure that the KeyLocationMap does not reference objects that are only partially written.

type Key

type Key [sha256.Size]byte

Key is the key type for objects managed by KeyLocationMap.

func NewKeyFromString

func NewKeyFromString(s string) Key

NewKeyFromString creates a new Key based on a string value. Because keys are fixed size, this function uses SHA-256 to convert a variable size string to a key. This means that keys are irreversible.

type KeyLocationMap

type KeyLocationMap interface {
	Get(key Key) (Location, error)
	Put(key Key, location Location) error
}

KeyLocationMap is equivalent to a map[Key]Location. It is used by FlatBlobAccess and HierarchicalCASBlobAccess to track where blobs are stored, so that they may be accessed. Implementations are permitted to discard entries for outdated locations during lookups/insertions using the provided validator.

func NewHashingKeyLocationMap

func NewHashingKeyLocationMap(recordArray LocationRecordArray, recordsCount int, hashInitialization uint64, maximumGetAttempts uint32, maximumPutAttempts int, storageType string) KeyLocationMap

NewHashingKeyLocationMap creates a KeyLocationMap backed by a hash table that uses a strategy similar to Robin Hood hashing to handle collisions. By displacing entries for older locations in favour of newer locations, older locations are gradually pushed to the 'outside' of the hash table.

Data is stored in a LocationRecordArray. Because implementations of LocationRecordArray take a BlockReferenceResolver, they can treat entries for no longer existent locations as invalid. This makes the hash table self-cleaning.

Because the hash table has a limited size (and does not resize), there is a risk of the hash collision rate becoming too high. In the case of a full hash table, it would even deadlock. To prevent this from happening, there is a fixed upper bound on the number of iterations Get() and Put() are willing to run. Records will be discarded once the upper bound is reached. Though this may sound harmful, there is a very high probability that the entry being discarded is one of the older ones.

As both the hashing function that is used by LocationRecordKey and the slot computation of HashingKeyLocationMap use modulo arithmetic, it is recommended to let recordsCount be prime to ensure proper distribution of records.

type Location

type Location struct {
	BlockIndex  int
	OffsetBytes int64
	SizeBytes   int64
}

Location at which a blob is stored within blocks managed by implementations of BlockList. A location consists of a number that identifies a block in a BlockList and the region within the block.

func (Location) IsOlder

func (a Location) IsOlder(b Location) bool

IsOlder returns true if the receiving Location is stored in Block that is older than the Location argument, or if it is stored prior to the Location argument within the same Block.

type LocationBlobGetter

type LocationBlobGetter func(digest digest.Digest) buffer.Buffer

LocationBlobGetter is a callback that is returned by LocationBlobMap.Get(). It can be used to obtain a Buffer that gives access to the data associated with the blob.

Calls to LocationBlobMap.Put() and LocationBlobPutFinalizer invalidate any of the LocationBlobGetters returned by LocationBlobMap.Get(). Calling them is a programming mistake.

type LocationBlobMap

type LocationBlobMap interface {
	// Get information about a blob stored in the map.
	//
	// A LocationBlobGetter is returned that can be invoked to
	// instantiate a Buffer that gives access to the data associated
	// with the blob.
	//
	// In addition to that, it is returned whether the blob needs to
	// be refreshed. When true, there is a high probability that the
	// blob will disappear in the nearby future due to recycling of
	// storage space. The caller is advised to call Put() to
	// reupload the blob. Because Put() invalidates
	// LocationBlobGetters, this function must be called after the
	// LocationBlobGetters is invoked.
	Get(location Location) (LocationBlobGetter, bool)

	// Put a new blob to storage.
	//
	// This function returns a LocationBlobPutWriter, which must be
	// called afterwards to provide the data. Once finished, a
	// LocationBlobPutFinalizer must be invoked to release any
	// internal resources and obtain the blob's location.
	Put(sizeBytes int64) (LocationBlobPutWriter, error)
}

LocationBlobMap implements a data store for blobs. Blobs can be reobtained by providing a location to Get() that is returned by Put(). Because not all locations are valid (and may not necessarily remain valid over time), all Locations provided to Get() must be validated using a BlockReferenceResolver.

LocationBlobMap is only partially thread-safe. LocationBlobMap.Get() and LocationBlobGetter can be invoked in parallel (e.g., under a read lock), while LocationBlobMap.Put() and LocationBlobPutFinalizer must run exclusively (e.g., under a write lock). LocationBlobPutWriter is safe to call without holding any locks.

type LocationBlobPutFinalizer

type LocationBlobPutFinalizer func() (Location, error)

LocationBlobPutFinalizer is returned by LocationBlobPutWriter after writing of data has finished. This function returns the location at which the blob was stored.

The Location returned by this function must still be valid at the time it is returned. If the write takes such a long time that the space associated with the blob is already released, this function must fail.

type LocationBlobPutWriter

type LocationBlobPutWriter func(b buffer.Buffer) LocationBlobPutFinalizer

LocationBlobPutWriter is a callback that is returned by LocationBlobMap.Put(). It can be used to store data corresponding to a blob in space that has been allocated. It is safe to call this function without holding any locks.

This function blocks until all data contained in the Buffer has been processed or an error occurs. A LocationBlobPutFinalizer is returned that the caller must invoke while locked.

type LocationRecord

type LocationRecord struct {
	RecordKey LocationRecordKey
	Location  Location
}

LocationRecord is a key-value pair that contains information on where a blob may be found.

type LocationRecordArray

type LocationRecordArray interface {
	Get(index int) (LocationRecord, error)
	Put(index int, locationRecord LocationRecord) error
}

LocationRecordArray is equivalent to a []LocationRecord. It is used as the backing store by HashingKeyLocationMap. Instead of storing data in a slice in memory, an implementation could store this information on disk for a persistent data store.

func NewBlockDeviceBackedLocationRecordArray

func NewBlockDeviceBackedLocationRecordArray(device blockdevice.BlockDevice, resolver BlockReferenceResolver) LocationRecordArray

NewBlockDeviceBackedLocationRecordArray creates a persistent LocationRecordArray. It works by using a block device as an array-like structure, writing serialized LocationRecords next to each other.

func NewInMemoryLocationRecordArray

func NewInMemoryLocationRecordArray(size int, resolver BlockReferenceResolver) LocationRecordArray

NewInMemoryLocationRecordArray creates a LocationRecordArray that stores its data in memory. HashingKeyLocationMap relies on being able to store a mapping from Keys to a Location in memory or on disk. This type implements a non-persistent storage of such a map in memory.

type LocationRecordKey

type LocationRecordKey struct {
	Key     Key
	Attempt uint32
}

LocationRecordKey contains a compact, partial binary representation of a Key that is used to identify blobs in HashingKeyLocationMap.

Because HashingKeyLocationMap uses open addressing, LocationRecords may be stored at alternative, less preferred indices. The Attempt field contains the probing distance at which the record is stored.

func (*LocationRecordKey) Hash

func (k *LocationRecordKey) Hash(hashInitialization uint64) uint64

Hash a LocationRecordKey using FNV-1a. Instead of using the well-known offset basis of 14695981039346656037, a custom initialization may be provided. This permits mirrored instances to each use a different offset basis.

In the unlikely event that the collision rate on the hash table becomes too high, records may simply get lost. By letting mirrored instances use different offset bases, it becomes less likely that both instances lose the same record.

For non-persistent setups, it is advised to use a randomly chosen offset basis to prevent collision attacks.

type OldCurrentNewLocationBlobMap

type OldCurrentNewLocationBlobMap struct {
	// contains filtered or unexported fields
}

OldCurrentNewLocationBlobMap is a LocationBlobMap that stores data in blocks. Blocks are managed using a BlockList. Blobs cannot span multiple blocks, meaning that blocks generally need to be large in size (gigabytes). The number of blocks may be relatively low. For example, for a 512 GiB cache, it is acceptable to create 32 blocks of 16 GiB in size.

Blocks are partitioned into three groups based on their creation time, named "old", "current" and "new". Blobs provided to Put() will always be stored in a block in the "new" group. When the oldest block in the "new" group becomes full, it is moved to the "current" group. This causes the oldest block in the "current" group to be displaced to the "old" group. The oldest block in the "old" group is discarded.

The difference between the "current" group and the "old" group is that the needRefresh value returned by Get() differs. Data in the "old" group is at risk of being removed in the nearby future, which is why it needs to be copied into the "new" group when requested to be retained. Data in the "current" group is assumed to remain present for the time being, which is why it is left in place. This copying is performed by FlatBlobAccess.

Below is an illustration of how the blocks of data may be laid out at a given point in time. Every column of █ characters corresponds to a single block. The number of characters indicates the amount of data stored within.

← Over time, blocks move from "new" to "current" to "old" ←

              Old         Current        New
            █ █ █ █ │ █ █ █ █ █ █ █ █ │
            █ █ █ █ │ █ █ █ █ █ █ █ █ │
            █ █ █ █ │ █ █ █ █ █ █ █ █ │
            █ █ █ █ │ █ █ █ █ █ █ █ █ │
            █ █ █ █ │ █ █ █ █ █ █ █ █ │ █
            █ █ █ █ │ █ █ █ █ █ █ █ █ │ █
            █ █ █ █ │ █ █ █ █ █ █ █ █ │ █ █
            █ █ █ █ │ █ █ █ █ █ █ █ █ │ █ █ █
            ↓ ↓ ↓ ↓                     ↑ ↑ ↑ ↑
            └─┴─┴─┴─────────────────────┴─┴─┴─┘
   Data gets copied from "old" to "new" when requested.

Blobs get stored in blocks in the "new" group with an inverse exponential probability. This is done to reduce the probability of multiple block rotations close after each other, as this might put excessive pressure on the garbage collector. Because the placement distribution decreases rapidly, having more than three or four "new" blocks would be wasteful. Having fewer is also not recommended, as that increases the chance of placing objects that are used together inside the same block. This may cause 'tidal waves' of I/O whenever such data ends up in the "old" group at once.

After initialization, there will be fewer blocks in the "current" group than configured, due to there simply being no data. This is compensated by adding more blocks to the "new" group. Unlike the regular blocks in this group, these will have a uniform placement distribution that is twice as high as normal. This is done to ensure the "current" blocks are randomly seeded to reduce 'tidal waves' later on.

The number of blocks in the "old" group should not be too low, as this would cause this storage backend to become a FIFO instead of being LRU-like. Setting it too high is also not recommended, as this would increase redundancy in the data stored. The "current" group should likely be two or three times as large as the "old" group.

func NewOldCurrentNewLocationBlobMap

func NewOldCurrentNewLocationBlobMap(blockList BlockList, blockListGrowthPolicy BlockListGrowthPolicy, errorLogger util.ErrorLogger, storageType string, blockSizeBytes int64, oldBlocksCount, newBlocksCount, initialBlocksCount int) *OldCurrentNewLocationBlobMap

NewOldCurrentNewLocationBlobMap creates a new instance of OldCurrentNewLocationBlobMap.

func (*OldCurrentNewLocationBlobMap) BlockIndexToBlockReference

func (lbm *OldCurrentNewLocationBlobMap) BlockIndexToBlockReference(blockIndex int) (BlockReference, uint64)

BlockIndexToBlockReference converts an integer index of a block in the underlying BlockList to a BlockReference. The BlockReference is a stable referennce to this block that remains valid after locks are dropped.

func (*OldCurrentNewLocationBlobMap) BlockReferenceToBlockIndex

func (lbm *OldCurrentNewLocationBlobMap) BlockReferenceToBlockIndex(blockReference BlockReference) (int, uint64, bool)

BlockReferenceToBlockIndex converts a BlockReference that contains a stable reference to a block to an integer index. The integer index corresponds to the current location of the block in the underlying BlockList.

func (*OldCurrentNewLocationBlobMap) Get

Get information about a blob based on its Location. A LocationBlobGetter is returned that can be used to fetch the blob's contents.

func (*OldCurrentNewLocationBlobMap) Put

Put a new blob of a given size to storage.

type PeriodicSyncer

type PeriodicSyncer struct {
	// contains filtered or unexported fields
}

PeriodicSyncer can be used to monitor PersistentBlockList for writes and block releases. When such events occur, the state of the PersistentBlockList is extracted and written to disk. This allows its contents to be recovered after a restart.

func NewPeriodicSyncer

func NewPeriodicSyncer(source PersistentStateSource, sourceLock *sync.RWMutex, store PersistentStateStore, clock clock.Clock, errorLogger util.ErrorLogger, errorRetryInterval, minimumEpochInterval time.Duration, keyLocationMapHashInitialization uint64, dataSyncer DataSyncer) *PeriodicSyncer

NewPeriodicSyncer creates a new PeriodicSyncer according to the arguments provided.

func (*PeriodicSyncer) ProcessBlockPut

func (ps *PeriodicSyncer) ProcessBlockPut(ctx context.Context) bool

ProcessBlockPut waits for writes to occur against a block managed by a PersistentBlockList. It causes data on the underlying block device to be synchronized after a certain amount of time, followed by updating the persistent state stored on disk.

This function must generally be called in a loop in a separate goroutine, so that the persistent state is updated continuously. The return value of this method denotes whether the caller must continue to call this method. When false, it indicates the provided context was cancelled, due to a shutdown being requested.

func (*PeriodicSyncer) ProcessBlockRelease

func (ps *PeriodicSyncer) ProcessBlockRelease()

ProcessBlockRelease waits for a single block to be released by a PersistentBlockList. It causes the persistent state of the PersistentBlockList to be extracted and written to a file.

This function must generally be called in a loop in a separate goroutine, so that block release events are handled continuously.

type PersistentBlockList

type PersistentBlockList struct {
	// contains filtered or unexported fields
}

PersistentBlockList is an implementation of BlockList whose internal state can be extracted and persisted. This allows data contained in its blocks to be accessed after restarts.

func NewPersistentBlockList

func NewPersistentBlockList(blockAllocator BlockAllocator, initialOldestEpochID uint32, initialBlocks []*pb.BlockState) (*PersistentBlockList, int)

NewPersistentBlockList provides an implementation of BlockList whose state can be persisted. This makes it possible to preserve the contents of FlatBlobAccess and HierarchicalCASBlobAccess across restarts.

func (*PersistentBlockList) BlockIndexToBlockReference

func (bl *PersistentBlockList) BlockIndexToBlockReference(blockIndex int) (BlockReference, uint64)

BlockIndexToBlockReference converts the index of a block to a BlockReference that uses the latest epoch ID.

func (*PersistentBlockList) BlockReferenceToBlockIndex

func (bl *PersistentBlockList) BlockReferenceToBlockIndex(blockReference BlockReference) (int, uint64, bool)

BlockReferenceToBlockIndex converts a BlockReference to the index of the block in the BlockList. This conversion may fail if the block has already been released using PopFront().

func (*PersistentBlockList) Get

func (bl *PersistentBlockList) Get(index int, digest digest.Digest, offsetBytes, sizeBytes int64, dataIntegrityCallback buffer.DataIntegrityCallback) buffer.Buffer

Get data from one of the blocks managed by this BlockList.

func (*PersistentBlockList) GetBlockPutWakeup

func (bl *PersistentBlockList) GetBlockPutWakeup() <-chan struct{}

GetBlockPutWakeup returns a channel that triggers when there was data stored in one of the blocks since the last persistent state was written to disk.

func (*PersistentBlockList) GetBlockReleaseWakeup

func (bl *PersistentBlockList) GetBlockReleaseWakeup() <-chan struct{}

GetBlockReleaseWakeup returns a channel that triggers when there are one or more blocks that have been released since the last persistent state was written to disk.

func (*PersistentBlockList) GetPersistentState

func (bl *PersistentBlockList) GetPersistentState() (uint32, []*pb.BlockState)

GetPersistentState returns information that needs to be persisted to disk to be able to restore the layout of the BlockList after a restart.

func (*PersistentBlockList) HasSpace

func (bl *PersistentBlockList) HasSpace(index int, sizeBytes int64) bool

HasSpace returns whether a block with a given index has sufficient space to store a blob of a given size.

func (*PersistentBlockList) NotifyPersistentStateWritten

func (bl *PersistentBlockList) NotifyPersistentStateWritten()

NotifyPersistentStateWritten needs to be called after the data returned by GetPersistentState() is written to disk. This allows PersistentBlockList to recycle blocks that were used previously.

func (*PersistentBlockList) NotifySyncCompleted

func (bl *PersistentBlockList) NotifySyncCompleted()

NotifySyncCompleted needs to be called right after the data on the storage medium underneath the BlockAllocator is synchronized. This causes the next call to GetPersistentState() to return information on the newly synchronized data.

func (*PersistentBlockList) NotifySyncStarting

func (bl *PersistentBlockList) NotifySyncStarting(isFinalSync bool)

NotifySyncStarting needs to be called right before the data on the storage medium underneath the BlockAllocator is synchronized. This causes the epoch ID to be increased when the next blob is stored.

func (*PersistentBlockList) PopFront

func (bl *PersistentBlockList) PopFront()

PopFront removes the oldest block from the BlockList, having index zero.

func (*PersistentBlockList) PushBack

func (bl *PersistentBlockList) PushBack() error

PushBack appends a new block to the BlockList. The block is obtained by calling into the underlying BlockAllocator.

func (*PersistentBlockList) Put

func (bl *PersistentBlockList) Put(index int, sizeBytes int64) BlockListPutWriter

Put data into a block managed by the BlockList.

type PersistentStateSource

type PersistentStateSource interface {
	// GetBlockReleaseWakeup returns a channel that triggers if one
	// or more blocks have been released from a BlockList.
	// When triggered, PeriodicSyncer will attempt to update the
	// persistent state immediately.
	//
	// This function must be called while holding a read lock on the
	// BlockList.
	GetBlockReleaseWakeup() <-chan struct{}

	// GetBlockPutWakeup returns a channel that triggers if one or
	// more blobs have been written to a BlockList. This can be used
	// by PeriodicSyncer to synchronize data to storage.
	// PeriodicSyncer may apply a short delay before actually
	// synchronize data to perform some batching.
	//
	// This function must be called while holding a read lock on the
	// BlockList.
	GetBlockPutWakeup() <-chan struct{}

	// NotifySyncStarting instructs the BlockList that
	// PeriodicSyncer is about to synchronize data to storage.
	// Successive writes to the BlockList should use a new epoch ID,
	// as there is no guarantee their data is synchronized as part
	// of the current epoch.
	//
	// This function must be called while holding a write lock on
	// the BlockList.
	NotifySyncStarting(isFinalSync bool)

	// NotifySyncCompleted instructs the BlockList that the
	// synchronization performed after the last call to
	// NotifySyncStarting was successful.
	//
	// Future calls to GetPersistentState may now return information
	// about blocks and epochs that were created before the previous
	// NotifySyncStarting call.
	//
	// Calling this function may cause the next channel returned by
	// GetBlockPutWakeup to block once again.
	//
	// This function must be called while holding a write lock on
	// the BlockList.
	NotifySyncCompleted()

	// GetPersistentState returns information about all blocks and
	// epochs that are managed by the BlockList and have been
	// synchronized to storage successfully.
	//
	// This function must be called while holding a read lock on the
	// BlockList.
	GetPersistentState() (uint32, []*pb.BlockState)

	// NotifyPersistentStateWritten instructs the BlockList that the
	// data returned by the last call to GetPersistentState was
	// stored successfully.
	//
	// This call allows the BlockList to recycle blocks that were
	// used previously, but were still part of the persistent state
	// written to disk.
	//
	// Calling this function may cause the next channel returned by
	// GetBlockReleaseWakeup to block once again.
	//
	// This function must be called while holding a write lock on
	// the BlockList.
	NotifyPersistentStateWritten()
}

PersistentStateSource is used by PeriodicSyncer to determine whether the persistent state file needs to update, and if so which contents it needs to hold.

type PersistentStateStore

type PersistentStateStore interface {
	ReadPersistentState() (*pb.PersistentState, error)
	WritePersistentState(persistentState *pb.PersistentState) error
}

PersistentStateStore is used by PeriodicSyncer to write PersistentBlockList's state to disk. This state can be reloaded on startup to make it possible to access data that was written in the past.

func NewDirectoryBackedPersistentStateStore

func NewDirectoryBackedPersistentStateStore(directory filesystem.Directory) PersistentStateStore

NewDirectoryBackedPersistentStateStore creates a PersistentStateStore that writes PersistentState Protobuf messages to a file named "state" stored inside a filesystem.Directory.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL