batch

package
v1.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseReqHandler

type BaseReqHandler struct {
	// contains filtered or unexported fields
}

BaseReqHandler is used for every parallel/Grid job instance. Every parallel/Grid task master has a BaseReqHandler. A BaseReqHandler retrieves reqs in batches and then merges them into a single batch request, greatly reducing the number of network requests.

func NewBaseReqHandler

func NewBaseReqHandler(jobInstanceId int64, coreBatchThreadNum int, maxBatchThreadNum int, batchSize int32,
	queue *ReqQueue, batchProcessThreadName string, batchRetrieveThreadName string) (rcvr *BaseReqHandler)

func (*BaseReqHandler) AsyncHandleReqs

func (rcvr *BaseReqHandler) AsyncHandleReqs(h ReqHandler) []interface{}

func (*BaseReqHandler) Clear

func (rcvr *BaseReqHandler) Clear()

func (*BaseReqHandler) GetLatestRequest

func (rcvr *BaseReqHandler) GetLatestRequest() interface{}

func (*BaseReqHandler) GetTaskMasterPool

func (rcvr *BaseReqHandler) GetTaskMasterPool() *masterpool.TaskMasterPool

func (*BaseReqHandler) IsActive

func (rcvr *BaseReqHandler) IsActive() bool

IsActive reports whether the queue still has remaining reqs or at least one runnable is running. Use this method with care: batch processing may be asynchronous, so activeRunnableNum should be decremented only when the job has actually finished.

func (*BaseReqHandler) Process

func (rcvr *BaseReqHandler) Process(jobInstanceId int64, reqs []interface{}, workerIdAddr string)

Process holds the logic, implemented by the subclass, for processing this batch of reqs. jobInstanceId: the id of the job instance that these reqs belong to. reqs: the batch of reqs. workerIdAddr: the workerIdAddr of the PullModel.

func (*BaseReqHandler) SetBatchSize

func (rcvr *BaseReqHandler) SetBatchSize(batchSize int32)

func (*BaseReqHandler) SetWorkThreadNum

func (rcvr *BaseReqHandler) SetWorkThreadNum(workThreadNum int)

func (*BaseReqHandler) Start

func (rcvr *BaseReqHandler) Start(h ReqHandler) error

func (*BaseReqHandler) Stop

func (rcvr *BaseReqHandler) Stop()

func (*BaseReqHandler) SubmitRequest

func (rcvr *BaseReqHandler) SubmitRequest(request interface{})

func (*BaseReqHandler) SyncHandleReqs

func (rcvr *BaseReqHandler) SyncHandleReqs(h ReqHandler, pageSize int32, workerIdAddr string) []interface{}

type BaseTaskDispatchReqHandler

type BaseTaskDispatchReqHandler struct {
	*BaseReqHandler
	// contains filtered or unexported fields
}

func NewBaseTaskDispatchReqHandler

func NewBaseTaskDispatchReqHandler(jobInstanceId int64, coreBatchThreadNum int, maxBatchThreadNum int, batchSize int32,
	queue *ReqQueue, batchProcessThreadName string, batchRetrieveThreadName string) *BaseTaskDispatchReqHandler

func (*BaseTaskDispatchReqHandler) SetDispatchSize

func (rcvr *BaseTaskDispatchReqHandler) SetDispatchSize(dispatchSize int)

type ContainerStatusReqHandler

type ContainerStatusReqHandler struct {
	*BaseReqHandler
	// contains filtered or unexported fields
}

ContainerStatusReqHandler reports container task statuses to the task master in batches.

func NewContainerStatusReqHandler

func NewContainerStatusReqHandler(jobInstanceId int64, coreBatchThreadNum int, maxBatchThreadNum int, batchSize int32, queue *ReqQueue, taskMasterAkkaPath string) *ContainerStatusReqHandler

func (*ContainerStatusReqHandler) GetTaskMasterAkkaPath

func (h *ContainerStatusReqHandler) GetTaskMasterAkkaPath() string

func (*ContainerStatusReqHandler) Process

func (h *ContainerStatusReqHandler) Process(jobInstanceId int64, requests []interface{}, workerAddr string)

type ContainerStatusReqHandlerPool

type ContainerStatusReqHandlerPool struct {
	// contains filtered or unexported fields
}

ContainerStatusReqHandlerPool keeps one reqs handler per jobInstance.

func GetContainerStatusReqHandlerPool

func GetContainerStatusReqHandlerPool() *ContainerStatusReqHandlerPool

func NewContainerStatusReqHandlerPool

func NewContainerStatusReqHandlerPool() *ContainerStatusReqHandlerPool

func (*ContainerStatusReqHandlerPool) Contains

func (p *ContainerStatusReqHandlerPool) Contains(jobInstanceId int64) bool

func (*ContainerStatusReqHandlerPool) GetHandlers

func (p *ContainerStatusReqHandlerPool) GetHandlers() *sync.Map

func (*ContainerStatusReqHandlerPool) Start

func (p *ContainerStatusReqHandlerPool) Start(jobInstanceId int64, reqHandler *ContainerStatusReqHandler)

func (*ContainerStatusReqHandlerPool) Stop

func (p *ContainerStatusReqHandlerPool) Stop(jobInstanceId int64)

func (*ContainerStatusReqHandlerPool) SubmitReq

type Pair

type Pair struct {
	// contains filtered or unexported fields
}

type ReqHandler

type ReqHandler interface {
	Start(h ReqHandler) error
	Stop()
	Clear()
	IsActive() bool
	GetLatestRequest() interface{}
	SetBatchSize(batchSize int32)
	SetWorkThreadNum(workThreadNum int)
	SubmitRequest(request interface{})
	AsyncHandleReqs(h ReqHandler) []interface{}
	SyncHandleReqs(h ReqHandler, pageSize int32, workerIdAddr string) []interface{}
	Process(jobInstanceId int64, reqs []interface{}, workerIdAddr string)
}

type ReqQueue

type ReqQueue struct {
	// contains filtered or unexported fields
}

func NewReqQueue

func NewReqQueue(maxSize int32) (q *ReqQueue)

func (*ReqQueue) Clear

func (q *ReqQueue) Clear()

func (*ReqQueue) RetrieveRequests

func (q *ReqQueue) RetrieveRequests(batchSize int32) []any

func (*ReqQueue) Size

func (q *ReqQueue) Size() int

func (*ReqQueue) SubmitRequest

func (q *ReqQueue) SubmitRequest(req any)

type TMStatusReqHandler

type TMStatusReqHandler struct {
	*BaseReqHandler
}

func NewTMStatusReqHandler

func NewTMStatusReqHandler(jobInstanceId int64, coreBatchThreadNum int, maxBatchThreadNum int, batchSize int32, queue *ReqQueue) (rcvr *TMStatusReqHandler)

func (*TMStatusReqHandler) Process

func (rcvr *TMStatusReqHandler) Process(jobInstanceId int64, reqs []interface{}, workerIdAddr string)

type TaskDispatchReqHandler

type TaskDispatchReqHandler interface {
	ReqHandler
	SetDispatchSize(dispatchSize int)
}

func NewTaskPullReqHandler

func NewTaskPullReqHandler(jobInstanceId int64, coreBatchThreadNum int, maxBatchThreadNum int, batchSize int32, queue *ReqQueue) TaskDispatchReqHandler

func NewTaskPushReqHandler

func NewTaskPushReqHandler(jobInstanceId int64, coreBatchThreadNum int, maxBatchThreadNum int, batchSize int32, queue *ReqQueue, dispatchSize int32) TaskDispatchReqHandler

type TaskPullReqHandler

type TaskPullReqHandler struct {
	*BaseTaskDispatchReqHandler
}

func (*TaskPullReqHandler) Process

func (h *TaskPullReqHandler) Process(jobInstanceId int64, reqs []interface{}, workerIdAddr string)

func (*TaskPullReqHandler) Start

func (h *TaskPullReqHandler) Start(handler ReqHandler) error

type TaskPushReqHandler

type TaskPushReqHandler struct {
	*BaseTaskDispatchReqHandler
	// contains filtered or unexported fields
}

func (*TaskPushReqHandler) Process

func (h *TaskPushReqHandler) Process(jobInstanceId int64, reqs []interface{}, workerAddr string)

func (*TaskPushReqHandler) Start

func (h *TaskPushReqHandler) Start(handler ReqHandler) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL