Documentation ¶
Index ¶
- Constants
- Variables
- type BatchItemProcessor
- type BatchItemProcessorOption
- func WithBatchTimeout(delay time.Duration) BatchItemProcessorOption
- func WithExportTimeout(timeout time.Duration) BatchItemProcessorOption
- func WithMaxExportBatchSize(size int) BatchItemProcessorOption
- func WithMaxQueueSize(size int) BatchItemProcessorOption
- func WithShippingMethod(method ShippingMethod) BatchItemProcessorOption
- func WithWorkers(workers int) BatchItemProcessorOption
- type BatchItemProcessorOptions
- type ItemExporter
- type Metrics
- type ShippingMethod
Constants ¶
const ( DefaultMaxQueueSize = 51200 DefaultScheduleDelay = 5000 DefaultExportTimeout = 30000 DefaultMaxExportBatchSize = 512 DefaultShippingMethod = ShippingMethodAsync DefaultNumWorkers = 1 )
Defaults for BatchItemProcessorOptions.
Variables ¶
var (
DefaultMetrics = NewMetrics("xatu")
)
Functions ¶
This section is empty.
Types ¶
type BatchItemProcessor ¶
type BatchItemProcessor[T any] struct { // contains filtered or unexported fields }
BatchItemProcessor is a buffer that batches asynchronously-received items and sends them to a exporter when complete.
func NewBatchItemProcessor ¶
func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log logrus.FieldLogger, options ...BatchItemProcessorOption) (*BatchItemProcessor[T], error)
NewBatchItemProcessor creates a new ItemProcessor that will send completed item batches to the exporter with the supplied options.
If the exporter is nil, the item processor will preform no action.
func (*BatchItemProcessor[T]) ImmediatelyExportItems ¶
func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, items []*T) error
ImmediatelyExportItems immediately exports the items to the exporter. Useful for propogating errors from the exporter.
type BatchItemProcessorOption ¶
type BatchItemProcessorOption func(o *BatchItemProcessorOptions)
BatchItemProcessorOption configures a BatchItemProcessor.
func WithBatchTimeout ¶
func WithBatchTimeout(delay time.Duration) BatchItemProcessorOption
WithBatchTimeout returns a BatchItemProcessorOption that configures the maximum delay allowed for a BatchItemProcessor before it will export any held item (whether the queue is full or not).
func WithExportTimeout ¶
func WithExportTimeout(timeout time.Duration) BatchItemProcessorOption
WithExportTimeout returns a BatchItemProcessorOption that configures the amount of time a BatchItemProcessor waits for an exporter to export before abandoning the export.
func WithMaxExportBatchSize ¶
func WithMaxExportBatchSize(size int) BatchItemProcessorOption
WithMaxExportBatchSize returns a BatchItemProcessorOption that configures the maximum export batch size allowed for a BatchItemProcessor.
func WithMaxQueueSize ¶
func WithMaxQueueSize(size int) BatchItemProcessorOption
WithMaxQueueSize returns a BatchItemProcessorOption that configures the maximum queue size allowed for a BatchItemProcessor.
func WithShippingMethod ¶
func WithShippingMethod(method ShippingMethod) BatchItemProcessorOption
WithExportTimeout returns a BatchItemProcessorOption that configures the amount of time a BatchItemProcessor waits for an exporter to export before abandoning the export.
func WithWorkers ¶
func WithWorkers(workers int) BatchItemProcessorOption
WithWorkers returns a BatchItemProcessorOption that configures the number of workers to process items.
type BatchItemProcessorOptions ¶
type BatchItemProcessorOptions struct { // MaxQueueSize is the maximum queue size to buffer items for delayed processing. If the // queue gets full it drops the items. // The default value of MaxQueueSize is 51200. MaxQueueSize int // BatchTimeout is the maximum duration for constructing a batch. Processor // forcefully sends available items when timeout is reached. // The default value of BatchTimeout is 5000 msec. BatchTimeout time.Duration // ExportTimeout specifies the maximum duration for exporting items. If the timeout // is reached, the export will be cancelled. // The default value of ExportTimeout is 30000 msec. ExportTimeout time.Duration // MaxExportBatchSize is the maximum number of items to process in a single batch. // If there are more than one batch worth of items then it processes multiple batches // of items one batch after the other without any delay. // The default value of MaxExportBatchSize is 512. MaxExportBatchSize int // ShippingMethod is the method used to ship items to the exporter. ShippingMethod ShippingMethod // Number of workers to process items. Workers int }
BatchItemProcessorOptions is configuration settings for a BatchItemProcessor.
type ItemExporter ¶
type ItemExporter[T any] interface { // ExportItems exports a batch of items. // // This function is called synchronously, so there is no concurrency // safety requirement. However, due to the synchronous calling pattern, // it is critical that all timeouts and cancellations contained in the // passed context must be honored. // // Any retry logic must be contained in this function. The SDK that // calls this function will not implement any retry logic. All errors // returned by this function are considered unrecoverable and will be // reported to a configured error Handler. ExportItems(ctx context.Context, items []*T) error // Shutdown notifies the exporter of a pending halt to operations. The // exporter is expected to preform any cleanup or synchronization it // requires while honoring all timeouts and cancellations contained in // the passed context. Shutdown(ctx context.Context) error }
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
func NewMetrics ¶
func (*Metrics) IncItemsDroppedBy ¶
func (*Metrics) IncItemsExportedBy ¶
func (*Metrics) IncItemsFailedBy ¶ added in v0.0.158
func (*Metrics) SetItemsQueued ¶
type ShippingMethod ¶
type ShippingMethod string
const ( ShippingMethodUnknown ShippingMethod = "unknown" ShippingMethodAsync ShippingMethod = "async" ShippingMethodSync ShippingMethod = "sync" )