priority

package
v1.5.3 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 4, 2024 License: MIT Imports: 10 Imported by: 0

README

Priority discipline

Purpose

Used to distribute data among handlers according to priority.

It can also be used to equalize the distribution of data that takes different amounts of time to process.

Principle of operation

  • Prioritization:

    [Figure: principle of operation, prioritization]

  • Equaling:

    [Figure: principle of operation, equaling]

Comparison with unmanaged distribution

If data of different priorities takes different amounts of time to process, the processing speed achieved with the priority discipline differs from the speed achieved without it.

For example, suppose that data from the channel of priority 3 is processed in time T, data from the channel of priority 2 in time 5*T, and data from the channel of priority 1 in time 10*T. The results are then as follows:

  • equaling by priority discipline:

    [Figure: equaling by priority discipline]

  • unmanaged distribution:

    [Figure: unmanaged distribution]

With unmanaged distribution, the processing speed of data with priority 3 is limited by the slowest data (priorities 1 and 2). With equaling by the priority discipline, the processing speed of data with priority 3 is not limited by the other priorities.
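The comparison can be approximated with a small calculation. The sketch below is a simplified model, not a measurement of the library: it assumes that with equaling each priority gets an equal share of the handlers, and that with unmanaged distribution every handler processes one item of each priority in turn. The function names and the choice of 6 handlers with T = 1 are hypothetical.

```go
package main

import "fmt"

// equalizedRates returns items processed per unit of time for each priority,
// assuming every priority owns an equal share of the handlers.
func equalizedRates(handlers float64, times []float64) []float64 {
	share := handlers / float64(len(times))
	rates := make([]float64, len(times))

	for id, duration := range times {
		rates[id] = share / duration
	}

	return rates
}

// unmanagedRate returns items processed per unit of time for every priority,
// assuming each handler processes one item of each priority per cycle.
func unmanagedRate(handlers float64, times []float64) float64 {
	cycle := 0.0

	for _, duration := range times {
		cycle += duration
	}

	return handlers / cycle
}

func main() {
	// Processing times for priorities 3, 2 and 1 with T = 1
	times := []float64{1, 5, 10}

	fmt.Println("equaling: ", equalizedRates(6, times))
	fmt.Println("unmanaged:", unmanagedRate(6, times))
}
```

Under these assumptions the priority 3 rate is 2 items per unit of time with equaling and 0.375 without it, because in the unmanaged case every handler spends 16*T per cycle regardless of priority.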

Usage

Example:

package main

import (
    "fmt"
    "strconv"
    "sync"

    "github.com/akramarenkov/cqos/priority"
)

func main() {
    handlersQuantity := 100
    // Preferably input channels should be buffered
    inputCapacity := 10
    itemsQuantity := 100

    inputs := map[uint]chan string{
        3: make(chan string, inputCapacity),
        2: make(chan string, inputCapacity),
        1: make(chan string, inputCapacity),
    }

    // Map key is a value of priority
    inputsOpts := map[uint]<-chan string{
        3: inputs[3],
        2: inputs[2],
        1: inputs[1],
    }

    // Data from input channels is passed to handlers through the output channel
    output := make(chan priority.Prioritized[string])

    // Handlers must write priority of processed data to feedback channel after it has been processed
    feedback := make(chan uint)
    defer close(feedback)

    // Used only in this example to detect that all written data has been processed
    measures := make(chan string)
    defer close(measures)

    // For equaling use FairDivider, for prioritization use
    // RateDivider or custom divider
    opts := priority.Opts[string]{
        Divider:          priority.RateDivider,
        Feedback:         feedback,
        HandlersQuantity: uint(handlersQuantity),
        Inputs:           inputsOpts,
        Output:           output,
    }

    discipline, err := priority.New(opts)
    if err != nil {
        panic(err)
    }

    defer discipline.Stop()

    wg := &sync.WaitGroup{}
    defer wg.Wait()

    // Run writers, that write data to input channels
    for priority, input := range inputs {
        wg.Add(1)

        go func(precedency uint, channel chan string) {
            defer wg.Done()
            defer close(channel)

            base := strconv.Itoa(int(precedency))

            for id := 0; id < itemsQuantity; id++ {
                item := base + ":" + strconv.Itoa(id)

                channel <- item
            }
        }(priority, input)
    }

    // Run handlers, that process data
    for handler := 0; handler < handlersQuantity; handler++ {
        wg.Add(1)

        go func() {
            defer wg.Done()

            for prioritized := range output {
                // Data processing
                measures <- prioritized.Item

                // Handler must indicate that current data has been processed and
                // handler is ready to receive new data
                feedback <- prioritized.Priority
            }
        }()
    }

    // Terminate handlers
    defer close(output)

    received := 0

    // Wait until all written data has been processed
    for range measures {
        received++

        if received == itemsQuantity*len(inputs) {
            break
        }
    }

    fmt.Println("Processed items quantity:", received)
    // Output: Processed items quantity: 300
}

Documentation

Overview

Discipline used to distribute data among handlers according to priority.

Constants

This section is empty.

Variables

var (
	ErrEmptyDivider         = errors.New("priorities divider was not specified")
	ErrEmptyFeedback        = errors.New("feedback channel was not specified")
	ErrEmptyOutput          = errors.New("output channel was not specified")
	ErrHandlersQuantityZero = errors.New("handlers quantity is zero")
	ErrQuantityExceeded     = errors.New("value of handlers quantity has been exceeded")
)
var (
	ErrEmptyHandle = errors.New("handle function was not specified")
	ErrEmptyInput  = errors.New("input channels was not specified")
)
var (
	ErrDividerBad = errors.New("divider produces an incorrect distribution")
)

Functions

func FairDivider added in v1.0.0

func FairDivider(priorities []uint, dividend uint, distribution map[uint]uint) map[uint]uint

Distributes quantity evenly among the priorities.

Used for equaling.

Example results:

  • 6 / [3 2 1] = map[3:2, 2:2, 1:2]
  • 100 / [70 20 10] = map[70:34, 20:33, 10:33]
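The example results above can be reproduced with a short sketch. `evenDivider` is a hypothetical stand-in with the same signature as a divider, not the library's implementation. It assumes that the remainder goes to the first priorities in the slice and that "updating" an existing distribution means adding to it.

```go
package main

import "fmt"

// evenDivider is a hypothetical stand-in for an equaling divider. It is not
// the library's FairDivider, only a function with the same signature.
func evenDivider(priorities []uint, dividend uint, distribution map[uint]uint) map[uint]uint {
	if distribution == nil {
		distribution = make(map[uint]uint, len(priorities))
	}

	if len(priorities) == 0 {
		return distribution
	}

	base := dividend / uint(len(priorities))
	remainder := dividend % uint(len(priorities))

	for id, priority := range priorities {
		part := base

		// The leftover units go to the first priorities in the slice
		if uint(id) < remainder {
			part++
		}

		// Assumption: updating an existing distribution means adding to it
		distribution[priority] += part
	}

	return distribution
}

func main() {
	// fmt prints map keys in ascending order
	fmt.Println(evenDivider([]uint{3, 2, 1}, 6, nil))      // map[1:2 2:2 3:2]
	fmt.Println(evenDivider([]uint{70, 20, 10}, 100, nil)) // map[10:33 20:33 70:34]
}
```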

func IsNonFatalConfig added in v1.2.0

func IsNonFatalConfig(
	priorities []uint,
	divider Divider,
	quantity uint,
) bool

Because the dividing function is imperfect and works with integers (the quantity of handlers is an integer), large errors can occur when distributing handlers among priorities, especially for a small quantity of handlers. This function reports whether, for the specified combination of priorities, dividing function, and quantity of handlers, the distribution error does not stop the processing of one or more priorities (that is, no priority is assigned zero handlers).
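The check described above can be sketched as follows. This is an illustration of the idea, not the library's code: `isNonFatal` and `proportional` are hypothetical names, and the proportional divider here simply hands the rounding remainder to the first priority.

```go
package main

import "fmt"

// divider has the same shape as the package's Divider type
type divider func(priorities []uint, dividend uint, distribution map[uint]uint) map[uint]uint

// proportional is a hypothetical divider used only to drive the check
func proportional(priorities []uint, dividend uint, distribution map[uint]uint) map[uint]uint {
	if distribution == nil {
		distribution = make(map[uint]uint, len(priorities))
	}

	sum := uint(0)

	for _, priority := range priorities {
		sum += priority
	}

	if sum == 0 {
		return distribution
	}

	rest := dividend

	for _, priority := range priorities {
		part := dividend * priority / sum
		distribution[priority] += part
		rest -= part
	}

	// Integer division rounds down, the leftover goes to the first priority
	distribution[priorities[0]] += rest

	return distribution
}

// isNonFatal reports whether every priority receives at least one handler
func isNonFatal(priorities []uint, divide divider, quantity uint) bool {
	distribution := divide(priorities, quantity, nil)

	for _, priority := range priorities {
		if distribution[priority] == 0 {
			return false
		}
	}

	return true
}

func main() {
	priorities := []uint{70, 20, 10}

	// 5 handlers are split as 4, 1 and 0, so priority 10 would never be processed
	fmt.Println(isNonFatal(priorities, proportional, 5)) // false
	// 10 handlers are split as 7, 2 and 1
	fmt.Println(isNonFatal(priorities, proportional, 10)) // true
}
```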

func IsSuitableConfig added in v1.2.0

func IsSuitableConfig(
	priorities []uint,
	divider Divider,
	quantity uint,
	limit float64,
) bool

Because the dividing function is imperfect and works with integers (the quantity of handlers is an integer), large errors can occur when distributing handlers among priorities, especially for a small quantity of handlers. This function reports whether, for the specified combination of priorities, dividing function, and quantity of handlers, the distribution error does not exceed the limit, specified as a percentage.

func PickUpMaxNonFatalQuantity added in v1.2.0

func PickUpMaxNonFatalQuantity(
	priorities []uint,
	divider Divider,
	maxQuantity uint,
) uint

Picks up the maximum quantity of handlers for which the division error does not stop the processing of one or more priorities.

func PickUpMaxSuitableQuantity added in v1.2.0

func PickUpMaxSuitableQuantity(
	priorities []uint,
	divider Divider,
	maxQuantity uint,
	limit float64,
) uint

Picks up the maximum quantity of handlers for which the division error does not exceed the limit, specified as a percentage.

func PickUpMinNonFatalQuantity added in v1.2.0

func PickUpMinNonFatalQuantity(
	priorities []uint,
	divider Divider,
	maxQuantity uint,
) uint

Picks up the minimum quantity of handlers for which the division error does not stop the processing of one or more priorities.

func PickUpMinSuitableQuantity added in v1.2.0

func PickUpMinSuitableQuantity(
	priorities []uint,
	divider Divider,
	maxQuantity uint,
	limit float64,
) uint

Picks up the minimum quantity of handlers for which the division error does not exceed the limit, specified as a percentage.

func RateDivider added in v1.0.0

func RateDivider(priorities []uint, dividend uint, distribution map[uint]uint) map[uint]uint

Distributes quantity between priorities in proportion to the priority value.

Used for prioritization.

Example results:

  • 6 / [3 2 1] = map[3:3, 2:2, 1:1]
  • 100 / [70 20 10] = map[70:70, 20:20, 10:10]
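A proportional split like the one in the examples above can be sketched as follows. `proportionalDivider` is a hypothetical function, not the library's RateDivider: it assumes that the remainder left by integer division goes to the first (highest) priority and that an existing distribution is updated by adding to it.

```go
package main

import "fmt"

// proportionalDivider is a hypothetical stand-in for a prioritizing divider
func proportionalDivider(priorities []uint, dividend uint, distribution map[uint]uint) map[uint]uint {
	if distribution == nil {
		distribution = make(map[uint]uint, len(priorities))
	}

	if len(priorities) == 0 {
		return distribution
	}

	sum := uint(0)

	for _, priority := range priorities {
		sum += priority
	}

	// With only zero-valued priorities there is no ratio to follow,
	// so everything goes to the first priority
	if sum == 0 {
		distribution[priorities[0]] += dividend
		return distribution
	}

	rest := dividend

	for _, priority := range priorities {
		part := dividend * priority / sum
		distribution[priority] += part
		rest -= part
	}

	// Integer division rounds down, the leftover goes to the first priority
	// so that the sum of the parts equals the dividend
	distribution[priorities[0]] += rest

	return distribution
}

func main() {
	// fmt prints map keys in ascending order
	fmt.Println(proportionalDivider([]uint{3, 2, 1}, 6, nil))      // map[1:1 2:2 3:3]
	fmt.Println(proportionalDivider([]uint{70, 20, 10}, 100, nil)) // map[10:10 20:20 70:70]
	fmt.Println(proportionalDivider([]uint{3, 2, 1}, 10, nil))     // map[1:1 2:3 3:6]
}
```

The last call shows a case where the dividend is not a multiple of the sum of priorities: the parts 5, 3 and 1 add up to 9, so the remaining unit is added to priority 3.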

Types

type Discipline

type Discipline[Type any] struct {
	// contains filtered or unexported fields
}

Prioritization discipline.

Input channels should preferably be buffered for performance reasons.

Data from the input channels is passed to handlers through the output channel.

Handlers must write the priority of the processed data to the feedback channel after the data has been processed.

For equaling use FairDivider, for prioritization use RateDivider or custom divider.

Example
package main

import (
	"fmt"
	"strconv"
	"sync"

	"github.com/akramarenkov/cqos/priority"
)

func main() {
	handlersQuantity := 100
	// Preferably input channels should be buffered
	inputCapacity := 10
	itemsQuantity := 100

	inputs := map[uint]chan string{
		3: make(chan string, inputCapacity),
		2: make(chan string, inputCapacity),
		1: make(chan string, inputCapacity),
	}

	// Map key is a value of priority
	inputsOpts := map[uint]<-chan string{
		3: inputs[3],
		2: inputs[2],
		1: inputs[1],
	}

	// Data from input channels is passed to handlers through the output channel
	output := make(chan priority.Prioritized[string])

	// Handlers must write priority of processed data to feedback channel after it has been processed
	feedback := make(chan uint)
	defer close(feedback)

	// Used only in this example to detect that all written data has been processed
	measures := make(chan string)
	defer close(measures)

	// For equaling use FairDivider, for prioritization use
	// RateDivider or custom divider
	opts := priority.Opts[string]{
		Divider:          priority.RateDivider,
		Feedback:         feedback,
		HandlersQuantity: uint(handlersQuantity),
		Inputs:           inputsOpts,
		Output:           output,
	}

	discipline, err := priority.New(opts)
	if err != nil {
		panic(err)
	}

	defer discipline.Stop()

	wg := &sync.WaitGroup{}
	defer wg.Wait()

	// Run writers, that write data to input channels
	for priority, input := range inputs {
		wg.Add(1)

		go func(precedency uint, channel chan string) {
			defer wg.Done()
			defer close(channel)

			base := strconv.Itoa(int(precedency))

			for id := 0; id < itemsQuantity; id++ {
				item := base + ":" + strconv.Itoa(id)

				channel <- item
			}
		}(priority, input)
	}

	// Run handlers, that process data
	for handler := 0; handler < handlersQuantity; handler++ {
		wg.Add(1)

		go func() {
			defer wg.Done()

			for prioritized := range output {
				// Data processing
				measures <- prioritized.Item

				// Handler must indicate that current data has been processed and
				// handler is ready to receive new data
				feedback <- prioritized.Priority
			}
		}()
	}

	// Terminate handlers
	defer close(output)

	received := 0

	// Wait until all written data has been processed
	for range measures {
		received++

		if received == itemsQuantity*len(inputs) {
			break
		}
	}

	fmt.Println("Processed items quantity:", received)
}
Output:

Processed items quantity: 300

func New

func New[Type any](opts Opts[Type]) (*Discipline[Type], error)

Creates and runs discipline.

func (*Discipline[Type]) AddInput

func (dsc *Discipline[Type]) AddInput(channel <-chan Type, priority uint)

Adds an input channel for the specified priority, or updates it if one was added previously.

func (*Discipline[Type]) Err added in v1.3.0

func (dsc *Discipline[Type]) Err() <-chan error

Returns a channel with errors. If an error occurs (the value received from the channel is not nil), the discipline terminates. The most likely cause of an error is a faulty dividing function whose distributed quantities do not sum to the original quantity.

A single nil value means that the discipline has terminated normally.

If you are sure that the divider works correctly, you do not have to read from this channel or check the received value.
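One way to consume such a channel is to wait for its single value in a separate goroutine. The sketch below is self-contained and does not call the library: a buffered channel stands in for the one returned by Err(), and `watch`, `errDemo` and `onFail` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errDemo imitates an error produced by a faulty divider
var errDemo = errors.New("divider produces an incorrect distribution")

// watch waits for the single value from the errors channel in a separate
// goroutine and calls onFail only if that value is not nil. The returned
// channel is closed when the value has been handled.
func watch(errs <-chan error, onFail func(error)) <-chan struct{} {
	done := make(chan struct{})

	go func() {
		defer close(done)

		if err := <-errs; err != nil {
			onFail(err)
		}
	}()

	return done
}

func main() {
	// Stand-in for the channel returned by Err(), buffered so that
	// main can fill it before the watcher starts
	errs := make(chan error, 1)
	errs <- errDemo

	<-watch(errs, func(err error) {
		fmt.Println("discipline failed:", err)
	})
}
```

With a real discipline, the first argument would be the channel returned by Err(), and the callback could stop the writers or log the failure.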

func (*Discipline[Type]) GracefulStop added in v1.4.0

func (dsc *Discipline[Type]) GracefulStop()

Gracefully terminates the discipline.

Waits for the input channels to drain and for handlers to finish processing data, then terminates.

You must stop writing to the input channels and close them (or remove them); otherwise the graceful stop will not complete.

Example
package main

import (
	"fmt"
	"strconv"
	"sync"

	"github.com/akramarenkov/cqos/priority"
)

func main() {
	handlersQuantity := 100
	// Preferably input channels should be buffered
	inputCapacity := 10
	itemsQuantity := 100

	inputs := map[uint]chan string{
		3: make(chan string, inputCapacity),
		2: make(chan string, inputCapacity),
		1: make(chan string, inputCapacity),
	}

	// Map key is a value of priority
	inputsOpts := map[uint]<-chan string{
		3: inputs[3],
		2: inputs[2],
		1: inputs[1],
	}

	// Data from input channels is passed to handlers through the output channel
	output := make(chan priority.Prioritized[string])

	// Handlers must write priority of processed data to feedback channel after it has been processed
	feedback := make(chan uint)
	defer close(feedback)

	// Used only in this example to detect that all written data has been processed
	measures := make(chan string)
	defer close(measures)

	// For equaling use FairDivider, for prioritization use
	// RateDivider or custom divider
	opts := priority.Opts[string]{
		Divider:          priority.RateDivider,
		Feedback:         feedback,
		HandlersQuantity: uint(handlersQuantity),
		Inputs:           inputsOpts,
		Output:           output,
	}

	discipline, err := priority.New(opts)
	if err != nil {
		panic(err)
	}

	wg := &sync.WaitGroup{}
	defer wg.Wait()

	// Run writers, that write data to input channels
	for priority, input := range inputs {
		wg.Add(1)

		go func(precedency uint, channel chan string) {
			defer wg.Done()
			defer close(channel)

			base := strconv.Itoa(int(precedency))

			for id := 0; id < itemsQuantity; id++ {
				item := base + ":" + strconv.Itoa(id)

				channel <- item
			}
		}(priority, input)
	}

	// Run handlers, that process data
	for handler := 0; handler < handlersQuantity; handler++ {
		wg.Add(1)

		go func() {
			defer wg.Done()

			for prioritized := range output {
				// Data processing
				measures <- prioritized.Item

				feedback <- prioritized.Priority
			}
		}()
	}

	// Terminate handlers
	defer close(output)

	obtained := make(chan int)

	go func() {
		defer close(obtained)

		received := 0

		// Wait until all written data has been processed
		for range measures {
			received++

			if received == itemsQuantity*len(inputs) {
				obtained <- received
				return
			}
		}
	}()

	discipline.GracefulStop()

	fmt.Println("Processed items quantity:", <-obtained)
}
Output:

Processed items quantity: 300

func (*Discipline[Type]) RemoveInput

func (dsc *Discipline[Type]) RemoveInput(priority uint)

Removes input channel for specified priority.

func (*Discipline[Type]) Stop

func (dsc *Discipline[Type]) Stop()

Forcibly terminates the discipline.

Use it to wait for completion when the discipline is terminated via the context.

type Divider added in v1.0.0

type Divider func(priorities []uint, dividend uint, distribution map[uint]uint) map[uint]uint

Distributes quantity of something by priorities. Determines how handlers are distributed among priorities.

Slice of priorities is passed to this function sorted from highest to lowest.

Sum of the distributed quantities must equal the original quantity.

If distribution is nil then it must be created and returned, otherwise it must be updated and returned.
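A custom divider only has to follow the rules listed above. The sketch below is one hypothetical example, not part of the package: `floorOneDivider` first gives one handler to each priority (from highest to lowest, while handlers remain) and then splits the rest in proportion to the priority values. It assumes that updating an existing distribution means adding to it.

```go
package main

import "fmt"

// floorOneDivider is a hypothetical custom divider that tries to keep
// every priority alive before applying a proportional split
func floorOneDivider(priorities []uint, dividend uint, distribution map[uint]uint) map[uint]uint {
	if distribution == nil {
		distribution = make(map[uint]uint, len(priorities))
	}

	rest := dividend

	// First pass: one handler per priority while any handlers remain
	for _, priority := range priorities {
		if rest == 0 {
			break
		}

		distribution[priority]++
		rest--
	}

	if rest == 0 || len(priorities) == 0 {
		return distribution
	}

	sum := uint(0)

	for _, priority := range priorities {
		sum += priority
	}

	if sum == 0 {
		distribution[priorities[0]] += rest
		return distribution
	}

	// Second pass: proportional split of what is left
	left := rest

	for _, priority := range priorities {
		part := rest * priority / sum
		distribution[priority] += part
		left -= part
	}

	// The rounding leftover goes to the highest priority so that
	// the sum of the parts equals the dividend
	distribution[priorities[0]] += left

	return distribution
}

func main() {
	// fmt prints map keys in ascending order
	fmt.Println(floorOneDivider([]uint{70, 20, 10}, 5, nil)) // map[10:1 20:1 70:3]
}
```

With 5 handlers, a purely proportional split of [70 20 10] would leave priority 10 without handlers; here every priority keeps at least one and the total is still 5.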

type Handle added in v1.2.0

type Handle[Type any] func(ctx context.Context, item Type)

Callback function called in handlers when an item is received.

Function should be interrupted when context is canceled.
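A callback that honors cancellation can be sketched as follows. The example is self-contained and does not use the package: `handle`, `newHandle` and `runDemo` are hypothetical names, and a timer stands in for real processing.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// handle mirrors the shape of the Handle callback for string items
type handle func(ctx context.Context, item string)

// newHandle returns a callback that simulates processing lasting duration
// and reports through report whether the processing was completed
func newHandle(duration time.Duration, report func(item string, completed bool)) handle {
	return func(ctx context.Context, item string) {
		timer := time.NewTimer(duration)
		defer timer.Stop()

		select {
		case <-ctx.Done():
			// Context is canceled, processing is abandoned
			report(item, false)
		case <-timer.C:
			report(item, true)
		}
	}
}

// runDemo calls the callback with a live and with an already canceled context
func runDemo() (completedLive bool, completedCanceled bool) {
	live := context.Background()

	canceled, cancel := context.WithCancel(context.Background())
	cancel()

	newHandle(time.Millisecond, func(_ string, completed bool) {
		completedLive = completed
	})(live, "3:0")

	newHandle(time.Hour, func(_ string, completed bool) {
		completedCanceled = completed
	})(canceled, "3:1")

	return completedLive, completedCanceled
}

func main() {
	fmt.Println(runDemo()) // true false
}
```

The second call returns immediately even though its simulated work lasts an hour, because the callback selects on the context as well as on the work itself.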

type Opts

type Opts[Type any] struct {
	// Roughly terminates (cancels) work of the discipline
	Ctx context.Context
	// Determines how handlers are distributed among priorities
	Divider Divider
	// Handlers must write priority of processed data to feedback channel after it has been processed
	Feedback <-chan uint
	// Quantity of handlers among which data is distributed
	HandlersQuantity uint
	// Channels with input data; they should be buffered for performance reasons
	// Map key is the priority value
	// For graceful termination, all input channels must be closed or removed
	Inputs map[uint]<-chan Type
	// Handlers should read distributed data from this channel
	Output chan<- Prioritized[Type]
}

Options of the created discipline.

type Prioritized added in v1.0.0

type Prioritized[Type any] struct {
	Item     Type
	Priority uint
}

Describes the data distributed by the prioritization discipline.

type Simple added in v1.2.0

type Simple[Type any] struct {
	// contains filtered or unexported fields
}

Simplified version of the discipline that runs handlers on its own and hides the output and feedback channels.

Input channels should preferably be buffered for performance reasons.

For equaling use FairDivider, for prioritization use RateDivider or a custom divider.

Example
package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"

	"github.com/akramarenkov/cqos/priority"
)

func main() {
	handlersQuantity := 100
	// Preferably input channels should be buffered
	inputCapacity := 10
	itemsQuantity := 100

	inputs := map[uint]chan string{
		3: make(chan string, inputCapacity),
		2: make(chan string, inputCapacity),
		1: make(chan string, inputCapacity),
	}

	// Map key is a value of priority
	inputsOpts := map[uint]<-chan string{
		3: inputs[3],
		2: inputs[2],
		1: inputs[1],
	}

	// Used only in this example to detect that all written data has been processed
	measures := make(chan string)
	defer close(measures)

	handle := func(ctx context.Context, item string) {
		// Data processing
		select {
		case <-ctx.Done():
		case measures <- item:
		}
	}

	// For equaling use FairDivider, for prioritization use
	// RateDivider or custom divider
	opts := priority.SimpleOpts[string]{
		Divider:          priority.RateDivider,
		Handle:           handle,
		HandlersQuantity: uint(handlersQuantity),
		Inputs:           inputsOpts,
	}

	simple, err := priority.NewSimple(opts)
	if err != nil {
		panic(err)
	}

	defer simple.Stop()

	wg := &sync.WaitGroup{}
	defer wg.Wait()

	// Run writers, that write data to input channels
	for priority, input := range inputs {
		wg.Add(1)

		go func(precedency uint, channel chan string) {
			defer wg.Done()
			defer close(channel)

			base := strconv.Itoa(int(precedency))

			for id := 0; id < itemsQuantity; id++ {
				item := base + ":" + strconv.Itoa(id)

				channel <- item
			}
		}(priority, input)
	}

	received := 0

	// Wait until all written data has been processed
	for range measures {
		received++

		if received == itemsQuantity*len(inputs) {
			break
		}
	}

	fmt.Println("Processed items quantity:", received)
}
Output:

Processed items quantity: 300

func NewSimple added in v1.2.0

func NewSimple[Type any](opts SimpleOpts[Type]) (*Simple[Type], error)

Creates and runs discipline.

func (*Simple[Type]) Err added in v1.3.0

func (smpl *Simple[Type]) Err() <-chan error

Returns a channel with errors. If an error occurs (the value received from the channel is not nil), the discipline terminates. The most likely cause of an error is a faulty dividing function whose distributed quantities do not sum to the original quantity.

A single nil value means that the discipline has terminated normally.

func (*Simple[Type]) GracefulStop added in v1.4.0

func (smpl *Simple[Type]) GracefulStop()

Gracefully terminates the discipline.

Waits for the input channels to drain and for handlers to finish processing data, then terminates.

You must stop writing to the input channels and close them; otherwise the graceful stop will not complete.

Example
package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"

	"github.com/akramarenkov/cqos/priority"
)

func main() {
	handlersQuantity := 100
	// Preferably input channels should be buffered
	inputCapacity := 10
	itemsQuantity := 100

	inputs := map[uint]chan string{
		3: make(chan string, inputCapacity),
		2: make(chan string, inputCapacity),
		1: make(chan string, inputCapacity),
	}

	// Map key is a value of priority
	inputsOpts := map[uint]<-chan string{
		3: inputs[3],
		2: inputs[2],
		1: inputs[1],
	}

	// Used only in this example to detect that all written data has been processed
	measures := make(chan string)
	defer close(measures)

	handle := func(ctx context.Context, item string) {
		// Data processing
		select {
		case <-ctx.Done():
		case measures <- item:
		}
	}

	// For equaling use FairDivider, for prioritization use
	// RateDivider or custom divider
	opts := priority.SimpleOpts[string]{
		Divider:          priority.RateDivider,
		Handle:           handle,
		HandlersQuantity: uint(handlersQuantity),
		Inputs:           inputsOpts,
	}

	simple, err := priority.NewSimple(opts)
	if err != nil {
		panic(err)
	}

	wg := &sync.WaitGroup{}
	defer wg.Wait()

	// Run writers, that write data to input channels
	for priority, input := range inputs {
		wg.Add(1)

		go func(precedency uint, channel chan string) {
			defer wg.Done()
			defer close(channel)

			base := strconv.Itoa(int(precedency))

			for id := 0; id < itemsQuantity; id++ {
				item := base + ":" + strconv.Itoa(id)

				channel <- item
			}
		}(priority, input)
	}

	obtained := make(chan int)

	go func() {
		defer close(obtained)

		received := 0

		// Wait until all written data has been processed
		for range measures {
			received++

			if received == itemsQuantity*len(inputs) {
				obtained <- received
				return
			}
		}
	}()

	simple.GracefulStop()

	fmt.Println("Processed items quantity:", <-obtained)
}
Output:

Processed items quantity: 300

func (*Simple[Type]) Stop added in v1.2.0

func (smpl *Simple[Type]) Stop()

Forcibly terminates the discipline.

Use it to wait for completion when the discipline is terminated via the context.

type SimpleOpts added in v1.2.0

type SimpleOpts[Type any] struct {
	// Roughly terminates (cancels) work of the discipline
	Ctx context.Context
	// Determines how handlers are distributed among priorities
	Divider Divider
	// Callback function called in handlers when an item is received
	Handle Handle[Type]
	// Quantity of handlers among which data is distributed
	HandlersQuantity uint
	// Channels with input data; they should be buffered for performance reasons
	// Map key is the priority value
	// For graceful termination, all input channels must be closed
	Inputs map[uint]<-chan Type
}

Options of the created discipline.

Directories

Path Synopsis
internal/common Internal package with functions and data types used in other packages.
internal/starter Internal package with implementation of the Starter which is used to run multiple goroutines at the same time.
