quota

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 18, 2021 License: Apache-2.0, BSD-2-Clause, BSD-3-Clause, + 3 more Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Disabled      = "rateLimit disabled"
	RuleNotExists = "quota rule not exists"
)
View Source
const (
	//刚创建, 无需进行后台调度
	Created int64 = iota
	//已获取调度权,准备开始调度
	Initializing
	//已经在远程初始化结束
	Initialized
	//已经删除
	Deleted
)

Variables

View Source
var (
	// 淘汰因子,过期时间=MaxDuration + ExpireFactor
	ExpireFactor = 1 * time.Second

	DefaultStatisticReportPeriod = 1 * time.Second
)

超过多长时间后进行淘汰,淘汰后需要重新init

Functions

func CalculateStartTimeMilli

func CalculateStartTimeMilli(curTimeMs int64, interval int64) int64

计算起始滑窗

func HasRegex

func HasRegex(rule *namingpb.Rule) bool

规则是否还有正则表达式匹配逻辑

Types

type BucketShareInfo

type BucketShareInfo struct {
	// contains filtered or unexported fields
}

通用信息

type FlowQuotaAssistant

type FlowQuotaAssistant struct {
	// contains filtered or unexported fields
}

限额流程的辅助类

func (*FlowQuotaAssistant) AddWindowCount

func (f *FlowQuotaAssistant) AddWindowCount()

func (*FlowQuotaAssistant) CountRateLimitWindowSet

func (f *FlowQuotaAssistant) CountRateLimitWindowSet() int

获取分配窗口集合数量,只用于测试

func (*FlowQuotaAssistant) DelWindowCount

func (f *FlowQuotaAssistant) DelWindowCount()

func (*FlowQuotaAssistant) DeleteRateLimitWindowSet

func (f *FlowQuotaAssistant) DeleteRateLimitWindowSet(svcKey model.ServiceKey)

获取配额分配窗口集合

func (*FlowQuotaAssistant) Destroy

func (f *FlowQuotaAssistant) Destroy()

func (*FlowQuotaAssistant) GetAllWindowSets

func (f *FlowQuotaAssistant) GetAllWindowSets() map[model.ServiceKey]*RateLimitWindowSet

获取当前所有的限流窗口集合

func (*FlowQuotaAssistant) GetQuota

func (f *FlowQuotaAssistant) GetQuota(commonRequest *data.CommonRateLimitRequest) (*model.QuotaFutureImpl, error)

获取配额

func (*FlowQuotaAssistant) GetRateLimitWindow

func (f *FlowQuotaAssistant) GetRateLimitWindow(svcKey model.ServiceKey, rule *namingpb.Rule,
	label string) (*RateLimitWindowSet, *RateLimitWindow)

获取配额分配窗口

func (*FlowQuotaAssistant) GetRateLimitWindowSet

func (f *FlowQuotaAssistant) GetRateLimitWindowSet(svcKey model.ServiceKey, create bool) *RateLimitWindowSet

获取配额分配窗口集合

func (*FlowQuotaAssistant) GetWindowCount

func (f *FlowQuotaAssistant) GetWindowCount() int32

func (*FlowQuotaAssistant) Init

func (f *FlowQuotaAssistant) Init(engine model.Engine, cfg config.Configuration, supplier plugin.Supplier) error

初始化限额辅助

func (*FlowQuotaAssistant) IsDestroyed

func (f *FlowQuotaAssistant) IsDestroyed() bool

func (*FlowQuotaAssistant) OnServiceDeleted

func (f *FlowQuotaAssistant) OnServiceDeleted(event *common.PluginEvent) error

服务删除回调

func (*FlowQuotaAssistant) OnServiceUpdated

func (f *FlowQuotaAssistant) OnServiceUpdated(event *common.PluginEvent) error

服务更新回调,找到具体的限流窗口集合,然后触发更新

func (*FlowQuotaAssistant) TaskValues

func (f *FlowQuotaAssistant) TaskValues() model.TaskValues

获取调度任务

type LimitMode

type LimitMode int
const (
	//未知类型,用于兼容前面pb
	LimitUnknownMode LimitMode = 0
	//全局类型,与限流server发生交互
	LimitGlobalMode LimitMode = 1
	//本地类型,使用本地限流算法
	LimitLocalMode LimitMode = 2
	//降级类型,因为无法连接限流server,导致必须使用本地限流算法
	LimitDegradeMode LimitMode = 3
)

type RateLimitGauge

type RateLimitGauge struct {
	model.EmptyInstanceGauge
	Window    *RateLimitWindow
	Namespace string
	Service   string
	Type      RateLimitType
	//限流周期, 单位秒
	Duration uint32
	//限流发生时的mode, 和plugin的pb要保持一致
	LimitModeType LimitMode
}

限流统计gauge

type RateLimitType

type RateLimitType int

限流的类型

const (
	TrafficShapingLimited RateLimitType = 0
	QuotaLimited          RateLimitType = 1
	WindowDeleted         RateLimitType = 2
	//QuotaRequested        RateLimitType = 3
	QuotaGranted RateLimitType = 4
)

type RateLimitWindow

type RateLimitWindow struct {
	//配额窗口集合
	WindowSet *RateLimitWindowSet
	// 服务信息
	SvcKey model.ServiceKey
	// 正则对应的label
	Labels string

	// 已经匹配到的限流规则,没有匹配则为空
	// 由于可能会出现规则并没有发生变化,但是缓存对象更新的情况,因此这里使用原子变量
	Rule *namingpb.Rule
	//其他插件在这里添加的相关数据,一般是统计插件使用
	PluginData map[int32]interface{}
	// contains filtered or unexported fields
}

限流窗口

func NewRateLimitWindow

func NewRateLimitWindow(windowSet *RateLimitWindowSet, rule *namingpb.Rule,
	commonRequest *data.CommonRateLimitRequest, labels string) (*RateLimitWindow, error)

创建限流窗口

func (*RateLimitWindow) AllocateQuota

func (r *RateLimitWindow) AllocateQuota() (*model.QuotaFutureImpl, error)

分配配额

func (*RateLimitWindow) AsyncRateLimitConnector

func (r *RateLimitWindow) AsyncRateLimitConnector() serverconnector.AsyncRateLimitConnector

获取异步连接器

func (*RateLimitWindow) CasStatus

func (r *RateLimitWindow) CasStatus(oldStatus int64, status int64) bool

CAS设置状态

func (*RateLimitWindow) CompareTo

func (r *RateLimitWindow) CompareTo(another interface{}) int

比较两个窗口是否相同

func (*RateLimitWindow) DoAsyncRemoteAcquire

func (r *RateLimitWindow) DoAsyncRemoteAcquire() error

异步发送 acquire

func (*RateLimitWindow) DoAsyncRemoteInit

func (r *RateLimitWindow) DoAsyncRemoteInit() error

异步处理发送init

func (*RateLimitWindow) Engine

func (r *RateLimitWindow) Engine() model.Engine

获取SDK引擎

func (*RateLimitWindow) EnsureDeleted

func (r *RateLimitWindow) EnsureDeleted(value interface{}) bool

删除前进行检查,返回true才删除,该检查是同步操作

func (*RateLimitWindow) Expired

func (r *RateLimitWindow) Expired(nowMilli int64) bool

是否已经过期

func (*RateLimitWindow) GetLastAccessTimeMilli

func (r *RateLimitWindow) GetLastAccessTimeMilli() int64

获取最近访问时间

func (*RateLimitWindow) GetStatus

func (r *RateLimitWindow) GetStatus() int64

原子获取状态

func (*RateLimitWindow) Init

func (r *RateLimitWindow) Init()

初始化限流窗口

func (*RateLimitWindow) InitializeRequest

func (r *RateLimitWindow) InitializeRequest() *rlimitV2.RateLimitInitRequest

转换成限流PB初始化消息

func (*RateLimitWindow) OnInitResponse

func (r *RateLimitWindow) OnInitResponse(counter *rlimitV2.QuotaCounter, duration time.Duration, srvTimeMilli int64)

应答回调函数

func (*RateLimitWindow) OnReportResponse

func (r *RateLimitWindow) OnReportResponse(counter *rlimitV2.QuotaLeft, duration time.Duration, curTimeMilli int64)

应答回调函数

func (*RateLimitWindow) SetStatus

func (r *RateLimitWindow) SetStatus(status int64)

设置状态

type RateLimitWindowSet

type RateLimitWindowSet struct {
	// contains filtered or unexported fields
}

限流分配窗口的缓存

func NewRateLimitWindowSet

func NewRateLimitWindowSet(assistant *FlowQuotaAssistant) *RateLimitWindowSet

构造函数

func (*RateLimitWindowSet) AddRateLimitWindow

func (rs *RateLimitWindowSet) AddRateLimitWindow(
	commonRequest *data.CommonRateLimitRequest, rule *namingpb.Rule, flatLabels string) (*RateLimitWindow, error)

添加限流窗口

func (*RateLimitWindowSet) GetRateLimitWindow

func (rs *RateLimitWindowSet) GetRateLimitWindow(rule *namingpb.Rule, flatLabels string) *RateLimitWindow

获取限流窗口

func (*RateLimitWindowSet) GetRateLimitWindows

func (rs *RateLimitWindowSet) GetRateLimitWindows() []*RateLimitWindow

拷贝一份只读数据

func (*RateLimitWindowSet) OnServiceUpdated

func (rs *RateLimitWindowSet) OnServiceUpdated(svcEventObject *common.ServiceEventObject)

服务更新回调

func (*RateLimitWindowSet) OnWindowExpired

func (rs *RateLimitWindowSet) OnWindowExpired(nowMilli int64, window *RateLimitWindow) bool

窗口过期

func (*RateLimitWindowSet) PurgeWindows

func (rs *RateLimitWindowSet) PurgeWindows(nowMilli int64)

执行窗口淘汰

type RemoteAwareBucket

type RemoteAwareBucket interface {
	// 父接口,执行用户配额分配操作
	model.QuotaAllocator
	//设置通过限流服务端获取的远程配额
	SetRemoteQuota(*RemoteQuotaResult)
	// 获取已经分配的配额
	GetQuotaUsed(curTimeMilli int64) *UsageInfo
	//获取TokenBuckets
	GetTokenBuckets() TokenBuckets
	//更新时间间隔
	UpdateTimeDiff(timeDiff int64)
}

远程配额分配的令牌桶

type RemoteAwareQpsBucket

type RemoteAwareQpsBucket struct {
	// contains filtered or unexported fields
}

远程配额分配的算法桶

func NewRemoteAwareQpsBucket

func NewRemoteAwareQpsBucket(window *RateLimitWindow) *RemoteAwareQpsBucket

创建QPS远程限流窗口

func (*RemoteAwareQpsBucket) Allocate

func (r *RemoteAwareQpsBucket) Allocate() *model.QuotaResponse

执行配额分配操作

func (*RemoteAwareQpsBucket) GetQuotaUsed

func (r *RemoteAwareQpsBucket) GetQuotaUsed(curTimeMilli int64) *UsageInfo

func (*RemoteAwareQpsBucket) GetTokenBuckets

func (r *RemoteAwareQpsBucket) GetTokenBuckets() TokenBuckets

func (*RemoteAwareQpsBucket) Release

func (r *RemoteAwareQpsBucket) Release()

执行配额回收操作

func (*RemoteAwareQpsBucket) SetRemoteQuota

func (r *RemoteAwareQpsBucket) SetRemoteQuota(remoteQuotas *RemoteQuotaResult)

设置通过限流服务端获取的远程QPS

func (*RemoteAwareQpsBucket) UpdateTimeDiff

func (r *RemoteAwareQpsBucket) UpdateTimeDiff(timeDiff int64)

更新时间间隔

type RemoteErrorContainer

type RemoteErrorContainer struct {
	// contains filtered or unexported fields
}

远程访问的错误信息

type RemoteQuotaCallBack

type RemoteQuotaCallBack struct {
	// contains filtered or unexported fields
}

远程配额查询任务

func NewRemoteQuotaCallback

func NewRemoteQuotaCallback(cfg config.Configuration, supplier plugin.Supplier,
	engine model.Engine) (*RemoteQuotaCallBack, error)

创建查询任务

func (*RemoteQuotaCallBack) OnTaskEvent

func (r *RemoteQuotaCallBack) OnTaskEvent(event model.TaskEvent)

OnTaskEvent 任务事件回调

func (*RemoteQuotaCallBack) Process

func (r *RemoteQuotaCallBack) Process(
	taskKey interface{}, taskValue interface{}, lastProcessTime time.Time) model.TaskResult

处理远程配额查询任务

type RemoteQuotaResult

type RemoteQuotaResult struct {
	Left            int64
	ClientCount     uint32
	ServerTimeMilli int64
	DurationMill    int64
}

远程下发配额

type RemoteSyncParam

type RemoteSyncParam struct {
	// 连接相关参数
	model.ControlParam
}

远程同步相关参数

type ReportElements

type ReportElements struct {
	TotalCount int64
	LimitCount int64
}

返回记录

type SlidingWindow

type SlidingWindow struct {
	// contains filtered or unexported fields
}

滑窗通用实现

func NewSlidingWindow

func NewSlidingWindow(slideCount int, intervalMs int) *SlidingWindow

创建滑窗

func (*SlidingWindow) AcquireCurrentValues

func (s *SlidingWindow) AcquireCurrentValues(curTimeMs int64) (uint32, uint32, *Window)

获取上报数据

func (*SlidingWindow) AddAndGetCurrentLimited

func (s *SlidingWindow) AddAndGetCurrentLimited(curTimeMs int64, value uint32) (uint32, *Window)

原子增加,并返回当前bucket

func (*SlidingWindow) AddAndGetCurrentPassed

func (s *SlidingWindow) AddAndGetCurrentPassed(curTimeMs int64, value uint32) (uint32, *Window)

原子增加,并返回当前bucket

func (*SlidingWindow) TouchCurrentPassed

func (s *SlidingWindow) TouchCurrentPassed(curTimeMs int64) (uint32, *Window)

获取上报数据

type StatisticsBucket

type StatisticsBucket struct {
	// contains filtered or unexported fields
}

用于metric report 统计

func NewStatisticsBucket

func NewStatisticsBucket() *StatisticsBucket

create StatisticsBucket

func (*StatisticsBucket) AddCount

func (b *StatisticsBucket) AddCount(isLimit bool, now time.Time)

AddCount,外面使用该接口

func (*StatisticsBucket) AddCountByUnixTime

func (b *StatisticsBucket) AddCountByUnixTime(isLimit bool, now int64)

func (*StatisticsBucket) GetReportData

func (b *StatisticsBucket) GetReportData(periodTime int64) *ReportElements

GetReportData

type TokenBucket

type TokenBucket struct {
	UpdateIdentifier
	// contains filtered or unexported fields
}

令牌桶

func NewTokenBucket

func NewTokenBucket(
	windowKey string, validDuration time.Duration, tokenAmount uint32, shareInfo *BucketShareInfo) *TokenBucket

创建令牌桶

func (*TokenBucket) ConfirmLimited

func (t *TokenBucket) ConfirmLimited(limited uint32, nowMilli int64)

记录限流分配配额

func (*TokenBucket) ConfirmPassed

func (t *TokenBucket) ConfirmPassed(passed uint32, nowMilli int64)

记录真实分配配额

func (*TokenBucket) GetRuleTotal

func (t *TokenBucket) GetRuleTotal() int64

获取限流总量

func (*TokenBucket) GiveBackToken

func (t *TokenBucket) GiveBackToken(identifier *UpdateIdentifier, token int64, mode TokenBucketMode)

归还配额

func (*TokenBucket) TryAllocateToken

func (t *TokenBucket) TryAllocateToken(
	token uint32, nowMilli int64, identifier *UpdateIdentifier, mode TokenBucketMode) (int64, TokenBucketMode)

尝试分配配额

func (*TokenBucket) UpdateRemoteClientCount

func (t *TokenBucket) UpdateRemoteClientCount(remoteQuotas *RemoteQuotaResult)

只更新远程客户端数量,不更新配额

func (*TokenBucket) UpdateRemoteToken

func (t *TokenBucket) UpdateRemoteToken(remoteQuotas *RemoteQuotaResult, updateClient bool)

更新远程配额

type TokenBucketMode

type TokenBucketMode int
const (
	Unknown TokenBucketMode = iota
	Remote
	RemoteToLocal
	Local
)

type TokenBuckets

type TokenBuckets []*TokenBucket

令牌桶序列

func (TokenBuckets) Len

func (tbs TokenBuckets) Len() int

数组长度

func (TokenBuckets) Less

func (tbs TokenBuckets) Less(i, j int) bool

比较数组成员大小

func (TokenBuckets) Swap

func (tbs TokenBuckets) Swap(i, j int)

交换数组成员

type UpdateIdentifier

type UpdateIdentifier struct {
	// contains filtered or unexported fields
}

令牌桶是否进行更新的凭证

type UsageInfo

type UsageInfo struct {
	//配额使用时间
	CurTimeMilli int64
	//配额使用详情
	Passed map[int64]uint32
	//限流情况
	Limited map[int64]uint32
}

配额使用信息

type Window

type Window struct {
	//起始时间
	WindowStart int64
	//通过数
	PassedValue uint32
	//被限流数
	LimitedValue uint32
}

单个窗口

type WindowContainer

type WindowContainer struct {
	//主窗口,非正则表达式的适用
	MainWindow *RateLimitWindow
	//适用于正则表达式展开的
	WindowByLabel map[string]*RateLimitWindow
}

窗口容器

func NewWindowContainer

func NewWindowContainer() *WindowContainer

创建窗口容器

func (*WindowContainer) GetRateLimitWindows

func (w *WindowContainer) GetRateLimitWindows() []*RateLimitWindow

获取限流滑窗

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL