Documentation ¶
Overview ¶
Package multiraft implements the Raft distributed consensus algorithm.
In contrast to other Raft implementations, this version is optimized for the case where one server is a part of many Raft consensus groups (likely with overlapping membership). This entails the use of a shared log and coalesced timers for heartbeats.
A cluster consists of a collection of nodes; the local node is represented by a MultiRaft object. Each node may participate in any number of groups. Nodes must have a globally unique ID (a string), and groups have a globally unique name. The application is responsible for providing a Transport interface that knows how to communicate with other nodes based on their IDs, and a Storage interface to manage persistent data.
The Raft protocol is documented in "In Search of an Understandable Consensus Algorithm" by Diego Ongaro and John Ousterhout. https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf
Package multiraft is a generated protocol buffer package.
It is generated from these files:
	cockroach/multiraft/rpc.proto
It has these top-level messages:
	RaftMessageRequest
	RaftMessageResponse
	ConfChangeContext
Index ¶
- Variables
- type ConfChangeContext
- func (m *ConfChangeContext) Marshal() (data []byte, err error)
- func (m *ConfChangeContext) MarshalTo(data []byte) (int, error)
- func (*ConfChangeContext) ProtoMessage()
- func (m *ConfChangeContext) Reset()
- func (m *ConfChangeContext) Size() (n int)
- func (m *ConfChangeContext) String() string
- func (m *ConfChangeContext) Unmarshal(data []byte) error
- type Config
- type EventCommandCommitted
- type EventLeaderElection
- type EventMembershipChangeCommitted
- type MemoryStorage
- func (m *MemoryStorage) GroupLocker() sync.Locker
- func (m *MemoryStorage) GroupStorage(groupID roachpb.RangeID, replicaID roachpb.ReplicaID) (WriteableGroupStorage, error)
- func (m *MemoryStorage) ReplicaDescriptor(groupID roachpb.RangeID, replicaID roachpb.ReplicaID) (roachpb.ReplicaDescriptor, error)
- func (m *MemoryStorage) ReplicaIDForStore(groupID roachpb.RangeID, storeID roachpb.StoreID) (roachpb.ReplicaID, error)
- func (m *MemoryStorage) ReplicasFromSnapshot(_ raftpb.Snapshot) ([]roachpb.ReplicaDescriptor, error)
- type MultiRaft
- func (m *MultiRaft) ChangeGroupMembership(groupID roachpb.RangeID, commandID string, changeType raftpb.ConfChangeType, ...) <-chan error
- func (m *MultiRaft) CreateGroup(groupID roachpb.RangeID) error
- func (m *MultiRaft) RemoveGroup(groupID roachpb.RangeID) error
- func (m *MultiRaft) Start()
- func (m *MultiRaft) Status(groupID roachpb.RangeID) *raft.Status
- func (m *MultiRaft) SubmitCommand(groupID roachpb.RangeID, commandID string, command []byte) <-chan error
- type RaftMessageRequest
- func (*RaftMessageRequest) GetUser() string
- func (m *RaftMessageRequest) Marshal() (data []byte, err error)
- func (m *RaftMessageRequest) MarshalTo(data []byte) (int, error)
- func (*RaftMessageRequest) ProtoMessage()
- func (m *RaftMessageRequest) Reset()
- func (m *RaftMessageRequest) Size() (n int)
- func (m *RaftMessageRequest) String() string
- func (m *RaftMessageRequest) Unmarshal(data []byte) error
- type RaftMessageResponse
- func (m *RaftMessageResponse) Marshal() (data []byte, err error)
- func (m *RaftMessageResponse) MarshalTo(data []byte) (int, error)
- func (*RaftMessageResponse) ProtoMessage()
- func (m *RaftMessageResponse) Reset()
- func (m *RaftMessageResponse) Size() (n int)
- func (m *RaftMessageResponse) String() string
- func (m *RaftMessageResponse) Unmarshal(data []byte) error
- type ServerInterface
- type StateMachine
- type Storage
- type Ticker
- type Transport
- type WriteableGroupStorage
Constants ¶
This section is empty.
Variables ¶
var (
	ErrInvalidLengthRpc = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowRpc   = fmt.Errorf("proto: integer overflow")
)
var ErrGroupDeleted = errors.New("raft group deleted")
ErrGroupDeleted is returned for commands which are pending while their group is deleted.
var ErrStopped = errors.New("raft processing stopped")
ErrStopped is returned for commands that could not be completed before the node was stopped.
Functions ¶
This section is empty.
Types ¶
type ConfChangeContext ¶
type ConfChangeContext struct {
	CommandID string `protobuf:"bytes,1,opt,name=command_id" json:"command_id"`
	// Payload is the application-level command (i.e. an encoded
	// roachpb.EndTransactionRequest).
	Payload []byte `protobuf:"bytes,2,opt,name=payload" json:"payload,omitempty"`
	// Replica contains full details about the replica being added or removed.
	Replica cockroach_roachpb.ReplicaDescriptor `protobuf:"bytes,3,opt,name=replica" json:"replica"`
}
ConfChangeContext is encoded in the raftpb.ConfChange.Context field.
func (*ConfChangeContext) Marshal ¶
func (m *ConfChangeContext) Marshal() (data []byte, err error)
func (*ConfChangeContext) MarshalTo ¶
func (m *ConfChangeContext) MarshalTo(data []byte) (int, error)
func (*ConfChangeContext) ProtoMessage ¶
func (*ConfChangeContext) ProtoMessage()
func (*ConfChangeContext) Reset ¶
func (m *ConfChangeContext) Reset()
func (*ConfChangeContext) Size ¶
func (m *ConfChangeContext) Size() (n int)
func (*ConfChangeContext) String ¶
func (m *ConfChangeContext) String() string
func (*ConfChangeContext) Unmarshal ¶
func (m *ConfChangeContext) Unmarshal(data []byte) error
type Config ¶
type Config struct {
	Storage   Storage
	Transport Transport
	// Ticker may be nil to use real time and TickInterval.
	Ticker Ticker
	// StateMachine may be nil if the state machine is transient and always starts from
	// a blank slate.
	StateMachine StateMachine
	// A new election is called if the election timeout elapses with no
	// contact from the leader. The actual timeout is chosen randomly
	// from the range [ElectionTimeoutTicks*TickInterval,
	// ElectionTimeoutTicks*TickInterval*2) to minimize the chances of
	// several servers trying to become leaders simultaneously. The Raft
	// paper suggests a range of 150-300ms for local networks;
	// geographically distributed installations should use higher values
	// to account for the increased round trip time.
	ElectionTimeoutTicks   int
	HeartbeatIntervalTicks int
	TickInterval           time.Duration
	EntryFormatter         raft.EntryFormatter
}
Config contains the parameters necessary to construct a MultiRaft object.
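The election timeout window described in the Config comment can be illustrated with a short sketch. The helper randomElectionTimeout and the names demoRand and demoInterval are hypothetical and not part of multiraft; the sketch only shows the arithmetic of picking a value in [ElectionTimeoutTicks*TickInterval, ElectionTimeoutTicks*TickInterval*2), and the package itself may randomize at tick granularity instead.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// demoInterval and demoRand are illustration-only values (assumptions).
const demoInterval = 10 * time.Millisecond

var demoRand = rand.New(rand.NewSource(1))

// randomElectionTimeout returns a duration in
// [ticks*interval, 2*ticks*interval), the range given in the Config comment.
// It is a hypothetical helper, not a multiraft function.
// ticks and interval must both be positive.
func randomElectionTimeout(r *rand.Rand, ticks int, interval time.Duration) time.Duration {
	base := time.Duration(ticks) * interval
	// Int63n returns a value in [0, base), so the sum stays below 2*base.
	return base + time.Duration(r.Int63n(int64(base)))
}

func main() {
	// 15 ticks of 10ms give a window of [150ms, 300ms).
	fmt.Println(randomElectionTimeout(demoRand, 15, demoInterval))
}
```

With these example values the window matches the 150-300ms range the Raft paper suggests for local networks.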
type EventCommandCommitted ¶
type EventCommandCommitted struct {
	GroupID roachpb.RangeID
	// CommandID is the application-supplied ID for this command. The same CommandID
	// may be seen multiple times, so the application should remember this CommandID
	// for deduping.
	CommandID string
	// Index is the raft log index for this event. The application should persist
	// the Index of the last applied command atomically with any effects of that
	// command.
	Index   uint64
	Command []byte
}
An EventCommandCommitted is broadcast whenever a command has been committed.
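The deduplication and index-tracking advice in the field comments can be sketched as follows. The types committed and applier are local stand-ins invented for this example (they are not multiraft types), and the in-memory map and slice take the place of whatever durable store a real application would update atomically.

```go
package main

import "fmt"

// committed mirrors the CommandID, Index, and Command fields of
// EventCommandCommitted. It is a local stand-in so the sketch compiles
// without the cockroach packages.
type committed struct {
	CommandID string
	Index     uint64
	Command   []byte
}

// applier is a hypothetical in-memory state machine.
type applier struct {
	seen         map[string]bool // CommandIDs that have already been applied
	appliedIndex uint64          // raft log index of the last processed event
	log          []string        // effects of applied commands
}

func newApplier() *applier { return &applier{seen: make(map[string]bool)} }

// apply reports whether the command was applied. In a durable
// implementation, the updates below belong in a single atomic write.
func (a *applier) apply(ev committed) bool {
	if ev.Index <= a.appliedIndex {
		return false // replay of an entry that was already processed
	}
	a.appliedIndex = ev.Index
	if a.seen[ev.CommandID] {
		return false // same command committed again under a new index
	}
	a.seen[ev.CommandID] = true
	a.log = append(a.log, string(ev.Command))
	return true
}

func main() {
	a := newApplier()
	a.apply(committed{CommandID: "a", Index: 1, Command: []byte("x=1")})
	a.apply(committed{CommandID: "a", Index: 2, Command: []byte("x=1")}) // duplicate ID
	a.apply(committed{CommandID: "b", Index: 3, Command: []byte("y=2")})
	fmt.Println(a.log, a.appliedIndex) // prints "[x=1 y=2] 3"
}
```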
type EventLeaderElection ¶
An EventLeaderElection is broadcast when a group starts or completes an election. NodeID is zero when an election is in progress.
type EventMembershipChangeCommitted ¶
type EventMembershipChangeCommitted struct {
	// GroupID, CommandID, and Index are the same as for EventCommandCommitted.
	GroupID    roachpb.RangeID
	CommandID  string
	Index      uint64
	Replica    roachpb.ReplicaDescriptor
	ChangeType raftpb.ConfChangeType
	Payload    []byte
	// Callback should be invoked when this event and its payload have been
	// processed. A non-nil error aborts the membership change.
	Callback func(error)
}
An EventMembershipChangeCommitted is broadcast whenever a membership change has been committed.
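The Callback contract can be sketched with a small handler. Everything here is an assumption for illustration: membershipChange is a local stand-in carrying only the Payload and Callback fields, and validate and errRejected are hypothetical application logic.

```go
package main

import (
	"errors"
	"fmt"
)

// membershipChange is a local stand-in for the Payload and Callback fields
// of EventMembershipChangeCommitted.
type membershipChange struct {
	Payload  []byte
	Callback func(error)
}

// errRejected and validate are hypothetical application logic.
var errRejected = errors.New("payload rejected")

func validate(payload []byte) error {
	if len(payload) == 0 {
		return errRejected
	}
	return nil
}

// handleMembershipChange processes the payload and then invokes Callback
// exactly once. Passing a non-nil error aborts the membership change.
func handleMembershipChange(ev membershipChange, process func([]byte) error) {
	ev.Callback(process(ev.Payload))
}

func main() {
	report := func(err error) { fmt.Println("callback received:", err) }
	handleMembershipChange(membershipChange{Payload: []byte("add replica"), Callback: report}, validate)
	handleMembershipChange(membershipChange{Callback: report}, validate)
}
```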
type MemoryStorage ¶
type MemoryStorage struct {
// contains filtered or unexported fields
}
MemoryStorage is an in-memory implementation of Storage for testing.
func NewMemoryStorage ¶
func NewMemoryStorage() *MemoryStorage
NewMemoryStorage creates a MemoryStorage.
func (*MemoryStorage) GroupLocker ¶
func (m *MemoryStorage) GroupLocker() sync.Locker
GroupLocker implements the Storage interface by returning nil.
func (*MemoryStorage) GroupStorage ¶
func (m *MemoryStorage) GroupStorage(groupID roachpb.RangeID, replicaID roachpb.ReplicaID) (WriteableGroupStorage, error)
GroupStorage implements the Storage interface.
func (*MemoryStorage) ReplicaDescriptor ¶
func (m *MemoryStorage) ReplicaDescriptor(groupID roachpb.RangeID, replicaID roachpb.ReplicaID) (roachpb.ReplicaDescriptor, error)
ReplicaDescriptor implements the Storage interface by returning a dummy descriptor.
func (*MemoryStorage) ReplicaIDForStore ¶
func (m *MemoryStorage) ReplicaIDForStore(groupID roachpb.RangeID, storeID roachpb.StoreID) (roachpb.ReplicaID, error)
ReplicaIDForStore implements the Storage interface.
func (*MemoryStorage) ReplicasFromSnapshot ¶
func (m *MemoryStorage) ReplicasFromSnapshot(_ raftpb.Snapshot) ([]roachpb.ReplicaDescriptor, error)
ReplicasFromSnapshot implements the Storage interface.
type MultiRaft ¶
type MultiRaft struct {
	Config
	Events chan []interface{}
	// contains filtered or unexported fields
}
MultiRaft represents a local node in a raft cluster. The owner is responsible for consuming the Events channel in a timely manner.
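Since Events carries batches of type []interface{}, a consumer typically loops over each batch with a type switch. The sketch below uses local stand-in event types (commandCommitted, leaderElection) and assumes events arrive as pointers and that the channel is eventually closed; neither assumption is confirmed by this page.

```go
package main

import "fmt"

// Local stand-ins for the package's event types (assumptions).
type commandCommitted struct{ CommandID string }
type leaderElection struct{ NodeID int }

// consume drains batches until the channel is closed and returns how many
// events of each kind it saw. A real consumer would apply the events
// instead of counting them, and should keep up so the sender never blocks.
func consume(events <-chan []interface{}) (commands, elections, unknown int) {
	for batch := range events {
		for _, e := range batch {
			switch e.(type) {
			case *commandCommitted:
				commands++
			case *leaderElection:
				elections++
			default:
				unknown++
			}
		}
	}
	return commands, elections, unknown
}

func main() {
	ch := make(chan []interface{}, 2)
	ch <- []interface{}{&leaderElection{NodeID: 1}}
	ch <- []interface{}{&commandCommitted{CommandID: "a"}, &commandCommitted{CommandID: "b"}}
	close(ch)
	fmt.Println(consume(ch)) // prints "2 1 0"
}
```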
func NewMultiRaft ¶
func NewMultiRaft(nodeID roachpb.NodeID, storeID roachpb.StoreID, config *Config, stopper *stop.Stopper) (*MultiRaft, error)
NewMultiRaft creates a MultiRaft object.
func (*MultiRaft) ChangeGroupMembership ¶
func (m *MultiRaft) ChangeGroupMembership(groupID roachpb.RangeID, commandID string, changeType raftpb.ConfChangeType, replica roachpb.ReplicaDescriptor, payload []byte) <-chan error
ChangeGroupMembership submits a proposed membership change to the cluster. Payload is an opaque blob that will be returned in EventMembershipChangeCommitted.
func (*MultiRaft) CreateGroup ¶
func (m *MultiRaft) CreateGroup(groupID roachpb.RangeID) error
CreateGroup creates a new consensus group and joins it. The initial membership of this group is determined by the InitialState method of the group's Storage object.
func (*MultiRaft) RemoveGroup ¶
func (m *MultiRaft) RemoveGroup(groupID roachpb.RangeID) error
RemoveGroup destroys the consensus group with the given ID. No events for this group will be emitted after this method returns (but some events may still be in the channel buffer).
func (*MultiRaft) Start ¶
func (m *MultiRaft) Start()
Start runs the raft algorithm in a background goroutine.
func (*MultiRaft) SubmitCommand ¶
func (m *MultiRaft) SubmitCommand(groupID roachpb.RangeID, commandID string, command []byte) <-chan error
SubmitCommand sends a command (a binary blob) to the cluster. This method returns when the command has been successfully sent, not when it has been committed. An error or nil will be written to the returned channel when the command has been committed or aborted.
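Because the result arrives on a channel, a caller that needs the outcome has to wait on it, usually with a bound. The following is a minimal sketch under stated assumptions: fakeSubmit stands in for SubmitCommand so the example runs without a cluster, and waitForCommit, errTimeout, and demoTimeout are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// demoTimeout and errTimeout are illustration-only values (assumptions).
const demoTimeout = 50 * time.Millisecond

var errTimeout = errors.New("timed out waiting for commit")

// waitForCommit blocks until the result channel yields the command's
// outcome or the timeout elapses, whichever comes first.
func waitForCommit(result <-chan error, timeout time.Duration) error {
	select {
	case err := <-result:
		return err // nil means committed; non-nil means aborted
	case <-time.After(timeout):
		return errTimeout
	}
}

// fakeSubmit stands in for MultiRaft.SubmitCommand: it returns immediately
// with a channel that will carry the outcome.
func fakeSubmit(outcome error) <-chan error {
	ch := make(chan error, 1)
	ch <- outcome
	return ch
}

func main() {
	fmt.Println(waitForCommit(fakeSubmit(nil), demoTimeout))
	fmt.Println(waitForCommit(make(chan error), demoTimeout))
}
```

A timeout in this sketch does not mean the command failed; it may still be committed later, which is one reason CommandID-based deduplication matters.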
type RaftMessageRequest ¶
type RaftMessageRequest struct {
	GroupID     github_com_cockroachdb_cockroach_roachpb.RangeID `protobuf:"varint,1,opt,name=group_id,casttype=github.com/cockroachdb/cockroach/roachpb.RangeID" json:"group_id"`
	FromReplica cockroach_roachpb.ReplicaDescriptor              `protobuf:"bytes,2,opt,name=from_replica" json:"from_replica"`
	ToReplica   cockroach_roachpb.ReplicaDescriptor              `protobuf:"bytes,3,opt,name=to_replica" json:"to_replica"`
	Message     raftpb.Message                                   `protobuf:"bytes,4,opt,name=message" json:"message"`
}
RaftMessageRequest is the request used to send raft messages using our protobuf-based RPC codec.
func (*RaftMessageRequest) GetUser ¶
func (*RaftMessageRequest) GetUser() string
GetUser implements security.RequestWithUser. Raft messages are always sent by the node user.
func (*RaftMessageRequest) Marshal ¶
func (m *RaftMessageRequest) Marshal() (data []byte, err error)
func (*RaftMessageRequest) MarshalTo ¶
func (m *RaftMessageRequest) MarshalTo(data []byte) (int, error)
func (*RaftMessageRequest) ProtoMessage ¶
func (*RaftMessageRequest) ProtoMessage()
func (*RaftMessageRequest) Reset ¶
func (m *RaftMessageRequest) Reset()
func (*RaftMessageRequest) Size ¶
func (m *RaftMessageRequest) Size() (n int)
func (*RaftMessageRequest) String ¶
func (m *RaftMessageRequest) String() string
func (*RaftMessageRequest) Unmarshal ¶
func (m *RaftMessageRequest) Unmarshal(data []byte) error
type RaftMessageResponse ¶
type RaftMessageResponse struct { }
RaftMessageResponse is an empty message returned by raft RPCs. If a response is needed it will be sent as a separate message.
func (*RaftMessageResponse) Marshal ¶
func (m *RaftMessageResponse) Marshal() (data []byte, err error)
func (*RaftMessageResponse) MarshalTo ¶
func (m *RaftMessageResponse) MarshalTo(data []byte) (int, error)
func (*RaftMessageResponse) ProtoMessage ¶
func (*RaftMessageResponse) ProtoMessage()
func (*RaftMessageResponse) Reset ¶
func (m *RaftMessageResponse) Reset()
func (*RaftMessageResponse) Size ¶
func (m *RaftMessageResponse) Size() (n int)
func (*RaftMessageResponse) String ¶
func (m *RaftMessageResponse) String() string
func (*RaftMessageResponse) Unmarshal ¶
func (m *RaftMessageResponse) Unmarshal(data []byte) error
type ServerInterface ¶
type ServerInterface interface {
RaftMessage(req *RaftMessageRequest) (*RaftMessageResponse, error)
}
ServerInterface defines the methods we expose for use by net/rpc.
type StateMachine ¶
type StateMachine interface {
	// AppliedIndex returns the last index which has been applied to the given group's
	// state machine.
	AppliedIndex(groupID roachpb.RangeID) (uint64, error)
}
The StateMachine interface is supplied by the application to manage a persistent state machine. In Cockroach the StateMachine and the Storage are the same thing, but they are logically distinct, and systems like etcd keep them separate.
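As an illustration of the single method involved, here is a minimal in-memory sketch. The rangeID type is a stand-in for roachpb.RangeID, and memStateMachine and recordApplied are hypothetical; a real implementation would read the index from persistent storage.

```go
package main

import (
	"fmt"
	"sync"
)

// rangeID is a stand-in for roachpb.RangeID.
type rangeID int64

// memStateMachine tracks the last applied index per group in memory.
type memStateMachine struct {
	mu      sync.Mutex
	applied map[rangeID]uint64
}

func newMemStateMachine() *memStateMachine {
	return &memStateMachine{applied: make(map[rangeID]uint64)}
}

// AppliedIndex has the same shape as StateMachine.AppliedIndex. A group
// that has never applied a command reports index 0.
func (s *memStateMachine) AppliedIndex(groupID rangeID) (uint64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.applied[groupID], nil
}

// recordApplied is a hypothetical hook the application would call after
// applying a command. The stored index never moves backwards.
func (s *memStateMachine) recordApplied(groupID rangeID, index uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if index > s.applied[groupID] {
		s.applied[groupID] = index
	}
}

func main() {
	sm := newMemStateMachine()
	sm.recordApplied(7, 12)
	sm.recordApplied(7, 9) // stale update, ignored
	idx, err := sm.AppliedIndex(7)
	fmt.Println(idx, err) // prints "12 <nil>"
}
```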
type Storage ¶
type Storage interface {
	// GroupStorage returns an interface which can be used to access the
	// storage for the specified group. May return ErrGroupDeleted if
	// the group cannot be found or if the given replica ID is known to
	// be out of date. The replicaID may be zero if the replica ID is
	// not known; replica-staleness checks should be disabled in this
	// case.
	GroupStorage(groupID roachpb.RangeID, replicaID roachpb.ReplicaID) (WriteableGroupStorage, error)
	ReplicaDescriptor(groupID roachpb.RangeID, replicaID roachpb.ReplicaID) (roachpb.ReplicaDescriptor, error)
	ReplicaIDForStore(groupID roachpb.RangeID, storeID roachpb.StoreID) (roachpb.ReplicaID, error)
	ReplicasFromSnapshot(snap raftpb.Snapshot) ([]roachpb.ReplicaDescriptor, error)
	// GroupLocker returns a lock which (if non-nil) will be acquired
	// when a group is being created (which entails multiple calls to
	// Storage and StateMachine methods and may race with the removal of
	// a previous incarnation of a group). If it returns a non-nil value
	// it must return the same value on every call.
	GroupLocker() sync.Locker
}
The Storage interface is supplied by the application to manage persistent storage of raft data.
type Ticker ¶
type Ticker interface {
	// This channel will be readable once per tick. The time value returned is unspecified;
	// the channel has this type for compatibility with time.Ticker but other implementations
	// may not return real times.
	Chan() <-chan time.Time
	// Close stops the ticker and releases its resources.
	Close()
}
Ticker encapsulates the timing-related parts of the raft protocol.
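Because the interface only requires a readable channel, tests can drive time by hand. The sketch below repeats the Ticker interface locally so it compiles on its own; manualTicker and its Tick method are hypothetical and not provided by the package.

```go
package main

import (
	"fmt"
	"time"
)

// Ticker repeats the interface from this page so the sketch is self-contained.
type Ticker interface {
	Chan() <-chan time.Time
	Close()
}

// manualTicker is a hypothetical test ticker that fires only when Tick is
// called, which makes election and heartbeat timing deterministic.
type manualTicker struct {
	ch chan time.Time
}

func newManualTicker() *manualTicker {
	// The buffer lets a test queue several ticks before anyone reads them.
	return &manualTicker{ch: make(chan time.Time, 16)}
}

func (m *manualTicker) Chan() <-chan time.Time { return m.ch }

// Close has nothing to release because no timer or goroutine is running.
func (m *manualTicker) Close() {}

// Tick queues one tick. The zero time is sent because the interface leaves
// the value unspecified. Tick blocks if 16 ticks are already pending.
func (m *manualTicker) Tick() { m.ch <- time.Time{} }

func main() {
	var tk Ticker = newManualTicker()
	tk.(*manualTicker).Tick()
	<-tk.Chan()
	fmt.Println("received one tick")
	tk.Close()
}
```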
type Transport ¶
type Transport interface {
	// Listen informs the Transport of a local store's ID and callback interface.
	// The Transport should associate the given id with the server object so other Transport's
	// Connect methods can find it.
	Listen(id roachpb.StoreID, server ServerInterface) error
	// Stop undoes a previous Listen.
	Stop(id roachpb.StoreID)
	// Send a message to the node specified in the request's To field.
	Send(req *RaftMessageRequest) error
	// Close all associated connections.
	Close()
}
The Transport interface is supplied by the application to manage communication with other nodes. It is responsible for mapping from IDs to some communication channel.
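The ID-to-channel mapping can be sketched as an in-process registry. This is a simplified illustration, not the package's local transport: storeID, request, and server are stand-ins for roachpb.StoreID, RaftMessageRequest, and ServerInterface, and localTransport, recorder, and errUnknownStore are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// storeID and request are simplified stand-ins; the real types carry more fields.
type storeID int32

type request struct {
	To   storeID
	Body string
}

// server plays the role of ServerInterface.
type server interface {
	RaftMessage(req *request) error
}

var errUnknownStore = errors.New("no server listening for store")

// localTransport routes requests between servers in the same process.
type localTransport struct {
	mu      sync.Mutex
	servers map[storeID]server
}

func newLocalTransport() *localTransport {
	return &localTransport{servers: make(map[storeID]server)}
}

// Listen associates id with s so that later Send calls can find it.
func (t *localTransport) Listen(id storeID, s server) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.servers[id] = s
	return nil
}

// Stop undoes a previous Listen.
func (t *localTransport) Stop(id storeID) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.servers, id)
}

// Send delivers req to the server registered for req.To. The lock is
// released before the call so a handler may use the transport itself.
func (t *localTransport) Send(req *request) error {
	t.mu.Lock()
	s, ok := t.servers[req.To]
	t.mu.Unlock()
	if !ok {
		return errUnknownStore
	}
	return s.RaftMessage(req)
}

// Close drops every registration.
func (t *localTransport) Close() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.servers = make(map[storeID]server)
}

// recorder is a test server that remembers the bodies it received.
type recorder struct{ got []string }

func (r *recorder) RaftMessage(req *request) error {
	r.got = append(r.got, req.Body)
	return nil
}

func main() {
	tr := newLocalTransport()
	rec := &recorder{}
	_ = tr.Listen(2, rec)
	fmt.Println(tr.Send(&request{To: 2, Body: "heartbeat"}), rec.got)
	tr.Stop(2)
	fmt.Println(tr.Send(&request{To: 2, Body: "late"}))
}
```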
func NewLocalRPCTransport ¶
NewLocalRPCTransport creates a Transport for local testing use. MultiRaft instances sharing the same local Transport can find and communicate with each other by ID (which can be an arbitrary string). Each instance binds to a different unused port on localhost. Because this is just for local testing, it doesn't use TLS.
type WriteableGroupStorage ¶
type WriteableGroupStorage interface {
	raft.Storage
	Append(entries []raftpb.Entry) error
	ApplySnapshot(snap raftpb.Snapshot) error
	SetHardState(st raftpb.HardState) error
}
WriteableGroupStorage represents a single group within a Storage. It is implemented by *raft.MemoryStorage.
Directories ¶
Path | Synopsis |
---|---|
storagetest | Package storagetest is a test suite for raft.Storage implementations. |