Documentation ¶
Index ¶
- Variables
- func BulkIndexerGlobalRun(maxConns int, done chan bool)
- func BulkSend(buf *bytes.Buffer) error
- func Delete(index string, _type string, id string, args map[string]interface{}) (api.BaseResponse, error)
- func DeleteByQuery(indices []string, types []string, args map[string]interface{}, ...) (api.BaseResponse, error)
- func Exists(index string, _type string, id string, args map[string]interface{}) (bool, error)
- func ExistsIndex(index string, _type string, args map[string]interface{}) (bool, error)
- func Explain(index string, _type string, id string, args map[string]interface{}, ...) (api.Match, error)
- func Get(index string, _type string, id string, args map[string]interface{}) (api.BaseResponse, error)
- func GetCustom(index string, _type string, id string, args map[string]interface{}, ...) (api.BaseResponse, error)
- func GetIndexUrl(index string, _type string, id string, parentId string, version int, ...) (retval string, e error)
- func GetSource(index string, _type string, id string, args map[string]interface{}, ...) error
- func Index(index string, _type string, id string, args map[string]interface{}, ...) (api.BaseResponse, error)
- func IndexBulk(index string, _type string, id string, date *time.Time, data interface{}, ...) error
- func IndexBulkTtl(index string, _type string, id, ttl string, date *time.Time, data interface{}, ...) error
- func IndexWithParameters(index string, _type string, id string, parentId string, version int, ...) (api.BaseResponse, error)
- func MoreLikeThis(index string, _type string, id string, args map[string]interface{}, ...) (api.BaseResponse, error)
- func Percolate(index string, _type string, name string, args map[string]interface{}, ...) (api.Match, error)
- func RegisterPercolate(index string, name string, args map[string]interface{}, query api.Query) (api.BaseResponse, error)
- func Update(index string, _type string, id string, args map[string]interface{}, ...) (api.BaseResponse, error)
- func UpdateBulk(index string, _type string, id string, date *time.Time, data interface{}, ...) error
- func UpdateBulkTtl(index string, _type string, id, ttl string, date *time.Time, data interface{}, ...) error
- func UpdateWithPartialDoc(index string, _type string, id string, args map[string]interface{}, ...) (api.BaseResponse, error)
- func UpdateWithScript(index string, _type string, id string, args map[string]interface{}, ...) (api.BaseResponse, error)
- func Validate(index string, _type string, args map[string]interface{}) (api.BaseResponse, error)
- func WriteBulkBytes(op string, index string, _type string, id, ttl string, date *time.Time, ...) ([]byte, error)
- type BulkIndexer
- func (b *BulkIndexer) Flush()
- func (b *BulkIndexer) Index(index string, _type string, id, ttl string, date *time.Time, data interface{}, ...) error
- func (b *BulkIndexer) PendingDocuments() int
- func (b *BulkIndexer) Run(done chan bool)
- func (b *BulkIndexer) Update(index string, _type string, id, ttl string, date *time.Time, data interface{}, ...) error
- type CountResponse
- type DeleteByQueryResponse
- type ErrorBuffer
- type Explaination
- type Explanation
- type Float32Nullable
- type Hit
- type Hits
- type IndexStatus
- type MGetRequest
- type MGetRequestContainer
- type MGetResponseContainer
- type MLT
- type MoreLikeThisQuery
- type SearchResult
- type Validation
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // Max buffer size in bytes before flushing to elasticsearch BulkMaxBuffer = 1048576 // Max number of Docs to hold in buffer before forcing flush BulkMaxDocs = 100 // Max delay before forcing a flush to Elasticearch BulkDelaySeconds = 5 // Keep a running total of errors seen, since it is in the background BulkErrorCt uint64 // maximum wait shutdown seconds MAX_SHUTDOWN_SECS = 5 // There is one Global Bulk Indexer for convenience GlobalBulkIndexer *BulkIndexer )
var (
DebugRequests = false
)
var VerboseLogging bool = true
VerboseLogging controls whether elastigo will log more information about its actions. Set it to false for less logging.
Functions ¶
func BulkIndexerGlobalRun ¶
There is one global bulk indexer available for convenience so the IndexBulk() function can be called. However, the recommended usage is to create your own BulkIndexer to allow for multiple separate elasticsearch servers/host connections.
@maxConns is the max number of in flight http requests @done is a channel to cause the indexer to stop done := make(chan bool) BulkIndexerGlobalRun(100, done)
func BulkSend ¶
This does the actual send of a buffer, which has already been formatted into bytes of ES formatted bulk data
func Delete ¶
func Delete(index string, _type string, id string, args map[string]interface{}) (api.BaseResponse, error)
The Delete API allows the caller to delete a typed JSON document from a specific index based on its id. http://www.elasticsearch.org/guide/reference/api/delete.html
func DeleteByQuery ¶
func DeleteByQuery(indices []string, types []string, args map[string]interface{}, query interface{}) (api.BaseResponse, error)
DeleteByQuery allows the caller to delete documents from one or more indices and one or more types based on a query. The query can either be provided using a simple query string as a parameter, or using the Query DSL defined within the request body. see: http://www.elasticsearch.org/guide/reference/api/delete-by-query.html
func ExistsIndex ¶
ExistsIndex allows the caller to check for the existence of an index or a type using HEAD
func Explain ¶
func Explain(index string, _type string, id string, args map[string]interface{}, query string) (api.Match, error)
Explain computes a score explanation for a query and a specific document. This can give useful feedback whether a document matches or didn’t match a specific query. This feature is available from version 0.19.9 and up. see http://www.elasticsearch.org/guide/reference/api/explain.html
func Get ¶
func Get(index string, _type string, id string, args map[string]interface{}) (api.BaseResponse, error)
The get API allows the caller to get a typed JSON document from the index based on its id. GET - retrieves the doc HEAD - checks for existence of the doc http://www.elasticsearch.org/guide/reference/api/get.html TODO: make this implement an interface
func GetCustom ¶
func GetCustom(index string, _type string, id string, args map[string]interface{}, source interface{}) (api.BaseResponse, error)
Same as Get but with custom source type.
func GetIndexUrl ¶
func GetSource ¶
func GetSource(index string, _type string, id string, args map[string]interface{}, source interface{}) error
GetSource retrieves the document by id and converts it to provided interface
func Index ¶
func Index(index string, _type string, id string, args map[string]interface{}, data interface{}) (api.BaseResponse, error)
Index adds or updates a typed JSON document in a specific index, making it searchable, creating an index if it did not exist. if id is omitted, op_type 'create' will be passed and http method will default to "POST" _type is optional id is optional parentId is optional version is optional op_type is optional routing is optional timestamp is optional ttl is optional percolate is optional timeout is optional http://www.elasticsearch.org/guide/reference/api/index_.html
func IndexBulk ¶
func IndexBulk(index string, _type string, id string, date *time.Time, data interface{}, refresh bool) error
The index bulk API adds or updates a typed JSON document to a specific index, making it searchable. It operates by buffering requests, and occasionally flushing to elasticsearch
This uses the one Global Bulk Indexer, you can also create your own non-global indexers and use the Index functions of that
func IndexBulkTtl ¶
func IndexBulkTtl(index string, _type string, id, ttl string, date *time.Time, data interface{}, refresh bool) error
The index bulk API adds or updates a typed JSON document to a specific index, making it searchable. It operates by buffering requests, and occasionally flushing to elasticsearch.
This uses the one Global Bulk Indexer, you can also create your own non-global indexers and use the IndexTtl functions of that
func IndexWithParameters ¶
func IndexWithParameters(index string, _type string, id string, parentId string, version int, op_type string, routing string, timestamp string, ttl int, percolate string, timeout string, refresh bool, args map[string]interface{}, data interface{}) (api.BaseResponse, error)
IndexWithParameters takes all the potential parameters available
func MoreLikeThis ¶
func MoreLikeThis(index string, _type string, id string, args map[string]interface{}, query MoreLikeThisQuery) (api.BaseResponse, error)
MoreLikeThis allows the caller to get documents that are “like” a specified document. http://www.elasticsearch.org/guide/reference/api/more-like-this.html
func RegisterPercolate ¶
func RegisterPercolate(index string, name string, args map[string]interface{}, query api.Query) (api.BaseResponse, error)
RegisterPercolate allows the caller to register queries against an index, and then send percolate requests which include a doc, and getting back the queries that match on that doc out of the set of registered queries. Think of it as the reverse operation of indexing and then searching. Instead of sending docs, indexing them, and then running queries. One sends queries, registers them, and then sends docs and finds out which queries match that doc. see http://www.elasticsearch.org/guide/reference/api/percolate.html
func Update ¶
func Update(index string, _type string, id string, args map[string]interface{}, data interface{}) (api.BaseResponse, error)
Update updates a document based on a script provided. The operation gets the document (collocated with the shard) from the index, runs the script (with optional script language and parameters), and indexes back the result (also allows to delete, or ignore the operation). It uses versioning to make sure no updates have happened during the “get” and “reindex”. (available from 0.19 onwards). Note, this operation still means full reindex of the document, it just removes some network roundtrips and reduces chances of version conflicts between the get and the index. The _source field needs to be enabled for this feature to work.
http://www.elasticsearch.org/guide/reference/api/update.html TODO: finish this, it's fairly complex
func UpdateBulk ¶
func UpdateBulkTtl ¶
func UpdateWithPartialDoc ¶
func UpdateWithPartialDoc(index string, _type string, id string, args map[string]interface{}, doc interface{}, upsert bool) (api.BaseResponse, error)
UpdateWithPartialDoc updates a document based on the partial document provided. The update API also supports passing a partial document (since 0.20), which will be merged into the existing document (simple recursive merge, inner merging of objects, replacing core "keys/values" and arrays). If both doc and script are specified, then doc is ignored. Best is to put your field pairs of the partial document in the script itself.
http://www.elasticsearch.org/guide/reference/api/update.html
func UpdateWithScript ¶
func UpdateWithScript(index string, _type string, id string, args map[string]interface{}, script string, params interface{}) (api.BaseResponse, error)
UpdateWithScript updates a document based on a script provided. The operation gets the document (collocated with the shard) from the index, runs the script (with optional script language and parameters), and indexes back the result (also allows to delete, or ignore the operation). It uses versioning to make sure no updates have happened during the "get" and "reindex". (available from 0.19 onwards).
Note, this operation still means full reindex of the document, it just removes some network roundtrips and reduces chances of version conflicts between the get and the index. The _source field needs to be enabled for this feature to work. http://www.elasticsearch.org/guide/reference/api/update.html
func Validate ¶
Validate allows a user to validate a potentially expensive query without executing it. see http://www.elasticsearch.org/guide/reference/api/validate.html
func WriteBulkBytes ¶
func WriteBulkBytes(op string, index string, _type string, id, ttl string, date *time.Time, data interface{}, refresh bool) ([]byte, error)
Given a set of arguments for index, type, id, and data, create a set of bytes that is formatted for a bulk index request http://www.elasticsearch.org/guide/reference/api/bulk.html
Types ¶
type BulkIndexer ¶
type BulkIndexer struct { // We are creating a variable defining the func responsible for sending // to allow a mock sendor for test purposes BulkSender func(*bytes.Buffer) error // Deprecated, for backwards compatibility BulkSendor func(*bytes.Buffer) error // If we encounter an error in sending, we are going to retry for this long // before returning an error // if 0 it will not retry RetryForSeconds int // channel for getting errors ErrorChannel chan *ErrorBuffer // Buffer for Max number of time before forcing flush BufferDelayMax time.Duration // Max buffer size in bytes before flushing to elasticsearch BulkMaxBuffer int // 1048576 // Max number of Docs to hold in buffer before forcing flush BulkMaxDocs int // 100 // contains filtered or unexported fields }
A bulk indexer creates goroutines, and channels for connecting and sending data to elasticsearch in bulk, using buffers.
Example (Errorchannel) ¶
The simplest usage of background bulk indexing with error channel
package main import ( "fmt" "github.com/packetbeat/elastigo/core" "strconv" ) func main() { indexer := core.NewBulkIndexerErrors(10, 60) done := make(chan bool) indexer.Run(done) go func() { for errBuf := range indexer.ErrorChannel { // just blissfully print errors forever fmt.Println(errBuf.Err) } }() for i := 0; i < 20; i++ { indexer.Index("twitter", "user", strconv.Itoa(i), "", nil, `{"name":"bob"}`, true) } done <- true }
Output:
Example (Errorsmarter) ¶
The simplest usage of background bulk indexing with error channel
package main import ( "fmt" "github.com/packetbeat/elastigo/core" "strconv" "time" ) func main() { indexer := core.NewBulkIndexerErrors(10, 60) done := make(chan bool) indexer.Run(done) errorCt := 0 // use sync.atomic or something if you need timer := time.NewTicker(time.Minute * 3) go func() { for { select { case _ = <-timer.C: if errorCt < 2 { errorCt = 0 } case _ = <-done: return } } }() go func() { for errBuf := range indexer.ErrorChannel { errorCt++ fmt.Println(errBuf.Err) // log to disk? db? ???? Panic } }() for i := 0; i < 20; i++ { indexer.Index("twitter", "user", strconv.Itoa(i), "", nil, `{"name":"bob"}`, true) } done <- true // send shutdown signal }
Output:
Example (Responses) ¶
Inspecting the response
package main import ( "bytes" "fmt" "github.com/packetbeat/elastigo/api" "github.com/packetbeat/elastigo/core" "strconv" ) func main() { indexer := core.NewBulkIndexer(10) // Create a custom Sender Func, to allow inspection of response/error indexer.BulkSender = func(buf *bytes.Buffer) error { // @buf is the buffer of docs about to be written respJson, err := api.DoCommand("POST", "/_bulk", nil, buf) if err != nil { // handle it better than this fmt.Println(string(respJson)) } return err } done := make(chan bool) indexer.Run(done) for i := 0; i < 20; i++ { indexer.Index("twitter", "user", strconv.Itoa(i), "", nil, `{"name":"bob"}`, true) } done <- true // send shutdown signal }
Output:
Example (Simple) ¶
The simplest usage of background bulk indexing
package main import ( "github.com/packetbeat/elastigo/core" ) func main() { indexer := core.NewBulkIndexerErrors(10, 60) done := make(chan bool) indexer.Run(done) indexer.Index("twitter", "user", "1", "", nil, `{"name":"bob"}`, true) <-done // wait forever }
Output:
func NewBulkIndexer ¶
func NewBulkIndexer(maxConns int) *BulkIndexer
func NewBulkIndexerErrors ¶
func NewBulkIndexerErrors(maxConns, retrySeconds int) *BulkIndexer
A bulk indexer with more control over error handling
@maxConns is the max number of in flight http requests @retrySeconds is # of seconds to wait before retrying failed requests done := make(chan bool) BulkIndexerGlobalRun(100, done)
func (*BulkIndexer) Flush ¶
func (b *BulkIndexer) Flush()
Flush all current documents to ElasticSearch
func (*BulkIndexer) Index ¶
func (b *BulkIndexer) Index(index string, _type string, id, ttl string, date *time.Time, data interface{}, refresh bool) error
The index bulk API adds or updates a typed JSON document to a specific index, making it searchable. It operates by buffering requests, and occasionally flushing to elasticsearch http://www.elasticsearch.org/guide/reference/api/bulk.html
func (*BulkIndexer) PendingDocuments ¶
func (b *BulkIndexer) PendingDocuments() int
func (*BulkIndexer) Run ¶
func (b *BulkIndexer) Run(done chan bool)
Starts this bulk indexer running. Run opens a goroutine, so it is non-blocking.
type CountResponse ¶
func Count ¶
func Count(index string, _type string, args map[string]interface{}) (CountResponse, error)
Count allows the caller to easily execute a query and get the number of matches for that query. It can be executed across one or more indices and across one or more types. The query can either be provided using a simple query string as a parameter, or using the Query DSL defined within the request body. http://www.elasticsearch.org/guide/reference/api/count.html
type DeleteByQueryResponse ¶
type DeleteByQueryResponse struct { Status bool `json:"ok"` Indicies map[string]IndexStatus `json:"_indices"` }
type ErrorBuffer ¶
type Explaination ¶
type Explanation ¶
type Explanation struct { Value float32 `json:"value"` Description string `json:"description"` Details []*Explanation `json:"details,omitempty"` }
func (*Explanation) String ¶
func (e *Explanation) String(indent string) string
type Float32Nullable ¶
type Float32Nullable float32
Elasticsearch returns some invalid (according to go) json, with floats having...
json: cannot unmarshal null into Go value of type float32 (see last field.)
"hits":{"total":6808,"max_score":null,
"hits":[{"_index":"10user","_type":"user","_id":"751820","_score":null,
func (*Float32Nullable) UnmarshalJSON ¶
func (i *Float32Nullable) UnmarshalJSON(data []byte) error
type Hit ¶
type Hit struct { Index string `json:"_index"` Type string `json:"_type,omitempty"` Id string `json:"_id"` Score Float32Nullable `json:"_score,omitempty"` // Filters (no query) dont have score, so is null Source *json.RawMessage `json:"_source"` // marshalling left to consumer Fields *json.RawMessage `json:"fields"` // when a field arg is passed to ES, instead of _source it returns fields Explanation *Explanation `json:"_explanation,omitempty"` }
type Hits ¶
type IndexStatus ¶
type MGetRequest ¶
type MGetRequestContainer ¶
type MGetRequestContainer struct {
Docs []MGetRequest `json:"docs"`
}
type MGetResponseContainer ¶
type MGetResponseContainer struct {
Docs []api.BaseResponse `json:"docs"`
}
func MGet ¶
func MGet(index string, _type string, mgetRequest MGetRequestContainer, args map[string]interface{}) (MGetResponseContainer, error)
MGet allows the caller to get multiple documents based on an index, type (optional) and id (and possibly routing). The response includes a docs array with all the fetched documents, each element similar in structure to a document provided by the get API. see http://www.elasticsearch.org/guide/reference/api/multi-get.html
type MLT ¶
type MLT struct { Fields []string `json:"fields"` LikeText string `json:"like_text"` PercentTermsToMatch float32 `json:"percent_terms_to_match"` MinTermFrequency int `json:"min_term_freq"` MaxQueryTerms int `json:"max_query_terms"` StopWords []string `json:"stop_words"` MinDocFrequency int `json:"min_doc_freq"` MaxDocFrequency int `json:"max_doc_freq"` MinWordLength int `json:"min_word_len"` MaxWordLength int `json:"max_word_len"` BoostTerms int `json:"boost_terms"` Boost float32 `json:"boost"` Analyzer string `json:"analyzer"` }
type MoreLikeThisQuery ¶
type MoreLikeThisQuery struct {
MoreLikeThis MLT `json:"more_like_this"`
}
type SearchResult ¶
type SearchResult struct { Took int `json:"took"` TimedOut bool `json:"timed_out"` ShardStatus api.Status `json:"_shards"` Hits Hits `json:"hits"` Facets json.RawMessage `json:"facets,omitempty"` // structure varies on query ScrollId string `json:"_scroll_id,omitempty"` Aggregations json.RawMessage `json:"aggregations,omitempty"` // structure varies on query }
func SearchRequest ¶
func SearchRequest(index string, _type string, args map[string]interface{}, query interface{}) (SearchResult, error)
SearchRequest performs a very basic search on an index via the request URI API.
params:
@index: the elasticsearch index @_type: optional ("" if not used) search specific type in this index @args: a map of URL parameters. Allows all the URI-request parameters allowed by ElasticSearch. @query: this can be one of 3 types: 1) string value that is valid elasticsearch 2) io.Reader that can be set in body (also valid elasticsearch string syntax..) 3) other type marshalable to json (also valid elasticsearch json) out, err := SearchRequest("github", "", map[string]interface{} {"from" : 10}, qryType)
http://www.elasticsearch.org/guide/reference/api/search/uri-request.html
func SearchUri ¶
func SearchUri(index, _type string, args map[string]interface{}) (SearchResult, error)
SearchUri performs the simplest possible query in url string params:
@index: the elasticsearch index @_type: optional ("" if not used) search specific type in this index @args: a map of URL parameters. Most important one is q out, err := SearchUri("github","", map[string]interface{} { "q" : `user:kimchy`})
produces a request like this: host:9200/github/_search?q=user:kimchy
http://www.elasticsearch.org/guide/reference/api/search/uri-request.html
func (*SearchResult) String ¶
func (s *SearchResult) String() string
type Validation ¶
type Validation struct { Valid bool `json:"valid"` Shards api.Status `json:"_shards"` Explainations []Explaination `json:"explanations,omitempty"` }