worker

package
v1.104.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 13, 2023 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Overview

Example (ConcurrentOrdered)
package main

import (
	"fmt"
	"time"

	"github.com/opencost/opencost/pkg/util/worker"
)

// slowAddTenToFloat simulates "work" -- it accepts an integer, adds 10, converts it to a float64,
// waits 1 second, then returns the result.
func slowAddTenToFloat(i int) float64 {
	result := float64(i + 10)
	time.Sleep(time.Second)
	return result
}

func main() {
	// Expanding on the previous idea, let's assume that we want to receive the result for
	// every input. That would normally require some specialized synchronization and boilerplate,
	// but the worker package contains a ordered group type for exactly this functionality

	// This time, let's create a worker pool and use the MAXGOPROCS value to determine the number
	// of workers
	workerCount := worker.OptimalWorkerCount()
	workerPool := worker.NewWorkerPool(workerCount, slowAddTenToFloat)

	// Shutdown the worker pool when complete
	defer workerPool.Shutdown()

	// now we can create our ordered group type and pass in the worker pool, and since we know our
	// number of inputs (let's choose 12 this time), we can pass that to the group as well.
	const numInputs = 12
	orderedGroup := worker.NewOrderedGroup(workerPool, numInputs)

	// loop over our inputs and pass them to the group
	for i := 0; i < numInputs; i++ {
		// ordered group has a strict size constraint (set in the NewOrderedGroup func), and will
		// error if the number of inputs pushed exceeds that size constraint
		err := orderedGroup.Push(i)
		if err != nil {
			panic(err)
		}
	}

	// now we can simply call Wait() to receive the results
	results := orderedGroup.Wait()

	// Note that the order of the results is consistent with the order in which they were pushed
	for idx, result := range results {
		fmt.Printf("Received Result: %.2f for Input: %d\n", result, idx)
	}

}
Output:

Received Result: 10.00 for Input: 0
Received Result: 11.00 for Input: 1
Received Result: 12.00 for Input: 2
Received Result: 13.00 for Input: 3
Received Result: 14.00 for Input: 4
Received Result: 15.00 for Input: 5
Received Result: 16.00 for Input: 6
Received Result: 17.00 for Input: 7
Received Result: 18.00 for Input: 8
Received Result: 19.00 for Input: 9
Received Result: 20.00 for Input: 10
Received Result: 21.00 for Input: 11
Example (ConcurrentOrderedSimple)
package main

import (
	"fmt"
	"time"

	"github.com/opencost/opencost/pkg/util/worker"
)

// slowAddTenToFloat simulates "work" -- it accepts an integer, adds 10, converts it to a float64,
// waits 1 second, then returns the result.
func slowAddTenToFloat(i int) float64 {
	result := float64(i + 10)
	time.Sleep(time.Second)
	return result
}

func main() {
	// This last example highlights a simplified version of the previous example. While
	// the ordered example provides tuning knobs for total goroutines and allows pushing
	// data dynamically, it can be quite verbose and difficult to read at times. The worker
	// package also provides a utility function that simplifies the ordered concurrent
	// processing into a worker function and a slice of inputs

	// Let's create our inputs 0-12 like in the previous example
	const numInputs = 12
	inputs := make([]int, numInputs)
	for i := 0; i < numInputs; i++ {
		inputs[i] = i
	}

	// Now, we can just call ConcurrentDo with the inputs and worker func:
	results := worker.ConcurrentDo(slowAddTenToFloat, inputs)

	// Note that the order of the results is consistent with the order of inputs
	for i := 0; i < numInputs; i++ {
		fmt.Printf("Received Result: %.2f for Input: %d\n", results[i], inputs[i])
	}

}
Output:

Received Result: 10.00 for Input: 0
Received Result: 11.00 for Input: 1
Received Result: 12.00 for Input: 2
Received Result: 13.00 for Input: 3
Received Result: 14.00 for Input: 4
Received Result: 15.00 for Input: 5
Received Result: 16.00 for Input: 6
Received Result: 17.00 for Input: 7
Received Result: 18.00 for Input: 8
Received Result: 19.00 for Input: 9
Received Result: 20.00 for Input: 10
Received Result: 21.00 for Input: 11
Example (ConcurrentWorkers)
package main

import (
	"fmt"
	"time"

	"github.com/opencost/opencost/pkg/util/worker"
)

// slowAddTenToFloat simulates "work" -- it accepts an integer, adds 10, converts it to a float64,
// waits 1 second, then returns the result.
func slowAddTenToFloat(i int) float64 {
	result := float64(i + 10)
	time.Sleep(time.Second)
	return result
}

func main() {
	// Assuming we have a list of ints we want to pass to slowAddTenToFloat(),
	// rather than serially calling the function on each input (requiring a wait
	// of 1 second between calls), we'll want to execute each in a goroutine. Let's
	// say we had 100 inputs, we may not want to create that many go routines, so
	// instead, we can create a pool of goroutines that work on our inputs as fast as
	// possible.

	// Create a worker pool using 50 goroutines:
	workerPool := worker.NewWorkerPool(50, slowAddTenToFloat)

	// we want to shutdown the workerPool at the end of it's use to ensure we don't
	// leak go routines
	defer workerPool.Shutdown()

	// Loop over 100 inputs and run slowAddTenToFloat
	for i := 0; i < 100; i++ {
		// Run accepts a receive channel for each input, but it is not required.
		// To demonstrate receiving, we'll receive the results when the input
		// is 50:
		if i == 50 {
			receive := make(chan float64)
			workerPool.Run(i, receive)

			// since we don't want to slow down the input loop, let's receive the
			// result in a separate go routine
			go func(input int, rec chan float64) {
				defer close(rec)
				result := <-rec
				fmt.Printf("Receive Result: %.2f for Input: %d\n", result, input)
			}(i, receive)
		} else {
			// pass nil if receiving the result isn't necessary
			workerPool.Run(i, nil)
		}
	}

	// 100 inputs with 50 go routines should take 2 seconds, so let's wait a bit longer than that
	time.Sleep((2 * time.Second) + (500 * time.Millisecond))

}
Output:

Receive Result: 60.00 for Input: 50

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConcurrentDo

func ConcurrentDo[T any, U any](worker Worker[T, U], inputs []T) []U

ConcurrentDo runs a pool of workers which concurrently call the provided worker func on each input to get ordered output corresponding to the inputs

func OptimalWorkerCount

func OptimalWorkerCount() int

OptimalWorkerCount will return an optimal worker count based on runtime.NumCPU()

func OptimalWorkerCountInRange

func OptimalWorkerCountInRange(min int, max int) int

OptimalWorkerCount will return runtime.NumCPU() constrained to the provided min and max range

Types

type WorkGroup

type WorkGroup[T any, U any] interface {
	// Push adds a new input to the work group.
	Push(T) error

	// Wait waits for all pending worker tasks to complete, then returns all the results.
	Wait() []U
}

WorkGroup is a group of inputs that leverage a WorkerPool to run inputs through workers and collect the results in a single slice.

func NewOrderedGroup

func NewOrderedGroup[T any, U any](pool WorkerPool[T, U], size int) WorkGroup[T, U]

NewGroup creates a new WorkGroup implementation for processing a group of inputs in the order in which they are pushed. Ordered groups do not support concurrent Push() calls.

type Worker

type Worker[T any, U any] func(T) U

Worker is a transformation function from input type T to output type U.

type WorkerPool

type WorkerPool[T any, U any] interface {
	// Run executes a Worker in the pool on the provided input and onComplete receive chanel
	// to get the results. An error is returned if the pool is shutdown, or is in the process
	// of shutting down.
	Run(input T, onComplete chan<- U) error

	// Shutdown stops all of the workers (if running).
	Shutdown()
}

WorkerPool is a pool of go routines executing a Worker on supplied inputs via the Run function.

func NewWorkerPool

func NewWorkerPool[T any, U any](workers int, work Worker[T, U]) WorkerPool[T, U]

NewWorkerPool creates a new worker pool provided the number of workers to run as well as the worker func used to transform inputs to outputs.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL