core

package
v0.0.0-...-56f2276 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 14, 2020 License: Apache-2.0 Imports: 13 Imported by: 2

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// Max buffer size in bytes before flushing to elasticsearch
	BulkMaxBuffer = 1048576
	// Max number of Docs to hold in buffer before forcing flush
	BulkMaxDocs = 100
	// Max delay before forcing a flush to Elasticearch
	BulkDelaySeconds = 5
	// Keep a running total of errors seen, since it is in the background
	BulkErrorCt uint64
	// maximum wait shutdown seconds
	MAX_SHUTDOWN_SECS = 5

	// There is one Global Bulk Indexer for convenience
	GlobalBulkIndexer *BulkIndexer
)
View Source
var (
	DebugRequests = false
)

Functions

func BulkIndexerGlobalRun

func BulkIndexerGlobalRun(maxConns int, done chan bool)

There is one global bulk indexer available for convenience so the IndexBulk() function can be called. However, the recommended usage is create your own BulkIndexer to allow for multiple seperate elasticsearch servers/host connections.

 @maxConns is the max number of in flight http requests
 @done is a channel to cause the indexer to stop

done := make(chan bool)
BulkIndexerGlobalRun(100, done)

func BulkSend

func BulkSend(buf *bytes.Buffer) error

This does the actual send of a buffer, which has already been formatted into bytes of ES formatted bulk data

func Delete

func Delete(pretty bool, index string, _type string, id string, version int, routing string) (api.BaseResponse, error)

Delete API allows to delete a typed JSON document from a specific index based on its id. http://www.elasticsearch.org/guide/reference/api/delete.html todo: add routing and versioning support

func DeleteByQuery

func DeleteByQuery(pretty bool, indices []string, types []string, query interface{}) (api.BaseResponse, error)

DeleteByQuery allows the caller to delete documents from one or more indices and one or more types based on a query. The query can either be provided using a simple query string as a parameter, or using the Query DSL defined within the request body. see: http://www.elasticsearch.org/guide/reference/api/delete-by-query.html

func Exists

func Exists(pretty bool, index string, _type string, id string) (bool, error)

Exists allows caller to check for the existance of a document using HEAD

func ExistsIndex

func ExistsIndex(pretty bool, index string, _type string) (bool, error)

ExistsIndex allows caller to check for the existance of an index or a type using HEAD

func Explain

func Explain(pretty bool, index string, _type string, id string, query string) (api.Match, error)

Explain computes a score explanation for a query and a specific document. This can give useful feedback whether a document matches or didn’t match a specific query. This feature is available from version 0.19.9 and up. see http://www.elasticsearch.org/guide/reference/api/explain.html

func Get

func Get(pretty bool, index string, _type string, id string) (api.BaseResponse, error)

Get allows caller to get a typed JSON document from the index based on its id. GET - retrieves the doc HEAD - checks for existence of the doc http://www.elasticsearch.org/guide/reference/api/get.html TODO: make this implement an interface

func GetIndexUrl

func GetIndexUrl(index string, _type string, id string, parentId string, version int, op_type string,
	routing string, timestamp string, ttl int, percolate string, timeout string, refresh bool) (retval string, e error)

func GetSource

func GetSource(index string, _type string, id string, source interface{}) error

GetSource retrieves the document by id and converts it to provided interface

func Index

func Index(pretty bool, index string, _type string, id string, data interface{}) (api.BaseResponse, error)

Index adds or updates a typed JSON document in a specific index, making it searchable, creating an index if it did not exist. if id is omited, op_type 'create' will be pased and http method will default to "POST" id is optional parentId is optional version is optional op_type is optional routing is optional timestamp is optional ttl is optional percolate is optional timeout is optional http://www.elasticsearch.org/guide/reference/api/index_.html

func IndexBulk

func IndexBulk(index string, _type string, id string, date *time.Time, data interface{}) error

The index bulk API adds or updates a typed JSON document to a specific index, making it searchable. it operates by buffering requests, and ocassionally flushing to elasticsearch

This uses the one Global Bulk Indexer, you can also create your own non-global indexers and use the Index functions of that

http://www.elasticsearch.org/guide/reference/api/bulk.html

func IndexBulkTtl

func IndexBulkTtl(index string, _type string, id, ttl string, date *time.Time, data interface{}) error

The index bulk API adds or updates a typed JSON document to a specific index, making it searchable. it operates by buffering requests, and ocassionally flushing to elasticsearch.

This uses the one Global Bulk Indexer, you can also create your own non-global indexers and use the IndexTtl functions of that

http://www.elasticsearch.org/guide/reference/api/bulk.html

func IndexWithParameters

func IndexWithParameters(pretty bool, index string, _type string, id string, parentId string, version int, op_type string,
	routing string, timestamp string, ttl int, percolate string, timeout string, refresh bool, data interface{}) (api.BaseResponse, error)

IndexWithParameters takes all the potential parameters available

func MoreLikeThis

func MoreLikeThis(pretty bool, index string, _type string, id string, query MoreLikeThisQuery) (api.BaseResponse, error)

MoreLikeThis allows the caller to get documents that are “like” a specified document. http://www.elasticsearch.org/guide/reference/api/more-like-this.html

func Percolate

func Percolate(pretty bool, index string, _type string, name string, doc string) (api.Match, error)

func RegisterPercolate

func RegisterPercolate(pretty bool, index string, name string, query api.Query) (api.BaseResponse, error)

RegisterPercolate allows the caller to register queries against an index, and then send percolate requests which include a doc, and getting back the queries that match on that doc out of the set of registered queries. Think of it as the reverse operation of indexing and then searching. Instead of sending docs, indexing them, and then running queries. One sends queries, registers them, and then sends docs and finds out which queries match that doc. see http://www.elasticsearch.org/guide/reference/api/percolate.html

func Update

func Update(pretty bool, index string, _type string, id string, data interface{}) (api.BaseResponse, error)

Update updates a document based on a script provided. The operation gets the document (collocated with the shard) from the index, runs the script (with optional script language and parameters), and index back the result (also allows to delete, or ignore the operation). It uses versioning to make sure no updates have happened during the “get” and “reindex”. (available from 0.19 onwards). Note, this operation still means full reindex of the document, it just removes some network roundtrips and reduces chances of version conflicts between the get and the index. The _source field need to be enabled for this feature to work.

http://www.elasticsearch.org/guide/reference/api/update.html TODO: finish this, it's fairly complex

func UpdateBulk

func UpdateBulk(index string, _type string, id string, date *time.Time, data interface{}) error

func UpdateBulkTtl

func UpdateBulkTtl(index string, _type string, id, ttl string, date *time.Time, data interface{}) error

func UpdateWithPartialDoc

func UpdateWithPartialDoc(pretty bool, index string, _type string, id string, doc interface{}, upsert bool) (api.BaseResponse, error)

UpdateWithPartialDoc updates a document based on partial document provided. The update API also support passing a partial document (since 0.20), which will be merged into the existing document (simple recursive merge, inner merging of objects, replacing core "keys/values" and arrays). If both doc and script is specified, then doc is ignored. Best is to put your field pairs of the partial document in the script itself.

http://www.elasticsearch.org/guide/reference/api/update.html

func UpdateWithScript

func UpdateWithScript(pretty bool, index string, _type string, id string, script string, params interface{}) (api.BaseResponse, error)

UpdateWithScript updates a document based on a script provided. The operation gets the document (collocated with the shard) from the index, runs the script (with optional script language and parameters), and index back the result (also allows to delete, or ignore the operation). It uses versioning to make sure no updates have happened during the "get" and "reindex". (available from 0.19 onwards).

Note, this operation still means full reindex of the document, it just removes some network roundtrips and reduces chances of version conflicts between the get and the index. The _source field need to be enabled for this feature to work. http://www.elasticsearch.org/guide/reference/api/update.html

func Validate

func Validate(pretty bool, index string, _type string, query string, explain bool) (api.BaseResponse, error)

Validate allows a user to validate a potentially expensive query without executing it. see http://www.elasticsearch.org/guide/reference/api/validate.html

func WriteBulkBytes

func WriteBulkBytes(op string, index string, _type string, id, ttl string, date *time.Time, data interface{}) ([]byte, error)

Given a set of arguments for index, type, id, data create a set of bytes that is formatted for bulkd index http://www.elasticsearch.org/guide/reference/api/bulk.html

Types

type BulkIndexer

type BulkIndexer struct {

	// We are creating a variable defining the func responsible for sending
	// to allow a mock sendor for test purposes
	BulkSendor func(*bytes.Buffer) error

	// If we encounter an error in sending, we are going to retry for this long
	// before returning an error
	// if 0 it will not retry
	RetryForSeconds int

	// channel for getting errors
	ErrorChannel chan *ErrorBuffer

	// Buffer for Max number of time before forcing flush
	BufferDelayMax time.Duration
	// Max buffer size in bytes before flushing to elasticsearch
	BulkMaxBuffer int // 1048576
	// Max number of Docs to hold in buffer before forcing flush
	BulkMaxDocs int // 100
	// contains filtered or unexported fields
}

A bulk indexer creates goroutines, and channels for connecting and sending data to elasticsearch in bulk, using buffers.

Example (Errorchannel)

The simplest usage of background bulk indexing with error channel

package main

import (
	"fmt"
	"github.com/zenoss/elastigo/core"
	"strconv"
)

func main() {
	indexer := core.NewBulkIndexerErrors(10, 60)
	done := make(chan bool)
	indexer.Run(done)

	go func() {
		for errBuf := range indexer.ErrorChannel {
			// just blissfully print errors forever
			fmt.Println(errBuf.Err)
		}
	}()
	for i := 0; i < 20; i++ {
		indexer.Index("twitter", "user", strconv.Itoa(i), "", nil, `{"name":"bob"}`)
	}
	done <- true
}
Output:

Example (Errorsmarter)

The simplest usage of background bulk indexing with error channel

package main

import (
	"fmt"
	"github.com/zenoss/elastigo/core"
	"strconv"
	"time"
)

func main() {
	indexer := core.NewBulkIndexerErrors(10, 60)
	done := make(chan bool)
	indexer.Run(done)

	errorCt := 0 // use sync.atomic or something if you need
	timer := time.NewTicker(time.Minute * 3)
	go func() {
		for {
			select {
			case _ = <-timer.C:
				if errorCt < 2 {
					errorCt = 0
				}
			case _ = <-done:
				return
			}
		}
	}()

	go func() {
		for errBuf := range indexer.ErrorChannel {
			errorCt++
			fmt.Println(errBuf.Err)
			// log to disk?  db?   ????  Panic
		}
	}()
	for i := 0; i < 20; i++ {
		indexer.Index("twitter", "user", strconv.Itoa(i), "", nil, `{"name":"bob"}`)
	}
	done <- true // send shutdown signal
}
Output:

Example (Responses)

The inspecting the response

package main

import (
	"bytes"
	"fmt"
	"github.com/zenoss/elastigo/api"
	"github.com/zenoss/elastigo/core"
	"strconv"
)

func main() {
	indexer := core.NewBulkIndexer(10)
	// Create a custom Sendor Func, to allow inspection of response/error
	indexer.BulkSendor = func(buf *bytes.Buffer) error {
		// @buf is the buffer of docs about to be written
		respJson, err := api.DoCommand("POST", "/_bulk", buf)
		if err != nil {
			// handle it better than this
			fmt.Println(string(respJson))
		}
		return err
	}
	done := make(chan bool)
	indexer.Run(done)

	for i := 0; i < 20; i++ {
		indexer.Index("twitter", "user", strconv.Itoa(i), "", nil, `{"name":"bob"}`)
	}
	done <- true // send shutdown signal
}
Output:

Example (Simple)

The simplest usage of background bulk indexing

package main

import (
	"github.com/zenoss/elastigo/core"
)

func main() {
	indexer := core.NewBulkIndexerErrors(10, 60)
	done := make(chan bool)
	indexer.Run(done)

	indexer.Index("twitter", "user", "1", "", nil, `{"name":"bob"}`)

	<-done // wait forever
}
Output:

func NewBulkIndexer

func NewBulkIndexer(maxConns int) *BulkIndexer

func NewBulkIndexerErrors

func NewBulkIndexerErrors(maxConns, retrySeconds int) *BulkIndexer

A bulk indexer with more control over error handling

 @maxConns is the max number of in flight http requests
 @retrySeconds is # of seconds to wait before retrying falied requests

done := make(chan bool)
BulkIndexerGlobalRun(100, done)

func (*BulkIndexer) Flush

func (b *BulkIndexer) Flush()

Flush all current documents to ElasticSearch

func (*BulkIndexer) Index

func (b *BulkIndexer) Index(index string, _type string, id, ttl string, date *time.Time, data interface{}) error

The index bulk API adds or updates a typed JSON document to a specific index, making it searchable. it operates by buffering requests, and ocassionally flushing to elasticsearch http://www.elasticsearch.org/guide/reference/api/bulk.html

func (*BulkIndexer) Run

func (b *BulkIndexer) Run(done chan bool)

Starts this bulk Indexer running, this Run opens a go routine so is Non blocking

func (*BulkIndexer) Update

func (b *BulkIndexer) Update(index string, _type string, id, ttl string, date *time.Time, data interface{}) error

type CountResponse

type CountResponse struct {
	Count int        `json:"count"`
	Shard api.Status `json:"_shards"`
}

func Count

func Count(pretty bool, index string, _type string) (CountResponse, error)

Count allows the caller to easily execute a query and get the number of matches for that query. It can be executed across one or more indices and across one or more types. The query can either be provided using a simple query string as a parameter, or using the Query DSL defined within the request body. http://www.elasticsearch.org/guide/reference/api/count.html TODO: take parameters. currently not working against 0.19.10

type DeleteByQueryResponse

type DeleteByQueryResponse struct {
	Status   bool                   `json:"ok"`
	Indicies map[string]IndexStatus `json:"_indices"`
}

type ErrorBuffer

type ErrorBuffer struct {
	Err error
	Buf *bytes.Buffer
}

type Explaination

type Explaination struct {
	Index string `json:"index"`
	Valid bool   `json:"valid"`
	Error string `json:"error"`
}

type Float32Nullable

type Float32Nullable float32

Elasticsearch returns some invalid (according to go) json, with floats having...

json: cannot unmarshal null into Go value of type float32 (see last field.)

"hits":{"total":6808,"max_score":null,

"hits":[{"_index":"10user","_type":"user","_id":"751820","_score":null,

func (*Float32Nullable) UnmarshalJSON

func (i *Float32Nullable) UnmarshalJSON(data []byte) error

type Hit

type Hit struct {
	Index   string          `json:"_index"`
	Type    string          `json:"_type,omitempty"`
	Id      string          `json:"_id"`
	Score   Float32Nullable `json:"_score,omitempty"` // Filters (no query) dont have score, so is null
	Source  json.RawMessage `json:"_source"`          // marshalling left to consumer
	Fields  json.RawMessage `json:"fields"`           // when a field arg is passed to ES, instead of _source it returns fields
	Version int             `json:"_version"`
}

type Hits

type Hits struct {
	Total int `json:"total"`
	//	MaxScore float32 `json:"max_score"`
	Hits []Hit `json:"hits"`
}

func (*Hits) Len

func (h *Hits) Len() int

type IndexStatus

type IndexStatus struct {
	Shards api.Status `json:"_shards"`
}

type MGetRequest

type MGetRequest struct {
	Index  string   `json:"_index"`
	Type   string   `json:"_type"`
	ID     string   `json:"_id"`
	IDS    []string `json:"_ids,omitempty"`
	Fields []string `json:"fields,omitempty"`
}

type MGetRequestContainer

type MGetRequestContainer struct {
	Docs []MGetRequest `json:"docs"`
}

type MGetResponseContainer

type MGetResponseContainer struct {
	Docs []api.BaseResponse `json:"docs"`
}

func MGet

func MGet(pretty bool, index string, _type string, mgetRequest MGetRequestContainer) (MGetResponseContainer, error)

MGet allows the caller to get multiple documents based on an index, type (optional) and id (and possibly routing). The response includes a docs array with all the fetched documents, each element similar in structure to a document provided by the get API. see http://www.elasticsearch.org/guide/reference/api/multi-get.html

type MLT

type MLT struct {
	Fields              []string `json:"fields"`
	LikeText            string   `json:"like_text"`
	PercentTermsToMatch float32  `json:"percent_terms_to_match"`
	MinTermFrequency    int      `json:"min_term_freq"`
	MaxQueryTerms       int      `json:"max_query_terms"`
	StopWords           []string `json:"stop_words"`
	MinDocFrequency     int      `json:"min_doc_freq"`
	MaxDocFrequency     int      `json:"max_doc_freq"`
	MinWordLength       int      `json:"min_word_len"`
	MaxWordLength       int      `json:"max_word_len"`
	BoostTerms          int      `json:"boost_terms"`
	Boost               float32  `json:"boost"`
	Analyzer            string   `json:"analyzer"`
}

type MoreLikeThisQuery

type MoreLikeThisQuery struct {
	MoreLikeThis MLT `json:"more_like_this"`
}

type SearchResult

type SearchResult struct {
	Took        int             `json:"took"`
	TimedOut    bool            `json:"timed_out"`
	ShardStatus api.Status      `json:"_shards"`
	Hits        Hits            `json:"hits"`
	Facets      json.RawMessage `json:"facets,omitempty"` // structure varies on query
	ScrollId    string          `json:"_scroll_id,omitempty"`
}

func Scroll

func Scroll(pretty bool, scroll_id string, scroll string) (SearchResult, error)

func SearchRequest

func SearchRequest(pretty bool, index string, _type string, query interface{}, scroll string, scan int) (SearchResult, error)

SearchRequest performs a very basic search on an index via the request URI API.

params:

@pretty:  bool for pretty reply or not, a parameter to elasticsearch
@index:  the elasticsearch index
@_type:  optional ("" if not used) search specific type in this index
@query:  this can be one of 3 types:
           1)  string value that is valid elasticsearch
           2)  io.Reader that can be set in body (also valid elasticsearch string syntax..)
           3)  other type marshalable to json (also valid elasticsearch json)

out, err := SearchRequest(true, "github","",qryType ,"", 0)

http://www.elasticsearch.org/guide/reference/api/search/uri-request.html

func SearchUri

func SearchUri(index, _type string, query, scroll string, scan int) (SearchResult, error)

SearchUri performs the simplest possible query in url string params:

@index:  the elasticsearch index
@_type:  optional ("" if not used) search specific type in this index
@query:  valid string lucene search syntax

out, err := SearchUri("github","",`user:kimchy` ,"", 0)

produces a request like this: host:9200/github/_search?q=user:kimchy"

http://www.elasticsearch.org/guide/reference/api/search/uri-request.html

func (*SearchResult) String

func (s *SearchResult) String() string

type Validation

type Validation struct {
	Valid         bool           `json:"valid"`
	Shards        api.Status     `json:"_shards"`
	Explainations []Explaination `json:"explanations,omitempty"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL