Documentation ¶
Overview ¶
Design of Prober
Probing is used to check for existence of excess channel capacity. This is especially useful in the downstream direction of SFU. SFU forwards audio/video streams from one or more publishers to all the subscribers. But, the downstream channel of a subscriber may not be big enough to carry all the streams. It is also a time varying quantity.
When there is not enough capacity, some streams will be paused. To resume a stream, SFU would need to know that the channel has enough capacity. That's where probing comes in. When conditions are favorable, SFU can send probe packets so that the bandwidth estimator has more data to estimate available channel capacity better. NOTE: What defines `favorable conditions` is implementation dependent.
There are two options for probing
- Use padding only RTP packets: This one is preferable as probe rate can be controlled more tightly.
- Resume a paused stream or forward a higher spatial layer: Have to find a stream at probing rate. Also, a stream could get a key frame unexpectedly boosting rate in the probing window.
The strategy used depends on stream allocator implementation. This module can be used if the stream allocator decides to use padding only RTP packets for probing purposes.
Implementation: There are a couple of options
- Check prober in the forwarding path (pull from prober). This is preferred for scalability reasons. But, this suffers from not being able to probe when all streams are paused (could be due to downstream bandwidth constraints or the corresponding upstream tracks may have paused due to upstream bandwidth constraints). Another issue is not being able to have tight control on the probing window boundary, as the packet forwarding path may not have a packet to forward. But, it should not be a major concern as long as some stream(s) is/are forwarded, as there should be a packet at least every 60 ms or so (forwarding only one stream at 15 fps). Usually, it will be serviced much more frequently when there are multiple streams getting forwarded.
- Run it in a goroutine. But, that would have to wake up very often to prevent bunching up of probe packets. So, there is a scalability concern as there is one prober per subscriber peer connection. But, probe windows should be very short (of the order of 100s of ms). So, this approach might be fine.
The implementation here follows the second approach of using a goroutine.
Pacing: Ideally, the subscriber peer connection should have a pacer which trickles data out at the estimated channel capacity rate (and estimated channel capacity + probing rate when actively probing).
But, there are a few significant challenges
- Pacer will require buffering of forwarded packets. That means more memory, more CPU (have to make copy of packets) and more latency in the media stream.
- Scalability concern as SFU may be handling hundreds of subscriber peer connections and each one processing the pacing loop at 5ms interval will add up.
So, this module assumes that pacing is inherently provided by the publishers for media streams. That is a reasonable assumption given that publishing clients will run their own pacer and pace data out at a steady rate.
A further assumption is that if there are multiple publishers for a subscriber peer connection, all the publishers are not pacing in sync, i.e. each publisher's pacer is completely independent and SFU will be receiving the media packets with a good spread and not clumped together.
Given those assumptions, this module monitors media send rate and adjusts probing packet sends accordingly. Although the probing may have a high wake-up frequency, it is only for short windows. For example, probing at 5 Mbps for 1/2 second and sending a 1000 byte probe per iteration will wake up every 1.6 ms. That is very high, but should last for only 1/2 second or so.
5 Mbps over 1/2 second = 2.5 Mb
2.5 Mb = 312500 bytes = 313 probes at 1000 byte probes
313 probes over 1/2 second = 1.6 ms between probes
A few things to note
- When a probe cluster is added, the expected media rate is provided. So, the wake-up interval takes that into account. For example, if probing at 5 Mbps for 1/2 second and if 4 Mbps of it is expected to be provided by media traffic, the wake-up interval becomes 8 ms.
- The amount of probing should actually be capped at some value to avoid too much self-induced congestion. It may be something like 500 kbps. That will increase the wake-up interval to 16 ms in the above example.
- In practice, the probing interval may also be shorter. Typically, it can be run for 2 - 3 RTTs to get a good measurement. For the longest hauls, RTT could be 250 ms or so leading to the probing window being long(ish). But, RTT should be much shorter especially if the subscriber peer connection of the client is able to connect to the nearest data center.
Index ¶
- Constants
- Variables
- func IsRedCodec(mime string) bool
- func IsSvcCodec(mime string) bool
- type AddTrackParams
- type AudioLevelHandle
- type Bitrates
- type ChannelCongestionReason
- type ChannelObserver
- func (c *ChannelObserver) AddEstimate(estimate int64)
- func (c *ChannelObserver) AddNack(packets uint32, repeatedNacks uint32)
- func (c *ChannelObserver) GetHighestEstimate() int64
- func (c *ChannelObserver) GetLowestEstimate() int64
- func (c *ChannelObserver) GetNackRatio() (uint32, uint32, float64)
- func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason)
- func (c *ChannelObserver) SeedEstimate(estimate int64)
- func (c *ChannelObserver) SeedNack(packets uint32, repeatedNacks uint32)
- type ChannelObserverParams
- type ChannelTrend
- type Cluster
- func (c *Cluster) GetInfo() ProbeClusterInfo
- func (c *Cluster) GetSleepDuration() time.Duration
- func (c *Cluster) IsFinished() bool
- func (c *Cluster) PacketSent(size int)
- func (c *Cluster) ProbeSent(size int)
- func (c *Cluster) Process(onSendProbe func(bytesToSend int))
- func (c *Cluster) Start()
- func (c *Cluster) String() string
- type DDVideoLayerSelector
- type DownTrack
- func (d *DownTrack) AddReceiverReportListener(listener ReceiverReportListener)
- func (d *DownTrack) AllocateNextHigher(availableChannelCapacity int64, allowOvershoot bool) (VideoAllocation, bool)
- func (d *DownTrack) AllocateOptimal(allowOvershoot bool) VideoAllocation
- func (d *DownTrack) BandwidthRequested() int64
- func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, error)
- func (d *DownTrack) Close()
- func (d *DownTrack) CloseWithFlush(flush bool)
- func (d *DownTrack) Codec() webrtc.RTPCodecCapability
- func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport
- func (d *DownTrack) CreateSourceDescriptionChunks() []rtcp.SourceDescriptionChunk
- func (d *DownTrack) DebugInfo() map[string]interface{}
- func (d *DownTrack) DistanceToDesired() int32
- func (d *DownTrack) GetConnectionScore() float32
- func (d *DownTrack) GetForwardingStatus() ForwardingStatus
- func (d *DownTrack) GetNackStats() (totalPackets uint32, totalRepeatedNACKs uint32)
- func (d *DownTrack) GetNextHigherTransition(allowOvershoot bool) (VideoTransition, bool)
- func (d *DownTrack) GetState() DownTrackState
- func (d *DownTrack) GetTrackStats() *livekit.RTPStats
- func (d *DownTrack) GetTransceiver() *webrtc.RTPTransceiver
- func (d *DownTrack) ID() string
- func (d *DownTrack) IsClosed() bool
- func (d *DownTrack) IsDeficient() bool
- func (d *DownTrack) Kind() webrtc.RTPCodecType
- func (d *DownTrack) MaxLayers() VideoLayers
- func (d *DownTrack) Mute(muted bool)
- func (d *DownTrack) OnAvailableLayersChanged(fn func(dt *DownTrack))
- func (d *DownTrack) OnBind(fn func())
- func (d *DownTrack) OnBitrateAvailabilityChanged(fn func(dt *DownTrack))
- func (d *DownTrack) OnCloseHandler(fn func(willBeResumed bool))
- func (d *DownTrack) OnMaxLayerChanged(fn func(dt *DownTrack, layer int32))
- func (d *DownTrack) OnPacketSent(fn func(dt *DownTrack, size int))
- func (d *DownTrack) OnPaddingSent(fn func(dt *DownTrack, size int))
- func (d *DownTrack) OnREMB(fn func(dt *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate))
- func (d *DownTrack) OnRttUpdate(fn func(dt *DownTrack, rtt uint32))
- func (d *DownTrack) OnStatsUpdate(fn func(dt *DownTrack, stat *livekit.AnalyticsStat))
- func (d *DownTrack) OnSubscribedLayersChanged(fn func(dt *DownTrack, layers VideoLayers))
- func (d *DownTrack) OnSubscriptionChanged(fn func(dt *DownTrack))
- func (d *DownTrack) OnTransportCCFeedback(fn func(dt *DownTrack, cc *rtcp.TransportLayerCC))
- func (d *DownTrack) Pause() VideoAllocation
- func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers VideoLayers, allowPause bool, ...) int64
- func (d *DownTrack) ProvisionalAllocateCommit() VideoAllocation
- func (d *DownTrack) ProvisionalAllocateGetBestWeightedTransition() VideoTransition
- func (d *DownTrack) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) VideoTransition
- func (d *DownTrack) ProvisionalAllocatePrepare()
- func (d *DownTrack) RID() string
- func (d *DownTrack) Resync()
- func (d *DownTrack) SSRC() uint32
- func (d *DownTrack) SeedState(state DownTrackState)
- func (d *DownTrack) SetConnected()
- func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32)
- func (d *DownTrack) SetMaxTemporalLayer(temporalLayer int32)
- func (d *DownTrack) SetRTPHeaderExtensions(rtpHeaderExtensions []webrtc.RTPHeaderExtensionParameter)
- func (d *DownTrack) SetTransceiver(transceiver *webrtc.RTPTransceiver)
- func (d *DownTrack) Stop() error
- func (d *DownTrack) StreamID() string
- func (d *DownTrack) SubscriberID() livekit.ParticipantID
- func (d *DownTrack) TrackInfoAvailable()
- func (d *DownTrack) Unbind(_ webrtc.TrackLocalContext) error
- func (d *DownTrack) UpTrackBitrateAvailabilityChange()
- func (d *DownTrack) UpTrackLayersChange(availableLayers []int32, exemptedLayers []int32)
- func (d *DownTrack) WritePaddingRTP(bytesToSend int) int
- func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error
- type DownTrackSpreader
- func (d *DownTrackSpreader) Broadcast(writer func(TrackSender))
- func (d *DownTrackSpreader) DownTrackCount() int
- func (d *DownTrackSpreader) Free(subscriberID livekit.ParticipantID)
- func (d *DownTrackSpreader) GetDownTracks() []TrackSender
- func (d *DownTrackSpreader) HasDownTrack(subscriberID livekit.ParticipantID) bool
- func (d *DownTrackSpreader) ResetAndGetDownTracks() []TrackSender
- func (d *DownTrackSpreader) Store(ts TrackSender)
- type DownTrackSpreaderParams
- type DownTrackState
- type Event
- type Forwarder
- func (f *Forwarder) AllocateNextHigher(availableChannelCapacity int64, brs Bitrates, allowOvershoot bool) (VideoAllocation, bool)
- func (f *Forwarder) AllocateOptimal(brs Bitrates, allowOvershoot bool) VideoAllocation
- func (f *Forwarder) BandwidthRequested(brs Bitrates) int64
- func (f *Forwarder) CheckSync() (locked bool, layer int32)
- func (f *Forwarder) CurrentLayers() VideoLayers
- func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability)
- func (f *Forwarder) DistanceToDesired() int32
- func (f *Forwarder) FilterRTX(nacks []uint16) (filtered []uint16, disallowedLayers [DefaultMaxLayerSpatial + 1]bool)
- func (f *Forwarder) GetForwardingStatus() ForwardingStatus
- func (f *Forwarder) GetNextHigherTransition(brs Bitrates, allowOvershoot bool) (VideoTransition, bool)
- func (f *Forwarder) GetPaddingVP8(frameEndNeeded bool) *buffer.VP8
- func (f *Forwarder) GetRTPMungerParams() RTPMungerParams
- func (f *Forwarder) GetSnTsForBlankFrames(frameRate uint32, numPackets int) ([]SnTs, bool, error)
- func (f *Forwarder) GetSnTsForPadding(num int) ([]SnTs, error)
- func (f *Forwarder) GetState() ForwarderState
- func (f *Forwarder) GetTranslationParams(extPkt *buffer.ExtPacket, layer int32) (*TranslationParams, error)
- func (f *Forwarder) IsDeficient() bool
- func (f *Forwarder) IsMuted() bool
- func (f *Forwarder) IsReducedQuality() (int32, bool)
- func (f *Forwarder) MaxLayers() VideoLayers
- func (f *Forwarder) Mute(muted bool) (bool, VideoLayers)
- func (f *Forwarder) Pause(brs Bitrates) VideoAllocation
- func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layers VideoLayers, allowPause bool, ...) int64
- func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation
- func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransition
- func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) VideoTransition
- func (f *Forwarder) ProvisionalAllocatePrepare(bitrates Bitrates)
- func (f *Forwarder) Resync()
- func (f *Forwarder) SeedState(state ForwarderState)
- func (f *Forwarder) SetMaxSpatialLayer(spatialLayer int32) (bool, VideoLayers, VideoLayers)
- func (f *Forwarder) SetMaxTemporalLayer(temporalLayer int32) (bool, VideoLayers, VideoLayers)
- func (f *Forwarder) TargetLayers() VideoLayers
- func (f *Forwarder) UpTrackLayersChange(availableLayers []int32, exemptedLayers []int32)
- type ForwarderState
- type ForwardingStatus
- type MaxDistanceSorter
- type MinDistanceSorter
- type ProbeClusterId
- type ProbeClusterInfo
- type Prober
- func (p *Prober) AddCluster(desiredRateBps int, expectedRateBps int, minDuration time.Duration, ...) ProbeClusterId
- func (p *Prober) IsRunning() bool
- func (p *Prober) OnProbeClusterDone(f func(info ProbeClusterInfo))
- func (p *Prober) OnSendProbe(f func(bytesToSend int))
- func (p *Prober) PacketSent(size int)
- func (p *Prober) ProbeSent(size int)
- func (p *Prober) Reset()
- type ProberParams
- type RTPMunger
- func (r *RTPMunger) FilterRTX(nacks []uint16) []uint16
- func (r *RTPMunger) GetLast() RTPMungerState
- func (r *RTPMunger) GetParams() RTPMungerParams
- func (r *RTPMunger) IsOnFrameBoundary() bool
- func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket)
- func (r *RTPMunger) SeedLast(state RTPMungerState)
- func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket)
- func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate uint32, forceMarker bool) ([]SnTs, error)
- func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error)
- func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint16, tsAdjust uint32)
- type RTPMungerParams
- type RTPMungerState
- type ReceiverOpts
- type ReceiverReportListener
- type RedPrimaryReceiver
- func (r *RedPrimaryReceiver) AddDownTrack(track TrackSender) error
- func (r *RedPrimaryReceiver) CanClose() bool
- func (r *RedPrimaryReceiver) Close()
- func (r *RedPrimaryReceiver) DeleteDownTrack(subscriberID livekit.ParticipantID)
- func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32)
- func (r *RedPrimaryReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error)
- type RedReceiver
- func (r *RedReceiver) AddDownTrack(track TrackSender) error
- func (r *RedReceiver) CanClose() bool
- func (r *RedReceiver) Close()
- func (r *RedReceiver) DeleteDownTrack(subscriberID livekit.ParticipantID)
- func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32)
- func (r *RedReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error)
- type SequenceNumberOrdering
- type SnTs
- type StreamAllocator
- func (s *StreamAllocator) AddTrack(downTrack *DownTrack, params AddTrackParams)
- func (s *StreamAllocator) OnStreamStateChange(f func(update *StreamStateUpdate) error)
- func (s *StreamAllocator) RemoveTrack(downTrack *DownTrack)
- func (s *StreamAllocator) SetBandwidthEstimator(bwe cc.BandwidthEstimator)
- func (s *StreamAllocator) SetTrackPriority(downTrack *DownTrack, priority uint8)
- func (s *StreamAllocator) Start()
- func (s *StreamAllocator) Stop()
- type StreamAllocatorParams
- type StreamState
- type StreamStateInfo
- type StreamStateUpdate
- type StreamStatus
- type StreamTracker
- func (s *StreamTracker) BitrateTemporalCumulative() []int64
- func (s *StreamTracker) Observe(temporalLayer int32, pktSize int, payloadSize int)
- func (s *StreamTracker) OnBitrateAvailable(f func())
- func (s *StreamTracker) OnStatusChanged(f func(status StreamStatus))
- func (s *StreamTracker) Reset()
- func (s *StreamTracker) SetPaused(paused bool)
- func (s *StreamTracker) Start()
- func (s *StreamTracker) Status() StreamStatus
- func (s *StreamTracker) Stop()
- type StreamTrackerManager
- func (s *StreamTrackerManager) AddTracker(layer int32) *StreamTracker
- func (s *StreamTrackerManager) DistanceToDesired() int32
- func (s *StreamTrackerManager) GetAvailableLayers() ([]int32, []int32)
- func (s *StreamTrackerManager) GetLayerDimension(layer int32) (uint32, uint32)
- func (s *StreamTrackerManager) GetLayeredBitrate() Bitrates
- func (s *StreamTrackerManager) GetMaxExpectedLayer() int32
- func (s *StreamTrackerManager) GetTracker(layer int32) *StreamTracker
- func (s *StreamTrackerManager) HasSpatialLayer(layer int32) bool
- func (s *StreamTrackerManager) OnAvailableLayersChanged(f func(availableLayers []int32, exemptedLayers []int32))
- func (s *StreamTrackerManager) OnBitrateAvailabilityChanged(f func())
- func (s *StreamTrackerManager) OnMaxLayerChanged(f func(maxLayer int32))
- func (s *StreamTrackerManager) RemoveAllTrackers()
- func (s *StreamTrackerManager) RemoveTracker(layer int32)
- func (s *StreamTrackerManager) SetMaxExpectedSpatialLayer(layer int32) int32
- func (s *StreamTrackerManager) SetPaused(paused bool)
- type StreamTrackerParams
- type Track
- func (t *Track) AllocateNextHigher(availableChannelCapacity int64, allowOvershoot bool) (VideoAllocation, bool)
- func (t *Track) AllocateOptimal(allowOvershoot bool) VideoAllocation
- func (t *Track) BandwidthRequested() int64
- func (t *Track) DistanceToDesired() int32
- func (t *Track) DownTrack() *DownTrack
- func (t *Track) GetNackDelta() (uint32, uint32)
- func (t *Track) GetNextHigherTransition(allowOvershoot bool) (VideoTransition, bool)
- func (t *Track) ID() livekit.TrackID
- func (t *Track) IsDeficient() bool
- func (t *Track) IsManaged() bool
- func (t *Track) Pause() VideoAllocation
- func (t *Track) Priority() uint8
- func (t *Track) ProvisionalAllocate(availableChannelCapacity int64, layers VideoLayers, allowPause bool, ...) int64
- func (t *Track) ProvisionalAllocateCommit() VideoAllocation
- func (t *Track) ProvisionalAllocateGetBestWeightedTransition() VideoTransition
- func (t *Track) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) VideoTransition
- func (t *Track) ProvisionalAllocatePrepare()
- func (t *Track) PublisherID() livekit.ParticipantID
- func (t *Track) SetDirty(isDirty bool) bool
- func (t *Track) SetMaxLayers(layers VideoLayers) bool
- func (t *Track) SetPriority(priority uint8) bool
- func (t *Track) WritePaddingRTP(bytesToSend int) int
- type TrackReceiver
- type TrackSender
- type TrackSorter
- type TranslationParams
- type TranslationParamsRTP
- type TranslationParamsVP8
- type TrendDetector
- func (t *TrendDetector) AddValue(value int64)
- func (t *TrendDetector) GetDirection() TrendDirection
- func (t *TrendDetector) GetHighest() int64
- func (t *TrendDetector) GetLowest() int64
- func (t *TrendDetector) GetValues() []int64
- func (t *TrendDetector) Seed(value int64)
- func (t *TrendDetector) ToString() string
- type TrendDetectorParams
- type TrendDirection
- type Uint16Wrapper
- type VP8Munger
- func (v *VP8Munger) GetLast() VP8MungerState
- func (v *VP8Munger) PictureIdOffset(extPictureId int32) (int32, bool)
- func (v *VP8Munger) SeedLast(state VP8MungerState)
- func (v *VP8Munger) SetLast(extPkt *buffer.ExtPacket)
- func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, ordering SequenceNumberOrdering, ...) (*TranslationParamsVP8, error)
- func (v *VP8Munger) UpdateAndGetPadding(newPicture bool) *buffer.VP8
- func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket)
- type VP8MungerParams
- type VP8MungerState
- type VP8PictureIdWrapHandler
- type VideoAllocation
- type VideoAllocationProvisional
- type VideoAllocationState
- type VideoLayers
- type VideoStreamingChange
- type VideoTransition
- type WebRTCReceiver
- func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error
- func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buffer)
- func (w *WebRTCReceiver) Codec() webrtc.RTPCodecParameters
- func (w *WebRTCReceiver) DebugInfo() map[string]interface{}
- func (w *WebRTCReceiver) DeleteDownTrack(subscriberID livekit.ParticipantID)
- func (w *WebRTCReceiver) GetAudioLevel() (float64, bool)
- func (w *WebRTCReceiver) GetConnectionScore() float32
- func (w *WebRTCReceiver) GetLayerDimension(layer int32) (uint32, uint32)
- func (w *WebRTCReceiver) GetLayeredBitrate() Bitrates
- func (w *WebRTCReceiver) GetPrimaryReceiverForRed() TrackReceiver
- func (w *WebRTCReceiver) GetRedReceiver() TrackReceiver
- func (w *WebRTCReceiver) GetTemporalLayerFpsForSpatial(layer int32) []float32
- func (w *WebRTCReceiver) GetTrackStats() *livekit.RTPStats
- func (w *WebRTCReceiver) HeaderExtensions() []webrtc.RTPHeaderExtensionParameter
- func (w *WebRTCReceiver) Kind() webrtc.RTPCodecType
- func (w *WebRTCReceiver) OnCloseHandler(fn func())
- func (w *WebRTCReceiver) OnMaxLayerChange(fn func(maxLayer int32))
- func (w *WebRTCReceiver) OnStatsUpdate(fn func(w *WebRTCReceiver, stat *livekit.AnalyticsStat))
- func (w *WebRTCReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error)
- func (w *WebRTCReceiver) SSRC(layer int) uint32
- func (w *WebRTCReceiver) SendPLI(layer int32, force bool)
- func (w *WebRTCReceiver) SetMaxExpectedSpatialLayer(layer int32)
- func (w *WebRTCReceiver) SetRTCPCh(ch chan []rtcp.Packet)
- func (w *WebRTCReceiver) SetRTT(rtt uint32)
- func (w *WebRTCReceiver) SetUpTrackPaused(paused bool)
- func (w *WebRTCReceiver) StreamID() string
- func (w *WebRTCReceiver) TrackID() livekit.TrackID
- func (w *WebRTCReceiver) TrackInfo() *livekit.TrackInfo
Constants ¶
const (
    RTPPaddingMaxPayloadSize = 255
    RTPPaddingEstimatedHeaderSize = 20
    RTPBlankFramesMuteSeconds = float32(1.0)
    RTPBlankFramesCloseSeconds = float32(0.2)
    FlagStopRTXOnPLI = true
)
const (
    FlagPauseOnDowngrade = true
    FlagFilterRTX = true
    TransitionCostSpatial = 10
)
Forwarder
const (
    InvalidLayerSpatial = buffer.InvalidLayerSpatial
    InvalidLayerTemporal = buffer.InvalidLayerTemporal
    DefaultMaxLayerSpatial = buffer.DefaultMaxLayerSpatial
    DefaultMaxLayerTemporal = buffer.DefaultMaxLayerTemporal
)
const (
    RtxGateWindow = 2000
    SnOffsetCacheSize = 4096
    SnOffsetCacheMask = SnOffsetCacheSize - 1
)
const (
    ChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps
    NackRatioAttenuator = 0.4 // how much to attenuate NACK ratio while calculating loss adjusted estimate
    ProbeWaitBase = 5 * time.Second
    ProbeBackoffFactor = 1.5
    ProbeWaitMax = 30 * time.Second
    ProbeSettleWait = 250
    ProbeTrendWait = 2 * time.Second
    ProbePct = 120
    ProbeMinBps = 200 * 1000 // 200 kbps
    ProbeMinDuration = 20 * time.Second
    ProbeMaxDuration = 21 * time.Second
    PriorityMin = uint8(1)
    PriorityMax = uint8(255)
    PriorityDefaultVideo = PriorityMin
    FlagAllowOvershootWhileOptimal = true
    FlagAllowOvershootWhileDeficient = false
    FlagAllowOvershootExemptTrackWhileDeficient = true
    FlagAllowOvershootInProbe = true
    FlagAllowOvershootInCatchup = true
)
const (
MimeTypeAudioRed = "audio/red"
)
Variables ¶
var (
    ErrUnknownKind = errors.New("unknown kind of codec")
    ErrOutOfOrderSequenceNumberCacheMiss = errors.New("out-of-order sequence number not found in cache")
    ErrPaddingOnlyPacket = errors.New("padding only packet that need not be forwarded")
    ErrDuplicatePacket = errors.New("duplicate packet")
    ErrPaddingNotOnFrameBoundary = errors.New("padding cannot send on non-frame boundary")
    ErrNotVP8 = errors.New("not VP8")
    ErrOutOfOrderVP8PictureIdCacheMiss = errors.New("out-of-order VP8 picture id not found in cache")
    ErrFilteredVP8TemporalLayer = errors.New("filtered VP8 temporal layer")
    ErrDownTrackAlreadyBound = errors.New("already bound")
    ErrDownTrackClosed = errors.New("downtrack closed")
)
var ( VP8KeyFrame8x8 = []byte{ 0x10, 0x02, 0x00, 0x9d, 0x01, 0x2a, 0x08, 0x00, 0x08, 0x00, 0x00, 0x47, 0x08, 0x85, 0x85, 0x88, 0x85, 0x84, 0x88, 0x02, 0x02, 0x00, 0x0c, 0x0d, 0x60, 0x00, 0xfe, 0xff, 0xab, 0x50, 0x80, } H264KeyFrame2x2SPS = []byte{ 0x67, 0x42, 0xc0, 0x1f, 0x0f, 0xd9, 0x1f, 0x88, 0x88, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, 0x00, 0x00, 0x03, 0x00, 0xc8, 0x3c, 0x60, 0xc9, 0x20, } H264KeyFrame2x2PPS = []byte{ 0x68, 0x87, 0xcb, 0x83, 0xcb, 0x20, } H264KeyFrame2x2IDR = []byte{ 0x65, 0x88, 0x84, 0x0a, 0xf2, 0x62, 0x80, 0x00, 0xa7, 0xbe, } H264KeyFrame2x2 = [][]byte{H264KeyFrame2x2SPS, H264KeyFrame2x2PPS, H264KeyFrame2x2IDR} OpusSilenceFrame = []byte{ 0xf8, 0xff, 0xfe, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, } )
var (
    ErrReceiverClosed = errors.New("receiver closed")
    ErrDownTrackAlreadyExist = errors.New("DownTrack already exist")
)
var (
    ErrIncompleteRedHeader = errors.New("incomplete red block header")
    ErrIncompleteRedBlock = errors.New("incomplete red block payload")
)
var (
    ChannelObserverParamsProbe = ChannelObserverParams{
        Name: "probe",
        EstimateRequiredSamples: 3,
        EstimateDownwardTrendThreshold: 0.0,
        EstimateCollapseValues: false,
        NackWindowMinDuration: 500 * time.Millisecond,
        NackWindowMaxDuration: 1 * time.Second,
        NackRatioThreshold: 0.04,
    }
    ChannelObserverParamsNonProbe = ChannelObserverParams{
        Name: "non-probe",
        EstimateRequiredSamples: 8,
        EstimateDownwardTrendThreshold: -0.5,
        EstimateCollapseValues: true,
        NackWindowMinDuration: 1 * time.Second,
        NackWindowMaxDuration: 2 * time.Second,
        NackRatioThreshold: 0.08,
    }
)
var ( ExemptedLayersVideo = []int32{} )
var (
    ConfigVideo = []StreamTrackerParams{
        {
            SamplesRequired: 1,
            CyclesRequired: 4,
            CycleDuration: 500 * time.Millisecond,
            BitrateReportInterval: 1 * time.Second,
        },
        {
            SamplesRequired: 5,
            CyclesRequired: 20,
            CycleDuration: 500 * time.Millisecond,
            BitrateReportInterval: 1 * time.Second,
        },
        {
            SamplesRequired: 5,
            CyclesRequired: 20,
            CycleDuration: 500 * time.Millisecond,
            BitrateReportInterval: 1 * time.Second,
        },
    }
    ConfigScreenshare = []StreamTrackerParams{
        {
            SamplesRequired: 1,
            CyclesRequired: 1,
            CycleDuration: 2 * time.Second,
            BitrateReportInterval: 4 * time.Second,
        },
        {
            SamplesRequired: 1,
            CyclesRequired: 1,
            CycleDuration: 2 * time.Second,
            BitrateReportInterval: 4 * time.Second,
        },
        {
            SamplesRequired: 1,
            CyclesRequired: 1,
            CycleDuration: 2 * time.Second,
            BitrateReportInterval: 4 * time.Second,
        },
    }
)
var (
InvalidLayers = buffer.InvalidLayers
)
var (
PacketFactory *sync.Pool
)
var (
    VideoAllocationDefault = VideoAllocation{
        // contains filtered or unexported fields
    }
)
Functions ¶
func IsRedCodec ¶
func IsSvcCodec ¶
Types ¶
type AddTrackParams ¶
type AddTrackParams struct {
    Source livekit.TrackSource
    Priority uint8
    IsSimulcast bool
    PublisherID livekit.ParticipantID
}
type AudioLevelHandle ¶
type Bitrates ¶
type Bitrates [DefaultMaxLayerSpatial + 1][DefaultMaxLayerTemporal + 1]int64
type ChannelCongestionReason ¶
type ChannelCongestionReason int
const (
    ChannelCongestionReasonNone ChannelCongestionReason = iota
    ChannelCongestionReasonEstimate
    ChannelCongestionReasonLoss
)
func (ChannelCongestionReason) String ¶
func (c ChannelCongestionReason) String() string
type ChannelObserver ¶
type ChannelObserver struct {
// contains filtered or unexported fields
}
func NewChannelObserver ¶
func NewChannelObserver(params ChannelObserverParams, logger logger.Logger) *ChannelObserver
func (*ChannelObserver) AddEstimate ¶
func (c *ChannelObserver) AddEstimate(estimate int64)
func (*ChannelObserver) AddNack ¶
func (c *ChannelObserver) AddNack(packets uint32, repeatedNacks uint32)
func (*ChannelObserver) GetHighestEstimate ¶
func (c *ChannelObserver) GetHighestEstimate() int64
func (*ChannelObserver) GetLowestEstimate ¶
func (c *ChannelObserver) GetLowestEstimate() int64
func (*ChannelObserver) GetNackRatio ¶
func (c *ChannelObserver) GetNackRatio() (uint32, uint32, float64)
func (*ChannelObserver) GetTrend ¶
func (c *ChannelObserver) GetTrend() (ChannelTrend, ChannelCongestionReason)
func (*ChannelObserver) SeedEstimate ¶
func (c *ChannelObserver) SeedEstimate(estimate int64)
func (*ChannelObserver) SeedNack ¶
func (c *ChannelObserver) SeedNack(packets uint32, repeatedNacks uint32)
type ChannelObserverParams ¶
type ChannelTrend ¶
type ChannelTrend int
const (
    ChannelTrendNeutral ChannelTrend = iota
    ChannelTrendClearing
    ChannelTrendCongesting
)
func (ChannelTrend) String ¶
func (c ChannelTrend) String() string
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
func NewCluster ¶
func (*Cluster) GetInfo ¶
func (c *Cluster) GetInfo() ProbeClusterInfo
func (*Cluster) GetSleepDuration ¶
func (*Cluster) IsFinished ¶
func (*Cluster) PacketSent ¶
type DDVideoLayerSelector ¶
type DDVideoLayerSelector struct {
// contains filtered or unexported fields
}
func NewDDVideoLayerSelector ¶
func NewDDVideoLayerSelector(logger logger.Logger) *DDVideoLayerSelector
func (*DDVideoLayerSelector) Select ¶
func (s *DDVideoLayerSelector) Select(expPkt *buffer.ExtPacket, tp *TranslationParams) (selected bool)
func (*DDVideoLayerSelector) SelectLayer ¶
func (s *DDVideoLayerSelector) SelectLayer(layer VideoLayers)
type DownTrack ¶
type DownTrack struct {
// contains filtered or unexported fields
}
DownTrack implements TrackLocal. It is the track used to write packets to an SFU subscriber, and it handles packets from simple, simulcast and SVC publishers. A DownTrack has the following lifecycle:
- new
- bound / unbound
- closed
Once closed, a DownTrack cannot be re-used.
func NewDownTrack ¶
func NewDownTrack(
    codecs []webrtc.RTPCodecParameters,
    r TrackReceiver,
    bf *buffer.Factory,
    subID livekit.ParticipantID,
    mt int,
    logger logger.Logger,
) (*DownTrack, error)
NewDownTrack returns a DownTrack.
func (*DownTrack) AddReceiverReportListener ¶
func (d *DownTrack) AddReceiverReportListener(listener ReceiverReportListener)
func (*DownTrack) AllocateNextHigher ¶
func (d *DownTrack) AllocateNextHigher(availableChannelCapacity int64, allowOvershoot bool) (VideoAllocation, bool)
func (*DownTrack) AllocateOptimal ¶
func (d *DownTrack) AllocateOptimal(allowOvershoot bool) VideoAllocation
func (*DownTrack) BandwidthRequested ¶
func (*DownTrack) Bind ¶
Bind is called by the PeerConnection after negotiation is complete. This asserts that the codec requested is supported by the remote peer. If so, it sets up all the state (SSRC and PayloadType) to have a call.
func (*DownTrack) CloseWithFlush ¶
CloseWithFlush closes the track; flush indicates whether to send a blank frame to flush the client's decoder.
- When the transceiver is reused by another participant's video track, set flush=true to avoid the previous video being shown before the new stream is displayed.
- In case of session migration, where the participant migrates from another node and the video track should be resumed with the same participant, set flush=false since the decoder does not need to be flushed.
func (*DownTrack) Codec ¶
func (d *DownTrack) Codec() webrtc.RTPCodecCapability
Codec returns current track codec capability
func (*DownTrack) CreateSenderReport ¶
func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport
func (*DownTrack) CreateSourceDescriptionChunks ¶
func (d *DownTrack) CreateSourceDescriptionChunks() []rtcp.SourceDescriptionChunk
func (*DownTrack) DistanceToDesired ¶
func (*DownTrack) GetConnectionScore ¶
func (*DownTrack) GetForwardingStatus ¶
func (d *DownTrack) GetForwardingStatus() ForwardingStatus
func (*DownTrack) GetNackStats ¶
func (*DownTrack) GetNextHigherTransition ¶
func (d *DownTrack) GetNextHigherTransition(allowOvershoot bool) (VideoTransition, bool)
func (*DownTrack) GetState ¶
func (d *DownTrack) GetState() DownTrackState
func (*DownTrack) GetTrackStats ¶
func (*DownTrack) GetTransceiver ¶
func (d *DownTrack) GetTransceiver() *webrtc.RTPTransceiver
func (*DownTrack) ID ¶
ID is the unique identifier for this Track. This should be unique for the stream, but doesn't have to be globally unique. A common example would be 'audio' or 'video' and StreamID would be 'desktop' or 'webcam'.
func (*DownTrack) IsDeficient ¶
func (*DownTrack) Kind ¶
func (d *DownTrack) Kind() webrtc.RTPCodecType
Kind controls if this TrackLocal is audio or video
func (*DownTrack) MaxLayers ¶
func (d *DownTrack) MaxLayers() VideoLayers
func (*DownTrack) OnAvailableLayersChanged ¶
func (*DownTrack) OnBitrateAvailabilityChanged ¶
func (*DownTrack) OnCloseHandler ¶
OnCloseHandler sets the method to be called when the remote track is removed.
func (*DownTrack) OnMaxLayerChanged ¶
func (*DownTrack) OnPacketSent ¶
func (*DownTrack) OnPaddingSent ¶
func (*DownTrack) OnREMB ¶
func (d *DownTrack) OnREMB(fn func(dt *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate))
func (*DownTrack) OnRttUpdate ¶
func (*DownTrack) OnStatsUpdate ¶
func (d *DownTrack) OnStatsUpdate(fn func(dt *DownTrack, stat *livekit.AnalyticsStat))
func (*DownTrack) OnSubscribedLayersChanged ¶
func (d *DownTrack) OnSubscribedLayersChanged(fn func(dt *DownTrack, layers VideoLayers))
func (*DownTrack) OnSubscriptionChanged ¶
func (*DownTrack) OnTransportCCFeedback ¶
func (d *DownTrack) OnTransportCCFeedback(fn func(dt *DownTrack, cc *rtcp.TransportLayerCC))
func (*DownTrack) Pause ¶
func (d *DownTrack) Pause() VideoAllocation
func (*DownTrack) ProvisionalAllocate ¶
func (*DownTrack) ProvisionalAllocateCommit ¶
func (d *DownTrack) ProvisionalAllocateCommit() VideoAllocation
func (*DownTrack) ProvisionalAllocateGetBestWeightedTransition ¶
func (d *DownTrack) ProvisionalAllocateGetBestWeightedTransition() VideoTransition
func (*DownTrack) ProvisionalAllocateGetCooperativeTransition ¶
func (d *DownTrack) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) VideoTransition
func (*DownTrack) ProvisionalAllocatePrepare ¶
func (d *DownTrack) ProvisionalAllocatePrepare()
func (*DownTrack) SeedState ¶
func (d *DownTrack) SeedState(state DownTrackState)
func (*DownTrack) SetConnected ¶
func (d *DownTrack) SetConnected()
func (*DownTrack) SetMaxSpatialLayer ¶
func (*DownTrack) SetMaxTemporalLayer ¶
func (*DownTrack) SetRTPHeaderExtensions ¶
func (d *DownTrack) SetRTPHeaderExtensions(rtpHeaderExtensions []webrtc.RTPHeaderExtensionParameter)
SetRTPHeaderExtensions sets the RTP header extensions for this track.
func (*DownTrack) SetTransceiver ¶
func (d *DownTrack) SetTransceiver(transceiver *webrtc.RTPTransceiver)
func (*DownTrack) SubscriberID ¶
func (d *DownTrack) SubscriberID() livekit.ParticipantID
func (*DownTrack) TrackInfoAvailable ¶
func (d *DownTrack) TrackInfoAvailable()
func (*DownTrack) Unbind ¶
Unbind implements the teardown logic when the track is no longer needed. This happens because a track has been stopped.
func (*DownTrack) UpTrackBitrateAvailabilityChange ¶
func (d *DownTrack) UpTrackBitrateAvailabilityChange()
func (*DownTrack) UpTrackLayersChange ¶
func (*DownTrack) WritePaddingRTP ¶
WritePaddingRTP tries to write as many padding-only RTP packets to the DownTrack as necessary to satisfy the given size.
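The sizing arithmetic behind "as many packets as necessary" can be sketched as follows. This is a minimal illustration, not the package's implementation: `paddingPacketsNeeded` is a hypothetical helper, and it assumes a 12-byte fixed RTP header with no CSRCs or header extensions and at most 255 bytes of padding per packet (the RTP padding count is a single octet).

```go
package main

import "fmt"

const (
	rtpHeaderSize = 12  // fixed RTP header; assumes no CSRCs or extensions
	maxRTPPadding = 255 // the RTP padding count is a single octet
)

// paddingPacketsNeeded is a hypothetical helper. It returns how many
// padding-only packets are needed to send at least bytesToSend bytes.
func paddingPacketsNeeded(bytesToSend int) int {
	if bytesToSend <= 0 {
		return 0
	}
	perPacket := rtpHeaderSize + maxRTPPadding
	return (bytesToSend + perPacket - 1) / perPacket // ceiling division
}

func main() {
	for _, n := range []int{0, 100, 267, 268, 1000} {
		fmt.Printf("%d bytes -> %d packets\n", n, paddingPacketsNeeded(n))
	}
}
```

The actual method may send fewer packets than this count, for example when the track is not in a state where padding can be written.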
type DownTrackSpreader ¶
type DownTrackSpreader struct {
// contains filtered or unexported fields
}
func NewDownTrackSpreader ¶
func NewDownTrackSpreader(params DownTrackSpreaderParams) *DownTrackSpreader
func (*DownTrackSpreader) Broadcast ¶
func (d *DownTrackSpreader) Broadcast(writer func(TrackSender))
func (*DownTrackSpreader) DownTrackCount ¶
func (d *DownTrackSpreader) DownTrackCount() int
func (*DownTrackSpreader) Free ¶
func (d *DownTrackSpreader) Free(subscriberID livekit.ParticipantID)
func (*DownTrackSpreader) GetDownTracks ¶
func (d *DownTrackSpreader) GetDownTracks() []TrackSender
func (*DownTrackSpreader) HasDownTrack ¶
func (d *DownTrackSpreader) HasDownTrack(subscriberID livekit.ParticipantID) bool
func (*DownTrackSpreader) ResetAndGetDownTracks ¶
func (d *DownTrackSpreader) ResetAndGetDownTracks() []TrackSender
func (*DownTrackSpreader) Store ¶
func (d *DownTrackSpreader) Store(ts TrackSender)
type DownTrackSpreaderParams ¶
type DownTrackState ¶
type DownTrackState struct {
	RTPStats             *buffer.RTPStats
	DeltaStatsSnapshotId uint32
	ForwarderState       ForwarderState
}
func (DownTrackState) String ¶
func (d DownTrackState) String() string
type Forwarder ¶
type Forwarder struct {
// contains filtered or unexported fields
}
func NewForwarder ¶
func (*Forwarder) AllocateNextHigher ¶
func (*Forwarder) AllocateOptimal ¶
func (f *Forwarder) AllocateOptimal(brs Bitrates, allowOvershoot bool) VideoAllocation
func (*Forwarder) BandwidthRequested ¶
func (*Forwarder) CurrentLayers ¶
func (f *Forwarder) CurrentLayers() VideoLayers
func (*Forwarder) DetermineCodec ¶
func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability)
func (*Forwarder) DistanceToDesired ¶
func (*Forwarder) FilterRTX ¶
func (f *Forwarder) FilterRTX(nacks []uint16) (filtered []uint16, disallowedLayers [DefaultMaxLayerSpatial + 1]bool)
func (*Forwarder) GetForwardingStatus ¶
func (f *Forwarder) GetForwardingStatus() ForwardingStatus
func (*Forwarder) GetNextHigherTransition ¶
func (f *Forwarder) GetNextHigherTransition(brs Bitrates, allowOvershoot bool) (VideoTransition, bool)
func (*Forwarder) GetPaddingVP8 ¶
func (*Forwarder) GetRTPMungerParams ¶
func (f *Forwarder) GetRTPMungerParams() RTPMungerParams
func (*Forwarder) GetSnTsForBlankFrames ¶
func (*Forwarder) GetSnTsForPadding ¶
func (*Forwarder) GetState ¶
func (f *Forwarder) GetState() ForwarderState
func (*Forwarder) GetTranslationParams ¶
func (*Forwarder) IsDeficient ¶
func (*Forwarder) IsReducedQuality ¶
func (*Forwarder) MaxLayers ¶
func (f *Forwarder) MaxLayers() VideoLayers
func (*Forwarder) Pause ¶
func (f *Forwarder) Pause(brs Bitrates) VideoAllocation
func (*Forwarder) ProvisionalAllocate ¶
func (*Forwarder) ProvisionalAllocateCommit ¶
func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation
func (*Forwarder) ProvisionalAllocateGetBestWeightedTransition ¶
func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransition
func (*Forwarder) ProvisionalAllocateGetCooperativeTransition ¶
func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) VideoTransition
func (*Forwarder) ProvisionalAllocatePrepare ¶
func (*Forwarder) SeedState ¶
func (f *Forwarder) SeedState(state ForwarderState)
func (*Forwarder) SetMaxSpatialLayer ¶
func (f *Forwarder) SetMaxSpatialLayer(spatialLayer int32) (bool, VideoLayers, VideoLayers)
func (*Forwarder) SetMaxTemporalLayer ¶
func (f *Forwarder) SetMaxTemporalLayer(temporalLayer int32) (bool, VideoLayers, VideoLayers)
func (*Forwarder) TargetLayers ¶
func (f *Forwarder) TargetLayers() VideoLayers
func (*Forwarder) UpTrackLayersChange ¶
type ForwarderState ¶
type ForwarderState struct {
	LastTSCalc int64
	RTP        RTPMungerState
	VP8        VP8MungerState
}
func (ForwarderState) String ¶
func (f ForwarderState) String() string
type ForwardingStatus ¶
type ForwardingStatus int
const (
	ForwardingStatusOff ForwardingStatus = iota
	ForwardingStatusPartial
	ForwardingStatusOptimal
)
type MaxDistanceSorter ¶
type MaxDistanceSorter []*Track
func (MaxDistanceSorter) Len ¶
func (m MaxDistanceSorter) Len() int
func (MaxDistanceSorter) Less ¶
func (m MaxDistanceSorter) Less(i, j int) bool
func (MaxDistanceSorter) Swap ¶
func (m MaxDistanceSorter) Swap(i, j int)
type MinDistanceSorter ¶
type MinDistanceSorter []*Track
func (MinDistanceSorter) Len ¶
func (m MinDistanceSorter) Len() int
func (MinDistanceSorter) Less ¶
func (m MinDistanceSorter) Less(i, j int) bool
func (MinDistanceSorter) Swap ¶
func (m MinDistanceSorter) Swap(i, j int)
type ProbeClusterInfo ¶
type ProbeClusterInfo struct {
	Id        ProbeClusterId
	BytesSent int
	Duration  time.Duration
}
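The `BytesSent` and `Duration` fields are enough to derive the average rate a probe cluster achieved. The sketch below shows that arithmetic; `probeRateBps` is a hypothetical helper and not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// probeRateBps is a hypothetical helper. It returns the average send rate
// of a probe cluster in bits per second, or 0 for a non-positive duration.
func probeRateBps(bytesSent int, duration time.Duration) float64 {
	if duration <= 0 {
		return 0
	}
	return float64(bytesSent*8) / duration.Seconds()
}

func main() {
	// 25,000 bytes sent over 200 ms is roughly 1 Mbps.
	fmt.Printf("%.0f bps\n", probeRateBps(25_000, 200*time.Millisecond))
}
```

A stream allocator could compare such a rate with the bandwidth estimate that follows the probe, though how the result is used is implementation dependent.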
type Prober ¶
type Prober struct {
// contains filtered or unexported fields
}
func NewProber ¶
func NewProber(params ProberParams) *Prober
func (*Prober) AddCluster ¶
func (*Prober) OnProbeClusterDone ¶
func (p *Prober) OnProbeClusterDone(f func(info ProbeClusterInfo))
func (*Prober) OnSendProbe ¶
func (*Prober) PacketSent ¶
type ProberParams ¶
type RTPMunger ¶
type RTPMunger struct {
	RTPMungerParams
	// contains filtered or unexported fields
}
func NewRTPMunger ¶
func (*RTPMunger) GetLast ¶
func (r *RTPMunger) GetLast() RTPMungerState
func (*RTPMunger) GetParams ¶
func (r *RTPMunger) GetParams() RTPMungerParams
func (*RTPMunger) IsOnFrameBoundary ¶
func (*RTPMunger) PacketDropped ¶
func (*RTPMunger) SeedLast ¶
func (r *RTPMunger) SeedLast(state RTPMungerState)
func (*RTPMunger) SetLastSnTs ¶
func (*RTPMunger) UpdateAndGetPaddingSnTs ¶
func (*RTPMunger) UpdateAndGetSnTs ¶
func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error)
type RTPMungerParams ¶
type RTPMungerParams struct {
// contains filtered or unexported fields
}
type RTPMungerState ¶
func (RTPMungerState) String ¶
func (r RTPMungerState) String() string
type ReceiverOpts ¶
type ReceiverOpts func(w *WebRTCReceiver) *WebRTCReceiver
func WithAudioConfig ¶
func WithAudioConfig(audioConfig config.AudioConfig) ReceiverOpts
WithAudioConfig sets up parameters for active speaker detection
func WithLoadBalanceThreshold ¶
func WithLoadBalanceThreshold(downTracks int) ReceiverOpts
WithLoadBalanceThreshold enables parallelization of packet writes when the number of downTracks exceeds the threshold. The value should be between 3 and 150. For a server handling a few large rooms, use a smaller value (required to handle very large rooms of 250+ participants). For a server handling many small rooms, use a larger value or disable it. Set to 0 (disabled) by default.
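The threshold behavior described above can be sketched as a serial-versus-parallel fan-out. This is an illustration under assumptions, not the receiver's code: `broadcast` is a hypothetical function, and it spawns one goroutine per down track when parallel, whereas a real implementation may use a bounded worker pool.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcast is a hypothetical stand-in for writing one packet to every
// down track. With threshold 0 (disabled) or at most threshold down
// tracks it writes serially; otherwise each write runs in its own
// goroutine. It reports whether the parallel path was taken.
func broadcast(numDownTracks, threshold int, write func(i int)) (parallel bool) {
	if threshold == 0 || numDownTracks <= threshold {
		for i := 0; i < numDownTracks; i++ {
			write(i)
		}
		return false
	}
	var wg sync.WaitGroup
	for i := 0; i < numDownTracks; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			write(i)
		}(i)
	}
	wg.Wait()
	return true
}

func main() {
	counts := make([]int, 10)
	write := func(i int) { counts[i]++ } // each index is touched by one goroutine only
	fmt.Println("parallel:", broadcast(2, 3, write))
	fmt.Println("parallel:", broadcast(10, 3, write))
}
```

Serial writes avoid goroutine overhead in small rooms, which is why a larger threshold (or 0) suits servers with many small rooms.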
func WithPliThrottleConfig ¶
func WithPliThrottleConfig(pliThrottleConfig config.PLIThrottleConfig) ReceiverOpts
WithPliThrottleConfig sets the minimum time (in ms) between sending PLIs.
func WithStreamTrackers ¶
func WithStreamTrackers() ReceiverOpts
WithStreamTrackers enables StreamTracker use for simulcast
type ReceiverReportListener ¶
type ReceiverReportListener func(dt *DownTrack, report *rtcp.ReceiverReport)
type RedPrimaryReceiver ¶
type RedPrimaryReceiver struct {
	TrackReceiver
	// contains filtered or unexported fields
}
func NewRedPrimaryReceiver ¶
func NewRedPrimaryReceiver(receiver TrackReceiver, dsp DownTrackSpreaderParams) *RedPrimaryReceiver
func (*RedPrimaryReceiver) AddDownTrack ¶
func (r *RedPrimaryReceiver) AddDownTrack(track TrackSender) error
func (*RedPrimaryReceiver) CanClose ¶
func (r *RedPrimaryReceiver) CanClose() bool
func (*RedPrimaryReceiver) Close ¶
func (r *RedPrimaryReceiver) Close()
func (*RedPrimaryReceiver) DeleteDownTrack ¶
func (r *RedPrimaryReceiver) DeleteDownTrack(subscriberID livekit.ParticipantID)
func (*RedPrimaryReceiver) ForwardRTP ¶
func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32)
type RedReceiver ¶
type RedReceiver struct {
	TrackReceiver
	// contains filtered or unexported fields
}
func NewRedReceiver ¶
func NewRedReceiver(receiver TrackReceiver, dsp DownTrackSpreaderParams) *RedReceiver
func (*RedReceiver) AddDownTrack ¶
func (r *RedReceiver) AddDownTrack(track TrackSender) error
func (*RedReceiver) CanClose ¶
func (r *RedReceiver) CanClose() bool
func (*RedReceiver) Close ¶
func (r *RedReceiver) Close()
func (*RedReceiver) DeleteDownTrack ¶
func (r *RedReceiver) DeleteDownTrack(subscriberID livekit.ParticipantID)
func (*RedReceiver) ForwardRTP ¶
func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32)
type SequenceNumberOrdering ¶
type SequenceNumberOrdering int
const (
	SequenceNumberOrderingContiguous SequenceNumberOrdering = iota
	SequenceNumberOrderingOutOfOrder
	SequenceNumberOrderingGap
	SequenceNumberOrderingDuplicate
)
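One plausible way to arrive at these four orderings is to compare an incoming RTP sequence number with the highest one seen so far. The sketch below is an assumption about the classification, not the RTPMunger's actual logic; `classify` and the lowercase constants are hypothetical names.

```go
package main

import "fmt"

type ordering int

const (
	contiguous ordering = iota
	outOfOrder
	gap
	duplicate
)

// classify is a hypothetical helper. It compares an incoming sequence
// number with the highest one seen so far, using a signed 16-bit
// difference so that wrap-around from 65535 to 0 is handled.
func classify(highest, incoming uint16) ordering {
	diff := int16(incoming - highest)
	switch {
	case diff == 1:
		return contiguous
	case diff == 0:
		return duplicate
	case diff > 1:
		return gap
	default:
		return outOfOrder
	}
}

func main() {
	fmt.Println(classify(65535, 0)) // wrap-around, still contiguous
	fmt.Println(classify(10, 13))   // two packets missing
	fmt.Println(classify(10, 8))    // late packet
	fmt.Println(classify(10, 10))   // same packet again
}
```

Note that this simple version only detects a duplicate of the most recent packet; detecting duplicates of older packets would need a history of seen sequence numbers.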
type StreamAllocator ¶
type StreamAllocator struct {
// contains filtered or unexported fields
}
func NewStreamAllocator ¶
func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator
func (*StreamAllocator) AddTrack ¶
func (s *StreamAllocator) AddTrack(downTrack *DownTrack, params AddTrackParams)
func (*StreamAllocator) OnStreamStateChange ¶
func (s *StreamAllocator) OnStreamStateChange(f func(update *StreamStateUpdate) error)
func (*StreamAllocator) RemoveTrack ¶
func (s *StreamAllocator) RemoveTrack(downTrack *DownTrack)
func (*StreamAllocator) SetBandwidthEstimator ¶
func (s *StreamAllocator) SetBandwidthEstimator(bwe cc.BandwidthEstimator)
func (*StreamAllocator) SetTrackPriority ¶
func (s *StreamAllocator) SetTrackPriority(downTrack *DownTrack, priority uint8)
func (*StreamAllocator) Start ¶
func (s *StreamAllocator) Start()
func (*StreamAllocator) Stop ¶
func (s *StreamAllocator) Stop()
type StreamAllocatorParams ¶
type StreamAllocatorParams struct {
	Config config.CongestionControlConfig
	Logger logger.Logger
}
type StreamState ¶
type StreamState int
const (
	StreamStateActive StreamState = iota
	StreamStatePaused
)
func (StreamState) String ¶
func (s StreamState) String() string
type StreamStateInfo ¶
type StreamStateInfo struct {
	ParticipantID livekit.ParticipantID
	TrackID       livekit.TrackID
	State         StreamState
}
type StreamStateUpdate ¶
type StreamStateUpdate struct {
StreamStates []*StreamStateInfo
}
func NewStreamStateUpdate ¶
func NewStreamStateUpdate() *StreamStateUpdate
func (*StreamStateUpdate) Empty ¶
func (s *StreamStateUpdate) Empty() bool
func (*StreamStateUpdate) HandleStreamingChange ¶
func (s *StreamStateUpdate) HandleStreamingChange(change VideoStreamingChange, track *Track)
type StreamStatus ¶
type StreamStatus int32
const (
	StreamStatusStopped StreamStatus = 0
	StreamStatusActive  StreamStatus = 1
)
func (StreamStatus) String ¶
func (s StreamStatus) String() string
type StreamTracker ¶
type StreamTracker struct {
// contains filtered or unexported fields
}
StreamTracker keeps track of packet flow and ensures a particular up track is consistently producing. It runs its own goroutine for detection and fires the OnStatusChanged callback.
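The pattern described here, packet observation plus a detection goroutine that fires a callback on status changes, can be sketched as below. `miniTracker` is a hypothetical, heavily simplified model and not the package's StreamTracker; it assumes a stream is active if at least one packet arrived in the last detection cycle.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type status int

const (
	statusStopped status = iota
	statusActive
)

// miniTracker is a hypothetical, simplified tracker.
type miniTracker struct {
	mu       sync.Mutex
	seen     int // packets observed in the current cycle
	status   status
	onChange func(status)
}

// Observe records one received packet.
func (t *miniTracker) Observe() {
	t.mu.Lock()
	t.seen++
	t.mu.Unlock()
}

// cycle evaluates one detection window and fires the callback,
// outside the lock, if the status changed.
func (t *miniTracker) cycle() {
	t.mu.Lock()
	next := statusStopped
	if t.seen > 0 {
		next = statusActive
	}
	t.seen = 0
	changed := next != t.status
	t.status = next
	cb := t.onChange
	t.mu.Unlock()
	if changed && cb != nil {
		cb(next)
	}
}

// run calls cycle on every tick until done is closed.
func (t *miniTracker) run(interval time.Duration, done <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			t.cycle()
		case <-done:
			return
		}
	}
}

func main() {
	t := &miniTracker{onChange: func(s status) { fmt.Println("status changed to", s) }}
	done := make(chan struct{})
	go t.run(50*time.Millisecond, done)
	t.Observe()
	time.Sleep(120 * time.Millisecond)
	close(done)
}
```

Invoking the callback after releasing the mutex lets the callback call back into the tracker without deadlocking.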
func NewStreamTracker ¶
func NewStreamTracker(params StreamTrackerParams) *StreamTracker
func (*StreamTracker) BitrateTemporalCumulative ¶
func (s *StreamTracker) BitrateTemporalCumulative() []int64
BitrateTemporalCumulative returns the current stream bitrate per temporal layer, where each layer's value is accumulated with those of the lower temporal layers.
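The accumulation is a running sum over temporal layers. The sketch below shows only that arithmetic; `cumulativeTemporal` is a hypothetical helper, and the real method may treat layers without a measured bitrate differently.

```go
package main

import "fmt"

// cumulativeTemporal is a hypothetical helper. Given per-temporal-layer
// bitrates (index 0 is the base layer), it returns for each layer the sum
// of that layer and all lower ones.
func cumulativeTemporal(perLayer []int64) []int64 {
	out := make([]int64, len(perLayer))
	var sum int64
	for i, br := range perLayer {
		sum += br
		out[i] = sum
	}
	return out
}

func main() {
	// Forwarding temporal layer 2 requires layers 0 and 1 as well.
	fmt.Println(cumulativeTemporal([]int64{100_000, 50_000, 25_000})) // [100000 150000 175000]
}
```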
func (*StreamTracker) Observe ¶
func (s *StreamTracker) Observe(temporalLayer int32, pktSize int, payloadSize int)
Observe a packet that's received
func (*StreamTracker) OnBitrateAvailable ¶
func (s *StreamTracker) OnBitrateAvailable(f func())
func (*StreamTracker) OnStatusChanged ¶
func (s *StreamTracker) OnStatusChanged(f func(status StreamStatus))
func (*StreamTracker) Reset ¶
func (s *StreamTracker) Reset()
func (*StreamTracker) SetPaused ¶
func (s *StreamTracker) SetPaused(paused bool)
func (*StreamTracker) Start ¶
func (s *StreamTracker) Start()
func (*StreamTracker) Status ¶
func (s *StreamTracker) Status() StreamStatus
func (*StreamTracker) Stop ¶
func (s *StreamTracker) Stop()
type StreamTrackerManager ¶
type StreamTrackerManager struct {
// contains filtered or unexported fields
}
func NewStreamTrackerManager ¶
func (*StreamTrackerManager) AddTracker ¶
func (s *StreamTrackerManager) AddTracker(layer int32) *StreamTracker
func (*StreamTrackerManager) DistanceToDesired ¶
func (s *StreamTrackerManager) DistanceToDesired() int32
func (*StreamTrackerManager) GetAvailableLayers ¶
func (s *StreamTrackerManager) GetAvailableLayers() ([]int32, []int32)
func (*StreamTrackerManager) GetLayerDimension ¶
func (s *StreamTrackerManager) GetLayerDimension(layer int32) (uint32, uint32)
func (*StreamTrackerManager) GetLayeredBitrate ¶
func (s *StreamTrackerManager) GetLayeredBitrate() Bitrates
func (*StreamTrackerManager) GetMaxExpectedLayer ¶
func (s *StreamTrackerManager) GetMaxExpectedLayer() int32
func (*StreamTrackerManager) GetTracker ¶
func (s *StreamTrackerManager) GetTracker(layer int32) *StreamTracker
func (*StreamTrackerManager) HasSpatialLayer ¶
func (s *StreamTrackerManager) HasSpatialLayer(layer int32) bool
func (*StreamTrackerManager) OnAvailableLayersChanged ¶
func (s *StreamTrackerManager) OnAvailableLayersChanged(f func(availableLayers []int32, exemptedLayers []int32))
func (*StreamTrackerManager) OnBitrateAvailabilityChanged ¶
func (s *StreamTrackerManager) OnBitrateAvailabilityChanged(f func())
func (*StreamTrackerManager) OnMaxLayerChanged ¶
func (s *StreamTrackerManager) OnMaxLayerChanged(f func(maxLayer int32))
func (*StreamTrackerManager) RemoveAllTrackers ¶
func (s *StreamTrackerManager) RemoveAllTrackers()
func (*StreamTrackerManager) RemoveTracker ¶
func (s *StreamTrackerManager) RemoveTracker(layer int32)
func (*StreamTrackerManager) SetMaxExpectedSpatialLayer ¶
func (s *StreamTrackerManager) SetMaxExpectedSpatialLayer(layer int32) int32
func (*StreamTrackerManager) SetPaused ¶
func (s *StreamTrackerManager) SetPaused(paused bool)
type StreamTrackerParams ¶
type Track ¶
type Track struct {
// contains filtered or unexported fields
}
func (*Track) AllocateNextHigher ¶
func (t *Track) AllocateNextHigher(availableChannelCapacity int64, allowOvershoot bool) (VideoAllocation, bool)
func (*Track) AllocateOptimal ¶
func (t *Track) AllocateOptimal(allowOvershoot bool) VideoAllocation
func (*Track) BandwidthRequested ¶
func (*Track) DistanceToDesired ¶
func (*Track) GetNackDelta ¶
func (*Track) GetNextHigherTransition ¶
func (t *Track) GetNextHigherTransition(allowOvershoot bool) (VideoTransition, bool)
func (*Track) IsDeficient ¶
func (*Track) Pause ¶
func (t *Track) Pause() VideoAllocation
func (*Track) ProvisionalAllocate ¶
func (*Track) ProvisionalAllocateCommit ¶
func (t *Track) ProvisionalAllocateCommit() VideoAllocation
func (*Track) ProvisionalAllocateGetBestWeightedTransition ¶
func (t *Track) ProvisionalAllocateGetBestWeightedTransition() VideoTransition
func (*Track) ProvisionalAllocateGetCooperativeTransition ¶
func (t *Track) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) VideoTransition
func (*Track) ProvisionalAllocatePrepare ¶
func (t *Track) ProvisionalAllocatePrepare()
func (*Track) PublisherID ¶
func (t *Track) PublisherID() livekit.ParticipantID
func (*Track) SetMaxLayers ¶
func (t *Track) SetMaxLayers(layers VideoLayers) bool
func (*Track) SetPriority ¶
func (*Track) WritePaddingRTP ¶
type TrackReceiver ¶
type TrackReceiver interface {
	TrackID() livekit.TrackID
	StreamID() string
	Codec() webrtc.RTPCodecParameters
	HeaderExtensions() []webrtc.RTPHeaderExtensionParameter
	ReadRTP(buf []byte, layer uint8, sn uint16) (int, error)
	GetLayeredBitrate() Bitrates
	GetAudioLevel() (float64, bool)
	SendPLI(layer int32, force bool)
	SetUpTrackPaused(paused bool)
	SetMaxExpectedSpatialLayer(layer int32)
	AddDownTrack(track TrackSender) error
	DeleteDownTrack(participantID livekit.ParticipantID)
	DebugInfo() map[string]interface{}
	GetLayerDimension(layer int32) (uint32, uint32)
	TrackInfo() *livekit.TrackInfo
	// Get primary receiver if this receiver represents a RED codec; otherwise it will return itself
	GetPrimaryReceiverForRed() TrackReceiver
	// Get red receiver for primary codec, used by forward red encodings for opus only codec
	GetRedReceiver() TrackReceiver
	GetTemporalLayerFpsForSpatial(layer int32) []float32
}
TrackReceiver defines an interface to receive media from a remote peer.
type TrackSender ¶
type TrackSender interface {
	UpTrackLayersChange(availableLayers []int32, exemptedLayers []int32)
	UpTrackBitrateAvailabilityChange()
	WriteRTP(p *buffer.ExtPacket, layer int32) error
	Close()
	IsClosed() bool
	// ID is the globally unique identifier for this Track.
	ID() string
	SubscriberID() livekit.ParticipantID
	TrackInfoAvailable()
}
TrackSender defines an interface to send media to a remote peer.
type TrackSorter ¶
type TrackSorter []*Track
func (TrackSorter) Len ¶
func (t TrackSorter) Len() int
func (TrackSorter) Less ¶
func (t TrackSorter) Less(i, j int) bool
func (TrackSorter) Swap ¶
func (t TrackSorter) Swap(i, j int)
type TranslationParams ¶
type TranslationParams struct {
// contains filtered or unexported fields
}
type TranslationParamsRTP ¶
type TranslationParamsRTP struct {
// contains filtered or unexported fields
}
type TrendDetector ¶
type TrendDetector struct {
// contains filtered or unexported fields
}
func NewTrendDetector ¶
func NewTrendDetector(params TrendDetectorParams) *TrendDetector
func (*TrendDetector) AddValue ¶
func (t *TrendDetector) AddValue(value int64)
func (*TrendDetector) GetDirection ¶
func (t *TrendDetector) GetDirection() TrendDirection
func (*TrendDetector) GetHighest ¶
func (t *TrendDetector) GetHighest() int64
func (*TrendDetector) GetLowest ¶
func (t *TrendDetector) GetLowest() int64
func (*TrendDetector) GetValues ¶
func (t *TrendDetector) GetValues() []int64
func (*TrendDetector) Seed ¶
func (t *TrendDetector) Seed(value int64)
func (*TrendDetector) ToString ¶
func (t *TrendDetector) ToString() string
type TrendDetectorParams ¶
type TrendDirection ¶
type TrendDirection int
const (
	TrendDirectionNeutral TrendDirection = iota
	TrendDirectionUpward
	TrendDirectionDownward
)
func (TrendDirection) String ¶
func (t TrendDirection) String() string
type Uint16Wrapper ¶
type Uint16Wrapper struct {
// contains filtered or unexported fields
}
TODO: use a generic wrapper when updated to Go 1.18
func (*Uint16Wrapper) Unwrap ¶
func (w *Uint16Wrapper) Unwrap(value uint16) int32
type VP8Munger ¶
type VP8Munger struct {
	VP8MungerParams
	// contains filtered or unexported fields
}
func NewVP8Munger ¶
func (*VP8Munger) GetLast ¶
func (v *VP8Munger) GetLast() VP8MungerState
func (*VP8Munger) PictureIdOffset ¶
for testing only
func (*VP8Munger) SeedLast ¶
func (v *VP8Munger) SeedLast(state VP8MungerState)
func (*VP8Munger) UpdateAndGet ¶
func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, ordering SequenceNumberOrdering, maxTemporalLayer int32) (*TranslationParamsVP8, error)
func (*VP8Munger) UpdateAndGetPadding ¶
func (*VP8Munger) UpdateOffsets ¶
type VP8MungerParams ¶
type VP8MungerParams struct {
// contains filtered or unexported fields
}
type VP8MungerState ¶
type VP8MungerState struct {
	ExtLastPictureId int32
	PictureIdUsed    int
	LastTl0PicIdx    uint8
	Tl0PicIdxUsed    int
	TidUsed          int
	LastKeyIdx       uint8
	KeyIdxUsed       int
}
func (VP8MungerState) String ¶
func (v VP8MungerState) String() string
type VP8PictureIdWrapHandler ¶
type VP8PictureIdWrapHandler struct {
// contains filtered or unexported fields
}
func (*VP8PictureIdWrapHandler) Init ¶
func (v *VP8PictureIdWrapHandler) Init(extPictureId int32, mBit bool)
func (*VP8PictureIdWrapHandler) MaxPictureId ¶
func (v *VP8PictureIdWrapHandler) MaxPictureId() int32
func (*VP8PictureIdWrapHandler) Unwrap ¶
func (v *VP8PictureIdWrapHandler) Unwrap(pictureId uint16, mBit bool) int32
Unwrap unwraps the picture ID, updates maxPictureId, and returns the unwrapped value.
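Unwrapping extends a wrapped VP8 picture ID (7 bits, or 15 bits when the M bit is set) into a value that keeps growing across wrap-arounds. The sketch below shows one common way to do this; `unwrapper` is a hypothetical model rather than VP8PictureIdWrapHandler itself, and it assumes `maxExt` has been initialized to a non-negative value from a first packet.

```go
package main

import "fmt"

// unwrapper is a hypothetical, simplified model of picture ID unwrapping.
type unwrapper struct {
	maxExt int32 // highest extended picture ID seen so far (assumed >= 0)
}

// unwrap extends a wrapped picture ID to the extended value closest to
// maxExt, and advances maxExt if the result is newer.
func (u *unwrapper) unwrap(pictureId uint16, mBit bool) int32 {
	mod := int32(1 << 7) // 7-bit picture ID
	if mBit {
		mod = 1 << 15 // 15-bit picture ID
	}
	// Signed distance from the last maximum, folded into [-mod/2, mod/2).
	diff := (int32(pictureId) - u.maxExt%mod + mod) % mod
	if diff >= mod/2 {
		diff -= mod
	}
	ext := u.maxExt + diff
	if ext > u.maxExt {
		u.maxExt = ext
	}
	return ext
}

func main() {
	u := &unwrapper{maxExt: 126}
	// 127, then a wrap to 0 and 1, then a late packet with ID 127.
	for _, id := range []uint16{127, 0, 1, 127} {
		fmt.Println(id, "->", u.unwrap(id, false))
	}
}
```

Folding the difference into a half-range window is what lets a late packet map to an older extended value instead of being mistaken for a full wrap forward.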
func (*VP8PictureIdWrapHandler) UpdateMaxPictureId ¶
func (v *VP8PictureIdWrapHandler) UpdateMaxPictureId(extPictureId int32, mBit bool)
type VideoAllocation ¶
type VideoAllocation struct {
// contains filtered or unexported fields
}
func (VideoAllocation) String ¶
func (v VideoAllocation) String() string
type VideoAllocationProvisional ¶
type VideoAllocationProvisional struct {
// contains filtered or unexported fields
}
type VideoAllocationState ¶
type VideoAllocationState int
const (
	VideoAllocationStateNone VideoAllocationState = iota
	VideoAllocationStateMuted
	VideoAllocationStateFeedDry
	VideoAllocationStateAwaitingMeasurement
	VideoAllocationStateOptimal
	VideoAllocationStateDeficient
)
func (VideoAllocationState) String ¶
func (v VideoAllocationState) String() string
type VideoLayers ¶
type VideoLayers = buffer.VideoLayer
type VideoStreamingChange ¶
type VideoStreamingChange int
const (
	VideoStreamingChangeNone VideoStreamingChange = iota
	VideoStreamingChangePausing
	VideoStreamingChangeResuming
)
func (VideoStreamingChange) String ¶
func (v VideoStreamingChange) String() string
type VideoTransition ¶
type VideoTransition struct {
// contains filtered or unexported fields
}
func (VideoTransition) String ¶
func (v VideoTransition) String() string
type WebRTCReceiver ¶
type WebRTCReceiver struct {
// contains filtered or unexported fields
}
WebRTCReceiver receives a media track
func NewWebRTCReceiver ¶
func NewWebRTCReceiver(
	receiver *webrtc.RTPReceiver,
	track *webrtc.TrackRemote,
	trackInfo *livekit.TrackInfo,
	logger logger.Logger,
	twcc *twcc.Responder,
	opts ...ReceiverOpts,
) *WebRTCReceiver
NewWebRTCReceiver creates a new WebRTC track receiver.
func (*WebRTCReceiver) AddDownTrack ¶
func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error
func (*WebRTCReceiver) AddUpTrack ¶
func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buffer)
func (*WebRTCReceiver) Codec ¶
func (w *WebRTCReceiver) Codec() webrtc.RTPCodecParameters
func (*WebRTCReceiver) DebugInfo ¶
func (w *WebRTCReceiver) DebugInfo() map[string]interface{}
func (*WebRTCReceiver) DeleteDownTrack ¶
func (w *WebRTCReceiver) DeleteDownTrack(subscriberID livekit.ParticipantID)
DeleteDownTrack removes a DownTrack from a Receiver
func (*WebRTCReceiver) GetAudioLevel ¶
func (w *WebRTCReceiver) GetAudioLevel() (float64, bool)
func (*WebRTCReceiver) GetConnectionScore ¶
func (w *WebRTCReceiver) GetConnectionScore() float32
func (*WebRTCReceiver) GetLayerDimension ¶
func (w *WebRTCReceiver) GetLayerDimension(layer int32) (uint32, uint32)
func (*WebRTCReceiver) GetLayeredBitrate ¶
func (w *WebRTCReceiver) GetLayeredBitrate() Bitrates
func (*WebRTCReceiver) GetPrimaryReceiverForRed ¶
func (w *WebRTCReceiver) GetPrimaryReceiverForRed() TrackReceiver
func (*WebRTCReceiver) GetRedReceiver ¶
func (w *WebRTCReceiver) GetRedReceiver() TrackReceiver
func (*WebRTCReceiver) GetTemporalLayerFpsForSpatial ¶
func (w *WebRTCReceiver) GetTemporalLayerFpsForSpatial(layer int32) []float32
func (*WebRTCReceiver) GetTrackStats ¶
func (w *WebRTCReceiver) GetTrackStats() *livekit.RTPStats
func (*WebRTCReceiver) HeaderExtensions ¶
func (w *WebRTCReceiver) HeaderExtensions() []webrtc.RTPHeaderExtensionParameter
func (*WebRTCReceiver) Kind ¶
func (w *WebRTCReceiver) Kind() webrtc.RTPCodecType
func (*WebRTCReceiver) OnCloseHandler ¶
func (w *WebRTCReceiver) OnCloseHandler(fn func())
OnCloseHandler sets the method to be called when the remote track is removed.
func (*WebRTCReceiver) OnMaxLayerChange ¶
func (w *WebRTCReceiver) OnMaxLayerChange(fn func(maxLayer int32))
func (*WebRTCReceiver) OnStatsUpdate ¶
func (w *WebRTCReceiver) OnStatsUpdate(fn func(w *WebRTCReceiver, stat *livekit.AnalyticsStat))
func (*WebRTCReceiver) SSRC ¶
func (w *WebRTCReceiver) SSRC(layer int) uint32
func (*WebRTCReceiver) SendPLI ¶
func (w *WebRTCReceiver) SendPLI(layer int32, force bool)
func (*WebRTCReceiver) SetMaxExpectedSpatialLayer ¶
func (w *WebRTCReceiver) SetMaxExpectedSpatialLayer(layer int32)
func (*WebRTCReceiver) SetRTCPCh ¶
func (w *WebRTCReceiver) SetRTCPCh(ch chan []rtcp.Packet)
func (*WebRTCReceiver) SetRTT ¶
func (w *WebRTCReceiver) SetRTT(rtt uint32)
func (*WebRTCReceiver) SetUpTrackPaused ¶
func (w *WebRTCReceiver) SetUpTrackPaused(paused bool)
SetUpTrackPaused indicates that upstream will not be sending any data. This reflects the "muted" status and pauses the stream tracker to ensure the layer is not turned off.
func (*WebRTCReceiver) StreamID ¶
func (w *WebRTCReceiver) StreamID() string
func (*WebRTCReceiver) TrackID ¶
func (w *WebRTCReceiver) TrackID() livekit.TrackID
func (*WebRTCReceiver) TrackInfo ¶
func (w *WebRTCReceiver) TrackInfo() *livekit.TrackInfo