varopt

Version: v1.4.0
Published: Nov 29, 2023 | License: Apache-2.0 | Imports: 4 | Imported by: 1

README


VarOpt Sampling Algorithm

This is an implementation of VarOpt, an unbiased weighted sampling algorithm described in the paper Stream sampling for variance-optimal estimation of subset sums (2008) by Edith Cohen, Nick Duffield, Haim Kaplan, Carsten Lund, and Mikkel Thorup.

VarOpt is a reservoir-type sampler that maintains a fixed-size sample and provides a mechanism for merging unequal-weight samples.

This repository also includes a simple reservoir sampling algorithm, often useful in conjunction with weighted reservoir sampling, that implements Algorithm R from Random sampling with a reservoir (1985) by Jeffrey Vitter.

Usage: Natural Weights

A typical use of VarOpt sampling is to estimate network flows from sampled packets. In this use-case, the weight applied to each sample is the size of the packet. Because VarOpt computes an unbiased sample, sample data points can be summarized along secondary dimensions. For example, we can select the sampled packets that share a secondary attribute and sum their sample weights; the expected value of that sum equals the total size of the packets with that attribute in the original population.

See weighted_test.go for an example.
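
The subset-sum estimate described above can be sketched without the sampler itself. In the sketch, the weighted type and the subsetSum helper are hypothetical names, and the sample values are made up for illustration; with this package the pairs would come from Get(i).

```go
package main

import "fmt"

// weighted pairs a sampled item with its adjusted weight. The type is
// hypothetical; with this package the pairs would come from Get(i).
type weighted[T any] struct {
	item   T
	weight float64
}

// subsetSum estimates the total weight of the population items that
// match pred by summing the adjusted weights of the matching samples.
func subsetSum[T any](sample []weighted[T], pred func(T) bool) float64 {
	total := 0.0
	for _, s := range sample {
		if pred(s.item) {
			total += s.weight
		}
	}
	return total
}

func main() {
	// Illustrative values only, not output from a real sampler.
	sample := []weighted[string]{
		{"http", 1500}, {"udp", 900}, {"http", 900}, {"tcp", 900},
	}
	isHTTP := func(p string) bool { return p == "http" }
	fmt.Println(subsetSum(sample, isHTTP)) // 2400
}
```

The predicate can test any attribute of the item, which is what allows one sample to answer questions along several secondary dimensions.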

Usage: Inverse-probability Weights

Another use for VarOpt sampling uses inverse-probability weights to estimate frequencies while simultaneously controlling sample diversity. Suppose a sequence of observations can be naturally categorized into N different buckets. The goal in this case is to compute a sample where each bucket is well represented, while maintaining frequency estimates.

In this use-case, the weight assigned to each observation is the inverse probability of the bucket it belongs to. The result of weighted sampling with inverse-probability weights is a uniform expected value; in this example we expect an equal number of observations to fall into each bucket. Each observation represents a frequency equal to its sample weight (computed by VarOpt) divided by its original weight (the inverse probability).

See frequency_test.go for an example.
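
To make the arithmetic concrete, here is a small sketch of how inverse-probability weights could be derived from bucket counts, and how one sampled observation is turned back into a frequency. The helpers inverseProbWeights and effectiveCount are hypothetical names, and the adjusted weight of 50 is an invented value.

```go
package main

import "fmt"

// inverseProbWeights returns, for each bucket, the weight 1/p where p is
// the fraction of all observations that fall in that bucket. Empty
// buckets are skipped because they have no observations to weight.
func inverseProbWeights(counts map[string]int) map[string]float64 {
	total := 0
	for _, c := range counts {
		total += c
	}
	weights := make(map[string]float64, len(counts))
	for bucket, c := range counts {
		if c > 0 {
			weights[bucket] = float64(total) / float64(c)
		}
	}
	return weights
}

// effectiveCount is the frequency one sampled observation represents:
// its adjusted weight divided by its original weight.
func effectiveCount(adjusted, original float64) float64 {
	return adjusted / original
}

func main() {
	counts := map[string]int{"common": 8, "rare": 2}
	w := inverseProbWeights(counts)
	fmt.Println(w["common"], w["rare"]) // 1.25 5

	// Each bucket contributes the same total weight (count * weight),
	// which is why the sample is expected to cover buckets evenly.
	fmt.Println(8*w["common"], 2*w["rare"]) // 10 10

	// Hypothetical adjusted weight of 50 for a "rare" observation.
	fmt.Println(effectiveCount(50, w["rare"])) // 10
}
```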

Usage: Merging Samples

VarOpt supports merging independently collected samples one observation at a time. This is useful for building distributed sampling schemes. In this use-case, each node in a distributed system computes a weighted sample. To combine samples, simply input all the observations and their corresponding weights into a new VarOpt sample.
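
A merge along these lines might look like the sketch below. The sampler interface only repeats the Size, Get and Add signatures documented on this page; mergeInto and listSampler are hypothetical, and the sketch assumes that the adjusted weight returned by Get is the weight to pass to the new sample.

```go
package main

import "fmt"

// sampler lists the methods this sketch relies on. Their signatures
// follow the Varopt methods documented on this page, so *varopt.Varopt[T]
// is expected to satisfy it.
type sampler[T any] interface {
	Size() int
	Get(i int) (T, float64)
	Add(item T, weight float64) (T, error)
}

// mergeInto is a hypothetical helper that feeds every observation of
// each source sample into dst. It assumes the adjusted weight returned
// by Get is the weight to pass on.
func mergeInto[T any](dst sampler[T], sources ...sampler[T]) error {
	for _, src := range sources {
		for i := 0; i < src.Size(); i++ {
			item, weight := src.Get(i)
			if _, err := dst.Add(item, weight); err != nil {
				return err
			}
		}
	}
	return nil
}

// listSampler is a stand-in that keeps everything, used only so the
// sketch runs without the real package.
type listSampler[T any] struct {
	items   []T
	weights []float64
}

func (l *listSampler[T]) Size() int { return len(l.items) }

func (l *listSampler[T]) Get(i int) (T, float64) { return l.items[i], l.weights[i] }

func (l *listSampler[T]) Add(item T, weight float64) (T, error) {
	var zero T
	l.items = append(l.items, item)
	l.weights = append(l.weights, weight)
	return zero, nil
}

func main() {
	a := &listSampler[string]{items: []string{"x"}, weights: []float64{3}}
	b := &listSampler[string]{items: []string{"y", "z"}, weights: []float64{1, 2}}
	merged := &listSampler[string]{}
	if err := mergeInto[string](merged, a, b); err != nil {
		fmt.Println("merge failed:", err)
		return
	}
	fmt.Println(merged.Size()) // 3
}
```

With the real package, dst would be a sampler created by New with the desired capacity, so the merged result stays bounded in size.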

Documentation

Overview

Package varopt contains an implementation of VarOpt, an unbiased weighted sampling algorithm described in the paper "Stream sampling for variance-optimal estimation of subset sums" https://arxiv.org/pdf/0803.0473.pdf (2008), by Edith Cohen, Nick Duffield, Haim Kaplan, Carsten Lund, and Mikkel Thorup.

VarOpt is a reservoir-type sampler that maintains a fixed-size sample and provides a mechanism for merging unequal-weight samples.

This package also includes a simple reservoir sampling algorithm, often useful in conjunction with weighted reservoir sampling, using Algorithm R from "Random sampling with a reservoir", https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_R (1985), by Jeffrey Vitter.

See https://github.com/lightstep/varopt/blob/master/README.md for more detail.


Constants

This section is empty.

Variables

var ErrInvalidWeight = fmt.Errorf("Negative, Zero, Inf or NaN weight")

Functions

This section is empty.

Types

type Varopt

type Varopt[T any] struct {

	// Large-weight items stored in a min-heap.
	L internal.SampleHeap[T]

	// Light-weight items.
	T []internal.Vsample[T]

	// Temporary buffer.
	X []internal.Vsample[T]
	// contains filtered or unexported fields
}

Varopt implements the algorithm from "Stream sampling for variance-optimal estimation of subset sums" (2008) by Edith Cohen, Nick Duffield, Haim Kaplan, Carsten Lund, and Mikkel Thorup.

https://arxiv.org/pdf/0803.0473.pdf

func New

func New[T any](capacity int, rnd *rand.Rand) *Varopt[T]

New returns a new Varopt sampler with given capacity (i.e., reservoir size) and random number generator.

Example
package main

import (
	"fmt"
	"math"
	"math/rand"

	"github.com/lightstep/varopt"
)

type packet struct {
	size     int
	color    string
	protocol string
}

func main() {
	const totalPackets = 1e6
	const sampleRatio = 0.01

	colors := []string{"red", "green", "blue"}
	protocols := []string{"http", "tcp", "udp"}

	sizeByColor := map[string]int{}
	sizeByProtocol := map[string]int{}
	trueTotalWeight := 0.0

	rnd := rand.New(rand.NewSource(32491))
	sampler := varopt.New[packet](totalPackets*sampleRatio, rnd)

	for i := 0; i < totalPackets; i++ {
		packet := packet{
			size:     1 + rnd.Intn(100000),
			color:    colors[rnd.Intn(len(colors))],
			protocol: protocols[rnd.Intn(len(protocols))],
		}

		sizeByColor[packet.color] += packet.size
		sizeByProtocol[packet.protocol] += packet.size
		trueTotalWeight += float64(packet.size)

		sampler.Add(packet, float64(packet.size))
	}

	estSizeByColor := map[string]float64{}
	estSizeByProtocol := map[string]float64{}
	estTotalWeight := 0.0

	for i := 0; i < sampler.Size(); i++ {
		packet, weight := sampler.Get(i)
		estSizeByColor[packet.color] += weight
		estSizeByProtocol[packet.protocol] += weight
		estTotalWeight += weight
	}

	// Compute mean absolute percentage error for colors
	colorMape := 0.0
	for _, c := range colors {
		colorMape += math.Abs(float64(sizeByColor[c])-estSizeByColor[c]) / float64(sizeByColor[c])
	}
	colorMape /= float64(len(colors))

	// Compute mean absolute percentage error for protocols
	protocolMape := 0.0
	for _, p := range protocols {
		protocolMape += math.Abs(float64(sizeByProtocol[p])-estSizeByProtocol[p]) / float64(sizeByProtocol[p])
	}
	protocolMape /= float64(len(protocols))

	// Compute total sum error percentage
	fmt.Printf("Total sum error %.2g%%\n", 100*math.Abs(estTotalWeight-trueTotalWeight)/trueTotalWeight)
	fmt.Printf("Color mean absolute percentage error %.2f%%\n", 100*colorMape)
	fmt.Printf("Protocol mean absolute percentage error %.2f%%\n", 100*protocolMape)

}
Output:

Total sum error 2.4e-11%
Color mean absolute percentage error 0.73%
Protocol mean absolute percentage error 1.62%

func (*Varopt[T]) Add

func (s *Varopt[T]) Add(item T, weight float64) (T, error)

Add considers a new observation for the sample with given weight. If there is an item ejected from the sample as a result, the item is returned to allow re-use of memory.

An error (ErrInvalidWeight) is returned if the weight is negative, zero, infinite, or NaN.

func (*Varopt[T]) Capacity

func (s *Varopt[T]) Capacity() int

Capacity returns the size of the reservoir. This is the maximum size of the sample.

func (*Varopt[T]) CopyFrom added in v1.4.0

func (s *Varopt[T]) CopyFrom(from *Varopt[T])

CopyFrom copies the fields of `from` into this Varopt[T].

func (*Varopt[T]) Get

func (s *Varopt[T]) Get(i int) (T, float64)

Get() returns the i'th sample and its adjusted weight. To obtain the sample's original weight (i.e. what was passed to Add), use GetOriginalWeight(i).

func (*Varopt[T]) GetOriginalWeight

func (s *Varopt[T]) GetOriginalWeight(i int) float64

GetOriginalWeight returns the original input weight of the sample item that was passed to Add(). This can be useful for computing a frequency from the adjusted sample weight.

Example

This example shows how to use Varopt sampling to estimate frequencies with the use of inverse probability weights. The use of inverse probability creates a uniform expected value, in this case of the number of sample points per second.

While the number of expected points per second is uniform, the output sample weights are expected to match the original frequencies.

package main

import (
	"fmt"
	"math"
	"math/rand"

	"github.com/lightstep/varopt"
)

type curve struct {
	color  string
	mean   float64
	stddev float64
}

type testPoint struct {
	color  int
	xvalue float64
}

var colors = []curve{
	{color: "red", mean: 10, stddev: 15},
	{color: "green", mean: 30, stddev: 10},
	{color: "blue", mean: 50, stddev: 20},
}

// This example shows how to use Varopt sampling to estimate
// frequencies with the use of inverse probability weights.  The use
// of inverse probability creates a uniform expected value, in this case of
// the number of sample points per second.
//
// While the number of expected points per second is uniform, the
// output sample weights are expected to match the original
// frequencies.
func main() {
	// Number of points.
	const totalCount = 1e6

	// Relative size of the sample.
	const sampleRatio = 0.01

	// Ensure this test is deterministic.
	rnd := rand.New(rand.NewSource(104729))

	// Construct a timeseries consisting of three colored signals,
	// for x=0 to x=60 seconds.
	var points []testPoint

	// origCounts stores the original signals at second granularity.
	origCounts := make([][]int, len(colors))
	for i := range colors {
		origCounts[i] = make([]int, 60)
	}

	// Construct the signals by choosing a random color, then
	// using its Gaussian to compute a timestamp.
	for len(points) < totalCount {
		choose := rnd.Intn(len(colors))
		series := colors[choose]
		xvalue := rnd.NormFloat64()*series.stddev + series.mean

		if xvalue < 0 || xvalue > 60 {
			continue
		}
		origCounts[choose][int(math.Floor(xvalue))]++
		points = append(points, testPoint{
			color:  choose,
			xvalue: xvalue,
		})
	}

	// Compute the total number of points per second.  This will be
	// used to establish the per-second probability.
	xcount := make([]int, 60)
	for _, point := range points {
		xcount[int(math.Floor(point.xvalue))]++
	}

	// Compute the sample using the inverse probability as a
	// weight.  This ensures a uniform distribution of points in each
	// second.
	sampleSize := int(sampleRatio * float64(totalCount))
	sampler := varopt.New[testPoint](sampleSize, rnd)
	for _, point := range points {
		second := int(math.Floor(point.xvalue))
		prob := float64(xcount[second]) / float64(totalCount)
		sampler.Add(point, 1/prob)
	}

	// sampleCounts stores the reconstructed signals.
	sampleCounts := make([][]float64, len(colors))
	for i := range colors {
		sampleCounts[i] = make([]float64, 60)
	}

	// pointCounts stores the number of points per second.
	pointCounts := make([]int, 60)

	// Reconstruct the signals using the output sample weights.
	// The effective count of each sample point is its output
	// weight divided by its original weight.
	for i := 0; i < sampler.Size(); i++ {
		point, weight := sampler.Get(i)
		origWeight := sampler.GetOriginalWeight(i)
		second := int(math.Floor(point.xvalue))
		sampleCounts[point.color][second] += (weight / origWeight)
		pointCounts[second]++
	}

	// Compute standard deviation of sample points per second.
	sum := 0.0
	mean := float64(sampleSize) / 60
	for s := 0; s < 60; s++ {
		e := float64(pointCounts[s]) - mean
		sum += e * e
	}
	stddev := math.Sqrt(sum / (60 - 1))

	fmt.Printf("Samples per second mean %.2f\n", mean)
	fmt.Printf("Samples per second standard deviation %.2f\n", stddev)

	// Compute mean absolute percentage error between sampleCounts
	// and origCounts for each signal.
	for c := range colors {
		mae := 0.0
		for s := 0; s < 60; s++ {
			mae += math.Abs(sampleCounts[c][s]-float64(origCounts[c][s])) / float64(origCounts[c][s])
		}
		mae /= 60
		fmt.Printf("Mean absolute percentage error (%s) = %.2f%%\n", colors[c].color, mae*100)
	}

}
Output:

Samples per second mean 166.67
Samples per second standard deviation 13.75
Mean absolute percentage error (red) = 25.16%
Mean absolute percentage error (green) = 14.30%
Mean absolute percentage error (blue) = 14.23%

func (*Varopt[T]) Init added in v1.4.0

func (v *Varopt[T]) Init(capacity int, rnd *rand.Rand)

Init initializes a Varopt[T] in-place, avoiding an allocation compared with New().

func (*Varopt[T]) Reset added in v1.2.0

func (s *Varopt[T]) Reset()

Reset returns the sampler to its initial state, maintaining its capacity and random number source.

func (*Varopt[T]) Size

func (s *Varopt[T]) Size() int

Size returns the current number of items in the sample. If the reservoir is full, this returns Capacity().

func (*Varopt[T]) Tau

func (s *Varopt[T]) Tau() float64

Tau returns the current large-weight threshold. Weights larger than Tau() carry their exact weight in the sample. See the VarOpt paper for details.
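
For intuition about the threshold, the sketch below computes it in batch form: given all weights and a sample size k, it finds the tau for which the inclusion probabilities min(1, w/tau) sum to k. This is an illustration of the definition, not the package's streaming computation, and this page does not state that Tau() equals this value at every point in a stream. The names threshold and adjusted are hypothetical, and the weights are assumed to be positive.

```go
package main

import (
	"fmt"
	"sort"
)

// threshold is a batch sketch (not the package's streaming code): for a
// full list of positive weights and sample size k it finds tau such that
// the inclusion probabilities min(1, w/tau) sum to k. It returns 0 when
// everything fits in the sample.
func threshold(weights []float64, k int) float64 {
	if k <= 0 || len(weights) <= k {
		return 0
	}
	w := append([]float64(nil), weights...)
	sort.Sort(sort.Reverse(sort.Float64Slice(w)))

	rest := 0.0 // total weight not yet classified as large
	for _, x := range w {
		rest += x
	}
	for large := 0; large < k; large++ {
		tau := rest / float64(k-large)
		if w[large] <= tau {
			return tau
		}
		// w[large] exceeds tau, so it is kept with its exact weight.
		rest -= w[large]
	}
	return 0 // not reached for positive weights
}

// adjusted is the weight an item carries if it ends up in the sample.
func adjusted(w, tau float64) float64 {
	if w > tau {
		return w
	}
	return tau
}

func main() {
	weights := []float64{10, 1, 1, 1, 1}
	tau := threshold(weights, 2)
	fmt.Println(tau)               // 4
	fmt.Println(adjusted(10, tau)) // 10
	fmt.Println(adjusted(1, tau))  // 4
}
```

In this example a sample of size 2 holds the large item at weight 10 and one light item at weight 4, which sums to 14, the total input weight.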

func (*Varopt[T]) TotalCount

func (s *Varopt[T]) TotalCount() int

TotalCount returns the number of calls to Add().

func (*Varopt[T]) TotalWeight

func (s *Varopt[T]) TotalWeight() float64

TotalWeight returns the sum of weights that were passed to Add().

Directories

Path Synopsis
Package simple implements an unweighted reservoir sampling algorithm.
