Documentation ¶
Index ¶
- Constants
- Variables
- type BatchItemProcessor
- type BatchItemProcessorOption
- func WithBatchTimeout(delay time.Duration) BatchItemProcessorOption
- func WithExportTimeout(timeout time.Duration) BatchItemProcessorOption
- func WithMaxExportBatchSize(size int) BatchItemProcessorOption
- func WithMaxQueueSize(size int) BatchItemProcessorOption
- func WithMetrics(metrics *Metrics) BatchItemProcessorOption
- func WithShippingMethod(method ShippingMethod) BatchItemProcessorOption
- func WithWorkers(workers int) BatchItemProcessorOption
- type BatchItemProcessorOptions
- type ItemExporter
- type Metrics
- func (m *Metrics) DecWorkerExportInProgress(name string)
- func (m *Metrics) IncItemsDroppedBy(name string, count float64)
- func (m *Metrics) IncItemsExportedBy(name string, count float64)
- func (m *Metrics) IncItemsFailedBy(name string, count float64)
- func (m *Metrics) IncWorkerExportInProgress(name string)
- func (m *Metrics) ObserveBatchSize(name string, size float64)
- func (m *Metrics) ObserveExportDuration(name string, duration time.Duration)
- func (m *Metrics) SetItemsQueued(name string, count float64)
- func (m *Metrics) SetWorkerCount(name string, count float64)
- type ShippingMethod
- type TraceableItem
Constants ¶
const (
	DefaultMaxQueueSize       = 51200
	DefaultScheduleDelay      = 5000
	DefaultExportTimeout      = 30000
	DefaultMaxExportBatchSize = 512
	DefaultShippingMethod     = ShippingMethodAsync
	DefaultNumWorkers         = 5
)
Variables ¶
var (
DefaultMetrics = NewMetrics("xatu")
)
Functions ¶
This section is empty.
Types ¶
type BatchItemProcessor ¶
type BatchItemProcessor[T any] struct {
	// contains filtered or unexported fields
}
BatchItemProcessor is a processor that batches items for export.
func NewBatchItemProcessor ¶
func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log logrus.FieldLogger, options ...BatchItemProcessorOption) (*BatchItemProcessor[T], error)
NewBatchItemProcessor creates a new batch item processor.
func (*BatchItemProcessor[T]) Shutdown ¶
func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error
Shutdown shuts down the batch item processor.
func (*BatchItemProcessor[T]) Start ¶ added in v0.0.179
func (bvp *BatchItemProcessor[T]) Start(ctx context.Context)
func (*BatchItemProcessor[T]) Write ¶
func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error
Write writes items to the queue. If the Processor is configured to use the sync shipping method, the items will be written to the queue and this function will return when all items have been processed. If the Processor is configured to use the async shipping method, the items will be written to the queue and this function will return immediately.
type BatchItemProcessorOption ¶
type BatchItemProcessorOption func(o *BatchItemProcessorOptions)
BatchItemProcessorOption is a functional option for the batch item processor.
func WithBatchTimeout ¶
func WithBatchTimeout(delay time.Duration) BatchItemProcessorOption
func WithExportTimeout ¶
func WithExportTimeout(timeout time.Duration) BatchItemProcessorOption
func WithMaxExportBatchSize ¶
func WithMaxExportBatchSize(size int) BatchItemProcessorOption
func WithMaxQueueSize ¶
func WithMaxQueueSize(size int) BatchItemProcessorOption
func WithMetrics ¶ added in v0.0.179
func WithMetrics(metrics *Metrics) BatchItemProcessorOption
func WithShippingMethod ¶
func WithShippingMethod(method ShippingMethod) BatchItemProcessorOption
func WithWorkers ¶
func WithWorkers(workers int) BatchItemProcessorOption
type BatchItemProcessorOptions ¶
type BatchItemProcessorOptions struct {
	// MaxQueueSize is the maximum queue size to buffer items for delayed processing. If the
	// queue gets full it drops the items.
	// The default value of MaxQueueSize is 51200.
	MaxQueueSize int

	// BatchTimeout is the maximum duration for constructing a batch. Processor
	// forcefully sends available items when timeout is reached.
	// The default value of BatchTimeout is 5000 msec.
	BatchTimeout time.Duration

	// ExportTimeout specifies the maximum duration for exporting items. If the timeout
	// is reached, the export will be cancelled.
	// The default value of ExportTimeout is 30000 msec.
	ExportTimeout time.Duration

	// MaxExportBatchSize is the maximum number of items to include in a batch.
	// The default value of MaxExportBatchSize is 512.
	MaxExportBatchSize int

	// ShippingMethod is the method of shipping items for export. The default value
	// of ShippingMethod is "async".
	ShippingMethod ShippingMethod

	// Workers is the number of workers to process batches.
	// The default value of Workers is 5.
	Workers int

	// Metrics is the metrics instance to use.
	Metrics *Metrics
}
func (*BatchItemProcessorOptions) Validate ¶ added in v0.0.175
func (o *BatchItemProcessorOptions) Validate() error
type ItemExporter ¶
type ItemExporter[T any] interface {
	// ExportItems exports a batch of items.
	//
	// This function is called synchronously, so there is no concurrency
	// safety requirement. However, due to the synchronous calling pattern,
	// it is critical that all timeouts and cancellations contained in the
	// passed context be honored.
	//
	// Any retry logic must be contained in this function. The SDK that
	// calls this function will not implement any retry logic. All errors
	// returned by this function are considered unrecoverable and will be
	// reported to a configured error Handler.
	ExportItems(ctx context.Context, items []*T) error

	// Shutdown notifies the exporter of a pending halt to operations. The
	// exporter is expected to perform any cleanup or synchronization it
	// requires while honoring all timeouts and cancellations contained in
	// the passed context.
	Shutdown(ctx context.Context) error
}
ItemExporter is an interface for exporting items.
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
func NewMetrics ¶
func (*Metrics) DecWorkerExportInProgress ¶ added in v0.0.179
func (*Metrics) IncItemsDroppedBy ¶
func (*Metrics) IncItemsExportedBy ¶
func (*Metrics) IncItemsFailedBy ¶ added in v0.0.158
func (*Metrics) IncWorkerExportInProgress ¶ added in v0.0.179
func (*Metrics) ObserveBatchSize ¶ added in v0.0.179
func (*Metrics) ObserveExportDuration ¶ added in v0.0.179
func (*Metrics) SetItemsQueued ¶
func (*Metrics) SetWorkerCount ¶ added in v0.0.179
type ShippingMethod ¶
type ShippingMethod string
ShippingMethod is the method of shipping items for export.
const (
	ShippingMethodUnknown ShippingMethod = "unknown"
	ShippingMethodAsync   ShippingMethod = "async"
	ShippingMethodSync    ShippingMethod = "sync"
)
type TraceableItem ¶ added in v0.0.179
type TraceableItem[T any] struct {
	// contains filtered or unexported fields
}