Documentation ¶
Index ¶
- func ReadShardFromFile(dir string, replicaId uint64) (*gossip.RaftShardMessage, error)
- func WriteMetadataToFile(dir string, id uint16, meta *RaftMetadata) error
- type GRPCConfig
- type MoveTo
- type RaftConfig
- type RaftMetadata
- type RaftOpType
- type StateMachine
- func (r *StateMachine) Close() error
- func (r *StateMachine) Lookup(query interface{}) (interface{}, error)
- func (r *StateMachine) Open(stopChan <-chan struct{}) (uint64, error)
- func (r *StateMachine) PrepareSnapshot() (interface{}, error)
- func (r *StateMachine) RecoverFromSnapshot(reader io.Reader, stopChan <-chan struct{}) error
- func (r *StateMachine) SaveSnapshot(snapshot interface{}, writer io.Writer, stopChan <-chan struct{}) error
- func (r *StateMachine) Sync() error
- func (r *StateMachine) Update(entries []sm.Entry) ([]sm.Entry, error)
- type Storage
- func (s *Storage) AddRaftNode(replicaId uint64, target string, shardIds []uint64) error
- func (s *Storage) AddRaftObserver(replicaId uint64, target string, shardIds []uint64) error
- func (s *Storage) ChangeRaftNodeShardIds(add bool, shardIds []uint64) error
- func (s *Storage) Del(cfName string, hashKey, key []byte) error
- func (s *Storage) DelPrefix(cfName string, hashKey, prefix []byte) error
- func (s *Storage) Get(cfName string, hashKey []byte, linearizable bool, key []byte) (uint64, []byte, error)
- func (s *Storage) GetAliveInstances() map[string]bool
- func (s *Storage) GetLocalMembership() map[uint64]*gossip.MemberInfo
- func (s *Storage) GetNodeHost() []string
- func (s *Storage) GetRaftMembership() ([]*gossip.MemberInfo, error)
- func (s *Storage) GetReplicaId() uint64
- func (s *Storage) GetShardMessage() *gossip.RaftShardMessage
- func (s *Storage) GetTarget() string
- func (s *Storage) Put(cfName string, hashKey []byte, key, val []byte) (uint64, error)
- func (s *Storage) RaftReady() error
- func (s *Storage) RemoveRaftNode(replicaId uint64, shardIds []uint64) error
- func (s *Storage) Search(cfName string, hashKey []byte, linearizable bool, prefix []byte) ([][]byte, error)
- func (s *Storage) StopRaftNode()
- func (s *Storage) TryLock(lockTimeout uint64, cfName string, key []byte) (bool, error)
- func (s *Storage) TryUnLock(cfName string, key []byte) (bool, error)
- func (s *Storage) UpdateShardMessage(shard *gossip.RaftShardMessage)
- func (s *Storage) WriteShardToFile(shard *gossip.RaftShardMessage) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ReadShardFromFile ¶
func ReadShardFromFile(dir string, replicaId uint64) (*gossip.RaftShardMessage, error)
ReadShardFromFile 集群启动成功后,外部管理Storage的程序需要调用此函数读取本地文件中的shard信息
func WriteMetadataToFile ¶
func WriteMetadataToFile(dir string, id uint16, meta *RaftMetadata) error
WriteMetadataToFile 存储raft启动的meta数据
Types ¶
type GRPCConfig ¶
type MoveTo ¶
type MoveTo struct { pb.UnimplementedMoveToServer // contains filtered or unexported fields }
func (*MoveTo) MoveToInvoke ¶
func (m *MoveTo) MoveToInvoke(ctx context.Context, req *pb.MoveToCommand) (*pb.MoveToResponse, error)
func (*MoveTo) RaftNodeInvoke ¶
func (m *MoveTo) RaftNodeInvoke(ctx context.Context, req *pb.RaftInvokeOp) (*pb.MoveToResponse, error)
type RaftConfig ¶
type RaftConfig struct { LogDir string LogLevel zapcore.Level // 本机IP地址 HostIP string // ReplicaId一旦生成不能变动 ReplicaId uint64 // 该节点被分配的shardIds ShardIds []uint64 // raft通信地址 RaftAddr string // 用于moveTo命令时对方raft节点的grpc端口 GrpcPort uint16 // raft shard的分组个数, 用于hashKey计算shardId MultiGroupSize uint32 // 数据存储地址 StorageDir string // 是否以join的方式加入raft集群 Join map[uint64]map[uint64]bool // 此参数需注意的是: // 采用gossip方式启动时val是nodehostId, 详细参考dragonboat的文档, // 初次可以使用dragonboat id.NewNodeHostID(id uint64)来生成; // 若采用raftAddr固定不变的方式启动,val就是raftAddr // 外层key: shardId, 内层key: replicaId InitialMembers map[uint64]map[uint64]string // 如果raft集群采用gossip可变IP的方式启动需设置 Gossip bool GossipPort uint16 GossipSeeds []string // dragonboat 是否开启metrics Metrics bool // 自己内部的gossip配置 GossipConfig gossip.GossipConfig }
type RaftMetadata ¶
type RaftMetadata struct { // raft shard的分组个数, 用于hashKey计算shardId MultiGroupSize uint32 `json:"multiGroupSize"` // 本地的节点ID ReplicaId uint64 `json:"replicaId"` // raft节点的IP地址 // 启动前需校验机器IP是否发生变化 LocalIP string `json:"localIP"` // raft的通信端口号 RaftPort uint16 `json:"raftPort"` // moveTo grpc端口号 GrpcPort uint16 `json:"grpcPort"` // 是否采用gossip方式启动 Gossip bool `json:"gossip"` // 节点shardId分配的版本号 Revision int64 `json:"revision"` // 如果raft集群采用gossip可变IP的方式启动需设置 GossipPort uint16 `json:"gossipPort"` GossipSeeds []string `json:"gossipSeeds"` // dragonboat 是否开启metrics Metrics bool `json:"metrics"` // 自己内部的gossip配置 GossipConfig gossip.GossipConfig `json:"gossipConfig"` }
func ReadMetadataFromFile ¶
func ReadMetadataFromFile(dir string, id uint16) (*RaftMetadata, error)
ReadMetadataFromFile 读取raft启动的meta数据
type StateMachine ¶
type StateMachine struct { ShardId uint64 ReplicaID uint64 // contains filtered or unexported fields }
func (*StateMachine) Close ¶
func (r *StateMachine) Close() error
func (*StateMachine) Lookup ¶
func (r *StateMachine) Lookup(query interface{}) (interface{}, error)
func (*StateMachine) Open ¶
func (r *StateMachine) Open(stopChan <-chan struct{}) (uint64, error)
func (*StateMachine) PrepareSnapshot ¶
func (r *StateMachine) PrepareSnapshot() (interface{}, error)
func (*StateMachine) RecoverFromSnapshot ¶
func (r *StateMachine) RecoverFromSnapshot(reader io.Reader, stopChan <-chan struct{}) error
func (*StateMachine) SaveSnapshot ¶
func (r *StateMachine) SaveSnapshot(snapshot interface{}, writer io.Writer, stopChan <-chan struct{}) error
func (*StateMachine) Sync ¶
func (r *StateMachine) Sync() error
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
func NewStorage ¶
func NewStorage(ServerGrpcAddr, metricsAddr string, cfg *RaftConfig) (*Storage, error)
func (*Storage) AddRaftNode ¶
AddRaftNode 添加raft节点,可用于机器的新增或已有机器新增shardIds
func (*Storage) AddRaftObserver ¶
AddRaftObserver 添加raft observer节点,可用于机器的新增或已有机器新增shardIds
func (*Storage) ChangeRaftNodeShardIds ¶
func (*Storage) GetAliveInstances ¶
func (*Storage) GetLocalMembership ¶
func (s *Storage) GetLocalMembership() map[uint64]*gossip.MemberInfo
func (*Storage) GetNodeHost ¶
func (*Storage) GetRaftMembership ¶
func (s *Storage) GetRaftMembership() ([]*gossip.MemberInfo, error)
func (*Storage) GetReplicaId ¶
func (*Storage) GetShardMessage ¶
func (s *Storage) GetShardMessage() *gossip.RaftShardMessage
func (*Storage) RemoveRaftNode ¶
RemoveRaftNode 下线当前Node所在的部分shardIds
func (*Storage) StopRaftNode ¶
func (s *Storage) StopRaftNode()
func (*Storage) UpdateShardMessage ¶
func (s *Storage) UpdateShardMessage(shard *gossip.RaftShardMessage)
UpdateShardMessage 更新本地保存的集群shard信息
func (*Storage) WriteShardToFile ¶
func (s *Storage) WriteShardToFile(shard *gossip.RaftShardMessage) error
WriteShardToFile 将shard信息写入文件;更新集群后需要调用以持久化
Source Files ¶
Click to show internal directories.
Click to hide internal directories.