raft

package
v0.0.23 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 29, 2023 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Overview

Package raft contains Raft consensus for WebMesh.

Package streamlayer contains the Raft stream layer implementation.

Index

Constants

View Source
// Environment variable names for the raft options, followed by the directory
// names used for the raft store and the raft data storage.
//
// NOTE(review): every *EnvVar constant except StartupTimeoutEnvVar has a
// same-named field on Options as listed in this package's documentation;
// confirm where the startup timeout is consumed.
const (
	RaftListenAddressEnvVar   = "RAFT_LISTEN_ADDRESS"
	DataDirEnvVar             = "RAFT_DATA_DIR"
	InMemoryEnvVar            = "RAFT_IN_MEMORY"
	ConnectionPoolCountEnvVar = "RAFT_CONNECTION_POOL_COUNT"
	ConnectionTimeoutEnvVar   = "RAFT_CONNECTION_TIMEOUT"
	HeartbeatTimeoutEnvVar    = "RAFT_HEARTBEAT_TIMEOUT"
	ElectionTimeoutEnvVar     = "RAFT_ELECTION_TIMEOUT"
	ApplyTimeoutEnvVar        = "RAFT_APPLY_TIMEOUT"
	CommitTimeoutEnvVar       = "RAFT_COMMIT_TIMEOUT"
	MaxAppendEntriesEnvVar    = "RAFT_MAX_APPEND_ENTRIES"
	LeaderLeaseTimeoutEnvVar  = "RAFT_LEADER_LEASE_TIMEOUT"
	SnapshotIntervalEnvVar    = "RAFT_SNAPSHOT_INTERVAL"
	SnapshotThresholdEnvVar   = "RAFT_SNAPSHOT_THRESHOLD"
	SnapshotRetentionEnvVar   = "RAFT_SNAPSHOT_RETENTION"
	ObserverChanBufferEnvVar  = "RAFT_OBSERVER_CHAN_BUFFER"
	RaftLogLevelEnvVar        = "RAFT_LOG_LEVEL"
	RaftPreferIPv6EnvVar      = "RAFT_PREFER_IPV6"
	LeaveOnShutdownEnvVar     = "RAFT_LEAVE_ON_SHUTDOWN"
	StartupTimeoutEnvVar      = "RAFT_STARTUP_TIMEOUT"

	// RaftStorePath is the raft stable and log store directory.
	RaftStorePath = "raft-store"
	// DataStoragePath is the raft data storage directory.
	DataStoragePath = "raft-data"
)
View Source
// Raft states. Each constant is the identically named value from the
// imported raft package, re-exported so callers need not import it directly.
const (
	Follower  = raft.Follower
	Candidate = raft.Candidate
	Leader    = raft.Leader
	Shutdown  = raft.Shutdown
)

Raft states.

View Source
// Raft suffrage states. Each constant is the identically named value from
// the imported raft package.
const (
	Voter    = raft.Voter
	Nonvoter = raft.Nonvoter
)

Raft suffrage states.

Variables

View Source
// Sentinel errors for this package. ErrStarted and ErrClosed are defined
// here; the other two are the raft package's own error values under a local
// name.
var (
	// ErrStarted is returned when the Raft node is already started.
	ErrStarted = errors.New("raft node already started")
	// ErrClosed is returned when the Raft node is already closed.
	ErrClosed = errors.New("raft node is closed")
	// ErrAlreadyBootstrapped is returned when the Raft node is already bootstrapped.
	// It is the same error value as raft.ErrCantBootstrap.
	ErrAlreadyBootstrapped = raft.ErrCantBootstrap
	// ErrNotLeader is returned when the Raft node is not the leader.
	// It is the same error value as raft.ErrNotLeader.
	ErrNotLeader = raft.ErrNotLeader
)

Functions

This section is empty.

Types

type BootstrapOptions

// BootstrapOptions are options for bootstrapping a Raft node.
type BootstrapOptions struct {
	// AdvertiseAddress is the address to advertise to the other
	// bootstrap nodes. Defaults to localhost:listen_port if empty.
	AdvertiseAddress string
	// Servers are the Raft servers to bootstrap with.
	// Keys are the node IDs, and values are the Raft addresses.
	Servers map[string]string
	// OnBootstrapped is called when the cluster is bootstrapped.
	// Per the Raft.Bootstrap documentation, the callback is optional,
	// isLeader is true if this node is the leader, and any error it
	// returns is returned by Bootstrap.
	OnBootstrapped func(isLeader bool) error
}

BootstrapOptions are options for bootstrapping a Raft node.

type LeaderObservation

// LeaderObservation is an alias for raft.LeaderObservation.
type LeaderObservation = raft.LeaderObservation

LeaderObservation is an alias for raft.LeaderObservation.

type LogStoreCloser

// LogStoreCloser is a LogStore that can be closed. It combines io.Closer
// with raft.LogStore and adds no methods of its own.
type LogStoreCloser interface {
	io.Closer
	raft.LogStore
}

LogStoreCloser is a LogStore that can be closed.

type MemoryStore

// MemoryStore is an in-memory store. It combines LogStoreCloser and
// StableStoreCloser, so a single value can serve as both the log store and
// the stable store.
type MemoryStore interface {
	LogStoreCloser
	StableStoreCloser
}

MemoryStore is an in-memory store that can be used for both logs and stable storage.

func NewInmemStore

func NewInmemStore() MemoryStore

NewInmemStore returns a new in-memory store that can be used for logs and stable storage.

type MonotonicLogStore

// MonotonicLogStore is a LogStore that is monotonic. It embeds a
// raft.LogStore, so the embedded store's methods are promoted onto it.
type MonotonicLogStore struct {
	raft.LogStore
}

MonotonicLogStore is a LogStore that is monotonic.

func (*MonotonicLogStore) IsMonotonic

func (m *MonotonicLogStore) IsMonotonic() bool

IsMonotonic returns true if the log store is monotonic.

type Observation

// Observation is an alias for raft.Observation.
type Observation = raft.Observation

Observation is an alias for raft.Observation.

type Options

// Options are the raft options. Every configuration field carries matching
// json, yaml and toml struct tags using the same kebab-case key.
type Options struct {
	// ListenAddress is the address to listen on for raft.
	ListenAddress string `json:"listen-address,omitempty" yaml:"listen-address,omitempty" toml:"listen-address,omitempty"`
	// DataDir is the directory to store data in.
	DataDir string `json:"data-dir,omitempty" yaml:"data-dir,omitempty" toml:"data-dir,omitempty"`
	// InMemory is if the store should be in memory. This should only be used for testing and ephemeral nodes.
	InMemory bool `json:"in-memory,omitempty" yaml:"in-memory,omitempty" toml:"in-memory,omitempty"`
	// ConnectionPoolCount is the number of connections to pool. If 0, no connection pooling is used.
	ConnectionPoolCount int `json:"connection-pool-count,omitempty" yaml:"connection-pool-count,omitempty" toml:"connection-pool-count,omitempty"`
	// ConnectionTimeout is the timeout for connections.
	ConnectionTimeout time.Duration `json:"connection-timeout,omitempty" yaml:"connection-timeout,omitempty" toml:"connection-timeout,omitempty"`
	// HeartbeatTimeout is the timeout for heartbeats.
	HeartbeatTimeout time.Duration `json:"heartbeat-timeout,omitempty" yaml:"heartbeat-timeout,omitempty" toml:"heartbeat-timeout,omitempty"`
	// ElectionTimeout is the timeout for elections.
	ElectionTimeout time.Duration `json:"election-timeout,omitempty" yaml:"election-timeout,omitempty" toml:"election-timeout,omitempty"`
	// ApplyTimeout is the timeout for applying.
	ApplyTimeout time.Duration `json:"apply-timeout,omitempty" yaml:"apply-timeout,omitempty" toml:"apply-timeout,omitempty"`
	// CommitTimeout is the timeout for committing.
	CommitTimeout time.Duration `json:"commit-timeout,omitempty" yaml:"commit-timeout,omitempty" toml:"commit-timeout,omitempty"`
	// MaxAppendEntries is the maximum number of append entries.
	MaxAppendEntries int `json:"max-append-entries,omitempty" yaml:"max-append-entries,omitempty" toml:"max-append-entries,omitempty"`
	// LeaderLeaseTimeout is the timeout for leader leases.
	LeaderLeaseTimeout time.Duration `json:"leader-lease-timeout,omitempty" yaml:"leader-lease-timeout,omitempty" toml:"leader-lease-timeout,omitempty"`
	// SnapshotInterval is the interval to take snapshots.
	SnapshotInterval time.Duration `json:"snapshot-interval,omitempty" yaml:"snapshot-interval,omitempty" toml:"snapshot-interval,omitempty"`
	// SnapshotThreshold is the threshold to take snapshots.
	SnapshotThreshold uint64 `json:"snapshot-threshold,omitempty" yaml:"snapshot-threshold,omitempty" toml:"snapshot-threshold,omitempty"`
	// SnapshotRetention is the number of snapshots to retain.
	SnapshotRetention uint64 `json:"snapshot-retention,omitempty" yaml:"snapshot-retention,omitempty" toml:"snapshot-retention,omitempty"`
	// ObserverChanBuffer is the buffer size for the observer channel.
	ObserverChanBuffer int `json:"observer-chan-buffer,omitempty" yaml:"observer-chan-buffer,omitempty" toml:"observer-chan-buffer,omitempty"`
	// LogLevel is the log level for the raft backend.
	LogLevel string `json:"log-level,omitempty" yaml:"log-level,omitempty" toml:"log-level,omitempty"`
	// PreferIPv6 is the prefer IPv6 flag.
	PreferIPv6 bool `json:"prefer-ipv6,omitempty" yaml:"prefer-ipv6,omitempty" toml:"prefer-ipv6,omitempty"`
	// LeaveOnShutdown is the leave on shutdown flag.
	LeaveOnShutdown bool `json:"leave-on-shutdown,omitempty" yaml:"leave-on-shutdown,omitempty" toml:"leave-on-shutdown,omitempty"`

	// Below are callbacks used internally or by external packages.
	// All three are tagged "-" for json, yaml and toml, so they are not part
	// of the serialized configuration.
	//   - OnApplyLog receives a context, the term and index, and the log entry.
	//   - OnSnapshotRestore receives a context, the snapshot metadata, and the
	//     snapshot data as an io.ReadCloser.
	//   - OnObservation receives a single Observation.
	// NOTE(review): when each callback fires, and who closes the ReadCloser,
	// is not visible from this declaration; confirm against the implementation.
	OnApplyLog        func(ctx context.Context, term, index uint64, log *v1.RaftLogEntry) `json:"-" yaml:"-" toml:"-"`
	OnSnapshotRestore func(ctx context.Context, meta *SnapshotMeta, data io.ReadCloser)   `json:"-" yaml:"-" toml:"-"`
	OnObservation     func(ev Observation)                                                `json:"-" yaml:"-" toml:"-"`
}

Options are the raft options.

func NewOptions

func NewOptions() *Options

NewOptions returns new raft options with the default values.

func (*Options) BindFlags

func (o *Options) BindFlags(fl *flag.FlagSet)

BindFlags binds the flags to the options.

func (*Options) DataStoragePath

func (o *Options) DataStoragePath() string

DataStoragePath returns the data directory.

func (*Options) RaftConfig

func (o *Options) RaftConfig(nodeID string) *raft.Config

RaftConfig builds a raft config.

func (*Options) StorePath

func (o *Options) StorePath() string

StorePath returns the stable store path.

func (*Options) Validate

func (o *Options) Validate() error

Validate validates the raft options.

type PeerObservation

// PeerObservation is an alias for raft.PeerObservation.
type PeerObservation = raft.PeerObservation

PeerObservation is an alias for raft.PeerObservation.

type Raft

// Raft is the interface for Raft consensus and storage.
// Storage and Raft are documented as valid only after Start has been called.
type Raft interface {
	// Start starts the Raft node.
	Start(ctx context.Context, opts *StartOptions) error
	// Bootstrap attempts to bootstrap the Raft cluster. If the cluster is already
	// bootstrapped, ErrAlreadyBootstrapped is returned. If the cluster is not
	// bootstrapped and bootstrapping succeeds, the optional callback is called
	// with isLeader flag set to true if the node is the leader, and false otherwise.
	// Any error returned by the callback is returned by Bootstrap.
	Bootstrap(ctx context.Context, opts *BootstrapOptions) error
	// Storage returns the storage. This is only valid after Start is called.
	Storage() storage.Storage
	// Raft returns the Raft instance. This is only valid after Start is called.
	Raft() *raft.Raft
	// Configuration returns the current raft configuration.
	Configuration() raft.Configuration
	// LastAppliedIndex returns the last applied index.
	LastAppliedIndex() uint64
	// ListenPort returns the listen port.
	ListenPort() int
	// IsLeader returns true if the Raft node is the leader.
	IsLeader() bool
	// AddNonVoter adds a non-voting node to the cluster with timeout enforced by the context.
	AddNonVoter(ctx context.Context, id string, addr string) error
	// AddVoter adds a voting node to the cluster with timeout enforced by the context.
	AddVoter(ctx context.Context, id string, addr string) error
	// DemoteVoter demotes a voting node to a non-voting node with timeout enforced by the context.
	DemoteVoter(ctx context.Context, id string) error
	// RemoveServer removes a peer from the cluster with timeout enforced by the context.
	// NOTE(review): the meaning of wait is not documented here; presumably it
	// blocks until the removal is committed. Confirm against the implementation.
	RemoveServer(ctx context.Context, id string, wait bool) error
	// Restore restores the Raft node from a snapshot.
	Restore(rdr io.ReadCloser) error
	// Stop stops the Raft node.
	Stop(ctx context.Context) error
}

Raft is the interface for Raft consensus and storage.

func New

func New(opts *Options) Raft

New returns a new Raft node.

type SnapshotMeta

// SnapshotMeta is an alias for raft.SnapshotMeta.
type SnapshotMeta = raft.SnapshotMeta

SnapshotMeta is an alias for raft.SnapshotMeta.

type StableStoreCloser

// StableStoreCloser is a StableStore that can be closed. It combines
// io.Closer with raft.StableStore and adds no methods of its own.
type StableStoreCloser interface {
	io.Closer
	raft.StableStore
}

StableStoreCloser is a StableStore that can be closed.

type StartOptions

// StartOptions are options for starting a Raft node.
type StartOptions struct {
	// NodeID is the node ID.
	NodeID string
}

StartOptions are options for starting a Raft node.

type StreamLayer

// StreamLayer is the StreamLayer interface. It extends raft.StreamLayer with
// a way to query the port the transport is bound to.
type StreamLayer interface {
	raft.StreamLayer
	// ListenPort returns the port the transport is listening on.
	ListenPort() int
}

StreamLayer is the StreamLayer interface.

func NewStreamLayer

func NewStreamLayer(addr string) (StreamLayer, error)

NewStreamLayer creates a new stream layer listening on the given address.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL