chunk

package
v2.8.0-nightly.20231016 Latest
Warning

This package is not in the latest version of its module.
Published: Oct 12, 2023 License: Apache-2.0 Imports: 50 Imported by: 0

Documentation

Overview

Package chunk provides access to data through content-addressed chunks.

A chunk is the basic unit of storage for data. Chunks are identified by their content-address, which is a hash of the data stored in the chunk. There are two mechanisms for uploading chunks: uploader and batcher. The uploader is intended for medium / large data entries since it performs content-defined chunking on the data and stores each data entry in its own set of chunks. The batcher is intended for small data entries since it batches multiple data entries into larger chunks. The result of each of these upload methods is a list of data references for each data entry. These data references can be used to access the corresponding data through readers.
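To make the idea of a content address concrete, here is a minimal sketch of a content-addressed store. It is illustrative only: the chunkStore type and put method are hypothetical names, and SHA-256 is used as a stand-in because this page does not say which hash the package uses.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// chunkStore is a hypothetical in-memory content-addressed store.
// It is not the package's Client or Storage type.
type chunkStore map[string][]byte

// put stores a copy of data under the hex-encoded hash of its content
// and returns that address. SHA-256 is an assumption made for this sketch.
func (s chunkStore) put(data []byte) string {
	sum := sha256.Sum256(data)
	addr := hex.EncodeToString(sum[:])
	s[addr] = append([]byte(nil), data...)
	return addr
}

func main() {
	s := chunkStore{}
	a := s.put([]byte("hello"))
	b := s.put([]byte("hello"))
	// Identical content yields the same address, so it is stored once.
	fmt.Println(a == b, len(s))
}
```

Because the address is derived from the content, writing the same bytes twice is naturally deduplicated.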

Constants

const (
	DefaultAverageBits  = 23
	DefaultSeed         = 1
	DefaultMinChunkSize = 1 * units.MB
	DefaultMaxChunkSize = 20 * units.MB
)
const (
	// TrackerPrefix is the prefix used when creating tracker objects for chunks
	TrackerPrefix = "chunk/"

	DefaultPrefetchLimit = 10
)
const (
	// WindowSize is the size of the rolling hash window.
	WindowSize = 64
)

Variables

var (
	CompressionAlgo_name = map[int32]string{
		0: "NONE",
		1: "GZIP_BEST_SPEED",
	}
	CompressionAlgo_value = map[string]int32{
		"NONE":            0,
		"GZIP_BEST_SPEED": 1,
	}
)

Enum value maps for CompressionAlgo.

var (
	EncryptionAlgo_name = map[int32]string{
		0: "ENCRYPTION_ALGO_UNKNOWN",
		1: "CHACHA20",
	}
	EncryptionAlgo_value = map[string]int32{
		"ENCRYPTION_ALGO_UNKNOWN": 0,
		"CHACHA20":                1,
	}
)

Enum value maps for EncryptionAlgo.

var File_internal_storage_chunk_chunk_proto protoreflect.FileDescriptor

Functions

func ComputeChunks

func ComputeChunks(r io.Reader, cb func([]byte) error) error

ComputeChunks splits a stream of bytes into chunks using a content-defined chunking algorithm. To prevent suboptimal chunk sizes, minimum and maximum chunk sizes are enforced. This algorithm is useful for ensuring that typical data modifications (insertions, deletions, updates) only affect a small number of chunks. TODO: Expose configuration.

func Get

func Get(ctx context.Context, client Client, ref *Ref) ([]byte, error)

Get calls client.Get to retrieve a chunk, then verifies, decrypts, and decompresses the data. The uncompressed plaintext is returned.

func NewPostgresKeyStore

func NewPostgresKeyStore(db *pachsql.DB) *postgresKeyStore

func SetupPostgresStoreV0

func SetupPostgresStoreV0(ctx context.Context, tx *pachsql.Tx) error

SetupPostgresStoreV0 sets up tables in db. DO NOT MODIFY THIS FUNCTION. IT HAS BEEN USED IN A RELEASED MIGRATION.

Types

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

Batcher batches entries into chunks. Entries are buffered until they are past the configured threshold, then a chunk is created. Chunk creation is asynchronous with respect to the client, which is why the interface is callback based. Batcher provides one of two callback based interfaces defined by ChunkFunc and EntryFunc. Callbacks will be executed with respect to the order the entries are added (for the ChunkFunc interface, entries are ordered within as well as across calls).
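The buffering behaviour can be sketched as follows. This is a simplified, synchronous model, whereas the real Batcher creates chunks asynchronously. The batcher and dataRef types, their fields, and the in-memory chunks slice are hypothetical. The nil reference for zero-size entries follows the EntryFunc description on this page.

```go
package main

import "fmt"

// dataRef is a hypothetical, simplified data reference.
type dataRef struct {
	chunk, offset, size int
}

// batcher buffers entries and emits one chunk once the buffer passes threshold.
type batcher struct {
	threshold int
	cb        func(meta interface{}, ref *dataRef) error
	buf       []byte
	metas     []interface{}
	refs      []dataRef
	chunks    [][]byte // stands in for chunk storage
}

func (b *batcher) add(meta interface{}, data []byte) error {
	b.metas = append(b.metas, meta)
	b.refs = append(b.refs, dataRef{offset: len(b.buf), size: len(data)})
	b.buf = append(b.buf, data...)
	if len(b.buf) > b.threshold {
		return b.flush()
	}
	return nil
}

// flush writes the buffered bytes as one chunk and runs the entry
// callbacks in the order the entries were added.
func (b *batcher) flush() error {
	idx := -1
	if len(b.buf) > 0 {
		idx = len(b.chunks)
		b.chunks = append(b.chunks, b.buf)
	}
	for i, m := range b.metas {
		ref := b.refs[i]
		ref.chunk = idx
		var p *dataRef
		if ref.size > 0 {
			p = &ref // zero-size entries keep a nil reference
		}
		if err := b.cb(m, p); err != nil {
			return err
		}
	}
	b.buf, b.metas, b.refs = nil, nil, nil
	return nil
}

func (b *batcher) close() error { return b.flush() }

func main() {
	b := &batcher{threshold: 8, cb: func(meta interface{}, ref *dataRef) error {
		fmt.Println(meta, ref)
		return nil
	}}
	for _, e := range []struct{ meta, data string }{{"a", "hello"}, {"b", ""}, {"c", "world!"}} {
		if err := b.add(e.meta, []byte(e.data)); err != nil {
			panic(err)
		}
	}
	if err := b.close(); err != nil {
		panic(err)
	}
}
```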

func (*Batcher) Add

func (b *Batcher) Add(meta interface{}, data []byte, pointsTo []*DataRef) error

func (*Batcher) Close

func (b *Batcher) Close() error

type BatcherOption

type BatcherOption func(b *Batcher)

func WithChunkCallback

func WithChunkCallback(cb ChunkFunc) BatcherOption

func WithEntryCallback

func WithEntryCallback(cb EntryFunc) BatcherOption

type ChunkFunc

type ChunkFunc = func([]interface{}, *DataRef) error

ChunkFunc is a function that provides the metadata for the entries in a chunk and a data reference to the chunk.

type Client

type Client interface {
	Create(ctx context.Context, md Metadata, chunkData []byte) (ID, error)
	Get(ctx context.Context, chunkID ID, cb kv.ValueCallback) error
	Close() error
}

Client mediates access to a content-addressed store

func NewClient

func NewClient(store kv.Store, db *pachsql.DB, tr track.Tracker, renewer *Renewer, pool *kv.Pool) Client

NewClient returns a client which will write to objc, mdstore, and tracker. Name is used for the set of temporary objects

type CompressionAlgo

type CompressionAlgo int32
const (
	CompressionAlgo_NONE            CompressionAlgo = 0
	CompressionAlgo_GZIP_BEST_SPEED CompressionAlgo = 1
)

func (CompressionAlgo) Descriptor added in v2.7.0

func (CompressionAlgo) Enum added in v2.7.0

func (x CompressionAlgo) Enum() *CompressionAlgo

func (CompressionAlgo) EnumDescriptor deprecated

func (CompressionAlgo) EnumDescriptor() ([]byte, []int)

Deprecated: Use CompressionAlgo.Descriptor instead.

func (CompressionAlgo) Number added in v2.7.0

func (CompressionAlgo) String

func (x CompressionAlgo) String() string

func (CompressionAlgo) Type added in v2.7.0

type CreateOptions

type CreateOptions struct {
	Secret      []byte
	Compression CompressionAlgo
}

CreateOptions affect how chunks are created.

type DataReader

type DataReader struct {
	// contains filtered or unexported fields
}

DataReader is an abstraction that lazily reads data referenced by a data reference.

func (*DataReader) Read

func (dr *DataReader) Read(data []byte) (int, error)

type DataRef

type DataRef struct {

	// The chunk the referenced data is located in.
	Ref *Ref `protobuf:"bytes,1,opt,name=ref,proto3" json:"ref,omitempty"`
	// The hash of the data being referenced.
	Hash []byte `protobuf:"bytes,2,opt,name=hash,proto3" json:"hash,omitempty"`
	// The offset and size used for accessing the data within the chunk.
	OffsetBytes int64 `protobuf:"varint,3,opt,name=offset_bytes,json=offsetBytes,proto3" json:"offset_bytes,omitempty"`
	SizeBytes   int64 `protobuf:"varint,4,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"`
	// contains filtered or unexported fields
}

DataRef is a reference to data within a chunk.

func FullRef

func FullRef(dataRef *DataRef) *DataRef

FullRef creates a data reference for the full chunk referenced by a data reference.

func NewDataRef

func NewDataRef(chunkRef *DataRef, chunkBytes []byte, offset, size int64) *DataRef

func (*DataRef) Descriptor deprecated

func (*DataRef) Descriptor() ([]byte, []int)

Deprecated: Use DataRef.ProtoReflect.Descriptor instead.

func (*DataRef) GetHash

func (x *DataRef) GetHash() []byte

func (*DataRef) GetOffsetBytes

func (x *DataRef) GetOffsetBytes() int64

func (*DataRef) GetRef

func (x *DataRef) GetRef() *Ref

func (*DataRef) GetSizeBytes

func (x *DataRef) GetSizeBytes() int64

func (*DataRef) MarshalLogObject

func (x *DataRef) MarshalLogObject(enc zapcore.ObjectEncoder) error

func (*DataRef) ProtoMessage

func (*DataRef) ProtoMessage()

func (*DataRef) ProtoReflect added in v2.7.0

func (x *DataRef) ProtoReflect() protoreflect.Message

func (*DataRef) Reset

func (x *DataRef) Reset()

func (*DataRef) String

func (x *DataRef) String() string

func (*DataRef) Validate added in v2.8.0

func (m *DataRef) Validate() error

Validate checks the field values on DataRef with the rules defined in the proto definition for this message. If any rules are violated, the first error encountered is returned, or nil if there are no violations.

func (*DataRef) ValidateAll added in v2.8.0

func (m *DataRef) ValidateAll() error

ValidateAll checks the field values on DataRef with the rules defined in the proto definition for this message. If any rules are violated, the result is a list of violation errors wrapped in DataRefMultiError, or nil if none found.
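The difference between stopping at the first violation and collecting all of them can be sketched like this. The ref type, its two rules, and the validate helper are hypothetical; the actual rules come from the proto definition, which is not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// multiError is a hypothetical analogue of DataRefMultiError.
type multiError []error

func (m multiError) Error() string {
	msgs := make([]string, len(m))
	for i, e := range m {
		msgs[i] = e.Error()
	}
	return strings.Join(msgs, "; ")
}

// ref is a hypothetical message with two made-up rules.
type ref struct {
	offset, size int64
}

// validate returns the first violation when all is false,
// or every violation wrapped in a multiError when all is true.
func (r ref) validate(all bool) error {
	var errs multiError
	if r.offset < 0 {
		err := errors.New("offset must be non-negative")
		if !all {
			return err
		}
		errs = append(errs, err)
	}
	if r.size < 0 {
		err := errors.New("size must be non-negative")
		if !all {
			return err
		}
		errs = append(errs, err)
	}
	if len(errs) > 0 {
		return errs
	}
	return nil // return an untyped nil so callers can compare against nil
}

func main() {
	bad := ref{offset: -1, size: -1}
	fmt.Println(bad.validate(false))
	fmt.Println(bad.validate(true))
}
```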

type DataRefMultiError added in v2.8.0

type DataRefMultiError []error

DataRefMultiError is an error wrapping multiple validation errors returned by DataRef.ValidateAll() if the designated constraints aren't met.

func (DataRefMultiError) AllErrors added in v2.8.0

func (m DataRefMultiError) AllErrors() []error

AllErrors returns a list of validation violation errors.

func (DataRefMultiError) Error added in v2.8.0

func (m DataRefMultiError) Error() string

Error returns a concatenation of all the error messages it wraps.

type DataRefValidationError added in v2.8.0

type DataRefValidationError struct {
	// contains filtered or unexported fields
}

DataRefValidationError is the validation error returned by DataRef.Validate if the designated constraints aren't met.

func (DataRefValidationError) Cause added in v2.8.0

func (e DataRefValidationError) Cause() error

Cause function returns cause value.

func (DataRefValidationError) Error added in v2.8.0

func (e DataRefValidationError) Error() string

Error satisfies the builtin error interface

func (DataRefValidationError) ErrorName added in v2.8.0

func (e DataRefValidationError) ErrorName() string

ErrorName returns error name.

func (DataRefValidationError) Field added in v2.8.0

func (e DataRefValidationError) Field() string

Field function returns field value.

func (DataRefValidationError) Key added in v2.8.0

func (e DataRefValidationError) Key() bool

Key function returns key value.

func (DataRefValidationError) Reason added in v2.8.0

func (e DataRefValidationError) Reason() string

Reason function returns reason value.

type EncryptionAlgo

type EncryptionAlgo int32
const (
	EncryptionAlgo_ENCRYPTION_ALGO_UNKNOWN EncryptionAlgo = 0
	EncryptionAlgo_CHACHA20                EncryptionAlgo = 1
)

func (EncryptionAlgo) Descriptor added in v2.7.0

func (EncryptionAlgo) Enum added in v2.7.0

func (x EncryptionAlgo) Enum() *EncryptionAlgo

func (EncryptionAlgo) EnumDescriptor deprecated

func (EncryptionAlgo) EnumDescriptor() ([]byte, []int)

Deprecated: Use EncryptionAlgo.Descriptor instead.

func (EncryptionAlgo) Number added in v2.7.0

func (EncryptionAlgo) String

func (x EncryptionAlgo) String() string

func (EncryptionAlgo) Type added in v2.7.0

type Entry

type Entry struct {
	ChunkID   ID     `db:"chunk_id"`
	Gen       uint64 `db:"gen"`
	Uploaded  bool   `db:"uploaded"`
	Tombstone bool   `db:"tombstone"`
}

Entry is a chunk object mapping

type EntryFunc

type EntryFunc = func(interface{}, *DataRef) error

EntryFunc is a function that provides the metadata for an entry and a data reference to the entry in a chunk. Size zero entries will have a nil data reference.

type GarbageCollector

type GarbageCollector struct {
	// contains filtered or unexported fields
}

GarbageCollector removes unused chunks from object storage

func NewGC

func NewGC(s *Storage, d time.Duration) *GarbageCollector

NewGC returns a new garbage collector operating on s

func (*GarbageCollector) RunForever

func (gc *GarbageCollector) RunForever(ctx context.Context) error

RunForever calls RunOnce until the context is cancelled, logging any errors.

func (*GarbageCollector) RunOnce

func (gc *GarbageCollector) RunOnce(ctx context.Context) (retErr error)

RunOnce runs 1 cycle of garbage collection.

type ID

type ID []byte

ID uniquely identifies a chunk. It is the hash of its content

func Hash

func Hash(data []byte) ID

Hash produces an ID by hashing data

func IDFromHex

func IDFromHex(h string) (ID, error)

IDFromHex parses a hex string into an ID

func ParseTrackerID

func ParseTrackerID(trackerID string) (ID, error)

ParseTrackerID parses a trackerID into a chunk ID

func (ID) HexString

func (id ID) HexString() string

HexString hex encodes the ID

func (ID) String

func (id ID) String() string

func (ID) TrackerID

func (id ID) TrackerID() string

TrackerID returns an ID for use with the tracker.
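The ID helpers above can be modelled with a small round-trip sketch. The hex encoding follows from the function names. That a tracker ID is TrackerPrefix followed by the hex string is an assumption, since this page only says the prefix is used when creating tracker objects. All lowercase names are hypothetical.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

const trackerPrefix = "chunk/" // mirrors the TrackerPrefix constant

// id is a stand-in for the package's ID type.
type id []byte

func (i id) hexString() string { return hex.EncodeToString(i) }

func idFromHex(h string) (id, error) {
	b, err := hex.DecodeString(h)
	if err != nil {
		return nil, err
	}
	return id(b), nil
}

// trackerID assumes the format "chunk/<hex>".
func (i id) trackerID() string { return trackerPrefix + i.hexString() }

func parseTrackerID(t string) (id, error) {
	if !strings.HasPrefix(t, trackerPrefix) {
		return nil, errors.New("tracker ID is not a chunk ID: " + t)
	}
	return idFromHex(strings.TrimPrefix(t, trackerPrefix))
}

func main() {
	x := id{0xde, 0xad, 0xbe, 0xef}
	fmt.Println(x.trackerID()) // prints "chunk/deadbeef"
	back, err := parseTrackerID(x.trackerID())
	fmt.Println(back.hexString(), err)
}
```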

type KeyStore

type KeyStore interface {
	Create(ctx context.Context, name string, data []byte) error
	Get(ctx context.Context, name string) ([]byte, error)
}

KeyStore is a store for named secret keys

type Metadata

type Metadata struct {
	Size     int
	PointsTo []ID
}

Metadata holds metadata about a chunk

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader reads data from chunk storage.

func (*Reader) Get

func (r *Reader) Get(w io.Writer) (retErr error)

Get writes the concatenation of the data referenced by the data references.

type ReaderOption

type ReaderOption func(*Reader)

func WithOffsetBytes

func WithOffsetBytes(offsetBytes int64) ReaderOption

func WithPrefetchLimit

func WithPrefetchLimit(limit int) ReaderOption

type Ref

type Ref struct {
	Id              []byte          `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
	SizeBytes       int64           `protobuf:"varint,2,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"`
	Edge            bool            `protobuf:"varint,3,opt,name=edge,proto3" json:"edge,omitempty"`
	Dek             []byte          `protobuf:"bytes,4,opt,name=dek,proto3" json:"dek,omitempty"`
	EncryptionAlgo  EncryptionAlgo  `` /* 130-byte string literal not displayed */
	CompressionAlgo CompressionAlgo `` /* 134-byte string literal not displayed */
	// contains filtered or unexported fields
}

func Create

func Create(ctx context.Context, opts CreateOptions, ptext []byte, createFunc func(ctx context.Context, data []byte) (ID, error)) (*Ref, error)

Create calls createFunc to create a new chunk, but first compresses and encrypts ptext. ptext will not be modified.

func (*Ref) Descriptor deprecated

func (*Ref) Descriptor() ([]byte, []int)

Deprecated: Use Ref.ProtoReflect.Descriptor instead.

func (*Ref) GetCompressionAlgo

func (x *Ref) GetCompressionAlgo() CompressionAlgo

func (*Ref) GetDek

func (x *Ref) GetDek() []byte

func (*Ref) GetEdge

func (x *Ref) GetEdge() bool

func (*Ref) GetEncryptionAlgo

func (x *Ref) GetEncryptionAlgo() EncryptionAlgo

func (*Ref) GetId

func (x *Ref) GetId() []byte

func (*Ref) GetSizeBytes

func (x *Ref) GetSizeBytes() int64

func (*Ref) Key

func (r *Ref) Key() pachhash.Output

Key returns a unique key for the Ref suitable for use in hash tables

func (*Ref) MarshalLogObject

func (x *Ref) MarshalLogObject(enc zapcore.ObjectEncoder) error

func (*Ref) ProtoMessage

func (*Ref) ProtoMessage()

func (*Ref) ProtoReflect added in v2.7.0

func (x *Ref) ProtoReflect() protoreflect.Message

func (*Ref) Reset

func (x *Ref) Reset()

func (*Ref) String

func (x *Ref) String() string

func (*Ref) Validate added in v2.8.0

func (m *Ref) Validate() error

Validate checks the field values on Ref with the rules defined in the proto definition for this message. If any rules are violated, the first error encountered is returned, or nil if there are no violations.

func (*Ref) ValidateAll added in v2.8.0

func (m *Ref) ValidateAll() error

ValidateAll checks the field values on Ref with the rules defined in the proto definition for this message. If any rules are violated, the result is a list of violation errors wrapped in RefMultiError, or nil if none found.

type RefMultiError added in v2.8.0

type RefMultiError []error

RefMultiError is an error wrapping multiple validation errors returned by Ref.ValidateAll() if the designated constraints aren't met.

func (RefMultiError) AllErrors added in v2.8.0

func (m RefMultiError) AllErrors() []error

AllErrors returns a list of validation violation errors.

func (RefMultiError) Error added in v2.8.0

func (m RefMultiError) Error() string

Error returns a concatenation of all the error messages it wraps.

type RefValidationError added in v2.8.0

type RefValidationError struct {
	// contains filtered or unexported fields
}

RefValidationError is the validation error returned by Ref.Validate if the designated constraints aren't met.

func (RefValidationError) Cause added in v2.8.0

func (e RefValidationError) Cause() error

Cause function returns cause value.

func (RefValidationError) Error added in v2.8.0

func (e RefValidationError) Error() string

Error satisfies the builtin error interface

func (RefValidationError) ErrorName added in v2.8.0

func (e RefValidationError) ErrorName() string

ErrorName returns error name.

func (RefValidationError) Field added in v2.8.0

func (e RefValidationError) Field() string

Field function returns field value.

func (RefValidationError) Key added in v2.8.0

func (e RefValidationError) Key() bool

Key function returns key value.

func (RefValidationError) Reason added in v2.8.0

func (e RefValidationError) Reason() string

Reason function returns reason value.

type Renewer

type Renewer struct {
	// contains filtered or unexported fields
}

func NewRenewer

func NewRenewer(ctx context.Context, tr track.Tracker, name string, ttl time.Duration) *Renewer

func (*Renewer) Add

func (r *Renewer) Add(ctx context.Context, id ID) error

func (*Renewer) Close

func (r *Renewer) Close() error

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage is an abstraction for interfacing with chunk storage. A storage instance:
- Provides methods for uploading data to chunks and downloading data from chunks.
- Manages tracker state to keep chunks alive while uploading.
- Manages an internal chunk cache and work deduplicator (parallel downloads of the same chunk will be deduplicated).

func NewStorage

func NewStorage(store kv.Store, db *pachsql.DB, tracker track.Tracker, opts ...StorageOption) *Storage

NewStorage creates a new Storage.

func NewTestStorage

func NewTestStorage(t testing.TB, db *pachsql.DB, tr track.Tracker, opts ...StorageOption) (kv.Store, *Storage)

NewTestStorage creates a local storage instance for testing during the lifetime of the callback.

func (*Storage) Check

func (s *Storage) Check(ctx context.Context, begin, end []byte, readChunks bool) (int, error)

Check runs an integrity check on the objects in object storage. It will check objects for chunks with IDs in the range [begin, end). As a special case, if len(end) == 0 then end is ignored.
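The half-open range with an optional upper bound can be expressed as a small predicate. The inRange helper is hypothetical, and lexicographic byte comparison is an assumption about how IDs are ordered.

```go
package main

import (
	"bytes"
	"fmt"
)

// inRange reports whether id falls in [begin, end).
// An empty end means there is no upper bound.
func inRange(id, begin, end []byte) bool {
	if bytes.Compare(id, begin) < 0 {
		return false
	}
	return len(end) == 0 || bytes.Compare(id, end) < 0
}

func main() {
	fmt.Println(inRange([]byte{0x10}, []byte{0x10}, []byte{0x20})) // begin is inclusive
	fmt.Println(inRange([]byte{0x20}, []byte{0x10}, []byte{0x20})) // end is exclusive
	fmt.Println(inRange([]byte{0xff}, []byte{0x10}, nil))          // empty end is ignored
}
```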

func (*Storage) ListStore added in v2.7.0

func (s *Storage) ListStore(ctx context.Context, cb func(id ID, gen uint64) error) error

ListStore lists all of the chunks in object storage. This is not the same as listing the chunk entries in the database.

func (*Storage) NewBatcher

func (s *Storage) NewBatcher(ctx context.Context, name string, threshold int, opts ...BatcherOption) *Batcher

TODO: Add config for number of entries.

func (*Storage) NewDataReader

func (s *Storage) NewDataReader(ctx context.Context, dataRef *DataRef) *DataReader

func (*Storage) NewDeleter

func (s *Storage) NewDeleter() track.Deleter

NewDeleter creates a deleter for use with a tracker.GC

func (*Storage) NewReader

func (s *Storage) NewReader(ctx context.Context, dataRefs []*DataRef, opts ...ReaderOption) *Reader

NewReader creates a new Reader.

func (*Storage) NewUploader

func (s *Storage) NewUploader(ctx context.Context, name string, noUpload bool, cb UploadFunc) *Uploader

func (*Storage) PrefetchData

func (s *Storage) PrefetchData(ctx context.Context, dataRef *DataRef) error

type StorageOption

type StorageOption func(s *Storage)

StorageOption configures a storage.

func WithCompression

func WithCompression(algo CompressionAlgo) StorageOption

WithCompression sets the compression algorithm used to compress chunks

func WithMemoryCacheSize added in v2.7.0

func WithMemoryCacheSize(size int) StorageOption

WithMemoryCacheSize sets the number of decrypted, uncompressed chunks that will be stored in memory.

func WithSecret

func WithSecret(secret []byte) StorageOption

WithSecret sets the secret used to generate chunk encryption keys
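StorageOption, like BatcherOption and ReaderOption, follows Go's functional options pattern. The sketch below models that pattern with hypothetical lowercase types. The field names and the default cache size of 100 are assumptions, not the package's actual values.

```go
package main

import "fmt"

type compressionAlgo int32

const (
	compressionNone          compressionAlgo = 0
	compressionGzipBestSpeed compressionAlgo = 1
)

// storage is a hypothetical stand-in for the package's Storage type.
type storage struct {
	secret       []byte
	compression  compressionAlgo
	memCacheSize int
}

// storageOption mutates a storage while it is being constructed.
type storageOption func(*storage)

func withSecret(secret []byte) storageOption {
	return func(s *storage) { s.secret = secret }
}

func withCompression(algo compressionAlgo) storageOption {
	return func(s *storage) { s.compression = algo }
}

func withMemoryCacheSize(size int) storageOption {
	return func(s *storage) { s.memCacheSize = size }
}

// newStorage starts from defaults and applies the options in order,
// so later options override earlier ones.
func newStorage(opts ...storageOption) *storage {
	s := &storage{memCacheSize: 100} // hypothetical default
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := newStorage(withCompression(compressionGzipBestSpeed), withMemoryCacheSize(16))
	fmt.Println(s.compression, s.memCacheSize, len(s.secret)) // prints "1 16 0"
}
```

The variadic signature lets callers pass no options at all and still get a usable value, which is why constructors such as NewStorage take opts last.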

type UploadFunc

type UploadFunc = func(interface{}, []*DataRef) error

UploadFunc is a function that provides the metadata for a task and the corresponding set of chunk references.

type Uploader

type Uploader struct {
	// contains filtered or unexported fields
}

Uploader uploads chunks. Each upload call creates at least one upload task with the provided metadata. Upload tasks are performed asynchronously, which is why the interface is callback based. Callbacks will be executed with respect to the order the upload tasks are created.

func (*Uploader) Close

func (u *Uploader) Close() error

func (*Uploader) Copy

func (u *Uploader) Copy(meta interface{}, dataRefs []*DataRef) error

Copy performs an upload using a list of data references as the data source. Stable data references will be reused. Unstable data references will have their data downloaded and uploaded similar to a normal upload.

func (*Uploader) CopyByReference

func (u *Uploader) CopyByReference(meta interface{}, dataRefs []*DataRef) error

func (*Uploader) Upload

func (u *Uploader) Upload(meta interface{}, r io.Reader) error
