store

package
v0.0.0-...-3e31364
Published: Sep 1, 2021 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Overview

Package store provides interfaces to work with swarm cluster state.

The primary interface is MemoryStore, which abstracts storage of this cluster state. MemoryStore exposes a transactional interface for both reads and writes. To perform a read transaction, View accepts a callback function that it will invoke with a ReadTx object that gives it a consistent view of the state. Similarly, Update accepts a callback function that it will invoke with a Tx object that allows reads and writes to happen without interference from other transactions.

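This is an example of making an update to a MemoryStore, where s is a *MemoryStore and newNode is an *api.Node:

// Update runs the callback inside a single read/write transaction.
err := s.Update(func(tx store.Tx) error {
	// UpdateNode returns ErrNotExist if the node is not in the store.
	return store.UpdateNode(tx, newNode)
})
if err != nil {
	return fmt.Errorf("transaction failed: %v", err)
}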

MemoryStore exposes watch functionality. It exposes a publish/subscribe queue where code can subscribe to changes of interest. This can be combined with the ViewAndWatch function to "fork" a store, by making a snapshot and then applying future changes to keep the copy in sync. This approach lets consumers of the data use their own data structures and implement their own concurrency strategies. It can lead to more efficient code because data consumers don't necessarily have to lock the main data store if they are maintaining their own copies of the state.
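
The "fork" pattern described above can be sketched without the package itself. The following is a minimal stand-alone model, not the MemoryStore API: toyStore, event, viewAndWatch, and forkAndSync are hypothetical names invented for illustration. It assumes the snapshot and the subscription are taken under the same lock, which is what lets the copy see every later change exactly once.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical change notification: the key that changed
// and its new value.
type event struct {
	key, value string
}

// toyStore is a stand-in for a store with a publish/subscribe queue.
// It only models the pattern; it is not the MemoryStore API.
type toyStore struct {
	mu   sync.Mutex
	data map[string]string
	subs []chan event
}

func newToyStore() *toyStore {
	return &toyStore{data: make(map[string]string)}
}

// set writes a value and publishes the change to every subscriber.
func (s *toyStore) set(key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
	for _, ch := range s.subs {
		ch <- event{key, value}
	}
}

// viewAndWatch copies the current state and subscribes under the same
// lock, so the subscriber receives every change made after the snapshot
// and none made before it.
func (s *toyStore) viewAndWatch() (map[string]string, <-chan event) {
	s.mu.Lock()
	defer s.mu.Unlock()
	snapshot := make(map[string]string, len(s.data))
	for k, v := range s.data {
		snapshot[k] = v
	}
	ch := make(chan event, 16) // buffered so set does not block in this sketch
	s.subs = append(s.subs, ch)
	return snapshot, ch
}

// forkAndSync builds a private copy of the state and applies the two
// later events to it.
func forkAndSync() map[string]string {
	s := newToyStore()
	s.set("node1", "ready")
	copyOfState, changes := s.viewAndWatch()
	s.set("node2", "ready")
	s.set("node1", "down")
	for i := 0; i < 2; i++ {
		ev := <-changes
		copyOfState[ev.key] = ev.value
	}
	return copyOfState
}

func main() {
	fmt.Println(forkAndSync())
}
```

After the two events are applied, the private copy matches the main store, and the consumer can read it without taking the store's lock.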

Constants

const (

	// MaxChangesPerTransaction is the number of changes after which a new
	// transaction should be started within Batch.
	MaxChangesPerTransaction = 200

	// MaxTransactionBytes is the maximum serialized transaction size.
	MaxTransactionBytes = 1.5 * 1024 * 1024
)
const (

	// DefaultClusterName is the default name to use for the cluster
	// object.
	DefaultClusterName = "default"
)

Variables

var (
	// ErrExist is returned by create operations if the provided ID is already
	// taken.
	ErrExist = errors.New("object already exists")

	// ErrNotExist is returned by altering operations (update, delete) if the
	// provided ID is not found.
	ErrNotExist = errors.New("object does not exist")

	// ErrNameConflict is returned by create/update if the object name is
	// already in use by another object.
	ErrNameConflict = errors.New("name conflicts with an existing object")

	// ErrInvalidFindBy is returned if an unrecognized type is passed to Find.
	ErrInvalidFindBy = errors.New("invalid find argument type")

	// ErrSequenceConflict is returned when trying to update an object
	// whose sequence information does not match that of the object in the store.
	ErrSequenceConflict = errors.New("update out of sequence")

	// WedgeTimeout is the maximum amount of time the store lock may be
	// held before declaring a suspected deadlock.
	WedgeTimeout = 30 * time.Second
)
var All byAll

All is an argument that can be passed to Find to list all items in the set.

var (
	// ErrNoKind is returned by resource create operations if the provided Kind
	// of the resource does not exist
	ErrNoKind = errors.New("object kind is unregistered")
)
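
Because these errors are package-level sentinel values, callers can branch on them with errors.Is. The sketch below illustrates a create-or-update fallback. It is self-contained and does not import this package: errExist, errNotExist, table, and upsert are hypothetical stand-ins, and the real create and update functions take a Tx rather than operating on a map.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors.
var (
	errExist    = errors.New("object already exists")
	errNotExist = errors.New("object does not exist")
)

// table is a hypothetical ID-indexed table.
type table map[string]string

// create fails with errExist when the ID is already taken.
func (t table) create(id, v string) error {
	if _, ok := t[id]; ok {
		return errExist
	}
	t[id] = v
	return nil
}

// update fails with errNotExist when the ID is not found.
func (t table) update(id, v string) error {
	if _, ok := t[id]; !ok {
		return errNotExist
	}
	t[id] = v
	return nil
}

// upsert tries create first and falls back to update only on errExist.
// Any other error is returned unchanged.
func (t table) upsert(id, v string) error {
	err := t.create(id, v)
	if errors.Is(err, errExist) {
		return t.update(id, v)
	}
	return err
}

// demo returns the final value for "n1" and whether updating a missing
// ID reports errNotExist.
func demo() (string, bool) {
	t := table{}
	_ = t.upsert("n1", "a")
	_ = t.upsert("n1", "b")
	return t["n1"], errors.Is(t.update("missing", "x"), errNotExist)
}

func main() {
	fmt.Println(demo())
}
```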

Functions

func Apply

func Apply(store *MemoryStore, item events.Event) (err error)

Apply takes an item from the event stream of one Store and applies it to a second Store.

func CreateCluster

func CreateCluster(tx Tx, c *api.Cluster) error

CreateCluster adds a new cluster to the store. Returns ErrExist if the ID is already taken.

func CreateConfig

func CreateConfig(tx Tx, c *api.Config) error

CreateConfig adds a new config to the store. Returns ErrExist if the ID is already taken.

func CreateExtension

func CreateExtension(tx Tx, e *api.Extension) error

CreateExtension adds a new extension to the store. Returns ErrExist if the ID is already taken.

func CreateNetwork

func CreateNetwork(tx Tx, n *api.Network) error

CreateNetwork adds a new network to the store. Returns ErrExist if the ID is already taken.

func CreateNode

func CreateNode(tx Tx, n *api.Node) error

CreateNode adds a new node to the store. Returns ErrExist if the ID is already taken.

func CreateResource

func CreateResource(tx Tx, r *api.Resource) error

CreateResource adds a new resource object to the store. Returns ErrExist if the ID is already taken. Returns ErrNameConflict if a Resource with this Name already exists. Returns ErrNoKind if the specified Kind does not exist.

func CreateSecret

func CreateSecret(tx Tx, s *api.Secret) error

CreateSecret adds a new secret to the store. Returns ErrExist if the ID is already taken.

func CreateService

func CreateService(tx Tx, s *api.Service) error

CreateService adds a new service to the store. Returns ErrExist if the ID is already taken.

func CreateTask

func CreateTask(tx Tx, t *api.Task) error

CreateTask adds a new task to the store. Returns ErrExist if the ID is already taken.

func DeleteCluster

func DeleteCluster(tx Tx, id string) error

DeleteCluster removes a cluster from the store. Returns ErrNotExist if the cluster doesn't exist.

func DeleteConfig

func DeleteConfig(tx Tx, id string) error

DeleteConfig removes a config from the store. Returns ErrNotExist if the config doesn't exist.

func DeleteExtension

func DeleteExtension(tx Tx, id string) error

DeleteExtension removes an extension from the store. Returns ErrNotExist if the object doesn't exist.

func DeleteNetwork

func DeleteNetwork(tx Tx, id string) error

DeleteNetwork removes a network from the store. Returns ErrNotExist if the network doesn't exist.

func DeleteNode

func DeleteNode(tx Tx, id string) error

DeleteNode removes a node from the store. Returns ErrNotExist if the node doesn't exist.

func DeleteResource

func DeleteResource(tx Tx, id string) error

DeleteResource removes a resource object from the store. Returns ErrNotExist if the object doesn't exist.

func DeleteSecret

func DeleteSecret(tx Tx, id string) error

DeleteSecret removes a secret from the store. Returns ErrNotExist if the secret doesn't exist.

func DeleteService

func DeleteService(tx Tx, id string) error

DeleteService removes a service from the store. Returns ErrNotExist if the service doesn't exist.

func DeleteTask

func DeleteTask(tx Tx, id string) error

DeleteTask removes a task from the store. Returns ErrNotExist if the task doesn't exist.

func FindClusters

func FindClusters(tx ReadTx, by By) ([]*api.Cluster, error)

FindClusters selects a set of clusters and returns them.

func FindConfigs

func FindConfigs(tx ReadTx, by By) ([]*api.Config, error)

FindConfigs selects a set of configs and returns them.

func FindExtensions

func FindExtensions(tx ReadTx, by By) ([]*api.Extension, error)

FindExtensions selects a set of extensions and returns them.

func FindNetworks

func FindNetworks(tx ReadTx, by By) ([]*api.Network, error)

FindNetworks selects a set of networks and returns them.

func FindNodes

func FindNodes(tx ReadTx, by By) ([]*api.Node, error)

FindNodes selects a set of nodes and returns them.

func FindResources

func FindResources(tx ReadTx, by By) ([]*api.Resource, error)

FindResources selects a set of resource objects and returns them.

func FindSecrets

func FindSecrets(tx ReadTx, by By) ([]*api.Secret, error)

FindSecrets selects a set of secrets and returns them.

func FindServices

func FindServices(tx ReadTx, by By) ([]*api.Service, error)

FindServices selects a set of services and returns them.

func FindTasks

func FindTasks(tx ReadTx, by By) ([]*api.Task, error)

FindTasks selects a set of tasks and returns them.

func GetCluster

func GetCluster(tx ReadTx, id string) *api.Cluster

GetCluster looks up a cluster by ID. Returns nil if the cluster doesn't exist.

func GetConfig

func GetConfig(tx ReadTx, id string) *api.Config

GetConfig looks up a config by ID. Returns nil if the config doesn't exist.

func GetExtension

func GetExtension(tx ReadTx, id string) *api.Extension

GetExtension looks up an extension by ID. Returns nil if the object doesn't exist.

func GetNetwork

func GetNetwork(tx ReadTx, id string) *api.Network

GetNetwork looks up a network by ID. Returns nil if the network doesn't exist.

func GetNode

func GetNode(tx ReadTx, id string) *api.Node

GetNode looks up a node by ID. Returns nil if the node doesn't exist.

func GetResource

func GetResource(tx ReadTx, id string) *api.Resource

GetResource looks up a resource object by ID. Returns nil if the object doesn't exist.

func GetSecret

func GetSecret(tx ReadTx, id string) *api.Secret

GetSecret looks up a secret by ID. Returns nil if the secret doesn't exist.

func GetService

func GetService(tx ReadTx, id string) *api.Service

GetService looks up a service by ID. Returns nil if the service doesn't exist.

func GetTask

func GetTask(tx ReadTx, id string) *api.Task

GetTask looks up a task by ID. Returns nil if the task doesn't exist.

func RestoreTable

func RestoreTable(tx Tx, table string, newObjects []api.StoreObject) error

RestoreTable takes a list of new objects of a particular type (e.g. clusters, nodes, etc., which conform to the StoreObject interface) and replaces the existing objects in the store of that type with the new objects.

func UpdateCluster

func UpdateCluster(tx Tx, c *api.Cluster) error

UpdateCluster updates an existing cluster in the store. Returns ErrNotExist if the cluster doesn't exist.

func UpdateConfig

func UpdateConfig(tx Tx, c *api.Config) error

UpdateConfig updates an existing config in the store. Returns ErrNotExist if the config doesn't exist.

func UpdateExtension

func UpdateExtension(tx Tx, e *api.Extension) error

UpdateExtension updates an existing extension in the store. Returns ErrNotExist if the object doesn't exist.

func UpdateNetwork

func UpdateNetwork(tx Tx, n *api.Network) error

UpdateNetwork updates an existing network in the store. Returns ErrNotExist if the network doesn't exist.

func UpdateNode

func UpdateNode(tx Tx, n *api.Node) error

UpdateNode updates an existing node in the store. Returns ErrNotExist if the node doesn't exist.

func UpdateResource

func UpdateResource(tx Tx, r *api.Resource) error

UpdateResource updates an existing resource object in the store. Returns ErrNotExist if the object doesn't exist.

func UpdateSecret

func UpdateSecret(tx Tx, s *api.Secret) error

UpdateSecret updates an existing secret in the store. Returns ErrNotExist if the secret doesn't exist.

func UpdateService

func UpdateService(tx Tx, s *api.Service) error

UpdateService updates an existing service in the store. Returns ErrNotExist if the service doesn't exist.

func UpdateTask

func UpdateTask(tx Tx, t *api.Task) error

UpdateTask updates an existing task in the store. Returns ErrNotExist if the task doesn't exist.

func ViewAndWatch

func ViewAndWatch(store *MemoryStore, cb func(ReadTx) error, specifiers ...api.Event) (watch chan events.Event, cancel func(), err error)

ViewAndWatch calls a callback which can observe the state of this MemoryStore. It also returns a channel that will return further events from this point so the snapshot can be kept up to date. The watch must be released by calling the returned cancel function when it is no longer needed. The channel is guaranteed to get all events after the moment of the snapshot, and only those events.

func WatchFrom

func WatchFrom(store *MemoryStore, version *api.Version, specifiers ...api.Event) (chan events.Event, func(), error)

WatchFrom returns a channel that will return past events starting from "version", and new events until the channel is closed. If "version" is nil, this function is equivalent to

state.Watch(store.WatchQueue(), specifiers...).

If the log has been compacted and it's not possible to produce the exact set of events leading from "version" to the current state, this function will return an error, and the caller should re-sync.

The watch must be released by calling the returned cancel function when it is no longer needed.
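
The compaction case can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: changeLog, compact, eventsAfter, and errCompacted are hypothetical names, versions are modeled as consecutive integers, and the model returns entries newer than the requested version. None of this is taken from the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// errCompacted is a hypothetical error telling the caller to re-sync.
var errCompacted = errors.New("log compacted past requested version")

// changeLog is a hypothetical append-only log. entries[i] carries
// version first+i.
type changeLog struct {
	first   uint64 // version of the oldest retained entry
	entries []string
}

// compact drops every retained entry with a version below keepFrom.
func (l *changeLog) compact(keepFrom uint64) {
	if keepFrom <= l.first {
		return
	}
	n := keepFrom - l.first
	if n > uint64(len(l.entries)) {
		n = uint64(len(l.entries))
	}
	l.entries = l.entries[n:]
	l.first += n
}

// eventsAfter returns the entries newer than version, or errCompacted
// when some of those entries have already been dropped.
func (l *changeLog) eventsAfter(version uint64) ([]string, error) {
	next := version + 1
	if next < l.first {
		return nil, errCompacted
	}
	idx := next - l.first
	if idx > uint64(len(l.entries)) {
		idx = uint64(len(l.entries))
	}
	return l.entries[idx:], nil
}

// demo compacts a five-entry log and then asks for events from a
// version that is still covered and from one that is not.
func demo() (int, bool) {
	l := &changeLog{first: 1, entries: []string{"e1", "e2", "e3", "e4", "e5"}}
	l.compact(4)
	recent, _ := l.eventsAfter(3)
	_, err := l.eventsAfter(1)
	return len(recent), errors.Is(err, errCompacted)
}

func main() {
	fmt.Println(demo())
}
```

In this model a caller that receives errCompacted would take a fresh snapshot and start watching from there, which corresponds to the re-sync described above.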

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch provides a mechanism to batch updates to a store.

func (*Batch) Update

func (batch *Batch) Update(cb func(Tx) error) error

Update adds a single change to a batch. Each call to Update is atomic, but different calls to Update may be spread across multiple transactions to circumvent transaction size limits.

type By

type By interface {
	// contains filtered or unexported methods
}

By is an interface type passed to Find methods. Implementations must be defined in this package.

func ByCustom

func ByCustom(objType, index, value string) By

ByCustom creates an object to pass to Find to search a custom index.

func ByCustomPrefix

func ByCustomPrefix(objType, index, value string) By

ByCustomPrefix creates an object to pass to Find to search a custom index by a value prefix.

func ByDesiredState

func ByDesiredState(state api.TaskState) By

ByDesiredState creates an object to pass to Find to select by desired state.

func ByIDPrefix

func ByIDPrefix(idPrefix string) By

ByIDPrefix creates an object to pass to Find to select by ID prefix.

func ByKind

func ByKind(kind string) By

ByKind creates an object to pass to Find to search for a Resource of a particular kind.

func ByMembership

func ByMembership(membership api.NodeSpec_Membership) By

ByMembership creates an object to pass to Find to select by Membership.

func ByName

func ByName(name string) By

ByName creates an object to pass to Find to select by name.

func ByNamePrefix

func ByNamePrefix(namePrefix string) By

ByNamePrefix creates an object to pass to Find to select by name prefix.

func ByNodeID

func ByNodeID(nodeID string) By

ByNodeID creates an object to pass to Find to select by node.

func ByReferencedConfigID

func ByReferencedConfigID(configID string) By

ByReferencedConfigID creates an object to pass to Find to search for a service or task that references a config with the given ID.

func ByReferencedNetworkID

func ByReferencedNetworkID(networkID string) By

ByReferencedNetworkID creates an object to pass to Find to search for a service or task that references a network with the given ID.

func ByReferencedSecretID

func ByReferencedSecretID(secretID string) By

ByReferencedSecretID creates an object to pass to Find to search for a service or task that references a secret with the given ID.

func ByRole

func ByRole(role api.NodeRole) By

ByRole creates an object to pass to Find to select by role.

func ByRuntime

func ByRuntime(runtime string) By

ByRuntime creates an object to pass to Find to select by runtime.

func ByServiceID

func ByServiceID(serviceID string) By

ByServiceID creates an object to pass to Find to select by service.

func BySlot

func BySlot(serviceID string, slot uint64) By

BySlot creates an object to pass to Find to select by slot.

func ByTaskState

func ByTaskState(state api.TaskState) By

ByTaskState creates an object to pass to Find to select by task state.

func Or

func Or(bys ...By) By

Or returns a combinator that applies OR logic on all the supplied By arguments.
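
To illustrate what OR logic over selectors means, here is a self-contained analogy that models each selector as a predicate. The package's By is an interface with unexported methods, so this is not how the real selectors are built: task, by, byServiceID, byNodeID, or, and find are hypothetical names used only in this sketch.

```go
package main

import "fmt"

// task is a hypothetical, trimmed-down task record.
type task struct {
	id, serviceID, nodeID string
}

// by is a hypothetical selector modeled as a predicate.
type by func(task) bool

func byServiceID(id string) by {
	return func(t task) bool { return t.serviceID == id }
}

func byNodeID(id string) by {
	return func(t task) bool { return t.nodeID == id }
}

// or matches when at least one of the supplied selectors matches.
func or(bys ...by) by {
	return func(t task) bool {
		for _, b := range bys {
			if b(t) {
				return true
			}
		}
		return false
	}
}

// find returns the IDs of the tasks accepted by the selector.
func find(tasks []task, b by) []string {
	var ids []string
	for _, t := range tasks {
		if b(t) {
			ids = append(ids, t.id)
		}
	}
	return ids
}

// demo selects tasks that belong to service "web" or run on node "n2".
func demo() []string {
	tasks := []task{
		{"t1", "web", "n1"},
		{"t2", "db", "n2"},
		{"t3", "cache", "n3"},
	}
	return find(tasks, or(byServiceID("web"), byNodeID("n2")))
}

func main() {
	fmt.Println(demo())
}
```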

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore is a concurrency-safe, in-memory implementation of the Store interface.

func NewMemoryStore

func NewMemoryStore(proposer state.Proposer) *MemoryStore

NewMemoryStore returns an in-memory store. The argument is an optional Proposer which will be used to propagate changes to other members in a cluster.

func (*MemoryStore) ApplyStoreActions

func (s *MemoryStore) ApplyStoreActions(actions []api.StoreAction) error

ApplyStoreActions updates a store based on StoreAction messages.

func (*MemoryStore) Batch

func (s *MemoryStore) Batch(cb func(*Batch) error) error

Batch performs one or more transactions that allow reads and writes. It invokes a callback that is passed a Batch object. The callback may call batch.Update for each change it wants to make as part of the batch. The changes in the batch may be split over multiple transactions if necessary to keep transactions below the size limit. Batch holds a lock over the state, but will yield this lock every time it creates a new transaction to allow other writers to proceed. Thus, unrelated changes to the state may occur between calls to batch.Update.

This method allows the caller to iterate over a data set and apply changes in sequence without holding the store write lock for an excessive time, or producing a transaction that exceeds the maximum size.

If Batch returns an error, no guarantees are made about how many updates were committed successfully.
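
The splitting behavior can be sketched as simple counting. The model below is an assumption-laden illustration, not the package's code: toyBatch and transactionsFor are hypothetical, each update is treated as exactly one change, and only the MaxChangesPerTransaction limit of 200 is modeled (the byte-size limit is ignored).

```go
package main

import "fmt"

// maxChangesPerTransaction mirrors the documented constant value of 200.
const maxChangesPerTransaction = 200

// toyBatch counts how many transactions a sequence of updates produces.
type toyBatch struct {
	pending      int // changes in the currently open transaction
	transactions int // transactions committed so far
}

// update records one change and commits once the open transaction is full.
func (b *toyBatch) update() {
	b.pending++
	if b.pending >= maxChangesPerTransaction {
		b.commit()
	}
}

// commit closes the open transaction if it holds any changes.
func (b *toyBatch) commit() {
	if b.pending == 0 {
		return
	}
	b.transactions++
	b.pending = 0
}

// transactionsFor returns how many transactions the given number of
// single-change updates is split into.
func transactionsFor(changes int) int {
	b := &toyBatch{}
	for i := 0; i < changes; i++ {
		b.update()
	}
	b.commit() // flush the final, partially filled transaction
	return b.transactions
}

func main() {
	fmt.Println(transactionsFor(450))
}
```

Under these assumptions, 450 changes are spread over three transactions (200, 200, and 50). Other writers could run between them, which is why a failed batch gives no guarantee about how many updates were committed.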

func (*MemoryStore) Close

func (s *MemoryStore) Close() error

Close closes the memory store and frees its associated resources.

func (*MemoryStore) Restore

func (s *MemoryStore) Restore(snapshot *pb.StoreSnapshot) error

Restore sets the contents of the store to the serialized data in the argument.

func (*MemoryStore) Save

func (s *MemoryStore) Save(tx ReadTx) (*pb.StoreSnapshot, error)

Save serializes the data in the store.

func (*MemoryStore) Update

func (s *MemoryStore) Update(cb func(Tx) error) error

Update executes a read/write transaction.

func (*MemoryStore) View

func (s *MemoryStore) View(cb func(ReadTx))

View executes a read transaction.

func (*MemoryStore) WatchQueue

func (s *MemoryStore) WatchQueue() *watch.Queue

WatchQueue returns the publish/subscribe queue.

func (*MemoryStore) Wedged

func (s *MemoryStore) Wedged() bool

Wedged returns true if the store lock has been held for a long time, possibly indicating a deadlock.
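
One plausible way to detect a long-held lock is to record when the lock was acquired and compare that time against a timeout such as WedgeTimeout. The sketch below assumes that approach. timedMutex, wedged, and demo are hypothetical names, and the code is not taken from the package.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// timedMutex is a hypothetical mutex that remembers when it was locked.
type timedMutex struct {
	mu       sync.Mutex
	metaMu   sync.Mutex // guards lockedAt
	lockedAt time.Time  // zero while the mutex is not held
}

// Lock acquires the mutex and records the acquisition time.
func (m *timedMutex) Lock() {
	m.mu.Lock()
	m.metaMu.Lock()
	m.lockedAt = time.Now()
	m.metaMu.Unlock()
}

// Unlock clears the recorded time and releases the mutex.
func (m *timedMutex) Unlock() {
	m.metaMu.Lock()
	m.lockedAt = time.Time{}
	m.metaMu.Unlock()
	m.mu.Unlock()
}

// wedged reports whether, as of now, the mutex has been held for
// longer than timeout. It never blocks on the main mutex.
func (m *timedMutex) wedged(now time.Time, timeout time.Duration) bool {
	m.metaMu.Lock()
	defer m.metaMu.Unlock()
	return !m.lockedAt.IsZero() && now.Sub(m.lockedAt) > timeout
}

// demo checks the lock as if 31 seconds had passed, once while the
// lock is held and once after it has been released.
func demo() (held, released bool) {
	var m timedMutex
	m.Lock()
	later := time.Now().Add(31 * time.Second)
	held = m.wedged(later, 30*time.Second)
	m.Unlock()
	released = m.wedged(later, 30*time.Second)
	return held, released
}

func main() {
	fmt.Println(demo())
}
```

Passing the current time as an argument keeps the check testable without sleeping. A health check could call such a method periodically and report a suspected deadlock when it returns true.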

type ObjectStoreConfig

type ObjectStoreConfig struct {
	Table            *memdb.TableSchema
	Save             func(ReadTx, *api.StoreSnapshot) error
	Restore          func(Tx, *api.StoreSnapshot) error
	ApplyStoreAction func(Tx, api.StoreAction) error
}

ObjectStoreConfig provides the necessary methods to store a particular object type inside MemoryStore.

type ReadTx

type ReadTx interface {
	// contains filtered or unexported methods
}

ReadTx is a read transaction. Note that a transaction does not imply any internal batching. It only means that the transaction presents a consistent view of the data that cannot be affected by other transactions.

type Tx

type Tx interface {
	ReadTx
	// contains filtered or unexported methods
}

Tx is a read/write transaction. Note that a transaction does not imply any internal batching. The purpose of this transaction is to give the user a guarantee that its changes won't be visible to other transactions until the transaction is over.
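
The visibility guarantee can be modeled with a copy-on-write map. This is a simplified sketch and not the package's implementation: toyStore, update, and view are hypothetical, and discarding the working copy when the callback returns an error is an assumption of this model rather than something stated above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// toyStore is a hypothetical store whose writers work on a private copy.
type toyStore struct {
	mu   sync.RWMutex
	data map[string]string
}

// update runs cb against a private copy of the data. The copy replaces
// the visible state only if cb succeeds; otherwise it is discarded.
func (s *toyStore) update(cb func(tx map[string]string) error) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	working := make(map[string]string, len(s.data))
	for k, v := range s.data {
		working[k] = v
	}
	if err := cb(working); err != nil {
		return err
	}
	s.data = working
	return nil
}

// view runs cb against the committed state under a read lock.
func (s *toyStore) view(cb func(tx map[string]string)) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	cb(s.data)
}

// demo commits one change, fails a second one, and returns what a
// reader sees afterwards plus whether the second update failed.
func demo() (string, bool) {
	s := &toyStore{data: map[string]string{}}
	_ = s.update(func(tx map[string]string) error {
		tx["a"] = "1"
		return nil
	})
	err := s.update(func(tx map[string]string) error {
		tx["a"] = "2" // written to the private copy only
		return errors.New("validation failed")
	})
	var seen string
	s.view(func(tx map[string]string) { seen = tx["a"] })
	return seen, err != nil
}

func main() {
	fmt.Println(demo())
}
```

In this model the reader still sees "1" after the failed update, because the write of "2" never left the private copy.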
