Documentation ¶
Overview ¶
Package shed provides simple abstraction components to compose more complex operations on storage data organized in fields and indexes.
The only type which holds logical information about swarm storage chunk data and metadata is Item. This part is not generalized, mostly for performance reasons.
Example (Store) ¶
Example_store constructs a simple storage implementation using shed package.
package main import ( "bytes" "context" "encoding/binary" "fmt" "io/ioutil" "log" "os" "time" "git.pirl.io/community/pirl/swarm/shed" "git.pirl.io/community/pirl/swarm/storage" "github.com/syndtr/goleveldb/leveldb" ) // Store holds fields and indexes (including their encoding functions) // and defines operations on them by composing data from them. // It implements storage.ChunkStore interface. // It is just an example without any support for parallel operations // or real world implementation. type Store struct { db *shed.DB // fields and indexes schemaName shed.StringField sizeCounter shed.Uint64Field accessCounter shed.Uint64Field retrievalIndex shed.Index accessIndex shed.Index gcIndex shed.Index } // New returns new Store. All fields and indexes are initialized // and possible conflicts with schema from existing database is checked // automatically. func New(path string) (s *Store, err error) { db, err := shed.NewDB(path, "") if err != nil { return nil, err } s = &Store{ db: db, } // Identify current storage schema by arbitrary name. s.schemaName, err = db.NewStringField("schema-name") if err != nil { return nil, err } // Global ever incrementing index of chunk accesses. s.accessCounter, err = db.NewUint64Field("access-counter") if err != nil { return nil, err } // Index storing actual chunk address, data and store timestamp. s.retrievalIndex, err = db.NewIndex("Address->StoreTimestamp|Data", shed.IndexFuncs{ EncodeKey: func(fields shed.Item) (key []byte, err error) { return fields.Address, nil }, DecodeKey: func(key []byte) (e shed.Item, err error) { e.Address = key return e, nil }, EncodeValue: func(fields shed.Item) (value []byte, err error) { b := make([]byte, 8) binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp)) value = append(b, fields.Data...) 
return value, nil }, DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8])) e.Data = value[8:] return e, nil }, }) if err != nil { return nil, err } // Index storing access timestamp for a particular address. // It is needed in order to update gc index keys for iteration order. s.accessIndex, err = db.NewIndex("Address->AccessTimestamp", shed.IndexFuncs{ EncodeKey: func(fields shed.Item) (key []byte, err error) { return fields.Address, nil }, DecodeKey: func(key []byte) (e shed.Item, err error) { e.Address = key return e, nil }, EncodeValue: func(fields shed.Item) (value []byte, err error) { b := make([]byte, 8) binary.BigEndian.PutUint64(b, uint64(fields.AccessTimestamp)) return b, nil }, DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { e.AccessTimestamp = int64(binary.BigEndian.Uint64(value)) return e, nil }, }) if err != nil { return nil, err } // Index with keys ordered by access timestamp for garbage collection prioritization. s.gcIndex, err = db.NewIndex("AccessTimestamp|StoredTimestamp|Address->nil", shed.IndexFuncs{ EncodeKey: func(fields shed.Item) (key []byte, err error) { b := make([]byte, 16, 16+len(fields.Address)) binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp)) binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) key = append(b, fields.Address...) return key, nil }, DecodeKey: func(key []byte) (e shed.Item, err error) { e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8])) e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16])) e.Address = key[16:] return e, nil }, EncodeValue: func(fields shed.Item) (value []byte, err error) { return nil, nil }, DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { return e, nil }, }) if err != nil { return nil, err } return s, nil } // Put stores the chunk and sets it store timestamp. 
func (s *Store) Put(_ context.Context, ch storage.Chunk) (err error) { return s.retrievalIndex.Put(shed.Item{ Address: ch.Address(), Data: ch.Data(), StoreTimestamp: time.Now().UTC().UnixNano(), }) } // Get retrieves a chunk with the provided address. // It updates access and gc indexes by removing the previous // items from them and adding new items as keys of index entries // are changed. func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, err error) { batch := new(leveldb.Batch) // Get the chunk data and storage timestamp. item, err := s.retrievalIndex.Get(shed.Item{ Address: addr, }) if err != nil { if err == leveldb.ErrNotFound { return nil, storage.ErrChunkNotFound } return nil, err } // Get the chunk access timestamp. accessItem, err := s.accessIndex.Get(shed.Item{ Address: addr, }) switch err { case nil: // Remove gc index entry if access timestamp is found. err = s.gcIndex.DeleteInBatch(batch, shed.Item{ Address: item.Address, StoreTimestamp: accessItem.AccessTimestamp, AccessTimestamp: item.StoreTimestamp, }) if err != nil { return nil, err } case leveldb.ErrNotFound: // Access timestamp is not found. Do not do anything. // This is the firs get request. default: return nil, err } // Specify new access timestamp accessTimestamp := time.Now().UTC().UnixNano() // Put new access timestamp in access index. err = s.accessIndex.PutInBatch(batch, shed.Item{ Address: addr, AccessTimestamp: accessTimestamp, }) if err != nil { return nil, err } // Put new access timestamp in gc index. err = s.gcIndex.PutInBatch(batch, shed.Item{ Address: item.Address, AccessTimestamp: accessTimestamp, StoreTimestamp: item.StoreTimestamp, }) if err != nil { return nil, err } // Increment access counter. // Currently this information is not used anywhere. _, err = s.accessCounter.IncInBatch(batch) if err != nil { return nil, err } // Write the batch. err = s.db.WriteBatch(batch) if err != nil { return nil, err } // Return the chunk. 
return storage.NewChunk(item.Address, item.Data), nil } // CollectGarbage is an example of index iteration. // It provides no reliable garbage collection functionality. func (s *Store) CollectGarbage() (err error) { const maxTrashSize = 100 maxRounds := 10 // arbitrary number, needs to be calculated // Run a few gc rounds. for roundCount := 0; roundCount < maxRounds; roundCount++ { var garbageCount int // New batch for a new cg round. trash := new(leveldb.Batch) // Iterate through all index items and break when needed. err = s.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) { // Remove the chunk. err = s.retrievalIndex.DeleteInBatch(trash, item) if err != nil { return false, err } // Remove the element in gc index. err = s.gcIndex.DeleteInBatch(trash, item) if err != nil { return false, err } // Remove the relation in access index. err = s.accessIndex.DeleteInBatch(trash, item) if err != nil { return false, err } garbageCount++ if garbageCount >= maxTrashSize { return true, nil } return false, nil }, nil) if err != nil { return err } if garbageCount == 0 { return nil } err = s.db.WriteBatch(trash) if err != nil { return err } } return nil } // GetSchema is an example of retrieveing the most simple // string from a database field. func (s *Store) GetSchema() (name string, err error) { name, err = s.schemaName.Get() if err == leveldb.ErrNotFound { return "", nil } return name, err } // GetSchema is an example of storing the most simple // string in a database field. func (s *Store) PutSchema(name string) (err error) { return s.schemaName.Put(name) } // Close closes the underlying database. func (s *Store) Close() error { return s.db.Close() } // Example_store constructs a simple storage implementation using shed package. 
func main() { dir, err := ioutil.TempDir("", "ephemeral") if err != nil { log.Fatal(err) } defer os.RemoveAll(dir) s, err := New(dir) if err != nil { log.Fatal(err) } defer s.Close() ch := storage.GenerateRandomChunk(1024) err = s.Put(context.Background(), ch) if err != nil { log.Fatal(err) } got, err := s.Get(context.Background(), ch.Address()) if err != nil { log.Fatal(err) } fmt.Println(bytes.Equal(got.Data(), ch.Data())) }
Output: true
Index ¶
- type DB
- func (db *DB) Close() (err error)
- func (db *DB) Delete(key []byte) (err error)
- func (db *DB) Get(key []byte) (value []byte, err error)
- func (db *DB) NewIndex(name string, funcs IndexFuncs) (f Index, err error)
- func (db *DB) NewIterator() iterator.Iterator
- func (db *DB) NewStringField(name string) (f StringField, err error)
- func (db *DB) NewStructField(name string) (f StructField, err error)
- func (db *DB) NewUint64Field(name string) (f Uint64Field, err error)
- func (db *DB) Put(key []byte, value []byte) (err error)
- func (db *DB) WriteBatch(batch *leveldb.Batch) (err error)
- type Index
- func (f Index) Count() (count int, err error)
- func (f Index) CountFrom(start Item) (count int, err error)
- func (f Index) Delete(keyFields Item) (err error)
- func (f Index) DeleteInBatch(batch *leveldb.Batch, keyFields Item) (err error)
- func (f Index) Get(keyFields Item) (out Item, err error)
- func (f Index) Iterate(fn IndexIterFunc, options *IterateOptions) (err error)
- func (f Index) Put(i Item) (err error)
- func (f Index) PutInBatch(batch *leveldb.Batch, i Item) (err error)
- type IndexFuncs
- type IndexIterFunc
- type Item
- type IterateOptions
- type StringField
- type StructField
- type Uint64Field
- func (f Uint64Field) Dec() (val uint64, err error)
- func (f Uint64Field) DecInBatch(batch *leveldb.Batch) (val uint64, err error)
- func (f Uint64Field) Get() (val uint64, err error)
- func (f Uint64Field) Inc() (val uint64, err error)
- func (f Uint64Field) IncInBatch(batch *leveldb.Batch) (val uint64, err error)
- func (f Uint64Field) Put(val uint64) (err error)
- func (f Uint64Field) PutInBatch(batch *leveldb.Batch, val uint64)
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DB ¶
// DB provides abstractions over LevelDB in order to implement
// complex structures using fields and ordered indexes.
type DB struct {
// contains filtered or unexported fields
}
DB provides abstractions over LevelDB in order to implement complex structures using fields and ordered indexes. It provides schema functionality to store naming and type information about fields and indexes.
func NewDB ¶
NewDB constructs a new DB and validates the schema if it exists in database on the given path. metricsPrefix is used for metrics collection for the given DB.
func (*DB) NewIndex ¶
func (db *DB) NewIndex(name string, funcs IndexFuncs) (f Index, err error)
NewIndex returns a new Index instance with defined name and encoding functions. The name must be unique and will be validated on database schema for a key prefix byte.
func (*DB) NewIterator ¶
NewIterator wraps LevelDB NewIterator method to increment metrics counter.
func (*DB) NewStringField ¶
func (db *DB) NewStringField(name string) (f StringField, err error)
NewStringField returns a new instance of StringField. It validates its name and type against the database schema.
func (*DB) NewStructField ¶
func (db *DB) NewStructField(name string) (f StructField, err error)
NewStructField returns a new StructField. It validates its name and type against the database schema.
func (*DB) NewUint64Field ¶
func (db *DB) NewUint64Field(name string) (f Uint64Field, err error)
NewUint64Field returns a new Uint64Field. It validates its name and type against the database schema.
type Index ¶
// Index represents a set of LevelDB key value pairs that have common prefix.
// It holds functions for encoding and decoding keys and values.
type Index struct {
// contains filtered or unexported fields
}
Index represents a set of LevelDB key value pairs that have common prefix. It holds functions for encoding and decoding keys and values to provide transparent actions on saved data which include:
- getting a particular Item
- saving a particular Item
- iterating over sorted LevelDB keys
It implements IndexIteratorInterface interface.
func (Index) CountFrom ¶
CountFrom returns the number of items in index keys starting from the key encoded from the provided Item.
func (Index) Delete ¶
Delete accepts Item to remove a key/value pair from the database based on its fields.
func (Index) DeleteInBatch ¶
DeleteInBatch is the same as Delete, except that the operation is performed on the batch instead of on the database.
func (Index) Get ¶
Get accepts key fields represented as Item to retrieve a value from the index and return maximum available information from the index represented as another Item.
func (Index) Iterate ¶
func (f Index) Iterate(fn IndexIterFunc, options *IterateOptions) (err error)
Iterate function iterates over keys of the Index. If IterateOptions is nil, the iteration is over all keys.
type IndexFuncs ¶
type IndexFuncs struct { EncodeKey func(fields Item) (key []byte, err error) DecodeKey func(key []byte) (e Item, err error) EncodeValue func(fields Item) (value []byte, err error) DecodeValue func(keyFields Item, value []byte) (e Item, err error) }
IndexFuncs structure defines functions for encoding and decoding LevelDB keys and values for a specific index.
type IndexIterFunc ¶
IndexIterFunc is a callback on every Item that is decoded by iterating over the keys of an Index. By returning true for the stop variable, iteration will stop, and by returning an error, that error will be propagated to the called iterator method on Index.
type Item ¶
// Item holds fields relevant to Swarm Chunk data and metadata.
// It is passed as an argument to Index encoding functions, get function
// and put function, and is returned with additional data from get function
// calls and as the argument of iterator functions.
type Item struct {
	Address         []byte
	Data            []byte
	AccessTimestamp int64
	StoreTimestamp  int64
	// UseMockStore is a pointer to identify
	// an unset state of the field in Join function.
	UseMockStore *bool
}
Item holds fields relevant to Swarm Chunk data and metadata. All information required for swarm storage and operations on that storage must be defined here. This structure is logically connected to swarm storage, the only part of this package that is not generalized, mostly for performance reasons.
Item is a type that is used for retrieving, storing and encoding chunk data and metadata. It is passed as an argument to Index encoding functions, get function and put function. But it is also returned with additional data from get function call and as the argument in iterator function definition.
type IterateOptions ¶
type IterateOptions struct { // StartFrom is the Item to start the iteration from. StartFrom *Item // If SkipStartFromItem is true, StartFrom item will not // be iterated on. SkipStartFromItem bool // Iterate over items which keys have a common prefix. Prefix []byte }
IterateOptions defines optional parameters for Iterate function.
type StringField ¶
// StringField is the most simple field implementation
// that stores an arbitrary string under a specific LevelDB key.
type StringField struct {
// contains filtered or unexported fields
}
StringField is the most simple field implementation that stores an arbitrary string under a specific LevelDB key.
func (StringField) Get ¶
func (f StringField) Get() (val string, err error)
Get returns a string value from database. If the value is not found, an empty string is returned and no error.
func (StringField) Put ¶
func (f StringField) Put(val string) (err error)
Put stores a string in the database.
func (StringField) PutInBatch ¶
func (f StringField) PutInBatch(batch *leveldb.Batch, val string)
PutInBatch stores a string in a batch that can be saved later in database.
type StructField ¶
// StructField is a helper to store complex structure by
// encoding it in RLP format.
type StructField struct {
// contains filtered or unexported fields
}
StructField is a helper to store complex structure by encoding it in RLP format.
func (StructField) Get ¶
func (f StructField) Get(val interface{}) (err error)
Get unmarshals data from the database to a provided val. If the data is not found leveldb.ErrNotFound is returned.
func (StructField) Put ¶
func (f StructField) Put(val interface{}) (err error)
Put marshals provided val and saves it to the database.
func (StructField) PutInBatch ¶
func (f StructField) PutInBatch(batch *leveldb.Batch, val interface{}) (err error)
PutInBatch marshals provided val and puts it into the batch.
type Uint64Field ¶
// Uint64Field provides a way to have a simple counter in the database.
// It transparently encodes uint64 type value to bytes.
type Uint64Field struct {
// contains filtered or unexported fields
}
Uint64Field provides a way to have a simple counter in the database. It transparently encodes uint64 type value to bytes.
func (Uint64Field) Dec ¶
func (f Uint64Field) Dec() (val uint64, err error)
Dec decrements a uint64 value in the database. This operation is not goroutine safe. The field is protected from overflow to a negative value.
func (Uint64Field) DecInBatch ¶
func (f Uint64Field) DecInBatch(batch *leveldb.Batch) (val uint64, err error)
DecInBatch decrements a uint64 value in the batch by retrieving a value from the database, not the same batch. This operation is not goroutine safe. The field is protected from overflow to a negative value.
func (Uint64Field) Get ¶
func (f Uint64Field) Get() (val uint64, err error)
Get retrieves a uint64 value from the database. If the value is not found in the database a 0 value is returned and no error.
func (Uint64Field) Inc ¶
func (f Uint64Field) Inc() (val uint64, err error)
Inc increments a uint64 value in the database. This operation is not goroutine safe.
func (Uint64Field) IncInBatch ¶
func (f Uint64Field) IncInBatch(batch *leveldb.Batch) (val uint64, err error)
IncInBatch increments a uint64 value in the batch by retrieving a value from the database, not the same batch. This operation is not goroutine safe.
func (Uint64Field) Put ¶
func (f Uint64Field) Put(val uint64) (err error)
Put encodes uint64 value and stores it in the database.
func (Uint64Field) PutInBatch ¶
func (f Uint64Field) PutInBatch(batch *leveldb.Batch, val uint64)
PutInBatch stores a uint64 value in a batch that can be saved later in the database.