priority

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 24, 2023 License: MIT Imports: 6 Imported by: 0

README

Priority discipline

Purpose

Used to distributes data among handlers according to priority

Also may be used to equaling distribution of data with different processing times

Principle of operation

  • Prioritization:

    Principle of operation, prioritization

  • Equaling:

    Principle of operation, equaling

Comparison with unmanaged distribution

If different times are spent processing data of different priorities, then we will get different processing speeds in the case of using the priority discipline and without it.

For example, suppose that data from channel of priority 3 is processed in time T, data from channel of priority 2 is processed in time 5*T, and data from channel of priority 1 is processed in time 10*T, then we will get the following results:

  • equaling by priority discipline:

    Equaling by priority discipline

  • unmanaged distribution:

    Unmanaged distribution

It can be seen that with unmanaged distribution, the processing speed of data with priority 3 is limited by the slowest processed data (with priority 1 and 2), but at with equaling by priority discipline the processing speed of data with priority 3 is no limited by others priorities

Usage

Example:

package main

import (
    "fmt"
    "strconv"
    "sync"

    "github.com/akramarenkov/cqos/priority"
    "github.com/akramarenkov/cqos/priority/divider"
    "github.com/akramarenkov/cqos/types"
)

func main() {
    handlersQuantity := 100
    // Preferably input channels should be buffered
    inputCapacity := 10
    itemsQuantity := 100

    inputs := map[uint]chan string{
        3: make(chan string, inputCapacity),
        2: make(chan string, inputCapacity),
        1: make(chan string, inputCapacity),
    }

    inputsOpts := map[uint]<-chan string{
        3: inputs[3],
        2: inputs[2],
        1: inputs[1],
    }

    defer func() {
        for _, input := range inputs {
            close(input)
        }
    }()

    // Data from input channels passed to handlers by output channel
    output := make(chan types.Prioritized[string])

    // Handlers must write priority of processed data to feedback channel
    feedback := make(chan uint)
    defer close(feedback)

    // Used only in this example for detect that all written data are processed
    measurements := make(chan bool)
    defer close(measurements)

    // For "Equaling" use "Fair" divider, for "Prioritization" use "Rate" divider
    disciplineOpts := priority.Opts[string]{
        Divider:          divider.Rate,
        Feedback:         feedback,
        HandlersQuantity: uint(handlersQuantity),
        Inputs:           inputsOpts,
        Output:           output,
    }

    discipline, err := priority.New(disciplineOpts)
    if err != nil {
        panic(err)
    }

    defer discipline.Stop()

    wg := &sync.WaitGroup{}
    defer wg.Wait()

    // Run "handlers", that process data
    for handler := 0; handler < handlersQuantity; handler++ {
        wg.Add(1)

        go func() {
            defer wg.Done()

            for prioritized := range output {
                // "Data processing"
                fmt.Println(prioritized.Item)
                measurements <- true

                feedback <- prioritized.Priority
            }
        }()
    }

    // Run "writers", that write data to input channels
    for priority, input := range inputs {
        wg.Add(1)

        go func(precedency uint, channel chan string) {
            defer wg.Done()

            base := strconv.Itoa(int(precedency))

            for id := 0; id < itemsQuantity; id++ {
                item := base + ":" + strconv.Itoa(id)
                
                channel <- item
            }
        }(priority, input)
    }

    defer close(output)

    received := 0

    // Wait for process all written data
    for range measurements {
        received++

        if received == itemsQuantity*len(inputs) {
            return
        }
    }
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrEmptyDivider  = errors.New("priorities divider was not specified")
	ErrEmptyFeedback = errors.New("feedback channel was not specified")
	ErrEmptyOutput   = errors.New("output channel was not specified")
)

Functions

This section is empty.

Types

type Discipline

type Discipline[Type any] struct {
	// contains filtered or unexported fields
}

func New

func New[Type any](opts Opts[Type]) (*Discipline[Type], error)

Creates and runs main prioritization discipline Preferably input channels should be buffered Data from input channels passed to handlers by output channel Handlers must write priority of processed data to feedback channel For "Equaling" use "Fair" divider, for "Prioritization" use "Rate" divider

func (*Discipline[Type]) AddInput

func (dsc *Discipline[Type]) AddInput(channel chan Type, priority uint)

Adds or updates (if it added previously) input channel for specified priority

func (*Discipline[Type]) RemoveInput

func (dsc *Discipline[Type]) RemoveInput(priority uint)

Removes input channel for specified priority

func (*Discipline[Type]) Stop

func (dsc *Discipline[Type]) Stop()

Terminates work of the discipline

type Opts

type Opts[Type any] struct {
	Divider          divider.Divider
	Feedback         <-chan uint
	HandlersQuantity uint
	Inputs           map[uint]<-chan Type
	Output           chan<- types.Prioritized[Type]
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL