Documentation ¶
Overview ¶
Package storage contains ledger storage implementation on top of BadgerDB engine.
Index ¶
- Variables
- func NullifyJetInKey(key []byte)
- type DB
- func (db *DB) AddDropSize(ctx context.Context, dropSize *jet.DropSize) error
- func (db *DB) AddJets(ctx context.Context, jetIDs ...core.RecordID) error
- func (db *DB) AddPulse(ctx context.Context, pulse core.Pulse) error
- func (db *DB) BeginTransaction(update bool) (*TransactionManager, error)
- func (db *DB) CloneJetTree(ctx context.Context, from, to core.PulseNumber) (*jet.Tree, error)
- func (db *DB) Close() error
- func (db *DB) CreateDrop(ctx context.Context, jetID core.RecordID, pulse core.PulseNumber, ...) (*jet.JetDrop, [][]byte, uint64, error)
- func (db *DB) GenesisRef() *core.RecordRef
- func (db *DB) GetActiveNodes(pulse core.PulseNumber) ([]core.Node, error)
- func (db *DB) GetActiveNodesByRole(pulse core.PulseNumber, role core.StaticRole) ([]core.Node, error)
- func (db *DB) GetAllNonEmptySyncClientJets(ctx context.Context) (map[core.RecordID][]core.PulseNumber, error)
- func (db *DB) GetAllSyncClientJets(ctx context.Context) (map[core.RecordID][]core.PulseNumber, error)
- func (db *DB) GetBadgerDB() *badger.DB
- func (db *DB) GetBlob(ctx context.Context, jetID core.RecordID, id *core.RecordID) ([]byte, error)
- func (db *DB) GetDrop(ctx context.Context, jetID core.RecordID, pulse core.PulseNumber) (*jet.JetDrop, error)
- func (db *DB) GetDropSizeHistory(ctx context.Context, jetID core.RecordID) (jet.DropSizeHistory, error)
- func (db *DB) GetHeavySyncedPulse(ctx context.Context, jetID core.RecordID) (pn core.PulseNumber, err error)
- func (db *DB) GetJetSizesHistoryDepth() int
- func (db *DB) GetJetTree(ctx context.Context, pulse core.PulseNumber) (*jet.Tree, error)
- func (db *DB) GetJets(ctx context.Context) (jet.IDSet, error)
- func (db *DB) GetLatestPulse(ctx context.Context) (*Pulse, error) deprecated
- func (db *DB) GetLocalData(ctx context.Context, pulse core.PulseNumber, key []byte) ([]byte, error)
- func (db *DB) GetObjectIndex(ctx context.Context, jetID core.RecordID, id *core.RecordID, forupdate bool) (*index.ObjectLifeline, error)
- func (db *DB) GetPreviousPulse(ctx context.Context, num core.PulseNumber) (*Pulse, error)
- func (db *DB) GetPulse(ctx context.Context, num core.PulseNumber) (*Pulse, error)
- func (db *DB) GetRecord(ctx context.Context, jetID core.RecordID, id *core.RecordID) (record.Record, error)
- func (db *DB) GetSyncClientJetPulses(ctx context.Context, jetID core.RecordID) ([]core.PulseNumber, error)
- func (db *DB) Init(ctx context.Context) error
- func (db *DB) IterateIndexIDs(ctx context.Context, jetID core.RecordID, handler func(id core.RecordID) error) error
- func (db *DB) IterateLocalData(ctx context.Context, pulse core.PulseNumber, prefix []byte, ...) error
- func (db *DB) IterateRecordsOnPulse(ctx context.Context, jetID core.RecordID, pulse core.PulseNumber, ...) error
- func (db *DB) RemoveActiveNodesUntil(pulse core.PulseNumber)
- func (db *DB) RemoveAllForJetUntilPulse(ctx context.Context, jetID core.RecordID, pn core.PulseNumber, ...) (map[string]int, error)
- func (db *DB) RemoveJetBlobsUntil(ctx context.Context, jetID core.RecordID, pn core.PulseNumber) (int, error)
- func (db *DB) RemoveJetDropsUntil(ctx context.Context, jetID core.RecordID, pn core.PulseNumber) (int, error)
- func (db *DB) RemoveJetIndexesUntil(ctx context.Context, jetID core.RecordID, pn core.PulseNumber, ...) (int, error)
- func (db *DB) RemoveJetRecordsUntil(ctx context.Context, jetID core.RecordID, pn core.PulseNumber, ...) (int, error)
- func (db *DB) RemoveObjectIndex(ctx context.Context, jetID core.RecordID, ref *core.RecordID) error
- func (db *DB) SetActiveNodes(pulse core.PulseNumber, nodes []core.Node) error
- func (db *DB) SetBlob(ctx context.Context, jetID core.RecordID, pulseNumber core.PulseNumber, ...) (*core.RecordID, error)
- func (db *DB) SetDrop(ctx context.Context, jetID core.RecordID, drop *jet.JetDrop) error
- func (db *DB) SetDropSizeHistory(ctx context.Context, jetID core.RecordID, dropSizeHistory jet.DropSizeHistory) error
- func (db *DB) SetHeavySyncedPulse(ctx context.Context, jetID core.RecordID, pulsenum core.PulseNumber) error
- func (db *DB) SetLocalData(ctx context.Context, pulse core.PulseNumber, key []byte, data []byte) error
- func (db *DB) SetMessage(ctx context.Context, jetID core.RecordID, pulseNumber core.PulseNumber, ...) error
- func (db *DB) SetObjectIndex(ctx context.Context, jetID core.RecordID, id *core.RecordID, ...) error
- func (db *DB) SetRecord(ctx context.Context, jetID core.RecordID, pulseNumber core.PulseNumber, ...) (*core.RecordID, error)
- func (db *DB) SetSyncClientJetPulses(ctx context.Context, jetID core.RecordID, pns []core.PulseNumber) error
- func (db *DB) SetTxRetiries(n int)
- func (db *DB) SplitJetTree(ctx context.Context, pulse core.PulseNumber, jetID core.RecordID) (*core.RecordID, *core.RecordID, error)
- func (db *DB) Stop(ctx context.Context) error
- func (db *DB) StoreKeyValues(ctx context.Context, kvs []core.KV) error
- func (db *DB) Update(ctx context.Context, fn func(*TransactionManager) error) error
- func (db *DB) UpdateJetTree(ctx context.Context, pulse core.PulseNumber, setActual bool, ...) error
- func (db *DB) View(ctx context.Context, fn func(*TransactionManager) error) error
- type IDLocker
- type Key
- type Node
- type Pulse
- type PulseStorage
- type RecentStorage
- func (r *RecentStorage) AddObject(ctx context.Context, id core.RecordID)
- func (r *RecentStorage) AddObjectWithTLL(ctx context.Context, id core.RecordID, ttl int)
- func (r *RecentStorage) AddPendingRequest(ctx context.Context, obj, req core.RecordID)
- func (r *RecentStorage) DecreaseTTL(ctx context.Context)
- func (r *RecentStorage) GetObjects() map[core.RecordID]int
- func (r *RecentStorage) GetRequests() map[core.RecordID]map[core.RecordID]struct{}
- func (r *RecentStorage) GetRequestsForObject(obj core.RecordID) []core.RecordID
- func (r *RecentStorage) IsRecordIDCached(obj core.RecordID) bool
- func (r *RecentStorage) RemovePendingRequest(ctx context.Context, obj, req core.RecordID)
- type RecentStorageProvider
- type ReplicaIter
- type Store
- type TransactionManager
- func (m *TransactionManager) Commit() error
- func (m *TransactionManager) Discard()
- func (m *TransactionManager) GetBlob(ctx context.Context, jetID core.RecordID, id *core.RecordID) ([]byte, error)
- func (m *TransactionManager) GetLatestPulse(ctx context.Context) (*Pulse, error)
- func (m *TransactionManager) GetObjectIndex(ctx context.Context, j core.RecordID, id *core.RecordID, forupdate bool) (*index.ObjectLifeline, error)
- func (m *TransactionManager) GetPulse(ctx context.Context, num core.PulseNumber) (*Pulse, error)
- func (m *TransactionManager) GetRecord(ctx context.Context, jetID core.RecordID, id *core.RecordID) (record.Record, error)
- func (m *TransactionManager) GetRequest(ctx context.Context, jetID core.RecordID, id *core.RecordID) (record.Request, error)
- func (m *TransactionManager) RemoveObjectIndex(ctx context.Context, j core.RecordID, ref *core.RecordID) error
- func (m *TransactionManager) SetBlob(ctx context.Context, jetID core.RecordID, pulseNumber core.PulseNumber, ...) (*core.RecordID, error)
- func (m *TransactionManager) SetObjectIndex(ctx context.Context, j core.RecordID, id *core.RecordID, ...) error
- func (m *TransactionManager) SetRecord(ctx context.Context, j core.RecordID, pulseNumber core.PulseNumber, ...) (*core.RecordID, error)
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNotFound returns if record/index not found in storage. ErrNotFound = errors.New("storage object not found") // ErrConflictRetriesOver is returned if Update transaction fails on all retry attempts. ErrConflictRetriesOver = errors.New("transaction conflict retries limit exceeded") // ErrConflict is the alias for badger.ErrConflict. ErrConflict = badger.ErrConflict // ErrOverride is returned if SetRecord tries to update existing record. ErrOverride = errors.New("records override is forbidden") // ErrClosed is returned when attempt to read or write to closed db. ErrClosed = errors.New("db is closed") )
var ErrReplicatorDone = errors.New("no more items in iterator")
ErrReplicatorDone is returned by a Replicator NextRecords method when the iteration is complete.
Functions ¶
func NullifyJetInKey ¶ added in v0.7.5
func NullifyJetInKey(key []byte)
NullifyJetInKey nullifies the jet part in record.
Types ¶
type DB ¶ added in v0.0.6
type DB struct { PlatformCryptographyScheme core.PlatformCryptographyScheme `inject:""` // contains filtered or unexported fields }
DB represents BadgerDB storage implementation.
func NewDB ¶ added in v0.0.6
NewDB returns storage.DB with BadgerDB instance initialized by opts. Creates database in provided dir or in current directory if dir parameter is empty.
func (*DB) AddDropSize ¶ added in v0.7.5
AddDropSize adds Jet drop size stats (required for split decision).
func (*DB) BeginTransaction ¶ added in v0.0.6
func (db *DB) BeginTransaction(update bool) (*TransactionManager, error)
BeginTransaction opens a new transaction. All methods called on returned transaction manager will persist changes only after success on "Commit" call.
func (*DB) CloneJetTree ¶ added in v0.7.5
CloneJetTree copies tree from one pulse to another. Use it to copy past tree into new pulse.
func (*DB) Close ¶ added in v0.0.6
Close wraps BadgerDB Close method.
From https://godoc.org/github.com/dgraph-io/badger#DB.Close: «It's crucial to call it to ensure all the pending updates make their way to disk. Calling DB.Close() multiple times is not safe and would cause panic.»
func (*DB) CreateDrop ¶ added in v0.6.0
func (db *DB) CreateDrop(ctx context.Context, jetID core.RecordID, pulse core.PulseNumber, prevHash []byte) ( *jet.JetDrop, [][]byte, uint64, error, )
CreateDrop creates and stores jet drop for given pulse number.
On success returns saved drop object, slot records, drop size.
func (*DB) GenesisRef ¶ added in v0.6.0
GenesisRef returns the genesis record reference.
Genesis record is the parent for all top-level records.
func (*DB) GetActiveNodes ¶ added in v0.6.3
GetActiveNodes returns active nodes for specified pulse.
func (*DB) GetActiveNodesByRole ¶ added in v0.7.5
func (db *DB) GetActiveNodesByRole(pulse core.PulseNumber, role core.StaticRole) ([]core.Node, error)
GetActiveNodesByRole returns active nodes for specified pulse and role.
func (*DB) GetAllNonEmptySyncClientJets ¶ added in v0.7.5
func (db *DB) GetAllNonEmptySyncClientJets(ctx context.Context) (map[core.RecordID][]core.PulseNumber, error)
GetAllNonEmptySyncClientJets returns a map of all jets that have a non-empty list of pulses to sync.
func (*DB) GetAllSyncClientJets ¶ added in v0.7.5
func (db *DB) GetAllSyncClientJets(ctx context.Context) (map[core.RecordID][]core.PulseNumber, error)
GetAllSyncClientJets returns a map of all jets processed by the node.
func (*DB) GetBadgerDB ¶ added in v0.6.1
GetBadgerDB returns badger.DB instance (for internal usage, like tests).
func (*DB) GetBlob ¶ added in v0.6.2
GetBlob returns binary value stored by record ID. TODO: switch from reference to passing blob id for consistency - @nordicdyno 6.Dec.2018
func (*DB) GetDrop ¶ added in v0.0.6
func (db *DB) GetDrop(ctx context.Context, jetID core.RecordID, pulse core.PulseNumber) (*jet.JetDrop, error)
GetDrop returns jet drop for a given pulse number and jet id.
func (*DB) GetDropSizeHistory ¶ added in v0.7.5
func (db *DB) GetDropSizeHistory(ctx context.Context, jetID core.RecordID) (jet.DropSizeHistory, error)
GetDropSizeHistory returns last drops sizes.
func (*DB) GetHeavySyncedPulse ¶ added in v0.6.3
func (db *DB) GetHeavySyncedPulse(ctx context.Context, jetID core.RecordID) (pn core.PulseNumber, err error)
GetHeavySyncedPulse returns last successfully synced pulse number on heavy node.
func (*DB) GetJetSizesHistoryDepth ¶ added in v0.7.5
GetJetSizesHistoryDepth returns max amount of drop sizes
func (*DB) GetJetTree ¶ added in v0.7.5
GetJetTree fetches tree for specified pulse.
func (*DB) GetLocalData ¶ added in v0.6.3
GetLocalData retrieves data from storage.
func (*DB) GetObjectIndex ¶ added in v0.0.6
func (db *DB) GetObjectIndex( ctx context.Context, jetID core.RecordID, id *core.RecordID, forupdate bool, ) (*index.ObjectLifeline, error)
GetObjectIndex wraps matching transaction manager method.
func (*DB) GetPreviousPulse ¶ added in v0.7.5
GetPreviousPulse returns pulse for provided pulse number.
func (*DB) GetRecord ¶ added in v0.0.6
func (db *DB) GetRecord(ctx context.Context, jetID core.RecordID, id *core.RecordID) (record.Record, error)
GetRecord wraps matching transaction manager method.
func (*DB) GetSyncClientJetPulses ¶ added in v0.7.5
func (db *DB) GetSyncClientJetPulses(ctx context.Context, jetID core.RecordID) ([]core.PulseNumber, error)
GetSyncClientJetPulses returns all jet's pulses not synced to heavy.
func (*DB) IterateIndexIDs ¶ added in v0.7.5
func (db *DB) IterateIndexIDs( ctx context.Context, jetID core.RecordID, handler func(id core.RecordID) error, ) error
IterateIndexIDs iterates over index IDs on provided Jet ID.
func (*DB) IterateLocalData ¶ added in v0.6.3
func (db *DB) IterateLocalData( ctx context.Context, pulse core.PulseNumber, prefix []byte, handler func(k, v []byte) error, ) error
IterateLocalData iterates over all records with specified prefix and calls handler with key and value of that record.
The key will be returned without prefix (e.g. the remaining slice) and value will be returned as it was saved.
func (*DB) IterateRecordsOnPulse ¶ added in v0.7.5
func (db *DB) IterateRecordsOnPulse( ctx context.Context, jetID core.RecordID, pulse core.PulseNumber, handler func(id core.RecordID, rec record.Record) error, ) error
IterateRecordsOnPulse iterates over records on provided Jet ID and Pulse.
func (*DB) RemoveActiveNodesUntil ¶ added in v0.7.5
func (db *DB) RemoveActiveNodesUntil(pulse core.PulseNumber)
RemoveActiveNodesUntil removes active nodes for all pulses less than the provided pulse.
func (*DB) RemoveAllForJetUntilPulse ¶ added in v0.7.5
func (db *DB) RemoveAllForJetUntilPulse( ctx context.Context, jetID core.RecordID, pn core.PulseNumber, recent recentstorage.RecentStorage, ) (map[string]int, error)
RemoveAllForJetUntilPulse removes all syncing on heavy records until pulse number for provided jetID. Returns removal stat and cumulative error.
func (*DB) RemoveJetBlobsUntil ¶ added in v0.7.5
func (db *DB) RemoveJetBlobsUntil(ctx context.Context, jetID core.RecordID, pn core.PulseNumber) (int, error)
RemoveJetBlobsUntil removes for provided JetID all blobs older than provided pulse number.
func (*DB) RemoveJetDropsUntil ¶ added in v0.7.5
func (db *DB) RemoveJetDropsUntil(ctx context.Context, jetID core.RecordID, pn core.PulseNumber) (int, error)
RemoveJetDropsUntil removes for provided JetID all jet drops older than provided pulse number.
func (*DB) RemoveJetIndexesUntil ¶ added in v0.7.5
func (db *DB) RemoveJetIndexesUntil(ctx context.Context, jetID core.RecordID, pn core.PulseNumber, recent recentstorage.RecentStorage) (int, error)
RemoveJetIndexesUntil removes for provided JetID all lifelines older than provided pulse number. Indexes cached by recent storage should not be deleted.
func (*DB) RemoveJetRecordsUntil ¶ added in v0.7.5
func (db *DB) RemoveJetRecordsUntil(ctx context.Context, jetID core.RecordID, pn core.PulseNumber, recent recentstorage.RecentStorage) (int, error)
RemoveJetRecordsUntil removes for provided JetID all records older than provided pulse number. Pending requests live in records, so we need recent storage here.
func (*DB) RemoveObjectIndex ¶ added in v0.7.5
func (db *DB) RemoveObjectIndex( ctx context.Context, jetID core.RecordID, ref *core.RecordID, ) error
RemoveObjectIndex removes an index of an object
func (*DB) SetActiveNodes ¶ added in v0.6.3
SetActiveNodes saves active nodes for pulse in memory.
func (*DB) SetBlob ¶ added in v0.6.2
func (db *DB) SetBlob(ctx context.Context, jetID core.RecordID, pulseNumber core.PulseNumber, blob []byte) (*core.RecordID, error)
SetBlob saves binary value for provided pulse.
func (*DB) SetDropSizeHistory ¶ added in v0.7.5
func (db *DB) SetDropSizeHistory(ctx context.Context, jetID core.RecordID, dropSizeHistory jet.DropSizeHistory) error
SetDropSizeHistory saves drop sizes history.
func (*DB) SetHeavySyncedPulse ¶ added in v0.6.3
func (db *DB) SetHeavySyncedPulse(ctx context.Context, jetID core.RecordID, pulsenum core.PulseNumber) error
SetHeavySyncedPulse saves last successfully synced pulse number on heavy node.
func (*DB) SetLocalData ¶ added in v0.6.3
func (db *DB) SetLocalData(ctx context.Context, pulse core.PulseNumber, key []byte, data []byte) error
SetLocalData saves provided data to storage.
func (*DB) SetMessage ¶ added in v0.6.2
func (db *DB) SetMessage(ctx context.Context, jetID core.RecordID, pulseNumber core.PulseNumber, genericMessage core.Message) error
SetMessage persists message to the database
func (*DB) SetObjectIndex ¶ added in v0.0.6
func (db *DB) SetObjectIndex( ctx context.Context, jetID core.RecordID, id *core.RecordID, idx *index.ObjectLifeline, ) error
SetObjectIndex wraps matching transaction manager method.
func (*DB) SetRecord ¶ added in v0.0.6
func (db *DB) SetRecord(ctx context.Context, jetID core.RecordID, pulseNumber core.PulseNumber, rec record.Record) (*core.RecordID, error)
SetRecord wraps matching transaction manager method.
func (*DB) SetSyncClientJetPulses ¶ added in v0.7.5
func (db *DB) SetSyncClientJetPulses(ctx context.Context, jetID core.RecordID, pns []core.PulseNumber) error
SetSyncClientJetPulses saves all jet's pulses not synced to heavy.
func (*DB) SetTxRetiries ¶ added in v0.0.6
SetTxRetiries sets number of retries on conflict in Update
func (*DB) SplitJetTree ¶ added in v0.7.5
func (db *DB) SplitJetTree( ctx context.Context, pulse core.PulseNumber, jetID core.RecordID, ) (*core.RecordID, *core.RecordID, error)
SplitJetTree performs jet split and returns resulting jet ids.
func (*DB) StoreKeyValues ¶ added in v0.6.3
StoreKeyValues stores provided key/value pairs.
func (*DB) Update ¶ added in v0.0.6
Update accepts transaction function and commits changes. All calls to received transaction manager will be consistent and written to disk or an error will be returned.
type IDLocker ¶ added in v0.6.0
type IDLocker struct {
// contains filtered or unexported fields
}
IDLocker provides Lock/Unlock methods per record ID.
TODO: for further optimization we could use sync.Pool for mutexes.
func NewIDLocker ¶ added in v0.6.0
func NewIDLocker() *IDLocker
NewIDLocker creates new initialized IDLocker.
type Key ¶ added in v0.7.5
type Key []byte
Key type for wrapping storage binary key.
func (Key) PulseNumber ¶ added in v0.7.5
func (b Key) PulseNumber() core.PulseNumber
PulseNumber returns pulse number for provided storage binary key.
type Node ¶ added in v0.7.5
type Node struct { FID core.RecordRef FRole core.StaticRole }
func (Node) GetGlobuleID ¶ added in v0.7.5
func (Node) PhysicalAddress ¶ added in v0.7.5
func (Node) Role ¶ added in v0.7.5
func (n Node) Role() core.StaticRole
func (Node) ShortID ¶ added in v0.7.5
func (Node) ShortID() core.ShortNodeID
type Pulse ¶ added in v0.6.3
type Pulse struct { Prev *core.PulseNumber Next *core.PulseNumber SerialNumber int Pulse core.Pulse }
Pulse is a record containing pulse info.
type PulseStorage ¶ added in v0.7.5
type PulseStorage struct {
// contains filtered or unexported fields
}
PulseStorage implements core.PulseStorage
func NewPulseStorage ¶ added in v0.7.5
func NewPulseStorage(db *DB) *PulseStorage
NewPulseStorage creates new pulse storage
func (*PulseStorage) Lock ¶ added in v0.7.5
func (ps *PulseStorage) Lock()
Lock takes lock on parent's pulse storage
func (*PulseStorage) Set ¶ added in v0.7.5
func (ps *PulseStorage) Set(pulse *core.Pulse)
func (*PulseStorage) Unlock ¶ added in v0.7.5
func (ps *PulseStorage) Unlock()
Unlock releases the lock on parent's pulse storage
type RecentStorage ¶ added in v0.6.3
type RecentStorage struct { DefaultTTL int // contains filtered or unexported fields }
RecentStorage is a base structure
func NewRecentStorage ¶ added in v0.6.3
func NewRecentStorage(jetID core.RecordID, defaultTTL int) *RecentStorage
NewRecentStorage creates default RecentStorage object
func (*RecentStorage) AddObject ¶ added in v0.6.3
func (r *RecentStorage) AddObject(ctx context.Context, id core.RecordID)
AddObject adds object to cache
func (*RecentStorage) AddObjectWithTLL ¶ added in v0.7.5
AddObjectWithTLL adds object with specified TTL to the cache
func (*RecentStorage) AddPendingRequest ¶ added in v0.6.3
func (r *RecentStorage) AddPendingRequest(ctx context.Context, obj, req core.RecordID)
AddPendingRequest adds request to cache.
func (*RecentStorage) DecreaseTTL ¶ added in v0.7.5
func (r *RecentStorage) DecreaseTTL(ctx context.Context)
DecreaseTTL decreases ttl and clears objects if their ttl is zero
func (*RecentStorage) GetObjects ¶ added in v0.6.3
func (r *RecentStorage) GetObjects() map[core.RecordID]int
GetObjects returns object hot-indexes.
func (*RecentStorage) GetRequests ¶ added in v0.6.3
func (r *RecentStorage) GetRequests() map[core.RecordID]map[core.RecordID]struct{}
GetRequests returns request hot-indexes.
func (*RecentStorage) GetRequestsForObject ¶ added in v0.7.5
func (r *RecentStorage) GetRequestsForObject(obj core.RecordID) []core.RecordID
GetRequestsForObject returns request hot-indexes for object.
func (*RecentStorage) IsRecordIDCached ¶ added in v0.7.5
func (r *RecentStorage) IsRecordIDCached(obj core.RecordID) bool
IsRecordIDCached checks recordID inside caches
func (*RecentStorage) RemovePendingRequest ¶ added in v0.6.3
func (r *RecentStorage) RemovePendingRequest(ctx context.Context, obj, req core.RecordID)
RemovePendingRequest removes request from cache.
type RecentStorageProvider ¶ added in v0.7.5
type RecentStorageProvider struct { DefaultTTL int // contains filtered or unexported fields }
RecentStorageProvider provides a recent storage for jet
func NewRecentStorageProvider ¶ added in v0.7.5
func NewRecentStorageProvider(defaultTTL int) *RecentStorageProvider
NewRecentStorageProvider creates new provider
func (*RecentStorageProvider) CloneStorage ¶ added in v0.7.5
func (p *RecentStorageProvider) CloneStorage(ctx context.Context, fromJetID, toJetID core.RecordID)
CloneStorage clones a recent storage from one jet to another
func (*RecentStorageProvider) GetStorage ¶ added in v0.7.5
func (p *RecentStorageProvider) GetStorage(ctx context.Context, jetID core.RecordID) recentstorage.RecentStorage
GetStorage returns a recent storage for jet
func (*RecentStorageProvider) RemoveStorage ¶ added in v0.7.5
func (p *RecentStorageProvider) RemoveStorage(ctx context.Context, id core.RecordID)
RemoveStorage removes storage from provider
type ReplicaIter ¶ added in v0.6.3
type ReplicaIter struct {
// contains filtered or unexported fields
}
ReplicaIter provides partial iterator over BadgerDB key/value pairs required for replication to Heavy Material node in provided pulses range.
"Required KV pairs" are all keys with namespace 'scopeIDRecord' (TODO: 'add scopeIDBlob') in provided pulses range and all indexes from zero pulse to the end of provided range.
"Partial" means it fetches data in chunks of the specified size. After a chunk has been fetched, an iterator saves current position.
NOTE: This is not an "honest" algorithm, because the last record size can exceed the limit. Better implementation is for the future work.
func NewReplicaIter ¶ added in v0.6.3
func NewReplicaIter( ctx context.Context, db *DB, jetID core.RecordID, start core.PulseNumber, end core.PulseNumber, limit int, ) *ReplicaIter
NewReplicaIter creates ReplicaIter that iterates over records on jet, required for heavy material replication.
Params 'start' and 'end' define pulses from which scan should happen, and on which it should be stopped, but index scans are always started from core.FirstPulseNumber.
Param 'limit' sets per message limit.
func (*ReplicaIter) LastSeenPulse ¶ added in v0.7.5
func (r *ReplicaIter) LastSeenPulse() core.PulseNumber
LastSeenPulse returns maximum pulse number of returned keys after each fetch.
func (*ReplicaIter) NextRecords ¶ added in v0.6.3
func (r *ReplicaIter) NextRecords() ([]core.KV, error)
NextRecords fetches next part of key value pairs.
type Store ¶ added in v0.0.5
type Store interface { GetRecord(ctx context.Context, jetID core.RecordID, ref *core.RecordID) (record.Record, error) SetRecord(ctx context.Context, jetID core.RecordID, pulseNumber core.PulseNumber, rec record.Record) (*core.RecordID, error) GetBlob(ctx context.Context, jetID core.RecordID, ref *core.RecordID) ([]byte, error) SetBlob(ctx context.Context, jetID core.RecordID, number core.PulseNumber, blob []byte) (*core.RecordID, error) GetObjectIndex(ctx context.Context, jetID core.RecordID, ref *core.RecordID, forupdate bool) (*index.ObjectLifeline, error) SetObjectIndex(ctx context.Context, jetID core.RecordID, ref *core.RecordID, idx *index.ObjectLifeline) error RemoveObjectIndex(ctx context.Context, jetID core.RecordID, ref *core.RecordID) error // Deprecated: use core.PulseStorage.Current() instead GetLatestPulse(ctx context.Context) (*Pulse, error) GetPulse(ctx context.Context, num core.PulseNumber) (*Pulse, error) }
Store is used by context unaware clients who can work inside transactions as well as outside.
type TransactionManager ¶ added in v0.0.6
type TransactionManager struct {
// contains filtered or unexported fields
}
TransactionManager is used to ensure persistent writes to disk.
func (*TransactionManager) Commit ¶ added in v0.0.6
func (m *TransactionManager) Commit() error
Commit tries to write transaction on disk. Returns error on fail.
func (*TransactionManager) Discard ¶ added in v0.0.6
func (m *TransactionManager) Discard()
Discard terminates transaction without disk writes.
func (*TransactionManager) GetBlob ¶ added in v0.6.2
func (m *TransactionManager) GetBlob(ctx context.Context, jetID core.RecordID, id *core.RecordID) ([]byte, error)
GetBlob returns binary value stored by record ID.
func (*TransactionManager) GetLatestPulse ¶ added in v0.7.5
func (m *TransactionManager) GetLatestPulse(ctx context.Context) (*Pulse, error)
GetLatestPulse returns the latest pulse
func (*TransactionManager) GetObjectIndex ¶ added in v0.0.6
func (m *TransactionManager) GetObjectIndex( ctx context.Context, j core.RecordID, id *core.RecordID, forupdate bool, ) (*index.ObjectLifeline, error)
GetObjectIndex fetches object lifeline index.
func (*TransactionManager) GetPulse ¶ added in v0.6.3
func (m *TransactionManager) GetPulse(ctx context.Context, num core.PulseNumber) (*Pulse, error)
GetPulse returns pulse for provided pulse number.
func (*TransactionManager) GetRecord ¶ added in v0.0.6
func (m *TransactionManager) GetRecord(ctx context.Context, jetID core.RecordID, id *core.RecordID) (record.Record, error)
GetRecord returns record from BadgerDB by *record.Reference.
It returns ErrNotFound if the DB does not contain the key.
func (*TransactionManager) GetRequest ¶ added in v0.5.0
func (m *TransactionManager) GetRequest(ctx context.Context, jetID core.RecordID, id *core.RecordID) (record.Request, error)
GetRequest returns request record from BadgerDB by *record.Reference.
It returns ErrNotFound if the DB does not contain the key.
func (*TransactionManager) RemoveObjectIndex ¶ added in v0.7.5
func (m *TransactionManager) RemoveObjectIndex( ctx context.Context, j core.RecordID, ref *core.RecordID, ) error
RemoveObjectIndex removes an index of an object
func (*TransactionManager) SetBlob ¶ added in v0.6.2
func (m *TransactionManager) SetBlob(ctx context.Context, jetID core.RecordID, pulseNumber core.PulseNumber, blob []byte) (*core.RecordID, error)
SetBlob saves binary value for provided pulse.
func (*TransactionManager) SetObjectIndex ¶ added in v0.0.6
func (m *TransactionManager) SetObjectIndex( ctx context.Context, j core.RecordID, id *core.RecordID, idx *index.ObjectLifeline, ) error
SetObjectIndex stores object lifeline index.
func (*TransactionManager) SetRecord ¶ added in v0.0.6
func (m *TransactionManager) SetRecord(ctx context.Context, j core.RecordID, pulseNumber core.PulseNumber, rec record.Record) (*core.RecordID, error)
SetRecord stores record in BadgerDB and returns *record.ID of new record.
If record exists returns both *record.ID and ErrOverride error. If record not found returns nil and ErrNotFound error
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package index represents indexes and meta information for records.
|
Package index represents indexes and meta information for records. |
Package jet provides methods for working with Jet objects.
|
Package jet provides methods for working with Jet objects. |
Package record contains code and types for storage records manipulation.
|
Package record contains code and types for storage records manipulation. |
Package storagetest contains high level API tests and test utils for other modules.
|
Package storagetest contains high level API tests and test utils for other modules. |