Documentation ¶
Overview ¶
Package unit provides a "work unit" scheduling system for handling data sets that traverse multiple workers / goroutines. The aim is to bind priority to a data set instead of a goroutine and split resources fairly among requests.
Every "work" Unit is assigned an ever increasing ID and can be marked as "paused" or "high priority". The Scheduler always gives a clearance up to a certain ID. All units below this ID may be processed. High priority Units may always be processed.
The Scheduler works with short slots and measures how many Units were finished in a slot. The "slot pace" holds an indication of the current Unit finishing speed per slot. It is only changed slowly (but boosts if too far away) in order to keep stabilize the system. The Scheduler then calculates the next unit ID limit to give clearance to for the next slot:
"finished units" + "slot pace" + "paused units" - "fraction of high priority units"
Index ¶
- type Scheduler
- func (s *Scheduler) DebugUnit(u *Unit, unitSource string)
- func (s *Scheduler) GetAvgCatchUpSlotDuration() int64
- func (s *Scheduler) GetAvgSlotPace() int64
- func (s *Scheduler) GetAvgUnitLife() int64
- func (s *Scheduler) GetAvgWorkSlotDuration() int64
- func (s *Scheduler) GetMaxLeveledSlotPace() int64
- func (s *Scheduler) GetMaxSlotPace() int64
- func (s *Scheduler) NewUnit() *Unit
- func (s *Scheduler) SlotScheduler(ctx context.Context) error
- func (s *Scheduler) StartDebugLog()
- func (s *Scheduler) Stop()
- type SchedulerConfig
- type Unit
- type UnitDebugData
- type UnitDebugger
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler creates and schedules units. Must be created using NewScheduler().
func NewScheduler ¶
func NewScheduler(config *SchedulerConfig) *Scheduler
NewScheduler returns a new scheduler.
func (*Scheduler) DebugUnit ¶
DebugUnit registers the given unit for debug output with the given source. Additional calls on the same unit update the unit source. StartDebugLog() must be called before calling DebugUnit().
func (*Scheduler) GetAvgCatchUpSlotDuration ¶
GetAvgCatchUpSlotDuration returns the current average catch up slot duration.
func (*Scheduler) GetAvgSlotPace ¶
GetAvgSlotPace returns the current average slot pace.
func (*Scheduler) GetAvgUnitLife ¶
GetAvgUnitLife returns the current average unit lifetime until it is finished.
func (*Scheduler) GetAvgWorkSlotDuration ¶
GetAvgWorkSlotDuration returns the current average work slot duration.
func (*Scheduler) GetMaxLeveledSlotPace ¶
GetMaxLeveledSlotPace returns the current maximum leveled slot pace.
func (*Scheduler) GetMaxSlotPace ¶
GetMaxSlotPace returns the current maximum slot pace.
func (*Scheduler) SlotScheduler ¶
SlotScheduler manages the slot and schedules units. Must only be started once.
func (*Scheduler) StartDebugLog ¶
func (s *Scheduler) StartDebugLog()
StartDebugLog logs the scheduler state every second.
type SchedulerConfig ¶
type SchedulerConfig struct { // SlotDuration defines the duration of one slot. SlotDuration time.Duration // MinSlotPace defines the minimum slot pace. // The slot pace will never fall below this value. MinSlotPace int64 // WorkSlotPercentage defines the how much of a slot should be scheduled with work. // The remainder is for catching up and breathing room for other tasks. // Must be between 55% (0.55) and 95% (0.95). // The default value is 0.7 (70%). WorkSlotPercentage float64 // SlotChangeRatePerStreak defines how many percent (0-1) the slot pace // should change per streak. // Is enforced to be able to change the minimum slot pace by at least 1. // The default value is 0.02 (2%). SlotChangeRatePerStreak float64 // StatCycleDuration defines how often stats are calculated. // The default value is 1 minute. StatCycleDuration time.Duration }
SchedulerConfig holds scheduler configuration.
type Unit ¶
type Unit struct {
// contains filtered or unexported fields
}
Unit describes a "work unit" and is meant to be embedded into another struct used for passing data moving through multiple processing steps.
func (*Unit) Finish ¶
func (u *Unit) Finish()
Finish signals the unit scheduler that this unit has finished processing. Will no-op if called on a nil Unit.
func (*Unit) IsHighPriority ¶
IsHighPriority returns whether the unit has high priority.
func (*Unit) MakeHighPriority ¶
func (u *Unit) MakeHighPriority()
MakeHighPriority marks the unit as high priority.
func (*Unit) ReUse ¶
func (u *Unit) ReUse()
ReUse re-initialized the unit to be able to reuse already allocated structs.
func (*Unit) RemovePriority ¶
func (u *Unit) RemovePriority()
RemovePriority removes the high priority mark.
func (*Unit) WaitForSlot ¶
func (u *Unit) WaitForSlot()
WaitForSlot blocks until the unit may be processed.
type UnitDebugData ¶
type UnitDebugData struct {
// contains filtered or unexported fields
}
UnitDebugData represents a unit that is being debugged.
type UnitDebugger ¶
type UnitDebugger struct {
// contains filtered or unexported fields
}
UnitDebugger is used to debug unit leaks.