Documentation ¶
Index ¶
- Variables
- type Request
- type RequestQueue
- func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers int, successFn func()) error
- func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64
- func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, querierID string) (Request, UserIndex, error)
- func (q *RequestQueue) QuerierDisconnecting()
- func (q *RequestQueue) RegisterQuerierConnection(querier string)
- func (q *RequestQueue) Stop()
- func (q *RequestQueue) UnregisterQuerierConnection(querier string)
- type UserIndex
Constants ¶
This section is empty.
Variables ¶
var (
	ErrTooManyRequests = errors.New("too many outstanding requests")
	ErrStopped         = errors.New("queue is stopped")
)
Functions ¶
This section is empty.
Types ¶
type RequestQueue ¶
type RequestQueue struct {
// contains filtered or unexported fields
}
RequestQueue holds incoming requests in per-user queues. It also assigns each user a specified number of queriers, and when a querier asks for the next request to handle (using GetNextRequestForQuerier), it returns requests in a fair fashion.
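The fairness described above can be illustrated with a toy model. This is a minimal sketch, not the package's implementation: the names fairQueue, enqueue, next and drain are hypothetical, requests are plain strings, and querier assignment is left out entirely. It only shows how serving per-user queues round-robin keeps one busy user from starving another.

```go
package main

import "fmt"

// fairQueue is a hypothetical toy model: one FIFO per user, served round-robin.
type fairQueue struct {
	users  []string            // users in order of first appearance
	queues map[string][]string // pending requests per user
}

func newFairQueue() *fairQueue {
	return &fairQueue{queues: map[string][]string{}}
}

// enqueue appends req to the user's own queue, creating the queue if needed.
func (q *fairQueue) enqueue(user, req string) {
	if _, ok := q.queues[user]; !ok {
		q.users = append(q.users, user)
	}
	q.queues[user] = append(q.queues[user], req)
}

// next looks for a request starting at the user after position last.
// It returns the request, the position of the user it came from,
// and false when every queue is empty.
func (q *fairQueue) next(last int) (string, int, bool) {
	for i := 1; i <= len(q.users); i++ {
		pos := (last + i) % len(q.users)
		user := q.users[pos]
		if pending := q.queues[user]; len(pending) > 0 {
			q.queues[user] = pending[1:]
			return pending[0], pos, true
		}
	}
	return "", last, false
}

// drain empties the queue and returns the requests in the order served.
func drain(q *fairQueue) []string {
	var served []string
	last := -1 // start before the first user
	for {
		req, pos, ok := q.next(last)
		if !ok {
			return served
		}
		served = append(served, req)
		last = pos
	}
}

func main() {
	q := newFairQueue()
	q.enqueue("a", "a1")
	q.enqueue("a", "a2")
	q.enqueue("a", "a3")
	q.enqueue("b", "b1")
	fmt.Println(drain(q)) // [a1 b1 a2 a3]
}
```

User b's single request is served second even though user a enqueued three requests first.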
func NewRequestQueue ¶
func NewRequestQueue(maxOutstandingPerTenant int, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue
func (*RequestQueue) EnqueueRequest ¶
func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers int, successFn func()) error
EnqueueRequest puts the request into the queue. maxQueriers is a user-specific value that specifies how many queriers this user can use (zero or negative means all queriers). It is passed to each EnqueueRequest call because it can change between calls.
If the request is successfully enqueued, successFn is called with the lock held, before any querier can receive the request.
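The ordering guarantee for successFn can be sketched as follows. This is an illustration under assumptions, not the package's code: lockedQueue, enqueue and dequeue are hypothetical names, and the per-user limit with its error is an assumption added only to show a path on which the callback is not invoked.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errTooMany is a stand-in error for this sketch.
var errTooMany = errors.New("too many outstanding requests")

// lockedQueue is a hypothetical toy queue guarded by a single mutex.
type lockedQueue struct {
	mu      sync.Mutex
	limit   int // assumed per-user limit, for illustration only
	pending map[string][]string
}

func newLockedQueue(limit int) *lockedQueue {
	return &lockedQueue{limit: limit, pending: map[string][]string{}}
}

// enqueue stores req and, only on success, runs successFn before releasing
// the lock. Because dequeue needs the same lock, no consumer can receive
// req until successFn has returned.
func (q *lockedQueue) enqueue(user, req string, successFn func()) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.pending[user]) >= q.limit {
		return errTooMany
	}
	q.pending[user] = append(q.pending[user], req)
	if successFn != nil {
		successFn()
	}
	return nil
}

// dequeue removes the oldest request of the user, if there is one.
func (q *lockedQueue) dequeue(user string) (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	pending := q.pending[user]
	if len(pending) == 0 {
		return "", false
	}
	q.pending[user] = pending[1:]
	return pending[0], true
}

func main() {
	q := newLockedQueue(1)
	accepted := 0
	count := func() { accepted++ }

	fmt.Println(q.enqueue("tenant-1", "query-1", count)) // <nil>
	fmt.Println(q.enqueue("tenant-1", "query-2", count)) // too many outstanding requests
	fmt.Println(accepted)                                // 1
}
```

Since the callback runs under the lock, it should stay short and must not call back into the queue, or it would deadlock in this sketch.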
func (*RequestQueue) GetConnectedQuerierWorkersMetric ¶
func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64
func (*RequestQueue) GetNextRequestForQuerier ¶
func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, querierID string) (Request, UserIndex, error)
GetNextRequestForQuerier finds the next user queue and takes the next request off of it. It blocks if there are no requests. By passing the user index returned by the previous call of this method, the querier guarantees that it iterates over all users fairly. If the querier finds that the request from a user has already expired, it can get another request for the same user by using UserIndex.ReuseLastUser.
func (*RequestQueue) QuerierDisconnecting ¶
func (q *RequestQueue) QuerierDisconnecting()
QuerierDisconnecting unblocks GetNextRequestForQuerier when a querier is waiting for its next request.
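One plausible way for such an unblocking call to work is a condition variable that is broadcast on disconnect, so that a blocked waiter wakes up and re-checks its context. The sketch below assumes that design; this documentation does not show the package's actual mechanism, and waitQueue, next, wake and waitThenDisconnect are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// waitQueue is a hypothetical toy queue whose next method blocks while empty.
type waitQueue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []string
}

func newWaitQueue() *waitQueue {
	q := &waitQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// next blocks until an item is available or ctx is done. The context is
// only re-checked after a wake-up, which is why wake is needed at all.
func (q *waitQueue) next(ctx context.Context) (string, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		if err := ctx.Err(); err != nil {
			return "", err
		}
		q.cond.Wait()
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, nil
}

// wake wakes every blocked waiter. Taking the lock first ensures the
// broadcast cannot slip in between a waiter's context check and its Wait.
func (q *waitQueue) wake() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.cond.Broadcast()
}

// waitThenDisconnect starts a waiter on an empty queue, cancels its context,
// wakes it, and returns the error the waiter observed.
func waitThenDisconnect(q *waitQueue) error {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan error, 1)
	go func() {
		_, err := q.next(ctx)
		done <- err
	}()
	cancel()
	q.wake()
	return <-done
}

func main() {
	fmt.Println(waitThenDisconnect(newWaitQueue())) // context canceled
}
```

Cancelling the context alone would not be enough in this sketch, because a goroutine parked in Wait does not observe the cancellation until something broadcasts.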
func (*RequestQueue) RegisterQuerierConnection ¶
func (q *RequestQueue) RegisterQuerierConnection(querier string)
func (*RequestQueue) Stop ¶
func (q *RequestQueue) Stop()
func (*RequestQueue) UnregisterQuerierConnection ¶
func (q *RequestQueue) UnregisterQuerierConnection(querier string)
type UserIndex ¶
type UserIndex struct {
// contains filtered or unexported fields
}
UserIndex is an opaque type that allows resuming iteration over users between successive calls of the RequestQueue.GetNextRequestForQuerier method.
func FirstUser ¶
func FirstUser() UserIndex
FirstUser returns a UserIndex that starts iteration over user queues from the very first user.
func (UserIndex) ReuseLastUser ¶
ReuseLastUser modifies the index to start iteration on the same user for which the last queue was returned.
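The interplay between a resumable index and "reuse the last user" can be sketched with a toy cursor. Everything here is hypothetical (userIndex, firstUser, reuseLastUser, nextUser) and assumes the index simply remembers the position of the last user served; the real UserIndex is opaque and its fields are not documented.

```go
package main

import "fmt"

// userIndex is a hypothetical cursor that remembers the last user position.
type userIndex struct {
	last int
}

// firstUser starts before the first user, so the next step lands on user 0.
func firstUser() userIndex {
	return userIndex{last: -1}
}

// reuseLastUser steps the cursor back by one, so the next step lands on
// the same user again. A fresh cursor is returned unchanged.
func (ui userIndex) reuseLastUser() userIndex {
	if ui.last >= 0 {
		return userIndex{last: ui.last - 1}
	}
	return ui
}

// nextUser returns the user after the cursor (wrapping around) together
// with the updated cursor. users must not be empty.
func nextUser(users []string, ui userIndex) (string, userIndex) {
	pos := (ui.last + 1) % len(users)
	return users[pos], userIndex{last: pos}
}

func main() {
	users := []string{"a", "b", "c"}

	u1, idx := nextUser(users, firstUser())
	u2, idx := nextUser(users, idx.reuseLastUser()) // same user again
	u3, idx := nextUser(users, idx)                 // iteration moves on
	u4, _ := nextUser(users, idx)

	fmt.Println(u1, u2, u3, u4) // a a b c
}
```

In a querier loop, the reuse step would be taken only when the request just received turned out to be expired, so that the same user still gets a live request served in that turn.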