Documentation ¶
Overview ¶
Package chunk provides access to data through content-addressed chunks.
A chunk is the basic unit of storage for data. Chunks are identified by their content address, which is a hash of the data stored in the chunk. There are two mechanisms for uploading chunks: the uploader and the batcher. The uploader is intended for medium / large data entries, since it performs content-defined chunking on the data and stores each data entry in its own set of chunks. The batcher is intended for small data entries, since it batches multiple data entries into larger chunks. Both upload mechanisms produce a list of data references for each data entry; these data references can be used to read the corresponding data back through readers.
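The sketch below shows how these pieces are typically wired together. It is illustrative only: the import path, the "example-uploader" / "example-batcher" names, and the 1 MB batching threshold are assumptions, and the upload callback is left nil for brevity.

package example

import (
	"context"

	"github.com/pachyderm/pachyderm/v2/src/internal/storage/chunk" // import path assumed
)

func wireUploads(ctx context.Context, s *chunk.Storage, dataRefs []*chunk.DataRef) {
	// Medium / large entries: content-defined chunking, one set of chunks per entry.
	// The UploadFunc callback (nil here for brevity) receives each task's metadata
	// and the resulting data references.
	uploader := s.NewUploader(ctx, "example-uploader", false, nil)
	_ = uploader

	// Small entries: many entries share chunks; a chunk is cut once the buffered
	// entries pass the threshold (1 MB here, an illustrative value).
	batcher := s.NewBatcher(ctx, "example-batcher", 1024*1024)
	_ = batcher

	// Data references produced by either path are read back through a Reader.
	r := s.NewReader(ctx, dataRefs, chunk.WithPrefetchLimit(chunk.DefaultPrefetchLimit))
	_ = r
}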
Index ¶
- Constants
- Variables
- func ComputeChunks(r io.Reader, cb func([]byte) error) error
- func Get(ctx context.Context, client Client, ref *Ref, cb kv.ValueCallback) error
- func NewPostgresKeyStore(db *pachsql.DB) *postgresKeyStore
- func SetupPostgresStoreV0(tx *pachsql.Tx) error
- type Batcher
- type BatcherOption
- type ChunkFunc
- type Client
- type CompressionAlgo
- type CreateOptions
- type DataReader
- type DataRef
- func (*DataRef) Descriptor() ([]byte, []int)
- func (m *DataRef) GetHash() []byte
- func (m *DataRef) GetOffsetBytes() int64
- func (m *DataRef) GetRef() *Ref
- func (m *DataRef) GetSizeBytes() int64
- func (m *DataRef) Marshal() (dAtA []byte, err error)
- func (x *DataRef) MarshalLogObject(enc zapcore.ObjectEncoder) error
- func (m *DataRef) MarshalTo(dAtA []byte) (int, error)
- func (m *DataRef) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*DataRef) ProtoMessage()
- func (m *DataRef) Reset()
- func (m *DataRef) Size() (n int)
- func (m *DataRef) String() string
- func (m *DataRef) Unmarshal(dAtA []byte) error
- func (m *DataRef) XXX_DiscardUnknown()
- func (m *DataRef) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *DataRef) XXX_Merge(src proto.Message)
- func (m *DataRef) XXX_Size() int
- func (m *DataRef) XXX_Unmarshal(b []byte) error
- type EncryptionAlgo
- type Entry
- type EntryFunc
- type GarbageCollector
- type ID
- type KeyStore
- type Metadata
- type Reader
- type ReaderOption
- type Ref
- func (*Ref) Descriptor() ([]byte, []int)
- func (m *Ref) GetCompressionAlgo() CompressionAlgo
- func (m *Ref) GetDek() []byte
- func (m *Ref) GetEdge() bool
- func (m *Ref) GetEncryptionAlgo() EncryptionAlgo
- func (m *Ref) GetId() []byte
- func (m *Ref) GetSizeBytes() int64
- func (r *Ref) Key() pachhash.Output
- func (m *Ref) Marshal() (dAtA []byte, err error)
- func (x *Ref) MarshalLogObject(enc zapcore.ObjectEncoder) error
- func (m *Ref) MarshalTo(dAtA []byte) (int, error)
- func (m *Ref) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*Ref) ProtoMessage()
- func (m *Ref) Reset()
- func (m *Ref) Size() (n int)
- func (m *Ref) String() string
- func (m *Ref) Unmarshal(dAtA []byte) error
- func (m *Ref) XXX_DiscardUnknown()
- func (m *Ref) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Ref) XXX_Merge(src proto.Message)
- func (m *Ref) XXX_Size() int
- func (m *Ref) XXX_Unmarshal(b []byte) error
- type Renewer
- type Storage
- func (s *Storage) Check(ctx context.Context, begin, end []byte, readChunks bool) (int, error)
- func (s *Storage) List(ctx context.Context, cb func(id ID) error) error
- func (s *Storage) NewBatcher(ctx context.Context, name string, threshold int, opts ...BatcherOption) *Batcher
- func (s *Storage) NewDataReader(ctx context.Context, dataRef *DataRef) *DataReader
- func (s *Storage) NewDeleter() track.Deleter
- func (s *Storage) NewReader(ctx context.Context, dataRefs []*DataRef, opts ...ReaderOption) *Reader
- func (s *Storage) NewUploader(ctx context.Context, name string, noUpload bool, cb UploadFunc) *Uploader
- func (s *Storage) PrefetchData(ctx context.Context, dataRef *DataRef) error
- type StorageOption
- func StorageOptions(conf *serviceenv.StorageConfiguration) ([]StorageOption, error)
- func WithCompression(algo CompressionAlgo) StorageOption
- func WithMaxConcurrentObjects(maxDownload, maxUpload int) StorageOption
- func WithObjectCache(fastLayer obj.Client, size int) StorageOption
- func WithSecret(secret []byte) StorageOption
- type UploadFunc
- type Uploader
Constants ¶
const (
	DefaultAverageBits  = 23
	DefaultSeed         = 1
	DefaultMinChunkSize = 1 * units.MB
	DefaultMaxChunkSize = 20 * units.MB
)
const (
	// TrackerPrefix is the prefix used when creating tracker objects for chunks
	TrackerPrefix = "chunk/"

	DefaultPrefetchLimit = 10
)
const (
// WindowSize is the size of the rolling hash window.
WindowSize = 64
)
Variables ¶
var (
	ErrInvalidLengthChunk        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowChunk          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupChunk = fmt.Errorf("proto: unexpected end of group")
)
var CompressionAlgo_name = map[int32]string{
0: "NONE",
1: "GZIP_BEST_SPEED",
}
var CompressionAlgo_value = map[string]int32{
"NONE": 0,
"GZIP_BEST_SPEED": 1,
}
var EncryptionAlgo_name = map[int32]string{
0: "ENCRYPTION_ALGO_UNKNOWN",
1: "CHACHA20",
}
var EncryptionAlgo_value = map[string]int32{
"ENCRYPTION_ALGO_UNKNOWN": 0,
"CHACHA20": 1,
}
Functions ¶
func ComputeChunks ¶
ComputeChunks splits a stream of bytes into chunks using a content-defined chunking algorithm. To prevent suboptimal chunk sizes, a minimum and maximum chunk size is enforced. This algorithm is useful for ensuring that typical data modifications (insertions, deletions, updates) only affect a small number of chunks. TODO: Expose configuration.
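As a quick illustration (the import path is assumed), the callback below simply records the size of each content-defined chunk:

package example

import (
	"bytes"

	"github.com/pachyderm/pachyderm/v2/src/internal/storage/chunk" // import path assumed
)

// chunkSizes returns the size of each content-defined chunk that
// ComputeChunks produces for data.
func chunkSizes(data []byte) ([]int, error) {
	var sizes []int
	err := chunk.ComputeChunks(bytes.NewReader(data), func(c []byte) error {
		sizes = append(sizes, len(c))
		return nil
	})
	return sizes, err
}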
func Get ¶
Get calls client.Get to retrieve a chunk, then verifies, decrypts, and decompresses the data. cb is called with the uncompressed plaintext.
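A minimal sketch of reading the bytes behind a DataRef, assuming kv.ValueCallback has the shape func([]byte) error (its definition is not shown in this index) and the import path is as expected:

package example

import (
	"context"

	"github.com/pachyderm/pachyderm/v2/src/internal/storage/chunk" // import path assumed
)

// readDataRef fetches the chunk behind dr and slices out the referenced bytes.
func readDataRef(ctx context.Context, client chunk.Client, dr *chunk.DataRef) ([]byte, error) {
	var out []byte
	err := chunk.Get(ctx, client, dr.Ref, func(plaintext []byte) error {
		// OffsetBytes and SizeBytes locate this entry's data within the chunk.
		out = append(out, plaintext[dr.OffsetBytes:dr.OffsetBytes+dr.SizeBytes]...)
		return nil
	})
	return out, err
}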
func NewPostgresKeyStore ¶
func SetupPostgresStoreV0 ¶
SetupPostgresStoreV0 sets up tables in db. DO NOT MODIFY THIS FUNCTION; IT HAS BEEN USED IN A RELEASED MIGRATION.
Types ¶
type Batcher ¶
type Batcher struct {
// contains filtered or unexported fields
}
Batcher batches entries into chunks. Entries are buffered until they are past the configured threshold, then a chunk is created. Chunk creation is asynchronous with respect to the client, which is why the interface is callback based. Batcher provides one of two callback-based interfaces, defined by ChunkFunc and EntryFunc. Callbacks are executed in the order the entries are added (for the ChunkFunc interface, entries are ordered within as well as across calls).
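A construction sketch using the WithEntryCallback option defined below. The exact signature of EntryFunc is not listed in this index, so the callback is declared as a variable of that type; the 64 KB threshold is illustrative.

package example

import (
	"context"

	"github.com/pachyderm/pachyderm/v2/src/internal/storage/chunk" // import path assumed
)

// onEntry is assumed to receive an entry's metadata and its *chunk.DataRef;
// the concrete signature of EntryFunc is not shown in this index.
var onEntry chunk.EntryFunc

// newSmallEntryBatcher buffers entries until roughly 64 KB (illustrative) is
// reached, then cuts a chunk and invokes onEntry per entry, in insertion order.
func newSmallEntryBatcher(ctx context.Context, s *chunk.Storage) *chunk.Batcher {
	return s.NewBatcher(ctx, "small-entries", 64*1024, chunk.WithEntryCallback(onEntry))
}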
type BatcherOption ¶
type BatcherOption func(b *Batcher)
func WithChunkCallback ¶
func WithChunkCallback(cb ChunkFunc) BatcherOption
func WithEntryCallback ¶
func WithEntryCallback(cb EntryFunc) BatcherOption
type ChunkFunc ¶
ChunkFunc is a function that provides the metadata for the entries in a chunk and a data reference to the chunk.
type Client ¶
type Client interface {
	Create(ctx context.Context, md Metadata, chunkData []byte) (ID, error)
	Get(ctx context.Context, chunkID ID, cb kv.ValueCallback) error
	Close() error
}
Client mediates access to a content-addressed store
type CompressionAlgo ¶
type CompressionAlgo int32
const (
	CompressionAlgo_NONE            CompressionAlgo = 0
	CompressionAlgo_GZIP_BEST_SPEED CompressionAlgo = 1
)
func (CompressionAlgo) EnumDescriptor ¶
func (CompressionAlgo) EnumDescriptor() ([]byte, []int)
func (CompressionAlgo) String ¶
func (x CompressionAlgo) String() string
type CreateOptions ¶
type CreateOptions struct {
	Secret      []byte
	Compression CompressionAlgo
}
CreateOptions affect how chunks are created.
type DataReader ¶
type DataReader struct {
// contains filtered or unexported fields
}
DataReader is an abstraction that lazily reads data referenced by a data reference.
type DataRef ¶
type DataRef struct {
	// The chunk the referenced data is located in.
	Ref *Ref `protobuf:"bytes,1,opt,name=ref,proto3" json:"ref,omitempty"`
	// The hash of the data being referenced.
	Hash []byte `protobuf:"bytes,2,opt,name=hash,proto3" json:"hash,omitempty"`
	// The offset and size used for accessing the data within the chunk.
	OffsetBytes int64 `protobuf:"varint,3,opt,name=offset_bytes,json=offsetBytes,proto3" json:"offset_bytes,omitempty"`
	SizeBytes   int64 `protobuf:"varint,4,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"`

	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}
DataRef is a reference to data within a chunk.
func NewDataRef ¶
func (*DataRef) Descriptor ¶
func (*DataRef) GetOffsetBytes ¶
func (*DataRef) GetSizeBytes ¶
func (*DataRef) MarshalLogObject ¶
func (x *DataRef) MarshalLogObject(enc zapcore.ObjectEncoder) error
func (*DataRef) MarshalToSizedBuffer ¶
func (*DataRef) ProtoMessage ¶
func (*DataRef) ProtoMessage()
func (*DataRef) XXX_DiscardUnknown ¶
func (m *DataRef) XXX_DiscardUnknown()
func (*DataRef) XXX_Marshal ¶
func (*DataRef) XXX_Unmarshal ¶
type EncryptionAlgo ¶
type EncryptionAlgo int32
const (
	EncryptionAlgo_ENCRYPTION_ALGO_UNKNOWN EncryptionAlgo = 0
	EncryptionAlgo_CHACHA20                EncryptionAlgo = 1
)
func (EncryptionAlgo) EnumDescriptor ¶
func (EncryptionAlgo) EnumDescriptor() ([]byte, []int)
func (EncryptionAlgo) String ¶
func (x EncryptionAlgo) String() string
type Entry ¶
type Entry struct {
	ChunkID   ID     `db:"chunk_id"`
	Gen       uint64 `db:"gen"`
	Uploaded  bool   `db:"uploaded"`
	Tombstone bool   `db:"tombstone"`
}
Entry is a chunk object mapping.
type EntryFunc ¶
EntryFunc is a function that provides the metadata for an entry and a data reference to the entry in a chunk. Size zero entries will have a nil data reference.
type GarbageCollector ¶
type GarbageCollector struct {
// contains filtered or unexported fields
}
GarbageCollector removes unused chunks from object storage
func NewGC ¶
func NewGC(s *Storage, d time.Duration) *GarbageCollector
NewGC returns a new garbage collector operating on s
func (*GarbageCollector) RunForever ¶
func (gc *GarbageCollector) RunForever(ctx context.Context) error
RunForever calls RunOnce until the context is cancelled, logging any errors.
type ID ¶
type ID []byte
ID uniquely identifies a chunk. It is the hash of its content
func ParseTrackerID ¶
ParseTrackerID parses a tracker ID into a chunk ID.
type KeyStore ¶
type KeyStore interface {
	Create(ctx context.Context, name string, data []byte) error
	Get(ctx context.Context, name string) ([]byte, error)
}
KeyStore is a store for named secret keys
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader reads data from chunk storage.
type ReaderOption ¶
type ReaderOption func(*Reader)
func WithOffsetBytes ¶
func WithOffsetBytes(offsetBytes int64) ReaderOption
func WithPrefetchLimit ¶
func WithPrefetchLimit(limit int) ReaderOption
type Ref ¶
type Ref struct {
	Id              []byte          `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
	SizeBytes       int64           `protobuf:"varint,2,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"`
	Edge            bool            `protobuf:"varint,3,opt,name=edge,proto3" json:"edge,omitempty"`
	Dek             []byte          `protobuf:"bytes,4,opt,name=dek,proto3" json:"dek,omitempty"`
	EncryptionAlgo  EncryptionAlgo  `` /* 130-byte string literal not displayed */
	CompressionAlgo CompressionAlgo `` /* 134-byte string literal not displayed */

	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}
func Create ¶
func Create(ctx context.Context, opts CreateOptions, ptext []byte, createFunc func(ctx context.Context, data []byte) (ID, error)) (*Ref, error)
Create calls createFunc to create a new chunk, but first compresses and encrypts ptext. ptext will not be modified.
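An illustration only: the in-memory map and sha256 keying below are not the package's real chunk ID scheme, and the Secret value is a placeholder.

package example

import (
	"context"
	"crypto/sha256"

	"github.com/pachyderm/pachyderm/v2/src/internal/storage/chunk" // import path assumed
)

// createInMemory compresses and encrypts ptext via chunk.Create, then stores
// the resulting ciphertext in a map keyed by its sha256 hash (illustrative only).
func createInMemory(ctx context.Context, ptext []byte, store map[string][]byte) (*chunk.Ref, error) {
	opts := chunk.CreateOptions{
		Secret:      []byte("example-secret"), // placeholder secret
		Compression: chunk.CompressionAlgo_GZIP_BEST_SPEED,
	}
	return chunk.Create(ctx, opts, ptext, func(ctx context.Context, data []byte) (chunk.ID, error) {
		// data is the compressed, encrypted form of ptext.
		sum := sha256.Sum256(data)
		store[string(sum[:])] = append([]byte(nil), data...)
		return chunk.ID(sum[:]), nil
	})
}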
func (*Ref) Descriptor ¶
func (*Ref) GetCompressionAlgo ¶
func (m *Ref) GetCompressionAlgo() CompressionAlgo
func (*Ref) GetEncryptionAlgo ¶
func (m *Ref) GetEncryptionAlgo() EncryptionAlgo
func (*Ref) GetSizeBytes ¶
func (*Ref) MarshalLogObject ¶
func (x *Ref) MarshalLogObject(enc zapcore.ObjectEncoder) error
func (*Ref) ProtoMessage ¶
func (*Ref) ProtoMessage()
func (*Ref) XXX_DiscardUnknown ¶
func (m *Ref) XXX_DiscardUnknown()
func (*Ref) XXX_Unmarshal ¶
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Storage is an abstraction for interfacing with chunk storage. A storage instance:
- Provides methods for uploading data to chunks and downloading data from chunks.
- Manages tracker state to keep chunks alive while uploading.
- Manages an internal chunk cache and work deduplicator (parallel downloads of the same chunk will be deduplicated).
func NewStorage ¶
func NewStorage(objC obj.Client, memCache kv.GetPut, db *pachsql.DB, tracker track.Tracker, opts ...StorageOption) *Storage
NewStorage creates a new Storage.
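A wiring sketch: the dependency import paths below are assumptions, and the concurrency limits are illustrative.

package example

import (
	"github.com/pachyderm/pachyderm/v2/src/internal/obj" // import paths assumed
	"github.com/pachyderm/pachyderm/v2/src/internal/pachsql"
	"github.com/pachyderm/pachyderm/v2/src/internal/storage/chunk"
	"github.com/pachyderm/pachyderm/v2/src/internal/storage/kv"
	"github.com/pachyderm/pachyderm/v2/src/internal/storage/track"
)

// newChunkStorage wires a Storage from caller-provided dependencies.
func newChunkStorage(objC obj.Client, cache kv.GetPut, db *pachsql.DB, tr track.Tracker, secret []byte) *chunk.Storage {
	return chunk.NewStorage(objC, cache, db, tr,
		chunk.WithSecret(secret), // used to derive chunk encryption keys
		chunk.WithCompression(chunk.CompressionAlgo_GZIP_BEST_SPEED),
		chunk.WithMaxConcurrentObjects(20, 20), // illustrative download/upload limits
	)
}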
func NewTestStorage ¶
func NewTestStorage(ctx context.Context, t testing.TB, db *pachsql.DB, tr track.Tracker, opts ...StorageOption) (obj.Client, *Storage)
NewTestStorage creates a local storage instance for testing, scoped to the lifetime of the test.
func (*Storage) Check ¶
Check runs an integrity check on the objects in object storage. It checks objects for chunks with IDs in the range [begin, end). As a special case, if len(end) == 0, end is ignored.
func (*Storage) NewBatcher ¶
func (s *Storage) NewBatcher(ctx context.Context, name string, threshold int, opts ...BatcherOption) *Batcher
TODO: Add config for number of entries.
func (*Storage) NewDataReader ¶
func (s *Storage) NewDataReader(ctx context.Context, dataRef *DataRef) *DataReader
func (*Storage) NewDeleter ¶
NewDeleter creates a deleter for use with a tracker.GC
func (*Storage) NewUploader ¶
type StorageOption ¶
type StorageOption func(s *Storage)
StorageOption configures a storage.
func StorageOptions ¶
func StorageOptions(conf *serviceenv.StorageConfiguration) ([]StorageOption, error)
StorageOptions returns the chunk storage options for the config.
func WithCompression ¶
func WithCompression(algo CompressionAlgo) StorageOption
WithCompression sets the compression algorithm used to compress chunks
func WithMaxConcurrentObjects ¶
func WithMaxConcurrentObjects(maxDownload, maxUpload int) StorageOption
WithMaxConcurrentObjects sets the maximum number of object writers (upload) and readers (download) that can be open at a time.
func WithObjectCache ¶
func WithObjectCache(fastLayer obj.Client, size int) StorageOption
WithObjectCache adds a cache around the currently configured object client
func WithSecret ¶
func WithSecret(secret []byte) StorageOption
WithSecret sets the secret used to generate chunk encryption keys
type UploadFunc ¶
UploadFunc is a function that provides the metadata for a task and the corresponding set of chunk references.
type Uploader ¶
type Uploader struct {
// contains filtered or unexported fields
}
Uploader uploads chunks. Each upload call creates at least one upload task with the provided metadata. Upload tasks are performed asynchronously, which is why the interface is callback based. Callbacks are executed in the order the upload tasks are created.
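A construction sketch: the UploadFunc signature is not listed in this index, so the callback is declared as a variable of that type, and the meaning of noUpload is inferred from its name.

package example

import (
	"context"

	"github.com/pachyderm/pachyderm/v2/src/internal/storage/chunk" // import path assumed
)

// onUpload is assumed to receive an upload task's metadata and its data
// references; the concrete signature of UploadFunc is not shown in this index.
var onUpload chunk.UploadFunc

// newUploader creates an Uploader whose callbacks fire in task-creation order.
// noUpload=false presumably means chunk data is actually written to storage.
func newUploader(ctx context.Context, s *chunk.Storage) *chunk.Uploader {
	return s.NewUploader(ctx, "large-entries", false, onUpload)
}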
func (*Uploader) Copy ¶
Copy performs an upload using a list of data references as the data source. Stable data references will be reused. Unstable data references will have their data downloaded and re-uploaded, as in a normal upload.