cache

package
v0.59.0

Warning: This package is not in the latest version of its module.
Published: Apr 11, 2023 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func ObjectSafeParams

func ObjectSafeParams(name, namespace string) svc1log.Param

ObjectSafeParams returns safe logging params for a name and a namespace

Types

type AsyncClientMetrics

type AsyncClientMetrics struct {
	ObjectTypeTag string
}

AsyncClientMetrics emits metrics on retries and failures of the internal async client's calls to the API server.

TODO: Move this to the metrics package after a refactor of that package to avoid cyclical imports.

func (*AsyncClientMetrics) MarkFailedToEnqueue

func (acm *AsyncClientMetrics) MarkFailedToEnqueue(ctx context.Context, requestType store.RequestType)

MarkFailedToEnqueue marks that a request is not going to be retried because the inflight requests queue was full

func (*AsyncClientMetrics) MarkMaxRetries

func (acm *AsyncClientMetrics) MarkMaxRetries(ctx context.Context, requestType store.RequestType)

MarkMaxRetries marks that a request to the api server failed and is not going to be retried because it reached the maximum number of retries

func (*AsyncClientMetrics) MarkRequest

func (acm *AsyncClientMetrics) MarkRequest(ctx context.Context, requestType store.RequestType)

MarkRequest marks that a request to the api server is being made

func (*AsyncClientMetrics) MarkRequestRetry

func (acm *AsyncClientMetrics) MarkRequestRetry(ctx context.Context, requestType store.RequestType)

MarkRequestRetry marks that a request to the api server failed and is being retried

type Client

type Client interface {
	Create(context.Context, metav1.Object) (metav1.Object, error)
	Update(context.Context, metav1.Object) (metav1.Object, error)
	Delete(ctx context.Context, namespace, name string) error
	Get(ctx context.Context, namespace, name string) (metav1.Object, error)
}

Client is a generic representation of a kube client that acts on metav1.Object, so that asyncClient can be used for multiple k8s resources.

type DemandCache

type DemandCache struct {
	// contains filtered or unexported fields
}

DemandCache is a cache for demands. It assumes it is the only client that creates demands. Externally created demands will not be included in the cache. Deletions from all clients are valid and are reflected in the cache.

func NewDemandCache

func NewDemandCache(
	ctx context.Context,
	demandInformer demandinformers.DemandInformer,
	demandKubeClient demandclient.ScalerV1alpha2Interface,
	asyncClientConfig config.AsyncClientConfig,
) (*DemandCache, error)

NewDemandCache creates a new cache

func (*DemandCache) Create

func (dc *DemandCache) Create(rr *demandapi.Demand) error

Create enqueues a creation request and puts the object into the store

func (*DemandCache) Delete

func (dc *DemandCache) Delete(namespace, name string)

Delete enqueues a deletion request and removes the object from the store

func (*DemandCache) Get

func (dc *DemandCache) Get(namespace, name string) (*demandapi.Demand, bool)

Get returns the object from the store if it exists

func (*DemandCache) Run

func (dc *DemandCache) Run(ctx context.Context)

Run starts the async clients of this cache

type ResourceReservationCache

type ResourceReservationCache struct {
	// contains filtered or unexported fields
}

ResourceReservationCache is a cache for resource reservations. It assumes it is the only client that issues write requests for resource reservations. External updates and creations are ignored, but external deletions are reflected in the cache.

func NewResourceReservationCache

func NewResourceReservationCache(
	ctx context.Context,
	resourceReservationInformer rrinformers.ResourceReservationInformer,
	resourceReservationKubeClient sparkschedulerclient.SparkschedulerV1beta2Interface,
	asyncClientConfig config.AsyncClientConfig,
) (*ResourceReservationCache, error)

NewResourceReservationCache creates a new cache.

func (*ResourceReservationCache) Create

Create enqueues a creation request and puts the object into the store

func (*ResourceReservationCache) Delete

func (rrc *ResourceReservationCache) Delete(namespace, name string)

Delete enqueues a deletion request and removes the object from the store

func (*ResourceReservationCache) Get

func (rrc *ResourceReservationCache) Get(namespace, name string) (*v1beta2.ResourceReservation, bool)

Get returns the object from the store if it exists

func (*ResourceReservationCache) InflightQueueLengths

func (rrc *ResourceReservationCache) InflightQueueLengths() []int

InflightQueueLengths returns the number of items per request queue

func (*ResourceReservationCache) List

List returns all known objects in the store

func (*ResourceReservationCache) Run

Run starts the async clients of this cache

func (*ResourceReservationCache) Update

Update enqueues an update request and updates the object in the store

type SafeDemandCache

type SafeDemandCache struct {
	*DemandCache
	// contains filtered or unexported fields
}

SafeDemandCache wraps a demand cache by checking if the demand CRD exists before each operation

func NewSafeDemandCache

func NewSafeDemandCache(
	lazyDemandInformer *crd.LazyDemandInformer,
	demandKubeClient demandclient.ScalerV1alpha2Interface,
	asyncClientConfig config.AsyncClientConfig,
) *SafeDemandCache

NewSafeDemandCache returns a demand cache which falls back to a no-op if the demand CRD doesn't exist

func (*SafeDemandCache) CRDExists

func (sdc *SafeDemandCache) CRDExists() bool

CRDExists checks if the demand CRD exists

func (*SafeDemandCache) CacheSize

func (sdc *SafeDemandCache) CacheSize() int

CacheSize returns the number of elements in the cache

func (*SafeDemandCache) Create

func (sdc *SafeDemandCache) Create(rr *demandapi.Demand) error

Create enqueues a creation request and puts the object into the store

func (*SafeDemandCache) Delete

func (sdc *SafeDemandCache) Delete(namespace, name string)

Delete enqueues a deletion request and removes the object from the store

func (*SafeDemandCache) Get

func (sdc *SafeDemandCache) Get(namespace, name string) (*demandapi.Demand, bool)

Get returns the object from the store if it exists

func (*SafeDemandCache) InflightQueueLengths

func (sdc *SafeDemandCache) InflightQueueLengths() []int

InflightQueueLengths returns the number of items per request queue

func (*SafeDemandCache) Run

func (sdc *SafeDemandCache) Run(ctx context.Context)

Run starts the goroutine to check for the existence of the demand CRD
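The wrapping described for SafeDemandCache, where every operation first checks whether the CRD exists, can be sketched as a guard around an inner cache. This is an illustration under assumptions rather than the package's code: `mapCache`, `safeCache`, and the `crdExists` function field are hypothetical, the check is a plain function instead of a background goroutine, and returning nil from a skipped Create is an assumed reading of "no-op".

```go
package main

import "fmt"

// mapCache is a hypothetical minimal cache standing in for DemandCache.
type mapCache struct{ store map[string]string }

func (c *mapCache) Create(key, value string) error {
	c.store[key] = value
	return nil
}

func (c *mapCache) Get(key string) (string, bool) {
	v, ok := c.store[key]
	return v, ok
}

// safeCache delegates to the inner cache only while crdExists reports true.
type safeCache struct {
	inner     *mapCache
	crdExists func() bool // hypothetical existence check
}

func (s *safeCache) Create(key, value string) error {
	if !s.crdExists() {
		return nil // assumption: a missing CRD makes Create a silent no-op
	}
	return s.inner.Create(key, value)
}

func (s *safeCache) Get(key string) (string, bool) {
	if !s.crdExists() {
		return "", false
	}
	return s.inner.Get(key)
}

func (s *safeCache) CacheSize() int {
	if !s.crdExists() {
		return 0
	}
	return len(s.inner.store)
}

func main() {
	exists := false
	s := &safeCache{
		inner:     &mapCache{store: map[string]string{}},
		crdExists: func() bool { return exists },
	}
	_ = s.Create("spark/demand-a", "pending") // dropped: the CRD is not installed yet
	fmt.Println(s.CacheSize())
	exists = true
	_ = s.Create("spark/demand-a", "pending")
	fmt.Println(s.CacheSize())
}
```

Callers can use the wrapper unconditionally and do not need their own CRD checks, which keeps that concern in one place.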

type SoftReservation

type SoftReservation struct {
	// Executor pod name -> Reservation (only valid ones here)
	Reservations map[string]v1beta2.Reservation

	// Executor pod name -> Reservation valid or not
	// We keep a history of previously allocated extra executors, including ones that are now dead,
	// so that we never create a second Reservation for an executor that already had one. This prevents
	// a race between the handling of an executor's death event and that executor's scheduling event.
	Status map[string]bool
}

SoftReservation is an in-memory reservation for a particular spark application that keeps track of extra executors allocated over the min reservation count
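The role of the Status map is easiest to see when the two events arrive out of order. The sketch below assumes a simplified model and is not the store's implementation: `reservation` is a hypothetical stand-in for v1beta2.Reservation, and `add` and `remove` are invented helper names that loosely correspond to adding and removing an executor's reservation.

```go
package main

import "fmt"

// reservation is a hypothetical stand-in for v1beta2.Reservation.
type reservation struct{ Node string }

// softReservation mirrors the two documented maps.
type softReservation struct {
	Reservations map[string]reservation // live reservations only
	Status       map[string]bool        // every executor ever seen: true = live, false = dead
}

func newSoftReservation() *softReservation {
	return &softReservation{
		Reservations: map[string]reservation{},
		Status:       map[string]bool{},
	}
}

// add records a reservation unless the executor was seen before, live or dead.
// It reports whether a reservation was actually added.
func (sr *softReservation) add(executor string, r reservation) bool {
	if _, seen := sr.Status[executor]; seen {
		return false
	}
	sr.Reservations[executor] = r
	sr.Status[executor] = true
	return true
}

// remove drops the live reservation and leaves a dead marker behind in Status.
func (sr *softReservation) remove(executor string) {
	delete(sr.Reservations, executor)
	sr.Status[executor] = false
}

func main() {
	sr := newSoftReservation()
	// The death event for exec-1 is handled before its scheduling event.
	sr.remove("exec-1")
	added := sr.add("exec-1", reservation{Node: "node-a"})
	fmt.Println(added, len(sr.Reservations)) // prints "false 0"
}
```

Without the dead marker, the late scheduling event would create a reservation for an executor that no longer exists, and nothing would ever clean it up.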

type SoftReservationStore

type SoftReservationStore struct {
	// contains filtered or unexported fields
}

SoftReservationStore is an in-memory store that keeps track of soft reservations granted to extra executors for applications that support dynamic allocation

func NewSoftReservationStore

func NewSoftReservationStore(ctx context.Context, informer coreinformers.PodInformer) *SoftReservationStore

NewSoftReservationStore builds and returns a SoftReservationStore and instantiates the needed background informer event handlers to keep the store up to date.

func (*SoftReservationStore) AddReservationForPod

func (s *SoftReservationStore) AddReservationForPod(ctx context.Context, appID string, podName string, reservation v1beta2.Reservation) error

AddReservationForPod adds a reservation for an extra executor pod, attaching the associated node and resources to it. This is a noop if the reservation already exists.

func (*SoftReservationStore) CreateSoftReservationIfNotExists

func (s *SoftReservationStore) CreateSoftReservationIfNotExists(appID string)

CreateSoftReservationIfNotExists creates an internal empty soft reservation for a particular application. This is a noop if the reservation already exists.

func (*SoftReservationStore) ExecutorHasSoftReservation

func (s *SoftReservationStore) ExecutorHasSoftReservation(ctx context.Context, executor *v1.Pod) bool

ExecutorHasSoftReservation returns true when the passed executor pod currently has a SoftReservation, false otherwise.

func (*SoftReservationStore) GetActiveExtraExecutorCount

func (s *SoftReservationStore) GetActiveExtraExecutorCount() int

GetActiveExtraExecutorCount returns the total number of extra executors that are currently allocated and have SoftReservations in the SoftReservationStore (excluding the ones that are already marked as dead by the store)

func (*SoftReservationStore) GetAllSoftReservationsCopy

func (s *SoftReservationStore) GetAllSoftReservationsCopy() map[string]*SoftReservation

GetAllSoftReservationsCopy returns a copy of the internal store. As the name indicates, this method performs a deep copy, which is slow, so it should only be used where that cost is acceptable, such as in tests.
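A deep copy is needed here because the store maps application IDs to pointers whose fields are themselves maps, so copying only the outer map would still share state with the original. The following sketch shows the idea with simplified types. The `softReservation` type and `deepCopy` function are hypothetical, and plain strings replace v1beta2.Reservation values, which in practice would need their own copy logic.

```go
package main

import "fmt"

// softReservation is a simplified stand-in for SoftReservation, with string values
// in place of v1beta2.Reservation.
type softReservation struct {
	Reservations map[string]string
	Status       map[string]bool
}

// deepCopy clones the outer map, every softReservation, and both inner maps.
func deepCopy(store map[string]*softReservation) map[string]*softReservation {
	out := make(map[string]*softReservation, len(store))
	for appID, sr := range store {
		c := &softReservation{
			Reservations: make(map[string]string, len(sr.Reservations)),
			Status:       make(map[string]bool, len(sr.Status)),
		}
		for k, v := range sr.Reservations {
			c.Reservations[k] = v
		}
		for k, v := range sr.Status {
			c.Status[k] = v
		}
		out[appID] = c
	}
	return out
}

func main() {
	store := map[string]*softReservation{
		"app-1": {
			Reservations: map[string]string{"exec-1": "node-a"},
			Status:       map[string]bool{"exec-1": true},
		},
	}
	snapshot := deepCopy(store)
	snapshot["app-1"].Reservations["exec-2"] = "node-b" // does not touch the original
	fmt.Println(len(store["app-1"].Reservations), len(snapshot["app-1"].Reservations)) // prints "1 2"
}
```

The cost grows with the total number of executors across all applications, which is why a copy like this is better suited to tests than to hot paths.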

func (*SoftReservationStore) GetApplicationCount

func (s *SoftReservationStore) GetApplicationCount() int

GetApplicationCount returns the number of distinct applications that are tracked in the SoftReservationStore (whether they currently have extra executors or not)

func (*SoftReservationStore) GetExecutorSoftReservation

func (s *SoftReservationStore) GetExecutorSoftReservation(ctx context.Context, executor *v1.Pod) (*v1beta2.Reservation, bool)

GetExecutorSoftReservation returns the Reservation object associated with this executor if it exists.

func (*SoftReservationStore) GetSoftReservation

func (s *SoftReservationStore) GetSoftReservation(appID string) (*SoftReservation, bool)

GetSoftReservation returns a copy of the SoftReservation tied to an application if it exists; otherwise, the returned bool is false.

func (*SoftReservationStore) RemoveExecutorReservation

func (s *SoftReservationStore) RemoveExecutorReservation(appID string, executorName string)

RemoveExecutorReservation deletes the soft reservation bound to the passed executor if it exists.

func (*SoftReservationStore) UsedSoftReservationResources

func (s *SoftReservationStore) UsedSoftReservationResources() resources.NodeGroupResources

UsedSoftReservationResources returns SoftReservation usage by node.
