gomicrobee

package
v0.0.0-...-5d479c6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2024 License: GPL-3.0, MIT Imports: 3 Imported by: 0

README

Micro Bee

Busy as a Bee: Turbocharge Your Workflows with Go Micro Bee's Micro Batching

a Bee busy doing jobs

Micro Bee is a small library to do micro batch processing in golang.

Mico batching is the process of grouping Jobs together to process them at once. This can bring significant performance benefits when processing a large amounts of data. For example, you could use micro batching to keep CPU cores busy or minimise the amount of database transactions.

Usage

Consider if you had a database that was overwhelmed by a large amount of transactions inserting Users. You could optimise this process by using microbee.

First you would create a processor with a function that to bulk insert the users. Then you could submit the users one by one as they became available.

func MyInsertUsers(users []MyUser) []bool {
    ???
}

processor := microbee.NewProcessor(
    MyInsertUsers,
    100, // batch size
    100 * time.Millisecond // linger time
)

jobResult1 := processor.Submit( MyUser{ "foo" } )
jobResult2 := processor.Submit( MyUser{ "bar" } )

result1, err := jobResutl1.Get()
if err != nil {
    log.Print("error: unable to insert user")
}
result2, err := jobResutl2.Get()
if err != nil {
    log.Print("error: unable to insert user")
}

processor.Shutdown()

testing

Tests can be run with go test ./...

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrShutdown = errors.New("shutdown")

Functions

This section is empty.

Types

type BatchProcessor

type BatchProcessor[A any, B any] func(jobs []A) []B

The BatchProcessor function transforms a list of values into a list of results. The processor MUST produce a result for each input, and the order of the results must be in the same order as the input.

type JobResult

type JobResult[T any] interface {
	// Block until the result is ready, potentially waiting for the System wait for more jobs and execute the batch.
	// Can be called multiple times to receive the same value.
	Get() T
}

JobResults contain a reference to Get the eventual result.

type System

type System[A any, B any] interface {
	// Add a new job to the queue of pending jobs.
	// Jobs are added to the queue but their execution is delayed until the batch is processed.
	// Returns a JobResult with the eventual answer.
	// Can return an error if the System was unable to process the job. Eg, it was shutdown.
	Submit(job A) (JobResult[B], error)
	// Stop the system from accepting new jobs and finish any existing jobs.
	Shutdown()
}

A System is configured with a BatchProcessor, a batch size and an interval. the System accepts jobs via `Submit` and returns a `JobResult` which contains a way to get the result when it is eventually ready.

func NewSystem

func NewSystem[A any, B any](
	processor BatchProcessor[A, B],

	batchSize int,

	linger time.Duration,
) System[A, B]

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL