Documentation ¶
Overview ¶
The statedb package provides a transactional in-memory database with per-table locking. The database indexes objects using Persistent Adaptive Radix Trees. (https://db.in.tum.de/~leis/papers/ART.pdf)
As this is built around an immutable data structure and objects may have lockless readers the stored objects MUST NOT be mutated, but instead a copy must be made prior to mutation and insertion.
See 'example/' for an example of how to construct an application that uses this library.
Index ¶
- Constants
- Variables
- func Collect[Obj any](seq iter.Seq2[Obj, Revision]) []Obj
- func CompareCmd(db *DB) script.Cmd
- func DBCmd(db *DB) script.Cmd
- func DeleteCmd(db *DB) script.Cmd
- func Derive[In, Out any](jobName string, transform func(obj In, deleted bool) (Out, DeriveResult)) func(DeriveParams[In, Out])
- func Filter[Obj any](seq iter.Seq2[Obj, Revision], keep func(Obj) bool) iter.Seq2[Obj, Revision]
- func GetCmd(db *DB) script.Cmd
- func InitializedCmd(db *DB) script.Cmd
- func InsertCmd(db *DB) script.Cmd
- func ListCmd(db *DB) script.Cmd
- func LowerBoundCmd(db *DB) script.Cmd
- func Map[In, Out any](seq iter.Seq2[In, Revision], fn func(In) Out) iter.Seq2[Out, Revision]
- func Observable[Obj any](db *DB, table Table[Obj]) stream.Observable[Change[Obj]]
- func PrefixCmd(db *DB) script.Cmd
- func RegisterTable[Obj any](db *DB, table RWTable[Obj]) error
- func ScriptCommands(db *DB) hive.ScriptCmdsOut
- func ShowCmd(db *DB) script.Cmd
- func ToSeq[A, B any](seq iter.Seq2[A, B]) iter.Seq[A]
- func WatchCmd(db *DB) script.Cmd
- type AnyTable
- func (t AnyTable) All(txn ReadTxn) iter.Seq2[any, Revision]
- func (t AnyTable) AllWatch(txn ReadTxn) (iter.Seq2[any, Revision], <-chan struct{})
- func (t AnyTable) Changes(txn WriteTxn) (anyChangeIterator, error)
- func (t AnyTable) Delete(txn WriteTxn, obj any) (old any, hadOld bool, err error)
- func (t AnyTable) Get(txn ReadTxn, index string, key string) (any, Revision, bool, error)
- func (t AnyTable) Insert(txn WriteTxn, obj any) (old any, hadOld bool, err error)
- func (t AnyTable) List(txn ReadTxn, index string, key string) (iter.Seq2[any, Revision], error)
- func (t AnyTable) LowerBound(txn ReadTxn, index string, key string) (iter.Seq2[any, Revision], error)
- func (t AnyTable) Prefix(txn ReadTxn, index string, key string) (iter.Seq2[any, Revision], error)
- func (t AnyTable) Proto() any
- func (t AnyTable) TableHeader() []string
- func (t AnyTable) UnmarshalYAML(data []byte) (any, error)
- type Change
- type ChangeIterator
- type DB
- func (db *DB) GetTable(txn ReadTxn, name string) TableMeta
- func (db *DB) GetTables(txn ReadTxn) (tbls []TableMeta)
- func (db *DB) HTTPHandler() http.Handler
- func (db *DB) NewHandle(name string) *DB
- func (db *DB) ReadTxn() ReadTxn
- func (db *DB) RegisterTable(table TableMeta, tables ...TableMeta) error
- func (db *DB) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (db *DB) Start() error
- func (db *DB) Stop() error
- func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn
- type DeriveParams
- type DeriveResult
- type DualIterator
- type ExpVarMetrics
- func (m *ExpVarMetrics) DeleteTrackerCount(name string, numTrackers int)
- func (m *ExpVarMetrics) GraveyardCleaningDuration(name string, duration time.Duration)
- func (m *ExpVarMetrics) GraveyardLowWatermark(name string, lowWatermark Revision)
- func (m *ExpVarMetrics) GraveyardObjectCount(name string, numDeletedObjects int)
- func (m *ExpVarMetrics) ObjectCount(name string, numObjects int)
- func (m *ExpVarMetrics) Revision(name string, revision uint64)
- func (m *ExpVarMetrics) String() (out string)
- func (m *ExpVarMetrics) WriteTxnDuration(handle string, tables []string, acquire time.Duration)
- func (m *ExpVarMetrics) WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration)
- func (m *ExpVarMetrics) WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration)
- type Index
- type IndexName
- type Indexer
- type Iterator
- type Metrics
- type NopMetrics
- func (*NopMetrics) DeleteTrackerCount(tableName string, numTrackers int)
- func (*NopMetrics) GraveyardCleaningDuration(tableName string, duration time.Duration)
- func (*NopMetrics) GraveyardLowWatermark(tableName string, lowWatermark uint64)
- func (*NopMetrics) GraveyardObjectCount(tableName string, numDeletedObjects int)
- func (*NopMetrics) ObjectCount(tableName string, numObjects int)
- func (*NopMetrics) Revision(tableName string, revision uint64)
- func (*NopMetrics) WriteTxnDuration(handle string, tables []string, acquire time.Duration)
- func (*NopMetrics) WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration)
- func (*NopMetrics) WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration)
- type Option
- type Query
- type QueryRequest
- type QueryResponse
- type RWTable
- type ReadTxn
- type RemoteTable
- func (t *RemoteTable[Obj]) Changes(ctx context.Context) (seq iter.Seq2[Change[Obj], Revision], errChan <-chan error)
- func (t *RemoteTable[Obj]) Get(ctx context.Context, q Query[Obj]) (iter.Seq2[Obj, Revision], <-chan error)
- func (t *RemoteTable[Obj]) LowerBound(ctx context.Context, q Query[Obj]) (iter.Seq2[Obj, Revision], <-chan error)
- func (t *RemoteTable[Obj]) SetTransport(tr *http.Transport)
- type Revision
- type Table
- type TableMeta
- type TableName
- type TableWritable
- type WriteTxn
Constants ¶
const ( PrimaryIndexPos = 0 RevisionIndex = "__revision__" RevisionIndexPos = 1 GraveyardIndex = "__graveyard__" GraveyardIndexPos = 2 GraveyardRevisionIndex = "__graveyard_revision__" GraveyardRevisionIndexPos = 3 SecondaryIndexStartPos = 4 )
Variables ¶
var ( // ErrDuplicateTable indicates that StateDB has been provided with two or more table definitions // that share the same table name. ErrDuplicateTable = errors.New("table already exists") // ErrTableNotRegistered indicates that a user tries to write to a table that has not been // registered with this StateDB instance. ErrTableNotRegistered = errors.New("table not registered") // ErrPrimaryIndexNotUnique indicates that the primary index for the table is not marked unique. ErrPrimaryIndexNotUnique = errors.New("primary index not unique") // ErrDuplicateIndex indicates that the table has two or more indexers that share the same name. ErrDuplicateIndex = errors.New("index name already in use") // ErrReservedPrefix indicates that the index name is using the reserved prefix and should // be renamed. ErrReservedPrefix = errors.New("index name uses reserved prefix '" + reservedIndexPrefix + "'") // ErrTransactionClosed indicates that a write operation is performed using a transaction // that has already been committed or aborted. ErrTransactionClosed = errors.New("transaction is closed") // ErrTableNotLockedForWriting indicates that a write operation is performed against a // table that was not locked for writing, e.g. target table not given as argument to // WriteTxn(). ErrTableNotLockedForWriting = errors.New("not locked for writing") // ErrRevisionNotEqual indicates that the CompareAndSwap or CompareAndDelete failed due to // the object having a mismatching revision, e.g. it had been changed since the object // was last read. ErrRevisionNotEqual = errors.New("revision not equal") // ErrObjectNotFound indicates that the object was not found when the operation required // it to exist. This error is not returned by Insert or Delete, but may be returned by // CompareAndSwap or CompareAndDelete. ErrObjectNotFound = errors.New("object not found") )
var Cell = cell.Module( "statedb", "In-memory transactional database", cell.Provide( newHiveDB, ScriptCommands, ), )
This module provides an in-memory database built on top of immutable radix trees. As the database is based on an immutable data structure, the objects inserted into the database MUST NOT be mutated, but rather copied first!
Functions ¶
func Collect ¶
Collect creates a slice of objects out of the iterator. The iterator is consumed in the process.
func CompareCmd ¶ added in v0.3.1
func Derive ¶
func Derive[In, Out any](jobName string, transform func(obj In, deleted bool) (Out, DeriveResult)) func(DeriveParams[In, Out])
Derive constructs and registers a job to transform objects from the input table to the output table, e.g. derive the output table from the input table. Useful when constructing a reconciler that has its desired state solely derived from a single table. For example the bandwidth manager's desired state is directly derived from the devices table.
Derive is parametrized with the transform function that transforms the input object into the output object. If the transform function returns DeriveSkip, then the object is skipped.
Example use:
cell.Invoke( statedb.Derive[*tables.Device, *Foo]( func(d *Device, deleted bool) (*Foo, DeriveResult) { if deleted { return &Foo{Index: d.Index}, DeriveDelete } return &Foo{Index: d.Index}, DeriveInsert }), )
func InitializedCmd ¶ added in v0.3.1
func LowerBoundCmd ¶ added in v0.3.1
func Observable ¶
Observable creates an observable from the given table for observing the changes to the table as a stream of events.
For high-churn tables it's advisable to apply rate-limiting to the stream to decrease overhead (stream.Throttle).
func RegisterTable ¶
RegisterTable registers a table to the database:
func NewMyTable() statedb.RWTable[MyTable] { ... } cell.Provide(NewMyTable), cell.Invoke(statedb.RegisterTable[MyTable]),
func ScriptCommands ¶ added in v0.3.1
func ScriptCommands(db *DB) hive.ScriptCmdsOut
Types ¶
type AnyTable ¶ added in v0.3.1
type AnyTable struct {
Meta TableMeta
}
AnyTable allows any-typed access to a StateDB table. This is intended for building generic tooling for accessing the table and should be avoided if possible.
func (AnyTable) LowerBound ¶ added in v0.3.1
func (AnyTable) TableHeader ¶ added in v0.3.1
type Change ¶
type Change[Obj any] struct { Object Obj `json:"obj"` Revision Revision `json:"rev"` Deleted bool `json:"deleted,omitempty"` }
Change is either an update or a delete of an object. Used by Changes() and the Observable(). The 'Revision' is carried also in the Change object so that it is also accessible via Observable.
type ChangeIterator ¶
type ChangeIterator[Obj any] interface { // Next returns the sequence of unobserved changes up to the given ReadTxn (snapshot) and // a watch channel. // // If changes are available Next returns a closed watch channel. Only once there are no further // changes available will a proper watch channel be returned. // // Next can be called again without fully consuming the sequence to pull in new changes. // // The returned sequence is a single-use sequence and subsequent calls will return // an empty sequence. // // If the transaction given to Next is a WriteTxn the modifications made in the // transaction are not observed, that is, only committed changes can be observed. Next(ReadTxn) (iter.Seq2[Change[Obj], Revision], <-chan struct{}) }
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
DB provides an in-memory transactional database built on top of immutable radix trees. The database supports multiple tables, each with one or more user-defined indexes. Readers can access the data locklessly with a simple atomic pointer read to obtain a snapshot. On writes to the database table-level locks are acquired on target tables and on write transaction commit a root lock is taken to swap in the new root with the modified tables.
As data is stored in immutable data structures any objects inserted into it MUST NOT be mutated afterwards.
DB holds the "root" tree of tables with each table holding a tree of indexes:
root / \ ba T(foo) / \ T(bar) T(baz) T(bar).indexes / \ i I(byRevision) / \ I(id) I(ip) I(ip) / \ 192 172 / ... bar(192.168.1.1)
T = tableEntry I = indexTree
To lookup:
- Create a read (or write) transaction
- Find the table from the root tree
- Find the index from the table's index tree
- Find the object from the index
To insert:
- Create write transaction against the target table
- Find the table from the root tree
- Create/reuse write transaction on primary index
- Insert/replace the object into primary index
- Create/reuse write transaction on revision index
- If old object existed, remove from revision index
- If old object existed, remove from graveyard
- Update each secondary index
- Commit transaction by committing each index to the table and then committing table to the root. Swap the root atomic pointer to new root and notify by closing channels of all modified nodes.
To observe deletions:
- Create write transaction against the target table
- Create new delete tracker and add it to the table
- Commit the write transaction to update the table with the new delete tracker
- Query the graveyard by revision, starting from the revision of the write transaction at which it was created.
- For each successfully processed deletion, mark the revision to set low watermark for garbage collection.
- Periodically garbage collect the graveyard by finding the lowest revision of all delete trackers.
func (*DB) HTTPHandler ¶
func (*DB) NewHandle ¶
NewHandle returns a new named handle to the DB. The given name is used to annotate metrics.
func (*DB) ReadTxn ¶
ReadTxn constructs a new read transaction for performing reads against a snapshot of the database.
The returned ReadTxn is not thread-safe.
func (*DB) RegisterTable ¶
RegisterTable registers a table to the database.
func (*DB) ServeHTTP ¶
func (db *DB) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP is an HTTP handler for dumping StateDB as JSON.
Example usage:
var db *statedb.DB http.Handle("/db", db) http.ListenAndServe(":8080", nil)
func (*DB) Start ¶
Start the background workers for the database.
This starts the graveyard worker that deals with garbage collecting deleted objects that are no longer necessary for Changes().
func (*DB) WriteTxn ¶
WriteTxn constructs a new write transaction against the given set of tables. Each table is locked, which may block until the table locks are acquired. The modifications performed in the write transaction are not visible outside it until Commit() is called. To discard the changes call Abort().
The returned WriteTxn is not thread-safe.
type DeriveParams ¶
type DeriveResult ¶
type DeriveResult int
const ( DeriveInsert DeriveResult = 0 // Insert the object DeriveUpdate DeriveResult = 1 // Update the object (if it exists) DeriveDelete DeriveResult = 2 // Delete the object DeriveSkip DeriveResult = 3 // Skip )
type DualIterator ¶
type DualIterator[Obj any] struct { // contains filtered or unexported fields }
DualIterator allows iterating over two iterators in revision order. Meant to be used for combined iteration of LowerBound(ByRevision) and Deleted().
func NewDualIterator ¶
func NewDualIterator[Obj any](left, right Iterator[Obj]) *DualIterator[Obj]
func (*DualIterator[Obj]) Next ¶
func (it *DualIterator[Obj]) Next() (obj Obj, revision uint64, fromLeft, ok bool)
type ExpVarMetrics ¶
type ExpVarMetrics struct { LockContentionVar *expvar.Map GraveyardCleaningDurationVar *expvar.Map GraveyardLowWatermarkVar *expvar.Map GraveyardObjectCountVar *expvar.Map ObjectCountVar *expvar.Map WriteTxnAcquisitionVar *expvar.Map WriteTxnDurationVar *expvar.Map DeleteTrackerCountVar *expvar.Map RevisionVar *expvar.Map }
ExpVarMetrics is a simple implementation for the metrics.
func NewExpVarMetrics ¶
func NewExpVarMetrics(publish bool) *ExpVarMetrics
func (*ExpVarMetrics) DeleteTrackerCount ¶
func (m *ExpVarMetrics) DeleteTrackerCount(name string, numTrackers int)
func (*ExpVarMetrics) GraveyardCleaningDuration ¶
func (m *ExpVarMetrics) GraveyardCleaningDuration(name string, duration time.Duration)
func (*ExpVarMetrics) GraveyardLowWatermark ¶
func (m *ExpVarMetrics) GraveyardLowWatermark(name string, lowWatermark Revision)
func (*ExpVarMetrics) GraveyardObjectCount ¶
func (m *ExpVarMetrics) GraveyardObjectCount(name string, numDeletedObjects int)
func (*ExpVarMetrics) ObjectCount ¶
func (m *ExpVarMetrics) ObjectCount(name string, numObjects int)
func (*ExpVarMetrics) Revision ¶
func (m *ExpVarMetrics) Revision(name string, revision uint64)
func (*ExpVarMetrics) String ¶
func (m *ExpVarMetrics) String() (out string)
func (*ExpVarMetrics) WriteTxnDuration ¶
func (m *ExpVarMetrics) WriteTxnDuration(handle string, tables []string, acquire time.Duration)
func (*ExpVarMetrics) WriteTxnTableAcquisition ¶
func (m *ExpVarMetrics) WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration)
func (*ExpVarMetrics) WriteTxnTotalAcquisition ¶
func (m *ExpVarMetrics) WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration)
type Index ¶
type Index[Obj any, Key any] struct { // Name of the index Name string // FromObject extracts key(s) from the object. The key set // can contain 0, 1 or more keys. FromObject func(obj Obj) index.KeySet // FromKey converts the index key into a raw key. // With this we can perform Query() against this index with // the [Key] type. FromKey func(key Key) index.Key // FromString is an optional conversion from string to a raw key. // If implemented allows script commands to query with this index. FromString func(key string) (index.Key, error) // Unique marks the index as unique. Primary index must always be // unique. A secondary index may be non-unique in which case a single // key may map to multiple objects. Unique bool }
Index implements the indexing of objects (FromObject) and querying of objects from the index (FromKey).
func (Index[Obj, Key]) ObjectToKey ¶
func (Index[Obj, Key]) QueryFromObject ¶
type Indexer ¶
type Indexer[Obj any] interface { ObjectToKey(Obj) index.Key QueryFromObject(Obj) Query[Obj] // contains filtered or unexported methods }
Indexer is the "FromObject" subset of Index[Obj, Key] without the 'Key' constraint.
type Iterator ¶
type Iterator[Obj any] interface { // Next returns the next object and its revision if ok is true, otherwise // zero values to mean that the iteration has finished. Next() (obj Obj, rev Revision, ok bool) }
Iterator for iterating over a sequence of objects.
type Metrics ¶
type Metrics interface { WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration) WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration) WriteTxnDuration(handle string, tables []string, acquire time.Duration) GraveyardLowWatermark(tableName string, lowWatermark Revision) GraveyardCleaningDuration(tableName string, duration time.Duration) GraveyardObjectCount(tableName string, numDeletedObjects int) ObjectCount(tableName string, numObjects int) DeleteTrackerCount(tableName string, numTrackers int) Revision(tableName string, revision Revision) }
type NopMetrics ¶
type NopMetrics struct{}
func (*NopMetrics) DeleteTrackerCount ¶
func (*NopMetrics) DeleteTrackerCount(tableName string, numTrackers int)
DeleteTrackerCount implements Metrics.
func (*NopMetrics) GraveyardCleaningDuration ¶
func (*NopMetrics) GraveyardCleaningDuration(tableName string, duration time.Duration)
GraveyardCleaningDuration implements Metrics.
func (*NopMetrics) GraveyardLowWatermark ¶
func (*NopMetrics) GraveyardLowWatermark(tableName string, lowWatermark uint64)
GraveyardLowWatermark implements Metrics.
func (*NopMetrics) GraveyardObjectCount ¶
func (*NopMetrics) GraveyardObjectCount(tableName string, numDeletedObjects int)
GraveyardObjectCount implements Metrics.
func (*NopMetrics) ObjectCount ¶
func (*NopMetrics) ObjectCount(tableName string, numObjects int)
ObjectCount implements Metrics.
func (*NopMetrics) Revision ¶
func (*NopMetrics) Revision(tableName string, revision uint64)
Revision implements Metrics.
func (*NopMetrics) WriteTxnDuration ¶
func (*NopMetrics) WriteTxnDuration(handle string, tables []string, acquire time.Duration)
WriteTxnDuration implements Metrics.
func (*NopMetrics) WriteTxnTableAcquisition ¶
func (*NopMetrics) WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration)
WriteTxnTableAcquisition implements Metrics.
func (*NopMetrics) WriteTxnTotalAcquisition ¶
func (*NopMetrics) WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration)
WriteTxnTotalAcquisition implements Metrics.
type Query ¶
type Query[Obj any] struct { // contains filtered or unexported fields }
func ByRevision ¶
ByRevision constructs a revision query. Applicable to any table.
type QueryRequest ¶
type QueryResponse ¶
type RWTable ¶
type RWTable[Obj any] interface { // RWTable[Obj] is a superset of Table[Obj]. Queries made with a // write transaction return the fresh uncommitted modifications if any. Table[Obj] // RegisterInitializer registers an initializer to the table. Returns // a function to mark the initializer done. Once all initializers are // done, Table[*].Initialized() will return true. // This should only be used before the application has started. RegisterInitializer(txn WriteTxn, name string) func(WriteTxn) // ToTable returns the Table[Obj] interface. Useful with cell.Provide // to avoid the anonymous function: // // cell.ProvidePrivate(NewMyTable), // RWTable // cell.Invoke(statedb.Register[statedb.RWTable[Foo]) // // // with anonymous function: // cell.Provide(func(t statedb.RWTable[Foo]) statedb.Table[Foo] { return t }) // // // with ToTable: // cell.Provide(statedb.RWTable[Foo].ToTable), ToTable() Table[Obj] // Insert an object into the table. Returns the object that was // replaced if there was one. // // Possible errors: // - ErrTableNotLockedForWriting: table was not locked for writing // - ErrTransactionClosed: the write transaction already committed or aborted // // Each inserted or updated object will be assigned a new unique // revision. Insert(WriteTxn, Obj) (oldObj Obj, hadOld bool, err error) // Modify an existing object or insert a new object into the table. If an old object // exists the [merge] function is called with the old and new objects. // // Modify is semantically equal to Get + Insert, but avoids extra lookups making // it significantly more efficient. // // Possible errors: // - ErrTableNotLockedForWriting: table was not locked for writing // - ErrTransactionClosed: the write transaction already committed or aborted Modify(txn WriteTxn, new Obj, merge func(old, new Obj) Obj) (oldObj Obj, hadOld bool, err error) // CompareAndSwap compares the existing object's revision against the // given revision and if equal it replaces the object. 
// // Possible errors: // - ErrRevisionNotEqual: the object has mismatching revision // - ErrObjectNotFound: object not found from the table // - ErrTableNotLockedForWriting: table was not locked for writing // - ErrTransactionClosed: the write transaction already committed or aborted CompareAndSwap(WriteTxn, Revision, Obj) (oldObj Obj, hadOld bool, err error) // Delete an object from the table. Returns the object that was // deleted if there was one. // // If the table is being tracked for deletions via EventIterator() // the deleted object is inserted into a graveyard index and garbage // collected when all delete trackers have consumed it. Each deleted // object in the graveyard has unique revision allowing interleaved // iteration of updates and deletions. // // Possible errors: // - ErrTableNotLockedForWriting: table was not locked for writing // - ErrTransactionClosed: the write transaction already committed or aborted Delete(WriteTxn, Obj) (oldObj Obj, hadOld bool, err error) // DeleteAll removes all objects in the table. Semantically the same as // All() + Delete(). See Delete() for more information. // // Possible errors: // - ErrTableNotLockedForWriting: table was not locked for writing // - ErrTransactionClosed: the write transaction already committed or aborted DeleteAll(WriteTxn) error // CompareAndDelete compares the existing object's revision against the // given revision and if equal it deletes the object. If object is not // found 'hadOld' will be false and 'err' nil. // // Possible errors: // - ErrRevisionNotEqual: the object has mismatching revision // - ErrTableNotLockedForWriting: table was not locked for writing // - ErrTransactionClosed: the write transaction already committed or aborted CompareAndDelete(WriteTxn, Revision, Obj) (oldObj Obj, hadOld bool, err error) }
RWTable provides methods for modifying the table under a write transaction that targets this table.
func MustNewTable ¶
func MustNewTable[Obj any]( tableName TableName, primaryIndexer Indexer[Obj], secondaryIndexers ...Indexer[Obj]) RWTable[Obj]
MustNewTable creates a new table with given name and indexes. Panics if indexes are malformed.
func NewTable ¶
func NewTable[Obj any]( tableName TableName, primaryIndexer Indexer[Obj], secondaryIndexers ...Indexer[Obj], ) (RWTable[Obj], error)
NewTable creates a new table with given name and indexes. Can fail if the indexes or the name are malformed. The name must match regex "^[a-z][a-z0-9_\\-]{0,30}$".
To provide access to the table via Hive:
cell.Provide( // Provide statedb.RWTable[*MyObject]. Often only provided to the module with ProvidePrivate. statedb.NewTable[*MyObject]("my-objects", MyObjectIDIndex, MyObjectNameIndex), // Provide the read-only statedb.Table[*MyObject]. statedb.RWTable[*MyObject].ToTable, )
type RemoteTable ¶
type RemoteTable[Obj any] struct { // contains filtered or unexported fields }
func NewRemoteTable ¶
func NewRemoteTable[Obj any](base *url.URL, table TableName) *RemoteTable[Obj]
NewRemoteTable creates a new handle for querying a remote StateDB table over HTTP. Example usage:
devices := statedb.NewRemoteTable[*tables.Device](url.Parse("http://localhost:8080/db"), "devices") // Get all devices ordered by name. iter, errs := devices.LowerBound(ctx, tables.DeviceByName("")) for device, revision, ok := iter.Next(); ok; device, revision, ok = iter.Next() { ... } // Get device by name. iter, errs := devices.Get(ctx, tables.DeviceByName("eth0")) if dev, revision, ok := iter.Next(); ok { ... } // Get devices in revision order, e.g. oldest changed devices first. iter, errs = devices.LowerBound(ctx, statedb.ByRevision(0))
func (*RemoteTable[Obj]) LowerBound ¶
func (*RemoteTable[Obj]) SetTransport ¶
func (t *RemoteTable[Obj]) SetTransport(tr *http.Transport)
type Table ¶
type Table[Obj any] interface { // TableMeta for querying table metadata that is independent of // 'Obj' type. TableMeta // PrimaryIndexer returns the primary indexer for the table. // Useful for generic utilities that need access to the primary key. PrimaryIndexer() Indexer[Obj] // All returns a sequence of all objects in the table. All(ReadTxn) iter.Seq2[Obj, Revision] // AllWatch returns a sequence of all objects in the table and a watch // channel that is closed when the table changes. AllWatch(ReadTxn) (iter.Seq2[Obj, Revision], <-chan struct{}) // List returns sequence of objects matching the given query. List(ReadTxn, Query[Obj]) iter.Seq2[Obj, Revision] // ListWatch returns an iterator for all objects matching the given query // and a watch channel that is closed if the query results are // invalidated by a write to the table. ListWatch(ReadTxn, Query[Obj]) (iter.Seq2[Obj, Revision], <-chan struct{}) // Get returns the first matching object for the query. Get(ReadTxn, Query[Obj]) (obj Obj, rev Revision, found bool) // GetWatch return the first matching object and a watch channel // that is closed if the query is invalidated. GetWatch(ReadTxn, Query[Obj]) (obj Obj, rev Revision, watch <-chan struct{}, found bool) // LowerBound returns an iterator for objects that have a key // greater or equal to the query. LowerBound(ReadTxn, Query[Obj]) iter.Seq2[Obj, Revision] // LowerBoundWatch returns an iterator for objects that have a key // greater or equal to the query. The returned watch channel is closed // when anything in the table changes as more fine-grained notifications // are not possible with a lower bound search. LowerBoundWatch(ReadTxn, Query[Obj]) (seq iter.Seq2[Obj, Revision], watch <-chan struct{}) // Prefix searches the table by key prefix. Prefix(ReadTxn, Query[Obj]) iter.Seq2[Obj, Revision] // PrefixWatch searches the table by key prefix. Returns an iterator and a watch // channel that closes when the query results have become stale. 
PrefixWatch(ReadTxn, Query[Obj]) (seq iter.Seq2[Obj, Revision], watch <-chan struct{}) // Changes returns an iterator for changes happening to the table. // This uses the revision index to iterate over the objects in the order // they have changed. Deleted objects are placed onto a temporary index // (graveyard) where they live until all change iterators have observed // the deletion. // // If an object is created and deleted before the observer has iterated // over the creation then only the deletion is seen. Changes(WriteTxn) (ChangeIterator[Obj], error) }
Table provides methods for querying the contents of a table.
type TableMeta ¶
type TableMeta interface { // Name returns the name of the table Name() TableName // Indexes returns the names of the indexes Indexes() []string // NumObjects returns the number of objects stored in the table. NumObjects(ReadTxn) int // Initialized returns true if in this ReadTxn (snapshot of the database) // the registered initializers have all been completed. The returned // watch channel will be closed when the table becomes initialized. Initialized(ReadTxn) (bool, <-chan struct{}) // PendingInitializers returns the set of pending initializers that // have not yet completed. PendingInitializers(ReadTxn) []string // Revision of the table. Constant for a read transaction, but // increments in a write transaction on each Insert and Delete. Revision(ReadTxn) Revision // contains filtered or unexported methods }
TableMeta provides information about the table that is independent of the object type (the 'Obj' constraint).
type TableWritable ¶
type TableWritable interface { // TableHeader returns the header columns that are independent of the // object. TableHeader() []string // TableRow returns the row columns for this object. TableRow() []string }
TableWritable is a constraint for objects that implement tabular pretty-printing. Used in "cilium-dbg statedb" sub-commands.
type WriteTxn ¶
type WriteTxn interface { // WriteTxn is always also a ReadTxn ReadTxn // Abort the current transaction. All changes are discarded. // It is safe to call Abort() after calling Commit(), e.g. // the following pattern is strongly encouraged to make sure // write transactions are always completed: // // txn := db.WriteTxn(...) // defer txn.Abort() // ... // txn.Commit() Abort() // Commit the changes in the current transaction to the target tables. // This is a no-op if Abort() or Commit() has already been called. // Returns a ReadTxn for reading the database at the time of commit. Commit() ReadTxn }