concurrencyhandler

package
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2024 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

concurrencyhandler/handler.go

concurrencyhandler/metrics.go

concurrencyhandler/semaphore.go

package provides utilities to manage concurrency control. The Concurrency Manager

ensures no more than a certain number of concurrent requests (e.g., 5 for Jamf Pro) are sent at the same time. This is managed using a semaphore

Index

Constants

View Source
const (
	MaxConcurrency     = 10              // Maximum allowed concurrent requests
	MinConcurrency     = 1               // Minimum allowed concurrent requests
	EvaluationInterval = 1 * time.Minute // Time interval for evaluating metrics and adjusting concurrency
)

Constants and Data Structures:

Variables

This section is empty.

Functions

func Min

func Min(a, b int) int

Min returns the smaller of the two integers.

Types

type ConcurrencyHandler

type ConcurrencyHandler struct {
	AcquisitionTimes []time.Duration

	PerfMetrics *PerformanceMetrics
	// contains filtered or unexported fields
}

ConcurrencyHandler controls the number of concurrent HTTP requests.

func NewConcurrencyHandler

func NewConcurrencyHandler(limit int, logger logger.Logger, perfMetrics *PerformanceMetrics) *ConcurrencyHandler

NewConcurrencyManager initializes a new ConcurrencyManager with the given concurrency limit, logger, and perf metrics. The ConcurrencyManager ensures no more than a certain number of concurrent requests are made. It uses a semaphore to control concurrency.

func (*ConcurrencyHandler) Acquire

func (ch *ConcurrencyHandler) Acquire(ctx context.Context) (uuid.UUID, error)

Acquire attempts to get a token to allow an HTTP request to proceed. It blocks until a token is available or the context expires. Returns a unique request ID upon successful acquisition.

func (*ConcurrencyHandler) AcquireConcurrencyToken

func (ch *ConcurrencyHandler) AcquireConcurrencyToken(ctx context.Context) (context.Context, error)

AcquireConcurrencyToken attempts to acquire a token from the ConcurrencyManager to manage the number of concurrent requests. This function is designed to ensure that the HTTP client adheres to predefined concurrency limits, preventing an excessive number of simultaneous requests. It creates a new context with a timeout to avoid indefinite blocking in case the concurrency limit is reached. Upon successfully acquiring a token, it records the time taken to acquire the token and updates performance metrics accordingly. The function then adds the acquired request ID to the context, which can be used for tracking and managing individual requests.

Parameters: - ctx: The parent context from which the new context with timeout will be derived. This allows for proper request cancellation and timeout handling.

Returns: - A new context containing the acquired request ID, which should be passed to subsequent operations requiring concurrency control.

- An error if the token could not be acquired within the timeout period or due to any other issues encountered by the ConcurrencyManager.

Usage: This function should be called before making an HTTP request that needs to be controlled for concurrency. The returned context should be used for the HTTP request to ensure it is associated with the acquired concurrency token.

func (*ConcurrencyHandler) AdjustConcurrencyBasedOnMetrics

func (ch *ConcurrencyHandler) AdjustConcurrencyBasedOnMetrics()

AdjustConcurrencyBasedOnMetrics evaluates the current metrics and adjusts the concurrency limit if required. It checks metrics like average token acquisition time and decides on a new concurrency limit. The method ensures that the new limit respects the minimum and maximum allowed concurrency bounds.

func (*ConcurrencyHandler) AdjustConcurrencyLimit

func (ch *ConcurrencyHandler) AdjustConcurrencyLimit(newLimit int)

AdjustConcurrencyLimit dynamically modifies the maximum concurrency limit based on the newLimit provided. This function helps in adjusting the concurrency limit in real-time based on observed system performance and other metrics. It transfers the tokens from the old semaphore to the new one, ensuring that there's no loss of tokens during the transition.

func (*ConcurrencyHandler) AverageAcquisitionTime

func (ch *ConcurrencyHandler) AverageAcquisitionTime() time.Duration

AverageAcquisitionTime computes the average time taken to acquire a token from the semaphore. It helps in understanding the contention for tokens and can be used to adjust concurrency limits.

func (*ConcurrencyHandler) EvaluateMetricsAndAdjustConcurrency

func (ch *ConcurrencyHandler) EvaluateMetricsAndAdjustConcurrency()

EvaluateMetricsAndAdjustConcurrency evaluates the performance metrics and makes necessary adjustments to the concurrency limit. The method assesses the average response time and adjusts the concurrency based on how it compares to the historical average acquisition time. If the average response time has significantly increased compared to the historical average, the concurrency limit is decreased, and vice versa. The method ensures that the concurrency limit remains within the bounds defined by the system's best practices.

func (*ConcurrencyHandler) GetAverageAcquisitionTime

func (ch *ConcurrencyHandler) GetAverageAcquisitionTime() time.Duration

Returns the average Acquisition Time to get a token from the semaphore

func (*ConcurrencyHandler) GetHistoricalAverageAcquisitionTime

func (ch *ConcurrencyHandler) GetHistoricalAverageAcquisitionTime() time.Duration

func (*ConcurrencyHandler) GetPerformanceMetrics

func (ch *ConcurrencyHandler) GetPerformanceMetrics() *PerformanceMetrics

GetPerformanceMetrics returns the current performance metrics of the ConcurrencyHandler. This includes counts of total requests, retries, rate limit errors, total response time, and token wait time.

func (*ConcurrencyHandler) HistoricalAverageAcquisitionTime

func (ch *ConcurrencyHandler) HistoricalAverageAcquisitionTime() time.Duration

HistoricalAverageAcquisitionTime computes the average time taken to acquire a token from the semaphore over a historical period (e.g., the last 5 minutes). It helps in understanding the historical contention for tokens and can be used to adjust concurrency limits.

func (*ConcurrencyHandler) Release

func (ch *ConcurrencyHandler) Release(requestID uuid.UUID)

Release returns a token back to the pool, allowing other requests to proceed. It uses the provided requestID for logging and debugging purposes.

func (*ConcurrencyHandler) StartConcurrencyAdjustment

func (ch *ConcurrencyHandler) StartConcurrencyAdjustment()

StartConcurrencyAdjustment launches a periodic checker that evaluates current metrics and adjusts concurrency limits if needed. It uses a ticker to periodically trigger the adjustment logich.

func (*ConcurrencyHandler) StartMetricEvaluation

func (ch *ConcurrencyHandler) StartMetricEvaluation()

StartMetricEvaluation continuously monitors the client's interactions with the API and adjusts the concurrency limits dynamically. The function evaluates metrics at regular intervals to detect burst activity patterns. If a burst activity is detected (e.g., many requests in a short period), the evaluation interval is reduced for more frequent checks. Otherwise, it reverts to a default interval for regular checks. After each evaluation, the function calls EvaluateMetricsAndAdjustConcurrency to potentially adjust the concurrency based on observed metrics.

The evaluation process works as follows: 1. Sleep for the defined evaluation interval. 2. Check if there's a burst in activity using the isBurstActivity method. 3. If a burst is detected, the evaluation interval is shortened to more frequently monitor and adjust the concurrency. 4. If no burst is detected, it maintains the default evaluation interval. 5. It then evaluates the metrics and adjusts the concurrency accordingly.

func (*ConcurrencyHandler) UpdatePerformanceMetrics

func (ch *ConcurrencyHandler) UpdatePerformanceMetrics(duration time.Duration)

updatePerformanceMetrics updates the ConcurrencyHandler's performance metrics by recording the duration of an HTTP request and incrementing the total request count. This function is thread-safe and uses a mutex to synchronize updates to the performance metrics.

Parameters: - duration: The time duration it took for an HTTP request to complete.

This function should be called after each HTTP request to keep track of the ConcurrencyHandler's performance over time.

type PerformanceMetrics

type PerformanceMetrics struct {
	TotalRequests        int64
	TotalRetries         int64
	TotalRateLimitErrors int64
	TotalResponseTime    time.Duration
	TokenWaitTime        time.Duration
	// contains filtered or unexported fields
}

PerformanceMetrics captures various metrics related to the client's interactions with the API.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL