shed

package
v0.0.0-...-34ee9b9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 24, 2021 License: BSD-3-Clause Imports: 11 Imported by: 0

Documentation

Overview

Package shed provides simple abstraction components for composing more complex operations on storage data organized in fields and indexes.

The only type that holds logical information about Infinity storage chunk data and metadata is Item. This part is not generalized, mostly for performance reasons.

Example (Store)

Example_store constructs a simple storage implementation using shed package.

package main

import (
	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"log"
	"time"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/yanhuangpai/voyager/pkg/infinity"
	"github.com/yanhuangpai/voyager/pkg/shed"
	"github.com/yanhuangpai/voyager/pkg/storage"
	"github.com/yanhuangpai/voyager/pkg/storage/testing"
)

// Store composes shed fields and indexes, together with their encoding
// functions, into a small chunk store whose operations are built by
// combining data from those fields and indexes. It is only an example: it
// has no support for concurrent operations and is not a real-world
// implementation.
type Store struct {
	db *shed.DB

	// Fields.
	schemaName    shed.StringField // arbitrary name identifying the storage schema
	accessCounter shed.Uint64Field // incremented by Get on every chunk access

	// Indexes.
	retrievalIndex shed.Index // Address -> StoreTimestamp|Data
	accessIndex    shed.Index // Address -> AccessTimestamp
	gcIndex        shed.Index // AccessTimestamp|StoreTimestamp|Address -> nil
}

// New returns a new Store backed by the database at path. All fields and
// indexes are initialized, and conflicts with the schema of an existing
// database are checked automatically by the shed package.
//
// If any field or index cannot be initialized, the database opened here is
// closed again before the error is returned, so a failed New does not leak
// the underlying handle.
func New(path string) (s *Store, err error) {
	db, err := shed.NewDB(path, nil)
	if err != nil {
		return nil, err
	}
	// On any error below the caller never receives a Store, and therefore has
	// no way to Close the database, so release it here.
	defer func() {
		if err != nil {
			// The initialization error is what the caller needs; a secondary
			// Close error is intentionally dropped.
			_ = db.Close()
		}
	}()

	s = &Store{
		db: db,
	}
	// Identify current storage schema by arbitrary name.
	s.schemaName, err = db.NewStringField("schema-name")
	if err != nil {
		return nil, err
	}
	// Global, ever-incrementing counter of chunk accesses.
	s.accessCounter, err = db.NewUint64Field("access-counter")
	if err != nil {
		return nil, err
	}
	// Index storing actual chunk address, data and store timestamp.
	// Key: Address. Value: 8-byte big-endian StoreTimestamp followed by Data.
	s.retrievalIndex, err = db.NewIndex("Address->StoreTimestamp|Data", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			return fields.Address, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.Address = key
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			// Reserve room for the data so append does not reallocate.
			b := make([]byte, 8, 8+len(fields.Data))
			binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
			value = append(b, fields.Data...)
			return value, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			// Reject truncated values instead of panicking on the slice below.
			if len(value) < 8 {
				return e, fmt.Errorf("retrieval index value too short: %d bytes", len(value))
			}
			e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
			e.Data = value[8:]
			return e, nil
		},
	})
	if err != nil {
		return nil, err
	}
	// Index storing access timestamp for a particular address.
	// It is needed in order to update gc index keys for iteration order.
	// Key: Address. Value: 8-byte big-endian AccessTimestamp.
	s.accessIndex, err = db.NewIndex("Address->AccessTimestamp", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			return fields.Address, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.Address = key
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			b := make([]byte, 8)
			binary.BigEndian.PutUint64(b, uint64(fields.AccessTimestamp))
			return b, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			// Uint64 would panic on fewer than 8 bytes.
			if len(value) < 8 {
				return e, fmt.Errorf("access index value too short: %d bytes", len(value))
			}
			e.AccessTimestamp = int64(binary.BigEndian.Uint64(value))
			return e, nil
		},
	})
	if err != nil {
		return nil, err
	}
	// Index with keys ordered by access timestamp for garbage collection
	// prioritization. Key: 8-byte big-endian AccessTimestamp, 8-byte
	// big-endian StoreTimestamp, then Address. Value: empty.
	// The index name keeps its original "StoredTimestamp" spelling because
	// the name is validated against the database schema.
	s.gcIndex, err = db.NewIndex("AccessTimestamp|StoredTimestamp|Address->nil", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			b := make([]byte, 16, 16+len(fields.Address))
			binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp))
			binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
			key = append(b, fields.Address...)
			return key, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			// Reject keys too short to hold both timestamps.
			if len(key) < 16 {
				return e, fmt.Errorf("gc index key too short: %d bytes", len(key))
			}
			e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
			e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16]))
			e.Address = key[16:]
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			return nil, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			return e, nil
		},
	})
	if err != nil {
		return nil, err
	}
	return s, nil
}

// Put writes the chunk's address and data to the retrieval index, stamping
// it with the current UTC time in nanoseconds as its store timestamp.
func (s *Store) Put(_ context.Context, ch infinity.Chunk) (err error) {
	item := shed.Item{
		Address:        ch.Address().Bytes(),
		Data:           ch.Data(),
		StoreTimestamp: time.Now().UTC().UnixNano(),
	}
	err = s.retrievalIndex.Put(item)
	return err
}

// Get retrieves the chunk with the provided address.
//
// Each successful Get also records an access, with all writes collected in
// one batch:
//   - the gc index entry keyed by the previous access timestamp, if any, is
//     removed;
//   - the access index and gc index get entries for the new access timestamp;
//   - the access counter is incremented.
//
// The keys of the gc index entries change on every access, so the old entry
// has to be deleted and a new one put.
//
// storage.ErrNotFound is returned if the address is not in the retrieval index.
func (s *Store) Get(_ context.Context, addr infinity.Address) (c infinity.Chunk, err error) {
	batch := new(leveldb.Batch)

	// Get the chunk data and storage timestamp.
	item, err := s.retrievalIndex.Get(shed.Item{
		Address: addr.Bytes(),
	})
	if err != nil {
		if errors.Is(err, leveldb.ErrNotFound) {
			return nil, storage.ErrNotFound
		}
		return nil, fmt.Errorf("retrieval index get: %w", err)
	}

	// Get the chunk access timestamp.
	accessItem, err := s.accessIndex.Get(shed.Item{
		Address: addr.Bytes(),
	})
	switch {
	case err == nil:
		// Remove the gc index entry written by the previous access. Its key
		// must be built from the same fields used when it was put:
		// AccessTimestamp from the access index, StoreTimestamp from the
		// retrieval index.
		err = s.gcIndex.DeleteInBatch(batch, shed.Item{
			Address:         item.Address,
			AccessTimestamp: accessItem.AccessTimestamp,
			StoreTimestamp:  item.StoreTimestamp,
		})
		if err != nil {
			return nil, fmt.Errorf("gc index delete in batch: %w", err)
		}
	case errors.Is(err, leveldb.ErrNotFound):
		// No access timestamp yet: this is the first Get for the chunk, so
		// there is no gc index entry to remove.
	default:
		return nil, fmt.Errorf("access index get: %w", err)
	}

	// Specify new access timestamp.
	accessTimestamp := time.Now().UTC().UnixNano()

	// Put new access timestamp in access index.
	err = s.accessIndex.PutInBatch(batch, shed.Item{
		Address:         addr.Bytes(),
		AccessTimestamp: accessTimestamp,
	})
	if err != nil {
		return nil, fmt.Errorf("access index put in batch: %w", err)
	}

	// Put new access timestamp in gc index.
	err = s.gcIndex.PutInBatch(batch, shed.Item{
		Address:         item.Address,
		AccessTimestamp: accessTimestamp,
		StoreTimestamp:  item.StoreTimestamp,
	})
	if err != nil {
		return nil, fmt.Errorf("gc index put in batch: %w", err)
	}

	// Increment access counter.
	// Currently this information is not used anywhere.
	_, err = s.accessCounter.IncInBatch(batch)
	if err != nil {
		return nil, fmt.Errorf("access counter inc in batch: %w", err)
	}

	// Write the batch.
	err = s.db.WriteBatch(batch)
	if err != nil {
		return nil, fmt.Errorf("write batch: %w", err)
	}

	// Return the chunk.
	return infinity.NewChunk(infinity.NewAddress(item.Address), item.Data), nil
}

// CollectGarbage illustrates iteration over an Index; it is not a reliable
// garbage collector.
//
// It walks the gc index in key order, which puts the lowest access
// timestamps first. For each item it removes the chunk from the retrieval,
// gc and access indexes. Each round deletes at most maxTrashSize items in a
// single batch. It stops after maxRounds rounds or as soon as a round finds
// nothing to delete.
func (s *Store) CollectGarbage() (err error) {
	const (
		maxTrashSize = 100
		maxRounds    = 10 // arbitrary number, needs to be calculated
	)

	for round := 0; round < maxRounds; round++ {
		collected := 0
		// Every round collects its deletions in a fresh batch.
		trash := new(leveldb.Batch)
		err = s.gcIndex.Iterate(func(item shed.Item) (bool, error) {
			// Drop the chunk from every index that references it, in order:
			// retrieval, gc, access.
			for _, idx := range []shed.Index{s.retrievalIndex, s.gcIndex, s.accessIndex} {
				if delErr := idx.DeleteInBatch(trash, item); delErr != nil {
					return false, delErr
				}
			}
			collected++
			// Stop iterating once the batch is full.
			return collected >= maxTrashSize, nil
		}, nil)
		if err != nil {
			return err
		}
		if collected == 0 {
			return nil
		}
		if err = s.db.WriteBatch(trash); err != nil {
			return err
		}
	}
	return nil
}

// GetSchema shows how to read a plain string field from the database.
// A missing schema name is reported as an empty string with no error.
func (s *Store) GetSchema() (name string, err error) {
	if name, err = s.schemaName.Get(); errors.Is(err, leveldb.ErrNotFound) {
		return "", nil
	}
	return name, err
}

// PutSchema shows how to write a plain string field to the database.
func (s *Store) PutSchema(name string) (err error) {
	err = s.schemaName.Put(name)
	return err
}

// Close releases the Store by closing its underlying shed database.
func (s *Store) Close() error {
	err := s.db.Close()
	return err
}

// main is the runnable form of Example_store. It builds a Store, puts a
// randomly generated chunk, reads it back, and prints whether the data
// round-tripped unchanged.
//
// NOTE(review): New is given an empty path; presumably shed.NewDB treats
// that as an in-memory database. Confirm against shed.NewDB.
func main() {
	ctx := context.Background()

	store, err := New("")
	if err != nil {
		log.Fatal(err)
	}
	defer store.Close()

	chunk := testing.GenerateTestRandomChunk()
	if err := store.Put(ctx, chunk); err != nil {
		fmt.Println("put chunk:", err)
		return
	}

	retrieved, err := store.Get(ctx, chunk.Address())
	if err != nil {
		fmt.Println("get chunk:", err)
		return
	}

	fmt.Println(bytes.Equal(retrieved.Data(), chunk.Data()))
}
Output:

true

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB provides abstractions over LevelDB in order to implement complex structures using fields and ordered indexes. It provides a schema functionality to store fields and indexes information about naming and types.

func NewDB

func NewDB(path string, o *Options) (db *DB, err error)

NewDB constructs a new DB and validates the schema if one already exists in the database at the given path. Optional LevelDB settings are passed through o (see Options); the package example passes nil.

func (*DB) Close

func (db *DB) Close() (err error)

Close closes LevelDB database.

func (*DB) Delete

func (db *DB) Delete(key []byte) (err error)

Delete wraps LevelDB Delete method to increment metrics counter.

func (*DB) Get

func (db *DB) Get(key []byte) (value []byte, err error)

Get wraps LevelDB Get method to increment metrics counter.

func (*DB) Has

func (db *DB) Has(key []byte) (yes bool, err error)

Has wraps LevelDB Has method to increment metrics counter.

func (*DB) Metrics

func (s *DB) Metrics() []prometheus.Collector

func (*DB) NewIndex

func (db *DB) NewIndex(name string, funcs IndexFuncs) (f Index, err error)

NewIndex returns a new Index instance with defined name and encoding functions. The name must be unique and will be validated on database schema for a key prefix byte.

func (*DB) NewIterator

func (db *DB) NewIterator() iterator.Iterator

NewIterator wraps LevelDB NewIterator method to increment metrics counter.

func (*DB) NewStringField

func (db *DB) NewStringField(name string) (f StringField, err error)

NewStringField returns a new instance of StringField. It validates its name and type against the database schema.

func (*DB) NewStructField

func (db *DB) NewStructField(name string) (f StructField, err error)

NewStructField returns a new StructField. It validates its name and type against the database schema.

func (*DB) NewUint64Field

func (db *DB) NewUint64Field(name string) (f Uint64Field, err error)

NewUint64Field returns a new Uint64Field. It validates its name and type against the database schema.

func (*DB) NewUint64Vector

func (db *DB) NewUint64Vector(name string) (f Uint64Vector, err error)

NewUint64Vector returns a new Uint64Vector. It validates its name and type against the database schema.

func (*DB) Put

func (db *DB) Put(key, value []byte) (err error)

Put wraps LevelDB Put method to increment metrics counter.

func (*DB) RenameIndex

func (db *DB) RenameIndex(name, newName string) (renamed bool, err error)

RenameIndex changes the schema so that an existing index name is changed while preserving its data by keeping the same internal key prefix. Renaming indexes is useful when encoding functions can be backward compatible to avoid data migrations.

func (*DB) WriteBatch

func (db *DB) WriteBatch(batch *leveldb.Batch) (err error)

WriteBatch wraps LevelDB Write method to increment metrics counter.

type Index

type Index struct {
	// contains filtered or unexported fields
}

Index represents a set of LevelDB key/value pairs that share a common prefix. It holds functions for encoding and decoding keys and values, providing transparent actions on the saved data, which include: getting a particular Item, saving a particular Item, and iterating over sorted LevelDB keys. It implements the IndexIteratorInterface interface.

func (Index) Count

func (f Index) Count() (count int, err error)

Count returns the number of items in index.

func (Index) CountFrom

func (f Index) CountFrom(start Item) (count int, err error)

CountFrom returns the number of items in index keys starting from the key encoded from the provided Item.

func (Index) Delete

func (f Index) Delete(keyFields Item) (err error)

Delete accepts Item to remove a key/value pair from the database based on its fields.

func (Index) DeleteInBatch

func (f Index) DeleteInBatch(batch *leveldb.Batch, keyFields Item) (err error)

DeleteInBatch is the same as Delete just the operation is performed on the batch instead on the database.

func (Index) Fill

func (f Index) Fill(items []Item) (err error)

Fill populates fields on provided items that are part of the encoded value by getting them based on information passed in their fields. Every item must have all fields needed for encoding the key set. The passed slice items will be changed so that they contain data from the index values. No new slice is allocated. This function uses a single leveldb snapshot.

func (Index) First

func (f Index) First(prefix []byte) (i Item, err error)

First returns the first item in the Index whose encoded key starts with the prefix. If the prefix is nil, the first element of the whole index is returned. If the Index has no elements, a leveldb.ErrNotFound error is returned.

func (Index) Get

func (f Index) Get(keyFields Item) (out Item, err error)

Get accepts key fields represented as Item to retrieve a value from the index and return maximum available information from the index represented as another Item.

func (Index) Has

func (f Index) Has(keyFields Item) (bool, error)

Has accepts key fields represented as an Item and checks whether that Item's encoded key is stored in the index.

func (Index) HasMulti

func (f Index) HasMulti(items ...Item) ([]bool, error)

HasMulti accepts multiple sets of key fields, each represented as an Item, and checks for each of them whether its encoded key is stored in the index.

func (Index) ItemKey

func (f Index) ItemKey(item Item) (key []byte, err error)

ItemKey accepts an Item and returns generated key for it.

func (Index) Iterate

func (f Index) Iterate(fn IndexIterFunc, options *IterateOptions) (err error)

Iterate iterates over the keys of the Index. If options is nil, the iteration is over all keys.

func (Index) Last

func (f Index) Last(prefix []byte) (i Item, err error)

Last returns the last item in the Index whose encoded key starts with the prefix. If the prefix is nil, the last element of the whole index is returned. If the Index has no elements, a leveldb.ErrNotFound error is returned.

func (Index) Put

func (f Index) Put(i Item) (err error)

Put accepts Item to encode information from it and save it to the database.

func (Index) PutInBatch

func (f Index) PutInBatch(batch *leveldb.Batch, i Item) (err error)

PutInBatch is the same as Put method, but it just saves the key/value pair to the batch instead directly to the database.

type IndexFuncs

type IndexFuncs struct {
	EncodeKey   func(fields Item) (key []byte, err error)
	DecodeKey   func(key []byte) (e Item, err error)
	EncodeValue func(fields Item) (value []byte, err error)
	DecodeValue func(keyFields Item, value []byte) (e Item, err error)
}

IndexFuncs structure defines functions for encoding and decoding LevelDB keys and values for a specific index.

type IndexIterFunc

type IndexIterFunc func(item Item) (stop bool, err error)

IndexIterFunc is a callback on every Item that is decoded by iterating on an Index keys. By returning a true for stop variable, iteration will stop, and by returning the error, that error will be propagated to the called iterator method on Index.

type Item

type Item struct {
	Address         []byte
	Data            []byte
	AccessTimestamp int64
	StoreTimestamp  int64
	BinID           uint64
	PinCounter      uint64 // maintains the no of time a chunk is pinned
	Tag             uint32
}

Item holds fields relevant to Infinity Chunk data and metadata. All information required for infinity storage and operations on that storage must be defined here. This structure is logically connected to infinity storage, the only part of this package that is not generalized, mostly for performance reasons.

Item is a type that is used for retrieving, storing and encoding chunk data and metadata. It is passed as an argument to Index encoding functions, get function and put function. But it is also returned with additional data from get function call and as the argument in iterator function definition.

func (Item) Merge

func (i Item) Merge(i2 Item) Item

Merge is a helper method to construct a new Item by filling up fields with default values of a particular Item with values from another one.

type IterateOptions

type IterateOptions struct {
	// StartFrom is the Item to start the iteration from.
	StartFrom *Item
	// If SkipStartFromItem is true, StartFrom item will not
	// be iterated on.
	SkipStartFromItem bool
	// Iterate over items which keys have a common prefix.
	Prefix []byte
	// Iterate over items in reverse order.
	Reverse bool
}

IterateOptions defines optional parameters for Iterate function.

type Options

type Options struct {
	BlockCacheCapacity     uint64
	WriteBufferSize        uint64
	OpenFilesLimit         uint64
	DisableSeeksCompaction bool
}

type StringField

type StringField struct {
	// contains filtered or unexported fields
}

StringField is the most simple field implementation that stores an arbitrary string under a specific LevelDB key.

func (StringField) Get

func (f StringField) Get() (val string, err error)

Get returns a string value from the database. If the value is not found, an empty string and no error are returned.

func (StringField) Put

func (f StringField) Put(val string) (err error)

Put stores a string in the database.

func (StringField) PutInBatch

func (f StringField) PutInBatch(batch *leveldb.Batch, val string)

PutInBatch stores a string in a batch that can be saved later in database.

type StructField

type StructField struct {
	// contains filtered or unexported fields
}

StructField is a helper for storing complex structures by encoding them in RLP format.

func (StructField) Get

func (f StructField) Get(val interface{}) (err error)

Get unmarshals data from the database to a provided val. If the data is not found leveldb.ErrNotFound is returned.

func (StructField) Put

func (f StructField) Put(val interface{}) (err error)

Put marshals provided val and saves it to the database.

func (StructField) PutInBatch

func (f StructField) PutInBatch(batch *leveldb.Batch, val interface{}) (err error)

PutInBatch marshals provided val and puts it into the batch.

type Uint64Field

type Uint64Field struct {
	// contains filtered or unexported fields
}

Uint64Field provides a way to have a simple counter in the database. It transparently encodes uint64 type value to bytes.

func (Uint64Field) Dec

func (f Uint64Field) Dec() (val uint64, err error)

Dec decrements a uint64 value in the database. This operation is not goroutine safe. The value is protected from wrapping around below zero.

func (Uint64Field) DecInBatch

func (f Uint64Field) DecInBatch(batch *leveldb.Batch) (val uint64, err error)

DecInBatch decrements a uint64 value in the batch, reading the current value from the database rather than from the same batch. This operation is not goroutine safe. The value is protected from wrapping around below zero.

func (Uint64Field) Get

func (f Uint64Field) Get() (val uint64, err error)

Get retrieves a uint64 value from the database. If the value is not found in the database a 0 value is returned and no error.

func (Uint64Field) Inc

func (f Uint64Field) Inc() (val uint64, err error)

Inc increments a uint64 value in the database. This operation is not goroutine safe.

func (Uint64Field) IncInBatch

func (f Uint64Field) IncInBatch(batch *leveldb.Batch) (val uint64, err error)

IncInBatch increments a uint64 value in the batch, reading the current value from the database rather than from the same batch. This operation is not goroutine safe.

func (Uint64Field) Put

func (f Uint64Field) Put(val uint64) (err error)

Put encodes a uint64 value and stores it in the database.

func (Uint64Field) PutInBatch

func (f Uint64Field) PutInBatch(batch *leveldb.Batch, val uint64)

PutInBatch stores a uint64 value in a batch that can be saved later in the database.

type Uint64Vector

type Uint64Vector struct {
	// contains filtered or unexported fields
}

Uint64Vector provides a way to have multiple counters in the database. It transparently encodes uint64 type value to bytes.

func (Uint64Vector) Dec

func (f Uint64Vector) Dec(i uint64) (val uint64, err error)

Dec decrements a uint64 value at index i in the database. This operation is not goroutine safe. The field is protected from overflow to a negative value.

func (Uint64Vector) DecInBatch

func (f Uint64Vector) DecInBatch(batch *leveldb.Batch, i uint64) (val uint64, err error)

DecInBatch decrements a uint64 value at index i in the batch, reading the current value from the database rather than from the same batch. This operation is not goroutine safe. The value is protected from wrapping around below zero.

func (Uint64Vector) Get

func (f Uint64Vector) Get(i uint64) (val uint64, err error)

Get retrieves a uint64 value at index i from the database. If the value is not found in the database a 0 value is returned and no error.

func (Uint64Vector) Inc

func (f Uint64Vector) Inc(i uint64) (val uint64, err error)

Inc increments a uint64 value in the database. This operation is not goroutine safe.

func (Uint64Vector) IncInBatch

func (f Uint64Vector) IncInBatch(batch *leveldb.Batch, i uint64) (val uint64, err error)

IncInBatch increments a uint64 value at index i in the batch, reading the current value from the database rather than from the same batch. This operation is not goroutine safe.

func (Uint64Vector) Put

func (f Uint64Vector) Put(i, val uint64) (err error)

Put encodes a uint64 value and stores it at index i in the database.

func (Uint64Vector) PutInBatch

func (f Uint64Vector) PutInBatch(batch *leveldb.Batch, i, val uint64)

PutInBatch stores a uint64 value at index i in a batch that can be saved later in the database.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL