Documentation ¶
Overview ¶
concurrency/const.go
concurrency/handler.go
concurrency/metrics.go
concurrency/resize.go
concurrency/scale.go
concurrency/semaphore.go
Package concurrency provides utilities to manage concurrency control. The concurrency manager
ensures that no more than a set number of requests (e.g., 5 for Jamf Pro) are in flight at the same time. This limit is enforced with a semaphore.
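As an illustration of the idea, the following is a minimal sketch of a counting semaphore built on a buffered channel. It is not the package's implementation; `runLimited` and its counters are hypothetical names introduced only for this example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runLimited (hypothetical helper) runs n tasks while allowing at most limit
// of them to hold a slot at once. It returns the highest number of tasks that
// were observed holding a slot at the same time.
func runLimited(n, limit int) int64 {
	sem := make(chan struct{}, limit) // buffered channel used as a counting semaphore
	var wg sync.WaitGroup
	var inFlight, peak int64

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire: blocks while all slots are taken
			defer func() { <-sem }() // release the slot when the task ends

			cur := atomic.AddInt64(&inFlight, 1)
			// Record the peak with a compare-and-swap loop.
			for {
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			atomic.AddInt64(&inFlight, -1)
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	// The reported peak never exceeds the limit of 5.
	fmt.Println("peak concurrency:", runLimited(50, 5))
}
```

The channel's capacity is the concurrency limit: a send takes a slot and a receive gives it back.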
Index ¶
- Constants
- type ConcurrencyHandler
- func NewConcurrencyHandler(limit int64, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler
- func (ch *ConcurrencyHandler) AcquireConcurrencyPermit(ctx context.Context) (context.Context, uuid.UUID, error)
- func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration)
- func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int
- func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int
- func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int
- func (ch *ConcurrencyHandler) ReleaseConcurrencyToken(requestID uuid.UUID)
- func (ch *ConcurrencyHandler) ResizeSemaphore(newSize int64)
- func (ch *ConcurrencyHandler) ScaleDown()
- func (ch *ConcurrencyHandler) ScaleUp()
- type ConcurrencyMetrics
- type RequestIDKey
Constants ¶
const (
	// MaxConcurrency defines the upper limit of concurrent requests the system can handle.
	MaxConcurrency = 10

	// MinConcurrency defines the lower limit of concurrent requests the system will maintain.
	MinConcurrency = 1

	// EvaluationInterval specifies the frequency at which the system evaluates its performance metrics
	// to decide if concurrency adjustments are needed.
	EvaluationInterval = 1 * time.Minute

	// MaxAcceptableTTFB represents the maximum acceptable Time to First Byte (TTFB) in milliseconds.
	// TTFB is the time taken for the server to start sending the first byte of data in response to a request.
	// Adjustments in concurrency will be made if the TTFB exceeds this threshold.
	MaxAcceptableTTFB = 300 * time.Millisecond

	// MaxAcceptableThroughput represents the maximum acceptable network throughput in bytes per second.
	// Throughput is the amount of data transferred over the network within a specific time interval.
	// Adjustments in concurrency will be made if the network throughput exceeds this threshold.
	MaxAcceptableThroughput = 5 * 1024 * 1024

	// MaxAcceptableResponseTimeVariability represents the maximum acceptable variability in response times.
	// It is used as a threshold to dynamically adjust concurrency based on fluctuations in response times.
	MaxAcceptableResponseTimeVariability = 500 * time.Millisecond

	// ErrorRateThreshold represents the threshold for error rate above which concurrency will be adjusted.
	// Error rate is calculated as (TotalRateLimitErrors + 5xxErrors) / TotalRequests.
	// Adjustments in concurrency will be made if the error rate exceeds this threshold. A threshold of 0.1 (or 10%) is common.
	ErrorRateThreshold = 0.1

	// Weight assigned to each metric feedback type
	WeightRateLimit     = 0.5 // Weight for rate limit feedback, less if not all APIs provide this data
	WeightResponseCodes = 1.0 // Weight for server response codes
	WeightResponseTime  = 1.5 // Higher weight for response time variability

	// Thresholds for semaphore scaling actions
	ThresholdScaleDown = -1.5 // Threshold to decide scaling down
	ThresholdScaleUp   = 1.5  // Threshold to decide scaling up
)
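The weights and thresholds above suggest that feedback values can be combined into a single score. The sketch below shows one plausible way to do that. It is an assumption, not the package's confirmed logic: `weightedDecision` is a hypothetical function, and treating the thresholds as inclusive bounds is also assumed.

```go
package main

import "fmt"

// Values copied from the constants block above.
const (
	WeightRateLimit     = 0.5
	WeightResponseCodes = 1.0
	WeightResponseTime  = 1.5
	ThresholdScaleDown  = -1.5
	ThresholdScaleUp    = 1.5
)

// weightedDecision (hypothetical) multiplies each suggestion (-1, 0, or 1) by
// its weight, sums the results, and compares the score with the thresholds.
// It returns -1 to scale down, 1 to scale up, and 0 to leave things alone.
func weightedDecision(rateLimit, responseCodes, responseTime int) int {
	score := WeightRateLimit*float64(rateLimit) +
		WeightResponseCodes*float64(responseCodes) +
		WeightResponseTime*float64(responseTime)

	switch {
	case score <= ThresholdScaleDown: // assumed inclusive
		return -1
	case score >= ThresholdScaleUp: // assumed inclusive
		return 1
	default:
		return 0
	}
}

func main() {
	fmt.Println(weightedDecision(1, 1, 1))   // score 3.0
	fmt.Println(weightedDecision(1, 1, -1))  // score 0.0
	fmt.Println(weightedDecision(-1, -1, 0)) // score -1.5
}
```

Under these assumptions, response-time feedback alone (weight 1.5) is enough to reach a threshold, while rate-limit feedback alone (weight 0.5) is not.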
Types ¶
type ConcurrencyHandler ¶
type ConcurrencyHandler struct {
	Metrics *ConcurrencyMetrics
	// contains filtered or unexported fields
}
func NewConcurrencyHandler ¶
func NewConcurrencyHandler(limit int64, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler
NewConcurrencyHandler initializes a new ConcurrencyHandler with the given concurrency limit, logger, and concurrency metrics. The ConcurrencyHandler ensures no more than a certain number of concurrent requests are made. It uses a semaphore to control concurrency.
func NewConcurrencyHandler(limit int64, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler {
	return &ConcurrencyHandler{
		sem:              make(chan struct{}, limit),
		logger:           logger,
		AcquisitionTimes: []time.Duration{},
		Metrics:          metrics,
	}
}
func (*ConcurrencyHandler) AcquireConcurrencyPermit ¶ added in v0.1.33
func (ch *ConcurrencyHandler) AcquireConcurrencyPermit(ctx context.Context) (context.Context, uuid.UUID, error)
	case <-ctxWithTimeout.Done():
		// Failed to acquire a permit within the timeout
		log.Error("Failed to acquire concurrency permit", zap.Error(ctxWithTimeout.Err()))
		return ctx, requestID, ctxWithTimeout.Err()
	}
}
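The fragment above is the timeout branch of a `select`. A minimal, self-contained sketch of that pattern follows, using only the standard library. The `acquire` and `demo` helpers are hypothetical names, and the request ID and logging from the real method are left out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// acquire (hypothetical) tries to take one slot from sem. It gives up when
// the timeout elapses or the parent context is canceled, whichever is first.
func acquire(ctx context.Context, sem chan struct{}, timeout time.Duration) error {
	ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	select {
	case sem <- struct{}{}: // a slot was free: permit acquired
		return nil
	case <-ctxWithTimeout.Done(): // no slot became free in time
		return ctxWithTimeout.Err()
	}
}

// demo acquires twice from a semaphore with a single slot and never releases,
// so the first call succeeds and the second one times out.
func demo() (first, second error) {
	sem := make(chan struct{}, 1)
	first = acquire(context.Background(), sem, 50*time.Millisecond)
	second = acquire(context.Background(), sem, 50*time.Millisecond)
	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first)                                       // <nil>
	fmt.Println(errors.Is(second, context.DeadlineExceeded)) // true
}
```

Returning the context error lets callers tell a timeout (`context.DeadlineExceeded`) apart from a cancellation (`context.Canceled`).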
func (*ConcurrencyHandler) EvaluateAndAdjustConcurrency ¶ added in v0.1.32
func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration)
EvaluateAndAdjustConcurrency evaluates the HTTP response from a server, together with the request's response time, and adjusts the system's concurrency level accordingly. It calls three monitoring functions, MonitorRateLimitHeaders, MonitorServerResponseCodes, and MonitorResponseTimeVariability, each of which gives feedback on a different aspect of the response and the system's current state. The function aggregates this feedback to decide whether to scale concurrency up or down. The decision is a simple majority of suggestions: if more functions suggest scaling down (return -1), it scales down; if more suggest scaling up (return 1), it scales up. Centralizing the decision in one method gives a systematic way to manage request-handling capacity from real-time operational metrics.
Parameters:
resp - The HTTP response received from the server.
responseTime - The time duration between sending the request and receiving the response.
It logs the specific reason for scaling decisions, helping in traceability and fine-tuning system performance.
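The majority rule described above can be sketched as follows. This is only an illustration of the rule as stated in the text. `majorityDecision` is a hypothetical function, and treating a tie as "no change" is an assumption.

```go
package main

import "fmt"

// majorityDecision (hypothetical) counts the scale-up and scale-down
// suggestions and returns the direction with more votes. Ties and all-zero
// input return 0, which is an assumption about how ties are handled.
func majorityDecision(suggestions ...int) int {
	up, down := 0, 0
	for _, s := range suggestions {
		switch {
		case s > 0:
			up++
		case s < 0:
			down++
		}
	}

	switch {
	case down > up:
		return -1
	case up > down:
		return 1
	default:
		return 0
	}
}

func main() {
	fmt.Println(majorityDecision(-1, -1, 1)) // -1
	fmt.Println(majorityDecision(1, 0, 1))   // 1
	fmt.Println(majorityDecision(1, -1, 0))  // 0
}
```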
func (*ConcurrencyHandler) MonitorRateLimitHeaders ¶ added in v0.1.31
func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int
	return suggestion
}
func (*ConcurrencyHandler) MonitorResponseTimeVariability ¶ added in v0.1.31
func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int
	// Determine action based on standard deviation
	if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold {
		// Suggest decrease concurrency
		return -1
	} else if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) < MaxConcurrency {
		// Suggest increase concurrency if there is capacity
		return 1
	}
	return 0
}
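The fragment above compares a standard deviation with a threshold but does not show how `stdDev` is computed. One common way to keep a running standard deviation is Welford's online algorithm, sketched below. The `variability` type, the helper names, and the use of milliseconds as the unit are assumptions made for this example.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// variability (hypothetical) keeps running statistics with Welford's online
// algorithm, so past samples do not need to be stored.
type variability struct {
	count int64
	mean  float64 // running mean, in milliseconds (assumed unit)
	m2    float64 // running sum of squared deviations from the mean
}

// add records one response time and returns the population standard
// deviation, in milliseconds, of all samples seen so far.
func (v *variability) add(d time.Duration) float64 {
	x := float64(d) / float64(time.Millisecond)
	v.count++
	delta := x - v.mean
	v.mean += delta / float64(v.count)
	v.m2 += delta * (x - v.mean)
	return math.Sqrt(v.m2 / float64(v.count))
}

// suggest mirrors the branch structure of the fragment above: -1 when
// variability is too high, 1 when there is room to grow, otherwise 0.
func suggest(stdDev, threshold float64, capacity, maxConcurrency int) int {
	if stdDev > threshold {
		return -1
	}
	if capacity < maxConcurrency {
		return 1
	}
	return 0
}

// stdDevOfMillis feeds millisecond samples through the tracker and returns
// the final standard deviation.
func stdDevOfMillis(samples ...int) float64 {
	var v variability
	var sd float64
	for _, ms := range samples {
		sd = v.add(time.Duration(ms) * time.Millisecond)
	}
	return sd
}

func main() {
	sd := stdDevOfMillis(100, 300) // mean 200 ms, deviation 100 ms
	fmt.Println(sd, suggest(sd, 50, 5, 10))
}
```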
func (*ConcurrencyHandler) MonitorServerResponseCodes ¶ added in v0.1.31
func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int
	// Determine action based on the error rate
	if errorRate > ErrorRateThreshold {
		// Suggest decrease concurrency
		return -1
	} else if errorRate <= ErrorRateThreshold && len(ch.sem) < MaxConcurrency {
		// Suggest increase concurrency if there is capacity
		return 1
	}
	return 0
}
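The error rate used above is defined in the constants as (TotalRateLimitErrors + 5xxErrors) / TotalRequests. A minimal sketch of that bookkeeping follows. The `counters` type and `errorRateFor` helper are hypothetical, and counting HTTP 429 as the rate-limit error is an assumption.

```go
package main

import (
	"fmt"
	"net/http"
)

// ErrorRateThreshold is copied from the constants block.
const ErrorRateThreshold = 0.1

// counters (hypothetical) holds the three totals the formula needs.
type counters struct {
	total, rateLimited, serverErrors int64
}

// record classifies one status code and returns the updated error rate:
// (rate-limit errors + 5xx errors) / total requests.
func (c *counters) record(status int) float64 {
	c.total++
	switch {
	case status == http.StatusTooManyRequests: // 429, assumed to mean "rate limited"
		c.rateLimited++
	case status >= 500 && status <= 599:
		c.serverErrors++
	}
	return float64(c.rateLimited+c.serverErrors) / float64(c.total)
}

// errorRateFor replays a sequence of status codes and returns the final rate.
func errorRateFor(statuses ...int) float64 {
	var c counters
	var rate float64
	for _, s := range statuses {
		rate = c.record(s)
	}
	return rate
}

func main() {
	rate := errorRateFor(200, 200, 200, 500)
	fmt.Println(rate, rate > ErrorRateThreshold) // 0.25 true
}
```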
func (*ConcurrencyHandler) ReleaseConcurrencyToken ¶
func (ch *ConcurrencyHandler) ReleaseConcurrencyToken(requestID uuid.UUID)
	// Log the release of the concurrency permit for auditing and debugging purposes
	ch.logger.Debug("Released concurrency permit",
		zap.String("RequestID", requestID.String()),
		zap.Int("UtilizedTokens", utilizedTokens),
		zap.Int("AvailableTokens", availableTokens),
	)
}
func (*ConcurrencyHandler) ResizeSemaphore ¶ added in v0.1.31
func (ch *ConcurrencyHandler) ResizeSemaphore(newSize int64)
	// Transfer tokens from the old semaphore to the new one.
	for {
		select {
		case token := <-ch.sem:
			select {
			case newSem <- token:
				// Token transferred to new semaphore.
			default:
				// New semaphore is full, put token back to the old one to allow ongoing operations to complete.
				ch.sem <- token
			}
		default:
			// No more tokens to transfer.
			close(ch.sem)
			ch.sem = newSem
			return
		}
	}
}
func (*ConcurrencyHandler) ScaleDown ¶ added in v0.1.32
func (ch *ConcurrencyHandler) ScaleDown()
	// We must consider the capacity rather than the length of the semaphore channel
	currentSize := cap(ch.sem)
	if currentSize > MinConcurrency {
		newSize := currentSize - 1
		ch.logger.Info("Reducing request concurrency", zap.Int("currentSize", currentSize), zap.Int("newSize", newSize))
		// ResizeSemaphore takes an int64, so convert the int capacity explicitly.
		ch.ResizeSemaphore(int64(newSize))
	} else {
		ch.logger.Info("Concurrency already at minimum level; cannot reduce further", zap.Int("currentSize", currentSize))
	}
}
func (*ConcurrencyHandler) ScaleUp ¶ added in v0.1.32
func (ch *ConcurrencyHandler) ScaleUp()
	currentSize := cap(ch.sem)
	if currentSize < MaxConcurrency {
		newSize := currentSize + 1
		ch.logger.Info("Increasing request concurrency", zap.Int("currentSize", currentSize), zap.Int("newSize", newSize))
		// ResizeSemaphore takes an int64, so convert the int capacity explicitly.
		ch.ResizeSemaphore(int64(newSize))
	} else {
		ch.logger.Info("Concurrency already at maximum level; cannot increase further", zap.Int("currentSize", currentSize))
	}
}
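ScaleUp and ScaleDown both move the capacity one step at a time and stop at the configured bounds. That shared rule can be sketched as a single pure function. `nextSize` is a hypothetical helper and is not part of the package.

```go
package main

import "fmt"

// Bounds copied from the constants block.
const (
	MaxConcurrency = 10
	MinConcurrency = 1
)

// nextSize (hypothetical) returns the capacity after moving one step in the
// given direction (+1 to scale up, -1 to scale down), clamped to the range
// [MinConcurrency, MaxConcurrency].
func nextSize(current, direction int) int {
	next := current + direction
	if next < MinConcurrency {
		return MinConcurrency
	}
	if next > MaxConcurrency {
		return MaxConcurrency
	}
	return next
}

func main() {
	fmt.Println(nextSize(5, 1))  // 6
	fmt.Println(nextSize(10, 1)) // 10: already at the maximum
	fmt.Println(nextSize(1, -1)) // 1: already at the minimum
}
```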
type ConcurrencyMetrics ¶
type ConcurrencyMetrics struct {
	TotalRequests        int64         // Total number of requests made
	TotalRetries         int64         // Total number of retry attempts
	TotalRateLimitErrors int64         // Total number of rate limit errors encountered
	TokenWaitTime        time.Duration // Total time spent waiting for tokens
	TTFB                 struct {
		Total time.Duration // Total Time to First Byte (TTFB) for all requests
		Count int64         // Count of requests used for calculating TTFB
		Lock  sync.Mutex    // Lock for TTFB metrics
	}
	Throughput struct {
		Total float64    // Total network throughput for all requests
		Count int64      // Count of requests used for calculating throughput
		Lock  sync.Mutex // Lock for throughput metrics
	}
	ResponseTimeVariability struct {
		Total           time.Duration // Total response time for all requests
		Average         time.Duration // Average response time across all requests
		Variance        float64       // Variance of response times
		Count           int64         // Count of responses used for calculating response time variability
		Lock            sync.Mutex    // Lock for response time variability metrics
		StdDevThreshold float64       // Maximum acceptable standard deviation for adjusting concurrency
	}
	ResponseCodeMetrics struct {
		ErrorRate float64    // Error rate calculated as (TotalRateLimitErrors + 5xxErrors) / TotalRequests
		Lock      sync.Mutex // Lock for response code metrics
	}
	Lock sync.Mutex // Lock for overall metrics fields
}
ConcurrencyMetrics captures various metrics related to managing concurrency for the client's interactions with the API.
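Each metric group in the struct pairs its counters with its own lock. The sketch below shows how such a group might be updated and read safely from several goroutines. `ttfbMetrics` is a hypothetical, trimmed-down stand-in for the TTFB group, and the method names are assumptions.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// ttfbMetrics (hypothetical) holds a running total, a sample count, and the
// mutex that guards both.
type ttfbMetrics struct {
	mu    sync.Mutex
	total time.Duration
	count int64
}

// record adds one TTFB sample under the lock.
func (m *ttfbMetrics) record(d time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.total += d
	m.count++
}

// average returns total / count, or 0 when nothing has been recorded.
func (m *ttfbMetrics) average() time.Duration {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.count == 0 {
		return 0
	}
	return m.total / time.Duration(m.count)
}

// averageMillis records each sample from its own goroutine and returns the
// resulting average in milliseconds.
func averageMillis(samples ...int) int64 {
	var m ttfbMetrics
	var wg sync.WaitGroup
	for _, ms := range samples {
		wg.Add(1)
		go func(ms int) {
			defer wg.Done()
			m.record(time.Duration(ms) * time.Millisecond)
		}(ms)
	}
	wg.Wait()
	return m.average().Milliseconds()
}

func main() {
	fmt.Println(averageMillis(100, 200, 300)) // 200
}
```

Keeping one lock per group lets updates to unrelated metrics proceed without contending with each other.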
type RequestIDKey ¶
type RequestIDKey struct{}
RequestIDKey is the type used as a key for storing and retrieving request-specific identifiers from a context.Context object. Using a dedicated type keeps the key distinct and prevents accidental value retrieval or conflicts with other context keys. The value associated with this key in a context is typically a UUID that uniquely identifies a request being processed by the ConcurrencyHandler, allowing for fine-grained control and tracking of concurrent HTTP requests.
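A minimal sketch of the context-key pattern follows. It uses a plain string ID so that the example depends only on the standard library, whereas the package stores a uuid.UUID. The helper names `withRequestID`, `requestIDFrom`, and `roundTrip` are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// RequestIDKey is an empty struct type used only as a context key. Keys of
// other types never compare equal to it, so they cannot collide.
type RequestIDKey struct{}

// withRequestID (hypothetical) returns a child context carrying the ID.
func withRequestID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, RequestIDKey{}, id)
}

// requestIDFrom (hypothetical) reads the ID back. The bool is false when the
// context carries no ID under this key.
func requestIDFrom(ctx context.Context) (string, bool) {
	id, ok := ctx.Value(RequestIDKey{}).(string)
	return id, ok
}

// roundTrip stores an ID in a fresh context and reads it back.
func roundTrip(id string) (string, bool) {
	return requestIDFrom(withRequestID(context.Background(), id))
}

func main() {
	id, ok := roundTrip("req-1")
	fmt.Println(id, ok) // req-1 true

	_, ok = requestIDFrom(context.Background())
	fmt.Println(ok) // false
}
```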