micro

Version: v0.0.0-...-140feae
Published: Jul 7, 2024 | License: MIT | Imports: 8 | Imported by: 0

README

Micro

Micro is a Go micro-batching library.

What is micro-batching?

Micro-batching is a batching technique that balances processing throughput against system overhead. It does this by grouping tasks into small batches that downstream systems can process collectively.
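The idea can be illustrated without the library. The following is a minimal sketch, assuming a plain channel of string tasks; the helper name collectBatches and its parameters are hypothetical, and this is not necessarily how Micro works internally. A batch is emitted when it reaches size items or when wait elapses, whichever comes first.

```go
package main

import (
	"fmt"
	"time"
)

// collectBatches is a hypothetical helper, not part of Micro. It groups items
// read from in into batches of at most size items, flushing a partial batch
// when the ticker fires or when in is closed.
func collectBatches(in <-chan string, size int, wait time.Duration) [][]string {
	var batches [][]string
	var current []string
	ticker := time.NewTicker(wait)
	defer ticker.Stop()

	// flush emits the current batch, if any, and starts a new one.
	flush := func() {
		if len(current) > 0 {
			batches = append(batches, current)
			current = nil
		}
	}

	for {
		select {
		case item, ok := <-in:
			if !ok {
				flush() // input closed: emit whatever is left
				return batches
			}
			current = append(current, item)
			if len(current) >= size {
				flush() // size threshold reached
			}
		case <-ticker.C:
			flush() // duration threshold reached
		}
	}
}

func main() {
	in := make(chan string, 5)
	for _, s := range []string{"a", "b", "c", "d", "e"} {
		in <- s
	}
	close(in)
	fmt.Println(collectBatches(in, 2, time.Second)) // prints [[a b] [c d] [e]]
}
```

With five queued tasks and a size threshold of 2, the sketch yields two full batches and one partial batch, the same grouping as the output of the library example further down.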

Usage

See the Example under Documentation below for how to use the library.

Design

The library is designed as a utility wrapper for function-specific batch processors. It can use any batch processor that implements the exported BatchProcessor interface. The batching service handles the ingestion and buffering of incoming jobs and executes the batch processor as required, based on its configuration.

Documentation

Overview

Example
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/lachlandrury99/micro"
)

type Request string
type Result string

type BatchProcessor struct{}

func (b *BatchProcessor) Process(ctx context.Context, requests []Request) ([]Result, error) {
	fmt.Printf("Processing batch: %v\n", requests)
	results := make([]Result, len(requests))
	for i, v := range requests {
		results[i] = Result(fmt.Sprintf("Processed: %s", v))
	}
	return results, nil
}

func main() {
	var wg sync.WaitGroup

	ctx := context.Background()
	// Execute the batch processor as soon as two jobs are buffered.
	batcher := micro.NewBatcher(&BatchProcessor{}, micro.WithSizeThreshold(2))

	batcher.Start(ctx)

	requests := []Request{"a", "b", "c", "d", "e"}
	for _, request := range requests {
		wg.Add(1)

		job := micro.NewJob(request)
		jobResult := batcher.Submit(job)

		time.Sleep(time.Millisecond)

		// Wait for this job's result without blocking further submissions.
		go func() {
			defer wg.Done()
			result, err := jobResult.Get()
			if err != nil {
				panic(err)
			}
			fmt.Println(result)
		}()
	}

	// Stop closes the request queue and waits for the service loop to shut down.
	batcher.Stop()

	wg.Wait()
}
Output:

Processing batch: [a b]
Processed: a
Processed: b
Processing batch: [c d]
Processed: c
Processed: d
Processing batch: [e]
Processed: e

Variables

var DefaultBatcherConfig = BatcherConfig{
	DurationThreshold:  time.Second,
	SizeThreshold:      10,
	InflightBatchLimit: 1,
	BufferSize:         100,
}

DefaultBatcherConfig holds the default configuration for BatcherConfig.

var (
	// Returned when an inconsistent number of results is returned by the batch processor.
	ErrBadResults = errors.New("bad result from batch processor")
)
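The comment on ErrBadResults implies a check that the number of results matches the number of requests. The following is a rough sketch of such a check, not the library's code: the sentinel is redeclared locally so the example compiles on its own, the helpers checkResults and isBadResults are hypothetical, and wrapping the error with %w is an assumption made here for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrBadResults is redeclared here so the sketch compiles on its own; it
// stands in for micro.ErrBadResults.
var ErrBadResults = errors.New("bad result from batch processor")

// checkResults is a hypothetical helper that rejects a batch whose result
// count differs from its request count.
func checkResults(requests, results int) error {
	if requests != results {
		return fmt.Errorf("%w: got %d results for %d requests", ErrBadResults, results, requests)
	}
	return nil
}

// isBadResults reports whether err is, or wraps, ErrBadResults.
func isBadResults(err error) bool {
	return errors.Is(err, ErrBadResults)
}

func main() {
	err := checkResults(3, 2)
	fmt.Println(err)               // bad result from batch processor: got 2 results for 3 requests
	fmt.Println(isBadResults(err)) // true
}
```

Comparing with errors.Is rather than == keeps the check working whether or not the error has been wrapped with extra context.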

Types

type BatchProcessor

type BatchProcessor[R, Q any] interface {
	Process(context.Context, []R) ([]Q, error)
}

BatchProcessor is a batch processing interface used by the Batcher to process batches.

Implementations of BatchProcessor should ensure that the Process method always returns results in the same order as the requests it was given, with exactly one result per request.
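A simple way to satisfy the ordering requirement is to write the result for request i into slot i of a pre-sized slice. The sketch below assumes a local copy of the interface so that it compiles without the module; upperProcessor and processBatch are hypothetical names used only for illustration.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// BatchProcessor mirrors the interface documented above so that this sketch
// compiles on its own; real code would use micro.BatchProcessor instead.
type BatchProcessor[R, Q any] interface {
	Process(context.Context, []R) ([]Q, error)
}

// upperProcessor is a hypothetical processor that upper-cases each request.
type upperProcessor struct{}

// Process writes the result for requests[i] to results[i], so results come
// back in request order and both slices have the same length.
func (upperProcessor) Process(ctx context.Context, requests []string) ([]string, error) {
	results := make([]string, len(requests))
	for i, r := range requests {
		if err := ctx.Err(); err != nil {
			return nil, err // stop early if the context was cancelled
		}
		results[i] = strings.ToUpper(r)
	}
	return results, nil
}

// Compile-time check that upperProcessor satisfies the interface.
var _ BatchProcessor[string, string] = upperProcessor{}

// processBatch is a hypothetical caller that runs p with a background context.
func processBatch[R, Q any](p BatchProcessor[R, Q], requests []R) ([]Q, error) {
	return p.Process(context.Background(), requests)
}

func main() {
	results, err := processBatch[string, string](upperProcessor{}, []string{"a", "b", "c"})
	fmt.Println(results, err) // [A B C] <nil>
}
```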

type Batcher

type Batcher[R, Q any] struct {
	// contains filtered or unexported fields
}

Batcher is a service that processes incoming jobs in grouped 'micro batches' according to the set of configurable thresholds.

func NewBatcher

func NewBatcher[R any, Q any](processor BatchProcessor[R, Q], opts ...Option) Batcher[R, Q]

NewBatcher creates a new batcher. DefaultBatcherConfig is used as the fallback for any setting not supplied through an option.

func NewBatcherWithConfig

func NewBatcherWithConfig[R any, Q any](processor BatchProcessor[R, Q], config BatcherConfig, opts ...Option) Batcher[R, Q]

NewBatcherWithConfig creates a new batcher with the given config.

func (*Batcher[R, Q]) Start

func (m *Batcher[R, Q]) Start(ctx context.Context)

Start begins the Batcher's service loop.

func (*Batcher[R, Q]) Stop

func (m *Batcher[R, Q]) Stop()

Stop closes the internal request queue and waits for the service loop to complete shutdown.

func (*Batcher[R, Q]) Submit

func (m *Batcher[R, Q]) Submit(j Job[R]) JobResult[Q]

Submit submits the job to the request queue, returning a JobResult. The function is non-blocking as long as the internal request queue is not full.

This method is safe for concurrent use.
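The blocking behaviour described for Submit matches that of a send on a buffered Go channel. The sketch below assumes that model for illustration only (the source does not confirm how the request queue is implemented); trySubmit is a hypothetical helper that reports whether a send would have gone through without blocking.

```go
package main

import "fmt"

// trySubmit is a hypothetical helper: it attempts a non-blocking send and
// reports whether the queue had room. A plain `queue <- job` would block
// at the point where this function returns false.
func trySubmit(queue chan<- string, job string) bool {
	select {
	case queue <- job:
		return true
	default:
		return false
	}
}

func main() {
	queue := make(chan string, 2) // plays the role of a BufferSize of 2

	fmt.Println(trySubmit(queue, "a")) // true
	fmt.Println(trySubmit(queue, "b")) // true
	fmt.Println(trySubmit(queue, "c")) // false: the queue is full

	<-queue // a consumer takes one job, freeing a slot

	fmt.Println(trySubmit(queue, "c")) // true
}
```

Under this model, a caller that must never block would need a larger buffer or its own back-pressure handling.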

type BatcherConfig

type BatcherConfig struct {
	// DurationThreshold is how often the batch processor is executed.
	DurationThreshold time.Duration
	// SizeThreshold is the maximum number of buffered jobs before the batch processor
	// is executed.
	SizeThreshold int
	// InflightBatchLimit is the maximum number of batch processes that can execute
	// concurrently. This may be useful in cases where the batch processor is
	// IO bound.
	InflightBatchLimit int
	// BufferSize is the size of the job request queue. If the job request queue is full
	// then batcher.Submit will block until the queue has capacity again.
	BufferSize int
}

BatcherConfig is used to configure a Batcher.
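To make the effect of InflightBatchLimit concrete, here is a minimal sketch of capping concurrent batch executions with a counting semaphore. It is one common way to build such a limit, not a description of the library's internals; runLimited and its parameters are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited is a hypothetical helper that runs work once per batch, with at
// most limit invocations executing at the same time. It returns the highest
// number of invocations observed running concurrently.
func runLimited(batches, limit int, work func()) int {
	sem := make(chan struct{}, limit) // counting semaphore
	var wg sync.WaitGroup
	var running, peak int32

	for i := 0; i < batches; i++ {
		sem <- struct{}{} // blocks while limit batches are in flight
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when done

			n := atomic.AddInt32(&running, 1)
			// Record a new peak if this is the most concurrency seen so far.
			for {
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			work()
			atomic.AddInt32(&running, -1)
		}()
	}
	wg.Wait()
	return int(peak)
}

func main() {
	peak := runLimited(8, 2, func() { time.Sleep(10 * time.Millisecond) })
	fmt.Println(peak <= 2) // true
}
```

A limit of 1, the documented default, serializes batches; a higher limit lets an IO-bound processor overlap its waits.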

type Buffer

type Buffer[T any] struct {
	// contains filtered or unexported fields
}

Buffer is a collection utility that accumulates items until they are ready to be used, at which point the buffer can be flushed and reset.

Buffer is safe for concurrent use.

func (*Buffer[T]) Add

func (b *Buffer[T]) Add(item T)

Add adds an item to the buffer.

func (*Buffer[T]) Flush

func (b *Buffer[T]) Flush() []T

Flush returns the buffered items and resets the buffer.

func (*Buffer[T]) Size

func (b *Buffer[T]) Size() int

Size returns the number of items in the buffer.
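The Add, Flush and Size semantics described above can be sketched with a mutex-guarded slice. This is an illustrative stand-in under that assumption, not the library's Buffer; the lowercase buffer type and its fields are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// buffer is a hypothetical stand-in for the documented Buffer type. The
// mutex makes all three methods safe for concurrent use.
type buffer[T any] struct {
	mu    sync.Mutex
	items []T
}

// Add appends an item to the buffer.
func (b *buffer[T]) Add(item T) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.items = append(b.items, item)
}

// Flush returns the buffered items and leaves the buffer empty.
func (b *buffer[T]) Flush() []T {
	b.mu.Lock()
	defer b.mu.Unlock()
	items := b.items
	b.items = nil // hand ownership of the old slice to the caller
	return items
}

// Size returns the number of buffered items.
func (b *buffer[T]) Size() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.items)
}

func main() {
	var b buffer[int]
	b.Add(1)
	b.Add(2)
	b.Add(3)
	fmt.Println(b.Size())  // 3
	fmt.Println(b.Flush()) // [1 2 3]
	fmt.Println(b.Size())  // 0
}
```

Swapping in a nil slice on Flush, rather than truncating in place, means later Add calls cannot overwrite the items already handed to the caller.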

type Job

type Job[R any] struct {
	Request R
	// contains filtered or unexported fields
}

Job wraps a request so that it can be submitted to a Batcher.

func NewJob

func NewJob[R any](request R) Job[R]

NewJob returns a new job wrapping the given request.

func (Job[R]) ID

func (j Job[R]) ID() string

ID returns a unique identifier for the job.

type JobResult

type JobResult[Q any] struct {
	JobID string
	// contains filtered or unexported fields
}

JobResult is a result type that contains a channel on which the expected result is delivered.

func (*JobResult[Q]) Get

func (j *JobResult[Q]) Get() (Q, error)

Get blocks until the result is received from the batcher.
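A channel-backed result of this kind behaves like a one-shot future. The following is a minimal sketch of the pattern, assuming a buffered channel of capacity 1; the future and outcome types and the resolve method are hypothetical and do not reflect JobResult's unexported fields.

```go
package main

import "fmt"

// outcome pairs a value with an error, as delivered over the channel.
type outcome[Q any] struct {
	value Q
	err   error
}

// future is a hypothetical stand-in for the documented JobResult type.
type future[Q any] struct {
	ch chan outcome[Q]
}

// newFuture creates a future whose channel holds one outcome, so that the
// producer never blocks even if nobody has called Get yet.
func newFuture[Q any]() *future[Q] {
	return &future[Q]{ch: make(chan outcome[Q], 1)}
}

// resolve delivers the result; it must be called exactly once.
func (f *future[Q]) resolve(v Q, err error) {
	f.ch <- outcome[Q]{value: v, err: err}
}

// Get blocks until resolve has been called, then returns its arguments.
// In this sketch Get may only be called once per future.
func (f *future[Q]) Get() (Q, error) {
	o := <-f.ch
	return o.value, o.err
}

func main() {
	f := newFuture[string]()
	go f.resolve("Processed: a", nil) // the batcher side, in another goroutine
	fmt.Println(f.Get())              // Processed: a <nil>
}
```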

type Option

type Option func(*BatcherConfig)

Option is a function that applies a setting to a BatcherConfig.

func WithBufferSize

func WithBufferSize(s int) Option

WithBufferSize sets BatcherConfig.BufferSize.

func WithDurationThreshold

func WithDurationThreshold(d time.Duration) Option

WithDurationThreshold sets BatcherConfig.DurationThreshold.

func WithInflightBatchLimit

func WithInflightBatchLimit(l int) Option

WithInflightBatchLimit sets BatcherConfig.InflightBatchLimit.

func WithSizeThreshold

func WithSizeThreshold(d int) Option

WithSizeThreshold sets BatcherConfig.SizeThreshold.
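The Option type and the With* constructors follow the functional options pattern. The sketch below shows how such options are typically applied on top of a default config. BatcherConfig, Option, DefaultBatcherConfig and WithSizeThreshold are redeclared locally from the documentation above so the example compiles on its own; applyOptions is a hypothetical helper, and the library's actual application logic may differ.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented declarations.
type BatcherConfig struct {
	DurationThreshold  time.Duration
	SizeThreshold      int
	InflightBatchLimit int
	BufferSize         int
}

type Option func(*BatcherConfig)

var DefaultBatcherConfig = BatcherConfig{
	DurationThreshold:  time.Second,
	SizeThreshold:      10,
	InflightBatchLimit: 1,
	BufferSize:         100,
}

// WithSizeThreshold mirrors the documented option of the same name.
func WithSizeThreshold(n int) Option {
	return func(c *BatcherConfig) { c.SizeThreshold = n }
}

// applyOptions is a hypothetical helper. base is passed by value, so the
// caller's config (for example the package-level default) is not mutated.
func applyOptions(base BatcherConfig, opts ...Option) BatcherConfig {
	for _, opt := range opts {
		opt(&base)
	}
	return base
}

func main() {
	cfg := applyOptions(DefaultBatcherConfig, WithSizeThreshold(2))
	fmt.Println(cfg.SizeThreshold, cfg.DurationThreshold, cfg.BufferSize) // 2 1s 100
}
```

Options are applied in order, so a later option overrides an earlier one that sets the same field.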
