Documentation ¶
Overview ¶
concurrency/handler.go
concurrency/metrics.go
concurrency/semaphore.go
Package concurrency provides utilities for concurrency control. The Concurrency Manager
ensures that no more than a fixed number of requests (e.g., 5 for Jamf Pro) are in flight at the same time. This limit is enforced with a semaphore.
Index ¶
- Constants
- func Min(a, b int) int
- type ConcurrencyHandler
- func NewConcurrencyHandler(limit int, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler
- func (ch *ConcurrencyHandler) AcquireConcurrencyToken(ctx context.Context) (context.Context, uuid.UUID, error)
- func (ch *ConcurrencyHandler) AdjustConcurrencyBasedOnMetrics()
- func (ch *ConcurrencyHandler) AdjustConcurrencyLimit(newLimit int)
- func (ch *ConcurrencyHandler) AverageAcquisitionTime() time.Duration
- func (ch *ConcurrencyHandler) EvaluateMetricsAndAdjustConcurrency()
- func (ch *ConcurrencyHandler) GetAverageAcquisitionTime() time.Duration
- func (ch *ConcurrencyHandler) GetHistoricalAverageAcquisitionTime() time.Duration
- func (ch *ConcurrencyHandler) GetPerformanceMetrics() *ConcurrencyMetrics
- func (ch *ConcurrencyHandler) HistoricalAverageAcquisitionTime() time.Duration
- func (ch *ConcurrencyHandler) ReleaseConcurrencyToken(requestID uuid.UUID)
- func (ch *ConcurrencyHandler) StartConcurrencyAdjustment()
- func (ch *ConcurrencyHandler) StartMetricEvaluation()
- func (ch *ConcurrencyHandler) UpdatePerformanceMetrics(duration time.Duration)
- type ConcurrencyMetrics
- type RequestIDKey
Constants ¶
const (
	MaxConcurrency     = 10              // Maximum allowed concurrent requests
	MinConcurrency     = 1               // Minimum allowed concurrent requests
	EvaluationInterval = 1 * time.Minute // Time interval for evaluating metrics and adjusting concurrency
)
Constants and Data Structures:
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ConcurrencyHandler ¶
type ConcurrencyHandler struct {
	AcquisitionTimes []time.Duration
	Metrics          *ConcurrencyMetrics
	// contains filtered or unexported fields
}
ConcurrencyHandler controls the number of concurrent HTTP requests.
func NewConcurrencyHandler ¶
func NewConcurrencyHandler(limit int, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler
NewConcurrencyHandler initializes a new ConcurrencyHandler with the given concurrency limit, logger, and concurrency metrics. The ConcurrencyHandler ensures no more than a certain number of concurrent requests are made. It uses a semaphore to control concurrency.
func (*ConcurrencyHandler) AcquireConcurrencyToken ¶
func (ch *ConcurrencyHandler) AcquireConcurrencyToken(ctx context.Context) (context.Context, uuid.UUID, error)
AcquireConcurrencyToken acquires a concurrency token so that the number of concurrent operations stays within the configured limit. It generates a unique request ID for tracking, then attempts to acquire a token within a timeout so that callers cannot block indefinitely. On success it updates the performance metrics and attaches the request ID to the returned context.
Parameters:
- ctx: The parent context from which the acquisition attempt is derived, so that cancellation and timeouts propagate.
Returns:
- context.Context: A derived context that carries the unique request ID, so that subsequent operations can be associated with the acquired token.
- uuid.UUID: The unique request ID generated for this acquisition. It identifies the acquired token for tracking and auditing.
- error: Non-nil if a token could not be acquired within the timeout or if another error occurred.
Usage: Call this function before any operation that must be subject to concurrency control, and run the operation with the returned context so that the request ID stays attached. The request ID can also be used when auditing or troubleshooting the concurrency control mechanism.
Example:
	ctx, requestID, err := concurrencyHandler.AcquireConcurrencyToken(context.Background())
	if err != nil {
		// Handle token acquisition failure
		return err
	}
	defer concurrencyHandler.ReleaseConcurrencyToken(requestID)
	// Proceed with the operation using the returned context (ctx)
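The acquire-with-timeout behaviour described above can be sketched with a buffered channel acting as the semaphore. This is a minimal illustration under stated assumptions, not the package's implementation: tokenPool, requestIDKey, errAcquireTimeout, and demo are hypothetical names, and an int64 counter stands in for the uuid.UUID request ID so that the sketch needs only the standard library.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// requestIDKey is a hypothetical context key type used only in this sketch.
type requestIDKey struct{}

// errAcquireTimeout is a hypothetical error value for a failed acquisition.
var errAcquireTimeout = errors.New("timed out waiting for a concurrency token")

// tokenPool is a stand-in for the handler: a buffered channel acts as the
// semaphore, and its capacity is the concurrency limit.
type tokenPool struct {
	sem    chan struct{}
	lastID int64
}

func newTokenPool(limit int) *tokenPool {
	return &tokenPool{sem: make(chan struct{}, limit)}
}

// acquire waits up to timeout for a free slot. On success it returns a
// context derived from ctx that carries the new request ID.
func (p *tokenPool) acquire(ctx context.Context, timeout time.Duration) (context.Context, int64, error) {
	waitCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	select {
	case p.sem <- struct{}{}: // a slot was free, so the token is ours
		id := atomic.AddInt64(&p.lastID, 1)
		return context.WithValue(ctx, requestIDKey{}, id), id, nil
	case <-waitCtx.Done(): // timeout or parent cancellation
		return ctx, 0, errAcquireTimeout
	}
}

// release frees one slot so that another caller can proceed.
func (p *tokenPool) release() { <-p.sem }

// demo acquires the only slot, shows that a second attempt times out, and
// shows that the slot can be acquired again after a release.
func demo() (firstOK, secondTimedOut, thirdOK bool) {
	p := newTokenPool(1)

	ctx, id, err := p.acquire(context.Background(), 50*time.Millisecond)
	firstOK = err == nil && ctx.Value(requestIDKey{}) == id

	_, _, err = p.acquire(context.Background(), 10*time.Millisecond)
	secondTimedOut = errors.Is(err, errAcquireTimeout)

	p.release()
	_, _, err = p.acquire(context.Background(), 50*time.Millisecond)
	thirdOK = err == nil
	return firstOK, secondTimedOut, thirdOK
}

func main() {
	fmt.Println(demo()) // prints "true true true"
}
```

The returned context is derived from the caller's context rather than from the short-lived wait context, so the request ID outlives the acquisition timeout.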
func (*ConcurrencyHandler) AdjustConcurrencyBasedOnMetrics ¶
func (ch *ConcurrencyHandler) AdjustConcurrencyBasedOnMetrics()
AdjustConcurrencyBasedOnMetrics evaluates the current metrics and adjusts the concurrency limit if required. It checks metrics like average token acquisition time and decides on a new concurrency limit. The method ensures that the new limit respects the minimum and maximum allowed concurrency bounds.
func (*ConcurrencyHandler) AdjustConcurrencyLimit ¶
func (ch *ConcurrencyHandler) AdjustConcurrencyLimit(newLimit int)
AdjustConcurrencyLimit dynamically modifies the maximum concurrency limit based on the newLimit provided. This function helps in adjusting the concurrency limit in real-time based on observed system performance and other metrics. It transfers the tokens from the old semaphore to the new one, ensuring that there's no loss of tokens during the transition.
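One way to picture the token transfer is with a buffered-channel semaphore, where each queued element represents a token in use. The sketch below is an assumption about the general technique, not the package's code: resize is a hypothetical helper, and a real implementation would also need a lock so that no acquire or release runs while the semaphore is being swapped.

```go
package main

import "fmt"

// resize returns a new buffered-channel semaphore with capacity newLimit.
// Tokens currently held in old are carried over, up to the new capacity.
// It assumes newLimit is not negative.
func resize(old chan struct{}, newLimit int) chan struct{} {
	next := make(chan struct{}, newLimit)
	held := len(old)
	if held > newLimit {
		held = newLimit
	}
	for i := 0; i < held; i++ {
		next <- struct{}{}
	}
	return next
}

func main() {
	sem := make(chan struct{}, 5)
	for i := 0; i < 3; i++ {
		sem <- struct{}{} // three requests are in flight
	}
	grown := resize(sem, 8)
	shrunk := resize(sem, 2)
	fmt.Println(len(grown), cap(grown))   // prints "3 8"
	fmt.Println(len(shrunk), cap(shrunk)) // prints "2 2"
}
```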
func (*ConcurrencyHandler) AverageAcquisitionTime ¶
func (ch *ConcurrencyHandler) AverageAcquisitionTime() time.Duration
AverageAcquisitionTime computes the average time taken to acquire a token from the semaphore. It helps in understanding the contention for tokens and can be used to adjust concurrency limits.
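As an illustration, an average over recorded acquisition times (such as the AcquisitionTimes field) could be computed as follows. The helper averageDuration and the sample data are hypothetical; the package's own calculation may differ.

```go
package main

import (
	"fmt"
	"time"
)

// exampleSamples is made-up data for illustration.
var exampleSamples = []time.Duration{
	10 * time.Millisecond,
	20 * time.Millisecond,
	30 * time.Millisecond,
}

// averageDuration returns the arithmetic mean of samples, or 0 when there
// are no samples, which avoids a division by zero.
func averageDuration(samples []time.Duration) time.Duration {
	if len(samples) == 0 {
		return 0
	}
	var total time.Duration
	for _, s := range samples {
		total += s
	}
	return total / time.Duration(len(samples))
}

func main() {
	fmt.Println(averageDuration(exampleSamples)) // prints "20ms"
	fmt.Println(averageDuration(nil))            // prints "0s"
}
```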
func (*ConcurrencyHandler) EvaluateMetricsAndAdjustConcurrency ¶
func (ch *ConcurrencyHandler) EvaluateMetricsAndAdjustConcurrency()
EvaluateMetricsAndAdjustConcurrency evaluates the performance metrics and makes necessary adjustments to the concurrency limit. The method assesses the average response time and adjusts the concurrency based on how it compares to the historical average acquisition time. If the average response time has significantly increased compared to the historical average, the concurrency limit is decreased, and vice versa. The method ensures that the concurrency limit remains within the bounds defined by the system's best practices.
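The decision rule can be sketched as a pure function. The documentation does not say what counts as a significant change or how large each adjustment is, so the 20% threshold and the step of one below are assumptions chosen for illustration, and nextLimit is a hypothetical name. The bounds mirror MinConcurrency and MaxConcurrency.

```go
package main

import (
	"fmt"
	"time"
)

const (
	maxConcurrency = 10 // mirrors MaxConcurrency
	minConcurrency = 1  // mirrors MinConcurrency
)

// nextLimit lowers the limit by one when the current average is more than
// 20% above the historical average, raises it by one when it is more than
// 20% below, and clamps the result to the allowed bounds.
func nextLimit(current int, avg, historical time.Duration) int {
	next := current
	switch {
	case float64(avg) > float64(historical)*1.2:
		next--
	case float64(avg) < float64(historical)*0.8:
		next++
	}
	if next < minConcurrency {
		next = minConcurrency
	}
	if next > maxConcurrency {
		next = maxConcurrency
	}
	return next
}

func main() {
	base := 100 * time.Millisecond
	fmt.Println(nextLimit(5, 150*time.Millisecond, base)) // prints "4"
	fmt.Println(nextLimit(5, 50*time.Millisecond, base))  // prints "6"
	fmt.Println(nextLimit(5, base, base))                 // prints "5"
}
```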
func (*ConcurrencyHandler) GetAverageAcquisitionTime ¶
func (ch *ConcurrencyHandler) GetAverageAcquisitionTime() time.Duration
GetAverageAcquisitionTime returns the average time taken to acquire a token from the semaphore.
func (*ConcurrencyHandler) GetHistoricalAverageAcquisitionTime ¶
func (ch *ConcurrencyHandler) GetHistoricalAverageAcquisitionTime() time.Duration
func (*ConcurrencyHandler) GetPerformanceMetrics ¶
func (ch *ConcurrencyHandler) GetPerformanceMetrics() *ConcurrencyMetrics
GetPerformanceMetrics returns the current performance metrics of the ConcurrencyHandler. This includes counts of total requests, retries, rate limit errors, total response time, and token wait time.
func (*ConcurrencyHandler) HistoricalAverageAcquisitionTime ¶
func (ch *ConcurrencyHandler) HistoricalAverageAcquisitionTime() time.Duration
HistoricalAverageAcquisitionTime computes the average time taken to acquire a token from the semaphore over a historical period (e.g., the last 5 minutes). It helps in understanding the historical contention for tokens and can be used to adjust concurrency limits.
func (*ConcurrencyHandler) ReleaseConcurrencyToken ¶
func (ch *ConcurrencyHandler) ReleaseConcurrencyToken(requestID uuid.UUID)
ReleaseConcurrencyToken returns a token to the semaphore pool, allowing other operations to proceed. It uses the provided requestID for structured logging, which aids in tracking and debugging the release of concurrency tokens.
func (*ConcurrencyHandler) StartConcurrencyAdjustment ¶
func (ch *ConcurrencyHandler) StartConcurrencyAdjustment()
StartConcurrencyAdjustment launches a periodic checker that evaluates current metrics and adjusts concurrency limits if needed. It uses a ticker to periodically trigger the adjustment logic.
func (*ConcurrencyHandler) StartMetricEvaluation ¶
func (ch *ConcurrencyHandler) StartMetricEvaluation()
StartMetricEvaluation continuously monitors the client's interactions with the API and adjusts the concurrency limits dynamically. The function evaluates metrics at regular intervals to detect burst activity patterns. If a burst activity is detected (e.g., many requests in a short period), the evaluation interval is reduced for more frequent checks. Otherwise, it reverts to a default interval for regular checks. After each evaluation, the function calls EvaluateMetricsAndAdjustConcurrency to potentially adjust the concurrency based on observed metrics.
The evaluation process works as follows:
1. Sleep for the defined evaluation interval.
2. Check for a burst in activity using the isBurstActivity method.
3. If a burst is detected, shorten the evaluation interval so that concurrency is monitored and adjusted more frequently.
4. If no burst is detected, keep the default evaluation interval.
5. Evaluate the metrics and adjust the concurrency accordingly.
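The evaluation steps can be sketched as a loop with an adaptive interval. Everything here is illustrative: evaluationLoop, demoIntervals, and the 10-second burst interval are assumptions, the sleep function is injected so the sketch finishes instantly, and the loop runs for a fixed number of rounds whereas the function described above monitors continuously.

```go
package main

import (
	"fmt"
	"time"
)

const (
	defaultInterval = 1 * time.Minute  // mirrors EvaluationInterval
	burstInterval   = 10 * time.Second // hypothetical shortened interval
)

// evaluationLoop runs a fixed number of rounds so that the sketch terminates.
func evaluationLoop(rounds int, sleep func(time.Duration), isBurst func() bool, evaluate func()) {
	interval := defaultInterval
	for i := 0; i < rounds; i++ {
		sleep(interval) // step 1: wait for the current interval
		if isBurst() {  // step 2: check for a burst
			interval = burstInterval // step 3: check more often
		} else {
			interval = defaultInterval // step 4: regular checks
		}
		evaluate() // step 5: evaluate metrics and adjust concurrency
	}
}

// demoIntervals drives the loop with a scripted burst pattern and a fake
// sleep that records the requested intervals instead of waiting.
func demoIntervals() []time.Duration {
	bursts := []bool{true, false, false}
	round := 0
	var slept []time.Duration
	evaluationLoop(len(bursts),
		func(d time.Duration) { slept = append(slept, d) },
		func() bool { b := bursts[round]; round++; return b },
		func() {},
	)
	return slept
}

func main() {
	fmt.Println(demoIntervals()) // prints "[1m0s 10s 1m0s]"
}
```

The burst detected in the first round shortens only the following wait; once no burst is seen, the loop falls back to the default interval.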
func (*ConcurrencyHandler) UpdatePerformanceMetrics ¶
func (ch *ConcurrencyHandler) UpdatePerformanceMetrics(duration time.Duration)
UpdatePerformanceMetrics updates the ConcurrencyHandler's performance metrics by recording the duration of an HTTP request and incrementing the total request count. This function is thread-safe and uses a mutex to synchronize updates to the performance metrics.
Parameters:
- duration: The time it took for an HTTP request to complete.
This function should be called after each HTTP request to keep track of the ConcurrencyHandler's performance over time.
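A mutex-guarded update of this kind might look like the sketch below. The metrics type, record, and recordConcurrently are hypothetical stand-ins for ConcurrencyMetrics and its update path; only the locking pattern is the point.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// metrics is a reduced, hypothetical version of ConcurrencyMetrics.
type metrics struct {
	mu                sync.Mutex // protects the fields below
	totalRequests     int64
	totalResponseTime time.Duration
}

// record adds one completed request and its duration under the lock.
func (m *metrics) record(d time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.totalRequests++
	m.totalResponseTime += d
}

// recordConcurrently records n requests of duration d from n goroutines and
// returns the totals once all of them have finished.
func recordConcurrently(n int, d time.Duration) (int64, time.Duration) {
	var m metrics
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.record(d)
		}()
	}
	wg.Wait()
	return m.totalRequests, m.totalResponseTime
}

func main() {
	requests, total := recordConcurrently(100, 5*time.Millisecond)
	fmt.Println(requests, total) // prints "100 500ms"
}
```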
type ConcurrencyMetrics ¶
type ConcurrencyMetrics struct {
	TotalRequests        int64
	TotalRetries         int64
	TotalRateLimitErrors int64
	TotalResponseTime    time.Duration
	TokenWaitTime        time.Duration
	Lock                 sync.Mutex // Protects performance metrics fields
}
ConcurrencyMetrics captures various metrics related to managing concurrency for the client's interactions with the API.
type RequestIDKey ¶
type RequestIDKey struct{}
RequestIDKey is the type used as a key for storing and retrieving request-specific identifiers in a context.Context. Using a dedicated type keeps the key distinct and prevents collisions with other context keys. The value associated with this key is typically a UUID that uniquely identifies a request being processed by the ConcurrencyHandler, allowing fine-grained control and tracking of concurrent HTTP requests.
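The following sketch shows the general pattern of storing and retrieving a value under an empty-struct key type. The names requestIDKey, withRequestID, requestIDFrom, roundTrip, and lookupWithoutID are hypothetical, and a string stands in for the uuid.UUID value to keep the example within the standard library.

```go
package main

import (
	"context"
	"fmt"
)

// requestIDKey is a local stand-in for RequestIDKey. An empty struct type
// can only be matched by a key of the same type, so it cannot collide with
// keys defined elsewhere.
type requestIDKey struct{}

// withRequestID returns a child context that carries id.
func withRequestID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, requestIDKey{}, id)
}

// requestIDFrom extracts the ID, reporting false when none is present.
func requestIDFrom(ctx context.Context) (string, bool) {
	id, ok := ctx.Value(requestIDKey{}).(string)
	return id, ok
}

// roundTrip stores id in a fresh context and reads it back.
func roundTrip(id string) (string, bool) {
	return requestIDFrom(withRequestID(context.Background(), id))
}

// lookupWithoutID reads from a context that never had an ID attached.
func lookupWithoutID() (string, bool) {
	return requestIDFrom(context.Background())
}

func main() {
	fmt.Println(roundTrip("req-1"))  // prints "req-1 true"
	fmt.Println(lookupWithoutID()) // prints " false"
}
```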