elastic

package
v0.0.0-...-a101f2f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2024 License: AGPL-3.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const ClusterLevel = "cluster"
View Source
const IndexLevel = "index"
View Source
const NodeLevel = "node"
View Source
const ShardLevel = "shard"

Variables

View Source
var JSON_CONTENT_TYPE = "application/json"

Functions

func New

func New(c *config.Config) (pipeline.Filter, error)

func NewBulkRequestResort

func NewBulkRequestResort(c *config.Config) (pipeline.Filter, error)

func NewBulkReshuffle

func NewBulkReshuffle(c *config.Config) (pipeline.Filter, error)

func NewBulkResponseValidate

func NewBulkResponseValidate(c *config.Config) (pipeline.Filter, error)

func NewElasticsearchBulkRequestMutateFilter

func NewElasticsearchBulkRequestMutateFilter(c *config.Config) (pipeline.Filter, error)

func NewElasticsearchRequestReshuffleFilter

func NewElasticsearchRequestReshuffleFilter(c *config.Config) (pipeline.Filter, error)

func NewHashModFilter

func NewHashModFilter(c *config.Config) (pipeline.Filter, error)

func ParseURLMeta

func ParseURLMeta(pathStr string) (valid bool, urlLevelIndex, urlLevelType, id string)

func SortDocumentsByTime

func SortDocumentsByTime(docs []elastic.VersionInfo)

func SortDocumentsByVersion

func SortDocumentsByVersion(docs []elastic.VersionInfo)

SortDocumentsByVersion 按照文档版本进行排序

Types

type AutoGenerateDocID

type AutoGenerateDocID struct {
	Prefix string `config:"prefix" `
}

func (*AutoGenerateDocID) Filter

func (filter *AutoGenerateDocID) Filter(ctx *fasthttp.RequestCtx)

func (*AutoGenerateDocID) Name

func (filter *AutoGenerateDocID) Name() string

type BulkRequestResort

type BulkRequestResort struct {
	BatchSizeInDocs int `config:"batch_size_in_docs"` //batch size for each bulk request
	BatchSizeInMB   int `config:"batch_size_in_mb"`

	MinBufferSize           int    `config:"min_buffer_size"`
	MaxBufferSize           int    `config:"max_buffer_size"`
	MinDocPaddingForOneDocs int    `config:"min_doc_versions_for_one_doc"`
	IdleTimeoutInSeconds    string `config:"idle_timeout_in_seconds"`

	TagOnComplete string `config:"tag_on_complete"` //add tag to parent context when all documents are processed

	Elasticsearch string       `config:"elasticsearch"`
	PartitionSize int          `config:"partition_size"`
	OutputQueue   Queue        `config:"output_queue"`
	CommitConfig  CommitConfig `config:"commit_offset"`
	// contains filtered or unexported fields
}

func (*BulkRequestResort) Filter

func (filter *BulkRequestResort) Filter(ctx *fasthttp.RequestCtx)

func (*BulkRequestResort) Name

func (filter *BulkRequestResort) Name() string

func (*BulkRequestResort) NewDocumentBuffer

func (filter *BulkRequestResort) NewDocumentBuffer(partitionID int, queueName string, minBufferSize, maxBufferSize int, idleTimeout time.Duration, minDocPaddingSize int) *DocumentBuffer

NewDocumentBuffer 创建一个新的文档缓冲区

type BulkReshuffle

type BulkReshuffle struct {
	// contains filtered or unexported fields
}

func (*BulkReshuffle) Filter

func (this *BulkReshuffle) Filter(ctx *fasthttp.RequestCtx)

func (*BulkReshuffle) Name

func (this *BulkReshuffle) Name() string

type BulkReshuffleConfig

type BulkReshuffleConfig struct {
	TagsOnSuccess []string `config:"tag_on_success"`

	Elasticsearch          string `config:"elasticsearch"`
	QueuePrefix            string `config:"queue_name_prefix"`
	Level                  string `config:"level"` //cluster/node(will,change)/index/shard/partition
	PartitionSize          int    `config:"partition_size"`
	FixNullID              bool   `config:"fix_null_id"`
	ContinueAfterReshuffle bool   `config:"continue_after_reshuffle"`
	IndexStatsAnalysis     bool   `config:"index_stats_analysis"`
	ActionStatsAnalysis    bool   `config:"action_stats_analysis"`

	ContinueMetadataNotFound bool `config:"continue_metadata_missing"`

	ValidateRequest bool `config:"validate_request"`

	//split all lines into memory rather than scan
	ValidEachLine bool  `config:"validate_each_line"`
	ValidMetadata bool  `config:"validate_metadata"`
	ValidPayload  bool  `config:"validate_payload"`
	StickToNode   bool  `config:"stick_to_node"`
	EnabledShards []int `config:"shards"`

	BufferPoolEnabled bool   `config:"bytes_buffer_enabled"`
	MaxBufferCount    uint32 `config:"max_buffer_items"`
	MaxBufferSize     uint32 `config:"max_buffer_size"`
}

type BulkResponseProcess

type BulkResponseProcess struct {
	// contains filtered or unexported fields
}

func (*BulkResponseProcess) Filter

func (this *BulkResponseProcess) Filter(ctx *fasthttp.RequestCtx)

func (*BulkResponseProcess) Name

func (this *BulkResponseProcess) Name() string

type CommitConfig

type CommitConfig struct {
	QueueName      string `config:"queue_name" json:"queue_name,omitempty"`
	Group          string `config:"group" json:"group,omitempty"`
	Name           string `config:"name" json:"name,omitempty"`
	CommitInterval string `config:"interval"`
}

type Config

type Config struct {
	StatsOnly bool `config:"stats_only"`

	SuccessQueue string `config:"success_queue"`
	InvalidQueue string `config:"invalid_queue"`
	FailureQueue string `config:"failure_queue"`

	SuccessFlow string `config:"success_flow"`
	InvalidFlow string `config:"invalid_flow"`
	FailureFlow string `config:"failure_flow"`

	MessageTruncateSize int `config:"message_truncate_size"`

	ShowBulkErrorMessage bool `config:"show_bulk_error_message"`

	PartialFailureRetry                 bool `config:"partial_failure_retry"`               //是否主动重试,只有部分失败的请求,避免大量没有意义的 409
	PartialFailureMaxRetryTimes         int  `config:"partial_failure_max_retry_times"`     //是否主动重试,只有部分失败的请求,避免大量没有意义的 409
	PartialFailureRetryDelayLatencyInMs int  `config:"partial_failure_retry_latency_in_ms"` //是否主动重试,只有部分失败的请求,避免大量没有意义的 409

	ContinueOnAllError bool `config:"continue_on_all_error"` //没有拿到响应,整个请求都失败是否继续处理后续 flow
	ContinueOnAnyError bool `config:"continue_on_any_error"` //拿到响应,出现任意请求失败是否都继续 flow 还是结束处理
	ContinueOnSuccess  bool `config:"continue_on_success"`   //所有请求都成功

	TagsOnAllSuccess []string `config:"tag_on_all_success"` //所有请求都成功,没有失败
	TagsOnNone2xx    []string `config:"tag_on_none_2xx"`    //整个 bulk 请求非 200 或者 201 返回

	//bulk requests
	TagsOnAllInvalid []string `config:"tag_on_all_invalid"` //所有请求都是非法请求的情况
	TagsOnAllFailure []string `config:"tag_on_all_failure"` //所有失败的请求都是失败请求的情况

	TagsOnAnyError       []string `config:"tag_on_any_error"`       //请求里面包含任意失败或者非法请求的情况
	TagsOnPartialSuccess []string `config:"tag_on_partial_success"` //包含部分成功的情况
	TagsOnPartialFailure []string `config:"tag_on_partial_failure"` //包含部分失败的情况,可以重试
	TagsOnPartialInvalid []string `config:"tag_on_partial_invalid"` //包含部分非法请求的情况,无需重试的请求

	RetryFlow  string             `config:"retry_flow"`
	RetryRules elastic.RetryRules `config:"retry_rules"`

	BulkResponseParseConfig elastic.BulkResponseParseConfig `config:"response_handle"`
}

type DocumentBuffer

type DocumentBuffer struct {
	LastOffset queue.Offset
	// contains filtered or unexported fields
}

DocumentBuffer 表示文档的缓冲区

func (*DocumentBuffer) Add

func (b *DocumentBuffer) Add(docs []elastic.VersionInfo)

Add 将文档添加到缓冲区

func (*DocumentBuffer) GetDocuments

func (b *DocumentBuffer) GetDocuments(count int) (int, []elastic.VersionInfo)

GetDocuments 返回最旧的文档通道,最多读取指定数量的文档

type ElasticsearchBulkRequestMutate

type ElasticsearchBulkRequestMutate struct {
	DefaultIndex     string            `config:"default_index"`
	DefaultType      string            `config:"default_type"`
	FixNilType       bool              `config:"fix_null_type"`
	FixNilID         bool              `config:"fix_null_id"`
	Pipeline         string            `config:"pipeline"`
	RemoveTypeMeta   bool              `config:"remove_type"`
	RemovePipeline   bool              `config:"remove_pipeline"`
	AddTimestampToID bool              `config:"generate_enhanced_id"`
	IndexNameRename  map[string]string `config:"index_rename"`
	TypeNameRename   map[string]string `config:"type_rename"`
}

func (*ElasticsearchBulkRequestMutate) Filter

func (*ElasticsearchBulkRequestMutate) Name

func (filter *ElasticsearchBulkRequestMutate) Name() string

type ElasticsearchRequestReshuffle

type ElasticsearchRequestReshuffle struct {
	Elasticsearch          string   `config:"elasticsearch"`
	TagsOnSuccess          []string `config:"tag_on_success"`
	SkipBulk               bool     `config:"skip_bulk"`
	PartitionSize          int      `config:"partition_size"`
	QueuePrefix            string   `config:"queue_name_prefix"`
	HashFactor             string   `config:"hash_factor"`
	ContinueAfterReshuffle bool     `config:"continue_after_reshuffle"`
	// contains filtered or unexported fields
}

func (*ElasticsearchRequestReshuffle) Filter

func (*ElasticsearchRequestReshuffle) Name

func (filter *ElasticsearchRequestReshuffle) Name() string

type HashModFilter

type HashModFilter struct {
	Source           string `config:"source" `
	TargetContextKey string `config:"target_context_name" `
	PartitionSize    int    `config:"mod"`

	AddToRequestHeader  bool `config:"add_to_request_header" type:"bool" default_value:"true"`
	AddToResponseHeader bool `config:"add_to_response_header" type:"bool" default_value:"true"`
	// contains filtered or unexported fields
}

func (*HashModFilter) Filter

func (filter *HashModFilter) Filter(ctx *fasthttp.RequestCtx)

func (*HashModFilter) Name

func (filter *HashModFilter) Name() string

type Level

type Level string

type Queue

type Queue struct {
	Type      string                 `config:"type"`
	QueueName string                 `config:"queue_name"`
	Labels    map[string]interface{} `config:"labels,omitempty"`
}

type Sorter

type Sorter struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL