Documentation ¶
Index ¶
- Constants
- Variables
- func CheckTaskIDValid(egn Engine, taskID string) (bool, error)
- func CreateTaskBasic(egn Engine, tb *TaskBasic) error
- func GetMapExcludeTableTaskBasic(source interface{}) (map[string]interface{}, error)
- func UpdateProjectInfoBasic(egn Engine, projectID string, delta DeltaProjectInfoBasic) error
- func UpdateTaskBasic(egn Engine, tb *TaskBasic, rawWhere []string) error
- type DeltaProjectInfoBasic
- type Engine
- type MySQLConf
- type Preferences
- type ProjectBasic
- type QueueBriefInfo
- type QueueShareType
- type StagingTaskQueue
- type TableBasic
- type TableProjectBasic
- type TableProjectInfoBasic
- type TableTaskBasic
- type TableWhitelistBasic
- type TaskBasic
- type TaskBasicClient
- type TaskBasicStatus
- func (tbs *TaskBasicStatus) Beats()
- func (tbs *TaskBasicStatus) BeforeRunning() bool
- func (tbs *TaskBasicStatus) ChangeStatus(status TaskStatusType)
- func (tbs *TaskBasicStatus) Create()
- func (tbs *TaskBasicStatus) End()
- func (tbs *TaskBasicStatus) FailWithClientCancel()
- func (tbs *TaskBasicStatus) FailWithClientLost()
- func (tbs *TaskBasicStatus) FailWithServerDown()
- func (tbs *TaskBasicStatus) Finish()
- func (tbs *TaskBasicStatus) Init()
- func (tbs *TaskBasicStatus) Launch()
- func (tbs *TaskBasicStatus) Ready()
- func (tbs *TaskBasicStatus) ShutDown()
- func (tbs *TaskBasicStatus) Start()
- func (tbs *TaskBasicStatus) Update()
- type TaskExtension
- type TaskListOptions
- type TaskPriority
- type TaskQueueGroup
- type TaskStatusCode
- type TaskStatusType
- type TypeName
- type WhiteListKey
- type WhitelistBasic
Constants ¶
const (
WhiteListAllProjectID = "DIST_CC_CONST::ALL_PROJECT"
)
Variables ¶
var ( ErrorUnknownEngineType = fmt.Errorf("unknown engine type") ErrorNoTaskInQueue = fmt.Errorf("there is no task in the queue") ErrorNoEnoughResources = fmt.Errorf("no enough resources") ErrorTaskNoFound = fmt.Errorf("task no found") ErrorUnterminatedTaskNoFound = fmt.Errorf("unterminated task no found") ErrorProjectNoFound = fmt.Errorf("project no found") ErrorWhitelistNoFound = fmt.Errorf("whitelist no found") ErrorInnerEngineError = fmt.Errorf("inner engine error") ErrorNoSupportDegrade = fmt.Errorf("no support degrade") ErrorNoQueueNameSpecified = fmt.Errorf("no queue name specified") ErrorUnknownMessageType = fmt.Errorf("unknown message type") ErrorIPNotSpecified = fmt.Errorf("ip not specified") )
Functions ¶
func CheckTaskIDValid ¶
CheckTaskIDValid checks whether the taskID is valid (not used) by checking the engine's task basic data.
func CreateTaskBasic ¶
CreateTaskBasic creates a new task basic record in the database.
func GetMapExcludeTableTaskBasic ¶
GetMapExcludeTableTaskBasic receives a table which inherits (embeds) TableTaskBasic and marshals it into a map[string]interface{}, then removes the keys provided by TableTaskBasic. The returned data can then be used to update the table and will never change the TableTaskBasic fields.
func UpdateProjectInfoBasic ¶
func UpdateProjectInfoBasic(egn Engine, projectID string, delta DeltaProjectInfoBasic) error
UpdateProjectInfoBasic update the project info basic with delta data.
Types ¶
type DeltaProjectInfoBasic ¶
DeltaProjectInfoBasic contains the delta part of project info basic.
type Engine ¶
type Engine interface { // Name() is to get this engine name, this name must match the project.EngineName Name() TypeName // SelectFirstTaskBasic receive a task queue group(which contains many queues) and the target queue name // let the engine decide which task basic should be get in priority // basically, queueGroup.GetQueue(queueName).First() is just fine SelectFirstTaskBasic(queueGroup *TaskQueueGroup, queueName string) (*TaskBasic, error) // CreateTaskExtension provide the task basic data and the extra data, // extra data should be defined by different engines and decode inside engines // engine should generate the concerned data and save it during this function CreateTaskExtension(tb *TaskBasic, extra []byte) error // Get TaskExtension from engine // Task provide many public functions so that manager don't have to ask engine everything between progresses GetTaskExtension(taskID string) (TaskExtension, error) // LaunchTask tell engine to launch the distribute service // it should be asynchronous and return immediately after exec the launch job // if there is no enough resources for task launching, just return engine.ErrorNoEnoughResources LaunchTask(tb *TaskBasic, queueName string) error // LaunchDone is used to check if the launch job is done LaunchDone(taskID string) (bool, error) // CheckTask let engine check this task status CheckTask(tb *TaskBasic) error // DegradeTask will be called when manager decide to degrade this task, such as staging timeout // engine can just return an error if no support degrade mode. 
DegradeTask(taskID string) error // SendProjectMessage provide the message to engine, // it should be decode inside the engines // then get data back SendProjectMessage(projectID string, message []byte) ([]byte, error) // SendTaskMessage provide the message to engine, // it should be decode inside the engines // then get data back SendTaskMessage(taskID string, message []byte) ([]byte, error) // CollectTaskData tell engine that this distribute service will not be used any more, // fell free to collect data or some after-work jobs CollectTaskData(tb *TaskBasic) error // ReleaseTask tell engine to shutdown the service and release the resource ReleaseTask(taskID string) error // GetPreferences return the engine preferences settings GetPreferences() Preferences // GetTaskBasicTable return the DB instance where TableTaskBasic is GetTaskBasicTable() *gorm.DB // GetProjectBasicTable return the DB instance where TableProjectBasic is GetProjectBasicTable() *gorm.DB // GetProjectInfoBasicTable return the DB instance where TableProjectInfoBasic is GetProjectInfoBasicTable() *gorm.DB // GetWhitelistBasicTable return the DB instance where TableWhitelistBasic is GetWhitelistBasicTable() *gorm.DB }
Engine describes how different distributed services work under the manager.
type MySQLConf ¶
type MySQLConf struct { MySQLStorage string MySQLDatabase string MySQLUser string MySQLPwd string Charset string MySQLDebug bool MysqlTableOption string }
MySQLConf describes the MySQL connection config needed by engines.
type Preferences ¶
type Preferences struct { // heartbeat timeout = heartbeat tick gap * tick times HeartbeatTimeoutTickTimes int }
Preferences describes the custom settings of different engines. These settings matter when the manager handles some public work such as heartbeat.
type ProjectBasic ¶
type ProjectBasic struct { // PRIMARY key, which should be unique in global(among all engines). ProjectID string // detail name for this project ProjectName string // detail descriptions for this project Message string // Priority defines the priority of tasks launched by this project // which matters the task position in waiting queue Priority TaskPriority // QueueName defines which queue will manage the tasks launched by this project. QueueName string // EngineName decide which engine will be used to launch the tasks from this project EngineName TypeName // StageTimeout defines the timeout(seconds) when tasks in staging status. // After timeout, the task will be regarded as in the situation "no enough resources", // then the task will be do the regraded progress StageTimeout int // Concurrency defines the max concurrency unterminated task under this projects // a new task which reach the limits will be rejected. Concurrency int }
ProjectBasic describes the basic settings of a project.
func GetProjectBasic ¶
func GetProjectBasic(egn Engine, projectID string) (*ProjectBasic, error)
GetProjectBasic get project basic from engine
func TableProject2ProjectBasic ¶
func TableProject2ProjectBasic(basic *TableProjectBasic) *ProjectBasic
TableProject2ProjectBasic convert ProjectBasic from database field into struct data
type QueueBriefInfo ¶
QueueBriefInfo describes brief information about a queue; it is a queue-engine pair.
type StagingTaskQueue ¶
type StagingTaskQueue interface { // get length of this queue Len() int64 // list all TaskBasic from this queue, order by Priority and CreateTime(in same Priority) All() []*TaskBasic // get first instance from this queue First() (*TaskBasic, error) // check if task exist Exist(taskID string) bool // get the specific TaskBasic rank in this queue Rank(taskID string) (int, error) // add new TaskBasic into this queue // the ordering will be adjusted automatically Add(task *TaskBasic) // delete the specific TaskBasic from this queue Delete(taskID string) error // clear all elements from this queue Clear() }
StagingTaskQueue maintains a priority-driven queue for TaskBasic
func NewStagingTaskQueue ¶
func NewStagingTaskQueue() StagingTaskQueue
NewStagingTaskQueue get a new, empty, initialized StagingTaskQueue
type TableBasic ¶
type TableBasic struct { UpdatedAt time.Time `gorm:"column:update_at" json:"-"` Disabled bool `gorm:"column:disabled;default:false;index" json:"-"` }
TableBasic contains some basic fields in all tables.
type TableProjectBasic ¶
type TableProjectBasic struct { TableBasic ProjectID string `gorm:"column:project_id;primary_key" json:"project_id"` ProjectName string `gorm:"column:project_name" json:"project_name"` Priority int `gorm:"column:priority" json:"priority"` EngineName string `gorm:"column:engine_name" json:"engine_name"` QueueName string `gorm:"column:queue_name" json:"queue_name"` StageTimeout int `gorm:"column:stage_timeout;default:60" json:"stage_timeout"` Message string `gorm:"column:message" sql:"type:text" json:"message"` Concurrency int `gorm:"column:concurrency;default:0" json:"concurrency"` }
TableProjectBasic contains the project basic fields. All engine projects should contain these basic fields.
type TableProjectInfoBasic ¶
type TableProjectInfoBasic struct { TableBasic ProjectID string `gorm:"column:project_id;primary_key" json:"project_id"` CompileFinishTimes int64 `gorm:"column:compile_finish_times" json:"compile_finish_times"` CompileFailedTimes int64 `gorm:"column:compile_failed_times" json:"compile_failed_times"` }
TableProjectInfoBasic contains the project info basic fields.
type TableTaskBasic ¶
type TableTaskBasic struct { TableBasic TaskID string `gorm:"column:task_id;primary_key" json:"task_id"` // basic client EngineName string `gorm:"column:engine_name" json:"engine_name"` QueueName string `gorm:"column:queue_name" json:"queue_name"` ProjectID string `gorm:"column:project_id;index" json:"project_id"` BuildID string `gorm:"column:build_id" json:"build_id"` ClientIP string `gorm:"column:client_ip" sql:"type:text" json:"client_ip"` ClientCPU int `gorm:"column:client_cpu" json:"client_cpu"` ClientVersion string `gorm:"column:client_version" json:"client_version"` ClientMessage string `gorm:"column:client_message" sql:"type:text" json:"client_message"` StageTimeout int `gorm:"column:stage_timeout;default:60" json:"stage_timeout"` Priority int `gorm:"column:priority" json:"priority"` // basic status Status string `gorm:"column:status;index" json:"status"` StatusCode int `gorm:"column:status_code" json:"status_code"` StatusMessage string `gorm:"column:status_message" sql:"type:text" json:"status_message"` Released bool `gorm:"column:released;index" json:"released"` LastHeartBeatTime int64 `gorm:"column:last_heartbeat_time" json:"last_heartbeat_time"` StatusChangeTime int64 `gorm:"column:status_change_time" json:"status_change_time"` InitTime int64 `gorm:"column:init_time" json:"init_time"` CreateTime int64 `gorm:"column:create_time;index" json:"create_time"` UpdateTime int64 `gorm:"column:update_time" json:"update_time"` LaunchTime int64 `gorm:"column:launch_time" json:"launch_time"` ReadyTime int64 `gorm:"column:ready_time" json:"ready_time"` ShutDownTime int64 `gorm:"column:shutdown_time" json:"shutdown_time"` StartTime int64 `gorm:"column:start_time" json:"start_time"` EndTime int64 `gorm:"column:end_time" json:"end_time"` }
TableTaskBasic contains the task basic fields. All engine tasks should contain these basic fields.
func TaskBasic2TableTask ¶
func TaskBasic2TableTask(basic *TaskBasic) *TableTaskBasic
TaskBasic2TableTask convert TaskBasic from struct data into database field
type TableWhitelistBasic ¶
type TableWhitelistBasic struct { TableBasic IP string `gorm:"column:ip;primary_key" json:"ip"` ProjectID string `gorm:"column:project_id;primary_key" json:"project_id"` Message string `gorm:"column:message" sql:"type:text" json:"message"` }
TableWhitelistBasic contains the whitelist basic fields. All engine whitelists should contain these basic fields.
func (*TableWhitelistBasic) CheckData ¶
func (tb *TableWhitelistBasic) CheckData() error
CheckData check whitelist basic data
type TaskBasic ¶
type TaskBasic struct { // the primary key of tasks ID string // settings when created Client TaskBasicClient // status after created Status TaskBasicStatus }
TaskBasic describe task basic struct data
func CopyTaskBasic ¶
CopyTaskBasic deep copy a new task basic
func ListTaskBasic ¶
func ListTaskBasic(egn Engine, options TaskListOptions) ([]*TaskBasic, error)
ListTaskBasic list task basic from engine
func TableTask2TaskBasic ¶
func TableTask2TaskBasic(table *TableTaskBasic) *TaskBasic
TableTask2TaskBasic convert TaskBasic from database field into struct data
type TaskBasicClient ¶
type TaskBasicClient struct { EngineName TypeName QueueName string ProjectID string BuildID string Priority TaskPriority ClientIP string ClientCPU int ClientVersion string StageTimeout int Message string }
TaskBasicClient describe task basic settings
type TaskBasicStatus ¶
type TaskBasicStatus struct { Status TaskStatusType StatusCode TaskStatusCode Message string Released bool // Task time LastHeartBeatTime time.Time StatusChangeTime time.Time InitTime time.Time CreateTime time.Time UpdateTime time.Time LaunchTime time.Time ReadyTime time.Time ShutDownTime time.Time StartTime time.Time EndTime time.Time }
TaskBasicStatus describe task basic status
func (*TaskBasicStatus) Beats ¶
func (tbs *TaskBasicStatus) Beats()
Beats record the heartbeat time.
func (*TaskBasicStatus) BeforeRunning ¶
func (tbs *TaskBasicStatus) BeforeRunning() bool
BeforeRunning checks whether the task is still before the running status.
func (*TaskBasicStatus) ChangeStatus ¶
func (tbs *TaskBasicStatus) ChangeStatus(status TaskStatusType)
ChangeStatus change task status
func (*TaskBasicStatus) Create ¶
func (tbs *TaskBasicStatus) Create()
Create make task created, right after basic and extension are all created.
func (*TaskBasicStatus) End ¶
func (tbs *TaskBasicStatus) End()
End record the task end time, right after the task server end the service.
func (*TaskBasicStatus) FailWithClientCancel ¶
func (tbs *TaskBasicStatus) FailWithClientCancel()
FailWithClientCancel make task failed by client side request
func (*TaskBasicStatus) FailWithClientLost ¶
func (tbs *TaskBasicStatus) FailWithClientLost()
FailWithClientLost make task failed by client lost
func (*TaskBasicStatus) FailWithServerDown ¶
func (tbs *TaskBasicStatus) FailWithServerDown()
FailWithServerDown make task failed by server
func (*TaskBasicStatus) Finish ¶
func (tbs *TaskBasicStatus) Finish()
Finish make task finish, right after the task finish without any error.
func (*TaskBasicStatus) Init ¶
func (tbs *TaskBasicStatus) Init()
Init make task init, right after basic created.
func (*TaskBasicStatus) Launch ¶
func (tbs *TaskBasicStatus) Launch()
Launch make task launched, right after task server is starting
func (*TaskBasicStatus) Ready ¶
func (tbs *TaskBasicStatus) Ready()
Ready make task ready, right after task server is running
func (*TaskBasicStatus) ShutDown ¶
func (tbs *TaskBasicStatus) ShutDown()
ShutDown make task shutdown, right after task server is released
func (*TaskBasicStatus) Start ¶
func (tbs *TaskBasicStatus) Start()
Start record the task start time, right after the task server begins to provide service.
type TaskExtension ¶
type TaskExtension interface { // if task get enough available resource EnoughAvailableResource() bool // get worker list from task WorkerList() []string GetRequestInstance() int GetWorkerCount() int // dump the task data Dump() []byte // return the engine custom data from task CustomData(interface{}) interface{} }
TaskExtension describe the extension part beyond task basic according to different engines
type TaskListOptions ¶
type TaskListOptions struct { Status []TaskStatusType Released *bool }
TaskListOptions describes the filter conditions for listing tasks: tasks of all statuses are returned if len(Status) is 0, and tasks are returned no matter whether they are released or not if Released is nil.
func NewTaskListOptions ¶
func NewTaskListOptions(released *bool, statusList ...TaskStatusType) TaskListOptions
NewTaskListOptions get a new task list options
type TaskPriority ¶
type TaskPriority uint
TaskPriority ranges from 1 to 20; any greater number will be set to 20. If the priority is 0, the task will never be launched.
const ( MaxTaskPriority TaskPriority = 20 MinTaskPriority TaskPriority = 1 DefaultTaskPriority TaskPriority = 10 DefaultTaskStageTimeout = 60 )
type TaskQueueGroup ¶
TaskQueueGroup handles a few StagingTaskQueues.
func NewTaskQueueGroup ¶
func NewTaskQueueGroup() *TaskQueueGroup
NewTaskQueueGroup get a new, empty, initialized TaskQueueGroup
func (*TaskQueueGroup) DeleteTask ¶
func (tqg *TaskQueueGroup) DeleteTask(taskID string) bool
DeleteTask delete a task from all queues of groups
func (*TaskQueueGroup) Exist ¶
func (tqg *TaskQueueGroup) Exist(taskID string) bool
Exist check if the task is in any queues of groups
func (*TaskQueueGroup) GetQueue ¶
func (tqg *TaskQueueGroup) GetQueue(name string) StagingTaskQueue
GetQueue get the specific queue from group
type TaskStatusCode ¶
type TaskStatusCode uint
const ( TaskStatusCodeFinished TaskStatusCode = iota TaskStatusCodeUnknown TaskStatusCodeClientCancelInStaging TaskStatusCodeClientCancelInStarting TaskStatusCodeClientCancelInRunning TaskStatusCodeClientLostInStaging TaskStatusCodeClientLostInStarting TaskStatusCodeClientLostInRunning TaskStatusCodeServerFailedInStaging TaskStatusCodeServerFailedInStarting TaskStatusCodeServerFailedInRunning )
func (TaskStatusCode) ServerAlive ¶
func (sc TaskStatusCode) ServerAlive() bool
ServerAlive if task server still alive in current status
func (TaskStatusCode) String ¶
func (sc TaskStatusCode) String() string
String get task status code string
type TaskStatusType ¶
type TaskStatusType string
const ( TaskStatusInit TaskStatusType = "init" TaskStatusStaging TaskStatusType = "staging" TaskStatusStarting TaskStatusType = "starting" TaskStatusRunning TaskStatusType = "running" TaskStatusFailed TaskStatusType = "failed" TaskStatusFinish TaskStatusType = "finish" )
func (TaskStatusType) Terminated ¶
func (tst TaskStatusType) Terminated() bool
Terminated check if the task status is in terminated
type WhiteListKey ¶
WhiteListKey describes the primary key of whitelist basic; it is an IP-projectID pair.
type WhitelistBasic ¶
WhitelistBasic describes the basic whitelist information. One whitelist record means:
A request from the IP, asking to apply for a new task under the Project, will be approved.
func ListWhitelistBasic ¶
func ListWhitelistBasic(egn Engine, projectID string) ([]*WhitelistBasic, error)
ListWhitelistBasic list whitelist basic from engine
func TableWhitelist2WhitelistBasic ¶
func TableWhitelist2WhitelistBasic(basic *TableWhitelistBasic) *WhitelistBasic
TableWhitelist2WhitelistBasic convert WhitelistBasic from database field into struct data