core

package
v0.0.0-...-ed2d56a
Warning

This package is not in the latest version of its module.

Published: Jun 13, 2014 License: Apache-2.0 Imports: 13 Imported by: 10

Documentation

Constants

This section is empty.

Variables

var (
	// Max buffer size in bytes before flushing to elasticsearch
	BulkMaxBuffer = 1048576
	// Max number of Docs to hold in buffer before forcing flush
	BulkMaxDocs = 100
	// Max delay before forcing a flush to Elasticsearch
	BulkDelaySeconds = 5
	// Keep a running total of errors seen, since it is in the background
	BulkErrorCt uint64
	// maximum wait shutdown seconds
	MAX_SHUTDOWN_SECS = 5

	// There is one Global Bulk Indexer for convenience
	GlobalBulkIndexer *BulkIndexer
)
var (
	DebugRequests = false
)
var VerboseLogging bool = true

VerboseLogging controls whether elastigo will log more information about its actions. Set it to false for less logging.

Functions

func BulkIndexerGlobalRun

func BulkIndexerGlobalRun(maxConns int, done chan bool)

There is one global bulk indexer available for convenience so that the IndexBulk() function can be called. However, the recommended usage is to create your own BulkIndexer, which allows for multiple separate elasticsearch servers/host connections.

 @maxConns is the max number of in flight http requests
 @done is a channel to cause the indexer to stop

done := make(chan bool)
BulkIndexerGlobalRun(100, done)

func BulkSend

func BulkSend(buf *bytes.Buffer) error

BulkSend does the actual send of a buffer that has already been formatted into bytes of ES bulk data.

func Delete

func Delete(index string, _type string, id string, args map[string]interface{}) (api.BaseResponse, error)

Delete removes a typed JSON document from a specific index based on its id. http://www.elasticsearch.org/guide/reference/api/delete.html

func DeleteByQuery

func DeleteByQuery(indices []string, types []string, args map[string]interface{}, query interface{}) (api.BaseResponse, error)

DeleteByQuery allows the caller to delete documents from one or more indices and one or more types based on a query. The query can either be provided using a simple query string as a parameter, or using the Query DSL defined within the request body. see: http://www.elasticsearch.org/guide/reference/api/delete-by-query.html

func Exists

func Exists(index string, _type string, id string, args map[string]interface{}) (bool, error)

Exists allows the caller to check for the existence of a document using HEAD

func ExistsIndex

func ExistsIndex(index string, _type string, args map[string]interface{}) (bool, error)

ExistsIndex allows the caller to check for the existence of an index or a type using HEAD

func Explain

func Explain(index string, _type string, id string, args map[string]interface{}, query string) (api.Match, error)

Explain computes a score explanation for a query and a specific document. This can give useful feedback whether a document matches or didn’t match a specific query. This feature is available from version 0.19.9 and up. see http://www.elasticsearch.org/guide/reference/api/explain.html

func Get

func Get(index string, _type string, id string, args map[string]interface{}) (api.BaseResponse, error)

Get retrieves a typed JSON document from the index based on its id.

 GET  - retrieves the doc
 HEAD - checks for existence of the doc

http://www.elasticsearch.org/guide/reference/api/get.html

TODO: make this implement an interface

func GetCustom

func GetCustom(index string, _type string, id string, args map[string]interface{}, source interface{}) (api.BaseResponse, error)

Same as Get but with custom source type.

func GetIndexUrl

func GetIndexUrl(index string, _type string, id string, parentId string, version int, op_type string,
	routing string, timestamp string, ttl int, percolate string, timeout string, refresh bool) (retval string, e error)

func GetSource

func GetSource(index string, _type string, id string, args map[string]interface{}, source interface{}) error

GetSource retrieves the document by id and converts it to provided interface

func Index

func Index(index string, _type string, id string, args map[string]interface{}, data interface{}) (api.BaseResponse, error)

Index adds or updates a typed JSON document in a specific index, making it searchable, and creates the index if it does not exist. If id is omitted, op_type 'create' is passed and the HTTP method defaults to "POST".

 _type is optional
 id is optional
 parentId is optional
 version is optional
 op_type is optional
 routing is optional
 timestamp is optional
 ttl is optional
 percolate is optional
 timeout is optional

http://www.elasticsearch.org/guide/reference/api/index_.html

func IndexBulk

func IndexBulk(index string, _type string, id string, date *time.Time, data interface{}, refresh bool) error

IndexBulk adds or updates a typed JSON document in a specific index, making it searchable. It operates by buffering requests and occasionally flushing them to elasticsearch.

This uses the one Global Bulk Indexer. You can also create your own non-global indexers and use their Index functions.

http://www.elasticsearch.org/guide/reference/api/bulk.html
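
The buffer-then-flush behaviour described above can be pictured with a small standalone sketch. It is not elastigo's implementation: `docBuffer`, `add` and `flush` are hypothetical names, the two limits only play the role of BulkMaxBuffer and BulkMaxDocs, and the time-based flush (BulkDelaySeconds) is left out.

```go
package main

import (
	"bytes"
	"fmt"
)

// docBuffer is a hypothetical stand-in for a bulk buffer; it is not part of elastigo.
type docBuffer struct {
	buf      bytes.Buffer
	docs     int // documents currently buffered
	maxBytes int // plays the role of BulkMaxBuffer
	maxDocs  int // plays the role of BulkMaxDocs
	flushes  int // number of flushes performed so far
}

// add appends one document and flushes when either limit is reached.
func (b *docBuffer) add(doc []byte) {
	b.buf.Write(doc)
	b.buf.WriteByte('\n')
	b.docs++
	if b.docs >= b.maxDocs || b.buf.Len() >= b.maxBytes {
		b.flush()
	}
}

// flush would hand the bytes to a sender; here it only resets the buffer.
func (b *docBuffer) flush() {
	if b.docs == 0 {
		return
	}
	b.flushes++
	b.buf.Reset()
	b.docs = 0
}

func main() {
	b := &docBuffer{maxBytes: 1048576, maxDocs: 100}
	for i := 0; i < 250; i++ {
		b.add([]byte(`{"name":"bob"}`))
	}
	fmt.Println("flushes:", b.flushes, "pending docs:", b.docs)
}
```

With these limits the document count triggers the flushes, since each document is only a few bytes.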

func IndexBulkTtl

func IndexBulkTtl(index string, _type string, id, ttl string, date *time.Time, data interface{}, refresh bool) error

IndexBulkTtl adds or updates a typed JSON document in a specific index, making it searchable. It operates by buffering requests and occasionally flushing them to elasticsearch.

This uses the one Global Bulk Indexer. You can also create your own non-global indexers and use their IndexTtl functions.

http://www.elasticsearch.org/guide/reference/api/bulk.html

func IndexWithParameters

func IndexWithParameters(index string, _type string, id string, parentId string, version int, op_type string,
	routing string, timestamp string, ttl int, percolate string, timeout string, refresh bool,
	args map[string]interface{}, data interface{}) (api.BaseResponse, error)

IndexWithParameters takes all the potential parameters available

func MoreLikeThis

func MoreLikeThis(index string, _type string, id string, args map[string]interface{}, query MoreLikeThisQuery) (api.BaseResponse, error)

MoreLikeThis allows the caller to get documents that are “like” a specified document. http://www.elasticsearch.org/guide/reference/api/more-like-this.html

func Percolate

func Percolate(index string, _type string, name string, args map[string]interface{}, doc string) (api.Match, error)

func RegisterPercolate

func RegisterPercolate(index string, name string, args map[string]interface{}, query api.Query) (api.BaseResponse, error)

RegisterPercolate allows the caller to register queries against an index, then send percolate requests that include a doc and get back the registered queries that match that doc. Think of it as the reverse of indexing and then searching: instead of sending docs, indexing them, and then running queries, one sends queries, registers them, and then sends docs to find out which queries match each doc. see http://www.elasticsearch.org/guide/reference/api/percolate.html

func Update

func Update(index string, _type string, id string, args map[string]interface{}, data interface{}) (api.BaseResponse, error)

Update updates a document based on a provided script. The operation gets the document (collocated with the shard) from the index, runs the script (with optional script language and parameters), and indexes the result back (it also allows deleting the document or ignoring the operation). It uses versioning to make sure no updates have happened during the “get” and “reindex” (available from 0.19 onwards). Note that this operation still means a full reindex of the document; it just removes some network roundtrips and reduces the chance of version conflicts between the get and the index. The _source field needs to be enabled for this feature to work.

http://www.elasticsearch.org/guide/reference/api/update.html

TODO: finish this, it's fairly complex

func UpdateBulk

func UpdateBulk(index string, _type string, id string, date *time.Time, data interface{}, refresh bool) error

func UpdateBulkTtl

func UpdateBulkTtl(index string, _type string, id, ttl string, date *time.Time, data interface{}, refresh bool) error

func UpdateWithPartialDoc

func UpdateWithPartialDoc(index string, _type string, id string, args map[string]interface{}, doc interface{}, upsert bool) (api.BaseResponse, error)

UpdateWithPartialDoc updates a document based on the partial document provided. The update API also supports passing a partial document (since 0.20), which is merged into the existing document (simple recursive merge, inner merging of objects, replacing core "keys/values" and arrays). If both doc and script are specified, doc is ignored. It is best to put the field pairs of the partial document in the script itself.

http://www.elasticsearch.org/guide/reference/api/update.html
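
The merge rule quoted above (objects merged recursively, plain values and arrays replaced) can be illustrated locally. This is only a sketch of the described semantics, not the code Elasticsearch or elastigo runs; `mergeDoc` is a hypothetical helper.

```go
package main

import "fmt"

// mergeDoc is a hypothetical illustration of a "simple recursive merge":
// nested objects are merged key by key, everything else is replaced.
func mergeDoc(dst, partial map[string]interface{}) {
	for k, v := range partial {
		sub, isObj := v.(map[string]interface{})
		cur, curIsObj := dst[k].(map[string]interface{})
		if isObj && curIsObj {
			mergeDoc(cur, sub) // inner merging of objects
			continue
		}
		dst[k] = v // core values and arrays are replaced wholesale
	}
}

func main() {
	doc := map[string]interface{}{
		"name":    "bob",
		"address": map[string]interface{}{"city": "Oslo", "zip": "0150"},
		"tags":    []interface{}{"a", "b"},
	}
	partial := map[string]interface{}{
		"address": map[string]interface{}{"city": "Bergen"},
		"tags":    []interface{}{"c"},
	}
	mergeDoc(doc, partial)
	fmt.Println(doc)
}
```

After the merge, `address.zip` survives because the nested object is merged, while `tags` holds only the new array.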

func UpdateWithScript

func UpdateWithScript(index string, _type string, id string, args map[string]interface{}, script string, params interface{}) (api.BaseResponse, error)

UpdateWithScript updates a document based on a provided script. The operation gets the document (collocated with the shard) from the index, runs the script (with optional script language and parameters), and indexes the result back (it also allows deleting the document or ignoring the operation). It uses versioning to make sure no updates have happened during the "get" and "reindex" (available from 0.19 onwards).

Note that this operation still means a full reindex of the document; it just removes some network roundtrips and reduces the chance of version conflicts between the get and the index. The _source field needs to be enabled for this feature to work. http://www.elasticsearch.org/guide/reference/api/update.html

func Validate

func Validate(index string, _type string, args map[string]interface{}) (api.BaseResponse, error)

Validate allows a user to validate a potentially expensive query without executing it. see http://www.elasticsearch.org/guide/reference/api/validate.html

func WriteBulkBytes

func WriteBulkBytes(op string, index string, _type string, id, ttl string, date *time.Time, data interface{}, refresh bool) ([]byte, error)

Given a set of arguments for index, type, id and data, WriteBulkBytes creates a set of bytes formatted for bulk indexing. http://www.elasticsearch.org/guide/reference/api/bulk.html
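
For orientation, the Elasticsearch bulk format is newline-delimited JSON: an action/metadata line followed by the document source, each ending in a newline. The sketch below builds such a pair for an "index" action. `bulkLines` is a hypothetical helper, not WriteBulkBytes itself, and it ignores ttl, date and refresh. Unlike the package examples, it expects `doc` to be a value that marshals to a JSON object rather than a pre-encoded string.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// bulkLines is a hypothetical helper that emits one action line and one
// source line in bulk format; it is not elastigo's WriteBulkBytes.
func bulkLines(op, index, typ, id string, doc interface{}) ([]byte, error) {
	meta := map[string]map[string]string{
		op: {"_index": index, "_type": typ, "_id": id},
	}
	metaJSON, err := json.Marshal(meta)
	if err != nil {
		return nil, err
	}
	docJSON, err := json.Marshal(doc)
	if err != nil {
		return nil, err
	}
	var buf bytes.Buffer
	buf.Write(metaJSON)
	buf.WriteByte('\n') // every line, including the last, ends with a newline
	buf.Write(docJSON)
	buf.WriteByte('\n')
	return buf.Bytes(), nil
}

func main() {
	out, err := bulkLines("index", "twitter", "user", "1", map[string]string{"name": "bob"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Print(string(out))
}
```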

Types

type BulkIndexer

type BulkIndexer struct {

	// We are creating a variable defining the func responsible for sending
	// to allow a mock sendor for test purposes
	BulkSender func(*bytes.Buffer) error
	// Deprecated, for backwards compatibility
	BulkSendor func(*bytes.Buffer) error

	// If we encounter an error in sending, we are going to retry for this long
	// before returning an error
	// if 0 it will not retry
	RetryForSeconds int

	// channel for getting errors
	ErrorChannel chan *ErrorBuffer

	// Max amount of time to buffer documents before forcing a flush
	BufferDelayMax time.Duration
	// Max buffer size in bytes before flushing to elasticsearch
	BulkMaxBuffer int // 1048576
	// Max number of Docs to hold in buffer before forcing flush
	BulkMaxDocs int // 100
	// contains filtered or unexported fields
}

A bulk indexer creates goroutines, and channels for connecting and sending data to elasticsearch in bulk, using buffers.

Example (Errorchannel)

The simplest usage of background bulk indexing with error channel

package main

import (
	"fmt"
	"github.com/packetbeat/elastigo/core"
	"strconv"
)

func main() {
	indexer := core.NewBulkIndexerErrors(10, 60)
	done := make(chan bool)
	indexer.Run(done)

	go func() {
		for errBuf := range indexer.ErrorChannel {
			// just blissfully print errors forever
			fmt.Println(errBuf.Err)
		}
	}()
	for i := 0; i < 20; i++ {
		indexer.Index("twitter", "user", strconv.Itoa(i), "", nil, `{"name":"bob"}`, true)
	}
	done <- true
}

Example (Errorsmarter)

Background bulk indexing with an error channel and an error counter that is checked periodically

package main

import (
	"fmt"
	"github.com/packetbeat/elastigo/core"
	"strconv"
	"time"
)

func main() {
	indexer := core.NewBulkIndexerErrors(10, 60)
	done := make(chan bool)
	indexer.Run(done)

	errorCt := 0 // use sync/atomic or a mutex if you need goroutine-safe counting
	timer := time.NewTicker(time.Minute * 3)
	go func() {
		for {
			select {
			case <-timer.C:
				if errorCt < 2 {
					errorCt = 0
				}
			case <-done:
				return
			}
		}
	}()

	go func() {
		for errBuf := range indexer.ErrorChannel {
			errorCt++
			fmt.Println(errBuf.Err)
			// log to disk?  db?   ????  Panic
		}
	}()
	for i := 0; i < 20; i++ {
		indexer.Index("twitter", "user", strconv.Itoa(i), "", nil, `{"name":"bob"}`, true)
	}
	done <- true // send shutdown signal
}

Example (Responses)

Inspecting the response

package main

import (
	"bytes"
	"fmt"
	"github.com/packetbeat/elastigo/api"
	"github.com/packetbeat/elastigo/core"
	"strconv"
)

func main() {
	indexer := core.NewBulkIndexer(10)
	// Create a custom Sender Func, to allow inspection of response/error
	indexer.BulkSender = func(buf *bytes.Buffer) error {
		// @buf is the buffer of docs about to be written
		respJson, err := api.DoCommand("POST", "/_bulk", nil, buf)
		if err != nil {
			// handle it better than this
			fmt.Println(string(respJson))
		}
		return err
	}
	done := make(chan bool)
	indexer.Run(done)

	for i := 0; i < 20; i++ {
		indexer.Index("twitter", "user", strconv.Itoa(i), "", nil, `{"name":"bob"}`, true)
	}
	done <- true // send shutdown signal
}

Example (Simple)

The simplest usage of background bulk indexing

package main

import (
	"github.com/packetbeat/elastigo/core"
)

func main() {
	indexer := core.NewBulkIndexerErrors(10, 60)
	done := make(chan bool)
	indexer.Run(done)

	indexer.Index("twitter", "user", "1", "", nil, `{"name":"bob"}`, true)

	<-done // wait forever
}

func NewBulkIndexer

func NewBulkIndexer(maxConns int) *BulkIndexer

func NewBulkIndexerErrors

func NewBulkIndexerErrors(maxConns, retrySeconds int) *BulkIndexer

NewBulkIndexerErrors creates a bulk indexer with more control over error handling.

 @maxConns is the max number of in flight http requests
 @retrySeconds is the number of seconds to keep retrying failed requests before reporting an error

done := make(chan bool)
indexer := NewBulkIndexerErrors(100, 60)
indexer.Run(done)
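
The RetryForSeconds field comment describes retrying for a bounded time before giving up, and not retrying at all when the value is 0. A minimal sketch of that pattern follows. It is an assumption about the general technique, not elastigo's actual retry loop; `sendWithRetry` and `flakySender` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// sendWithRetry is a hypothetical helper: it calls send until it succeeds or
// until retryFor has elapsed, pausing between attempts. A retryFor of 0 means
// a single attempt. It returns the number of attempts and the last error.
func sendWithRetry(send func() error, retryFor, pause time.Duration) (int, error) {
	deadline := time.Now().Add(retryFor)
	for attempts := 1; ; attempts++ {
		err := send()
		if err == nil {
			return attempts, nil
		}
		if retryFor == 0 || time.Now().After(deadline) {
			return attempts, err
		}
		time.Sleep(pause)
	}
}

// flakySender returns a sender that fails the given number of times, then succeeds.
func flakySender(failures int) func() error {
	return func() error {
		if failures > 0 {
			failures--
			return errors.New("temporary failure")
		}
		return nil
	}
}

func main() {
	attempts, err := sendWithRetry(flakySender(2), time.Second, time.Millisecond)
	fmt.Println(attempts, err)
}
```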

func (*BulkIndexer) Flush

func (b *BulkIndexer) Flush()

Flush sends all currently buffered documents to Elasticsearch

func (*BulkIndexer) Index

func (b *BulkIndexer) Index(index string, _type string, id, ttl string, date *time.Time, data interface{}, refresh bool) error

Index adds or updates a typed JSON document in a specific index, making it searchable. It operates by buffering requests and occasionally flushing them to elasticsearch. http://www.elasticsearch.org/guide/reference/api/bulk.html

func (*BulkIndexer) PendingDocuments

func (b *BulkIndexer) PendingDocuments() int

func (*BulkIndexer) Run

func (b *BulkIndexer) Run(done chan bool)

Run starts this bulk indexer. It launches a goroutine, so the call is non-blocking.
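
The non-blocking start plus `done` channel shutdown can be sketched with the standard library alone. This is a simplified picture under stated assumptions, not the BulkIndexer internals: `worker`, `run` and `collect` are hypothetical, and the WaitGroup is added only so the example can wait for the final flush.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker is a hypothetical background flusher; it is not elastigo's BulkIndexer.
type worker struct {
	docs chan string
	wg   sync.WaitGroup
	sent []string // documents that have been "flushed"
}

// run starts the background goroutine and returns immediately.
func (w *worker) run(done chan bool) {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()
		var pending []string
		flush := func() {
			w.sent = append(w.sent, pending...)
			pending = nil
		}
		for {
			select {
			case d := <-w.docs:
				pending = append(pending, d)
			case <-ticker.C:
				flush() // periodic flush
			case <-done:
				flush() // final flush on shutdown
				return
			}
		}
	}()
}

// collect feeds docs to a worker, signals shutdown, and returns what was flushed.
func collect(docs []string) []string {
	w := &worker{docs: make(chan string)}
	done := make(chan bool)
	w.run(done)
	for _, d := range docs {
		w.docs <- d
	}
	done <- true
	w.wg.Wait()
	return w.sent
}

func main() {
	fmt.Println(collect([]string{"doc-0", "doc-1", "doc-2"}))
}
```

Because `docs` is unbuffered, every document has been received by the goroutine before the shutdown signal is sent, so nothing is lost in this sketch.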

func (*BulkIndexer) Update

func (b *BulkIndexer) Update(index string, _type string, id, ttl string, date *time.Time, data interface{}, refresh bool) error

type CountResponse

type CountResponse struct {
	Count int        `json:"count"`
	Shard api.Status `json:"_shards"`
}

func Count

func Count(index string, _type string, args map[string]interface{}) (CountResponse, error)

Count allows the caller to easily execute a query and get the number of matches for that query. It can be executed across one or more indices and across one or more types. The query can either be provided using a simple query string as a parameter, or using the Query DSL defined within the request body. http://www.elasticsearch.org/guide/reference/api/count.html

type DeleteByQueryResponse

type DeleteByQueryResponse struct {
	Status   bool                   `json:"ok"`
	Indicies map[string]IndexStatus `json:"_indices"`
}

type ErrorBuffer

type ErrorBuffer struct {
	Err error
	Buf *bytes.Buffer
}

type Explaination

type Explaination struct {
	Index string `json:"index"`
	Valid bool   `json:"valid"`
	Error string `json:"error"`
}

type Explanation

type Explanation struct {
	Value       float32        `json:"value"`
	Description string         `json:"description"`
	Details     []*Explanation `json:"details,omitempty"`
}

func (*Explanation) String

func (e *Explanation) String(indent string) string

type Float32Nullable

type Float32Nullable float32

Elasticsearch returns some JSON that is invalid according to Go: float fields such as scores can be null, which fails to decode with

 json: cannot unmarshal null into Go value of type float32

See the last field in each of these fragments:

 "hits":{"total":6808,"max_score":null,
 "hits":[{"_index":"10user","_type":"user","_id":"751820","_score":null,

func (*Float32Nullable) UnmarshalJSON

func (i *Float32Nullable) UnmarshalJSON(data []byte) error
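
A custom UnmarshalJSON is the usual way to tolerate a null score. The sketch below re-creates the idea with a local type. `nullableFloat32`, `hit` and `decodeScores` are hypothetical, and treating null as 0 is an assumption; elastigo's own implementation may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// nullableFloat32 is a hypothetical re-creation of the idea behind
// Float32Nullable: a float32 that accepts JSON null.
type nullableFloat32 float32

// UnmarshalJSON maps null to 0 (an assumption) and decodes numbers normally.
func (f *nullableFloat32) UnmarshalJSON(data []byte) error {
	if string(data) == "null" {
		*f = 0
		return nil
	}
	var v float32
	if err := json.Unmarshal(data, &v); err != nil {
		return err
	}
	*f = nullableFloat32(v)
	return nil
}

// hit keeps only the fields needed for the illustration.
type hit struct {
	ID    string          `json:"_id"`
	Score nullableFloat32 `json:"_score"`
}

// decodeScores decodes a JSON array of hits.
func decodeScores(raw string) ([]hit, error) {
	var hits []hit
	err := json.Unmarshal([]byte(raw), &hits)
	return hits, err
}

func main() {
	hits, err := decodeScores(`[{"_id":"751820","_score":null},{"_id":"42","_score":1.5}]`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(hits[0].Score, hits[1].Score)
}
```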

type Hit

type Hit struct {
	Index       string           `json:"_index"`
	Type        string           `json:"_type,omitempty"`
	Id          string           `json:"_id"`
	Score       Float32Nullable  `json:"_score,omitempty"` // Filters (no query) don't have a score, so it is null
	Source      *json.RawMessage `json:"_source"`          // marshalling left to consumer
	Fields      *json.RawMessage `json:"fields"`           // when a field arg is passed to ES, instead of _source it returns fields
	Explanation *Explanation     `json:"_explanation,omitempty"`
}
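
Because Source is a *json.RawMessage, decoding it is left to the caller. A minimal sketch of that second decoding step follows, using a local `hit` type that copies three of the fields above. The `user` struct and `decodeUser` helper are hypothetical.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// hit mirrors a subset of the Hit struct for illustration.
type hit struct {
	Index  string           `json:"_index"`
	ID     string           `json:"_id"`
	Source *json.RawMessage `json:"_source"`
}

// user is a hypothetical application type stored in _source.
type user struct {
	Name string `json:"name"`
}

// decodeUser decodes the hit envelope first, then its raw _source.
func decodeUser(rawHit string) (user, error) {
	var h hit
	var u user
	if err := json.Unmarshal([]byte(rawHit), &h); err != nil {
		return u, err
	}
	if h.Source == nil {
		return u, errors.New("hit has no _source")
	}
	err := json.Unmarshal(*h.Source, &u)
	return u, err
}

func main() {
	u, err := decodeUser(`{"_index":"twitter","_id":"1","_source":{"name":"bob"}}`)
	fmt.Println(u.Name, err)
}
```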

type Hits

type Hits struct {
	Total int `json:"total"`
	//	MaxScore float32 `json:"max_score"`
	Hits []Hit `json:"hits"`
}

func (*Hits) Len

func (h *Hits) Len() int

type IndexStatus

type IndexStatus struct {
	Shards api.Status `json:"_shards"`
}

type MGetRequest

type MGetRequest struct {
	Index  string   `json:"_index"`
	Type   string   `json:"_type"`
	ID     string   `json:"_id"`
	IDS    []string `json:"_ids,omitempty"`
	Fields []string `json:"fields,omitempty"`
}

type MGetRequestContainer

type MGetRequestContainer struct {
	Docs []MGetRequest `json:"docs"`
}

type MGetResponseContainer

type MGetResponseContainer struct {
	Docs []api.BaseResponse `json:"docs"`
}

func MGet

func MGet(index string, _type string, mgetRequest MGetRequestContainer, args map[string]interface{}) (MGetResponseContainer, error)

MGet allows the caller to get multiple documents based on an index, type (optional) and id (and possibly routing). The response includes a docs array with all the fetched documents, each element similar in structure to a document provided by the get API. see http://www.elasticsearch.org/guide/reference/api/multi-get.html
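
To see what a multi-get body built from these types looks like, the sketch below copies the two request structs locally and marshals them. Whether MGet sends exactly this body is an assumption; `mgetBody` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the request types shown above, for illustration only.
type MGetRequest struct {
	Index  string   `json:"_index"`
	Type   string   `json:"_type"`
	ID     string   `json:"_id"`
	IDS    []string `json:"_ids,omitempty"`
	Fields []string `json:"fields,omitempty"`
}

type MGetRequestContainer struct {
	Docs []MGetRequest `json:"docs"`
}

// mgetBody renders the container as a JSON request body.
func mgetBody(c MGetRequestContainer) (string, error) {
	b, err := json.Marshal(c)
	return string(b), err
}

func main() {
	body, err := mgetBody(MGetRequestContainer{Docs: []MGetRequest{
		{Index: "twitter", Type: "user", ID: "1"},
		{Index: "twitter", Type: "user", ID: "2", Fields: []string{"name"}},
	}})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(body)
}
```

The `omitempty` tags keep `_ids` and `fields` out of the body when they are not set.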

type MLT

type MLT struct {
	Fields              []string `json:"fields"`
	LikeText            string   `json:"like_text"`
	PercentTermsToMatch float32  `json:"percent_terms_to_match"`
	MinTermFrequency    int      `json:"min_term_freq"`
	MaxQueryTerms       int      `json:"max_query_terms"`
	StopWords           []string `json:"stop_words"`
	MinDocFrequency     int      `json:"min_doc_freq"`
	MaxDocFrequency     int      `json:"max_doc_freq"`
	MinWordLength       int      `json:"min_word_len"`
	MaxWordLength       int      `json:"max_word_len"`
	BoostTerms          int      `json:"boost_terms"`
	Boost               float32  `json:"boost"`
	Analyzer            string   `json:"analyzer"`
}

type MoreLikeThisQuery

type MoreLikeThisQuery struct {
	MoreLikeThis MLT `json:"more_like_this"`
}

type SearchResult

type SearchResult struct {
	Took         int             `json:"took"`
	TimedOut     bool            `json:"timed_out"`
	ShardStatus  api.Status      `json:"_shards"`
	Hits         Hits            `json:"hits"`
	Facets       json.RawMessage `json:"facets,omitempty"` // structure varies on query
	ScrollId     string          `json:"_scroll_id,omitempty"`
	Aggregations json.RawMessage `json:"aggregations,omitempty"` // structure varies on query
}

func Scroll

func Scroll(args map[string]interface{}, scroll_id string) (SearchResult, error)

func SearchRequest

func SearchRequest(index string, _type string, args map[string]interface{}, query interface{}) (SearchResult, error)

SearchRequest performs a very basic search on an index via the request URI API.

params:

@index:  the elasticsearch index
@_type:  optional ("" if not used) search specific type in this index
@args:   a map of URL parameters. Allows all the URI-request parameters allowed by ElasticSearch.
@query:  this can be one of 3 types:
           1)  string value that is valid elasticsearch
           2)  io.Reader that can be set in body (also valid elasticsearch string syntax..)
           3)  other type marshalable to json (also valid elasticsearch json)

out, err := SearchRequest("github", "", map[string]interface{}{"from": 10}, qryType)

http://www.elasticsearch.org/guide/reference/api/search/uri-request.html

func SearchUri

func SearchUri(index, _type string, args map[string]interface{}) (SearchResult, error)

SearchUri performs the simplest possible query, expressed in the URL string.

params:

@index:  the elasticsearch index
@_type:  optional ("" if not used) search specific type in this index
@args: a map of URL parameters. Most important one is q

out, err := SearchUri("github","", map[string]interface{} { "q" : `user:kimchy`})

produces a request like this: host:9200/github/_search?q=user:kimchy

http://www.elasticsearch.org/guide/reference/api/search/uri-request.html
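
How an args map turns into a URI request can be sketched with net/url. `searchURL` is a hypothetical helper, not the code SearchUri runs, and the host and scheme are placeholders. Note that the standard library percent-encodes the colon in the query value, so the result is an escaped form of the request shown above.

```go
package main

import (
	"fmt"
	"net/url"
)

// searchURL is a hypothetical helper that assembles a URI search request
// from an index, an optional type, and a map of URL parameters.
func searchURL(host, index, typ string, args map[string]interface{}) string {
	path := "/" + index
	if typ != "" {
		path += "/" + typ
	}
	path += "/_search"

	q := url.Values{}
	for k, v := range args {
		q.Set(k, fmt.Sprint(v)) // values are stringified, then escaped by Encode
	}
	u := url.URL{Scheme: "http", Host: host, Path: path, RawQuery: q.Encode()}
	return u.String()
}

func main() {
	fmt.Println(searchURL("localhost:9200", "github", "", map[string]interface{}{"q": "user:kimchy"}))
}
```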

func (*SearchResult) String

func (s *SearchResult) String() string

type Validation

type Validation struct {
	Valid         bool           `json:"valid"`
	Shards        api.Status     `json:"_shards"`
	Explainations []Explaination `json:"explanations,omitempty"`
}
