processor

package
v0.0.171 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 4, 2024 License: GPL-3.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultMaxQueueSize       = 51200
	DefaultScheduleDelay      = 5000
	DefaultExportTimeout      = 30000
	DefaultMaxExportBatchSize = 512
	DefaultShippingMethod     = ShippingMethodAsync
	DefaultNumWorkers         = 1
)

Defaults for BatchItemProcessorOptions.

Variables

View Source
var (
	DefaultMetrics = NewMetrics("xatu")
)

Functions

This section is empty.

Types

type BatchItemProcessor

type BatchItemProcessor[T any] struct {
	// contains filtered or unexported fields
}

BatchItemProcessor is a buffer that batches asynchronously-received items and sends them to a exporter when complete.

func NewBatchItemProcessor

func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log logrus.FieldLogger, options ...BatchItemProcessorOption) (*BatchItemProcessor[T], error)

NewBatchItemProcessor creates a new ItemProcessor that will send completed item batches to the exporter with the supplied options.

If the exporter is nil, the item processor will preform no action.

func (*BatchItemProcessor[T]) ImmediatelyExportItems

func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, items []*T) error

ImmediatelyExportItems immediately exports the items to the exporter. Useful for propogating errors from the exporter.

func (*BatchItemProcessor[T]) Shutdown

func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error

Shutdown flushes the queue and waits until all items are processed. It only executes once. Subsequent call does nothing.

func (*BatchItemProcessor[T]) Write

func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error

OnEnd method enqueues a item for later processing.

type BatchItemProcessorOption

type BatchItemProcessorOption func(o *BatchItemProcessorOptions)

BatchItemProcessorOption configures a BatchItemProcessor.

func WithBatchTimeout

func WithBatchTimeout(delay time.Duration) BatchItemProcessorOption

WithBatchTimeout returns a BatchItemProcessorOption that configures the maximum delay allowed for a BatchItemProcessor before it will export any held item (whether the queue is full or not).

func WithExportTimeout

func WithExportTimeout(timeout time.Duration) BatchItemProcessorOption

WithExportTimeout returns a BatchItemProcessorOption that configures the amount of time a BatchItemProcessor waits for an exporter to export before abandoning the export.

func WithMaxExportBatchSize

func WithMaxExportBatchSize(size int) BatchItemProcessorOption

WithMaxExportBatchSize returns a BatchItemProcessorOption that configures the maximum export batch size allowed for a BatchItemProcessor.

func WithMaxQueueSize

func WithMaxQueueSize(size int) BatchItemProcessorOption

WithMaxQueueSize returns a BatchItemProcessorOption that configures the maximum queue size allowed for a BatchItemProcessor.

func WithShippingMethod

func WithShippingMethod(method ShippingMethod) BatchItemProcessorOption

WithExportTimeout returns a BatchItemProcessorOption that configures the amount of time a BatchItemProcessor waits for an exporter to export before abandoning the export.

func WithWorkers

func WithWorkers(workers int) BatchItemProcessorOption

WithWorkers returns a BatchItemProcessorOption that configures the number of workers to process items.

type BatchItemProcessorOptions

type BatchItemProcessorOptions struct {
	// MaxQueueSize is the maximum queue size to buffer items for delayed processing. If the
	// queue gets full it drops the items.
	// The default value of MaxQueueSize is 51200.
	MaxQueueSize int

	// BatchTimeout is the maximum duration for constructing a batch. Processor
	// forcefully sends available items when timeout is reached.
	// The default value of BatchTimeout is 5000 msec.
	BatchTimeout time.Duration

	// ExportTimeout specifies the maximum duration for exporting items. If the timeout
	// is reached, the export will be cancelled.
	// The default value of ExportTimeout is 30000 msec.
	ExportTimeout time.Duration

	// MaxExportBatchSize is the maximum number of items to process in a single batch.
	// If there are more than one batch worth of items then it processes multiple batches
	// of items one batch after the other without any delay.
	// The default value of MaxExportBatchSize is 512.
	MaxExportBatchSize int

	// ShippingMethod is the method used to ship items to the exporter.
	ShippingMethod ShippingMethod

	// Number of workers to process items.
	Workers int
}

BatchItemProcessorOptions is configuration settings for a BatchItemProcessor.

type ItemExporter

type ItemExporter[T any] interface {
	// ExportItems exports a batch of items.
	//
	// This function is called synchronously, so there is no concurrency
	// safety requirement. However, due to the synchronous calling pattern,
	// it is critical that all timeouts and cancellations contained in the
	// passed context must be honored.
	//
	// Any retry logic must be contained in this function. The SDK that
	// calls this function will not implement any retry logic. All errors
	// returned by this function are considered unrecoverable and will be
	// reported to a configured error Handler.
	ExportItems(ctx context.Context, items []*T) error

	// Shutdown notifies the exporter of a pending halt to operations. The
	// exporter is expected to preform any cleanup or synchronization it
	// requires while honoring all timeouts and cancellations contained in
	// the passed context.
	Shutdown(ctx context.Context) error
}

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics(namespace string) *Metrics

func (*Metrics) IncItemsDroppedBy

func (m *Metrics) IncItemsDroppedBy(name string, count float64)

func (*Metrics) IncItemsExportedBy

func (m *Metrics) IncItemsExportedBy(name string, count float64)

func (*Metrics) IncItemsFailedBy added in v0.0.158

func (m *Metrics) IncItemsFailedBy(name string, count float64)

func (*Metrics) SetItemsQueued

func (m *Metrics) SetItemsQueued(name string, count float64)

type ShippingMethod

type ShippingMethod string
const (
	ShippingMethodUnknown ShippingMethod = "unknown"
	ShippingMethodAsync   ShippingMethod = "async"
	ShippingMethodSync    ShippingMethod = "sync"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL