worker

package
v0.0.0-...-8808a61 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 2, 2024 License: AGPL-3.0 Imports: 54 Imported by: 0

Documentation

Index

Constants

View Source
const ExitCodeSigterm = 143

Variables

This section is empty.

Functions

func AuthInterceptor

func AuthInterceptor(token string) grpc.UnaryClientInterceptor

func GetPodAddr

func GetPodAddr() (string, error)

GetPodAddr gets the IP from the POD_IP env var. Returns an error if it fails to retrieve an IP.

func GetProcCurrentCPUMillicores

func GetProcCurrentCPUMillicores(cpuTime float64, prevCPUTime float64, systemCPUTime float64, prevSystemCPUTime float64) float64

func GetRandomFreePort

func GetRandomFreePort() (int, error)

func GetSystemCPU

func GetSystemCPU() (float64, error)

Types

type AWSCredentialProvider

type AWSCredentialProvider struct {
	CredentialProvider
	Region    string
	AccessKey string
	SecretKey string
}

AWSCredentialProvider is the AWS auth provider; it implements the CredentialProvider methods.

func (*AWSCredentialProvider) GetAuthString

func (p *AWSCredentialProvider) GetAuthString() (string, error)

func (*AWSCredentialProvider) GetAuthorizationToken

func (p *AWSCredentialProvider) GetAuthorizationToken() (string, error)

func (*AWSCredentialProvider) GetUsername

func (p *AWSCredentialProvider) GetUsername() string

type AssignedGpuDevices

type AssignedGpuDevices struct {
	// contains filtered or unexported fields
}

func (*AssignedGpuDevices) String

func (d *AssignedGpuDevices) String() string

type CacheClient

type CacheClient struct {
	ServiceUrl   string
	ServiceToken string
	// contains filtered or unexported fields
}

func NewCacheClient

func NewCacheClient(serviceUrl, serviceToken string) (*CacheClient, error)

func (*CacheClient) Close

func (c *CacheClient) Close() error

func (*CacheClient) GetContent

func (c *CacheClient) GetContent(hash string, offset int64, length int64) ([]byte, error)

func (*CacheClient) StoreContent

func (c *CacheClient) StoreContent(chunks chan []byte) (string, error)

type ContainerCudaManager

type ContainerCudaManager struct {
	// contains filtered or unexported fields
}

func NewContainerCudaManager

func NewContainerCudaManager(gpuCount uint32) *ContainerCudaManager

func (*ContainerCudaManager) AssignGpuDevices

func (c *ContainerCudaManager) AssignGpuDevices(containerId string, gpuCount uint32) (*AssignedGpuDevices, error)

func (*ContainerCudaManager) InjectCudaEnvVars

func (c *ContainerCudaManager) InjectCudaEnvVars(env []string, options *ContainerOptions) ([]string, bool)

func (*ContainerCudaManager) InjectCudaMounts

func (c *ContainerCudaManager) InjectCudaMounts(mounts []specs.Mount) []specs.Mount

func (*ContainerCudaManager) UnassignGpuDevices

func (c *ContainerCudaManager) UnassignGpuDevices(containerId string)

type ContainerInstance

type ContainerInstance struct {
	Id           string
	StubId       string
	BundlePath   string
	Overlay      *common.ContainerOverlay
	Spec         *specs.Spec
	Err          error
	ExitCode     int
	Port         int
	OutputWriter *common.OutputWriter
	LogBuffer    *common.LogBuffer
}

type ContainerLogMessage

type ContainerLogMessage struct {
	Level   string  `json:"level"`
	Message string  `json:"message"`
	TaskID  *string `json:"task_id"`
}

type ContainerLogger

type ContainerLogger struct {
	// contains filtered or unexported fields
}

func (*ContainerLogger) CaptureLogs

func (r *ContainerLogger) CaptureLogs(containerId string, outputChan chan common.OutputMsg) error

func (*ContainerLogger) Read

func (r *ContainerLogger) Read(containerId string, buffer []byte) (int64, error)

type ContainerOptions

type ContainerOptions struct {
	BindPort    int
	InitialSpec *specs.Spec
}

type CredentialProvider

type CredentialProvider interface {
	GetUsername() string
	GetAuthorizationToken() (string, error)
	GetAuthString() (string, error)
}

type DockerCredentialProvider

type DockerCredentialProvider struct {
	CredentialProvider
	Username string
	Password string
}

DockerCredentialProvider is the Docker auth provider; it implements the CredentialProvider methods.

func (*DockerCredentialProvider) GetAuthString

func (p *DockerCredentialProvider) GetAuthString() (string, error)

func (*DockerCredentialProvider) GetAuthorizationToken

func (p *DockerCredentialProvider) GetAuthorizationToken() (string, error)

func (*DockerCredentialProvider) GetUsername

func (p *DockerCredentialProvider) GetUsername() string

type FileLock

type FileLock struct {
	// contains filtered or unexported fields
}

func NewFileLock

func NewFileLock(path string) *FileLock

func (*FileLock) Acquire

func (fl *FileLock) Acquire() error

func (*FileLock) Release

func (fl *FileLock) Release() error

type GpuMemoryUsageStats

type GpuMemoryUsageStats struct {
	UsedCapacity  int64
	TotalCapacity int64
}

func GetGpuMemoryUsage

func GetGpuMemoryUsage(deviceIndex int) (GpuMemoryUsageStats, error)

GetGpuMemoryUsage retrieves the memory usage of a specific NVIDIA GPU. It returns the total and used memory in bytes.

type ImageClient

type ImageClient struct {
	// contains filtered or unexported fields
}

func NewImageClient

func NewImageClient(config types.ImageServiceConfig, workerId string, workerRepo repository.WorkerRepository) (*ImageClient, error)

func (*ImageClient) Archive

func (c *ImageClient) Archive(ctx context.Context, bundlePath string, imageId string, progressChan chan int) error

Archive generates and uploads an archived version of the image for distribution.

func (*ImageClient) Cleanup

func (c *ImageClient) Cleanup() error

func (*ImageClient) PullAndArchiveImage

func (c *ImageClient) PullAndArchiveImage(ctx context.Context, sourceImage string, imageId string, creds *string) error

func (*ImageClient) PullLazy

func (c *ImageClient) PullLazy(imageId string) error

type Mount

type Mount struct {
	Name      string `json:"name"`
	LocalPath string `json:"local_path"`
	MountPath string `json:"mount_path"`
	ReadOnly  bool   `json:"read_only"`
}

type ProcUtil

type ProcUtil struct {
	procfs.Proc
}

func NewProcUtil

func NewProcUtil(pid int) (*ProcUtil, error)

type RunCServer

type RunCServer struct {
	pb.UnimplementedRunCServiceServer
	// contains filtered or unexported fields
}

func NewRunCServer

func NewRunCServer(containerInstances *common.SafeMap[*ContainerInstance], imageClient *ImageClient) (*RunCServer, error)

func (*RunCServer) RunCArchive

func (*RunCServer) RunCExec

Execute an arbitrary command inside a running container

func (*RunCServer) RunCKill

func (*RunCServer) RunCStatus

func (*RunCServer) RunCStreamLogs

func (*RunCServer) Start

func (s *RunCServer) Start() error

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker() (*Worker, error)

func (*Worker) Run

func (s *Worker) Run() error

func (*Worker) RunContainer

func (s *Worker) RunContainer(request *types.ContainerRequest) error

Spawn a single container and stream output to stdout/stderr

func (*Worker) SpawnAsync

func (s *Worker) SpawnAsync(request *types.ContainerRequest, bundlePath string, spec *specs.Spec) error

Invoke a runc container using a predefined config spec

type WorkerMetrics

type WorkerMetrics struct {
	// contains filtered or unexported fields
}

func NewWorkerMetrics

func NewWorkerMetrics(
	ctx context.Context,
	workerId string,
	workerRepo repo.WorkerRepository,
	config types.MonitoringConfig,
) (*WorkerMetrics, error)

func (*WorkerMetrics) EmitContainerUsage

func (wm *WorkerMetrics) EmitContainerUsage(request *types.ContainerRequest, done chan bool)

Periodically send metrics to track container duration

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL