batcher

package
Version: v1.1.1
Published: Dec 11, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Package batcher implements a generic batching library for API calls. Batching reduces the load placed on external APIs that throttle aggressively.

Constants

This section is empty.

Variables

var (
	BatchWindowDuration = opmetrics.NewPrometheusHistogram(crmetrics.Registry, prometheus.HistogramOpts{
		Namespace: metrics.Namespace,
		Subsystem: batcherSubsystem,
		Name:      "batch_time_seconds",
		Help:      "Duration of the batching window per batcher",
		Buckets:   metrics.DurationBuckets(),
	}, []string{batcherNameLabel})
	BatchSize = opmetrics.NewPrometheusHistogram(crmetrics.Registry, prometheus.HistogramOpts{
		Namespace: metrics.Namespace,
		Subsystem: batcherSubsystem,
		Name:      "batch_size",
		Help:      "Size of the request batch per batcher",
		Buckets:   SizeBuckets(),
	}, []string{batcherNameLabel})
)

Functions

func DefaultHasher

func DefaultHasher[T input](_ context.Context, input *T) uint64

DefaultHasher will hash the entire input

func FilterHasher

func FilterHasher(ctx context.Context, input *ec2.DescribeInstancesInput) uint64

func OneBucketHasher

func OneBucketHasher[T input](_ context.Context, _ *T) uint64

OneBucketHasher will return a constant hash and should be used when there is only one type of request

func SizeBuckets

func SizeBuckets() []float64

SizeBuckets returns a []float64 of default threshold values for size histograms. Each returned slice is new and may be modified without impacting other bucket definitions.
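
The "new slice per call" guarantee can be sketched as follows. The function sizeBuckets and its threshold values are placeholders invented for this example, not the package's actual defaults; the point is only that a caller may modify its copy without affecting later calls.

```go
package main

import "fmt"

// sizeBuckets mirrors the documented contract of SizeBuckets: every call
// builds a fresh slice. The values are placeholders, not the real defaults.
func sizeBuckets() []float64 {
	return []float64{1, 2, 5, 10, 25, 50, 100, 250, 500, 1000}
}

func main() {
	custom := sizeBuckets()
	custom = append(custom, 5000) // extend this caller's copy
	custom[0] = 0.5               // and change an existing threshold

	fresh := sizeBuckets() // unaffected by the edits above
	fmt.Println(len(custom), len(fresh), fresh[0]) // 11 10 1
}
```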

Types

type BatchExecutor

type BatchExecutor[T input, U output] func(ctx context.Context, input []*T) []Result[U]

BatchExecutor is a function that executes a slice of inputs against the batched API. The inputs will be mutated. The returned Result slice is expected to match the length of the input slice and to be in the same order, if order matters for the batched API.

type Batcher

type Batcher[T input, U output] struct {
	// contains filtered or unexported fields
}

Batcher is used to batch API calls with identical parameters into a single call

func NewBatcher

func NewBatcher[T input, U output](ctx context.Context, options Options[T, U]) *Batcher[T, U]

NewBatcher creates a batcher that can batch a particular input and output type

func (*Batcher[T, U]) Add

func (b *Batcher[T, U]) Add(ctx context.Context, input *T) Result[U]

Add will add an input to the batcher using the batcher's hashing function

type CreateFleetBatcher

type CreateFleetBatcher struct {
	// contains filtered or unexported fields
}

func NewCreateFleetBatcher

func NewCreateFleetBatcher(ctx context.Context, ec2api sdk.EC2API) *CreateFleetBatcher

func (*CreateFleetBatcher) CreateFleet

func (b *CreateFleetBatcher) CreateFleet(ctx context.Context, createFleetInput *ec2.CreateFleetInput) (*ec2.CreateFleetOutput, error)

type DescribeInstancesBatcher

type DescribeInstancesBatcher struct {
	// contains filtered or unexported fields
}

func NewDescribeInstancesBatcher

func NewDescribeInstancesBatcher(ctx context.Context, ec2api sdk.EC2API) *DescribeInstancesBatcher

func (*DescribeInstancesBatcher) DescribeInstances

func (b *DescribeInstancesBatcher) DescribeInstances(ctx context.Context, describeInstancesInput *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error)

type Options

type Options[T input, U output] struct {
	Name              string
	IdleTimeout       time.Duration
	MaxTimeout        time.Duration
	MaxItems          int
	MaxRequestWorkers int
	RequestHasher     RequestHasher[T]
	BatchExecutor     BatchExecutor[T, U]
}

Options allows for configuration of the Batcher

type RequestHasher

type RequestHasher[T input] func(ctx context.Context, input *T) uint64

RequestHasher is a function that hashes input to bucket inputs into distinct batches

type Result

type Result[U output] struct {
	Output *U
	Err    error
}

Result is a container for the output and error of an execution

type TerminateInstancesBatcher

type TerminateInstancesBatcher struct {
	// contains filtered or unexported fields
}

func NewTerminateInstancesBatcher

func NewTerminateInstancesBatcher(ctx context.Context, ec2api sdk.EC2API) *TerminateInstancesBatcher

func (*TerminateInstancesBatcher) TerminateInstances

func (b *TerminateInstancesBatcher) TerminateInstances(ctx context.Context, terminateInstancesInput *ec2.TerminateInstancesInput) (*ec2.TerminateInstancesOutput, error)
