sender

package
v0.0.0-...-c38d673 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 5, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultSplitSize is the default split size: 64 KB (64 * 1024).
	DefaultSplitSize = 64 << 10
	// TypeMarshalError is the send error type reported when marshaling data fails.
	TypeMarshalError = reqerr.SendErrorType("Data Marshal failed")
)
View Source
const (
	// Pandora keys: fields that must be filled in when sender_type is pandora.
	KeyPandoraAk                   = "pandora_ak"
	KeyPandoraSk                   = "pandora_sk"
	KeyPandoraHost                 = "pandora_host"
	KeyPandoraWorkflowName         = "pandora_workflow_name"
	KeyPandoraRepoName             = "pandora_repo_name"
	KeyPandoraRegion               = "pandora_region"
	KeyPandoraSchema               = "pandora_schema"
	KeyPandoraSchemaUpdateInterval = "pandora_schema_update_interval"
	KeyPandoraAutoCreate           = "pandora_auto_create"
	KeyPandoraSchemaFree           = "pandora_schema_free"
	KeyPandoraExtraInfo            = "pandora_extra_info"

	// Pandora LogDB (log analysis) export keys.
	KeyPandoraEnableLogDB   = "pandora_enable_logdb"
	KeyPandoraLogDBName     = "pandora_logdb_name"
	KeyPandoraLogDBHost     = "pandora_logdb_host"
	KeyPandoraLogDBAnalyzer = "pandora_logdb_analyzer"

	// Pandora TSDB (time series database) export keys.
	KeyPandoraEnableTSDB     = "pandora_enable_tsdb"
	KeyPandoraTSDBName       = "pandora_tsdb_name"
	KeyPandoraTSDBSeriesName = "pandora_tsdb_series_name"
	KeyPandoraTSDBSeriesTags = "pandora_tsdb_series_tags"
	KeyPandoraTSDBHost       = "pandora_tsdb_host"
	KeyPandoraTSDBTimeStamp  = "pandora_tsdb_timestamp"

	// Pandora Kodo (cloud storage) export keys.
	KeyPandoraEnableKodo         = "pandora_enable_kodo"
	KeyPandoraKodoBucketName     = "pandora_bucket_name"
	KeyPandoraKodoFilePrefix     = "pandora_kodo_prefix"
	KeyPandoraKodoLowFreqFile    = "pandora_kodo_low_frequency_file"
	KeyPandoraKodoCompressPrefix = "pandora_kodo_compress"
	KeyPandoraKodoGzip           = "pandora_kodo_gzip"
	KeyPandoraKodoRotateStrategy = "pandora_kodo_rotate_strategy"
	KeyPandoraKodoRotateInterval = "pandora_kodo_rotate_interval"
	KeyPandoraKodoRotateSize     = "pandora_kodo_rotate_size"
	KeyPandoraKodoFileRetention  = "pandora_kodo_file_retention"

	// KeyPandoraEmail is the key for the Qiniu account email.
	KeyPandoraEmail = "qiniu_email"

	// Miscellaneous Pandora sender keys (rate limits, compression, conversion).
	KeyRequestRateLimit       = "request_rate_limit"
	KeyFlowRateLimit          = "flow_rate_limit"
	KeyPandoraGzip            = "pandora_gzip"
	KeyPandoraUUID            = "pandora_uuid"
	KeyPandoraWithIP          = "pandora_withip"
	KeyForceMicrosecond       = "force_microsecond"
	KeyForceDataConvert       = "pandora_force_convert"
	KeyNumberUseFloat         = "number_use_float"
	KeyPandoraAutoConvertDate = "pandora_auto_convert_date"
	KeyIgnoreInvalidField     = "ignore_invalid_field"
	KeyPandoraUnescape        = "pandora_unescape"
	KeyPandoraSendType        = "pandora_send_type"
	KeyInsecureServer         = "insecure_server"
	KeyPandoraDescription     = "pandora_description"

	// PandoraUUID is presumably the name of the data field holding the
	// generated UUID (see pandora_uuid) — TODO confirm against the sender code.
	PandoraUUID = "Pandora_UUID"

	// NOTE(review): 19 matches the digit count of a nanosecond Unix
	// timestamp; confirm that this is the intended meaning.
	TimestampPrecision = 19

	// Sender's conf keys
	KeySenderType        = "sender_type"
	KeyFaultTolerant     = "fault_tolerant"
	KeyName              = "name"
	KeyLogkitSendTime    = "logkit_send_time"
	KeyIsMetrics         = "is_metrics"
	KeyMetricTime        = "timestamp"
	UnderfinedRunnerName = "UnderfinedRunnerName"

	// Sender types.
	TypeFile              = "file"          // local file
	TypePandora           = "pandora"       // send to Pandora
	TypeMongodbAccumulate = "mongodb_acc"   // MongoDB, accumulated by field
	TypeInfluxdb          = "influxdb"      // influxdb
	TypeMock              = "mock"          // mock sender
	TypeDiscard           = "discard"       // discard sender
	TypeElastic           = "elasticsearch" // elastic
	TypeKafka             = "kafka"         // kafka
	TypeHttp              = "http"          // http sender

	// InnerUserAgent is an internal key; its name suggests it carries the
	// user agent — TODO confirm against callers.
	InnerUserAgent = "_useragent"
)

pandora

View Source
const (
	// Elastic
	KeyElasticHost          = "elastic_host"
	KeyElasticVersion       = "elastic_version"
	KeyElasticIndex         = "elastic_index"
	KeyElasticType          = "elastic_type"
	KeyElasticAlias         = "elastic_keys"
	KeyElasticIndexStrategy = "elastic_index_strategy"
	KeyElasticTimezone      = "elastic_time_zone"

	// Values accepted for elastic_index_strategy.
	KeyDefaultIndexStrategy = "default"
	KeyYearIndexStrategy    = "year"
	KeyMonthIndexStrategy   = "month"
	KeyDayIndexStrategy     = "day"

	// ElasticVersion3 v3.x
	ElasticVersion3 = "3.x"
	// ElasticVersion5 v5.x
	ElasticVersion5 = "5.x"
	// ElasticVersion6 v6.x
	ElasticVersion6 = "6.x"

	// Time zone values.
	KeylocalTimezone = "Local"
	KeyUTCTimezone   = "UTC"
	KeyPRCTimezone   = "PRC"

	// KeySendTime is presumably the name of the field that stores the send
	// time (see logkit_send_time) — TODO confirm against the sender code.
	KeySendTime = "sendTime"

	// fault_tolerant
	// The following are required when the optional fault_tolerant parameter is true.
	KeyFtSyncEvery         = "ft_sync_every"    // number of writes between syncs of the offset log
	KeyFtSaveLogPath       = "ft_save_log_path" // path of the disk queue data log
	KeyFtWriteLimit        = "ft_write_limit"   // write rate limit, in MB
	KeyFtStrategy          = "ft_strategy"      // ft strategy
	KeyFtProcs             = "ft_procs"         // ft concurrency; used with the always_save or concurrent strategy
	KeyFtMemoryChannel     = "ft_memory_channel"
	KeyFtMemoryChannelSize = "ft_memory_channel_size"
	KeyFtLongDataDiscard   = "ft_long_data_discard"

	// queue
	KeyMaxDiskUsedBytes = "max_disk_used_bytes"
	KeyMaxSizePerFile   = "max_size_per_file"

	// ft strategies
	// KeyFtStrategyBackupOnly applies fault tolerance only when sending fails.
	KeyFtStrategyBackupOnly = "backup_only"
	// KeyFtStrategyAlwaysSave applies fault tolerance to all data.
	KeyFtStrategyAlwaysSave = "always_save"
	// KeyFtStrategyConcurrent suits concurrent sending; fault tolerance is applied only when sending fails.
	KeyFtStrategyConcurrent = "concurrent"

	// DefaultFtSyncEvery is the default number of data writes after which
	// the ft sender syncs its meta information once.
	DefaultFtSyncEvery = 10

	// file
	// Optional parameters used when sender_type is file.
	KeyFileSenderPath         = "file_send_path"
	KeyFileSenderTimestampKey = "file_send_timestamp_key"
	KeyFileSenderMaxOpenFiles = "file_send_max_open_files"

	// http
	KeyHttpSenderUrl      = "http_sender_url"
	KeyHttpSenderGzip     = "http_sender_gzip"
	KeyHttpSenderProtocol = "http_sender_protocol"
	KeyHttpSenderCsvHead  = "http_sender_csv_head"
	KeyHttpSenderCsvSplit = "http_sender_csv_split"

	// Configurable fields of the InfluxDB sender.
	// NOTE(review): "influxdb_autoCreate" is camelCase, unlike the other
	// snake_case keys; it is part of the config format, so do not rename it.
	KeyInfluxdbHost                  = "influxdb_host"
	KeyInfluxdbDB                    = "influxdb_db"
	KeyInfluxdbAutoCreate            = "influxdb_autoCreate"
	KeyInfluxdbRetetion              = "influxdb_retention"
	KeyInfluxdbRetetionDuration      = "influxdb_retention_duration"
	KeyInfluxdbMeasurement           = "influxdb_measurement"
	KeyInfluxdbTags                  = "influxdb_tags"
	KeyInfluxdbFields                = "influxdb_fields"                  // influxdb
	KeyInfluxdbTimestamp             = "influxdb_timestamp"               // optional; nanosecond timestamp field
	KeyInfluxdbTimestampPrecision    = "influxdb_timestamp_precision"     // precision of the timestamp field: how many nanoseconds one timestamp unit represents
	KeyInfluxdbIgnoreBeyondRetention = "influxdb_ignore_beyond_retention" // when enabled, points beyond the retention period are ignored

	// Kafka
	KeyKafkaCompressionNone   = "none"
	KeyKafkaCompressionGzip   = "gzip"
	KeyKafkaCompressionSnappy = "snappy"

	KeyKafkaHost     = "kafka_host"      // host address; more than one may be given
	KeyKafkaTopic    = "kafka_topic"     // topic: 1. with one value, that value is the topic; 2. with two values "%{[field name]}, defaultTopic", each event uses the value of the given field as its topic, falling back to the default when the field is absent
	KeyKafkaClientId = "kafka_client_id" // client ID
	//KeyKafkaFlushNum = "kafka_flush_num"				// number of buffered records
	//KeyKafkaFlushFrequency = "kafka_flush_frequency"	// buffer flush frequency
	KeyKafkaRetryMax    = "kafka_retry_max"   // maximum number of retries
	KeyKafkaCompression = "kafka_compression" // compression mode: none, gzip or snappy
	KeyKafkaTimeout     = "kafka_timeout"     // connection timeout
	KeyKafkaKeepAlive   = "kafka_keep_alive"  // keep-alive duration
	KeyMaxMessageBytes  = "max_message_bytes" // maximum number of bytes per message

	// Mongodb
	// Fields that must be filled in when sender_type is mongodb_*.
	KeyMongodbHost       = "mongodb_host"
	KeyMongodbDB         = "mongodb_db"
	KeyMongodbCollection = "mongodb_collection"

	// Fields that must be filled in when sender_type is mongodb_acc.
	KeyMongodbUpdateKey = "mongodb_acc_updkey"
	KeyMongodbAccKey    = "mongodb_acc_acckey"
)

Variables

View Source
// Shared Option definitions that are reused by several sender types in
// ModeKeyOptions (fault-tolerance, queue and send-time settings).
var (
	// OptionSaveLogPath sets the path where backup (disk queue) data is stored.
	OptionSaveLogPath = Option{
		KeyName:      KeyFtSaveLogPath,
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		Description:  "管道磁盘数据保存路径(ft_save_log_path)",
		Advance:      true,
		ToolTip:      `指定备份数据的存放路径`,
	}
	// OptionFtWriteLimit limits the write rate to the local disk, in MB/s.
	OptionFtWriteLimit = Option{
		KeyName:      KeyFtWriteLimit,
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		Description:  "磁盘写入限速(ft_write_limit)",
		CheckRegex:   "\\d+",
		Advance:      true,
		ToolTip:      `为了避免速率太快导致磁盘压力加大,可以根据系统情况自行限定写入本地磁盘的速率,单位MB/s`,
	}
	// OptionFtStrategy selects the fault-tolerance strategy: backup_only
	// (the default), always_save or concurrent.
	OptionFtStrategy = Option{
		KeyName:       KeyFtStrategy,
		ChooseOnly:    true,
		ChooseOptions: []interface{}{KeyFtStrategyBackupOnly, KeyFtStrategyAlwaysSave, KeyFtStrategyConcurrent},
		Default:       KeyFtStrategyBackupOnly,
		DefaultNoUse:  false,
		Description:   "磁盘管道容错策略[仅备份错误|全部数据走管道|仅增加并发](ft_strategy)",
		Advance:       true,
		ToolTip:       `设置为backup_only的时候,数据不经过本地队列直接发送到下游,设为always_save时则所有数据会先发送到本地队列,选concurrent的时候会直接并发发送,不经过队列。无论该选项设置什么,失败的数据都会加入到重试队列中异步循环重试`,
	}
	// OptionFtProcs sets the number of concurrent senders. Per its tooltip
	// it only takes effect with the always_save or concurrent strategy.
	// The tooltip previously misspelled the key as "ft_strateg".
	OptionFtProcs = Option{
		KeyName:      KeyFtProcs,
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		Description:  "发送并发数量(ft_procs)",
		CheckRegex:   "\\d+",
		Advance:      true,
		ToolTip:      "并发仅在ft_strategy模式选择 always_save或concurrent 时生效",
	}
	// OptionFtMemoryChannel switches from the disk queue to an in-memory
	// queue; it defaults to "false".
	OptionFtMemoryChannel = Option{
		KeyName:       KeyFtMemoryChannel,
		Element:       Radio,
		ChooseOnly:    true,
		ChooseOptions: []interface{}{"false", "true"},
		Default:       "false",
		DefaultNoUse:  false,
		Description:   "用内存管道(ft_memory_channel)",
		Advance:       true,
		ToolTip:       `内存管道替代磁盘管道`,
	}
	// OptionFtMemoryChannelSize sets the length of the in-memory queue,
	// counted in batches (not bytes). It depends on OptionFtMemoryChannel.
	OptionFtMemoryChannelSize = Option{
		KeyName:       KeyFtMemoryChannelSize,
		ChooseOnly:    false,
		Default:       "",
		DefaultNoUse:  false,
		Description:   "内存管道长度(ft_memory_channel_size)",
		CheckRegex:    "\\d+",
		Advance:       true,
		AdvanceDepend: KeyFtMemoryChannel,
		ToolTip:       `默认为"100",单位为批次,也就是100代表100个待发送的批次,注意:该选项设置的大小表达的是队列中可存储的元素个数,并不是占用的内存大小`,
	}
	// OptionKeyFtLongDataDiscard controls whether data larger than 2M is
	// discarded; it defaults to "false".
	OptionKeyFtLongDataDiscard = Option{
		KeyName:       KeyFtLongDataDiscard,
		Element:       Radio,
		ChooseOnly:    true,
		ChooseOptions: []interface{}{"false", "true"},
		Default:       "false",
		DefaultNoUse:  false,
		Description:   "丢弃大于2M的数据(ft_long_data_discard)",
		Advance:       true,
		ToolTip:       `丢弃大于2M的数据`,
	}
	// OptionMaxDiskUsedBytes limits the total disk usage; the default is
	// the package-level maxDiskUsedBytes.
	OptionMaxDiskUsedBytes = Option{
		KeyName:      KeyMaxDiskUsedBytes,
		ChooseOnly:   false,
		Default:      strconv.Itoa(maxDiskUsedBytes),
		DefaultNoUse: false,
		Description:  "磁盘使用总大小限制(max_disk_used_bytes)",
		Advance:      true,
		ToolTip:      `磁盘使用总大小限制`,
	}
	// OptionMaxSizePerSize limits the size of a single disk queue file,
	// which per its tooltip is also the maximum size of a single message.
	// The identifier configures KeyMaxSizePerFile; its name is kept for
	// backward compatibility.
	OptionMaxSizePerSize = Option{
		KeyName:      KeyMaxSizePerFile,
		ChooseOnly:   false,
		Default:      strconv.Itoa(maxBytesPerFile),
		DefaultNoUse: false,
		Description:  "磁盘队列单个文件最大字节(max_size_per_file)",
		Advance:      true,
		ToolTip:      `磁盘队列单个文件最大字节, 也是单个消息的最大字节。最大不超过2GB`,
	}
	// OptionLogkitSendTime controls whether the current time at sending is
	// added to the data as a timestamp; it defaults to "true".
	OptionLogkitSendTime = Option{
		KeyName:       KeyLogkitSendTime,
		Element:       Radio,
		ChooseOnly:    true,
		ChooseOptions: []interface{}{"true", "false"},
		Default:       "true",
		DefaultNoUse:  false,
		Description:   "添加系统时间(logkit_send_time)",
		Advance:       true,
		ToolTip:       "在系统中添加数据发送时的当前时间作为时间戳",
	}
)
View Source
// ErrNotAsyncSender is the error used when a sender does not support async push.
var ErrNotAsyncSender = errors.New("sender does not support for Async Push")

ErrNotAsyncSender is returned when the sender does not support async push.

View Source
var ModeKeyOptions = map[string][]Option{
	// Options for the file sender.
	TypeFile: {
		// Required destination path. Per the tooltip, the path supports
		// date placeholders such as %Y-%m-%d.
		{
			KeyName:      KeyFileSenderPath,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "/home/john/mylogs/my-%Y-%m-%d.log",
			DefaultNoUse: true,
			Description:  "发送到指定文件(file_send_path)",
			ToolTip:      `路径支持魔法变量,例如 "file_send_path":"data-%Y-%m-%d.txt" ,此时数据就会渲染出日期,存放为 data-2018-03-28.txt`,
		},
		// Optional name of the data key holding a timestamp. Per the
		// tooltip, when the key exists its value replaces the current time
		// for rendering the path, and it must be in RFC3339Nano format.
		{
			KeyName:      KeyFileSenderTimestampKey,
			ChooseOnly:   false,
			Default:      "",
			Required:     false,
			Placeholder:  "timestamp",
			DefaultNoUse: true,
			Description:  "时间戳键名(file_send_timestamp_key)",
			ToolTip:      `用于获取时间的时间戳键名,如果该键存在则将替代当前时间渲染路径中的魔法变量,格式必须是 RFC3339Nano`,
		},
	},
	// Options for the Pandora sender.
	TypePandora: {
		// Qiniu access key (required).
		{
			KeyName:      KeyPandoraAk,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "在此填写您七牛账号ak(access_key)",
			DefaultNoUse: true,
			Description:  "七牛公钥(access_key)",
		},
		// Qiniu secret key (required, marked as Secret).
		{
			KeyName:      KeyPandoraSk,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "在此填写您七牛账号的sk(secret_key)",
			DefaultNoUse: true,
			Description:  "七牛私钥(secret_key)",
			Secret:       true,
		},
		// Name of a new or existing pipeline (required).
		{
			KeyName:      KeyPandoraWorkflowName,
			ChooseOnly:   false,
			Default:      "",
			Placeholder:  "logkit_default_pipeline",
			DefaultNoUse: true,
			Required:     true,
			Description:  "新增或现有的Pipeline名称(pandora_workflow_name)",
			CheckRegex:   "^[a-zA-Z_][a-zA-Z0-9_]{0,127}$",
			ToolTip:      "新增或现有的七牛大数据平台Pipeline名称",
		},
		// Name of a new or existing real-time repository in the pipeline (required).
		{
			KeyName:      KeyPandoraRepoName,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "my_work",
			DefaultNoUse: true,
			Description:  "新增或现有的实时仓库名称(pandora_repo_name)",
			CheckRegex:   "^[a-zA-Z][a-zA-Z0-9_]{0,127}$",
			ToolTip:      "新增或现有的七牛大数据平台Pipeline中的实时仓库名称",
		},
		OptionSaveLogPath,
		OptionLogkitSendTime,
		// Destination endpoint of the platform; change it for private deployments.
		{
			KeyName:      KeyPandoraHost,
			ChooseOnly:   false,
			Default:      config.DefaultPipelineEndpoint,
			DefaultNoUse: false,
			Description:  "大数据平台域名(pandora_host)",
			Advance:      true,
			ToolTip:      "数据发送的目的域名,私有部署请对应修改",
		},
		// Region in which workflow resources are created; only "nb" is offered.
		{
			KeyName:       KeyPandoraRegion,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"nb"},
			Default:       "nb",
			DefaultNoUse:  false,
			Description:   "创建的资源所在区域(pandora_region)",
			ToolTip:       "工作流资源创建所在区域",
		},
		// Automatically create and update the pipeline and repository from the data.
		{
			KeyName:       KeyPandoraSchemaFree,
			Element:       Radio,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "自动创建实时仓库并更新(pandora_schema_free)",
			Advance:       true,
			ToolTip:       "自动根据数据创建Pipeline、实时仓库并自动更新",
		},
		// Create the repository automatically from a DSL schema description.
		{
			KeyName:      KeyPandoraAutoCreate,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "以DSL语法自动创建实时仓库(pandora_auto_create)",
			Advance:      true,
			ToolTip:      `自动创建实时仓库,语法为 "f1 date, f2 string, f3 float, f4 map{f5 long}"`,
		},
		// Select (and optionally rename) the fields to send.
		{
			KeyName:      KeyPandoraSchema,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "筛选字段(重命名)发送(pandora_schema)",
			Advance:      true,
			ToolTip:      `将f1重命名为f2: "f1 f2,...", 其他自动不要去掉...`,
		},
		// LogDB export: create automatically and export to log analysis (default "true").
		{
			KeyName:       KeyPandoraEnableLogDB,
			Element:       Radio,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "自动创建并导出到日志分析(pandora_enable_logdb)",
		},
		// Name of a new or existing LogDB repository; per the tooltip the
		// real-time repository name is used when unset.
		{
			KeyName:       KeyPandoraLogDBName,
			ChooseOnly:    false,
			Default:       "",
			DefaultNoUse:  false,
			Description:   "新增或现有的日志分析仓库名称(pandora_logdb_name)",
			Advance:       true,
			AdvanceDepend: KeyPandoraEnableLogDB,
			ToolTip:       "若不指定使用实时仓库(pandora_repo_name)名称",
		},
		// LogDB endpoint; change it only for private deployments.
		{
			KeyName:       KeyPandoraLogDBHost,
			ChooseOnly:    false,
			Default:       config.DefaultLogDBEndpoint,
			DefaultNoUse:  false,
			Description:   "日志分析域名[私有部署才修改](pandora_logdb_host)",
			Advance:       true,
			AdvanceDepend: KeyPandoraEnableLogDB,
			ToolTip:       "日志分析仓库域名,私有部署请对应修改",
		},
		// Per-field analyzer (tokenization) settings; per the tooltip they
		// only take effect when the repository is created.
		{
			KeyName:       KeyPandoraLogDBAnalyzer,
			ChooseOnly:    false,
			Default:       "",
			DefaultNoUse:  false,
			Description:   "指定字段分词方式(pandora_logdb_analyzer)",
			Advance:       true,
			AdvanceDepend: KeyPandoraEnableLogDB,
			ToolTip:       `指定字段的分词方式,逗号分隔多个,如 "f1 keyword, f2 full_text"。仅在新建时生效,更改时不生效,请在日志仓库更改。`,
		},
		// TSDB export: create automatically and export to the time series database (default "false").
		{
			KeyName:       KeyPandoraEnableTSDB,
			Element:       Radio,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "自动创建并导出到时序数据库(pandora_enable_tsdb)",
		},
		// TSDB repository name; per the tooltip the real-time repository name is used when unset.
		{
			KeyName:       KeyPandoraTSDBName,
			ChooseOnly:    false,
			Default:       "",
			DefaultNoUse:  false,
			Description:   "指定时序数据库仓库名称(pandora_tsdb_name)",
			AdvanceDepend: KeyPandoraEnableTSDB,
			ToolTip:       "若不指定使用实时仓库(pandora_repo_name)名称",
		},
		// TSDB series name; per the tooltip the TSDB repository name is used when unset.
		{
			KeyName:       KeyPandoraTSDBSeriesName,
			ChooseOnly:    false,
			Default:       "",
			DefaultNoUse:  false,
			Description:   "指定时序数据库序列名称(pandora_tsdb_series_name)",
			AdvanceDepend: KeyPandoraEnableTSDB,
			ToolTip:       "若不指定使用仓库(pandora_tsdb_name)名称",
		},
		// TSDB series tags.
		{
			KeyName:       KeyPandoraTSDBSeriesTags,
			ChooseOnly:    false,
			Default:       "",
			DefaultNoUse:  false,
			Description:   "指定时序数据库标签(pandora_tsdb_series_tags)",
			AdvanceDepend: KeyPandoraEnableTSDB,
		},
		// TSDB endpoint; change it only for private deployments.
		{
			KeyName:       KeyPandoraTSDBHost,
			ChooseOnly:    false,
			Default:       config.DefaultTSDBEndpoint,
			DefaultNoUse:  false,
			Description:   "时序数据库域名[私有部署才修改](pandora_tsdb_host)",
			Advance:       true,
			AdvanceDepend: KeyPandoraEnableTSDB,
			ToolTip:       "时序数据库域名,私有部署请对应修改",
		},
		// TSDB timestamp setting.
		{
			KeyName:       KeyPandoraTSDBTimeStamp,
			ChooseOnly:    false,
			Default:       "",
			DefaultNoUse:  false,
			Description:   "指定时序数据库时间戳(pandora_tsdb_timestamp)",
			Advance:       true,
			AdvanceDepend: KeyPandoraEnableTSDB,
		},
		// Kodo export: export automatically to Qiniu cloud storage (default "false").
		{
			KeyName:       KeyPandoraEnableKodo,
			Element:       Radio,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "自动导出到七牛云存储(pandora_enable_kodo)",
		},
		// Cloud storage bucket name; required when Kodo export is enabled.
		{
			KeyName:       KeyPandoraKodoBucketName,
			ChooseOnly:    false,
			Default:       "",
			Required:      true,
			Placeholder:   "my_bucket_name",
			DefaultNoUse:  true,
			Description:   "云存储仓库名称(启用自动导出到云存储时必填)(pandora_bucket_name)",
			AdvanceDepend: KeyPandoraEnableKodo,
		},
		// Qiniu account email; shown together with the Kodo export options.
		{
			KeyName:       KeyPandoraEmail,
			ChooseOnly:    false,
			Default:       "",
			Required:      true,
			Placeholder:   "my@email.com",
			DefaultNoUse:  true,
			Description:   "七牛账户邮箱(qiniu_email)",
			AdvanceDepend: KeyPandoraEnableKodo,
		},
		// Prefix of the files written to cloud storage.
		{
			KeyName:       KeyPandoraKodoFilePrefix,
			ChooseOnly:    false,
			Default:       "logkitauto/date=$(year)-$(mon)-$(day)/hour=$(hour)/min=$(min)/$(sec)",
			DefaultNoUse:  false,
			Description:   "云存储文件前缀(pandora_kodo_prefix)",
			AdvanceDepend: KeyPandoraEnableKodo,
			Advance:       true,
		},
		// Store the exported files as low-frequency files (default "false").
		{
			KeyName:       KeyPandoraKodoLowFreqFile,
			Element:       Radio,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "存储为低频文件(pandora_kodo_low_frequency_file)",
			AdvanceDepend: KeyPandoraEnableKodo,
			Advance:       true,
		},
		// Storage format of the exported files (default "parquet").
		{
			KeyName:       KeyPandoraKodoCompressPrefix,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"parquet", "json", "text", "csv"},
			Default:       "parquet",
			DefaultNoUse:  false,
			Description:   "云存储文件保存格式(pandora_kodo_compress)",
			AdvanceDepend: KeyPandoraEnableKodo,
			Advance:       true,
		},
		// Store the exported files compressed (default "false").
		{
			KeyName:       KeyPandoraKodoGzip,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "云存储文件压缩存储(pandora_kodo_gzip)",
			AdvanceDepend: KeyPandoraEnableKodo,
			Advance:       true,
		},
		// File rotation strategy: by interval, by size, or both.
		{
			KeyName:       KeyPandoraKodoRotateStrategy,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"interval", "size", "both"},
			Default:       "interval",
			DefaultNoUse:  false,
			Description:   "云存储文件分割策略(pandora_kodo_rotate_strategy)",
			AdvanceDepend: KeyPandoraEnableKodo,
			Advance:       true,
		},
		// File rotation interval, in seconds.
		{
			KeyName:       KeyPandoraKodoRotateInterval,
			ChooseOnly:    false,
			Default:       "600",
			DefaultNoUse:  false,
			Description:   "云存储文件分割间隔时间(pandora_kodo_rotate_interval)(单位秒)",
			AdvanceDepend: KeyPandoraEnableKodo,
			Advance:       true,
		},
		// File rotation size, in KB.
		{
			KeyName:       KeyPandoraKodoRotateSize,
			ChooseOnly:    false,
			Default:       "512000",
			DefaultNoUse:  false,
			Description:   "云存储文件分割文件大小(pandora_kodo_rotate_size)(单位KB)",
			AdvanceDepend: KeyPandoraEnableKodo,
			Advance:       true,
		},
		// Retention of exported files in days; 0 keeps them forever.
		{
			KeyName:       KeyPandoraKodoFileRetention,
			ChooseOnly:    false,
			Default:       "0",
			DefaultNoUse:  false,
			Description:   "云存储文件保存时间(pandora_kodo_file_retention)(单位天,0为永久保存)",
			ToolTip:       "导出到云存储的文件保存时间,数字表示存的天数,0为永久保存",
			AdvanceDepend: KeyPandoraEnableKodo,
			Advance:       true,
		},
		// Send data gzip-compressed (default "true").
		{
			KeyName:       KeyPandoraGzip,
			Element:       Radio,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "压缩发送(pandora_gzip)",
			Advance:       true,
			ToolTip:       "使用gzip压缩发送",
		},
		// Request traffic limit, in KB/s.
		{
			KeyName:      KeyFlowRateLimit,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "流量限制(flow_rate_limit)",
			CheckRegex:   "\\d+",
			Advance:      true,
			ToolTip:      "对请求流量限制,单位为[KB/s]",
		},
		// Request count limit, in requests per second.
		{
			KeyName:      KeyRequestRateLimit,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "请求限制(request_rate_limit)",
			CheckRegex:   "\\d+",
			Advance:      true,
			ToolTip:      "对请求次数限制,单位为[次/s]",
		},
		// Add a unique UUID to every record that is sent (default "false").
		{
			KeyName:       KeyPandoraUUID,
			Element:       Radio,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "数据植入UUID(pandora_uuid)",
			Advance:       true,
			ToolTip:       `该字段保证了发送出去的每一条数据都拥有一个唯一的UUID,可以用于数据去重等需要`,
		},
		// Add the source IP to the data.
		// NOTE(review): ChooseOnly is false although ChooseOptions is set
		// and no Element is given, unlike the other true/false options —
		// confirm this is intended.
		{
			KeyName:       KeyPandoraWithIP,
			ChooseOnly:    false,
			ChooseOptions: []interface{}{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "数据植入来源IP(pandora_withip)",
			Advance:       true,
		},
		OptionFtWriteLimit,
		OptionFtStrategy,
		OptionFtProcs,
		OptionFtMemoryChannel,
		OptionFtMemoryChannelSize,
		OptionKeyFtLongDataDiscard,
		OptionMaxDiskUsedBytes,
		OptionMaxSizePerSize,
		// Perturb the time field to increase its precision (default "false").
		{
			KeyName:       KeyForceMicrosecond,
			Element:       Radio,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "扰动时间字段增加精度(force_microsecond)",
			Advance:       true,
		},
		// Force type conversion to the schema type; per the tooltip, values
		// that cannot be parsed are ignored.
		{
			KeyName:       KeyForceDataConvert,
			Element:       Radio,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "自动转换类型(pandora_force_convert)",
			Advance:       true,
			ToolTip:       `强制类型转换,如定义的pandora schema为long,而实际的为string,则会尝试将string解析为long类型,若无法解析,则忽略该字段内容`,
		},
		// Treat both integers and floats as float (default "true").
		{
			KeyName:       KeyNumberUseFloat,
			Element:       Radio,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "数字统一为float(number_use_float)",
			Advance:       true,
			ToolTip:       `对于整型和浮点型都自动识别为浮点型`,
		},
		// Validate the data format and ignore fields that do not conform (default "true").
		{
			KeyName:       KeyIgnoreInvalidField,
			Element:       Radio,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "忽略错误字段(ignore_invalid_field)",
			Advance:       true,
			ToolTip:       `进行数据格式校验,并忽略不符合格式的字段数据`,
		},
		// Try to convert values to the Pandora date type automatically (default "false").
		{
			KeyName:       KeyPandoraAutoConvertDate,
			Element:       Radio,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "自动转换时间类型(pandora_auto_convert_date)",
			Advance:       true,
			ToolTip:       `会自动将用户的自动尝试转换为Pandora的时间类型(date)`,
		},
		// Unescape newline and tab characters on the server side (default "true").
		{
			KeyName:       KeyPandoraUnescape,
			Element:       Radio,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "服务端反转译换行/制表符(pandora_unescape)",
			Advance:       true,
			ToolTip:       `在pandora服务端反转译\\n=>\n, \\t=>\t; 由于pandora数据上传的编码方式会占用\t和\n这两个符号,所以在sdk中打点时会默认把\t转译为\\t,把\n转译为\\n,开启这个选项就是在服务端把这个反转译回来。开启该选项也会转译数据中原有的这些\\t和\\n符号`,
		},
		// Skip certificate and security verification, e.g. for https (default "false").
		{
			KeyName:       KeyInsecureServer,
			Element:       Radio,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "认证不验证(insecure_server)",
			Advance:       true,
			ToolTip:       `对于https等情况不对证书和安全性检验`,
		},
	},
	// Options for the MongoDB accumulate sender; the first five are
	// required, followed by the shared fault-tolerance options.
	TypeMongodbAccumulate: {
		// MongoDB connection address (required).
		{
			KeyName:      KeyMongodbHost,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]",
			DefaultNoUse: true,
			Description:  "数据库地址(mongodb_host)",
			ToolTip:      `Mongodb的地址: mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]`,
		},
		// Database name (required).
		{
			KeyName:      KeyMongodbDB,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "app123",
			DefaultNoUse: true,
			Description:  "数据库名称(mongodb_db)",
		},
		// Collection name (required).
		{
			KeyName:      KeyMongodbCollection,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "collection1",
			DefaultNoUse: true,
			Description:  "数据表名称(mongodb_collection)",
		},
		// Columns used as the aggregation condition (required).
		{
			KeyName:      KeyMongodbUpdateKey,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "domain,uid",
			DefaultNoUse: true,
			Description:  "聚合条件列(mongodb_acc_updkey)",
		},
		// Columns to aggregate (required).
		{
			KeyName:      KeyMongodbAccKey,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "low,hit",
			DefaultNoUse: true,
			Description:  "聚合列(mongodb_acc_acckey)",
		},
		OptionSaveLogPath,
		OptionFtWriteLimit,
		OptionFtStrategy,
		OptionFtProcs,
		OptionFtMemoryChannel,
		OptionFtMemoryChannelSize,
		OptionKeyFtLongDataDiscard,
		OptionMaxDiskUsedBytes,
		OptionMaxSizePerSize,
	},
	// Options for the InfluxDB sender, followed by the shared
	// fault-tolerance options.
	TypeInfluxdb: {
		// Database address (required), e.g. 127.0.0.1:8086.
		{
			KeyName:      KeyInfluxdbHost,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "127.0.0.1:8086",
			DefaultNoUse: true,
			Description:  "数据库地址(influxdb_host)",
			ToolTip:      `数据库地址127.0.0.1:8086`,
		},
		// Database name (required).
		{
			KeyName:      KeyInfluxdbDB,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "testdb",
			DefaultNoUse: true,
			Description:  "数据库名称(influxdb_db)",
		},
		// Create the database automatically (default "true"). The key shown
		// in the description must match KeyInfluxdbAutoCreate, whose value
		// is the camelCase "influxdb_autoCreate"; it previously advertised
		// "influxdb_auto_create".
		{
			KeyName:       KeyInfluxdbAutoCreate,
			Element:       Radio,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			Description:   "自动创建数据库(influxdb_autoCreate)",
			Advance:       true,
		},
		// Measurement name (required).
		{
			KeyName:      KeyInfluxdbMeasurement,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "test_table",
			DefaultNoUse: true,
			Description:  "measurement名称(influxdb_measurement)",
		},
		// Retention policy name.
		{
			KeyName:      KeyInfluxdbRetetion,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "retention名称(influxdb_retention)",
			Advance:      true,
		},
		// Retention duration; depends on the auto-create option.
		{
			KeyName:       KeyInfluxdbRetetionDuration,
			ChooseOnly:    false,
			Default:       "",
			DefaultNoUse:  false,
			Description:   "retention时长(influxdb_retention_duration)",
			AdvanceDepend: KeyInfluxdbAutoCreate,
			Advance:       true,
		},
		// Columns sent as tags.
		{
			KeyName:      KeyInfluxdbTags,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "标签列数据(influxdb_tags)",
			Advance:      true,
		},
		// Columns sent as regular fields.
		{
			KeyName:      KeyInfluxdbFields,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "普通列数据(influxdb_fields)",
			Advance:      true,
		},
		// Column holding the timestamp.
		{
			KeyName:      KeyInfluxdbTimestamp,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "时间戳列(influxdb_timestamp)",
			Advance:      true,
		},
		// Precision adjustment of the timestamp column (default "100").
		{
			KeyName:      KeyInfluxdbTimestampPrecision,
			ChooseOnly:   false,
			Default:      "100",
			DefaultNoUse: false,
			Description:  "时间戳列精度调整(influxdb_timestamp_precision)",
			Advance:      true,
		},
		// Ignore data beyond the retention period (default "false").
		{
			KeyName:       KeyInfluxdbIgnoreBeyondRetention,
			Element:       Radio,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "false",
			Description:   "忽略超出 retention 时间的数据(influxdb_ignore_beyond_retention)",
			Advance:       true,
		},
		OptionSaveLogPath,
		OptionFtWriteLimit,
		OptionFtStrategy,
		OptionFtProcs,
		OptionFtMemoryChannel,
		OptionFtMemoryChannelSize,
		OptionKeyFtLongDataDiscard,
		OptionMaxDiskUsedBytes,
		OptionMaxSizePerSize,
	},
	// The discard sender has no options.
	TypeDiscard: {},
	// Options for the Elasticsearch sender, followed by the shared
	// fault-tolerance options.
	TypeElastic: {
		// Host address (required); the tooltip notes 9200 as the common port.
		{
			KeyName:      KeyElasticHost,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "192.168.31.203:9200",
			DefaultNoUse: false,
			Description:  "host地址(elastic_host)",
			ToolTip:      `常用端口9200`,
		},
		// Elasticsearch version. The description shows the real config key
		// (elastic_version); it previously advertised "es_version".
		{
			KeyName:       KeyElasticVersion,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{ElasticVersion3, ElasticVersion5, ElasticVersion6},
			Description:   "ES版本号(elastic_version)",
		},
		// Index name (required).
		{
			KeyName:      KeyElasticIndex,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "app-repo-123",
			DefaultNoUse: true,
			Description:  "索引名称(elastic_index)",
		},
		// Index strategy: default, per year, per month or per day.
		// The default must be one of ChooseOptions; it was previously
		// KeyFtStrategyBackupOnly ("backup_only"), which is a
		// fault-tolerance strategy and not a valid index strategy.
		{
			KeyName:       KeyElasticIndexStrategy,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{KeyDefaultIndexStrategy, KeyYearIndexStrategy, KeyMonthIndexStrategy, KeyDayIndexStrategy},
			Default:       KeyDefaultIndexStrategy,
			DefaultNoUse:  false,
			Description:   "自动索引模式(默认索引|按年索引|按月索引|按日索引)(elastic_index_strategy)",
			Advance:       true,
		},
		// Time zone used for the index: UTC (default), Local or PRC.
		{
			KeyName:       KeyElasticTimezone,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{KeyUTCTimezone, KeylocalTimezone, KeyPRCTimezone},
			Default:       KeyUTCTimezone,
			DefaultNoUse:  false,
			Description:   "索引时区(Local(本地)|UTC(标准时间)|PRC(北京时间))(elastic_time_zone)",
			Advance:       true,
		},
		OptionLogkitSendTime,
		// Index type name (required).
		{
			KeyName:      KeyElasticType,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "app",
			DefaultNoUse: true,
			Description:  "索引类型名称(elastic_type)",
		},
		OptionSaveLogPath,
		OptionFtWriteLimit,
		OptionFtStrategy,
		OptionFtProcs,
		OptionFtMemoryChannel,
		OptionFtMemoryChannelSize,
		OptionKeyFtLongDataDiscard,
		OptionMaxDiskUsedBytes,
		OptionMaxSizePerSize,
	},
	// Options for the Kafka sender, followed by the shared
	// fault-tolerance options.
	TypeKafka: {
		// Broker host address (required); the tooltip notes 9092 as the common port.
		{
			KeyName:      KeyKafkaHost,
			ChooseOnly:   false,
			Required:     true,
			Default:      "",
			Placeholder:  "192.168.31.201:9092",
			DefaultNoUse: true,
			Description:  "broker的host地址(kafka_host)",
			ToolTip:      "常用端口 9092",
		},
		// Topic to send to (required).
		{
			KeyName:      KeyKafkaTopic,
			ChooseOnly:   false,
			Default:      "",
			Required:     true,
			Placeholder:  "my_topic",
			DefaultNoUse: true,
			Description:  "打点的topic名称(kafka_topic)",
		},
		// Compression mode: none (default), gzip or snappy.
		{
			KeyName:       KeyKafkaCompression,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{KeyKafkaCompressionNone, KeyKafkaCompressionGzip, KeyKafkaCompressionSnappy},
			Default:       KeyKafkaCompressionNone,
			DefaultNoUse:  false,
			Description:   "压缩模式[none不压缩|gzip压缩|snappy压缩](kafka_compression)",
		},
		// Kafka client ID.
		{
			KeyName:      KeyKafkaClientId,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "kafka客户端标识ID(kafka_client_id)",
			Advance:      true,
		},
		// Maximum number of retries on error (default "3").
		{
			KeyName:      KeyKafkaRetryMax,
			ChooseOnly:   false,
			Default:      "3",
			DefaultNoUse: false,
			Description:  "kafka最大错误重试次数(kafka_retry_max)",
			Advance:      true,
		},
		// Connection timeout (default "30s").
		{
			KeyName:      KeyKafkaTimeout,
			ChooseOnly:   false,
			Default:      "30s",
			DefaultNoUse: false,
			Description:  "kafka连接超时时间(kafka_timeout)",
			Advance:      true,
		},
		// Keep-alive time (default "0").
		{
			KeyName:      KeyKafkaKeepAlive,
			ChooseOnly:   false,
			Default:      "0",
			DefaultNoUse: false,
			Description:  "kafka的keepalive时间(kafka_keep_alive)",
			Advance:      true,
		},
		OptionSaveLogPath,
		OptionFtWriteLimit,
		OptionFtStrategy,
		OptionFtProcs,
		OptionFtMemoryChannel,
		OptionFtMemoryChannelSize,
		OptionKeyFtLongDataDiscard,
		OptionMaxDiskUsedBytes,
		OptionMaxSizePerSize,
	},
	// Options for the HTTP sender, followed by the shared
	// fault-tolerance options.
	TypeHttp: {
		// Destination URL (required).
		{
			KeyName:      KeyHttpSenderUrl,
			ChooseOnly:   false,
			Default:      "",
			Placeholder:  "http://127.0.0.1/data",
			DefaultNoUse: true,
			Required:     true,
			Description:  "发送目的url(http_sender_url)",
		},
		// Format used when sending data: json (default), csv or body_json.
		{
			KeyName:       KeyHttpSenderProtocol,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"json", "csv", "body_json"},
			Default:       "json",
			Description:   "发送数据时使用的格式(http_sender_protocol)",
		},
		// CSV separator.
		// NOTE(review): marked Required even though the default protocol
		// is json — confirm this is intended.
		{
			KeyName:      KeyHttpSenderCsvSplit,
			ChooseOnly:   false,
			Default:      "",
			Placeholder:  ",",
			Required:     true,
			DefaultNoUse: true,
			Description:  "csv分隔符(http_sender_csv_split)",
		},
		// Whether to enable gzip (default "true").
		{
			KeyName:       KeyHttpSenderGzip,
			Element:       Radio,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  true,
			Description:   "是否启用gzip(http_sender_gzip)",
		},
		OptionSaveLogPath,
		OptionFtWriteLimit,
		OptionFtStrategy,
		OptionFtProcs,
		OptionFtMemoryChannel,
		OptionFtMemoryChannelSize,
		OptionKeyFtLongDataDiscard,
		OptionMaxDiskUsedBytes,
		OptionMaxSizePerSize,
	},
}
View Source
// ModeUsages pairs each sender type constant with a short usage description.
// The descriptions are Chinese text; the third element of every entry is the
// empty string.
// NOTE(review): the meaning of the third element depends on the element type
// of KeyValueSlice, which is not visible here — confirm.
var ModeUsages = KeyValueSlice{
	{TypePandora, "发送至 七牛云智能日志平台(Pandora)", ""},
	{TypeFile, "发送至 本地文件", ""},
	{TypeMongodbAccumulate, "发送至 MongoDB 服务", ""},
	{TypeInfluxdb, "发送至 InfluxDB 服务", ""},
	{TypeDiscard, "消费数据但不发送", ""},
	{TypeElastic, "发送至 Elasticsearch 服务", ""},
	{TypeKafka, "发送至 Kafka 服务", ""},
	{TypeHttp, "发送至 HTTP 服务器", ""},
}

ModeUsages 用途说明

Functions

func ConvertDatas

func ConvertDatas(ins []map[string]interface{}) []Data

func ConvertDatasBack

func ConvertDatasBack(ins []Data) []map[string]interface{}

func RegisterConstructor

func RegisterConstructor(typ string, c Constructor)

RegisterConstructor adds a new constructor for a given type of sender.

func SplitData

func SplitData(data string, splitSize int64) (valArray []string)

Types

type Constructor

// Constructor is a function that builds a Sender from a conf.MapConf,
// returning an error when construction fails.
type Constructor func(conf.MapConf) (Sender, error)

type FtOption

// FtOption exposes no exported fields.
// NOTE(review): presumably holds fault-tolerance options used by FtSender —
// confirm against the package source.
type FtOption struct {
	// contains filtered or unexported fields
}

type FtSender

// FtSender is a fault-tolerant wrapper around another Sender.
type FtSender struct {
	// BackupQueue is the only exported field.
	// NOTE(review): presumably buffers data awaiting (re)delivery — confirm.
	BackupQueue queue.BackendQueue
	// contains filtered or unexported fields
}

FtSender is a fault-tolerant sender wrapper.

func NewFtSender

func NewFtSender(innerSender Sender, conf conf.MapConf, ftSaveLogPath string) (*FtSender, error)

NewFtSender constructs a fault-tolerant sender.

func (*FtSender) Close

func (ft *FtSender) Close() error

func (*FtSender) Name

func (ft *FtSender) Name() string

func (*FtSender) Reset

func (ft *FtSender) Reset() error

func (*FtSender) Restore

func (ft *FtSender) Restore(info *StatsInfo)

func (*FtSender) Send

func (ft *FtSender) Send(datas []Data) error

func (*FtSender) SkipDeepCopy

func (ft *FtSender) SkipDeepCopy() bool

func (*FtSender) Stats

func (ft *FtSender) Stats() StatsInfo

func (*FtSender) TokenRefresh

func (ft *FtSender) TokenRefresh(mapConf conf.MapConf) (err error)

type Registry

// Registry is a factory for senders; custom senders can be registered with it.
type Registry struct {
	// contains filtered or unexported fields
}

Registry 是 sender 的工厂类,可以注册自定义 sender。

func NewRegistry

func NewRegistry() *Registry

func (*Registry) NewSender

func (r *Registry) NewSender(conf conf.MapConf, ftSaveLogPath string) (sender Sender, err error)

func (*Registry) RegisterSender

func (r *Registry) RegisterSender(senderType string, constructor func(conf.MapConf) (Sender, error)) error

type Sender

// Sender is the interface implemented by anything that delivers data to a
// destination.
type Sender interface {
	// Name returns the sender's name as a string.
	Name() string
	// Send sends the given data and returns an error if sending failed.
	Send([]Data) error
	// Close closes the sender and reports any error.
	// NOTE(review): exact cleanup semantics are implementation-defined.
	Close() error
}

Sender sends data to different destinations, such as Pandora and Prometheus.

type SkipDeepCopySender

// SkipDeepCopySender is implemented by senders that can report whether the
// deep copy of incoming data may be skipped.
type SkipDeepCopySender interface {
	// SkipDeepCopy returns a value (rather than acting as a pure marker)
	// because a sender that wraps another sender has to answer according to
	// the type it actually wraps.
	SkipDeepCopy() bool
}

SkipDeepCopySender 表示该 sender 不会对传入数据进行污染,凡是有此保证的 sender 需要实现该接口以提升发送效率

type StatsSender

// StatsSender declares the same Name, Send and Close methods as Sender, plus
// methods to report statistics and to restore them.
type StatsSender interface {
	// Name returns the sender's name as a string.
	Name() string
	// Send sends the given data and returns an error if sending failed.
	Send([]Data) error
	// Close closes the sender and reports any error.
	Close() error
	// Stats returns the sender's current StatsInfo.
	Stats() StatsInfo
	// Restore restores the state the sender had before it was stopped.
	Restore(*StatsInfo)
}

type TokenRefreshable

// TokenRefreshable is implemented by types that accept a token refresh.
type TokenRefreshable interface {
	// TokenRefresh takes a conf.MapConf and returns an error on failure.
	// NOTE(review): presumably the configuration carries the new token or
	// credentials — confirm against implementations.
	TokenRefresh(conf.MapConf) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL