Documentation ¶
Index ¶
- Variables
- type DequeueArgs
- type MultiQueuingAlgorithmTreeQueue
- func (t *MultiQueuingAlgorithmTreeQueue) Dequeue(dequeueArgs *DequeueArgs) (QueuePath, any)
- func (t *MultiQueuingAlgorithmTreeQueue) EnqueueBackByPath(path QueuePath, v any) error
- func (t *MultiQueuingAlgorithmTreeQueue) EnqueueFrontByPath(path QueuePath, v any) error
- func (t *MultiQueuingAlgorithmTreeQueue) GetNode(path QueuePath) *Node
- func (t *MultiQueuingAlgorithmTreeQueue) IsEmpty() bool
- func (t *MultiQueuingAlgorithmTreeQueue) ItemCount() int
- type Node
- type QuerierID
- type QuerierWorkerConn
- type QuerierWorkerDequeueRequest
- type QuerierWorkerQueuePriorityAlgo
- type QueryComponent
- type QueryComponentUtilization
- func (qcl *QueryComponentUtilization) GetForComponent(component QueryComponent) int
- func (qcl *QueryComponentUtilization) MarkRequestCompleted(req *SchedulerRequest)
- func (qcl *QueryComponentUtilization) MarkRequestSent(req *SchedulerRequest)
- func (qcl *QueryComponentUtilization) ObserveInflightRequests()
- type QueryRequest
- type QueueIndex
- type QueuePath
- type QueuingAlgorithm
- type RequestKey
- type RequestQueue
- func (q *RequestQueue) AwaitRegisterQuerierWorkerConn(conn *QuerierWorkerConn) error
- func (q *RequestQueue) AwaitRequestForQuerier(dequeueReq *QuerierWorkerDequeueRequest) (QueryRequest, TenantIndex, error)
- func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64
- func (q *RequestQueue) SubmitNotifyQuerierShutdown(ctx context.Context, querierID QuerierID)
- func (q *RequestQueue) SubmitRequestToEnqueue(tenantID string, req QueryRequest, maxQueriers int, successFn func()) error
- func (q *RequestQueue) SubmitUnregisterQuerierWorkerConn(conn *QuerierWorkerConn)
- type SchedulerRequest
- type TenantID
- type TenantIndex
- type Tree
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type DequeueArgs ¶
type DequeueArgs struct {
// contains filtered or unexported fields
}
type MultiQueuingAlgorithmTreeQueue ¶
type MultiQueuingAlgorithmTreeQueue struct {
// contains filtered or unexported fields
}
MultiQueuingAlgorithmTreeQueue holds metadata and a pointer to the root node of a hierarchical queue implementation. The root Node maintains a localQueue and an arbitrary number of child nodes (which themselves may have local queues and children). Each Node in MultiQueuingAlgorithmTreeQueue uses a QueuingAlgorithm (determined by node depth) to determine dequeue order of that Node's subtree.
Each queuing dimension is modeled as a node in the tree, internally reachable through a QueuePath.
The QueuePath is an ordered array of strings describing the path from the tree root to a Node. In addition to child Nodes, each Node contains a local queue (FIFO) of items.
When dequeuing from a given node, a Node will use its QueuingAlgorithm to choose either itself or a child node to dequeue from recursively (i.e., a child Node will use its own QueuingAlgorithm to determine how to proceed). MultiQueuingAlgorithmTreeQueue will not dequeue from two different Nodes at the same depth consecutively, unless the previously-checked Node was empty down to the leaf node.
func NewTree ¶
func NewTree(queuingAlgorithms ...QueuingAlgorithm) (*MultiQueuingAlgorithmTreeQueue, error)
func (*MultiQueuingAlgorithmTreeQueue) Dequeue ¶
func (t *MultiQueuingAlgorithmTreeQueue) Dequeue(dequeueArgs *DequeueArgs) (QueuePath, any)
Dequeue removes and returns an item from the front of the next appropriate Node in the MultiQueuingAlgorithmTreeQueue, as well as the path to the Node from which that item was dequeued. If a non-nil DequeueArgs is passed, each QueuingAlgorithm's setup function is called with the DequeueArgs to update its state. If DequeueArgs is nil, the dequeue operation proceeds without setting up QueuingAlgorithm state.
Either the root/self node or a child node is chosen according to the Node's QueuingAlgorithm. If the root node is chosen, an item will be dequeued from the front of its localQueue. If a child node is chosen, it is recursively dequeued from until a node selects its localQueue.
Nodes that satisfy IsEmpty after a dequeue operation are deleted as the recursion returns up the stack. This maintains structural guarantees relied upon to make IsEmpty() non-recursive.
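The recursion and pruning described above can be sketched as follows. This is a minimal illustration, not the package's implementation: the `node` type, its fields, and the plain round-robin choice between the local queue and the children are all assumptions made for the example, standing in for the unexported Node internals and the per-layer QueuingAlgorithm.

```go
package main

import "fmt"

// node is a hypothetical stand-in for the package's unexported Node internals.
type node struct {
	localQueue []any
	children   map[string]*node
	order      []string // child names in creation order
	next       int      // next slot to try: 0 is the local queue, i+1 is order[i]
}

func newNode() *node { return &node{children: map[string]*node{}} }

// isEmpty is non-recursive; it is only correct because dequeue prunes empty children.
func (n *node) isEmpty() bool { return len(n.localQueue) == 0 && len(n.children) == 0 }

// enqueueBack appends v to the local queue at path, creating nodes as needed.
func (n *node) enqueueBack(path []string, v any) {
	if len(path) == 0 {
		n.localQueue = append(n.localQueue, v)
		return
	}
	child, ok := n.children[path[0]]
	if !ok {
		child = newNode()
		n.children[path[0]] = child
		n.order = append(n.order, path[0])
	}
	child.enqueueBack(path[1:], v)
}

// dequeue tries each slot once, round-robin, and returns the path and the item.
// A nil item means nothing was found (this sketch does not support nil items).
func (n *node) dequeue() ([]string, any) {
	slots := len(n.order) + 1
	for i := 0; i < slots; i++ {
		slot := (n.next + i) % slots
		if slot == 0 {
			if len(n.localQueue) == 0 {
				continue
			}
			v := n.localQueue[0]
			n.localQueue = n.localQueue[1:]
			n.next = 1
			return nil, v
		}
		name := n.order[slot-1]
		child := n.children[name]
		path, v := child.dequeue()
		if v == nil {
			continue
		}
		n.next = slot + 1
		if child.isEmpty() {
			// Prune the emptied child as the recursion returns.
			delete(n.children, name)
			n.order = append(n.order[:slot-1], n.order[slot:]...)
			n.next = slot // the following child, if any, shifted into this slot
		}
		return append([]string{name}, path...), v
	}
	return nil, nil
}

func main() {
	root := newNode()
	root.enqueueBack(nil, "r1")
	root.enqueueBack([]string{"a"}, "a1")
	root.enqueueBack([]string{"a"}, "a2")
	root.enqueueBack([]string{"b"}, "b1")
	for !root.isEmpty() {
		path, v := root.dequeue()
		fmt.Println(path, v)
	}
}
```

With this round-robin stand-in the items come out in the order r1, a1, b1, a2, and the "a" and "b" nodes are removed as soon as they empty, which is what lets the loop condition use the cheap non-recursive emptiness check.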
func (*MultiQueuingAlgorithmTreeQueue) EnqueueBackByPath ¶
func (t *MultiQueuingAlgorithmTreeQueue) EnqueueBackByPath(path QueuePath, v any) error
EnqueueBackByPath enqueues an item in the back of the local queue of the node located at a given path through the tree; nodes for the path are created as needed.
path is relative to the root node; providing a QueuePath beginning with "root" will create a child node of the root node which is also named "root."
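The note about paths beginning with "root" is easier to see in a small sketch. The `pathNode` type below is hypothetical and only models node creation along a path; it makes no claim about how the real tree stores its children.

```go
package main

import "fmt"

// pathNode is a hypothetical tree node used only to illustrate path handling.
type pathNode struct {
	children map[string]*pathNode
	items    []any
}

func newPathNode() *pathNode { return &pathNode{children: map[string]*pathNode{}} }

// enqueueBackByPath walks path from this node, creating missing nodes,
// and appends v to the local queue of the final node.
func (n *pathNode) enqueueBackByPath(path []string, v any) {
	cur := n
	for _, name := range path {
		next, ok := cur.children[name]
		if !ok {
			next = newPathNode()
			cur.children[name] = next
		}
		cur = next
	}
	cur.items = append(cur.items, v)
}

func main() {
	root := newPathNode()
	// Intended usage: the path is relative to the root.
	root.enqueueBackByPath([]string{"tenant-a"}, "q1")
	// Pitfall: "root" here becomes an ordinary child that happens to be named "root".
	root.enqueueBackByPath([]string{"root", "tenant-a"}, "q2")

	_, hasRootChild := root.children["root"]
	fmt.Println(len(root.children), hasRootChild)
}
```

In the sketch the root ends up with two children, "tenant-a" and "root", and the second item sits one level deeper than intended.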
func (*MultiQueuingAlgorithmTreeQueue) EnqueueFrontByPath ¶
func (t *MultiQueuingAlgorithmTreeQueue) EnqueueFrontByPath(path QueuePath, v any) error
EnqueueFrontByPath enqueues an item in the front of the local queue of the Node located at a given path through the MultiQueuingAlgorithmTreeQueue; nodes for the path are created as needed.
Enqueueing to the front is intended only for items which were first enqueued to the back and then dequeued after reaching the front.
Re-enqueueing to the front is only intended for use in cases where a queue consumer fails to complete operations on the dequeued item, but failure is not yet final, and the operations should be retried by a subsequent queue consumer. A concrete example is when a queue consumer fails or disconnects for unrelated reasons while we are in the process of dequeuing a request for it.
path must be relative to the root node; providing a QueuePath beginning with "root" will create a child node of root which is also named "root."
func (*MultiQueuingAlgorithmTreeQueue) GetNode ¶
func (t *MultiQueuingAlgorithmTreeQueue) GetNode(path QueuePath) *Node
func (*MultiQueuingAlgorithmTreeQueue) IsEmpty ¶
func (t *MultiQueuingAlgorithmTreeQueue) IsEmpty() bool
func (*MultiQueuingAlgorithmTreeQueue) ItemCount ¶
func (t *MultiQueuingAlgorithmTreeQueue) ItemCount() int
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node maintains the node-specific information used to enqueue to and dequeue from itself, such as a local queue, node height, references to its children, and position in the queue. Note that the tenantQuerierAssignments QueuingAlgorithm largely disregards Node's queueOrder and queuePosition and manages analogous state itself, because shuffle-sharding and fairness requirements necessitate input from the querier.
type QuerierWorkerConn ¶
type QuerierWorkerConn struct {
	QuerierID QuerierID
	WorkerID  int
	// contains filtered or unexported fields
}
QuerierWorkerConn is a connection from the querier-worker to the request queue.
WorkerID is unique only per querier; querier-1 and querier-2 will both have a WorkerID=0. WorkerID is derived internally in order to distribute worker connections across queue dimensions. Unregistered querier-worker connections are assigned a sentinel unregisteredWorkerID.
QuerierWorkerConn is also used when passing querierWorkerOperation messages to update querier connection statuses. The querierWorkerOperations can be specific to a querier, but not a particular worker connection (notifyShutdown), or may apply to all queriers instead of any particular querier (forgetDisconnected). In these cases the relevant ID fields are ignored and should be left as their unregistered or zero values.
func NewUnregisteredQuerierWorkerConn ¶
func NewUnregisteredQuerierWorkerConn(ctx context.Context, querierID QuerierID) *QuerierWorkerConn
func (*QuerierWorkerConn) IsRegistered ¶
func (qwc *QuerierWorkerConn) IsRegistered() bool
type QuerierWorkerDequeueRequest ¶
type QuerierWorkerDequeueRequest struct {
	*QuerierWorkerConn
	// contains filtered or unexported fields
}
QuerierWorkerDequeueRequest is a request from a querier-worker which is ready to receive the next query. It embeds the unbuffered `recvChan` to receive the querierWorkerDequeueResponse from the RequestQueue.
func NewQuerierWorkerDequeueRequest ¶
func NewQuerierWorkerDequeueRequest(querierWorkerConn *QuerierWorkerConn, lastTenantIdx TenantIndex) *QuerierWorkerDequeueRequest
type QuerierWorkerQueuePriorityAlgo ¶
type QuerierWorkerQueuePriorityAlgo struct {
// contains filtered or unexported fields
}
QuerierWorkerQueuePriorityAlgo implements QueuingAlgorithm by mapping worker IDs to a queue node to prioritize. Each querier-worker's prioritized queue node is calculated as the integer workerID % len(nodeOrder). This distribution of workers across query component subtrees ensures that when one query component is experiencing high latency, about 25% of querier-workers continue prioritizing queries for each unaffected component.
This significantly outperforms the previous round-robin approach which simply rotated through the node order. Although a vanilla round-robin algorithm will select a given query-component node 1 / 4 of the time, in situations of high latency on a query component, the utilization of the querier-worker connections as measured by inflight query processing time will grow asymptotically to be dominated by the slow query component.
There are 4 possible query components: "ingester", "store-gateway", "ingester-and-store-gateway", and "unknown". When all 4 queue nodes exist, approximately 1 / 4 of the querier-workers are prioritized to each queue node. This algorithm requires a minimum of 4 querier-workers per querier to prevent queue starvation. The minimum is enforced in the queriers by overriding -querier.max-concurrent if necessary.
MultiQueuingAlgorithmTreeQueue always deletes empty leaf nodes and nodes with no children after a dequeue operation, and only recreates the queue nodes when a new query request is enqueued which requires that path through the tree. QuerierWorkerQueuePriorityAlgo responds by removing or re-adding the query component nodes to the nodeOrder. This has two implications for the distribution of workers across queue nodes:
- The worker ID may be taken modulo 1, 2, 3, or 4, depending on the number of node types currently present in the nodeOrder, which can change the node that a given worker ID prioritizes.
- The nodeOrder changes as queues are deleted and re-created, so the worker ID-to-node mapping changes as the random enqueue order places query component nodes in different positions in the order.
When the number of querier-workers is not evenly divisible by the number of query component nodes, these randomized changes in nodeOrder guarantee that, over time, the workers are distributed more evenly across query component nodes than they would be if the length and order of the nodes were fixed.
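As a worked example of the modulo mapping, the sketch below computes the prioritized node for a worker ID under three different nodeOrder states. The function names and the specific "reordered" and "reduced" states are hypothetical; only the workerID % len(nodeOrder) rule comes from the description above.

```go
package main

import "fmt"

// priorityNode returns the node a worker starts from: nodeOrder[workerID % len(nodeOrder)].
func priorityNode(workerID int, nodeOrder []string) string {
	return nodeOrder[workerID%len(nodeOrder)]
}

// workersPerNode counts how many of numWorkers workers prioritize each node.
func workersPerNode(numWorkers int, nodeOrder []string) map[string]int {
	counts := make(map[string]int, len(nodeOrder))
	for workerID := 0; workerID < numWorkers; workerID++ {
		counts[priorityNode(workerID, nodeOrder)]++
	}
	return counts
}

func main() {
	full := []string{"ingester", "store-gateway", "ingester-and-store-gateway", "unknown"}
	// Hypothetical later state: "store-gateway" emptied, was deleted, and was re-created last.
	reordered := []string{"ingester", "ingester-and-store-gateway", "unknown", "store-gateway"}
	// Hypothetical state in which only three node types are present.
	reduced := []string{"ingester", "ingester-and-store-gateway", "unknown"}

	// The same worker ID lands on a different node in each state.
	fmt.Println(priorityNode(5, full), priorityNode(5, reordered), priorityNode(5, reduced))

	// With 6 workers and 4 nodes, two nodes get an extra worker; which two depends on the order.
	fmt.Println(workersPerNode(6, full))
	fmt.Println(workersPerNode(6, reordered))
}
```

With six workers, the first two positions in the order each receive two workers and the last two receive one, so a change in order moves the surplus to different query components.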
A given worker ID is prioritized to *start* at a given queue node, but is not assigned strictly to that node. During any period without change to the nodeOrder, the same worker ID consistently starts at the same queue node, but moves on to other nodes if it cannot dequeue a request from the subtree of its first prioritized queue node. Continuing to search through other query-component nodes and their subtrees minimizes idle querier-worker capacity.
A querier-worker can process queries for nodes it has not prioritized when this QueuingAlgorithm is applied at the highest layer of the tree and the tenant-querier-shuffle-shard QueuingAlgorithm is applied at the second layer of the tree. If shuffle-sharding is enabled, a querier-worker that prioritizes ingester-only queries may not find ingester-only queries for any tenant it is assigned to, and so moves on to the next query component subtree. E.g.:
1. This algorithm has nodeOrder: ["ingester", "store-gateway", "ingester-and-store-gateway", "unknown"].
2. A querier-worker with workerID 0 requests to dequeue; it prioritizes the "ingester" queue node.
3. The dequeue operation attempts to dequeue first from the child nodes of the "ingester" node, where each child node is a tenant-specific queue of ingester-only queries. The tenantQuerierAssignments QueuingAlgorithm checks whether any of its tenant queue nodes is sharded to this querier, and finds none.
4. The dequeue operation walks back up to the QuerierWorkerQueuePriorityAlgo level, not having dequeued anything. The QuerierWorkerQueuePriorityAlgo moves on, selects the next query-component node in the nodeOrder, and recurses again to search that next subtree for tenant queue nodes sharded to this querier, repeating from step 3 until a dequeue-able tenant queue node is found or every query component node subtree has been exhausted.
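The walk in the steps above can be sketched as a rotation of nodeOrder followed by a linear search. This is an illustration under assumptions: `searchOrder`, `dequeueForWorker`, and the `tryDequeue` callback are hypothetical, with `tryDequeue` standing in for the tenant-level algorithm that reports whether any tenant queue in a subtree is sharded to the requesting querier.

```go
package main

import "fmt"

// searchOrder rotates nodeOrder so that it starts at the worker's prioritized node.
func searchOrder(workerID int, nodeOrder []string) []string {
	if len(nodeOrder) == 0 {
		return nil
	}
	start := workerID % len(nodeOrder)
	order := make([]string, 0, len(nodeOrder))
	for i := range nodeOrder {
		order = append(order, nodeOrder[(start+i)%len(nodeOrder)])
	}
	return order
}

// dequeueForWorker visits each query-component subtree in search order and
// returns the first request found. tryDequeue is a hypothetical stand-in for
// the second-layer (tenant) algorithm.
func dequeueForWorker(
	workerID int,
	nodeOrder []string,
	tryDequeue func(component string) (string, bool),
) (string, bool) {
	for _, component := range searchOrder(workerID, nodeOrder) {
		if req, ok := tryDequeue(component); ok {
			return req, true
		}
	}
	return "", false
}

func main() {
	nodeOrder := []string{"ingester", "store-gateway", "ingester-and-store-gateway", "unknown"}
	// Hypothetical data: requests queued for tenants sharded to this querier, per component.
	sharded := map[string][]string{"store-gateway": {"sg-query-1"}}
	tryDequeue := func(component string) (string, bool) {
		queue := sharded[component]
		if len(queue) == 0 {
			return "", false
		}
		sharded[component] = queue[1:]
		return queue[0], true
	}

	// Worker 0 prioritizes "ingester", finds nothing there, and falls through to "store-gateway".
	req, ok := dequeueForWorker(0, nodeOrder, tryDequeue)
	fmt.Println(req, ok)

	// Once every subtree is exhausted, nothing is returned.
	_, ok = dequeueForWorker(0, nodeOrder, tryDequeue)
	fmt.Println(ok)
}
```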
func NewQuerierWorkerQueuePriorityAlgo ¶
func NewQuerierWorkerQueuePriorityAlgo() *QuerierWorkerQueuePriorityAlgo
type QueryComponent ¶
type QueryComponent string
const (
	StoreGateway QueryComponent = "store-gateway"
	Ingester     QueryComponent = "ingester"
)
type QueryComponentUtilization ¶
type QueryComponentUtilization struct {
// contains filtered or unexported fields
}
QueryComponentUtilization tracks requests from the time they are forwarded to a querier to the time they are completed by the querier or failed due to cancel, timeout, or disconnect. Unlike the Scheduler's schedulerInflightRequests, tracking begins only when the request is sent to a querier.
Scheduler-Querier inflight requests are broken out by the query component flags, representing whether the query request will be served by the ingesters, store-gateways, or both. Query requests utilizing both ingesters and store-gateways are tracked in the atomics for both components; therefore, the sum of inflight requests by component is likely to exceed the inflight requests total.
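The double counting can be illustrated with three atomic counters. The `inflight` and `request` types and their fields are hypothetical; the sketch only shows why the per-component sum can exceed the total, not how the package stores its state.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// inflight is a hypothetical stand-in for the per-component and total counters.
type inflight struct {
	ingester     atomic.Int64
	storeGateway atomic.Int64
	total        atomic.Int64
}

// request carries hypothetical flags for the components a query is expected to use.
type request struct {
	usesIngester     bool
	usesStoreGateway bool
}

// update applies delta to the total and to every component the request uses.
func (c *inflight) update(req request, delta int64) {
	if req.usesIngester {
		c.ingester.Add(delta)
	}
	if req.usesStoreGateway {
		c.storeGateway.Add(delta)
	}
	c.total.Add(delta)
}

func (c *inflight) markSent(req request)      { c.update(req, 1) }
func (c *inflight) markCompleted(req request) { c.update(req, -1) }

func main() {
	var c inflight
	both := request{usesIngester: true, usesStoreGateway: true}

	c.markSent(request{usesIngester: true})
	c.markSent(request{usesStoreGateway: true})
	c.markSent(both)
	// Three requests in flight, but the component counters sum to four.
	fmt.Println(c.ingester.Load(), c.storeGateway.Load(), c.total.Load())

	c.markCompleted(both)
	fmt.Println(c.ingester.Load(), c.storeGateway.Load(), c.total.Load())
}
```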
func NewQueryComponentUtilization ¶
func NewQueryComponentUtilization(querierInflightRequestsMetric *prometheus.SummaryVec) (*QueryComponentUtilization, error)
func (*QueryComponentUtilization) GetForComponent ¶
func (qcl *QueryComponentUtilization) GetForComponent(component QueryComponent) int
GetForComponent is a test-only util
func (*QueryComponentUtilization) MarkRequestCompleted ¶
func (qcl *QueryComponentUtilization) MarkRequestCompleted(req *SchedulerRequest)
MarkRequestCompleted is called when a querier completes or fails a request
func (*QueryComponentUtilization) MarkRequestSent ¶
func (qcl *QueryComponentUtilization) MarkRequestSent(req *SchedulerRequest)
MarkRequestSent is called when a request is sent to a querier
func (*QueryComponentUtilization) ObserveInflightRequests ¶
func (qcl *QueryComponentUtilization) ObserveInflightRequests()
type QueryRequest ¶
type QueryRequest interface{}
QueryRequest represents the items stored in the queue. An item may be a SchedulerRequest when running with the standalone scheduler process, or a frontend/v1 request when running with the RequestQueue embedded in the v1 frontend.
type QueueIndex ¶
type QueueIndex int //nolint:revive // disallows types beginning with package name
type QueuePath ¶
type QueuePath []string //nolint:revive // disallows types beginning with package name
type QueuingAlgorithm ¶
type QueuingAlgorithm interface {
// contains filtered or unexported methods
}
QueuingAlgorithm represents the set of operations specific to different approaches to queuing/dequeuing. It is applied at the layer-level -- every Node at the same depth in a MultiQueuingAlgorithmTreeQueue shares the same QueuingAlgorithm, including any state in structs that implement QueuingAlgorithm.
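A small sketch of the layer-level sharing: one algorithm value per depth, looked up by every node at that depth. All names here (`layerAlgo`, `layeredTree`, `algoForDepth`) are hypothetical, and the error on an empty algorithm list is an assumption about what a constructor of this shape might reasonably do.

```go
package main

import (
	"errors"
	"fmt"
)

// layerAlgo is a hypothetical algorithm whose state is shared by a whole layer.
type layerAlgo struct {
	label    string
	dequeues int // state visible to every node at this depth
}

type layeredTree struct{ algos []*layerAlgo }

// newLayeredTree takes one algorithm per tree depth, root first.
func newLayeredTree(algos ...*layerAlgo) (*layeredTree, error) {
	if len(algos) == 0 {
		return nil, errors.New("at least one queuing algorithm is required")
	}
	return &layeredTree{algos: algos}, nil
}

// algoForDepth returns the single algorithm instance used at the given depth.
func (t *layeredTree) algoForDepth(depth int) (*layerAlgo, error) {
	if depth < 0 || depth >= len(t.algos) {
		return nil, fmt.Errorf("no queuing algorithm for depth %d", depth)
	}
	return t.algos[depth], nil
}

func main() {
	tree, err := newLayeredTree(&layerAlgo{label: "component"}, &layerAlgo{label: "tenant"})
	if err != nil {
		panic(err)
	}
	// Two sibling nodes at depth 1 look up their algorithm independently...
	left, _ := tree.algoForDepth(1)
	right, _ := tree.algoForDepth(1)
	// ...but receive the same instance, so state changed through one is seen by the other.
	left.dequeues++
	fmt.Println(left == right, right.label, right.dequeues)
}
```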
type RequestKey ¶
type RequestKey struct {
// contains filtered or unexported fields
}
func NewSchedulerRequestKey ¶
func NewSchedulerRequestKey(frontendAddr string, queryID uint64) RequestKey
type RequestQueue ¶
type RequestQueue struct {
	services.Service

	// QueryComponentUtilization encapsulates tracking requests from the time they are forwarded to a querier
	// to the time they are completed by the querier or failed due to cancel, timeout, or disconnect.
	// Unlike schedulerInflightRequests, tracking begins only when the request is sent to a querier.
	QueryComponentUtilization *QueryComponentUtilization
	// contains filtered or unexported fields
}
RequestQueue holds incoming requests in queues, split by multiple dimensions based on properties of the request. Dequeuing selects the next request from an appropriate queue given the state of the system. Two layers of QueuingAlgorithms are used by the RequestQueue to select the next queue to dequeue a request from:
1. Tenant-Querier Assignments: Tenants with shuffle-sharding enabled by setting maxQueriers > 0 are assigned a subset of queriers. The RequestQueue utilizes the querier assignments to only dequeue requests for a tenant assigned to that querier. If shuffle-sharding is disabled, requests are dequeued in a fair round-robin fashion across all tenants.
2. Querier-Worker Queue Priority: Querier-worker connections are distributed across queue partitions which separate query requests based on the query component expected to be utilized to service the query. This division prevents a query component experiencing high latency from dominating the utilization of querier-worker connections and preventing requests for other query components from being serviced.
See each QueuingAlgorithm implementation for more details.
func NewRequestQueue ¶
func NewRequestQueue(
	log log.Logger,
	maxOutstandingPerTenant int,
	prioritizeQueryComponents bool,
	forgetDelay time.Duration,
	queueLength *prometheus.GaugeVec,
	discardedRequests *prometheus.CounterVec,
	enqueueDuration prometheus.Histogram,
	querierInflightRequestsMetric *prometheus.SummaryVec,
) (*RequestQueue, error)
func (*RequestQueue) AwaitRegisterQuerierWorkerConn ¶
func (q *RequestQueue) AwaitRegisterQuerierWorkerConn(conn *QuerierWorkerConn) error
func (*RequestQueue) AwaitRequestForQuerier ¶
func (q *RequestQueue) AwaitRequestForQuerier(dequeueReq *QuerierWorkerDequeueRequest) (QueryRequest, TenantIndex, error)
AwaitRequestForQuerier is called by a querier-worker to submit a QuerierWorkerDequeueRequest message to the RequestQueue.
This method blocks until the QuerierWorkerDequeueRequest gets a querierWorkerDequeueResponse message on its receiving channel, the querier-worker connection context is canceled, or the RequestQueue service stops.
Querier-workers should pass the last TenantIndex received from their previous call to AwaitRequestForQuerier, which enables the RequestQueue to iterate fairly across all tenants assigned to a querier. If a querier-worker finds that the query request received for the tenant is already expired, it can get another request for the same tenant by using TenantIndex.ReuseLastTenant. Newly-connected querier-workers should pass FirstTenant as the TenantIndex to start iteration from the beginning.
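The resume-index pattern can be modeled with a cursor that remembers the last position served. The `tenantCursor` type, its `last` field, and `nextTenant` are hypothetical; TenantIndex is opaque, so this shows one plausible shape for such an index rather than its actual contents.

```go
package main

import "fmt"

// tenantCursor is a hypothetical model of an opaque resume index.
type tenantCursor struct{ last int }

// firstTenantCursor starts iteration before the first tenant.
func firstTenantCursor() tenantCursor { return tenantCursor{last: -1} }

// reuseLast steps the cursor back so the next lookup lands on the same tenant again.
func (c tenantCursor) reuseLast() tenantCursor {
	if c.last >= 0 {
		return tenantCursor{last: c.last - 1}
	}
	return c
}

// nextTenant returns the tenant after the cursor position, wrapping around,
// together with the cursor to pass on the following call.
func nextTenant(tenants []string, c tenantCursor) (string, tenantCursor) {
	if len(tenants) == 0 {
		return "", c
	}
	i := (c.last + 1) % len(tenants)
	return tenants[i], tenantCursor{last: i}
}

func main() {
	tenants := []string{"tenant-a", "tenant-b", "tenant-c"}
	cursor := firstTenantCursor()

	var tenant string
	tenant, cursor = nextTenant(tenants, cursor)
	fmt.Println(tenant)
	tenant, cursor = nextTenant(tenants, cursor)
	fmt.Println(tenant)

	// The request for this tenant had already expired, so ask for the same tenant again.
	tenant, cursor = nextTenant(tenants, cursor.reuseLast())
	fmt.Println(tenant)

	// Normal iteration then continues and wraps around.
	tenant, cursor = nextTenant(tenants, cursor)
	fmt.Println(tenant)
	tenant, _ = nextTenant(tenants, cursor)
	fmt.Println(tenant)
}
```

In this model the sequence is tenant-a, tenant-b, tenant-b again after the reuse, then tenant-c and back to tenant-a.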
func (*RequestQueue) GetConnectedQuerierWorkersMetric ¶
func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64
func (*RequestQueue) SubmitNotifyQuerierShutdown ¶
func (q *RequestQueue) SubmitNotifyQuerierShutdown(ctx context.Context, querierID QuerierID)
SubmitNotifyQuerierShutdown is called by the v1 frontend or scheduler when NotifyQuerierShutdown requests are submitted from the querier to an endpoint, separate from any specific querier-worker connection.
func (*RequestQueue) SubmitRequestToEnqueue ¶
func (q *RequestQueue) SubmitRequestToEnqueue(tenantID string, req QueryRequest, maxQueriers int, successFn func()) error
SubmitRequestToEnqueue handles a query request from the query frontend or scheduler and submits it to the queue. This method will block until the queue's processing loop has enqueued the request into its internal queue structure.
If the request is successfully enqueued, successFn is called before any querier can receive the request. An error is returned if one occurred during enqueuing, or if the RequestQueue service stopped before the request was enqueued.
maxQueriers is a tenant-specific value used to compute which queriers should handle requests for this tenant. It is passed to SubmitRequestToEnqueue because the value can change between calls.
func (*RequestQueue) SubmitUnregisterQuerierWorkerConn ¶
func (q *RequestQueue) SubmitUnregisterQuerierWorkerConn(conn *QuerierWorkerConn)
type SchedulerRequest ¶
type SchedulerRequest struct {
	FrontendAddr              string
	UserID                    string
	QueryID                   uint64
	Request                   *httpgrpc.HTTPRequest
	StatsEnabled              bool
	AdditionalQueueDimensions []string
	EnqueueTime               time.Time
	Ctx                       context.Context
	CancelFunc                context.CancelCauseFunc
	QueueSpan                 opentracing.Span
	ParentSpanContext         opentracing.SpanContext
}
func (*SchedulerRequest) ExpectedQueryComponentName ¶
func (sr *SchedulerRequest) ExpectedQueryComponentName() string
ExpectedQueryComponentName parses the expected query component from annotations by the frontend.
func (*SchedulerRequest) Key ¶
func (sr *SchedulerRequest) Key() RequestKey
type TenantIndex ¶
type TenantIndex struct {
// contains filtered or unexported fields
}
TenantIndex is an opaque type that allows iteration over tenants to be resumed between successive calls to the RequestQueue.AwaitRequestForQuerier method.
func FirstTenant ¶
func FirstTenant() TenantIndex
FirstTenant returns a TenantIndex that starts iteration over tenant queues from the very first tenant.
func (TenantIndex) ReuseLastTenant ¶
func (ui TenantIndex) ReuseLastTenant() TenantIndex
ReuseLastTenant returns a TenantIndex modified to start iteration on the same tenant for which the last queue was returned.