limit

package
v2.0.9
Warning

This package is not in the latest version of its module.

Published: Jun 4, 2024 License: MIT Imports: 5 Imported by: 0

README

Limit discipline

Purpose

Limits the speed at which data elements pass from the input channel to the output channel.

The speed limit is set by the Rate structure: the Quantity field specifies the number of data elements allowed to pass during the time interval specified in the Interval field.

Peculiarities

A speed of 1000 data elements per second is the same speed as 1 data element per millisecond, only expressed in different units of measurement.

However, the units of measurement affect both how the data elements written to the output channel are distributed over time and the performance of the discipline.

If the speed is specified as 1000 data elements per second, 1000 data elements are first written to the output channel, followed by a pause equal to 1 second minus the time spent writing those 1000 data elements.

If the speed is specified as 1 data element per millisecond, 1 data element is first written to the output channel, followed by a pause equal to 1 millisecond minus the time spent writing that data element.

In the second case the performance of the discipline is lower, but the data elements are distributed more uniformly over time.

Thus, by choosing the units of measurement you can trade the uniformity of the distribution of data elements over time against performance (the maximum achievable speed).

Based on measurements, specifying a time interval of less than 10 milliseconds significantly reduces performance and accuracy.

Usage

Example:

package main

import (
    "fmt"
    "time"

    "github.com/akramarenkov/cqos/v2/limit"
)

func main() {
    quantity := 10

    input := make(chan int)

    opts := limit.Opts[int]{
        Input: input,
        Limit: limit.Rate{
            Interval: time.Second,
            Quantity: 1,
        },
    }

    discipline, err := limit.New(opts)
    if err != nil {
        panic(err)
    }

    outSequence := make([]int, 0, quantity)

    startedAt := time.Now()

    go func() {
        defer close(input)

        for stage := 1; stage <= quantity; stage++ {
            input <- stage
        }
    }()

    for item := range discipline.Output() {
        outSequence = append(outSequence, item)
    }

    duration := time.Since(startedAt)
    expected := (time.Duration(quantity) * opts.Limit.Interval) / time.Duration(opts.Limit.Quantity)
    deviation := 0.01

    fmt.Println(duration <= time.Duration(float64(expected)*(1.0+deviation)))
    fmt.Println(duration >= time.Duration(float64(expected)*(1.0-deviation)))
    fmt.Println(outSequence)
    // Output:
    // true
    // true
    // [1 2 3 4 5 6 7 8 9 10]
}

Documentation

Overview

Discipline that limits the speed at which data elements pass from the input channel to the output channel.


Constants

This section is empty.

Variables

var (
	ErrConvertedIntervalZero   = errors.New("converted interval is zero")
	ErrConvertedQuantityZero   = errors.New("converted quantity is zero")
	ErrIntervalZeroNegative    = errors.New("interval is zero or negative")
	ErrMinimumIntervalNegative = errors.New("minimum interval is negative")
	ErrQuantityZero            = errors.New("quantity is zero")
)
var (
	ErrInputEmpty = errors.New("input channel was not specified")
)

Functions

This section is empty.

Types

type Discipline

type Discipline[Type any] struct {
	// contains filtered or unexported fields
}

Limit discipline.

Example
package main

import (
	"fmt"
	"time"

	"github.com/akramarenkov/cqos/v2/limit"
)

func main() {
	quantity := 10

	input := make(chan int)

	opts := limit.Opts[int]{
		Input: input,
		Limit: limit.Rate{
			Interval: time.Second,
			Quantity: 1,
		},
	}

	discipline, err := limit.New(opts)
	if err != nil {
		panic(err)
	}

	outSequence := make([]int, 0, quantity)

	startedAt := time.Now()

	go func() {
		defer close(input)

		for stage := 1; stage <= quantity; stage++ {
			input <- stage
		}
	}()

	for item := range discipline.Output() {
		outSequence = append(outSequence, item)
	}

	duration := time.Since(startedAt)
	expected := (time.Duration(quantity) * opts.Limit.Interval) / time.Duration(opts.Limit.Quantity)
	deviation := 0.01

	fmt.Println(duration <= time.Duration(float64(expected)*(1.0+deviation)))
	fmt.Println(duration >= time.Duration(float64(expected)*(1.0-deviation)))
	fmt.Println(outSequence)
}
Output:

true
true
[1 2 3 4 5 6 7 8 9 10]

func New

func New[Type any](opts Opts[Type]) (*Discipline[Type], error)

Creates and runs the discipline.

func (*Discipline[Type]) Output

func (dsc *Discipline[Type]) Output() <-chan Type

Returns the output channel.

The channel is closed when the discipline has terminated.

type Opts

type Opts[Type any] struct {
	// Input data channel. To terminate the discipline, it is necessary and
	// sufficient to close the input channel
	Input <-chan Type
	// Rate limit
	Limit Rate
}

Options of the created discipline.

type Rate

type Rate struct {
	Interval time.Duration
	Quantity uint64
}

A speed of Quantity data elements per time Interval.

func (Rate) Flatten

func (rate Rate) Flatten() (Rate, error)

Recalculates the units of measurement of the Interval so that the Quantity is equal to 1.

Maximizes the uniformity of the distribution of data elements over time at the cost of reduced performance of the discipline.

func (Rate) IsValid

func (rate Rate) IsValid() error

Validates field values.

Interval cannot be negative or equal to zero.

Quantity cannot be equal to zero.

func (Rate) Optimize

func (rate Rate) Optimize() (Rate, error)

Recalculates the units of measurement of the Interval so that the Quantity is as small as possible while the Interval is not less than the recommended value.

Increases the uniformity of the distribution of data elements over time with almost no reduction in the performance of the discipline.

Directories

Path                 Synopsis
internal/research    Internal package with research functions that are used for testing.
