Documentation ¶
Overview ¶
Package shed provides simple abstraction components for composing more complex operations on storage data organized in fields and indexes.
The only type that holds logical information about Infinity storage chunk data and metadata is Item. This part is not generalized, mostly for performance reasons.
Example (Store) ¶
Example_store constructs a simple storage implementation using shed package.
package main import ( "bytes" "context" "encoding/binary" "errors" "fmt" "log" "time" "github.com/syndtr/goleveldb/leveldb" "github.com/yanhuangpai/voyager/pkg/infinity" "github.com/yanhuangpai/voyager/pkg/shed" "github.com/yanhuangpai/voyager/pkg/storage" "github.com/yanhuangpai/voyager/pkg/storage/testing" ) // Store holds fields and indexes (including their encoding functions) // and defines operations on them by composing data from them. // It is just an example without any support for parallel operations // or real world implementation. type Store struct { db *shed.DB // fields and indexes schemaName shed.StringField accessCounter shed.Uint64Field retrievalIndex shed.Index accessIndex shed.Index gcIndex shed.Index } // New returns new Store. All fields and indexes are initialized // and possible conflicts with schema from existing database is checked // automatically. func New(path string) (s *Store, err error) { db, err := shed.NewDB(path, nil) if err != nil { return nil, err } s = &Store{ db: db, } // Identify current storage schema by arbitrary name. s.schemaName, err = db.NewStringField("schema-name") if err != nil { return nil, err } // Global ever incrementing index of chunk accesses. s.accessCounter, err = db.NewUint64Field("access-counter") if err != nil { return nil, err } // Index storing actual chunk address, data and store timestamp. s.retrievalIndex, err = db.NewIndex("Address->StoreTimestamp|Data", shed.IndexFuncs{ EncodeKey: func(fields shed.Item) (key []byte, err error) { return fields.Address, nil }, DecodeKey: func(key []byte) (e shed.Item, err error) { e.Address = key return e, nil }, EncodeValue: func(fields shed.Item) (value []byte, err error) { b := make([]byte, 8) binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp)) value = append(b, fields.Data...) 
return value, nil }, DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8])) e.Data = value[8:] return e, nil }, }) if err != nil { return nil, err } // Index storing access timestamp for a particular address. // It is needed in order to update gc index keys for iteration order. s.accessIndex, err = db.NewIndex("Address->AccessTimestamp", shed.IndexFuncs{ EncodeKey: func(fields shed.Item) (key []byte, err error) { return fields.Address, nil }, DecodeKey: func(key []byte) (e shed.Item, err error) { e.Address = key return e, nil }, EncodeValue: func(fields shed.Item) (value []byte, err error) { b := make([]byte, 8) binary.BigEndian.PutUint64(b, uint64(fields.AccessTimestamp)) return b, nil }, DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { e.AccessTimestamp = int64(binary.BigEndian.Uint64(value)) return e, nil }, }) if err != nil { return nil, err } // Index with keys ordered by access timestamp for garbage collection prioritization. s.gcIndex, err = db.NewIndex("AccessTimestamp|StoredTimestamp|Address->nil", shed.IndexFuncs{ EncodeKey: func(fields shed.Item) (key []byte, err error) { b := make([]byte, 16, 16+len(fields.Address)) binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp)) binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) key = append(b, fields.Address...) return key, nil }, DecodeKey: func(key []byte) (e shed.Item, err error) { e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8])) e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16])) e.Address = key[16:] return e, nil }, EncodeValue: func(fields shed.Item) (value []byte, err error) { return nil, nil }, DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { return e, nil }, }) if err != nil { return nil, err } return s, nil } // Put stores the chunk and sets it store timestamp. 
func (s *Store) Put(_ context.Context, ch infinity.Chunk) (err error) { return s.retrievalIndex.Put(shed.Item{ Address: ch.Address().Bytes(), Data: ch.Data(), StoreTimestamp: time.Now().UTC().UnixNano(), }) } // Get retrieves a chunk with the provided address. // It updates access and gc indexes by removing the previous // items from them and adding new items as keys of index entries // are changed. func (s *Store) Get(_ context.Context, addr infinity.Address) (c infinity.Chunk, err error) { batch := new(leveldb.Batch) // Get the chunk data and storage timestamp. item, err := s.retrievalIndex.Get(shed.Item{ Address: addr.Bytes(), }) if err != nil { if errors.Is(err, leveldb.ErrNotFound) { return nil, storage.ErrNotFound } return nil, fmt.Errorf("retrieval index get: %w", err) } // Get the chunk access timestamp. accessItem, err := s.accessIndex.Get(shed.Item{ Address: addr.Bytes(), }) switch { case err == nil: // Remove gc index entry if access timestamp is found. err = s.gcIndex.DeleteInBatch(batch, shed.Item{ Address: item.Address, StoreTimestamp: accessItem.AccessTimestamp, AccessTimestamp: item.StoreTimestamp, }) if err != nil { return nil, fmt.Errorf("gc index delete in batch: %w", err) } case errors.Is(err, leveldb.ErrNotFound): // Access timestamp is not found. Do not do anything. // This is the first get request. default: return nil, fmt.Errorf("access index get: %w", err) } // Specify new access timestamp accessTimestamp := time.Now().UTC().UnixNano() // Put new access timestamp in access index. err = s.accessIndex.PutInBatch(batch, shed.Item{ Address: addr.Bytes(), AccessTimestamp: accessTimestamp, }) if err != nil { return nil, fmt.Errorf("access index put in batch: %w", err) } // Put new access timestamp in gc index. 
err = s.gcIndex.PutInBatch(batch, shed.Item{ Address: item.Address, AccessTimestamp: accessTimestamp, StoreTimestamp: item.StoreTimestamp, }) if err != nil { return nil, fmt.Errorf("gc index put in batch: %w", err) } // Increment access counter. // Currently this information is not used anywhere. _, err = s.accessCounter.IncInBatch(batch) if err != nil { return nil, fmt.Errorf("access counter inc in batch: %w", err) } // Write the batch. err = s.db.WriteBatch(batch) if err != nil { return nil, fmt.Errorf("write batch: %w", err) } // Return the chunk. return infinity.NewChunk(infinity.NewAddress(item.Address), item.Data), nil } // CollectGarbage is an example of index iteration. // It provides no reliable garbage collection functionality. func (s *Store) CollectGarbage() (err error) { const maxTrashSize = 100 maxRounds := 10 // arbitrary number, needs to be calculated // Run a few gc rounds. for roundCount := 0; roundCount < maxRounds; roundCount++ { var garbageCount int // New batch for a new cg round. trash := new(leveldb.Batch) // Iterate through all index items and break when needed. err = s.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) { // Remove the chunk. err = s.retrievalIndex.DeleteInBatch(trash, item) if err != nil { return false, err } // Remove the element in gc index. err = s.gcIndex.DeleteInBatch(trash, item) if err != nil { return false, err } // Remove the relation in access index. err = s.accessIndex.DeleteInBatch(trash, item) if err != nil { return false, err } garbageCount++ if garbageCount >= maxTrashSize { return true, nil } return false, nil }, nil) if err != nil { return err } if garbageCount == 0 { return nil } err = s.db.WriteBatch(trash) if err != nil { return err } } return nil } // GetSchema is an example of retrieveing the most simple // string from a database field. 
func (s *Store) GetSchema() (name string, err error) { name, err = s.schemaName.Get() if errors.Is(err, leveldb.ErrNotFound) { return "", nil } return name, err } // PutSchema is an example of storing the most simple // string in a database field. func (s *Store) PutSchema(name string) (err error) { return s.schemaName.Put(name) } // Close closes the underlying database. func (s *Store) Close() error { return s.db.Close() } // Example_store constructs a simple storage implementation using shed package. func main() { s, err := New("") if err != nil { log.Fatal(err) } defer s.Close() ch := testing.GenerateTestRandomChunk() err = s.Put(context.Background(), ch) if err != nil { fmt.Println("put chunk:", err) return } got, err := s.Get(context.Background(), ch.Address()) if err != nil { fmt.Println("get chunk:", err) return } fmt.Println(bytes.Equal(got.Data(), ch.Data())) }
Output: true
Index ¶
- type DB
- func (db *DB) Close() (err error)
- func (db *DB) Delete(key []byte) (err error)
- func (db *DB) Get(key []byte) (value []byte, err error)
- func (db *DB) Has(key []byte) (yes bool, err error)
- func (s *DB) Metrics() []prometheus.Collector
- func (db *DB) NewIndex(name string, funcs IndexFuncs) (f Index, err error)
- func (db *DB) NewIterator() iterator.Iterator
- func (db *DB) NewStringField(name string) (f StringField, err error)
- func (db *DB) NewStructField(name string) (f StructField, err error)
- func (db *DB) NewUint64Field(name string) (f Uint64Field, err error)
- func (db *DB) NewUint64Vector(name string) (f Uint64Vector, err error)
- func (db *DB) Put(key, value []byte) (err error)
- func (db *DB) RenameIndex(name, newName string) (renamed bool, err error)
- func (db *DB) WriteBatch(batch *leveldb.Batch) (err error)
- type Index
- func (f Index) Count() (count int, err error)
- func (f Index) CountFrom(start Item) (count int, err error)
- func (f Index) Delete(keyFields Item) (err error)
- func (f Index) DeleteInBatch(batch *leveldb.Batch, keyFields Item) (err error)
- func (f Index) Fill(items []Item) (err error)
- func (f Index) First(prefix []byte) (i Item, err error)
- func (f Index) Get(keyFields Item) (out Item, err error)
- func (f Index) Has(keyFields Item) (bool, error)
- func (f Index) HasMulti(items ...Item) ([]bool, error)
- func (f Index) ItemKey(item Item) (key []byte, err error)
- func (f Index) Iterate(fn IndexIterFunc, options *IterateOptions) (err error)
- func (f Index) Last(prefix []byte) (i Item, err error)
- func (f Index) Put(i Item) (err error)
- func (f Index) PutInBatch(batch *leveldb.Batch, i Item) (err error)
- type IndexFuncs
- type IndexIterFunc
- type Item
- type IterateOptions
- type Options
- type StringField
- type StructField
- type Uint64Field
- func (f Uint64Field) Dec() (val uint64, err error)
- func (f Uint64Field) DecInBatch(batch *leveldb.Batch) (val uint64, err error)
- func (f Uint64Field) Get() (val uint64, err error)
- func (f Uint64Field) Inc() (val uint64, err error)
- func (f Uint64Field) IncInBatch(batch *leveldb.Batch) (val uint64, err error)
- func (f Uint64Field) Put(val uint64) (err error)
- func (f Uint64Field) PutInBatch(batch *leveldb.Batch, val uint64)
- type Uint64Vector
- func (f Uint64Vector) Dec(i uint64) (val uint64, err error)
- func (f Uint64Vector) DecInBatch(batch *leveldb.Batch, i uint64) (val uint64, err error)
- func (f Uint64Vector) Get(i uint64) (val uint64, err error)
- func (f Uint64Vector) Inc(i uint64) (val uint64, err error)
- func (f Uint64Vector) IncInBatch(batch *leveldb.Batch, i uint64) (val uint64, err error)
- func (f Uint64Vector) Put(i, val uint64) (err error)
- func (f Uint64Vector) PutInBatch(batch *leveldb.Batch, i, val uint64)
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
DB provides abstractions over LevelDB in order to implement complex structures using fields and ordered indexes. It provides a schema functionality to store fields and indexes information about naming and types.
func NewDB ¶
NewDB constructs a new DB and validates the schema if one already exists in the database at the given path. metricsPrefix is used for metrics collection for the given DB.
func (*DB) Metrics ¶
func (s *DB) Metrics() []prometheus.Collector
func (*DB) NewIndex ¶
func (db *DB) NewIndex(name string, funcs IndexFuncs) (f Index, err error)
NewIndex returns a new Index instance with defined name and encoding functions. The name must be unique and will be validated on database schema for a key prefix byte.
func (*DB) NewIterator ¶
NewIterator wraps LevelDB NewIterator method to increment metrics counter.
func (*DB) NewStringField ¶
func (db *DB) NewStringField(name string) (f StringField, err error)
NewStringField returns a new instance of StringField. It validates its name and type against the database schema.
func (*DB) NewStructField ¶
func (db *DB) NewStructField(name string) (f StructField, err error)
NewStructField returns a new StructField. It validates its name and type against the database schema.
func (*DB) NewUint64Field ¶
func (db *DB) NewUint64Field(name string) (f Uint64Field, err error)
NewUint64Field returns a new Uint64Field. It validates its name and type against the database schema.
func (*DB) NewUint64Vector ¶
func (db *DB) NewUint64Vector(name string) (f Uint64Vector, err error)
NewUint64Vector returns a new Uint64Vector. It validates its name and type against the database schema.
func (*DB) RenameIndex ¶
RenameIndex changes the schema so that an existing index name is changed while preserving its data by keeping the same internal key prefix. Renaming indexes is useful when encoding functions can be backward compatible to avoid data migrations.
type Index ¶
type Index struct {
// contains filtered or unexported fields
}
Index represents a set of LevelDB key value pairs that have a common prefix. It holds functions for encoding and decoding keys and values to provide transparent actions on saved data, which include: - getting a particular Item - saving a particular Item - iterating over sorted LevelDB keys. It implements the IndexIteratorInterface interface.
func (Index) CountFrom ¶
CountFrom returns the number of items in index keys starting from the key encoded from the provided Item.
func (Index) Delete ¶
Delete accepts Item to remove a key/value pair from the database based on its fields.
func (Index) DeleteInBatch ¶
DeleteInBatch is the same as Delete, except that the operation is performed on the batch instead of on the database.
func (Index) Fill ¶
Fill populates fields on provided items that are part of the encoded value by getting them based on information passed in their fields. Every item must have all fields needed for encoding the key set. The passed slice items will be changed so that they contain data from the index values. No new slice is allocated. This function uses a single leveldb snapshot.
func (Index) First ¶
First returns the first item in the Index whose encoded key starts with the prefix. If the prefix is nil, the first element of the whole index is returned. If Index has no elements, a leveldb.ErrNotFound error is returned.
func (Index) Get ¶
Get accepts key fields represented as Item to retrieve a value from the index and return maximum available information from the index represented as another Item.
func (Index) Has ¶
Has accepts key fields represented as Item to check if this Item's encoded key is stored in the index.
func (Index) HasMulti ¶
HasMulti accepts multiple key fields represented as Items and checks, for each of them, whether its encoded key is stored in the index.
func (Index) Iterate ¶
func (f Index) Iterate(fn IndexIterFunc, options *IterateOptions) (err error)
Iterate function iterates over keys of the Index. If IterateOptions is nil, the iteration is over all keys.
func (Index) Last ¶
Last returns the last item in the Index whose encoded key starts with the prefix. If the prefix is nil, the last element of the whole index is returned. If Index has no elements, a leveldb.ErrNotFound error is returned.
type IndexFuncs ¶
type IndexFuncs struct { EncodeKey func(fields Item) (key []byte, err error) DecodeKey func(key []byte) (e Item, err error) EncodeValue func(fields Item) (value []byte, err error) DecodeValue func(keyFields Item, value []byte) (e Item, err error) }
IndexFuncs structure defines functions for encoding and decoding LevelDB keys and values for a specific index.
type IndexIterFunc ¶
IndexIterFunc is a callback on every Item that is decoded by iterating over an Index's keys. By returning true for the stop variable, iteration will stop, and by returning an error, that error will be propagated to the called iterator method on Index.
type Item ¶
type Item struct { Address []byte Data []byte AccessTimestamp int64 StoreTimestamp int64 BinID uint64 PinCounter uint64 // maintains the no of time a chunk is pinned Tag uint32 }
Item holds fields relevant to Infinity Chunk data and metadata. All information required for infinity storage and operations on that storage must be defined here. This structure is logically connected to infinity storage, the only part of this package that is not generalized, mostly for performance reasons.
Item is a type that is used for retrieving, storing and encoding chunk data and metadata. It is passed as an argument to Index encoding functions, get function and put function. But it is also returned with additional data from get function call and as the argument in iterator function definition.
type IterateOptions ¶
type IterateOptions struct { // StartFrom is the Item to start the iteration from. StartFrom *Item // If SkipStartFromItem is true, StartFrom item will not // be iterated on. SkipStartFromItem bool // Iterate over items which keys have a common prefix. Prefix []byte // Iterate over items in reverse order. Reverse bool }
IterateOptions defines optional parameters for Iterate function.
type StringField ¶
type StringField struct {
// contains filtered or unexported fields
}
StringField is the most simple field implementation that stores an arbitrary string under a specific LevelDB key.
func (StringField) Get ¶
func (f StringField) Get() (val string, err error)
Get returns a string value from the database. If the value is not found, an empty string is returned and no error.
func (StringField) Put ¶
func (f StringField) Put(val string) (err error)
Put stores a string in the database.
func (StringField) PutInBatch ¶
func (f StringField) PutInBatch(batch *leveldb.Batch, val string)
PutInBatch stores a string in a batch that can be saved later in database.
type StructField ¶
type StructField struct {
// contains filtered or unexported fields
}
StructField is a helper to store complex structures by encoding them in RLP format.
func (StructField) Get ¶
func (f StructField) Get(val interface{}) (err error)
Get unmarshals data from the database to a provided val. If the data is not found leveldb.ErrNotFound is returned.
func (StructField) Put ¶
func (f StructField) Put(val interface{}) (err error)
Put marshals provided val and saves it to the database.
func (StructField) PutInBatch ¶
func (f StructField) PutInBatch(batch *leveldb.Batch, val interface{}) (err error)
PutInBatch marshals provided val and puts it into the batch.
type Uint64Field ¶
type Uint64Field struct {
// contains filtered or unexported fields
}
Uint64Field provides a way to have a simple counter in the database. It transparently encodes uint64 type value to bytes.
func (Uint64Field) Dec ¶
func (f Uint64Field) Dec() (val uint64, err error)
Dec decrements a uint64 value in the database. This operation is not goroutine safe. The field is protected from underflow: it is never decremented below zero.
func (Uint64Field) DecInBatch ¶
func (f Uint64Field) DecInBatch(batch *leveldb.Batch) (val uint64, err error)
DecInBatch decrements a uint64 value in the batch by retrieving a value from the database, not the same batch. This operation is not goroutine safe. The field is protected from underflow: it is never decremented below zero.
func (Uint64Field) Get ¶
func (f Uint64Field) Get() (val uint64, err error)
Get retrieves a uint64 value from the database. If the value is not found in the database a 0 value is returned and no error.
func (Uint64Field) Inc ¶
func (f Uint64Field) Inc() (val uint64, err error)
Inc increments a uint64 value in the database. This operation is not goroutine safe.
func (Uint64Field) IncInBatch ¶
func (f Uint64Field) IncInBatch(batch *leveldb.Batch) (val uint64, err error)
IncInBatch increments a uint64 value in the batch by retrieving a value from the database, not the same batch. This operation is not goroutine safe.
func (Uint64Field) Put ¶
func (f Uint64Field) Put(val uint64) (err error)
Put encodes a uint64 value and stores it in the database.
func (Uint64Field) PutInBatch ¶
func (f Uint64Field) PutInBatch(batch *leveldb.Batch, val uint64)
PutInBatch stores a uint64 value in a batch that can be saved later in the database.
type Uint64Vector ¶
type Uint64Vector struct {
// contains filtered or unexported fields
}
Uint64Vector provides a way to have multiple counters in the database. It transparently encodes uint64 type value to bytes.
func (Uint64Vector) Dec ¶
func (f Uint64Vector) Dec(i uint64) (val uint64, err error)
Dec decrements a uint64 value at index i in the database. This operation is not goroutine safe. The field is protected from underflow: it is never decremented below zero.
func (Uint64Vector) DecInBatch ¶
DecInBatch decrements a uint64 value at index i in the batch by retrieving a value from the database, not the same batch. This operation is not goroutine safe. The field is protected from underflow: it is never decremented below zero.
func (Uint64Vector) Get ¶
func (f Uint64Vector) Get(i uint64) (val uint64, err error)
Get retrieves a uint64 value at index i from the database. If the value is not found in the database a 0 value is returned and no error.
func (Uint64Vector) Inc ¶
func (f Uint64Vector) Inc(i uint64) (val uint64, err error)
Inc increments a uint64 value at index i in the database. This operation is not goroutine safe.
func (Uint64Vector) IncInBatch ¶
IncInBatch increments a uint64 value at index i in the batch by retrieving a value from the database, not the same batch. This operation is not goroutine safe.
func (Uint64Vector) Put ¶
func (f Uint64Vector) Put(i, val uint64) (err error)
Put encodes a uint64 value and stores it at index i in the database.
func (Uint64Vector) PutInBatch ¶
func (f Uint64Vector) PutInBatch(batch *leveldb.Batch, i, val uint64)
PutInBatch stores a uint64 value at index i in a batch that can be saved later in the database.