scheduler

package
v0.0.0-...-b36b742 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2024 License: AGPL-3.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
// Label and annotation strings exported by the scheduler package.
// NOTE(review): where these are attached (e.g. worker jobs/pods) is not
// visible on this page — confirm against the call sites.
const (
	// Beta9WorkerLabelKey is the "role" label key; presumably paired with
	// Beta9WorkerLabelValue to mark worker resources — verify against usage.
	Beta9WorkerLabelKey         string = "run.beam.cloud/role"
	// Beta9WorkerLabelValue is the role value "worker".
	Beta9WorkerLabelValue       string = "worker"
	// Beta9WorkerJobPrefix is the string "worker"; by its name, a prefix for
	// worker job names — TODO confirm how the full name is assembled.
	Beta9WorkerJobPrefix        string = "worker"
	// Beta9WorkerLabelIDKey is the label key that carries a worker ID.
	Beta9WorkerLabelIDKey       string = "run.beam.cloud/worker-id"
	// Beta9WorkerLabelPoolNameKey is the label key that carries a worker pool name.
	Beta9WorkerLabelPoolNameKey string = "run.beam.cloud/worker-pool-name"
	// PrometheusPortKey is the "prometheus.io/port" key; presumably used as a
	// scrape-port annotation — verify against usage.
	PrometheusPortKey           string = "prometheus.io/port"
	// PrometheusScrapeKey is the "prometheus.io/scrape" key; presumably used as
	// a scrape-enable annotation — verify against usage.
	PrometheusScrapeKey         string = "prometheus.io/scrape"
)

Variables

View Source
// SchedulerConfig is a package-level value of the unexported config type,
// initialized with Version "0.1.0".
// NOTE(review): presumably the version reported by SchedulerService.GetVersion
// — confirm against that method's implementation.
var SchedulerConfig config = config{
	Version: "0.1.0",
}

Functions

func GenerateWorkerId

func GenerateWorkerId() string

func MonitorPoolSize

func MonitorPoolSize(wpc WorkerPoolController,
	workerPoolConfig *types.WorkerPoolConfig,
	workerRepo repository.WorkerRepository,
	providerRepo repository.ProviderRepository) error

func ParseCPU

func ParseCPU(cpu interface{}) (int64, error)

func ParseGPU

func ParseGPU(gpu interface{}) (uint, error)

func ParseGPUType

func ParseGPUType(gpu interface{}) (types.GPUType, error)

func ParseGpuCount

func ParseGpuCount(gpuCount interface{}) (int64, error)

func ParseMemory

func ParseMemory(memory interface{}) (int64, error)

Types

type ExternalWorkerPoolController

type ExternalWorkerPoolController struct {
	// contains filtered or unexported fields
}

func (*ExternalWorkerPoolController) AddWorker

func (wpc *ExternalWorkerPoolController) AddWorker(cpu int64, memory int64, gpuCount uint32) (*types.Worker, error)

func (*ExternalWorkerPoolController) AddWorkerToMachine

func (wpc *ExternalWorkerPoolController) AddWorkerToMachine(cpu int64, memory int64, gpuType string, gpuCount uint32, machineId string) (*types.Worker, error)

func (*ExternalWorkerPoolController) Context

func (*ExternalWorkerPoolController) FreeCapacity

func (wpc *ExternalWorkerPoolController) FreeCapacity() (*WorkerPoolCapacity, error)

func (*ExternalWorkerPoolController) IsPreemptable

func (wpc *ExternalWorkerPoolController) IsPreemptable() bool

func (*ExternalWorkerPoolController) Name

type LocalKubernetesWorkerPoolController

type LocalKubernetesWorkerPoolController struct {
	// contains filtered or unexported fields
}

A "local" k8s worker pool controller means the pool is local to the control plane / in-cluster

func (*LocalKubernetesWorkerPoolController) AddWorker

func (wpc *LocalKubernetesWorkerPoolController) AddWorker(cpu int64, memory int64, gpuCount uint32) (*types.Worker, error)

func (*LocalKubernetesWorkerPoolController) AddWorkerToMachine

func (wpc *LocalKubernetesWorkerPoolController) AddWorkerToMachine(cpu int64, memory int64, gpuType string, gpuCount uint32, machineId string) (*types.Worker, error)

func (*LocalKubernetesWorkerPoolController) Context

func (*LocalKubernetesWorkerPoolController) FreeCapacity

func (*LocalKubernetesWorkerPoolController) IsPreemptable

func (wpc *LocalKubernetesWorkerPoolController) IsPreemptable() bool

func (*LocalKubernetesWorkerPoolController) Name

type RequestBacklog

type RequestBacklog struct {
	// contains filtered or unexported fields
}

func NewRequestBacklog

func NewRequestBacklog(rdb *common.RedisClient) *RequestBacklog

func (*RequestBacklog) Len

func (rb *RequestBacklog) Len() int64

Gets the length of the sorted set

func (*RequestBacklog) Pop

Pops the oldest container request from the sorted set

func (*RequestBacklog) Push

func (rb *RequestBacklog) Push(request *types.ContainerRequest) error

Pushes a new container request into the sorted set

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(ctx context.Context, config types.AppConfig, redisClient *common.RedisClient, metricsRepo repo.MetricsRepository, backendRepo repo.BackendRepository, workspaceRepo repo.WorkspaceRepository, tailscale *network.Tailscale) (*Scheduler, error)

func (*Scheduler) Run

func (s *Scheduler) Run(request *types.ContainerRequest) error

func (*Scheduler) StartProcessingRequests

func (s *Scheduler) StartProcessingRequests()

func (*Scheduler) Stop

func (s *Scheduler) Stop(stopArgs *types.StopContainerArgs) error

type SchedulerMetrics

type SchedulerMetrics struct {
	// contains filtered or unexported fields
}

func NewSchedulerMetrics

func NewSchedulerMetrics(metricsRepo repository.MetricsRepository) SchedulerMetrics

func (*SchedulerMetrics) CounterIncContainerRequested

func (sm *SchedulerMetrics) CounterIncContainerRequested(request *types.ContainerRequest)

func (*SchedulerMetrics) CounterIncContainerScheduled

func (sm *SchedulerMetrics) CounterIncContainerScheduled(request *types.ContainerRequest)

type SchedulerService

// SchedulerService wraps a Scheduler and embeds pb.UnimplementedSchedulerServer.
// NOTE(review): the embedded type looks like a generated gRPC server stub, which
// would make this the gRPC-facing service for the scheduler — confirm against pb.
type SchedulerService struct {
	// Embedded server stub from the pb package.
	pb.UnimplementedSchedulerServer
	// Scheduler is the scheduler instance this service holds a pointer to.
	Scheduler *Scheduler
}

func NewSchedulerService

func NewSchedulerService(scheduler *Scheduler) (*SchedulerService, error)

func (*SchedulerService) GetVersion

func (wbs *SchedulerService) GetVersion(ctx context.Context, in *pb.VersionRequest) (*pb.VersionResponse, error)

Get Scheduler version

func (*SchedulerService) RunContainer

Run a container

type WorkerPool

type WorkerPool struct {
	// Name is the pool's name; presumably the same name passed to
	// WorkerPoolManager.SetPool / GetPool — verify against the manager.
	Name       string
	// Config is the pool's configuration (types.WorkerPoolConfig, not the
	// WorkerPoolConfig declared in this package).
	Config     types.WorkerPoolConfig
	// Controller is the WorkerPoolController associated with this pool.
	Controller WorkerPoolController
}

WorkerPool represents a pool of workers with specific configuration and controller.

type WorkerPoolCapacity

// WorkerPoolCapacity reports free CPU, memory, and GPU amounts for a worker
// pool; it is the result type of WorkerPoolController.FreeCapacity.
// NOTE(review): units are not shown here (CPU may be millicores and memory
// may be MiB) — confirm against the controllers before relying on them.
type WorkerPoolCapacity struct {
	// FreeCpu is the free CPU amount (unit not visible here — TODO confirm).
	FreeCpu    int64
	// FreeMemory is the free memory amount (unit not visible here — TODO confirm).
	FreeMemory int64
	// FreeGpu is the free GPU amount; note it is a uint, unlike the int64 fields above.
	FreeGpu    uint
}

type WorkerPoolConfig

// WorkerPoolConfig holds default CPU and memory request values for workers.
// It is a separate type from types.WorkerPoolConfig, which is what WorkerPool
// and the constructors in this package take.
// NOTE(review): units and where these defaults are consumed are not shown
// here — confirm against usage.
type WorkerPoolConfig struct {
	// DefaultWorkerCpuRequest is the default CPU request for a worker.
	DefaultWorkerCpuRequest    int64
	// DefaultWorkerMemoryRequest is the default memory request for a worker.
	DefaultWorkerMemoryRequest int64
}

type WorkerPoolController

// WorkerPoolController is the interface the scheduler package uses to work
// with a worker pool. NewExternalWorkerPoolController and
// NewLocalKubernetesWorkerPoolController both return this interface.
type WorkerPoolController interface {
	// AddWorker requests a worker with the given cpu, memory, and gpuCount and
	// returns it, or an error. Units are not visible here — TODO confirm.
	AddWorker(cpu int64, memory int64, gpuCount uint32) (*types.Worker, error)
	// AddWorkerToMachine is like AddWorker but also takes a gpuType and a
	// machineId; presumably it places the worker on that machine — verify
	// against the implementations.
	AddWorkerToMachine(cpu int64, memory int64, gpuType string, gpuCount uint32, machineId string) (*types.Worker, error)
	// Name returns a string; presumably the worker pool's name — verify.
	Name() string
	// FreeCapacity returns the pool's WorkerPoolCapacity, or an error.
	FreeCapacity() (*WorkerPoolCapacity, error)
	// Context returns the context.Context associated with the controller.
	Context() context.Context
	// IsPreemptable reports a boolean; by its name, whether workers in this
	// pool may be preempted — TODO confirm exact semantics.
	IsPreemptable() bool
}

func NewExternalWorkerPoolController

func NewExternalWorkerPoolController(
	ctx context.Context,
	config types.AppConfig,
	workerPoolName string,
	backendRepo repository.BackendRepository,
	workerRepo repository.WorkerRepository,
	providerRepo repository.ProviderRepository,
	tailscale *network.Tailscale,
	providerName *types.MachineProvider) (WorkerPoolController, error)

func NewLocalKubernetesWorkerPoolController

func NewLocalKubernetesWorkerPoolController(ctx context.Context, config types.AppConfig, workerPoolName string, workerRepo repository.WorkerRepository, providerRepo repository.ProviderRepository) (WorkerPoolController, error)

type WorkerPoolManager

type WorkerPoolManager struct {
	// contains filtered or unexported fields
}

WorkerPoolManager manages a collection of WorkerPools using a thread-safe SafeMap. It provides additional functionality to filter and retrieve pools based on specific criteria, such as GPU type.

func NewWorkerPoolManager

func NewWorkerPoolManager() *WorkerPoolManager

func (*WorkerPoolManager) GetPool

func (m *WorkerPoolManager) GetPool(name string) (*WorkerPool, bool)

GetPool retrieves a WorkerPool by its name.

func (*WorkerPoolManager) GetPoolByFilters

func (m *WorkerPoolManager) GetPoolByFilters(filters poolFilters) []*WorkerPool

GetPoolByFilters retrieves all WorkerPools that match the specified filters. It returns a slice of WorkerPools that match all specified filters (GPU type and preemptibility), sorted by WorkerPoolConfig.Priority in descending order.

func (*WorkerPoolManager) GetPoolByGPU

func (m *WorkerPoolManager) GetPoolByGPU(gpuType string) (*WorkerPool, bool)

GetPoolByGPU retrieves a WorkerPool by its GPU type. It returns the first matching WorkerPool found.

func (*WorkerPoolManager) GetPoolsByGPU

func (m *WorkerPoolManager) GetPoolsByGPU(gpuType string) []*WorkerPool

GetPoolsByGPU retrieves all WorkerPools by their GPU type. It returns a slice of matching WorkerPools. The results are sorted by WorkerPoolConfig.Priority in descending order.

func (*WorkerPoolManager) SetPool

func (m *WorkerPoolManager) SetPool(name string, config types.WorkerPoolConfig, controller WorkerPoolController)

type WorkerPoolSizer

type WorkerPoolSizer struct {
	// contains filtered or unexported fields
}

func NewWorkerPoolSizer

func NewWorkerPoolSizer(controller WorkerPoolController,
	workerPoolConfig *types.WorkerPoolConfig,
	workerRepo repository.WorkerRepository,
	providerRepo repository.ProviderRepository) (*WorkerPoolSizer, error)

func (*WorkerPoolSizer) Start

func (s *WorkerPoolSizer) Start()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL