Documentation ¶
Overview ¶
Package storage contains ledger storage implementation on top of BadgerDB engine.
Index ¶
- Variables
- type DB
- func (db *DB) AddPulse(ctx context.Context, pulse core.Pulse) error
- func (db *DB) BeginTransaction(update bool) *TransactionManager
- func (db *DB) Close() error
- func (db *DB) CreateDrop(ctx context.Context, pulse core.PulseNumber, prevHash []byte) (*jetdrop.JetDrop, [][]byte, error)
- func (db *DB) GenesisRef() *core.RecordRef
- func (db *DB) GetActiveNodes(pulse core.PulseNumber) ([]core.Node, error)
- func (db *DB) GetBadgerDB() *badger.DB
- func (db *DB) GetBlob(ctx context.Context, id *core.RecordID) ([]byte, error)
- func (db *DB) GetDrop(ctx context.Context, pulse core.PulseNumber) (*jetdrop.JetDrop, error)
- func (db *DB) GetHeavySyncedPulse(ctx context.Context) (pn core.PulseNumber, err error)
- func (db *DB) GetLastPulseAsLightMaterial(ctx context.Context) (core.PulseNumber, error)
- func (db *DB) GetLatestPulseNumber(ctx context.Context) (core.PulseNumber, error)
- func (db *DB) GetLocalData(ctx context.Context, pulse core.PulseNumber, key []byte) ([]byte, error)
- func (db *DB) GetObjectIndex(ctx context.Context, id *core.RecordID, forupdate bool) (*index.ObjectLifeline, error)
- func (db *DB) GetPulse(ctx context.Context, num core.PulseNumber) (*Pulse, error)
- func (db *DB) GetRecord(ctx context.Context, id *core.RecordID) (record.Record, error)
- func (db *DB) GetReplicatedPulse(ctx context.Context) (core.PulseNumber, error)
- func (db *DB) Init(ctx context.Context) error
- func (db *DB) IterateLocalData(ctx context.Context, pulse core.PulseNumber, prefix []byte, ...) error
- func (db *DB) IterateRecords(ctx context.Context, pulse core.PulseNumber, ...) error
- func (db *DB) SetActiveNodes(pulse core.PulseNumber, nodes []core.Node) error
- func (db *DB) SetBlob(ctx context.Context, pulseNumber core.PulseNumber, blob []byte) (*core.RecordID, error)
- func (db *DB) SetDrop(ctx context.Context, drop *jetdrop.JetDrop) error
- func (db *DB) SetHeavySyncedPulse(ctx context.Context, pulsenum core.PulseNumber) error
- func (db *DB) SetLastPulseAsLightMaterial(ctx context.Context, pulsenum core.PulseNumber) error
- func (db *DB) SetLocalData(ctx context.Context, pulse core.PulseNumber, key []byte, data []byte) error
- func (db *DB) SetMessage(ctx context.Context, pulseNumber core.PulseNumber, genericMessage core.Message) error
- func (db *DB) SetObjectIndex(ctx context.Context, id *core.RecordID, idx *index.ObjectLifeline) error
- func (db *DB) SetRecord(ctx context.Context, pulseNumber core.PulseNumber, rec record.Record) (*core.RecordID, error)
- func (db *DB) SetReplicatedPulse(ctx context.Context, pulsenum core.PulseNumber) error
- func (db *DB) SetTxRetiries(n int)
- func (db *DB) Stop(ctx context.Context) error
- func (db *DB) StoreKeyValues(ctx context.Context, kvs []core.KV) error
- func (db *DB) Update(ctx context.Context, fn func(*TransactionManager) error) error
- func (db *DB) View(ctx context.Context, fn func(*TransactionManager) error) error
- type IDLocker
- type Pulse
- type RecentObjectsIndexMeta
- type RecentStorage
- func (r *RecentStorage) AddObject(id core.RecordID)
- func (r *RecentStorage) AddPendingRequest(id core.RecordID)
- func (r *RecentStorage) ClearObjects()
- func (r *RecentStorage) ClearZeroTTLObjects()
- func (r *RecentStorage) GetObjects() map[core.RecordID]*RecentObjectsIndexMeta
- func (r *RecentStorage) GetRequests() []core.RecordID
- func (r *RecentStorage) RemovePendingRequest(id core.RecordID)
- type ReplicaIter
- type Store
- type TransactionManager
- func (m *TransactionManager) Commit() error
- func (m *TransactionManager) Discard()
- func (m *TransactionManager) GetBlob(ctx context.Context, id *core.RecordID) ([]byte, error)
- func (m *TransactionManager) GetLatestPulseNumber(ctx context.Context) (core.PulseNumber, error)
- func (m *TransactionManager) GetObjectIndex(ctx context.Context, id *core.RecordID, forupdate bool) (*index.ObjectLifeline, error)
- func (m *TransactionManager) GetPulse(ctx context.Context, num core.PulseNumber) (*Pulse, error)
- func (m *TransactionManager) GetRecord(ctx context.Context, id *core.RecordID) (record.Record, error)
- func (m *TransactionManager) GetRequest(ctx context.Context, id *core.RecordID) (record.Request, error)
- func (m *TransactionManager) SetBlob(ctx context.Context, pulseNumber core.PulseNumber, blob []byte) (*core.RecordID, error)
- func (m *TransactionManager) SetObjectIndex(ctx context.Context, id *core.RecordID, idx *index.ObjectLifeline) error
- func (m *TransactionManager) SetRecord(ctx context.Context, pulseNumber core.PulseNumber, rec record.Record) (*core.RecordID, error)
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNotFound returns if record/index not found in storage. ErrNotFound = errors.New("storage object not found") // ErrConflictRetriesOver is returned if Update transaction fails on all retry attempts. ErrConflictRetriesOver = errors.New("transaction conflict retries limit exceeded") // ErrConflict is the alias for badger.ErrConflict. ErrConflict = badger.ErrConflict // ErrOverride is returned if SetRecord tries update existing record ErrOverride = errors.New("records override is forbidden") )
var ErrReplicatorDone = errors.New("no more items in iterator")
ErrReplicatorDone is returned by a Replicator NextRecords method when the iteration is complete.
Functions ¶
This section is empty.
Types ¶
type DB ¶ added in v0.0.6
type DB struct { PlatformCryptographyScheme core.PlatformCryptographyScheme `inject:""` // contains filtered or unexported fields }
DB represents BadgerDB storage implementation.
func NewDB ¶ added in v0.0.6
NewDB returns storage.DB with BadgerDB instance initialized by opts. Creates database in provided dir or in current directory if dir parameter is empty.
func (*DB) BeginTransaction ¶ added in v0.0.6
func (db *DB) BeginTransaction(update bool) *TransactionManager
BeginTransaction opens a new transaction. All methods called on returned transaction manager will persist changes only after success on "Commit" call.
func (*DB) Close ¶ added in v0.0.6
Close wraps BadgerDB Close method.
From https://godoc.org/github.com/dgraph-io/badger#DB.Close: «It's crucial to call it to ensure all the pending updates make their way to disk. Calling DB.Close() multiple times is not safe and would cause panic.»
func (*DB) CreateDrop ¶ added in v0.6.0
func (db *DB) CreateDrop(ctx context.Context, pulse core.PulseNumber, prevHash []byte) ( *jetdrop.JetDrop, [][]byte, error, )
CreateDrop creates and stores jet drop for given pulse number.
Previous JetDrop hash should be provided. On success returns saved drop and slot records.
func (*DB) GenesisRef ¶ added in v0.6.0
GenesisRef returns the genesis record reference.
Genesis record is the parent for all top-level records.
func (*DB) GetActiveNodes ¶ added in v0.6.3
GetActiveNodes returns active nodes for specified pulse.
func (*DB) GetBadgerDB ¶ added in v0.6.1
GetBadgerDB returns badger.DB instance (for internal usage, like tests).
func (*DB) GetHeavySyncedPulse ¶ added in v0.6.3
GetHeavySyncedPulse returns last successfully synced pulse number on heavy node.
func (*DB) GetLastPulseAsLightMaterial ¶ added in v0.6.3
GetLastPulseAsLightMaterial returns last pulse when node had a 'light material' role.
func (*DB) GetLatestPulseNumber ¶ added in v0.6.0
GetLatestPulseNumber returns current pulse number.
func (*DB) GetLocalData ¶ added in v0.6.3
GetLocalData retrieves data from storage.
func (*DB) GetObjectIndex ¶ added in v0.0.6
func (db *DB) GetObjectIndex( ctx context.Context, id *core.RecordID, forupdate bool, ) (*index.ObjectLifeline, error)
GetObjectIndex wraps matching transaction manager method.
func (*DB) GetReplicatedPulse ¶ added in v0.6.3
GetReplicatedPulse returns last pulse successfully replicated to 'heavy material' node.
func (*DB) IterateLocalData ¶ added in v0.6.3
func (db *DB) IterateLocalData( ctx context.Context, pulse core.PulseNumber, prefix []byte, handler func(k, v []byte) error, ) error
IterateLocalData iterates over all records with specified prefix and calls handler with key and value of each record.
The key will be returned without prefix (e.g. the remaining slice) and value will be returned as it was saved.
func (*DB) IterateRecords ¶ added in v0.6.3
func (db *DB) IterateRecords( ctx context.Context, pulse core.PulseNumber, handler func(id core.RecordID, rec record.Record) error, ) error
IterateRecords iterates over records.
func (*DB) SetActiveNodes ¶ added in v0.6.3
SetActiveNodes saves active nodes for pulse in memory.
func (*DB) SetBlob ¶ added in v0.6.2
func (db *DB) SetBlob(ctx context.Context, pulseNumber core.PulseNumber, blob []byte) (*core.RecordID, error)
SetBlob saves binary value for provided pulse.
func (*DB) SetHeavySyncedPulse ¶ added in v0.6.3
SetHeavySyncedPulse saves last successfully synced pulse number on heavy node.
func (*DB) SetLastPulseAsLightMaterial ¶ added in v0.6.3
SetLastPulseAsLightMaterial saves last pulse when node had a 'light material' role.
func (*DB) SetLocalData ¶ added in v0.6.3
func (db *DB) SetLocalData(ctx context.Context, pulse core.PulseNumber, key []byte, data []byte) error
SetLocalData saves provided data to storage.
func (*DB) SetMessage ¶ added in v0.6.2
func (db *DB) SetMessage(ctx context.Context, pulseNumber core.PulseNumber, genericMessage core.Message) error
SetMessage persists message to the database
func (*DB) SetObjectIndex ¶ added in v0.0.6
func (db *DB) SetObjectIndex( ctx context.Context, id *core.RecordID, idx *index.ObjectLifeline, ) error
SetObjectIndex wraps matching transaction manager method.
func (*DB) SetRecord ¶ added in v0.0.6
func (db *DB) SetRecord(ctx context.Context, pulseNumber core.PulseNumber, rec record.Record) (*core.RecordID, error)
SetRecord wraps matching transaction manager method.
func (*DB) SetReplicatedPulse ¶ added in v0.6.3
SetReplicatedPulse saves last pulse successfully replicated to 'heavy material' node.
func (*DB) SetTxRetiries ¶ added in v0.0.6
SetTxRetiries sets number of retries on conflict in Update
func (*DB) StoreKeyValues ¶ added in v0.6.3
StoreKeyValues stores provided key/value pairs.
type IDLocker ¶ added in v0.6.0
type IDLocker struct {
// contains filtered or unexported fields
}
IDLocker provides Lock/Unlock methods per record ID.
TODO: for further optimization we could use sync.Pool for mutexes.
func NewIDLocker ¶ added in v0.6.0
func NewIDLocker() *IDLocker
NewIDLocker creates new initialized IDLocker.
type Pulse ¶ added in v0.6.3
type Pulse struct { Prev *core.PulseNumber Next *core.PulseNumber Pulse core.Pulse }
Pulse is a record containing pulse info.
type RecentObjectsIndexMeta ¶ added in v0.6.3
type RecentObjectsIndexMeta struct {
TTL int
}
RecentObjectsIndexMeta contains meta about indexes
type RecentStorage ¶ added in v0.6.3
type RecentStorage struct { DefaultTTL int // contains filtered or unexported fields }
RecentStorage is a base structure
func NewRecentStorage ¶ added in v0.6.3
func NewRecentStorage(defaultTTL int) *RecentStorage
NewRecentStorage creates default RecentStorage object
func (*RecentStorage) AddObject ¶ added in v0.6.3
func (r *RecentStorage) AddObject(id core.RecordID)
AddObject adds object to cache
func (*RecentStorage) AddPendingRequest ¶ added in v0.6.3
func (r *RecentStorage) AddPendingRequest(id core.RecordID)
AddPendingRequest adds request to cache.
func (*RecentStorage) ClearObjects ¶ added in v0.6.3
func (r *RecentStorage) ClearObjects()
ClearObjects clears the whole cache
func (*RecentStorage) ClearZeroTTLObjects ¶ added in v0.6.3
func (r *RecentStorage) ClearZeroTTLObjects()
ClearZeroTTLObjects clears objects with zero TTL
func (*RecentStorage) GetObjects ¶ added in v0.6.3
func (r *RecentStorage) GetObjects() map[core.RecordID]*RecentObjectsIndexMeta
GetObjects returns object hot-indexes.
func (*RecentStorage) GetRequests ¶ added in v0.6.3
func (r *RecentStorage) GetRequests() []core.RecordID
GetRequests returns request hot-indexes.
func (*RecentStorage) RemovePendingRequest ¶ added in v0.6.3
func (r *RecentStorage) RemovePendingRequest(id core.RecordID)
RemovePendingRequest removes request from cache.
type ReplicaIter ¶ added in v0.6.3
type ReplicaIter struct {
// contains filtered or unexported fields
}
ReplicaIter provides partial iterator over BadgerDB key/value pairs required for replication to Heavy Material node in provided pulses range.
"Required KV pairs" are all keys with namespace 'scopeIDRecord' (TODO: 'add scopeIDBlob') in provided pulses range and all indexes from zero pulse to the end of provided range.
"Partial" means it fetches data in chunks of the specified size. After a chunk has been fetched, an iterator saves current position.
NOTE: This is not an "honest" algorithm, because the last record size can exceed the limit. Better implementation is for the future work.
func NewReplicaIter ¶ added in v0.6.3
func NewReplicaIter( ctx context.Context, db *DB, start core.PulseNumber, end core.PulseNumber, limit int, ) *ReplicaIter
NewReplicaIter creates ReplicaIter that iterates over records required for heavy material replication.
Params 'start' and 'end' define the pulses from which the scan should start and on which it should stop, but index scans always start from core.FirstPulseNumber.
Param 'limit' sets per message limit.
func (*ReplicaIter) LastPulse ¶ added in v0.6.3
func (r *ReplicaIter) LastPulse() core.PulseNumber
LastPulse returns maximum pulse number of returned keys after each fetch.
func (*ReplicaIter) NextRecords ¶ added in v0.6.3
func (r *ReplicaIter) NextRecords() ([]core.KV, error)
NextRecords fetches next part of key value pairs.
type Store ¶ added in v0.0.5
type Store interface { GetRecord(ctx context.Context, ref *core.RecordID) (record.Record, error) SetRecord(ctx context.Context, pulseNumber core.PulseNumber, rec record.Record) (*core.RecordID, error) GetBlob(ctx context.Context, ref *core.RecordID) ([]byte, error) SetBlob(ctx context.Context, number core.PulseNumber, blob []byte) (*core.RecordID, error) GetObjectIndex(ctx context.Context, ref *core.RecordID, forupdate bool) (*index.ObjectLifeline, error) SetObjectIndex(ctx context.Context, ref *core.RecordID, idx *index.ObjectLifeline) error GetLatestPulseNumber(ctx context.Context) (core.PulseNumber, error) GetPulse(ctx context.Context, num core.PulseNumber) (*Pulse, error) }
Store is used by context unaware clients who can work inside transactions as well as outside.
type TransactionManager ¶ added in v0.0.6
type TransactionManager struct {
// contains filtered or unexported fields
}
TransactionManager is used to ensure persistent writes to disk.
func (*TransactionManager) Commit ¶ added in v0.0.6
func (m *TransactionManager) Commit() error
Commit tries to write transaction on disk. Returns error on fail.
func (*TransactionManager) Discard ¶ added in v0.0.6
func (m *TransactionManager) Discard()
Discard terminates transaction without disk writes.
func (*TransactionManager) GetBlob ¶ added in v0.6.2
GetBlob returns binary value stored by record ID.
func (*TransactionManager) GetLatestPulseNumber ¶ added in v0.6.0
func (m *TransactionManager) GetLatestPulseNumber(ctx context.Context) (core.PulseNumber, error)
GetLatestPulseNumber returns current pulse number.
func (*TransactionManager) GetObjectIndex ¶ added in v0.0.6
func (m *TransactionManager) GetObjectIndex( ctx context.Context, id *core.RecordID, forupdate bool, ) (*index.ObjectLifeline, error)
GetObjectIndex fetches object lifeline index.
func (*TransactionManager) GetPulse ¶ added in v0.6.3
func (m *TransactionManager) GetPulse(ctx context.Context, num core.PulseNumber) (*Pulse, error)
GetPulse returns pulse for provided pulse number.
func (*TransactionManager) GetRecord ¶ added in v0.0.6
func (m *TransactionManager) GetRecord(ctx context.Context, id *core.RecordID) (record.Record, error)
GetRecord returns record from BadgerDB by *core.RecordID.
It returns ErrNotFound if the DB does not contain the key.
func (*TransactionManager) GetRequest ¶ added in v0.5.0
func (m *TransactionManager) GetRequest(ctx context.Context, id *core.RecordID) (record.Request, error)
GetRequest returns request record from BadgerDB by *core.RecordID.
It returns ErrNotFound if the DB does not contain the key.
func (*TransactionManager) SetBlob ¶ added in v0.6.2
func (m *TransactionManager) SetBlob(ctx context.Context, pulseNumber core.PulseNumber, blob []byte) (*core.RecordID, error)
SetBlob saves binary value for provided pulse.
func (*TransactionManager) SetObjectIndex ¶ added in v0.0.6
func (m *TransactionManager) SetObjectIndex( ctx context.Context, id *core.RecordID, idx *index.ObjectLifeline, ) error
SetObjectIndex stores object lifeline index.
func (*TransactionManager) SetRecord ¶ added in v0.0.6
func (m *TransactionManager) SetRecord(ctx context.Context, pulseNumber core.PulseNumber, rec record.Record) (*core.RecordID, error)
SetRecord stores record in BadgerDB and returns *record.ID of new record.
If record exists, returns both *record.ID and ErrOverride error. If record not found, returns nil and ErrNotFound error.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package storagetest contains high level API tests and test utils for other modules.
|
Package storagetest contains high level API tests and test utils for other modules. |