Documentation ¶
Index ¶
- func ComputeWeight(s []*dbsqlc.GetDesiredLabelsRow, l []*dbsqlc.GetWorkerLabelsRow) int
- func IsTimedout(qi *QueueItemWithOrder) bool
- func SortWorkerWeights(weights []WorkerWithWeight)
- type ExhaustedRateLimitCache
- type QueueItemWithOrder
- type RateLimit
- type RateLimitedResult
- type SchedulePlan
- func (plan *SchedulePlan) AssignQiToSlot(qi *QueueItemWithOrder, slot *Slot)
- func (plan *SchedulePlan) HandleNoSlots(qi *QueueItemWithOrder)
- func (plan *SchedulePlan) HandleRateLimited(qi *QueueItemWithOrder, key string, units int32)
- func (plan *SchedulePlan) HandleTimedOut(qi *QueueItemWithOrder)
- func (plan *SchedulePlan) HandleUnassigned(qi *QueueItemWithOrder)
- func (sp *SchedulePlan) UpdateMinQueuedIds(qi *QueueItemWithOrder) []repository.QueuedStepRun
- type Slot
- type WorkerState
- type WorkerStateManager
- type WorkerWithWeight
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ComputeWeight ¶
func ComputeWeight(s []*dbsqlc.GetDesiredLabelsRow, l []*dbsqlc.GetWorkerLabelsRow) int
func IsTimedout ¶
func IsTimedout(qi *QueueItemWithOrder) bool
func SortWorkerWeights ¶
func SortWorkerWeights(weights []WorkerWithWeight)
SortWorkerWeights sorts a slice of WorkerWithWeight in descending order of Weight.
Types ¶
type ExhaustedRateLimitCache ¶
type ExhaustedRateLimitCache struct {
// contains filtered or unexported fields
}
ExhaustedRateLimitCache is a cache that maps rate limits to their next refill time, which avoids querying queues where we know we're already rate-limited.
func NewExhaustedRateLimitCache ¶
func NewExhaustedRateLimitCache(maxCacheDuration time.Duration) *ExhaustedRateLimitCache
NewExhaustedRateLimitCache creates a new ExhaustedRateLimitCache.
func (*ExhaustedRateLimitCache) IsExhausted ¶
func (rlc *ExhaustedRateLimitCache) IsExhausted(tenantId, queue string) bool
IsExhausted reports whether the rate limit for the given tenant and queue is currently exhausted; it returns true if exhausted, false otherwise.
type QueueItemWithOrder ¶
type RateLimit ¶
type RateLimit struct {
// contains filtered or unexported fields
}
func NewRateLimit ¶
func NewRateLimit(key string, rl *dbsqlc.ListRateLimitsForTenantWithMutateRow) *RateLimit
func (*RateLimit) AddStepRunId ¶
func (*RateLimit) NextRefill ¶
func (*RateLimit) UnitsConsumed ¶
type RateLimitedResult ¶ added in v0.47.0
type SchedulePlan ¶
type SchedulePlan struct { StepRunIds []pgtype.UUID StepRunTimeouts []string SlotIds []string WorkerIds []pgtype.UUID UnassignedStepRunIds []pgtype.UUID QueuedStepRuns []repository.QueuedStepRun TimedOutStepRuns []pgtype.UUID RateLimitedStepRuns RateLimitedResult QueuedItems []int64 ShouldContinue bool MinQueuedIds map[string]int64 RateLimitUnitsConsumed map[string]int32 // contains filtered or unexported fields }
func GeneratePlan ¶
func GeneratePlan( ctx context.Context, slots []*Slot, uniqueActionsArr []string, queueItems []*QueueItemWithOrder, stepRunRateUnits map[string]map[string]int32, currRateLimits map[string]*dbsqlc.ListRateLimitsForTenantWithMutateRow, workerLabels map[string][]*dbsqlc.GetWorkerLabelsRow, stepDesiredLabels map[string][]*dbsqlc.GetDesiredLabelsRow, ) (SchedulePlan, error)
func (*SchedulePlan) AssignQiToSlot ¶
func (plan *SchedulePlan) AssignQiToSlot(qi *QueueItemWithOrder, slot *Slot)
func (*SchedulePlan) HandleNoSlots ¶
func (plan *SchedulePlan) HandleNoSlots(qi *QueueItemWithOrder)
func (*SchedulePlan) HandleRateLimited ¶
func (plan *SchedulePlan) HandleRateLimited(qi *QueueItemWithOrder, key string, units int32)
func (*SchedulePlan) HandleTimedOut ¶
func (plan *SchedulePlan) HandleTimedOut(qi *QueueItemWithOrder)
func (*SchedulePlan) HandleUnassigned ¶
func (plan *SchedulePlan) HandleUnassigned(qi *QueueItemWithOrder)
func (*SchedulePlan) UpdateMinQueuedIds ¶
func (sp *SchedulePlan) UpdateMinQueuedIds(qi *QueueItemWithOrder) []repository.QueuedStepRun
type WorkerState ¶
type WorkerState struct {
// contains filtered or unexported fields
}
func NewWorkerState ¶
func NewWorkerState(workerId string, labels []*dbsqlc.GetWorkerLabelsRow) *WorkerState
func (*WorkerState) AddSlot ¶
func (w *WorkerState) AddSlot(slot *Slot)
func (*WorkerState) AddStepWeight ¶
func (w *WorkerState) AddStepWeight(stepId string, weight int)
func (*WorkerState) AssignSlot ¶
func (w *WorkerState) AssignSlot(qi *QueueItemWithOrder) (*Slot, bool)
type WorkerStateManager ¶
type WorkerStateManager struct {
// contains filtered or unexported fields
}
func NewWorkerStateManager ¶
func NewWorkerStateManager( slots []*Slot, workerLabels map[string][]*dbsqlc.GetWorkerLabelsRow, stepDesiredLabels map[string][]*dbsqlc.GetDesiredLabelsRow, ) *WorkerStateManager
func (*WorkerStateManager) AttemptAssignSlot ¶
func (wm *WorkerStateManager) AttemptAssignSlot(qi *QueueItemWithOrder) *Slot
func (*WorkerStateManager) DropWorker ¶
func (wm *WorkerStateManager) DropWorker(workerId string)
func (*WorkerStateManager) HasEligibleWorkers ¶
func (wm *WorkerStateManager) HasEligibleWorkers(stepId string) bool
type WorkerWithWeight ¶
WorkerWithWeight represents a worker with an associated weight.
Source Files ¶
Click to show internal directories.
Click to hide internal directories.