rate

package
v0.0.0-...-ec72bf4
Published: Dec 13, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package rate provides a rate limiter for requests that may arrive in bursts but must be limited to N per defined period. This package differs from the "golang.org/x/time/rate" package in that it does not implement the token bucket algorithm.

Copyright (c) 2023 Baidu, Inc. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type APILimiter

type APILimiter struct {
	// contains filtered or unexported fields
}

APILimiter is an extension to x/time/rate.Limiter specifically for CCE API calls. It can automatically adjust the rate, burst, and maximum number of parallel API calls so that the observed processing time stays as close as possible to an estimated processing time.

func NewAPILimiter

func NewAPILimiter(name string, p APILimiterParameters, metrics MetricsObserver) *APILimiter

NewAPILimiter returns a new APILimiter based on the parameters and metrics implementation

func NewAPILimiterFromConfig

func NewAPILimiterFromConfig(name, config string, metrics MetricsObserver) (*APILimiter, error)

NewAPILimiterFromConfig returns a new APILimiter based on user configuration

func (*APILimiter) Wait

func (l *APILimiter) Wait(ctx context.Context) (LimitedRequest, error)

Wait blocks until the next API call is allowed to be processed. If the configured MaxWaitDuration is exceeded, an error is returned. On success, a LimitedRequest is returned on which Done() must be called when the API call has completed or Error() if an error occurred.

type APILimiterParameters

type APILimiterParameters struct {
	// EstimatedProcessingDuration is the estimated duration an API call
	// will take. This value is used if AutoAdjust is enabled to
	// automatically adjust rate limits to stay as close as possible to the
	// estimated processing duration.
	EstimatedProcessingDuration time.Duration

	// AutoAdjust enables automatic adjustment of the values
	// ParallelRequests, RateLimit, and RateBurst in order to keep the
	// mean processing duration close to EstimatedProcessingDuration
	AutoAdjust bool

	// MeanOver is the number of entries to keep in order to calculate the
	// mean processing and wait duration
	MeanOver int

	// ParallelRequests is the parallel requests allowed. If AutoAdjust is
	// enabled, the value will adjust automatically.
	ParallelRequests int

	// MaxParallelRequests is the maximum parallel requests allowed. If
	// AutoAdjust is enabled, then the ParallelRequests will never grow
	// above MaxParallelRequests.
	MaxParallelRequests int

	// MinParallelRequests is the minimum parallel requests allowed. If
	// AutoAdjust is enabled, then the ParallelRequests will never fall
	// below MinParallelRequests.
	MinParallelRequests int

	// RateLimit is the initial number of API requests allowed per second.
	// If AutoAdjust is enabled, the value will adjust automatically.
	RateLimit rate.Limit

	// RateBurst is the initial allowed burst of API requests allowed. If
	// AutoAdjust is enabled, the value will adjust automatically.
	RateBurst int

	// MinWaitDuration is the minimum time an API request always has to
	// wait before the Wait() function returns.
	MinWaitDuration time.Duration

	// MaxWaitDuration is the maximum time an API request is allowed to
	// wait before the Wait() function returns an error.
	MaxWaitDuration time.Duration

	// Log enables info logging of processed API requests. This should only
	// be used for low frequency API calls.
	Log bool

	// DelayedAdjustmentFactor is the percentage of the AdjustmentFactor to be
	// applied to RateBurst and MaxWaitDuration defined as a value between
	// 0.0..1.0. This is used to steer a slower reaction of the RateBurst
	// and ParallelRequests compared to RateLimit.
	DelayedAdjustmentFactor float64

	// SkipInitial is the number of initial API calls for which to not
	// apply any rate limiting. This is useful to define a learning phase
	// in the beginning to allow for auto adjustment before imposing wait
	// durations and rate limiting on API calls.
	SkipInitial int

	// MaxAdjustmentFactor is the maximum adjustment factor when AutoAdjust
	// is enabled. Base values will not adjust more than by this factor.
	MaxAdjustmentFactor float64
}

APILimiterParameters is the configuration of an APILimiter. The structure must not be mutated after it has been passed into NewAPILimiter().
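The relationship between EstimatedProcessingDuration, AutoAdjust, and MaxAdjustmentFactor can be pictured with a small sketch. The formula used here (estimated duration divided by the observed mean, clamped to the range 1/max to max) is an assumption made for illustration only; this documentation does not show the package's actual computation, and `adjustmentFactor` is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"time"
)

// adjustmentFactor is a hypothetical helper, not part of the package.
// It returns a factor above 1 when calls finish faster than estimated
// (limits may grow) and below 1 when they are slower (limits should shrink).
// The result is clamped to [1/max, max].
func adjustmentFactor(estimated, mean time.Duration, max float64) float64 {
	if mean <= 0 || max < 1 {
		// No samples yet or an invalid bound: leave base values unchanged.
		return 1.0
	}
	f := float64(estimated) / float64(mean)
	if f > max {
		return max
	}
	if f < 1.0/max {
		return 1.0 / max
	}
	return f
}

func main() {
	est := 100 * time.Millisecond
	means := []time.Duration{50 * time.Millisecond, 100 * time.Millisecond, time.Second}
	for _, mean := range means {
		fmt.Printf("mean=%v factor=%.2f\n", mean, adjustmentFactor(est, mean, 4.0))
	}
}
```

Under this assumption, a base value such as RateLimit would be multiplied by the factor, so a mean processing time ten times slower than the estimate would still shrink the limit by at most the configured maximum factor.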

func (APILimiterParameters) MergeUserConfig

func (p APILimiterParameters) MergeUserConfig(config string) (APILimiterParameters, error)

MergeUserConfig merges the provided user configuration into the existing parameters and returns a new copy.

type APILimiterSet

type APILimiterSet struct {
	// contains filtered or unexported fields
}

APILimiterSet is a set of APILimiter indexed by name

func NewAPILimiterSet

func NewAPILimiterSet(config map[string]string, defaults map[string]APILimiterParameters, metrics MetricsObserver) (*APILimiterSet, error)

NewAPILimiterSet creates a new APILimiterSet based on a set of rate limiting configurations and the default configuration. Any rate limiter that is configured in the config or in the defaults will be configured and made available via the Limiter(name) and Wait() functions.

func (*APILimiterSet) Limiter

func (s *APILimiterSet) Limiter(name string) *APILimiter

Limiter returns the APILimiter with a given name

func (*APILimiterSet) Wait

func (s *APILimiterSet) Wait(ctx context.Context, name string) (LimitedRequest, error)

Wait invokes Wait() on the APILimiter with the given name. If the limiter does not exist, a dummy limiter is used which will not impose any restrictions.
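The fallback behaviour described for Wait can be sketched with a map lookup and a no-op limiter. All types below (`waiter`, `namedLimiter`, `noopLimiter`, `limiterSet`) are hypothetical simplifications for illustration and do not reflect the package's internals.

```go
package main

import "fmt"

// waiter is a hypothetical, simplified limiter interface for this sketch.
type waiter interface {
	wait() string
}

type namedLimiter struct{ name string }

func (l namedLimiter) wait() string { return "limited by " + l.name }

// noopLimiter is the dummy fallback: it never restricts a request.
type noopLimiter struct{}

func (noopLimiter) wait() string { return "unrestricted" }

type limiterSet struct {
	limiters map[string]waiter
}

// wait delegates to the named limiter, or to the dummy if none is configured.
func (s *limiterSet) wait(name string) string {
	if l, ok := s.limiters[name]; ok {
		return l.wait()
	}
	return noopLimiter{}.wait()
}

func main() {
	s := &limiterSet{limiters: map[string]waiter{"attach": namedLimiter{"attach"}}}
	fmt.Println(s.wait("attach"))
	fmt.Println(s.wait("unknown"))
}
```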

type DefaultAPILimiterSet

type DefaultAPILimiterSet struct {
	sync.RWMutex
	*APILimiterSet
	// contains filtered or unexported fields
}

DefaultAPILimiterSet is an APILimiterSet that is a copy of the passed-in set. If `Limiter()` is called with a name for which no APILimiter exists, a new APILimiter is created with the default parameters.

func APILimiterSetWrapDefault

func APILimiterSetWrapDefault(limiterSet *APILimiterSet, defaultParam *APILimiterParameters) *DefaultAPILimiterSet

APILimiterSetWrapDefault wraps the given APILimiterSet in a DefaultAPILimiterSet. Limiters that are missing from the set are created from defaultParam and made available via the Limiter(name) and Wait() functions.

func (*DefaultAPILimiterSet) Limiter

func (s *DefaultAPILimiterSet) Limiter(name string) *APILimiter

Limiter returns the APILimiter with a given name
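Since DefaultAPILimiterSet embeds a sync.RWMutex and creates missing limiters on demand, one plausible shape for such a lookup is a read-locked fast path followed by a write-locked re-check. The sketch below assumes that pattern; `params`, `limiter`, and `defaultingSet` are hypothetical stand-ins, not the package's types.

```go
package main

import (
	"fmt"
	"sync"
)

// params and limiter are hypothetical stand-ins for APILimiterParameters
// and APILimiter.
type params struct{ RateBurst int }

type limiter struct {
	name string
	p    params
}

// defaultingSet creates missing limiters on demand from default parameters.
type defaultingSet struct {
	mu       sync.RWMutex
	limiters map[string]*limiter
	defaults params
}

func newDefaultingSet(defaults params) *defaultingSet {
	return &defaultingSet{limiters: map[string]*limiter{}, defaults: defaults}
}

// Limiter returns the limiter for name, creating it from the defaults if needed.
func (s *defaultingSet) Limiter(name string) *limiter {
	// Fast path: a shared lock is enough for limiters that already exist.
	s.mu.RLock()
	l, ok := s.limiters[name]
	s.mu.RUnlock()
	if ok {
		return l
	}
	// Slow path: take the write lock and re-check, because another goroutine
	// may have created the limiter between the two lock acquisitions.
	s.mu.Lock()
	defer s.mu.Unlock()
	if existing, ok := s.limiters[name]; ok {
		return existing
	}
	l = &limiter{name: name, p: s.defaults}
	s.limiters[name] = l
	return l
}

func main() {
	s := newDefaultingSet(params{RateBurst: 5})
	a, b := s.Limiter("eni"), s.Limiter("eni")
	fmt.Println(a == b, a.p.RateBurst)
}
```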

type LimitedRequest

type LimitedRequest interface {
	Done()
	Error(err error)
	WaitDuration() time.Duration
}

LimitedRequest represents a request that is being limited. It is returned by Wait(), and the caller of Wait() is responsible for calling Done() when the API call has been processed, or Error() when it resulted in an error. It is safe to call Error() and then Done(). It is not safe to call Done(), Error(), or WaitDuration() concurrently.
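The calling convention described above can be sketched as a small wrapper. The `LimitedRequest` interface is copied from this page; `waiter`, `fakeLimiter`, `fakeRequest`, `callLimited`, and `demo` are hypothetical names used so that the example runs without the real package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// LimitedRequest mirrors the interface documented above.
type LimitedRequest interface {
	Done()
	Error(err error)
	WaitDuration() time.Duration
}

// waiter is a hypothetical stand-in for anything with a Wait method shaped
// like APILimiter.Wait.
type waiter interface {
	Wait(ctx context.Context) (LimitedRequest, error)
}

// fakeRequest and fakeLimiter are test doubles that record what was called.
type fakeRequest struct {
	done     bool
	reported []error
}

func (r *fakeRequest) Done()                       { r.done = true }
func (r *fakeRequest) Error(err error)             { r.reported = append(r.reported, err) }
func (r *fakeRequest) WaitDuration() time.Duration { return 0 }

type fakeLimiter struct{ last *fakeRequest }

func (l *fakeLimiter) Wait(ctx context.Context) (LimitedRequest, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	l.last = &fakeRequest{}
	return l.last, nil
}

// callLimited waits for permission, runs call, reports a failure via Error,
// and always finishes with Done (Error followed by Done is documented as safe).
func callLimited(ctx context.Context, w waiter, call func() error) error {
	req, err := w.Wait(ctx)
	if err != nil {
		return fmt.Errorf("rate limited: %w", err)
	}
	defer req.Done()
	if err := call(); err != nil {
		req.Error(err)
		return err
	}
	return nil
}

// demo runs one failing call and returns what the fake request recorded.
func demo() (done bool, reported int, err error) {
	lim := &fakeLimiter{}
	err = callLimited(context.Background(), lim, func() error { return errors.New("boom") })
	return lim.last.done, len(lim.last.reported), err
}

func main() {
	fmt.Println(demo())
}
```

All calls on the request happen from a single goroutine here, which respects the note that Done(), Error(), and WaitDuration() must not be called concurrently.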

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

Limiter is used to limit the number of operations done.

func NewLimiter

func NewLimiter(interval time.Duration, b int64) *Limiter

NewLimiter returns a new Limiter that allows up to b events (tokens) during the given interval. This Limiter is implemented differently from the Limiter in x/time/rate. x/time/rate.Limiter implements the token bucket algorithm: it emits a constant stream of updates (at a rate of a few dozen events per second) over a period of N minutes, and it is designed to flatten bursts in a signal to a fixed output rate. This Limiter does the opposite: it takes a roughly fixed-rate stream of updates and turns it into a stream of small, controlled bursts every N minutes.
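To make the contrast with a token bucket concrete, here is a minimal fixed-window sketch: the whole budget of b events is restored at the start of each interval instead of trickling back continuously. This is an illustrative reimplementation under assumptions, not the package's code; `intervalLimiter` and its methods are hypothetical, and the clock is passed in explicitly to keep the example deterministic.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// intervalLimiter is a hypothetical fixed-window limiter: up to burst events
// are allowed per interval, and the full budget returns when a new interval
// starts.
type intervalLimiter struct {
	mu        sync.Mutex
	interval  time.Duration
	burst     int64
	remaining int64
	windowEnd time.Time
}

func newIntervalLimiter(interval time.Duration, burst int64, now time.Time) *intervalLimiter {
	return &intervalLimiter{
		interval:  interval,
		burst:     burst,
		remaining: burst,
		windowEnd: now.Add(interval),
	}
}

// allowN reports whether n events may happen at time now and, if so,
// consumes n from the current interval's budget.
func (l *intervalLimiter) allowN(now time.Time, n int64) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if !now.Before(l.windowEnd) {
		// A new interval has begun: restore the full budget.
		l.remaining = l.burst
		l.windowEnd = now.Add(l.interval)
	}
	if n > l.remaining {
		return false
	}
	l.remaining -= n
	return true
}

// demoWindow replays five single-event requests against a budget of two per
// minute: three inside the first interval and two after it has elapsed.
func demoWindow() []bool {
	start := time.Unix(0, 0)
	l := newIntervalLimiter(time.Minute, 2, start)
	return []bool{
		l.allowN(start, 1),
		l.allowN(start.Add(time.Second), 1),
		l.allowN(start.Add(2*time.Second), 1),
		l.allowN(start.Add(time.Minute), 1),
		l.allowN(start.Add(61*time.Second), 1),
	}
}

func main() {
	fmt.Println(demoWindow())
}
```

In this sketch the third request is rejected because the first interval's budget is spent, while both requests after the interval boundary pass back to back, which is the burst-per-interval behaviour the description contrasts with a token bucket.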

func (*Limiter) Allow

func (lim *Limiter) Allow() bool

Allow is shorthand for AllowN(1).

func (*Limiter) AllowN

func (lim *Limiter) AllowN(n int64) bool

AllowN returns true if it's possible to allow n tokens.

func (*Limiter) Stop

func (lim *Limiter) Stop()

Stop stops the internal components used for the rate limiter logic.

func (*Limiter) Wait

func (lim *Limiter) Wait(ctx context.Context) error

Wait is shorthand for WaitN(ctx, 1).

func (*Limiter) WaitN

func (lim *Limiter) WaitN(ctx context.Context, n int64) error

WaitN acquires n tokens, blocking until resources are available or ctx is done. On success, returns nil. On failure, returns ctx.Err() and leaves the limiter unchanged.

If ctx is already done, WaitN may still succeed without blocking.

type MetricsObserver

type MetricsObserver interface {
	// ProcessedRequest is invoked after invocation of an API call
	ProcessedRequest(name string, values MetricsValues)
}

MetricsObserver is the interface that must be implemented to extract metrics
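A custom observer only needs the single ProcessedRequest method. The sketch below tallies requests per limiter name and outcome. The `MetricsValues` struct here is a trimmed local copy so that the example is self-contained, `countingObserver` is a hypothetical type, and the outcome string "ok" is a placeholder because the values the package actually reports are not listed on this page.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// MetricsValues is a trimmed, local copy of the documented struct.
type MetricsValues struct {
	WaitDuration    time.Duration
	ProcessDuration time.Duration
	Outcome         string
}

// MetricsObserver mirrors the documented interface.
type MetricsObserver interface {
	ProcessedRequest(name string, values MetricsValues)
}

// countingObserver tallies processed requests per limiter name and outcome.
// The mutex is a precaution in case limiters report from several goroutines.
type countingObserver struct {
	mu     sync.Mutex
	counts map[string]int
}

// Compile-time check that countingObserver satisfies the interface.
var _ MetricsObserver = (*countingObserver)(nil)

func newCountingObserver() *countingObserver {
	return &countingObserver{counts: map[string]int{}}
}

func (o *countingObserver) ProcessedRequest(name string, v MetricsValues) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.counts[name+"/"+v.Outcome]++
}

// count returns how many requests were seen for a name and outcome.
func (o *countingObserver) count(name, outcome string) int {
	o.mu.Lock()
	defer o.mu.Unlock()
	return o.counts[name+"/"+outcome]
}

func main() {
	obs := newCountingObserver()
	obs.ProcessedRequest("attach", MetricsValues{Outcome: "ok", ProcessDuration: 20 * time.Millisecond})
	obs.ProcessedRequest("attach", MetricsValues{Outcome: "ok"})
	fmt.Println(obs.count("attach", "ok"))
}
```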

var SimpleMetricsObserver MetricsObserver = &simpleMetricsObserver{}

SimpleMetricsObserver sets the metrics observer to a safe value

type MetricsValues

type MetricsValues struct {
	WaitDuration                time.Duration
	ProcessDuration             time.Duration
	TotalDuration               time.Duration
	Outcome                     string
	EstimatedProcessingDuration float64
	ParallelRequests            int
	Limit                       rate.Limit
	Burst                       int
	CurrentRequestsInFlight     int
	AdjustmentFactor            float64
	Error                       error
}

MetricsValues is the snapshot of relevant values to feed into the MetricsObserver

type ServiceLimiterManager

type ServiceLimiterManager interface {
	Wait(ctx context.Context, name string) (LimitedRequest, error)
	Limiter(name string) *APILimiter
}

ServiceLimiterManager manages rate limiting for a set of services.
