statedb

package
v1.15.0-pre.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 31, 2023 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Overview

The statedb package provides a transactional in-memory database with per-table locking built on top of the go-immutable-radix library.

As this is built around an immutable data structure and objects may have lockless readers the stored objects MUST NOT be mutated, but instead a copy must be made prior to mutation and insertion.

See pkg/statedb/example for an example how to construct an application that uses this library.

Index

Constants

View Source
const (
	RevisionIndex          = "__revision__"
	GraveyardIndex         = "__graveyard__"
	GraveyardRevisionIndex = "__graveyard_revision__"
)

Variables

View Source
var (
	// ErrDuplicateTable indicates that StateDB has been provided with two or more table definitions
	// that share the same table name.
	ErrDuplicateTable = errors.New("table already exists")

	// ErrPrimaryIndexNotUnique indicates that the primary index for the table is not marked unique.
	ErrPrimaryIndexNotUnique = errors.New("primary index not unique")

	// ErrDuplicateIndex indicates that the table has two or more indexers that share the same name.
	ErrDuplicateIndex = errors.New("index name already in use")

	// ErrReservedPrefix indicates that the index name is using the reserved prefix and should
	// be renamed.
	ErrReservedPrefix = errors.New("index name uses reserved prefix '" + reservedIndexPrefix + "'")

	// ErrTransactionClosed indicates that a write operation is performed using a transaction
	// that has already been committed or aborted.
	ErrTransactionClosed = errors.New("transaction is closed")

	// ErrTableNotLockedForWriting indicates that a write operation is performed against a
	// table that was not locked for writing, e.g. target table not given as argument to
	// WriteTxn().
	ErrTableNotLockedForWriting = errors.New("not locked for writing")

	// ErrRevisionNotEqual indicates that the CompareAndSwap or CompareAndDelete failed due to
	// the object having a mismatching revision, e.g. it had been changed since the object
	// was last read.
	ErrRevisionNotEqual = errors.New("revision not equal")

	// ErrObjectNotFound indicates that the object was not found when the operation required
	// it to exists. This error is not returned by Insert or Delete, but may be returned by
	// CompareAndSwap or CompareAndDelete.
	ErrObjectNotFound = errors.New("object not found")
)
View Source
var Cell = cell.Module(
	"statedb",
	"In-memory transactional database",

	cell.Provide(
		newHiveDB,
		newDumpHandler,
	),
	cell.Metric(NewMetrics),
)

This module provides an in-memory database built on top of immutable radix trees As the database is based on an immutable data structure, the objects inserted into the database MUST NOT be mutated, but rather copied first!

For example use see pkg/statedb/example.

Functions

func Collect

func Collect[Obj any](iter Iterator[Obj]) []Obj

func NewProtectedTableCell

func NewProtectedTableCell[Obj any](
	tableName TableName,
	primaryIndexer Indexer[Obj],
	secondaryIndexers ...Indexer[Obj],
) cell.Cell

NewProtectedTableCell creates a cell for creating and registering a statedb Table[Obj]. The provided RWTable[Obj] is scoped to the module and thus prevents the table from being directly modified outside this module.

func NewTableCell

func NewTableCell[Obj any](
	tableName TableName,
	primaryIndexer Indexer[Obj],
	secondaryIndexers ...Indexer[Obj],
) cell.Cell

NewTableCell creates a cell for creating and registering a statedb Table[Obj].

func ProcessEach

func ProcessEach[Obj any, It Iterator[Obj]](iter It, fn func(Obj, Revision) error) (err error)

ProcessEach invokes the given function for each object provided by the iterator.

Types

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB provides an in-memory transaction database built on top of immutable radix trees. The database supports multiple tables, each with one or more user-defined indexes. Readers can access the data locklessly with a simple atomic pointer read to obtain a snapshot. On writes to the database table-level locks are acquired on target tables and on write transaction commit a root lock is taken to swap in the new root with the modified tables.

As data is stored in immutable data structures any objects inserted into it MUST NOT be mutated afterwards.

DB holds the "root" tree of tables with each table holding a tree of indexes:

           root
          /    \
         ba    T(foo)
       /   \
   T(bar)  T(baz)

      T(bar).indexes
	   /  \
	  i    I(byRevision)
	/   \
   I(id)    I(ip)

          I(ip)
          /  \
        192  172
        /     ...
    bar(192.168.1.1)

T = tableEntry I = indexTree

To lookup:

  1. Create a read (or write) transaction
  2. Find the table from the root tree
  3. Find the index from the table's index tree
  4. Find the object from the index

To insert:

  1. Create write transaction against the target table
  2. Find the table from the root tree
  3. Create/reuse write transaction on primary index
  4. Insert/replace the object into primary index
  5. Create/reuse write transaction on revision index
  6. If old object existed, remove from revision index
  7. If old object existed, remove from graveyard
  8. Update each secondary index
  9. Commit transaction by committing each index to the table and then committing table to the root. Swap the root atomic pointer to new root and notify by closing channels of all modified nodes.

To observe deletions:

  1. Create write transaction against the target table
  2. Create new delete tracker and add it to the table
  3. Commit the write transaction to update the table with the new delete tracker
  4. Query the graveyard by revision, starting from the revision of the write transaction at which it was created.
  5. For each successfully processed deletion, mark the revision to set low watermark for garbage collection.
  6. Periodically garbage collect the graveyard by finding the lowest revision of all delete trackers.

func NewDB

func NewDB(tables []TableMeta, metrics Metrics) (*DB, error)

func (*DB) ReadTxn

func (db *DB) ReadTxn() ReadTxn

ReadTxn constructs a new read transaction for performing reads against a snapshot of the database.

ReadTxn is not thread-safe!

func (*DB) Start

func (db *DB) Start(hive.HookContext) error

func (*DB) Stop

func (db *DB) Stop(stopCtx hive.HookContext) error

func (*DB) WriteTxn

func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn

WriteTxn constructs a new write transaction against the given set of tables. Each table is locked, which may block until the table locks are acquired. The modifications performed in the write transaction are not visible outside it until Commit() is called. To discard the changes call Abort().

WriteTxn is not thread-safe!

type DeleteTracker

type DeleteTracker[Obj any] struct {
	// contains filtered or unexported fields
}

func (*DeleteTracker[Obj]) Close

func (dt *DeleteTracker[Obj]) Close()

func (*DeleteTracker[Obj]) Deleted

func (dt *DeleteTracker[Obj]) Deleted(txn ReadTxn, minRevision Revision) Iterator[Obj]

Deleted returns an iterator for deleted objects in this table starting from 'minRevision'. The deleted objects are not garbage-collected unless 'Mark' is called!

func (*DeleteTracker[Obj]) Mark

func (dt *DeleteTracker[Obj]) Mark(upTo Revision)

Mark the revision up to which deleted objects have been processed. This sets the low watermark for deleted object garbage collection.

func (*DeleteTracker[Obj]) Process

func (dt *DeleteTracker[Obj]) Process(txn ReadTxn, minRevision Revision, processFn func(obj Obj, deleted bool, rev Revision) error) (Revision, <-chan struct{}, error)

Process is a helper to iterate updates and deletes to a table in revision order.

The 'processFn' is called for each updated or deleted object in order. If an error is returned by the function the iteration is stopped and the revision at which processing failed and the error is returned. The caller can then retry processing again from this revision by providing it as the 'minRevision'.

type DualIterator

type DualIterator[Obj any] struct {
	// contains filtered or unexported fields
}

DualIterator allows iterating over two iterators in revision order. Meant to be used for combined iteration of LowerBound(ByRevision) and Deleted().

func NewDualIterator

func NewDualIterator[Obj any](left, right Iterator[Obj]) *DualIterator[Obj]

func (*DualIterator[Obj]) Next

func (it *DualIterator[Obj]) Next() (obj Obj, revision uint64, fromLeft, ok bool)

type Index

type Index[Obj any, Key any] struct {
	Name       string
	FromObject func(obj Obj) index.KeySet
	FromKey    func(key Key) []byte
	Unique     bool
}

Index implements the indexing of objects (FromObjects) and querying of objects from the index (FromKey)

func (Index[Obj, Key]) Query

func (i Index[Obj, Key]) Query(key Key) Query[Obj]

Query constructs a query against this index from a key.

type IndexName

type IndexName = string

type Indexer

type Indexer[Obj any] interface {
	// contains filtered or unexported methods
}

Indexer is the "FromObject" subset of Index[Obj, Key] without the 'Key' constraint.

type Iterator

type Iterator[Obj any] interface {
	// Next returns the next object and its revision if ok is true, otherwise
	// zero values to mean that the iteration has finished.
	Next() (obj Obj, rev Revision, ok bool)
}

Iterator for iterating objects returned from queries.

type Metrics

type Metrics struct {
	// How long a read transaction was held.
	WriteTxnDuration metric.Vec[metric.Observer]
	// How long it took to acquire a write transaction for all tables.
	WriteTxnAcquisition metric.Vec[metric.Observer]
	// How long writers were blocked while waiting to acquire a write transaction for a specific table.
	TableContention metric.Vec[metric.Gauge]
	// The amount of objects in a given table.
	TableObjectCount metric.Vec[metric.Gauge]
	// The current revision of a given table.
	TableRevision metric.Vec[metric.Gauge]
	// The amount of delete trackers for a given table.
	TableDeleteTrackerCount metric.Vec[metric.Gauge]
	// The amount of objects in the graveyard for a given table.
	TableGraveyardObjectCount metric.Vec[metric.Gauge]
	// The lowest revision of a given table that has been processed by the graveyard garbage collector.
	TableGraveyardLowWatermark metric.Vec[metric.Gauge]
	// The time it took to clean the graveyard for a given table.
	TableGraveyardCleaningDuration metric.Vec[metric.Observer]
}

func NewMetrics

func NewMetrics() Metrics

type Query

type Query[Obj any] struct {
	// contains filtered or unexported fields
}

func ByRevision

func ByRevision[Obj any](rev uint64) Query[Obj]

ByRevision constructs a revision query. Applicable to any table.

type RWTable added in v1.15.0

type RWTable[Obj any] interface {
	// RWTable[Obj] is a superset of Table[Obj]. Queries made with a
	// write transaction return the fresh uncommitted modifications if any.
	Table[Obj]

	// Insert an object into the table. Returns the object that was
	// replaced if there was one.
	//
	// Possible errors:
	// - ErrTableNotLockedForWriting: table was not locked for writing
	// - ErrTransactionClosed: the write transaction already committed or aborted
	//
	// Each inserted or updated object will be assigned a new unique
	// revision.
	Insert(WriteTxn, Obj) (oldObj Obj, hadOld bool, err error)

	// CompareAndSwap compares the existing object's revision against the
	// given revision and if equal it replaces the object.
	//
	// Possible errors:
	// - ErrRevisionNotEqual: the object has mismatching revision
	// - ErrObjectNotFound: object not found from the table
	// - ErrTableNotLockedForWriting: table was not locked for writing
	// - ErrTransactionClosed: the write transaction already committed or aborted
	CompareAndSwap(WriteTxn, Revision, Obj) (oldObj Obj, hadOld bool, err error)

	// Delete an object from the table. Returns the object that was
	// deleted if there was one.
	//
	// If the table is being tracked for deletions via DeleteTracker()
	// the deleted object is inserted into a graveyard index and garbage
	// collected when all delete trackers have consumed it. Each deleted
	// object in the graveyard has unique revision allowing interleaved
	// iteration of updates and deletions (see (*DeleteTracker[Obj]).Process).
	//
	// Possible errors:
	// - ErrTableNotLockedForWriting: table was not locked for writing
	// - ErrTransactionClosed: the write transaction already committed or aborted
	Delete(WriteTxn, Obj) (oldObj Obj, hadOld bool, err error)

	// DeleteAll removes all objects in the table. Semantically the same as
	// All() + Delete(). See Delete() for more information.
	//
	// Possible errors:
	// - ErrTableNotLockedForWriting: table was not locked for writing
	// - ErrTransactionClosed: the write transaction already committed or aborted
	DeleteAll(WriteTxn) error

	// CompareAndDelete compares the existing object's revision against the
	// given revision and if equal it deletes the object. If object is not
	// found 'hadOld' will be false and 'err' nil.
	//
	// Possible errors:
	// - ErrRevisionNotEqual: the object has mismatching revision
	// - ErrTableNotLockedForWriting: table was not locked for writing
	// - ErrTransactionClosed: the write transaction already committed or aborted
	CompareAndDelete(WriteTxn, Revision, Obj) (oldObj Obj, hadOld bool, err error)
}

RWTable provides methods for modifying the table under a write transaction that targets this table.

func NewTable

func NewTable[Obj any](
	tableName TableName,
	primaryIndexer Indexer[Obj],
	secondaryIndexers ...Indexer[Obj],
) (RWTable[Obj], error)

type ReadTxn

type ReadTxn interface {

	// WriteJSON writes the contents of the database as JSON.
	WriteJSON(io.Writer) error
	// contains filtered or unexported methods
}

type Revision

type Revision = uint64

type Table

type Table[Obj any] interface {
	// TableMeta for querying table metadata that is independent of
	// 'Obj' type.
	TableMeta

	// Revision of the table. Constant for a read transaction, but
	// increments in a write transaction on each Insert and Delete.
	Revision(ReadTxn) Revision

	// All returns an iterator for all objects in the table and a watch
	// channel that is closed when the table changes.
	All(ReadTxn) (Iterator[Obj], <-chan struct{})

	// Get returns an iterator for all objects matching the given query
	// and a watch channel that is closed if the query results are
	// invalidated by a write to the table.
	Get(ReadTxn, Query[Obj]) (Iterator[Obj], <-chan struct{})

	// First returns the first matching object for the query.
	First(ReadTxn, Query[Obj]) (obj Obj, rev Revision, found bool)

	// FirstWatch return the first matching object and a watch channel
	// that is closed if the query is invalidated.
	FirstWatch(ReadTxn, Query[Obj]) (obj Obj, rev Revision, watch <-chan struct{}, found bool)

	// Last returns the last matching object.
	Last(ReadTxn, Query[Obj]) (obj Obj, rev Revision, found bool)

	// LastWatch returns the last matching object and a watch channel
	// that is closed if the query is invalidated.
	LastWatch(ReadTxn, Query[Obj]) (obj Obj, rev Revision, watch <-chan struct{}, found bool)

	// LowerBound returns an iterator for objects that have a key
	// greater or equal to the query. The returned watch channel is closed
	// when anything in the table changes as more fine-grained notifications
	// are not possible with a lower bound search.
	LowerBound(ReadTxn, Query[Obj]) (iter Iterator[Obj], watch <-chan struct{})

	// DeleteTracker creates a new delete tracker for the table.
	//
	// It starts tracking deletions performed against the table from the
	// current revision. A WriteTxn against the target table is required to
	// add the tracker to the table.
	DeleteTracker(txn WriteTxn, trackerName string) (*DeleteTracker[Obj], error)
}

Table provides methods for querying the contents of a table.

type TableMeta

type TableMeta interface {
	Name() TableName // The name of the table
	// contains filtered or unexported methods
}

TableMeta provides information about the table that is independent of the object type (the 'Obj' constraint).

type TableName

type TableName = string

type WriteTxn

type WriteTxn interface {
	// WriteTxn is always also a ReadTxn
	ReadTxn

	// Abort the current transaction. All changes are disgarded.
	// It is safe to call Abort() after calling Commit(), e.g.
	// the following pattern is strongly encouraged to make sure
	// write transactions are always completed:
	//
	//  txn := db.WriteTxn(...)
	//  defer txn.Abort()
	//  ...
	//  txn.Commit()
	Abort()

	// Commit the changes in the current transaction to the target tables.
	// This is a no-op if Abort() or Commit() has already been called.
	Commit()
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL