replication

package
v0.0.0-...-199de25 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 14, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	VOTE_FOR_NO_ONE = -1

	INIT_LOG_INDEX = 0
)

Variables

View Source
var (
	RAFTLOG_PREFIX = []byte{0x02, 0x00, 0x00, 0x24}

	FIRST_IDX_KEY = []byte{0x88, 0x88}

	LAST_IDX_KEY = []byte{0x99, 0x99}

	RAFT_STATE_KEY = []byte{0x19, 0x49}

	SNAPSHOT_STATE_KEY = []byte{0x19, 0x97}
)

Functions

func DecodeEntry

func DecodeEntry(in []byte) *pb.Entry

DecodeEntry decode log entry from bytes sequence

func DecodeRaftLogKey

func DecodeRaftLogKey(bts []byte) uint64

DecodeRaftLogKey deocde raft log key, return log id

func EncodeEntry

func EncodeEntry(ent *pb.Entry) []byte

EncodeEntry encode log entry to bytes sequence

func EncodeRaftLogKey

func EncodeRaftLogKey(idx uint64) []byte

EncodeRaftLogKey encode raft log key with perfix -> RAFTLOG_PREFIX

func EncodeRaftState

func EncodeRaftState(rfState *RaftPersistenState) []byte

EncodeRaftState encode RaftPersistenState to bytes sequence

func MakeAnRandomElectionTimeout

func MakeAnRandomElectionTimeout(base int) int

func Max

func Max(x, y int) int

func Min

func Min(x, y int) int

func NodeToString

func NodeToString(role NodeRole) string

func PrintDebugLog

func PrintDebugLog(msg string)

func RandIntRange

func RandIntRange(min int, max int) int

Types

type IRaftLog

type IRaftLog interface {
	GetFirstLogId() uint64
	GetLastLogId() uint64
	ResetFirstLogEntry(term int64, index int64) error
	ReInitLogs() error
	GetFirst() *pb.Entry
	GetLast() *pb.Entry
	LogItemCount() int
	Append(newEnt *pb.Entry)
	EraseBefore(logidx int64, withDel bool) ([]*pb.Entry, error)
	EraseAfter(logidx int64, withDel bool) []*pb.Entry
	GetRange(lo, hi int64) []*pb.Entry
	GetEntry(idx int64) *pb.Entry
	PersistRaftState(curTerm int64, votedFor int64, appliedId int64)
	ReadRaftState() (curTerm int64, votedFor int64, appliedId int64)
}

type MemLog

type MemLog struct {
	// contains filtered or unexported fields
}

func MakeMemLog

func MakeMemLog() *MemLog

func (*MemLog) Append

func (memLog *MemLog) Append(newEnt *pb.Entry)

func (*MemLog) EraseAfter

func (memLog *MemLog) EraseAfter(logidx int64, withDel bool) []*pb.Entry

func (*MemLog) EraseBefore

func (memLog *MemLog) EraseBefore(logidx int64, withDel bool) ([]*pb.Entry, error)

func (*MemLog) GetEntry

func (memLog *MemLog) GetEntry(idx int64) *pb.Entry

func (*MemLog) GetFirst

func (memLog *MemLog) GetFirst() *pb.Entry

func (*MemLog) GetFirstLogId

func (memLog *MemLog) GetFirstLogId() uint64

func (*MemLog) GetLast

func (memLog *MemLog) GetLast() *pb.Entry

func (*MemLog) GetLastLogId

func (memLog *MemLog) GetLastLogId() uint64

func (*MemLog) GetRange

func (memLog *MemLog) GetRange(lo, hi int64) []*pb.Entry

func (*MemLog) LogItemCount

func (memLog *MemLog) LogItemCount() int

func (*MemLog) PersistRaftState

func (memLog *MemLog) PersistRaftState(curTerm int64, votedFor int64, appliedId int64)

func (*MemLog) ReInitLogs

func (memLog *MemLog) ReInitLogs() error

func (*MemLog) ReadRaftState

func (memLog *MemLog) ReadRaftState() (curTerm int64, votedFor int64, appliedId int64)

func (*MemLog) ResetFirstLogEntry

func (memLog *MemLog) ResetFirstLogEntry(term int64, index int64) error

type NodeRole

type NodeRole uint8
const (
	NodeRoleFollower NodeRole = iota
	NodeRoleCandidate
	NodeRoleLeader
)

raft node state

type PersisRaftLog

type PersisRaftLog struct {
	// contains filtered or unexported fields
}

func MakePersistRaftLog

func MakePersistRaftLog(newdbEng storage_eng.KvStore) *PersisRaftLog

MakePersistRaftLog make a persist raft log model

newdbEng: a LevelDBKvStore storage engine

func (*PersisRaftLog) Append

func (rfLog *PersisRaftLog) Append(newEnt *pb.Entry)

Append

append a new entry to raftlog, put it to storage engine

func (*PersisRaftLog) EraseAfter

func (rfLog *PersisRaftLog) EraseAfter(logidx int64, withDel bool) []*pb.Entry

EraseAfter erase after idx, !!!WRANNING!!! is withDel is true, this operation will delete log key in storage engine

func (*PersisRaftLog) EraseBefore

func (rfLog *PersisRaftLog) EraseBefore(logidx int64, withDel bool) ([]*pb.Entry, error)

EraseBefore erase log before from idx, and copy [idx:] log return this operation don't modity log in storage engine

func (*PersisRaftLog) GetEntry

func (rfLog *PersisRaftLog) GetEntry(idx int64) *pb.Entry

GetEntry get log entry with idx

func (*PersisRaftLog) GetFirst

func (rfLog *PersisRaftLog) GetFirst() *pb.Entry

GetFirst

get the first entry from storage engine

func (*PersisRaftLog) GetFirstLogId

func (rfLog *PersisRaftLog) GetFirstLogId() uint64

GetFirstLogId get the first log id from storage engine

func (*PersisRaftLog) GetLast

func (rfLog *PersisRaftLog) GetLast() *pb.Entry

GetLast

get the last entry from storage engine

func (*PersisRaftLog) GetLastLogId

func (rfLog *PersisRaftLog) GetLastLogId() uint64

GetLastLogId

get the last log id from storage engine

func (*PersisRaftLog) GetRange

func (rfLog *PersisRaftLog) GetRange(lo, hi int64) []*pb.Entry

GetRange get range log from storage engine, and return the copy [lo, hi)

func (*PersisRaftLog) LogItemCount

func (rfLog *PersisRaftLog) LogItemCount() int

LogItemCount

get total log count from storage engine

func (*PersisRaftLog) PersistRaftState

func (rfLog *PersisRaftLog) PersistRaftState(curTerm int64, votedFor int64, appliedId int64)

PersistRaftState Persistent storage raft state (curTerm, and votedFor) you can find this design in raft paper figure2 State definition

func (*PersisRaftLog) ReInitLogs

func (rfLog *PersisRaftLog) ReInitLogs() error

ReInitLogs make logs to init state

func (*PersisRaftLog) ReadRaftState

func (rfLog *PersisRaftLog) ReadRaftState() (curTerm int64, votedFor int64, appliedId int64)

ReadRaftState read the persist curTerm, votedFor for node from storage engine

func (*PersisRaftLog) ResetFirstLogEntry

func (rfLog *PersisRaftLog) ResetFirstLogEntry(term int64, index int64) error

type Raft

type Raft struct {
	// contains filtered or unexported fields
}

raft stack definition

func MakeRaft

func MakeRaft(peers []*RaftPeerNode, me int64, newdbEng storage_eng.KvStore, applyCh chan *pb.ApplyMsg, heartbeatTimeOutMs uint64, baseElectionTimeOutMs uint64) *Raft

func (*Raft) Append

func (rf *Raft) Append(command []byte, cliId int64) *pb.Entry

func (*Raft) Applier

func (rf *Raft) Applier()

Applier() Write the commited message to the applyCh channel and update lastApplied

func (*Raft) BroadcastAppend

func (rf *Raft) BroadcastAppend()

func (*Raft) BroadcastHeartbeat

func (rf *Raft) BroadcastHeartbeat()

BroadcastHeartbeat broadcast heartbeat to peers

func (*Raft) CloseEndsConn

func (rf *Raft) CloseEndsConn()

CloseEndsConn close rpc client connect

func (*Raft) CondInstallSnapshot

func (rf *Raft) CondInstallSnapshot(lastIncluedTerm int, lastIncludedIndex int) bool

func (*Raft) Election

func (rf *Raft) Election()

Election make a new election

func (*Raft) GetLeaderId

func (rf *Raft) GetLeaderId() int64

func (*Raft) GetState

func (rf *Raft) GetState() (int, bool)

func (*Raft) HandleAppendEntries

func (rf *Raft) HandleAppendEntries(req *pb.AppendEntriesRequest, resp *pb.AppendEntriesResponse)

func (*Raft) HandleInstallSnapshot

func (rf *Raft) HandleInstallSnapshot(request *pb.InstallSnapshotRequest, response *pb.InstallSnapshotResponse)

install snapshot from leader

func (*Raft) HandleRequestVote

func (rf *Raft) HandleRequestVote(req *pb.RequestVoteRequest, resp *pb.RequestVoteResponse)

HandleRequestVote handle request vote from other node

func (*Raft) IncrCurrentTerm

func (rf *Raft) IncrCurrentTerm()

func (*Raft) IncrGrantedVotes

func (rf *Raft) IncrGrantedVotes()

func (*Raft) IsKilled

func (rf *Raft) IsKilled() bool

func (*Raft) IsLeader

func (rf *Raft) IsLeader() bool

func (*Raft) Kill

func (rf *Raft) Kill()

func (*Raft) MatchLog

func (rf *Raft) MatchLog(term, index int64) bool

MatchLog is log matched

func (*Raft) PersistRaftState

func (rf *Raft) PersistRaftState()

func (*Raft) Propose

func (rf *Raft) Propose(payload []byte, cliId int64) (int, int, bool)

func (*Raft) Replicator

func (rf *Raft) Replicator(peer *RaftPeerNode)

Replicator manager duplicate run

func (*Raft) StartSnapshot

func (rf *Raft) StartSnapshot(snap_idx uint64) error

func (*Raft) SwitchRaftNodeRole

func (rf *Raft) SwitchRaftNodeRole(role NodeRole)

func (*Raft) Tick

func (rf *Raft) Tick()

Tick raft heart, this ticket trigger raft main flow running

type RaftPeerNode

type RaftPeerNode struct {
	// contains filtered or unexported fields
}

func MakeRaftPeerNode

func MakeRaftPeerNode(addr string, id uint64) *RaftPeerNode

func (*RaftPeerNode) CloseAllConn

func (rfEnd *RaftPeerNode) CloseAllConn()

func (*RaftPeerNode) GetRaftServiceCli

func (rfEnd *RaftPeerNode) GetRaftServiceCli() *raftpb.RaftServiceClient

func (*RaftPeerNode) Id

func (rfEnd *RaftPeerNode) Id() uint64

type RaftPersistenState

type RaftPersistenState struct {
	CurTerm   int64
	VotedFor  int64
	AppliedId int64
}

func DecodeRaftState

func DecodeRaftState(in []byte) *RaftPersistenState

DecodeRaftState decode RaftPersistenState from bytes sequence

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL