persistence

package
v1.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrTimeout = errors.New("timeout")
)

Functions

func WithTransaction

func WithTransaction(ctx context.Context, db *sql.DB, fn SQLTxFunc) (err error)

WithTransaction creates a new transaction and commits it or rolls it back depending on the error returned by the SQLTxFunc.

Types

type H2ConnectionPool

type H2ConnectionPool struct {
	*sql.DB
}

func NewH2ConnectionPool

func NewH2ConnectionPool(opts ...Option) (*H2ConnectionPool, error)

type H2FilePersistence

type H2FilePersistence struct {
	*H2Persistence
}

func GetH2FilePersistence

func GetH2FilePersistence() *H2FilePersistence

type H2MemoryPersistence

type H2MemoryPersistence struct {
	*H2Persistence
}

H2MemoryPersistence is a singleton, so initTable is executed only on the first initialization.

func GetH2MemoryPersistence

func GetH2MemoryPersistence() *H2MemoryPersistence

type H2Persistence

type H2Persistence struct {
	// contains filtered or unexported fields
}

H2Persistence is a singleton, so initTable is executed only on the first initialization.

func GetH2Persistence

func GetH2Persistence() *H2Persistence

func NewH2Persistence

func NewH2Persistence() *H2Persistence

func (*H2Persistence) BatchUpdateTaskStatus

func (rcvr *H2Persistence) BatchUpdateTaskStatus(jobInstanceId int64, status taskstatus.TaskStatus, workerId string, workerAddr string) int64

func (*H2Persistence) CheckInstanceStatus

func (rcvr *H2Persistence) CheckInstanceStatus(jobInstanceId int64) processor.InstanceStatus

func (*H2Persistence) ClearTasks

func (rcvr *H2Persistence) ClearTasks(jobInstanceId int64) error

func (*H2Persistence) CreateTask

func (rcvr *H2Persistence) CreateTask(jobId, jobInstanceId, taskId int64, taskName string, taskBody []byte) error

func (*H2Persistence) CreateTasks

func (rcvr *H2Persistence) CreateTasks(containers []*schedulerx.MasterStartContainerRequest, workerId, workerAddr string) error

func (*H2Persistence) GetDistinctInstanceIds

func (rcvr *H2Persistence) GetDistinctInstanceIds() []int64

GetDistinctInstanceIds gets the IDs of the instances that have terminated but have not yet been deleted from H2.

func (*H2Persistence) GetTaskStatistics

func (rcvr *H2Persistence) GetTaskStatistics() *common.TaskStatistics

GetTaskStatistics gets summary statistics for the tasks stored in H2.

func (*H2Persistence) InitTable

func (rcvr *H2Persistence) InitTable()

func (*H2Persistence) IsInited

func (rcvr *H2Persistence) IsInited() bool

func (*H2Persistence) Pull

func (rcvr *H2Persistence) Pull(jobInstanceId int64, pageSize int32) ([]*common.TaskInfo, error)

func (*H2Persistence) UpdateTaskStatues

func (rcvr *H2Persistence) UpdateTaskStatues(taskStatusInfos []*schedulerx.ContainerReportTaskStatusRequest) error

UpdateTaskStatues updates task statuses according to the given list of task status reports.

  • Attention: for Grid/Batch tasks, this method is invoked only when finished statuses are updated.
  • To reduce the size of H2, this method deletes all finished tasks.
  • taskStatusInfos: the list of task statuses.

func (*H2Persistence) UpdateTaskStatus

func (rcvr *H2Persistence) UpdateTaskStatus(jobInstanceId int64, taskIds []int64, status taskstatus.TaskStatus, workerId, workerAddr string) (int64, error)

type Option

type Option func(o *Options)

func WithDataSourceName

func WithDataSourceName(dataSourceName string) Option

type Options

type Options struct {
	// contains filtered or unexported fields
}

type SQLTxFunc

type SQLTxFunc func(tx *sql.Tx) error

SQLTxFunc is a function that is called with an initialized *sql.Tx, which can be used to execute statements and queries against a database.

type ServerTaskPersistence

type ServerTaskPersistence struct {
	// contains filtered or unexported fields
}

func NewServerTaskPersistence

func NewServerTaskPersistence(groupId string) (rcvr *ServerTaskPersistence)

func (*ServerTaskPersistence) BatchUpdateTaskStatus

func (rcvr *ServerTaskPersistence) BatchUpdateTaskStatus(jobInstanceId int64, status taskstatus.TaskStatus, workerId string, workerAddr string) int64

func (*ServerTaskPersistence) CheckInstanceStatus

func (rcvr *ServerTaskPersistence) CheckInstanceStatus(jobInstanceId int64) processor.InstanceStatus

func (*ServerTaskPersistence) ClearTasks

func (rcvr *ServerTaskPersistence) ClearTasks(jobInstanceId int64) error

func (*ServerTaskPersistence) CreateTask

func (rcvr *ServerTaskPersistence) CreateTask(jobId int64, jobInstanceId int64, taskId int64, taskName string, taskBody []byte) error

CreateTask does nothing; the task has already been created by the server.

func (*ServerTaskPersistence) CreateTasks

func (rcvr *ServerTaskPersistence) CreateTasks(containers []*schedulerx.MasterStartContainerRequest, workerId string, workerAddr string) error

func (*ServerTaskPersistence) InitTable

func (rcvr *ServerTaskPersistence) InitTable()

func (*ServerTaskPersistence) Pull

func (rcvr *ServerTaskPersistence) Pull(jobInstanceId int64, pageSize int32) ([]*common.TaskInfo, error)

func (*ServerTaskPersistence) UpdateTaskStatues

func (rcvr *ServerTaskPersistence) UpdateTaskStatues(taskStatusInfos []*schedulerx.ContainerReportTaskStatusRequest) error

func (*ServerTaskPersistence) UpdateTaskStatus

func (rcvr *ServerTaskPersistence) UpdateTaskStatus(jobInstanceId int64, taskIds []int64, status taskstatus.TaskStatus, workerId, workerAddr string) (int64, error)

type TaskDao

type TaskDao struct {
	// contains filtered or unexported fields
}

func NewTaskDao

func NewTaskDao(h2CP *H2ConnectionPool) *TaskDao

func (*TaskDao) BatchDeleteTasks

func (d *TaskDao) BatchDeleteTasks(jobInstanceId int64, taskIds []int64) (int64, error)

func (*TaskDao) BatchDeleteTasks2

func (d *TaskDao) BatchDeleteTasks2(jobInstanceId int64, workerId string, workerAddr string) (int64, error)

func (*TaskDao) BatchInsert

func (d *TaskDao) BatchInsert(containers []*schedulerx.MasterStartContainerRequest, workerId string, workerAddr string) (int64, error)

func (*TaskDao) BatchUpdateStatus

func (d *TaskDao) BatchUpdateStatus(jobInstanceId int64, taskIdList []int64, status int) (int64, error)

func (*TaskDao) BatchUpdateStatus2

func (d *TaskDao) BatchUpdateStatus2(jobInstanceId int64, status int, workerId string, workerAddr string) (int64, error)

func (*TaskDao) CreateTable

func (d *TaskDao) CreateTable() error

func (*TaskDao) DeleteByJobInstanceId

func (d *TaskDao) DeleteByJobInstanceId(jobInstanceId int64) (int64, error)

func (*TaskDao) DropTable

func (d *TaskDao) DropTable() error

func (*TaskDao) Exist

func (d *TaskDao) Exist(jobInstanceId int64) (bool, error)

func (*TaskDao) GetDistinctInstanceIds

func (d *TaskDao) GetDistinctInstanceIds() ([]int64, error)

func (*TaskDao) GetTaskStatistics

func (d *TaskDao) GetTaskStatistics() (*common.TaskStatistics, error)

func (*TaskDao) Insert

func (d *TaskDao) Insert(jobId int64, jobInstanceId int64, taskId int64, taskName string, taskBody []byte) error

func (*TaskDao) QueryStatus

func (d *TaskDao) QueryStatus(jobInstanceId int64) ([]int32, error)

func (*TaskDao) QueryTaskCount

func (d *TaskDao) QueryTaskCount(jobInstanceId int64) (int64, error)

func (*TaskDao) QueryTaskList

func (d *TaskDao) QueryTaskList(jobInstanceId int64, status int, pageSize int32) ([]*TaskSnapshot, error)

func (*TaskDao) QueryTasks

func (d *TaskDao) QueryTasks(jobInstanceId int64, pageSize int32) ([]*TaskSnapshot, error)

func (*TaskDao) UpdateStatus

func (d *TaskDao) UpdateStatus(jobInstanceId int64, taskId int64, status int, workerAddr string) (int64, error)

func (*TaskDao) UpdateStatus2

func (d *TaskDao) UpdateStatus2(jobInstanceId int64, taskIds []int64, status int, workerId string, workerAddr string) (int64, error)

func (*TaskDao) UpdateWorker

func (d *TaskDao) UpdateWorker(jobInstanceId int64, taskId int64, workerId string, workerAddr string) (int64, error)

type TaskPersistence

type TaskPersistence interface {
	// InitTable init task table
	InitTable()

	// UpdateTaskStatus update tasks Status and Worker info
	UpdateTaskStatus(jobInstanceId int64, taskIds []int64, status taskstatus.TaskStatus, workerId, workerAddr string) (int64, error)

	// UpdateTaskStatues update task statues accord to list of TaskStatusInfo
	UpdateTaskStatues(taskStatusInfos []*schedulerx.ContainerReportTaskStatusRequest) error

	// ClearTasks clear all tasks belong to specific job instance
	ClearTasks(jobInstanceId int64) error

	// CreateTask create task
	CreateTask(jobId, jobInstanceId, taskId int64, taskName string, taskBody []byte) error

	// CreateTasks batch create container infos
	CreateTasks(containers []*schedulerx.MasterStartContainerRequest, workerId, workerAddr string) error

	// BatchUpdateTaskStatus update tasks Status using condition
	BatchUpdateTaskStatus(jobInstanceId int64, status taskstatus.TaskStatus, workerId string, workerAddr string) int64

	// CheckInstanceStatus check job instance current Status
	CheckInstanceStatus(jobInstanceId int64) processor.InstanceStatus

	// Pull pull init tasks for failover retry
	Pull(jobInstanceId int64, pageSize int32) ([]*common.TaskInfo, error)
}

type TaskSnapshot

type TaskSnapshot struct {
	JobId         int64     `db:"job_id"`
	JobInstanceId int64     `db:"job_instance_id"`
	TaskId        int64     `db:"task_id"`
	TaskName      string    `db:"task_name"`
	Status        int32     `db:"status"`
	Progress      float64   `db:"progress"`
	GmtCreate     time.Time `db:"gmt_create"`
	GmtModified   time.Time `db:"gmt_modified"`
	WorkerAddr    string    `db:"worker_addr"`
	WorkerId      string    `db:"worker_id"`
	TaskBody      []byte    `db:"task_body"`
}

func NewTaskSnapshot

func NewTaskSnapshot() (rcvr *TaskSnapshot)

func (*TaskSnapshot) GetGmtCreate

func (t *TaskSnapshot) GetGmtCreate() time.Time

func (*TaskSnapshot) GetGmtModified

func (t *TaskSnapshot) GetGmtModified() time.Time

func (*TaskSnapshot) GetJobId

func (t *TaskSnapshot) GetJobId() int64

func (*TaskSnapshot) GetJobInstanceId

func (t *TaskSnapshot) GetJobInstanceId() int64

func (*TaskSnapshot) GetProgress

func (t *TaskSnapshot) GetProgress() float64

func (*TaskSnapshot) GetStatus

func (t *TaskSnapshot) GetStatus() int32

func (*TaskSnapshot) GetTaskBody

func (t *TaskSnapshot) GetTaskBody() []byte

func (*TaskSnapshot) GetTaskId

func (t *TaskSnapshot) GetTaskId() int64

func (*TaskSnapshot) GetTaskName

func (t *TaskSnapshot) GetTaskName() string

func (*TaskSnapshot) GetWorkerAddr

func (t *TaskSnapshot) GetWorkerAddr() string

func (*TaskSnapshot) GetWorkerId

func (t *TaskSnapshot) GetWorkerId() string

func (*TaskSnapshot) SetGmtCreate

func (t *TaskSnapshot) SetGmtCreate(gmtCreate time.Time)

func (*TaskSnapshot) SetGmtModified

func (t *TaskSnapshot) SetGmtModified(gmtModified time.Time)

func (*TaskSnapshot) SetJobId

func (t *TaskSnapshot) SetJobId(jobId int64)

func (*TaskSnapshot) SetJobInstanceId

func (t *TaskSnapshot) SetJobInstanceId(jobInstanceId int64)

func (*TaskSnapshot) SetProgress

func (t *TaskSnapshot) SetProgress(progress float64)

func (*TaskSnapshot) SetStatus

func (t *TaskSnapshot) SetStatus(Status int32)

func (*TaskSnapshot) SetTaskBody

func (t *TaskSnapshot) SetTaskBody(taskBody []byte)

func (*TaskSnapshot) SetTaskId

func (t *TaskSnapshot) SetTaskId(taskId int64)

func (*TaskSnapshot) SetTaskName

func (t *TaskSnapshot) SetTaskName(taskName string)

func (*TaskSnapshot) SetWorker

func (t *TaskSnapshot) SetWorker(workerAddr string)

func (*TaskSnapshot) SetWorkerId

func (t *TaskSnapshot) SetWorkerId(workerId string)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL