Documentation ¶
Overview ¶
Discipline that used to accumulates elements from the input channel into a slice and writes it to the output channel when the size or timeout is reached.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrTimeoutInaccuracyTooBig = errors.New("timeout inaccuracy is too big") ErrTimeoutInaccuracyZero = errors.New("timeout inaccuracy is zero") ErrTimeoutTooSmall = errors.New("timeout value is too small") )
View Source
var ( ErrInputEmpty = errors.New("input channel was not specified") ErrJoinSizeZero = errors.New("join size is zero") )
Functions ¶
This section is empty.
Types ¶
type Discipline ¶
type Discipline[Type any] struct { // contains filtered or unexported fields }
Join discipline.
Example ¶
package main import ( "fmt" "time" "github.com/akramarenkov/cqos/v2/join" ) func main() { quantity := 27 input := make(chan int) opts := join.Opts[int]{ Input: input, JoinSize: 5, Timeout: 10 * time.Second, } discipline, err := join.New(opts) if err != nil { panic(err) } go func() { defer close(input) for stage := 1; stage <= quantity; stage++ { input <- stage } }() outSequence := make([]int, 0, quantity) for slice := range discipline.Output() { outSequence = append(outSequence, slice...) } fmt.Println(outSequence) }
Output: [1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27]
func New ¶
func New[Type any](opts Opts[Type]) (*Discipline[Type], error)
Creates and runs discipline.
func (*Discipline[Type]) Output ¶
func (dsc *Discipline[Type]) Output() <-chan []Type
Returns output channel.
If this channel is closed, it means that the discipline is terminated.
func (*Discipline[Type]) Release ¶
func (dsc *Discipline[Type]) Release()
Marks accumulated slice as no longer used.
Must be used only if NoCopy option is set to true.
Example ¶
package main import ( "fmt" "time" "github.com/akramarenkov/cqos/v2/join" ) func main() { quantity := 27 input := make(chan int) opts := join.Opts[int]{ Input: input, JoinSize: 5, NoCopy: true, Timeout: 10 * time.Second, } discipline, err := join.New(opts) if err != nil { panic(err) } go func() { defer close(input) for stage := 1; stage <= quantity; stage++ { input <- stage } }() outSequence := make([]int, 0, quantity) for slice := range discipline.Output() { outSequence = append(outSequence, slice...) discipline.Release() } fmt.Println(outSequence) }
Output: [1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27]
type Opts ¶
type Opts[Type any] struct { // Input data channel. For terminate discipline it is necessary and sufficient to // close the input channel Input <-chan Type // Output slice size JoinSize uint // By default, to the output channel is written a copy of the accumulated slice // If the NoCopy is set to true, then to the output channel will be directly // written the accumulated slice // In this case, after the accumulated slice is no longer used it is necessary to // inform the discipline about it by calling Release() NoCopy bool // Send timeout of accumulated slice. A zero or negative value means that no data is // written to the output channel after the time has elapsed Timeout time.Duration // Due to the fact that it is not possible to reliably reset the timer/ticker // (without false ticks), a ticker with a duration several times shorter than // the timeout is used and to determine the expiration of the timeout, // the current time is compared with the time of the last recording to // the output channel. This method has an inaccuracy that can be set by // this parameter in percents TimeoutInaccuracy uint }
Options of the created discipline.
Click to show internal directories.
Click to hide internal directories.