Documentation ¶
Index ¶
- Constants
- Variables
- func ConvertDatas(ins []map[string]interface{}) []Data
- func ConvertDatasBack(ins []Data) []map[string]interface{}
- func RegisterConstructor(typ string, c Constructor)
- func SplitData(data string, splitSize int64) (valArray []string)
- type Constructor
- type FtOption
- type FtSender
- func (ft *FtSender) Close() error
- func (ft *FtSender) Name() string
- func (ft *FtSender) Reset() error
- func (ft *FtSender) Restore(info *StatsInfo)
- func (ft *FtSender) Send(datas []Data) error
- func (ft *FtSender) SkipDeepCopy() bool
- func (ft *FtSender) Stats() StatsInfo
- func (ft *FtSender) TokenRefresh(mapConf conf.MapConf) (err error)
- type Registry
- type Sender
- type SkipDeepCopySender
- type StatsSender
- type TokenRefreshable
Constants ¶
View Source
const ( DefaultSplitSize = 64 * 1024 // 默认分割为 64 kb // TypeMarshalError 表示marshal出错 TypeMarshalError = reqerr.SendErrorType("Data Marshal failed") )
View Source
const ( // pandora key, 可选参数 当sender_type 为pandora 的时候,需要必填的字段 KeyPandoraAk = "pandora_ak" KeyPandoraSk = "pandora_sk" KeyPandoraHost = "pandora_host" KeyPandoraWorkflowName = "pandora_workflow_name" KeyPandoraRepoName = "pandora_repo_name" KeyPandoraRegion = "pandora_region" KeyPandoraSchema = "pandora_schema" KeyPandoraSchemaUpdateInterval = "pandora_schema_update_interval" KeyPandoraAutoCreate = "pandora_auto_create" KeyPandoraSchemaFree = "pandora_schema_free" KeyPandoraExtraInfo = "pandora_extra_info" KeyPandoraEnableLogDB = "pandora_enable_logdb" KeyPandoraLogDBName = "pandora_logdb_name" KeyPandoraLogDBHost = "pandora_logdb_host" KeyPandoraLogDBAnalyzer = "pandora_logdb_analyzer" KeyPandoraEnableTSDB = "pandora_enable_tsdb" KeyPandoraTSDBName = "pandora_tsdb_name" KeyPandoraTSDBSeriesName = "pandora_tsdb_series_name" KeyPandoraTSDBSeriesTags = "pandora_tsdb_series_tags" KeyPandoraTSDBHost = "pandora_tsdb_host" KeyPandoraTSDBTimeStamp = "pandora_tsdb_timestamp" KeyPandoraEnableKodo = "pandora_enable_kodo" KeyPandoraKodoBucketName = "pandora_bucket_name" KeyPandoraKodoFilePrefix = "pandora_kodo_prefix" KeyPandoraKodoLowFreqFile = "pandora_kodo_low_frequency_file" KeyPandoraKodoCompressPrefix = "pandora_kodo_compress" KeyPandoraKodoGzip = "pandora_kodo_gzip" KeyPandoraKodoRotateStrategy = "pandora_kodo_rotate_strategy" KeyPandoraKodoRotateInterval = "pandora_kodo_rotate_interval" KeyPandoraKodoRotateSize = "pandora_kodo_rotate_size" KeyPandoraKodoFileRetention = "pandora_kodo_file_retention" KeyPandoraEmail = "qiniu_email" KeyRequestRateLimit = "request_rate_limit" KeyFlowRateLimit = "flow_rate_limit" KeyPandoraGzip = "pandora_gzip" KeyPandoraUUID = "pandora_uuid" KeyPandoraWithIP = "pandora_withip" KeyForceMicrosecond = "force_microsecond" KeyForceDataConvert = "pandora_force_convert" KeyNumberUseFloat = "number_use_float" KeyPandoraAutoConvertDate = "pandora_auto_convert_date" KeyIgnoreInvalidField = "ignore_invalid_field" KeyPandoraUnescape = "pandora_unescape" KeyPandoraSendType = "pandora_send_type" KeyInsecureServer = "insecure_server" KeyPandoraDescription = "pandora_description" PandoraUUID = "Pandora_UUID" TimestampPrecision = 19 // Sender's conf keys KeySenderType = "sender_type" KeyFaultTolerant = "fault_tolerant" KeyName = "name" KeyLogkitSendTime = "logkit_send_time" KeyIsMetrics = "is_metrics" KeyMetricTime = "timestamp" UnderfinedRunnerName = "UnderfinedRunnerName" // SenderType 发送类型 TypeFile = "file" // 本地文件 TypePandora = "pandora" // pandora 打点 TypeMongodbAccumulate = "mongodb_acc" // mongodb 并且按字段聚合 TypeInfluxdb = "influxdb" // influxdb TypeMock = "mock" // mock sender TypeDiscard = "discard" // discard sender TypeElastic = "elasticsearch" // elastic TypeKafka = "kafka" // kafka TypeHttp = "http" // http sender InnerUserAgent = "_useragent" )
pandora
View Source
const ( // Elastic KeyElasticHost = "elastic_host" KeyElasticVersion = "elastic_version" KeyElasticIndex = "elastic_index" KeyElasticType = "elastic_type" KeyElasticAlias = "elastic_keys" KeyElasticIndexStrategy = "elastic_index_strategy" KeyElasticTimezone = "elastic_time_zone" KeyDefaultIndexStrategy = "default" KeyYearIndexStrategy = "year" KeyMonthIndexStrategy = "month" KeyDayIndexStrategy = "day" // ElasticVersion3 v3.x ElasticVersion3 = "3.x" // ElasticVersion5 v5.x ElasticVersion5 = "5.x" // ElasticVersion6 v6.x ElasticVersion6 = "6.x" //timeZone KeylocalTimezone = "Local" KeyUTCTimezone = "UTC" KeyPRCTimezone = "PRC" KeySendTime = "sendTime" // fault_tolerant // 可选参数 fault_tolerant 为true的话,以下必填 KeyFtSyncEvery = "ft_sync_every" // 该参数设置多少次写入会同步一次offset log KeyFtSaveLogPath = "ft_save_log_path" // disk queue 数据日志路径 KeyFtWriteLimit = "ft_write_limit" // 写入速度限制,单位MB KeyFtStrategy = "ft_strategy" // ft 的策略 KeyFtProcs = "ft_procs" // ft并发数,当always_save或concurrent策略时启用 KeyFtMemoryChannel = "ft_memory_channel" KeyFtMemoryChannelSize = "ft_memory_channel_size" KeyFtLongDataDiscard = "ft_long_data_discard" // queue KeyMaxDiskUsedBytes = "max_disk_used_bytes" KeyMaxSizePerFile = "max_size_per_file" // ft 策略 // KeyFtStrategyBackupOnly 只在失败的时候进行容错 KeyFtStrategyBackupOnly = "backup_only" // KeyFtStrategyAlwaysSave 所有数据都进行容错 KeyFtStrategyAlwaysSave = "always_save" // KeyFtStrategyConcurrent 适合并发发送数据,只在失败的时候进行容错 KeyFtStrategyConcurrent = "concurrent" // Ft sender默认同步一次meta信息的数据次数 DefaultFtSyncEvery = 10 // file // 可选参数 当sender_type 为file 的时候 KeyFileSenderPath = "file_send_path" KeyFileSenderTimestampKey = "file_send_timestamp_key" KeyFileSenderMaxOpenFiles = "file_send_max_open_files" // http KeyHttpSenderUrl = "http_sender_url" KeyHttpSenderGzip = "http_sender_gzip" KeyHttpSenderProtocol = "http_sender_protocol" KeyHttpSenderCsvHead = "http_sender_csv_head" KeyHttpSenderCsvSplit = "http_sender_csv_split" // Influxdb sender 的可配置字段 KeyInfluxdbHost = "influxdb_host" KeyInfluxdbDB = "influxdb_db" KeyInfluxdbAutoCreate = "influxdb_autoCreate" KeyInfluxdbRetetion = "influxdb_retention" KeyInfluxdbRetetionDuration = "influxdb_retention_duration" KeyInfluxdbMeasurement = "influxdb_measurement" KeyInfluxdbTags = "influxdb_tags" KeyInfluxdbFields = "influxdb_fields" // influxdb KeyInfluxdbTimestamp = "influxdb_timestamp" // 可选 nano时间戳字段 KeyInfluxdbTimestampPrecision = "influxdb_timestamp_precision" // 时间戳字段的精度,代表时间戳1个单位代表多少纳秒 KeyInfluxdbIgnoreBeyondRetention = "influxdb_ignore_beyond_retention" // 开启后将忽略超出 retention 时间的点 // Kafka KeyKafkaCompressionNone = "none" KeyKafkaCompressionGzip = "gzip" KeyKafkaCompressionSnappy = "snappy" KeyKafkaHost = "kafka_host" //主机地址,可以有多个 KeyKafkaTopic = "kafka_topic" //topic 1.填一个值,则topic为所填值 2.天两个值: %{[字段名]}, defaultTopic :根据每条event,以指定字段值为topic,若无,则用默认值 KeyKafkaClientId = "kafka_client_id" //客户端ID //KeyKafkaFlushNum = "kafka_flush_num" //缓冲条数 //KeyKafkaFlushFrequency = "kafka_flush_frequency" //缓冲频率 KeyKafkaRetryMax = "kafka_retry_max" //最大重试次数 KeyKafkaCompression = "kafka_compression" //压缩模式,有none, gzip, snappy KeyKafkaTimeout = "kafka_timeout" //连接超时时间 KeyKafkaKeepAlive = "kafka_keep_alive" //保持连接时长 KeyMaxMessageBytes = "max_message_bytes" //每条消息最大字节数 // Mongodb // 可选参数 当sender_type 为mongodb_* 的时候,需要必填的字段 KeyMongodbHost = "mongodb_host" KeyMongodbDB = "mongodb_db" KeyMongodbCollection = "mongodb_collection" // 可选参数 当sender_type 为mongodb_acc 的时候,需要必填的字段 KeyMongodbUpdateKey = "mongodb_acc_updkey" KeyMongodbAccKey = "mongodb_acc_acckey" )
Variables ¶
View Source
var ( OptionSaveLogPath = Option{ KeyName: KeyFtSaveLogPath, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "管道磁盘数据保存路径(ft_save_log_path)", Advance: true, ToolTip: `指定备份数据的存放路径`, } OptionFtWriteLimit = Option{ KeyName: KeyFtWriteLimit, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "磁盘写入限速(ft_write_limit)", CheckRegex: "\\d+", Advance: true, ToolTip: `为了避免速率太快导致磁盘压力加大,可以根据系统情况自行限定写入本地磁盘的速率,单位MB/s`, } OptionFtStrategy = Option{ KeyName: KeyFtStrategy, ChooseOnly: true, ChooseOptions: []interface{}{KeyFtStrategyBackupOnly, KeyFtStrategyAlwaysSave, KeyFtStrategyConcurrent}, Default: KeyFtStrategyBackupOnly, DefaultNoUse: false, Description: "磁盘管道容错策略[仅备份错误|全部数据走管道|仅增加并发](ft_strategy)", Advance: true, ToolTip: `设置为backup_only的时候,数据不经过本地队列直接发送到下游,设为always_save时则所有数据会先发送到本地队列,选concurrent的时候会直接并发发送,不经过队列。无论该选项设置什么,失败的数据都会加入到重试队列中异步循环重试`, } OptionFtProcs = Option{ KeyName: KeyFtProcs, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "发送并发数量(ft_procs)", CheckRegex: "\\d+", Advance: true, ToolTip: "并发仅在ft_strateg模式选择 always_save或concurrent 时生效", } OptionFtMemoryChannel = Option{ KeyName: KeyFtMemoryChannel, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "用内存管道(ft_memory_channel)", Advance: true, ToolTip: `内存管道替代磁盘管道`, } OptionFtMemoryChannelSize = Option{ KeyName: KeyFtMemoryChannelSize, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "内存管道长度(ft_memory_channel_size)", CheckRegex: "\\d+", Advance: true, AdvanceDepend: KeyFtMemoryChannel, ToolTip: `默认为"100",单位为批次,也就是100代表100个待发送的批次,注意:该选项设置的大小表达的是队列中可存储的元素个数,并不是占用的内存大小`, } OptionKeyFtLongDataDiscard = Option{ KeyName: KeyFtLongDataDiscard, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "丢弃大于2M的数据(ft_long_data_discard)", Advance: true, ToolTip: `丢弃大于2M的数据`, } OptionMaxDiskUsedBytes = Option{ KeyName: KeyMaxDiskUsedBytes, ChooseOnly: false, Default: strconv.Itoa(maxDiskUsedBytes), DefaultNoUse: false, Description: "磁盘使用总大小限制(max_disk_used_bytes)", Advance: true, ToolTip: `磁盘使用总大小限制`, } OptionMaxSizePerSize = Option{ KeyName: KeyMaxSizePerFile, ChooseOnly: false, Default: strconv.Itoa(maxBytesPerFile), DefaultNoUse: false, Description: "磁盘队列单个文件最大字节(max_size_per_file)", Advance: true, ToolTip: `磁盘队列单个文件最大字节, 也是单个消息的最大字节。最大不超过2GB`, } OptionLogkitSendTime = Option{ KeyName: KeyLogkitSendTime, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "添加系统时间(logkit_send_time)", Advance: true, ToolTip: "在系统中添加数据发送时的当前时间作为时间戳", } )
View Source
var ErrNotAsyncSender = errors.New("sender does not support for Async Push")
NotAsyncSender return when sender is not async
View Source
var ModeKeyOptions = map[string][]Option{ TypeFile: { { KeyName: KeyFileSenderPath, ChooseOnly: false, Default: "", Required: true, Placeholder: "/home/john/mylogs/my-%Y-%m-%d.log", DefaultNoUse: true, Description: "发送到指定文件(file_send_path)", ToolTip: `路径支持魔法变量,例如 "file_send_path":"data-%Y-%m-%d.txt" ,此时数据就会渲染出日期,存放为 data-2018-03-28.txt`, }, { KeyName: KeyFileSenderTimestampKey, ChooseOnly: false, Default: "", Required: false, Placeholder: "timestamp", DefaultNoUse: true, Description: "时间戳键名(file_send_timestamp_key)", ToolTip: `用于获取时间的时间戳键名,如果该键存在则将替代当前时间渲染路径中的魔法变量,格式必须是 RFC3339Nano`, }, }, TypePandora: { { KeyName: KeyPandoraAk, ChooseOnly: false, Default: "", Required: true, Placeholder: "在此填写您七牛账号ak(access_key)", DefaultNoUse: true, Description: "七牛公钥(access_key)", }, { KeyName: KeyPandoraSk, ChooseOnly: false, Default: "", Required: true, Placeholder: "在此填写您七牛账号的sk(secret_key)", DefaultNoUse: true, Description: "七牛私钥(secret_key)", Secret: true, }, { KeyName: KeyPandoraWorkflowName, ChooseOnly: false, Default: "", Placeholder: "logkit_default_pipeline", DefaultNoUse: true, Required: true, Description: "新增或现有的Pipeline名称(pandora_workflow_name)", CheckRegex: "^[a-zA-Z_][a-zA-Z0-9_]{0,127}$", ToolTip: "新增或现有的七牛大数据平台Pipeline名称", }, { KeyName: KeyPandoraRepoName, ChooseOnly: false, Default: "", Required: true, Placeholder: "my_work", DefaultNoUse: true, Description: "新增或现有的实时仓库名称(pandora_repo_name)", CheckRegex: "^[a-zA-Z][a-zA-Z0-9_]{0,127}$", ToolTip: "新增或现有的七牛大数据平台Pipeline中的实时仓库名称", }, OptionSaveLogPath, OptionLogkitSendTime, { KeyName: KeyPandoraHost, ChooseOnly: false, Default: config.DefaultPipelineEndpoint, DefaultNoUse: false, Description: "大数据平台域名(pandora_host)", Advance: true, ToolTip: "数据发送的目的域名,私有部署请对应修改", }, { KeyName: KeyPandoraRegion, ChooseOnly: true, ChooseOptions: []interface{}{"nb"}, Default: "nb", DefaultNoUse: false, Description: "创建的资源所在区域(pandora_region)", ToolTip: "工作流资源创建所在区域", }, { KeyName: KeyPandoraSchemaFree, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "自动创建实时仓库并更新(pandora_schema_free)", Advance: true, ToolTip: "自动根据数据创建Pipeline、实时仓库并自动更新", }, { KeyName: KeyPandoraAutoCreate, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "以DSL语法自动创建实时仓库(pandora_auto_create)", Advance: true, ToolTip: `自动创建实时仓库,语法为 "f1 date, f2 string, f3 float, f4 map{f5 long}"`, }, { KeyName: KeyPandoraSchema, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "筛选字段(重命名)发送(pandora_schema)", Advance: true, ToolTip: `将f1重命名为f2: "f1 f2,...", 其他自动不要去掉...`, }, { KeyName: KeyPandoraEnableLogDB, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "自动创建并导出到日志分析(pandora_enable_logdb)", }, { KeyName: KeyPandoraLogDBName, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "新增或现有的日志分析仓库名称(pandora_logdb_name)", Advance: true, AdvanceDepend: KeyPandoraEnableLogDB, ToolTip: "若不指定使用实时仓库(pandora_repo_name)名称", }, { KeyName: KeyPandoraLogDBHost, ChooseOnly: false, Default: config.DefaultLogDBEndpoint, DefaultNoUse: false, Description: "日志分析域名[私有部署才修改](pandora_logdb_host)", Advance: true, AdvanceDepend: KeyPandoraEnableLogDB, ToolTip: "日志分析仓库域名,私有部署请对应修改", }, { KeyName: KeyPandoraLogDBAnalyzer, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "指定字段分词方式(pandora_logdb_analyzer)", Advance: true, AdvanceDepend: KeyPandoraEnableLogDB, ToolTip: `指定字段的分词方式,逗号分隔多个,如 "f1 keyword, f2 full_text"。仅在新建时生效,更改时不生效,请在日志仓库更改。`, }, { KeyName: KeyPandoraEnableTSDB, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "自动创建并导出到时序数据库(pandora_enable_tsdb)", }, { KeyName: KeyPandoraTSDBName, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "指定时序数据库仓库名称(pandora_tsdb_name)", AdvanceDepend: KeyPandoraEnableTSDB, ToolTip: "若不指定使用实时仓库(pandora_repo_name)名称", }, { KeyName: KeyPandoraTSDBSeriesName, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "指定时序数据库序列名称(pandora_tsdb_series_name)", AdvanceDepend: KeyPandoraEnableTSDB, ToolTip: "若不指定使用仓库(pandora_tsdb_name)名称", }, { KeyName: KeyPandoraTSDBSeriesTags, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "指定时序数据库标签(pandora_tsdb_series_tags)", AdvanceDepend: KeyPandoraEnableTSDB, }, { KeyName: KeyPandoraTSDBHost, ChooseOnly: false, Default: config.DefaultTSDBEndpoint, DefaultNoUse: false, Description: "时序数据库域名[私有部署才修改](pandora_tsdb_host)", Advance: true, AdvanceDepend: KeyPandoraEnableTSDB, ToolTip: "时序数据库域名,私有部署请对应修改", }, { KeyName: KeyPandoraTSDBTimeStamp, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "指定时序数据库时间戳(pandora_tsdb_timestamp)", Advance: true, AdvanceDepend: KeyPandoraEnableTSDB, }, { KeyName: KeyPandoraEnableKodo, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "自动导出到七牛云存储(pandora_enable_kodo)", }, { KeyName: KeyPandoraKodoBucketName, ChooseOnly: false, Default: "", Required: true, Placeholder: "my_bucket_name", DefaultNoUse: true, Description: "云存储仓库名称(启用自动导出到云存储时必填)(pandora_bucket_name)", AdvanceDepend: KeyPandoraEnableKodo, }, { KeyName: KeyPandoraEmail, ChooseOnly: false, Default: "", Required: true, Placeholder: "my@email.com", DefaultNoUse: true, Description: "七牛账户邮箱(qiniu_email)", AdvanceDepend: KeyPandoraEnableKodo, }, { KeyName: KeyPandoraKodoFilePrefix, ChooseOnly: false, Default: "logkitauto/date=$(year)-$(mon)-$(day)/hour=$(hour)/min=$(min)/$(sec)", DefaultNoUse: false, Description: "云存储文件前缀(pandora_kodo_prefix)", AdvanceDepend: KeyPandoraEnableKodo, Advance: true, }, { KeyName: KeyPandoraKodoLowFreqFile, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "存储为低频文件(pandora_kodo_low_frequency_file)", AdvanceDepend: KeyPandoraEnableKodo, Advance: true, }, { KeyName: KeyPandoraKodoCompressPrefix, ChooseOnly: true, ChooseOptions: []interface{}{"parquet", "json", "text", "csv"}, Default: "parquet", DefaultNoUse: false, Description: "云存储文件保存格式(pandora_kodo_compress)", AdvanceDepend: KeyPandoraEnableKodo, Advance: true, }, { KeyName: KeyPandoraKodoGzip, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "云存储文件压缩存储(pandora_kodo_gzip)", AdvanceDepend: KeyPandoraEnableKodo, Advance: true, }, { KeyName: KeyPandoraKodoRotateStrategy, ChooseOnly: true, ChooseOptions: []interface{}{"interval", "size", "both"}, Default: "interval", DefaultNoUse: false, Description: "云存储文件分割策略(pandora_kodo_rotate_strategy)", AdvanceDepend: KeyPandoraEnableKodo, Advance: true, }, { KeyName: KeyPandoraKodoRotateInterval, ChooseOnly: false, Default: "600", DefaultNoUse: false, Description: "云存储文件分割间隔时间(pandora_kodo_rotate_interval)(单位秒)", AdvanceDepend: KeyPandoraEnableKodo, Advance: true, }, { KeyName: KeyPandoraKodoRotateSize, ChooseOnly: false, Default: "512000", DefaultNoUse: false, Description: "云存储文件分割文件大小(pandora_kodo_rotate_size)(单位KB)", AdvanceDepend: KeyPandoraEnableKodo, Advance: true, }, { KeyName: KeyPandoraKodoFileRetention, ChooseOnly: false, Default: "0", DefaultNoUse: false, Description: "云存储文件保存时间(pandora_kodo_file_retention)(单位天,0为永久保存)", ToolTip: "导出到云存储的文件保存时间,数字表示存的天数,0为永久保存", AdvanceDepend: KeyPandoraEnableKodo, Advance: true, }, { KeyName: KeyPandoraGzip, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "压缩发送(pandora_gzip)", Advance: true, ToolTip: "使用gzip压缩发送", }, { KeyName: KeyFlowRateLimit, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "流量限制(flow_rate_limit)", CheckRegex: "\\d+", Advance: true, ToolTip: "对请求流量限制,单位为[KB/s]", }, { KeyName: KeyRequestRateLimit, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "请求限制(request_rate_limit)", CheckRegex: "\\d+", Advance: true, ToolTip: "对请求次数限制,单位为[次/s]", }, { KeyName: KeyPandoraUUID, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "数据植入UUID(pandora_uuid)", Advance: true, ToolTip: `该字段保证了发送出去的每一条数据都拥有一个唯一的UUID,可以用于数据去重等需要`, }, { KeyName: KeyPandoraWithIP, ChooseOnly: false, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "数据植入来源IP(pandora_withip)", Advance: true, }, OptionFtWriteLimit, OptionFtStrategy, OptionFtProcs, OptionFtMemoryChannel, OptionFtMemoryChannelSize, OptionKeyFtLongDataDiscard, OptionMaxDiskUsedBytes, OptionMaxSizePerSize, { KeyName: KeyForceMicrosecond, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "扰动时间字段增加精度(force_microsecond)", Advance: true, }, { KeyName: KeyForceDataConvert, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "自动转换类型(pandora_force_convert)", Advance: true, ToolTip: `强制类型转换,如定义的pandora schema为long,而实际的为string,则会尝试将string解析为long类型,若无法解析,则忽略该字段内容`, }, { KeyName: KeyNumberUseFloat, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "数字统一为float(number_use_float)", Advance: true, ToolTip: `对于整型和浮点型都自动识别为浮点型`, }, { KeyName: KeyIgnoreInvalidField, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "忽略错误字段(ignore_invalid_field)", Advance: true, ToolTip: `进行数据格式校验,并忽略不符合格式的字段数据`, }, { KeyName: KeyPandoraAutoConvertDate, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "自动转换时间类型(pandora_auto_convert_date)", Advance: true, ToolTip: `会自动将用户的自动尝试转换为Pandora的时间类型(date)`, }, { KeyName: KeyPandoraUnescape, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "服务端反转译换行/制表符(pandora_unescape)", Advance: true, ToolTip: `在pandora服务端反转译\\n=>\n, \\t=>\t; 由于pandora数据上传的编码方式会占用\t和\n这两个符号,所以在sdk中打点时会默认把\t转译为\\t,把\n转译为\\n,开启这个选项就是在服务端把这个反转译回来。开启该选项也会转译数据中原有的这些\\t和\\n符号`, }, { KeyName: KeyInsecureServer, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "认证不验证(insecure_server)", Advance: true, ToolTip: `对于https等情况不对证书和安全性检验`, }, }, TypeMongodbAccumulate: { { KeyName: KeyMongodbHost, ChooseOnly: false, Default: "", Required: true, Placeholder: "mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]", DefaultNoUse: true, Description: "数据库地址(mongodb_host)", ToolTip: `Mongodb的地址: mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]`, }, { KeyName: KeyMongodbDB, ChooseOnly: false, Default: "", Required: true, Placeholder: "app123", DefaultNoUse: true, Description: "数据库名称(mongodb_db)", }, { KeyName: KeyMongodbCollection, ChooseOnly: false, Default: "", Required: true, Placeholder: "collection1", DefaultNoUse: true, Description: "数据表名称(mongodb_collection)", }, { KeyName: KeyMongodbUpdateKey, ChooseOnly: false, Default: "", Required: true, Placeholder: "domain,uid", DefaultNoUse: true, Description: "聚合条件列(mongodb_acc_updkey)", }, { KeyName: KeyMongodbAccKey, ChooseOnly: false, Default: "", Required: true, Placeholder: "low,hit", DefaultNoUse: true, Description: "聚合列(mongodb_acc_acckey)", }, OptionSaveLogPath, OptionFtWriteLimit, OptionFtStrategy, OptionFtProcs, OptionFtMemoryChannel, OptionFtMemoryChannelSize, OptionKeyFtLongDataDiscard, OptionMaxDiskUsedBytes, OptionMaxSizePerSize, }, TypeInfluxdb: { { KeyName: KeyInfluxdbHost, ChooseOnly: false, Default: "", Required: true, Placeholder: "127.0.0.1:8086", DefaultNoUse: true, Description: "数据库地址(influxdb_host)", ToolTip: `数据库地址127.0.0.1:8086`, }, { KeyName: KeyInfluxdbDB, ChooseOnly: false, Default: "", Required: true, Placeholder: "testdb", DefaultNoUse: true, Description: "数据库名称(influxdb_db)", }, { KeyName: KeyInfluxdbAutoCreate, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", Description: "自动创建数据库(influxdb_auto_create)", Advance: true, }, { KeyName: KeyInfluxdbMeasurement, ChooseOnly: false, Default: "", Required: true, Placeholder: "test_table", DefaultNoUse: true, Description: "measurement名称(influxdb_measurement)", }, { KeyName: KeyInfluxdbRetetion, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "retention名称(influxdb_retention)", Advance: true, }, { KeyName: KeyInfluxdbRetetionDuration, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "retention时长(influxdb_retention_duration)", AdvanceDepend: KeyInfluxdbAutoCreate, Advance: true, }, { KeyName: KeyInfluxdbTags, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "标签列数据(influxdb_tags)", Advance: true, }, { KeyName: KeyInfluxdbFields, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "普通列数据(influxdb_fields)", Advance: true, }, { KeyName: KeyInfluxdbTimestamp, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "时间戳列(influxdb_timestamp)", Advance: true, }, { KeyName: KeyInfluxdbTimestampPrecision, ChooseOnly: false, Default: "100", DefaultNoUse: false, Description: "时间戳列精度调整(influxdb_timestamp_precision)", Advance: true, }, { KeyName: KeyInfluxdbIgnoreBeyondRetention, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "false", Description: "忽略超出 retention 时间的数据(influxdb_ignore_beyond_retention)", Advance: true, }, OptionSaveLogPath, OptionFtWriteLimit, OptionFtStrategy, OptionFtProcs, OptionFtMemoryChannel, OptionFtMemoryChannelSize, OptionKeyFtLongDataDiscard, OptionMaxDiskUsedBytes, OptionMaxSizePerSize, }, TypeDiscard: {}, TypeElastic: { { KeyName: KeyElasticHost, ChooseOnly: false, Default: "", Required: true, Placeholder: "192.168.31.203:9200", DefaultNoUse: false, Description: "host地址(elastic_host)", ToolTip: `常用端口9200`, }, { KeyName: KeyElasticVersion, ChooseOnly: true, ChooseOptions: []interface{}{ElasticVersion3, ElasticVersion5, ElasticVersion6}, Description: "ES版本号(es_version)", }, { KeyName: KeyElasticIndex, ChooseOnly: false, Default: "", Required: true, Placeholder: "app-repo-123", DefaultNoUse: true, Description: "索引名称(elastic_index)", }, { KeyName: KeyElasticIndexStrategy, ChooseOnly: true, ChooseOptions: []interface{}{KeyDefaultIndexStrategy, KeyYearIndexStrategy, KeyMonthIndexStrategy, KeyDayIndexStrategy}, Default: KeyFtStrategyBackupOnly, DefaultNoUse: false, Description: "自动索引模式(默认索引|按年索引|按月索引|按日索引)(index_strategy)", Advance: true, }, { KeyName: KeyElasticTimezone, ChooseOnly: true, ChooseOptions: []interface{}{KeyUTCTimezone, KeylocalTimezone, KeyPRCTimezone}, Default: KeyUTCTimezone, DefaultNoUse: false, Description: "索引时区(Local(本地)|UTC(标准时间)|PRC(北京时间))(elastic_time_zone)", Advance: true, }, OptionLogkitSendTime, { KeyName: KeyElasticType, ChooseOnly: false, Default: "", Required: true, Placeholder: "app", DefaultNoUse: true, Description: "索引类型名称(elastic_type)", }, OptionSaveLogPath, OptionFtWriteLimit, OptionFtStrategy, OptionFtProcs, OptionFtMemoryChannel, OptionFtMemoryChannelSize, OptionKeyFtLongDataDiscard, OptionMaxDiskUsedBytes, OptionMaxSizePerSize, }, TypeKafka: { { KeyName: KeyKafkaHost, ChooseOnly: false, Required: true, Default: "", Placeholder: "192.168.31.201:9092", DefaultNoUse: true, Description: "broker的host地址(kafka_host)", ToolTip: "常用端口 9092", }, { KeyName: KeyKafkaTopic, ChooseOnly: false, Default: "", Required: true, Placeholder: "my_topic", DefaultNoUse: true, Description: "打点的topic名称(kafka_topic)", }, { KeyName: KeyKafkaCompression, ChooseOnly: true, ChooseOptions: []interface{}{KeyKafkaCompressionNone, KeyKafkaCompressionGzip, KeyKafkaCompressionSnappy}, Default: KeyKafkaCompressionNone, DefaultNoUse: false, Description: "压缩模式[none不压缩|gzip压缩|snappy压缩](kafka_compression)", }, { KeyName: KeyKafkaClientId, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "kafka客户端标识ID(kafka_client_id)", Advance: true, }, { KeyName: KeyKafkaRetryMax, ChooseOnly: false, Default: "3", DefaultNoUse: false, Description: "kafka最大错误重试次数(kafka_retry_max)", Advance: true, }, { KeyName: KeyKafkaTimeout, ChooseOnly: false, Default: "30s", DefaultNoUse: false, Description: "kafka连接超时时间(kafka_timeout)", Advance: true, }, { KeyName: KeyKafkaKeepAlive, ChooseOnly: false, Default: "0", DefaultNoUse: false, Description: "kafka的keepalive时间(kafka_keep_alive)", Advance: true, }, OptionSaveLogPath, OptionFtWriteLimit, OptionFtStrategy, OptionFtProcs, OptionFtMemoryChannel, OptionFtMemoryChannelSize, OptionKeyFtLongDataDiscard, OptionMaxDiskUsedBytes, OptionMaxSizePerSize, }, TypeHttp: { { KeyName: KeyHttpSenderUrl, ChooseOnly: false, Default: "", Placeholder: "http://127.0.0.1/data", DefaultNoUse: true, Required: true, Description: "发送目的url(http_sender_url)", }, { KeyName: KeyHttpSenderProtocol, ChooseOnly: true, ChooseOptions: []interface{}{"json", "csv", "body_json"}, Default: "json", Description: "发送数据时使用的格式(http_sender_protocol)", }, { KeyName: KeyHttpSenderCsvSplit, ChooseOnly: false, Default: "", Placeholder: ",", Required: true, DefaultNoUse: true, Description: "csv分隔符(http_sender_csv_split)", }, { KeyName: KeyHttpSenderGzip, Element: Radio, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: true, Description: "是否启用gzip(http_sender_gzip)", }, OptionSaveLogPath, OptionFtWriteLimit, OptionFtStrategy, OptionFtProcs, OptionFtMemoryChannel, OptionFtMemoryChannelSize, OptionKeyFtLongDataDiscard, OptionMaxDiskUsedBytes, OptionMaxSizePerSize, }, }
View Source
var ModeUsages = KeyValueSlice{ {TypePandora, "发送至 七牛云智能日志平台(Pandora)", ""}, {TypeFile, "发送至 本地文件", ""}, {TypeMongodbAccumulate, "发送至 MongoDB 服务", ""}, {TypeInfluxdb, "发送至 InfluxDB 服务", ""}, {TypeDiscard, "消费数据但不发送", ""}, {TypeElastic, "发送至 Elasticsearch 服务", ""}, {TypeKafka, "发送至 Kafka 服务", ""}, {TypeHttp, "发送至 HTTP 服务器", ""}, }
ModeUsages 用途说明
Functions ¶
func ConvertDatas ¶
func ConvertDatas(ins []map[string]interface{}) []Data
func ConvertDatasBack ¶
func ConvertDatasBack(ins []Data) []map[string]interface{}
func RegisterConstructor ¶
func RegisterConstructor(typ string, c Constructor)
RegisterConstructor adds a new constructor for a given type of reader.
Types ¶
type FtSender ¶
type FtSender struct { BackupQueue queue.BackendQueue // contains filtered or unexported fields }
FtSender fault tolerance sender wrapper
func NewFtSender ¶
NewFtSender Fault tolerant sender constructor
func (*FtSender) SkipDeepCopy ¶
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
SenderRegistry sender 的工厂类。可以注册自定义sender
func NewRegistry ¶
func NewRegistry() *Registry
type Sender ¶
type Sender interface { Name() string // send data, error if failed Send([]Data) error Close() error }
Sender send data to pandora, prometheus such different destinations
type SkipDeepCopySender ¶
type SkipDeepCopySender interface { // SkipDeepCopy 需要返回值是因为如果一个 sender 封装了其它 sender,需要根据实际封装的类型返回是否忽略深度拷贝 SkipDeepCopy() bool }
SkipDeepCopySender 表示该 sender 不会对传入数据进行污染,凡是有次保证的 sender 需要实现该接口提升发送效率
type StatsSender ¶
type TokenRefreshable ¶
Click to show internal directories.
Click to hide internal directories.