datanode

package
v0.0.0-...-d1c0873 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 7, 2022 License: Apache-2.0 Imports: 37 Imported by: 0

Documentation

Index

Constants

View Source
const (
	IntervalToUpdateReplica       = 600 // interval to update the replica
	IntervalToUpdatePartitionSize = 60  // interval to update the partition size
	NumOfFilesToRecoverInParallel = 10  // number of files to be recovered simultaneously
)
View Source
const (
	ReadFlag  = 1
	WriteFlag = 2
)

cmd response

View Source
const (
	ActionNotifyFollowerToRepair        = "ActionNotifyFollowerRepair"
	ActionStreamRead                    = "ActionStreamRead"
	ActionCreateExtent                  = "ActionCreateExtent:"
	ActionMarkDelete                    = "ActionMarkDelete:"
	ActionGetAllExtentWatermarks        = "ActionGetAllExtentWatermarks:"
	ActionWrite                         = "ActionWrite:"
	ActionRepair                        = "ActionRepair:"
	ActionDecommissionPartition         = "ActionDecommissionPartition"
	ActionAddDataPartitionRaftMember    = "ActionAddDataPartitionRaftMember"
	ActionRemoveDataPartitionRaftMember = "ActionRemoveDataPartitionRaftMember"
	ActionDataPartitionTryToLeader      = "ActionDataPartitionTryToLeader"

	ActionCreateDataPartition        = "ActionCreateDataPartition"
	ActionLoadDataPartition          = "ActionLoadDataPartition"
	ActionDeleteDataPartition        = "ActionDeleteDataPartition"
	ActionStreamReadTinyDeleteRecord = "ActionStreamReadTinyDeleteRecord"
	ActionSyncTinyDeleteRecord       = "ActionSyncTinyDeleteRecord"
	ActionStreamReadTinyExtentRepair = "ActionStreamReadTinyExtentRepair"
	ActionBatchMarkDelete            = "ActionBatchMarkDelete"
)

Action description

View Source
const (
	RepairRead = true
	StreamRead = false
)
View Source
const (
	EmptyResponse                      = 'E'
	TinyExtentRepairReadResponseArgLen = 17
	MaxSyncTinyDeleteBufferSize        = 2400000
	MaxFullSyncTinyDeleteTime          = 3600 * 24
	MinTinyExtentDeleteRecordSyncSize  = 4 * 1024 * 1024
)
View Source
const (
	MetricPartitionIOName      = "dataPartitionIO"
	MetricPartitionIOBytesName = "dataPartitionIOBytes"
)
View Source
const (
	DataPartitionPrefix           = "datapartition"
	DataPartitionMetadataFileName = "META"
	TempMetadataFileName          = ".meta"
	ApplyIndexFile                = "APPLY"
	TempApplyIndexFile            = ".apply"
	TimeLayout                    = "2006-01-02 15:04:05"
)
View Source
const (
	RaftStatusStopped = 0
	RaftStatusRunning = 1
)
View Source
const (
	DefaultZoneName         = proto.DefaultZoneName
	DefaultRaftDir          = "raft"
	DefaultRaftLogsToRetain = 10 // Count of raft logs per data partition
	DefaultDiskMaxErr       = 1
	DefaultDiskRetainMin    = 5 * util.GB // GB
)
View Source
const (
	ConfigKeyLocalIP       = "localIP"         // string
	ConfigKeyPort          = "port"            // int
	ConfigKeyMasterAddr    = "masterAddr"      // array
	ConfigKeyZone          = "zoneName"        // string
	ConfigKeyDisks         = "disks"           // array
	ConfigKeyRaftDir       = "raftDir"         // string
	ConfigKeyRaftHeartbeat = "raftHeartbeat"   // string
	ConfigKeyRaftReplica   = "raftReplica"     // string
	CfgTickInterval        = "tickInterval"    // int
	CfgRaftRecvBufSize     = "raftRecvBufSize" // int

	/*
	 * Metrics Degrade Level
	 * minus value: turn off metrics collection.
	 * 0 or 1: full metrics.
	 * 2: 1/2 of the metrics will be collected.
	 * 3: 1/3 of the metrics will be collected.
	 * ...
	 */
	CfgMetricsDegrade = "metricsDegrade" // int

	CfgDiskRdonlySpace = "diskRdonlySpace" // int
	// smux Config
	ConfigKeyEnableSmuxClient  = "enableSmuxConnPool" //bool
	ConfigKeySmuxPortShift     = "smuxPortShift"      //int
	ConfigKeySmuxMaxConn       = "smuxMaxConn"        //int
	ConfigKeySmuxStreamPerConn = "smuxStreamPerConn"  //int
	ConfigKeySmuxMaxBuffer     = "smuxMaxBuffer"      //int
)
View Source
const (
	BinaryMarshalMagicVersion = 0xFF
)
View Source
const (
	BufferWrite = false
)
View Source
const (
	DiskSectorSize = 512
)

Sector size

View Source
const (
	DiskStatusFile = ".diskStatus"
)
View Source
const ExpiredPartitionPrefix = "expired_"
View Source
const (
	FinishLoadDataPartitionExtentHeader = 1
)

Status of load data partition extent header

View Source
const (
	IsReleased = 1
)

Tiny extent has been put back to store

View Source
const (
	MinAvaliTinyExtentCnt = 5
)
View Source
const (
	MinTinyExtentsToRepair = 10 // minimum number of tiny extents to repair
)

Apply the raft log operation. Currently we only have the random write operation.

View Source
const (
	ModuleName = "dataNode"
)
View Source
const (
	NetworkProtocol = "tcp"
)

Network protocol

View Source
const (
	RaftNotStarted = "RaftNotStarted"
)

Error code

View Source
const (
	SyncTinyDeleteRecordFromLeaderOnEveryDisk = 5
)
View Source
const (
	UpdateNodeInfoTicket = 1 * time.Minute
)

Variables

View Source
var (
	MaxExtentRepairLimit = 20000
	MinExtentRepairLimit = 5
	CurExtentRepairLimit = MaxExtentRepairLimit
)
View Source
var (
	ErrIncorrectStoreType          = errors.New("Incorrect store type")
	ErrNoSpaceToCreatePartition    = errors.New("No disk space to create a data partition")
	ErrNewSpaceManagerFailed       = errors.New("Creater new space manager failed")
	ErrGetMasterDatanodeInfoFailed = errors.New("Failed to get datanode info from master")

	LocalIP string

	MasterClient = masterSDK.NewMasterClient(nil, false)
)
View Source
var (
	AutoRepairStatus = true
)
View Source
var (
	// RegexpDataPartitionDir validates the directory name of a data partition.
	RegexpDataPartitionDir, _ = regexp.Compile("^datapartition_(\\d)+_(\\d)+$")
)

Functions

func DeleteLimiterWait

func DeleteLimiterWait()

func GetIoMetricLabels

func GetIoMetricLabels(partition *DataPartition, tp string) map[string]string

func IsDiskErr

func IsDiskErr(errMsg string) bool

func MarshalRandWriteRaftLog

func MarshalRandWriteRaftLog(opcode uint8, extentID uint64, offset, size int64, data []byte, crc uint32) (result []byte, err error)

func NewPacketToBroadcastMinAppliedID

func NewPacketToBroadcastMinAppliedID(partitionID uint64, minAppliedID uint64) (p *repl.Packet)

NewPacketToBroadcastMinAppliedID returns a new packet to broadcast the min applied ID.

func NewPacketToGetAppliedID

func NewPacketToGetAppliedID(partitionID uint64) (p *repl.Packet)

NewPacketToGetAppliedID returns a new packet to get the applied ID.

func NewPacketToGetMaxExtentIDAndPartitionSIze

func NewPacketToGetMaxExtentIDAndPartitionSIze(partitionID uint64) (p *repl.Packet)

NewPacketToGetPartitionSize returns a new packet to get the partition size.

func NewPacketToGetPartitionSize

func NewPacketToGetPartitionSize(partitionID uint64) (p *repl.Packet)

NewPacketToGetPartitionSize returns a new packet to get the partition size.

func UnmarshalOldVersionRaftLog

func UnmarshalOldVersionRaftLog(raw []byte) (opItem *rndWrtOpItem, err error)

func UnmarshalOldVersionRandWriteOpItem

func UnmarshalOldVersionRandWriteOpItem(raw []byte) (result *rndWrtOpItem, err error)

func UnmarshalRandWriteRaftLog

func UnmarshalRandWriteRaftLog(raw []byte) (opItem *rndWrtOpItem, err error)

RandomWriteSubmit submits the proposal to raft.

Types

type DataNode

type DataNode struct {
	// contains filtered or unexported fields
}

DataNode defines the structure of a data node.

func NewServer

func NewServer() *DataNode

func (*DataNode) OperatePacket

func (s *DataNode) OperatePacket(p *repl.Packet, c net.Conn) (err error)

func (*DataNode) Post

func (s *DataNode) Post(p *repl.Packet) error

func (*DataNode) Prepare

func (s *DataNode) Prepare(p *repl.Packet) (err error)

func (*DataNode) Shutdown

func (s *DataNode) Shutdown()

Shutdown shuts down the current data node.

func (*DataNode) Start

func (s *DataNode) Start(cfg *config.Config) (err error)

func (*DataNode) Sync

func (s *DataNode) Sync()

Sync keeps data node in sync.

type DataNodeInfo

type DataNodeInfo struct {
	Addr                      string
	PersistenceDataPartitions []uint64
}

type DataNodeMetrics

type DataNodeMetrics struct {
	MetricIOBytes *exporter.Counter
}

type DataPartition

type DataPartition struct {
	DataPartitionCreateType int
	// contains filtered or unexported fields
}

func CreateDataPartition

func CreateDataPartition(dpCfg *dataPartitionCfg, disk *Disk, request *proto.CreateDataPartitionRequest) (dp *DataPartition, err error)

func LoadDataPartition

func LoadDataPartition(partitionDir string, disk *Disk) (dp *DataPartition, err error)

LoadDataPartition loads and returns a partition instance based on the specified directory. It reads the partition metadata file stored under the specified directory and creates the partition instance.

func (*DataPartition) Apply

func (dp *DataPartition) Apply(command []byte, index uint64) (resp interface{}, err error)

Apply puts the data onto the disk.

func (*DataPartition) ApplyMemberChange

func (dp *DataPartition) ApplyMemberChange(confChange *raftproto.ConfChange, index uint64) (resp interface{}, err error)

ApplyMemberChange supports adding new raft member or deleting an existing raft member. It does not support updating an existing member at this point.

func (*DataPartition) ApplyRandomWrite

func (dp *DataPartition) ApplyRandomWrite(command []byte, raftApplyID uint64) (resp interface{}, err error)

ApplyRandomWrite random write apply

func (*DataPartition) ApplySnapshot

func (dp *DataPartition) ApplySnapshot(peers []raftproto.Peer, iterator raftproto.SnapIterator) (err error)

ApplySnapshot asks the raft leader for the snapshot data to recover the contents on the local disk.

func (*DataPartition) Available

func (dp *DataPartition) Available() int

Available returns the available space.

func (*DataPartition) CanRemoveRaftMember

func (dp *DataPartition) CanRemoveRaftMember(peer proto.Peer) error

func (*DataPartition) ChangeRaftMember

func (dp *DataPartition) ChangeRaftMember(changeType raftProto.ConfChangeType, peer raftProto.Peer, context []byte) (resp interface{}, err error)

ChangeRaftMember is a wrapper function of changing the raft member.

func (*DataPartition) CheckLeader

func (dp *DataPartition) CheckLeader(request *repl.Packet, connect net.Conn) (err error)

CheckLeader checks if itself is the leader during read

func (*DataPartition) Del

func (dp *DataPartition) Del(key interface{}) (interface{}, error)

Del deletes the raft log based on the given key. It is not needed for replicating data partition.

func (*DataPartition) Disk

func (dp *DataPartition) Disk() *Disk

Disk returns the disk instance.

func (*DataPartition) DoExtentStoreRepair

func (dp *DataPartition) DoExtentStoreRepair(repairTask *DataPartitionRepairTask)

DoExtentStoreRepair performs the repairs of the extent store. 1. when the extent size is smaller than the max size on the record, start to repair the missing part. 2. if the extent does not even exist, create the extent first, and then repair.

func (*DataPartition) DoRepair

func (dp *DataPartition) DoRepair(repairTasks []*DataPartitionRepairTask)

DoRepair asks the leader to perform the repair tasks.

func (*DataPartition) ExtentStore

func (dp *DataPartition) ExtentStore() *storage.ExtentStore

func (*DataPartition) ForceLoadHeader

func (dp *DataPartition) ForceLoadHeader()

func (*DataPartition) ForceSetDataPartitionToFininshLoad

func (dp *DataPartition) ForceSetDataPartitionToFininshLoad()

func (*DataPartition) ForceSetDataPartitionToLoadding

func (dp *DataPartition) ForceSetDataPartitionToLoadding()

func (*DataPartition) ForceSetRaftRunning

func (dp *DataPartition) ForceSetRaftRunning()

func (*DataPartition) Get

func (dp *DataPartition) Get(key interface{}) (interface{}, error)

Get returns the raft log based on the given key. It is not needed for replicating data partition.

func (*DataPartition) GetAppliedID

func (dp *DataPartition) GetAppliedID() (id uint64)

func (*DataPartition) GetExtentCount

func (dp *DataPartition) GetExtentCount() int

func (*DataPartition) HandleFatalEvent

func (dp *DataPartition) HandleFatalEvent(err *raft.FatalError)

HandleFatalEvent notifies the application when panic happens.

func (*DataPartition) HandleLeaderChange

func (dp *DataPartition) HandleLeaderChange(leader uint64)

HandleLeaderChange notifies the application when the raft leader has changed.

func (*DataPartition) IsEquareCreateDataPartitionRequst

func (dp *DataPartition) IsEquareCreateDataPartitionRequst(request *proto.CreateDataPartitionRequest) (err error)

func (*DataPartition) IsExsitReplica

func (dp *DataPartition) IsExsitReplica(addr string) bool

func (*DataPartition) IsRaftLeader

func (dp *DataPartition) IsRaftLeader() (addr string, ok bool)

IsRaftLeader tells if the given address belongs to the raft leader.

func (*DataPartition) LaunchRepair

func (dp *DataPartition) LaunchRepair(extentType uint8)

LaunchRepair launches the repair of extents.

func (*DataPartition) Load

func (dp *DataPartition) Load() (response *proto.LoadDataPartitionResponse)

func (*DataPartition) LoadAppliedID

func (dp *DataPartition) LoadAppliedID() (err error)

LoadAppliedID loads the applied IDs to the memory.

func (*DataPartition) NotifyExtentRepair

func (dp *DataPartition) NotifyExtentRepair(members []*DataPartitionRepairTask) (err error)

NotifyExtentRepair notifies the followers to repair.

func (*DataPartition) Path

func (dp *DataPartition) Path() string

func (*DataPartition) PersistMetadata

func (dp *DataPartition) PersistMetadata() (err error)

PersistMetadata persists the file metadata on the disk.

func (*DataPartition) Put

func (dp *DataPartition) Put(key interface{}, val interface{}) (resp interface{}, err error)

Put submits the raft log to the raft store.

func (*DataPartition) RandomWriteSubmit

func (dp *DataPartition) RandomWriteSubmit(pkg *repl.Packet) (err error)

RandomWriteSubmit submits the proposal to raft.

func (*DataPartition) ReloadSnapshot

func (dp *DataPartition) ReloadSnapshot()

func (*DataPartition) Replicas

func (dp *DataPartition) Replicas() []string

func (*DataPartition) SetMinAppliedID

func (dp *DataPartition) SetMinAppliedID(id uint64)

func (*DataPartition) Size

func (dp *DataPartition) Size() int

Size returns the partition size.

func (*DataPartition) SnapShot

func (dp *DataPartition) SnapShot() (files []*proto.File)

Snapshot returns the snapshot of the data partition.

func (*DataPartition) Snapshot

func (dp *DataPartition) Snapshot() (raftproto.Snapshot, error)

Snapshot persists the in-memory data (as a snapshot) to the disk. Note that the data in each data partition has already been saved on the disk. Therefore there is no need to take the snapshot in this case.

func (*DataPartition) StartRaft

func (dp *DataPartition) StartRaft() (err error)

StartRaft start raft instance when data partition start or restore.

func (*DataPartition) StartRaftAfterRepair

func (dp *DataPartition) StartRaftAfterRepair()

StartRaftAfterRepair starts the raft after repairing a partition. It can only happens after all the extent files are repaired by the leader. When the repair is finished, the local dp.partitionSize is same as the leader's dp.partitionSize. The repair task can be done in statusUpdateScheduler->LaunchRepair.

func (*DataPartition) StartRaftLoggingSchedule

func (dp *DataPartition) StartRaftLoggingSchedule()

StartRaftLoggingSchedule starts the task schedule as follows: 1. write the raft applied id into disk. 2. collect the applied ids from raft members. 3. based on the minimum applied id to cutoff and delete the saved raft log in order to free the disk space.

func (*DataPartition) Status

func (dp *DataPartition) Status() int

Status returns the partition status.

func (*DataPartition) Stop

func (dp *DataPartition) Stop()

Stop close the store and the raft store.

func (*DataPartition) String

func (dp *DataPartition) String() (m string)

String returns the string format of the data partition information.

func (*DataPartition) Used

func (dp *DataPartition) Used() int

Used returns the used space.

type DataPartitionMetadata

type DataPartitionMetadata struct {
	VolumeID                string
	PartitionID             uint64
	PartitionSize           int
	CreateTime              string
	Peers                   []proto.Peer
	Hosts                   []string
	DataPartitionCreateType int
	LastTruncateID          uint64
}

func (*DataPartitionMetadata) Validate

func (md *DataPartitionMetadata) Validate() (err error)

type DataPartitionRepairTask

type DataPartitionRepairTask struct {
	TaskType uint8

	ExtentsToBeCreated             []*storage.ExtentInfo
	ExtentsToBeRepaired            []*storage.ExtentInfo
	LeaderTinyDeleteRecordFileSize int64
	LeaderAddr                     string
	// contains filtered or unexported fields
}

DataPartitionRepairTask defines the reapir task for the data partition.

func NewDataPartitionRepairTask

func NewDataPartitionRepairTask(extentFiles []*storage.ExtentInfo, tinyDeleteRecordFileSize int64, source, leaderAddr string) (task *DataPartitionRepairTask)

type Disk

type Disk struct {
	sync.RWMutex
	Path        string
	ReadErrCnt  uint64 // number of read errors
	WriteErrCnt uint64 // number of write errors

	Total       uint64
	Used        uint64
	Available   uint64
	Unallocated uint64
	Allocated   uint64

	MaxErrCnt       int // maximum number of errors
	Status          int // disk status such as READONLY
	ReservedSpace   uint64
	DiskRdonlySpace uint64

	RejectWrite bool
	// contains filtered or unexported fields
}

Disk represents the structure of the disk

func NewDisk

func NewDisk(path string, reservedSpace, diskRdonlySpace uint64, maxErrCnt int, space *SpaceManager) (d *Disk)

func (*Disk) AddSize

func (d *Disk) AddSize(size uint64)

func (*Disk) AttachDataPartition

func (d *Disk) AttachDataPartition(dp *DataPartition)

AttachDataPartition adds a data partition to the partition map.

func (*Disk) CanWrite

func (d *Disk) CanWrite() bool

func (*Disk) DataPartitionList

func (d *Disk) DataPartitionList() (partitionIDs []uint64)

DataPartitionList returns a list of the data partitions

func (*Disk) DetachDataPartition

func (d *Disk) DetachDataPartition(dp *DataPartition)

DetachDataPartition removes a data partition from the partition map.

func (*Disk) ForceExitRaftStore

func (d *Disk) ForceExitRaftStore()

func (*Disk) GetDataPartition

func (d *Disk) GetDataPartition(partitionID uint64) (partition *DataPartition)

GetDataPartition returns the data partition based on the given partition ID.

func (*Disk) PartitionCount

func (d *Disk) PartitionCount() int

PartitionCount returns the number of partitions in the partition map.

func (*Disk) RestorePartition

func (d *Disk) RestorePartition(visitor PartitionVisitor)

RestorePartition reads the files stored on the local disk and restores the data partitions.

type ItemIterator

type ItemIterator struct {
	// contains filtered or unexported fields
}

func NewItemIterator

func NewItemIterator(applyID uint64) *ItemIterator

NewItemIterator creates a new item iterator.

func (*ItemIterator) ApplyIndex

func (si *ItemIterator) ApplyIndex() uint64

ApplyIndex returns the appliedID

func (*ItemIterator) Close

func (si *ItemIterator) Close()

Close Closes the iterator.

func (*ItemIterator) Next

func (si *ItemIterator) Next() (data []byte, err error)

Next returns the next item in the iterator.

type PartitionVisitor

type PartitionVisitor func(dp *DataPartition)

type RaftCmdItem

type RaftCmdItem struct {
	Op uint32 `json:"op"`
	K  []byte `json:"k"`
	V  []byte `json:"v"`
}

type SpaceManager

type SpaceManager struct {
	// contains filtered or unexported fields
}

SpaceManager manages the disk space.

func NewSpaceManager

func NewSpaceManager(dataNode *DataNode) *SpaceManager

NewSpaceManager creates a new space manager.

func (*SpaceManager) AttachPartition

func (manager *SpaceManager) AttachPartition(dp *DataPartition)

func (*SpaceManager) CreatePartition

func (manager *SpaceManager) CreatePartition(request *proto.CreateDataPartitionRequest) (dp *DataPartition, err error)

func (*SpaceManager) DeletePartition

func (manager *SpaceManager) DeletePartition(dpID uint64)

DeletePartition deletes a partition based on the partition id.

func (*SpaceManager) DetachDataPartition

func (manager *SpaceManager) DetachDataPartition(partitionID uint64)

DetachDataPartition removes a data partition from the partition map.

func (*SpaceManager) GetClusterID

func (manager *SpaceManager) GetClusterID() (clusterID string)

func (*SpaceManager) GetDisk

func (manager *SpaceManager) GetDisk(path string) (d *Disk, err error)

func (*SpaceManager) GetDisks

func (manager *SpaceManager) GetDisks() (disks []*Disk)

func (*SpaceManager) GetNodeID

func (manager *SpaceManager) GetNodeID() (nodeID uint64)

func (*SpaceManager) GetRaftStore

func (manager *SpaceManager) GetRaftStore() (raftStore raftstore.RaftStore)

func (*SpaceManager) LoadDisk

func (manager *SpaceManager) LoadDisk(path string, reservedSpace, diskRdonlySpace uint64, maxErrCnt int) (err error)

func (*SpaceManager) Partition

func (manager *SpaceManager) Partition(partitionID uint64) (dp *DataPartition)

func (*SpaceManager) RangePartitions

func (manager *SpaceManager) RangePartitions(f func(partition *DataPartition) bool)

func (*SpaceManager) SetClusterID

func (manager *SpaceManager) SetClusterID(clusterID string)

func (*SpaceManager) SetNodeID

func (manager *SpaceManager) SetNodeID(nodeID uint64)

func (*SpaceManager) SetRaftStore

func (manager *SpaceManager) SetRaftStore(raftStore raftstore.RaftStore)

func (*SpaceManager) Stats

func (manager *SpaceManager) Stats() *Stats

func (*SpaceManager) Stop

func (manager *SpaceManager) Stop()

type Stats

type Stats struct {
	Zone                               string
	ConnectionCnt                      int64
	ClusterID                          string
	TCPAddr                            string
	Start                              time.Time
	Total                              uint64
	Used                               uint64
	Available                          uint64 // available space
	TotalPartitionSize                 uint64 // dataPartitionCnt * dataPartitionSize
	RemainingCapacityToCreatePartition uint64
	CreatedPartitionCnt                uint64

	// the maximum capacity among all the nodes that can be used to create partition
	MaxCapacityToCreatePartition uint64

	sync.Mutex
	// contains filtered or unexported fields
}

Stats defines various metrics that will be collected during the execution.

func NewStats

func NewStats(zone string) (s *Stats)

NewStats creates a new Stats.

func (*Stats) AddConnection

func (s *Stats) AddConnection()

AddConnection adds a connection.

func (*Stats) GetConnectionCount

func (s *Stats) GetConnectionCount() int64

GetConnectionCount gets the connection count.

func (*Stats) RemoveConnection

func (s *Stats) RemoveConnection()

RemoveConnection removes a connection.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL