ccutils

package
v1.8.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Design of Prober

Probing is used to check for existence of excess channel capacity. This is especially useful in the downstream direction of SFU. SFU forwards audio/video streams from one or more publishers to all the subscribers. But, the downstream channel of a subscriber may not be big enough to carry all the streams. It is also a time varying quantity.

When there is not enough capacity, some streams will be paused. To resume a stream, SFU would need to know that the channel has enough capacity. That's where probing comes in. When conditions are favorable, SFU can send probe packets so that the bandwidth estimator has more data to estimate available channel capacity better. NOTE: What defines `favorable conditions` is implementation dependent.

There are two options for probing

  • Use padding only RTP packets: This one is preferable as probe rate can be controlled more tightly.
  • Resume a paused stream or forward a higher spatial layer: Have to find a stream at probing rate. Also, a stream could get a key frame unexpectedly boosting rate in the probing window.

The strategy used depends on stream allocator implementation. This module can be used if the stream allocator decides to use padding only RTP packets for probing purposes.

Implementation: There are a couple of options

  • Check prober in the forwarding path (pull from prober). This is preferred for scalability reasons. But, this suffers from not being able to probe when all streams are paused (could be due to downstream bandwidth constraints or the corresponding upstream tracks may have paused due to upstream bandwidth constraints). Another issue is not being able to have tight control on probing window boundary as the packet forwarding path may not have a packet to forward. But, it should not be a major concern as long as some stream(s) is/are forwarded as there should be a packet at least every 60 ms or so (forwarding only one stream at 15 fps). Usually, it will be serviced much more frequently when there are multiple streams getting forwarded.
  • Run it a go routine. But, that would have to wake up very often to prevent bunching up of probe packets. So, a scalability concern as there is one prober per subscriber peer connection. But, probe windows should be very short (of the order of 100s of ms). So, this approach might be fine.

The implementation here follows the second approach of using a go routine.

Pacing: ------ Ideally, the subscriber peer connection should have a pacer which trickles data out at the estimated channel capacity rate (and estimated channel capacity + probing rate when actively probing).

But, there a few significant challenges

  1. Pacer will require buffering of forwarded packets. That means more memory, more CPU (have to make copy of packets) and more latency in the media stream.
  2. Scalability concern as SFU may be handling hundreds of subscriber peer connections and each one processing the pacing loop at 5ms interval will add up.

So, this module assumes that pacing is inherently provided by the publishers for media streams. That is a reasonable assumption given that publishing clients will run their own pacer and pacing data out at a steady rate.

A further assumption is that if there are multiple publishers for a subscriber peer connection, all the publishers are not pacing in sync, i.e. each publisher's pacer is completely independent and SFU will be receiving the media packets with a good spread and not clumped together.

Given those assumptions, this module monitors media send rate and adjusts probing packet sends accordingly. Although the probing may have a high enough wake up frequency, it is for short windows. For example, probing at 5 Mbps for 1/2 second and sending 1000 byte probe per iteration will wake up every 1.6 ms. That is very high, but should last for 1/2 second or so.

5 Mbps over 1/2 second = 2.5 Mbps
2.5 Mbps = 312500 bytes = 313 probes at 1000 byte probes
313 probes over 1/2 second = 1.6 ms between probes

A few things to note

  1. When a probe cluster is added, the expected media rate is provided. So, the wake-up interval takes that into account. For example, if probing at 5 Mbps for 1/2 second and if 4 Mbps of it is expected to be provided by media traffic, the wake-up interval becomes 8 ms.
  2. The amount of probing should actually be capped at some value to avoid too much self-induced congestion. It maybe something like 500 kbps. That will increase the wake-up interval to 16 ms in the above example.
  3. In practice, the probing interval may also be shorter. Typically, it can be run for 2 - 3 RTTs to get a good measurement. For the longest hauls, RTT could be 250 ms or so leading to the probing window being long(ish). But, RTT should be much shorter especially if the subscriber peer connection of the client is able to connect to the nearest data center.

Index

Constants

This section is empty.

Variables

View Source
var (
	DefaultProbeRegulatorConfig = ProbeRegulatorConfig{
		BaseInterval:  3 * time.Second,
		BackoffFactor: 1.5,
		MaxInterval:   2 * time.Minute,

		MinDuration:            200 * time.Millisecond,
		MaxDuration:            20 * time.Second,
		DurationIncreaseFactor: 1.5,
	}
)
View Source
var (
	ProbeClusterInfoInvalid = ProbeClusterInfo{Id: ProbeClusterIdInvalid}
)

Functions

This section is empty.

Types

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

func (*Cluster) Id

func (c *Cluster) Id() ProbeClusterId

func (*Cluster) Info

func (c *Cluster) Info() ProbeClusterInfo

func (*Cluster) MarkCompleted

func (c *Cluster) MarkCompleted(result ProbeClusterResult)

func (*Cluster) MarshalLogObject

func (c *Cluster) MarshalLogObject(e zapcore.ObjectEncoder) error

func (*Cluster) ProbesSent

func (c *Cluster) ProbesSent(bytesSent int)

func (*Cluster) Process

func (c *Cluster) Process() time.Duration

func (*Cluster) Start

func (c *Cluster) Start()

type ProbeClusterGoal

type ProbeClusterGoal struct {
	AvailableBandwidthBps int
	ExpectedUsageBps      int
	DesiredBps            int
	Duration              time.Duration
	DesiredBytes          int
}

func (ProbeClusterGoal) MarshalLogObject

func (p ProbeClusterGoal) MarshalLogObject(e zapcore.ObjectEncoder) error

type ProbeClusterId

type ProbeClusterId uint32
const (
	ProbeClusterIdInvalid ProbeClusterId = 0
)

type ProbeClusterInfo

type ProbeClusterInfo struct {
	Id        ProbeClusterId
	CreatedAt time.Time
	Goal      ProbeClusterGoal
	Result    ProbeClusterResult
}

func (ProbeClusterInfo) MarshalLogObject

func (p ProbeClusterInfo) MarshalLogObject(e zapcore.ObjectEncoder) error

type ProbeClusterMode

type ProbeClusterMode int
const (
	ProbeClusterModeUniform ProbeClusterMode = iota
	ProbeClusterModeLinearChirp
)

func (ProbeClusterMode) String

func (p ProbeClusterMode) String() string

type ProbeClusterResult

type ProbeClusterResult struct {
	StartTime              int64
	EndTime                int64
	PacketsProbe           int
	BytesProbe             int
	PacketsNonProbePrimary int
	BytesNonProbePrimary   int
	PacketsNonProbeRTX     int
	BytesNonProbeRTX       int
	IsCompleted            bool
}

func (ProbeClusterResult) Bitrate

func (p ProbeClusterResult) Bitrate() float64

func (ProbeClusterResult) Bytes

func (p ProbeClusterResult) Bytes() int

func (ProbeClusterResult) Duration

func (p ProbeClusterResult) Duration() time.Duration

func (ProbeClusterResult) MarshalLogObject

func (p ProbeClusterResult) MarshalLogObject(e zapcore.ObjectEncoder) error

type ProbeRegulator

type ProbeRegulator struct {
	// contains filtered or unexported fields
}

func NewProbeRegulator

func NewProbeRegulator(params ProbeRegulatorParams) *ProbeRegulator

func (*ProbeRegulator) CanProbe

func (p *ProbeRegulator) CanProbe() bool

func (*ProbeRegulator) ProbeDuration

func (p *ProbeRegulator) ProbeDuration() time.Duration

func (*ProbeRegulator) ProbeSignal

func (p *ProbeRegulator) ProbeSignal(probeSignal ProbeSignal, baseTime time.Time)

type ProbeRegulatorConfig

type ProbeRegulatorConfig struct {
	BaseInterval  time.Duration `yaml:"base_interval,omitempty"`
	BackoffFactor float64       `yaml:"backoff_factor,omitempty"`
	MaxInterval   time.Duration `yaml:"max_interval,omitempty"`

	MinDuration            time.Duration `yaml:"min_duration,omitempty"`
	MaxDuration            time.Duration `yaml:"max_duration,omitempty"`
	DurationIncreaseFactor float64       `yaml:"duration_increase_factor,omitempty"`
}

type ProbeRegulatorParams

type ProbeRegulatorParams struct {
	Config ProbeRegulatorConfig
	Logger logger.Logger
}

type ProbeSignal

type ProbeSignal int
const (
	ProbeSignalInconclusive ProbeSignal = iota
	ProbeSignalCongesting
	ProbeSignalNotCongesting
)

func (ProbeSignal) String

func (p ProbeSignal) String() string

type Prober

type Prober struct {
	// contains filtered or unexported fields
}

func NewProber

func NewProber(params ProberParams) *Prober

func (*Prober) AddCluster

func (p *Prober) AddCluster(mode ProbeClusterMode, pcg ProbeClusterGoal) ProbeClusterInfo

func (*Prober) ClusterDone

func (p *Prober) ClusterDone(info ProbeClusterInfo)

func (*Prober) GetActiveClusterId

func (p *Prober) GetActiveClusterId() ProbeClusterId

func (*Prober) IsRunning

func (p *Prober) IsRunning() bool

func (*Prober) ProbesSent

func (p *Prober) ProbesSent(bytesSent int)

func (*Prober) Reset

func (p *Prober) Reset(info ProbeClusterInfo)

type ProberListener

type ProberListener interface {
	OnProbeClusterSwitch(info ProbeClusterInfo)
	OnSendProbe(bytesToSend int)
}

type ProberParams

type ProberParams struct {
	Listener ProberListener
	Logger   logger.Logger
}

type TrendDetector

type TrendDetector[T trendDetectorNumber] struct {
	// contains filtered or unexported fields
}

func NewTrendDetector

func NewTrendDetector[T trendDetectorNumber](params TrendDetectorParams) *TrendDetector[T]

func (*TrendDetector[T]) AddValue

func (t *TrendDetector[T]) AddValue(value T)

func (*TrendDetector[T]) GetDirection

func (t *TrendDetector[T]) GetDirection() TrendDirection

func (*TrendDetector[T]) GetHighest

func (t *TrendDetector[T]) GetHighest() T

func (*TrendDetector[T]) GetLowest

func (t *TrendDetector[T]) GetLowest() T

func (*TrendDetector[T]) HasEnoughSamples

func (t *TrendDetector[T]) HasEnoughSamples() bool

func (*TrendDetector[T]) MarshalLogObject

func (t *TrendDetector[T]) MarshalLogObject(e zapcore.ObjectEncoder) error

func (*TrendDetector[T]) Seed

func (t *TrendDetector[T]) Seed(value T)

type TrendDetectorConfig

type TrendDetectorConfig struct {
	RequiredSamples        int           `yaml:"required_samples,omitempty"`
	RequiredSamplesMin     int           `yaml:"required_samples_min,omitempty"`
	DownwardTrendThreshold float64       `yaml:"downward_trend_threshold,omitempty"`
	DownwardTrendMaxWait   time.Duration `yaml:"downward_trend_max_wait,omitempty"`
	CollapseThreshold      time.Duration `yaml:"collapse_threshold,omitempty"`
	ValidityWindow         time.Duration `yaml:"validity_window,omitempty"`
}

type TrendDetectorParams

type TrendDetectorParams struct {
	Name   string
	Logger logger.Logger
	Config TrendDetectorConfig
}

type TrendDirection

type TrendDirection int
const (
	TrendDirectionInconclusive TrendDirection = iota
	TrendDirectionUpward
	TrendDirectionDownward
)

func (TrendDirection) String

func (t TrendDirection) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL