Documentation ¶
Overview ¶
Copyright (c) 2016,2020 Uber Technologies, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
SOURCE: https://github.com/uber-go/ratelimit/blob/main/limiter_mutexbased.go
EDIT: slight modification to allow setting the rate limit on the fly
SCOPE: LeakyBucket
Index ¶
- Constants
- Variables
- func AggregateRTPStats(statsList []*livekit.RTPStats, gapHistogramSize int) *livekit.RTPStats
- func CloneProto[T proto.Message](m T) T
- func CloseFunc[T CloseFuncType](fn T) io.Closer
- func DeepCopy[T any](v T) T
- func EnableLockTracker()
- func ErrorIsOneOf(err error, targets ...error) bool
- func FastLogisticFunc(x0, L, k float64) func(x float64) float64
- func GetMimeTypeForAudioCodec(codec livekit.AudioCodec) string
- func GetMimeTypeForVideoCodec(codec livekit.VideoCodec) string
- func HashedID(id string) string
- func HedgeCall[T any](ctx context.Context, params HedgeParams[T]) (v T, err error)
- func IsConnectionQualityHigher(prev livekit.ConnectionQuality, curr livekit.ConnectionQuality) bool
- func IsConnectionQualityLower(prev livekit.ConnectionQuality, curr livekit.ConnectionQuality) bool
- func LocalNodeID() (string, error)
- func LogisticFunc(x0, L, k float64) func(x float64) float64
- func MarshalGuid[T livekit.Guid](id T) livekit.GuidBlock
- func NewGuid(prefix string) string
- func NumMutexes() int
- func ParallelExec[T any](vals []T, parallelThreshold, step uint64, fn func(T))
- func RandomSecret() string
- func Redact(s, name string) string
- func RedactIdentifier(identifier string) string
- func RedactStreamKey(url string) (string, bool)
- func ToggleLockTrackerStackTraces(enable bool)
- func UnmarshalGuid[T livekit.Guid](b livekit.GuidBlock) T
- type Bitmap
- type Clock
- type CloseFuncType
- type Closers
- type ConfigBuilder
- type ConfigDefaulter
- type ConfigObserver
- type DedupedSlice
- type ErrArray
- type FlowGraph
- type Graph
- func (g *Graph[K, N, E]) DeleteEdge(src, dst K)
- func (g *Graph[K, N, E]) DeleteNode(id K)
- func (g *Graph[K, N, E]) Edge(src, dst K) (p E)
- func (g *Graph[K, N, E]) HasEdge(src, dst K) bool
- func (g *Graph[K, N, E]) HasNode(id K) bool
- func (g *Graph[K, N, E]) InEdges(dst K) map[K]E
- func (g *Graph[K, N, E]) InsertEdge(src, dst K, props E)
- func (g *Graph[K, N, E]) InsertNode(props N)
- func (g *Graph[K, N, E]) Node(id K) (props N)
- func (g *Graph[K, N, E]) NodeIDs() []K
- func (g *Graph[K, N, E]) OutEdges(src K) map[K]E
- func (g *Graph[K, N, E]) ShortestPath(src, dst K) ([]N, int64)
- func (g *Graph[K, N, E]) Size() int
- func (g *Graph[K, N, E]) TopologicalSort() []N
- type GraphEdge
- type GraphEdgeProps
- type GraphNode
- type GraphNodeProps
- type HedgeParams
- type KillableService
- type LatencyAggregate
- func (a *LatencyAggregate) Get(ts time.Duration) (Welford, bool)
- func (a *LatencyAggregate) MarshalLogObject(e zapcore.ObjectEncoder) error
- func (a *LatencyAggregate) Summarize() Welford
- func (a *LatencyAggregate) SummarizeLast(d time.Duration) Welford
- func (a *LatencyAggregate) Update(ts time.Duration, v float64)
- type LeakyBucket
- type MinCostMaxFlow
- type MultitonService
- type Mutex
- type Promise
- type ProtoProxy
- type RWMutex
- type SimpleGraphEdge
- type SimulatedClock
- type StuckLock
- type SystemClock
- type TimedAggregator
- func (t *TimedAggregator[T]) AddSample(val T) error
- func (t *TimedAggregator[T]) AddSampleAt(val T, at time.Time) error
- func (t *TimedAggregator[T]) GetAggregate() (T, time.Duration)
- func (t *TimedAggregator[T]) GetAggregateAndRestartAt(at time.Time) (T, time.Duration, error)
- func (t *TimedAggregator[T]) GetAggregateAt(at time.Time) (T, time.Duration, error)
- func (t *TimedAggregator[T]) GetAverage() float64
- func (t *TimedAggregator[T]) GetAverageAndRestartAt(at time.Time) (float64, error)
- func (t *TimedAggregator[T]) GetAverageAt(at time.Time) (float64, error)
- func (t *TimedAggregator[T]) Reset()
- func (t *TimedAggregator[T]) Restart()
- func (t *TimedAggregator[T]) RestartAt(at time.Time)
- type TimedAggregatorParams
- type TimedVersion
- func (t TimedVersion) After(other TimedVersion) bool
- func (t TimedVersion) Compare(other TimedVersion) int
- func (t *TimedVersion) Downgrade(other TimedVersion) bool
- func (t TimedVersion) GormDataType() string
- func (t TimedVersion) IsZero() bool
- func (t TimedVersion) Load() TimedVersion
- func (t *TimedVersion) Scan(src interface{}) (err error)
- func (t *TimedVersion) Store(other TimedVersion)
- func (t TimedVersion) String() string
- func (t TimedVersion) Time() time.Time
- func (t TimedVersion) ToProto() *livekit.TimedVersion
- func (t *TimedVersion) Update(other TimedVersion) bool
- func (t *TimedVersion) Upgrade(other TimedVersion) bool
- func (t TimedVersion) Value() (driver.Value, error)
- type TimedVersionGenerator
- type TimeoutQueue
- type TimeoutQueueItem
- type TimeoutQueueIterator
- type Welford
- type WorkerGroup
Constants ¶
const ( RoomPrefix = guid.RoomPrefix NodePrefix = guid.NodePrefix ParticipantPrefix = guid.ParticipantPrefix TrackPrefix = guid.TrackPrefix APIKeyPrefix = guid.APIKeyPrefix EgressPrefix = guid.EgressPrefix IngressPrefix = guid.IngressPrefix SIPTrunkPrefix = guid.SIPTrunkPrefix SIPDispatchRulePrefix = guid.SIPDispatchRulePrefix SIPCallPrefix = guid.SIPCallPrefix RPCPrefix = guid.RPCPrefix WHIPResourcePrefix = guid.WHIPResourcePrefix RTMPResourcePrefix = guid.RTMPResourcePrefix URLResourcePrefix = guid.URLResourcePrefix AgentWorkerPrefix = guid.AgentWorkerPrefix AgentJobPrefix = guid.AgentJobPrefix AgentDispatchPrefix = guid.AgentDispatchPrefix )
const (
GuidSize = guid.Size
)
Variables ¶
var (
ErrAnachronousSample = errors.New("anachronous sample")
)
var ErrMaxAttemptsReached = errors.New("max attempts reached")
Functions ¶
func AggregateRTPStats ¶
func CloneProto ¶
func CloseFunc ¶
func CloseFunc[T CloseFuncType](fn T) io.Closer
func EnableLockTracker ¶
func EnableLockTracker()
EnableLockTracker enables the lock-tracking background worker. This should be called during init.
func ErrorIsOneOf ¶
func FastLogisticFunc ¶
func GetMimeTypeForAudioCodec ¶
func GetMimeTypeForAudioCodec(codec livekit.AudioCodec) string
func GetMimeTypeForVideoCodec ¶
func GetMimeTypeForVideoCodec(codec livekit.VideoCodec) string
func HedgeCall ¶
func HedgeCall[T any](ctx context.Context, params HedgeParams[T]) (v T, err error)
HedgeCall races retries if the function takes too long to return:

    |---------------- attempt 1 ----------------|
    |    delay    |--------- attempt 2 ---------|
func IsConnectionQualityHigher ¶
func IsConnectionQualityHigher(prev livekit.ConnectionQuality, curr livekit.ConnectionQuality) bool
func IsConnectionQualityLower ¶
func IsConnectionQualityLower(prev livekit.ConnectionQuality, curr livekit.ConnectionQuality) bool
func LocalNodeID ¶
func LogisticFunc ¶
func NumMutexes ¶
func NumMutexes() int
func ParallelExec ¶
ParallelExec executes the given function with each element of vals. If len(vals) >= parallelThreshold, it executes them in parallel with the given step size, so fn must be thread-safe.
func RandomSecret ¶
func RandomSecret() string
func RedactIdentifier ¶
func RedactStreamKey ¶
func ToggleLockTrackerStackTraces ¶
func ToggleLockTrackerStackTraces(enable bool)
Types ¶
type Bitmap ¶
type Bitmap[T bitmapNumber] struct {
// contains filtered or unexported fields
}
func (*Bitmap[T]) ClearRange ¶
func (b *Bitmap[T]) ClearRange(min, max T)
type CloseFuncType ¶
type CloseFuncType interface { ~func() error | ~func() }
type Closers ¶
func CombineClosers ¶
type ConfigBuilder ¶
type ConfigDefaulter ¶
type ConfigObserver ¶
type ConfigObserver[T any] struct { // contains filtered or unexported fields }
func NewConfigObserver ¶
func NewConfigObserver[T any](path string, builder ConfigBuilder[T]) (*ConfigObserver[T], *T, error)
func (*ConfigObserver[T]) Close ¶
func (c *ConfigObserver[T]) Close()
func (*ConfigObserver[T]) EmitConfigUpdate ¶
func (c *ConfigObserver[T]) EmitConfigUpdate(conf *T)
func (*ConfigObserver[T]) Load ¶
func (c *ConfigObserver[T]) Load() *T
func (*ConfigObserver[T]) Observe ¶
func (c *ConfigObserver[T]) Observe(cb func(*T)) func()
type DedupedSlice ¶
type DedupedSlice[T comparable] struct { // contains filtered or unexported fields }
func NewDedupedSlice ¶
func NewDedupedSlice[T comparable](maxLen int) *DedupedSlice[T]
func (*DedupedSlice[T]) Add ¶
func (d *DedupedSlice[T]) Add(val T) bool
func (*DedupedSlice[T]) Clear ¶
func (d *DedupedSlice[T]) Clear()
func (*DedupedSlice[T]) Get ¶
func (d *DedupedSlice[T]) Get() []T
func (*DedupedSlice[T]) Has ¶
func (d *DedupedSlice[T]) Has(val T) bool
func (*DedupedSlice[T]) Len ¶
func (d *DedupedSlice[T]) Len() int
type FlowGraph ¶
type FlowGraph struct {
// contains filtered or unexported fields
}
func NewFlowGraph ¶
type Graph ¶
type Graph[K comparable, N GraphNodeProps[K], E GraphEdgeProps] struct { // contains filtered or unexported fields }
func NewGraph ¶
func NewGraph[K comparable, N GraphNodeProps[K], E GraphEdgeProps]() *Graph[K, N, E]
func (*Graph[K, N, E]) DeleteEdge ¶
func (g *Graph[K, N, E]) DeleteEdge(src, dst K)
func (*Graph[K, N, E]) DeleteNode ¶
func (g *Graph[K, N, E]) DeleteNode(id K)
func (*Graph[K, N, E]) InsertEdge ¶
func (g *Graph[K, N, E]) InsertEdge(src, dst K, props E)
func (*Graph[K, N, E]) InsertNode ¶
func (g *Graph[K, N, E]) InsertNode(props N)
func (*Graph[K, N, E]) ShortestPath ¶
func (*Graph[K, N, E]) TopologicalSort ¶
func (g *Graph[K, N, E]) TopologicalSort() []N
type GraphEdgeProps ¶
type GraphEdgeProps interface {
Length() int64
}
type GraphNodeProps ¶
type GraphNodeProps[K comparable] interface { ID() K }
type HedgeParams ¶
type KillableService ¶
type KillableService interface {
Kill()
}
type LatencyAggregate ¶
type LatencyAggregate struct {
// contains filtered or unexported fields
}
LatencyAggregate is a ring buffer of Welford mean/variance summaries used to aggregate jitter and RTT.
func NewLatencyAggregate ¶
func NewLatencyAggregate(interval, windowLength time.Duration) *LatencyAggregate
func (*LatencyAggregate) MarshalLogObject ¶
func (a *LatencyAggregate) MarshalLogObject(e zapcore.ObjectEncoder) error
func (*LatencyAggregate) Summarize ¶
func (a *LatencyAggregate) Summarize() Welford
Summarize aggregates the interval summaries.
func (*LatencyAggregate) SummarizeLast ¶
func (a *LatencyAggregate) SummarizeLast(d time.Duration) Welford
type LeakyBucket ¶
type LeakyBucket struct {
// contains filtered or unexported fields
}
func NewLeakyBucket ¶
func NewLeakyBucket(rateLimit int, slack int, clock Clock) *LeakyBucket
NewLeakyBucket initializes a LeakyBucket with the given rateLimit, slack, and clock.
rateLimit is defined as the number of requests per second.
slack is defined as the number of allowed requests before limiting. For example, when slack=5, LeakyBucket will allow 5 requests to pass through Take without sleeping, as long as these requests are under the perRequest duration.
func (*LeakyBucket) Take ¶
func (lb *LeakyBucket) Take() time.Time
Take blocks to ensure that the time spent between multiple Take calls is on average time.Second/rate.
Take is THREAD SAFE and BLOCKING.
func (*LeakyBucket) Update ¶
func (lb *LeakyBucket) Update(rateLimit int, slack int)
Update sets the underlying rate limit and slack. The setting may not be applied immediately.
Update is THREAD SAFE and NON-BLOCKING.
type MinCostMaxFlow ¶
type MinCostMaxFlow struct {
// contains filtered or unexported fields
}
func (*MinCostMaxFlow) ComputeMaxFlow ¶
func (f *MinCostMaxFlow) ComputeMaxFlow(g FlowGraph, s, t int64) (flow, cost int64)
func (*MinCostMaxFlow) Flow ¶
func (f *MinCostMaxFlow) Flow(s, t int64) int64
type MultitonService ¶
type MultitonService[K comparable] struct { // contains filtered or unexported fields }
func (*MultitonService[K]) Kill ¶
func (s *MultitonService[K]) Kill()
func (*MultitonService[K]) Replace ¶
func (s *MultitonService[K]) Replace(k K, v KillableService) func()
type ProtoProxy ¶
ProtoProxy is a caching proxy for protobuf messages that may be expensive to compute. It is used to avoid unnecessary regeneration of protobufs.
func NewProtoProxy ¶
func NewProtoProxy[T proto.Message](refreshInterval time.Duration, updateFn func() T) *ProtoProxy[T]
NewProtoProxy creates a new ProtoProxy that regenerates underlying values at a cadence of refreshInterval. This should be used for updates that should be sent periodically but do not have the urgency of immediate delivery. updateFn should provide the computations required to generate the protobuf. If refreshInterval is 0, the proxy will only update on MarkDirty(true).
func (*ProtoProxy[T]) Get ¶
func (p *ProtoProxy[T]) Get() T
func (*ProtoProxy[T]) MarkDirty ¶
func (p *ProtoProxy[T]) MarkDirty(immediate bool) <-chan struct{}
func (*ProtoProxy[T]) Stop ¶
func (p *ProtoProxy[T]) Stop()
func (*ProtoProxy[T]) Updated ¶
func (p *ProtoProxy[T]) Updated() <-chan struct{}
type SimpleGraphEdge ¶
type SimpleGraphEdge struct{}
func (SimpleGraphEdge) Length ¶
func (e SimpleGraphEdge) Length() int64
type SimulatedClock ¶
type StuckLock ¶
type StuckLock struct {
// contains filtered or unexported fields
}
func ScanTrackedLocks ¶
ScanTrackedLocks checks all lock trackers.
func ScanTrackedLocksI ¶
ScanTrackedLocksI checks lock trackers incrementally, n at a time.
func (*StuckLock) FirstLockedAtStack ¶
func (*StuckLock) NumGoroutineHeld ¶
func (*StuckLock) NumGoroutineWaiting ¶
type SystemClock ¶
type SystemClock struct{}
func (SystemClock) Now ¶
func (SystemClock) Now() time.Time
func (SystemClock) Sleep ¶
func (SystemClock) Sleep(d time.Duration)
type TimedAggregator ¶
type TimedAggregator[T timedAggregatorNumber] struct {
// contains filtered or unexported fields
}
func NewTimedAggregator ¶
func NewTimedAggregator[T timedAggregatorNumber](params TimedAggregatorParams) *TimedAggregator[T]
func (*TimedAggregator[T]) AddSample ¶
func (t *TimedAggregator[T]) AddSample(val T) error
func (*TimedAggregator[T]) AddSampleAt ¶
func (t *TimedAggregator[T]) AddSampleAt(val T, at time.Time) error
func (*TimedAggregator[T]) GetAggregate ¶
func (t *TimedAggregator[T]) GetAggregate() (T, time.Duration)
func (*TimedAggregator[T]) GetAggregateAndRestartAt ¶
func (*TimedAggregator[T]) GetAggregateAt ¶
func (*TimedAggregator[T]) GetAverage ¶
func (t *TimedAggregator[T]) GetAverage() float64
func (*TimedAggregator[T]) GetAverageAndRestartAt ¶
func (t *TimedAggregator[T]) GetAverageAndRestartAt(at time.Time) (float64, error)
func (*TimedAggregator[T]) GetAverageAt ¶
func (t *TimedAggregator[T]) GetAverageAt(at time.Time) (float64, error)
func (*TimedAggregator[T]) Reset ¶
func (t *TimedAggregator[T]) Reset()
func (*TimedAggregator[T]) Restart ¶
func (t *TimedAggregator[T]) Restart()
func (*TimedAggregator[T]) RestartAt ¶
func (t *TimedAggregator[T]) RestartAt(at time.Time)
type TimedAggregatorParams ¶
type TimedAggregatorParams struct {
CapNegativeValues bool
}
type TimedVersion ¶
type TimedVersion uint64
func TimedVersionFromProto ¶
func TimedVersionFromProto(proto *livekit.TimedVersion) TimedVersion
func TimedVersionFromTime ¶
func TimedVersionFromTime(t time.Time) TimedVersion
func (TimedVersion) After ¶
func (t TimedVersion) After(other TimedVersion) bool
func (TimedVersion) Compare ¶
func (t TimedVersion) Compare(other TimedVersion) int
func (*TimedVersion) Downgrade ¶
func (t *TimedVersion) Downgrade(other TimedVersion) bool
func (TimedVersion) GormDataType ¶
func (t TimedVersion) GormDataType() string
func (TimedVersion) IsZero ¶
func (t TimedVersion) IsZero() bool
func (TimedVersion) Load ¶
func (t TimedVersion) Load() TimedVersion
func (*TimedVersion) Scan ¶
func (t *TimedVersion) Scan(src interface{}) (err error)
func (*TimedVersion) Store ¶
func (t *TimedVersion) Store(other TimedVersion)
func (TimedVersion) String ¶
func (t TimedVersion) String() string
func (TimedVersion) Time ¶
func (t TimedVersion) Time() time.Time
func (TimedVersion) ToProto ¶
func (t TimedVersion) ToProto() *livekit.TimedVersion
func (*TimedVersion) Update ¶
func (t *TimedVersion) Update(other TimedVersion) bool
func (*TimedVersion) Upgrade ¶
func (t *TimedVersion) Upgrade(other TimedVersion) bool
type TimedVersionGenerator ¶
type TimedVersionGenerator interface {
Next() TimedVersion
}
func NewDefaultTimedVersionGenerator ¶
func NewDefaultTimedVersionGenerator() TimedVersionGenerator
type TimeoutQueue ¶
type TimeoutQueue[T any] struct { // contains filtered or unexported fields }
func (*TimeoutQueue[T]) IterateAfter ¶
func (q *TimeoutQueue[T]) IterateAfter(timeout time.Duration) TimeoutQueueIterator[T]
func (*TimeoutQueue[T]) IterateRemoveAfter ¶
func (q *TimeoutQueue[T]) IterateRemoveAfter(timeout time.Duration) TimeoutQueueIterator[T]
func (*TimeoutQueue[T]) Remove ¶
func (q *TimeoutQueue[T]) Remove(i *TimeoutQueueItem[T])
func (*TimeoutQueue[T]) Reset ¶
func (q *TimeoutQueue[T]) Reset(i *TimeoutQueueItem[T]) bool
type TimeoutQueueItem ¶
type TimeoutQueueItem[T any] struct { Value T // contains filtered or unexported fields }
type TimeoutQueueIterator ¶
type TimeoutQueueIterator[T any] struct { // contains filtered or unexported fields }
func (*TimeoutQueueIterator[T]) Item ¶
func (i *TimeoutQueueIterator[T]) Item() *TimeoutQueueItem[T]
func (*TimeoutQueueIterator[T]) Next ¶
func (i *TimeoutQueueIterator[T]) Next() bool
type Welford ¶
type Welford struct {
// contains filtered or unexported fields
}
Welford implements Welford's online algorithm for variance. See: https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
func WelfordMerge ¶
type WorkerGroup ¶
type WorkerGroup struct {
// contains filtered or unexported fields
}
func (*WorkerGroup) Go ¶
func (w *WorkerGroup) Go(fn func())
func (*WorkerGroup) Wait ¶
func (w *WorkerGroup) Wait()
Source Files ¶
- bitmap.go
- clock.go
- closers.go
- codec.go
- configobserver.go
- connectionquality.go
- dedupedslice.go
- deepcopy.go
- err_array.go
- error.go
- graph.go
- hedge.go
- id.go
- latencyaggregate.go
- lock_tracker.go
- math.go
- multitonservice.go
- parallel.go
- promise.go
- proto.go
- protoproxy.go
- rate.go
- redact.go
- rtpstats.go
- secret.go
- timed_aggregator.go
- timed_version.go
- timeoutqueue.go
- welford.go
- workergroup.go