Documentation ¶
Index ¶
- Constants
- Variables
- func New(c *config.Config) (pipeline.Filter, error)
- func NewBulkRequestResort(c *config.Config) (pipeline.Filter, error)
- func NewBulkReshuffle(c *config.Config) (pipeline.Filter, error)
- func NewBulkResponseValidate(c *config.Config) (pipeline.Filter, error)
- func NewElasticsearchBulkRequestMutateFilter(c *config.Config) (pipeline.Filter, error)
- func NewElasticsearchRequestReshuffleFilter(c *config.Config) (pipeline.Filter, error)
- func NewHashModFilter(c *config.Config) (pipeline.Filter, error)
- func ParseURLMeta(pathStr string) (valid bool, urlLevelIndex, urlLevelType, id string)
- func SortDocumentsByTime(docs []elastic.VersionInfo)
- func SortDocumentsByVersion(docs []elastic.VersionInfo)
- type AutoGenerateDocID
- type BulkRequestResort
- type BulkReshuffle
- type BulkReshuffleConfig
- type BulkResponseProcess
- type CommitConfig
- type Config
- type DocumentBuffer
- type ElasticsearchBulkRequestMutate
- type ElasticsearchRequestReshuffle
- type HashModFilter
- type Level
- type Queue
- type Sorter
Constants ¶
View Source
const ClusterLevel = "cluster"
View Source
const IndexLevel = "index"
View Source
const NodeLevel = "node"
View Source
const ShardLevel = "shard"
Variables ¶
View Source
var JSON_CONTENT_TYPE = "application/json"
Functions ¶
func NewBulkResponseValidate ¶
func ParseURLMeta ¶
func SortDocumentsByTime ¶
func SortDocumentsByTime(docs []elastic.VersionInfo)
func SortDocumentsByVersion ¶
func SortDocumentsByVersion(docs []elastic.VersionInfo)
SortDocumentsByVersion 按照文档版本进行排序
Types ¶
type AutoGenerateDocID ¶
type AutoGenerateDocID struct {
Prefix string `config:"prefix" `
}
func (*AutoGenerateDocID) Filter ¶
func (filter *AutoGenerateDocID) Filter(ctx *fasthttp.RequestCtx)
func (*AutoGenerateDocID) Name ¶
func (filter *AutoGenerateDocID) Name() string
type BulkRequestResort ¶
type BulkRequestResort struct { BatchSizeInDocs int `config:"batch_size_in_docs"` //batch size for each bulk request BatchSizeInMB int `config:"batch_size_in_mb"` MinBufferSize int `config:"min_buffer_size"` MaxBufferSize int `config:"max_buffer_size"` MinDocPaddingForOneDocs int `config:"min_doc_versions_for_one_doc"` IdleTimeoutInSeconds string `config:"idle_timeout_in_seconds"` TagOnComplete string `config:"tag_on_complete"` //add tag to parent context when all documents are processed Elasticsearch string `config:"elasticsearch"` PartitionSize int `config:"partition_size"` OutputQueue Queue `config:"output_queue"` CommitConfig CommitConfig `config:"commit_offset"` // contains filtered or unexported fields }
func (*BulkRequestResort) Filter ¶
func (filter *BulkRequestResort) Filter(ctx *fasthttp.RequestCtx)
func (*BulkRequestResort) Name ¶
func (filter *BulkRequestResort) Name() string
func (*BulkRequestResort) NewDocumentBuffer ¶
func (filter *BulkRequestResort) NewDocumentBuffer(partitionID int, queueName string, minBufferSize, maxBufferSize int, idleTimeout time.Duration, minDocPaddingSize int) *DocumentBuffer
NewDocumentBuffer 创建一个新的文档缓冲区
type BulkReshuffle ¶
type BulkReshuffle struct {
// contains filtered or unexported fields
}
func (*BulkReshuffle) Filter ¶
func (this *BulkReshuffle) Filter(ctx *fasthttp.RequestCtx)
func (*BulkReshuffle) Name ¶
func (this *BulkReshuffle) Name() string
type BulkReshuffleConfig ¶
type BulkReshuffleConfig struct { TagsOnSuccess []string `config:"tag_on_success"` Elasticsearch string `config:"elasticsearch"` QueuePrefix string `config:"queue_name_prefix"` Level string `config:"level"` //cluster/node(will,change)/index/shard/partition PartitionSize int `config:"partition_size"` FixNullID bool `config:"fix_null_id"` ContinueAfterReshuffle bool `config:"continue_after_reshuffle"` IndexStatsAnalysis bool `config:"index_stats_analysis"` ActionStatsAnalysis bool `config:"action_stats_analysis"` ContinueMetadataNotFound bool `config:"continue_metadata_missing"` ValidateRequest bool `config:"validate_request"` //split all lines into memory rather than scan ValidEachLine bool `config:"validate_each_line"` ValidMetadata bool `config:"validate_metadata"` ValidPayload bool `config:"validate_payload"` StickToNode bool `config:"stick_to_node"` EnabledShards []int `config:"shards"` BufferPoolEnabled bool `config:"bytes_buffer_enabled"` MaxBufferCount uint32 `config:"max_buffer_items"` MaxBufferSize uint32 `config:"max_buffer_size"` }
type BulkResponseProcess ¶
type BulkResponseProcess struct {
// contains filtered or unexported fields
}
func (*BulkResponseProcess) Filter ¶
func (this *BulkResponseProcess) Filter(ctx *fasthttp.RequestCtx)
func (*BulkResponseProcess) Name ¶
func (this *BulkResponseProcess) Name() string
type CommitConfig ¶
type Config ¶
type Config struct { StatsOnly bool `config:"stats_only"` SuccessQueue string `config:"success_queue"` InvalidQueue string `config:"invalid_queue"` FailureQueue string `config:"failure_queue"` SuccessFlow string `config:"success_flow"` InvalidFlow string `config:"invalid_flow"` FailureFlow string `config:"failure_flow"` MessageTruncateSize int `config:"message_truncate_size"` ShowBulkErrorMessage bool `config:"show_bulk_error_message"` PartialFailureRetry bool `config:"partial_failure_retry"` //是否主动重试,只有部分失败的请求,避免大量没有意义的 409 PartialFailureMaxRetryTimes int `config:"partial_failure_max_retry_times"` //是否主动重试,只有部分失败的请求,避免大量没有意义的 409 PartialFailureRetryDelayLatencyInMs int `config:"partial_failure_retry_latency_in_ms"` //是否主动重试,只有部分失败的请求,避免大量没有意义的 409 ContinueOnAllError bool `config:"continue_on_all_error"` //没有拿到响应,整个请求都失败是否继续处理后续 flow ContinueOnAnyError bool `config:"continue_on_any_error"` //拿到响应,出现任意请求失败是否都继续 flow 还是结束处理 ContinueOnSuccess bool `config:"continue_on_success"` //所有请求都成功 TagsOnAllSuccess []string `config:"tag_on_all_success"` //所有请求都成功,没有失败 TagsOnNone2xx []string `config:"tag_on_none_2xx"` //整个 bulk 请求非 200 或者 201 返回 //bulk requests TagsOnAllInvalid []string `config:"tag_on_all_invalid"` //所有请求都是非法请求的情况 TagsOnAllFailure []string `config:"tag_on_all_failure"` //所有失败的请求都是失败请求的情况 TagsOnAnyError []string `config:"tag_on_any_error"` //请求里面包含任意失败或者非法请求的情况 TagsOnPartialSuccess []string `config:"tag_on_partial_success"` //包含部分成功的情况 TagsOnPartialFailure []string `config:"tag_on_partial_failure"` //包含部分失败的情况,可以重试 TagsOnPartialInvalid []string `config:"tag_on_partial_invalid"` //包含部分非法请求的情况,无需重试的请求 RetryFlow string `config:"retry_flow"` RetryRules elastic.RetryRules `config:"retry_rules"` BulkResponseParseConfig elastic.BulkResponseParseConfig `config:"response_handle"` }
type DocumentBuffer ¶
DocumentBuffer 表示文档的缓冲区
func (*DocumentBuffer) GetDocuments ¶
func (b *DocumentBuffer) GetDocuments(count int) (int, []elastic.VersionInfo)
GetDocuments 返回最旧的文档通道,最多读取指定数量的文档
type ElasticsearchBulkRequestMutate ¶
type ElasticsearchBulkRequestMutate struct { DefaultIndex string `config:"default_index"` DefaultType string `config:"default_type"` FixNilType bool `config:"fix_null_type"` FixNilID bool `config:"fix_null_id"` Pipeline string `config:"pipeline"` RemoveTypeMeta bool `config:"remove_type"` RemovePipeline bool `config:"remove_pipeline"` AddTimestampToID bool `config:"generate_enhanced_id"` IndexNameRename map[string]string `config:"index_rename"` TypeNameRename map[string]string `config:"type_rename"` }
func (*ElasticsearchBulkRequestMutate) Filter ¶
func (this *ElasticsearchBulkRequestMutate) Filter(ctx *fasthttp.RequestCtx)
func (*ElasticsearchBulkRequestMutate) Name ¶
func (filter *ElasticsearchBulkRequestMutate) Name() string
type ElasticsearchRequestReshuffle ¶
type ElasticsearchRequestReshuffle struct { Elasticsearch string `config:"elasticsearch"` TagsOnSuccess []string `config:"tag_on_success"` SkipBulk bool `config:"skip_bulk"` PartitionSize int `config:"partition_size"` QueuePrefix string `config:"queue_name_prefix"` HashFactor string `config:"hash_factor"` ContinueAfterReshuffle bool `config:"continue_after_reshuffle"` // contains filtered or unexported fields }
func (*ElasticsearchRequestReshuffle) Filter ¶
func (this *ElasticsearchRequestReshuffle) Filter(ctx *fasthttp.RequestCtx)
func (*ElasticsearchRequestReshuffle) Name ¶
func (filter *ElasticsearchRequestReshuffle) Name() string
type HashModFilter ¶
type HashModFilter struct { Source string `config:"source" ` TargetContextKey string `config:"target_context_name" ` PartitionSize int `config:"mod"` AddToRequestHeader bool `config:"add_to_request_header" type:"bool" default_value:"true"` AddToResponseHeader bool `config:"add_to_response_header" type:"bool" default_value:"true"` // contains filtered or unexported fields }
func (*HashModFilter) Filter ¶
func (filter *HashModFilter) Filter(ctx *fasthttp.RequestCtx)
func (*HashModFilter) Name ¶
func (filter *HashModFilter) Name() string
Source Files ¶
Click to show internal directories.
Click to hide internal directories.