Documentation ¶
Overview ¶
Package chunk provides access to data through content-addressed chunks.
A chunk is the basic unit of storage for data. Chunks are identified by their content address, which is a hash of the data stored in the chunk. There are two mechanisms for uploading chunks: the uploader and the batcher. The uploader is intended for medium and large data entries, since it performs content-defined chunking on the data and stores each data entry in its own set of chunks. The batcher is intended for small data entries, since it batches multiple data entries into larger chunks. Each upload method produces a list of data references, one per data entry, which can be used to access the corresponding data through readers.
Index ¶
- Constants
- Variables
- func ComputeChunks(r io.Reader, cb func([]byte) error) error
- func Get(ctx context.Context, client Client, ref *Ref) ([]byte, error)
- func NewPostgresKeyStore(db *pachsql.DB) *postgresKeyStore
- func SetupPostgresStoreV0(ctx context.Context, tx *pachsql.Tx) error
- type Batcher
- type BatcherOption
- type ChunkFunc
- type Client
- type CompressionAlgo
- func (CompressionAlgo) Descriptor() protoreflect.EnumDescriptor
- func (x CompressionAlgo) Enum() *CompressionAlgo
- func (CompressionAlgo) EnumDescriptor() ([]byte, []int) (deprecated)
- func (x CompressionAlgo) Number() protoreflect.EnumNumber
- func (x CompressionAlgo) String() string
- func (CompressionAlgo) Type() protoreflect.EnumType
- type CreateOptions
- type DataReader
- type DataRef
- func (*DataRef) Descriptor() ([]byte, []int) (deprecated)
- func (x *DataRef) GetHash() []byte
- func (x *DataRef) GetOffsetBytes() int64
- func (x *DataRef) GetRef() *Ref
- func (x *DataRef) GetSizeBytes() int64
- func (x *DataRef) MarshalLogObject(enc zapcore.ObjectEncoder) error
- func (*DataRef) ProtoMessage()
- func (x *DataRef) ProtoReflect() protoreflect.Message
- func (x *DataRef) Reset()
- func (x *DataRef) String() string
- func (m *DataRef) Validate() error
- func (m *DataRef) ValidateAll() error
- type DataRefMultiError
- type DataRefValidationError
- type EncryptionAlgo
- func (EncryptionAlgo) Descriptor() protoreflect.EnumDescriptor
- func (x EncryptionAlgo) Enum() *EncryptionAlgo
- func (EncryptionAlgo) EnumDescriptor() ([]byte, []int) (deprecated)
- func (x EncryptionAlgo) Number() protoreflect.EnumNumber
- func (x EncryptionAlgo) String() string
- func (EncryptionAlgo) Type() protoreflect.EnumType
- type Entry
- type EntryFunc
- type GarbageCollector
- type ID
- type KeyStore
- type Metadata
- type Reader
- type ReaderOption
- type Ref
- func (*Ref) Descriptor() ([]byte, []int) (deprecated)
- func (x *Ref) GetCompressionAlgo() CompressionAlgo
- func (x *Ref) GetDek() []byte
- func (x *Ref) GetEdge() bool
- func (x *Ref) GetEncryptionAlgo() EncryptionAlgo
- func (x *Ref) GetId() []byte
- func (x *Ref) GetSizeBytes() int64
- func (r *Ref) Key() pachhash.Output
- func (x *Ref) MarshalLogObject(enc zapcore.ObjectEncoder) error
- func (*Ref) ProtoMessage()
- func (x *Ref) ProtoReflect() protoreflect.Message
- func (x *Ref) Reset()
- func (x *Ref) String() string
- func (m *Ref) Validate() error
- func (m *Ref) ValidateAll() error
- type RefMultiError
- type RefValidationError
- type Renewer
- type Storage
- func (s *Storage) Check(ctx context.Context, begin, end []byte, readChunks bool) (int, error)
- func (s *Storage) ListStore(ctx context.Context, cb func(id ID, gen uint64) error) error
- func (s *Storage) NewBatcher(ctx context.Context, name string, threshold int, opts ...BatcherOption) *Batcher
- func (s *Storage) NewDataReader(ctx context.Context, dataRef *DataRef) *DataReader
- func (s *Storage) NewDeleter() track.Deleter
- func (s *Storage) NewReader(ctx context.Context, dataRefs []*DataRef, opts ...ReaderOption) *Reader
- func (s *Storage) NewUploader(ctx context.Context, name string, noUpload bool, cb UploadFunc) *Uploader
- func (s *Storage) PrefetchData(ctx context.Context, dataRef *DataRef) error
- type StorageOption
- type UploadFunc
- type Uploader
Constants ¶
const (
	DefaultAverageBits  = 23
	DefaultSeed         = 1
	DefaultMinChunkSize = 1 * units.MB
	DefaultMaxChunkSize = 20 * units.MB
)
const (
	// TrackerPrefix is the prefix used when creating tracker objects for chunks
	TrackerPrefix = "chunk/"

	DefaultPrefetchLimit = 10
)
const (
// WindowSize is the size of the rolling hash window.
WindowSize = 64
)
Variables ¶
var (
	CompressionAlgo_name = map[int32]string{
		0: "NONE",
		1: "GZIP_BEST_SPEED",
	}
	CompressionAlgo_value = map[string]int32{
		"NONE":            0,
		"GZIP_BEST_SPEED": 1,
	}
)
Enum value maps for CompressionAlgo.
var (
	EncryptionAlgo_name = map[int32]string{
		0: "ENCRYPTION_ALGO_UNKNOWN",
		1: "CHACHA20",
	}
	EncryptionAlgo_value = map[string]int32{
		"ENCRYPTION_ALGO_UNKNOWN": 0,
		"CHACHA20":                1,
	}
)
Enum value maps for EncryptionAlgo.
var File_internal_storage_chunk_chunk_proto protoreflect.FileDescriptor
Functions ¶
func ComputeChunks ¶
ComputeChunks splits a stream of bytes into chunks using a content-defined chunking algorithm. Minimum and maximum chunk sizes are enforced to prevent suboptimal chunk sizes. This algorithm is useful for ensuring that typical data modifications (insertions, deletions, updates) only affect a small number of chunks. TODO: Expose configuration.
func Get ¶
Get calls client.Get to retrieve a chunk, then verifies, decrypts, and decompresses the data. The uncompressed plaintext is returned.
func NewPostgresKeyStore ¶
Types ¶
type Batcher ¶
type Batcher struct {
// contains filtered or unexported fields
}
Batcher batches entries into chunks. Entries are buffered until they exceed the configured threshold, at which point a chunk is created. Chunk creation is asynchronous with respect to the client, which is why the interface is callback based. Batcher provides one of two callback-based interfaces, defined by ChunkFunc and EntryFunc. Callbacks are executed in the order the entries are added (for the ChunkFunc interface, entries are ordered within as well as across calls).
type BatcherOption ¶
type BatcherOption func(b *Batcher)
func WithChunkCallback ¶
func WithChunkCallback(cb ChunkFunc) BatcherOption
func WithEntryCallback ¶
func WithEntryCallback(cb EntryFunc) BatcherOption
type ChunkFunc ¶
ChunkFunc is a function that provides the metadata for the entries in a chunk and a data reference to the chunk.
type Client ¶
type Client interface {
	Create(ctx context.Context, md Metadata, chunkData []byte) (ID, error)
	Get(ctx context.Context, chunkID ID, cb kv.ValueCallback) error
	Close() error
}
Client mediates access to a content-addressed store.
type CompressionAlgo ¶
type CompressionAlgo int32
const (
	CompressionAlgo_NONE            CompressionAlgo = 0
	CompressionAlgo_GZIP_BEST_SPEED CompressionAlgo = 1
)
func (CompressionAlgo) Descriptor ¶ added in v2.7.0
func (CompressionAlgo) Descriptor() protoreflect.EnumDescriptor
func (CompressionAlgo) Enum ¶ added in v2.7.0
func (x CompressionAlgo) Enum() *CompressionAlgo
func (CompressionAlgo) EnumDescriptor (deprecated)
func (CompressionAlgo) EnumDescriptor() ([]byte, []int)
Deprecated: Use CompressionAlgo.Descriptor instead.
func (CompressionAlgo) Number ¶ added in v2.7.0
func (x CompressionAlgo) Number() protoreflect.EnumNumber
func (CompressionAlgo) String ¶
func (x CompressionAlgo) String() string
func (CompressionAlgo) Type ¶ added in v2.7.0
func (CompressionAlgo) Type() protoreflect.EnumType
type CreateOptions ¶
type CreateOptions struct {
	Secret      []byte
	Compression CompressionAlgo
}
CreateOptions affect how chunks are created.
type DataReader ¶
type DataReader struct {
// contains filtered or unexported fields
}
DataReader is an abstraction that lazily reads data referenced by a data reference.
type DataRef ¶
type DataRef struct {
	// The chunk the referenced data is located in.
	Ref *Ref `protobuf:"bytes,1,opt,name=ref,proto3" json:"ref,omitempty"`
	// The hash of the data being referenced.
	Hash []byte `protobuf:"bytes,2,opt,name=hash,proto3" json:"hash,omitempty"`
	// The offset and size used for accessing the data within the chunk.
	OffsetBytes int64 `protobuf:"varint,3,opt,name=offset_bytes,json=offsetBytes,proto3" json:"offset_bytes,omitempty"`
	SizeBytes   int64 `protobuf:"varint,4,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"`
	// contains filtered or unexported fields
}
DataRef is a reference to data within a chunk.
func NewDataRef ¶
func (*DataRef) Descriptor (deprecated)
func (*DataRef) GetOffsetBytes ¶
func (*DataRef) GetSizeBytes ¶
func (*DataRef) MarshalLogObject ¶
func (x *DataRef) MarshalLogObject(enc zapcore.ObjectEncoder) error
func (*DataRef) ProtoMessage ¶
func (*DataRef) ProtoMessage()
func (*DataRef) ProtoReflect ¶ added in v2.7.0
func (x *DataRef) ProtoReflect() protoreflect.Message
func (*DataRef) Validate ¶ added in v2.8.0
Validate checks the field values on DataRef with the rules defined in the proto definition for this message. If any rules are violated, the first error encountered is returned, or nil if there are no violations.
func (*DataRef) ValidateAll ¶ added in v2.8.0
ValidateAll checks the field values on DataRef with the rules defined in the proto definition for this message. If any rules are violated, the result is a list of violation errors wrapped in DataRefMultiError, or nil if none found.
type DataRefMultiError ¶ added in v2.8.0
type DataRefMultiError []error
DataRefMultiError is an error wrapping multiple validation errors returned by DataRef.ValidateAll() if the designated constraints aren't met.
func (DataRefMultiError) AllErrors ¶ added in v2.8.0
func (m DataRefMultiError) AllErrors() []error
AllErrors returns a list of validation violation errors.
func (DataRefMultiError) Error ¶ added in v2.8.0
func (m DataRefMultiError) Error() string
Error returns a concatenation of all the error messages it wraps.
type DataRefValidationError ¶ added in v2.8.0
type DataRefValidationError struct {
// contains filtered or unexported fields
}
DataRefValidationError is the validation error returned by DataRef.Validate if the designated constraints aren't met.
func (DataRefValidationError) Cause ¶ added in v2.8.0
func (e DataRefValidationError) Cause() error
Cause function returns cause value.
func (DataRefValidationError) Error ¶ added in v2.8.0
func (e DataRefValidationError) Error() string
Error satisfies the builtin error interface
func (DataRefValidationError) ErrorName ¶ added in v2.8.0
func (e DataRefValidationError) ErrorName() string
ErrorName returns error name.
func (DataRefValidationError) Field ¶ added in v2.8.0
func (e DataRefValidationError) Field() string
Field function returns field value.
func (DataRefValidationError) Key ¶ added in v2.8.0
func (e DataRefValidationError) Key() bool
Key function returns key value.
func (DataRefValidationError) Reason ¶ added in v2.8.0
func (e DataRefValidationError) Reason() string
Reason function returns reason value.
type EncryptionAlgo ¶
type EncryptionAlgo int32
const (
	EncryptionAlgo_ENCRYPTION_ALGO_UNKNOWN EncryptionAlgo = 0
	EncryptionAlgo_CHACHA20                EncryptionAlgo = 1
)
func (EncryptionAlgo) Descriptor ¶ added in v2.7.0
func (EncryptionAlgo) Descriptor() protoreflect.EnumDescriptor
func (EncryptionAlgo) Enum ¶ added in v2.7.0
func (x EncryptionAlgo) Enum() *EncryptionAlgo
func (EncryptionAlgo) EnumDescriptor (deprecated)
func (EncryptionAlgo) EnumDescriptor() ([]byte, []int)
Deprecated: Use EncryptionAlgo.Descriptor instead.
func (EncryptionAlgo) Number ¶ added in v2.7.0
func (x EncryptionAlgo) Number() protoreflect.EnumNumber
func (EncryptionAlgo) String ¶
func (x EncryptionAlgo) String() string
func (EncryptionAlgo) Type ¶ added in v2.7.0
func (EncryptionAlgo) Type() protoreflect.EnumType
type Entry ¶
type Entry struct {
	ChunkID   ID     `db:"chunk_id"`
	Gen       uint64 `db:"gen"`
	Uploaded  bool   `db:"uploaded"`
	Tombstone bool   `db:"tombstone"`
}
Entry is a chunk object mapping.
type EntryFunc ¶
EntryFunc is a function that provides the metadata for an entry and a data reference to the entry in a chunk. Size zero entries will have a nil data reference.
type GarbageCollector ¶
type GarbageCollector struct {
// contains filtered or unexported fields
}
GarbageCollector removes unused chunks from object storage.
func NewGC ¶
func NewGC(s *Storage, d time.Duration) *GarbageCollector
NewGC returns a new garbage collector operating on s.
func (*GarbageCollector) RunForever ¶
func (gc *GarbageCollector) RunForever(ctx context.Context) error
RunForever calls RunOnce until the context is cancelled, logging any errors.
type ID ¶
type ID []byte
ID uniquely identifies a chunk. It is the hash of the chunk's content.
func ParseTrackerID ¶
ParseTrackerID parses a tracker ID into a chunk ID.
type KeyStore ¶
type KeyStore interface {
	Create(ctx context.Context, name string, data []byte) error
	Get(ctx context.Context, name string) ([]byte, error)
}
KeyStore is a store for named secret keys.
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader reads data from chunk storage.
type ReaderOption ¶
type ReaderOption func(*Reader)
func WithOffsetBytes ¶
func WithOffsetBytes(offsetBytes int64) ReaderOption
func WithPrefetchLimit ¶
func WithPrefetchLimit(limit int) ReaderOption
type Ref ¶
type Ref struct {
	Id              []byte          `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
	SizeBytes       int64           `protobuf:"varint,2,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"`
	Edge            bool            `protobuf:"varint,3,opt,name=edge,proto3" json:"edge,omitempty"`
	Dek             []byte          `protobuf:"bytes,4,opt,name=dek,proto3" json:"dek,omitempty"`
	EncryptionAlgo  EncryptionAlgo  `` /* 130-byte string literal not displayed */
	CompressionAlgo CompressionAlgo `` /* 134-byte string literal not displayed */
	// contains filtered or unexported fields
}
func Create ¶
func Create(ctx context.Context, opts CreateOptions, ptext []byte, createFunc func(ctx context.Context, data []byte) (ID, error)) (*Ref, error)
Create calls createFunc to create a new chunk, but first compresses and encrypts ptext. ptext is not modified.
func (*Ref) Descriptor (deprecated)
func (*Ref) GetCompressionAlgo ¶
func (x *Ref) GetCompressionAlgo() CompressionAlgo
func (*Ref) GetEncryptionAlgo ¶
func (x *Ref) GetEncryptionAlgo() EncryptionAlgo
func (*Ref) GetSizeBytes ¶
func (*Ref) MarshalLogObject ¶
func (x *Ref) MarshalLogObject(enc zapcore.ObjectEncoder) error
func (*Ref) ProtoMessage ¶
func (*Ref) ProtoMessage()
func (*Ref) ProtoReflect ¶ added in v2.7.0
func (x *Ref) ProtoReflect() protoreflect.Message
func (*Ref) Validate ¶ added in v2.8.0
Validate checks the field values on Ref with the rules defined in the proto definition for this message. If any rules are violated, the first error encountered is returned, or nil if there are no violations.
func (*Ref) ValidateAll ¶ added in v2.8.0
ValidateAll checks the field values on Ref with the rules defined in the proto definition for this message. If any rules are violated, the result is a list of violation errors wrapped in RefMultiError, or nil if none found.
type RefMultiError ¶ added in v2.8.0
type RefMultiError []error
RefMultiError is an error wrapping multiple validation errors returned by Ref.ValidateAll() if the designated constraints aren't met.
func (RefMultiError) AllErrors ¶ added in v2.8.0
func (m RefMultiError) AllErrors() []error
AllErrors returns a list of validation violation errors.
func (RefMultiError) Error ¶ added in v2.8.0
func (m RefMultiError) Error() string
Error returns a concatenation of all the error messages it wraps.
type RefValidationError ¶ added in v2.8.0
type RefValidationError struct {
// contains filtered or unexported fields
}
RefValidationError is the validation error returned by Ref.Validate if the designated constraints aren't met.
func (RefValidationError) Cause ¶ added in v2.8.0
func (e RefValidationError) Cause() error
Cause function returns cause value.
func (RefValidationError) Error ¶ added in v2.8.0
func (e RefValidationError) Error() string
Error satisfies the builtin error interface
func (RefValidationError) ErrorName ¶ added in v2.8.0
func (e RefValidationError) ErrorName() string
ErrorName returns error name.
func (RefValidationError) Field ¶ added in v2.8.0
func (e RefValidationError) Field() string
Field function returns field value.
func (RefValidationError) Key ¶ added in v2.8.0
func (e RefValidationError) Key() bool
Key function returns key value.
func (RefValidationError) Reason ¶ added in v2.8.0
func (e RefValidationError) Reason() string
Reason function returns reason value.
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Storage is an abstraction for interfacing with chunk storage. A storage instance:
- Provides methods for uploading data to chunks and downloading data from chunks.
- Manages tracker state to keep chunks alive while uploading.
- Manages an internal chunk cache and work deduplicator (parallel downloads of the same chunk will be deduplicated).
func NewStorage ¶
func NewStorage(store kv.Store, db *pachsql.DB, tracker track.Tracker, opts ...StorageOption) *Storage
NewStorage creates a new Storage.
func NewTestStorage ¶
func NewTestStorage(t testing.TB, db *pachsql.DB, tr track.Tracker, opts ...StorageOption) (kv.Store, *Storage)
NewTestStorage creates a local storage instance for testing; it lasts for the duration of the test.
func (*Storage) Check ¶
Check runs an integrity check on the objects in object storage. It checks objects for chunks with IDs in the range [begin, end). As a special case, if len(end) == 0, the upper bound is ignored.
func (*Storage) ListStore ¶ added in v2.7.0
ListStore lists all of the chunks in object storage. This is not the same as listing the chunk entries in the database.
func (*Storage) NewBatcher ¶
func (s *Storage) NewBatcher(ctx context.Context, name string, threshold int, opts ...BatcherOption) *Batcher
TODO: Add config for number of entries.
func (*Storage) NewDataReader ¶
func (s *Storage) NewDataReader(ctx context.Context, dataRef *DataRef) *DataReader
func (*Storage) NewDeleter ¶
NewDeleter creates a deleter for use with a tracker.GC.
func (*Storage) NewUploader ¶
type StorageOption ¶
type StorageOption func(s *Storage)
StorageOption configures a storage.
func WithCompression ¶
func WithCompression(algo CompressionAlgo) StorageOption
WithCompression sets the compression algorithm used to compress chunks.
func WithMemoryCacheSize ¶ added in v2.7.0
func WithMemoryCacheSize(size int) StorageOption
WithMemoryCacheSize sets the number of decrypted, uncompressed chunks that will be stored in memory.
func WithSecret ¶
func WithSecret(secret []byte) StorageOption
WithSecret sets the secret used to generate chunk encryption keys.
type UploadFunc ¶
UploadFunc is a function that provides the metadata for a task and the corresponding set of chunk references.
type Uploader ¶
type Uploader struct {
// contains filtered or unexported fields
}
Uploader uploads chunks. Each upload call creates at least one upload task with the provided metadata. Upload tasks are performed asynchronously, which is why the interface is callback based. Callbacks are executed in the order the upload tasks are created.
func (*Uploader) Copy ¶
Copy performs an upload using a list of data references as the data source. Stable data references are reused. Unstable data references have their data downloaded and re-uploaded, similar to a normal upload.