rcstore

package
v0.0.0-...-8223eb1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2020 License: Apache-2.0 Imports: 21 Imported by: 7

Documentation

Index

Constants

View Source
const (
	// This label is applied to an RC, to identify the ID of its pod manifest.
	PodIDLabel = types.PodIDLabel
)

Variables

View Source
var NoReplicationController error = errors.New("No replication controller found")

Functions

func IsNotExist

func IsNotExist(err error) bool

func NewFake

func NewFake() *fakeStore

Types

type CASError

type CASError string

TODO: combine with similar CASError type in pkg/labels

func (CASError) Error

func (e CASError) Error() string

type ConsulStore

type ConsulStore struct {
	// contains filtered or unexported fields
}

func NewConsul

func NewConsul(client consulutil.ConsulClient, labeler RCLabeler, retries int) *ConsulStore

func (*ConsulStore) AddDesiredReplicas

func (s *ConsulStore) AddDesiredReplicas(id fields.ID, n int) error

AddDesiredReplicas increments the replica count for the specified RC by n.

func (*ConsulStore) CASDesiredReplicas

func (s *ConsulStore) CASDesiredReplicas(id fields.ID, expected int, n int) error

CASDesiredReplicas first checks that the desired replica count for the given RC is the given integer (returning an error if it is not), and if it is, sets it to the given integer.

func (*ConsulStore) Create

func (s *ConsulStore) Create(
	manifest manifest.Manifest,
	nodeSelector klabels.Selector,
	availabilityZone pc_fields.AvailabilityZone,
	clusterName pc_fields.ClusterName,
	podLabels klabels.Set,
	additionalLabels klabels.Set,
	allocationStrategy fields.Strategy,
) (fields.RC, error)

Create creates a replication controller with the specified manifest and selectors. The node selector is used to determine what nodes the replication controller may schedule on. The pod label set is applied to every pod the replication controller schedules. The additionalLabels label set is applied to the RCs own labels

func (*ConsulStore) CreateTxn

func (s *ConsulStore) CreateTxn(
	ctx context.Context,
	manifest manifest.Manifest,
	nodeSelector klabels.Selector,
	availabilityZone pc_fields.AvailabilityZone,
	clusterName pc_fields.ClusterName,
	podLabels klabels.Set,
	additionalLabels klabels.Set,
	allocationStrategy fields.Strategy,
) (fields.RC, error)

TODO: replace Create() with this

func (*ConsulStore) Delete

func (s *ConsulStore) Delete(id fields.ID, force bool) error

Delete removes the RC with the given ID the targeted RC, returning an error if it does not exist. Normally an RC can only be deleted if its desired replica count is zero; pass force=true to override this check.

func (*ConsulStore) DeleteTxn

func (s *ConsulStore) DeleteTxn(ctx context.Context, id fields.ID, force bool) error

DeleteTxn adds a deletion operation to the passed context rather than immediately deleting ig

func (*ConsulStore) Disable

func (s *ConsulStore) Disable(id fields.ID) error

Disable sets the disabled flag on the given replication controller, instructing any running farm instances to cease handling it.

func (*ConsulStore) DisableTxn

func (s *ConsulStore) DisableTxn(ctx context.Context, id fields.ID) error

DisableTxn adds the KV operations required to disable the RC to ctx.

func (*ConsulStore) Enable

func (s *ConsulStore) Enable(id fields.ID) error

Enable unsets the disabled flag for the given RC, instructing any running farms to begin handling it.

func (*ConsulStore) EnableTxn

func (s *ConsulStore) EnableTxn(ctx context.Context, id fields.ID) error

EnableTxn adds the KV operations required to enable the RC to ctx.

func (*ConsulStore) FindWhereLabeled

func (s *ConsulStore) FindWhereLabeled(podID types.PodID,
	availabilityZone pc_fields.AvailabilityZone,
	clusterName pc_fields.ClusterName) ([]fields.RC, error)

FindWhereLabeled will return all RCs that match the arguments

func (*ConsulStore) Get

func (s *ConsulStore) Get(id fields.ID) (fields.RC, error)

Get retrieves a replication controller by its ID. Returns NoReplicationController if the fetch operation succeeds but no replication controller with that ID is found.

func (*ConsulStore) List

func (s *ConsulStore) List() ([]fields.RC, error)

List retrieves all of the replication controllers from the consul store.

func (*ConsulStore) LockForMutation

func (s *ConsulStore) LockForMutation(rcID fields.ID, session consul.Session) (consul.Unlocker, error)

LockForMutation acquires a lock on the RC with the intent of mutating it. Must be held by goroutines in the rolling update farm as well as any other tool that may mutate an RC

func (*ConsulStore) LockForMutationTxn

func (s *ConsulStore) LockForMutationTxn(
	lockCtx context.Context,
	rcID fields.ID,
	session consul.Session,
) (consul.TxnUnlocker, error)

LockForMutationTxn doesn't immediately lock the RC for mutation but instead adds the necessary kv operations to the lockCtx's transaction to do so. It also adds the operations required to unlock the RC to unlockCtx.

func (*ConsulStore) LockForNodeTransfer

func (s *ConsulStore) LockForNodeTransfer(rcID fields.ID, session consul.Session) (consul.Unlocker, error)

LockForNodeTransfer attempts to acquire a lock that must be held before a node transfer is initiated by an RC. This is useful for external systems that wish to temporarily prevent an RC from performing a node transfer which can be accomplished by preemptively acquiring the lock

func (*ConsulStore) LockForOwnership

func (s *ConsulStore) LockForOwnership(rcID fields.ID, session consul.Session) (consul.Unlocker, error)

LockForOwnership acquires a lock on the RC that should be used by RC farm goroutines, whose job it is to carry out the intent of the RC

func (*ConsulStore) LockForUpdateCreation

func (s *ConsulStore) LockForUpdateCreation(rcID fields.ID, session consul.Session) (consul.Unlocker, error)

LockForUpdateCreation acquires a lock on the RC for ensuring that no two rolling updates are created that operate on the same replication controllers. A lock on both the intended "new" and "old" replication controllers should be held before the update is created.

func (*ConsulStore) MutateRC

func (s *ConsulStore) MutateRC(id fields.ID, mutator func(fields.RC) (fields.RC, error)) error

performs a safe (ie check-and-set) mutation of the rc with the given id, using the given function if the mutator returns an error, it will be propagated out if the returned RC has id="", then it will be deleted

func (*ConsulStore) SetDesiredReplicas

func (s *ConsulStore) SetDesiredReplicas(id fields.ID, n int) error

SetDesiredReplicas updates the replica count for the RC with the given ID.

func (*ConsulStore) TransferReplicaCounts

func (s *ConsulStore) TransferReplicaCounts(ctx context.Context, req TransferReplicaCountsRequest) error

TransferReplicaCounts supports transactionally updating the replica counts of two RCs in consul. This is useful for rolling updates to transition nodes from the old RC to the new one without risking the consul database dying between updates and violating replica count invariants

func (*ConsulStore) UpdateCreationLockPath

func (s *ConsulStore) UpdateCreationLockPath(rcID fields.ID) (string, error)

UpdateCreationLockPath computes the consul key that should be locked by callers creating an RU that will operate on this RC. This function is exported so these callers can perform operations on the lock during a consul transaction

func (*ConsulStore) UpdateManifest

func (s *ConsulStore) UpdateManifest(id fields.ID, man manifest.Manifest) error

UpdateManifest will set the manifest on the RC at the given ID. Be careful with this function!

func (*ConsulStore) UpdateStrategy

func (s *ConsulStore) UpdateStrategy(id fields.ID, strategy fields.Strategy) error

func (*ConsulStore) Watch

func (s *ConsulStore) Watch(rcID fields.ID, quit <-chan struct{}) (<-chan fields.RC, <-chan error)

Watch watches for any changes to the replication controller with ID `rcID`. This returns two output channels. The RC's most recent value is sent on the first output channel whenever a change has occurred or the watch times out. Errors are sent on the second output channel. Send a value on `quitChannel` to stop watching. The two output channels will be closed in response.

func (*ConsulStore) WatchNew

func (s *ConsulStore) WatchNew(quit <-chan struct{}) (<-chan []fields.RC, <-chan error)

Watches the consul store for changes to the RC tree and attempts to return the full RC list on each update.

Because processing the full list of RCs may take a large amount of time, particularly when there are 100s of RCs, WatchNew() takes care to drop writes to the output channel if they're being consumed too slowly. It will block writing a value to the output channel until 1) it is read or 2) a new value comes in, in which case that value will be written instead

func (*ConsulStore) WatchRCKeysWithLockInfo

func (s *ConsulStore) WatchRCKeysWithLockInfo(quit <-chan struct{}, pauseTime time.Duration) (<-chan []RCLockResult, <-chan error)

WatchRCKeysWithLockInfo executes periodic consul watch queries for replication controller keys and passes the results on a channel along with information about which keys have locks on them. Only keys are returned as a bandwidth performance optimization. To observe changes to the fields of a replication controller, individual watches should be used. Lock info is returned so that it can be used to not try to lock RCs that are already locked. In other words, it's a QPS optimization over the naive solution of always attempting to lock every RC and relying on the consul server to decide whether the lock is owned.

Since lock information is retrieved once per update to the RC list, it's possible that lock information will be out of date as the list is processed. However, a subsequent update will get the correct view of the world so the behavior should be correct

type LockType

type LockType int
const (
	OwnershipLockType LockType = iota
	MutationLockType
	UpdateCreationLockType
	UnknownLockType
	NodeTransferLockType
)

func (LockType) String

func (l LockType) String() string

type RCLabeler

type RCLabeler interface {
	SetLabels(labelType labels.Type, id string, labels map[string]string) error
	RemoveAllLabels(labelType labels.Type, id string) error
	SetLabelsTxn(ctx context.Context, labelType labels.Type, id string, labels map[string]string) error
	GetMatches(klabels.Selector, labels.Type) ([]labels.Labeled, error)
	RemoveAllLabelsTxn(ctx context.Context, labelType labels.Type, id string) error
}

type RCLockResult

type RCLockResult struct {
	ID                      fields.ID
	LockedForOwnership      bool
	LockedForUpdateCreation bool
	LockedForMutation       bool
}

Wraps an RC ID with lock information This structure is useful for low-bandwidth detection of new RCs, for use in the RC farm.

type TransferReplicaCountsRequest

type TransferReplicaCountsRequest struct {
	// ToRCID is the RC ID of the replication controller which will have its
	// replica count increased
	ToRCID fields.ID

	// FromRCID is the RC ID of the replication controller which will have its
	// replica count decreased
	FromRCID fields.ID

	// ReplicasToAdd is the number of replicas to add to the RC indicated by
	// ToRCID
	ReplicasToAdd *int

	// ReplicasToRemove is the number of replicas to remove from the RC indicated
	// by FromRCID
	ReplicasToRemove *int

	// StartingToReplicas is the number of replicas the caller expects the
	// RC specified by ToRCID to have prior to the transaction being
	// applied. This is useful to guarantee that assumptions that went into
	// the calculation of ReplicasToAdd and ReplicasToRemove haven't
	// changed.
	StartingToReplicas *int

	// StartingFromReplicas is the number of replicas the caller expects
	// the RC specified by FromRCID to have prior to the transaction being
	// applied. This is useful to guarantee that assumptions that went into
	// the calculation of ReplicasToAdd and ReplicasToRemove haven't
	// changed.
	StartingFromReplicas *int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL