Documentation ¶
Index ¶
- Variables
- func IsRaftLeadershipError(err error) bool
- func RaftLeaderFromStatusDetails(err error) (*raftnodepb.RaftNode, bool)
- func WithRaftLeaderStatusDetails(err error, raft RaftLeaderLocator) error
- type Config
- type FSM
- type Leader
- type LeaderActivity
- type Node
- func (n *Node) AddNode(request *raftnodepb.AddNodeRequest) (*raftnodepb.AddNodeResponse, error)
- func (n *Node) AppliedIndex() uint64
- func (n *Node) DemoteLeader(request *raftnodepb.DemoteLeaderRequest) (*raftnodepb.DemoteLeaderResponse, error)
- func (n *Node) Init() (err error)
- func (n *Node) ListSnapshots() ([]*raft.SnapshotMeta, error)
- func (n *Node) NodeInfo() (*raftnodepb.NodeInfo, error)
- func (n *Node) PromoteToLeader(request *raftnodepb.PromoteToLeaderRequest) (*raftnodepb.PromoteToLeaderResponse, error)
- func (n *Node) Propose(t fsm.RaftLogEntryType, m proto.Message) (resp proto.Message, err error)
- func (n *Node) ReadIndex() (ReadIndex, error)
- func (n *Node) Register(server *grpc.Server)
- func (n *Node) RemoveNode(request *raftnodepb.RemoveNodeRequest) (*raftnodepb.RemoveNodeResponse, error)
- func (n *Node) RunOnLeader(a LeaderActivity)
- func (n *Node) Shutdown()
- func (n *Node) TransferLeadership() (err error)
- type Observer
- type RaftLeaderLocator
- type RaftNode
- type RaftNodeService
- func (svc *RaftNodeService) AddNode(_ context.Context, r *raftnodepb.AddNodeRequest) (*raftnodepb.AddNodeResponse, error)
- func (svc *RaftNodeService) DemoteLeader(_ context.Context, r *raftnodepb.DemoteLeaderRequest) (*raftnodepb.DemoteLeaderResponse, error)
- func (svc *RaftNodeService) NodeInfo(context.Context, *raftnodepb.NodeInfoRequest) (*raftnodepb.NodeInfoResponse, error)
- func (svc *RaftNodeService) PromoteToLeader(_ context.Context, r *raftnodepb.PromoteToLeaderRequest) (*raftnodepb.PromoteToLeaderResponse, error)
- func (svc *RaftNodeService) ReadIndex(context.Context, *raftnodepb.ReadIndexRequest) (*raftnodepb.ReadIndexResponse, error)
- func (svc *RaftNodeService) RemoveNode(_ context.Context, r *raftnodepb.RemoveNodeRequest) (*raftnodepb.RemoveNodeResponse, error)
- type ReadIndex
- type StateHandler
- type StateReader
Constants ¶
This section is empty.
Variables ¶
Functions ¶
func IsRaftLeadershipError ¶
func RaftLeaderFromStatusDetails ¶
func RaftLeaderFromStatusDetails(err error) (*raftnodepb.RaftNode, bool)
func WithRaftLeaderStatusDetails ¶
func WithRaftLeaderStatusDetails(err error, raft RaftLeaderLocator) error
Types ¶
type Config ¶
type Config struct { Dir string `yaml:"dir"` BootstrapPeers []string `yaml:"bootstrap_peers"` BootstrapExpectPeers int `yaml:"bootstrap_expect_peers"` ServerID string `yaml:"server_id"` BindAddress string `yaml:"bind_address"` AdvertiseAddress string `yaml:"advertise_address"` ApplyTimeout time.Duration `yaml:"apply_timeout" doc:"hidden"` LogIndexCheckInterval time.Duration `yaml:"log_index_check_interval" doc:"hidden"` ReadIndexMaxDistance uint64 `yaml:"read_index_max_distance" doc:"hidden"` WALCacheEntries uint64 `yaml:"wal_cache_entries" doc:"hidden"` TrailingLogs uint64 `yaml:"trailing_logs" doc:"hidden"` SnapshotsRetain uint64 `yaml:"snapshots_retain" doc:"hidden"` SnapshotInterval time.Duration `yaml:"snapshot_interval" doc:"hidden"` SnapshotThreshold uint64 `yaml:"snapshot_threshold" doc:"hidden"` TransportConnPoolSize uint64 `yaml:"transport_conn_pool_size" doc:"hidden"` TransportTimeout time.Duration `yaml:"transport_timeout" doc:"hidden"` }
func (*Config) RegisterFlagsWithPrefix ¶
type LeaderActivity ¶
type LeaderActivity interface { Start() Stop() }
LeaderActivity is started when the node becomes a leader and stopped when it stops being a leader. The implementation MUST be idempotent.
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
func NewNode ¶
func NewNode( logger log.Logger, config Config, reg prometheus.Registerer, fsm raft.FSM, ) (_ *Node, err error)
func (*Node) AddNode ¶
func (n *Node) AddNode(request *raftnodepb.AddNodeRequest) (*raftnodepb.AddNodeResponse, error)
func (*Node) AppliedIndex ¶
func (*Node) DemoteLeader ¶
func (n *Node) DemoteLeader(request *raftnodepb.DemoteLeaderRequest) (*raftnodepb.DemoteLeaderResponse, error)
func (*Node) ListSnapshots ¶
func (n *Node) ListSnapshots() ([]*raft.SnapshotMeta, error)
func (*Node) PromoteToLeader ¶
func (n *Node) PromoteToLeader(request *raftnodepb.PromoteToLeaderRequest) (*raftnodepb.PromoteToLeaderResponse, error)
func (*Node) Propose ¶
Propose makes an attempt to apply the given command to the FSM. The function returns an error if the node is not the leader.
func (*Node) RemoveNode ¶
func (n *Node) RemoveNode(request *raftnodepb.RemoveNodeRequest) (*raftnodepb.RemoveNodeResponse, error)
func (*Node) RunOnLeader ¶
func (n *Node) RunOnLeader(a LeaderActivity)
func (*Node) TransferLeadership ¶
type Observer ¶
type Observer struct {
// contains filtered or unexported fields
}
func NewRaftStateObserver ¶
func (*Observer) Deregister ¶
func (o *Observer) Deregister()
func (*Observer) RegisterHandler ¶
func (o *Observer) RegisterHandler(h StateHandler)
type RaftLeaderLocator ¶
type RaftLeaderLocator interface {
LeaderWithID() (raft.ServerAddress, raft.ServerID)
}
type RaftNode ¶
type RaftNode interface { ReadIndex() (ReadIndex, error) NodeInfo() (*raftnodepb.NodeInfo, error) RemoveNode(request *raftnodepb.RemoveNodeRequest) (*raftnodepb.RemoveNodeResponse, error) AddNode(request *raftnodepb.AddNodeRequest) (*raftnodepb.AddNodeResponse, error) DemoteLeader(request *raftnodepb.DemoteLeaderRequest) (*raftnodepb.DemoteLeaderResponse, error) PromoteToLeader(request *raftnodepb.PromoteToLeaderRequest) (*raftnodepb.PromoteToLeaderResponse, error) }
type RaftNodeService ¶
type RaftNodeService struct { raftnodepb.RaftNodeServiceServer // contains filtered or unexported fields }
func NewRaftNodeService ¶
func NewRaftNodeService(node RaftNode) *RaftNodeService
func (*RaftNodeService) AddNode ¶
func (svc *RaftNodeService) AddNode( _ context.Context, r *raftnodepb.AddNodeRequest, ) (*raftnodepb.AddNodeResponse, error)
func (*RaftNodeService) DemoteLeader ¶
func (svc *RaftNodeService) DemoteLeader( _ context.Context, r *raftnodepb.DemoteLeaderRequest, ) (*raftnodepb.DemoteLeaderResponse, error)
func (*RaftNodeService) NodeInfo ¶
func (svc *RaftNodeService) NodeInfo( context.Context, *raftnodepb.NodeInfoRequest, ) (*raftnodepb.NodeInfoResponse, error)
func (*RaftNodeService) PromoteToLeader ¶
func (svc *RaftNodeService) PromoteToLeader( _ context.Context, r *raftnodepb.PromoteToLeaderRequest, ) (*raftnodepb.PromoteToLeaderResponse, error)
func (*RaftNodeService) ReadIndex ¶
func (svc *RaftNodeService) ReadIndex( context.Context, *raftnodepb.ReadIndexRequest, ) (*raftnodepb.ReadIndexResponse, error)
ReadIndex returns the current commit index and verifies leadership.
func (*RaftNodeService) RemoveNode ¶
func (svc *RaftNodeService) RemoveNode( _ context.Context, r *raftnodepb.RemoveNodeRequest, ) (*raftnodepb.RemoveNodeResponse, error)
type ReadIndex ¶
type ReadIndex struct { // CommitIndex is the index of the last log entry that was committed by // the leader and is guaranteed to be present on all followers. CommitIndex uint64 // Term the leader was in when the entry was committed. Term uint64 }
ReadIndex is the lower bound for the state any query must operate against. However, it does not guarantee snapshot isolation or an upper bound (which is the applied index of the state machine being queried).
Refer to https://web.stanford.edu/~ouster/cgi-bin/papers/OngaroPhD.pdf, paragraph 6.4, "Processing read-only queries more efficiently".
type StateHandler ¶
StateHandler is called every time a Raft state change is observed.
type StateReader ¶
type StateReader[Tx any] struct { // contains filtered or unexported fields }
StateReader represents the read-only state of the replicated state machine. It allows performing read-only transactions on the leader's and follower's state machines.
func NewStateReader ¶
func NewStateReader[Tx any]( leader Leader, fsm FSM[Tx], checkInterval time.Duration, maxDistance uint64, ) *StateReader[Tx]
NewStateReader creates a new interface to query the replicated state. If the provided leader implementation is the local node, the interface implements the Leader Read pattern. Otherwise, it implements the Follower Read pattern.
> This approach is more efficient than committing read-only queries as new > entries in the log, since it avoids synchronous disk writes. To improve > efficiency further, the leader can amortize the cost of confirming its > leadership: it can use a single round of heartbeats for any number of > read-only queries that it has accumulated. > > Followers could also help offload the processing of read-only queries. > This would improve the system’s read throughput, and it would also > divert load away from the leader, allowing the leader to process more > read-write requests. However, these reads would also run the risk of > returning stale data without additional precautions. For example, a > partitioned follower might not receive any new log entries from the leader > for long periods of time, or even if a follower received a heartbeat from > a leader, that leader might itself be deposed and not yet know it. > To serve reads safely, the follower could issue a request to the leader > that just asked for a current readIndex (the leader would execute steps > 1–3 above); the follower could then execute steps 4 and 5 on its own state > machine for any number of accumulated read-only queries.
The applied index is checked on the configured interval. If the distance between the read index and the applied index exceeds the configured threshold, the operation fails with ErrLagBehind. Any error returned by the reader is wrapped with ErrConsistentRead.
func (*StateReader[Tx]) ConsistentRead ¶
func (r *StateReader[Tx]) ConsistentRead(ctx context.Context, read func(tx Tx, index ReadIndex)) error
ConsistentRead performs a read-only operation on the state machine, whether it's a leader or a follower.
The transaction passed to the provided function has read-only access to the most up-to-date data, reflecting the updates from all prior write operations that were successful. If the function returns an error, it's guaranteed that the state has not been accessed. These errors can and should be retried on another replica.
Currently, each ConsistentRead requests a new read index from the leader. It's possible to "pipeline" such queries to minimize communication by obtaining the applied index with WaitLeaderCommitIndexApplied and checking the currently applied index each time a transaction is entered. Take into account that the FSM state might be changed at any time (e.g., restored from a snapshot).
It's the caller's responsibility to handle errors encountered while using the provided transaction, such as I/O errors or logical inconsistencies.
func (*StateReader[tx]) WaitLeaderCommitIndexApplied ¶
func (r *StateReader[tx]) WaitLeaderCommitIndexApplied(ctx context.Context) (ReadIndex, error)
WaitLeaderCommitIndexApplied blocks until the local applied index reaches the leader read index.