dispatcher

package
v0.0.0-...-51d3280
Warning

This package is not in the latest version of its module.

Published: Jan 10, 2025 License: Apache-2.0 Imports: 9 Imported by: 7

Documentation

Overview

Package dispatcher implements a super-charged version of a buffered channel connected to a (potentially) parallelized work dispatcher.

This can be used when you have a mismatch between the rate of production of items and the rate of consumption of those items. For example:

  • if you have a producer which constantly produces new world states, and you want to sink the latest one into a slow external RPC (but still do retries if no new state appears).
  • if you have bursty user data which you'd like to batch according to some maximum batch size, but you don't want the data to get too stale in case you don't hit that batch limit.
  • if your external RPC can absorb almost infinite data, and the order of delivery doesn't matter, but you don't want to block the data producer.
  • etc.

The dispatcher can be configured to:

  • Buffer a certain amount of work (with possible backpressure to the producer).
  • Batch pending work into chunks for the send function.
  • Drop stale work which is no longer important to send.
  • Enforce a maximum QPS on the send function (even with parallel senders).
  • Retry batches independently with configurable per-batch retry policy.
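
The batching behavior listed above (a maximum batch size combined with a staleness bound) can be illustrated with a minimal standard-library sketch. This is not the package's implementation: `batchBySizeOrAge` and `demoBatches` are hypothetical names, and the real dispatcher adds buffering limits, leases, retries and QPS control on top of this idea.

```go
package main

import (
	"fmt"
	"time"
)

// batchBySizeOrAge is a hypothetical helper (not part of this package). It
// groups items from in into batches of at most maxSize items, and flushes a
// partial batch once its oldest item has waited for maxAge.
func batchBySizeOrAge[T any](in <-chan T, maxSize int, maxAge time.Duration, send func([]T)) {
	var batch []T
	var timer *time.Timer
	var expired <-chan time.Time // nil while the batch is empty, so it never fires

	flush := func() {
		if timer != nil {
			timer.Stop()
		}
		timer, expired = nil, nil
		if len(batch) > 0 {
			send(batch)
			batch = nil // start a fresh slice so sent batches are not overwritten
		}
	}

	for {
		select {
		case item, ok := <-in:
			if !ok {
				flush() // source closed: send whatever is left
				return
			}
			if len(batch) == 0 {
				// The first item of a batch starts the staleness clock.
				timer = time.NewTimer(maxAge)
				expired = timer.C
			}
			batch = append(batch, item)
			if len(batch) >= maxSize {
				flush()
			}
		case <-expired:
			flush()
		}
	}
}

// demoBatches feeds five items through the batcher with maxSize=2 and a
// staleness bound long enough that it never triggers.
func demoBatches() [][]int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 5; i++ {
			in <- i
		}
	}()
	var out [][]int
	batchBySizeOrAge[int](in, 2, time.Hour, func(b []int) { out = append(out, b) })
	return out
}

func main() {
	fmt.Println(demoBatches()) // [[1 2] [3 4] [5]]
}
```

The last batch is smaller than the limit because closing the source flushes the remainder; with a short `maxAge`, a partial batch would also be flushed by the timer.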

Functions

func DropFnQuiet

func DropFnQuiet[T any](*buffer.Batch[T], bool)

DropFnQuiet is an implementation of Options.DropFn which drops batches without logging anything.

func DropFnSummarized

func DropFnSummarized[T any](ctx context.Context, lim *rate.Limiter) func(*buffer.Batch[T], bool)

DropFnSummarized returns an implementation of Options.DropFn which counts the number of dropped batches, and only reports it at the rate provided.

Unlike the default log function, this only logs the number of dropped items and the duration that they were collected over.

func ErrorFnQuiet

func ErrorFnQuiet[T any](b *buffer.Batch[T], err error) (retry bool)

ErrorFnQuiet is an implementation of Options.ErrorFn which doesn't log the batch, but does check for `transient.Tag` to determine `retry`.

func Process

func Process[T any](ctx context.Context, source <-chan T, options *Options[T], consumer SendFn[T]) error

Process consumes a standard channel and blocks until all the items are consumed. It accepts the same arguments as NewChannel.

Types

type Channel

type Channel[T any] struct {
	// C is an unbuffered channel which you can push single work items into.
	//
	// Close this to shut down the Channel.
	C chan<- T

	// DrainC will unblock when this Channel is closed/canceled and fully drained.
	DrainC <-chan struct{}
}

Channel holds a chan which you can push individual work items to.

func NewChannel

func NewChannel[T any](ctx context.Context, opts *Options[T], send SendFn[T]) (Channel[T], error)

NewChannel produces a new Channel ready to listen and send work items.

Args:

  • `ctx` will be used for cancellation and logging. When the `ctx` is canceled, the Channel will:
      • drop all new data arriving on Channel.C (calling DropFn).
      • drop all existing unleased batches (calling DropFn).
      • ignore all errors from SendFn (i.e. even if ErrorFn returns 'retry=true', the batch will be dropped anyway).
    If you want to gracefully drain the Channel, you must close the channel and wait for DrainC before canceling the context.
  • `send` is required, and defines the function to use to process Batches of data. This function MUST respect `ctx.Done`, or the Channel cannot drain properly.
  • `opts` is optional (see Options for the defaults).

The Channel MUST be Close()'d when you're done with it, or the Channel will not terminate. This applies even if you cancel it via ctx. The caller is responsible for this (as opposed to having Channel implement this internally) because there is no generally-safe way in Go to close a channel without coordinating that event with all senders on that channel. Because the caller of NewChannel is effectively the sender (owner) of Channel.C, they must coordinate closure of this channel with all their use of sends to this channel.

func (*Channel[T]) Close

func (c *Channel[T]) Close()

Close is a convenience function which closes C (and swallows panic if already closed).

Close is safe to call on a nil pointer or a Channel containing a nil chan<-. It is safe and recommended to call Close in a defer block before checking for errors.

c, err := NewChannel(...)
defer c.Close()
if err != nil {
	...
}
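
The properties that make this defer pattern safe (tolerating a nil receiver, a nil chan, and a chan that is already closed) can be modeled with the standard library alone. The sketch below is an assumption about how such a Close could be written, not the package's source; `workChan` and `closeTwice` are hypothetical names.

```go
package main

import "fmt"

// workChan is a hypothetical stand-in for Channel; only the C field is modeled.
type workChan[T any] struct {
	C chan<- T
}

// Close closes C. It tolerates a nil receiver, a nil C, and a C that was
// already closed, in the spirit of the behavior documented above.
func (w *workChan[T]) Close() {
	if w == nil || w.C == nil {
		return
	}
	// Closing an already-closed channel panics; swallow that panic.
	defer func() { _ = recover() }()
	close(w.C)
}

// closeTwice reports whether any of the Close calls below let a panic escape.
func closeTwice() (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()

	w := &workChan[int]{C: make(chan int)}
	w.Close()
	w.Close() // already closed: the panic is swallowed inside Close

	var nilChan *workChan[int]
	nilChan.Close() // nil receiver

	empty := &workChan[int]{}
	empty.Close() // nil C

	return false
}

func main() {
	fmt.Println("panicked:", closeTwice()) // panicked: false
}
```

Because every one of these calls is a no-op or a swallowed panic, a deferred Close can be registered before the error from the constructor is inspected.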

func (*Channel[T]) CloseAndDrain

func (c *Channel[T]) CloseAndDrain(ctx context.Context)

CloseAndDrain is a convenience function which closes C (and swallows panic if already closed) and then blocks on DrainC/ctx.Done().

CloseAndDrain is safe to call on a nil pointer or a Channel containing a nil chan<-. It is safe and recommended to call it in a defer block before checking for errors. See Close for an example.

func (Channel[T]) Consume

func (c Channel[T]) Consume(ctx context.Context, source <-chan T) error

Consume takes a source channel of items and feeds each item into the dispatcher.Channel. We then call SendFn repeatedly on batches of elements as usual.

Control is handed back to the caller when the deadline is exceeded or every item from the channel is consumed.

Consume returns a non-nil error precisely when the deadline was exceeded.
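
The contract described above can be sketched as a plain forwarding loop. This is a hypothetical model rather than the package's code: it assumes that "deadline exceeded" corresponds to the Context ending, and `consume`, `consumeAll` and `consumeCanceled` are illustrative names.

```go
package main

import (
	"context"
	"fmt"
)

// consume is a hypothetical model of the contract described above: it
// forwards items from source to sink until source is closed (returning nil)
// or ctx is done (returning ctx.Err()).
func consume[T any](ctx context.Context, source <-chan T, sink chan<- T) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case item, ok := <-source:
			if !ok {
				return nil // every item was consumed
			}
			select {
			case sink <- item:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}

// consumeAll drains a closed three-item source with a Context that never ends.
func consumeAll() (forwarded int, err error) {
	source := make(chan int, 3)
	for i := 1; i <= 3; i++ {
		source <- i
	}
	close(source)
	sink := make(chan int, 3)
	err = consume[int](context.Background(), source, sink)
	return len(sink), err
}

// consumeCanceled runs against a source that never yields, with a Context
// that is already canceled.
func consumeCanceled() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return consume[int](ctx, make(chan int), make(chan int))
}

func main() {
	n, err := consumeAll()
	fmt.Println(n, err)            // 3 <nil>
	fmt.Println(consumeCanceled()) // context canceled
}
```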

func (Channel[T]) IsDrained

func (c Channel[T]) IsDrained() bool

IsDrained returns true iff the Channel is closed and drained.

type ErrorFn

type ErrorFn[T any] func(failedBatch *buffer.Batch[T], err error) (retry bool)

ErrorFn is called to handle the error from SendFn.

This is also invoked with buffer.ErrItemTooLarge if your supplied ItemSizeFunc returns a size larger than Buffer.BatchSizeMax (i.e. you pushed an item which couldn't fit inside of a Batch). Similarly, if your ItemSizeFunc returns <=0, this is invoked with buffer.ErrItemTooSmall. Channel ignores the `retry` return value of this function in these cases.

It executes in the main handler loop of the dispatcher so it can make synchronous decisions about the dispatcher state.

Blocking in this function will block ALL dispatcher actions, so be quick :).

DO NOT WRITE TO THE CHANNEL DIRECTLY FROM THIS FUNCTION. Doing so will very likely cause deadlocks.

This may:

  • inspect/log the error
  • manipulate the contents of failedBatch
  • return a boolean of whether this Batch should be retried or not. If this is false then the Batch is dropped. If it's true, then it will be re-queued as-is for transmission according to Buffer.FullBehavior.
  • pass the Batch.Data to another goroutine (in a non-blocking way!) to be re-queued through Channel.C.

Args:

  • failedBatch - The Batch for which SendFn produced a non-nil error.
  • err - The error SendFn produced.

Returns true iff the dispatcher should re-try sending this Batch, according to Buffer.Retry.
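
Since blocking inside ErrorFn stalls the whole dispatcher, handing a failed batch's data to another goroutine needs a non-blocking send. The sketch below shows one way to do that with `select`/`default`. The `batch` type is a simplified, hypothetical stand-in for buffer.Batch (its Data is modeled as a plain slice), and `handoffErrorFn` and `demoHandoff` are illustrative names.

```go
package main

import (
	"errors"
	"fmt"
)

// batch is a simplified, hypothetical stand-in for buffer.Batch.
type batch[T any] struct {
	Data []T
}

// handoffErrorFn returns an ErrorFn-shaped function. It offers a copy of the
// failed batch's data to requeue without ever blocking, counts the offers
// that could not be delivered, and never asks the dispatcher to retry.
func handoffErrorFn[T any](requeue chan<- []T, missed *int) func(*batch[T], error) bool {
	return func(b *batch[T], err error) (retry bool) {
		// Copy so the receiving goroutine does not share the batch's slice.
		data := append([]T(nil), b.Data...)
		select {
		case requeue <- data:
		default:
			// The receiver is busy: give up rather than stall the handler loop.
			*missed++
		}
		return false
	}
}

// demoHandoff offers two failed batches to a queue with room for only one.
func demoHandoff() (queued, missed int) {
	requeue := make(chan []int, 1)
	fn := handoffErrorFn[int](requeue, &missed)
	fn(&batch[int]{Data: []int{1, 2}}, errors.New("send failed"))
	fn(&batch[int]{Data: []int{3}}, errors.New("send failed"))
	return len(requeue), missed
}

func main() {
	fmt.Println(demoHandoff()) // 1 1
}
```

A separate goroutine would read from `requeue` and push the items back into the Channel; doing that push from inside ErrorFn itself is exactly what the warning above forbids.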

func ErrorFnReport

func ErrorFnReport[T any](bufferSize int, inner ErrorFn[T]) (ErrorFn[T], <-chan error)

ErrorFnReport is an implementation of Options.ErrorFn which sends all errors to a buffered channel. The channel MUST be drained as quickly as possible. Otherwise, it may block all dispatcher actions.

If `inner` error function is provided, it is used to determine `retry`. Otherwise, `retry` is always false.

type Options

type Options[T any] struct {
	// [OPTIONAL] The ErrorFn to use (see ErrorFn docs for details).
	//
	// Default: Logs the error (at Info for retryable errors, and Error for
	// non-retryable errors) and returns true on a transient error.
	ErrorFn ErrorFn[T]

	// [OPTIONAL] Called with the dropped batch any time the Channel drops a batch.
	//
	// This includes:
	//   * When FullBehavior==DropOldestBatch and we get new data.
	//   * When FullBehavior==DropOldestBatch and we attempt to retry old data.
	//   * When ErrorFn returns false for a batch.
	//
	// It executes in the main handler loop of the dispatcher so it can make
	// synchronous decisions about the dispatcher state.
	//
	// Blocking in this function will block ALL dispatcher actions, so be quick
	// :).
	//
	// DO NOT WRITE TO THE CHANNEL DIRECTLY FROM THIS FUNCTION. Doing so will very
	// likely cause deadlocks.
	//
	// When the channel is fully drained, this will be invoked exactly once with
	// `(nil, true)`. This will occur immediately before the DrainedFn is called.
	// Some drop functions buffer their information, and this gives them an
	// opportunity to flush out any buffered data.
	//
	// Default: logs (at Info level if FullBehavior==DropOldestBatch, or Warning
	// level otherwise) the number of data items in the Batch being dropped.
	DropFn func(b *buffer.Batch[T], flush bool)

	// [OPTIONAL] Called exactly once when the associated Channel is closed and
	// has fully drained its buffer, but before DrainC is closed.
	//
	// Note that this takes effect whether the Channel is shut down via Context
	// cancellation or explicitly by closing Channel.C.
	//
	// This is useful for performing final state synchronization tasks/metrics
	// finalization/helpful "everything is done!" messages/etc. without having to
	// poll the Channel to see if it's done and also maintain external
	// synchronization around the finalization action.
	//
	// Called in the main handler loop, but it's called after all other work is
	// done by the Channel, so the only thing it blocks is the closure of DrainC.
	//
	// Default: No action.
	DrainedFn func()

	// [OPTIONAL] A rate limiter for how frequently this will invoke SendFn.
	//
	// Default: No limit.
	QPSLimit *rate.Limiter

	// [OPTIONAL] The minimal frequency of invoking SendFn.
	//
	// If greater than zero, this Channel will invoke SendFn at least this often,
	// as long as:
	//   * the Channel remains open.
	//   * the Channel Buffer has no data (no pending data and no outstanding
	//     SendFn goroutines).
	//
	// If there's a period of time longer than this with no work items where the
	// criteria above matches, Channel will invoke SendFn with a batch containing
	// a single zero-size BatchItem with the `Synthetic` flag set.
	//
	// NOTE: Depending on the Buffer.FullBehavior setting, these synthesized items
	// could cause existing leases to drop (so - if SendFn returns an error, and
	// ErrorFn returns `retry == true`, the channel may end up dropping the batch
	// because some other MinQPSBatch's were synthesized).
	//
	// Errors returned from these SendFn invocations are still processed
	// normally, and synthesized batches still count against QPSLimit.
	//
	// It is an error to specify a MinQPS value which is
	// * greater than QPSLimit.Limit(),
	// * or is rate.Inf.
	//
	// Default: No minimum QPS, no MinQPSBatch batches will be sent.
	MinQPS rate.Limit

	// [OPTIONAL]
	// Should return the size of the given buffer item (i.e. what you push into
	// Channel.C) in whatever units you like (see Buffer.BatchSizeMax).
	//
	// The function will only ever be called once per pushed item.
	//
	// [REQUIRED]
	// Must be non-nil if Buffer.BatchSizeMax is specified.
	//
	// Must return a positive value less than Buffer.BatchSizeMax. Failure to do
	// so will cause `itm` to be immediately rejected from the dispatcher.Channel
	// and routed to ErrorFn with no further processing.
	ItemSizeFunc func(itm T) int

	Buffer buffer.Options
	// contains filtered or unexported fields
}

Options is the configuration options for NewChannel.

type SendFn

type SendFn[T any] func(data *buffer.Batch[T]) error

SendFn is the function which does the work to actually transmit the Batch to the next stage of your processing pipeline (e.g. do an RPC to a remote service).

The function may manipulate the Batch however it wants (see Batch).

In particular, shrinking the size of Batch.Data for confirmed-sent items will allow the dispatcher to reduce its buffer count when SendFn returns, even if SendFn returns an error. Removing items from the Batch will not cause the remaining items to be coalesced into a different Batch.

The SendFn MUST be bound to this Channel's Context; if the Channel's Context is Cancel'd, SendFn MUST terminate, or the Channel's DrainC will be blocked. We don't pass it as part of SendFn's signature in case SendFn needs to be bound to a derived Context.

Non-nil errors returned by this function will be handled by ErrorFn.
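
The partial-success behavior described above (shrinking Batch.Data to the items not yet confirmed before returning an error) might look like the following sketch. The `batch` type is again a simplified, hypothetical stand-in for buffer.Batch, and `sendEach`, `deliver` and `demoPartialSend` are illustrative names, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// batch is a simplified, hypothetical stand-in for buffer.Batch.
type batch[T any] struct {
	Data []T
}

// sendEach delivers the batch's items one at a time. On the first failure it
// trims the already-delivered prefix from b.Data, so that only unconfirmed
// items remain in the batch, and returns the error.
func sendEach[T any](b *batch[T], deliver func(T) error) error {
	for i, item := range b.Data {
		if err := deliver(item); err != nil {
			b.Data = b.Data[i:] // keep only the items not yet confirmed
			return err
		}
	}
	b.Data = b.Data[:0] // everything was confirmed
	return nil
}

// demoPartialSend sends five items through a deliver function that rejects
// the third one.
func demoPartialSend() (remaining []int, err error) {
	b := &batch[int]{Data: []int{1, 2, 3, 4, 5}}
	err = sendEach(b, func(item int) error {
		if item == 3 {
			return errors.New("remote rejected item 3")
		}
		return nil
	})
	return b.Data, err
}

func main() {
	remaining, err := demoPartialSend()
	fmt.Println(remaining, err) // [3 4 5] remote rejected item 3
}
```

In a real SendFn, `sendEach` would be wrapped in a closure matching the SendFn signature, with `deliver` bound to a Context derived from the Channel's Context.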

Directories

Path Synopsis
Package buffer implements a batching buffer with batch lease and retry management.