Documentation ¶
Overview ¶
Package raft exports MessagePack codec functionality to serialize log entries, LogStore items, and FSM snaphosts.
Package raft exports a Raft finite state machine (FSM) for DDA state members to make use of replicated state.
Package raft provides a state synchronization binding implementation using the Raft consensus algorithm.
Package raft exports the RaftStore which is an implementation of a LogStore, a StableStore, and a FileSnapshotStore for the HashiCorp Raft library. RaftStore uses the internal DDA implementation of the Pebble storage engine.
Package raft exports a Raft transport based on DDA pub-sub communication.
Index ¶
- Constants
- Variables
- func DecodeMsgPack(b []byte, out any) error
- func DecodeMsgPackFromReader(r io.ReadCloser, out any) error
- func EncodeMsgPack(in any) ([]byte, error)
- type AddVoterRequest
- type AddVoterResponse
- type ApplyRequest
- type ApplyResponse
- type RaftBinding
- func (b *RaftBinding) Close()
- func (b *RaftBinding) Node() *hraft.Raft
- func (b *RaftBinding) NodeId() string
- func (b *RaftBinding) ObserveMembershipChange(ctx context.Context) (<-chan api.MembershipChange, error)
- func (b *RaftBinding) ObserveStateChange(ctx context.Context) (<-chan api.Input, error)
- func (b *RaftBinding) Open(cfg *config.Config, com comapi.Api) error
- func (b *RaftBinding) ProposeInput(ctx context.Context, in *api.Input) error
- type RaftFsm
- func (f *RaftFsm) AddStateChangeObserver(ch chan api.Input) uint64
- func (f *RaftFsm) Apply(entry *hraft.Log) any
- func (f *RaftFsm) RemoveStateChangeObserver(chanId uint64)
- func (f *RaftFsm) Restore(snapshot io.ReadCloser) error
- func (f *RaftFsm) Snapshot() (hraft.FSMSnapshot, error)
- func (f *RaftFsm) State() api.State
- type RaftStore
- func (s *RaftStore) AddMembershipChangeObserver(ch chan api.MembershipChange, raft *hraft.Raft) uint64
- func (s *RaftStore) Close(removeStorage bool) error
- func (s *RaftStore) DeleteRange(min, max uint64) error
- func (s *RaftStore) FirstIndex() (uint64, error)
- func (s *RaftStore) Get(key []byte) ([]byte, error)
- func (s *RaftStore) GetLog(index uint64, log *hraft.Log) error
- func (s *RaftStore) GetUint64(key []byte) (uint64, error)
- func (s *RaftStore) LastIndex() (uint64, error)
- func (s *RaftStore) RemoveMembershipChangeObserver(chanId uint64)
- func (s *RaftStore) Set(key []byte, val []byte) error
- func (s *RaftStore) SetUint64(key []byte, val uint64) error
- func (s *RaftStore) StoreLog(log *hraft.Log) error
- func (s *RaftStore) StoreLogs(logs []*hraft.Log) error
- type RaftTransport
- func (t *RaftTransport) AppendEntries(id hraft.ServerID, target hraft.ServerAddress, ...) error
- func (t *RaftTransport) AppendEntriesPipeline(id hraft.ServerID, target hraft.ServerAddress) (hraft.AppendPipeline, error)
- func (t *RaftTransport) Close() error
- func (t *RaftTransport) Consumer() <-chan hraft.RPC
- func (t *RaftTransport) DecodePeer(buf []byte) hraft.ServerAddress
- func (t *RaftTransport) EncodePeer(id hraft.ServerID, addr hraft.ServerAddress) []byte
- func (t *RaftTransport) InstallSnapshot(id hraft.ServerID, target hraft.ServerAddress, ...) error
- func (t *RaftTransport) LfwAddVoter(ctx context.Context, args *AddVoterRequest, resp *AddVoterResponse) error
- func (t *RaftTransport) LfwApply(ctx context.Context, args *ApplyRequest, resp *ApplyResponse) error
- func (t *RaftTransport) LfwConsumer() <-chan hraft.RPC
- func (t *RaftTransport) LfwRemoveServer(ctx context.Context, args *RemoveServerRequest, resp *RemoveServerResponse) error
- func (t *RaftTransport) LocalAddr() hraft.ServerAddress
- func (t *RaftTransport) RequestVote(id hraft.ServerID, target hraft.ServerAddress, args *hraft.RequestVoteRequest, ...) error
- func (t *RaftTransport) SetHeartbeatHandler(cb func(rpc hraft.RPC))
- func (t *RaftTransport) Timeout() time.Duration
- func (t *RaftTransport) TimeoutNow(id hraft.ServerID, target hraft.ServerAddress, args *hraft.TimeoutNowRequest, ...) error
- type RaftTransportConfig
- type RemoveServerRequest
- type RemoveServerResponse
Constants ¶
const ( // DefaultStartupTimeout is the default timeout when starting up a new Raft // node, either as a leader or as a follower. DefaultStartupTimeout = 10000 * time.Millisecond // DefaultLfwTimeout is the default timeout for leaderforwarded Propose and // GetState remote operation responses. It only applies in situations where // there is no leader. It must not be set too low as Propose operations may // take some time. DefaultLfwTimeout = 20000 * time.Millisecond )
const ( // DefaultRpcTimeout is the default remote operation timeout in the Raft // transport. DefaultRpcTimeout = 1000 * time.Millisecond // DefaultInstallSnapshotTimeoutScale is the default TimeoutScale for // InstallSnapshot operations in the Raft transport. DefaultInstallSnapshotTimeoutScale = 256 * 1024 // 256KB )
Variables ¶
var ( // ErrTransportShutdown is returned when operations on a transport are // invoked after it's been terminated. ErrTransportShutdown = fmt.Errorf("transport shutdown") // ErrPipelineShutdown is returned when the pipeline is closed. ErrPipelineShutdown = fmt.Errorf("append pipeline closed") )
var (
ErrKeyNotFound = errors.New("not found") // corresponds with Hashicorp raft key not found error string
)
Functions ¶
func DecodeMsgPack ¶
DecodeMsgPack decodes from a MessagePack encoded byte slice.
func DecodeMsgPackFromReader ¶
func DecodeMsgPackFromReader(r io.ReadCloser, out any) error
DecodeMsgPackFromReader decodes from a MessagePack encoded reader.
func EncodeMsgPack ¶
EncodeMsgPack returns an encoded MessagePack object as a byte slice.
Types ¶
type AddVoterRequest ¶
type AddVoterRequest struct { ServerId hraft.ServerID ServerAddress hraft.ServerAddress Timeout time.Duration // initial time to wait for AddVoter command to be started on leader }
AddVoterRequest represents a leader forwarded request to add this node as a voting follower. The request responds with an error if the transport's configured rpcTimeout elapses before the corresponding command completes on the leader.
type AddVoterResponse ¶
type AddVoterResponse struct {
Index uint64 // holds the index of the newly applied log entry
}
AddVoterResponse represents a response to a leader forwarded AddVoterRequest.
type ApplyRequest ¶
type ApplyRequest struct { Command []byte Timeout time.Duration // initial time to wait for Apply command to be started on leader }
ApplyRequest represents a leader forwarded request to apply a given log command. The request responds with an error if the transport's configured rpcTimeout elapses before the corresponding command completes on the leader.
type ApplyResponse ¶
type ApplyResponse struct { Index uint64 // holds the index of the newly applied log entry on the leader Response error // nil if operation is successful; an error otherwise }
ApplyResponse represents a response to a leader forwarded ApplyRequest.
type RaftBinding ¶
type RaftBinding struct {
// contains filtered or unexported fields
}
RaftBinding realizes a state synchronization binding for the Raft consensus protocol by implementing the state synchronization API interface api.Api using the HashiCorp Raft library.
func (*RaftBinding) Node ¶
func (b *RaftBinding) Node() *hraft.Raft
Node returns the Raft node (exposed for testing purposes).
func (*RaftBinding) NodeId ¶
func (b *RaftBinding) NodeId() string
NodeId returns the Raft node ID (exposed for testing purposes).
func (*RaftBinding) ObserveMembershipChange ¶
func (b *RaftBinding) ObserveMembershipChange(ctx context.Context) (<-chan api.MembershipChange, error)
ObserveMembershipChange implements the api.Api interface.
func (*RaftBinding) ObserveStateChange ¶
ObserveStateChange implements the api.Api interface.
func (*RaftBinding) ProposeInput ¶
ProposeInput implements the api.Api interface.
type RaftFsm ¶
type RaftFsm struct {
// contains filtered or unexported fields
}
RaftFsm implements the hraft.FSM interface to model replicated state in the form of a key-value dictionary.
func NewRaftFsm ¶
func NewRaftFsm() *RaftFsm
NewRaftFsm creates a new Raft FSM that models replicated state in the form of a key-value dictionary.
func (*RaftFsm) AddStateChangeObserver ¶
AddStateChangeObserver registers the given channel to listen to state changes and returns a channel ID that can be used to deregister the channel later.
The channel should continuously receive data on the channel in a non-blocking manner to prevent blocking send operations.
func (*RaftFsm) Apply ¶
Apply is called once a log entry is committed by a majority of the cluster.
Apply should apply the log to the FSM. Apply must be deterministic and produce the same result on all peers in the cluster.
The returned value is returned to the client as the ApplyFuture.Response. Note that if Apply returns an error, it will be returned by Response, and not by the Error method of ApplyFuture, so it is always important to check Response for errors from the FSM. If the given input operation is applied successfully, ApplyFuture.Response returns nil.
Apply implements the hraft.FSM interface.
func (*RaftFsm) RemoveStateChangeObserver ¶
RemoveStateChangeObserver deregisters the channel with the given channel id.
Note that the channel is not closed, it must be closed by the caller.
func (*RaftFsm) Restore ¶
func (f *RaftFsm) Restore(snapshot io.ReadCloser) error
Restore is used to restore an FSM from a snapshot. It is not called concurrently with any other command. The FSM must discard all previous state before restoring the snapshot.
Restore implements the hraft.FSM interface.
func (*RaftFsm) Snapshot ¶
func (f *RaftFsm) Snapshot() (hraft.FSMSnapshot, error)
Snapshot returns an FSMSnapshot used to: support log compaction, to restore the FSM to a previous state, or to bring out-of-date followers up to a recent log index.
The Snapshot implementation should return quickly, because Apply can not be called while Snapshot is running. Generally this means Snapshot should only capture a pointer to the state, and any expensive IO should happen as part of FSMSnapshot.Persist.
Apply and Snapshot are always called from the same thread, but Apply will be called concurrently with FSMSnapshot.Persist. This means the FSM should be implemented to allow for concurrent updates while a snapshot is happening.
Snapshot implements the hraft.FSM interface.
type RaftStore ¶
type RaftStore struct { SnapStore hraft.SnapshotStore // Raft snapshot store // contains filtered or unexported fields }
RaftStore implements a LogStore, a StableStore, and a FileSnapshotStore for the [HashiCorp Raft] library.
func NewRaftStore ¶
NewRaftStore creates local storage for persisting Raft specific durable data including log entries, stable store, and file snapshot store. Returns an error along with a nil *RaftStore if any of the stores couldn't be created.
The given storage location should specify a directory given by an absolute pathname or a pathname relative to the working directory of the DDA sidecar or instance, or an empty string to indicate that storage is non-persistent and completely memory-backed as long as the DDA instance is running.
func (*RaftStore) AddMembershipChangeObserver ¶
func (s *RaftStore) AddMembershipChangeObserver(ch chan api.MembershipChange, raft *hraft.Raft) uint64
AddMembershipChangeObserver registers the given channel to listen to membership changes and returns a channel ID that can be used to deregister the channel later.
The channel should continuously receive data on the channel in a non-blocking manner to prevent blocking send operations.
func (*RaftStore) Close ¶
Close gracefully closes the Raft store, optionally removing all associated persistent storage files and folders.
You should not remove storage if you want to restart your DDA state member at a later point in time.
func (*RaftStore) DeleteRange ¶
DeleteRange deletes a range of log entries. The range is inclusive.
DeleteRange implements the hraft.LogStore interface.
func (*RaftStore) FirstIndex ¶
FirstIndex returns the first index written. 0 for no entries.
FirstIndex implements the hraft.LogStore interface.
func (*RaftStore) Get ¶
Get returns the value for key, or an empty byte slice if key was not found.
Get implements the hraft.StableStore interface.
func (*RaftStore) GetLog ¶
GetLog gets a log entry at a given index.
GetLog implements the hraft.LogStore interface.
func (*RaftStore) GetUint64 ¶
GetUint64 returns the uint64 value for key, or 0 if key was not found.
GetUint64 implements the hraft.StableStore interface.
func (*RaftStore) LastIndex ¶
LastIndex returns the last index written. 0 for no entries.
LastIndex implements the hraft.LogStore interface.
func (*RaftStore) RemoveMembershipChangeObserver ¶
RemoveMembershipChangeObserver deregisters the channel with the given channel id.
Note that the channel is not closed, it must be closed by the caller.
func (*RaftStore) Set ¶
Set sets the given key-value pair.
Set implements the hraft.StableStore interface.
func (*RaftStore) SetUint64 ¶
SetUint64 sets the given key-value pair.
SetUint64 implements the hraft.StableStore interface.
func (*RaftStore) StoreLog ¶
StoreLog stores a log entry.
StoreLog implements the hraft.LogStore interface.
func (*RaftStore) StoreLogs ¶
StoreLogs stores multiple log entries.
By default the logs stored may not be contiguous with previous logs (i.e. may have a gap in Index since the last log written). If an implementation can't tolerate this it may optionally implement `MonotonicLogStore` to indicate that this is not allowed. This changes Raft's behaviour after restoring a user snapshot to remove all previous logs instead of relying on a "gap" to signal the discontinuity between logs before the snapshot and logs after.
StoreLogs implements the hraft.LogStore interface.
type RaftTransport ¶
type RaftTransport struct {
// contains filtered or unexported fields
}
RaftTransport implements the hraft.Transport interface to allow Raft to communicate with other Raft nodes over the configured DDA pub-sub communication protocol. In addition, it supports leader forwarding to allow non-leader Raft nodes to accept Apply, Barrier, and AddVoter commands.
func NewRaftTransport ¶
func NewRaftTransport(config *RaftTransportConfig, addr hraft.ServerAddress, com comapi.Api) *RaftTransport
NewRaftTransport creates a new Raft pub-sub transport with the given transport configuration, a local address, and a ready-to-use DDA pub-sub communication API.
func (*RaftTransport) AppendEntries ¶
func (t *RaftTransport) AppendEntries(id hraft.ServerID, target hraft.ServerAddress, args *hraft.AppendEntriesRequest, resp *hraft.AppendEntriesResponse) error
AppendEntries sends the appropriate RPC to the target node.
AppendEntries implements the hraft.Transport interface.
func (*RaftTransport) AppendEntriesPipeline ¶
func (t *RaftTransport) AppendEntriesPipeline(id hraft.ServerID, target hraft.ServerAddress) (hraft.AppendPipeline, error)
AppendEntriesPipeline returns an interface that can be used to pipeline AppendEntries requests to the target node.
AppendEntriesPipeline implements the hraft.Transport interface.
func (*RaftTransport) Close ¶
func (t *RaftTransport) Close() error
Close permanently closes a transport, stopping any associated goroutines and freeing other resources.
Close implements the hraft.WithClose interface. WithClose is an interface that a transport may provide which allows a transport to be shut down cleanly when a Raft instance shuts down.
func (*RaftTransport) Consumer ¶
func (t *RaftTransport) Consumer() <-chan hraft.RPC
Consumer returns a channel that can be used to consume and respond to RPC requests. This channel is not used for leader forwarding operations (see [LfwConsumer]).
Consumer implements the hraft.Transport interface.
func (*RaftTransport) DecodePeer ¶
func (t *RaftTransport) DecodePeer(buf []byte) hraft.ServerAddress
DecodePeer is used to deserialize a peer's address.
DecodePeer implements the hraft.Transport interface.
func (*RaftTransport) EncodePeer ¶
func (t *RaftTransport) EncodePeer(id hraft.ServerID, addr hraft.ServerAddress) []byte
EncodePeer is used to serialize a peer's address
EncodePeer implements the hraft.Transport interface.
func (*RaftTransport) InstallSnapshot ¶
func (t *RaftTransport) InstallSnapshot(id hraft.ServerID, target hraft.ServerAddress, args *hraft.InstallSnapshotRequest, resp *hraft.InstallSnapshotResponse, data io.Reader) error
InstallSnapshot is used to push a snapshot down to a follower. The data is read from the ReadCloser and streamed to the client.
InstallSnapshot implements the hraft.Transport interface.
func (*RaftTransport) LfwAddVoter ¶
func (t *RaftTransport) LfwAddVoter(ctx context.Context, args *AddVoterRequest, resp *AddVoterResponse) error
LfwAddVoter implements transparent leader forwarding for hraft.AddVoter command. It will forward the request to the leader which will add the given server to the cluster as a staging server, promoting it to a voter once that server is ready.
LfwAddVoter is a blocking operation that will time out with an error in case no response is received within a time interval given by the context.
func (*RaftTransport) LfwApply ¶
func (t *RaftTransport) LfwApply(ctx context.Context, args *ApplyRequest, resp *ApplyResponse) error
LfwApply implements transparent leader forwarding for hraft.Apply command. It will forward a command to the leader which applies it to the FSM in a highly consistent manner.
LfwApply is a blocking operation that will time out with an error in case no response is received within a time interval given by the context.
func (*RaftTransport) LfwConsumer ¶
func (t *RaftTransport) LfwConsumer() <-chan hraft.RPC
LfwConsumer returns a channel that can be used to consume and respond to leader forwarded RPC requests. This channel is not used for node-targeted RPC operations (see [Consumer]).
func (*RaftTransport) LfwRemoveServer ¶
func (t *RaftTransport) LfwRemoveServer(ctx context.Context, args *RemoveServerRequest, resp *RemoveServerResponse) error
LfwRemoveServer implements transparent leader forwarding for hraft.RemoveServer command. It will forward the request to the leader which will remove the given server from the cluster. If the current leader is being removed, it will cause a new election to occur.
LfwRemoveServer is a blocking operation that will time out with an error in case no response is received within a time interval given by the context.
func (*RaftTransport) LocalAddr ¶
func (t *RaftTransport) LocalAddr() hraft.ServerAddress
LocalAddr is used to return our local address to distinguish from our peers.
LocalAddr implements the hraft.Transport interface.
func (*RaftTransport) RequestVote ¶
func (t *RaftTransport) RequestVote(id hraft.ServerID, target hraft.ServerAddress, args *hraft.RequestVoteRequest, resp *hraft.RequestVoteResponse) error
RequestVote sends the appropriate RPC to the target node.
RequestVote implements the hraft.Transport interface.
func (*RaftTransport) SetHeartbeatHandler ¶
func (t *RaftTransport) SetHeartbeatHandler(cb func(rpc hraft.RPC))
SetHeartbeatHandler is used to setup a heartbeat handler as a fast-path. This is to avoid head-of-line blocking from RPC invocations. If a Transport does not support this, it can simply ignore the callback, and push the heartbeat onto the Consumer channel. Otherwise, it MUST be safe for this callback to be invoked concurrently with a blocking RPC.
SetHeartbeatHandler implements the hraft.Transport interface.
func (*RaftTransport) Timeout ¶
func (t *RaftTransport) Timeout() time.Duration
Timeout gets the configured timeout duration of the transport.
func (*RaftTransport) TimeoutNow ¶
func (t *RaftTransport) TimeoutNow(id hraft.ServerID, target hraft.ServerAddress, args *hraft.TimeoutNowRequest, resp *hraft.TimeoutNowResponse) error
TimeoutNow is used to start a leadership transfer to the target node.
TimeoutNow implements the hraft.Transport interface.
type RaftTransportConfig ¶
type RaftTransportConfig struct { // Timeout used to apply I/O deadlines to remote operations on the Raft // transport. For InstallSnapshot, we multiply the timeout by (SnapshotSize // / TimeoutScale). // // If not present or zero, the default timeout is applied. Timeout time.Duration // For InstallSnapshot, timeout is proportional to the snapshot size. The // timeout is multiplied by (SnapshotSize / TimeoutScale). // // If not present or zero, a default value of 256KB is used. TimeoutScale int }
RaftTransportConfig encapsulates configuration options for the Raft pub-sub transport layer.
type RemoveServerRequest ¶
type RemoveServerRequest struct { ServerId hraft.ServerID Timeout time.Duration // initial time to wait for RemoveServer command to be started on leader }
AddVoterRequest represents a leader forwarded request to remove this node as a server from the cluster. The request responds with an error if the transport's configured rpcTimeout elapses before the corresponding command completes on the leader.
type RemoveServerResponse ¶
type RemoveServerResponse struct {
Index uint64 // holds the index of the newly applied log entry
}
RemoveServerResponse represents a response to a leader forwarded RemoveServerRequest.