Documentation ¶
Overview ¶
Package store provides interfaces to work with swarm cluster state.
The primary interface is MemoryStore, which abstracts storage of this cluster state. MemoryStore exposes a transactional interface for both reads and writes. To perform a read transaction, call View with a callback function; View invokes the callback with a ReadTx object that gives it a consistent view of the state. Similarly, Update accepts a callback function and invokes it with a Tx object that allows reads and writes to happen without interference from other transactions.
This is an example of making an update to a MemoryStore:
// memoryStore is a *store.MemoryStore; newNode is an *api.Node.
err := memoryStore.Update(func(tx store.Tx) error {
	if err := store.UpdateNode(tx, newNode); err != nil {
		return err
	}
	return nil
})
if err != nil {
	return fmt.Errorf("transaction failed: %v", err)
}
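The callback style described above can be illustrated with a small standard-library model. This is only a sketch: toyStore, its View and Update methods, renameNode, hostnameOf, and the map-based state are hypothetical stand-ins, not the implementation of this package. It assumes that a read/write mutex plus a draft copy is enough to give readers a consistent view and to discard a writer's changes when its callback fails.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errNotExist mirrors the idea of a "not found" sentinel; it is local to this sketch.
var errNotExist = errors.New("object does not exist")

// toyStore is a hypothetical stand-in for a transactional store.
type toyStore struct {
	mu    sync.RWMutex
	nodes map[string]string // node ID -> hostname
}

// View runs cb under the read lock, so cb sees a state that no writer
// can change while it runs.
func (s *toyStore) View(cb func(nodes map[string]string)) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	cb(s.nodes)
}

// Update runs cb against a draft copy under the write lock and commits
// the draft only if cb returns nil.
func (s *toyStore) Update(cb func(nodes map[string]string) error) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	draft := make(map[string]string, len(s.nodes))
	for k, v := range s.nodes {
		draft[k] = v
	}
	if err := cb(draft); err != nil {
		return err // the draft is dropped, so nothing leaks out
	}
	s.nodes = draft
	return nil
}

// renameNode changes a node's hostname inside a write transaction.
func renameNode(s *toyStore, id, hostname string) error {
	return s.Update(func(nodes map[string]string) error {
		if _, ok := nodes[id]; !ok {
			return errNotExist
		}
		nodes[id] = hostname
		return nil
	})
}

// hostnameOf reads a node's hostname inside a read transaction.
func hostnameOf(s *toyStore, id string) string {
	var hostname string
	s.View(func(nodes map[string]string) { hostname = nodes[id] })
	return hostname
}

func main() {
	s := &toyStore{nodes: map[string]string{"n1": "alpha"}}
	if err := renameNode(s, "n1", "beta"); err != nil {
		fmt.Println("transaction failed:", err)
	}
	fmt.Println(hostnameOf(s, "n1"))
	fmt.Println(renameNode(s, "missing", "gamma"))
}
```

Copying the whole map on every update is wasteful; it is used here only to keep the rollback behaviour obvious.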
MemoryStore exposes watch functionality. It exposes a publish/subscribe queue where code can subscribe to changes of interest. This can be combined with the ViewAndWatch function to "fork" a store, by making a snapshot and then applying future changes to keep the copy in sync. This approach lets consumers of the data use their own data structures and implement their own concurrency strategies. It can lead to more efficient code because data consumers don't necessarily have to lock the main data store if they are maintaining their own copies of the state.
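The "fork" pattern described above can be sketched with the standard library alone. Everything in this block (toyStore, event, set, viewAndWatch, fork) is a hypothetical model rather than this package's API. The key assumption it illustrates is that the snapshot and the subscription are taken under the same lock, so the consumer sees every change made after the snapshot and none made before it.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical change notification: key was set to value.
type event struct {
	key, value string
}

// toyStore is a hypothetical store that publishes every committed change.
type toyStore struct {
	mu       sync.Mutex
	data     map[string]string
	watchers []chan event
}

// set commits a change and publishes it to every watcher.
// The channels are buffered; a real queue would need a policy for slow consumers.
func (s *toyStore) set(key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
	for _, w := range s.watchers {
		w <- event{key, value}
	}
}

// viewAndWatch calls cb with the current state and subscribes to later
// changes while still holding the lock, so no change can fall in between.
func (s *toyStore) viewAndWatch(cb func(map[string]string)) <-chan event {
	s.mu.Lock()
	defer s.mu.Unlock()
	cb(s.data)
	ch := make(chan event, 16)
	s.watchers = append(s.watchers, ch)
	return ch
}

// fork copies the current state and returns the copy together with the
// channel of changes needed to keep the copy in sync.
func fork(s *toyStore) (map[string]string, <-chan event) {
	local := make(map[string]string)
	ch := s.viewAndWatch(func(data map[string]string) {
		for k, v := range data {
			local[k] = v
		}
	})
	return local, ch
}

func main() {
	s := &toyStore{data: map[string]string{"a": "1"}}
	local, events := fork(s)

	s.set("b", "2")
	s.set("a", "3")

	// The consumer applies changes to its own copy without touching s.
	for i := 0; i < 2; i++ {
		ev := <-events
		local[ev.key] = ev.value
	}
	fmt.Println(local["a"], local["b"])
}
```

Once forked, the consumer only reads from its private map and the channel, so it never contends for the store's lock.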
Index ¶
- Constants
- Variables
- func Apply(store *MemoryStore, item events.Event) (err error)
- func CreateCluster(tx Tx, c *api.Cluster) error
- func CreateConfig(tx Tx, c *api.Config) error
- func CreateExtension(tx Tx, e *api.Extension) error
- func CreateNetwork(tx Tx, n *api.Network) error
- func CreateNode(tx Tx, n *api.Node) error
- func CreateResource(tx Tx, r *api.Resource) error
- func CreateSecret(tx Tx, s *api.Secret) error
- func CreateService(tx Tx, s *api.Service) error
- func CreateTask(tx Tx, t *api.Task) error
- func DeleteCluster(tx Tx, id string) error
- func DeleteConfig(tx Tx, id string) error
- func DeleteExtension(tx Tx, id string) error
- func DeleteNetwork(tx Tx, id string) error
- func DeleteNode(tx Tx, id string) error
- func DeleteResource(tx Tx, id string) error
- func DeleteSecret(tx Tx, id string) error
- func DeleteService(tx Tx, id string) error
- func DeleteTask(tx Tx, id string) error
- func FindClusters(tx ReadTx, by By) ([]*api.Cluster, error)
- func FindConfigs(tx ReadTx, by By) ([]*api.Config, error)
- func FindExtensions(tx ReadTx, by By) ([]*api.Extension, error)
- func FindNetworks(tx ReadTx, by By) ([]*api.Network, error)
- func FindNodes(tx ReadTx, by By) ([]*api.Node, error)
- func FindResources(tx ReadTx, by By) ([]*api.Resource, error)
- func FindSecrets(tx ReadTx, by By) ([]*api.Secret, error)
- func FindServices(tx ReadTx, by By) ([]*api.Service, error)
- func FindTasks(tx ReadTx, by By) ([]*api.Task, error)
- func GetCluster(tx ReadTx, id string) *api.Cluster
- func GetConfig(tx ReadTx, id string) *api.Config
- func GetExtension(tx ReadTx, id string) *api.Extension
- func GetNetwork(tx ReadTx, id string) *api.Network
- func GetNode(tx ReadTx, id string) *api.Node
- func GetResource(tx ReadTx, id string) *api.Resource
- func GetSecret(tx ReadTx, id string) *api.Secret
- func GetService(tx ReadTx, id string) *api.Service
- func GetTask(tx ReadTx, id string) *api.Task
- func RestoreTable(tx Tx, table string, newObjects []api.StoreObject) error
- func UpdateCluster(tx Tx, c *api.Cluster) error
- func UpdateConfig(tx Tx, c *api.Config) error
- func UpdateExtension(tx Tx, e *api.Extension) error
- func UpdateNetwork(tx Tx, n *api.Network) error
- func UpdateNode(tx Tx, n *api.Node) error
- func UpdateResource(tx Tx, r *api.Resource) error
- func UpdateSecret(tx Tx, s *api.Secret) error
- func UpdateService(tx Tx, s *api.Service) error
- func UpdateTask(tx Tx, t *api.Task) error
- func ViewAndWatch(store *MemoryStore, cb func(ReadTx) error, specifiers ...api.Event) (watch chan events.Event, cancel func(), err error)
- func WatchFrom(store *MemoryStore, version *api.Version, specifiers ...api.Event) (chan events.Event, func(), error)
- type Batch
- type By
- func ByCustom(objType, index, value string) By
- func ByCustomPrefix(objType, index, value string) By
- func ByDesiredState(state api.TaskState) By
- func ByIDPrefix(idPrefix string) By
- func ByKind(kind string) By
- func ByMembership(membership api.NodeSpec_Membership) By
- func ByName(name string) By
- func ByNamePrefix(namePrefix string) By
- func ByNodeID(nodeID string) By
- func ByReferencedConfigID(configID string) By
- func ByReferencedNetworkID(networkID string) By
- func ByReferencedSecretID(secretID string) By
- func ByRole(role api.NodeRole) By
- func ByRuntime(runtime string) By
- func ByServiceID(serviceID string) By
- func BySlot(serviceID string, slot uint64) By
- func ByTaskState(state api.TaskState) By
- func Or(bys ...By) By
- type MemoryStore
- func (s *MemoryStore) ApplyStoreActions(actions []api.StoreAction) error
- func (s *MemoryStore) Batch(cb func(*Batch) error) error
- func (s *MemoryStore) Close() error
- func (s *MemoryStore) Restore(snapshot *pb.StoreSnapshot) error
- func (s *MemoryStore) Save(tx ReadTx) (*pb.StoreSnapshot, error)
- func (s *MemoryStore) Update(cb func(Tx) error) error
- func (s *MemoryStore) View(cb func(ReadTx))
- func (s *MemoryStore) WatchQueue() *watch.Queue
- func (s *MemoryStore) Wedged() bool
- type ObjectStoreConfig
- type ReadTx
- type Tx
Constants ¶
const (
	// MaxChangesPerTransaction is the number of changes after which a new
	// transaction should be started within Batch.
	MaxChangesPerTransaction = 200

	// MaxTransactionBytes is the maximum serialized transaction size.
	MaxTransactionBytes = 1.5 * 1024 * 1024
)
const (
	// DefaultClusterName is the default name to use for the cluster
	// object.
	DefaultClusterName = "default"
)
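As a quick illustration of what these limits mean numerically, the sketch below redeclares the two values locally (so it needs no third-party import) and works out the byte limit and a transaction count. The helper transactionsNeeded is hypothetical, and it assumes that only the change-count limit triggers a new transaction; in practice the size limit may split a batch earlier.

```go
package main

import "fmt"

// Values copied from the constant block above and redeclared locally
// for this sketch.
const (
	maxChangesPerTransaction = 200
	maxTransactionBytes      = 1.5 * 1024 * 1024
)

// transactionsNeeded returns how many transactions n changes would be
// split into if the change-count limit were the only constraint.
func transactionsNeeded(n int) int {
	if n <= 0 {
		return 0
	}
	return (n + maxChangesPerTransaction - 1) / maxChangesPerTransaction
}

func main() {
	// The untyped constant has an exact integer value, so it converts to int.
	fmt.Println(int(maxTransactionBytes)) // 1572864 bytes, i.e. 1.5 MiB
	fmt.Println(transactionsNeeded(450))  // 3
}
```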
Variables ¶
var (
	// ErrExist is returned by create operations if the provided ID is already
	// taken.
	ErrExist = errors.New("object already exists")

	// ErrNotExist is returned by altering operations (update, delete) if the
	// provided ID is not found.
	ErrNotExist = errors.New("object does not exist")

	// ErrNameConflict is returned by create/update if the object name is
	// already in use by another object.
	ErrNameConflict = errors.New("name conflicts with an existing object")

	// ErrInvalidFindBy is returned if an unrecognized type is passed to Find.
	ErrInvalidFindBy = errors.New("invalid find argument type")

	// ErrSequenceConflict is returned when trying to update an object
	// whose sequence information does not match the object in the store's.
	ErrSequenceConflict = errors.New("update out of sequence")

	// WedgeTimeout is the maximum amount of time the store lock may be
	// held before declaring a suspected deadlock.
	WedgeTimeout = 30 * time.Second
)
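Sentinel errors like these are typically checked with errors.Is, which also matches when a caller has wrapped the error with extra context. The sketch below shows that pattern on a toy table. The names table, create, update, and upsert, and the locally declared errExist and errNotExist, are hypothetical and only mirror the variables above.

```go
package main

import (
	"errors"
	"fmt"
)

// Local sentinels that mirror ErrExist and ErrNotExist for this sketch.
var (
	errExist    = errors.New("object already exists")
	errNotExist = errors.New("object does not exist")
)

// table is a hypothetical ID -> value store.
type table map[string]string

// create fails with errExist (wrapped with context) if the ID is taken.
func (t table) create(id, v string) error {
	if _, ok := t[id]; ok {
		return fmt.Errorf("create %q: %w", id, errExist)
	}
	t[id] = v
	return nil
}

// update fails with errNotExist (wrapped with context) if the ID is unknown.
func (t table) update(id, v string) error {
	if _, ok := t[id]; !ok {
		return fmt.Errorf("update %q: %w", id, errNotExist)
	}
	t[id] = v
	return nil
}

// upsert tries to create and falls back to update when the ID exists.
func (t table) upsert(id, v string) error {
	err := t.create(id, v)
	if errors.Is(err, errExist) {
		return t.update(id, v)
	}
	return err
}

func main() {
	tbl := table{}
	fmt.Println(tbl.upsert("s1", "v1"), tbl["s1"])
	fmt.Println(tbl.upsert("s1", "v2"), tbl["s1"])
	fmt.Println(errors.Is(tbl.update("nope", "x"), errNotExist))
}
```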
var All byAll
All is an argument that can be passed to Find to list all items in the set.
var (
	// ErrNoKind is returned by resource create operations if the provided Kind
	// of the resource does not exist
	ErrNoKind = errors.New("object kind is unregistered")
)
Functions ¶
func Apply ¶
func Apply(store *MemoryStore, item events.Event) (err error)
Apply takes an item from the event stream of one Store and applies it to a second Store.
func CreateCluster ¶
CreateCluster adds a new cluster to the store. Returns ErrExist if the ID is already taken.
func CreateConfig ¶
CreateConfig adds a new config to the store. Returns ErrExist if the ID is already taken.
func CreateExtension ¶
CreateExtension adds a new extension to the store. Returns ErrExist if the ID is already taken.
func CreateNetwork ¶
CreateNetwork adds a new network to the store. Returns ErrExist if the ID is already taken.
func CreateNode ¶
CreateNode adds a new node to the store. Returns ErrExist if the ID is already taken.
func CreateResource ¶
CreateResource adds a new resource object to the store. Returns ErrExist if the ID is already taken. Returns ErrNameConflict if a Resource with this Name already exists. Returns ErrNoKind if the specified Kind does not exist.
func CreateSecret ¶
CreateSecret adds a new secret to the store. Returns ErrExist if the ID is already taken.
func CreateService ¶
CreateService adds a new service to the store. Returns ErrExist if the ID is already taken.
func CreateTask ¶
CreateTask adds a new task to the store. Returns ErrExist if the ID is already taken.
func DeleteCluster ¶
DeleteCluster removes a cluster from the store. Returns ErrNotExist if the cluster doesn't exist.
func DeleteConfig ¶
DeleteConfig removes a config from the store. Returns ErrNotExist if the config doesn't exist.
func DeleteExtension ¶
DeleteExtension removes an extension from the store. Returns ErrNotExist if the object doesn't exist.
func DeleteNetwork ¶
DeleteNetwork removes a network from the store. Returns ErrNotExist if the network doesn't exist.
func DeleteNode ¶
DeleteNode removes a node from the store. Returns ErrNotExist if the node doesn't exist.
func DeleteResource ¶
DeleteResource removes a resource object from the store. Returns ErrNotExist if the object doesn't exist.
func DeleteSecret ¶
DeleteSecret removes a secret from the store. Returns ErrNotExist if the secret doesn't exist.
func DeleteService ¶
DeleteService removes a service from the store. Returns ErrNotExist if the service doesn't exist.
func DeleteTask ¶
DeleteTask removes a task from the store. Returns ErrNotExist if the task doesn't exist.
func FindClusters ¶
FindClusters selects a set of clusters and returns them.
func FindConfigs ¶
FindConfigs selects a set of configs and returns them.
func FindExtensions ¶
FindExtensions selects a set of extensions and returns them.
func FindNetworks ¶
FindNetworks selects a set of networks and returns them.
func FindResources ¶
FindResources selects a set of resource objects and returns them.
func FindSecrets ¶
FindSecrets selects a set of secrets and returns them.
func FindServices ¶
FindServices selects a set of services and returns them.
func GetCluster ¶
GetCluster looks up a cluster by ID. Returns nil if the cluster doesn't exist.
func GetExtension ¶
GetExtension looks up an extension by ID. Returns nil if the object doesn't exist.
func GetNetwork ¶
GetNetwork looks up a network by ID. Returns nil if the network doesn't exist.
func GetResource ¶
GetResource looks up a resource object by ID. Returns nil if the object doesn't exist.
func GetService ¶
GetService looks up a service by ID. Returns nil if the service doesn't exist.
func RestoreTable ¶
func RestoreTable(tx Tx, table string, newObjects []api.StoreObject) error
RestoreTable takes a list of new objects of a particular type (e.g. clusters, nodes, etc., which conform to the StoreObject interface) and replaces the existing objects in the store of that type with the new objects.
func UpdateCluster ¶
UpdateCluster updates an existing cluster in the store. Returns ErrNotExist if the cluster doesn't exist.
func UpdateConfig ¶
UpdateConfig updates an existing config in the store. Returns ErrNotExist if the config doesn't exist.
func UpdateExtension ¶
UpdateExtension updates an existing extension in the store. Returns ErrNotExist if the object doesn't exist.
func UpdateNetwork ¶
UpdateNetwork updates an existing network in the store. Returns ErrNotExist if the network doesn't exist.
func UpdateNode ¶
UpdateNode updates an existing node in the store. Returns ErrNotExist if the node doesn't exist.
func UpdateResource ¶
UpdateResource updates an existing resource object in the store. Returns ErrNotExist if the object doesn't exist.
func UpdateSecret ¶
UpdateSecret updates an existing secret in the store. Returns ErrNotExist if the secret doesn't exist.
func UpdateService ¶
UpdateService updates an existing service in the store. Returns ErrNotExist if the service doesn't exist.
func UpdateTask ¶
UpdateTask updates an existing task in the store. Returns ErrNotExist if the task doesn't exist.
func ViewAndWatch ¶
func ViewAndWatch(store *MemoryStore, cb func(ReadTx) error, specifiers ...api.Event) (watch chan events.Event, cancel func(), err error)
ViewAndWatch calls a callback which can observe the state of this MemoryStore. It also returns a channel that will return further events from this point so the snapshot can be kept up to date. The watch channel must be released with watch.StopWatch when it is no longer needed. The channel is guaranteed to get all events after the moment of the snapshot, and only those events.
func WatchFrom ¶
func WatchFrom(store *MemoryStore, version *api.Version, specifiers ...api.Event) (chan events.Event, func(), error)
WatchFrom returns a channel that will return past events starting from "version", and new events until the channel is closed. If "version" is nil, this function is equivalent to
state.Watch(store.WatchQueue(), specifiers...).
If the log has been compacted and it's not possible to produce the exact set of events leading from "version" to the current state, this function will return an error, and the caller should re-sync.
The watch channel must be released with watch.StopWatch when it is no longer needed.
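The replay-or-resync behaviour described for WatchFrom can be modelled with a small versioned log. This is a sketch under stated assumptions: toyLog, entry, add, compact, eventsSince, and errCompacted are hypothetical names, and the real change log is not necessarily indexed this way. The point is only that replay is possible while the entries after "version" are still retained, and must fail once compaction has removed any of them.

```go
package main

import (
	"errors"
	"fmt"
)

// errCompacted tells the caller that replay is impossible and a full
// re-sync is required.
var errCompacted = errors.New("requested version was compacted; re-sync required")

// entry is one change in the log, identified by a monotonically growing index.
type entry struct {
	index  uint64
	change string
}

// toyLog is a hypothetical change log that can drop old entries.
type toyLog struct {
	firstIndex uint64 // index of the oldest entry still retained
	entries    []entry
}

func newToyLog() *toyLog { return &toyLog{firstIndex: 1} }

// add records a change and returns the index (version) assigned to it.
func (l *toyLog) add(change string) uint64 {
	idx := l.firstIndex + uint64(len(l.entries))
	l.entries = append(l.entries, entry{idx, change})
	return idx
}

// compact drops every entry whose index is <= upTo.
func (l *toyLog) compact(upTo uint64) {
	kept := l.entries[:0]
	for _, e := range l.entries {
		if e.index > upTo {
			kept = append(kept, e)
		}
	}
	l.entries = kept
	if upTo+1 > l.firstIndex {
		l.firstIndex = upTo + 1
	}
}

// eventsSince returns the changes made after version, or errCompacted
// if some of those changes are no longer in the log.
func (l *toyLog) eventsSince(version uint64) ([]string, error) {
	if version+1 < l.firstIndex {
		return nil, errCompacted
	}
	var out []string
	for _, e := range l.entries {
		if e.index > version {
			out = append(out, e.change)
		}
	}
	return out, nil
}

func main() {
	l := newToyLog()
	l.add("create n1")
	l.add("update n1")
	l.add("delete n1")

	fmt.Println(l.eventsSince(1))
	l.compact(2)
	fmt.Println(l.eventsSince(1))
	fmt.Println(l.eventsSince(2))
}
```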
Types ¶
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
Batch provides a mechanism to batch updates to a store.
type By ¶
type By interface {
// contains filtered or unexported methods
}
By is an interface type passed to Find methods. Implementations must be defined in this package.
func ByCustomPrefix ¶
ByCustomPrefix creates an object to pass to Find to search a custom index by a value prefix.
func ByDesiredState ¶
ByDesiredState creates an object to pass to Find to select by desired state.
func ByIDPrefix ¶
ByIDPrefix creates an object to pass to Find to select by ID prefix.
func ByKind ¶
ByKind creates an object to pass to Find to search for a Resource of a particular kind.
func ByMembership ¶
func ByMembership(membership api.NodeSpec_Membership) By
ByMembership creates an object to pass to Find to select by Membership.
func ByNamePrefix ¶
ByNamePrefix creates an object to pass to Find to select by name prefix.
func ByReferencedConfigID ¶
ByReferencedConfigID creates an object to pass to Find to search for a service or task that references a config with the given ID.
func ByReferencedNetworkID ¶
ByReferencedNetworkID creates an object to pass to Find to search for a service or task that references a network with the given ID.
func ByReferencedSecretID ¶
ByReferencedSecretID creates an object to pass to Find to search for a service or task that references a secret with the given ID.
func ByServiceID ¶
ByServiceID creates an object to pass to Find to select by service.
func ByTaskState ¶
ByTaskState creates an object to pass to Find to select by task state.
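The way By selectors compose, including the Or combinator listed in the index, can be sketched as predicates over a slice. This is a deliberately simplified model: the types task and by and the functions byServiceID, byTaskState, or, and findTasks are hypothetical, and a real store would more plausibly answer such queries from indexes than by scanning every object.

```go
package main

import "fmt"

// task is a hypothetical, trimmed-down task record.
type task struct {
	id, serviceID, state string
}

// by is a hypothetical selector: it reports whether a task matches.
type by func(task) bool

// byServiceID selects tasks that belong to the given service.
func byServiceID(serviceID string) by {
	return func(t task) bool { return t.serviceID == serviceID }
}

// byTaskState selects tasks that are in the given state.
func byTaskState(state string) by {
	return func(t task) bool { return t.state == state }
}

// or selects tasks that match at least one of the given selectors.
// With no selectors it matches nothing.
func or(bys ...by) by {
	return func(t task) bool {
		for _, b := range bys {
			if b(t) {
				return true
			}
		}
		return false
	}
}

// findTasks returns the tasks that match the selector, in input order.
func findTasks(tasks []task, b by) []task {
	var out []task
	for _, t := range tasks {
		if b(t) {
			out = append(out, t)
		}
	}
	return out
}

func main() {
	tasks := []task{
		{"t1", "web", "running"},
		{"t2", "web", "failed"},
		{"t3", "db", "running"},
		{"t4", "db", "shutdown"},
	}
	for _, t := range findTasks(tasks, or(byServiceID("web"), byTaskState("shutdown"))) {
		fmt.Println(t.id)
	}
}
```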
type MemoryStore ¶
type MemoryStore struct {
// contains filtered or unexported fields
}
MemoryStore is a concurrency-safe, in-memory implementation of the Store interface.
func NewMemoryStore ¶
func NewMemoryStore(proposer state.Proposer) *MemoryStore
NewMemoryStore returns an in-memory store. The argument is an optional Proposer which will be used to propagate changes to other members in a cluster.
func (*MemoryStore) ApplyStoreActions ¶
func (s *MemoryStore) ApplyStoreActions(actions []api.StoreAction) error
ApplyStoreActions updates a store based on StoreAction messages.
func (*MemoryStore) Batch ¶
func (s *MemoryStore) Batch(cb func(*Batch) error) error
Batch performs one or more transactions that allow reads and writes. It invokes a callback that is passed a Batch object. The callback may call batch.Update for each change it wants to make as part of the batch. The changes in the batch may be split over multiple transactions if necessary to keep transactions below the size limit. Batch holds a lock over the state, but will yield this lock every time it creates a new transaction to allow other writers to proceed. Thus, unrelated changes to the state may occur between calls to batch.Update.
This method allows the caller to iterate over a data set and apply changes in sequence without holding the store write lock for an excessive time, or producing a transaction that exceeds the maximum size.
If Batch returns an error, no guarantees are made about how many updates were committed successfully.
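The lock-yielding behaviour of Batch can be modelled as follows. This is a sketch, not this package's implementation: toyStore, toyBatch, applyN, and the deliberately small maxChanges limit are hypothetical, and only the change-count limit is modelled (the size limit is ignored). It shows how a long sequence of updates is cut into several transactions, with the write lock released at every cut.

```go
package main

import (
	"fmt"
	"sync"
)

// maxChanges is a hypothetical, deliberately small per-transaction limit.
const maxChanges = 3

// toyStore is a hypothetical store that counts committed transactions.
type toyStore struct {
	mu      sync.Mutex
	data    map[string]int
	commits int
}

// toyBatch tracks the transaction currently open on behalf of a batch.
type toyBatch struct {
	s       *toyStore
	pending int  // changes applied in the open transaction
	locked  bool // whether the batch currently holds the store lock
}

// update applies one change, opening a transaction first if none is open,
// and commits once the change limit is reached.
func (b *toyBatch) update(cb func(data map[string]int)) {
	if !b.locked {
		b.s.mu.Lock()
		b.locked = true
	}
	cb(b.s.data)
	b.pending++
	if b.pending >= maxChanges {
		b.commit()
	}
}

// commit closes the open transaction, if any, and yields the lock so
// other writers can run before the next update.
func (b *toyBatch) commit() {
	if !b.locked {
		return
	}
	b.s.commits++
	b.pending = 0
	b.locked = false
	b.s.mu.Unlock()
}

// batch runs cb and commits whatever is still pending afterwards.
// Transactions committed before an error stay committed in this model.
func (s *toyStore) batch(cb func(*toyBatch) error) error {
	b := &toyBatch{s: s}
	err := cb(b)
	b.commit()
	return err
}

// applyN makes n single-key changes through one batch.
func applyN(s *toyStore, n int) error {
	return s.batch(func(b *toyBatch) error {
		for i := 0; i < n; i++ {
			key := fmt.Sprintf("task-%d", i)
			b.update(func(data map[string]int) { data[key]++ })
		}
		return nil
	})
}

func main() {
	s := &toyStore{data: map[string]int{}}
	err := applyN(s, 7)
	fmt.Println(err, len(s.data), s.commits)
}
```

With a limit of 3, seven updates end up in three transactions (3 + 3 + 1), and another writer could acquire the lock between any two of them.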
func (*MemoryStore) Close ¶
func (s *MemoryStore) Close() error
Close closes the memory store and frees its associated resources.
func (*MemoryStore) Restore ¶
func (s *MemoryStore) Restore(snapshot *pb.StoreSnapshot) error
Restore sets the contents of the store to the serialized data in the argument.
func (*MemoryStore) Save ¶
func (s *MemoryStore) Save(tx ReadTx) (*pb.StoreSnapshot, error)
Save serializes the data in the store.
func (*MemoryStore) Update ¶
func (s *MemoryStore) Update(cb func(Tx) error) error
Update executes a read/write transaction.
func (*MemoryStore) View ¶
func (s *MemoryStore) View(cb func(ReadTx))
View executes a read transaction.
func (*MemoryStore) WatchQueue ¶
func (s *MemoryStore) WatchQueue() *watch.Queue
WatchQueue returns the publish/subscribe queue.
func (*MemoryStore) Wedged ¶
func (s *MemoryStore) Wedged() bool
Wedged returns true if the store lock has been held for a long time, possibly indicating a deadlock.
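One plausible way to detect a lock that has been held too long is to record when it was acquired and compare that against a timeout. The sketch below does this with a hypothetical timedMutex type; it is not this package's implementation. The clock is passed in explicitly so the example is deterministic, and the 30-second timeout is taken from the WedgeTimeout variable documented above.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// timedMutex is a hypothetical mutex that remembers when it was acquired.
type timedMutex struct {
	lockedAt int64 // UnixNano of the current acquisition; 0 when unlocked
	mu       sync.Mutex
}

// lock acquires the mutex and records the acquisition time.
func (m *timedMutex) lock(now time.Time) {
	m.mu.Lock()
	atomic.StoreInt64(&m.lockedAt, now.UnixNano())
}

// unlock clears the timestamp and releases the mutex.
func (m *timedMutex) unlock() {
	atomic.StoreInt64(&m.lockedAt, 0)
	m.mu.Unlock()
}

// wedged reports whether the mutex has been held for longer than timeout.
// It only reads the timestamp, so it never blocks on the mutex itself.
func (m *timedMutex) wedged(now time.Time, timeout time.Duration) bool {
	at := atomic.LoadInt64(&m.lockedAt)
	return at != 0 && now.Sub(time.Unix(0, at)) > timeout
}

// demo checks the lock 10s after acquisition, 31s after acquisition,
// and after release.
func demo() (early, late, released bool) {
	const timeout = 30 * time.Second
	start := time.Unix(1000, 0)

	var m timedMutex
	m.lock(start)
	early = m.wedged(start.Add(10*time.Second), timeout)
	late = m.wedged(start.Add(31*time.Second), timeout)
	m.unlock()
	released = m.wedged(start.Add(time.Hour), timeout)
	return early, late, released
}

func main() {
	fmt.Println(demo()) // false true false
}
```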
type ObjectStoreConfig ¶
type ObjectStoreConfig struct {
	Table            *memdb.TableSchema
	Save             func(ReadTx, *api.StoreSnapshot) error
	Restore          func(Tx, *api.StoreSnapshot) error
	ApplyStoreAction func(Tx, api.StoreAction) error
}
ObjectStoreConfig provides the necessary methods to store a particular object type inside MemoryStore.
type ReadTx ¶
type ReadTx interface {
// contains filtered or unexported methods
}
ReadTx is a read transaction. Note that transaction does not imply any internal batching. It only means that the transaction presents a consistent view of the data that cannot be affected by other transactions.
type Tx ¶
type Tx interface {
	ReadTx
	// contains filtered or unexported methods
}
Tx is a read/write transaction. Note that transaction does not imply any internal batching. The purpose of this transaction is to give the user a guarantee that its changes won't be visible to other transactions until the transaction is over.