Documentation ¶
Index ¶
- Constants
- Variables
- func IsNotExist(err error) bool
- func NewFake() *fakeStore
- type CASError
- type ConsulStore
- func (s *ConsulStore) AddDesiredReplicas(id fields.ID, n int) error
- func (s *ConsulStore) CASDesiredReplicas(id fields.ID, expected int, n int) error
- func (s *ConsulStore) Create(manifest manifest.Manifest, nodeSelector klabels.Selector, ...) (fields.RC, error)
- func (s *ConsulStore) CreateTxn(ctx context.Context, manifest manifest.Manifest, nodeSelector klabels.Selector, ...) (fields.RC, error)
- func (s *ConsulStore) Delete(id fields.ID, force bool) error
- func (s *ConsulStore) DeleteTxn(ctx context.Context, id fields.ID, force bool) error
- func (s *ConsulStore) Disable(id fields.ID) error
- func (s *ConsulStore) DisableTxn(ctx context.Context, id fields.ID) error
- func (s *ConsulStore) Enable(id fields.ID) error
- func (s *ConsulStore) EnableTxn(ctx context.Context, id fields.ID) error
- func (s *ConsulStore) FindWhereLabeled(podID types.PodID, availabilityZone pc_fields.AvailabilityZone, ...) ([]fields.RC, error)
- func (s *ConsulStore) Get(id fields.ID) (fields.RC, error)
- func (s *ConsulStore) List() ([]fields.RC, error)
- func (s *ConsulStore) LockForMutation(rcID fields.ID, session consul.Session) (consul.Unlocker, error)
- func (s *ConsulStore) LockForMutationTxn(lockCtx context.Context, rcID fields.ID, session consul.Session) (consul.TxnUnlocker, error)
- func (s *ConsulStore) LockForNodeTransfer(rcID fields.ID, session consul.Session) (consul.Unlocker, error)
- func (s *ConsulStore) LockForOwnership(rcID fields.ID, session consul.Session) (consul.Unlocker, error)
- func (s *ConsulStore) LockForUpdateCreation(rcID fields.ID, session consul.Session) (consul.Unlocker, error)
- func (s *ConsulStore) MutateRC(id fields.ID, mutator func(fields.RC) (fields.RC, error)) error
- func (s *ConsulStore) SetDesiredReplicas(id fields.ID, n int) error
- func (s *ConsulStore) TransferReplicaCounts(ctx context.Context, req TransferReplicaCountsRequest) error
- func (s *ConsulStore) UpdateCreationLockPath(rcID fields.ID) (string, error)
- func (s *ConsulStore) UpdateManifest(id fields.ID, man manifest.Manifest) error
- func (s *ConsulStore) UpdateStrategy(id fields.ID, strategy fields.Strategy) error
- func (s *ConsulStore) Watch(rcID fields.ID, quit <-chan struct{}) (<-chan fields.RC, <-chan error)
- func (s *ConsulStore) WatchNew(quit <-chan struct{}) (<-chan []fields.RC, <-chan error)
- func (s *ConsulStore) WatchRCKeysWithLockInfo(quit <-chan struct{}, pauseTime time.Duration) (<-chan []RCLockResult, <-chan error)
- type LockType
- type RCLabeler
- type RCLockResult
- type TransferReplicaCountsRequest
Constants ¶
const ( // This label is applied to an RC, to identify the ID of its pod manifest. PodIDLabel = types.PodIDLabel )
Variables ¶
var NoReplicationController error = errors.New("No replication controller found")
Functions ¶
func IsNotExist ¶
Types ¶
type ConsulStore ¶
type ConsulStore struct {
// contains filtered or unexported fields
}
func NewConsul ¶
func NewConsul(client consulutil.ConsulClient, labeler RCLabeler, retries int) *ConsulStore
func (*ConsulStore) AddDesiredReplicas ¶
func (s *ConsulStore) AddDesiredReplicas(id fields.ID, n int) error
AddDesiredReplicas increments the replica count for the specified RC by n.
func (*ConsulStore) CASDesiredReplicas ¶
CASDesiredReplicas first checks that the desired replica count for the given RC equals expected (returning an error if it does not), and if it does, sets it to n.
func (*ConsulStore) Create ¶
func (s *ConsulStore) Create( manifest manifest.Manifest, nodeSelector klabels.Selector, availabilityZone pc_fields.AvailabilityZone, clusterName pc_fields.ClusterName, podLabels klabels.Set, additionalLabels klabels.Set, allocationStrategy fields.Strategy, ) (fields.RC, error)
Create creates a replication controller with the specified manifest and selectors. The node selector is used to determine what nodes the replication controller may schedule on. The pod label set is applied to every pod the replication controller schedules. The additionalLabels label set is applied to the RC's own labels.
func (*ConsulStore) CreateTxn ¶
func (s *ConsulStore) CreateTxn( ctx context.Context, manifest manifest.Manifest, nodeSelector klabels.Selector, availabilityZone pc_fields.AvailabilityZone, clusterName pc_fields.ClusterName, podLabels klabels.Set, additionalLabels klabels.Set, allocationStrategy fields.Strategy, ) (fields.RC, error)
TODO: replace Create() with this
func (*ConsulStore) Delete ¶
func (s *ConsulStore) Delete(id fields.ID, force bool) error
Delete removes the RC with the given ID, returning an error if it does not exist. Normally an RC can only be deleted if its desired replica count is zero; pass force=true to override this check.
func (*ConsulStore) DeleteTxn ¶
DeleteTxn adds a deletion operation to the passed context rather than immediately deleting it.
func (*ConsulStore) Disable ¶
func (s *ConsulStore) Disable(id fields.ID) error
Disable sets the disabled flag on the given replication controller, instructing any running farm instances to cease handling it.
func (*ConsulStore) DisableTxn ¶
DisableTxn adds the KV operations required to disable the RC to ctx.
func (*ConsulStore) Enable ¶
func (s *ConsulStore) Enable(id fields.ID) error
Enable unsets the disabled flag for the given RC, instructing any running farms to begin handling it.
func (*ConsulStore) FindWhereLabeled ¶
func (s *ConsulStore) FindWhereLabeled(podID types.PodID, availabilityZone pc_fields.AvailabilityZone, clusterName pc_fields.ClusterName) ([]fields.RC, error)
FindWhereLabeled will return all RCs that match the arguments
func (*ConsulStore) Get ¶
Get retrieves a replication controller by its ID. Returns NoReplicationController if the fetch operation succeeds but no replication controller with that ID is found.
func (*ConsulStore) List ¶
func (s *ConsulStore) List() ([]fields.RC, error)
List retrieves all of the replication controllers from the consul store.
func (*ConsulStore) LockForMutation ¶
func (s *ConsulStore) LockForMutation(rcID fields.ID, session consul.Session) (consul.Unlocker, error)
LockForMutation acquires a lock on the RC with the intent of mutating it. Must be held by goroutines in the rolling update farm as well as any other tool that may mutate an RC
func (*ConsulStore) LockForMutationTxn ¶
func (s *ConsulStore) LockForMutationTxn( lockCtx context.Context, rcID fields.ID, session consul.Session, ) (consul.TxnUnlocker, error)
LockForMutationTxn doesn't immediately lock the RC for mutation but instead adds the necessary kv operations to the lockCtx's transaction to do so. It also adds the operations required to unlock the RC to unlockCtx.
func (*ConsulStore) LockForNodeTransfer ¶
func (s *ConsulStore) LockForNodeTransfer(rcID fields.ID, session consul.Session) (consul.Unlocker, error)
LockForNodeTransfer attempts to acquire a lock that must be held before a node transfer is initiated by an RC. This is useful for external systems that wish to temporarily prevent an RC from performing a node transfer, which can be accomplished by preemptively acquiring the lock.
func (*ConsulStore) LockForOwnership ¶
func (s *ConsulStore) LockForOwnership(rcID fields.ID, session consul.Session) (consul.Unlocker, error)
LockForOwnership acquires a lock on the RC that should be used by RC farm goroutines, whose job it is to carry out the intent of the RC
func (*ConsulStore) LockForUpdateCreation ¶
func (s *ConsulStore) LockForUpdateCreation(rcID fields.ID, session consul.Session) (consul.Unlocker, error)
LockForUpdateCreation acquires a lock on the RC for ensuring that no two rolling updates are created that operate on the same replication controllers. A lock on both the intended "new" and "old" replication controllers should be held before the update is created.
func (*ConsulStore) MutateRC ¶
MutateRC performs a safe (i.e. check-and-set) mutation of the RC with the given ID, using the given function. If the mutator returns an error, it will be propagated out. If the returned RC has id="", then it will be deleted.
func (*ConsulStore) SetDesiredReplicas ¶
func (s *ConsulStore) SetDesiredReplicas(id fields.ID, n int) error
SetDesiredReplicas updates the replica count for the RC with the given ID.
func (*ConsulStore) TransferReplicaCounts ¶
func (s *ConsulStore) TransferReplicaCounts(ctx context.Context, req TransferReplicaCountsRequest) error
TransferReplicaCounts supports transactionally updating the replica counts of two RCs in consul. This is useful for rolling updates to transition nodes from the old RC to the new one without risking the consul database dying between updates and violating replica count invariants
func (*ConsulStore) UpdateCreationLockPath ¶
func (s *ConsulStore) UpdateCreationLockPath(rcID fields.ID) (string, error)
UpdateCreationLockPath computes the consul key that should be locked by callers creating an RU that will operate on this RC. This function is exported so these callers can perform operations on the lock during a consul transaction
func (*ConsulStore) UpdateManifest ¶
UpdateManifest will set the manifest on the RC at the given ID. Be careful with this function!
func (*ConsulStore) UpdateStrategy ¶
func (*ConsulStore) Watch ¶
Watch watches for any changes to the replication controller with ID `rcID`. This returns two output channels. The RC's most recent value is sent on the first output channel whenever a change has occurred or the watch times out. Errors are sent on the second output channel. Send a value on `quit` to stop watching. The two output channels will be closed in response.
func (*ConsulStore) WatchNew ¶
func (s *ConsulStore) WatchNew(quit <-chan struct{}) (<-chan []fields.RC, <-chan error)
WatchNew watches the consul store for changes to the RC tree and attempts to return the full RC list on each update.
Because processing the full list of RCs may take a large amount of time, particularly when there are hundreds of RCs, WatchNew() takes care to drop writes to the output channel if they're being consumed too slowly. It will block writing a value to the output channel until 1) it is read or 2) a new value comes in, in which case that value will be written instead.
func (*ConsulStore) WatchRCKeysWithLockInfo ¶
func (s *ConsulStore) WatchRCKeysWithLockInfo(quit <-chan struct{}, pauseTime time.Duration) (<-chan []RCLockResult, <-chan error)
WatchRCKeysWithLockInfo executes periodic consul watch queries for replication controller keys and passes the results on a channel along with information about which keys have locks on them. Only keys are returned as a bandwidth performance optimization. To observe changes to the fields of a replication controller, individual watches should be used. Lock info is returned so that it can be used to not try to lock RCs that are already locked. In other words, it's a QPS optimization over the naive solution of always attempting to lock every RC and relying on the consul server to decide whether the lock is owned.
Since lock information is retrieved once per update to the RC list, it's possible that lock information will be out of date as the list is processed. However, a subsequent update will get the correct view of the world so the behavior should be correct
type RCLabeler ¶
type RCLabeler interface { SetLabels(labelType labels.Type, id string, labels map[string]string) error RemoveAllLabels(labelType labels.Type, id string) error SetLabelsTxn(ctx context.Context, labelType labels.Type, id string, labels map[string]string) error GetMatches(klabels.Selector, labels.Type) ([]labels.Labeled, error) RemoveAllLabelsTxn(ctx context.Context, labelType labels.Type, id string) error }
type RCLockResult ¶
type RCLockResult struct { ID fields.ID LockedForOwnership bool LockedForUpdateCreation bool LockedForMutation bool }
RCLockResult wraps an RC ID with lock information. This structure is useful for low-bandwidth detection of new RCs, for use in the RC farm.
type TransferReplicaCountsRequest ¶
type TransferReplicaCountsRequest struct { // ToRCID is the RC ID of the replication controller which will have its // replica count increased ToRCID fields.ID // FromRCID is the RC ID of the replication controller which will have its // replica count decreased FromRCID fields.ID // ReplicasToAdd is the number of replicas to add to the RC indicated by // ToRCID ReplicasToAdd *int // ReplicasToRemove is the number of replicas to remove from the RC indicated // by FromRCID ReplicasToRemove *int // StartingToReplicas is the number of replicas the caller expects the // RC specified by ToRCID to have prior to the transaction being // applied. This is useful to guarantee that assumptions that went into // the calculation of ReplicasToAdd and ReplicasToRemove haven't // changed. StartingToReplicas *int // StartingFromReplicas is the number of replicas the caller expects // the RC specified by FromRCID to have prior to the transaction being // applied. This is useful to guarantee that assumptions that went into // the calculation of ReplicasToAdd and ReplicasToRemove haven't // changed. StartingFromReplicas *int }