Documentation ¶
Index ¶
- Constants
- Variables
- func GenerateWorkerId() string
- func MonitorPoolSize(wpc WorkerPoolController, workerPoolConfig *types.WorkerPoolConfig, ...) error
- func ParseCPU(cpu interface{}) (int64, error)
- func ParseGPU(gpu interface{}) (uint, error)
- func ParseGPUType(gpu interface{}) (types.GPUType, error)
- func ParseGpuCount(gpuCount interface{}) (int64, error)
- func ParseMemory(memory interface{}) (int64, error)
- type ExternalWorkerPoolController
- func (wpc *ExternalWorkerPoolController) AddWorker(cpu int64, memory int64, gpuCount uint32) (*types.Worker, error)
- func (wpc *ExternalWorkerPoolController) AddWorkerToMachine(cpu int64, memory int64, gpuType string, gpuCount uint32, machineId string) (*types.Worker, error)
- func (wpc *ExternalWorkerPoolController) Context() context.Context
- func (wpc *ExternalWorkerPoolController) FreeCapacity() (*WorkerPoolCapacity, error)
- func (wpc *ExternalWorkerPoolController) IsPreemptable() bool
- func (wpc *ExternalWorkerPoolController) Name() string
- type LocalKubernetesWorkerPoolController
- func (wpc *LocalKubernetesWorkerPoolController) AddWorker(cpu int64, memory int64, gpuCount uint32) (*types.Worker, error)
- func (wpc *LocalKubernetesWorkerPoolController) AddWorkerToMachine(cpu int64, memory int64, gpuType string, gpuCount uint32, machineId string) (*types.Worker, error)
- func (wpc *LocalKubernetesWorkerPoolController) Context() context.Context
- func (wpc *LocalKubernetesWorkerPoolController) FreeCapacity() (*WorkerPoolCapacity, error)
- func (wpc *LocalKubernetesWorkerPoolController) IsPreemptable() bool
- func (wpc *LocalKubernetesWorkerPoolController) Name() string
- type RequestBacklog
- type Scheduler
- type SchedulerMetrics
- type SchedulerService
- type WorkerPool
- type WorkerPoolCapacity
- type WorkerPoolConfig
- type WorkerPoolController
- type WorkerPoolManager
- func (m *WorkerPoolManager) GetPool(name string) (*WorkerPool, bool)
- func (m *WorkerPoolManager) GetPoolByFilters(filters poolFilters) []*WorkerPool
- func (m *WorkerPoolManager) GetPoolByGPU(gpuType string) (*WorkerPool, bool)
- func (m *WorkerPoolManager) GetPoolsByGPU(gpuType string) []*WorkerPool
- func (m *WorkerPoolManager) SetPool(name string, config types.WorkerPoolConfig, controller WorkerPoolController)
- type WorkerPoolSizer
Constants ¶
const ( Beta9WorkerLabelKey string = "run.beam.cloud/role" Beta9WorkerLabelValue string = "worker" Beta9WorkerJobPrefix string = "worker" Beta9WorkerLabelIDKey string = "run.beam.cloud/worker-id" Beta9WorkerLabelPoolNameKey string = "run.beam.cloud/worker-pool-name" PrometheusPortKey string = "prometheus.io/port" PrometheusScrapeKey string = "prometheus.io/scrape" )
Variables ¶
var SchedulerConfig config = config{
Version: "0.1.0",
}
Functions ¶
func GenerateWorkerId ¶
func GenerateWorkerId() string
func MonitorPoolSize ¶
func MonitorPoolSize(wpc WorkerPoolController, workerPoolConfig *types.WorkerPoolConfig, workerRepo repository.WorkerRepository, providerRepo repository.ProviderRepository) error
func ParseGPUType ¶
func ParseGpuCount ¶
func ParseMemory ¶
Types ¶
type ExternalWorkerPoolController ¶
type ExternalWorkerPoolController struct {
// contains filtered or unexported fields
}
func (*ExternalWorkerPoolController) AddWorkerToMachine ¶
func (*ExternalWorkerPoolController) Context ¶
func (wpc *ExternalWorkerPoolController) Context() context.Context
func (*ExternalWorkerPoolController) FreeCapacity ¶
func (wpc *ExternalWorkerPoolController) FreeCapacity() (*WorkerPoolCapacity, error)
func (*ExternalWorkerPoolController) IsPreemptable ¶
func (wpc *ExternalWorkerPoolController) IsPreemptable() bool
func (*ExternalWorkerPoolController) Name ¶
func (wpc *ExternalWorkerPoolController) Name() string
type LocalKubernetesWorkerPoolController ¶
type LocalKubernetesWorkerPoolController struct {
// contains filtered or unexported fields
}
A "local" Kubernetes worker pool controller is one whose pool is local to the control plane, i.e. in-cluster.
func (*LocalKubernetesWorkerPoolController) AddWorkerToMachine ¶
func (*LocalKubernetesWorkerPoolController) Context ¶
func (wpc *LocalKubernetesWorkerPoolController) Context() context.Context
func (*LocalKubernetesWorkerPoolController) FreeCapacity ¶
func (wpc *LocalKubernetesWorkerPoolController) FreeCapacity() (*WorkerPoolCapacity, error)
func (*LocalKubernetesWorkerPoolController) IsPreemptable ¶
func (wpc *LocalKubernetesWorkerPoolController) IsPreemptable() bool
func (*LocalKubernetesWorkerPoolController) Name ¶
func (wpc *LocalKubernetesWorkerPoolController) Name() string
type RequestBacklog ¶
type RequestBacklog struct {
// contains filtered or unexported fields
}
func NewRequestBacklog ¶
func NewRequestBacklog(rdb *common.RedisClient) *RequestBacklog
func (*RequestBacklog) Len ¶
func (rb *RequestBacklog) Len() int64
Len returns the length of the sorted set.
func (*RequestBacklog) Pop ¶
func (rb *RequestBacklog) Pop() (*types.ContainerRequest, error)
Pop pops the oldest container request from the sorted set and returns it.
func (*RequestBacklog) Push ¶
func (rb *RequestBacklog) Push(request *types.ContainerRequest) error
Push pushes a new container request into the sorted set.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶
func NewScheduler(ctx context.Context, config types.AppConfig, redisClient *common.RedisClient, metricsRepo repo.MetricsRepository, backendRepo repo.BackendRepository, workspaceRepo repo.WorkspaceRepository, tailscale *network.Tailscale) (*Scheduler, error)
func (*Scheduler) StartProcessingRequests ¶
func (s *Scheduler) StartProcessingRequests()
type SchedulerMetrics ¶
type SchedulerMetrics struct {
// contains filtered or unexported fields
}
func NewSchedulerMetrics ¶
func NewSchedulerMetrics(metricsRepo repository.MetricsRepository) SchedulerMetrics
func (*SchedulerMetrics) CounterIncContainerRequested ¶
func (sm *SchedulerMetrics) CounterIncContainerRequested(request *types.ContainerRequest)
func (*SchedulerMetrics) CounterIncContainerScheduled ¶
func (sm *SchedulerMetrics) CounterIncContainerScheduled(request *types.ContainerRequest)
type SchedulerService ¶
type SchedulerService struct { pb.UnimplementedSchedulerServer Scheduler *Scheduler }
func NewSchedulerService ¶
func NewSchedulerService(scheduler *Scheduler) (*SchedulerService, error)
func (*SchedulerService) GetVersion ¶
func (wbs *SchedulerService) GetVersion(ctx context.Context, in *pb.VersionRequest) (*pb.VersionResponse, error)
GetVersion returns the Scheduler version.
func (*SchedulerService) RunContainer ¶
func (wbs *SchedulerService) RunContainer(ctx context.Context, in *pb.RunContainerRequest) (*pb.RunContainerResponse, error)
RunContainer runs a container.
type WorkerPool ¶
type WorkerPool struct { Name string Config types.WorkerPoolConfig Controller WorkerPoolController }
WorkerPool represents a pool of workers with specific configuration and controller.
type WorkerPoolCapacity ¶
type WorkerPoolConfig ¶
type WorkerPoolController ¶
type WorkerPoolController interface { AddWorker(cpu int64, memory int64, gpuCount uint32) (*types.Worker, error) AddWorkerToMachine(cpu int64, memory int64, gpuType string, gpuCount uint32, machineId string) (*types.Worker, error) Name() string FreeCapacity() (*WorkerPoolCapacity, error) Context() context.Context IsPreemptable() bool }
func NewExternalWorkerPoolController ¶
func NewExternalWorkerPoolController( ctx context.Context, config types.AppConfig, workerPoolName string, backendRepo repository.BackendRepository, workerRepo repository.WorkerRepository, providerRepo repository.ProviderRepository, tailscale *network.Tailscale, providerName *types.MachineProvider) (WorkerPoolController, error)
func NewLocalKubernetesWorkerPoolController ¶
func NewLocalKubernetesWorkerPoolController(ctx context.Context, config types.AppConfig, workerPoolName string, workerRepo repository.WorkerRepository, providerRepo repository.ProviderRepository) (WorkerPoolController, error)
type WorkerPoolManager ¶
type WorkerPoolManager struct {
// contains filtered or unexported fields
}
WorkerPoolManager manages a collection of WorkerPools using a thread-safe SafeMap. It provides additional functionality to filter and retrieve pools based on specific criteria, such as GPU type.
func NewWorkerPoolManager ¶
func NewWorkerPoolManager() *WorkerPoolManager
func (*WorkerPoolManager) GetPool ¶
func (m *WorkerPoolManager) GetPool(name string) (*WorkerPool, bool)
GetPool retrieves a WorkerPool by its name.
func (*WorkerPoolManager) GetPoolByFilters ¶
func (m *WorkerPoolManager) GetPoolByFilters(filters poolFilters) []*WorkerPool
GetPoolByFilters retrieves all WorkerPools that match every specified filter (GPU type and preemptibility). The returned slice is sorted by WorkerPoolConfig.Priority in descending order.
func (*WorkerPoolManager) GetPoolByGPU ¶
func (m *WorkerPoolManager) GetPoolByGPU(gpuType string) (*WorkerPool, bool)
GetPoolByGPU retrieves a WorkerPool by its GPU type. It returns the first matching WorkerPool found.
func (*WorkerPoolManager) GetPoolsByGPU ¶
func (m *WorkerPoolManager) GetPoolsByGPU(gpuType string) []*WorkerPool
GetPoolsByGPU retrieves all WorkerPools by their GPU type. It returns a slice of matching WorkerPools. The results are sorted by WorkerPoolConfig.Priority in descending order.
func (*WorkerPoolManager) SetPool ¶
func (m *WorkerPoolManager) SetPool(name string, config types.WorkerPoolConfig, controller WorkerPoolController)
type WorkerPoolSizer ¶
type WorkerPoolSizer struct {
// contains filtered or unexported fields
}
func NewWorkerPoolSizer ¶
func NewWorkerPoolSizer(controller WorkerPoolController, workerPoolConfig *types.WorkerPoolConfig, workerRepo repository.WorkerRepository, providerRepo repository.ProviderRepository) (*WorkerPoolSizer, error)
func (*WorkerPoolSizer) Start ¶
func (s *WorkerPoolSizer) Start()