opensearchutil

package
v2.0.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 15, 2022 | License: Apache-2.0 | Imports: 12 | Imported by: 13

Documentation

Overview

Package opensearchutil provides helper utilities to the Go client for OpenSearch.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewJSONReader

func NewJSONReader(v interface{}) io.Reader

NewJSONReader encodes v into JSON and returns it as an io.Reader.

Types

type BulkIndexer

// BulkIndexer represents a parallel, asynchronous, efficient indexer for OpenSearch.
//
// Instances are obtained from NewBulkIndexer. The expected call sequence is:
// Add (any number of times), then Close exactly once when no more items will
// be added, then Stats to inspect the outcome.
type BulkIndexer interface {
	// Add adds an item to the indexer. It returns an error when the item cannot be added.
	// The returned error only covers queuing the item; use the item's OnSuccess and
	// OnFailure callbacks to get the operation result for the item.
	//
	// You must call the Close() method after you're done adding items.
	//
	// It is safe for concurrent use. When it's called from goroutines,
	// they must finish before the call to Close, e.g. by using sync.WaitGroup.
	Add(context.Context, BulkIndexerItem) error

	// Close waits until all added items are flushed and closes the indexer.
	Close(context.Context) error

	// Stats returns indexer statistics.
	Stats() BulkIndexerStats
}

BulkIndexer represents a parallel, asynchronous, efficient indexer for OpenSearch.

func NewBulkIndexer

func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error)

NewBulkIndexer creates a new bulk indexer.

Example
// Drop the standard logger's default date/time prefix.
log.SetFlags(0)

// Create the OpenSearch client
//
client, err := opensearch.NewClient(opensearch.Config{
	// Retry on 502, 503 and 504 statuses as well as on 429 TooManyRequests
	//
	RetryOnStatus: []int{502, 503, 504, 429},

	// A simple incremental backoff function: waits i * 100ms for the value i
	// handed to it by the client
	//
	RetryBackoff: func(i int) time.Duration { return time.Duration(i) * 100 * time.Millisecond },

	// Retry up to 5 attempts
	//
	MaxRetries: 5,
})
if err != nil {
	log.Fatalf("Error creating the client: %s", err)
}

// Create the indexer
//
indexer, err := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
	Client:     client, // The OpenSearch client
	Index:      "test", // The default index name
	NumWorkers: 4,      // The number of worker goroutines (default: number of CPUs)
	FlushBytes: 5e+6,   // The flush threshold in bytes (default: 5M)
})
if err != nil {
	log.Fatalf("Error creating the indexer: %s", err)
}

// Add an item to the indexer. The error returned here only reports whether
// the item could be added; per-item results arrive through the callbacks.
//
err = indexer.Add(
	context.Background(),
	opensearchutil.BulkIndexerItem{
		// Action field configures the operation to perform (index, create, delete, update)
		Action: "index",

		// DocumentID is the optional document ID
		DocumentID: "1",

		// Body is an `io.ReadSeeker` with the payload
		// (*strings.Reader satisfies that interface)
		Body: strings.NewReader(`{"title":"Test"}`),

		// OnSuccess is the optional callback for each successful operation
		OnSuccess: func(
			ctx context.Context,
			item opensearchutil.BulkIndexerItem,
			res opensearchutil.BulkIndexerResponseItem,
		) {
			fmt.Printf("[%d] %s test/%s", res.Status, res.Result, item.DocumentID)
		},

		// OnFailure is the optional callback for each failed operation
		OnFailure: func(
			ctx context.Context,
			item opensearchutil.BulkIndexerItem,
			res opensearchutil.BulkIndexerResponseItem, err error,
		) {
			// Prefer the Go error when one is supplied; otherwise report the
			// error details carried in the response item.
			if err != nil {
				log.Printf("ERROR: %s", err)
			} else {
				log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
			}
		},
	},
)
if err != nil {
	log.Fatalf("Unexpected error: %s", err)
}

// Close the indexer, waiting until all added items are flushed
//
if err := indexer.Close(context.Background()); err != nil {
	log.Fatalf("Unexpected error: %s", err)
}

// Report the indexer statistics; log.Fatalf exits the program, so any
// failed item terminates the run with a non-zero status
//
stats := indexer.Stats()
if stats.NumFailed > 0 {
	log.Fatalf("Indexed [%d] documents with [%d] errors", stats.NumFlushed, stats.NumFailed)
} else {
	log.Printf("Successfully indexed [%d] documents", stats.NumFlushed)
}

// For optimal performance, consider using a third-party package for JSON decoding and HTTP transport.
Output:

type BulkIndexerConfig

// BulkIndexerConfig represents configuration of the indexer.
//
// It is passed by value to NewBulkIndexer. Index is the default index name
// used by the indexer (as shown in the NewBulkIndexer example).
type BulkIndexerConfig struct {
	NumWorkers    int           // The number of workers. Defaults to runtime.NumCPU().
	FlushBytes    int           // The flush threshold in bytes. Defaults to 5MB.
	FlushInterval time.Duration // The flush threshold as duration. Defaults to 30 seconds.

	Client      *opensearch.Client      // The OpenSearch client.
	Decoder     BulkResponseJSONDecoder // A custom JSON decoder.
	DebugLogger BulkIndexerDebugLogger  // An optional logger for debugging.

	OnError      func(context.Context, error)          // Called for indexer errors.
	OnFlushStart func(context.Context) context.Context // Called when the flush starts.
	OnFlushEnd   func(context.Context)                 // Called when the flush ends.

	// Parameters of the Bulk API.
	// NOTE(review): their exact semantics are defined by the OpenSearch Bulk
	// API and are not visible here — consult that reference before relying on
	// a particular field.
	Index               string
	ErrorTrace          bool
	FilterPath          []string
	Header              http.Header
	Human               bool
	Pipeline            string
	Pretty              bool
	Refresh             string
	Routing             string
	Source              []string
	SourceExcludes      []string
	SourceIncludes      []string
	Timeout             time.Duration
	WaitForActiveShards string
}

BulkIndexerConfig represents configuration of the indexer.

type BulkIndexerDebugLogger

// BulkIndexerDebugLogger defines the interface for a debugging logger.
//
// It is the type of BulkIndexerConfig.DebugLogger, which is optional.
type BulkIndexerDebugLogger interface {
	// Printf receives a string followed by a variadic list of values.
	// NOTE(review): presumably a fmt-style format string and its arguments,
	// matching the method set of the standard *log.Logger — confirm.
	Printf(string, ...interface{})
}

BulkIndexerDebugLogger defines the interface for a debugging logger.

type BulkIndexerItem

// BulkIndexerItem represents an indexer item.
//
// Field notes, as demonstrated by the NewBulkIndexer example:
//   - Action configures the operation to perform (index, create, delete, update).
//   - DocumentID is the optional document ID.
//   - Body carries the payload. It must be an io.ReadSeeker, not just an
//     io.Reader; *strings.Reader and *bytes.Reader both qualify.
//   - OnSuccess is the optional callback for each successful operation.
//   - OnFailure is the optional callback for each failed operation. The example
//     handler checks its error argument first and otherwise reads the Error
//     details from the response item.
//
// NOTE(review): Index presumably overrides BulkIndexerConfig.Index for this
// item, and the pointer-typed fields presumably mean "not set" when nil —
// confirm against the implementation.
type BulkIndexerItem struct {
	Index               string
	Action              string
	DocumentID          string
	Routing             *string
	Version             *int64
	VersionType         *string
	IfSeqNum            *int64
	IfPrimaryTerm       *int64
	WaitForActiveShards interface{}
	Refresh             *string
	RequireAlias        *bool
	Body                io.ReadSeeker
	RetryOnConflict     *int

	OnSuccess func(context.Context, BulkIndexerItem, BulkIndexerResponseItem)        // Per item
	OnFailure func(context.Context, BulkIndexerItem, BulkIndexerResponseItem, error) // Per item
}

BulkIndexerItem represents an indexer item.

type BulkIndexerResponse

// BulkIndexerResponse represents the OpenSearch response.
//
// It is the value a BulkResponseJSONDecoder fills in. The struct tags map the
// fields to the JSON keys "took", "errors" and "items".
type BulkIndexerResponse struct {
	Took      int                                  `json:"took"`
	HasErrors bool                                 `json:"errors"`
	// Items holds one single-entry-style map per response item.
	// NOTE(review): the map key is presumably the action name
	// (index, create, delete, update) — confirm.
	Items     []map[string]BulkIndexerResponseItem `json:"items,omitempty"`
}

BulkIndexerResponse represents the OpenSearch response.

type BulkIndexerResponseItem

// BulkIndexerResponseItem represents the OpenSearch response item.
//
// It is handed to the OnSuccess and OnFailure callbacks of a BulkIndexerItem.
// The NewBulkIndexer example reads Status and Result on success, and
// Error.Type and Error.Reason on failure.
//
// NOTE(review): Error and Error.Cause are struct values, not pointers, so
// their `omitempty` options have no effect when marshalling with
// encoding/json (a struct value is never considered empty); absence of an
// error must be detected by inspecting the fields (e.g. an empty Error.Type).
// NOTE(review): the innermost Cause pointer is tagged `json:"caused_by"`
// without `omitempty`, unlike the other pointer fields next to it, so a nil
// value marshals as `"caused_by":null`. Decoding is unaffected.
type BulkIndexerResponseItem struct {
	Index      string `json:"_index"`
	DocumentID string `json:"_id"`
	Version    int64  `json:"_version"`
	Result     string `json:"result"`
	Status     int    `json:"status"`
	SeqNo      int64  `json:"_seq_no"`
	PrimTerm   int64  `json:"_primary_term"`

	// Shards mirrors the "_shards" object of the response item.
	Shards struct {
		Total      int `json:"total"`
		Successful int `json:"successful"`
		Failed     int `json:"failed"`
	} `json:"_shards"`

	// Error mirrors the "error" object of the response item, including up to
	// two levels of nested "caused_by" objects.
	Error struct {
		Type   string `json:"type"`
		Reason string `json:"reason"`
		Cause  struct {
			Type        string    `json:"type"`
			Reason      string    `json:"reason"`
			ScriptStack *[]string `json:"script_stack,omitempty"`
			Script      *string   `json:"script,omitempty"`
			Lang        *string   `json:"lang,omitempty"`
			Position    *struct {
				Offset int `json:"offset"`
				Start  int `json:"start"`
				End    int `json:"end"`
			} `json:"position,omitempty"`
			Cause *struct {
				Type   string `json:"type"`
				Reason string `json:"reason"`
			} `json:"caused_by"`
		} `json:"caused_by,omitempty"`
	} `json:"error,omitempty"`
}

BulkIndexerResponseItem represents the OpenSearch response item.

type BulkIndexerStats

// BulkIndexerStats represents the indexer statistics.
//
// It is returned by BulkIndexer.Stats. The NewBulkIndexer example reports
// NumFlushed as the number of indexed documents and NumFailed as the number
// of errors.
//
// NOTE(review): the remaining counters presumably count added items, items
// per resulting operation (indexed/created/updated/deleted) and bulk
// requests sent — confirm against the implementation.
type BulkIndexerStats struct {
	NumAdded    uint64
	NumFlushed  uint64
	NumFailed   uint64
	NumIndexed  uint64
	NumCreated  uint64
	NumUpdated  uint64
	NumDeleted  uint64
	NumRequests uint64
}

BulkIndexerStats represents the indexer statistics.

type BulkResponseJSONDecoder

// BulkResponseJSONDecoder defines the interface for custom JSON decoders.
//
// An implementation can be supplied through BulkIndexerConfig.Decoder.
type BulkResponseJSONDecoder interface {
	// UnmarshalFromReader takes a reader and a response to populate, and
	// reports failure through the returned error.
	// NOTE(review): presumably the reader yields the raw bulk response
	// body — confirm.
	UnmarshalFromReader(io.Reader, *BulkIndexerResponse) error
}

BulkResponseJSONDecoder defines the interface for custom JSON decoders.

type JSONEncoder

// JSONEncoder defines the interface for custom JSON encoders.
type JSONEncoder interface {
	// EncodeJSON takes the destination writer and returns an error on failure.
	// NOTE(review): presumably writes the receiver's JSON representation to
	// the writer, and is presumably honoured by NewJSONReader for values that
	// implement it — confirm.
	EncodeJSON(io.Writer) error
}

JSONEncoder defines the interface for custom JSON encoders.

type JSONReader

// JSONReader represents a reader which takes an interface value, encodes it
// into JSON, and wraps it in an io.Reader.
//
// Its pointer type has Read and WriteTo methods, implementing io.Reader and
// io.WriterTo. NewJSONReader returns an io.Reader for a given value.
type JSONReader struct {
	// contains filtered or unexported fields
}

JSONReader represents a reader which takes an interface value, encodes it into JSON, and wraps it in an io.Reader.

func (*JSONReader) Read

func (r *JSONReader) Read(p []byte) (int, error)

Read implements the io.Reader interface.

func (*JSONReader) WriteTo

func (r *JSONReader) WriteTo(w io.Writer) (int64, error)

WriteTo implements the io.WriterTo interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL