concurrency

package
v0.1.34
Warning

This package is not in the latest version of its module.

Published: Apr 22, 2024 License: MPL-2.0 Imports: 10 Imported by: 0

Documentation

Overview

concurrency/const.go

concurrency/handler.go

concurrency/metrics.go

concurrency/resize.go

concurrency/scale.go

concurrency/semaphore.go

Package concurrency provides utilities to manage concurrency control. The Concurrency Manager ensures that no more than a certain number of concurrent requests (e.g., 5 for Jamf Pro) are sent at the same time. This is managed using a semaphore.
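In Go, a counting semaphore is commonly built from a buffered channel. The sketch below illustrates that idea and is not this package's code: the `semaphore` type, its `acquire`/`release` methods, and the `usage` and `runLimited` helpers are hypothetical. Sending into the channel takes a permit and receiving frees one, so `len` gives the permits in use and `cap` the limit.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// semaphore is a hypothetical counting semaphore backed by a buffered channel.
// Each value sitting in the channel represents one permit currently in use.
type semaphore chan struct{}

// acquire blocks until a permit is free.
func (s semaphore) acquire() { s <- struct{}{} }

// release frees a permit previously taken by acquire.
func (s semaphore) release() { <-s }

// usage reports permits in use and permits still available.
func usage(s semaphore) (utilized, available int) {
	return len(s), cap(s) - len(s)
}

// runLimited starts n workers but lets at most limit run at once.
// It returns the highest number of workers observed running concurrently.
func runLimited(n, limit int) int64 {
	sem := make(semaphore, limit)
	var wg sync.WaitGroup
	var active, peak int64
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem.acquire()
			defer sem.release()
			cur := atomic.AddInt64(&active, 1)
			// Record the peak with a compare-and-swap loop.
			for {
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			atomic.AddInt64(&active, -1)
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	// With a limit of 5, the observed peak never exceeds 5.
	fmt.Println("peak concurrency:", runLimited(50, 5))
}
```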

Index

Constants

const (
	// MaxConcurrency defines the upper limit of concurrent requests the system can handle.
	MaxConcurrency = 10

	// MinConcurrency defines the lower limit of concurrent requests the system will maintain.
	MinConcurrency = 1

	// EvaluationInterval specifies the frequency at which the system evaluates its performance metrics
	// to decide if concurrency adjustments are needed.
	EvaluationInterval = 1 * time.Minute

	// MaxAcceptableTTFB represents the maximum acceptable Time to First Byte (TTFB), set here to 300 ms.
	// TTFB is the time taken for the server to start sending the first byte of data in response to a request.
	// Adjustments in concurrency will be made if the TTFB exceeds this threshold.
	MaxAcceptableTTFB = 300 * time.Millisecond

	// MaxAcceptableThroughput represents the maximum acceptable network throughput in bytes per second.
	// Throughput is the amount of data transferred over the network within a specific time interval.
	// Adjustments in concurrency will be made if the network throughput exceeds this threshold.
	MaxAcceptableThroughput = 5 * 1024 * 1024

	// MaxAcceptableResponseTimeVariability represents the maximum acceptable variability in response times.
	// It is used as a threshold to dynamically adjust concurrency based on fluctuations in response times.
	MaxAcceptableResponseTimeVariability = 500 * time.Millisecond

	// ErrorRateThreshold represents the threshold for error rate above which concurrency will be adjusted.
	// Error rate is calculated as (TotalRateLimitErrors + 5xxErrors) / TotalRequests.
	// Adjustments in concurrency will be made if the error rate exceeds this threshold. A threshold of 0.1 (or 10%) is common.
	ErrorRateThreshold = 0.1

	// Weight assigned to each metric feedback type
	WeightRateLimit     = 0.5 // Weight for rate limit feedback, less if not all APIs provide this data
	WeightResponseCodes = 1.0 // Weight for server response codes
	WeightResponseTime  = 1.5 // Higher weight for response time variability

	// Thresholds for semaphore scaling actions
	ThresholdScaleDown = -1.5 // Threshold to decide scaling down
	ThresholdScaleUp   = 1.5  // Threshold to decide scaling up
)
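The weight and threshold constants suggest that monitor suggestions (-1, 0, or +1) can be combined into a weighted score and compared against the scaling thresholds. The sketch below is one plausible reading under that assumption, not the package's documented algorithm: `weightedDecision` is hypothetical, and treating the thresholds as inclusive bounds is also an assumption. The constants are copied locally so the example compiles on its own.

```go
package main

import "fmt"

// Local copies of the package constants used by this sketch.
const (
	WeightRateLimit     = 0.5
	WeightResponseCodes = 1.0
	WeightResponseTime  = 1.5

	ThresholdScaleDown = -1.5
	ThresholdScaleUp   = 1.5
)

// weightedDecision is a hypothetical aggregator. Each argument is a monitor
// suggestion: -1 (scale down), 0 (no change) or +1 (scale up).
// It returns -1, 0 or +1 for the action to take.
func weightedDecision(rateLimit, responseCodes, responseTime int) int {
	score := WeightRateLimit*float64(rateLimit) +
		WeightResponseCodes*float64(responseCodes) +
		WeightResponseTime*float64(responseTime)
	switch {
	case score <= ThresholdScaleDown: // assumption: inclusive threshold
		return -1
	case score >= ThresholdScaleUp: // assumption: inclusive threshold
		return 1
	default:
		return 0
	}
}

func main() {
	// 0.5*1 + 1.0*(-1) + 1.5*(-1) = -2.0, which is <= -1.5.
	fmt.Println(weightedDecision(1, -1, -1)) // -1
	// A slow-response signal alone scores -1.5 and meets the threshold.
	fmt.Println(weightedDecision(0, 0, -1)) // -1
	// 0.5*1 + 1.0*1 + 1.5*(-1) = 0.0, so no change.
	fmt.Println(weightedDecision(1, 1, -1)) // 0
}
```

Because `WeightResponseTime` is the largest weight, a response-time signal alone can trigger a scale-down, while the rate-limit signal alone (at most 0.5) never can.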

Variables

This section is empty.

Functions

This section is empty.

Types

type ConcurrencyHandler

type ConcurrencyHandler struct {
	Metrics *ConcurrencyMetrics
	// contains filtered or unexported fields
}

func NewConcurrencyHandler

func NewConcurrencyHandler(limit int64, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler

NewConcurrencyHandler initializes a new ConcurrencyHandler with the given concurrency limit, logger, and concurrency metrics. The ConcurrencyHandler ensures no more than a certain number of concurrent requests are made. It uses a semaphore to control concurrency.

func NewConcurrencyHandler(limit int64, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler {
	return &ConcurrencyHandler{
		sem:              make(chan struct{}, limit),
		logger:           logger,
		AcquisitionTimes: []time.Duration{},
		Metrics:          metrics,
	}
}

func (*ConcurrencyHandler) AcquireConcurrencyPermit added in v0.1.33

func (ch *ConcurrencyHandler) AcquireConcurrencyPermit(ctx context.Context) (context.Context, uuid.UUID, error)
	case <-ctxWithTimeout.Done(): // Failed to acquire a permit within the timeout
		log.Error("Failed to acquire concurrency permit", zap.Error(ctxWithTimeout.Err()))
		return ctx, requestID, ctxWithTimeout.Err()
	}
}

func (*ConcurrencyHandler) EvaluateAndAdjustConcurrency added in v0.1.32

func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration)

EvaluateAndAdjustConcurrency evaluates the HTTP response from a server, together with the request's response time, and adjusts the system's concurrency level accordingly. It uses three monitoring functions (MonitorRateLimitHeaders, MonitorServerResponseCodes, and MonitorResponseTimeVariability), each of which provides feedback on a different aspect of the response and the system's current state. The function aggregates their feedback to decide whether to scale concurrency up or down. The decision is based on a simple majority of suggestions: if more functions suggest scaling down (return -1), it scales down; if more suggest scaling up (return 1), it scales up. This method centralizes concurrency-control decisions, providing a systematic way to manage request-handling capacity based on real-time operational metrics.

Parameters:

resp - The HTTP response received from the server.
responseTime - The time duration between sending the request and receiving the response.

It logs the specific reason for each scaling decision, which aids traceability and performance tuning.
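The simple-majority rule described above amounts to counting votes. This sketch implements that documented rule with a hypothetical `majorityDecision` function; how the real method handles a tie is not stated, so returning "no change" on a tie is an assumption.

```go
package main

import "fmt"

// majorityDecision is a hypothetical aggregator for monitor suggestions
// (-1 = scale down, 0 = no change, +1 = scale up). It moves in the direction
// with more votes and returns 0 on a tie.
func majorityDecision(suggestions ...int) int {
	down, up := 0, 0
	for _, s := range suggestions {
		switch {
		case s < 0:
			down++
		case s > 0:
			up++
		}
	}
	switch {
	case down > up:
		return -1
	case up > down:
		return 1
	default:
		return 0
	}
}

func main() {
	fmt.Println(majorityDecision(-1, -1, 1)) // -1
	fmt.Println(majorityDecision(1, 0, 1))   // 1
	fmt.Println(majorityDecision(-1, 0, 1))  // 0
}
```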

func (*ConcurrencyHandler) MonitorRateLimitHeaders added in v0.1.31

func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int
	return suggestion
}

func (*ConcurrencyHandler) MonitorResponseTimeVariability added in v0.1.31

func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int
	// Determine action based on standard deviation
	if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold {
		// Suggest decrease concurrency
		return -1
	} else if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && cap(ch.sem) < MaxConcurrency {
		// Suggest increasing concurrency if capacity is still below MaxConcurrency
		return 1
	}
	return 0
}

func (*ConcurrencyHandler) MonitorServerResponseCodes added in v0.1.31

func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int
	// Determine action based on the error rate
	if errorRate > ErrorRateThreshold {
		// Suggest decrease concurrency
		return -1
	} else if errorRate <= ErrorRateThreshold && cap(ch.sem) < MaxConcurrency {
		// Suggest increasing concurrency if capacity is still below MaxConcurrency
		return 1
	}
	return 0
}
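The error rate compared above is documented as (TotalRateLimitErrors + 5xxErrors) / TotalRequests. The following sketch computes that formula from a stream of status codes. The `codeCounter` type is hypothetical, and counting HTTP 429 as the rate-limit error is an assumption.

```go
package main

import (
	"fmt"
	"net/http"
)

// Local copy of the package constant.
const ErrorRateThreshold = 0.1

// codeCounter is a hypothetical tally of response status codes.
type codeCounter struct {
	total, rateLimited, serverErrors int64
}

// record classifies one status code. Treating 429 as a rate-limit error is an assumption.
func (c *codeCounter) record(status int) {
	c.total++
	switch {
	case status == http.StatusTooManyRequests:
		c.rateLimited++
	case status >= 500 && status <= 599:
		c.serverErrors++
	}
}

// errorRate computes (rate-limit errors + 5xx errors) / total requests.
func (c *codeCounter) errorRate() float64 {
	if c.total == 0 {
		return 0 // Avoid dividing by zero before any requests.
	}
	return float64(c.rateLimited+c.serverErrors) / float64(c.total)
}

func main() {
	var c codeCounter
	for _, s := range []int{200, 200, 429, 503, 200, 200, 200, 200, 200, 200} {
		c.record(s)
	}
	rate := c.errorRate()
	fmt.Printf("error rate %.2f, scale down: %v\n", rate, rate > ErrorRateThreshold)
	// error rate 0.20, scale down: true
}
```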

func (*ConcurrencyHandler) ReleaseConcurrencyToken

func (ch *ConcurrencyHandler) ReleaseConcurrencyToken(requestID uuid.UUID)
	// Log the release of the concurrency permit for auditing and debugging purposes
	ch.logger.Debug("Released concurrency permit",
		zap.String("RequestID", requestID.String()),
		zap.Int("UtilizedTokens", utilizedTokens),
		zap.Int("AvailableTokens", availableTokens),
	)
}

func (*ConcurrencyHandler) ResizeSemaphore added in v0.1.31

func (ch *ConcurrencyHandler) ResizeSemaphore(newSize int64)
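	// Transfer tokens from the old semaphore to the new one.
	// Putting a token back into the old semaphore and then continuing the loop
	// would receive that same token again and spin forever once newSem is full,
	// so the transfer stops at that point instead.
transfer:
	for {
		select {
		case token := <-ch.sem:
			select {
			case newSem <- token:
				// Token transferred to new semaphore.
			default:
				// New semaphore is full; put the token back and stop transferring.
				ch.sem <- token
				break transfer
			}
		default:
			// No more tokens to transfer.
			break transfer
		}
	}
	close(ch.sem)
	ch.sem = newSem
}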
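Swapping channels while goroutines may still hold or wait on permits is hard to get right. A different technique, named plainly here because it is not what this package does, is a counting semaphore built from `sync.Mutex` and `sync.Cond`, where resizing only changes the limit. Permits already held are not revoked; if the limit shrinks below current usage, new acquisitions wait until enough permits are released. The `resizableSemaphore` type and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// resizableSemaphore is a hypothetical counting semaphore whose limit can change at runtime.
type resizableSemaphore struct {
	mu    sync.Mutex
	cond  *sync.Cond
	limit int // maximum permits
	inUse int // permits currently held
}

func newResizableSemaphore(limit int) *resizableSemaphore {
	s := &resizableSemaphore{limit: limit}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// Acquire blocks until a permit is available under the current limit.
func (s *resizableSemaphore) Acquire() {
	s.mu.Lock()
	for s.inUse >= s.limit {
		s.cond.Wait()
	}
	s.inUse++
	s.mu.Unlock()
}

// Release returns a permit and wakes waiters.
func (s *resizableSemaphore) Release() {
	s.mu.Lock()
	s.inUse--
	s.mu.Unlock()
	s.cond.Broadcast()
}

// Resize changes the limit without revoking permits already held.
func (s *resizableSemaphore) Resize(newLimit int) {
	s.mu.Lock()
	s.limit = newLimit
	s.mu.Unlock()
	s.cond.Broadcast() // A larger limit may unblock waiters.
}

// Usage reports held and available permits.
func (s *resizableSemaphore) Usage() (inUse, available int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	available = s.limit - s.inUse
	if available < 0 {
		available = 0
	}
	return s.inUse, available
}

func main() {
	s := newResizableSemaphore(3)
	s.Acquire()
	s.Acquire()
	s.Resize(1)            // Shrink below current usage.
	fmt.Println(s.Usage()) // 2 0
	s.Release()
	s.Release()
	fmt.Println(s.Usage()) // 0 1
}
```

With this design, ScaleUp and ScaleDown reduce to a single `Resize(limit ± 1)` call, and no channel is ever closed while another goroutine might send on it.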

func (*ConcurrencyHandler) ScaleDown added in v0.1.32

func (ch *ConcurrencyHandler) ScaleDown()
	// We must consider the capacity rather than the length of the semaphore channel
	currentSize := cap(ch.sem)
	if currentSize > MinConcurrency {
		newSize := currentSize - 1
		ch.logger.Info("Reducing request concurrency", zap.Int("currentSize", currentSize), zap.Int("newSize", newSize))
		ch.ResizeSemaphore(int64(newSize))
	} else {
		ch.logger.Info("Concurrency already at minimum level; cannot reduce further", zap.Int("currentSize", currentSize))
	}
}

func (*ConcurrencyHandler) ScaleUp added in v0.1.32

func (ch *ConcurrencyHandler) ScaleUp()
	currentSize := cap(ch.sem)
	if currentSize < MaxConcurrency {
		newSize := currentSize + 1
		ch.logger.Info("Increasing request concurrency", zap.Int("currentSize", currentSize), zap.Int("newSize", newSize))
		ch.ResizeSemaphore(int64(newSize))
	} else {
		ch.logger.Info("Concurrency already at maximum level; cannot increase further", zap.Int("currentSize", currentSize))
	}
}

type ConcurrencyMetrics

type ConcurrencyMetrics struct {
	TotalRequests        int64         // Total number of requests made
	TotalRetries         int64         // Total number of retry attempts
	TotalRateLimitErrors int64         // Total number of rate limit errors encountered
	TokenWaitTime        time.Duration // Total time spent waiting for tokens
	TTFB                 struct {
		Total time.Duration // Total Time to First Byte (TTFB) for all requests
		Count int64         // Count of requests used for calculating TTFB
		Lock  sync.Mutex    // Lock for TTFB metrics
	}
	Throughput struct {
		Total float64    // Total network throughput for all requests
		Count int64      // Count of requests used for calculating throughput
		Lock  sync.Mutex // Lock for throughput metrics
	}
	ResponseTimeVariability struct {
		Total           time.Duration // Total response time for all requests
		Average         time.Duration // Average response time across all requests
		Variance        float64       // Variance of response times
		Count           int64         // Count of responses used for calculating response time variability
		Lock            sync.Mutex    // Lock for response time variability metrics
		StdDevThreshold float64       // Maximum acceptable standard deviation for adjusting concurrency
	}
	ResponseCodeMetrics struct {
		ErrorRate float64    // Error rate calculated as (TotalRateLimitErrors + 5xxErrors) / TotalRequests
		Lock      sync.Mutex // Lock for response code metrics
	}
	Lock sync.Mutex // Lock for overall metrics fields
}

ConcurrencyMetrics captures various metrics related to managing concurrency for the client's interactions with the API.

type RequestIDKey

type RequestIDKey struct{}

RequestIDKey is a type used as a key for storing and retrieving request-specific identifiers in a context.Context. Because it is a dedicated struct type, values stored under it cannot collide with, or be accidentally retrieved through, keys of any other type. The value associated with this key in a context is typically a UUID that uniquely identifies a request being processed by the ConcurrencyHandler, allowing fine-grained control and tracking of concurrent HTTP requests.
