batch

package
v0.2.1
Published: Nov 5, 2024 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrDontAck = errors.New("Destination encountered a retryable error")

ErrDontAck should be returned by an ErrorHandler to signal that the batcher should skip acknowledging a message as delivered but continue processing. This is useful, for example, when an error is retryable and the source will redeliver the message upstream if no ack arrives before some timeout.
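As a minimal sketch of that pattern, the handler below returns ErrDontAck for a retryable failure and passes any other error through unchanged. To stay self-contained it does not import the real packages: `Message` is a stand-in for `kawa.Message[T]`, `ErrDontAck` is redeclared locally, and `errThrottled`, `handleError`, and `skipsAck` are hypothetical names used only for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message is a hypothetical stand-in for kawa.Message[T].
type Message[T any] struct{ Value T }

// ErrDontAck is redeclared here to mirror the package variable.
var ErrDontAck = errors.New("Destination encountered a retryable error")

// errThrottled is a hypothetical retryable error from a downstream service.
var errThrottled = errors.New("throttled")

// handleError has the same shape as ErrorFunc[T]. It returns ErrDontAck for
// retryable failures and passes every other error through unchanged.
func handleError[T any](_ context.Context, err error, _ []Message[T]) error {
	if errors.Is(err, errThrottled) {
		return ErrDontAck
	}
	return err
}

// skipsAck reports whether the handler asks the batcher to withhold the ack.
func skipsAck(err error) bool {
	got := handleError[string](context.Background(), err, nil)
	return errors.Is(got, ErrDontAck)
}

func main() {
	fmt.Println(skipsAck(fmt.Errorf("flush: %w", errThrottled))) // true
	fmt.Println(skipsAck(errors.New("malformed payload")))       // false
}
```

With the real package, a function of this shape could presumably be converted to `ErrorFunc[T]` and passed to `NewDestination`, since `ErrorFunc[T]` implements `ErrorHandler[T]`.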

Functions

func FlushFrequency

func FlushFrequency(d time.Duration) func(*Opts)

func FlushLength

func FlushLength(size int) func(*Opts)

func FlushParallelism

func FlushParallelism(n int) func(*Opts)

func FlushTimeout added in v0.2.0

func FlushTimeout(d time.Duration) func(*Opts)

func StopTimeout added in v0.0.6

func StopTimeout(d time.Duration) func(*Opts)

func WatchdogTimeout added in v0.2.0

func WatchdogTimeout(d time.Duration) func(*Opts)
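Each of the functions above returns a `func(*Opts)`, which suggests the functional options pattern. The sketch below illustrates how such options could be applied; it assumes that each option simply sets the `Opts` field of the same name, which the signatures imply but this page does not state. The `buildOpts` helper and its default values are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// Opts mirrors two fields of the package's exported Opts struct.
type Opts struct {
	FlushLength    int
	FlushFrequency time.Duration
}

// OptFunc mirrors the package's option type.
type OptFunc func(*Opts)

// FlushLength is assumed to set Opts.FlushLength.
func FlushLength(size int) func(*Opts) {
	return func(o *Opts) { o.FlushLength = size }
}

// FlushFrequency is assumed to set Opts.FlushFrequency.
func FlushFrequency(d time.Duration) func(*Opts) {
	return func(o *Opts) { o.FlushFrequency = d }
}

// buildOpts applies each option, in order, over hypothetical defaults.
func buildOpts(opts ...OptFunc) Opts {
	o := Opts{FlushLength: 100, FlushFrequency: time.Second}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := buildOpts(FlushLength(500), FlushFrequency(5*time.Second))
	fmt.Println(o.FlushLength, o.FlushFrequency) // 500 5s
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.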

Types

type Destination

type Destination[T any] struct {
	// contains filtered or unexported fields
}

Destination is a batching destination that will buffer messages until the FlushLength limit is reached or the FlushFrequency timer fires, whichever comes first.

`Destination.Run` must be called after `NewDestination` before this destination will process events. If `Run` is never called, `Send` will likely deadlock, because nothing reads from the internal channel that `Send` writes to.

func NewDestination

func NewDestination[T any](f Flusher[T], e ErrorHandler[T], opts ...OptFunc) *Destination[T]

NewDestination instantiates a new batcher.

func (*Destination[T]) Run

func (d *Destination[T]) Run(ctx context.Context) error

Run starts the batching destination. It must be called before messages will be processed and written to the underlying Flusher. Run blocks until the context is canceled. Upon cancellation, Run flushes any messages remaining in the buffer and returns any flush errors that occur.

func (*Destination[T]) Send

func (d *Destination[T]) Send(ctx context.Context, ack func(), msgs ...kawa.Message[T]) error

Send satisfies the kawa.Destination interface and accepts messages to be buffered for flushing after the FlushLength limit is reached or the FlushFrequency timer fires, whichever comes first.

Messages will not be acknowledged until they have been flushed successfully.
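The buffering and acknowledgement behavior described for `Send` and `Run` can be modeled with a small loop. This is a simplified illustration, not the package's implementation: it omits flush parallelism, timeouts, and error handlers, and `item`, `runBatcher`, and `demo` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// item pairs a buffered value with the ack callback supplied by its sender.
type item struct {
	val string
	ack func()
}

// runBatcher buffers items and flushes when flushLen is reached, when the
// ticker fires, or when ctx is canceled. Acks run only after a successful flush.
func runBatcher(ctx context.Context, in <-chan item, flushLen int,
	freq time.Duration, flush func([]string) error) error {
	ticker := time.NewTicker(freq)
	defer ticker.Stop()
	var buf []item

	doFlush := func() error {
		if len(buf) == 0 {
			return nil
		}
		vals := make([]string, len(buf))
		for i, it := range buf {
			vals[i] = it.val
		}
		if err := flush(vals); err != nil {
			return err // nothing is acked when the flush fails
		}
		for _, it := range buf {
			if it.ack != nil {
				it.ack()
			}
		}
		buf = buf[:0]
		return nil
	}

	for {
		select {
		case it := <-in:
			buf = append(buf, it)
			if len(buf) >= flushLen {
				if err := doFlush(); err != nil {
					return err
				}
			}
		case <-ticker.C:
			if err := doFlush(); err != nil {
				return err
			}
		case <-ctx.Done():
			return doFlush() // flush whatever remains, then stop
		}
	}
}

// demo sends five items with a flush length of two, then cancels the context.
// It returns the size of each flushed batch and the number of acks received.
func demo() (sizes []int, acked int) {
	in := make(chan item)
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan error, 1)
	flush := func(vals []string) error {
		sizes = append(sizes, len(vals))
		return nil
	}
	go func() { done <- runBatcher(ctx, in, 2, time.Hour, flush) }()
	for i := 0; i < 5; i++ {
		in <- item{val: fmt.Sprint(i), ack: func() { acked++ }}
	}
	cancel()
	<-done
	return sizes, acked
}

func main() {
	sizes, acked := demo()
	fmt.Println(sizes, acked) // [2 2 1] 5
}
```

The final batch of one item is flushed on cancellation, which matches the documented behavior of `Run` flushing the remaining buffer before returning.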

type ErrorFunc added in v0.1.3

type ErrorFunc[T any] func(context.Context, error, []kawa.Message[T]) error

func (ErrorFunc[T]) HandleError added in v0.1.3

func (ef ErrorFunc[T]) HandleError(c context.Context, err error, msgs []kawa.Message[T]) error

type ErrorHandler added in v0.0.8

type ErrorHandler[T any] interface {
	HandleError(context.Context, error, []kawa.Message[T]) error
}

func DiscardHandler added in v0.0.8

func DiscardHandler[T any]() ErrorHandler[T]

func Raise added in v0.0.8

func Raise[T any]() ErrorHandler[T]

type FlushFunc

type FlushFunc[T any] func(context.Context, []kawa.Message[T]) error

func (FlushFunc[T]) Flush

func (ff FlushFunc[T]) Flush(c context.Context, msgs []kawa.Message[T]) error

type Flusher

type Flusher[T any] interface {
	Flush(context.Context, []kawa.Message[T]) error
}

Flusher is the core interface that the user of this package must implement to get the batching functionality. It takes a slice of messages and returns an error if the flush fails. It is expected to run synchronously and return only once the flush is complete. The flusher MUST respond to context cancellation by returning an error; if no other error occurred, it should return the context error.
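One way to honor that cancellation contract is to select on the context alongside the work, as in the sketch below. `Message` is a stand-in for `kawa.Message[T]`, and `slowFlusher` and `flushCanceled` are hypothetical; the timer simulates a slow write and is not something the package provides.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Message is a hypothetical stand-in for kawa.Message[T].
type Message[T any] struct{ Value T }

// slowFlusher simulates a destination whose write takes `delay` to complete.
type slowFlusher struct{ delay time.Duration }

// Flush blocks until the simulated write finishes or ctx is canceled.
// On cancellation it returns the context error, as the contract requires.
func (f slowFlusher) Flush(ctx context.Context, msgs []Message[string]) error {
	timer := time.NewTimer(f.delay)
	defer timer.Stop()
	select {
	case <-timer.C:
		return nil // simulated write completed
	case <-ctx.Done():
		return ctx.Err()
	}
}

// flushCanceled calls Flush with an already-canceled context and reports
// whether it returned context.Canceled.
func flushCanceled() bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	err := slowFlusher{delay: time.Hour}.Flush(ctx, nil)
	return err == context.Canceled
}

func main() {
	fmt.Println(flushCanceled()) // true
	err := slowFlusher{delay: time.Millisecond}.Flush(context.Background(), nil)
	fmt.Println(err) // <nil>
}
```

A real flusher would typically pass `ctx` into its network or database call instead of racing a timer, so that the underlying I/O is interrupted as well.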

type OptFunc

type OptFunc func(*Opts)

type Opts

type Opts struct {
	FlushLength      int
	FlushFrequency   time.Duration
	FlushTimeout     time.Duration
	FlushParallelism int
	StopTimeout      time.Duration
	WatchdogTimeout  time.Duration
}
