buffered

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 19, 2024 License: MIT Imports: 15 Imported by: 1

README

Buffered writes in Go

buffered is a package for buffered writing to a database. It's agnostic to the type of database used. It supports different flushing strategies based on:

  1. The maximum amount of time an item should spend in the buffer
  2. The maximum size of the buffer
  3. The maximum amount of memory in the buffer

This library is used internally by Hatchet, but is meant to be general-purpose. Issues and PRs are welcome.

Usage

Install the package:

go get github.com/hatchet-dev/buffered

Basic usage is as follows:

type mockItem struct {
	ID    int
	Size  int
	Value string
}

type mockResult struct {
	ID int
}

opts := buffered.BufferOpts[mockItem, mockResult]{
    Name:               "test",
    MaxCapacity:        2,
    // We set the flush period to be 5 seconds
    FlushPeriod:        5 * time.Second,
    MaxDataSizeInQueue: 100,
    FlushFunc:          mockFlushFunc,
    SizeFunc:           mockSizeFunc,
}

b := buffered.NewBuffer(opts)

cleanup, err := b.Start()
defer cleanup()

if err != nil {
    panic(err)
}

b.BuffItem()
doneChan, err := buf.BuffItem(mockItem{
    ID: 1,
    Size: 10,
    Value: "one"
})

if err != nil {
    panic(err)
}

// This will return after 5 seconds
resp := <-doneChan

fmt.Println(resp.Result)

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Validate

func Validate(s any) error

Types

type Buffer

type Buffer[T any, U any] struct {
	// contains filtered or unexported fields
}

Buffer stores items in a buffer and flushes them when the following conditions are met:

1. The buffer is full 2. The buffer has held items for a certain period of time 3. The buffer has held items with a certain size

func NewBuffer

func NewBuffer[T any, U any](opts BufferOpts[T, U]) *Buffer[T, U]

NewBuffer creates a new buffer for any type T

func (*Buffer[T, U]) BuffItem

func (b *Buffer[T, U]) BuffItem(item T) (chan *FlushResponse[U], error)

BuffItem adds a new item to the buffer. It returns a channel that will receive the result of the flush.

func (*Buffer[T, U]) Start

func (b *Buffer[T, U]) Start() (func() error, error)

Start starts the buffer. It returns a function that can be called to stop the buffer.

func (*Buffer[T, U]) StartDebugLoop

func (b *Buffer[T, U]) StartDebugLoop()

type BufferOpts

type BufferOpts[T any, U any] struct {
	// Name of the buffer. Used for debugging.
	Name string `validate:"required"`

	// MaxCapacity is the maximum number of items to hold in buffer before we initiate a flush
	MaxCapacity int `validate:"required,gt=0"`

	// FlushPeriod is the maximum time to hold items in buffer before we initiate a flush
	FlushPeriod time.Duration `validate:"required,gt=0"`

	// MaxDataSizeInQueue is the maximum number of bytes to hold in buffer before we initiate a flush
	MaxDataSizeInQueue int `validate:"required,gt=0"`

	// FlushFunc is the function to call to flush the buffer
	FlushFunc func(ctx context.Context, items []T) ([]U, error) `validate:"required"`

	// SizeFunc is the function to call to get the size of an item
	SizeFunc func(T) int `validate:"required"`

	// (optional) a zerolog logger
	L *zerolog.Logger `validate:"omitnil"`

	// MaxConcurrent is the maximum number of concurrent flushes
	MaxConcurrent int `validate:"omitempty,gt=0"`

	// WaitForFlush is the time to wait for the buffer to flush used for backpressure on writers
	WaitForFlush time.Duration `validate:"omitempty,gt=0"`
}

BufferOpts is the configuration for the buffer

type FlushResponse

type FlushResponse[U any] struct {
	Result U
	Err    error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL