sfu

package
v0.15.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2022 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Overview

Design of Prober

Probing is used to check for existence of excess channel capacity. This is especially useful in the downstream direction of SFU. SFU forwards audio/video streams from one or more publishers to all the subscribers. But, the downstream channel of a subscriber may not be big enough to carry all the streams. It is also a time varying quantity.

When there is not enough capacity, some streams will be paused. To resume a stream, SFU would need to know that the channel has enough capacity. That's where probing comes in. When conditions are favorable, SFU can send probe packets so that the bandwidth estimator has more data to estimate available channel capacity better. NOTE: What defines `favorable conditions` is implementation dependent.

There are two options for probing

  • Use padding only RTP packets: This one is preferable as probe rate can be controlled more tightly.
  • Resume a paused stream or forward a higher spatial layer: Have to find a stream at probing rate. Also, a stream could get a key frame unexpectedly boosting rate in the probing window.

The strategy used depends on stream allocator implementation. This module can be used if the stream allocator decides to use padding only RTP packets for probing purposes.

Implementation: There are a couple of options

  • Check prober in the forwarding path (pull from prober). This is preferred for scalability reasons. But, this suffers from not being able to probe when all streams are paused (could be due to downstream bandwidth constraints or the corresponding upstream tracks may have paused due to upstream bandwidth constraints). Another issue is not being to have tight control on probing window boundary as the packet forwarding path may not have a packet to forward. But, it should not be a major concern as long as some stream(s) is/are forwarded as there should be a packet at least every 60 ms or so (forwarding only one stream at 15 fps). Usually, it will be serviced much more frequently when there are multiple streams getting forwarded.
  • Run it a go routine. But, that would have to wake up very often to prevent bunching up of probe packets. So, a scalability concern as there is one prober per subscriber peer connection. But, probe windows should be very short (of the order of 100s of ms). So, this approach might be fine.

The implementation here follows the second approach of using a go routine.

Pacing: ------ Ideally, the subscriber peer connection should have a pacer which trickles data out at the estimated channel capacity rate (and estimated channel capacity + probing rate when actively probing).

But, there a few significant challenges

  1. Pacer will require buffering of forwarded packets. That means more memory, more CPU (have to make copy of packets) and more latency in the media stream.
  2. Scalability concern as SFU may be handling hundreds of subscriber peer connections and each one processing the pacing loop at 5ms interval will add up.

So, this module assumes that pacing is inherently provided by the publishers for media streams. That is a reasonable assumption given that publishing clients will run their own pacer and pacing data out at a steady rate.

A further assumption is that if there are multiple publishers for a subscriber peer connection, all the publishers are not pacing in sync, i.e. each publisher's pacer is completely independent and SFU will be receiving the media packets with a good spread and not clumped together.

Given those assumptions, this module monitors media send rate and adjusts probing packet sends accordingly. Although the probing may have a high enough wake up frequency, it is for short windows. For example, probing at 5 Mbps for 1/2 second and sending 1000 byte probe per iteration will wake up every 1.6 ms. That is very high, but should last for 1/2 second or so.

5 Mbps over 1/2 second = 2.5 Mbps
2.5 Mbps = 312500 bytes = 313 probes at 1000 byte probes
313 probes over 1/2 second = 1.6 ms between probes

A few things to note

  1. When a probe cluster is added, the expected media rate is provided. So, the wake up interval takes that into account. For example, if probing at 5 Mbps for 1/2 second and if 4 Mbps of it is expected to be provided by media traffic, the wake up interval becomes 8 ms.
  2. The amount of probing should actually be capped at some value to avoid too much self-induced congestion. It maybe something like 500 kbps. That will increase the wake up interval to 16 ms in the above example.
  3. In practice, the probing interval may also be shorter. Typically, it can be run for 2 - 3 RTTs to get a good measurement. For the longest hauls, RTT could be 250 ms or so leading to the probing window being long(ish). But, RTT should be much shorter especially if the subscriber peer connection of the client is able to connect to the nearest data center.

Index

Constants

View Source
const (
	RTPPaddingMaxPayloadSize      = 255
	RTPPaddingEstimatedHeaderSize = 20
	RTPBlankFramesMax             = 6
)
View Source
const (
	InvalidLayerSpatial  = int32(-1)
	InvalidLayerTemporal = int32(-1)

	DefaultMaxLayerSpatial  = int32(2)
	DefaultMaxLayerTemporal = int32(3)
)
View Source
const (
	QuarterResolution = "q"
	HalfResolution    = "h"
	FullResolution    = "f"
)
View Source
const (
	ChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps

	EstimateEpsilon = 2000 // 2 kbps

	GratuitousProbeHeadroomBps = 1 * 1000 * 1000 // if headroom > 1 Mbps, don't probe
	GratuitousProbePct         = 10
	GratuitousProbeMinBps      = 100 * 1000 // 100 kbps
	GratuitousProbeMaxBps      = 300 * 1000 // 300 kbps
	GratuitousProbeMinDuration = 500 * time.Millisecond
	GratuitousProbeMaxDuration = 600 * time.Millisecond

	AudioLossWeight = 0.75
	VideoLossWeight = 0.25

	// LK-TODO-START
	// These constants will definitely require more tweaking.
	// In fact, simple time threshold rules most probably will not be enough.
	// LK-TODO-END
	EstimateCommit          = 2 * 1000 * time.Millisecond // 2 seconds
	ProbeWait               = 8 * 1000 * time.Millisecond // 8 seconds
	BoostWait               = 5 * 1000 * time.Millisecond // 5 seconds
	GratuitousProbeWait     = 8 * 1000 * time.Millisecond // 8 seconds
	GratuitousProbeMoreWait = 5 * 1000 * time.Millisecond // 5 seconds
)
View Source
const (
	FlagPauseOnDowngrade = false
)

Forwarder

View Source
const (
	TransitionCostSpatial = 10
)

Variables

View Source
var (
	ErrUnknownKind                       = errors.New("unknown kind of codec")
	ErrOutOfOrderSequenceNumberCacheMiss = errors.New("out-of-order sequence number not found in cache")
	ErrPaddingOnlyPacket                 = errors.New("padding only packet that need not be forwarded")
	ErrDuplicatePacket                   = errors.New("duplicate packet")
	ErrPaddingNotOnFrameBoundary         = errors.New("padding cannot send on non-frame boundary")
	ErrNotVP8                            = errors.New("not VP8")
	ErrOutOfOrderVP8PictureIdCacheMiss   = errors.New("out-of-order VP8 picture id not found in cache")
	ErrFilteredVP8TemporalLayer          = errors.New("filtered VP8 temporal layer")
)
View Source
var (
	VP8KeyFrame1x1 = []byte{0x10, 0x02, 0x00, 0x9d, 0x01, 0x2a, 0x01, 0x00, 0x01, 0x00, 0x0b, 0xc7, 0x08, 0x85, 0x85, 0x88, 0x85, 0x84, 0x88, 0x3f, 0x82, 0x00, 0x0c, 0x0d, 0x60, 0x00, 0xfe, 0xe6, 0xb5, 0x00}

	H264KeyFrame2x2SPS = []byte{0x67, 0x42, 0xc0, 0x1f, 0x0f, 0xd9, 0x1f, 0x88, 0x88, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, 0x00, 0x00, 0x03, 0x00, 0xc8, 0x3c, 0x60, 0xc9, 0x20}
	H264KeyFrame2x2PPS = []byte{0x68, 0x87, 0xcb, 0x83, 0xcb, 0x20}
	H264KeyFrame2x2IDR = []byte{0x65, 0x88, 0x84, 0x0a, 0xf2, 0x62, 0x80, 0x00, 0xa7, 0xbe}

	H264KeyFrame2x2 = [][]byte{H264KeyFrame2x2SPS, H264KeyFrame2x2PPS, H264KeyFrame2x2IDR}
)
View Source
var (
	InvalidLayers = VideoLayers{
					// contains filtered or unexported fields
	}

	DefaultMaxLayers = VideoLayers{
						// contains filtered or unexported fields
	}
)
View Source
var Logger = logr.Discard()

Logger is an implementation of logr.Logger. If is not provided - will be turned off.

View Source
var (
	PacketFactory *sync.Pool
)
View Source
var (
	VideoAllocationDefault = VideoAllocation{
		// contains filtered or unexported fields
	}
)

Functions

func FixedPointToPercent added in v0.15.0

func FixedPointToPercent(frac uint8) uint32

converts a fixed point number to the number part of %

Types

type AddTrackParams added in v0.15.2

type AddTrackParams struct {
	Source      livekit.TrackSource
	IsSimulcast bool
}

type Bitrates added in v0.15.0

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

func NewCluster

func NewCluster(desiredRateBps int, expectedRateBps int, minDuration time.Duration, maxDuration time.Duration) *Cluster

func (*Cluster) GetSleepDuration

func (c *Cluster) GetSleepDuration() time.Duration

func (*Cluster) IsFinished added in v0.15.0

func (c *Cluster) IsFinished() bool

func (*Cluster) PacketSent

func (c *Cluster) PacketSent(size int)

func (*Cluster) ProbeSent added in v0.15.0

func (c *Cluster) ProbeSent(size int)

func (*Cluster) Process

func (c *Cluster) Process(p *Prober)

func (*Cluster) Start

func (c *Cluster) Start()

func (*Cluster) String added in v0.15.0

func (c *Cluster) String() string

type DownTrack

type DownTrack struct {
	// contains filtered or unexported fields
}

DownTrack implements TrackLocal, is the track used to write packets to SFU Subscriber, the track handle the packets for simple, simulcast and SVC Publisher.

func NewDownTrack

func NewDownTrack(c webrtc.RTPCodecCapability, r TrackReceiver, bf *buffer.Factory, peerID livekit.ParticipantID, mt int) (*DownTrack, error)

NewDownTrack returns a DownTrack.

func (*DownTrack) AddReceiverReportListener

func (d *DownTrack) AddReceiverReportListener(listener ReceiverReportListener)

func (*DownTrack) Allocate added in v0.15.0

func (d *DownTrack) Allocate(availableChannelCapacity int64, allowPause bool) VideoAllocation

func (*DownTrack) AllocateNextHigher added in v0.15.0

func (d *DownTrack) AllocateNextHigher() (VideoAllocation, bool)

func (*DownTrack) BandwidthRequested added in v0.15.0

func (d *DownTrack) BandwidthRequested() int64

func (*DownTrack) Bind

func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, error)

Bind is called by the PeerConnection after negotiation is complete This asserts that the code requested is supported by the remote peer. If so it sets up all the state (SSRC and PayloadType) to have a call

func (*DownTrack) Close

func (d *DownTrack) Close()

Close track

func (*DownTrack) Codec

func (d *DownTrack) Codec() webrtc.RTPCodecCapability

Codec returns current track codec capability

func (*DownTrack) CreateSenderReport

func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport

func (*DownTrack) CreateSourceDescriptionChunks

func (d *DownTrack) CreateSourceDescriptionChunks() []rtcp.SourceDescriptionChunk

func (*DownTrack) CurrentMaxLossFraction

func (d *DownTrack) CurrentMaxLossFraction() uint8

func (*DownTrack) DebugInfo

func (d *DownTrack) DebugInfo() map[string]interface{}

func (*DownTrack) DistanceToDesired added in v0.15.0

func (d *DownTrack) DistanceToDesired() int32

func (*DownTrack) FinalizeAllocate added in v0.15.0

func (d *DownTrack) FinalizeAllocate() VideoAllocation

func (*DownTrack) GetConnectionScore added in v0.15.0

func (d *DownTrack) GetConnectionScore() float64

func (*DownTrack) GetForwardingStatus added in v0.15.0

func (d *DownTrack) GetForwardingStatus() ForwardingStatus

func (*DownTrack) ID

func (d *DownTrack) ID() string

ID is the unique identifier for this Track. This should be unique for the stream, but doesn't have to globally unique. A common example would be 'audio' or 'video' and StreamID would be 'desktop' or 'webcam'

func (*DownTrack) IsDeficient added in v0.15.0

func (d *DownTrack) IsDeficient() bool

func (*DownTrack) Kind

func (d *DownTrack) Kind() webrtc.RTPCodecType

Kind controls if this TrackLocal is audio or video

func (*DownTrack) MaxLayers added in v0.15.0

func (d *DownTrack) MaxLayers() VideoLayers

func (*DownTrack) Mute

func (d *DownTrack) Mute(val bool)

Mute enables or disables media forwarding

func (*DownTrack) OnAvailableLayersChanged

func (d *DownTrack) OnAvailableLayersChanged(fn func(dt *DownTrack))

func (*DownTrack) OnBind

func (d *DownTrack) OnBind(fn func())

func (*DownTrack) OnCloseHandler

func (d *DownTrack) OnCloseHandler(fn func())

OnCloseHandler method to be called on remote tracked removed

func (*DownTrack) OnPacketSent

func (d *DownTrack) OnPacketSent(fn func(dt *DownTrack, size int))

func (*DownTrack) OnPaddingSent added in v0.15.0

func (d *DownTrack) OnPaddingSent(fn func(dt *DownTrack, size int))

func (*DownTrack) OnREMB

func (d *DownTrack) OnREMB(fn func(dt *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate))

func (*DownTrack) OnRTCP added in v0.14.2

func (d *DownTrack) OnRTCP(fn func([]rtcp.Packet))

func (*DownTrack) OnSubscribedLayersChanged added in v0.15.0

func (d *DownTrack) OnSubscribedLayersChanged(fn func(dt *DownTrack, layers VideoLayers))

func (*DownTrack) OnSubscriptionChanged

func (d *DownTrack) OnSubscriptionChanged(fn func(dt *DownTrack))

func (*DownTrack) OnTransportCCFeedback added in v0.15.2

func (d *DownTrack) OnTransportCCFeedback(fn func(dt *DownTrack, cc *rtcp.TransportLayerCC))

func (*DownTrack) Pause added in v0.15.0

func (d *DownTrack) Pause() VideoAllocation

func (*DownTrack) PeerID added in v0.15.0

func (d *DownTrack) PeerID() livekit.ParticipantID

func (*DownTrack) ProvisionalAllocate added in v0.15.0

func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers VideoLayers, allowPause bool) int64

func (*DownTrack) ProvisionalAllocateCommit added in v0.15.0

func (d *DownTrack) ProvisionalAllocateCommit() VideoAllocation

func (*DownTrack) ProvisionalAllocateGetBestWeightedTransition added in v0.15.0

func (d *DownTrack) ProvisionalAllocateGetBestWeightedTransition() VideoTransition

func (*DownTrack) ProvisionalAllocateGetCooperativeTransition added in v0.15.0

func (d *DownTrack) ProvisionalAllocateGetCooperativeTransition() VideoTransition

func (*DownTrack) ProvisionalAllocatePrepare added in v0.15.0

func (d *DownTrack) ProvisionalAllocatePrepare()

func (*DownTrack) SSRC

func (d *DownTrack) SSRC() uint32

func (*DownTrack) SetMaxSpatialLayer added in v0.15.0

func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32)

func (*DownTrack) SetMaxTemporalLayer added in v0.15.0

func (d *DownTrack) SetMaxTemporalLayer(temporalLayer int32)

func (*DownTrack) SetRTPHeaderExtensions

func (d *DownTrack) SetRTPHeaderExtensions(rtpHeaderExtensions []webrtc.RTPHeaderExtensionParameter)

Sets RTP header extensions for this track

func (*DownTrack) SetTransceiver

func (d *DownTrack) SetTransceiver(transceiver *webrtc.RTPTransceiver)

func (*DownTrack) Stop

func (d *DownTrack) Stop() error

func (*DownTrack) StreamID

func (d *DownTrack) StreamID() string

StreamID is the group this track belongs too. This must be unique

func (*DownTrack) Unbind

func (d *DownTrack) Unbind(_ webrtc.TrackLocalContext) error

Unbind implements the teardown logic when the track is no longer needed. This happens because a track has been stopped.

func (*DownTrack) UpdatePaddingStats added in v0.15.0

func (d *DownTrack) UpdatePaddingStats(packetLen uint32)

func (*DownTrack) UpdatePrimaryStats added in v0.15.0

func (d *DownTrack) UpdatePrimaryStats(packetLen uint32)

func (*DownTrack) UpdateRtxStats added in v0.15.0

func (d *DownTrack) UpdateRtxStats(packetLen uint32)

func (*DownTrack) UptrackLayersChange

func (d *DownTrack) UptrackLayersChange(availableLayers []uint16)

func (*DownTrack) WritePaddingRTP

func (d *DownTrack) WritePaddingRTP(bytesToSend int) int

WritePaddingRTP tries to write as many padding only RTP packets as necessary to satisfy given size to the DownTrack

func (*DownTrack) WriteRTP

func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error

WriteRTP writes an RTP Packet to the DownTrack

type Event

type Event struct {
	Signal    Signal
	DownTrack *DownTrack
	Data      interface{}
}

func (Event) String added in v0.15.0

func (e Event) String() string

type Forwarder added in v0.15.0

type Forwarder struct {
	// contains filtered or unexported fields
}

func NewForwarder added in v0.15.0

func NewForwarder(codec webrtc.RTPCodecCapability, kind webrtc.RTPCodecType) *Forwarder

func (*Forwarder) Allocate added in v0.15.0

func (f *Forwarder) Allocate(availableChannelCapacity int64, allowPause bool, brs Bitrates) VideoAllocation

func (*Forwarder) AllocateNextHigher added in v0.15.0

func (f *Forwarder) AllocateNextHigher(brs Bitrates) (VideoAllocation, bool)

func (*Forwarder) BandwidthRequested added in v0.15.0

func (f *Forwarder) BandwidthRequested() int64

func (*Forwarder) CurrentLayers added in v0.15.0

func (f *Forwarder) CurrentLayers() VideoLayers

func (*Forwarder) DistanceToDesired added in v0.15.0

func (f *Forwarder) DistanceToDesired() int32

func (*Forwarder) FinalizeAllocate added in v0.15.0

func (f *Forwarder) FinalizeAllocate(brs Bitrates) VideoAllocation

func (*Forwarder) GetForwardingStatus added in v0.15.0

func (f *Forwarder) GetForwardingStatus() ForwardingStatus

func (*Forwarder) GetPaddingVP8 added in v0.15.0

func (f *Forwarder) GetPaddingVP8(frameEndNeeded bool) *buffer.VP8

func (*Forwarder) GetRTPMungerParams added in v0.15.0

func (f *Forwarder) GetRTPMungerParams() RTPMungerParams

func (*Forwarder) GetSnTsForBlankFrames added in v0.15.0

func (f *Forwarder) GetSnTsForBlankFrames() ([]SnTs, bool, error)

func (*Forwarder) GetSnTsForPadding added in v0.15.0

func (f *Forwarder) GetSnTsForPadding(num int) ([]SnTs, error)

func (*Forwarder) GetTranslationParams added in v0.15.0

func (f *Forwarder) GetTranslationParams(extPkt *buffer.ExtPacket, layer int32) (*TranslationParams, error)

func (*Forwarder) IsDeficient added in v0.15.0

func (f *Forwarder) IsDeficient() bool

func (*Forwarder) MaxLayers added in v0.15.0

func (f *Forwarder) MaxLayers() VideoLayers

func (*Forwarder) Mute added in v0.15.0

func (f *Forwarder) Mute(val bool) bool

func (*Forwarder) Muted added in v0.15.0

func (f *Forwarder) Muted() bool

func (*Forwarder) Pause added in v0.15.0

func (f *Forwarder) Pause(brs Bitrates) VideoAllocation

func (*Forwarder) ProvisionalAllocate added in v0.15.0

func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layers VideoLayers, allowPause bool) int64

func (*Forwarder) ProvisionalAllocateCommit added in v0.15.0

func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation

func (*Forwarder) ProvisionalAllocateGetBestWeightedTransition added in v0.15.0

func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransition

func (*Forwarder) ProvisionalAllocateGetCooperativeTransition added in v0.15.0

func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition() VideoTransition

func (*Forwarder) ProvisionalAllocatePrepare added in v0.15.0

func (f *Forwarder) ProvisionalAllocatePrepare(bitrates Bitrates)

func (*Forwarder) SetMaxSpatialLayer added in v0.15.0

func (f *Forwarder) SetMaxSpatialLayer(spatialLayer int32) (bool, VideoLayers)

func (*Forwarder) SetMaxTemporalLayer added in v0.15.0

func (f *Forwarder) SetMaxTemporalLayer(temporalLayer int32) (bool, VideoLayers)

func (*Forwarder) TargetLayers added in v0.15.0

func (f *Forwarder) TargetLayers() VideoLayers

func (*Forwarder) UptrackLayersChange added in v0.15.0

func (f *Forwarder) UptrackLayersChange(availableLayers []uint16)

type ForwardingStatus added in v0.15.0

type ForwardingStatus int
const (
	ForwardingStatusOff ForwardingStatus = iota
	ForwardingStatusPartial
	ForwardingStatusOptimal
)

type MaxDistanceSorter added in v0.15.0

type MaxDistanceSorter []*Track

func (MaxDistanceSorter) Len added in v0.15.0

func (m MaxDistanceSorter) Len() int

func (MaxDistanceSorter) Less added in v0.15.0

func (m MaxDistanceSorter) Less(i, j int) bool

func (MaxDistanceSorter) Swap added in v0.15.0

func (m MaxDistanceSorter) Swap(i, j int)

type MinDistanceSorter added in v0.15.0

type MinDistanceSorter []*Track

func (MinDistanceSorter) Len added in v0.15.0

func (m MinDistanceSorter) Len() int

func (MinDistanceSorter) Less added in v0.15.0

func (m MinDistanceSorter) Less(i, j int) bool

func (MinDistanceSorter) Swap added in v0.15.0

func (m MinDistanceSorter) Swap(i, j int)

type PacketStats added in v0.15.0

type PacketStats struct {
	// contains filtered or unexported fields
}

type Prober

type Prober struct {
	// contains filtered or unexported fields
}

func NewProber

func NewProber(params ProberParams) *Prober

func (*Prober) AddCluster

func (p *Prober) AddCluster(desiredRateBps int, expectedRateBps int, minDuration time.Duration, maxDuration time.Duration)

func (*Prober) IsRunning

func (p *Prober) IsRunning() bool

func (*Prober) OnSendProbe

func (p *Prober) OnSendProbe(f func(bytesToSend int))

func (*Prober) PacketSent

func (p *Prober) PacketSent(size int)

func (*Prober) ProbeSent added in v0.15.0

func (p *Prober) ProbeSent(size int)

func (*Prober) Reset

func (p *Prober) Reset()

type ProberParams added in v0.15.0

type ProberParams struct {
	Logger logger.Logger
}

type RTPMunger added in v0.15.0

type RTPMunger struct {
	RTPMungerParams
}

func NewRTPMunger added in v0.15.0

func NewRTPMunger() *RTPMunger

func (*RTPMunger) GetParams added in v0.15.0

func (r *RTPMunger) GetParams() RTPMungerParams

func (*RTPMunger) IsOnFrameBoundary added in v0.15.0

func (r *RTPMunger) IsOnFrameBoundary() bool

func (*RTPMunger) PacketDropped added in v0.15.0

func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket)

func (*RTPMunger) SetLastSnTs added in v0.15.0

func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket)

func (*RTPMunger) UpdateAndGetPaddingSnTs added in v0.15.0

func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate uint32, forceMarker bool) ([]SnTs, error)

func (*RTPMunger) UpdateAndGetSnTs added in v0.15.0

func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error)

func (*RTPMunger) UpdateSnTsOffsets added in v0.15.0

func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint16, tsAdjust uint32)

type RTPMungerParams added in v0.15.0

type RTPMungerParams struct {
	// contains filtered or unexported fields
}

type Receiver

type Receiver interface {
	TrackID() livekit.TrackID
	StreamID() string
	Codec() webrtc.RTPCodecCapability
	AddUpTrack(track *webrtc.TrackRemote, buffer *buffer.Buffer)
	AddDownTrack(track TrackSender)
	SetUpTrackPaused(paused bool)
	SetMaxExpectedSpatialLayer(layer int32)
	NumAvailableSpatialLayers() int
	GetBitrateTemporalCumulative() Bitrates
	ReadRTP(buf []byte, layer uint8, sn uint16) (int, error)
	DeleteDownTrack(peerID livekit.ParticipantID)
	OnCloseHandler(fn func())
	SendPLI(layer int32)
	SetRTCPCh(ch chan []rtcp.Packet)

	GetSenderReportTime(layer int32) (rtpTS uint32, ntpTS uint64)
	DebugInfo() map[string]interface{}
}

Receiver defines an interface for a track receivers

func NewWebRTCReceiver

func NewWebRTCReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRemote, pid livekit.ParticipantID, opts ...ReceiverOpts) Receiver

NewWebRTCReceiver creates a new webrtc track receivers

type ReceiverOpts

type ReceiverOpts func(w *WebRTCReceiver) *WebRTCReceiver

func WithLoadBalanceThreshold

func WithLoadBalanceThreshold(downTracks int) ReceiverOpts

WithLoadBalanceThreshold enables parallelization of packet writes when downTracks exceeds threshold Value should be between 3 and 150. For a server handling a few large rooms, use a smaller value (required to handle very large (250+ participant) rooms). For a server handling many small rooms, use a larger value or disable. Set to 0 (disabled) by default.

func WithPliThrottle

func WithPliThrottle(period int64) ReceiverOpts

WithPliThrottle indicates minimum time(ms) between sending PLIs

func WithStreamTrackers

func WithStreamTrackers() ReceiverOpts

WithStreamTrackers enables StreamTracker use for simulcast

type ReceiverReportListener

type ReceiverReportListener func(dt *DownTrack, report *rtcp.ReceiverReport)

type SequenceNumberOrdering

type SequenceNumberOrdering int

RTPMunger

const (
	SequenceNumberOrderingContiguous SequenceNumberOrdering = iota
	SequenceNumberOrderingOutOfOrder
	SequenceNumberOrderingGap
	SequenceNumberOrderingDuplicate
)

type Signal

type Signal int
const (
	SignalAddTrack Signal = iota
	SignalRemoveTrack
	SignalEstimate
	SignalTargetBitrate
	SignalReceiverReport
	SignalAvailableLayersChange
	SignalSubscriptionChange
	SignalSubscribedLayersChange
	SignalPeriodicPing
	SignalSendProbe
)

func (Signal) String added in v0.15.0

func (s Signal) String() string

type SnTs added in v0.15.0

type SnTs struct {
	// contains filtered or unexported fields
}

type State

type State int
const (
	StateStable State = iota
	StateDeficient
)

func (State) String added in v0.15.0

func (s State) String() string

type StreamAllocator

type StreamAllocator struct {
	// contains filtered or unexported fields
}

func NewStreamAllocator

func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator

func (*StreamAllocator) AddTrack

func (s *StreamAllocator) AddTrack(downTrack *DownTrack, params AddTrackParams)

func (*StreamAllocator) OnStreamStateChange added in v0.15.0

func (s *StreamAllocator) OnStreamStateChange(f func(update *StreamStateUpdate) error)

func (*StreamAllocator) RemoveTrack

func (s *StreamAllocator) RemoveTrack(downTrack *DownTrack)

func (*StreamAllocator) Start

func (s *StreamAllocator) Start()

func (*StreamAllocator) Stop

func (s *StreamAllocator) Stop()

type StreamAllocatorParams added in v0.15.0

type StreamAllocatorParams struct {
	Config config.CongestionControlConfig
	Logger logger.Logger
}

type StreamState added in v0.15.0

type StreamState int
const (
	StreamStateActive StreamState = iota
	StreamStatePaused
)

type StreamStateInfo added in v0.15.0

type StreamStateInfo struct {
	ParticipantID livekit.ParticipantID
	TrackID       livekit.TrackID
	State         StreamState
}

type StreamStateUpdate added in v0.15.0

type StreamStateUpdate struct {
	StreamStates []*StreamStateInfo
}

func NewStreamStateUpdate added in v0.15.0

func NewStreamStateUpdate() *StreamStateUpdate

func (*StreamStateUpdate) Empty added in v0.15.0

func (s *StreamStateUpdate) Empty() bool

func (*StreamStateUpdate) HandleStreamingChange added in v0.15.0

func (s *StreamStateUpdate) HandleStreamingChange(change VideoStreamingChange, track *Track)

type StreamStatus

type StreamStatus int32
const (
	StreamStatusStopped StreamStatus = 0
	StreamStatusActive  StreamStatus = 1
)

func (StreamStatus) String

func (s StreamStatus) String() string

type StreamTracker

type StreamTracker struct {
	// contains filtered or unexported fields
}

StreamTracker keeps track of packet flow and ensures a particular uptrack is consistently producing It runs its own goroutine for detection, and fires OnStatusChanged callback

func NewStreamTracker

func NewStreamTracker(samplesRequired uint32, cyclesRequired uint64, cycleDuration time.Duration) *StreamTracker

func (*StreamTracker) Observe

func (s *StreamTracker) Observe(sn uint16)

Observe a packet that's received

func (*StreamTracker) OnStatusChanged

func (s *StreamTracker) OnStatusChanged(f func(status StreamStatus))

func (*StreamTracker) Reset added in v0.15.2

func (s *StreamTracker) Reset()

func (*StreamTracker) SetPaused

func (s *StreamTracker) SetPaused(paused bool)

func (*StreamTracker) Start

func (s *StreamTracker) Start()

func (*StreamTracker) Status

func (s *StreamTracker) Status() StreamStatus

func (*StreamTracker) Stop

func (s *StreamTracker) Stop()

type Track

type Track struct {
	// contains filtered or unexported fields
}

func (*Track) Allocate added in v0.15.0

func (t *Track) Allocate(availableChannelCapacity int64, allowPause bool) VideoAllocation

func (*Track) AllocateNextHigher added in v0.15.0

func (t *Track) AllocateNextHigher() (VideoAllocation, bool)

func (*Track) BandwidthRequested

func (t *Track) BandwidthRequested() int64

func (*Track) DistanceToDesired added in v0.15.0

func (t *Track) DistanceToDesired() int32

func (*Track) DownTrack added in v0.15.0

func (t *Track) DownTrack() *DownTrack

func (*Track) FinalizeAllocate added in v0.15.0

func (t *Track) FinalizeAllocate()

func (*Track) GetPacketStats

func (t *Track) GetPacketStats() (uint32, uint32)

func (*Track) ID added in v0.15.0

func (t *Track) ID() livekit.TrackID

func (*Track) IsDeficient added in v0.15.0

func (t *Track) IsDeficient() bool

func (*Track) IsManaged added in v0.15.0

func (t *Track) IsManaged() bool

func (*Track) Pause added in v0.15.0

func (t *Track) Pause() VideoAllocation

func (*Track) PeerID added in v0.15.0

func (t *Track) PeerID() livekit.ParticipantID

func (*Track) ProvisionalAllocate added in v0.15.0

func (t *Track) ProvisionalAllocate(availableChannelCapacity int64, layers VideoLayers, allowPause bool) int64

func (*Track) ProvisionalAllocateCommit added in v0.15.0

func (t *Track) ProvisionalAllocateCommit() VideoAllocation

func (*Track) ProvisionalAllocateGetBestWeightedTransition added in v0.15.0

func (t *Track) ProvisionalAllocateGetBestWeightedTransition() VideoTransition

func (*Track) ProvisionalAllocateGetCooperativeTransition added in v0.15.0

func (t *Track) ProvisionalAllocateGetCooperativeTransition() VideoTransition

func (*Track) ProvisionalAllocatePrepare added in v0.15.0

func (t *Track) ProvisionalAllocatePrepare()

func (*Track) UpdateMaxLayers added in v0.15.0

func (t *Track) UpdateMaxLayers(layers VideoLayers)

func (*Track) UpdatePacketStats

func (t *Track) UpdatePacketStats(rr *rtcp.ReceiverReport)

LK-TODO this should probably be maintained in downTrack and this module can query what it needs

func (*Track) WritePaddingRTP

func (t *Track) WritePaddingRTP(bytesToSend int) int

type TrackReceiver added in v0.15.0

type TrackReceiver interface {
	TrackID() livekit.TrackID
	StreamID() string
	GetBitrateTemporalCumulative() Bitrates
	ReadRTP(buf []byte, layer uint8, sn uint16) (int, error)
	AddDownTrack(track TrackSender)
	DeleteDownTrack(peerID livekit.ParticipantID)
	SendPLI(layer int32)
	GetSenderReportTime(layer int32) (rtpTS uint32, ntpTS uint64)
	Codec() webrtc.RTPCodecCapability
}

TrackReceiver defines an interface receive media from remote peer

type TrackSender added in v0.15.0

type TrackSender interface {
	UptrackLayersChange(availableLayers []uint16)
	WriteRTP(p *buffer.ExtPacket, layer int32) error
	Close()
	// ID is the globally unique identifier for this Track.
	ID() string
	Codec() webrtc.RTPCodecCapability
	PeerID() livekit.ParticipantID
}

TrackSender defines an interface send media to remote peer

type TrackSorter added in v0.15.0

type TrackSorter []*Track

func (TrackSorter) Len added in v0.15.0

func (t TrackSorter) Len() int

func (TrackSorter) Less added in v0.15.0

func (t TrackSorter) Less(i, j int) bool

func (TrackSorter) Swap added in v0.15.0

func (t TrackSorter) Swap(i, j int)

type TranslationParams added in v0.15.0

type TranslationParams struct {
	// contains filtered or unexported fields
}

type TranslationParamsRTP added in v0.15.0

type TranslationParamsRTP struct {
	// contains filtered or unexported fields
}

type TranslationParamsVP8 added in v0.15.0

type TranslationParamsVP8 struct {
	// contains filtered or unexported fields
}

VP8 munger

type VP8Munger

type VP8Munger struct {
	VP8MungerParams
}

func NewVP8Munger

func NewVP8Munger() *VP8Munger

func (*VP8Munger) PictureIdOffset added in v0.15.0

func (v *VP8Munger) PictureIdOffset(extPictureId int32) (int32, bool)

for testing only

func (*VP8Munger) SetLast

func (v *VP8Munger) SetLast(extPkt *buffer.ExtPacket)

func (*VP8Munger) UpdateAndGet

func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, ordering SequenceNumberOrdering, maxTemporalLayer int32) (*TranslationParamsVP8, error)

func (*VP8Munger) UpdateAndGetPadding

func (v *VP8Munger) UpdateAndGetPadding(newPicture bool) *buffer.VP8

func (*VP8Munger) UpdateOffsets

func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket)

type VP8MungerParams

type VP8MungerParams struct {
	// contains filtered or unexported fields
}

type VP8PictureIdWrapHandler

type VP8PictureIdWrapHandler struct {
	// contains filtered or unexported fields
}

func (*VP8PictureIdWrapHandler) Init

func (v *VP8PictureIdWrapHandler) Init(extPictureId int32, mBit bool)

func (*VP8PictureIdWrapHandler) MaxPictureId

func (v *VP8PictureIdWrapHandler) MaxPictureId() int32

func (*VP8PictureIdWrapHandler) Unwrap

func (v *VP8PictureIdWrapHandler) Unwrap(pictureId uint16, mBit bool) int32

unwrap picture id and update the maxPictureId. return unwrapped value

func (*VP8PictureIdWrapHandler) UpdateMaxPictureId

func (v *VP8PictureIdWrapHandler) UpdateMaxPictureId(extPictureId int32, mBit bool)

type VideoAllocation added in v0.15.0

type VideoAllocation struct {
	// contains filtered or unexported fields
}

func (VideoAllocation) String added in v0.15.0

func (v VideoAllocation) String() string

type VideoAllocationProvisional added in v0.15.0

type VideoAllocationProvisional struct {
	// contains filtered or unexported fields
}

type VideoAllocationState added in v0.15.0

type VideoAllocationState int
const (
	VideoAllocationStateNone VideoAllocationState = iota
	VideoAllocationStateMuted
	VideoAllocationStateFeedDry
	VideoAllocationStateAwaitingMeasurement
	VideoAllocationStateOptimal
	VideoAllocationStateDeficient
)

func (VideoAllocationState) String added in v0.15.0

func (v VideoAllocationState) String() string

type VideoLayers added in v0.15.0

type VideoLayers struct {
	// contains filtered or unexported fields
}

func (VideoLayers) GreaterThan added in v0.15.0

func (v VideoLayers) GreaterThan(v2 VideoLayers) bool

func (VideoLayers) String added in v0.15.0

func (v VideoLayers) String() string

type VideoStreamingChange added in v0.15.0

type VideoStreamingChange int
const (
	VideoStreamingChangeNone VideoStreamingChange = iota
	VideoStreamingChangePausing
	VideoStreamingChangeResuming
)

func (VideoStreamingChange) String added in v0.15.0

func (v VideoStreamingChange) String() string

type VideoTransition added in v0.15.0

type VideoTransition struct {
	// contains filtered or unexported fields
}

type WebRTCReceiver

type WebRTCReceiver struct {
	// contains filtered or unexported fields
}

WebRTCReceiver receives a video track

func (*WebRTCReceiver) AddDownTrack

func (w *WebRTCReceiver) AddDownTrack(track TrackSender)

func (*WebRTCReceiver) AddUpTrack

func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buffer)

func (*WebRTCReceiver) Codec

func (w *WebRTCReceiver) Codec() webrtc.RTPCodecCapability

func (*WebRTCReceiver) DebugInfo

func (w *WebRTCReceiver) DebugInfo() map[string]interface{}

func (*WebRTCReceiver) DeleteDownTrack

func (w *WebRTCReceiver) DeleteDownTrack(peerID livekit.ParticipantID)

DeleteDownTrack removes a DownTrack from a Receiver

func (*WebRTCReceiver) GetBitrateTemporalCumulative

func (w *WebRTCReceiver) GetBitrateTemporalCumulative() Bitrates

func (*WebRTCReceiver) GetSenderReportTime

func (w *WebRTCReceiver) GetSenderReportTime(layer int32) (rtpTS uint32, ntpTS uint64)

func (*WebRTCReceiver) Kind

func (w *WebRTCReceiver) Kind() webrtc.RTPCodecType

func (*WebRTCReceiver) NumAvailableSpatialLayers added in v0.15.0

func (w *WebRTCReceiver) NumAvailableSpatialLayers() int

func (*WebRTCReceiver) OnCloseHandler

func (w *WebRTCReceiver) OnCloseHandler(fn func())

OnCloseHandler method to be called on remote tracked removed

func (*WebRTCReceiver) ReadRTP added in v0.15.0

func (w *WebRTCReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error)

func (*WebRTCReceiver) SSRC

func (w *WebRTCReceiver) SSRC(layer int) uint32

func (*WebRTCReceiver) SendPLI added in v0.15.0

func (w *WebRTCReceiver) SendPLI(layer int32)

func (*WebRTCReceiver) SendRTCP

func (w *WebRTCReceiver) SendRTCP(p []rtcp.Packet)

func (*WebRTCReceiver) SetMaxExpectedSpatialLayer added in v0.15.2

func (w *WebRTCReceiver) SetMaxExpectedSpatialLayer(layer int32)

func (*WebRTCReceiver) SetRTCPCh

func (w *WebRTCReceiver) SetRTCPCh(ch chan []rtcp.Packet)

func (*WebRTCReceiver) SetTrackMeta

func (w *WebRTCReceiver) SetTrackMeta(trackID livekit.TrackID, streamID string)

func (*WebRTCReceiver) SetUpTrackPaused

func (w *WebRTCReceiver) SetUpTrackPaused(paused bool)

SetUpTrackPaused indicates upstream will not be sending any data. this will reflect the "muted" status and will pause streamtracker to ensure we don't turn off the layer

func (*WebRTCReceiver) StreamID

func (w *WebRTCReceiver) StreamID() string

func (*WebRTCReceiver) TrackID

func (w *WebRTCReceiver) TrackID() livekit.TrackID

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL