Documentation ¶
Index ¶
- Constants
- Variables
- func IsTimedOut(err error) bool
- func NewStoreError(fdbCode int, code StoreErrCode, msg string, args ...interface{}) error
- func WrapEventListenerCtx(ctx context.Context) context.Context
- type AtomicIterator
- type AtomicIteratorImpl
- type ChunkIterator
- type ChunkTx
- func (t ChunkTx) AtomicAdd(_ context.Context, table []byte, key Key, value int64) error
- func (t ChunkTx) AtomicRead(_ context.Context, table []byte, key Key) (int64, error)
- func (t ChunkTx) AtomicReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (AtomicIterator, error)
- func (t ChunkTx) Commit(_ context.Context) error
- func (t ChunkTx) Delete(ctx context.Context, table []byte, key Key) error
- func (t ChunkTx) DeleteRange(ctx context.Context, table []byte, lKey Key, rKey Key) error
- func (t ChunkTx) Get(_ context.Context, key []byte, isSnapshot bool) (Future, error)
- func (tx *ChunkTx) Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) error
- func (t ChunkTx) IsRetriable() bool
- func (t ChunkTx) RangeSize(ctx context.Context, table []byte, lKey Key, rKey Key) (int64, error)
- func (tx *ChunkTx) Read(ctx context.Context, table []byte, key Key) (Iterator, error)
- func (tx *ChunkTx) ReadRange(ctx context.Context, table []byte, lKey Key, rKey Key, isSnapshot bool) (Iterator, error)
- func (tx *ChunkTx) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, ...) error
- func (t ChunkTx) Rollback(_ context.Context) error
- func (t ChunkTx) SetVersionstampedKey(_ context.Context, key []byte, value []byte) error
- func (t ChunkTx) SetVersionstampedValue(_ context.Context, key []byte, value []byte) error
- type ChunkTxStore
- func (d ChunkTxStore) AtomicAdd(ctx context.Context, table []byte, key Key, value int64) error
- func (d ChunkTxStore) AtomicRead(ctx context.Context, table []byte, key Key) (int64, error)
- func (d ChunkTxStore) AtomicReadRange(ctx context.Context, table []byte, lKey Key, rKey Key, isSnapshot bool) (AtomicIterator, error)
- func (txStore *ChunkTxStore) BeginTx(ctx context.Context) (Tx, error)
- func (d ChunkTxStore) CreateTable(_ context.Context, name []byte) error
- func (d ChunkTxStore) Delete(ctx context.Context, table []byte, key Key) error
- func (d ChunkTxStore) DropTable(ctx context.Context, name []byte) error
- func (d ChunkTxStore) Get(ctx context.Context, key []byte, isSnapshot bool) (Future, error)
- func (d ChunkTxStore) Insert(ctx context.Context, table []byte, key Key, data []byte) error
- func (d ChunkTxStore) Read(ctx context.Context, table []byte, key Key) (baseIterator, error)
- func (d ChunkTxStore) ReadRange(ctx context.Context, table []byte, lKey Key, rKey Key, isSnapshot bool) (baseIterator, error)
- func (d ChunkTxStore) Replace(ctx context.Context, table []byte, key Key, data []byte, isUpdate bool) error
- func (d ChunkTxStore) SetVersionstampedKey(ctx context.Context, key []byte, value []byte) error
- func (d ChunkTxStore) SetVersionstampedValue(ctx context.Context, key []byte, value []byte) error
- func (d ChunkTxStore) TableSize(ctx context.Context, name []byte) (int64, error)
- type DefaultListener
- type Event
- type EventListener
- type EventListenerCtxKey
- type FdbBaseKeyValue
- type Future
- type Iterator
- type KV
- type Key
- type KeyPart
- type KeyValue
- type KeyValueIterator
- type KeyValueTx
- func (t KeyValueTx) AtomicAdd(_ context.Context, table []byte, key Key, value int64) error
- func (t KeyValueTx) AtomicRead(_ context.Context, table []byte, key Key) (int64, error)
- func (t KeyValueTx) AtomicReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (AtomicIterator, error)
- func (t KeyValueTx) Commit(_ context.Context) error
- func (t KeyValueTx) Delete(ctx context.Context, table []byte, key Key) error
- func (t KeyValueTx) DeleteRange(ctx context.Context, table []byte, lKey Key, rKey Key) error
- func (t KeyValueTx) Get(_ context.Context, key []byte, isSnapshot bool) (Future, error)
- func (tx *KeyValueTx) Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) error
- func (t KeyValueTx) IsRetriable() bool
- func (t KeyValueTx) RangeSize(ctx context.Context, table []byte, lKey Key, rKey Key) (int64, error)
- func (tx *KeyValueTx) Read(ctx context.Context, table []byte, key Key) (Iterator, error)
- func (tx *KeyValueTx) ReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (Iterator, error)
- func (tx *KeyValueTx) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, ...) error
- func (t KeyValueTx) Rollback(_ context.Context) error
- func (t KeyValueTx) SetVersionstampedKey(_ context.Context, key []byte, value []byte) error
- func (t KeyValueTx) SetVersionstampedValue(_ context.Context, key []byte, value []byte) error
- type KeyValueTxStore
- func (d KeyValueTxStore) AtomicAdd(ctx context.Context, table []byte, key Key, value int64) error
- func (d KeyValueTxStore) AtomicRead(ctx context.Context, table []byte, key Key) (int64, error)
- func (d KeyValueTxStore) AtomicReadRange(ctx context.Context, table []byte, lKey Key, rKey Key, isSnapshot bool) (AtomicIterator, error)
- func (k *KeyValueTxStore) BeginTx(ctx context.Context) (Tx, error)
- func (d KeyValueTxStore) CreateTable(_ context.Context, name []byte) error
- func (d KeyValueTxStore) Delete(ctx context.Context, table []byte, key Key) error
- func (d KeyValueTxStore) DropTable(ctx context.Context, name []byte) error
- func (d KeyValueTxStore) Get(ctx context.Context, key []byte, isSnapshot bool) (Future, error)
- func (k *KeyValueTxStore) GetInternalDatabase() (interface{}, error)
- func (d KeyValueTxStore) Insert(ctx context.Context, table []byte, key Key, data []byte) error
- func (d KeyValueTxStore) Read(ctx context.Context, table []byte, key Key) (baseIterator, error)
- func (d KeyValueTxStore) ReadRange(ctx context.Context, table []byte, lKey Key, rKey Key, isSnapshot bool) (baseIterator, error)
- func (d KeyValueTxStore) Replace(ctx context.Context, table []byte, key Key, data []byte, isUpdate bool) error
- func (d KeyValueTxStore) SetVersionstampedKey(ctx context.Context, key []byte, value []byte) error
- func (d KeyValueTxStore) SetVersionstampedValue(ctx context.Context, key []byte, value []byte) error
- func (d KeyValueTxStore) TableSize(ctx context.Context, name []byte) (int64, error)
- type NoopEventListener
- type NoopFDBTypeIterator
- type NoopIterator
- type NoopKV
- func (n *NoopKV) AtomicAdd(ctx context.Context, table []byte, key Key, value int64) error
- func (n *NoopKV) AtomicRead(ctx context.Context, table []byte, key Key) (int64, error)
- func (n *NoopKV) AtomicReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (AtomicIterator, error)
- func (n *NoopKV) Delete(ctx context.Context, table []byte, key Key) error
- func (n *NoopKV) Get(ctx context.Context, key []byte, isSnapshot bool) (Future, error)
- func (n *NoopKV) Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) error
- func (n *NoopKV) RangeSize(ctx context.Context, table []byte, lkey Key, rkey Key) (int64, error)
- func (n *NoopKV) Read(ctx context.Context, table []byte, key Key) (Iterator, error)
- func (n *NoopKV) ReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (Iterator, error)
- func (n *NoopKV) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, ...) error
- func (n *NoopKV) SetVersionstampedKey(ctx context.Context, key []byte, value []byte) error
- func (n *NoopKV) SetVersionstampedValue(ctx context.Context, key []byte, value []byte) error
- type NoopKVStore
- func (n *NoopKVStore) BeginTx(_ context.Context) (Tx, error)
- func (n *NoopKVStore) CreateTable(_ context.Context, _ []byte) error
- func (n *NoopKVStore) DropTable(_ context.Context, _ []byte) error
- func (n *NoopKVStore) GetInternalDatabase() (interface{}, error)
- func (n *NoopKVStore) TableSize(_ context.Context, _ []byte) (int64, error)
- type NoopTx
- type StoreErrCode
- type StoreError
- type Tx
- type TxImplWithMetrics
- func (m *TxImplWithMetrics) AtomicAdd(ctx context.Context, table []byte, key Key, value int64) (err error)
- func (m *TxImplWithMetrics) AtomicRead(ctx context.Context, table []byte, key Key) (value int64, err error)
- func (m *TxImplWithMetrics) AtomicReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (iter AtomicIterator, err error)
- func (m *TxImplWithMetrics) Commit(ctx context.Context) (err error)
- func (m *TxImplWithMetrics) Delete(ctx context.Context, table []byte, key Key) (err error)
- func (m *TxImplWithMetrics) Get(ctx context.Context, key []byte, isSnapshot bool) (val Future, err error)
- func (m *TxImplWithMetrics) Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) (err error)
- func (m *TxImplWithMetrics) IsRetriable() bool
- func (m *TxImplWithMetrics) RangeSize(ctx context.Context, table []byte, lkey Key, rkey Key) (size int64, err error)
- func (m *TxImplWithMetrics) Read(ctx context.Context, table []byte, key Key) (it Iterator, err error)
- func (m *TxImplWithMetrics) ReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (it Iterator, err error)
- func (m *TxImplWithMetrics) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, ...) (err error)
- func (m *TxImplWithMetrics) Rollback(ctx context.Context) (err error)
- func (m *TxImplWithMetrics) SetVersionstampedKey(ctx context.Context, key []byte, value []byte) (err error)
- func (m *TxImplWithMetrics) SetVersionstampedValue(ctx context.Context, key []byte, value []byte) (err error)
- type TxStore
- type TxStoreWithMetrics
- func (m *TxStoreWithMetrics) BeginTx(ctx context.Context) (Tx, error)
- func (m *TxStoreWithMetrics) CreateTable(ctx context.Context, name []byte) (err error)
- func (m *TxStoreWithMetrics) DropTable(ctx context.Context, name []byte) (err error)
- func (m *TxStoreWithMetrics) GetInternalDatabase() (k interface{}, err error)
- func (m *TxStoreWithMetrics) TableSize(ctx context.Context, name []byte) (size int64, err error)
Constants ¶
const ( KB = 1000 KB99 = 99 * KB )
const ( InsertEvent = "insert" ReplaceEvent = "replace" UpdateEvent = "update" UpdateRangeEvent = "updateRange" DeleteEvent = "delete" DeleteRangeEvent = "deleteRange" )
Variables ¶
var ( // ErrDuplicateKey is returned when an insert call is made for a key that already exists. ErrDuplicateKey = NewStoreError(0, ErrCodeDuplicateKey, "duplicate key value, violates key constraint") // ErrConflictingTransaction is returned when there are conflicting transactions. ErrConflictingTransaction = NewStoreError(1020, ErrCodeConflictingTransaction, "transaction not committed due to conflict with another transaction") // ErrTransactionMaxDurationReached is returned when a transaction runs beyond 5 seconds. ErrTransactionMaxDurationReached = NewStoreError(1007, ErrCodeTransactionMaxDuration, "transaction is old to perform reads or be committed") // ErrTransactionTimedOut is returned when fdb aborts the transaction because of the 5-second limit. ErrTransactionTimedOut = NewStoreError(1031, ErrCodeTransactionTimedOut, "operation aborted because the transaction timed out") ErrTransactionNotCommitted = NewStoreError(1021, ErrCodeTransactionNotCommitted, "transaction may or may not have committed") ErrValueSizeExceeded = NewStoreError(2103, ErrCodeValueSizeExceeded, "document exceeds limit") ErrTransactionSizeExceeded = NewStoreError(2101, ErrCodeTransactionSizeExceeded, "transaction exceeds limit") )
Functions ¶
func IsTimedOut ¶
func NewStoreError ¶
func NewStoreError(fdbCode int, code StoreErrCode, msg string, args ...interface{}) error
Types ¶
type AtomicIterator ¶
type AtomicIterator interface { Next(value *FdbBaseKeyValue[int64]) bool Err() error }
type AtomicIteratorImpl ¶
type AtomicIteratorImpl struct {
// contains filtered or unexported fields
}
func (*AtomicIteratorImpl) Err ¶
func (i *AtomicIteratorImpl) Err() error
func (*AtomicIteratorImpl) Next ¶
func (i *AtomicIteratorImpl) Next(value *FdbBaseKeyValue[int64]) bool
type ChunkIterator ¶
type ChunkIterator struct { Iterator // contains filtered or unexported fields }
func (*ChunkIterator) Err ¶
func (it *ChunkIterator) Err() error
func (*ChunkIterator) Next ¶
func (it *ChunkIterator) Next(value *KeyValue) bool
type ChunkTx ¶
type ChunkTx struct {
*KeyValueTx
}
func (ChunkTx) AtomicRead ¶
func (ChunkTx) AtomicReadRange ¶
func (ChunkTx) DeleteRange ¶
func (ChunkTx) IsRetriable ¶
func (t ChunkTx) IsRetriable() bool
IsRetriable returns true if the transaction can be retried after an error.
func (ChunkTx) RangeSize ¶
RangeSize calculates the approximate size, in bytes, of a table range. This is an estimate, and it will not be very accurate for ranges smaller than 3MB.
func (*ChunkTx) Read ¶
Read needs to return a chunk iterator so that it can merge the chunks and return the merged value to the caller.
func (ChunkTx) SetVersionstampedKey ¶
type ChunkTxStore ¶
type ChunkTxStore struct {
*KeyValueTxStore
}
ChunkTxStore is used as a layer on top of KeyValueTxStore. The idea is that the chunk store will automatically split the user payload if it is greater than 99KB. It adds some metadata in the zeroth chunk, such as the total number of chunks, so that it can easily merge the value again. The attributes of the data passed by the caller are only needed in the first chunk; the remaining chunks only carry the body. The chunk number is appended at the end of the key in the format "__<chunk number>". This number is read from the key during merging, so no information apart from the total number of chunks is persisted in the value.
func (ChunkTxStore) AtomicRead ¶
func (ChunkTxStore) AtomicReadRange ¶
func (*ChunkTxStore) BeginTx ¶
func (txStore *ChunkTxStore) BeginTx(ctx context.Context) (Tx, error)
func (ChunkTxStore) CreateTable ¶
func (ChunkTxStore) SetVersionstampedKey ¶
func (ChunkTxStore) SetVersionstampedValue ¶
type DefaultListener ¶
type DefaultListener struct {
Events []*Event
}
func (*DefaultListener) GetEvents ¶
func (l *DefaultListener) GetEvents() []*Event
func (*DefaultListener) OnClearRange ¶
func (l *DefaultListener) OnClearRange(op string, table []byte, lKey []byte, rKey []byte)
type EventListener ¶
type EventListener interface { // OnSet buffers insert/replace/update events OnSet(op string, table []byte, key []byte, data []byte) // OnClearRange buffers delete events OnClearRange(op string, table []byte, lKey []byte, rKey []byte) // GetEvents is used to access buffered events. These events may be shared by different participants, so callers are // strongly discouraged from modifying an event; if needed, copy it to some other buffer. Once the transaction completes, the // session may discard all the buffered events. GetEvents() []*Event }
EventListener is a listener that buffers all the changes in a transaction. It is attached to the context by the server layer, and it is only responsible for buffering the events; it doesn't participate in the outcome of the transaction, i.e. EventListener has no knowledge of whether the transaction was committed or rolled back. The lifecycle of this listener is managed by QuerySession in the server package.
func GetEventListener ¶
func GetEventListener(ctx context.Context) EventListener
type EventListenerCtxKey ¶
type EventListenerCtxKey struct{}
type FdbBaseKeyValue ¶
FdbBaseKeyValue is the key-value type used when we are not iterating over TableData.
type Future ¶
type Future fdb.FutureByteSlice
type KV ¶
type KV interface { Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) error Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, isUpdate bool) error Delete(ctx context.Context, table []byte, key Key) error Read(ctx context.Context, table []byte, key Key) (Iterator, error) ReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (Iterator, error) SetVersionstampedValue(ctx context.Context, key []byte, value []byte) error SetVersionstampedKey(ctx context.Context, key []byte, value []byte) error Get(ctx context.Context, key []byte, isSnapshot bool) (Future, error) AtomicAdd(ctx context.Context, table []byte, key Key, value int64) error AtomicRead(ctx context.Context, table []byte, key Key) (int64, error) AtomicReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (AtomicIterator, error) }
type KeyValueIterator ¶
type KeyValueIterator struct {
// contains filtered or unexported fields
}
func (*KeyValueIterator) Err ¶
func (i *KeyValueIterator) Err() error
func (*KeyValueIterator) Next ¶
func (i *KeyValueIterator) Next(value *KeyValue) bool
type KeyValueTx ¶
type KeyValueTx struct {
// contains filtered or unexported fields
}
func (KeyValueTx) AtomicRead ¶
func (KeyValueTx) AtomicReadRange ¶
func (KeyValueTx) DeleteRange ¶
func (KeyValueTx) IsRetriable ¶
func (t KeyValueTx) IsRetriable() bool
IsRetriable returns true if the transaction can be retried after an error.
func (KeyValueTx) RangeSize ¶
RangeSize calculates the approximate size, in bytes, of a table range. This is an estimate, and it will not be very accurate for ranges smaller than 3MB.
func (KeyValueTx) SetVersionstampedKey ¶
type KeyValueTxStore ¶
type KeyValueTxStore struct {
// contains filtered or unexported fields
}
func (KeyValueTxStore) AtomicRead ¶
func (KeyValueTxStore) AtomicReadRange ¶
func (*KeyValueTxStore) BeginTx ¶
func (k *KeyValueTxStore) BeginTx(ctx context.Context) (Tx, error)
func (KeyValueTxStore) CreateTable ¶
func (*KeyValueTxStore) GetInternalDatabase ¶
func (k *KeyValueTxStore) GetInternalDatabase() (interface{}, error)
func (KeyValueTxStore) SetVersionstampedKey ¶
func (KeyValueTxStore) SetVersionstampedValue ¶
type NoopEventListener ¶
type NoopEventListener struct{}
func (*NoopEventListener) GetEvents ¶
func (l *NoopEventListener) GetEvents() []*Event
func (*NoopEventListener) OnClearRange ¶
func (l *NoopEventListener) OnClearRange(op string, table []byte, lKey []byte, rKey []byte)
type NoopFDBTypeIterator ¶
type NoopFDBTypeIterator struct{}
func (*NoopFDBTypeIterator) Err ¶
func (n *NoopFDBTypeIterator) Err() error
func (*NoopFDBTypeIterator) Next ¶
func (n *NoopFDBTypeIterator) Next(value *FdbBaseKeyValue[int64]) bool
type NoopIterator ¶
type NoopIterator struct{}
func (*NoopIterator) Err ¶
func (n *NoopIterator) Err() error
func (*NoopIterator) Next ¶
func (n *NoopIterator) Next(value *KeyValue) bool
type NoopKV ¶
type NoopKV struct{}
func (*NoopKV) AtomicRead ¶
func (*NoopKV) AtomicReadRange ¶
func (*NoopKV) SetVersionstampedKey ¶
type NoopKVStore ¶
type NoopKVStore struct {
*NoopKV
}
NoopKVStore is a noop store, useful when we need to profile or debug only the compute path without involving the storage layer. It can be initialized in main.go instead of the default kvStore.
func (*NoopKVStore) CreateTable ¶
func (n *NoopKVStore) CreateTable(_ context.Context, _ []byte) error
func (*NoopKVStore) GetInternalDatabase ¶
func (n *NoopKVStore) GetInternalDatabase() (interface{}, error)
type StoreErrCode ¶
type StoreErrCode byte
const ( ErrCodeInvalid StoreErrCode = 0x00 ErrCodeDuplicateKey StoreErrCode = 0x01 ErrCodeConflictingTransaction StoreErrCode = 0x02 ErrCodeTransactionMaxDuration StoreErrCode = 0x03 ErrCodeTransactionTimedOut StoreErrCode = 0x04 ErrCodeTransactionNotCommitted StoreErrCode = 0x05 ErrCodeValueSizeExceeded StoreErrCode = 0x06 ErrCodeTransactionSizeExceeded StoreErrCode = 0x07 )
type StoreError ¶
type StoreError struct {
// contains filtered or unexported fields
}
func (StoreError) Code ¶
func (se StoreError) Code() StoreErrCode
func (StoreError) Error ¶
func (se StoreError) Error() string
func (StoreError) Msg ¶
func (se StoreError) Msg() string
type TxImplWithMetrics ¶
type TxImplWithMetrics struct {
// contains filtered or unexported fields
}
func (*TxImplWithMetrics) AtomicRead ¶
func (*TxImplWithMetrics) AtomicReadRange ¶
func (m *TxImplWithMetrics) AtomicReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (iter AtomicIterator, err error)
func (*TxImplWithMetrics) Commit ¶
func (m *TxImplWithMetrics) Commit(ctx context.Context) (err error)
func (*TxImplWithMetrics) IsRetriable ¶
func (m *TxImplWithMetrics) IsRetriable() bool
func (*TxImplWithMetrics) Rollback ¶
func (m *TxImplWithMetrics) Rollback(ctx context.Context) (err error)
func (*TxImplWithMetrics) SetVersionstampedKey ¶
func (*TxImplWithMetrics) SetVersionstampedValue ¶
type TxStore ¶
type TxStore interface { BeginTx(ctx context.Context) (Tx, error) CreateTable(ctx context.Context, name []byte) error DropTable(ctx context.Context, name []byte) error GetInternalDatabase() (interface{}, error) // TODO: CDC remove workaround TableSize(ctx context.Context, name []byte) (int64, error) }
func NewChunkStore ¶
func NewChunkStore(cfg *config.FoundationDBConfig) (TxStore, error)
func NewTxStore ¶
func NewTxStore(cfg *config.FoundationDBConfig) (TxStore, error)
type TxStoreWithMetrics ¶
type TxStoreWithMetrics struct {
// contains filtered or unexported fields
}
func (*TxStoreWithMetrics) BeginTx ¶
func (m *TxStoreWithMetrics) BeginTx(ctx context.Context) (Tx, error)
func (*TxStoreWithMetrics) CreateTable ¶
func (m *TxStoreWithMetrics) CreateTable(ctx context.Context, name []byte) (err error)
func (*TxStoreWithMetrics) DropTable ¶
func (m *TxStoreWithMetrics) DropTable(ctx context.Context, name []byte) (err error)
func (*TxStoreWithMetrics) GetInternalDatabase ¶
func (m *TxStoreWithMetrics) GetInternalDatabase() (k interface{}, err error)