utils

package
v1.27.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2024 License: Apache-2.0 Imports: 39 Imported by: 0

Documentation

Overview

Copyright (c) 2016,2020 Uber Technologies, Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

SOURCE: https://github.com/uber-go/ratelimit/blob/main/limiter_mutexbased.go

EDIT: slight modification to allow setting the rate limit on the fly.

SCOPE: LeakyBucket

Index

Constants

View Source
const (
	RoomPrefix            = guid.RoomPrefix
	NodePrefix            = guid.NodePrefix
	ParticipantPrefix     = guid.ParticipantPrefix
	TrackPrefix           = guid.TrackPrefix
	APIKeyPrefix          = guid.APIKeyPrefix
	EgressPrefix          = guid.EgressPrefix
	IngressPrefix         = guid.IngressPrefix
	SIPTrunkPrefix        = guid.SIPTrunkPrefix
	SIPDispatchRulePrefix = guid.SIPDispatchRulePrefix
	SIPCallPrefix         = guid.SIPCallPrefix
	RPCPrefix             = guid.RPCPrefix
	WHIPResourcePrefix    = guid.WHIPResourcePrefix
	RTMPResourcePrefix    = guid.RTMPResourcePrefix
	URLResourcePrefix     = guid.URLResourcePrefix
	AgentWorkerPrefix     = guid.AgentWorkerPrefix
	AgentJobPrefix        = guid.AgentJobPrefix
	AgentDispatchPrefix   = guid.AgentDispatchPrefix
)
View Source
const (
	GuidSize = guid.Size
)

Variables

View Source
var (
	ErrAnachronousSample = errors.New("anachronous sample")
)
View Source
var ErrMaxAttemptsReached = errors.New("max attempts reached")

Functions

func AggregateRTPStats

func AggregateRTPStats(statsList []*livekit.RTPStats, gapHistogramSize int) *livekit.RTPStats

func CloneProto

func CloneProto[T proto.Message](m T) T

func CloseFunc

func CloseFunc[T CloseFuncType](fn T) io.Closer

func DeepCopy

func DeepCopy[T any](v T) T

func EnableLockTracker

func EnableLockTracker()

EnableLockTracker enables the lock tracking background worker. This should be called during init.

func ErrorIsOneOf

func ErrorIsOneOf(err error, targets ...error) bool

func FastLogisticFunc

func FastLogisticFunc(x0, L, k float64) func(x float64) float64

func GetMimeTypeForAudioCodec

func GetMimeTypeForAudioCodec(codec livekit.AudioCodec) string

func GetMimeTypeForVideoCodec

func GetMimeTypeForVideoCodec(codec livekit.VideoCodec) string

func HashedID

func HashedID(id string) string

HashedID creates a hashed ID from a unique string

func HedgeCall

func HedgeCall[T any](ctx context.Context, params HedgeParams[T]) (v T, err error)

HedgeCall races retries if the function takes too long to return:

	|---------------- attempt 1 ----------------|
	|    delay    |--------- attempt 2 ---------|

func IsConnectionQualityHigher

func IsConnectionQualityHigher(prev livekit.ConnectionQuality, curr livekit.ConnectionQuality) bool

func IsConnectionQualityLower

func IsConnectionQualityLower(prev livekit.ConnectionQuality, curr livekit.ConnectionQuality) bool

func LocalNodeID

func LocalNodeID() (string, error)

func LogisticFunc

func LogisticFunc(x0, L, k float64) func(x float64) float64

func MarshalGuid

func MarshalGuid[T livekit.Guid](id T) livekit.GuidBlock

func NewGuid

func NewGuid(prefix string) string

func NumMutexes

func NumMutexes() int

func ParallelExec

func ParallelExec[T any](vals []T, parallelThreshold, step uint64, fn func(T))

ParallelExec executes the given function with each element of vals. If len(vals) >= parallelThreshold, it executes them in parallel with the given step size, so fn must be thread-safe.

func RandomSecret

func RandomSecret() string

func Redact

func Redact(s, name string) string

func RedactIdentifier

func RedactIdentifier(identifier string) string

func RedactStreamKey

func RedactStreamKey(url string) (string, bool)

func ToggleLockTrackerStackTraces

func ToggleLockTrackerStackTraces(enable bool)

func UnmarshalGuid

func UnmarshalGuid[T livekit.Guid](b livekit.GuidBlock) T

Types

type Bitmap

type Bitmap[T bitmapNumber] struct {
	// contains filtered or unexported fields
}

func NewBitmap

func NewBitmap[T bitmapNumber](size int) *Bitmap[T]

func (*Bitmap[T]) Clear

func (b *Bitmap[T]) Clear(val T)

func (*Bitmap[T]) ClearRange

func (b *Bitmap[T]) ClearRange(min, max T)

func (*Bitmap[T]) IsSet

func (b *Bitmap[T]) IsSet(val T) bool

func (*Bitmap[T]) Len

func (b *Bitmap[T]) Len() int

func (*Bitmap[T]) Set

func (b *Bitmap[T]) Set(val T)

func (*Bitmap[T]) SetRange

func (b *Bitmap[T]) SetRange(min, max T)

type Clock

type Clock interface {
	Now() time.Time
	Sleep(time.Duration)
}

type CloseFuncType

type CloseFuncType interface {
	~func() error | ~func()
}

type Closers

type Closers []io.Closer

func CombineClosers

func CombineClosers(cs ...io.Closer) Closers

func (*Closers) Close

func (s *Closers) Close() error

type ConfigBuilder

type ConfigBuilder[T any] interface {
	New() (*T, error)
}

type ConfigDefaulter

type ConfigDefaulter[T any] interface {
	InitDefaults(*T) error
}

type ConfigObserver

type ConfigObserver[T any] struct {
	// contains filtered or unexported fields
}

func NewConfigObserver

func NewConfigObserver[T any](path string, builder ConfigBuilder[T]) (*ConfigObserver[T], *T, error)

func (*ConfigObserver[T]) Close

func (c *ConfigObserver[T]) Close()

func (*ConfigObserver[T]) EmitConfigUpdate

func (c *ConfigObserver[T]) EmitConfigUpdate(conf *T)

func (*ConfigObserver[T]) Load

func (c *ConfigObserver[T]) Load() *T

func (*ConfigObserver[T]) Observe

func (c *ConfigObserver[T]) Observe(cb func(*T)) func()

type DedupedSlice

type DedupedSlice[T comparable] struct {
	// contains filtered or unexported fields
}

func NewDedupedSlice

func NewDedupedSlice[T comparable](maxLen int) *DedupedSlice[T]

func (*DedupedSlice[T]) Add

func (d *DedupedSlice[T]) Add(val T) bool

func (*DedupedSlice[T]) Clear

func (d *DedupedSlice[T]) Clear()

func (*DedupedSlice[T]) Get

func (d *DedupedSlice[T]) Get() []T

func (*DedupedSlice[T]) Has

func (d *DedupedSlice[T]) Has(val T) bool

func (*DedupedSlice[T]) Len

func (d *DedupedSlice[T]) Len() int

type ErrArray

type ErrArray struct {
	// contains filtered or unexported fields
}

func (*ErrArray) AppendErr

func (e *ErrArray) AppendErr(err error)

func (*ErrArray) ToError

func (e *ErrArray) ToError() psrpc.Error

type FlowGraph

type FlowGraph struct {
	// contains filtered or unexported fields
}

func NewFlowGraph

func NewFlowGraph(n int64) FlowGraph

func (*FlowGraph) AddEdge

func (g *FlowGraph) AddEdge(s, t, cap, cost int64)

type Graph

type Graph[K comparable, N GraphNodeProps[K], E GraphEdgeProps] struct {
	// contains filtered or unexported fields
}

func NewGraph

func NewGraph[K comparable, N GraphNodeProps[K], E GraphEdgeProps]() *Graph[K, N, E]

func (*Graph[K, N, E]) DeleteEdge

func (g *Graph[K, N, E]) DeleteEdge(src, dst K)

func (*Graph[K, N, E]) DeleteNode

func (g *Graph[K, N, E]) DeleteNode(id K)

func (*Graph[K, N, E]) Edge

func (g *Graph[K, N, E]) Edge(src, dst K) (p E)

func (*Graph[K, N, E]) HasEdge

func (g *Graph[K, N, E]) HasEdge(src, dst K) bool

func (*Graph[K, N, E]) HasNode

func (g *Graph[K, N, E]) HasNode(id K) bool

func (*Graph[K, N, E]) InEdges

func (g *Graph[K, N, E]) InEdges(dst K) map[K]E

func (*Graph[K, N, E]) InsertEdge

func (g *Graph[K, N, E]) InsertEdge(src, dst K, props E)

func (*Graph[K, N, E]) InsertNode

func (g *Graph[K, N, E]) InsertNode(props N)

func (*Graph[K, N, E]) Node

func (g *Graph[K, N, E]) Node(id K) (props N)

func (*Graph[K, N, E]) NodeIDs

func (g *Graph[K, N, E]) NodeIDs() []K

func (*Graph[K, N, E]) OutEdges

func (g *Graph[K, N, E]) OutEdges(src K) map[K]E

func (*Graph[K, N, E]) ShortestPath

func (g *Graph[K, N, E]) ShortestPath(src, dst K) ([]N, int64)

func (*Graph[K, N, E]) Size

func (g *Graph[K, N, E]) Size() int

func (*Graph[K, N, E]) TopologicalSort

func (g *Graph[K, N, E]) TopologicalSort() []N

type GraphEdge

type GraphEdge[N, E any] struct {
	// contains filtered or unexported fields
}

type GraphEdgeProps

type GraphEdgeProps interface {
	Length() int64
}

type GraphNode

type GraphNode[T any] struct {
	// contains filtered or unexported fields
}

type GraphNodeProps

type GraphNodeProps[K comparable] interface {
	ID() K
}

type HedgeParams

type HedgeParams[T any] struct {
	Timeout       time.Duration
	RetryDelay    time.Duration
	MaxAttempts   int
	IsRecoverable func(err error) bool
	Func          func(context.Context) (T, error)
}

type KillableService

type KillableService interface {
	Kill()
}

type LatencyAggregate

type LatencyAggregate struct {
	// contains filtered or unexported fields
}

LatencyAggregate is a ring buffer of Welford mean/variance summaries used to aggregate jitter and RTT.

func NewLatencyAggregate

func NewLatencyAggregate(interval, windowLength time.Duration) *LatencyAggregate

func (*LatencyAggregate) Get

func (a *LatencyAggregate) Get(ts time.Duration) (Welford, bool)

func (*LatencyAggregate) MarshalLogObject

func (a *LatencyAggregate) MarshalLogObject(e zapcore.ObjectEncoder) error

func (*LatencyAggregate) Summarize

func (a *LatencyAggregate) Summarize() Welford

Summarize aggregates the interval summaries.

func (*LatencyAggregate) SummarizeLast

func (a *LatencyAggregate) SummarizeLast(d time.Duration) Welford

func (*LatencyAggregate) Update

func (a *LatencyAggregate) Update(ts time.Duration, v float64)

Update extends the ring to contain ts, then merges the value into the interval summary.

type LeakyBucket

type LeakyBucket struct {
	// contains filtered or unexported fields
}

func NewLeakyBucket

func NewLeakyBucket(rateLimit int, slack int, clock Clock) *LeakyBucket

NewLeakyBucket initiates LeakyBucket with rateLimit, slack, and clock.

rateLimit is defined as the number of requests per second.

slack is defined as the number of allowed requests before limiting. e.g. when slack=5, LeakyBucket will allow 5 requests to pass through Take without a sleep as long as these requests are under perRequest duration.

func (*LeakyBucket) Take

func (lb *LeakyBucket) Take() time.Time

Take blocks to ensure that the time spent between multiple Take calls is on average time.Second/rate.

Take is THREAD SAFE and BLOCKING.

func (*LeakyBucket) Update

func (lb *LeakyBucket) Update(rateLimit int, slack int)

Update sets the underlying rate limit and slack. The setting may not be applied immediately.

Update is THREAD SAFE and NON-BLOCKING.

type MinCostMaxFlow

type MinCostMaxFlow struct {
	// contains filtered or unexported fields
}

func (*MinCostMaxFlow) ComputeMaxFlow

func (f *MinCostMaxFlow) ComputeMaxFlow(g FlowGraph, s, t int64) (flow, cost int64)

func (*MinCostMaxFlow) Flow

func (f *MinCostMaxFlow) Flow(s, t int64) int64

type MultitonService

type MultitonService[K comparable] struct {
	// contains filtered or unexported fields
}

func (*MultitonService[K]) Kill

func (s *MultitonService[K]) Kill()

func (*MultitonService[K]) Replace

func (s *MultitonService[K]) Replace(k K, v KillableService) func()

type Mutex

type Mutex struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*Mutex) Lock

func (m *Mutex) Lock()

func (*Mutex) Unlock

func (m *Mutex) Unlock()

type Promise

type Promise[T any] struct {
	Result T
	Err    error
	// contains filtered or unexported fields
}

func GoPromise

func GoPromise[T any](f func() (T, error)) *Promise[T]

func NewPromise

func NewPromise[T any]() *Promise[T]

func NewResolvedPromise

func NewResolvedPromise[T any](result T, err error) *Promise[T]

func (*Promise[T]) Done

func (p *Promise[T]) Done() <-chan struct{}

func (*Promise[T]) Resolve

func (p *Promise[T]) Resolve(result T, err error)

func (*Promise[T]) Resolved

func (p *Promise[T]) Resolved() bool

type ProtoProxy

type ProtoProxy[T proto.Message] struct {
	// contains filtered or unexported fields
}

ProtoProxy is a caching proxy for protobuf messages that may be expensive to compute. It is used to avoid unnecessary re-generation of Protobufs

func NewProtoProxy

func NewProtoProxy[T proto.Message](refreshInterval time.Duration, updateFn func() T) *ProtoProxy[T]

NewProtoProxy creates a new ProtoProxy that regenerates underlying values at a cadence of refreshInterval. This should be used for updates that should be sent periodically but do not have the urgency of immediate delivery. updateFn should provide the computations required to generate the protobuf. If refreshInterval is 0, the proxy will only update on MarkDirty(true).

func (*ProtoProxy[T]) Get

func (p *ProtoProxy[T]) Get() T

func (*ProtoProxy[T]) MarkDirty

func (p *ProtoProxy[T]) MarkDirty(immediate bool) <-chan struct{}

func (*ProtoProxy[T]) Stop

func (p *ProtoProxy[T]) Stop()

func (*ProtoProxy[T]) Updated

func (p *ProtoProxy[T]) Updated() <-chan struct{}

type RWMutex

type RWMutex struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*RWMutex) Lock

func (m *RWMutex) Lock()

func (*RWMutex) RLock

func (m *RWMutex) RLock()

func (*RWMutex) RUnlock

func (m *RWMutex) RUnlock()

func (*RWMutex) Unlock

func (m *RWMutex) Unlock()

type SimpleGraphEdge

type SimpleGraphEdge struct{}

func (SimpleGraphEdge) Length

func (e SimpleGraphEdge) Length() int64

type SimulatedClock

type SimulatedClock struct {
	clock.Mock
}

type StuckLock

type StuckLock struct {
	// contains filtered or unexported fields
}

func ScanTrackedLocks

func ScanTrackedLocks(threshold time.Duration) []*StuckLock

ScanTrackedLocks checks all lock trackers.

func ScanTrackedLocksI

func ScanTrackedLocksI(threshold time.Duration, n int) []*StuckLock

ScanTrackedLocksI checks lock trackers incrementally, n at a time.

func (*StuckLock) FirstLockedAtStack

func (d *StuckLock) FirstLockedAtStack() string

func (*StuckLock) HeldSince

func (d *StuckLock) HeldSince() time.Time

func (*StuckLock) NumGoroutineHeld

func (d *StuckLock) NumGoroutineHeld() int

func (*StuckLock) NumGoroutineWaiting

func (d *StuckLock) NumGoroutineWaiting() int

type SystemClock

type SystemClock struct{}

func (SystemClock) Now

func (SystemClock) Now() time.Time

func (SystemClock) Sleep

func (SystemClock) Sleep(d time.Duration)

type TimedAggregator

type TimedAggregator[T timedAggregatorNumber] struct {
	// contains filtered or unexported fields
}

func NewTimedAggregator

func NewTimedAggregator[T timedAggregatorNumber](params TimedAggregatorParams) *TimedAggregator[T]

func (*TimedAggregator[T]) AddSample

func (t *TimedAggregator[T]) AddSample(val T) error

func (*TimedAggregator[T]) AddSampleAt

func (t *TimedAggregator[T]) AddSampleAt(val T, at time.Time) error

func (*TimedAggregator[T]) GetAggregate

func (t *TimedAggregator[T]) GetAggregate() (T, time.Duration)

func (*TimedAggregator[T]) GetAggregateAndRestartAt

func (t *TimedAggregator[T]) GetAggregateAndRestartAt(at time.Time) (T, time.Duration, error)

func (*TimedAggregator[T]) GetAggregateAt

func (t *TimedAggregator[T]) GetAggregateAt(at time.Time) (T, time.Duration, error)

func (*TimedAggregator[T]) GetAverage

func (t *TimedAggregator[T]) GetAverage() float64

func (*TimedAggregator[T]) GetAverageAndRestartAt

func (t *TimedAggregator[T]) GetAverageAndRestartAt(at time.Time) (float64, error)

func (*TimedAggregator[T]) GetAverageAt

func (t *TimedAggregator[T]) GetAverageAt(at time.Time) (float64, error)

func (*TimedAggregator[T]) Reset

func (t *TimedAggregator[T]) Reset()

func (*TimedAggregator[T]) Restart

func (t *TimedAggregator[T]) Restart()

func (*TimedAggregator[T]) RestartAt

func (t *TimedAggregator[T]) RestartAt(at time.Time)

type TimedAggregatorParams

type TimedAggregatorParams struct {
	CapNegativeValues bool
}

type TimedVersion

type TimedVersion uint64

func TimedVersionFromProto

func TimedVersionFromProto(proto *livekit.TimedVersion) TimedVersion

func TimedVersionFromTime

func TimedVersionFromTime(t time.Time) TimedVersion

func (TimedVersion) After

func (t TimedVersion) After(other TimedVersion) bool

func (TimedVersion) Compare

func (t TimedVersion) Compare(other TimedVersion) int

func (*TimedVersion) Downgrade

func (t *TimedVersion) Downgrade(other TimedVersion) bool

func (TimedVersion) GormDataType

func (t TimedVersion) GormDataType() string

func (TimedVersion) IsZero

func (t TimedVersion) IsZero() bool

func (TimedVersion) Load

func (t TimedVersion) Load() TimedVersion

func (*TimedVersion) Scan

func (t *TimedVersion) Scan(src interface{}) (err error)

func (*TimedVersion) Store

func (t *TimedVersion) Store(other TimedVersion)

func (TimedVersion) String

func (t TimedVersion) String() string

func (TimedVersion) Time

func (t TimedVersion) Time() time.Time

func (TimedVersion) ToProto

func (t TimedVersion) ToProto() *livekit.TimedVersion

func (*TimedVersion) Update

func (t *TimedVersion) Update(other TimedVersion) bool

func (*TimedVersion) Upgrade

func (t *TimedVersion) Upgrade(other TimedVersion) bool

func (TimedVersion) Value

func (t TimedVersion) Value() (driver.Value, error)

type TimedVersionGenerator

type TimedVersionGenerator interface {
	Next() TimedVersion
}

func NewDefaultTimedVersionGenerator

func NewDefaultTimedVersionGenerator() TimedVersionGenerator

type TimeoutQueue

type TimeoutQueue[T any] struct {
	// contains filtered or unexported fields
}

func (*TimeoutQueue[T]) IterateAfter

func (q *TimeoutQueue[T]) IterateAfter(timeout time.Duration) TimeoutQueueIterator[T]

func (*TimeoutQueue[T]) IterateRemoveAfter

func (q *TimeoutQueue[T]) IterateRemoveAfter(timeout time.Duration) TimeoutQueueIterator[T]

func (*TimeoutQueue[T]) Remove

func (q *TimeoutQueue[T]) Remove(i *TimeoutQueueItem[T])

func (*TimeoutQueue[T]) Reset

func (q *TimeoutQueue[T]) Reset(i *TimeoutQueueItem[T]) bool

type TimeoutQueueItem

type TimeoutQueueItem[T any] struct {
	Value T
	// contains filtered or unexported fields
}

type TimeoutQueueIterator

type TimeoutQueueIterator[T any] struct {
	// contains filtered or unexported fields
}

func (*TimeoutQueueIterator[T]) Item

func (i *TimeoutQueueIterator[T]) Item() *TimeoutQueueItem[T]

func (*TimeoutQueueIterator[T]) Next

func (i *TimeoutQueueIterator[T]) Next() bool

type Welford

type Welford struct {
	// contains filtered or unexported fields
}

Welford implements Welford's online algorithm for variance. See: https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm

func WelfordMerge

func WelfordMerge(ws ...Welford) Welford

func (Welford) Count

func (w Welford) Count() float64

func (Welford) Mean

func (w Welford) Mean() float64

func (*Welford) Reset

func (w *Welford) Reset()

func (Welford) StdDev

func (w Welford) StdDev() float64

func (*Welford) Update

func (w *Welford) Update(v float64)

func (Welford) Value

func (w Welford) Value() (mean, variance, sampleVariance float64)

func (Welford) Variance

func (w Welford) Variance() float64

type WorkerGroup

type WorkerGroup struct {
	// contains filtered or unexported fields
}

func (*WorkerGroup) Go

func (w *WorkerGroup) Go(fn func())

func (*WorkerGroup) Wait

func (w *WorkerGroup) Wait()

Directories

Path Synopsis
the missing go error package
the missing go error package

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL