elasticsearch

package
v0.0.0-...-a6858fa Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 8, 2024 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	WRITE_COLUMNS = "write_columns"
)

Variables

This section is empty.

Functions

func ES_Version

func ES_Version(client *elastic.Client, conf *config.JSON) string

func ES_close

func ES_close()

func ES_init

func ES_init(conf *config.JSON) *elastic.Client

func ExecuteWithRetry

func ExecuteWithRetry(operation func(*elastic.BulkService, context.Context) (bool, error), bulkRequest *elastic.BulkService, ctx context.Context, maxRetries int, retryInterval time.Duration) (success bool, err error)

重试函数,接受一个操作函数、最大重试次数以及重试间隔

func GenBody

func GenBody(settings, mappings string, dynamic bool) string

func GenMappings

func GenMappings(dstDynamic, typeName string, isGreaterOrEqualThan7 bool, conf *config.JSON) string

func GetAlias

func GetAlias(conf *config.JSON) string

func GetBatchSize

func GetBatchSize(conf *config.JSON) int64

func GetDeleteBy

func GetDeleteBy(conf *config.JSON) string

func GetDiscoveryFilter

func GetDiscoveryFilter(conf *config.JSON) string

func GetDstDynamic

func GetDstDynamic(conf *config.JSON) string

func GetDynamic

func GetDynamic(conf *config.JSON) bool

func GetESVersion

func GetESVersion(conf *config.JSON) int64

func GetEndpoint

func GetEndpoint(conf *config.JSON) string

func GetFieldDelimiter

func GetFieldDelimiter(conf *config.JSON) string

func GetIncludeSettings

func GetIncludeSettings(conf *config.JSON) []string

func GetIndexName

func GetIndexName(conf *config.JSON) string

func GetMasterTimeout

func GetMasterTimeout(conf *config.JSON) string

func GetPassword

func GetPassword(conf *config.JSON) string

func GetRetryTimes

func GetRetryTimes(conf *config.JSON) int64

func GetSettings

func GetSettings(conf *config.JSON) map[string]*encoding.JSON

func GetSleepTimeInMilliSecond

func GetSleepTimeInMilliSecond(conf *config.JSON) int64

func GetSplitter

func GetSplitter(conf *config.JSON) string

func GetTimeout

func GetTimeout(conf *config.JSON) int64

func GetTryInterval

func GetTryInterval(conf *config.JSON) int64

func GetTrySize

func GetTrySize(conf *config.JSON) int64

func GetTypeName

func GetTypeName(conf *config.JSON) string

func GetUnifiedVersion

func GetUnifiedVersion(conf *config.JSON) int64

func GetUrlParams

func GetUrlParams(conf *config.JSON) map[string]interface{}

func GetUsername

func GetUsername(conf *config.JSON) string

func GetVersioning

func GetVersioning(conf *config.JSON) bool

func HasID

func HasID(conf *config.JSON) bool

func HasPrimaryKeyInfo

func HasPrimaryKeyInfo(conf *config.JSON) bool

func IsCompression

func IsCompression(conf *config.JSON) bool

func IsDiscovery

func IsDiscovery(conf *config.JSON) bool

func IsEnableNullUpdate

func IsEnableNullUpdate(conf *config.JSON) bool

func IsGreaterOrEqualThan7

func IsGreaterOrEqualThan7(conf *config.JSON, client *elastic.Client) bool

func IsHasId

func IsHasId(conf *config.JSON) bool

func IsHighSpeedMode

func IsHighSpeedMode(conf *config.JSON) bool

func IsIgnoreParseError

func IsIgnoreParseError(conf *config.JSON) bool

func IsIgnoreWriteError

func IsIgnoreWriteError(conf *config.JSON) bool

func IsMultiThread

func IsMultiThread(conf *config.JSON) bool

func IsNeedCleanAlias

func IsNeedCleanAlias(conf *config.JSON) bool

func IsTruncate

func IsTruncate(conf *config.JSON) bool

func NewWriterFromString

func NewWriterFromString(plugin string) (wr writer.Writer, err error)

NewWriterFromString creates a writer.

func ParseDeleteCondition

func ParseDeleteCondition(conf *config.JSON) []map[string]interface{}

func RegistPlugin

func RegistPlugin()

Types

type ActionType

type ActionType int

定义ActionType类型

const (
	UNKNOW ActionType = iota
	INDEX
	CREATE
	DELETE
	UPDATE
)

定义ActionType常量

func GetActionType

func GetActionType(conf *config.JSON) ActionType

func (ActionType) String

func (at ActionType) String() string

String方法返回ActionType的字符串表示

type Body

type Body struct {
	Settings interface{} `json:"settings"`
	Mappings interface{} `json:"mappings"`
}

type ElasticSearchFieldType

type ElasticSearchFieldType int

ElasticSearchFieldType 枚举类型

const (
	ID ElasticSearchFieldType = iota
	PARENT
	ROUTING
	VERSION
	STRING
	TEXT
	KEYWORD
	LONG
	INTEGER
	SHORT
	BYTE
	DOUBLE
	FLOAT
	DATE
	BOOLEAN
	BINARY
	INTEGER_RANGE
	FLOAT_RANGE
	LONG_RANGE
	DOUBLE_RANGE
	DATE_RANGE
	GEO_POINT
	GEO_SHAPE
	IP
	IP_RANGE
	COMPLETION
	TOKEN_COUNT
	OBJECT
	NESTED
)

func GetESFieldType

func GetESFieldType(typeStr string) ElasticSearchFieldType

GetESFieldType 根据传入的字符串类型返回对应的ElasticSearchFieldType

func (ElasticSearchFieldType) String

func (f ElasticSearchFieldType) String() string

String 方法用于返回ElasticSearchFieldType的字符串表示

type EsColumn

type EsColumn struct {
	Name                        string
	Type                        string
	Timezone                    string
	Format                      string
	DstFormat                   string
	Array                       bool
	DstArray                    bool
	JsonArray                   bool
	Origin                      bool
	CombineFields               []string
	CombineFieldsValueSeparator string
}

func DefaultColumn

func DefaultColumn() *EsColumn

func GetColumnList

func GetColumnList(conf *config.JSON) []EsColumn

func GetWriteColumns

func GetWriteColumns(conf *config.JSON) []EsColumn

func GetcombinedIdColumn

func GetcombinedIdColumn(columnList []EsColumn, typeList *[]string) *EsColumn

type Job

type Job struct {
	*plugin.BaseJob
	RetryTimes             int64
	SleepTimeInMilliSecond int64
	// contains filtered or unexported fields
}

Job is the job of the elasticsearch writer.

func (*Job) Destroy

func (j *Job) Destroy(ctx context.Context) (err error)

func (*Job) Init

func (j *Job) Init(ctx context.Context) (err error)

func (*Job) Post

func (j *Job) Post(ctx context.Context) (err error)

func (*Job) Prepare

func (j *Job) Prepare(ctx context.Context) (err error)

func (*Job) Split

func (j *Job) Split(ctx context.Context, number int) (confs []*config.JSON, err error)

type Maker

type Maker struct{}

func (*Maker) Default

func (m *Maker) Default() (writer.Writer, error)

type PartitionColumn

type PartitionColumn struct {
	Name     string
	MetaType string
	Comment  string
	Type     string
}

func GetEsPartitionColumn

func GetEsPartitionColumn(conf *config.JSON) []PartitionColumn

type PrimaryKeyInfo

type PrimaryKeyInfo struct {
	Type           string
	FieldDelimiter string
	Column         []string
}

func GetPrimaryKeyInfo

func GetPrimaryKeyInfo(conf *config.JSON) *PrimaryKeyInfo

type Task

type Task struct {
	*writer.BaseTask

	IndexName             string
	TypeName              string
	BatchSize             int64
	EnableRedundantColumn bool
	ColumnList            []EsColumn
	CombinedIdColumn      *EsColumn
	Splitter              string
	TypeList              []string
	PrimaryKeyInfo        *PrimaryKeyInfo
	ColNameToIndexMap     map[string]int
	EsPartitionColumn     []PartitionColumn
	DeleteByConditions    []map[string]interface{}
	Client                *elastic.Client
	ActionType            string
	EnableWriteNull       bool
	IsGreaterOrEqualThan7 bool

	RetryTimes             int64
	SleepTimeInMilliSecond int64
	UrlParams              map[string]interface{}
	FieldDelimiter         string
	// contains filtered or unexported fields
}

Task is the task of the elasticsearch writer.

func (*Task) Destroy

func (w *Task) Destroy(ctx context.Context) error

func (*Task) DoBatchInsert

func (t *Task) DoBatchInsert(writerBuffer []element.Record) error

func (*Task) Init

func (t *Task) Init(ctx context.Context) (err error)

func (*Task) IsDeleteRecord

func (t *Task) IsDeleteRecord(record element.Record) bool

func (*Task) StartWrite

func (t *Task) StartWrite(ctx context.Context, receiver plugin.RecordReceiver) (err error)

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer is the elasticsearch writer; it provides the writer's Job and Task.

func (*Writer) Job

func (w *Writer) Job() spiwriter.Job

Job returns the writer's Job.

func (*Writer) ResourcesConfig

func (w *Writer) ResourcesConfig() *config.JSON

ResourcesConfig returns the plugin resource configuration.

func (*Writer) Task

func (w *Writer) Task() spiwriter.Task

Task returns the writer's Task.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL