concurrency

package
v0.1.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 4, 2024 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

concurrency/handler.go

concurrency/metrics.go

concurrency/semaphore.go

package provides utilities to manage concurrency control. The Concurrency Manager

ensures no more than a certain number of concurrent requests (e.g., 5 for Jamf Pro) are sent at the same time. This is managed using a semaphore

Index

Constants

View Source
const (
	MaxConcurrency     = 10              // Maximum allowed concurrent requests
	MinConcurrency     = 1               // Minimum allowed concurrent requests
	EvaluationInterval = 1 * time.Minute // Time interval for evaluating metrics and adjusting concurrency
)

Constants and Data Structures:

Variables

This section is empty.

Functions

func Min

func Min(a, b int) int

Min returns the smaller of the two integers.

Types

type ConcurrencyHandler

type ConcurrencyHandler struct {
	AcquisitionTimes []time.Duration

	Metrics *ConcurrencyMetrics
	// contains filtered or unexported fields
}

ConcurrencyHandler controls the number of concurrent HTTP requests.

func NewConcurrencyHandler

func NewConcurrencyHandler(limit int, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler

NewConcurrencyHandler initializes a new ConcurrencyHandler with the given concurrency limit, logger, and concurrency metrics. The ConcurrencyHandler ensures no more than a certain number of concurrent requests are made. It uses a semaphore to control concurrency.

func (*ConcurrencyHandler) AcquireConcurrencyToken

func (ch *ConcurrencyHandler) AcquireConcurrencyToken(ctx context.Context) (context.Context, uuid.UUID, error)

AcquireConcurrencyToken acquires a concurrency token to regulate the number of concurrent operations within predefined limits, ensuring system stability and adherence to concurrency policies. This function initiates a token acquisition process that involves generating a unique request ID for tracking purposes and attempting to acquire a token within a specified timeout to prevent indefinite blocking. Successful acquisition updates performance metrics and associates the unique request ID with the provided context for enhanced traceability and management of concurrent requests.

Parameters:

  • ctx: A parent context used as the basis for the token acquisition attempt, facilitating appropriate cancellation and timeout handling in line with best practices for concurrency control.

Returns:

  • context.Context: A derived context that includes the unique request ID, offering a mechanism for associating subsequent operations with the acquired concurrency token and facilitating effective request tracking and management.
  • uuid.UUID: The unique request ID generated as part of the token acquisition process, serving as an identifier for the acquired token and enabling detailed tracking and auditing of concurrent operations.
  • error: An error that signals failure to acquire a concurrency token within the allotted timeout, or due to other encountered issues, ensuring that potential problems in concurrency control are surfaced and can be addressed.

Usage: This function is a critical component of concurrency control and should be invoked prior to executing operations that require regulation of concurrency. The returned context, enhanced with the unique request ID, should be utilized in the execution of these operations to maintain consistency in tracking and managing concurrency tokens. The unique request ID also facilitates detailed auditing and troubleshooting of the concurrency control mechanism.

Example: ctx, requestID, err := concurrencyHandler.AcquireConcurrencyToken(context.Background())

if err != nil {
    // Handle token acquisition failure
}

defer concurrencyHandler.Release(requestID) // Proceed with the operation using the modified context

func (*ConcurrencyHandler) AdjustConcurrencyBasedOnMetrics

func (ch *ConcurrencyHandler) AdjustConcurrencyBasedOnMetrics()

AdjustConcurrencyBasedOnMetrics evaluates the current metrics and adjusts the concurrency limit if required. It checks metrics like average token acquisition time and decides on a new concurrency limit. The method ensures that the new limit respects the minimum and maximum allowed concurrency bounds.

func (*ConcurrencyHandler) AdjustConcurrencyLimit

func (ch *ConcurrencyHandler) AdjustConcurrencyLimit(newLimit int)

AdjustConcurrencyLimit dynamically modifies the maximum concurrency limit based on the newLimit provided. This function helps in adjusting the concurrency limit in real-time based on observed system performance and other metrics. It transfers the tokens from the old semaphore to the new one, ensuring that there's no loss of tokens during the transition.

func (*ConcurrencyHandler) AverageAcquisitionTime

func (ch *ConcurrencyHandler) AverageAcquisitionTime() time.Duration

AverageAcquisitionTime computes the average time taken to acquire a token from the semaphore. It helps in understanding the contention for tokens and can be used to adjust concurrency limits.

func (*ConcurrencyHandler) EvaluateMetricsAndAdjustConcurrency

func (ch *ConcurrencyHandler) EvaluateMetricsAndAdjustConcurrency()

EvaluateMetricsAndAdjustConcurrency evaluates the performance metrics and makes necessary adjustments to the concurrency limit. The method assesses the average response time and adjusts the concurrency based on how it compares to the historical average acquisition time. If the average response time has significantly increased compared to the historical average, the concurrency limit is decreased, and vice versa. The method ensures that the concurrency limit remains within the bounds defined by the system's best practices.

func (*ConcurrencyHandler) GetAverageAcquisitionTime

func (ch *ConcurrencyHandler) GetAverageAcquisitionTime() time.Duration

Returns the average Acquisition Time to get a token from the semaphore

func (*ConcurrencyHandler) GetHistoricalAverageAcquisitionTime

func (ch *ConcurrencyHandler) GetHistoricalAverageAcquisitionTime() time.Duration

func (*ConcurrencyHandler) GetPerformanceMetrics

func (ch *ConcurrencyHandler) GetPerformanceMetrics() *ConcurrencyMetrics

GetPerformanceMetrics returns the current performance metrics of the ConcurrencyHandler. This includes counts of total requests, retries, rate limit errors, total response time, and token wait time.

func (*ConcurrencyHandler) HistoricalAverageAcquisitionTime

func (ch *ConcurrencyHandler) HistoricalAverageAcquisitionTime() time.Duration

HistoricalAverageAcquisitionTime computes the average time taken to acquire a token from the semaphore over a historical period (e.g., the last 5 minutes). It helps in understanding the historical contention for tokens and can be used to adjust concurrency limits.

func (*ConcurrencyHandler) ReleaseConcurrencyToken

func (ch *ConcurrencyHandler) ReleaseConcurrencyToken(requestID uuid.UUID)

ReleaseConcurrencyToken returns a token back to the semaphore pool, allowing other operations to proceed. It uses the provided requestID for structured logging, aiding in tracking and debugging the release of concurrency tokens.

func (*ConcurrencyHandler) StartConcurrencyAdjustment

func (ch *ConcurrencyHandler) StartConcurrencyAdjustment()

StartConcurrencyAdjustment launches a periodic checker that evaluates current metrics and adjusts concurrency limits if needed. It uses a ticker to periodically trigger the adjustment logich.

func (*ConcurrencyHandler) StartMetricEvaluation

func (ch *ConcurrencyHandler) StartMetricEvaluation()

StartMetricEvaluation continuously monitors the client's interactions with the API and adjusts the concurrency limits dynamically. The function evaluates metrics at regular intervals to detect burst activity patterns. If a burst activity is detected (e.g., many requests in a short period), the evaluation interval is reduced for more frequent checks. Otherwise, it reverts to a default interval for regular checks. After each evaluation, the function calls EvaluateMetricsAndAdjustConcurrency to potentially adjust the concurrency based on observed metrics.

The evaluation process works as follows: 1. Sleep for the defined evaluation interval. 2. Check if there's a burst in activity using the isBurstActivity method. 3. If a burst is detected, the evaluation interval is shortened to more frequently monitor and adjust the concurrency. 4. If no burst is detected, it maintains the default evaluation interval. 5. It then evaluates the metrics and adjusts the concurrency accordingly.

func (*ConcurrencyHandler) UpdatePerformanceMetrics

func (ch *ConcurrencyHandler) UpdatePerformanceMetrics(duration time.Duration)

updatePerformanceMetrics updates the ConcurrencyHandler's performance metrics by recording the duration of an HTTP request and incrementing the total request count. This function is thread-safe and uses a mutex to synchronize updates to the performance metrics.

Parameters: - duration: The time duration it took for an HTTP request to complete.

This function should be called after each HTTP request to keep track of the ConcurrencyHandler's performance over time.

type ConcurrencyMetrics

type ConcurrencyMetrics struct {
	TotalRequests        int64
	TotalRetries         int64
	TotalRateLimitErrors int64
	TotalResponseTime    time.Duration
	TokenWaitTime        time.Duration
	Lock                 sync.Mutex // Protects performance metrics fields
}

ConcurrencyMetrics captures various metrics related to managing concurrency for the client's interactions with the API.

type RequestIDKey

type RequestIDKey struct{}

RequestIDKey is type used as a key for storing and retrieving request-specific identifiers from a context.Context object. This private type ensures that the key is distinct and prevents accidental value retrieval or conflicts with other context keys. The value associated with this key in a context is typically a UUID that uniquely identifies a request being processed by the ConcurrencyManager, allowing for fine-grained control and tracking of concurrent HTTP requests.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL