types

package
v0.2.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 2, 2024 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
const LevelKey = "level"

LevelKey is logger level key.

Variables

View Source
var PendingTaskExpiredTime = time.Hour * 24 * 7

Functions

This section is empty.

Types

type ActivateTenantsOption

type ActivateTenantsOption struct {
	TenantId   []string
	ActiveTime time.Time
}

ActivateTenantsOption specifies options for updating tenants' active time

type CountTenantPendingTasksOption

type CountTenantPendingTasksOption struct {
	TenantId string
	From     time.Time
	To       time.Time
}

CountTenantPendingTasksOption specifies options for counting the number of pending tasks of specified tenant

type DB

type DB interface {
	CreateTenant(ctx context.Context, t entity.Tenant) error
	ActivateTenants(ctx context.Context, opt ActivateTenantsOption) error
	GetTenant(ctx context.Context, opt GetTenantOption) (*entity.Tenant, error)
	FindActiveTenants(ctx context.Context, opt FindActiveTenantsOption) (entity.Tenants, error)
	CountTenantPendingTasks(ctx context.Context, opt CountTenantPendingTasksOption) (int64, error)

	GetTask(ctx context.Context, opt GetTaskOption) (*entity.Task, error)
	GetTaskStatus(ctx context.Context, opt GetTaskStatusOption) (enum.TaskStatus, error)
	FindPendingTasks(ctx context.Context, opt FindPendingTasksOption) (entity.Tasks, error)
	CreateTask(ctx context.Context, t entity.Task) error
	UpdateTaskStatus(ctx context.Context, opt UpdateTaskStatusOption) error

	Close() error
}

type FindActiveTenantsOption

type FindActiveTenantsOption struct {
	From      *time.Time
	Zone      *string
	Partition *string
}

FindActiveTenantsOption specifies options for finding active tenants

type FindPendingTasksOption added in v0.1.6

type FindPendingTasksOption struct {
	TenantId *string
	MinUid   *string
	Status   []enum.TaskStatus
}

FindPendingTasksOption specifies options for finding recent tasks

type GetTaskOption

type GetTaskOption struct {
	TenantId string // Required by Redis DB implementation
	Uid      string
}

GetTaskOption specifies the options for getting task by uid

type GetTaskStatusOption

type GetTaskStatusOption struct {
	TenantId string // Required by Redis DB implementation
	Uid      string
}

GetTaskStatusOption specifies the options for getting user task's status

type GetTenantOption

type GetTenantOption struct {
	TenantId string
}

GetTenantOption specifies options for getting tenant entity

type Level

type Level int8

Level is a logger level.

const (
	// LevelTrace trace level
	LevelTrace Level = iota - 1
	// LevelDebug debug level
	LevelDebug
	// LevelInfo info level
	LevelInfo
	// LevelWarn warn level
	LevelWarn
	// LevelError error level
	LevelError
	// LevelFatal fatal level
	LevelFatal
)

func ParseLevel

func ParseLevel(s string) Level

ParseLevel parses a level string into a logger Level value.

func (Level) String

func (l Level) String() string

type Listener

type Listener interface {
	OnTaskCreated(e ListenerEvent)
	OnTaskScheduling(e ListenerEvent)
	OnTaskDispatching(e ListenerEvent)
	OnTaskRunning(e ListenerEvent)
	OnTaskNeedsRetry(e ListenerEvent)
	OnTaskFinished(e ListenerEvent)
	OnTaskFailed(e ListenerEvent)
	OnTaskTransition(e ListenerEvent)
	OnTaskTransitionError(e ListenerEvent)
	OnTaskTransitionFinished(e ListenerEvent)
}

type ListenerEvent

type ListenerEvent struct {
	SchedulerId string
	WorkerId    string
	Task        TaskMetadata
}

type Logger

type Logger interface {
	Log(level Level, keyvals ...interface{})
}

type OnMessageReceived

type OnMessageReceived func(from, to string, msg []byte) (result []byte, err error)

type ResourceUsage

type ResourceUsage struct {
	Time        time.Time
	CPU         int64
	Memory      int64
	Storage     int64
	GPU         int64
	Concurrency int64
	Custom      int64
}

func (*ResourceUsage) GetByType

func (u *ResourceUsage) GetByType(typ string) int64

type Task

type Task struct {
	Handler      string   `json:"handler"`      // Task handler name
	TenantId     string   `json:"tenantId"`     // Task tenant id
	Uid          string   `json:"uid"`          // Task uid, aka taskId
	SchedulerId  string   `json:"schedulerId"`  // Scheduler id that is responsible for scheduling the task
	Config       string   `json:"config"`       // Task configurations in JSON, might be nil
	RestartTimes int      `json:"restartTimes"` // Number of restart
	InTransition bool     `json:"inTransition"` // Indicates whether this task run is a migration task
	LastRun      *TaskRun `json:"lastRun"`      // The last TaskRun, nil when Task is never run
}

Task details

type TaskContext

type TaskContext struct {
	Worker *WorkerInfo `json:"worker"`
	Task   Task        `json:"task"`
}

TaskContext contains worker info, task details

func (*TaskContext) MarkDone

func (c *TaskContext) MarkDone(status enum.TaskRunStatus, res string, err error)

func (*TaskContext) MarkProgress

func (c *TaskContext) MarkProgress(v int)

func (*TaskContext) MarkRunning

func (c *TaskContext) MarkRunning()

func (*TaskContext) NewMessage

func (c *TaskContext) NewMessage(typ enum.TaskMessageType, val interface{}) *TaskMessage

type TaskHandler

type TaskHandler interface {
	// Start starts processing the task, returns the error occurred during processing and whether the task should be retried
	// NOTE: This method implementation should be blocking, pooling will be handled by worker
	Start() (bool, error)

	// StartMigratedTask starts a migrated task, signals the worker through finishC when the task has been started
	StartMigratedTask(finishC chan struct{}) (bool, error)

	// Stop forces the task to stop, mostly called when a task is manually stopped
	Stop() error

	// PrepareMigration notifies task handler to pause the processing of tasks, but keep some essential state alive.
	// The task will be started on a new worker after that, so task handler has to take care of this situation.
	PrepareMigration() (*TaskContext, *TaskStatus, error)

	// MigrateError is called when error occurred during migration
	MigrateError() (*TaskContext, *TaskStatus, error)

	// HeartBeat is called by worker regularly to ensure task (and task handler) works as expected.
	// When task handler fails to response for a couple of times, worker may treat the task has been failed
	HeartBeat() (*TaskContext, *TaskStatus, error)
}

TaskHandler defines the most basic methods that a normal UserTask has to implement.

type TaskHandlerFactory

type TaskHandlerFactory func(ctx *TaskContext, info *WorkerInfo) (TaskHandler, error)

TaskHandlerFactory builds a new TaskHandler

type TaskMessage

type TaskMessage struct {
	Type        enum.TaskMessageType `json:"type"`        // Task message type
	SchedulerId string               `json:"schedulerId"` // Scheduler id that the message is sent to
	WorkerId    string               `json:"workerId"`    // Worker id that the message is sent from
	Task        Task                 `json:"task"`        // Task details
	Timestamp   time.Time            `json:"timestamp"`   // Timestamp the message is sent
	Value       json.RawMessage      `json:"value"`       // Extra message value in JSON
}

TaskMessage defines the message that a worker sends to scheduler

type TaskMetadata

type TaskMetadata struct {
	Handler  string `json:"handler"`  // Task handler name
	TenantId string `json:"tenantId"` // Task tenant id
	Uid      string `json:"uid"`      // Task uid, aka taskId
}

TaskMetadata metadata

func NewTaskMetadataFromTaskEntity added in v0.1.6

func NewTaskMetadataFromTaskEntity(t *entity.Task) TaskMetadata

func NewTaskMetadataFromTaskMessage

func NewTaskMetadataFromTaskMessage(msg *TaskMessage) TaskMetadata

type TaskResourceUsage

type TaskResourceUsage struct {
	TaskId   int64
	TaskType int
	TenantId int64
	Current  ResourceUsage
	History  []ResourceUsage
}

type TaskRun

type TaskRun struct {
	Status   enum.TaskRunStatus `json:"status"`   // Task run status
	Result   string             `json:"result"`   // Task run result
	Error    string             `json:"error"`    // Error when task failed
	Start    time.Time          `json:"start"`    // Time the task began
	End      time.Time          `json:"end"`      // Time the task ended
	Progress *int               `json:"progress"` // Task progress in the range of (0, 100]
}

TaskRun task last run info

type TaskStatus

type TaskStatus struct {
	State         enum.TaskStatus `json:"state"`
	Progress      int             `json:"progress"`
	Error         error           `json:"error"`
	Timestamp     time.Time       `json:"timestamp"`
	ResourceUsage struct {
		CPU         int `json:"cpu"`
		Memory      int `json:"memory"`
		Storage     int `json:"storage"`
		GPU         int `json:"gpu"`
		Concurrency int `json:"concurrency"`
		Custom      int `json:"custom"`
	} `json:"resourceUsage"`
}

TaskStatus is reported everytime HeartBeat method is called.

type TenantResourceUsage

type TenantResourceUsage struct {
	Active  map[int64]*TaskResourceUsage
	History []*TaskResourceUsage
	Stale   []*TaskResourceUsage
}

type TenantResourceUsages

type TenantResourceUsages map[int64]*TenantResourceUsage

type Transport

type Transport interface {
	// Start starts the transport
	Start() error

	// OnReceive registers a message handler, which is called every time a message is received
	OnReceive(fn OnMessageReceived)

	// Send sends message in bytes
	Send(from, to string, msg []byte) error

	// CloseReceive closes receive side, stops receiving or handling messages
	CloseReceive() error

	// CloseSend closes send side, stops sending messages
	CloseSend() error
}

type UpdateTaskStatusOption

type UpdateTaskStatusOption struct {
	TenantId string // Required by Redis DB implementation
	Uids     []string
	Status   enum.TaskStatus
}

UpdateTaskStatusOption specifies the options for updating user tasks' status

type WorkerInfo

type WorkerInfo struct {
	Id         string `json:"id"`         // Worker id
	Generation int64  `json:"generation"` // Worker generation
}

WorkerInfo contains essential info about the worker that tasks need.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL