Documentation ¶
Index ¶
- func ObjectSafeParams(name, namespace string) svc1log.Param
- type AsyncClientMetrics
- func (acm *AsyncClientMetrics) MarkFailedToEnqueue(ctx context.Context, requestType store.RequestType)
- func (acm *AsyncClientMetrics) MarkMaxRetries(ctx context.Context, requestType store.RequestType)
- func (acm *AsyncClientMetrics) MarkRequest(ctx context.Context, requestType store.RequestType)
- func (acm *AsyncClientMetrics) MarkRequestRetry(ctx context.Context, requestType store.RequestType)
- type Client
- type DemandCache
- type ResourceReservationCache
- func (rrc *ResourceReservationCache) Create(rr *v1beta2.ResourceReservation) error
- func (rrc *ResourceReservationCache) Delete(namespace, name string)
- func (rrc *ResourceReservationCache) Get(namespace, name string) (*v1beta2.ResourceReservation, bool)
- func (rrc *ResourceReservationCache) InflightQueueLengths() []int
- func (rrc *ResourceReservationCache) List() []*v1beta2.ResourceReservation
- func (rrc *ResourceReservationCache) Run(ctx context.Context)
- func (rrc *ResourceReservationCache) Update(rr *v1beta2.ResourceReservation) error
- type SafeDemandCache
- func (sdc *SafeDemandCache) CRDExists() bool
- func (sdc *SafeDemandCache) CacheSize() int
- func (sdc *SafeDemandCache) Create(rr *demandapi.Demand) error
- func (sdc *SafeDemandCache) Delete(namespace, name string)
- func (sdc *SafeDemandCache) Get(namespace, name string) (*demandapi.Demand, bool)
- func (sdc *SafeDemandCache) InflightQueueLengths() []int
- func (sdc *SafeDemandCache) Run(ctx context.Context)
- type SoftReservation
- type SoftReservationStore
- func (s *SoftReservationStore) AddReservationForPod(ctx context.Context, appID string, podName string, ...) error
- func (s *SoftReservationStore) CreateSoftReservationIfNotExists(appID string)
- func (s *SoftReservationStore) ExecutorHasSoftReservation(ctx context.Context, executor *v1.Pod) bool
- func (s *SoftReservationStore) GetActiveExtraExecutorCount() int
- func (s *SoftReservationStore) GetAllSoftReservationsCopy() map[string]*SoftReservation
- func (s *SoftReservationStore) GetApplicationCount() int
- func (s *SoftReservationStore) GetExecutorSoftReservation(ctx context.Context, executor *v1.Pod) (*v1beta2.Reservation, bool)
- func (s *SoftReservationStore) GetSoftReservation(appID string) (*SoftReservation, bool)
- func (s *SoftReservationStore) RemoveExecutorReservation(appID string, executorName string)
- func (s *SoftReservationStore) UsedSoftReservationResources() resources.NodeGroupResources
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ObjectSafeParams ¶
ObjectSafeParams returns safe logging params for a name and a namespace
Types ¶
type AsyncClientMetrics ¶
type AsyncClientMetrics struct {
ObjectTypeTag string
}
AsyncClientMetrics emits metrics on retries and failures of the internal async client calls to the api server TODO: Move this to the metrics package after a refactor of that package to avoid cyclical imports
func (*AsyncClientMetrics) MarkFailedToEnqueue ¶
func (acm *AsyncClientMetrics) MarkFailedToEnqueue(ctx context.Context, requestType store.RequestType)
MarkFailedToEnqueue marks that a request is not going to be retried because the inflight requests queue was full
func (*AsyncClientMetrics) MarkMaxRetries ¶
func (acm *AsyncClientMetrics) MarkMaxRetries(ctx context.Context, requestType store.RequestType)
MarkMaxRetries marks that a request to the api server failed and is not going to be retried because it reached the maximum number of retries
func (*AsyncClientMetrics) MarkRequest ¶
func (acm *AsyncClientMetrics) MarkRequest(ctx context.Context, requestType store.RequestType)
MarkRequest marks that a request to the api server is being made
func (*AsyncClientMetrics) MarkRequestRetry ¶
func (acm *AsyncClientMetrics) MarkRequestRetry(ctx context.Context, requestType store.RequestType)
MarkRequestRetry marks that a request to the api server failed and is being retried
type Client ¶
type Client interface { Create(context.Context, metav1.Object) (metav1.Object, error) Update(context.Context, metav1.Object) (metav1.Object, error) Delete(ctx context.Context, namespace, name string) error Get(ctx context.Context, namespace, name string) (metav1.Object, error) }
Client is a generic representation of a kube client that acts on metav1.Object, asyncClient can be used for multiple k8s resources.
type DemandCache ¶
type DemandCache struct {
// contains filtered or unexported fields
}
DemandCache is a cache for demands. It assumes it is the only client that creates demands. Externally created demands will not be included in the cache. Deletions from all clients are valid and are reflected in the cache.
func NewDemandCache ¶
func NewDemandCache( ctx context.Context, demandInformer demandinformers.DemandInformer, demandKubeClient demandclient.ScalerV1alpha2Interface, asyncClientConfig config.AsyncClientConfig, ) (*DemandCache, error)
NewDemandCache creates a new cache
func (*DemandCache) Create ¶
func (dc *DemandCache) Create(rr *demandapi.Demand) error
Create enqueues a creation request and puts the object into the store
func (*DemandCache) Delete ¶
func (dc *DemandCache) Delete(namespace, name string)
Delete enqueues a deletion request and removes the object from store
func (*DemandCache) Get ¶
func (dc *DemandCache) Get(namespace, name string) (*demandapi.Demand, bool)
Get returns the object from the store if it exists
func (*DemandCache) Run ¶
func (dc *DemandCache) Run(ctx context.Context)
Run starts the async clients of this cache
type ResourceReservationCache ¶
type ResourceReservationCache struct {
// contains filtered or unexported fields
}
ResourceReservationCache is a cache for resource reservations. It assumes it is the only client that issues write requests for resource reservations. Any external update and creation will be ignored, but deletions will be reflected in the cache.
func NewResourceReservationCache ¶
func NewResourceReservationCache( ctx context.Context, resourceReservationInformer rrinformers.ResourceReservationInformer, resourceReservationKubeClient sparkschedulerclient.SparkschedulerV1beta2Interface, asyncClientConfig config.AsyncClientConfig, ) (*ResourceReservationCache, error)
NewResourceReservationCache creates a new cache.
func (*ResourceReservationCache) Create ¶
func (rrc *ResourceReservationCache) Create(rr *v1beta2.ResourceReservation) error
Create enqueues a creation request and puts the object into the store
func (*ResourceReservationCache) Delete ¶
func (rrc *ResourceReservationCache) Delete(namespace, name string)
Delete enqueues a deletion request and removes the object from store
func (*ResourceReservationCache) Get ¶
func (rrc *ResourceReservationCache) Get(namespace, name string) (*v1beta2.ResourceReservation, bool)
Get returns the object from the store if it exists
func (*ResourceReservationCache) InflightQueueLengths ¶
func (rrc *ResourceReservationCache) InflightQueueLengths() []int
InflightQueueLengths returns the number of items per request queue
func (*ResourceReservationCache) List ¶
func (rrc *ResourceReservationCache) List() []*v1beta2.ResourceReservation
List returns all known objects in the store
func (*ResourceReservationCache) Run ¶
func (rrc *ResourceReservationCache) Run(ctx context.Context)
Run starts the async clients of this cache
func (*ResourceReservationCache) Update ¶
func (rrc *ResourceReservationCache) Update(rr *v1beta2.ResourceReservation) error
Update enqueues an update request and updates the object in store
type SafeDemandCache ¶
type SafeDemandCache struct { *DemandCache // contains filtered or unexported fields }
SafeDemandCache wraps a demand cache by checking if the demand CRD exists before each operation
func NewSafeDemandCache ¶
func NewSafeDemandCache( lazyDemandInformer *crd.LazyDemandInformer, demandKubeClient demandclient.ScalerV1alpha2Interface, asyncClientConfig config.AsyncClientConfig, ) *SafeDemandCache
NewSafeDemandCache returns a demand cache which fallbacks to no-op if demand CRD doesn't exist
func (*SafeDemandCache) CRDExists ¶
func (sdc *SafeDemandCache) CRDExists() bool
CRDExists checks if the demand crd exists
func (*SafeDemandCache) CacheSize ¶
func (sdc *SafeDemandCache) CacheSize() int
CacheSize returns the number of elements in the cache
func (*SafeDemandCache) Create ¶
func (sdc *SafeDemandCache) Create(rr *demandapi.Demand) error
Create enqueues a creation request and puts the object into the store
func (*SafeDemandCache) Delete ¶
func (sdc *SafeDemandCache) Delete(namespace, name string)
Delete enqueues a deletion request and removes the object from store
func (*SafeDemandCache) Get ¶
func (sdc *SafeDemandCache) Get(namespace, name string) (*demandapi.Demand, bool)
Get returns the object from the store if it exists
func (*SafeDemandCache) InflightQueueLengths ¶
func (sdc *SafeDemandCache) InflightQueueLengths() []int
InflightQueueLengths returns the number of items per request queue
func (*SafeDemandCache) Run ¶
func (sdc *SafeDemandCache) Run(ctx context.Context)
Run starts the goroutine to check for the existence of the demand CRD
type SoftReservation ¶
type SoftReservation struct { // Executor pod name -> Reservation (only valid ones here) Reservations map[string]v1beta2.Reservation // Executor pod name -> Reservation valid or not // The reason for this is that we want to keep a history of previously allocated extra executors that we should not create a // Reservation for if we already have in the past even if the executor is now dead. This prevents the scenario where we have a race between // the executor death event handling and the executor's scheduling event. Status map[string]bool }
SoftReservation is an in-memory reservation for a particular spark application that keeps track of extra executors allocated over the min reservation count
type SoftReservationStore ¶
type SoftReservationStore struct {
// contains filtered or unexported fields
}
SoftReservationStore is an in-memory store that keeps track of soft reservations granted to extra executors for applications that support dynamic allocation
func NewSoftReservationStore ¶
func NewSoftReservationStore(ctx context.Context, informer coreinformers.PodInformer) *SoftReservationStore
NewSoftReservationStore builds and returns a SoftReservationStore and instantiates the needed background informer event handlers to keep the store up to date.
func (*SoftReservationStore) AddReservationForPod ¶
func (s *SoftReservationStore) AddReservationForPod(ctx context.Context, appID string, podName string, reservation v1beta2.Reservation) error
AddReservationForPod adds a reservation for an extra executor pod, attaching the associated node and resources to it. This is a noop if the reservation already exists.
func (*SoftReservationStore) CreateSoftReservationIfNotExists ¶
func (s *SoftReservationStore) CreateSoftReservationIfNotExists(appID string)
CreateSoftReservationIfNotExists creates an internal empty soft reservation for a particular application. This is a noop if the reservation already exists.
func (*SoftReservationStore) ExecutorHasSoftReservation ¶
func (s *SoftReservationStore) ExecutorHasSoftReservation(ctx context.Context, executor *v1.Pod) bool
ExecutorHasSoftReservation returns true when the passed executor pod currently has a SoftReservation, false otherwise.
func (*SoftReservationStore) GetActiveExtraExecutorCount ¶
func (s *SoftReservationStore) GetActiveExtraExecutorCount() int
GetActiveExtraExecutorCount returns the total number of extra executors that are currently allocated and have SoftReservations in the SoftReservationStore (excluding the ones that are already marked as dead by the store)
func (*SoftReservationStore) GetAllSoftReservationsCopy ¶
func (s *SoftReservationStore) GetAllSoftReservationsCopy() map[string]*SoftReservation
GetAllSoftReservationsCopy returns a copy of the internal store. As this indicates, this method does a deep copy which is slow and should only be used for purposes where this is acceptable such as tests.
func (*SoftReservationStore) GetApplicationCount ¶
func (s *SoftReservationStore) GetApplicationCount() int
GetApplicationCount returns the distinct number of applications that are tracked in the SoftReservationStore (whether they currently have extra executors or not)
func (*SoftReservationStore) GetExecutorSoftReservation ¶
func (s *SoftReservationStore) GetExecutorSoftReservation(ctx context.Context, executor *v1.Pod) (*v1beta2.Reservation, bool)
GetExecutorSoftReservation returns the Reservation object associated with this executor if it exists.
func (*SoftReservationStore) GetSoftReservation ¶
func (s *SoftReservationStore) GetSoftReservation(appID string) (*SoftReservation, bool)
GetSoftReservation returns a copy of the SoftReservation tied to an application if it exists (otherwise, bool returned will be false).
func (*SoftReservationStore) RemoveExecutorReservation ¶
func (s *SoftReservationStore) RemoveExecutorReservation(appID string, executorName string)
RemoveExecutorReservation deletes the soft reservation bound to the passed executor if it exists.
func (*SoftReservationStore) UsedSoftReservationResources ¶
func (s *SoftReservationStore) UsedSoftReservationResources() resources.NodeGroupResources
UsedSoftReservationResources returns SoftReservation usage by node.