Documentation ¶
Index ¶
- Constants
- Variables
- func IsTimedOut(err error) bool
- func NewStoreError(fdbCode int, code StoreErrCode, msg string, args ...interface{}) error
- func WrapEventListenerCtx(ctx context.Context) context.Context
- type AtomicIterator
- type AtomicIteratorImpl
- type DefaultListener
- type Event
- type EventListener
- type EventListenerCtxKey
- type FdbBaseKeyValue
- type Future
- type Iterator
- type IteratorImpl
- type KV
- type Key
- type KeyPart
- type KeyValue
- type KeyValueStore
- type KeyValueStoreImpl
- func (k *KeyValueStoreImpl) AtomicAdd(ctx context.Context, table []byte, key Key, value int64) error
- func (k *KeyValueStoreImpl) AtomicRead(ctx context.Context, table []byte, key Key) (int64, error)
- func (k *KeyValueStoreImpl) AtomicReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (AtomicIterator, error)
- func (k *KeyValueStoreImpl) BeginTx(ctx context.Context) (Tx, error)
- func (d KeyValueStoreImpl) CreateTable(_ context.Context, name []byte) error
- func (d KeyValueStoreImpl) Delete(ctx context.Context, table []byte, key Key) error
- func (d KeyValueStoreImpl) DeleteRange(ctx context.Context, table []byte, lKey Key, rKey Key) error
- func (d KeyValueStoreImpl) DropTable(ctx context.Context, name []byte) error
- func (d KeyValueStoreImpl) Get(ctx context.Context, key []byte, isSnapshot bool) (Future, error)
- func (k *KeyValueStoreImpl) GetInternalDatabase() (interface{}, error)
- func (k *KeyValueStoreImpl) Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) error
- func (k *KeyValueStoreImpl) Read(ctx context.Context, table []byte, key Key) (Iterator, error)
- func (k *KeyValueStoreImpl) ReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (Iterator, error)
- func (k *KeyValueStoreImpl) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, ...) error
- func (d KeyValueStoreImpl) SetVersionstampedKey(ctx context.Context, key []byte, value []byte) error
- func (d KeyValueStoreImpl) SetVersionstampedValue(ctx context.Context, key []byte, value []byte) error
- func (d KeyValueStoreImpl) TableSize(ctx context.Context, name []byte) (int64, error)
- func (k *KeyValueStoreImpl) Update(ctx context.Context, table []byte, key Key, ...) (int32, error)
- func (k *KeyValueStoreImpl) UpdateRange(ctx context.Context, table []byte, lKey Key, rKey Key, ...) (int32, error)
- type KeyValueStoreImplWithMetrics
- func (m *KeyValueStoreImplWithMetrics) AtomicAdd(ctx context.Context, table []byte, key Key, value int64) (err error)
- func (m *KeyValueStoreImplWithMetrics) AtomicRead(ctx context.Context, table []byte, key Key) (value int64, err error)
- func (m *KeyValueStoreImplWithMetrics) AtomicReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (iter AtomicIterator, err error)
- func (m *KeyValueStoreImplWithMetrics) BeginTx(ctx context.Context) (Tx, error)
- func (m *KeyValueStoreImplWithMetrics) CreateTable(ctx context.Context, name []byte) (err error)
- func (m *KeyValueStoreImplWithMetrics) Delete(ctx context.Context, table []byte, key Key) (err error)
- func (m *KeyValueStoreImplWithMetrics) DeleteRange(ctx context.Context, table []byte, lKey Key, rKey Key) (err error)
- func (m *KeyValueStoreImplWithMetrics) DropTable(ctx context.Context, name []byte) (err error)
- func (m *KeyValueStoreImplWithMetrics) Get(ctx context.Context, key []byte, isSnapshot bool) (val Future, err error)
- func (m *KeyValueStoreImplWithMetrics) GetInternalDatabase() (k interface{}, err error)
- func (m *KeyValueStoreImplWithMetrics) Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) (err error)
- func (m *KeyValueStoreImplWithMetrics) Read(ctx context.Context, table []byte, key Key) (it Iterator, err error)
- func (m *KeyValueStoreImplWithMetrics) ReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (it Iterator, err error)
- func (m *KeyValueStoreImplWithMetrics) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, ...) (err error)
- func (m *KeyValueStoreImplWithMetrics) SetVersionstampedKey(ctx context.Context, key []byte, value []byte) (err error)
- func (m *KeyValueStoreImplWithMetrics) SetVersionstampedValue(ctx context.Context, key []byte, value []byte) (err error)
- func (m *KeyValueStoreImplWithMetrics) TableSize(ctx context.Context, name []byte) (size int64, err error)
- func (m *KeyValueStoreImplWithMetrics) Update(ctx context.Context, table []byte, key Key, ...) (encoded int32, err error)
- func (m *KeyValueStoreImplWithMetrics) UpdateRange(ctx context.Context, table []byte, lKey Key, rKey Key, ...) (encoded int32, err error)
- type NoopEventListener
- type NoopFDBTypeIterator
- type NoopIterator
- type NoopKV
- func (n *NoopKV) AtomicAdd(ctx context.Context, table []byte, key Key, value int64) error
- func (n *NoopKV) AtomicRead(ctx context.Context, table []byte, key Key) (int64, error)
- func (n *NoopKV) AtomicReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (AtomicIterator, error)
- func (n *NoopKV) Delete(ctx context.Context, table []byte, key Key) error
- func (n *NoopKV) DeleteRange(ctx context.Context, table []byte, lKey Key, rKey Key) error
- func (n *NoopKV) Get(ctx context.Context, key []byte, isSnapshot bool) (Future, error)
- func (n *NoopKV) Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) error
- func (n *NoopKV) RangeSize(ctx context.Context, table []byte, lkey Key, rkey Key) (int64, error)
- func (n *NoopKV) Read(ctx context.Context, table []byte, key Key) (Iterator, error)
- func (n *NoopKV) ReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (Iterator, error)
- func (n *NoopKV) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, ...) error
- func (n *NoopKV) SetVersionstampedKey(ctx context.Context, key []byte, value []byte) error
- func (n *NoopKV) SetVersionstampedValue(ctx context.Context, key []byte, value []byte) error
- func (n *NoopKV) Update(ctx context.Context, table []byte, key Key, ...) (int32, error)
- func (n *NoopKV) UpdateRange(ctx context.Context, table []byte, lKey Key, rKey Key, ...) (int32, error)
- type NoopKVStore
- func (n *NoopKVStore) BeginTx(_ context.Context) (Tx, error)
- func (n *NoopKVStore) CreateTable(_ context.Context, _ []byte) error
- func (n *NoopKVStore) DropTable(_ context.Context, _ []byte) error
- func (n *NoopKVStore) GetInternalDatabase() (interface{}, error)
- func (n *NoopKVStore) TableSize(_ context.Context, _ []byte) (int64, error)
- type NoopTx
- type StoreErrCode
- type StoreError
- type Tx
- type TxImpl
- func (t TxImpl) AtomicAdd(_ context.Context, table []byte, key Key, value int64) error
- func (t TxImpl) AtomicRead(_ context.Context, table []byte, key Key) (int64, error)
- func (t TxImpl) AtomicReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (AtomicIterator, error)
- func (t TxImpl) Commit(_ context.Context) error
- func (t TxImpl) Delete(ctx context.Context, table []byte, key Key) error
- func (t TxImpl) DeleteRange(ctx context.Context, table []byte, lKey Key, rKey Key) error
- func (t TxImpl) Get(_ context.Context, key []byte, isSnapshot bool) (Future, error)
- func (tx *TxImpl) Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) error
- func (t TxImpl) IsRetriable() bool
- func (t TxImpl) RangeSize(ctx context.Context, table []byte, lKey Key, rKey Key) (int64, error)
- func (tx *TxImpl) Read(ctx context.Context, table []byte, key Key) (Iterator, error)
- func (tx *TxImpl) ReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (Iterator, error)
- func (tx *TxImpl) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, ...) error
- func (t TxImpl) Rollback(_ context.Context) error
- func (t TxImpl) SetVersionstampedKey(_ context.Context, key []byte, value []byte) error
- func (t TxImpl) SetVersionstampedValue(_ context.Context, key []byte, value []byte) error
- func (tx *TxImpl) Update(ctx context.Context, table []byte, key Key, ...) (int32, error)
- func (tx *TxImpl) UpdateRange(ctx context.Context, table []byte, lKey Key, rKey Key, ...) (int32, error)
- type TxImplWithMetrics
- func (m *TxImplWithMetrics) AtomicAdd(ctx context.Context, table []byte, key Key, value int64) (err error)
- func (m *TxImplWithMetrics) AtomicRead(ctx context.Context, table []byte, key Key) (value int64, err error)
- func (m *TxImplWithMetrics) AtomicReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (iter AtomicIterator, err error)
- func (m *TxImplWithMetrics) Commit(ctx context.Context) (err error)
- func (m *TxImplWithMetrics) Delete(ctx context.Context, table []byte, key Key) (err error)
- func (m *TxImplWithMetrics) DeleteRange(ctx context.Context, table []byte, lKey Key, rKey Key) (err error)
- func (m *TxImplWithMetrics) Get(ctx context.Context, key []byte, isSnapshot bool) (val Future, err error)
- func (m *TxImplWithMetrics) Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) (err error)
- func (m *TxImplWithMetrics) IsRetriable() bool
- func (m *TxImplWithMetrics) RangeSize(ctx context.Context, table []byte, lkey Key, rkey Key) (size int64, err error)
- func (m *TxImplWithMetrics) Read(ctx context.Context, table []byte, key Key) (it Iterator, err error)
- func (m *TxImplWithMetrics) ReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (it Iterator, err error)
- func (m *TxImplWithMetrics) Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, ...) (err error)
- func (m *TxImplWithMetrics) Rollback(ctx context.Context) (err error)
- func (m *TxImplWithMetrics) SetVersionstampedKey(ctx context.Context, key []byte, value []byte) (err error)
- func (m *TxImplWithMetrics) SetVersionstampedValue(ctx context.Context, key []byte, value []byte) (err error)
- func (m *TxImplWithMetrics) Update(ctx context.Context, table []byte, key Key, ...) (encoded int32, err error)
- func (m *TxImplWithMetrics) UpdateRange(ctx context.Context, table []byte, lKey Key, rKey Key, ...) (encoded int32, err error)
Constants ¶
const ( InsertEvent = "insert" ReplaceEvent = "replace" UpdateEvent = "update" UpdateRangeEvent = "updateRange" DeleteEvent = "delete" DeleteRangeEvent = "deleteRange" )
Variables ¶
var ( // ErrDuplicateKey is returned when an insert call is made for a key that already exist. ErrDuplicateKey = NewStoreError(0, ErrCodeDuplicateKey, "duplicate key value, violates key constraint") // ErrConflictingTransaction is returned when there are conflicting transactions. ErrConflictingTransaction = NewStoreError(1020, ErrCodeConflictingTransaction, "transaction not committed due to conflict with another transaction") // ErrTransactionMaxDurationReached is returned when transaction running beyond 5seconds. ErrTransactionMaxDurationReached = NewStoreError(1007, ErrCodeTransactionMaxDuration, "transaction is old to perform reads or be committed") // ErrTransactionTimedOut is returned when fdb abort the transaction because of 5seconds limit. ErrTransactionTimedOut = NewStoreError(1031, ErrCodeTransactionTimedOut, "operation aborted because the transaction timed out") ErrTransactionNotCommitted = NewStoreError(1021, ErrCodeTransactionNotCommitted, "transaction may or may not have committed") ErrValueSizeExceeded = NewStoreError(2103, ErrCodeValueSizeExceeded, "document exceeds limit") ErrTransactionSizeExceeded = NewStoreError(2101, ErrCodeTransactionSizeExceeded, "transaction exceeds limit") )
Functions ¶
func IsTimedOut ¶
func NewStoreError ¶
func NewStoreError(fdbCode int, code StoreErrCode, msg string, args ...interface{}) error
Types ¶
type AtomicIterator ¶
type AtomicIterator interface { Next(value *FdbBaseKeyValue[int64]) bool Err() error }
type AtomicIteratorImpl ¶
type AtomicIteratorImpl struct {
// contains filtered or unexported fields
}
func (*AtomicIteratorImpl) Err ¶
func (i *AtomicIteratorImpl) Err() error
func (*AtomicIteratorImpl) Next ¶
func (i *AtomicIteratorImpl) Next(value *FdbBaseKeyValue[int64]) bool
type DefaultListener ¶
type DefaultListener struct {
Events []*Event
}
func (*DefaultListener) GetEvents ¶
func (l *DefaultListener) GetEvents() []*Event
func (*DefaultListener) OnClearRange ¶
func (l *DefaultListener) OnClearRange(op string, table []byte, lKey []byte, rKey []byte)
type EventListener ¶
type EventListener interface { // OnSet buffers insert/replace/update events OnSet(op string, table []byte, key []byte, data []byte) // OnClearRange buffers delete events OnClearRange(op string, table []byte, lKey []byte, rKey []byte) // GetEvents is used to access buffered events. These events may be shared by different participants callers are // strongly discourage to modify the event and if needed copy it to some other buffer. Once transaction completes // session may discard all the buffered events. GetEvents() []*Event }
EventListener is listener to buffer all the changes in a transaction. It is attached by server layer in the context, and it is only responsible for buffering of the events but doesn't participate in the outcome of the transaction i.e. EventListener has no knowledge whether the transaction was committed or rolled back. The lifecycle of this listener is managed by QuerySession in server package.
func GetEventListener ¶
func GetEventListener(ctx context.Context) EventListener
type EventListenerCtxKey ¶
type EventListenerCtxKey struct{}
type FdbBaseKeyValue ¶
KeyValue type for when we not iterating over TableData.
type Future ¶
type Future fdb.FutureByteSlice
type IteratorImpl ¶
type IteratorImpl struct {
// contains filtered or unexported fields
}
func (*IteratorImpl) Err ¶
func (i *IteratorImpl) Err() error
func (*IteratorImpl) Next ¶
func (i *IteratorImpl) Next(value *KeyValue) bool
type KV ¶
type KV interface { Insert(ctx context.Context, table []byte, key Key, data *internal.TableData) error Replace(ctx context.Context, table []byte, key Key, data *internal.TableData, isUpdate bool) error Delete(ctx context.Context, table []byte, key Key) error DeleteRange(ctx context.Context, table []byte, lKey Key, rKey Key) error Read(ctx context.Context, table []byte, key Key) (Iterator, error) ReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (Iterator, error) Update(ctx context.Context, table []byte, key Key, apply func(*internal.TableData) (*internal.TableData, error)) (int32, error) UpdateRange(ctx context.Context, table []byte, lKey Key, rKey Key, apply func(*internal.TableData) (*internal.TableData, error)) (int32, error) SetVersionstampedValue(ctx context.Context, key []byte, value []byte) error SetVersionstampedKey(ctx context.Context, key []byte, value []byte) error Get(ctx context.Context, key []byte, isSnapshot bool) (Future, error) AtomicAdd(ctx context.Context, table []byte, key Key, value int64) error AtomicRead(ctx context.Context, table []byte, key Key) (int64, error) AtomicReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (AtomicIterator, error) }
type KeyValueStore ¶
type KeyValueStore interface { KV BeginTx(ctx context.Context) (Tx, error) CreateTable(ctx context.Context, name []byte) error DropTable(ctx context.Context, name []byte) error GetInternalDatabase() (interface{}, error) // TODO: CDC remove workaround TableSize(ctx context.Context, name []byte) (int64, error) }
func NewKeyValueStore ¶
func NewKeyValueStore(cfg *config.FoundationDBConfig) (KeyValueStore, error)
func NewKeyValueStoreWithMetrics ¶
func NewKeyValueStoreWithMetrics(cfg *config.FoundationDBConfig) (KeyValueStore, error)
type KeyValueStoreImpl ¶
type KeyValueStoreImpl struct {
// contains filtered or unexported fields
}
func (*KeyValueStoreImpl) AtomicRead ¶
func (*KeyValueStoreImpl) AtomicReadRange ¶
func (k *KeyValueStoreImpl) AtomicReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (AtomicIterator, error)
func (*KeyValueStoreImpl) BeginTx ¶
func (k *KeyValueStoreImpl) BeginTx(ctx context.Context) (Tx, error)
func (KeyValueStoreImpl) CreateTable ¶
func (KeyValueStoreImpl) DeleteRange ¶
func (*KeyValueStoreImpl) GetInternalDatabase ¶
func (k *KeyValueStoreImpl) GetInternalDatabase() (interface{}, error)
func (KeyValueStoreImpl) SetVersionstampedKey ¶
func (KeyValueStoreImpl) SetVersionstampedValue ¶
func (KeyValueStoreImpl) TableSize ¶
TableSize calculates approximate table size in bytes It also works with the prefix of the table name, allowing to calculate sizes of multiple table with the same prefix.
type KeyValueStoreImplWithMetrics ¶
type KeyValueStoreImplWithMetrics struct {
// contains filtered or unexported fields
}
func (*KeyValueStoreImplWithMetrics) AtomicRead ¶
func (*KeyValueStoreImplWithMetrics) AtomicReadRange ¶
func (m *KeyValueStoreImplWithMetrics) AtomicReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (iter AtomicIterator, err error)
func (*KeyValueStoreImplWithMetrics) BeginTx ¶
func (m *KeyValueStoreImplWithMetrics) BeginTx(ctx context.Context) (Tx, error)
func (*KeyValueStoreImplWithMetrics) CreateTable ¶
func (m *KeyValueStoreImplWithMetrics) CreateTable(ctx context.Context, name []byte) (err error)
func (*KeyValueStoreImplWithMetrics) DeleteRange ¶
func (*KeyValueStoreImplWithMetrics) DropTable ¶
func (m *KeyValueStoreImplWithMetrics) DropTable(ctx context.Context, name []byte) (err error)
func (*KeyValueStoreImplWithMetrics) GetInternalDatabase ¶
func (m *KeyValueStoreImplWithMetrics) GetInternalDatabase() (k interface{}, err error)
func (*KeyValueStoreImplWithMetrics) SetVersionstampedKey ¶
func (*KeyValueStoreImplWithMetrics) SetVersionstampedValue ¶
type NoopEventListener ¶
type NoopEventListener struct{}
func (*NoopEventListener) GetEvents ¶
func (l *NoopEventListener) GetEvents() []*Event
func (*NoopEventListener) OnClearRange ¶
func (l *NoopEventListener) OnClearRange(op string, table []byte, lKey []byte, rKey []byte)
type NoopFDBTypeIterator ¶
type NoopFDBTypeIterator struct{}
func (*NoopFDBTypeIterator) Err ¶
func (n *NoopFDBTypeIterator) Err() error
func (*NoopFDBTypeIterator) Next ¶
func (n *NoopFDBTypeIterator) Next(value *FdbBaseKeyValue[int64]) bool
type NoopIterator ¶
type NoopIterator struct{}
func (*NoopIterator) Err ¶
func (n *NoopIterator) Err() error
func (*NoopIterator) Next ¶
func (n *NoopIterator) Next(value *KeyValue) bool
type NoopKV ¶
type NoopKV struct{}
func (*NoopKV) AtomicRead ¶
func (*NoopKV) AtomicReadRange ¶
func (*NoopKV) DeleteRange ¶
func (*NoopKV) SetVersionstampedKey ¶
func (*NoopKV) SetVersionstampedValue ¶
type NoopKVStore ¶
type NoopKVStore struct {
*NoopKV
}
NoopKVStore is a noop store, useful if we need to profile/debug only compute and not with the storage. This can be initialized in main.go instead of using default kvStore.
func (*NoopKVStore) CreateTable ¶
func (n *NoopKVStore) CreateTable(_ context.Context, _ []byte) error
func (*NoopKVStore) GetInternalDatabase ¶
func (n *NoopKVStore) GetInternalDatabase() (interface{}, error)
type StoreErrCode ¶
type StoreErrCode byte
const ( ErrCodeInvalid StoreErrCode = 0x00 ErrCodeDuplicateKey StoreErrCode = 0x01 ErrCodeConflictingTransaction StoreErrCode = 0x02 ErrCodeTransactionMaxDuration StoreErrCode = 0x03 ErrCodeTransactionTimedOut StoreErrCode = 0x04 ErrCodeTransactionNotCommitted StoreErrCode = 0x05 ErrCodeValueSizeExceeded StoreErrCode = 0x06 ErrCodeTransactionSizeExceeded StoreErrCode = 0x07 )
type StoreError ¶
type StoreError struct {
// contains filtered or unexported fields
}
func (StoreError) Code ¶
func (se StoreError) Code() StoreErrCode
func (StoreError) Error ¶
func (se StoreError) Error() string
func (StoreError) Msg ¶
func (se StoreError) Msg() string
type TxImpl ¶
type TxImpl struct {
// contains filtered or unexported fields
}
func (TxImpl) AtomicRead ¶
func (TxImpl) AtomicReadRange ¶
func (TxImpl) DeleteRange ¶
func (TxImpl) IsRetriable ¶
func (t TxImpl) IsRetriable() bool
IsRetriable returns true if transaction can be retried after error.
func (TxImpl) RangeSize ¶
RangeSize calculates approximate range table size in bytes - this is an estimate and a range smaller than 3mb will not be that accurate.
func (TxImpl) SetVersionstampedKey ¶
func (TxImpl) SetVersionstampedValue ¶
type TxImplWithMetrics ¶
type TxImplWithMetrics struct {
// contains filtered or unexported fields
}
func (*TxImplWithMetrics) AtomicRead ¶
func (*TxImplWithMetrics) AtomicReadRange ¶
func (m *TxImplWithMetrics) AtomicReadRange(ctx context.Context, table []byte, lkey Key, rkey Key, isSnapshot bool) (iter AtomicIterator, err error)
func (*TxImplWithMetrics) Commit ¶
func (m *TxImplWithMetrics) Commit(ctx context.Context) (err error)
func (*TxImplWithMetrics) DeleteRange ¶
func (*TxImplWithMetrics) IsRetriable ¶
func (m *TxImplWithMetrics) IsRetriable() bool
func (*TxImplWithMetrics) Rollback ¶
func (m *TxImplWithMetrics) Rollback(ctx context.Context) (err error)