Documentation ¶
Index ¶
- Variables
- type Limits
- type Mapable
- type Mapping
- func (m *Mapping[v]) Get(idx QueueIndex) v
- func (m *Mapping[v]) GetByKey(key string) v
- func (m *Mapping[v]) GetNext(idx QueueIndex) (v, error)
- func (m *Mapping[v]) Init(size int)
- func (m *Mapping[v]) Keys() []string
- func (m *Mapping[v]) Len() int
- func (m *Mapping[v]) Put(key string, value v) bool
- func (m *Mapping[v]) Remove(key string) bool
- func (m *Mapping[v]) Values() []v
- type Metrics
- type Queue
- type QueueIndex
- type QueuePath
- type Request
- type RequestChannel
- type RequestQueue
- func (q *RequestQueue) Dequeue(ctx context.Context, last QueueIndex, consumerID string) (Request, QueueIndex, error)
- func (q *RequestQueue) DequeueMany(ctx context.Context, idx QueueIndex, consumerID string, maxItems int) ([]Request, QueueIndex, error)
- func (q *RequestQueue) Enqueue(tenant string, path []string, req Request, successFn func()) error
- func (q *RequestQueue) GetConnectedConsumersMetric() float64
- func (q *RequestQueue) NotifyConsumerShutdown(querierID string)
- func (q *RequestQueue) RegisterConsumerConnection(querier string)
- func (q *RequestQueue) ReleaseRequests(items []Request)
- func (q *RequestQueue) UnregisterConsumerConnection(querier string)
- type SlicePool
- type TreeQueue
Constants ¶
This section is empty.
Variables ¶
var ( ErrTooManyRequests = errors.New("too many outstanding requests") ErrStopped = errors.New("queue is stopped") ErrQueueWasRemoved = errors.New("the queue has been removed or moved to another position") )
var ErrOutOfBounds = errors.New("queue index out of bounds")
Functions ¶
This section is empty.
Types ¶
type Mapable ¶
type Mapable interface { *tenantQueue | *TreeQueue // https://github.com/golang/go/issues/48522#issuecomment-924348755 Pos() QueueIndex SetPos(index QueueIndex) }
type Mapping ¶
type Mapping[v Mapable] struct { // contains filtered or unexported fields }
Mapping is a map-like data structure that allows accessing its items not only by key but also by index. When an item is removed, the internal key array is not resized, but the removed place is marked as empty. This allows to remove keys without changing the index of the remaining items after the removed key. Mapping uses *tenantQueue as concrete value and keys of type string. The data structure is not thread-safe.
func (*Mapping[v]) Get ¶
func (m *Mapping[v]) Get(idx QueueIndex) v
func (*Mapping[v]) GetNext ¶
func (m *Mapping[v]) GetNext(idx QueueIndex) (v, error)
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
func NewMetrics ¶
func NewMetrics(registerer prometheus.Registerer, metricsNamespace, subsystem string) *Metrics
type Queue ¶
type Queue interface { Chan() RequestChannel Dequeue() Request Name() string Len() int }
type QueueIndex ¶
type QueueIndex int // nolint:revive
QueueIndex is opaque type that allows to resume iteration over tenants between successive calls of RequestQueue.GetNextRequestForQuerier method.
var StartIndex QueueIndex = -1
StartIndex is the index of the queue that starts iteration over sub queues.
var StartIndexWithLocalQueue QueueIndex = -2
StartIndexWithLocalQueue is the index of the queue that starts iteration over local and sub queues.
func (QueueIndex) ReuseLastIndex ¶
func (ui QueueIndex) ReuseLastIndex() QueueIndex
Modify index to start iteration on the same tenant, for which last queue was returned.
type RequestChannel ¶
type RequestChannel chan Request
RequestChannel is a channel that queues Requests
type RequestQueue ¶
RequestQueue holds incoming requests in per-tenant queues. It also assigns each tenant specified number of queriers, and when querier asks for next request to handle (using GetNextRequestForQuerier), it returns requests in a fair fashion.
func NewRequestQueue ¶
func (*RequestQueue) Dequeue ¶
func (q *RequestQueue) Dequeue(ctx context.Context, last QueueIndex, consumerID string) (Request, QueueIndex, error)
Dequeue find next tenant queue and takes the next request off of it. Will block if there are no requests. By passing tenant index from previous call of this method, querier guarantees that it iterates over all tenants fairly. Even if the consumer used UserIndex.ReuseLastUser to fetch the request from the same tenant's queue, it does not provide any guaranties that the previously used queue is still at this position because another consumer could already read the last request and the queue could be removed and another queue is already placed at this position.
func (*RequestQueue) DequeueMany ¶
func (q *RequestQueue) DequeueMany(ctx context.Context, idx QueueIndex, consumerID string, maxItems int) ([]Request, QueueIndex, error)
DequeueMany consumes multiple items for a single tenant from the queue. It blocks the execution until it dequeues at least 1 request and continue reading until it reaches `maxItems` requests or if no requests for this tenant are enqueued. The caller is responsible for returning the dequeued requests back to the pool by calling ReleaseRequests(items).
func (*RequestQueue) Enqueue ¶
func (q *RequestQueue) Enqueue(tenant string, path []string, req Request, successFn func()) error
Enqueue puts the request into the queue. If request is successfully enqueued, successFn is called with the lock held, before any querier can receive the request.
func (*RequestQueue) GetConnectedConsumersMetric ¶
func (q *RequestQueue) GetConnectedConsumersMetric() float64
func (*RequestQueue) NotifyConsumerShutdown ¶
func (q *RequestQueue) NotifyConsumerShutdown(querierID string)
func (*RequestQueue) RegisterConsumerConnection ¶
func (q *RequestQueue) RegisterConsumerConnection(querier string)
func (*RequestQueue) ReleaseRequests ¶
func (q *RequestQueue) ReleaseRequests(items []Request)
ReleaseRequests returns items back to the slice pool. Must only be called in combination with DequeueMany().
func (*RequestQueue) UnregisterConsumerConnection ¶
func (q *RequestQueue) UnregisterConsumerConnection(querier string)
type SlicePool ¶
type SlicePool[T any] struct { // contains filtered or unexported fields }
SlicePool uses a bucket pool and wraps the Get() and Put() functions for simpler access.
type TreeQueue ¶
type TreeQueue struct {
// contains filtered or unexported fields
}
TreeQueue is an hierarchical queue implementation where each sub-queue has the same guarantees to be chosen from. Each queue has also a local queue, which gets chosen with equal preference as the sub-queues.