Documentation ¶
Overview ¶
The statedb package provides a transactional in-memory database with per-table locking built on top of the go-immutable-radix library.
Because the database is built around an immutable data structure and objects may have lockless readers, stored objects MUST NOT be mutated. Instead, make a copy, modify the copy, and insert it.
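The copy-before-mutate rule can be illustrated without the library. The following is a minimal sketch in which `Endpoint`, `Clone` and `WithLabel` are hypothetical names, not part of statedb. It shows a deep copy being modified while the original, which a concurrent reader might still hold, stays untouched.

```go
package main

import "fmt"

// Endpoint is a hypothetical object type that might be stored in a table.
type Endpoint struct {
	ID     int
	Labels []string
}

// Clone returns a deep copy. The slice is copied too, so the clone shares
// no mutable memory with the original.
func (e *Endpoint) Clone() *Endpoint {
	c := *e
	c.Labels = append([]string(nil), e.Labels...)
	return &c
}

// WithLabel returns a modified copy and leaves the receiver's data untouched.
// The returned copy is what would be inserted into the table.
func WithLabel(e *Endpoint, label string) *Endpoint {
	c := e.Clone()
	c.Labels = append(c.Labels, label)
	return c
}

func main() {
	stored := &Endpoint{ID: 1, Labels: []string{"a"}}
	updated := WithLabel(stored, "b")
	fmt.Println(stored.Labels, updated.Labels) // prints "[a] [a b]"
}
```

A shallow struct copy would not be enough here, since both values would share the backing array of `Labels`.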
See pkg/statedb/example for an example of how to construct an application that uses this library.
Index ¶
- Constants
- Variables
- func Collect[Obj any](iter Iterator[Obj]) []Obj
- func NewTableCell[Obj any](tableName TableName, primaryIndexer Indexer[Obj], ...) cell.Cell
- func ProcessEach[Obj any, It Iterator[Obj]](iter It, fn func(Obj, Revision) error) (err error)
- type DB
- type DeleteTracker
- func (dt *DeleteTracker[Obj]) Close()
- func (dt *DeleteTracker[Obj]) Deleted(txn ReadTxn, minRevision Revision) Iterator[Obj]
- func (dt *DeleteTracker[Obj]) Mark(txn ReadTxn, upTo Revision)
- func (dt *DeleteTracker[Obj]) Process(txn ReadTxn, minRevision Revision, ...) (Revision, <-chan struct{}, error)
- type DualIterator
- type Index
- type IndexName
- type Indexer
- type Iterator
- type Metrics
- type Query
- type ReadTxn
- type Revision
- type Table
- type TableMeta
- type TableName
- type WriteTxn
Constants ¶
const (
	RevisionIndex          = "__revision__"
	GraveyardIndex         = "__graveyard__"
	GraveyardRevisionIndex = "__graveyard_revision__"
)
Variables ¶
var Cell = cell.Module(
	"statedb",
	"In-memory transactional database",
	cell.Provide(
		newHiveDB,
		newDumpHandler,
	),
	cell.Metric(NewMetrics),
)
This module provides an in-memory database built on top of immutable radix trees. As the database is based on an immutable data structure, the objects inserted into the database MUST NOT be mutated, but rather copied first!
For example usage, see pkg/statedb/example.
Functions ¶
Types ¶
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
DB provides an in-memory transactional database built on top of immutable radix trees. The database supports multiple tables, each with one or more user-defined indexes. Readers access the data locklessly: a single atomic pointer read obtains a snapshot. Writers acquire table-level locks on the target tables, and on write transaction commit a root lock is taken to swap in the new root with the modified tables.
As data is stored in immutable data structures any objects inserted into it MUST NOT be mutated afterwards.
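The snapshot mechanism described above can be sketched in a few lines. This is an illustration under stated assumptions, not the DB implementation: `store` and `snapshot` are hypothetical names, and a copied map stands in for the immutable radix trees. Readers do one atomic load, and writers build a new version and swap the root pointer.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// snapshot is a read-only view of the data. It is never modified after
// being published through the root pointer.
type snapshot map[string]int

// store is a toy stand-in for the database root.
type store struct {
	mu   sync.Mutex               // serializes writers
	root atomic.Pointer[snapshot] // read locklessly by readers
}

func newStore() *store {
	s := &store{}
	empty := snapshot{}
	s.root.Store(&empty)
	return s
}

// Read returns the current snapshot with a single atomic pointer load.
func (s *store) Read() snapshot { return *s.root.Load() }

// Write copies the current snapshot, applies the change to the copy, and
// atomically swaps the root. Existing readers keep their old snapshot.
func (s *store) Write(key string, value int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	old := *s.root.Load()
	next := make(snapshot, len(old)+1)
	for k, v := range old {
		next[k] = v
	}
	next[key] = value
	s.root.Store(&next)
}

func main() {
	s := newStore()
	before := s.Read()
	s.Write("foo", 1)
	after := s.Read()
	fmt.Println(len(before), after["foo"]) // prints "0 1"
}
```

Copying the whole map on every write is only for brevity. A radix tree shares unmodified subtrees between versions, so a write copies only the path to the changed node.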
DB holds the "root" tree of tables with each table holding a tree of indexes:
               root
              /    \
             ba    T(foo)
           /   \
       T(bar)  T(baz)

          T(bar).indexes
               /  \
              i    I(byRevision)
            /   \
        I(id)    I(ip)

              I(ip)
              /  \
            192  172
            /     ...
        bar(192.168.1.1)

T = tableEntry
I = indexTree
To look up:
- Create a read (or write) transaction
- Find the table from the root tree
- Find the index from the table's index tree
- Find the object from the index
To insert:
- Create write transaction against the target table
- Find the table from the root tree
- Create/reuse write transaction on primary index
- Insert/replace the object into primary index
- Create/reuse write transaction on revision index
- If old object existed, remove from revision index
- If old object existed, remove from graveyard
- Update each secondary index
- Commit transaction by committing each index to the table and then committing table to the root. Swap the root atomic pointer to new root and notify by closing channels of all modified nodes.
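The index bookkeeping in the insert steps can be sketched as follows. This is a simplified model, not the library's code: `table`, `entry` and `object` are hypothetical, plain mutable maps replace the per-index radix trees and their transactions, and the graveyard and commit steps are left out.

```go
package main

import "fmt"

// object is a hypothetical stored type with a primary key (Name) and a
// secondary key (IP).
type object struct {
	Name string
	IP   string
}

// entry pairs an object with the revision at which it was inserted.
type entry struct {
	obj object
	rev uint64
}

// table is a toy stand-in for one table's set of indexes.
type table struct {
	revision uint64
	primary  map[string]entry  // Name -> object and revision
	byRev    map[uint64]string // revision -> primary key
	byIP     map[string]string // secondary index: IP -> primary key
}

func newTable() *table {
	return &table{
		primary: map[string]entry{},
		byRev:   map[uint64]string{},
		byIP:    map[string]string{},
	}
}

// insert adds or replaces an object and keeps all indexes consistent.
func (t *table) insert(obj object) (old object, hadOld bool) {
	t.revision++ // every insert gets a new unique revision
	if prev, ok := t.primary[obj.Name]; ok {
		old, hadOld = prev.obj, true
		delete(t.byRev, prev.rev)   // remove the old revision entry
		delete(t.byIP, prev.obj.IP) // remove the stale secondary entry
	}
	t.primary[obj.Name] = entry{obj: obj, rev: t.revision}
	t.byRev[t.revision] = obj.Name
	t.byIP[obj.IP] = obj.Name
	return old, hadOld
}

func main() {
	tbl := newTable()
	tbl.insert(object{Name: "bar", IP: "192.168.1.1"})
	old, hadOld := tbl.insert(object{Name: "bar", IP: "10.0.0.1"})
	fmt.Println(old.IP, hadOld, tbl.revision, len(tbl.byRev))
}
```

Removing the old revision entry on replace is what keeps the revision index at exactly one entry per live object.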
To observe deletions:
- Create write transaction against the target table
- Create new delete tracker and add it to the table
- Commit the write transaction to update the table with the new delete tracker
- Query the graveyard by revision, starting from the revision of the write transaction at which it was created.
- For each successfully processed deletion, mark the revision to set low watermark for garbage collection.
- Periodically garbage collect the graveyard by finding the lowest revision of all delete trackers.
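The low-watermark scheme in the steps above can be modelled in isolation. In this sketch, `graveyard`, `tombstone` and all method names are hypothetical, and collecting entries at or below the lowest mark is an assumption about the exact boundary, not something the text specifies.

```go
package main

import "fmt"

// tombstone records a deleted object and the revision of its deletion.
type tombstone struct {
	Name string
	Rev  uint64
}

// graveyard is a toy model: tombstones in revision order plus one
// low watermark per delete tracker.
type graveyard struct {
	dead  []tombstone
	marks map[string]uint64
}

func newGraveyard() *graveyard {
	return &graveyard{marks: map[string]uint64{}}
}

// track registers a delete tracker created at the given revision.
func (g *graveyard) track(name string, rev uint64) { g.marks[name] = rev }

// bury records a deletion. Callers must pass increasing revisions.
func (g *graveyard) bury(name string, rev uint64) {
	g.dead = append(g.dead, tombstone{Name: name, Rev: rev})
}

// mark records that a tracker has processed deletions up to upTo.
func (g *graveyard) mark(name string, upTo uint64) { g.marks[name] = upTo }

// collect drops tombstones at or below the lowest mark of all trackers
// and returns how many were dropped.
func (g *graveyard) collect() int {
	if len(g.marks) == 0 {
		return 0
	}
	first := true
	var low uint64
	for _, m := range g.marks {
		if first || m < low {
			low, first = m, false
		}
	}
	var kept []tombstone
	for _, t := range g.dead {
		if t.Rev > low {
			kept = append(kept, t)
		}
	}
	dropped := len(g.dead) - len(kept)
	g.dead = kept
	return dropped
}

func main() {
	gy := newGraveyard()
	gy.track("a", 0)
	gy.track("b", 0)
	gy.bury("x", 3)
	gy.bury("y", 5)
	gy.bury("z", 8)
	gy.mark("a", 5) // tracker "a" is behind
	gy.mark("b", 8)
	fmt.Println(gy.collect(), len(gy.dead))
}
```

The slowest tracker decides what can be collected, which is why a tracker that never marks keeps deleted objects alive.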
func (*DB) ReadTxn ¶
ReadTxn constructs a new read transaction for performing reads against a snapshot of the database.
ReadTxn is not thread-safe!
func (*DB) WriteTxn ¶
WriteTxn constructs a new write transaction against the given set of tables. Each table is locked, which may block until the table locks are acquired. The modifications performed in the write transaction are not visible outside it until Commit() is called. To discard the changes call Abort().
WriteTxn is not thread-safe!
type DeleteTracker ¶
type DeleteTracker[Obj any] struct {
	// contains filtered or unexported fields
}
func (*DeleteTracker[Obj]) Close ¶
func (dt *DeleteTracker[Obj]) Close()
func (*DeleteTracker[Obj]) Deleted ¶
func (dt *DeleteTracker[Obj]) Deleted(txn ReadTxn, minRevision Revision) Iterator[Obj]
Deleted returns an iterator for deleted objects in this table starting from 'minRevision'. The deleted objects are not garbage-collected unless 'Mark' is called!
func (*DeleteTracker[Obj]) Mark ¶
func (dt *DeleteTracker[Obj]) Mark(txn ReadTxn, upTo Revision)
Mark the revision up to which deleted objects have been processed. This sets the low watermark for deleted object garbage collection.
func (*DeleteTracker[Obj]) Process ¶
func (dt *DeleteTracker[Obj]) Process(txn ReadTxn, minRevision Revision, processFn func(obj Obj, deleted bool, rev Revision) error) (Revision, <-chan struct{}, error)
Process is a helper to iterate updates and deletes to a table in revision order.
The 'processFn' is called for each updated or deleted object in order. If the function returns an error, iteration stops and both the revision at which processing failed and the error are returned. The caller can then retry from this revision by passing it as the 'minRevision'.
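The retry contract can be shown with a stand-alone loop. This is a sketch, not the Process implementation: `change`, `process` and `errTransient` are hypothetical, a slice replaces the table and graveyard iterators, and returning one past the last processed revision on success is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient is a hypothetical error used to simulate a failed callback.
var errTransient = errors.New("transient failure")

// change is one update or deletion, ordered by revision.
type change struct {
	Name    string
	Deleted bool
	Rev     uint64
}

// process calls fn for each change with Rev >= minRevision, in order.
// On failure it returns the failing revision so the caller can retry from it.
// On success it returns one past the last processed revision (assumption).
func process(changes []change, minRevision uint64, fn func(change) error) (uint64, error) {
	next := minRevision
	for _, c := range changes {
		if c.Rev < minRevision {
			continue
		}
		if err := fn(c); err != nil {
			return c.Rev, err
		}
		next = c.Rev + 1
	}
	return next, nil
}

func main() {
	changes := []change{{"a", false, 1}, {"b", true, 2}, {"c", false, 3}}
	failedOnce := false
	fn := func(c change) error {
		if c.Rev == 2 && !failedOnce {
			failedOnce = true
			return errTransient
		}
		fmt.Println("processed", c.Name, "deleted:", c.Deleted)
		return nil
	}
	rev, err := process(changes, 0, fn)
	fmt.Println("stopped at", rev, err)
	rev, err = process(changes, rev, fn) // retry from the failed revision
	fmt.Println("next revision", rev, err)
}
```

Because the retry starts at the failed revision, the callback sees the failed object again but never the ones that already succeeded.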
type DualIterator ¶
type DualIterator[Obj any] struct {
	// contains filtered or unexported fields
}
DualIterator allows iterating over two iterators in revision order. Meant to be used for combined iteration of LowerBound(ByRevision) and Deleted().
func NewDualIterator ¶
func NewDualIterator[Obj any](left, right Iterator[Obj]) *DualIterator[Obj]
func (*DualIterator[Obj]) Next ¶
func (it *DualIterator[Obj]) Next() (obj Obj, revision uint64, fromLeft, ok bool)
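Iterating two sources in revision order is a two-way merge. The sketch below is one way to write it and is not the package's code: `dualIter` and `sliceIter` are hypothetical, `Revision` is assumed to be an alias of uint64, and ties go to the left iterator by assumption (revisions are described as unique, so ties should not occur).

```go
package main

import "fmt"

// Revision is assumed to be an alias of uint64 for this sketch.
type Revision = uint64

// Iterator mirrors the interface shape documented for this package.
type Iterator[Obj any] interface {
	Next() (obj Obj, rev Revision, ok bool)
}

// sliceIter is a hypothetical iterator over parallel slices.
type sliceIter[Obj any] struct {
	objs []Obj
	revs []Revision
	pos  int
}

func (s *sliceIter[Obj]) Next() (obj Obj, rev Revision, ok bool) {
	if s.pos >= len(s.objs) {
		return obj, 0, false
	}
	obj, rev = s.objs[s.pos], s.revs[s.pos]
	s.pos++
	return obj, rev, true
}

// dualIter buffers the head of each iterator and yields the smaller one.
type dualIter[Obj any] struct {
	left, right Iterator[Obj]
	lObj, rObj  Obj
	lRev, rRev  Revision
	lOK, rOK    bool
}

func newDualIter[Obj any](left, right Iterator[Obj]) *dualIter[Obj] {
	it := &dualIter[Obj]{left: left, right: right}
	it.lObj, it.lRev, it.lOK = left.Next()
	it.rObj, it.rRev, it.rOK = right.Next()
	return it
}

// Next returns the object with the lowest revision from either side.
func (it *dualIter[Obj]) Next() (obj Obj, rev Revision, fromLeft, ok bool) {
	switch {
	case it.lOK && (!it.rOK || it.lRev <= it.rRev):
		obj, rev = it.lObj, it.lRev
		it.lObj, it.lRev, it.lOK = it.left.Next()
		return obj, rev, true, true
	case it.rOK:
		obj, rev = it.rObj, it.rRev
		it.rObj, it.rRev, it.rOK = it.right.Next()
		return obj, rev, false, true
	}
	return obj, 0, false, false
}

func main() {
	updates := &sliceIter[string]{objs: []string{"a", "d"}, revs: []Revision{1, 4}}
	deletes := &sliceIter[string]{objs: []string{"b", "c"}, revs: []Revision{2, 3}}
	it := newDualIter[string](updates, deletes)
	for obj, rev, fromLeft, ok := it.Next(); ok; obj, rev, fromLeft, ok = it.Next() {
		fmt.Println(obj, rev, fromLeft)
	}
}
```

The `fromLeft` result lets the caller tell an update (left) from a deletion (right) when the two inputs are a revision query and a deleted-object iterator.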
type Index ¶
type Index[Obj any, Key any] struct {
	Name       string
	FromObject func(obj Obj) index.KeySet
	FromKey    func(key Key) []byte
	Unique     bool
}
Index implements the indexing of objects (FromObject) and querying of objects from the index (FromKey).
type Indexer ¶
type Indexer[Obj any] interface {
	// contains filtered or unexported methods
}
Indexer is the "FromObject" subset of Index[Obj, Key] without the 'Key' constraint.
type Iterator ¶
type Iterator[Obj any] interface {
	// Next returns the next object and its revision if ok is true, otherwise
	// zero values to mean that the iteration has finished.
	Next() (obj Obj, rev Revision, ok bool)
}
Iterator for iterating objects returned from queries.
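An iterator of this shape is typically drained with a three-clause for loop. The following sketch shows that loop in a helper that gathers all objects into a slice. The lowercase `collect` and the `sliceIter` type are hypothetical and written for this example, and `Revision` is assumed to be an alias of uint64.

```go
package main

import "fmt"

// Revision is assumed to be an alias of uint64 for this sketch.
type Revision = uint64

// Iterator mirrors the interface shown above.
type Iterator[Obj any] interface {
	Next() (obj Obj, rev Revision, ok bool)
}

// sliceIter is a hypothetical iterator over a slice. It numbers
// revisions 1, 2, 3, ... purely for illustration.
type sliceIter[Obj any] struct {
	objs []Obj
	pos  int
}

func (s *sliceIter[Obj]) Next() (obj Obj, rev Revision, ok bool) {
	if s.pos >= len(s.objs) {
		return obj, 0, false
	}
	obj = s.objs[s.pos]
	s.pos++
	return obj, Revision(s.pos), true
}

// collect drains the iterator into a slice, discarding revisions.
func collect[Obj any](iter Iterator[Obj]) []Obj {
	var out []Obj
	for obj, _, ok := iter.Next(); ok; obj, _, ok = iter.Next() {
		out = append(out, obj)
	}
	return out
}

func main() {
	iter := &sliceIter[string]{objs: []string{"x", "y", "z"}}
	fmt.Println(collect[string](iter)) // prints "[x y z]"
}
```

The type argument is spelled out in `collect[string](iter)` because Go does not infer `Obj` from a concrete type passed as an interface parameter.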
type Metrics ¶
type Metrics struct {
	// How long a write transaction was held.
	WriteTxnDuration metric.Vec[metric.Observer]
	// How long it took to acquire a write transaction for all tables.
	WriteTxnAcquisition metric.Vec[metric.Observer]
	// How long writers were blocked while waiting to acquire a write transaction for a specific table.
	TableContention metric.Vec[metric.Gauge]
	// The amount of objects in a given table.
	TableObjectCount metric.Vec[metric.Gauge]
	// The current revision of a given table.
	TableRevision metric.Vec[metric.Gauge]
	// The amount of delete trackers for a given table.
	TableDeleteTrackerCount metric.Vec[metric.Gauge]
	// The amount of objects in the graveyard for a given table.
	TableGraveyardObjects metric.Vec[metric.Gauge]
	// The lowest revision of a given table that has been processed by the graveyard garbage collector.
	TableGraveyardLowWatermark metric.Vec[metric.Gauge]
	// The time it took to clean the graveyard for a given table.
	TableGraveyardCleaningDuration metric.Vec[metric.Observer]
}
func NewMetrics ¶
func NewMetrics() Metrics
type Query ¶
type Query[Obj any] struct {
	// contains filtered or unexported fields
}
func ByRevision ¶
ByRevision constructs a revision query. Applicable to any table.
type Table ¶
type Table[Obj any] interface {
	// TableMeta for querying table metadata that is independent of
	// 'Obj' type. Provides the database access to table's indexers.
	TableMeta

	// Revision of the table. Constant for a read transaction, but
	// increments in a write transaction on each Insert and Delete.
	Revision(ReadTxn) Revision

	// All returns an iterator for all objects in the table and a watch
	// channel that is closed when the table changes.
	All(ReadTxn) (Iterator[Obj], <-chan struct{})

	// Get returns an iterator for all objects matching the given query
	// and a watch channel that is closed if the query results are
	// invalidated by a write to the table.
	Get(ReadTxn, Query[Obj]) (Iterator[Obj], <-chan struct{})

	// First returns the first matching object for the query.
	First(ReadTxn, Query[Obj]) (obj Obj, rev Revision, found bool)

	// FirstWatch returns the first matching object and a watch channel
	// that is closed if the query is invalidated.
	FirstWatch(ReadTxn, Query[Obj]) (obj Obj, rev Revision, watch <-chan struct{}, found bool)

	// Last returns the last matching object.
	Last(ReadTxn, Query[Obj]) (obj Obj, rev Revision, found bool)

	// LastWatch returns the last matching object and a watch channel
	// that is closed if the query is invalidated.
	LastWatch(ReadTxn, Query[Obj]) (obj Obj, rev Revision, watch <-chan struct{}, found bool)

	// LowerBound returns an iterator for objects that have a key
	// greater or equal to the query. The returned watch channel is closed
	// when anything in the table changes as more fine-grained notifications
	// are not possible with a lower bound search.
	LowerBound(ReadTxn, Query[Obj]) (iter Iterator[Obj], watch <-chan struct{})

	// Insert an object into the table. Returns the object that was
	// replaced if there was one. Error may be returned if the table
	// is not locked for writing or if the write transaction has already
	// been committed or aborted.
	//
	// Each inserted or updated object will be assigned a new unique
	// revision.
	Insert(WriteTxn, Obj) (oldObj Obj, hadOld bool, err error)

	// Delete an object from the table. Returns the object that was
	// deleted if there was one. Error may be returned if the table
	// is not locked for writing or if the write transaction has already
	// been committed or aborted.
	//
	// If the table is being tracked for deletions via DeleteTracker()
	// the deleted object is inserted into a graveyard index and garbage
	// collected when all delete trackers have consumed it. Each deleted
	// object in the graveyard has unique revision allowing interleaved
	// iteration of updates and deletions (see (*DeleteTracker[Obj]).Process).
	Delete(WriteTxn, Obj) (oldObj Obj, hadOld bool, err error)

	DeleteAll(WriteTxn) error

	// DeleteTracker creates a new delete tracker for the table
	// starting from the given revision.
	DeleteTracker(txn WriteTxn, trackerName string) (*DeleteTracker[Obj], error)
}
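Several of the query methods return a watch channel that is closed when the result is invalidated. The close-to-notify pattern behind such channels can be sketched independently of the library. `watchedValue` and `isClosed` below are hypothetical names, and a single value guarded by a mutex stands in for a table.

```go
package main

import (
	"fmt"
	"sync"
)

// watchedValue holds a value and a channel that is closed on every change.
type watchedValue struct {
	mu    sync.Mutex
	value int
	watch chan struct{}
}

func newWatchedValue() *watchedValue {
	return &watchedValue{watch: make(chan struct{})}
}

// Get returns the current value and a channel that is closed when the
// value changes after this call.
func (w *watchedValue) Get() (int, <-chan struct{}) {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.value, w.watch
}

// Set stores a new value, closes the old watch channel to wake all
// waiters, and installs a fresh channel for future readers.
func (w *watchedValue) Set(v int) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.value = v
	close(w.watch)
	w.watch = make(chan struct{})
}

// isClosed reports, without blocking, whether the channel has been closed.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	w := newWatchedValue()
	_, watch := w.Get()
	fmt.Println(isClosed(watch)) // prints "false"
	w.Set(42)
	fmt.Println(isClosed(watch)) // prints "true"
	v, _ := w.Get()
	fmt.Println(v) // prints "42"
}
```

Closing a channel wakes every goroutine blocked on it, so any number of readers can wait on the same watch channel and re-query once it fires.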