Documentation ¶
Index ¶
- Constants
- Variables
- func ConvertDatasBack(ins []Data) []map[string]interface{}
- func JSONLineMarshalFunc(datas []Data) ([]byte, error)
- func MakeKey(name []byte, tags Tags) []byte
- func String(in string) string
- func UnescapeString(in string) string
- type Data
- type DiscardSender
- type ElasticsearchSender
- type FileSender
- type FtOption
- type FtSender
- type InfluxdbSender
- type MockSender
- type MongoAccSender
- type PandoraOption
- type PandoraSender
- type Point
- type Points
- type Sender
- func NewDiscardSender(c conf.MapConf) (Sender, error)
- func NewElasticSender(conf conf.MapConf) (sender Sender, err error)
- func NewFileSender(conf conf.MapConf) (sender Sender, err error)
- func NewInfluxdbSender(c conf.MapConf) (s Sender, err error)
- func NewMockSender(c conf.MapConf) (Sender, error)
- func NewMongodbAccSender(conf conf.MapConf) (sender Sender, err error)
- func NewPandoraSender(conf conf.MapConf) (sender Sender, err error)
- type SenderRegistry
- type Tags
- type UserSchema
Constants ¶
const ( KeyElasticHost = "elastic_host" KeyElasticIndex = "elastic_index" KeyElasticType = "elastic_type" KeyElasticAlias = "elastic_keys" )
const ( KeyFtSyncEvery = "ft_sync_every" // 该参数设置多少次写入会同步一次offset log KeyFtSaveLogPath = "ft_save_log_path" // disk queue 数据日志路径 KeyFtWriteLimit = "ft_write_limit" // 写入速度限制,单位MB KeyFtStrategy = "ft_strategy" // ft 的策略 KeyFtProcs = "ft_procs" // ft并发数,当always_save 策略时启用 KeyFtMemoryChannel = "ft_memory_channel" KeyFtMemoryChannelSize = "ft_memory_channel_size" )
可选参数 fault_tolerant 为true的话,以下必填
const ( // KeyFtStrategyBackupOnly 只在失败的时候进行容错 KeyFtStrategyBackupOnly = "backup_only" // KeyFtStrategyAlwaysSave 所有数据都进行容错 KeyFtStrategyAlwaysSave = "always_save" )
ft 策略
const ( KeyInfluxdbHost = "influxdb_host" KeyInfluxdbDB = "influxdb_db" KeyInfluxdbRetetion = "influxdb_retention" KeyInfluxdbMeasurement = "influxdb_measurement" KeyInfluxdbTags = "influxdb_tags" KeyInfluxdbFields = "influxdb_fields" // influxdb KeyInfluxdbTimestamp = "influxdb_timestamp" // 可选 nano时间戳字段 KeyInfluxdbTimestampPrecision = "influxdb_timestamp_precision" // 时间戳字段的精度,代表时间戳1个单位代表多少纳秒 )
Influxdb sender 的可配置字段
const ( KeyMongodbHost = "mongodb_host" KeyMongodbDB = "mongodb_db" KeyMongodbCollection = "mongodb_collection" )
可选参数 当sender_type 为mongodb_* 的时候,需要必填的字段
const ( KeyMongodbUpdateKey = "mongodb_acc_updkey" KeyMongodbAccKey = "mongodb_acc_acckey" )
可选参数 当sender_type 为mongodb_acc 的时候,需要必填的字段
const ( KeyPandoraAk = "pandora_ak" KeyPandoraSk = "pandora_sk" KeyPandoraHost = "pandora_host" KeyPandoraRepoName = "pandora_repo_name" KeyPandoraRegion = "pandora_region" KeyPandoraSchema = "pandora_schema" KeyPandoraSchemaUpdateInterval = "pandora_schema_update_interval" KeyPandoraAutoCreate = "pandora_auto_create" KeyPandoraSchemaFree = "pandora_schema_free" KeyPandoraEnableLogDB = "pandora_enable_logdb" KeyPandoraLogDBName = "pandora_logdb_name" KeyPandoraLogDBHost = "pandora_logdb_host" KeyRequestRateLimit = "request_rate_limit" KeyFlowRateLimit = "flow_rate_limit" KeyPandoraGzip = "pandora_gzip" KeyPandoraUUID = "pandora_uuid" KeyForceMicrosecond = "force_microsecond" PandoraUUID = "Pandora_UUID" )
可选参数 当sender_type 为pandora 的时候,需要必填的字段
const ( PandoraTypeLong = "long" PandoraTypeFloat = "float" PandoraTypeString = "string" PandoraTypeDate = "date" PandoraTypeBool = "boolean" PandoraTypeArray = "array" PandoraTypeMap = "map" )
const ( KeySenderType = "sender_type" KeyFaultTolerant = "fault_tolerant" KeyName = "name" KeyRunnerName = "runner_name" )
Sender's conf keys
const ( TypeFile = "file" // 本地文件 TypePandora = "pandora" // pandora 打点 TypeMongodbAccumulate = "mongodb_acc" // mongodb 并且按字段聚合 TypeInfluxdb = "influxdb" // influxdb TypeMock = "mock" // mock sender TypeDiscard = "discard" // discard sender TypeElastic = "elasticsearch" // elastic )
SenderType 发送类型
const DefaultFtSyncEvery = 10
Ft sender默认同步一次meta信息的数据次数
const (
KeyFileSenderPath = "file_send_path"
)
可选参数 当sender_type 为file 的时候
const UnderfinedRunnerName = "UnderfinedRunnerName"
Variables ¶
var ( Codes = map[byte][]byte{ ',': []byte(`\,`), '"': []byte(`\"`), ' ': []byte(`\ `), '=': []byte(`\=`), } )
var ErrNotAsyncSender = errors.New("This Sender does not support for Async Push")
NotAsyncSender return when sender is not async
var PandoraMaxBatchSize = 2 * 1024 * 1024
PandoraMaxBatchSize 发送到Pandora的batch限制
Functions ¶
func ConvertDatasBack ¶ added in v1.2.2
func JSONLineMarshalFunc ¶
JSONLineMarshalFunc 将数据json并且按换行符分隔
func UnescapeString ¶
Types ¶
type Data ¶
type Data map[string]interface{}
Data store as use key/value map e.g sum -> 1.2, url -> qiniu.com
func ConvertDatas ¶ added in v1.2.2
type DiscardSender ¶
type DiscardSender struct {
// contains filtered or unexported fields
}
func (*DiscardSender) Close ¶
func (s *DiscardSender) Close() error
func (*DiscardSender) Name ¶
func (s *DiscardSender) Name() string
Name function will return the name as string
func (*DiscardSender) Send ¶
func (s *DiscardSender) Send(d []Data) error
func (*DiscardSender) SendCount ¶
func (s *DiscardSender) SendCount() int
type ElasticsearchSender ¶
type ElasticsearchSender struct {
// contains filtered or unexported fields
}
func (*ElasticsearchSender) Close ¶
func (this *ElasticsearchSender) Close() error
func (*ElasticsearchSender) Name ¶
func (this *ElasticsearchSender) Name() string
func (*ElasticsearchSender) Send ¶
func (this *ElasticsearchSender) Send(data []Data) (err error)
type FileSender ¶
type FileSender struct {
// contains filtered or unexported fields
}
FileSender write datas into local file only for test
func (*FileSender) Close ¶
func (fs *FileSender) Close() error
func (*FileSender) Name ¶
func (fs *FileSender) Name() string
type FtSender ¶
type FtSender struct {
// contains filtered or unexported fields
}
FtSender fault tolerance sender wrapper
func NewFtSender ¶
NewFtSender Fault tolerant sender constructor
type InfluxdbSender ¶
type InfluxdbSender struct {
// contains filtered or unexported fields
}
InfluxdbSender write datas into influxdb
func (*InfluxdbSender) Close ¶
func (s *InfluxdbSender) Close() error
func (*InfluxdbSender) Name ¶
func (s *InfluxdbSender) Name() string
func (*InfluxdbSender) Send ¶
func (s *InfluxdbSender) Send(datas []Data) error
type MockSender ¶
type MockSender struct {
// contains filtered or unexported fields
}
func (*MockSender) Close ¶
func (mock *MockSender) Close() error
func (*MockSender) Name ¶
func (mock *MockSender) Name() string
Name function will return the name and datas recieved as string
func (*MockSender) Send ¶
func (mock *MockSender) Send(d []Data) error
func (*MockSender) SendCount ¶
func (mock *MockSender) SendCount() int
type MongoAccSender ¶
MongoAccSender Mongodb 根据UpdateKey 做对AccumulateKey $inc 累加的Sender
func (*MongoAccSender) Close ¶
func (s *MongoAccSender) Close() error
func (*MongoAccSender) Name ¶
func (s *MongoAccSender) Name() string
func (*MongoAccSender) Send ¶
func (s *MongoAccSender) Send(datas []Data) (se error)
Send 依次尝试发送数据到mongodb,返回错误中包含所有写失败的数据 如果要保证每次send的原子性,必须保证datas长度为1,否则当程序宕机 总会出现丢失数据的问题
type PandoraOption ¶
type PandoraOption struct {
// contains filtered or unexported fields
}
PandoraOption 创建Pandora Sender的选项
type PandoraSender ¶
type PandoraSender struct { UserSchema UserSchema // contains filtered or unexported fields }
PandoraSender pandora sender
func (*PandoraSender) Close ¶
func (s *PandoraSender) Close() error
func (*PandoraSender) Name ¶
func (s *PandoraSender) Name() string
func (*PandoraSender) Send ¶
func (s *PandoraSender) Send(datas []Data) (se error)
func (*PandoraSender) UpdateSchemas ¶
func (s *PandoraSender) UpdateSchemas()
type Point ¶
type Sender ¶
type Sender interface { Name() string // send data, error if failed Send([]Data) error Close() error }
Sender send data to pandora, prometheus such different destinations
func NewDiscardSender ¶
NewDiscardSender 仅用于日志清理
func NewFileSender ¶
NewFileSender construct
func NewInfluxdbSender ¶
NewInfluxdbSender 创建Influxdb 的sender
func NewMongodbAccSender ¶
NewMongodbAccSender mongodb accumulate sender constructor
type SenderRegistry ¶
type SenderRegistry struct {
// contains filtered or unexported fields
}
SenderRegistry sender 的工厂类。可以注册自定义sender
func NewSenderRegistry ¶
func NewSenderRegistry() *SenderRegistry
func (*SenderRegistry) NewSender ¶
func (r *SenderRegistry) NewSender(conf conf.MapConf) (sender Sender, err error)
func (*SenderRegistry) RegisterSender ¶
type UserSchema ¶
UserSchema was parsed pandora schema from user's raw schema