Documentation
¶
Index ¶
- Variables
- type Batch
- type Config
- type Decoder
- type DiskStorage
- type Encoder
- type GroupConfig
- type GroupNode
- type LeaderChanged
- type MultiNode
- func (n *MultiNode) AddNodeToGroup(ctx context.Context, node, group uint64, data interface{}) error
- func (n *MultiNode) Campaign(ctx context.Context, group uint64) error
- func (n *MultiNode) CreateGroup(ctx context.Context, cfg GroupConfig) error
- func (n *MultiNode) Exists(ctx context.Context, gid uint64) (ok bool)
- func (n *MultiNode) Propose(ctx context.Context, group uint64, req interface{}) (res interface{}, err error)
- func (n *MultiNode) ProposeRetry(group uint64, req interface{}, timeout time.Duration, maxRetries int) (res interface{}, err error)
- func (n *MultiNode) RemoveNodeFromGroup(ctx context.Context, node, group uint64, data interface{}) error
- func (n *MultiNode) Status(group uint64) *etcdraft.Status
- func (n *MultiNode) StepBatch(ctx context.Context, batch Batch, timeout time.Duration) error
- func (n *MultiNode) Stop()
- func (n *MultiNode) String() string
- type Priority
- type Reporter
- type Request
- type RequestID
- type Response
- type SendFunc
- type StateMachine
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrStopped is returned when the node is already stopped. ErrStopped = errors.New("raft: node stopped") // ErrUnreachable is returned when SendFunc cannot reach the destination node. ErrUnreachable = errors.New("raft: node unreachable") // ErrGroupExists is returned when the group already exists. ErrGroupExists = errors.New("raft: group exists") // ErrNoSuchGroup is returned when the requested group does not exist. ErrNoSuchGroup = errors.New("raft: no such group") )
var (
ErrEmptyMessage = errors.New("raft: empty message")
)
var (
ErrNoEnoughData = errors.New("raft: not enough data")
)
Functions ¶
This section is empty.
Types ¶
type Batch ¶
type Batch struct { From uint64 // Source node. To uint64 // Destination node. Priority Priority // Priority of this batch. Messages map[uint64][]raftpb.Message // List of messages of each group. }
Batch represents a batch of messages from one node to another with a specific priority.
type Config ¶
type Config struct { ID uint64 // Node ID. Name string // Node name. Send SendFunc // Network send function. Ticker <-chan time.Time // Ticker of the node. }
Config represents the configuration of a MultiNode.
type Decoder ¶
type Decoder struct {
// contains filtered or unexported fields
}
Decoder decodes size delimited raft messages in a bytes.Buffer.
Example ¶
var msgs []raftpb.Message var buf bytes.Buffer dec := NewDecoder(&buf) for { var msg raftpb.Message err := dec.Decode(&msg) if err == io.EOF { break } if err != nil { panic(err) } msgs = append(msgs, msg) }
Output:
func NewDecoder ¶
NewDecoder creates a Decoder that decodes messages from r.
type DiskStorage ¶
type DiskStorage interface { // Save function saves ents and state to the underlying stable storage. // Save MUST block until st and ents are on stable storage. Save(st raftpb.HardState, ents []raftpb.Entry) error // SaveSnap function saves snapshot to the underlying stable storage. SaveSnap(snap raftpb.Snapshot) error // Sync syncs the data written with the disk. Sync() error // Close closes the storage and performs finalization. Close() error }
func OpenStorage ¶
func OpenStorage(node uint64, dir string, stateMachine StateMachine) ( raftStorage *etcdraft.MemoryStorage, diskStorage DiskStorage, lastSnapIdx, lastEntIdx uint64, exists bool, err error)
OpenStorage creates or reloads the disk-backed storage in dir.
type Encoder ¶
type Encoder struct {
// contains filtered or unexported fields
}
Encoder encodes size delimited raft messages in a bytes.Buffer.
Example ¶
msgs := []raftpb.Message{ /*...*/ } var buf bytes.Buffer enc := NewEncoder(&buf) for _, m := range msgs { if err := enc.Encode(m); err != nil { panic(err) } }
Output:
func NewEncoder ¶
NewEncoder creates an Encoder that encodes messages in w.
type GroupConfig ¶
type GroupConfig struct { ID uint64 // Group ID. Name string // Group name. StateMachine StateMachine // Group state machine. Peers []etcdraft.Peer // Peers of this group. DataDir string // Where to save raft state. SnapCount uint64 // How many entries to include in a snapshot. FsyncTick time.Duration // The frequency of fsyncs. ElectionTicks int // Number of ticks to fire an election. HeartbeatTicks int // Number of ticks to fire heartbeats. MaxInFlights int // Maximum number of inflight messages. MaxMsgSize uint64 // Maximum number of entries in a message. }
GroupConfig represents the configuration of a raft group.
type GroupNode ¶
type GroupNode struct { Group uint64 // Group's ID. Node uint64 // Node's ID. Data interface{} // Data is application-defined. Must be registered in gob. }
GroupNode stores the ID, the group, and application-defined data for a node in a raft group.
func (GroupNode) MustEncode ¶
MustEncode encodes the GroupNode into bytes.
type LeaderChanged ¶
type LeaderChanged struct { Old uint64 // The old leader. New uint64 // The new leader. Term uint64 // The raft term. }
LeaderChanged indicates that the leader of the raft quorum has changed.
type MultiNode ¶
type MultiNode struct {
// contains filtered or unexported fields
}
MultiNode represents a node that participates in multiple raft consensus groups.
func StartMultiNode ¶
StartMultiNode starts a MultiNode with the given id and name. The send function is used to send all messages of this node. The ticker is used for all groups. You can fine-tune the heartbeat and election timeouts in the group configs.
func (*MultiNode) AddNodeToGroup ¶
func (*MultiNode) CreateGroup ¶
func (n *MultiNode) CreateGroup(ctx context.Context, cfg GroupConfig) error
func (*MultiNode) Propose ¶
func (n *MultiNode) Propose(ctx context.Context, group uint64, req interface{}) (res interface{}, err error)
Propose proposes the request and returns the response. This method blocks and returns either when the ctx is cancelled or the raft node returns a response.
func (*MultiNode) ProposeRetry ¶
func (n *MultiNode) ProposeRetry(group uint64, req interface{}, timeout time.Duration, maxRetries int) (res interface{}, err error)
ProposeRetry proposes the request to the given group. It retries maxRetries times using the given timeout. If maxRetries is -1 it will keep proposing the request until it retrieves a response.
func (*MultiNode) RemoveNodeFromGroup ¶
func (*MultiNode) Status ¶
Status returns the latest status of the group. Returns nil if the group does not exist.
type Reporter ¶
type Reporter interface { // ReportUnreachable reports that the given node was not reachable for the last send. ReportUnreachable(id, group uint64) // ReportSnapshot reports the status of the sent snapshot. ReportSnapshot(id, group uint64, status etcdraft.SnapshotStatus) }
type Response ¶
type Response struct { ID RequestID // Response ID is always set to the ID of the request. Data interface{} // Data is set if there was no error. Err error // Error, if any. }
Response represents a response to a request.
type SendFunc ¶
SendFunc sends a batch of messages to a group.
The sender should send messages in a way that messages of a higher priority are not blocked by messages of a lower priority on network channels.
type StateMachine ¶
type StateMachine interface { // Save saves the store into bytes. Save() ([]byte, error) // Restore restores the store from bytes. Restore(b []byte) error // Apply applies a request and returns the response. Apply(req interface{}) (interface{}, error) // ApplyConfChange processes a configuration change. ApplyConfChange(cc raftpb.ConfChange, gn GroupNode) error // ProcessStatusChange is called whenever the leader of the quorum is changed. ProcessStatusChange(event interface{}) }
StateMachine represents an application defined state.