raft

package
v0.1.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2023 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Overview

Package raft contains Raft consensus for WebMesh.

Package streamlayer contains the Raft stream layer implementation.

Index

Constants

View Source
const (
	// Names of the environment variables associated with the raft options,
	// as indicated by the EnvVar suffix. Presumably each one overrides the
	// correspondingly named Options field — confirm against the
	// flag-binding code.
	RaftListenAddressEnvVar   = "RAFT_LISTEN_ADDRESS"
	DataDirEnvVar             = "RAFT_DATA_DIR"
	InMemoryEnvVar            = "RAFT_IN_MEMORY"
	ConnectionPoolCountEnvVar = "RAFT_CONNECTION_POOL_COUNT"
	ConnectionTimeoutEnvVar   = "RAFT_CONNECTION_TIMEOUT"
	HeartbeatTimeoutEnvVar    = "RAFT_HEARTBEAT_TIMEOUT"
	ElectionTimeoutEnvVar     = "RAFT_ELECTION_TIMEOUT"
	ApplyTimeoutEnvVar        = "RAFT_APPLY_TIMEOUT"
	CommitTimeoutEnvVar       = "RAFT_COMMIT_TIMEOUT"
	MaxAppendEntriesEnvVar    = "RAFT_MAX_APPEND_ENTRIES"
	LeaderLeaseTimeoutEnvVar  = "RAFT_LEADER_LEASE_TIMEOUT"
	SnapshotIntervalEnvVar    = "RAFT_SNAPSHOT_INTERVAL"
	SnapshotThresholdEnvVar   = "RAFT_SNAPSHOT_THRESHOLD"
	SnapshotRetentionEnvVar   = "RAFT_SNAPSHOT_RETENTION"
	ObserverChanBufferEnvVar  = "RAFT_OBSERVER_CHAN_BUFFER"
	RaftLogLevelEnvVar        = "RAFT_LOG_LEVEL"
	RaftPreferIPv6EnvVar      = "RAFT_PREFER_IPV6"
	LeaveOnShutdownEnvVar     = "RAFT_LEAVE_ON_SHUTDOWN"
	StartupTimeoutEnvVar      = "RAFT_STARTUP_TIMEOUT"

	// RaftStorePath is the raft stable and log store directory.
	RaftStorePath = "raft-store"
	// DataStoragePath is the raft data storage directory.
	DataStoragePath = "raft-data"
	// DefaultListenPort is the default raft listen port.
	DefaultListenPort = 9443
)
View Source
// Raft node states. Each constant is assigned directly from the identically
// named constant in the raft package.
const (
	Follower  = raft.Follower
	Candidate = raft.Candidate
	Leader    = raft.Leader
	Shutdown  = raft.Shutdown
)

Raft states.

View Source
// Raft suffrage states. Each constant is assigned directly from the
// identically named constant in the raft package.
const (
	Voter    = raft.Voter
	Nonvoter = raft.Nonvoter
)

Raft suffrage states.

Variables

View Source
// Sentinel errors for the Raft node. The last three are the raft package's
// own error values under local names, so a comparison against either name
// matches the same error.
var (
	// ErrStarted is returned when the Raft node is already started.
	ErrStarted = errors.New("raft node already started")
	// ErrClosed is returned when the Raft node is already closed.
	ErrClosed = errors.New("raft node is closed")
	// ErrAlreadyBootstrapped is returned when the Raft node is already bootstrapped.
	// It is the same value as raft.ErrCantBootstrap.
	ErrAlreadyBootstrapped = raft.ErrCantBootstrap
	// ErrNotLeader is returned when the Raft node is not the leader.
	// It is the same value as raft.ErrNotLeader.
	ErrNotLeader = raft.ErrNotLeader
	// ErrNotVoter is returned when the Raft node is not a voter.
	// It is the same value as raft.ErrNotVoter.
	ErrNotVoter = raft.ErrNotVoter
)

Functions

func MarshalLogEntry added in v0.1.3

func MarshalLogEntry(logEntry *v1.RaftLogEntry) ([]byte, error)

MarshalLogEntry marshals a RaftLogEntry.

func UnmarshalLogEntry added in v0.1.3

func UnmarshalLogEntry(data []byte) (*v1.RaftLogEntry, error)

UnmarshalLogEntry unmarshals a RaftLogEntry.

Types

type BootstrapOptions

// BootstrapOptions are options for bootstrapping a Raft node.
type BootstrapOptions struct {
	// AdvertiseAddress is the address to advertise to the other
	// bootstrap nodes. Defaults to localhost:listen_port if empty.
	AdvertiseAddress string
	// Servers are the Raft servers to bootstrap with.
	// Keys are the node IDs, and values are the Raft addresses.
	Servers map[string]string
	// OnBootstrapped is called when the cluster is bootstrapped.
	// isLeader is true if this node is the leader and false otherwise.
	// Any error it returns is returned by Bootstrap.
	OnBootstrapped func(isLeader bool) error
}

BootstrapOptions are options for bootstrapping a Raft node.

type LeaderDialer added in v0.1.3

// LeaderDialer is the interface for dialing the leader.
type LeaderDialer interface {
	// DialLeader returns a gRPC client connection, or an error if the
	// dial fails. Given the interface's purpose the connection is
	// presumably to the current Raft leader — confirm with implementers.
	DialLeader(ctx context.Context) (*grpc.ClientConn, error)
}

LeaderDialer is the interface for dialing the leader.

type LeaderDialerFunc added in v0.1.3

// LeaderDialerFunc is a function type with the same signature as
// LeaderDialer.DialLeader. It declares a DialLeader method of its own, so a
// plain function converted to LeaderDialerFunc can be passed wherever a
// LeaderDialer is expected.
type LeaderDialerFunc func(ctx context.Context) (*grpc.ClientConn, error)

LeaderDialerFunc is the function signature for dialing the leader. It is supplied by the mesh during startup. It can be used as an alternative to the LeaderDialer interface.

func (LeaderDialerFunc) DialLeader added in v0.1.3

func (f LeaderDialerFunc) DialLeader(ctx context.Context) (*grpc.ClientConn, error)

type LeaderObservation

// LeaderObservation is a type alias (note the "=") for
// raft.LeaderObservation; both names denote the same type.
type LeaderObservation = raft.LeaderObservation

LeaderObservation is an alias for raft.LeaderObservation.

type LogStoreCloser

// LogStoreCloser is a LogStore that can be closed. It combines the method
// sets of io.Closer and raft.LogStore by embedding both.
type LogStoreCloser interface {
	io.Closer
	raft.LogStore
}

LogStoreCloser is a LogStore that can be closed.

type MemoryStore

// MemoryStore is the interface of the in-memory store. It embeds both
// LogStoreCloser and StableStoreCloser, so one value can serve as the log
// store and the stable store.
type MemoryStore interface {
	LogStoreCloser
	StableStoreCloser
}

MemoryStore is an in-memory store that can be used as both a closable LogStore and a closable StableStore.

func NewInmemStore

func NewInmemStore() MemoryStore

NewInmemStore returns a new in-memory store that can be used for logs and stable storage.

type MonotonicLogStore

// MonotonicLogStore is a LogStore that is monotonic. It embeds a
// raft.LogStore, so the embedded store's methods are promoted to the
// wrapper.
type MonotonicLogStore struct {
	raft.LogStore
}

MonotonicLogStore is a LogStore that is monotonic.

func (*MonotonicLogStore) IsMonotonic

func (m *MonotonicLogStore) IsMonotonic() bool

IsMonotonic returns true if the log store is monotonic.

type Observation

// Observation is a type alias (note the "=") for raft.Observation; both
// names denote the same type.
type Observation = raft.Observation

Observation is an alias for raft.Observation.

type Options

// Options are the raft options. The configuration fields carry json, yaml,
// toml and mapstructure struct tags; the callback fields at the bottom are
// tagged "-" for all four.
type Options struct {
	// ListenAddress is the address to listen on for raft.
	ListenAddress string `` /* 135-byte string literal not displayed */
	// DataDir is the directory to store data in.
	DataDir string `json:"data-dir,omitempty" yaml:"data-dir,omitempty" toml:"data-dir,omitempty" mapstructure:"data-dir,omitempty"`
	// InMemory indicates whether the store should be in memory. This should
	// only be used for testing and ephemeral nodes.
	InMemory bool `json:"in-memory,omitempty" yaml:"in-memory,omitempty" toml:"in-memory,omitempty" mapstructure:"in-memory,omitempty"`
	// ConnectionPoolCount is the number of connections to pool. If 0, no connection pooling is used.
	ConnectionPoolCount int `` /* 163-byte string literal not displayed */
	// ConnectionTimeout is the timeout for connections.
	ConnectionTimeout time.Duration `` /* 151-byte string literal not displayed */
	// HeartbeatTimeout is the timeout for heartbeats.
	HeartbeatTimeout time.Duration `` /* 147-byte string literal not displayed */
	// ElectionTimeout is the timeout for elections.
	ElectionTimeout time.Duration `` /* 143-byte string literal not displayed */
	// ApplyTimeout is the timeout for applying.
	ApplyTimeout time.Duration `` /* 131-byte string literal not displayed */
	// CommitTimeout is the timeout for committing.
	CommitTimeout time.Duration `` /* 135-byte string literal not displayed */
	// MaxAppendEntries is the maximum number of append entries.
	MaxAppendEntries int `` /* 151-byte string literal not displayed */
	// LeaderLeaseTimeout is the timeout for leader leases.
	LeaderLeaseTimeout time.Duration `` /* 159-byte string literal not displayed */
	// SnapshotInterval is the interval to take snapshots.
	SnapshotInterval time.Duration `` /* 147-byte string literal not displayed */
	// SnapshotThreshold is the threshold to take snapshots.
	SnapshotThreshold uint64 `` /* 151-byte string literal not displayed */
	// SnapshotRetention is the number of snapshots to retain.
	SnapshotRetention uint64 `` /* 151-byte string literal not displayed */
	// ObserverChanBuffer is the buffer size for the observer channel.
	ObserverChanBuffer int `` /* 159-byte string literal not displayed */
	// LogLevel is the log level for the raft backend.
	LogLevel string `json:"log-level,omitempty" yaml:"log-level,omitempty" toml:"log-level,omitempty" mapstructure:"log-level,omitempty"`
	// PreferIPv6 indicates whether IPv6 is preferred.
	PreferIPv6 bool `json:"prefer-ipv6,omitempty" yaml:"prefer-ipv6,omitempty" toml:"prefer-ipv6,omitempty" mapstructure:"prefer-ipv6,omitempty"`
	// LeaveOnShutdown is the leave-on-shutdown flag. Presumably the node
	// leaves the cluster when it is stopped if this is set — confirm
	// against the Stop implementation.
	LeaveOnShutdown bool `` /* 147-byte string literal not displayed */

	// Below are callbacks used internally or by external packages.
	// Their "-" tags exclude them from json, yaml, toml and mapstructure
	// handling.
	//
	// OnApplyLog receives a context, the term and index, and a log entry;
	// presumably it is invoked for each applied log entry — confirm.
	OnApplyLog        func(ctx context.Context, term, index uint64, log *v1.RaftLogEntry) `json:"-" yaml:"-" toml:"-" mapstructure:"-"`
	// OnSnapshotRestore receives a context, snapshot metadata, and the
	// snapshot data as an io.ReadCloser; presumably it is invoked when a
	// snapshot is restored — confirm.
	OnSnapshotRestore func(ctx context.Context, meta *SnapshotMeta, data io.ReadCloser)   `json:"-" yaml:"-" toml:"-" mapstructure:"-"`
	// OnObservation receives an Observation; presumably it is invoked for
	// each raft observation event — confirm.
	OnObservation     func(ev Observation)                                                `json:"-" yaml:"-" toml:"-" mapstructure:"-"`
}

Options are the raft options.

func NewOptions

func NewOptions(port int) *Options

NewOptions returns new raft options with the default values and given listen port. If the port is 0, the default is used.

func (*Options) BindFlags

func (o *Options) BindFlags(fl *flag.FlagSet, prefix ...string)

BindFlags binds the flags to the options.

func (*Options) DataStoragePath

func (o *Options) DataStoragePath() string

DataStoragePath returns the data directory.

func (*Options) DeepCopy added in v0.1.13

func (o *Options) DeepCopy() *Options

DeepCopy returns a deep copy of the options.

func (*Options) RaftConfig

func (o *Options) RaftConfig(nodeID string) *raft.Config

RaftConfig builds a raft config.

func (*Options) StorePath

func (o *Options) StorePath() string

StorePath returns the stable store path.

func (*Options) Validate

func (o *Options) Validate() error

Validate validates the raft options.

type PeerObservation

// PeerObservation is a type alias (note the "=") for raft.PeerObservation;
// both names denote the same type.
type PeerObservation = raft.PeerObservation

PeerObservation is an alias for raft.PeerObservation.

type Raft

// Raft is the interface for Raft consensus and storage.
type Raft interface {
	// Start starts the Raft node.
	Start(ctx context.Context, opts *StartOptions) error
	// Bootstrap attempts to bootstrap the Raft cluster. If the cluster is already
	// bootstrapped, ErrAlreadyBootstrapped is returned. If the cluster is not
	// bootstrapped and bootstrapping succeeds, the optional callback is called
	// with isLeader flag set to true if the node is the leader, and false otherwise.
	// Any error returned by the callback is returned by Bootstrap.
	Bootstrap(ctx context.Context, opts *BootstrapOptions) error
	// Storage returns the storage. This is only valid after Start is called.
	Storage() storage.Storage
	// Raft returns the Raft instance. This is only valid after Start is called.
	Raft() *raft.Raft
	// Configuration returns the current raft configuration.
	Configuration() raft.Configuration
	// LastAppliedIndex returns the last applied index.
	LastAppliedIndex() uint64
	// ListenPort returns the listen port.
	ListenPort() int
	// IsLeader returns true if the Raft node is the leader.
	IsLeader() bool
	// IsVoter returns true if the Raft node is a voter.
	IsVoter() bool
	// AddNonVoter adds a non-voting node to the cluster with timeout enforced by the context.
	AddNonVoter(ctx context.Context, id string, addr string) error
	// AddVoter adds a voting node to the cluster with timeout enforced by the context.
	AddVoter(ctx context.Context, id string, addr string) error
	// DemoteVoter demotes a voting node to a non-voting node with timeout enforced by the context.
	DemoteVoter(ctx context.Context, id string) error
	// RemoveServer removes a peer from the cluster with timeout enforced by the context.
	// NOTE(review): the meaning of wait is not documented here; presumably
	// it blocks until the removal has taken effect — confirm.
	RemoveServer(ctx context.Context, id string, wait bool) error
	// Restore restores the Raft node from a snapshot.
	Restore(rdr io.ReadCloser) error
	// Barrier issues a barrier request to the cluster. This is a no-op if the node is not the leader.
	// The named result took is presumably the time the barrier took to
	// complete — confirm against the implementation.
	Barrier(ctx context.Context, timeout time.Duration) (took time.Duration, err error)
	// Stop stops the Raft node.
	Stop(ctx context.Context) error
}

Raft is the interface for Raft consensus and storage.

func New

func New(opts *Options, dialer LeaderDialer) Raft

New returns a new Raft node.

type SnapshotMeta

// SnapshotMeta is a type alias (note the "=") for raft.SnapshotMeta; both
// names denote the same type.
type SnapshotMeta = raft.SnapshotMeta

SnapshotMeta is an alias for raft.SnapshotMeta.

type StableStoreCloser

// StableStoreCloser is a StableStore that can be closed. It combines the
// method sets of io.Closer and raft.StableStore by embedding both.
type StableStoreCloser interface {
	io.Closer
	raft.StableStore
}

StableStoreCloser is a StableStore that can be closed.

type StartOptions

// StartOptions are options for starting a Raft node.
type StartOptions struct {
	// NodeID is the node ID.
	NodeID string
}

StartOptions are options for starting a Raft node.

type StreamLayer

// StreamLayer is a raft.StreamLayer (embedded below) that additionally
// reports the port it is listening on.
type StreamLayer interface {
	raft.StreamLayer
	// ListenPort returns the port the transport is listening on.
	ListenPort() int
}

StreamLayer is a raft.StreamLayer that also reports the port it is listening on.

func NewStreamLayer

func NewStreamLayer(addr string) (StreamLayer, error)

NewStreamLayer creates a new stream layer listening on the given address.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL