bundler

package
v0.64.0
Published: Jan 6, 2022 License: BSD-3-Clause Imports: 6 Imported by: 290

Documentation

Overview

Package bundler supports bundling (batching) of items. Bundling amortizes an action with fixed costs over multiple items. For example, if an API provides an RPC that accepts a list of items as input, but clients would prefer adding items one at a time, then a Bundler can accept individual items from the client and bundle many of them into a single RPC.

This package is experimental and subject to change without notice.

Constants

const (
	DefaultDelayThreshold       = time.Second
	DefaultBundleCountThreshold = 10
	DefaultBundleByteThreshold  = 1e6 // 1M
	DefaultBufferedByteLimit    = 1e9 // 1G
)

Variables

var (
	// ErrOverflow indicates that the Bundler's stored bytes exceed its BufferedByteLimit.
	ErrOverflow = errors.New("bundler reached buffered byte limit")

	// ErrOversizedItem indicates that an item's size exceeds the maximum bundle size.
	ErrOversizedItem = errors.New("item size exceeds bundle byte limit")
)

Functions

This section is empty.

Types

type Bundler

type Bundler struct {
	// Starting from the time that the first message is added to a bundle, once
	// this delay has passed, handle the bundle. The default is DefaultDelayThreshold.
	DelayThreshold time.Duration

	// Once a bundle has this many items, handle the bundle. Since only one
	// item at a time is added to a bundle, no bundle will exceed this
	// threshold, so it also serves as a limit. The default is
	// DefaultBundleCountThreshold.
	BundleCountThreshold int

	// Once the number of bytes in the current bundle reaches this threshold, handle
	// the bundle. The default is DefaultBundleByteThreshold. This triggers handling,
	// but does not cap the total size of a bundle.
	BundleByteThreshold int

	// The maximum size of a bundle, in bytes. Zero means unlimited.
	BundleByteLimit int

	// The maximum number of bytes that the Bundler will keep in memory before
	// returning ErrOverflow. The default is DefaultBufferedByteLimit.
	BufferedByteLimit int

	// The maximum number of handler invocations that can be running at once.
	// The default is 1.
	HandlerLimit int
	// contains filtered or unexported fields
}

A Bundler collects items added to it into a bundle until the bundle reaches one of its thresholds (item count, byte size, or delay), then calls a user-provided function to handle the bundle.

The exported fields are only safe to modify prior to the first call to Add or AddWait.

func NewBundler

func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler

NewBundler creates a new Bundler.

itemExample is a value of the type that will be bundled. For example, if you want to create bundles of *Entry, you could pass &Entry{} for itemExample.

handler is a function that will be called on each bundle. If itemExample is of type T, the argument to handler is of type []T. With the default HandlerLimit of 1, handler is called sequentially for each bundle and never in parallel; a larger HandlerLimit allows up to that many invocations to run at once.

Configure the Bundler by setting its thresholds and limits before calling any of its methods.

func (*Bundler) Add

func (b *Bundler) Add(item interface{}, size int) error

Add adds item to the current bundle. It marks the bundle for handling and starts a new one if any of the thresholds or limits are exceeded. The type of item must be assignable to the type of the itemExample argument passed to NewBundler; otherwise Add panics.

If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then the item can never be handled. Add returns ErrOversizedItem in this case.

If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for memory, Add returns ErrOverflow.

Add never blocks.

func (*Bundler) AddWait

func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error

AddWait adds item to the current bundle. It marks the bundle for handling and starts a new one if any of the thresholds or limits are exceeded.

If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then the item can never be handled. AddWait returns ErrOversizedItem in this case.

If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit), AddWait blocks until space is available or ctx is done.

Calls to Add and AddWait should not be mixed on the same Bundler.

func (*Bundler) Flush

func (b *Bundler) Flush()

Flush invokes the handler for all remaining items in the Bundler and waits for it to return.
