Documentation ¶
Overview ¶
Package esutil provides helper utilities to the Go client for Elasticsearch.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewJSONReader ¶
NewJSONReader encodes v into JSON and returns it as an io.Reader.
Types ¶
type BulkIndexer ¶
type BulkIndexer interface { // Add adds an item to the indexer. It returns an error when the item cannot be added. // Use the OnSuccess and OnFailure callbacks to get the operation result for the item. // // You must call the Close() method after you're done adding items. // // It is safe for concurrent use. When it's called from goroutines, // they must finish before the call to Close, eg. using sync.WaitGroup. Add(context.Context, BulkIndexerItem) error // Close waits until all added items are flushed and closes the indexer. Close(context.Context) error // Stats returns indexer statistics. Stats() BulkIndexerStats }
BulkIndexer represents a parallel, asynchronous, efficient indexer for Elasticsearch.
func NewBulkIndexer ¶
func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error)
NewBulkIndexer creates a new bulk indexer.
Example ¶
log.SetFlags(0)

// Create the Elasticsearch client
//
es, err := elasticsearch.NewClient(elasticsearch.Config{
	// Retry on 429 TooManyRequests statuses
	//
	RetryOnStatus: []int{502, 503, 504, 429},

	// A simple incremental backoff function
	//
	RetryBackoff: func(i int) time.Duration { return time.Duration(i) * 100 * time.Millisecond },

	// Retry up to 5 attempts
	//
	MaxRetries: 5,
})
if err != nil {
	log.Fatalf("Error creating the client: %s", err)
}

// Create the indexer
//
indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
	Client:     es,     // The Elasticsearch client
	Index:      "test", // The default index name
	NumWorkers: 4,      // The number of worker goroutines (default: number of CPUs)
	FlushBytes: 5e+6,   // The flush threshold in bytes (default: 5M)
})
if err != nil {
	log.Fatalf("Error creating the indexer: %s", err)
}

// Add an item to the indexer
//
err = indexer.Add(
	context.Background(),
	esutil.BulkIndexerItem{
		// Action field configures the operation to perform (index, create, delete, update)
		Action: "index",

		// DocumentID is the optional document ID
		DocumentID: "1",

		// Body is an `io.Reader` with the payload
		Body: strings.NewReader(`{"title":"Test"}`),

		// OnSuccess is the optional callback for each successful operation
		OnSuccess: func(
			ctx context.Context,
			item esutil.BulkIndexerItem,
			res esutil.BulkIndexerResponseItem,
		) {
			fmt.Printf("[%d] %s test/%s", res.Status, res.Result, item.DocumentID)
		},

		// OnFailure is the optional callback for each failed operation
		OnFailure: func(
			ctx context.Context,
			item esutil.BulkIndexerItem,
			res esutil.BulkIndexerResponseItem, err error,
		) {
			// A non-nil err is logged as-is; otherwise the error details
			// carried in the response item are logged.
			if err != nil {
				log.Printf("ERROR: %s", err)
			} else {
				log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
			}
		},
	},
)
if err != nil {
	log.Fatalf("Unexpected error: %s", err)
}

// Close the indexer channel and flush remaining items
//
if err := indexer.Close(context.Background()); err != nil {
	log.Fatalf("Unexpected error: %s", err)
}

// Report the indexer statistics
//
stats := indexer.Stats()
if stats.NumFailed > 0 {
	log.Fatalf("Indexed [%d] documents with [%d] errors", stats.NumFlushed, stats.NumFailed)
} else {
	log.Printf("Successfully indexed [%d] documents", stats.NumFlushed)
}

// For optimal performance, consider using a third-party package for JSON decoding and HTTP transport.
//
// For more information, examples and benchmarks, see:
//
// --> https://github.com/elastic/go-elasticsearch/tree/master/_examples/bulk
Output:
type BulkIndexerConfig ¶
type BulkIndexerConfig struct { NumWorkers int // The number of workers. Defaults to runtime.NumCPU(). FlushBytes int // The flush threshold in bytes. Defaults to 5MB. FlushInterval time.Duration // The flush threshold as duration. Defaults to 30sec. Client *elasticsearch.Client // The Elasticsearch client. Decoder BulkResponseJSONDecoder // A custom JSON decoder. DebugLogger BulkIndexerDebugLogger // An optional logger for debugging. OnError func(context.Context, error) // Called for indexer errors. OnFlushStart func(context.Context) context.Context // Called when the flush starts. OnFlushEnd func(context.Context) // Called when the flush ends. // Parameters of the Bulk API. Index string ErrorTrace bool FilterPath []string Header http.Header Human bool Pipeline string Pretty bool Refresh string Routing string Source []string SourceExcludes []string SourceIncludes []string Timeout time.Duration WaitForActiveShards string }
BulkIndexerConfig represents configuration of the indexer.
type BulkIndexerDebugLogger ¶
// BulkIndexerDebugLogger is the interface a debugging logger must satisfy
// to be plugged into the indexer.
type BulkIndexerDebugLogger interface {
	// Printf takes a string followed by a variadic list of values.
	// NOTE(review): presumably fmt-style format and arguments — confirm against the implementation.
	Printf(format string, args ...interface{})
}
BulkIndexerDebugLogger defines the interface for a debugging logger.
type BulkIndexerItem ¶
type BulkIndexerItem struct { Index string Action string DocumentID string Body io.Reader RetryOnConflict *int OnSuccess func(context.Context, BulkIndexerItem, BulkIndexerResponseItem) // Per item OnFailure func(context.Context, BulkIndexerItem, BulkIndexerResponseItem, error) // Per item }
BulkIndexerItem represents an indexer item.
type BulkIndexerResponse ¶
type BulkIndexerResponse struct { Took int `json:"took"` HasErrors bool `json:"errors"` Items []map[string]BulkIndexerResponseItem `json:"items,omitempty"` }
BulkIndexerResponse represents the Elasticsearch response.
type BulkIndexerResponseItem ¶
// BulkIndexerResponseItem represents the Elasticsearch response item.
//
// Each field is decoded from the JSON key named in its struct tag.
type BulkIndexerResponseItem struct {
	Index      string `json:"_index"`
	DocumentID string `json:"_id"`
	Version    int64  `json:"_version"`
	Result     string `json:"result"`
	Status     int    `json:"status"`
	SeqNo      int64  `json:"_seq_no"`
	PrimTerm   int64  `json:"_primary_term"`

	// Shards is decoded from the "_shards" object.
	Shards struct {
		Total      int `json:"total"`
		Successful int `json:"successful"`
		Failed     int `json:"failed"`
	} `json:"_shards"`

	// Error is decoded from the "error" object; Cause comes from its
	// nested "caused_by" object.
	Error struct {
		Type   string `json:"type"`
		Reason string `json:"reason"`
		Cause  struct {
			Type   string `json:"type"`
			Reason string `json:"reason"`
		} `json:"caused_by"`
	} `json:"error,omitempty"`
}
BulkIndexerResponseItem represents the Elasticsearch response item.
type BulkIndexerStats ¶
// BulkIndexerStats represents the indexer statistics.
//
// It is the value returned by BulkIndexer.Stats. All fields are unsigned
// counters.
// NOTE(review): the per-field meanings are presumably what the names
// suggest (items added, flushed, failed, ...) — confirm against the implementation.
type BulkIndexerStats struct {
	NumAdded    uint64
	NumFlushed  uint64
	NumFailed   uint64
	NumIndexed  uint64
	NumCreated  uint64
	NumUpdated  uint64
	NumDeleted  uint64
	NumRequests uint64
}
BulkIndexerStats represents the indexer statistics.
type BulkResponseJSONDecoder ¶
// BulkResponseJSONDecoder is the interface a custom JSON decoder must
// satisfy.
type BulkResponseJSONDecoder interface {
	// UnmarshalFromReader takes a reader and a *BulkIndexerResponse and
	// returns an error.
	// NOTE(review): presumably decodes the reader's JSON into resp — confirm against the implementation.
	UnmarshalFromReader(r io.Reader, resp *BulkIndexerResponse) error
}
BulkResponseJSONDecoder defines the interface for custom JSON decoders.
type JSONEncoder ¶
JSONEncoder defines the interface for custom JSON encoders.
type JSONReader ¶
// JSONReader represents a reader which takes an interface value,
// encodes it into JSON, and wraps it in an io.Reader.
type JSONReader struct {
	// contains filtered or unexported fields
}
JSONReader represents a reader which takes an interface value, encodes it into JSON, and wraps it in an io.Reader.