zscheduler

package
v0.0.0-...-aa698f8 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 4, 2025 License: BSD-3-Clause Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Executor

type Executor[I comparable] struct {
	ID           I
	Paused       bool
	CostCapacity float64
	KeptAliveAt  time.Time
	DebugName    string
	SettingsHash int64 // SettingsHash is a hash of the executor's other settings; a change causes a restart.
	// contains filtered or unexported fields
}

type Job

type Job[I comparable] struct {
	ID        I
	DebugName string
	Duration  time.Duration // How long job should run for. 0 is until stopped.
	Cost      float64       // Cost is how much of an executor's CostCapacity the job uses.
	// contains filtered or unexported fields
}
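
The relationship between a job's Cost and an executor's CostCapacity can be illustrated with a small placement check. This is only a sketch under assumptions: the executor and job types and the pickExecutor helper are hypothetical local stand-ins, and choosing the executor with the most spare capacity is one plausible policy, not necessarily what the package does.

```go
package main

import "fmt"

// executor and job are simplified local stand-ins, not the package's types.
type executor struct {
	name         string
	costCapacity float64 // total cost the executor can carry
	usedCost     float64 // sum of the costs of jobs already placed on it
}

type job struct {
	name string
	cost float64
}

// pickExecutor returns the index of the executor with the most spare
// capacity that can still fit j, or -1 if no executor fits.
func pickExecutor(execs []executor, j job) int {
	best := -1
	bestSpare := 0.0
	for i, e := range execs {
		spare := e.costCapacity - e.usedCost
		if spare < j.cost {
			continue // not enough room for this job
		}
		if best == -1 || spare > bestSpare {
			best, bestSpare = i, spare
		}
	}
	return best
}

func main() {
	execs := []executor{
		{name: "a", costCapacity: 4, usedCost: 3},
		{name: "b", costCapacity: 4, usedCost: 1},
	}
	j := job{name: "encode", cost: 2}
	i := pickExecutor(execs, j)
	if i < 0 {
		fmt.Println("no executor fits", j.name)
		return
	}
	execs[i].usedCost += j.cost
	fmt.Println("placed", j.name, "on", execs[i].name)
}
```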

type JobDebug

type JobDebug struct {
	Existed time.Duration
	Started time.Duration
	Ended   time.Duration
	Runned  time.Duration

	Count        int
	JobName      string
	ExecutorName string
	// contains filtered or unexported fields
}

JobDebug stores how long each job has spent starting, running and ending. It also stores how long the job has existed, i.e. how long it has been in the scheduler. Known remembers when the job was first added to the scheduler, even after it has been removed.

type JobsOnExecutor

type JobsOnExecutor[I comparable] struct {
	JobIDs     []I
	ExecutorID I
}

type Run

type Run[I comparable] struct {
	Job         Job[I]    `zui:"flatten"`
	ExecutorID  I         `zui:"title:ExID"`
	Count       int       `zui:"allowempty"`
	StartedAt   time.Time `zui:"allowempty"`
	RanAt       time.Time `zui:"allowempty"`
	StoppedAt   time.Time `zui:"allowempty"`
	Removing    bool      `zui:"allowempty"`
	Stopping    bool      `zui:"allowempty"`
	MilestoneAt time.Time `zui:"allowempty"` // MilestoneAt is the time a significant sub-task was achieved. See StopJobIfSinceMilestoneLessThan in Setup.
	ErrorAt     time.Time `zui:"allowempty"` // ErrorAt is the last time an error occurred on this job/run. Used to de-prioritize jobs with recent errors when starting new jobs.
	// contains filtered or unexported fields
}

func (*Run[I]) Progress

func (r *Run[I]) Progress() float64

type Scheduler

type Scheduler[I comparable] struct {
	// The channels are made in NewScheduler()
	StopJobCh               chan I                 // Write a Job ID to StopJobCh to stop the job.
	RemoveJobCh             chan I                 // Write a Job ID to RemoveJobCh to stop and remove the job.
	AddJobCh                chan Job[I]            // Write a Job to AddJobCh to add a job. It will be started when and how possible.
	ChangeJobCh             chan Job[I]            // Write a Job with an existing ID to ChangeJobCh to change it. It will be restarted if ChangingJobRestartsIt is true.
	JobIsRunningCh          chan I                 // Write a Job ID to JobIsRunningCh to flag it as now running. Not needed if JobIsRunningOnSuccessfullStart is true.
	AddExecutorCh           chan Executor[I]       // Write an Executor to AddExecutorCh to add an executor. Jobs will start/balance onto it as needed.
	RemoveExecutorCh        chan I                 // Write an executor ID to RemoveExecutorCh to remove it. Jobs on it will immediately be stopped and restarted.
	ChangeExecutorCh        chan Executor[I]       // Write an Executor with an existing ID to ChangeExecutorCh to change it. Jobs on it will be restarted if anything but DebugName is changed.
	FlushExecutorJobsCh     chan I                 // Write an executor ID to FlushExecutorJobsCh to restart all jobs on it.
	SetExecutorIsAliveCh    chan I                 // Write an executor ID to SetExecutorIsAliveCh to keep it alive. This must be done at least once every ExecutorAliveDuration.
	SetJobsOnExecutorCh     chan JobsOnExecutor[I] // Write a list of job ids and executor id to SetJobsOnExecutorCh to update what is running on the executor. Since the executor might crash, this keeps jobs in sync.
	SetTotalMaxJobCountCh   chan int               // Change TotalMaxJobCount and refresh scheduler based on that.
	SetJobHasMilestoneNowCh chan I                 // Sets job's run's milestone to now. See StopJobIfSinceMilestoneLessThan.
	SetJobHasErrorCh        chan I                 // Sets the run's ErrorAt to now

	Debug zmap.LockMap[I, JobDebug]
	// contains filtered or unexported fields
}

A *Scheduler* starts *Job*s on *Executor*s, trying to balance the workload. Each Job has a *Cost*, and each Executor a *CostCapacity*. Jobs can have a Duration or run until stopped. The scheduler assumes that jobs take considerable time to start and end, and that starting too many at once can cause congestion, so it has SimultaneousStarts and MinDurationBetweenSimultaneousStarts parameters. Within these constraints, the priority is to start jobs as soon as possible on any executor with enough capacity. All changes to a Scheduler are made through channels.
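
The "all changes go through channels" design can be sketched with a much-reduced model in which a single goroutine owns the state and every change arrives on a channel. Everything here is hypothetical and for illustration only: miniScheduler, its channel names, and the count helper are not part of the package, and the real Scheduler tracks far more than a set of job IDs.

```go
package main

import "fmt"

// miniScheduler is a hypothetical, much-reduced model of a channel-driven
// scheduler: one goroutine owns the job set, so no mutex is needed.
type miniScheduler struct {
	addJobCh    chan string
	removeJobCh chan string
	countCh     chan chan int
	stopCh      chan struct{}
}

func newMiniScheduler() *miniScheduler {
	s := &miniScheduler{
		addJobCh:    make(chan string),
		removeJobCh: make(chan string),
		countCh:     make(chan chan int),
		stopCh:      make(chan struct{}),
	}
	go s.loop()
	return s
}

// loop is the only code that touches the jobs map.
func (s *miniScheduler) loop() {
	jobs := map[string]bool{}
	for {
		select {
		case id := <-s.addJobCh:
			jobs[id] = true
		case id := <-s.removeJobCh:
			delete(jobs, id)
		case reply := <-s.countCh:
			reply <- len(jobs)
		case <-s.stopCh:
			return
		}
	}
}

// count asks the owning goroutine for the current number of jobs.
func (s *miniScheduler) count() int {
	reply := make(chan int)
	s.countCh <- reply
	return <-reply
}

func main() {
	s := newMiniScheduler()
	s.addJobCh <- "job1"
	s.addJobCh <- "job2"
	s.removeJobCh <- "job1"
	fmt.Println(s.count()) // prints 1
	close(s.stopCh)
}
```

Because the channels are unbuffered and the loop handles one message at a time, each write is fully processed before the next one is accepted, which keeps the order of changes well defined.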

func NewScheduler

func NewScheduler[I comparable]() *Scheduler[I]

func (*Scheduler[I]) CopyOfSetup

func (s *Scheduler[I]) CopyOfSetup() Setup[I]

func (*Scheduler[I]) CountJobs

func (s *Scheduler[I]) CountJobs(executorID I) int

func (*Scheduler[I]) CountRunningJobs

func (s *Scheduler[I]) CountRunningJobs(executorID I) int

func (*Scheduler[I]) CountRunningJobsWithAMilestone

func (s *Scheduler[I]) CountRunningJobsWithAMilestone(executorID I) int

CountRunningJobsWithAMilestone returns the number of running jobs that have reached a milestone, even if that milestone is long past.

func (*Scheduler[I]) CountStartedJobs

func (s *Scheduler[I]) CountStartedJobs(executorID I) int

func (*Scheduler[I]) DebugPrintExecutors

func (s *Scheduler[I]) DebugPrintExecutors(run Run[I], sit SituationType)

func (*Scheduler[I]) Executors

func (s *Scheduler[I]) Executors() []Executor[I]

func (*Scheduler[I]) GetActiveJobIDs

func (s *Scheduler[I]) GetActiveJobIDs(exID I) map[I]I

GetActiveJobIDs returns a map of job ID to executor ID for jobs that are running or started on exID, or on all executors if exID is 0.
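
A map shaped like the one GetActiveJobIDs returns (job ID to executor ID) is often easier to work with when grouped per executor. The sketch below assumes only the map[I]I shape from the signature above; jobsByExecutor is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"sort"
)

// jobsByExecutor inverts a job-ID-to-executor-ID map into a map from
// executor ID to the job IDs active on that executor.
func jobsByExecutor[I comparable](active map[I]I) map[I][]I {
	out := make(map[I][]I)
	for jobID, exID := range active {
		out[exID] = append(out[exID], jobID)
	}
	return out
}

func main() {
	// Jobs 10 and 11 are on executor 1, job 12 is on executor 2.
	active := map[int]int{10: 1, 11: 1, 12: 2}
	grouped := jobsByExecutor(active)
	for exID, ids := range grouped {
		sort.Ints(ids) // map iteration order is random, so sort for stable output
		fmt.Println("executor", exID, "jobs", ids)
	}
}
```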

func (*Scheduler[I]) GetRun

func (s *Scheduler[I]) GetRun(jobID I) (Run[I], bool)

func (*Scheduler[I]) GetRunForID

func (s *Scheduler[I]) GetRunForID(jobID I) (Run[I], error)

func (*Scheduler[I]) HasExecutor

func (s *Scheduler[I]) HasExecutor(exID I) bool

func (*Scheduler[I]) Init

func (s *Scheduler[I]) Init(setup Setup[I])

func (*Scheduler[I]) PrintDebugRows

func (s *Scheduler[I]) PrintDebugRows(w io.Writer)

func (*Scheduler[I]) Runs

func (s *Scheduler[I]) Runs() []Run[I]

func (*Scheduler[I]) Start

func (s *Scheduler[I]) Start()

func (*Scheduler[I]) Stop

func (s *Scheduler[I]) Stop()

type Setup

type Setup[I comparable] struct {
	ExecutorAliveDuration                 time.Duration                                     // ExecutorAliveDuration is how often an executor needs to say it's alive to be considered operational. 0 means always alive.
	SimultaneousStarts                    int                                               // SimultaneousStarts is how many jobs can start while another one is starting and hasn't reached the running state yet. See also MinDurationBetweenSimultaneousStarts.
	MinDurationBetweenSimultaneousStarts  time.Duration                                     // MinDurationBetweenSimultaneousStarts is how long to wait to do next start if SimultaneousStarts > 1.
	LoadBalanceIfCostDifference           float64                                           // If LoadBalanceIfCostDifference > 0, once all jobs are running, switch job to an executor with more capacity left if difference > this.
	KeepJobsBeyondAtEndUntilEnoughSlack   time.Duration                                     // If KeepJobsBeyondAtEndUntilEnoughSlack > 0, a job isn't stopped at the end of its Duration while other jobs are not yet in the running state, but it is stopped once it goes beyond its Duration by this much.
	SlowStartJobFuncTimeout               time.Duration                                     // SlowStartJobFuncTimeout is how long starting a job with StartJobOnExecutorFunc can go until timeout.
	SlowStopJobFuncTimeout                time.Duration                                     // SlowStopJobFuncTimeout is like SlowStartJobFuncTimeout/StopJobOnExecutorFunc but for stopping.
	TotalMaxJobCount                      int                                               // The scheduler won't start another job if active jobs >= TotalMaxJobCount.
	JobIsRunningOnSuccessfullStart        bool                                              // Set JobIsRunningOnSuccessfullStart to set a job as running once its start function completes successfully. Otherwise use the JobIsRunningCh channel.
	ChangingJobRestartsIt                 bool                                              // If ChangingJobRestartsIt is set, jobs are restarted when changed with ChangeJobCh.
	GracePeriodForJobsOnExecutorCh        time.Duration                                     // GracePeriodForJobsOnExecutorCh is amount of slack from start to not stop jobs not reported in executor yet.
	StartJobOnExecutorFunc                func(run Run[I], ctx context.Context) error       `zui:"-"` // StartJobOnExecutorFunc is called to start a job. It is done on a goroutine and is assumed to take a while or time out.
	StopJobOnExecutorFunc                 func(run Run[I], ctx context.Context) error       `zui:"-"` // Like StartJobOnExecutorFunc but for stopping.
	HandleSituationFastFunc               func(run Run[I], s SituationType, details string) `zui:"-"` // This function is for handling start/stop/errors and more. It must do its work very quickly or spawn a goroutine.
	StopJobIfSinceMilestoneLessThan       time.Duration                                     // If StopJobIfSinceMilestoneLessThan != 0, a job is only stopped when the time since run.MilestoneAt is less than this value, for up to KeepJobsBeyondAtEndUntilEnoughSlack (which must also be set).
	MinimumTimeBeforeRestartingErroredJob time.Duration
}

func DefaultSetup

func DefaultSetup[I comparable]() Setup[I]

type SituationType

type SituationType string
const (
	NoWorkersToRunJob           SituationType = "no workers fit to run job"
	RemoveJobFromExecutorFailed SituationType = "remove job from exector failed"
	ErrorStartingJob            SituationType = "error starting job"
	ExecutorHasExpired          SituationType = "executor is no longer alive"
	MaximumJobsReached          SituationType = "maximum jobs reached"
	SchedulerFinishedStopping   SituationType = "scheduler finished stopping"
	JobStarted                  SituationType = "job started"
	JobRunning                  SituationType = "job running"
	JobStopped                  SituationType = "job stopped"
	JobEnded                    SituationType = "job ended"
)
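
Since HandleSituationFastFunc must return quickly, one way to meet that requirement is to hand each situation to a buffered channel without blocking and process it elsewhere. The sketch below is hypothetical throughout: situationType, event and newFastHandler are local stand-ins, the handler takes a job name instead of a Run, and it returns a bool purely so the example can show when an event is dropped.

```go
package main

import "fmt"

// situationType and the constants below are local stand-ins that mirror
// two of the package's SituationType values.
type situationType string

const (
	errorStartingJob situationType = "error starting job"
	jobStarted       situationType = "job started"
)

// event carries one situation to a slower consumer.
type event struct {
	jobName   string
	situation situationType
	details   string
}

// newFastHandler returns a handler that never blocks: it queues the event
// if there is room and otherwise drops it, reporting which happened.
func newFastHandler(events chan<- event) func(jobName string, s situationType, details string) bool {
	return func(jobName string, s situationType, details string) bool {
		select {
		case events <- event{jobName, s, details}:
			return true
		default:
			return false // queue is full; drop rather than stall the caller
		}
	}
}

func main() {
	events := make(chan event, 1)
	handle := newFastHandler(events)

	fmt.Println(handle("job1", jobStarted, ""))           // true: queued
	fmt.Println(handle("job1", errorStartingJob, "boom")) // false: buffer of 1 is full
	fmt.Println((<-events).situation)                     // job started
}
```

Dropping on a full queue is a deliberate trade-off in this sketch. A handler that must not lose events could start a goroutine per event instead, at the cost of unbounded concurrency.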
