sender

package
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 1, 2017 License: Apache-2.0 Imports: 29 Imported by: 62

Documentation

Index

Constants

View Source
const (
	KeyElasticHost  = "elastic_host"
	KeyElasticIndex = "elastic_index"
	KeyElasticType  = "elastic_type"
	KeyElasticAlias = "elastic_keys"
)
View Source
const (
	KeyFtSyncEvery         = "ft_sync_every"    // 该参数设置多少次写入会同步一次offset log
	KeyFtSaveLogPath       = "ft_save_log_path" // disk queue 数据日志路径
	KeyFtWriteLimit        = "ft_write_limit"   // 写入速度限制,单位MB
	KeyFtStrategy          = "ft_strategy"      // ft 的策略
	KeyFtProcs             = "ft_procs"         // ft并发数,当always_save 策略时启用
	KeyFtMemoryChannel     = "ft_memory_channel"
	KeyFtMemoryChannelSize = "ft_memory_channel_size"
)

可选参数 fault_tolerant 为true的话,以下必填

View Source
const (
	// KeyFtStrategyBackupOnly 只在失败的时候进行容错
	KeyFtStrategyBackupOnly = "backup_only"
	// KeyFtStrategyAlwaysSave 所有数据都进行容错
	KeyFtStrategyAlwaysSave = "always_save"
)

ft 策略

View Source
const (
	KeyInfluxdbHost               = "influxdb_host"
	KeyInfluxdbDB                 = "influxdb_db"
	KeyInfluxdbRetetion           = "influxdb_retention"
	KeyInfluxdbMeasurement        = "influxdb_measurement"
	KeyInfluxdbTags               = "influxdb_tags"
	KeyInfluxdbFields             = "influxdb_fields"              // influxdb
	KeyInfluxdbTimestamp          = "influxdb_timestamp"           // 可选 nano时间戳字段
	KeyInfluxdbTimestampPrecision = "influxdb_timestamp_precision" // 时间戳字段的精度,代表时间戳1个单位代表多少纳秒
)

Influxdb sender 的可配置字段

View Source
const (
	KeyMongodbHost       = "mongodb_host"
	KeyMongodbDB         = "mongodb_db"
	KeyMongodbCollection = "mongodb_collection"
)

可选参数 当sender_type 为mongodb_* 的时候,需要必填的字段

View Source
const (
	KeyMongodbUpdateKey = "mongodb_acc_updkey"
	KeyMongodbAccKey    = "mongodb_acc_acckey"
)

可选参数 当sender_type 为mongodb_acc 的时候,需要必填的字段

View Source
const (
	KeyPandoraAk                   = "pandora_ak"
	KeyPandoraSk                   = "pandora_sk"
	KeyPandoraHost                 = "pandora_host"
	KeyPandoraRepoName             = "pandora_repo_name"
	KeyPandoraRegion               = "pandora_region"
	KeyPandoraSchema               = "pandora_schema"
	KeyPandoraSchemaUpdateInterval = "pandora_schema_update_interval"
	KeyPandoraAutoCreate           = "pandora_auto_create"
	KeyPandoraSchemaFree           = "pandora_schema_free"
	KeyPandoraEnableLogDB          = "pandora_enable_logdb"
	KeyPandoraLogDBName            = "pandora_logdb_name"
	KeyPandoraLogDBHost            = "pandora_logdb_host"
	KeyRequestRateLimit            = "request_rate_limit"
	KeyFlowRateLimit               = "flow_rate_limit"
	KeyPandoraGzip                 = "pandora_gzip"
	KeyPandoraUUID                 = "pandora_uuid"
	KeyForceMicrosecond            = "force_microsecond"

	PandoraUUID = "Pandora_UUID"
)

可选参数 当sender_type 为pandora 的时候,需要必填的字段

View Source
const (
	PandoraTypeLong   = "long"
	PandoraTypeFloat  = "float"
	PandoraTypeString = "string"
	PandoraTypeDate   = "date"
	PandoraTypeBool   = "boolean"
	PandoraTypeArray  = "array"
	PandoraTypeMap    = "map"
)
View Source
const (
	KeySenderType    = "sender_type"
	KeyFaultTolerant = "fault_tolerant"
	KeyName          = "name"
	KeyRunnerName    = "runner_name"
)

Sender's conf keys

View Source
const (
	TypeFile              = "file"          // 本地文件
	TypePandora           = "pandora"       // pandora 打点
	TypeMongodbAccumulate = "mongodb_acc"   // mongodb 并且按字段聚合
	TypeInfluxdb          = "influxdb"      // influxdb
	TypeMock              = "mock"          // mock sender
	TypeDiscard           = "discard"       // discard sender
	TypeElastic           = "elasticsearch" // elastic

)

SenderType 发送类型

View Source
const DefaultFtSyncEvery = 10

Ft sender默认同步一次meta信息的数据次数

View Source
const (
	KeyFileSenderPath = "file_send_path"
)

可选参数 当sender_type 为file 的时候

View Source
const UnderfinedRunnerName = "UnderfinedRunnerName"

Variables

View Source
var (
	Codes = map[byte][]byte{
		',': []byte(`\,`),
		'"': []byte(`\"`),
		' ': []byte(`\ `),
		'=': []byte(`\=`),
	}
)
View Source
var ErrNotAsyncSender = errors.New("This Sender does not support for Async Push")

NotAsyncSender return when sender is not async

View Source
var PandoraMaxBatchSize = 2 * 1024 * 1024

PandoraMaxBatchSize 发送到Pandora的batch限制

Functions

func ConvertDatasBack added in v1.2.2

func ConvertDatasBack(ins []Data) []map[string]interface{}

func JSONLineMarshalFunc

func JSONLineMarshalFunc(datas []Data) ([]byte, error)

JSONLineMarshalFunc 将数据json并且按换行符分隔

func MakeKey

func MakeKey(name []byte, tags Tags) []byte

MakeKey creates a key for a set of tags.

func String

func String(in string) string

func UnescapeString

func UnescapeString(in string) string

Types

type Data

type Data map[string]interface{}

Data store as use key/value map e.g sum -> 1.2, url -> qiniu.com

func ConvertDatas added in v1.2.2

func ConvertDatas(ins []map[string]interface{}) []Data

type DiscardSender

type DiscardSender struct {
	// contains filtered or unexported fields
}

func (*DiscardSender) Close

func (s *DiscardSender) Close() error

func (*DiscardSender) Name

func (s *DiscardSender) Name() string

Name function will return the name as string

func (*DiscardSender) Send

func (s *DiscardSender) Send(d []Data) error

func (*DiscardSender) SendCount

func (s *DiscardSender) SendCount() int

type ElasticsearchSender

type ElasticsearchSender struct {
	// contains filtered or unexported fields
}

func (*ElasticsearchSender) Close

func (this *ElasticsearchSender) Close() error

func (*ElasticsearchSender) Name

func (this *ElasticsearchSender) Name() string

func (*ElasticsearchSender) Send

func (this *ElasticsearchSender) Send(data []Data) (err error)

type FileSender

type FileSender struct {
	// contains filtered or unexported fields
}

FileSender write datas into local file only for test

func (*FileSender) Close

func (fs *FileSender) Close() error

func (*FileSender) Name

func (fs *FileSender) Name() string

func (*FileSender) Send

func (fs *FileSender) Send(datas []Data) error

Send inherit from Sender

type FtOption added in v1.2.2

type FtOption struct {
	// contains filtered or unexported fields
}

type FtSender

type FtSender struct {
	// contains filtered or unexported fields
}

FtSender fault tolerance sender wrapper

func NewFtSender

func NewFtSender(sender Sender, conf conf.MapConf) (*FtSender, error)

NewFtSender Fault tolerant sender constructor

func (*FtSender) Close

func (ft *FtSender) Close() error

func (*FtSender) Name

func (ft *FtSender) Name() string

func (*FtSender) Send

func (ft *FtSender) Send(datas []Data) error

type InfluxdbSender

type InfluxdbSender struct {
	// contains filtered or unexported fields
}

InfluxdbSender write datas into influxdb

func (*InfluxdbSender) Close

func (s *InfluxdbSender) Close() error

func (*InfluxdbSender) Name

func (s *InfluxdbSender) Name() string

func (*InfluxdbSender) Send

func (s *InfluxdbSender) Send(datas []Data) error

type MockSender

type MockSender struct {
	// contains filtered or unexported fields
}

func (*MockSender) Close

func (mock *MockSender) Close() error

func (*MockSender) Name

func (mock *MockSender) Name() string

Name function will return the name and datas recieved as string

func (*MockSender) Send

func (mock *MockSender) Send(d []Data) error

func (*MockSender) SendCount

func (mock *MockSender) SendCount() int

type MongoAccSender

type MongoAccSender struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

MongoAccSender Mongodb 根据UpdateKey 做对AccumulateKey $inc 累加的Sender

func (*MongoAccSender) Close

func (s *MongoAccSender) Close() error

func (*MongoAccSender) Name

func (s *MongoAccSender) Name() string

func (*MongoAccSender) Send

func (s *MongoAccSender) Send(datas []Data) (se error)

Send 依次尝试发送数据到mongodb,返回错误中包含所有写失败的数据 如果要保证每次send的原子性,必须保证datas长度为1,否则当程序宕机 总会出现丢失数据的问题

type PandoraOption

type PandoraOption struct {
	// contains filtered or unexported fields
}

PandoraOption 创建Pandora Sender的选项

type PandoraSender

type PandoraSender struct {
	UserSchema UserSchema
	// contains filtered or unexported fields
}

PandoraSender pandora sender

func (*PandoraSender) Close

func (s *PandoraSender) Close() error

func (*PandoraSender) Name

func (s *PandoraSender) Name() string

func (*PandoraSender) Send

func (s *PandoraSender) Send(datas []Data) (se error)

func (*PandoraSender) UpdateSchemas

func (s *PandoraSender) UpdateSchemas()

type Point

type Point struct {
	Measurement string
	Tags        map[string]string
	Fields      map[string]interface{}
	Time        int64
}

func (*Point) GetFields

func (p *Point) GetFields() []byte

func (*Point) Key

func (p *Point) Key() []byte

func (*Point) String

func (p *Point) String() string

type Points

type Points []Point

func (Points) Buffer

func (ps Points) Buffer() []byte

type Sender

type Sender interface {
	Name() string
	// send data, error if failed
	Send([]Data) error
	Close() error
}

Sender send data to pandora, prometheus such different destinations

func NewDiscardSender

func NewDiscardSender(c conf.MapConf) (Sender, error)

NewDiscardSender 仅用于日志清理

func NewElasticSender

func NewElasticSender(conf conf.MapConf) (sender Sender, err error)

func NewFileSender

func NewFileSender(conf conf.MapConf) (sender Sender, err error)

NewFileSender construct

func NewInfluxdbSender

func NewInfluxdbSender(c conf.MapConf) (s Sender, err error)

NewInfluxdbSender 创建Influxdb 的sender

func NewMockSender

func NewMockSender(c conf.MapConf) (Sender, error)

NewMockSender 测试用sender

func NewMongodbAccSender

func NewMongodbAccSender(conf conf.MapConf) (sender Sender, err error)

NewMongodbAccSender mongodb accumulate sender constructor

func NewPandoraSender

func NewPandoraSender(conf conf.MapConf) (sender Sender, err error)

NewPandoraSender pandora sender constructor

type SenderRegistry

type SenderRegistry struct {
	// contains filtered or unexported fields
}

SenderRegistry sender 的工厂类。可以注册自定义sender

func NewSenderRegistry

func NewSenderRegistry() *SenderRegistry

func (*SenderRegistry) NewSender

func (r *SenderRegistry) NewSender(conf conf.MapConf) (sender Sender, err error)

func (*SenderRegistry) RegisterSender

func (registry *SenderRegistry) RegisterSender(senderType string, constructor func(conf.MapConf) (Sender, error)) error

type Tags

type Tags map[string]string

func (Tags) HashKey

func (t Tags) HashKey() []byte

HashKey hashes all of a tag's keys.

type UserSchema

type UserSchema struct {
	DefaultAll bool
	Fields     map[string]string
}

UserSchema was parsed pandora schema from user's raw schema

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL