scheduling

package
v0.44.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 9, 2024 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsTimedout

func IsTimedout(qi *QueueItemWithOrder) bool

func SortWorkerWeights

func SortWorkerWeights(weights []WorkerWithWeight)

SortWorkerWeights sorts a slice of WorkerWithWeight in descending order of Weight

Types

type ExhaustedRateLimitCache

type ExhaustedRateLimitCache struct {
	// contains filtered or unexported fields
}

ExhaustedRateLimitCache is a cache of rate limits to their next refill time, which avoids querying queues where we know we're already rate-limited.

func NewExhaustedRateLimitCache

func NewExhaustedRateLimitCache(maxCacheDuration time.Duration) *ExhaustedRateLimitCache

NewExhaustedRateLimitCache creates a new ExhaustedRateLimitCache.

func (*ExhaustedRateLimitCache) IsExhausted

func (rlc *ExhaustedRateLimitCache) IsExhausted(tenantId, queue string) bool

Get returns true if the rate limit is not exhausted, false otherwise.

func (*ExhaustedRateLimitCache) Set

func (rlc *ExhaustedRateLimitCache) Set(tenantId, queue string, exhaustedRateLimitRefillTimes []time.Time)

type QueueItemWithOrder

type QueueItemWithOrder struct {
	*dbsqlc.QueueItem

	Order int
}

type RateLimit

type RateLimit struct {
	// contains filtered or unexported fields
}

func NewRateLimit

func NewRateLimit(key string, rl *dbsqlc.ListRateLimitsForTenantRow) *RateLimit

func (*RateLimit) AddStepRunId

func (rl *RateLimit) AddStepRunId(stepRunId string, units int32) bool

func (*RateLimit) Key

func (rl *RateLimit) Key() string

func (*RateLimit) NextRefill

func (rl *RateLimit) NextRefill() time.Time

func (*RateLimit) Rollback

func (rl *RateLimit) Rollback(stepRunId string)

func (*RateLimit) UnitsConsumed

func (rl *RateLimit) UnitsConsumed() int32

type SchedulePlan

type SchedulePlan struct {
	StepRunIds             []pgtype.UUID
	StepRunTimeouts        []string
	SlotIds                []string
	WorkerIds              []pgtype.UUID
	UnassignedStepRunIds   []pgtype.UUID
	QueuedStepRuns         []repository.QueuedStepRun
	TimedOutStepRuns       []pgtype.UUID
	RateLimitedStepRuns    []pgtype.UUID
	RateLimitedQueues      map[string][]time.Time
	QueuedItems            []int64
	ShouldContinue         bool
	MinQueuedIds           map[string]int64
	RateLimitUnitsConsumed map[string]int32
	// contains filtered or unexported fields
}

func GeneratePlan

func GeneratePlan(
	ctx context.Context,
	slots []*Slot,
	uniqueActionsArr []string,
	queueItems []*QueueItemWithOrder,
	stepRateUnits map[string]map[string]int32,
	currRateLimits map[string]*dbsqlc.ListRateLimitsForTenantRow,
	workerLabels map[string][]*dbsqlc.GetWorkerLabelsRow,
	stepDesiredLabels map[string][]*dbsqlc.GetDesiredLabelsRow,
) (SchedulePlan, error)

func (*SchedulePlan) AssignQiToSlot

func (plan *SchedulePlan) AssignQiToSlot(qi *QueueItemWithOrder, slot *Slot)

func (*SchedulePlan) HandleNoSlots

func (plan *SchedulePlan) HandleNoSlots(qi *QueueItemWithOrder)

func (*SchedulePlan) HandleRateLimited

func (plan *SchedulePlan) HandleRateLimited(qi *QueueItemWithOrder)

func (*SchedulePlan) HandleTimedOut

func (plan *SchedulePlan) HandleTimedOut(qi *QueueItemWithOrder)

func (*SchedulePlan) HandleUnassigned

func (plan *SchedulePlan) HandleUnassigned(qi *QueueItemWithOrder)

func (*SchedulePlan) UpdateMinQueuedIds

func (sp *SchedulePlan) UpdateMinQueuedIds(qi *QueueItemWithOrder) []repository.QueuedStepRun

type Slot added in v0.44.0

type Slot struct {
	ID           string
	WorkerId     string
	DispatcherId string
	ActionId     string
}

type WorkerState

type WorkerState struct {
	// contains filtered or unexported fields
}

func NewWorkerState

func NewWorkerState(workerId string, labels []*dbsqlc.GetWorkerLabelsRow) *WorkerState

func (*WorkerState) AddSlot

func (w *WorkerState) AddSlot(slot *Slot)

func (*WorkerState) AddStepWeight

func (w *WorkerState) AddStepWeight(stepId string, weight int)

func (*WorkerState) AssignSlot

func (w *WorkerState) AssignSlot(qi *QueueItemWithOrder) (*Slot, bool)

func (*WorkerState) CanAssign

func (w *WorkerState) CanAssign(action string, stepId *string) bool

type WorkerStateManager

type WorkerStateManager struct {
	// contains filtered or unexported fields
}

func NewWorkerStateManager

func NewWorkerStateManager(
	slots []*Slot,
	workerLabels map[string][]*dbsqlc.GetWorkerLabelsRow,
	stepDesiredLabels map[string][]*dbsqlc.GetDesiredLabelsRow,
) *WorkerStateManager

func (*WorkerStateManager) AttemptAssignSlot

func (wm *WorkerStateManager) AttemptAssignSlot(qi *QueueItemWithOrder) *Slot

func (*WorkerStateManager) DropWorker

func (wm *WorkerStateManager) DropWorker(workerId string)

func (*WorkerStateManager) HasEligibleWorkers

func (wm *WorkerStateManager) HasEligibleWorkers(stepId string) bool

type WorkerWithWeight

type WorkerWithWeight struct {
	WorkerId string
	Weight   int
}

WorkerWithWeight represents a worker with an associated weight

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL