scheduler

package
v1.6.2-rc1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 28, 2025 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AttrJobType       = "job_type"
	AttrEvalType      = "eval_type"
	AttrSchedulerType = "scheduler_type"

	AttrOperationPartGetJob      = "get_job"
	AttrOperationPartGetExecs    = "get_executions"
	AttrOperationPartGetNodes    = "get_node_infos"
	AttrOperationPartMatchNodes  = "match_nodes"
	AttrOperationPartProcessPlan = "process_plan"

	AttrOutcomeKey              = attribute.Key("outcome")
	AttrOutcomeSuccess          = "success"
	AttrOutcomeFailure          = "failure"
	AttrOutcomeAlreadyTerminal  = "already_terminal"
	AttrOutcomeExhaustedRetries = "exhausted_retries"
	AttrOutcomeQueueing         = "queueing"
	AttrOutcomeTimeout          = "timeout"
	AttrOutcomeQueueTimeout     = "queue_timeout"
)

Common attribute keys

Variables

View Source
var (
	Meter = otel.GetMeterProvider().Meter("scheduler")
)

Functions

This section is empty.

Types

type BatchRateLimiter added in v1.6.2

type BatchRateLimiter struct {
	// contains filtered or unexported fields
}

BatchRateLimiter limits the number of executions that can be created in a single evaluation and creates delayed evaluations for remaining executions

func NewBatchRateLimiter added in v1.6.2

func NewBatchRateLimiter(params BatchRateLimiterParams) *BatchRateLimiter

func (*BatchRateLimiter) Apply added in v1.6.2

func (b *BatchRateLimiter) Apply(ctx context.Context, plan *models.Plan, totalNeeded int) int

type BatchRateLimiterParams added in v1.6.2

type BatchRateLimiterParams struct {
	// MaxExecutionsPerEval limits the number of new executions that can be created in a single evaluation
	MaxExecutionsPerEval int
	// ExecutionLimitBackoff is the duration to wait before creating a new evaluation when hitting execution limits
	ExecutionLimitBackoff time.Duration
	// Clock is used for time-based operations. If not provided, system clock is used.
	Clock clock.Clock
}

type BatchServiceJobScheduler added in v1.1.0

type BatchServiceJobScheduler struct {
	// contains filtered or unexported fields
}

BatchServiceJobScheduler handles scheduling of batch and service jobs, with support for partitioned execution. Each job can specify N parallel executions via its Count field, where each execution is assigned a unique partition index from 0 to Count-1.

Partitioning Logic: - The job.Count field determines the number of partitions (N) - Each execution is assigned a unique PartitionIndex (0 to Count-1) - The scheduler ensures exactly one active execution per partition - When an execution fails, its partition becomes available for retry

Execution Lifecycle Per Partition: 1. Initial scheduling: assign execution to an available node with PartitionIndex 2. If execution fails: partition becomes available for retry on another node 3. If execution succeeds:

  • Batch jobs: partition is marked complete, no more executions
  • Service jobs: partition must maintain one running execution

Example with Count = 3: - Creates three partitions (0, 1, 2) - Each partition runs independently - If partition 1 fails, it can be retried while 0 and 2 continue running - Batch job completes when all three partitions have successful executions - Service job runs continuously with one execution per partition

func NewBatchServiceJobScheduler added in v1.1.0

func NewBatchServiceJobScheduler(params BatchServiceJobSchedulerParams) *BatchServiceJobScheduler

func (*BatchServiceJobScheduler) Process added in v1.1.0

func (b *BatchServiceJobScheduler) Process(ctx context.Context, evaluation *models.Evaluation) (err error)

Process handles the scheduling logic for partitioned jobs: 1. Tracks existing executions per partition 2. Handles failed executions and retries while maintaining partition assignment 3. Ensures each partition (0 to job.Count-1) has exactly one active execution 4. For batch jobs: monitors completion of all partitions 5. For service jobs: maintains continuous execution per partition

type BatchServiceJobSchedulerParams added in v1.1.0

type BatchServiceJobSchedulerParams struct {
	JobStore      jobstore.Store
	Planner       orchestrator.Planner
	NodeSelector  orchestrator.NodeSelector
	RetryStrategy orchestrator.RetryStrategy
	QueueBackoff  time.Duration
	// RateLimiter controls the rate at which new executions are created
	// If not provided, a NoopRateLimiter is used
	RateLimiter ExecutionRateLimiter
	// Clock is the clock used for time-based operations.
	// If not provided, the system clock is used.
	Clock clock.Clock
}

type DaemonJobScheduler added in v1.1.0

type DaemonJobScheduler struct {
	// contains filtered or unexported fields
}

func NewDaemonJobScheduler added in v1.1.0

func NewDaemonJobScheduler(params DaemonJobSchedulerParams) *DaemonJobScheduler

func (*DaemonJobScheduler) Process added in v1.1.0

func (b *DaemonJobScheduler) Process(ctx context.Context, evaluation *models.Evaluation) (err error)

type DaemonJobSchedulerParams added in v1.1.0

type DaemonJobSchedulerParams struct {
	JobStore     jobstore.Store
	Planner      orchestrator.Planner
	NodeSelector orchestrator.NodeSelector
	// RateLimiter controls the rate at which new executions are created
	// If not provided, a NoopRateLimiter is used
	RateLimiter ExecutionRateLimiter
}

DaemonJobScheduler is a scheduler for batch jobs that run until completion

type ExecutionRateLimiter added in v1.6.2

type ExecutionRateLimiter interface {
	// Apply checks if the number of executions exceeds the limit and optionally creates
	// a delayed evaluation for remaining executions. Returns the number of executions
	// that should be created in this evaluation.
	Apply(ctx context.Context, plan *models.Plan, totalNeeded int) int
}

ExecutionRateLimiter provides rate limiting functionality for job executions

type NoopRateLimiter added in v1.6.2

type NoopRateLimiter struct{}

NoopRateLimiter is a rate limiter that does not impose any limits

func NewNoopRateLimiter added in v1.6.2

func NewNoopRateLimiter() *NoopRateLimiter

func (*NoopRateLimiter) Apply added in v1.6.2

func (n *NoopRateLimiter) Apply(ctx context.Context, plan *models.Plan, totalNeeded int) int

type OpsJobScheduler

type OpsJobScheduler struct {
	// contains filtered or unexported fields
}

func NewOpsJobScheduler

func NewOpsJobScheduler(params OpsJobSchedulerParams) *OpsJobScheduler

func (*OpsJobScheduler) Process

func (b *OpsJobScheduler) Process(ctx context.Context, evaluation *models.Evaluation) (err error)

type OpsJobSchedulerParams

type OpsJobSchedulerParams struct {
	JobStore     jobstore.Store
	Planner      orchestrator.Planner
	NodeSelector orchestrator.NodeSelector
	// RateLimiter controls the rate at which new executions are created
	// If not provided, a NoopRateLimiter is used
	RateLimiter ExecutionRateLimiter
	// Clock is the clock used for time-based operations.
	// If not provided, the system clock is used.
	Clock clock.Clock
}

OpsJobScheduler is a scheduler for batch jobs that run until completion

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL