manager

package
v0.0.0-...-1ba95a4
Warning

This package is not in the latest version of its module.

Published: Nov 21, 2024 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation


Constants

const (
	// JobRemovedEvent means that a job has been removed.
	JobRemovedEvent = JobStatusChangeType(iota + 1)
)
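Because the constant is declared with iota + 1, JobRemovedEvent has the value 1, and the zero value of JobStatusChangeType does not correspond to any declared event. The sketch below redeclares the type locally to show this; isKnownEventType is a hypothetical helper and not part of the package.

```go
package main

import "fmt"

// JobStatusChangeType mirrors the declaration in this package.
type JobStatusChangeType int32

const (
	// JobRemovedEvent means that a job has been removed.
	JobRemovedEvent = JobStatusChangeType(iota + 1)
)

// isKnownEventType is a hypothetical helper (not part of the package).
// It reports whether t is one of the declared event types.
func isKnownEventType(t JobStatusChangeType) bool {
	switch t {
	case JobRemovedEvent:
		return true
	default:
		return false
	}
}

func main() {
	var zero JobStatusChangeType
	fmt.Println(int32(JobRemovedEvent))            // 1
	fmt.Println(isKnownEventType(zero))            // false
	fmt.Println(isKnownEventType(JobRemovedEvent)) // true
}
```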

Variables

This section is empty.

Functions

This section is empty.

Types

type DefaultGCCoordinator

type DefaultGCCoordinator struct {
	// contains filtered or unexported fields
}

DefaultGCCoordinator implements interface GCCoordinator. It is responsible for triggering file resource garbage collection.

func NewGCCoordinator

func NewGCCoordinator(
	executorInfos ExecutorInfoProvider,
	jobInfos JobStatusProvider,
	metaClient pkgOrm.ResourceClient,
	gcRunner GCRunner,
) *DefaultGCCoordinator

NewGCCoordinator creates a new DefaultGCCoordinator.

func (*DefaultGCCoordinator) OnKeepAlive

func (c *DefaultGCCoordinator) OnKeepAlive(resourceID resModel.ResourceID, workerID frameModel.WorkerID)

OnKeepAlive is not implemented for now.

func (*DefaultGCCoordinator) Run

Run runs the DefaultGCCoordinator.

type DefaultGCRunner

type DefaultGCRunner struct {
	// contains filtered or unexported fields
}

DefaultGCRunner implements GCRunner.

func NewGCRunner

func NewGCRunner(
	resClient pkgOrm.ResourceClient,
	executorClients client.ExecutorGroup,
	config *resModel.Config,
) *DefaultGCRunner

NewGCRunner returns a new GCRunner.

func (*DefaultGCRunner) GCExecutors

func (r *DefaultGCRunner) GCExecutors(ctx context.Context, executors ...model.ExecutorID) error

GCExecutors is used to GC executors.

For local file resources, we need to remove the meta records, since an executor going offline means that its resources are already gone.

For s3 resources, we need to remove all temporary resources created by the offline executors to avoid resource leaks. Note that the dummy meta records created by such executors should be removed only after the temporary files are cleared.

FIXME: we should use a periodic background cleaning policy to avoid affecting normal services.
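The ordering described above can be sketched as a small planning function. This is only an illustration of the documented order of operations, not the package's implementation: resourceType, localFile, s3, and gcPlan are hypothetical names, and the steps are plain strings rather than real storage calls.

```go
package main

import "fmt"

// resourceType is a hypothetical stand-in for the resource kinds named above.
type resourceType string

const (
	localFile resourceType = "local"
	s3        resourceType = "s3"
)

// gcPlan lists, in order, the cleanup steps the documentation describes
// for one offline executor and one resource type.
func gcPlan(executor string, tp resourceType) []string {
	switch tp {
	case localFile:
		// The files vanished with the executor, so only metadata remains.
		return []string{"remove meta records of " + executor}
	case s3:
		// Temporary files first, then the dummy meta record.
		return []string{
			"remove temporary files of " + executor,
			"remove dummy meta record of " + executor,
		}
	default:
		return nil
	}
}

func main() {
	for _, step := range gcPlan("executor-1", s3) {
		fmt.Println(step)
	}
}
```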

func (*DefaultGCRunner) GCNotify

func (r *DefaultGCRunner) GCNotify()

GCNotify is used to ask GCRunner to GC the next resource immediately. It is used when we have just marked a resource as gc_pending.
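How DefaultGCRunner delivers this wake-up is hidden in unexported fields. A common Go idiom for an "act now" signal is a non-blocking send on a channel with capacity one, sketched below under that assumption; gcNotifier and its methods are hypothetical names.

```go
package main

import "fmt"

// gcNotifier is a hypothetical stand-in for the runner's unexported state.
type gcNotifier struct {
	ch chan struct{}
}

func newGCNotifier() *gcNotifier {
	return &gcNotifier{ch: make(chan struct{}, 1)}
}

// Notify never blocks. If a wake-up is already pending, the new one is
// coalesced with it, so callers can notify as often as they like.
func (n *gcNotifier) Notify() {
	select {
	case n.ch <- struct{}{}:
	default:
	}
}

// pending consumes a pending wake-up and reports whether there was one.
func (n *gcNotifier) pending() bool {
	select {
	case <-n.ch:
		return true
	default:
		return false
	}
}

func main() {
	n := newGCNotifier()
	n.Notify()
	n.Notify() // coalesced with the first notification
	fmt.Println(n.pending()) // true
	fmt.Println(n.pending()) // false
}
```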

func (*DefaultGCRunner) Run

func (r *DefaultGCRunner) Run(ctx context.Context) error

Run runs the GCRunner. It blocks until ctx is canceled.
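Since Run blocks, callers typically start it in its own goroutine and stop it by canceling the context. The sketch below shows that pattern with a stand-in runner; fakeRunner and runUntilCanceled are hypothetical, and returning ctx.Err() on cancellation is an assumption about the real runner's behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runner matches the Run method of the GCRunner interface.
type runner interface {
	Run(ctx context.Context) error
}

// fakeRunner is a hypothetical stand-in that blocks until ctx is canceled.
type fakeRunner struct{}

func (fakeRunner) Run(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err() // assumption: the real runner may return a different error
}

// runUntilCanceled starts r in a goroutine, cancels it after d, and
// waits for Run to return.
func runUntilCanceled(r runner, d time.Duration) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	errCh := make(chan error, 1)
	go func() { errCh <- r.Run(ctx) }()

	time.Sleep(d)
	cancel()
	return <-errCh
}

// demoCanceled reports whether the stand-in runner stopped due to cancellation.
func demoCanceled() bool {
	err := runUntilCanceled(fakeRunner{}, 10*time.Millisecond)
	return errors.Is(err, context.Canceled)
}

func main() {
	fmt.Println(demoCanceled())
}
```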

type ExecutorInfoProvider

type ExecutorInfoProvider interface {
	HasExecutor(executorID string) bool

	// WatchExecutors returns a snapshot of all online executors plus
	// a stream of events describing changes that happen to the executors
	// after the snapshot is taken.
	WatchExecutors(ctx context.Context) (
		snap map[model.ExecutorID]string, stream *notifier.Receiver[model.ExecutorStatusChange], err error,
	)
}

ExecutorInfoProvider describes an object that maintains a list of all executors.
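A consumer of WatchExecutors usually seeds its local view from the snapshot and then applies the streamed changes in order. The sketch below illustrates that flow with stand-in types: a plain channel replaces *notifier.Receiver, and the fields of statusChange are assumptions rather than the actual layout of model.ExecutorStatusChange.

```go
package main

import "fmt"

type executorID string

// changeType and statusChange are hypothetical stand-ins for
// model.ExecutorStatusChange; the real field names may differ.
type changeType int

const (
	executorOnline changeType = iota + 1
	executorOffline
)

type statusChange struct {
	ID   executorID
	Tp   changeType
	Addr string
}

// applyChanges copies the snapshot and folds in events until the
// stream is closed, returning the resulting set of online executors.
func applyChanges(snap map[executorID]string, events <-chan statusChange) map[executorID]string {
	online := make(map[executorID]string, len(snap))
	for id, addr := range snap {
		online[id] = addr
	}
	for ev := range events {
		switch ev.Tp {
		case executorOnline:
			online[ev.ID] = ev.Addr
		case executorOffline:
			delete(online, ev.ID)
		}
	}
	return online
}

// demoWatch builds a snapshot with one executor, then streams one
// online event and one offline event.
func demoWatch() map[executorID]string {
	snap := map[executorID]string{"executor-1": "127.0.0.1:10241"}
	events := make(chan statusChange, 2)
	events <- statusChange{ID: "executor-2", Tp: executorOnline, Addr: "127.0.0.1:10242"}
	events <- statusChange{ID: "executor-1", Tp: executorOffline}
	close(events)
	return applyChanges(snap, events)
}

func main() {
	fmt.Println(demoWatch())
}
```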

type GCCoordinator

type GCCoordinator interface {
	Run(ctx context.Context) error
	OnKeepAlive(resourceID resModel.ResourceID, workerID frameModel.WorkerID)
}

GCCoordinator describes an object responsible for triggering file resource garbage collection.

type GCRunner

type GCRunner interface {
	Run(ctx context.Context) error
	GCNotify()
	GCExecutors(context.Context, ...model.ExecutorID) error
}

GCRunner performs the actual GC operations.

type JobStatus

type JobStatus = frameModel.MasterState

JobStatus describes a job's status.

type JobStatusChangeEvent

type JobStatusChangeEvent struct {
	EventType JobStatusChangeType
	JobID     frameModel.MasterID
}

JobStatusChangeEvent is an event denoting that a job's status has changed.

type JobStatusChangeType

type JobStatusChangeType int32

JobStatusChangeType describes the type of job status changes.

type JobStatusProvider

type JobStatusProvider interface {
	// GetJobStatuses returns the status of all jobs that are
	// not deleted.
	GetJobStatuses(ctx context.Context) (JobStatusesSnapshot, error)

	// WatchJobStatuses returns a snapshot of all job statuses, plus a
	// receiver for the status changes that happen after the snapshot.
	WatchJobStatuses(
		ctx context.Context,
	) (JobStatusesSnapshot, *notifier.Receiver[JobStatusChangeEvent], error)
}

JobStatusProvider describes an object that can be queried on the status of jobs.

type JobStatusesSnapshot

type JobStatusesSnapshot = map[frameModel.MasterID]JobStatus

JobStatusesSnapshot describes the statuses of all jobs at some time point.
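A snapshot from WatchJobStatuses goes stale as events arrive, so a consumer can keep it current by applying each JobStatusChangeEvent. The sketch below does this for JobRemovedEvent using lowercase local stand-ins; treating the job ID as a string and the status as an int32 are assumptions made to keep the example self-contained.

```go
package main

import "fmt"

// Local stand-ins for the package's types. The underlying types of
// jobID and jobStatus are assumptions.
type (
	jobID               = string
	jobStatus           int32
	jobStatusChangeType int32
	jobStatusesSnapshot = map[jobID]jobStatus
)

const (
	jobRemovedEvent = jobStatusChangeType(iota + 1)
)

type jobStatusChangeEvent struct {
	EventType jobStatusChangeType
	JobID     jobID
}

// applyJobEvents copies the snapshot and deletes every job for which
// a removal event was received.
func applyJobEvents(snap jobStatusesSnapshot, events []jobStatusChangeEvent) jobStatusesSnapshot {
	live := make(jobStatusesSnapshot, len(snap))
	for id, st := range snap {
		live[id] = st
	}
	for _, ev := range events {
		if ev.EventType == jobRemovedEvent {
			delete(live, ev.JobID)
		}
	}
	return live
}

// demoJobs starts with two jobs and removes one of them.
func demoJobs() jobStatusesSnapshot {
	snap := jobStatusesSnapshot{"job-1": 1, "job-2": 1}
	events := []jobStatusChangeEvent{{EventType: jobRemovedEvent, JobID: "job-1"}}
	return applyJobEvents(snap, events)
}

func main() {
	fmt.Println(demoJobs())
}
```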

type MockClient

type MockClient struct {
	mock.Mock
}

MockClient is a mock implementation of the ResourceManagerClient interface.

func NewMockClient

func NewMockClient() *MockClient

NewMockClient creates a MockClient

func (*MockClient) CreateResource

func (m *MockClient) CreateResource(ctx context.Context, req *pb.CreateResourceRequest) error

CreateResource implements ResourceManagerClient.CreateResource

func (*MockClient) QueryResource

QueryResource implements ResourceManagerClient.QueryResource

func (*MockClient) RemoveResource

func (m *MockClient) RemoveResource(ctx context.Context, req *pb.RemoveResourceRequest) error

RemoveResource implements ResourceManagerClient.RemoveResource

type MockExecutorInfoProvider

type MockExecutorInfoProvider struct {
	// contains filtered or unexported fields
}

MockExecutorInfoProvider implements the ExecutorInfoProvider interface.

func NewMockExecutorInfoProvider

func NewMockExecutorInfoProvider() *MockExecutorInfoProvider

NewMockExecutorInfoProvider creates a new MockExecutorInfoProvider instance

func (*MockExecutorInfoProvider) AddExecutor

func (p *MockExecutorInfoProvider) AddExecutor(executorID string, addr string)

AddExecutor adds an executor to the mock.

func (*MockExecutorInfoProvider) HasExecutor

func (p *MockExecutorInfoProvider) HasExecutor(executorID string) bool

HasExecutor returns whether the mock contains the given executor.

func (*MockExecutorInfoProvider) ListExecutors

func (p *MockExecutorInfoProvider) ListExecutors() (ret []string)

ListExecutors lists all executors.

func (*MockExecutorInfoProvider) RemoveExecutor

func (p *MockExecutorInfoProvider) RemoveExecutor(executorID string)

RemoveExecutor removes an executor from the mock.

func (*MockExecutorInfoProvider) WatchExecutors

WatchExecutors implements ExecutorInfoProvider.WatchExecutors.

type MockGCRunner

type MockGCRunner struct {
	GCRunner
	// contains filtered or unexported fields
}

MockGCRunner implements the interface GCRunner.

func NewMockGCRunner

func NewMockGCRunner(resClient pkgOrm.ResourceClient) *MockGCRunner

NewMockGCRunner returns a new MockGCRunner.

func (*MockGCRunner) GCNotify

func (n *MockGCRunner) GCNotify()

GCNotify pushes a new notification to the internal channel so it can be waited on by WaitNotify().

func (*MockGCRunner) WaitNotify

func (n *MockGCRunner) WaitNotify(t *testing.T, timeout time.Duration)

WaitNotify waits for a pending notification with timeout.
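Waiting for a notification with a deadline is usually a select over the notification channel and time.After. The sketch below shows the idea with a stand-in type that returns a bool instead of failing a *testing.T; mockNotifier and its fields are hypothetical, and the buffered channel is an assumption about the mock's internals.

```go
package main

import (
	"fmt"
	"time"
)

// mockNotifier is a hypothetical stand-in for the mock's unexported state.
type mockNotifier struct {
	ch chan struct{}
}

func newMockNotifier() *mockNotifier {
	return &mockNotifier{ch: make(chan struct{}, 1)}
}

// GCNotify records a notification without blocking the caller.
func (n *mockNotifier) GCNotify() {
	select {
	case n.ch <- struct{}{}:
	default:
	}
}

// wait reports whether a notification arrived before the timeout.
func (n *mockNotifier) wait(timeout time.Duration) bool {
	select {
	case <-n.ch:
		return true
	case <-time.After(timeout):
		return false
	}
}

// demoNotified notifies first, so wait returns immediately.
func demoNotified() bool {
	n := newMockNotifier()
	n.GCNotify()
	return n.wait(50 * time.Millisecond)
}

// demoTimedOut never notifies, so wait gives up after the timeout.
func demoTimedOut() bool {
	n := newMockNotifier()
	return !n.wait(10 * time.Millisecond)
}

func main() {
	fmt.Println(demoNotified(), demoTimedOut())
}
```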

type MockJobStatusProvider

type MockJobStatusProvider struct {
	// contains filtered or unexported fields
}

MockJobStatusProvider implements the JobStatusProvider interface.

func NewMockJobStatusProvider

func NewMockJobStatusProvider() *MockJobStatusProvider

NewMockJobStatusProvider returns a new instance of MockJobStatusProvider.

func (*MockJobStatusProvider) GetJobStatuses

func (jp *MockJobStatusProvider) GetJobStatuses(ctx context.Context) (JobStatusesSnapshot, error)

GetJobStatuses implements the interface JobStatusProvider.

func (*MockJobStatusProvider) RemoveJob

func (jp *MockJobStatusProvider) RemoveJob(jobID frameModel.MasterID)

RemoveJob removes a job from the mock.

func (*MockJobStatusProvider) SetJobStatus

func (jp *MockJobStatusProvider) SetJobStatus(jobID frameModel.MasterID, status JobStatus)

SetJobStatus upserts the status of a given job.

func (*MockJobStatusProvider) WatchJobStatuses

WatchJobStatuses implements the interface JobStatusProvider.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service implements pb.ResourceManagerServer

func NewService

func NewService(metaclient pkgOrm.Client) *Service

NewService creates a new external resource manager service.

func (*Service) CreateResource

func (s *Service) CreateResource(
	ctx context.Context,
	request *pb.CreateResourceRequest,
) (*pb.CreateResourceResponse, error)

CreateResource implements pb.ResourceManagerServer.CreateResource.

func (*Service) GetPlacementConstraint

func (s *Service) GetPlacementConstraint(
	ctx context.Context,
	resourceKey resModel.ResourceKey,
) (resModel.ExecutorID, bool, error)

GetPlacementConstraint is called by the Scheduler to determine whether a resource that the worker relies on requires the worker to run on a specific executor. Returns:

(1) A local resource is required and the resource exists: (executorID, true, nil)
(2) A local resource is required but the resource is not found: ("", false, ErrResourceDoesNotExist)
(3) No placement constraint is needed: ("", false, nil)
(4) Other errors: ("", false, err)
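A caller has to distinguish all four outcomes, and the order of the checks matters because cases (2) and (4) both carry a non-nil error. The sketch below classifies the returned triple; describeConstraint is a hypothetical helper, errResourceDoesNotExist is a local stand-in for ErrResourceDoesNotExist, and matching it with errors.Is is an assumption about how the real error should be compared.

```go
package main

import (
	"errors"
	"fmt"
)

// errResourceDoesNotExist is a local stand-in for ErrResourceDoesNotExist.
var errResourceDoesNotExist = errors.New("resource does not exist")

// describeConstraint is a hypothetical helper that maps the values
// returned by GetPlacementConstraint to the four documented cases.
func describeConstraint(executorID string, hasConstraint bool, err error) string {
	switch {
	case errors.Is(err, errResourceDoesNotExist):
		return "resource not found" // case (2)
	case err != nil:
		return "lookup failed: " + err.Error() // case (4)
	case hasConstraint:
		return "must run on executor " + executorID // case (1)
	default:
		return "no placement constraint" // case (3)
	}
}

func main() {
	fmt.Println(describeConstraint("executor-1", true, nil))
	fmt.Println(describeConstraint("", false, errResourceDoesNotExist))
	fmt.Println(describeConstraint("", false, nil))
	fmt.Println(describeConstraint("", false, errors.New("boom")))
}
```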

func (*Service) QueryResource

func (s *Service) QueryResource(
	ctx context.Context,
	request *pb.QueryResourceRequest,
) (*pb.QueryResourceResponse, error)

QueryResource implements pb.ResourceManagerServer.QueryResource.

func (*Service) RemoveResource

func (s *Service) RemoveResource(
	ctx context.Context,
	request *pb.RemoveResourceRequest,
) (*pb.RemoveResourceResponse, error)

RemoveResource implements pb.ResourceManagerServer.RemoveResource.
