raftnode

package
v1.11.0
Warning

This package is not in the latest version of its module.

Published: Dec 19, 2024 License: AGPL-3.0 Imports: 28 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	ErrConsistentRead = errors.New("consistent read failed")
	ErrLagBehind      = errors.New("replica has fallen too far behind")
	ErrAborted        = errors.New("aborted")
)

Functions

func IsRaftLeadershipError

func IsRaftLeadershipError(err error) bool

func RaftLeaderFromStatusDetails

func RaftLeaderFromStatusDetails(err error) (*raftnodepb.RaftNode, bool)

func WithRaftLeaderStatusDetails

func WithRaftLeaderStatusDetails(err error, raft RaftLeaderLocator) error

Types

type Config

type Config struct {
	Dir string `yaml:"dir"`

	BootstrapPeers       []string `yaml:"bootstrap_peers"`
	BootstrapExpectPeers int      `yaml:"bootstrap_expect_peers"`

	ServerID         string `yaml:"server_id"`
	BindAddress      string `yaml:"bind_address"`
	AdvertiseAddress string `yaml:"advertise_address"`

	ApplyTimeout          time.Duration `yaml:"apply_timeout" doc:"hidden"`
	LogIndexCheckInterval time.Duration `yaml:"log_index_check_interval" doc:"hidden"`
	ReadIndexMaxDistance  uint64        `yaml:"read_index_max_distance" doc:"hidden"`

	WALCacheEntries       uint64        `yaml:"wal_cache_entries" doc:"hidden"`
	TrailingLogs          uint64        `yaml:"trailing_logs" doc:"hidden"`
	SnapshotsRetain       uint64        `yaml:"snapshots_retain" doc:"hidden"`
	SnapshotInterval      time.Duration `yaml:"snapshot_interval" doc:"hidden"`
	SnapshotThreshold     uint64        `yaml:"snapshot_threshold" doc:"hidden"`
	TransportConnPoolSize uint64        `yaml:"transport_conn_pool_size" doc:"hidden"`
	TransportTimeout      time.Duration `yaml:"transport_timeout" doc:"hidden"`
}

func (*Config) RegisterFlagsWithPrefix

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

func (*Config) Validate

func (cfg *Config) Validate() error

type FSM

type FSM[Tx any] interface {
	AppliedIndex() uint64
	Read(func(Tx)) error
}

type Leader

type Leader interface {
	ReadIndex() (ReadIndex, error)
}

type LeaderActivity

type LeaderActivity interface {
	Start()
	Stop()
}

LeaderActivity is started when the node becomes the leader and stopped when it loses leadership. The implementation MUST be idempotent.

type Node

type Node struct {
	// contains filtered or unexported fields
}

func NewNode

func NewNode(
	logger log.Logger,
	config Config,
	reg prometheus.Registerer,
	fsm raft.FSM,
) (_ *Node, err error)

func (*Node) AddNode

func (n *Node) AddNode(request *raftnodepb.AddNodeRequest) (*raftnodepb.AddNodeResponse, error)

func (*Node) AppliedIndex

func (n *Node) AppliedIndex() uint64

func (*Node) DemoteLeader

func (*Node) Init

func (n *Node) Init() (err error)

func (*Node) ListSnapshots

func (n *Node) ListSnapshots() ([]*raft.SnapshotMeta, error)

func (*Node) NodeInfo

func (n *Node) NodeInfo() (*raftnodepb.NodeInfo, error)

func (*Node) PromoteToLeader

func (*Node) Propose

func (n *Node) Propose(t fsm.RaftLogEntryType, m proto.Message) (resp proto.Message, err error)

Propose attempts to apply the given command to the FSM. It returns an error if the node is not the leader.

func (*Node) ReadIndex

func (n *Node) ReadIndex() (ReadIndex, error)

func (*Node) Register

func (n *Node) Register(server *grpc.Server)

func (*Node) RemoveNode

func (*Node) RunOnLeader

func (n *Node) RunOnLeader(a LeaderActivity)

func (*Node) Shutdown

func (n *Node) Shutdown()

func (*Node) TransferLeadership

func (n *Node) TransferLeadership() (err error)

type Observer

type Observer struct {
	// contains filtered or unexported fields
}

func NewRaftStateObserver

func NewRaftStateObserver(logger log.Logger, r *raft.Raft, state *prometheus.GaugeVec) *Observer

func (*Observer) Deregister

func (o *Observer) Deregister()

func (*Observer) RegisterHandler

func (o *Observer) RegisterHandler(h StateHandler)

type RaftLeaderLocator

type RaftLeaderLocator interface {
	LeaderWithID() (raft.ServerAddress, raft.ServerID)
}

type RaftNodeService

type RaftNodeService struct {
	raftnodepb.RaftNodeServiceServer
	// contains filtered or unexported fields
}

func NewRaftNodeService

func NewRaftNodeService(node RaftNode) *RaftNodeService

func (*RaftNodeService) AddNode

func (*RaftNodeService) DemoteLeader

func (*RaftNodeService) NodeInfo

func (*RaftNodeService) ReadIndex

ReadIndex returns the current commit index and verifies leadership.

func (*RaftNodeService) RemoveNode

type ReadIndex

type ReadIndex struct {
	// CommitIndex is the index of the last log entry that was committed by
	// the leader and is guaranteed to be present on all followers.
	CommitIndex uint64
	// Term the leader was in when the entry was committed.
	Term uint64
}

ReadIndex is the lower bound on the state that any query must operate against. It does not, however, guarantee snapshot isolation or provide an upper bound; the upper bound is the applied index of the state machine being queried.

Refer to https://web.stanford.edu/~ouster/cgi-bin/papers/OngaroPhD.pdf, section 6.4, "Processing read-only queries more efficiently".

type StateHandler

type StateHandler interface {
	Observe(raft.RaftState)
}

StateHandler is called every time a Raft state change is observed.

type StateReader

type StateReader[Tx any] struct {
	// contains filtered or unexported fields
}

StateReader represents the read-only state of the replicated state machine. It allows performing read-only transactions on both the leader's and the followers' state machines.

func NewStateReader

func NewStateReader[Tx any](
	leader Leader,
	fsm FSM[Tx],
	checkInterval time.Duration,
	maxDistance uint64,
) *StateReader[Tx]

NewStateReader creates a new interface to query the replicated state. If the provided leader implementation is the local node, the interface implements the Leader Read pattern. Otherwise, it implements the Follower Read pattern.

> This approach is more efficient than committing read-only queries as new
> entries in the log, since it avoids synchronous disk writes. To improve
> efficiency further, the leader can amortize the cost of confirming its
> leadership: it can use a single round of heartbeats for any number of
> read-only queries that it has accumulated.
>
> Followers could also help offload the processing of read-only queries.
> This would improve the system’s read throughput, and it would also
> divert load away from the leader, allowing the leader to process more
> read-write requests. However, these reads would also run the risk of
> returning stale data without additional precautions. For example, a
> partitioned follower might not receive any new log entries from the leader
> for long periods of time, or even if a follower received a heartbeat from
> a leader, that leader might itself be deposed and not yet know it.
> To serve reads safely, the follower could issue a request to the leader
> that just asked for a current readIndex (the leader would execute steps
> 1–3 above); the follower could then execute steps 4 and 5 on its own state
> machine for any number of accumulated read-only queries.

The applied index is checked at the configured interval (checkInterval). If the distance between the read index and the applied index exceeds the configured threshold (maxDistance), the operation fails with ErrLagBehind. Any error returned by the reader is wrapped with ErrConsistentRead.

func (*StateReader[Tx]) ConsistentRead

func (r *StateReader[Tx]) ConsistentRead(ctx context.Context, read func(tx Tx, index ReadIndex)) error

ConsistentRead performs a read-only operation on the state machine, regardless of whether the local node is the leader or a follower.

The transaction passed to the provided function has read-only access to the most up-to-date data, reflecting the updates from all prior successful write operations. If ConsistentRead returns an error, the state is guaranteed not to have been accessed; such errors can and should be retried on another replica.

Currently, each ConsistentRead call requests a new read index from the leader. Such queries could be "pipelined" to minimize communication: obtain the read index once with WaitLeaderCommitIndexApplied, then check the currently applied index each time a transaction is entered. Keep in mind that the FSM state might change at any time (e.g., when it is restored from a snapshot).

It is the caller's responsibility to handle errors encountered while using the provided transaction, such as I/O errors or logical inconsistencies.

func (*StateReader[tx]) WaitLeaderCommitIndexApplied

func (r *StateReader[tx]) WaitLeaderCommitIndexApplied(ctx context.Context) (ReadIndex, error)

WaitLeaderCommitIndexApplied blocks until the local applied index reaches the leader's read index.
