Documentation ¶
Index ¶
- Variables
- type AuthConfig
- type BasicAuth
- type Client
- type CommitFn
- type Config
- type ConsensusFn
- type DiskUsage
- type HostnameSource
- type IdealClusterState
- type MemberLister
- type NodeInfo
- type NodeIterationStrategy
- type NodeIterator
- type NodeSelector
- type Persistence
- type Remote
- type ResponseFn
- type State
- func (s *State) AllHostnames() []string
- func (s *State) AllNames() []string
- func (s *State) ClusterHealthScore() int
- func (s *State) Hostnames() []string
- func (s *State) LocalName() string
- func (s *State) MaintenanceModeEnabledForLocalhost() bool
- func (s *State) NodeAddress(id string) string
- func (s *State) NodeCount() int
- func (s *State) NodeHostname(nodeName string) (string, bool)
- func (s *State) NodeInfo(node string) (NodeInfo, bool)
- func (s *State) NonStorageNodes() []string
- func (s *State) SchemaSyncIgnored() bool
- func (s *State) SetMaintenanceModeForLocalhost(enabled bool)
- func (s *State) SkipSchemaRepair() bool
- func (s *State) SortCandidates(nodes []string) []string
- func (s *State) StorageCandidates() []string
- type Transaction
- type TransactionType
- type TxBroadcaster
- func (t *TxBroadcaster) BroadcastAbortTransaction(rootCtx context.Context, tx *Transaction) error
- func (t *TxBroadcaster) BroadcastCommitTransaction(rootCtx context.Context, tx *Transaction) error
- func (t *TxBroadcaster) BroadcastTransaction(rootCtx context.Context, tx *Transaction) error
- func (t *TxBroadcaster) SetConsensusFunction(fn ConsensusFn)
- type TxManager
- func (c *TxManager) BeginTransaction(ctx context.Context, trType TransactionType, payload interface{}, ...) (*Transaction, error)
- func (c *TxManager) BeginTransactionTolerateNodeFailures(ctx context.Context, trType TransactionType, payload interface{}, ...) (*Transaction, error)
- func (c *TxManager) CloseReadTransaction(ctx context.Context, tx *Transaction) error
- func (c *TxManager) CommitWriteTransaction(ctx context.Context, tx *Transaction) error
- func (c *TxManager) HaveDanglingTxs(ctx context.Context, allowedTypes []TransactionType) (found bool)
- func (c *TxManager) IncomingAbortTransaction(ctx context.Context, tx *Transaction)
- func (c *TxManager) IncomingBeginTransaction(ctx context.Context, tx *Transaction) ([]byte, error)
- func (c *TxManager) IncomingCommitTransaction(ctx context.Context, tx *Transaction) error
- func (c *TxManager) SetAllowUnready(types []TransactionType)
- func (c *TxManager) SetCommitFn(fn CommitFn)
- func (c *TxManager) SetResponseFn(fn ResponseFn)
- func (c *TxManager) Shutdown()
- func (c *TxManager) StartAcceptIncoming()
- func (c *TxManager) TryResumeDanglingTxs(ctx context.Context, allowedTypes []TransactionType) (applied bool, err error)
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type AuthConfig ¶
type AuthConfig struct {
BasicAuth BasicAuth `json:"basic" yaml:"basic"`
}
type BasicAuth ¶
type Client ¶
type Client interface {
	OpenTransaction(ctx context.Context, host string, tx *Transaction) error
	AbortTransaction(ctx context.Context, host string, tx *Transaction) error
	CommitTransaction(ctx context.Context, host string, tx *Transaction) error
}
type Config ¶
type Config struct {
	Hostname                string     `json:"hostname" yaml:"hostname"`
	GossipBindPort          int        `json:"gossipBindPort" yaml:"gossipBindPort"`
	DataBindPort            int        `json:"dataBindPort" yaml:"dataBindPort"`
	Join                    string     `json:"join" yaml:"join"`
	IgnoreStartupSchemaSync bool       `json:"ignoreStartupSchemaSync" yaml:"ignoreStartupSchemaSync"`
	SkipSchemaSyncRepair    bool       `json:"skipSchemaSyncRepair" yaml:"skipSchemaSyncRepair"`
	AuthConfig              AuthConfig `json:"auth" yaml:"auth"`
	AdvertiseAddr           string     `json:"advertiseAddr" yaml:"advertiseAddr"`
	AdvertisePort           int        `json:"advertisePort" yaml:"advertisePort"`
	// FastFailureDetection mostly for testing purpose, it will make memberlist sensitive and detect
	// failures (down nodes) faster.
	FastFailureDetection bool `json:"fastFailureDetection" yaml:"fastFailureDetection"`
	// LocalHost flag enables running a multi-node setup with the same localhost and different ports
	Localhost bool `json:"localhost" yaml:"localhost"`
	// MaintenanceNodes is experimental. You should not use this directly, but should use the
	// public methods on the State struct. This is a list of nodes (by Hostname) that are in
	// maintenance mode (eg return a 418 for all data requests). We use a list here instead of a
	// bool because it allows us to set the same config/env vars on all nodes to put a subset of
	// them in maintenance mode. In addition, we may want to have the cluster nodes not in
	// maintenance mode be aware of which nodes are in maintenance mode in the future.
	MaintenanceNodes []string `json:"maintenanceNodes" yaml:"maintenanceNodes"`
}
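As a rough illustration of how the JSON tags on Config map to a configuration document, the sketch below decodes a small JSON snippet. It does not import the real package: `clusterConfig` is a hypothetical, trimmed-down mirror of a few Config fields, `parseClusterConfig` is a helper invented for this example, and the host names and port numbers are placeholder values.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// clusterConfig is a local, trimmed-down mirror of a few Config fields.
// It exists only so this sketch compiles without the real package.
type clusterConfig struct {
	Hostname         string   `json:"hostname"`
	GossipBindPort   int      `json:"gossipBindPort"`
	DataBindPort     int      `json:"dataBindPort"`
	Join             string   `json:"join"`
	MaintenanceNodes []string `json:"maintenanceNodes"`
}

// parseClusterConfig decodes a JSON document into the mirror struct.
func parseClusterConfig(raw string) (clusterConfig, error) {
	var cfg clusterConfig
	err := json.Unmarshal([]byte(raw), &cfg)
	return cfg, err
}

func main() {
	// All values below are placeholders chosen for the example.
	raw := `{
		"hostname": "node1",
		"gossipBindPort": 7100,
		"dataBindPort": 7101,
		"join": "node0:7100",
		"maintenanceNodes": ["node2"]
	}`
	cfg, err := parseClusterConfig(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.Hostname, cfg.GossipBindPort, cfg.DataBindPort, cfg.MaintenanceNodes)
}
```

The same field names apply to YAML, since each field carries an identical `yaml` tag.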
type ConsensusFn ¶
type ConsensusFn func(ctx context.Context, in []*Transaction) (*Transaction, error)
The Broadcaster is the link between the current node and all other nodes during a tx operation. This makes it a natural place to inject a consensus function for read transactions. How consensus is reached is completely opaque to the broadcaster and can be controlled through custom business logic.
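To make the idea of a pluggable consensus function more concrete, here is a minimal sketch of one possible rule: pick the response whose payload was reported by the most nodes. It is not the rule used by any particular caller. `Transaction` is a local stand-in with only the fields needed, `majorityConsensus` is a hypothetical name, and the assumption that payloads are plain strings is made purely for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Transaction is a local stand-in carrying only what this sketch needs.
type Transaction struct {
	ID      string
	Payload interface{}
}

// majorityConsensus has the same shape as ConsensusFn. It returns the
// response whose payload (assumed to be a string here) occurs most often.
// Ties go to the payload that reached the winning count first.
func majorityConsensus(_ context.Context, in []*Transaction) (*Transaction, error) {
	counts := map[string]int{}
	var best *Transaction
	bestCount := 0
	for _, tx := range in {
		if tx == nil {
			continue
		}
		key, ok := tx.Payload.(string)
		if !ok {
			return nil, fmt.Errorf("tx %q: unexpected payload type %T", tx.ID, tx.Payload)
		}
		counts[key]++
		if counts[key] > bestCount {
			best, bestCount = tx, counts[key]
		}
	}
	if best == nil {
		return nil, errors.New("no responses to reach consensus on")
	}
	return best, nil
}

func main() {
	responses := []*Transaction{
		{ID: "n1", Payload: "schema-v1"},
		{ID: "n2", Payload: "schema-v2"},
		{ID: "n3", Payload: "schema-v2"},
	}
	winner, err := majorityConsensus(context.Background(), responses)
	if err != nil {
		panic(err)
	}
	fmt.Println(winner.Payload)
}
```

A function of this shape could be handed to SetConsensusFunction; the broadcaster itself never needs to understand the payload.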
type DiskUsage ¶
type DiskUsage struct {
	// Total disk space
	Total uint64
	// Total available space
	Available uint64
}
DiskUsage contains total and available space in bytes.
type HostnameSource ¶
type HostnameSource interface {
AllNames() []string
}
type IdealClusterState ¶
type IdealClusterState struct {
// contains filtered or unexported fields
}
func NewIdealClusterState ¶
func NewIdealClusterState(s MemberLister, logger logrus.FieldLogger) *IdealClusterState
func (*IdealClusterState) Members ¶
func (ics *IdealClusterState) Members() []string
func (*IdealClusterState) Validate ¶
func (ics *IdealClusterState) Validate() error
Validate returns an error if the actual state does not match the assumed ideal state, e.g. because a node has died, or left unexpectedly.
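The check described above can be pictured as a set comparison between the members assumed at startup and the members currently alive. The sketch below is only an illustration of that idea, not the package's implementation. `validateMembers` and its parameters are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// validateMembers returns an error naming every node that is part of the
// assumed ideal state but absent from the currently observed members.
func validateMembers(ideal, actual []string) error {
	live := make(map[string]struct{}, len(actual))
	for _, name := range actual {
		live[name] = struct{}{}
	}
	var missing []string
	for _, name := range ideal {
		if _, ok := live[name]; !ok {
			missing = append(missing, name)
		}
	}
	if len(missing) > 0 {
		return fmt.Errorf("node(s) %s unhealthy or unavailable", strings.Join(missing, ", "))
	}
	return nil
}

func main() {
	ideal := []string{"node1", "node2", "node3"}
	fmt.Println(validateMembers(ideal, []string{"node1", "node2", "node3"}))
	fmt.Println(validateMembers(ideal, []string{"node1", "node3"}))
}
```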
type MemberLister ¶
type NodeIterationStrategy ¶
type NodeIterationStrategy int
const (
	StartRandom NodeIterationStrategy = iota
	StartAfter
)
type NodeIterator ¶
type NodeIterator struct {
// contains filtered or unexported fields
}
func NewNodeIterator ¶
func NewNodeIterator(nodeNames []string, strategy NodeIterationStrategy) (*NodeIterator, error)
func (*NodeIterator) Next ¶
func (n *NodeIterator) Next() string
func (*NodeIterator) SetStartNode ¶
func (n *NodeIterator) SetStartNode(startNode string)
type NodeSelector ¶
type NodeSelector interface {
	// StorageCandidates returns list of storage nodes (names)
	// sorted by the free amount of disk space in descending order
	StorageCandidates() []string
	// NonStorageNodes return nodes from member list which
	// they are configured not to be voter only
	NonStorageNodes() []string
	// SortCandidates Sort passed nodes names by the
	// free amount of disk space in descending order
	SortCandidates(nodes []string) []string
	// LocalName() return local node name
	LocalName() string
	// NodeHostname return hosts address for a specific node name
	NodeHostname(name string) (string, bool)
}
NodeSelector is an interface to select a portion of the available nodes in memberlist
type Persistence ¶
type Remote ¶
type Remote interface {
	BroadcastTransaction(ctx context.Context, tx *Transaction) error
	BroadcastAbortTransaction(ctx context.Context, tx *Transaction) error
	BroadcastCommitTransaction(ctx context.Context, tx *Transaction) error
}
type ResponseFn ¶
type ResponseFn func(ctx context.Context, tx *Transaction) ([]byte, error)
type State ¶
type State struct {
// contains filtered or unexported fields
}
func (*State) AllHostnames ¶
AllHostnames for live members, including self.
func (*State) ClusterHealthScore ¶
func (*State) Hostnames ¶
Hostnames for all live members, except self. Use AllHostnames to include self. Each returned hostname includes the data port.
func (*State) MaintenanceModeEnabledForLocalhost ¶
MaintenanceModeEnabledForLocalhost is experimental, may be removed/changed. It returns true if this node is in maintenance mode (which means it should return an error for all data requests).
func (*State) NodeAddress ¶
NodeAddress is used to resolve the node name into an IP address without the port.
func (*State) NonStorageNodes ¶
NonStorageNodes return nodes from member list which they are configured not to be voter only
func (*State) SchemaSyncIgnored ¶
func (*State) SetMaintenanceModeForLocalhost ¶
SetMaintenanceModeForLocalhost is experimental, may be removed/changed. Enables/disables maintenance mode for this node.
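The Config comments explain that maintenance mode is tracked as a list of hostnames so that the same configuration can be shipped to every node. A minimal sketch of that lookup, assuming a simple linear scan (the helper name `inMaintenance` is hypothetical and this is not the State implementation), could look like this:

```go
package main

import "fmt"

// inMaintenance reports whether hostname appears in the shared list of
// maintenance nodes. Every node can evaluate the same list and reach its
// own answer.
func inMaintenance(hostname string, maintenanceNodes []string) bool {
	for _, n := range maintenanceNodes {
		if n == hostname {
			return true
		}
	}
	return false
}

func main() {
	// The same list is assumed to be configured on all nodes.
	shared := []string{"node2"}
	for _, host := range []string{"node1", "node2", "node3"} {
		fmt.Printf("%s in maintenance: %v\n", host, inMaintenance(host, shared))
	}
}
```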
func (*State) SkipSchemaRepair ¶
func (*State) SortCandidates ¶
SortCandidates sorts the passed node names by the free amount of disk space in descending order.
func (*State) StorageCandidates ¶
StorageCandidates returns a list of storage nodes (names) sorted by the free amount of disk space in descending order.
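The ordering used by StorageCandidates and SortCandidates can be sketched as a sort over node names keyed by available disk space. The example below is an approximation under stated assumptions: `sortByFreeSpace` is a hypothetical helper, the `usage` map stands in for whatever per-node data the real State keeps, and treating unknown nodes as having zero free bytes is a choice made for the sketch.

```go
package main

import (
	"fmt"
	"sort"
)

// DiskUsage mirrors the documented struct.
type DiskUsage struct {
	Total     uint64
	Available uint64
}

// sortByFreeSpace returns a copy of names ordered by available disk space,
// largest first. Names missing from usage count as zero bytes free.
// The input slice is left untouched.
func sortByFreeSpace(names []string, usage map[string]DiskUsage) []string {
	out := append([]string(nil), names...)
	sort.SliceStable(out, func(i, j int) bool {
		return usage[out[i]].Available > usage[out[j]].Available
	})
	return out
}

func main() {
	usage := map[string]DiskUsage{
		"node1": {Total: 100, Available: 10},
		"node2": {Total: 100, Available: 30},
		"node3": {Total: 100, Available: 20},
	}
	fmt.Println(sortByFreeSpace([]string{"node1", "node2", "node3"}, usage))
}
```

A stable sort keeps nodes with equal free space in their original relative order, which makes the result deterministic for a given input.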
type Transaction ¶
type Transaction struct {
	ID       string
	Type     TransactionType
	Payload  interface{}
	Deadline time.Time
	// If TolerateNodeFailures is false (the default) a transaction cannot be
	// opened or committed if a node is confirmed dead. If a node is only
	// suspected dead, the TxManager will try, but abort unless all nodes ACK.
	TolerateNodeFailures bool
}
type TransactionType ¶
type TransactionType string
type TxBroadcaster ¶
type TxBroadcaster struct {
// contains filtered or unexported fields
}
func NewTxBroadcaster ¶
func NewTxBroadcaster(state MemberLister, client Client, logger logrus.FieldLogger) *TxBroadcaster
func (*TxBroadcaster) BroadcastAbortTransaction ¶
func (t *TxBroadcaster) BroadcastAbortTransaction(rootCtx context.Context, tx *Transaction) error
func (*TxBroadcaster) BroadcastCommitTransaction ¶
func (t *TxBroadcaster) BroadcastCommitTransaction(rootCtx context.Context, tx *Transaction) error
func (*TxBroadcaster) BroadcastTransaction ¶
func (t *TxBroadcaster) BroadcastTransaction(rootCtx context.Context, tx *Transaction) error
func (*TxBroadcaster) SetConsensusFunction ¶
func (t *TxBroadcaster) SetConsensusFunction(fn ConsensusFn)
type TxManager ¶
func NewTxManager ¶
func NewTxManager(remote Remote, persistence Persistence, logger logrus.FieldLogger) *TxManager
func (*TxManager) BeginTransaction ¶
func (c *TxManager) BeginTransaction(ctx context.Context, trType TransactionType, payload interface{}, ttl time.Duration) (*Transaction, error)
Begin a Transaction with the specified type and payload. Transactions expire after the specified TTL. For a transaction that never expires, pass in a ttl of 0. When choosing TTLs, keep in mind that clocks might be slightly skewed across the cluster; therefore set your TTL to desiredTTL + toleratedClockSkew.
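The TTL advice above amounts to a small calculation. The sketch below shows one way to express it. `ttlWithSkew` is a hypothetical helper, not part of the package, and the durations in `main` are arbitrary example values.

```go
package main

import (
	"fmt"
	"time"
)

// ttlWithSkew pads the desired TTL with the clock skew the caller is
// willing to tolerate. A desired TTL of 0 is passed through unchanged,
// because a ttl of 0 is documented to mean the transaction never expires.
func ttlWithSkew(desired, toleratedSkew time.Duration) time.Duration {
	if desired == 0 {
		return 0
	}
	return desired + toleratedSkew
}

func main() {
	// Example values only: 30s of intended lifetime, 2s of tolerated skew.
	ttl := ttlWithSkew(30*time.Second, 2*time.Second)
	fmt.Println(ttl) // prints "32s"
}
```

The resulting duration would then be passed as the ttl argument when beginning a transaction.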
Regular transactions cannot be opened if the cluster is not considered healthy.
func (*TxManager) BeginTransactionTolerateNodeFailures ¶
func (c *TxManager) BeginTransactionTolerateNodeFailures(ctx context.Context, trType TransactionType, payload interface{}, ttl time.Duration) (*Transaction, error)
Begin a Transaction that does not require the whole cluster to be healthy. This can be used for example in bootstrapping situations when not all nodes are present yet, or in disaster recovery situations when a node needs to run a transaction in order to re-join a cluster.
func (*TxManager) CloseReadTransaction ¶
func (c *TxManager) CloseReadTransaction(ctx context.Context, tx *Transaction) error
func (*TxManager) CommitWriteTransaction ¶
func (c *TxManager) CommitWriteTransaction(ctx context.Context, tx *Transaction) error
func (*TxManager) HaveDanglingTxs ¶
func (c *TxManager) HaveDanglingTxs(ctx context.Context, allowedTypes []TransactionType) (found bool)
HaveDanglingTxs is a way to check if there are any uncommitted transactions in the durable storage. This can be used to make decisions about whether a failed schema check can be temporarily ignored - with the assumption that applying the dangling txs will fix the issue.
func (*TxManager) IncomingAbortTransaction ¶
func (c *TxManager) IncomingAbortTransaction(ctx context.Context, tx *Transaction)
func (*TxManager) IncomingBeginTransaction ¶
func (*TxManager) IncomingCommitTransaction ¶
func (c *TxManager) IncomingCommitTransaction(ctx context.Context, tx *Transaction) error
func (*TxManager) SetAllowUnready ¶
func (c *TxManager) SetAllowUnready(types []TransactionType)
func (*TxManager) SetCommitFn ¶
SetCommitFn sets a function that is used in Write Transactions. In it, you can read from the transaction payload and use that state to alter your local state.
func (*TxManager) SetResponseFn ¶
func (c *TxManager) SetResponseFn(fn ResponseFn)
SetResponseFn sets a function that is used in Read Transactions. The function sets the local state (by writing it into the Tx Payload). It can then be sent to other nodes. Consensus is not part of the ResponseFn. The coordinator - who initiated the Tx - is responsible for coming up with consensus. Deciding on Consensus requires insights into business logic, as from the TX's perspective payloads are opaque.
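As a hedged illustration of the ResponseFn contract described above, the sketch below writes a node's local state into the transaction payload. `Transaction` and `ResponseFn` are re-declared locally so the example compiles on its own. `newResponseFn`, the `map[string]int` state, and the choice to return the JSON-encoded state as the byte slice are all assumptions of this example, since the text does not specify what the returned bytes contain.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Transaction is a local stand-in with only the fields used here.
type Transaction struct {
	ID      string
	Payload interface{}
}

// ResponseFn mirrors the documented function type.
type ResponseFn func(ctx context.Context, tx *Transaction) ([]byte, error)

// newResponseFn builds a ResponseFn that copies the node's local state into
// the transaction payload. Returning the JSON encoding of that state is an
// assumption made for this sketch.
func newResponseFn(localState func() map[string]int) ResponseFn {
	return func(_ context.Context, tx *Transaction) ([]byte, error) {
		state := localState()
		tx.Payload = state
		return json.Marshal(state)
	}
}

func main() {
	fn := newResponseFn(func() map[string]int {
		return map[string]int{"classes": 2}
	})
	tx := &Transaction{ID: "read-1"}
	body, err := fn(context.Background(), tx)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body), tx.Payload)
}
```

Note that nothing here decides which node's answer wins; as the text says, that is left to the coordinator's consensus function.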
func (*TxManager) StartAcceptIncoming ¶
func (c *TxManager) StartAcceptIncoming()
func (*TxManager) TryResumeDanglingTxs ¶
func (c *TxManager) TryResumeDanglingTxs(ctx context.Context, allowedTypes []TransactionType) (applied bool, err error)
TryResumeDanglingTxs loops over the existing transactions and applies them. It only does so if the transaction type is explicitly listed as allowed. This is because - at the time of creating this - we were not sure if all transaction commit functions are idempotent. If one would not be, then reapplying a tx or tx commit could potentially be dangerous, as we don't know if it was already applied prior to the node death.
For example, think of an "add property 'foo'" tx that does nothing but append the property to the schema. If this ran twice, we might now end up with two duplicate properties with the name 'foo', which could in turn create other problems. To make sure all txs are resumable (which is what we want because that's the only way to avoid schema issues), we need to make sure that every single tx is idempotent, then add them to the allow list.
One other limitation is that this method currently does nothing to check if a tx was really committed or not. In an ideal world, the node would contact the other nodes and ask. However, this simpler implementation does not do this check. Instead, [HaveDanglingTxs] is used in combination with the schema check. If the schema is not out of sync in the first place, no txs will be applied. This does not cover all edge cases, but it seems to work for now. This should be improved in the future.
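The two ideas in this description, an allow list of transaction types and idempotent commit logic, can be sketched independently of the real TxManager. In the example below all names (`resumable`, `addPropertyIdempotent`, and the `"add_property"` and `"delete_class"` type strings) are hypothetical, and the schema is reduced to a slice of property names.

```go
package main

import "fmt"

type TransactionType string

// Transaction is a local stand-in with only the fields used here.
type Transaction struct {
	ID      string
	Type    TransactionType
	Payload interface{}
}

// resumable keeps only those dangling transactions whose type is
// explicitly listed as allowed.
func resumable(dangling []*Transaction, allowed []TransactionType) []*Transaction {
	allow := make(map[TransactionType]struct{}, len(allowed))
	for _, t := range allowed {
		allow[t] = struct{}{}
	}
	var out []*Transaction
	for _, tx := range dangling {
		if _, ok := allow[tx.Type]; ok {
			out = append(out, tx)
		}
	}
	return out
}

// addPropertyIdempotent appends prop only if it is not already present,
// so applying the same tx twice leaves the schema unchanged.
func addPropertyIdempotent(schema []string, prop string) []string {
	for _, p := range schema {
		if p == prop {
			return schema
		}
	}
	return append(schema, prop)
}

func main() {
	dangling := []*Transaction{
		{ID: "1", Type: "add_property", Payload: "foo"},
		{ID: "2", Type: "delete_class", Payload: "Bar"},
	}
	var schema []string
	for _, tx := range resumable(dangling, []TransactionType{"add_property"}) {
		prop := tx.Payload.(string)
		schema = addPropertyIdempotent(schema, prop)
		schema = addPropertyIdempotent(schema, prop) // simulated re-apply after a crash
	}
	fmt.Println(schema) // prints "[foo]"
}
```

With a naive append instead of the presence check, the simulated re-apply would leave two 'foo' entries, which is exactly the duplicate-property problem described above.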