elasticsearch

Package version: v1.0.0
Warning: This package is not in the latest version of its module.
Published: Nov 18, 2021 License: BSD-3-Clause Imports: 15 Imported by: 3

README

Elasticsearch adaptor

The elasticsearch adaptor sends data to the configured endpoints. The supported versions are listed below.

Version   Note
1.X       This version does not support bulk operations and will thus be much slower.
2.X       Will only receive bug fixes; please consider upgrading.
5.X       Most recent and supported version.

IMPORTANT

Transporter automatically uses the source _id as the Elasticsearch document _id. If you would rather use Elasticsearch's auto-generated _id but still retain the originating _id from the source, include a transform function similar to the following (this example assumes a MongoDB source):

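module.exports = function(msg) {
   // copy the MongoDB ObjectId into a regular field so it is retained
   msg.data["mongo_id"] = msg.data._id['$oid'];
   // drop _id so Elasticsearch generates its own document _id
   msg.data = _.omit(msg.data, ["_id"]);
   return msg;
}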

NOTES

When the Elasticsearch auto-generated _id is used, transporter cannot currently process update/delete operations. Future work to address this is planned in #39.

If no INDEX_NAME is provided, transporter will configure a default index named test.
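The default-index rule can be sketched in Go. This is a minimal illustration, not the adaptor's actual parsing code: `indexFromURI` is a hypothetical helper, and it assumes the index name is the first path segment of the URI, falling back to `test` (the value of the package's `DefaultIndex` constant) when the path is empty.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// defaultIndex mirrors the adaptor's DefaultIndex constant ("test").
const defaultIndex = "test"

// indexFromURI is a hypothetical helper (not part of the adaptor's API). It
// returns the first path segment of uri as the index name, or defaultIndex
// when the URI has no path.
func indexFromURI(uri string) (string, error) {
	u, err := url.Parse(uri)
	if err != nil {
		return "", err
	}
	// u.Path is e.g. "/products", or "" / "/" when no index is given.
	name := strings.Trim(u.Path, "/")
	if i := strings.Index(name, "/"); i >= 0 {
		name = name[:i] // ignore anything after the first segment
	}
	if name == "" {
		return defaultIndex, nil
	}
	return name, nil
}

func main() {
	for _, uri := range []string{
		"https://user:pass@localhost:9200/products",
		"https://user:pass@localhost:9200",
	} {
		idx, err := indexFromURI(uri)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(uri, "->", idx)
	}
}
```

Note that the literal placeholder `hostname:port` from the configuration below would not parse, because `port` is not numeric; substitute real values.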

Configuration:
es = elasticsearch({
  "uri": "https://username:password@hostname:port/INDEX_NAME",
  "timeout": "10s", // optional, defaults to 30s
  "aws_access_key": "XXX", // optional, used for signing requests to AWS Elasticsearch service
  "aws_access_secret": "XXX", // optional, used for signing requests to AWS Elasticsearch service
  "parent_id": "elastic_parent" // optional, used for specifying parent-child relationships
})

NOTES

When writing larger documents, or any complex inserts that require longer Elasticsearch timeouts, increase your write timeout with Config({"write_timeout":"30s"}) in addition to the adaptor-level timeout to prevent concurrency issues:

t.Config({"write_timeout":"30s"}).Source("source", source).Save("sink", sink)

Addressing #391

Parent-Child Relationships

Note: Only Elasticsearch 5.x is currently supported.

If your data has parent-child relationships, specify parent_id in the configuration.

Be sure to add your parent-child mapping, and make sure that the Elasticsearch _id of each parent document matches the value in the parent_id field you specified in your configuration.

After adding your parent-child mapping, check that data is being inserted correctly.

Update

About Routing:

By default, Elasticsearch routes a document using its _id when no parent is specified. However, when a parent is specified, the parent and child must use the same routing value, which is the parent's _id. Transporter therefore uses the parent_id value as the routing value, which ensures that parent and child documents are stored on the same shard.

https://www.elastic.co/guide/en/elasticsearch/guide/current/indexing-parent-child.html
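The routing rule can be illustrated with the shard-selection formula described in the Elasticsearch guide, `shard = hash(routing) % number_of_primary_shards`. This is a sketch under loud assumptions: `routingFor` and `shardFor` are hypothetical helpers, and FNV-1a stands in for the Murmur3-based hash Elasticsearch actually uses, so the shard numbers will not match a real cluster. What it does show is that a child routed by its parent's _id always lands on the same shard as that parent.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// routingFor returns the routing value for a document: the parent's _id when
// the document has a parent, otherwise the document's own _id.
func routingFor(id, parentID string) string {
	if parentID != "" {
		return parentID
	}
	return id
}

// shardFor applies shard = hash(routing) % numPrimaryShards.
// FNV-1a is a stand-in here; Elasticsearch itself uses a Murmur3-based hash.
func shardFor(routing string, numPrimaryShards uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(routing)) // Write on a hash.Hash never returns an error
	return h.Sum32() % numPrimaryShards
}

func main() {
	const primaries = 5
	parent := shardFor(routingFor("9g2g", ""), primaries)    // company document
	child := shardFor(routingFor("9g4g", "9g2g"), primaries) // employee whose parent is 9g2g
	fmt.Println("parent shard:", parent, "child shard:", child, "same:", parent == child)
}
```

Because both documents are hashed on the same routing string, the comparison is always true regardless of which hash function is used.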

Example of Parent-Child Mapping:

This step is manual: you must create the mapping yourself with a PUT request to Elasticsearch.

PUT /<your index name>

{
   "mappings":{
      "company":{},
      "employee":{
         "_parent":{
            "type":"company"
         }
      }
   }
}
Your transporter configuration will then look something like this:
es = elasticsearch({
  "uri": "https://username:password@hostname:port/INDEX_NAME",
  "timeout": "10s",
  "aws_access_key": "XXX",
  "aws_access_secret": "XXX",
  "parent_id": "company_id"
})
Sample data to insert with the configuration above:

In this sample dataset, a company has many employees.

Note: company_id is the parent reference in the data below.

Company
{"_id": "9g2g", "name": "gingerbreadhouse"}
Employee
{"_id": "9g2g", "name": "hansel", "company_id": "9g2g"}
{"_id": "9g4g", "name": "gretel", "company_id": "9g2g"}
{"_id": "9g6g", "name": "witch", "company_id": "9g2g"}

Caution: If you try to insert or update data without the mapping step, the inserts will fail. Run transporter with debug logging to see the errors:

transporter run -log.level=debug

Documentation


Constants

const (
	// DefaultIndex is used when there is not one included in the provided URI.
	DefaultIndex = "test"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AWSTransport

type AWSTransport struct {
	Credentials awsauth.Credentials
	// contains filtered or unexported fields
}

AWSTransport handles wrapping requests to the AWS Elasticsearch service.

func (AWSTransport) RoundTrip

func (a AWSTransport) RoundTrip(req *http.Request) (*http.Response, error)

RoundTrip implements the http.RoundTripper interface for AWSTransport.

type Elasticsearch

type Elasticsearch struct {
	adaptor.BaseConfig
	AWSAccessKeyID  string `json:"aws_access_key" doc:"credentials for use with AWS Elasticsearch service"`
	AWSAccessSecret string `json:"aws_access_secret" doc:"credentials for use with AWS Elasticsearch service"`
	ParentID        string `json:"parent_id"`
}

Elasticsearch is an adaptor to connect a pipeline to an elasticsearch cluster.

func (*Elasticsearch) Client added in v0.3.0

func (e *Elasticsearch) Client() (client.Client, error)

Client returns a client that doesn't do anything other than fulfill the client.Client interface.

func (*Elasticsearch) Description

func (e *Elasticsearch) Description() string

Description for the Elasticsearch adaptor.

func (*Elasticsearch) Reader added in v0.3.0

func (e *Elasticsearch) Reader() (client.Reader, error)

Reader returns an error because this adaptor is currently not supported as a Source.

func (*Elasticsearch) SampleConfig

func (e *Elasticsearch) SampleConfig() string

SampleConfig returns a sample configuration for the elasticsearch adaptor.

func (*Elasticsearch) Writer added in v0.3.0

func (e *Elasticsearch) Writer(done chan struct{}, wg *sync.WaitGroup) (client.Writer, error)

Writer determines which underlying writer to use based on the cluster's version.
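The version dispatch can be pictured as a switch on the cluster's major version. This is a minimal sketch under stated assumptions: `writerFor` is hypothetical, it returns the name of one of the `v1`, `v2` or `v5` sub-packages listed under Directories rather than a real `client.Writer`, and it assumes the version string reported by the cluster begins with the major version (e.g. `5.6.3`).

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// writerFor maps a cluster version string such as "5.6.3" to the name of the
// sub-package (v1, v2 or v5) whose writer would handle it. Hypothetical sketch.
func writerFor(version string) (string, error) {
	// The major version is everything before the first ".".
	majorStr := strings.SplitN(version, ".", 2)[0]
	major, err := strconv.Atoi(majorStr)
	if err != nil {
		return "", fmt.Errorf("unparseable version %q: %w", version, err)
	}
	switch major {
	case 1:
		return "v1", nil // no bulk support, so much slower
	case 2:
		return "v2", nil // bug fixes only
	case 5:
		return "v5", nil
	default:
		return "", fmt.Errorf("unsupported Elasticsearch version %q", version)
	}
}

func main() {
	for _, v := range []string{"1.7.5", "2.4.6", "5.6.3", "6.8.0"} {
		w, err := writerFor(v)
		if err != nil {
			fmt.Println(v, "->", err)
			continue
		}
		fmt.Println(v, "->", w)
	}
}
```

Rejecting unknown major versions up front surfaces an unsupported cluster at startup rather than as failed writes later.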

Directories

Path Synopsis
all
v1
v2
v5
