backend

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2020 License: Apache-2.0 Imports: 30 Imported by: 5

Documentation

Index

Constants

View Source
// Send-related constants.
const (
	DefaultSendTaskSleepInterval = time.Millisecond * 50 // default sleep interval is 50ms
	MAX_SEND_RETRY               = 10 // presumably the upper bound on send attempts — TODO confirm against the send tasks
)

send

View Source
// DefaultSendQueueMaxSize is 102400. Judging by its name it is the default
// size cap of a send queue; where it is applied is not visible here.
const DefaultSendQueueMaxSize = 102400 // 102,400, i.e. 10.24 x 10^4

Variables

View Source
// Package-level backend state: the active configuration, the hash ring,
// the send queues and the connection pools.
var (
	// Config is the package-level BackendSection value.
	Config BackendSection
	// Consistent hash ring of the service nodes: pk -> node
	TsdbNodeRing *ConsistentHashRing

	// Send buffer queues: node -> queue_of_data
	TsdbQueues  = make(map[string]*list.SafeListLimited)
	JudgeQueues = cache.SafeJudgeQueue{}

	// Connection pools: node_address -> connection_pool
	TsdbConnPools  *ConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)}
	JudgeConnPools *ConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)}
)
View Source
var (
	MinStep int // minimum reporting period, in seconds
)

Functions

func FetchData

func FetchData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse

func FetchDataForUI

func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse

func GenQParam

func GenQParam(start, end int64, consolFunc, endpoint, counter string, step int) dataobj.TsdbQueryParam

func GetCounter

func GetCounter(metric, tag string, tagMap map[string]string) (counter string, err error)

func GetJudges

func GetJudges() []string

func GetSeries

func GetSeries(start, end int64, req []SeriesReq) ([]dataobj.QueryData, error)

func Init

func Init(cfg BackendSection)

func Push2JudgeSendQueue

func Push2JudgeSendQueue(items []*dataobj.MetricValue)

func Push2TsdbSendQueue

func Push2TsdbSendQueue(items []*dataobj.MetricValue)

Push2TsdbSendQueue pushes the data into the send buffer queue of one Tsdb node; which Tsdb node receives it is decided by consistent hashing.

func QueryOne

func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err error)

func RebuildConsistentHashRing

func RebuildConsistentHashRing(hashRing *ConsistentHashRing, nodes []string, replicas int)

func Send2JudgeTask

func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int)

func Send2TsdbTask

func Send2TsdbTask(Q *list.SafeListLimited, node string, addr string, concurrent int)

func TagMatch

func TagMatch(straTags []model.Tag, tag map[string]string) bool

Types

type BackendSection

// BackendSection is the backend configuration section. All fields except
// ClusterList carry a yaml struct tag naming their configuration key.
type BackendSection struct {
	Enabled      bool   `yaml:"enabled"`
	Batch        int    `yaml:"batch"`       // presumably the number of items sent per batch — TODO confirm
	ConnTimeout  int    `yaml:"connTimeout"` // time unit is not visible here — TODO confirm
	CallTimeout  int    `yaml:"callTimeout"` // time unit is not visible here — TODO confirm
	WorkerNum    int    `yaml:"workerNum"`
	MaxConns     int    `yaml:"maxConns"`
	MaxIdle      int    `yaml:"maxIdle"`
	IndexTimeout int    `yaml:"indexTimeout"`
	StraPath     string `yaml:"straPath"`
	HbsMod       string `yaml:"hbsMod"`

	Replicas    int                     `yaml:"replicas"`
	Cluster     map[string]string       `yaml:"cluster"`
	// NOTE(review): ClusterList carries a json tag, unlike its yaml-tagged
	// siblings. Presumably it is filled in by code rather than read from the
	// YAML file — confirm before changing the tag.
	ClusterList map[string]*ClusterNode `json:"clusterList"`
}

type ClusterNode

// ClusterNode holds a list of address strings, serialized under the JSON
// key "addrs".
type ClusterNode struct {
	Addrs []string `json:"addrs"`
}

type ConnPools

// ConnPools bundles a map of connection pools, keyed by string, with the
// connection-limit and timeout settings stored alongside it.
type ConnPools struct {
	sync.RWMutex // presumably guards M — TODO confirm against the methods
	M           map[string]*pool.ConnPool
	MaxConns    int
	MaxIdle     int
	ConnTimeout int // time unit is not visible here — TODO confirm
	CallTimeout int // time unit is not visible here — TODO confirm
}

Each backend has one corresponding ConnPool.

func CreateConnPools

func CreateConnPools(maxConns, maxIdle, connTimeout, callTimeout int, cluster []string) *ConnPools

func (*ConnPools) Call

func (this *ConnPools) Call(addr, method string, args interface{}, resp interface{}) error

Call sends synchronously: it returns only after the send has completed or timed out.

func (*ConnPools) Get

func (this *ConnPools) Get(address string) (*pool.ConnPool, bool)

func (*ConnPools) Update

func (cp *ConnPools) Update(cluster []string)

type ConsistentHashRing

// ConsistentHashRing embeds a sync.RWMutex; its remaining fields are
// unexported and are not shown in this listing.
type ConsistentHashRing struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewConsistentHashRing

func NewConsistentHashRing(replicas int32, nodes []string) *ConsistentHashRing

func (*ConsistentHashRing) GetNode

func (this *ConsistentHashRing) GetNode(pk string) (string, error)

func (*ConsistentHashRing) GetRing

func (this *ConsistentHashRing) GetRing() *consistent.Consistent

func (*ConsistentHashRing) Set

type Pool

// Pool pairs a connection pool with an address string.
type Pool struct {
	Pool *pool.ConnPool
	Addr string // presumably the address the pool connects to — TODO confirm
}

func SelectPoolByPK

func SelectPoolByPK(pk string) ([]Pool, error)

type RpcClient

// RpcClient has only unexported fields, which are not shown in this listing.
type RpcClient struct {
	// contains filtered or unexported fields
}

RpcClient must implement the io.Closer interface.

func (RpcClient) Call

func (this RpcClient) Call(method string, args interface{}, reply interface{}) error

func (RpcClient) Close

func (this RpcClient) Close() error

func (RpcClient) Closed

func (this RpcClient) Closed() bool

func (RpcClient) Name

func (this RpcClient) Name() string

type Series

// Series describes one series entry as encoded in JSON: endpoints, metric,
// tags, step and dstype.
type Series struct {
	Endpoints []string `json:"endpoints"`
	Metric    string   `json:"metric"`
	Tags      []string `json:"tags"`
	Step      int      `json:"step"` // presumably in seconds — TODO confirm
	DsType    string   `json:"dstype"`
}

type SeriesReq

// SeriesReq is the JSON shape of a series request: endpoints, a metric name
// and a list of tag key/value entries.
type SeriesReq struct {
	Endpoints []string `json:"endpoints"`
	Metric    string   `json:"metric"`
	Tagkv     []*Tagkv `json:"tagkv"`
}

type SeriesResp

// SeriesResp is the JSON shape of a series response: a list of Series under
// "dat" and a string under "err".
type SeriesResp struct {
	Dat []Series `json:"dat"`
	Err string   `json:"err"` // presumably empty on success — TODO confirm
}

type Tagkv

// Tagkv pairs one tag key with a list of tag values.
type Tagkv struct {
	TagK string   `json:"tagk"`
	TagV []string `json:"tagv"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL