Documentation ¶
Overview ¶
Package watchable provides a Syncbase-specific store.Store wrapper that provides versioned storage for specified prefixes and maintains a watchable log of operations performed on versioned records. This log forms the basis for the implementation of client-facing watch as well as the sync module's internal watching of store updates.
LogEntry records are stored chronologically, using keys of the form "$log:<seq>". Sequence numbers are zero-padded to ensure that the lexicographic order matches the numeric order.
Version number records are stored using keys of the form "$version:<key>", where <key> is the client-specified key.
TODO(razvanm): Switch to package_test. This is a little involved because createStore is used by the other _test.go files in this directory and TestLogEntryTimestamps is poking inside the watchable store to read the sequence number.
Index ¶
- func AddOp(tx *Transaction, op interface{}, precond func() error) error
- func GetAtVersion(ctx *context.T, st store.StoreReader, key, valbuf, version []byte) ([]byte, error)
- func GetResumeMarker(sntx store.SnapshotOrTransaction) (watch.ResumeMarker, error)
- func GetVersion(ctx *context.T, st store.StoreReader, key []byte) ([]byte, error)
- func MakeResumeMarker(seq uint64) watch.ResumeMarker
- func ManagesKey(tx *Transaction, key []byte) bool
- func NewVersion() []byte
- func PutAtVersion(ctx *context.T, tx *Transaction, key, valbuf, version []byte) error
- func PutVersion(ctx *context.T, tx *Transaction, key, version []byte) error
- func RunInTransaction(st *Store, fn func(tx *Transaction) error) error
- func SetTransactionFromSync(tx *Transaction)
- type Client
- type Clock
- type DeleteOp
- type GetOp
- type LogEntry
- type Options
- type PutOp
- type ScanOp
- type Store
- func (st *Store) Close() error
- func (st *Store) Delete(key []byte) error
- func (st *Store) Get(key, valbuf []byte) ([]byte, error)
- func (st *Store) NewSnapshot() store.Snapshot
- func (st *Store) NewTransaction() store.Transaction
- func (st *Store) NewWatchableTransaction() *Transaction
- func (st *Store) Put(key, value []byte) error
- func (st *Store) Scan(start, limit []byte) store.Stream
- func (st *Store) UpdateLogStart(syncMarker watch.ResumeMarker) (watch.ResumeMarker, error)
- func (st *Store) WatchUpdates(resumeMarker watch.ResumeMarker) (_ *Client, cancel func())
- type Transaction
- type TransactionOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AddOp ¶
func AddOp(tx *Transaction, op interface{}, precond func() error) error
AddOp provides a generic way to add an arbitrary op to the log. If precond is not nil it will be run with the locks held and the append will only happen if the precond returns an error.
func GetAtVersion ¶
func GetAtVersion(ctx *context.T, st store.StoreReader, key, valbuf, version []byte) ([]byte, error)
GetAtVersion returns the value of a managed key at the requested version. This method is used by the Sync module when the responder needs to send objects over the wire. At minimum, an object implementing the StoreReader interface is required since this is a Get operation. TODO(razvanm): find a way to get rid of the type switch.
func GetResumeMarker ¶
func GetResumeMarker(sntx store.SnapshotOrTransaction) (watch.ResumeMarker, error)
GetResumeMarker returns the ResumeMarker that points to the current end of the event log.
func GetVersion ¶
GetVersion returns the current version of a managed key. This method is used by the Sync module when the initiator is attempting to add new versions of objects. Reading the version key is used for optimistic concurrency control. At minimum, an object implementing the StoreReader interface is required since this is a Get operation. TODO(razvanm): find a way to get rid of the type switch.
func MakeResumeMarker ¶
func MakeResumeMarker(seq uint64) watch.ResumeMarker
MakeResumeMarker converts a sequence number to the resume marker.
func ManagesKey ¶
func ManagesKey(tx *Transaction, key []byte) bool
ManagesKey returns true if the store used by a transaction manages a particular key.
func NewVersion ¶
func NewVersion() []byte
NewVersion returns a new version for a store entry mutation.
func PutAtVersion ¶
func PutAtVersion(ctx *context.T, tx *Transaction, key, valbuf, version []byte) error
PutAtVersion puts a value for the managed key at the requested version. This method is used by the Sync module exclusively when the initiator adds objects with versions created on other Syncbases. At minimum, an object implementing the Transaction interface is required since this is a Put operation.
func PutVersion ¶
func PutVersion(ctx *context.T, tx *Transaction, key, version []byte) error
PutVersion updates the version of a managed key to the requested version. This method is used by the Sync module exclusively when the initiator selects which of the already stored versions (via PutAtVersion calls) becomes the current version. At minimum, an object implementing the Transaction interface is required since this is a Put operation.
func RunInTransaction ¶
func RunInTransaction(st *Store, fn func(tx *Transaction) error) error
RunInTransaction runs the given fn in a transaction, managing retries and commit/abort.
func SetTransactionFromSync ¶
func SetTransactionFromSync(tx *Transaction)
SetTransactionFromSync marks this transaction as created by sync as opposed to one created by an application. The net effect is that, at commit time, the log entries written are marked as made by sync. This allows the sync Watcher to ignore them (echo suppression) because it made these updates. Note: this is an internal function used by sync, not part of the interface. TODO(rdaoud): support a generic echo-suppression mechanism for apps as well maybe by having a creator ID in the transaction and log entries. TODO(rdaoud): fold this flag (or creator ID) into Tx options when available. TODO(razvanm): move to syncbase side by using a generic annotation mechanism.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client encapsulates a channel used to notify watch clients of store updates and an iterator over the watch log.
func (*Client) Err ¶
Err returns the error that caused the client to stop watching. If the error is nil, the client is active. Otherwise: * ErrCanceled - watch was canceled by the client. * ErrAborted - watcher was closed (store was closed, possibly destroyed). * ErrUnknownResumeMarker - watch was started with an invalid or too old resume marker. * other errors - NextBatchFromLog encountered an error.
func (*Client) NextBatchFromLog ¶
NextBatchFromLog returns the next batch of watch log records (transaction) from the given database and the resume marker at the end of the batch. If there is no batch available, it returns a nil slice and the same resume marker as the previous NextBatchFromLog call. The returned log entries are guaranteed to point to existing data versions until either the client is stopped or NextBatchFromLog is called again. If the client is stopped, NextBatchFromLog returns the same error as Err.
func (*Client) Wait ¶
func (c *Client) Wait() <-chan struct{}
Wait returns the update channel that can be used to wait for new changes in the store. If the update channel is closed, the client is stopped and no more updates will happen. Otherwise, the channel will have a value available whenever the store has changed since the last receive on the channel.
type DeleteOp ¶
type DeleteOp struct {
Key []byte
}
DeleteOp represents a store delete operation.
func (DeleteOp) VDLReflect ¶
type GetOp ¶
type GetOp struct {
Key []byte
}
GetOp represents a store get operation.
func (GetOp) VDLReflect ¶
type LogEntry ¶
type LogEntry struct { // The store operation that was performed. Op *vom.RawBytes // Time when the operation was committed in nanoseconds since the epoch. // Note: We don't use time.Time here because VDL's time.Time consists of // {Seconds int64, Nanos int32}, which is more expensive than a single int64. CommitTimestamp int64 // Operation came from sync (used for echo suppression). // TODO(razvanm): this field is specific to syncbase. We should add a // generic way to add fields and use that instead. FromSync bool // If true, this entry is followed by more entries that belong to the same // commit as this entry. Continued bool }
LogEntry represents a single store operation. This operation may have been part of a transaction, as signified by the Continued boolean. Read-only operations (and read-only transactions) are not logged.
func (LogEntry) VDLReflect ¶
type Options ¶
type Options struct { // Key prefixes to version and log. ManagedPrefixes []string }
Options configures a Store.
type PutOp ¶
PutOp represents a store put operation. The new version is written instead of the value to avoid duplicating the user data in the store. The version is used to access the user data of that specific mutation.
func (PutOp) VDLReflect ¶
type Store ¶
type Store struct { // TODO(razvanm): make the clock private. The clock is used only by the // addSyncgroupLogRec function from the vsync package. Clock Clock // used to provide write timestamps // contains filtered or unexported fields }
func (*Store) NewSnapshot ¶
NewSnapshot implements the store.Store interface.
func (*Store) NewTransaction ¶
func (st *Store) NewTransaction() store.Transaction
NewTransaction implements the store.Store interface.
func (*Store) NewWatchableTransaction ¶
func (st *Store) NewWatchableTransaction() *Transaction
NewWatchableTransaction implements the Store interface.
func (*Store) UpdateLogStart ¶
func (st *Store) UpdateLogStart(syncMarker watch.ResumeMarker) (watch.ResumeMarker, error)
UpdateLogStart takes as input the resume marker of the sync watcher and returns the new log start, computed as the earliest resume marker of all active watchers including the sync watcher. The new log start is persisted before being returned, making it safe to garbage collect earlier log entries. syncMarker is assumed to monotonically increase, always remaining between the log start and end (inclusive).
func (*Store) WatchUpdates ¶
func (st *Store) WatchUpdates(resumeMarker watch.ResumeMarker) (_ *Client, cancel func())
WatchUpdates returns a Client which supports waiting for changes and iterating over the watch log starting from resumeMarker, as well as a cancel function which MUST be called to release watch resources. Returns a stopped Client if the resume marker is invalid or pointing to an already garbage collected segment of the log.
type Transaction ¶
type Transaction struct { St *Store // contains filtered or unexported fields }
func (*Transaction) Abort ¶
func (tx *Transaction) Abort() error
Abort implements the store.Transaction interface.
func (*Transaction) Commit ¶
func (tx *Transaction) Commit() error
Commit implements the store.Transaction interface.
func (*Transaction) Delete ¶
func (tx *Transaction) Delete(key []byte) error
Delete implements the store.StoreWriter interface.
func (*Transaction) Get ¶
func (tx *Transaction) Get(key, valbuf []byte) ([]byte, error)
Get implements the store.StoreReader interface.
func (*Transaction) Put ¶
func (tx *Transaction) Put(key, value []byte) error
Put implements the store.StoreWriter interface.
type TransactionOptions ¶
type TransactionOptions struct {
NumAttempts int // number of attempts; only used by RunInTransaction
}
TODO(razvanm): This is copied from store/util.go. TODO(sadovsky): Move this to model.go and make it an argument to Store.NewTransaction.