Documentation ¶
Overview ¶
Package kv provides a key-value API to an underlying Cockroach datastore. Cockroach itself provides a single, monolithic, sorted key-value map, distributed over multiple nodes. Each node holds a set of key ranges. Package kv translates between the monolithic, logical map that Cockroach clients see and the physically distributed key ranges that comprise it.
Package kv implements the logic necessary to locate appropriate nodes based on keys being read or written. In some cases, requests may span a range of keys, in which case multiple RPCs may be sent out.
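The lookup described above can be illustrated with a minimal sketch. It is not the package's implementation: rangeDesc and lookup are hypothetical stand-ins, and the sketch assumes that ranges are half-open intervals [startKey, endKey) kept sorted by start key.

```go
package main

import (
	"fmt"
	"sort"
)

// rangeDesc is a hypothetical, simplified range descriptor: it covers the
// keys in [startKey, endKey) and is replicated on the listed nodes.
type rangeDesc struct {
	startKey, endKey string
	nodes            []int
}

// lookup returns the range containing key. ranges must be sorted by start
// key and must not overlap.
func lookup(ranges []rangeDesc, key string) (rangeDesc, bool) {
	// Binary search for the first range whose end key is greater than key.
	i := sort.Search(len(ranges), func(i int) bool {
		return key < ranges[i].endKey
	})
	if i == len(ranges) || key < ranges[i].startKey {
		return rangeDesc{}, false
	}
	return ranges[i], true
}

func main() {
	ranges := []rangeDesc{
		{"a", "g", []int{1, 2, 3}},
		{"g", "p", []int{2, 3, 4}},
		{"p", "z", []int{1, 4, 5}},
	}
	if r, ok := lookup(ranges, "hello"); ok {
		fmt.Printf("key %q is in [%s, %s) on nodes %v\n", "hello", r.startKey, r.endKey, r.nodes)
	}
}
```

A request for a single key maps to exactly one range in this model; a request for a span of keys may touch several ranges, which is why multiple RPCs can be needed.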
Index ¶
- func InitSenderForLocalTestCluster(nodeDesc *roachpb.NodeDescriptor, tracer opentracing.Tracer, clock *hlc.Clock, ...) client.Sender
- type BatchCall
- type DBServer
- type DistSender
- func (ds *DistSender) CountRanges(ctx context.Context, rs roachpb.RSpan) (int64, error)
- func (ds *DistSender) FirstRange() (*roachpb.RangeDescriptor, error)
- func (ds *DistSender) GetParallelSendCount() int32
- func (ds *DistSender) Metrics() DistSenderMetrics
- func (ds *DistSender) RangeLookup(ctx context.Context, key roachpb.RKey, desc *roachpb.RangeDescriptor, ...) ([]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, *roachpb.Error)
- func (ds *DistSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
- type DistSenderConfig
- type DistSenderMetrics
- type EvictionToken
- type RangeDescriptorDB
- type RangeIterator
- func (ri *RangeIterator) Desc() *roachpb.RangeDescriptor
- func (ri *RangeIterator) Error() *roachpb.Error
- func (ri *RangeIterator) Key() roachpb.RKey
- func (ri *RangeIterator) LeaseHolder(ctx context.Context) (roachpb.ReplicaDescriptor, bool)
- func (ri *RangeIterator) NeedAnother(rs roachpb.RSpan) bool
- func (ri *RangeIterator) Next(ctx context.Context)
- func (ri *RangeIterator) Seek(ctx context.Context, key roachpb.RKey, scanDir ScanDirection)
- func (ri *RangeIterator) Token() *EvictionToken
- func (ri *RangeIterator) Valid() bool
- type ReplicaInfo
- type ReplicaSlice
- func (rs ReplicaSlice) FindReplica(storeID roachpb.StoreID) int
- func (rs ReplicaSlice) FindReplicaByNodeID(nodeID roachpb.NodeID) int
- func (rs ReplicaSlice) Len() int
- func (rs ReplicaSlice) MoveToFront(i int)
- func (rs ReplicaSlice) OptimizeReplicaOrder(nodeDesc *roachpb.NodeDescriptor)
- func (rs ReplicaSlice) SortByCommonAttributePrefix(attrs []string) int
- func (rs ReplicaSlice) Swap(i, j int)
- type ScanDirection
- type SendOptions
- type Transport
- type TransportFactory
- type TxnCoordSender
- type TxnMetrics
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func InitSenderForLocalTestCluster ¶
func InitSenderForLocalTestCluster(
	nodeDesc *roachpb.NodeDescriptor,
	tracer opentracing.Tracer,
	clock *hlc.Clock,
	latency time.Duration,
	stores client.Sender,
	stopper *stop.Stopper,
	gossip *gossip.Gossip,
) client.Sender
InitSenderForLocalTestCluster initializes a TxnCoordSender that can be used with LocalTestCluster.
Types ¶
type BatchCall ¶
type BatchCall struct {
	Reply *roachpb.BatchResponse
	Err   error
}
BatchCall contains a response and an RPC error (note that the response contains its own roachpb.Error, which is separate from BatchCall.Err), and is analogous to the net/rpc.Call struct.
type DBServer ¶
type DBServer struct {
// contains filtered or unexported fields
}
A DBServer provides an HTTP server endpoint serving the key-value API. It accepts either JSON or serialized protobuf content types.
func NewDBServer ¶
NewDBServer allocates and returns a new DBServer.
func (*DBServer) Batch ¶
func (s *DBServer) Batch(
	ctx context.Context, args *roachpb.BatchRequest,
) (br *roachpb.BatchResponse, err error)
Batch implements the roachpb.KVServer interface.
type DistSender ¶
type DistSender struct {
	log.AmbientContext
	// contains filtered or unexported fields
}
A DistSender provides methods to access Cockroach's monolithic, distributed key value store. Each method invocation triggers a lookup or lookups to find replica metadata for implicated key ranges. RPCs are sent to one or more of the replicas to satisfy the method invocation.
func NewDistSender ¶
func NewDistSender(cfg DistSenderConfig, g *gossip.Gossip) *DistSender
NewDistSender returns a DistSender which connects to the Cockroach cluster via the supplied gossip instance. Supplying a DistSenderConfig or the fields within is optional. For omitted values, sane defaults will be used.
func (*DistSender) CountRanges ¶
func (ds *DistSender) CountRanges(ctx context.Context, rs roachpb.RSpan) (int64, error)
CountRanges returns the number of ranges that encompass the given key span.
func (*DistSender) FirstRange ¶
func (ds *DistSender) FirstRange() (*roachpb.RangeDescriptor, error)
FirstRange implements the RangeDescriptorDB interface. FirstRange returns the RangeDescriptor for the first range on the cluster, which is retrieved from the gossip protocol instead of the datastore.
func (*DistSender) GetParallelSendCount ¶
func (ds *DistSender) GetParallelSendCount() int32
GetParallelSendCount returns the number of parallel batch requests the dist sender has dispatched in its lifetime.
func (*DistSender) Metrics ¶
func (ds *DistSender) Metrics() DistSenderMetrics
Metrics returns a struct which contains metrics related to the distributed sender's activity.
func (*DistSender) RangeLookup ¶
func (ds *DistSender) RangeLookup(
	ctx context.Context,
	key roachpb.RKey,
	desc *roachpb.RangeDescriptor,
	useReverseScan bool,
) ([]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, *roachpb.Error)
RangeLookup implements the RangeDescriptorDB interface. It dispatches a RangeLookup request for the given metadata key to the replicas of the given range. Inconsistent reads are allowed for range lookups, for efficiency. Stale data is not a correctness problem, but may infrequently add latency because additional range lookups may be required. Note also that RangeLookup bypasses the DistSender's Send() method, so there is no error inspection or retry logic here; this is not an issue since the lookup performs only a single inconsistent read.
func (*DistSender) Send ¶
func (ds *DistSender) Send(
	ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error)
Send implements the batch.Sender interface. It subdivides the Batch into batches admissible for sending (preventing certain illegal mixtures of requests), executes each individual part (which may span multiple ranges), and recombines the response.
When the request spans ranges, it is split by range and a partial subset of the batch request is sent to affected ranges in parallel.
The first write in a transaction may not arrive before writes to other ranges. This is relevant in the case of a BeginTransaction request. Intents written to other ranges before the transaction record is created will cause the transaction to abort early.
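The split-and-send step can be sketched as follows. This is an illustration under simplifying assumptions, not DistSender's code: span, splitByRange, and sendParallel are hypothetical names, keys are plain strings, and the "RPC" is just a function call.

```go
package main

import (
	"fmt"
	"sync"
)

// span is a half-open key interval [start, end).
type span struct{ start, end string }

// splitByRange intersects sp with each range and returns the non-empty
// pieces in key order. ranges is a hypothetical stand-in for the sorted
// range descriptors that a real lookup would return.
func splitByRange(sp span, ranges []span) []span {
	var parts []span
	for _, r := range ranges {
		lo, hi := sp.start, sp.end
		if r.start > lo {
			lo = r.start
		}
		if r.end < hi {
			hi = r.end
		}
		if lo < hi { // non-empty intersection
			parts = append(parts, span{lo, hi})
		}
	}
	return parts
}

// sendParallel issues one partial request per piece concurrently and
// returns the replies in the same order as parts.
func sendParallel(parts []span, send func(span) string) []string {
	replies := make([]string, len(parts))
	var wg sync.WaitGroup
	for i, p := range parts {
		wg.Add(1)
		go func(i int, p span) {
			defer wg.Done()
			replies[i] = send(p) // each goroutine writes its own slot
		}(i, p)
	}
	wg.Wait()
	return replies
}

func main() {
	ranges := []span{{"a", "g"}, {"g", "p"}, {"p", "z"}}
	parts := splitByRange(span{"c", "r"}, ranges)
	replies := sendParallel(parts, func(p span) string {
		return fmt.Sprintf("scanned [%s, %s)", p.start, p.end)
	})
	fmt.Println(replies)
}
```

Because the partial requests run concurrently, nothing in this sketch guarantees which range processes its piece first, which mirrors the ordering caveat about the first write in a transaction.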
type DistSenderConfig ¶
type DistSenderConfig struct {
	AmbientCtx log.AmbientContext

	Clock                    *hlc.Clock
	RangeDescriptorCacheSize int32
	// RangeLookupMaxRanges sets how many ranges will be prefetched into the
	// range descriptor cache when dispatching a range lookup request.
	RangeLookupMaxRanges int32
	LeaseHolderCacheSize int32
	RPCRetryOptions      *retry.Options
	// The RPC dispatcher. Defaults to grpc but can be changed here for testing
	// purposes.
	TransportFactory  TransportFactory
	RPCContext        *rpc.Context
	RangeDescriptorDB RangeDescriptorDB
	SendNextTimeout   time.Duration
	// SenderConcurrency specifies the parallelization available when
	// splitting batches into multiple requests when they span ranges.
	// TODO(spencer): This is per-process. We should add a per-batch limit.
	SenderConcurrency int32
	// contains filtered or unexported fields
}
DistSenderConfig holds configuration and auxiliary objects that can be passed to NewDistSender.
type DistSenderMetrics ¶
type DistSenderMetrics struct {
	BatchCount             *metric.Counter
	PartialBatchCount      *metric.Counter
	SentCount              *metric.Counter
	LocalSentCount         *metric.Counter
	SendNextTimeoutCount   *metric.Counter
	NextReplicaErrCount    *metric.Counter
	NotLeaseHolderErrCount *metric.Counter
	SlowRequestsCount      *metric.Gauge
}
DistSenderMetrics is the set of metrics for a given distributed sender.
type EvictionToken ¶
type EvictionToken struct {
// contains filtered or unexported fields
}
EvictionToken holds eviction state between calls to LookupRangeDescriptor.
func (*EvictionToken) Evict ¶
func (et *EvictionToken) Evict(ctx context.Context) error
Evict instructs the EvictionToken to evict the RangeDescriptor it was created with from the rangeDescriptorCache.
func (*EvictionToken) EvictAndReplace ¶
func (et *EvictionToken) EvictAndReplace(
	ctx context.Context, newDescs ...roachpb.RangeDescriptor,
) error
EvictAndReplace instructs the EvictionToken to evict the RangeDescriptor it was created with from the rangeDescriptorCache. It also allows the user to provide new RangeDescriptors to insert into the cache, all atomically. When called without arguments, EvictAndReplace will behave the same as Evict.
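The evict-and-replace pattern can be sketched with a toy cache. Everything here is hypothetical (rangeDesc, descCache, evictionToken, and keying the cache by end key are assumptions made for the example); the point is only that the eviction and the insertions happen under one lock, so readers never observe the cache between the two steps.

```go
package main

import (
	"fmt"
	"sync"
)

// rangeDesc is a simplified descriptor covering [startKey, endKey).
type rangeDesc struct{ startKey, endKey string }

// descCache is a toy descriptor cache keyed by range end key.
type descCache struct {
	mu    sync.Mutex
	byEnd map[string]rangeDesc
}

// evictionToken remembers which descriptor a lookup returned.
type evictionToken struct {
	cache *descCache
	desc  rangeDesc
}

func newDescCache() *descCache {
	return &descCache{byEnd: map[string]rangeDesc{}}
}

// insert caches d and returns a token that can later evict it.
func (c *descCache) insert(d rangeDesc) *evictionToken {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.byEnd[d.endKey] = d
	return &evictionToken{cache: c, desc: d}
}

func (c *descCache) len() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.byEnd)
}

// evictAndReplace removes the token's descriptor and inserts newDescs while
// holding the lock for the whole operation. As a design choice of this
// sketch, the eviction is skipped if the cached entry has already changed.
func (t *evictionToken) evictAndReplace(newDescs ...rangeDesc) {
	t.cache.mu.Lock()
	defer t.cache.mu.Unlock()
	if cur, ok := t.cache.byEnd[t.desc.endKey]; ok && cur == t.desc {
		delete(t.cache.byEnd, t.desc.endKey)
	}
	for _, d := range newDescs {
		t.cache.byEnd[d.endKey] = d
	}
}

// evict is evictAndReplace with nothing to insert.
func (t *evictionToken) evict() { t.evictAndReplace() }

func main() {
	c := newDescCache()
	tok := c.insert(rangeDesc{"a", "z"})
	// The range split: replace the stale descriptor with its two halves.
	tok.evictAndReplace(rangeDesc{"a", "m"}, rangeDesc{"m", "z"})
	fmt.Println("cached descriptors:", c.len())
}
```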
type RangeDescriptorDB ¶
type RangeDescriptorDB interface {
	// RangeLookup takes a meta key to look up descriptors for and a descriptor
	// of the range believed to hold the meta key. Two slices of range
	// descriptors are returned. The first of these slices holds descriptors
	// which contain the given key (possibly from intents), and the second holds
	// prefetched adjacent descriptors.
	RangeLookup(
		ctx context.Context,
		key roachpb.RKey,
		desc *roachpb.RangeDescriptor,
		useReverseScan bool,
	) ([]roachpb.RangeDescriptor, []roachpb.RangeDescriptor, *roachpb.Error)
	// FirstRange returns the descriptor for the first Range. This is the
	// Range containing all meta1 entries.
	FirstRange() (*roachpb.RangeDescriptor, error)
}
RangeDescriptorDB is a type which can query range descriptors from an underlying datastore. This interface is used by rangeDescriptorCache to initially retrieve information which will be cached.
type RangeIterator ¶
type RangeIterator struct {
// contains filtered or unexported fields
}
A RangeIterator provides a mechanism for iterating over all ranges in a key span. A new RangeIterator must be positioned with Seek() to begin iteration.
RangeIterator is not thread-safe.
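The Seek/Valid/Next/NeedAnother protocol suggests a loop of the following shape. The sketch below uses a toy in-memory iterator with lower-case method names so that it can run on its own; the type, its fields, and the exact boundary rules for descending scans are assumptions made for the example, not the package's behavior.

```go
package main

import "fmt"

type scanDirection byte

const (
	ascending scanDirection = iota
	descending
)

// span is a half-open key interval [key, endKey).
type span struct{ key, endKey string }

// toyIterator walks a sorted, contiguous list of ranges.
type toyIterator struct {
	ranges []span
	idx    int
	dir    scanDirection
	seeked bool
}

// seek positions the iterator on the range containing key. For descending
// scans the key is treated as an exclusive upper bound.
func (it *toyIterator) seek(key string, dir scanDirection) {
	it.dir, it.seeked, it.idx = dir, true, -1
	for i, r := range it.ranges {
		fwd := dir == ascending && r.key <= key && key < r.endKey
		rev := dir == descending && r.key < key && key <= r.endKey
		if fwd || rev {
			it.idx = i
			return
		}
	}
}

func (it *toyIterator) valid() bool {
	return it.seeked && it.idx >= 0 && it.idx < len(it.ranges)
}

func (it *toyIterator) desc() span { return it.ranges[it.idx] }

func (it *toyIterator) next() {
	if it.dir == ascending {
		it.idx++
	} else {
		it.idx--
	}
}

// needAnother reports whether rs extends past the current range in the
// direction of travel.
func (it *toyIterator) needAnother(rs span) bool {
	if it.dir == ascending {
		return it.desc().endKey < rs.endKey
	}
	return rs.key < it.desc().key
}

// rangesIn collects every range that overlaps rs, in scan order.
func rangesIn(it *toyIterator, rs span, dir scanDirection) []span {
	start := rs.key
	if dir == descending {
		start = rs.endKey
	}
	var out []span
	for it.seek(start, dir); it.valid(); it.next() {
		out = append(out, it.desc())
		if !it.needAnother(rs) {
			break
		}
	}
	return out
}

func main() {
	it := &toyIterator{ranges: []span{{"a", "g"}, {"g", "p"}, {"p", "z"}}}
	fmt.Println(rangesIn(it, span{"c", "r"}, ascending))
	fmt.Println(rangesIn(it, span{"c", "r"}, descending))
}
```

With the real RangeIterator, a caller would also check Error() after the loop, since an iterator that hit an error stops being valid.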
func NewRangeIterator ¶
func NewRangeIterator(ds *DistSender) *RangeIterator
NewRangeIterator creates a new RangeIterator.
func (*RangeIterator) Desc ¶
func (ri *RangeIterator) Desc() *roachpb.RangeDescriptor
Desc returns the descriptor of the range at which the iterator is currently positioned. The iterator must be valid.
func (*RangeIterator) Error ¶
func (ri *RangeIterator) Error() *roachpb.Error
Error returns the error the iterator encountered, if any. If the iterator has not been initialized with Seek(), Error returns an error to that effect.
func (*RangeIterator) Key ¶
func (ri *RangeIterator) Key() roachpb.RKey
Key returns the current key. The iterator must be valid.
func (*RangeIterator) LeaseHolder ¶
func (ri *RangeIterator) LeaseHolder(ctx context.Context) (roachpb.ReplicaDescriptor, bool)
LeaseHolder returns the lease holder of the iterator's current range, if that information is present in the DistSender's LeaseHolderCache. The second return value is true if the lease holder was found. The iterator must be valid.
func (*RangeIterator) NeedAnother ¶
func (ri *RangeIterator) NeedAnother(rs roachpb.RSpan) bool
NeedAnother checks whether the iteration needs to continue to cover the remainder of the ranges described by the supplied key span. The iterator must be valid.
func (*RangeIterator) Next ¶
func (ri *RangeIterator) Next(ctx context.Context)
Next advances the iterator to the next range. The direction of advance is dependent on whether the iterator is reversed. The iterator must be valid.
func (*RangeIterator) Seek ¶
func (ri *RangeIterator) Seek(ctx context.Context, key roachpb.RKey, scanDir ScanDirection)
Seek positions the iterator at the specified key.
func (*RangeIterator) Token ¶
func (ri *RangeIterator) Token() *EvictionToken
Token returns the eviction token corresponding to the range descriptor for the current iteration. The iterator must be valid.
func (*RangeIterator) Valid ¶
func (ri *RangeIterator) Valid() bool
Valid returns whether the iterator is valid. To be valid, the iterator must have been positioned with Seek() and must not have encountered an error.
type ReplicaInfo ¶
type ReplicaInfo struct {
	roachpb.ReplicaDescriptor
	NodeDesc *roachpb.NodeDescriptor
}
ReplicaInfo extends roachpb.ReplicaDescriptor with the associated node descriptor.
type ReplicaSlice ¶
type ReplicaSlice []ReplicaInfo
A ReplicaSlice is a slice of ReplicaInfo.
func NewReplicaSlice ¶
func NewReplicaSlice(gossip *gossip.Gossip, desc *roachpb.RangeDescriptor) ReplicaSlice
NewReplicaSlice creates a ReplicaSlice from the replicas listed in the range descriptor, using gossip to look up node descriptors. Replicas on nodes that are not gossiped are omitted from the result.
func (ReplicaSlice) FindReplica ¶
func (rs ReplicaSlice) FindReplica(storeID roachpb.StoreID) int
FindReplica returns the index of the replica which matches the specified store ID. If no replica matches, -1 is returned.
func (ReplicaSlice) FindReplicaByNodeID ¶
func (rs ReplicaSlice) FindReplicaByNodeID(nodeID roachpb.NodeID) int
FindReplicaByNodeID returns the index of the replica which matches the specified node ID. If no replica matches, -1 is returned.
func (ReplicaSlice) Len ¶
func (rs ReplicaSlice) Len() int
Len returns the total number of replicas in the slice.
func (ReplicaSlice) MoveToFront ¶
func (rs ReplicaSlice) MoveToFront(i int)
MoveToFront moves the replica at the given index to the front of the slice, keeping the order of the remaining elements stable. The function will panic when invoked with an invalid index.
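A stable move-to-front can be written with a single overlapping copy. The sketch below operates on a plain []string rather than a ReplicaSlice and is only an illustration of the technique; the function name and element type are assumptions.

```go
package main

import "fmt"

// moveToFront moves rs[i] to index 0 and shifts rs[0:i] one position to the
// right, so the relative order of the other elements is preserved. It
// panics if i is out of range, because rs[i] does.
func moveToFront(rs []string, i int) {
	front := rs[i]
	// copy handles overlapping source and destination correctly.
	copy(rs[1:i+1], rs[:i])
	rs[0] = front
}

func main() {
	replicas := []string{"n1", "n2", "n3", "n4"}
	moveToFront(replicas, 2)
	fmt.Println(replicas)
}
```

Swapping rs[0] and rs[i] would be cheaper but would disturb the existing order, which matters when that order already encodes a preference among replicas.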
func (ReplicaSlice) OptimizeReplicaOrder ¶
func (rs ReplicaSlice) OptimizeReplicaOrder(nodeDesc *roachpb.NodeDescriptor)
OptimizeReplicaOrder sorts the replicas in the order in which they're to be used for sending RPCs (meaning in the order in which they'll be probed for the lease). "Closer" (matching in more attributes) replicas are ordered first. If the current node is a replica, then it'll be the first one.
nodeDesc is the descriptor of the current node. It can be nil, in which case information about the current descriptor is not used in optimizing the order.
Note that this method is not concerned with any information the node might have about who the lease holder might be. If there is such info (e.g. in a LeaseHolderCache), the caller will probably want to further tweak the head of the ReplicaSlice.
func (ReplicaSlice) SortByCommonAttributePrefix ¶
func (rs ReplicaSlice) SortByCommonAttributePrefix(attrs []string) int
SortByCommonAttributePrefix rearranges the ReplicaSlice by comparing each replica's attributes to the given reference attributes. The basis for the comparison is the common prefix of the attributes (i.e. the number of equal attributes, starting at the first), with a longer prefix sorting first. The number of attributes successfully matched to at least one replica is returned (hence, if the return value equals the length of attrs, at least one replica matched all attributes).
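Ordering by common attribute prefix can be sketched with a stable sort. This is not the package's algorithm; replica, commonPrefixLen, and the attribute values are hypothetical, and the sketch returns the longest prefix length found, which is one reading of "attributes matched to at least one replica".

```go
package main

import (
	"fmt"
	"sort"
)

// replica is a toy stand-in for ReplicaInfo with ordered attributes.
type replica struct {
	name  string
	attrs []string
}

// commonPrefixLen counts how many leading attributes of a and b are equal.
func commonPrefixLen(a, b []string) int {
	n := 0
	for n < len(a) && n < len(b) && a[n] == b[n] {
		n++
	}
	return n
}

// sortByCommonAttributePrefix stably orders rs so that replicas sharing a
// longer attribute prefix with attrs come first, and returns the longest
// prefix length shared by any replica.
func sortByCommonAttributePrefix(rs []replica, attrs []string) int {
	sort.SliceStable(rs, func(i, j int) bool {
		return commonPrefixLen(rs[i].attrs, attrs) > commonPrefixLen(rs[j].attrs, attrs)
	})
	if len(rs) == 0 {
		return 0
	}
	return commonPrefixLen(rs[0].attrs, attrs)
}

func main() {
	rs := []replica{
		{"r1", []string{"us-west", "rack1"}},
		{"r2", []string{"us-east", "rack1", "hdd"}},
		{"r3", []string{"us-east", "rack9"}},
	}
	matched := sortByCommonAttributePrefix(rs, []string{"us-east", "rack1", "ssd"})
	fmt.Println(matched, rs[0].name, rs[1].name, rs[2].name)
}
```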
func (ReplicaSlice) Swap ¶
func (rs ReplicaSlice) Swap(i, j int)
Swap swaps the replicas with indexes i and j.
type ScanDirection ¶
type ScanDirection byte
ScanDirection determines the semantics of RangeIterator.Next() and RangeIterator.NeedAnother().
const (
	// Ascending means Next() will advance towards keys that compare higher.
	Ascending ScanDirection = iota
	// Descending means Next() will advance towards keys that compare lower.
	Descending
)
type SendOptions ¶
type SendOptions struct {
	// SendNextTimeout is the duration after which RPCs are sent to
	// other replicas in a set.
	SendNextTimeout time.Duration
	// contains filtered or unexported fields
}
A SendOptions structure describes the algorithm for sending RPCs to one or more replicas, depending on error conditions and how many successful responses are required.
type Transport ¶
type Transport interface {
	// IsExhausted returns true if there are no more replicas to try.
	IsExhausted() bool

	// SendNext sends the rpc (captured at creation time) to the next
	// replica. May panic if the transport is exhausted. Should not
	// block; the transport is responsible for starting other goroutines
	// as needed.
	SendNext(context.Context, chan<- BatchCall)

	// MoveToFront locates the specified replica and moves it to the
	// front of the ordering of replicas to try. If the replica has
	// already been tried, it will be retried. If the specified replica
	// can't be found, this is a noop.
	MoveToFront(roachpb.ReplicaDescriptor)

	// Close is called when the transport is no longer needed. It may
	// cancel any pending RPCs without writing any response to the channel.
	Close()
}
Transport objects can send RPCs to one or more replicas of a range. All calls to Transport methods are made from a single thread, so Transports are not required to be thread-safe.
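A caller might drive a Transport with a loop like the one below. The sketch uses a toy transport with only the IsExhausted and SendNext halves of the interface, lower-case names, and strings in place of requests and replicas; all of these are assumptions for illustration, and timeouts (SendNextTimeout) are deliberately left out.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnavailable is a hypothetical error a replica might return.
var errUnavailable = errors.New("replica unavailable")

// batchCall mirrors the shape of BatchCall with a string reply.
type batchCall struct {
	reply string
	err   error
}

// toyTransport tries replicas in order using the supplied send function.
type toyTransport struct {
	replicas []string
	next     int
	send     func(replica string) (string, error)
}

func (t *toyTransport) isExhausted() bool { return t.next >= len(t.replicas) }

// sendNext starts the RPC to the next replica in its own goroutine and
// returns immediately. It panics if the transport is exhausted.
func (t *toyTransport) sendNext(done chan<- batchCall) {
	replica := t.replicas[t.next]
	t.next++
	go func() {
		reply, err := t.send(replica)
		done <- batchCall{reply, err}
	}()
}

// sendToReplicas tries replicas one at a time until one succeeds or the
// transport is exhausted, returning the last error in the latter case.
func sendToReplicas(t *toyTransport) (string, error) {
	// Buffered so that a sending goroutine can never block forever.
	done := make(chan batchCall, len(t.replicas))
	lastErr := errors.New("no replicas to try")
	for !t.isExhausted() {
		t.sendNext(done)
		call := <-done
		if call.err == nil {
			return call.reply, nil
		}
		lastErr = call.err
	}
	return "", lastErr
}

func main() {
	t := &toyTransport{
		replicas: []string{"n1", "n2", "n3"},
		send: func(replica string) (string, error) {
			if replica == "n1" {
				return "", errUnavailable
			}
			return "ok from " + replica, nil
		},
	}
	fmt.Println(sendToReplicas(t))
}
```

All transport methods are called from the single goroutine running sendToReplicas, which matches the note that Transports need not be thread-safe; only the RPC itself runs elsewhere.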
type TransportFactory ¶
type TransportFactory func(
	SendOptions, *rpc.Context, ReplicaSlice, roachpb.BatchRequest,
) (Transport, error)
TransportFactory encapsulates all interaction with the RPC subsystem, allowing it to be mocked out for testing. The factory function returns a Transport object which is used to send the given arguments to one or more replicas in the slice.
In addition to actually sending RPCs, the transport is responsible for ordering replicas in accordance with SendOptions.Ordering and transport-specific knowledge such as connection health or latency.
TODO(bdarnell): clean up this crufty interface; it was extracted verbatim from the non-abstracted code.
func SenderTransportFactory ¶
func SenderTransportFactory(tracer opentracing.Tracer, sender client.Sender) TransportFactory
SenderTransportFactory wraps a client.Sender for use as a KV Transport. This is useful for tests that want to use DistSender without a full RPC stack.
type TxnCoordSender ¶
type TxnCoordSender struct {
	log.AmbientContext
	// contains filtered or unexported fields
}
A TxnCoordSender is an implementation of client.Sender which wraps a lower-level Sender (either a storage.Stores or a DistSender) to which it sends commands. It acts as a man-in-the-middle, coordinating transaction state for clients. After a transaction is started, the TxnCoordSender starts asynchronously sending heartbeat messages to that transaction's txn record, to keep it live. It also keeps track of each written key or key range over the course of the transaction. When the transaction is committed or aborted, it clears accumulated write intents for the transaction.
func NewTxnCoordSender ¶
func NewTxnCoordSender(
	ambient log.AmbientContext,
	wrapped client.Sender,
	clock *hlc.Clock,
	linearizable bool,
	stopper *stop.Stopper,
	txnMetrics TxnMetrics,
) *TxnCoordSender
NewTxnCoordSender creates a new TxnCoordSender for use from a KV distributed DB instance. ambient is the base context and is used for logs and traces when there isn't a more specific context available; it must have a Tracer set.
func (*TxnCoordSender) Send ¶
func (tc *TxnCoordSender) Send(
	ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error)
Send implements the batch.Sender interface. If the request is part of a transaction, the TxnCoordSender adds the transaction to a map of active transactions and begins heartbeating it. Every subsequent request for the same transaction updates the lastUpdate timestamp to prevent live transactions from being considered abandoned and garbage collected. Read/write mutating requests have their key or key range added to the transaction's interval tree of key ranges for eventual cleanup via resolved write intents; they're tagged to an outgoing EndTransaction request, with the receiving replica in charge of resolving them.
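The bookkeeping part of this description (tracking written keys and attaching them to EndTransaction) can be sketched as below. The sketch is a toy: request, toyCoordinator, the method-name strings, and the use of a plain slice instead of an interval tree are all assumptions, and heartbeating is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// span is a key or key range touched by a write; endKey is empty for a
// single key.
type span struct{ key, endKey string }

// request is a toy stand-in for one request in a transactional batch.
type request struct {
	txnID       string
	method      string
	key, endKey string
	intentSpans []span // filled in for EndTransaction
}

// toyCoordinator remembers the spans written by each active transaction.
type toyCoordinator struct {
	mu     sync.Mutex
	writes map[string][]span
}

func newToyCoordinator() *toyCoordinator {
	return &toyCoordinator{writes: map[string][]span{}}
}

// send records writes and, on EndTransaction, attaches every span written
// by the transaction so the receiver can resolve the intents. It returns
// the request as it would be forwarded to the wrapped sender.
func (tc *toyCoordinator) send(req request) request {
	tc.mu.Lock()
	defer tc.mu.Unlock()
	switch req.method {
	case "Put", "DeleteRange":
		tc.writes[req.txnID] = append(tc.writes[req.txnID], span{req.key, req.endKey})
	case "EndTransaction":
		req.intentSpans = tc.writes[req.txnID]
		delete(tc.writes, req.txnID) // the transaction is no longer active
	}
	return req
}

func main() {
	tc := newToyCoordinator()
	tc.send(request{txnID: "t1", method: "Put", key: "a"})
	tc.send(request{txnID: "t1", method: "DeleteRange", key: "b", endKey: "d"})
	tc.send(request{txnID: "t1", method: "Get", key: "x"})
	end := tc.send(request{txnID: "t1", method: "EndTransaction"})
	fmt.Println(end.intentSpans)
}
```

Read-only requests such as the Get above pass through without being recorded, since they leave no intents to clean up.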
type TxnMetrics ¶
type TxnMetrics struct {
	Aborts     *metric.CounterWithRates
	Commits    *metric.CounterWithRates
	Commits1PC *metric.CounterWithRates // Commits which finished in a single phase
	Abandons   *metric.CounterWithRates
	Durations  *metric.Histogram

	// Restarts is the number of times we had to restart the transaction.
	Restarts *metric.Histogram
}
TxnMetrics holds all metrics relating to KV transactions.
func MakeTxnMetrics ¶
func MakeTxnMetrics(sampleInterval time.Duration) TxnMetrics
MakeTxnMetrics returns a TxnMetrics struct that contains metrics whose windowed portions retain data for approximately sampleInterval.