Documentation ¶
Index ¶
- Constants
- func WaitReservations(ctx context.Context, now time.Time, reservations []*Reservation) (time.Duration, error)
- type Config
- type ControllerConfig
- type KVCalculator
- type Limit
- type Limiter
- func (lim *Limiter) AvailableTokens(now time.Time) float64
- func (lim *Limiter) GetBurst() int64
- func (lim *Limiter) IsLowTokens() bool
- func (lim *Limiter) Limit() Limit
- func (lim *Limiter) Reconfigure(now time.Time, args tokenBucketReconfigureArgs, opts ...LimiterOption)
- func (lim *Limiter) RemoveTokens(now time.Time, amount float64)
- func (lim *Limiter) Reserve(ctx context.Context, waitDuration time.Duration, now time.Time, n float64) *Reservation
- func (lim *Limiter) ResetRemainingNotifyTimes()
- func (lim *Limiter) SetupNotificationThreshold(now time.Time, threshold float64)
- type LimiterOption
- type RequestInfo
- type RequestUnit
- type RequestUnitConfig
- type Reservation
- type ResourceCalculator
- type ResourceControlCreateOption
- type ResourceGroupKVInterceptor
- type ResourceGroupProvider
- type ResourceGroupsController
- func (c *ResourceGroupsController) CheckResourceGroupExist(name string) bool
- func (c *ResourceGroupsController) GetConfig() *Config
- func (c *ResourceGroupsController) OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, error)
- func (c *ResourceGroupsController) OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
- func (c *ResourceGroupsController) Start(ctx context.Context)
- func (c *ResourceGroupsController) Stop() error
- type ResponseInfo
- type SQLCalculator
- type TestRequestInfo
- type TestResponseInfo
Constants ¶
const ( FromPeriodReport = "period_report" FromLowRU = "low_ru" )
Source List
const Inf = Limit(math.MaxFloat64)
Inf is the infinite rate limit; it allows all events (even if burst is zero).
const InfDuration = time.Duration(1<<63 - 1)
InfDuration is the duration returned by Delay when a Reservation is not OK.
Variables ¶
This section is empty.
Functions ¶
func WaitReservations ¶
func WaitReservations(ctx context.Context, now time.Time, reservations []*Reservation) (time.Duration, error)
WaitReservations processes a series of reservations so that all limiter tokens are returned if any one reservation fails.
Types ¶
type Config ¶
type Config struct {
	// RU model config
	ReadBaseCost   RequestUnit
	ReadBytesCost  RequestUnit
	WriteBaseCost  RequestUnit
	WriteBytesCost RequestUnit
	CPUMsCost      RequestUnit

	DegradedModeWaitDuration time.Duration
	// contains filtered or unexported fields
}
Config is the configuration of the resource units, which gives the read/write request units or request resource cost standards. It should be calculated by a given `RequestUnitConfig` or `RequestResourceConfig`.
func GenerateConfig ¶
func GenerateConfig(config *ControllerConfig) *Config
GenerateConfig generates the configuration by the given request unit configuration.
type ControllerConfig ¶
type ControllerConfig struct {
	// EnableDegradedMode is to control whether the resource control client enables degraded mode when the server is disconnected.
	DegradedModeWaitDuration string `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`

	// RequestUnit is the configuration that determines the coefficients of the RRU and WRU cost.
	// This configuration should be modified carefully.
	RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`
}
ControllerConfig is the configuration of the resource manager controller, which includes the options needed by the client.
func DefaultControllerConfig ¶
func DefaultControllerConfig() *ControllerConfig
DefaultControllerConfig returns the default resource manager controller configuration.
type KVCalculator ¶
type KVCalculator struct {
*Config
}
KVCalculator is used to calculate the KV-side consumption.
func (*KVCalculator) AfterKVRequest ¶
func (kc *KVCalculator) AfterKVRequest(consumption *rmpb.Consumption, req RequestInfo, res ResponseInfo)
AfterKVRequest ...
func (*KVCalculator) BeforeKVRequest ¶
func (kc *KVCalculator) BeforeKVRequest(consumption *rmpb.Consumption, req RequestInfo)
BeforeKVRequest ...
type Limit ¶
type Limit float64
Limit defines the maximum frequency of some events. Limit is represented as number of events per second. A zero Limit allows no events.
type Limiter ¶
type Limiter struct {
// contains filtered or unexported fields
}
A Limiter controls how frequently events are allowed to happen. It implements a "token bucket" of size b, initially full and refilled at rate r tokens per second. Informally, in any large enough time interval, the Limiter limits the rate to r tokens per second, with a maximum burst size of b events. As a special case, if r == Inf (the infinite rate), b is ignored. See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
The zero value is a valid Limiter, but it will reject all events. Use NewLimiter to create non-zero Limiters.
Limiter has one main method, Reserve. If no token is available, Reserve returns a reservation for a future token and the amount of time the caller must wait before using it, or until its associated context.Context is canceled.
Changes to the meaning of burst (b):
- If b == 0, the limiter has unlimited capacity. This is the default in the resource controller (tokens refill at rate r with no cap on capacity).
- If b < 0, the limiter has unlimited capacity and r is ignored; this is equivalent to r == Inf (bursting with unlimited capacity).
- If b > 0, the limiter has limited capacity.
func NewLimiter ¶
func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan<- struct{}) *Limiter
NewLimiter returns a new Limiter that allows events up to rate r and permits bursts of at most b tokens.
func NewLimiterWithCfg ¶
func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- struct{}) *Limiter
NewLimiterWithCfg returns a new Limiter that allows events up to rate r and permits bursts of at most b tokens.
func (*Limiter) AvailableTokens ¶
func (lim *Limiter) AvailableTokens(now time.Time) float64
AvailableTokens returns the amount of tokens currently available.
func (*Limiter) IsLowTokens ¶
func (lim *Limiter) IsLowTokens() bool
IsLowTokens reports whether the limiter is running low on tokens.
func (*Limiter) Reconfigure ¶
func (lim *Limiter) Reconfigure(now time.Time, args tokenBucketReconfigureArgs, opts ...LimiterOption)
Reconfigure modifies all settings of the limiter.
func (*Limiter) RemoveTokens ¶
func (lim *Limiter) RemoveTokens(now time.Time, amount float64)
RemoveTokens decreases the amount of tokens currently available.
func (*Limiter) Reserve ¶
func (lim *Limiter) Reserve(ctx context.Context, waitDuration time.Duration, now time.Time, n float64) *Reservation
Reserve returns a Reservation that indicates how long the caller must wait before n events happen. The Limiter takes this Reservation into account when allowing future events. The returned Reservation's OK() method returns false if the wait duration exceeds the deadline. Usage example:
r := lim.Reserve(ctx, waitDuration, time.Now(), 1)
if !r.OK() {
	// Not allowed to act! The required wait exceeds the maximum wait time.
	return
}
time.Sleep(r.Delay())
Act()
Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
func (*Limiter) ResetRemainingNotifyTimes ¶
func (lim *Limiter) ResetRemainingNotifyTimes()
type LimiterOption ¶
type LimiterOption func(*Limiter)
type RequestInfo ¶
RequestInfo is the interface of the request information provider. A request should be able to tell whether it's a write request and if so, the written bytes would also be provided.
type RequestUnit ¶
type RequestUnit float64
RequestUnit is the basic unit of the resource request management, which has two types:
- RRU: read request unit
- WRU: write request unit
type RequestUnitConfig ¶
type RequestUnitConfig struct {
	// ReadBaseCost is the base cost for a read request. No matter how many bytes are read/written or
	// how much CPU time a request takes, this cost is inevitable.
	ReadBaseCost float64 `toml:"read-base-cost" json:"read-base-cost"`
	// ReadCostPerByte is the cost for each byte read. It's 1 RU = 64 KiB by default.
	ReadCostPerByte float64 `toml:"read-cost-per-byte" json:"read-cost-per-byte"`
	// WriteBaseCost is the base cost for a write request. No matter how many bytes are read/written or
	// how much CPU time a request takes, this cost is inevitable.
	WriteBaseCost float64 `toml:"write-base-cost" json:"write-base-cost"`
	// WriteCostPerByte is the cost for each byte written. It's 1 RU = 1 KiB by default.
	WriteCostPerByte float64 `toml:"write-cost-per-byte" json:"write-cost-per-byte"`
	// CPUMsCost is the cost for each millisecond of CPU time taken.
	// It's 1 RU = 3 milliseconds by default.
	CPUMsCost float64 `toml:"read-cpu-ms-cost" json:"read-cpu-ms-cost"`
}
RequestUnitConfig is the configuration of the request units, which determines the coefficients of the RRU and WRU cost. This configuration should be modified carefully.
func DefaultRequestUnitConfig ¶
func DefaultRequestUnitConfig() RequestUnitConfig
DefaultRequestUnitConfig returns the default request unit configuration.
type Reservation ¶
type Reservation struct {
// contains filtered or unexported fields
}
A Reservation holds information about events that are permitted by a Limiter to happen after a delay. A Reservation may be canceled, which may enable the Limiter to permit additional events.
func (*Reservation) CancelAt ¶
func (r *Reservation) CancelAt(now time.Time)
CancelAt indicates that the reservation holder will not perform the reserved action, and returns the reserved tokens to the limiter.
func (*Reservation) Delay ¶
func (r *Reservation) Delay() time.Duration
Delay is shorthand for DelayFrom(time.Now()).
func (*Reservation) DelayFrom ¶
func (r *Reservation) DelayFrom(now time.Time) time.Duration
DelayFrom returns the duration for which the reservation holder must wait before taking the reserved action. Zero duration means act immediately. InfDuration means the limiter cannot grant the tokens requested in this Reservation within the maximum wait time.
func (*Reservation) OK ¶
func (r *Reservation) OK() bool
OK returns whether the limiter can provide the requested number of tokens within the maximum wait time. If OK is false, Delay returns InfDuration, and CancelAt does nothing.
type ResourceCalculator ¶
type ResourceCalculator interface {
	// Trickle is used to calculate the resource consumption periodically rather than on the request path.
	// It's mainly used to calculate costs such as the SQL CPU cost.
	// Need to check if it is a serverless environment
	Trickle(*rmpb.Consumption)
	// BeforeKVRequest is used to calculate the resource consumption before the KV request.
	// It's mainly used to calculate the base and write request cost.
	BeforeKVRequest(*rmpb.Consumption, RequestInfo)
	// AfterKVRequest is used to calculate the resource consumption after the KV request.
	// It's mainly used to calculate the read request cost and KV CPU cost.
	AfterKVRequest(*rmpb.Consumption, RequestInfo, ResponseInfo)
}
ResourceCalculator is used to calculate the resource consumption of a request.
type ResourceControlCreateOption ¶
type ResourceControlCreateOption func(controller *ResourceGroupsController)
ResourceControlCreateOption creates a ResourceGroupsController with the optional settings.
func EnableSingleGroupByKeyspace ¶
func EnableSingleGroupByKeyspace() ResourceControlCreateOption
EnableSingleGroupByKeyspace is the option to enable single group by keyspace feature.
func WithMaxWaitDuration ¶
func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption
WithMaxWaitDuration is the option to set the max wait duration for acquiring token buckets.
type ResourceGroupKVInterceptor ¶
type ResourceGroupKVInterceptor interface {
	// OnRequestWait is used to check whether the resource group has enough tokens. It may need to wait for some time.
	OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, error)
	// OnResponse is used to consume tokens after receiving the response.
	OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
}
ResourceGroupKVInterceptor is used as a quota limit controller for resource groups using the KV store.
type ResourceGroupProvider ¶
type ResourceGroupProvider interface {
	ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error)
	GetResourceGroup(ctx context.Context, resourceGroupName string) (*rmpb.ResourceGroup, error)
	AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
	ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
	DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
	AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
	LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]tm.GlobalConfigItem, int64, error)
}
ResourceGroupProvider provides some APIs to interact with the resource manager server.
type ResourceGroupsController ¶
type ResourceGroupsController struct {
// contains filtered or unexported fields
}
ResourceGroupsController implements ResourceGroupKVInterceptor.
func NewResourceGroupController ¶
func NewResourceGroupController(
	ctx context.Context,
	clientUniqueID uint64,
	provider ResourceGroupProvider,
	requestUnitConfig *RequestUnitConfig,
	opts ...ResourceControlCreateOption,
) (*ResourceGroupsController, error)
NewResourceGroupController returns a new ResourceGroupsController, which implements ResourceGroupKVInterceptor.
func (*ResourceGroupsController) CheckResourceGroupExist ¶
func (c *ResourceGroupsController) CheckResourceGroupExist(name string) bool
CheckResourceGroupExist checks if groupsController map {rg.name -> resource group controller} contains name. Used for test only.
func (*ResourceGroupsController) GetConfig ¶
func (c *ResourceGroupsController) GetConfig() *Config
GetConfig returns the config of controller. It's only used for test.
func (*ResourceGroupsController) OnRequestWait ¶
func (c *ResourceGroupsController) OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, error)
OnRequestWait is used to check whether the resource group has enough tokens. It may need to wait for some time.
func (*ResourceGroupsController) OnResponse ¶
func (c *ResourceGroupsController) OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
OnResponse is used to consume tokens after receiving the response.
func (*ResourceGroupsController) Start ¶
func (c *ResourceGroupsController) Start(ctx context.Context)
Start starts the ResourceGroupsController service.
func (*ResourceGroupsController) Stop ¶
func (c *ResourceGroupsController) Stop() error
Stop stops the ResourceGroupsController service.
type ResponseInfo ¶
type ResponseInfo interface {
	ReadBytes() uint64
	KVCPU() time.Duration
	// Succeed is used to tell whether the request is successfully returned.
	// If not, we need to pay back the WRU cost of the request.
	Succeed() bool
}
ResponseInfo is the interface of the response information provider. A response should be able to tell how many bytes it read and KV CPU cost in milliseconds.
type SQLCalculator ¶
type SQLCalculator struct {
*Config
}
SQLCalculator is used to calculate the SQL-side consumption.
func (*SQLCalculator) AfterKVRequest ¶
func (dsc *SQLCalculator) AfterKVRequest(consumption *rmpb.Consumption, req RequestInfo, res ResponseInfo)
AfterKVRequest ...
func (*SQLCalculator) BeforeKVRequest ¶
func (dsc *SQLCalculator) BeforeKVRequest(consumption *rmpb.Consumption, req RequestInfo)
BeforeKVRequest ...
func (*SQLCalculator) Trickle ¶
func (dsc *SQLCalculator) Trickle(consumption *rmpb.Consumption)
Trickle updates the SQL-layer CPU consumption.
type TestRequestInfo ¶
type TestRequestInfo struct {
// contains filtered or unexported fields
}
TestRequestInfo is used to test the request info interface.
func NewTestRequestInfo ¶
func NewTestRequestInfo(isWrite bool, writeBytes uint64, storeID uint64) *TestRequestInfo
NewTestRequestInfo creates a new TestRequestInfo.
func (*TestRequestInfo) IsWrite ¶
func (tri *TestRequestInfo) IsWrite() bool
IsWrite implements the RequestInfo interface.
func (*TestRequestInfo) StoreID ¶
func (tri *TestRequestInfo) StoreID() uint64
StoreID implements the RequestInfo interface.
func (*TestRequestInfo) WriteBytes ¶
func (tri *TestRequestInfo) WriteBytes() uint64
WriteBytes implements the RequestInfo interface.
type TestResponseInfo ¶
type TestResponseInfo struct {
// contains filtered or unexported fields
}
TestResponseInfo is used to test the response info interface.
func NewTestResponseInfo ¶
func NewTestResponseInfo(readBytes uint64, kvCPU time.Duration, succeed bool) *TestResponseInfo
func (*TestResponseInfo) KVCPU ¶
func (tri *TestResponseInfo) KVCPU() time.Duration
KVCPU implements the ResponseInfo interface.
func (*TestResponseInfo) ReadBytes ¶
func (tri *TestResponseInfo) ReadBytes() uint64
ReadBytes implements the ResponseInfo interface.
func (*TestResponseInfo) Succeed ¶
func (tri *TestResponseInfo) Succeed() bool
Succeed implements the ResponseInfo interface.