Documentation ¶
Overview ¶
Package bundler supports bundling (batching) of items. Bundling amortizes an action with fixed costs over multiple items. For example, if an API provides an RPC that accepts a list of items as input, but clients would prefer adding items one at a time, then a Bundler can accept individual items from the client and bundle many of them into a single RPC.
This package is experimental and subject to change without notice.
Index ¶
Constants ¶
const ( DefaultDelayThreshold = time.Second DefaultBundleCountThreshold = 10 DefaultBundleByteThreshold = 1e6 // 1M DefaultBufferedByteLimit = 1e9 // 1G )
Variables ¶
var ( // ErrOverflow indicates that Bundler's stored bytes exceeds its BufferedByteLimit. ErrOverflow = errors.New("bundler reached buffered byte limit") // ErrOversizedItem indicates that an item's size exceeds the maximum bundle size. ErrOversizedItem = errors.New("item size exceeds bundle byte limit") )
Functions ¶
This section is empty.
Types ¶
type Bundler ¶
type Bundler struct { // Starting from the time that the first message is added to a bundle, once // this delay has passed, handle the bundle. The default is DefaultDelayThreshold. DelayThreshold time.Duration // Once a bundle has this many items, handle the bundle. Since only one // item at a time is added to a bundle, no bundle will exceed this // threshold, so it also serves as a limit. The default is // DefaultBundleCountThreshold. BundleCountThreshold int // Once the number of bytes in current bundle reaches this threshold, handle // the bundle. The default is DefaultBundleByteThreshold. This triggers handling, // but does not cap the total size of a bundle. BundleByteThreshold int // The maximum size of a bundle, in bytes. Zero means unlimited. BundleByteLimit int // The maximum number of bytes that the Bundler will keep in memory before // returning ErrOverflow. The default is DefaultBufferedByteLimit. BufferedByteLimit int // The maximum number of handler invocations that can be running at once. // The default is 1. HandlerLimit int // contains filtered or unexported fields }
A Bundler collects items added to it into a bundle until the bundle exceeds a given size, then calls a user-provided function to handle the bundle.
The exported fields are only safe to modify prior to the first call to Add or AddWait.
func NewBundler ¶
func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler
NewBundler creates a new Bundler.
itemExample is a value of the type that will be bundled. For example, if you want to create bundles of *Entry, you could pass &Entry{} for itemExample.
handler is a function that will be called on each bundle. If itemExample is of type T, the argument to handler is of type []T. handler is always called sequentially for each bundle, and never in parallel.
Configure the Bundler by setting its thresholds and limits before calling any of its methods.
func (*Bundler) Add ¶
Add adds item to the current bundle. It marks the bundle for handling and starts a new one if any of the thresholds or limits are exceeded. The type of item must be assignable to the itemExample parameter of the NewBundler method, otherwise there will be a panic.
If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then the item can never be handled. Add returns ErrOversizedItem in this case.
If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for memory, Add returns ErrOverflow.
Add never blocks.
func (*Bundler) AddWait ¶
AddWait adds item to the current bundle. It marks the bundle for handling and starts a new one if any of the thresholds or limits are exceeded.
If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then the item can never be handled. AddWait returns ErrOversizedItem in this case.
If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit), AddWait blocks until space is available or ctx is done.
Calls to Add and AddWait should not be mixed on the same Bundler.