engine

package
v0.0.0-...-32b710e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 24, 2024 License: Apache-2.0, BSD-3-Clause, MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	WhiteListAllProjectID = "DIST_CC_CONST::ALL_PROJECT"
)

Variables

View Source
var (
	ErrorUnknownEngineType       = fmt.Errorf("unknown engine type")
	ErrorNoTaskInQueue           = fmt.Errorf("there is no task in the queue")
	ErrorNoEnoughResources       = fmt.Errorf("no enough resources")
	ErrorTaskNoFound             = fmt.Errorf("task no found")
	ErrorUnterminatedTaskNoFound = fmt.Errorf("unterminated task no found")
	ErrorProjectNoFound          = fmt.Errorf("project no found")
	ErrorWhitelistNoFound        = fmt.Errorf("whitelist no found")
	ErrorInnerEngineError        = fmt.Errorf("inner engine error")
	ErrorNoSupportDegrade        = fmt.Errorf("no support degrade")
	ErrorNoQueueNameSpecified    = fmt.Errorf("no queue name specified")
	ErrorUnknownMessageType      = fmt.Errorf("unknown message type")
	ErrorIPNotSpecified          = fmt.Errorf("ip not specified")
)

Functions

func CheckTaskIDValid

func CheckTaskIDValid(egn Engine, taskID string) (bool, error)

CheckTaskIDValid check if the taskID is valid, no used. By checking the engine task basic.

func CreateTaskBasic

func CreateTaskBasic(egn Engine, tb *TaskBasic) error

CreateTaskBasic create new task basic into database

func GetMapExcludeTableTaskBasic

func GetMapExcludeTableTaskBasic(source interface{}) (map[string]interface{}, error)

GetMapExcludeTableTaskBasic received a table which inherit the TableTaskBasic and marshal it into map[string]interface{}, then remove the keys provided by TableTaskBasic Then the return data can be used to update table and will never change TableTaskBasic

func UpdateProjectInfoBasic

func UpdateProjectInfoBasic(egn Engine, projectID string, delta DeltaProjectInfoBasic) error

UpdateProjectInfoBasic update the project info basic with delta data.

func UpdateTaskBasic

func UpdateTaskBasic(egn Engine, tb *TaskBasic, rawWhere []string) error

UpdateTaskBasic update task basic into database

Types

type DeltaProjectInfoBasic

type DeltaProjectInfoBasic struct {
	CompileFinishTimes int64
	CompileFailedTimes int64
}

DeltaProjectInfoBasic contains the delta part of project info basic.

type Engine

type Engine interface {

	// Name() is to get this engine name, this name must match the project.EngineName
	Name() TypeName

	// SelectFirstTaskBasic receive a task queue group(which contains many queues) and the target queue name
	// let the engine decide which task basic should be get in priority
	// basically, queueGroup.GetQueue(queueName).First() is just fine
	SelectFirstTaskBasic(queueGroup *TaskQueueGroup, queueName string) (*TaskBasic, error)

	// CreateTaskExtension provide the task basic data and the extra data,
	// extra data should be defined by different engines and decode inside engines
	// engine should generate the concerned data and save it during this function
	CreateTaskExtension(tb *TaskBasic, extra []byte) error

	// Get TaskExtension from engine
	// Task provide many public functions so that manager don't have to ask engine everything between progresses
	GetTaskExtension(taskID string) (TaskExtension, error)

	// LaunchTask tell engine to launch the distribute service
	// it should be asynchronous and return immediately after exec the launch job
	// if there is no enough resources for task launching, just return engine.ErrorNoEnoughResources
	LaunchTask(tb *TaskBasic, queueName string) error

	// LaunchDone is used to check if the launch job is done
	LaunchDone(taskID string) (bool, error)

	// CheckTask let engine check this task status
	CheckTask(tb *TaskBasic) error

	// DegradeTask will be called when manager decide to degrade this task, such as staging timeout
	// engine can just return an error if no support degrade mode.
	DegradeTask(taskID string) error

	// SendProjectMessage provide the message to engine,
	// it should be decode inside the engines
	// then get data back
	SendProjectMessage(projectID string, message []byte) ([]byte, error)

	// SendTaskMessage provide the message to engine,
	// it should be decode inside the engines
	// then get data back
	SendTaskMessage(taskID string, message []byte) ([]byte, error)

	// CollectTaskData tell engine that this distribute service will not be used any more,
	// fell free to collect data or some after-work jobs
	CollectTaskData(tb *TaskBasic) error

	// ReleaseTask tell engine to shutdown the service and release the resource
	ReleaseTask(taskID string) error

	// GetPreferences return the engine preferences settings
	GetPreferences() Preferences

	// GetTaskBasicTable return the DB instance where TableTaskBasic is
	GetTaskBasicTable() *gorm.DB

	// GetProjectBasicTable return the DB instance where TableProjectBasic is
	GetProjectBasicTable() *gorm.DB

	// GetProjectInfoBasicTable return the DB instance where TableProjectInfoBasic is
	GetProjectInfoBasicTable() *gorm.DB

	// GetWhitelistBasicTable return the DB instance where TableWhitelistBasic is
	GetWhitelistBasicTable() *gorm.DB
}

Engine describe how different distribute service work under the manager

type MySQLConf

type MySQLConf struct {
	MySQLStorage     string
	MySQLDatabase    string
	MySQLUser        string
	MySQLPwd         string
	Charset          string
	MySQLDebug       bool
	MysqlTableOption string
}

MySQLConf describe the mysql connection config need by engines

type Preferences

type Preferences struct {
	// heartbeat timeout = heartbeat tick gap * tick times
	HeartbeatTimeoutTickTimes int
}

Preferences describe the custom settings by different engines. these settings matters when manager handle some public works such as heartbeat.

type ProjectBasic

type ProjectBasic struct {

	// PRIMARY key, which should be unique in global(among all engines).
	ProjectID string

	// detail name for this project
	ProjectName string

	// detail descriptions for this project
	Message string

	// Priority defines the priority of tasks launched by this project
	// which matters the task position in waiting queue
	Priority TaskPriority

	// QueueName defines which queue will manage the tasks launched by this project.
	QueueName string

	// EngineName decide which engine will be used to launch the tasks from this project
	EngineName TypeName

	// StageTimeout defines the timeout(seconds) when tasks in staging status.
	// After timeout, the task will be regarded as in the situation "no enough resources",
	// then the task will be do the regraded progress
	StageTimeout int

	// Concurrency defines the max concurrency unterminated task under this projects
	// a new task which reach the limits will be rejected.
	Concurrency int
}

ProjectBasic describe the basic settings of project,

func GetProjectBasic

func GetProjectBasic(egn Engine, projectID string) (*ProjectBasic, error)

GetProjectBasic get project basic from engine

func TableProject2ProjectBasic

func TableProject2ProjectBasic(basic *TableProjectBasic) *ProjectBasic

TableProject2ProjectBasic convert ProjectBasic from database field into struct data

type QueueBriefInfo

type QueueBriefInfo struct {
	QueueName  string
	EngineName TypeName
}

QueueBriefInfo describe a brief information of a queue it is a queue - engine pair

type QueueShareType

type QueueShareType int
const (
	QueueShareTypeAllAllowed         QueueShareType = iota
	QueueShareTypeOnlyTakeFromPublic                // 是否允许从公共队列中获取任务,即用当前资源来为公共队列中的任务进行加速
	QueueShareTypeOnlyGiveToPublic                  // 是否允许将当前处理不了的任务放置到公共队列中,即是否用其它组的资源来为当前组的任务进行加速
	QueueShareTypeNoneAllowed
)

type StagingTaskQueue

type StagingTaskQueue interface {
	// get length of this queue
	Len() int64

	// list all TaskBasic from this queue, order by Priority and CreateTime(in same Priority)
	All() []*TaskBasic

	// get first instance from this queue
	First() (*TaskBasic, error)

	// check if task exist
	Exist(taskID string) bool

	// get the specific TaskBasic rank in this queue
	Rank(taskID string) (int, error)

	// add new TaskBasic into this queue
	// the ordering will be adjusted automatically
	Add(task *TaskBasic)

	// delete the specific TaskBasic from this queue
	Delete(taskID string) error

	// clear all elements from this queue
	Clear()
}

StagingTaskQueue maintains a priority-driven queue for TaskBasic

func NewStagingTaskQueue

func NewStagingTaskQueue() StagingTaskQueue

NewStagingTaskQueue get a new, empty, initialized StagingTaskQueue

type TableBasic

type TableBasic struct {
	UpdatedAt time.Time `gorm:"column:update_at" json:"-"`
	Disabled  bool      `gorm:"column:disabled;default:false;index" json:"-"`
}

TableBasic contains some basic fields in all tables.

type TableProjectBasic

type TableProjectBasic struct {
	TableBasic

	ProjectID    string `gorm:"column:project_id;primary_key" json:"project_id"`
	ProjectName  string `gorm:"column:project_name" json:"project_name"`
	Priority     int    `gorm:"column:priority" json:"priority"`
	EngineName   string `gorm:"column:engine_name" json:"engine_name"`
	QueueName    string `gorm:"column:queue_name" json:"queue_name"`
	StageTimeout int    `gorm:"column:stage_timeout;default:60" json:"stage_timeout"`
	Message      string `gorm:"column:message" sql:"type:text" json:"message"`
	Concurrency  int    `gorm:"column:concurrency;default:0" json:"concurrency"`
}

TableProjectBasic contains the project basic fields. All engine project should contains these basic fields.

type TableProjectInfoBasic

type TableProjectInfoBasic struct {
	TableBasic

	ProjectID          string `gorm:"column:project_id;primary_key" json:"project_id"`
	CompileFinishTimes int64  `gorm:"column:compile_finish_times" json:"compile_finish_times"`
	CompileFailedTimes int64  `gorm:"column:compile_failed_times" json:"compile_failed_times"`
}

TableProjectInfoBasic contains the project info basic fields.

type TableTaskBasic

type TableTaskBasic struct {
	TableBasic
	TaskID string `gorm:"column:task_id;primary_key" json:"task_id"`

	// basic client
	EngineName    string `gorm:"column:engine_name" json:"engine_name"`
	QueueName     string `gorm:"column:queue_name" json:"queue_name"`
	ProjectID     string `gorm:"column:project_id;index" json:"project_id"`
	BuildID       string `gorm:"column:build_id" json:"build_id"`
	ClientIP      string `gorm:"column:client_ip" sql:"type:text" json:"client_ip"`
	ClientCPU     int    `gorm:"column:client_cpu" json:"client_cpu"`
	ClientVersion string `gorm:"column:client_version" json:"client_version"`
	ClientMessage string `gorm:"column:client_message" sql:"type:text" json:"client_message"`
	StageTimeout  int    `gorm:"column:stage_timeout;default:60" json:"stage_timeout"`
	Priority      int    `gorm:"column:priority" json:"priority"`

	// basic status
	Status            string `gorm:"column:status;index" json:"status"`
	StatusCode        int    `gorm:"column:status_code" json:"status_code"`
	StatusMessage     string `gorm:"column:status_message" sql:"type:text" json:"status_message"`
	Released          bool   `gorm:"column:released;index" json:"released"`
	LastHeartBeatTime int64  `gorm:"column:last_heartbeat_time" json:"last_heartbeat_time"`
	StatusChangeTime  int64  `gorm:"column:status_change_time" json:"status_change_time"`
	InitTime          int64  `gorm:"column:init_time" json:"init_time"`
	CreateTime        int64  `gorm:"column:create_time;index" json:"create_time"`
	UpdateTime        int64  `gorm:"column:update_time" json:"update_time"`
	LaunchTime        int64  `gorm:"column:launch_time" json:"launch_time"`
	ReadyTime         int64  `gorm:"column:ready_time" json:"ready_time"`
	ShutDownTime      int64  `gorm:"column:shutdown_time" json:"shutdown_time"`
	StartTime         int64  `gorm:"column:start_time" json:"start_time"`
	EndTime           int64  `gorm:"column:end_time" json:"end_time"`
}

TableTaskBasic contains the task basic fields. All engine task should contains these basic fields.

func TaskBasic2TableTask

func TaskBasic2TableTask(basic *TaskBasic) *TableTaskBasic

TaskBasic2TableTask convert TaskBasic from struct data into database field

type TableWhitelistBasic

type TableWhitelistBasic struct {
	TableBasic

	IP        string `gorm:"column:ip;primary_key" json:"ip"`
	ProjectID string `gorm:"column:project_id;primary_key" json:"project_id"`
	Message   string `gorm:"column:message" sql:"type:text" json:"message"`
}

TableProjectBasic contains the project basic fields. All engine project should contains these basic fields.

func (*TableWhitelistBasic) CheckData

func (tb *TableWhitelistBasic) CheckData() error

CheckData check whitelist basic data

type TaskBasic

type TaskBasic struct {
	// the primary key of tasks
	ID string

	// settings when created
	Client TaskBasicClient

	// status after created
	Status TaskBasicStatus
}

TaskBasic describe task basic struct data

func CopyTaskBasic

func CopyTaskBasic(task *TaskBasic) *TaskBasic

CopyTaskBasic deep copy a new task basic

func ListTaskBasic

func ListTaskBasic(egn Engine, options TaskListOptions) ([]*TaskBasic, error)

ListTaskBasic list task basic from engine

func TableTask2TaskBasic

func TableTask2TaskBasic(table *TableTaskBasic) *TaskBasic

TableTask2TaskBasic convert TaskBasic from database field into struct data

func (*TaskBasic) Check

func (tb *TaskBasic) Check() error

Check check if the task basic valid

type TaskBasicClient

type TaskBasicClient struct {
	EngineName    TypeName
	QueueName     string
	ProjectID     string
	BuildID       string
	Priority      TaskPriority
	ClientIP      string
	ClientCPU     int
	ClientVersion string
	StageTimeout  int
	Message       string
}

TaskBasicClient describe task basic settings

type TaskBasicStatus

type TaskBasicStatus struct {
	Status     TaskStatusType
	StatusCode TaskStatusCode
	Message    string

	Released bool

	// Task time
	LastHeartBeatTime time.Time
	StatusChangeTime  time.Time
	InitTime          time.Time
	CreateTime        time.Time
	UpdateTime        time.Time
	LaunchTime        time.Time
	ReadyTime         time.Time
	ShutDownTime      time.Time
	StartTime         time.Time
	EndTime           time.Time
}

TaskBasicStatus describe task basic status

func (*TaskBasicStatus) Beats

func (tbs *TaskBasicStatus) Beats()

Beats record the heartbeat time.

func (*TaskBasicStatus) BeforeRunning

func (tbs *TaskBasicStatus) BeforeRunning() bool

BeforeStarting check whether before running

func (*TaskBasicStatus) ChangeStatus

func (tbs *TaskBasicStatus) ChangeStatus(status TaskStatusType)

ChangeStatus change task status

func (*TaskBasicStatus) Create

func (tbs *TaskBasicStatus) Create()

Create make task created, right after basic and extension are all created.

func (*TaskBasicStatus) End

func (tbs *TaskBasicStatus) End()

End record the task end time, right after the task server end the service.

func (*TaskBasicStatus) FailWithClientCancel

func (tbs *TaskBasicStatus) FailWithClientCancel()

FailWithClientCancel make task failed by client side request

func (*TaskBasicStatus) FailWithClientLost

func (tbs *TaskBasicStatus) FailWithClientLost()

FailWithClientLost make task failed by client lost

func (*TaskBasicStatus) FailWithServerDown

func (tbs *TaskBasicStatus) FailWithServerDown()

FailWithServerDown make task failed by server

func (*TaskBasicStatus) Finish

func (tbs *TaskBasicStatus) Finish()

Finish make task finish, right after the task finish without any error.

func (*TaskBasicStatus) Init

func (tbs *TaskBasicStatus) Init()

Init make task init, right after basic created.

func (*TaskBasicStatus) Launch

func (tbs *TaskBasicStatus) Launch()

Launch make task launched, right after task server is starting

func (*TaskBasicStatus) Ready

func (tbs *TaskBasicStatus) Ready()

Ready make task ready, right after task server is running

func (*TaskBasicStatus) ShutDown

func (tbs *TaskBasicStatus) ShutDown()

ShutDown make task shutdown, right after task server is released

func (*TaskBasicStatus) Start

func (tbs *TaskBasicStatus) Start()

Start record the task start time, right after the task server begins to provide service.

func (*TaskBasicStatus) Update

func (tbs *TaskBasicStatus) Update()

Update record task update

type TaskExtension

type TaskExtension interface {
	// if task get enough available resource
	EnoughAvailableResource() bool

	// get worker list from task
	WorkerList() []string

	GetRequestInstance() int

	GetWorkerCount() int

	// dump the task data
	Dump() []byte

	// return the engine custom data from task
	CustomData(interface{}) interface{}
}

TaskExtension describe the extension part beyond task basic according to different engines

type TaskListOptions

type TaskListOptions struct {
	Status   []TaskStatusType
	Released *bool
}

TaskListOptions describe tasks list filter conditions get all status if len(Status) is 0 get all no matter released or not if Released is nil

func NewTaskListOptions

func NewTaskListOptions(released *bool, statusList ...TaskStatusType) TaskListOptions

NewTaskListOptions get a new task list options

type TaskPriority

type TaskPriority uint

TaskPriority include 1 ~ 20, the greater number will be set to 20. If the priority is 0, the task will never be launch.

const (
	MaxTaskPriority     TaskPriority = 20
	MinTaskPriority     TaskPriority = 1
	DefaultTaskPriority TaskPriority = 10

	DefaultTaskStageTimeout = 60
)

type TaskQueueGroup

type TaskQueueGroup struct {
	sync.Mutex
	// contains filtered or unexported fields
}

TaskQueueGroup handle a few StagingTaskQueues

func NewTaskQueueGroup

func NewTaskQueueGroup() *TaskQueueGroup

NewTaskQueueGroup get a new, empty, initialized TaskQueueGroup

func (*TaskQueueGroup) DeleteTask

func (tqg *TaskQueueGroup) DeleteTask(taskID string) bool

DeleteTask delete a task from all queues of groups

func (*TaskQueueGroup) Exist

func (tqg *TaskQueueGroup) Exist(taskID string) bool

Exist check if the task is in any queues of groups

func (*TaskQueueGroup) GetQueue

func (tqg *TaskQueueGroup) GetQueue(name string) StagingTaskQueue

GetQueue get the specific queue from group

type TaskStatusCode

type TaskStatusCode uint
const (
	TaskStatusCodeFinished TaskStatusCode = iota
	TaskStatusCodeUnknown
	TaskStatusCodeClientCancelInStaging
	TaskStatusCodeClientCancelInStarting
	TaskStatusCodeClientCancelInRunning

	TaskStatusCodeClientLostInStaging
	TaskStatusCodeClientLostInStarting
	TaskStatusCodeClientLostInRunning

	TaskStatusCodeServerFailedInStaging
	TaskStatusCodeServerFailedInStarting
	TaskStatusCodeServerFailedInRunning
)

func (TaskStatusCode) ServerAlive

func (sc TaskStatusCode) ServerAlive() bool

ServerAlive if task server still alive in current status

func (TaskStatusCode) String

func (sc TaskStatusCode) String() string

String get task status code string

type TaskStatusType

type TaskStatusType string
const (
	TaskStatusInit     TaskStatusType = "init"
	TaskStatusStaging  TaskStatusType = "staging"
	TaskStatusStarting TaskStatusType = "starting"
	TaskStatusRunning  TaskStatusType = "running"
	TaskStatusFailed   TaskStatusType = "failed"
	TaskStatusFinish   TaskStatusType = "finish"
)

func (TaskStatusType) Terminated

func (tst TaskStatusType) Terminated() bool

Terminated check if the task status is in terminated

type TypeName

type TypeName string

Engine type name

func (TypeName) String

func (t TypeName) String() string

String return the engine type name

type WhiteListKey

type WhiteListKey struct {
	IP        string `json:"ip"`
	ProjectID string `json:"project_id"`
}

WhiteListKey describe the primary key in whitelist basic, it is a ip - projectID pair.

type WhitelistBasic

type WhitelistBasic struct {
	ProjectID string
	IP        string
	Message   string
}

WhitelistBasic describe the basic whitelist information One whitelist record means:

A request from the IP, ask to applying a new task under the Project, will be approved.

func ListWhitelistBasic

func ListWhitelistBasic(egn Engine, projectID string) ([]*WhitelistBasic, error)

ListWhitelistBasic list whitelist basic from engine

func TableWhitelist2WhitelistBasic

func TableWhitelist2WhitelistBasic(basic *TableWhitelistBasic) *WhitelistBasic

TableWhitelist2WhitelistBasic convert WhitelistBasic from database field into struct data

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL