Documentation ¶
Index ¶
- Constants
- type DefaultGCCoordinator
- type DefaultGCRunner
- type ExecutorInfoProvider
- type GCCoordinator
- type GCRunner
- type JobStatus
- type JobStatusChangeEvent
- type JobStatusChangeType
- type JobStatusProvider
- type JobStatusesSnapshot
- type MockClient
- func (m *MockClient) CreateResource(ctx context.Context, req *pb.CreateResourceRequest) error
- func (m *MockClient) QueryResource(ctx context.Context, req *pb.QueryResourceRequest) (*pb.QueryResourceResponse, error)
- func (m *MockClient) RemoveResource(ctx context.Context, req *pb.RemoveResourceRequest) error
- type MockExecutorInfoProvider
- func (p *MockExecutorInfoProvider) AddExecutor(executorID string, addr string)
- func (p *MockExecutorInfoProvider) HasExecutor(executorID string) bool
- func (p *MockExecutorInfoProvider) ListExecutors() (ret []string)
- func (p *MockExecutorInfoProvider) RemoveExecutor(executorID string)
- func (p *MockExecutorInfoProvider) WatchExecutors(ctx context.Context) (map[model.ExecutorID]string, *notifier.Receiver[model.ExecutorStatusChange], ...)
- type MockGCRunner
- type MockJobStatusProvider
- func (jp *MockJobStatusProvider) GetJobStatuses(ctx context.Context) (JobStatusesSnapshot, error)
- func (jp *MockJobStatusProvider) RemoveJob(jobID frameModel.MasterID)
- func (jp *MockJobStatusProvider) SetJobStatus(jobID frameModel.MasterID, status JobStatus)
- func (jp *MockJobStatusProvider) WatchJobStatuses(ctx context.Context) (JobStatusesSnapshot, *notifier.Receiver[JobStatusChangeEvent], error)
- type Service
- func (s *Service) CreateResource(ctx context.Context, request *pb.CreateResourceRequest) (*pb.CreateResourceResponse, error)
- func (s *Service) GetPlacementConstraint(ctx context.Context, resourceKey resModel.ResourceKey) (resModel.ExecutorID, bool, error)
- func (s *Service) QueryResource(ctx context.Context, request *pb.QueryResourceRequest) (*pb.QueryResourceResponse, error)
- func (s *Service) RemoveResource(ctx context.Context, request *pb.RemoveResourceRequest) (*pb.RemoveResourceResponse, error)
Constants ¶
const ( // JobRemovedEvent means that a job has been removed. JobRemovedEvent = JobStatusChangeType(iota + 1) )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DefaultGCCoordinator ¶
type DefaultGCCoordinator struct {
// contains filtered or unexported fields
}
DefaultGCCoordinator implements interface GCCoordinator. It is responsible for triggering file resource garbage collection.
func NewGCCoordinator ¶
func NewGCCoordinator( executorInfos ExecutorInfoProvider, jobInfos JobStatusProvider, metaClient pkgOrm.ResourceClient, gcRunner GCRunner, ) *DefaultGCCoordinator
NewGCCoordinator creates a new DefaultGCCoordinator.
func (*DefaultGCCoordinator) OnKeepAlive ¶
func (c *DefaultGCCoordinator) OnKeepAlive(resourceID resModel.ResourceID, workerID frameModel.WorkerID)
OnKeepAlive is not implemented for now.
type DefaultGCRunner ¶
type DefaultGCRunner struct {
// contains filtered or unexported fields
}
DefaultGCRunner implements GCRunner.
func NewGCRunner ¶
func NewGCRunner( resClient pkgOrm.ResourceClient, executorClients client.ExecutorGroup, config *resModel.Config, ) *DefaultGCRunner
NewGCRunner returns a new GCRunner.
func (*DefaultGCRunner) GCExecutors ¶
func (r *DefaultGCRunner) GCExecutors(ctx context.Context, executors ...model.ExecutorID) error
GCExecutors is used to GC executors.
For local file resource, we need to remove the meta record, since executors going offline means that the resource is already gone.
For s3 resource, we need to remove all temporary resources created by the offline exectors to avoid resource leaks. Note dummy meta record created by such exectors should be removed after temporary files are cleared.
FIXME: we should a periodic background cleaning policy to avoid affecting normal services.
func (*DefaultGCRunner) GCNotify ¶
func (r *DefaultGCRunner) GCNotify()
GCNotify is used to ask GCRunner to GC the next resource immediately. It is used when we have just marked a resource as gc_pending.
type ExecutorInfoProvider ¶
type ExecutorInfoProvider interface { HasExecutor(executorID string) bool // WatchExecutors returns a snapshot of all online executors plus // a stream of events describing changes that happen to the executors // after the snapshot is taken. WatchExecutors(ctx context.Context) ( snap map[model.ExecutorID]string, stream *notifier.Receiver[model.ExecutorStatusChange], err error, ) }
ExecutorInfoProvider describes an object that maintains a list of all executors
type GCCoordinator ¶
type GCCoordinator interface { Run(ctx context.Context) error OnKeepAlive(resourceID resModel.ResourceID, workerID frameModel.WorkerID) }
GCCoordinator describes an object responsible for triggering file resource garbage collection.
type GCRunner ¶
type GCRunner interface { Run(ctx context.Context) error GCNotify() GCExecutors(context.Context, ...model.ExecutorID) error }
GCRunner perform the actual GC operations.
type JobStatusChangeEvent ¶
type JobStatusChangeEvent struct { EventType JobStatusChangeType JobID frameModel.MasterID }
JobStatusChangeEvent is an event denoting a job status has changed.
type JobStatusChangeType ¶
type JobStatusChangeType int32
JobStatusChangeType describes the type of job status changes.
type JobStatusProvider ¶
type JobStatusProvider interface { // GetJobStatuses returns the status of all jobs that are // not deleted. GetJobStatuses(ctx context.Context) (JobStatusesSnapshot, error) // WatchJobStatuses listens on all job status changes followed by // a snapshot. WatchJobStatuses( ctx context.Context, ) (JobStatusesSnapshot, *notifier.Receiver[JobStatusChangeEvent], error) }
JobStatusProvider describes an object that can be queried on the status of jobs.
type JobStatusesSnapshot ¶
type JobStatusesSnapshot = map[frameModel.MasterID]JobStatus
JobStatusesSnapshot describes the statuses of all jobs at some time point.
type MockClient ¶
MockClient is a mock implementation of ResourceManagerClient interface
func (*MockClient) CreateResource ¶
func (m *MockClient) CreateResource(ctx context.Context, req *pb.CreateResourceRequest) error
CreateResource implements ResourceManagerClient.CreateResource
func (*MockClient) QueryResource ¶
func (m *MockClient) QueryResource(ctx context.Context, req *pb.QueryResourceRequest) (*pb.QueryResourceResponse, error)
QueryResource implements ResourceManagerClient.QueryResource
func (*MockClient) RemoveResource ¶
func (m *MockClient) RemoveResource(ctx context.Context, req *pb.RemoveResourceRequest) error
RemoveResource implements ResourceManagerClient.RemoveResource
type MockExecutorInfoProvider ¶
type MockExecutorInfoProvider struct {
// contains filtered or unexported fields
}
MockExecutorInfoProvider implements ExecutorInfoProvider interface
func NewMockExecutorInfoProvider ¶
func NewMockExecutorInfoProvider() *MockExecutorInfoProvider
NewMockExecutorInfoProvider creates a new MockExecutorInfoProvider instance
func (*MockExecutorInfoProvider) AddExecutor ¶
func (p *MockExecutorInfoProvider) AddExecutor(executorID string, addr string)
AddExecutor adds an executor to the mock.
func (*MockExecutorInfoProvider) HasExecutor ¶
func (p *MockExecutorInfoProvider) HasExecutor(executorID string) bool
HasExecutor returns whether the mock contains the given executor.
func (*MockExecutorInfoProvider) ListExecutors ¶
func (p *MockExecutorInfoProvider) ListExecutors() (ret []string)
ListExecutors lists all executors.
func (*MockExecutorInfoProvider) RemoveExecutor ¶
func (p *MockExecutorInfoProvider) RemoveExecutor(executorID string)
RemoveExecutor removes an executor from the mock.
func (*MockExecutorInfoProvider) WatchExecutors ¶
func (p *MockExecutorInfoProvider) WatchExecutors( ctx context.Context, ) (map[model.ExecutorID]string, *notifier.Receiver[model.ExecutorStatusChange], error)
WatchExecutors implements ExecutorManager.WatchExecutors
type MockGCRunner ¶
type MockGCRunner struct { GCRunner // contains filtered or unexported fields }
MockGCRunner implements the interface GCRunner.
func NewMockGCRunner ¶
func NewMockGCRunner(resClient pkgOrm.ResourceClient) *MockGCRunner
NewMockGCRunner returns a new MockGCNotifier
func (*MockGCRunner) GCNotify ¶
func (n *MockGCRunner) GCNotify()
GCNotify pushes a new notification to the internal channel so it can be waited on by WaitNotify().
func (*MockGCRunner) WaitNotify ¶
func (n *MockGCRunner) WaitNotify(t *testing.T, timeout time.Duration)
WaitNotify waits for a pending notification with timeout.
type MockJobStatusProvider ¶
type MockJobStatusProvider struct {
// contains filtered or unexported fields
}
MockJobStatusProvider implements ExecutorManager interface
func NewMockJobStatusProvider ¶
func NewMockJobStatusProvider() *MockJobStatusProvider
NewMockJobStatusProvider returns a new instance of MockJobStatusProvider.
func (*MockJobStatusProvider) GetJobStatuses ¶
func (jp *MockJobStatusProvider) GetJobStatuses(ctx context.Context) (JobStatusesSnapshot, error)
GetJobStatuses implements the interface JobStatusProvider.
func (*MockJobStatusProvider) RemoveJob ¶
func (jp *MockJobStatusProvider) RemoveJob(jobID frameModel.MasterID)
RemoveJob removes a job from the mock.
func (*MockJobStatusProvider) SetJobStatus ¶
func (jp *MockJobStatusProvider) SetJobStatus(jobID frameModel.MasterID, status JobStatus)
SetJobStatus upserts the status of a given job.
func (*MockJobStatusProvider) WatchJobStatuses ¶
func (jp *MockJobStatusProvider) WatchJobStatuses( ctx context.Context, ) (JobStatusesSnapshot, *notifier.Receiver[JobStatusChangeEvent], error)
WatchJobStatuses implements the interface JobStatusProvider.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service implements pb.ResourceManagerServer
func NewService ¶
NewService creates a new externalresource manage service
func (*Service) CreateResource ¶
func (s *Service) CreateResource( ctx context.Context, request *pb.CreateResourceRequest, ) (*pb.CreateResourceResponse, error)
CreateResource implements ResourceManagerClient.CreateResource
func (*Service) GetPlacementConstraint ¶
func (s *Service) GetPlacementConstraint( ctx context.Context, resourceKey resModel.ResourceKey, ) (resModel.ExecutorID, bool, error)
GetPlacementConstraint is called by the Scheduler to determine whether a resource the worker relies on requires the worker running on a specific executor. Returns: (1) A local resource is required and the resource exists: (executorID, true, nil) (2) A local resource is required but the resource is not found: ("", false, ErrResourceDoesNotExist) (3) No placement constraint is needed: ("", false, nil) (4) Other errors: ("", false, err)
func (*Service) QueryResource ¶
func (s *Service) QueryResource( ctx context.Context, request *pb.QueryResourceRequest, ) (*pb.QueryResourceResponse, error)
QueryResource implements ResourceManagerClient.QueryResource
func (*Service) RemoveResource ¶
func (s *Service) RemoveResource( ctx context.Context, request *pb.RemoveResourceRequest, ) (*pb.RemoveResourceResponse, error)
RemoveResource implements ResourceManagerClient.RemoveResource