queue

package
v0.0.0-...-12c09fd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2025 License: AGPL-3.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidTenantID     = errors.New("invalid tenant id")
	ErrTooManyRequests     = errors.New("too many outstanding requests")
	ErrStopped             = errors.New("queue is stopped")
	ErrQuerierShuttingDown = errors.New("querier has informed the scheduler it is shutting down")
)

Functions

This section is empty.

Types

type QuerierWorkerConn

type QuerierWorkerConn struct {
	QuerierID string
	WorkerID  int
	// contains filtered or unexported fields
}

QuerierWorkerConn is a connection from the querier-worker to the request queue.

WorkerID is unique only per querier; querier-1 and querier-2 will both have a WorkerID=0. WorkerID is derived internally in order to distribute worker connections across queue dimensions. Unregistered querier-worker connections are assigned a sentinel unregisteredWorkerID.

QuerierWorkerConn is also used when passing querierWorkerOperation messages to update querier connection statuses. The querierWorkerOperations can be specific to a querier, but not a particular worker connection (notifyShutdown), or may apply to all queriers instead of any particular querier (forgetDisconnected). In these cases the relevant ID fields are ignored and should be left as their unregistered or zero values.

func NewUnregisteredQuerierWorkerConn

func NewUnregisteredQuerierWorkerConn(ctx context.Context, querierID string) *QuerierWorkerConn

func (*QuerierWorkerConn) IsRegistered

func (qwc *QuerierWorkerConn) IsRegistered() bool

type QuerierWorkerDequeueRequest

type QuerierWorkerDequeueRequest struct {
	*QuerierWorkerConn
	// contains filtered or unexported fields
}

QuerierWorkerDequeueRequest is a request from a querier-worker which is ready to receive the next query. It embeds the unbuffered `recvChan` to receive the querierWorkerDequeueResponse from the RequestQueue.

func NewQuerierWorkerDequeueRequest

func NewQuerierWorkerDequeueRequest(querierWorkerConn *QuerierWorkerConn, lastTenantIdx TenantIndex) *QuerierWorkerDequeueRequest

type QueryComponent

type QueryComponent string
const (
	StoreGateway QueryComponent = "store-gateway"
	Ingester     QueryComponent = "ingester"
)

type QueryComponentUtilization

type QueryComponentUtilization struct {
	// contains filtered or unexported fields
}

QueryComponentUtilization tracks requests from the time they are forwarded to a querier to the time are completed by the querier or failed due to cancel, timeout, or disconnect. Unlike the Scheduler's schedulerInflightRequests, tracking begins only when the request is sent to a querier.

Scheduler-Querier inflight requests are broken out by the query component flags, representing whether the query request will be served by the ingesters, store-gateways, or both. Query requests utilizing both ingesters and store-gateways are tracked in the atomics for both component, therefore the sum of inflight requests by component is likely to exceed the inflight requests total.

func NewQueryComponentUtilization

func NewQueryComponentUtilization(
	querierInflightRequestsMetric *prometheus.SummaryVec,
) (*QueryComponentUtilization, error)

func (*QueryComponentUtilization) GetForComponent

func (qcl *QueryComponentUtilization) GetForComponent(component QueryComponent) int

GetForComponent is a test-only util

func (*QueryComponentUtilization) MarkRequestCompleted

func (qcl *QueryComponentUtilization) MarkRequestCompleted(req *SchedulerRequest)

MarkRequestCompleted is called when a querier completes or fails a request

func (*QueryComponentUtilization) MarkRequestSent

func (qcl *QueryComponentUtilization) MarkRequestSent(req *SchedulerRequest)

MarkRequestSent is called when a request is sent to a querier

func (*QueryComponentUtilization) ObserveInflightRequests

func (qcl *QueryComponentUtilization) ObserveInflightRequests()

type QueryRequest

type QueryRequest interface{}

QueryRequest represents the items stored in the queue which may be a SchedulerRequest when running with the standalone scheduler process, or a frontend/v1 request when running with the RequestQueue embedded in the v1 frontend.

type RequestKey

type RequestKey struct {
	// contains filtered or unexported fields
}

func NewSchedulerRequestKey

func NewSchedulerRequestKey(frontendAddr string, queryID uint64) RequestKey

type RequestQueue

type RequestQueue struct {
	services.Service

	// QueryComponentUtilization encapsulates tracking requests from the time they are forwarded to a querier
	// to the time are completed by the querier or failed due to cancel, timeout, or disconnect.
	// Unlike schedulerInflightRequests, tracking begins only when the request is sent to a querier.
	QueryComponentUtilization *QueryComponentUtilization
	// contains filtered or unexported fields
}

RequestQueue holds incoming requests in queues, split by multiple dimensions based on properties of the request. Dequeuing selects the next request from an appropriate queue given the state of the system. Two layers of QueueAlgorithms are used by the RequestQueue to select the next queue to dequeue a request from:

  • Tenant-Querier Assignments Tenants with shuffle-sharding enabled by setting maxQueriers > 0 are assigned a subset of queriers. The RequestQueue utilizes the querier assignments to only dequeue requests for a tenant assigned to that querier. If shuffle-sharding is disabled, requests are dequeued in a fair round-robin fashion across all tenants.

  • Querier-Worker Queue Priority Querier-worker connections are distributed across queue partitions which separate query requests based on the query component expected to be utilized to service the query. This division prevents a query component experiencing high latency from dominating the utilization of querier-worker connections and preventing requests for other query components from being serviced.

See each QueueAlgorithm implementation for more details.

func NewRequestQueue

func NewRequestQueue(
	log log.Logger,
	maxOutstandingPerTenant int,
	forgetDelay time.Duration,
	queueLength *prometheus.GaugeVec,
	discardedRequests *prometheus.CounterVec,
	enqueueDuration prometheus.Histogram,
	querierInflightRequestsMetric *prometheus.SummaryVec,
) (*RequestQueue, error)

func (*RequestQueue) AwaitRegisterQuerierWorkerConn

func (q *RequestQueue) AwaitRegisterQuerierWorkerConn(conn *QuerierWorkerConn) error

func (*RequestQueue) AwaitRequestForQuerier

func (q *RequestQueue) AwaitRequestForQuerier(dequeueReq *QuerierWorkerDequeueRequest) (QueryRequest, TenantIndex, error)

AwaitRequestForQuerier is called by a querier-worker to submit a QuerierWorkerDequeueRequest message to the RequestQueue.

This method blocks until the QuerierWorkerDequeueRequest gets a querierWorkerDequeueResponse message on its receiving channel, the querier-worker connection context is canceled, or the RequestQueue service stops.

Querier-workers should pass the last TenantIndex received from their previous call to AwaitRequestForQuerier, which enables the RequestQueue to iterate fairly across all tenants assigned to a querier. If a querier-worker finds that the query request received for the tenant is already expired, it can get another request for the same tenant by using TenantIndex.ReuseLastTenant. Newly-connected querier-workers should pass FirstTenant as the TenantIndex to start iteration from the beginning.

func (*RequestQueue) GetConnectedQuerierWorkersMetric

func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64

func (*RequestQueue) SubmitNotifyQuerierShutdown

func (q *RequestQueue) SubmitNotifyQuerierShutdown(ctx context.Context, querierID string)

SubmitNotifyQuerierShutdown is called by the v1 frontend or scheduler when NotifyQuerierShutdown requests are submitted from the querier to an endpoint, separate from any specific querier-worker connection.

func (*RequestQueue) SubmitRequestToEnqueue

func (q *RequestQueue) SubmitRequestToEnqueue(tenantID string, req QueryRequest, maxQueriers int, successFn func()) error

SubmitRequestToEnqueue handles a query request from the query frontend or scheduler and submits it to the queue. This method will block until the queue's processing loop has enqueued the request into its internal queue structure.

If request is successfully enqueued, successFn is called before any querier can receive the request. Returns error if any occurred during enqueuing, or if the RequestQueue service stopped before enqueuing the request.

maxQueriers is tenant-specific value to compute which queriers should handle requests for this tenant. It is passed to SubmitRequestToEnqueue because the value can change between calls.

func (*RequestQueue) SubmitUnregisterQuerierWorkerConn

func (q *RequestQueue) SubmitUnregisterQuerierWorkerConn(conn *QuerierWorkerConn)

type SchedulerRequest

type SchedulerRequest struct {
	FrontendAddr              string
	UserID                    string
	QueryID                   uint64
	Request                   *httpgrpc.HTTPRequest
	StatsEnabled              bool
	AdditionalQueueDimensions []string

	EnqueueTime time.Time

	Ctx        context.Context
	CancelFunc context.CancelCauseFunc
	QueueSpan  opentracing.Span

	ParentSpanContext opentracing.SpanContext
}

func (*SchedulerRequest) ExpectedQueryComponentName

func (sr *SchedulerRequest) ExpectedQueryComponentName() string

ExpectedQueryComponentName parses the expected query component from annotations by the frontend.

func (*SchedulerRequest) Key

func (sr *SchedulerRequest) Key() RequestKey

type TenantIndex

type TenantIndex struct {
	// contains filtered or unexported fields
}

TenantIndex is opaque type that allows to resume iteration over tenants between successive calls of RequestQueue.AwaitRequestForQuerier method.

func FirstTenant

func FirstTenant() TenantIndex

FirstTenant returns TenantIndex that starts iteration over tenant queues from the very first tenant.

func (TenantIndex) ReuseLastTenant

func (ui TenantIndex) ReuseLastTenant() TenantIndex

ReuseLastTenant modifies index to start iteration on the same tenant, for which last queue was returned.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL