sender

package
v1.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 13, 2017 License: Apache-2.0 Imports: 29 Imported by: 62

Documentation

Index

Constants

View Source
// Configuration keys for the Elasticsearch sender.
const (
	KeyElasticHost  = "elastic_host"
	KeyElasticIndex = "elastic_index"
	KeyElasticType  = "elastic_type"
	// NOTE(review): the identifier says "Alias" but the key is "elastic_keys" — confirm this is intended.
	KeyElasticAlias = "elastic_keys"
)
View Source
// Configuration keys for the fault-tolerant (ft) sender. Per the package
// documentation these must be filled in when fault_tolerant is true.
const (
	KeyFtSyncEvery         = "ft_sync_every"    // number of writes after which the offset log is synced once
	KeyFtSaveLogPath       = "ft_save_log_path" // path of the disk queue data log
	KeyFtWriteLimit        = "ft_write_limit"   // write speed limit, in MB
	KeyFtStrategy          = "ft_strategy"      // ft strategy
	KeyFtProcs             = "ft_procs"         // ft concurrency; enabled with the always_save or concurrent strategy
	KeyFtMemoryChannel     = "ft_memory_channel"
	KeyFtMemoryChannelSize = "ft_memory_channel_size"
)

可选参数 fault_tolerant 为true的话,以下必填

View Source
// Allowed values for the ft_strategy option.
const (
	// KeyFtStrategyBackupOnly applies fault tolerance only when sending fails.
	KeyFtStrategyBackupOnly = "backup_only"
	// KeyFtStrategyAlwaysSave applies fault tolerance to all data.
	KeyFtStrategyAlwaysSave = "always_save"
	// KeyFtStrategyConcurrent suits concurrent sending; fault tolerance applies only when sending fails.
	KeyFtStrategyConcurrent = "concurrent"
)

ft 策略

View Source
// Configurable fields of the InfluxDB sender.
const (
	KeyInfluxdbHost               = "influxdb_host"
	KeyInfluxdbDB                 = "influxdb_db"
	// NOTE(review): "Retetion" looks like a misspelling of "Retention"; the
	// identifier is exported, so it is left unchanged here.
	KeyInfluxdbRetetion           = "influxdb_retention"
	KeyInfluxdbMeasurement        = "influxdb_measurement"
	KeyInfluxdbTags               = "influxdb_tags"
	KeyInfluxdbFields             = "influxdb_fields"              // influxdb
	KeyInfluxdbTimestamp          = "influxdb_timestamp"           // optional; nanosecond timestamp field
	KeyInfluxdbTimestampPrecision = "influxdb_timestamp_precision" // precision of the timestamp field: how many nanoseconds one timestamp unit represents
)

Influxdb sender 的可配置字段

View Source
// Configuration keys that, per the package documentation, must be filled in
// when sender_type is mongodb_*.
const (
	KeyMongodbHost       = "mongodb_host"
	KeyMongodbDB         = "mongodb_db"
	KeyMongodbCollection = "mongodb_collection"
)

可选参数 当sender_type 为mongodb_* 的时候,需要必填的字段

View Source
// Configuration keys that, per the package documentation, must be filled in
// when sender_type is mongodb_acc.
const (
	KeyMongodbUpdateKey = "mongodb_acc_updkey"
	KeyMongodbAccKey    = "mongodb_acc_acckey"
)

可选参数 当sender_type 为mongodb_acc 的时候,需要必填的字段

View Source
// Configuration keys for the Pandora sender (sender_type "pandora").
const (
	KeyPandoraAk                   = "pandora_ak"
	KeyPandoraSk                   = "pandora_sk"
	KeyPandoraHost                 = "pandora_host"
	KeyPandoraRepoName             = "pandora_repo_name"
	KeyPandoraRegion               = "pandora_region"
	KeyPandoraSchema               = "pandora_schema"
	KeyPandoraSchemaUpdateInterval = "pandora_schema_update_interval"
	KeyPandoraAutoCreate           = "pandora_auto_create"
	KeyPandoraSchemaFree           = "pandora_schema_free"
	KeyPandoraEnableLogDB          = "pandora_enable_logdb"
	KeyPandoraLogDBName            = "pandora_logdb_name"
	KeyPandoraLogDBHost            = "pandora_logdb_host"
	KeyRequestRateLimit            = "request_rate_limit"
	KeyFlowRateLimit               = "flow_rate_limit"
	KeyPandoraGzip                 = "pandora_gzip"
	KeyPandoraUUID                 = "pandora_uuid"
	KeyForceMicrosecond            = "force_microsecond"

	// NOTE(review): presumably the field name under which the UUID is stored
	// when pandora_uuid is enabled — confirm against the Pandora sender.
	PandoraUUID = "Pandora_UUID"
)

可选参数 当sender_type 为pandora 的时候,需要必填的字段

View Source
// Type name strings used with Pandora.
// NOTE(review): presumably these are the value types allowed in a Pandora
// schema — confirm against the Pandora sender.
const (
	PandoraTypeLong   = "long"
	PandoraTypeFloat  = "float"
	PandoraTypeString = "string"
	PandoraTypeDate   = "date"
	PandoraTypeBool   = "boolean"
	PandoraTypeArray  = "array"
	PandoraTypeMap    = "map"
)
View Source
// Configuration keys of a sender's conf that are not tied to one sender type.
const (
	KeySenderType    = "sender_type"
	KeyFaultTolerant = "fault_tolerant"
	KeyName          = "name"
	KeyRunnerName    = "runner_name"
)

Sender's conf keys

View Source
// Sender types: the values accepted for the sender_type option.
const (
	TypeFile              = "file"          // local file
	TypePandora           = "pandora"       // pandora data points
	TypeMongodbAccumulate = "mongodb_acc"   // mongodb, aggregated by field
	TypeInfluxdb          = "influxdb"      // influxdb
	TypeMock              = "mock"          // mock sender
	TypeDiscard           = "discard"       // discard sender
	TypeElastic           = "elasticsearch" // elastic

)

SenderType 发送类型

View Source
// DefaultFtSyncEvery is the ft sender's default number of data writes between
// two syncs of the meta information.
const DefaultFtSyncEvery = 10

Ft sender默认同步一次meta信息的数据次数

View Source
// Configuration key used when sender_type is file.
const (
	KeyFileSenderPath = "file_send_path"
)

可选参数 当sender_type 为file 的时候

View Source
// UnderfinedRunnerName is a placeholder runner name.
// NOTE(review): presumably used when no runner_name is configured — confirm.
// "Underfined" looks like a misspelling of "Undefined", but both the exported
// identifier and its value are part of the public surface and are left as is.
const UnderfinedRunnerName = "UnderfinedRunnerName"

Variables

View Source
// Codes maps four bytes (comma, double quote, space and equals sign) to the
// two-byte sequence made of a backslash followed by that same byte.
var Codes = map[byte][]byte{
	',': {'\\', ','},
	'"': {'\\', '"'},
	' ': {'\\', ' '},
	'=': {'\\', '='},
}
View Source
// ErrNotAsyncSender is the error for a sender that does not support async push.
var ErrNotAsyncSender = errors.New("This Sender does not support for Async Push")

ErrNotAsyncSender is returned when a sender does not support asynchronous push.

View Source
// ModeKeyOptions maps a sender type (one of the Type* constants) to the list
// of configuration options offered for that sender. Each option carries the
// config key (KeyName), a default value, a description, and optionally a fixed
// list of choices (ChooseOnly with ChooseOptions) or a CheckRegex pattern.
//
// NOTE(review): DefaultNoUse appears to mark defaults that are only example
// placeholders — confirm against utils.Option.
// NOTE(review): CheckRegex "\\d+" is not anchored, unlike the repo-name
// pattern below; if it is applied with an unanchored match it would accept
// values such as "12abc" — confirm how utils.Option consumers use it.
// NOTE(review): ft_sync_every has Default "" plus a CheckRegex under
// TypePandora, but Default "10" and no CheckRegex under the other sender
// types — confirm whether the difference is intended.
// NOTE(review): TypeMock has no entry in this map.
var ModeKeyOptions = map[string][]utils.Option{
	// Options of the local file sender.
	TypeFile: {
		{
			KeyName:      KeyFileSenderPath,
			ChooseOnly:   false,
			Default:      "/home/john/mylogs/my.log",
			DefaultNoUse: true,
			Description:  "发送到的目的文件路径(file_send_path)",
		},
	},
	// Options of the Pandora sender, including its fault-tolerance (ft) options.
	TypePandora: {
		{
			KeyName:      KeyPandoraRepoName,
			ChooseOnly:   false,
			Default:      "my_work",
			DefaultNoUse: true,
			Description:  "Pandora 数据源名称(pandora_repo_name)",
			CheckRegex:   "^[a-zA-Z][a-zA-Z0-9_]{0,127}$",
		},
		{
			KeyName:      KeyPandoraAk,
			ChooseOnly:   false,
			Default:      "在此填写您七牛账号ak(access_key)",
			DefaultNoUse: true,
			Description:  "七牛的公钥(access_key)",
		},
		{
			KeyName:      KeyPandoraSk,
			ChooseOnly:   false,
			Default:      "在此填写您七牛账号的secret_key",
			DefaultNoUse: true,
			Description:  "七牛的私钥(secret_key)",
		},
		{
			KeyName:       KeyFaultTolerant,
			ChooseOnly:    true,
			ChooseOptions: []string{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "是否开启磁盘发送管道(fault_tolerant)",
		},
		{
			KeyName:      KeyFtSaveLogPath,
			ChooseOnly:   false,
			Default:      "/disk1/ftsendor/",
			DefaultNoUse: true,
			Description:  "管道本地盘数据保存路径(ft_save_log_path)",
		},
		{
			KeyName:      KeyPandoraHost,
			ChooseOnly:   false,
			Default:      "https://pipeline.qiniu.com",
			DefaultNoUse: false,
			Description:  "Host地址(pandora_host)",
		},
		{
			KeyName:       KeyPandoraRegion,
			ChooseOnly:    true,
			ChooseOptions: []string{"nb"},
			Default:       "nb",
			DefaultNoUse:  false,
			Description:   "创建的资源所在区域(pandora_region)",
		},
		{
			KeyName:       KeyPandoraSchemaFree,
			ChooseOnly:    true,
			ChooseOptions: []string{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "是否根据数据自动创建与增加字段(pandora_schema_free)",
		},
		{
			KeyName:      KeyPandoraAutoCreate,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "以DSL语法自动创建repo(pandora_auto_create)",
		},
		{
			KeyName:      KeyPandoraSchema,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "仅选择部分字段(重命名)发送(pandora_schema)",
		},
		{
			KeyName:       KeyPandoraEnableLogDB,
			ChooseOnly:    true,
			ChooseOptions: []string{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "是否自动创建并导出到Pandora LogDB(pandora_enable_logdb)",
		},
		{
			KeyName:      KeyPandoraLogDBName,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "导出的 LogDB 仓库名称(pandora_logdb_name)",
		},
		{
			KeyName:      KeyPandoraLogDBHost,
			ChooseOnly:   false,
			Default:      "https://logdb.qiniu.com",
			DefaultNoUse: false,
			Description:  "LogDB host 地址(pandora_logdb_host)",
		},
		{
			KeyName:       KeyPandoraGzip,
			ChooseOnly:    true,
			ChooseOptions: []string{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "gzip压缩发送(pandora_gzip)",
		},
		{
			KeyName:      KeyFlowRateLimit,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "流量限制(KB/s)(flow_rate_limit)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:      KeyRequestRateLimit,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "请求限制(次/s)(request_rate_limit)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:      KeyPandoraUUID,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "每条数据植入UUID(pandora_uuid)",
		},
		{
			KeyName:      KeyFtWriteLimit,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "磁盘写入限速(MB/s)(ft_write_limit)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:      KeyFtSyncEvery,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "同步meta的间隔(ft_sync_every)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:       KeyFtStrategy,
			ChooseOnly:    true,
			ChooseOptions: []string{KeyFtStrategyBackupOnly, KeyFtStrategyAlwaysSave, KeyFtStrategyConcurrent},
			Default:       KeyFtStrategyBackupOnly,
			DefaultNoUse:  false,
			Description:   "磁盘管道容错策略(仅备份错误|全部数据走管道)(ft_strategy)",
		},
		{
			KeyName:      KeyFtProcs,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "发送并发数量(磁盘管道或内存管道 always_save 或 concurrent 模式生效)(ft_procs)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:       KeyFtMemoryChannel,
			ChooseOnly:    true,
			ChooseOptions: []string{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "使用内存替换磁盘管道(加速)(ft_memory_channel)",
		},
		{
			KeyName:      KeyFtMemoryChannelSize,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "内存管道长度(ft_memory_channel_size)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:       KeyForceMicrosecond,
			ChooseOnly:    true,
			ChooseOptions: []string{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "对于数据的时间字段抖动(force_microsecond)",
		},
	},
	// Options of the MongoDB accumulate sender, followed by its ft options.
	TypeMongodbAccumulate: {
		{
			KeyName:      KeyMongodbHost,
			ChooseOnly:   false,
			Default:      "mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]",
			DefaultNoUse: true,
			Description:  "数据库地址(mongodb_host)",
		},
		{
			KeyName:      KeyMongodbDB,
			ChooseOnly:   false,
			Default:      "app123",
			DefaultNoUse: true,
			Description:  "数据库名称(mongodb_db)",
		},
		{
			KeyName:      KeyMongodbCollection,
			ChooseOnly:   false,
			Default:      "collection1",
			DefaultNoUse: true,
			Description:  "数据表名称(mongodb_collection)",
		},
		{
			KeyName:      KeyMongodbUpdateKey,
			ChooseOnly:   false,
			Default:      "domain,uid",
			DefaultNoUse: true,
			Description:  "聚合条件列(mongodb_acc_updkey)",
		},
		{
			KeyName:      KeyMongodbAccKey,
			ChooseOnly:   false,
			Default:      "low,hit",
			DefaultNoUse: true,
			Description:  "聚合列(mongodb_acc_acckey)",
		},
		{
			KeyName:       KeyFaultTolerant,
			ChooseOnly:    true,
			ChooseOptions: []string{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "是否开启磁盘发送管道(fault_tolerant)",
		},
		{
			KeyName:      KeyFtSaveLogPath,
			ChooseOnly:   false,
			Default:      "/disk1/ftsendor/",
			DefaultNoUse: true,
			Description:  "管道本地盘数据保存路径(ft_save_log_path)",
		},
		{
			KeyName:      KeyFtWriteLimit,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "磁盘写入限速(MB/s)(ft_write_limit)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:      KeyFtSyncEvery,
			ChooseOnly:   false,
			Default:      "10",
			DefaultNoUse: false,
			Description:  "同步meta的间隔(ft_sync_every)",
		},
		{
			KeyName:       KeyFtStrategy,
			ChooseOnly:    true,
			ChooseOptions: []string{KeyFtStrategyBackupOnly, KeyFtStrategyAlwaysSave, KeyFtStrategyConcurrent},
			Default:       KeyFtStrategyBackupOnly,
			DefaultNoUse:  false,
			Description:   "磁盘管道容错策略(仅备份错误|全部数据走管道)(ft_strategy)",
		},
		{
			KeyName:      KeyFtProcs,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "发送并发数量(磁盘管道或内存管道 always_save 或 concurrent 模式生效)(ft_procs)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:       KeyFtMemoryChannel,
			ChooseOnly:    true,
			ChooseOptions: []string{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "使用内存替换磁盘管道(加速)(ft_memory_channel)",
		},
		{
			KeyName:      KeyFtMemoryChannelSize,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "内存管道长度(ft_memory_channel_size)",
			CheckRegex:   "\\d+",
		},
	},
	// Options of the InfluxDB sender, followed by its ft options.
	TypeInfluxdb: {
		{
			KeyName:      KeyInfluxdbHost,
			ChooseOnly:   false,
			Default:      "127.0.0.1:8086",
			DefaultNoUse: true,
			Description:  "数据库地址(influxdb_host)",
		},
		{
			KeyName:      KeyInfluxdbDB,
			ChooseOnly:   false,
			Default:      "testdb",
			DefaultNoUse: true,
			Description:  "数据库名称(influxdb_db)",
		},
		{
			KeyName:      KeyInfluxdbMeasurement,
			ChooseOnly:   false,
			Default:      "test_table",
			DefaultNoUse: true,
			Description:  "measurement名称(influxdb_measurement)",
		},
		{
			KeyName:      KeyInfluxdbRetetion,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "retention名称(influxdb_retention)",
		},
		{
			KeyName:      KeyInfluxdbTags,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "标签列数据(influxdb_tags)",
		},
		{
			KeyName:      KeyInfluxdbFields,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "普通列数据(influxdb_fields)",
		},
		{
			KeyName:      KeyInfluxdbTimestamp,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "时间戳列(influxdb_timestamp)",
		},
		{
			KeyName:      KeyInfluxdbTimestampPrecision,
			ChooseOnly:   false,
			Default:      "100",
			DefaultNoUse: false,
			Description:  "时间戳列精度调整(influxdb_timestamp_precision)",
		},
		{
			KeyName:       KeyFaultTolerant,
			ChooseOnly:    true,
			ChooseOptions: []string{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "是否开启磁盘发送管道(fault_tolerant)",
		},
		{
			KeyName:      KeyFtSaveLogPath,
			ChooseOnly:   false,
			Default:      "/disk1/ftsendor/",
			DefaultNoUse: true,
			Description:  "管道本地盘数据保存路径(ft_save_log_path)",
		},
		{
			KeyName:      KeyFtWriteLimit,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "磁盘写入限速(MB/s)(ft_write_limit)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:      KeyFtSyncEvery,
			ChooseOnly:   false,
			Default:      "10",
			DefaultNoUse: false,
			Description:  "同步meta的间隔(ft_sync_every)",
		},
		{
			KeyName:       KeyFtStrategy,
			ChooseOnly:    true,
			ChooseOptions: []string{KeyFtStrategyBackupOnly, KeyFtStrategyAlwaysSave, KeyFtStrategyConcurrent},
			Default:       KeyFtStrategyBackupOnly,
			DefaultNoUse:  false,
			Description:   "磁盘管道容错策略(仅备份错误|全部数据走管道)(ft_strategy)",
		},
		{
			KeyName:      KeyFtProcs,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "发送并发数量(磁盘管道或内存管道 always_save 或 concurrent 模式生效)(ft_procs)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:       KeyFtMemoryChannel,
			ChooseOnly:    true,
			ChooseOptions: []string{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "使用内存替换磁盘管道(加速)(ft_memory_channel)",
		},
		{
			KeyName:      KeyFtMemoryChannelSize,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "内存管道长度(ft_memory_channel_size)",
			CheckRegex:   "\\d+",
		},
	},
	// The discard sender has no options.
	TypeDiscard: {},
	// Options of the Elasticsearch sender, followed by its ft options.
	TypeElastic: {
		{
			KeyName:      KeyElasticHost,
			ChooseOnly:   false,
			Default:      "localhost:9200",
			DefaultNoUse: false,
			Description:  "host地址(elastic_host)",
		},
		{
			KeyName:      KeyElasticIndex,
			ChooseOnly:   false,
			Default:      "app-repo-123",
			DefaultNoUse: true,
			Description:  "索引名称(elastic_index)",
		},
		{
			KeyName:      KeyElasticType,
			ChooseOnly:   false,
			Default:      "app",
			DefaultNoUse: true,
			Description:  "索引类型名称(elastic_type)",
		},
		{
			KeyName:       KeyFaultTolerant,
			ChooseOnly:    true,
			ChooseOptions: []string{"true", "false"},
			Default:       "true",
			DefaultNoUse:  false,
			Description:   "是否开启磁盘发送管道(fault_tolerant)",
		},
		{
			KeyName:      KeyFtSaveLogPath,
			ChooseOnly:   false,
			Default:      "/disk1/ftsendor/",
			DefaultNoUse: true,
			Description:  "管道本地盘数据保存路径(ft_save_log_path)",
		},
		{
			KeyName:      KeyFtWriteLimit,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "磁盘写入限速(MB/s)(ft_write_limit)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:      KeyFtSyncEvery,
			ChooseOnly:   false,
			Default:      "10",
			DefaultNoUse: false,
			Description:  "同步meta的间隔(ft_sync_every)",
		},
		{
			KeyName:       KeyFtStrategy,
			ChooseOnly:    true,
			ChooseOptions: []string{KeyFtStrategyBackupOnly, KeyFtStrategyAlwaysSave, KeyFtStrategyConcurrent},
			Default:       KeyFtStrategyBackupOnly,
			DefaultNoUse:  false,
			Description:   "磁盘管道容错策略(仅备份错误|全部数据走管道)(ft_strategy)",
		},
		{
			KeyName:      KeyFtProcs,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "发送并发数量(磁盘管道或内存管道 always_save 或 concurrent 模式生效)(ft_procs)",
			CheckRegex:   "\\d+",
		},
		{
			KeyName:       KeyFtMemoryChannel,
			ChooseOnly:    true,
			ChooseOptions: []string{"false", "true"},
			Default:       "false",
			DefaultNoUse:  false,
			Description:   "使用内存替换磁盘管道(加速)(ft_memory_channel)",
		},
		{
			KeyName:      KeyFtMemoryChannelSize,
			ChooseOnly:   false,
			Default:      "",
			DefaultNoUse: false,
			Description:  "内存管道长度(ft_memory_channel_size)",
			CheckRegex:   "\\d+",
		},
	},
}
View Source
// ModeUsages lists, for each selectable sender type, a short usage
// description string paired with the type name.
// NOTE(review): TypeMock is not listed here — presumably intentional; confirm.
var ModeUsages = []utils.KeyValue{
	{TypePandora, "发送到 Pandora"},
	{TypeFile, "发送到本地文件"},
	{TypeMongodbAccumulate, "发送到 mongodb"},
	{TypeInfluxdb, "发送到 influxdb"},
	{TypeDiscard, "消费数据但不发送"},
	{TypeElastic, "发送到Elasticsearch"},
}

ModeUsages 用途说明

View Source
// PandoraMaxBatchSize is the limit applied to a batch sent to Pandora.
// The value is 2 * 1024 * 1024 (presumably bytes — confirm against the sender).
var PandoraMaxBatchSize = 2 << 20

PandoraMaxBatchSize 发送到Pandora的batch限制

Functions

func ConvertDatasBack added in v1.2.2

func ConvertDatasBack(ins []Data) []map[string]interface{}

func JSONLineMarshalFunc

func JSONLineMarshalFunc(datas []Data) ([]byte, error)

JSONLineMarshalFunc 将数据json并且按换行符分隔

func MakeKey

func MakeKey(name []byte, tags Tags) []byte

MakeKey creates a key for a set of tags.

func String

func String(in string) string

func UnescapeString

func UnescapeString(in string) string

Types

type Data

// Data is one record held as a map from string keys to arbitrary values.
type Data map[string]interface{}

Data stores a record as a key/value map, e.g. sum -> 1.2, url -> qiniu.com

func ConvertDatas added in v1.2.2

func ConvertDatas(ins []map[string]interface{}) []Data

type DiscardSender

type DiscardSender struct {
	// contains filtered or unexported fields
}

func (*DiscardSender) Close

func (s *DiscardSender) Close() error

func (*DiscardSender) Name

func (s *DiscardSender) Name() string

Name returns the sender's name as a string.

func (*DiscardSender) Send

func (s *DiscardSender) Send(d []Data) error

func (*DiscardSender) SendCount

func (s *DiscardSender) SendCount() int

type ElasticsearchSender

type ElasticsearchSender struct {
	// contains filtered or unexported fields
}

func (*ElasticsearchSender) Close

func (this *ElasticsearchSender) Close() error

func (*ElasticsearchSender) Name

func (this *ElasticsearchSender) Name() string

func (*ElasticsearchSender) Send

func (this *ElasticsearchSender) Send(data []Data) (err error)

type FileSender

type FileSender struct {
	// contains filtered or unexported fields
}

FileSender writes data to a local file; it is intended for testing only.

func (*FileSender) Close

func (fs *FileSender) Close() error

func (*FileSender) Name

func (fs *FileSender) Name() string

func (*FileSender) Send

func (fs *FileSender) Send(datas []Data) error

Send implements the Send method of the Sender interface.

type FtOption added in v1.2.2

type FtOption struct {
	// contains filtered or unexported fields
}

type FtSender

type FtSender struct {
	// contains filtered or unexported fields
}

FtSender is a fault-tolerant wrapper around a Sender.

func NewFtSender

func NewFtSender(sender Sender, conf conf.MapConf) (*FtSender, error)

NewFtSender constructs a fault-tolerant sender that wraps the given Sender.

func (*FtSender) Close

func (ft *FtSender) Close() error

func (*FtSender) Name

func (ft *FtSender) Name() string

func (*FtSender) Reset added in v1.3.1

func (ft *FtSender) Reset() error

func (*FtSender) Send

func (ft *FtSender) Send(datas []Data) error

func (*FtSender) Stats added in v1.3.0

func (ft *FtSender) Stats() utils.StatsInfo

type InfluxdbSender

type InfluxdbSender struct {
	// contains filtered or unexported fields
}

InfluxdbSender writes data to InfluxDB.

func (*InfluxdbSender) Close

func (s *InfluxdbSender) Close() error

func (*InfluxdbSender) Name

func (s *InfluxdbSender) Name() string

func (*InfluxdbSender) Send

func (s *InfluxdbSender) Send(datas []Data) error

type MockSender

type MockSender struct {
	// contains filtered or unexported fields
}

func (*MockSender) Close

func (mock *MockSender) Close() error

func (*MockSender) Name

func (mock *MockSender) Name() string

Name returns the name and the data received, as a string.

func (*MockSender) Send

func (mock *MockSender) Send(d []Data) error

func (*MockSender) SendCount

func (mock *MockSender) SendCount() int

type MongoAccSender

type MongoAccSender struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

MongoAccSender Mongodb 根据UpdateKey 做对AccumulateKey $inc 累加的Sender

func (*MongoAccSender) Close

func (s *MongoAccSender) Close() error

func (*MongoAccSender) Name

func (s *MongoAccSender) Name() string

func (*MongoAccSender) Send

func (s *MongoAccSender) Send(datas []Data) (se error)

Send 依次尝试发送数据到mongodb,返回错误中包含所有写失败的数据 如果要保证每次send的原子性,必须保证datas长度为1,否则当程序宕机 总会出现丢失数据的问题

type PandoraOption

type PandoraOption struct {
	// contains filtered or unexported fields
}

PandoraOption 创建Pandora Sender的选项

type PandoraSender

type PandoraSender struct {
	UserSchema UserSchema
	// contains filtered or unexported fields
}

PandoraSender pandora sender

func (*PandoraSender) Close

func (s *PandoraSender) Close() error

func (*PandoraSender) Name

func (s *PandoraSender) Name() string

func (*PandoraSender) Send

func (s *PandoraSender) Send(datas []Data) (se error)

func (*PandoraSender) Stats added in v1.3.0

func (s *PandoraSender) Stats() utils.StatsInfo

func (*PandoraSender) UpdateSchemas

func (s *PandoraSender) UpdateSchemas()

type Point

// Point is a single data point: a measurement name, a set of string tags, a
// set of field values and a timestamp.
// NOTE(review): presumably an InfluxDB point — confirm against the InfluxDB sender.
type Point struct {
	Measurement string                 // measurement name
	Tags        map[string]string      // tag name -> tag value
	Fields      map[string]interface{} // field name -> field value
	Time        int64                  // timestamp; NOTE(review): unit not visible here (nanoseconds?) — confirm
}

func (*Point) GetFields

func (p *Point) GetFields() []byte

func (*Point) Key

func (p *Point) Key() []byte

func (*Point) String

func (p *Point) String() string

type Points

// Points is a slice of Point values.
type Points []Point

func (Points) Buffer

func (ps Points) Buffer() []byte

type Sender

// Sender is the interface implemented by every sender: it has a name, sends
// batches of Data, and can be closed.
type Sender interface {
	// Name returns the sender's name.
	Name() string
	// Send sends the given data and returns an error if sending failed.
	Send([]Data) error
	// Close closes the sender and reports any error encountered.
	Close() error
}

Sender sends data to different destinations, such as Pandora or Prometheus.

func NewDiscardSender

func NewDiscardSender(c conf.MapConf) (Sender, error)

NewDiscardSender 仅用于日志清理

func NewElasticSender

func NewElasticSender(conf conf.MapConf) (sender Sender, err error)

func NewFileSender

func NewFileSender(conf conf.MapConf) (sender Sender, err error)

NewFileSender constructs a file sender.

func NewInfluxdbSender

func NewInfluxdbSender(c conf.MapConf) (s Sender, err error)

NewInfluxdbSender 创建Influxdb 的sender

func NewMockSender

func NewMockSender(c conf.MapConf) (Sender, error)

NewMockSender 测试用sender

func NewMongodbAccSender

func NewMongodbAccSender(conf conf.MapConf) (sender Sender, err error)

NewMongodbAccSender mongodb accumulate sender constructor

func NewPandoraSender

func NewPandoraSender(conf conf.MapConf) (sender Sender, err error)

NewPandoraSender pandora sender constructor

type SenderRegistry

type SenderRegistry struct {
	// contains filtered or unexported fields
}

SenderRegistry sender 的工厂类。可以注册自定义sender

func NewSenderRegistry

func NewSenderRegistry() *SenderRegistry

func (*SenderRegistry) NewSender

func (r *SenderRegistry) NewSender(conf conf.MapConf) (sender Sender, err error)

func (*SenderRegistry) RegisterSender

func (registry *SenderRegistry) RegisterSender(senderType string, constructor func(conf.MapConf) (Sender, error)) error

type StatsSender added in v1.3.0

// StatsSender has the same methods as Sender plus Stats, which reports
// statistics as a utils.StatsInfo.
type StatsSender interface {
	// Name returns the sender's name.
	Name() string
	// Send sends the given data and returns an error if sending failed.
	Send([]Data) error
	// Close closes the sender and reports any error encountered.
	Close() error
	// Stats returns the sender's statistics.
	Stats() utils.StatsInfo
}

type Tags

// Tags is a set of tags held as a map from tag name to tag value.
type Tags map[string]string

func (Tags) HashKey

func (t Tags) HashKey() []byte

HashKey hashes all of a tag's keys.

type UserSchema

// UserSchema is the Pandora schema parsed from the user's raw schema.
type UserSchema struct {
	// NOTE(review): presumably true when all fields are sent by default — confirm against the schema parser.
	DefaultAll bool
	// NOTE(review): presumably maps a source field name to the name it is sent under — confirm.
	Fields     map[string]string
}

UserSchema is the Pandora schema parsed from the user's raw schema.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL