raftstore

package
v0.0.0-...-0ebafaf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 4, 2021 License: Apache-2.0 Imports: 52 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultApplyWBSize = 4 * 1024

	WriteTypeFlagPut      = 'P'
	WriteTypeFlagDelete   = 'D'
	WriteTypeFlagLock     = 'L'
	WriteTypeFlagRollback = 'R'
)
View Source
const (
	InitEpochVer     uint64 = 1
	InitEpochConfVer uint64 = 1
	KvTS             uint64 = 1
	RaftTS           uint64 = 0
)
View Source
const (
	KB          uint64 = 1024
	MB          uint64 = 1024 * 1024
	SplitSizeMb uint64 = 96
)
View Source
const (
	LocalPrefix byte = 0x01

	// We save two types region data in DB, for raft and other meta data.
	// When the store starts, we should iterate all region meta data to
	// construct peer, no need to travel large raft data, so we separate them
	// with different prefixes.
	RegionRaftPrefix byte = 0x02
	RegionMetaPrefix byte = 0x03
	RegionRaftLogLen      = 19 // REGION_RAFT_PREFIX_KEY + region_id + suffix + index

	// Following are the suffix after the local prefix.
	// For region id
	RaftLogSuffix           byte = 0x01
	RaftStateSuffix         byte = 0x02
	ApplyStateSuffix        byte = 0x03
	SnapshotRaftStateSuffix byte = 0x04

	// For region meta
	RegionStateSuffix byte = 0x01
)
View Source
const (
	MaxCheckClusterBootstrappedRetryCount = 60
	CheckClusterBootstrapRetrySeconds     = 3
)
View Source
const (
	// When we create a region peer, we should initialize its log term/index > 0,
	// so that we can force the follower peer to sync the snapshot first.
	RaftInitLogTerm  = 5
	RaftInitLogIndex = 5

	MaxSnapRetryCnt = 5

	MaxCacheCapacity = 1024 - 1
)
View Source
const (
	NSEC_PER_MSEC uint64 = 1000000
	SEC_SHIFT     uint64 = 10
	MSEC_MASK     uint64 = (1 << SEC_SHIFT) - 1
)
View Source
const InvalidID uint64 = 0
View Source
const LockstoreFileName = "lockstore.dump"
View Source
const MaxDeleteBatchSize int = 32 * 1024

In our tests, we found that if the batch size is too large, running deleteAllInRange will reduce OLTP QPS by 30% ~ 60%. We found that 32K is a proper choice.

View Source
const RaftInvalidIndex uint64 = 0

Variables

View Source
var (
	MinKey = []byte{0}

	// Data key has two prefix, meta 'm' and table 't',
	// extra keys has prefix 'm' + 1 = 'n',
	// extra table keys has prefix 't' + 1 = 'u', end key would be 'v'.
	MinDataKey = []byte{'m'}
	MaxDataKey = []byte{'v'}

	RegionMetaMinKey = []byte{LocalPrefix, RegionMetaPrefix}
	RegionMetaMaxKey = []byte{LocalPrefix, RegionMetaPrefix + 1}
)

Functions

func ApplyStateKey

func ApplyStateKey(regionID uint64) []byte

func BindRespError

func BindRespError(resp *raft_cmdpb.RaftCmdResponse, err error)

func BindRespTerm

func BindRespTerm(resp *raft_cmdpb.RaftCmdResponse, term uint64)

func BootstrapStore

func BootstrapStore(engines *Engines, clussterID, storeID uint64) error

func CheckKeyInRegion

func CheckKeyInRegion(key []byte, region *metapb.Region) error

/ Check if key in region range [`start_key`, `end_key`).

func CheckKeyInRegionExclusive

func CheckKeyInRegionExclusive(key []byte, region *metapb.Region) error

/ Check if key in region range (`start_key`, `end_key`).

func CheckKeyInRegionInclusive

func CheckKeyInRegionInclusive(key []byte, region *metapb.Region) error

/ Check if key in region range [`start_key`, `end_key`].

func CheckRegionEpoch

func CheckRegionEpoch(req *raft_cmdpb.RaftCmdRequest, region *metapb.Region, includeRegion bool) error

func ClearMeta

func ClearMeta(engines *Engines, kvWB, raftWB *WriteBatch, regionID uint64, lastIndex uint64) error

func ClearPrepareBootstrap

func ClearPrepareBootstrap(engines *Engines, regionID uint64) error

func ClearPrepareBootstrapState

func ClearPrepareBootstrapState(engines *Engines) error

func CloneMsg

func CloneMsg(origin, cloned proto.Message) error

func CompactRaftLog

func CompactRaftLog(tag string, state *applyState, compactIndex, compactTerm uint64) error

CompactRaftLog discards all log entries prior to compact_index. We must guarantee that the compact_index is not greater than applied index.

func CreateRaftLogCompactionFilter

func CreateRaftLogCompactionFilter(targetLevel int, startKey, endKey []byte) badger.CompactionFilter

func ErrResp

func ErrResp(err error) *raft_cmdpb.RaftCmdResponse

func ErrRespRegionNotFound

func ErrRespRegionNotFound(regionID uint64) *raft_cmdpb.RaftCmdResponse

func ErrRespStaleCommand

func ErrRespStaleCommand(term uint64) *raft_cmdpb.RaftCmdResponse

func ErrRespWithTerm

func ErrRespWithTerm(err error, term uint64) *raft_cmdpb.RaftCmdResponse

func IsEpochStale

func IsEpochStale(epoch *metapb.RegionEpoch, checkEpoch *metapb.RegionEpoch) bool

/ check whether epoch is staler than check_epoch.

func IsUrgentRequest

func IsUrgentRequest(rlog raftlog.RaftLog) bool

/ We enable follower lazy commit to get a better performance. / But it may not be appropriate for some requests. This function / checks whether the request should be committed on all followers / as soon as possible.

func NewCustomWriteBatch

func NewCustomWriteBatch(startTS, commitTS uint64, ctx *kvrpcpb.Context) mvcc.WriteBatch

func NewDBWriter

func NewDBWriter(conf *config.Config, router *RaftstoreRouter) mvcc.DBWriter

func NewTestRaftWriter

func NewTestRaftWriter(dbBundle *mvcc.DBBundle, engine *Engines) mvcc.DBWriter

func NotifyReqRegionRemoved

func NotifyReqRegionRemoved(regionId uint64, cb *Callback)

func NotifyStaleReq

func NotifyStaleReq(term uint64, cb *Callback)

func PeerEqual

func PeerEqual(l, r *metapb.Peer) bool

func PrepareBootstrap

func PrepareBootstrap(engins *Engines, storeID, regionID, peerID uint64) (*metapb.Region, error)

func Quorum

func Quorum(total int) int

func RaftLogIndex

func RaftLogIndex(key []byte) (uint64, error)

/ RaftLogIndex gets the log index from raft log key generated by `raft_log_key`.

func RaftLogKey

func RaftLogKey(regionID, index uint64) []byte

func RaftStateKey

func RaftStateKey(regionID uint64) []byte

func RaftstoreErrToPbError

func RaftstoreErrToPbError(e error) *errorpb.Error

func RawEndKey

func RawEndKey(region *metapb.Region) []byte

Get the `end_key` of current region in encoded form.

func RawStartKey

func RawStartKey(region *metapb.Region) []byte

Get the `start_key` of current region in encoded form.

func RegionEqual

func RegionEqual(l, r *metapb.Region) bool

func RegionMetaPrefixKey

func RegionMetaPrefixKey(regionID uint64) []byte

func RegionRaftPrefixKey

func RegionRaftPrefixKey(regionID uint64) []byte

func RegionStateKey

func RegionStateKey(regionID uint64) []byte

func RestoreLockStore

func RestoreLockStore(offset uint64, bundle *mvcc.DBBundle, raftDB *badger.DB) error

func SnapshotRaftStateKey

func SnapshotRaftStateKey(regionID uint64) []byte

func TimeToU64

func TimeToU64(t time.Time) uint64

func U64ToTime

func U64ToTime(u uint64) time.Time

func WritePeerState

func WritePeerState(kvWB *WriteBatch, region *metapb.Region, state rspb.PeerState, mergeState *rspb.MergeState)

Types

type ApplyOptions

type ApplyOptions struct {
	DBBundle *mvcc.DBBundle
	Region   *metapb.Region
	Abort    *uint32
	Builder  *sstable.Builder
	WB       *WriteBatch
}

type ApplyResult

type ApplyResult struct {
	HasPut      bool
	RegionState *rspb.RegionLocalState
}

type ApplySnapResult

type ApplySnapResult struct {
	// PrevRegion is the region before snapshot applied
	PrevRegion *metapb.Region
	Region     *metapb.Region
}

type CFFile

type CFFile struct {
	CF          CFName
	Path        string
	TmpPath     string
	ClonePath   string
	SstWriter   *rocksdb.SstFileWriter
	File        *os.File
	KVCount     int
	Size        uint64
	WrittenSize uint64
	Checksum    uint32
	WriteDigest hash.Hash32
}

type CFName

type CFName = string
const (
	CFDefault CFName = "default"
	CFLock    CFName = "lock"
	CFWrite   CFName = "write"
	CFRaft    CFName = "raft"
)

type CacheQueryStats

type CacheQueryStats struct {
	// contains filtered or unexported fields
}

type Callback

type Callback struct {
	// contains filtered or unexported fields
}

func NewCallback

func NewCallback() *Callback

func (*Callback) Done

func (cb *Callback) Done(resp *raft_cmdpb.RaftCmdResponse)

type Config

type Config struct {
	// true for high reliability, prevent data loss when power failure.
	SyncLog bool
	// minimizes disruption when a partitioned node rejoins the cluster by using a two phase election.
	Prevote    bool
	RaftdbPath string

	SnapPath string

	// store capacity. 0 means no limit.
	Capacity uint64

	// raft_base_tick_interval is a base tick interval (ms).
	RaftBaseTickInterval        time.Duration
	RaftHeartbeatTicks          int
	RaftElectionTimeoutTicks    int
	RaftMinElectionTimeoutTicks int
	RaftMaxElectionTimeoutTicks int
	RaftMaxSizePerMsg           uint64
	RaftMaxInflightMsgs         int

	// When the entry exceed the max size, reject to propose it.
	RaftEntryMaxSize uint64

	// Interval to gc unnecessary raft log (ms).
	RaftLogGCTickInterval time.Duration
	// A threshold to gc stale raft log, must >= 1.
	RaftLogGcThreshold uint64
	// When entry count exceed this value, gc will be forced trigger.
	RaftLogGcCountLimit uint64
	// When the approximate size of raft log entries exceed this value,
	// gc will be forced trigger.
	RaftLogGcSizeLimit uint64
	// When a peer is not responding for this time, leader will not keep entry cache for it.
	RaftEntryCacheLifeTime time.Duration
	// When a peer is newly added, reject transferring leader to the peer for a while.
	RaftRejectTransferLeaderDuration time.Duration

	// Interval (ms) to check region whether need to be split or not.
	SplitRegionCheckTickInterval time.Duration
	/// When size change of region exceed the diff since last check, it
	/// will be checked again whether it should be split.
	RegionSplitCheckDiff uint64
	/// Interval (ms) to check whether start compaction for a region.
	RegionCompactCheckInterval time.Duration
	// delay time before deleting a stale peer
	CleanStalePeerDelay time.Duration
	/// Number of regions for each time checking.
	RegionCompactCheckStep uint64
	/// Minimum number of tombstones to trigger manual compaction.
	RegionCompactMinTombstones uint64
	/// Minimum percentage of tombstones to trigger manual compaction.
	/// Should between 1 and 100.
	RegionCompactTombstonesPencent uint64
	PdHeartbeatTickInterval        time.Duration
	PdStoreHeartbeatTickInterval   time.Duration
	SnapMgrGcTickInterval          time.Duration
	SnapGcTimeout                  time.Duration

	NotifyCapacity  uint64
	MessagesPerTick uint64

	/// When a peer is not active for max_peer_down_duration,
	/// the peer is considered to be down and is reported to PD.
	MaxPeerDownDuration time.Duration

	/// If the leader of a peer is missing for longer than max_leader_missing_duration,
	/// the peer would ask pd to confirm whether it is valid in any region.
	/// If the peer is stale and is not valid in any region, it will destroy itself.
	MaxLeaderMissingDuration time.Duration
	/// Similar to the max_leader_missing_duration, instead it will log warnings and
	/// try to alert monitoring systems, if there is any.
	AbnormalLeaderMissingDuration time.Duration
	PeerStaleStateCheckInterval   time.Duration

	LeaderTransferMaxLogLag uint64

	SnapApplyBatchSize uint64

	// Interval (ms) to check region whether the data is consistent.
	ConsistencyCheckInterval time.Duration

	ReportRegionFlowInterval time.Duration

	// The lease provided by a successfully proposed and applied entry.
	RaftStoreMaxLeaderLease time.Duration

	// Right region derive origin region id when split.
	RightDeriveWhenSplit bool

	AllowRemoveLeader bool

	/// Max log gap allowed to propose merge.
	MergeMaxLogGap uint64

	/// Interval to re-propose merge.
	MergeCheckTickInterval time.Duration

	UseDeleteRange bool

	ApplyMaxBatchSize uint64
	ApplyPoolSize     uint64

	StoreMaxBatchSize uint64

	ConcurrentSendSnapLimit uint64
	ConcurrentRecvSnapLimit uint64

	GrpcInitialWindowSize uint64
	GrpcKeepAliveTime     time.Duration
	GrpcKeepAliveTimeout  time.Duration
	GrpcRaftConnNum       uint64

	Addr          string
	AdvertiseAddr string
	Labels        []StoreLabel

	SplitCheck *splitCheckConfig
}

func NewDefaultConfig

func NewDefaultConfig() *Config

func (*Config) Validate

func (c *Config) Validate() error

type ConsistencyState

type ConsistencyState struct {
	LastCheckTime time.Time
	// (computed_result_or_to_be_verified, index, hash)
	Index uint64
	Hash  []byte
}

/ `ConsistencyState` is used for consistency check.

type DestroyPeerJob

type DestroyPeerJob struct {
	Initialized bool
	AsyncRemove bool
	RegionId    uint64
	Peer        *metapb.Peer
}

type Engines

type Engines struct {
	// contains filtered or unexported fields
}

func NewEngines

func NewEngines(kvEngine *mvcc.DBBundle, raftEngine *badger.DB, kvPath, raftPath string) *Engines

func (*Engines) SyncKVWAL

func (en *Engines) SyncKVWAL() error

func (*Engines) SyncRaftWAL

func (en *Engines) SyncRaftWAL() error

func (*Engines) WriteKV

func (en *Engines) WriteKV(wb *WriteBatch) error

func (*Engines) WriteRaft

func (en *Engines) WriteRaft(wb *WriteBatch) error

type EntryCache

type EntryCache struct {
	// contains filtered or unexported fields
}

type ErrEpochNotMatch

type ErrEpochNotMatch struct {
	Message string
	Regions []*metapb.Region
}

func (*ErrEpochNotMatch) Error

func (e *ErrEpochNotMatch) Error() string

type ErrKeyNotInRegion

type ErrKeyNotInRegion struct {
	Key    []byte
	Region *metapb.Region
}

func (*ErrKeyNotInRegion) Error

func (e *ErrKeyNotInRegion) Error() string

type ErrNotLeader

type ErrNotLeader struct {
	RegionId uint64
	Leader   *metapb.Peer
}

func (*ErrNotLeader) Error

func (e *ErrNotLeader) Error() string

type ErrRaftEntryTooLarge

type ErrRaftEntryTooLarge struct {
	RegionId  uint64
	EntrySize uint64
}

func (*ErrRaftEntryTooLarge) Error

func (e *ErrRaftEntryTooLarge) Error() string

type ErrRegionNotFound

type ErrRegionNotFound struct {
	RegionId uint64
}

func (*ErrRegionNotFound) Error

func (e *ErrRegionNotFound) Error() string

type ErrServerIsBusy

type ErrServerIsBusy struct {
	Reason    string
	BackoffMs uint64
}

func (*ErrServerIsBusy) Error

func (e *ErrServerIsBusy) Error() string

type ErrStaleCommand

type ErrStaleCommand struct{}

func (*ErrStaleCommand) Error

func (e *ErrStaleCommand) Error() string

type ErrStoreNotMatch

type ErrStoreNotMatch struct {
	RequestStoreId uint64
	ActualStoreId  uint64
}

func (*ErrStoreNotMatch) Error

func (e *ErrStoreNotMatch) Error() string

type GenSnapTask

type GenSnapTask struct {
	// contains filtered or unexported fields
}

type GlobalContext

type GlobalContext struct {
	// contains filtered or unexported fields
}

type IOLimiter

type IOLimiter = rate.Limiter

func NewIOLimiter

func NewIOLimiter(rateLimit int) *IOLimiter

func NewInfLimiter

func NewInfLimiter() *IOLimiter

type InvokeContext

type InvokeContext struct {
	RegionID   uint64
	RaftState  raftState
	ApplyState applyState

	SnapRegion *metapb.Region
	// contains filtered or unexported fields
}

func NewInvokeContext

func NewInvokeContext(store *PeerStorage) *InvokeContext

type JobStatus

type JobStatus = uint32
const (
	JobStatus_Pending JobStatus = 0 + iota
	JobStatus_Running
	JobStatus_Cancelling
	JobStatus_Cancelled
	JobStatus_Finished
	JobStatus_Failed
)

type LeaderChecker

type LeaderChecker interface {
	IsLeader(ctx *kvrpcpb.Context, router *RaftstoreRouter) *errorpb.Error
}

type Lease

type Lease struct {
	// contains filtered or unexported fields
}

/ Lease records an expired time, for examining the current moment is in lease or not. / It's dedicated to the Raft leader lease mechanism, contains either state of / 1. Suspect Timestamp / A suspicious leader lease timestamp, which marks the leader may still hold or lose / its lease until the clock time goes over this timestamp. / 2. Valid Timestamp / A valid leader lease timestamp, which marks the leader holds the lease for now. / The lease is valid until the clock time goes over this timestamp. / / ```text / Time / |----------------------------------> / ^ ^ / Now Suspect TS / State: | Suspect | Suspect / / |----------------------------------> / ^ ^ / Now Valid TS / State: | Valid | Expired / ``` / / Note: / - Valid timestamp would increase when raft log entries are applied in current term. / - Suspect timestamp would be set after the message `MsgTimeoutNow` is sent by current peer. / The message `MsgTimeoutNow` starts a leader transfer procedure. During this procedure, / current peer as an old leader may still hold its lease or lose it. / It's possible there is a new leader elected and current peer as an old leader / doesn't step down due to network partition from the new leader. In that case, / current peer lose its leader lease. / Within this suspect leader lease expire time, read requests could not be performed / locally. / - The valid leader lease should be `lease = max_lease - (commit_ts - send_ts)` / And the expired timestamp for that leader lease is `commit_ts + lease`, / which is `send_ts + max_lease` in short.

func NewLease

func NewLease(maxLease time.Duration) *Lease

func (*Lease) Expire

func (l *Lease) Expire()

func (*Lease) ExpireRemoteLease

func (l *Lease) ExpireRemoteLease()

func (*Lease) Inspect

func (l *Lease) Inspect(ts *time.Time) LeaseState

/ Inspect the lease state for the ts or now.

func (*Lease) MaybeNewRemoteLease

func (l *Lease) MaybeNewRemoteLease(term uint64) *RemoteLease

/ Return a new `RemoteLease` if there is none.

func (*Lease) Renew

func (l *Lease) Renew(sendTs time.Time)

/ Renew the lease to the bound.

func (*Lease) Suspect

func (l *Lease) Suspect(sendTs time.Time)

/ Suspect the lease to the bound.

type LeaseState

type LeaseState int
const (
	/// The lease is suspicious, may be invalid.
	LeaseState_Suspect LeaseState = 1 + iota
	/// The lease is valid.
	LeaseState_Valid
	/// The lease is expired.
	LeaseState_Expired
)

type LimitWriter

type LimitWriter struct {
	// contains filtered or unexported fields
}

func (*LimitWriter) Write

func (lw *LimitWriter) Write(b []byte) (int, error)

type MetaFile

type MetaFile struct {
	Meta *rspb.SnapshotMeta
	Path string
	File *os.File

	// for writing snapshot
	TmpPath string
}

type Msg

type Msg struct {
	Type     MsgType
	RegionID uint64
	Data     interface{}
}

func NewMsg

func NewMsg(tp MsgType, data interface{}) Msg

func NewPeerMsg

func NewPeerMsg(tp MsgType, regionID uint64, data interface{}) Msg

type MsgComputeHashResult

type MsgComputeHashResult struct {
	Index uint64
	Hash  []byte
}

type MsgGCSnap

type MsgGCSnap struct {
	Snaps []SnapKeyWithSending
}

type MsgHalfSplitRegion

type MsgHalfSplitRegion struct {
	RegionEpoch *metapb.RegionEpoch
}

type MsgMergeResult

type MsgMergeResult struct {
	TargetPeer *metapb.Peer
	Stale      bool
}

type MsgRaftCmd

type MsgRaftCmd struct {
	SendTime time.Time
	Request  raftlog.RaftLog
	Callback *Callback
}

type MsgSignificant

type MsgSignificant struct {
	Type           MsgSignificantType
	ToPeerID       uint64
	SnapshotStatus raft.SnapshotStatus
}

type MsgSignificantType

type MsgSignificantType int
const (
	MsgSignificantTypeStatus      MsgSignificantType = 1
	MsgSignificantTypeUnreachable MsgSignificantType = 2
)

type MsgSplitRegion

type MsgSplitRegion struct {
	RegionEpoch *metapb.RegionEpoch
	// It's an encoded key.
	// TODO: support meta key.
	SplitKeys [][]byte
	Callback  *Callback
}

type MsgStoreClearRegionSizeInRange

type MsgStoreClearRegionSizeInRange struct {
	StartKey []byte
	EndKey   []byte
}

type MsgType

type MsgType int64
const (
	MsgTypeNull                   MsgType = 0
	MsgTypeRaftMessage            MsgType = 1
	MsgTypeRaftCmd                MsgType = 2
	MsgTypeSplitRegion            MsgType = 3
	MsgTypeComputeResult          MsgType = 4
	MsgTypeRegionApproximateSize  MsgType = 5
	MsgTypeRegionApproximateKeys  MsgType = 6
	MsgTypeCompactionDeclineBytes MsgType = 7
	MsgTypeHalfSplitRegion        MsgType = 8
	MsgTypeMergeResult            MsgType = 9
	MsgTypeGcSnap                 MsgType = 10
	MsgTypeClearRegionSize        MsgType = 11
	MsgTypeTick                   MsgType = 12
	MsgTypeSignificantMsg         MsgType = 13
	MsgTypeStart                  MsgType = 14
	MsgTypeApplyRes               MsgType = 15
	MsgTypeNoop                   MsgType = 16

	MsgTypeStoreRaftMessage   MsgType = 101
	MsgTypeStoreSnapshotStats MsgType = 102
	// Clear region size and keys for all regions in the range, so we can force them to re-calculate
	// their size later.
	MsgTypeStoreClearRegionSizeInRange MsgType = 104
	MsgTypeStoreCompactedEvent         MsgType = 105
	MsgTypeStoreTick                   MsgType = 106
	MsgTypeStoreStart                  MsgType = 107

	MsgTypeFsmNormal  MsgType = 201
	MsgTypeFsmControl MsgType = 202

	MsgTypeApply             MsgType = 301
	MsgTypeApplyRegistration MsgType = 302
	MsgTypeApplyProposal     MsgType = 303
	MsgTypeApplyCatchUpLogs  MsgType = 304
	MsgTypeApplyLogsUpToDate MsgType = 305
	MsgTypeApplyDestroy      MsgType = 306
	MsgTypeApplySnapshot     MsgType = 307
)

type Node

type Node struct {
	// contains filtered or unexported fields
}

func NewNode

func NewNode(system *raftBatchSystem, store *metapb.Store, cfg *Config, pdClient pd.Client, observer PeerEventObserver) *Node

func (*Node) BootstrapCluster

func (n *Node) BootstrapCluster(ctx context.Context, engines *Engines, firstRegion *metapb.Region) (newCluster bool, err error)

func (*Node) Start

func (n *Node) Start(ctx context.Context, engines *Engines, trans Transport, snapMgr *SnapManager, pdWorker *worker, router *router) error

type Peer

type Peer struct {
	Meta *metapb.Peer

	RaftGroup *raft.RawNode

	// Record the last instant of each peer's heartbeat response.
	PeerHeartbeats map[uint64]time.Time

	/// Record the instants of peers being added into the configuration.
	/// Remove them after they are not pending any more.
	PeersStartPendingTime map[uint64]time.Time
	RecentAddedPeer       *RecentAddedPeer

	/// an inaccurate difference in region size since last reset.
	SizeDiffHint uint64

	/// approximate size of the region.
	ApproximateSize *uint64
	/// approximate keys of the region.
	ApproximateKeys         *uint64
	CompactionDeclinedBytes uint64

	ConsistencyState *ConsistencyState

	Tag string

	// Index of last scheduled committed raft log.
	LastApplyingIdx  uint64
	LastCompactedIdx uint64

	// Approximate size of logs that is applied but not compacted yet.
	RaftLogSizeHint uint64

	PendingRemove bool

	PendingMergeState *rspb.MergeState

	PendingMergeApplyResult *WaitApplyResultState
	PeerStat                PeerStat
	// contains filtered or unexported fields
}

func NewPeer

func NewPeer(storeId uint64, cfg *Config, engines *Engines, region *metapb.Region, regionSched chan<- task,
	peer *metapb.Peer) (*Peer, error)

func (*Peer) Activate

func (p *Peer) Activate(applyMsgs *applyMsgs)

/ Register self to applyMsgs so that the peer is then usable. / Also trigger `RegionChangeEvent::Create` here.

func (*Peer) AnyNewPeerCatchUp

func (p *Peer) AnyNewPeerCatchUp(peerId uint64) bool

/ Returns `true` if any new peer catches up with the leader in replicating logs. / And updates `PeersStartPendingTime` if needed.

func (*Peer) ApplyReads

func (p *Peer) ApplyReads(kv *mvcc.DBBundle, ready *raft.Ready)

func (*Peer) CheckPeers

func (p *Peer) CheckPeers()

/ Checks and updates `peer_heartbeats` for the peer.

func (*Peer) CheckStaleState

func (p *Peer) CheckStaleState(cfg *Config) StaleState

func (*Peer) CollectDownPeers

func (p *Peer) CollectDownPeers(maxDuration time.Duration) []*pdpb.PeerStats

/ Collects all down peers.

func (*Peer) CollectPendingPeers

func (p *Peer) CollectPendingPeers() []*metapb.Peer

/ Collects all pending peers and update `peers_start_pending_time`.

func (*Peer) Destroy

func (p *Peer) Destroy(engine *Engines, keepData bool) error

/ Does the real destroy task which includes: / 1. Set the region to tombstone; / 2. Clear data; / 3. Notify all pending requests.

func (*Peer) GetMinProgress

func (p *Peer) GetMinProgress() uint64

func (*Peer) GetRaftStatus

func (p *Peer) GetRaftStatus() *raft.Status

func (*Peer) GetRole

func (p *Peer) GetRole() raft.StateType

func (*Peer) HandleRaftReadyAppend

func (p *Peer) HandleRaftReadyAppend(trans Transport, applyMsgs *applyMsgs, kvWB, raftWB *WriteBatch, observer PeerEventObserver) *ReadyICPair

func (*Peer) HandleRaftReadyApply

func (p *Peer) HandleRaftReadyApply(kv *mvcc.DBBundle, applyMsgs *applyMsgs, ready *raft.Ready)

func (*Peer) HasPendingSnapshot

func (p *Peer) HasPendingSnapshot() bool

/ Returns `true` if the raft group has replicated a snapshot but not committed it yet.

func (*Peer) HeartbeatPd

func (p *Peer) HeartbeatPd(pdScheduler chan<- task)

func (*Peer) IsApplyingSnapshot

func (p *Peer) IsApplyingSnapshot() bool

func (*Peer) IsLeader

func (p *Peer) IsLeader() bool

func (*Peer) LeaderId

func (p *Peer) LeaderId() uint64

func (*Peer) MaybeCampaign

func (p *Peer) MaybeCampaign(parentIsLeader bool) bool

func (*Peer) MaybeDestroy

func (p *Peer) MaybeDestroy() *DestroyPeerJob

/ Tries to destroy itself. Returns a job (if needed) to do more cleaning tasks.

func (*Peer) MaybeRenewLeaderLease

func (p *Peer) MaybeRenewLeaderLease(ts time.Time)

Try to renew leader lease.

func (*Peer) OnRoleChanged

func (p *Peer) OnRoleChanged(observer PeerEventObserver, ready *raft.Ready)

func (*Peer) PeerId

func (p *Peer) PeerId() uint64

func (*Peer) PostApply

func (p *Peer) PostApply(kv *mvcc.DBBundle, applyState applyState, appliedIndexTerm uint64, merged bool, applyMetrics applyMetrics) bool

func (*Peer) PostPropose

func (p *Peer) PostPropose(meta *ProposalMeta, isConfChange bool, cb *Callback)

func (*Peer) PostRaftReadyPersistent

func (p *Peer) PostRaftReadyPersistent(trans Transport, applyMsgs *applyMsgs, ready *raft.Ready, invokeCtx *InvokeContext) *ApplySnapResult

func (*Peer) PostSplit

func (p *Peer) PostSplit()

func (*Peer) PrePropose

func (p *Peer) PrePropose(cfg *Config, rlog raftlog.RaftLog) (*ProposalContext, error)

func (*Peer) Propose

func (p *Peer) Propose(kv *mvcc.DBBundle, cfg *Config, cb *Callback, rlog raftlog.RaftLog, errResp *raft_cmdpb.RaftCmdResponse) bool

Propose a request.

Return true means the request has been proposed successfully.

func (*Peer) ProposeConfChange

func (p *Peer) ProposeConfChange(cfg *Config, req *raft_cmdpb.RaftCmdRequest) (uint64, error)

Fails in such cases: 1. A pending conf change has not been applied yet; 2. Removing the leader is not allowed in the configuration; 3. The conf change makes the raft group not healthy; 4. The conf change is dropped by raft group internally.

func (*Peer) ProposeNormal

func (p *Peer) ProposeNormal(cfg *Config, rlog raftlog.RaftLog) (uint64, error)

func (*Peer) ProposeTransferLeader

func (p *Peer) ProposeTransferLeader(cfg *Config, req *raft_cmdpb.RaftCmdRequest, cb *Callback) bool

Return true if the transfer leader request is accepted.

func (*Peer) ReadyToHandlePendingSnap

func (p *Peer) ReadyToHandlePendingSnap() bool

func (*Peer) Region

func (p *Peer) Region() *metapb.Region

func (*Peer) Send

func (p *Peer) Send(trans Transport, msgs []eraftpb.Message) error

func (*Peer) SetRegion

func (p *Peer) SetRegion(region *metapb.Region)

/ Set the region of a peer. / / This will update the region of the peer, caller must ensure the region / has been preserved in a durable device.

func (*Peer) Step

func (p *Peer) Step(m *eraftpb.Message) error

/ Steps the raft message.

func (*Peer) Stop

func (p *Peer) Stop()

func (*Peer) Store

func (p *Peer) Store() *PeerStorage

func (*Peer) TakeApplyProposals

func (p *Peer) TakeApplyProposals() *regionProposal

func (*Peer) Term

func (p *Peer) Term() uint64

type PeerEventContext

type PeerEventContext struct {
	LeaderChecker LeaderChecker
	RegionId      uint64
}

type PeerEventObserver

type PeerEventObserver interface {
	// OnPeerCreate will be invoked when there is a new peer created.
	OnPeerCreate(ctx *PeerEventContext, region *metapb.Region)
	// OnPeerApplySnap will be invoked when there is a replicate peer's snapshot applied.
	OnPeerApplySnap(ctx *PeerEventContext, region *metapb.Region)
	// OnPeerDestroy will be invoked when a peer is destroyed.
	OnPeerDestroy(ctx *PeerEventContext)
	// OnSplitRegion will be invoked when region split into new regions with corresponding peers.
	OnSplitRegion(derived *metapb.Region, regions []*metapb.Region, peers []*PeerEventContext)
	// OnRegionConfChange will be invoked after conf change updated region's epoch.
	OnRegionConfChange(ctx *PeerEventContext, epoch *metapb.RegionEpoch)
	// OnRoleChange will be invoked after peer state has changed
	OnRoleChange(regionId uint64, newState raft.StateType)
}

type PeerStat

type PeerStat struct {
	WrittenBytes uint64
	WrittenKeys  uint64
}

type PeerStorage

type PeerStorage struct {
	Engines *Engines

	Tag string
	// contains filtered or unexported fields
}

func NewPeerStorage

func NewPeerStorage(engines *Engines, region *metapb.Region, regionSched chan<- task, peerID uint64, tag string) (*PeerStorage, error)

func (*PeerStorage) Append

func (ps *PeerStorage) Append(invokeCtx *InvokeContext, entries []eraftpb.Entry, raftWB *WriteBatch) error

Append the given entries to the raft log using previous last index or self.last_index. Return the new last index for later update. After we commit in engine, we can set last_index to the return one.

func (*PeerStorage) AppliedIndex

func (ps *PeerStorage) AppliedIndex() uint64

func (*PeerStorage) ApplySnapshot

func (ps *PeerStorage) ApplySnapshot(ctx *InvokeContext, snap *eraftpb.Snapshot, kvWB *WriteBatch, raftWB *WriteBatch) error

Apply the peer with given snapshot.

func (*PeerStorage) CancelApplyingSnap

func (p *PeerStorage) CancelApplyingSnap() bool

func (*PeerStorage) CheckApplyingSnap

func (p *PeerStorage) CheckApplyingSnap() bool

Check if the storage is applying a snapshot.

func (*PeerStorage) ClearData

func (ps *PeerStorage) ClearData() error

func (*PeerStorage) CompactTo

func (ps *PeerStorage) CompactTo(idx uint64)

func (*PeerStorage) Entries

func (ps *PeerStorage) Entries(low, high, maxSize uint64) ([]eraftpb.Entry, error)

func (*PeerStorage) FirstIndex

func (ps *PeerStorage) FirstIndex() (uint64, error)

func (*PeerStorage) InitialState

func (ps *PeerStorage) InitialState() (eraftpb.HardState, eraftpb.ConfState, error)

func (*PeerStorage) IsApplyingSnapshot

func (ps *PeerStorage) IsApplyingSnapshot() bool

func (*PeerStorage) LastIndex

func (ps *PeerStorage) LastIndex() (uint64, error)

func (*PeerStorage) MaybeGCCache

func (ps *PeerStorage) MaybeGCCache(replicatedIdx, appliedIdx uint64)

func (*PeerStorage) PostReadyPersistent

func (ps *PeerStorage) PostReadyPersistent(ctx *InvokeContext) *ApplySnapResult

Update the memory state after ready changes are flushed to disk successfully.

func (*PeerStorage) Region

func (ps *PeerStorage) Region() *metapb.Region

func (*PeerStorage) SaveReadyState

func (ps *PeerStorage) SaveReadyState(kvWB, raftWB *WriteBatch, ready *raft.Ready) (*InvokeContext, error)

/ Save memory states to disk. / / This function only write data to `ready_ctx`'s `WriteBatch`. It's caller's duty to write / it explicitly to disk. If it's flushed to disk successfully, `post_ready` should be called / to update the memory states properly. / Do not modify ready in this function, this is a requirement to advance the ready object properly later.

func (*PeerStorage) ScheduleApplyingSnapshot

func (ps *PeerStorage) ScheduleApplyingSnapshot()

func (*PeerStorage) SetRegion

func (ps *PeerStorage) SetRegion(region *metapb.Region)

func (*PeerStorage) Snapshot

func (ps *PeerStorage) Snapshot() (eraftpb.Snapshot, error)

func (*PeerStorage) Term

func (ps *PeerStorage) Term(idx uint64) (uint64, error)

type PeerTick

type PeerTick int
const (
	PeerTickRaft             PeerTick = 0
	PeerTickRaftLogGC        PeerTick = 1
	PeerTickSplitRegionCheck PeerTick = 2
	PeerTickPdHeartbeat      PeerTick = 3
	PeerTickCheckMerge       PeerTick = 4
	PeerTickPeerStaleState   PeerTick = 5
)

type ProposalContext

type ProposalContext byte
const (
	ProposalContext_SyncLog      ProposalContext = 1
	ProposalContext_Split        ProposalContext = 1 << 1
	ProposalContext_PrepareMerge ProposalContext = 1 << 2
)

func NewProposalContextFromBytes

func NewProposalContextFromBytes(ctx []byte) *ProposalContext

func (ProposalContext) ToBytes

func (c ProposalContext) ToBytes() []byte

type ProposalMeta

type ProposalMeta struct {
	Index          uint64
	Term           uint64
	RenewLeaseTime *time.Time
}

type ProposalQueue

type ProposalQueue struct {
	// contains filtered or unexported fields
}

func (*ProposalQueue) Clear

func (q *ProposalQueue) Clear()

func (*ProposalQueue) PopFront

func (q *ProposalQueue) PopFront(term uint64) *ProposalMeta

func (*ProposalQueue) Push

func (q *ProposalQueue) Push(meta *ProposalMeta)

type RaftClient

type RaftClient struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*RaftClient) Flush

func (c *RaftClient) Flush()

func (*RaftClient) Send

func (c *RaftClient) Send(msg *raft_serverpb.RaftMessage)

func (*RaftClient) Stop

func (c *RaftClient) Stop()

type RaftContext

type RaftContext struct {
	*GlobalContext

	ReadyRes []*ReadyICPair
	// contains filtered or unexported fields
}

type RaftError

type RaftError struct {
	RequestErr *errorpb.Error
}

func (*RaftError) Error

func (re *RaftError) Error() string

type RaftInnerServer

type RaftInnerServer struct {
	// contains filtered or unexported fields
}

func NewRaftInnerServer

func NewRaftInnerServer(globalConfig *config.Config, engines *Engines, raftConfig *Config) *RaftInnerServer

func (*RaftInnerServer) BatchRaft

func (ris *RaftInnerServer) BatchRaft(stream tikvpb.Tikv_BatchRaftServer) error

func (*RaftInnerServer) GetRaftstoreRouter

func (ris *RaftInnerServer) GetRaftstoreRouter() *RaftstoreRouter

func (*RaftInnerServer) GetStoreMeta

func (ris *RaftInnerServer) GetStoreMeta() *metapb.Store

func (*RaftInnerServer) Raft

func (ris *RaftInnerServer) Raft(stream tikvpb.Tikv_RaftServer) error

func (*RaftInnerServer) SetPeerEventObserver

func (ris *RaftInnerServer) SetPeerEventObserver(ob PeerEventObserver)

func (*RaftInnerServer) Setup

func (ris *RaftInnerServer) Setup(pdClient pd.Client)

func (*RaftInnerServer) Snapshot

func (ris *RaftInnerServer) Snapshot(stream tikvpb.Tikv_SnapshotServer) error

func (*RaftInnerServer) Start

func (ris *RaftInnerServer) Start(pdClient pd.Client) error

func (*RaftInnerServer) Stop

func (ris *RaftInnerServer) Stop() error

type RaftstoreRouter

type RaftstoreRouter struct {
	// contains filtered or unexported fields
}

RaftstoreRouter exports SendCommand method for other packages.

func (*RaftstoreRouter) SendCommand

func (r *RaftstoreRouter) SendCommand(req *raft_cmdpb.RaftCmdRequest, cb *Callback) error

func (*RaftstoreRouter) SplitRegion

func (r *RaftstoreRouter) SplitRegion(ctx *kvrpcpb.Context, keys [][]byte) ([]*metapb.Region, error)

type ReadExecutor

type ReadExecutor struct {
	// contains filtered or unexported fields
}

func NewReadExecutor

func NewReadExecutor(checkEpoch bool) *ReadExecutor

func (*ReadExecutor) Execute

type ReadIndexQueue

type ReadIndexQueue struct {
	// contains filtered or unexported fields
}

func (*ReadIndexQueue) ClearUncommitted

func (r *ReadIndexQueue) ClearUncommitted(term uint64)

func (*ReadIndexQueue) NextId

func (r *ReadIndexQueue) NextId() uint64

func (*ReadIndexQueue) PopFront

func (q *ReadIndexQueue) PopFront() *ReadIndexRequest

type ReadIndexRequest

type ReadIndexRequest struct {
	// contains filtered or unexported fields
}

func NewReadIndexRequest

func NewReadIndexRequest(id uint64, cmds []*ReqCbPair, renewLeaseTime *time.Time) *ReadIndexRequest

type ReadyICPair

type ReadyICPair struct {
	Ready raft.Ready
	IC    *InvokeContext
}

type RecentAddedPeer

type RecentAddedPeer struct {
	RejectDurationAsSecs uint64
	Id                   uint64
	AddedTime            time.Time
}

func NewRecentAddedPeer

func NewRecentAddedPeer(rejectDurationAsSecs uint64) *RecentAddedPeer

func (*RecentAddedPeer) Contains

func (r *RecentAddedPeer) Contains(id uint64) bool

func (*RecentAddedPeer) Update

func (r *RecentAddedPeer) Update(id uint64, now time.Time)

type RemoteLease

type RemoteLease struct {
	// contains filtered or unexported fields
}

/ A remote lease, it can only be derived by `Lease`. It will be sent / to the local read thread, so name it remote. If Lease expires, the remote must / expire too.

func (*RemoteLease) Expire

func (r *RemoteLease) Expire()

func (*RemoteLease) Inspect

func (r *RemoteLease) Inspect(ts *time.Time) LeaseState

func (*RemoteLease) Renew

func (r *RemoteLease) Renew(bound time.Time)

func (*RemoteLease) Term

func (r *RemoteLease) Term() uint64

type ReqCbPair

type ReqCbPair struct {
	Req *raft_cmdpb.RaftCmdRequest
	Cb  *Callback
}

type RequestInspector

type RequestInspector interface {
	// contains filtered or unexported methods
}

type RequestPolicy

type RequestPolicy int
const (
	// Handle the read request directly without dispatch.
	RequestPolicy_ReadLocal RequestPolicy = 0 + iota
	// Handle the read request via raft's SafeReadIndex mechanism.
	RequestPolicy_ReadIndex
	RequestPolicy_ProposeNormal
	RequestPolicy_ProposeTransferLeader
	RequestPolicy_ProposeConfChange
	RequestPolicy_Invalid
)

type ServerTransport

type ServerTransport struct {
	// contains filtered or unexported fields
}

func NewServerTransport

func NewServerTransport(raftClient *RaftClient, snapScheduler chan<- task, router *router) *ServerTransport

func (*ServerTransport) Flush

func (t *ServerTransport) Flush()

func (*ServerTransport) ReportSnapshotStatus

func (t *ServerTransport) ReportSnapshotStatus(msg *raft_serverpb.RaftMessage, status raft.SnapshotStatus)

func (*ServerTransport) ReportUnreachable

func (t *ServerTransport) ReportUnreachable(msg *raft_serverpb.RaftMessage)

func (*ServerTransport) Send

func (*ServerTransport) SendSnapshotSock

func (t *ServerTransport) SendSnapshotSock(msg *raft_serverpb.RaftMessage)

type Snap

type Snap struct {
	CFFiles []*CFFile

	MetaFile  *MetaFile
	SizeTrack *int64
	// contains filtered or unexported fields
}

func NewSnap

func NewSnap(dir string, key SnapKey, sizeTrack *int64, isSending, toBuild bool,
	deleter SnapshotDeleter, limiter *IOLimiter) (*Snap, error)

func NewSnapForApplying

func NewSnapForApplying(dir string, key SnapKey, sizeTrack *int64, deleter SnapshotDeleter) (*Snap, error)

func NewSnapForBuilding

func NewSnapForBuilding(dir string, key SnapKey, sizeTrack *int64, deleter SnapshotDeleter, limiter *IOLimiter) (*Snap, error)

func NewSnapForReceiving

func NewSnapForReceiving(dir string, key SnapKey, snapshotMeta *rspb.SnapshotMeta,
	sizeTrack *int64, deleter SnapshotDeleter, limiter *IOLimiter) (*Snap, error)

func NewSnapForSending

func NewSnapForSending(dir string, key SnapKey, sizeTrack *int64, deleter SnapshotDeleter) (*Snap, error)

func (*Snap) Apply

func (s *Snap) Apply(opts ApplyOptions) (ApplyResult, error)

func (*Snap) Build

func (s *Snap) Build(dbSnap *regionSnapshot, region *metapb.Region, snapData *rspb.RaftSnapshotData, stat *SnapStatistics, deleter SnapshotDeleter) error

func (*Snap) Delete

func (s *Snap) Delete()

func (*Snap) Drop

func (s *Snap) Drop()

func (*Snap) Exists

func (s *Snap) Exists() bool

func (*Snap) Meta

func (s *Snap) Meta() (os.FileInfo, error)

func (*Snap) Path

func (s *Snap) Path() string

func (*Snap) Read

func (s *Snap) Read(b []byte) (int, error)

func (*Snap) Save

func (s *Snap) Save() error

func (*Snap) TotalSize

func (s *Snap) TotalSize() (total uint64)

func (*Snap) Write

func (s *Snap) Write(b []byte) (int, error)

type SnapEntry

type SnapEntry int
const (
	SnapEntryGenerating SnapEntry = 1
	SnapEntrySending    SnapEntry = 2
	SnapEntryReceiving  SnapEntry = 3
	SnapEntryApplying   SnapEntry = 4
)

func (SnapEntry) String

func (e SnapEntry) String() string

type SnapKey

type SnapKey struct {
	RegionID uint64
	Term     uint64
	Index    uint64
}

func SnapKeyFromRegionSnap

func SnapKeyFromRegionSnap(regionID uint64, snap *eraftpb.Snapshot) SnapKey

func SnapKeyFromSnap

func SnapKeyFromSnap(snap *eraftpb.Snapshot) (SnapKey, error)

func (SnapKey) String

func (k SnapKey) String() string

type SnapKeyWithSending

type SnapKeyWithSending struct {
	SnapKey   SnapKey
	IsSending bool
}

type SnapManager

type SnapManager struct {
	MaxTotalSize uint64
	// contains filtered or unexported fields
}

func NewSnapManager

func NewSnapManager(path string, router *router) *SnapManager

func (*SnapManager) DeleteSnapshot

func (sm *SnapManager) DeleteSnapshot(key SnapKey, snapshot Snapshot, checkEntry bool) bool

func (*SnapManager) Deregister

func (sm *SnapManager) Deregister(key SnapKey, entry SnapEntry)

func (*SnapManager) GetSnapshotForApplying

func (sm *SnapManager) GetSnapshotForApplying(snapKey SnapKey) (Snapshot, error)

func (*SnapManager) GetSnapshotForBuilding

func (sm *SnapManager) GetSnapshotForBuilding(key SnapKey) (Snapshot, error)

func (*SnapManager) GetSnapshotForReceiving

func (sm *SnapManager) GetSnapshotForReceiving(snapKey SnapKey, data []byte) (Snapshot, error)

func (*SnapManager) GetSnapshotForSending

func (sm *SnapManager) GetSnapshotForSending(snapKey SnapKey) (Snapshot, error)

func (*SnapManager) GetTotalSnapSize

func (sm *SnapManager) GetTotalSnapSize() uint64

func (*SnapManager) HasRegistered

func (sm *SnapManager) HasRegistered(key SnapKey) bool

func (*SnapManager) ListIdleSnap

func (sm *SnapManager) ListIdleSnap() ([]SnapKeyWithSending, error)

func (*SnapManager) Register

func (sm *SnapManager) Register(key SnapKey, entry SnapEntry)

func (*SnapManager) Stats

func (sm *SnapManager) Stats() SnapStats

type SnapManagerBuilder

type SnapManagerBuilder struct {
	// contains filtered or unexported fields
}

func (*SnapManagerBuilder) Build

func (smb *SnapManagerBuilder) Build(path string, router *router) *SnapManager

func (*SnapManagerBuilder) MaxTotalSize

func (smb *SnapManagerBuilder) MaxTotalSize(v uint64) *SnapManagerBuilder

type SnapState

type SnapState struct {
	StateType SnapStateType
	Status    *JobStatus
	Receiver  chan *eraftpb.Snapshot
}

type SnapStateType

type SnapStateType int
const (
	SnapState_Relax SnapStateType = 0 + iota
	SnapState_Generating
	SnapState_Applying
	SnapState_ApplyAborted
)

type SnapStatistics

type SnapStatistics struct {
	Size    uint64
	KVCount int
}

type SnapStats

type SnapStats struct {
	ReceivingCount int
	SendingCount   int
}

type Snapshot

type Snapshot interface {
	io.Reader
	io.Writer
	Build(dbBundle *regionSnapshot, region *metapb.Region, snapData *rspb.RaftSnapshotData, stat *SnapStatistics, deleter SnapshotDeleter) error
	Path() string
	Exists() bool
	Delete()
	Meta() (os.FileInfo, error)
	TotalSize() uint64
	Save() error
	Apply(option ApplyOptions) (ApplyResult, error)
}

`Snapshot` is an interface for snapshot. It's used in these scenarios:

  1. build local snapshot
  2. read local snapshot and then replicate it to remote raftstores
  3. receive snapshot from remote raftstore and write it to local storage
  4. apply snapshot
  5. snapshot gc

type SnapshotDeleter

type SnapshotDeleter interface {
	// DeleteSnapshot returns true if it successfully delete the specified snapshot.
	DeleteSnapshot(key SnapKey, snapshot Snapshot, checkEntry bool) bool
}

`SnapshotDeleter` is a trait for deleting snapshot. It's used to ensure that the snapshot deletion happens under the protection of locking to avoid race case for concurrent read/write.

type StaleState

type StaleState int
const (
	StaleStateValid StaleState = 0 + iota
	StaleStateToValidate
	StaleStateLeaderMissing
)

type StoreContext

type StoreContext struct {
	*GlobalContext
	// contains filtered or unexported fields
}

type StoreLabel

type StoreLabel struct {
	LabelKey, LabelValue string
}

type StoreTick

type StoreTick int
const (
	StoreTickCompactCheck     StoreTick = 0
	StoreTickPdStoreHeartbeat StoreTick = 1
	StoreTickSnapGC           StoreTick = 2
	StoreTickConsistencyCheck StoreTick = 3
)

type TestRaftWriter

type TestRaftWriter struct {
	// contains filtered or unexported fields
}

TestRaftWriter is used to mock raft write related prewrite and commit operations without sending real raft commands

func (*TestRaftWriter) Close

func (w *TestRaftWriter) Close()

func (*TestRaftWriter) DeleteRange

func (w *TestRaftWriter) DeleteRange(start, end []byte, latchHandle mvcc.LatchHandle) error

func (*TestRaftWriter) NewWriteBatch

func (w *TestRaftWriter) NewWriteBatch(startTS, commitTS uint64, ctx *kvrpcpb.Context) mvcc.WriteBatch

func (*TestRaftWriter) Open

func (w *TestRaftWriter) Open()

func (*TestRaftWriter) Write

func (w *TestRaftWriter) Write(batch mvcc.WriteBatch) error

type Transport

type Transport interface {
	Send(msg *rspb.RaftMessage) error
}

type WaitApplyResultState

type WaitApplyResultState struct {
	// contains filtered or unexported fields
}

/ A struct that stores the state to wait for `PrepareMerge` apply result. / / When handling the apply result of a `CommitMerge`, the source peer may have / not handle the apply result of the `PrepareMerge`, so the target peer has / to abort current handle process and wait for it asynchronously.

type WriteBatch

type WriteBatch struct {
	// contains filtered or unexported fields
}

func (*WriteBatch) Delete

func (wb *WriteBatch) Delete(key y.Key)

func (*WriteBatch) DeleteLock

func (wb *WriteBatch) DeleteLock(key []byte)

func (*WriteBatch) Len

func (wb *WriteBatch) Len() int

func (*WriteBatch) MustWriteToKV

func (wb *WriteBatch) MustWriteToKV(db *mvcc.DBBundle)

func (*WriteBatch) MustWriteToRaft

func (wb *WriteBatch) MustWriteToRaft(db *badger.DB)

func (*WriteBatch) Reset

func (wb *WriteBatch) Reset()

func (*WriteBatch) Rollback

func (wb *WriteBatch) Rollback(key y.Key)

func (*WriteBatch) RollbackToSafePoint

func (wb *WriteBatch) RollbackToSafePoint()

func (*WriteBatch) Set

func (wb *WriteBatch) Set(key y.Key, val []byte)

func (*WriteBatch) SetLock

func (wb *WriteBatch) SetLock(key, val []byte)

func (*WriteBatch) SetMsg

func (wb *WriteBatch) SetMsg(key y.Key, msg proto.Message) error

func (*WriteBatch) SetOpLock

func (wb *WriteBatch) SetOpLock(key y.Key, userMeta []byte)

func (*WriteBatch) SetSafePoint

func (wb *WriteBatch) SetSafePoint()

func (*WriteBatch) SetWithUserMeta

func (wb *WriteBatch) SetWithUserMeta(key y.Key, val, userMeta []byte)

func (*WriteBatch) WriteToKV

func (wb *WriteBatch) WriteToKV(bundle *mvcc.DBBundle) error

WriteToKV flush WriteBatch to DB by two steps:

  1. Write entries to badger. After save ApplyState to badger, subsequent regionSnapshot will start at new raft index.
  2. Update lockStore, the date in lockStore may be older than the DB, so we need to restore then entries from raft log.

func (*WriteBatch) WriteToRaft

func (wb *WriteBatch) WriteToRaft(db *badger.DB) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL