microbatcher

package module
v0.0.0-...-26baf89
Published: Aug 20, 2024 License: Apache-2.0 Imports: 4 Imported by: 0

README

MicroBatcher

MicroBatcher is a Go package for managing and processing batches of jobs. You submit jobs to a queue, the package groups them into batches, and a configurable batch processor handles each batch. It suits workloads where jobs should be handled in groups, and it supports batching by both size and time interval.

Features

  • Batching: Collect jobs into batches based on size or time interval.
  • Concurrent Processing: Process batches of jobs concurrently using a BatchProcessor.
  • Graceful Shutdown: Ensure all jobs are processed before shutting down.
  • Configurable Options: Customize batch size, interval, and the batch implementation.

Installation

To include the microbatcher package in your Go project, use:

go get github.com/cuzzasoft/batcher

Usage

Try it out at the Go Playground.

Start a new batcher

Provide your own BatchProcessor implementation and configure the batcher with the desired options.

batcher := microbatcher.NewMicroBatcher(
    &MyProcessor{},
    microbatcher.WithBatchSize(5),
    microbatcher.WithBatchInterval(2*time.Second),
)
batcher.Start(ctx)
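
The snippet above passes `&MyProcessor{}` without defining it. A minimal sketch of what such a processor could look like follows. `Job` and `JobResult` are local copies of the documented types so the example compiles without importing the package, and the upper-casing logic is purely illustrative.

```go
package main

import (
	"fmt"
	"strings"
)

// Job and JobResult are local copies of the documented types.
type Job struct {
	ID   string
	Data interface{}
}

type JobResult struct {
	Job    Job
	Error  error
	Result interface{}
}

// MyProcessor is a hypothetical processor that upper-cases string payloads.
type MyProcessor struct{}

// ProcessBatch returns one JobResult per input job, in input order.
// Jobs whose Data is not a string get an Error instead of a Result.
func (p *MyProcessor) ProcessBatch(jobs []Job) []JobResult {
	results := make([]JobResult, 0, len(jobs))
	for _, job := range jobs {
		s, ok := job.Data.(string)
		if !ok {
			err := fmt.Errorf("job %s: expected string data, got %T", job.ID, job.Data)
			results = append(results, JobResult{Job: job, Error: err})
			continue
		}
		results = append(results, JobResult{Job: job, Result: strings.ToUpper(s)})
	}
	return results
}

func main() {
	p := &MyProcessor{}
	batch := []Job{{ID: "job-0", Data: "data-0"}, {ID: "job-1", Data: 42}}
	for _, r := range p.ProcessBatch(batch) {
		fmt.Println(r.Job.ID, r.Result, r.Error)
	}
}
```

Reporting failures per job, rather than failing the whole batch, keeps one bad payload from hiding the results of the others.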

Listen for results

Completed jobs are sent to the results channel, which you obtain by calling the Results method.

jobResultsCh := batcher.Results(ctx)
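
Because the results channel is documented as being closed on Shutdown, one way to consume it is a `range` loop that ends when the channel closes. The sketch below uses a plain channel as a stand-in for `batcher.Results(ctx)`. The `collect` helper, `errBoom`, and the local type copies are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Job and JobResult are local copies of the documented types.
type Job struct {
	ID   string
	Data interface{}
}

type JobResult struct {
	Job    Job
	Error  error
	Result interface{}
}

// errBoom is a hypothetical processing failure.
var errBoom = errors.New("boom")

// collect drains results until the channel is closed and counts
// how many jobs succeeded and how many failed.
func collect(results <-chan JobResult) (succeeded, failed int) {
	for r := range results {
		if r.Error != nil {
			failed++
			continue
		}
		succeeded++
	}
	return succeeded, failed
}

func main() {
	// Stand-in for batcher.Results(ctx): closed once all results are sent.
	ch := make(chan JobResult, 2)
	ch <- JobResult{Job: Job{ID: "job-0"}, Result: "ok"}
	ch <- JobResult{Job: Job{ID: "job-1"}, Error: errBoom}
	close(ch)

	ok, bad := collect(ch)
	fmt.Printf("succeeded=%d failed=%d\n", ok, bad)
}
```

In a real program the loop would typically run in its own goroutine, started before jobs are submitted, so that results are read while batches are still being processed.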

Submit jobs

Submit jobs to the batcher using the Submit method. The batcher will batch the jobs together and process them according to the configured options.

for i := 0; i < 25; i++ {
    batcher.Submit(ctx, microbatcher.Job{
        ID:   fmt.Sprintf("job-%d", i),
        Data: fmt.Sprintf("data-%d", i),
    })
}

Graceful Shutdown

Shut down the batcher before exiting. Shutdown waits for all submitted jobs to be processed before it returns.

// Shutdown blocks until pending jobs have been processed.
if err := batcher.Shutdown(ctx); err != nil {
    log.Printf("shutdown failed: %v", err)
    return
}

Documentation

Types

type Batch

type Batch interface {
	Add(job Job)
	Size() int
	Drain() []Job
}

Batch represents a collection of jobs. Implementations must be thread-safe.
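
As an illustration of the thread-safety requirement, here is a minimal sketch of a custom Batch backed by a slice and guarded by a mutex. The `lockedBatch` type is hypothetical and is not the package's JobBatch. `Job` and `Batch` are local copies of the documented declarations.

```go
package main

import (
	"fmt"
	"sync"
)

// Job is a local copy of the documented type.
type Job struct {
	ID   string
	Data interface{}
}

// Batch mirrors the documented interface.
type Batch interface {
	Add(job Job)
	Size() int
	Drain() []Job
}

// lockedBatch is a hypothetical slice-backed Batch guarded by a mutex.
type lockedBatch struct {
	mu   sync.Mutex
	jobs []Job
}

// Compile-time check that lockedBatch satisfies Batch.
var _ Batch = (*lockedBatch)(nil)

func (b *lockedBatch) Add(job Job) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.jobs = append(b.jobs, job)
}

func (b *lockedBatch) Size() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.jobs)
}

// Drain returns the held jobs and resets the batch in one critical
// section, so no job can slip in between the read and the reset.
func (b *lockedBatch) Drain() []Job {
	b.mu.Lock()
	defer b.mu.Unlock()
	drained := b.jobs
	b.jobs = nil
	return drained
}

func main() {
	b := &lockedBatch{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			b.Add(Job{ID: fmt.Sprintf("job-%d", i)})
		}(i)
	}
	wg.Wait()
	fmt.Println(b.Size())       // 100
	fmt.Println(len(b.Drain())) // 100
	fmt.Println(b.Size())       // 0
}
```

A value like this could presumably be supplied through WithBatch, for example to add deduplication or ordering, though the sketch only shows the locking.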

type BatchProcessor

type BatchProcessor interface {
	ProcessBatch(jobs []Job) []JobResult
}

BatchProcessor should be implemented to handle processing the batches of jobs. It is responsible for handling any errors / timeouts in the batch.
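
Since ProcessBatch takes no context, timeouts have to be enforced inside the processor. The following sketch shows one possible approach: each batch gets a fixed time budget, and jobs that have not started when the budget runs out are returned with an error. The `timeoutProcessor` type, its `budget` and `work` fields, and the `echo` helper are hypothetical. The other types are local copies of the documented ones.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Job, JobResult and BatchProcessor are local copies of the documented types.
type Job struct {
	ID   string
	Data interface{}
}

type JobResult struct {
	Job    Job
	Error  error
	Result interface{}
}

type BatchProcessor interface {
	ProcessBatch(jobs []Job) []JobResult
}

// timeoutProcessor is a hypothetical processor with a per-batch time budget.
type timeoutProcessor struct {
	budget time.Duration
	work   func(ctx context.Context, job Job) (interface{}, error)
}

// Compile-time check that timeoutProcessor satisfies BatchProcessor.
var _ BatchProcessor = (*timeoutProcessor)(nil)

func (p *timeoutProcessor) ProcessBatch(jobs []Job) []JobResult {
	ctx, cancel := context.WithTimeout(context.Background(), p.budget)
	defer cancel()

	results := make([]JobResult, len(jobs))
	for i, job := range jobs {
		results[i].Job = job
		// Once the budget is spent, fail the remaining jobs instead of running them.
		if err := ctx.Err(); err != nil {
			results[i].Error = fmt.Errorf("job %s skipped: %w", job.ID, err)
			continue
		}
		results[i].Result, results[i].Error = p.work(ctx, job)
	}
	return results
}

// echo is a trivial work function that returns the job's data unchanged.
func echo(_ context.Context, job Job) (interface{}, error) {
	return job.Data, nil
}

func main() {
	p := &timeoutProcessor{budget: time.Second, work: echo}
	for _, r := range p.ProcessBatch([]Job{{ID: "job-0", Data: "data-0"}}) {
		fmt.Println(r.Job.ID, r.Result, r.Error)
	}
}
```

The work function also receives the context, so long-running work can stop early when the deadline passes. Every job still yields a JobResult, which lets the caller tell skipped jobs apart from completed ones.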

type Job

type Job struct {
	ID   string
	Data interface{}
}

Job represents a single unit of work provided to the BatchProcessor.

type JobBatch

type JobBatch struct {
	// contains filtered or unexported fields
}

func NewJobBatch

func NewJobBatch() *JobBatch

NewJobBatch creates a new JobBatch.

func (*JobBatch) Add

func (b *JobBatch) Add(job Job)

Add adds a Job to the batch.

func (*JobBatch) Drain

func (b *JobBatch) Drain() []Job

Drain clears and returns all jobs in the batch.

func (*JobBatch) Size

func (b *JobBatch) Size() int

Size returns the number of Jobs currently in the batch.

type JobResult

type JobResult struct {
	Job    Job
	Error  error
	Result interface{}
}

JobResult represents the result of a processed job provided by the BatchProcessor.

type MicroBatcher

type MicroBatcher struct {
	// contains filtered or unexported fields
}

MicroBatcher manages Job submission, batching, and processing. A submitted Job is added to the current batch, and when the batch reaches the configured batch size it is handed to the BatchProcessor. When the batch interval elapses, the batch is processed regardless of its size. See Shutdown for details on shutdown.
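
The size-or-interval behaviour described here can be sketched with a `select` over a job channel and a ticker. This is an illustration of the general technique, not the package's actual implementation. `batchLoop`, its parameters, and the `flush` callback are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// Job is a local copy of the documented type.
type Job struct {
	ID   string
	Data interface{}
}

// batchLoop reads jobs until the channel is closed. It flushes when the
// pending batch reaches size, when the ticker fires, and once more on
// close so that no job is left behind. interval must be positive.
func batchLoop(jobs <-chan Job, size int, interval time.Duration, flush func([]Job)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	var pending []Job
	emit := func() {
		if len(pending) == 0 {
			return
		}
		flush(pending)
		pending = nil
	}

	for {
		select {
		case job, ok := <-jobs:
			if !ok {
				emit() // final flush when the input is closed
				return
			}
			pending = append(pending, job)
			if len(pending) >= size {
				emit()
			}
		case <-ticker.C:
			emit() // interval elapsed: flush whatever is pending
		}
	}
}

func main() {
	jobs := make(chan Job)
	done := make(chan struct{})
	go func() {
		defer close(done)
		// A long interval keeps this run driven by batch size only.
		batchLoop(jobs, 2, time.Hour, func(batch []Job) {
			fmt.Println("flush", len(batch))
		})
	}()
	for i := 0; i < 5; i++ {
		jobs <- Job{ID: fmt.Sprintf("job-%d", i)}
	}
	close(jobs)
	<-done
}
```

With a batch size of 2 and five jobs, the loop flushes two full batches and then the single leftover job when the channel closes.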

func NewMicroBatcher

func NewMicroBatcher(processor BatchProcessor, options ...Option) *MicroBatcher

NewMicroBatcher creates a new MicroBatcher that uses the provided BatchProcessor and applies the given options. Without options, the batch size defaults to 10 and the batch interval to 3 minutes.

func (*MicroBatcher) Results

func (mb *MicroBatcher) Results(context.Context) <-chan JobResult

Results returns a channel to receive the results of processing as JobResult. This channel is closed when Shutdown is called.

func (*MicroBatcher) Shutdown

func (mb *MicroBatcher) Shutdown(context.Context) error

Shutdown stops the MicroBatcher and waits for all jobs to be processed. Any jobs still in the batch are processed, and their results are sent to the results channel.

func (*MicroBatcher) Start

func (mb *MicroBatcher) Start(context.Context)

Start begins the processing loop, which processes jobs in batches and sends the results to the results channel. When Shutdown is called, the loop stops and the results channel is closed.

func (*MicroBatcher) Submit

func (mb *MicroBatcher) Submit(ctx context.Context, job Job)

Submit adds a Job to the micro-batcher.

type Option

type Option func(*MicroBatcher)

Option for configuring the MicroBatcher.
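
The `func(*MicroBatcher)` signature follows the functional options pattern. Since MicroBatcher's fields are unexported, the sketch below demonstrates the pattern on a stand-in `config` struct. All lower-case names are hypothetical, and the defaults are the ones stated in the NewMicroBatcher documentation.

```go
package main

import (
	"fmt"
	"time"
)

// config stands in for the unexported fields of MicroBatcher.
type config struct {
	batchSize     int
	batchInterval time.Duration
}

// option mirrors the shape of the documented Option type.
type option func(*config)

func withBatchSize(n int) option {
	return func(c *config) { c.batchSize = n }
}

func withBatchInterval(d time.Duration) option {
	return func(c *config) { c.batchInterval = d }
}

// newConfig starts from the documented defaults and then applies each
// option in order, so later options override earlier ones.
func newConfig(opts ...option) *config {
	c := &config{batchSize: 10, batchInterval: 3 * time.Minute}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConfig(withBatchSize(5))
	fmt.Println(c.batchSize, c.batchInterval) // 5 3m0s
}
```

The pattern lets callers override only the settings they care about while the constructor keeps a stable signature.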

func WithBatch

func WithBatch(batch Batch) Option

WithBatch sets the batch for the MicroBatcher. This is how the jobs are batched together before being processed.

func WithBatchInterval

func WithBatchInterval(interval time.Duration) Option

WithBatchInterval sets the batch interval for the MicroBatcher. This is how often the batch is processed, regardless of the batch size.

func WithBatchSize

func WithBatchSize(size int) Option

WithBatchSize sets the batch size for the MicroBatcher. This is how many jobs are batched together before being processed.
