Documentation ¶
Index ¶
- Constants
- func NewDefaultBusinessMetaConfig() *metaModel.StoreConfig
- type Config
- type Executor
- type ExecutorManager
- type ExecutorManagerImpl
- func (e *ExecutorManagerImpl) AllocateNewExec(ctx context.Context, req *pb.RegisterExecutorRequest) (*ormModel.Executor, error)
- func (e *ExecutorManagerImpl) ExecutorCount(status model.ExecutorStatus) (count int)
- func (e *ExecutorManagerImpl) GetAddr(executorID model.ExecutorID) (string, bool)
- func (e *ExecutorManagerImpl) GetExecutorInfos() map[model.ExecutorID]schedModel.ExecutorInfo
- func (e *ExecutorManagerImpl) HandleHeartbeat(req *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error)
- func (e *ExecutorManagerImpl) HasExecutor(executorID string) bool
- func (e *ExecutorManagerImpl) ListExecutors() []*ormModel.Executor
- func (e *ExecutorManagerImpl) Run(ctx context.Context) error
- func (e *ExecutorManagerImpl) WatchExecutors(ctx context.Context) (snap map[model.ExecutorID]string, ...)
- type JobFsm
- func (fsm *JobFsm) IterPendingJobs(dispatchJobFn func(job *frameModel.MasterMeta) (string, error)) error
- func (fsm *JobFsm) IterWaitAckJobs(dispatchJobFn func(job *frameModel.MasterMeta) (string, error)) error
- func (fsm *JobFsm) JobCount(status pb.Job_State) int
- func (fsm *JobFsm) JobDispatchFailed(worker framework.WorkerHandle) error
- func (fsm *JobFsm) JobDispatched(job *frameModel.MasterMeta, addFromFailover bool)
- func (fsm *JobFsm) JobOffline(worker framework.WorkerHandle, needFailover bool)
- func (fsm *JobFsm) JobOnline(worker framework.WorkerHandle) error
- func (fsm *JobFsm) QueryJob(jobID frameModel.MasterID) *JobHolder
- func (fsm *JobFsm) QueryOnlineJob(jobID frameModel.MasterID) *JobHolder
- type JobHolder
- type JobManager
- type JobManagerImpl
- func (jm *JobManagerImpl) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (*pb.Job, error)
- func (jm *JobManagerImpl) CloseImpl(ctx context.Context)
- func (jm *JobManagerImpl) CreateJob(ctx context.Context, req *pb.CreateJobRequest) (*pb.Job, error)
- func (jm *JobManagerImpl) DeleteJob(ctx context.Context, req *pb.DeleteJobRequest) (*emptypb.Empty, error)
- func (jm *JobManagerImpl) GetJob(ctx context.Context, req *pb.GetJobRequest) (*pb.Job, error)
- func (jm *JobManagerImpl) GetJobMasterForwardAddress(ctx context.Context, jobID string) (string, error)
- func (jm *JobManagerImpl) GetJobStatuses(ctx context.Context) (map[frameModel.MasterID]frameModel.MasterState, error)
- func (jm *JobManagerImpl) InitImpl(ctx context.Context) error
- func (jm *JobManagerImpl) ListJobs(ctx context.Context, req *pb.ListJobsRequest) (*pb.ListJobsResponse, error)
- func (jm *JobManagerImpl) OnMasterRecovered(ctx context.Context) error
- func (jm *JobManagerImpl) OnWorkerDispatched(worker framework.WorkerHandle, result error) error
- func (jm *JobManagerImpl) OnWorkerMessage(worker framework.WorkerHandle, topic p2p.Topic, message interface{}) error
- func (jm *JobManagerImpl) OnWorkerOffline(worker framework.WorkerHandle, reason error) error
- func (jm *JobManagerImpl) OnWorkerOnline(worker framework.WorkerHandle) error
- func (jm *JobManagerImpl) OnWorkerStatusUpdated(worker framework.WorkerHandle, newStatus *frameModel.WorkerStatus) error
- func (jm *JobManagerImpl) SendCancelJobMessage(ctx context.Context, jobID string) error
- func (jm *JobManagerImpl) StopImpl(ctx context.Context)
- func (jm *JobManagerImpl) Tick(ctx context.Context) error
- func (jm *JobManagerImpl) UpdateJobStatus(ctx context.Context, jobID frameModel.MasterID, errMsg string, ...) error
- func (jm *JobManagerImpl) WatchJobStatuses(ctx context.Context) (resManager.JobStatusesSnapshot, ...)
- type JobStats
- type MetaStoreManager
- type Server
- func (s *Server) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (*pb.Job, error)
- func (s *Server) CreateJob(ctx context.Context, req *pb.CreateJobRequest) (*pb.Job, error)
- func (s *Server) DeleteJob(ctx context.Context, req *pb.DeleteJobRequest) (*emptypb.Empty, error)
- func (s *Server) GetJob(ctx context.Context, req *pb.GetJobRequest) (*pb.Job, error)
- func (s *Server) GetLeader(_ context.Context, _ *pb.GetLeaderRequest) (*pb.GetLeaderResponse, error)
- func (s *Server) Heartbeat(ctx context.Context, req *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error)
- func (s *Server) ListExecutors(ctx context.Context, req *pb.ListExecutorsRequest) (*pb.ListExecutorsResponse, error)
- func (s *Server) ListJobs(ctx context.Context, req *pb.ListJobsRequest) (*pb.ListJobsResponse, error)
- func (s *Server) ListMasters(ctx context.Context, req *pb.ListMastersRequest) (*pb.ListMastersResponse, error)
- func (s *Server) QueryMetaStore(ctx context.Context, req *pb.QueryMetaStoreRequest) (*pb.QueryMetaStoreResponse, error)
- func (s *Server) QueryStorageConfig(ctx context.Context, req *pb.QueryStorageConfigRequest) (*pb.QueryStorageConfigResponse, error)
- func (s *Server) RegisterExecutor(ctx context.Context, req *pb.RegisterExecutorRequest) (*pb.Executor, error)
- func (s *Server) ResignLeader(ctx context.Context, _ *pb.ResignLeaderRequest) (*emptypb.Empty, error)
- func (s *Server) Run(ctx context.Context) error
- func (s *Server) ScheduleTask(ctx context.Context, req *pb.ScheduleTaskRequest) (*pb.ScheduleTaskResponse, error)
Constants ¶
const ( // DefaultBusinessMetaID is the ID for default business metastore DefaultBusinessMetaID = "_default" // FrameMetaID is the ID for frame metastore FrameMetaID = "_root" )
Variables ¶
This section is empty.
Functions ¶
func NewDefaultBusinessMetaConfig ¶
func NewDefaultBusinessMetaConfig() *metaModel.StoreConfig
NewDefaultBusinessMetaConfig returns the default business metastore config.
Types ¶
type Config ¶
type Config struct { LogConf logutil.Config `toml:"log" json:"log"` Name string `toml:"name" json:"name"` Addr string `toml:"addr" json:"addr"` AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"` FrameworkMeta *metaModel.StoreConfig `toml:"framework-meta" json:"framework-meta"` BusinessMeta *metaModel.StoreConfig `toml:"business-meta" json:"business-meta"` KeepAliveTTLStr string `toml:"keepalive-ttl" json:"keepalive-ttl"` // time interval string to check executor aliveness KeepAliveIntervalStr string `toml:"keepalive-interval" json:"keepalive-interval"` KeepAliveTTL time.Duration `toml:"-" json:"-"` KeepAliveInterval time.Duration `toml:"-" json:"-"` Storage resModel.Config `toml:"storage" json:"storage"` Security *security.Credential `toml:"security" json:"security"` JobBackoff *jobop.BackoffConfig `toml:"job-backoff" json:"job-backoff"` }
Config is the configuration for server-master.
func GetDefaultMasterConfig ¶
func GetDefaultMasterConfig() *Config
GetDefaultMasterConfig returns a default master config
func (*Config) AdjustAndValidate ¶
AdjustAndValidate validates and adjusts the master configuration
type ExecutorManager ¶
type ExecutorManager interface { HandleHeartbeat(req *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error) AllocateNewExec(ctx context.Context, req *pb.RegisterExecutorRequest) (*ormModel.Executor, error) // ExecutorCount returns executor count with given status ExecutorCount(status model.ExecutorStatus) int HasExecutor(executorID string) bool ListExecutors() []*ormModel.Executor GetAddr(executorID model.ExecutorID) (string, bool) Run(ctx context.Context) error // WatchExecutors returns a snapshot of all online executors plus // a stream of events describing changes that happen to the executors // after the snapshot is taken. WatchExecutors(ctx context.Context) ( snap map[model.ExecutorID]string, stream *notifier.Receiver[model.ExecutorStatusChange], err error, ) // GetExecutorInfos implements the interface scheduler.executorInfoProvider. // It is called by the scheduler as the source of truth for executors. GetExecutorInfos() map[model.ExecutorID]schedModel.ExecutorInfo }
ExecutorManager defines an interface to manage all executors.
type ExecutorManagerImpl ¶
type ExecutorManagerImpl struct {
// contains filtered or unexported fields
}
ExecutorManagerImpl holds all the executors' info, including liveness, status, resource usage.
func NewExecutorManagerImpl ¶
func NewExecutorManagerImpl(metaClient orm.Client, initHeartbeatTTL, keepAliveInterval time.Duration) *ExecutorManagerImpl
NewExecutorManagerImpl creates a new ExecutorManagerImpl instance
func (*ExecutorManagerImpl) AllocateNewExec ¶
func (e *ExecutorManagerImpl) AllocateNewExec(ctx context.Context, req *pb.RegisterExecutorRequest) (*ormModel.Executor, error)
AllocateNewExec allocates new executor info for a given RegisterExecutorRequest and then registers the executor.
func (*ExecutorManagerImpl) ExecutorCount ¶
func (e *ExecutorManagerImpl) ExecutorCount(status model.ExecutorStatus) (count int)
ExecutorCount implements ExecutorManager.ExecutorCount
func (*ExecutorManagerImpl) GetAddr ¶
func (e *ExecutorManagerImpl) GetAddr(executorID model.ExecutorID) (string, bool)
GetAddr implements ExecutorManager.GetAddr
func (*ExecutorManagerImpl) GetExecutorInfos ¶
func (e *ExecutorManagerImpl) GetExecutorInfos() map[model.ExecutorID]schedModel.ExecutorInfo
GetExecutorInfos returns, for each executor, the information that is needed for scheduling.
func (*ExecutorManagerImpl) HandleHeartbeat ¶
func (e *ExecutorManagerImpl) HandleHeartbeat(req *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error)
HandleHeartbeat implements the pb interface: it handles a HeartbeatRequest and returns a HeartbeatResponse.
func (*ExecutorManagerImpl) HasExecutor ¶
func (e *ExecutorManagerImpl) HasExecutor(executorID string) bool
HasExecutor implements ExecutorManager.HasExecutor
func (*ExecutorManagerImpl) ListExecutors ¶
func (e *ExecutorManagerImpl) ListExecutors() []*ormModel.Executor
ListExecutors implements ExecutorManager.ListExecutors
func (*ExecutorManagerImpl) Run ¶
func (e *ExecutorManagerImpl) Run(ctx context.Context) error
Run implements ExecutorManager.Run
func (*ExecutorManagerImpl) WatchExecutors ¶
func (e *ExecutorManagerImpl) WatchExecutors( ctx context.Context, ) (snap map[model.ExecutorID]string, receiver *notifier.Receiver[model.ExecutorStatusChange], err error)
WatchExecutors implements the ExecutorManager interface.
type JobFsm ¶
type JobFsm struct { JobStats // contains filtered or unexported fields }
JobFsm manages the state of all job masters; the job master states form a finite-state machine. Note that job masters managed in JobFsm are in running status, which means the job is neither terminated nor finished.
,-------.                   ,-------.            ,-------.       ,--------.
|WaitAck|                   |Online |            |Pending|       |Finished|
`---+---'                   `---+---'            `---+---'       `---+----'
    |                           |                    |               |
    | Master                    |                    |               |
    |   .OnWorkerOnline         |                    |               |
    |-------------------------->|                    |               |
    |                           |                    |               |
    |                           | Master             |               |
    |                           |   .OnWorkerOffline |               |
    |                           |   (failover)       |               |
    |                           |------------------->|               |
    |                           |                    |               |
    |                           | Master             |               |
    |                           |   .OnWorkerOffline |               |
    |                           |   (finish)         |               |
    |                           |----------------------------------->|
    |                           |                    |               |
    | Master                    |                    |               |
    |   .OnWorkerOffline        |                    |               |
    |   (failover)              |                    |               |
    |----------------------------------------------->|               |
    |                           |                    |               |
    | Master                    |                    |               |
    |   .OnWorkerOffline        |                    |               |
    |   (finish)                |                    |               |
    |--------------------------------------------------------------->|
    |                           |                    |               |
    |                           | Master             |               |
    |                           |   .CreateWorker    |               |
    |<-----------------------------------------------|               |
    |                           |                    |               |
    | Master                    |                    |               |
    |   .OnWorkerDispatched     |                    |               |
    |   (with error)            |                    |               |
    |----------------------------------------------->|               |
    |                           |                    |               |
    |                           |                    |               |
func (*JobFsm) IterPendingJobs ¶
func (fsm *JobFsm) IterPendingJobs(dispatchJobFn func(job *frameModel.MasterMeta) (string, error)) error
IterPendingJobs iterates over all pending jobs and dispatches them again (via CreateWorker).
func (*JobFsm) IterWaitAckJobs ¶
func (fsm *JobFsm) IterWaitAckJobs(dispatchJobFn func(job *frameModel.MasterMeta) (string, error)) error
IterWaitAckJobs iterates over jobs in the WaitAck state and fails over those that were added from a failover.
func (*JobFsm) JobDispatchFailed ¶
func (fsm *JobFsm) JobDispatchFailed(worker framework.WorkerHandle) error
JobDispatchFailed is called when a job dispatch fails
func (*JobFsm) JobDispatched ¶
func (fsm *JobFsm) JobDispatched(job *frameModel.MasterMeta, addFromFailover bool)
JobDispatched is called when a job is first created or when the server master fails over.
func (*JobFsm) JobOffline ¶
func (fsm *JobFsm) JobOffline(worker framework.WorkerHandle, needFailover bool)
JobOffline is called when a job encounters an error or finishes.
func (*JobFsm) JobOnline ¶
func (fsm *JobFsm) JobOnline(worker framework.WorkerHandle) error
JobOnline is called when the first heartbeat of a job is received.
func (*JobFsm) QueryJob ¶
func (fsm *JobFsm) QueryJob(jobID frameModel.MasterID) *JobHolder
QueryJob queries the job with the given jobID and returns its JobHolder.
func (*JobFsm) QueryOnlineJob ¶
func (fsm *JobFsm) QueryOnlineJob(jobID frameModel.MasterID) *JobHolder
QueryOnlineJob queries a job from the online job list.
type JobHolder ¶
type JobHolder struct {
// contains filtered or unexported fields
}
JobHolder holds job meta and worker handle for a job.
func (*JobHolder) MasterMeta ¶
func (jh *JobHolder) MasterMeta() *frameModel.MasterMeta
MasterMeta returns master meta of the job.
func (*JobHolder) WorkerHandle ¶
func (jh *JobHolder) WorkerHandle() framework.WorkerHandle
WorkerHandle returns the job master's worker handle.
type JobManager ¶
type JobManager interface { framework.Master JobStats pb.JobManagerServer GetJobMasterForwardAddress(ctx context.Context, jobID string) (string, error) GetJobStatuses(ctx context.Context) (map[frameModel.MasterID]frameModel.MasterState, error) UpdateJobStatus(ctx context.Context, jobID frameModel.MasterID, errMsg string, code frameModel.MasterState) error WatchJobStatuses( ctx context.Context, ) (resManager.JobStatusesSnapshot, *notifier.Receiver[resManager.JobStatusChangeEvent], error) }
JobManager defines the manager of job masters.
type JobManagerImpl ¶
type JobManagerImpl struct { framework.BaseMaster *JobFsm JobBackoffMgr jobop.BackoffManager // contains filtered or unexported fields }
JobManagerImpl is a special job master that manages all the job masters and notifies them of offline executors. Worker state transitions: - A new job is submitted and its job master is created successfully; the job is then added to `waitAckJobs`. - When a worker-online event is received, the job moves from `waitAckJobs` to `onlineJobs`. - When a worker-offline event is received, the job moves from `onlineJobs` to `pendingJobs`. - Tick checks `pendingJobs` periodically and reschedules the jobs.
func NewJobManagerImpl ¶
func NewJobManagerImpl( dctx *dcontext.Context, id frameModel.MasterID, backoffConfig *jobop.BackoffConfig, ) (*JobManagerImpl, error)
NewJobManagerImpl creates a new JobManagerImpl instance
func (*JobManagerImpl) CancelJob ¶
func (jm *JobManagerImpl) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (*pb.Job, error)
CancelJob implements JobManagerServer.CancelJob.
func (*JobManagerImpl) CloseImpl ¶
func (jm *JobManagerImpl) CloseImpl(ctx context.Context)
CloseImpl implements frame.MasterImpl.CloseImpl
func (*JobManagerImpl) CreateJob ¶
func (jm *JobManagerImpl) CreateJob(ctx context.Context, req *pb.CreateJobRequest) (*pb.Job, error)
CreateJob implements JobManagerServer.CreateJob.
func (*JobManagerImpl) DeleteJob ¶
func (jm *JobManagerImpl) DeleteJob(ctx context.Context, req *pb.DeleteJobRequest) (*emptypb.Empty, error)
DeleteJob implements JobManagerServer.DeleteJob.
func (*JobManagerImpl) GetJob ¶
func (jm *JobManagerImpl) GetJob(ctx context.Context, req *pb.GetJobRequest) (*pb.Job, error)
GetJob implements JobManagerServer.GetJob.
func (*JobManagerImpl) GetJobMasterForwardAddress ¶
func (jm *JobManagerImpl) GetJobMasterForwardAddress(ctx context.Context, jobID string) (string, error)
GetJobMasterForwardAddress implements JobManager.GetJobMasterForwardAddress.
func (*JobManagerImpl) GetJobStatuses ¶
func (jm *JobManagerImpl) GetJobStatuses( ctx context.Context, ) (map[frameModel.MasterID]frameModel.MasterState, error)
GetJobStatuses returns the status code of all jobs that are not deleted.
func (*JobManagerImpl) InitImpl ¶
func (jm *JobManagerImpl) InitImpl(ctx context.Context) error
InitImpl implements frame.MasterImpl.InitImpl
func (*JobManagerImpl) ListJobs ¶
func (jm *JobManagerImpl) ListJobs(ctx context.Context, req *pb.ListJobsRequest) (*pb.ListJobsResponse, error)
ListJobs implements JobManagerServer.ListJobs.
func (*JobManagerImpl) OnMasterRecovered ¶
func (jm *JobManagerImpl) OnMasterRecovered(ctx context.Context) error
OnMasterRecovered implements frame.MasterImpl.OnMasterRecovered
func (*JobManagerImpl) OnWorkerDispatched ¶
func (jm *JobManagerImpl) OnWorkerDispatched(worker framework.WorkerHandle, result error) error
OnWorkerDispatched implements frame.MasterImpl.OnWorkerDispatched
func (*JobManagerImpl) OnWorkerMessage ¶
func (jm *JobManagerImpl) OnWorkerMessage(worker framework.WorkerHandle, topic p2p.Topic, message interface{}) error
OnWorkerMessage implements frame.MasterImpl.OnWorkerMessage
func (*JobManagerImpl) OnWorkerOffline ¶
func (jm *JobManagerImpl) OnWorkerOffline(worker framework.WorkerHandle, reason error) error
OnWorkerOffline implements frame.MasterImpl.OnWorkerOffline
func (*JobManagerImpl) OnWorkerOnline ¶
func (jm *JobManagerImpl) OnWorkerOnline(worker framework.WorkerHandle) error
OnWorkerOnline implements frame.MasterImpl.OnWorkerOnline
func (*JobManagerImpl) OnWorkerStatusUpdated ¶
func (jm *JobManagerImpl) OnWorkerStatusUpdated(worker framework.WorkerHandle, newStatus *frameModel.WorkerStatus) error
OnWorkerStatusUpdated implements frame.MasterImpl.OnWorkerStatusUpdated
func (*JobManagerImpl) SendCancelJobMessage ¶
func (jm *JobManagerImpl) SendCancelJobMessage(ctx context.Context, jobID string) error
SendCancelJobMessage implements operateRouter.SendCancelJobMessage
func (*JobManagerImpl) StopImpl ¶
func (jm *JobManagerImpl) StopImpl(ctx context.Context)
StopImpl implements frame.MasterImpl.StopImpl
func (*JobManagerImpl) Tick ¶
func (jm *JobManagerImpl) Tick(ctx context.Context) error
Tick implements frame.MasterImpl.Tick
func (*JobManagerImpl) UpdateJobStatus ¶
func (jm *JobManagerImpl) UpdateJobStatus( ctx context.Context, jobID frameModel.MasterID, errMsg string, code frameModel.MasterState, ) error
UpdateJobStatus implements JobManager.UpdateJobStatus
func (*JobManagerImpl) WatchJobStatuses ¶
func (jm *JobManagerImpl) WatchJobStatuses( ctx context.Context, ) (resManager.JobStatusesSnapshot, *notifier.Receiver[resManager.JobStatusChangeEvent], error)
WatchJobStatuses returns a snapshot of job statuses followed by a stream of job status changes.
type MetaStoreManager ¶
type MetaStoreManager interface { // Register register specify backend store to manager with an unique id // id can be some readable identifier, like `meta-test1`. // Duplicate id will return an error Register(id string, store *metaModel.StoreConfig) error // UnRegister delete an existing backend store UnRegister(id string) // GetMetaStore get an existing backend store info GetMetaStore(id string) *metaModel.StoreConfig }
MetaStoreManager defines an interface to manage metastore
func NewMetaStoreManager ¶
func NewMetaStoreManager() MetaStoreManager
NewMetaStoreManager creates a new metaStoreManagerImpl instance
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server handles RPC requests for the df master.
func (*Server) GetLeader ¶
func (s *Server) GetLeader(_ context.Context, _ *pb.GetLeaderRequest) (*pb.GetLeaderResponse, error)
GetLeader implements DiscoveryServer.GetLeader.
func (*Server) Heartbeat ¶
func (s *Server) Heartbeat(ctx context.Context, req *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error)
Heartbeat implements pb interface.
func (*Server) ListExecutors ¶
func (s *Server) ListExecutors(ctx context.Context, req *pb.ListExecutorsRequest) (*pb.ListExecutorsResponse, error)
ListExecutors implements DiscoveryServer.ListExecutors.
func (*Server) ListJobs ¶
func (s *Server) ListJobs(ctx context.Context, req *pb.ListJobsRequest) (*pb.ListJobsResponse, error)
ListJobs delegates request to leader's JobManager.ListJobs.
func (*Server) ListMasters ¶
func (s *Server) ListMasters(ctx context.Context, req *pb.ListMastersRequest) (*pb.ListMastersResponse, error)
ListMasters implements DiscoveryServer.ListMasters.
func (*Server) QueryMetaStore ¶
func (s *Server) QueryMetaStore( ctx context.Context, req *pb.QueryMetaStoreRequest, ) (*pb.QueryMetaStoreResponse, error)
QueryMetaStore implements gRPC interface
func (*Server) QueryStorageConfig ¶
func (s *Server) QueryStorageConfig( ctx context.Context, req *pb.QueryStorageConfigRequest, ) (*pb.QueryStorageConfigResponse, error)
QueryStorageConfig implements gRPC interface
func (*Server) RegisterExecutor ¶
func (s *Server) RegisterExecutor(ctx context.Context, req *pb.RegisterExecutorRequest) (*pb.Executor, error)
RegisterExecutor implements the gRPC interface and passes the request on to the executor manager.
func (*Server) ResignLeader ¶
func (s *Server) ResignLeader(ctx context.Context, _ *pb.ResignLeaderRequest) (*emptypb.Empty, error)
ResignLeader implements DiscoveryServer.ResignLeader.
func (*Server) ScheduleTask ¶
func (s *Server) ScheduleTask(ctx context.Context, req *pb.ScheduleTaskRequest) (*pb.ScheduleTaskResponse, error)
ScheduleTask implements the gRPC interface. It works as follows: - receives a request from the job master - queries the resource manager to allocate resources and maps tasks to executors - returns the scheduler response to the job master