batchpipe

package
v1.0.0-rc2
Published: Oct 24, 2023 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package batchpipe is a common utility for exporting data (traces and metrics for now) in batches.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseBatchPipe

type BaseBatchPipe[T HasName, B any] struct {
	// contains filtered or unexported fields
}

func NewBaseBatchPipe

func NewBaseBatchPipe[T HasName, B any](impl PipeImpl[T, B], opts ...BaseBatchPipeOpt) *BaseBatchPipe[T, B]

func (*BaseBatchPipe[T, B]) SendItem

func (bc *BaseBatchPipe[T, B]) SendItem(ctx context.Context, items ...T) error

SendItem returns an error when the pipe is closed.

func (*BaseBatchPipe[T, B]) Start

func (bc *BaseBatchPipe[T, B]) Start(ctx context.Context) bool

Start kicks off the merge workers and batch workers. It returns false if the pipe is already working, that is, if Start has already been called.

func (*BaseBatchPipe[T, B]) Stop

func (bc *BaseBatchPipe[T, B]) Stop(graceful bool) (<-chan struct{}, bool)

Stop terminates all workers. If graceful is true, it waits until all items are processed; otherwise it quits immediately. The caller can use the returned channel to wait for Stop to finish.
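Waiting on the channel returned by Stop can be sketched as follows. This is a minimal illustration, not the package's own code: stopper, waitStop, and fakePipe are hypothetical names, and fakePipe only imitates the documented Stop signature. The meaning of the bool result is not described here, so the sketch ignores it.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// stopper captures only the Stop method documented above, so the helper
// below does not depend on the real package.
type stopper interface {
	Stop(graceful bool) (<-chan struct{}, bool)
}

// waitStop asks for a graceful stop and waits on the returned channel,
// giving up after timeout.
func waitStop(s stopper, timeout time.Duration) error {
	done, _ := s.Stop(true) // the bool result is undocumented here, so it is ignored
	select {
	case <-done:
		return nil
	case <-time.After(timeout):
		return errors.New("stop timed out")
	}
}

// fakePipe is a hypothetical stand-in that finishes stopping after delay.
type fakePipe struct{ delay time.Duration }

func (f fakePipe) Stop(graceful bool) (<-chan struct{}, bool) {
	done := make(chan struct{})
	go func() {
		time.Sleep(f.delay)
		close(done) // closing the channel signals that Stop has finished
	}()
	return done, true
}

func main() {
	err := waitStop(fakePipe{delay: 10 * time.Millisecond}, time.Second)
	fmt.Println("stop error:", err)
}
```

Bounding the wait with a timeout is a design choice of this sketch; a caller that must not lose items could simply block on the channel instead.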

type BaseBatchPipeOpt

type BaseBatchPipeOpt interface {
	ApplyTo(*baseBatchPipeOpts)
}

type HasName

type HasName interface {
	GetName() string
}

HasName is implemented by items; the name returned by GetName decides which table the item goes to.
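A minimal sketch of items that satisfy HasName, assuming the name is used to route each item to a per-table group. The span and metric types, the table names, and groupByName are hypothetical and only illustrate the idea; the interface is redeclared locally so the example compiles on its own.

```go
package main

import "fmt"

// HasName mirrors the interface documented above.
type HasName interface {
	GetName() string
}

// span is a hypothetical trace item; its name selects the target table.
type span struct{ id string }

func (span) GetName() string { return "span_info" }

// metric is a hypothetical metric item that goes to a different table.
type metric struct{ value float64 }

func (metric) GetName() string { return "metric" }

// groupByName buckets items by GetName. Assumption: a pipe routes items
// to one buffer per name in roughly this way.
func groupByName(items ...HasName) map[string][]HasName {
	groups := make(map[string][]HasName)
	for _, it := range items {
		name := it.GetName()
		groups[name] = append(groups[name], it)
	}
	return groups
}

func main() {
	g := groupByName(span{id: "a"}, metric{value: 1}, span{id: "b"})
	fmt.Println(len(g["span_info"]), len(g["metric"]))
}
```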

type ItemBuffer

type ItemBuffer[T any, B any] interface {
	Reminder
	Add(item T)
	Reset()
	IsEmpty() bool
	ShouldFlush() bool
	// GetBatch uses the given bytes.Buffer to reduce memory allocation; the returned batch should own its data
	GetBatch(ctx context.Context, buf *bytes.Buffer) B
}

ItemBuffer stashes items and constructs a batch that can be stored, for instance an SQL statement inserting all items into a table.
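The following sketch shows one way an ItemBuffer could build an SQL insert from stashed items. It redeclares the documented interfaces locally so it compiles on its own; sqlBuffer, fixedReminder, render, the limit field, and the quoting via %q are assumptions for illustration and not the package's implementation. Real code would need proper SQL escaping.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"time"
)

// Reminder and ItemBuffer mirror the interfaces documented on this page.
type Reminder interface {
	RemindNextAfter() time.Duration
	RemindBackOff()
	RemindBackOffCnt() int
	RemindReset()
}

type ItemBuffer[T any, B any] interface {
	Reminder
	Add(item T)
	Reset()
	IsEmpty() bool
	ShouldFlush() bool
	GetBatch(ctx context.Context, buf *bytes.Buffer) B
}

// fixedReminder is a hypothetical Reminder with a constant cycle.
type fixedReminder struct {
	cycle time.Duration
	cnt   int
}

func (r *fixedReminder) RemindNextAfter() time.Duration { return r.cycle }
func (r *fixedReminder) RemindBackOff()                 { r.cnt++ }
func (r *fixedReminder) RemindBackOffCnt() int          { return r.cnt }
func (r *fixedReminder) RemindReset()                   { r.cnt = 0 }

// sqlBuffer stashes string values and renders them as one INSERT statement.
type sqlBuffer struct {
	Reminder
	table string
	rows  []string
	limit int // hypothetical flush threshold (number of rows)
}

func (b *sqlBuffer) Add(item string)   { b.rows = append(b.rows, item) }
func (b *sqlBuffer) Reset()            { b.rows = b.rows[:0] }
func (b *sqlBuffer) IsEmpty() bool     { return len(b.rows) == 0 }
func (b *sqlBuffer) ShouldFlush() bool { return len(b.rows) >= b.limit }

// GetBatch writes into the shared scratch buffer and returns a string.
// buf.String() copies the bytes, so the batch owns its data and stays
// valid after buf is reused.
func (b *sqlBuffer) GetBatch(ctx context.Context, buf *bytes.Buffer) string {
	buf.Reset()
	fmt.Fprintf(buf, "INSERT INTO %s VALUES ", b.table)
	for i, row := range b.rows {
		if i > 0 {
			buf.WriteString(", ")
		}
		fmt.Fprintf(buf, "(%q)", row) // %q is illustrative quoting, not SQL escaping
	}
	return buf.String()
}

// Compile-time check that sqlBuffer satisfies the mirrored interface.
var _ ItemBuffer[string, string] = (*sqlBuffer)(nil)

// render is a small helper that supplies a fresh scratch buffer.
func render(b *sqlBuffer) string {
	return b.GetBatch(context.Background(), new(bytes.Buffer))
}

func main() {
	b := &sqlBuffer{Reminder: &fixedReminder{cycle: time.Second}, table: "span_info", limit: 2}
	b.Add("a")
	b.Add("b")
	if b.ShouldFlush() {
		fmt.Println(render(b))
		b.Reset()
	}
}
```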

type PipeImpl

type PipeImpl[T any, B any] interface {
	// NewItemBuffer creates a new buffer for one kind of item
	NewItemBuffer(name string) ItemBuffer[T, B]
	// NewItemBatchHandler handles the batch (StoreBatch) produced by an ItemBuffer, for example by executing an insert SQL statement.
	// The returned handler may run on multiple goroutines.
	NewItemBatchHandler(ctx context.Context) func(batch B)
}
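Because the handler returned by NewItemBatchHandler may run on multiple goroutines, it has to guard any shared state. Here is a minimal sketch with B fixed to string; sink and runWorkers are hypothetical, the handler appends to a slice where real code might execute the SQL, and dropping batches after ctx is cancelled is an assumption of this sketch.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// sink is a hypothetical destination for batches.
type sink struct {
	mu      sync.Mutex
	batches []string
}

// NewItemBatchHandler has the same shape as the PipeImpl method above,
// with B = string. The returned func locks the sink because it may be
// called from several goroutines at once.
func (s *sink) NewItemBatchHandler(ctx context.Context) func(batch string) {
	return func(batch string) {
		if ctx.Err() != nil {
			return // assumption: drop batches once ctx is cancelled
		}
		s.mu.Lock()
		defer s.mu.Unlock()
		s.batches = append(s.batches, batch)
	}
}

// runWorkers calls one handler from n goroutines and returns how many
// batches were stored.
func runWorkers(n int) int {
	s := &sink{}
	handle := s.NewItemBatchHandler(context.Background())
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			handle(fmt.Sprintf("batch-%d", i))
		}(i)
	}
	wg.Wait() // all handlers have returned, so reading s.batches is safe
	return len(s.batches)
}

func main() {
	fmt.Println(runWorkers(8))
}
```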

type PipeWithBatchWorkerNum

type PipeWithBatchWorkerNum int

func (PipeWithBatchWorkerNum) ApplyTo

func (x PipeWithBatchWorkerNum) ApplyTo(o *baseBatchPipeOpts)

type PipeWithBufferWorkerNum

type PipeWithBufferWorkerNum int

func (PipeWithBufferWorkerNum) ApplyTo

func (x PipeWithBufferWorkerNum) ApplyTo(o *baseBatchPipeOpts)

type PipeWithItemNameFormatter

type PipeWithItemNameFormatter func(HasName) string

func (PipeWithItemNameFormatter) ApplyTo

func (x PipeWithItemNameFormatter) ApplyTo(o *baseBatchPipeOpts)
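The option types above follow a pattern in which each option is a small named type whose ApplyTo method writes itself into an options struct. The sketch below reproduces that pattern with local stand-ins: pipeOpts, its fields, the defaults, and buildOpts are hypothetical, since the fields of the unexported baseBatchPipeOpts are not shown on this page.

```go
package main

import "fmt"

// pipeOpts is a hypothetical stand-in for the unexported baseBatchPipeOpts;
// the real field names are not shown in these docs.
type pipeOpts struct {
	batchWorkers  int
	bufferWorkers int
}

// PipeOpt mirrors the shape of BaseBatchPipeOpt.
type PipeOpt interface {
	ApplyTo(*pipeOpts)
}

// WithBatchWorkerNum mirrors the shape of PipeWithBatchWorkerNum.
type WithBatchWorkerNum int

func (x WithBatchWorkerNum) ApplyTo(o *pipeOpts) { o.batchWorkers = int(x) }

// WithBufferWorkerNum mirrors the shape of PipeWithBufferWorkerNum.
type WithBufferWorkerNum int

func (x WithBufferWorkerNum) ApplyTo(o *pipeOpts) { o.bufferWorkers = int(x) }

// buildOpts starts from hypothetical defaults and applies each option in order.
func buildOpts(opts ...PipeOpt) pipeOpts {
	o := pipeOpts{batchWorkers: 1, bufferWorkers: 1}
	for _, opt := range opts {
		opt.ApplyTo(&o)
	}
	return o
}

func main() {
	o := buildOpts(WithBatchWorkerNum(4))
	fmt.Println(o.batchWorkers, o.bufferWorkers)
}
```

With the real package, the signature of NewBaseBatchPipe suggests options are passed the same way, for example NewBaseBatchPipe(impl, PipeWithBatchWorkerNum(4)).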

type Reminder

type Reminder interface {
	RemindNextAfter() time.Duration
	RemindBackOff()
	RemindBackOffCnt() int
	RemindReset()
}

Reminder tells the pipe when to force a flush of an ItemBuffer.

func NewBackOffClock

func NewBackOffClock(base time.Duration, f func(int, time.Duration) time.Duration) Reminder

func NewConstantClock

func NewConstantClock(cycle time.Duration) Reminder

func NewExpBackOffClock

func NewExpBackOffClock(init, max time.Duration, base int) Reminder
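The behavior of these constructors is not described on this page. As an illustration of the Reminder contract only, here is a sketch of an exponential back-off reminder. It assumes the delay is init multiplied by base once per back-off and capped at max; expBackOff is a hypothetical type and may differ from what NewExpBackOffClock actually returns.

```go
package main

import (
	"fmt"
	"time"
)

// Reminder mirrors the interface documented above.
type Reminder interface {
	RemindNextAfter() time.Duration
	RemindBackOff()
	RemindBackOffCnt() int
	RemindReset()
}

// expBackOff is a hypothetical exponential back-off reminder.
// Assumption: delay = init * base^cnt, capped at max.
type expBackOff struct {
	init, max time.Duration
	base      int
	cnt       int
}

func (c *expBackOff) RemindNextAfter() time.Duration {
	d := c.init
	// Stop multiplying once the cap is reached to avoid overflow.
	for i := 0; i < c.cnt && d < c.max; i++ {
		d *= time.Duration(c.base)
	}
	if d > c.max {
		d = c.max
	}
	return d
}

func (c *expBackOff) RemindBackOff()        { c.cnt++ }
func (c *expBackOff) RemindBackOffCnt() int { return c.cnt }
func (c *expBackOff) RemindReset()          { c.cnt = 0 }

// Compile-time check that expBackOff satisfies the mirrored interface.
var _ Reminder = (*expBackOff)(nil)

func main() {
	c := &expBackOff{init: time.Second, max: 5 * time.Second, base: 2}
	for i := 0; i < 4; i++ {
		fmt.Println(c.RemindNextAfter()) // 1s, 2s, 4s, then capped at 5s
		c.RemindBackOff()
	}
	c.RemindReset()
	fmt.Println(c.RemindNextAfter()) // back to 1s after reset
}
```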
