raft

package module
v0.0.0-...-09af9fc Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 20, 2020 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
// Role constants. iota numbers them in declaration order:
// RoleLeader = 0, RoleCandidate = 1, RoleFollower = 2.
// NOTE(review): presumably these are the values reported by Character() — confirm.
const (
	RoleLeader = iota
	RoleCandidate
	RoleFollower
)
View Source
// String keys used by the package; each trailing comment shows an example
// assignment for that key.
// NOTE(review): presumably environment variable names — confirm against the reader.
const (
	K8S      = "K8S"       // K8S=true
	BrokerID = "BROKER_ID" // BROKER_ID=1

)

Variables

This section is empty.

Functions

func Character

func Character() int

func LeaderURL

func LeaderURL() string

func NewRaft

func NewRaft()

func RegisterRaftServer

func RegisterRaftServer(s *grpc.Server, srv RaftServer)

func VersionAdd

func VersionAdd()

Types

type HB

// HB bundles the parameters used when sending a heartbeat.
type HB struct {
	// contains filtered or unexported fields
}

HB 组合心跳发送参数

type HBeat

// HBeat is the request message of the Heartbeat RPC. It carries the leader's
// term and ID together with the current configuration version and payload.
// Note: protobuf field number 3 is not used by this message.
type HBeat struct {
	// Term of the leader node.
	Term int32 `protobuf:"varint,1,opt,name=term,proto3" json:"term,omitempty"`
	// ID of the leader node.
	LeaderId string `protobuf:"bytes,2,opt,name=leaderId,proto3" json:"leaderId,omitempty"`
	// Current configuration version (an incrementing index).
	Version int32 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"`
	// Configuration data synchronized in the current term.
	Config               []byte   `protobuf:"bytes,5,opt,name=config,proto3" json:"config,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

HBeat 用于Leader节点复制日志给其他节点,也作为心跳

prevLogIndex和prevLogTerm表示上一次发送的日志的索引和任期,用于保证收到的日志是连续的

func (*HBeat) Descriptor

func (*HBeat) Descriptor() ([]byte, []int)

func (*HBeat) GetConfig

func (m *HBeat) GetConfig() []byte

func (*HBeat) GetLeaderId

func (m *HBeat) GetLeaderId() string

func (*HBeat) GetTerm

func (m *HBeat) GetTerm() int32

func (*HBeat) GetVersion

func (m *HBeat) GetVersion() int32

func (*HBeat) ProtoMessage

func (*HBeat) ProtoMessage()

func (*HBeat) Reset

func (m *HBeat) Reset()

func (*HBeat) String

func (m *HBeat) String() string

func (*HBeat) XXX_DiscardUnknown

func (m *HBeat) XXX_DiscardUnknown()

func (*HBeat) XXX_Marshal

func (m *HBeat) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*HBeat) XXX_Merge

func (m *HBeat) XXX_Merge(src proto.Message)

func (*HBeat) XXX_Size

func (m *HBeat) XXX_Size() int

func (*HBeat) XXX_Unmarshal

func (m *HBeat) XXX_Unmarshal(b []byte) error

type HBeatReturn

// HBeatReturn is the response message of the Heartbeat RPC.
type HBeatReturn struct {
	// Current term, used by the leader to update its own term (more precisely:
	// if this value is greater than the leader's own term, the leader must
	// update its term).
	Term int32 `protobuf:"varint,1,opt,name=term,proto3" json:"term,omitempty"`
	// True if the follower matched prevLogIndex and prevLogTerm.
	// NOTE(review): the HBeat request has no prevLogIndex/prevLogTerm fields,
	// so this wording may be stale — confirm what Success actually reports.
	Success              bool     `protobuf:"varint,2,opt,name=success,proto3" json:"success,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

HBeatReturn 接收者实现逻辑

返回false,如果收到的任期比当前任期小

返回false,如果不包含之前的日志条目(没有匹配prevLogIndex和prevLogTerm)

如果存在index相同但是term不相同的日志,删除从该位置开始所有的日志

追加所有不存在的日志

如果leaderCommit>commitIndex,将commitIndex设置为commitIndex = min(leaderCommit, index of last new entry)

func (*HBeatReturn) Descriptor

func (*HBeatReturn) Descriptor() ([]byte, []int)

func (*HBeatReturn) GetSuccess

func (m *HBeatReturn) GetSuccess() bool

func (*HBeatReturn) GetTerm

func (m *HBeatReturn) GetTerm() int32

func (*HBeatReturn) ProtoMessage

func (*HBeatReturn) ProtoMessage()

func (*HBeatReturn) Reset

func (m *HBeatReturn) Reset()

func (*HBeatReturn) String

func (m *HBeatReturn) String() string

func (*HBeatReturn) XXX_DiscardUnknown

func (m *HBeatReturn) XXX_DiscardUnknown()

func (*HBeatReturn) XXX_Marshal

func (m *HBeatReturn) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*HBeatReturn) XXX_Merge

func (m *HBeatReturn) XXX_Merge(src proto.Message)

func (*HBeatReturn) XXX_Size

func (m *HBeatReturn) XXX_Size() int

func (*HBeatReturn) XXX_Unmarshal

func (m *HBeatReturn) XXX_Unmarshal(b []byte) error

type Node

// Node holds the identity and address of a single node.
type Node struct {
	// Node ID.
	Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
	// Node address.
	Url                  string   `protobuf:"bytes,2,opt,name=url,proto3" json:"url,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

Node 节点信息

func (*Node) Descriptor

func (*Node) Descriptor() ([]byte, []int)

func (*Node) GetId

func (m *Node) GetId() string

func (*Node) GetUrl

func (m *Node) GetUrl() string

func (*Node) ProtoMessage

func (*Node) ProtoMessage()

func (*Node) Reset

func (m *Node) Reset()

func (*Node) String

func (m *Node) String() string

func (*Node) XXX_DiscardUnknown

func (m *Node) XXX_DiscardUnknown()

func (*Node) XXX_Marshal

func (m *Node) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Node) XXX_Merge

func (m *Node) XXX_Merge(src proto.Message)

func (*Node) XXX_Size

func (m *Node) XXX_Size() int

func (*Node) XXX_Unmarshal

func (m *Node) XXX_Unmarshal(b []byte) error

type Raft

// Raft accepts the content that clients submit for synchronization, wrapped
// in custom methods. It also returns the synchronization result the client
// expects and the information synchronized from other nodes.
type Raft struct {
	// contains filtered or unexported fields
}

Raft 接收客户端提交的同步内容,被封装在自定义的方法中

也返回客户端期望的同步结果及从其他节点同步过来的信息

type RaftClient

// RaftClient is the client API for the Raft service.
type RaftClient interface {
	// Heartbeat sends a heartbeat.
	Heartbeat(ctx context.Context, in *HBeat, opts ...grpc.CallOption) (*HBeatReturn, error)
	// RequestVote starts an election and asks for votes.
	RequestVote(ctx context.Context, in *ReqVote, opts ...grpc.CallOption) (*ReqVoteReturn, error)
}

RaftClient is the client API for Raft service.

For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.

func NewRaftClient

func NewRaftClient(cc *grpc.ClientConn) RaftClient

type RaftServer

// RaftServer is the server API for the Raft service.
type RaftServer interface {
	// Heartbeat sends a heartbeat.
	Heartbeat(context.Context, *HBeat) (*HBeatReturn, error)
	// RequestVote starts an election and asks for votes.
	RequestVote(context.Context, *ReqVote) (*ReqVoteReturn, error)
}

RaftServer is the server API for Raft service.

type ReqVote

// ReqVote is the request message of the RequestVote RPC, sent by a candidate
// to ask for votes.
type ReqVote struct {
	// Candidate's term.
	Term int32 `protobuf:"varint,1,opt,name=term,proto3" json:"term,omitempty"`
	// Candidate's ID.
	CandidateId string `protobuf:"bytes,2,opt,name=candidateId,proto3" json:"candidateId,omitempty"`
	// Candidate's URL.
	Url string `protobuf:"bytes,3,opt,name=url,proto3" json:"url,omitempty"`
	// ID of the last leader node known to the candidate.
	LastLeaderId string `protobuf:"bytes,4,opt,name=lastLeaderId,proto3" json:"lastLeaderId,omitempty"`
	// Version of the candidate's last log entry.
	LastVersion int32 `protobuf:"varint,5,opt,name=lastVersion,proto3" json:"lastVersion,omitempty"`
	// Term of the candidate's last log entry.
	LastTerm int32 `protobuf:"varint,6,opt,name=lastTerm,proto3" json:"lastTerm,omitempty"`
	// Timestamp in nanoseconds.
	Timestamp            int64    `protobuf:"varint,7,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

ReqVote 用于Candidate获取选票

func (*ReqVote) Descriptor

func (*ReqVote) Descriptor() ([]byte, []int)

func (*ReqVote) GetCandidateId

func (m *ReqVote) GetCandidateId() string

func (*ReqVote) GetLastLeaderId

func (m *ReqVote) GetLastLeaderId() string

func (*ReqVote) GetLastTerm

func (m *ReqVote) GetLastTerm() int32

func (*ReqVote) GetLastVersion

func (m *ReqVote) GetLastVersion() int32

func (*ReqVote) GetTerm

func (m *ReqVote) GetTerm() int32

func (*ReqVote) GetTimestamp

func (m *ReqVote) GetTimestamp() int64

func (*ReqVote) GetUrl

func (m *ReqVote) GetUrl() string

func (*ReqVote) ProtoMessage

func (*ReqVote) ProtoMessage()

func (*ReqVote) Reset

func (m *ReqVote) Reset()

func (*ReqVote) String

func (m *ReqVote) String() string

func (*ReqVote) XXX_DiscardUnknown

func (m *ReqVote) XXX_DiscardUnknown()

func (*ReqVote) XXX_Marshal

func (m *ReqVote) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ReqVote) XXX_Merge

func (m *ReqVote) XXX_Merge(src proto.Message)

func (*ReqVote) XXX_Size

func (m *ReqVote) XXX_Size() int

func (*ReqVote) XXX_Unmarshal

func (m *ReqVote) XXX_Unmarshal(b []byte) error

type ReqVoteReturn

// ReqVoteReturn is the response message of the RequestVote RPC.
type ReqVoteReturn struct {
	// Current term, used by the candidate to update its own term.
	Term int32 `protobuf:"varint,1,opt,name=term,proto3" json:"term,omitempty"`
	// True means the vote is granted to the candidate.
	VoteGranted          bool     `protobuf:"varint,2,opt,name=voteGranted,proto3" json:"voteGranted,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

接收者的实现逻辑

返回false,如果收到的任期比当前任期小

如果本地状态中votedFor为null或者candidateId,且candidate的日志等于或多于(按照index判断)接收者的日志,则接收者投票给candidate,即返回true

func (*ReqVoteReturn) Descriptor

func (*ReqVoteReturn) Descriptor() ([]byte, []int)

func (*ReqVoteReturn) GetTerm

func (m *ReqVoteReturn) GetTerm() int32

func (*ReqVoteReturn) GetVoteGranted

func (m *ReqVoteReturn) GetVoteGranted() bool

func (*ReqVoteReturn) ProtoMessage

func (*ReqVoteReturn) ProtoMessage()

func (*ReqVoteReturn) Reset

func (m *ReqVoteReturn) Reset()

func (*ReqVoteReturn) String

func (m *ReqVoteReturn) String() string

func (*ReqVoteReturn) XXX_DiscardUnknown

func (m *ReqVoteReturn) XXX_DiscardUnknown()

func (*ReqVoteReturn) XXX_Marshal

func (m *ReqVoteReturn) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ReqVoteReturn) XXX_Merge

func (m *ReqVoteReturn) XXX_Merge(src proto.Message)

func (*ReqVoteReturn) XXX_Size

func (m *ReqVoteReturn) XXX_Size() int

func (*ReqVoteReturn) XXX_Unmarshal

func (m *ReqVoteReturn) XXX_Unmarshal(b []byte) error

type Server

// Server is a field-less type whose Heartbeat and RequestVote methods have
// the signatures required by the RaftServer interface.
type Server struct{}

func (*Server) Heartbeat

func (s *Server) Heartbeat(_ context.Context, hBeat *HBeat) (hbr *HBeatReturn, err error)

Heartbeat 发送心跳

func (*Server) RequestVote

func (s *Server) RequestVote(_ context.Context, rv *ReqVote) (rvr *ReqVoteReturn, err error)

RequestVote 发起选举,索要选票

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL