pipeliner

Version: v0.0.0-...-c2d0059
Published: Jan 6, 2025 License: BSD-3-Clause Imports: 2 Imported by: 14

README

pipeliner


A simplified pipeline library for parallel requests with bounded parallelism.

Getting

go get github.com/keybase/pipeliner

Background

Often you want to do network requests with bounded parallelism. Let's say you have 1,000 DNS queries to make. You don't want to wait for them to complete serially, but you also don't want to blast your server with 1,000 simultaneous requests. In this case, bounded parallelism makes sense: make 1,000 requests with only 10 outstanding at any one time.
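To make the idea concrete, here is a minimal hand-rolled sketch using only the standard library, with a buffered channel acting as a counting semaphore. It is not how pipeliner is implemented, and it has no error handling or cancellation. `runBounded` and its peak counter are hypothetical names made up for this illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runBounded (hypothetical helper) calls work(i) for every i in [0, n),
// keeping at most `window` calls in flight at once. It returns the highest
// number of simultaneous calls it observed.
func runBounded(n, window int, work func(i int)) int64 {
	sem := make(chan struct{}, window) // buffered channel used as a semaphore
	var wg sync.WaitGroup
	var inFlight, peak int64

	for i := 0; i < n; i++ {
		sem <- struct{}{} // blocks while `window` calls are outstanding
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when this call ends

			// Track the peak concurrency so the bound can be checked.
			cur := atomic.AddInt64(&inFlight, 1)
			for {
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			work(i)
			atomic.AddInt64(&inFlight, -1)
		}(i)
	}
	wg.Wait()
	return peak
}

func main() {
	// 1,000 simulated requests, at most 10 outstanding at any one time.
	peak := runBounded(1000, 10, func(int) { time.Sleep(time.Millisecond) })
	fmt.Println("peak in flight <= 10:", peak <= 10) // prints "peak in flight <= 10: true"
}
```

Even this stripped-down version needs a semaphore, a wait group, and careful ordering of the release, and it still lacks first-error short-circuiting and context support.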

At this point, I usually Google for it, and come up with this blog post, and I become slightly sad, because that is a lot of code to digest and understand to do something that should be rather simple. It's not really the fault of the language, but more so the library. Here is a library that makes it a lot easier:

Example

import (
	"context"
	"sync"
	"time"

	"github.com/keybase/pipeliner"
)

// See example_request_test.go for a runnable example.

type Request struct{ i int }
type Result struct{ i int }

func (r Request) Do() (Result, error) {
	time.Sleep(time.Millisecond)
	return Result{r.i}, nil
}

// makeRequests calls `Do` on all of the given requests, with only `window` outstanding
// at any given time. It puts the results in `results`, and errors out on the first
// failure.
func makeRequests(ctx context.Context, requests []Request, window int) (results []Result, err error) {

	var resultsLock sync.Mutex
	results = make([]Result, len(requests))

	pipeliner := pipeliner.NewPipeliner(window)

	worker := func(ctx context.Context, i int) error {
		res, err := requests[i].Do()
		resultsLock.Lock()
		results[i] = res
		resultsLock.Unlock()
		return err // the first error will kill the pipeline
	}

	for i := range requests {
		err := pipeliner.WaitForRoom(ctx)
		if err != nil {
			return nil, err
		}
		go func(i int) { pipeliner.CompleteOne(worker(ctx, i)) }(i)
	}
	return results, pipeliner.Flush(ctx)
}

Documentation

Overview

Example

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/keybase/pipeliner"
)

type Request struct{ i int } //nolint
type Result struct{ i int }

func (r Request) Do() (Result, error) {
	time.Sleep(time.Millisecond)
	return Result(r), nil
}

func main() {
	requests := []Request{{0}, {1}, {2}, {3}, {4}}
	results, _ := makeRequests(context.Background(), requests, 2)
	for _, r := range results {
		fmt.Printf("%d ", r.i)
	}
}

// makeRequests calls `Do` on all of the given requests, with only `window` outstanding
// at any given time. It puts the results in `results`, and errors out on the first
// failure.
func makeRequests(ctx context.Context, requests []Request, window int) (results []Result, err error) {

	var resultsLock sync.Mutex
	results = make([]Result, len(requests))

	pipeliner := pipeliner.NewPipeliner(window)

	worker := func(_ context.Context, i int) error {
		res, err := requests[i].Do()
		resultsLock.Lock()
		results[i] = res
		resultsLock.Unlock()
		return err // the first error will kill the pipeline
	}

	for i := range requests {
		err := pipeliner.WaitForRoom(ctx)
		if err != nil {
			return nil, err
		}
		go func(i int) { pipeliner.CompleteOne(worker(ctx, i)) }(i)
	}
	return results, pipeliner.Flush(ctx)
}

Output:

0 1 2 3 4


Types

type Pipeliner

type Pipeliner struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Pipeliner coordinates a flow of parallel requests, rate-limiting so that only a fixed number are outstanding at any given time.

func NewPipeliner

func NewPipeliner(w int) *Pipeliner

NewPipeliner makes a pipeliner with window size `w`.

func (*Pipeliner) CompleteOne

func (p *Pipeliner) CompleteOne(e error)

CompleteOne should be called when a request is completed, to make room for subsequent requests. Call it with an error if you want the rest of the pipeline to be short-circuited. This is the error that is returned from WaitForRoom.

func (*Pipeliner) Flush

func (p *Pipeliner) Flush(ctx context.Context) error

Flush any outstanding requests, blocking until the last completes. Returns an error set by CompleteOne, or a context-based error if any request was canceled mid-flight.

func (*Pipeliner) WaitForRoom

func (p *Pipeliner) WaitForRoom(ctx context.Context) error

WaitForRoom will block until there is room in the window to fire another request. It returns an error if any prior request failed, instructing the caller to stop firing off new requests. The error originates either from CompleteOne(), or from a context-based cancellation.
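The short-circuit behavior described for CompleteOne and WaitForRoom can be sketched as follows. So that the sketch runs without the dependency, it is written against a small `limiter` interface that copies the three documented method signatures, and it uses `fakeLimiter`, a hypothetical stand-in written for this illustration. The stand-in is an assumption about the documented behavior, not the library's actual implementation; `fireAll` and `demo` are likewise made-up names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// limiter lists the three methods documented for *pipeliner.Pipeliner.
type limiter interface {
	WaitForRoom(ctx context.Context) error
	CompleteOne(e error)
	Flush(ctx context.Context) error
}

// fakeLimiter is a HYPOTHETICAL stand-in that mimics the documented
// behavior. It is not the library's code.
type fakeLimiter struct {
	mu    sync.Mutex
	err   error         // first error passed to CompleteOne
	slots chan struct{} // one token per outstanding request
}

func newFakeLimiter(w int) *fakeLimiter {
	return &fakeLimiter{slots: make(chan struct{}, w)}
}

func (f *fakeLimiter) firstErr() error {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.err
}

func (f *fakeLimiter) WaitForRoom(ctx context.Context) error {
	select {
	case f.slots <- struct{}{}: // got a slot
	case <-ctx.Done():
		return ctx.Err()
	}
	if err := f.firstErr(); err != nil {
		<-f.slots // give the slot back; the pipeline is dead
		return err
	}
	return nil
}

func (f *fakeLimiter) CompleteOne(e error) {
	f.mu.Lock()
	if f.err == nil {
		f.err = e
	}
	f.mu.Unlock()
	<-f.slots // make room for the next request
}

func (f *fakeLimiter) Flush(ctx context.Context) error {
	// Taking every slot means nothing is outstanding any more.
	for i := 0; i < cap(f.slots); i++ {
		select {
		case f.slots <- struct{}{}:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return f.firstErr()
}

// fireAll runs job(i) for i in [0, n) and reports how many jobs were started.
func fireAll(ctx context.Context, p limiter, n int, job func(i int) error) (int, error) {
	started := 0
	for i := 0; i < n; i++ {
		if err := p.WaitForRoom(ctx); err != nil {
			return started, err // a prior request failed: stop firing
		}
		started++
		go func(i int) { p.CompleteOne(job(i)) }(i)
	}
	return started, p.Flush(ctx)
}

// demo uses a window of 1 so the outcome is deterministic: job 1 fails,
// and jobs 2 through 4 are never started.
func demo() (int, error) {
	job := func(i int) error {
		if i == 1 {
			return errors.New("boom")
		}
		return nil
	}
	return fireAll(context.Background(), newFakeLimiter(1), 5, job)
}

func main() {
	started, err := demo()
	fmt.Println(started, err) // prints "2 boom"
}
```

With a larger window, a few more requests may already be in flight when the failure is recorded, so the number started can exceed the index of the failing request.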
