jobqueue

package
v1.124.4
Published: Mar 10, 2025 License: AGPL-3.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Queue

type Queue struct {
	RetryAfter time.Duration
	Now        func() time.Time
	// contains filtered or unexported fields
}

Queue is a priority queue of repair jobs paired with a priority queue of jobs to be retried once they are eligible. A secondary index on streamID+position is kept to allow updates to the health (priority) of jobs already in one of the queues.
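The pairing of a priority queue with a secondary index can be sketched with the standard container/heap package. This is a minimal illustration only, not the package's actual implementation: segmentKey, item, healthHeap, and indexedQueue are hypothetical names, and only a single queue is modeled. Each item remembers its heap slot, so an update found through the map can be repositioned with heap.Fix instead of a full rebuild.

```go
package main

import (
	"container/heap"
	"fmt"
)

// segmentKey is a hypothetical stand-in for the streamID+position pair.
type segmentKey struct {
	StreamID [16]byte
	Position uint64
}

// item is a hypothetical queue entry; index tracks its slot in the heap.
type item struct {
	key    segmentKey
	health float64
	index  int
}

// healthHeap orders items so that the lowest health is popped first.
type healthHeap []*item

func (h healthHeap) Len() int           { return len(h) }
func (h healthHeap) Less(i, j int) bool { return h[i].health < h[j].health }
func (h healthHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].index = i
	h[j].index = j
}
func (h *healthHeap) Push(x any) {
	it := x.(*item)
	it.index = len(*h)
	*h = append(*h, it)
}
func (h *healthHeap) Pop() any {
	old := *h
	n := len(old)
	it := old[n-1]
	old[n-1] = nil
	*h = old[:n-1]
	return it
}

// indexedQueue pairs the heap with a map so an entry can be found by key.
type indexedQueue struct {
	heap  healthHeap
	byKey map[segmentKey]*item
}

func newIndexedQueue() *indexedQueue {
	return &indexedQueue{byKey: make(map[segmentKey]*item)}
}

// upsert inserts a new entry or updates the health of an existing one,
// reporting whether the entry was new.
func (q *indexedQueue) upsert(key segmentKey, health float64) bool {
	if it, ok := q.byKey[key]; ok {
		it.health = health
		heap.Fix(&q.heap, it.index) // restore heap order around the changed item
		return false
	}
	it := &item{key: key, health: health}
	heap.Push(&q.heap, it)
	q.byKey[key] = it
	return true
}

// popLowest removes and returns the entry with the lowest health.
func (q *indexedQueue) popLowest() (segmentKey, float64, bool) {
	if q.heap.Len() == 0 {
		return segmentKey{}, 0, false
	}
	it := heap.Pop(&q.heap).(*item)
	delete(q.byKey, it.key)
	return it.key, it.health, true
}

func main() {
	q := newIndexedQueue()
	a := segmentKey{Position: 1}
	b := segmentKey{Position: 2}
	q.upsert(a, 0.9)
	q.upsert(b, 0.5)
	q.upsert(a, 0.1) // the update moves a ahead of b
	key, health, _ := q.popLowest()
	fmt.Println(key.Position, health)
}
```

The map lookup costs O(1) and heap.Fix costs O(log n), which is presumably why such an index is worth keeping when health updates are frequent.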

func NewQueue

func NewQueue(log *zap.Logger, retryAfter time.Duration, initialAlloc, memReleaseThreshold int) (*Queue, error)

NewQueue creates a new Queue.

func (*Queue) Clean

func (q *Queue) Clean(updatedBefore time.Time) (removed int)

Clean removes all items from the queues that were last updated before the given time. This is a relatively expensive operation at O(n). The queues for this placement are left locked for the duration of the operation; all reads and writes to this placement will block until this is complete.

Returns the total number of items removed from the queues.
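To illustrate why a bulk removal like this is O(n) and blocks other callers, here is a rough sketch of one way it could work: hold a mutex, filter the backing slice in place, then rebuild heap order with heap.Init. The types entry, entryHeap, and lockedQueue and the helper unix are hypothetical, and the real package may be organized differently (for instance, this sketch models one heap and no secondary index). The same shape also fits a health-based removal such as Trim.

```go
package main

import (
	"container/heap"
	"fmt"
	"sync"
	"time"
)

// entry is a hypothetical queue record.
type entry struct {
	health    float64
	updatedAt time.Time
}

// entryHeap is a min-heap on health.
type entryHeap []entry

func (h entryHeap) Len() int           { return len(h) }
func (h entryHeap) Less(i, j int) bool { return h[i].health < h[j].health }
func (h entryHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *entryHeap) Push(x any)        { *h = append(*h, x.(entry)) }
func (h *entryHeap) Pop() any {
	old := *h
	e := old[len(old)-1]
	*h = old[:len(old)-1]
	return e
}

// lockedQueue guards the heap with a single mutex.
type lockedQueue struct {
	mu sync.Mutex
	h  entryHeap
}

// unix builds a time from Unix seconds, to keep the example compact.
func unix(s int64) time.Time { return time.Unix(s, 0) }

func (q *lockedQueue) add(health float64, updatedUnix int64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	heap.Push(&q.h, entry{health: health, updatedAt: unix(updatedUnix)})
}

func (q *lockedQueue) popLowest() (entry, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.h.Len() == 0 {
		return entry{}, false
	}
	return heap.Pop(&q.h).(entry), true
}

// removeWhere drops every entry matching drop in one O(n) pass. The lock is
// held throughout, so concurrent add and popLowest calls wait until it ends.
func (q *lockedQueue) removeWhere(drop func(entry) bool) int {
	q.mu.Lock()
	defer q.mu.Unlock()
	kept := q.h[:0] // filter in place, reusing the backing array
	for _, e := range q.h {
		if !drop(e) {
			kept = append(kept, e)
		}
	}
	removed := len(q.h) - len(kept)
	q.h = kept
	heap.Init(&q.h) // rebuild heap order in O(n)
	return removed
}

func (q *lockedQueue) clean(updatedBefore time.Time) int {
	return q.removeWhere(func(e entry) bool { return e.updatedAt.Before(updatedBefore) })
}

func (q *lockedQueue) trim(healthGreaterThan float64) int {
	return q.removeWhere(func(e entry) bool { return e.health > healthGreaterThan })
}

func main() {
	q := &lockedQueue{}
	q.add(0.9, 100)
	q.add(0.2, 200)
	q.add(0.5, 300)
	fmt.Println("cleaned:", q.clean(unix(150)))
	fmt.Println("trimmed:", q.trim(0.4))
}
```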

func (*Queue) Destroy

func (q *Queue) Destroy()

Destroy stops the queue's funnel goroutine (if it is still running) and frees the associated memory.

func (*Queue) Insert

func (q *Queue) Insert(job jobq.RepairJob) (wasNew bool)

Insert adds a job to the queue with the given health. If the segment is already in the repair queue or the retry queue, the job record is updated and left in that queue (with its position updated as necessary).

When a job is updated, its InsertedAt value is preserved, its UpdatedAt field is set to the current time, and the new NumAttempts field is added to the previously existing value.

If the job is not already in either queue and its LastAttemptedAt field is recent enough (as determined by RetryAfter), it is added to the retry queue instead of the repair queue, to wait until it is eligible for another try.

Returns true if the job was newly added to a queue, and false if an existing entry in the target queue was updated.
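The update and routing rules described for Insert can be modeled in a few lines. The sketch below is an approximation under stated assumptions, not the package's code: job is a hypothetical, trimmed-down stand-in for jobq.RepairJob, plain maps replace the priority queues, fakeClock is an illustrative test clock, and it is assumed that a new job gets InsertedAt set to the current time, that other fields of an updated job are overwritten by the incoming values, and that "recent enough" means strictly less than RetryAfter ago.

```go
package main

import (
	"fmt"
	"time"
)

// job is a hypothetical, trimmed-down stand-in for jobq.RepairJob.
type job struct {
	Key             string // stands in for streamID+position
	Health          float64
	InsertedAt      time.Time
	UpdatedAt       time.Time
	LastAttemptedAt time.Time
	NumAttempts     int
}

// fakeClock is an illustrative clock that the caller can move by hand.
type fakeClock struct{ sec int64 }

func (c *fakeClock) Now() time.Time { return atSecond(c.sec) }

// atSecond builds a UTC time from Unix seconds.
func atSecond(s int64) time.Time { return time.Unix(s, 0).UTC() }

// miniQueue models the two queues as maps to focus on the record handling.
type miniQueue struct {
	RetryAfter time.Duration
	Now        func() time.Time
	repair     map[string]job
	retry      map[string]job
}

func newMiniQueue(retryAfterSec int64, clk *fakeClock) *miniQueue {
	return &miniQueue{
		RetryAfter: time.Duration(retryAfterSec) * time.Second,
		Now:        clk.Now,
		repair:     make(map[string]job),
		retry:      make(map[string]job),
	}
}

// insert applies the rules described above and reports whether the job was new.
func (q *miniQueue) insert(j job) (wasNew bool) {
	now := q.Now()
	for _, m := range []map[string]job{q.repair, q.retry} {
		if old, ok := m[j.Key]; ok {
			j.InsertedAt = old.InsertedAt   // preserved
			j.NumAttempts += old.NumAttempts // added to the previous value
			j.UpdatedAt = now
			m[j.Key] = j // stays in whichever queue it was already in
			return false
		}
	}
	j.InsertedAt = now // assumption: new jobs are stamped on insert
	j.UpdatedAt = now
	// Assumption: a zero LastAttemptedAt means "never attempted".
	if !j.LastAttemptedAt.IsZero() && now.Sub(j.LastAttemptedAt) < q.RetryAfter {
		q.retry[j.Key] = j
	} else {
		q.repair[j.Key] = j
	}
	return true
}

func main() {
	clk := &fakeClock{sec: 1000}
	q := newMiniQueue(60, clk)
	fmt.Println(q.insert(job{Key: "a", Health: 0.5, NumAttempts: 1}))
	clk.sec = 1010
	fmt.Println(q.insert(job{Key: "a", Health: 0.2, NumAttempts: 2}))
	fmt.Println(q.repair["a"].NumAttempts)
}
```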

func (*Queue) Inspect

func (q *Queue) Inspect(streamID uuid.UUID, position uint64) jobq.RepairJob

Inspect finds a repair job in the queue by streamID and position and returns all of the job information.

func (*Queue) Len

func (q *Queue) Len() (inRepair, inRetry int64)

Len returns the number of segments in the repair queue and the retry queue, respectively.

func (*Queue) Peek

func (q *Queue) Peek() jobq.RepairJob

Peek returns the segment with the lowest health without removing it from the queue. If there are no segments in the queue, it returns a zero UUID and position.

func (*Queue) PeekRetry

func (q *Queue) PeekRetry() jobq.RepairJob

PeekRetry returns the segment with the smallest LastUpdatedAt value in the retry queue without removing it from the queue. If there are no segments in the retry queue, it returns a zero UUID and position.

func (*Queue) Pop

func (q *Queue) Pop() jobq.RepairJob

Pop removes and returns the segment with the lowest health from the repair queue. If there are no segments in the queue, it returns a zero job.
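Because an empty queue is signaled by a zero value rather than an error or a boolean, a consumer loop has to test for it explicitly. The following sketch shows that pattern with hypothetical stand-ins (repairJob, sliceQueue, drain); it does not use the real jobq.RepairJob type, whose exact fields are not shown here.

```go
package main

import "fmt"

// repairJob is a hypothetical stand-in for jobq.RepairJob.
type repairJob struct {
	StreamID [16]byte
	Position uint64
	Health   float64
}

// isEmpty reports whether the job carries a zero stream ID and position,
// which is how an empty queue is signaled in this sketch.
func (j repairJob) isEmpty() bool {
	return j.StreamID == [16]byte{} && j.Position == 0
}

// sliceQueue is a toy queue whose jobs are kept sorted by ascending health.
type sliceQueue struct{ jobs []repairJob }

// pop removes and returns the first job, or a zero job if none remain.
func (q *sliceQueue) pop() repairJob {
	if len(q.jobs) == 0 {
		return repairJob{}
	}
	j := q.jobs[0]
	q.jobs = q.jobs[1:]
	return j
}

// drain pops until the zero job appears and returns everything popped.
func drain(q *sliceQueue) []repairJob {
	var out []repairJob
	for {
		j := q.pop()
		if j.isEmpty() {
			return out
		}
		out = append(out, j)
	}
}

func main() {
	q := &sliceQueue{jobs: []repairJob{
		{StreamID: [16]byte{1}, Position: 0, Health: 0.1},
		{StreamID: [16]byte{2}, Position: 3, Health: 0.4},
	}}
	fmt.Println(len(drain(q)))
}
```

A sentinel check like this cannot distinguish an empty queue from a stored job that happens to have an all-zero ID and position; calling Len first is one way to remove the ambiguity.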

func (*Queue) ResetTimer

func (q *Queue) ResetTimer() error

ResetTimer causes the funnel goroutine to wake up and adjust its wait timer (might be used after artificially changing the clock, for example).

func (*Queue) Start

func (q *Queue) Start() error

Start starts the queue's funnel goroutine, which moves items from the retry queue to the repair queue as they become eligible for retry (after RetryAfter). If the queue is already running, it returns an error.
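A funnel of this kind might look roughly like the sketch below: a goroutine that moves eligible jobs, sleeps until the next one is due, and exits when told to stop. Everything here is hypothetical (the funnel type, its fields, the one-hour idle sleep, and the choice to allow a restart after Stop); it models jobs only by their last-attempt time and omits the wake-up signal a real implementation would need when the retry queue or the clock changes.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// funnel is a hypothetical, simplified model of the retry-to-repair mover.
type funnel struct {
	mu         sync.Mutex
	retryAfter time.Duration
	retry      []time.Time // last-attempt time of each waiting job
	repair     int         // number of jobs moved to the repair side
	stop, done chan struct{}
}

// newFunnel builds a funnel whose waiting jobs were last attempted the given
// numbers of seconds ago.
func newFunnel(retryAfterSec int64, attemptedAgoSec ...int64) *funnel {
	f := &funnel{retryAfter: time.Duration(retryAfterSec) * time.Second}
	for _, s := range attemptedAgoSec {
		f.retry = append(f.retry, time.Now().Add(-time.Duration(s)*time.Second))
	}
	return f
}

// Start launches the mover goroutine, or fails if one is already running.
func (f *funnel) Start() error {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.stop != nil {
		return errors.New("funnel already running")
	}
	f.stop, f.done = make(chan struct{}), make(chan struct{})
	go f.run(f.stop, f.done)
	return nil
}

// Stop signals the goroutine and waits for it to exit; no-op if not running.
func (f *funnel) Stop() {
	f.mu.Lock()
	stop, done := f.stop, f.done
	f.stop, f.done = nil, nil
	f.mu.Unlock()
	if stop != nil {
		close(stop)
		<-done
	}
}

func (f *funnel) run(stop <-chan struct{}, done chan<- struct{}) {
	defer close(done)
	for {
		f.mu.Lock()
		wait := f.moveEligible(time.Now())
		f.mu.Unlock()
		timer := time.NewTimer(wait)
		select {
		case <-stop:
			timer.Stop()
			return
		case <-timer.C: // the next job is due; loop and move it
		}
	}
}

// moveEligible moves every job whose retry delay has elapsed and returns how
// long to sleep until the next one is due. The caller must hold f.mu.
func (f *funnel) moveEligible(now time.Time) time.Duration {
	wait := time.Hour // arbitrary idle sleep when nothing is pending
	kept := f.retry[:0]
	for _, attempted := range f.retry {
		due := attempted.Add(f.retryAfter)
		if !due.After(now) {
			f.repair++
			continue
		}
		kept = append(kept, attempted)
		if d := due.Sub(now); d < wait {
			wait = d
		}
	}
	f.retry = kept
	return wait
}

func main() {
	f := newFunnel(3600, 7200, 0) // one job already eligible, one not
	fmt.Println(f.Start(), f.Start() != nil)
	f.Stop()
	fmt.Println(f.repair, len(f.retry))
}
```

Because run always performs one move pass before it checks the stop channel, a Start followed immediately by Stop still moves any job that was already eligible.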

func (*Queue) Stop

func (q *Queue) Stop()

Stop stops the queue's funnel goroutine.

func (*Queue) Trim

func (q *Queue) Trim(healthGreaterThan float64) (removed int)

Trim removes all items from the queues with health greater than the given value. This is a relatively expensive operation at O(n). The queues for this placement are left locked for the duration of the operation; all reads and writes to this placement will block until this is complete.

Returns the total number of items removed from the queues.

func (*Queue) Truncate

func (q *Queue) Truncate()

Truncate removes all items currently in the queue.
