Documentation
Index
- type Executor
- type Job
- type JobDebug
- type JobsOnExecutor
- type Run
- type Scheduler
- func (s *Scheduler[I]) CopyOfSetup() Setup[I]
- func (s *Scheduler[I]) CountJobs(executorID I) int
- func (s *Scheduler[I]) CountRunningJobs(executorID I) int
- func (s *Scheduler[I]) CountRunningJobsWithAMilestone(executorID I) int
- func (s *Scheduler[I]) CountStartedJobs(executorID I) int
- func (s *Scheduler[I]) DebugPrintExecutors(run Run[I], sit SituationType)
- func (s *Scheduler[I]) Executors() []Executor[I]
- func (s *Scheduler[I]) GetActiveJobIDs(exID I) map[I]I
- func (s *Scheduler[I]) GetRun(jobID I) (Run[I], bool)
- func (s *Scheduler[I]) GetRunForID(jobID I) (Run[I], error)
- func (s *Scheduler[I]) HasExecutor(exID I) bool
- func (s *Scheduler[I]) Init(setup Setup[I])
- func (s *Scheduler[I]) PrintDebugRows(w io.Writer)
- func (s *Scheduler[I]) Runs() []Run[I]
- func (s *Scheduler[I]) Start()
- func (s *Scheduler[I]) Stop()
- type Setup
- type SituationType
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type Job
type Job[I comparable] struct {
	ID        I
	DebugName string
	Duration  time.Duration // How long the job should run. 0 means until stopped.
	Cost      float64       // Cost is how much of an executor's CostCapacity the job uses.
	// contains filtered or unexported fields
}
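To show how a job's Cost relates to an executor's CostCapacity, here is a minimal sketch of a capacity check with greedy placement. The Executor type's fields are not documented on this page, so the `executor` struct, its `used` field and the `pickExecutor` helper are hypothetical. The policy of preferring the executor with the most capacity left is also an assumption, not necessarily what the scheduler does.

```go
package main

import "fmt"

// executor is a hypothetical stand-in for Executor[I]; only the capacity
// bookkeeping needed for this sketch is modelled.
type executor struct {
	ID           string
	CostCapacity float64 // total capacity (field name taken from the docs)
	used         float64 // hypothetical: sum of Cost of jobs already placed on it
}

// job mirrors the documented Job fields used here.
type job struct {
	ID   string
	Cost float64
}

// pickExecutor returns the index of the executor with the most capacity
// left that can still fit j, or -1 if no executor has enough capacity.
func pickExecutor(exs []executor, j job) int {
	best := -1
	bestLeft := 0.0
	for i, ex := range exs {
		left := ex.CostCapacity - ex.used
		if left < j.Cost {
			continue // not enough capacity for this job
		}
		if best == -1 || left > bestLeft {
			best, bestLeft = i, left
		}
	}
	return best
}

func main() {
	exs := []executor{
		{ID: "a", CostCapacity: 4, used: 3},
		{ID: "b", CostCapacity: 4, used: 1},
	}
	i := pickExecutor(exs, job{ID: "j1", Cost: 2})
	fmt.Println(exs[i].ID) // b
}
```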
type JobDebug
type JobDebug struct {
	Existed      time.Duration
	Started      time.Duration
	Ended        time.Duration
	Runned       time.Duration
	Count        int
	JobName      string
	ExecutorName string
	// contains filtered or unexported fields
}
JobDebug stores how long each job has spent starting, running and ending. It also stores how long the job has existed, i.e. been in the scheduler. Known remembers when the job was first added to the scheduler, even after it has been removed.
type JobsOnExecutor
type JobsOnExecutor[I comparable] struct {
	JobIDs     []I
	ExecutorID I
}
type Run
type Run[I comparable] struct {
	Job         Job[I]    `zui:"flatten"`
	ExecutorID  I         `zui:"title:ExID"`
	Count       int       `zui:"allowempty"`
	StartedAt   time.Time `zui:"allowempty"`
	RanAt       time.Time `zui:"allowempty"`
	StoppedAt   time.Time `zui:"allowempty"`
	Removing    bool      `zui:"allowempty"`
	Stopping    bool      `zui:"allowempty"`
	MilestoneAt time.Time `zui:"allowempty"` // MilestoneAt is when a significant sub-task was achieved. See Setup.StopJobIfSinceMilestoneLessThan.
	ErrorAt     time.Time `zui:"allowempty"` // ErrorAt is the last time an error occurred on this job/run. It is used to de-prioritize jobs with recent errors when starting new jobs.
	// contains filtered or unexported fields
}
type Scheduler
type Scheduler[I comparable] struct {
	// The channels are made in NewScheduler().
	StopJobCh               chan I                 // Write a job ID to StopJobCh to stop the job.
	RemoveJobCh             chan I                 // Write a job ID to RemoveJobCh to stop and remove the job.
	AddJobCh                chan Job[I]            // Write a Job to AddJobCh to add a job. It will be started when and how possible.
	ChangeJobCh             chan Job[I]            // Write a Job with an existing ID to ChangeJobCh to change it. It will be restarted if ChangingJobRestartsIt is true.
	JobIsRunningCh          chan I                 // Write a job ID to JobIsRunningCh to flag it as now running. Not needed if JobIsRunningOnSuccessfullStart is true.
	AddExecutorCh           chan Executor[I]       // Write an Executor to AddExecutorCh to add an executor. Jobs will be started on it or balanced onto it as needed.
	RemoveExecutorCh        chan I                 // Write an executor ID to RemoveExecutorCh to remove it. Jobs on it are immediately stopped and restarted.
	ChangeExecutorCh        chan Executor[I]       // Write an Executor with an existing ID to ChangeExecutorCh to change it. Jobs on it are restarted if anything but DebugName is changed.
	FlushExecutorJobsCh     chan I                 // Write an executor ID to FlushExecutorJobsCh to restart all jobs on it.
	SetExecutorIsAliveCh    chan I                 // Write an executor ID to SetExecutorIsAliveCh at least once every ExecutorAliveDuration to keep it alive.
	SetJobsOnExecutorCh     chan JobsOnExecutor[I] // Write a list of job IDs and an executor ID to SetJobsOnExecutorCh to update what is running on the executor. Since the executor might crash, this keeps jobs in sync.
	SetTotalMaxJobCountCh   chan int               // Changes TotalMaxJobCount and refreshes the scheduler accordingly.
	SetJobHasMilestoneNowCh chan I                 // Sets the job's run's MilestoneAt to now. See Setup.StopJobIfSinceMilestoneLessThan.
	SetJobHasErrorCh        chan I                 // Sets the run's ErrorAt to now.
	Debug                   zmap.LockMap[I, JobDebug]
	// contains filtered or unexported fields
}
A *Scheduler* starts *Job*s on *Executor*s, trying to balance the workload. Each Job has a *Cost*, and each Executor a *CostCapacity*. Jobs can have a Duration or run until stopped. The scheduler assumes that jobs take considerable time to start and end, and that starting too many at once can cause congestion, so Setup has SimultaneousStarts and MinDurationBetweenSimultaneousStarts parameters. Within these constraints, the priority is to start jobs as soon as possible on any executor with enough capacity. All changes to a Scheduler are made through its channels.
func NewScheduler
func NewScheduler[I comparable]() *Scheduler[I]
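The Scheduler's channels are made in NewScheduler, and all changes go through them. Here is a minimal sketch of that design, where a single goroutine owns the state and every change arrives on a channel. The `miniScheduler` type, its `countCh` request/reply channel and `doneCh` are hypothetical. This is not the package's Scheduler. Because only the loop goroutine touches the jobs map, the sketch needs no mutex.

```go
package main

import "fmt"

type job struct {
	ID   int
	Cost float64
}

// miniScheduler is a hypothetical illustration of the channel-driven design.
type miniScheduler struct {
	AddJobCh    chan job
	RemoveJobCh chan int      // stop and remove a job by ID
	countCh     chan chan int // hypothetical request/reply channel for reading state
	doneCh      chan struct{}
	jobs        map[int]job // owned exclusively by the loop goroutine
}

// newMiniScheduler makes the channels, as NewScheduler is documented to do.
// A zero-value struct would have nil channels, and sends on them block forever.
func newMiniScheduler() *miniScheduler {
	return &miniScheduler{
		AddJobCh:    make(chan job),
		RemoveJobCh: make(chan int),
		countCh:     make(chan chan int),
		doneCh:      make(chan struct{}),
		jobs:        map[int]job{},
	}
}

// loop is the only goroutine that reads or writes s.jobs.
func (s *miniScheduler) loop() {
	for {
		select {
		case j := <-s.AddJobCh:
			s.jobs[j.ID] = j
		case id := <-s.RemoveJobCh:
			delete(s.jobs, id)
		case reply := <-s.countCh:
			reply <- len(s.jobs)
		case <-s.doneCh:
			return
		}
	}
}

// count asks the loop goroutine for the current number of jobs.
func (s *miniScheduler) count() int {
	reply := make(chan int)
	s.countCh <- reply
	return <-reply
}

func main() {
	s := newMiniScheduler()
	go s.loop()
	s.AddJobCh <- job{ID: 1, Cost: 1}
	s.AddJobCh <- job{ID: 2, Cost: 2}
	s.RemoveJobCh <- 1
	fmt.Println(s.count()) // 1
	close(s.doneCh)
}
```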
func (*Scheduler[I]) CopyOfSetup
func (s *Scheduler[I]) CopyOfSetup() Setup[I]
func (*Scheduler[I]) CountRunningJobs
func (s *Scheduler[I]) CountRunningJobs(executorID I) int
func (*Scheduler[I]) CountRunningJobsWithAMilestone
func (s *Scheduler[I]) CountRunningJobsWithAMilestone(executorID I) int
CountRunningJobsWithAMilestone returns the number of running jobs that have reached a milestone, even if it was reached long ago.
func (*Scheduler[I]) CountStartedJobs
func (s *Scheduler[I]) CountStartedJobs(executorID I) int
func (*Scheduler[I]) DebugPrintExecutors
func (s *Scheduler[I]) DebugPrintExecutors(run Run[I], sit SituationType)
func (*Scheduler[I]) GetActiveJobIDs
func (s *Scheduler[I]) GetActiveJobIDs(exID I) map[I]I
GetActiveJobIDs returns a map from job ID to executor ID for jobs that are running or started on exID, or on all executors if exID is the zero value (e.g. 0).
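Because I is only constrained by comparable, "0" here means the zero value of I. Here is a minimal sketch of that filter using generics. The `activeRun` type, its `Active` flag and the `activeJobIDs` function are hypothetical. They illustrate the documented behaviour and are not the method's implementation.

```go
package main

import "fmt"

// activeRun is a hypothetical stand-in for the parts of Run[I] needed here.
type activeRun[I comparable] struct {
	JobID      I
	ExecutorID I
	Active     bool // hypothetical: true if the run is started or running
}

// activeJobIDs returns job ID -> executor ID for active runs on exID,
// or on every executor when exID is the zero value of I.
func activeJobIDs[I comparable](runs []activeRun[I], exID I) map[I]I {
	var zero I
	m := map[I]I{}
	for _, r := range runs {
		if !r.Active {
			continue
		}
		if exID != zero && r.ExecutorID != exID {
			continue // filtering on a specific executor
		}
		m[r.JobID] = r.ExecutorID
	}
	return m
}

func main() {
	runs := []activeRun[int]{
		{JobID: 1, ExecutorID: 10, Active: true},
		{JobID: 2, ExecutorID: 20, Active: true},
		{JobID: 3, ExecutorID: 10, Active: false},
	}
	fmt.Println(activeJobIDs(runs, 10)) // map[1:10]
	fmt.Println(activeJobIDs(runs, 0))  // map[1:10 2:20]
}
```

One consequence of this convention is that an executor whose ID is itself the zero value cannot be selected on its own.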
func (*Scheduler[I]) GetRunForID
func (s *Scheduler[I]) GetRunForID(jobID I) (Run[I], error)
func (*Scheduler[I]) HasExecutor
func (s *Scheduler[I]) HasExecutor(exID I) bool
func (*Scheduler[I]) PrintDebugRows
func (s *Scheduler[I]) PrintDebugRows(w io.Writer)
type Setup
type Setup[I comparable] struct {
	ExecutorAliveDuration                 time.Duration                                     // ExecutorAliveDuration is how often an executor needs to report that it's alive to be considered operational. 0 means always alive.
	SimultaneousStarts                    int                                               // SimultaneousStarts is how many jobs can start while another one is starting and hasn't reached the running state yet. See also MinDurationBetweenSimultaneousStarts.
	MinDurationBetweenSimultaneousStarts  time.Duration                                     // MinDurationBetweenSimultaneousStarts is how long to wait before the next start if SimultaneousStarts > 1.
	LoadBalanceIfCostDifference           float64                                           // If LoadBalanceIfCostDifference > 0, once all jobs are running, a job is switched to an executor with more capacity left if the difference exceeds this value.
	KeepJobsBeyondAtEndUntilEnoughSlack   time.Duration                                     // If KeepJobsBeyondAtEndUntilEnoughSlack > 0, a job isn't stopped at the end of its Duration while other jobs are not yet in the running state, but it is stopped once it overruns its Duration by more than this.
	SlowStartJobFuncTimeout               time.Duration                                     // SlowStartJobFuncTimeout is how long starting a job with StartJobOnExecutorFunc can take before timing out.
	SlowStopJobFuncTimeout                time.Duration                                     // SlowStopJobFuncTimeout is like SlowStartJobFuncTimeout, but for stopping a job with StopJobOnExecutorFunc.
	TotalMaxJobCount                      int                                               // The scheduler won't start another job if active jobs >= TotalMaxJobCount.
	JobIsRunningOnSuccessfullStart        bool                                              // Set JobIsRunningOnSuccessfullStart to mark a job as running once its start function completes successfully. Otherwise, use the JobIsRunningCh channel.
	ChangingJobRestartsIt                 bool                                              // If ChangingJobRestartsIt is set, jobs are restarted when changed with ChangeJobCh.
	GracePeriodForJobsOnExecutorCh        time.Duration                                     // GracePeriodForJobsOnExecutorCh is how long after starting a job the scheduler waits before stopping it for not yet being reported on its executor via SetJobsOnExecutorCh.
	StartJobOnExecutorFunc                func(run Run[I], ctx context.Context) error       `zui:"-"` // StartJobOnExecutorFunc is called to start a job. It is called on a goroutine and is assumed to take a while, or to time out.
	StopJobOnExecutorFunc                 func(run Run[I], ctx context.Context) error       `zui:"-"` // Like StartJobOnExecutorFunc, but for stopping.
	HandleSituationFastFunc               func(run Run[I], s SituationType, details string) `zui:"-"` // HandleSituationFastFunc is called to handle starts, stops, errors and other situations. It must act very quickly or spawn a goroutine.
	StopJobIfSinceMilestoneLessThan       time.Duration                                     // If StopJobIfSinceMilestoneLessThan != 0, a job is only stopped if the time since run.MilestoneAt is less than it; this applies for up to KeepJobsBeyondAtEndUntilEnoughSlack, which must also be set.
	MinimumTimeBeforeRestartingErroredJob time.Duration
}
func DefaultSetup
func DefaultSetup[I comparable]() Setup[I]
type SituationType
type SituationType string
const (
	NoWorkersToRunJob           SituationType = "no workers fit to run job"
	RemoveJobFromExecutorFailed SituationType = "remove job from exector failed"
	ErrorStartingJob            SituationType = "error starting job"
	ExecutorHasExpired          SituationType = "executor is no longer alive"
	MaximumJobsReached          SituationType = "maximum jobs reached"
	SchedulerFinishedStopping   SituationType = "scheduler finished stopping"
	JobStarted                  SituationType = "job started"
	JobRunning                  SituationType = "job running"
	JobStopped                  SituationType = "job stopped"
	JobEnded                    SituationType = "job ended"
)