shed

package
v0.0.0-...-0e3e10b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 26, 2020 License: GPL-3.0 Imports: 14 Imported by: 0

Documentation

Overview

Package shed provides a simple abstraction components to compose more complex operations on storage data organized in fields and indexes.

Only type which holds logical information about swarm storage chunks data and metadata is Item. This part is not generalized mostly for performance reasons.

Example (Store)

Example_store constructs a simple storage implementation using shed package.

package main

import (
	"bytes"
	"context"
	"encoding/binary"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"time"

	"github.com/ether-ark/etherark/swarm/shed"
	"github.com/ether-ark/etherark/swarm/storage"
	"github.com/syndtr/goleveldb/leveldb"
)

// Store holds fields and indexes (including their encoding functions)
// and defines operations on them by composing data from them.
// It implements storage.ChunkStore interface.
// It is just an example without any support for parallel operations
// or real world implementation.
type Store struct {
	db *shed.DB

	// fields and indexes
	schemaName     shed.StringField
	sizeCounter    shed.Uint64Field
	accessCounter  shed.Uint64Field
	retrievalIndex shed.Index
	accessIndex    shed.Index
	gcIndex        shed.Index
}

// New returns new Store. All fields and indexes are initialized
// and possible conflicts with schema from existing database is checked
// automatically.
func New(path string) (s *Store, err error) {
	db, err := shed.NewDB(path, "")
	if err != nil {
		return nil, err
	}
	s = &Store{
		db: db,
	}
	// Identify current storage schema by arbitrary name.
	s.schemaName, err = db.NewStringField("schema-name")
	if err != nil {
		return nil, err
	}
	// Global ever incrementing index of chunk accesses.
	s.accessCounter, err = db.NewUint64Field("access-counter")
	if err != nil {
		return nil, err
	}
	// Index storing actual chunk address, data and store timestamp.
	s.retrievalIndex, err = db.NewIndex("Address->StoreTimestamp|Data", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			return fields.Address, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.Address = key
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			b := make([]byte, 8)
			binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
			value = append(b, fields.Data...)
			return value, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
			e.Data = value[8:]
			return e, nil
		},
	})
	if err != nil {
		return nil, err
	}
	// Index storing access timestamp for a particular address.
	// It is needed in order to update gc index keys for iteration order.
	s.accessIndex, err = db.NewIndex("Address->AccessTimestamp", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			return fields.Address, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.Address = key
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			b := make([]byte, 8)
			binary.BigEndian.PutUint64(b, uint64(fields.AccessTimestamp))
			return b, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			e.AccessTimestamp = int64(binary.BigEndian.Uint64(value))
			return e, nil
		},
	})
	if err != nil {
		return nil, err
	}
	// Index with keys ordered by access timestamp for garbage collection prioritization.
	s.gcIndex, err = db.NewIndex("AccessTimestamp|StoredTimestamp|Address->nil", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			b := make([]byte, 16, 16+len(fields.Address))
			binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp))
			binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
			key = append(b, fields.Address...)
			return key, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
			e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16]))
			e.Address = key[16:]
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			return nil, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			return e, nil
		},
	})
	if err != nil {
		return nil, err
	}
	return s, nil
}

// Put stores the chunk and sets it store timestamp.
func (s *Store) Put(_ context.Context, ch storage.Chunk) (err error) {
	return s.retrievalIndex.Put(shed.Item{
		Address:        ch.Address(),
		Data:           ch.Data(),
		StoreTimestamp: time.Now().UTC().UnixNano(),
	})
}

// Get retrieves a chunk with the provided address.
// It updates access and gc indexes by removing the previous
// items from them and adding new items as keys of index entries
// are changed.
func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, err error) {
	batch := new(leveldb.Batch)

	// Get the chunk data and storage timestamp.
	item, err := s.retrievalIndex.Get(shed.Item{
		Address: addr,
	})
	if err != nil {
		if err == leveldb.ErrNotFound {
			return nil, storage.ErrChunkNotFound
		}
		return nil, err
	}

	// Get the chunk access timestamp.
	accessItem, err := s.accessIndex.Get(shed.Item{
		Address: addr,
	})
	switch err {
	case nil:
		// Remove gc index entry if access timestamp is found.
		err = s.gcIndex.DeleteInBatch(batch, shed.Item{
			Address:         item.Address,
			StoreTimestamp:  accessItem.AccessTimestamp,
			AccessTimestamp: item.StoreTimestamp,
		})
		if err != nil {
			return nil, err
		}
	case leveldb.ErrNotFound:
	// Access timestamp is not found. Do not do anything.
	// This is the firs get request.
	default:
		return nil, err
	}

	// Specify new access timestamp
	accessTimestamp := time.Now().UTC().UnixNano()

	// Put new access timestamp in access index.
	err = s.accessIndex.PutInBatch(batch, shed.Item{
		Address:         addr,
		AccessTimestamp: accessTimestamp,
	})
	if err != nil {
		return nil, err
	}

	// Put new access timestamp in gc index.
	err = s.gcIndex.PutInBatch(batch, shed.Item{
		Address:         item.Address,
		AccessTimestamp: accessTimestamp,
		StoreTimestamp:  item.StoreTimestamp,
	})
	if err != nil {
		return nil, err
	}

	// Increment access counter.
	// Currently this information is not used anywhere.
	_, err = s.accessCounter.IncInBatch(batch)
	if err != nil {
		return nil, err
	}

	// Write the batch.
	err = s.db.WriteBatch(batch)
	if err != nil {
		return nil, err
	}

	// Return the chunk.
	return storage.NewChunk(item.Address, item.Data), nil
}

// CollectGarbage is an example of index iteration.
// It provides no reliable garbage collection functionality.
func (s *Store) CollectGarbage() (err error) {
	const maxTrashSize = 100
	maxRounds := 10 // arbitrary number, needs to be calculated

	// Run a few gc rounds.
	for roundCount := 0; roundCount < maxRounds; roundCount++ {
		var garbageCount int
		// New batch for a new cg round.
		trash := new(leveldb.Batch)
		// Iterate through all index items and break when needed.
		err = s.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
			// Remove the chunk.
			err = s.retrievalIndex.DeleteInBatch(trash, item)
			if err != nil {
				return false, err
			}
			// Remove the element in gc index.
			err = s.gcIndex.DeleteInBatch(trash, item)
			if err != nil {
				return false, err
			}
			// Remove the relation in access index.
			err = s.accessIndex.DeleteInBatch(trash, item)
			if err != nil {
				return false, err
			}
			garbageCount++
			if garbageCount >= maxTrashSize {
				return true, nil
			}
			return false, nil
		}, nil)
		if err != nil {
			return err
		}
		if garbageCount == 0 {
			return nil
		}
		err = s.db.WriteBatch(trash)
		if err != nil {
			return err
		}
	}
	return nil
}

// GetSchema is an example of retrieveing the most simple
// string from a database field.
func (s *Store) GetSchema() (name string, err error) {
	name, err = s.schemaName.Get()
	if err == leveldb.ErrNotFound {
		return "", nil
	}
	return name, err
}

// GetSchema is an example of storing the most simple
// string in a database field.
func (s *Store) PutSchema(name string) (err error) {
	return s.schemaName.Put(name)
}

// Close closes the underlying database.
func (s *Store) Close() error {
	return s.db.Close()
}

// Example_store constructs a simple storage implementation using shed package.
func main() {
	dir, err := ioutil.TempDir("", "ephemeral")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)

	s, err := New(dir)
	if err != nil {
		log.Fatal(err)
	}
	defer s.Close()

	ch := storage.GenerateRandomChunk(1024)
	err = s.Put(context.Background(), ch)
	if err != nil {
		log.Fatal(err)
	}

	got, err := s.Get(context.Background(), ch.Address())
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(bytes.Equal(got.Data(), ch.Data()))

}
Output:

true

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB provides abstractions over LevelDB in order to implement complex structures using fields and ordered indexes. It provides a schema functionality to store fields and indexes information about naming and types.

func NewDB

func NewDB(path string, metricsPrefix string) (db *DB, err error)

NewDB constructs a new DB and validates the schema if it exists in database on the given path. metricsPrefix is used for metrics collection for the given DB.

func (*DB) Close

func (db *DB) Close() (err error)

Close closes LevelDB database.

func (*DB) Delete

func (db *DB) Delete(key []byte) (err error)

Delete wraps LevelDB Delete method to increment metrics counter.

func (*DB) Get

func (db *DB) Get(key []byte) (value []byte, err error)

Get wraps LevelDB Get method to increment metrics counter.

func (*DB) NewIndex

func (db *DB) NewIndex(name string, funcs IndexFuncs) (f Index, err error)

NewIndex returns a new Index instance with defined name and encoding functions. The name must be unique and will be validated on database schema for a key prefix byte.

func (*DB) NewIterator

func (db *DB) NewIterator() iterator.Iterator

NewIterator wraps LevelDB NewIterator method to increment metrics counter.

func (*DB) NewStringField

func (db *DB) NewStringField(name string) (f StringField, err error)

NewStringField retruns a new Instance of StringField. It validates its name and type against the database schema.

func (*DB) NewStructField

func (db *DB) NewStructField(name string) (f StructField, err error)

NewStructField returns a new StructField. It validates its name and type against the database schema.

func (*DB) NewUint64Field

func (db *DB) NewUint64Field(name string) (f Uint64Field, err error)

NewUint64Field returns a new Uint64Field. It validates its name and type against the database schema.

func (*DB) Put

func (db *DB) Put(key []byte, value []byte) (err error)

Put wraps LevelDB Put method to increment metrics counter.

func (*DB) WriteBatch

func (db *DB) WriteBatch(batch *leveldb.Batch) (err error)

WriteBatch wraps LevelDB Write method to increment metrics counter.

type Index

type Index struct {
	// contains filtered or unexported fields
}

Index represents a set of LevelDB key value pairs that have common prefix. It holds functions for encoding and decoding keys and values to provide transparent actions on saved data which inclide: - getting a particular Item - saving a particular Item - iterating over a sorted LevelDB keys It implements IndexIteratorInterface interface.

func (Index) Count

func (f Index) Count() (count int, err error)

Count returns the number of items in index.

func (Index) CountFrom

func (f Index) CountFrom(start Item) (count int, err error)

CountFrom returns the number of items in index keys starting from the key encoded from the provided Item.

func (Index) Delete

func (f Index) Delete(keyFields Item) (err error)

Delete accepts Item to remove a key/value pair from the database based on its fields.

func (Index) DeleteInBatch

func (f Index) DeleteInBatch(batch *leveldb.Batch, keyFields Item) (err error)

DeleteInBatch is the same as Delete just the operation is performed on the batch instead on the database.

func (Index) Get

func (f Index) Get(keyFields Item) (out Item, err error)

Get accepts key fields represented as Item to retrieve a value from the index and return maximum available information from the index represented as another Item.

func (Index) Iterate

func (f Index) Iterate(fn IndexIterFunc, options *IterateOptions) (err error)

Iterate function iterates over keys of the Index. If IterateOptions is nil, the iterations is over all keys.

func (Index) Put

func (f Index) Put(i Item) (err error)

Put accepts Item to encode information from it and save it to the database.

func (Index) PutInBatch

func (f Index) PutInBatch(batch *leveldb.Batch, i Item) (err error)

PutInBatch is the same as Put method, but it just saves the key/value pair to the batch instead directly to the database.

type IndexFuncs

type IndexFuncs struct {
	EncodeKey   func(fields Item) (key []byte, err error)
	DecodeKey   func(key []byte) (e Item, err error)
	EncodeValue func(fields Item) (value []byte, err error)
	DecodeValue func(keyFields Item, value []byte) (e Item, err error)
}

IndexFuncs structure defines functions for encoding and decoding LevelDB keys and values for a specific index.

type IndexIterFunc

type IndexIterFunc func(item Item) (stop bool, err error)

IndexIterFunc is a callback on every Item that is decoded by iterating on an Index keys. By returning a true for stop variable, iteration will stop, and by returning the error, that error will be propagated to the called iterator method on Index.

type Item

type Item struct {
	Address         []byte
	Data            []byte
	AccessTimestamp int64
	StoreTimestamp  int64
	// UseMockStore is a pointer to identify
	// an unset state of the field in Join function.
	UseMockStore *bool
}

Item holds fields relevant to Swarm Chunk data and metadata. All information required for swarm storage and operations on that storage must be defined here. This structure is logically connected to swarm storage, the only part of this package that is not generalized, mostly for performance reasons.

Item is a type that is used for retrieving, storing and encoding chunk data and metadata. It is passed as an argument to Index encoding functions, get function and put function. But it is also returned with additional data from get function call and as the argument in iterator function definition.

func (Item) Merge

func (i Item) Merge(i2 Item) (new Item)

Merge is a helper method to construct a new Item by filling up fields with default values of a particular Item with values from another one.

type IterateOptions

type IterateOptions struct {
	// StartFrom is the Item to start the iteration from.
	StartFrom *Item
	// If SkipStartFromItem is true, StartFrom item will not
	// be iterated on.
	SkipStartFromItem bool
	// Iterate over items which keys have a common prefix.
	Prefix []byte
}

IterateOptions defines optional parameters for Iterate function.

type StringField

type StringField struct {
	// contains filtered or unexported fields
}

StringField is the most simple field implementation that stores an arbitrary string under a specific LevelDB key.

func (StringField) Get

func (f StringField) Get() (val string, err error)

Get returns a string value from database. If the value is not found, an empty string is returned an no error.

func (StringField) Put

func (f StringField) Put(val string) (err error)

Put stores a string in the database.

func (StringField) PutInBatch

func (f StringField) PutInBatch(batch *leveldb.Batch, val string)

PutInBatch stores a string in a batch that can be saved later in database.

type StructField

type StructField struct {
	// contains filtered or unexported fields
}

StructField is a helper to store complex structure by encoding it in RLP format.

func (StructField) Get

func (f StructField) Get(val interface{}) (err error)

Get unmarshals data from the database to a provided val. If the data is not found leveldb.ErrNotFound is returned.

func (StructField) Put

func (f StructField) Put(val interface{}) (err error)

Put marshals provided val and saves it to the database.

func (StructField) PutInBatch

func (f StructField) PutInBatch(batch *leveldb.Batch, val interface{}) (err error)

PutInBatch marshals provided val and puts it into the batch.

type Uint64Field

type Uint64Field struct {
	// contains filtered or unexported fields
}

Uint64Field provides a way to have a simple counter in the database. It transparently encodes uint64 type value to bytes.

func (Uint64Field) Dec

func (f Uint64Field) Dec() (val uint64, err error)

Dec decrements a uint64 value in the database. This operation is not goroutine save. The field is protected from overflow to a negative value.

func (Uint64Field) DecInBatch

func (f Uint64Field) DecInBatch(batch *leveldb.Batch) (val uint64, err error)

DecInBatch decrements a uint64 value in the batch by retreiving a value from the database, not the same batch. This operation is not goroutine save. The field is protected from overflow to a negative value.

func (Uint64Field) Get

func (f Uint64Field) Get() (val uint64, err error)

Get retrieves a uint64 value from the database. If the value is not found in the database a 0 value is returned and no error.

func (Uint64Field) Inc

func (f Uint64Field) Inc() (val uint64, err error)

Inc increments a uint64 value in the database. This operation is not goroutine save.

func (Uint64Field) IncInBatch

func (f Uint64Field) IncInBatch(batch *leveldb.Batch) (val uint64, err error)

IncInBatch increments a uint64 value in the batch by retreiving a value from the database, not the same batch. This operation is not goroutine save.

func (Uint64Field) Put

func (f Uint64Field) Put(val uint64) (err error)

Put encodes uin64 value and stores it in the database.

func (Uint64Field) PutInBatch

func (f Uint64Field) PutInBatch(batch *leveldb.Batch, val uint64)

PutInBatch stores a uint64 value in a batch that can be saved later in the database.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL