Documentation
¶
Index ¶
- Constants
- type ActivateTenantsOption
- type CountTenantPendingTasksOption
- type DB
- type FindActiveTenantsOption
- type FindRecentTasksOption
- type GetTaskOption
- type GetTaskStatusOption
- type GetTenantOption
- type Level
- type Listener
- type ListenerEvent
- type Logger
- type OnMessageReceived
- type ResourceUsage
- type Task
- type TaskContext
- type TaskHandler
- type TaskHandlerFactory
- type TaskMessage
- type TaskMetadata
- type TaskResourceUsage
- type TaskRun
- type TaskStatus
- type TenantResourceUsage
- type TenantResourceUsages
- type Transport
- type UpdateTaskStatusOption
- type WorkerInfo
Constants ¶
const LevelKey = "level"
LevelKey is logger level key.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ActivateTenantsOption ¶
ActivateTenantsOption specifies options for updating tenants' active time
type CountTenantPendingTasksOption ¶
CountTenantPendingTasksOption specifies options for counting the number of pending tasks of specified tenant
type DB ¶
type DB interface { CreateTenant(ctx context.Context, t entity.Tenant) error ActivateTenants(ctx context.Context, opt ActivateTenantsOption) error GetTenant(ctx context.Context, opt GetTenantOption) (*entity.Tenant, error) FindActiveTenants(ctx context.Context, opt FindActiveTenantsOption) (entity.Tenants, error) CountTenantPendingTasks(ctx context.Context, opt CountTenantPendingTasksOption) (int64, error) GetTask(ctx context.Context, opt GetTaskOption) (entity.Tasks, error) GetTaskStatus(ctx context.Context, opt GetTaskStatusOption) (enum.TaskStatus, error) FindRecentTasks(ctx context.Context, opt FindRecentTasksOption) (entity.Tasks, error) CreateTask(ctx context.Context, t entity.UserTask) error UpdateTaskStatus(ctx context.Context, opt UpdateTaskStatusOption) error Close() error }
type FindActiveTenantsOption ¶
FindActiveTenantsOption specifies options for finding active tenants
type FindRecentTasksOption ¶
type FindRecentTasksOption struct { TaskType enum.TaskType TenantId *string MinUserTaskId *string Status []enum.TaskStatus }
FindRecentTasksOption specifies options for finding recent tasks
type GetTaskOption ¶
GetTaskOption specifies the options for getting task by uid
type GetTaskStatusOption ¶
GetTaskStatusOption specifies the options for finding tasks' status
type GetTenantOption ¶
type GetTenantOption struct {
TenantId string
}
GetTenantOption specifies options for getting tenant entity
type Level ¶
type Level int8
Level is a logger level.
func ParseLevel ¶
ParseLevel parses a level string into a logger Level value.
type Listener ¶
type Listener interface { OnTaskCreated(e ListenerEvent) OnTaskScheduling(e ListenerEvent) OnTaskDispatching(e ListenerEvent) OnTaskRunning(e ListenerEvent) OnTaskNeedsRetry(e ListenerEvent) OnTaskFinished(e ListenerEvent) OnTaskFailed(e ListenerEvent) OnTaskTransition(e ListenerEvent) OnTaskTransitionError(e ListenerEvent) OnTaskTransitionFinished(e ListenerEvent) }
type ListenerEvent ¶
type ListenerEvent struct { SchedulerId string WorkerId string Task TaskMetadata }
type OnMessageReceived ¶
type ResourceUsage ¶
type ResourceUsage struct { Time time.Time CPU int64 Memory int64 Storage int64 GPU int64 Concurrency int64 Custom int64 }
func (*ResourceUsage) GetByType ¶
func (u *ResourceUsage) GetByType(typ string) int64
type Task ¶
type Task struct { Handler string `json:"handler"` // Task handler name TenantId string `json:"tenantId"` // Task tenant id Uid string `json:"uid"` // Task uid, aka taskId SchedulerId string `json:"schedulerId"` // Scheduler id that is responsible for scheduling the task Type enum.TaskType `json:"type"` // TaskType Config json.RawMessage `json:"config"` // Task configurations in JSON, might be nil RestartTimes int `json:"restartTimes"` // Number of restart InTransition bool `json:"inTransition"` // Indicates whether this task run is a migration task LastRun *TaskRun `json:"lastRun"` // The last TaskRun, nil when Task is never run }
Task details
type TaskContext ¶
type TaskContext struct { Worker *WorkerInfo `json:"worker"` Task Task `json:"task"` }
TaskContext contains worker info, task details
func (*TaskContext) MarkDone ¶
func (c *TaskContext) MarkDone(status enum.TaskRunStatus, res string, err error)
func (*TaskContext) MarkProgress ¶
func (c *TaskContext) MarkProgress(v int)
func (*TaskContext) MarkRunning ¶
func (c *TaskContext) MarkRunning()
func (*TaskContext) NewMessage ¶
func (c *TaskContext) NewMessage(typ enum.TaskMessageType, val interface{}) *TaskMessage
type TaskHandler ¶
type TaskHandler interface { // StartTransitionTask starts a transition task, If the migration ends // need send message to finished chan send a finished signal StartTransitionTask(finishSig chan struct{}) (bool, error) // Start starts processing the task, returns the error occurred during processing and whether the task should be retried // NOTE: This method implementation should be blocking, pooling will be handled by worker Start() (bool, error) // Stop forces the task to stop, mostly called when a task is manually stopped Stop() error // BeforeTransitionStart notifies task handler to pause the processing of tasks, but keep some essential state alive. // The task will be started on a new worker after that, so task handler has to take care of this situation. BeforeTransitionStart() (*TaskContext, *TaskStatus, error) // TransitionError is called when error occurred during transition TransitionError() (*TaskContext, *TaskStatus, error) // HeartBeat is called by worker regularly to ensure task (and task handler) works as expected. // When task handler fails to response for a couple of times, worker may treat the task has been failed HeartBeat() (*TaskContext, *TaskStatus, error) }
TaskHandler defines the most basic methods that a normal UserTask has to implement.
type TaskHandlerFactory ¶
type TaskHandlerFactory func(ctx *TaskContext, info *WorkerInfo) (TaskHandler, error)
TaskHandlerFactory builds a new TaskHandler
type TaskMessage ¶
type TaskMessage struct { Type enum.TaskMessageType `json:"type"` // Task message type SchedulerId string `json:"schedulerId"` // Scheduler id that the message is sent to WorkerId string `json:"workerId"` // Worker id that the message is sent from Task Task `json:"task"` // Task details Timestamp time.Time `json:"timestamp"` // Timestamp the message is sent Value json.RawMessage `json:"value"` // Extra message value in JSON }
TaskMessage defines the message that a worker sends to scheduler
type TaskMetadata ¶
type TaskMetadata struct { Handler string `json:"handler"` // Task handler name TenantId string `json:"tenantId"` // Task tenant id Uid string `json:"uid"` // Task uid, aka taskId Type enum.TaskType `json:"type"` // TaskType }
TaskMetadata metadata
func NewTaskMetadataFromTaskMessage ¶
func NewTaskMetadataFromTaskMessage(msg *TaskMessage) TaskMetadata
func NewTaskMetadataFromUserTaskEntity ¶
func NewTaskMetadataFromUserTaskEntity(t *entity.UserTask) TaskMetadata
type TaskResourceUsage ¶
type TaskResourceUsage struct { TaskId int64 TaskType int TenantId int64 Current ResourceUsage History []ResourceUsage }
type TaskRun ¶
type TaskRun struct { Status enum.TaskRunStatus `json:"status"` // Task run status Result string `json:"result"` // Task run result Error string `json:"error"` // Error when task failed Start time.Time `json:"start"` // Time the task began End time.Time `json:"end"` // Time the task ended Progress *int `json:"progress"` // Task progress in the range of (0, 100] }
TaskRun task last run info
type TaskStatus ¶
type TaskStatus struct { State enum.TaskStatus `json:"state"` Progress int `json:"progress"` Error error `json:"error"` Timestamp time.Time `json:"timestamp"` ResourceUsage struct { CPU int `json:"cpu"` Memory int `json:"memory"` Storage int `json:"storage"` GPU int `json:"gpu"` Concurrency int `json:"concurrency"` Custom int `json:"custom"` } `json:"resourceUsage"` }
TaskStatus is reported everytime HeartBeat method is called.
type TenantResourceUsage ¶
type TenantResourceUsage struct { Active map[int64]*TaskResourceUsage History []*TaskResourceUsage Stale []*TaskResourceUsage }
type TenantResourceUsages ¶
type TenantResourceUsages map[int64]*TenantResourceUsage
type Transport ¶
type Transport interface { // Start starts the transport Start() error // OnReceive registers a message handler, which is called every time a message is received OnReceive(fn OnMessageReceived) // Send sends message in bytes Send(from, to string, msg []byte) error // CloseReceive closes receive side, stops receiving or handling messages CloseReceive() error // CloseSend closes send side, stops sending messages CloseSend() error }
type UpdateTaskStatusOption ¶
type UpdateTaskStatusOption struct { TaskType enum.TaskType Uids []string Status enum.TaskStatus }
UpdateTaskStatusOption specifies the options for updating task status
type WorkerInfo ¶
type WorkerInfo struct { Id string `json:"id"` // Worker id Generation int64 `json:"generation"` // Worker generation }
WorkerInfo contains essential info about the worker that tasks need.