master

package
v1.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2024 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewBaseUpdateInstanceStatusHandler

func NewBaseUpdateInstanceStatusHandler(jobInstanceInfo *common.JobInstanceInfo, taskMaster taskmaster.TaskMaster) *baseUpdateInstanceStatusHandler

func NewBroadcastTaskMaster added in v0.0.2

func NewBroadcastTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx actor.Context) taskmaster.TaskMaster

func NewMapTaskMaster added in v0.0.2

func NewMapTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx actor.Context) taskmaster.TaskMaster

func NewStandaloneTaskMaster

func NewStandaloneTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx actor.Context) taskmaster.TaskMaster

Types

type BatchTaskMaster added in v0.0.2

type BatchTaskMaster struct {
	*GridTaskMaster
}

func NewBatchTaskMaster added in v0.0.2

func NewBatchTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx actor.Context) *BatchTaskMaster

type BroadcastTaskMaster added in v0.0.2

type BroadcastTaskMaster struct {
	*TaskMaster
	// contains filtered or unexported fields
}

func (*BroadcastTaskMaster) CheckProcessor added in v0.0.2

func (m *BroadcastTaskMaster) CheckProcessor()

func (*BroadcastTaskMaster) Clear added in v0.0.2

func (m *BroadcastTaskMaster) Clear(taskMaster taskmaster.TaskMaster)

func (*BroadcastTaskMaster) DestroyContainerPool added in v0.0.2

func (m *BroadcastTaskMaster) DestroyContainerPool()

func (*BroadcastTaskMaster) GetJobInstanceProgress added in v0.0.2

func (m *BroadcastTaskMaster) GetJobInstanceProgress() (string, error)

func (*BroadcastTaskMaster) GetWorkerProgressMap added in v0.0.2

func (m *BroadcastTaskMaster) GetWorkerProgressMap() *sync.Map

func (*BroadcastTaskMaster) KillInstance added in v0.0.2

func (m *BroadcastTaskMaster) KillInstance(reason string) error

func (*BroadcastTaskMaster) PostFinish added in v0.0.2

func (m *BroadcastTaskMaster) PostFinish(jobInstanceId int64) *processor.ProcessResult

func (*BroadcastTaskMaster) SubmitInstance added in v0.0.2

func (m *BroadcastTaskMaster) SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error

func (*BroadcastTaskMaster) UpdateTaskStatus added in v0.0.2

type GridTaskMaster added in v0.0.2

type GridTaskMaster struct {
	*MapTaskMaster
}

func NewGridTaskMaster added in v0.0.2

func NewGridTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx actor.Context) *GridTaskMaster

func (*GridTaskMaster) Map added in v0.0.2

func (m *GridTaskMaster) Map(jobCtx *jobcontext.JobContext, taskList [][]byte, taskName string) (bool, error)

func (*GridTaskMaster) PostFinish added in v0.0.2

func (m *GridTaskMaster) PostFinish(jobInstanceId int64) *processor.ProcessResult

type MapTaskMaster added in v0.0.2

type MapTaskMaster struct {
	*TaskMaster
	// contains filtered or unexported fields
}

func (*MapTaskMaster) BatchDispatchTasks added in v0.0.2

func (m *MapTaskMaster) BatchDispatchTasks(masterStartContainerRequests []*schedulerx.MasterStartContainerRequest, remoteWorker string)

BatchDispatchTasks dispatches tasks

func (*MapTaskMaster) BatchHandlePulledProgress added in v0.0.2

func (m *MapTaskMaster) BatchHandlePulledProgress(masterStartContainerRequests []*schedulerx.MasterStartContainerRequest,
	remoteWorker string) (map[string][]*schedulerx.MasterStartContainerRequest, map[string][]*schedulerx.MasterStartContainerRequest)

func (*MapTaskMaster) BatchPullTasks added in v0.0.2

func (m *MapTaskMaster) BatchPullTasks(masterStartContainerRequests []*schedulerx.MasterStartContainerRequest, workerIdAddr string)

func (*MapTaskMaster) BatchUpdateTaskStatues added in v0.0.2

func (m *MapTaskMaster) BatchUpdateTaskStatues(requests []*schedulerx.ContainerReportTaskStatusRequest)

func (*MapTaskMaster) CheckProcessor added in v0.0.2

func (m *MapTaskMaster) CheckProcessor()

func (*MapTaskMaster) Clear added in v0.0.2

func (m *MapTaskMaster) Clear(taskMaster taskmaster.TaskMaster)

func (*MapTaskMaster) DestroyContainerPool added in v0.0.2

func (m *MapTaskMaster) DestroyContainerPool()

func (*MapTaskMaster) GetJobInstanceProgress added in v0.0.2

func (m *MapTaskMaster) GetJobInstanceProgress() (string, error)

func (*MapTaskMaster) GetRootTaskResult added in v0.0.2

func (m *MapTaskMaster) GetRootTaskResult() string

func (*MapTaskMaster) GetTaskProgressMap added in v0.0.2

func (m *MapTaskMaster) GetTaskProgressMap() *sync.Map

func (*MapTaskMaster) KillInstance added in v0.0.2

func (m *MapTaskMaster) KillInstance(reason string) error

func (*MapTaskMaster) KillTask added in v0.0.2

func (m *MapTaskMaster) KillTask(uniqueId string, workerId string, workerAddr string)

func (*MapTaskMaster) Map added in v0.0.2

func (m *MapTaskMaster) Map(jobCtx *jobcontext.JobContext, taskList [][]byte, taskName string) (bool, error)

func (*MapTaskMaster) PostFinish added in v0.0.2

func (m *MapTaskMaster) PostFinish(jobInstanceId int64) *processor.ProcessResult

func (*MapTaskMaster) SetRootTaskResult added in v0.0.2

func (m *MapTaskMaster) SetRootTaskResult(rootTaskResult string)

func (*MapTaskMaster) Stop added in v0.0.2

func (m *MapTaskMaster) Stop()

func (*MapTaskMaster) SubmitInstance added in v0.0.2

func (m *MapTaskMaster) SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error

func (*MapTaskMaster) SyncPullTasks added in v0.0.2

func (m *MapTaskMaster) SyncPullTasks(pageSize int32, workerIdAddr string) []*schedulerx.MasterStartContainerRequest

func (*MapTaskMaster) UpdateTaskStatus added in v0.0.2

func (m *MapTaskMaster) UpdateTaskStatus(request *schedulerx.ContainerReportTaskStatusRequest) error

type ParallelTaskMaster added in v0.0.2

type ParallelTaskMaster struct {
	*MapTaskMaster
	// contains filtered or unexported fields
}

ParallelTaskMaster using persistence.ServerTaskPersistence

func NewParallelTaskMaster added in v0.0.2

func NewParallelTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx actor.Context) *ParallelTaskMaster

func (*ParallelTaskMaster) Map added in v0.0.2

func (m *ParallelTaskMaster) Map(jobCtx *jobcontext.JobContext, taskList [][]byte, taskName string) (bool, error)

func (*ParallelTaskMaster) RetryTasks added in v0.0.2

func (m *ParallelTaskMaster) RetryTasks(taskEntities []*schedulerx.RetryTaskEntity)

type ShardingTaskMaster added in v0.0.2

type ShardingTaskMaster struct {
	*GridTaskMaster
	// contains filtered or unexported fields
}

func NewShardingTaskMaster added in v0.0.2

func NewShardingTaskMaster(jobInstanceInfo *common.JobInstanceInfo, actorCtx actor.Context) *ShardingTaskMaster

func (*ShardingTaskMaster) BatchHandlePulledProgress added in v0.0.2

func (m *ShardingTaskMaster) BatchHandlePulledProgress(masterStartContainerRequests []*schedulerx.MasterStartContainerRequest,
	remoteWorker string) (map[string][]*schedulerx.MasterStartContainerRequest, map[string][]*schedulerx.MasterStartContainerRequest)

func (*ShardingTaskMaster) BatchUpdateTaskStatues added in v0.0.2

func (m *ShardingTaskMaster) BatchUpdateTaskStatues(requests []*schedulerx.ContainerReportTaskStatusRequest)

func (*ShardingTaskMaster) CheckProcessor added in v0.0.2

func (m *ShardingTaskMaster) CheckProcessor()

func (*ShardingTaskMaster) Clear added in v0.0.2

func (m *ShardingTaskMaster) Clear(taskMaster taskmaster.TaskMaster)

func (*ShardingTaskMaster) GetJobInstanceProgress added in v0.0.2

func (m *ShardingTaskMaster) GetJobInstanceProgress() (string, error)

func (*ShardingTaskMaster) PostFinish added in v0.0.2

func (m *ShardingTaskMaster) PostFinish(jobInstanceId int64) *processor.ProcessResult

func (*ShardingTaskMaster) SubmitInstance added in v0.0.2

func (m *ShardingTaskMaster) SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error

type StandaloneTaskMaster

type StandaloneTaskMaster struct {
	*TaskMaster
	// contains filtered or unexported fields
}

func (*StandaloneTaskMaster) CheckProcessor added in v0.0.2

func (m *StandaloneTaskMaster) CheckProcessor() error

func (*StandaloneTaskMaster) DestroyContainerPool added in v0.0.2

func (m *StandaloneTaskMaster) DestroyContainerPool()

func (*StandaloneTaskMaster) GetCurrentSelection added in v0.0.2

func (m *StandaloneTaskMaster) GetCurrentSelection() string

func (*StandaloneTaskMaster) KillInstance

func (m *StandaloneTaskMaster) KillInstance(reason string) error

func (*StandaloneTaskMaster) SubmitInstance

func (m *StandaloneTaskMaster) SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error

type TaskMaster

type TaskMaster struct {
	// contains filtered or unexported fields
}

func NewTaskMaster

func NewTaskMaster(actorCtx actor.Context, jobInstanceInfo *common.JobInstanceInfo, statusHandler UpdateInstanceStatusHandler) *TaskMaster

func (*TaskMaster) AcquireSerialNum

func (m *TaskMaster) AcquireSerialNum() int64

func (*TaskMaster) AcquireTaskId

func (m *TaskMaster) AcquireTaskId() int64

func (*TaskMaster) BatchUpdateTaskStatus

func (m *TaskMaster) BatchUpdateTaskStatus(taskMaster taskmaster.TaskMaster, req *schedulerx.ContainerBatchReportTaskStatuesRequest) error

BatchUpdateTaskStatus TODO: MapTaskMaster may override this method do really batch process

func (*TaskMaster) Clear

func (m *TaskMaster) Clear(taskMaster taskmaster.TaskMaster)

func (*TaskMaster) DestroyContainerPool

func (m *TaskMaster) DestroyContainerPool()

func (*TaskMaster) ExistInvalidWorker

func (m *TaskMaster) ExistInvalidWorker() bool

func (*TaskMaster) GetActorContext added in v0.0.2

func (m *TaskMaster) GetActorContext() actor.Context

func (*TaskMaster) GetAliveCheckWorkerSet

func (m *TaskMaster) GetAliveCheckWorkerSet() *utils.ConcurrentSet

GetAliveCheckWorkerSet return set<string>

func (*TaskMaster) GetCurrentSelection

func (m *TaskMaster) GetCurrentSelection() string

func (*TaskMaster) GetInstanceStatus

func (m *TaskMaster) GetInstanceStatus() processor.InstanceStatus

func (*TaskMaster) GetJobInstanceInfo

func (m *TaskMaster) GetJobInstanceInfo() *common.JobInstanceInfo

func (*TaskMaster) GetJobInstanceProgress

func (m *TaskMaster) GetJobInstanceProgress() (string, error)

func (*TaskMaster) GetLocalContainerRouterPath

func (m *TaskMaster) GetLocalContainerRouterPath() string

func (*TaskMaster) GetLocalJobInstanceRouterPath

func (m *TaskMaster) GetLocalJobInstanceRouterPath() string

func (*TaskMaster) GetLocalTaskRouterPath

func (m *TaskMaster) GetLocalTaskRouterPath() string

func (*TaskMaster) GetLocalWorkerIdAddr

func (m *TaskMaster) GetLocalWorkerIdAddr() string

func (*TaskMaster) GetSerialNum

func (m *TaskMaster) GetSerialNum() int64

func (*TaskMaster) Init

func (m *TaskMaster) Init()

func (*TaskMaster) IsInited

func (m *TaskMaster) IsInited() bool

func (*TaskMaster) IsJobInstanceFinished

func (m *TaskMaster) IsJobInstanceFinished() bool

func (*TaskMaster) IsKilled

func (m *TaskMaster) IsKilled() bool

func (*TaskMaster) KillInstance

func (m *TaskMaster) KillInstance(reason string) error

func (*TaskMaster) KillTask

func (m *TaskMaster) KillTask(uniqueId, workerId, workerAddr string)

func (*TaskMaster) PostFinish

func (m *TaskMaster) PostFinish(jobInstanceId int64) *processor.ProcessResult

func (*TaskMaster) ResetJobInstanceWorkerList

func (m *TaskMaster) ResetJobInstanceWorkerList()

func (*TaskMaster) RestJobInstanceWorkerList added in v0.0.2

func (m *TaskMaster) RestJobInstanceWorkerList(freeWorkers *utils.Set)

func (*TaskMaster) RetryTasks

func (m *TaskMaster) RetryTasks(taskEntities []schedulerx.RetryTaskEntity)

func (*TaskMaster) SetInstanceStatus

func (m *TaskMaster) SetInstanceStatus(instanceStatus processor.InstanceStatus)

func (*TaskMaster) Stop

func (m *TaskMaster) Stop()

func (*TaskMaster) SubmitInstance

func (m *TaskMaster) SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error

func (*TaskMaster) UpdateNewInstanceStatus

func (m *TaskMaster) UpdateNewInstanceStatus(serialNum int64, newStatus processor.InstanceStatus, result string) error

func (*TaskMaster) UpdateTaskStatus

func (m *TaskMaster) UpdateTaskStatus(req *schedulerx.ContainerReportTaskStatusRequest) error

type TimePlanEntry added in v0.0.2

type TimePlanEntry struct {
	// contains filtered or unexported fields
}

func NewTimePlanEntry added in v0.0.2

func NewTimePlanEntry(jobInstanceId int64, scheduleTimeStamp int64, handler *secondJobUpdateInstanceStatusHandler) *TimePlanEntry

func (*TimePlanEntry) Handler added in v0.0.2

func (t *TimePlanEntry) Handler() *secondJobUpdateInstanceStatusHandler

func (*TimePlanEntry) JobInstanceId added in v0.0.2

func (t *TimePlanEntry) JobInstanceId() int64

func (*TimePlanEntry) Priority added in v0.0.2

func (t *TimePlanEntry) Priority() int64

func (*TimePlanEntry) ScheduleTimeStamp added in v0.0.2

func (t *TimePlanEntry) ScheduleTimeStamp() int64

func (*TimePlanEntry) SetHandler added in v0.0.2

func (t *TimePlanEntry) SetHandler(handler *secondJobUpdateInstanceStatusHandler)

func (*TimePlanEntry) SetJobInstanceId added in v0.0.2

func (t *TimePlanEntry) SetJobInstanceId(jobInstanceId int64)

func (*TimePlanEntry) SetScheduleTimeStamp added in v0.0.2

func (t *TimePlanEntry) SetScheduleTimeStamp(scheduleTimeStamp int64)

func (*TimePlanEntry) Value added in v0.0.2

func (t *TimePlanEntry) Value() interface{}

type TimeQueue added in v0.0.2

type TimeQueue struct {
	// contains filtered or unexported fields
}

TimeQueue Time queue sorted by scheduling time and task priority

func NewTimeQueue added in v0.0.2

func NewTimeQueue() *TimeQueue

func (*TimeQueue) Add added in v0.0.2

func (q *TimeQueue) Add(timePlanEntry *TimePlanEntry)

func (*TimeQueue) Clear added in v0.0.2

func (q *TimeQueue) Clear()

func (*TimeQueue) IsEmpty added in v0.0.2

func (q *TimeQueue) IsEmpty() bool

func (*TimeQueue) Peek added in v0.0.2

func (q *TimeQueue) Peek() *TimePlanEntry

Peek return the head of this queue, or returns null if this queue is empty.

func (*TimeQueue) Remove added in v0.0.2

func (q *TimeQueue) Remove(jobInstanceId int64)

func (*TimeQueue) RemoveHeader added in v0.0.2

func (q *TimeQueue) RemoveHeader() *TimePlanEntry

RemoveHeader removes the head of this queue.

func (*TimeQueue) Size added in v0.0.2

func (q *TimeQueue) Size() int

type TimeScheduler added in v0.0.2

type TimeScheduler struct {
	// contains filtered or unexported fields
}

TimeScheduler is the client time scheduler, mainly used for second-level task scheduling.

func GetTimeScheduler added in v0.0.2

func GetTimeScheduler() *TimeScheduler

type UpdateInstanceStatusHandler

type UpdateInstanceStatusHandler interface {
	Handle(serialNum int64, newStatus processor.InstanceStatus, result string) error
}

func NewCommonUpdateInstanceStatusHandler added in v0.0.2

func NewCommonUpdateInstanceStatusHandler(actorContext actor.Context, taskMaster taskmaster.TaskMaster, jobInstanceInfo *common.JobInstanceInfo) (rcvr UpdateInstanceStatusHandler)

func NewSecondJobUpdateInstanceStatusHandler added in v0.0.2

func NewSecondJobUpdateInstanceStatusHandler(actorCtx actor.Context, taskMaster taskmaster.TaskMaster, jobInstanceInfo *common.JobInstanceInfo) UpdateInstanceStatusHandler

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL