datanode

package
v1.34.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 9, 2024 License: Apache-2.0 Imports: 49 Imported by: 1

Documentation

Index

Constants

View Source
const (
	IntervalToUpdateReplica       = 600              // interval to update the replica
	IntervalToUpdatePartitionSize = 60 * time.Second // interval to update the partition size
	NumOfFilesToRecoverInParallel = 10               // number of files to be recovered simultaneously
)
View Source
const (
	ReadFlag  = 1
	WriteFlag = 2
)

cmd response

View Source
const (
	ActionNotifyFollowerToRepair        = "ActionNotifyFollowerRepair"
	ActionStreamRead                    = "ActionStreamRead"
	ActionStreamFollowerRead            = "ActionStreamFollowerRead"
	ActionCreateExtent                  = "ActionCreateExtent:"
	ActionMarkDelete                    = "ActionMarkDelete:"
	ActionGetAllExtentWatermarks        = "ActionGetAllExtentWatermarks:"
	ActionWrite                         = "ActionWrite:"
	ActionRepair                        = "ActionRepair:"
	ActionDecommissionPartition         = "ActionDecommissionPartition"
	ActionAddDataPartitionRaftMember    = "ActionAddDataPartitionRaftMember"
	ActionRemoveDataPartitionRaftMember = "ActionRemoveDataPartitionRaftMember"
	ActionDataPartitionTryToLeader      = "ActionDataPartitionTryToLeader"

	ActionCreateDataPartition         = "ActionCreateDataPartition"
	ActionLoadDataPartition           = "ActionLoadDataPartition"
	ActionDeleteDataPartition         = "ActionDeleteDataPartition"
	ActionStreamReadTinyDeleteRecord  = "ActionStreamReadTinyDeleteRecord"
	ActionSyncTinyDeleteRecord        = "ActionSyncTinyDeleteRecord"
	ActionStreamReadTinyExtentRepair  = "ActionStreamReadTinyExtentRepair"
	ActionBatchMarkDelete             = "ActionBatchMarkDelete"
	ActionBatchLockNormalExtent       = "ActionBatchLockNormalExtent"
	ActionUpdateVersion               = "ActionUpdateVersion"
	ActionStopDataPartitionRepair     = "ActionStopDataPartitionRepair"
	ActionRecoverDataReplicaMeta      = "ActionRecoverDataReplicaMeta"
	ActionRecoverBackupDataReplica    = "ActionRecoverBackupDataReplica"
	ActionRecoverBadDisk              = "ActionRecoverBadDisk"
	ActionQueryBadDiskRecoverProgress = "ActionQueryBadDiskRecoverProgress"
	ActionDeleteBackupDirectories     = "ActionDeleteBackupDirectories"
)

Action description

View Source
const (
	RepairRead = true
	StreamRead = false
)
View Source
const (
	EmptyResponse                                = 'E'
	TinyExtentRepairReadResponseArgLen           = 17
	NormalExtentWithHoleRepairReadResponseArgLen = 17
	MaxSyncTinyDeleteBufferSize                  = 2400000
	MaxFullSyncTinyDeleteTime                    = 3600 * 24
	MinTinyExtentDeleteRecordSyncSize            = 4 * 1024 * 1024
)
View Source
const (
	ExpiredPartitionPrefix    = "expired_"
	ExpiredPartitionExistTime = time.Hour * time.Duration(24*7)
	BackupPartitionPrefix     = "backup_"
)
View Source
const (
	SyncTinyDeleteRecordFromLeaderOnEveryDisk = 5
	MaxExtentRepairReadLimit                  = 1 //
)
View Source
const (
	DiskRecoverStop uint32 = iota
	DiskRecoverStart
)
View Source
const (
	StatPeriod                 = time.Minute * time.Duration(1)
	MetricPartitionIOName      = "dataPartitionIO"
	MetricPartitionIOBytesName = "dataPartitionIOBytes"
	MetricLackDpCount          = "lackDataPartitionCount"
	MetricCapacityToCreateDp   = "capacityToCreateDp"
	MetricConnectionCnt        = "connectionCnt"
	MetricDpCount              = "dataPartitionCount"
	MetricTotalDpSize          = "totalDpSize"
	MetricCapacity             = "capacity"
)
View Source
const (
	UpdateNodeInfoTicket = 1 * time.Minute

	RepairTimeOut   = time.Hour * 24
	MaxRepairErrCnt = 1000
)
View Source
const (
	DataPartitionPrefix           = "datapartition"
	CachePartitionPrefix          = "cachepartition"
	PreLoadPartitionPrefix        = "preloadpartition"
	DataPartitionMetadataFileName = "META"
	TempMetadataFileName          = ".meta"
	ApplyIndexFile                = "APPLY"
	TempApplyIndexFile            = ".apply"
	TimeLayout                    = "2006-01-02 15:04:05"
)
View Source
const (
	RaftStatusStopped = 0
	RaftStatusRunning = 1
)
View Source
const (
	DefaultZoneName            = proto.DefaultZoneName
	DefaultRaftDir             = "raft"
	DefaultRaftLogsToRetain    = 10 // Count of raft logs per data partition
	DefaultDiskMaxErr          = 1
	DefaultDiskRetainMin       = 5 * util.GB // GB
	DefaultNameResolveInterval = 1           // minutes

	DefaultDiskUnavailableErrorCount          = 5
	DefaultDiskUnavailablePartitionErrorCount = 3
)
View Source
const (
	ConfigKeyLocalIP       = "localIP"         // string
	ConfigKeyPort          = "port"            // int
	ConfigKeyMasterAddr    = "masterAddr"      // array
	ConfigKeyZone          = "zoneName"        // string
	ConfigKeyDisks         = "disks"           // array
	ConfigKeyRaftDir       = "raftDir"         // string
	ConfigKeyRaftHeartbeat = "raftHeartbeat"   // string
	ConfigKeyRaftReplica   = "raftReplica"     // string
	CfgTickInterval        = "tickInterval"    // int
	CfgRaftRecvBufSize     = "raftRecvBufSize" // int
	ConfigKeyLogDir        = "logDir"          // string

	ConfigKeyDiskPath = "diskPath" // string

	/*
	 * Metrics Degrade Level
	 * minus value: turn off metrics collection.
	 * 0 or 1: full metrics.
	 * 2: 1/2 of the metrics will be collected.
	 * 3: 1/3 of the metrics will be collected.
	 * ...
	 */
	CfgMetricsDegrade = "metricsDegrade" // int

	CfgDiskRdonlySpace = "diskRdonlySpace" // int
	// smux Config
	ConfigKeyEnableSmuxClient  = "enableSmuxConnPool" // bool
	ConfigKeySmuxPortShift     = "smuxPortShift"      // int
	ConfigKeySmuxMaxConn       = "smuxMaxConn"        // int
	ConfigKeySmuxStreamPerConn = "smuxStreamPerConn"  // int
	ConfigKeySmuxMaxBuffer     = "smuxMaxBuffer"      // int
	ConfigKeySmuxTotalStream   = "sumxTotalStream"    // int

	// rate limit control enable
	ConfigDiskQosEnable = "diskQosEnable" // bool
	ConfigDiskReadIocc  = "diskReadIocc"  // int
	ConfigDiskReadIops  = "diskReadIops"  // int
	ConfigDiskReadFlow  = "diskReadFlow"  // int
	ConfigDiskWriteIocc = "diskWriteIocc" // int
	ConfigDiskWriteIops = "diskWriteIops" // int
	ConfigDiskWriteFlow = "diskWriteFlow" // int

	// load/stop dp limit
	ConfigDiskCurrentLoadDpLimit = "diskCurrentLoadDpLimit"
	ConfigDiskCurrentStopDpLimit = "diskCurrentStopDpLimit"
	// disk read extent limit
	ConfigEnableDiskReadExtentLimit = "enableDiskReadRepairExtentLimit" // bool

	ConfigServiceIDKey = "serviceIDKey"

	ConfigEnableGcTimer = "enableGcTimer"

	// disk status becomes unavailable if disk error partition count reaches this value
	ConfigKeyDiskUnavailablePartitionErrorCount = "diskUnavailablePartitionErrorCount"
)
View Source
const (
	BinaryMarshalMagicVersion = 0xFF
)
View Source
const (
	BufferWrite = false
)
View Source
const (
	DecommissionDiskMark = "decommissionDiskMark"
)
View Source
const DefaultCurrentLoadDpLimit = 4
View Source
const DefaultStopDpLimit = 4
View Source
const DiskErrNotAssociatedWithPartition uint64 = 0 // use 0 for disk error without any data partition
View Source
const (
	DiskSectorSize = 512
)

Sector size

View Source
const DiskSelectMaxStraw = 65536
View Source
const (
	DiskStatusFile = ".diskStatus"
)
View Source
const (
	FinishLoadDataPartitionExtentHeader = 1
)

Status of load data partition extent header

View Source
const (
	IsReleased = 1
)

Tiny extent has been put back to store

View Source
const (
	MinAvaliTinyExtentCnt = 5
)
View Source
const (
	MinTinyExtentsToRepair = 10 // minimum number of tiny extents to repair
)

Apply the raft log operation. Currently we only have the random write operation.

View Source
const (
	ModuleName = "dataNode"
)
View Source
const (
	NetworkProtocol = "tcp"
)

Network protocol

View Source
const (
	RaftNotStarted = "RaftNotStarted"
)

Error code

Variables

View Source
var (
	// RegexpDataPartitionDir validates the directory name of a data partition.
	RegexpDataPartitionDir, _               = regexp.Compile(`^datapartition_(\d)+_(\d)+$`)
	RegexpCachePartitionDir, _              = regexp.Compile(`^cachepartition_(\d)+_(\d)+$`)
	RegexpPreLoadPartitionDir, _            = regexp.Compile(`^preloadpartition_(\d)+_(\d)+$`)
	RegexpExpiredDataPartitionDir, _        = regexp.Compile(`^expired_datapartition_(\d)+_(\d)+$`)
	RegexpBackupDataPartitionDirToDelete, _ = regexp.Compile(`backup_datapartition_(\d+)_(\d+)-(\d+)`)
)
View Source
var (
	MaxExtentRepairLimit = 20000
	MinExtentRepairLimit = 5
	CurExtentRepairLimit = MaxExtentRepairLimit
)
View Source
var (
	ErrIncorrectStoreType          = errors.New("Incorrect store type")
	ErrNoSpaceToCreatePartition    = errors.New("No disk space to create a data partition")
	ErrNewSpaceManagerFailed       = errors.New("Creater new space manager failed")
	ErrGetMasterDatanodeInfoFailed = errors.New("Failed to get datanode info from master")

	LocalIP string

	// MasterClient        = masterSDK.NewMasterClient(nil, false)
	MasterClient *masterSDK.MasterCLientWithResolver
)
View Source
var AutoRepairStatus = true
View Source
var ErrForbiddenDataPartition = errors.New("the data partition is forbidden")

Functions

func DeleteLimiterWait added in v1.34.0

func DeleteLimiterWait()

func GetIoMetricLabels added in v1.34.0

func GetIoMetricLabels(partition *DataPartition, tp string) map[string]string

func GetStatusMessage added in v1.34.0

func GetStatusMessage(status int) string

func IsDiskErr

func IsDiskErr(errMsg string) bool

func MarshalRaftCmd added in v1.34.0

func MarshalRaftCmd(raftOpItem *RaftCmdItem) (raw []byte, err error)

func MarshalRandWriteRaftLog added in v1.4.0

func MarshalRandWriteRaftLog(opcode uint8, extentID uint64, offset, size int64, data []byte, crc uint32) (result []byte, err error)

func NewPacketToBroadcastMinAppliedID

func NewPacketToBroadcastMinAppliedID(partitionID uint64, minAppliedID uint64) (p *repl.Packet)

NewPacketToBroadcastMinAppliedID returns a new packet to broadcast the min applied ID.

func NewPacketToGetAppliedID

func NewPacketToGetAppliedID(partitionID uint64) (p *repl.Packet)

NewPacketToGetAppliedID returns a new packet to get the applied ID.

func NewPacketToGetMaxExtentIDAndPartitionSIze added in v1.4.0

func NewPacketToGetMaxExtentIDAndPartitionSIze(partitionID uint64) (p *repl.Packet)

NewPacketToGetPartitionSize returns a new packet to get the partition size.

func NewPacketToGetPartitionSize

func NewPacketToGetPartitionSize(partitionID uint64) (p *repl.Packet)

NewPacketToGetPartitionSize returns a new packet to get the partition size.

func UnmarshalOldVersionRaftLog added in v1.4.0

func UnmarshalOldVersionRaftLog(raw []byte) (opItem *rndWrtOpItem, err error)

func UnmarshalOldVersionRandWriteOpItem added in v1.4.0

func UnmarshalOldVersionRandWriteOpItem(raw []byte) (result *rndWrtOpItem, err error)

func UnmarshalRandWriteRaftLog added in v1.4.0

func UnmarshalRandWriteRaftLog(raw []byte) (opItem *rndWrtOpItem, err error)

RandomWriteSubmit submits the proposal to raft.

Types

type DataNode

type DataNode struct {
	// contains filtered or unexported fields
}

DataNode defines the structure of a data node.

func NewServer

func NewServer() *DataNode

func (*DataNode) ExtentWithHoleRepairRead added in v1.34.0

func (s *DataNode) ExtentWithHoleRepairRead(request *repl.Packet, connect net.Conn, getReplyPacket func() repl.PacketInterface)

Handle tinyExtentRepairRead packet.

func (*DataNode) GetDpMaxRepairErrCnt added in v1.34.0

func (m *DataNode) GetDpMaxRepairErrCnt() uint64

func (*DataNode) HasStarted added in v1.34.0

func (s *DataNode) HasStarted() bool

func (*DataNode) NormalSnapshotExtentRepairRead added in v1.34.0

func (s *DataNode) NormalSnapshotExtentRepairRead(request *repl.Packet, connect net.Conn)

func (*DataNode) OperatePacket

func (s *DataNode) OperatePacket(p *repl.Packet, c net.Conn) (err error)

func (*DataNode) Post

func (s *DataNode) Post(p *repl.Packet) error

func (*DataNode) Prepare

func (s *DataNode) Prepare(p *repl.Packet) (err error)

func (*DataNode) Shutdown

func (s *DataNode) Shutdown()

Shutdown shuts down the current data node.

func (*DataNode) Start

func (s *DataNode) Start(cfg *config.Config) (err error)

func (*DataNode) Sync

func (s *DataNode) Sync()

Sync keeps data node in sync.

type DataNodeID added in v1.34.0

type DataNodeID struct {
	Addr string
	ID   uint64
}

type DataNodeInfo added in v1.4.0

type DataNodeInfo struct {
	Addr                                  string
	PersistenceDataPartitions             []uint64
	PersistenceDataPartitionsWithDiskPath []proto.DataPartitionDiskInfo
}

type DataNodeMetrics added in v1.34.0

type DataNodeMetrics struct {
	MetricIOBytes            *exporter.Counter
	MetricLackDpCount        *exporter.GaugeVec
	MetricCapacityToCreateDp *exporter.GaugeVec
	MetricConnectionCnt      *exporter.Gauge
	MetricDpCount            *exporter.Gauge
	MetricTotalDpSize        *exporter.Gauge
	MetricCapacity           *exporter.GaugeVec
	// contains filtered or unexported fields
}

type DataPartition

type DataPartition struct {
	DataPartitionCreateType int

	PersistApplyIdChan chan PersistApplyIdRequest
	// contains filtered or unexported fields
}

func CreateDataPartition

func CreateDataPartition(dpCfg *dataPartitionCfg, disk *Disk, request *proto.CreateDataPartitionRequest) (dp *DataPartition, err error)

func LoadDataPartition

func LoadDataPartition(partitionDir string, disk *Disk) (dp *DataPartition, err error)

LoadDataPartition loads and returns a partition instance based on the specified directory. It reads the partition metadata file stored under the specified directory and creates the partition instance.

func (*DataPartition) Apply

func (dp *DataPartition) Apply(command []byte, index uint64) (resp interface{}, err error)

Apply puts the data onto the disk.

func (*DataPartition) ApplyMemberChange

func (dp *DataPartition) ApplyMemberChange(confChange *raftproto.ConfChange, index uint64) (resp interface{}, err error)

ApplyMemberChange supports adding new raft member or deleting an existing raft member. It does not support updating an existing member at this point.

func (*DataPartition) ApplyRandomWrite

func (dp *DataPartition) ApplyRandomWrite(command []byte, raftApplyID uint64) (respStatus interface{}, err error)

ApplyRandomWrite random write apply

func (*DataPartition) ApplySnapshot

func (dp *DataPartition) ApplySnapshot(peers []raftproto.Peer, iterator raftproto.SnapIterator) (err error)

ApplySnapshot asks the raft leader for the snapshot data to recover the contents on the local disk.

func (*DataPartition) Available

func (dp *DataPartition) Available() int

Available returns the available space.

func (*DataPartition) CanRemoveRaftMember added in v1.4.0

func (dp *DataPartition) CanRemoveRaftMember(peer proto.Peer, force bool) error

func (*DataPartition) ChangeRaftMember

func (dp *DataPartition) ChangeRaftMember(changeType raftProto.ConfChangeType, peer raftProto.Peer, context []byte) (resp interface{}, err error)

ChangeRaftMember is a wrapper function of changing the raft member.

func (*DataPartition) CheckLeader

func (dp *DataPartition) CheckLeader(request *repl.Packet, connect net.Conn) (err error)

CheckLeader checks if itself is the leader during read

func (*DataPartition) CheckWriteVer added in v1.34.0

func (dp *DataPartition) CheckWriteVer(p *repl.Packet) (err error)

func (*DataPartition) Del

func (dp *DataPartition) Del(key interface{}) (interface{}, error)

Del deletes the raft log based on the given key. It is not needed for replicating data partition.

func (*DataPartition) Disk

func (dp *DataPartition) Disk() *Disk

Disk returns the disk instance.

func (*DataPartition) DoExtentStoreRepair

func (dp *DataPartition) DoExtentStoreRepair(repairTask *DataPartitionRepairTask)

DoExtentStoreRepair performs the repairs of the extent store. 1. when the extent size is smaller than the max size on the record, start to repair the missing part. 2. if the extent does not even exist, create the extent first, and then repair.

func (*DataPartition) DoRepair

func (dp *DataPartition) DoRepair(repairTasks []*DataPartitionRepairTask)

DoRepair asks the leader to perform the repair tasks.

func (*DataPartition) ExtentStore

func (dp *DataPartition) ExtentStore() *storage.ExtentStore

func (*DataPartition) ExtentWithHoleRepairRead added in v1.34.0

func (dp *DataPartition) ExtentWithHoleRepairRead(request repl.PacketInterface, connect net.Conn, getReplyPacket func() repl.PacketInterface)

func (*DataPartition) ForceLoadHeader

func (dp *DataPartition) ForceLoadHeader()

func (*DataPartition) ForceSetDataPartitionToFininshLoad added in v1.34.0

func (dp *DataPartition) ForceSetDataPartitionToFininshLoad()

func (*DataPartition) ForceSetDataPartitionToLoadding added in v1.34.0

func (dp *DataPartition) ForceSetDataPartitionToLoadding()

func (*DataPartition) ForceSetRaftRunning added in v1.34.0

func (dp *DataPartition) ForceSetRaftRunning()

func (*DataPartition) Get

func (dp *DataPartition) Get(key interface{}) (interface{}, error)

Get returns the raft log based on the given key. It is not needed for replicating data partition.

func (*DataPartition) GetAppliedID

func (dp *DataPartition) GetAppliedID() (id uint64)

func (*DataPartition) GetExtentCount

func (dp *DataPartition) GetExtentCount() int

func (*DataPartition) GetExtentCountWithoutLock added in v1.34.0

func (dp *DataPartition) GetExtentCountWithoutLock() int

func (*DataPartition) GetRepairBlockSize added in v1.34.0

func (dp *DataPartition) GetRepairBlockSize() (size uint64)

func (*DataPartition) HandleFatalEvent

func (dp *DataPartition) HandleFatalEvent(err *raft.FatalError)

HandleFatalEvent notifies the application when panic happens.

func (*DataPartition) HandleLeaderChange

func (dp *DataPartition) HandleLeaderChange(leader uint64)

HandleLeaderChange notifies the application when the raft leader has changed.

func (*DataPartition) HandleVersionOp added in v1.34.0

func (partition *DataPartition) HandleVersionOp(req *proto.MultiVersionOpRequest) (err error)

func (*DataPartition) IsDataPartitionLoadFin added in v1.34.0

func (dp *DataPartition) IsDataPartitionLoadFin() bool

func (*DataPartition) IsDataPartitionLoading added in v1.34.0

func (dp *DataPartition) IsDataPartitionLoading() bool

func (*DataPartition) IsEquareCreateDataPartitionRequst added in v1.34.0

func (dp *DataPartition) IsEquareCreateDataPartitionRequst(request *proto.CreateDataPartitionRequest) (err error)

func (*DataPartition) IsExistPeer added in v1.34.0

func (dp *DataPartition) IsExistPeer(peer proto.Peer) bool

func (*DataPartition) IsExistReplica added in v1.34.0

func (dp *DataPartition) IsExistReplica(addr string) bool

func (*DataPartition) IsExistReplicaWithNodeId added in v1.34.0

func (dp *DataPartition) IsExistReplicaWithNodeId(addr string, nodeID uint64) bool

func (*DataPartition) IsForbidden added in v1.34.0

func (dp *DataPartition) IsForbidden() bool

func (*DataPartition) IsRaftLeader

func (dp *DataPartition) IsRaftLeader() (addr string, ok bool)

IsRaftLeader tells if the given address belongs to the raft leader.

func (*DataPartition) LaunchRepair

func (dp *DataPartition) LaunchRepair(extentType uint8)

LaunchRepair launches the repair of extents.

func (*DataPartition) Load

func (dp *DataPartition) Load() (response *proto.LoadDataPartitionResponse)

func (*DataPartition) LoadAppliedID

func (dp *DataPartition) LoadAppliedID() (err error)

LoadAppliedID loads the applied IDs to the memory.

func (*DataPartition) NormalExtentRepairRead added in v1.34.0

func (dp *DataPartition) NormalExtentRepairRead(p repl.PacketInterface, connect net.Conn, isRepairRead bool,
	metrics *DataNodeMetrics, makeRspPacket repl.MakeStreamReadResponsePacket,
) (err error)

func (*DataPartition) NotifyExtentRepair

func (dp *DataPartition) NotifyExtentRepair(members []*DataPartitionRepairTask) (err error)

NotifyExtentRepair notifies the followers to repair.

func (*DataPartition) Path

func (dp *DataPartition) Path() string

func (*DataPartition) PersistMetadata

func (dp *DataPartition) PersistMetadata() (err error)

PersistMetadata persists the file metadata on the disk.

func (*DataPartition) Put

func (dp *DataPartition) Put(key interface{}, val interface{}) (resp interface{}, err error)

Put submits the raft log to the raft store.

func (*DataPartition) RandomWriteSubmit

func (dp *DataPartition) RandomWriteSubmit(pkg *repl.Packet) (err error)

RandomWriteSubmit submits the proposal to raft.

func (*DataPartition) ReloadSnapshot

func (dp *DataPartition) ReloadSnapshot()

func (*DataPartition) RemoveAll added in v1.34.0

func (dp *DataPartition) RemoveAll(force bool) (err error)

func (*DataPartition) Replicas

func (dp *DataPartition) Replicas() []string

func (*DataPartition) SetForbidden added in v1.34.0

func (dp *DataPartition) SetForbidden(status bool)

func (*DataPartition) SetMinAppliedID

func (dp *DataPartition) SetMinAppliedID(id uint64)

func (*DataPartition) SetRepairBlockSize added in v1.34.0

func (dp *DataPartition) SetRepairBlockSize(size uint64)

func (*DataPartition) Size

func (dp *DataPartition) Size() int

Size returns the partition size.

func (*DataPartition) SnapShot

func (dp *DataPartition) SnapShot() (files []*proto.File)

Snapshot returns the snapshot of the data partition.

func (*DataPartition) Snapshot

func (dp *DataPartition) Snapshot() (raftproto.Snapshot, error)

Snapshot persists the in-memory data (as a snapshot) to the disk. Note that the data in each data partition has already been saved on the disk. Therefore there is no need to take the snapshot in this case.

func (*DataPartition) StartRaft

func (dp *DataPartition) StartRaft(isLoad bool) (err error)

StartRaft start raft instance when data partition start or restore.

func (*DataPartition) StartRaftAfterRepair

func (dp *DataPartition) StartRaftAfterRepair(isLoad bool)

StartRaftAfterRepair starts the raft after repairing a partition. It can only happens after all the extent files are repaired by the leader. When the repair is finished, the local dp.partitionSize is same as the leader's dp.partitionSize. The repair task can be done in statusUpdateScheduler->LaunchRepair.

func (*DataPartition) StartRaftLoggingSchedule

func (dp *DataPartition) StartRaftLoggingSchedule()

StartRaftLoggingSchedule starts the task schedule as follows: 1. write the raft applied id into disk. 2. collect the applied ids from raft members. 3. based on the minimum applied id to cutoff and delete the saved raft log in order to free the disk space.

func (*DataPartition) Status

func (dp *DataPartition) Status() int

Status returns the partition status.

func (*DataPartition) Stop

func (dp *DataPartition) Stop()

Stop close the store and the raft store.

func (*DataPartition) StopDecommissionRecover added in v1.34.0

func (dp *DataPartition) StopDecommissionRecover(stop bool)

func (*DataPartition) String

func (dp *DataPartition) String() (m string)

String returns the string format of the data partition information.

func (*DataPartition) Submit added in v1.34.0

func (dp *DataPartition) Submit(val []byte) (retCode uint8, err error)

func (*DataPartition) Used

func (dp *DataPartition) Used() int

Used returns the used space.

type DataPartitionMetadata

type DataPartitionMetadata struct {
	VolumeID                string
	PartitionID             uint64
	PartitionSize           int
	PartitionType           int
	CreateTime              string
	Peers                   []proto.Peer
	Hosts                   []string
	DataPartitionCreateType int
	LastTruncateID          uint64
	ReplicaNum              int
	StopRecover             bool
	VerList                 []*proto.VolVersionInfo
	ApplyID                 uint64
	DiskErrCnt              uint64
}

func (*DataPartitionMetadata) Validate

func (md *DataPartitionMetadata) Validate() (err error)

type DataPartitionRepairTask

type DataPartitionRepairTask struct {
	TaskType uint8

	ExtentsToBeCreated             []*storage.ExtentInfo
	ExtentsToBeRepaired            []*storage.ExtentInfo
	LeaderTinyDeleteRecordFileSize int64
	LeaderAddr                     string
	// contains filtered or unexported fields
}

DataPartitionRepairTask defines the repair task for the data partition.

func NewDataPartitionRepairTask

func NewDataPartitionRepairTask(extentFiles []*storage.ExtentInfo, tinyDeleteRecordFileSize int64, source, leaderAddr string, extentType uint8) (task *DataPartitionRepairTask)

type Disk

type Disk struct {
	sync.RWMutex
	Path        string
	ReadErrCnt  uint64 // number of read errors
	WriteErrCnt uint64 // number of write errors

	Total       uint64
	Used        uint64
	Available   uint64
	Unallocated uint64
	Allocated   uint64

	MaxErrCnt       int // maximum number of errors
	Status          int // disk status such as READONLY
	ReservedSpace   uint64
	DiskRdonlySpace uint64

	RejectWrite bool

	DiskErrPartitionSet sync.Map

	BackupDataPartitions sync.Map

	BackupReplicaLk sync.RWMutex
	// contains filtered or unexported fields
}

Disk represents the structure of the disk

func NewBrokenDisk added in v1.34.0

func NewBrokenDisk(path string, reservedSpace, diskRdonlySpace uint64, maxErrCnt int, space *SpaceManager, diskEnableReadRepairExtentLimit bool) (d *Disk)

func NewDisk

func NewDisk(path string, reservedSpace, diskRdonlySpace uint64, maxErrCnt int, space *SpaceManager,
	diskEnableReadRepairExtentLimit bool,
) (d *Disk, err error)

func (*Disk) AddBackupPartitionDir added in v1.34.0

func (d *Disk) AddBackupPartitionDir(id uint64)

func (*Disk) AddDiskErrPartition added in v1.34.0

func (d *Disk) AddDiskErrPartition(dpId uint64)

func (*Disk) AddSize added in v1.4.0

func (d *Disk) AddSize(size uint64)

func (*Disk) AttachDataPartition

func (d *Disk) AttachDataPartition(dp *DataPartition)

AttachDataPartition adds a data partition to the partition map.

func (*Disk) CanWrite added in v1.34.0

func (d *Disk) CanWrite() bool

func (*Disk) CheckDiskError added in v1.34.0

func (d *Disk) CheckDiskError(err error, rwFlag uint8)

func (*Disk) DataPartitionList

func (d *Disk) DataPartitionList() (partitionIDs []uint64)

DataPartitionList returns a list of the data partitions

func (*Disk) DetachDataPartition

func (d *Disk) DetachDataPartition(dp *DataPartition)

DetachDataPartition removes a data partition from the partition map.

func (*Disk) ForceExitRaftStore added in v1.1.1

func (d *Disk) ForceExitRaftStore()

func (*Disk) GetBackupPartitionDirList added in v1.34.0

func (d *Disk) GetBackupPartitionDirList() (backupInfos []proto.BackupDataPartitionInfo)

func (*Disk) GetDataPartition

func (d *Disk) GetDataPartition(partitionID uint64) (partition *DataPartition)

GetDataPartition returns the data partition based on the given partition ID.

func (*Disk) GetDataPartitionCount added in v1.34.0

func (d *Disk) GetDataPartitionCount() int

func (*Disk) GetDecommissionStatus added in v1.34.0

func (d *Disk) GetDecommissionStatus() bool

func (*Disk) GetDiskErrPartitionCount added in v1.34.0

func (d *Disk) GetDiskErrPartitionCount() uint64

func (*Disk) GetDiskErrPartitionList added in v1.34.0

func (d *Disk) GetDiskErrPartitionList() (diskErrPartitionList []uint64)

func (*Disk) GetDiskPartition added in v1.34.0

func (d *Disk) GetDiskPartition() *disk.PartitionStat

func (*Disk) HasDiskErrPartition added in v1.34.0

func (d *Disk) HasDiskErrPartition(dpId uint64) bool

func (*Disk) MarkDecommissionStatus added in v1.34.0

func (d *Disk) MarkDecommissionStatus(decommission bool)

func (*Disk) PartitionCount

func (d *Disk) PartitionCount() int

PartitionCount returns the number of partitions in the partition map.

func (*Disk) QueryExtentRepairReadLimitStatus added in v1.34.0

func (d *Disk) QueryExtentRepairReadLimitStatus() (bool, uint64)

func (*Disk) ReleaseReadExtentToken added in v1.34.0

func (d *Disk) ReleaseReadExtentToken()

func (*Disk) RequireReadExtentToken added in v1.34.0

func (d *Disk) RequireReadExtentToken(id uint64) bool

func (*Disk) RestorePartition

func (d *Disk) RestorePartition(visitor PartitionVisitor) (err error)

RestorePartition reads the files stored on the local disk and restores the data partitions.

func (*Disk) SetExtentRepairReadLimitStatus added in v1.34.0

func (d *Disk) SetExtentRepairReadLimitStatus(status bool)

type DiskExtentReadLimitInfo added in v1.34.0

type DiskExtentReadLimitInfo struct {
	DiskPath              string `json:"diskPath"`
	ExtentReadLimitStatus bool   `json:"extentReadLimitStatus"`
	Dp                    uint64 `json:"dp"`
}

type DiskExtentReadLimitStatusResponse added in v1.34.0

type DiskExtentReadLimitStatusResponse struct {
	Infos []DiskExtentReadLimitInfo `json:"infos"`
}

type GcExtent added in v1.34.0

type GcExtent struct {
	*storage.ExtentInfo
	GcStatus string `json:"gc_status"`
}

type ItemIterator

type ItemIterator struct {
	// contains filtered or unexported fields
}

func NewItemIterator

func NewItemIterator(applyID uint64) *ItemIterator

NewItemIterator creates a new item iterator.

func (*ItemIterator) ApplyIndex

func (si *ItemIterator) ApplyIndex() uint64

ApplyIndex returns the appliedID

func (*ItemIterator) Close

func (si *ItemIterator) Close()

Close Closes the iterator.

func (*ItemIterator) Next

func (si *ItemIterator) Next() (data []byte, err error)

Next returns the next item in the iterator.

type LimiterStatus added in v1.34.0

type LimiterStatus struct {
	FlowLimit int
	FlowUsed  int

	IOConcurrency int
	IOQueue       int
	IORunning     int
	IOWaiting     int
}

type MetaMultiSnapshotInfo added in v1.34.0

type MetaMultiSnapshotInfo struct {
	VerSeq uint64
	Status int8
	Ctime  time.Time
}

MetaMultiSnapshotInfo

type PartitionVisitor

type PartitionVisitor func(dp *DataPartition)

type PersistApplyIdRequest added in v1.34.0

type PersistApplyIdRequest struct {
	// contains filtered or unexported fields
}

type RaftCmdItem

type RaftCmdItem struct {
	Op uint32 `json:"op"`
	K  []byte `json:"k"`
	V  []byte `json:"v"`
}

func UnmarshalRaftCmd added in v1.34.0

func UnmarshalRaftCmd(raw []byte) (raftOpItem *RaftCmdItem, err error)

type ReplicaInfo added in v1.34.0

type ReplicaInfo struct {
	Addr string
	Disk string
}

type SimpleVolView added in v1.34.0

type SimpleVolView struct {
	// contains filtered or unexported fields
}

type SpaceManager

type SpaceManager struct {
	// contains filtered or unexported fields
}

SpaceManager manages the disk space.

func NewSpaceManager

func NewSpaceManager(dataNode *DataNode) *SpaceManager

NewSpaceManager creates a new space manager.

func (*SpaceManager) AttachPartition added in v1.4.0

func (manager *SpaceManager) AttachPartition(dp *DataPartition)

func (*SpaceManager) CreatePartition

func (manager *SpaceManager) CreatePartition(request *proto.CreateDataPartitionRequest) (dp *DataPartition, err error)

func (*SpaceManager) DeletePartition

func (manager *SpaceManager) DeletePartition(dpID uint64, force bool) (err error)

DeletePartition deletes a partition based on the partition id.

func (*SpaceManager) DetachDataPartition added in v1.4.0

func (manager *SpaceManager) DetachDataPartition(partitionID uint64)

DetachDataPartition removes a data partition from the partition map.

func (*SpaceManager) FillIoUtils added in v1.34.0

func (manager *SpaceManager) FillIoUtils(samples map[string]loadutil.DiskIoSample)

func (*SpaceManager) GetAllDiskPartitions added in v1.34.0

func (manager *SpaceManager) GetAllDiskPartitions() []*disk.PartitionStat

func (*SpaceManager) GetClusterID

func (manager *SpaceManager) GetClusterID() (clusterID string)

func (*SpaceManager) GetDisk

func (manager *SpaceManager) GetDisk(path string) (d *Disk, err error)

func (*SpaceManager) GetDiskUtils added in v1.34.0

func (manager *SpaceManager) GetDiskUtils() map[string]float64

func (*SpaceManager) GetDisks

func (manager *SpaceManager) GetDisks() (disks []*Disk)

func (*SpaceManager) GetNodeID

func (manager *SpaceManager) GetNodeID() (nodeID uint64)

func (*SpaceManager) GetRaftStore

func (manager *SpaceManager) GetRaftStore() (raftStore raftstore.RaftStore)

func (*SpaceManager) LoadBrokenDisk added in v1.34.0

func (manager *SpaceManager) LoadBrokenDisk(path string, reservedSpace, diskRdonlySpace uint64, maxErrCnt int, diskEnableReadRepairExtentLimit bool) (err error)

func (*SpaceManager) LoadDisk

func (manager *SpaceManager) LoadDisk(path string, reservedSpace, diskRdonlySpace uint64, maxErrCnt int,
	diskEnableReadRepairExtentLimit bool,
) (err error)

func (*SpaceManager) Partition

func (manager *SpaceManager) Partition(partitionID uint64) (dp *DataPartition)

func (*SpaceManager) RangePartitions

func (manager *SpaceManager) RangePartitions(f func(partition *DataPartition, testID string) bool, reqID string)

func (*SpaceManager) SetClusterID

func (manager *SpaceManager) SetClusterID(clusterID string)

func (*SpaceManager) SetCurrentLoadDpLimit added in v1.34.0

func (manager *SpaceManager) SetCurrentLoadDpLimit(limit int)

func (*SpaceManager) SetCurrentStopDpLimit added in v1.34.0

func (manager *SpaceManager) SetCurrentStopDpLimit(limit int)

func (*SpaceManager) SetNodeID

func (manager *SpaceManager) SetNodeID(nodeID uint64)

func (*SpaceManager) SetRaftStore

func (manager *SpaceManager) SetRaftStore(raftStore raftstore.RaftStore)

func (*SpaceManager) StartDiskSample added in v1.34.0

func (manager *SpaceManager) StartDiskSample()

func (*SpaceManager) Stats

func (manager *SpaceManager) Stats() *Stats

func (*SpaceManager) Stop

func (manager *SpaceManager) Stop()

type Stats

type Stats struct {
	Zone                               string
	ConnectionCnt                      int64
	ClusterID                          string
	TCPAddr                            string
	Start                              time.Time
	Total                              uint64
	Used                               uint64
	Available                          uint64 // available space
	TotalPartitionSize                 uint64 // dataPartitionCnt * dataPartitionSize
	RemainingCapacityToCreatePartition uint64
	CreatedPartitionCnt                uint64
	LackPartitionsInMem                uint64
	LackPartitionsInDisk               uint64

	// the maximum capacity among all the disks that can be used to create partition
	MaxCapacityToCreatePartition uint64

	sync.Mutex
	// contains filtered or unexported fields
}

Stats defines various metrics that will be collected during the execution.

func NewStats

func NewStats(zone string) (s *Stats)

NewStats creates a new Stats.

func (*Stats) AddConnection

func (s *Stats) AddConnection()

AddConnection adds a connection.

func (*Stats) GetConnectionCount

func (s *Stats) GetConnectionCount() int64

GetConnectionCount gets the connection count.

func (*Stats) RemoveConnection

func (s *Stats) RemoveConnection()

RemoveConnection removes a connection.

type VolMap added in v1.34.0

type VolMap struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL