sender

package
v1.4.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 6, 2018 License: Apache-2.0 Imports: 39 Imported by: 62

Documentation

Index

Constants

View Source
const (
	KeyElasticHost    = "elastic_host"
	KeyElasticVersion = "elastic_version"
	KeyElasticIndex   = "elastic_index"
	KeyElasticType    = "elastic_type"
	KeyElasticAlias   = "elastic_keys"

	KeyElasticIndexStrategy = "elastic_index_strategy"
	KeyElasticTimezone      = "elastic_time_zone"
)
View Source
const (
	KeyDefaultIndexStrategy = "default"
	KeyYearIndexStrategy    = "year"
	KeyMonthIndexStrategy   = "month"
	KeyDayIndexStrategy     = "day"
)
View Source
const (
	KeylocalTimezone = "Local"
	KeyUTCTimezone   = "UTC"
	KeyPRCTimezone   = "PRC"
)

timeZone

View Source
const (
	KeyFtSyncEvery         = "ft_sync_every"    // 该参数设置多少次写入会同步一次offset log
	KeyFtSaveLogPath       = "ft_save_log_path" // disk queue 数据日志路径
	KeyFtWriteLimit        = "ft_write_limit"   // 写入速度限制,单位MB
	KeyFtStrategy          = "ft_strategy"      // ft 的策略
	KeyFtProcs             = "ft_procs"         // ft并发数,当always_save或concurrent策略时启用
	KeyFtMemoryChannel     = "ft_memory_channel"
	KeyFtMemoryChannelSize = "ft_memory_channel_size"
)

可选参数 fault_tolerant 为true的话,以下必填

View Source
const (
	// KeyFtStrategyBackupOnly 只在失败的时候进行容错
	KeyFtStrategyBackupOnly = "backup_only"
	// KeyFtStrategyAlwaysSave 所有数据都进行容错
	KeyFtStrategyAlwaysSave = "always_save"
	// KeyFtStrategyConcurrent 适合并发发送数据,只在失败的时候进行容错
	KeyFtStrategyConcurrent = "concurrent"
)

ft 策略

View Source
const (
	KeyHttpSenderUrl      = "http_sender_url"
	KeyHttpSenderGzip     = "http_sender_gzip"
	KeyHttpSenderProtocol = "http_sender_protocol"
	KeyHttpSenderCsvHead  = "http_sender_csv_head"
	KeyHttpSenderCsvSplit = "http_sender_csv_split"
)
View Source
const (
	KeyInfluxdbHost               = "influxdb_host"
	KeyInfluxdbDB                 = "influxdb_db"
	KeyInfluxdbAutoCreate         = "influxdb_autoCreate"
	KeyInfluxdbRetetion           = "influxdb_retention"
	KeyInfluxdbRetetionDuration   = "influxdb_retention_duration"
	KeyInfluxdbMeasurement        = "influxdb_measurement"
	KeyInfluxdbTags               = "influxdb_tags"
	KeyInfluxdbFields             = "influxdb_fields"              // influxdb
	KeyInfluxdbTimestamp          = "influxdb_timestamp"           // 可选 nano时间戳字段
	KeyInfluxdbTimestampPrecision = "influxdb_timestamp_precision" // 时间戳字段的精度,代表时间戳1个单位代表多少纳秒
)

Influxdb sender 的可配置字段

View Source
const (
	KeyKafkaCompressionNone   = "none"
	KeyKafkaCompressionGzip   = "gzip"
	KeyKafkaCompressionSnappy = "snappy"
)
View Source
const (
	KeyKafkaHost     = "kafka_host"      //主机地址,可以有多个
	KeyKafkaTopic    = "kafka_topic"     //topic 1.填一个值,则topic为所填值 2.天两个值: %{[字段名]}, defaultTopic :根据每条event,以指定字段值为topic,若无,则用默认值
	KeyKafkaClientId = "kafka_client_id" //客户端ID
	//KeyKafkaFlushNum = "kafka_flush_num"				//缓冲条数
	//KeyKafkaFlushFrequency = "kafka_flush_frequency"	//缓冲频率
	KeyKafkaRetryMax    = "kafka_retry_max"   //最大重试次数
	KeyKafkaCompression = "kafka_compression" //压缩模式,有none, gzip, snappy
	KeyKafkaTimeout     = "kafka_timeout"     //连接超时时间
	KeyKafkaKeepAlive   = "kafka_keep_alive"  //保持连接时长
	KeyMaxMessageBytes  = "max_message_bytes" //每条消息最大字节数
)
View Source
const (
	KeyMongodbHost       = "mongodb_host"
	KeyMongodbDB         = "mongodb_db"
	KeyMongodbCollection = "mongodb_collection"
)

可选参数 当sender_type 为mongodb_* 的时候,需要必填的字段

View Source
const (
	KeyMongodbUpdateKey = "mongodb_acc_updkey"
	KeyMongodbAccKey    = "mongodb_acc_acckey"
)

可选参数 当sender_type 为mongodb_acc 的时候,需要必填的字段

View Source
const (
	KeyPandoraAk                   = "pandora_ak"
	KeyPandoraSk                   = "pandora_sk"
	KeyPandoraHost                 = "pandora_host"
	KeyPandoraWorkflowName         = "pandora_workflow_name"
	KeyPandoraRepoName             = "pandora_repo_name"
	KeyPandoraRegion               = "pandora_region"
	KeyPandoraSchema               = "pandora_schema"
	KeyPandoraSchemaUpdateInterval = "pandora_schema_update_interval"
	KeyPandoraAutoCreate           = "pandora_auto_create"
	KeyPandoraSchemaFree           = "pandora_schema_free"
	KeyPandoraExtraInfo            = "pandora_extra_info"

	KeyPandoraEnableLogDB = "pandora_enable_logdb"
	KeyPandoraLogDBName   = "pandora_logdb_name"
	KeyPandoraLogDBHost   = "pandora_logdb_host"

	KeyPandoraEnableTSDB     = "pandora_enable_tsdb"
	KeyPandoraTSDBName       = "pandora_tsdb_name"
	KeyPandoraTSDBSeriesName = "pandora_tsdb_series_name"
	KeyPandoraTSDBSeriesTags = "pandora_tsdb_series_tags"
	KeyPandoraTSDBHost       = "pandora_tsdb_host"
	KeyPandoraTSDBTimeStamp  = "pandora_tsdb_timestamp"

	KeyPandoraEnableKodo         = "pandora_enable_kodo"
	KeyPandoraKodoBucketName     = "pandora_bucket_name"
	KeyPandoraKodoFilePrefix     = "pandora_kodo_prefix"
	KeyPandoraKodoCompressPrefix = "pandora_kodo_compress"

	KeyPandoraEmail = "qiniu_email"

	KeyRequestRateLimit       = "request_rate_limit"
	KeyFlowRateLimit          = "flow_rate_limit"
	KeyPandoraGzip            = "pandora_gzip"
	KeyPandoraUUID            = "pandora_uuid"
	KeyPandoraWithIP          = "pandora_withip"
	KeyForceMicrosecond       = "force_microsecond"
	KeyForceDataConvert       = "pandora_force_convert"
	KeyPandoraAutoConvertDate = "pandora_auto_convert_date"
	KeyIgnoreInvalidField     = "ignore_invalid_field"

	PandoraUUID = "Pandora_UUID"
)

可选参数 当sender_type 为pandora 的时候,需要必填的字段

View Source
const (
	PandoraTypeLong       = "long"
	PandoraTypeFloat      = "float"
	PandoraTypeString     = "string"
	PandoraTypeDate       = "date"
	PandoraTypeBool       = "boolean"
	PandoraTypeArray      = "array"
	PandoraTypeMap        = "map"
	PandoraTypeJsonString = "jsonstring"
)
View Source
const (
	KeySenderType     = "sender_type"
	KeyFaultTolerant  = "fault_tolerant"
	KeyName           = "name"
	KeyRunnerName     = "runner_name"
	KeyLogkitSendTime = "logkit_send_time"
	KeyIsMetrics      = "is_metrics"
	KeyMetricTime     = "timestamp"
)

Sender's conf keys

View Source
const (
	TypeFile              = "file"          // 本地文件
	TypePandora           = "pandora"       // pandora 打点
	TypeMongodbAccumulate = "mongodb_acc"   // mongodb 并且按字段聚合
	TypeInfluxdb          = "influxdb"      // influxdb
	TypeMock              = "mock"          // mock sender
	TypeDiscard           = "discard"       // discard sender
	TypeElastic           = "elasticsearch" // elastic
	TypeKafka             = "kafka"         // kafka
	TypeHttp              = "http"          // http sender
)

SenderType 发送类型

View Source
const (
	//DefaultSenderIndex = 0
	RouterKeyName   = "router_key_name"
	RouterMatchType = "router_match_type"
	//RouterRoutesMap    = "router_routes"
	//RouterMatchValue   = "router_match_value"
	//RouterSenderIndex  = "router_sender_index"
	RouterDefaultIndex = "router_default_sender"

	MTypeEqualName    = "equal"
	MTypeContainsName = "contains"
)
View Source
const DefaultFtSyncEvery = 10

Ft sender默认同步一次meta信息的数据次数

View Source
const (
	InnerUserAgent = "_useragent"
)
View Source
const (
	KeyFileSenderPath = "file_send_path"
)

可选参数 当sender_type 为file 的时候

View Source
const KeySendTime = "sendTime"
View Source
const UnderfinedRunnerName = "UnderfinedRunnerName"

Variables

View Source
var (
	// ElasticVersion3 v3.x
	ElasticVersion3 = "3.x"
	// ElasticVersion5 v5.x
	ElasticVersion5 = "5.x"
	// ElasticVersion6 v6.x
	ElasticVersion6 = "6.x"
)
View Source
var (
	OptionSaveLogPath = Option{
		KeyName:      KeyFtSaveLogPath,
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		Description:  "管道本地盘数据保存路径(ft_save_log_path)",
	}
	OptionFtWriteLimit = Option{
		KeyName:      KeyFtWriteLimit,
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		Description:  "磁盘写入限速(MB/s)(ft_write_limit)",
		CheckRegex:   "\\d+",
	}
	OptionFtSyncEvery = Option{
		KeyName:      KeyFtSyncEvery,
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		Description:  "同步meta的间隔(ft_sync_every)",
		CheckRegex:   "\\d+",
	}
	OptionFtStrategy = Option{
		KeyName:       KeyFtStrategy,
		ChooseOnly:    true,
		ChooseOptions: []interface{}{KeyFtStrategyBackupOnly, KeyFtStrategyAlwaysSave, KeyFtStrategyConcurrent},
		Default:       KeyFtStrategyBackupOnly,
		DefaultNoUse:  false,
		Description:   "磁盘管道容错策略(仅备份错误|全部数据走管道)(ft_strategy)",
	}
	OptionFtProcs = Option{
		KeyName:      KeyFtProcs,
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		Description:  "发送并发数量(磁盘管道或内存管道 always_save 或 concurrent 模式生效)(ft_procs)",
		CheckRegex:   "\\d+",
	}
	OptionFtMemoryChannel = Option{
		KeyName:       KeyFtMemoryChannel,
		ChooseOnly:    true,
		ChooseOptions: []interface{}{"false", "true"},
		Default:       "false",
		DefaultNoUse:  false,
		Description:   "使用内存替换磁盘管道(加速)(ft_memory_channel)",
	}
	OptionFtMemoryChannelSize = Option{
		KeyName:      KeyFtMemoryChannelSize,
		ChooseOnly:   false,
		Default:      "",
		DefaultNoUse: false,
		Description:  "内存管道长度(ft_memory_channel_size)",
		CheckRegex:   "\\d+",
	}
)
View Source
var (
	Codes = map[byte][]byte{
		',': []byte(`\,`),
		'"': []byte(`\"`),
		' ': []byte(`\ `),
		'=': []byte(`\=`),
	}
)
View Source
var ErrNotAsyncSender = errors.New("This Sender does not support for Async Push")

NotAsyncSender return when sender is not async

View Source
var MatchTypeRegistry = map[string]MatchType{}
View Source
var ModeKeyOptions = map[string][]Option{
	TypeFile: {
		{
			KeyName:      KeyFileSenderPath,
			ChooseOnly:   false,
			Default:      "/home/john/mylogs/my.log",
			DefaultNoUse: true,
			Description:  "发送到的目的文件路径(file_send_path)",
		},
	},
	TypePandora: {
		{
			KeyName:      KeyPandoraWorkflowName,
			ChooseOnly:   false,
			Default:      "logkit_default_workflow",
			DefaultNoUse: true,
			Description:  "Pandora workflow名称(为空则使用旧版)(pandora_workflow_name)",
			CheckRegex:   "^[a-zA-Z_][a-zA-Z0-9_]{0,127}$",
		},
		{
			KeyName:      KeyPandoraRepoName,
			ChooseOnly:   false,
			Default:      "my_work",
			DefaultNoUse: true,
			Description:  "Pandora 数据源名称(pandora_repo_name)",
			CheckRegex:   "^[a-zA-Z][a-zA-Z0-9_]{0,127}$",
		},
		{
			KeyName:      KeyPandoraAk,
			ChooseOnly:   false,
			Default:      "在此填写您七牛账号ak(access_key)",
			DefaultNoUse: true,
			Description:  "七牛的公钥(access_key)",
		},
		{
			KeyName:      KeyPandoraSk,
			ChooseOnly:   false,
			Default:      "在此填写您七牛账号的secret_key",
			DefaultNoUse: true,
			Description:  "七牛的私钥(secret_key)",
			Secret:       true,
		},
		OptionSaveLogPath,
		{
			KeyName:       KeyLogkitSendTime,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "是否在发送数据时自动添加发送时间(logkit_send_time)",
		},
		{
			KeyName:       KeyPandoraExtraInfo,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "自动添加额外信息(pandora_extra_info)",
		},
		{
			KeyName:      KeyPandoraHost,
			ChooseOnly:   false,
			Default:      "https://pipeline.qiniu.com",
			DefaultNoUse: false,
			Description:  "Host地址(pandora_host)",
		},
		{
			KeyName:       KeyPandoraRegion,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"nb"},
			Default:       "nb",
			DefaultNoUse:  false,
			Description:   "创建的资源所在区域(pandora_region)",
		},
		{
			KeyName:       KeyPandoraSchemaFree,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "是否根据数据自动创建与增加字段(pandora_schema_free)",
		},
		{
			KeyName:      KeyPandoraAutoCreate,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "以DSL语法自动创建repo(pandora_auto_create)",
		},
		{
			KeyName:      KeyPandoraSchema,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "仅选择部分字段(重命名)发送(pandora_schema)",
		},
		{
			KeyName:       KeyPandoraEnableLogDB,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "是否自动创建并导出到Pandora LogDB(pandora_enable_logdb)",
		},
		{
			KeyName:      KeyPandoraLogDBName,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "导出的 LogDB 仓库名称(pandora_logdb_name)",
		},
		{
			KeyName:      KeyPandoraLogDBHost,
			ChooseOnly:   false,
			Default:      "https://logdb.qiniu.com",
			DefaultNoUse: false,
			Description:  "LogDB host 地址(pandora_logdb_host)",
		},
		{
			KeyName:       KeyPandoraEnableTSDB,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "是否自动创建并导出到Pandora TSDB(pandora_enable_tsdb)",
		},
		{
			KeyName:      KeyPandoraTSDBName,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "导出的 TSDB 仓库名称(pandora_tsdb_name)",
		},
		{
			KeyName:      KeyPandoraTSDBSeriesName,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "导出的 TSDB 序列名称(pandora_tsdb_series_name)",
		},
		{
			KeyName:      KeyPandoraTSDBSeriesTags,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "导出的 TSDB 时需要设置为tag类型的字段列表(pandora_tsdb_series_tags)",
		},
		{
			KeyName:      KeyPandoraTSDBHost,
			ChooseOnly:   false,
			Default:      "https://tsdb.qiniu.com",
			DefaultNoUse: false,
			Description:  "TSDB host 地址(pandora_tsdb_host)",
		},
		{
			KeyName:      KeyPandoraTSDBTimeStamp,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "TSDB 时间戳字段(pandora_tsdb_timestamp)",
		},
		{
			KeyName:       KeyPandoraEnableKodo,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "是否自动导出到七牛云存储(pandora_enable_kodo)",
		},
		{
			KeyName:      KeyPandoraKodoBucketName,
			ChooseOnly:   false,
			Default:      "my_bucket_name",
			DefaultNoUse: true,
			Description:  "云存储 Bucket 仓库名称(启用自动导出到云存储时必填)(pandora_bucket_name)",
		},
		{
			KeyName:      KeyPandoraEmail,
			ChooseOnly:   false,
			Default:      "my@email.com",
			DefaultNoUse: true,
			Description:  "邮箱(启用自动导出到云存储时必填)(qiniu_email)",
		},
		{
			KeyName:      KeyPandoraKodoFilePrefix,
			ChooseOnly:   false,
			Default:      "logkitauto/date=$(year)-$(mon)-$(day)/hour=$(hour)/min=$(min)/$(sec)",
			DefaultNoUse: false,
			Description:  "云存储文件前缀(pandora_kodo_prefix)",
		},
		{
			KeyName:       KeyPandoraKodoCompressPrefix,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"parquet", "json", "text", "csv"},
			Default:       "parquet",
			DefaultNoUse:  false,
			Description:   "云存储压缩方式(pandora_kodo_compress)",
		},
		{
			KeyName:       KeyPandoraGzip,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "gzip压缩发送(pandora_gzip)",
		},
		{
			KeyName:      KeyFlowRateLimit,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "流量限制(KB/s)(flow_rate_limit)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:      KeyRequestRateLimit,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "请求限制(次/s)(request_rate_limit)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:       KeyPandoraUUID,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "每条数据植入UUID(pandora_uuid)",
		},
		{
			KeyName:       KeyPandoraWithIP,
			ChooseOnly:    false,
			ChooseOptions: []interface{}{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "每条数据植入IP地址(pandora_withip)",
		},
		OptionFtWriteLimit,
		OptionFtSyncEvery,
		OptionFtStrategy,
		OptionFtProcs,
		OptionFtMemoryChannel,
		OptionFtMemoryChannelSize,
		{
			KeyName:       KeyForceMicrosecond,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "对于数据的时间字段抖动(force_microsecond)",
		},
		{
			KeyName:       KeyForceDataConvert,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "数据强制类型转换(pandora_force_convert)",
		},
		{
			KeyName:       KeyIgnoreInvalidField,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "忽略格式错误的字段(ignore_invalid_field)",
		},
		{
			KeyName:       KeyPandoraAutoConvertDate,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "时间类型自动转换(pandora_auto_convert_date)",
		},
	},
	TypeMongodbAccumulate: {
		{
			KeyName:      KeyMongodbHost,
			ChooseOnly:   false,
			Default:      "mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]",
			DefaultNoUse: true,
			Description:  "数据库地址(mongodb_host)",
		},
		{
			KeyName:      KeyMongodbDB,
			ChooseOnly:   false,
			Default:      "app123",
			DefaultNoUse: true,
			Description:  "数据库名称(mongodb_db)",
		},
		{
			KeyName:      KeyMongodbCollection,
			ChooseOnly:   false,
			Default:      "collection1",
			DefaultNoUse: true,
			Description:  "数据表名称(mongodb_collection)",
		},
		{
			KeyName:      KeyMongodbUpdateKey,
			ChooseOnly:   false,
			Default:      "domain,uid",
			DefaultNoUse: true,
			Description:  "聚合条件列(mongodb_acc_updkey)",
		},
		{
			KeyName:      KeyMongodbAccKey,
			ChooseOnly:   false,
			Default:      "low,hit",
			DefaultNoUse: true,
			Description:  "聚合列(mongodb_acc_acckey)",
		},
		OptionSaveLogPath,
		OptionFtWriteLimit,
		OptionFtSyncEvery,
		OptionFtStrategy,
		OptionFtProcs,
		OptionFtMemoryChannel,
		OptionFtMemoryChannelSize,
	},
	TypeInfluxdb: {
		{
			KeyName:      KeyInfluxdbHost,
			ChooseOnly:   false,
			Default:      "127.0.0.1:8086",
			DefaultNoUse: true,
			Description:  "数据库地址(influxdb_host)",
		},
		{
			KeyName:      KeyInfluxdbDB,
			ChooseOnly:   false,
			Default:      "testdb",
			DefaultNoUse: true,
			Description:  "数据库名称(influxdb_db)",
		},
		{
			KeyName:       KeyInfluxdbAutoCreate,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Description:   "自动创建数据库(influxdb_auto_create)",
		},
		{
			KeyName:      KeyInfluxdbMeasurement,
			ChooseOnly:   false,
			Default:      "test_table",
			DefaultNoUse: true,
			Description:  "measurement名称(influxdb_measurement)",
		},
		{
			KeyName:      KeyInfluxdbRetetion,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "retention名称(influxdb_retention)",
		},
		{
			KeyName:      KeyInfluxdbRetetionDuration,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "retention时长(influxdb_retention_duration)",
		},
		{
			KeyName:      KeyInfluxdbTags,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "标签列数据(influxdb_tags)",
		},
		{
			KeyName:      KeyInfluxdbFields,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "普通列数据(influxdb_fields)",
		},
		{
			KeyName:      KeyInfluxdbTimestamp,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "时间戳列(influxdb_timestamp)",
		},
		{
			KeyName:      KeyInfluxdbTimestampPrecision,
			ChooseOnly:   false,
			Default:      "100",
			DefaultNoUse: false,
			Description:  "时间戳列精度调整(influxdb_timestamp_precision)",
		},
		OptionSaveLogPath,
		OptionFtWriteLimit,
		OptionFtSyncEvery,
		OptionFtStrategy,
		OptionFtProcs,
		OptionFtMemoryChannel,
		OptionFtMemoryChannelSize,
	},
	TypeDiscard: {},
	TypeElastic: {
		{
			KeyName:      KeyElasticHost,
			ChooseOnly:   false,
			Default:      "192.168.31.203:9200",
			DefaultNoUse: false,
			Description:  "host地址(elastic_host)",
		},
		{
			KeyName:       KeyElasticVersion,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{ElasticVersion3, ElasticVersion5, ElasticVersion6},
			Description:   "ES版本号(es_version)",
		},
		{
			KeyName:      KeyElasticIndex,
			ChooseOnly:   false,
			Default:      "app-repo-123",
			DefaultNoUse: true,
			Description:  "索引名称(elastic_index)",
		},
		{
			KeyName:       KeyElasticIndexStrategy,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{KeyDefaultIndexStrategy, KeyYearIndexStrategy, KeyMonthIndexStrategy, KeyDayIndexStrategy},
			Default:       KeyFtStrategyBackupOnly,
			DefaultNoUse:  false,
			Description:   "自动索引模式(默认索引|按年索引|按月索引|按日索引)(index_strategy)",
		},
		{
			KeyName:       KeyElasticTimezone,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{KeyUTCTimezone, KeylocalTimezone, KeyPRCTimezone},
			Default:       KeyUTCTimezone,
			DefaultNoUse:  false,
			Description:   "索引时区(Local(本地)|UTC(标准时间)|PRC(北京时间))(elastic_time_zone)",
		},
		{
			KeyName:       KeyLogkitSendTime,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "是否在发送数据时自动添加发送时间(logkit_send_time)",
		},
		{
			KeyName:      KeyElasticType,
			ChooseOnly:   false,
			Default:      "app",
			DefaultNoUse: true,
			Description:  "索引类型名称(elastic_type)",
		},
		OptionSaveLogPath,
		OptionFtWriteLimit,
		OptionFtSyncEvery,
		OptionFtStrategy,
		OptionFtProcs,
		OptionFtMemoryChannel,
		OptionFtMemoryChannelSize,
	},
	TypeKafka: {
		{
			KeyName:      KeyKafkaHost,
			ChooseOnly:   false,
			Default:      "192.168.31.201:9092",
			DefaultNoUse: true,
			Description:  "broker的host地址(kafka_host)",
		},
		{
			KeyName:      KeyKafkaTopic,
			ChooseOnly:   false,
			Default:      "my_topic",
			DefaultNoUse: true,
			Description:  "打点的topic名称(kafka_topic)",
		},
		{
			KeyName:       KeyKafkaCompression,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{KeyKafkaCompressionNone, KeyKafkaCompressionGzip, KeyKafkaCompressionSnappy},
			Default:       KeyKafkaCompressionNone,
			DefaultNoUse:  false,
			Description:   "压缩模式(none不压缩|gzip压缩|snappy压缩)(kafka_compression)",
		},
		{
			KeyName:      KeyKafkaClientId,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "kafka客户端标识ID(kafka_client_id)",
		},
		{
			KeyName:      KeyKafkaRetryMax,
			ChooseOnly:   false,
			Default:      "3",
			DefaultNoUse: false,
			Description:  "kafka最大错误重试次数(kafka_retry_max)",
		},
		{
			KeyName:      KeyKafkaTimeout,
			ChooseOnly:   false,
			Default:      "30s",
			DefaultNoUse: false,
			Description:  "kafka连接超时时间(kafka_timeout)",
		},
		{
			KeyName:      KeyKafkaKeepAlive,
			ChooseOnly:   false,
			Default:      "0",
			DefaultNoUse: false,
			Description:  "kafka的keepalive时间(kafka_keep_alive)",
		},
		OptionSaveLogPath,
		OptionFtWriteLimit,
		OptionFtSyncEvery,
		OptionFtStrategy,
		OptionFtProcs,
		OptionFtMemoryChannel,
		OptionFtMemoryChannelSize,
	},
	TypeHttp: {
		{
			KeyName:      KeyHttpSenderUrl,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: true,
			Description:  "发送目的url(http_sender_url)",
		},
		{
			KeyName:       KeyHttpSenderProtocol,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"json", "csv"},
			Default:       "json",
			DefaultNoUse:  true,
			Description:   "发送数据时使用的格式(http_sender_protocol)",
		},
		{
			KeyName:      KeyHttpSenderCsvSplit,
			ChooseOnly:   false,
			Default:      "\t",
			DefaultNoUse: true,
			Description:  "csv分隔符(http_sender_csv_split)",
		},
		{
			KeyName:       KeyHttpSenderGzip,
			ChooseOnly:    true,
			ChooseOptions: []interface{}{"true", "false"},
			Default:       "true",
			DefaultNoUse:  true,
			Description:   "是否启用gzip(http_sender_gzip)",
		},
	},
}
View Source
var ModeUsages = []KeyValue{
	{TypePandora, "发送到 Pandora"},
	{TypeFile, "发送到本地文件"},
	{TypeMongodbAccumulate, "发送到 mongodb"},
	{TypeInfluxdb, "发送到 influxdb"},
	{TypeDiscard, "消费数据但不发送"},
	{TypeElastic, "发送到Elasticsearch"},
	{TypeKafka, "发送到Kafka"},
	{TypeHttp, "通过 Http Post 发送"},
}

ModeUsages 用途说明

View Source
var PandoraMaxBatchSize = 2 * 1024 * 1024

PandoraMaxBatchSize 发送到Pandora的batch限制

Functions

func ConvertDatas added in v1.2.2

func ConvertDatas(ins []map[string]interface{}) []Data

func ConvertDatasBack added in v1.2.2

func ConvertDatasBack(ins []Data) []map[string]interface{}

func CreateInfluxdbDatabase added in v1.4.3

func CreateInfluxdbDatabase(host, database, sender string) (err error)

func CreateInfluxdbRetention added in v1.4.3

func CreateInfluxdbRetention(host, database, retention, duration, sender string) (err error)

func GetRouterMatchTypeUsage added in v1.4.2

func GetRouterMatchTypeUsage() []KeyValue

func GetRouterOption added in v1.4.2

func GetRouterOption() []Option

func JSONLineMarshalFunc

func JSONLineMarshalFunc(datas []Data) ([]byte, error)

JSONLineMarshalFunc 将数据json并且按换行符分隔

func MakeKey

func MakeKey(name []byte, tags Tags) []byte

MakeKey creates a key for a set of tags.

func String

func String(in string) string

func UnescapeString

func UnescapeString(in string) string

Types

type DiscardSender

type DiscardSender struct {
	// contains filtered or unexported fields
}

func (*DiscardSender) Close

func (s *DiscardSender) Close() error

func (*DiscardSender) Name

func (s *DiscardSender) Name() string

Name function will return the name as string

func (*DiscardSender) Send

func (s *DiscardSender) Send(d []Data) error

func (*DiscardSender) SendCount

func (s *DiscardSender) SendCount() int

type ElasticsearchSender

type ElasticsearchSender struct {
	// contains filtered or unexported fields
}

ElasticsearchSender ElasticSearch sender

func (*ElasticsearchSender) Close

func (ess *ElasticsearchSender) Close() error

Close ElasticSearch Sender Close

func (*ElasticsearchSender) Name

func (ess *ElasticsearchSender) Name() string

Name ElasticSearchSenderName

func (*ElasticsearchSender) Send

func (ess *ElasticsearchSender) Send(data []Data) (err error)

Send ElasticSearchSender

type FileSender

type FileSender struct {
	// contains filtered or unexported fields
}

FileSender write datas into local file only for test

func (*FileSender) Close

func (fs *FileSender) Close() error

func (*FileSender) Name

func (fs *FileSender) Name() string

func (*FileSender) Send

func (fs *FileSender) Send(datas []Data) error

Send inherit from Sender

type FtOption added in v1.2.2

type FtOption struct {
	// contains filtered or unexported fields
}

type FtSender

type FtSender struct {
	// contains filtered or unexported fields
}

FtSender fault tolerance sender wrapper

func NewFtSender

func NewFtSender(sender Sender, conf conf.MapConf, ftSaveLogPath string) (*FtSender, error)

NewFtSender Fault tolerant sender constructor

func (*FtSender) Close

func (ft *FtSender) Close() error

func (*FtSender) Name

func (ft *FtSender) Name() string

func (*FtSender) Reset added in v1.3.1

func (ft *FtSender) Reset() error

func (*FtSender) Restore added in v1.3.5

func (ft *FtSender) Restore(info *utils.StatsInfo)

func (*FtSender) Send

func (ft *FtSender) Send(datas []Data) error

func (*FtSender) Stats added in v1.3.0

func (ft *FtSender) Stats() utils.StatsInfo

type HttpSender added in v1.4.2

type HttpSender struct {
	// contains filtered or unexported fields
}

func (*HttpSender) Close added in v1.4.2

func (h *HttpSender) Close() error

func (*HttpSender) Name added in v1.4.2

func (h *HttpSender) Name() string

func (*HttpSender) Send added in v1.4.2

func (h *HttpSender) Send(data []Data) (err error)

type InfluxdbSender

type InfluxdbSender struct {
	// contains filtered or unexported fields
}

InfluxdbSender write datas into influxdb

func (*InfluxdbSender) Close

func (s *InfluxdbSender) Close() error

func (*InfluxdbSender) Name

func (s *InfluxdbSender) Name() string

func (*InfluxdbSender) Send

func (s *InfluxdbSender) Send(datas []Data) error

type KafkaSender added in v1.3.6

type KafkaSender struct {
	// contains filtered or unexported fields
}

func (*KafkaSender) Close added in v1.3.6

func (this *KafkaSender) Close() (err error)

func (*KafkaSender) Name added in v1.3.6

func (this *KafkaSender) Name() string

func (*KafkaSender) Send added in v1.3.6

func (this *KafkaSender) Send(data []Data) error

type MTypeContains added in v1.4.2

type MTypeContains struct{}

senderData 中包含

type MTypeEqual added in v1.4.2

type MTypeEqual struct{}

两个值完全相等

type MatchType added in v1.4.2

type MatchType func() mType

type MockSender

type MockSender struct {
	// contains filtered or unexported fields
}

func (*MockSender) Close

func (mock *MockSender) Close() error

func (*MockSender) Name

func (mock *MockSender) Name() string

Name function will return the name and datas recieved as string

func (*MockSender) Send

func (mock *MockSender) Send(d []Data) error

func (*MockSender) SendCount

func (mock *MockSender) SendCount() int

type MongoAccSender

type MongoAccSender struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

MongoAccSender Mongodb 根据UpdateKey 做对AccumulateKey $inc 累加的Sender

func (*MongoAccSender) Close

func (s *MongoAccSender) Close() error

func (*MongoAccSender) Name

func (s *MongoAccSender) Name() string

func (*MongoAccSender) Send

func (s *MongoAccSender) Send(datas []Data) (se error)

Send 依次尝试发送数据到mongodb,返回错误中包含所有写失败的数据 如果要保证每次send的原子性,必须保证datas长度为1,否则当程序宕机 总会出现丢失数据的问题

type PandoraOption

type PandoraOption struct {
	// contains filtered or unexported fields
}

PandoraOption 创建Pandora Sender的选项

type PandoraSender

type PandoraSender struct {
	UserSchema UserSchema
	// contains filtered or unexported fields
}

PandoraSender pandora sender

func (*PandoraSender) Close

func (s *PandoraSender) Close() error

func (*PandoraSender) Name

func (s *PandoraSender) Name() string

func (*PandoraSender) Send

func (s *PandoraSender) Send(datas []Data) (se error)

func (*PandoraSender) UpdateSchemas

func (s *PandoraSender) UpdateSchemas()

type Point

type Point struct {
	Measurement string
	Tags        map[string]string
	Fields      map[string]interface{}
	Time        int64
}

func (*Point) GetFields

func (p *Point) GetFields() []byte

func (*Point) Key

func (p *Point) Key() []byte

func (*Point) String

func (p *Point) String() string

type Points

type Points []Point

func (Points) Buffer

func (ps Points) Buffer() []byte

type Router added in v1.4.2

type Router struct {
	// contains filtered or unexported fields
}

func NewSenderRouter added in v1.4.2

func NewSenderRouter(conf RouterConfig, senderCnt int) (*Router, error)

func (*Router) GetSenderIndex added in v1.4.2

func (r *Router) GetSenderIndex(data Data) int

type RouterConfig added in v1.4.2

type RouterConfig struct {
	KeyName      string         `json:"router_key_name"`
	MatchType    string         `json:"router_match_type"`
	DefaultIndex int            `json:"router_default_sender"`
	Routes       map[string]int `json:"router_routes"`
}

type Sender

type Sender interface {
	Name() string
	// send data, error if failed
	Send([]Data) error
	Close() error
}

Sender send data to pandora, prometheus such different destinations

func NewDiscardSender

func NewDiscardSender(c conf.MapConf) (Sender, error)

NewDiscardSender 仅用于日志清理

func NewElasticSender

func NewElasticSender(conf conf.MapConf) (sender Sender, err error)

NewElasticSender New ElasticSender

func NewFileSender

func NewFileSender(conf conf.MapConf) (sender Sender, err error)

NewFileSender construct

func NewHttpSender added in v1.4.2

func NewHttpSender(c conf.MapConf) (Sender, error)

func NewInfluxdbSender

func NewInfluxdbSender(c conf.MapConf) (s Sender, err error)

NewInfluxdbSender 创建Influxdb 的sender

func NewKafkaSender added in v1.3.6

func NewKafkaSender(conf conf.MapConf) (sender Sender, err error)

func NewMockSender

func NewMockSender(c conf.MapConf) (Sender, error)

NewMockSender 测试用sender

func NewMongodbAccSender

func NewMongodbAccSender(conf conf.MapConf) (sender Sender, err error)

NewMongodbAccSender mongodb accumulate sender constructor

func NewPandoraSender

func NewPandoraSender(conf conf.MapConf) (sender Sender, err error)

NewPandoraSender pandora sender constructor

type SenderRegistry

type SenderRegistry struct {
	// contains filtered or unexported fields
}

SenderRegistry sender 的工厂类。可以注册自定义sender

func NewSenderRegistry

func NewSenderRegistry() *SenderRegistry

func (*SenderRegistry) NewSender

func (r *SenderRegistry) NewSender(conf conf.MapConf, ftSaveLogPath string) (sender Sender, err error)

func (*SenderRegistry) RegisterSender

func (registry *SenderRegistry) RegisterSender(senderType string, constructor func(conf.MapConf) (Sender, error)) error

type StatsSender added in v1.3.0

type StatsSender interface {
	Name() string
	// send data, error if failed
	Send([]Data) error
	Close() error
	Stats() utils.StatsInfo
	// 恢复 sender 停止之前的状态
	Restore(*utils.StatsInfo)
}

type Tags

type Tags map[string]string

func (Tags) HashKey

func (t Tags) HashKey() []byte

HashKey hashes all of a tag's keys.

type UserSchema

type UserSchema struct {
	DefaultAll bool
	Fields     map[string]string
}

UserSchema was parsed pandora schema from user's raw schema

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL