processor

package
v1.0.25 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2025 License: GPL-3.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultMaxQueueSize       = 51200
	DefaultScheduleDelay      = 5000
	DefaultExportTimeout      = 30000
	DefaultMaxExportBatchSize = 512
	DefaultShippingMethod     = ShippingMethodAsync
	DefaultNumWorkers         = 5
)

Variables

View Source
var (
	DefaultMetrics = NewMetrics("xatu")
)

Functions

This section is empty.

Types

type BatchItemProcessor

type BatchItemProcessor[T any] struct {
	// contains filtered or unexported fields
}

BatchItemProcessor is a processor that batches items for export.

func NewBatchItemProcessor

func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log logrus.FieldLogger, options ...BatchItemProcessorOption) (*BatchItemProcessor[T], error)

NewBatchItemProcessor creates a new batch item processor.

func (*BatchItemProcessor[T]) Shutdown

func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error

Shutdown shuts down the batch item processor.

func (*BatchItemProcessor[T]) Start added in v0.0.179

func (bvp *BatchItemProcessor[T]) Start(ctx context.Context)

func (*BatchItemProcessor[T]) Write

func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error

Write writes items to the queue. If the Processor is configured to use the sync shipping method, the items will be written to the queue and this function will return when all items have been processed. If the Processor is configured to use the async shipping method, the items will be written to the queue and this function will return immediately.

type BatchItemProcessorOption

type BatchItemProcessorOption func(o *BatchItemProcessorOptions)

BatchItemProcessorOption is a functional option for the batch item processor.

func WithBatchTimeout

func WithBatchTimeout(delay time.Duration) BatchItemProcessorOption

func WithExportTimeout

func WithExportTimeout(timeout time.Duration) BatchItemProcessorOption

func WithMaxExportBatchSize

func WithMaxExportBatchSize(size int) BatchItemProcessorOption

func WithMaxQueueSize

func WithMaxQueueSize(size int) BatchItemProcessorOption

func WithMetrics added in v0.0.179

func WithMetrics(metrics *Metrics) BatchItemProcessorOption

func WithShippingMethod

func WithShippingMethod(method ShippingMethod) BatchItemProcessorOption

func WithWorkers

func WithWorkers(workers int) BatchItemProcessorOption

type BatchItemProcessorOptions

type BatchItemProcessorOptions struct {
	// MaxQueueSize is the maximum queue size to buffer items for delayed processing. If the
	// queue gets full it drops the items.
	// The default value of MaxQueueSize is 51200.
	MaxQueueSize int
	// BatchTimeout is the maximum duration for constructing a batch. Processor
	// forcefully sends available items when timeout is reached.
	// The default value of BatchTimeout is 5000 msec.
	BatchTimeout time.Duration

	// ExportTimeout specifies the maximum duration for exporting items. If the timeout
	// is reached, the export will be cancelled.
	// The default value of ExportTimeout is 30000 msec.
	ExportTimeout time.Duration
	// MaxExportBatchSize is the maximum number of items to include in a batch.
	// The default value of MaxExportBatchSize is 512.
	MaxExportBatchSize int
	// ShippingMethod is the method of shipping items for export. The default value
	// of ShippingMethod is "async".
	ShippingMethod ShippingMethod
	// Workers is the number of workers to process batches.
	// The default value of Workers is 5.
	Workers int
	// Metrics is the metrics instance to use.
	Metrics *Metrics
}

func (*BatchItemProcessorOptions) Validate added in v0.0.175

func (o *BatchItemProcessorOptions) Validate() error

type ItemExporter

type ItemExporter[T any] interface {
	// ExportItems exports a batch of items.
	//
	// This function is called synchronously, so there is no concurrency
	// safety requirement. However, due to the synchronous calling pattern,
	// it is critical that all timeouts and cancellations contained in the
	// passed context must be honored.
	//
	// Any retry logic must be contained in this function. The SDK that
	// calls this function will not implement any retry logic. All errors
	// returned by this function are considered unrecoverable and will be
	// reported to a configured error Handler.
	ExportItems(ctx context.Context, items []*T) error

	// Shutdown notifies the exporter of a pending halt to operations. The
	// exporter is expected to preform any cleanup or synchronization it
	// requires while honoring all timeouts and cancellations contained in
	// the passed context.
	Shutdown(ctx context.Context) error
}

ItemExporter is an interface for exporting items.

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics(namespace string) *Metrics

func (*Metrics) DecWorkerExportInProgress added in v0.0.179

func (m *Metrics) DecWorkerExportInProgress(name string)

func (*Metrics) IncItemsDroppedBy

func (m *Metrics) IncItemsDroppedBy(name string, count float64)

func (*Metrics) IncItemsExportedBy

func (m *Metrics) IncItemsExportedBy(name string, count float64)

func (*Metrics) IncItemsFailedBy added in v0.0.158

func (m *Metrics) IncItemsFailedBy(name string, count float64)

func (*Metrics) IncWorkerExportInProgress added in v0.0.179

func (m *Metrics) IncWorkerExportInProgress(name string)

func (*Metrics) ObserveBatchSize added in v0.0.179

func (m *Metrics) ObserveBatchSize(name string, size float64)

func (*Metrics) ObserveExportDuration added in v0.0.179

func (m *Metrics) ObserveExportDuration(name string, duration time.Duration)

func (*Metrics) SetItemsQueued

func (m *Metrics) SetItemsQueued(name string, count float64)

func (*Metrics) SetWorkerCount added in v0.0.179

func (m *Metrics) SetWorkerCount(name string, count float64)

type ShippingMethod

type ShippingMethod string

ShippingMethod is the method of shipping items for export.

const (
	ShippingMethodUnknown ShippingMethod = "unknown"
	ShippingMethodAsync   ShippingMethod = "async"
	ShippingMethodSync    ShippingMethod = "sync"
)

type TraceableItem added in v0.0.179

type TraceableItem[T any] struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL