Documentation ¶
Overview ¶
Example (ConcurrentOrdered) ¶
package main

import (
	"fmt"
	"time"

	"github.com/opencost/opencost/pkg/util/worker"
)

// slowAddTenToFloat simulates "work" -- it accepts an integer, adds 10, converts it to a float64,
// waits 1 second, then returns the result.
func slowAddTenToFloat(i int) float64 {
	result := float64(i + 10)
	time.Sleep(time.Second)
	return result
}

func main() {
	// Expanding on the previous idea, let's assume that we want to receive the result for
	// every input. That would normally require some specialized synchronization and boilerplate,
	// but the worker package contains an ordered group type for exactly this functionality.

	// This time, let's create a worker pool and use OptimalWorkerCount (which is based on
	// runtime.NumCPU()) to determine the number of workers
	workerCount := worker.OptimalWorkerCount()
	workerPool := worker.NewWorkerPool(workerCount, slowAddTenToFloat)

	// Shutdown the worker pool when complete
	defer workerPool.Shutdown()

	// now we can create our ordered group type and pass in the worker pool, and since we know our
	// number of inputs (let's choose 12 this time), we can pass that to the group as well.
	const numInputs = 12
	orderedGroup := worker.NewOrderedGroup(workerPool, numInputs)

	// loop over our inputs and pass them to the group
	for i := 0; i < numInputs; i++ {
		// ordered group has a strict size constraint (set in the NewOrderedGroup func), and will
		// error if the number of inputs pushed exceeds that size constraint
		err := orderedGroup.Push(i)
		if err != nil {
			panic(err)
		}
	}

	// now we can simply call Wait() to receive the results
	results := orderedGroup.Wait()

	// Note that the order of the results is consistent with the order in which they were pushed
	for idx, result := range results {
		fmt.Printf("Received Result: %.2f for Input: %d\n", result, idx)
	}
}
Output:

Received Result: 10.00 for Input: 0
Received Result: 11.00 for Input: 1
Received Result: 12.00 for Input: 2
Received Result: 13.00 for Input: 3
Received Result: 14.00 for Input: 4
Received Result: 15.00 for Input: 5
Received Result: 16.00 for Input: 6
Received Result: 17.00 for Input: 7
Received Result: 18.00 for Input: 8
Received Result: 19.00 for Input: 9
Received Result: 20.00 for Input: 10
Received Result: 21.00 for Input: 11
Example (ConcurrentOrderedSimple) ¶
package main

import (
	"fmt"
	"time"

	"github.com/opencost/opencost/pkg/util/worker"
)

// slowAddTenToFloat simulates "work" -- it accepts an integer, adds 10, converts it to a float64,
// waits 1 second, then returns the result.
func slowAddTenToFloat(i int) float64 {
	result := float64(i + 10)
	time.Sleep(time.Second)
	return result
}

func main() {
	// This last example highlights a simplified version of the previous example. While
	// the ordered example provides tuning knobs for total goroutines and allows pushing
	// data dynamically, it can be quite verbose and difficult to read at times. The worker
	// package also provides a utility function that simplifies the ordered concurrent
	// processing into a worker function and a slice of inputs

	// Let's create our 12 inputs (0-11) like in the previous example
	const numInputs = 12
	inputs := make([]int, numInputs)
	for i := 0; i < numInputs; i++ {
		inputs[i] = i
	}

	// Now, we can just call ConcurrentDo with the inputs and worker func:
	results := worker.ConcurrentDo(slowAddTenToFloat, inputs)

	// Note that the order of the results is consistent with the order of inputs
	for i := 0; i < numInputs; i++ {
		fmt.Printf("Received Result: %.2f for Input: %d\n", results[i], inputs[i])
	}
}
Output:

Received Result: 10.00 for Input: 0
Received Result: 11.00 for Input: 1
Received Result: 12.00 for Input: 2
Received Result: 13.00 for Input: 3
Received Result: 14.00 for Input: 4
Received Result: 15.00 for Input: 5
Received Result: 16.00 for Input: 6
Received Result: 17.00 for Input: 7
Received Result: 18.00 for Input: 8
Received Result: 19.00 for Input: 9
Received Result: 20.00 for Input: 10
Received Result: 21.00 for Input: 11
Example (ConcurrentWorkers) ¶
package main

import (
	"fmt"
	"time"

	"github.com/opencost/opencost/pkg/util/worker"
)

// slowAddTenToFloat simulates "work" -- it accepts an integer, adds 10, converts it to a float64,
// waits 1 second, then returns the result.
func slowAddTenToFloat(i int) float64 {
	result := float64(i + 10)
	time.Sleep(time.Second)
	return result
}

func main() {
	// Assuming we have a list of ints we want to pass to slowAddTenToFloat(),
	// rather than serially calling the function on each input (requiring a wait
	// of 1 second between calls), we'll want to execute each in a goroutine. Let's
	// say we had 100 inputs, we may not want to create that many goroutines, so
	// instead, we can create a pool of goroutines that work on our inputs as fast as
	// possible.

	// Create a worker pool using 50 goroutines:
	workerPool := worker.NewWorkerPool(50, slowAddTenToFloat)

	// we want to shutdown the workerPool at the end of its use to ensure we don't
	// leak goroutines
	defer workerPool.Shutdown()

	// Loop over 100 inputs and run slowAddTenToFloat
	for i := 0; i < 100; i++ {
		// Run accepts a receive channel for each input, but it is not required.
		// To demonstrate receiving, we'll receive the results when the input
		// is 50:
		if i == 50 {
			receive := make(chan float64)
			workerPool.Run(i, receive)

			// since we don't want to slow down the input loop, let's receive the
			// result in a separate goroutine
			go func(input int, rec chan float64) {
				defer close(rec)

				result := <-rec
				fmt.Printf("Receive Result: %.2f for Input: %d\n", result, input)
			}(i, receive)
		} else {
			// pass nil if receiving the result isn't necessary
			workerPool.Run(i, nil)
		}
	}

	// 100 inputs with 50 goroutines should take 2 seconds, so let's wait a bit longer than that
	time.Sleep((2 * time.Second) + (500 * time.Millisecond))
}
Output:

Receive Result: 60.00 for Input: 50
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConcurrentDo ¶
ConcurrentDo runs a pool of workers that concurrently call the provided worker func on each input, and returns the outputs in the same order as the inputs.
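To make the ordering guarantee concrete, here is a minimal standard-library sketch of the general technique: a fixed set of goroutines pulls input indexes from a channel and writes each result into the slot with the same index. The name `orderedMap` and its explicit `workers` parameter are hypothetical and chosen for illustration; this is not the package's implementation, and it assumes `workers` is at least 1.

```go
package main

import (
	"fmt"
	"sync"
)

// orderedMap is a hypothetical helper (not part of the worker package).
// It runs fn on every input using a fixed number of goroutines and stores
// each result at the index of its input, so output order matches input order.
// Assumes workers >= 1.
func orderedMap[T any, U any](workers int, fn func(T) U, inputs []T) []U {
	results := make([]U, len(inputs))
	indexes := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Every index is delivered to exactly one goroutine, so each
			// result slot has a single writer and needs no lock.
			for i := range indexes {
				results[i] = fn(inputs[i])
			}
		}()
	}

	// Feed the indexes, then signal that no more work is coming.
	for i := range inputs {
		indexes <- i
	}
	close(indexes)

	wg.Wait()
	return results
}

func main() {
	double := func(i int) int { return i * 2 }
	fmt.Println(orderedMap(3, double, []int{1, 2, 3, 4})) // prints [2 4 6 8]
}
```

Writing by index rather than appending is what keeps the output aligned with the inputs regardless of which goroutine finishes first.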
func OptimalWorkerCount ¶
func OptimalWorkerCount() int
OptimalWorkerCount will return an optimal worker count based on runtime.NumCPU()
func OptimalWorkerCountInRange ¶
OptimalWorkerCountInRange will return runtime.NumCPU() constrained to the provided min and max range
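As an illustration of what constraining a CPU-based count to a range means, the sketch below clamps a value between a lower and an upper bound. The helper `clampWorkerCount` and the parameter names `lo` and `hi` are hypothetical; the package's own function may handle edge cases differently.

```go
package main

import (
	"fmt"
	"runtime"
)

// clampWorkerCount is a hypothetical helper (not part of the worker package).
// It returns n limited to the inclusive range [lo, hi].
func clampWorkerCount(n, lo, hi int) int {
	if n < lo {
		return lo
	}
	if n > hi {
		return hi
	}
	return n
}

func main() {
	fmt.Println(clampWorkerCount(16, 2, 8)) // prints 8
	fmt.Println(clampWorkerCount(1, 2, 8))  // prints 2

	// With the real CPU count the exact value depends on the machine,
	// but it always falls inside the requested range.
	count := clampWorkerCount(runtime.NumCPU(), 2, 8)
	fmt.Println(count >= 2 && count <= 8) // prints true
}
```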
Types ¶
type WorkGroup ¶
type WorkGroup[T any, U any] interface {
	// Push adds a new input to the work group.
	Push(T) error

	// Wait waits for all pending worker tasks to complete, then returns all the results.
	Wait() []U
}
WorkGroup is a group of inputs that leverage a WorkerPool to run inputs through workers and collect the results in a single slice.
func NewOrderedGroup ¶
func NewOrderedGroup[T any, U any](pool WorkerPool[T, U], size int) WorkGroup[T, U]
NewOrderedGroup creates a new WorkGroup implementation for processing a group of inputs in the order in which they are pushed. Ordered groups do not support concurrent Push() calls.
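The size constraint and the push-order guarantee can be sketched with the standard library alone. The type `fixedGroup` below is hypothetical and simplified: it starts one goroutine per pushed input instead of using a WorkerPool, and it is an assumption about how such a group could behave, not the package's code. Like the ordered group described above, its Push is not safe for concurrent calls.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// fixedGroup is a hypothetical, simplified ordered group (not the package's type).
// It reserves one result slot per pushed input, in push order.
type fixedGroup[T any, U any] struct {
	fn      func(T) U
	results []U
	count   int
	wg      sync.WaitGroup
}

func newFixedGroup[T any, U any](fn func(T) U, size int) *fixedGroup[T, U] {
	return &fixedGroup[T, U]{fn: fn, results: make([]U, size)}
}

// Push reserves the next slot and starts the work. It returns an error once
// the fixed size is exhausted. It must be called from a single goroutine.
func (g *fixedGroup[T, U]) Push(input T) error {
	if g.count >= len(g.results) {
		return errors.New("group is full")
	}
	idx := g.count
	g.count++

	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		g.results[idx] = g.fn(input) // each goroutine writes only its own slot
	}()
	return nil
}

// Wait blocks until all pushed work is done and returns the results
// for the inputs pushed so far, in push order.
func (g *fixedGroup[T, U]) Wait() []U {
	g.wg.Wait()
	return g.results[:g.count]
}

func main() {
	group := newFixedGroup(func(i int) float64 { return float64(i + 10) }, 3)
	for i := 0; i < 4; i++ {
		if err := group.Push(i); err != nil {
			fmt.Println("push", i, "failed:", err) // the fourth push exceeds the size
		}
	}
	fmt.Println(group.Wait()) // prints [10 11 12]
}
```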
type WorkerPool ¶
type WorkerPool[T any, U any] interface {
	// Run executes a Worker in the pool on the provided input and onComplete receive channel
	// to get the results. An error is returned if the pool is shutdown, or is in the process
	// of shutting down.
	Run(input T, onComplete chan<- U) error

	// Shutdown stops all of the workers (if running).
	Shutdown()
}
WorkerPool is a pool of goroutines executing a Worker on supplied inputs via the Run function.
func NewWorkerPool ¶
func NewWorkerPool[T any, U any](workers int, work Worker[T, U]) WorkerPool[T, U]
NewWorkerPool creates a new worker pool provided the number of workers to run as well as the worker func used to transform inputs to outputs.
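For readers who want to see how a pool with this Run/Shutdown shape can work, here is a minimal standard-library sketch. The names `simplePool`, `newSimplePool`, and `task` are hypothetical, and the locking strategy is one possible design rather than the package's implementation. It assumes callers either pass nil for onComplete or make sure the channel is eventually received from, since a worker blocks while sending its result.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// task pairs an input with an optional channel for its result.
type task[T any, U any] struct {
	input      T
	onComplete chan<- U
}

// simplePool is a hypothetical worker pool sketch (not the package's type).
type simplePool[T any, U any] struct {
	fn     func(T) U
	tasks  chan task[T, U]
	mu     sync.RWMutex
	closed bool
	wg     sync.WaitGroup
}

func newSimplePool[T any, U any](workers int, fn func(T) U) *simplePool[T, U] {
	p := &simplePool[T, U]{fn: fn, tasks: make(chan task[T, U])}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for t := range p.tasks {
				result := p.fn(t.input)
				if t.onComplete != nil {
					t.onComplete <- result
				}
			}
		}()
	}
	return p
}

// Run hands the input to a worker, or returns an error after Shutdown.
func (p *simplePool[T, U]) Run(input T, onComplete chan<- U) error {
	p.mu.RLock()
	defer p.mu.RUnlock()
	if p.closed {
		return errors.New("pool is shut down")
	}
	p.tasks <- task[T, U]{input: input, onComplete: onComplete}
	return nil
}

// Shutdown stops accepting work and waits for the workers to exit.
func (p *simplePool[T, U]) Shutdown() {
	p.mu.Lock()
	if p.closed {
		p.mu.Unlock()
		return
	}
	p.closed = true
	close(p.tasks)
	p.mu.Unlock()
	p.wg.Wait()
}

func main() {
	pool := newSimplePool(2, func(i int) float64 { return float64(i + 10) })

	receive := make(chan float64, 1) // buffered so the worker never blocks on send
	if err := pool.Run(50, receive); err != nil {
		panic(err)
	}
	fmt.Println(<-receive) // prints 60

	pool.Shutdown()
	fmt.Println(pool.Run(1, nil) != nil) // prints true: Run fails after Shutdown
}
```

Holding the read lock in Run while sending means Shutdown cannot close the task channel in the middle of a send, which avoids a send on a closed channel.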