Documentation
Overview
Example
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/lachlandrury99/micro"
)

type Request string

type Result string

type BatchProcessor struct{}

func (b *BatchProcessor) Process(ctx context.Context, requests []Request) ([]Result, error) {
	fmt.Printf("Processing batch: %v\n", requests)
	results := make([]Result, len(requests))
	for i, v := range requests {
		results[i] = Result(fmt.Sprintf("Processed: %s", v))
	}
	return results, nil
}

func main() {
	var wg sync.WaitGroup
	ctx := context.Background()
	batcher := micro.NewBatcher(&BatchProcessor{}, micro.WithSizeThreshold(2))
	batcher.Start(ctx)
	requests := []Request{"a", "b", "c", "d", "e"}
	for _, request := range requests {
		wg.Add(1)
		job := micro.NewJob(request)
		jobResult := batcher.Submit(job)
		time.Sleep(time.Millisecond)
		go func() {
			defer wg.Done()
			result, err := jobResult.Get()
			if err != nil {
				panic(err)
			}
			fmt.Println(result)
		}()
	}
	batcher.Stop()
	wg.Wait()
}
Output:

Processing batch: [a b]
Processed: a
Processed: b
Processing batch: [c d]
Processed: c
Processed: d
Processing batch: [e]
Processed: e
Constants
This section is empty.
Variables
var DefaultBatcherConfig = BatcherConfig{
	DurationThreshold:  time.Second,
	SizeThreshold:      10,
	InflightBatchLimit: 1,
	BufferSize:         100,
}
DefaultBatcherConfig holds the default value for each BatcherConfig field. NewBatcher uses it as the fallback configuration.
var (
	// Returned when an inconsistent number of results is returned by the batch processor.
	ErrBadResults = errors.New("bad result from batch processor")
)
Functions
This section is empty.
Types
type BatchProcessor
BatchProcessor is a batch processing interface used by the Batcher to process batches.
Implementations of BatchProcessor must ensure that the Process method returns results in the same order as the requests it was given, so that the result at index i corresponds to the request at index i.
type Batcher
type Batcher[R, Q any] struct {
	// contains filtered or unexported fields
}
Batcher is a service that groups incoming jobs into 'micro batches' and processes them according to a set of configurable thresholds.
func NewBatcher
func NewBatcher[R any, Q any](processor BatchProcessor[R, Q], opts ...Option) Batcher[R, Q]
NewBatcher creates a new batcher. DefaultBatcherConfig is used as the fallback for any setting not overridden by an option.
func NewBatcherWithConfig
func NewBatcherWithConfig[R any, Q any](processor BatchProcessor[R, Q], config BatcherConfig, opts ...Option) Batcher[R, Q]
NewBatcherWithConfig creates a new batcher with the given config.
type BatcherConfig
type BatcherConfig struct {
	// DurationThreshold is the frequency of the batch processor.
	DurationThreshold time.Duration

	// SizeThreshold is the maximum number of buffered jobs before the batch processor
	// is executed.
	SizeThreshold int

	// InflightBatchLimit is the maximum number of batch processes that can execute
	// concurrently. This may be useful in cases where the batch processor is
	// IO bound.
	InflightBatchLimit int

	// BufferSize is the size of the job request queue. If the job request queue is full
	// then batcher.Submit will block until the queue has capacity again.
	BufferSize int
}
BatcherConfig is used to configure a Batcher.
type Buffer
type Buffer[T any] struct {
	// contains filtered or unexported fields
}
Buffer is a collection utility that accumulates items until they are ready to be used, at which point the buffer can be flushed and reset.
Buffer is safe for concurrent use.
type Job
type Job[R any] struct {
	Request R
	// contains filtered or unexported fields
}
Job is a request wrapper.
type Option
type Option func(*BatcherConfig)
Option is a function that applies a configuration change to a BatcherConfig.
func WithBufferSize
WithBufferSize configures BatcherConfig.BufferSize.
func WithDurationThreshold
WithDurationThreshold configures BatcherConfig.DurationThreshold.
func WithInflightBatchLimit
WithInflightBatchLimit configures BatcherConfig.InflightBatchLimit.
func WithSizeThreshold
WithSizeThreshold configures BatcherConfig.SizeThreshold.