scheduler

package
v1.19.1
Published: Oct 25, 2022 License: MIT Imports: 43 Imported by: 0

Documentation

Overview

Package scheduler implements the main scheduler logic.

Transactions

The following transactions operate on applications or taskgroups:

LAUNCH: create and launch an application from a version definition
DELETE: delete an application
UPDATE: update an application
SCALE: scale an application's instances up or down
RESCHEDULE: reschedule a taskgroup when it fails or when the API requests it

Service

Running applications are sometimes bound to services and need to be exported to them. The Service Manager implements this binding and export. It watches the following events:

Taskgroup Add
Taskgroup Delete
Taskgroup Update
Service Add
Service Update
Service Delete

Status Report

When tasks run on a slave machine, the mesos slave reports their status. The report message is processed by the function StatusReport.

Health Check Report

If a running taskgroup is configured with a health check, the health-check result is reported by the healthy module. The messages are processed by HealthyReport.

Deployment related functions

A deployment's rolling update is implemented with application transactions; see the function DeploymentCheck.

DataChecker

DataChecker is responsible for handling dirty or erroneous data in ZK; see DataCheckManage.

Message

Message is used to send messages to the executor, such as localfile, signal, ...

Constants

const (
	/*schedule taskgroup type*/
	LaunchTaskgroupType     = "launch"
	RescheduleTaskgroupType = "reschedule"
	ScaleTaskgroupType      = "scale"
	UpdateTaskgroupType     = "update"

	/*operate application type*/
	LaunchApplicationType        = "launch"
	DeleteApplicationType        = "delete"
	ScaleApplicationType         = "scale"
	UpdateApplicationType        = "update"
	RollingupdateApplicationType = "rollingupdate"
)
const (
	SchedulerRoleMaster = "master"
	SchedulerRoleSlave  = "slave"
)
const DATA_CHECK_INTERVAL = 1200

Interval for checking ZK data

const MAX_DATA_UPDATE_INTERVAL = 180

Maximum interval for updating tasks, taskgroups, and applications in ZK

const MAX_STAGING_UPDATE_INTERVAL = 180
const MESOS_HEARTBEAT_TIMEOUT = 120

Heartbeat timeout between the scheduler and the mesos master

const (
	MaxEventQueueLength = 10240
)
const TRANSACTION_APPLICATION_LAUNCH_LIFEPERIOD = 1800
const TRANSACTION_DEFAULT_LIFEPERIOD = 480

Default maximum life period for a transaction: 480 seconds. Any transaction without a specific life period uses this value.

const TRANSACTION_DEPLOYMENT_INNERDELETE_LIFEPERIOD = 7500

Maximum life period for an inner delete-application transaction: 7500 seconds. A transaction that doesn't finish within its life period times out.

const TRANSACTION_DEPLOYMENT_ROLLING_DOWN_LIFEPERIOD = 7500

Maximum life period for each rolling-down transaction: 7500 seconds. A transaction that doesn't finish within its life period times out.

const TRANSACTION_DEPLOYMENT_ROLLING_UP_LIFEPERIOD = 300

Maximum life period for each rolling-up transaction: 300 seconds. A transaction that doesn't finish within its life period times out.

const TRANSACTION_INNER_RESCHEDULE_LIFEPERIOD = 86400

Maximum life period for an inner taskgroup reschedule: 86400 seconds. A transaction that doesn't finish within its life period times out.

const TRANSACTION_RESCHEDULE_RESET_INTERVAL = 1800

If a taskgroup has been running for more than 1800 seconds, its restart count is reset to 0
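The reset rule can be illustrated with a minimal sketch. The restartState type, its fields, and maybeResetRestarts are hypothetical names invented for this example; the package's own bookkeeping is unexported and may differ. Only the 1800-second threshold comes from the constant above.

```go
package main

import (
	"fmt"
	"time"
)

// rescheduleResetInterval mirrors TRANSACTION_RESCHEDULE_RESET_INTERVAL (seconds).
const rescheduleResetInterval = 1800

// restartState is a hypothetical stand-in for the per-taskgroup restart
// bookkeeping; it is not a type from the package.
type restartState struct {
	StartTime    int64 // unix seconds at which the taskgroup started running
	RestartTimes int   // restarts counted so far
}

// maybeResetRestarts zeroes the restart count once the taskgroup has been
// running for more than the reset interval. It reports whether a reset happened.
func maybeResetRestarts(st *restartState, now int64) bool {
	if now-st.StartTime > rescheduleResetInterval {
		st.RestartTimes = 0
		return true
	}
	return false
}

func main() {
	now := time.Now().Unix()
	st := &restartState{StartTime: now - 2000, RestartTimes: 3}
	fmt.Println(maybeResetRestarts(st, now), st.RestartTimes)
}
```

A strict "more than" comparison is used here to match the wording of the comment; whether the package treats exactly 1800 seconds as a reset is not stated.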

Variables

var (
	ScheduleTaskgroupTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: types.MetricsNamespaceScheduler,
		Subsystem: types.MetricsSubsystemScheduler,
		Name:      "schedule_taskgroup_total",
		Help:      "Total counter of schedule taskgroup",
	}, []string{"namespace", "application", "taskgroup", "type"})

	ScheduleTaskgroupLatencyMs = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: types.MetricsNamespaceScheduler,
		Subsystem: types.MetricsSubsystemScheduler,
		Name:      "schedule_taskgroup_latency_ms",
		Help:      "Schedule taskgroup latency milliseconds",
	}, []string{"namespace", "application", "taskgroup", "type"})

	OperateAppTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: types.MetricsNamespaceScheduler,
		Subsystem: types.MetricsSubsystemScheduler,
		Name:      "operate_application_total",
		Help:      "Total counter of operate application",
	}, []string{"namespace", "application", "type"})

	OperateAppLatencySecond = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: types.MetricsNamespaceScheduler,
		Subsystem: types.MetricsSubsystemScheduler,
		Name:      "operate_application_latency_second",
		Help:      "Operate application latency seconds",
	}, []string{"namespace", "application", "type"})

	TaskgroupReportTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: types.MetricsNamespaceScheduler,
		Subsystem: types.MetricsSubsystemScheduler,
		Name:      "taskgroup_report_total",
		Help:      "Total counter of report taskgroup status",
	}, []string{"namespace", "application", "taskgroup", "status"})

	TaskgroupOomTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: types.MetricsNamespaceScheduler,
		Subsystem: types.MetricsSubsystemScheduler,
		Name:      "application_oom_total",
		Help:      "Total counter of application oom killed",
	}, []string{"namespace", "application"})
)

Prometheus metrics for the scheduler

Functions

func DataCheckManage

func DataCheckManage(mgr *DataCheckMgr, doRecover bool)

DataChecker main function

Types

type DataCheckMgr

type DataCheckMgr struct {
	// contains filtered or unexported fields
}

DataChecker

func CreateDataCheckMgr

func CreateDataCheckMgr(store store.Store, s *Scheduler) (*DataCheckMgr, error)

Create DataChecker

func (*DataCheckMgr) SendMsg

func (mgr *DataCheckMgr) SendMsg(msg *DataCheckMsg) error

Send DataChecker control message

type DataCheckMsg

type DataCheckMsg struct {
	// opencheck
	// closecheck
	// stop
	MsgType string
}

DataChecker control message
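As a rough illustration of how the three message types might drive a checker loop, here is a self-contained sketch. checkMsg and applyMsg are hypothetical stand-ins written for this example; the real DataCheckMgr handles messages internally and its exact behavior is not documented here. The strings "opencheck", "closecheck", and "stop" are taken from the struct comment.

```go
package main

import "fmt"

// checkMsg is a local stand-in for DataCheckMsg.
type checkMsg struct {
	MsgType string
}

// applyMsg returns the checker's new enabled state and whether the loop
// should stop. Unknown message types leave the state unchanged.
func applyMsg(enabled bool, msg checkMsg) (bool, bool, error) {
	switch msg.MsgType {
	case "opencheck":
		return true, false, nil
	case "closecheck":
		return false, false, nil
	case "stop":
		return enabled, true, nil
	default:
		return enabled, false, fmt.Errorf("unknown MsgType %q", msg.MsgType)
	}
}

func main() {
	// A buffered channel plays the role of the manager's message queue.
	msgs := make(chan checkMsg, 3)
	msgs <- checkMsg{MsgType: "opencheck"}
	msgs <- checkMsg{MsgType: "closecheck"}
	msgs <- checkMsg{MsgType: "stop"}
	close(msgs)

	enabled := false
	for msg := range msgs {
		next, stop, err := applyMsg(enabled, msg)
		if err != nil {
			fmt.Println("ignored:", err)
			continue
		}
		enabled = next
		fmt.Printf("%s -> enabled=%v stop=%v\n", msg.MsgType, enabled, stop)
		if stop {
			return
		}
	}
}
```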

type Scheduler

type Scheduler struct {

	// Scheduler Listen IP
	IP string
	// Scheduler Listen Port
	Port int

	// Current Schedulers in the cluster
	Schedulers []*commtype.SchedulerServInfo
	// Current Mesos Masters in the cluster
	Memsoses []*commtype.MesosServInfo

	// Current Role: master, slave, none
	Role string

	// Cluster ID from mesos master
	ClusterId string

	// BCS Cluster ID
	BcsClusterId string

	// Service Manager
	ServiceMgr *ServiceMgr
	// contains filtered or unexported fields
}

Scheduler represents a Mesos scheduler

func NewScheduler

func NewScheduler(config util.Scheduler, store store.Store) *Scheduler

NewScheduler returns a pointer to new Scheduler

func (*Scheduler) BuildTaskGroup

func (s *Scheduler) BuildTaskGroup(version *types.Version, app *types.Application, ID string, reason string) (*types.TaskGroup, error)

BuildTaskGroup builds a taskgroup for an application. If ID is empty, a new taskgroup ID is created and its index is app.Instances; if ID is not empty, the given ID is used. The reason describes why the taskgroup is built. The taskgroup is created in the DB and the application, and is also exported to the related service.

func (*Scheduler) CheckPodBelongDaemonset

func (s *Scheduler) CheckPodBelongDaemonset(taskgroupId string) bool

Check whether the taskgroup belongs to a daemonset

func (*Scheduler) DeclineOffers

func (s *Scheduler) DeclineOffers(offers []*mesos.Offer) error

Decline offers from the mesos master

func (*Scheduler) DeclineResource

func (s *Scheduler) DeclineResource(offerId *string) (*http.Response, error)

DeclineResource sends a DECLINE request to mesos to release an offer. This is very important; otherwise the resource stays held until the framework exits.

func (*Scheduler) DeleteDaemonsetTaskGroup

func (s *Scheduler) DeleteDaemonsetTaskGroup(daemon *types.BcsDaemonset, taskGroup *types.TaskGroup)

Delete a daemonset taskgroup: the taskgroup is deleted from the DB, the application, and the service

func (*Scheduler) DeleteTaskGroup

func (s *Scheduler) DeleteTaskGroup(app *types.Application, taskGroup *types.TaskGroup, reason string) error

Delete a taskgroup: the taskgroup is deleted from the DB, the application, and the service

func (*Scheduler) DeploymentCheck

func (s *Scheduler) DeploymentCheck(ns string, name string, recover bool)

Check the deployment status and drive the update forward while the deployment is updating. The function returns when the update is finished or canceled.

func (*Scheduler) FetchAgentSchedInfo

func (s *Scheduler) FetchAgentSchedInfo(hostname string) (*types.AgentSchedInfo, error)

Get agent schedInfo by hostname

func (*Scheduler) FetchAgentSetting

func (s *Scheduler) FetchAgentSetting(ip string) (*commtype.BcsClusterAgentSetting, error)

Get agent setting by IP

func (*Scheduler) FetchTaskGroup

func (s *Scheduler) FetchTaskGroup(taskGroupID string) (*types.TaskGroup, error)

func (*Scheduler) FinishTransaction

func (s *Scheduler) FinishTransaction(transaction *Transaction)

Finish a transaction, set application status

func (*Scheduler) GetAllOffers

func (s *Scheduler) GetAllOffers() []*offer.Offer

Get all current offers

func (*Scheduler) GetClusterId

func (s *Scheduler) GetClusterId() string

Get Cluster ID

func (*Scheduler) GetClusterResource

func (s *Scheduler) GetClusterResource() (*commtype.BcsClusterResource, error)

Get cluster resources

func (*Scheduler) GetCurrentOffers

func (s *Scheduler) GetCurrentOffers() []*mesos.Offer

func (*Scheduler) GetFirstOffer

func (s *Scheduler) GetFirstOffer() *offer.Offer

Get the current first offer from the pool

func (*Scheduler) GetHostAttributes

func (s *Scheduler) GetHostAttributes(para *typesplugin.HostPluginParameter) (map[string]*typesplugin.HostAttributes, error)

Get agent attributes

func (*Scheduler) GetMesosResourceIn

func (s *Scheduler) GetMesosResourceIn(mesosClient *client.Client) (*commtype.BcsClusterResource, error)

Get the cluster's current resource information from the mesos master

func (*Scheduler) GetNextOffer

func (s *Scheduler) GetNextOffer(offer *offer.Offer) *offer.Offer

Get the next offer from the pool

func (*Scheduler) HealthyReport

func (s *Scheduler) HealthyReport(healthyResult *bcstype.HealthCheckResult)

The goroutine function for processing health-check reports. When the scheduler receives a health-check report message, it creates a goroutine to process it.

func (*Scheduler) InnerDeleteApplication

func (s *Scheduler) InnerDeleteApplication(runAs, appId string, enforce bool) error

func (*Scheduler) IsConstraintsFit

func (s *Scheduler) IsConstraintsFit(version *types.Version, offer *mesos.Offer, taskgroupID string) bool

Check whether the offer matches the version's constraints

func (*Scheduler) IsOfferExtendedResourcesFitLaunch

func (s *Scheduler) IsOfferExtendedResourcesFitLaunch(needs map[string]*bcstype.ExtendedResource, outOffer *offer.Offer) bool

Check whether the offer satisfies the extended resources required for launching a taskgroup

func (*Scheduler) IsOfferResourceFitLaunch

func (s *Scheduler) IsOfferResourceFitLaunch(needResource *types.Resource, outOffer *offer.Offer) bool

Check whether the offer satisfies the resources required for launching a taskgroup

func (*Scheduler) IsOfferResourceFitUpdate

func (s *Scheduler) IsOfferResourceFitUpdate(needResource *types.Resource, outOffer *offer.Offer) bool

Check whether the offer satisfies the resources required for updating a taskgroup's resources

func (*Scheduler) KillExecutor

func (s *Scheduler) KillExecutor(agentId, executerId string) (*http.Response, error)

Kill a taskgroup with the agent and executor ID

func (*Scheduler) KillTaskGroup

func (s *Scheduler) KillTaskGroup(taskGroup *types.TaskGroup) (*http.Response, error)

Kill a taskgroup with taskgroup information

func (*Scheduler) LaunchTaskGroup

func (s *Scheduler) LaunchTaskGroup(offer *mesos.Offer, taskGroup *mesos.TaskGroupInfo,
	version *types.Version) (*http.Response, error)

Launch a taskgroup with the offered slave resource

func (*Scheduler) LaunchTaskGroups

func (s *Scheduler) LaunchTaskGroups(offer *mesos.Offer, taskGroups []*mesos.TaskGroupInfo,
	version *types.Version) (*http.Response, error)

Launch taskgroups with the offered slave resource

func (*Scheduler) OfferedResources

func (s *Scheduler) OfferedResources(offer *mesos.Offer) (cpus, mem, disk float64)

Get the resources (cpus, mem, disk) offered by the mesos master

func (*Scheduler) ProcessCommandMessage

func (s *Scheduler) ProcessCommandMessage(bcsMsg *types.BcsMessage)

func (*Scheduler) RunCommand

func (s *Scheduler) RunCommand(command *commtypes.BcsCommandInfo)

func (*Scheduler) RunDeleteApplication

func (s *Scheduler) RunDeleteApplication(transaction *Transaction)

The goroutine function for the delete-application transaction. Create a transaction for deleting an application, then call this function to run it. The function returns as soon as the transaction is done, fails, or times out (as defined by transaction.LifePeriod).

func (*Scheduler) RunInnerScaleApplication

func (s *Scheduler) RunInnerScaleApplication(transaction *Transaction)

The goroutine function for the inner scale-application transaction. Create a transaction for scaling an application, then call this function to run it. The function returns as soon as the transaction is done, fails, or times out (as defined by transaction.LifePeriod).

func (*Scheduler) RunLaunchApplication

func (s *Scheduler) RunLaunchApplication(transaction *Transaction)

RunLaunchApplication is the goroutine function for the launch-application transaction. Create a transaction for launching an application, then call this function to run it. The function returns as soon as the transaction is done, fails, or times out (as defined by transaction.LifePeriod).

func (*Scheduler) RunRescheduleTaskgroup

func (s *Scheduler) RunRescheduleTaskgroup(transaction *Transaction)

The goroutine function for the reschedule-taskgroup transaction. Create a transaction for rescheduling a taskgroup, then call this function to run it. The function returns as soon as the transaction is done, fails, or times out (as defined by transaction.LifePeriod).

func (*Scheduler) RunScaleApplication

func (s *Scheduler) RunScaleApplication(transaction *Transaction)

The goroutine function for the scale-application transaction. Create a transaction for scaling an application, then call this function to run it. The function returns as soon as the transaction is done, fails, or times out (as defined by transaction.LifePeriod).

func (*Scheduler) RunUpdateApplication

func (s *Scheduler) RunUpdateApplication(transaction *Transaction)

The goroutine function for the update-application transaction. Create a transaction for updating an application, then call this function to run it. The function returns as soon as the transaction is done, fails, or times out (as defined by transaction.LifePeriod).

func (*Scheduler) RunUpdateApplicationResource

func (s *Scheduler) RunUpdateApplicationResource(transaction *Transaction)

The goroutine function for the update-application-quota transaction. Create a transaction for updating an application's quota, then call this function to run it. The function returns as soon as the transaction is done, fails, or times out (as defined by transaction.LifePeriod).

func (*Scheduler) SendBcsMessage

func (s *Scheduler) SendBcsMessage(taskGroup *types.TaskGroup, bcsMsg *types.BcsMessage) (*types.BcsMessage, error)

SendBcsMessage sends a bcs message to a TaskGroup

func (*Scheduler) SendEnv

func (s *Scheduler) SendEnv(taskGroup *types.TaskGroup, name, value string) (*types.BcsMessage, error)

SendEnv sends an env to the executor; name is the key of the env value. replace indicates whether to replace an existing env if it already exists; if replace is false, addition or creation is the default behavior.

func (*Scheduler) SendHealthMsg

func (s *Scheduler) SendHealthMsg(kind alarm.MessageKind, RunAs, message string, alarmID string, convergenceSeconds *uint16)

Send health message

func (*Scheduler) SendLocalFile

func (s *Scheduler) SendLocalFile(taskGroup *types.TaskGroup, ctxBase64, to, right, user string) (*types.BcsMessage, error)

SendLocalFile sends a local file to the executor

func (*Scheduler) SendMessage

func (s *Scheduler) SendMessage(taskGroup *types.TaskGroup, msg []byte) (*http.Response, error)

SendMessage sends msg from the scheduler to the executor; msg is handled by the master with a MESSAGE call

func (*Scheduler) SendRemoteFile

func (s *Scheduler) SendRemoteFile(taskGroup *types.TaskGroup, from, to, right, user string) (*types.BcsMessage, error)

SendRemoteFile sends a remote file to the executor

func (*Scheduler) SendSignal

func (s *Scheduler) SendSignal(taskGroup *types.TaskGroup, signal uint32) (*types.BcsMessage, error)

SendSignal sends any user-specified signal to the executor

func (*Scheduler) Start

func (s *Scheduler) Start() error

Start starts the scheduler and subscribes to the event stream

func (*Scheduler) StatusReport

func (s *Scheduler) StatusReport(status *mesos.TaskStatus)

StatusReport is the goroutine function for processing task status reports. When the scheduler receives a task status report message, it creates a goroutine to process it.

func (*Scheduler) Stop

func (s *Scheduler) Stop()

func (*Scheduler) UpdateAgentSchedInfo

func (s *Scheduler) UpdateAgentSchedInfo(hostname, taskGroupID string, deltaResource *types.Resource) error

Update agent schedinfo by hostname

func (*Scheduler) UpdateMesosAgents

func (s *Scheduler) UpdateMesosAgents()

func (*Scheduler) UpdateTaskStatus

func (s *Scheduler) UpdateTaskStatus(agentID, executorID string, bcsMsg *types.BcsMessage)

UpdateTaskStatus currently only updates a task's status to running from a mesos message, if the task status was changed by a mesos status update

func (*Scheduler) UseOffer

func (s *Scheduler) UseOffer(o *offer.Offer) bool

Use offer

type ServiceMgr

type ServiceMgr struct {
	// contains filtered or unexported fields
}

Service Manager

func NewServiceMgr

func NewServiceMgr(scheduler *Scheduler) *ServiceMgr

Create service manager

func (*ServiceMgr) SendMsg

func (mgr *ServiceMgr) SendMsg(msg *ServiceMgrMsg) error

Send control message to service manager

func (*ServiceMgr) ServiceAdd

func (mgr *ServiceMgr) ServiceAdd(service *commtypes.BcsService)

Send a service add event to the service manager

func (*ServiceMgr) ServiceDelete

func (mgr *ServiceMgr) ServiceDelete(service *commtypes.BcsService)

Send a service delete event to the service manager

func (*ServiceMgr) ServiceUpdate

func (mgr *ServiceMgr) ServiceUpdate(service *commtypes.BcsService)

Send a service update event to the service manager

func (*ServiceMgr) TaskgroupAdd

func (mgr *ServiceMgr) TaskgroupAdd(taskgroup *types.TaskGroup)

Send a taskgroup add event to the service manager

func (*ServiceMgr) TaskgroupDelete

func (mgr *ServiceMgr) TaskgroupDelete(taskgroup *types.TaskGroup)

Send a taskgroup delete event to the service manager

func (*ServiceMgr) TaskgroupUpdate

func (mgr *ServiceMgr) TaskgroupUpdate(taskgroup *types.TaskGroup)

Send a taskgroup update event to the service manager

func (*ServiceMgr) Worker

func (mgr *ServiceMgr) Worker()

The goroutine function for service monitoring. It processes taskgroup add, delete, and update events, and service add, delete, and update events.

type ServiceMgrMsg

type ServiceMgrMsg struct {
	// open:  work
	// close:  not work
	// stop: finish
	MsgType string
}

Control message for service manager

type ServiceSyncData

type ServiceSyncData struct {
	// TaskGroup, Service
	DataType string
	// Add, Delete, Update
	Action string
	// Taskgroup or Service pointer
	Item interface{}
}

Event for service manager
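Because Item is an interface{}, a consumer has to recover the concrete pointer type before acting on an event. The sketch below shows one way to do that with a type switch. The taskGroup, service, and syncData types are local stand-ins, describe is a hypothetical helper, and the exact DataType and Action strings ("TaskGroup", "Service", "Add", ...) are assumed from the field comments rather than confirmed against the package.

```go
package main

import "fmt"

// Local stand-ins for types.TaskGroup and commtypes.BcsService.
type taskGroup struct{ ID string }
type service struct{ Name string }

// syncData mirrors the exported fields of ServiceSyncData.
type syncData struct {
	DataType string      // "TaskGroup" or "Service" (assumed spelling)
	Action   string      // "Add", "Delete", or "Update" (assumed spelling)
	Item     interface{} // *taskGroup or *service
}

// describe checks that DataType agrees with the concrete type of Item and
// returns a short description of the event.
func describe(ev syncData) (string, error) {
	switch item := ev.Item.(type) {
	case *taskGroup:
		if ev.DataType != "TaskGroup" {
			return "", fmt.Errorf("DataType %q does not match a taskgroup item", ev.DataType)
		}
		return fmt.Sprintf("taskgroup %s: %s", item.ID, ev.Action), nil
	case *service:
		if ev.DataType != "Service" {
			return "", fmt.Errorf("DataType %q does not match a service item", ev.DataType)
		}
		return fmt.Sprintf("service %s: %s", item.Name, ev.Action), nil
	default:
		return "", fmt.Errorf("unsupported item type %T", ev.Item)
	}
}

func main() {
	events := []syncData{
		{DataType: "TaskGroup", Action: "Add", Item: &taskGroup{ID: "tg-0"}},
		{DataType: "Service", Action: "Update", Item: &service{Name: "web"}},
	}
	for _, ev := range events {
		msg, err := describe(ev)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(msg)
	}
}
```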

type TransAPIDeleteOpdata

type TransAPIDeleteOpdata struct {
	// if false, the operation will fail when some taskgroups cannot come to end status
	Enforce bool
}

Delete application transaction data

type TransAPILaunchOpdata

type TransAPILaunchOpdata struct {
	// version definition for launch application
	Version *types.Version
	// already launched taskgroups number
	LaunchedNum int
	// resource for a taskgroup
	NeedResource *types.Resource
	// why do this operation
	Reason string
}

Launch application transaction data

type TransAPIScaleOpdata

type TransAPIScaleOpdata struct {
	// version definition for application
	Version *types.Version
	// resource for a taskgroup
	NeedResource *types.Resource
	// the target count for application's taskgroups
	Instances uint64
	// scale down or up
	IsDown bool
	// already launched taskgroups number
	LaunchedNum int
}

Scale application transaction data

type TransAPIUpdateOpdata

type TransAPIUpdateOpdata struct {
	// version definition for application
	Version *types.Version
	// already updated count
	LaunchedNum int
	// the count of taskgroups to be updated
	Instances int
	// resource for one taskgroup
	NeedResource *types.Resource
	// the taskgroups to be updated
	Taskgroups []*types.TaskGroup
}

Update application transaction data

type TransRescheduleOpData

type TransRescheduleOpData struct {
	// version definition for application
	Version *types.Version
	// the taskgroup to be rescheduled
	TaskGroupID string
	// if the taskgroup cannot come to end status, do the operation or not
	Force bool
	// whether the operation was created by the scheduler (taskgroup failed or lost)
	IsInner bool
	// resource for one taskgroup
	NeedResource *types.Resource
	// host retain time
	HostRetainTime int64
	// host retain
	HostRetain string
}

Reschedule taskgroup transaction data

type Transaction

type Transaction struct {
	// transaction unique ID, created in CreateTransaction
	ID string
	// namespace
	RunAs string
	// application name
	AppID string
	// operation type: LAUNCH, DELETE, SCALE, UPDATE, RESCHEDULE ...
	OpType string
	// operation status: INIT, FINISH, FAIL, ERROR ...
	Status string
	// operation data
	OpData interface{}
	// the seconds before transaction timeout
	LifePeriod int64
	// the seconds before the transaction really executes
	DelayTime int64
	// transaction create time
	CreateTime int64
}

Transaction

func CreateTransaction

func CreateTransaction() *Transaction

Create a transaction; its ID and CreateTime are initialized
