elastic

package
v2.36.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2022 License: MIT Imports: 10 Imported by: 0

README

This package provides helpers for common Elasticsearch operations, such as search, index (upsert), remove document, remove index, and bulk index. You are expected to embed this Elasticsearch client in your own client and wrap the common functionality to fit your specific needs.

Embedding Client

You can just embed the client and create wrapper function like this:

package myesclient

import ( 
    "github.com/kitabisa/perkakas/elastic"
    es "github.com/olivere/elastic/v7"
    "context"
    "fmt"
)

// MyClient wraps the perkakas elastic client.
//
// elastic.ElasticClient is an interface, so it is embedded by value rather
// than through a pointer: the value returned by elastic.NewClient can be
// assigned to it directly, and its methods are promoted onto MyClient where
// wrapper methods may shadow them.
type MyClient struct {
	elastic.ElasticClient
}

// NewMyClient builds a MyClient around a perkakas elastic client connected to
// url.
//
// A construction error is only printed; whatever elastic.NewClient returned is
// still embedded in the result, so callers get a non-nil *MyClient either way.
func NewMyClient(url string) *MyClient {
	esClient, err := elastic.NewClient(url)
	if err != nil {
		fmt.Println(err)
	}

	wrapped := new(MyClient)
	wrapped.ElasticClient = esClient
	return wrapped
}

// createExampleTemplate builds the dynamic template used when storing
// documents through MyClient.
//
// It registers three dynamic templates:
//   - "id_field":   string fields matching "id" are mapped as text.
//   - "name_field": string fields matching "name" are mapped as text with a
//     "keyword" sub-field (ignore_above 256).
//   - "age":        double fields matching "age" are mapped as double.
//
// The returned template also sets "index.refresh_interval" to "5s".
func (m *MyClient) createExampleTemplate() *elastic.DynamicTemplate {
	// All mapping types live in the perkakas elastic package, so they must be
	// package-qualified here.
	mappings := elastic.NewMappings().
		AddDynamicTemplate("id_field", elastic.MatchConditions{
			Match:            "id",
			MatchMappingType: "string",
			Mapping: elastic.MatchMapping{
				Type: "text",
			},
		}).
		AddDynamicTemplate("name_field", elastic.MatchConditions{
			Match:            "name",
			MatchMappingType: "string",
			Mapping: elastic.MatchMapping{
				Type: "text",
				Fields: map[string]elastic.Field{
					"keyword": {
						Type:        "keyword",
						IgnoreAbove: 256,
					},
				},
			},
		}).
		AddDynamicTemplate("age", elastic.MatchConditions{
			Match:            "age",
			MatchMappingType: "double",
			Mapping: elastic.MatchMapping{
				Type: "double",
			},
		})

	return &elastic.DynamicTemplate{
		Settings: map[string]interface{}{
			"index.refresh_interval": "5s",
		},
		Mappings: mappings,
	}
}

// This will shadow Store function from perkakas
func (m *MyClient) Store(ctx context.Context, name string, doc interface{}) (res *es.IndexResponse, err error) {
    template := m.createExampleTemplate()
    return m.Client.Store(ctx context.Context, name string, doc interface{}, template)
}

// other wrap function here...
client, err := NewClient("http://localhost:9200")
if err != nil {
    fmt.Println(err)
}
ctx := context.Background()
searchOption := SearchOption{
    Query: es.NewWildcardQuery("id", "*1234*"),
    Sort:  nil,
    Size:  10, // if you not set the size, it will be default to 10
}

result, err := client.Search(ctx, "campaign-test-111", searchOption)
if err != nil {
    // handle error
}

The search result is a []byte in JSON format. The query can be any query model supported by Elasticsearch. For reference, see: https://github.com/olivere/elastic/wiki/Search

Index Document (Upsert)

person := Person{
    ID: "person-123",
    Name: "Mukidi",
    Age: 45,
}

ctx := context.Background()
mapping := elasticCreateIndex()

result, err := client.Store(ctx, "person-test-123", person, mapping)
if err != nil {
    // handle error here...
}

The index ID is a string and can have any value, but it is better to give it a prefix to avoid ID collisions.

Remove Document

ctx := context.Background()
_, err := client.Remove(ctx, "person-test-123", "1234")
if err != nil {
    // handle error here...
}

Remove Index

ctx := context.Background()
// RemoveIndex accepts one or more index names.
_, err := client.RemoveIndex(ctx, "index1", "index2")
if err != nil {
    // handle error here...
}

Bulk Index Document (Upsert)

ctx := context.Background()
docs := []interface{}{
    Person {
        ID:   "abcde",
        Name: "Budi",
        Age: 17,
    },
    Person {
        ID:   "fghij",
        Name: "Badu",
        Age: 50,
    },
}

template := elasticCreateIndex()
err := client.BulkStore(ctx, "person-list", "default", docs, template)
if err != nil {
    // handle error here
}

As you can see, client.BulkStore() requires the name of a bulk processor. You can pass an empty string instead, and it will use the default bulk processor. You can also create your own bulk processor and add it to the client:

bulkProcessor := BulkProcessor{
    Name:          "my-bulk-processor", // name of the bulk processor
    Workers:       10, // how many workers
    BulkActions:   100, // flush when reach 100 bulk requests
    BulkSize:      2 << 20, // flush when reach bulk request size 2 MB
    FlushInterval: 1 * time.Second, // flush every 1 second
}

err := client.AddBulkProcessor(bulkProcessor)

Or, if you have added a bulk processor earlier, you can get it with client.GetBulkProcessor("name"). It returns the bulk processor, or an error if the specified bulk processor name cannot be found.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BulkProcessor

// BulkProcessor describes a bulk processor that can be registered on a client
// with AddBulkProcessor.
type BulkProcessor struct {
	// Name is the name of the bulk processor.
	Name          string
	// Workers is how many workers the processor uses.
	Workers       int
	// BulkActions is the number of queued bulk requests that triggers a flush.
	BulkActions   int
	// BulkSize is the bulk request size that triggers a flush
	// (the README example uses 2 << 20 for 2 MB, i.e. bytes).
	BulkSize      int
	// FlushInterval is the period between time-based flushes.
	FlushInterval time.Duration
}

type Client

// Client is the concrete client type that carries the Search, Store, Remove,
// RemoveIndex, Ping, AddBulkProcessor, BulkStore and GetBulkProcessor methods.
type Client struct {
	// Config holds the client's configuration (see ClientConfig).
	Config ClientConfig
	// contains filtered or unexported fields
}

func (*Client) AddBulkProcessor

func (c *Client) AddBulkProcessor(bulkProcessor BulkProcessor) (err error)

func (*Client) BulkStore

func (c *Client) BulkStore(ctx context.Context, indexName string, processorName string, docs []interface{}, template *DynamicTemplate) (err error)

BulkStore bulk-indexes (upserts) documents into Elasticsearch.

func (*Client) GetBulkProcessor

func (c *Client) GetBulkProcessor(name string) (processor *es.BulkProcessor, err error)

func (*Client) Ping

func (c *Client) Ping(ctx context.Context, nodeURL string) (*es.PingResult, int, error)

func (*Client) Remove

func (c *Client) Remove(ctx context.Context, indexName string, id string) (res *es.DeleteResponse, err error)

Remove deletes a document from Elasticsearch.

func (*Client) RemoveIndex

func (c *Client) RemoveIndex(ctx context.Context, indexName ...string) (res *es.IndicesDeleteResponse, err error)

RemoveIndex deletes one or more indices from Elasticsearch.

func (*Client) Search

func (c *Client) Search(ctx context.Context, indexName string, option SearchOption) (result []byte, err error)

func (*Client) Store

func (c *Client) Store(ctx context.Context, name string, doc interface{}, template *DynamicTemplate) (res *es.IndexResponse, err error)

Store indexes (upserts) a document into Elasticsearch.

type ClientConfig

// ClientConfig is the configuration carried in Client.Config.
type ClientConfig struct {
	// BulkProcessors maps a string key to an olivere/elastic bulk processor.
	// NOTE(review): presumably keyed by BulkProcessor.Name — confirm.
	BulkProcessors map[string]*es.BulkProcessor
}

type DynamicTemplate

// DynamicTemplate is the template value accepted by Store and BulkStore.
// Both fields are dropped from the JSON encoding when empty (omitempty).
type DynamicTemplate struct {
	// Settings is encoded under the "settings" key,
	// e.g. {"index.refresh_interval": "5s"}.
	Settings map[string]interface{} `json:"settings,omitempty"`
	// Mappings is encoded under the "mappings" key.
	Mappings *Mappings              `json:"mappings,omitempty"`
}

type ElasticBasicActions

// ElasticBasicActions groups the non-bulk operations of the client.
type ElasticBasicActions interface {
	// Search runs option against indexName and returns the result as []byte
	// (JSON, per the README).
	Search(ctx context.Context, indexName string, option SearchOption) (result []byte, err error)
	// Store indexes (upserts) doc under name using template.
	Store(ctx context.Context, name string, doc interface{}, template *DynamicTemplate) (res *es.IndexResponse, err error)
	// Remove removes the document identified by id from indexName.
	Remove(ctx context.Context, indexName string, id string) (res *es.DeleteResponse, err error)
	// RemoveIndex removes the named indices.
	RemoveIndex(ctx context.Context, indexName ...string) (res *es.IndicesDeleteResponse, err error)
	// Ping pings nodeURL.
	// NOTE(review): the int result is presumably the HTTP status code — confirm.
	Ping(ctx context.Context, nodeURL string) (*es.PingResult, int, error)
}

type ElasticBulkActions

// ElasticBulkActions groups the bulk-indexing operations of the client.
type ElasticBulkActions interface {
	// AddBulkProcessor registers bulkProcessor on the client.
	AddBulkProcessor(bulkProcessor BulkProcessor) (err error)
	// BulkStore bulk-indexes (upserts) docs into indexName through the bulk
	// processor named processorName, using template.
	BulkStore(ctx context.Context, indexName string, processorName string, docs []interface{}, template *DynamicTemplate) (err error)
}

type ElasticClient

// ElasticClient is the full client interface returned by NewClient; it is the
// union of ElasticBasicActions and ElasticBulkActions.
type ElasticClient interface {
	ElasticBasicActions
	ElasticBulkActions
}

func NewClient

func NewClient(urls ...string) (c ElasticClient, err error)

type ElasticRetrier

// ElasticRetrier is created with NewElasticRetrier and exposes a Retry method.
// NOTE(review): the Retry signature looks like olivere/elastic's retrier
// hook — confirm against the implementation.
type ElasticRetrier struct {
	// contains filtered or unexported fields
}

func NewElasticRetrier

func NewElasticRetrier(t time.Duration, f func(err error)) *ElasticRetrier

func (*ElasticRetrier) Retry

func (r *ElasticRetrier) Retry(ctx context.Context, retry int, req *http.Request, resp *http.Response, err error) (time.Duration, bool, error)

type Field

// Field describes one entry of MatchMapping.Fields (a sub-field of a mapping).
type Field struct {
	// Type is encoded as "type", e.g. "keyword".
	Type        string `json:"type"`
	// IgnoreAbove is encoded as "ignore_above" and omitted when zero.
	IgnoreAbove int    `json:"ignore_above,omitempty"`
}

type Mappings

// Mappings is the "mappings" section of a DynamicTemplate. Values are created
// with NewMappings and filled in with AddDynamicTemplate and
// AddPropertyWithType.
type Mappings struct {
	// DynamicTemplates is encoded as "dynamic_templates": a list of
	// single-purpose maps from template name to its match conditions.
	DynamicTemplates []map[string]MatchConditions `json:"dynamic_templates"`
	// Properties is encoded as "properties": property name to Property.
	Properties       map[string]Property          `json:"properties"`
}

func NewMappings

func NewMappings() *Mappings

NewMappings creates a new Mappings value.

func (*Mappings) AddDynamicTemplate

func (es *Mappings) AddDynamicTemplate(name string, matchCondition MatchConditions) *Mappings

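AddDynamicTemplate adds a named dynamic template with the given match conditions and returns *Mappings so calls can be chained.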

func (*Mappings) AddPropertyWithType

func (es *Mappings) AddPropertyWithType(name, typ string) *Mappings

AddPropertyWithType adds a property with the given name and type and returns *Mappings so calls can be chained.

type MatchConditions

// MatchConditions is the body of one dynamic template: the conditions a field
// must satisfy and the mapping applied when it does.
type MatchConditions struct {
	// Match is encoded as "match"; omitted when empty.
	Match            string       `json:"match,omitempty"`
	// UnMatch is encoded as "unmatch"; omitted when empty.
	UnMatch          string       `json:"unmatch,omitempty"`
	// PathMatch is encoded as "path_match"; omitted when empty.
	PathMatch        string       `json:"path_match,omitempty"`
	// MatchMappingType is encoded as "match_mapping_type"; omitted when empty.
	MatchMappingType string       `json:"match_mapping_type,omitempty"`
	// Mapping is encoded as "mapping". Note that encoding/json's omitempty
	// never omits a struct value, so this key is always emitted.
	Mapping          MatchMapping `json:"mapping,omitempty"`
}

MatchConditions describes the match conditions and mapping of a dynamic template. See https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-templates.html

type MatchMapping

// MatchMapping is the mapping applied by a dynamic template
// (MatchConditions.Mapping).
type MatchMapping struct {
	// Type is encoded as "type", e.g. "text" or "double".
	Type            string           `json:"type"`
	// Norms is encoded as "norms". The tag has no omitempty, so the key is
	// always emitted, including when the value is false.
	Norms           bool             `json:"norms"`
	// Fields is encoded as "fields" (sub-field name to Field); omitted when empty.
	Fields          map[string]Field `json:"fields,omitempty"`
	// IncludeInParent is encoded as "include_in_parent"; omitted when false.
	IncludeInParent bool             `json:"include_in_parent,omitempty"`
}

type Property

// Property is one entry of Mappings.Properties.
type Property struct {
	// Type is encoded as "type".
	Type string `json:"type"`
}

type SearchOption

// SearchOption carries the parameters of a Search call.
type SearchOption struct {
	// Query is the olivere/elastic query to run, e.g. es.NewWildcardQuery.
	Query es.Query
	// Sort may be nil.
	// NOTE(review): presumably field name -> ascending flag — confirm.
	Sort  map[string]bool
	// NOTE(review): From is presumably the result offset — confirm.
	From  int
	// Size is the number of results; per the README it defaults to 10 when
	// not set.
	Size  int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL