batch

package
v0.0.0-...-75eef08 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 4, 2025 License: MIT Imports: 5 Imported by: 0

README

Batch Processing

This package provides a batch processing system that aggregates items and hands them to a callback in batches. It is built on Go's concurrency primitives: items can be pushed from multiple goroutines, and results are returned over channels.

Features

  • Batch Processing: Groups items for efficient bulk processing.
  • Concurrency Safe: Safe for concurrent use by multiple goroutines.
  • Configurable: Allows for custom batch sizes and tick intervals.
  • Context Support: Supports graceful shutdowns and cancellations.
  • Generics: Utilizes Go's generics for type safety.
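The size and interval settings listed above suggest a loop that flushes either when a batch fills up or when a tick fires. The sketch below shows one common way to write such a loop. It is not the package's implementation, and `runBatcher` and `collectBatches` are hypothetical names introduced only for illustration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runBatcher is a hypothetical helper, not part of the batch package.
// It buffers values from in and calls flush when size values have
// accumulated or when the ticker fires. It flushes the remainder and
// returns when ctx is cancelled or in is closed.
func runBatcher[T any](ctx context.Context, in <-chan T, size int, interval time.Duration, flush func([]T)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	buf := make([]T, 0, size)
	emit := func() {
		if len(buf) == 0 {
			return
		}
		flush(buf)
		buf = make([]T, 0, size) // fresh slice so flushed batches are not overwritten
	}

	for {
		select {
		case <-ctx.Done():
			emit()
			return
		case v, ok := <-in:
			if !ok {
				emit()
				return
			}
			buf = append(buf, v)
			if len(buf) >= size {
				emit()
			}
		case <-ticker.C:
			emit()
		}
	}
}

// collectBatches pushes n integers through runBatcher and returns the
// length of every batch in flush order.
func collectBatches(n, size int) []int {
	in := make(chan int)
	done := make(chan struct{})
	var sizes []int

	go func() {
		defer close(done)
		// The one-hour interval keeps the ticker out of the picture for this demo.
		runBatcher[int](context.Background(), in, size, time.Hour, func(b []int) {
			sizes = append(sizes, len(b))
		})
	}()

	for i := 0; i < n; i++ {
		in <- i
	}
	close(in)
	<-done
	return sizes
}

func main() {
	fmt.Println(collectBatches(7, 3)) // prints [3 3 1]
}
```

Seven items with a batch size of three produce two full batches and one partial batch that is flushed when the input closes. With a short interval, partial batches would also be flushed on each tick.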

Usage

Here's an example of how to use the batch processing package:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/shortlink-org/shortlink/pkg/batch"
)

func main() {
	ctx := context.Background()

	// Define the callback function
	callback := func(items []*batch.Item[string]) error {
		for _, item := range items {
			// Process item
			time.Sleep(time.Millisecond * 10) // Simulate work
			item.CallbackChannel <- item.Item + " processed"
			close(item.CallbackChannel)
		}
		return nil
	}

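	// Create a new batch processor.
	// WithSize needs an explicit type argument and a size; 5 is an arbitrary example value.
	b, err := batch.New(ctx, callback, batch.WithSize[string](5), batch.WithInterval[string](time.Second))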
	if err != nil {
		panic(err)
	}

	// Push items into the batch processor
	for i := 0; i < 20; i++ {
		resChan := b.Push(fmt.Sprintf("Item %d", i))
		go func(ch chan string) {
			result, ok := <-ch
			if ok {
				fmt.Println(result)
			} else {
				fmt.Println("Channel closed before processing")
			}
		}(resChan)
	}

	// Give the processor time to flush; a sync.WaitGroup would be more reliable than sleeping
	time.Sleep(2 * time.Second)
}

Documentation

Overview

Package batch provides tools for processing items in batch mode.

Variables

var ErrInvalidType = errors.New("invalid type")

Types

type Batch

type Batch[T any] struct {
	// contains filtered or unexported fields
}

Batch is a generic batch processor: it collects pushed items and hands them to the callback in groups.

func New

func New[T any](ctx context.Context, cb func([]*Item[T]) error, opts ...Option[T]) (*Batch[T], error)

New creates a new batch with a specified callback function.

func (*Batch[T]) Push

func (b *Batch[T]) Push(item T) chan T

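Push adds an item to the batch and returns a channel for that item's result. In the README example, the callback sends the result on this channel and then closes it.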
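A caller usually wants to stop waiting on the returned channel if the result never arrives. The sketch below shows one way to receive from such a channel with a context deadline. `awaitResult`, `errClosed`, and `demo` are hypothetical names that are not part of the package, and a plain buffered channel stands in for the channel returned by Push.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errClosed is a hypothetical sentinel for this sketch; the batch package does not export it.
var errClosed = errors.New("result channel closed without a value")

// awaitResult waits for one value on ch (for example, a channel returned
// by Push) and gives up when ctx is done.
func awaitResult[T any](ctx context.Context, ch <-chan T) (T, error) {
	var zero T
	select {
	case v, ok := <-ch:
		if !ok {
			return zero, errClosed
		}
		return v, nil
	case <-ctx.Done():
		return zero, ctx.Err()
	}
}

// demo runs awaitResult against a stand-in channel that either already
// holds a value or was closed while empty.
func demo(closeEmpty bool) (string, error) {
	ch := make(chan string, 1)
	if closeEmpty {
		close(ch)
	} else {
		ch <- "Item 0 processed"
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	return awaitResult[string](ctx, ch)
}

func main() {
	fmt.Println(demo(false))
	fmt.Println(demo(true))
}
```

Separating the "closed without a value" case from the timeout case lets the caller tell a processor that shut down apart from one that is merely slow.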

type Item

type Item[T any] struct {
	CallbackChannel chan T
	Item            T
}

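Item represents an item that can be pushed to the batch. It pairs the pushed value (Item) with the channel (CallbackChannel) on which its result is returned.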
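The callback contract implied by the README example is: handle the batch, send each result on that item's CallbackChannel, then close the channel. The sketch below exercises that contract in isolation. The local `item` type only mirrors the two exported fields of Item so the code compiles on its own, and `upperAll` and `runOnce` are hypothetical names. Whether the package itself buffers these channels is not stated in the source, so the buffer size of 1 is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// item mirrors the exported fields of Item[T] so the sketch is self-contained.
type item[T any] struct {
	CallbackChannel chan T
	Item            T
}

// upperAll is a hypothetical callback: it upper-cases every value in the
// batch, replies on that item's channel, and closes the channel.
func upperAll(items []*item[string]) error {
	for _, it := range items {
		it.CallbackChannel <- strings.ToUpper(it.Item)
		close(it.CallbackChannel)
	}
	return nil
}

// runOnce builds a batch from inputs, runs upperAll on it, and gathers
// the replies in input order.
func runOnce(inputs ...string) []string {
	items := make([]*item[string], len(inputs))
	for i, in := range inputs {
		// A buffer of 1 (an assumption) lets the callback send before anyone receives.
		items[i] = &item[string]{CallbackChannel: make(chan string, 1), Item: in}
	}

	if err := upperAll(items); err != nil {
		return nil
	}

	out := make([]string, 0, len(items))
	for _, it := range items {
		out = append(out, <-it.CallbackChannel)
	}
	return out
}

func main() {
	fmt.Println(runOnce("a", "b")) // prints [A B]
}
```

Closing the channel after the send means a receiver that reads a second time gets the zero value with ok set to false instead of blocking.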

type Option

type Option[T any] func(*Batch[T])

Option is the type for batch options.

func WithInterval

func WithInterval[T any](interval time.Duration) Option[T]

WithInterval sets the tick interval for the batch.

func WithSize

func WithSize[T any](size int) Option[T]

WithSize sets the batch size.
