Documentation ¶
Index ¶
- Constants
- Variables
- func ConvertDatas(ins []map[string]interface{}) []Data
- func ConvertDatasBack(ins []Data) []map[string]interface{}
- func CreateInfluxdbDatabase(host, database, sender string) (err error)
- func CreateInfluxdbRetention(host, database, retention, duration, sender string) (err error)
- func GetRouterMatchTypeUsage() []KeyValue
- func GetRouterOption() []Option
- func JSONLineMarshalFunc(datas []Data) ([]byte, error)
- func MakeKey(name []byte, tags Tags) []byte
- func String(in string) string
- func UnescapeString(in string) string
- type DiscardSender
- type ElasticsearchSender
- type FileSender
- type FtOption
- type FtSender
- func (ft *FtSender) Close() error
- func (ft *FtSender) Name() string
- func (ft *FtSender) Reset() error
- func (ft *FtSender) Restore(info *utils.StatsInfo)
- func (ft *FtSender) Send(datas []Data) error
- func (ft *FtSender) Stats() utils.StatsInfo
- func (ft *FtSender) TokenRefresh(mapConf conf.MapConf) (err error)
- type HttpSender
- type InfluxdbSender
- type KafkaSender
- type MTypeContains
- type MTypeEqual
- type MatchType
- type MockSender
- type MongoAccSender
- type PandoraOption
- type PandoraSender
- type Point
- type Points
- type Router
- type RouterConfig
- type Sender
- func NewDiscardSender(c conf.MapConf) (Sender, error)
- func NewElasticSender(conf conf.MapConf) (sender Sender, err error)
- func NewFileSender(conf conf.MapConf) (sender Sender, err error)
- func NewHttpSender(c conf.MapConf) (Sender, error)
- func NewInfluxdbSender(c conf.MapConf) (s Sender, err error)
- func NewKafkaSender(conf conf.MapConf) (sender Sender, err error)
- func NewMockSender(c conf.MapConf) (Sender, error)
- func NewMongodbAccSender(conf conf.MapConf) (sender Sender, err error)
- func NewPandoraSender(conf conf.MapConf) (sender Sender, err error)
- type SenderRegistry
- type StatsSender
- type Tags
- type TokenRefreshable
- type Tokens
- type UserSchema
Constants ¶
const ( KeyElasticHost = "elastic_host" KeyElasticVersion = "elastic_version" KeyElasticIndex = "elastic_index" KeyElasticType = "elastic_type" KeyElasticAlias = "elastic_keys" KeyElasticIndexStrategy = "elastic_index_strategy" KeyElasticTimezone = "elastic_time_zone" )
const ( KeyDefaultIndexStrategy = "default" KeyYearIndexStrategy = "year" KeyMonthIndexStrategy = "month" KeyDayIndexStrategy = "day" )
const ( KeylocalTimezone = "Local" KeyUTCTimezone = "UTC" KeyPRCTimezone = "PRC" )
timeZone
const ( KeyFtSyncEvery = "ft_sync_every" // 该参数设置多少次写入会同步一次offset log KeyFtSaveLogPath = "ft_save_log_path" // disk queue 数据日志路径 KeyFtWriteLimit = "ft_write_limit" // 写入速度限制,单位MB KeyFtStrategy = "ft_strategy" // ft 的策略 KeyFtProcs = "ft_procs" // ft并发数,当always_save或concurrent策略时启用 KeyFtMemoryChannel = "ft_memory_channel" KeyFtMemoryChannelSize = "ft_memory_channel_size" )
可选参数 fault_tolerant 为true的话,以下必填
const ( // KeyFtStrategyBackupOnly 只在失败的时候进行容错 KeyFtStrategyBackupOnly = "backup_only" // KeyFtStrategyAlwaysSave 所有数据都进行容错 KeyFtStrategyAlwaysSave = "always_save" // KeyFtStrategyConcurrent 适合并发发送数据,只在失败的时候进行容错 KeyFtStrategyConcurrent = "concurrent" )
ft 策略
const ( KeyHttpSenderUrl = "http_sender_url" KeyHttpSenderGzip = "http_sender_gzip" KeyHttpSenderProtocol = "http_sender_protocol" KeyHttpSenderCsvHead = "http_sender_csv_head" KeyHttpSenderCsvSplit = "http_sender_csv_split" )
const ( KeyInfluxdbHost = "influxdb_host" KeyInfluxdbDB = "influxdb_db" KeyInfluxdbAutoCreate = "influxdb_autoCreate" KeyInfluxdbRetetion = "influxdb_retention" KeyInfluxdbRetetionDuration = "influxdb_retention_duration" KeyInfluxdbMeasurement = "influxdb_measurement" KeyInfluxdbTags = "influxdb_tags" KeyInfluxdbFields = "influxdb_fields" // influxdb KeyInfluxdbTimestamp = "influxdb_timestamp" // 可选 nano时间戳字段 KeyInfluxdbTimestampPrecision = "influxdb_timestamp_precision" // 时间戳字段的精度,代表时间戳1个单位代表多少纳秒 )
Influxdb sender 的可配置字段
const ( KeyKafkaCompressionNone = "none" KeyKafkaCompressionGzip = "gzip" KeyKafkaCompressionSnappy = "snappy" )
const ( KeyKafkaHost = "kafka_host" //主机地址,可以有多个 KeyKafkaTopic = "kafka_topic" //topic 1.填一个值,则topic为所填值 2.天两个值: %{[字段名]}, defaultTopic :根据每条event,以指定字段值为topic,若无,则用默认值 KeyKafkaClientId = "kafka_client_id" //客户端ID //KeyKafkaFlushNum = "kafka_flush_num" //缓冲条数 //KeyKafkaFlushFrequency = "kafka_flush_frequency" //缓冲频率 KeyKafkaRetryMax = "kafka_retry_max" //最大重试次数 KeyKafkaCompression = "kafka_compression" //压缩模式,有none, gzip, snappy KeyKafkaTimeout = "kafka_timeout" //连接超时时间 KeyKafkaKeepAlive = "kafka_keep_alive" //保持连接时长 KeyMaxMessageBytes = "max_message_bytes" //每条消息最大字节数 )
const ( KeyMongodbHost = "mongodb_host" KeyMongodbDB = "mongodb_db" KeyMongodbCollection = "mongodb_collection" )
可选参数 当sender_type 为mongodb_* 的时候,需要必填的字段
const ( KeyMongodbUpdateKey = "mongodb_acc_updkey" KeyMongodbAccKey = "mongodb_acc_acckey" )
可选参数 当sender_type 为mongodb_acc 的时候,需要必填的字段
const ( KeyPandoraAk = "pandora_ak" KeyPandoraSk = "pandora_sk" KeyPandoraHost = "pandora_host" KeyPandoraWorkflowName = "pandora_workflow_name" KeyPandoraRepoName = "pandora_repo_name" KeyPandoraRegion = "pandora_region" KeyPandoraSchema = "pandora_schema" KeyPandoraSchemaUpdateInterval = "pandora_schema_update_interval" KeyPandoraAutoCreate = "pandora_auto_create" KeyPandoraSchemaFree = "pandora_schema_free" KeyPandoraExtraInfo = "pandora_extra_info" KeyPandoraEnableLogDB = "pandora_enable_logdb" KeyPandoraLogDBName = "pandora_logdb_name" KeyPandoraLogDBHost = "pandora_logdb_host" KeyPandoraEnableTSDB = "pandora_enable_tsdb" KeyPandoraTSDBName = "pandora_tsdb_name" KeyPandoraTSDBSeriesName = "pandora_tsdb_series_name" KeyPandoraTSDBSeriesTags = "pandora_tsdb_series_tags" KeyPandoraTSDBHost = "pandora_tsdb_host" KeyPandoraTSDBTimeStamp = "pandora_tsdb_timestamp" KeyPandoraEnableKodo = "pandora_enable_kodo" KeyPandoraKodoBucketName = "pandora_bucket_name" KeyPandoraKodoFilePrefix = "pandora_kodo_prefix" KeyPandoraKodoCompressPrefix = "pandora_kodo_compress" KeyPandoraEmail = "qiniu_email" KeyRequestRateLimit = "request_rate_limit" KeyFlowRateLimit = "flow_rate_limit" KeyPandoraGzip = "pandora_gzip" KeyPandoraUUID = "pandora_uuid" KeyPandoraWithIP = "pandora_withip" KeyForceMicrosecond = "force_microsecond" KeyForceDataConvert = "pandora_force_convert" KeyPandoraAutoConvertDate = "pandora_auto_convert_date" KeyIgnoreInvalidField = "ignore_invalid_field" KeyPandoraUnescape = "pandora_unescape" PandoraUUID = "Pandora_UUID" )
可选参数 当sender_type 为pandora 的时候,需要必填的字段
const ( PandoraTypeLong = "long" PandoraTypeFloat = "float" PandoraTypeString = "string" PandoraTypeDate = "date" PandoraTypeBool = "boolean" PandoraTypeArray = "array" PandoraTypeMap = "map" PandoraTypeJsonString = "jsonstring" )
const ( KeySenderType = "sender_type" KeyFaultTolerant = "fault_tolerant" KeyName = "name" KeyRunnerName = "runner_name" KeyLogkitSendTime = "logkit_send_time" KeyIsMetrics = "is_metrics" KeyMetricTime = "timestamp" )
Sender's conf keys
const ( TypeFile = "file" // 本地文件 TypePandora = "pandora" // pandora 打点 TypeMongodbAccumulate = "mongodb_acc" // mongodb 并且按字段聚合 TypeInfluxdb = "influxdb" // influxdb TypeMock = "mock" // mock sender TypeDiscard = "discard" // discard sender TypeElastic = "elasticsearch" // elastic TypeKafka = "kafka" // kafka TypeHttp = "http" // http sender )
SenderType 发送类型
const ( //DefaultSenderIndex = 0 RouterKeyName = "router_key_name" RouterMatchType = "router_match_type" //RouterRoutesMap = "router_routes" //RouterMatchValue = "router_match_value" //RouterSenderIndex = "router_sender_index" RouterDefaultIndex = "router_default_sender" MTypeEqualName = "equal" MTypeContainsName = "contains" )
const DefaultFtSyncEvery = 10
Ft sender默认同步一次meta信息的数据次数
const (
InnerUserAgent = "_useragent"
)
const (
KeyFileSenderPath = "file_send_path"
)
可选参数 当sender_type 为file 的时候
const KeySendTime = "sendTime"
const UnderfinedRunnerName = "UnderfinedRunnerName"
Variables ¶
var ( // ElasticVersion3 v3.x ElasticVersion3 = "3.x" // ElasticVersion5 v5.x ElasticVersion5 = "5.x" // ElasticVersion6 v6.x ElasticVersion6 = "6.x" )
var ( OptionSaveLogPath = Option{ KeyName: KeyFtSaveLogPath, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "管道本地盘数据保存路径(ft_save_log_path)", Advance: true, } OptionFtWriteLimit = Option{ KeyName: KeyFtWriteLimit, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "磁盘写入限速(MB/s)(ft_write_limit)", CheckRegex: "\\d+", Advance: true, } OptionFtSyncEvery = Option{ KeyName: KeyFtSyncEvery, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "同步meta的间隔(ft_sync_every)", CheckRegex: "\\d+", Advance: true, } OptionFtStrategy = Option{ KeyName: KeyFtStrategy, ChooseOnly: true, ChooseOptions: []interface{}{KeyFtStrategyBackupOnly, KeyFtStrategyAlwaysSave, KeyFtStrategyConcurrent}, Default: KeyFtStrategyBackupOnly, DefaultNoUse: false, Description: "磁盘管道容错策略(仅备份错误|全部数据走管道|仅增加并发)(ft_strategy)", Advance: true, } OptionFtProcs = Option{ KeyName: KeyFtProcs, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "发送并发数量(磁盘管道或内存管道 always_save 或 concurrent 模式生效)(ft_procs)", CheckRegex: "\\d+", Advance: true, } OptionFtMemoryChannel = Option{ KeyName: KeyFtMemoryChannel, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "使用内存替换磁盘管道(加速)(ft_memory_channel)", Advance: true, } OptionFtMemoryChannelSize = Option{ KeyName: KeyFtMemoryChannelSize, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "内存管道长度(ft_memory_channel_size)", CheckRegex: "\\d+", Advance: true, AdvanceDepend: KeyFtMemoryChannel, } )
var ( Codes = map[byte][]byte{ ',': []byte(`\,`), '"': []byte(`\"`), ' ': []byte(`\ `), '=': []byte(`\=`), } )
var ErrNotAsyncSender = errors.New("This Sender does not support for Async Push")
ErrNotAsyncSender is returned when the sender does not support async push.
var MatchTypeRegistry = map[string]MatchType{}
var ModeKeyOptions = map[string][]Option{ TypeFile: { { KeyName: KeyFileSenderPath, ChooseOnly: false, Default: "/home/john/mylogs/my.log", DefaultNoUse: true, Description: "发送到的目的文件路径(file_send_path)", }, }, TypePandora: { { KeyName: KeyPandoraWorkflowName, ChooseOnly: false, Default: "logkit_default_workflow", DefaultNoUse: true, Description: "Pandora workflow名称(为空则使用旧版)(pandora_workflow_name)", CheckRegex: "^[a-zA-Z_][a-zA-Z0-9_]{0,127}$", }, { KeyName: KeyPandoraRepoName, ChooseOnly: false, Default: "my_work", DefaultNoUse: true, Description: "Pandora 数据源名称(pandora_repo_name)", CheckRegex: "^[a-zA-Z][a-zA-Z0-9_]{0,127}$", }, { KeyName: KeyPandoraAk, ChooseOnly: false, Default: "在此填写您七牛账号ak(access_key)", DefaultNoUse: true, Description: "七牛的公钥(access_key)", }, { KeyName: KeyPandoraSk, ChooseOnly: false, Default: "在此填写您七牛账号的secret_key", DefaultNoUse: true, Description: "七牛的私钥(secret_key)", Secret: true, }, OptionSaveLogPath, { KeyName: KeyLogkitSendTime, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "是否在发送数据时自动添加发送时间(logkit_send_time)", }, { KeyName: KeyPandoraExtraInfo, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "自动添加额外信息(pandora_extra_info)", Advance: true, }, { KeyName: KeyPandoraHost, ChooseOnly: false, Default: "https://pipeline.qiniu.com", DefaultNoUse: false, Description: "Host地址(pandora_host)", Advance: true, }, { KeyName: KeyPandoraRegion, ChooseOnly: true, ChooseOptions: []interface{}{"nb"}, Default: "nb", DefaultNoUse: false, Description: "创建的资源所在区域(pandora_region)", Advance: true, }, { KeyName: KeyPandoraSchemaFree, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "是否根据数据自动创建与增加字段(pandora_schema_free)", }, { KeyName: KeyPandoraAutoCreate, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "以DSL语法自动创建repo(pandora_auto_create)", 
Advance: true, }, { KeyName: KeyPandoraSchema, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "仅选择部分字段(重命名)发送(pandora_schema)", Advance: true, }, { KeyName: KeyPandoraEnableLogDB, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "是否自动创建并导出到Pandora LogDB(pandora_enable_logdb)", }, { KeyName: KeyPandoraLogDBName, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "导出的 LogDB 仓库名称(pandora_logdb_name)", Advance: true, AdvanceDepend: KeyPandoraEnableLogDB, }, { KeyName: KeyPandoraLogDBHost, ChooseOnly: false, Default: "https://logdb.qiniu.com", DefaultNoUse: false, Description: "LogDB host 地址(pandora_logdb_host)", Advance: true, AdvanceDepend: KeyPandoraEnableLogDB, }, { KeyName: KeyPandoraEnableTSDB, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "false", DefaultNoUse: false, Description: "是否自动创建并导出到Pandora TSDB(pandora_enable_tsdb)", }, { KeyName: KeyPandoraTSDBName, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "导出的 TSDB 仓库名称(pandora_tsdb_name)", Advance: true, AdvanceDepend: KeyPandoraEnableTSDB, }, { KeyName: KeyPandoraTSDBSeriesName, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "导出的 TSDB 序列名称(pandora_tsdb_series_name)", Advance: true, AdvanceDepend: KeyPandoraEnableTSDB, }, { KeyName: KeyPandoraTSDBSeriesTags, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "导出的 TSDB 时需要设置为tag类型的字段列表(pandora_tsdb_series_tags)", Advance: true, AdvanceDepend: KeyPandoraEnableTSDB, }, { KeyName: KeyPandoraTSDBHost, ChooseOnly: false, Default: "https://tsdb.qiniu.com", DefaultNoUse: false, Description: "TSDB host 地址(pandora_tsdb_host)", Advance: true, AdvanceDepend: KeyPandoraEnableTSDB, }, { KeyName: KeyPandoraTSDBTimeStamp, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "TSDB 时间戳字段(pandora_tsdb_timestamp)", Advance: true, AdvanceDepend: KeyPandoraEnableTSDB, }, { KeyName: 
KeyPandoraEnableKodo, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "是否自动导出到七牛云存储(pandora_enable_kodo)", }, { KeyName: KeyPandoraKodoBucketName, ChooseOnly: false, Default: "my_bucket_name", DefaultNoUse: true, Description: "云存储 Bucket 仓库名称(启用自动导出到云存储时必填)(pandora_bucket_name)", AdvanceDepend: KeyPandoraEnableTSDB, }, { KeyName: KeyPandoraEmail, ChooseOnly: false, Default: "my@email.com", DefaultNoUse: true, Description: "邮箱(启用自动导出到云存储时必填)(qiniu_email)", AdvanceDepend: KeyPandoraEnableTSDB, }, { KeyName: KeyPandoraKodoFilePrefix, ChooseOnly: false, Default: "logkitauto/date=$(year)-$(mon)-$(day)/hour=$(hour)/min=$(min)/$(sec)", DefaultNoUse: false, Description: "云存储文件前缀(pandora_kodo_prefix)", AdvanceDepend: KeyPandoraEnableTSDB, Advance: true, }, { KeyName: KeyPandoraKodoCompressPrefix, ChooseOnly: true, ChooseOptions: []interface{}{"parquet", "json", "text", "csv"}, Default: "parquet", DefaultNoUse: false, Description: "云存储压缩方式(pandora_kodo_compress)", AdvanceDepend: KeyPandoraEnableTSDB, Advance: true, }, { KeyName: KeyPandoraGzip, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "gzip压缩发送(pandora_gzip)", Advance: true, }, { KeyName: KeyFlowRateLimit, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "流量限制(KB/s)(flow_rate_limit)", CheckRegex: "\\d+", Advance: true, }, { KeyName: KeyRequestRateLimit, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "请求限制(次/s)(request_rate_limit)", CheckRegex: "\\d+", Advance: true, }, { KeyName: KeyPandoraUUID, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "每条数据植入UUID(pandora_uuid)", Advance: true, }, { KeyName: KeyPandoraWithIP, ChooseOnly: false, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "每条数据植入IP地址(pandora_withip)", Advance: true, }, 
OptionFtWriteLimit, OptionFtSyncEvery, OptionFtStrategy, OptionFtProcs, OptionFtMemoryChannel, OptionFtMemoryChannelSize, { KeyName: KeyForceMicrosecond, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "对于数据的时间字段抖动(force_microsecond)", Advance: true, }, { KeyName: KeyForceDataConvert, ChooseOnly: true, ChooseOptions: []interface{}{"false", "true"}, Default: "false", DefaultNoUse: false, Description: "数据强制类型转换(pandora_force_convert)", Advance: true, }, { KeyName: KeyIgnoreInvalidField, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "忽略格式错误的字段(ignore_invalid_field)", Advance: true, }, { KeyName: KeyPandoraAutoConvertDate, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "时间类型自动转换(pandora_auto_convert_date)", Advance: true, }, { KeyName: KeyPandoraUnescape, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "服务端反转译换行和制表符(pandora_unescape)", Advance: true, }, }, TypeMongodbAccumulate: { { KeyName: KeyMongodbHost, ChooseOnly: false, Default: "mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]", DefaultNoUse: true, Description: "数据库地址(mongodb_host)", }, { KeyName: KeyMongodbDB, ChooseOnly: false, Default: "app123", DefaultNoUse: true, Description: "数据库名称(mongodb_db)", }, { KeyName: KeyMongodbCollection, ChooseOnly: false, Default: "collection1", DefaultNoUse: true, Description: "数据表名称(mongodb_collection)", }, { KeyName: KeyMongodbUpdateKey, ChooseOnly: false, Default: "domain,uid", DefaultNoUse: true, Description: "聚合条件列(mongodb_acc_updkey)", }, { KeyName: KeyMongodbAccKey, ChooseOnly: false, Default: "low,hit", DefaultNoUse: true, Description: "聚合列(mongodb_acc_acckey)", }, OptionSaveLogPath, OptionFtWriteLimit, OptionFtSyncEvery, OptionFtStrategy, 
OptionFtProcs, OptionFtMemoryChannel, OptionFtMemoryChannelSize, }, TypeInfluxdb: { { KeyName: KeyInfluxdbHost, ChooseOnly: false, Default: "127.0.0.1:8086", DefaultNoUse: true, Description: "数据库地址(influxdb_host)", }, { KeyName: KeyInfluxdbDB, ChooseOnly: false, Default: "testdb", DefaultNoUse: true, Description: "数据库名称(influxdb_db)", }, { KeyName: KeyInfluxdbAutoCreate, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Description: "自动创建数据库(influxdb_auto_create)", }, { KeyName: KeyInfluxdbMeasurement, ChooseOnly: false, Default: "test_table", DefaultNoUse: true, Description: "measurement名称(influxdb_measurement)", }, { KeyName: KeyInfluxdbRetetion, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "retention名称(influxdb_retention)", }, { KeyName: KeyInfluxdbRetetionDuration, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "retention时长(influxdb_retention_duration)", AdvanceDepend: KeyInfluxdbAutoCreate, }, { KeyName: KeyInfluxdbTags, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "标签列数据(influxdb_tags)", }, { KeyName: KeyInfluxdbFields, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "普通列数据(influxdb_fields)", }, { KeyName: KeyInfluxdbTimestamp, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "时间戳列(influxdb_timestamp)", }, { KeyName: KeyInfluxdbTimestampPrecision, ChooseOnly: false, Default: "100", DefaultNoUse: false, Description: "时间戳列精度调整(influxdb_timestamp_precision)", Advance: true, }, OptionSaveLogPath, OptionFtWriteLimit, OptionFtSyncEvery, OptionFtStrategy, OptionFtProcs, OptionFtMemoryChannel, OptionFtMemoryChannelSize, }, TypeDiscard: {}, TypeElastic: { { KeyName: KeyElasticHost, ChooseOnly: false, Default: "192.168.31.203:9200", DefaultNoUse: false, Description: "host地址(elastic_host)", }, { KeyName: KeyElasticVersion, ChooseOnly: true, ChooseOptions: []interface{}{ElasticVersion3, ElasticVersion5, ElasticVersion6}, Description: "ES版本号(es_version)", }, { 
KeyName: KeyElasticIndex, ChooseOnly: false, Default: "app-repo-123", DefaultNoUse: true, Description: "索引名称(elastic_index)", }, { KeyName: KeyElasticIndexStrategy, ChooseOnly: true, ChooseOptions: []interface{}{KeyDefaultIndexStrategy, KeyYearIndexStrategy, KeyMonthIndexStrategy, KeyDayIndexStrategy}, Default: KeyFtStrategyBackupOnly, DefaultNoUse: false, Description: "自动索引模式(默认索引|按年索引|按月索引|按日索引)(index_strategy)", }, { KeyName: KeyElasticTimezone, ChooseOnly: true, ChooseOptions: []interface{}{KeyUTCTimezone, KeylocalTimezone, KeyPRCTimezone}, Default: KeyUTCTimezone, DefaultNoUse: false, Description: "索引时区(Local(本地)|UTC(标准时间)|PRC(北京时间))(elastic_time_zone)", }, { KeyName: KeyLogkitSendTime, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: false, Description: "是否在发送数据时自动添加发送时间(logkit_send_time)", }, { KeyName: KeyElasticType, ChooseOnly: false, Default: "app", DefaultNoUse: true, Description: "索引类型名称(elastic_type)", }, OptionSaveLogPath, OptionFtWriteLimit, OptionFtSyncEvery, OptionFtStrategy, OptionFtProcs, OptionFtMemoryChannel, OptionFtMemoryChannelSize, }, TypeKafka: { { KeyName: KeyKafkaHost, ChooseOnly: false, Default: "192.168.31.201:9092", DefaultNoUse: true, Description: "broker的host地址(kafka_host)", }, { KeyName: KeyKafkaTopic, ChooseOnly: false, Default: "my_topic", DefaultNoUse: true, Description: "打点的topic名称(kafka_topic)", }, { KeyName: KeyKafkaCompression, ChooseOnly: true, ChooseOptions: []interface{}{KeyKafkaCompressionNone, KeyKafkaCompressionGzip, KeyKafkaCompressionSnappy}, Default: KeyKafkaCompressionNone, DefaultNoUse: false, Description: "压缩模式(none不压缩|gzip压缩|snappy压缩)(kafka_compression)", }, { KeyName: KeyKafkaClientId, ChooseOnly: false, Default: "", DefaultNoUse: false, Description: "kafka客户端标识ID(kafka_client_id)", }, { KeyName: KeyKafkaRetryMax, ChooseOnly: false, Default: "3", DefaultNoUse: false, Description: "kafka最大错误重试次数(kafka_retry_max)", Advance: true, }, { KeyName: KeyKafkaTimeout, 
ChooseOnly: false, Default: "30s", DefaultNoUse: false, Description: "kafka连接超时时间(kafka_timeout)", Advance: true, }, { KeyName: KeyKafkaKeepAlive, ChooseOnly: false, Default: "0", DefaultNoUse: false, Description: "kafka的keepalive时间(kafka_keep_alive)", Advance: true, }, OptionSaveLogPath, OptionFtWriteLimit, OptionFtSyncEvery, OptionFtStrategy, OptionFtProcs, OptionFtMemoryChannel, OptionFtMemoryChannelSize, }, TypeHttp: { { KeyName: KeyHttpSenderUrl, ChooseOnly: false, Default: "", DefaultNoUse: true, Description: "发送目的url(http_sender_url)", }, { KeyName: KeyHttpSenderProtocol, ChooseOnly: true, ChooseOptions: []interface{}{"json", "csv"}, Default: "json", DefaultNoUse: true, Description: "发送数据时使用的格式(http_sender_protocol)", }, { KeyName: KeyHttpSenderCsvSplit, ChooseOnly: false, Default: "\t", DefaultNoUse: true, Description: "csv分隔符(http_sender_csv_split)", }, { KeyName: KeyHttpSenderGzip, ChooseOnly: true, ChooseOptions: []interface{}{"true", "false"}, Default: "true", DefaultNoUse: true, Description: "是否启用gzip(http_sender_gzip)", }, }, }
var ModeUsages = []KeyValue{ {TypePandora, "发送到七牛大数据平台(Pandora)"}, {TypeFile, "发送到本地文件"}, {TypeMongodbAccumulate, "发送到 MongoDB 服务"}, {TypeInfluxdb, "发送到 InfluxDB 服务"}, {TypeDiscard, "消费数据但不发送"}, {TypeElastic, "发送到 Elasticsearch 服务"}, {TypeKafka, "发送到 Kafka 服务"}, {TypeHttp, "通过 Http Post 发送"}, }
ModeUsages 用途说明
var PandoraMaxBatchSize = 2 * 1024 * 1024
PandoraMaxBatchSize 发送到Pandora的batch限制
Functions ¶
func ConvertDatas ¶ added in v1.2.2
func ConvertDatas(ins []map[string]interface{}) []Data
func ConvertDatasBack ¶ added in v1.2.2
func ConvertDatasBack(ins []Data) []map[string]interface{}
func CreateInfluxdbDatabase ¶ added in v1.4.3
func CreateInfluxdbRetention ¶ added in v1.4.3
func GetRouterMatchTypeUsage ¶ added in v1.4.2
func GetRouterMatchTypeUsage() []KeyValue
func GetRouterOption ¶ added in v1.4.2
func GetRouterOption() []Option
func JSONLineMarshalFunc ¶
JSONLineMarshalFunc 将数据json并且按换行符分隔
func UnescapeString ¶
Types ¶
type DiscardSender ¶
type DiscardSender struct {
// contains filtered or unexported fields
}
func (*DiscardSender) Close ¶
func (s *DiscardSender) Close() error
func (*DiscardSender) Name ¶
func (s *DiscardSender) Name() string
Name returns the sender's name as a string.
func (*DiscardSender) Send ¶
func (s *DiscardSender) Send(d []Data) error
func (*DiscardSender) SendCount ¶
func (s *DiscardSender) SendCount() int
type ElasticsearchSender ¶
type ElasticsearchSender struct {
// contains filtered or unexported fields
}
ElasticsearchSender ElasticSearch sender
func (*ElasticsearchSender) Close ¶
func (ess *ElasticsearchSender) Close() error
Close closes the Elasticsearch sender.
func (*ElasticsearchSender) Name ¶
func (ess *ElasticsearchSender) Name() string
Name returns the name of the Elasticsearch sender.
func (*ElasticsearchSender) Send ¶
func (ess *ElasticsearchSender) Send(data []Data) (err error)
Send sends the given data through the Elasticsearch sender.
type FileSender ¶
type FileSender struct {
// contains filtered or unexported fields
}
FileSender writes data into a local file; it is intended for testing only.
func (*FileSender) Close ¶
func (fs *FileSender) Close() error
func (*FileSender) Name ¶
func (fs *FileSender) Name() string
type FtSender ¶
type FtSender struct {
// contains filtered or unexported fields
}
FtSender fault tolerance sender wrapper
func NewFtSender ¶
NewFtSender Fault tolerant sender constructor
type HttpSender ¶ added in v1.4.2
type HttpSender struct {
// contains filtered or unexported fields
}
func (*HttpSender) Close ¶ added in v1.4.2
func (h *HttpSender) Close() error
func (*HttpSender) Name ¶ added in v1.4.2
func (h *HttpSender) Name() string
func (*HttpSender) Send ¶ added in v1.4.2
func (h *HttpSender) Send(data []Data) (err error)
type InfluxdbSender ¶
type InfluxdbSender struct {
// contains filtered or unexported fields
}
InfluxdbSender writes data into InfluxDB.
func (*InfluxdbSender) Close ¶
func (s *InfluxdbSender) Close() error
func (*InfluxdbSender) Name ¶
func (s *InfluxdbSender) Name() string
func (*InfluxdbSender) Send ¶
func (s *InfluxdbSender) Send(datas []Data) error
type KafkaSender ¶ added in v1.3.6
type KafkaSender struct {
// contains filtered or unexported fields
}
func (*KafkaSender) Close ¶ added in v1.3.6
func (this *KafkaSender) Close() (err error)
func (*KafkaSender) Name ¶ added in v1.3.6
func (this *KafkaSender) Name() string
func (*KafkaSender) Send ¶ added in v1.3.6
func (this *KafkaSender) Send(data []Data) error
type MockSender ¶
type MockSender struct {
// contains filtered or unexported fields
}
func (*MockSender) Close ¶
func (mock *MockSender) Close() error
func (*MockSender) Name ¶
func (mock *MockSender) Name() string
Name returns the name and the data received, as a string.
func (*MockSender) Send ¶
func (mock *MockSender) Send(d []Data) error
func (*MockSender) SendCount ¶
func (mock *MockSender) SendCount() int
type MongoAccSender ¶
MongoAccSender Mongodb 根据UpdateKey 做对AccumulateKey $inc 累加的Sender
func (*MongoAccSender) Close ¶
func (s *MongoAccSender) Close() error
func (*MongoAccSender) Name ¶
func (s *MongoAccSender) Name() string
func (*MongoAccSender) Send ¶
func (s *MongoAccSender) Send(datas []Data) (se error)
Send 依次尝试发送数据到mongodb,返回错误中包含所有写失败的数据。如果要保证每次send的原子性,必须保证datas长度为1,否则当程序宕机时,总会出现丢失数据的问题。
type PandoraOption ¶
type PandoraOption struct { UnescapeLine bool // contains filtered or unexported fields }
PandoraOption 创建Pandora Sender的选项
type PandoraSender ¶
type PandoraSender struct { UserSchema UserSchema // contains filtered or unexported fields }
PandoraSender pandora sender
func (*PandoraSender) Close ¶
func (s *PandoraSender) Close() error
func (*PandoraSender) Name ¶
func (s *PandoraSender) Name() string
func (*PandoraSender) Send ¶
func (s *PandoraSender) Send(datas []Data) (se error)
func (*PandoraSender) TokenRefresh ¶ added in v1.4.4
func (s *PandoraSender) TokenRefresh(mapConf conf.MapConf) error
func (*PandoraSender) UpdateSchemas ¶
func (s *PandoraSender) UpdateSchemas()
type Point ¶
type Router ¶ added in v1.4.2
type Router struct {
// contains filtered or unexported fields
}
func NewSenderRouter ¶ added in v1.4.2
func NewSenderRouter(conf RouterConfig, senderCnt int) (*Router, error)
func (*Router) GetSenderIndex ¶ added in v1.4.2
type RouterConfig ¶ added in v1.4.2
type Sender ¶
type Sender interface { Name() string // send data, error if failed Send([]Data) error Close() error }
Sender sends data to different destinations, such as Pandora and Prometheus.
func NewDiscardSender ¶
NewDiscardSender 仅用于日志清理
func NewElasticSender ¶
NewElasticSender New ElasticSender
func NewFileSender ¶
NewFileSender construct
func NewInfluxdbSender ¶
NewInfluxdbSender 创建Influxdb 的sender
func NewKafkaSender ¶ added in v1.3.6
func NewMongodbAccSender ¶
NewMongodbAccSender mongodb accumulate sender constructor
type SenderRegistry ¶
type SenderRegistry struct {
// contains filtered or unexported fields
}
SenderRegistry sender 的工厂类。可以注册自定义sender
func NewSenderRegistry ¶
func NewSenderRegistry() *SenderRegistry
func (*SenderRegistry) RegisterSender ¶
type StatsSender ¶ added in v1.3.0
type TokenRefreshable ¶ added in v1.4.4
type Tokens ¶ added in v1.4.4
type Tokens struct { LogDBTokens pipeline.AutoExportLogDBTokens TsDBTokens pipeline.AutoExportTSDBTokens KodoTokens pipeline.AutoExportKodoTokens SchemaFreeTokens pipeline.SchemaFreeToken }
type UserSchema ¶
UserSchema is the Pandora schema parsed from the user's raw schema.