Documentation ¶
Overview ¶
concurrency/const.go
concurrency/handler.go
concurrency/metrics.go
concurrency/resize.go
concurrency/scale.go
concurrency/semaphore.go
Package concurrency provides utilities to manage concurrency control. The Concurrency Manager
ensures no more than a fixed number of concurrent requests (e.g., 5 for Jamf Pro) are sent at the same time. This is managed using a semaphore.
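As a sketch of the underlying mechanism (not the package's actual implementation), a buffered channel can act as the semaphore that caps in-flight work:

```go
package main

import (
	"fmt"
	"sync"
)

// peakConcurrency runs the given number of tasks gated by a semaphore of
// capacity limit, and reports the highest number observed running at once.
func peakConcurrency(limit, tasks int) int {
	sem := make(chan struct{}, limit) // buffered channel acts as the semaphore
	var wg sync.WaitGroup
	var mu sync.Mutex
	inFlight, peak := 0, 0

	for i := 0; i < tasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire a token; blocks at the limit
			defer func() { <-sem }() // release the token when done

			mu.Lock()
			inFlight++
			if inFlight > peak {
				peak = inFlight
			}
			mu.Unlock()

			// simulated request work would happen here

			mu.Lock()
			inFlight--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println(peakConcurrency(5, 100) <= 5) // true: the limit is never exceeded
}
```

Because a goroutine only increments the in-flight count after acquiring a token, the peak can never exceed the semaphore's capacity.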
Index ¶
- Constants
- type ConcurrencyHandler
- func (ch *ConcurrencyHandler) AcquireConcurrencyToken(ctx context.Context) (context.Context, uuid.UUID, error)
- func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration)
- func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int
- func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int
- func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int
- func (ch *ConcurrencyHandler) ReleaseConcurrencyToken(requestID uuid.UUID)
- func (ch *ConcurrencyHandler) ResizeSemaphore(newSize int)
- func (ch *ConcurrencyHandler) ScaleDown()
- func (ch *ConcurrencyHandler) ScaleUp()
- type ConcurrencyMetrics
- type RequestIDKey
Constants ¶
const (
	// MaxAcceptableTTFB represents the maximum acceptable Time to First Byte (TTFB) in milliseconds.
	// TTFB is the time taken for the server to start sending the first byte of data in response to a request.
	// Adjustments in concurrency will be made if the TTFB exceeds this threshold.
	MaxAcceptableTTFB = 300 * time.Millisecond

	// MaxAcceptableThroughput represents the maximum acceptable network throughput in bytes per second.
	// Throughput is the amount of data transferred over the network within a specific time interval.
	// Adjustments in concurrency will be made if the network throughput exceeds this threshold.
	MaxAcceptableThroughput = 5 * 1024 * 1024

	// MaxAcceptableResponseTimeVariability represents the maximum acceptable variability in response times.
	// It is used as a threshold to dynamically adjust concurrency based on fluctuations in response times.
	MaxAcceptableResponseTimeVariability = 500 * time.Millisecond

	// ErrorRateThreshold represents the threshold for error rate above which concurrency will be adjusted.
	// Error rate is calculated as (TotalRateLimitErrors + 5xxErrors) / TotalRequests.
	// Adjustments in concurrency will be made if the error rate exceeds this threshold. A threshold of 0.1 (or 10%) is common.
	ErrorRateThreshold = 0.1
)
const (
	MaxConcurrency     = 10              // Maximum allowed concurrent requests
	MinConcurrency     = 1               // Minimum allowed concurrent requests
	EvaluationInterval = 1 * time.Minute // Time interval for evaluating metrics and adjusting concurrency
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConcurrencyHandler ¶
type ConcurrencyHandler struct {
	AcquisitionTimes []time.Duration
	Metrics          *ConcurrencyMetrics
	// contains filtered or unexported fields
}
ConcurrencyHandler controls the number of concurrent HTTP requests.
func NewConcurrencyHandler ¶
func NewConcurrencyHandler(limit int, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler
NewConcurrencyHandler initializes a new ConcurrencyHandler with the given concurrency limit, logger, and concurrency metrics. The ConcurrencyHandler ensures no more than a certain number of concurrent requests are made. It uses a semaphore to control concurrency.
func (*ConcurrencyHandler) AcquireConcurrencyToken ¶
func (ch *ConcurrencyHandler) AcquireConcurrencyToken(ctx context.Context) (context.Context, uuid.UUID, error)
AcquireConcurrencyToken acquires a concurrency token so that the number of concurrent operations stays within the configured limit. It generates a unique request ID for tracking, attempts to acquire a token within a bounded timeout to avoid blocking indefinitely, and on success updates performance metrics and attaches the request ID to the returned context for traceability.
Parameters:
- ctx: A parent context used as the basis for the token acquisition attempt, facilitating appropriate cancellation and timeout handling in line with best practices for concurrency control.
Returns:
- context.Context: A derived context that includes the unique request ID, offering a mechanism for associating subsequent operations with the acquired concurrency token and facilitating effective request tracking and management.
- uuid.UUID: The unique request ID generated as part of the token acquisition process, serving as an identifier for the acquired token and enabling detailed tracking and auditing of concurrent operations.
- error: An error that signals failure to acquire a concurrency token within the allotted timeout, or due to other encountered issues, ensuring that potential problems in concurrency control are surfaced and can be addressed.
Usage: Call this function before executing any operation that must be regulated by the concurrency limit. Use the returned context, which carries the unique request ID, when executing those operations so that the token can be tracked consistently; the request ID also supports auditing and troubleshooting of the concurrency control mechanism.
Example:

ctx, requestID, err := concurrencyHandler.AcquireConcurrencyToken(context.Background())
if err != nil {
	// Handle token acquisition failure
}
defer concurrencyHandler.ReleaseConcurrencyToken(requestID)
// Proceed with the operation using the modified context
func (*ConcurrencyHandler) EvaluateAndAdjustConcurrency ¶ added in v0.1.32
func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration)
EvaluateAndAdjustConcurrency evaluates the HTTP response from a server along with the request's response time and adjusts the concurrency level of the system accordingly. It utilizes three monitoring functions: MonitorRateLimitHeaders, MonitorServerResponseCodes, and MonitorResponseTimeVariability, each of which provides feedback on different aspects of the response and system's current state. The function aggregates feedback from these monitoring functions to make a decision on whether to scale up or scale down the concurrency. The decision is based on a simple majority of suggestions: if more functions suggest scaling down (return -1), it scales down; if more suggest scaling up (return 1), it scales up. This method centralizes concurrency control decision-making, providing a systematic approach to managing request handling capacity based on real-time operational metrics.
Parameters:
- resp: The HTTP response received from the server.
- responseTime: The time duration between sending the request and receiving the response.
It logs the specific reason for scaling decisions, helping in traceability and fine-tuning system performance.
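The majority vote described above can be sketched as a small pure function (a simplified stand-in, not the package's internal code); each monitor returns -1 (scale down), 0 (no change), or 1 (scale up):

```go
package main

import "fmt"

// decide aggregates monitor suggestions by simple majority: more down-votes
// than up-votes yields -1, more up-votes yields 1, and a tie yields 0.
func decide(votes ...int) int {
	up, down := 0, 0
	for _, v := range votes {
		switch {
		case v > 0:
			up++
		case v < 0:
			down++
		}
	}
	switch {
	case down > up:
		return -1
	case up > down:
		return 1
	}
	return 0
}

func main() {
	fmt.Println(decide(1, 1, -1)) // two up-votes beat one down-vote: prints 1
}
```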
func (*ConcurrencyHandler) MonitorRateLimitHeaders ¶ added in v0.1.31
func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int
MonitorRateLimitHeaders monitors the rate limit headers in the response and suggests a concurrency adjustment.
func (*ConcurrencyHandler) MonitorResponseTimeVariability ¶ added in v0.1.31
func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int
MonitorResponseTimeVariability monitors the response time variability and suggests a concurrency adjustment.
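As an illustration of variability monitoring (assumed details; the package's actual statistics may differ), the standard deviation of recent response times can be compared against a threshold to produce the -1/0 suggestion:

```go
package main

import (
	"fmt"
	"math"
)

// stdDev returns the population standard deviation of the samples.
func stdDev(samples []float64) float64 {
	if len(samples) == 0 {
		return 0
	}
	var sum float64
	for _, s := range samples {
		sum += s
	}
	mean := sum / float64(len(samples))
	var varSum float64
	for _, s := range samples {
		d := s - mean
		varSum += d * d
	}
	return math.Sqrt(varSum / float64(len(samples)))
}

// suggestAdjustment mirrors the -1/0 contract: suggest scaling down when
// variability exceeds the threshold, otherwise leave concurrency unchanged.
func suggestAdjustment(samplesMs []float64, thresholdMs float64) int {
	if stdDev(samplesMs) > thresholdMs {
		return -1
	}
	return 0
}

func main() {
	fmt.Println(suggestAdjustment([]float64{100, 110, 90, 105}, 500))  // stable: 0
	fmt.Println(suggestAdjustment([]float64{50, 1200, 80, 1500}, 500)) // volatile: -1
}
```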
func (*ConcurrencyHandler) MonitorServerResponseCodes ¶ added in v0.1.31
func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int
MonitorServerResponseCodes monitors the response status codes and suggests a concurrency adjustment.
func (*ConcurrencyHandler) ReleaseConcurrencyToken ¶
func (ch *ConcurrencyHandler) ReleaseConcurrencyToken(requestID uuid.UUID)
ReleaseConcurrencyToken returns a token back to the semaphore pool, allowing other operations to proceed. It uses the provided requestID for structured logging, aiding in tracking and debugging the release of concurrency tokens.
func (*ConcurrencyHandler) ResizeSemaphore ¶ added in v0.1.31
func (ch *ConcurrencyHandler) ResizeSemaphore(newSize int)
ResizeSemaphore adjusts the size of the semaphore used to control concurrency. This method creates a new semaphore with the specified new size and closes the old semaphore to ensure that no further tokens can be acquired from it. This approach helps manage the transition from the old concurrency level to the new one without affecting ongoing operations significantly.
Parameters:
- newSize: The new size for the semaphore, representing the updated limit on concurrent requests.
This function should be called from within synchronization contexts, such as AdjustConcurrency, to avoid race conditions and ensure that changes to the semaphore are consistent with the observed metrics.
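A minimal sketch of the swap-and-close approach described above, assuming a channel-backed semaphore (the real handler holds more state than this):

```go
package main

import (
	"fmt"
	"sync"
)

// handler is a hypothetical stand-in holding only the semaphore and its lock.
type handler struct {
	mu  sync.Mutex
	sem chan struct{}
}

// resize swaps in a semaphore of the new capacity and closes the old one so
// that no further tokens can be acquired from it.
func (h *handler) resize(newSize int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	old := h.sem
	h.sem = make(chan struct{}, newSize)
	close(old)
}

func main() {
	h := &handler{sem: make(chan struct{}, 5)}
	h.resize(3)
	fmt.Println(cap(h.sem)) // prints 3
}
```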
func (*ConcurrencyHandler) ScaleDown ¶ added in v0.1.32
func (ch *ConcurrencyHandler) ScaleDown()
ScaleDown reduces the concurrency level by one, down to the minimum limit.
func (*ConcurrencyHandler) ScaleUp ¶ added in v0.1.32
func (ch *ConcurrencyHandler) ScaleUp()
ScaleUp increases the concurrency level by one, up to the maximum limit.
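The clamping behaviour of ScaleUp and ScaleDown can be sketched as pure functions over the documented bounds (MinConcurrency = 1, MaxConcurrency = 10); this is an illustration, not the handler's actual code:

```go
package main

import "fmt"

const (
	minConcurrency = 1  // mirrors MinConcurrency
	maxConcurrency = 10 // mirrors MaxConcurrency
)

// scaleUp raises the limit by one without exceeding the maximum.
func scaleUp(current int) int {
	if current < maxConcurrency {
		return current + 1
	}
	return current
}

// scaleDown lowers the limit by one without going below the minimum.
func scaleDown(current int) int {
	if current > minConcurrency {
		return current - 1
	}
	return current
}

func main() {
	fmt.Println(scaleUp(5), scaleUp(10), scaleDown(1)) // prints: 6 10 1
}
```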
type ConcurrencyMetrics ¶
type ConcurrencyMetrics struct {
	TotalRequests        int64         // Total number of requests made
	TotalRetries         int64         // Total number of retry attempts
	TotalRateLimitErrors int64         // Total number of rate limit errors encountered
	TokenWaitTime        time.Duration // Total time spent waiting for tokens
	TTFB                 struct {
		Total time.Duration // Total Time to First Byte (TTFB) for all requests
		Count int64         // Count of requests used for calculating TTFB
		Lock  sync.Mutex    // Lock for TTFB metrics
	}
	Throughput struct {
		Total float64    // Total network throughput for all requests
		Count int64      // Count of requests used for calculating throughput
		Lock  sync.Mutex // Lock for throughput metrics
	}
	ResponseTimeVariability struct {
		Total           time.Duration // Total response time for all requests
		Average         time.Duration // Average response time across all requests
		Variance        float64       // Variance of response times
		Count           int64         // Count of responses used for calculating response time variability
		Lock            sync.Mutex    // Lock for response time variability metrics
		StdDevThreshold float64       // Maximum acceptable standard deviation for adjusting concurrency
	}
	ResponseCodeMetrics struct {
		ErrorRate float64    // Error rate calculated as (TotalRateLimitErrors + 5xxErrors) / TotalRequests
		Lock      sync.Mutex // Lock for response code metrics
	}
	Lock sync.Mutex // Lock for overall metrics fields
}
ConcurrencyMetrics captures various metrics related to managing concurrency for the client's interactions with the API.
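The documented error-rate formula, (TotalRateLimitErrors + 5xxErrors) / TotalRequests, can be computed as follows (a hypothetical helper, not part of the package's API):

```go
package main

import "fmt"

// errorRate implements the documented formula; it returns 0 when no requests
// have been made, to avoid dividing by zero.
func errorRate(rateLimitErrors, serverErrors, totalRequests int64) float64 {
	if totalRequests == 0 {
		return 0
	}
	return float64(rateLimitErrors+serverErrors) / float64(totalRequests)
}

func main() {
	// 5 rate-limit errors and 5 5xx errors across 100 requests is exactly
	// the common ErrorRateThreshold of 0.1 (10%).
	fmt.Println(errorRate(5, 5, 100)) // prints 0.1
}
```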
type RequestIDKey ¶
type RequestIDKey struct{}
RequestIDKey is a type used as the key for storing and retrieving request-specific identifiers from a context.Context. Using a dedicated key type ensures the key is distinct and prevents accidental retrieval of, or collisions with, values stored under other context keys. The value associated with this key in a context is typically a UUID that uniquely identifies a request being processed by the ConcurrencyManager, allowing fine-grained control and tracking of concurrent HTTP requests.