Documentation
¶
Index ¶
- Constants
- Variables
- type ActivateTenantsOption
- type CountTenantPendingTasksOption
- type DB
- type FindActiveTenantsOption
- type FindPendingTasksOption
- type GetTaskOption
- type GetTaskStatusOption
- type GetTenantOption
- type Level
- type Listener
- type ListenerEvent
- type Logger
- type OnMessageReceived
- type ResourceUsage
- type Task
- type TaskContext
- type TaskHandler
- type TaskHandlerFactory
- type TaskMessage
- type TaskMetadata
- type TaskResourceUsage
- type TaskRun
- type TaskStatus
- type TenantResourceUsage
- type TenantResourceUsages
- type Transport
- type UpdateTaskStatusOption
- type WorkerInfo
Constants ¶
const LevelKey = "level"
LevelKey is logger level key.
Variables ¶
var PendingTaskExpiredTime = time.Hour * 24 * 7
Functions ¶
This section is empty.
Types ¶
type ActivateTenantsOption ¶
ActivateTenantsOption specifies options for updating tenants' active time
type CountTenantPendingTasksOption ¶
CountTenantPendingTasksOption specifies options for counting the number of pending tasks of the specified tenant
type DB ¶
type DB interface { CreateTenant(ctx context.Context, t entity.Tenant) error ActivateTenants(ctx context.Context, opt ActivateTenantsOption) error GetTenant(ctx context.Context, opt GetTenantOption) (*entity.Tenant, error) FindActiveTenants(ctx context.Context, opt FindActiveTenantsOption) (entity.Tenants, error) CountTenantPendingTasks(ctx context.Context, opt CountTenantPendingTasksOption) (int64, error) GetTask(ctx context.Context, opt GetTaskOption) (*entity.Task, error) GetTaskStatus(ctx context.Context, opt GetTaskStatusOption) (enum.TaskStatus, error) FindPendingTasks(ctx context.Context, opt FindPendingTasksOption) (entity.Tasks, error) CreateTask(ctx context.Context, t entity.Task) error UpdateTaskStatus(ctx context.Context, opt UpdateTaskStatusOption) error Close() error }
type FindActiveTenantsOption ¶
FindActiveTenantsOption specifies options for finding active tenants
type FindPendingTasksOption ¶ added in v0.1.6
type FindPendingTasksOption struct { TenantId *string MinUid *string Status []enum.TaskStatus }
FindPendingTasksOption specifies options for finding pending tasks
type GetTaskOption ¶
GetTaskOption specifies the options for getting task by uid
type GetTaskStatusOption ¶
type GetTaskStatusOption struct { TenantId string // Required by Redis DB implementation Uid string }
GetTaskStatusOption specifies the options for getting a user task's status
type GetTenantOption ¶
type GetTenantOption struct {
TenantId string
}
GetTenantOption specifies options for getting tenant entity
type Level ¶
type Level int8
Level is a logger level.
func ParseLevel ¶
ParseLevel parses a level string into a logger Level value.
type Listener ¶
type Listener interface { OnTaskCreated(e ListenerEvent) OnTaskScheduling(e ListenerEvent) OnTaskDispatching(e ListenerEvent) OnTaskRunning(e ListenerEvent) OnTaskNeedsRetry(e ListenerEvent) OnTaskFinished(e ListenerEvent) OnTaskFailed(e ListenerEvent) OnTaskTransition(e ListenerEvent) OnTaskTransitionError(e ListenerEvent) OnTaskTransitionFinished(e ListenerEvent) }
type ListenerEvent ¶
type ListenerEvent struct { SchedulerId string WorkerId string Task TaskMetadata }
type OnMessageReceived ¶
type ResourceUsage ¶
type ResourceUsage struct { Time time.Time CPU int64 Memory int64 Storage int64 GPU int64 Concurrency int64 Custom int64 }
func (*ResourceUsage) GetByType ¶
func (u *ResourceUsage) GetByType(typ string) int64
type Task ¶
type Task struct { Handler string `json:"handler"` // Task handler name TenantId string `json:"tenantId"` // Task tenant id Uid string `json:"uid"` // Task uid, aka taskId SchedulerId string `json:"schedulerId"` // Scheduler id that is responsible for scheduling the task Config string `json:"config"` // Task configurations in JSON, might be empty RestartTimes int `json:"restartTimes"` // Number of restarts InTransition bool `json:"inTransition"` // Indicates whether this task run is a migration task LastRun *TaskRun `json:"lastRun"` // The last TaskRun, nil when Task is never run }
Task details
type TaskContext ¶
type TaskContext struct { Worker *WorkerInfo `json:"worker"` Task Task `json:"task"` }
TaskContext contains worker info, task details
func (*TaskContext) MarkDone ¶
func (c *TaskContext) MarkDone(status enum.TaskRunStatus, res string, err error)
func (*TaskContext) MarkProgress ¶
func (c *TaskContext) MarkProgress(v int)
func (*TaskContext) MarkRunning ¶
func (c *TaskContext) MarkRunning()
func (*TaskContext) NewMessage ¶
func (c *TaskContext) NewMessage(typ enum.TaskMessageType, val interface{}) *TaskMessage
type TaskHandler ¶
type TaskHandler interface { // Start starts processing the task, returns the error that occurred during processing and whether the task should be retried // NOTE: This method implementation should be blocking, pooling will be handled by worker Start() (bool, error) // StartMigratedTask starts a migrated task, signals the worker through finishC when the task has been started StartMigratedTask(finishC chan struct{}) (bool, error) // Stop forces the task to stop, mostly called when a task is manually stopped Stop() error // PrepareMigration notifies task handler to pause the processing of tasks, but keep some essential state alive. // The task will be started on a new worker after that, so task handler has to take care of this situation. PrepareMigration() (*TaskContext, *TaskStatus, error) // MigrateError is called when an error occurs during migration MigrateError() (*TaskContext, *TaskStatus, error) // HeartBeat is called by worker regularly to ensure task (and task handler) works as expected. // When the task handler fails to respond a couple of times, the worker may treat the task as failed HeartBeat() (*TaskContext, *TaskStatus, error) }
TaskHandler defines the most basic methods that a normal UserTask has to implement.
type TaskHandlerFactory ¶
type TaskHandlerFactory func(ctx *TaskContext, info *WorkerInfo) (TaskHandler, error)
TaskHandlerFactory builds a new TaskHandler
type TaskMessage ¶
type TaskMessage struct { Type enum.TaskMessageType `json:"type"` // Task message type SchedulerId string `json:"schedulerId"` // Scheduler id that the message is sent to WorkerId string `json:"workerId"` // Worker id that the message is sent from Task Task `json:"task"` // Task details Timestamp time.Time `json:"timestamp"` // Timestamp the message is sent Value json.RawMessage `json:"value"` // Extra message value in JSON }
TaskMessage defines the message that a worker sends to scheduler
type TaskMetadata ¶
type TaskMetadata struct { Handler string `json:"handler"` // Task handler name TenantId string `json:"tenantId"` // Task tenant id Uid string `json:"uid"` // Task uid, aka taskId }
TaskMetadata holds a task's identifying metadata: handler name, tenant id, and uid.
func NewTaskMetadataFromTaskEntity ¶ added in v0.1.6
func NewTaskMetadataFromTaskEntity(t *entity.Task) TaskMetadata
func NewTaskMetadataFromTaskMessage ¶
func NewTaskMetadataFromTaskMessage(msg *TaskMessage) TaskMetadata
type TaskResourceUsage ¶
type TaskResourceUsage struct { TaskId int64 TaskType int TenantId int64 Current ResourceUsage History []ResourceUsage }
type TaskRun ¶
type TaskRun struct { Status enum.TaskRunStatus `json:"status"` // Task run status Result string `json:"result"` // Task run result Error string `json:"error"` // Error when task failed Start time.Time `json:"start"` // Time the task began End time.Time `json:"end"` // Time the task ended Progress *int `json:"progress"` // Task progress in the range of (0, 100] }
TaskRun holds information about a task's last run.
type TaskStatus ¶
type TaskStatus struct { State enum.TaskStatus `json:"state"` Progress int `json:"progress"` Error error `json:"error"` Timestamp time.Time `json:"timestamp"` ResourceUsage struct { CPU int `json:"cpu"` Memory int `json:"memory"` Storage int `json:"storage"` GPU int `json:"gpu"` Concurrency int `json:"concurrency"` Custom int `json:"custom"` } `json:"resourceUsage"` }
TaskStatus is reported every time the HeartBeat method is called.
type TenantResourceUsage ¶
type TenantResourceUsage struct { Active map[int64]*TaskResourceUsage History []*TaskResourceUsage Stale []*TaskResourceUsage }
type TenantResourceUsages ¶
type TenantResourceUsages map[int64]*TenantResourceUsage
type Transport ¶
type Transport interface { // Start starts the transport Start() error // OnReceive registers a message handler, which is called every time a message is received OnReceive(fn OnMessageReceived) // Send sends message in bytes Send(from, to string, msg []byte) error // CloseReceive closes receive side, stops receiving or handling messages CloseReceive() error // CloseSend closes send side, stops sending messages CloseSend() error }
type UpdateTaskStatusOption ¶
type UpdateTaskStatusOption struct { TenantId string // Required by Redis DB implementation Uids []string Status enum.TaskStatus }
UpdateTaskStatusOption specifies the options for updating user tasks' status
type WorkerInfo ¶
type WorkerInfo struct { Id string `json:"id"` // Worker id Generation int64 `json:"generation"` // Worker generation }
WorkerInfo contains essential info about the worker that tasks need.