join

package
v2.0.5
Warning

This package is not in the latest version of its module.

Published: Dec 26, 2023 License: MIT Imports: 3 Imported by: 0

README

Join discipline

Purpose

Accumulates elements from the input channel into a slice and writes the slice to the output channel when it reaches the specified size or when the timeout expires

Works in two modes:

  1. Writes a copy of the accumulated slice to the output channel

  2. Writes the accumulated slice itself to the output channel, without copying (NoCopy option set to true). In this case the discipline must be informed that the slice is no longer used by calling Release()
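The difference between the two modes can be illustrated with a minimal sketch that uses only the standard library. It does not reproduce the package's implementation: `emit` is a hypothetical helper that reuses a single buffer for every batch, which is an assumption about why the no-copy mode needs a release notification. If the consumer keeps a slice that was handed over without copying and the buffer is reused too early, the retained data is overwritten.

```go
package main

import "fmt"

// emit is a hypothetical helper, not part of the join package. It splits
// values into batches of the given size using one reusable buffer and returns
// what a consumer that retains every batch would end up holding.
func emit(values []int, size int, noCopy bool) [][]int {
	buffer := make([]int, 0, size)

	var retained [][]int

	flush := func() {
		if len(buffer) == 0 {
			return
		}

		if noCopy {
			// The consumer receives the buffer itself.
			retained = append(retained, buffer)
		} else {
			// The consumer receives an independent copy.
			retained = append(retained, append([]int(nil), buffer...))
		}

		// The buffer is reused immediately, without waiting for the consumer.
		buffer = buffer[:0]
	}

	for _, value := range values {
		buffer = append(buffer, value)

		if len(buffer) == size {
			flush()
		}
	}

	flush()

	return retained
}

func main() {
	values := []int{1, 2, 3, 4, 5, 6}

	fmt.Println(emit(values, 3, false)) // [[1 2 3] [4 5 6]]
	fmt.Println(emit(values, 3, true))  // [[4 5 6] [4 5 6]]
}
```

In the second call the first batch is overwritten by the second one because both share the same backing array. Waiting for the consumer to signal that it has finished with the slice before reusing the buffer avoids this.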

Usage

Example:

package main

import (
    "fmt"
    "time"

    "github.com/akramarenkov/cqos/v2/join"
)

func main() {
    quantity := 27

    input := make(chan int)

    opts := join.Opts[int]{
        Input:    input,
        JoinSize: 5,
        Timeout:  10 * time.Second,
    }

    discipline, err := join.New(opts)
    if err != nil {
        panic(err)
    }

    go func() {
        defer close(input)

        for stage := 1; stage <= quantity; stage++ {
            input <- stage
        }
    }()

    outSequence := make([]int, 0, quantity)

    for slice := range discipline.Output() {
        outSequence = append(outSequence, slice...)
    }

    fmt.Println(outSequence)
    // Output: [1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27]
}

Documentation

Overview

Discipline used to accumulate elements from the input channel into a slice and write the slice to the output channel when the size or timeout is reached

Constants

This section is empty.

Variables

var (
	ErrTimeoutInaccuracyTooBig = errors.New("timeout inaccuracy is too big")
	ErrTimeoutInaccuracyZero   = errors.New("timeout inaccuracy is zero")
	ErrTimeoutTooSmall         = errors.New("timeout value is too small")
)
var (
	ErrInputEmpty   = errors.New("input channel was not specified")
	ErrJoinSizeZero = errors.New("join size is zero")
)

Functions

This section is empty.

Types

type Discipline

type Discipline[Type any] struct {
	// contains filtered or unexported fields
}

Join discipline

Example
package main

import (
	"fmt"
	"time"

	"github.com/akramarenkov/cqos/v2/join"
)

func main() {
	quantity := 27

	input := make(chan int)

	opts := join.Opts[int]{
		Input:    input,
		JoinSize: 5,
		Timeout:  10 * time.Second,
	}

	discipline, err := join.New(opts)
	if err != nil {
		panic(err)
	}

	go func() {
		defer close(input)

		for stage := 1; stage <= quantity; stage++ {
			input <- stage
		}
	}()

	outSequence := make([]int, 0, quantity)

	for slice := range discipline.Output() {
		outSequence = append(outSequence, slice...)
	}

	fmt.Println(outSequence)
}
Output:

[1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27]

func New

func New[Type any](opts Opts[Type]) (*Discipline[Type], error)

Creates and runs discipline

func (*Discipline[Type]) Output

func (dsc *Discipline[Type]) Output() <-chan []Type

Returns output channel.

If this channel is closed, it means that the discipline is terminated

func (*Discipline[Type]) Release

func (dsc *Discipline[Type]) Release()

Marks accumulated slice as no longer used.

Must be used only if NoCopy option is set to true

type Opts

type Opts[Type any] struct {
	// Input data channel. To terminate the discipline, it is necessary and
	// sufficient to close the input channel
	Input <-chan Type
	// Output slice size
	JoinSize uint
	// By default, a copy of the accumulated slice is written to the output channel.
	// If NoCopy is set to true, the accumulated slice itself is written to the
	// output channel. In this case, once the slice is no longer used, the
	// discipline must be informed by calling Release()
	NoCopy bool
	// Timeout for sending the accumulated slice. A zero or negative value means that
	// no data is written to the output channel after the time has elapsed
	Timeout time.Duration
	// Because a timer/ticker cannot be reset reliably (without false ticks),
	// a ticker with a period several times shorter than the timeout is used.
	// To detect that the timeout has expired, the current time is compared with
	// the time of the last write to the output channel. This method has an
	// inaccuracy, which can be set by this parameter in percent
	TimeoutInaccuracy uint
}

Options of the created discipline
