queue

package
v1.6.2-0...-8a36637
Published: Nov 27, 2024 License: AGPL-3.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrTooManyRequests = errors.New("too many outstanding requests")
	ErrStopped         = errors.New("queue is stopped")
	ErrQueueWasRemoved = errors.New("the queue has been removed or moved to another position")
)
var ErrOutOfBounds = errors.New("queue index out of bounds")

Functions

This section is empty.

Types

type Limits

type Limits interface {
	// MaxConsumers returns the max consumers to use per tenant or 0 to allow all consumers to consume from the queue.
	MaxConsumers(user string, allConsumers int) int
}
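
A minimal sketch of how the Limits interface could be satisfied. The staticLimits type and its per-tenant map are hypothetical and exist only for illustration; the interface itself is copied from the declaration above so that the example compiles on its own.

```go
package main

import "fmt"

// Limits mirrors the interface documented above.
type Limits interface {
	MaxConsumers(user string, allConsumers int) int
}

// staticLimits is a hypothetical implementation backed by a fixed map.
type staticLimits struct {
	perTenant map[string]int
}

// MaxConsumers returns the configured cap for the tenant. It returns 0
// (all consumers allowed) when the tenant has no entry, or when the cap
// would not restrict anything.
func (l staticLimits) MaxConsumers(user string, allConsumers int) int {
	n, ok := l.perTenant[user]
	if !ok || n <= 0 || n >= allConsumers {
		return 0
	}
	return n
}

func main() {
	var limits Limits = staticLimits{perTenant: map[string]int{"tenant-a": 2}}
	fmt.Println(limits.MaxConsumers("tenant-a", 10)) // 2
	fmt.Println(limits.MaxConsumers("tenant-b", 10)) // 0
}
```

Returning 0 when the cap is at least allConsumers is a choice made in this sketch; the source only states that 0 allows all consumers.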

type Mapable

type Mapable interface {
	*tenantQueue | *TreeQueue
	// https://github.com/golang/go/issues/48522#issuecomment-924348755
	Pos() QueueIndex
	SetPos(index QueueIndex)
}

type Mapping

type Mapping[v Mapable] struct {
	// contains filtered or unexported fields
}

Mapping is a map-like data structure whose items can be accessed by key and by index. When an item is removed, the internal key array is not resized; the vacated slot is marked as empty instead. Keys can therefore be removed without changing the index of the items that remain. Keys are of type string, and values are of a type that satisfies Mapable (*tenantQueue or *TreeQueue). The data structure is not thread-safe.
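
The index-stable removal described above can be sketched as follows. stableMap is a hypothetical, simplified stand-in that stores strings instead of Mapable values; it is not the package's implementation. Reusing the first empty slot on Put is an assumption of this sketch.

```go
package main

import "fmt"

// stableMap is a hypothetical stand-in for Mapping: values are reachable
// by key and by slot index, and removing a key leaves an empty slot.
type stableMap struct {
	index map[string]int // key -> slot
	keys  []string       // slot -> key; "" marks an empty slot
	vals  []string       // slot -> value
}

func newStableMap() *stableMap {
	return &stableMap{index: map[string]int{}}
}

// Put stores value under key, reusing the first empty slot if there is one.
// It rejects the empty key (reserved as the empty-slot marker) and keys
// that are already present.
func (m *stableMap) Put(key, value string) bool {
	if key == "" {
		return false
	}
	if _, ok := m.index[key]; ok {
		return false
	}
	for i, k := range m.keys {
		if k == "" {
			m.keys[i], m.vals[i] = key, value
			m.index[key] = i
			return true
		}
	}
	m.keys = append(m.keys, key)
	m.vals = append(m.vals, value)
	m.index[key] = len(m.keys) - 1
	return true
}

// Remove clears the slot but leaves the slices at their current length,
// so the slots of all other keys stay where they are.
func (m *stableMap) Remove(key string) bool {
	i, ok := m.index[key]
	if !ok {
		return false
	}
	delete(m.index, key)
	m.keys[i], m.vals[i] = "", ""
	return true
}

// Get returns the value at slot i, or "" if the slot is empty or out of range.
func (m *stableMap) Get(i int) string {
	if i < 0 || i >= len(m.vals) {
		return ""
	}
	return m.vals[i]
}

func main() {
	m := newStableMap()
	m.Put("a", "queue-a")
	m.Put("b", "queue-b")
	m.Put("c", "queue-c")
	m.Remove("b")
	fmt.Println(m.Get(2)) // queue-c: still at slot 2 after the removal
	m.Put("d", "queue-d")
	fmt.Println(m.Get(1)) // queue-d: the empty slot was reused
}
```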

func (*Mapping[v]) Get

func (m *Mapping[v]) Get(idx QueueIndex) v

func (*Mapping[v]) GetByKey

func (m *Mapping[v]) GetByKey(key string) v

func (*Mapping[v]) GetNext

func (m *Mapping[v]) GetNext(idx QueueIndex) (v, error)

func (*Mapping[v]) Init

func (m *Mapping[v]) Init(size int)

func (*Mapping[v]) Keys

func (m *Mapping[v]) Keys() []string

func (*Mapping[v]) Len

func (m *Mapping[v]) Len() int

func (*Mapping[v]) Put

func (m *Mapping[v]) Put(key string, value v) bool

func (*Mapping[v]) Remove

func (m *Mapping[v]) Remove(key string) bool

func (*Mapping[v]) Values

func (m *Mapping[v]) Values() []v

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics(registerer prometheus.Registerer, metricsNamespace, subsystem string) *Metrics

func (*Metrics) Cleanup

func (m *Metrics) Cleanup(user string)

type Queue

type Queue interface {
	Chan() RequestChannel
	Dequeue() Request
	Name() string
	Len() int
}

type QueueIndex

type QueueIndex int // nolint:revive

QueueIndex is an opaque type that allows a consumer to resume iteration over tenants between successive calls of the RequestQueue.Dequeue method.

var StartIndex QueueIndex = -1

StartIndex is the index of the queue that starts iteration over sub queues.

var StartIndexWithLocalQueue QueueIndex = -2

StartIndexWithLocalQueue is the index of the queue that starts iteration over local and sub queues.

func (QueueIndex) ReuseLastIndex

func (ui QueueIndex) ReuseLastIndex() QueueIndex

ReuseLastIndex modifies the index so that iteration starts on the same tenant for which the last queue was returned.
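
The idea of a resumable index can be illustrated with a small round-robin sketch. Everything here is hypothetical: index, start, reuseLast and next are stand-ins, and stepping the index back by one is an assumed way to revisit the same slot, not a statement about how the package does it.

```go
package main

import "fmt"

// index is a stand-in for QueueIndex in this sketch.
type index int

// start plays the role of StartIndex: iteration begins before slot 0.
const start index = -1

// reuseLast steps the index back by one so that the following call to
// next lands on the same slot again (assumed behaviour, illustration only).
func (i index) reuseLast() index {
	if i <= start {
		return i
	}
	return i - 1
}

// next returns the slot that follows i, wrapping around at the end.
// It expects a non-empty slice and i >= start.
func next(slots []string, i index) (string, index) {
	n := (int(i) + 1) % len(slots)
	return slots[n], index(n)
}

func main() {
	tenants := []string{"a", "b", "c"}
	name, idx := next(tenants, start)
	fmt.Println(name) // a
	name, idx = next(tenants, idx)
	fmt.Println(name) // b
	name, idx = next(tenants, idx.reuseLast())
	fmt.Println(name) // b
	name, idx = next(tenants, idx)
	fmt.Println(name) // c
	name, _ = next(tenants, idx)
	fmt.Println(name) // a
}
```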

type QueuePath

type QueuePath []string //nolint:revive

type Request

type Request any

Request is an item stored in the queue.

type RequestChannel

type RequestChannel chan Request

RequestChannel is a channel that queues Requests.

type RequestQueue

type RequestQueue struct {
	services.Service
	// contains filtered or unexported fields
}

RequestQueue holds incoming requests in per-tenant queues. It also assigns each tenant a specified number of queriers, and when a querier asks for the next request to handle (using Dequeue), it returns requests in a fair fashion.

func NewRequestQueue

func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, limits Limits, metrics *Metrics) *RequestQueue

func (*RequestQueue) Dequeue

func (q *RequestQueue) Dequeue(ctx context.Context, last QueueIndex, consumerID string) (Request, QueueIndex, error)

Dequeue finds the next tenant queue and takes the next request off of it. It blocks if there are no requests. By passing the tenant index returned by the previous call of this method, the querier guarantees that it iterates over all tenants fairly. Even if the consumer used QueueIndex.ReuseLastIndex to fetch the request from the same tenant's queue, there is no guarantee that the previously used queue is still at this position: another consumer may already have read the last request, in which case the queue may have been removed and another queue placed at this position.

func (*RequestQueue) DequeueMany

func (q *RequestQueue) DequeueMany(ctx context.Context, idx QueueIndex, consumerID string, maxItems int) ([]Request, QueueIndex, error)

DequeueMany consumes multiple items for a single tenant from the queue. It blocks until it has dequeued at least one request, then continues reading until it has `maxItems` requests or no more requests are enqueued for this tenant. The caller is responsible for returning the dequeued requests to the pool by calling ReleaseRequests(items).

func (*RequestQueue) Enqueue

func (q *RequestQueue) Enqueue(tenant string, path []string, req Request, successFn func()) error

Enqueue puts the request into the queue. If the request is successfully enqueued, successFn is called with the lock held, before any querier can receive the request.

func (*RequestQueue) GetConnectedConsumersMetric

func (q *RequestQueue) GetConnectedConsumersMetric() float64

func (*RequestQueue) NotifyConsumerShutdown

func (q *RequestQueue) NotifyConsumerShutdown(querierID string)

func (*RequestQueue) RegisterConsumerConnection

func (q *RequestQueue) RegisterConsumerConnection(querier string)

func (*RequestQueue) ReleaseRequests

func (q *RequestQueue) ReleaseRequests(items []Request)

ReleaseRequests returns items back to the slice pool. Must only be called in combination with DequeueMany().

func (*RequestQueue) UnregisterConsumerConnection

func (q *RequestQueue) UnregisterConsumerConnection(querier string)

type SlicePool

type SlicePool[T any] struct {
	// contains filtered or unexported fields
}

SlicePool uses a bucket pool and wraps the Get() and Put() functions for simpler access.

func NewSlicePool

func NewSlicePool[T any](minSize, maxSize int, factor float64) *SlicePool[T]

func (*SlicePool[T]) Get

func (sp *SlicePool[T]) Get(n int) []T

func (*SlicePool[T]) Put

func (sp *SlicePool[T]) Put(buf []T)
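
A bucketed slice pool with the same constructor parameters could look roughly like the sketch below. bucketPool is a hypothetical stand-in built on sync.Pool from the standard library; the bucket sizing and the Put policy are assumptions of this sketch and say nothing about the bucket pool that SlicePool wraps.

```go
package main

import (
	"fmt"
	"sync"
)

// bucketPool is a hypothetical slice pool whose bucket sizes grow
// geometrically from minSize up to maxSize.
type bucketPool[T any] struct {
	sizes   []int
	buckets []sync.Pool
}

func newBucketPool[T any](minSize, maxSize int, factor float64) *bucketPool[T] {
	if minSize < 1 || maxSize < minSize || factor <= 1 {
		panic("invalid pool parameters")
	}
	p := &bucketPool[T]{}
	for s := minSize; s <= maxSize; {
		p.sizes = append(p.sizes, s)
		next := int(float64(s) * factor)
		if next <= s {
			next = s + 1 // guarantee progress for small sizes and factors
		}
		s = next
	}
	p.buckets = make([]sync.Pool, len(p.sizes))
	return p
}

// Get returns an empty slice with a capacity of at least n.
func (p *bucketPool[T]) Get(n int) []T {
	for i, s := range p.sizes {
		if n <= s {
			if buf, ok := p.buckets[i].Get().(*[]T); ok {
				return (*buf)[:0]
			}
			return make([]T, 0, s)
		}
	}
	return make([]T, 0, n) // larger than the biggest bucket: plain allocation
}

// Put hands buf to the largest bucket whose size fits into cap(buf).
// Slices smaller than the smallest bucket are dropped.
func (p *bucketPool[T]) Put(buf []T) {
	for i := len(p.sizes) - 1; i >= 0; i-- {
		if cap(buf) >= p.sizes[i] {
			buf = buf[:0]
			p.buckets[i].Put(&buf)
			return
		}
	}
}

func main() {
	pool := newBucketPool[int](4, 64, 2)
	buf := pool.Get(5)
	fmt.Println(len(buf), cap(buf)) // 0 8
	buf = append(buf, 1, 2, 3)
	pool.Put(buf)
	fmt.Println(cap(pool.Get(100))) // 100
}
```

After Put, the caller must not use the slice again, since a later Get may hand the same backing array to someone else.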

type TreeQueue

type TreeQueue struct {
	// contains filtered or unexported fields
}

TreeQueue is a hierarchical queue implementation in which every sub-queue has the same chance of being chosen. Each queue also has a local queue, which is chosen with the same preference as the sub-queues.
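
The fairness rule described above, with the local queue taking turns with the sub-queues, can be sketched as a small tree with round-robin selection. node and its methods are hypothetical and much simpler than TreeQueue: requests are strings, and empty sub-queues are never removed.

```go
package main

import "fmt"

// node is a hypothetical hierarchical queue: a local queue plus named
// sub-queues, visited in round-robin order.
type node struct {
	name     string
	local    []string
	children []*node
	next     int // slot to try first: 0 = local queue, i > 0 = children[i-1]
}

// enqueue walks (and creates) the sub-queues named in path and appends
// req to the local queue of the last one.
func (n *node) enqueue(path []string, req string) {
	if len(path) == 0 {
		n.local = append(n.local, req)
		return
	}
	for _, c := range n.children {
		if c.name == path[0] {
			c.enqueue(path[1:], req)
			return
		}
	}
	c := &node{name: path[0]}
	n.children = append(n.children, c)
	c.enqueue(path[1:], req)
}

// dequeue tries the local queue and every sub-queue once, starting at
// n.next, and returns the first request found. ok is false when the
// whole subtree is empty.
func (n *node) dequeue() (req string, ok bool) {
	slots := len(n.children) + 1
	for i := 0; i < slots; i++ {
		pos := (n.next + i) % slots
		if pos == 0 {
			if len(n.local) == 0 {
				continue
			}
			req, n.local = n.local[0], n.local[1:]
			n.next = (pos + 1) % slots
			return req, true
		}
		if req, ok = n.children[pos-1].dequeue(); ok {
			n.next = (pos + 1) % slots
			return req, true
		}
	}
	return "", false
}

func main() {
	root := &node{name: "root"}
	root.enqueue(nil, "local-1")
	root.enqueue(nil, "local-2")
	root.enqueue([]string{"a"}, "a-1")
	root.enqueue([]string{"a"}, "a-2")
	root.enqueue([]string{"b"}, "b-1")

	var order []string
	for req, ok := root.dequeue(); ok; req, ok = root.dequeue() {
		order = append(order, req)
	}
	fmt.Println(order) // [local-1 a-1 b-1 local-2 a-2]
}
```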

func (*TreeQueue) Chan

func (q *TreeQueue) Chan() RequestChannel

Chan implements Queue

func (*TreeQueue) Dequeue

func (q *TreeQueue) Dequeue() Request

Dequeue implements Queue

func (*TreeQueue) Len

func (q *TreeQueue) Len() int

Len implements Queue. It returns the length of the local queue and all sub-queues. This may be expensive depending on the size of the queue tree.

func (*TreeQueue) Name

func (q *TreeQueue) Name() string

Name implements Queue

func (*TreeQueue) Pos

func (q *TreeQueue) Pos() QueueIndex

Pos implements Mapable.

func (*TreeQueue) SetPos

func (q *TreeQueue) SetPos(index QueueIndex)

SetPos implements Mapable.

func (*TreeQueue) String

func (q *TreeQueue) String() string

String makes the queue printable
