Documentation ¶
Overview ¶
Design of Prober
Probing is used to check for existence of excess channel capacity. This is especially useful in the downstream direction of SFU. SFU forwards audio/video streams from one or more publishers to all the subscribers. But, the downstream channel of a subscriber may not be big enough to carry all the streams. It is also a time varying quantity.
When there is not enough capacity, some streams will be paused. To resume a stream, SFU would need to know that the channel has enough capacity. That's where probing comes in. When conditions are favorable, SFU can send probe packets so that the bandwidth estimator has more data to estimate available channel capacity better. NOTE: What defines `favorable conditions` is implementation dependent.
There are two options for probing
- Use padding only RTP packets: This one is preferable as probe rate can be controlled more tightly.
- Resume a paused stream or forward a higher spatial layer: Have to find a stream at probing rate. Also, a stream could get a key frame unexpectedly boosting rate in the probing window.
The strategy used depends on stream allocator implementation. This module can be used if the stream allocator decides to use padding only RTP packets for probing purposes.
Implementation: There are a couple of options
- Check prober in the forwarding path (pull from prober). This is preferred for scalability reasons. But, this suffers from not being able to probe when all streams are paused (could be due to downstream bandwidth constraints, or the corresponding upstream tracks may have paused due to upstream bandwidth constraints). Another issue is not being able to have tight control on the probing window boundary, as the packet forwarding path may not have a packet to forward. But, it should not be a major concern as long as some stream(s) is/are forwarded, as there should be a packet at least every 60 ms or so (forwarding only one stream at 15 fps). Usually, it will be serviced much more frequently when there are multiple streams getting forwarded.
- Run it in a goroutine. But, that would have to wake up very often to prevent bunching up of probe packets. So, there is a scalability concern as there is one prober per subscriber peer connection. But, probe windows should be very short (of the order of 100s of ms). So, this approach might be fine.
The implementation here follows the second approach of using a goroutine.
Pacing: Ideally, the subscriber peer connection should have a pacer which trickles data out at the estimated channel capacity rate (and estimated channel capacity + probing rate when actively probing).
But, there are a few significant challenges
- Pacer will require buffering of forwarded packets. That means more memory, more CPU (have to make copy of packets) and more latency in the media stream.
- Scalability concern as SFU may be handling hundreds of subscriber peer connections and each one processing the pacing loop at 5ms interval will add up.
So, this module assumes that pacing is inherently provided by the publishers for media streams. That is a reasonable assumption given that publishing clients will run their own pacer and pace data out at a steady rate.
A further assumption is that if there are multiple publishers for a subscriber peer connection, all the publishers are not pacing in sync, i.e. each publisher's pacer is completely independent and SFU will be receiving the media packets with a good spread and not clumped together.
Given those assumptions, this module monitors media send rate and adjusts probing packet sends accordingly. Although the probing may have a high enough wake up frequency, it is for short windows. For example, probing at 5 Mbps for 1/2 second and sending 1000 byte probe per iteration will wake up every 1.6 ms. That is very high, but should last for 1/2 second or so.
5 Mbps over 1/2 second = 2.5 Mbits
2.5 Mbits = 312500 bytes = 313 probes at 1000 byte probes
313 probes over 1/2 second = 1.6 ms between probes
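The arithmetic above can be checked with a small sketch. The function name `probeInterval` and its parameters are hypothetical and only illustrate the calculation; they are not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// probeInterval is a hypothetical helper: it returns the gap between probe
// packets needed to sustain probeRateBps (bits per second) when every probe
// is probeSizeBytes long. It returns 0 for non-positive inputs.
func probeInterval(probeRateBps, probeSizeBytes int64) time.Duration {
	if probeRateBps <= 0 || probeSizeBytes <= 0 {
		return 0
	}
	bitsPerProbe := probeSizeBytes * 8
	return time.Duration(bitsPerProbe * int64(time.Second) / probeRateBps)
}

func main() {
	// 5 Mbps with 1000 byte probes, as in the example above.
	fmt.Println(probeInterval(5_000_000, 1000)) // 1.6ms
}
```

Note that the interval depends only on the probe rate and the probe size; the window length (1/2 second here) only determines how many probes are sent in total.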
A few things to note
- When a probe cluster is added, the expected media rate is provided. So, the wake up interval takes that into account. For example, if probing at 5 Mbps for 1/2 second and if 4 Mbps of it is expected to be provided by media traffic, the wake up interval becomes 8 ms.
- The amount of probing should actually be capped at some value to avoid too much self-induced congestion. It may be something like 500 kbps. That will increase the wake up interval to 16 ms in the above example.
- In practice, the probing interval may also be shorter. Typically, it can be run for 2 - 3 RTTs to get a good measurement. For the longest hauls, RTT could be 250 ms or so leading to the probing window being long(ish). But, RTT should be much shorter especially if the subscriber peer connection of the client is able to connect to the nearest data center.
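The effect of the expected media rate and of a probe rate cap on the wake up interval can be sketched as follows. This is an illustration under assumptions, not the package's implementation: `wakeInterval` and all of its parameters are hypothetical names, and a cap of 0 is taken to mean "no cap".

```go
package main

import (
	"fmt"
	"time"
)

// wakeInterval is a hypothetical helper. Only the part of desiredBps that is
// not already covered by expectedMediaBps has to be sent as probes, and that
// remainder is optionally limited to maxProbeBps (0 means no cap).
func wakeInterval(desiredBps, expectedMediaBps, maxProbeBps, probeSizeBytes int64) time.Duration {
	probeBps := desiredBps - expectedMediaBps
	if maxProbeBps > 0 && probeBps > maxProbeBps {
		probeBps = maxProbeBps
	}
	if probeBps <= 0 || probeSizeBytes <= 0 {
		return 0 // nothing to probe
	}
	return time.Duration(probeSizeBytes * 8 * int64(time.Second) / probeBps)
}

func main() {
	// 5 Mbps target, 4 Mbps expected from media, 1000 byte probes.
	fmt.Println(wakeInterval(5_000_000, 4_000_000, 0, 1000)) // 8ms
	// Same, with probing capped at 500 kbps.
	fmt.Println(wakeInterval(5_000_000, 4_000_000, 500_000, 1000)) // 16ms
}
```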
Index ¶
- Constants
- Variables
- func FixedPointToPercent(frac uint8) uint32
- type AddTrackParams
- type Bitrates
- type Cluster
- type DownTrack
- func (d *DownTrack) AddReceiverReportListener(listener ReceiverReportListener)
- func (d *DownTrack) Allocate(availableChannelCapacity int64, allowPause bool) VideoAllocation
- func (d *DownTrack) AllocateNextHigher() (VideoAllocation, bool)
- func (d *DownTrack) BandwidthRequested() int64
- func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, error)
- func (d *DownTrack) Close()
- func (d *DownTrack) Codec() webrtc.RTPCodecCapability
- func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport
- func (d *DownTrack) CreateSourceDescriptionChunks() []rtcp.SourceDescriptionChunk
- func (d *DownTrack) CurrentMaxLossFraction() uint8
- func (d *DownTrack) DebugInfo() map[string]interface{}
- func (d *DownTrack) DistanceToDesired() int32
- func (d *DownTrack) FinalizeAllocate() VideoAllocation
- func (d *DownTrack) GetConnectionScore() float64
- func (d *DownTrack) GetForwardingStatus() ForwardingStatus
- func (d *DownTrack) ID() string
- func (d *DownTrack) IsDeficient() bool
- func (d *DownTrack) Kind() webrtc.RTPCodecType
- func (d *DownTrack) MaxLayers() VideoLayers
- func (d *DownTrack) Mute(val bool)
- func (d *DownTrack) OnAvailableLayersChanged(fn func(dt *DownTrack))
- func (d *DownTrack) OnBind(fn func())
- func (d *DownTrack) OnCloseHandler(fn func())
- func (d *DownTrack) OnPacketSent(fn func(dt *DownTrack, size int))
- func (d *DownTrack) OnPaddingSent(fn func(dt *DownTrack, size int))
- func (d *DownTrack) OnREMB(fn func(dt *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate))
- func (d *DownTrack) OnRTCP(fn func([]rtcp.Packet))
- func (d *DownTrack) OnSubscribedLayersChanged(fn func(dt *DownTrack, layers VideoLayers))
- func (d *DownTrack) OnSubscriptionChanged(fn func(dt *DownTrack))
- func (d *DownTrack) OnTransportCCFeedback(fn func(dt *DownTrack, cc *rtcp.TransportLayerCC))
- func (d *DownTrack) Pause() VideoAllocation
- func (d *DownTrack) PeerID() livekit.ParticipantID
- func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers VideoLayers, allowPause bool) int64
- func (d *DownTrack) ProvisionalAllocateCommit() VideoAllocation
- func (d *DownTrack) ProvisionalAllocateGetBestWeightedTransition() VideoTransition
- func (d *DownTrack) ProvisionalAllocateGetCooperativeTransition() VideoTransition
- func (d *DownTrack) ProvisionalAllocatePrepare()
- func (d *DownTrack) SSRC() uint32
- func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32)
- func (d *DownTrack) SetMaxTemporalLayer(temporalLayer int32)
- func (d *DownTrack) SetRTPHeaderExtensions(rtpHeaderExtensions []webrtc.RTPHeaderExtensionParameter)
- func (d *DownTrack) SetTransceiver(transceiver *webrtc.RTPTransceiver)
- func (d *DownTrack) Stop() error
- func (d *DownTrack) StreamID() string
- func (d *DownTrack) Unbind(_ webrtc.TrackLocalContext) error
- func (d *DownTrack) UpdatePaddingStats(packetLen uint32)
- func (d *DownTrack) UpdatePrimaryStats(packetLen uint32)
- func (d *DownTrack) UpdateRtxStats(packetLen uint32)
- func (d *DownTrack) UptrackLayersChange(availableLayers []uint16)
- func (d *DownTrack) WritePaddingRTP(bytesToSend int) int
- func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error
- type Event
- type Forwarder
- func (f *Forwarder) Allocate(availableChannelCapacity int64, allowPause bool, brs Bitrates) VideoAllocation
- func (f *Forwarder) AllocateNextHigher(brs Bitrates) (VideoAllocation, bool)
- func (f *Forwarder) BandwidthRequested() int64
- func (f *Forwarder) CurrentLayers() VideoLayers
- func (f *Forwarder) DistanceToDesired() int32
- func (f *Forwarder) FinalizeAllocate(brs Bitrates) VideoAllocation
- func (f *Forwarder) GetForwardingStatus() ForwardingStatus
- func (f *Forwarder) GetPaddingVP8(frameEndNeeded bool) *buffer.VP8
- func (f *Forwarder) GetRTPMungerParams() RTPMungerParams
- func (f *Forwarder) GetSnTsForBlankFrames() ([]SnTs, bool, error)
- func (f *Forwarder) GetSnTsForPadding(num int) ([]SnTs, error)
- func (f *Forwarder) GetTranslationParams(extPkt *buffer.ExtPacket, layer int32) (*TranslationParams, error)
- func (f *Forwarder) IsDeficient() bool
- func (f *Forwarder) MaxLayers() VideoLayers
- func (f *Forwarder) Mute(val bool) bool
- func (f *Forwarder) Muted() bool
- func (f *Forwarder) Pause(brs Bitrates) VideoAllocation
- func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layers VideoLayers, allowPause bool) int64
- func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation
- func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransition
- func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition() VideoTransition
- func (f *Forwarder) ProvisionalAllocatePrepare(bitrates Bitrates)
- func (f *Forwarder) SetMaxSpatialLayer(spatialLayer int32) (bool, VideoLayers)
- func (f *Forwarder) SetMaxTemporalLayer(temporalLayer int32) (bool, VideoLayers)
- func (f *Forwarder) TargetLayers() VideoLayers
- func (f *Forwarder) UptrackLayersChange(availableLayers []uint16)
- type ForwardingStatus
- type MaxDistanceSorter
- type MinDistanceSorter
- type PacketStats
- type Prober
- type ProberParams
- type RTPMunger
- func (r *RTPMunger) GetParams() RTPMungerParams
- func (r *RTPMunger) IsOnFrameBoundary() bool
- func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket)
- func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket)
- func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate uint32, forceMarker bool) ([]SnTs, error)
- func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error)
- func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint16, tsAdjust uint32)
- type RTPMungerParams
- type Receiver
- type ReceiverOpts
- type ReceiverReportListener
- type SequenceNumberOrdering
- type Signal
- type SnTs
- type State
- type StreamAllocator
- type StreamAllocatorParams
- type StreamState
- type StreamStateInfo
- type StreamStateUpdate
- type StreamStatus
- type StreamTracker
- type Track
- func (t *Track) Allocate(availableChannelCapacity int64, allowPause bool) VideoAllocation
- func (t *Track) AllocateNextHigher() (VideoAllocation, bool)
- func (t *Track) BandwidthRequested() int64
- func (t *Track) DistanceToDesired() int32
- func (t *Track) DownTrack() *DownTrack
- func (t *Track) FinalizeAllocate()
- func (t *Track) GetPacketStats() (uint32, uint32)
- func (t *Track) ID() livekit.TrackID
- func (t *Track) IsDeficient() bool
- func (t *Track) IsManaged() bool
- func (t *Track) Pause() VideoAllocation
- func (t *Track) PeerID() livekit.ParticipantID
- func (t *Track) ProvisionalAllocate(availableChannelCapacity int64, layers VideoLayers, allowPause bool) int64
- func (t *Track) ProvisionalAllocateCommit() VideoAllocation
- func (t *Track) ProvisionalAllocateGetBestWeightedTransition() VideoTransition
- func (t *Track) ProvisionalAllocateGetCooperativeTransition() VideoTransition
- func (t *Track) ProvisionalAllocatePrepare()
- func (t *Track) UpdateMaxLayers(layers VideoLayers)
- func (t *Track) UpdatePacketStats(rr *rtcp.ReceiverReport)
- func (t *Track) WritePaddingRTP(bytesToSend int) int
- type TrackReceiver
- type TrackSender
- type TrackSorter
- type TranslationParams
- type TranslationParamsRTP
- type TranslationParamsVP8
- type VP8Munger
- func (v *VP8Munger) PictureIdOffset(extPictureId int32) (int32, bool)
- func (v *VP8Munger) SetLast(extPkt *buffer.ExtPacket)
- func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, ordering SequenceNumberOrdering, ...) (*TranslationParamsVP8, error)
- func (v *VP8Munger) UpdateAndGetPadding(newPicture bool) *buffer.VP8
- func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket)
- type VP8MungerParams
- type VP8PictureIdWrapHandler
- type VideoAllocation
- type VideoAllocationProvisional
- type VideoAllocationState
- type VideoLayers
- type VideoStreamingChange
- type VideoTransition
- type WebRTCReceiver
- func (w *WebRTCReceiver) AddDownTrack(track TrackSender)
- func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buffer)
- func (w *WebRTCReceiver) Codec() webrtc.RTPCodecCapability
- func (w *WebRTCReceiver) DebugInfo() map[string]interface{}
- func (w *WebRTCReceiver) DeleteDownTrack(peerID livekit.ParticipantID)
- func (w *WebRTCReceiver) GetBitrateTemporalCumulative() Bitrates
- func (w *WebRTCReceiver) GetSenderReportTime(layer int32) (rtpTS uint32, ntpTS uint64)
- func (w *WebRTCReceiver) Kind() webrtc.RTPCodecType
- func (w *WebRTCReceiver) NumAvailableSpatialLayers() int
- func (w *WebRTCReceiver) OnCloseHandler(fn func())
- func (w *WebRTCReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error)
- func (w *WebRTCReceiver) SSRC(layer int) uint32
- func (w *WebRTCReceiver) SendPLI(layer int32)
- func (w *WebRTCReceiver) SendRTCP(p []rtcp.Packet)
- func (w *WebRTCReceiver) SetMaxExpectedSpatialLayer(layer int32)
- func (w *WebRTCReceiver) SetRTCPCh(ch chan []rtcp.Packet)
- func (w *WebRTCReceiver) SetTrackMeta(trackID livekit.TrackID, streamID string)
- func (w *WebRTCReceiver) SetUpTrackPaused(paused bool)
- func (w *WebRTCReceiver) StreamID() string
- func (w *WebRTCReceiver) TrackID() livekit.TrackID
Constants ¶
const (
RTPPaddingMaxPayloadSize = 255
RTPPaddingEstimatedHeaderSize = 20
RTPBlankFramesMax = 6
)
const (
InvalidLayerSpatial = int32(-1)
InvalidLayerTemporal = int32(-1)
DefaultMaxLayerSpatial = int32(2)
DefaultMaxLayerTemporal = int32(3)
)
const (
QuarterResolution = "q"
HalfResolution = "h"
FullResolution = "f"
)
const (
ChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps
EstimateEpsilon = 2000 // 2 kbps
GratuitousProbeHeadroomBps = 1 * 1000 * 1000 // if headroom > 1 Mbps, don't probe
GratuitousProbePct = 10
GratuitousProbeMinBps = 100 * 1000 // 100 kbps
GratuitousProbeMaxBps = 300 * 1000 // 300 kbps
GratuitousProbeMinDuration = 500 * time.Millisecond
GratuitousProbeMaxDuration = 600 * time.Millisecond
AudioLossWeight = 0.75
VideoLossWeight = 0.25
// LK-TODO-START
// These constants will definitely require more tweaking.
// In fact, simple time threshold rules most probably will not be enough.
// LK-TODO-END
EstimateCommit = 2 * 1000 * time.Millisecond // 2 seconds
ProbeWait = 8 * 1000 * time.Millisecond // 8 seconds
BoostWait = 5 * 1000 * time.Millisecond // 5 seconds
GratuitousProbeWait = 8 * 1000 * time.Millisecond // 8 seconds
GratuitousProbeMoreWait = 5 * 1000 * time.Millisecond // 5 seconds
)
const (
FlagPauseOnDowngrade = false
)
Forwarder
const (
TransitionCostSpatial = 10
)
Variables ¶
var (
ErrUnknownKind = errors.New("unknown kind of codec")
ErrOutOfOrderSequenceNumberCacheMiss = errors.New("out-of-order sequence number not found in cache")
ErrPaddingOnlyPacket = errors.New("padding only packet that need not be forwarded")
ErrDuplicatePacket = errors.New("duplicate packet")
ErrPaddingNotOnFrameBoundary = errors.New("padding cannot send on non-frame boundary")
ErrNotVP8 = errors.New("not VP8")
ErrOutOfOrderVP8PictureIdCacheMiss = errors.New("out-of-order VP8 picture id not found in cache")
ErrFilteredVP8TemporalLayer = errors.New("filtered VP8 temporal layer")
)
var (
VP8KeyFrame1x1 = []byte{0x10, 0x02, 0x00, 0x9d, 0x01, 0x2a, 0x01, 0x00, 0x01, 0x00, 0x0b, 0xc7, 0x08, 0x85, 0x85, 0x88, 0x85, 0x84, 0x88, 0x3f, 0x82, 0x00, 0x0c, 0x0d, 0x60, 0x00, 0xfe, 0xe6, 0xb5, 0x00}
H264KeyFrame2x2SPS = []byte{0x67, 0x42, 0xc0, 0x1f, 0x0f, 0xd9, 0x1f, 0x88, 0x88, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, 0x00, 0x00, 0x03, 0x00, 0xc8, 0x3c, 0x60, 0xc9, 0x20}
H264KeyFrame2x2PPS = []byte{0x68, 0x87, 0xcb, 0x83, 0xcb, 0x20}
H264KeyFrame2x2IDR = []byte{0x65, 0x88, 0x84, 0x0a, 0xf2, 0x62, 0x80, 0x00, 0xa7, 0xbe}
H264KeyFrame2x2 = [][]byte{H264KeyFrame2x2SPS, H264KeyFrame2x2PPS, H264KeyFrame2x2IDR}
)
var (
InvalidLayers = VideoLayers{
// contains filtered or unexported fields
}
DefaultMaxLayers = VideoLayers{
// contains filtered or unexported fields
}
)
var Logger = logr.Discard()
Logger is an implementation of logr.Logger. If it is not provided, logging is turned off.
var (
PacketFactory *sync.Pool
)
var (
VideoAllocationDefault = VideoAllocation{
// contains filtered or unexported fields
}
)
Functions ¶
func FixedPointToPercent ¶ added in v0.15.0
FixedPointToPercent converts a fixed point number to the number part of a percentage.
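As an illustration of this kind of conversion, the sketch below assumes the input is an 8-bit fraction with the binary point at the left edge (that is, value/256), which is how RTCP receiver reports encode the fraction lost. The lowercase `fixedPointToPercent` is a hypothetical stand-in; the package's actual rounding behavior may differ.

```go
package main

import "fmt"

// fixedPointToPercent is a hypothetical stand-in that treats frac as
// frac/256 and returns the integer percentage, truncating any remainder.
func fixedPointToPercent(frac uint8) uint32 {
	return (uint32(frac) * 100) >> 8
}

func main() {
	fmt.Println(fixedPointToPercent(64))  // 25
	fmt.Println(fixedPointToPercent(128)) // 50
	fmt.Println(fixedPointToPercent(255)) // 99
}
```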
Types ¶
type AddTrackParams ¶ added in v0.15.2
type AddTrackParams struct {
Source livekit.TrackSource
IsSimulcast bool
}
type Bitrates ¶ added in v0.15.0
type Bitrates [DefaultMaxLayerSpatial + 1][DefaultMaxLayerTemporal + 1]int64
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
func NewCluster ¶
func (*Cluster) GetSleepDuration ¶
func (*Cluster) IsFinished ¶ added in v0.15.0
func (*Cluster) PacketSent ¶
type DownTrack ¶
type DownTrack struct {
// contains filtered or unexported fields
}
DownTrack implements TrackLocal. It is the track used to write packets to an SFU subscriber, and it handles packets for simple, simulcast and SVC publishers.
func NewDownTrack ¶
func NewDownTrack(c webrtc.RTPCodecCapability, r TrackReceiver, bf *buffer.Factory, peerID livekit.ParticipantID, mt int) (*DownTrack, error)
NewDownTrack returns a DownTrack.
func (*DownTrack) AddReceiverReportListener ¶
func (d *DownTrack) AddReceiverReportListener(listener ReceiverReportListener)
func (*DownTrack) Allocate ¶ added in v0.15.0
func (d *DownTrack) Allocate(availableChannelCapacity int64, allowPause bool) VideoAllocation
func (*DownTrack) AllocateNextHigher ¶ added in v0.15.0
func (d *DownTrack) AllocateNextHigher() (VideoAllocation, bool)
func (*DownTrack) BandwidthRequested ¶ added in v0.15.0
func (*DownTrack) Bind ¶
Bind is called by the PeerConnection after negotiation is complete. This asserts that the codec requested is supported by the remote peer. If so, it sets up all the state (SSRC and PayloadType) to have a call.
func (*DownTrack) Codec ¶
func (d *DownTrack) Codec() webrtc.RTPCodecCapability
Codec returns current track codec capability
func (*DownTrack) CreateSenderReport ¶
func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport
func (*DownTrack) CreateSourceDescriptionChunks ¶
func (d *DownTrack) CreateSourceDescriptionChunks() []rtcp.SourceDescriptionChunk
func (*DownTrack) CurrentMaxLossFraction ¶
func (*DownTrack) DistanceToDesired ¶ added in v0.15.0
func (*DownTrack) FinalizeAllocate ¶ added in v0.15.0
func (d *DownTrack) FinalizeAllocate() VideoAllocation
func (*DownTrack) GetConnectionScore ¶ added in v0.15.0
func (*DownTrack) GetForwardingStatus ¶ added in v0.15.0
func (d *DownTrack) GetForwardingStatus() ForwardingStatus
func (*DownTrack) ID ¶
ID is the unique identifier for this Track. This should be unique for the stream, but doesn't have to be globally unique. A common example would be 'audio' or 'video', and StreamID would be 'desktop' or 'webcam'.
func (*DownTrack) IsDeficient ¶ added in v0.15.0
func (*DownTrack) Kind ¶
func (d *DownTrack) Kind() webrtc.RTPCodecType
Kind controls if this TrackLocal is audio or video
func (*DownTrack) MaxLayers ¶ added in v0.15.0
func (d *DownTrack) MaxLayers() VideoLayers
func (*DownTrack) OnAvailableLayersChanged ¶
func (*DownTrack) OnCloseHandler ¶
func (d *DownTrack) OnCloseHandler(fn func())
OnCloseHandler sets the method to be called when the remote track is removed.
func (*DownTrack) OnPacketSent ¶
func (*DownTrack) OnPaddingSent ¶ added in v0.15.0
func (*DownTrack) OnREMB ¶
func (d *DownTrack) OnREMB(fn func(dt *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate))
func (*DownTrack) OnSubscribedLayersChanged ¶ added in v0.15.0
func (d *DownTrack) OnSubscribedLayersChanged(fn func(dt *DownTrack, layers VideoLayers))
func (*DownTrack) OnSubscriptionChanged ¶
func (*DownTrack) OnTransportCCFeedback ¶ added in v0.15.2
func (d *DownTrack) OnTransportCCFeedback(fn func(dt *DownTrack, cc *rtcp.TransportLayerCC))
func (*DownTrack) Pause ¶ added in v0.15.0
func (d *DownTrack) Pause() VideoAllocation
func (*DownTrack) PeerID ¶ added in v0.15.0
func (d *DownTrack) PeerID() livekit.ParticipantID
func (*DownTrack) ProvisionalAllocate ¶ added in v0.15.0
func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers VideoLayers, allowPause bool) int64
func (*DownTrack) ProvisionalAllocateCommit ¶ added in v0.15.0
func (d *DownTrack) ProvisionalAllocateCommit() VideoAllocation
func (*DownTrack) ProvisionalAllocateGetBestWeightedTransition ¶ added in v0.15.0
func (d *DownTrack) ProvisionalAllocateGetBestWeightedTransition() VideoTransition
func (*DownTrack) ProvisionalAllocateGetCooperativeTransition ¶ added in v0.15.0
func (d *DownTrack) ProvisionalAllocateGetCooperativeTransition() VideoTransition
func (*DownTrack) ProvisionalAllocatePrepare ¶ added in v0.15.0
func (d *DownTrack) ProvisionalAllocatePrepare()
func (*DownTrack) SetMaxSpatialLayer ¶ added in v0.15.0
func (*DownTrack) SetMaxTemporalLayer ¶ added in v0.15.0
func (*DownTrack) SetRTPHeaderExtensions ¶
func (d *DownTrack) SetRTPHeaderExtensions(rtpHeaderExtensions []webrtc.RTPHeaderExtensionParameter)
Sets RTP header extensions for this track
func (*DownTrack) SetTransceiver ¶
func (d *DownTrack) SetTransceiver(transceiver *webrtc.RTPTransceiver)
func (*DownTrack) Unbind ¶
Unbind implements the teardown logic when the track is no longer needed. This happens because a track has been stopped.
func (*DownTrack) UpdatePaddingStats ¶ added in v0.15.0
func (*DownTrack) UpdatePrimaryStats ¶ added in v0.15.0
func (*DownTrack) UpdateRtxStats ¶ added in v0.15.0
func (*DownTrack) UptrackLayersChange ¶
func (*DownTrack) WritePaddingRTP ¶
WritePaddingRTP tries to write as many padding only RTP packets as necessary to satisfy given size to the DownTrack
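A rough way to see how many padding-only packets a byte budget translates into is sketched below. It reuses the values of `RTPPaddingMaxPayloadSize` (255, the largest padding length an RTP packet can carry because the final padding octet holds the count) and `RTPPaddingEstimatedHeaderSize` (20) from the constants section. The helper `numPaddingPackets` is hypothetical, and it assumes every packet carries the maximum padding payload; the actual method may size packets differently.

```go
package main

import "fmt"

const (
	rtpPaddingMaxPayloadSize      = 255 // mirrors RTPPaddingMaxPayloadSize
	rtpPaddingEstimatedHeaderSize = 20  // mirrors RTPPaddingEstimatedHeaderSize
)

// numPaddingPackets is a hypothetical helper: it returns how many full-size
// padding-only packets are needed to cover at least bytesToSend bytes,
// counting the estimated header as part of each packet (ceiling division).
func numPaddingPackets(bytesToSend int) int {
	if bytesToSend <= 0 {
		return 0
	}
	perPacket := rtpPaddingMaxPayloadSize + rtpPaddingEstimatedHeaderSize
	return (bytesToSend + perPacket - 1) / perPacket
}

func main() {
	fmt.Println(numPaddingPackets(1000)) // 4
}
```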
type Forwarder ¶ added in v0.15.0
type Forwarder struct {
// contains filtered or unexported fields
}
func NewForwarder ¶ added in v0.15.0
func NewForwarder(codec webrtc.RTPCodecCapability, kind webrtc.RTPCodecType) *Forwarder
func (*Forwarder) Allocate ¶ added in v0.15.0
func (f *Forwarder) Allocate(availableChannelCapacity int64, allowPause bool, brs Bitrates) VideoAllocation
func (*Forwarder) AllocateNextHigher ¶ added in v0.15.0
func (f *Forwarder) AllocateNextHigher(brs Bitrates) (VideoAllocation, bool)
func (*Forwarder) BandwidthRequested ¶ added in v0.15.0
func (*Forwarder) CurrentLayers ¶ added in v0.15.0
func (f *Forwarder) CurrentLayers() VideoLayers
func (*Forwarder) DistanceToDesired ¶ added in v0.15.0
func (*Forwarder) FinalizeAllocate ¶ added in v0.15.0
func (f *Forwarder) FinalizeAllocate(brs Bitrates) VideoAllocation
func (*Forwarder) GetForwardingStatus ¶ added in v0.15.0
func (f *Forwarder) GetForwardingStatus() ForwardingStatus
func (*Forwarder) GetPaddingVP8 ¶ added in v0.15.0
func (*Forwarder) GetRTPMungerParams ¶ added in v0.15.0
func (f *Forwarder) GetRTPMungerParams() RTPMungerParams
func (*Forwarder) GetSnTsForBlankFrames ¶ added in v0.15.0
func (*Forwarder) GetSnTsForPadding ¶ added in v0.15.0
func (*Forwarder) GetTranslationParams ¶ added in v0.15.0
func (*Forwarder) IsDeficient ¶ added in v0.15.0
func (*Forwarder) MaxLayers ¶ added in v0.15.0
func (f *Forwarder) MaxLayers() VideoLayers
func (*Forwarder) Pause ¶ added in v0.15.0
func (f *Forwarder) Pause(brs Bitrates) VideoAllocation
func (*Forwarder) ProvisionalAllocate ¶ added in v0.15.0
func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layers VideoLayers, allowPause bool) int64
func (*Forwarder) ProvisionalAllocateCommit ¶ added in v0.15.0
func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation
func (*Forwarder) ProvisionalAllocateGetBestWeightedTransition ¶ added in v0.15.0
func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransition
func (*Forwarder) ProvisionalAllocateGetCooperativeTransition ¶ added in v0.15.0
func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition() VideoTransition
func (*Forwarder) ProvisionalAllocatePrepare ¶ added in v0.15.0
func (*Forwarder) SetMaxSpatialLayer ¶ added in v0.15.0
func (f *Forwarder) SetMaxSpatialLayer(spatialLayer int32) (bool, VideoLayers)
func (*Forwarder) SetMaxTemporalLayer ¶ added in v0.15.0
func (f *Forwarder) SetMaxTemporalLayer(temporalLayer int32) (bool, VideoLayers)
func (*Forwarder) TargetLayers ¶ added in v0.15.0
func (f *Forwarder) TargetLayers() VideoLayers
func (*Forwarder) UptrackLayersChange ¶ added in v0.15.0
type ForwardingStatus ¶ added in v0.15.0
type ForwardingStatus int
const (
ForwardingStatusOff ForwardingStatus = iota
ForwardingStatusPartial
ForwardingStatusOptimal
)
type MaxDistanceSorter ¶ added in v0.15.0
type MaxDistanceSorter []*Track
func (MaxDistanceSorter) Len ¶ added in v0.15.0
func (m MaxDistanceSorter) Len() int
func (MaxDistanceSorter) Less ¶ added in v0.15.0
func (m MaxDistanceSorter) Less(i, j int) bool
func (MaxDistanceSorter) Swap ¶ added in v0.15.0
func (m MaxDistanceSorter) Swap(i, j int)
type MinDistanceSorter ¶ added in v0.15.0
type MinDistanceSorter []*Track
func (MinDistanceSorter) Len ¶ added in v0.15.0
func (m MinDistanceSorter) Len() int
func (MinDistanceSorter) Less ¶ added in v0.15.0
func (m MinDistanceSorter) Less(i, j int) bool
func (MinDistanceSorter) Swap ¶ added in v0.15.0
func (m MinDistanceSorter) Swap(i, j int)
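The `Len`, `Less` and `Swap` methods on both sorter types match the standard `sort.Interface`, so a slice can be passed directly to `sort.Sort`. The sketch below shows the pattern with a hypothetical `track` type and an assumed ordering (largest distance first); the real `Less` implementations are not shown in this documentation and may order differently.

```go
package main

import (
	"fmt"
	"sort"
)

// track is a hypothetical stand-in for *Track with a precomputed distance.
type track struct {
	id       string
	distance int32
}

// maxDistanceSorter orders tracks by distance, largest first (an assumption).
type maxDistanceSorter []*track

func (m maxDistanceSorter) Len() int           { return len(m) }
func (m maxDistanceSorter) Swap(i, j int)      { m[i], m[j] = m[j], m[i] }
func (m maxDistanceSorter) Less(i, j int) bool { return m[i].distance > m[j].distance }

// sortedIDs sorts the tracks in place and returns their ids in order.
func sortedIDs(tracks []*track) []string {
	sort.Sort(maxDistanceSorter(tracks))
	ids := make([]string, 0, len(tracks))
	for _, t := range tracks {
		ids = append(ids, t.id)
	}
	return ids
}

func main() {
	tracks := []*track{{"a", 1}, {"b", 3}, {"c", 0}}
	fmt.Println(sortedIDs(tracks)) // [b a c]
}
```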
type PacketStats ¶ added in v0.15.0
type PacketStats struct {
// contains filtered or unexported fields
}
type Prober ¶
type Prober struct {
// contains filtered or unexported fields
}
func NewProber ¶
func NewProber(params ProberParams) *Prober
func (*Prober) AddCluster ¶
func (*Prober) OnSendProbe ¶
func (*Prober) PacketSent ¶
type ProberParams ¶ added in v0.15.0
type RTPMunger ¶ added in v0.15.0
type RTPMunger struct {
RTPMungerParams
}
func NewRTPMunger ¶ added in v0.15.0
func NewRTPMunger() *RTPMunger
func (*RTPMunger) GetParams ¶ added in v0.15.0
func (r *RTPMunger) GetParams() RTPMungerParams
func (*RTPMunger) IsOnFrameBoundary ¶ added in v0.15.0
func (*RTPMunger) PacketDropped ¶ added in v0.15.0
func (*RTPMunger) SetLastSnTs ¶ added in v0.15.0
func (*RTPMunger) UpdateAndGetPaddingSnTs ¶ added in v0.15.0
func (*RTPMunger) UpdateAndGetSnTs ¶ added in v0.15.0
func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error)
type RTPMungerParams ¶ added in v0.15.0
type RTPMungerParams struct {
// contains filtered or unexported fields
}
type Receiver ¶
type Receiver interface {
TrackID() livekit.TrackID
StreamID() string
Codec() webrtc.RTPCodecCapability
AddUpTrack(track *webrtc.TrackRemote, buffer *buffer.Buffer)
AddDownTrack(track TrackSender)
SetUpTrackPaused(paused bool)
SetMaxExpectedSpatialLayer(layer int32)
NumAvailableSpatialLayers() int
GetBitrateTemporalCumulative() Bitrates
ReadRTP(buf []byte, layer uint8, sn uint16) (int, error)
DeleteDownTrack(peerID livekit.ParticipantID)
OnCloseHandler(fn func())
SendPLI(layer int32)
SetRTCPCh(ch chan []rtcp.Packet)
GetSenderReportTime(layer int32) (rtpTS uint32, ntpTS uint64)
DebugInfo() map[string]interface{}
}
Receiver defines an interface for track receivers.
func NewWebRTCReceiver ¶
func NewWebRTCReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRemote, pid livekit.ParticipantID, opts ...ReceiverOpts) Receiver
NewWebRTCReceiver creates a new webrtc track receiver.
type ReceiverOpts ¶
type ReceiverOpts func(w *WebRTCReceiver) *WebRTCReceiver
func WithLoadBalanceThreshold ¶
func WithLoadBalanceThreshold(downTracks int) ReceiverOpts
WithLoadBalanceThreshold enables parallelization of packet writes when downTracks exceeds threshold. Value should be between 3 and 150. For a server handling a few large rooms, use a smaller value (required to handle very large (250+ participant) rooms). For a server handling many small rooms, use a larger value or disable. Set to 0 (disabled) by default.
func WithPliThrottle ¶
func WithPliThrottle(period int64) ReceiverOpts
WithPliThrottle indicates minimum time(ms) between sending PLIs
func WithStreamTrackers ¶
func WithStreamTrackers() ReceiverOpts
WithStreamTrackers enables StreamTracker use for simulcast
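The `ReceiverOpts` type and the `With...` constructors follow the functional options pattern: each option is a function that takes the receiver, modifies it, and returns it. The sketch below reproduces the pattern with hypothetical local types (`receiver`, `receiverOpt` and the lowercase option names); the field names are assumptions and do not reflect the real `WebRTCReceiver` internals.

```go
package main

import "fmt"

// receiver is a hypothetical stand-in for WebRTCReceiver.
type receiver struct {
	pliThrottleMs   int64
	lbThreshold     int
	useStreamTracks bool
}

// receiverOpt mirrors the shape of ReceiverOpts.
type receiverOpt func(r *receiver) *receiver

func withPliThrottle(period int64) receiverOpt {
	return func(r *receiver) *receiver {
		r.pliThrottleMs = period
		return r
	}
}

func withLoadBalanceThreshold(downTracks int) receiverOpt {
	return func(r *receiver) *receiver {
		r.lbThreshold = downTracks
		return r
	}
}

func withStreamTrackers() receiverOpt {
	return func(r *receiver) *receiver {
		r.useStreamTracks = true
		return r
	}
}

// newReceiver applies the options in order on top of zero-value defaults.
func newReceiver(opts ...receiverOpt) *receiver {
	r := &receiver{}
	for _, opt := range opts {
		r = opt(r)
	}
	return r
}

func main() {
	r := newReceiver(withPliThrottle(500), withStreamTrackers())
	fmt.Println(r.pliThrottleMs, r.lbThreshold, r.useStreamTracks) // 500 0 true
}
```

Options that are not passed leave the corresponding defaults untouched, which is why the load balance threshold stays at 0 (disabled) in the usage above.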
type ReceiverReportListener ¶
type ReceiverReportListener func(dt *DownTrack, report *rtcp.ReceiverReport)
type SequenceNumberOrdering ¶
type SequenceNumberOrdering int
RTPMunger
const (
SequenceNumberOrderingContiguous SequenceNumberOrdering = iota
SequenceNumberOrderingOutOfOrder
SequenceNumberOrderingGap
SequenceNumberOrderingDuplicate
)
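One way to arrive at these four categories is to compare an incoming 16-bit RTP sequence number against the highest one seen so far using wrapping arithmetic. The sketch below is a simplified illustration with hypothetical names (`ordering`, `classify`); in particular, it only treats a repeat of the highest sequence number as a duplicate, whereas a real implementation would likely consult a cache of earlier packets.

```go
package main

import "fmt"

type ordering int

const (
	contiguous ordering = iota
	outOfOrder
	gap
	duplicate
)

// classify is a hypothetical helper: it compares sn with the highest
// sequence number seen so far. The uint16 subtraction wraps, so the
// comparison stays correct across the 65535 -> 0 rollover.
func classify(highest, sn uint16) ordering {
	diff := sn - highest
	switch {
	case diff == 0:
		return duplicate
	case diff == 1:
		return contiguous
	case diff < 1<<15:
		return gap // sn is ahead by more than one: packets are missing
	default:
		return outOfOrder // sn is behind highest
	}
}

func main() {
	fmt.Println(classify(65535, 0) == contiguous) // true
	fmt.Println(classify(10, 12) == gap)          // true
	fmt.Println(classify(10, 8) == outOfOrder)    // true
}
```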
type StreamAllocator ¶
type StreamAllocator struct {
// contains filtered or unexported fields
}
func NewStreamAllocator ¶
func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator
func (*StreamAllocator) AddTrack ¶
func (s *StreamAllocator) AddTrack(downTrack *DownTrack, params AddTrackParams)
func (*StreamAllocator) OnStreamStateChange ¶ added in v0.15.0
func (s *StreamAllocator) OnStreamStateChange(f func(update *StreamStateUpdate) error)
func (*StreamAllocator) RemoveTrack ¶
func (s *StreamAllocator) RemoveTrack(downTrack *DownTrack)
func (*StreamAllocator) Start ¶
func (s *StreamAllocator) Start()
func (*StreamAllocator) Stop ¶
func (s *StreamAllocator) Stop()
type StreamAllocatorParams ¶ added in v0.15.0
type StreamAllocatorParams struct {
Config config.CongestionControlConfig
Logger logger.Logger
}
type StreamState ¶ added in v0.15.0
type StreamState int
const (
StreamStateActive StreamState = iota
StreamStatePaused
)
type StreamStateInfo ¶ added in v0.15.0
type StreamStateInfo struct {
ParticipantID livekit.ParticipantID
TrackID livekit.TrackID
State StreamState
}
type StreamStateUpdate ¶ added in v0.15.0
type StreamStateUpdate struct {
StreamStates []*StreamStateInfo
}
func NewStreamStateUpdate ¶ added in v0.15.0
func NewStreamStateUpdate() *StreamStateUpdate
func (*StreamStateUpdate) Empty ¶ added in v0.15.0
func (s *StreamStateUpdate) Empty() bool
func (*StreamStateUpdate) HandleStreamingChange ¶ added in v0.15.0
func (s *StreamStateUpdate) HandleStreamingChange(change VideoStreamingChange, track *Track)
type StreamStatus ¶
type StreamStatus int32
const (
StreamStatusStopped StreamStatus = 0
StreamStatusActive StreamStatus = 1
)
func (StreamStatus) String ¶
func (s StreamStatus) String() string
type StreamTracker ¶
type StreamTracker struct {
// contains filtered or unexported fields
}
StreamTracker keeps track of packet flow and ensures a particular uptrack is consistently producing. It runs its own goroutine for detection, and fires the OnStatusChanged callback.
func NewStreamTracker ¶
func NewStreamTracker(samplesRequired uint32, cyclesRequired uint64, cycleDuration time.Duration) *StreamTracker
func (*StreamTracker) Observe ¶
func (s *StreamTracker) Observe(sn uint16)
Observe a packet that's received
func (*StreamTracker) OnStatusChanged ¶
func (s *StreamTracker) OnStatusChanged(f func(status StreamStatus))
func (*StreamTracker) Reset ¶ added in v0.15.2
func (s *StreamTracker) Reset()
func (*StreamTracker) SetPaused ¶
func (s *StreamTracker) SetPaused(paused bool)
func (*StreamTracker) Start ¶
func (s *StreamTracker) Start()
func (*StreamTracker) Status ¶
func (s *StreamTracker) Status() StreamStatus
func (*StreamTracker) Stop ¶
func (s *StreamTracker) Stop()
type Track ¶
type Track struct {
// contains filtered or unexported fields
}
func (*Track) Allocate ¶ added in v0.15.0
func (t *Track) Allocate(availableChannelCapacity int64, allowPause bool) VideoAllocation
func (*Track) AllocateNextHigher ¶ added in v0.15.0
func (t *Track) AllocateNextHigher() (VideoAllocation, bool)
func (*Track) BandwidthRequested ¶
func (*Track) DistanceToDesired ¶ added in v0.15.0
func (*Track) FinalizeAllocate ¶ added in v0.15.0
func (t *Track) FinalizeAllocate()
func (*Track) GetPacketStats ¶
func (*Track) IsDeficient ¶ added in v0.15.0
func (*Track) Pause ¶ added in v0.15.0
func (t *Track) Pause() VideoAllocation
func (*Track) PeerID ¶ added in v0.15.0
func (t *Track) PeerID() livekit.ParticipantID
func (*Track) ProvisionalAllocate ¶ added in v0.15.0
func (t *Track) ProvisionalAllocate(availableChannelCapacity int64, layers VideoLayers, allowPause bool) int64
func (*Track) ProvisionalAllocateCommit ¶ added in v0.15.0
func (t *Track) ProvisionalAllocateCommit() VideoAllocation
func (*Track) ProvisionalAllocateGetBestWeightedTransition ¶ added in v0.15.0
func (t *Track) ProvisionalAllocateGetBestWeightedTransition() VideoTransition
func (*Track) ProvisionalAllocateGetCooperativeTransition ¶ added in v0.15.0
func (t *Track) ProvisionalAllocateGetCooperativeTransition() VideoTransition
func (*Track) ProvisionalAllocatePrepare ¶ added in v0.15.0
func (t *Track) ProvisionalAllocatePrepare()
func (*Track) UpdateMaxLayers ¶ added in v0.15.0
func (t *Track) UpdateMaxLayers(layers VideoLayers)
func (*Track) UpdatePacketStats ¶
func (t *Track) UpdatePacketStats(rr *rtcp.ReceiverReport)
LK-TODO this should probably be maintained in downTrack and this module can query what it needs
func (*Track) WritePaddingRTP ¶
type TrackReceiver ¶ added in v0.15.0
type TrackReceiver interface {
	TrackID() livekit.TrackID
	StreamID() string
	GetBitrateTemporalCumulative() Bitrates
	ReadRTP(buf []byte, layer uint8, sn uint16) (int, error)
	AddDownTrack(track TrackSender)
	DeleteDownTrack(peerID livekit.ParticipantID)
	SendPLI(layer int32)
	GetSenderReportTime(layer int32) (rtpTS uint32, ntpTS uint64)
	Codec() webrtc.RTPCodecCapability
}
TrackReceiver defines an interface for receiving media from a remote peer.
type TrackSender ¶ added in v0.15.0
type TrackSender interface {
	UptrackLayersChange(availableLayers []uint16)
	WriteRTP(p *buffer.ExtPacket, layer int32) error
	Close()
	// ID is the globally unique identifier for this Track.
	ID() string
	Codec() webrtc.RTPCodecCapability
	PeerID() livekit.ParticipantID
}
TrackSender defines an interface for sending media to a remote peer.
type TrackSorter ¶ added in v0.15.0
type TrackSorter []*Track
func (TrackSorter) Len ¶ added in v0.15.0
func (t TrackSorter) Len() int
func (TrackSorter) Less ¶ added in v0.15.0
func (t TrackSorter) Less(i, j int) bool
func (TrackSorter) Swap ¶ added in v0.15.0
func (t TrackSorter) Swap(i, j int)
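Len, Less and Swap together satisfy the standard library's sort.Interface, so a TrackSorter can be passed directly to sort.Sort. The ordering that the real Less applies is not shown in this listing. The sketch below uses a hypothetical track type with a hypothetical priority field, sorted highest first, purely to illustrate the pattern.

```go
package main

import (
	"fmt"
	"sort"
)

// track is a hypothetical stand-in for Track, whose fields are unexported.
type track struct {
	id       string
	priority uint8 // hypothetical ordering key, not taken from the package
}

// trackSorter mirrors the shape of TrackSorter: a slice of track pointers.
type trackSorter []*track

func (t trackSorter) Len() int      { return len(t) }
func (t trackSorter) Swap(i, j int) { t[i], t[j] = t[j], t[i] }

// Less orders higher priority first (an assumption made for this example).
func (t trackSorter) Less(i, j int) bool { return t[i].priority > t[j].priority }

// sortedIDs sorts the slice in place and returns the IDs in sorted order.
func sortedIDs(tracks []*track) []string {
	sort.Sort(trackSorter(tracks))
	ids := make([]string, len(tracks))
	for i, t := range tracks {
		ids[i] = t.id
	}
	return ids
}

func main() {
	tracks := []*track{
		{id: "screen", priority: 3},
		{id: "camera-b", priority: 1},
		{id: "camera-a", priority: 2},
	}
	fmt.Println(sortedIDs(tracks))
}
```

Converting the slice with trackSorter(tracks) does not copy it, so the caller's slice is reordered in place.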
type TranslationParams ¶ added in v0.15.0
type TranslationParams struct {
// contains filtered or unexported fields
}
type TranslationParamsRTP ¶ added in v0.15.0
type TranslationParamsRTP struct {
// contains filtered or unexported fields
}
type TranslationParamsVP8 ¶ added in v0.15.0
type TranslationParamsVP8 struct {
// contains filtered or unexported fields
}
VP8 munger
type VP8Munger ¶
type VP8Munger struct {
VP8MungerParams
}
func NewVP8Munger ¶
func NewVP8Munger() *VP8Munger
func (*VP8Munger) PictureIdOffset ¶ added in v0.15.0
for testing only
func (*VP8Munger) UpdateAndGet ¶
func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, ordering SequenceNumberOrdering, maxTemporalLayer int32) (*TranslationParamsVP8, error)
func (*VP8Munger) UpdateAndGetPadding ¶
func (*VP8Munger) UpdateOffsets ¶
type VP8MungerParams ¶
type VP8MungerParams struct {
// contains filtered or unexported fields
}
type VP8PictureIdWrapHandler ¶
type VP8PictureIdWrapHandler struct {
// contains filtered or unexported fields
}
func (*VP8PictureIdWrapHandler) Init ¶
func (v *VP8PictureIdWrapHandler) Init(extPictureId int32, mBit bool)
func (*VP8PictureIdWrapHandler) MaxPictureId ¶
func (v *VP8PictureIdWrapHandler) MaxPictureId() int32
func (*VP8PictureIdWrapHandler) Unwrap ¶
func (v *VP8PictureIdWrapHandler) Unwrap(pictureId uint16, mBit bool) int32
Unwrap unwraps the picture ID and updates maxPictureId. It returns the unwrapped value.
func (*VP8PictureIdWrapHandler) UpdateMaxPictureId ¶
func (v *VP8PictureIdWrapHandler) UpdateMaxPictureId(extPictureId int32, mBit bool)
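A VP8 picture ID is carried in either 7 or 15 bits, selected by the M bit, so it wraps around quickly and has to be extended into a wider counter before offsets can be applied. The sketch below shows one common way to do that and is not the package's actual code: place the incoming value in the same wrap cycle as the highest extended ID seen so far, then shift it by one cycle if that lands more than half a cycle away. The pictureIDUnwrapper type and its fields are hypothetical, and the sketch assumes the M bit stays the same for the life of the stream.

```go
package main

import "fmt"

// pictureIDUnwrapper is a hypothetical, simplified unwrapper.
type pictureIDUnwrapper struct {
	maxPictureID int32 // highest extended picture ID seen so far
	initialized  bool
}

// unwrap extends a 7-bit (mBit false) or 15-bit (mBit true) picture ID
// into a 32-bit value and updates the running maximum.
func (u *pictureIDUnwrapper) unwrap(pictureID uint16, mBit bool) int32 {
	mod := int32(1 << 7)
	if mBit {
		mod = 1 << 15
	}
	if !u.initialized {
		u.initialized = true
		u.maxPictureID = int32(pictureID)
		return u.maxPictureID
	}

	// Start of the wrap cycle that contains the current maximum.
	base := u.maxPictureID - u.maxPictureID%mod
	candidate := base + int32(pictureID)

	// Pick the cycle that puts the candidate closest to the maximum.
	diff := candidate - u.maxPictureID
	if diff > mod/2 {
		candidate -= mod // late packet from the previous cycle
	} else if diff < -mod/2 {
		candidate += mod // wrapped into the next cycle
	}

	if candidate > u.maxPictureID {
		u.maxPictureID = candidate
	}
	return candidate
}

func main() {
	u := &pictureIDUnwrapper{}
	for _, id := range []uint16{126, 127, 0, 1} {
		fmt.Println(id, "->", u.unwrap(id, false))
	}
}
```

Out-of-order packets from before a wrap resolve to their original extended value and leave the maximum untouched, which is why the maximum is tracked separately from the returned value.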
type VideoAllocation ¶ added in v0.15.0
type VideoAllocation struct {
// contains filtered or unexported fields
}
func (VideoAllocation) String ¶ added in v0.15.0
func (v VideoAllocation) String() string
type VideoAllocationProvisional ¶ added in v0.15.0
type VideoAllocationProvisional struct {
// contains filtered or unexported fields
}
type VideoAllocationState ¶ added in v0.15.0
type VideoAllocationState int
const (
	VideoAllocationStateNone VideoAllocationState = iota
	VideoAllocationStateMuted
	VideoAllocationStateFeedDry
	VideoAllocationStateAwaitingMeasurement
	VideoAllocationStateOptimal
	VideoAllocationStateDeficient
)
func (VideoAllocationState) String ¶ added in v0.15.0
func (v VideoAllocationState) String() string
type VideoLayers ¶ added in v0.15.0
type VideoLayers struct {
// contains filtered or unexported fields
}
func (VideoLayers) GreaterThan ¶ added in v0.15.0
func (v VideoLayers) GreaterThan(v2 VideoLayers) bool
func (VideoLayers) String ¶ added in v0.15.0
func (v VideoLayers) String() string
type VideoStreamingChange ¶ added in v0.15.0
type VideoStreamingChange int
const (
	VideoStreamingChangeNone VideoStreamingChange = iota
	VideoStreamingChangePausing
	VideoStreamingChangeResuming
)
func (VideoStreamingChange) String ¶ added in v0.15.0
func (v VideoStreamingChange) String() string
type VideoTransition ¶ added in v0.15.0
type VideoTransition struct {
// contains filtered or unexported fields
}
type WebRTCReceiver ¶
type WebRTCReceiver struct {
// contains filtered or unexported fields
}
WebRTCReceiver receives a video track
func (*WebRTCReceiver) AddDownTrack ¶
func (w *WebRTCReceiver) AddDownTrack(track TrackSender)
func (*WebRTCReceiver) AddUpTrack ¶
func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buffer)
func (*WebRTCReceiver) Codec ¶
func (w *WebRTCReceiver) Codec() webrtc.RTPCodecCapability
func (*WebRTCReceiver) DebugInfo ¶
func (w *WebRTCReceiver) DebugInfo() map[string]interface{}
func (*WebRTCReceiver) DeleteDownTrack ¶
func (w *WebRTCReceiver) DeleteDownTrack(peerID livekit.ParticipantID)
DeleteDownTrack removes a DownTrack from a Receiver
func (*WebRTCReceiver) GetBitrateTemporalCumulative ¶
func (w *WebRTCReceiver) GetBitrateTemporalCumulative() Bitrates
func (*WebRTCReceiver) GetSenderReportTime ¶
func (w *WebRTCReceiver) GetSenderReportTime(layer int32) (rtpTS uint32, ntpTS uint64)
func (*WebRTCReceiver) Kind ¶
func (w *WebRTCReceiver) Kind() webrtc.RTPCodecType
func (*WebRTCReceiver) NumAvailableSpatialLayers ¶ added in v0.15.0
func (w *WebRTCReceiver) NumAvailableSpatialLayers() int
func (*WebRTCReceiver) OnCloseHandler ¶
func (w *WebRTCReceiver) OnCloseHandler(fn func())
OnCloseHandler sets the method to be called when the remote track is removed.
func (*WebRTCReceiver) SSRC ¶
func (w *WebRTCReceiver) SSRC(layer int) uint32
func (*WebRTCReceiver) SendPLI ¶ added in v0.15.0
func (w *WebRTCReceiver) SendPLI(layer int32)
func (*WebRTCReceiver) SendRTCP ¶
func (w *WebRTCReceiver) SendRTCP(p []rtcp.Packet)
func (*WebRTCReceiver) SetMaxExpectedSpatialLayer ¶ added in v0.15.2
func (w *WebRTCReceiver) SetMaxExpectedSpatialLayer(layer int32)
func (*WebRTCReceiver) SetRTCPCh ¶
func (w *WebRTCReceiver) SetRTCPCh(ch chan []rtcp.Packet)
func (*WebRTCReceiver) SetTrackMeta ¶
func (w *WebRTCReceiver) SetTrackMeta(trackID livekit.TrackID, streamID string)
func (*WebRTCReceiver) SetUpTrackPaused ¶
func (w *WebRTCReceiver) SetUpTrackPaused(paused bool)
SetUpTrackPaused indicates that upstream will not be sending any data. This reflects the "muted" status and pauses the StreamTracker so that the layer is not turned off.
func (*WebRTCReceiver) StreamID ¶
func (w *WebRTCReceiver) StreamID() string
func (*WebRTCReceiver) TrackID ¶
func (w *WebRTCReceiver) TrackID() livekit.TrackID