Documentation ¶
Index ¶
- type BaseReqHandler
- func (rcvr *BaseReqHandler) AsyncHandleReqs(h ReqHandler) []interface{}
- func (rcvr *BaseReqHandler) Clear()
- func (rcvr *BaseReqHandler) GetLatestRequest() interface{}
- func (rcvr *BaseReqHandler) GetTaskMasterPool() *masterpool.TaskMasterPool
- func (rcvr *BaseReqHandler) IsActive() bool
- func (rcvr *BaseReqHandler) Process(jobInstanceId int64, reqs []interface{}, workerIdAddr string)
- func (rcvr *BaseReqHandler) SetBatchSize(batchSize int32)
- func (rcvr *BaseReqHandler) SetWorkThreadNum(workThreadNum int)
- func (rcvr *BaseReqHandler) Start(h ReqHandler) error
- func (rcvr *BaseReqHandler) Stop()
- func (rcvr *BaseReqHandler) SubmitRequest(request interface{})
- func (rcvr *BaseReqHandler) SyncHandleReqs(h ReqHandler, pageSize int32, workerIdAddr string) []interface{}
- type BaseTaskDispatchReqHandler
- type ContainerStatusReqHandler
- type ContainerStatusReqHandlerPool
- func (p *ContainerStatusReqHandlerPool) Contains(jobInstanceId int64) bool
- func (p *ContainerStatusReqHandlerPool) GetHandlers() *sync.Map
- func (p *ContainerStatusReqHandlerPool) Start(jobInstanceId int64, reqHandler *ContainerStatusReqHandler)
- func (p *ContainerStatusReqHandlerPool) Stop(jobInstanceId int64)
- func (p *ContainerStatusReqHandlerPool) SubmitReq(jobInstanceId int64, req *schedulerx.ContainerReportTaskStatusRequest) bool
- type Pair
- type ReqHandler
- type ReqQueue
- type TMStatusReqHandler
- type TaskDispatchReqHandler
- type TaskPullReqHandler
- type TaskPushReqHandler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseReqHandler ¶
type BaseReqHandler struct {
// contains filtered or unexported fields
}
BaseReqHandler is used for every parallel/Grid job instance: every parallel/Grid task master has a BaseReqHandler. A BaseReqHandler retrieves reqs in batches and then merges them into a single batch request, which greatly reduces the number of network requests.
func NewBaseReqHandler ¶
func (*BaseReqHandler) AsyncHandleReqs ¶
func (rcvr *BaseReqHandler) AsyncHandleReqs(h ReqHandler) []interface{}
func (*BaseReqHandler) Clear ¶
func (rcvr *BaseReqHandler) Clear()
func (*BaseReqHandler) GetLatestRequest ¶
func (rcvr *BaseReqHandler) GetLatestRequest() interface{}
func (*BaseReqHandler) GetTaskMasterPool ¶
func (rcvr *BaseReqHandler) GetTaskMasterPool() *masterpool.TaskMasterPool
func (*BaseReqHandler) IsActive ¶
func (rcvr *BaseReqHandler) IsActive() bool
IsActive reports whether the queue has remaining reqs or at least one runnable is running. Use this method with care: batch processing may be async, so activeRunnableNum should be decremented only when the job is really done.
func (*BaseReqHandler) Process ¶
func (rcvr *BaseReqHandler) Process(jobInstanceId int64, reqs []interface{}, workerIdAddr string)
Process holds the logic, implemented by the subclass, for processing this batch of reqs. jobInstanceId: id of the job instance to which these reqs belong. reqs: the batch of reqs. workerIdAddr: workerIdAddr of PullModel.
func (*BaseReqHandler) SetBatchSize ¶
func (rcvr *BaseReqHandler) SetBatchSize(batchSize int32)
func (*BaseReqHandler) SetWorkThreadNum ¶
func (rcvr *BaseReqHandler) SetWorkThreadNum(workThreadNum int)
func (*BaseReqHandler) Start ¶
func (rcvr *BaseReqHandler) Start(h ReqHandler) error
func (*BaseReqHandler) Stop ¶
func (rcvr *BaseReqHandler) Stop()
func (*BaseReqHandler) SubmitRequest ¶
func (rcvr *BaseReqHandler) SubmitRequest(request interface{})
func (*BaseReqHandler) SyncHandleReqs ¶
func (rcvr *BaseReqHandler) SyncHandleReqs(h ReqHandler, pageSize int32, workerIdAddr string) []interface{}
type BaseTaskDispatchReqHandler ¶
type BaseTaskDispatchReqHandler struct { *BaseReqHandler // contains filtered or unexported fields }
func (*BaseTaskDispatchReqHandler) SetDispatchSize ¶
func (rcvr *BaseTaskDispatchReqHandler) SetDispatchSize(dispatchSize int)
type ContainerStatusReqHandler ¶
type ContainerStatusReqHandler struct { *BaseReqHandler // contains filtered or unexported fields }
ContainerStatusReqHandler batch-reports container task status to the task master.
func (*ContainerStatusReqHandler) GetTaskMasterAkkaPath ¶
func (h *ContainerStatusReqHandler) GetTaskMasterAkkaPath() string
func (*ContainerStatusReqHandler) Process ¶
func (h *ContainerStatusReqHandler) Process(jobInstanceId int64, requests []interface{}, workerAddr string)
type ContainerStatusReqHandlerPool ¶
type ContainerStatusReqHandlerPool struct {
// contains filtered or unexported fields
}
ContainerStatusReqHandlerPool keeps one reqs handler per jobInstance.
func GetContainerStatusReqHandlerPool ¶
func GetContainerStatusReqHandlerPool() *ContainerStatusReqHandlerPool
func NewContainerStatusReqHandlerPool ¶
func NewContainerStatusReqHandlerPool() *ContainerStatusReqHandlerPool
func (*ContainerStatusReqHandlerPool) Contains ¶
func (p *ContainerStatusReqHandlerPool) Contains(jobInstanceId int64) bool
func (*ContainerStatusReqHandlerPool) GetHandlers ¶
func (p *ContainerStatusReqHandlerPool) GetHandlers() *sync.Map
func (*ContainerStatusReqHandlerPool) Start ¶
func (p *ContainerStatusReqHandlerPool) Start(jobInstanceId int64, reqHandler *ContainerStatusReqHandler)
func (*ContainerStatusReqHandlerPool) Stop ¶
func (p *ContainerStatusReqHandlerPool) Stop(jobInstanceId int64)
func (*ContainerStatusReqHandlerPool) SubmitReq ¶
func (p *ContainerStatusReqHandlerPool) SubmitReq(jobInstanceId int64, req *schedulerx.ContainerReportTaskStatusRequest) bool
type ReqHandler ¶
type ReqHandler interface { Start(h ReqHandler) error Stop() Clear() IsActive() bool GetLatestRequest() interface{} SetBatchSize(batchSize int32) SetWorkThreadNum(workThreadNum int) SubmitRequest(request interface{}) AsyncHandleReqs(h ReqHandler) []interface{} SyncHandleReqs(h ReqHandler, pageSize int32, workerIdAddr string) []interface{} Process(jobInstanceId int64, reqs []interface{}, workerIdAddr string) }
type ReqQueue ¶
type ReqQueue struct {
// contains filtered or unexported fields
}
func NewReqQueue ¶
func (*ReqQueue) RetrieveRequests ¶
func (*ReqQueue) SubmitRequest ¶
type TMStatusReqHandler ¶
type TMStatusReqHandler struct {
*BaseReqHandler
}
func NewTMStatusReqHandler ¶
func (*TMStatusReqHandler) Process ¶
func (rcvr *TMStatusReqHandler) Process(jobInstanceId int64, reqs []interface{}, workerIdAddr string)
type TaskDispatchReqHandler ¶
type TaskDispatchReqHandler interface { ReqHandler SetDispatchSize(dispatchSize int) }
func NewTaskPullReqHandler ¶
func NewTaskPushReqHandler ¶
type TaskPullReqHandler ¶
type TaskPullReqHandler struct {
*BaseTaskDispatchReqHandler
}
func (*TaskPullReqHandler) Process ¶
func (h *TaskPullReqHandler) Process(jobInstanceId int64, reqs []interface{}, workerIdAddr string)
func (*TaskPullReqHandler) Start ¶
func (h *TaskPullReqHandler) Start(handler ReqHandler) error
type TaskPushReqHandler ¶
type TaskPushReqHandler struct { *BaseTaskDispatchReqHandler // contains filtered or unexported fields }
func (*TaskPushReqHandler) Process ¶
func (h *TaskPushReqHandler) Process(jobInstanceId int64, reqs []interface{}, workerAddr string)
func (*TaskPushReqHandler) Start ¶
func (h *TaskPushReqHandler) Start(handler ReqHandler) error