quota

package
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 2, 2022 License: Apache-2.0, Apache-2.0, BSD-2-Clause, + 4 more Imports: 31 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Disabled      = "rateLimit disabled"
	RuleNotExists = "quota rule not exists"
)
View Source
const (
	// 刚创建, 无需进行后台调度
	Created int64 = iota
	// 已获取调度权,准备开始调度
	Initializing
	// 已经在远程初始化结束
	Initialized
	// 已经删除
	Deleted
)

Variables

View Source
var (
	// 淘汰因子,过期时间=MaxDuration + ExpireFactor
	ExpireFactor = 1 * time.Second

	DefaultStatisticReportPeriod = 1 * time.Second
)

超过多长时间后进行淘汰,淘汰后需要重新init

Functions

func CalculateStartTimeMilli

func CalculateStartTimeMilli(curTimeMs int64, interval int64) int64

CalculateStartTimeMilli 计算起始滑窗

func HasRegex

func HasRegex(rule *namingpb.Rule) bool

HasRegex 规则是否还有正则表达式匹配逻辑

func IsSuccess

func IsSuccess(code uint32) bool

IsSuccess 是否成功错误码

Types

type AsyncRateLimitConnector

type AsyncRateLimitConnector interface {
	// 初始化限流控制信息
	GetMessageSender(svcKey model.ServiceKey, hashValue uint64) (RateLimitMsgSender, error)
	// 销毁
	Destroy()
	// 流数量
	StreamCount() int
}

AsyncRateLimitConnector 异步限流连接器

func NewAsyncRateLimitConnector

func NewAsyncRateLimitConnector(valueCtx model.ValueContext, cfg config.Configuration) AsyncRateLimitConnector

NewAsyncRateLimitConnector

type BucketShareInfo

type BucketShareInfo struct {
	// contains filtered or unexported fields
}

BucketShareInfo 通用信息

type CounterIdentifier

type CounterIdentifier struct {
	// contains filtered or unexported fields
}

CounterIdentifier 计数器标识

func (CounterIdentifier) String

func (c CounterIdentifier) String() string

String ToString输出

type DurationBaseCallBack

type DurationBaseCallBack struct {
	// contains filtered or unexported fields
}

DurationBaseCallBack 基于时间段的回调结构

type FlowQuotaAssistant

type FlowQuotaAssistant struct {
	// contains filtered or unexported fields
}

FlowQuotaAssistant 限额流程的辅助类

func (*FlowQuotaAssistant) AddWindowCount

func (f *FlowQuotaAssistant) AddWindowCount()

func (*FlowQuotaAssistant) AsyncRateLimitConnector

func (f *FlowQuotaAssistant) AsyncRateLimitConnector() AsyncRateLimitConnector

func (*FlowQuotaAssistant) CountRateLimitWindowSet

func (f *FlowQuotaAssistant) CountRateLimitWindowSet() int

CountRateLimitWindowSet 获取分配窗口集合数量,只用于测试

func (*FlowQuotaAssistant) DelWindowCount

func (f *FlowQuotaAssistant) DelWindowCount()

func (*FlowQuotaAssistant) DeleteRateLimitWindowSet

func (f *FlowQuotaAssistant) DeleteRateLimitWindowSet(svcKey model.ServiceKey)

func (*FlowQuotaAssistant) Destroy

func (f *FlowQuotaAssistant) Destroy()

func (*FlowQuotaAssistant) GetAllWindowSets

func (f *FlowQuotaAssistant) GetAllWindowSets() map[model.ServiceKey]*RateLimitWindowSet

GetAllWindowSets 获取当前所有的限流窗口集合

func (*FlowQuotaAssistant) GetQuota

func (f *FlowQuotaAssistant) GetQuota(commonRequest *data.CommonRateLimitRequest) (*model.QuotaFutureImpl, error)

GetQuota 获取配额

func (*FlowQuotaAssistant) GetRateLimitWindow

func (f *FlowQuotaAssistant) GetRateLimitWindow(svcKey model.ServiceKey, rule *namingpb.Rule,
	label string) (*RateLimitWindowSet, *RateLimitWindow)

GetRateLimitWindow 获取配额分配窗口

func (*FlowQuotaAssistant) GetRateLimitWindowSet

func (f *FlowQuotaAssistant) GetRateLimitWindowSet(svcKey model.ServiceKey, create bool) *RateLimitWindowSet

GetRateLimitWindowSet 获取配额分配窗口集合

func (*FlowQuotaAssistant) GetWindowCount

func (f *FlowQuotaAssistant) GetWindowCount() int32

func (*FlowQuotaAssistant) Init

func (f *FlowQuotaAssistant) Init(engine model.Engine, cfg config.Configuration, supplier plugin.Supplier) error

Init 初始化限额辅助

func (*FlowQuotaAssistant) IsDestroyed

func (f *FlowQuotaAssistant) IsDestroyed() bool

func (*FlowQuotaAssistant) OnServiceDeleted

func (f *FlowQuotaAssistant) OnServiceDeleted(event *common.PluginEvent) error

OnServiceDeleted 服务删除回调

func (*FlowQuotaAssistant) OnServiceUpdated

func (f *FlowQuotaAssistant) OnServiceUpdated(event *common.PluginEvent) error

服务更新回调,找到具体的限流窗口集合,然后触发更新

func (*FlowQuotaAssistant) TaskValues

func (f *FlowQuotaAssistant) TaskValues() model.TaskValues

TaskValues 获取调度任务

type HostIdentifier

type HostIdentifier struct {
	// contains filtered or unexported fields
}

HostIdentifier 节点标识

func (HostIdentifier) String

func (h HostIdentifier) String() string

ToString输出

type InitializeRecord

type InitializeRecord struct {
	// contains filtered or unexported fields
}

InitializeRecord 初始化记录

func (*InitializeRecord) Expired

func (ir *InitializeRecord) Expired(nowMilli int64) bool

Expired 记录超时

type LimitMode

type LimitMode int
const (
	// 未知类型,用于兼容前面pb
	LimitUnknownMode LimitMode = 0
	// 全局类型,与限流server发生交互
	LimitGlobalMode LimitMode = 1
	// 本地类型,使用本地限流算法
	LimitLocalMode LimitMode = 2
	// 降级类型,因为无法连接限流server,导致必须使用本地限流算法
	LimitDegradeMode LimitMode = 3
)

type RateLimitGauge

type RateLimitGauge struct {
	model.EmptyInstanceGauge
	Window    *RateLimitWindow
	Namespace string
	Service   string
	Type      RateLimitType
	Labels    map[string]string
	// 限流周期, 单位秒
	Duration uint32
	// 限流发生时的mode, 和plugin的pb要保持一致
	LimitModeType LimitMode
}

限流统计gauge

type RateLimitMsgSender

type RateLimitMsgSender interface {
	// 是否已经初始化
	HasInitialized(svcKey model.ServiceKey, labels string) bool
	// 发送初始化请求
	SendInitRequest(request *rlimitV2.RateLimitInitRequest, callback ResponseCallBack)
	// 发送上报请求
	SendReportRequest(request *rlimitV2.ClientRateLimitReportRequest) error
	// 同步时间
	AdjustTime() int64
}

RateLimitMsgSender 限流消息同步器

type RateLimitType

type RateLimitType int

RateLimitType 限流的类型

const (
	TrafficShapingLimited RateLimitType = 0
	QuotaLimited          RateLimitType = 1
	WindowDeleted         RateLimitType = 2
	// QuotaRequested        RateLimitType = 3
	QuotaGranted RateLimitType = 4
)

type RateLimitWindow

type RateLimitWindow struct {
	// 配额窗口集合
	WindowSet *RateLimitWindowSet
	// 服务信息
	SvcKey model.ServiceKey
	// 正则对应的label
	Labels string

	// 已经匹配到的限流规则,没有匹配则为空
	// 由于可能会出现规则并没有发生变化,但是缓存对象更新的情况,因此这里使用原子变量
	Rule *namingpb.Rule
	// 其他插件在这里添加的相关数据,一般是统计插件使用
	PluginData map[int32]interface{}
	// contains filtered or unexported fields
}

RateLimitWindow 限流窗口

func NewRateLimitWindow

func NewRateLimitWindow(windowSet *RateLimitWindowSet, rule *namingpb.Rule,
	commonRequest *data.CommonRateLimitRequest, labels string) (*RateLimitWindow, error)

NewRateLimitWindow 创建限流窗口

func (*RateLimitWindow) AllocateQuota

func (r *RateLimitWindow) AllocateQuota(commonRequest *data.CommonRateLimitRequest) (*model.QuotaFutureImpl, error)

AllocateQuota 分配配额

func (*RateLimitWindow) AsyncRateLimitConnector

func (r *RateLimitWindow) AsyncRateLimitConnector() AsyncRateLimitConnector

AsyncRateLimitConnector 获取异步连接器

func (*RateLimitWindow) CasStatus

func (r *RateLimitWindow) CasStatus(oldStatus int64, status int64) bool

CasStatus CAS设置状态

func (*RateLimitWindow) CompareTo

func (r *RateLimitWindow) CompareTo(another interface{}) int

CompareTo 比较两个窗口是否相同

func (*RateLimitWindow) DoAsyncRemoteAcquire

func (r *RateLimitWindow) DoAsyncRemoteAcquire() error

DoAsyncRemoteAcquire 异步发送 acquire

func (*RateLimitWindow) DoAsyncRemoteInit

func (r *RateLimitWindow) DoAsyncRemoteInit() error

DoAsyncRemoteInit 异步处理发送init

func (*RateLimitWindow) Engine

func (r *RateLimitWindow) Engine() model.Engine

Engine 获取SDK引擎

func (*RateLimitWindow) EnsureDeleted

func (r *RateLimitWindow) EnsureDeleted(value interface{}) bool

EnsureDeleted 删除前进行检查,返回true才删除,该检查是同步操作

func (*RateLimitWindow) Expired

func (r *RateLimitWindow) Expired(nowMilli int64) bool

Expired 是否已经过期

func (*RateLimitWindow) GetLastAccessTimeMilli

func (r *RateLimitWindow) GetLastAccessTimeMilli() int64

GetLastAccessTimeMilli 获取最近访问时间

func (*RateLimitWindow) GetStatus

func (r *RateLimitWindow) GetStatus() int64

GetStatus 原子获取状态

func (*RateLimitWindow) Init

func (r *RateLimitWindow) Init()

Init 初始化限流窗口

func (*RateLimitWindow) InitializeRequest

func (r *RateLimitWindow) InitializeRequest() *rlimitV2.RateLimitInitRequest

InitializeRequest 转换成限流PB初始化消息

func (*RateLimitWindow) OnInitResponse

func (r *RateLimitWindow) OnInitResponse(counter *rlimitV2.QuotaCounter, duration time.Duration, srvTimeMilli int64)

OnInitResponse 应答回调函数

func (*RateLimitWindow) OnReportResponse

func (r *RateLimitWindow) OnReportResponse(counter *rlimitV2.QuotaLeft, duration time.Duration, curTimeMilli int64)

OnReportResponse 应答回调函数

func (*RateLimitWindow) SetStatus

func (r *RateLimitWindow) SetStatus(status int64)

SetStatus 设置状态

type RateLimitWindowSet

type RateLimitWindowSet struct {
	// contains filtered or unexported fields
}

RateLimitWindowSet 限流分配窗口的缓存

func NewRateLimitWindowSet

func NewRateLimitWindowSet(assistant *FlowQuotaAssistant) *RateLimitWindowSet

NewRateLimitWindowSet 构造函数

func (*RateLimitWindowSet) AddRateLimitWindow

func (rs *RateLimitWindowSet) AddRateLimitWindow(
	commonRequest *data.CommonRateLimitRequest, rule *namingpb.Rule, flatLabels string) (*RateLimitWindow, error)

AddRateLimitWindow 添加限流窗口

func (*RateLimitWindowSet) GetRateLimitWindow

func (rs *RateLimitWindowSet) GetRateLimitWindow(rule *namingpb.Rule, flatLabels string) *RateLimitWindow

GetRateLimitWindow 获取限流窗口

func (*RateLimitWindowSet) GetRateLimitWindows

func (rs *RateLimitWindowSet) GetRateLimitWindows() []*RateLimitWindow

GetRateLimitWindows 拷贝一份只读数据

func (*RateLimitWindowSet) OnServiceUpdated

func (rs *RateLimitWindowSet) OnServiceUpdated(svcEventObject *common.ServiceEventObject)

OnServiceUpdated 服务更新回调

func (*RateLimitWindowSet) OnWindowExpired

func (rs *RateLimitWindowSet) OnWindowExpired(nowMilli int64, window *RateLimitWindow) bool

OnWindowExpired 窗口过期

func (*RateLimitWindowSet) PurgeWindows

func (rs *RateLimitWindowSet) PurgeWindows(nowMilli int64)

PurgeWindows 执行窗口淘汰

type RemoteAwareBucket

type RemoteAwareBucket interface {
	// 父接口,执行用户配额分配操作
	model.QuotaAllocator
	// 设置通过限流服务端获取的远程配额
	SetRemoteQuota(*RemoteQuotaResult)
	// 获取已经分配的配额
	GetQuotaUsed(curTimeMilli int64) *UsageInfo
	// 获取TokenBuckets
	GetTokenBuckets() TokenBuckets
	// 更新时间间隔
	UpdateTimeDiff(timeDiff int64)
}

RemoteAwareBucket 远程配额分配的令牌桶

type RemoteAwareQpsBucket

type RemoteAwareQpsBucket struct {
	// contains filtered or unexported fields
}

RemoteAwareQpsBucket 远程配额分配的算法桶

func NewRemoteAwareQpsBucket

func NewRemoteAwareQpsBucket(window *RateLimitWindow) *RemoteAwareQpsBucket

NewRemoteAwareQpsBucket 创建QPS远程限流窗口

func (*RemoteAwareQpsBucket) Allocate

func (r *RemoteAwareQpsBucket) Allocate() *model.QuotaResponse

Allocate 执行配额分配操作

func (*RemoteAwareQpsBucket) GetQuotaUsed

func (r *RemoteAwareQpsBucket) GetQuotaUsed(curTimeMilli int64) *UsageInfo

func (*RemoteAwareQpsBucket) GetTokenBuckets

func (r *RemoteAwareQpsBucket) GetTokenBuckets() TokenBuckets

func (*RemoteAwareQpsBucket) Release

func (r *RemoteAwareQpsBucket) Release()

Release 执行配额回收操作

func (*RemoteAwareQpsBucket) SetRemoteQuota

func (r *RemoteAwareQpsBucket) SetRemoteQuota(remoteQuotas *RemoteQuotaResult)

SetRemoteQuota 设置通过限流服务端获取的远程QPS

func (*RemoteAwareQpsBucket) UpdateTimeDiff

func (r *RemoteAwareQpsBucket) UpdateTimeDiff(timeDiff int64)

UpdateTimeDiff 更新时间间隔

type RemoteErrorContainer

type RemoteErrorContainer struct {
	// contains filtered or unexported fields
}

RemoteErrorContainer 远程访问的错误信息

type RemoteQuotaCallBack

type RemoteQuotaCallBack struct {
	// contains filtered or unexported fields
}

RemoteQuotaCallBack 远程配额查询任务

func NewRemoteQuotaCallback

func NewRemoteQuotaCallback(cfg config.Configuration, supplier plugin.Supplier,
	engine model.Engine, connector AsyncRateLimitConnector) (*RemoteQuotaCallBack, error)

NewRemoteQuotaCallback 创建查询任务

func (*RemoteQuotaCallBack) OnTaskEvent

func (r *RemoteQuotaCallBack) OnTaskEvent(event model.TaskEvent)

OnTaskEvent 任务事件回调

func (*RemoteQuotaCallBack) Process

func (r *RemoteQuotaCallBack) Process(
	taskKey interface{}, taskValue interface{}, lastProcessTime time.Time) model.TaskResult

Process 处理远程配额查询任务

type RemoteQuotaResult

type RemoteQuotaResult struct {
	Left            int64
	ClientCount     uint32
	ServerTimeMilli int64
	DurationMill    int64
}

RemoteQuotaResult 远程下发配额

type RemoteSyncParam

type RemoteSyncParam struct {
	// 连接相关参数
	model.ControlParam
}

RemoteSyncParam 远程同步相关参数

type ReportElements

type ReportElements struct {
	TotalCount int64
	LimitCount int64
}

ReportElements 返回记录

type ResponseCallBack

type ResponseCallBack interface {
	// 应答回调函数
	OnInitResponse(counter *rlimitV2.QuotaCounter, duration time.Duration, curTimeMilli int64)
	// 应答回调函数
	OnReportResponse(counter *rlimitV2.QuotaLeft, duration time.Duration, curTimeMilli int64)
}

ResponseCallBack 应答回调函数

type SlidingWindow

type SlidingWindow struct {
	// contains filtered or unexported fields
}

SlidingWindow 滑窗通用实现

func NewSlidingWindow

func NewSlidingWindow(slideCount int, intervalMs int) *SlidingWindow

NewSlidingWindow 创建滑窗

func (*SlidingWindow) AcquireCurrentValues

func (s *SlidingWindow) AcquireCurrentValues(curTimeMs int64) (uint32, uint32, *Window)

AcquireCurrentValues 获取上报数据

func (*SlidingWindow) AddAndGetCurrentLimited

func (s *SlidingWindow) AddAndGetCurrentLimited(curTimeMs int64, value uint32) (uint32, *Window)

AddAndGetCurrentLimited 原子增加,并返回当前bucket

func (*SlidingWindow) AddAndGetCurrentPassed

func (s *SlidingWindow) AddAndGetCurrentPassed(curTimeMs int64, value uint32) (uint32, *Window)

AddAndGetCurrentPassed 原子增加,并返回当前bucket

func (*SlidingWindow) TouchCurrentPassed

func (s *SlidingWindow) TouchCurrentPassed(curTimeMs int64) (uint32, *Window)

TouchCurrentPassed 获取上报数据

type StatisticsBucket

type StatisticsBucket struct {
	// contains filtered or unexported fields
}

用于metric report 统计

func NewStatisticsBucket

func NewStatisticsBucket() *StatisticsBucket

NewStatisticsBucket create StatisticsBucket

func (*StatisticsBucket) AddCount

func (b *StatisticsBucket) AddCount(isLimit bool, now time.Time)

AddCount,外面使用该接口

func (*StatisticsBucket) AddCountByUnixTime

func (b *StatisticsBucket) AddCountByUnixTime(isLimit bool, now int64)

func (*StatisticsBucket) GetReportData

func (b *StatisticsBucket) GetReportData(periodTime int64) *ReportElements

GetReportData

type StreamCounterSet

type StreamCounterSet struct {

	// 目标节点信息
	HostIdentifier *HostIdentifier
	// contains filtered or unexported fields
}

StreamCounterSet 同一个节点的counter集合,用于回调

func NewStreamCounterSet

func NewStreamCounterSet(asyncConnector *asyncRateLimitConnector, identifier *HostIdentifier) *StreamCounterSet

NewStreamCounterSet 新建流管理器

func (*StreamCounterSet) AdjustTime

func (s *StreamCounterSet) AdjustTime() int64

AdjustTime 同步时间

func (*StreamCounterSet) CompareTo

func (s *StreamCounterSet) CompareTo(value interface{}) int

CompareTo 比较两个元素

func (*StreamCounterSet) EnsureDeleted

func (s *StreamCounterSet) EnsureDeleted(value interface{}) bool

EnsureDeleted 删除前进行检查,返回true才删除,该检查是同步操作

func (*StreamCounterSet) Expired

func (s *StreamCounterSet) Expired(nowMilli int64, clearRecords bool) bool

Expired 检查是否已经超时

func (*StreamCounterSet) HasInitialized

func (s *StreamCounterSet) HasInitialized(svcKey model.ServiceKey, labels string) bool

HasInitialized 是否已经初始化

func (*StreamCounterSet) SendInitRequest

func (s *StreamCounterSet) SendInitRequest(initReq *rlimitV2.RateLimitInitRequest, callback ResponseCallBack)

SendInitRequest 发送初始化请求

func (*StreamCounterSet) SendReportRequest

func (s *StreamCounterSet) SendReportRequest(clientReportReq *rlimitV2.ClientRateLimitReportRequest) error

SendReportRequest 发送上报请求

type TokenBucket

type TokenBucket struct {
	UpdateIdentifier
	// contains filtered or unexported fields
}

TokenBucket 令牌桶

func NewTokenBucket

func NewTokenBucket(
	windowKey string, validDuration time.Duration, tokenAmount uint32, shareInfo *BucketShareInfo) *TokenBucket

NewTokenBucket 创建令牌桶

func (*TokenBucket) ConfirmLimited

func (t *TokenBucket) ConfirmLimited(limited uint32, nowMilli int64)

ConfirmLimited 记录限流分配配额

func (*TokenBucket) ConfirmPassed

func (t *TokenBucket) ConfirmPassed(passed uint32, nowMilli int64)

ConfirmPassed 记录真实分配配额

func (*TokenBucket) GetRuleTotal

func (t *TokenBucket) GetRuleTotal() int64

GetRuleTotal 获取限流总量

func (*TokenBucket) GiveBackToken

func (t *TokenBucket) GiveBackToken(identifier *UpdateIdentifier, token int64, mode TokenBucketMode)

GiveBackToken 归还配额

func (*TokenBucket) TryAllocateToken

func (t *TokenBucket) TryAllocateToken(
	token uint32, nowMilli int64, identifier *UpdateIdentifier, mode TokenBucketMode) (int64, TokenBucketMode)

TryAllocateToken 尝试分配配额

func (*TokenBucket) UpdateRemoteClientCount

func (t *TokenBucket) UpdateRemoteClientCount(remoteQuotas *RemoteQuotaResult)

UpdateRemoteClientCount 只更新远程客户端数量,不更新配额

func (*TokenBucket) UpdateRemoteToken

func (t *TokenBucket) UpdateRemoteToken(remoteQuotas *RemoteQuotaResult, updateClient bool)

UpdateRemoteToken 更新远程配额

type TokenBucketMode

type TokenBucketMode int
const (
	Unknown TokenBucketMode = iota
	Remote
	RemoteToLocal
	Local
)

type TokenBuckets

type TokenBuckets []*TokenBucket

TokenBuckets 令牌桶序列

func (TokenBuckets) Len

func (tbs TokenBuckets) Len() int

Len 数组长度

func (TokenBuckets) Less

func (tbs TokenBuckets) Less(i, j int) bool

Less 比较数组成员大小

func (TokenBuckets) Swap

func (tbs TokenBuckets) Swap(i, j int)

Swap 交换数组成员

type UpdateIdentifier

type UpdateIdentifier struct {
	// contains filtered or unexported fields
}

UpdateIdentifier 令牌桶是否进行更新的凭证

type UsageInfo

type UsageInfo struct {
	// 配额使用时间
	CurTimeMilli int64
	// 配额使用详情
	Passed map[int64]uint32
	// 限流情况
	Limited map[int64]uint32
}

UsageInfo 配额使用信息

type Window

type Window struct {
	// 起始时间
	WindowStart int64
	// 通过数
	PassedValue uint32
	// 被限流数
	LimitedValue uint32
}

Window 单个窗口

type WindowContainer

type WindowContainer struct {
	// 主窗口,非正则表达式的适用
	MainWindow *RateLimitWindow
	// 适用于正则表达式展开的
	WindowByLabel map[string]*RateLimitWindow
}

WindowContainer 窗口容器

func NewWindowContainer

func NewWindowContainer() *WindowContainer

NewWindowContainer 创建窗口容器

func (*WindowContainer) GetRateLimitWindows

func (w *WindowContainer) GetRateLimitWindows() []*RateLimitWindow

GetRateLimitWindows 获取限流滑窗

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL