Documentation ¶
Index ¶
- Constants
- Variables
- func EncodeCluster(c ClusterInfo) (data []byte)
- func NewMemFSM() *memFSM
- func NewMemRpc(localAddr string) *memRPC
- func NewMemSnapShot() *memSnapshot
- func ValidateConfig(c *Config) (bool, string)
- type AppendEntriesFuture
- type AppendEntryPipeline
- type AppendEntryRequest
- type AppendEntryResponse
- type ApplyFuture
- type BatchFSM
- type CacheLog
- func (c *CacheLog) DeleteRange(from, to uint64) error
- func (c *CacheLog) FirstIndex() (uint64, error)
- func (c *CacheLog) GetLog(index uint64) (log *LogEntry, err error)
- func (c *CacheLog) GetLogRange(from, to uint64) (logs []*LogEntry, err error)
- func (c *CacheLog) LastIndex() (uint64, error)
- func (c *CacheLog) SetLogs(logs []*LogEntry) error
- type ClusterInfo
- type Config
- type ConfigurationStorage
- type DataBus
- type DefaultPackageParser
- type FSM
- type FastTimeoutRequest
- type FastTimeoutResponse
- type FileSnapshot
- type FileSnapshotSink
- type FileWithSync
- type FsmSnapshot
- type Future
- type IndexFuture
- type InstallSnapshotRequest
- type InstallSnapshotResponse
- type JsonRpcHandler
- type KVStorage
- type KvSchema
- type LogEntry
- type LogFuture
- type LogStore
- type LogType
- type Logger
- type MemoryStore
- func (m *MemoryStore) DeleteRange(min, max uint64) error
- func (m *MemoryStore) FirstIndex() (uint64, error)
- func (m *MemoryStore) Get(key []byte) (val []byte, err error)
- func (m *MemoryStore) GetLog(index uint64) (log *LogEntry, err error)
- func (m *MemoryStore) GetLogRange(from, to uint64) (logs []*LogEntry, err error)
- func (m *MemoryStore) GetUint64(key []byte) (uint64, error)
- func (m *MemoryStore) LastIndex() (uint64, error)
- func (m *MemoryStore) Set(key []byte, val []byte) (err error)
- func (m *MemoryStore) SetLogs(logs []*LogEntry) (err error)
- func (m *MemoryStore) SetUint64(key []byte, val uint64) (err error)
- type NetLayer
- type NetTransport
- func (n *NetTransport) AppendEntries(info *ServerInfo, request *AppendEntryRequest) (*AppendEntryResponse, error)
- func (n *NetTransport) AppendEntryPipeline(info *ServerInfo) (AppendEntryPipeline, error)
- func (n *NetTransport) Close() error
- func (n *NetTransport) CloseConnections()
- func (n *NetTransport) Consumer() <-chan *RPC
- func (n *NetTransport) DecodeAddr(bytes []byte) ServerAddr
- func (n *NetTransport) EncodeAddr(info *ServerInfo) []byte
- func (n *NetTransport) FastTimeout(info *ServerInfo, req *FastTimeoutRequest) (*FastTimeoutResponse, error)
- func (n *NetTransport) InstallSnapShot(info *ServerInfo, request *InstallSnapshotRequest, r io.Reader) (*InstallSnapshotResponse, error)
- func (n *NetTransport) LocalAddr() ServerAddr
- func (n *NetTransport) SetHeartbeatFastPath(cb fastPath)
- func (n *NetTransport) Start()
- func (n *NetTransport) VoteRequest(info *ServerInfo, request *VoteRequest) (*VoteResponse, error)
- type NetWorkTransportConfig
- type OpenSnapshot
- type PackageParser
- type Processor
- type ProcessorProxy
- type RPC
- type RPCHeader
- type Raft
- func (r *Raft) AddServer(peer ServerInfo, prevIndex uint64, timeout time.Duration) IndexFuture
- func (r *Raft) Apply(data []byte, timeout time.Duration) ApplyFuture
- func (r *Raft) Barrier(readIndex uint64, timeout time.Duration) Future[uint64]
- func (r *Raft) BootstrapCluster(configuration ClusterInfo) defaultFuture
- func (r *Raft) Conf() *Config
- func (r *Raft) GetConfiguration() ClusterInfo
- func (s *Raft) GetState() State
- func (r *Raft) LastApplied() uint64
- func (r *Raft) LastContact() time.Time
- func (r *Raft) LatestIndex() uint64
- func (r *Raft) LeaderInfo() (ServerID, ServerAddr)
- func (r *Raft) LeaderTransfer(id ServerID, address ServerAddr, timeout time.Duration) defaultFuture
- func (r *Raft) RaftStats() Future[map[string]interface{}]
- func (r *Raft) ReStoreSnapshot(meta *SnapshotMeta, reader io.ReadCloser) error
- func (r *Raft) ReadIndex(timeout time.Duration) Future[uint64]
- func (r *Raft) ReloadConfig(rc ReloadableConfig) error
- func (r *Raft) RemoveServer(peer ServerInfo, prevIndex uint64, timeout time.Duration) IndexFuture
- func (r *Raft) ShutDown() defaultFuture
- func (r *Raft) SnapShot() Future[OpenSnapshot]
- func (r *Raft) StateCh() <-chan *StateChange
- func (r *Raft) UpdateServer(peer ServerInfo, prevIndex uint64, timeout time.Duration) IndexFuture
- func (r *Raft) VerifyLeader() Future[bool]
- type ReloadableConfig
- type RpcConvert
- type RpcInterface
- type ServerAddr
- type ServerAddrProvider
- type ServerID
- type ServerInfo
- type ServerProcessor
- type SnapShotFutureResp
- type SnapshotMeta
- type SnapshotSink
- type SnapshotStore
- type SnapshotVersion
- type State
- type StateChange
- type Suffrage
- type TcpLayer
- type VoteRequest
- type VoteResponse
- type WithPeers
Constants ¶
const ( RpcVoteRequest rpcType = iota + 1 RpcAppendEntry RpcAppendEntryPipeline RpcInstallSnapshot RpcFastTimeout )
const ( // DefaultTimeoutScale is the default TimeoutScale in a NetworkTransport. DefaultTimeoutScale = 256 * 1024 // 256KB )
Variables ¶
var ( ErrNotExist = errors.New("not exist") ErrPipelineReplicationNotSupported = errors.New("pipeline replication not supported") ErrNotFound = customError{"not found"} ErrNotLeader = errors.New("not leader") ErrCantBootstrap = errors.New("bootstrap only works on new clusters") ErrIllegalConfiguration = errors.New("illegal clusterInfo") ErrShutDown = errors.New("shut down") ErrNotStarted = errors.New("not started") ErrLeadershipTransferInProgress = errors.New("leader ship transfer in progress") // ErrAbortedByRestore is returned when a leader fails to commit a log // entry because it's been superseded by a user snapshot restore. ErrAbortedByRestore = errors.New("snapshot restored while committing log") ErrEnqueueTimeout = errors.New("timed out enqueuing operation") ErrTimeout = errors.New("time out") ErrPipelineShutdown = errors.New("append pipeline closed") ErrNotVoter = errors.New("not voter") ErrLeadershipTransferFail = errors.New("not found transfer peer") ErrLeadershipLost = errors.New("leadership lost") ErrNothingNewToSnapshot = errors.New("nothing new to snapshot") ErrEmptyCommit = errors.New("empty commit") ErrPrevLogNotMatch = errors.New("prev log term not match") )
var ( FutureErrTimeout = errors.New("time out") FutureErrNotLeader = errors.New("not leader") )
var ( ErrKeyIsNil = customError{"key is nil"} ErrValueIsNil = customError{"value is nil"} ErrRange = customError{"from must no bigger than to"} )
var ( KeyCurrentTerm = []byte("CurrentTerm") KeyLastVoteFor = []byte("LastVoteFor") KeyLastVoteTerm = []byte("LastVoteTerm") )
Functions ¶
func EncodeCluster ¶
func EncodeCluster(c ClusterInfo) (data []byte)
func NewMemSnapShot ¶
func NewMemSnapShot() *memSnapshot
func ValidateConfig ¶
Types ¶
type AppendEntriesFuture ¶
type AppendEntriesFuture interface { Future[*AppendEntryResponse] StartAt() time.Time Request() *AppendEntryRequest }
type AppendEntryPipeline ¶
type AppendEntryPipeline interface { AppendEntries(*AppendEntryRequest) (AppendEntriesFuture, error) Consumer() <-chan AppendEntriesFuture Close() error }
type AppendEntryRequest ¶
type AppendEntryRequest struct { *RPCHeader Term uint64 PrevLogIndex uint64 PrevLogTerm uint64 Entries []*LogEntry LeaderCommit uint64 }
AppendEntryRequest 追加日志
type AppendEntryResponse ¶
type ApplyFuture ¶
type ApplyFuture interface { IndexFuture Future[nilRespFuture] }
type CacheLog ¶
type CacheLog struct {
// contains filtered or unexported fields
}
CacheLog 带缓存的 LogStore 用于减少磁盘 IO,只在执行 SetLogs, DeleteRange 时更新 cache 以保证局部性
func (*CacheLog) DeleteRange ¶
func (*CacheLog) FirstIndex ¶
func (*CacheLog) GetLogRange ¶
type ClusterInfo ¶
type ClusterInfo struct {
Servers []ServerInfo
}
func DecodeCluster ¶
func DecodeCluster(data []byte) (c ClusterInfo)
func (*ClusterInfo) Clone ¶
func (c *ClusterInfo) Clone() (copy ClusterInfo)
type Config ¶
type Config struct { ElectionTimeout time.Duration HeartbeatTimeout time.Duration LeaderLeaseTimeout time.Duration ApplyBatch bool MaxAppendEntries uint64 CommitTimeout time.Duration SnapshotInterval time.Duration SnapshotThreshold uint64 TrailingLogs uint64 Logger Logger LocalID string LeadershipCatchUpRounds uint LeadershipLostShutDown bool Debug bool // 开启后使用 expvar 包导出 raft 状态信息,可以使用 /debug/vars 路由进行访问 }
func DefaultConfig ¶
func DefaultConfig() *Config
type ConfigurationStorage ¶
type ConfigurationStorage interface { KVStorage SetConfiguration(index uint64, configuration ClusterInfo) error }
type DefaultPackageParser ¶
type DefaultPackageParser struct{}
type FSM ¶
type FSM interface { Apply(*LogEntry) interface{} ReStore(reader io.ReadCloser) error // 从快照恢复,需要自行实现幂等 Snapshot() (FsmSnapshot, error) }
type FastTimeoutRequest ¶
FastTimeoutRequest 引导 leader 直接超时
type FastTimeoutResponse ¶
type FileSnapshot ¶
type FileSnapshot struct {
// contains filtered or unexported fields
}
func NewFileSnapshot ¶
func NewFileSnapshot(dirPath string, noSync bool, retainCount int) (*FileSnapshot, error)
func (*FileSnapshot) Create ¶
func (f *FileSnapshot) Create(version SnapshotVersion, index, term uint64, configuration ClusterInfo, configurationIndex uint64, rpc RpcInterface) (SnapshotSink, error)
func (*FileSnapshot) List ¶
func (f *FileSnapshot) List() (list []*SnapshotMeta, err error)
func (*FileSnapshot) Open ¶
func (f *FileSnapshot) Open(id string) (*SnapshotMeta, io.ReadCloser, error)
type FileSnapshotSink ¶
type FileSnapshotSink struct {
// contains filtered or unexported fields
}
func (*FileSnapshotSink) Cancel ¶
func (f *FileSnapshotSink) Cancel() error
func (*FileSnapshotSink) Close ¶
func (f *FileSnapshotSink) Close() error
func (*FileSnapshotSink) ID ¶
func (f *FileSnapshotSink) ID() string
type FileWithSync ¶
type FsmSnapshot ¶
type FsmSnapshot interface { Persist(sink SnapshotSink) error Release() }
type IndexFuture ¶
type IndexFuture interface { Index() uint64 // contains filtered or unexported methods }
type InstallSnapshotRequest ¶
type InstallSnapshotRequest struct { *RPCHeader SnapshotMeta *SnapshotMeta Term uint64 }
InstallSnapshotRequest 安装快照
type InstallSnapshotResponse ¶
type JsonRpcHandler ¶
type JsonRpcHandler struct{}
JsonRpcHandler 提供 json 的序列化能力
func (*JsonRpcHandler) Deserialization ¶
func (j *JsonRpcHandler) Deserialization(data []byte, i interface{}) error
func (*JsonRpcHandler) Serialization ¶
func (j *JsonRpcHandler) Serialization(i interface{}) (bytes []byte, err error)
type KVStorage ¶
type KVStorage interface { // Get 用于读取指定 key 对应的值 Get(key []byte) (val []byte, err error) // Set 用于存储键值对 Set(key, val []byte) error // SetUint64 用于存储任期 SetUint64(key []byte, val uint64) error // GetUint64 用于返回任期 GetUint64(key []byte) (uint64, error) }
KVStorage 提供稳定存储的抽象
type LogEntry ¶
type LogEntry struct { Index uint64 // 日志的日志索引 Term uint64 // 创建日志时的任期 Data []byte // 日志内容 Type LogType // 日志类型 CreatedAt time.Time // 创建时间 }
LogEntry 日志条目,必传 Data、Type 字段,Index、Term、CreatedAt 字段会在 applyLog 方法中默认设置
type LogStore ¶
type LogStore interface { // FirstIndex 返回第一个写入的索引,-1 代表没有 FirstIndex() (uint64, error) // LastIndex 返回最后一个写入的索引,-1 代表没有 LastIndex() (uint64, error) // GetLog 返回指定索引处的日志 GetLog(index uint64) (log *LogEntry, err error) // GetLogRange 按指定范围遍历索引,闭区间 GetLogRange(from, to uint64) (log []*LogEntry, err error) // SetLogs 追加日志 SetLogs(logs []*LogEntry) error // DeleteRange 批量删除指定范围的索引内容,用于快照生成 DeleteRange(from, to uint64) error }
LogStore 提供日志操作的抽象
func NewCacheLog ¶
NewCacheLog capacity 必须大于 0
type MemoryStore ¶
type MemoryStore struct {
// contains filtered or unexported fields
}
func NewMemoryStore ¶
func NewMemoryStore() *MemoryStore
func (*MemoryStore) DeleteRange ¶
func (m *MemoryStore) DeleteRange(min, max uint64) error
func (*MemoryStore) FirstIndex ¶
func (m *MemoryStore) FirstIndex() (uint64, error)
func (*MemoryStore) GetLogRange ¶
func (m *MemoryStore) GetLogRange(from, to uint64) (logs []*LogEntry, err error)
func (*MemoryStore) LastIndex ¶
func (m *MemoryStore) LastIndex() (uint64, error)
func (*MemoryStore) SetLogs ¶
func (m *MemoryStore) SetLogs(logs []*LogEntry) (err error)
type NetLayer ¶
type NetLayer interface { net.Listener // Dial is used to create a new outgoing connection Dial(peer ServerAddr, timeout time.Duration) (net.Conn, error) }
NetLayer 网络层抽象
type NetTransport ¶
type NetTransport struct { TimeoutScale int64 // contains filtered or unexported fields }
func NewNetTransport ¶
func NewNetTransport(conf *NetWorkTransportConfig) *NetTransport
func NewTcpTransport ¶
func (*NetTransport) AppendEntries ¶
func (n *NetTransport) AppendEntries(info *ServerInfo, request *AppendEntryRequest) (*AppendEntryResponse, error)
func (*NetTransport) AppendEntryPipeline ¶
func (n *NetTransport) AppendEntryPipeline(info *ServerInfo) (AppendEntryPipeline, error)
func (*NetTransport) Close ¶
func (n *NetTransport) Close() error
func (*NetTransport) CloseConnections ¶
func (n *NetTransport) CloseConnections()
func (*NetTransport) Consumer ¶
func (n *NetTransport) Consumer() <-chan *RPC
func (*NetTransport) DecodeAddr ¶
func (n *NetTransport) DecodeAddr(bytes []byte) ServerAddr
func (*NetTransport) EncodeAddr ¶
func (n *NetTransport) EncodeAddr(info *ServerInfo) []byte
func (*NetTransport) FastTimeout ¶
func (n *NetTransport) FastTimeout(info *ServerInfo, req *FastTimeoutRequest) (*FastTimeoutResponse, error)
func (*NetTransport) InstallSnapShot ¶
func (n *NetTransport) InstallSnapShot(info *ServerInfo, request *InstallSnapshotRequest, r io.Reader) (*InstallSnapshotResponse, error)
func (*NetTransport) LocalAddr ¶
func (n *NetTransport) LocalAddr() ServerAddr
func (*NetTransport) SetHeartbeatFastPath ¶
func (n *NetTransport) SetHeartbeatFastPath(cb fastPath)
func (*NetTransport) Start ¶
func (n *NetTransport) Start()
func (*NetTransport) VoteRequest ¶
func (n *NetTransport) VoteRequest(info *ServerInfo, request *VoteRequest) (*VoteResponse, error)
type NetWorkTransportConfig ¶
type OpenSnapshot ¶
type OpenSnapshot = func() (*SnapshotMeta, io.ReadCloser, error)
OpenSnapshot 用于 API 请求执行完快照后,在需要的时候延迟打开快照
type PackageParser ¶
type ProcessorProxy ¶
type ProcessorProxy struct {
Processor
}
ProcessorProxy 服务器接口 handler 代理,提供将序列化数据解析成接口 struct 指针的功能
func (*ProcessorProxy) Do ¶
func (p *ProcessorProxy) Do(rpcType rpcType, reqBytes interface{}, reader io.Reader) (respBytes interface{}, err error)
func (*ProcessorProxy) SetFastPath ¶
func (d *ProcessorProxy) SetFastPath(cb fastPath)
type RPC ¶
type RPC struct { RpcType rpcType Request any Response chan any Reader io.Reader // 链接读接口,安装快照的时候用 }
RPC rpc 请求的封装
type RPCHeader ¶
type RPCHeader struct { ID ServerID Addr ServerAddr }
type Raft ¶
type Raft struct {
// contains filtered or unexported fields
}
func NewRaft ¶
func NewRaft(conf *Config, fsm FSM, rpc RpcInterface, logStore LogStore, kvStore KVStorage, snapshotStore SnapshotStore) (*Raft, error)
func (*Raft) AddServer ¶
func (r *Raft) AddServer(peer ServerInfo, prevIndex uint64, timeout time.Duration) IndexFuture
func (*Raft) Apply ¶
func (r *Raft) Apply(data []byte, timeout time.Duration) ApplyFuture
Apply 向 raft 提交日志
func (*Raft) BootstrapCluster ¶
func (r *Raft) BootstrapCluster(configuration ClusterInfo) defaultFuture
func (*Raft) GetConfiguration ¶
func (r *Raft) GetConfiguration() ClusterInfo
GetConfiguration 获取集群配置
func (*Raft) LastApplied ¶
func (*Raft) LastContact ¶
func (*Raft) LatestIndex ¶
func (*Raft) LeaderInfo ¶
func (r *Raft) LeaderInfo() (ServerID, ServerAddr)
func (*Raft) LeaderTransfer ¶
func (r *Raft) LeaderTransfer(id ServerID, address ServerAddr, timeout time.Duration) defaultFuture
func (*Raft) ReStoreSnapshot ¶
func (r *Raft) ReStoreSnapshot(meta *SnapshotMeta, reader io.ReadCloser) error
func (*Raft) ReloadConfig ¶
func (r *Raft) ReloadConfig(rc ReloadableConfig) error
func (*Raft) RemoveServer ¶
func (r *Raft) RemoveServer(peer ServerInfo, prevIndex uint64, timeout time.Duration) IndexFuture
func (*Raft) SnapShot ¶
func (r *Raft) SnapShot() Future[OpenSnapshot]
func (*Raft) UpdateServer ¶
func (r *Raft) UpdateServer(peer ServerInfo, prevIndex uint64, timeout time.Duration) IndexFuture
type ReloadableConfig ¶
type RpcConvert ¶
type RpcInterface ¶
type RpcInterface interface { // Consumer 返回一个可消费的 Chan Consumer() <-chan *RPC // VoteRequest 发起投票请求 VoteRequest(*ServerInfo, *VoteRequest) (*VoteResponse, error) // AppendEntries 追加日志 AppendEntries(*ServerInfo, *AppendEntryRequest) (*AppendEntryResponse, error) // AppendEntryPipeline 以 pipe 形式追加日志 AppendEntryPipeline(*ServerInfo) (AppendEntryPipeline, error) // InstallSnapShot 安装快照 InstallSnapShot(*ServerInfo, *InstallSnapshotRequest, io.Reader) (*InstallSnapshotResponse, error) // SetHeartbeatFastPath 用于快速处理,不用经过主流程,不支持也没关系 SetHeartbeatFastPath(cb fastPath) // FastTimeout 快速超时转换为候选人 FastTimeout(*ServerInfo, *FastTimeoutRequest) (*FastTimeoutResponse, error) LocalAddr() ServerAddr EncodeAddr(info *ServerInfo) []byte DecodeAddr([]byte) ServerAddr }
type ServerAddr ¶
type ServerAddr string
type ServerAddrProvider ¶
type ServerAddrProvider interface {
GetAddr(id ServerID) (ServerAddr, error)
}
type ServerInfo ¶
type ServerInfo struct { Suffrage Suffrage Addr ServerAddr ID ServerID }
ServerInfo 节点的地址信息
type ServerProcessor ¶
type ServerProcessor struct {
// contains filtered or unexported fields
}
ServerProcessor 服务器接口 handler ,提供具体的接口处理逻辑
func (*ServerProcessor) Do ¶
func (d *ServerProcessor) Do(typ rpcType, req interface{}, reader io.Reader) (resp interface{}, err error)
Do ServerProcessor 不关心上层协议,所以不用处理第一个参数(rpcType)
func (*ServerProcessor) SetFastPath ¶
func (d *ServerProcessor) SetFastPath(cb fastPath)
type SnapShotFutureResp ¶
type SnapShotFutureResp struct {
// contains filtered or unexported fields
}
type SnapshotMeta ¶
type SnapshotMeta struct { Version SnapshotVersion ID string Index uint64 Term uint64 Configuration ClusterInfo ConfigurationIndex uint64 Size int64 }
SnapshotMeta 快照元信息
type SnapshotSink ¶
type SnapshotSink interface { io.WriteCloser ID() string Cancel() error }
SnapshotSink 快照的抽象提供写入、取消写入、返回快照 ID 的能力
type SnapshotStore ¶
type SnapshotStore interface { Open(id string) (*SnapshotMeta, io.ReadCloser, error) List() ([]*SnapshotMeta, error) Create(version SnapshotVersion, index, term uint64, configuration ClusterInfo, configurationIndex uint64, rpc RpcInterface) (SnapshotSink, error) }
SnapshotStore 快照存储的抽象,提供打开快照,创建快照,查询快照列表的能力
type SnapshotVersion ¶
type SnapshotVersion uint64
SnapshotVersion 表示快照的版本,会在以后的快照结构变更的时候使用
const (
SnapShotVersionDefault SnapshotVersion = iota + 1
)
type StateChange ¶
type StateChange struct {
Before, After State
}
type Suffrage ¶
type Suffrage int // 是否有选举权,枚举: Voter NonVoter
func (*Suffrage) MarshalText ¶
func (*Suffrage) UnmarshalText ¶
type VoteRequest ¶
type VoteRequest struct { *RPCHeader Term uint64 CandidateID ServerID LastLogIndex uint64 LastLogTerm uint64 LeaderTransfer bool }
VoteRequest 投票
type VoteResponse ¶
type WithPeers ¶
type WithPeers interface { Connect(addr ServerAddr, rpc RpcInterface) Disconnect(addr ServerAddr) DisconnectAll() }
Source Files ¶
- api.go
- cache_store.go
- command.go
- config.go
- configuration.go
- file_snapshot.go
- fsm.go
- future.go
- log.go
- main.go
- mem_snapshot.go
- mem_transport.go
- memory_log.go
- men_fsm.go
- net_protocol.go
- net_transport.go
- raft.go
- replication.go
- rpc.go
- rpc_processer.go
- snapshot.go
- state.go
- store.go
- tcp_transport.go
- transport.go
- util.go