Documentation ¶
Index ¶
- Constants
- Variables
- type Config
- type NodeManager
- type NodeResolver
- type Partition
- type PartitionConfig
- type PartitionFsm
- type PartitionStatus
- type PeerAddress
- type RaftStore
- type RocksDBStore
- func (rs *RocksDBStore) BatchPut(cmdMap map[string][]byte, isSync bool) error
- func (rs *RocksDBStore) Del(key interface{}, isSync bool) (result interface{}, err error)
- func (rs *RocksDBStore) DeleteKeyAndPutIndex(key string, cmdMap map[string][]byte, isSync bool) error
- func (rs *RocksDBStore) Get(key interface{}) (result interface{}, err error)
- func (rs *RocksDBStore) Iterator(snapshot *gorocksdb.Snapshot) *gorocksdb.Iterator
- func (rs *RocksDBStore) Open(lruCacheSize, writeBufferSize int) error
- func (rs *RocksDBStore) Put(key, value interface{}, isSync bool) (result interface{}, err error)
- func (rs *RocksDBStore) ReleaseSnapshot(snapshot *gorocksdb.Snapshot)
- func (rs *RocksDBStore) Replace(key string, value interface{}, isSync bool) (result interface{}, err error)
- func (rs *RocksDBStore) RocksDBSnapshot() *gorocksdb.Snapshot
- func (rs *RocksDBStore) SeekForPrefix(prefix []byte) (result map[string][]byte, err error)
Constants ¶
const ( DefaultHeartbeatPort = 5901 DefaultReplicaPort = 5902 DefaultNumOfLogsToRetain = 20000 DefaultTickInterval = 300 DefaultElectionTick = 3 )
Constants for network port definition.
Variables ¶
var ( ErrNoSuchNode = errors.New("no such node") ErrIllegalAddress = errors.New("illegal address") ErrUnknownSocketType = errors.New("unknown socket type") )
Error definitions.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { NodeID uint64 // Identity of raft server instance. RaftPath string // Path of raft logs IPAddr string // IP address HeartbeatPort int ReplicaPort int NumOfLogsToRetain uint64 // number of logs to be kept after truncation. The default value is 20000. // TickInterval is the interval of timer which check heartbeat and election timeout. // The default value is 300,unit is millisecond. TickInterval int // ElectionTick is the election timeout. If a follower does not receive any message // from the leader of current term during ElectionTick, it will become candidate and start an election. // ElectionTick must be greater than HeartbeatTick. // We suggest to use ElectionTick = 10 * HeartbeatTick to avoid unnecessary leader switching. // The default value is 1s. ElectionTick int MaxSnapConcurrency int }
Config defines the configuration properties for the raft store.
type NodeManager ¶
type NodeManager interface { // add node address with specified port. AddNodeWithPort(nodeID uint64, addr string, heartbeat int, replicate int) // delete node address information DeleteNode(nodeID uint64) }
NodeManager defines the necessary methods for node address management.
type NodeResolver ¶
type NodeResolver interface { raft.SocketResolver NodeManager }
NodeResolver defines the methods for node address resolving and management. It is extended from SocketResolver and NodeManager.
func NewNodeResolver ¶
func NewNodeResolver() NodeResolver
NewNodeResolver returns a new NodeResolver instance for node address management and resolving.
type Partition ¶
type Partition interface { // Submit submits command data to raft log. Submit(cmd []byte) (resp interface{}, err error) // ChaneMember submits member change event and information to raft log. ChangeMember(changeType proto.ConfChangeType, peer proto.Peer, context []byte) (resp interface{}, err error) // Stop removes the raft partition from raft server and shuts down this partition. Stop() error // Delete stops and deletes the partition. Delete() error // Status returns the current raft status. Status() (status *PartitionStatus) // LeaderTerm returns the current term of leader in the raft group. TODO what is term? LeaderTerm() (leaderID, term uint64) // IsRaftLeader returns true if this node is the leader of the raft group it belongs to. IsRaftLeader() bool // AppliedIndex returns the current index of the applied raft log in the raft store partition. AppliedIndex() uint64 // CommittedIndex returns the current index of the applied raft log in the raft store partition. CommittedIndex() uint64 // Truncate raft log Truncate(index uint64) TryToLeader(nodeID uint64) error IsOfflinePeer() bool }
Partition wraps necessary methods for raft store partition operation. Partition is a shard for multi-raft in RaftSore. RaftStore is based on multi-raft which manages multiple raft replication groups at same time through a single raft server instance and system resource.
type PartitionConfig ¶
type PartitionConfig struct { ID uint64 Applied uint64 Leader uint64 Term uint64 Peers []PeerAddress SM PartitionFsm WalPath string }
PartitionConfig defines the configuration properties for the partitions.
type PartitionFsm ¶
type PartitionFsm = raft.StateMachine
PartitionFsm wraps necessary methods include both FSM implementation and data storage operation for raft store partition. It extends from raft StateMachine and Store.
type PartitionStatus ¶
PartitionStatus is a type alias of raft.Status
type PeerAddress ¶
PeerAddress defines the set of addresses that will be used by the peers.
func (PeerAddress) String ¶ added in v1.4.0
func (p PeerAddress) String() string
type RaftStore ¶
type RaftStore interface { CreatePartition(cfg *PartitionConfig) (Partition, error) Stop() RaftConfig() *raft.Config RaftStatus(raftID uint64) (raftStatus *raft.Status) NodeManager RaftServer() *raft.RaftServer }
RaftStore defines the interface for the raft store.
func NewRaftStore ¶
NewRaftStore returns a new raft store instance.
type RocksDBStore ¶
type RocksDBStore struct {
// contains filtered or unexported fields
}
RocksDBStore is a wrapper of the gorocksdb.DB
func NewRocksDBStore ¶
func NewRocksDBStore(dir string, lruCacheSize, writeBufferSize int) (store *RocksDBStore, err error)
NewRocksDBStore returns a new RocksDB instance.
func (*RocksDBStore) BatchPut ¶
func (rs *RocksDBStore) BatchPut(cmdMap map[string][]byte, isSync bool) error
BatchPut puts the key-value pairs in batch.
func (*RocksDBStore) Del ¶
func (rs *RocksDBStore) Del(key interface{}, isSync bool) (result interface{}, err error)
Del deletes a key-value pair.
func (*RocksDBStore) DeleteKeyAndPutIndex ¶
func (rs *RocksDBStore) DeleteKeyAndPutIndex(key string, cmdMap map[string][]byte, isSync bool) error
DeleteKeyAndPutIndex deletes the key-value pair based on the given key and put other keys in the cmdMap to RocksDB. TODO explain
func (*RocksDBStore) Get ¶
func (rs *RocksDBStore) Get(key interface{}) (result interface{}, err error)
Get returns the value based on the given key.
func (*RocksDBStore) Iterator ¶
func (rs *RocksDBStore) Iterator(snapshot *gorocksdb.Snapshot) *gorocksdb.Iterator
Iterator returns the iterator of the snapshot.
func (*RocksDBStore) Open ¶
func (rs *RocksDBStore) Open(lruCacheSize, writeBufferSize int) error
Open opens the RocksDB instance.
func (*RocksDBStore) Put ¶
func (rs *RocksDBStore) Put(key, value interface{}, isSync bool) (result interface{}, err error)
Put adds a new key-value pair to the RocksDB.
func (*RocksDBStore) ReleaseSnapshot ¶
func (rs *RocksDBStore) ReleaseSnapshot(snapshot *gorocksdb.Snapshot)
ReleaseSnapshot releases the snapshot and its resources.
func (*RocksDBStore) Replace ¶
func (rs *RocksDBStore) Replace(key string, value interface{}, isSync bool) (result interface{}, err error)
Put adds a new key-value pair to the RocksDB.
func (*RocksDBStore) RocksDBSnapshot ¶
func (rs *RocksDBStore) RocksDBSnapshot() *gorocksdb.Snapshot
RocksDBSnapshot returns the RocksDB snapshot.
func (*RocksDBStore) SeekForPrefix ¶
func (rs *RocksDBStore) SeekForPrefix(prefix []byte) (result map[string][]byte, err error)
SeekForPrefix seeks for the place where the prefix is located in the snapshots.