Documentation ¶
Overview ¶
Package storage provides access to the Store and Range abstractions. Each Cockroach node handles one or more stores, each of which multiplexes to one or more ranges, identified by [start, end) keys. Ranges are contiguous regions of the keyspace. Each range implements an instance of the Raft consensus algorithm to synchronize participating range replicas.
Each store is represented by a single engine.Engine instance. The ranges hosted by a store all have access to the same engine, but write to only a range-limited keyspace within it. Ranges access the underlying engine via the MVCC interface, which provides historical versioned values.
Index ¶
- Constants
- Variables
- func InsertRange(txn *client.Txn, key proto.Key) error
- func ProcessStoreEvents(l StoreEventListener, sub *util.Subscription)
- func SetupRangeTree(batch engine.Engine, ms *proto.MVCCStats, timestamp proto.Timestamp, ...) error
- type AddRangeEvent
- type BeginScanRangesEvent
- type CommandQueue
- type EndScanRangesEvent
- type FindStoreFunc
- type IDAllocator
- type MergeRangeEvent
- type NotBootstrappedError
- type PrefixConfig
- type PrefixConfigMap
- func (p PrefixConfigMap) Len() int
- func (p PrefixConfigMap) Less(i, j int) bool
- func (p PrefixConfigMap) MatchByPrefix(key proto.Key) *PrefixConfig
- func (p PrefixConfigMap) MatchesByPrefix(key proto.Key) []*PrefixConfig
- func (p PrefixConfigMap) SplitRangeByPrefixes(start, end proto.Key) ([]*RangeResult, error)
- func (p PrefixConfigMap) Swap(i, j int)
- func (p PrefixConfigMap) VisitPrefixes(start, end proto.Key, ...) error
- func (p PrefixConfigMap) VisitPrefixesHierarchically(key proto.Key, ...) error
- type Range
- func (r *Range) AddCmd(call client.Call, wait bool) error
- func (r *Range) AdminMerge(args *proto.AdminMergeRequest, reply *proto.AdminMergeResponse)
- func (r *Range) AdminSplit(args *proto.AdminSplitRequest, reply *proto.AdminSplitResponse)
- func (r *Range) Append(entries []raftpb.Entry) error
- func (r *Range) ApplySnapshot(snap raftpb.Snapshot) error
- func (r *Range) ChangeReplicas(changeType proto.ReplicaChangeType, replica proto.Replica) error
- func (r *Range) ConditionalPut(batch engine.Engine, ms *proto.MVCCStats, args *proto.ConditionalPutRequest, ...)
- func (r *Range) Contains(batch engine.Engine, args *proto.ContainsRequest, ...)
- func (r *Range) ContainsKey(key proto.Key) bool
- func (r *Range) ContainsKeyRange(start, end proto.Key) bool
- func (r *Range) Delete(batch engine.Engine, ms *proto.MVCCStats, args *proto.DeleteRequest, ...)
- func (r *Range) DeleteRange(batch engine.Engine, ms *proto.MVCCStats, args *proto.DeleteRangeRequest, ...)
- func (r *Range) Desc() *proto.RangeDescriptor
- func (r *Range) Destroy() error
- func (r *Range) EndTransaction(batch engine.Engine, ms *proto.MVCCStats, args *proto.EndTransactionRequest, ...)
- func (r *Range) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, error)
- func (r *Range) FirstIndex() (uint64, error)
- func (r *Range) Get(batch engine.Engine, args *proto.GetRequest, reply *proto.GetResponse)
- func (r *Range) GetGCMetadata() (*proto.GCMetadata, error)
- func (r *Range) GetLastVerificationTimestamp() (proto.Timestamp, error)
- func (r *Range) GetMVCCStats() proto.MVCCStats
- func (r *Range) GetMaxBytes() int64
- func (r *Range) GetReplica() *proto.Replica
- func (r *Range) HasLeaderLease(timestamp proto.Timestamp) (bool, bool)
- func (r *Range) Increment(batch engine.Engine, ms *proto.MVCCStats, args *proto.IncrementRequest, ...)
- func (r *Range) InitialState() (raftpb.HardState, raftpb.ConfState, error)
- func (r *Range) InternalGC(batch engine.Engine, ms *proto.MVCCStats, args *proto.InternalGCRequest, ...)
- func (r *Range) InternalHeartbeatTxn(batch engine.Engine, ms *proto.MVCCStats, ...)
- func (r *Range) InternalLeaderLease(batch engine.Engine, ms *proto.MVCCStats, ...)
- func (r *Range) InternalMerge(batch engine.Engine, ms *proto.MVCCStats, args *proto.InternalMergeRequest, ...)
- func (r *Range) InternalPushTxn(batch engine.Engine, ms *proto.MVCCStats, args *proto.InternalPushTxnRequest, ...)
- func (r *Range) InternalRangeLookup(batch engine.Engine, args *proto.InternalRangeLookupRequest, ...)
- func (r *Range) InternalResolveIntent(batch engine.Engine, ms *proto.MVCCStats, ...)
- func (r *Range) InternalTruncateLog(batch engine.Engine, ms *proto.MVCCStats, ...)
- func (r *Range) IsFirstRange() bool
- func (r *Range) LastIndex() (uint64, error)
- func (r *Range) Put(batch engine.Engine, ms *proto.MVCCStats, args *proto.PutRequest, ...)
- func (r *Range) Scan(batch engine.Engine, args *proto.ScanRequest, reply *proto.ScanResponse)
- func (r *Range) SetDesc(desc *proto.RangeDescriptor)
- func (r *Range) SetHardState(st raftpb.HardState) error
- func (r *Range) SetLastVerificationTimestamp(timestamp proto.Timestamp) error
- func (r *Range) SetMaxBytes(maxBytes int64)
- func (r *Range) Snapshot() (raftpb.Snapshot, error)
- func (r *Range) String() string
- func (r *Range) Term(i uint64) (uint64, error)
- func (r *Range) WaitForLeaderLease(t util.Tester)
- type RangeResult
- type RangeSlice
- type RemoveRangeEvent
- type ResponseCache
- func (rc *ResponseCache) ClearData() error
- func (rc *ResponseCache) CopyFrom(e engine.Engine, originRaftID int64) error
- func (rc *ResponseCache) CopyInto(e engine.Engine, destRaftID int64) error
- func (rc *ResponseCache) GetResponse(cmdID proto.ClientCmdID, reply proto.Response) (bool, error)
- func (rc *ResponseCache) PutResponse(cmdID proto.ClientCmdID, reply proto.Response) error
- type SplitRangeEvent
- type StartStoreEvent
- type Store
- func (s *Store) AddRange(rng *Range) error
- func (s *Store) AppliedIndex(groupID uint64) (uint64, error)
- func (s *Store) Attrs() proto.Attributes
- func (s *Store) Bootstrap(ident proto.StoreIdent, stopper *util.Stopper) error
- func (s *Store) BootstrapRange() error
- func (s *Store) Capacity() (proto.StoreCapacity, error)
- func (s *Store) Clock() *hlc.Clock
- func (s *Store) ClusterID() string
- func (s *Store) DB() *client.KV
- func (s *Store) Descriptor() (*proto.StoreDescriptor, error)
- func (s *Store) Engine() engine.Engine
- func (s *Store) EventFeed() StoreEventFeed
- func (s *Store) ExecuteCmd(call client.Call) error
- func (s *Store) ForceRangeGCScan(t util.Tester)
- func (s *Store) ForceReplicationScan(t util.Tester)
- func (s *Store) GetRange(raftID int64) (*Range, error)
- func (s *Store) GetStatus() (*proto.StoreStatus, error)
- func (s *Store) Gossip() *gossip.Gossip
- func (s *Store) GossipCapacity()
- func (s *Store) GroupStorage(groupID uint64) multiraft.WriteableGroupStorage
- func (s *Store) IsStarted() bool
- func (s *Store) LookupRange(start, end proto.Key) *Range
- func (s *Store) MergeRange(subsumingRng *Range, updatedEndKey proto.Key, subsumedRaftID int64) error
- func (s *Store) NewRangeDescriptor(start, end proto.Key, replicas []proto.Replica) (*proto.RangeDescriptor, error)
- func (s *Store) NewSnapshot() engine.Engine
- func (s *Store) ProposeRaftCommand(idKey cmdIDKey, cmd proto.InternalRaftCommand) <-chan error
- func (s *Store) RaftNodeID() proto.RaftNodeID
- func (s *Store) RemoveRange(rng *Range) error
- func (s *Store) SetRangeRetryOptions(ro util.RetryOptions)
- func (s *Store) SplitRange(origRng, newRng *Range) error
- func (s *Store) Start(stopper *util.Stopper) error
- func (s *Store) Stopper() *util.Stopper
- func (s *Store) StoreID() proto.StoreID
- func (s *Store) String() string
- func (s *Store) WaitForInit()
- func (s *Store) WaitForRangeScanCompletion() int64
- type StoreContext
- type StoreEventFeed
- type StoreEventListener
- type StoreFinder
- type TimestampCache
- func (tc *TimestampCache) Add(start, end proto.Key, timestamp proto.Timestamp, txnID []byte, readOnly bool)
- func (tc *TimestampCache) Clear(clock *hlc.Clock)
- func (tc *TimestampCache) GetMax(start, end proto.Key, txnID []byte) (proto.Timestamp, proto.Timestamp)
- func (tc *TimestampCache) MergeInto(dest *TimestampCache, clear bool)
- func (tc *TimestampCache) SetLowWater(lowWater proto.Timestamp)
- type UpdateRangeEvent
Constants ¶
const ( // UserRoot is the username for the root user. UserRoot = "root" // GCResponseCacheExpiration is the expiration duration for response // cache entries. GCResponseCacheExpiration = 1 * time.Hour )
const ( // DefaultLeaderLeaseDuration is the default duration of the leader lease. DefaultLeaderLeaseDuration = time.Second )
raftInitialLogIndex is the starting point for the raft log. We bootstrap the raft membership by synthesizing a snapshot as if there were some discarded prefix to the log, so we must begin the log at an arbitrary index greater than 1.
const ( // MinTSCacheWindow specifies the minimum duration to hold entries in // the cache before allowing eviction. After this window expires, // transactions writing to this node with timestamps lagging by more // than MinTSCacheWindow will necessarily have to advance their commit // timestamp. MinTSCacheWindow = 10 * time.Second )
Variables ¶
var ( // DefaultHeartbeatInterval is how often heartbeats are sent from the // transaction coordinator to a live transaction. These keep it from // being preempted by other transactions writing the same keys. If a // transaction's heartbeat is not received within 2x the heartbeat interval, // it may be aborted by conflicting txns. DefaultHeartbeatInterval = 5 * time.Second )
var IDAllocationRetryOpts = util.RetryOptions{ Backoff: 50 * time.Millisecond, MaxBackoff: 5 * time.Second, Constant: 2, MaxAttempts: 0, }
IDAllocationRetryOpts sets the retry options for handling RaftID allocation errors.
var ( // TestStoreContext has some fields initialized with values relevant // in tests. TestStoreContext = StoreContext{ RaftTickInterval: 100 * time.Millisecond, RaftHeartbeatIntervalTicks: 1, RaftElectionTimeoutTicks: 2, ScanInterval: 10 * time.Minute, Context: context.Background(), } )
TestingCommandFilter may be set in tests to intercept the handling of commands and artificially generate errors. Return true to terminate processing with the filled-in response, or false to continue with regular processing. Note that in a multi-replica test this filter will be run once for each replica and must produce consistent results each time. Should only be used in tests in the storage package but needs to be exported due to circular import issues.
Functions ¶
func InsertRange ¶
InsertRange adds a new range to the RangeTree. This should only be called from operations that create new ranges, such as range.splitTrigger. TODO(bram): Can we optimize this by inserting as a child of the range being split?
func ProcessStoreEvents ¶
func ProcessStoreEvents(l StoreEventListener, sub *util.Subscription)
ProcessStoreEvents reads store events from the supplied channel and passes them to the correct methods of the supplied StoreEventListener. This method will run until the Subscription's events channel is closed.
Types ¶
type AddRangeEvent ¶
type AddRangeEvent struct { StoreID proto.StoreID Desc *proto.RangeDescriptor Stats proto.MVCCStats }
AddRangeEvent occurs when a new range is added to a store. This event includes the Range's RangeDescriptor and current MVCCStats.
type BeginScanRangesEvent ¶
BeginScanRangesEvent occurs when the store is about to scan over all ranges. During such a scan, each existing range will be published to the feed as an AddRangeEvent. This is used because downstream consumers may be tracking statistics via the Deltas in UpdateRangeEvent; this event informs subscribers to clear currently cached values.
type CommandQueue ¶
type CommandQueue struct {
// contains filtered or unexported fields
}
A CommandQueue maintains an interval tree of keys or key ranges for executing commands. New commands affecting keys or key ranges must wait on already-executing commands which overlap their key range.
Before executing, a command invokes GetWait() to initialize a WaitGroup with the number of overlapping commands which are already running. The wait group is waited on by the caller for confirmation that all overlapping, pending commands have completed and the pending command can proceed.
After waiting, a command is added to the queue's already-executing set via Add(). Add accepts a parameter indicating whether the command is read-only. Read-only commands don't need to wait on other read-only commands, so the wait group returned via GetWait() doesn't include read-only on read-only overlapping commands as an optimization.
Once commands complete, Remove() is invoked to remove the executing command and decrement the counts on any pending WaitGroups, possibly signaling waiting commands which were gated by the executing command's affected key(s).
CommandQueue is not thread safe.
func NewCommandQueue ¶
func NewCommandQueue() *CommandQueue
NewCommandQueue returns a new command queue.
func (*CommandQueue) Add ¶
func (cq *CommandQueue) Add(start, end proto.Key, readOnly bool) interface{}
Add adds a command to the queue which affects the specified key range. If end is empty, it is set to start.Next(), meaning the command affects a single key. The returned interface is the key for the command queue and must be re-supplied on subsequent invocation of Remove().
Add should be invoked after waiting on already-executing, overlapping commands via the WaitGroup initialized through GetWait().
func (*CommandQueue) Clear ¶
func (cq *CommandQueue) Clear()
Clear removes all executing commands, signaling any waiting commands.
func (*CommandQueue) GetWait ¶
GetWait initializes the supplied wait group with the number of executing commands which overlap the specified key range. If end is empty, end is set to start.Next(), meaning the command affects a single key. The caller should call wg.Wait() to wait for confirmation that all gating commands have completed or failed. readOnly is true if the requester is a read-only command; false for read-write.
func (*CommandQueue) Remove ¶
func (cq *CommandQueue) Remove(key interface{})
Remove is invoked to signal that the command associated with the specified key has completed and should be removed. Any pending commands waiting on this command will be signaled if this is the only command upon which they are still waiting.
Remove is invoked after a mutating command has been committed to the Raft log and applied to the underlying state machine. Similarly, Remove is invoked after a read-only command has been executed against the underlying state machine.
type EndScanRangesEvent ¶
EndScanRangesEvent occurs when the store has finished scanning all ranges. Every BeginScanRangesEvent will eventually be followed by an EndScanRangesEvent.
type FindStoreFunc ¶
type FindStoreFunc func(proto.Attributes) ([]*proto.StoreDescriptor, error)
FindStoreFunc finds the disks in a datacenter that have the requested attributes.
type IDAllocator ¶
type IDAllocator struct {
// contains filtered or unexported fields
}
An IDAllocator is used to increment a key in allocation blocks of arbitrary size starting at a minimum ID.
func NewIDAllocator ¶
func NewIDAllocator(idKey proto.Key, db *client.KV, minID int64, blockSize int64, stopper *util.Stopper) (*IDAllocator, error)
NewIDAllocator creates a new ID allocator which increments the specified key in allocation blocks of size blockSize, with allocated IDs starting at minID. Allocated IDs are positive integers.
func (*IDAllocator) Allocate ¶
func (ia *IDAllocator) Allocate() (int64, error)
Allocate allocates a new ID from the global KV DB.
type MergeRangeEvent ¶
type MergeRangeEvent struct { StoreID proto.StoreID Merged UpdateRangeEvent Removed RemoveRangeEvent }
MergeRangeEvent occurs whenever a range is merged into another. This Event contains two component events: an UpdateRangeEvent for the range which subsumed the other, and a RemoveRangeEvent for the range that was subsumed.
type NotBootstrappedError ¶
type NotBootstrappedError struct{}
A NotBootstrappedError indicates that an engine has not yet been bootstrapped due to a store identifier not being present.
func (*NotBootstrappedError) Error ¶
func (e *NotBootstrappedError) Error() string
Error formats error.
type PrefixConfig ¶
type PrefixConfig struct { Prefix proto.Key // the prefix the config affects Canonical proto.Key // the prefix for the canonical config, if applicable Config interface{} // the config object }
PrefixConfig relates a string prefix to a config object. Config objects include accounting, permissions, and zones. PrefixConfig objects are the constituents of PrefixConfigMap objects. In order to support binary searches of hierarchical prefixes (see the comments in NewPrefixConfigMap), PrefixConfig objects are additionally added to a PrefixConfigMap to demarcate the end of a prefix range. Such end-of-range sentinels need to refer back to the next "higher-up" prefix in the hierarchy (many times this is the default prefix which covers the entire range of keys). The Canonical key refers to this "higher-up" PrefixConfig by specifying its prefix so it can be binary searched from within a PrefixConfigMap.
func (*PrefixConfig) String ¶
func (pc *PrefixConfig) String() string
String returns a human readable description.
type PrefixConfigMap ¶
type PrefixConfigMap []*PrefixConfig
PrefixConfigMap is a slice of prefix configs, sorted by prefix. Along with various accessor methods, the config map also contains additional prefix configs in the slice to account for the ends of prefix ranges.
func NewPrefixConfigMap ¶
func NewPrefixConfigMap(configs []*PrefixConfig) (PrefixConfigMap, error)
NewPrefixConfigMap creates a new prefix config map and sorts the entries by key prefix and then adds additional entries to mark the ends of each key prefix range. For example, if the map contains entries for:
"/": config1 "/db1": config2 "/db1/table": config3 "/db3": config4
...then entries will be added for:
"/db1/tablf": config2 "/db2": config1 "/db4": config1
These additional entries allow for simple lookups by prefix and provide a way to split a range by prefixes which affect it. This last is necessary for accounting and zone configs; ranges must not span accounting or zone config boundaries.
Similarly, if the map contains successive prefix entries:
"/": config1 "/db1": config2 "/db1/table1": config3 "/db1/table2": config4 "/db2": config5
...then entries will be added for (note that we don't add a redundant entry for /db2 or /db1/table2):
"/db1/table3": config2 "/db3": config1
func (PrefixConfigMap) Less ¶
func (p PrefixConfigMap) Less(i, j int) bool
func (PrefixConfigMap) MatchByPrefix ¶
func (p PrefixConfigMap) MatchByPrefix(key proto.Key) *PrefixConfig
MatchByPrefix returns the longest matching PrefixConfig. If the key specified does not match an existing prefix, a panic will result. Based on the comments in build(), that example will have a final list of PrefixConfig entries which look like:
"/": config1 "/db1": config2 "/db1/table": config3 "/db1/tablf": config2 "/db2": config1 "/db3": config4 "/db4": config1
To find the longest matching prefix, we take the lower bound of the specified key.
func (PrefixConfigMap) MatchesByPrefix ¶
func (p PrefixConfigMap) MatchesByPrefix(key proto.Key) []*PrefixConfig
MatchesByPrefix returns a list of PrefixConfig objects with prefixes satisfying the specified key. The results are returned in order of longest matching prefix to shortest.
func (PrefixConfigMap) SplitRangeByPrefixes ¶
func (p PrefixConfigMap) SplitRangeByPrefixes(start, end proto.Key) ([]*RangeResult, error)
SplitRangeByPrefixes returns a list of key ranges with corresponding configs. The split is done using matching prefix config entries. For example, consider the following set of configs and prefixes:
/: config1 /db1: config2
A range containing keys from /0 - /db3 will map to the following split ranges and corresponding configs:
/0 - /db1: config1 /db1 - /db2: config2 /db2 - /db3: config1
After calling PrefixConfigMap.build(), our prefixes will look like:
/: config1 /db1: config2 /db2: config1
The algorithm is straightforward for splitting a range by existing prefixes. Look up the start key; that is the first config. Look up the end key; that is the last config. We then step through the intervening PrefixConfig records and create a RangeResult for each.
func (PrefixConfigMap) Swap ¶
func (p PrefixConfigMap) Swap(i, j int)
func (PrefixConfigMap) VisitPrefixes ¶
func (p PrefixConfigMap) VisitPrefixes(start, end proto.Key, visitor func(start, end proto.Key, config interface{}) (bool, error)) error
VisitPrefixes invokes the visitor function for each prefix overlapped by the specified key range [start, end). If visitor returns done=true or an error, the visitation is halted.
func (PrefixConfigMap) VisitPrefixesHierarchically ¶
func (p PrefixConfigMap) VisitPrefixesHierarchically(key proto.Key, visitor func(start, end proto.Key, config interface{}) (bool, error)) error
VisitPrefixesHierarchically invokes the visitor function for each prefix matching the key argument, from longest matching prefix to shortest. If visitor returns done=true or an error, the visitation is halted.
type Range ¶
type Range struct { sync.RWMutex // Protects the following fields: // contains filtered or unexported fields }
A Range is a contiguous keyspace with writes managed via an instance of the Raft consensus algorithm. Many ranges may exist in a store and they are unlikely to be contiguous. Ranges are independent units and are responsible for maintaining their own integrity by replacing failed replicas, splitting and merging as appropriate.
func NewRange ¶
func NewRange(desc *proto.RangeDescriptor, rm rangeManager) (*Range, error)
NewRange initializes the range using the given metadata.
func (*Range) AddCmd ¶
AddCmd adds a command for execution on this range. The command's affected keys are verified to be contained within the range and the range's leadership is confirmed. The command is then dispatched either along the read-only execution path or the read-write Raft command queue. If wait is false, read-write commands are added to Raft without waiting for their completion.
func (*Range) AdminMerge ¶
func (r *Range) AdminMerge(args *proto.AdminMergeRequest, reply *proto.AdminMergeResponse)
AdminMerge extends the range to subsume the range that comes next in the key space. The range being subsumed is provided in args.SubsumedRange. The EndKey of the subsuming range must equal the start key of the range being subsumed. The merge is performed inside of a distributed transaction which writes the updated range descriptor for the subsuming range and deletes the range descriptor for the subsumed one. It also updates the range addressing metadata. The handover of responsibility for the reassigned key range is carried out seamlessly through a merge trigger carried out as part of the commit of that transaction. A merge requires that the two ranges are collocated on the same set of replicas.
func (*Range) AdminSplit ¶
func (r *Range) AdminSplit(args *proto.AdminSplitRequest, reply *proto.AdminSplitResponse)
AdminSplit divides the range into two ranges, using either args.SplitKey (if provided) or an internally computed key that aims to roughly equipartition the range by size. The split is done inside of a distributed txn which writes updated and new range descriptors, and updates the range addressing metadata. The handover of responsibility for the reassigned key range is carried out seamlessly through a split trigger carried out as part of the commit of that transaction.
func (*Range) ApplySnapshot ¶
ApplySnapshot implements the multiraft.WriteableGroupStorage interface.
func (*Range) ChangeReplicas ¶
ChangeReplicas adds or removes a replica of a range. The change is performed in a distributed transaction and takes effect when that transaction is committed. When removing a replica, only the NodeID and StoreID fields of the Replica are used.
func (*Range) ConditionalPut ¶
func (r *Range) ConditionalPut(batch engine.Engine, ms *proto.MVCCStats, args *proto.ConditionalPutRequest, reply *proto.ConditionalPutResponse)
ConditionalPut sets the value for a specified key only if the expected value matches. If not, the return value contains the actual value.
func (*Range) Contains ¶
func (r *Range) Contains(batch engine.Engine, args *proto.ContainsRequest, reply *proto.ContainsResponse)
Contains verifies the existence of a key in the key value store.
func (*Range) ContainsKey ¶
ContainsKey returns whether this range contains the specified key.
func (*Range) ContainsKeyRange ¶
ContainsKeyRange returns whether this range contains the specified key range from start to end.
func (*Range) Delete ¶
func (r *Range) Delete(batch engine.Engine, ms *proto.MVCCStats, args *proto.DeleteRequest, reply *proto.DeleteResponse)
Delete deletes the key and value specified by key.
func (*Range) DeleteRange ¶
func (r *Range) DeleteRange(batch engine.Engine, ms *proto.MVCCStats, args *proto.DeleteRangeRequest, reply *proto.DeleteRangeResponse)
DeleteRange deletes the range of key/value pairs specified by start and end keys.
func (*Range) Desc ¶
func (r *Range) Desc() *proto.RangeDescriptor
Desc atomically returns the range's descriptor.
func (*Range) EndTransaction ¶
func (r *Range) EndTransaction(batch engine.Engine, ms *proto.MVCCStats, args *proto.EndTransactionRequest, reply *proto.EndTransactionResponse)
EndTransaction either commits or aborts (rolls back) an extant transaction according to the args.Commit parameter.
func (*Range) Entries ¶
Entries implements the raft.Storage interface. Note that maxBytes is advisory and this method will always return at least one entry even if it exceeds maxBytes. Passing maxBytes equal to zero disables size checking. TODO(bdarnell): consider caching for recent entries, if rocksdb's builtin caching is insufficient.
func (*Range) FirstIndex ¶
FirstIndex implements the raft.Storage interface.
func (*Range) Get ¶
func (r *Range) Get(batch engine.Engine, args *proto.GetRequest, reply *proto.GetResponse)
Get returns the value for a specified key.
func (*Range) GetGCMetadata ¶
func (r *Range) GetGCMetadata() (*proto.GCMetadata, error)
GetGCMetadata reads the latest GC metadata for this range.
func (*Range) GetLastVerificationTimestamp ¶
GetLastVerificationTimestamp reads the timestamp at which the range's data was last verified.
func (*Range) GetMVCCStats ¶
GetMVCCStats returns a copy of the MVCC stats object for this range.
func (*Range) GetMaxBytes ¶
GetMaxBytes atomically gets the range maximum byte limit.
func (*Range) GetReplica ¶
GetReplica returns the replica for this range from the range descriptor. A fatal error occurs if the replica is not found.
func (*Range) HasLeaderLease ¶
HasLeaderLease returns whether this replica holds or was the last holder of the leader lease, and whether the lease has expired. Leases may not overlap, and a gap between successive lease holders is expected.
func (*Range) Increment ¶
func (r *Range) Increment(batch engine.Engine, ms *proto.MVCCStats, args *proto.IncrementRequest, reply *proto.IncrementResponse)
Increment increments the value (interpreted as varint64 encoded) and returns the newly incremented value (encoded as varint64). If no value exists for the key, zero is incremented.
func (*Range) InitialState ¶
InitialState implements the raft.Storage interface.
func (*Range) InternalGC ¶
func (r *Range) InternalGC(batch engine.Engine, ms *proto.MVCCStats, args *proto.InternalGCRequest, reply *proto.InternalGCResponse)
InternalGC iterates through the list of keys to garbage collect specified in the arguments. MVCCGarbageCollect is invoked on each listed key along with the expiration timestamp. The GC metadata specified in the args is persisted after GC.
func (*Range) InternalHeartbeatTxn ¶
func (r *Range) InternalHeartbeatTxn(batch engine.Engine, ms *proto.MVCCStats, args *proto.InternalHeartbeatTxnRequest, reply *proto.InternalHeartbeatTxnResponse)
InternalHeartbeatTxn updates the transaction status and heartbeat timestamp after receiving transaction heartbeat messages from coordinator. Returns the updated transaction.
func (*Range) InternalLeaderLease ¶
func (r *Range) InternalLeaderLease(batch engine.Engine, ms *proto.MVCCStats, args *proto.InternalLeaderLeaseRequest, reply *proto.InternalLeaderLeaseResponse)
InternalLeaderLease sets the leader lease for this range. The command fails only if the desired start timestamp collides with a previous lease. Otherwise, the start timestamp is wound back to right after the expiration of the previous lease (or zero). After a lease has been set, calls to HasLeaderLease() will return true if this replica is the lease holder and the lease has not yet expired. If this range replica is already the lease holder, the expiration will be extended or shortened as indicated. For a new lease, all duties required of the range leader are commenced, including clearing the command queue and timestamp cache.
func (*Range) InternalMerge ¶
func (r *Range) InternalMerge(batch engine.Engine, ms *proto.MVCCStats, args *proto.InternalMergeRequest, reply *proto.InternalMergeResponse)
InternalMerge is used to merge a value into an existing key. Merge is an efficient accumulation operation which is exposed by RocksDB, used by Cockroach for the efficient accumulation of certain values. Due to the difficulty of making these operations transactional, merges are not currently exposed directly to clients. Merged values are explicitly not MVCC data.
func (*Range) InternalPushTxn ¶
func (r *Range) InternalPushTxn(batch engine.Engine, ms *proto.MVCCStats, args *proto.InternalPushTxnRequest, reply *proto.InternalPushTxnResponse)
InternalPushTxn resolves conflicts between concurrent txns (or between a non-transactional reader or writer and a txn) in several ways depending on the statuses and priorities of the conflicting transactions. The InternalPushTxn operation is invoked by a "pusher" (the writer trying to abort a conflicting txn or the reader trying to push a conflicting txn's commit timestamp forward), who attempts to resolve a conflict with a "pushee" (args.PushTxn -- the pushee txn whose intent(s) caused the conflict).
Txn already committed/aborted: If pushee txn is committed or aborted return success.
Txn Timeout: If pushee txn entry isn't present or its LastHeartbeat timestamp isn't set, use PushTxn.Timestamp as LastHeartbeat. If current time - LastHeartbeat > 2 * DefaultHeartbeatInterval, then the pushee txn should be either pushed forward, aborted, or confirmed not pending, depending on value of Request.PushType.
Old Txn Epoch: If persisted pushee txn entry has a newer Epoch than PushTxn.Epoch, return success, as older epoch may be removed.
Lower Txn Priority: If pushee txn has a lower priority than pusher, adjust pushee's persisted txn depending on value of args.PushType. If args.PushType is ABORT_TXN, set txn.Status to ABORTED, and priority to one less than the pusher's priority and return success. If args.PushType is PUSH_TIMESTAMP, set txn.Timestamp to pusher's Timestamp + 1 (note that we use the pusher's Args.Timestamp, not Txn.Timestamp because the args timestamp can advance during the txn).
Higher Txn Priority: If pushee txn has a higher priority than pusher, return TransactionPushError. Transaction will be retried with priority one less than the pushee's higher priority.
func (*Range) InternalRangeLookup ¶
func (r *Range) InternalRangeLookup(batch engine.Engine, args *proto.InternalRangeLookupRequest, reply *proto.InternalRangeLookupResponse)
InternalRangeLookup is used to look up RangeDescriptors - a RangeDescriptor is a metadata structure which describes the key range and replica locations of a distinct range in the cluster.
RangeDescriptors are stored as values in the cockroach cluster's key-value store. However, they are always stored using special "Range Metadata keys", which are "ordinary" keys with a special prefix prepended. The Range Metadata Key for an ordinary key can be generated with the `engine.RangeMetaKey(key)` function. The RangeDescriptor for the range which contains a given key can be retrieved by generating its Range Metadata Key and dispatching it to InternalRangeLookup.
Note that the Range Metadata Key sent to InternalRangeLookup is NOT the key at which the desired RangeDescriptor is stored. Instead, this method returns the RangeDescriptor stored at the _lowest_ existing key which is _greater_ than the given key. The returned RangeDescriptor will thus contain the ordinary key which was originally used to generate the Range Metadata Key sent to InternalRangeLookup.
The "Range Metadata Key" for a range is built by appending the end key of the range to the meta[12] prefix because the RocksDB iterator only supports a Seek() interface which acts as a Ceil(). Using the start key of the range would cause Seek() to find the key after the meta indexing record we're looking for, which would result in having to back the iterator up, an option which is both less efficient and not available in all cases.
This method has an important optimization: instead of just returning the requested RangeDescriptor, it also returns a slice of additional range descriptors immediately consecutive to the desired RangeDescriptor. This is intended to serve as a sort of caching pre-fetch, so that the requesting nodes can aggressively cache RangeDescriptors which are likely to be desired by their current workload.
func (*Range) InternalResolveIntent ¶
func (r *Range) InternalResolveIntent(batch engine.Engine, ms *proto.MVCCStats, args *proto.InternalResolveIntentRequest, reply *proto.InternalResolveIntentResponse)
InternalResolveIntent updates the transaction status and heartbeat timestamp after receiving transaction heartbeat messages from coordinator. The range will return the current status for this transaction to the coordinator.
func (*Range) InternalTruncateLog ¶
func (r *Range) InternalTruncateLog(batch engine.Engine, ms *proto.MVCCStats, args *proto.InternalTruncateLogRequest, reply *proto.InternalTruncateLogResponse)
InternalTruncateLog discards a prefix of the raft log.
func (*Range) IsFirstRange ¶
IsFirstRange returns true if this is the first range.
func (*Range) Put ¶
func (r *Range) Put(batch engine.Engine, ms *proto.MVCCStats, args *proto.PutRequest, reply *proto.PutResponse)
Put sets the value for a specified key.
func (*Range) Scan ¶
func (r *Range) Scan(batch engine.Engine, args *proto.ScanRequest, reply *proto.ScanResponse)
Scan scans the key range specified by start key through end key up to some maximum number of results. The last key of the iteration is returned with the reply.
func (*Range) SetDesc ¶
func (r *Range) SetDesc(desc *proto.RangeDescriptor)
SetDesc atomically sets the range's descriptor. This method should be called in the context of having metaLock held, as is the case for merging, splitting and updating the replica set.
func (*Range) SetHardState ¶
SetHardState implements the multiraft.WriteableGroupStorage interface.
func (*Range) SetLastVerificationTimestamp ¶
SetLastVerificationTimestamp writes the timestamp at which the range's data was last verified.
func (*Range) SetMaxBytes ¶
SetMaxBytes atomically sets the maximum byte limit before split. This value is cached by the range for efficiency.
func (*Range) WaitForLeaderLease ¶
WaitForLeaderLease is used from unit tests to wait until this range has the leader lease.
type RangeResult ¶
type RangeResult struct {
// contains filtered or unexported fields
}
RangeResult is returned by SplitRangeByPrefixes.
type RangeSlice ¶
type RangeSlice []*Range
A RangeSlice is a slice of Range pointers used for replica lookups by key.
func (RangeSlice) Len ¶
func (rs RangeSlice) Len() int
Implementation of sort.Interface which sorts by StartKey from each range's descriptor.
func (RangeSlice) Less ¶
func (rs RangeSlice) Less(i, j int) bool
func (RangeSlice) Swap ¶
func (rs RangeSlice) Swap(i, j int)
type RemoveRangeEvent ¶
type RemoveRangeEvent struct { StoreID proto.StoreID Desc *proto.RangeDescriptor Stats proto.MVCCStats }
RemoveRangeEvent occurs whenever a Range is removed from a store. This structure includes the Range's RangeDescriptor and the Range's previous MVCCStats before it was removed.
type ResponseCache ¶
A ResponseCache provides idempotence for request retries. Each request to a range specifies a ClientCmdID in the request header which uniquely identifies a client command. After commands have been replicated via Raft, they are executed against the state machine and the results are stored in the ResponseCache.
The ResponseCache stores responses in the underlying engine, using keys derived from the Raft ID and the ClientCmdID.
A ResponseCache is safe for concurrent access.
func NewResponseCache ¶
func NewResponseCache(raftID int64, engine engine.Engine) *ResponseCache
NewResponseCache returns a new response cache. Every range replica maintains a response cache, not just the leader. However, when a replica loses or gains leadership of the Raft consensus group, the inflight map should be cleared.
func (*ResponseCache) ClearData ¶
func (rc *ResponseCache) ClearData() error
ClearData removes all items stored in the persistent cache. It does not alter the inflight map.
func (*ResponseCache) CopyFrom ¶
func (rc *ResponseCache) CopyFrom(e engine.Engine, originRaftID int64) error
CopyFrom copies all the cached results from another response cache into this one. Note that the cache will not be locked while copying is in progress. Failures decoding individual cache entries return an error. The copy is done directly using the engine instead of interpreting values through MVCC for efficiency.
func (*ResponseCache) CopyInto ¶
func (rc *ResponseCache) CopyInto(e engine.Engine, destRaftID int64) error
CopyInto copies all the cached results from one response cache into another. The cache will be locked while copying is in progress; failures decoding individual cache entries return an error. The copy is done directly using the engine instead of interpreting values through MVCC for efficiency.
func (*ResponseCache) GetResponse ¶
func (rc *ResponseCache) GetResponse(cmdID proto.ClientCmdID, reply proto.Response) (bool, error)
GetResponse looks up a response matching the specified cmdID and returns true if found. The response is deserialized into the supplied reply parameter. If no response is found, returns false. If a command is pending already for the cmdID, then this method will block until the command is completed or the response cache is cleared.
func (*ResponseCache) PutResponse ¶
func (rc *ResponseCache) PutResponse(cmdID proto.ClientCmdID, reply proto.Response) error
PutResponse writes a response to the cache for the specified cmdID.
type SplitRangeEvent ¶
type SplitRangeEvent struct { StoreID proto.StoreID Original UpdateRangeEvent New AddRangeEvent }
SplitRangeEvent occurs whenever a range is split in two. This Event actually contains two other events: an UpdateRangeEvent for the Range which originally existed, and an AddRangeEvent for the range that was created via the split.
type StartStoreEvent ¶
StartStoreEvent occurs whenever a store is initially started.
type Store ¶
type Store struct { *StoreFinder Ident proto.StoreIdent // contains filtered or unexported fields }
A Store maintains a map of ranges by start key. A Store corresponds to one physical device.
func NewStore ¶
func NewStore(ctx StoreContext, eng engine.Engine, nodeDesc *proto.NodeDescriptor) *Store
NewStore returns a new instance of a store.
func (*Store) AddRange ¶
AddRange adds the range to the store's range map and to the sorted rangesByKey slice.
func (*Store) AppliedIndex ¶
AppliedIndex implements the multiraft.StateMachine interface.
func (*Store) Attrs ¶
func (s *Store) Attrs() proto.Attributes
Attrs returns the attributes of the underlying store.
func (*Store) Bootstrap ¶
Bootstrap writes a new store ident to the underlying engine. To ensure that no crufty data already exists in the engine, it scans the engine contents before writing the new store ident. The engine should be completely empty. It returns an error if called on a non-empty engine.
func (*Store) BootstrapRange ¶
BootstrapRange creates the first range in the cluster and manually writes it to the store. Default range addressing records are created for meta1 and meta2. Default configurations for accounting, permissions, and zones are created. All configs are specified for the empty key prefix, meaning they apply to the entire database. Permissions are granted to all users and the zone requires three replicas with no other specifications. It also adds the range tree and the root node, the first range, to it.
func (*Store) Capacity ¶
func (s *Store) Capacity() (proto.StoreCapacity, error)
Capacity returns the capacity of the underlying storage engine.
func (*Store) Descriptor ¶
func (s *Store) Descriptor() (*proto.StoreDescriptor, error)
Descriptor returns a StoreDescriptor including current store capacity information.
func (*Store) ExecuteCmd ¶
ExecuteCmd fetches a range based on the header's replica, assembles method, args & reply into a Raft Cmd struct and executes the command using the fetched range.
func (*Store) ForceRangeGCScan ¶
ForceRangeGCScan iterates over all ranges and enqueues any that may need to be GC'd. Exposed only for testing.
func (*Store) ForceReplicationScan ¶
ForceReplicationScan iterates over all ranges and enqueues any that need to be replicated. Exposed only for testing.
func (*Store) GetRange ¶
GetRange fetches a range by Raft ID. Returns an error if no range is found.
func (*Store) GetStatus ¶
func (s *Store) GetStatus() (*proto.StoreStatus, error)
GetStatus fetches the latest store status from the stored value on the cluster. Returns nil if the scanner has not yet run. The scanner runs once every ctx.ScanInterval.
func (*Store) GossipCapacity ¶
func (s *Store) GossipCapacity()
GossipCapacity broadcasts the node's capacity on the gossip network.
func (*Store) GroupStorage ¶
func (s *Store) GroupStorage(groupID uint64) multiraft.WriteableGroupStorage
GroupStorage implements the multiraft.Storage interface.
func (*Store) LookupRange ¶
LookupRange looks up a range via binary search over the sorted "rangesByKey" RangeSlice. Returns nil if no range is found for the specified key range. Note that the specified keys are transformed using Key.Address() to ensure we look up ranges correctly for local keys. When end is nil, a range that contains start is looked up.
func (*Store) MergeRange ¶
func (s *Store) MergeRange(subsumingRng *Range, updatedEndKey proto.Key, subsumedRaftID int64) error
MergeRange expands the subsuming range to absorb the subsumed range. This merge operation will fail if the two ranges are not collocated on the same store.
func (*Store) NewRangeDescriptor ¶
func (s *Store) NewRangeDescriptor(start, end proto.Key, replicas []proto.Replica) (*proto.RangeDescriptor, error)
NewRangeDescriptor creates a new descriptor based on start and end keys and the supplied proto.Replicas slice. It allocates new Raft and range IDs to fill out the supplied replicas.
func (*Store) NewSnapshot ¶
NewSnapshot creates a new snapshot engine.
func (*Store) ProposeRaftCommand ¶
func (s *Store) ProposeRaftCommand(idKey cmdIDKey, cmd proto.InternalRaftCommand) <-chan error
ProposeRaftCommand submits a command to raft. The command is processed asynchronously and an error or nil will be written to the returned channel when it is committed or aborted (but note that committed does not mean that it has been applied to the range yet).
func (*Store) RemoveRange ¶
RemoveRange removes the range from the store's range map and from the sorted rangesByKey slice.
func (*Store) SetRangeRetryOptions ¶
func (s *Store) SetRangeRetryOptions(ro util.RetryOptions)
SetRangeRetryOptions sets the retry options used for this store.
func (*Store) SplitRange ¶
SplitRange shortens the original range to accommodate the new range. The new range is added to the ranges map and the rangesByKey sorted slice.
func (*Store) WaitForInit ¶
func (s *Store) WaitForInit()
WaitForInit waits for any asynchronous processes begun in Start() to complete their initialization. In particular, this includes gossiping. In some cases this may block until the range GC queue has completed its scan. Only for testing.
func (*Store) WaitForRangeScanCompletion ¶
WaitForRangeScanCompletion waits until the next range scan is complete and returns the total number of scans completed so far. This is exposed for use in unit tests.
type StoreContext ¶
type StoreContext struct { Clock *hlc.Clock DB *client.KV Gossip *gossip.Gossip Transport multiraft.Transport Context context.Context // RangeRetryOptions are the retry options for ranges. // TODO(tschottdorf) improve comment once I figure out what this is. RangeRetryOptions util.RetryOptions // RaftTickInterval is the resolution of the Raft timer; other raft timeouts // are defined in terms of multiples of this value. RaftTickInterval time.Duration // RaftHeartbeatIntervalTicks is the number of ticks that pass between heartbeats. RaftHeartbeatIntervalTicks int // RaftElectionTimeoutTicks is the number of ticks that must pass before a follower // considers a leader to have failed and calls a new election. Should be significantly // higher than RaftHeartbeatIntervalTicks. The raft paper recommends a value of 150ms // for local networks. RaftElectionTimeoutTicks int // ScanInterval is the default value for the scan interval ScanInterval time.Duration // EventFeed is a feed to which this store will publish events. EventFeed *util.Feed }
A StoreContext encompasses the auxiliary objects and configuration required to create a store. All fields holding a pointer or an interface are required to create a store; the rest will have sane defaults set if omitted.
func (*StoreContext) Valid ¶
func (sc *StoreContext) Valid() bool
Valid returns true if the StoreContext is populated correctly. We don't check for Gossip and DB since some of our tests pass those as nil.
type StoreEventFeed ¶
type StoreEventFeed struct {
// contains filtered or unexported fields
}
StoreEventFeed is a helper structure which publishes store-specific events to a util.Feed. The target feed may be shared by multiple StoreEventFeeds. If the target feed is nil, event methods become no-ops.
func NewStoreEventFeed ¶
func NewStoreEventFeed(id proto.StoreID, feed *util.Feed) StoreEventFeed
NewStoreEventFeed creates a new StoreEventFeed which publishes events for a specific store to the supplied feed.
type StoreEventListener ¶
type StoreEventListener interface { OnAddRange(event *AddRangeEvent) OnUpdateRange(event *UpdateRangeEvent) OnRemoveRange(event *RemoveRangeEvent) OnSplitRange(event *SplitRangeEvent) OnMergeRange(event *MergeRangeEvent) OnStartStore(event *StartStoreEvent) OnBeginScanRanges(event *BeginScanRangesEvent) OnEndScanRanges(event *EndScanRangesEvent) }
StoreEventListener is an interface that can be implemented by objects which listen for events published by stores.
type StoreFinder ¶
type StoreFinder struct {
// contains filtered or unexported fields
}
StoreFinder provides the data necessary to find stores with particular attributes.
func (*StoreFinder) WaitForNodes ¶
func (sf *StoreFinder) WaitForNodes(n int)
WaitForNodes blocks until at least the given number of nodes are present in the capacity map. Used for tests.
type TimestampCache ¶
type TimestampCache struct {
// contains filtered or unexported fields
}
A TimestampCache maintains an interval tree FIFO cache of keys or key ranges and the timestamps at which they were most recently read or written. If a timestamp was read or written by a transaction, the txn ID is stored with the timestamp to avoid advancing timestamps on successive requests from the same transaction.
The cache also maintains a low-water mark which is the most recently evicted entry's timestamp. This value always ratchets with monotonic increases. The low water mark is initialized to the current system time plus the maximum clock offset.
func NewTimestampCache ¶
func NewTimestampCache(clock *hlc.Clock) *TimestampCache
NewTimestampCache returns a new timestamp cache with supplied hybrid clock.
func (*TimestampCache) Add ¶
func (tc *TimestampCache) Add(start, end proto.Key, timestamp proto.Timestamp, txnID []byte, readOnly bool)
Add the specified timestamp to the cache as covering the range of keys from start to end. If end is nil, the range covers the start key only. txnID is nil for no transaction. readOnly specifies whether the command adding this timestamp was read-only or not.
func (*TimestampCache) Clear ¶
func (tc *TimestampCache) Clear(clock *hlc.Clock)
Clear clears the cache and resets the low water mark to the current time plus the maximum clock offset.
func (*TimestampCache) GetMax ¶
func (tc *TimestampCache) GetMax(start, end proto.Key, txnID []byte) (proto.Timestamp, proto.Timestamp)
GetMax returns the maximum read and write timestamps which overlap the interval spanning from start to end. Cached timestamps matching the specified txnID are not considered. If no part of the specified range is overlapped by timestamps in the cache, the low water timestamp is returned for both read and write timestamps.
The txn ID prevents restarts with a pattern like: read("a"), write("a"). The read adds a timestamp for "a". Then the write (for the same transaction) would get that as the max timestamp and be forced to increment it. This allows timestamps from the same txn to be ignored.
func (*TimestampCache) MergeInto ¶
func (tc *TimestampCache) MergeInto(dest *TimestampCache, clear bool)
MergeInto merges all entries from this timestamp cache into the dest timestamp cache. The clear parameter, if true, copies the values of lowWater and latest and clears the destination cache before merging in the source.
func (*TimestampCache) SetLowWater ¶
func (tc *TimestampCache) SetLowWater(lowWater proto.Timestamp)
SetLowWater sets the cache's low water mark, which is the minimum value the cache will return from calls to GetMax().
type UpdateRangeEvent ¶
type UpdateRangeEvent struct { StoreID proto.StoreID Desc *proto.RangeDescriptor Stats proto.MVCCStats Method proto.Method Delta proto.MVCCStats }
UpdateRangeEvent occurs whenever a Range is modified. This structure includes the same information as AddRangeEvent, but also includes a second set of MVCCStats containing the delta from the Range's previous stats. If the update did not modify any statistics, this delta may be nil.
Source Files ¶
- addressing.go
- allocator.go
- command_queue.go
- doc.go
- feed.go
- gc_queue.go
- id_alloc.go
- prefix.go
- queue.go
- range.go
- range_command.go
- range_data_iter.go
- range_gc_queue.go
- range_raftstorage.go
- range_tree.go
- replicate_queue.go
- response_cache.go
- scanner.go
- split_queue.go
- stats.go
- store.go
- store_finder.go
- timestamp_cache.go
- verify_queue.go