storage

package
v0.6.3 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 30, 2018 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Package storage contains ledger storage implementation on top of BadgerDB engine.

Index

Constants

This section is empty.

Variables

var (
	// ErrNotFound is returned if a record or index is not found in storage.
	ErrNotFound = errors.New("storage object not found")

	// ErrConflictRetriesOver is returned if Update transaction fails on all retry attempts.
	ErrConflictRetriesOver = errors.New("transaction conflict retries limit exceeded")

	// ErrConflict is the alias for badger.ErrConflict.
	ErrConflict = badger.ErrConflict

	// ErrOverride is returned if SetRecord tries to update an existing record.
	ErrOverride = errors.New("records override is forbidden")
)
var ErrReplicatorDone = errors.New("no more items in iterator")

ErrReplicatorDone is returned by the ReplicaIter NextRecords method when the iteration is complete.

Functions

This section is empty.

Types

type DB added in v0.0.6

type DB struct {
	PlatformCryptographyScheme core.PlatformCryptographyScheme `inject:""`
	// contains filtered or unexported fields
}

DB represents BadgerDB storage implementation.

func NewDB added in v0.0.6

func NewDB(conf configuration.Ledger, opts *badger.Options) (*DB, error)

NewDB returns a storage.DB with a BadgerDB instance initialized by opts. It creates the database in the provided dir, or in the current directory if the dir parameter is empty.

func (*DB) AddPulse added in v0.6.0

func (db *DB) AddPulse(ctx context.Context, pulse core.Pulse) error

AddPulse saves new pulse data and updates index.

func (*DB) BeginTransaction added in v0.0.6

func (db *DB) BeginTransaction(update bool) *TransactionManager

BeginTransaction opens a new transaction. Methods called on the returned transaction manager persist their changes only after a successful Commit call.
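The commit-or-discard behaviour described above can be illustrated with a small in-memory model. This is only a sketch: store and tx are hypothetical stand-ins for storage.DB and storage.TransactionManager, and the buffering shown here is an assumption about the general pattern, not the package's actual implementation.

```go
package main

import "fmt"

// store and tx are hypothetical stand-ins for storage.DB and
// storage.TransactionManager; only commit/discard behaviour is modelled.
type store struct{ data map[string]string }

type tx struct {
	s       *store
	pending map[string]string
}

// begin opens a transaction whose writes are buffered in memory.
func (s *store) begin() *tx {
	return &tx{s: s, pending: map[string]string{}}
}

// set buffers a write; nothing reaches the store yet.
func (t *tx) set(k, v string) { t.pending[k] = v }

// commit applies all buffered writes to the store.
func (t *tx) commit() error {
	for k, v := range t.pending {
		t.s.data[k] = v
	}
	t.pending = map[string]string{}
	return nil
}

// discard drops buffered writes without touching the store.
func (t *tx) discard() { t.pending = map[string]string{} }

func main() {
	s := &store{data: map[string]string{}}

	t1 := s.begin()
	t1.set("a", "1")
	t1.discard() // "a" never becomes visible

	t2 := s.begin()
	t2.set("b", "2")
	if err := t2.commit(); err != nil {
		fmt.Println("commit failed:", err)
		return
	}
	fmt.Println(len(s.data), s.data["b"])
}
```

With the real package, the same shape would be BeginTransaction, a series of Set calls on the returned manager, and then either Commit or Discard.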

func (*DB) Close added in v0.0.6

func (db *DB) Close() error

Close wraps BadgerDB Close method.

From https://godoc.org/github.com/dgraph-io/badger#DB.Close: «It's crucial to call it to ensure all the pending updates make their way to disk. Calling DB.Close() multiple times is not safe and would cause panic.»
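Since a second Close call is unsafe, code that can reach shutdown from more than one path may want to guard it. The following is a minimal sketch of one way to do that with sync.Once; onceCloser and fakeDB are hypothetical names introduced for illustration and are not part of this package.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// onceCloser wraps any io.Closer so that the underlying Close runs at
// most once. It is a hypothetical helper, not part of the storage package.
type onceCloser struct {
	c    io.Closer
	once sync.Once
	err  error
}

// Close calls the wrapped Close on the first call and returns the
// remembered result on every later call.
func (o *onceCloser) Close() error {
	o.once.Do(func() { o.err = o.c.Close() })
	return o.err
}

// fakeDB stands in for *storage.DB; it panics on a second Close,
// mimicking the behaviour the BadgerDB documentation warns about.
type fakeDB struct{ closed bool }

func (f *fakeDB) Close() error {
	if f.closed {
		panic("Close called twice")
	}
	f.closed = true
	return nil
}

func main() {
	db := &onceCloser{c: &fakeDB{}}
	defer db.Close() // safe even though Close is also called below
	fmt.Println(db.Close())
}
```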

func (*DB) CreateDrop added in v0.6.0

func (db *DB) CreateDrop(ctx context.Context, pulse core.PulseNumber, prevHash []byte) (
	*jetdrop.JetDrop,
	[][]byte,
	error,
)

CreateDrop creates and stores jet drop for given pulse number.

Previous JetDrop hash should be provided. On success returns saved drop and slot records.

func (*DB) GenesisRef added in v0.6.0

func (db *DB) GenesisRef() *core.RecordRef

GenesisRef returns the genesis record reference.

Genesis record is the parent for all top-level records.

func (*DB) GetActiveNodes added in v0.6.3

func (db *DB) GetActiveNodes(pulse core.PulseNumber) ([]core.Node, error)

GetActiveNodes returns the active nodes for the specified pulse.

func (*DB) GetBadgerDB added in v0.6.1

func (db *DB) GetBadgerDB() *badger.DB

GetBadgerDB returns the badger.DB instance (for internal usage, such as tests).

func (*DB) GetBlob added in v0.6.2

func (db *DB) GetBlob(ctx context.Context, id *core.RecordID) ([]byte, error)

GetBlob returns binary value stored by record ID.

func (*DB) GetDrop added in v0.0.6

func (db *DB) GetDrop(ctx context.Context, pulse core.PulseNumber) (*jetdrop.JetDrop, error)

GetDrop returns jet drop for a given pulse number.

func (*DB) GetHeavySyncedPulse added in v0.6.3

func (db *DB) GetHeavySyncedPulse(ctx context.Context) (pn core.PulseNumber, err error)

GetHeavySyncedPulse returns the last successfully synced pulse number on the heavy node.

func (*DB) GetLastPulseAsLightMaterial added in v0.6.3

func (db *DB) GetLastPulseAsLightMaterial(ctx context.Context) (core.PulseNumber, error)

GetLastPulseAsLightMaterial returns the last pulse when the node had the 'light material' role.

func (*DB) GetLatestPulseNumber added in v0.6.0

func (db *DB) GetLatestPulseNumber(ctx context.Context) (core.PulseNumber, error)

GetLatestPulseNumber returns current pulse number.

func (*DB) GetLocalData added in v0.6.3

func (db *DB) GetLocalData(ctx context.Context, pulse core.PulseNumber, key []byte) ([]byte, error)

GetLocalData retrieves data from storage.

func (*DB) GetObjectIndex added in v0.0.6

func (db *DB) GetObjectIndex(
	ctx context.Context,
	id *core.RecordID,
	forupdate bool,
) (*index.ObjectLifeline, error)

GetObjectIndex wraps matching transaction manager method.

func (*DB) GetPulse added in v0.6.0

func (db *DB) GetPulse(ctx context.Context, num core.PulseNumber) (*Pulse, error)

GetPulse returns pulse for provided pulse number.

func (*DB) GetRecord added in v0.0.6

func (db *DB) GetRecord(ctx context.Context, id *core.RecordID) (record.Record, error)

GetRecord wraps matching transaction manager method.

func (*DB) GetReplicatedPulse added in v0.6.3

func (db *DB) GetReplicatedPulse(ctx context.Context) (core.PulseNumber, error)

GetReplicatedPulse returns the last pulse successfully replicated to the 'heavy material' node.

func (*DB) Init added in v0.6.3

func (db *DB) Init(ctx context.Context) error

Init creates initial records in storage.

func (*DB) IterateLocalData added in v0.6.3

func (db *DB) IterateLocalData(
	ctx context.Context,
	pulse core.PulseNumber,
	prefix []byte,
	handler func(k, v []byte) error,
) error

IterateLocalData iterates over all records with the specified prefix and calls handler with the key and value of each record.

The key is passed without the prefix (i.e. the remaining slice) and the value is passed as it was saved.
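The prefix handling described above can be sketched over a plain map. This is an illustration only: iteratePrefix is a hypothetical helper, the sorted iteration order is an assumption, and the pulse argument of the real method is left out.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// iteratePrefix calls handler for every entry whose key starts with
// prefix, in sorted key order. The key passed to handler has the prefix
// removed. It stops at, and returns, the first error reported by handler.
func iteratePrefix(data map[string][]byte, prefix []byte, handler func(k, v []byte) error) error {
	keys := make([]string, 0, len(data))
	for k := range data {
		if bytes.HasPrefix([]byte(k), prefix) {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)
	for _, k := range keys {
		if err := handler([]byte(k)[len(prefix):], data[k]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	data := map[string][]byte{
		"node/1": []byte("a"),
		"node/2": []byte("b"),
		"drop/1": []byte("c"),
	}
	_ = iteratePrefix(data, []byte("node/"), func(k, v []byte) error {
		fmt.Printf("%s=%s\n", k, v)
		return nil
	})
}
```

Returning a non-nil error from the handler is the usual way to stop such an iteration early.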

func (*DB) IterateRecords added in v0.6.3

func (db *DB) IterateRecords(
	ctx context.Context,
	pulse core.PulseNumber,
	handler func(id core.RecordID, rec record.Record) error,
) error

IterateRecords iterates over records.

func (*DB) SetActiveNodes added in v0.6.3

func (db *DB) SetActiveNodes(pulse core.PulseNumber, nodes []core.Node) error

SetActiveNodes saves active nodes for pulse in memory.

func (*DB) SetBlob added in v0.6.2

func (db *DB) SetBlob(ctx context.Context, pulseNumber core.PulseNumber, blob []byte) (*core.RecordID, error)

SetBlob saves binary value for provided pulse.

func (*DB) SetDrop added in v0.0.6

func (db *DB) SetDrop(ctx context.Context, drop *jetdrop.JetDrop) error

SetDrop saves provided JetDrop in db.

func (*DB) SetHeavySyncedPulse added in v0.6.3

func (db *DB) SetHeavySyncedPulse(ctx context.Context, pulsenum core.PulseNumber) error

SetHeavySyncedPulse saves the last successfully synced pulse number on the heavy node.

func (*DB) SetLastPulseAsLightMaterial added in v0.6.3

func (db *DB) SetLastPulseAsLightMaterial(ctx context.Context, pulsenum core.PulseNumber) error

SetLastPulseAsLightMaterial saves the last pulse when the node had the 'light material' role.

func (*DB) SetLocalData added in v0.6.3

func (db *DB) SetLocalData(ctx context.Context, pulse core.PulseNumber, key []byte, data []byte) error

SetLocalData saves provided data to storage.

func (*DB) SetMessage added in v0.6.2

func (db *DB) SetMessage(ctx context.Context, pulseNumber core.PulseNumber, genericMessage core.Message) error

SetMessage persists message to the database

func (*DB) SetObjectIndex added in v0.0.6

func (db *DB) SetObjectIndex(
	ctx context.Context,
	id *core.RecordID,
	idx *index.ObjectLifeline,
) error

SetObjectIndex wraps matching transaction manager method.

func (*DB) SetRecord added in v0.0.6

func (db *DB) SetRecord(ctx context.Context, pulseNumber core.PulseNumber, rec record.Record) (*core.RecordID, error)

SetRecord wraps matching transaction manager method.

func (*DB) SetReplicatedPulse added in v0.6.3

func (db *DB) SetReplicatedPulse(ctx context.Context, pulsenum core.PulseNumber) error

SetReplicatedPulse saves the last pulse successfully replicated to the 'heavy material' node.

func (*DB) SetTxRetiries added in v0.0.6

func (db *DB) SetTxRetiries(n int)

SetTxRetiries sets the number of retries on conflict in Update.

func (*DB) Stop added in v0.6.3

func (db *DB) Stop(ctx context.Context) error

Stop stops DB component.

func (*DB) StoreKeyValues added in v0.6.3

func (db *DB) StoreKeyValues(ctx context.Context, kvs []core.KV) error

StoreKeyValues stores provided key/value pairs.

func (*DB) Update added in v0.0.6

func (db *DB) Update(ctx context.Context, fn func(*TransactionManager) error) error

Update accepts a transaction function and commits its changes. All calls to the received transaction manager will be consistent and written to disk, or an error will be returned.
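Together with SetTxRetiries, ErrConflict and ErrConflictRetriesOver, this suggests a retry-on-conflict loop. The sketch below shows one plausible shape of such a loop; updateWithRetries and the lower-case error values are hypothetical stand-ins, and the exact attempt counting inside the real Update is not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for storage.ErrConflict and storage.ErrConflictRetriesOver.
var (
	errConflict            = errors.New("transaction conflict")
	errConflictRetriesOver = errors.New("transaction conflict retries limit exceeded")
)

// updateWithRetries runs fn and repeats it while it reports a conflict,
// for up to retries additional attempts. Any other error is returned as is.
func updateWithRetries(retries int, fn func() error) error {
	for attempt := 0; attempt <= retries; attempt++ {
		err := fn()
		if err == nil {
			return nil
		}
		if !errors.Is(err, errConflict) {
			return err
		}
	}
	return errConflictRetriesOver
}

func main() {
	calls := 0
	err := updateWithRetries(3, func() error {
		calls++
		if calls < 3 {
			return errConflict // the first two attempts conflict
		}
		return nil
	})
	fmt.Println(calls, err)
}
```

Because the transaction function may run more than once, it should avoid side effects outside the transaction.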

func (*DB) View added in v0.0.6

func (db *DB) View(ctx context.Context, fn func(*TransactionManager) error) error

View accepts transaction function. All calls to received transaction manager will be consistent.

type IDLocker added in v0.6.0

type IDLocker struct {
	// contains filtered or unexported fields
}

IDLocker provides Lock/Unlock methods per record ID.

TODO: for further optimization we could use sync.Pool for mutexes.

func NewIDLocker added in v0.6.0

func NewIDLocker() *IDLocker

NewIDLocker creates new initialized IDLocker.

func (*IDLocker) Lock added in v0.6.0

func (l *IDLocker) Lock(id *core.RecordID)

Lock locks the mutex belonging to the record ID. If the mutex does not exist, it is created in a concurrency-safe fashion.

func (*IDLocker) Unlock added in v0.6.0

func (l *IDLocker) Unlock(id *core.RecordID)

Unlock unlocks the mutex belonging to the record ID.
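A per-ID locker of this kind is commonly built from a map of mutexes guarded by one outer mutex. The following is a minimal sketch under that assumption; idLocker here is a hypothetical type keyed by string, whereas the real IDLocker is keyed by *core.RecordID and its internals are unexported.

```go
package main

import (
	"fmt"
	"sync"
)

// idLocker is a hypothetical per-key locker used for illustration.
type idLocker struct {
	mu    sync.Mutex             // guards the map itself
	locks map[string]*sync.Mutex // one mutex per ID, created on demand
}

func newIDLocker() *idLocker {
	return &idLocker{locks: make(map[string]*sync.Mutex)}
}

// Lock locks the mutex for id, creating it first if needed.
func (l *idLocker) Lock(id string) {
	l.mu.Lock()
	m, ok := l.locks[id]
	if !ok {
		m = &sync.Mutex{}
		l.locks[id] = m
	}
	l.mu.Unlock() // release the map guard before blocking on the per-ID mutex
	m.Lock()
}

// Unlock unlocks the mutex for id. It does nothing for an unknown id.
func (l *idLocker) Unlock(id string) {
	l.mu.Lock()
	m, ok := l.locks[id]
	l.mu.Unlock()
	if ok {
		m.Unlock()
	}
}

func main() {
	l := newIDLocker()
	counter := 0
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.Lock("object-1")
			counter++ // protected by the per-ID lock
			l.Unlock("object-1")
		}()
	}
	wg.Wait()
	fmt.Println(counter)
}
```

Releasing the outer mutex before taking the per-ID one keeps a long-held lock on one ID from blocking callers that work with other IDs.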

type Pulse added in v0.6.3

type Pulse struct {
	Prev  *core.PulseNumber
	Next  *core.PulseNumber
	Pulse core.Pulse
}

Pulse is a record containing pulse info.

func (*Pulse) Bytes added in v0.6.3

func (p *Pulse) Bytes() []byte

Bytes serializes pulse.

type RecentObjectsIndexMeta added in v0.6.3

type RecentObjectsIndexMeta struct {
	TTL int
}

RecentObjectsIndexMeta contains metadata about indexes.

type RecentStorage added in v0.6.3

type RecentStorage struct {
	DefaultTTL int
	// contains filtered or unexported fields
}

RecentStorage is a base structure

func NewRecentStorage added in v0.6.3

func NewRecentStorage(defaultTTL int) *RecentStorage

NewRecentStorage creates default RecentStorage object

func (*RecentStorage) AddObject added in v0.6.3

func (r *RecentStorage) AddObject(id core.RecordID)

AddObject adds object to cache

func (*RecentStorage) AddPendingRequest added in v0.6.3

func (r *RecentStorage) AddPendingRequest(id core.RecordID)

AddPendingRequest adds request to cache.

func (*RecentStorage) ClearObjects added in v0.6.3

func (r *RecentStorage) ClearObjects()

ClearObjects clears the whole cache

func (*RecentStorage) ClearZeroTTLObjects added in v0.6.3

func (r *RecentStorage) ClearZeroTTLObjects()

ClearZeroTTLObjects clears objects with zero TTL

func (*RecentStorage) GetObjects added in v0.6.3

func (r *RecentStorage) GetObjects() map[core.RecordID]*RecentObjectsIndexMeta

GetObjects returns object hot-indexes.

func (*RecentStorage) GetRequests added in v0.6.3

func (r *RecentStorage) GetRequests() []core.RecordID

GetRequests returns request hot-indexes.

func (*RecentStorage) RemovePendingRequest added in v0.6.3

func (r *RecentStorage) RemovePendingRequest(id core.RecordID)

RemovePendingRequest removes request from cache.

type ReplicaIter added in v0.6.3

type ReplicaIter struct {
	// contains filtered or unexported fields
}

ReplicaIter provides partial iterator over BadgerDB key/value pairs required for replication to Heavy Material node in provided pulses range.

"Required KV pairs" are all keys with namespace 'scopeIDRecord' (TODO: 'add scopeIDBlob') in provided pulses range and all indexes from zero pulse to the end of provided range.

"Partial" means it fetches data in chunks of the specified size. After a chunk has been fetched, the iterator saves its current position.

NOTE: This is not an "honest" algorithm, because the last record in a chunk can push its size past the limit. A better implementation is left for future work.
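The chunking behaviour and its caveat can be sketched with an in-memory iterator. This is an illustration under stated assumptions: chunkIter, kv and errDone are hypothetical stand-ins for ReplicaIter, core.KV and ErrReplicatorDone, and measuring size as key length plus value length is a guess at how the limit is applied.

```go
package main

import (
	"errors"
	"fmt"
)

// errDone plays the role of storage.ErrReplicatorDone in this sketch.
var errDone = errors.New("no more items in iterator")

// kv is a stand-in for core.KV.
type kv struct{ K, V []byte }

// chunkIter hands out records in chunks of roughly limit bytes and
// remembers its position between calls.
type chunkIter struct {
	items []kv
	pos   int
	limit int
}

// next returns the next chunk. Records are added until the accumulated
// size reaches the limit, so the final record of a chunk may overshoot it.
func (it *chunkIter) next() ([]kv, error) {
	if it.pos >= len(it.items) {
		return nil, errDone
	}
	var out []kv
	size := 0
	for it.pos < len(it.items) && size < it.limit {
		rec := it.items[it.pos]
		out = append(out, rec)
		size += len(rec.K) + len(rec.V)
		it.pos++
	}
	return out, nil
}

func main() {
	it := &chunkIter{
		items: []kv{
			{K: []byte("k1"), V: []byte("aaaa")},
			{K: []byte("k2"), V: []byte("bbbbbbbb")},
			{K: []byte("k3"), V: []byte("cc")},
		},
		limit: 8,
	}
	for {
		chunk, err := it.next()
		if errors.Is(err, errDone) {
			break
		}
		fmt.Println(len(chunk)) // number of records in this chunk
	}
}
```

In this run the first chunk holds two records totalling 16 bytes against a limit of 8, which is the overshoot the note refers to.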

func NewReplicaIter added in v0.6.3

func NewReplicaIter(
	ctx context.Context,
	db *DB,
	start core.PulseNumber,
	end core.PulseNumber,
	limit int,
) *ReplicaIter

NewReplicaIter creates a ReplicaIter that iterates over the records required for heavy material replication.

Params 'start' and 'end' define the pulse from which the scan starts and the pulse at which it stops, but the index scan always starts from core.FirstPulseNumber.

Param 'limit' sets the per-message limit.

func (*ReplicaIter) LastPulse added in v0.6.3

func (r *ReplicaIter) LastPulse() core.PulseNumber

LastPulse returns maximum pulse number of returned keys after each fetch.

func (*ReplicaIter) NextRecords added in v0.6.3

func (r *ReplicaIter) NextRecords() ([]core.KV, error)

NextRecords fetches the next chunk of key/value pairs.

type Store added in v0.0.5

type Store interface {
	GetRecord(ctx context.Context, ref *core.RecordID) (record.Record, error)
	SetRecord(ctx context.Context, pulseNumber core.PulseNumber, rec record.Record) (*core.RecordID, error)
	GetBlob(ctx context.Context, ref *core.RecordID) ([]byte, error)
	SetBlob(ctx context.Context, number core.PulseNumber, blob []byte) (*core.RecordID, error)
	GetObjectIndex(ctx context.Context, ref *core.RecordID, forupdate bool) (*index.ObjectLifeline, error)
	SetObjectIndex(ctx context.Context, ref *core.RecordID, idx *index.ObjectLifeline) error
	GetLatestPulseNumber(ctx context.Context) (core.PulseNumber, error)
	GetPulse(ctx context.Context, num core.PulseNumber) (*Pulse, error)
}

Store is used by context-unaware clients that can work inside transactions as well as outside.

type TransactionManager added in v0.0.6

type TransactionManager struct {
	// contains filtered or unexported fields
}

TransactionManager is used to ensure persistent writes to disk.

func (*TransactionManager) Commit added in v0.0.6

func (m *TransactionManager) Commit() error

Commit tries to write the transaction to disk. It returns an error on failure.

func (*TransactionManager) Discard added in v0.0.6

func (m *TransactionManager) Discard()

Discard terminates transaction without disk writes.

func (*TransactionManager) GetBlob added in v0.6.2

func (m *TransactionManager) GetBlob(ctx context.Context, id *core.RecordID) ([]byte, error)

GetBlob returns binary value stored by record ID.

func (*TransactionManager) GetLatestPulseNumber added in v0.6.0

func (m *TransactionManager) GetLatestPulseNumber(ctx context.Context) (core.PulseNumber, error)

GetLatestPulseNumber returns current pulse number.

func (*TransactionManager) GetObjectIndex added in v0.0.6

func (m *TransactionManager) GetObjectIndex(
	ctx context.Context,
	id *core.RecordID,
	forupdate bool,
) (*index.ObjectLifeline, error)

GetObjectIndex fetches object lifeline index.

func (*TransactionManager) GetPulse added in v0.6.3

func (m *TransactionManager) GetPulse(ctx context.Context, num core.PulseNumber) (*Pulse, error)

GetPulse returns pulse for provided pulse number.

func (*TransactionManager) GetRecord added in v0.0.6

func (m *TransactionManager) GetRecord(ctx context.Context, id *core.RecordID) (record.Record, error)

GetRecord returns a record from BadgerDB by *core.RecordID.

It returns ErrNotFound if the DB does not contain the key.
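Callers usually need to tell a missing record apart from other failures. The following minimal sketch shows that check with errors.Is; memRecords, get, isNotFound and errNotFound are hypothetical stand-ins introduced for illustration, and the error wrapping is an assumption rather than documented behaviour of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound stands in for storage.ErrNotFound.
var errNotFound = errors.New("storage object not found")

// memRecords is a hypothetical in-memory record store.
type memRecords map[string][]byte

// get returns the record for id, or an error wrapping errNotFound.
func (m memRecords) get(id string) ([]byte, error) {
	rec, ok := m[id]
	if !ok {
		return nil, fmt.Errorf("get record %q: %w", id, errNotFound)
	}
	return rec, nil
}

// isNotFound reports whether err is, or wraps, errNotFound.
func isNotFound(err error) bool { return errors.Is(err, errNotFound) }

func main() {
	m := memRecords{"r1": []byte("payload")}
	if _, err := m.get("r2"); isNotFound(err) {
		fmt.Println("missing")
	}
}
```

Using errors.Is rather than == keeps the check working if an intermediate layer wraps the sentinel error.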

func (*TransactionManager) GetRequest added in v0.5.0

func (m *TransactionManager) GetRequest(ctx context.Context, id *core.RecordID) (record.Request, error)

GetRequest returns a request record from BadgerDB by *core.RecordID.

It returns ErrNotFound if the DB does not contain the key.

func (*TransactionManager) SetBlob added in v0.6.2

func (m *TransactionManager) SetBlob(ctx context.Context, pulseNumber core.PulseNumber, blob []byte) (*core.RecordID, error)

SetBlob saves binary value for provided pulse.

func (*TransactionManager) SetObjectIndex added in v0.0.6

func (m *TransactionManager) SetObjectIndex(
	ctx context.Context,
	id *core.RecordID,
	idx *index.ObjectLifeline,
) error

SetObjectIndex stores object lifeline index.

func (*TransactionManager) SetRecord added in v0.0.6

func (m *TransactionManager) SetRecord(ctx context.Context, pulseNumber core.PulseNumber, rec record.Record) (*core.RecordID, error)

SetRecord stores a record in BadgerDB and returns the *core.RecordID of the new record.

If the record already exists, it returns both the record ID and ErrOverride. If the record is not found, it returns nil and ErrNotFound.
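The override rule can be illustrated with a small in-memory store. This is a sketch only: memStore, setRecord and errOverride are hypothetical, and deriving the ID from a SHA-256 hash of the content is an assumption made for the example, not a statement about how record IDs are computed in this package.

```go
package main

import (
	"crypto/sha256"
	"errors"
	"fmt"
)

// errOverride stands in for storage.ErrOverride.
var errOverride = errors.New("records override is forbidden")

// memStore is a hypothetical in-memory record store.
type memStore struct{ records map[[32]byte][]byte }

// setRecord stores rec under an ID derived from its content (an
// assumption of this sketch). If the ID is already present, the stored
// value is left untouched and both the ID and errOverride are returned.
func (s *memStore) setRecord(rec []byte) ([32]byte, error) {
	id := sha256.Sum256(rec)
	if _, exists := s.records[id]; exists {
		return id, errOverride
	}
	s.records[id] = append([]byte(nil), rec...) // store a private copy
	return id, nil
}

func main() {
	s := &memStore{records: map[[32]byte][]byte{}}
	id1, err1 := s.setRecord([]byte("hello"))
	id2, err2 := s.setRecord([]byte("hello"))
	fmt.Println(err1, errors.Is(err2, errOverride), id1 == id2)
}
```

Because the ID comes back alongside the error, a caller can still reference the existing record after a rejected write.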

Directories

Path Synopsis
Package storagetest contains high level API tests and test utils for other modules.