multiraft

package
v0.0.0-...-83b8107 Latest
This package is not in the latest version of its module.
Published: Oct 22, 2015 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package multiraft implements the Raft distributed consensus algorithm.

In contrast to other Raft implementations, this version is optimized for the case where one server is a part of many Raft consensus groups (likely with overlapping membership). This entails the use of a shared log and coalesced timers for heartbeats.

A cluster consists of a collection of nodes; the local node is represented by a MultiRaft object. Each node may participate in any number of groups. Nodes and groups must each have a globally unique ID (roachpb.NodeID and roachpb.RangeID in the signatures below). The application is responsible for providing a Transport interface that knows how to communicate with other nodes based on their IDs, and a Storage interface to manage persistent data.

The Raft protocol is documented in "In Search of an Understandable Consensus Algorithm" by Diego Ongaro and John Ousterhout. https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf

Package multiraft is a generated protocol buffer package.

It is generated from these files:
	cockroach/multiraft/rpc.proto

It has these top-level messages:
	RaftMessageRequest
	RaftMessageResponse
	ConfChangeContext

Index

Constants

This section is empty.

Variables

var (
	ErrInvalidLengthRpc = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowRpc   = fmt.Errorf("proto: integer overflow")
)
var ErrGroupDeleted = errors.New("raft group deleted")

ErrGroupDeleted is returned for commands which are pending while their group is deleted.

var ErrStopped = errors.New("raft processing stopped")

ErrStopped is returned for commands that could not be completed before the node was stopped.

Functions

This section is empty.

Types

type ConfChangeContext

type ConfChangeContext struct {
	CommandID string `protobuf:"bytes,1,opt,name=command_id" json:"command_id"`
	// Payload is the application-level command (i.e. an encoded
	// roachpb.EndTransactionRequest).
	Payload []byte `protobuf:"bytes,2,opt,name=payload" json:"payload,omitempty"`
	// Replica contains full details about the replica being added or removed.
	Replica cockroach_roachpb.ReplicaDescriptor `protobuf:"bytes,3,opt,name=replica" json:"replica"`
}

ConfChangeContext is encoded in the raftpb.ConfChange.Context field.

func (*ConfChangeContext) GetCommandID

func (m *ConfChangeContext) GetCommandID() string

func (*ConfChangeContext) GetPayload

func (m *ConfChangeContext) GetPayload() []byte

func (*ConfChangeContext) GetReplica

func (*ConfChangeContext) Marshal

func (m *ConfChangeContext) Marshal() (data []byte, err error)

func (*ConfChangeContext) MarshalTo

func (m *ConfChangeContext) MarshalTo(data []byte) (int, error)

func (*ConfChangeContext) ProtoMessage

func (*ConfChangeContext) ProtoMessage()

func (*ConfChangeContext) Reset

func (m *ConfChangeContext) Reset()

func (*ConfChangeContext) Size

func (m *ConfChangeContext) Size() (n int)

func (*ConfChangeContext) String

func (m *ConfChangeContext) String() string

func (*ConfChangeContext) Unmarshal

func (m *ConfChangeContext) Unmarshal(data []byte) error

type Config

type Config struct {
	Storage   Storage
	Transport Transport
	// Ticker may be nil to use real time and TickInterval.
	Ticker Ticker
	// StateMachine may be nil if the state machine is transient and always starts from
	// a blank slate.
	StateMachine StateMachine

	// A new election is called if the election timeout elapses with no
	// contact from the leader.  The actual timeout is chosen randomly
	// from the range [ElectionTimeoutTicks*TickInterval,
	// ElectionTimeoutTicks*TickInterval*2) to minimize the chances of
	// several servers trying to become leaders simultaneously. The Raft
	// paper suggests a range of 150-300ms for local networks;
	// geographically distributed installations should use higher values
	// to account for the increased round trip time.
	ElectionTimeoutTicks   int
	HeartbeatIntervalTicks int
	TickInterval           time.Duration

	EntryFormatter raft.EntryFormatter
}

Config contains the parameters necessary to construct a MultiRaft object.
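
The randomized election timeout described in the ElectionTimeoutTicks comment can be illustrated with a small sketch. This is not the package's own code: electionTimeout is a hypothetical helper, and the use of the global math/rand source is an assumption made for brevity.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// electionTimeout is a hypothetical helper (not part of multiraft). It picks
// a timeout uniformly from [ticks*interval, 2*ticks*interval), the range
// described in the Config comment.
func electionTimeout(ticks int, interval time.Duration) time.Duration {
	base := time.Duration(ticks) * interval
	if base <= 0 {
		return 0
	}
	// rand.Int63n(n) returns a value in [0, n), so the result stays below 2*base.
	return base + time.Duration(rand.Int63n(int64(base)))
}

func main() {
	// 10 ticks of 100ms each: the timeout falls in [1s, 2s).
	fmt.Println(electionTimeout(10, 100*time.Millisecond))
}
```

With these example values each node would wait between one and two seconds before calling an election, so two nodes are unlikely to time out at the same instant.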

type EventCommandCommitted

type EventCommandCommitted struct {
	GroupID roachpb.RangeID
	// CommandID is the application-supplied ID for this command. The same CommandID
	// may be seen multiple times, so the application should remember this CommandID
	// for deduping.
	CommandID string
	// Index is the raft log index for this event. The application should persist
	// the Index of the last applied command atomically with any effects of that
	// command.
	Index   uint64
	Command []byte
}

An EventCommandCommitted is broadcast whenever a command has been committed.
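
Because the same CommandID may be delivered more than once, the application needs some form of deduplication. The following is a minimal in-memory sketch of that idea; committedCommand and applier are hypothetical stand-ins, and a real application would persist the applied index atomically with the command's effects rather than keep it in memory.

```go
package main

import "fmt"

// committedCommand mirrors the fields of EventCommandCommitted that matter
// here; it is a stand-in so the sketch compiles without the cockroach packages.
type committedCommand struct {
	CommandID string
	Index     uint64
	Command   []byte
}

// applier is a hypothetical application-side component that applies each
// command at most once and remembers the highest index it has processed.
type applier struct {
	seen        map[string]bool
	lastApplied uint64
	applied     [][]byte
}

func newApplier() *applier {
	return &applier{seen: make(map[string]bool)}
}

// apply returns true if the command's effects were applied and false if the
// CommandID was a duplicate. The index advances in both cases, because the
// log entry has been processed either way.
func (a *applier) apply(ev committedCommand) bool {
	if ev.Index > a.lastApplied {
		a.lastApplied = ev.Index
	}
	if a.seen[ev.CommandID] {
		return false
	}
	a.seen[ev.CommandID] = true
	a.applied = append(a.applied, ev.Command)
	return true
}

func main() {
	a := newApplier()
	fmt.Println(a.apply(committedCommand{CommandID: "c1", Index: 5, Command: []byte("put x")}))
	fmt.Println(a.apply(committedCommand{CommandID: "c1", Index: 6, Command: []byte("put x")}))
	fmt.Println(a.lastApplied, len(a.applied))
}
```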

type EventLeaderElection

type EventLeaderElection struct {
	GroupID   roachpb.RangeID
	ReplicaID roachpb.ReplicaID
	Term      uint64
}

An EventLeaderElection is broadcast when a group starts or completes an election. ReplicaID is zero when an election is in progress.

type EventMembershipChangeCommitted

type EventMembershipChangeCommitted struct {
	// GroupID, CommandID, and Index are the same as for EventCommandCommitted.
	GroupID    roachpb.RangeID
	CommandID  string
	Index      uint64
	Replica    roachpb.ReplicaDescriptor
	ChangeType raftpb.ConfChangeType
	Payload    []byte

	// Callback should be invoked when this event and its payload have been
	// processed. A non-nil error aborts the membership change.
	Callback func(error)
}

An EventMembershipChangeCommitted is broadcast whenever a membership change has been committed.

type MemoryStorage

type MemoryStorage struct {
	// contains filtered or unexported fields
}

MemoryStorage is an in-memory implementation of Storage for testing.

func NewMemoryStorage

func NewMemoryStorage() *MemoryStorage

NewMemoryStorage creates a MemoryStorage.

func (*MemoryStorage) GroupLocker

func (m *MemoryStorage) GroupLocker() sync.Locker

GroupLocker implements the Storage interface by returning nil.

func (*MemoryStorage) GroupStorage

func (m *MemoryStorage) GroupStorage(groupID roachpb.RangeID, replicaID roachpb.ReplicaID) (WriteableGroupStorage, error)

GroupStorage implements the Storage interface.

func (*MemoryStorage) ReplicaDescriptor

func (m *MemoryStorage) ReplicaDescriptor(groupID roachpb.RangeID, replicaID roachpb.ReplicaID) (roachpb.ReplicaDescriptor, error)

ReplicaDescriptor implements the Storage interface by returning a dummy descriptor.

func (*MemoryStorage) ReplicaIDForStore

func (m *MemoryStorage) ReplicaIDForStore(groupID roachpb.RangeID, storeID roachpb.StoreID) (roachpb.ReplicaID, error)

ReplicaIDForStore implements the Storage interface.

func (*MemoryStorage) ReplicasFromSnapshot

func (m *MemoryStorage) ReplicasFromSnapshot(_ raftpb.Snapshot) ([]roachpb.ReplicaDescriptor, error)

ReplicasFromSnapshot implements the Storage interface.

type MultiRaft

type MultiRaft struct {
	Config

	Events chan []interface{}
	// contains filtered or unexported fields
}

MultiRaft represents a local node in a raft cluster. The owner is responsible for consuming the Events channel in a timely manner.
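
Each value received from Events is a batch of events of mixed types, so a consumer typically ranges over the batch and dispatches with a type switch. The sketch below uses reduced stand-in event types; whether the real events arrive as pointers or values, and whether the channel is ever closed, are assumptions not confirmed by this documentation.

```go
package main

import "fmt"

// Stand-ins for the event types in this package, reduced to one field each so
// the sketch compiles on its own.
type eventCommandCommitted struct{ CommandID string }
type eventLeaderElection struct{ Term uint64 }

// drain reads batches from events until the channel is closed and returns a
// count of events per kind. A real consumer would act on each event instead.
func drain(events <-chan []interface{}) map[string]int {
	counts := make(map[string]int)
	for batch := range events {
		for _, ev := range batch {
			switch ev.(type) {
			case *eventCommandCommitted:
				counts["command"]++
			case *eventLeaderElection:
				counts["election"]++
			default:
				counts["unknown"]++
			}
		}
	}
	return counts
}

func main() {
	events := make(chan []interface{}, 2)
	events <- []interface{}{&eventLeaderElection{Term: 1}}
	events <- []interface{}{
		&eventCommandCommitted{CommandID: "c1"},
		&eventCommandCommitted{CommandID: "c2"},
	}
	close(events) // closed here only so that drain terminates
	fmt.Println(drain(events))
}
```

Running the consumer in its own goroutine keeps the channel drained, which is what "in a timely manner" asks of the owner.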

func NewMultiRaft

func NewMultiRaft(nodeID roachpb.NodeID, storeID roachpb.StoreID, config *Config, stopper *stop.Stopper) (*MultiRaft, error)

NewMultiRaft creates a MultiRaft object.

func (*MultiRaft) ChangeGroupMembership

func (m *MultiRaft) ChangeGroupMembership(groupID roachpb.RangeID, commandID string,
	changeType raftpb.ConfChangeType, replica roachpb.ReplicaDescriptor, payload []byte) <-chan error

ChangeGroupMembership submits a proposed membership change to the cluster. Payload is an opaque blob that will be returned in EventMembershipChangeCommitted.

func (*MultiRaft) CreateGroup

func (m *MultiRaft) CreateGroup(groupID roachpb.RangeID) error

CreateGroup creates a new consensus group and joins it. The initial membership of this group is determined by the InitialState method of the group's Storage object.

func (*MultiRaft) RemoveGroup

func (m *MultiRaft) RemoveGroup(groupID roachpb.RangeID) error

RemoveGroup destroys the consensus group with the given ID. No events for this group will be emitted after this method returns (but some events may still be in the channel buffer).

func (*MultiRaft) Start

func (m *MultiRaft) Start()

Start runs the raft algorithm in a background goroutine.

func (*MultiRaft) Status

func (m *MultiRaft) Status(groupID roachpb.RangeID) *raft.Status

Status returns the current status of the given group.

func (*MultiRaft) SubmitCommand

func (m *MultiRaft) SubmitCommand(groupID roachpb.RangeID, commandID string, command []byte) <-chan error

SubmitCommand sends a command (a binary blob) to the cluster. This method returns when the command has been successfully sent, not when it has been committed. An error or nil will be written to the returned channel when the command has been committed or aborted.
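
Since SubmitCommand returns before the command commits, callers usually wait on the returned channel. A minimal sketch of such a wait, bounded by a timeout, is shown below; waitForResult and errTimeout are hypothetical names, and the timeout is an addition of this sketch rather than something the package provides.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is a hypothetical sentinel error for this sketch.
var errTimeout = errors.New("timed out waiting for commit")

// waitForResult is a hypothetical helper: it blocks until the channel returned
// by a SubmitCommand-style call yields a result, or until timeout elapses.
func waitForResult(done <-chan error, timeout time.Duration) error {
	select {
	case err := <-done:
		return err // nil means committed; non-nil means aborted
	case <-time.After(timeout):
		return errTimeout
	}
}

func main() {
	done := make(chan error, 1)
	done <- nil // simulate a successful commit
	fmt.Println(waitForResult(done, time.Second))
}
```

Note that a timeout here only stops the caller from waiting; it says nothing about whether the command eventually commits.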

type RaftMessageRequest

type RaftMessageRequest struct {
	GroupID     github_com_cockroachdb_cockroach_roachpb.RangeID `protobuf:"varint,1,opt,name=group_id,casttype=github.com/cockroachdb/cockroach/roachpb.RangeID" json:"group_id"`
	FromReplica cockroach_roachpb.ReplicaDescriptor              `protobuf:"bytes,2,opt,name=from_replica" json:"from_replica"`
	ToReplica   cockroach_roachpb.ReplicaDescriptor              `protobuf:"bytes,3,opt,name=to_replica" json:"to_replica"`
	Message     raftpb.Message                                   `protobuf:"bytes,4,opt,name=message" json:"message"`
}

RaftMessageRequest is the request used to send raft messages using our protobuf-based RPC codec.

func (*RaftMessageRequest) GetFromReplica

func (*RaftMessageRequest) GetGroupID

func (*RaftMessageRequest) GetMessage

func (m *RaftMessageRequest) GetMessage() raftpb.Message

func (*RaftMessageRequest) GetToReplica

func (*RaftMessageRequest) GetUser

func (*RaftMessageRequest) GetUser() string

GetUser implements security.RequestWithUser. Raft messages are always sent by the node user.

func (*RaftMessageRequest) Marshal

func (m *RaftMessageRequest) Marshal() (data []byte, err error)

func (*RaftMessageRequest) MarshalTo

func (m *RaftMessageRequest) MarshalTo(data []byte) (int, error)

func (*RaftMessageRequest) ProtoMessage

func (*RaftMessageRequest) ProtoMessage()

func (*RaftMessageRequest) Reset

func (m *RaftMessageRequest) Reset()

func (*RaftMessageRequest) Size

func (m *RaftMessageRequest) Size() (n int)

func (*RaftMessageRequest) String

func (m *RaftMessageRequest) String() string

func (*RaftMessageRequest) Unmarshal

func (m *RaftMessageRequest) Unmarshal(data []byte) error

type RaftMessageResponse

type RaftMessageResponse struct {
}

RaftMessageResponse is an empty message returned by raft RPCs. If a response is needed it will be sent as a separate message.

func (*RaftMessageResponse) Marshal

func (m *RaftMessageResponse) Marshal() (data []byte, err error)

func (*RaftMessageResponse) MarshalTo

func (m *RaftMessageResponse) MarshalTo(data []byte) (int, error)

func (*RaftMessageResponse) ProtoMessage

func (*RaftMessageResponse) ProtoMessage()

func (*RaftMessageResponse) Reset

func (m *RaftMessageResponse) Reset()

func (*RaftMessageResponse) Size

func (m *RaftMessageResponse) Size() (n int)

func (*RaftMessageResponse) String

func (m *RaftMessageResponse) String() string

func (*RaftMessageResponse) Unmarshal

func (m *RaftMessageResponse) Unmarshal(data []byte) error

type ServerInterface

type ServerInterface interface {
	RaftMessage(req *RaftMessageRequest) (*RaftMessageResponse, error)
}

ServerInterface is the set of methods we expose for use by net/rpc.

type StateMachine

type StateMachine interface {
	// AppliedIndex returns the last index which has been applied to the given group's
	// state machine.
	AppliedIndex(groupID roachpb.RangeID) (uint64, error)
}

The StateMachine interface is supplied by the application to manage a persistent state machine. In Cockroach the StateMachine and the Storage are the same thing, but they are logically distinct, and systems like etcd keep them separate.

type Storage

type Storage interface {
	// GroupStorage returns an interface which can be used to access the
	// storage for the specified group. May return ErrGroupDeleted if
	// the group cannot be found or if the given replica ID is known to
	// be out of date. The replicaID may be zero if the replica ID is
	// not known; replica-staleness checks should be disabled in this
	// case.
	GroupStorage(groupID roachpb.RangeID, replicaID roachpb.ReplicaID) (WriteableGroupStorage, error)

	ReplicaDescriptor(groupID roachpb.RangeID, replicaID roachpb.ReplicaID) (roachpb.ReplicaDescriptor, error)
	ReplicaIDForStore(groupID roachpb.RangeID, storeID roachpb.StoreID) (roachpb.ReplicaID, error)
	ReplicasFromSnapshot(snap raftpb.Snapshot) ([]roachpb.ReplicaDescriptor, error)

	// GroupLocker returns a lock which (if non-nil) will be acquired
	// when a group is being created (which entails multiple calls to
	// Storage and StateMachine methods and may race with the removal of
	// a previous incarnation of a group). If it returns a non-nil value
	// it must return the same value on every call.
	GroupLocker() sync.Locker
}

The Storage interface is supplied by the application to manage persistent storage of raft data.
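
The GroupLocker contract (the lock is acquired only if it is non-nil) suggests a nil-safe locking pattern on the caller's side. The sketch below shows one way this could look; withGroupLock and countingLocker are hypothetical and are not taken from the package.

```go
package main

import (
	"fmt"
	"sync"
)

// withGroupLock is a hypothetical helper: it runs fn while holding l, or
// without any locking when l is nil.
func withGroupLock(l sync.Locker, fn func()) {
	if l != nil {
		l.Lock()
		defer l.Unlock()
	}
	fn()
}

// countingLocker wraps a mutex and counts acquisitions, for demonstration.
type countingLocker struct {
	mu    sync.Mutex
	locks int
}

func (c *countingLocker) Lock()   { c.mu.Lock(); c.locks++ }
func (c *countingLocker) Unlock() { c.mu.Unlock() }

func main() {
	cl := &countingLocker{}
	withGroupLock(cl, func() { fmt.Println("creating group under lock") })
	withGroupLock(nil, func() { fmt.Println("creating group without lock") })
	fmt.Println("lock acquisitions:", cl.locks)
}
```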

type Ticker

type Ticker interface {
	// This channel will be readable once per tick. The time value returned is unspecified;
	// the channel has this type for compatibility with time.Ticker but other implementations
	// may not return real times.
	Chan() <-chan time.Time

	// Close stops the ticker and releases its resources.
	Close()
}

Ticker encapsulates the timing-related parts of the raft protocol.
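
Because Config.Ticker is pluggable, tests can drive time by hand. The following is a minimal sketch of a manually driven ticker; manualTicker and its Tick method are hypothetical, and the interface is restated locally so the example compiles without this package.

```go
package main

import (
	"fmt"
	"time"
)

// ticker restates the Ticker interface from this package so the sketch
// compiles on its own.
type ticker interface {
	Chan() <-chan time.Time
	Close()
}

// manualTicker is a hypothetical test ticker: it ticks only when Tick is called.
type manualTicker struct {
	ch chan time.Time
}

func newManualTicker() *manualTicker {
	return &manualTicker{ch: make(chan time.Time, 1)}
}

func (m *manualTicker) Chan() <-chan time.Time { return m.ch }

// Tick queues one tick and reports whether it was accepted. If an earlier tick
// has not been consumed yet, the new one is dropped instead of blocking.
func (m *manualTicker) Tick() bool {
	select {
	case m.ch <- time.Time{}:
		return true
	default:
		return false
	}
}

// Close is a no-op here: there is no timer or goroutine to release.
func (m *manualTicker) Close() {}

// Compile-time check that manualTicker satisfies the interface.
var _ ticker = (*manualTicker)(nil)

func main() {
	t := newManualTicker()
	t.Tick()
	<-t.Chan()
	fmt.Println("tick received")
}
```

The zero time.Time sent on the channel is consistent with the interface comment that the time value is unspecified.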

type Transport

type Transport interface {
	// Listen informs the Transport of a local store's ID and callback interface.
	// The Transport should associate the given id with the server object so other Transport's
	// Connect methods can find it.
	Listen(id roachpb.StoreID, server ServerInterface) error

	// Stop undoes a previous Listen.
	Stop(id roachpb.StoreID)

	// Send a message to the node specified in the request's ToReplica field.
	Send(req *RaftMessageRequest) error

	// Close all associated connections.
	Close()
}

The Transport interface is supplied by the application to manage communication with other nodes. It is responsible for mapping from IDs to some communication channel.
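
The ID-to-channel mapping a Transport is responsible for can be sketched with an in-process registry. Everything below is a simplified, hypothetical stand-in: storeID, request, and server replace roachpb.StoreID, RaftMessageRequest, and ServerInterface, and delivery is a direct synchronous call rather than an RPC.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Simplified stand-ins for roachpb.StoreID, RaftMessageRequest, and
// ServerInterface; the real types carry more information.
type storeID int32

type request struct {
	To      storeID
	Payload string
}

type server interface {
	RaftMessage(req *request) error
}

// memTransport is a hypothetical in-process transport that maps store IDs to servers.
type memTransport struct {
	mu      sync.Mutex
	servers map[storeID]server
}

func newMemTransport() *memTransport {
	return &memTransport{servers: make(map[storeID]server)}
}

// Listen registers s under id; a second registration for the same id is an error.
func (t *memTransport) Listen(id storeID, s server) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, ok := t.servers[id]; ok {
		return errors.New("id already registered")
	}
	t.servers[id] = s
	return nil
}

// Stop undoes a previous Listen.
func (t *memTransport) Stop(id storeID) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.servers, id)
}

// Send delivers req synchronously to the server registered for req.To.
func (t *memTransport) Send(req *request) error {
	t.mu.Lock()
	s, ok := t.servers[req.To]
	t.mu.Unlock()
	if !ok {
		return errors.New("no server listening for destination")
	}
	return s.RaftMessage(req)
}

// recorder is a server that stores the payloads it receives.
type recorder struct{ got []string }

func (r *recorder) RaftMessage(req *request) error {
	r.got = append(r.got, req.Payload)
	return nil
}

func main() {
	tr := newMemTransport()
	rec := &recorder{}
	_ = tr.Listen(2, rec)
	fmt.Println(tr.Send(&request{To: 2, Payload: "heartbeat"}), rec.got)
}
```

The lock is released before calling RaftMessage so that a server which sends a reply from inside its handler does not deadlock on the registry mutex.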

func NewLocalRPCTransport

func NewLocalRPCTransport(stopper *stop.Stopper) Transport

NewLocalRPCTransport creates a Transport for local testing use. MultiRaft instances sharing the same local Transport can find and communicate with each other by ID (the roachpb.StoreID passed to Listen). Each instance binds to a different unused port on localhost. Because this is just for local testing, it doesn't use TLS.

type WriteableGroupStorage

type WriteableGroupStorage interface {
	raft.Storage
	Append(entries []raftpb.Entry) error
	ApplySnapshot(snap raftpb.Snapshot) error
	SetHardState(st raftpb.HardState) error
}

WriteableGroupStorage represents a single group within a Storage. It is implemented by *raft.MemoryStorage.

Directories

Path Synopsis
Package storagetest is a test suite for raft.Storage implementations.
