Documentation ¶
Index ¶
- Constants
- Variables
- func CalculateStartTimeMilli(curTimeMs int64, interval int64) int64
- func HasRegex(rule *namingpb.Rule) bool
- type BucketShareInfo
- type FlowQuotaAssistant
- func (f *FlowQuotaAssistant) AddWindowCount()
- func (f *FlowQuotaAssistant) CountRateLimitWindowSet() int
- func (f *FlowQuotaAssistant) DelWindowCount()
- func (f *FlowQuotaAssistant) DeleteRateLimitWindowSet(svcKey model.ServiceKey)
- func (f *FlowQuotaAssistant) Destroy()
- func (f *FlowQuotaAssistant) GetAllWindowSets() map[model.ServiceKey]*RateLimitWindowSet
- func (f *FlowQuotaAssistant) GetQuota(commonRequest *data.CommonRateLimitRequest) (*model.QuotaFutureImpl, error)
- func (f *FlowQuotaAssistant) GetRateLimitWindow(svcKey model.ServiceKey, rule *namingpb.Rule, label string) (*RateLimitWindowSet, *RateLimitWindow)
- func (f *FlowQuotaAssistant) GetRateLimitWindowSet(svcKey model.ServiceKey, create bool) *RateLimitWindowSet
- func (f *FlowQuotaAssistant) GetWindowCount() int32
- func (f *FlowQuotaAssistant) Init(engine model.Engine, cfg config.Configuration, supplier plugin.Supplier) error
- func (f *FlowQuotaAssistant) IsDestroyed() bool
- func (f *FlowQuotaAssistant) OnServiceDeleted(event *common.PluginEvent) error
- func (f *FlowQuotaAssistant) OnServiceUpdated(event *common.PluginEvent) error
- func (f *FlowQuotaAssistant) TaskValues() model.TaskValues
- type LimitMode
- type RateLimitGauge
- type RateLimitType
- type RateLimitWindow
- func (r *RateLimitWindow) AllocateQuota() (*model.QuotaFutureImpl, error)
- func (r *RateLimitWindow) AsyncRateLimitConnector() serverconnector.AsyncRateLimitConnector
- func (r *RateLimitWindow) CasStatus(oldStatus int64, status int64) bool
- func (r *RateLimitWindow) CompareTo(another interface{}) int
- func (r *RateLimitWindow) DoAsyncRemoteAcquire() error
- func (r *RateLimitWindow) DoAsyncRemoteInit() error
- func (r *RateLimitWindow) Engine() model.Engine
- func (r *RateLimitWindow) EnsureDeleted(value interface{}) bool
- func (r *RateLimitWindow) Expired(nowMilli int64) bool
- func (r *RateLimitWindow) GetLastAccessTimeMilli() int64
- func (r *RateLimitWindow) GetStatus() int64
- func (r *RateLimitWindow) Init()
- func (r *RateLimitWindow) InitializeRequest() *rlimitV2.RateLimitInitRequest
- func (r *RateLimitWindow) OnInitResponse(counter *rlimitV2.QuotaCounter, duration time.Duration, srvTimeMilli int64)
- func (r *RateLimitWindow) OnReportResponse(counter *rlimitV2.QuotaLeft, duration time.Duration, curTimeMilli int64)
- func (r *RateLimitWindow) SetStatus(status int64)
- type RateLimitWindowSet
- func (rs *RateLimitWindowSet) AddRateLimitWindow(commonRequest *data.CommonRateLimitRequest, rule *namingpb.Rule, ...) (*RateLimitWindow, error)
- func (rs *RateLimitWindowSet) GetRateLimitWindow(rule *namingpb.Rule, flatLabels string) *RateLimitWindow
- func (rs *RateLimitWindowSet) GetRateLimitWindows() []*RateLimitWindow
- func (rs *RateLimitWindowSet) OnServiceUpdated(svcEventObject *common.ServiceEventObject)
- func (rs *RateLimitWindowSet) OnWindowExpired(nowMilli int64, window *RateLimitWindow) bool
- func (rs *RateLimitWindowSet) PurgeWindows(nowMilli int64)
- type RemoteAwareBucket
- type RemoteAwareQpsBucket
- func (r *RemoteAwareQpsBucket) Allocate() *model.QuotaResponse
- func (r *RemoteAwareQpsBucket) GetQuotaUsed(curTimeMilli int64) *UsageInfo
- func (r *RemoteAwareQpsBucket) GetTokenBuckets() TokenBuckets
- func (r *RemoteAwareQpsBucket) Release()
- func (r *RemoteAwareQpsBucket) SetRemoteQuota(remoteQuotas *RemoteQuotaResult)
- func (r *RemoteAwareQpsBucket) UpdateTimeDiff(timeDiff int64)
- type RemoteErrorContainer
- type RemoteQuotaCallBack
- type RemoteQuotaResult
- type RemoteSyncParam
- type ReportElements
- type SlidingWindow
- func (s *SlidingWindow) AcquireCurrentValues(curTimeMs int64) (uint32, uint32, *Window)
- func (s *SlidingWindow) AddAndGetCurrentLimited(curTimeMs int64, value uint32) (uint32, *Window)
- func (s *SlidingWindow) AddAndGetCurrentPassed(curTimeMs int64, value uint32) (uint32, *Window)
- func (s *SlidingWindow) TouchCurrentPassed(curTimeMs int64) (uint32, *Window)
- type StatisticsBucket
- type TokenBucket
- func (t *TokenBucket) ConfirmLimited(limited uint32, nowMilli int64)
- func (t *TokenBucket) ConfirmPassed(passed uint32, nowMilli int64)
- func (t *TokenBucket) GetRuleTotal() int64
- func (t *TokenBucket) GiveBackToken(identifier *UpdateIdentifier, token int64, mode TokenBucketMode)
- func (t *TokenBucket) TryAllocateToken(token uint32, nowMilli int64, identifier *UpdateIdentifier, ...) (int64, TokenBucketMode)
- func (t *TokenBucket) UpdateRemoteClientCount(remoteQuotas *RemoteQuotaResult)
- func (t *TokenBucket) UpdateRemoteToken(remoteQuotas *RemoteQuotaResult, updateClient bool)
- type TokenBucketMode
- type TokenBuckets
- type UpdateIdentifier
- type UsageInfo
- type Window
- type WindowContainer
Constants ¶
const ( Disabled = "rateLimit disabled" RuleNotExists = "quota rule not exists" )
const ( //刚创建, 无需进行后台调度 Created int64 = iota //已获取调度权,准备开始调度 Initializing //已经在远程初始化结束 Initialized //已经删除 Deleted )
Variables ¶
var ( // 淘汰因子,过期时间=MaxDuration + ExpireFactor ExpireFactor = 1 * time.Second DefaultStatisticReportPeriod = 1 * time.Second )
超过多长时间后进行淘汰,淘汰后需要重新init
Functions ¶
func CalculateStartTimeMilli ¶
计算起始滑窗
Types ¶
type BucketShareInfo ¶
type BucketShareInfo struct {
// contains filtered or unexported fields
}
通用信息
type FlowQuotaAssistant ¶
type FlowQuotaAssistant struct {
// contains filtered or unexported fields
}
限额流程的辅助类
func (*FlowQuotaAssistant) AddWindowCount ¶
func (f *FlowQuotaAssistant) AddWindowCount()
func (*FlowQuotaAssistant) CountRateLimitWindowSet ¶
func (f *FlowQuotaAssistant) CountRateLimitWindowSet() int
获取分配窗口集合数量,只用于测试
func (*FlowQuotaAssistant) DelWindowCount ¶
func (f *FlowQuotaAssistant) DelWindowCount()
func (*FlowQuotaAssistant) DeleteRateLimitWindowSet ¶
func (f *FlowQuotaAssistant) DeleteRateLimitWindowSet(svcKey model.ServiceKey)
获取配额分配窗口集合
func (*FlowQuotaAssistant) Destroy ¶
func (f *FlowQuotaAssistant) Destroy()
func (*FlowQuotaAssistant) GetAllWindowSets ¶
func (f *FlowQuotaAssistant) GetAllWindowSets() map[model.ServiceKey]*RateLimitWindowSet
获取当前所有的限流窗口集合
func (*FlowQuotaAssistant) GetQuota ¶
func (f *FlowQuotaAssistant) GetQuota(commonRequest *data.CommonRateLimitRequest) (*model.QuotaFutureImpl, error)
获取配额
func (*FlowQuotaAssistant) GetRateLimitWindow ¶
func (f *FlowQuotaAssistant) GetRateLimitWindow(svcKey model.ServiceKey, rule *namingpb.Rule, label string) (*RateLimitWindowSet, *RateLimitWindow)
获取配额分配窗口
func (*FlowQuotaAssistant) GetRateLimitWindowSet ¶
func (f *FlowQuotaAssistant) GetRateLimitWindowSet(svcKey model.ServiceKey, create bool) *RateLimitWindowSet
获取配额分配窗口集合
func (*FlowQuotaAssistant) GetWindowCount ¶
func (f *FlowQuotaAssistant) GetWindowCount() int32
func (*FlowQuotaAssistant) Init ¶
func (f *FlowQuotaAssistant) Init(engine model.Engine, cfg config.Configuration, supplier plugin.Supplier) error
初始化限额辅助
func (*FlowQuotaAssistant) IsDestroyed ¶
func (f *FlowQuotaAssistant) IsDestroyed() bool
func (*FlowQuotaAssistant) OnServiceDeleted ¶
func (f *FlowQuotaAssistant) OnServiceDeleted(event *common.PluginEvent) error
服务删除回调
func (*FlowQuotaAssistant) OnServiceUpdated ¶
func (f *FlowQuotaAssistant) OnServiceUpdated(event *common.PluginEvent) error
服务更新回调,找到具体的限流窗口集合,然后触发更新
func (*FlowQuotaAssistant) TaskValues ¶
func (f *FlowQuotaAssistant) TaskValues() model.TaskValues
获取调度任务
type RateLimitGauge ¶
type RateLimitGauge struct { model.EmptyInstanceGauge Window *RateLimitWindow Namespace string Service string Type RateLimitType //限流周期, 单位秒 Duration uint32 //限流发生时的mode, 和plugin的pb要保持一致 LimitModeType LimitMode }
限流统计gauge
type RateLimitType ¶
type RateLimitType int
限流的类型
const ( TrafficShapingLimited RateLimitType = 0 QuotaLimited RateLimitType = 1 WindowDeleted RateLimitType = 2 //QuotaRequested RateLimitType = 3 QuotaGranted RateLimitType = 4 )
type RateLimitWindow ¶
type RateLimitWindow struct { //配额窗口集合 WindowSet *RateLimitWindowSet // 服务信息 SvcKey model.ServiceKey // 正则对应的label Labels string // 已经匹配到的限流规则,没有匹配则为空 // 由于可能会出现规则并没有发生变化,但是缓存对象更新的情况,因此这里使用原子变量 Rule *namingpb.Rule //其他插件在这里添加的相关数据,一般是统计插件使用 PluginData map[int32]interface{} // contains filtered or unexported fields }
限流窗口
func NewRateLimitWindow ¶
func NewRateLimitWindow(windowSet *RateLimitWindowSet, rule *namingpb.Rule, commonRequest *data.CommonRateLimitRequest, labels string) (*RateLimitWindow, error)
创建限流窗口
func (*RateLimitWindow) AllocateQuota ¶
func (r *RateLimitWindow) AllocateQuota() (*model.QuotaFutureImpl, error)
分配配额
func (*RateLimitWindow) AsyncRateLimitConnector ¶
func (r *RateLimitWindow) AsyncRateLimitConnector() serverconnector.AsyncRateLimitConnector
获取异步连接器
func (*RateLimitWindow) CasStatus ¶
func (r *RateLimitWindow) CasStatus(oldStatus int64, status int64) bool
CAS设置状态
func (*RateLimitWindow) CompareTo ¶
func (r *RateLimitWindow) CompareTo(another interface{}) int
比较两个窗口是否相同
func (*RateLimitWindow) DoAsyncRemoteAcquire ¶
func (r *RateLimitWindow) DoAsyncRemoteAcquire() error
异步发送 acquire
func (*RateLimitWindow) DoAsyncRemoteInit ¶
func (r *RateLimitWindow) DoAsyncRemoteInit() error
异步处理发送init
func (*RateLimitWindow) EnsureDeleted ¶
func (r *RateLimitWindow) EnsureDeleted(value interface{}) bool
删除前进行检查,返回true才删除,该检查是同步操作
func (*RateLimitWindow) GetLastAccessTimeMilli ¶
func (r *RateLimitWindow) GetLastAccessTimeMilli() int64
获取最近访问时间
func (*RateLimitWindow) InitializeRequest ¶
func (r *RateLimitWindow) InitializeRequest() *rlimitV2.RateLimitInitRequest
转换成限流PB初始化消息
func (*RateLimitWindow) OnInitResponse ¶
func (r *RateLimitWindow) OnInitResponse(counter *rlimitV2.QuotaCounter, duration time.Duration, srvTimeMilli int64)
应答回调函数
func (*RateLimitWindow) OnReportResponse ¶
func (r *RateLimitWindow) OnReportResponse(counter *rlimitV2.QuotaLeft, duration time.Duration, curTimeMilli int64)
应答回调函数
type RateLimitWindowSet ¶
type RateLimitWindowSet struct {
// contains filtered or unexported fields
}
限流分配窗口的缓存
func NewRateLimitWindowSet ¶
func NewRateLimitWindowSet(assistant *FlowQuotaAssistant) *RateLimitWindowSet
构造函数
func (*RateLimitWindowSet) AddRateLimitWindow ¶
func (rs *RateLimitWindowSet) AddRateLimitWindow( commonRequest *data.CommonRateLimitRequest, rule *namingpb.Rule, flatLabels string) (*RateLimitWindow, error)
添加限流窗口
func (*RateLimitWindowSet) GetRateLimitWindow ¶
func (rs *RateLimitWindowSet) GetRateLimitWindow(rule *namingpb.Rule, flatLabels string) *RateLimitWindow
获取限流窗口
func (*RateLimitWindowSet) GetRateLimitWindows ¶
func (rs *RateLimitWindowSet) GetRateLimitWindows() []*RateLimitWindow
拷贝一份只读数据
func (*RateLimitWindowSet) OnServiceUpdated ¶
func (rs *RateLimitWindowSet) OnServiceUpdated(svcEventObject *common.ServiceEventObject)
服务更新回调
func (*RateLimitWindowSet) OnWindowExpired ¶
func (rs *RateLimitWindowSet) OnWindowExpired(nowMilli int64, window *RateLimitWindow) bool
窗口过期
func (*RateLimitWindowSet) PurgeWindows ¶
func (rs *RateLimitWindowSet) PurgeWindows(nowMilli int64)
执行窗口淘汰
type RemoteAwareBucket ¶
type RemoteAwareBucket interface { // 父接口,执行用户配额分配操作 model.QuotaAllocator //设置通过限流服务端获取的远程配额 SetRemoteQuota(*RemoteQuotaResult) // 获取已经分配的配额 GetQuotaUsed(curTimeMilli int64) *UsageInfo //获取TokenBuckets GetTokenBuckets() TokenBuckets //更新时间间隔 UpdateTimeDiff(timeDiff int64) }
远程配额分配的令牌桶
type RemoteAwareQpsBucket ¶
type RemoteAwareQpsBucket struct {
// contains filtered or unexported fields
}
远程配额分配的算法桶
func NewRemoteAwareQpsBucket ¶
func NewRemoteAwareQpsBucket(window *RateLimitWindow) *RemoteAwareQpsBucket
创建QPS远程限流窗口
func (*RemoteAwareQpsBucket) Allocate ¶
func (r *RemoteAwareQpsBucket) Allocate() *model.QuotaResponse
执行配额分配操作
func (*RemoteAwareQpsBucket) GetQuotaUsed ¶
func (r *RemoteAwareQpsBucket) GetQuotaUsed(curTimeMilli int64) *UsageInfo
func (*RemoteAwareQpsBucket) GetTokenBuckets ¶
func (r *RemoteAwareQpsBucket) GetTokenBuckets() TokenBuckets
func (*RemoteAwareQpsBucket) SetRemoteQuota ¶
func (r *RemoteAwareQpsBucket) SetRemoteQuota(remoteQuotas *RemoteQuotaResult)
设置通过限流服务端获取的远程QPS
func (*RemoteAwareQpsBucket) UpdateTimeDiff ¶
func (r *RemoteAwareQpsBucket) UpdateTimeDiff(timeDiff int64)
更新时间间隔
type RemoteErrorContainer ¶
type RemoteErrorContainer struct {
// contains filtered or unexported fields
}
远程访问的错误信息
type RemoteQuotaCallBack ¶
type RemoteQuotaCallBack struct {
// contains filtered or unexported fields
}
远程配额查询任务
func NewRemoteQuotaCallback ¶
func NewRemoteQuotaCallback(cfg config.Configuration, supplier plugin.Supplier, engine model.Engine) (*RemoteQuotaCallBack, error)
创建查询任务
func (*RemoteQuotaCallBack) OnTaskEvent ¶
func (r *RemoteQuotaCallBack) OnTaskEvent(event model.TaskEvent)
OnTaskEvent 任务事件回调
func (*RemoteQuotaCallBack) Process ¶
func (r *RemoteQuotaCallBack) Process( taskKey interface{}, taskValue interface{}, lastProcessTime time.Time) model.TaskResult
处理远程配额查询任务
type RemoteQuotaResult ¶
type RemoteQuotaResult struct { Left int64 ClientCount uint32 ServerTimeMilli int64 DurationMill int64 }
远程下发配额
type SlidingWindow ¶
type SlidingWindow struct {
// contains filtered or unexported fields
}
滑窗通用实现
func (*SlidingWindow) AcquireCurrentValues ¶
func (s *SlidingWindow) AcquireCurrentValues(curTimeMs int64) (uint32, uint32, *Window)
获取上报数据
func (*SlidingWindow) AddAndGetCurrentLimited ¶
func (s *SlidingWindow) AddAndGetCurrentLimited(curTimeMs int64, value uint32) (uint32, *Window)
原子增加,并返回当前bucket
func (*SlidingWindow) AddAndGetCurrentPassed ¶
func (s *SlidingWindow) AddAndGetCurrentPassed(curTimeMs int64, value uint32) (uint32, *Window)
原子增加,并返回当前bucket
func (*SlidingWindow) TouchCurrentPassed ¶
func (s *SlidingWindow) TouchCurrentPassed(curTimeMs int64) (uint32, *Window)
获取上报数据
type StatisticsBucket ¶
type StatisticsBucket struct {
// contains filtered or unexported fields
}
用于metric report 统计
func (*StatisticsBucket) AddCount ¶
func (b *StatisticsBucket) AddCount(isLimit bool, now time.Time)
AddCount,外面使用该接口
func (*StatisticsBucket) AddCountByUnixTime ¶
func (b *StatisticsBucket) AddCountByUnixTime(isLimit bool, now int64)
func (*StatisticsBucket) GetReportData ¶
func (b *StatisticsBucket) GetReportData(periodTime int64) *ReportElements
GetReportData
type TokenBucket ¶
type TokenBucket struct { UpdateIdentifier // contains filtered or unexported fields }
令牌桶
func NewTokenBucket ¶
func NewTokenBucket( windowKey string, validDuration time.Duration, tokenAmount uint32, shareInfo *BucketShareInfo) *TokenBucket
创建令牌桶
func (*TokenBucket) ConfirmLimited ¶
func (t *TokenBucket) ConfirmLimited(limited uint32, nowMilli int64)
记录限流分配配额
func (*TokenBucket) ConfirmPassed ¶
func (t *TokenBucket) ConfirmPassed(passed uint32, nowMilli int64)
记录真实分配配额
func (*TokenBucket) GiveBackToken ¶
func (t *TokenBucket) GiveBackToken(identifier *UpdateIdentifier, token int64, mode TokenBucketMode)
归还配额
func (*TokenBucket) TryAllocateToken ¶
func (t *TokenBucket) TryAllocateToken( token uint32, nowMilli int64, identifier *UpdateIdentifier, mode TokenBucketMode) (int64, TokenBucketMode)
尝试分配配额
func (*TokenBucket) UpdateRemoteClientCount ¶
func (t *TokenBucket) UpdateRemoteClientCount(remoteQuotas *RemoteQuotaResult)
只更新远程客户端数量,不更新配额
func (*TokenBucket) UpdateRemoteToken ¶
func (t *TokenBucket) UpdateRemoteToken(remoteQuotas *RemoteQuotaResult, updateClient bool)
更新远程配额
type TokenBucketMode ¶
type TokenBucketMode int
const ( Unknown TokenBucketMode = iota Remote RemoteToLocal Local )
type UpdateIdentifier ¶
type UpdateIdentifier struct {
// contains filtered or unexported fields
}
令牌桶是否进行更新的凭证
type UsageInfo ¶
type UsageInfo struct { //配额使用时间 CurTimeMilli int64 //配额使用详情 Passed map[int64]uint32 //限流情况 Limited map[int64]uint32 }
配额使用信息
type WindowContainer ¶
type WindowContainer struct { //主窗口,非正则表达式的适用 MainWindow *RateLimitWindow //适用于正则表达式展开的 WindowByLabel map[string]*RateLimitWindow }
窗口容器
func (*WindowContainer) GetRateLimitWindows ¶
func (w *WindowContainer) GetRateLimitWindows() []*RateLimitWindow
获取限流滑窗