Documentation ¶
Index ¶
- Constants
- Variables
- func UnsafeClusterDB(c *Cluster) database.DB
- func UnsafeNodeID(c *Cluster) int64
- type Cluster
- func (c *Cluster) Close() error
- func (c *Cluster) DB() database.DB
- func (c *Cluster) EnterExclusive() error
- func (c *Cluster) ExitExclusive(f func(*ClusterTx) error) error
- func (c *Cluster) NodeID(id int64)
- func (c *Cluster) Open(store cluster.ServerStore, address, dir string, timeout time.Duration, ...) error
- func (c *Cluster) SchemaVersion() int
- func (c *Cluster) Transaction(f func(*ClusterTx) error) error
- type ClusterExclusiveLocker
- type ClusterOpener
- type ClusterOption
- func WithClockForCluster(clock clock.Clock) ClusterOption
- func WithClusterTxProviderForCluster(clusterTxProvider ClusterTxProvider) ClusterOption
- func WithFileSystemForCluster(fileSystem fsys.FileSystem) ClusterOption
- func WithLoggerForCluster(logger log.Logger) ClusterOption
- func WithSleeperForCluster(sleeper clock.Sleeper) ClusterOption
- func WithTransactionForCluster(transaction Transaction) ClusterOption
- type ClusterTransactioner
- type ClusterTx
- func (c *ClusterTx) Config() (map[string]string, error)
- func (c *ClusterTx) NodeAdd(name, address string, schema, api int) (int64, error)
- func (c *ClusterTx) NodeAddress() (string, error)
- func (c *ClusterTx) NodeByAddress(address string) (NodeInfo, error)
- func (c *ClusterTx) NodeByName(name string) (NodeInfo, error)
- func (c *ClusterTx) NodeHeartbeat(address string, heartbeat time.Time) error
- func (c *ClusterTx) NodeID(id int64)
- func (c *ClusterTx) NodeIsEmpty(id int64) (string, error)
- func (c *ClusterTx) NodeIsOutdated() (bool, error)
- func (c *ClusterTx) NodeName() (string, error)
- func (c *ClusterTx) NodeOfflineThreshold() (time.Duration, error)
- func (c *ClusterTx) NodePending(id int64, pending bool) error
- func (c *ClusterTx) NodePendingByAddress(address string) (NodeInfo, error)
- func (c *ClusterTx) NodeRemove(id int64) error
- func (c *ClusterTx) NodeRename(old, new string) error
- func (c *ClusterTx) NodeUpdate(id int64, name, address string) error
- func (c *ClusterTx) NodeUpdateVersion(id int64, version [2]int) error
- func (c *ClusterTx) Nodes() ([]NodeInfo, error)
- func (c *ClusterTx) NodesCount() (int, error)
- func (c *ClusterTx) OperationAdd(uuid string, opType OperationType) (int64, error)
- func (c *ClusterTx) OperationByUUID(uuid string) (Operation, error)
- func (c *ClusterTx) OperationNodes() ([]string, error)
- func (c *ClusterTx) OperationRemove(uuid string) error
- func (c *ClusterTx) Operations() ([]Operation, error)
- func (c *ClusterTx) OperationsUUIDs() ([]string, error)
- func (c *ClusterTx) ServiceAdd(name, address, daemonAddress, daemonNonce string) (int64, error)
- func (c *ClusterTx) ServiceByName(name string) (ServiceNodeInfo, error)
- func (c *ClusterTx) ServiceNodeOfflineThreshold() (time.Duration, error)
- func (c *ClusterTx) ServiceNodes() ([]ServiceNodeInfo, error)
- func (c *ClusterTx) ServiceRemove(id int64) error
- func (c *ClusterTx) ServiceUpdate(id int64, name, address, daemonAddress, daemonNonce string) error
- func (c *ClusterTx) TaskAdd(uuid, query string, schedule int64, status int) (int64, error)
- func (c *ClusterTx) TaskByUUID(uuid string) (Task, error)
- func (c *ClusterTx) TaskNodes() ([]string, error)
- func (c *ClusterTx) TaskRemove(uuid string) error
- func (c *ClusterTx) TaskUpdateResult(uuid, res string, status int) error
- func (c *ClusterTx) Tasks() ([]Task, error)
- func (c *ClusterTx) TasksByScheduleRange(from, to time.Time, status int) ([]Task, error)
- func (c *ClusterTx) TasksUUIDs() ([]string, error)
- func (c *ClusterTx) UpdateConfig(values map[string]string) error
- type ClusterTxProvider
- type ConfigQuery
- type CountQuery
- type Node
- type NodeInfo
- type NodeOpener
- type NodeTransactioner
- type NodeTx
- func (n *NodeTx) Config() (map[string]string, error)
- func (n *NodeTx) RaftNodeAdd(address string) (int64, error)
- func (n *NodeTx) RaftNodeAddress(id int64) (string, error)
- func (n *NodeTx) RaftNodeAddresses() ([]string, error)
- func (n *NodeTx) RaftNodeDelete(id int64) error
- func (n *NodeTx) RaftNodeFirst(address string) error
- func (n *NodeTx) RaftNodes() ([]RaftNode, error)
- func (n *NodeTx) RaftNodesReplace(nodes []RaftNode) error
- func (n *NodeTx) UpdateConfig(values map[string]string) error
- type ObjectsQuery
- type Operation
- type OperationType
- type Query
- type QueryCluster
- type QueryNode
- type RaftNode
- type ServiceNodeInfo
- type StringsQuery
- type Task
- type Transaction
Constants ¶
const ClusterDefaultOfflineThreshold = 30
ClusterDefaultOfflineThreshold is the default value for the cluster.offline_threshold configuration key, expressed in seconds.
const DiscoveryDefaultOfflineThreshold = 120
DiscoveryDefaultOfflineThreshold is the default value for the discovery.offline_threshold configuration key, expressed in seconds.
Variables ¶
var ( // ErrAlreadyDefined happens when the given entry already exists, // for example a container. ErrAlreadyDefined = fmt.Errorf("already exists") // ErrNoSuchObject is in the case of joins (and probably other) queries, // we don't get back sql.ErrNoRows when no rows are returned, even though we do // on selects without joins. Instead, you can use this error to // propagate up and generate proper 404s to the client when something // isn't found so we don't abuse sql.ErrNoRows any more than we // already do. ErrNoSuchObject = errors.Errorf("No such object") )
var ErrSomeNodesAreBehind = errors.Errorf("some nodes are behind this node's version")
ErrSomeNodesAreBehind is returned by OpenCluster if some of the nodes in the cluster have a schema or API version that is less recent than this node.
Functions ¶
func UnsafeClusterDB ¶
UnsafeClusterDB accesses the cluster database, mainly for tests, so that we don't expose the DB directly on the Cluster.
func UnsafeNodeID ¶
UnsafeNodeID accesses the node ID from the cluster, mainly for tests, so that we don't expose the node ID directly on the Cluster.
Types ¶
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
Cluster mediates access to data stored in the cluster dqlite database.
func NewCluster ¶
func NewCluster(cluster QueryCluster, options ...ClusterOption) *Cluster
NewCluster creates a new Cluster object.
func (*Cluster) EnterExclusive ¶
EnterExclusive acquires a lock on the cluster db, so any successive call to Transaction will block until ExitExclusive has been called.
func (*Cluster) ExitExclusive ¶
ExitExclusive runs the given transaction and then releases the lock acquired with EnterExclusive.
func (*Cluster) NodeID ¶
NodeID sets the node NodeID associated with this cluster instance. It's used for backward-compatibility of all db-related APIs that were written before clustering and don't accept a node NodeID, so in those cases we automatically use this value as implicit node NodeID.
func (*Cluster) Open ¶
func (c *Cluster) Open( store cluster.ServerStore, address, dir string, timeout time.Duration, options ...dqlite.DriverOption, ) error
Open creates a new Cluster object for interacting with the dqlite database.
- store: Function used to connect to the dqlite backend. - address: Network address of this node (or empty string). - dir: Base database directory (e.g. /var/lib/thermionic/database) - options: Driver options that can be passed to the cluster when opening the db.
The address and api parameters will be used to determine if the cluster database matches our version, and possibly trigger a schema update. If the schema update can't be performed right now, because some nodes are still behind, an Upgrading error is returned.
func (*Cluster) SchemaVersion ¶
SchemaVersion returns the underlying schema version for the cluster
func (*Cluster) Transaction ¶
Transaction creates a new ClusterTx object and transactionally executes the cluster database interactions invoked by the given function. If the function returns no error, all database changes are committed to the cluster database, otherwise they are rolled back.
If EnterExclusive has been called before, calling Transaction will block until ExitExclusive has been called as well to release the lock.
type ClusterExclusiveLocker ¶
type ClusterExclusiveLocker interface { // EnterExclusive acquires a lock on the cluster db, so any successive call to // Transaction will block until ExitExclusive has been called. EnterExclusive() error // ExitExclusive runs the given transaction and then releases the lock acquired // with EnterExclusive. ExitExclusive(func(*ClusterTx) error) error }
ClusterExclusiveLocker defines an exclusive lock for doing certain operations on the cluster.
type ClusterOpener ¶
type ClusterOpener interface { // Open creates a new Cluster object for interacting with the dqlite database. // // - store: Function used to connect to the dqlite backend. // - address: Network address of this node (or empty string). // - dir: Base database directory (e.g. /var/lib/thermionic/database) // - options: Driver options that can be passed to the cluster when opening the db. // // The address and api parameters will be used to determine if the cluster // database matches our version, and possibly trigger a schema update. If the // schema update can't be performed right now, because some nodes are still // behind, an Upgrading error is returned. Open(cluster.ServerStore, string, string, time.Duration, ...dqlite.DriverOption) error }
ClusterOpener represents a way to open a cluster
type ClusterOption ¶
type ClusterOption func(*clusterOptions)
ClusterOption to be passed to NewCluster to customize the resulting instance.
func WithClockForCluster ¶
func WithClockForCluster(clock clock.Clock) ClusterOption
WithClockForCluster sets the clock on the clusterOptions
func WithClusterTxProviderForCluster ¶
func WithClusterTxProviderForCluster(clusterTxProvider ClusterTxProvider) ClusterOption
WithClusterTxProviderForCluster sets the clusterTxProvider on the clusterOptions
func WithFileSystemForCluster ¶
func WithFileSystemForCluster(fileSystem fsys.FileSystem) ClusterOption
WithFileSystemForCluster sets the fileSystem on the clusterOptions
func WithLoggerForCluster ¶
func WithLoggerForCluster(logger log.Logger) ClusterOption
WithLoggerForCluster sets the logger on the clusterOptions
func WithSleeperForCluster ¶
func WithSleeperForCluster(sleeper clock.Sleeper) ClusterOption
WithSleeperForCluster sets the sleeper on the clusterOptions
func WithTransactionForCluster ¶
func WithTransactionForCluster(transaction Transaction) ClusterOption
WithTransactionForCluster sets the transaction on the clusterOptions
type ClusterTransactioner ¶
type ClusterTransactioner interface { // Transaction creates a new ClusterTx object and transactionally executes the // cluster database interactions invoked by the given function. If the function // returns no error, all database changes are committed to the cluster // database, otherwise they are rolled back. // // If EnterExclusive has been called before, calling Transaction will block // until ExitExclusive has been called as well to release the lock. Transaction(f func(*ClusterTx) error) error }
ClusterTransactioner represents a way to run a transaction on the cluster
type ClusterTx ¶
type ClusterTx struct {
// contains filtered or unexported fields
}
ClusterTx models a single interaction with a cluster database.
It wraps low-level database.Tx objects and offers a high-level API to fetch and update data.
func NewClusterTx ¶
NewClusterTx creates a new transaction node with sane defaults
func NewClusterTxWithQuery ¶
NewClusterTxWithQuery creates a new transaction node with sane defaults
func (*ClusterTx) NodeAdd ¶
NodeAdd adds a node to the current list of nodes that are part of the cluster. It returns the ID of the newly inserted row.
func (*ClusterTx) NodeAddress ¶
NodeAddress returns the address of the node this method is invoked on.
func (*ClusterTx) NodeByAddress ¶
NodeByAddress returns the pending node with the given network address.
func (*ClusterTx) NodeByName ¶
NodeByName returns the node with the given name.
func (*ClusterTx) NodeHeartbeat ¶
NodeHeartbeat updates the heartbeat column of the node with the given address.
func (*ClusterTx) NodeID ¶
NodeID sets the node NodeID associated with this cluster transaction.
func (*ClusterTx) NodeIsEmpty ¶
NodeIsEmpty returns an empty string if the node with the given ID has nothing associated with it. Otherwise, it returns a message saying what's left.
func (*ClusterTx) NodeIsOutdated ¶
NodeIsOutdated returns true if there's some cluster node having an API or schema version greater than the node this method is invoked on.
func (*ClusterTx) NodeOfflineThreshold ¶
NodeOfflineThreshold returns the amount of time that needs to elapse after which a series of unsuccessful heartbeats will make the node be considered offline.
func (*ClusterTx) NodePending ¶
NodePending toggles the pending flag for the node. A node is pending when it's been accepted in the cluster, but has not yet actually joined it.
func (*ClusterTx) NodePendingByAddress ¶
NodePendingByAddress returns the pending node with the given network address.
func (*ClusterTx) NodeRemove ¶
NodeRemove removes the node with the given id.
func (*ClusterTx) NodeRename ¶
NodeRename changes the name of an existing node.
Return an error if a node with the same name already exists.
func (*ClusterTx) NodeUpdate ¶
NodeUpdate updates the name and address of a node.
func (*ClusterTx) NodeUpdateVersion ¶
NodeUpdateVersion updates the schema and API version of the node with the given id. This is used only in tests.
func (*ClusterTx) Nodes ¶
Nodes returns all nodes part of the cluster.
If this instance is not clustered, a list with a single node whose address is 0.0.0.0 is returned.
func (*ClusterTx) NodesCount ¶
NodesCount returns the number of nodes in the cluster.
Since there's always at least one node row, even when not-clustered, the return value is greater than zero
func (*ClusterTx) OperationAdd ¶
func (c *ClusterTx) OperationAdd(uuid string, opType OperationType) (int64, error)
OperationAdd adds a new operation to the table.
func (*ClusterTx) OperationByUUID ¶
OperationByUUID returns the operation with the given UUID.
func (*ClusterTx) OperationNodes ¶
OperationNodes returns a list of nodes that have running operations
func (*ClusterTx) OperationRemove ¶
OperationRemove removes the operation with the given UUID.
func (*ClusterTx) Operations ¶
Operations returns all operations associated with this node.
func (*ClusterTx) OperationsUUIDs ¶
OperationsUUIDs returns the UUIDs of all operations associated with this node.
func (*ClusterTx) ServiceAdd ¶
ServiceAdd adds a service to the current list of services that are part of the cluster. It returns the ID of the newly inserted row.
func (*ClusterTx) ServiceByName ¶
func (c *ClusterTx) ServiceByName(name string) (ServiceNodeInfo, error)
ServiceByName returns the service with the given name.
func (*ClusterTx) ServiceNodeOfflineThreshold ¶
ServiceNodeOfflineThreshold returns the amount of time that needs to elapse after which a series of unsuccessful heartbeats will make the service node be considered offline.
func (*ClusterTx) ServiceNodes ¶
func (c *ClusterTx) ServiceNodes() ([]ServiceNodeInfo, error)
ServiceNodes returns all service nodes that are part of the cluster.
func (*ClusterTx) ServiceRemove ¶
ServiceRemove removes the service with the given id.
func (*ClusterTx) ServiceUpdate ¶
func (c *ClusterTx) ServiceUpdate( id int64, name, address, daemonAddress, daemonNonce string, ) error
ServiceUpdate updates the name and address of a service.
func (*ClusterTx) TaskByUUID ¶
TaskByUUID returns the task with the given UUID.
func (*ClusterTx) TaskRemove ¶
TaskRemove removes the task with the given UUID.
func (*ClusterTx) TaskUpdateResult ¶
TaskUpdateResult updates the result of the task with the given UUID
func (*ClusterTx) TasksByScheduleRange ¶
TasksByScheduleRange returns a series of tasks within the time range
func (*ClusterTx) TasksUUIDs ¶
TasksUUIDs returns the UUIDs of all tasks associated with this node.
type ClusterTxProvider ¶
type ClusterTxProvider interface { // New creates a ClusterTx with the sane defaults New(database.Tx, int64) *ClusterTx }
ClusterTxProvider creates ClusterTx which can be used by the cluster
type ConfigQuery ¶
type ConfigQuery interface { // SelectConfig executes a query statement against a "config" table, which // must have 'key' and 'value' columns. By default this query returns all // keys, but additional WHERE filters can be specified. // // Returns a map of key names to their associated values. SelectConfig(database.Tx, string, string, ...interface{}) (map[string]string, error) // UpdateConfig updates the given keys in the given table. Config keys set to // empty values will be deleted. UpdateConfig(database.Tx, string, map[string]string) error }
ConfigQuery defines queries to the database for configuration queries
type CountQuery ¶
type CountQuery interface { // Count returns the number of rows in the given table. Count(database.Tx, string, string, ...interface{}) (int, error) }
CountQuery defines queries to the database for count queries
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node mediates access to the data stored in the node-local SQLite database.
func (*Node) Open ¶
Open a new node.
The fresh hook parameter is used by the daemon to mark all known patch names as applied when a brand new database is created.
func (*Node) Transaction ¶
Transaction creates a new NodeTx object and transactionally executes the node-level database interactions invoked by the given function. If the function returns no error, all database changes are committed to the node-level database, otherwise they are rolled back.
type NodeInfo ¶
type NodeInfo struct { ID int64 // Stable node identifier Name string // User-assigned name of the node Address string // Network address of the node Description string // Node description (optional) Schema int // Schema version of the code running the node APIExtensions int // Number of API extensions of the code running on the node Heartbeat time.Time // Timestamp of the last heartbeat }
NodeInfo holds information about a single instance in a cluster.
type NodeOpener ¶
type NodeOpener interface { // Open a new node. // // The function hook parameter is used by the daemon to mark all known // patch names as applied when a brand new database is created. Open(string, func(*Node) error) error }
NodeOpener represents a way to open a node
type NodeTransactioner ¶
type NodeTransactioner interface { // Transaction creates a new NodeTx object and transactionally executes the // node-level database interactions invoked by the given function. If the // function returns no error, all database changes are committed to the // node-level database, otherwise they are rolled back. Transaction(f func(*NodeTx) error) error }
NodeTransactioner represents a way to run a transaction on the node
type NodeTx ¶
type NodeTx struct {
// contains filtered or unexported fields
}
NodeTx models a single interaction with a node-local database.
It wraps low-level db.Tx objects and offers a high-level API to fetch and update data.
func NewNodeTxWithQuery ¶
NewNodeTxWithQuery creates a new transaction node with sane defaults
func (*NodeTx) RaftNodeAdd ¶
RaftNodeAdd adds a node to the current list of nodes that are part of the dqlite Raft cluster. It returns the ID of the newly inserted row.
func (*NodeTx) RaftNodeAddress ¶
RaftNodeAddress returns the address of the raft node with the given ID, if any matching row exists.
func (*NodeTx) RaftNodeAddresses ¶
RaftNodeAddresses returns the addresses of all nodes that are members of the dqlite Raft cluster (possibly including the local node). If this instance is not running in clustered mode, an empty list is returned.
func (*NodeTx) RaftNodeDelete ¶
RaftNodeDelete removes a node from the current list of nodes that are part of the dqlite Raft cluster.
func (*NodeTx) RaftNodeFirst ¶
RaftNodeFirst adds the first node of the cluster. It ensures that the database ID is 1, to match the server ID of the first raft log entry.
This method is supposed to be called when there are no rows in raft_nodes, and it will replace whatever existing row has ID 1.
func (*NodeTx) RaftNodes ¶
RaftNodes returns information about all nodes that are members of the dqlite Raft cluster (possibly including the local node). If this instance is not running in clustered mode, an empty list is returned.
func (*NodeTx) RaftNodesReplace ¶
RaftNodesReplace replaces the current list of raft nodes.
type ObjectsQuery ¶
type ObjectsQuery interface { // SelectObjects executes a statement which must yield rows with a specific // columns schema. It invokes the given Dest hook for each yielded row. SelectObjects(database.Tx, q.Dest, string, ...interface{}) error // UpsertObject inserts or replaces a new row with the given column values, // to the given table using columns order. For example: // // UpsertObject(tx, "cars", []string{"id", "brand"}, []interface{}{1, "ferrari"}) // // The number of elements in 'columns' must match the one in 'values'. UpsertObject(database.Tx, string, []string, []interface{}) (int64, error) // DeleteObject removes the row identified by the given ID. The given table // must have a primary key column called 'id'. // // It returns a flag indicating if a matching row was actually found and // deleted or not. DeleteObject(database.Tx, string, int64) (bool, error) }
ObjectsQuery defines queries to the database for generic object queries
type Operation ¶
type Operation struct { ID int64 // Stable database identifier UUID string // User-visible identifier NodeAddress string // Address of the node the operation is running on Type OperationType // Type of the operation }
Operation holds information about a single operation running on a node in the cluster.
type OperationType ¶
type OperationType string
OperationType is an identifier code identifying the type of an Operation.
type Query ¶
type Query interface { ConfigQuery ObjectsQuery StringsQuery CountQuery }
Query defines different queries for accessing the database
type QueryCluster ¶
type QueryCluster interface { database.DBAccessor // Open the cluster database object. // // The name argument is the name of the cluster database. It defaults to // 'db.bin', but can be overwritten for testing. // // The dialer argument is a function that returns a gRPC dialer that can be // used to connect to a database node using the gRPC SQL package. Open(cluster.ServerStore, ...dqlite.DriverOption) error // EnsureSchema applies all relevant schema updates to the cluster database. // // Before actually doing anything, this function will make sure that all nodes // in the cluster have a schema version and a number of API extensions that // match our one. If it's not the case, we either return an error (if some // nodes have version greater than us and we need to be upgraded), or return // false and no error (if some nodes have a lower version, and we need to wait // till they get upgraded and restarted). EnsureSchema(string, string) (bool, error) // SchemaVersion returns the underlying schema version for the cluster SchemaVersion() int }
QueryCluster defines an interface for interacting with the cluster DB
type QueryNode ¶
type QueryNode interface { // Open the node-local database object. Open(string) error // EnsureSchema applies all relevant schema updates to the node-local // database. // // Return the initial schema version found before starting the update, along // with any error occurred. EnsureSchema(hookFn schema.Hook) (int, error) // DB return the current database source. DB() database.DB }
QueryNode represents a local node in a cluster
type RaftNode ¶
type RaftNode struct { ID int64 // Stable node identifier Address string // Network address of the node }
RaftNode holds information about a single node in the dqlite raft cluster.
type ServiceNodeInfo ¶
type ServiceNodeInfo struct { ID int64 // Stable node identifier Name string // User-assigned name of the node Address string // Network address of the node DaemonAddress string // Daemon address that's associated with the service DaemonNonce string // Daemon nonce for secure communication with the daemon Heartbeat time.Time // Timestamp of the last heartbeat }
ServiceNodeInfo holds information about a single instance in a cluster.
type StringsQuery ¶
type StringsQuery interface { // SelectStrings executes a statement which must yield rows with a single // string column. It returns the list of column values. SelectStrings(database.Tx, string, ...interface{}) ([]string, error) }
StringsQuery defines queries to the database for string queries
type Task ¶
type Task struct { ID int64 // Stable database identifier UUID string // User-visible identifier Query string Schedule int64 Result string Status int NodeAddress string // Address of the node the operation is running on }
Task holds information about a single operation running on a node in the cluster.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
mocks
Package mocks is a generated GoMock package.
|
Package mocks is a generated GoMock package. |
Package mocks is a generated GoMock package.
|
Package mocks is a generated GoMock package. |
mocks
Package mocks is a generated GoMock package.
|
Package mocks is a generated GoMock package. |
Package query implements helpers around database/sql to execute various kinds of very common SQL queries.
|
Package query implements helpers around database/sql to execute various kinds of very common SQL queries. |
mocks
Package mocks is a generated GoMock package.
|
Package mocks is a generated GoMock package. |
Package schema offers utilities to create and maintain a database schema.
|
Package schema offers utilities to create and maintain a database schema. |
mocks
Package mocks is a generated GoMock package.
|
Package mocks is a generated GoMock package. |