unit

package
v1.6.28
Warning

This package is not in the latest version of its module.
Published: Dec 2, 2024 License: GPL-3.0 Imports: 8 Imported by: 0

Documentation

Overview

Package unit provides a "work unit" scheduling system for handling data sets that traverse multiple workers / goroutines. The aim is to bind priority to a data set instead of a goroutine and split resources fairly among requests.

Every "work" Unit is assigned an ever increasing ID and can be marked as "paused" or "high priority". The Scheduler always gives a clearance up to a certain ID. All units below this ID may be processed. High priority Units may always be processed.
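The clearance rule described above can be sketched as a small predicate. This is an illustration only: `mayProcess` and its parameters are hypothetical names, not part of package unit, and treating the limit as inclusive is an assumption (the text says both "up to" and "below"). The package's unexported logic may differ.

```go
package main

import "fmt"

// mayProcess reports whether a unit may be processed under the rule
// described in the overview. Hypothetical helper, not part of package unit.
// Assumption: the clearance limit is inclusive.
func mayProcess(id, clearanceUpTo int64, highPriority bool) bool {
	// High priority units may always be processed.
	if highPriority {
		return true
	}
	// All other units need an ID within the current clearance.
	return id <= clearanceUpTo
}

func main() {
	fmt.Println(mayProcess(5, 10, false))  // true
	fmt.Println(mayProcess(11, 10, false)) // false
	fmt.Println(mayProcess(11, 10, true))  // true
}
```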

The Scheduler works with short slots and measures how many Units were finished in a slot. The "slot pace" holds an indication of the current Unit finishing speed per slot. It is only changed slowly (but boosts if too far away) in order to stabilize the system. The Scheduler then calculates the next unit ID limit to give clearance to for the next slot:

"finished units" + "slot pace" + "paused units" - "fraction of high priority units"
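As a worked example, the formula can be evaluated literally. The function `nextClearanceLimit` and its parameter names are hypothetical, and how the scheduler derives the "fraction of high priority units" is not specified here, so it is simply passed in as a number.

```go
package main

import "fmt"

// nextClearanceLimit evaluates the clearance formula from the overview as
// written. Hypothetical helper, not part of package unit.
// Assumption: all four inputs are already expressed as unit counts.
func nextClearanceLimit(finished, slotPace, paused, highPriorityShare int64) int64 {
	return finished + slotPace + paused - highPriorityShare
}

func main() {
	// 1000 finished units, a slot pace of 50, 4 paused units and
	// 2 units reserved for high priority work.
	fmt.Println(nextClearanceLimit(1000, 50, 4, 2)) // 1052
}
```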

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler creates and schedules units. Must be created using NewScheduler().

func NewScheduler

func NewScheduler(config *SchedulerConfig) *Scheduler

NewScheduler returns a new scheduler.

func (*Scheduler) DebugUnit

func (s *Scheduler) DebugUnit(u *Unit, unitSource string)

DebugUnit registers the given unit for debug output with the given source. Additional calls on the same unit update the unit source. StartDebugLog() must be called before calling DebugUnit().

func (*Scheduler) GetAvgCatchUpSlotDuration

func (s *Scheduler) GetAvgCatchUpSlotDuration() int64

GetAvgCatchUpSlotDuration returns the current average catch up slot duration.

func (*Scheduler) GetAvgSlotPace

func (s *Scheduler) GetAvgSlotPace() int64

GetAvgSlotPace returns the current average slot pace.

func (*Scheduler) GetAvgUnitLife

func (s *Scheduler) GetAvgUnitLife() int64

GetAvgUnitLife returns the current average unit lifetime until it is finished.

func (*Scheduler) GetAvgWorkSlotDuration

func (s *Scheduler) GetAvgWorkSlotDuration() int64

GetAvgWorkSlotDuration returns the current average work slot duration.

func (*Scheduler) GetMaxLeveledSlotPace

func (s *Scheduler) GetMaxLeveledSlotPace() int64

GetMaxLeveledSlotPace returns the current maximum leveled slot pace.

func (*Scheduler) GetMaxSlotPace

func (s *Scheduler) GetMaxSlotPace() int64

GetMaxSlotPace returns the current maximum slot pace.

func (*Scheduler) NewUnit

func (s *Scheduler) NewUnit() *Unit

NewUnit returns a new unit within the scheduler.

func (*Scheduler) SlotScheduler

func (s *Scheduler) SlotScheduler(ctx *mgr.WorkerCtx) error

SlotScheduler manages the slot and schedules units. Must only be started once.

func (*Scheduler) StartDebugLog

func (s *Scheduler) StartDebugLog()

StartDebugLog logs the scheduler state every second.

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop stops the scheduler and gives clearance to all units.

type SchedulerConfig

type SchedulerConfig struct {
	// SlotDuration defines the duration of one slot.
	SlotDuration time.Duration

	// MinSlotPace defines the minimum slot pace.
	// The slot pace will never fall below this value.
	MinSlotPace int64

	// WorkSlotPercentage defines how much of a slot should be scheduled with work.
	// The remainder is for catching up and breathing room for other tasks.
	// Must be between 55% (0.55) and 95% (0.95).
	// The default value is 0.7 (70%).
	WorkSlotPercentage float64

	// SlotChangeRatePerStreak defines the fraction (0-1) by which the slot pace
	// should change per streak.
	// The rate is enforced to be large enough to change the minimum slot pace by at least 1.
	// The default value is 0.02 (2%).
	SlotChangeRatePerStreak float64

	// StatCycleDuration defines how often stats are calculated.
	// The default value is 1 minute.
	StatCycleDuration time.Duration
}

SchedulerConfig holds scheduler configuration.

type Unit

type Unit struct {
	// contains filtered or unexported fields
}

Unit describes a "work unit" and is meant to be embedded into another struct used for passing data moving through multiple processing steps.

func (*Unit) Finish

func (u *Unit) Finish()

Finish signals the unit scheduler that this unit has finished processing. Will no-op if called on a nil Unit.
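The nil-safe behavior relies on a general Go property: a method with a pointer receiver can be called on a nil pointer as long as it checks for nil before touching any fields. The sketch below shows that pattern with a hypothetical stand-in type, `workUnit`. It does not reproduce the real Unit implementation.

```go
package main

import "fmt"

// workUnit is a hypothetical stand-in for a unit. It is not the real type.
type workUnit struct {
	finished bool
}

// Finish marks the unit as finished and does nothing on a nil receiver.
func (u *workUnit) Finish() {
	if u == nil {
		return
	}
	u.finished = true
}

func main() {
	var missing *workUnit
	missing.Finish() // safe: the nil receiver is handled inside Finish

	u := &workUnit{}
	u.Finish()
	fmt.Println(u.finished) // true
}
```

This lets callers finish a unit unconditionally, for example in a deferred call, without first checking whether one was attached.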

func (*Unit) IsHighPriority

func (u *Unit) IsHighPriority() bool

IsHighPriority returns whether the unit has high priority.

func (*Unit) MakeHighPriority

func (u *Unit) MakeHighPriority()

MakeHighPriority marks the unit as high priority.

func (*Unit) ReUse

func (u *Unit) ReUse()

ReUse re-initializes the unit so that already allocated structs can be reused.

func (*Unit) RemovePriority

func (u *Unit) RemovePriority()

RemovePriority removes the high priority mark.

func (*Unit) WaitForSlot

func (u *Unit) WaitForSlot()

WaitForSlot blocks until the unit may be processed.
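Taken together, the Unit methods above suggest a simple lifecycle for one processing step: wait for clearance, do the work, then report the unit as finished. The sketch below illustrates that flow with an embedded unit. The interface `slotUnit`, the stub `recordingUnit`, the `packet` type and the `process` function are all hypothetical. With the real package, the embedded field would presumably be a *Unit obtained from Scheduler.NewUnit().

```go
package main

import "fmt"

// slotUnit lists the two Unit methods this sketch relies on.
type slotUnit interface {
	WaitForSlot()
	Finish()
}

// recordingUnit is a hypothetical stub that records calls instead of blocking.
type recordingUnit struct {
	calls []string
}

func (r *recordingUnit) WaitForSlot() { r.calls = append(r.calls, "wait") }
func (r *recordingUnit) Finish()      { r.calls = append(r.calls, "finish") }

// packet is a hypothetical data set that carries its unit through the
// pipeline by embedding it.
type packet struct {
	slotUnit
	payload string
}

// process waits for clearance, does the work, then marks the unit finished.
func process(p *packet) string {
	p.WaitForSlot()
	defer p.Finish()
	return "processed " + p.payload
}

func main() {
	rec := &recordingUnit{}
	p := &packet{slotUnit: rec, payload: "hello"}
	fmt.Println(process(p)) // processed hello
	fmt.Println(rec.calls)  // [wait finish]
}
```

Embedding keeps the priority attached to the data rather than to the goroutine handling it, which matches the aim stated in the overview.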

type UnitDebugData

type UnitDebugData struct {
	// contains filtered or unexported fields
}

UnitDebugData represents a unit that is being debugged.

type UnitDebugger

type UnitDebugger struct {
	// contains filtered or unexported fields
}

UnitDebugger is used to debug unit leaks.
