Documentation ¶
Index ¶
- Constants
- Variables
- func DeleteLimiterWait()
- func GetIoMetricLabels(partition *DataPartition, tp string) map[string]string
- func GetStatusMessage(status int) string
- func IsDiskErr(errMsg string) bool
- func MarshalRaftCmd(raftOpItem *RaftCmdItem) (raw []byte, err error)
- func MarshalRandWriteRaftLog(opcode uint8, extentID uint64, offset, size int64, data []byte, crc uint32) (result []byte, err error)
- func NewPacketToBroadcastMinAppliedID(partitionID uint64, minAppliedID uint64) (p *repl.Packet)
- func NewPacketToGetAppliedID(partitionID uint64) (p *repl.Packet)
- func NewPacketToGetMaxExtentIDAndPartitionSIze(partitionID uint64) (p *repl.Packet)
- func NewPacketToGetPartitionSize(partitionID uint64) (p *repl.Packet)
- func UnmarshalOldVersionRaftLog(raw []byte) (opItem *rndWrtOpItem, err error)
- func UnmarshalOldVersionRandWriteOpItem(raw []byte) (result *rndWrtOpItem, err error)
- func UnmarshalRandWriteRaftLog(raw []byte) (opItem *rndWrtOpItem, err error)
- type DataNode
- func (s *DataNode) ExtentWithHoleRepairRead(request *repl.Packet, connect net.Conn, ...)
- func (m *DataNode) GetDpMaxRepairErrCnt() uint64
- func (s *DataNode) HasStarted() bool
- func (s *DataNode) NormalSnapshotExtentRepairRead(request *repl.Packet, connect net.Conn)
- func (s *DataNode) OperatePacket(p *repl.Packet, c net.Conn) (err error)
- func (s *DataNode) Post(p *repl.Packet) error
- func (s *DataNode) Prepare(p *repl.Packet) (err error)
- func (s *DataNode) Shutdown()
- func (s *DataNode) Start(cfg *config.Config) (err error)
- func (s *DataNode) Sync()
- type DataNodeID
- type DataNodeInfo
- type DataNodeMetrics
- type DataPartition
- func (dp *DataPartition) Apply(command []byte, index uint64) (resp interface{}, err error)
- func (dp *DataPartition) ApplyMemberChange(confChange *raftproto.ConfChange, index uint64) (resp interface{}, err error)
- func (dp *DataPartition) ApplyRandomWrite(command []byte, raftApplyID uint64) (respStatus interface{}, err error)
- func (dp *DataPartition) ApplySnapshot(peers []raftproto.Peer, iterator raftproto.SnapIterator) (err error)
- func (dp *DataPartition) Available() int
- func (dp *DataPartition) CanRemoveRaftMember(peer proto.Peer, force bool) error
- func (dp *DataPartition) ChangeRaftMember(changeType raftProto.ConfChangeType, peer raftProto.Peer, context []byte) (resp interface{}, err error)
- func (dp *DataPartition) CheckLeader(request *repl.Packet, connect net.Conn) (err error)
- func (dp *DataPartition) CheckWriteVer(p *repl.Packet) (err error)
- func (dp *DataPartition) Del(key interface{}) (interface{}, error)
- func (dp *DataPartition) Disk() *Disk
- func (dp *DataPartition) DoExtentStoreRepair(repairTask *DataPartitionRepairTask)
- func (dp *DataPartition) DoRepair(repairTasks []*DataPartitionRepairTask)
- func (dp *DataPartition) ExtentStore() *storage.ExtentStore
- func (dp *DataPartition) ExtentWithHoleRepairRead(request repl.PacketInterface, connect net.Conn, ...)
- func (dp *DataPartition) ForceLoadHeader()
- func (dp *DataPartition) ForceSetDataPartitionToFininshLoad()
- func (dp *DataPartition) ForceSetDataPartitionToLoadding()
- func (dp *DataPartition) ForceSetRaftRunning()
- func (dp *DataPartition) Get(key interface{}) (interface{}, error)
- func (dp *DataPartition) GetAppliedID() (id uint64)
- func (dp *DataPartition) GetExtentCount() int
- func (dp *DataPartition) GetExtentCountWithoutLock() int
- func (dp *DataPartition) GetRepairBlockSize() (size uint64)
- func (dp *DataPartition) HandleFatalEvent(err *raft.FatalError)
- func (dp *DataPartition) HandleLeaderChange(leader uint64)
- func (partition *DataPartition) HandleVersionOp(req *proto.MultiVersionOpRequest) (err error)
- func (dp *DataPartition) IsDataPartitionLoadFin() bool
- func (dp *DataPartition) IsDataPartitionLoading() bool
- func (dp *DataPartition) IsEquareCreateDataPartitionRequst(request *proto.CreateDataPartitionRequest) (err error)
- func (dp *DataPartition) IsExistPeer(peer proto.Peer) bool
- func (dp *DataPartition) IsExistReplica(addr string) bool
- func (dp *DataPartition) IsExistReplicaWithNodeId(addr string, nodeID uint64) bool
- func (dp *DataPartition) IsForbidden() bool
- func (dp *DataPartition) IsRaftLeader() (addr string, ok bool)
- func (dp *DataPartition) LaunchRepair(extentType uint8)
- func (dp *DataPartition) Load() (response *proto.LoadDataPartitionResponse)
- func (dp *DataPartition) LoadAppliedID() (err error)
- func (dp *DataPartition) NormalExtentRepairRead(p repl.PacketInterface, connect net.Conn, isRepairRead bool, ...) (err error)
- func (dp *DataPartition) NotifyExtentRepair(members []*DataPartitionRepairTask) (err error)
- func (dp *DataPartition) Path() string
- func (dp *DataPartition) PersistMetadata() (err error)
- func (dp *DataPartition) Put(key interface{}, val interface{}) (resp interface{}, err error)
- func (dp *DataPartition) RandomWriteSubmit(pkg *repl.Packet) (err error)
- func (dp *DataPartition) ReloadSnapshot()
- func (dp *DataPartition) RemoveAll(force bool) (err error)
- func (dp *DataPartition) Replicas() []string
- func (dp *DataPartition) SetForbidden(status bool)
- func (dp *DataPartition) SetMinAppliedID(id uint64)
- func (dp *DataPartition) SetRepairBlockSize(size uint64)
- func (dp *DataPartition) Size() int
- func (dp *DataPartition) SnapShot() (files []*proto.File)
- func (dp *DataPartition) Snapshot() (raftproto.Snapshot, error)
- func (dp *DataPartition) StartRaft(isLoad bool) (err error)
- func (dp *DataPartition) StartRaftAfterRepair(isLoad bool)
- func (dp *DataPartition) StartRaftLoggingSchedule()
- func (dp *DataPartition) Status() int
- func (dp *DataPartition) Stop()
- func (dp *DataPartition) StopDecommissionRecover(stop bool)
- func (dp *DataPartition) String() (m string)
- func (dp *DataPartition) Submit(val []byte) (retCode uint8, err error)
- func (dp *DataPartition) Used() int
- type DataPartitionMetadata
- type DataPartitionRepairTask
- type Disk
- func (d *Disk) AddBackupPartitionDir(id uint64)
- func (d *Disk) AddDiskErrPartition(dpId uint64)
- func (d *Disk) AddSize(size uint64)
- func (d *Disk) AttachDataPartition(dp *DataPartition)
- func (d *Disk) CanWrite() bool
- func (d *Disk) CheckDiskError(err error, rwFlag uint8)
- func (d *Disk) DataPartitionList() (partitionIDs []uint64)
- func (d *Disk) DetachDataPartition(dp *DataPartition)
- func (d *Disk) ForceExitRaftStore()
- func (d *Disk) GetBackupPartitionDirList() (backupInfos []proto.BackupDataPartitionInfo)
- func (d *Disk) GetDataPartition(partitionID uint64) (partition *DataPartition)
- func (d *Disk) GetDataPartitionCount() int
- func (d *Disk) GetDecommissionStatus() bool
- func (d *Disk) GetDiskErrPartitionCount() uint64
- func (d *Disk) GetDiskErrPartitionList() (diskErrPartitionList []uint64)
- func (d *Disk) GetDiskPartition() *disk.PartitionStat
- func (d *Disk) HasDiskErrPartition(dpId uint64) bool
- func (d *Disk) MarkDecommissionStatus(decommission bool)
- func (d *Disk) PartitionCount() int
- func (d *Disk) QueryExtentRepairReadLimitStatus() (bool, uint64)
- func (d *Disk) ReleaseReadExtentToken()
- func (d *Disk) RequireReadExtentToken(id uint64) bool
- func (d *Disk) RestorePartition(visitor PartitionVisitor) (err error)
- func (d *Disk) SetExtentRepairReadLimitStatus(status bool)
- type DiskExtentReadLimitInfo
- type DiskExtentReadLimitStatusResponse
- type GcExtent
- type ItemIterator
- type LimiterStatus
- type MetaMultiSnapshotInfo
- type PartitionVisitor
- type PersistApplyIdRequest
- type RaftCmdItem
- type ReplicaInfo
- type SimpleVolView
- type SpaceManager
- func (manager *SpaceManager) AttachPartition(dp *DataPartition)
- func (manager *SpaceManager) CreatePartition(request *proto.CreateDataPartitionRequest) (dp *DataPartition, err error)
- func (manager *SpaceManager) DeletePartition(dpID uint64, force bool) (err error)
- func (manager *SpaceManager) DetachDataPartition(partitionID uint64)
- func (manager *SpaceManager) FillIoUtils(samples map[string]loadutil.DiskIoSample)
- func (manager *SpaceManager) GetAllDiskPartitions() []*disk.PartitionStat
- func (manager *SpaceManager) GetClusterID() (clusterID string)
- func (manager *SpaceManager) GetDisk(path string) (d *Disk, err error)
- func (manager *SpaceManager) GetDiskUtils() map[string]float64
- func (manager *SpaceManager) GetDisks() (disks []*Disk)
- func (manager *SpaceManager) GetNodeID() (nodeID uint64)
- func (manager *SpaceManager) GetRaftStore() (raftStore raftstore.RaftStore)
- func (manager *SpaceManager) LoadBrokenDisk(path string, reservedSpace, diskRdonlySpace uint64, maxErrCnt int, ...) (err error)
- func (manager *SpaceManager) LoadDisk(path string, reservedSpace, diskRdonlySpace uint64, maxErrCnt int, ...) (err error)
- func (manager *SpaceManager) Partition(partitionID uint64) (dp *DataPartition)
- func (manager *SpaceManager) RangePartitions(f func(partition *DataPartition, testID string) bool, reqID string)
- func (manager *SpaceManager) SetClusterID(clusterID string)
- func (manager *SpaceManager) SetCurrentLoadDpLimit(limit int)
- func (manager *SpaceManager) SetCurrentStopDpLimit(limit int)
- func (manager *SpaceManager) SetNodeID(nodeID uint64)
- func (manager *SpaceManager) SetRaftStore(raftStore raftstore.RaftStore)
- func (manager *SpaceManager) StartDiskSample()
- func (manager *SpaceManager) Stats() *Stats
- func (manager *SpaceManager) Stop()
- type Stats
- type VolMap
Constants ¶
const ( IntervalToUpdateReplica = 600 // interval to update the replica IntervalToUpdatePartitionSize = 60 * time.Second // interval to update the partition size NumOfFilesToRecoverInParallel = 10 // number of files to be recovered simultaneously )
const ( ReadFlag = 1 WriteFlag = 2 )
cmd response
const ( ActionNotifyFollowerToRepair = "ActionNotifyFollowerRepair" ActionStreamRead = "ActionStreamRead" ActionStreamFollowerRead = "ActionStreamFollowerRead" ActionCreateExtent = "ActionCreateExtent:" ActionMarkDelete = "ActionMarkDelete:" ActionGetAllExtentWatermarks = "ActionGetAllExtentWatermarks:" ActionWrite = "ActionWrite:" ActionRepair = "ActionRepair:" ActionDecommissionPartition = "ActionDecommissionPartition" ActionAddDataPartitionRaftMember = "ActionAddDataPartitionRaftMember" ActionRemoveDataPartitionRaftMember = "ActionRemoveDataPartitionRaftMember" ActionDataPartitionTryToLeader = "ActionDataPartitionTryToLeader" ActionCreateDataPartition = "ActionCreateDataPartition" ActionLoadDataPartition = "ActionLoadDataPartition" ActionDeleteDataPartition = "ActionDeleteDataPartition" ActionStreamReadTinyDeleteRecord = "ActionStreamReadTinyDeleteRecord" ActionSyncTinyDeleteRecord = "ActionSyncTinyDeleteRecord" ActionStreamReadTinyExtentRepair = "ActionStreamReadTinyExtentRepair" ActionBatchMarkDelete = "ActionBatchMarkDelete" ActionBatchLockNormalExtent = "ActionBatchLockNormalExtent" ActionUpdateVersion = "ActionUpdateVersion" ActionStopDataPartitionRepair = "ActionStopDataPartitionRepair" ActionRecoverDataReplicaMeta = "ActionRecoverDataReplicaMeta" ActionRecoverBackupDataReplica = "ActionRecoverBackupDataReplica" ActionRecoverBadDisk = "ActionRecoverBadDisk" ActionQueryBadDiskRecoverProgress = "ActionQueryBadDiskRecoverProgress" ActionDeleteBackupDirectories = "ActionDeleteBackupDirectories" )
Action description
const ( RepairRead = true StreamRead = false )
const ( EmptyResponse = 'E' TinyExtentRepairReadResponseArgLen = 17 NormalExtentWithHoleRepairReadResponseArgLen = 17 MaxSyncTinyDeleteBufferSize = 2400000 MaxFullSyncTinyDeleteTime = 3600 * 24 MinTinyExtentDeleteRecordSyncSize = 4 * 1024 * 1024 )
const ( ExpiredPartitionPrefix = "expired_" ExpiredPartitionExistTime = time.Hour * time.Duration(24*7) BackupPartitionPrefix = "backup_" )
const ( SyncTinyDeleteRecordFromLeaderOnEveryDisk = 5 MaxExtentRepairReadLimit = 1 // )
const ( DiskRecoverStop uint32 = iota DiskRecoverStart )
const ( StatPeriod = time.Minute * time.Duration(1) MetricPartitionIOName = "dataPartitionIO" MetricPartitionIOBytesName = "dataPartitionIOBytes" MetricLackDpCount = "lackDataPartitionCount" MetricCapacityToCreateDp = "capacityToCreateDp" MetricConnectionCnt = "connectionCnt" MetricDpCount = "dataPartitionCount" MetricTotalDpSize = "totalDpSize" MetricCapacity = "capacity" )
const ( UpdateNodeInfoTicket = 1 * time.Minute RepairTimeOut = time.Hour * 24 MaxRepairErrCnt = 1000 )
const ( DataPartitionPrefix = "datapartition" CachePartitionPrefix = "cachepartition" PreLoadPartitionPrefix = "preloadpartition" DataPartitionMetadataFileName = "META" TempMetadataFileName = ".meta" ApplyIndexFile = "APPLY" TempApplyIndexFile = ".apply" TimeLayout = "2006-01-02 15:04:05" )
const ( RaftStatusStopped = 0 RaftStatusRunning = 1 )
const ( DefaultZoneName = proto.DefaultZoneName DefaultRaftDir = "raft" DefaultRaftLogsToRetain = 10 // Count of raft logs per data partition DefaultDiskMaxErr = 1 DefaultDiskRetainMin = 5 * util.GB // GB DefaultNameResolveInterval = 1 // minutes )
const ( ConfigKeyLocalIP = "localIP" // string ConfigKeyPort = "port" // int ConfigKeyMasterAddr = "masterAddr" // array ConfigKeyZone = "zoneName" // string ConfigKeyDisks = "disks" // array ConfigKeyRaftDir = "raftDir" // string ConfigKeyRaftHeartbeat = "raftHeartbeat" // string ConfigKeyRaftReplica = "raftReplica" // string CfgTickInterval = "tickInterval" // int CfgRaftRecvBufSize = "raftRecvBufSize" // int ConfigKeyLogDir = "logDir" // string ConfigKeyDiskPath = "diskPath" // string /* * Metrics Degrade Level * minus value: turn off metrics collection. * 0 or 1: full metrics. * 2: 1/2 of the metrics will be collected. * 3: 1/3 of the metrics will be collected. * ... */ CfgMetricsDegrade = "metricsDegrade" // int CfgDiskRdonlySpace = "diskRdonlySpace" // int // smux Config ConfigKeyEnableSmuxClient = "enableSmuxConnPool" // bool ConfigKeySmuxPortShift = "smuxPortShift" // int ConfigKeySmuxMaxConn = "smuxMaxConn" // int ConfigKeySmuxStreamPerConn = "smuxStreamPerConn" // int ConfigKeySmuxMaxBuffer = "smuxMaxBuffer" // int ConfigKeySmuxTotalStream = "sumxTotalStream" // int // rate limit control enable ConfigDiskQosEnable = "diskQosEnable" // bool ConfigDiskReadIocc = "diskReadIocc" // int ConfigDiskReadIops = "diskReadIops" // int ConfigDiskReadFlow = "diskReadFlow" // int ConfigDiskWriteIocc = "diskWriteIocc" // int ConfigDiskWriteIops = "diskWriteIops" // int ConfigDiskWriteFlow = "diskWriteFlow" // int // load/stop dp limit ConfigDiskCurrentLoadDpLimit = "diskCurrentLoadDpLimit" ConfigDiskCurrentStopDpLimit = "diskCurrentStopDpLimit" // disk read extent limit ConfigEnableDiskReadExtentLimit = "enableDiskReadRepairExtentLimit" // bool ConfigServiceIDKey = "serviceIDKey" ConfigEnableGcTimer = "enableGcTimer" ConfigKeyDiskUnavailablePartitionErrorCount = "diskUnavailablePartitionErrorCount" )
const (
BinaryMarshalMagicVersion = 0xFF
)
const (
BufferWrite = false
)
const (
DecommissionDiskMark = "decommissionDiskMark"
)
const DefaultCurrentLoadDpLimit = 4
const DefaultStopDpLimit = 4
const DiskErrNotAssociatedWithPartition uint64 = 0 // use 0 for disk error without any data partition
const (
DiskSectorSize = 512
)
Sector size
const DiskSelectMaxStraw = 65536
const (
DiskStatusFile = ".diskStatus"
)
const (
FinishLoadDataPartitionExtentHeader = 1
)
Status of load data partition extent header
const (
IsReleased = 1
)
Tiny extent has been put back to store
const (
MinAvaliTinyExtentCnt = 5
)
const (
MinTinyExtentsToRepair = 10 // minimum number of tiny extents to repair
)
Apply the raft log operation. Currently we only have the random write operation.
const (
ModuleName = "dataNode"
)
const (
NetworkProtocol = "tcp"
)
Network protocol
const (
RaftNotStarted = "RaftNotStarted"
)
Error code
Variables ¶
var ( // RegexpDataPartitionDir validates the directory name of a data partition. RegexpDataPartitionDir, _ = regexp.Compile(`^datapartition_(\d)+_(\d)+$`) RegexpCachePartitionDir, _ = regexp.Compile(`^cachepartition_(\d)+_(\d)+$`) RegexpPreLoadPartitionDir, _ = regexp.Compile(`^preloadpartition_(\d)+_(\d)+$`) RegexpExpiredDataPartitionDir, _ = regexp.Compile(`^expired_datapartition_(\d)+_(\d)+$`) RegexpBackupDataPartitionDirToDelete, _ = regexp.Compile(`backup_datapartition_(\d+)_(\d+)-(\d+)`) )
var ( MaxExtentRepairLimit = 20000 MinExtentRepairLimit = 5 CurExtentRepairLimit = MaxExtentRepairLimit )
var ( ErrIncorrectStoreType = errors.New("Incorrect store type") ErrNoSpaceToCreatePartition = errors.New("No disk space to create a data partition") ErrNewSpaceManagerFailed = errors.New("Creater new space manager failed") ErrGetMasterDatanodeInfoFailed = errors.New("Failed to get datanode info from master") LocalIP string // MasterClient = masterSDK.NewMasterClient(nil, false) MasterClient *masterSDK.MasterCLientWithResolver )
var AutoRepairStatus = true
var ErrForbiddenDataPartition = errors.New("the data partition is forbidden")
Functions ¶
func DeleteLimiterWait ¶ added in v1.34.0
func DeleteLimiterWait()
func GetIoMetricLabels ¶ added in v1.34.0
func GetIoMetricLabels(partition *DataPartition, tp string) map[string]string
func GetStatusMessage ¶ added in v1.34.0
func MarshalRaftCmd ¶ added in v1.34.0
func MarshalRaftCmd(raftOpItem *RaftCmdItem) (raw []byte, err error)
func MarshalRandWriteRaftLog ¶ added in v1.4.0
func NewPacketToBroadcastMinAppliedID ¶
NewPacketToBroadcastMinAppliedID returns a new packet to broadcast the min applied ID.
func NewPacketToGetAppliedID ¶
NewPacketToGetAppliedID returns a new packet to get the applied ID.
func NewPacketToGetMaxExtentIDAndPartitionSIze ¶ added in v1.4.0
NewPacketToGetPartitionSize returns a new packet to get the partition size.
func NewPacketToGetPartitionSize ¶
NewPacketToGetPartitionSize returns a new packet to get the partition size.
func UnmarshalOldVersionRaftLog ¶ added in v1.4.0
func UnmarshalOldVersionRandWriteOpItem ¶ added in v1.4.0
func UnmarshalRandWriteRaftLog ¶ added in v1.4.0
RandomWriteSubmit submits the proposal to raft.
Types ¶
type DataNode ¶
type DataNode struct {
// contains filtered or unexported fields
}
DataNode defines the structure of a data node.
func (*DataNode) ExtentWithHoleRepairRead ¶ added in v1.34.0
func (s *DataNode) ExtentWithHoleRepairRead(request *repl.Packet, connect net.Conn, getReplyPacket func() repl.PacketInterface)
Handle tinyExtentRepairRead packet.
func (*DataNode) GetDpMaxRepairErrCnt ¶ added in v1.34.0
func (*DataNode) HasStarted ¶ added in v1.34.0
func (*DataNode) NormalSnapshotExtentRepairRead ¶ added in v1.34.0
func (*DataNode) OperatePacket ¶
type DataNodeID ¶ added in v1.34.0
type DataNodeInfo ¶ added in v1.4.0
type DataNodeInfo struct { Addr string PersistenceDataPartitions []uint64 PersistenceDataPartitionsWithDiskPath []proto.DataPartitionDiskInfo }
type DataNodeMetrics ¶ added in v1.34.0
type DataNodeMetrics struct { MetricIOBytes *exporter.Counter MetricLackDpCount *exporter.GaugeVec MetricCapacityToCreateDp *exporter.GaugeVec MetricConnectionCnt *exporter.Gauge MetricDpCount *exporter.Gauge MetricTotalDpSize *exporter.Gauge MetricCapacity *exporter.GaugeVec // contains filtered or unexported fields }
type DataPartition ¶
type DataPartition struct { DataPartitionCreateType int PersistApplyIdChan chan PersistApplyIdRequest // contains filtered or unexported fields }
func CreateDataPartition ¶
func CreateDataPartition(dpCfg *dataPartitionCfg, disk *Disk, request *proto.CreateDataPartitionRequest) (dp *DataPartition, err error)
func LoadDataPartition ¶
func LoadDataPartition(partitionDir string, disk *Disk) (dp *DataPartition, err error)
LoadDataPartition loads and returns a partition instance based on the specified directory. It reads the partition metadata file stored under the specified directory and creates the partition instance.
func (*DataPartition) Apply ¶
func (dp *DataPartition) Apply(command []byte, index uint64) (resp interface{}, err error)
Apply puts the data onto the disk.
func (*DataPartition) ApplyMemberChange ¶
func (dp *DataPartition) ApplyMemberChange(confChange *raftproto.ConfChange, index uint64) (resp interface{}, err error)
ApplyMemberChange supports adding new raft member or deleting an existing raft member. It does not support updating an existing member at this point.
func (*DataPartition) ApplyRandomWrite ¶
func (dp *DataPartition) ApplyRandomWrite(command []byte, raftApplyID uint64) (respStatus interface{}, err error)
ApplyRandomWrite random write apply
func (*DataPartition) ApplySnapshot ¶
func (dp *DataPartition) ApplySnapshot(peers []raftproto.Peer, iterator raftproto.SnapIterator) (err error)
ApplySnapshot asks the raft leader for the snapshot data to recover the contents on the local disk.
func (*DataPartition) Available ¶
func (dp *DataPartition) Available() int
Available returns the available space.
func (*DataPartition) CanRemoveRaftMember ¶ added in v1.4.0
func (dp *DataPartition) CanRemoveRaftMember(peer proto.Peer, force bool) error
func (*DataPartition) ChangeRaftMember ¶
func (dp *DataPartition) ChangeRaftMember(changeType raftProto.ConfChangeType, peer raftProto.Peer, context []byte) (resp interface{}, err error)
ChangeRaftMember is a wrapper function of changing the raft member.
func (*DataPartition) CheckLeader ¶
CheckLeader checks if itself is the leader during read
func (*DataPartition) CheckWriteVer ¶ added in v1.34.0
func (dp *DataPartition) CheckWriteVer(p *repl.Packet) (err error)
func (*DataPartition) Del ¶
func (dp *DataPartition) Del(key interface{}) (interface{}, error)
Del deletes the raft log based on the given key. It is not needed for replicating data partition.
func (*DataPartition) DoExtentStoreRepair ¶
func (dp *DataPartition) DoExtentStoreRepair(repairTask *DataPartitionRepairTask)
DoExtentStoreRepair performs the repairs of the extent store. 1. when the extent size is smaller than the max size on the record, start to repair the missing part. 2. if the extent does not even exist, create the extent first, and then repair.
func (*DataPartition) DoRepair ¶
func (dp *DataPartition) DoRepair(repairTasks []*DataPartitionRepairTask)
DoRepair asks the leader to perform the repair tasks.
func (*DataPartition) ExtentStore ¶
func (dp *DataPartition) ExtentStore() *storage.ExtentStore
func (*DataPartition) ExtentWithHoleRepairRead ¶ added in v1.34.0
func (dp *DataPartition) ExtentWithHoleRepairRead(request repl.PacketInterface, connect net.Conn, getReplyPacket func() repl.PacketInterface)
func (*DataPartition) ForceLoadHeader ¶
func (dp *DataPartition) ForceLoadHeader()
func (*DataPartition) ForceSetDataPartitionToFininshLoad ¶ added in v1.34.0
func (dp *DataPartition) ForceSetDataPartitionToFininshLoad()
func (*DataPartition) ForceSetDataPartitionToLoadding ¶ added in v1.34.0
func (dp *DataPartition) ForceSetDataPartitionToLoadding()
func (*DataPartition) ForceSetRaftRunning ¶ added in v1.34.0
func (dp *DataPartition) ForceSetRaftRunning()
func (*DataPartition) Get ¶
func (dp *DataPartition) Get(key interface{}) (interface{}, error)
Get returns the raft log based on the given key. It is not needed for replicating data partition.
func (*DataPartition) GetAppliedID ¶
func (dp *DataPartition) GetAppliedID() (id uint64)
func (*DataPartition) GetExtentCount ¶
func (dp *DataPartition) GetExtentCount() int
func (*DataPartition) GetExtentCountWithoutLock ¶ added in v1.34.0
func (dp *DataPartition) GetExtentCountWithoutLock() int
func (*DataPartition) GetRepairBlockSize ¶ added in v1.34.0
func (dp *DataPartition) GetRepairBlockSize() (size uint64)
func (*DataPartition) HandleFatalEvent ¶
func (dp *DataPartition) HandleFatalEvent(err *raft.FatalError)
HandleFatalEvent notifies the application when panic happens.
func (*DataPartition) HandleLeaderChange ¶
func (dp *DataPartition) HandleLeaderChange(leader uint64)
HandleLeaderChange notifies the application when the raft leader has changed.
func (*DataPartition) HandleVersionOp ¶ added in v1.34.0
func (partition *DataPartition) HandleVersionOp(req *proto.MultiVersionOpRequest) (err error)
func (*DataPartition) IsDataPartitionLoadFin ¶ added in v1.34.0
func (dp *DataPartition) IsDataPartitionLoadFin() bool
func (*DataPartition) IsDataPartitionLoading ¶ added in v1.34.0
func (dp *DataPartition) IsDataPartitionLoading() bool
func (*DataPartition) IsEquareCreateDataPartitionRequst ¶ added in v1.34.0
func (dp *DataPartition) IsEquareCreateDataPartitionRequst(request *proto.CreateDataPartitionRequest) (err error)
func (*DataPartition) IsExistPeer ¶ added in v1.34.0
func (dp *DataPartition) IsExistPeer(peer proto.Peer) bool
func (*DataPartition) IsExistReplica ¶ added in v1.34.0
func (dp *DataPartition) IsExistReplica(addr string) bool
func (*DataPartition) IsExistReplicaWithNodeId ¶ added in v1.34.0
func (dp *DataPartition) IsExistReplicaWithNodeId(addr string, nodeID uint64) bool
func (*DataPartition) IsForbidden ¶ added in v1.34.0
func (dp *DataPartition) IsForbidden() bool
func (*DataPartition) IsRaftLeader ¶
func (dp *DataPartition) IsRaftLeader() (addr string, ok bool)
IsRaftLeader tells if the given address belongs to the raft leader.
func (*DataPartition) LaunchRepair ¶
func (dp *DataPartition) LaunchRepair(extentType uint8)
LaunchRepair launches the repair of extents.
func (*DataPartition) Load ¶
func (dp *DataPartition) Load() (response *proto.LoadDataPartitionResponse)
func (*DataPartition) LoadAppliedID ¶
func (dp *DataPartition) LoadAppliedID() (err error)
LoadAppliedID loads the applied IDs to the memory.
func (*DataPartition) NormalExtentRepairRead ¶ added in v1.34.0
func (dp *DataPartition) NormalExtentRepairRead(p repl.PacketInterface, connect net.Conn, isRepairRead bool, metrics *DataNodeMetrics, makeRspPacket repl.MakeStreamReadResponsePacket, ) (err error)
func (*DataPartition) NotifyExtentRepair ¶
func (dp *DataPartition) NotifyExtentRepair(members []*DataPartitionRepairTask) (err error)
NotifyExtentRepair notifies the followers to repair.
func (*DataPartition) Path ¶
func (dp *DataPartition) Path() string
func (*DataPartition) PersistMetadata ¶
func (dp *DataPartition) PersistMetadata() (err error)
PersistMetadata persists the file metadata on the disk.
func (*DataPartition) Put ¶
func (dp *DataPartition) Put(key interface{}, val interface{}) (resp interface{}, err error)
Put submits the raft log to the raft store.
func (*DataPartition) RandomWriteSubmit ¶
func (dp *DataPartition) RandomWriteSubmit(pkg *repl.Packet) (err error)
RandomWriteSubmit submits the proposal to raft.
func (*DataPartition) ReloadSnapshot ¶
func (dp *DataPartition) ReloadSnapshot()
func (*DataPartition) RemoveAll ¶ added in v1.34.0
func (dp *DataPartition) RemoveAll(force bool) (err error)
func (*DataPartition) Replicas ¶
func (dp *DataPartition) Replicas() []string
func (*DataPartition) SetForbidden ¶ added in v1.34.0
func (dp *DataPartition) SetForbidden(status bool)
func (*DataPartition) SetMinAppliedID ¶
func (dp *DataPartition) SetMinAppliedID(id uint64)
func (*DataPartition) SetRepairBlockSize ¶ added in v1.34.0
func (dp *DataPartition) SetRepairBlockSize(size uint64)
func (*DataPartition) SnapShot ¶
func (dp *DataPartition) SnapShot() (files []*proto.File)
Snapshot returns the snapshot of the data partition.
func (*DataPartition) Snapshot ¶
func (dp *DataPartition) Snapshot() (raftproto.Snapshot, error)
Snapshot persists the in-memory data (as a snapshot) to the disk. Note that the data in each data partition has already been saved on the disk. Therefore there is no need to take the snapshot in this case.
func (*DataPartition) StartRaft ¶
func (dp *DataPartition) StartRaft(isLoad bool) (err error)
StartRaft start raft instance when data partition start or restore.
func (*DataPartition) StartRaftAfterRepair ¶
func (dp *DataPartition) StartRaftAfterRepair(isLoad bool)
StartRaftAfterRepair starts the raft after repairing a partition. It can only happens after all the extent files are repaired by the leader. When the repair is finished, the local dp.partitionSize is same as the leader's dp.partitionSize. The repair task can be done in statusUpdateScheduler->LaunchRepair.
func (*DataPartition) StartRaftLoggingSchedule ¶
func (dp *DataPartition) StartRaftLoggingSchedule()
StartRaftLoggingSchedule starts the task schedule as follows: 1. write the raft applied id into disk. 2. collect the applied ids from raft members. 3. based on the minimum applied id to cutoff and delete the saved raft log in order to free the disk space.
func (*DataPartition) Status ¶
func (dp *DataPartition) Status() int
Status returns the partition status.
func (*DataPartition) Stop ¶
func (dp *DataPartition) Stop()
Stop close the store and the raft store.
func (*DataPartition) StopDecommissionRecover ¶ added in v1.34.0
func (dp *DataPartition) StopDecommissionRecover(stop bool)
func (*DataPartition) String ¶
func (dp *DataPartition) String() (m string)
String returns the string format of the data partition information.
type DataPartitionMetadata ¶
type DataPartitionMetadata struct { VolumeID string PartitionID uint64 PartitionSize int PartitionType int CreateTime string Peers []proto.Peer Hosts []string DataPartitionCreateType int LastTruncateID uint64 ReplicaNum int StopRecover bool VerList []*proto.VolVersionInfo ApplyID uint64 DiskErrCnt uint64 }
func (*DataPartitionMetadata) Validate ¶
func (md *DataPartitionMetadata) Validate() (err error)
type DataPartitionRepairTask ¶
type DataPartitionRepairTask struct { TaskType uint8 ExtentsToBeCreated []*storage.ExtentInfo ExtentsToBeRepaired []*storage.ExtentInfo LeaderTinyDeleteRecordFileSize int64 LeaderAddr string // contains filtered or unexported fields }
DataPartitionRepairTask defines the repair task for the data partition.
func NewDataPartitionRepairTask ¶
func NewDataPartitionRepairTask(extentFiles []*storage.ExtentInfo, tinyDeleteRecordFileSize int64, source, leaderAddr string, extentType uint8) (task *DataPartitionRepairTask)
type Disk ¶
type Disk struct { sync.RWMutex Path string ReadErrCnt uint64 // number of read errors WriteErrCnt uint64 // number of write errors Total uint64 Used uint64 Available uint64 Unallocated uint64 Allocated uint64 MaxErrCnt int // maximum number of errors Status int // disk status such as READONLY ReservedSpace uint64 DiskRdonlySpace uint64 RejectWrite bool DiskErrPartitionSet sync.Map BackupDataPartitions sync.Map BackupReplicaLk sync.RWMutex // contains filtered or unexported fields }
Disk represents the structure of the disk
func NewBrokenDisk ¶ added in v1.34.0
func (*Disk) AddBackupPartitionDir ¶ added in v1.34.0
func (*Disk) AddDiskErrPartition ¶ added in v1.34.0
func (*Disk) AttachDataPartition ¶
func (d *Disk) AttachDataPartition(dp *DataPartition)
AttachDataPartition adds a data partition to the partition map.
func (*Disk) CheckDiskError ¶ added in v1.34.0
func (*Disk) DataPartitionList ¶
DataPartitionList returns a list of the data partitions
func (*Disk) DetachDataPartition ¶
func (d *Disk) DetachDataPartition(dp *DataPartition)
DetachDataPartition removes a data partition from the partition map.
func (*Disk) ForceExitRaftStore ¶ added in v1.1.1
func (d *Disk) ForceExitRaftStore()
func (*Disk) GetBackupPartitionDirList ¶ added in v1.34.0
func (d *Disk) GetBackupPartitionDirList() (backupInfos []proto.BackupDataPartitionInfo)
func (*Disk) GetDataPartition ¶
func (d *Disk) GetDataPartition(partitionID uint64) (partition *DataPartition)
GetDataPartition returns the data partition based on the given partition ID.
func (*Disk) GetDataPartitionCount ¶ added in v1.34.0
func (*Disk) GetDecommissionStatus ¶ added in v1.34.0
func (*Disk) GetDiskErrPartitionCount ¶ added in v1.34.0
func (*Disk) GetDiskErrPartitionList ¶ added in v1.34.0
func (*Disk) GetDiskPartition ¶ added in v1.34.0
func (d *Disk) GetDiskPartition() *disk.PartitionStat
func (*Disk) HasDiskErrPartition ¶ added in v1.34.0
func (*Disk) MarkDecommissionStatus ¶ added in v1.34.0
func (*Disk) PartitionCount ¶
PartitionCount returns the number of partitions in the partition map.
func (*Disk) QueryExtentRepairReadLimitStatus ¶ added in v1.34.0
func (*Disk) ReleaseReadExtentToken ¶ added in v1.34.0
func (d *Disk) ReleaseReadExtentToken()
func (*Disk) RequireReadExtentToken ¶ added in v1.34.0
func (*Disk) RestorePartition ¶
func (d *Disk) RestorePartition(visitor PartitionVisitor) (err error)
RestorePartition reads the files stored on the local disk and restores the data partitions.
func (*Disk) SetExtentRepairReadLimitStatus ¶ added in v1.34.0
type DiskExtentReadLimitInfo ¶ added in v1.34.0
type DiskExtentReadLimitStatusResponse ¶ added in v1.34.0
type DiskExtentReadLimitStatusResponse struct {
Infos []DiskExtentReadLimitInfo `json:"infos"`
}
type GcExtent ¶ added in v1.34.0
type GcExtent struct { *storage.ExtentInfo GcStatus string `json:"gc_status"` }
type ItemIterator ¶
type ItemIterator struct {
// contains filtered or unexported fields
}
func NewItemIterator ¶
func NewItemIterator(applyID uint64) *ItemIterator
NewItemIterator creates a new item iterator.
func (*ItemIterator) ApplyIndex ¶
func (si *ItemIterator) ApplyIndex() uint64
ApplyIndex returns the appliedID
func (*ItemIterator) Next ¶
func (si *ItemIterator) Next() (data []byte, err error)
Next returns the next item in the iterator.
type LimiterStatus ¶ added in v1.34.0
type MetaMultiSnapshotInfo ¶ added in v1.34.0
MetaMultiSnapshotInfo
type PartitionVisitor ¶
type PartitionVisitor func(dp *DataPartition)
type PersistApplyIdRequest ¶ added in v1.34.0
type PersistApplyIdRequest struct {
// contains filtered or unexported fields
}
type RaftCmdItem ¶
func UnmarshalRaftCmd ¶ added in v1.34.0
func UnmarshalRaftCmd(raw []byte) (raftOpItem *RaftCmdItem, err error)
type ReplicaInfo ¶ added in v1.34.0
type SimpleVolView ¶ added in v1.34.0
type SimpleVolView struct {
// contains filtered or unexported fields
}
type SpaceManager ¶
type SpaceManager struct {
// contains filtered or unexported fields
}
SpaceManager manages the disk space.
func NewSpaceManager ¶
func NewSpaceManager(dataNode *DataNode) *SpaceManager
NewSpaceManager creates a new space manager.
func (*SpaceManager) AttachPartition ¶ added in v1.4.0
func (manager *SpaceManager) AttachPartition(dp *DataPartition)
func (*SpaceManager) CreatePartition ¶
func (manager *SpaceManager) CreatePartition(request *proto.CreateDataPartitionRequest) (dp *DataPartition, err error)
func (*SpaceManager) DeletePartition ¶
func (manager *SpaceManager) DeletePartition(dpID uint64, force bool) (err error)
DeletePartition deletes a partition based on the partition id.
func (*SpaceManager) DetachDataPartition ¶ added in v1.4.0
func (manager *SpaceManager) DetachDataPartition(partitionID uint64)
DetachDataPartition removes a data partition from the partition map.
func (*SpaceManager) FillIoUtils ¶ added in v1.34.0
func (manager *SpaceManager) FillIoUtils(samples map[string]loadutil.DiskIoSample)
func (*SpaceManager) GetAllDiskPartitions ¶ added in v1.34.0
func (manager *SpaceManager) GetAllDiskPartitions() []*disk.PartitionStat
func (*SpaceManager) GetClusterID ¶
func (manager *SpaceManager) GetClusterID() (clusterID string)
func (*SpaceManager) GetDisk ¶
func (manager *SpaceManager) GetDisk(path string) (d *Disk, err error)
func (*SpaceManager) GetDiskUtils ¶ added in v1.34.0
func (manager *SpaceManager) GetDiskUtils() map[string]float64
func (*SpaceManager) GetDisks ¶
func (manager *SpaceManager) GetDisks() (disks []*Disk)
func (*SpaceManager) GetNodeID ¶
func (manager *SpaceManager) GetNodeID() (nodeID uint64)
func (*SpaceManager) GetRaftStore ¶
func (manager *SpaceManager) GetRaftStore() (raftStore raftstore.RaftStore)
func (*SpaceManager) LoadBrokenDisk ¶ added in v1.34.0
func (*SpaceManager) Partition ¶
func (manager *SpaceManager) Partition(partitionID uint64) (dp *DataPartition)
func (*SpaceManager) RangePartitions ¶
func (manager *SpaceManager) RangePartitions(f func(partition *DataPartition, testID string) bool, reqID string)
func (*SpaceManager) SetClusterID ¶
func (manager *SpaceManager) SetClusterID(clusterID string)
func (*SpaceManager) SetCurrentLoadDpLimit ¶ added in v1.34.0
func (manager *SpaceManager) SetCurrentLoadDpLimit(limit int)
func (*SpaceManager) SetCurrentStopDpLimit ¶ added in v1.34.0
func (manager *SpaceManager) SetCurrentStopDpLimit(limit int)
func (*SpaceManager) SetNodeID ¶
func (manager *SpaceManager) SetNodeID(nodeID uint64)
func (*SpaceManager) SetRaftStore ¶
func (manager *SpaceManager) SetRaftStore(raftStore raftstore.RaftStore)
func (*SpaceManager) StartDiskSample ¶ added in v1.34.0
func (manager *SpaceManager) StartDiskSample()
func (*SpaceManager) Stats ¶
func (manager *SpaceManager) Stats() *Stats
func (*SpaceManager) Stop ¶
func (manager *SpaceManager) Stop()
type Stats ¶
type Stats struct { Zone string ConnectionCnt int64 ClusterID string TCPAddr string Start time.Time Total uint64 Used uint64 Available uint64 // available space TotalPartitionSize uint64 // dataPartitionCnt * dataPartitionSize RemainingCapacityToCreatePartition uint64 CreatedPartitionCnt uint64 LackPartitionsInMem uint64 LackPartitionsInDisk uint64 // the maximum capacity among all the disks that can be used to create partition MaxCapacityToCreatePartition uint64 sync.Mutex // contains filtered or unexported fields }
Stats defines various metrics that will be collected during the execution.
func (*Stats) GetConnectionCount ¶
GetConnectionCount gets the connection count.
func (*Stats) RemoveConnection ¶
func (s *Stats) RemoveConnection()
RemoveConnection removes a connection.