cluster

package
v1.24.24 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 6, 2024 License: BSD-3-Clause Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrConcurrentTransaction = errors.New("concurrent transaction")
	ErrInvalidTransaction    = errors.New("invalid transaction")
	ErrExpiredTransaction    = errors.New("transaction TTL expired")
	ErrNotReady              = errors.New("server is not ready: either starting up or shutting down")
)

Functions

This section is empty.

Types

type AuthConfig added in v1.21.2

type AuthConfig struct {
	BasicAuth BasicAuth `json:"basic" yaml:"basic"`
}

type BasicAuth added in v1.21.2

type BasicAuth struct {
	Username string `json:"username" yaml:"username"`
	Password string `json:"password" yaml:"password"`
}

func (BasicAuth) Enabled added in v1.21.2

func (ba BasicAuth) Enabled() bool

type Client

type Client interface {
	OpenTransaction(ctx context.Context, host string, tx *Transaction) error
	AbortTransaction(ctx context.Context, host string, tx *Transaction) error
	CommitTransaction(ctx context.Context, host string, tx *Transaction) error
}

type CommitFn

type CommitFn func(ctx context.Context, tx *Transaction) error

type Config

type Config struct {
	Hostname                string     `json:"hostname" yaml:"hostname"`
	GossipBindPort          int        `json:"gossipBindPort" yaml:"gossipBindPort"`
	DataBindPort            int        `json:"dataBindPort" yaml:"dataBindPort"`
	Join                    string     `json:"join" yaml:"join"`
	IgnoreStartupSchemaSync bool       `json:"ignoreStartupSchemaSync" yaml:"ignoreStartupSchemaSync"`
	SkipSchemaSyncRepair    bool       `json:"skipSchemaSyncRepair" yaml:"skipSchemaSyncRepair"`
	AuthConfig              AuthConfig `json:"auth" yaml:"auth"`
	AdvertiseAddr           string     `json:"advertiseAddr" yaml:"advertiseAddr"`
	AdvertisePort           int        `json:"advertisePort" yaml:"advertisePort"`
}

type ConsensusFn

type ConsensusFn func(ctx context.Context,
	in []*Transaction) (*Transaction, error)

The Broadcaster is the link between the the current node and all other nodes during a tx operation. This makes it a natural place to inject a consensus function for read transactions. How consensus is reached is completely opaque to the broadcaster and can be controlled through custom business logic.

type DiskUsage added in v1.18.3

type DiskUsage struct {
	// Total disk space
	Total uint64
	// Total available space
	Available uint64
}

DiskUsage contains total and available space in B

type HostnameSource

type HostnameSource interface {
	AllNames() []string
}

type IdealClusterState added in v1.17.4

type IdealClusterState struct {
	// contains filtered or unexported fields
}

func NewIdealClusterState added in v1.17.4

func NewIdealClusterState(s MemberLister, logger logrus.FieldLogger) *IdealClusterState

func (*IdealClusterState) Members added in v1.17.4

func (ics *IdealClusterState) Members() []string

func (*IdealClusterState) Validate added in v1.17.4

func (ics *IdealClusterState) Validate() error

Validate returns an error if the actual state does not match the assumed ideal state, e.g. because a node has died, or left unexpectedly.

type MemberLister

type MemberLister interface {
	AllNames() []string
	Hostnames() []string
}

type NodeInfo added in v1.18.3

type NodeInfo struct {
	DiskUsage
	LastTimeMilli int64 // last update time in milliseconds
}

NodeInfo disk space

type NodeIterationStrategy

type NodeIterationStrategy int
const (
	StartRandom NodeIterationStrategy = iota
	StartAfter
)

type NodeIterator

type NodeIterator struct {
	// contains filtered or unexported fields
}

func NewNodeIterator

func NewNodeIterator(nodeNames []string,
	strategy NodeIterationStrategy,
) (*NodeIterator, error)

func (*NodeIterator) Next

func (n *NodeIterator) Next() string

func (*NodeIterator) SetStartNode

func (n *NodeIterator) SetStartNode(startNode string)

type Persistence added in v1.21.3

type Persistence interface {
	StoreTx(ctx context.Context, tx *Transaction) error
	DeleteTx(ctx context.Context, txID string) error
	IterateAll(ctx context.Context, cb func(tx *Transaction)) error
}

type Remote

type Remote interface {
	BroadcastTransaction(ctx context.Context, tx *Transaction) error
	BroadcastAbortTransaction(ctx context.Context, tx *Transaction) error
	BroadcastCommitTransaction(ctx context.Context, tx *Transaction) error
}

type ResponseFn

type ResponseFn func(ctx context.Context, tx *Transaction) ([]byte, error)

type State

type State struct {
	// contains filtered or unexported fields
}

func Init

func Init(userConfig Config, dataPath string, logger logrus.FieldLogger) (_ *State, err error)

func (*State) AllHostnames

func (s *State) AllHostnames() []string

AllHostnames for live members, including self.

func (*State) AllNames

func (s *State) AllNames() []string

All node names (not their hostnames!) for live members, including self.

func (*State) Candidates added in v1.18.3

func (s *State) Candidates() []string

Candidates returns list of nodes (names) sorted by the free amount of disk space in descending order

func (*State) ClusterHealthScore

func (s *State) ClusterHealthScore() int

func (*State) Hostnames

func (s *State) Hostnames() []string

Hostnames for all live members, except self. Use AllHostnames to include self, prefixes the data port.

func (*State) LocalName

func (s *State) LocalName() string

func (*State) NodeCount

func (s *State) NodeCount() int

All node names (not their hostnames!) for live members, including self.

func (*State) NodeHostname

func (s *State) NodeHostname(nodeName string) (string, bool)

func (*State) NodeInfo added in v1.18.3

func (s *State) NodeInfo(node string) (NodeInfo, bool)

func (*State) SchemaSyncIgnored added in v1.17.4

func (s *State) SchemaSyncIgnored() bool

func (*State) SkipSchemaRepair added in v1.22.0

func (s *State) SkipSchemaRepair() bool

type Transaction

type Transaction struct {
	ID       string
	Type     TransactionType
	Payload  interface{}
	Deadline time.Time

	// If TolerateNodeFailures is false (the default) a transaction cannot be
	// opened or committed if a node is confirmed dead. If a node is only
	// suspected dead, the TxManager will try, but abort unless all nodes ACK.
	TolerateNodeFailures bool
}

type TransactionType

type TransactionType string

type TxBroadcaster

type TxBroadcaster struct {
	// contains filtered or unexported fields
}

func NewTxBroadcaster

func NewTxBroadcaster(state MemberLister, client Client, logger logrus.FieldLogger) *TxBroadcaster

func (*TxBroadcaster) BroadcastAbortTransaction

func (t *TxBroadcaster) BroadcastAbortTransaction(rootCtx context.Context, tx *Transaction) error

func (*TxBroadcaster) BroadcastCommitTransaction

func (t *TxBroadcaster) BroadcastCommitTransaction(rootCtx context.Context, tx *Transaction) error

func (*TxBroadcaster) BroadcastTransaction

func (t *TxBroadcaster) BroadcastTransaction(rootCtx context.Context, tx *Transaction) error

func (*TxBroadcaster) SetConsensusFunction

func (t *TxBroadcaster) SetConsensusFunction(fn ConsensusFn)

type TxManager

type TxManager struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewTxManager

func NewTxManager(remote Remote, persistence Persistence,
	logger logrus.FieldLogger,
) *TxManager

func (*TxManager) BeginTransaction

func (c *TxManager) BeginTransaction(ctx context.Context, trType TransactionType,
	payload interface{}, ttl time.Duration,
) (*Transaction, error)

Begin a Transaction with the specified type and payload. Transactions expire after the specified TTL. For a transaction that does not ever expire, pass in a ttl of 0. When choosing TTLs keep in mind that clocks might be slightly skewed in the cluster, therefore set your TTL for desiredTTL + toleratedClockSkew

Regular transactions cannot be opened if the cluster is not considered healthy.

func (*TxManager) BeginTransactionTolerateNodeFailures added in v1.17.4

func (c *TxManager) BeginTransactionTolerateNodeFailures(ctx context.Context, trType TransactionType,
	payload interface{}, ttl time.Duration,
) (*Transaction, error)

Begin a Transaction that does not require the whole cluster to be healthy. This can be used for example in bootstrapping situations when not all nodes are present yet, or in disaster recovery situations when a node needs to run a transaction in order to re-join a cluster.

func (*TxManager) CloseReadTransaction

func (c *TxManager) CloseReadTransaction(ctx context.Context,
	tx *Transaction,
) error

func (*TxManager) CommitWriteTransaction

func (c *TxManager) CommitWriteTransaction(ctx context.Context,
	tx *Transaction,
) error

func (*TxManager) HaveDanglingTxs added in v1.21.3

func (c *TxManager) HaveDanglingTxs(ctx context.Context,
	allowedTypes []TransactionType,
) (found bool)

HaveDanglingTxs is a way to check if there are any uncommitted transactions in the durable storage. This can be used to make decisions about whether a failed schema check can be temporarily ignored - with the assumption that applying the dangling txs will fix the issue.

func (*TxManager) IncomingAbortTransaction

func (c *TxManager) IncomingAbortTransaction(ctx context.Context,
	tx *Transaction,
)

func (*TxManager) IncomingBeginTransaction

func (c *TxManager) IncomingBeginTransaction(ctx context.Context,
	tx *Transaction,
) ([]byte, error)

func (*TxManager) IncomingCommitTransaction

func (c *TxManager) IncomingCommitTransaction(ctx context.Context,
	tx *Transaction,
) error

func (*TxManager) SetAllowUnready added in v1.21.3

func (c *TxManager) SetAllowUnready(types []TransactionType)

func (*TxManager) SetCommitFn

func (c *TxManager) SetCommitFn(fn CommitFn)

SetCommitFn sets a function that is used in Write Transactions, you can read from the transaction payload and use that state to alter your local state

func (*TxManager) SetResponseFn

func (c *TxManager) SetResponseFn(fn ResponseFn)

SetResponseFn sets a function that is used in Read Transactions. The function sets the local state (by writing it into the Tx Payload). It can then be sent to other nodes. Consensus is not part of the ResponseFn. The coordinator - who initiated the Tx - is responsible for coming up with consensus. Deciding on Consensus requires insights into business logic, as from the TX's perspective payloads are opaque.

func (*TxManager) Shutdown added in v1.21.3

func (c *TxManager) Shutdown()

func (*TxManager) StartAcceptIncoming added in v1.21.3

func (c *TxManager) StartAcceptIncoming()

func (*TxManager) TryResumeDanglingTxs added in v1.21.3

func (c *TxManager) TryResumeDanglingTxs(ctx context.Context,
	allowedTypes []TransactionType,
) (applied bool, err error)

TryResumeDanglingTxs loops over the existing transactions and applies them. It only does so if the transaction type is explicitly listed as allowed. This is because - at the time of creating this - we were not sure if all transaction commit functions are idempotent. If one would not be, then reapplying a tx or tx commit could potentially be dangerous, as we don't know if it was already applied prior to the node death.

For example, think of a "add property 'foo'" tx, that does nothing but append the property to the schema. If this ran twice, we might now end up with two duplicate properties with the name 'foo' which could in turn create other problems. To make sure all txs are resumable (which is what we want because that's the only way to avoid schema issues), we need to make sure that every single tx is idempotent, then add them to the allow list.

One other limitation is that this method currently does nothing to check if a tx was really committed or not. In an ideal world, the node would contact the other nodes and ask. However, this sipmler implementation does not do this check. Instead [HaveDanglingTxs] is used in combination with the schema check. If the schema is not out of sync in the first place, no txs will be applied. This does not cover all edge cases, but it seems to work for now. This should be improved in the future.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL