Documentation ¶
Overview ¶
Package opensearchutil provides helper utilities to the Go client for OpenSearch.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewJSONReader ¶
NewJSONReader encodes v into JSON and returns it as an io.Reader.
Types ¶
type BulkIndexer ¶
type BulkIndexer interface {
	// Add adds an item to the indexer. It returns an error when the item cannot be added.
	// Use the OnSuccess and OnFailure callbacks to get the operation result for the item.
	//
	// You must call the Close() method after you're done adding items.
	//
	// It is safe for concurrent use. When it's called from goroutines,
	// they must finish before the call to Close, e.g. using sync.WaitGroup.
	Add(context.Context, BulkIndexerItem) error

	// Close waits until all added items are flushed and closes the indexer.
	Close(context.Context) error

	// Stats returns indexer statistics.
	Stats() BulkIndexerStats
}
BulkIndexer represents a parallel, asynchronous, efficient indexer for OpenSearch.
func NewBulkIndexer ¶
func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error)
NewBulkIndexer creates a new bulk indexer.
Example ¶
log.SetFlags(0)

// Create the OpenSearch client
//
client, err := opensearchapi.NewClient(opensearchapi.Config{
	Client: opensearch.Config{
		// Retry on 429 TooManyRequests statuses
		//
		RetryOnStatus: []int{502, 503, 504, 429},

		// A simple incremental backoff function
		//
		RetryBackoff: func(i int) time.Duration { return time.Duration(i) * 100 * time.Millisecond },

		// Retry up to 5 attempts
		//
		MaxRetries: 5,
	},
})
if err != nil {
	log.Fatalf("Error creating the client: %s", err)
}

// Create the indexer
//
indexer, err := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
	Client:     client, // The OpenSearch client
	Index:      "test", // The default index name
	NumWorkers: 4,      // The number of worker goroutines (default: number of CPUs)
	FlushBytes: 5e+6,   // The flush threshold in bytes (default: 5M)
})
if err != nil {
	log.Fatalf("Error creating the indexer: %s", err)
}

// Add an item to the indexer
//
err = indexer.Add(
	context.Background(),
	opensearchutil.BulkIndexerItem{
		// Action field configures the operation to perform (index, create, delete, update)
		Action: "index",

		// DocumentID is the optional document ID
		DocumentID: "1",

		// Body is an `io.ReadSeeker` with the payload
		Body: strings.NewReader(`{"title":"Test"}`),

		// OnSuccess is the optional callback for each successful operation
		OnSuccess: func(
			ctx context.Context,
			item opensearchutil.BulkIndexerItem,
			res opensearchapi.BulkRespItem,
		) {
			fmt.Printf("[%d] %s test/%s", res.Status, res.Result, item.DocumentID)
		},

		// OnFailure is the optional callback for each failed operation
		OnFailure: func(
			ctx context.Context,
			item opensearchutil.BulkIndexerItem,
			res opensearchapi.BulkRespItem,
			err error,
		) {
			if err != nil {
				log.Printf("ERROR: %s", err)
			} else {
				log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
			}
		},
	},
)
if err != nil {
	log.Fatalf("Unexpected error: %s", err)
}

// Close the indexer channel and flush remaining items
//
if err := indexer.Close(context.Background()); err != nil {
	log.Fatalf("Unexpected error: %s", err)
}

// Report the indexer statistics
//
stats := indexer.Stats()
if stats.NumFailed > 0 {
	log.Fatalf("Indexed [%d] documents with [%d] errors", stats.NumFlushed, stats.NumFailed)
}
log.Printf("Successfully indexed [%d] documents", stats.NumFlushed)

// For optimal performance, consider using a third-party package for JSON decoding and HTTP transport.
Output:
type BulkIndexerConfig ¶
type BulkIndexerConfig struct {
	NumWorkers    int           // The number of workers. Defaults to runtime.NumCPU().
	FlushBytes    int           // The flush threshold in bytes. Defaults to 5MB.
	FlushInterval time.Duration // The flush threshold as duration. Defaults to 30 seconds.

	Client      *opensearchapi.Client  // The OpenSearch client.
	DebugLogger BulkIndexerDebugLogger // An optional logger for debugging.

	OnError      func(context.Context, error)          // Called for indexer errors.
	OnFlushStart func(context.Context) context.Context // Called when the flush starts.
	OnFlushEnd   func(context.Context)                 // Called when the flush ends.

	// Parameters of the Bulk API.
	Index               string
	ErrorTrace          bool
	Header              http.Header
	Human               bool
	Pipeline            string
	Pretty              bool
	Refresh             string
	Routing             string
	Source              []string
	SourceExcludes      []string
	SourceIncludes      []string
	Timeout             time.Duration
	WaitForActiveShards string
}
BulkIndexerConfig represents configuration of the indexer.
type BulkIndexerDebugLogger ¶
type BulkIndexerDebugLogger interface {
Printf(string, ...interface{})
}
BulkIndexerDebugLogger defines the interface for a debugging logger.
type BulkIndexerItem ¶
type BulkIndexerItem struct {
	Index               string
	Action              string
	DocumentID          string
	Routing             *string
	Version             *int64
	VersionType         *string
	IfSeqNum            *int64
	IfPrimaryTerm       *int64
	WaitForActiveShards interface{}
	Refresh             *string
	RequireAlias        *bool
	Body                io.ReadSeeker
	RetryOnConflict     *int

	OnSuccess func(context.Context, BulkIndexerItem, opensearchapi.BulkRespItem)        // Per item
	OnFailure func(context.Context, BulkIndexerItem, opensearchapi.BulkRespItem, error) // Per item
}
BulkIndexerItem represents an indexer item.
type BulkIndexerStats ¶
type BulkIndexerStats struct {
	NumAdded    uint64
	NumFlushed  uint64
	NumFailed   uint64
	NumIndexed  uint64
	NumCreated  uint64
	NumUpdated  uint64
	NumDeleted  uint64
	NumRequests uint64
}
BulkIndexerStats represents the indexer statistics.
type JSONEncoder ¶
JSONEncoder defines the interface for custom JSON encoders.
type JSONReader ¶
type JSONReader struct {
// contains filtered or unexported fields
}
JSONReader represents a reader which takes an interface value, encodes it into JSON, and wraps it in an io.Reader.