Documentation ¶
Index ¶
- Variables
- func WithTransaction(ctx context.Context, db *sql.DB, fn SQLTxFunc) (err error)
- type H2ConnectionPool
- type H2FilePersistence
- type H2MemoryPersistence
- type H2Persistence
- func (rcvr *H2Persistence) BatchUpdateTaskStatus(jobInstanceId int64, status taskstatus.TaskStatus, workerId string, ...) int64
- func (rcvr *H2Persistence) CheckInstanceStatus(jobInstanceId int64) processor.InstanceStatus
- func (rcvr *H2Persistence) ClearTasks(jobInstanceId int64) error
- func (rcvr *H2Persistence) CreateTask(jobId, jobInstanceId, taskId int64, taskName string, taskBody []byte) error
- func (rcvr *H2Persistence) CreateTasks(containers []*schedulerx.MasterStartContainerRequest, ...) error
- func (rcvr *H2Persistence) GetDistinctInstanceIds() []int64
- func (rcvr *H2Persistence) GetTaskStatistics() *common.TaskStatistics
- func (rcvr *H2Persistence) InitTable()
- func (rcvr *H2Persistence) IsInited() bool
- func (rcvr *H2Persistence) Pull(jobInstanceId int64, pageSize int32) ([]*common.TaskInfo, error)
- func (rcvr *H2Persistence) UpdateTaskStatues(taskStatusInfos []*schedulerx.ContainerReportTaskStatusRequest) error
- func (rcvr *H2Persistence) UpdateTaskStatus(jobInstanceId int64, taskIds []int64, status taskstatus.TaskStatus, ...) (int64, error)
- type Option
- type Options
- type SQLTxFunc
- type ServerTaskPersistence
- func (rcvr *ServerTaskPersistence) BatchUpdateTaskStatus(jobInstanceId int64, status taskstatus.TaskStatus, workerId string, ...) int64
- func (rcvr *ServerTaskPersistence) CheckInstanceStatus(jobInstanceId int64) processor.InstanceStatus
- func (rcvr *ServerTaskPersistence) ClearTasks(jobInstanceId int64) error
- func (rcvr *ServerTaskPersistence) CreateTask(jobId int64, jobInstanceId int64, taskId int64, taskName string, ...) error
- func (rcvr *ServerTaskPersistence) CreateTasks(containers []*schedulerx.MasterStartContainerRequest, workerId string, ...) error
- func (rcvr *ServerTaskPersistence) InitTable()
- func (rcvr *ServerTaskPersistence) Pull(jobInstanceId int64, pageSize int32) ([]*common.TaskInfo, error)
- func (rcvr *ServerTaskPersistence) UpdateTaskStatues(taskStatusInfos []*schedulerx.ContainerReportTaskStatusRequest) error
- func (rcvr *ServerTaskPersistence) UpdateTaskStatus(jobInstanceId int64, taskIds []int64, status taskstatus.TaskStatus, ...) (int64, error)
- type TaskDao
- func (d *TaskDao) BatchDeleteTasks(jobInstanceId int64, taskIds []int64) (int64, error)
- func (d *TaskDao) BatchDeleteTasks2(jobInstanceId int64, workerId string, workerAddr string) (int64, error)
- func (d *TaskDao) BatchInsert(containers []*schedulerx.MasterStartContainerRequest, workerId string, ...) (int64, error)
- func (d *TaskDao) BatchUpdateStatus(jobInstanceId int64, taskIdList []int64, status int) (int64, error)
- func (d *TaskDao) BatchUpdateStatus2(jobInstanceId int64, status int, workerId string, workerAddr string) (int64, error)
- func (d *TaskDao) CreateTable() error
- func (d *TaskDao) DeleteByJobInstanceId(jobInstanceId int64) (int64, error)
- func (d *TaskDao) DropTable() error
- func (d *TaskDao) Exist(jobInstanceId int64) (bool, error)
- func (d *TaskDao) GetDistinctInstanceIds() ([]int64, error)
- func (d *TaskDao) GetTaskStatistics() (*common.TaskStatistics, error)
- func (d *TaskDao) Insert(jobId int64, jobInstanceId int64, taskId int64, taskName string, ...) error
- func (d *TaskDao) QueryStatus(jobInstanceId int64) ([]int32, error)
- func (d *TaskDao) QueryTaskCount(jobInstanceId int64) (int64, error)
- func (d *TaskDao) QueryTaskList(jobInstanceId int64, status int, pageSize int32) ([]*TaskSnapshot, error)
- func (d *TaskDao) QueryTasks(jobInstanceId int64, pageSize int32) ([]*TaskSnapshot, error)
- func (d *TaskDao) UpdateStatus(jobInstanceId int64, taskId int64, status int, workerAddr string) (int64, error)
- func (d *TaskDao) UpdateStatus2(jobInstanceId int64, taskIds []int64, status int, workerId string, ...) (int64, error)
- func (d *TaskDao) UpdateWorker(jobInstanceId int64, taskId int64, workerId string, workerAddr string) (int64, error)
- type TaskPersistence
- type TaskSnapshot
- func (t *TaskSnapshot) GetGmtCreate() time.Time
- func (t *TaskSnapshot) GetGmtModified() time.Time
- func (t *TaskSnapshot) GetJobId() int64
- func (t *TaskSnapshot) GetJobInstanceId() int64
- func (t *TaskSnapshot) GetProgress() float64
- func (t *TaskSnapshot) GetStatus() int32
- func (t *TaskSnapshot) GetTaskBody() []byte
- func (t *TaskSnapshot) GetTaskId() int64
- func (t *TaskSnapshot) GetTaskName() string
- func (t *TaskSnapshot) GetWorkerAddr() string
- func (t *TaskSnapshot) GetWorkerId() string
- func (t *TaskSnapshot) SetGmtCreate(gmtCreate time.Time)
- func (t *TaskSnapshot) SetGmtModified(gmtModified time.Time)
- func (t *TaskSnapshot) SetJobId(jobId int64)
- func (t *TaskSnapshot) SetJobInstanceId(jobInstanceId int64)
- func (t *TaskSnapshot) SetProgress(progress float64)
- func (t *TaskSnapshot) SetStatus(Status int32)
- func (t *TaskSnapshot) SetTaskBody(taskBody []byte)
- func (t *TaskSnapshot) SetTaskId(taskId int64)
- func (t *TaskSnapshot) SetTaskName(taskName string)
- func (t *TaskSnapshot) SetWorker(workerAddr string)
- func (t *TaskSnapshot) SetWorkerId(workerId string)
Constants ¶
This section is empty.
Variables ¶
View Source
var (
ErrTimeout = errors.New("timeout")
)
Functions ¶
Types ¶
type H2ConnectionPool ¶
func NewH2ConnectionPool ¶
func NewH2ConnectionPool(opts ...Option) (*H2ConnectionPool, error)
type H2FilePersistence ¶
type H2FilePersistence struct {
*H2Persistence
}
func GetH2FilePersistence ¶
func GetH2FilePersistence() *H2FilePersistence
type H2MemoryPersistence ¶
type H2MemoryPersistence struct {
*H2Persistence
}
H2MemoryPersistence is a singleton, so that only executes initTable on the first initialization
func GetH2MemoryPersistence ¶
func GetH2MemoryPersistence() *H2MemoryPersistence
type H2Persistence ¶
type H2Persistence struct {
// contains filtered or unexported fields
}
H2Persistence is a singleton, so that only executes initTable on the first initialization
func GetH2Persistence ¶
func GetH2Persistence() *H2Persistence
func NewH2Persistence ¶
func NewH2Persistence() *H2Persistence
func (*H2Persistence) BatchUpdateTaskStatus ¶
func (rcvr *H2Persistence) BatchUpdateTaskStatus(jobInstanceId int64, status taskstatus.TaskStatus, workerId string, workerAddr string) int64
func (*H2Persistence) CheckInstanceStatus ¶
func (rcvr *H2Persistence) CheckInstanceStatus(jobInstanceId int64) processor.InstanceStatus
func (*H2Persistence) ClearTasks ¶
func (rcvr *H2Persistence) ClearTasks(jobInstanceId int64) error
func (*H2Persistence) CreateTask ¶
func (rcvr *H2Persistence) CreateTask(jobId, jobInstanceId, taskId int64, taskName string, taskBody []byte) error
func (*H2Persistence) CreateTasks ¶
func (rcvr *H2Persistence) CreateTasks(containers []*schedulerx.MasterStartContainerRequest, workerId, workerAddr string) error
func (*H2Persistence) GetDistinctInstanceIds ¶
func (rcvr *H2Persistence) GetDistinctInstanceIds() []int64
GetDistinctInstanceIds get the remaining terminated but undeleted instances in H2
func (*H2Persistence) GetTaskStatistics ¶
func (rcvr *H2Persistence) GetTaskStatistics() *common.TaskStatistics
GetTaskStatistics get H2 tasks summary statistics
func (*H2Persistence) InitTable ¶
func (rcvr *H2Persistence) InitTable()
func (*H2Persistence) IsInited ¶
func (rcvr *H2Persistence) IsInited() bool
func (*H2Persistence) UpdateTaskStatues ¶
func (rcvr *H2Persistence) UpdateTaskStatues(taskStatusInfos []*schedulerx.ContainerReportTaskStatusRequest) error
UpdateTaskStatues .
- !!!Attention!!! For Grid/Batch tasks, this method invoked only when finish statuses updated.
- In order to reduce h2 size, this method will delete all finish tasks;
- taskStatusInfos list of task Status
func (*H2Persistence) UpdateTaskStatus ¶
func (rcvr *H2Persistence) UpdateTaskStatus(jobInstanceId int64, taskIds []int64, status taskstatus.TaskStatus, workerId, workerAddr string) (int64, error)
type SQLTxFunc ¶
SQLTxFunc is a function that will be called with an initialized 'DbTx' object that can be used for executing statements and queries against a database.
type ServerTaskPersistence ¶
type ServerTaskPersistence struct {
// contains filtered or unexported fields
}
func NewServerTaskPersistence ¶
func NewServerTaskPersistence(groupId string) (rcvr *ServerTaskPersistence)
func (*ServerTaskPersistence) BatchUpdateTaskStatus ¶
func (rcvr *ServerTaskPersistence) BatchUpdateTaskStatus(jobInstanceId int64, status taskstatus.TaskStatus, workerId string, workerAddr string) int64
func (*ServerTaskPersistence) CheckInstanceStatus ¶
func (rcvr *ServerTaskPersistence) CheckInstanceStatus(jobInstanceId int64) processor.InstanceStatus
func (*ServerTaskPersistence) ClearTasks ¶
func (rcvr *ServerTaskPersistence) ClearTasks(jobInstanceId int64) error
func (*ServerTaskPersistence) CreateTask ¶
func (rcvr *ServerTaskPersistence) CreateTask(jobId int64, jobInstanceId int64, taskId int64, taskName string, taskBody []byte) error
CreateTask do nothing, already create by server.
func (*ServerTaskPersistence) CreateTasks ¶
func (rcvr *ServerTaskPersistence) CreateTasks(containers []*schedulerx.MasterStartContainerRequest, workerId string, workerAddr string) error
func (*ServerTaskPersistence) InitTable ¶
func (rcvr *ServerTaskPersistence) InitTable()
func (*ServerTaskPersistence) UpdateTaskStatues ¶
func (rcvr *ServerTaskPersistence) UpdateTaskStatues(taskStatusInfos []*schedulerx.ContainerReportTaskStatusRequest) error
func (*ServerTaskPersistence) UpdateTaskStatus ¶
func (rcvr *ServerTaskPersistence) UpdateTaskStatus(jobInstanceId int64, taskIds []int64, status taskstatus.TaskStatus, workerId, workerAddr string) (int64, error)
type TaskDao ¶
type TaskDao struct {
// contains filtered or unexported fields
}
func NewTaskDao ¶
func NewTaskDao(h2CP *H2ConnectionPool) *TaskDao
func (*TaskDao) BatchDeleteTasks ¶
func (*TaskDao) BatchDeleteTasks2 ¶
func (*TaskDao) BatchInsert ¶
func (d *TaskDao) BatchInsert(containers []*schedulerx.MasterStartContainerRequest, workerId string, workerAddr string) (int64, error)
func (*TaskDao) BatchUpdateStatus ¶
func (*TaskDao) BatchUpdateStatus2 ¶
func (*TaskDao) CreateTable ¶
func (*TaskDao) DeleteByJobInstanceId ¶
func (*TaskDao) GetDistinctInstanceIds ¶
func (*TaskDao) GetTaskStatistics ¶
func (d *TaskDao) GetTaskStatistics() (*common.TaskStatistics, error)
func (*TaskDao) QueryTaskCount ¶
func (*TaskDao) QueryTaskList ¶
func (*TaskDao) QueryTasks ¶
func (d *TaskDao) QueryTasks(jobInstanceId int64, pageSize int32) ([]*TaskSnapshot, error)
func (*TaskDao) UpdateStatus ¶
func (*TaskDao) UpdateStatus2 ¶
type TaskPersistence ¶
type TaskPersistence interface { // InitTable init task table InitTable() // UpdateTaskStatus update tasks Status and Worker info UpdateTaskStatus(jobInstanceId int64, taskIds []int64, status taskstatus.TaskStatus, workerId, workerAddr string) (int64, error) // UpdateTaskStatues update task statues accord to list of TaskStatusInfo UpdateTaskStatues(taskStatusInfos []*schedulerx.ContainerReportTaskStatusRequest) error // ClearTasks clear all tasks belong to specific job instance ClearTasks(jobInstanceId int64) error // CreateTask create task CreateTask(jobId, jobInstanceId, taskId int64, taskName string, taskBody []byte) error // CreateTasks batch create container infos CreateTasks(containers []*schedulerx.MasterStartContainerRequest, workerId, workerAddr string) error // BatchUpdateTaskStatus update tasks Status using condition BatchUpdateTaskStatus(jobInstanceId int64, status taskstatus.TaskStatus, workerId string, workerAddr string) int64 // CheckInstanceStatus check job instance current Status CheckInstanceStatus(jobInstanceId int64) processor.InstanceStatus // Pull pull init tasks for failover retry Pull(jobInstanceId int64, pageSize int32) ([]*common.TaskInfo, error) }
type TaskSnapshot ¶
type TaskSnapshot struct { JobId int64 `db:"job_id"` JobInstanceId int64 `db:"job_instance_id"` TaskId int64 `db:"task_id"` TaskName string `db:"task_name"` Status int32 `db:"status"` Progress float64 `db:"progress"` GmtCreate time.Time `db:"gmt_create"` GmtModified time.Time `db:"gmt_modified"` WorkerAddr string `db:"worker_addr"` WorkerId string `db:"worker_id"` TaskBody []byte `db:"task_body"` }
func NewTaskSnapshot ¶
func NewTaskSnapshot() (rcvr *TaskSnapshot)
func (*TaskSnapshot) GetGmtCreate ¶
func (t *TaskSnapshot) GetGmtCreate() time.Time
func (*TaskSnapshot) GetGmtModified ¶
func (t *TaskSnapshot) GetGmtModified() time.Time
func (*TaskSnapshot) GetJobId ¶
func (t *TaskSnapshot) GetJobId() int64
func (*TaskSnapshot) GetJobInstanceId ¶
func (t *TaskSnapshot) GetJobInstanceId() int64
func (*TaskSnapshot) GetProgress ¶
func (t *TaskSnapshot) GetProgress() float64
func (*TaskSnapshot) GetStatus ¶
func (t *TaskSnapshot) GetStatus() int32
func (*TaskSnapshot) GetTaskBody ¶
func (t *TaskSnapshot) GetTaskBody() []byte
func (*TaskSnapshot) GetTaskId ¶
func (t *TaskSnapshot) GetTaskId() int64
func (*TaskSnapshot) GetTaskName ¶
func (t *TaskSnapshot) GetTaskName() string
func (*TaskSnapshot) GetWorkerAddr ¶
func (t *TaskSnapshot) GetWorkerAddr() string
func (*TaskSnapshot) GetWorkerId ¶
func (t *TaskSnapshot) GetWorkerId() string
func (*TaskSnapshot) SetGmtCreate ¶
func (t *TaskSnapshot) SetGmtCreate(gmtCreate time.Time)
func (*TaskSnapshot) SetGmtModified ¶
func (t *TaskSnapshot) SetGmtModified(gmtModified time.Time)
func (*TaskSnapshot) SetJobId ¶
func (t *TaskSnapshot) SetJobId(jobId int64)
func (*TaskSnapshot) SetJobInstanceId ¶
func (t *TaskSnapshot) SetJobInstanceId(jobInstanceId int64)
func (*TaskSnapshot) SetProgress ¶
func (t *TaskSnapshot) SetProgress(progress float64)
func (*TaskSnapshot) SetStatus ¶
func (t *TaskSnapshot) SetStatus(Status int32)
func (*TaskSnapshot) SetTaskBody ¶
func (t *TaskSnapshot) SetTaskBody(taskBody []byte)
func (*TaskSnapshot) SetTaskId ¶
func (t *TaskSnapshot) SetTaskId(taskId int64)
func (*TaskSnapshot) SetTaskName ¶
func (t *TaskSnapshot) SetTaskName(taskName string)
func (*TaskSnapshot) SetWorker ¶
func (t *TaskSnapshot) SetWorker(workerAddr string)
func (*TaskSnapshot) SetWorkerId ¶
func (t *TaskSnapshot) SetWorkerId(workerId string)
Click to show internal directories.
Click to hide internal directories.