Documentation ¶
Index ¶
- Variables
- type FIFOQueue
- func (q *FIFOQueue[T]) Has(key string) bool
- func (q *FIFOQueue[T]) IsEmpty() bool
- func (q *FIFOQueue[T]) IsFull() bool
- func (q *FIFOQueue[T]) Len() int
- func (q *FIFOQueue[T]) Pop() (Task[T], bool)
- func (q *FIFOQueue[T]) Print() string
- func (q *FIFOQueue[T]) Push(item Task[T]) bool
- func (q *FIFOQueue[T]) TryPush(item Task[T]) bool
- type JobType
- type LifecycleObject
- type Node
- type Option
- type RingBufQueue
- func (q *RingBufQueue[T]) Dequeue() (Task[T], bool)
- func (q *RingBufQueue[T]) Enqueue(item Task[T]) bool
- func (q *RingBufQueue[T]) FreeCapacity() int
- func (q *RingBufQueue[T]) Has(key string) bool
- func (q *RingBufQueue[T]) IsEmpty() bool
- func (q *RingBufQueue[T]) IsFull() bool
- func (q *RingBufQueue[T]) Len() int
- func (q *RingBufQueue[T]) Print() string
- func (q *RingBufQueue[T]) TryEnqueue(item Task[T]) bool
- type Scheduler
- type Task
Constants ¶
This section is empty.
Variables ¶
var EvictionReason = map[ttlcache.EvictionReason]string{
ttlcache.EvictionReasonDeleted: "deleted",
ttlcache.EvictionReasonCapacityReached: "capacity reached",
ttlcache.EvictionReasonExpired: "ttl expired",
}
Functions ¶
This section is empty.
Types ¶
type FIFOQueue ¶
type FIFOQueue[T LifecycleObject] struct { // contains filtered or unexported fields }
func NewFIFOQueue ¶
func NewFIFOQueue[T LifecycleObject](capacity uint64) *FIFOQueue[T]
type Node ¶
type Node[T LifecycleObject] struct { // contains filtered or unexported fields }
type Option ¶
type Option[T LifecycleObject] func(scheduler *Scheduler[T])
func WithActiveJobCache ¶
func WithActiveJobCache[T LifecycleObject](capacity uint64, ttl time.Duration) Option[T]
func WithJobConfig ¶
func WithJobConfig[T LifecycleObject](jobConfig string) Option[T]
func WithQueueCapacity ¶
func WithQueueCapacity[T LifecycleObject](capacity uint64) Option[T]
func WithWorkerCount ¶
func WithWorkerCount[T LifecycleObject](workers uint64) Option[T]
type RingBufQueue ¶
type RingBufQueue[T LifecycleObject] struct { Enqueued chan struct{} // contains filtered or unexported fields }
func NewRingBufQueue ¶
func NewRingBufQueue[T LifecycleObject](capacity uint64) *RingBufQueue[T]
func (*RingBufQueue[T]) Dequeue ¶
func (q *RingBufQueue[T]) Dequeue() (Task[T], bool)
func (*RingBufQueue[T]) Enqueue ¶
func (q *RingBufQueue[T]) Enqueue(item Task[T]) bool
func (*RingBufQueue[T]) FreeCapacity ¶
func (q *RingBufQueue[T]) FreeCapacity() int
func (*RingBufQueue[T]) Has ¶
func (q *RingBufQueue[T]) Has(key string) bool
func (*RingBufQueue[T]) IsEmpty ¶
func (q *RingBufQueue[T]) IsEmpty() bool
func (*RingBufQueue[T]) IsFull ¶
func (q *RingBufQueue[T]) IsFull() bool
func (*RingBufQueue[T]) Len ¶
func (q *RingBufQueue[T]) Len() int
func (*RingBufQueue[T]) Print ¶
func (q *RingBufQueue[T]) Print() string
func (*RingBufQueue[T]) TryEnqueue ¶
func (q *RingBufQueue[T]) TryEnqueue(item Task[T]) bool
type Scheduler ¶
type Scheduler[T LifecycleObject] struct { *kubernetes.Clientset // contains filtered or unexported fields }
func NewScheduler ¶
func NewScheduler[T LifecycleObject]( logger *slog.Logger, cfg *rest.Config, namespace string, opts ...Option[T], ) *Scheduler[T]
NewScheduler creates a new Scheduler instance with the given parameters. The Scheduler manages the scheduling of tasks and their execution by workers. It uses a workqueue and a FIFO queue to track tasks. The workerCount parameter specifies the number of workers, it reflects the max number of kubernetes Jobs which could be run in parallel. The activeJobTTL parameter is the time-to-live for active jobs in the cache. The logger parameter is the logger instance to use for logging. The Scheduler instance is returned.
func (*Scheduler[T]) ForgetFinishedJob ¶
ForgetFinishedJob deletes a finished job from the active job tracker. It takes a key string as input and removes the corresponding job from the tracker. If the job is successfully deleted, it is considered forgotten. The method does not return any value.
func (*Scheduler[T]) Schedule ¶
func (s *Scheduler[T]) Schedule(item Task[T]) commonv1alpha1.RequestResult
Schedule checks if a Task is already enqueued in one of the queues and returns the appropriate RequestResult. If the Task is not enqueued in any queue, it attempts to enqueue it in the workqueue. If the enqueuing in the workqueue is successful, it returns RequestResult_REQUEST_RESULT_SCHEDULED. Otherwise, it attempts to enqueue the Task in the FIFO queue. If the enqueuing in the FIFO queue is successful, it returns RequestResult_REQUEST_RESULT_SCHEDULED. If the Task is already enqueued in any of the queues, it returns RequestResult_REQUEST_RESULT_SCHEDULED. If the enqueuing in both queues fails, it returns RequestResult_REQUEST_RESULT_FAILURE.
func (*Scheduler[T]) Start ¶
Start starts the scheduler by performing the following steps: 1. Configures the activeJobs cache to call the dropFinishedJob method on eviction. 2. Starts the activeJobs cache in a separate goroutine. 3. Starts a worker goroutine for each worker in the scheduler's workers list. 5. Starts the schedulingLoop in a separate goroutine. 6. Waits for the schedulingLoop to complete. The context passed to the Start method is used to control the lifecycle of the scheduler.