Documentation ¶
Index ¶
- Constants
- Variables
- func AggregateRTPStats(statsList []*livekit.RTPStats, gapHistogramSize int) *livekit.RTPStats
- func EnableLockTracker()
- func GetMimeTypeForAudioCodec(codec livekit.AudioCodec) string
- func GetMimeTypeForVideoCodec(codec livekit.VideoCodec) string
- func HashedID(id string) string
- func IsConnectionQualityLower(prev livekit.ConnectionQuality, curr livekit.ConnectionQuality) bool
- func Least[T Numeric](less func(a, b T) bool, vs ...T) T
- func LocalNodeID() (string, error)
- func MarshalGuid[T livekit.Guid](id T) livekit.GuidBlock
- func Max[T Numeric](vs ...T) T
- func Min[T Numeric](vs ...T) T
- func Most[T Numeric](less func(a, b T) bool, vs ...T) T
- func Must[T any](v T, err error) T
- func NewGuid(prefix string) string
- func NumMutexes() int
- func ParallelExec[T any](vals []T, parallelThreshold, step uint64, fn func(T))
- func RandomSecret() string
- func Redact(s, name string) string
- func RedactIdentifier(identifier string) string
- func RedactStreamKey(url string) (string, bool)
- func ToggleLockTrackerStackTraces(enable bool)
- func UnmarshalGuid[T livekit.Guid](b livekit.GuidBlock) T
- type Bitmap
- type CPUStats
- type DedupedSlice
- type ErrArray
- type EventEmitter
- type EventEmitterParams
- type EventObserver
- type EventObserverList
- type FlowGraph
- type Graph
- func (g *Graph[K, N, E]) DeleteEdge(src, dst K)
- func (g *Graph[K, N, E]) DeleteNode(id K)
- func (g *Graph[K, N, E]) Edge(src, dst K) (p E)
- func (g *Graph[K, N, E]) HasEdge(src, dst K) bool
- func (g *Graph[K, N, E]) HasNode(id K) bool
- func (g *Graph[K, N, E]) InEdges(dst K) map[K]E
- func (g *Graph[K, N, E]) InsertEdge(src, dst K, props E)
- func (g *Graph[K, N, E]) InsertNode(props N)
- func (g *Graph[K, N, E]) Node(id K) (props N)
- func (g *Graph[K, N, E]) NodeIDs() []K
- func (g *Graph[K, N, E]) OutEdges(src K) map[K]E
- func (g *Graph[K, N, E]) ShortestPath(src, dst K) ([]N, int64)
- func (g *Graph[K, N, E]) Size() int
- func (g *Graph[K, N, E]) TopologicalSort() []N
- type GraphEdge
- type GraphEdgeProps
- type GraphNode
- type GraphNodeProps
- type KillableService
- type MessageBus
- type MinCostMaxFlow
- type MultitonService
- type Mutex
- type Numeric
- type ProtoProxy
- type PubSub
- type RWMutex
- type RedisMessageBus
- func (r *RedisMessageBus) Lock(ctx context.Context, key string, expiration time.Duration) (bool, error)
- func (r *RedisMessageBus) Publish(ctx context.Context, channel string, msg proto.Message) error
- func (r *RedisMessageBus) Subscribe(ctx context.Context, channel string) (PubSub, error)
- func (r *RedisMessageBus) SubscribeQueue(ctx context.Context, channel string) (PubSub, error)
- type RedisPubSub
- type SimpleGraphEdge
- type StuckLock
- type TimedAggregator
- func (t *TimedAggregator[T]) AddSample(val T) error
- func (t *TimedAggregator[T]) AddSampleAt(val T, at time.Time) error
- func (t *TimedAggregator[T]) GetAggregate() (T, time.Duration)
- func (t *TimedAggregator[T]) GetAggregateAndRestartAt(at time.Time) (T, time.Duration, error)
- func (t *TimedAggregator[T]) GetAggregateAt(at time.Time) (T, time.Duration, error)
- func (t *TimedAggregator[T]) GetAverage() float64
- func (t *TimedAggregator[T]) GetAverageAndRestartAt(at time.Time) (float64, error)
- func (t *TimedAggregator[T]) GetAverageAt(at time.Time) (float64, error)
- func (t *TimedAggregator[T]) Reset()
- func (t *TimedAggregator[T]) Restart()
- func (t *TimedAggregator[T]) RestartAt(at time.Time)
- type TimedAggregatorParams
- type TimedVersion
- func (t *TimedVersion) After(other *TimedVersion) bool
- func (t *TimedVersion) Compare(other *TimedVersion) int
- func (t *TimedVersion) IsZero() bool
- func (t *TimedVersion) Load() TimedVersion
- func (t *TimedVersion) Store(other *TimedVersion)
- func (t *TimedVersion) String() string
- func (t *TimedVersion) Time() time.Time
- func (t *TimedVersion) ToProto() *livekit.TimedVersion
- func (t *TimedVersion) Update(other *TimedVersion) bool
- type TimedVersionGenerator
- type TimeoutQueue
- type TimeoutQueueItem
- type Welford
- type WorkerGroup
Constants ¶
const ( RoomPrefix = "RM_" NodePrefix = "ND_" ParticipantPrefix = "PA_" TrackPrefix = "TR_" APIKeyPrefix = "API" EgressPrefix = "EG_" IngressPrefix = "IN_" SIPTrunkPrefix = "ST_" SIPDispatchRulePrefix = "SDR_" SIPParticipantPrefix = "SP_" RPCPrefix = "RPC_" WHIPResourcePrefix = "WH_" RTMPResourcePrefix = "RT_" URLResourcePrefix = "UR_" )
const GuidSize = 12
Variables ¶
var (
ErrAnachronousSample = errors.New("anachronous sample")
)
var ( PromMessageBusCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "livekit", Subsystem: "messagebus", Name: "messages", }, []string{"type", "status"}, ) )
Functions ¶
func AggregateRTPStats ¶
func EnableLockTracker ¶
func EnableLockTracker()
EnableLockTracker enables the lock-tracking background worker. This should be called during init.
func GetMimeTypeForAudioCodec ¶
func GetMimeTypeForAudioCodec(codec livekit.AudioCodec) string
func GetMimeTypeForVideoCodec ¶
func GetMimeTypeForVideoCodec(codec livekit.VideoCodec) string
func IsConnectionQualityLower ¶
func IsConnectionQualityLower(prev livekit.ConnectionQuality, curr livekit.ConnectionQuality) bool
func LocalNodeID ¶
func NumMutexes ¶
func NumMutexes() int
func ParallelExec ¶
ParallelExec executes the given function with each element of vals. If len(vals) >= parallelThreshold, it executes them in parallel with the given step size, so fn must be thread-safe.
func RandomSecret ¶
func RandomSecret() string
func RedactIdentifier ¶
func RedactStreamKey ¶
func ToggleLockTrackerStackTraces ¶
func ToggleLockTrackerStackTraces(enable bool)
Types ¶
type Bitmap ¶
type Bitmap[T bitmapNumber] struct {
// contains filtered or unexported fields
}
func (*Bitmap[T]) ClearRange ¶
func (b *Bitmap[T]) ClearRange(min, max T)
type CPUStats ¶
type CPUStats struct {
// contains filtered or unexported fields
}
func NewCPUStats ¶
func NewProcCPUStats ¶
func (*CPUStats) GetCPUIdle ¶
type DedupedSlice ¶
type DedupedSlice[T comparable] struct { // contains filtered or unexported fields }
func NewDedupedSlice ¶
func NewDedupedSlice[T comparable](maxLen int) *DedupedSlice[T]
func (*DedupedSlice[T]) Add ¶
func (d *DedupedSlice[T]) Add(val T) bool
func (*DedupedSlice[T]) Clear ¶
func (d *DedupedSlice[T]) Clear()
func (*DedupedSlice[T]) Get ¶
func (d *DedupedSlice[T]) Get() []T
func (*DedupedSlice[T]) Has ¶
func (d *DedupedSlice[T]) Has(val T) bool
func (*DedupedSlice[T]) Len ¶
func (d *DedupedSlice[T]) Len() int
type EventEmitter ¶
type EventEmitter[K comparable, V any] struct { // contains filtered or unexported fields }
func NewDefaultEventEmitter ¶
func NewDefaultEventEmitter[K comparable, V any]() *EventEmitter[K, V]
func NewEventEmitter ¶
func NewEventEmitter[K comparable, V any](params EventEmitterParams) *EventEmitter[K, V]
func (*EventEmitter[K, V]) Emit ¶
func (e *EventEmitter[K, V]) Emit(k K, v V)
func (*EventEmitter[K, V]) Observe ¶
func (e *EventEmitter[K, V]) Observe(k K) *EventObserver[V]
func (*EventEmitter[K, V]) ObservedKeys ¶
func (e *EventEmitter[K, V]) ObservedKeys() []K
type EventEmitterParams ¶
func DefaultEventEmitterParams ¶
func DefaultEventEmitterParams() EventEmitterParams
type EventObserver ¶
type EventObserver[V any] struct { // contains filtered or unexported fields }
func NewEventObserver ¶
func NewEventObserver[V any](stopFunc func()) (*EventObserver[V], func(v V))
func (*EventObserver[V]) Events ¶
func (o *EventObserver[V]) Events() <-chan V
func (*EventObserver[V]) Stop ¶
func (o *EventObserver[V]) Stop()
type EventObserverList ¶
type EventObserverList[V any] struct { // contains filtered or unexported fields }
func NewDefaultEventObserverList ¶
func NewDefaultEventObserverList[V any]() *EventObserverList[V]
func NewEventObserverList ¶
func NewEventObserverList[V any](params EventEmitterParams) *EventObserverList[V]
func (*EventObserverList[V]) Emit ¶
func (l *EventObserverList[V]) Emit(v V)
func (*EventObserverList[V]) Len ¶
func (l *EventObserverList[V]) Len() int
func (*EventObserverList[V]) Observe ¶
func (l *EventObserverList[V]) Observe() *EventObserver[V]
type FlowGraph ¶
type FlowGraph struct {
// contains filtered or unexported fields
}
func NewFlowGraph ¶
type Graph ¶
type Graph[K comparable, N GraphNodeProps[K], E GraphEdgeProps] struct { // contains filtered or unexported fields }
func NewGraph ¶
func NewGraph[K comparable, N GraphNodeProps[K], E GraphEdgeProps]() *Graph[K, N, E]
func (*Graph[K, N, E]) DeleteEdge ¶
func (g *Graph[K, N, E]) DeleteEdge(src, dst K)
func (*Graph[K, N, E]) DeleteNode ¶
func (g *Graph[K, N, E]) DeleteNode(id K)
func (*Graph[K, N, E]) InsertEdge ¶
func (g *Graph[K, N, E]) InsertEdge(src, dst K, props E)
func (*Graph[K, N, E]) InsertNode ¶
func (g *Graph[K, N, E]) InsertNode(props N)
func (*Graph[K, N, E]) ShortestPath ¶
func (*Graph[K, N, E]) TopologicalSort ¶
func (g *Graph[K, N, E]) TopologicalSort() []N
type GraphEdgeProps ¶
type GraphEdgeProps interface {
Length() int64
}
type GraphNodeProps ¶
type GraphNodeProps[K comparable] interface { ID() K }
type KillableService ¶
type KillableService interface {
Kill()
}
type MessageBus ¶
type MessageBus interface { Subscribe(ctx context.Context, channel string) (PubSub, error) // SubscribeQueue is like subscribe, but ensuring only a single instance gets to process the message SubscribeQueue(ctx context.Context, channel string) (PubSub, error) Publish(ctx context.Context, channel string, msg proto.Message) error }
func NewRedisMessageBus ¶
func NewRedisMessageBus(rc redis.UniversalClient) MessageBus
type MinCostMaxFlow ¶
type MinCostMaxFlow struct {
// contains filtered or unexported fields
}
func (*MinCostMaxFlow) ComputeMaxFlow ¶
func (f *MinCostMaxFlow) ComputeMaxFlow(g FlowGraph, s, t int64) (flow, cost int64)
func (*MinCostMaxFlow) Flow ¶
func (f *MinCostMaxFlow) Flow(s, t int64) int64
type MultitonService ¶
type MultitonService[K comparable] struct { // contains filtered or unexported fields }
func (*MultitonService[K]) Kill ¶
func (s *MultitonService[K]) Kill()
func (*MultitonService[K]) Replace ¶
func (s *MultitonService[K]) Replace(k K, v KillableService) func()
type Numeric ¶
type Numeric interface { constraints.Signed | constraints.Unsigned | time.Duration }
type ProtoProxy ¶
ProtoProxy is a caching proxy for protobuf messages that may be expensive to compute. It is used to avoid unnecessary regeneration of protobufs.
func NewProtoProxy ¶
func NewProtoProxy[T proto.Message](refreshInterval time.Duration, updateFn func() T) *ProtoProxy[T]
NewProtoProxy creates a new ProtoProxy that regenerates underlying values at a cadence of refreshInterval. This should be used for updates that should be sent periodically but do not have the urgency of immediate delivery. updateFn should provide the computations required to generate the protobuf. If refreshInterval is 0, the proxy will only update on MarkDirty(true).
func (*ProtoProxy[T]) Get ¶
func (p *ProtoProxy[T]) Get() T
func (*ProtoProxy[T]) MarkDirty ¶
func (p *ProtoProxy[T]) MarkDirty(immediate bool)
func (*ProtoProxy[T]) Stop ¶
func (p *ProtoProxy[T]) Stop()
func (*ProtoProxy[T]) Updated ¶
func (p *ProtoProxy[T]) Updated() <-chan struct{}
type RedisMessageBus ¶
type RedisMessageBus struct {
// contains filtered or unexported fields
}
func (*RedisMessageBus) SubscribeQueue ¶
type RedisPubSub ¶
type RedisPubSub struct {
// contains filtered or unexported fields
}
func (*RedisPubSub) Channel ¶
func (r *RedisPubSub) Channel() <-chan interface{}
func (*RedisPubSub) Close ¶
func (r *RedisPubSub) Close() error
func (*RedisPubSub) Payload ¶
func (r *RedisPubSub) Payload(msg interface{}) []byte
type SimpleGraphEdge ¶
type SimpleGraphEdge struct{}
func (SimpleGraphEdge) Length ¶
func (e SimpleGraphEdge) Length() int64
type StuckLock ¶
type StuckLock struct {
// contains filtered or unexported fields
}
func ScanTrackedLocks ¶
ScanTrackedLocks checks all lock trackers.
func ScanTrackedLocksI ¶
ScanTrackedLocksI checks lock trackers incrementally, n at a time.
func (*StuckLock) FirstLockedAtStack ¶
func (*StuckLock) NumGoroutineHeld ¶
func (*StuckLock) NumGoroutineWaiting ¶
type TimedAggregator ¶
type TimedAggregator[T timedAggregatorNumber] struct {
// contains filtered or unexported fields
}
func NewTimedAggregator ¶
func NewTimedAggregator[T timedAggregatorNumber](params TimedAggregatorParams) *TimedAggregator[T]
func (*TimedAggregator[T]) AddSample ¶
func (t *TimedAggregator[T]) AddSample(val T) error
func (*TimedAggregator[T]) AddSampleAt ¶
func (t *TimedAggregator[T]) AddSampleAt(val T, at time.Time) error
func (*TimedAggregator[T]) GetAggregate ¶
func (t *TimedAggregator[T]) GetAggregate() (T, time.Duration)
func (*TimedAggregator[T]) GetAggregateAndRestartAt ¶
func (*TimedAggregator[T]) GetAggregateAt ¶
func (*TimedAggregator[T]) GetAverage ¶
func (t *TimedAggregator[T]) GetAverage() float64
func (*TimedAggregator[T]) GetAverageAndRestartAt ¶
func (t *TimedAggregator[T]) GetAverageAndRestartAt(at time.Time) (float64, error)
func (*TimedAggregator[T]) GetAverageAt ¶
func (t *TimedAggregator[T]) GetAverageAt(at time.Time) (float64, error)
func (*TimedAggregator[T]) Reset ¶
func (t *TimedAggregator[T]) Reset()
func (*TimedAggregator[T]) Restart ¶
func (t *TimedAggregator[T]) Restart()
func (*TimedAggregator[T]) RestartAt ¶
func (t *TimedAggregator[T]) RestartAt(at time.Time)
type TimedAggregatorParams ¶
type TimedAggregatorParams struct {
CapNegativeValues bool
}
type TimedVersion ¶
type TimedVersion struct {
// contains filtered or unexported fields
}
func NewTimedVersionFromProto ¶
func NewTimedVersionFromProto(proto *livekit.TimedVersion) *TimedVersion
func NewTimedVersionFromTime ¶
func NewTimedVersionFromTime(t time.Time) *TimedVersion
func TimedVersionFromProto ¶
func TimedVersionFromProto(proto *livekit.TimedVersion) TimedVersion
func TimedVersionFromTime ¶
func TimedVersionFromTime(t time.Time) TimedVersion
func (*TimedVersion) After ¶
func (t *TimedVersion) After(other *TimedVersion) bool
func (*TimedVersion) Compare ¶
func (t *TimedVersion) Compare(other *TimedVersion) int
func (*TimedVersion) IsZero ¶
func (t *TimedVersion) IsZero() bool
func (*TimedVersion) Load ¶
func (t *TimedVersion) Load() TimedVersion
func (*TimedVersion) Store ¶
func (t *TimedVersion) Store(other *TimedVersion)
func (*TimedVersion) String ¶
func (t *TimedVersion) String() string
func (*TimedVersion) Time ¶
func (t *TimedVersion) Time() time.Time
func (*TimedVersion) ToProto ¶
func (t *TimedVersion) ToProto() *livekit.TimedVersion
func (*TimedVersion) Update ¶
func (t *TimedVersion) Update(other *TimedVersion) bool
type TimedVersionGenerator ¶
type TimedVersionGenerator interface { New() *TimedVersion Next() TimedVersion }
func NewDefaultTimedVersionGenerator ¶
func NewDefaultTimedVersionGenerator() TimedVersionGenerator
type TimeoutQueue ¶
type TimeoutQueue[T any] struct { // contains filtered or unexported fields }
func (*TimeoutQueue[T]) IterateAfter ¶
func (q *TimeoutQueue[T]) IterateAfter(timeout time.Duration) *timeoutQueueIterator[T]
func (*TimeoutQueue[T]) IterateRemoveAfter ¶
func (q *TimeoutQueue[T]) IterateRemoveAfter(timeout time.Duration) *timeoutQueueIterator[T]
func (*TimeoutQueue[T]) Remove ¶
func (q *TimeoutQueue[T]) Remove(i *TimeoutQueueItem[T])
func (*TimeoutQueue[T]) Reset ¶
func (q *TimeoutQueue[T]) Reset(i *TimeoutQueueItem[T]) bool
type TimeoutQueueItem ¶
type TimeoutQueueItem[T any] struct { Value T // contains filtered or unexported fields }
type Welford ¶
type Welford struct {
// contains filtered or unexported fields
}
Welford implements Welford's online algorithm for variance. See: https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
func WelfordMerge ¶
type WorkerGroup ¶
type WorkerGroup struct {
// contains filtered or unexported fields
}
func (*WorkerGroup) Go ¶
func (w *WorkerGroup) Go(fn func())
func (*WorkerGroup) Wait ¶
func (w *WorkerGroup) Wait()
Source Files ¶
- bitmap.go
- codec.go
- connectionquality.go
- cpu.go
- cpu_all.go
- cpu_linux.go
- dedupedslice.go
- err_array.go
- event_emitter.go
- graph.go
- id.go
- lock_tracker.go
- math.go
- messaging.go
- multitonservice.go
- must.go
- parallel.go
- protoproxy.go
- redact.go
- rtpstats.go
- secret.go
- timed_aggregator.go
- timed_version.go
- timeoutqueue.go
- welford.go
- workergroup.go