Documentation ¶
Overview ¶
Example ¶
package main import ( "context" "fmt" "sync" "time" "github.com/keybase/pipeliner" ) type Request struct{ i int } //nolint type Result struct{ i int } func (r Request) Do() (Result, error) { time.Sleep(time.Millisecond) return Result(r), nil } func main() { requests := []Request{{0}, {1}, {2}, {3}, {4}} results, _ := makeRequests(context.Background(), requests, 2) for _, r := range results { fmt.Printf("%d ", r.i) } } // makeRequests calls `Do` on all of the given requests, with only `window` outstanding // at any given time. It puts the results in `results`, and errors out on the first // failure. func makeRequests(ctx context.Context, requests []Request, window int) (results []Result, err error) { var resultsLock sync.Mutex results = make([]Result, len(requests)) pipeliner := pipeliner.NewPipeliner(window) worker := func(ctx context.Context, i int) error { res, err := requests[i].Do() resultsLock.Lock() results[i] = res resultsLock.Unlock() return err // the first error will kill the pipeline } for i := range requests { err := pipeliner.WaitForRoom(ctx) if err != nil { return nil, err } go func(i int) { pipeliner.CompleteOne(worker(ctx, i)) }(i) } return results, pipeliner.Flush(ctx) }
Output: 0 1 2 3 4
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Pipeliner ¶
Pipeliner coordinates a flow of parallel requests, rate-limiting so that only a fixed number are oustanding at any one given time.
func NewPipeliner ¶
NewPipeliner makes a pipeliner with window size `w`.
func (*Pipeliner) CompleteOne ¶
CompleteOne should be called when a request is completed, to make room for subsequent requests. Call it with an error if you want the rest of the pipeline to be short-circuited. This is the error that is returned from WaitForRoom.
func (*Pipeliner) Flush ¶
Flush any outstanding requests, blocking until the last completes. Returns an error set by CompleteOne, or a context-based error if any request was canceled mid-flight.
func (*Pipeliner) WaitForRoom ¶
WaitForRoom will block until there is room in the window to fire another request. It returns an error if any prior request failed, instructing the caller to stop firing off new requests. The error originates either from CompleteOne(), or from a context-based cancelation