Documentation ¶
Overview ¶
Package tikv provides tcp connection to kvserver.
Index ¶
- Constants
- Variables
- func NewBackoffFn(base, cap, jitter int) func(ctx context.Context) int
- func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Client) Client, ...) (kv.Storage, error)
- func SplitRegionRanges(bo *Backoffer, cache *RegionCache, keyRanges []kv.KeyRange) ([]kv.KeyRange, error)
- type Backoffer
- type CachedRegion
- type Client
- type CopClient
- type Driver
- type EtcdSafePointKV
- type GCHandler
- type KeyLocation
- type Lock
- type LockResolver
- type MockSafePointKV
- type RPCContext
- type RawKVClient
- func (c *RawKVClient) Close() error
- func (c *RawKVClient) ClusterID() uint64
- func (c *RawKVClient) Delete(key []byte) error
- func (c *RawKVClient) DeleteRange(startKey []byte, endKey []byte) error
- func (c *RawKVClient) Get(key []byte) ([]byte, error)
- func (c *RawKVClient) Put(key, value []byte) error
- func (c *RawKVClient) Scan(startKey []byte, limit int) (keys [][]byte, values [][]byte, err error)
- type Region
- func (r *Region) Contains(key []byte) bool
- func (r *Region) EndKey() []byte
- func (r *Region) GetContext() *kvrpcpb.Context
- func (r *Region) GetID() uint64
- func (r *Region) OnRequestFail(storeID uint64) bool
- func (r *Region) StartKey() []byte
- func (r *Region) SwitchPeer(storeID uint64) bool
- func (r *Region) VerID() RegionVerID
- type RegionCache
- func (c *RegionCache) ClearStoreByID(id uint64)
- func (c *RegionCache) DropRegion(id RegionVerID)
- func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, error)
- func (c *RegionCache) GetStoreAddr(bo *Backoffer, id uint64) (string, error)
- func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte) (map[RegionVerID][][]byte, RegionVerID, error)
- func (c *RegionCache) ListRegionIDsInKeyRange(bo *Backoffer, startKey, endKey []byte) (regionIDs []uint64, err error)
- func (c *RegionCache) LocateKey(bo *Backoffer, key []byte) (*KeyLocation, error)
- func (c *RegionCache) LocateRegionByID(bo *Backoffer, regionID uint64) (*KeyLocation, error)
- func (c *RegionCache) OnRegionStale(ctx *RPCContext, newRegions []*metapb.Region) error
- func (c *RegionCache) OnRequestFail(ctx *RPCContext, err error)
- func (c *RegionCache) PDClient() pd.Client
- func (c *RegionCache) ReloadStoreAddr(bo *Backoffer, id uint64) (string, error)
- func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64)
- type RegionRequestSender
- type RegionVerID
- type SafePointKV
- type Scanner
- type Storage
- type Store
- type TxnStatus
Constants ¶
const ( // NoJitter makes the backoff sequence strict exponential. NoJitter = 1 + iota // FullJitter applies random factors to strict exponential. FullJitter // EqualJitter is also randomized, but prevents very short sleeps. EqualJitter // DecorrJitter increases the maximum jitter based on the last random value. DecorrJitter )
const ( BoTxnLock backoffType BoRegionMiss BoUpdateLeader )
Back off types.
const ( GcOneRegionMaxBackoff = 20000 GcResolveLockMaxBackoff = 100000 GcDeleteRangeMaxBackoff = 100000 )
Maximum total sleep time(in ms) for kv/cop commands.
const ( ReadTimeoutMedium = 60 * time.Second // For requests that may need scan region. ReadTimeoutLong = 150 * time.Second // For requests that may need scan region multiple times. GCTimeout = 5 * time.Minute )
Timeout durations.
const ( // This is almost the same as 'tikv_gc_safe_point' in the table 'mysql.tidb', // save this to pd instead of tikv, because we can't use interface of table // if the safepoint on tidb is expired. GcSavedSafePoint = "/tidb/store/gcworker/saved_safe_point" GcSafePointCacheInterval = time.Second * 100 )
Safe point constants.
const ResolvedCacheSize = 2048
ResolvedCacheSize is max number of cached txn status.
Variables ¶
var ( ErrTiKVServerTimeout = terror.ClassTiKV.New(mysql.ErrTiKVServerTimeout, mysql.MySQLErrName[mysql.ErrTiKVServerTimeout]+txnRetryableMark) ErrResolveLockTimeout = terror.ClassTiKV.New(mysql.ErrResolveLockTimeout, mysql.MySQLErrName[mysql.ErrResolveLockTimeout]+txnRetryableMark) ErrPDServerTimeout = terror.ClassTiKV.New(mysql.ErrPDServerTimeout, mysql.MySQLErrName[mysql.ErrPDServerTimeout]+"%v") ErrTiKVServerBusy = terror.ClassTiKV.New(mysql.ErrTiKVServerBusy, mysql.MySQLErrName[mysql.ErrTiKVServerBusy]+txnRetryableMark) ErrGCTooEarly = terror.ClassTiKV.New(mysql.ErrGCTooEarly, mysql.MySQLErrName[mysql.ErrGCTooEarly]) )
MySQL error instances.
var ( // MaxRawKVScanLimit is the maximum scan limit for rawkv Scan. MaxRawKVScanLimit = 10240 // ErrMaxScanLimitExceeded is returned when the limit for rawkv Scan is to large. ErrMaxScanLimitExceeded = errors.New("limit should be less than MaxRawKVScanLimit") )
var CommitMaxBackoff = 41000
CommitMaxBackoff is max sleep time of the 'commit' command
var ( // ErrBodyMissing response body is missing error ErrBodyMissing = errors.New("response body is missing") )
GrpcKeepAliveTime is the duration of time after which if the client doesn't see any activity it pings the server to see if the transport is still alive.
GrpcKeepAliveTimeout is the duration of time for which the client waits after having pinged for keepalive check and if no activity is seen even after that the connection is closed.
var MaxCallMsgSize = 1<<31 - 1
MaxCallMsgSize set max gRPC receive message size received from server. If any message size is larger than current value, an error will be reported from gRPC.
var MaxConnectionCount uint = 16
MaxConnectionCount is the max gRPC connections that will be established with each tikv-server.
var MaxSendMsgSize = 1<<31 - 1
MaxSendMsgSize set max gRPC request message size sent to server. If any request message size is larger than current value, an error will be reported from gRPC.
NewGCHandlerFunc creates a new GCHandler. To enable real GC, we should assign the function to `gcworker.NewGCWorker`.
Functions ¶
func NewBackoffFn ¶
NewBackoffFn creates a backoff func which implements exponential backoff with optional jitters. See http://www.awsarchitectureblog.com/2015/03/backoff.html
Types ¶
type Backoffer ¶
Backoffer is a utility for retrying queries.
func NewBackoffer ¶
NewBackoffer creates a Backoffer with maximum sleep time(in ms).
func (*Backoffer) Backoff ¶
Backoff sleeps a while base on the backoffType and records the error message. It returns a retryable error if total sleep time exceeds maxSleep.
func (*Backoffer) Clone ¶
Clone creates a new Backoffer which keeps current Backoffer's sleep time and errors, and shares current Backoffer's context.
func (*Backoffer) Fork ¶
func (b *Backoffer) Fork() (*Backoffer, context.CancelFunc)
Fork creates a new Backoffer which keeps current Backoffer's sleep time and errors, and holds a child context of current Backoffer's context.
type CachedRegion ¶
type CachedRegion struct {
// contains filtered or unexported fields
}
CachedRegion encapsulates {Region, TTL}
type Client ¶
type Client interface { // Close should release all data. Close() error // SendRequest sends Request. SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) }
Client is a client that sends RPC. It should not be used after calling Close().
type CopClient ¶
type CopClient struct {
// contains filtered or unexported fields
}
CopClient is coprocessor client.
func (*CopClient) IsRequestTypeSupported ¶
IsRequestTypeSupported checks whether reqType is supported.
type EtcdSafePointKV ¶
type EtcdSafePointKV struct {
// contains filtered or unexported fields
}
EtcdSafePointKV implements SafePointKV at runtime
func NewEtcdSafePointKV ¶
func NewEtcdSafePointKV(addrs []string, tlsConfig *tls.Config) (*EtcdSafePointKV, error)
NewEtcdSafePointKV creates an instance of EtcdSafePointKV
func (*EtcdSafePointKV) Get ¶
func (w *EtcdSafePointKV) Get(k string) (string, error)
Get implements the Get method for SafePointKV
func (*EtcdSafePointKV) Put ¶
func (w *EtcdSafePointKV) Put(k string, v string) error
Put implements the Put method for SafePointKV
type GCHandler ¶
type GCHandler interface { // Start starts the GCHandler. Start() // Close closes the GCHandler. Close() }
GCHandler runs garbage collection job.
type KeyLocation ¶
type KeyLocation struct { Region RegionVerID StartKey []byte EndKey []byte }
KeyLocation is the region and range that a key is located.
func (*KeyLocation) Contains ¶
func (l *KeyLocation) Contains(key []byte) bool
Contains checks if key is in [StartKey, EndKey).
type Lock ¶
Lock represents a lock from tikv server.
type LockResolver ¶
type LockResolver struct {
// contains filtered or unexported fields
}
LockResolver resolves locks and also caches resolved txn status.
func NewLockResolver ¶
func NewLockResolver(etcdAddrs []string, security config.Security) (*LockResolver, error)
NewLockResolver creates a LockResolver. It is exported for other services to use. For instance, binlog service needs to determine a transaction's commit state.
func (*LockResolver) BatchResolveLocks ¶
func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc RegionVerID) (bool, error)
BatchResolveLocks resolve locks in a batch
func (*LockResolver) GetTxnStatus ¶
func (lr *LockResolver) GetTxnStatus(txnID uint64, primary []byte) (TxnStatus, error)
GetTxnStatus queries tikv-server for a txn's status (commit/rollback). If the primary key is still locked, it will launch a Rollback to abort it. To avoid unnecessarily aborting too many txns, it is wiser to wait a few seconds before calling it after Prewrite.
func (*LockResolver) ResolveLocks ¶
func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (ok bool, err error)
ResolveLocks tries to resolve Locks. The resolving process is in 3 steps:
- Use the `lockTTL` to pick up all expired locks. Only locks that are too old are considered orphan locks and will be handled later. If all locks are expired then all locks will be resolved so the returned `ok` will be true, otherwise caller should sleep a while before retry.
- For each lock, query the primary key to get txn(which left the lock)'s commit status.
- Send `ResolveLock` cmd to the lock's region to resolve all locks belong to the same transaction.
type MockSafePointKV ¶
type MockSafePointKV struct {
// contains filtered or unexported fields
}
MockSafePointKV implements SafePointKV at mock test
func NewMockSafePointKV ¶
func NewMockSafePointKV() *MockSafePointKV
NewMockSafePointKV creates an instance of MockSafePointKV
func (*MockSafePointKV) Get ¶
func (w *MockSafePointKV) Get(k string) (string, error)
Get implements the Get method for SafePointKV
func (*MockSafePointKV) Put ¶
func (w *MockSafePointKV) Put(k string, v string) error
Put implements the Put method for SafePointKV
type RPCContext ¶
RPCContext contains data that is needed to send RPC to a region.
func (*RPCContext) GetStoreID ¶
func (c *RPCContext) GetStoreID() uint64
GetStoreID returns StoreID.
type RawKVClient ¶
type RawKVClient struct {
// contains filtered or unexported fields
}
RawKVClient is a client of TiKV server which is used as a key-value storage, only GET/PUT/DELETE commands are supported.
func NewRawKVClient ¶
func NewRawKVClient(pdAddrs []string, security config.Security) (*RawKVClient, error)
NewRawKVClient creates a client with PD cluster addrs.
func (*RawKVClient) ClusterID ¶
func (c *RawKVClient) ClusterID() uint64
ClusterID returns the TiKV cluster ID.
func (*RawKVClient) Delete ¶
func (c *RawKVClient) Delete(key []byte) error
Delete deletes a key-value pair from TiKV.
func (*RawKVClient) DeleteRange ¶
func (c *RawKVClient) DeleteRange(startKey []byte, endKey []byte) error
DeleteRange deletes all key-value pairs in a range from TiKV
func (*RawKVClient) Get ¶
func (c *RawKVClient) Get(key []byte) ([]byte, error)
Get queries value with the key. When the key does not exist, it returns `nil, nil`.
func (*RawKVClient) Put ¶
func (c *RawKVClient) Put(key, value []byte) error
Put stores a key-value pair to TiKV.
func (*RawKVClient) Scan ¶
Scan queries continuous kv pairs, starts from startKey, up to limit pairs. If you want to exclude the startKey, append a '\0' to the key: `Scan(append(startKey, '\0'), limit)`.
type Region ¶
type Region struct {
// contains filtered or unexported fields
}
Region stores region's meta and its leader peer.
func (*Region) Contains ¶
Contains checks whether the key is in the region, for the maximum region endKey is empty. startKey <= key < endKey.
func (*Region) GetContext ¶
GetContext constructs kvprotopb.Context from region info.
func (*Region) OnRequestFail ¶
OnRequestFail records unreachable peer and tries to select another valid peer. It returns false if all peers are unreachable.
func (*Region) SwitchPeer ¶
SwitchPeer switches current peer to the one on specific store. It returns false if no peer matches the storeID.
type RegionCache ¶
type RegionCache struct {
// contains filtered or unexported fields
}
RegionCache caches Regions loaded from PD.
func NewRegionCache ¶
func NewRegionCache(pdClient pd.Client) *RegionCache
NewRegionCache creates a RegionCache.
func (*RegionCache) ClearStoreByID ¶
func (c *RegionCache) ClearStoreByID(id uint64)
ClearStoreByID clears store from cache with storeID.
func (*RegionCache) DropRegion ¶
func (c *RegionCache) DropRegion(id RegionVerID)
DropRegion removes a cached Region.
func (*RegionCache) GetRPCContext ¶
func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, error)
GetRPCContext returns RPCContext for a region. If it returns nil, the region must be out of date and already dropped from cache.
func (*RegionCache) GetStoreAddr ¶
func (c *RegionCache) GetStoreAddr(bo *Backoffer, id uint64) (string, error)
GetStoreAddr returns a tikv server's address by its storeID. It checks cache first, sends request to pd server when necessary.
func (*RegionCache) GroupKeysByRegion ¶
func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte) (map[RegionVerID][][]byte, RegionVerID, error)
GroupKeysByRegion separates keys into groups by their belonging Regions. Specially it also returns the first key's region which may be used as the 'PrimaryLockKey' and should be committed ahead of others.
func (*RegionCache) ListRegionIDsInKeyRange ¶
func (c *RegionCache) ListRegionIDsInKeyRange(bo *Backoffer, startKey, endKey []byte) (regionIDs []uint64, err error)
ListRegionIDsInKeyRange lists ids of regions in [start_key,end_key].
func (*RegionCache) LocateKey ¶
func (c *RegionCache) LocateKey(bo *Backoffer, key []byte) (*KeyLocation, error)
LocateKey searches for the region and range that the key is located.
func (*RegionCache) LocateRegionByID ¶
func (c *RegionCache) LocateRegionByID(bo *Backoffer, regionID uint64) (*KeyLocation, error)
LocateRegionByID searches for the region with ID.
func (*RegionCache) OnRegionStale ¶
func (c *RegionCache) OnRegionStale(ctx *RPCContext, newRegions []*metapb.Region) error
OnRegionStale removes the old region and inserts new regions into the cache.
func (*RegionCache) OnRequestFail ¶
func (c *RegionCache) OnRequestFail(ctx *RPCContext, err error)
OnRequestFail is used for clearing cache when a tikv server does not respond.
func (*RegionCache) PDClient ¶
func (c *RegionCache) PDClient() pd.Client
PDClient returns the pd.Client in RegionCache.
func (*RegionCache) ReloadStoreAddr ¶
func (c *RegionCache) ReloadStoreAddr(bo *Backoffer, id uint64) (string, error)
ReloadStoreAddr reloads store's address.
func (*RegionCache) UpdateLeader ¶
func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64)
UpdateLeader update some region cache with newer leader info.
type RegionRequestSender ¶
type RegionRequestSender struct {
// contains filtered or unexported fields
}
RegionRequestSender sends KV/Cop requests to tikv server. It handles network errors and some region errors internally.
Typically, a KV/Cop request is bind to a region, all keys that are involved in the request should be located in the region. The sending process begins with looking for the address of leader store's address of the target region from cache, and the request is then sent to the destination tikv server over TCP connection. If region is updated, can be caused by leader transfer, region split, region merge, or region balance, tikv server may not able to process request and send back a RegionError. RegionRequestSender takes care of errors that does not relevant to region range, such as 'I/O timeout', 'NotLeader', and 'ServerIsBusy'. For other errors, since region range have changed, the request may need to split, so we simply return the error to caller.
func NewRegionRequestSender ¶
func NewRegionRequestSender(regionCache *RegionCache, client Client) *RegionRequestSender
NewRegionRequestSender creates a new sender.
type RegionVerID ¶
type RegionVerID struct {
// contains filtered or unexported fields
}
RegionVerID is a unique ID that can identify a Region at a specific version.
type SafePointKV ¶
SafePointKV is used for a seamingless integration for mockTest and runtime.
type Scanner ¶
type Scanner struct {
// contains filtered or unexported fields
}
Scanner support tikv scan
type Storage ¶
type Storage interface { kv.Storage // GetRegionCache gets the RegionCache. GetRegionCache() *RegionCache // SendReq sends a request to TiKV. SendReq(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) // GetLockResolver gets the LockResolver. GetLockResolver() *LockResolver // GetSafePointKV gets the SafePointKV. GetSafePointKV() SafePointKV // UpdateSPCache updates the cache of safe point. UpdateSPCache(cachedSP uint64, cachedTime time.Time) // GetGCHandler gets the GCHandler. GetGCHandler() GCHandler // SetOracle sets the Oracle. SetOracle(oracle oracle.Oracle) // SetTiKVClient sets the TiKV client. SetTiKVClient(client Client) // GetTiKVClient gets the TiKV client. GetTiKVClient() Client // Closed returns the closed channel. Closed() <-chan struct{} }
Storage represent the kv.Storage runs on TiKV.