scheduler

package
v0.0.0-...-b8de061 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 30, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var EvictionReason = map[ttlcache.EvictionReason]string{
	ttlcache.EvictionReasonDeleted:         "deleted",
	ttlcache.EvictionReasonCapacityReached: "capacity reached",
	ttlcache.EvictionReasonExpired:         "ttl expired",
}

Functions

This section is empty.

Types

type FIFOQueue

type FIFOQueue[T LifecycleObject] struct {
	// contains filtered or unexported fields
}

func NewFIFOQueue

func NewFIFOQueue[T LifecycleObject](capacity uint64) *FIFOQueue[T]

func (*FIFOQueue[T]) Has

func (q *FIFOQueue[T]) Has(key string) bool

func (*FIFOQueue[T]) IsEmpty

func (q *FIFOQueue[T]) IsEmpty() bool

func (*FIFOQueue[T]) IsFull

func (q *FIFOQueue[T]) IsFull() bool

func (*FIFOQueue[T]) Len

func (q *FIFOQueue[T]) Len() int

func (*FIFOQueue[T]) Pop

func (q *FIFOQueue[T]) Pop() (Task[T], bool)

func (*FIFOQueue[T]) Print

func (q *FIFOQueue[T]) Print() string

func (*FIFOQueue[T]) Push

func (q *FIFOQueue[T]) Push(item Task[T]) bool

func (*FIFOQueue[T]) TryPush

func (q *FIFOQueue[T]) TryPush(item Task[T]) bool

type JobType

type JobType string
const (
	ScanJob    JobType = "scan"
	InstallJob JobType = "install"
)

type LifecycleObject

type LifecycleObject interface {
	client.Object
	LastScanTime() time.Time
}

type Node

type Node[T LifecycleObject] struct {
	// contains filtered or unexported fields
}

type Option

type Option[T LifecycleObject] func(scheduler *Scheduler[T])

func WithActiveJobCache

func WithActiveJobCache[T LifecycleObject](capacity uint64, ttl time.Duration) Option[T]

func WithJobConfig

func WithJobConfig[T LifecycleObject](jobConfig string) Option[T]

func WithQueueCapacity

func WithQueueCapacity[T LifecycleObject](capacity uint64) Option[T]

func WithWorkerCount

func WithWorkerCount[T LifecycleObject](workers uint64) Option[T]

type RingBufQueue

type RingBufQueue[T LifecycleObject] struct {
	Enqueued chan struct{}
	// contains filtered or unexported fields
}

func NewRingBufQueue

func NewRingBufQueue[T LifecycleObject](capacity uint64) *RingBufQueue[T]

func (*RingBufQueue[T]) Dequeue

func (q *RingBufQueue[T]) Dequeue() (Task[T], bool)

func (*RingBufQueue[T]) Enqueue

func (q *RingBufQueue[T]) Enqueue(item Task[T]) bool

func (*RingBufQueue[T]) FreeCapacity

func (q *RingBufQueue[T]) FreeCapacity() int

func (*RingBufQueue[T]) Has

func (q *RingBufQueue[T]) Has(key string) bool

func (*RingBufQueue[T]) IsEmpty

func (q *RingBufQueue[T]) IsEmpty() bool

func (*RingBufQueue[T]) IsFull

func (q *RingBufQueue[T]) IsFull() bool

func (*RingBufQueue[T]) Len

func (q *RingBufQueue[T]) Len() int

func (*RingBufQueue[T]) Print

func (q *RingBufQueue[T]) Print() string

func (*RingBufQueue[T]) TryEnqueue

func (q *RingBufQueue[T]) TryEnqueue(item Task[T]) bool

type Scheduler

type Scheduler[T LifecycleObject] struct {
	*kubernetes.Clientset
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler[T LifecycleObject](
	logger *slog.Logger,
	cfg *rest.Config,
	namespace string,
	opts ...Option[T],
) *Scheduler[T]

NewScheduler creates a new Scheduler instance with the given parameters. The Scheduler manages the scheduling of tasks and their execution by workers, and it uses a workqueue and a FIFO queue to track tasks. The logger parameter is the logger instance used for logging. The worker count, which is not a parameter in the signature and is presumably set through WithWorkerCount, is the maximum number of Kubernetes Jobs that can run in parallel. The active job TTL, presumably set through WithActiveJobCache, is the time-to-live for active jobs in the cache.

func (*Scheduler[T]) ForgetFinishedJob

func (s *Scheduler[T]) ForgetFinishedJob(key string)

ForgetFinishedJob deletes a finished job from the active job tracker. It takes the job's key and removes the corresponding entry from the tracker. It does not return a value.

func (*Scheduler[T]) GetActiveJob

func (s *Scheduler[T]) GetActiveJob(id string) (Task[T], error)

func (*Scheduler[T]) Schedule

func (s *Scheduler[T]) Schedule(item Task[T]) commonv1alpha1.RequestResult

Schedule first checks whether the Task is already enqueued in one of the queues; if it is, Schedule returns RequestResult_REQUEST_RESULT_SCHEDULED. Otherwise it attempts to enqueue the Task in the workqueue and, if that fails, in the FIFO queue. If either attempt succeeds, it returns RequestResult_REQUEST_RESULT_SCHEDULED. If both attempts fail, it returns RequestResult_REQUEST_RESULT_FAILURE.
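The decision logic described for Schedule can be sketched as follows. This is an illustrative approximation, not the package's code: boundedQueue, its methods, and the result constants are hypothetical stand-ins for the workqueue, the FIFO queue, and commonv1alpha1.RequestResult.

```go
package main

import "fmt"

// result is a hypothetical stand-in for commonv1alpha1.RequestResult.
type result string

const (
	resultScheduled result = "scheduled"
	resultFailure   result = "failure"
)

// boundedQueue is a hypothetical stand-in for both the workqueue and the FIFO queue.
type boundedQueue struct {
	capacity int
	keys     []string
}

func (q *boundedQueue) has(key string) bool {
	for _, k := range q.keys {
		if k == key {
			return true
		}
	}
	return false
}

// tryPush appends key and reports false when the queue is at capacity.
func (q *boundedQueue) tryPush(key string) bool {
	if len(q.keys) >= q.capacity {
		return false
	}
	q.keys = append(q.keys, key)
	return true
}

// schedule follows the documented order: duplicate check, workqueue, FIFO fallback.
func schedule(work, fifo *boundedQueue, key string) result {
	if work.has(key) || fifo.has(key) {
		return resultScheduled // already enqueued somewhere
	}
	if work.tryPush(key) {
		return resultScheduled
	}
	if fifo.tryPush(key) {
		return resultScheduled
	}
	return resultFailure // both queues are full
}

func main() {
	work := &boundedQueue{capacity: 1}
	fifo := &boundedQueue{capacity: 1}
	for _, key := range []string{"a", "b", "a", "c"} {
		fmt.Println(key, schedule(work, fifo, key))
	}
}
```

With both capacities set to 1, "a" lands in the workqueue, "b" falls back to the FIFO queue, the repeated "a" is reported as scheduled without being enqueued again, and "c" fails because both queues are full.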

func (*Scheduler[T]) Start

func (s *Scheduler[T]) Start(ctx context.Context)

Start starts the scheduler by performing the following steps:

  1. Configures the activeJobs cache to call the dropFinishedJob method on eviction.
  2. Starts the activeJobs cache in a separate goroutine.
  3. Starts a worker goroutine for each worker in the scheduler's workers list.
  4. Starts the schedulingLoop in a separate goroutine.
  5. Waits for the schedulingLoop to complete.

The context passed to the Start method is used to control the lifecycle of the scheduler.

type Task

type Task[T LifecycleObject] struct {
	Key        string
	Type       JobType
	Target     T
	TargetType string
}

func NewTask

func NewTask[T LifecycleObject](key string, taskType JobType, target T, targetType string) Task[T]
