Documentation ¶
Index ¶
- Variables
- type AuthConfig
- type BasicAuth
- type Client
- type CommitFn
- type Config
- type ConsensusFn
- type DiskUsage
- type HostnameSource
- type IdealClusterState
- type MemberLister
- type NodeInfo
- type NodeIterationStrategy
- type NodeIterator
- type Persistence
- type Remote
- type ResponseFn
- type State
- func (s *State) AllHostnames() []string
- func (s *State) AllNames() []string
- func (s *State) Candidates() []string
- func (s *State) ClusterHealthScore() int
- func (s *State) Hostnames() []string
- func (s *State) LocalName() string
- func (s *State) NodeCount() int
- func (s *State) NodeHostname(nodeName string) (string, bool)
- func (s *State) NodeInfo(node string) (NodeInfo, bool)
- func (s *State) SchemaSyncIgnored() bool
- func (s *State) SkipSchemaRepair() bool
- type Transaction
- type TransactionType
- type TxBroadcaster
- func (t *TxBroadcaster) BroadcastAbortTransaction(rootCtx context.Context, tx *Transaction) error
- func (t *TxBroadcaster) BroadcastCommitTransaction(rootCtx context.Context, tx *Transaction) error
- func (t *TxBroadcaster) BroadcastTransaction(rootCtx context.Context, tx *Transaction) error
- func (t *TxBroadcaster) SetConsensusFunction(fn ConsensusFn)
- type TxManager
- func (c *TxManager) BeginTransaction(ctx context.Context, trType TransactionType, payload interface{}, ...) (*Transaction, error)
- func (c *TxManager) BeginTransactionTolerateNodeFailures(ctx context.Context, trType TransactionType, payload interface{}, ...) (*Transaction, error)
- func (c *TxManager) CloseReadTransaction(ctx context.Context, tx *Transaction) error
- func (c *TxManager) CommitWriteTransaction(ctx context.Context, tx *Transaction) error
- func (c *TxManager) HaveDanglingTxs(ctx context.Context, allowedTypes []TransactionType) (found bool)
- func (c *TxManager) IncomingAbortTransaction(ctx context.Context, tx *Transaction)
- func (c *TxManager) IncomingBeginTransaction(ctx context.Context, tx *Transaction) ([]byte, error)
- func (c *TxManager) IncomingCommitTransaction(ctx context.Context, tx *Transaction) error
- func (c *TxManager) SetAllowUnready(types []TransactionType)
- func (c *TxManager) SetCommitFn(fn CommitFn)
- func (c *TxManager) SetResponseFn(fn ResponseFn)
- func (c *TxManager) Shutdown()
- func (c *TxManager) StartAcceptIncoming()
- func (c *TxManager) TryResumeDanglingTxs(ctx context.Context, allowedTypes []TransactionType) (applied bool, err error)
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type AuthConfig ¶ added in v1.21.2
type AuthConfig struct {
BasicAuth BasicAuth `json:"basic" yaml:"basic"`
}
type BasicAuth ¶ added in v1.21.2
type Client ¶
type Client interface {
	OpenTransaction(ctx context.Context, host string, tx *Transaction) error
	AbortTransaction(ctx context.Context, host string, tx *Transaction) error
	CommitTransaction(ctx context.Context, host string, tx *Transaction) error
}
type Config ¶
type Config struct {
	Hostname                string     `json:"hostname" yaml:"hostname"`
	GossipBindPort          int        `json:"gossipBindPort" yaml:"gossipBindPort"`
	DataBindPort            int        `json:"dataBindPort" yaml:"dataBindPort"`
	Join                    string     `json:"join" yaml:"join"`
	IgnoreStartupSchemaSync bool       `json:"ignoreStartupSchemaSync" yaml:"ignoreStartupSchemaSync"`
	SkipSchemaSyncRepair    bool       `json:"skipSchemaSyncRepair" yaml:"skipSchemaSyncRepair"`
	AuthConfig              AuthConfig `json:"auth" yaml:"auth"`
	AdvertiseAddr           string     `json:"advertiseAddr" yaml:"advertiseAddr"`
	AdvertisePort           int        `json:"advertisePort" yaml:"advertisePort"`
}
type ConsensusFn ¶
type ConsensusFn func(ctx context.Context, in []*Transaction) (*Transaction, error)
The Broadcaster is the link between the current node and all other nodes during a tx operation. This makes it a natural place to inject a consensus function for read transactions. How consensus is reached is completely opaque to the broadcaster and can be controlled through custom business logic.
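As an illustration of the kind of business logic a ConsensusFn might carry, the following is a minimal sketch that only reaches consensus when every response has the same payload. The Transaction and ConsensusFn types are local stand-ins that mirror the shapes in this reference, and the names unanimous and agree, the string payloads, and the unanimity rule itself are assumptions made for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Transaction and ConsensusFn are local stand-ins mirroring the shapes shown
// in this reference; they are not imported from the real package.
type Transaction struct {
	ID      string
	Payload interface{}
}

type ConsensusFn func(ctx context.Context, in []*Transaction) (*Transaction, error)

// unanimous is a hypothetical consensus rule: every response must carry the
// same string payload, otherwise no consensus is reached.
func unanimous(ctx context.Context, in []*Transaction) (*Transaction, error) {
	if len(in) == 0 {
		return nil, errors.New("no responses to reach consensus on")
	}
	first, ok := in[0].Payload.(string)
	if !ok {
		return nil, errors.New("unexpected payload type")
	}
	for _, tx := range in[1:] {
		if p, ok := tx.Payload.(string); !ok || p != first {
			return nil, fmt.Errorf("payload mismatch for tx %q", tx.ID)
		}
	}
	return in[0], nil
}

// agree is an example-only helper: it wraps the payloads in transactions and
// runs them through the given consensus function.
func agree(fn ConsensusFn, payloads ...string) (interface{}, error) {
	txs := make([]*Transaction, 0, len(payloads))
	for _, p := range payloads {
		txs = append(txs, &Transaction{ID: "tx-1", Payload: p})
	}
	tx, err := fn(context.Background(), txs)
	if err != nil {
		return nil, err
	}
	return tx.Payload, nil
}

func main() {
	got, err := agree(unanimous, "schema-v7", "schema-v7")
	fmt.Println(got, err)

	_, err = agree(unanimous, "schema-v7", "schema-v8")
	fmt.Println(err)
}
```

With the real package, a function of this shape would be passed to SetConsensusFunction on the TxBroadcaster.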
type DiskUsage ¶ added in v1.18.3
type DiskUsage struct {
	// Total disk space
	Total uint64
	// Total available space
	Available uint64
}
DiskUsage contains the total and available disk space in bytes.
type HostnameSource ¶
type HostnameSource interface {
AllNames() []string
}
type IdealClusterState ¶ added in v1.17.4
type IdealClusterState struct {
// contains filtered or unexported fields
}
func NewIdealClusterState ¶ added in v1.17.4
func NewIdealClusterState(s MemberLister, logger logrus.FieldLogger) *IdealClusterState
func (*IdealClusterState) Members ¶ added in v1.17.4
func (ics *IdealClusterState) Members() []string
func (*IdealClusterState) Validate ¶ added in v1.17.4
func (ics *IdealClusterState) Validate() error
Validate returns an error if the actual state does not match the assumed ideal state, e.g. because a node has died, or left unexpectedly.
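The comparison described above can be pictured with a small sketch. This is not the package's implementation: the function validate, its two slice parameters, and the error text are hypothetical, and the real type obtains live members through a MemberLister rather than a plain slice.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// validate is a hypothetical stand-in for the check described above. It
// returns an error naming every ideal member that is missing from the live
// member list, and nil if all ideal members are present.
func validate(ideal, live []string) error {
	alive := make(map[string]struct{}, len(live))
	for _, name := range live {
		alive[name] = struct{}{}
	}

	var missing []string
	for _, name := range ideal {
		if _, ok := alive[name]; !ok {
			missing = append(missing, name)
		}
	}
	if len(missing) > 0 {
		sort.Strings(missing) // stable error text regardless of input order
		return fmt.Errorf("ideal state violated, missing nodes: %s",
			strings.Join(missing, ", "))
	}
	return nil
}

func main() {
	ideal := []string{"node-a", "node-b", "node-c"}
	fmt.Println(validate(ideal, []string{"node-a", "node-b", "node-c"}))
	fmt.Println(validate(ideal, []string{"node-a", "node-c"}))
}
```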
type MemberLister ¶
type NodeIterationStrategy ¶
type NodeIterationStrategy int
const (
	StartRandom NodeIterationStrategy = iota
	StartAfter
)
type NodeIterator ¶
type NodeIterator struct {
// contains filtered or unexported fields
}
func NewNodeIterator ¶
func NewNodeIterator(nodeNames []string, strategy NodeIterationStrategy) (*NodeIterator, error)
func (*NodeIterator) Next ¶
func (n *NodeIterator) Next() string
func (*NodeIterator) SetStartNode ¶
func (n *NodeIterator) SetStartNode(startNode string)
type Persistence ¶ added in v1.21.3
type Remote ¶
type Remote interface {
	BroadcastTransaction(ctx context.Context, tx *Transaction) error
	BroadcastAbortTransaction(ctx context.Context, tx *Transaction) error
	BroadcastCommitTransaction(ctx context.Context, tx *Transaction) error
}
type ResponseFn ¶
type ResponseFn func(ctx context.Context, tx *Transaction) ([]byte, error)
type State ¶
type State struct {
// contains filtered or unexported fields
}
func (*State) AllHostnames ¶
AllHostnames for live members, including self.
func (*State) Candidates ¶ added in v1.18.3
Candidates returns the list of node names sorted by free disk space in descending order.
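The ordering can be sketched as follows. This is an illustration only: the candidates function, the map input, and the tie-break by name are assumptions, and DiskUsage is redeclared locally to keep the example self-contained.

```go
package main

import (
	"fmt"
	"sort"
)

// DiskUsage mirrors the struct in this reference (values in bytes).
type DiskUsage struct {
	Total     uint64
	Available uint64
}

// candidates is a hypothetical helper, not the package's implementation.
// It returns node names ordered by available disk space, largest first,
// breaking ties by name so that the result is deterministic.
func candidates(usage map[string]DiskUsage) []string {
	names := make([]string, 0, len(usage))
	for name := range usage {
		names = append(names, name)
	}
	sort.Slice(names, func(i, j int) bool {
		a, b := usage[names[i]], usage[names[j]]
		if a.Available != b.Available {
			return a.Available > b.Available
		}
		return names[i] < names[j]
	})
	return names
}

func main() {
	fmt.Println(candidates(map[string]DiskUsage{
		"node-a": {Total: 100, Available: 10},
		"node-b": {Total: 100, Available: 70},
		"node-c": {Total: 100, Available: 40},
	})) // prints [node-b node-c node-a]
}
```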
func (*State) ClusterHealthScore ¶
func (*State) Hostnames ¶
Hostnames returns the hostnames of all live members except self; each entry includes the data port. Use AllHostnames to include self.
func (*State) SchemaSyncIgnored ¶ added in v1.17.4
func (*State) SkipSchemaRepair ¶ added in v1.22.0
type Transaction ¶
type Transaction struct {
	ID       string
	Type     TransactionType
	Payload  interface{}
	Deadline time.Time

	// If TolerateNodeFailures is false (the default) a transaction cannot be
	// opened or committed if a node is confirmed dead. If a node is only
	// suspected dead, the TxManager will try, but abort unless all nodes ACK.
	TolerateNodeFailures bool
}
type TransactionType ¶
type TransactionType string
type TxBroadcaster ¶
type TxBroadcaster struct {
// contains filtered or unexported fields
}
func NewTxBroadcaster ¶
func NewTxBroadcaster(state MemberLister, client Client, logger logrus.FieldLogger) *TxBroadcaster
func (*TxBroadcaster) BroadcastAbortTransaction ¶
func (t *TxBroadcaster) BroadcastAbortTransaction(rootCtx context.Context, tx *Transaction) error
func (*TxBroadcaster) BroadcastCommitTransaction ¶
func (t *TxBroadcaster) BroadcastCommitTransaction(rootCtx context.Context, tx *Transaction) error
func (*TxBroadcaster) BroadcastTransaction ¶
func (t *TxBroadcaster) BroadcastTransaction(rootCtx context.Context, tx *Transaction) error
func (*TxBroadcaster) SetConsensusFunction ¶
func (t *TxBroadcaster) SetConsensusFunction(fn ConsensusFn)
type TxManager ¶
func NewTxManager ¶
func NewTxManager(remote Remote, persistence Persistence, logger logrus.FieldLogger) *TxManager
func (*TxManager) BeginTransaction ¶
func (c *TxManager) BeginTransaction(ctx context.Context, trType TransactionType, payload interface{}, ttl time.Duration) (*Transaction, error)
Begin a Transaction with the specified type and payload. Transactions expire after the specified TTL. For a transaction that never expires, pass a ttl of 0. When choosing a TTL, keep in mind that clocks in the cluster may be slightly skewed, so set the TTL to desiredTTL + toleratedClockSkew.
Regular transactions cannot be opened if the cluster is not considered healthy.
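The TTL advice amounts to a small piece of arithmetic, sketched below. The helper ttlWithSkew and the example durations are hypothetical; only the rule "desired TTL plus tolerated clock skew, with 0 meaning no expiry" comes from the description above.

```go
package main

import (
	"fmt"
	"time"
)

// ttlWithSkew is a hypothetical helper. A ttl of 0 means "never expires"
// per the description above, so it is passed through unchanged instead of
// being turned into a finite TTL by adding the skew.
func ttlWithSkew(desired, toleratedSkew time.Duration) time.Duration {
	if desired == 0 {
		return 0
	}
	return desired + toleratedSkew
}

func main() {
	// 30s of intended lifetime plus 2s of tolerated clock skew.
	fmt.Println(ttlWithSkew(30*time.Second, 2*time.Second)) // prints 32s

	// A non-expiring transaction stays non-expiring.
	fmt.Println(ttlWithSkew(0, 2*time.Second)) // prints 0s
}
```

The resulting value is what would be passed as the ttl argument of BeginTransaction.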
func (*TxManager) BeginTransactionTolerateNodeFailures ¶ added in v1.17.4
func (c *TxManager) BeginTransactionTolerateNodeFailures(ctx context.Context, trType TransactionType, payload interface{}, ttl time.Duration) (*Transaction, error)
Begin a Transaction that does not require the whole cluster to be healthy. This can be used for example in bootstrapping situations when not all nodes are present yet, or in disaster recovery situations when a node needs to run a transaction in order to re-join a cluster.
func (*TxManager) CloseReadTransaction ¶
func (c *TxManager) CloseReadTransaction(ctx context.Context, tx *Transaction) error
func (*TxManager) CommitWriteTransaction ¶
func (c *TxManager) CommitWriteTransaction(ctx context.Context, tx *Transaction) error
func (*TxManager) HaveDanglingTxs ¶ added in v1.21.3
func (c *TxManager) HaveDanglingTxs(ctx context.Context, allowedTypes []TransactionType) (found bool)
HaveDanglingTxs is a way to check if there are any uncommitted transactions in the durable storage. This can be used to make decisions about whether a failed schema check can be temporarily ignored - with the assumption that applying the dangling txs will fix the issue.
func (*TxManager) IncomingAbortTransaction ¶
func (c *TxManager) IncomingAbortTransaction(ctx context.Context, tx *Transaction)
func (*TxManager) IncomingBeginTransaction ¶
func (*TxManager) IncomingCommitTransaction ¶
func (c *TxManager) IncomingCommitTransaction(ctx context.Context, tx *Transaction) error
func (*TxManager) SetAllowUnready ¶ added in v1.21.3
func (c *TxManager) SetAllowUnready(types []TransactionType)
func (*TxManager) SetCommitFn ¶
SetCommitFn sets a function that is used in Write Transactions. Inside it, you can read from the transaction payload and use that state to alter your local state.
func (*TxManager) SetResponseFn ¶
func (c *TxManager) SetResponseFn(fn ResponseFn)
SetResponseFn sets a function that is used in Read Transactions. The function sets the local state (by writing it into the Tx Payload). It can then be sent to other nodes. Consensus is not part of the ResponseFn. The coordinator - who initiated the Tx - is responsible for coming up with consensus. Deciding on Consensus requires insights into business logic, as from the TX's perspective payloads are opaque.
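A response function of this shape might look like the sketch below. The Transaction and ResponseFn types are local stand-ins for the ones in this reference. The localSchema type, the JSON encoding, and the meaning given to the returned bytes are assumptions for illustration, since the reference does not specify them.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Transaction and ResponseFn are local stand-ins mirroring this reference.
type Transaction struct {
	ID      string
	Payload interface{}
}

type ResponseFn func(ctx context.Context, tx *Transaction) ([]byte, error)

// localSchema is hypothetical node-local state used only in this example.
type localSchema struct {
	Classes []string `json:"classes"`
}

// newResponseFn returns a ResponseFn that writes the node's local state into
// the transaction payload and also returns it serialized as JSON. No
// consensus logic lives here; that is left to the coordinator.
func newResponseFn(local localSchema) ResponseFn {
	return func(ctx context.Context, tx *Transaction) ([]byte, error) {
		tx.Payload = local
		return json.Marshal(local)
	}
}

// respond is an example-only helper that exercises the function without a
// cluster and returns the serialized response as a string.
func respond(classes ...string) (string, error) {
	fn := newResponseFn(localSchema{Classes: classes})
	tx := &Transaction{ID: "read-1"}
	b, err := fn(context.Background(), tx)
	return string(b), err
}

func main() {
	out, err := respond("Article", "Author")
	fmt.Println(out, err)
}
```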
func (*TxManager) StartAcceptIncoming ¶ added in v1.21.3
func (c *TxManager) StartAcceptIncoming()
func (*TxManager) TryResumeDanglingTxs ¶ added in v1.21.3
func (c *TxManager) TryResumeDanglingTxs(ctx context.Context, allowedTypes []TransactionType) (applied bool, err error)
TryResumeDanglingTxs loops over the existing transactions and applies them. It only does so if the transaction type is explicitly listed as allowed. This is because, at the time of creating this, we were not sure if all transaction commit functions are idempotent. If one were not, then reapplying a tx or tx commit could potentially be dangerous, as we don't know if it was already applied prior to the node death.
For example, think of an "add property 'foo'" tx that does nothing but append the property to the schema. If this ran twice, we might end up with two duplicate properties named 'foo', which could in turn create other problems. To make sure all txs are resumable (which is what we want, because that's the only way to avoid schema issues), we need to make sure that every single tx is idempotent, then add them to the allow list.
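The difference between the two behaviors can be shown with a minimal sketch. The schema type and both methods are hypothetical simplifications, not the actual schema code.

```go
package main

import "fmt"

// schema is a hypothetical, simplified local state: an ordered property list.
type schema struct {
	props []string
}

// appendProperty is NOT idempotent: replaying it duplicates the property.
func (s *schema) appendProperty(name string) {
	s.props = append(s.props, name)
}

// ensureProperty is idempotent: replaying it leaves the schema unchanged
// because an existing property with the same name is not added again.
func (s *schema) ensureProperty(name string) {
	for _, p := range s.props {
		if p == name {
			return
		}
	}
	s.props = append(s.props, name)
}

func main() {
	naive, safe := &schema{}, &schema{}
	for i := 0; i < 2; i++ { // simulate the same tx being applied twice
		naive.appendProperty("foo")
		safe.ensureProperty("foo")
	}
	fmt.Println(len(naive.props), len(safe.props)) // prints 2 1
}
```

Only a commit function written in the second style would be safe to place on the allow list.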
One other limitation is that this method currently does nothing to check if a tx was really committed or not. In an ideal world, the node would contact the other nodes and ask. However, this simpler implementation does not do this check. Instead, [HaveDanglingTxs] is used in combination with the schema check. If the schema is not out of sync in the first place, no txs will be applied. This does not cover all edge cases, but it seems to work for now. This should be improved in the future.
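The allow list and the schema-check gate described above can be sketched together. Everything here is illustrative: the transaction type values, resumable, and shouldResume are hypothetical names, and the real method works against persisted transactions rather than plain slices.

```go
package main

import "fmt"

type TransactionType string

// Hypothetical transaction types used only for this example.
const (
	addClass    TransactionType = "add_class"
	addProperty TransactionType = "add_property"
)

// resumable filters the dangling transaction types down to those that are
// explicitly on the allow list, preserving their original order.
func resumable(dangling, allowed []TransactionType) []TransactionType {
	allow := make(map[TransactionType]struct{}, len(allowed))
	for _, t := range allowed {
		allow[t] = struct{}{}
	}
	var out []TransactionType
	for _, t := range dangling {
		if _, ok := allow[t]; ok {
			out = append(out, t)
		}
	}
	return out
}

// shouldResume mirrors the gating described above: dangling txs are only
// considered when the schema is out of sync and dangling txs exist.
func shouldResume(schemaInSync, haveDangling bool) bool {
	return !schemaInSync && haveDangling
}

func main() {
	dangling := []TransactionType{addClass, addProperty}
	allowed := []TransactionType{addProperty}

	if shouldResume(false, len(dangling) > 0) {
		fmt.Println(resumable(dangling, allowed)) // prints [add_property]
	}
}
```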