Documentation
¶
Overview ¶
Used to distributes data among handlers according to priority
Index ¶
- Variables
- func FairDivider(priorities []uint, dividend uint, distribution map[uint]uint) map[uint]uint
- func IsNonFatalConfig(priorities []uint, divider Divider, quantity uint) bool
- func IsSuitableConfig(priorities []uint, divider Divider, quantity uint, limit float64) bool
- func PickUpMaxNonFatalQuantity(priorities []uint, divider Divider, maxQuantity uint) uint
- func PickUpMaxSuitableQuantity(priorities []uint, divider Divider, maxQuantity uint, limit float64) uint
- func PickUpMinNonFatalQuantity(priorities []uint, divider Divider, maxQuantity uint) uint
- func PickUpMinSuitableQuantity(priorities []uint, divider Divider, maxQuantity uint, limit float64) uint
- func RateDivider(priorities []uint, dividend uint, distribution map[uint]uint) map[uint]uint
- type Discipline
- type Divider
- type Handle
- type Opts
- type Prioritized
- type Simple
- type SimpleOpts
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( ErrEmptyDivider = errors.New("priorities divider was not specified") ErrEmptyFeedback = errors.New("feedback channel was not specified") ErrEmptyOutput = errors.New("output channel was not specified") ErrQuantityExceeded = errors.New("value of handlers quantity has been exceeded") )
var (
ErrEmptyHandle = errors.New("handle function was not specified")
)
Functions ¶
func FairDivider ¶ added in v1.0.0
Distributes quantity evenly among the priorities.
Used for equaling.
Example results:
- 6 / [3 2 1] = map[3:2, 2:2, 1:2]
- 100 / [70 20 10] = map[70:34, 20:33, 10:33]
func IsNonFatalConfig ¶ added in v1.2.0
Due to the imperfection of the dividing function and working with integers (since the quantity of handlers is an integer), large errors can occur when distributing handlers by priority, especially for small quantity of handlers. This function allows you to determine that with the specified combination of priorities, the dividing function and the quantity of handlers, the distribution error does not cause stop processing of one or more priorities (for none of the priorities, the quantity is not equal to zero)
func IsSuitableConfig ¶ added in v1.2.0
Due to the imperfection of the dividing function and working with integers (since the quantity of handlers is an integer), large errors can occur when distributing handlers by priority, especially for small quantity of handlers. This function allows you to determine that with the specified combination of priorities, the dividing function and the quantity of handlers, the distribution error does not exceed the limit
func PickUpMaxNonFatalQuantity ¶ added in v1.2.0
Picks up the maximum quantity of handlers for which the division error does not cause stop processing of one or more priorities
func PickUpMaxSuitableQuantity ¶ added in v1.2.0
func PickUpMaxSuitableQuantity( priorities []uint, divider Divider, maxQuantity uint, limit float64, ) uint
Picks up the maximum quantity of handlers for which the division error does not exceed the limit
func PickUpMinNonFatalQuantity ¶ added in v1.2.0
Picks up the minimum quantity of handlers for which the division error does not cause stop processing of one or more priorities
func PickUpMinSuitableQuantity ¶ added in v1.2.0
func PickUpMinSuitableQuantity( priorities []uint, divider Divider, maxQuantity uint, limit float64, ) uint
Picks up the minimum quantity of handlers for which the division error does not exceed the limit
func RateDivider ¶ added in v1.0.0
Distributes quantity between priorities in proportion to the priority value.
Used for prioritization.
Example results:
- 6 / [3 2 1] = map[3:3, 2:2, 1:1]
- 100 / [70 20 10] = map[70:70, 20:20, 10:10]
Types ¶
type Discipline ¶
type Discipline[Type any] struct { // contains filtered or unexported fields }
Main prioritization discipline.
Preferably input channels should be buffered for performance reasons.
Data from input channels passed to handlers by output channel.
Handlers must write priority of processed data to feedback channel after it has been processed.
For equaling use FairDivider, for prioritization use RateDivider or custom divider
Example ¶
handlersQuantity := 100 // Preferably input channels should be buffered inputCapacity := 10 itemsQuantity := 100 inputs := map[uint]chan string{ 3: make(chan string, inputCapacity), 2: make(chan string, inputCapacity), 1: make(chan string, inputCapacity), } // Map key is a value of priority inputsOpts := map[uint]<-chan string{ 3: inputs[3], 2: inputs[2], 1: inputs[1], } defer func() { for _, input := range inputs { close(input) } }() // Data from input channels passed to handlers by output channel output := make(chan Prioritized[string]) // Handlers must write priority of processed data to feedback channel after it has been processed feedback := make(chan uint) defer close(feedback) // Used only in this example for detect that all written data are processed measurements := make(chan bool) defer close(measurements) // For equaling use FairDivider, for prioritization use RateDivider or custom divider disciplineOpts := Opts[string]{ Divider: RateDivider, Feedback: feedback, HandlersQuantity: uint(handlersQuantity), Inputs: inputsOpts, Output: output, } discipline, err := New(disciplineOpts) if err != nil { panic(err) } defer discipline.Stop() wg := &sync.WaitGroup{} defer wg.Wait() // Run handlers, that process data for handler := 0; handler < handlersQuantity; handler++ { wg.Add(1) go func() { defer wg.Done() for prioritized := range output { // Data processing // fmt.Println(prioritized.Item) measurements <- true feedback <- prioritized.Priority } }() } // Run writers, that write data to input channels for priority, input := range inputs { wg.Add(1) go func(precedency uint, channel chan string) { defer wg.Done() base := strconv.Itoa(int(precedency)) for id := 0; id < itemsQuantity; id++ { item := base + ":" + strconv.Itoa(id) channel <- item } }(priority, input) } // Terminate handlers defer close(output) received := 0 // Wait for process all written data for range measurements { received++ if received == itemsQuantity*len(inputs) { break } } fmt.Println("Processed items quantity:", received)
Output: Processed items quantity: 300
func New ¶
func New[Type any](opts Opts[Type]) (*Discipline[Type], error)
Creates and runs main prioritization discipline
func (*Discipline[Type]) AddInput ¶
func (dsc *Discipline[Type]) AddInput(channel <-chan Type, priority uint)
Adds or updates (if it added previously) input channel for specified priority
func (*Discipline[Type]) Err ¶ added in v1.3.0
func (dsc *Discipline[Type]) Err() <-chan error
Returns a channel with errors. If an error occurs (the value from the channel is not equal to nil) the discipline terminates its work. The most likely cause of the error is an incorrectly working dividing function in which the sum of the distributed quantities is not equal to the original quantity.
The single nil value means that the discipline has terminated in normal mode
func (*Discipline[Type]) GracefulStop ¶ added in v1.4.0
func (dsc *Discipline[Type]) GracefulStop()
Graceful terminates work of the discipline.
Waits draining input channels, waits end processing data in handlers and terminates.
You must end write to input channels and close them (or remove), otherwise graceful stop not be ended
Example ¶
handlersQuantity := 100 // Preferably input channels should be buffered inputCapacity := 10 itemsQuantity := 100 inputs := map[uint]chan string{ 3: make(chan string, inputCapacity), 2: make(chan string, inputCapacity), 1: make(chan string, inputCapacity), } // Map key is a value of priority inputsOpts := map[uint]<-chan string{ 3: inputs[3], 2: inputs[2], 1: inputs[1], } // Data from input channels passed to handlers by output channel output := make(chan Prioritized[string]) // Handlers must write priority of processed data to feedback channel after it has been processed feedback := make(chan uint) defer close(feedback) // Used only in this example for detect that all written data are processed measurements := make(chan bool) // For equaling use FairDivider, for prioritization use RateDivider or custom divider disciplineOpts := Opts[string]{ Divider: RateDivider, Feedback: feedback, HandlersQuantity: uint(handlersQuantity), Inputs: inputsOpts, Output: output, } discipline, err := New(disciplineOpts) if err != nil { panic(err) } wgh := &sync.WaitGroup{} defer wgh.Wait() // Run handlers, that process data for handler := 0; handler < handlersQuantity; handler++ { wgh.Add(1) go func() { defer wgh.Done() for prioritized := range output { // Data processing // fmt.Println(prioritized.Item) measurements <- true feedback <- prioritized.Priority } }() } wgw := &sync.WaitGroup{} // Run writers, that write data to input channels for priority, input := range inputs { wgw.Add(1) go func(precedency uint, channel chan string) { defer wgw.Done() base := strconv.Itoa(int(precedency)) for id := 0; id < itemsQuantity; id++ { item := base + ":" + strconv.Itoa(id) channel <- item } }(priority, input) } // Terminate handlers defer close(output) obtained := make(chan int) defer close(obtained) // Counting the amount of received data go func() { received := 0 for range measurements { received++ } obtained <- received }() // You must end write to input channels and close them (or remove), // otherwise graceful stop not be ended wgw.Wait() for _, input := range inputs { close(input) } discipline.GracefulStop() // Terminate measurements close(measurements) received := <-obtained // Verify data received from discipline if received != itemsQuantity*len(inputs) { panic("graceful stop work not properly") } fmt.Println("Processed items quantity:", received)
Output: Processed items quantity: 300
func (*Discipline[Type]) RemoveInput ¶
func (dsc *Discipline[Type]) RemoveInput(priority uint)
Removes input channel for specified priority
func (*Discipline[Type]) Stop ¶
func (dsc *Discipline[Type]) Stop()
Roughly terminates work of the discipline.
Use for wait completion at terminates via context
type Divider ¶ added in v1.0.0
Distributes quantity of something by priorities. Determines how handlers are distributed among priorities.
Slice of priorities is passed to this function sorted from highest to lowest.
Sum of the distributed quantities must equal the original quantity.
If distribution is nil then it must be created and returned, otherwise it must be updated and returned.
type Handle ¶ added in v1.2.0
Callback function called in handlers of simplified prioritization discipline when an item is received.
Function should be interrupted when context is canceled
type Opts ¶
type Opts[Type any] struct { // Roughly terminates (cancels) work of the discipline Ctx context.Context // Determines how handlers are distributed among priorities Divider Divider // Handlers must write priority of processed data to feedback channel after it has been processed Feedback <-chan uint // Between how many handlers you need to distribute data HandlersQuantity uint // Channels with input data, should be buffered for performance reasons // Map key is a value of priority // For graceful termination need close all input channels or remove them Inputs map[uint]<-chan Type // Handlers should read distributed data from this channel Output chan<- Prioritized[Type] }
Options of the created main prioritization discipline
type Prioritized ¶ added in v1.0.0
Describes the data distributed by the prioritization discipline
type Simple ¶ added in v1.2.0
type Simple[Type any] struct { // contains filtered or unexported fields }
Simplified version of the discipline that runs handlers on its own and hides the output and feedback channels
Example ¶
handlersQuantity := 100 // Preferably input channels should be buffered inputCapacity := 10 itemsQuantity := 100 inputs := map[uint]chan string{ 3: make(chan string, inputCapacity), 2: make(chan string, inputCapacity), 1: make(chan string, inputCapacity), } // Map key is a value of priority inputsOpts := map[uint]<-chan string{ 3: inputs[3], 2: inputs[2], 1: inputs[1], } defer func() { for _, input := range inputs { close(input) } }() // Used only in this example for detect that all written data are processed measurements := make(chan bool) defer close(measurements) handle := func(ctx context.Context, item string) { // Data processing // fmt.Println(item) select { case <-ctx.Done(): case measurements <- true: } } // For equaling use FairDivider, for prioritization use RateDivider or custom divider opts := SimpleOpts[string]{ Divider: RateDivider, Handle: handle, HandlersQuantity: uint(handlersQuantity), Inputs: inputsOpts, } simple, err := NewSimple(opts) if err != nil { panic(err) } defer simple.Stop() wg := &sync.WaitGroup{} defer wg.Wait() // Run writers, that write data to input channels for priority, input := range inputs { wg.Add(1) go func(precedency uint, channel chan string) { defer wg.Done() base := strconv.Itoa(int(precedency)) for id := 0; id < itemsQuantity; id++ { item := base + ":" + strconv.Itoa(id) channel <- item } }(priority, input) } received := 0 // Wait for process all written data for range measurements { received++ if received == itemsQuantity*len(inputs) { break } } fmt.Println("Processed items quantity:", received)
Output: Processed items quantity: 300
func NewSimple ¶ added in v1.2.0
func NewSimple[Type any](opts SimpleOpts[Type]) (*Simple[Type], error)
Creates and runs simplified prioritization discipline
func (*Simple[Type]) Err ¶ added in v1.3.0
Returns a channel with errors. If an error occurs (the value from the channel is not equal to nil) the discipline terminates its work. The most likely cause of the error is an incorrectly working dividing function in which the sum of the distributed quantities is not equal to the original quantity.
The single nil value means that the discipline has terminated in normal mode
func (*Simple[Type]) GracefulStop ¶ added in v1.4.0
func (smpl *Simple[Type]) GracefulStop()
Graceful terminates work of the discipline.
Waits draining input channels, waits end processing data in handlers and terminates.
You must end write to input channels and close them, otherwise graceful stop not be ended
Example ¶
handlersQuantity := 100 // Preferably input channels should be buffered inputCapacity := 10 itemsQuantity := 100 inputs := map[uint]chan string{ 3: make(chan string, inputCapacity), 2: make(chan string, inputCapacity), 1: make(chan string, inputCapacity), } // Map key is a value of priority inputsOpts := map[uint]<-chan string{ 3: inputs[3], 2: inputs[2], 1: inputs[1], } // Used only in this example for detect that all written data are processed measurements := make(chan bool) handle := func(ctx context.Context, item string) { // Data processing // fmt.Println(item) select { case <-ctx.Done(): case measurements <- true: } } // For equaling use FairDivider, for prioritization use RateDivider or custom divider opts := SimpleOpts[string]{ Divider: RateDivider, Handle: handle, HandlersQuantity: uint(handlersQuantity), Inputs: inputsOpts, } simple, err := NewSimple(opts) if err != nil { panic(err) } wg := &sync.WaitGroup{} // Run writers, that write data to input channels for priority, input := range inputs { wg.Add(1) go func(precedency uint, channel chan string) { defer wg.Done() base := strconv.Itoa(int(precedency)) for id := 0; id < itemsQuantity; id++ { item := base + ":" + strconv.Itoa(id) channel <- item } }(priority, input) } obtained := make(chan int) defer close(obtained) // Counting the amount of received data go func() { received := 0 for range measurements { received++ } obtained <- received }() // You must end write to input channels and close them (or remove), // otherwise graceful stop not be ended wg.Wait() for _, input := range inputs { close(input) } simple.GracefulStop() // Terminate measurements close(measurements) received := <-obtained // Verify data received from discipline if received != itemsQuantity*len(inputs) { panic("graceful stop work not properly") } fmt.Println("Processed items quantity:", received)
Output: Processed items quantity: 300
type SimpleOpts ¶ added in v1.2.0
type SimpleOpts[Type any] struct { // Roughly terminates (cancels) work of the discipline Ctx context.Context // Determines how handlers are distributed among priorities Divider Divider // Callback function called in handlers when an item is received Handle Handle[Type] // Between how many handlers you need to distribute data HandlersQuantity uint // Channels with input data, should be buffered for performance reasons // Map key is a value of priority // For graceful termination need close all input channels Inputs map[uint]<-chan Type }
Options of the created simplified prioritization discipline