kv

package
v1.0.0-beta.119 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 13, 2023 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	NumShards             = 128
	AtomicSuffixSeparator = "__A__"
)
View Source
const (
	KB   = 1000
	KB99 = 99 * KB
)
View Source
const (
	InsertEvent  = "insert"
	ReplaceEvent = "replace"
	UpdateEvent  = "update"
	DeleteEvent  = "delete"
)
View Source
const (
	StatsSizeKey     = "size"
	StatsRowCountKey = "count"

	StatsSearchFieldsKey = "search_size"
)

Variables

View Source
var (
	// ErrDuplicateKey is returned when an insert call is made for a key that already exists.
	ErrDuplicateKey = NewStoreError(0, ErrCodeDuplicateKey, "duplicate key value, violates key constraint")
	// ErrConflictingTransaction is returned when there are conflicting transactions.
	ErrConflictingTransaction = NewStoreError(1020, ErrCodeConflictingTransaction, "transaction not committed due to conflict with another transaction")
	// ErrTransactionMaxDurationReached is returned when a transaction runs beyond 5 seconds.
	ErrTransactionMaxDurationReached = NewStoreError(1007, ErrCodeTransactionMaxDuration, "transaction is old to perform reads or be committed")
	// ErrTransactionTimedOut is returned when fdb aborts the transaction because of the 5-second limit.
	ErrTransactionTimedOut     = NewStoreError(1031, ErrCodeTransactionTimedOut, "operation aborted because the transaction timed out")
	ErrTransactionNotCommitted = NewStoreError(1021, ErrCodeTransactionNotCommitted, "transaction may or may not have committed")
	ErrValueSizeExceeded       = NewStoreError(2103, ErrCodeValueSizeExceeded, "document exceeds limit")
	ErrTransactionSizeExceeded = NewStoreError(2101, ErrCodeTransactionSizeExceeded, "transaction exceeds limit")
	ErrNotFound                = NewStoreError(0, ErrCodeNotFound, "not found")
)
View Source
var StatsTable = []byte("stats")

Functions

func CtxWithSize

func CtxWithSize(ctx context.Context, size int32) context.Context

func GetSizeFromCtx

func GetSizeFromCtx(ctx context.Context) int32

func IsTimedOut

func IsTimedOut(err error) bool

func NewStoreError

func NewStoreError(fdbCode int, code StoreErrCode, msg string, args ...any) error

func WrapEventListenerCtx

func WrapEventListenerCtx(ctx context.Context) context.Context

Types

type AtomicIterator

type AtomicIterator interface {
	Next(value *FdbBaseKeyValue[int64]) bool
	Err() error
}

type AtomicIteratorImpl

type AtomicIteratorImpl struct {
	// contains filtered or unexported fields
}

func (*AtomicIteratorImpl) Err

func (i *AtomicIteratorImpl) Err() error

func (*AtomicIteratorImpl) Next

func (i *AtomicIteratorImpl) Next(value *FdbBaseKeyValue[int64]) bool

type Builder

type Builder struct {
	// contains filtered or unexported fields
}

func NewBuilder

func NewBuilder() *Builder

func (*Builder) Build

func (b *Builder) Build(cfg *config.FoundationDBConfig) (TxStore, error)

Build creates the TxStore by layering stores in a fixed order. For example, a simple kv store is created first, then the chunk store is created on top of this simple kv store. If enabled, the listener is added after chunking so that it is called before chunking. Finally, the measure store is added at the end.

func (*Builder) WithChunking

func (b *Builder) WithChunking() *Builder

func (*Builder) WithCompression

func (b *Builder) WithCompression() *Builder

func (*Builder) WithListener

func (b *Builder) WithListener() *Builder

func (*Builder) WithMeasure

func (b *Builder) WithMeasure() *Builder

func (*Builder) WithStats

func (b *Builder) WithStats() *Builder

type ChunkIterator

type ChunkIterator struct {
	Iterator
	// contains filtered or unexported fields
}

func (*ChunkIterator) Err

func (it *ChunkIterator) Err() error

func (*ChunkIterator) Next

func (it *ChunkIterator) Next(value *KeyValue) bool

type ChunkTx

type ChunkTx struct {
	*KeyValueTx
	// contains filtered or unexported fields
}

func (ChunkTx) AtomicAdd

func (t ChunkTx) AtomicAdd(_ context.Context, table []byte, key Key, value int64) error

func (ChunkTx) AtomicRead

func (t ChunkTx) AtomicRead(_ context.Context, table []byte, key Key) (int64, error)

func (ChunkTx) AtomicReadPrefix

func (t ChunkTx) AtomicReadPrefix(ctx context.Context, table []byte, key Key, isSnapshot bool) (AtomicIterator, error)

func (ChunkTx) AtomicReadRange

func (t ChunkTx) AtomicReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (AtomicIterator, error)

func (ChunkTx) Commit

func (t ChunkTx) Commit(_ context.Context) error

func (ChunkTx) Delete

func (t ChunkTx) Delete(_ context.Context, table []byte, key Key) error

func (ChunkTx) DeleteRange

func (t ChunkTx) DeleteRange(_ context.Context, table []byte, lKey Key, rKey Key) error

func (ChunkTx) Get

func (t ChunkTx) Get(_ context.Context, key []byte, isSnapshot bool) Future

func (*ChunkTx) Insert

func (tx *ChunkTx) Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) error

func (ChunkTx) IsRetriable

func (t ChunkTx) IsRetriable() bool

IsRetriable returns true if transaction can be retried after error.

func (ChunkTx) RangeSize

func (t ChunkTx) RangeSize(_ context.Context, table []byte, lKey Key, rKey Key) (int64, error)

RangeSize calculates the approximate size of a table range in bytes. This is an estimate, and it will not be very accurate for a range smaller than 3 MB.

func (*ChunkTx) Read

func (tx *ChunkTx) Read(ctx context.Context, table []byte, key Key, reverse bool) (Iterator, error)

Read needs to return a chunk iterator so that it can merge the chunks and return the merged value to the caller.

func (*ChunkTx) ReadRange

func (tx *ChunkTx) ReadRange(ctx context.Context, table []byte, lKey Key, rKey Key, isSnapshot bool, reverse bool) (Iterator, error)

func (*ChunkTx) Replace

func (tx *ChunkTx) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, isUpdate bool) error

func (ChunkTx) Rollback

func (t ChunkTx) Rollback(_ context.Context) error

func (ChunkTx) SetVersionstampedKey

func (t ChunkTx) SetVersionstampedKey(_ context.Context, key []byte, value []byte) error

func (ChunkTx) SetVersionstampedValue

func (t ChunkTx) SetVersionstampedValue(_ context.Context, key []byte, value []byte) error

type ChunkTxStore

type ChunkTxStore struct {
	TxStore
	// contains filtered or unexported fields
}

ChunkTxStore is used as a layer on top of KeyValueTxStore. The idea is that the chunk store automatically splits the user payload if it is greater than 99KB. It adds some metadata, such as the total number of chunks, to the zeroth chunk so that it can easily merge the value again. The attributes of the data passed by the caller are only needed in the first chunk; the remaining chunks only have the body. The chunk number is appended at the end of the key in the format "__<chunk number>". During merging this number is read from the key, so no information apart from the total number of chunks is persisted in the value.

func (*ChunkTxStore) BeginTx

func (store *ChunkTxStore) BeginTx(ctx context.Context) (Tx, error)

type CompressTx

type CompressTx struct {
	Tx
	// contains filtered or unexported fields
}

func (*CompressTx) Insert

func (tx *CompressTx) Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) error

func (*CompressTx) Read

func (tx *CompressTx) Read(ctx context.Context, table []byte, key Key, reverse bool) (Iterator, error)

func (*CompressTx) ReadRange

func (tx *CompressTx) ReadRange(ctx context.Context, table []byte, lKey Key, rKey Key, isSnapshot bool, reverse bool) (Iterator, error)

func (*CompressTx) Replace

func (tx *CompressTx) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, isUpdate bool) error

type CompressTxStore

type CompressTxStore struct {
	TxStore
	// contains filtered or unexported fields
}

func (*CompressTxStore) BeginTx

func (store *CompressTxStore) BeginTx(ctx context.Context) (Tx, error)

type CtxSearchSize

type CtxSearchSize struct{}

type CtxValueSize

type CtxValueSize struct{}

type DecompressIterator

type DecompressIterator struct {
	Iterator
	// contains filtered or unexported fields
}

func (*DecompressIterator) Err

func (it *DecompressIterator) Err() error

func (*DecompressIterator) Next

func (it *DecompressIterator) Next(value *KeyValue) bool

type DefaultListener

type DefaultListener struct {
	Events []*Event
}

func (*DefaultListener) GetEvents

func (l *DefaultListener) GetEvents() []*Event

func (*DefaultListener) OnClear

func (l *DefaultListener) OnClear(op string, table []byte, key Key)

func (*DefaultListener) OnSet

func (l *DefaultListener) OnSet(op string, table []byte, key Key, data *internal.TableData)

type Event

type Event struct {
	Op    string
	Table []byte
	Key   Key                 `json:",omitempty"`
	Data  *internal.TableData `json:",omitempty"`
	Last  bool
}

type EventListener

type EventListener interface {
	// OnSet buffers insert/replace/update events
	OnSet(op string, table []byte, key Key, data *internal.TableData)
	// OnClear buffers delete events
	OnClear(op string, table []byte, key Key)
	// GetEvents is used to access buffered events. These events may be shared by different participants, so callers are
	// strongly discouraged from modifying an event; if needed, copy it to some other buffer. Once the transaction
	// completes, the session may discard all the buffered events.
	GetEvents() []*Event
}

EventListener is a listener that buffers all the changes in a transaction. It is attached to the context by the server layer, and it is only responsible for buffering the events; it does not participate in the outcome of the transaction, i.e. EventListener has no knowledge of whether the transaction was committed or rolled back. The lifecycle of this listener is managed by QuerySession in the server package.

func GetEventListener

func GetEventListener(ctx context.Context) EventListener

type EventListenerCtxKey

type EventListenerCtxKey struct{}

type FdbBaseKeyValue

type FdbBaseKeyValue[T fdbBaseType] struct {
	Key    Key
	FDBKey []byte
	Data   T
}

FdbBaseKeyValue is the key-value type used when we are not iterating over TableData.

type Future

type Future = fdb.FutureByteSlice

type Iterator

type Iterator interface {
	Next(value *KeyValue) bool
	Err() error
}

type KV

type KV interface {
	Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) error
	Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, isUpdate bool) error
	Delete(ctx context.Context, table []byte, key Key) error
	Read(ctx context.Context, table []byte, key Key, reverse bool) (Iterator, error)
	ReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool, reverse bool) (Iterator, error)

	GetMetadata(ctx context.Context, table []byte, key Key) (*internal.TableData, error)

	SetVersionstampedKey(_ context.Context, key []byte, value []byte) error
	SetVersionstampedValue(ctx context.Context, key []byte, value []byte) error
	Get(ctx context.Context, key []byte, isSnapshot bool) Future
	AtomicAdd(ctx context.Context, table []byte, key Key, value int64) error
	AtomicRead(ctx context.Context, table []byte, key Key) (int64, error)
	AtomicReadRange(ctx context.Context, table []byte, lKey Key, rKey Key, isSnapshot bool) (AtomicIterator, error)
	AtomicReadPrefix(ctx context.Context, table []byte, key Key, isSnapshot bool) (AtomicIterator, error)
}

type Key

type Key []KeyPart

func BuildKey

func BuildKey(parts ...any) Key

func (Key) AddPart

func (k Key) AddPart(part any) Key

type KeyPart

type KeyPart any

type KeyValue

type KeyValue struct {
	Key    Key
	FDBKey []byte
	Data   *internal.TableData
}

type KeyValueIterator

type KeyValueIterator struct {
	// contains filtered or unexported fields
}

func NewKeyValueIterator

func NewKeyValueIterator(ctx context.Context, iter baseIterator) *KeyValueIterator

func (*KeyValueIterator) Err

func (i *KeyValueIterator) Err() error

func (*KeyValueIterator) Next

func (i *KeyValueIterator) Next(value *KeyValue) bool

type KeyValueIteratorWithMetrics

type KeyValueIteratorWithMetrics struct {
	Iterator
	// contains filtered or unexported fields
}

func NewKeyValueIteratorWithMetrics

func NewKeyValueIteratorWithMetrics(ctx context.Context, iter Iterator) *KeyValueIteratorWithMetrics

func (*KeyValueIteratorWithMetrics) Err

func (*KeyValueIteratorWithMetrics) Next

func (i *KeyValueIteratorWithMetrics) Next(value *KeyValue) bool

type KeyValueTx

type KeyValueTx struct {
	// contains filtered or unexported fields
}

func (KeyValueTx) AtomicAdd

func (t KeyValueTx) AtomicAdd(_ context.Context, table []byte, key Key, value int64) error

func (KeyValueTx) AtomicRead

func (t KeyValueTx) AtomicRead(_ context.Context, table []byte, key Key) (int64, error)

func (KeyValueTx) AtomicReadPrefix

func (t KeyValueTx) AtomicReadPrefix(ctx context.Context, table []byte, key Key, isSnapshot bool) (AtomicIterator, error)

func (KeyValueTx) AtomicReadRange

func (t KeyValueTx) AtomicReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (AtomicIterator, error)

func (KeyValueTx) Commit

func (t KeyValueTx) Commit(_ context.Context) error

func (KeyValueTx) Delete

func (t KeyValueTx) Delete(_ context.Context, table []byte, key Key) error

func (KeyValueTx) DeleteRange

func (t KeyValueTx) DeleteRange(_ context.Context, table []byte, lKey Key, rKey Key) error

func (KeyValueTx) Get

func (t KeyValueTx) Get(_ context.Context, key []byte, isSnapshot bool) Future

func (*KeyValueTx) GetMetadata

func (tx *KeyValueTx) GetMetadata(ctx context.Context, table []byte, key Key) (*internal.TableData, error)

func (*KeyValueTx) Insert

func (tx *KeyValueTx) Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) error

func (KeyValueTx) IsRetriable

func (t KeyValueTx) IsRetriable() bool

IsRetriable returns true if transaction can be retried after error.

func (KeyValueTx) RangeSize

func (t KeyValueTx) RangeSize(_ context.Context, table []byte, lKey Key, rKey Key) (int64, error)

RangeSize calculates the approximate size of a table range in bytes. This is an estimate, and it will not be very accurate for a range smaller than 3 MB.

func (*KeyValueTx) Read

func (tx *KeyValueTx) Read(ctx context.Context, table []byte, key Key, reverse bool) (Iterator, error)

func (*KeyValueTx) ReadRange

func (tx *KeyValueTx) ReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool, reverse bool) (Iterator, error)

func (*KeyValueTx) Replace

func (tx *KeyValueTx) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, isUpdate bool) error

func (KeyValueTx) Rollback

func (t KeyValueTx) Rollback(_ context.Context) error

func (KeyValueTx) SetVersionstampedKey

func (t KeyValueTx) SetVersionstampedKey(_ context.Context, key []byte, value []byte) error

func (KeyValueTx) SetVersionstampedValue

func (t KeyValueTx) SetVersionstampedValue(_ context.Context, key []byte, value []byte) error

type KeyValueTxStore

type KeyValueTxStore struct {
	// contains filtered or unexported fields
}

func (KeyValueTxStore) AtomicAdd

func (d KeyValueTxStore) AtomicAdd(ctx context.Context, table []byte, key Key, value int64) error

func (KeyValueTxStore) AtomicRead

func (d KeyValueTxStore) AtomicRead(ctx context.Context, table []byte, key Key) (int64, error)

func (KeyValueTxStore) AtomicReadRange

func (d KeyValueTxStore) AtomicReadRange(ctx context.Context, table []byte, lKey Key, rKey Key, isSnapshot bool) (AtomicIterator, error)

func (*KeyValueTxStore) BeginTx

func (k *KeyValueTxStore) BeginTx(ctx context.Context) (Tx, error)

func (KeyValueTxStore) CreateTable

func (KeyValueTxStore) CreateTable(_ context.Context, name []byte) error

func (KeyValueTxStore) Delete

func (d KeyValueTxStore) Delete(ctx context.Context, table []byte, key Key) error

func (KeyValueTxStore) DropTable

func (d KeyValueTxStore) DropTable(ctx context.Context, name []byte) error

func (KeyValueTxStore) Get

func (d KeyValueTxStore) Get(ctx context.Context, key []byte, isSnapshot bool) Future

func (*KeyValueTxStore) GetInternalDatabase

func (k *KeyValueTxStore) GetInternalDatabase() (any, error)

func (*KeyValueTxStore) GetTableStats

func (k *KeyValueTxStore) GetTableStats(ctx context.Context, table []byte) (*TableStats, error)

func (KeyValueTxStore) Insert

func (d KeyValueTxStore) Insert(ctx context.Context, table []byte, key Key, data []byte) error

func (KeyValueTxStore) Read

func (d KeyValueTxStore) Read(ctx context.Context, table []byte, key Key, isSnapshot bool, reverse bool) (baseIterator, error)

Read returns all the keys that have a prefix equal to the "key" parameter.

func (KeyValueTxStore) ReadRange

func (d KeyValueTxStore) ReadRange(ctx context.Context, table []byte, lKey Key, rKey Key, isSnapshot bool, reverse bool) (baseIterator, error)

func (KeyValueTxStore) Replace

func (d KeyValueTxStore) Replace(ctx context.Context, table []byte, key Key, data []byte, isUpdate bool) error

func (KeyValueTxStore) SetVersionstampedKey

func (d KeyValueTxStore) SetVersionstampedKey(ctx context.Context, key []byte, value []byte) error

func (KeyValueTxStore) SetVersionstampedValue

func (d KeyValueTxStore) SetVersionstampedValue(ctx context.Context, key []byte, value []byte) error

func (KeyValueTxStore) TableSize

func (d KeyValueTxStore) TableSize(ctx context.Context, name []byte) (int64, error)

TableSize calculates the approximate table size in bytes. It also works with a prefix of the table name, which allows calculating the combined size of multiple tables that share the same prefix.

type ListenerStore

type ListenerStore struct {
	TxStore
}

ListenerStore is placed before any other kv layer in the chain so that event data can be pushed to the listener.

func (*ListenerStore) BeginTx

func (store *ListenerStore) BeginTx(ctx context.Context) (Tx, error)

type ListenerTx

type ListenerTx struct {
	Tx
}

ListenerTx is the tx created for ListenerStore.

func (*ListenerTx) Delete

func (tx *ListenerTx) Delete(ctx context.Context, table []byte, key Key) error

func (*ListenerTx) Insert

func (tx *ListenerTx) Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) error

func (*ListenerTx) Replace

func (tx *ListenerTx) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, isUpdate bool) error

type NoopEventListener

type NoopEventListener struct{}

func (*NoopEventListener) GetEvents

func (*NoopEventListener) GetEvents() []*Event

func (*NoopEventListener) OnClear

func (*NoopEventListener) OnClear(string, []byte, Key)

func (*NoopEventListener) OnSet

type NoopFDBTypeIterator

type NoopFDBTypeIterator struct{}

func (*NoopFDBTypeIterator) Err

func (*NoopFDBTypeIterator) Err() error

func (*NoopFDBTypeIterator) Next

type NoopIterator

type NoopIterator struct{}

func (*NoopIterator) Err

func (*NoopIterator) Err() error

func (*NoopIterator) Next

func (*NoopIterator) Next(_ *KeyValue) bool

type NoopKV

type NoopKV struct{}

func (*NoopKV) AtomicAdd

func (*NoopKV) AtomicAdd(_ context.Context, _ []byte, _ Key, _ int64) error

func (*NoopKV) AtomicRead

func (*NoopKV) AtomicRead(_ context.Context, _ []byte, _ Key) (int64, error)

func (*NoopKV) AtomicReadPrefix

func (*NoopKV) AtomicReadPrefix(_ context.Context, _ []byte, _ Key, _ bool) (AtomicIterator, error)

func (*NoopKV) AtomicReadRange

func (*NoopKV) AtomicReadRange(_ context.Context, _ []byte, _ Key, _ Key, _ bool) (AtomicIterator, error)

func (*NoopKV) Delete

func (*NoopKV) Delete(_ context.Context, _ []byte, _ Key) error

func (*NoopKV) Get

func (*NoopKV) Get(_ context.Context, _ []byte, _ bool) Future

func (*NoopKV) GetMetadata

func (*NoopKV) GetMetadata(_ context.Context, _ []byte, _ Key) (*internal.TableData, error)

func (*NoopKV) Insert

func (*NoopKV) Insert(_ context.Context, _ []byte, _ Key, _ *internal.TableData) error

func (*NoopKV) RangeSize

func (*NoopKV) RangeSize(_ context.Context, _ []byte, _ Key, _ Key) (int64, error)

func (*NoopKV) Read

func (*NoopKV) Read(_ context.Context, _ []byte, _ Key, _ bool) (Iterator, error)

func (*NoopKV) ReadRange

func (*NoopKV) ReadRange(_ context.Context, _ []byte, _ Key, _ Key, _ bool, _ bool) (Iterator, error)

func (*NoopKV) Replace

func (*NoopKV) Replace(_ context.Context, _ []byte, _ Key, _ *internal.TableData, _ bool) error

func (*NoopKV) SetVersionstampedKey

func (*NoopKV) SetVersionstampedKey(_ context.Context, _ []byte, _ []byte) error

func (*NoopKV) SetVersionstampedValue

func (*NoopKV) SetVersionstampedValue(_ context.Context, _ []byte, _ []byte) error

type NoopKVStore

type NoopKVStore struct {
	*NoopKV
}

NoopKVStore is a noop store, useful when we need to profile/debug only the compute path without involving the storage. It can be initialized in main.go instead of the default kvStore.

func (*NoopKVStore) BeginTx

func (*NoopKVStore) BeginTx(_ context.Context) (Tx, error)

func (*NoopKVStore) CreateTable

func (*NoopKVStore) CreateTable(_ context.Context, _ []byte) error

func (*NoopKVStore) DropTable

func (*NoopKVStore) DropTable(_ context.Context, _ []byte) error

func (*NoopKVStore) GetInternalDatabase

func (*NoopKVStore) GetInternalDatabase() (any, error)

func (*NoopKVStore) GetTableStats

func (*NoopKVStore) GetTableStats(_ context.Context, _ []byte) (*TableStats, error)

func (*NoopKVStore) TableSize

func (*NoopKVStore) TableSize(_ context.Context, _ []byte) (int64, error)

type NoopTx

type NoopTx struct {
	*NoopKV
}

func (*NoopTx) Commit

func (*NoopTx) Commit(context.Context) error

func (*NoopTx) IsRetriable

func (*NoopTx) IsRetriable() bool

func (*NoopTx) Rollback

func (*NoopTx) Rollback(context.Context) error

type ShardedAtomics

type ShardedAtomics interface {
	AtomicAddTx(ctx context.Context, tx Tx, table []byte, key Key, value int64) error
	AtomicReadTx(ctx context.Context, tx Tx, table []byte, key Key) (int64, error)
	AtomicAdd(ctx context.Context, table []byte, key Key, value int64) error
	AtomicRead(ctx context.Context, table []byte, key Key) (int64, error)
}

func NewShardedAtomics

func NewShardedAtomics(kv TxStore) ShardedAtomics

type StatsTx

type StatsTx struct {
	Tx
	// contains filtered or unexported fields
}

func (*StatsTx) Delete

func (tx *StatsTx) Delete(ctx context.Context, table []byte, key Key) error

func (*StatsTx) Insert

func (tx *StatsTx) Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) error

func (*StatsTx) Replace

func (tx *StatsTx) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, isUpdate bool) error

type StatsTxStore

type StatsTxStore struct {
	TxStore
	// contains filtered or unexported fields
}

func (*StatsTxStore) BeginTx

func (store *StatsTxStore) BeginTx(ctx context.Context) (Tx, error)

func (*StatsTxStore) DropTable

func (store *StatsTxStore) DropTable(ctx context.Context, name []byte) error

func (*StatsTxStore) GetTableStats

func (store *StatsTxStore) GetTableStats(ctx context.Context, table []byte) (*TableStats, error)

type StoreErrCode

type StoreErrCode byte
const (
	ErrCodeInvalid                 StoreErrCode = 0x00
	ErrCodeDuplicateKey            StoreErrCode = 0x01
	ErrCodeConflictingTransaction  StoreErrCode = 0x02
	ErrCodeTransactionMaxDuration  StoreErrCode = 0x03
	ErrCodeTransactionTimedOut     StoreErrCode = 0x04
	ErrCodeTransactionNotCommitted StoreErrCode = 0x05
	ErrCodeValueSizeExceeded       StoreErrCode = 0x06
	ErrCodeTransactionSizeExceeded StoreErrCode = 0x07
	ErrCodeNotFound                StoreErrCode = 0x08
)

type StoreError

type StoreError struct {
	// contains filtered or unexported fields
}

func (StoreError) Code

func (se StoreError) Code() StoreErrCode

func (StoreError) Error

func (se StoreError) Error() string

func (StoreError) Msg

func (se StoreError) Msg() string

type TableStats

type TableStats struct {
	StoredBytes      int64
	OnDiskSize       int64
	RowCount         int64
	SearchFieldsSize int64
}

type Tx

type Tx interface {
	KV
	Commit(context.Context) error
	Rollback(context.Context) error
	IsRetriable() bool
	RangeSize(ctx context.Context, table []byte, lkey Key, rkey Key) (int64, error)
}

type TxImplWithMetrics

type TxImplWithMetrics struct {
	// contains filtered or unexported fields
}

func (*TxImplWithMetrics) AtomicAdd

func (m *TxImplWithMetrics) AtomicAdd(ctx context.Context, table []byte, key Key, value int64) (err error)

func (*TxImplWithMetrics) AtomicRead

func (m *TxImplWithMetrics) AtomicRead(ctx context.Context, table []byte, key Key) (value int64, err error)

func (*TxImplWithMetrics) AtomicReadPrefix

func (m *TxImplWithMetrics) AtomicReadPrefix(ctx context.Context, table []byte, key Key, isSnapshot bool) (iter AtomicIterator, err error)

func (*TxImplWithMetrics) AtomicReadRange

func (m *TxImplWithMetrics) AtomicReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (iter AtomicIterator, err error)

func (*TxImplWithMetrics) Commit

func (m *TxImplWithMetrics) Commit(ctx context.Context) (err error)

func (*TxImplWithMetrics) Delete

func (m *TxImplWithMetrics) Delete(ctx context.Context, table []byte, key Key) (err error)

func (*TxImplWithMetrics) Get

func (m *TxImplWithMetrics) Get(ctx context.Context, key []byte, isSnapshot bool) (val Future)

func (*TxImplWithMetrics) GetMetadata

func (m *TxImplWithMetrics) GetMetadata(ctx context.Context, table []byte, key Key) (data *internal.TableData, err error)

func (*TxImplWithMetrics) Insert

func (m *TxImplWithMetrics) Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) (err error)

func (*TxImplWithMetrics) IsRetriable

func (m *TxImplWithMetrics) IsRetriable() bool

func (*TxImplWithMetrics) RangeSize

func (m *TxImplWithMetrics) RangeSize(ctx context.Context, table []byte, lkey Key, rkey Key) (size int64, err error)

func (*TxImplWithMetrics) Read

func (m *TxImplWithMetrics) Read(ctx context.Context, table []byte, key Key, reverse bool) (it Iterator, err error)

func (*TxImplWithMetrics) ReadRange

func (m *TxImplWithMetrics) ReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool, reverse bool) (it Iterator, err error)

func (*TxImplWithMetrics) Replace

func (m *TxImplWithMetrics) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, isUpdate bool) (err error)

func (*TxImplWithMetrics) Rollback

func (m *TxImplWithMetrics) Rollback(ctx context.Context) (err error)

func (*TxImplWithMetrics) SetVersionstampedKey

func (m *TxImplWithMetrics) SetVersionstampedKey(ctx context.Context, key []byte, value []byte) (err error)

func (*TxImplWithMetrics) SetVersionstampedValue

func (m *TxImplWithMetrics) SetVersionstampedValue(ctx context.Context, key []byte, value []byte) (err error)

type TxStore

type TxStore interface {
	BeginTx(ctx context.Context) (Tx, error)
	CreateTable(ctx context.Context, name []byte) error
	DropTable(ctx context.Context, name []byte) error
	GetInternalDatabase() (any, error) // TODO: CDC remove workaround
	GetTableStats(ctx context.Context, name []byte) (*TableStats, error)
}

func NewChunkStore

func NewChunkStore(store TxStore, enabled bool) TxStore

func NewCompressionStore

func NewCompressionStore(store TxStore, enabled bool) TxStore

func NewKeyValueStoreWithMetrics

func NewKeyValueStoreWithMetrics(txStore TxStore) TxStore

func NewListenerStore

func NewListenerStore(store TxStore) TxStore

func NewStatsStore

func NewStatsStore(store TxStore) TxStore

func NewTxStore

func NewTxStore(kv *fdbkv) TxStore

func StoreForDatabase

func StoreForDatabase(cfg *config.Config) (TxStore, error)

func StoreForSearch

func StoreForSearch(cfg *config.Config) (TxStore, error)

type TxStoreWithMetrics

type TxStoreWithMetrics struct {
	// contains filtered or unexported fields
}

func (*TxStoreWithMetrics) BeginTx

func (m *TxStoreWithMetrics) BeginTx(ctx context.Context) (Tx, error)

func (*TxStoreWithMetrics) CreateTable

func (m *TxStoreWithMetrics) CreateTable(ctx context.Context, name []byte) (err error)

func (*TxStoreWithMetrics) DropTable

func (m *TxStoreWithMetrics) DropTable(ctx context.Context, name []byte) (err error)

func (*TxStoreWithMetrics) GetInternalDatabase

func (m *TxStoreWithMetrics) GetInternalDatabase() (any, error)

func (*TxStoreWithMetrics) GetTableStats

func (m *TxStoreWithMetrics) GetTableStats(ctx context.Context, name []byte) (stats *TableStats, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL