Documentation ¶
Overview ¶
Package docappender provides an API for append-only bulk document indexing into Elasticsearch.
This package provides an intentionally simpler and more restrictive API than the go-elasticsearch/esutil.BulkIndexer API; it is not intended to cover all bulk API use cases. It is intended to be used for conflict-free, append-only indexing into Elasticsearch data streams.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrClosed is returned from methods of closed Indexers.
	ErrClosed = errors.New("model indexer closed")
)
Functions ¶
This section is empty.
Types ¶
type Appender ¶
type Appender struct {
// contains filtered or unexported fields
}
Appender provides an append-only API for bulk indexing documents into Elasticsearch.
Appender buffers documents in their JSON encoding until either the accumulated buffer reaches `config.FlushBytes`, or `config.FlushInterval` elapses.
Appender fills a single bulk request buffer at a time to ensure bulk requests are optimally sized, avoiding sparse bulk requests as much as possible. After a bulk request is flushed, the next document added will wait for the next available bulk request buffer and repeat the process.
Up to `config.MaxRequests` bulk requests may be flushing/active concurrently, to allow the server to make progress encoding while Elasticsearch is busy servicing flushed bulk requests.
func (*Appender) Add ¶
Add enqueues document for appending to index.
The document body will be copied to a buffer using io.Copy; document may implement io.WriterTo to reduce the overhead of copying.
The document will be accessed after Add returns, and must remain accessible until its Read method returns io.EOF, or its WriterTo method returns.
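A minimal usage sketch follows. The constructor New, the Close method, and the exact Add parameters (context, index name, document) are assumptions inferred from the descriptions above rather than signatures shown in this excerpt.

package main

import (
	"bytes"
	"context"
	"log"
	"time"

	"github.com/elastic/go-docappender"
	"github.com/elastic/go-elasticsearch/v8"
)

func main() {
	client, err := elasticsearch.NewDefaultClient()
	if err != nil {
		log.Fatal(err)
	}

	// Assumed constructor; not shown in this excerpt.
	appender, err := docappender.New(client, docappender.Config{
		FlushBytes:    1 << 20,          // flush once ~1MB of documents is buffered...
		FlushInterval: 30 * time.Second, // ...or after 30 seconds, whichever comes first
	})
	if err != nil {
		log.Fatal(err)
	}
	// Assumed Close method; flushes any remaining buffered documents.
	defer appender.Close(context.Background())

	// The document must remain accessible until it has been fully read;
	// *bytes.Reader implements io.WriterTo, which avoids an extra copy.
	doc := bytes.NewReader([]byte(`{"@timestamp":"2024-01-01T00:00:00Z","message":"hello"}`))
	if err := appender.Add(context.Background(), "logs-example-default", doc); err != nil {
		log.Fatal(err)
	}
}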
type BulkIndexer ¶ added in v1.1.0
type BulkIndexer struct {
// contains filtered or unexported fields
}
func NewBulkIndexer ¶ added in v1.1.0
func NewBulkIndexer(client *elasticsearch.Client, compressionLevel int, maxDocRetry int) *BulkIndexer
func (*BulkIndexer) Add ¶ added in v1.1.0
func (b *BulkIndexer) Add(item BulkIndexerItem) error
Add encodes an item in the buffer.
func (*BulkIndexer) BytesFlushed ¶ added in v1.1.0
func (b *BulkIndexer) BytesFlushed() int
BytesFlushed returns the number of bytes flushed by the bulk indexer.
func (*BulkIndexer) Flush ¶ added in v1.1.0
func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error)
Flush executes a bulk request if there are any items buffered, and clears out the buffer.
func (*BulkIndexer) Items ¶ added in v1.1.0
func (b *BulkIndexer) Items() int
Items returns the number of buffered items.
func (*BulkIndexer) Len ¶ added in v1.1.0
func (b *BulkIndexer) Len() int
Len returns the number of buffered bytes.
func (*BulkIndexer) Reset ¶ added in v1.1.0
func (b *BulkIndexer) Reset()
Reset resets b, making it ready for a new request.
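The sketch below walks through the BulkIndexer lifecycle using the methods above. The BulkIndexerItem fields used here (Index, Body) are assumptions, since the item's definition is not included in this excerpt; the code assumes the same imports as the Appender example plus "compress/gzip".

// flushOnce buffers a single document and executes one bulk request.
func flushOnce(client *elasticsearch.Client) error {
	// -1 (gzip.DefaultCompression) selects the default compression level;
	// up to 3 retries per document. Both values are arbitrary here.
	indexer := docappender.NewBulkIndexer(client, gzip.DefaultCompression, 3)

	// Index and Body are assumed field names; see the note above.
	item := docappender.BulkIndexerItem{
		Index: "logs-example-default",
		Body:  bytes.NewReader([]byte(`{"message":"hello"}`)),
	}
	if err := indexer.Add(item); err != nil {
		return err
	}

	if indexer.Items() == 0 {
		return nil // nothing buffered, nothing to flush
	}
	stat, err := indexer.Flush(context.Background())
	if err != nil {
		return err
	}
	log.Printf("indexed=%d retried=%d failed=%d bytes_flushed=%d",
		stat.Indexed, stat.RetriedDocs, len(stat.FailedDocs), indexer.BytesFlushed())

	// Reset clears the buffer so the indexer can be reused for the next request.
	indexer.Reset()
	return nil
}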
type BulkIndexerItem ¶ added in v1.1.0
type BulkIndexerResponseItem ¶ added in v0.2.0
type BulkIndexerResponseItem struct {
	Index  string `json:"_index"`
	Status int    `json:"status"`

	Position int

	Error struct {
		Type   string `json:"type"`
		Reason string `json:"reason"`
	} `json:"error,omitempty"`
}
BulkIndexerResponseItem represents the Elasticsearch response item.
type BulkIndexerResponseStat ¶ added in v1.0.0
type BulkIndexerResponseStat struct {
	Indexed     int64
	RetriedDocs int64
	FailedDocs  []BulkIndexerResponseItem
}
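Flush returns a BulkIndexerResponseStat, so per-document failures can be inspected after each request. The helper below is only illustrative; it assumes go.uber.org/zap is imported and a *zap.Logger is available, as with Config.Logger.

// reportFailures logs each failed document from a bulk response.
func reportFailures(logger *zap.Logger, stat docappender.BulkIndexerResponseStat) {
	for _, item := range stat.FailedDocs {
		logger.Error("bulk indexing failure",
			zap.String("index", item.Index),
			zap.Int("status", item.Status),
			zap.Int("position", item.Position),
			zap.String("error_type", item.Error.Type),
			zap.String("error_reason", item.Error.Reason),
		)
	}
}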
type Config ¶
type Config struct {
	// Logger holds an optional Logger to use for logging indexing requests.
	//
	// All Elasticsearch errors will be logged at error level, so in cases
	// where the indexer is used for high throughput indexing, it is
	// recommended that a rate-limited logger is used.
	//
	// If Logger is nil, logging will be disabled.
	Logger *zap.Logger

	// Tracer holds an optional apm.Tracer to use for tracing bulk requests
	// to Elasticsearch. Each bulk request is traced as a transaction.
	//
	// If Tracer is nil, requests will not be traced.
	Tracer *apm.Tracer

	// CompressionLevel holds the gzip compression level, from 0 (gzip.NoCompression)
	// to 9 (gzip.BestCompression). Higher values provide greater compression, at a
	// greater cost of CPU. The special value -1 (gzip.DefaultCompression) selects the
	// default compression level.
	CompressionLevel int

	// MaxRequests holds the maximum number of bulk index requests to execute concurrently.
	// The maximum memory usage of Appender is thus approximately MaxRequests*FlushBytes.
	//
	// If MaxRequests is less than or equal to zero, the default of 10 will be used.
	MaxRequests int

	// MaxDocumentRetries holds the maximum number of document retries.
	MaxDocumentRetries int

	// FlushBytes holds the flush threshold in bytes. If compression is enabled,
	// the number of documents that can be buffered will be greater.
	//
	// If FlushBytes is zero, the default of 1MB will be used.
	FlushBytes int

	// FlushInterval holds the flush threshold as a duration.
	//
	// If FlushInterval is zero, the default of 30 seconds will be used.
	FlushInterval time.Duration

	// FlushTimeout holds the flush timeout as a duration.
	//
	// If FlushTimeout is zero, no timeout will be used.
	FlushTimeout time.Duration

	// DocumentBufferSize sets the number of documents that can be buffered before
	// they are stored in the active indexer buffer.
	//
	// If DocumentBufferSize is zero, the default of 1024 will be used.
	DocumentBufferSize int

	// Scaling holds the autoscaling configuration for the docappender.
	//
	// If unspecified, scaling is enabled by default.
	Scaling ScalingConfig

	// MeterProvider holds the OTel MeterProvider to be used to create and
	// record appender metrics.
	//
	// If unset, the global OTel MeterProvider will be used; if that is also
	// unset, no metrics will be recorded.
	MeterProvider metric.MeterProvider

	// MetricAttributes holds any extra attributes to set in the recorded
	// metrics.
	MetricAttributes attribute.Set
}
Config holds configuration for Appender.
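As an illustration, a configuration that overrides a few of the defaults above might look like the following; the values are arbitrary, and any field left at its zero value falls back to the documented default.

cfg := docappender.Config{
	Logger:           logger,                  // e.g. a rate-limited *zap.Logger
	CompressionLevel: gzip.DefaultCompression, // -1 selects the default gzip level
	MaxRequests:      25,                      // up to 25 concurrent bulk requests
	FlushBytes:       2 << 20,                 // flush at ~2MB of buffered documents
	FlushInterval:    10 * time.Second,
	FlushTimeout:     time.Minute,
}

With these values the appender's approximate maximum memory usage is MaxRequests*FlushBytes, i.e. about 50MB.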
type ScaleActionConfig ¶
type ScaleActionConfig struct {
	// Threshold is the number of consecutive times a scale up/down condition
	// has to be met before the scaling action is triggered.
	Threshold uint

	// CoolDown is the amount of time that must elapse between scaling actions
	// before the action can be triggered again.
	CoolDown time.Duration
}
ScaleActionConfig holds the configuration for a scaling action.
type ScalingConfig ¶
type ScalingConfig struct {
	// Disabled disables active indexer scaling.
	//
	// Scaling is enabled by default.
	Disabled bool

	// ActiveRatio defines the ratio of (potential) active indexers to
	// GOMAXPROCS. The higher the value, the more potential active indexers
	// there will be actively pulling from the BulkIndexerItem channel.
	// For example, with ActiveRatio: 1 and GOMAXPROCS: 2, there can be at
	// most 2 active indexers, i.e. 1 per GOMAXPROCS; with ActiveRatio: 0.5
	// and GOMAXPROCS: 2, the maximum number of active indexers is 1.
	// The value must be between 0 and 1.
	//
	// It defaults to 0.25.
	ActiveRatio float64

	// ScaleDown configures the Threshold and CoolDown for the scale down
	// action. In order to scale down an active indexer, the Threshold has
	// to be met after the CoolDown has elapsed. Scale down will only take
	// place if there is more than one active indexer.
	// Active indexers are destroyed when they are no longer needed: when
	// enough timed flushes (FlushInterval) have been performed by an active
	// indexer (controlled by Threshold), or when an active indexer has been
	// idle for (IdleInterval * Threshold), as long as CoolDown allows it.
	//
	// When unset, the default of 30 is used for Threshold, and 30 seconds for
	// CoolDown.
	ScaleDown ScaleActionConfig

	// ScaleUp configures the Threshold and CoolDown for the scale up action.
	//
	// In order for a scale up to occur, the Threshold has to be met after
	// the CoolDown has elapsed. By default, a single active indexer is created
	// which actively pulls items from the internal buffered queue. When enough
	// full flushes (FlushBytes) have been performed by an active indexer
	// (controlled by Threshold), a new active indexer is created until
	// GOMAXPROCS / 4 is reached (25% of CPU capacity), if the CoolDown allows it.
	//
	// When unspecified, the default of 60 is used for Threshold, and 60 seconds
	// for CoolDown.
	ScaleUp ScaleActionConfig

	// IdleInterval defines the interval at which an active indexer checks for
	// inactivity. The ScaleDown.Threshold and ScaleDown.CoolDown need to be
	// met for an active indexer to be destroyed.
	//
	// When unspecified, the default of 30 seconds will be used.
	IdleInterval time.Duration
}
ScalingConfig holds the docappender autoscaling configuration.
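For example, the scaling behaviour can be tuned through Config.Scaling. The values below simply restate the documented defaults, so this configuration behaves the same as leaving ScalingConfig unset.

cfg.Scaling = docappender.ScalingConfig{
	ActiveRatio: 0.25, // at most GOMAXPROCS/4 active indexers
	ScaleUp: docappender.ScaleActionConfig{
		Threshold: 60,               // 60 consecutive full (FlushBytes) flushes...
		CoolDown:  60 * time.Second, // ...with at least 60s since the last scale up
	},
	ScaleDown: docappender.ScaleActionConfig{
		Threshold: 30,               // 30 consecutive timed (FlushInterval) flushes...
		CoolDown:  30 * time.Second, // ...with at least 30s since the last scale down
	},
	IdleInterval: 30 * time.Second, // inactivity check interval
}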
type Stats ¶
type Stats struct {
	// Active holds the number of items currently waiting in the indexer's queue.
	Active int64

	// Added holds the number of items added to the indexer.
	Added int64

	// BulkRequests holds the number of bulk requests completed.
	BulkRequests int64

	// Failed holds the number of indexing operations that failed. It includes
	// all failures.
	Failed int64

	// FailedClient holds the number of indexing operations that failed with a
	// status_code >= 400 and < 500, excluding 429.
	FailedClient int64

	// FailedServer holds the number of indexing operations that failed with a
	// status_code >= 500.
	FailedServer int64

	// Indexed holds the number of indexing operations that have completed
	// successfully.
	Indexed int64

	// TooManyRequests holds the number of indexing operations that failed due
	// to Elasticsearch responding with 429 Too Many Requests.
	TooManyRequests int64

	// BytesTotal represents the total number of bytes written to the request
	// body that is sent in the outgoing _bulk request to Elasticsearch.
	// The number of bytes written will be smaller when compression is enabled.
	// This implementation differs from the number previously reported by
	// libbeat, which counts bytes at the transport level.
	BytesTotal int64

	// AvailableBulkRequests represents the number of bulk indexers
	// available for making bulk index requests.
	AvailableBulkRequests int64

	// IndexersActive represents the number of active bulk indexers that are
	// concurrently processing batches.
	IndexersActive int64

	// IndexersCreated represents the number of times new active indexers were
	// created.
	IndexersCreated int64

	// IndexersDestroyed represents the number of times an active indexer was
	// destroyed.
	IndexersDestroyed int64
}
Stats holds bulk indexing statistics.
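As a small worked example, a failure ratio can be derived from a Stats snapshot for monitoring or alerting; how the snapshot is obtained is outside the scope of this excerpt.

// failureRatio returns the fraction of completed indexing operations that
// failed, given a Stats value obtained from the appender.
func failureRatio(s docappender.Stats) float64 {
	total := s.Indexed + s.Failed
	if total == 0 {
		return 0
	}
	return float64(s.Failed) / float64(total)
}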