concurrency

package
v0.1.32
Published: Apr 18, 2024 License: MPL-2.0 Imports: 9 Imported by: 0

Documentation

Overview

concurrency/const.go

concurrency/handler.go

concurrency/metrics.go

concurrency/resize.go

concurrency/scale.go

concurrency/semaphore.go

Package concurrency provides utilities to manage concurrency control. The Concurrency Manager ensures that no more than a certain number of concurrent requests (e.g., 5 for Jamf Pro) are sent at the same time. This is managed using a semaphore.
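For illustration only, the sketch below shows the idiomatic buffered-channel semaphore pattern that this kind of limiter is built on; it is not the package's unexported implementation, and the requests slice and client value are placeholders.

sem := make(chan struct{}, 5) // capacity = maximum number of in-flight requests

for _, req := range requests {
	sem <- struct{}{} // acquire a slot; blocks while 5 requests are in flight
	go func(r *http.Request) {
		defer func() { <-sem }() // release the slot when the request finishes
		resp, err := client.Do(r)
		if err == nil {
			resp.Body.Close()
		}
	}(req)
}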

Index

Constants

const (
	// MaxAcceptableTTFB represents the maximum acceptable Time to First Byte (TTFB) in milliseconds.
	// TTFB is the time taken for the server to start sending the first byte of data in response to a request.
	// Adjustments in concurrency will be made if the TTFB exceeds this threshold.
	MaxAcceptableTTFB = 300 * time.Millisecond

	// MaxAcceptableThroughput represents the maximum acceptable network throughput in bytes per second.
	// Throughput is the amount of data transferred over the network within a specific time interval.
	// Adjustments in concurrency will be made if the network throughput exceeds this threshold.
	MaxAcceptableThroughput = 5 * 1024 * 1024

	// MaxAcceptableResponseTimeVariability represents the maximum acceptable variability in response times.
	// It is used as a threshold to dynamically adjust concurrency based on fluctuations in response times.
	MaxAcceptableResponseTimeVariability = 500 * time.Millisecond

	// ErrorRateThreshold represents the threshold for error rate above which concurrency will be adjusted.
	// Error rate is calculated as (TotalRateLimitErrors + 5xxErrors) / TotalRequests.
	// Adjustments in concurrency will be made if the error rate exceeds this threshold. A threshold of 0.1 (or 10%) is common.
	ErrorRateThreshold = 0.1
)

const (
	MaxConcurrency     = 10              // Maximum allowed concurrent requests
	MinConcurrency     = 1               // Minimum allowed concurrent requests
	EvaluationInterval = 1 * time.Minute // Time interval for evaluating metrics and adjusting concurrency
)
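As a worked illustration of the ErrorRateThreshold arithmetic (the counter variables here are hypothetical, not package fields): with 3 rate-limit errors and 9 5xx errors out of 100 requests, the error rate is (3+9)/100 = 0.12, which exceeds 0.1 and would suggest scaling down.

errorRate := float64(totalRateLimitErrors+total5xxErrors) / float64(totalRequests)
if errorRate > ErrorRateThreshold { // e.g. (3+9)/100 = 0.12 > 0.1
	// error rate is too high: suggest reducing concurrency
}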


Variables

This section is empty.

Functions

This section is empty.

Types

type ConcurrencyHandler

type ConcurrencyHandler struct {
	AcquisitionTimes []time.Duration

	Metrics *ConcurrencyMetrics
	// contains filtered or unexported fields
}

ConcurrencyHandler controls the number of concurrent HTTP requests.

func NewConcurrencyHandler

func NewConcurrencyHandler(limit int, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler

NewConcurrencyHandler initializes a new ConcurrencyHandler with the given concurrency limit, logger, and concurrency metrics. The ConcurrencyHandler ensures no more than a certain number of concurrent requests are made. It uses a semaphore to control concurrency.
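A minimal construction sketch, assuming a value log that satisfies logger.Logger is already available:

metrics := &ConcurrencyMetrics{}
concurrencyHandler := NewConcurrencyHandler(MaxConcurrency, log, metrics) // start at the configured maximum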

func (*ConcurrencyHandler) AcquireConcurrencyToken

func (ch *ConcurrencyHandler) AcquireConcurrencyToken(ctx context.Context) (context.Context, uuid.UUID, error)

AcquireConcurrencyToken acquires a concurrency token so that the number of concurrent operations stays within the predefined limit. It generates a unique request ID for tracking and attempts to acquire a token within a specified timeout to avoid indefinite blocking. On success it updates performance metrics and attaches the request ID to the returned context so concurrent requests can be traced and managed.

Parameters:

  • ctx: The parent context for the token acquisition attempt, used for cancellation and timeout handling.

Returns:

  • context.Context: A derived context carrying the unique request ID, so subsequent operations can be associated with the acquired token and tracked.
  • uuid.UUID: The unique request ID generated during acquisition, identifying the token and enabling auditing of concurrent operations.
  • error: Non-nil if a token could not be acquired within the timeout or another problem occurred.

Usage: Call this function before executing any operation that must respect the concurrency limit. Use the returned context, which carries the request ID, when executing those operations so token tracking stays consistent; the request ID also supports auditing and troubleshooting of the concurrency control mechanism.

Example:

ctx, requestID, err := concurrencyHandler.AcquireConcurrencyToken(context.Background())
if err != nil {
    // Handle token acquisition failure
}
defer concurrencyHandler.ReleaseConcurrencyToken(requestID)

// Proceed with the operation using the derived context (ctx).

func (*ConcurrencyHandler) EvaluateAndAdjustConcurrency added in v0.1.32

func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration)

EvaluateAndAdjustConcurrency evaluates an HTTP response and its response time, then adjusts the system's concurrency level accordingly. It calls three monitoring functions: MonitorRateLimitHeaders, MonitorServerResponseCodes, and MonitorResponseTimeVariability, each of which reports on a different aspect of the response and the system's current state. Their feedback is aggregated by simple majority: if more functions suggest scaling down (return -1), concurrency is scaled down; if more suggest scaling up (return 1), it is scaled up. This centralizes concurrency decisions and bases request-handling capacity on real-time operational metrics.

Parameters:

  • resp: The HTTP response received from the server.
  • responseTime: The time between sending the request and receiving the response.

It logs the specific reason for scaling decisions, helping in traceability and fine-tuning system performance.
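A typical call site, sketched with placeholder httpClient and req values: time the request, then hand the response and elapsed duration to the handler.

start := time.Now()
resp, err := httpClient.Do(req)
responseTime := time.Since(start)
if err == nil {
	concurrencyHandler.EvaluateAndAdjustConcurrency(resp, responseTime)
}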

func (*ConcurrencyHandler) MonitorRateLimitHeaders added in v0.1.31

func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int

MonitorRateLimitHeaders monitors the rate limit headers in the response and suggests a concurrency adjustment.

func (*ConcurrencyHandler) MonitorResponseTimeVariability added in v0.1.31

func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int

MonitorResponseTimeVariability monitors the response time variability and suggests a concurrency adjustment.
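The handler's exact statistics are internal; as a rough sketch of the kind of check involved, a standard deviation derived from a running variance (assumed here to be tracked in seconds squared, with a hypothetical helper name) can be compared against MaxAcceptableResponseTimeVariability:

func variabilitySuggestion(varianceSeconds float64) int {
	stdDev := math.Sqrt(varianceSeconds) // standard deviation of response times, in seconds
	if stdDev > MaxAcceptableResponseTimeVariability.Seconds() {
		return -1 // variability too high: suggest scaling down
	}
	return 0 // no adjustment suggested
}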

func (*ConcurrencyHandler) MonitorServerResponseCodes added in v0.1.31

func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int

MonitorServerResponseCodes monitors the response status codes and suggests a concurrency adjustment.

func (*ConcurrencyHandler) ReleaseConcurrencyToken

func (ch *ConcurrencyHandler) ReleaseConcurrencyToken(requestID uuid.UUID)

ReleaseConcurrencyToken returns a token back to the semaphore pool, allowing other operations to proceed. It uses the provided requestID for structured logging, aiding in tracking and debugging the release of concurrency tokens.

func (*ConcurrencyHandler) ResizeSemaphore added in v0.1.31

func (ch *ConcurrencyHandler) ResizeSemaphore(newSize int)

ResizeSemaphore adjusts the size of the semaphore used to control concurrency. This method creates a new semaphore with the specified new size and closes the old semaphore to ensure that no further tokens can be acquired from it. This approach helps manage the transition from the old concurrency level to the new one without affecting ongoing operations significantly.

Parameters:

  • newSize: The new size for the semaphore, representing the updated limit on concurrent requests.

This function should be called from within synchronization contexts, such as AdjustConcurrency, to avoid race conditions and ensure that changes to the semaphore are consistent with the observed metrics.
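The semaphore field itself is unexported; the sketch below only illustrates the swap-and-close pattern the description refers to, assuming a buffered channel guarded by a mutex (mu and sem are hypothetical stand-ins, not the package's actual fields):

var (
	mu  sync.Mutex
	sem = make(chan struct{}, MaxConcurrency)
)

func resizeSemaphoreSketch(newSize int) {
	mu.Lock()
	defer mu.Unlock()

	newSem := make(chan struct{}, newSize) // fresh semaphore with the new capacity
	oldSem := sem
	sem = newSem
	close(oldSem) // no further tokens can be acquired from the old semaphore
}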

func (*ConcurrencyHandler) ScaleDown added in v0.1.32

func (ch *ConcurrencyHandler) ScaleDown()

ScaleDown reduces the concurrency level by one, down to the minimum limit.

func (*ConcurrencyHandler) ScaleUp added in v0.1.32

func (ch *ConcurrencyHandler) ScaleUp()

ScaleUp increases the concurrency level by one, up to the maximum limit.
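Both methods stay within the configured bounds; a rough sketch of the scale-up branch, with currentLimit standing in for the handler's internal counter:

if currentLimit < MaxConcurrency {
	currentLimit++
	concurrencyHandler.ResizeSemaphore(currentLimit) // grow the semaphore by one slot
}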

type ConcurrencyMetrics

type ConcurrencyMetrics struct {
	TotalRequests        int64         // Total number of requests made
	TotalRetries         int64         // Total number of retry attempts
	TotalRateLimitErrors int64         // Total number of rate limit errors encountered
	TokenWaitTime        time.Duration // Total time spent waiting for tokens
	TTFB                 struct {
		Total time.Duration // Total Time to First Byte (TTFB) for all requests
		Count int64         // Count of requests used for calculating TTFB
		Lock  sync.Mutex    // Lock for TTFB metrics
	}
	Throughput struct {
		Total float64    // Total network throughput for all requests
		Count int64      // Count of requests used for calculating throughput
		Lock  sync.Mutex // Lock for throughput metrics
	}
	ResponseTimeVariability struct {
		Total           time.Duration // Total response time for all requests
		Average         time.Duration // Average response time across all requests
		Variance        float64       // Variance of response times
		Count           int64         // Count of responses used for calculating response time variability
		Lock            sync.Mutex    // Lock for response time variability metrics
		StdDevThreshold float64       // Maximum acceptable standard deviation for adjusting concurrency
	}
	ResponseCodeMetrics struct {
		ErrorRate float64    // Error rate calculated as (TotalRateLimitErrors + 5xxErrors) / TotalRequests
		Lock      sync.Mutex // Lock for response code metrics
	}
	Lock sync.Mutex // Lock for overall metrics fields
}

ConcurrencyMetrics captures various metrics related to managing concurrency for the client's interactions with the API.
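For example, an average TTFB can be derived from the Total and Count fields while holding the corresponding lock (a usage sketch; field access follows the struct definition above):

metrics.TTFB.Lock.Lock()
var avgTTFB time.Duration
if metrics.TTFB.Count > 0 {
	avgTTFB = metrics.TTFB.Total / time.Duration(metrics.TTFB.Count)
}
metrics.TTFB.Lock.Unlock()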

type RequestIDKey

type RequestIDKey struct{}

RequestIDKey is a type used as a key for storing and retrieving request-specific identifiers from a context.Context. This dedicated key type ensures the key is distinct and prevents accidental retrieval of, or conflicts with, other context values. The value associated with this key in a context is typically a UUID that uniquely identifies a request being processed by the ConcurrencyHandler, allowing fine-grained control and tracking of concurrent HTTP requests.
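A hedged retrieval sketch for the request ID that AcquireConcurrencyToken attaches to the context (the stored value is documented as typically being a UUID):

if requestID, ok := ctx.Value(RequestIDKey{}).(uuid.UUID); ok {
	// correlate logs or metrics with this specific request
	_ = requestID
}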
