cluster

package
v0.0.0-...-f09cf9b
Warning

This package is not in the latest version of its module.

Published: Jan 13, 2025 License: BSD-3-Clause Imports: 23 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	ErrConcurrentTransaction = errors.New("concurrent transaction")
	ErrInvalidTransaction    = errors.New("invalid transaction")
	ErrExpiredTransaction    = errors.New("transaction TTL expired")
	ErrNotReady              = errors.New("server is not ready: either starting up or shutting down")
)

Functions

This section is empty.

Types

type AuthConfig

type AuthConfig struct {
	BasicAuth BasicAuth `json:"basic" yaml:"basic"`
}

type BasicAuth

type BasicAuth struct {
	Username string `json:"username" yaml:"username"`
	Password string `json:"password" yaml:"password"`
}

func (BasicAuth) Enabled

func (ba BasicAuth) Enabled() bool

type Client

type Client interface {
	OpenTransaction(ctx context.Context, host string, tx *Transaction) error
	AbortTransaction(ctx context.Context, host string, tx *Transaction) error
	CommitTransaction(ctx context.Context, host string, tx *Transaction) error
}

type CommitFn

type CommitFn func(ctx context.Context, tx *Transaction) error

type Config

type Config struct {
	Hostname                string     `json:"hostname" yaml:"hostname"`
	GossipBindPort          int        `json:"gossipBindPort" yaml:"gossipBindPort"`
	DataBindPort            int        `json:"dataBindPort" yaml:"dataBindPort"`
	Join                    string     `json:"join" yaml:"join"`
	IgnoreStartupSchemaSync bool       `json:"ignoreStartupSchemaSync" yaml:"ignoreStartupSchemaSync"`
	SkipSchemaSyncRepair    bool       `json:"skipSchemaSyncRepair" yaml:"skipSchemaSyncRepair"`
	AuthConfig              AuthConfig `json:"auth" yaml:"auth"`
	AdvertiseAddr           string     `json:"advertiseAddr" yaml:"advertiseAddr"`
	AdvertisePort           int        `json:"advertisePort" yaml:"advertisePort"`
	// FastFailureDetection is mostly for testing purposes. It makes memberlist more sensitive so it
	// detects failures (down nodes) faster.
	FastFailureDetection bool `json:"fastFailureDetection" yaml:"fastFailureDetection"`
	// Localhost enables running a multi-node setup on the same localhost with different ports
	Localhost bool `json:"localhost" yaml:"localhost"`
	// MaintenanceNodes is experimental. You should not use this directly, but should use the
	// public methods on the State struct. This is a list of nodes (by Hostname) that are in
	// maintenance mode (e.g. they return a 418 for all data requests). We use a list here instead of a
	// bool because it allows us to set the same config/env vars on all nodes to put a subset of
	// them in maintenance mode. In addition, we may want to have the cluster nodes not in
	// maintenance mode be aware of which nodes are in maintenance mode in the future.
	MaintenanceNodes []string `json:"maintenanceNodes" yaml:"maintenanceNodes"`
}
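
Because every field carries `json` and `yaml` struct tags, a configuration document can be decoded directly into the struct. The sketch below uses local stand-in types that mirror only a subset of `Config` (they are not the package's own types) and decodes them with the standard `encoding/json` package. The hostnames, ports, and credentials are made-up values.

```go
package main

import "encoding/json"

// Local stand-ins mirroring a subset of cluster.Config and its tags.
type BasicAuth struct {
	Username string `json:"username" yaml:"username"`
	Password string `json:"password" yaml:"password"`
}

type AuthConfig struct {
	BasicAuth BasicAuth `json:"basic" yaml:"basic"`
}

type Config struct {
	Hostname         string     `json:"hostname" yaml:"hostname"`
	GossipBindPort   int        `json:"gossipBindPort" yaml:"gossipBindPort"`
	DataBindPort     int        `json:"dataBindPort" yaml:"dataBindPort"`
	Join             string     `json:"join" yaml:"join"`
	AuthConfig       AuthConfig `json:"auth" yaml:"auth"`
	MaintenanceNodes []string   `json:"maintenanceNodes" yaml:"maintenanceNodes"`
}

// parseConfig decodes a JSON document into the local Config mirror.
func parseConfig(raw []byte) (Config, error) {
	var cfg Config
	err := json.Unmarshal(raw, &cfg)
	return cfg, err
}

// exampleJSON uses hypothetical hostnames, ports and credentials.
const exampleJSON = `{
  "hostname": "node1",
  "gossipBindPort": 7100,
  "dataBindPort": 7101,
  "join": "node2:7100",
  "auth": {"basic": {"username": "admin", "password": "secret"}},
  "maintenanceNodes": ["node3"]
}`
```

The JSON keys are matched against the `json` tags, so `"auth"` fills `AuthConfig` and `"maintenanceNodes"` fills `MaintenanceNodes`.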

type ConsensusFn

type ConsensusFn func(ctx context.Context,
	in []*Transaction) (*Transaction, error)

The Broadcaster is the link between the current node and all other nodes during a tx operation. This makes it a natural place to inject a consensus function for read transactions. How consensus is reached is completely opaque to the broadcaster and can be controlled through custom business logic.

type DiskUsage

type DiskUsage struct {
	// Total disk space
	Total uint64
	// Total available space
	Available uint64
}

DiskUsage contains total and available disk space in bytes.

type HostnameSource

type HostnameSource interface {
	AllNames() []string
}

type IdealClusterState

type IdealClusterState struct {
	// contains filtered or unexported fields
}

func NewIdealClusterState

func NewIdealClusterState(s MemberLister, logger logrus.FieldLogger) *IdealClusterState

func (*IdealClusterState) Members

func (ics *IdealClusterState) Members() []string

func (*IdealClusterState) Validate

func (ics *IdealClusterState) Validate() error

Validate returns an error if the actual state does not match the assumed ideal state, e.g. because a node has died, or left unexpectedly.
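
A minimal sketch of that kind of check, assuming the ideal state is a set of node names recorded earlier and the actual state is the current list of live names (such as the output of `AllNames`). The `validateMembers` helper and its error text are hypothetical, not the package's implementation.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// validateMembers returns an error listing every node that belongs to the
// ideal state but is missing from the list of live members.
func validateMembers(ideal map[string]struct{}, live []string) error {
	alive := make(map[string]struct{}, len(live))
	for _, name := range live {
		alive[name] = struct{}{}
	}
	var missing []string
	for name := range ideal {
		if _, ok := alive[name]; !ok {
			missing = append(missing, name)
		}
	}
	if len(missing) == 0 {
		return nil
	}
	sort.Strings(missing) // deterministic order for the error message
	return fmt.Errorf("ideal members not live: %s", strings.Join(missing, ", "))
}
```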

type MemberLister

type MemberLister interface {
	AllNames() []string
	Hostnames() []string
}

type NodeInfo

type NodeInfo struct {
	DiskUsage
	LastTimeMilli int64 // last update time in milliseconds
}

NodeInfo contains a node's disk usage and the time of its last update.

type NodeIterationStrategy

type NodeIterationStrategy int

const (
	StartRandom NodeIterationStrategy = iota
	StartAfter
)

type NodeIterator

type NodeIterator struct {
	// contains filtered or unexported fields
}

func NewNodeIterator

func NewNodeIterator(nodeNames []string,
	strategy NodeIterationStrategy,
) (*NodeIterator, error)

func (*NodeIterator) Next

func (n *NodeIterator) Next() string

func (*NodeIterator) SetStartNode

func (n *NodeIterator) SetStartNode(startNode string)

type NodeSelector

type NodeSelector interface {
	// StorageCandidates returns a list of storage node names
	// sorted by free disk space in descending order.
	StorageCandidates() []string
	// NonStorageNodes returns the nodes from the member list that
	// are configured as non-storage nodes.
	NonStorageNodes() []string
	// SortCandidates sorts the passed node names by
	// free disk space in descending order.
	SortCandidates(nodes []string) []string
	// LocalName returns the local node name.
	LocalName() string
	// NodeHostname returns the host address for a specific node name.
	NodeHostname(name string) (string, bool)
}

NodeSelector is an interface for selecting a subset of the available nodes in the memberlist.
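
`StorageCandidates` and `SortCandidates` both rely on ordering nodes by free disk space. A minimal sketch of that ordering, assuming the `Available` field of a local `DiskUsage` mirror is the sort key and that nodes without disk information count as having 0 bytes free. The `sortByAvailable` helper is hypothetical.

```go
package main

import "sort"

// DiskUsage mirrors cluster.DiskUsage (sizes in bytes).
type DiskUsage struct {
	Total     uint64
	Available uint64
}

// sortByAvailable returns a copy of nodes ordered by available disk space,
// largest first. Nodes missing from usage are treated as having 0 bytes free,
// and ties keep their original relative order.
func sortByAvailable(nodes []string, usage map[string]DiskUsage) []string {
	out := append([]string(nil), nodes...)
	sort.SliceStable(out, func(i, j int) bool {
		return usage[out[i]].Available > usage[out[j]].Available
	})
	return out
}
```

Copying the input first keeps the caller's slice unchanged; a stable sort makes the result deterministic when two nodes report the same free space.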

type Persistence

type Persistence interface {
	StoreTx(ctx context.Context, tx *Transaction) error
	DeleteTx(ctx context.Context, txID string) error
	IterateAll(ctx context.Context, cb func(tx *Transaction)) error
}

type Remote

type Remote interface {
	BroadcastTransaction(ctx context.Context, tx *Transaction) error
	BroadcastAbortTransaction(ctx context.Context, tx *Transaction) error
	BroadcastCommitTransaction(ctx context.Context, tx *Transaction) error
}

type ResponseFn

type ResponseFn func(ctx context.Context, tx *Transaction) ([]byte, error)

type State

type State struct {
	// contains filtered or unexported fields
}

func Init

func Init(userConfig Config, dataPath string, nonStorageNodes map[string]struct{}, logger logrus.FieldLogger) (_ *State, err error)

func (*State) AllHostnames

func (s *State) AllHostnames() []string

AllHostnames returns the hostnames of all live members, including self.

func (*State) AllNames

func (s *State) AllNames() []string

AllNames returns all node names (not their hostnames!) for live members, including self.

func (*State) ClusterHealthScore

func (s *State) ClusterHealthScore() int

func (*State) Hostnames

func (s *State) Hostnames() []string

Hostnames returns the hostnames of all live members except self; each hostname includes the data port. Use AllHostnames to include self.
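
A minimal sketch of the filtering described above: skip the local node and attach the data port to each address. The `member` type and the single `dataPort` argument are assumptions made for illustration; this page does not say how the real implementation determines each member's data port.

```go
package main

import (
	"net"
	"strconv"
)

// member is a hypothetical stand-in for a memberlist entry.
type member struct {
	Name string
	Addr string
}

// hostnamesExceptSelf returns "addr:dataPort" for every member except self.
func hostnamesExceptSelf(members []member, self string, dataPort int) []string {
	out := make([]string, 0, len(members))
	for _, m := range members {
		if m.Name == self {
			continue // the local node is excluded
		}
		out = append(out, net.JoinHostPort(m.Addr, strconv.Itoa(dataPort)))
	}
	return out
}
```

`net.JoinHostPort` is used instead of plain string concatenation so that IPv6 addresses are bracketed correctly.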

func (*State) LocalName

func (s *State) LocalName() string

LocalName returns the local node name.

func (*State) MaintenanceModeEnabledForLocalhost

func (s *State) MaintenanceModeEnabledForLocalhost() bool

MaintenanceModeEnabledForLocalhost is experimental and may be removed or changed. It returns true if this node is in maintenance mode (which means it should return an error for all data requests).

func (*State) NodeAddress

func (s *State) NodeAddress(id string) string

NodeAddress resolves a node name into an IP address, without the port.

func (*State) NodeCount

func (s *State) NodeCount() int

NodeCount returns the number of live members, including self.

func (*State) NodeHostname

func (s *State) NodeHostname(nodeName string) (string, bool)

func (*State) NodeInfo

func (s *State) NodeInfo(node string) (NodeInfo, bool)

func (*State) NonStorageNodes

func (s *State) NonStorageNodes() []string

NonStorageNodes returns the nodes from the member list that are configured as non-storage nodes.

func (*State) SchemaSyncIgnored

func (s *State) SchemaSyncIgnored() bool

func (*State) SetMaintenanceModeForLocalhost

func (s *State) SetMaintenanceModeForLocalhost(enabled bool)

SetMaintenanceModeForLocalhost is experimental and may be removed or changed. It enables or disables maintenance mode for this node.
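
Because the `MaintenanceNodes` config field is a list of hostnames, the same list can be distributed to every node, and each node only checks whether its own hostname appears in it. A minimal sketch of both maintenance-mode methods under that assumption, with the list guarded by a mutex. The `maintenanceState` type and its methods are hypothetical, not the package's `State`.

```go
package main

import "sync"

// maintenanceState is a hypothetical stand-in holding the local hostname
// and the configured MaintenanceNodes list.
type maintenanceState struct {
	mu       sync.RWMutex
	hostname string
	nodes    []string
}

// enabledForLocalhost reports whether the local hostname is in the list.
func (s *maintenanceState) enabledForLocalhost() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for _, n := range s.nodes {
		if n == s.hostname {
			return true
		}
	}
	return false
}

// setForLocalhost adds the local hostname to the list (enabled) or removes
// every occurrence of it (disabled).
func (s *maintenanceState) setForLocalhost(enabled bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	kept := s.nodes[:0] // filter in place
	for _, n := range s.nodes {
		if n != s.hostname {
			kept = append(kept, n)
		}
	}
	if enabled {
		kept = append(kept, s.hostname)
	}
	s.nodes = kept
}
```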

func (*State) SkipSchemaRepair

func (s *State) SkipSchemaRepair() bool

func (*State) SortCandidates

func (s *State) SortCandidates(nodes []string) []string

SortCandidates sorts the passed node names by free disk space in descending order.

func (*State) StorageCandidates

func (s *State) StorageCandidates() []string

StorageCandidates returns a list of storage node names sorted by free disk space in descending order.

type Transaction

type Transaction struct {
	ID       string
	Type     TransactionType
	Payload  interface{}
	Deadline time.Time

	// If TolerateNodeFailures is false (the default) a transaction cannot be
	// opened or committed if a node is confirmed dead. If a node is only
	// suspected dead, the TxManager will try, but abort unless all nodes ACK.
	TolerateNodeFailures bool
}
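
The rule in the `TolerateNodeFailures` comment can be read as a small decision function. This sketch covers only the default case (`TolerateNodeFailures` is false), because the comment does not spell out what the tolerant mode does. The `nodeStatus` enum and `mayProceedStrict` are hypothetical and are one interpretation of the comment, not the `TxManager`'s actual code.

```go
package main

// nodeStatus is a hypothetical classification of a peer.
type nodeStatus int

const (
	alive nodeStatus = iota
	suspect
	dead
)

// mayProceedStrict reports whether a transaction may be opened or committed
// when TolerateNodeFailures is false. allAcked reports whether every node
// acknowledged the request.
func mayProceedStrict(statuses []nodeStatus, allAcked bool) bool {
	for _, s := range statuses {
		if s == dead {
			return false // a confirmed-dead node blocks the transaction
		}
	}
	// Suspected-dead nodes are still attempted; success requires every ACK.
	return allAcked
}
```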

type TransactionType

type TransactionType string

type TxBroadcaster

type TxBroadcaster struct {
	// contains filtered or unexported fields
}

func NewTxBroadcaster

func NewTxBroadcaster(state MemberLister, client Client, logger logrus.FieldLogger) *TxBroadcaster

func (*TxBroadcaster) BroadcastAbortTransaction

func (t *TxBroadcaster) BroadcastAbortTransaction(rootCtx context.Context, tx *Transaction) error

func (*TxBroadcaster) BroadcastCommitTransaction

func (t *TxBroadcaster) BroadcastCommitTransaction(rootCtx context.Context, tx *Transaction) error

func (*TxBroadcaster) BroadcastTransaction

func (t *TxBroadcaster) BroadcastTransaction(rootCtx context.Context, tx *Transaction) error

func (*TxBroadcaster) SetConsensusFunction

func (t *TxBroadcaster) SetConsensusFunction(fn ConsensusFn)

type TxManager

type TxManager struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewTxManager

func NewTxManager(remote Remote, persistence Persistence,
	logger logrus.FieldLogger,
) *TxManager

func (*TxManager) BeginTransaction

func (c *TxManager) BeginTransaction(ctx context.Context, trType TransactionType,
	payload interface{}, ttl time.Duration,
) (*Transaction, error)

BeginTransaction begins a transaction with the specified type and payload. Transactions expire after the specified TTL; for a transaction that never expires, pass a ttl of 0. When choosing TTLs, keep in mind that clocks might be slightly skewed across the cluster, so set your TTL to desiredTTL + toleratedClockSkew.

Regular transactions cannot be opened if the cluster is not considered healthy.

func (*TxManager) BeginTransactionTolerateNodeFailures

func (c *TxManager) BeginTransactionTolerateNodeFailures(ctx context.Context, trType TransactionType,
	payload interface{}, ttl time.Duration,
) (*Transaction, error)

BeginTransactionTolerateNodeFailures begins a transaction that does not require the whole cluster to be healthy. This can be used, for example, in bootstrapping situations when not all nodes are present yet, or in disaster recovery when a node needs to run a transaction in order to re-join a cluster.

func (*TxManager) CloseReadTransaction

func (c *TxManager) CloseReadTransaction(ctx context.Context,
	tx *Transaction,
) error

func (*TxManager) CommitWriteTransaction

func (c *TxManager) CommitWriteTransaction(ctx context.Context,
	tx *Transaction,
) error

func (*TxManager) HaveDanglingTxs

func (c *TxManager) HaveDanglingTxs(ctx context.Context,
	allowedTypes []TransactionType,
) (found bool)

HaveDanglingTxs checks whether there are any uncommitted transactions in durable storage. This can be used to decide whether a failed schema check can be temporarily ignored, on the assumption that applying the dangling txs will fix the issue.

func (*TxManager) IncomingAbortTransaction

func (c *TxManager) IncomingAbortTransaction(ctx context.Context,
	tx *Transaction,
)

func (*TxManager) IncomingBeginTransaction

func (c *TxManager) IncomingBeginTransaction(ctx context.Context,
	tx *Transaction,
) ([]byte, error)

func (*TxManager) IncomingCommitTransaction

func (c *TxManager) IncomingCommitTransaction(ctx context.Context,
	tx *Transaction,
) error

func (*TxManager) SetAllowUnready

func (c *TxManager) SetAllowUnready(types []TransactionType)

func (*TxManager) SetCommitFn

func (c *TxManager) SetCommitFn(fn CommitFn)

SetCommitFn sets the function used in write transactions. The function can read from the transaction payload and use that state to alter the local state.

func (*TxManager) SetResponseFn

func (c *TxManager) SetResponseFn(fn ResponseFn)

SetResponseFn sets the function used in read transactions. The function sets the local state (by writing it into the Tx Payload), which can then be sent to other nodes. Consensus is not part of the ResponseFn: the coordinator (the node that initiated the Tx) is responsible for reaching consensus. Deciding on consensus requires insight into business logic, because payloads are opaque from the Tx's perspective.

func (*TxManager) Shutdown

func (c *TxManager) Shutdown()

func (*TxManager) StartAcceptIncoming

func (c *TxManager) StartAcceptIncoming()

func (*TxManager) TryResumeDanglingTxs

func (c *TxManager) TryResumeDanglingTxs(ctx context.Context,
	allowedTypes []TransactionType,
) (applied bool, err error)

TryResumeDanglingTxs loops over the existing transactions and applies them. It only does so if the transaction type is explicitly listed as allowed. This is because, at the time of writing, we were not sure whether all transaction commit functions are idempotent. If one were not, reapplying a tx or tx commit could be dangerous, as we don't know whether it was already applied before the node died.

For example, think of an "add property 'foo'" tx that does nothing but append the property to the schema. If this ran twice, we might end up with two duplicate properties named 'foo', which could in turn create other problems. To make sure all txs are resumable (which is what we want, because that's the only way to avoid schema issues), we need to make sure that every single tx is idempotent and then add it to the allow list.
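
Making the "add property" example idempotent can be as simple as checking for the property before appending it. A minimal sketch, using a hypothetical map from class names to property names in place of the real schema:

```go
package main

// addPropertyIdempotent appends prop to class only if it is not already
// present, so re-applying the same transaction leaves the schema unchanged.
func addPropertyIdempotent(schema map[string][]string, class, prop string) {
	for _, existing := range schema[class] {
		if existing == prop {
			return // already applied; nothing to do
		}
	}
	schema[class] = append(schema[class], prop)
}
```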

One other limitation is that this method currently does nothing to check whether a tx was really committed. Ideally, the node would contact the other nodes and ask. However, this simpler implementation does not perform that check. Instead, [HaveDanglingTxs] is used in combination with the schema check: if the schema is not out of sync in the first place, no txs will be applied. This does not cover all edge cases, but it seems to work for now and should be improved in the future.
