Documentation ¶
Overview ¶
concurrency/const.go
concurrency/handler.go
concurrency/metrics.go
concurrency/resize.go
concurrency/scale.go
concurrency/semaphore.go
Package concurrency provides utilities to manage concurrency control. The ConcurrencyHandler
ensures that no more than a configured number of concurrent requests (e.g., 5 for Jamf Pro) are sent at the same time. This is enforced using a semaphore.
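The semaphore idea the package is built on can be sketched with a buffered channel, Go's idiomatic counting semaphore. This is a minimal illustration of the concept, not the package's actual implementation:

```go
package main

import "fmt"

// limiter is a minimal counting semaphore built from a buffered channel:
// the channel's capacity is the concurrency limit.
type limiter struct {
	sem chan struct{}
}

func newLimiter(n int) *limiter { return &limiter{sem: make(chan struct{}, n)} }

// acquire blocks until a slot is free.
func (l *limiter) acquire() { l.sem <- struct{}{} }

// release frees a slot for other operations.
func (l *limiter) release() { <-l.sem }

func main() {
	l := newLimiter(5)
	for i := 0; i < 5; i++ {
		l.acquire()
	}
	fmt.Println(len(l.sem)) // 5 slots in use; a sixth acquire would block
	l.release()
	fmt.Println(len(l.sem)) // 4
}
```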
Index ¶
- Constants
- type ConcurrencyHandler
- func (ch *ConcurrencyHandler) AcquireConcurrencyPermit(ctx context.Context) (context.Context, uuid.UUID, error)
- func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration)
- func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int
- func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int
- func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int
- func (ch *ConcurrencyHandler) ReleaseConcurrencyPermit(requestID uuid.UUID)
- func (ch *ConcurrencyHandler) ResizeSemaphore(newSize int)
- func (ch *ConcurrencyHandler) ScaleDown()
- func (ch *ConcurrencyHandler) ScaleUp()
- type ConcurrencyMetrics
- type RequestIDKey
Constants ¶
const (
	// MaxConcurrency represents the maximum number of concurrent requests
	// the system is designed to handle safely.
	MaxConcurrency = 10

	// MinConcurrency is the minimum number of concurrent requests that the
	// system will maintain, even under low traffic conditions or when
	// scaling down due to low resource utilization.
	MinConcurrency = 1

	// EvaluationInterval specifies the frequency at which the system
	// evaluates its performance metrics to make decisions about scaling
	// concurrency up or down.
	EvaluationInterval = 1 * time.Minute

	// MaxAcceptableTTFB (Time to First Byte) is the threshold for the
	// longest acceptable delay between making a request and receiving the
	// first byte of data in the response. If response times exceed this
	// threshold, it indicates potential performance issues, and the system
	// may scale down concurrency to reduce load on the server.
	MaxAcceptableTTFB = 300 * time.Millisecond

	// MaxAcceptableThroughput is the threshold for the maximum network data
	// transfer rate. If the system's throughput exceeds this value, it may
	// be an indicator of high traffic demanding significant bandwidth,
	// which could warrant a scale-up in concurrency to maintain performance.
	MaxAcceptableThroughput = 5 * 1024 * 1024 // 5 MBps

	// MaxAcceptableResponseTimeVariability is the threshold for the maximum
	// allowed variability or fluctuations in response times. A high
	// variability often indicates an unstable system, which could trigger a
	// scale-down to allow the system to stabilize.
	MaxAcceptableResponseTimeVariability = 500 * time.Millisecond

	// ErrorRateThreshold is the maximum acceptable rate of error responses
	// (such as rate-limit errors and 5xx server errors) compared to the
	// total number of requests. Exceeding this threshold suggests the
	// system is encountering issues that may be alleviated by scaling down
	// concurrency.
	ErrorRateThreshold = 0.1 // 10% error rate

	// RateLimitCriticalThreshold defines the number of available rate limit
	// slots considered critical. Falling at or below this threshold
	// suggests the system is close to hitting the rate limit enforced by
	// the external service, and it should scale down to prevent
	// rate-limiting errors.
	RateLimitCriticalThreshold = 5

	// ErrorResponseThreshold is the threshold for the error rate that, once
	// exceeded, indicates the system should consider scaling down. It is a
	// ratio of the number of error responses to the total number of
	// requests, reflecting the health of the interaction with the external
	// system.
	ErrorResponseThreshold = 0.2 // 20% error rate

	// ResponseTimeCriticalThreshold is the duration beyond which the
	// response time is considered critically high. If response times exceed
	// this threshold, it could signal that the system or the external
	// service is under heavy load and may benefit from scaling down
	// concurrency to alleviate pressure.
	ResponseTimeCriticalThreshold = 2 * time.Second

	// AcceptableAverageResponseTime = 100 * time.Millisecond
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConcurrencyHandler ¶
type ConcurrencyHandler struct {
	AcquisitionTimes []time.Duration
	Metrics          *ConcurrencyMetrics
	// contains filtered or unexported fields
}
ConcurrencyHandler controls the number of concurrent HTTP requests.
func NewConcurrencyHandler ¶
func NewConcurrencyHandler(limit int, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler
NewConcurrencyHandler initializes a new ConcurrencyHandler with the given concurrency limit, logger, and concurrency metrics. The ConcurrencyHandler ensures no more than a certain number of concurrent requests are made. It uses a semaphore to control concurrency.
func (*ConcurrencyHandler) AcquireConcurrencyPermit ¶ added in v0.1.33
func (ch *ConcurrencyHandler) AcquireConcurrencyPermit(ctx context.Context) (context.Context, uuid.UUID, error)
AcquireConcurrencyPermit acquires a concurrency permit to manage the number of simultaneous operations within predefined limits. This method ensures system stability and compliance with concurrency policies by regulating the execution of concurrent operations.
Parameters:
- ctx: A parent context which is used as the basis for permit acquisition. This allows for proper handling of timeouts and cancellation in line with best practices.
Returns:
- context.Context: A new context derived from the original, including a unique request ID. This context is used to trace and manage operations under the acquired concurrency permit.
- uuid.UUID: The unique request ID generated during the permit acquisition process.
- error: An error object that indicates failure to acquire a permit within the allotted timeout, or other system-related issues.
Usage: This function should be used before initiating any operation that requires concurrency control. The returned context should be passed to subsequent operations to maintain consistency in concurrency tracking.
func (*ConcurrencyHandler) EvaluateAndAdjustConcurrency ¶ added in v0.1.32
func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration)
EvaluateAndAdjustConcurrency assesses the current state of system metrics and decides whether to scale up or down the number of concurrent operations allowed. It employs a combination of strategies: a weighted scoring system, threshold-based direct actions, and cumulative impact assessment.
A weighted scoring system is used to prioritize the importance of different system metrics. Each metric can influence the scaling decision based on its assigned weight, reflecting its relative impact on system performance.
Threshold-based scaling provides a fast-track decision path for critical metrics that have exceeded predefined limits. If a critical metric, such as the rate limit remaining slots or server error rates, crosses a specified threshold, immediate action is taken to scale down the concurrency to prevent system overload.
Cumulative impact assessment calculates a cumulative score from all monitored metrics, taking into account their respective weights. This score determines the overall tendency of the system to either scale up or down. If the score indicates a negative trend (i.e., below zero), the system will scale down to reduce load. Conversely, a positive score suggests that there is capacity to handle more concurrent operations, leading to a scale-up decision.
Parameters:
- resp: The HTTP response received from the server, providing status codes and headers for rate limiting.
- responseTime: The time duration between sending the request and receiving the response, indicating the server's responsiveness.
The function logs the decision process at each step, providing traceability and insight into the scaling mechanism. The method should be called after each significant interaction with the external system (e.g., an HTTP request) to ensure concurrency levels are adapted to current conditions.
Returns: None. The function directly calls the ScaleUp or ScaleDown methods as needed, based on its internal assessments, and logs the outcomes.
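The cumulative impact assessment can be illustrated with a small sketch. The weights below are hypothetical, chosen only for illustration; each monitor contributes a vote of -1, 0, or 1, and the sign of the weighted sum picks the action:

```go
package main

import "fmt"

// Hypothetical weights; the values actually used by
// EvaluateAndAdjustConcurrency are internal to the package.
const (
	weightRateLimit   = 0.5
	weightServerError = 0.3
	weightRespTime    = 0.2
)

// decide sketches the cumulative-impact step: each monitor votes -1, 0,
// or 1, the votes are weighted, and the sign of the sum picks the action.
func decide(rateLimit, serverErr, respTime int) string {
	score := weightRateLimit*float64(rateLimit) +
		weightServerError*float64(serverErr) +
		weightRespTime*float64(respTime)
	switch {
	case score < 0:
		return "scale down"
	case score > 0:
		return "scale up"
	default:
		return "hold"
	}
}

func main() {
	// Rate-limit pressure outweighs a favorable response-time vote.
	fmt.Println(decide(-1, 0, 1)) // scale down
	fmt.Println(decide(1, 1, 1))  // scale up
}
```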
func (*ConcurrencyHandler) MonitorRateLimitHeaders ¶ added in v0.1.31
func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int
MonitorRateLimitHeaders monitors the rate limit headers in the response and suggests a concurrency adjustment.
func (*ConcurrencyHandler) MonitorResponseTimeVariability ¶ added in v0.1.31
func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int
MonitorResponseTimeVariability assesses the response time variability from a series of HTTP requests and decides whether to adjust the concurrency level of outgoing requests. This function is integral to maintaining optimal system performance under varying load conditions.
The function first appends the latest response time to a sliding window of the last 10 response times to maintain a recent history. It then calculates the standard deviation and the average of these times. The standard deviation helps determine the variability or consistency of response times, while the average gives a central tendency.
Based on these calculated metrics, the function employs a multi-factor decision mechanism:
- If the standard deviation exceeds a predefined threshold and the average response time is greater than an acceptable maximum, a debounce counter is incremented. This counter must reach a predefined threshold (debounceScaleDownThreshold) before a decision to decrease concurrency is made, ensuring that only sustained negative trends lead to a scale down.
- If the standard deviation is below or equal to the threshold, suggesting stable response times, and the system is currently operating below its concurrency capacity, it may suggest an increase in concurrency to improve throughput.
This approach aims to prevent transient spikes in response times from causing undue scaling actions, thus stabilizing the overall performance and responsiveness of the system.
Returns:
- -1 to suggest a decrease in concurrency,
- 1 to suggest an increase in concurrency,
- 0 to indicate no change is needed.
func (*ConcurrencyHandler) MonitorServerResponseCodes ¶ added in v0.1.31
func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int
MonitorServerResponseCodes monitors the response status codes and suggests a concurrency adjustment.
func (*ConcurrencyHandler) ReleaseConcurrencyPermit ¶ added in v0.1.35
func (ch *ConcurrencyHandler) ReleaseConcurrencyPermit(requestID uuid.UUID)
ReleaseConcurrencyPermit releases a concurrency permit back to the semaphore, making it available for other operations. This function is essential for maintaining the health and efficiency of the application's concurrency control system by ensuring that resources are properly recycled and available for use by subsequent operations.
Parameters:
- requestID: The unique identifier for the request associated with the permit being released. This ID is used for structured logging to aid in tracking and debugging permit lifecycle events.
Usage: This method should be called as soon as a request or operation that required a concurrency permit is completed. It ensures that concurrency limits are adhered to and helps prevent issues such as permit leakage or semaphore saturation, which could lead to degraded performance or deadlock conditions.
Example:

	defer concurrencyHandler.ReleaseConcurrencyPermit(requestID)

This usage ensures that the permit is released in a deferred manner at the end of the operation, regardless of how the operation exits (normal completion or error path).
func (*ConcurrencyHandler) ResizeSemaphore ¶ added in v0.1.31
func (ch *ConcurrencyHandler) ResizeSemaphore(newSize int)
ResizeSemaphore adjusts the size of the semaphore used to control concurrency. This method creates a new semaphore with the specified new size and closes the old semaphore to ensure that no further tokens can be acquired from it. This approach helps manage the transition from the old concurrency level to the new one without affecting ongoing operations significantly.
Parameters:
- newSize: The new size for the semaphore, representing the updated limit on concurrent requests.
This function should be called from within synchronization contexts, such as EvaluateAndAdjustConcurrency, to avoid race conditions and ensure that changes to the semaphore are consistent with the observed metrics.
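The swap-and-close strategy described above can be sketched as follows. This is hypothetical code, not the package's implementation; the real method additionally guards the swap with synchronization so in-flight operations are not disturbed:

```go
package main

import "fmt"

// resizableSem sketches resizing a channel-based semaphore: swap in a
// fresh buffered channel at the new capacity and close the old one so no
// further tokens can be acquired from it.
type resizableSem struct {
	ch chan struct{}
}

func (s *resizableSem) resize(newSize int) {
	old := s.ch
	s.ch = make(chan struct{}, newSize)
	close(old) // no further tokens can be acquired from the old semaphore
}

func main() {
	s := &resizableSem{ch: make(chan struct{}, 5)}
	s.resize(8)
	fmt.Println(cap(s.ch)) // 8
}
```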
func (*ConcurrencyHandler) ScaleDown ¶ added in v0.1.32
func (ch *ConcurrencyHandler) ScaleDown()
ScaleDown reduces the concurrency level by one, down to the minimum limit.
func (*ConcurrencyHandler) ScaleUp ¶ added in v0.1.32
func (ch *ConcurrencyHandler) ScaleUp()
ScaleUp increases the concurrency level by one, up to the maximum limit.
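The clamped one-step scaling that ScaleUp and ScaleDown describe can be sketched as pure functions over the current limit, bounded by MinConcurrency and MaxConcurrency (a sketch of the documented behavior, not the package's code):

```go
package main

import "fmt"

// Bounds taken from the package constants.
const (
	MinConcurrency = 1
	MaxConcurrency = 10
)

// scaleUp raises the limit by one, clamped at MaxConcurrency.
func scaleUp(limit int) int {
	if limit < MaxConcurrency {
		return limit + 1
	}
	return limit
}

// scaleDown lowers the limit by one, clamped at MinConcurrency.
func scaleDown(limit int) int {
	if limit > MinConcurrency {
		return limit - 1
	}
	return limit
}

func main() {
	fmt.Println(scaleUp(10))  // 10: already at the maximum
	fmt.Println(scaleDown(1)) // 1: already at the minimum
	fmt.Println(scaleUp(4))   // 5
}
```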
type ConcurrencyMetrics ¶
type ConcurrencyMetrics struct {
	TotalRequests        int64         // Total number of requests made
	TotalRetries         int64         // Total number of retry attempts
	TotalRateLimitErrors int64         // Total number of rate limit errors encountered
	PermitWaitTime       time.Duration // Total time spent waiting for permits

	TTFB struct {
		Total time.Duration // Total Time to First Byte (TTFB) for all requests
		Count int64         // Count of requests used for calculating TTFB
		Lock  sync.Mutex    // Lock for TTFB metrics
	}

	Throughput struct {
		Total float64    // Total network throughput for all requests
		Count int64      // Count of requests used for calculating throughput
		Lock  sync.Mutex // Lock for throughput metrics
	}

	ResponseTimeVariability struct {
		Total                  time.Duration // Total response time for all requests
		Average                time.Duration // Average response time across all requests
		Variance               float64       // Variance of response times
		Count                  int64         // Count of responses used for calculating response time variability
		Lock                   sync.Mutex    // Lock for response time variability metrics
		StdDevThreshold        float64       // Maximum acceptable standard deviation for adjusting concurrency
		DebounceScaleDownCount int           // Counter to manage scale-down actions after consecutive triggers
		DebounceScaleUpCount   int           // Counter to manage scale-up actions after consecutive triggers
	}

	ResponseCodeMetrics struct {
		ErrorRate float64    // Error rate calculated as (TotalRateLimitErrors + 5xxErrors) / TotalRequests
		Lock      sync.Mutex // Lock for response code metrics
	}

	Lock sync.Mutex // Lock for overall metrics fields
}
ConcurrencyMetrics captures various metrics related to managing concurrency for the client's interactions with the API.
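The ErrorRate formula documented on ResponseCodeMetrics can be written out as a small sketch (illustrative only, not the package's code):

```go
package main

import "fmt"

// errorRate sketches the documented formula for
// ConcurrencyMetrics.ResponseCodeMetrics.ErrorRate:
// (TotalRateLimitErrors + 5xx errors) / TotalRequests.
func errorRate(rateLimitErrs, serverErrs, totalRequests int64) float64 {
	if totalRequests == 0 {
		return 0 // avoid division by zero before any requests are made
	}
	return float64(rateLimitErrs+serverErrs) / float64(totalRequests)
}

func main() {
	fmt.Println(errorRate(3, 2, 100)) // 0.05, below the 10% ErrorRateThreshold
}
```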
type RequestIDKey ¶
type RequestIDKey struct{}
RequestIDKey is a type used as a key for storing and retrieving request-specific identifiers from a context.Context object. Using a dedicated key type ensures that the key is distinct and prevents accidental value retrieval or conflicts with other context keys. The value associated with this key in a context is typically a UUID that uniquely identifies a request being processed by the ConcurrencyHandler, allowing for fine-grained control and tracking of concurrent HTTP requests.