types

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 11, 2023 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
const LevelKey = "level"

LevelKey is logger level key.

Variables

This section is empty.

Functions

This section is empty.

Types

type ActivateTenantsOption

type ActivateTenantsOption struct {
	TenantId   []string
	ActiveTime time.Time
}

ActivateTenantsOption specifies options for updating tenants' active time

type CountTenantPendingTasksOption

type CountTenantPendingTasksOption struct {
	TenantId string
	From     time.Time
	To       time.Time
}

CountTenantPendingTasksOption specifies options for counting the number of pending tasks of specified tenant

type DB

type DB interface {
	CreateTenant(ctx context.Context, t entity.Tenant) error
	ActivateTenants(ctx context.Context, opt ActivateTenantsOption) error
	GetTenant(ctx context.Context, opt GetTenantOption) (*entity.Tenant, error)
	FindActiveTenants(ctx context.Context, opt FindActiveTenantsOption) (entity.Tenants, error)
	CountTenantPendingTasks(ctx context.Context, opt CountTenantPendingTasksOption) (int64, error)

	GetTask(ctx context.Context, opt GetTaskOption) (entity.Tasks, error)
	GetTaskStatus(ctx context.Context, opt GetTaskStatusOption) (enum.TaskStatus, error)
	FindRecentTasks(ctx context.Context, opt FindRecentTasksOption) (entity.Tasks, error)
	CreateTask(ctx context.Context, t entity.UserTask) error
	UpdateTaskStatus(ctx context.Context, opt UpdateTaskStatusOption) error

	Close() error
}

type FindActiveTenantsOption

type FindActiveTenantsOption struct {
	From      *time.Time
	Zone      *string
	Partition *string
}

FindActiveTenantsOption specifies options for finding active tenants

type FindRecentTasksOption

type FindRecentTasksOption struct {
	TaskType      enum.TaskType
	TenantId      *string
	MinUserTaskId *string
	Status        []enum.TaskStatus
}

FindRecentTasksOption specifies options for finding recent tasks

type GetTaskOption

type GetTaskOption struct {
	TaskType enum.TaskType
	Uid      string
}

GetTaskOption specifies the options for getting task by uid

type GetTaskStatusOption

type GetTaskStatusOption struct {
	TaskType enum.TaskType
	Uid      string
}

GetTaskStatusOption specifies the options for finding tasks' status

type GetTenantOption

type GetTenantOption struct {
	TenantId string
}

GetTenantOption specifies options for getting tenant entity

type Level

type Level int8

Level is a logger level.

const (
	// LevelTrace trace level
	LevelTrace Level = iota - 1
	// LevelDebug debug level
	LevelDebug
	// LevelInfo info level
	LevelInfo
	// LevelWarn warn level
	LevelWarn
	// LevelError error level
	LevelError
	// LevelFatal fatal level
	LevelFatal
)

func ParseLevel

func ParseLevel(s string) Level

ParseLevel parses a level string into a logger Level value.

func (Level) String

func (l Level) String() string

type Listener

type Listener interface {
	OnTaskCreated(e ListenerEvent)
	OnTaskScheduling(e ListenerEvent)
	OnTaskDispatching(e ListenerEvent)
	OnTaskRunning(e ListenerEvent)
	OnTaskNeedsRetry(e ListenerEvent)
	OnTaskFinished(e ListenerEvent)
	OnTaskFailed(e ListenerEvent)
	OnTaskTransition(e ListenerEvent)
	OnTaskTransitionError(e ListenerEvent)
	OnTaskTransitionFinished(e ListenerEvent)
}

type ListenerEvent

type ListenerEvent struct {
	SchedulerId string
	WorkerId    string
	Task        TaskMetadata
}

type Logger

type Logger interface {
	Log(level Level, keyvals ...interface{})
}

type OnMessageReceived

type OnMessageReceived func(from, to string, msg []byte) (result []byte, err error)

type ResourceUsage

type ResourceUsage struct {
	Time        time.Time
	CPU         int64
	Memory      int64
	Storage     int64
	GPU         int64
	Concurrency int64
	Custom      int64
}

func (*ResourceUsage) GetByType

func (u *ResourceUsage) GetByType(typ string) int64

type Task

type Task struct {
	Handler      string          `json:"handler"`      // Task handler name
	TenantId     string          `json:"tenantId"`     // Task tenant id
	Uid          string          `json:"uid"`          // Task uid, aka taskId
	SchedulerId  string          `json:"schedulerId"`  // Scheduler id that is responsible for scheduling the task
	Type         enum.TaskType   `json:"type"`         // TaskType
	Config       json.RawMessage `json:"config"`       // Task configurations in JSON, might be nil
	RestartTimes int             `json:"restartTimes"` // Number of restart
	InTransition bool            `json:"inTransition"` // Indicates whether this task run is a migration task
	LastRun      *TaskRun        `json:"lastRun"`      // The last TaskRun, nil when Task is never run
}

Task details

type TaskContext

type TaskContext struct {
	Worker *WorkerInfo `json:"worker"`
	Task   Task        `json:"task"`
}

TaskContext contains worker info, task details

func (*TaskContext) MarkDone

func (c *TaskContext) MarkDone(status enum.TaskRunStatus, res string, err error)

func (*TaskContext) MarkProgress

func (c *TaskContext) MarkProgress(v int)

func (*TaskContext) MarkRunning

func (c *TaskContext) MarkRunning()

func (*TaskContext) NewMessage

func (c *TaskContext) NewMessage(typ enum.TaskMessageType, val interface{}) *TaskMessage

type TaskHandler

type TaskHandler interface {
	// StartTransitionTask starts a transition task, If the migration ends
	// need send message to finished chan send a finished signal
	StartTransitionTask(finishSig chan struct{}) (bool, error)
	// Start starts processing the task, returns the error occurred during processing and whether the task should be retried
	// NOTE: This method implementation should be blocking, pooling will be handled by worker
	Start() (bool, error)
	// Stop forces the task to stop, mostly called when a task is manually stopped
	Stop() error
	// BeforeTransitionStart notifies task handler to pause the processing of tasks, but keep some essential state alive.
	// The task will be started on a new worker after that, so task handler has to take care of this situation.
	BeforeTransitionStart() (*TaskContext, *TaskStatus, error)
	// TransitionError is called when error occurred during transition
	TransitionError() (*TaskContext, *TaskStatus, error)
	// HeartBeat is called by worker regularly to ensure task (and task handler) works as expected.
	// When task handler fails to response for a couple of times, worker may treat the task has been failed
	HeartBeat() (*TaskContext, *TaskStatus, error)
}

TaskHandler defines the most basic methods that a normal UserTask has to implement.

type TaskHandlerFactory

type TaskHandlerFactory func(ctx *TaskContext, info *WorkerInfo) (TaskHandler, error)

TaskHandlerFactory builds a new TaskHandler

type TaskMessage

type TaskMessage struct {
	Type        enum.TaskMessageType `json:"type"`        // Task message type
	SchedulerId string               `json:"schedulerId"` // Scheduler id that the message is sent to
	WorkerId    string               `json:"workerId"`    // Worker id that the message is sent from
	Task        Task                 `json:"task"`        // Task details
	Timestamp   time.Time            `json:"timestamp"`   // Timestamp the message is sent
	Value       json.RawMessage      `json:"value"`       // Extra message value in JSON
}

TaskMessage defines the message that a worker sends to scheduler

type TaskMetadata

type TaskMetadata struct {
	Handler  string        `json:"handler"`  // Task handler name
	TenantId string        `json:"tenantId"` // Task tenant id
	Uid      string        `json:"uid"`      // Task uid, aka taskId
	Type     enum.TaskType `json:"type"`     // TaskType
}

TaskMetadata metadata

func NewTaskMetadataFromTaskMessage

func NewTaskMetadataFromTaskMessage(msg *TaskMessage) TaskMetadata

func NewTaskMetadataFromUserTaskEntity

func NewTaskMetadataFromUserTaskEntity(t *entity.UserTask) TaskMetadata

type TaskResourceUsage

type TaskResourceUsage struct {
	TaskId   int64
	TaskType int
	TenantId int64
	Current  ResourceUsage
	History  []ResourceUsage
}

type TaskRun

type TaskRun struct {
	Status   enum.TaskRunStatus `json:"status"`   // Task run status
	Result   string             `json:"result"`   // Task run result
	Error    string             `json:"error"`    // Error when task failed
	Start    time.Time          `json:"start"`    // Time the task began
	End      time.Time          `json:"end"`      // Time the task ended
	Progress *int               `json:"progress"` // Task progress in the range of (0, 100]
}

TaskRun task last run info

type TaskStatus

type TaskStatus struct {
	State         enum.TaskStatus `json:"state"`
	Progress      int             `json:"progress"`
	Error         error           `json:"error"`
	Timestamp     time.Time       `json:"timestamp"`
	ResourceUsage struct {
		CPU         int `json:"cpu"`
		Memory      int `json:"memory"`
		Storage     int `json:"storage"`
		GPU         int `json:"gpu"`
		Concurrency int `json:"concurrency"`
		Custom      int `json:"custom"`
	} `json:"resourceUsage"`
}

TaskStatus is reported everytime HeartBeat method is called.

type TenantResourceUsage

type TenantResourceUsage struct {
	Active  map[int64]*TaskResourceUsage
	History []*TaskResourceUsage
	Stale   []*TaskResourceUsage
}

type TenantResourceUsages

type TenantResourceUsages map[int64]*TenantResourceUsage

type Transport

type Transport interface {
	// Start starts the transport
	Start() error

	// OnReceive registers a message handler, which is called every time a message is received
	OnReceive(fn OnMessageReceived)

	// Send sends message in bytes
	Send(from, to string, msg []byte) error

	// CloseReceive closes receive side, stops receiving or handling messages
	CloseReceive() error

	// CloseSend closes send side, stops sending messages
	CloseSend() error
}

type UpdateTaskStatusOption

type UpdateTaskStatusOption struct {
	TaskType enum.TaskType
	Uids     []string
	Status   enum.TaskStatus
}

UpdateTaskStatusOption specifies the options for updating task status

type WorkerInfo

type WorkerInfo struct {
	Id         string `json:"id"`         // Worker id
	Generation int64  `json:"generation"` // Worker generation
}

WorkerInfo contains essential info about the worker that tasks need.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL