servermaster

package
v0.0.0-...-beee317
Published: Jan 3, 2025 License: Apache-2.0 Imports: 74 Imported by: 0

README

Structure of Master

Structure

The master module consists of several components:

  • Master Server
    • Provides the gRPC/HTTP API, including:
      • Cluster operations
        • Register/delete executor servers
        • Handle heartbeats (keep-alive)
      • User interface
        • Submit a job
        • Query job(s)
        • Cancel a job
      • Scheduling and failover of jobs
  • ExecutorManager
    • Handles heartbeats
      • Notifies the Resource Manager to update status information
    • Maintains the liveness of executors
      • Liveness check
        • The Executor Manager checks every executor for a heartbeat timeout.
        • Once an executor goes offline because of a heartbeat timeout, the Executor Manager should notify the job manager to reschedule all the tasks on it.
  • Executor Client
    • Embeds two independent interfaces:
      • ExecutorServiceClient, which is used to dispatch tasks to an executor.
      • BrokerServiceClient, which is used to manage the resources that belong to an executor.
  • JobManager
    • Receives the SubmitJob request, checks the type of the job, and creates the JobMaster.
    • JobMaster (per job)
      • Generates the tasks for the job
      • Schedules and dispatches tasks

Documentation

Index

Constants

const (

	// DefaultBusinessMetaID is the ID for default business metastore
	DefaultBusinessMetaID = "_default"

	// FrameMetaID is the ID for frame metastore
	FrameMetaID = "_root"
)

Variables

This section is empty.

Functions

func NewDefaultBusinessMetaConfig

func NewDefaultBusinessMetaConfig() *metaModel.StoreConfig

NewDefaultBusinessMetaConfig returns the default business metastore config.

Types

type Config

type Config struct {
	LogConf logutil.Config `toml:"log" json:"log"`

	Name          string `toml:"name" json:"name"`
	Addr          string `toml:"addr" json:"addr"`
	AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"`

	FrameworkMeta *metaModel.StoreConfig `toml:"framework-meta" json:"framework-meta"`
	BusinessMeta  *metaModel.StoreConfig `toml:"business-meta" json:"business-meta"`

	KeepAliveTTLStr string `toml:"keepalive-ttl" json:"keepalive-ttl"`
	// time interval string to check executor aliveness
	KeepAliveIntervalStr string `toml:"keepalive-interval" json:"keepalive-interval"`

	KeepAliveTTL      time.Duration `toml:"-" json:"-"`
	KeepAliveInterval time.Duration `toml:"-" json:"-"`

	Storage resModel.Config `toml:"storage" json:"storage"`

	Security *security.Credential `toml:"security" json:"security"`

	JobBackoff *jobop.BackoffConfig `toml:"job-backoff" json:"job-backoff"`
}

Config is the configuration for server-master.

func GetDefaultMasterConfig

func GetDefaultMasterConfig() *Config

GetDefaultMasterConfig returns a default master config

func (*Config) AdjustAndValidate

func (c *Config) AdjustAndValidate() (err error)

AdjustAndValidate validates and adjusts the master configuration

func (*Config) String

func (c *Config) String() string

func (*Config) Toml

func (c *Config) Toml() (string, error)

Toml returns TOML format representation of config.

type Executor

type Executor struct {
	ormModel.Executor
	// contains filtered or unexported fields
}

Executor records the status of an executor instance.

type ExecutorManager

type ExecutorManager interface {
	HandleHeartbeat(req *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error)
	AllocateNewExec(ctx context.Context, req *pb.RegisterExecutorRequest) (*ormModel.Executor, error)
	// ExecutorCount returns executor count with given status
	ExecutorCount(status model.ExecutorStatus) int
	HasExecutor(executorID string) bool
	ListExecutors() []*ormModel.Executor
	GetAddr(executorID model.ExecutorID) (string, bool)
	Run(ctx context.Context) error

	// WatchExecutors returns a snapshot of all online executors plus
	// a stream of events describing changes that happen to the executors
	// after the snapshot is taken.
	WatchExecutors(ctx context.Context) (
		snap map[model.ExecutorID]string, stream *notifier.Receiver[model.ExecutorStatusChange], err error,
	)

	// GetExecutorInfos implements the interface scheduler.executorInfoProvider.
	// It is called by the scheduler as the source of truth for executors.
	GetExecutorInfos() map[model.ExecutorID]schedModel.ExecutorInfo
}

ExecutorManager defines an interface to manage all executors.
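
WatchExecutors returns a snapshot together with a stream of later changes. The sketch below illustrates that pattern with plain channels instead of the `notifier.Receiver` type used by the package; `registry`, `statusChange`, `watch`, and `setOnline` are hypothetical names. The key point is that the snapshot is copied and the watcher is registered under the same lock, so no change can fall between the two.

```go
package main

import (
	"fmt"
	"sync"
)

// statusChange is a hypothetical, simplified executor status event.
type statusChange struct {
	ExecutorID string
	Addr       string
	Online     bool
}

// registry keeps online executors (ID -> address) and their watchers.
type registry struct {
	mu        sync.Mutex
	executors map[string]string
	watchers  []chan statusChange
}

func newRegistry() *registry {
	return &registry{executors: make(map[string]string)}
}

// watch copies the current state and subscribes atomically.
func (r *registry) watch() (map[string]string, <-chan statusChange) {
	r.mu.Lock()
	defer r.mu.Unlock()
	snap := make(map[string]string, len(r.executors))
	for id, addr := range r.executors {
		snap[id] = addr
	}
	ch := make(chan statusChange, 16)
	r.watchers = append(r.watchers, ch)
	return snap, ch
}

// setOnline records an executor and notifies all watchers.
func (r *registry) setOnline(id, addr string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.executors[id] = addr
	for _, ch := range r.watchers {
		select {
		case ch <- statusChange{ExecutorID: id, Addr: addr, Online: true}:
		default: // this sketch drops events for slow watchers
		}
	}
}

func main() {
	r := newRegistry()
	r.setOnline("executor-1", "127.0.0.1:10241")

	snap, stream := r.watch()
	r.setOnline("executor-2", "127.0.0.1:10242")

	fmt.Println("snapshot size:", len(snap))
	fmt.Println("next event for:", (<-stream).ExecutorID)
}
```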

type ExecutorManagerImpl

type ExecutorManagerImpl struct {
	// contains filtered or unexported fields
}

ExecutorManagerImpl holds all the executors' info, including liveness, status, resource usage.

func NewExecutorManagerImpl

func NewExecutorManagerImpl(metaClient orm.Client, initHeartbeatTTL, keepAliveInterval time.Duration) *ExecutorManagerImpl

NewExecutorManagerImpl creates a new ExecutorManagerImpl instance

func (*ExecutorManagerImpl) AllocateNewExec

AllocateNewExec allocates new executor info for a given RegisterExecutorRequest and then registers the executor.

func (*ExecutorManagerImpl) ExecutorCount

func (e *ExecutorManagerImpl) ExecutorCount(status model.ExecutorStatus) (count int)

ExecutorCount implements ExecutorManager.ExecutorCount

func (*ExecutorManagerImpl) GetAddr

func (e *ExecutorManagerImpl) GetAddr(executorID model.ExecutorID) (string, bool)

GetAddr implements ExecutorManager.GetAddr

func (*ExecutorManagerImpl) GetExecutorInfos

func (e *ExecutorManagerImpl) GetExecutorInfos() map[model.ExecutorID]schedModel.ExecutorInfo

GetExecutorInfos returns necessary information on the executor that is needed for scheduling.

func (*ExecutorManagerImpl) HandleHeartbeat

func (e *ExecutorManagerImpl) HandleHeartbeat(req *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error)

HandleHeartbeat implements the pb interface.

func (*ExecutorManagerImpl) HasExecutor

func (e *ExecutorManagerImpl) HasExecutor(executorID string) bool

HasExecutor implements ExecutorManager.HasExecutor

func (*ExecutorManagerImpl) ListExecutors

func (e *ExecutorManagerImpl) ListExecutors() []*ormModel.Executor

ListExecutors implements ExecutorManager.ListExecutors

func (*ExecutorManagerImpl) Run

Run implements ExecutorManager.Run

func (*ExecutorManagerImpl) WatchExecutors

func (e *ExecutorManagerImpl) WatchExecutors(
	ctx context.Context,
) (snap map[model.ExecutorID]string, receiver *notifier.Receiver[model.ExecutorStatusChange], err error)

WatchExecutors implements the ExecutorManager interface.

type JobFsm

type JobFsm struct {
	JobStats
	// contains filtered or unexported fields
}

JobFsm manages the state of all job masters; the job master states form a finite-state machine. Note that every job master managed in JobFsm is in running status, which means the job is not terminated or finished.

,-------.                   ,-------.            ,-------.       ,--------.
|WaitAck|                   |Online |            |Pending|       |Finished|
`---+---'                   `---+---'            `---+---'       `---+----'
    |                           |                    |               |
    | Master                    |                    |               |
    |  .OnWorkerOnline          |                    |               |
    |-------------------------->|                    |               |
    |                           |                    |               |
    |                           | Master             |               |
    |                           |   .OnWorkerOffline |               |
    |                           |   (failover)       |               |
    |                           |------------------->|               |
    |                           |                    |               |
    |                           | Master             |               |
    |                           |   .OnWorkerOffline |               |
    |                           |   (finish)         |               |
    |                           |----------------------------------->|
    |                           |                    |               |
    | Master                    |                    |               |
    |  .OnWorkerOffline         |                    |               |
    |  (failover)               |                    |               |
    |----------------------------------------------->|               |
    |                           |                    |               |
    | Master                    |                    |               |
    |  .OnWorkerOffline         |                    |               |
    |  (finish)                 |                    |               |
    |--------------------------------------------------------------->|
    |                           |                    |               |
    |                           | Master             |               |
    |                           |   .CreateWorker    |               |
    |<-----------------------------------------------|               |
    |                           |                    |               |
    | Master                    |                    |               |
    |  .OnWorkerDispatched      |                    |               |
    |  (with error)             |                    |               |
    |----------------------------------------------->|               |
    |                           |                    |               |
    |                           |                    |               |
    |                           |                    |               |
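
The transitions in the diagram can be written down as a lookup table. The sketch below encodes exactly the arrows shown (WaitAck, Online, Pending, Finished) and rejects everything else; the type and constant names are hypothetical and do not correspond to identifiers in this package.

```go
package main

import "fmt"

type jobState int

const (
	waitAck jobState = iota
	online
	pending
	finished
)

type event int

const (
	workerOnline          event = iota // Master.OnWorkerOnline
	workerOfflineFailover              // Master.OnWorkerOffline (failover)
	workerOfflineFinish                // Master.OnWorkerOffline (finish)
	createWorker                       // Master.CreateWorker
	dispatchFailed                     // Master.OnWorkerDispatched (with error)
)

// transitions lists every arrow of the diagram: state -> event -> next state.
var transitions = map[jobState]map[event]jobState{
	waitAck: {
		workerOnline:          online,
		workerOfflineFailover: pending,
		workerOfflineFinish:   finished,
		dispatchFailed:        pending,
	},
	online: {
		workerOfflineFailover: pending,
		workerOfflineFinish:   finished,
	},
	pending: {
		createWorker: waitAck,
	},
}

// next returns the state reached from s on event e, or an error if the
// diagram has no such arrow.
func next(s jobState, e event) (jobState, error) {
	if to, ok := transitions[s][e]; ok {
		return to, nil
	}
	return s, fmt.Errorf("invalid transition from state %d on event %d", s, e)
}

func main() {
	s := waitAck
	for _, e := range []event{workerOnline, workerOfflineFailover, createWorker} {
		var err error
		if s, err = next(s, e); err != nil {
			fmt.Println(err)
			return
		}
	}
	fmt.Println(s == waitAck) // the job is waiting for acknowledgement again
}
```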

func NewJobFsm

func NewJobFsm() *JobFsm

NewJobFsm creates a new job fsm

func (*JobFsm) IterPendingJobs

func (fsm *JobFsm) IterPendingJobs(dispatchJobFn func(job *frameModel.MasterMeta) (string, error)) error

IterPendingJobs iterates over all pending jobs and dispatches them again (by creating a worker for each).

func (*JobFsm) IterWaitAckJobs

func (fsm *JobFsm) IterWaitAckJobs(dispatchJobFn func(job *frameModel.MasterMeta) (string, error)) error

IterWaitAckJobs iterates over the jobs that are waiting for acknowledgement and fails over those that were added from failover.

func (*JobFsm) JobCount

func (fsm *JobFsm) JobCount(status pb.Job_State) int

JobCount queries job count based on job status

func (*JobFsm) JobDispatchFailed

func (fsm *JobFsm) JobDispatchFailed(worker framework.WorkerHandle) error

JobDispatchFailed is called when a job dispatch fails

func (*JobFsm) JobDispatched

func (fsm *JobFsm) JobDispatched(job *frameModel.MasterMeta, addFromFailover bool)

JobDispatched is called when a job is first created or when the server master fails over.

func (*JobFsm) JobOffline

func (fsm *JobFsm) JobOffline(worker framework.WorkerHandle, needFailover bool)

JobOffline is called when a job encounters an error or finishes.

func (*JobFsm) JobOnline

func (fsm *JobFsm) JobOnline(worker framework.WorkerHandle) error

JobOnline is called when the first heartbeat of a job is received.

func (*JobFsm) QueryJob

func (fsm *JobFsm) QueryJob(jobID frameModel.MasterID) *JobHolder

QueryJob queries the job with the given jobID and returns its JobHolder.

func (*JobFsm) QueryOnlineJob

func (fsm *JobFsm) QueryOnlineJob(jobID frameModel.MasterID) *JobHolder

QueryOnlineJob queries the job with the given jobID from the online job list.

type JobHolder

type JobHolder struct {
	// contains filtered or unexported fields
}

JobHolder holds job meta and worker handle for a job.

func (*JobHolder) MasterMeta

func (jh *JobHolder) MasterMeta() *frameModel.MasterMeta

MasterMeta returns master meta of the job.

func (*JobHolder) WorkerHandle

func (jh *JobHolder) WorkerHandle() framework.WorkerHandle

WorkerHandle returns the job master's worker handle.

type JobManager

type JobManager interface {
	framework.Master
	JobStats
	pb.JobManagerServer

	GetJobMasterForwardAddress(ctx context.Context, jobID string) (string, error)
	GetJobStatuses(ctx context.Context) (map[frameModel.MasterID]frameModel.MasterState, error)
	UpdateJobStatus(ctx context.Context, jobID frameModel.MasterID, errMsg string, code frameModel.MasterState) error
	WatchJobStatuses(
		ctx context.Context,
	) (resManager.JobStatusesSnapshot, *notifier.Receiver[resManager.JobStatusChangeEvent], error)
}

JobManager defines manager of job master

type JobManagerImpl

type JobManagerImpl struct {
	framework.BaseMaster
	*JobFsm

	JobBackoffMgr jobop.BackoffManager
	// contains filtered or unexported fields
}

JobManagerImpl is a special job master that manages all the job masters and notifies them of offline executors.

Worker state transitions:

  • Submit a new job: once the job master is created successfully, the job is added to `waitAckJobs`.
  • Worker online received: the job moves from `waitAckJobs` to `onlineJobs`.
  • Worker offline received: the job moves from `onlineJobs` to `pendingJobs`.
  • Tick checks `pendingJobs` periodically and reschedules the jobs.

func NewJobManagerImpl

func NewJobManagerImpl(
	dctx *dcontext.Context,
	id frameModel.MasterID,
	backoffConfig *jobop.BackoffConfig,
) (*JobManagerImpl, error)

NewJobManagerImpl creates a new JobManagerImpl instance

func (*JobManagerImpl) CancelJob

func (jm *JobManagerImpl) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (*pb.Job, error)

CancelJob implements JobManagerServer.CancelJob.

func (*JobManagerImpl) CloseImpl

func (jm *JobManagerImpl) CloseImpl(ctx context.Context)

CloseImpl implements frame.MasterImpl.CloseImpl

func (*JobManagerImpl) CreateJob

func (jm *JobManagerImpl) CreateJob(ctx context.Context, req *pb.CreateJobRequest) (*pb.Job, error)

CreateJob implements JobManagerServer.CreateJob.

func (*JobManagerImpl) DeleteJob

func (jm *JobManagerImpl) DeleteJob(ctx context.Context, req *pb.DeleteJobRequest) (*emptypb.Empty, error)

DeleteJob implements JobManagerServer.DeleteJob.

func (*JobManagerImpl) GetJob

func (jm *JobManagerImpl) GetJob(ctx context.Context, req *pb.GetJobRequest) (*pb.Job, error)

GetJob implements JobManagerServer.GetJob.

func (*JobManagerImpl) GetJobMasterForwardAddress

func (jm *JobManagerImpl) GetJobMasterForwardAddress(ctx context.Context, jobID string) (string, error)

GetJobMasterForwardAddress implements JobManager.GetJobMasterForwardAddress.

func (*JobManagerImpl) GetJobStatuses

func (jm *JobManagerImpl) GetJobStatuses(
	ctx context.Context,
) (map[frameModel.MasterID]frameModel.MasterState, error)

GetJobStatuses returns the status code of all jobs that are not deleted.

func (*JobManagerImpl) InitImpl

func (jm *JobManagerImpl) InitImpl(ctx context.Context) error

InitImpl implements frame.MasterImpl.InitImpl

func (*JobManagerImpl) ListJobs

ListJobs implements JobManagerServer.ListJobs.

func (*JobManagerImpl) OnMasterRecovered

func (jm *JobManagerImpl) OnMasterRecovered(ctx context.Context) error

OnMasterRecovered implements frame.MasterImpl.OnMasterRecovered

func (*JobManagerImpl) OnWorkerDispatched

func (jm *JobManagerImpl) OnWorkerDispatched(worker framework.WorkerHandle, result error) error

OnWorkerDispatched implements frame.MasterImpl.OnWorkerDispatched

func (*JobManagerImpl) OnWorkerMessage

func (jm *JobManagerImpl) OnWorkerMessage(worker framework.WorkerHandle, topic p2p.Topic, message interface{}) error

OnWorkerMessage implements frame.MasterImpl.OnWorkerMessage

func (*JobManagerImpl) OnWorkerOffline

func (jm *JobManagerImpl) OnWorkerOffline(worker framework.WorkerHandle, reason error) error

OnWorkerOffline implements frame.MasterImpl.OnWorkerOffline

func (*JobManagerImpl) OnWorkerOnline

func (jm *JobManagerImpl) OnWorkerOnline(worker framework.WorkerHandle) error

OnWorkerOnline implements frame.MasterImpl.OnWorkerOnline

func (*JobManagerImpl) OnWorkerStatusUpdated

func (jm *JobManagerImpl) OnWorkerStatusUpdated(worker framework.WorkerHandle, newStatus *frameModel.WorkerStatus) error

OnWorkerStatusUpdated implements frame.MasterImpl.OnWorkerStatusUpdated

func (*JobManagerImpl) SendCancelJobMessage

func (jm *JobManagerImpl) SendCancelJobMessage(ctx context.Context, jobID string) error

SendCancelJobMessage implements operateRouter.SendCancelJobMessage

func (*JobManagerImpl) StopImpl

func (jm *JobManagerImpl) StopImpl(ctx context.Context)

StopImpl implements frame.MasterImpl.StopImpl

func (*JobManagerImpl) Tick

func (jm *JobManagerImpl) Tick(ctx context.Context) error

Tick implements frame.MasterImpl.Tick

func (*JobManagerImpl) UpdateJobStatus

func (jm *JobManagerImpl) UpdateJobStatus(
	ctx context.Context, jobID frameModel.MasterID, errMsg string, code frameModel.MasterState,
) error

UpdateJobStatus implements JobManager.UpdateJobStatus

func (*JobManagerImpl) WatchJobStatuses

WatchJobStatuses returns a snapshot of job statuses followed by a stream of job status changes.

type JobStats

type JobStats interface {
	JobCount(status pb.Job_State) int
}

JobStats defines a statistics interface for JobFsm

type MetaStoreManager

type MetaStoreManager interface {
	// Register registers the specified backend store with the manager under a unique id.
	// The id can be any readable identifier, like `meta-test1`.
	// A duplicate id returns an error.
	Register(id string, store *metaModel.StoreConfig) error
	// UnRegister deletes an existing backend store.
	UnRegister(id string)
	// GetMetaStore gets the info of an existing backend store.
	GetMetaStore(id string) *metaModel.StoreConfig
}

MetaStoreManager defines an interface to manage metastore

func NewMetaStoreManager

func NewMetaStoreManager() MetaStoreManager

NewMetaStoreManager creates a new metaStoreManagerImpl instance
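
The contract in the interface comments (unique ids, error on duplicates) can be met by a small in-memory registry. The sketch below is one way to do it and is not the package's metaStoreManagerImpl; `storeConfig` is a hypothetical placeholder for `metaModel.StoreConfig`, and returning nil for an unknown id is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// storeConfig is a hypothetical placeholder for metaModel.StoreConfig.
type storeConfig struct {
	Endpoints []string
}

// memStoreManager keeps registered store configs in memory, keyed by id.
type memStoreManager struct {
	mu     sync.Mutex
	stores map[string]*storeConfig
}

func newMemStoreManager() *memStoreManager {
	return &memStoreManager{stores: make(map[string]*storeConfig)}
}

// Register adds a store under id and fails if the id is already taken.
func (m *memStoreManager) Register(id string, store *storeConfig) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, exists := m.stores[id]; exists {
		return fmt.Errorf("metastore %q is already registered", id)
	}
	m.stores[id] = store
	return nil
}

// UnRegister removes the store with the given id, if any.
func (m *memStoreManager) UnRegister(id string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.stores, id)
}

// GetMetaStore returns the store config for id, or nil if it is unknown.
func (m *memStoreManager) GetMetaStore(id string) *storeConfig {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.stores[id]
}

func main() {
	mgr := newMemStoreManager()
	cfg := &storeConfig{Endpoints: []string{"127.0.0.1:2379"}}

	fmt.Println(mgr.Register("_default", cfg)) // first registration succeeds
	fmt.Println(mgr.Register("_default", cfg)) // duplicate id is rejected
	mgr.UnRegister("_default")
	fmt.Println(mgr.GetMetaStore("_default") == nil)
}
```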

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server handles RPC requests for df master.

func NewServer

func NewServer(cfg *Config) (*Server, error)

NewServer creates a new master-server.

func (*Server) CancelJob

func (s *Server) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (*pb.Job, error)

CancelJob delegates request to leader's JobManager.CancelJob.

func (*Server) CreateJob

func (s *Server) CreateJob(ctx context.Context, req *pb.CreateJobRequest) (*pb.Job, error)

CreateJob delegates request to leader's JobManager.CreateJob.

func (*Server) DeleteJob

func (s *Server) DeleteJob(ctx context.Context, req *pb.DeleteJobRequest) (*emptypb.Empty, error)

DeleteJob delegates request to leader's JobManager.DeleteJob.

func (*Server) GetJob

func (s *Server) GetJob(ctx context.Context, req *pb.GetJobRequest) (*pb.Job, error)

GetJob delegates request to leader's JobManager.GetJob.

func (*Server) GetLeader

GetLeader implements DiscoveryServer.GetLeader.

func (*Server) Heartbeat

func (s *Server) Heartbeat(ctx context.Context, req *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error)

Heartbeat implements pb interface.

func (*Server) ListExecutors

func (s *Server) ListExecutors(ctx context.Context, req *pb.ListExecutorsRequest) (*pb.ListExecutorsResponse, error)

ListExecutors implements DiscoveryServer.ListExecutors.

func (*Server) ListJobs

func (s *Server) ListJobs(ctx context.Context, req *pb.ListJobsRequest) (*pb.ListJobsResponse, error)

ListJobs delegates request to leader's JobManager.ListJobs.

func (*Server) ListMasters

func (s *Server) ListMasters(ctx context.Context, req *pb.ListMastersRequest) (*pb.ListMastersResponse, error)

ListMasters implements DiscoveryServer.ListMasters.

func (*Server) QueryMetaStore

func (s *Server) QueryMetaStore(
	ctx context.Context, req *pb.QueryMetaStoreRequest,
) (*pb.QueryMetaStoreResponse, error)

QueryMetaStore implements gRPC interface

func (*Server) QueryStorageConfig

func (s *Server) QueryStorageConfig(
	ctx context.Context, req *pb.QueryStorageConfigRequest,
) (*pb.QueryStorageConfigResponse, error)

QueryStorageConfig implements gRPC interface

func (*Server) RegisterExecutor

func (s *Server) RegisterExecutor(ctx context.Context, req *pb.RegisterExecutorRequest) (*pb.Executor, error)

RegisterExecutor implements the gRPC interface and passes the request on to the executor manager.

func (*Server) ResignLeader

func (s *Server) ResignLeader(ctx context.Context, _ *pb.ResignLeaderRequest) (*emptypb.Empty, error)

ResignLeader implements DiscoveryServer.ResignLeader.

func (*Server) Run

func (s *Server) Run(ctx context.Context) error

Run runs the server master.

func (*Server) ScheduleTask

func (s *Server) ScheduleTask(ctx context.Context, req *pb.ScheduleTaskRequest) (*pb.ScheduleTaskResponse, error)

ScheduleTask implements the gRPC interface. It works as follows:

  • receives a request from the job master
  • queries the resource manager to allocate resources and map tasks to executors
  • returns the scheduler response to the job master

Directories

Path Synopsis
mock
Package mock is a generated GoMock package.
