streamallocator

package
v0.0.0-...-c2cc3df Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Design of Prober

Probing is used to check for existence of excess channel capacity. This is especially useful in the downstream direction of SFU. SFU forwards audio/video streams from one or more publishers to all the subscribers. But, the downstream channel of a subscriber may not be big enough to carry all the streams. It is also a time varying quantity.

When there is not enough capacity, some streams will be paused. To resume a stream, SFU would need to know that the channel has enough capacity. That's where probing comes in. When conditions are favorable, SFU can send probe packets so that the bandwidth estimator has more data to estimate available channel capacity better. NOTE: What defines `favorable conditions` is implementation dependent.

There are two options for probing

  • Use padding only RTP packets: This one is preferable as probe rate can be controlled more tightly.
  • Resume a paused stream or forward a higher spatial layer: Have to find a stream at probing rate. Also, a stream could get a key frame unexpectedly boosting rate in the probing window.

The strategy used depends on stream allocator implementation. This module can be used if the stream allocator decides to use padding only RTP packets for probing purposes.

Implementation: There are a couple of options

  • Check prober in the forwarding path (pull from prober). This is preferred for scalability reasons. But, this suffers from not being able to probe when all streams are paused (could be due to downstream bandwidth constraints or the corresponding upstream tracks may have paused due to upstream bandwidth constraints). Another issue is not being to have tight control on probing window boundary as the packet forwarding path may not have a packet to forward. But, it should not be a major concern as long as some stream(s) is/are forwarded as there should be a packet at least every 60 ms or so (forwarding only one stream at 15 fps). Usually, it will be serviced much more frequently when there are multiple streams getting forwarded.
  • Run it a go routine. But, that would have to wake up very often to prevent bunching up of probe packets. So, a scalability concern as there is one prober per subscriber peer connection. But, probe windows should be very short (of the order of 100s of ms). So, this approach might be fine.

The implementation here follows the second approach of using a go routine.

Pacing: ------ Ideally, the subscriber peer connection should have a pacer which trickles data out at the estimated channel capacity rate (and estimated channel capacity + probing rate when actively probing).

But, there a few significant challenges

  1. Pacer will require buffering of forwarded packets. That means more memory, more CPU (have to make copy of packets) and more latency in the media stream.
  2. Scalability concern as SFU may be handling hundreds of subscriber peer connections and each one processing the pacing loop at 5ms interval will add up.

So, this module assumes that pacing is inherently provided by the publishers for media streams. That is a reasonable assumption given that publishing clients will run their own pacer and pacing data out at a steady rate.

A further assumption is that if there are multiple publishers for a subscriber peer connection, all the publishers are not pacing in sync, i.e. each publisher's pacer is completely independent and SFU will be receiving the media packets with a good spread and not clumped together.

Given those assumptions, this module monitors media send rate and adjusts probing packet sends accordingly. Although the probing may have a high enough wake up frequency, it is for short windows. For example, probing at 5 Mbps for 1/2 second and sending 1000 byte probe per iteration will wake up every 1.6 ms. That is very high, but should last for 1/2 second or so.

5 Mbps over 1/2 second = 2.5 Mbps
2.5 Mbps = 312500 bytes = 313 probes at 1000 byte probes
313 probes over 1/2 second = 1.6 ms between probes

A few things to note

  1. When a probe cluster is added, the expected media rate is provided. So, the wake-up interval takes that into account. For example, if probing at 5 Mbps for 1/2 second and if 4 Mbps of it is expected to be provided by media traffic, the wake-up interval becomes 8 ms.
  2. The amount of probing should actually be capped at some value to avoid too much self-induced congestion. It maybe something like 500 kbps. That will increase the wake-up interval to 16 ms in the above example.
  3. In practice, the probing interval may also be shorter. Typically, it can be run for 2 - 3 RTTs to get a good measurement. For the longest hauls, RTT could be 250 ms or so leading to the probing window being long(ish). But, RTT should be much shorter especially if the subscriber peer connection of the client is able to connect to the nearest data center.

Index

Constants

View Source
const (
	ChannelCapacityInfinity = 100 * 1000 * 1000 // 100 Mbps

	PriorityMin                = uint8(1)
	PriorityMax                = uint8(255)
	PriorityDefaultScreenshare = PriorityMax
	PriorityDefaultVideo       = PriorityMin

	FlagAllowOvershootWhileOptimal              = true
	FlagAllowOvershootWhileDeficient            = false
	FlagAllowOvershootExemptTrackWhileDeficient = true
	FlagAllowOvershootInProbe                   = true
	FlagAllowOvershootInCatchup                 = false
	FlagAllowOvershootInBoost                   = true
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AddTrackParams

type AddTrackParams struct {
	Source      livekit.TrackSource
	Priority    uint8
	IsSimulcast bool
	PublisherID livekit.ParticipantID
}

type ChannelCongestionReason

type ChannelCongestionReason int
const (
	ChannelCongestionReasonNone ChannelCongestionReason = iota
	ChannelCongestionReasonEstimate
	ChannelCongestionReasonLoss
)

func (ChannelCongestionReason) String

func (c ChannelCongestionReason) String() string

type ChannelObserver

type ChannelObserver struct {
	// contains filtered or unexported fields
}

func NewChannelObserver

func NewChannelObserver(params ChannelObserverParams, logger logger.Logger) *ChannelObserver

func (*ChannelObserver) AddEstimate

func (c *ChannelObserver) AddEstimate(estimate int64)

func (*ChannelObserver) AddNack

func (c *ChannelObserver) AddNack(packets uint32, repeatedNacks uint32)

func (*ChannelObserver) GetHighestEstimate

func (c *ChannelObserver) GetHighestEstimate() int64

func (*ChannelObserver) GetLowestEstimate

func (c *ChannelObserver) GetLowestEstimate() int64

func (*ChannelObserver) GetNackRatio

func (c *ChannelObserver) GetNackRatio() float64

func (*ChannelObserver) GetTrend

func (*ChannelObserver) HasEnoughEstimateSamples

func (c *ChannelObserver) HasEnoughEstimateSamples() bool

func (*ChannelObserver) SeedEstimate

func (c *ChannelObserver) SeedEstimate(estimate int64)

func (*ChannelObserver) ToString

func (c *ChannelObserver) ToString() string

type ChannelObserverParams

type ChannelObserverParams struct {
	Name   string
	Config config.CongestionControlChannelObserverConfig
}

type ChannelTrend

type ChannelTrend int
const (
	ChannelTrendNeutral ChannelTrend = iota
	ChannelTrendClearing
	ChannelTrendCongesting
)

func (ChannelTrend) String

func (c ChannelTrend) String() string

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

func NewCluster

func NewCluster(id ProbeClusterId, mode ProbeClusterMode, desiredRateBps int, expectedRateBps int, minDuration time.Duration, maxDuration time.Duration) *Cluster

func (*Cluster) GetInfo

func (c *Cluster) GetInfo() ProbeClusterInfo

func (*Cluster) GetSleepDuration

func (c *Cluster) GetSleepDuration() time.Duration

func (*Cluster) IsFinished

func (c *Cluster) IsFinished() bool

func (*Cluster) PacketsSent

func (c *Cluster) PacketsSent(size int)

func (*Cluster) ProbeSent

func (c *Cluster) ProbeSent(size int)

func (*Cluster) Process

func (c *Cluster) Process(pl ProberListener)

func (*Cluster) Start

func (c *Cluster) Start()

func (*Cluster) String

func (c *Cluster) String() string

type Event

type Event struct {
	*StreamAllocator
	Signal  streamAllocatorSignal
	TrackID livekit.TrackID
	Data    interface{}
}

func (Event) String

func (e Event) String() string

type MaxDistanceSorter

type MaxDistanceSorter []*Track

func (MaxDistanceSorter) Len

func (m MaxDistanceSorter) Len() int

func (MaxDistanceSorter) Less

func (m MaxDistanceSorter) Less(i, j int) bool

func (MaxDistanceSorter) Swap

func (m MaxDistanceSorter) Swap(i, j int)

type MinDistanceSorter

type MinDistanceSorter []*Track

func (MinDistanceSorter) Len

func (m MinDistanceSorter) Len() int

func (MinDistanceSorter) Less

func (m MinDistanceSorter) Less(i, j int) bool

func (MinDistanceSorter) Swap

func (m MinDistanceSorter) Swap(i, j int)

type NackTracker

type NackTracker struct {
	// contains filtered or unexported fields
}

func NewNackTracker

func NewNackTracker(params NackTrackerParams) *NackTracker

func (*NackTracker) Add

func (n *NackTracker) Add(packets uint32, repeatedNacks uint32)

func (*NackTracker) GetRatio

func (n *NackTracker) GetRatio() float64

func (*NackTracker) IsTriggered

func (n *NackTracker) IsTriggered() bool

func (*NackTracker) ToString

func (n *NackTracker) ToString() string

type NackTrackerParams

type NackTrackerParams struct {
	Name              string
	Logger            logger.Logger
	WindowMinDuration time.Duration
	WindowMaxDuration time.Duration
	RatioThreshold    float64
}

type ProbeClusterId

type ProbeClusterId uint32
const (
	ProbeClusterIdInvalid ProbeClusterId = 0
)

type ProbeClusterInfo

type ProbeClusterInfo struct {
	Id        ProbeClusterId
	BytesSent int
	Duration  time.Duration
}

type ProbeClusterMode

type ProbeClusterMode int
const (
	ProbeClusterModeUniform ProbeClusterMode = iota
	ProbeClusterModeLinearChirp
)

func (ProbeClusterMode) String

func (p ProbeClusterMode) String() string

type ProbeController

type ProbeController struct {
	// contains filtered or unexported fields
}

func NewProbeController

func NewProbeController(params ProbeControllerParams) *ProbeController

func (*ProbeController) AbortProbe

func (p *ProbeController) AbortProbe()

func (*ProbeController) CanProbe

func (p *ProbeController) CanProbe() bool

func (*ProbeController) CheckProbe

func (p *ProbeController) CheckProbe(trend ChannelTrend, highestEstimate int64)

func (*ProbeController) DoesProbeNeedFinalize

func (p *ProbeController) DoesProbeNeedFinalize() bool

func (*ProbeController) InitProbe

func (p *ProbeController) InitProbe(probeGoalDeltaBps int64, expectedBandwidthUsage int64) (ProbeClusterId, int64)

func (*ProbeController) IsInProbe

func (p *ProbeController) IsInProbe() bool

func (*ProbeController) MaybeFinalizeProbe

func (p *ProbeController) MaybeFinalizeProbe(
	isComplete bool,
	trend ChannelTrend,
	lowestEstimate int64,
) (isHandled bool, isNotFailing bool, isGoalReached bool)

func (*ProbeController) ProbeClusterDone

func (p *ProbeController) ProbeClusterDone(info ProbeClusterInfo)

func (*ProbeController) Reset

func (p *ProbeController) Reset()

func (*ProbeController) StopProbe

func (p *ProbeController) StopProbe()

type ProbeControllerParams

type ProbeControllerParams struct {
	Config config.CongestionControlProbeConfig
	Prober *Prober
	Logger logger.Logger
}

type Prober

type Prober struct {
	// contains filtered or unexported fields
}

func NewProber

func NewProber(params ProberParams) *Prober

func (*Prober) AddCluster

func (p *Prober) AddCluster(mode ProbeClusterMode, desiredRateBps int, expectedRateBps int, minDuration time.Duration, maxDuration time.Duration) ProbeClusterId

func (*Prober) IsRunning

func (p *Prober) IsRunning() bool

func (*Prober) PacketsSent

func (p *Prober) PacketsSent(size int)

func (*Prober) ProbeSent

func (p *Prober) ProbeSent(size int)

func (*Prober) Reset

func (p *Prober) Reset()

func (*Prober) SetProberListener

func (p *Prober) SetProberListener(listener ProberListener)

type ProberListener

type ProberListener interface {
	OnSendProbe(bytesToSend int)
	OnProbeClusterDone(info ProbeClusterInfo)
	OnActiveChanged(isActive bool)
}

type ProberParams

type ProberParams struct {
	Logger logger.Logger
}

type RateMonitor

type RateMonitor struct {
	// contains filtered or unexported fields
}

func NewRateMonitor

func NewRateMonitor() *RateMonitor

func (*RateMonitor) GetHistory

func (r *RateMonitor) GetHistory() []string

func (*RateMonitor) GetQueuingGuess

func (r *RateMonitor) GetQueuingGuess() float64

STREAM-ALLOCATOR-TODO: This should be updated periodically to flush any pending. Reason is that the estimate could be higher than the actual rate by a significant amount. So, updating periodically to flush out samples that will not contribute to queueing would be good.

func (*RateMonitor) Update

func (r *RateMonitor) Update(estimate int64, managedBytesSent uint32, managedBytesRetransmitted uint32, unmanagedBytesSent uint32, unmanagedBytesRetransmitted uint32)

type StreamAllocator

type StreamAllocator struct {
	// contains filtered or unexported fields
}

func NewStreamAllocator

func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator

func (*StreamAllocator) AddTrack

func (s *StreamAllocator) AddTrack(downTrack *sfu.DownTrack, params AddTrackParams)

func (*StreamAllocator) IsBWEEnabled

func (s *StreamAllocator) IsBWEEnabled(downTrack *sfu.DownTrack) bool

called to check if track should participate in BWE

func (*StreamAllocator) IsSubscribeMutable

func (s *StreamAllocator) IsSubscribeMutable(downTrack *sfu.DownTrack) bool

called to check if track subscription mute can be applied

func (*StreamAllocator) OnActiveChanged

func (s *StreamAllocator) OnActiveChanged(isActive bool)

called when prober active state changes

func (*StreamAllocator) OnAvailableLayersChanged

func (s *StreamAllocator) OnAvailableLayersChanged(downTrack *sfu.DownTrack)

called when feeding track's layer availability changes

func (*StreamAllocator) OnBitrateAvailabilityChanged

func (s *StreamAllocator) OnBitrateAvailabilityChanged(downTrack *sfu.DownTrack)

called when feeding track's bitrate measurement of any layer is available

func (*StreamAllocator) OnMaxPublishedSpatialChanged

func (s *StreamAllocator) OnMaxPublishedSpatialChanged(downTrack *sfu.DownTrack)

called when feeding track's max published spatial layer changes

func (*StreamAllocator) OnMaxPublishedTemporalChanged

func (s *StreamAllocator) OnMaxPublishedTemporalChanged(downTrack *sfu.DownTrack)

called when feeding track's max published temporal layer changes

func (*StreamAllocator) OnPacketsSent

func (s *StreamAllocator) OnPacketsSent(downTrack *sfu.DownTrack, size int)

called by a video DownTrack to report packet send

func (*StreamAllocator) OnProbeClusterDone

func (s *StreamAllocator) OnProbeClusterDone(info ProbeClusterInfo)

called when prober finishes a probe cluster, could be called when prober is reset which stops an active cluster

func (*StreamAllocator) OnREMB

func (s *StreamAllocator) OnREMB(downTrack *sfu.DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate)

called when a new REMB is received (receive side bandwidth estimation)

func (*StreamAllocator) OnResume

func (s *StreamAllocator) OnResume(downTrack *sfu.DownTrack)

called when forwarder resumes a track

func (*StreamAllocator) OnSendProbe

func (s *StreamAllocator) OnSendProbe(bytesToSend int)

called when prober wants to send packet(s)

func (*StreamAllocator) OnStreamStateChange

func (s *StreamAllocator) OnStreamStateChange(f func(update *StreamStateUpdate) error)

func (*StreamAllocator) OnSubscribedLayerChanged

func (s *StreamAllocator) OnSubscribedLayerChanged(downTrack *sfu.DownTrack, layer buffer.VideoLayer)

called when subscribed layer changes (limiting max layer)

func (*StreamAllocator) OnSubscriptionChanged

func (s *StreamAllocator) OnSubscriptionChanged(downTrack *sfu.DownTrack)

called when subscription settings changes (muting/unmuting of track)

func (*StreamAllocator) OnTransportCCFeedback

func (s *StreamAllocator) OnTransportCCFeedback(downTrack *sfu.DownTrack, fb *rtcp.TransportLayerCC)

called when a new transport-cc feedback is received

func (*StreamAllocator) RemoveTrack

func (s *StreamAllocator) RemoveTrack(downTrack *sfu.DownTrack)

func (*StreamAllocator) SetAllowPause

func (s *StreamAllocator) SetAllowPause(allowPause bool)

func (*StreamAllocator) SetBandwidthEstimator

func (s *StreamAllocator) SetBandwidthEstimator(bwe cc.BandwidthEstimator)

func (*StreamAllocator) SetChannelCapacity

func (s *StreamAllocator) SetChannelCapacity(channelCapacity int64)

func (*StreamAllocator) SetTrackPriority

func (s *StreamAllocator) SetTrackPriority(downTrack *sfu.DownTrack, priority uint8)

func (*StreamAllocator) Start

func (s *StreamAllocator) Start()

func (*StreamAllocator) Stop

func (s *StreamAllocator) Stop()

type StreamAllocatorParams

type StreamAllocatorParams struct {
	Config config.CongestionControlConfig
	Logger logger.Logger
}

type StreamState

type StreamState int
const (
	StreamStateInactive StreamState = iota
	StreamStateActive
	StreamStatePaused
)

func (StreamState) String

func (s StreamState) String() string

type StreamStateInfo

type StreamStateInfo struct {
	ParticipantID livekit.ParticipantID
	TrackID       livekit.TrackID
	State         StreamState
}

type StreamStateUpdate

type StreamStateUpdate struct {
	StreamStates []*StreamStateInfo
}

func NewStreamStateUpdate

func NewStreamStateUpdate() *StreamStateUpdate

func (*StreamStateUpdate) Empty

func (s *StreamStateUpdate) Empty() bool

func (*StreamStateUpdate) HandleStreamingChange

func (s *StreamStateUpdate) HandleStreamingChange(track *Track, streamState StreamState)

type Track

type Track struct {
	// contains filtered or unexported fields
}

func NewTrack

func NewTrack(
	downTrack *sfu.DownTrack,
	source livekit.TrackSource,
	isSimulcast bool,
	publisherID livekit.ParticipantID,
	logger logger.Logger,
) *Track

func (*Track) AllocateNextHigher

func (t *Track) AllocateNextHigher(availableChannelCapacity int64, allowOvershoot bool) (sfu.VideoAllocation, bool)

func (*Track) AllocateOptimal

func (t *Track) AllocateOptimal(allowOvershoot bool) sfu.VideoAllocation

func (*Track) BandwidthRequested

func (t *Track) BandwidthRequested() int64

func (*Track) DistanceToDesired

func (t *Track) DistanceToDesired() float64

func (*Track) DownTrack

func (t *Track) DownTrack() *sfu.DownTrack

func (*Track) GetNackDelta

func (t *Track) GetNackDelta() (uint32, uint32)

func (*Track) GetNextHigherTransition

func (t *Track) GetNextHigherTransition(allowOvershoot bool) (sfu.VideoTransition, bool)

func (*Track) ID

func (t *Track) ID() livekit.TrackID

func (*Track) IsDeficient

func (t *Track) IsDeficient() bool

func (*Track) IsManaged

func (t *Track) IsManaged() bool

func (*Track) IsSubscribeMutable

func (t *Track) IsSubscribeMutable() bool

func (*Track) Pause

func (t *Track) Pause() sfu.VideoAllocation

func (*Track) Priority

func (t *Track) Priority() uint8

func (*Track) ProvisionalAllocate

func (t *Track) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) (bool, int64)

func (*Track) ProvisionalAllocateCommit

func (t *Track) ProvisionalAllocateCommit() sfu.VideoAllocation

func (*Track) ProvisionalAllocateGetBestWeightedTransition

func (t *Track) ProvisionalAllocateGetBestWeightedTransition() sfu.VideoTransition

func (*Track) ProvisionalAllocateGetCooperativeTransition

func (t *Track) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) sfu.VideoTransition

func (*Track) ProvisionalAllocatePrepare

func (t *Track) ProvisionalAllocatePrepare()

func (*Track) ProvisionalAllocateReset

func (t *Track) ProvisionalAllocateReset()

func (*Track) PublisherID

func (t *Track) PublisherID() livekit.ParticipantID

func (*Track) SetDirty

func (t *Track) SetDirty(isDirty bool) bool

func (*Track) SetMaxLayer

func (t *Track) SetMaxLayer(layer buffer.VideoLayer) bool

func (*Track) SetPriority

func (t *Track) SetPriority(priority uint8) bool

func (*Track) SetStreamState

func (t *Track) SetStreamState(streamState StreamState) bool

func (*Track) WritePaddingRTP

func (t *Track) WritePaddingRTP(bytesToSend int) int

type TrackSorter

type TrackSorter []*Track

func (TrackSorter) Len

func (t TrackSorter) Len() int

func (TrackSorter) Less

func (t TrackSorter) Less(i, j int) bool

func (TrackSorter) Swap

func (t TrackSorter) Swap(i, j int)

type TrendDetector

type TrendDetector struct {
	// contains filtered or unexported fields
}

func NewTrendDetector

func NewTrendDetector(params TrendDetectorParams) *TrendDetector

func (*TrendDetector) AddValue

func (t *TrendDetector) AddValue(value int64)

func (*TrendDetector) GetDirection

func (t *TrendDetector) GetDirection() TrendDirection

func (*TrendDetector) GetHighest

func (t *TrendDetector) GetHighest() int64

func (*TrendDetector) GetLowest

func (t *TrendDetector) GetLowest() int64

func (*TrendDetector) HasEnoughSamples

func (t *TrendDetector) HasEnoughSamples() bool

func (*TrendDetector) Seed

func (t *TrendDetector) Seed(value int64)

func (*TrendDetector) ToString

func (t *TrendDetector) ToString() string

type TrendDetectorParams

type TrendDetectorParams struct {
	Name                   string
	Logger                 logger.Logger
	RequiredSamples        int
	RequiredSamplesMin     int
	DownwardTrendThreshold float64
	DownwardTrendMaxWait   time.Duration
	CollapseThreshold      time.Duration
	ValidityWindow         time.Duration
}

type TrendDirection

type TrendDirection int
const (
	TrendDirectionNeutral TrendDirection = iota
	TrendDirectionUpward
	TrendDirectionDownward
)

func (TrendDirection) String

func (t TrendDirection) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL