Documentation ¶
Overview ¶
Package rate provides a rate limiter for requests that may arrive in bursts but should be allowed only N times per defined period. This package differs from the "golang.org/x/time/rate" package in that it does not implement the token bucket algorithm.
Copyright (c) 2023 Baidu, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Types ¶
type APILimiter ¶
type APILimiter struct {
// contains filtered or unexported fields
}
APILimiter is an extension to x/time/rate.Limiter specifically for CCE API calls. It can automatically adjust the rate, burst, and maximum number of parallel API calls so that processing time stays as close as possible to an estimated processing time.
func NewAPILimiter ¶
func NewAPILimiter(name string, p APILimiterParameters, metrics MetricsObserver) *APILimiter
NewAPILimiter returns a new APILimiter based on the parameters and metrics implementation
func NewAPILimiterFromConfig ¶
func NewAPILimiterFromConfig(name, config string, metrics MetricsObserver) (*APILimiter, error)
NewAPILimiterFromConfig returns a new APILimiter based on user configuration
func (*APILimiter) Wait ¶
func (l *APILimiter) Wait(ctx context.Context) (LimitedRequest, error)
Wait blocks until the next API call is allowed to be processed. If the configured MaxWaitDuration is exceeded, an error is returned. On success, a LimitedRequest is returned on which Done() must be called when the API call has completed or Error() if an error occurred.
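The call pattern described above can be sketched as follows. This page shows neither the package's import path nor the method signatures of LimitedRequest, so the sketch declares hypothetical local stand-ins (limitedRequest, waiter, fakeLimiter) instead of importing the package. The Error(err error) signature in particular is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// limitedRequest is a hypothetical stand-in for the package's LimitedRequest.
// Its method set is assumed from the documentation text, not copied from source.
type limitedRequest interface {
	Done()
	Error(err error)
}

// waiter is a hypothetical stand-in for anything with an APILimiter-style Wait.
type waiter interface {
	Wait(ctx context.Context) (limitedRequest, error)
}

// callWithLimit waits for permission, runs call, and reports the outcome.
func callWithLimit(ctx context.Context, l waiter, call func(context.Context) error) error {
	req, err := l.Wait(ctx)
	if err != nil {
		return fmt.Errorf("rate limited: %w", err)
	}
	// Done is always called; calling it after Error is documented as safe.
	defer req.Done()

	if err := call(ctx); err != nil {
		req.Error(err)
		return err
	}
	return nil
}

// fakeLimiter records calls so the pattern can run without the real package.
type fakeLimiter struct{ done, errs int }

func (f *fakeLimiter) Wait(ctx context.Context) (limitedRequest, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	return f, nil
}
func (f *fakeLimiter) Done()           { f.done++ }
func (f *fakeLimiter) Error(err error) { f.errs++ }

// runDemo issues one successful and one failing call and returns the counters.
func runDemo() (done, errs int) {
	f := &fakeLimiter{}
	_ = callWithLimit(context.Background(), f, func(context.Context) error { return nil })
	_ = callWithLimit(context.Background(), f, func(context.Context) error { return errors.New("boom") })
	return f.done, f.errs
}

func main() {
	done, errs := runDemo()
	fmt.Println("Done calls:", done, "Error calls:", errs)
}
```

Deferring Done() right after a successful Wait() keeps the in-flight accounting balanced even if the API call panics or returns early.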
type APILimiterParameters ¶
type APILimiterParameters struct {
	// EstimatedProcessingDuration is the estimated duration an API call
	// will take. This value is used if AutoAdjust is enabled to
	// automatically adjust rate limits to stay as close as possible to the
	// estimated processing duration.
	EstimatedProcessingDuration time.Duration

	// AutoAdjust enables automatic adjustment of the values
	// ParallelRequests, RateLimit, and RateBurst in order to keep the
	// mean processing duration close to EstimatedProcessingDuration
	AutoAdjust bool

	// MeanOver is the number of entries to keep in order to calculate the
	// mean processing and wait duration
	MeanOver int

	// ParallelRequests is the parallel requests allowed. If AutoAdjust is
	// enabled, the value will adjust automatically.
	ParallelRequests int

	// MaxParallelRequests is the maximum parallel requests allowed. If
	// AutoAdjust is enabled, then the ParallelRequests will never grow
	// above MaxParallelRequests.
	MaxParallelRequests int

	// MinParallelRequests is the minimum parallel requests allowed. If
	// AutoAdjust is enabled, then the ParallelRequests will never fall
	// below MinParallelRequests.
	MinParallelRequests int

	// RateLimit is the initial number of API requests allowed per second.
	// If AutoAdjust is enabled, the value will adjust automatically.
	RateLimit rate.Limit

	// RateBurst is the initial allowed burst of API requests. If
	// AutoAdjust is enabled, the value will adjust automatically.
	RateBurst int

	// MinWaitDuration is the minimum time an API request always has to
	// wait before the Wait() function returns.
	MinWaitDuration time.Duration

	// MaxWaitDuration is the maximum time an API request is allowed to
	// wait before the Wait() function returns an error.
	MaxWaitDuration time.Duration

	// Log enables info logging of processed API requests. This should only
	// be used for low frequency API calls.
	Log bool

	// DelayedAdjustmentFactor is percentage of the AdjustmentFactor to be
	// applied to RateBurst and MaxWaitDuration defined as a value between
	// 0.0..1.0. This is used to steer a slower reaction of the RateBurst
	// and ParallelRequests compared to RateLimit.
	DelayedAdjustmentFactor float64

	// SkipInitial is the number of initial API calls for which to not
	// apply any rate limiting. This is useful to define a learning phase
	// in the beginning to allow for auto adjustment before imposing wait
	// durations and rate limiting on API calls.
	SkipInitial int

	// MaxAdjustmentFactor is the maximum adjustment factor when AutoAdjust
	// is enabled. Base values will not adjust more than by this factor.
	MaxAdjustmentFactor float64
}
APILimiterParameters is the configuration of an APILimiter. The structure must not be mutated after it has been passed into NewAPILimiter().
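To make the interplay of EstimatedProcessingDuration, MeanOver, and MaxAdjustmentFactor more concrete, here is a minimal sketch of one plausible adjustment calculation. This page does not state the formula the package uses, so the ratio (estimated duration divided by observed mean) and the symmetric clamping are assumptions, and meanOf and adjustmentFactor are hypothetical helper names.

```go
package main

import (
	"fmt"
	"time"
)

// meanOf returns the arithmetic mean of the recorded durations, or 0 if
// there are none. A real limiter would keep only the last MeanOver samples.
func meanOf(samples []time.Duration) time.Duration {
	if len(samples) == 0 {
		return 0
	}
	var sum time.Duration
	for _, s := range samples {
		sum += s
	}
	return sum / time.Duration(len(samples))
}

// adjustmentFactor is a hypothetical helper: it divides the estimated
// processing duration by the observed mean and clamps the result to
// [1/maxFactor, maxFactor]. Values above 1 mean calls finish faster than
// estimated, so limits could be raised; values below 1 suggest lowering them.
func adjustmentFactor(estimated, mean time.Duration, maxFactor float64) float64 {
	if mean <= 0 || maxFactor < 1 {
		return 1.0 // nothing observed yet, or invalid bound: leave limits as-is
	}
	f := float64(estimated) / float64(mean)
	if f > maxFactor {
		return maxFactor
	}
	if f < 1/maxFactor {
		return 1 / maxFactor
	}
	return f
}

func main() {
	samples := []time.Duration{40 * time.Millisecond, 60 * time.Millisecond}
	mean := meanOf(samples)
	factor := adjustmentFactor(100*time.Millisecond, mean, 4.0)

	baseLimit := 10.0 // requests per second, illustrative value only
	fmt.Printf("mean=%v factor=%.2f adjusted limit=%.1f/s\n", mean, factor, baseLimit*factor)
}
```

Under these assumptions, a DelayedAdjustmentFactor below 1.0 would scale down how much of this factor is applied to the slower-reacting values.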
func (APILimiterParameters) MergeUserConfig ¶
func (p APILimiterParameters) MergeUserConfig(config string) (APILimiterParameters, error)
MergeUserConfig merges the provided user configuration into the existing parameters and returns a new copy.
type APILimiterSet ¶
type APILimiterSet struct {
// contains filtered or unexported fields
}
APILimiterSet is a set of APILimiter indexed by name
func NewAPILimiterSet ¶
func NewAPILimiterSet(config map[string]string, defaults map[string]APILimiterParameters, metrics MetricsObserver) (*APILimiterSet, error)
NewAPILimiterSet creates a new APILimiterSet based on a set of rate limiting configurations and the default configuration. Any rate limiter that is configured in the config OR the defaults will be configured and made available via the Limiter(name) and Wait() functions.
func (*APILimiterSet) Limiter ¶
func (s *APILimiterSet) Limiter(name string) *APILimiter
Limiter returns the APILimiter with a given name
func (*APILimiterSet) Wait ¶
func (s *APILimiterSet) Wait(ctx context.Context, name string) (LimitedRequest, error)
Wait invokes Wait() on the APILimiter with the given name. If the limiter does not exist, a dummy limiter is used which will not impose any restrictions.
type DefaultAPILimiterSet ¶
type DefaultAPILimiterSet struct {
	sync.RWMutex
	*APILimiterSet
	// contains filtered or unexported fields
}
DefaultAPILimiterSet is an APILimiterSet that is a copy of the passed-in set. When the `Limiter()` function is called with a name for which no APILimiter exists, a new APILimiter is created with the default parameters.
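The create-on-first-use behavior described above, together with the embedded sync.RWMutex, suggests a read-then-upgrade locking pattern. The following is a minimal sketch of that pattern only; params, limiter, and defaultSet are hypothetical stand-ins, and the package's actual locking strategy is not shown on this page.

```go
package main

import (
	"fmt"
	"sync"
)

// params and limiter are hypothetical stand-ins for APILimiterParameters
// and APILimiter; only what the sketch needs is modeled.
type params struct{ RateBurst int }

type limiter struct {
	name string
	p    params
}

// defaultSet lazily creates limiters from a default parameter set.
type defaultSet struct {
	sync.RWMutex
	limiters map[string]*limiter
	defaults params
}

// newDefaultSet copies the existing limiters so the original map is untouched.
func newDefaultSet(existing map[string]*limiter, defaults params) *defaultSet {
	m := make(map[string]*limiter, len(existing))
	for k, v := range existing {
		m[k] = v
	}
	return &defaultSet{limiters: m, defaults: defaults}
}

// Limiter returns the named limiter, creating it from the defaults if missing.
func (s *defaultSet) Limiter(name string) *limiter {
	// Fast path: most lookups hit an existing entry under the read lock.
	s.RLock()
	l, ok := s.limiters[name]
	s.RUnlock()
	if ok {
		return l
	}

	// Slow path: re-check under the write lock, since another goroutine
	// may have created the entry between the two lock acquisitions.
	s.Lock()
	defer s.Unlock()
	if existing, ok := s.limiters[name]; ok {
		return existing
	}
	l = &limiter{name: name, p: s.defaults}
	s.limiters[name] = l
	return l
}

func main() {
	set := newDefaultSet(
		map[string]*limiter{"configured": &limiter{name: "configured", p: params{RateBurst: 1}}},
		params{RateBurst: 5},
	)
	fmt.Println(set.Limiter("configured").p.RateBurst) // uses explicit config
	fmt.Println(set.Limiter("unknown").p.RateBurst)    // falls back to defaults
}
```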
func APILimiterSetWrapDefault ¶
func APILimiterSetWrapDefault(limiterSet *APILimiterSet, defaultParam *APILimiterParameters) *DefaultAPILimiterSet
APILimiterSetWrapDefault wraps limiterSet in a new DefaultAPILimiterSet. When Limiter(name) is called with a name that is not present in the set, a new APILimiter is created from defaultParam.
func (*DefaultAPILimiterSet) Limiter ¶
func (s *DefaultAPILimiterSet) Limiter(name string) *APILimiter
Limiter returns the APILimiter with a given name
type LimitedRequest ¶
LimitedRequest represents a request that is being limited. It is returned by Wait(), and the caller of Wait() is responsible for calling Done() or Error() when the API call has been processed or has resulted in an error. It is safe to call Error() and then Done(). It is not safe to call Done(), Error(), or WaitDuration() concurrently.
type Limiter ¶
type Limiter struct {
// contains filtered or unexported fields
}
Limiter is used to limit the number of operations done.
func NewLimiter ¶
NewLimiter returns a new Limiter that allows events up to b tokens during the given interval. This Limiter is implemented differently from the Limiter in 'x/time/rate'. 'x/time/rate.Limiter' sends a constant stream of updates (at a rate of a few dozen events per second) over a period of N minutes, which is the behavior of the token bucket algorithm. It is designed to flatten bursts in a signal to a fixed output rate. This rate.Limiter does the opposite: it takes a somewhat fixed-rate stream of updates and turns it into a stream of controlled small bursts every N minutes.
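The difference from a token bucket can be illustrated with a small fixed-window sketch: up to b events pass immediately, and the budget is restored all at once when a new interval begins rather than trickling back token by token. This is not the package's implementation (its internals and the NewLimiter signature are not shown on this page); windowLimiter, allowAt, and simulate are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// windowLimiter is a hypothetical fixed-window limiter: it lets up to burst
// events through per interval and resets the whole budget at window rollover.
type windowLimiter struct {
	mu          sync.Mutex
	interval    time.Duration
	burst       int64
	used        int64
	windowStart time.Time
}

func newWindowLimiter(interval time.Duration, b int64) *windowLimiter {
	return &windowLimiter{interval: interval, burst: b}
}

// allowAt reports whether an event at time now fits in the current window.
// Taking the time as a parameter keeps the sketch deterministic.
func (l *windowLimiter) allowAt(now time.Time) bool {
	l.mu.Lock()
	defer l.mu.Unlock()

	// Start a new window on first use or once the interval has elapsed.
	if l.windowStart.IsZero() || now.Sub(l.windowStart) >= l.interval {
		l.windowStart = now
		l.used = 0
	}
	if l.used >= l.burst {
		return false
	}
	l.used++
	return true
}

// simulate replays events at the given millisecond offsets and counts how
// many of them are allowed.
func simulate(intervalMs int, burst int64, offsetsMs ...int) int {
	l := newWindowLimiter(time.Duration(intervalMs)*time.Millisecond, burst)
	base := time.Unix(0, 0)
	allowed := 0
	for _, ms := range offsetsMs {
		if l.allowAt(base.Add(time.Duration(ms) * time.Millisecond)) {
			allowed++
		}
	}
	return allowed
}

func main() {
	// Seven evenly spaced-ish events, one-second window, burst of two:
	// two pass in the first window and two more after the rollover.
	fmt.Println(simulate(1000, 2, 0, 10, 20, 1000, 1010, 1020, 1030))
}
```

A blocking variant would wait for the next window instead of returning false; the non-blocking form is used here only to keep the example short.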
func (*Limiter) Stop ¶
func (lim *Limiter) Stop()
Stop stops the internal components used for the rate limiter logic.
type MetricsObserver ¶
type MetricsObserver interface {
	// ProcessedRequest is invoked after invocation of an API call
	ProcessedRequest(name string, values MetricsValues)
}
MetricsObserver is the interface that must be implemented to extract metrics
var SimpleMetricsObserver MetricsObserver = &simpleMetricsObserver{}
SimpleMetricsObserver sets the metrics observer to a safe value
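A custom observer only needs the single ProcessedRequest method. The sketch below tallies requests, failures, and accumulated wait time per limiter name. Because the package's import path is not shown here, it declares local stand-ins: metricsValues is a trimmed, hypothetical copy of MetricsValues, and the limiter name "CreateENI" and the Outcome strings are made up for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// metricsValues is a trimmed, hypothetical stand-in for MetricsValues.
type metricsValues struct {
	WaitDuration    time.Duration
	ProcessDuration time.Duration
	Outcome         string
	Error           error
}

// metricsObserver mirrors the documented MetricsObserver method set.
type metricsObserver interface {
	ProcessedRequest(name string, values metricsValues)
}

// countingObserver tallies requests, failures, and wait time per limiter.
// The mutex makes it safe to share between concurrently used limiters.
type countingObserver struct {
	mu       sync.Mutex
	requests map[string]int
	failures map[string]int
	waited   map[string]time.Duration
}

func newCountingObserver() *countingObserver {
	return &countingObserver{
		requests: map[string]int{},
		failures: map[string]int{},
		waited:   map[string]time.Duration{},
	}
}

func (o *countingObserver) ProcessedRequest(name string, v metricsValues) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.requests[name]++
	o.waited[name] += v.WaitDuration
	if v.Error != nil {
		o.failures[name]++
	}
}

// Compile-time check that countingObserver satisfies the interface.
var _ metricsObserver = (*countingObserver)(nil)

// demo feeds two sample observations and returns the resulting counters.
func demo() (requests, failures int) {
	o := newCountingObserver()
	o.ProcessedRequest("CreateENI", metricsValues{WaitDuration: 5 * time.Millisecond, Outcome: "success"})
	o.ProcessedRequest("CreateENI", metricsValues{Outcome: "fail", Error: errors.New("throttled")})
	return o.requests["CreateENI"], o.failures["CreateENI"]
}

func main() {
	requests, failures := demo()
	fmt.Println("requests:", requests, "failures:", failures)
}
```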
type MetricsValues ¶
type MetricsValues struct {
	WaitDuration                time.Duration
	ProcessDuration             time.Duration
	TotalDuration               time.Duration
	Outcome                     string
	EstimatedProcessingDuration float64
	ParallelRequests            int
	Limit                       rate.Limit
	Burst                       int
	CurrentRequestsInFlight     int
	AdjustmentFactor            float64
	Error                       error
}
MetricsValues is the snapshot of relevant values to feed into the MetricsObserver
type ServiceLimiterManager ¶
type ServiceLimiterManager interface {
	Wait(ctx context.Context, name string) (LimitedRequest, error)
	Limiter(name string) *APILimiter
}
ServiceLimiterManager manages rate limiting for a set of services.