raft

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 30, 2021 License: Apache-2.0 Imports: 30 Imported by: 0

README

Quick Start

参数说明

  • httpPort:节点对外向客户端提供API的端口
  • nodeID:节点id,集群中唯一,第一个节点id默认为1
  • host:节点地址ip:port,节点运行地址,也是节点间进行信息通信的地址
  • isCluster:是否开启集群模式,非集群模式下的节点在其他节点选择加入的情况下会自动切换到集群模式
  • peers:peer列表,以逗号(,)分割,非集群模式下可为空
  • keyspeers参数对应的节点id列表,以逗号(,)分割,非集群模式下可为空
  • join:是否选择加入一个新集群,非集群模式下可为空
  • target:目标集群节点地址,join为true时不能为空

IService接口说明

  • // CommitHandler 集群节点commit信息前的处理
    CommitHandler(cmd string, data []byte)  (err error)
    
  • // ProcessHandler 集群节点propose信息前的处理,一般情况下由leader节点调用
    ProcessHandler(command string, propose []byte) (cmd string, data []byte, err error)
    
  • // GetInit 集群初始化时的将service缓存中的信息进行打包处理,只会在节点切换集群模式的时候调用一次
    GetInit() (cmd string, data []byte, err error)
    
  • // ResetSnap 读取快照数据,用于恢复service
    ResetSnap(data []byte) (err error)
    
  • // GetSnapshot 生成快照,用于快照文件的生成
    GetSnapshot() (data []byte, err error)
    
Demo实现
// Service Demo 
type service struct {
   store map[string]string
   mutex       sync.RWMutex
}

// KV 用于数据传输的结构
type KV struct {
   Key string
   Value string
}

// 新建service demo
func Create() *service {
   return &service{
      store: make(map[string]string),
      mutex: sync.RWMutex{},
   }
}

func (s *service) CommitHandler(cmd string, data []byte) (err error) {
   // TODO: process the command
   s.mutex.Lock()
   defer s.mutex.Unlock()
   switch cmd {
   case "set":
      kv := &KV{}
      err = json.Unmarshal(data, kv)
      s.store[kv.Key] = kv.Value
      return err
   case "init":
      return nil
   }
   return nil
}

func (s *service) ProcessHandler(command string, propose []byte) (cmd string, data []byte, err error) {
   // TODO: process the command before sending the message
   kv := &KV{}
   kv.Decode(propose)
   kv.Value = kv.Value + string(time.Now().UnixNano())
   data, err = json.Marshal(kv)
   command = "set"
   return command, data, err
}

func (s *service) GetInit() (cmd string, data []byte, err error) {
   s.mutex.RLock()
   defer s.mutex.RUnlock()
   cmd = "init"
   data, err = json.Marshal(s.store)
   return cmd, data, err
}

func (s *service) ResetSnap(data []byte) error  {
   s.mutex.Lock()
   defer s.mutex.Unlock()
   store := make(map[string]string)
   json.Unmarshal(data, &store)
   s.store = store
   return nil
}

func (s *service) GetSnapshot() ([]byte, error) {
   s.mutex.RLock()
   defer s.mutex.RUnlock()
   return json.Marshal(s.store)
}

func (kv *KV) Encode() ([]byte, error) {
   var buf bytes.Buffer
   if err := gob.NewEncoder(&buf).Encode(kv); err != nil {
      return nil, err
   }
   return buf.Bytes(), nil
}

func (kv *KV) Decode(data []byte) error {
   dec := gob.NewDecoder(bytes.NewBuffer(data))
   if err := dec.Decode(kv); err != nil {
      return err
   }
   return nil
}

开启一个非集群节点

// 基础参数设置
// 客户端接口请求端口
httpPort := 8081
// 第一个节点id默认为1
nodeID := 1
// 非集群模式
isCluster := false
// 当isCluster为false时,join需为false
join := false
// 当前节点地址
host := "http://127.0.0.1:1234"
// peer列表,非集群模式下仅有一个peer仅有节点本身
peers := "http://127.0.0.1:1234"
// peer对应的的id列表
keys := "1"
// 初始化服务,该服务已经实现了IService interface
var s = Create()
var raft = &raftNode{}
// 新建raft节点
raft, err = CreateRaftNode(nodeID, host, s, peers, keys, join, isCluster)
if err != nil {
    log.Fatal(err)
}
client := &RaftClient{
    raft: raft,
}
// 开启客户端http对外服务,主要提供了往集群中添加信息,增加节点,删除节点的对外接口,通过客户端接口可以实现节点间信息的同步
// 开发者也可以结合自己的实际需求开放针对service的其他服务
httpServer := http.NewServeMux()
httpServer.Handle("/raft/api/", client.Handler())
log.Info(fmt.Sprintf("Listen http port %d successfully", httpPort))
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", httpPort), httpServer))
CreateRaftNode函数说明
// CreateRaftNode 初始化节点
// 1、创建非集群节点,isCluster为false,此时peers可为空
// 2、创建集群节点,isCluster为true,若此时peers为空(或仅有节点本身),表示该集群仅有一个节点
// peers也可以是其余集群中的其他节点,表示这是一个多节点集群,此时其他节点也需通过同样的参数配置和方式启动,
// 推荐使用JoinCluster来新建多节点集群节点
// 3、创建加入已知集群的节点,join为true,isCluster为true,此时peers需包括其他节点地址,推荐使用JoinCluster来新建非单点集群节点
// 一般情况下,如果join为true,isCluster也默认为true
func CreateRaftNode(id int, host string, service IService, peers string, keys string, join bool, isCluster bool) (*raftNode, error)

新建节点并加入一个集群

// 客户端接口请求端口
httpPort := 8082
// 选择加入集群
join := true
// 当前节点ip
host := "http://127.0.0.1:2234"
// 目标集群的已知地址,如果target本身为非集群节点,则会先建立集群,再新建本节点
target := "http://127.0.0.1:1234"
// 初始化服务,该服务已经实现了IService interface
var s = Create()
var raft = &raftNode{}
// 新建raft节点,加入一个已知集群
raft, err = JoinCluster(host, target, s)
if err != nil {
    log.Fatal(err)
}
client := &RaftClient{
    raft: raft,
}
httpServer := http.NewServeMux()
httpServer.Handle("/raft/api/", client.Handler())
log.Info(fmt.Sprintf("Listen http port %d successfully", httpPort))
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", httpPort), httpServer))
JoinCluster函数说明
// JoinCluster 新建一个加入已知集群的节点
// 1、应用于新建一个想要加入已知集群的节点,会向已知节点发送请求以获取节点id等新建节点信息
// 已知节点如果还处于非集群模式,会先切换成集群模式
// 2、也可以用于节点crash后的重启处理
func JoinCluster(local string, target string, service IService) (*raftNode, error) 

客户端接口说明

  • /raft/api/set
    • 请求方法:POST
    • 接口描述:往集群中新增信息
    • 请求参数:json类型,开发者结合自己的实际需求规定,demo中模拟了一个kv数据库存储,使用的结构是一个kv结构,因此demo中的参数为想要在节点间同步的key,value
  • /raft/api/addNode
    • 请求方法:POST
    • 接口描述:往集群中新增节点,基于api新增的节点,需要另外通过CreateRaftNode方式新建一个指定id的节点
    • 参数类型:JSON
    • 请求参数:节点Id和节点地址host
  • /raft/api/deleteNode
    • 请求方法:POST
    • 接口描述:删除集群中的已有节点
    • 参数类型:JSON
    • 请求参数:节点Id
  • /raft/api/getPeerList
    • 请求方法:GET
    • 接口描述:获取集群中已有的节点列表

相关流程图说明

往节点中新增信息(即调用/raft/api/set)
  • 节点处于非集群模式

    fig1

  • 节点处于集群模式

    fig2

Documentation

Index

Constants

View Source
const INIT commandType = 1
View Source
const PROPOSE commandType = 2

Variables

View Source
var ErrCompacted = errors.New("requested index is unavailable due to compaction")

ErrCompacted is returned by Storage.Entries/Compact when a requested index is unavailable because it predates the last snapshot.

View Source
var ErrSnapOutOfDate = errors.New("requested index is older than the existing snapshot")

ErrSnapOutOfDate is returned by Storage.CreateSnapshot when a requested index is older than the existing snapshot.

View Source
var ErrSnapshotTemporarilyUnavailable = errors.New("snapshot is temporarily unavailable")

ErrSnapshotTemporarilyUnavailable is returned by the Storage interface when the required snapshot is temporarily unavailable.

View Source
var ErrUnavailable = errors.New("requested entry at index is unavailable")

ErrUnavailable is returned by Storage interface when the requested log entries are unavailable.

Functions

func Create

func Create() *service

func JoinCluster

func JoinCluster(rc *Node, broadCastIP string, broadPort int, address string, protocol string) error

JoinCluster 新建一个加入已知集群的请求 1、应用于新建一个想要加入已知集群的节点,会向已知节点发送请求获取id等新建节点信息 已知节点如果还处于非集群模式,会先切换成集群模式 2、也可以用于节点crash后的重启处理

Types

type GetNodeInfoRequest added in v0.1.0

type GetNodeInfoRequest struct {
	BroadcastIP   string `json:"broadcast_ip"`
	BroadcastPort int    `json:"broadcast_port"`
	Protocol      string `json:"protocol"`
	Target        string `json:"target"`
}

type IRaftSender added in v0.1.0

type IRaftSender interface {
	Send(msg []byte) (interface{}, error)
}

type IRaftService added in v0.1.0

type IRaftService interface {
	IService
}

type IService

type IService interface {
	// CommitHandler 节点commit信息前的处理
	CommitHandler(data []byte) (err error)

	// ProcessHandler 节点propose信息前的处理,leader发起或者未启用集群时使用
	//ProcessHandler(body interface{}) (data []byte, err error)
	// ProcessHandler 转发到leader时的处理
	ProcessDataHandler(body []byte) (object interface{}, data []byte, err error)

	// GetInit 集群初始化时的将service缓存中的信息进行打包处理,只会在切换集群模式的时候调用一次
	GetInit() (data []byte, err error)

	// ResetSnap 读取快照,用于恢复service数据
	ResetSnap(data []byte) (err error)

	// GetSnapshot 生成快照,用于快照文件的生成
	GetSnapshot() (data []byte, err error)

	SetRaft(raft IRaftSender)
}

type JoinRequest added in v0.1.0

type JoinRequest struct {
	NodeID        uint64 `json:"node_id"`
	NodeKey       string `json:"node_key"`
	BroadcastIP   string `json:"broadcast_ip"`
	BroadcastPort int    `json:"broadcast_port"`
	Protocol      string `json:"protocol"`
	Target        string `json:"target"`
}

type JoinResponse added in v0.1.0

type JoinResponse struct {
	*NodeSecret
	Peer map[uint64]*NodeInfo `json:"peer"`
}

type KV

type KV struct {
	Key   string
	Value string
}

KV 用于传输的结构

func (*KV) Decode

func (kv *KV) Decode(data []byte) error

func (*KV) Encode

func (kv *KV) Encode() ([]byte, error)

type MemoryStorage

type MemoryStorage struct {
	// Protects access to all fields. Most methods of MemoryStorage are
	// run on the raft goroutine, but Append() is run on an application
	// goroutine.
	sync.Mutex
	// contains filtered or unexported fields
}

MemoryStorage implements the Storage interface backed by an in-memory array.

func NewMemoryStorage

func NewMemoryStorage() *MemoryStorage

NewMemoryStorage creates an empty MemoryStorage.

func (*MemoryStorage) Append

func (ms *MemoryStorage) Append(entries []pb.Entry) error

Append the new entries to storage. TODO (xiangli): ensure the entries are continuous and entries[0].Index > ms.entries[0].Index

func (*MemoryStorage) ApplySnapshot

func (ms *MemoryStorage) ApplySnapshot(snap pb.Snapshot) error

ApplySnapshot overwrites the contents of this Storage object with those of the given snapshot.

func (*MemoryStorage) Compact

func (ms *MemoryStorage) Compact(compactIndex uint64) error

Compact discards all log entries prior to compactIndex. It is the application's responsibility to not attempt to compact an index greater than raftLog.applied.

func (*MemoryStorage) CreateSnapshot

func (ms *MemoryStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) (pb.Snapshot, error)

CreateSnapshot makes a snapshot which can be retrieved with Snapshot() and can be used to reconstruct the state at that point. If any configuration changes have been made since the last compaction, the result of the last ApplyConfChange must be passed in.

func (*MemoryStorage) Entries

func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)

Entries implements the Storage interface.

func (*MemoryStorage) FirstIndex

func (ms *MemoryStorage) FirstIndex() (uint64, error)

FirstIndex implements the Storage interface.

func (*MemoryStorage) InitialState

func (ms *MemoryStorage) InitialState() (pb.HardState, pb.ConfState, error)

InitialState implements the Storage interface.

func (*MemoryStorage) LastIndex

func (ms *MemoryStorage) LastIndex() (uint64, error)

LastIndex implements the Storage interface.

func (*MemoryStorage) SetHardState

func (ms *MemoryStorage) SetHardState(st pb.HardState) error

SetHardState saves the current HardState.

func (*MemoryStorage) Snapshot

func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error)

Snapshot implements the Storage interface.

func (*MemoryStorage) Term

func (ms *MemoryStorage) Term(i uint64) (uint64, error)

Term implements the Storage interface.

func (*MemoryStorage) UpdateConState

func (ms *MemoryStorage) UpdateConState(cs *pb.ConfState) error

UpdateConState eosc新增,快照更新snapshot的ConfState

type Message

type Message struct {
	Type commandType
	From uint64
	Cmd  string
	Data []byte
}

Message 发送Propose和Init消息结构

func (*Message) Decode

func (m *Message) Decode(data []byte) error

func (*Message) Encode

func (m *Message) Encode() ([]byte, error)

type Node added in v0.1.0

type Node struct {
	// contains filtered or unexported fields
}

raft节点结构

func NewNode added in v0.1.0

func NewNode(service IService) (*Node, error)

NewNode 新建raft节点

func (*Node) AddNode added in v0.1.0

func (rc *Node) AddNode(nodeID uint64, data []byte) error

AddNode 客户端发送增加节点的发送处理

func (*Node) Addr added in v0.1.0

func (rc *Node) Addr() string

func (*Node) DeleteConfigChange added in v0.1.0

func (rc *Node) DeleteConfigChange() error

DeleteConfigChange 客户端发送删除节点的发送处理

func (*Node) GetPeers added in v0.1.0

func (rc *Node) GetPeers() (map[uint64]*NodeInfo, uint64, error)

GetPeers 获取集群的peer列表,供API调用

func (*Node) InitSend added in v0.1.0

func (rc *Node) InitSend() error

func (*Node) IsIDRemoved added in v0.1.0

func (rc *Node) IsIDRemoved(id uint64) bool

func (*Node) IsJoin added in v0.1.0

func (rc *Node) IsJoin() bool

func (*Node) NodeID added in v0.1.0

func (rc *Node) NodeID() uint64

func (*Node) NodeKey added in v0.1.0

func (rc *Node) NodeKey() string

func (*Node) Process added in v0.1.0

func (rc *Node) Process(ctx context.Context, m raftpb.Message) error

func (*Node) ProcessData added in v0.1.0

func (rc *Node) ProcessData(data []byte) error

func (*Node) ProcessInitData added in v0.1.0

func (rc *Node) ProcessInitData(data []byte) error

func (*Node) ReadSnap added in v0.1.0

func (rc *Node) ReadSnap(snapshotter *snap.Snapshotter, init bool) error

ReadSnap 读取快照内容到service

func (*Node) ReportSnapshot added in v0.1.0

func (rc *Node) ReportSnapshot(id uint64, status raft.SnapshotStatus)

func (*Node) ReportUnreachable added in v0.1.0

func (rc *Node) ReportUnreachable(id uint64)

func (*Node) Send added in v0.1.0

func (rc *Node) Send(msg []byte) (interface{}, error)

Send 客户端发送propose请求的处理 由客户端API调用或Leader收到转发后调用 如果是非集群模式(isCluster为false),直接处理(即service.ProcessHandler后直接service.CommitHandler) 如果是集群模式,分两种情况 1、当前节点是leader,经service.ProcessHandler后由node.Propose处理后返回, 后续会由各个节点的node.Ready读取后进行Commit时由service.CommitHandler处理 2、当前节点不是leader,获取当前leader节点地址,转发至leader进行处理(rc.proposeHandler), leader收到请求后经service.ProcessHandler后由node.Propose处理后返回, 后续会由各个节点的node.Ready读取后进行Commit时由service.CommitHandler处理

func (*Node) ServeHTTP added in v0.1.0

func (rc *Node) ServeHTTP(writer http.ResponseWriter, request *http.Request)

func (*Node) Status added in v0.1.0

func (rc *Node) Status() raft.Status

func (*Node) Stop added in v0.1.0

func (rc *Node) Stop()

func (*Node) UpdateHostInfo added in v0.1.0

func (rc *Node) UpdateHostInfo(addr string) error

type NodeInfo added in v0.1.0

type NodeInfo struct {
	*NodeSecret
	BroadcastIP   string `json:"broadcast_ip"`
	BroadcastPort int    `json:"broadcast_port"`
	Addr          string `json:"addr"`
	Protocol      string `json:"protocol"`
}

func (*NodeInfo) Marshal added in v0.1.0

func (n *NodeInfo) Marshal() []byte

type NodeSecret added in v0.1.0

type NodeSecret struct {
	ID  uint64 `json:"id"`
	Key string `json:"key"`
}

type Peers

type Peers struct {
	// contains filtered or unexported fields
}

func NewPeers

func NewPeers() *Peers

func (*Peers) CheckExist

func (p *Peers) CheckExist(host string) (uint64, bool)

CheckExist 判断host对应的ID是否存在

func (*Peers) DeletePeerByID

func (p *Peers) DeletePeerByID(id uint64)

DeletePeerByID 通过ID删除节点

func (*Peers) GetAllPeers

func (p *Peers) GetAllPeers() map[uint64]*NodeInfo

GetAllPeers 获取所有节点列表

func (*Peers) GetPeerByID

func (p *Peers) GetPeerByID(id uint64) (*NodeInfo, bool)

func (*Peers) GetPeerNum

func (p *Peers) GetPeerNum() int

func (*Peers) Index added in v0.1.0

func (p *Peers) Index() uint64

func (*Peers) SetPeer

func (p *Peers) SetPeer(id uint64, value *NodeInfo)

type ProposeMsg

type ProposeMsg struct {
	From uint64 `json:"from"`
	Body []byte `json:"body"`
}

type Response

type Response struct {
	Code string      `json:"code"`
	Msg  string      `json:"msg"`
	Data interface{} `json:"data,omitempty"`
	Err  string      `json:"error,omitempty"`
}

type SnapStore

type SnapStore struct {
	Data              []byte
	Peer              map[types.ID]string
	ConfigChangeCount int
	Id                int
}

SnapStore 用于快照处理的结构

func (*SnapStore) Decode

func (s *SnapStore) Decode(data []byte) error

func (*SnapStore) Encode

func (s *SnapStore) Encode() ([]byte, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL