worker

package
v0.0.0-...-b36b742 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2024 License: AGPL-3.0 Imports: 65 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetPodAddr

func GetPodAddr() (string, error)

GetPodAddr gets the IP from the POD_IP env var. Returns an error if it fails to retrieve an IP.

func GetProcCurrentCPUMillicores

func GetProcCurrentCPUMillicores(cpuTime float64, prevCPUTime float64, systemCPUTime float64, prevSystemCPUTime float64) float64

func GetSystemCPU

func GetSystemCPU() (float64, error)

Types

type AWSCredentialProvider

type AWSCredentialProvider struct {
	CredentialProvider
	Region    string
	AccessKey string
	SecretKey string
}

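AWSCredentialProvider is a CredentialProvider that authenticates with AWS using a region, access key, and secret key.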

func (*AWSCredentialProvider) GetAuthString

func (p *AWSCredentialProvider) GetAuthString() (string, error)

func (*AWSCredentialProvider) GetAuthorizationToken

func (p *AWSCredentialProvider) GetAuthorizationToken() (string, error)

func (*AWSCredentialProvider) GetUsername

func (p *AWSCredentialProvider) GetUsername() string

type AssignedGpuDevices

type AssignedGpuDevices struct {
	// contains filtered or unexported fields
}

func (*AssignedGpuDevices) String

func (d *AssignedGpuDevices) String() string

type CedanaClient

type CedanaClient struct {
	// contains filtered or unexported fields
}

func NewCedanaClient

func NewCedanaClient(
	ctx context.Context,
	config types.Config,
	gpuEnabled bool,
) (*CedanaClient, error)

func (*CedanaClient) Checkpoint

func (c *CedanaClient) Checkpoint(ctx context.Context, containerId string) (string, error)

Checkpoint checkpoints a runc container and returns the path to the checkpoint.

func (*CedanaClient) Close

func (c *CedanaClient) Close()

func (*CedanaClient) DetailedHealthCheckWait

func (c *CedanaClient) DetailedHealthCheckWait(
	ctx context.Context,
) (*cedanaproto.DetailedHealthCheckResponse, error)

Perform a detailed health check of cedana C/R capabilities

func (*CedanaClient) Manage

func (c *CedanaClient) Manage(ctx context.Context, containerId string, gpuEnabled bool) error

Start managing a runc container

func (*CedanaClient) PrepareContainerSpec

func (c *CedanaClient) PrepareContainerSpec(spec *specs.Spec, containerId string, containerHostname string, gpuEnabled bool) error

PrepareContainerSpec updates the runc container spec to make available both the shared library and the shared memory used for communication.

func (*CedanaClient) Restore

func (c *CedanaClient) Restore(
	ctx context.Context,
	restoreOpts cedanaRestoreOpts,
	runcOpts *runc.CreateOpts,
) (*cedanaproto.ProcessState, error)

Restore restores a runc container. If a checkpoint path is provided, it is used as the checkpoint. If an empty path is provided, the latest checkpoint path from the DB is used.

type ConsoleWriter

type ConsoleWriter struct {
	// contains filtered or unexported fields
}

func NewConsoleWriter

func NewConsoleWriter(writer io.Writer) (*ConsoleWriter, error)

ConsoleWriter is a write-only implementation of runc's ConsoleSocket.

func (*ConsoleWriter) Path

func (w *ConsoleWriter) Path() string

type ContainerInstance

type ContainerInstance struct {
	Id           string
	StubId       string
	BundlePath   string
	Overlay      *common.ContainerOverlay
	Spec         *specs.Spec
	Err          error
	ExitCode     int
	Port         int
	OutputWriter *common.OutputWriter
	LogBuffer    *common.LogBuffer
	Request      *types.ContainerRequest
}

type ContainerLogMessage

type ContainerLogMessage struct {
	Level   string  `json:"level"`
	Message string  `json:"message"`
	TaskID  *string `json:"task_id"`
}

type ContainerLogger

type ContainerLogger struct {
	// contains filtered or unexported fields
}

func (*ContainerLogger) CaptureLogs

func (r *ContainerLogger) CaptureLogs(containerId string, logChan chan common.LogRecord) error

func (*ContainerLogger) Log

func (r *ContainerLogger) Log(containerId, stubId string, format string, args ...any) error

func (*ContainerLogger) Read

func (r *ContainerLogger) Read(containerId string, buffer []byte) (int64, error)

type ContainerMountManager

type ContainerMountManager struct {
	EagerCacheStubCode bool
	// contains filtered or unexported fields
}

func NewContainerMountManager

func NewContainerMountManager(config types.AppConfig) *ContainerMountManager

func (*ContainerMountManager) RemoveContainerMounts

func (c *ContainerMountManager) RemoveContainerMounts(containerId string)

RemoveContainerMounts removes all mounts for a container

func (*ContainerMountManager) SetupContainerMounts

func (c *ContainerMountManager) SetupContainerMounts(request *types.ContainerRequest) error

SetupContainerMounts initializes any external storage for a container

type ContainerNetworkManager

type ContainerNetworkManager struct {
	// contains filtered or unexported fields
}

func NewContainerNetworkManager

func NewContainerNetworkManager(ctx context.Context, workerId string, workerRepo repository.WorkerRepository, containerRepo repository.ContainerRepository, config types.AppConfig) (*ContainerNetworkManager, error)

func (*ContainerNetworkManager) ExposePort

func (m *ContainerNetworkManager) ExposePort(containerId string, hostPort, containerPort int) error

func (*ContainerNetworkManager) Setup

func (m *ContainerNetworkManager) Setup(containerId string, spec *specs.Spec) error

func (*ContainerNetworkManager) TearDown

func (m *ContainerNetworkManager) TearDown(containerId string) error

type ContainerNvidiaManager

type ContainerNvidiaManager struct {
	// contains filtered or unexported fields
}

func (*ContainerNvidiaManager) AssignGPUDevices

func (c *ContainerNvidiaManager) AssignGPUDevices(containerId string, gpuCount uint32) (*AssignedGpuDevices, error)

func (*ContainerNvidiaManager) InjectEnvVars

func (c *ContainerNvidiaManager) InjectEnvVars(env []string, options *ContainerOptions) ([]string, bool)

func (*ContainerNvidiaManager) InjectMounts

func (c *ContainerNvidiaManager) InjectMounts(mounts []specs.Mount) []specs.Mount

func (*ContainerNvidiaManager) UnassignGPUDevices

func (c *ContainerNvidiaManager) UnassignGPUDevices(containerId string)

type ContainerOptions

type ContainerOptions struct {
	BundlePath  string
	BindPort    int
	InitialSpec *specs.Spec
}

type CredentialProvider

type CredentialProvider interface {
	GetUsername() string
	GetAuthorizationToken() (string, error)
	GetAuthString() (string, error)
}

type DockerCredentialProvider

type DockerCredentialProvider struct {
	CredentialProvider
	Username string
	Password string
}

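DockerCredentialProvider is a CredentialProvider that authenticates with a Docker registry using a username and password.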

func (*DockerCredentialProvider) GetAuthString

func (p *DockerCredentialProvider) GetAuthString() (string, error)

func (*DockerCredentialProvider) GetAuthorizationToken

func (p *DockerCredentialProvider) GetAuthorizationToken() (string, error)

func (*DockerCredentialProvider) GetUsername

func (p *DockerCredentialProvider) GetUsername() string

type ExecWriter

type ExecWriter struct {
	// contains filtered or unexported fields
}

ExecWriter is a temporary writer that will be replaced once structured logging is merged.

func (*ExecWriter) Write

func (c *ExecWriter) Write(p []byte) (n int, err error)

type FileCacheManager

type FileCacheManager struct {
	// contains filtered or unexported fields
}

func NewFileCacheManager

func NewFileCacheManager(config types.AppConfig, client *blobcache.BlobCacheClient) *FileCacheManager

func (*FileCacheManager) CacheAvailable

func (cm *FileCacheManager) CacheAvailable() bool

CacheAvailable checks if the file cache is available

func (*FileCacheManager) CacheFilesInPath

func (cm *FileCacheManager) CacheFilesInPath(sourcePath string)

CacheFilesInPath caches files from a specified source path

func (*FileCacheManager) EnableVolumeCaching

func (cm *FileCacheManager) EnableVolumeCaching(workspaceName string, volumeCacheMap map[string]string, spec *specs.Spec) error

func (*FileCacheManager) GetClient

func (cm *FileCacheManager) GetClient() *blobcache.BlobCacheClient

GetClient returns the blobcache client instance.

type FileLock

type FileLock struct {
	// contains filtered or unexported fields
}

func NewFileLock

func NewFileLock(path string) *FileLock

func (*FileLock) Acquire

func (fl *FileLock) Acquire() error

func (*FileLock) Release

func (fl *FileLock) Release() error

type GPUInfoClient

type GPUInfoClient interface {
	AvailableGPUDevices() ([]int, error)
	GetGPUMemoryUsage(deviceIndex int) (GPUMemoryUsageStats, error)
}

type GPUInfoStat

type GPUInfoStat struct {
	MemoryUsed  uint64
	MemoryTotal uint64
}

type GPUManager

type GPUManager interface {
	AssignGPUDevices(containerId string, gpuCount uint32) (*AssignedGpuDevices, error)
	UnassignGPUDevices(containerId string)
	InjectEnvVars(env []string, options *ContainerOptions) ([]string, bool)
	InjectMounts(mounts []specs.Mount) []specs.Mount
}

func NewContainerNvidiaManager

func NewContainerNvidiaManager(gpuCount uint32) GPUManager

type GPUMemoryUsageStats

type GPUMemoryUsageStats struct {
	UsedCapacity  int64
	TotalCapacity int64
}

type ImageClient

type ImageClient struct {
	// contains filtered or unexported fields
}

func NewImageClient

func NewImageClient(config types.AppConfig, workerId string, workerRepo repository.WorkerRepository, fileCacheManager *FileCacheManager) (*ImageClient, error)

func (*ImageClient) Archive

func (c *ImageClient) Archive(ctx context.Context, bundlePath string, imageId string, progressChan chan int) error

Archive generates and uploads an archived version of the image for distribution.

func (*ImageClient) BuildAndArchiveImage

func (c *ImageClient) BuildAndArchiveImage(ctx context.Context, outputLogger *slog.Logger, dockerfile string, imageId string, buildCtxPath string) error

func (*ImageClient) Cleanup

func (c *ImageClient) Cleanup() error

func (*ImageClient) InspectAndVerifyImage

func (c *ImageClient) InspectAndVerifyImage(ctx context.Context, sourceImage string, creds string) error

func (*ImageClient) PullAndArchiveImage

func (c *ImageClient) PullAndArchiveImage(ctx context.Context, sourceImage string, imageId string, creds string) error

func (*ImageClient) PullLazy

func (c *ImageClient) PullLazy(request *types.ContainerRequest) error

type Mount

type Mount struct {
	Name      string `json:"name"`
	LocalPath string `json:"local_path"`
	MountPath string `json:"mount_path"`
	ReadOnly  bool   `json:"read_only"`
}

type NvidiaInfoClient

type NvidiaInfoClient struct{}

func (*NvidiaInfoClient) AvailableGPUDevices

func (c *NvidiaInfoClient) AvailableGPUDevices() ([]int, error)

func (*NvidiaInfoClient) GetGPUMemoryUsage

func (c *NvidiaInfoClient) GetGPUMemoryUsage(deviceIndex int) (GPUMemoryUsageStats, error)

GetGPUMemoryUsage retrieves the memory usage of a specific NVIDIA GPU. It returns the total and used memory in bytes.

type ProcUtil

type ProcUtil struct {
	procfs.Proc
}

func NewProcUtil

func NewProcUtil(pid int) (*ProcUtil, error)

type ProcessMonitor

type ProcessMonitor struct {
	// contains filtered or unexported fields
}

func NewProcessMonitor

func NewProcessMonitor(pid int, devices []specs.LinuxDeviceCgroup) *ProcessMonitor

func (*ProcessMonitor) GetStatistics

func (m *ProcessMonitor) GetStatistics() (*ProcessStats, error)

type ProcessStats

type ProcessStats struct {
	CPU    uint64 // in millicores
	Memory process.MemoryInfoStat
	IO     process.IOCountersStat
	NetIO  net.IOCountersStat
	GPU    GPUInfoStat
}

type RunCServer

type RunCServer struct {
	pb.UnimplementedRunCServiceServer
	// contains filtered or unexported fields
}

func NewRunCServer

func NewRunCServer(containerInstances *common.SafeMap[*ContainerInstance], imageClient *ImageClient) (*RunCServer, error)

func (*RunCServer) RunCArchive

func (*RunCServer) RunCExec

Execute an arbitrary command inside a running container.

func (*RunCServer) RunCKill

func (*RunCServer) RunCStatus

func (*RunCServer) RunCStreamLogs

func (*RunCServer) Start

func (s *RunCServer) Start() error

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker() (*Worker, error)

func (*Worker) IsCRIUAvailable

func (s *Worker) IsCRIUAvailable() bool

func (*Worker) Run

func (s *Worker) Run() error

func (*Worker) RunContainer

func (s *Worker) RunContainer(request *types.ContainerRequest) error

Spawn a single container and stream output to stdout/stderr

type WorkerMetrics

type WorkerMetrics struct {
	// contains filtered or unexported fields
}

func NewWorkerMetrics

func NewWorkerMetrics(
	ctx context.Context,
	workerId string,
	workerRepo repo.WorkerRepository,
	config types.MonitoringConfig,
) (*WorkerMetrics, error)

func (*WorkerMetrics) EmitContainerUsage

func (wm *WorkerMetrics) EmitContainerUsage(ctx context.Context, request *types.ContainerRequest)

EmitContainerUsage periodically sends metrics to track container duration.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL