Documentation ¶
Index ¶
- Constants
- Variables
- func LogLevel() int
- func MakeHTTPClient(timeout time.Duration) *http.Client
- func RegisterCommand(command Command)
- func SetLogLevel(level int)
- type AppendEntriesRequest
- type AppendEntriesResponse
- func (aer *AppendEntriesResponse) CommitIndex() uint64
- func (resp *AppendEntriesResponse) Decode(r io.Reader) (int, error)
- func (resp *AppendEntriesResponse) Encode(w io.Writer) (int, error)
- func (aer *AppendEntriesResponse) Index() uint64
- func (aer *AppendEntriesResponse) Success() bool
- func (aer *AppendEntriesResponse) Term() uint64
- type Command
- type CommandApply
- type CommandEncoder
- type Config
- type Context
- type DefaultJoinCommand
- type DefaultLeaveCommand
- type Event
- type EventListener
- type HTTPMuxer
- type HTTPTransporter
- func (t *HTTPTransporter) AppendEntriesPath() string
- func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer)
- func (t *HTTPTransporter) Prefix() string
- func (t *HTTPTransporter) RequestVotePath() string
- func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
- func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
- func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
- func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
- func (t *HTTPTransporter) SnapshotPath() string
- func (t *HTTPTransporter) SnapshotRecoveryPath() string
- type JoinCommand
- type LeaveCommand
- type Log
- type LogEntry
- type NOPCommand
- type Peer
- type RequestVoteRequest
- type RequestVoteResponse
- type Server
- type Snapshot
- type SnapshotRecoveryRequest
- type SnapshotRecoveryResponse
- type SnapshotRequest
- type SnapshotResponse
- type StateMachine
- type TimeoutConn
- func (c *TimeoutConn) Close() error
- func (c *TimeoutConn) LocalAddr() net.Addr
- func (c *TimeoutConn) Read(b []byte) (n int, err error)
- func (c *TimeoutConn) RemoteAddr() net.Addr
- func (c *TimeoutConn) SetDeadline(t time.Time) error
- func (c *TimeoutConn) SetReadDeadline(t time.Time) error
- func (c *TimeoutConn) SetWriteDeadline(t time.Time) error
- func (c *TimeoutConn) Write(b []byte) (n int, err error)
- type Transporter
Constants ¶
const ( Debug = 1 Trace = 2 )
const ( StateChangeEventType = "stateChange" LeaderChangeEventType = "leaderChange" TermChangeEventType = "termChange" CommitEventType = "commit" AddPeerEventType = "addPeer" RemovePeerEventType = "removePeer" HeartbeatIntervalEventType = "heartbeatInterval" ElectionTimeoutThresholdEventType = "electionTimeoutThreshold" HeartbeatEventType = "heartbeat" )
const ( Stopped = "stopped" Initialized = "initialized" Follower = "follower" Candidate = "candidate" Leader = "leader" Snapshotting = "snapshotting" )
const ( MaxLogEntriesPerRequest = 2000 NumberOfLogEntriesAfterSnapshot = 200 )
const (
// DefaultHeartbeatInterval is the interval that the leader will send
// AppendEntriesRequests to followers to maintain leadership.
// DefaultHeartbeatInterval: leader定期发送心跳给followers维护leader关系的间隔
DefaultHeartbeatInterval = 50 * time.Millisecond
// 默认选举超时,当follower在指定的时间内未收到leader的心跳RPC,则进入candidate状态
DefaultElectionTimeout = 150 * time.Millisecond
)
const ElectionTimeoutThresholdPercent = 0.8
ElectionTimeoutThresholdPercent specifies the threshold at which the server will dispatch warning events that the heartbeat RTT is too close to the election timeout. ElectionTimeoutThresholdPercent 指定了一个阈值:当server的heartbeat的RTT太接近于选举超时时间时,派发一个告警事件。
Variables ¶
var CommandTimeoutError = errors.New("raft: Command timeout")
var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
var NotLeaderError = errors.New("raft.Server: Not current leader")
var StopError = errors.New("raft: Has been stopped")
Functions ¶
func RegisterCommand ¶
func RegisterCommand(command Command)
Registers a command by storing a reference to an instance of it.
func SetLogLevel ¶
func SetLogLevel(level int)
Types ¶
type AppendEntriesRequest ¶
type AppendEntriesRequest struct { Term uint64 PrevLogIndex uint64 PrevLogTerm uint64 CommitIndex uint64 LeaderName string Entries []*protobuf.LogEntry }
The request sent to a server to append entries to the log.
type AppendEntriesResponse ¶
type AppendEntriesResponse struct {
// contains filtered or unexported fields
}
The response returned from a server appending entries to the log.
func (*AppendEntriesResponse) CommitIndex ¶
func (aer *AppendEntriesResponse) CommitIndex() uint64
func (*AppendEntriesResponse) Decode ¶
func (resp *AppendEntriesResponse) Decode(r io.Reader) (int, error)
Decodes the AppendEntriesResponse from a buffer. Returns the number of bytes read and any error that occurs.
func (*AppendEntriesResponse) Encode ¶
func (resp *AppendEntriesResponse) Encode(w io.Writer) (int, error)
Encodes the AppendEntriesResponse to a buffer. Returns the number of bytes written and any error that may have occurred. 将AppendEntries响应编码到一个缓冲区中,返回写入的字节数和可能发生的错误。
func (*AppendEntriesResponse) Index ¶
func (aer *AppendEntriesResponse) Index() uint64
func (*AppendEntriesResponse) Success ¶
func (aer *AppendEntriesResponse) Success() bool
func (*AppendEntriesResponse) Term ¶
func (aer *AppendEntriesResponse) Term() uint64
type Command ¶
type Command interface {
CommandName() string
}
Command represents an action to be taken on the replicated state machine.
type CommandApply ¶
CommandApply represents the interface to apply a command to the server.
type CommandEncoder ¶
type Context ¶
type Context interface { Server() Server CurrentTerm() uint64 CurrentIndex() uint64 CommitIndex() uint64 }
Context represents the current state of the server. It is passed into a command when the command is being applied since the server methods are locked.
type DefaultJoinCommand ¶
type DefaultJoinCommand struct { Name string `json:"name"` ConnectionString string `json:"connectionString"` }
Join command
func (*DefaultJoinCommand) Apply ¶
func (c *DefaultJoinCommand) Apply(server Server) (interface{}, error)
func (*DefaultJoinCommand) CommandName ¶
func (c *DefaultJoinCommand) CommandName() string
The name of the Join command in the log
func (*DefaultJoinCommand) NodeName ¶
func (c *DefaultJoinCommand) NodeName() string
type DefaultLeaveCommand ¶
type DefaultLeaveCommand struct {
Name string `json:"name"`
}
Leave command
func (*DefaultLeaveCommand) Apply ¶
func (c *DefaultLeaveCommand) Apply(server Server) (interface{}, error)
func (*DefaultLeaveCommand) CommandName ¶
func (c *DefaultLeaveCommand) CommandName() string
The name of the Leave command in the log
func (*DefaultLeaveCommand) NodeName ¶
func (c *DefaultLeaveCommand) NodeName() string
type Event ¶
type Event interface { Type() string Source() interface{} Value() interface{} PrevValue() interface{} }
Event represents an action that occurred within the Raft library. Listeners can subscribe to event types by using the Server.AddEventListener() function.
type EventListener ¶
type EventListener func(Event)
EventListener is a function that can receive event notifications.
type HTTPMuxer ¶
type HTTPMuxer interface {
HandleFunc(string, func(http.ResponseWriter, *http.Request))
}
type HTTPTransporter ¶
type HTTPTransporter struct { DisableKeepAlives bool Transport *http.Transport RoundTripper http.RoundTripper // contains filtered or unexported fields }
An HTTPTransporter is a default transport layer used to communicate between multiple servers.
func NewHTTPTransporter ¶
func NewHTTPTransporter(prefix string, timeout time.Duration) *HTTPTransporter
使用指定的路径前缀(例如"/raft")创建一个HTTP transporter。 Creates a new HTTP transporter with the given path prefix.
func (*HTTPTransporter) AppendEntriesPath ¶
func (t *HTTPTransporter) AppendEntriesPath() string
Retrieves the AppendEntries path.
func (*HTTPTransporter) Install ¶
func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer)
将处理各种请求的URL绑定到路由处理器上 Applies Raft routes to an HTTP router for a given server.
func (*HTTPTransporter) Prefix ¶
func (t *HTTPTransporter) Prefix() string
Retrieves the path prefix used by the transporter.
func (*HTTPTransporter) RequestVotePath ¶
func (t *HTTPTransporter) RequestVotePath() string
Retrieves the RequestVote path.
func (*HTTPTransporter) SendAppendEntriesRequest ¶
func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
Sends an AppendEntries RPC to a peer.
func (*HTTPTransporter) SendSnapshotRecoveryRequest ¶
func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
Sends a SnapshotRecoveryRequest RPC to a peer.
func (*HTTPTransporter) SendSnapshotRequest ¶
func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
Sends a SnapshotRequest RPC to a peer.
func (*HTTPTransporter) SendVoteRequest ¶
func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
Sends a RequestVote RPC to a peer.
func (*HTTPTransporter) SnapshotPath ¶
func (t *HTTPTransporter) SnapshotPath() string
Retrieves the Snapshot path.
func (*HTTPTransporter) SnapshotRecoveryPath ¶
func (t *HTTPTransporter) SnapshotRecoveryPath() string
Retrieves the SnapshotRecovery path.
type LeaveCommand ¶
Leave command interface
type Log ¶
type Log struct { ApplyFunc func(*LogEntry, Command) (interface{}, error) // contains filtered or unexported fields }
A log is a collection of log entries that are persisted to durable storage. 日志是日志记录的集合,并且有持久化的保证。
func (*Log) CommitIndex ¶
最后被提交的日志的index The last committed index in the log.
type LogEntry ¶
type LogEntry struct { Position int64 // position in the log file // contains filtered or unexported fields }
A log entry stores a single item in the log.
func (*LogEntry) CommandName ¶
func (*LogEntry) Decode ¶
Decodes the log entry from a buffer. Returns the number of bytes read and any error that occurs. 从缓冲区中读取log记录。返回读取到字节数,以及可能遇到的错误。
type NOPCommand ¶
type NOPCommand struct { }
NOP command
func (NOPCommand) Apply ¶
func (c NOPCommand) Apply(server Server) (interface{}, error)
func (NOPCommand) CommandName ¶
func (c NOPCommand) CommandName() string
The name of the NOP command in the log
type Peer ¶
type Peer struct { Name string `json:"name"` ConnectionString string `json:"connectionString"` sync.RWMutex // contains filtered or unexported fields }
A peer is a reference to another server involved in the consensus protocol.
func (*Peer) LastActivity ¶
LastActivity returns the last time any response was received from the peer.
type RequestVoteRequest ¶
type RequestVoteRequest struct { Term uint64 LastLogIndex uint64 LastLogTerm uint64 CandidateName string // contains filtered or unexported fields }
The request sent to a server to vote for a candidate to become a leader.
type RequestVoteResponse ¶
type RequestVoteResponse struct { Term uint64 VoteGranted bool // contains filtered or unexported fields }
The response returned from a server after a vote for a candidate to become a leader.
type Server ¶
type Server interface { Name() string Context() interface{} StateMachine() StateMachine Leader() string State() string Path() string LogPath() string SnapshotPath(lastIndex uint64, lastTerm uint64) string Term() uint64 CommitIndex() uint64 VotedFor() string MemberCount() int QuorumSize() int IsLogEmpty() bool LogEntries() []*LogEntry LastCommandName() string GetState() string ElectionTimeout() time.Duration SetElectionTimeout(duration time.Duration) HeartbeatInterval() time.Duration SetHeartbeatInterval(duration time.Duration) Transporter() Transporter SetTransporter(t Transporter) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse RequestVote(req *RequestVoteRequest) *RequestVoteResponse RequestSnapshot(req *SnapshotRequest) *SnapshotResponse SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse AddPeer(name string, connectionString string) error RemovePeer(name string) error Peers() map[string]*Peer Init() error Start() error Stop() Running() bool Do(command Command) (interface{}, error) TakeSnapshot() error LoadSnapshot() error AddEventListener(string, EventListener) FlushCommitIndex() }
A server is involved in the consensus protocol and can act as a follower, candidate or a leader.
func NewServer ¶
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, ctx interface{}, connectionString string) (Server, error)
Creates a new server with a log at the given path. transporter must not be nil. stateMachine can be nil if snapshotting and log compaction are to be disabled. ctx can be anything (including nil) and is not used by the raft package except to be returned by Server.Context(). connectionString can be anything.
创建一个新的server,其log位于指定的路径下。 transporter不能为空。 如果log compaction和快照功能被禁止,则stateMachine可以为空。 ctx可以为任何对象(包括nil),并且除了通过Server.Context()返回之外, 不会被raft包使用。 connectionString可以为任意值。
type Snapshot ¶
type Snapshot struct { LastIndex uint64 `json:"lastIndex"` LastTerm uint64 `json:"lastTerm"` // Cluster configuration. Peers []*Peer `json:"peers"` State []byte `json:"state"` Path string `json:"path"` }
Snapshot represents an in-memory representation of the current state of the system.
type SnapshotRecoveryRequest ¶
type SnapshotRecoveryRequest struct { LeaderName string LastIndex uint64 LastTerm uint64 Peers []*Peer State []byte }
The request sent to a server to start from the snapshot.
type SnapshotRecoveryResponse ¶
The response returned from a server after handling a snapshot recovery request.
type SnapshotRequest ¶
The request sent to a server to start from the snapshot.
type SnapshotResponse ¶
type SnapshotResponse struct {
Success bool `json:"success"`
}
The response returned if the follower entered snapshot state
type StateMachine ¶
StateMachine is the interface for allowing the host application to save and recover the state machine. This makes it possible to make snapshots and compact the log.
type TimeoutConn ¶
type TimeoutConn struct {
// contains filtered or unexported fields
}
func NewTimeoutConn ¶
func NewTimeoutConn(conn net.Conn, timeout time.Duration) *TimeoutConn
func (*TimeoutConn) Close ¶
func (c *TimeoutConn) Close() error
func (*TimeoutConn) LocalAddr ¶
func (c *TimeoutConn) LocalAddr() net.Addr
func (*TimeoutConn) RemoteAddr ¶
func (c *TimeoutConn) RemoteAddr() net.Addr
func (*TimeoutConn) SetDeadline ¶
func (c *TimeoutConn) SetDeadline(t time.Time) error
func (*TimeoutConn) SetReadDeadline ¶
func (c *TimeoutConn) SetReadDeadline(t time.Time) error
func (*TimeoutConn) SetWriteDeadline ¶
func (c *TimeoutConn) SetWriteDeadline(t time.Time) error
type Transporter ¶
type Transporter interface { SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse }
Transporter is the interface for allowing the host application to transport requests to other nodes.