Documentation ¶
Index ¶
- Constants
- Variables
- func FormatLabelToStr(request *data.CommonRateLimitRequest, rule *apitraffic.Rule) (string, bool)
- func HasRegex(rule *apitraffic.Rule) bool
- func IsSuccess(code uint32) bool
- type AsyncRateLimitConnector
- type CounterIdentifier
- type DurationBaseCallBack
- type FlowQuotaAssistant
- func (f *FlowQuotaAssistant) AddWindowCount()
- func (f *FlowQuotaAssistant) AsyncRateLimitConnector() AsyncRateLimitConnector
- func (f *FlowQuotaAssistant) CountRateLimitWindowSet() int
- func (f *FlowQuotaAssistant) DelWindowCount()
- func (f *FlowQuotaAssistant) DeleteRateLimitWindowSet(svcKey model.ServiceKey)
- func (f *FlowQuotaAssistant) Destroy()
- func (f *FlowQuotaAssistant) GetAllWindowSets() map[model.ServiceKey]*RateLimitWindowSet
- func (f *FlowQuotaAssistant) GetQuota(commonRequest *data.CommonRateLimitRequest) (*model.QuotaFutureImpl, error)
- func (f *FlowQuotaAssistant) GetRateLimitWindow(svcKey model.ServiceKey, rule *apitraffic.Rule, label string) (*RateLimitWindowSet, *RateLimitWindow)
- func (f *FlowQuotaAssistant) GetRateLimitWindowSet(svcKey model.ServiceKey, create bool) *RateLimitWindowSet
- func (f *FlowQuotaAssistant) GetWindowCount() int32
- func (f *FlowQuotaAssistant) Init(engine model.Engine, cfg config.Configuration, supplier plugin.Supplier) error
- func (f *FlowQuotaAssistant) IsDestroyed() bool
- func (f *FlowQuotaAssistant) OnServiceDeleted(event *common.PluginEvent) error
- func (f *FlowQuotaAssistant) OnServiceUpdated(event *common.PluginEvent) error
- func (f *FlowQuotaAssistant) TaskValues() model.TaskValues
- type HostIdentifier
- type InitializeRecord
- type RateLimitMsgSender
- type RateLimitWindow
- func (r *RateLimitWindow) AllocateQuota(commonRequest *data.CommonRateLimitRequest) *model.QuotaResponse
- func (r *RateLimitWindow) AsyncRateLimitConnector() AsyncRateLimitConnector
- func (r *RateLimitWindow) CasStatus(oldStatus int64, status int64) bool
- func (r *RateLimitWindow) CompareTo(another interface{}) int
- func (r *RateLimitWindow) DoAsyncRemoteAcquire() error
- func (r *RateLimitWindow) DoAsyncRemoteInit() error
- func (r *RateLimitWindow) Engine() model.Engine
- func (r *RateLimitWindow) EnsureDeleted(value interface{}) bool
- func (r *RateLimitWindow) Expired(nowMilli int64) bool
- func (r *RateLimitWindow) GetLastAccessTimeMilli() int64
- func (r *RateLimitWindow) GetStatus() int64
- func (r *RateLimitWindow) Init()
- func (r *RateLimitWindow) InitializeRequest() *slimiter.RateLimitInitRequest
- func (r *RateLimitWindow) OnInitResponse(counter *slimiter.QuotaCounter, duration time.Duration, srvTimeMilli int64)
- func (r *RateLimitWindow) OnReportResponse(counter *slimiter.QuotaLeft, duration time.Duration, curTimeMilli int64)
- func (r *RateLimitWindow) SetStatus(status int64)
- func (r *RateLimitWindow) UpdateTimeDiff(timeDiff int64)
- type RateLimitWindowSet
- func (rs *RateLimitWindowSet) AddRateLimitWindow(commonRequest *data.CommonRateLimitRequest, rule *apitraffic.Rule, ...) *RateLimitWindow
- func (rs *RateLimitWindowSet) GetRateLimitWindow(rule *apitraffic.Rule, flatLabels string) *RateLimitWindow
- func (rs *RateLimitWindowSet) GetRateLimitWindows() []*RateLimitWindow
- func (rs *RateLimitWindowSet) OnServiceUpdated(svcEventObject *common.ServiceEventObject)
- func (rs *RateLimitWindowSet) OnWindowExpired(nowMilli int64, window *RateLimitWindow) bool
- func (rs *RateLimitWindowSet) PurgeWindows(nowMilli int64)
- type RemoteErrorContainer
- type RemoteQuotaCallBack
- type RemoteSyncParam
- type ResponseCallBack
- type StreamCounterSet
- func (s *StreamCounterSet) AdjustTime() int64
- func (s *StreamCounterSet) CompareTo(value interface{}) int
- func (s *StreamCounterSet) EnsureDeleted(value interface{}) bool
- func (s *StreamCounterSet) Expired(nowMilli int64, clearRecords bool) bool
- func (s *StreamCounterSet) HasInitialized(svcKey model.ServiceKey, labels string) bool
- func (s *StreamCounterSet) SendInitRequest(initReq *ratelimiter.RateLimitInitRequest, callback ResponseCallBack)
- func (s *StreamCounterSet) SendReportRequest(clientReportReq *limitpb.ClientRateLimitReportRequest) error
- type WindowContainer
Constants ¶
const ( // Disabled is a constant for disabled quota. Disabled = "rateLimit disabled" // RuleNotExists is a constant for rules not exist. RuleNotExists = "quota rule not exists" )
const ( // Created 刚创建, 无需进行后台调度 Created int64 = iota // Initializing 已获取调度权,准备开始调度 Initializing // Initialized 已经在远程初始化结束 Initialized // Deleted 已经删除 Deleted )
Variables ¶
var ( // 淘汰因子,过期时间=MaxDuration + ExpireFactor ExpireFactor = 1 * time.Second DefaultStatisticReportPeriod = 1 * time.Second )
超过多长时间后进行淘汰,淘汰后需要重新init
Functions ¶
func FormatLabelToStr ¶
func FormatLabelToStr(request *data.CommonRateLimitRequest, rule *apitraffic.Rule) (string, bool)
FormatLabelToStr 根据限流请求和限流规则,将标签格式化为字符串
Types ¶
type AsyncRateLimitConnector ¶
type AsyncRateLimitConnector interface { // GetMessageSender 初始化限流控制信息 GetMessageSender(svcKey model.ServiceKey, hashValue uint64) (RateLimitMsgSender, error) // Destroy 销毁 Destroy() // StreamCount 流数量 StreamCount() int }
AsyncRateLimitConnector 异步限流连接器
func NewAsyncRateLimitConnector ¶
func NewAsyncRateLimitConnector(valueCtx model.ValueContext, cfg config.Configuration) AsyncRateLimitConnector
NewAsyncRateLimitConnector 创建异步限流连接器
type CounterIdentifier ¶
type CounterIdentifier struct {
// contains filtered or unexported fields
}
CounterIdentifier 计数器标识
type DurationBaseCallBack ¶
type DurationBaseCallBack struct {
// contains filtered or unexported fields
}
DurationBaseCallBack 基于时间段的回调结构
type FlowQuotaAssistant ¶
type FlowQuotaAssistant struct {
// contains filtered or unexported fields
}
FlowQuotaAssistant 限额流程的辅助类
func (*FlowQuotaAssistant) AddWindowCount ¶
func (f *FlowQuotaAssistant) AddWindowCount()
AddWindowCount 添加窗口数量
func (*FlowQuotaAssistant) AsyncRateLimitConnector ¶
func (f *FlowQuotaAssistant) AsyncRateLimitConnector() AsyncRateLimitConnector
AsyncRateLimitConnector 异步限流连接器
func (*FlowQuotaAssistant) CountRateLimitWindowSet ¶
func (f *FlowQuotaAssistant) CountRateLimitWindowSet() int
CountRateLimitWindowSet 获取分配窗口集合数量,只用于测试
func (*FlowQuotaAssistant) DelWindowCount ¶
func (f *FlowQuotaAssistant) DelWindowCount()
DelWindowCount 减少窗口数量
func (*FlowQuotaAssistant) DeleteRateLimitWindowSet ¶
func (f *FlowQuotaAssistant) DeleteRateLimitWindowSet(svcKey model.ServiceKey)
DeleteRateLimitWindowSet 删除窗口集合
func (*FlowQuotaAssistant) GetAllWindowSets ¶
func (f *FlowQuotaAssistant) GetAllWindowSets() map[model.ServiceKey]*RateLimitWindowSet
GetAllWindowSets 获取当前所有的限流窗口集合
func (*FlowQuotaAssistant) GetQuota ¶
func (f *FlowQuotaAssistant) GetQuota(commonRequest *data.CommonRateLimitRequest) (*model.QuotaFutureImpl, error)
GetQuota 获取配额
func (*FlowQuotaAssistant) GetRateLimitWindow ¶
func (f *FlowQuotaAssistant) GetRateLimitWindow(svcKey model.ServiceKey, rule *apitraffic.Rule, label string) (*RateLimitWindowSet, *RateLimitWindow)
GetRateLimitWindow 获取配额分配窗口
func (*FlowQuotaAssistant) GetRateLimitWindowSet ¶
func (f *FlowQuotaAssistant) GetRateLimitWindowSet(svcKey model.ServiceKey, create bool) *RateLimitWindowSet
GetRateLimitWindowSet 获取配额分配窗口集合
func (*FlowQuotaAssistant) GetWindowCount ¶
func (f *FlowQuotaAssistant) GetWindowCount() int32
GetWindowCount 获取窗口数量
func (*FlowQuotaAssistant) Init ¶
func (f *FlowQuotaAssistant) Init(engine model.Engine, cfg config.Configuration, supplier plugin.Supplier) error
Init 初始化限额辅助
func (*FlowQuotaAssistant) IsDestroyed ¶
func (f *FlowQuotaAssistant) IsDestroyed() bool
IsDestroyed 是否已销毁
func (*FlowQuotaAssistant) OnServiceDeleted ¶
func (f *FlowQuotaAssistant) OnServiceDeleted(event *common.PluginEvent) error
OnServiceDeleted 服务删除回调
func (*FlowQuotaAssistant) OnServiceUpdated ¶
func (f *FlowQuotaAssistant) OnServiceUpdated(event *common.PluginEvent) error
OnServiceUpdated 服务更新回调,找到具体的限流窗口集合,然后触发更新
func (*FlowQuotaAssistant) TaskValues ¶
func (f *FlowQuotaAssistant) TaskValues() model.TaskValues
TaskValues 获取调度任务
type HostIdentifier ¶
type HostIdentifier struct {
// contains filtered or unexported fields
}
HostIdentifier 节点标识
type InitializeRecord ¶
type InitializeRecord struct {
// contains filtered or unexported fields
}
InitializeRecord 初始化记录
func (*InitializeRecord) Expired ¶
func (ir *InitializeRecord) Expired(nowMilli int64) bool
Expired 记录超时
type RateLimitMsgSender ¶
type RateLimitMsgSender interface { // HasInitialized 是否已经初始化 HasInitialized(svcKey model.ServiceKey, labels string) bool // SendInitRequest 发送初始化请求 SendInitRequest(request *ratelimiter.RateLimitInitRequest, callback ResponseCallBack) // SendReportRequest 发送上报请求 SendReportRequest(request *limitpb.ClientRateLimitReportRequest) error // AdjustTime 同步时间 AdjustTime() int64 }
RateLimitMsgSender 限流消息同步器
type RateLimitWindow ¶
type RateLimitWindow struct { // 配额窗口集合 WindowSet *RateLimitWindowSet // 服务信息 SvcKey model.ServiceKey // 正则对应的label Labels string // 已经匹配到的限流规则,没有匹配则为空 // 由于可能会出现规则并没有发生变化,但是缓存对象更新的情况,因此这里使用原子变量 Rule *apitraffic.Rule // 其他插件在这里添加的相关数据,一般是统计插件使用 PluginData map[int32]interface{} // contains filtered or unexported fields }
RateLimitWindow 限流窗口
func NewRateLimitWindow ¶
func NewRateLimitWindow(windowSet *RateLimitWindowSet, rule *apitraffic.Rule, commonRequest *data.CommonRateLimitRequest, labels string) *RateLimitWindow
NewRateLimitWindow 创建限流窗口
func (*RateLimitWindow) AllocateQuota ¶
func (r *RateLimitWindow) AllocateQuota(commonRequest *data.CommonRateLimitRequest) *model.QuotaResponse
AllocateQuota 分配配额
func (*RateLimitWindow) AsyncRateLimitConnector ¶
func (r *RateLimitWindow) AsyncRateLimitConnector() AsyncRateLimitConnector
AsyncRateLimitConnector 获取异步连接器
func (*RateLimitWindow) CasStatus ¶
func (r *RateLimitWindow) CasStatus(oldStatus int64, status int64) bool
CasStatus CAS设置状态
func (*RateLimitWindow) CompareTo ¶
func (r *RateLimitWindow) CompareTo(another interface{}) int
CompareTo 比较两个窗口是否相同
func (*RateLimitWindow) DoAsyncRemoteAcquire ¶
func (r *RateLimitWindow) DoAsyncRemoteAcquire() error
DoAsyncRemoteAcquire 异步发送 acquire
func (*RateLimitWindow) DoAsyncRemoteInit ¶
func (r *RateLimitWindow) DoAsyncRemoteInit() error
DoAsyncRemoteInit 异步处理发送init
func (*RateLimitWindow) EnsureDeleted ¶
func (r *RateLimitWindow) EnsureDeleted(value interface{}) bool
EnsureDeleted 删除前进行检查,返回true才删除,该检查是同步操作
func (*RateLimitWindow) Expired ¶
func (r *RateLimitWindow) Expired(nowMilli int64) bool
Expired 是否已经过期
func (*RateLimitWindow) GetLastAccessTimeMilli ¶
func (r *RateLimitWindow) GetLastAccessTimeMilli() int64
GetLastAccessTimeMilli 获取最近访问时间
func (*RateLimitWindow) InitializeRequest ¶
func (r *RateLimitWindow) InitializeRequest() *slimiter.RateLimitInitRequest
InitializeRequest 转换成限流PB初始化消息
func (*RateLimitWindow) OnInitResponse ¶
func (r *RateLimitWindow) OnInitResponse(counter *slimiter.QuotaCounter, duration time.Duration, srvTimeMilli int64)
OnInitResponse 初始化请求的应答回调函数
func (*RateLimitWindow) OnReportResponse ¶
func (r *RateLimitWindow) OnReportResponse(counter *slimiter.QuotaLeft, duration time.Duration, curTimeMilli int64)
OnReportResponse 上报请求的应答回调函数
func (*RateLimitWindow) SetStatus ¶
func (r *RateLimitWindow) SetStatus(status int64)
SetStatus 设置状态
func (*RateLimitWindow) UpdateTimeDiff ¶
func (r *RateLimitWindow) UpdateTimeDiff(timeDiff int64)
UpdateTimeDiff 更新时间差
type RateLimitWindowSet ¶
type RateLimitWindowSet struct {
// contains filtered or unexported fields
}
RateLimitWindowSet 限流分配窗口的缓存
func NewRateLimitWindowSet ¶
func NewRateLimitWindowSet(assistant *FlowQuotaAssistant) *RateLimitWindowSet
NewRateLimitWindowSet 构造函数
func (*RateLimitWindowSet) AddRateLimitWindow ¶
func (rs *RateLimitWindowSet) AddRateLimitWindow( commonRequest *data.CommonRateLimitRequest, rule *apitraffic.Rule, flatLabels string, regexSpread bool) *RateLimitWindow
AddRateLimitWindow 添加限流窗口
func (*RateLimitWindowSet) GetRateLimitWindow ¶
func (rs *RateLimitWindowSet) GetRateLimitWindow(rule *apitraffic.Rule, flatLabels string) *RateLimitWindow
GetRateLimitWindow 获取限流窗口
func (*RateLimitWindowSet) GetRateLimitWindows ¶
func (rs *RateLimitWindowSet) GetRateLimitWindows() []*RateLimitWindow
GetRateLimitWindows 拷贝一份只读数据
func (*RateLimitWindowSet) OnServiceUpdated ¶
func (rs *RateLimitWindowSet) OnServiceUpdated(svcEventObject *common.ServiceEventObject)
OnServiceUpdated 服务更新回调
func (*RateLimitWindowSet) OnWindowExpired ¶
func (rs *RateLimitWindowSet) OnWindowExpired(nowMilli int64, window *RateLimitWindow) bool
OnWindowExpired 窗口过期
func (*RateLimitWindowSet) PurgeWindows ¶
func (rs *RateLimitWindowSet) PurgeWindows(nowMilli int64)
PurgeWindows 执行窗口淘汰
type RemoteErrorContainer ¶
type RemoteErrorContainer struct {
// contains filtered or unexported fields
}
RemoteErrorContainer 远程访问的错误信息
type RemoteQuotaCallBack ¶
type RemoteQuotaCallBack struct {
// contains filtered or unexported fields
}
RemoteQuotaCallBack 远程配额查询任务
func NewRemoteQuotaCallback ¶
func NewRemoteQuotaCallback(cfg config.Configuration, supplier plugin.Supplier, engine model.Engine, connector AsyncRateLimitConnector) (*RemoteQuotaCallBack, error)
NewRemoteQuotaCallback 创建查询任务
func (*RemoteQuotaCallBack) OnTaskEvent ¶
func (r *RemoteQuotaCallBack) OnTaskEvent(event model.TaskEvent)
OnTaskEvent 任务事件回调
func (*RemoteQuotaCallBack) Process ¶
func (r *RemoteQuotaCallBack) Process( taskKey interface{}, taskValue interface{}, lastProcessTime time.Time) model.TaskResult
Process 处理远程配额查询任务
type RemoteSyncParam ¶
type RemoteSyncParam struct { // 连接相关参数 model.ControlParam }
RemoteSyncParam 远程同步相关参数
type ResponseCallBack ¶
type ResponseCallBack interface { // OnInitResponse 应答回调函数 OnInitResponse(counter *ratelimiter.QuotaCounter, duration time.Duration, curTimeMilli int64) // OnReportResponse 应答回调函数 OnReportResponse(counter *ratelimiter.QuotaLeft, duration time.Duration, curTimeMilli int64) }
ResponseCallBack 应答回调函数
type StreamCounterSet ¶
type StreamCounterSet struct { // 目标节点信息 HostIdentifier *HostIdentifier // contains filtered or unexported fields }
StreamCounterSet 同一个节点的counter集合,用于回调
func NewStreamCounterSet ¶
func NewStreamCounterSet(asyncConnector *asyncRateLimitConnector, identifier *HostIdentifier) *StreamCounterSet
NewStreamCounterSet 新建流管理器
func (*StreamCounterSet) CompareTo ¶
func (s *StreamCounterSet) CompareTo(value interface{}) int
CompareTo 比较两个元素
func (*StreamCounterSet) EnsureDeleted ¶
func (s *StreamCounterSet) EnsureDeleted(value interface{}) bool
EnsureDeleted 删除前进行检查,返回true才删除,该检查是同步操作
func (*StreamCounterSet) Expired ¶
func (s *StreamCounterSet) Expired(nowMilli int64, clearRecords bool) bool
Expired 检查是否已经超时
func (*StreamCounterSet) HasInitialized ¶
func (s *StreamCounterSet) HasInitialized(svcKey model.ServiceKey, labels string) bool
HasInitialized 是否已经初始化
func (*StreamCounterSet) SendInitRequest ¶
func (s *StreamCounterSet) SendInitRequest(initReq *ratelimiter.RateLimitInitRequest, callback ResponseCallBack)
SendInitRequest 发送初始化请求
func (*StreamCounterSet) SendReportRequest ¶
func (s *StreamCounterSet) SendReportRequest(clientReportReq *limitpb.ClientRateLimitReportRequest) error
SendReportRequest 发送上报请求
type WindowContainer ¶
type WindowContainer struct { // 主窗口,非正则表达式的适用 MainWindow *RateLimitWindow // 适用于正则表达式展开的 WindowByLabel map[string]*RateLimitWindow }
WindowContainer 窗口容器
func (*WindowContainer) GetRateLimitWindows ¶
func (w *WindowContainer) GetRateLimitWindows() []*RateLimitWindow
GetRateLimitWindows 获取限流滑窗