Documentation ¶
Overview ¶
Package multiraft implements the Raft distributed consensus algorithm.
In contrast to other Raft implementations, this version is optimized for the case where one server is a part of many Raft consensus groups (likely with overlapping membership). This entails the use of a shared log and coalesced timers for heartbeats.
A cluster consists of a collection of nodes; the local node is represented by a MultiRaft object. Each node may participate in any number of groups. Nodes must have a globally unique ID (a proto.RaftNodeID in the API below), and groups have a globally unique ID (a proto.RangeID). The application is responsible for providing a Transport interface that knows how to communicate with other nodes based on their IDs, and a Storage interface to manage persistent data.
The Raft protocol is documented in "In Search of an Understandable Consensus Algorithm" by Diego Ongaro and John Ousterhout. https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf
Package multiraft is a generated protocol buffer package.
It is generated from these files:
- cockroach/multiraft/rpc.proto
It has these top-level messages:
- RaftMessageRequest
- RaftMessageResponse
Index ¶
- Variables
- type Config
- type EventCommandCommitted
- type EventLeaderElection
- type EventMembershipChangeCommitted
- type MemoryStorage
- type MultiRaft
- func (m *MultiRaft) ChangeGroupMembership(groupID proto.RangeID, commandID string, changeType raftpb.ConfChangeType, ...) <-chan error
- func (m *MultiRaft) CreateGroup(groupID proto.RangeID) error
- func (m *MultiRaft) RemoveGroup(groupID proto.RangeID) error
- func (m *MultiRaft) Start()
- func (m *MultiRaft) Status(groupID proto.RangeID) *raft.Status
- func (m *MultiRaft) SubmitCommand(groupID proto.RangeID, commandID string, command []byte) <-chan error
- type RaftMessageRequest
- func (m *RaftMessageRequest) GetGroupID() github_com_cockroachdb_cockroach_proto.RangeID
- func (m *RaftMessageRequest) GetMessage() raftpb.Message
- func (m *RaftMessageRequest) GetUser() string
- func (m *RaftMessageRequest) Marshal() (data []byte, err error)
- func (m *RaftMessageRequest) MarshalTo(data []byte) (int, error)
- func (*RaftMessageRequest) ProtoMessage()
- func (m *RaftMessageRequest) Reset()
- func (m *RaftMessageRequest) Size() (n int)
- func (m *RaftMessageRequest) String() string
- func (m *RaftMessageRequest) Unmarshal(data []byte) error
- type RaftMessageResponse
- func (m *RaftMessageResponse) Marshal() (data []byte, err error)
- func (m *RaftMessageResponse) MarshalTo(data []byte) (int, error)
- func (*RaftMessageResponse) ProtoMessage()
- func (m *RaftMessageResponse) Reset()
- func (m *RaftMessageResponse) Size() (n int)
- func (m *RaftMessageResponse) String() string
- func (m *RaftMessageResponse) Unmarshal(data []byte) error
- type ServerInterface
- type StateMachine
- type Storage
- type Ticker
- type Transport
- type WriteableGroupStorage
Constants ¶
This section is empty.
Variables ¶
var ErrGroupDeleted = errors.New("raft group deleted")
An ErrGroupDeleted is returned for commands which are pending while their group is deleted.
var (
ErrInvalidLengthRpc = fmt.Errorf("proto: negative length found during unmarshaling")
)
var ErrStopped = errors.New("raft processing stopped")
ErrStopped is returned for commands that could not be completed before the node was stopped.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	Storage   Storage
	Transport Transport
	// Ticker may be nil to use real time and TickInterval.
	Ticker Ticker
	// StateMachine may be nil if the state machine is transient and always starts from
	// a blank slate.
	StateMachine StateMachine
	// A new election is called if the election timeout elapses with no
	// contact from the leader. The actual timeout is chosen randomly
	// from the range [ElectionTimeoutTicks*TickInterval,
	// ElectionTimeoutTicks*TickInterval*2) to minimize the chances of
	// several servers trying to become leaders simultaneously. The Raft
	// paper suggests a range of 150-300ms for local networks;
	// geographically distributed installations should use higher values
	// to account for the increased round trip time.
	ElectionTimeoutTicks   int
	HeartbeatIntervalTicks int
	TickInterval           time.Duration
	// EventBufferSize is the capacity (in number of events) of the
	// MultiRaft.Events channel. In tests, we use 0 to ensure that there
	// are no deadlocks when the limit is reached; real deployments may
	// want to set a buffer so that applying a command committed on one
	// group does not interfere with other groups or cause heartbeats to
	// be missed.
	EventBufferSize int
	EntryFormatter  raft.EntryFormatter
}
Config contains the parameters necessary to construct a MultiRaft object.
type EventCommandCommitted ¶
type EventCommandCommitted struct {
	GroupID proto.RangeID
	// CommandID is the application-supplied ID for this command. The same CommandID
	// may be seen multiple times, so the application should remember this CommandID
	// for deduping.
	CommandID string
	// Index is the raft log index for this event. The application should persist
	// the Index of the last applied command atomically with any effects of that
	// command.
	Index   uint64
	Command []byte
}
An EventCommandCommitted is broadcast whenever a command has been committed.
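The deduplication and applied-index advice in the field comments can be sketched as follows. The `committed` and `applier` types are local stand-ins invented for this example (they mirror only the CommandID, Index, and Command fields), and the in-memory map is an assumption; a real application would write the effect and the index to durable storage in one atomic batch.

```go
package main

import "fmt"

// committed mirrors the CommandID, Index, and Command fields of
// EventCommandCommitted. It is a local stand-in so the example compiles alone.
type committed struct {
	CommandID string
	Index     uint64
	Command   []byte
}

// applier is a hypothetical in-memory state machine.
type applier struct {
	seen         map[string]bool // CommandIDs whose effects were already applied
	appliedIndex uint64          // index of the last processed log entry
	log          []string        // applied effects, kept only for illustration
}

func newApplier() *applier {
	return &applier{seen: make(map[string]bool)}
}

// apply processes one committed event and reports whether its effects were
// applied. Entries at or below appliedIndex are replays and are skipped; a
// repeated CommandID at a new index advances the index without re-applying.
func (a *applier) apply(ev committed) bool {
	if ev.Index <= a.appliedIndex {
		return false
	}
	applied := false
	if !a.seen[ev.CommandID] {
		a.seen[ev.CommandID] = true
		a.log = append(a.log, string(ev.Command))
		applied = true
	}
	// In durable storage this update would share a batch with the effect.
	a.appliedIndex = ev.Index
	return applied
}

func main() {
	a := newApplier()
	events := []committed{
		{CommandID: "c1", Index: 1, Command: []byte("put k=v")},
		{CommandID: "c1", Index: 2, Command: []byte("put k=v")},
		{CommandID: "c2", Index: 3, Command: []byte("del k")},
	}
	for _, ev := range events {
		fmt.Printf("index %d applied=%v\n", ev.Index, a.apply(ev))
	}
	fmt.Println("applied index:", a.appliedIndex, "effects:", len(a.log))
}
```

The `seen` map grows without bound in this sketch; how long CommandIDs need to be remembered is an application decision that the documentation above does not specify.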
type EventLeaderElection ¶
type EventLeaderElection struct {
	GroupID proto.RangeID
	NodeID  proto.RaftNodeID
	Term    uint64
}
An EventLeaderElection is broadcast when a group starts or completes an election. NodeID is zero when an election is in progress.
type EventMembershipChangeCommitted ¶
type EventMembershipChangeCommitted struct {
	// GroupID, CommandID, and Index are the same as for EventCommandCommitted.
	GroupID    proto.RangeID
	CommandID  string
	Index      uint64
	NodeID     proto.RaftNodeID
	ChangeType raftpb.ConfChangeType
	Payload    []byte
	// Callback should be invoked when this event and its payload have been
	// processed. A non-nil error aborts the membership change.
	Callback func(error)
}
An EventMembershipChangeCommitted is broadcast whenever a membership change has been committed.
type MemoryStorage ¶
type MemoryStorage struct {
// contains filtered or unexported fields
}
MemoryStorage is an in-memory implementation of Storage for testing.
func NewMemoryStorage ¶
func NewMemoryStorage() *MemoryStorage
NewMemoryStorage creates a MemoryStorage.
func (*MemoryStorage) GroupStorage ¶
func (m *MemoryStorage) GroupStorage(groupID proto.RangeID) WriteableGroupStorage
GroupStorage implements the Storage interface.
type MultiRaft ¶
type MultiRaft struct {
	Config
	Events chan interface{}
	// contains filtered or unexported fields
}
MultiRaft represents a local node in a raft cluster. The owner is responsible for consuming the Events channel in a timely manner.
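Consuming the Events channel typically means a loop with a type switch. The sketch below uses trimmed local stand-ins for the three event types so that it compiles on its own. It assumes events arrive as pointers and that the channel is eventually closed; neither is confirmed by the documentation above, and the channel is closed here only so the loop terminates.

```go
package main

import "fmt"

// Trimmed local stand-ins for the package's event types.
type eventCommandCommitted struct {
	CommandID string
	Index     uint64
}

type eventLeaderElection struct {
	NodeID uint64 // zero while an election is in progress
	Term   uint64
}

type eventMembershipChangeCommitted struct {
	CommandID string
	Callback  func(error)
}

// consume drains events until the channel is closed and returns a short
// description of each event it handled.
func consume(events <-chan interface{}) []string {
	var seen []string
	for ev := range events {
		switch e := ev.(type) {
		case *eventCommandCommitted:
			seen = append(seen, fmt.Sprintf("commit %s@%d", e.CommandID, e.Index))
		case *eventLeaderElection:
			if e.NodeID == 0 {
				seen = append(seen, fmt.Sprintf("election in progress (term %d)", e.Term))
			} else {
				seen = append(seen, fmt.Sprintf("leader %d (term %d)", e.NodeID, e.Term))
			}
		case *eventMembershipChangeCommitted:
			// A nil error accepts the change; a non-nil error would abort it.
			e.Callback(nil)
			seen = append(seen, "membership "+e.CommandID)
		default:
			seen = append(seen, fmt.Sprintf("unknown %T", ev))
		}
	}
	return seen
}

func main() {
	events := make(chan interface{}, 4)
	events <- &eventLeaderElection{Term: 1}
	events <- &eventLeaderElection{NodeID: 2, Term: 1}
	events <- &eventCommandCommitted{CommandID: "c1", Index: 7}
	events <- &eventMembershipChangeCommitted{CommandID: "m1", Callback: func(error) {}}
	close(events)
	for _, line := range consume(events) {
		fmt.Println(line)
	}
}
```

Keeping each case short, or handing heavy work to another goroutine, helps the owner drain the channel in a timely manner.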
func NewMultiRaft ¶
func NewMultiRaft(nodeID proto.RaftNodeID, config *Config, stopper *stop.Stopper) (*MultiRaft, error)
NewMultiRaft creates a MultiRaft object.
func (*MultiRaft) ChangeGroupMembership ¶
func (m *MultiRaft) ChangeGroupMembership(groupID proto.RangeID, commandID string, changeType raftpb.ConfChangeType, nodeID proto.RaftNodeID, payload []byte) <-chan error
ChangeGroupMembership submits a proposed membership change to the cluster. Payload is an opaque blob that will be returned in EventMembershipChangeCommitted.
func (*MultiRaft) CreateGroup ¶
func (m *MultiRaft) CreateGroup(groupID proto.RangeID) error
CreateGroup creates a new consensus group and joins it. The initial membership of this group is determined by the InitialState method of the group's Storage object.
func (*MultiRaft) RemoveGroup ¶
func (m *MultiRaft) RemoveGroup(groupID proto.RangeID) error
RemoveGroup destroys the consensus group with the given ID. No events for this group will be emitted after this method returns (but some events may still be in the channel buffer).
func (*MultiRaft) Start ¶
func (m *MultiRaft) Start()
Start runs the raft algorithm in a background goroutine.
func (*MultiRaft) Status ¶
func (m *MultiRaft) Status(groupID proto.RangeID) *raft.Status
func (*MultiRaft) SubmitCommand ¶
func (m *MultiRaft) SubmitCommand(groupID proto.RangeID, commandID string, command []byte) <-chan error
SubmitCommand sends a command (a binary blob) to the cluster. This method returns when the command has been successfully sent, not when it has been committed. An error or nil will be written to the returned channel when the command has been committed or aborted.
type RaftMessageRequest ¶
type RaftMessageRequest struct {
	GroupID github_com_cockroachdb_cockroach_proto.RangeID `protobuf:"varint,1,opt,name=group_id,casttype=github.com/cockroachdb/cockroach/proto.RangeID" json:"group_id"`
	Message raftpb.Message `protobuf:"bytes,2,opt,name=message" json:"message"`
}
RaftMessageRequest is the request used to send raft messages using our protobuf-based RPC codec.
func (*RaftMessageRequest) GetGroupID ¶
func (m *RaftMessageRequest) GetGroupID() github_com_cockroachdb_cockroach_proto.RangeID
func (*RaftMessageRequest) GetMessage ¶
func (m *RaftMessageRequest) GetMessage() raftpb.Message
func (*RaftMessageRequest) GetUser ¶
func (m *RaftMessageRequest) GetUser() string
GetUser implements userRequest. Raft messages are always sent by the node user.
func (*RaftMessageRequest) Marshal ¶
func (m *RaftMessageRequest) Marshal() (data []byte, err error)
func (*RaftMessageRequest) MarshalTo ¶
func (m *RaftMessageRequest) MarshalTo(data []byte) (int, error)
func (*RaftMessageRequest) ProtoMessage ¶
func (*RaftMessageRequest) ProtoMessage()
func (*RaftMessageRequest) Reset ¶
func (m *RaftMessageRequest) Reset()
func (*RaftMessageRequest) Size ¶
func (m *RaftMessageRequest) Size() (n int)
func (*RaftMessageRequest) String ¶
func (m *RaftMessageRequest) String() string
func (*RaftMessageRequest) Unmarshal ¶
func (m *RaftMessageRequest) Unmarshal(data []byte) error
type RaftMessageResponse ¶
type RaftMessageResponse struct { }
RaftMessageResponse is an empty message returned by raft RPCs. If a response is needed it will be sent as a separate message.
func (*RaftMessageResponse) Marshal ¶
func (m *RaftMessageResponse) Marshal() (data []byte, err error)
func (*RaftMessageResponse) MarshalTo ¶
func (m *RaftMessageResponse) MarshalTo(data []byte) (int, error)
func (*RaftMessageResponse) ProtoMessage ¶
func (*RaftMessageResponse) ProtoMessage()
func (*RaftMessageResponse) Reset ¶
func (m *RaftMessageResponse) Reset()
func (*RaftMessageResponse) Size ¶
func (m *RaftMessageResponse) Size() (n int)
func (*RaftMessageResponse) String ¶
func (m *RaftMessageResponse) String() string
func (*RaftMessageResponse) Unmarshal ¶
func (m *RaftMessageResponse) Unmarshal(data []byte) error
type ServerInterface ¶
type ServerInterface interface {
RaftMessage(req *RaftMessageRequest) (*RaftMessageResponse, error)
}
ServerInterface is the set of methods we expose for use by net/rpc.
type StateMachine ¶
type StateMachine interface {
	// AppliedIndex returns the last index which has been applied to the given group's
	// state machine.
	AppliedIndex(groupID proto.RangeID) (uint64, error)
}
The StateMachine interface is supplied by the application to manage a persistent state machine. In Cockroach the StateMachine and the Storage are the same thing, but they are logically distinct, and systems like etcd keep them separate.
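A minimal in-memory implementation shows the shape of the interface. The `rangeID` type below is a local stand-in for proto.RangeID (its underlying type is an assumption), and `memStateMachine` and `recordApplied` are hypothetical names. Nothing here is persistent, so the sketch suits tests only.

```go
package main

import (
	"fmt"
	"sync"
)

// rangeID is a local stand-in for proto.RangeID; the underlying type is assumed.
type rangeID int64

// memStateMachine is a hypothetical in-memory state machine that tracks the
// last applied index per group.
type memStateMachine struct {
	mu      sync.Mutex
	applied map[rangeID]uint64
}

func newMemStateMachine() *memStateMachine {
	return &memStateMachine{applied: make(map[rangeID]uint64)}
}

// AppliedIndex returns the last index applied to the given group. A group
// that has never applied anything reports zero (a blank slate).
func (s *memStateMachine) AppliedIndex(groupID rangeID) (uint64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.applied[groupID], nil
}

// recordApplied notes that the entry at index has been applied. The index
// never moves backwards.
func (s *memStateMachine) recordApplied(groupID rangeID, index uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if index > s.applied[groupID] {
		s.applied[groupID] = index
	}
}

func main() {
	sm := newMemStateMachine()
	sm.recordApplied(1, 10)
	sm.recordApplied(1, 7) // ignored: older than the current applied index
	idx, err := sm.AppliedIndex(1)
	fmt.Println(idx, err)
}
```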
type Storage ¶
type Storage interface {
GroupStorage(groupID proto.RangeID) WriteableGroupStorage
}
The Storage interface is supplied by the application to manage persistent storage of raft data.
type Ticker ¶
type Ticker interface {
	// This channel will be readable once per tick. The time value returned is unspecified;
	// the channel has this type for compatibility with time.Ticker but other implementations
	// may not return real times.
	Chan() <-chan time.Time
	// Close stops the ticker and releases its resources.
	Close()
}
Ticker encapsulates the timing-related parts of the raft protocol.
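Because the interface only requires a channel that becomes readable once per tick, tests can drive time by hand. The sketch below redeclares the interface locally so it compiles alone; `manualTicker` and its `Tick` method are hypothetical and not part of the package.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Ticker mirrors the interface shown above so the example compiles alone.
type Ticker interface {
	Chan() <-chan time.Time
	Close()
}

// manualTicker is a hypothetical Ticker for tests: a tick happens only when
// the test calls Tick.
type manualTicker struct {
	ch   chan time.Time
	done chan struct{}
	once sync.Once
}

func newManualTicker() *manualTicker {
	return &manualTicker{ch: make(chan time.Time), done: make(chan struct{})}
}

func (m *manualTicker) Chan() <-chan time.Time { return m.ch }

// Close stops the ticker; it is safe to call more than once.
func (m *manualTicker) Close() { m.once.Do(func() { close(m.done) }) }

// Tick delivers one tick, blocking until a consumer receives it. It returns
// false if the ticker has been closed.
func (m *manualTicker) Tick() bool {
	select {
	case <-m.done:
		return false
	default:
	}
	select {
	case m.ch <- time.Time{}: // the time value is unspecified by the interface
		return true
	case <-m.done:
		return false
	}
}

func main() {
	tk := newManualTicker()
	var _ Ticker = tk // compile-time check that the interface is satisfied
	got := make(chan int)
	go func() {
		n := 0
		for i := 0; i < 3; i++ {
			<-tk.Chan()
			n++
		}
		got <- n
	}()
	for i := 0; i < 3; i++ {
		tk.Tick()
	}
	fmt.Println("ticks received:", <-got)
	tk.Close()
}
```

With a ticker like this, election and heartbeat timeouts become a matter of counting calls to `Tick`, which keeps tests deterministic.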
type Transport ¶
type Transport interface {
	// Listen informs the Transport of the local node's ID and callback interface.
	// The Transport should associate the given id with the server object so that other
	// Transports' Connect methods can find it.
	Listen(id proto.RaftNodeID, server ServerInterface) error
	// Stop undoes a previous Listen.
	Stop(id proto.RaftNodeID)
	// Send a message to the node specified in the request's To field.
	Send(req *RaftMessageRequest) error
	// Close all associated connections.
	Close()
}
The Transport interface is supplied by the application to manage communication with other nodes. It is responsible for mapping from IDs to some communication channel (in the simplest case, a host:port pair could be used as an ID, although this would make it impossible to move an instance from one host to another except by syncing up a new node from scratch).
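The ID-to-channel mapping can be illustrated with an in-process transport that routes requests through a map. All types below are simplified local stand-ins: `request` replaces RaftMessageRequest (its `To` field stands in for the To field of the embedded raft message), `server` replaces ServerInterface, and `nodeID` replaces proto.RaftNodeID with an assumed underlying type. A real Transport would use the package's own types and a network connection.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Simplified local stand-ins for the package's types.
type nodeID uint64

type request struct {
	To      nodeID // destination node
	Payload string
}

type server interface {
	RaftMessage(req *request) error
}

var errUnknownNode = errors.New("no server listening for node")

// memTransport is a hypothetical in-process transport: it maps node IDs
// directly to server objects instead of network addresses.
type memTransport struct {
	mu      sync.Mutex
	servers map[nodeID]server
}

func newMemTransport() *memTransport {
	return &memTransport{servers: make(map[nodeID]server)}
}

// Listen associates id with s so that Send can find it.
func (t *memTransport) Listen(id nodeID, s server) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, ok := t.servers[id]; ok {
		return fmt.Errorf("node %d is already listening", id)
	}
	t.servers[id] = s
	return nil
}

// Stop undoes a previous Listen.
func (t *memTransport) Stop(id nodeID) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.servers, id)
}

// Send delivers req to the server registered for req.To.
func (t *memTransport) Send(req *request) error {
	t.mu.Lock()
	s, ok := t.servers[req.To]
	t.mu.Unlock() // release before the callback so handlers may call Send
	if !ok {
		return errUnknownNode
	}
	return s.RaftMessage(req)
}

// Close drops all registrations.
func (t *memTransport) Close() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.servers = make(map[nodeID]server)
}

// recorder is a test server that remembers the payloads it receives.
type recorder struct{ got []string }

func (r *recorder) RaftMessage(req *request) error {
	r.got = append(r.got, req.Payload)
	return nil
}

func main() {
	tr := newMemTransport()
	rec := &recorder{}
	if err := tr.Listen(2, rec); err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	fmt.Println(tr.Send(&request{To: 2, Payload: "heartbeat"}), rec.got)
	fmt.Println(tr.Send(&request{To: 9, Payload: "lost"}))
}
```

Because routing depends only on the ID, a node could re-register under the same ID from a different location, which is the flexibility a host:port ID would give up.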
func NewLocalRPCTransport ¶
NewLocalRPCTransport creates a Transport for local testing use. MultiRaft instances sharing the same local Transport can find and communicate with each other by ID (which can be an arbitrary string). Each instance binds to a different unused port on localhost. Because this is just for local testing, it doesn't use TLS.
type WriteableGroupStorage ¶
type WriteableGroupStorage interface {
	raft.Storage
	Append(entries []raftpb.Entry) error
	ApplySnapshot(snap raftpb.Snapshot) error
	SetHardState(st raftpb.HardState) error
}
WriteableGroupStorage represents a single group within a Storage. It is implemented by *raft.MemoryStorage.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
 | Package storagetest is a test suite for raft.Storage implementations. |