Documentation ¶
Overview ¶
Design of Prober
Probing is used to check for the existence of excess channel capacity. This is especially useful in the downstream direction of an SFU. The SFU forwards audio/video streams from one or more publishers to all the subscribers. But the downstream channel of a subscriber may not be big enough to carry all the streams. Channel capacity is also a time-varying quantity.
When there is not enough capacity, some streams will be paused. To resume a stream, the SFU needs to know that the channel has enough capacity. That is where probing comes in. When conditions are favorable, the SFU can send probe packets so that the bandwidth estimator has more data to better estimate the available channel capacity. NOTE: What constitutes `favorable conditions` is implementation dependent.
There are two options for probing:
- Use padding-only RTP packets: This is preferable as the probe rate can be controlled more tightly.
- Resume a paused stream or forward a higher spatial layer: A stream has to be found at approximately the probing rate. Also, a stream could get a key frame unexpectedly, boosting the rate within the probing window.
The strategy used depends on the stream allocator implementation. This module can be used if the stream allocator decides to use padding-only RTP packets for probing purposes.
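For reference, a padding-only RTP packet (RFC 3550) has the padding (P) bit set, no payload, and padding bytes whose last octet records the padding length (so at most 255 padding bytes per packet). Below is a hand-rolled sketch; the header values are placeholders, and a real sender would reuse the sequence number space, SSRC and payload type of an existing stream (or a library helper) rather than this hypothetical function.

import "encoding/binary"

// buildPaddingOnlyPacket builds a raw RTP packet carrying only padding bytes.
// Header field values here are placeholders for illustration.
func buildPaddingOnlyPacket(seq uint16, ts, ssrc uint32, paddingLen byte) []byte {
	const headerLen = 12
	pkt := make([]byte, headerLen+int(paddingLen))
	pkt[0] = 0x80 | 0x20 // version 2, padding bit set
	pkt[1] = 96          // payload type (placeholder)
	binary.BigEndian.PutUint16(pkt[2:4], seq)
	binary.BigEndian.PutUint32(pkt[4:8], ts)
	binary.BigEndian.PutUint32(pkt[8:12], ssrc)
	pkt[len(pkt)-1] = paddingLen // last padding octet holds the padding length
	return pkt
}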
Implementation: There are a couple of options:
- Check the prober in the forwarding path (pull from the prober). This is preferred for scalability reasons. But it cannot probe when all streams are paused (whether due to downstream bandwidth constraints or because the corresponding upstream tracks are paused due to upstream bandwidth constraints). Another issue is the lack of tight control over probing window boundaries, as the packet forwarding path may not have a packet to forward at the required time. That should not be a major concern as long as some stream(s) is/are being forwarded, since there should be a packet at least every 60 ms or so (forwarding only one stream at 15 fps). Usually, the prober will be serviced much more frequently when multiple streams are being forwarded.
- Run it in a goroutine. But that goroutine would have to wake up very often to prevent probe packets from bunching up. So there is a scalability concern, as there is one prober per subscriber peer connection. But probe windows should be very short (on the order of 100s of milliseconds), so this approach should be fine.
The implementation here follows the second approach of running a goroutine.
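As a rough illustration of that approach (a hypothetical sketch, not the actual Prober implementation, which additionally tracks clusters, media traffic and completion), the goroutine boils down to a loop like this:

import (
	"context"
	"time"
)

// runProbeLoop is a simplified sketch of a goroutine-based prober: it wakes up
// at a fixed interval until the probe window ends and asks the sender to emit
// one probe's worth of padding. All parameter names are illustrative.
func runProbeLoop(ctx context.Context, window, interval time.Duration, probeBytes int, sendProbe func(bytes int)) {
	deadline := time.Now().Add(window)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for time.Now().Before(deadline) {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			sendProbe(probeBytes) // e.g. emit padding-only RTP packets
		}
	}
}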
Pacing: Ideally, the subscriber peer connection should have a pacer which trickles data out at the estimated channel capacity rate (and at estimated channel capacity + probing rate when actively probing).
But there are a few significant challenges:
- A pacer requires buffering of forwarded packets. That means more memory, more CPU (packets have to be copied) and more latency in the media stream.
- There is a scalability concern, as the SFU may be handling hundreds of subscriber peer connections, and each one running a pacing loop at a 5 ms interval adds up.
So, this module assumes that pacing is inherently provided by the publishers for media streams. That is a reasonable assumption given that publishing clients run their own pacers and pace data out at a steady rate.
A further assumption is that if there are multiple publishers for a subscriber peer connection, the publishers are not pacing in sync, i.e. each publisher's pacer is completely independent and the SFU will receive the media packets with a good spread rather than clumped together.
Given those assumptions, this module monitors the media send rate and adjusts probe packet sends accordingly. Although probing may require a high wake-up frequency, it runs only for short windows. For example, probing at 5 Mbps for 1/2 second while sending a 1000-byte probe per iteration means waking up every 1.6 ms. That is very frequent, but it only lasts for 1/2 second or so.
5 Mbps over 1/2 second = 2.5 Mb
2.5 Mb = 312500 bytes = 313 probes at 1000 bytes per probe
313 probes over 1/2 second = 1.6 ms between probes
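The same arithmetic, as a small hypothetical helper (not part of this package):

import "time"

// probeInterval returns the time between probe sends needed to achieve
// probeRateBps using probes of probeBytes each.
func probeInterval(probeRateBps, probeBytes int64) time.Duration {
	return time.Duration(probeBytes * 8 * int64(time.Second) / probeRateBps)
}

// probeInterval(5_000_000, 1000) == 1600 * time.Microsecond (1.6 ms)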
A few things to note
- When a probe cluster is added, the expected media rate is provided, and the wake-up interval takes that into account. For example, if probing at 5 Mbps for 1/2 second and 4 Mbps of that is expected to be provided by media traffic, the wake-up interval becomes 8 ms.
- The amount of probing should be capped at some value to avoid too much self-induced congestion. The cap may be something like 500 kbps, which would increase the wake-up interval to 16 ms in the above example (see the sketch after this list).
- In practice, the probing window may also be shorter. Typically, it can be run for 2 - 3 RTTs to get a good measurement. For the longest hauls, RTT could be 250 ms or so, leading to the probing window being long(ish). But RTT should be much shorter, especially if the subscriber peer connection of the client is able to connect to the nearest data center.
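Extending the earlier hypothetical helper to account for the expected media rate and a cap on the padding-only rate (names and the cap value are illustrative, not part of this package):

import "time"

// probeWakeupInterval derives the wake-up interval from the part of the probe
// goal that padding must supply: goal minus expected media, capped at
// maxProbeOnlyBps. Assumes goalBps > expectedMediaBps.
func probeWakeupInterval(goalBps, expectedMediaBps, maxProbeOnlyBps, probeBytes int64) time.Duration {
	probeOnlyBps := goalBps - expectedMediaBps
	if probeOnlyBps > maxProbeOnlyBps {
		probeOnlyBps = maxProbeOnlyBps
	}
	return time.Duration(probeBytes * 8 * int64(time.Second) / probeOnlyBps)
}

// probeWakeupInterval(5_000_000, 4_000_000, 1_000_000, 1000) == 8 * time.Millisecond
// probeWakeupInterval(5_000_000, 4_000_000, 500_000, 1000)   == 16 * time.Millisecond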
Index ¶
- Variables
- type Cluster
- func (c *Cluster) Id() ProbeClusterId
- func (c *Cluster) Info() ProbeClusterInfo
- func (c *Cluster) MarkCompleted(result ProbeClusterResult)
- func (c *Cluster) MarshalLogObject(e zapcore.ObjectEncoder) error
- func (c *Cluster) ProbesSent(bytesSent int)
- func (c *Cluster) Process() time.Duration
- func (c *Cluster) Start()
- type ProbeClusterGoal
- type ProbeClusterId
- type ProbeClusterInfo
- type ProbeClusterMode
- type ProbeClusterResult
- type ProbeRegulator
- type ProbeRegulatorConfig
- type ProbeRegulatorParams
- type ProbeSignal
- type Prober
- func (p *Prober) AddCluster(mode ProbeClusterMode, pcg ProbeClusterGoal) ProbeClusterInfo
- func (p *Prober) ClusterDone(info ProbeClusterInfo)
- func (p *Prober) GetActiveClusterId() ProbeClusterId
- func (p *Prober) IsRunning() bool
- func (p *Prober) ProbesSent(bytesSent int)
- func (p *Prober) Reset(info ProbeClusterInfo)
- type ProberListener
- type ProberParams
- type TrendDetector
- func (t *TrendDetector[T]) AddValue(value T)
- func (t *TrendDetector[T]) GetDirection() TrendDirection
- func (t *TrendDetector[T]) GetHighest() T
- func (t *TrendDetector[T]) GetLowest() T
- func (t *TrendDetector[T]) HasEnoughSamples() bool
- func (t *TrendDetector[T]) MarshalLogObject(e zapcore.ObjectEncoder) error
- func (t *TrendDetector[T]) Seed(value T)
- type TrendDetectorConfig
- type TrendDetectorParams
- type TrendDirection
Constants ¶
This section is empty.
Variables ¶
var (
	DefaultProbeRegulatorConfig = ProbeRegulatorConfig{
		BaseInterval:           3 * time.Second,
		BackoffFactor:          1.5,
		MaxInterval:            2 * time.Minute,
		MinDuration:            200 * time.Millisecond,
		MaxDuration:            20 * time.Second,
		DurationIncreaseFactor: 1.5,
	}
)
var (
ProbeClusterInfoInvalid = ProbeClusterInfo{Id: ProbeClusterIdInvalid}
)
Functions ¶
This section is empty.
Types ¶
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
func (*Cluster) Id ¶
func (c *Cluster) Id() ProbeClusterId
func (*Cluster) Info ¶
func (c *Cluster) Info() ProbeClusterInfo
func (*Cluster) MarkCompleted ¶
func (c *Cluster) MarkCompleted(result ProbeClusterResult)
func (*Cluster) MarshalLogObject ¶
func (c *Cluster) MarshalLogObject(e zapcore.ObjectEncoder) error
func (*Cluster) ProbesSent ¶
func (c *Cluster) ProbesSent(bytesSent int)
func (*Cluster) Process ¶
func (c *Cluster) Process() time.Duration
func (*Cluster) Start ¶
func (c *Cluster) Start()
type ProbeClusterGoal ¶
type ProbeClusterGoal struct {
	AvailableBandwidthBps int
	ExpectedUsageBps      int
	DesiredBps            int
	Duration              time.Duration
	DesiredBytes          int
}
func (ProbeClusterGoal) MarshalLogObject ¶
func (p ProbeClusterGoal) MarshalLogObject(e zapcore.ObjectEncoder) error
type ProbeClusterInfo ¶
type ProbeClusterInfo struct {
	Id        ProbeClusterId
	CreatedAt time.Time
	Goal      ProbeClusterGoal
	Result    ProbeClusterResult
}
func (ProbeClusterInfo) MarshalLogObject ¶
func (p ProbeClusterInfo) MarshalLogObject(e zapcore.ObjectEncoder) error
type ProbeClusterMode ¶
type ProbeClusterMode int
const (
	ProbeClusterModeUniform ProbeClusterMode = iota
	ProbeClusterModeLinearChirp
)
func (ProbeClusterMode) String ¶
func (p ProbeClusterMode) String() string
type ProbeClusterResult ¶
type ProbeClusterResult struct {
	StartTime              int64
	EndTime                int64
	PacketsProbe           int
	BytesProbe             int
	PacketsNonProbePrimary int
	BytesNonProbePrimary   int
	PacketsNonProbeRTX     int
	BytesNonProbeRTX       int
	IsCompleted            bool
}
func (ProbeClusterResult) Bitrate ¶
func (p ProbeClusterResult) Bitrate() float64
func (ProbeClusterResult) Bytes ¶
func (p ProbeClusterResult) Bytes() int
func (ProbeClusterResult) Duration ¶
func (p ProbeClusterResult) Duration() time.Duration
func (ProbeClusterResult) MarshalLogObject ¶
func (p ProbeClusterResult) MarshalLogObject(e zapcore.ObjectEncoder) error
type ProbeRegulator ¶
type ProbeRegulator struct {
// contains filtered or unexported fields
}
func NewProbeRegulator ¶
func NewProbeRegulator(params ProbeRegulatorParams) *ProbeRegulator
func (*ProbeRegulator) CanProbe ¶
func (p *ProbeRegulator) CanProbe() bool
func (*ProbeRegulator) ProbeDuration ¶
func (p *ProbeRegulator) ProbeDuration() time.Duration
func (*ProbeRegulator) ProbeSignal ¶
func (p *ProbeRegulator) ProbeSignal(probeSignal ProbeSignal, baseTime time.Time)
type ProbeRegulatorConfig ¶
type ProbeRegulatorConfig struct {
	BaseInterval           time.Duration `yaml:"base_interval,omitempty"`
	BackoffFactor          float64       `yaml:"backoff_factor,omitempty"`
	MaxInterval            time.Duration `yaml:"max_interval,omitempty"`
	MinDuration            time.Duration `yaml:"min_duration,omitempty"`
	MaxDuration            time.Duration `yaml:"max_duration,omitempty"`
	DurationIncreaseFactor float64       `yaml:"duration_increase_factor,omitempty"`
}
type ProbeRegulatorParams ¶
type ProbeRegulatorParams struct {
	Config ProbeRegulatorConfig
	Logger logger.Logger
}
type ProbeSignal ¶
type ProbeSignal int
const (
	ProbeSignalInconclusive ProbeSignal = iota
	ProbeSignalCongesting
	ProbeSignalNotCongesting
)
func (ProbeSignal) String ¶
func (p ProbeSignal) String() string
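A hedged usage sketch of the regulator, written as if in the same package (to avoid guessing import paths). A regulator would be constructed with NewProbeRegulator(ProbeRegulatorParams{Config: DefaultProbeRegulatorConfig, Logger: ...}); the interpretation of the ProbeSignal feedback below is inferred from the config field names (backoff factor, duration increase factor), not documented behavior.

import "time"

// regulateProbing is a sketch of gating probe attempts with a ProbeRegulator.
func regulateProbing(reg *ProbeRegulator) {
	if !reg.CanProbe() {
		return // not allowed to probe yet (semantics assumed: backoff still in effect)
	}
	probeDuration := reg.ProbeDuration() // suggested length of the next probe cluster
	_ = probeDuration
	// ... run a probe cluster for probeDuration ...
	// Report the outcome; presumably a congesting signal backs the regulator off,
	// while a non-congesting signal allows probing sooner and for longer next time.
	reg.ProbeSignal(ProbeSignalNotCongesting, time.Now())
}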
type Prober ¶
type Prober struct {
// contains filtered or unexported fields
}
func NewProber ¶
func NewProber(params ProberParams) *Prober
func (*Prober) AddCluster ¶
func (p *Prober) AddCluster(mode ProbeClusterMode, pcg ProbeClusterGoal) ProbeClusterInfo
func (*Prober) ClusterDone ¶
func (p *Prober) ClusterDone(info ProbeClusterInfo)
func (*Prober) GetActiveClusterId ¶
func (p *Prober) GetActiveClusterId() ProbeClusterId
func (*Prober) IsRunning ¶
func (p *Prober) IsRunning() bool
func (*Prober) ProbesSent ¶
func (p *Prober) ProbesSent(bytesSent int)
func (*Prober) Reset ¶
func (p *Prober) Reset(info ProbeClusterInfo)
type ProberListener ¶
type ProberListener interface {
	OnProbeClusterSwitch(info ProbeClusterInfo)
	OnSendProbe(bytesToSend int)
}
type ProberParams ¶
type ProberParams struct {
	Listener ProberListener
	Logger   logger.Logger
}
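A hedged sketch of wiring up a Prober, written as if in the same package (import path and logger setup omitted to avoid guessing). The listener methods correspond to the ProberListener interface above, while the call sequence (ProbesSent from the send path, ClusterDone once the result is in) is an assumption based on the method names:

import "time"

// probeSender is a sketch of a ProberListener implementation.
type probeSender struct{}

func (s *probeSender) OnProbeClusterSwitch(info ProbeClusterInfo) {
	// a different probe cluster became active; reset any per-cluster send state
}

func (s *probeSender) OnSendProbe(bytesToSend int) {
	// send padding-only RTP packets totaling roughly bytesToSend on the
	// subscriber peer connection, then report back via Prober.ProbesSent
}

func startProbe() (*Prober, ProbeClusterInfo) {
	p := NewProber(ProberParams{
		Listener: &probeSender{},
		// Logger omitted for brevity; supply a logger.Logger in real use
	})
	info := p.AddCluster(ProbeClusterModeUniform, ProbeClusterGoal{
		AvailableBandwidthBps: 4_000_000, // current estimate (illustrative)
		ExpectedUsageBps:      4_000_000, // media expected during the window
		DesiredBps:            5_000_000, // rate to probe towards
		Duration:              500 * time.Millisecond,
	})
	// later: p.ProbesSent(n) from the send path as probe bytes go out,
	// and p.ClusterDone(info) once the cluster's result has been gathered.
	return p, info
}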
type TrendDetector ¶
type TrendDetector[T trendDetectorNumber] struct {
// contains filtered or unexported fields
}
func NewTrendDetector ¶
func NewTrendDetector[T trendDetectorNumber](params TrendDetectorParams) *TrendDetector[T]
func (*TrendDetector[T]) AddValue ¶
func (t *TrendDetector[T]) AddValue(value T)
func (*TrendDetector[T]) GetDirection ¶
func (t *TrendDetector[T]) GetDirection() TrendDirection
func (*TrendDetector[T]) GetHighest ¶
func (t *TrendDetector[T]) GetHighest() T
func (*TrendDetector[T]) GetLowest ¶
func (t *TrendDetector[T]) GetLowest() T
func (*TrendDetector[T]) HasEnoughSamples ¶
func (t *TrendDetector[T]) HasEnoughSamples() bool
func (*TrendDetector[T]) MarshalLogObject ¶
func (t *TrendDetector[T]) MarshalLogObject(e zapcore.ObjectEncoder) error
func (*TrendDetector[T]) Seed ¶
func (t *TrendDetector[T]) Seed(value T)
type TrendDetectorConfig ¶
type TrendDetectorConfig struct {
	RequiredSamples        int           `yaml:"required_samples,omitempty"`
	RequiredSamplesMin     int           `yaml:"required_samples_min,omitempty"`
	DownwardTrendThreshold float64       `yaml:"downward_trend_threshold,omitempty"`
	DownwardTrendMaxWait   time.Duration `yaml:"downward_trend_max_wait,omitempty"`
	CollapseThreshold      time.Duration `yaml:"collapse_threshold,omitempty"`
	ValidityWindow         time.Duration `yaml:"validity_window,omitempty"`
}
type TrendDetectorParams ¶
type TrendDetectorParams struct {
	Name   string
	Logger logger.Logger
	Config TrendDetectorConfig
}
type TrendDirection ¶
type TrendDirection int
const (
	TrendDirectionInconclusive TrendDirection = iota
	TrendDirectionUpward
	TrendDirectionDownward
)
func (TrendDirection) String ¶
func (t TrendDirection) String() string
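Finally, a hedged sketch of feeding a TrendDetector, written as if in the same package; the config values and the sample source (bitrate samples observed while probing) are purely illustrative:

import "time"

// trendExample sketches seeding a TrendDetector and checking the direction.
func trendExample() {
	td := NewTrendDetector[int64](TrendDetectorParams{
		Name: "probe-acked-bitrate", // illustrative name
		// Logger omitted for brevity; supply a logger.Logger in real use
		Config: TrendDetectorConfig{
			RequiredSamples:        12,
			RequiredSamplesMin:     8,
			DownwardTrendThreshold: -0.5,
			DownwardTrendMaxWait:   5 * time.Second,
			CollapseThreshold:      500 * time.Millisecond,
			ValidityWindow:         10 * time.Second,
		},
	})

	td.Seed(2_000_000) // optional starting point
	for _, bps := range []int64{2_050_000, 2_150_000, 2_300_000} {
		td.AddValue(bps) // e.g. estimate samples observed while probing
	}

	if td.HasEnoughSamples() && td.GetDirection() == TrendDirectionUpward {
		// values are trending upward; e.g. treat the probe as not congesting
	}
}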