Documentation ¶
Index ¶
- Constants
- func APIAcceptTTJobs(jobs []*thunder.RequestAddJobsJobT) ([]uint64, error)
- func APILogFinish(hostname string, runId uint64, prevStatus string, success bool) (string, error)
- func APIUpdateRunStatus(hostname string, runId uint64, prevStatus, status string) error
- func GenerateJobsCycle()
- func GetKillerThreadsList() []string
- func GetLauncherThreadsList() []string
- func SelectRunQueue() (map[string][]*RunQueueEntry, error)
- func Setup(config common.FullConfig)
- func WriteLogsThread(filename string)
- type DebugPrintRequest
- type DispatcherData
- type DispatcherThreadDescr
- type FinishEvent
- type FinishResult
- type FinishResultRunInfo
- type FinishResultRusage
- type FlagEntry
- type JobGenState
- type JobInfoEntry
- type Jobs
- type JobsCountRequest
- type KillRequest
- type LauncherData
- type LauncherDebugPrintRequest
- type LauncherLogFinishRequest
- type LauncherLogFinishResponse
- type LauncherState
- type LauncherUpdateStatusRequest
- type LoadStateFunc
- type NewJobs
- type NextGenParams
- type NextTsCallback
- type RunQueueEntry
- type ScriptEntry
- type ScriptRusageEntry
- type ScriptSettings
- type ServerInfo
- type TTPriorityQueue
- type TimetableEntry
Constants ¶
View Source
const ( CYCLE_CLASS_NAME = "ScriptFramework\\Script_JobGenerator" PINBA_CLASS_NAME = "\\" + CYCLE_CLASS_NAME JOBS_TYPE_NONE = "none" JOBS_TYPE_INSTANCES = "instances" JOBS_TYPE_RANGE = "range" JOBS_TYPE_CUSTOM = "custom" LOCATION_TYPE_ANY = "any" LOCATION_TYPE_EACH = "each" LOCATION_ALL = "*" METHOD_RUN = "run" METHOD_INIT_JOBS = "initJobs" METHOD_FINISH_JOBS = "finishJobs" RUN_STATUS_WAITING = "Waiting" RUN_STATUS_INIT = "Init" RUN_STATUS_RUNNING = "Running" RUN_STATUS_FINISHED = "Finished" DEFAULT_LOCATION_IDX = "0" HOSTS_UPDATE_INTERVAL = time.Second * 5 DELETE_IDS_KEEP_GENERATIONS = 3 // buffered channel (1) + processing (1) + selecting (1) DEVELOPER_DEBUG_HOSTNAME = "www1" SELECT_HOSTNAME_MAX_WAITING_LEN = 5 THROTTLE_CHAN_CAPACITY = 5 )
View Source
const ( PHPROXY_TAG = "scriptframework" INIT_TIMEOUT_SEC = 20 DEVELOPER_CUSTOM_PATH_TIMEOUT = 14400 // how long "developer" field has effect, in seconds PROFILING_TIMEOUT = 3600 // how long "profiling_enabled" field has effect, in seconds DEBUG_TIMEOUT = 1200 // how long "debug_enabled" field has effect, in seconds KILL_ACTION_NO_ACTION = "noAction" KILL_ACTION_DELETE_FROM_QUEUE = "deleteFromQueue" KILL_ACTION_SET_WAITING = "setWaiting" KILL_ACTION_LOG_SCRIPT_FINISH_INIT = "logScriptFinishInit" KILL_ACTION_LOG_SCRIPT_FINISH_RUNNING = "logScriptFinishRunning" )
View Source
const ( TABLE_ACTION_LOG = "ActionLog" TABLE_SCRIPT = "Script" TABLE_ACTION_SETTINGS = "ActionScriptSettings" TABLE_SCRIPT_SETTINGS = "ScriptSettings" TABLE_SCRIPT_JOB_INFO = "ScriptJobInfo" TABLE_SCRIPT_JOB_RESULT = "ScriptJobResult" TABLE_SCRIPT_FAIL_INFO = "ScriptFailInfo" TABLE_SCRIPT_RUSAGE_STATS = "ScriptRusageStats" TABLE_SCRIPT_TIMETABLE = "ScriptTimetable" TABLE_SCRIPT_TAG = "ScriptTag" TABLE_SCRIPT_FLAGS = "ScriptFlags" TABLE_SERVER = "Server" TABLE_SERVER_GROUP = "ServerGroup" TABLE_RUN_QUEUE = "RunQueue" )
View Source
const ( QUERY_SELECT_FROM_TIMETABLE = `SELECT id, generation_id, class_name, ` + "`repeat`" + `, retry_count, default_retry, job_data, method, location, UNIX_TIMESTAMP(finished_ts), finished_successfully, finish_count, UNIX_TIMESTAMP(next_launch_ts), UNIX_TIMESTAMP(added_to_queue_ts), token, settings_id, UNIX_TIMESTAMP(created) FROM ` + TABLE_SCRIPT_TIMETABLE + ` #add_where#` QUERY_GET_RUNQUEUE = `SELECT id, class_name, timetable_id, generation_id, hostname, hostname_idx, job_data, method, run_status, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(waiting_ts), UNIX_TIMESTAMP(should_init_ts), init_attempts, UNIX_TIMESTAMP(init_ts), UNIX_TIMESTAMP(running_ts), UNIX_TIMESTAMP(max_finished_ts), UNIX_TIMESTAMP(finished_ts), stopped_employee_id, token, retry_attempt, settings_id FROM ` + TABLE_RUN_QUEUE + ` #where#` QUERY_SIMPLE_GET_SCRIPTS_FOR_PLATFORM = "SELECT class_name, settings_id FROM " + TABLE_SCRIPT QUERY_GET_NEW_SETTINGS = `SELECT id, class_name, instance_count, max_time, jobs, next_ts_callback, ` + "`repeat`" + `, retry, ttl, repeat_job, retry_job, location, location_type, developer, max_retries, profiling_enabled, debug_enabled, named_params, UNIX_TIMESTAMP(created) FROM ` + TABLE_SCRIPT_SETTINGS + ` WHERE id IN(#new_settings_ids#)` QUERY_GET_JOB_INFO = `SELECT generation_id, class_name, location, UNIX_TIMESTAMP(init_jobs_ts), UNIX_TIMESTAMP(jobs_generated_ts), UNIX_TIMESTAMP(finish_jobs_ts), UNIX_TIMESTAMP(next_generate_job_ts), settings_id FROM ` + TABLE_SCRIPT_JOB_INFO QUERY_GET_FLAGS = `SELECT class_name, UNIX_TIMESTAMP(run_requested_ts), UNIX_TIMESTAMP(run_accepted_ts), UNIX_TIMESTAMP(pause_requested_ts), UNIX_TIMESTAMP(kill_requested_ts), kill_request_employee_id, UNIX_TIMESTAMP(run_queue_killed_ts), UNIX_TIMESTAMP(killed_ts), UNIX_TIMESTAMP(paused_ts) FROM ` + TABLE_SCRIPT_FLAGS QUERY_GET_SCRIPTS_RUSAGE_STATS = `SELECT class_name, real_time, sys_time, user_time, max_memory FROM ` + TABLE_SCRIPT_RUSAGE_STATS QUERY_GET_AVAILABLE_HOSTS = `SELECT hostname, ` + 
"`group`" + `, FLOOR(cpu_idle * cpu_parrots_per_core * cpu_cores / 100) AS cpu_idle_parrots, cpu_parrots_per_core, ROUND(cpu_idle * cpu_cores / 100, 2) AS cpu_idle_cores, cpu_cores, cpu_parasite, mem_total, mem_free, mem_cached, mem_parasite, swap_used, min_memory, min_memory_ratio FROM ` + TABLE_SERVER + ` WHERE phproxyd_heartbeat_ts > NOW() - INTERVAL 15 SECOND AND disabled_ts IS NULL ORDER BY cpu_idle_parrots DESC` // Update queries QUERY_INSERT_INTO_TIMETABLE = `INSERT INTO ` + TABLE_SCRIPT_TIMETABLE + ` (class_name, default_retry, ` + "`repeat`" + `, method, finished_successfully, generation_id, settings_id, location, job_data, shard_id, created, next_launch_ts) VALUES #values#` QUERY_INSERT_INTO_JOB_INFO = `INSERT INTO ` + TABLE_SCRIPT_JOB_INFO + ` #fields# VALUES #values#` QUERY_SET_JOBS_GENERATED_TS = `UPDATE ` + TABLE_SCRIPT_JOB_INFO + ` SET jobs_generated_ts = NOW() WHERE class_name = '#class_name#' AND location IN(#locations#)` QUERY_CLEAR_JOB_RESULTS = `DELETE FROM ` + TABLE_SCRIPT_JOB_RESULT + ` WHERE class_name = '#class_name#'` QUERY_CLEAR_JOB_RESULTS_FOR_LOCATIONS = `DELETE FROM ` + TABLE_SCRIPT_JOB_RESULT + ` WHERE class_name = '#class_name#' AND location IN(#locations#)` QUERY_SET_NEXT_GENERATE_JOB_TS = `UPDATE ` + TABLE_SCRIPT_JOB_INFO + ` SET next_generate_job_ts = FROM_UNIXTIME(#next_generate_job_ts#), jobs_generated_ts = NULL, jobs_finished_ts = NULL, init_jobs_ts = NULL, finish_jobs_ts = NULL, generation_id = generation_id + 1, settings_id = #settings_id# WHERE class_name = '#class_name#'` QUERY_BATCH_SET_NEXT_GENERATE_JOB_TS = `INSERT INTO ` + TABLE_SCRIPT_JOB_INFO + ` #fields# VALUES #values# ON DUPLICATE KEY UPDATE next_generate_job_ts = VALUES(next_generate_job_ts), jobs_generated_ts = VALUES(jobs_generated_ts), jobs_finished_ts = VALUES(jobs_finished_ts), init_jobs_ts = VALUES(init_jobs_ts), finish_jobs_ts = VALUES(finish_jobs_ts), generation_id = generation_id + 1, settings_id = VALUES(settings_id)` QUERY_SET_INIT_JOBS_TS = `UPDATE ` + 
TABLE_SCRIPT_JOB_INFO + ` SET init_jobs_ts = NOW() WHERE class_name = '#class_name#' AND location IN(#locations#)` QUERY_SET_FINISH_JOBS_TS = `UPDATE ` + TABLE_SCRIPT_JOB_INFO + ` SET finish_jobs_ts = NOW() WHERE class_name = '#class_name#' AND location IN(#locations#)` QUERY_SET_MAX_FINISHED_TS_NOW = `UPDATE ` + TABLE_RUN_QUEUE + ` SET max_finished_ts = FROM_UNIXTIME(#ts#), stopped_employee_id = #employee_id# WHERE class_name = '#class_name#'` QUERY_INSERT_INTO_RUN_QUEUE = "INSERT INTO " + TABLE_RUN_QUEUE + "#fields# VALUES#values#" QUERY_LOG_ADD_TO_QUEUE = "UPDATE " + TABLE_SCRIPT_TIMETABLE + " SET added_to_queue_ts = NOW() WHERE id IN(#ids#) AND added_to_queue_ts IS NULL" QUERY_UPDATE_TIMETABLE_STATUS = "UPDATE " + TABLE_SCRIPT_TIMETABLE + ` SET finished_ts = FROM_UNIXTIME(#finished_ts#), next_launch_ts = FROM_UNIXTIME(#next_launch_ts#), added_to_queue_ts = FROM_UNIXTIME(#added_to_queue_ts#), retry_count = #retry_count#, finish_count = #finish_count#, finished_successfully = #finished_successfully# WHERE id = #id# #add_where#` QUERY_DELETE_FROM_TIMETABLE = "DELETE FROM " + TABLE_SCRIPT_TIMETABLE + " WHERE id IN(#ids#) #add_where#" QUERY_RESET_RUN_REQUEST = `UPDATE ` + TABLE_SCRIPT_FLAGS + ` SET run_requested_ts = NULL, run_accepted_ts = NULL WHERE class_name = '#class_name#'` QUERY_SET_RUN_ACCEPTED = `UPDATE ` + TABLE_SCRIPT_FLAGS + ` SET run_accepted_ts = NOW() WHERE class_name = '#class_name#'` QUERY_DELETE_FROM_QUEUE = "DELETE FROM " + TABLE_RUN_QUEUE + " WHERE id IN(#ids#) AND run_status = '#status#'" QUERY_UPDATE_RUN_STATUS = "UPDATE " + TABLE_RUN_QUEUE + ` SET run_status = '#status#', #status#_ts = NOW() WHERE id = #id# AND run_status = '#prev_status#'` QUERY_UPDATE_RUN_STATUS_INIT = "UPDATE " + TABLE_RUN_QUEUE + ` SET run_status = 'Init', init_ts = NOW(), max_finished_ts = created + INTERVAL #max_time# SECOND WHERE id = #id#` QUERY_QUERY_CLEAR_OLD_HEARTBEATS = `DELETE FROM ` + TABLE_RUN_QUEUE + ` WHERE class_name = '#class_name#' AND run_status = 
'Waiting' AND created < NOW() - INTERVAL 30 SECOND` QUERY_CHECK_TT_IDS = "SELECT id FROM " + TABLE_SCRIPT_TIMETABLE + " WHERE id IN(#ids#)" )
The query constants above hold SQL statements (language=SQL).
View Source
const LAUNCHER_DB_DEBUG = false
View Source
const MAX_API_JOBS = 50000
View Source
const MAX_TIME_DISCREPANCY = 3
Variables ¶
This section is empty.
Functions ¶
func APIAcceptTTJobs ¶
func APILogFinish ¶
The returned string is the JSON-encoded run queue entry row.
func APIUpdateRunStatus ¶
func GenerateJobsCycle ¶
func GenerateJobsCycle()
func GetKillerThreadsList ¶
func GetKillerThreadsList() []string
func GetLauncherThreadsList ¶
func GetLauncherThreadsList() []string
func SelectRunQueue ¶
func SelectRunQueue() (map[string][]*RunQueueEntry, error)
func Setup ¶
func Setup(config common.FullConfig)
func WriteLogsThread ¶
func WriteLogsThread(filename string)
Types ¶
type DebugPrintRequest ¶
type DebugPrintRequest struct { Waiting bool Added bool RespCh chan *JobGenState }
type DispatcherData ¶
type DispatcherData struct {
// contains filtered or unexported fields
}
type DispatcherThreadDescr ¶
func GetDispatcherThreadsList ¶
func GetDispatcherThreadsList() []DispatcherThreadDescr
type FinishEvent ¶
type FinishEvent struct {
// contains filtered or unexported fields
}
type FinishResult ¶
type FinishResult struct { Id uint64 `json:"id"` TimetableId uint64 `json:"timetable_id"` ClassName string `json:"class_name"` Hostname string `json:"hostname"` Success bool `json:"success"` PrevStatus string `json:"prev_status"` Rusage FinishResultRusage `json:"rusage"` ProfilingUrl string `json:"profiling_url"` Initial bool `json:"initial"` Timestamp uint64 `json:"timestamp"` RunInfo FinishResultRunInfo `json:"run_info"` }
type FinishResultRunInfo ¶
type FinishResultRunInfo struct { Id uint64 `json:"id"` TimetableId uint64 `json:"timetable_id"` GenerationId uint64 `json:"generation_id"` Hostname string `json:"hostname"` HostnameIdx uint32 `json:"hostname_idx"` ClassName string `json:"class_name"` JobData string `json:"job_data"` Method string `json:"method"` RunStatus string `json:"run_status"` Created interface{} `json:"created"` WaitingTs interface{} `json:"waiting_ts"` ShouldInitTs interface{} `json:"should_init_ts"` InitAttempts uint32 `json:"init_attempts"` InitTs interface{} `json:"init_ts"` RunningTs interface{} `json:"running_ts"` MaxFinishedTs interface{} `json:"max_finished_ts"` FinishedTs interface{} `json:"finished_ts"` StoppedEmployeeId int64 `json:"stopped_employee_id"` Token string `json:"token"` RetryAttempt uint32 `json:"retry_attempt"` SettingsId uint64 `json:"settings_id"` }
type FinishResultRusage ¶
type JobGenState ¶
type JobGenState struct { Added []*TimetableEntry Waiting []*TimetableEntry RawResp string }
func GetDispatcherJobs ¶
func GetDispatcherJobs(className, location string) (*JobGenState, error)
type JobInfoEntry ¶
type JobInfoEntry struct {
// contains filtered or unexported fields
}
type JobsCountRequest ¶
type JobsCountRequest struct {
RespCh chan int
}
type KillRequest ¶
type KillRequest struct {
ResCh chan error // response for request (success or failure)
}
type LauncherData ¶
type LauncherData struct {
// contains filtered or unexported fields
}
type LauncherDebugPrintRequest ¶
type LauncherDebugPrintRequest struct {
RespCh chan *LauncherState
}
type LauncherLogFinishResponse ¶
type LauncherLogFinishResponse struct {
// contains filtered or unexported fields
}
type LauncherState ¶
type LauncherState struct { Waiting []*RunQueueEntry Init []*RunQueueEntry Running []*RunQueueEntry Finished []*RunQueueEntry RawResp string }
func GetLauncherJobs ¶
func GetLauncherJobs(hostname string) (*LauncherState, error)
type LoadStateFunc ¶
type LoadStateFunc struct {
// contains filtered or unexported fields
}
type NextGenParams ¶
type NextGenParams struct { Location string JobInfo *JobInfoEntry }
type NextTsCallback ¶
type RunQueueEntry ¶
type ScriptEntry ¶
type ScriptEntry struct {
// contains filtered or unexported fields
}
type ScriptRusageEntry ¶
type ScriptRusageEntry struct {
// contains filtered or unexported fields
}
type ScriptSettings ¶
type ScriptSettings struct {
// contains filtered or unexported fields
}
type ServerInfo ¶
type ServerInfo struct {
// contains filtered or unexported fields
}
func (*ServerInfo) String ¶
func (entry *ServerInfo) String() string
type TTPriorityQueue ¶
type TTPriorityQueue []*TimetableEntry
TTPriorityQueue implements heap.Interface and holds *TimetableEntry items.
func (TTPriorityQueue) Len ¶
func (pq TTPriorityQueue) Len() int
func (TTPriorityQueue) Less ¶
func (pq TTPriorityQueue) Less(i, j int) bool
func (*TTPriorityQueue) Pop ¶
func (pq *TTPriorityQueue) Pop() interface{}
func (*TTPriorityQueue) Push ¶
func (pq *TTPriorityQueue) Push(x interface{})
func (TTPriorityQueue) Swap ¶
func (pq TTPriorityQueue) Swap(i, j int)
type TimetableEntry ¶
type TimetableEntry struct { JobData string NextLaunchTs sql.NullInt64 // contains filtered or unexported fields }
func (*TimetableEntry) String1 ¶
func (p *TimetableEntry) String1() string
Click to show internal directories.
Click to hide internal directories.