Documentation ¶
Index ¶
- Constants
- Variables
- func Init(cfg BackendSection) error
- func RegisterDataSource(pluginId string, datasource DataSource)
- func RegisterPushEndpoint(pluginId string, push PushEndpoint)
- type BackendSection
- type DataSource
- type KafkaData
- type KafkaPushEndpoint
- type KafkaSection
- type KfClient
- type OpenTsdbPushEndpoint
- type OpenTsdbSection
- type PushEndpoint
Constants ¶
View Source
const ( DefaultSendTaskSleepInterval = time.Millisecond * 50 //默认睡眠间隔为50ms DefaultSendQueueMaxSize = 102400 //10.24w MaxSendRetry = 10 )
send
Variables ¶
View Source
var (
MinStep int //最小上报周期,单位sec
)
View Source
var (
StraPath string
)
Functions ¶
func Init ¶
func Init(cfg BackendSection) error
func RegisterDataSource ¶
func RegisterDataSource(pluginId string, datasource DataSource)
func RegisterPushEndpoint ¶
func RegisterPushEndpoint(pluginId string, push PushEndpoint)
Types ¶
type BackendSection ¶
type BackendSection struct { DataSource string `yaml:"datasource"` StraPath string `yaml:"straPath"` M3db m3db.M3dbSection `yaml:"m3db"` Tsdb tsdb.TsdbSection `yaml:"tsdb"` Influxdb influxdb.InfluxdbSection `yaml:"influxdb"` OpenTsdb OpenTsdbSection `yaml:"opentsdb"` Kafka KafkaSection `yaml:"kafka"` }
type DataSource ¶
type DataSource interface { PushEndpoint // query data for judge QueryData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse // query data for ui QueryDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse // query metrics & tags QueryMetrics(recv dataobj.EndpointsRecv) *dataobj.MetricResp QueryTagPairs(recv dataobj.EndpointMetricRecv) []dataobj.IndexTagkvResp QueryIndexByClude(recv []dataobj.CludeRecv) []dataobj.XcludeResp QueryIndexByFullTags(recv []dataobj.IndexByFullTagsRecv) ([]dataobj.IndexByFullTagsResp, int) // tsdb instance GetInstance(metric, endpoint string, tags map[string]string) []string }
func GetDataSourceFor ¶
func GetDataSourceFor(pluginId string) (DataSource, error)
get backend datasource (pluginId == "" for default datasource)
type KafkaPushEndpoint ¶
type KafkaPushEndpoint struct { // config Section KafkaSection // 发送缓存队列 node -> queue_of_data KafkaQueue chan KafkaData }
func (*KafkaPushEndpoint) Init ¶
func (kafka *KafkaPushEndpoint) Init()
func (*KafkaPushEndpoint) Push2Queue ¶
func (kafka *KafkaPushEndpoint) Push2Queue(items []*dataobj.MetricValue)
type KafkaSection ¶
type KafkaSection struct { Enabled bool `yaml:"enabled"` Name string `yaml:"name"` Topic string `yaml:"topic"` BrokersPeers string `yaml:"brokersPeers"` ConnTimeout int `yaml:"connTimeout"` CallTimeout int `yaml:"callTimeout"` MaxRetry int `yaml:"maxRetry"` KeepAlive int64 `yaml:"keepAlive"` SaslUser string `yaml:"saslUser"` SaslPasswd string `yaml:"saslPasswd"` }
type KfClient ¶
type KfClient struct { Topic string BrokersPeers []string // contains filtered or unexported fields }
func NewKfClient ¶
func NewKfClient(c KafkaSection) (kafkaSender *KfClient, err error)
type OpenTsdbPushEndpoint ¶
type OpenTsdbPushEndpoint struct { // config Section OpenTsdbSection OpenTsdbConnPoolHelper *pools.OpenTsdbConnPoolHelper // 发送缓存队列 node -> queue_of_data OpenTsdbQueue *list.SafeListLimited }
func (*OpenTsdbPushEndpoint) Init ¶
func (opentsdb *OpenTsdbPushEndpoint) Init()
func (*OpenTsdbPushEndpoint) Push2Queue ¶
func (opentsdb *OpenTsdbPushEndpoint) Push2Queue(items []*dataobj.MetricValue)
将原始数据入到tsdb发送缓存队列
type OpenTsdbSection ¶
type OpenTsdbSection struct { Enabled bool `yaml:"enabled"` Name string `yaml:"name"` Batch int `yaml:"batch"` ConnTimeout int `yaml:"connTimeout"` CallTimeout int `yaml:"callTimeout"` WorkerNum int `yaml:"workerNum"` MaxConns int `yaml:"maxConns"` MaxIdle int `yaml:"maxIdle"` MaxRetry int `yaml:"maxRetry"` Address string `yaml:"address"` }
type PushEndpoint ¶
type PushEndpoint interface { // push data Push2Queue(items []*dataobj.MetricValue) }
Click to show internal directories.
Click to hide internal directories.