es

package
v0.0.0-...-6f9dc6e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2023 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultClient       = "es-default-client"
	DefaultReadClient   = "es-default-read-client"
	DefaultWriteClient  = "es-default-write-client"
	DefaultVersionType  = "external"
	VersionTypeInternal = "internal"
	DefaultRefresh      = "false"
	RefreshWaitFor      = "wait_for"
	RefreshTrue         = "true"
	DefaultScriptLang   = "painless"
)
View Source
const DefaultPreference = "_local"
View Source
const (
	SimpleClient = "simple-es-client"
)

Variables

View Source
var EStdLogger stdLogger

Functions

func CloseAll

func CloseAll()

func InitClient

func InitClient(clientName string, urls []string, username string, password string) error

func InitClientWithOptions

func InitClientWithOptions(clientName string, urls []string, username string, password string, options ...Option) error

func InitSimpleClient

func InitSimpleClient(urls []string, username, password string) error

Types

type Bulk

type Bulk struct {
	Name          string
	Workers       int
	FlushInterval time.Duration
	ActionSize    int //每批提交的文档数
	RequestSize   int //每批提交的文档大小
	AfterFunc     elastic.BulkAfterFunc
	Ctx           context.Context
}

func DefaultBulk

func DefaultBulk() *Bulk

type BulkCreateDoc

type BulkCreateDoc struct {
	BulkDoc
	Doc interface{}
}

type BulkDoc

type BulkDoc struct {
	ID          string
	Routing     string
	Version     int64
	VersionType string
}

type BulkUpdateDoc

type BulkUpdateDoc struct {
	BulkDoc
	// contains filtered or unexported fields
}

type BulkUpsertDoc

type BulkUpsertDoc struct {
	BulkCreateDoc
	// contains filtered or unexported fields
}

type Client

type Client struct {
	Name           string
	Urls           []string
	QueryLogEnable bool
	Username       string

	Bulk          *Bulk
	Client        *elastic.Client
	BulkProcessor *elastic.BulkProcessor
	DebugMode     bool
	//本地缓存已经创建的索引,用于加速索引是否存在的判断
	CachedIndices sync.Map
	// contains filtered or unexported fields
}

func GetClient

func GetClient(name string) *Client

func GetSimpleClient

func GetSimpleClient() *Client

func (*Client) AddIndexCache

func (c *Client) AddIndexCache(indexName ...string)

func (*Client) BulkCreate

func (c *Client) BulkCreate(indexName, id, routing string, doc interface{})

批量的方式新建文档,后台提交

func (*Client) BulkCreateDocs

func (c *Client) BulkCreateDocs(ctx context.Context, indexName string, docs []*BulkCreateDoc) (*elastic.BulkResponse, error)

批量的方式新建一批文档,实时提交

func (*Client) BulkCreateWithVersion

func (c *Client) BulkCreateWithVersion(ctx context.Context, indexName, id, routing string, version int64, doc interface{})

业务数据修改时生成版本号,在更新ES文档数据时,根据这个版本号来避免重复和乱序处理,实现操作的幂等性 PUT twitter/_doc/1?version=2&version_type=external { "message" : "elasticsearch now has versioning support, double cool!" } 1.版本管理是完全实时的,不受搜索操作的近实时方面的影响 2.如果当前提供的版本号比实际的版本号大(或者实际文档不存在)就会成功写入,并设置为当前的版本号,否则返回失败(409状态码)

func (*Client) BulkDelete

func (c *Client) BulkDelete(indexName, id, routing string)

func (*Client) BulkDeleteWithVersion

func (c *Client) BulkDeleteWithVersion(indexName, id, routing string, version int64)

func (*Client) BulkReplace

func (c *Client) BulkReplace(indexName, id, routing string, doc interface{})

func (*Client) BulkReplaceDocs

func (c *Client) BulkReplaceDocs(ctx context.Context, indexName string, docs []*BulkCreateDoc) (*elastic.BulkResponse, error)

func (*Client) BulkUpdate

func (c *Client) BulkUpdate(indexName, id, routing string, update map[string]interface{})

func (*Client) BulkUpdateDocs

func (c *Client) BulkUpdateDocs(ctx context.Context, index string, updates []*BulkUpdateDoc) (*elastic.BulkResponse, error)

func (*Client) BulkUpsert

func (c *Client) BulkUpsert(indexName, id, routing string, update map[string]interface{}, doc interface{})

异步upsert

func (*Client) BulkUpsertDocs

func (c *Client) BulkUpsertDocs(ctx context.Context, index string, docs []*BulkUpsertDoc) (*elastic.BulkResponse, error)

UpsertBulk 批量upsert

func (*Client) Close

func (c *Client) Close() error

func (*Client) Create

func (c *Client) Create(ctx context.Context, indexName, id, routing string, doc interface{}) error

新建文档

func (*Client) CreateIndex

func (c *Client) CreateIndex(ctx context.Context, indexName, bodyJson string, forceCheck bool) error

创建索引

func (*Client) Delete

func (c *Client) Delete(ctx context.Context, indexName, id, routing string) error

func (*Client) DeleteByQuery

func (c *Client) DeleteByQuery(ctx context.Context, indexName, id, routing string, query elastic.Query) error

删除过程中如果命中的文档被修改会导致删除出现版本冲突,为了彻底删除文档,需要指定ProceedOnVersionConflict

func (*Client) DeleteIndexCache

func (c *Client) DeleteIndexCache(indexName ...string)

func (*Client) DeleteRefresh

func (c *Client) DeleteRefresh(ctx context.Context, indexName, id, routing string) error

func (*Client) DeleteWithVersion

func (c *Client) DeleteWithVersion(ctx context.Context, indexName, id, routing string, version int64) error

func (*Client) Get

func (c *Client) Get(ctx context.Context, indexName, id, routing string) (*elastic.GetResult, error)

func (*Client) IndexExists

func (c *Client) IndexExists(ctx context.Context, indexName string, forceCheck bool) (bool, error)

检查索引是否存在

func (*Client) Mget

func (c *Client) Mget(ctx context.Context, mgetItems []Mget) (*elastic.MgetResponse, error)

func (*Client) Query

func (c *Client) Query(ctx context.Context, indexName string, routings []string, query elastic.Query, from, size int, options ...QueryOption) (*elastic.SearchResult, error)

func (*Client) ScrollQuery

func (c *Client) ScrollQuery(ctx context.Context, index []string, typeStr string, query elastic.Query, size int, routings []string, callback func(res *elastic.SearchResult, err error), options ...QueryOption)

func (*Client) Update

func (c *Client) Update(ctx context.Context, indexName, id, routing string, update map[string]interface{}) error

func (*Client) UpdateByQuery

func (c *Client) UpdateByQuery(ctx context.Context, indexName string, routings []string, query elastic.Query, script string, scriptParams map[string]interface{}) (*elastic.BulkIndexByScrollResponse, error)

func (*Client) UpdateRefresh

func (c *Client) UpdateRefresh(ctx context.Context, indexName, id, routing string, update map[string]interface{}) error

func (*Client) Upsert

func (c *Client) Upsert(ctx context.Context, indexName, id, routing string, update map[string]interface{}, doc interface{}) error

Upsert 不存在则插入

func (*Client) UpsertWithVersion

func (c *Client) UpsertWithVersion(ctx context.Context, indexName, id, routing string, doc interface{}, version int64) error

文档结构必须完整

type Mget

type Mget struct {
	Index   string
	ID      string
	Routing string
}

type Option

type Option func(*option)

func WithBulk

func WithBulk(bulk *Bulk) Option

func WithQueryLogEnable

func WithQueryLogEnable(enable bool) Option

func WithScheme

func WithScheme(scheme string) Option

func WithSlowQueryLogMillisecond

func WithSlowQueryLogMillisecond(slowQueryMillisecond int64) Option

type QueryOption

type QueryOption func(queryOption *queryOption)

func WithEnableDSL

func WithEnableDSL(enableDSL bool) QueryOption

func WithExcludeFields

func WithExcludeFields(excludeFields []string) QueryOption

func WithFetchSource

func WithFetchSource(fetchSource bool) QueryOption

func WithHighlight

func WithHighlight(highlight *elastic.Highlight) QueryOption

func WithIncludeFields

func WithIncludeFields(includeFields []string) QueryOption

func WithOrders

func WithOrders(orders []map[string]bool) QueryOption

func WithPreference

func WithPreference(preference string) QueryOption

func WithProfile

func WithProfile(profile bool) QueryOption

func WithSlowQueryMillisecond

func WithSlowQueryMillisecond(slowQueryLogMillisecond int64) QueryOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL