Documentation ¶
Index ¶
- Constants
- Variables
- func FetchData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse
- func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse
- func GenQParam(start, end int64, consolFunc, endpoint, counter string, step int) dataobj.TsdbQueryParam
- func GetCounter(metric, tag string, tagMap map[string]string) (counter string, err error)
- func GetJudges() []string
- func GetSeries(start, end int64, req []SeriesReq) ([]dataobj.QueryData, error)
- func Init(cfg BackendSection)
- func Push2InfluxdbSendQueue(items []*dataobj.MetricValue)
- func Push2JudgeSendQueue(items []*dataobj.MetricValue)
- func Push2OpenTsdbSendQueue(items []*dataobj.MetricValue)
- func Push2TsdbSendQueue(items []*dataobj.MetricValue)
- func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err error)
- func RebuildConsistentHashRing(hashRing *ConsistentHashRing, nodes []string, replicas int)
- func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int)
- func Send2TsdbTask(Q *list.SafeListLimited, node, addr string, concurrent int)
- func TagMatch(straTags []model.Tag, tag map[string]string) bool
- type BackendSection
- type ClusterNode
- type ConsistentHashRing
- type InfluxClient
- type InfluxdbSection
- type OpenTsdbSection
- type Pool
- type Series
- type SeriesReq
- type SeriesResp
- type Tagkv
Constants ¶
View Source
const (
	DefaultSendTaskSleepInterval = time.Millisecond * 50 //默认睡眠间隔为50ms
	MaxSendRetry                 = 10
)
send
View Source
const DefaultSendQueueMaxSize = 102400 //10.24w
Variables ¶
View Source
var (
	Config BackendSection
	// 服务节点的一致性哈希环 pk -> node
	TsdbNodeRing *ConsistentHashRing

	// 发送缓存队列 node -> queue_of_data
	TsdbQueues    = make(map[string]*list.SafeListLimited)
	JudgeQueues   = cache.SafeJudgeQueue{}
	InfluxdbQueue *list.SafeListLimited
	OpenTsdbQueue *list.SafeListLimited

	// 连接池 node_address -> connection_pool
	TsdbConnPools          *pools.ConnPools
	JudgeConnPools         *pools.ConnPools
	OpenTsdbConnPoolHelper *pools.OpenTsdbConnPoolHelper
)
View Source
var (
MinStep int //最小上报周期,单位sec
)
Functions ¶
func FetchDataForUI ¶
func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse
func GenQParam ¶
func GenQParam(start, end int64, consolFunc, endpoint, counter string, step int) dataobj.TsdbQueryParam
func GetCounter ¶
func Init ¶
func Init(cfg BackendSection)
func Push2InfluxdbSendQueue ¶ added in v1.4.0
func Push2InfluxdbSendQueue(items []*dataobj.MetricValue)
将原始数据插入到influxdb缓存队列
func Push2JudgeSendQueue ¶
func Push2JudgeSendQueue(items []*dataobj.MetricValue)
func Push2OpenTsdbSendQueue ¶ added in v1.4.0
func Push2OpenTsdbSendQueue(items []*dataobj.MetricValue)
将原始数据插入到tsdb发送缓存队列
func Push2TsdbSendQueue ¶
func Push2TsdbSendQueue(items []*dataobj.MetricValue)
Push2TsdbSendQueue pushes data to a TSDB instance which depends on the consistent ring.
func QueryOne ¶
func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err error)
func RebuildConsistentHashRing ¶
func RebuildConsistentHashRing(hashRing *ConsistentHashRing, nodes []string, replicas int)
func Send2JudgeTask ¶
func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int)
func Send2TsdbTask ¶
func Send2TsdbTask(Q *list.SafeListLimited, node, addr string, concurrent int)
Types ¶
type BackendSection ¶
type BackendSection struct {
	Enabled      bool                    `yaml:"enabled"`
	Batch        int                     `yaml:"batch"`
	ConnTimeout  int                     `yaml:"connTimeout"`
	CallTimeout  int                     `yaml:"callTimeout"`
	WorkerNum    int                     `yaml:"workerNum"`
	MaxConns     int                     `yaml:"maxConns"`
	MaxIdle      int                     `yaml:"maxIdle"`
	IndexTimeout int                     `yaml:"indexTimeout"`
	StraPath     string                  `yaml:"straPath"`
	HbsMod       string                  `yaml:"hbsMod"`
	Replicas     int                     `yaml:"replicas"`
	Cluster      map[string]string       `yaml:"cluster"`
	ClusterList  map[string]*ClusterNode `json:"clusterList"`
	Influxdb     InfluxdbSection         `yaml:"influxdb"`
	OpenTsdb     OpenTsdbSection         `yaml:"opentsdb"`
}
type ClusterNode ¶
type ClusterNode struct {
Addrs []string `json:"addrs"`
}
type ConsistentHashRing ¶
func NewConsistentHashRing ¶
func NewConsistentHashRing(replicas int32, nodes []string) *ConsistentHashRing
func (*ConsistentHashRing) GetNode ¶
func (c *ConsistentHashRing) GetNode(pk string) (string, error)
func (*ConsistentHashRing) GetRing ¶
func (c *ConsistentHashRing) GetRing() *consistent.Consistent
func (*ConsistentHashRing) Set ¶
func (c *ConsistentHashRing) Set(r *consistent.Consistent)
type InfluxClient ¶ added in v1.4.0
func NewInfluxdbClient ¶ added in v1.4.0
func NewInfluxdbClient() (*InfluxClient, error)
func (*InfluxClient) Send ¶ added in v1.4.0
func (c *InfluxClient) Send(items []*dataobj.InfluxdbItem) error
type InfluxdbSection ¶ added in v1.4.0
type InfluxdbSection struct {
	Enabled   bool   `yaml:"enabled"`
	Batch     int    `yaml:"batch"`
	MaxRetry  int    `yaml:"maxRetry"`
	WorkerNum int    `yaml:"workerNum"`
	Timeout   int    `yaml:"timeout"`
	Address   string `yaml:"address"`
	Database  string `yaml:"database"`
	Username  string `yaml:"username"`
	Password  string `yaml:"password"`
	Precision string `yaml:"precision"`
}
type OpenTsdbSection ¶ added in v1.4.0
type OpenTsdbSection struct {
	Enabled     bool   `yaml:"enabled"`
	Batch       int    `yaml:"batch"`
	ConnTimeout int    `yaml:"connTimeout"`
	CallTimeout int    `yaml:"callTimeout"`
	WorkerNum   int    `yaml:"workerNum"`
	MaxConns    int    `yaml:"maxConns"`
	MaxIdle     int    `yaml:"maxIdle"`
	MaxRetry    int    `yaml:"maxRetry"`
	Address     string `yaml:"address"`
}
type Pool ¶
func SelectPoolByPK ¶
type SeriesResp ¶
Click to show internal directories.
Click to hide internal directories.