Documentation ¶
Overview ¶
Package watchable provides a Syncbase-specific store.Store wrapper that adds versioned storage for specified prefixes and maintains a watchable log of operations performed on versioned records. This log forms the basis for the implementation of client-facing watch as well as the sync module's internal watching of store updates.
LogEntry records are stored chronologically, using keys of the form "$log:<seq>". Sequence numbers are zero-padded to ensure that the lexicographic order matches the numeric order.
Version number records are stored using keys of the form "$version:<key>", where <key> is the client-specified key.
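The key layout described above can be illustrated with a minimal sketch. The helper names logKey and versionKey and the 16-digit hexadecimal padding width are assumptions made for this example; the package's actual encoding of <seq> is not specified here.

```go
package main

import (
	"fmt"
	"sort"
)

// logKey builds a key of the form "$log:<seq>". The fixed 16-digit hex width
// is an assumption for illustration; it is wide enough for any uint64.
func logKey(seq uint64) string {
	return fmt.Sprintf("$log:%016x", seq)
}

// versionKey builds a key of the form "$version:<key>" for a
// client-specified key.
func versionKey(key string) string {
	return "$version:" + key
}

func main() {
	// Without padding, "10" sorts before "9" lexicographically.
	// With zero padding, string order matches numeric order.
	keys := []string{logKey(10), logKey(9), logKey(255)}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Println(k)
	}
	fmt.Println(versionKey("users/alice"))
}
```

Because every sequence number occupies the same number of characters, a plain range scan over the "$log:" prefix visits entries in the order they were written.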
TODO(razvanm): Switch to package_test. This is a little involved because createStore is used by the other _test.go files in this directory and TestLogEntryTimestamps is poking inside the watchable store to read the sequence number.
Index ¶
- func AddOp(tx *Transaction, op interface{}, precond func() error) error
- func GetAtVersion(ctx *context.T, st store.StoreReader, key, valbuf, version []byte) ([]byte, error)
- func GetResumeMarker(st store.StoreReader) (watch.ResumeMarker, error)
- func GetVersion(ctx *context.T, st store.StoreReader, key []byte) ([]byte, error)
- func MakeResumeMarker(seq uint64) watch.ResumeMarker
- func ManagesKey(tx *Transaction, key []byte) bool
- func NewVersion() []byte
- func PutAtVersion(ctx *context.T, tx *Transaction, key, valbuf, version []byte) error
- func PutVersion(ctx *context.T, tx *Transaction, key, version []byte) error
- func RunInTransaction(st *Store, fn func(tx *Transaction) error) error
- func SetTransactionFromSync(tx *Transaction)
- func WatchUpdates(st store.Store) (update <-chan struct{}, cancel func())
- type Clock
- type DeleteOp
- type DeleteOpTarget
- func (t *DeleteOpTarget) FinishField(_, _ vdl.Target) error
- func (t *DeleteOpTarget) FinishFields(_ vdl.FieldsTarget) error
- func (t *DeleteOpTarget) StartField(name string) (key, field vdl.Target, _ error)
- func (t *DeleteOpTarget) StartFields(tt *vdl.Type) (vdl.FieldsTarget, error)
- func (t *DeleteOpTarget) ZeroField(name string) error
- type GetOp
- type GetOpTarget
- func (t *GetOpTarget) FinishField(_, _ vdl.Target) error
- func (t *GetOpTarget) FinishFields(_ vdl.FieldsTarget) error
- func (t *GetOpTarget) StartField(name string) (key, field vdl.Target, _ error)
- func (t *GetOpTarget) StartFields(tt *vdl.Type) (vdl.FieldsTarget, error)
- func (t *GetOpTarget) ZeroField(name string) error
- type LogEntry
- type LogEntryTarget
- func (t *LogEntryTarget) FinishField(_, _ vdl.Target) error
- func (t *LogEntryTarget) FinishFields(_ vdl.FieldsTarget) error
- func (t *LogEntryTarget) StartField(name string) (key, field vdl.Target, _ error)
- func (t *LogEntryTarget) StartFields(tt *vdl.Type) (vdl.FieldsTarget, error)
- func (t *LogEntryTarget) ZeroField(name string) error
- type Options
- type PutOp
- type PutOpTarget
- func (t *PutOpTarget) FinishField(_, _ vdl.Target) error
- func (t *PutOpTarget) FinishFields(_ vdl.FieldsTarget) error
- func (t *PutOpTarget) StartField(name string) (key, field vdl.Target, _ error)
- func (t *PutOpTarget) StartFields(tt *vdl.Type) (vdl.FieldsTarget, error)
- func (t *PutOpTarget) ZeroField(name string) error
- type ScanOp
- type ScanOpTarget
- func (t *ScanOpTarget) FinishField(_, _ vdl.Target) error
- func (t *ScanOpTarget) FinishFields(_ vdl.FieldsTarget) error
- func (t *ScanOpTarget) StartField(name string) (key, field vdl.Target, _ error)
- func (t *ScanOpTarget) StartFields(tt *vdl.Type) (vdl.FieldsTarget, error)
- func (t *ScanOpTarget) ZeroField(name string) error
- type Store
- func (st *Store) Close() error
- func (st *Store) Delete(key []byte) error
- func (st *Store) Get(key, valbuf []byte) ([]byte, error)
- func (st *Store) NewSnapshot() store.Snapshot
- func (st *Store) NewTransaction() store.Transaction
- func (st *Store) NewWatchableTransaction() *Transaction
- func (st *Store) Put(key, value []byte) error
- func (st *Store) Scan(start, limit []byte) store.Stream
- type Transaction
- type TransactionOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AddOp ¶
func AddOp(tx *Transaction, op interface{}, precond func() error) error
AddOp provides a generic way to add an arbitrary op to the log. If precond is not nil, it is run with the locks held, and the append happens only if precond returns nil (no error).
func GetAtVersion ¶
func GetAtVersion(ctx *context.T, st store.StoreReader, key, valbuf, version []byte) ([]byte, error)
GetAtVersion returns the value of a managed key at the requested version. This method is used by the Sync module when the responder needs to send objects over the wire. At minimum, an object implementing the StoreReader interface is required since this is a Get operation. TODO(razvanm): find a way to get rid of the type switch.
func GetResumeMarker ¶
func GetResumeMarker(st store.StoreReader) (watch.ResumeMarker, error)
GetResumeMarker returns the ResumeMarker that points to the current end of the event log.
func GetVersion ¶
func GetVersion(ctx *context.T, st store.StoreReader, key []byte) ([]byte, error)
GetVersion returns the current version of a managed key. This method is used by the Sync module when the initiator is attempting to add new versions of objects. Reading the version key is used for optimistic concurrency control. At minimum, an object implementing the StoreReader interface is required since this is a Get operation. TODO(razvanm): find a way to get rid of the type switch.
func MakeResumeMarker ¶
func MakeResumeMarker(seq uint64) watch.ResumeMarker
MakeResumeMarker converts a sequence number to the resume marker.
func ManagesKey ¶
func ManagesKey(tx *Transaction, key []byte) bool
ManagesKey returns true if the store used by a transaction manages a particular key.
func NewVersion ¶
func NewVersion() []byte
NewVersion returns a new version for a store entry mutation.
func PutAtVersion ¶
func PutAtVersion(ctx *context.T, tx *Transaction, key, valbuf, version []byte) error
PutAtVersion puts a value for the managed key at the requested version. This method is used by the Sync module exclusively when the initiator adds objects with versions created on other Syncbases. At minimum, an object implementing the Transaction interface is required since this is a Put operation.
func PutVersion ¶
func PutVersion(ctx *context.T, tx *Transaction, key, version []byte) error
PutVersion updates the version of a managed key to the requested version. This method is used by the Sync module exclusively when the initiator selects which of the already stored versions (via PutAtVersion calls) becomes the current version. At minimum, an object implementing the Transaction interface is required since this is a Put operation.
func RunInTransaction ¶
func RunInTransaction(st *Store, fn func(tx *Transaction) error) error
RunInTransaction runs the given fn in a transaction, managing retries and commit/abort.
func SetTransactionFromSync ¶
func SetTransactionFromSync(tx *Transaction)
SetTransactionFromSync marks this transaction as created by sync rather than by an application. The net effect is that, at commit time, the log entries written are marked as made by sync. This allows the sync Watcher to ignore them (echo suppression) because it made these updates itself. Note: this is an internal function used by sync, not part of the interface.
TODO(rdaoud): support a generic echo-suppression mechanism for apps as well, maybe by having a creator ID in the transaction and log entries.
TODO(rdaoud): fold this flag (or creator ID) into Tx options when available.
TODO(razvanm): move to syncbase side by using a generic annotation mechanism.
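Echo suppression on the reading side can be sketched as a simple filter over log entries. The entry type below is a simplified stand-in for LogEntry (Op is a plain string here), and appUpdates is a hypothetical helper, not a function of this package.

```go
package main

import "fmt"

// entry is a simplified stand-in for LogEntry.
type entry struct {
	Op       string
	FromSync bool // true if the update was written by sync itself
}

// appUpdates returns only the entries that did not originate from sync,
// which is what a sync-side watcher would want to process.
func appUpdates(log []entry) []entry {
	var out []entry
	for _, e := range log {
		if !e.FromSync {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	log := []entry{{"put a", false}, {"put b", true}, {"delete c", false}}
	for _, e := range appUpdates(log) {
		fmt.Println(e.Op)
	}
}
```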
func WatchUpdates ¶
func WatchUpdates(st store.Store) (update <-chan struct{}, cancel func())
WatchUpdates returns a channel that can be used to wait for changes to the database, as well as a cancel function which MUST be called to release the watch resources. If the update channel is closed, the store is closed and no more updates will happen. Otherwise, the channel will have a value available whenever the store has changed since the last receive on the channel.
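One common way to get "a value available whenever the store has changed since the last receive" is a channel with a buffer of one and a non-blocking send, so that multiple changes coalesce into a single pending notification. The sketch below shows that technique with a toy notifier type. It is an illustration of the channel semantics, not the package's implementation, and it is not safe for concurrent notify and close calls.

```go
package main

import "fmt"

// notifier is a toy illustration of coalescing update notifications.
type notifier struct {
	update chan struct{}
}

func newNotifier() *notifier {
	return &notifier{update: make(chan struct{}, 1)}
}

// notify records that something changed. If a notification is already
// pending, the new one is dropped, since the receiver will wake up anyway.
func (n *notifier) notify() {
	select {
	case n.update <- struct{}{}:
	default:
	}
}

// close signals that no more updates will happen.
func (n *notifier) close() { close(n.update) }

func main() {
	n := newNotifier()
	n.notify()
	n.notify()
	n.notify() // three changes, one pending value

	<-n.update
	select {
	case <-n.update:
		fmt.Println("unexpected second value")
	default:
		fmt.Println("no further pending updates")
	}

	n.close()
	_, ok := <-n.update
	fmt.Println("channel open:", ok)
}
```

A receiver that sees the channel closed (ok == false) should stop watching. A receiver that gets a value should re-read the log from its last resume marker, because one value may stand for many changes.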
Types ¶
type DeleteOp ¶
type DeleteOp struct {
Key []byte
}
DeleteOp represents a store delete operation.
func (*DeleteOp) MakeVDLTarget ¶
type DeleteOpTarget ¶
type DeleteOpTarget struct {
Value *DeleteOp
vdl.TargetBase
vdl.FieldsTargetBase
// contains filtered or unexported fields
}
func (*DeleteOpTarget) FinishField ¶
func (t *DeleteOpTarget) FinishField(_, _ vdl.Target) error
func (*DeleteOpTarget) FinishFields ¶
func (t *DeleteOpTarget) FinishFields(_ vdl.FieldsTarget) error
func (*DeleteOpTarget) StartField ¶
func (t *DeleteOpTarget) StartField(name string) (key, field vdl.Target, _ error)
func (*DeleteOpTarget) StartFields ¶
func (t *DeleteOpTarget) StartFields(tt *vdl.Type) (vdl.FieldsTarget, error)
func (*DeleteOpTarget) ZeroField ¶
func (t *DeleteOpTarget) ZeroField(name string) error
type GetOp ¶
type GetOp struct {
Key []byte
}
GetOp represents a store get operation.
func (*GetOp) MakeVDLTarget ¶
type GetOpTarget ¶
type GetOpTarget struct {
Value *GetOp
vdl.TargetBase
vdl.FieldsTargetBase
// contains filtered or unexported fields
}
func (*GetOpTarget) FinishField ¶
func (t *GetOpTarget) FinishField(_, _ vdl.Target) error
func (*GetOpTarget) FinishFields ¶
func (t *GetOpTarget) FinishFields(_ vdl.FieldsTarget) error
func (*GetOpTarget) StartField ¶
func (t *GetOpTarget) StartField(name string) (key, field vdl.Target, _ error)
func (*GetOpTarget) StartFields ¶
func (t *GetOpTarget) StartFields(tt *vdl.Type) (vdl.FieldsTarget, error)
func (*GetOpTarget) ZeroField ¶
func (t *GetOpTarget) ZeroField(name string) error
type LogEntry ¶
type LogEntry struct {
// The store operation that was performed.
Op *vom.RawBytes
// Time when the operation was committed in nanoseconds since the epoch.
// Note: We don't use time.Time here because VDL's time.Time consists of
// {Seconds int64, Nanos int32}, which is more expensive than a single int64.
CommitTimestamp int64
// Operation came from sync (used for echo suppression).
// TODO(razvanm): this field is specific to syncbase. We should add a
// generic way to add fields and use that instead.
FromSync bool
// If true, this entry is followed by more entries that belong to the same
// commit as this entry.
Continued bool
}
LogEntry represents a single store operation. This operation may have been part of a transaction, as signified by the Continued boolean. Read-only operations (and read-only transactions) are not logged.
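The role of the Continued flag can be shown with a small sketch that groups a flat sequence of entries into commits. The entry type is a simplified stand-in for LogEntry, and splitBatches is a hypothetical helper written for this example. Dropping a trailing incomplete commit is an assumption of the sketch.

```go
package main

import "fmt"

// entry is a simplified stand-in for LogEntry; Op is a plain string here.
type entry struct {
	Op        string
	Continued bool // true if more entries of the same commit follow
}

// splitBatches groups consecutive entries into commits. A commit ends at the
// first entry whose Continued flag is false. Trailing entries that never
// reach such an entry form an incomplete commit and are dropped.
func splitBatches(log []entry) [][]entry {
	var batches [][]entry
	var cur []entry
	for _, e := range log {
		cur = append(cur, e)
		if !e.Continued {
			batches = append(batches, cur)
			cur = nil
		}
	}
	return batches
}

func main() {
	log := []entry{
		{"put a", true}, // first op of a two-op transaction
		{"put b", false},
		{"delete c", false}, // a single-op commit
	}
	for i, b := range splitBatches(log) {
		fmt.Println("commit", i, "ops:", len(b))
	}
}
```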
func ReadBatchFromLog ¶
func ReadBatchFromLog(st store.Store, resumeMarker watch.ResumeMarker) ([]*LogEntry, watch.ResumeMarker, error)
ReadBatchFromLog returns a batch of watch log records (a transaction) from the given database and the new resume marker at the end of the batch.
func (*LogEntry) MakeVDLTarget ¶
type LogEntryTarget ¶
type LogEntryTarget struct {
Value *LogEntry
vdl.TargetBase
vdl.FieldsTargetBase
// contains filtered or unexported fields
}
func (*LogEntryTarget) FinishField ¶
func (t *LogEntryTarget) FinishField(_, _ vdl.Target) error
func (*LogEntryTarget) FinishFields ¶
func (t *LogEntryTarget) FinishFields(_ vdl.FieldsTarget) error
func (*LogEntryTarget) StartField ¶
func (t *LogEntryTarget) StartField(name string) (key, field vdl.Target, _ error)
func (*LogEntryTarget) StartFields ¶
func (t *LogEntryTarget) StartFields(tt *vdl.Type) (vdl.FieldsTarget, error)
func (*LogEntryTarget) ZeroField ¶
func (t *LogEntryTarget) ZeroField(name string) error
type Options ¶
type Options struct {
// Key prefixes to version and log. If nil, all keys are managed.
ManagedPrefixes []string
}
Options configures a Store.
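The "if nil, all keys are managed" rule for ManagedPrefixes could be checked along these lines. The managesKey helper below works on plain strings and is a sketch for illustration, not the package's ManagesKey function. Treating an empty but non-nil slice as "nothing is managed" is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// managesKey reports whether key falls under one of the managed prefixes.
// A nil prefix list means every key is managed.
func managesKey(prefixes []string, key string) bool {
	if prefixes == nil {
		return true
	}
	for _, p := range prefixes {
		if strings.HasPrefix(key, p) {
			return true
		}
	}
	return false
}

func main() {
	prefixes := []string{"r:", "c:"}
	fmt.Println(managesKey(prefixes, "r:table/row1"))
	fmt.Println(managesKey(prefixes, "s:internal"))
	fmt.Println(managesKey(nil, "anything"))
}
```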
type PutOp ¶
PutOp represents a store put operation. The new version is written instead of the value to avoid duplicating the user data in the store. The version is used to access the user data of that specific mutation.
func (*PutOp) MakeVDLTarget ¶
type PutOpTarget ¶
type PutOpTarget struct {
Value *PutOp
vdl.TargetBase
vdl.FieldsTargetBase
// contains filtered or unexported fields
}
func (*PutOpTarget) FinishField ¶
func (t *PutOpTarget) FinishField(_, _ vdl.Target) error
func (*PutOpTarget) FinishFields ¶
func (t *PutOpTarget) FinishFields(_ vdl.FieldsTarget) error
func (*PutOpTarget) StartField ¶
func (t *PutOpTarget) StartField(name string) (key, field vdl.Target, _ error)
func (*PutOpTarget) StartFields ¶
func (t *PutOpTarget) StartFields(tt *vdl.Type) (vdl.FieldsTarget, error)
func (*PutOpTarget) ZeroField ¶
func (t *PutOpTarget) ZeroField(name string) error
type ScanOpTarget ¶
type ScanOpTarget struct {
Value *ScanOp
vdl.TargetBase
vdl.FieldsTargetBase
// contains filtered or unexported fields
}
func (*ScanOpTarget) FinishField ¶
func (t *ScanOpTarget) FinishField(_, _ vdl.Target) error
func (*ScanOpTarget) FinishFields ¶
func (t *ScanOpTarget) FinishFields(_ vdl.FieldsTarget) error
func (*ScanOpTarget) StartField ¶
func (t *ScanOpTarget) StartField(name string) (key, field vdl.Target, _ error)
func (*ScanOpTarget) StartFields ¶
func (t *ScanOpTarget) StartFields(tt *vdl.Type) (vdl.FieldsTarget, error)
func (*ScanOpTarget) ZeroField ¶
func (t *ScanOpTarget) ZeroField(name string) error
type Store ¶
type Store struct {
// TODO(razvanm): make the clock private. The clock is used only by the
// addSyncgroupLogRec function from the vsync package.
Clock Clock // used to provide write timestamps
// contains filtered or unexported fields
}
func (*Store) NewSnapshot ¶
func (st *Store) NewSnapshot() store.Snapshot
NewSnapshot implements the store.Store interface.
func (*Store) NewTransaction ¶
func (st *Store) NewTransaction() store.Transaction
NewTransaction implements the store.Store interface.
func (*Store) NewWatchableTransaction ¶
func (st *Store) NewWatchableTransaction() *Transaction
NewWatchableTransaction implements the Store interface.
type Transaction ¶
type Transaction struct {
St *Store
// contains filtered or unexported fields
}
func (*Transaction) Abort ¶
func (tx *Transaction) Abort() error
Abort implements the store.Transaction interface.
func (*Transaction) Commit ¶
func (tx *Transaction) Commit() error
Commit implements the store.Transaction interface.
func (*Transaction) Delete ¶
func (tx *Transaction) Delete(key []byte) error
Delete implements the store.StoreWriter interface.
func (*Transaction) Get ¶
func (tx *Transaction) Get(key, valbuf []byte) ([]byte, error)
Get implements the store.StoreReader interface.
func (*Transaction) Put ¶
func (tx *Transaction) Put(key, value []byte) error
Put implements the store.StoreWriter interface.
type TransactionOptions ¶
type TransactionOptions struct {
NumAttempts int // number of attempts; only used by RunInTransaction
}
TODO(razvanm): This is copied from store/util.go.
TODO(sadovsky): Move this to model.go and make it an argument to Store.NewTransaction.