datanode

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 9, 2019 License: Apache-2.0 Imports: 33 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Standby uint32 = iota
	Start
	Running
	Shutdown
	Stopped
)

The status of the server

View Source
const (
	IntervalToUpdateReplica       = 60 // interval to update the replica
	IntervalToUpdatePartitionSize = 60 // interval to update the partition size
	NumOfFilesToRecoverInParallel = 7  // number of files to be recovered simultaneously
)
View Source
const (
	ReadFlag  = 1
	WriteFlag = 2
)

cmd response

View Source
const (
	ActionNotifyFollowerToRepair     = "ActionNotifyFollowerRepair"
	ActionStreamRead                 = "ActionStreamRead"
	ActionGetDataPartitionMetrics    = "ActionGetDataPartitionMetrics"
	ActionCreateExtent               = "ActionCreateExtent:"
	ActionMarkDelete                 = "ActionMarkDelete:"
	ActionGetAllExtentWatermarks     = "ActionGetAllExtentWatermarks:"
	ActionWrite                      = "ActionWrite:"
	ActionRepair                     = "ActionRepair:"
	ActionDecommissionPartition      = "ActionDecommissionPartition"
	ActionCreateDataPartition        = "ActionCreateDataPartition"
	ActionLoadDataPartition          = "ActionLoadDataPartition"
	ActionDeleteDataPartition        = "ActionDeleteDataPartition"
	ActionStreamReadTinyDeleteRecord = "ActionStreamReadTinyDeleteRecord"
	ActionSyncTinyDeleteRecord       = "ActionSyncTinyDeleteRecord"
)

Action description

View Source
const (
	MinTinyExtentsToRepair = 10        // minimum number of tiny extents to repair
	NumOfRaftLogsToRetain  = 100000    // Count of raft logs per data partition
	MaxUnit64              = 1<<64 - 1 // Unit64 max value
)
View Source
const (
	RepairRead = true
	StreamRead = false
)
View Source
const (
	NotUpdateSize = false
	UpdateSize    = true
)
View Source
const (
	MaxSyncTinyDeleteBufferSize = 2400000
	MaxFullSyncTinyDeleteTime   = 60 * 2
)
View Source
const (
	DataPartitionPrefix           = "datapartition"
	DataPartitionMetadataFileName = "META"
	TempMetadataFileName          = ".meta"
	ApplyIndexFile                = "APPLY"
	TempApplyIndexFile            = ".apply"
	TimeLayout                    = "2006-01-02 15:04:05"
)
View Source
const (
	DefaultRackName         = "cfs_rack1"
	DefaultRaftDir          = "raft"
	DefaultRaftLogsToRetain = 2000 // Count of raft logs per data partition
	DefaultDiskMaxErr       = 20
	DefaultDiskRetain       = 20 * util.GB // GB
)
View Source
const (
	ConfigKeyLocalIP       = "localIP"       // string
	ConfigKeyPort          = "port"          // int
	ConfigKeyMasterAddr    = "masterAddr"    // array
	ConfigKeyRack          = "rack"          // string
	ConfigKeyDisks         = "disks"         // array
	ConfigKeyRaftDir       = "raftDir"       // string
	ConfigKeyRaftHeartbeat = "raftHeartbeat" // string
	ConfigKeyRaftReplica   = "raftReplica"   // string
)
View Source
const (
	BufferWrite = false
)
View Source
const (
	DiskSectorSize = 512
)

Sector size

View Source
const (
	FinishLoadDataPartitionExtentHeader = 1
)

Status of load data partition extent header

View Source
const (
	IsReleased = 1
)

Tiny extent has been put back to store

View Source
const (
	ModuleName = "dataNode"
)
View Source
const (
	NetworkProtocol = "tcp"
)

Network protocol

View Source
const (
	RaftNotStarted = "RaftNotStarted"
)

Error code

Variables

View Source
var (
	ErrIncorrectStoreType       = errors.New("incorrect store type")
	ErrNoSpaceToCreatePartition = errors.New("no disk space to create a data partition")
	ErrBadConfFile              = errors.New("bad config file")

	LocalIP string

	MasterHelper = util.NewMasterHelper()
)
View Source
var (
	// RegexpDataPartitionDir validates the directory name of a data partition.
	RegexpDataPartitionDir, _ = regexp.Compile("^datapartition_(\\d)+_(\\d)+$")
)

Functions

func IsDiskErr

func IsDiskErr(errMsg string) bool

func NewPacketToBroadcastMinAppliedID

func NewPacketToBroadcastMinAppliedID(partitionID uint64, minAppliedID uint64) (p *repl.Packet)

NewPacketToBroadcastMinAppliedID returns a new packet to broadcast the min applied ID.

func NewPacketToGetAppliedID

func NewPacketToGetAppliedID(partitionID uint64) (p *repl.Packet)

NewPacketToGetAppliedID returns a new packet to get the applied ID.

func NewPacketToGetPartitionSize

func NewPacketToGetPartitionSize(partitionID uint64) (p *repl.Packet)

NewPacketToGetPartitionSize returns a new packet to get the partition size.

Types

type DataNode

type DataNode struct {
	// contains filtered or unexported fields
}

DataNode defines the structure of a data node.

func NewServer

func NewServer() *DataNode

func (*DataNode) OperatePacket

func (s *DataNode) OperatePacket(p *repl.Packet, c *net.TCPConn) (err error)

func (*DataNode) Post

func (s *DataNode) Post(p *repl.Packet) error

func (*DataNode) Prepare

func (s *DataNode) Prepare(p *repl.Packet) (err error)

func (*DataNode) Shutdown

func (s *DataNode) Shutdown()

Shutdown shuts down the current data node.

func (*DataNode) Start

func (s *DataNode) Start(cfg *config.Config) (err error)

func (*DataNode) Sync

func (s *DataNode) Sync()

Sync keeps data node in sync.

type DataPartition

type DataPartition struct {
	FullSyncTinyDeleteTime int64
	// contains filtered or unexported fields
}

func CreateDataPartition

func CreateDataPartition(dpCfg *dataPartitionCfg, disk *Disk) (dp *DataPartition, err error)

func LoadDataPartition

func LoadDataPartition(partitionDir string, disk *Disk) (dp *DataPartition, err error)

LoadDataPartition loads and returns a partition instance based on the specified directory. It reads the partition metadata file stored under the specified directory and creates the partition instance.

func (*DataPartition) Apply

func (dp *DataPartition) Apply(command []byte, index uint64) (resp interface{}, err error)

Apply puts the data onto the disk.

func (*DataPartition) ApplyMemberChange

func (dp *DataPartition) ApplyMemberChange(confChange *raftproto.ConfChange, index uint64) (resp interface{}, err error)

ApplyMemberChange supports adding new raft member or deleting an existing raft member. It does not support updating an existing member at this point.

func (*DataPartition) ApplyRandomWrite

func (dp *DataPartition) ApplyRandomWrite(msg *RaftCmdItem, raftApplyID uint64) (extentID uint64, err error)

ApplyRandomWrite random write apply

func (*DataPartition) ApplySnapshot

func (dp *DataPartition) ApplySnapshot(peers []raftproto.Peer, iterator raftproto.SnapIterator) (err error)

ApplySnapshot asks the raft leader for the snapshot data to recover the contents on the local disk.

func (*DataPartition) Available

func (dp *DataPartition) Available() int

Available returns the available space.

func (*DataPartition) ChangeRaftMember

func (dp *DataPartition) ChangeRaftMember(changeType raftProto.ConfChangeType, peer raftProto.Peer, context []byte) (resp interface{}, err error)

ChangeRaftMember is a wrapper function of changing the raft member.

func (*DataPartition) CheckLeader

func (dp *DataPartition) CheckLeader(request *repl.Packet, connect net.Conn) (err error)

CheckLeader checks if itself is the leader during read

func (*DataPartition) Del

func (dp *DataPartition) Del(key interface{}) (interface{}, error)

Del deletes the raft log based on the given key. It is not needed for replicating data partition.

func (*DataPartition) Disk

func (dp *DataPartition) Disk() *Disk

Disk returns the disk instance.

func (*DataPartition) DoExtentStoreRepair

func (dp *DataPartition) DoExtentStoreRepair(repairTask *DataPartitionRepairTask)

DoExtentStoreRepair performs the repairs of the extent store. 1. when the extent size is smaller than the max size on the record, start to repair the missing part. 2. if the extent does not even exist, create the extent first, and then repair.

func (*DataPartition) DoRepair

func (dp *DataPartition) DoRepair(repairTasks []*DataPartitionRepairTask)

DoRepair asks the leader to perform the repair tasks.

func (*DataPartition) ExtentStore

func (dp *DataPartition) ExtentStore() *storage.ExtentStore

func (*DataPartition) ForceLoadHeader

func (dp *DataPartition) ForceLoadHeader()

func (*DataPartition) Get

func (dp *DataPartition) Get(key interface{}) (interface{}, error)

Get returns the raft log based on the given key. It is not needed for replicating data partition.

func (*DataPartition) GetAppliedID

func (dp *DataPartition) GetAppliedID() (id uint64)

func (*DataPartition) GetExtentCount

func (dp *DataPartition) GetExtentCount() int

func (*DataPartition) HandleFatalEvent

func (dp *DataPartition) HandleFatalEvent(err *raft.FatalError)

HandleFatalEvent notifies the application when panic happens.

func (*DataPartition) HandleLeaderChange

func (dp *DataPartition) HandleLeaderChange(leader uint64)

HandleLeaderChange notifies the application when the raft leader has changed.

func (*DataPartition) IsRaftLeader

func (dp *DataPartition) IsRaftLeader() (addr string, ok bool)

IsRaftLeader tells if the given address belongs to the raft leader.

func (*DataPartition) LaunchRepair

func (dp *DataPartition) LaunchRepair(extentType uint8)

LaunchRepair launches the repair of extents.

func (*DataPartition) Load

func (dp *DataPartition) Load() (response *proto.LoadDataPartitionResponse)

func (*DataPartition) LoadAppliedID

func (dp *DataPartition) LoadAppliedID() (err error)

LoadAppliedID loads the applied IDs to the memory.

func (*DataPartition) NotifyExtentRepair

func (dp *DataPartition) NotifyExtentRepair(members []*DataPartitionRepairTask) (err error)

NotifyExtentRepair notifies the followers to repair.

func (*DataPartition) Path

func (dp *DataPartition) Path() string

func (*DataPartition) PersistMetadata

func (dp *DataPartition) PersistMetadata() (err error)

PersistMetadata persists the file metadata on the disk.

func (*DataPartition) Put

func (dp *DataPartition) Put(key, val interface{}) (resp interface{}, err error)

Put submits the raft log to the raft store.

func (*DataPartition) RandomWriteSubmit

func (dp *DataPartition) RandomWriteSubmit(pkg *repl.Packet) (err error)

RandomWriteSubmit submits the proposal to raft.

func (*DataPartition) ReloadSnapshot

func (dp *DataPartition) ReloadSnapshot()

func (*DataPartition) Replicas

func (dp *DataPartition) Replicas() []string

func (*DataPartition) SetMinAppliedID

func (dp *DataPartition) SetMinAppliedID(id uint64)

func (*DataPartition) Size

func (dp *DataPartition) Size() int

Size returns the partition size.

func (*DataPartition) SnapShot

func (dp *DataPartition) SnapShot() (files []*proto.File)

Snapshot returns the snapshot of the data partition.

func (*DataPartition) Snapshot

func (dp *DataPartition) Snapshot() (raftproto.Snapshot, error)

Snapshot persists the in-memory data (as a snapshot) to the disk. Note that the data in each data partition has already been saved on the disk. Therefore there is no need to take the snapshot in this case.

func (*DataPartition) StartRaft

func (dp *DataPartition) StartRaft() (err error)

StartRaft start raft instance when data partition start or restore.

func (*DataPartition) StartRaftAfterRepair

func (dp *DataPartition) StartRaftAfterRepair()

StartRaftAfterRepair starts the raft after repairing a partition. It can only happens after all the extent files are repaired by the leader. When the repair is finished, the local dp.partitionSize is same as the leader's dp.partitionSize. The repair task can be done in statusUpdateScheduler->LaunchRepair.

func (*DataPartition) StartRaftLoggingSchedule

func (dp *DataPartition) StartRaftLoggingSchedule()

StartRaftLoggingSchedule starts the task schedule as follows: 1. write the raft applied id into disk. 2. collect the applied ids from raft members. 3. based on the minimum applied id to cutoff and delete the saved raft log in order to free the disk space.

func (*DataPartition) Status

func (dp *DataPartition) Status() int

Status returns the partition status.

func (*DataPartition) Stop

func (dp *DataPartition) Stop()

Stop close the store and the raft store.

func (*DataPartition) String

func (dp *DataPartition) String() (m string)

String returns the string format of the data partition information.

func (*DataPartition) Used

func (dp *DataPartition) Used() int

Used returns the used space.

type DataPartitionMetadata

type DataPartitionMetadata struct {
	VolumeID      string
	PartitionID   uint64
	PartitionSize int
	CreateTime    string
	Peers         []proto.Peer
}

func (*DataPartitionMetadata) Validate

func (md *DataPartitionMetadata) Validate() (err error)

type DataPartitionMetrics

type DataPartitionMetrics struct {
	WriteCnt          uint64
	ReadCnt           uint64
	TotalWriteLatency uint64
	TotalReadLatency  uint64
	WriteLatency      float64
	ReadLatency       float64
	// contains filtered or unexported fields
}

DataPartitionMetrics defines the metrics related to the data partition

func NewDataPartitionMetrics

func NewDataPartitionMetrics() *DataPartitionMetrics

NewDataPartitionMetrics creates a new DataPartitionMetrics

func (*DataPartitionMetrics) UpdateReadMetrics

func (metrics *DataPartitionMetrics) UpdateReadMetrics(latency uint64)

UpdateReadMetrics updates the read-related metrics

func (*DataPartitionMetrics) UpdateWriteMetrics

func (metrics *DataPartitionMetrics) UpdateWriteMetrics(latency uint64)

UpdateWriteMetrics updates the write-related metrics

type DataPartitionRepairTask

type DataPartitionRepairTask struct {
	TaskType uint8

	ExtentsToBeCreated       []*storage.ExtentInfo
	ExtentsToBeRepaired      []*storage.ExtentInfo
	LeaderTinyDeleteFileSize int64
	LeaderAddr               string
	// contains filtered or unexported fields
}

DataPartitionRepairTask defines the reapir task for the data partition.

func NewDataPartitionRepairTask

func NewDataPartitionRepairTask(extentFiles []*storage.ExtentInfo, tinyDeleteFileSize int64, source, leaderAddr string) (task *DataPartitionRepairTask)

type Disk

type Disk struct {
	sync.RWMutex
	Path        string
	ReadErrCnt  uint64 // number of read errors
	WriteErrCnt uint64 // number of write errors

	Total       uint64
	Used        uint64
	Available   uint64
	Unallocated uint64
	Allocated   uint64

	MaxErrCnt     int // maximum number of errors
	Status        int // disk status such as READONLY
	ReservedSpace uint64
	// contains filtered or unexported fields
}

Disk represents the structure of the disk

func NewDisk

func NewDisk(path string, restSize uint64, maxErrCnt int, space *SpaceManager) (d *Disk)

func (*Disk) AttachDataPartition

func (d *Disk) AttachDataPartition(dp *DataPartition)

AttachDataPartition adds a data partition to the partition map.

func (*Disk) DataPartitionList

func (d *Disk) DataPartitionList() (partitionIDs []uint64)

DataPartitionList returns a list of the data partitions

func (*Disk) DetachDataPartition

func (d *Disk) DetachDataPartition(dp *DataPartition)

DetachDataPartition removes a data partition from the partition map.

func (*Disk) ForceLoadPartitionHeader

func (d *Disk) ForceLoadPartitionHeader()

func (*Disk) GetDataPartition

func (d *Disk) GetDataPartition(partitionID uint64) (partition *DataPartition)

GetDataPartition returns the data partition based on the given partition ID.

func (*Disk) PartitionCount

func (d *Disk) PartitionCount() int

PartitionCount returns the number of partitions in the partition map.

func (*Disk) RestorePartition

func (d *Disk) RestorePartition(visitor PartitionVisitor)

RestorePartition reads the files stored on the local disk and restores the data partitions.

type ItemIterator

type ItemIterator struct {
	// contains filtered or unexported fields
}

func NewItemIterator

func NewItemIterator(applyID uint64) *ItemIterator

NewItemIterator creates a new item iterator.

func (*ItemIterator) ApplyIndex

func (si *ItemIterator) ApplyIndex() uint64

ApplyIndex returns the appliedID

func (*ItemIterator) Close

func (si *ItemIterator) Close()

Close Closes the iterator.

func (*ItemIterator) Next

func (si *ItemIterator) Next() (data []byte, err error)

Next returns the next item in the iterator.

type PartitionVisitor

type PartitionVisitor func(dp *DataPartition)

type RaftCmdItem

type RaftCmdItem struct {
	Op uint32 `json:"op"`
	K  []byte `json:"k"`
	V  []byte `json:"v"`
}

type SpaceManager

type SpaceManager struct {
	// contains filtered or unexported fields
}

SpaceManager manages the disk space.

func NewSpaceManager

func NewSpaceManager(rack string) *SpaceManager

NewSpaceManager creates a new space manager.

func (*SpaceManager) CreatePartition

func (manager *SpaceManager) CreatePartition(request *proto.CreateDataPartitionRequest) (dp *DataPartition, err error)

func (*SpaceManager) DeletePartition

func (manager *SpaceManager) DeletePartition(dpID uint64)

DeletePartition deletes a partition based on the partition id.

func (*SpaceManager) GetClusterID

func (manager *SpaceManager) GetClusterID() (clusterID string)

func (*SpaceManager) GetDisk

func (manager *SpaceManager) GetDisk(path string) (d *Disk, err error)

func (*SpaceManager) GetDisks

func (manager *SpaceManager) GetDisks() (disks []*Disk)

func (*SpaceManager) GetNodeID

func (manager *SpaceManager) GetNodeID() (nodeID uint64)

func (*SpaceManager) GetRaftStore

func (manager *SpaceManager) GetRaftStore() (raftStore raftstore.RaftStore)

func (*SpaceManager) LoadDisk

func (manager *SpaceManager) LoadDisk(path string, reservedSpace uint64, maxErrCnt int) (err error)

func (*SpaceManager) Partition

func (manager *SpaceManager) Partition(partitionID uint64) (dp *DataPartition)

func (*SpaceManager) RangePartitions

func (manager *SpaceManager) RangePartitions(f func(partition *DataPartition) bool)

func (*SpaceManager) SetClusterID

func (manager *SpaceManager) SetClusterID(clusterID string)

func (*SpaceManager) SetNodeID

func (manager *SpaceManager) SetNodeID(nodeID uint64)

func (*SpaceManager) SetRaftStore

func (manager *SpaceManager) SetRaftStore(raftStore raftstore.RaftStore)

func (*SpaceManager) Stats

func (manager *SpaceManager) Stats() *Stats

func (*SpaceManager) Stop

func (manager *SpaceManager) Stop()

type Stats

type Stats struct {
	Zone                               string
	ConnectionCnt                      int64
	ClusterID                          string
	TCPAddr                            string
	Start                              time.Time
	Total                              uint64
	Used                               uint64
	Available                          uint64 // available space
	TotalPartitionSize                 uint64 // dataPartitionCnt * dataPartitionSize
	RemainingCapacityToCreatePartition uint64
	CreatedPartitionCnt                uint64

	// the maximum capacity among all the nodes that can be used to create partition
	MaxCapacityToCreatePartition uint64

	sync.Mutex
	// contains filtered or unexported fields
}

Stats defines various metrics that will be collected during the execution.

func NewStats

func NewStats(zone string) (s *Stats)

NewStats creates a new Stats.

func (*Stats) AddConnection

func (s *Stats) AddConnection()

AddConnection adds a connection.

func (*Stats) GetConnectionCount

func (s *Stats) GetConnectionCount() int64

GetConnectionCount gets the connection count.

func (*Stats) RemoveConnection

func (s *Stats) RemoveConnection()

RemoveConnection removes a connection.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL