sender

package
v0.0.0-...-a9119a5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 18, 2022 License: Apache-2.0, Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
// Default periods for the package's cron-style background tasks.
const (
	DefaultProcCronPeriod = time.Duration(5) * time.Second    // Period of ProcCron: 5 seconds.
	DefaultLogCronPeriod  = time.Duration(3600) * time.Second // Period of LogCron: 3600 seconds (1 hour).
)
View Source
// Default upper bound on the size of a send queue.
const (
	DefaultSendQueueMaxSize = 102400 // 102,400 entries (10.24 x 10,000).
)
View Source
// Default pause used by the send tasks.
const (
	DefaultSendTaskSleepInterval = time.Millisecond * 50 // Default sleep interval: 50ms.
)

send

Variables

服务节点的一致性哈希环 pk -> node

View Source
// Send buffer queues. TsdbQueue is a single queue; JudgeQueues, GraphQueues
// and PolyQueues each map a string key to its own queue.
// NOTE(review): the key is presumably the node name (node -> queue of data) —
// confirm against the code that fills these maps.
var (
	//KafkaQueues string
	TsdbQueue   *nlist.SafeListLimited
	JudgeQueues = make(map[string]*nlist.SafeListLimited)
	GraphQueues = make(map[string]*nlist.SafeListLimited)
	PolyQueues  = make(map[string]*nlist.SafeListLimited)
)

发送缓存队列 node -> queue_of_data

View Source
// Connection pool handles for Judge, TSDB, Graph and Poly.
// NOTE(review): the RPC pools are presumably keyed by node address
// (node_address -> connection_pool) — confirm in the backend package.
var (
	JudgeConnPools     *backend.SafeRpcConnPools
	TsdbConnPoolHelper *backend.TsdbConnPoolHelper
	GraphConnPools     *backend.SafeRpcConnPools
	PolyConnPools      *backend.SafeRpcConnPools
)

连接池 node_address -> connection_pool

View Source
// Buffered channels carrying data destined for Kafka and InfluxDB.
var (
	KafkaQueue    = make(chan *cmodel.KafkaData, 64000) // Buffered channel with capacity 64000.
	InfluxDBQueue = make(chan *cmodel.MetaData, 64000)  // Buffered channel with capacity 64000.
)
View Source
// Default parameters.
var (
	MinStep int // Minimum reporting period, in seconds.
)

默认参数

Functions

func DestroyConnPools

func DestroyConnPools()

func ExtractKafkaItemNew

func ExtractKafkaItemNew(items []*cmodel.MetaData, kv *cmodel.KafkaData)

func GetInfluxDbConn

func GetInfluxDbConn() (client.Client, error)

func GetNewBatchPoints

func GetNewBatchPoints() (client.BatchPoints, error)

func MetricsBusSend

func MetricsBusSend(key, val []byte) error

func Push2GraphSendQueue

func Push2GraphSendQueue(items []*cmodel.MetaData)

将数据 打入 某个Graph的发送缓存队列, 具体是哪一个Graph 由一致性哈希 决定

func Push2InfluxDBSendQueue

func Push2InfluxDBSendQueue(items []*cmodel.MetaData)

func Push2JudgeSendQueue

func Push2JudgeSendQueue(items []*cmodel.MetaData)

将数据 打入 某个Judge的发送缓存队列, 具体是哪一个Judge 由一致性哈希 决定

func Push2KafkaSendQueue

func Push2KafkaSendQueue(items []*cmodel.MetaData)

将数据 打入kafka的发送缓存队列

func Push2PolySendQueue

func Push2PolySendQueue(items []*cmodel.PolyRequest)

func Push2TsdbSendQueue

func Push2TsdbSendQueue(items []*cmodel.MetaData)

将原始数据入到tsdb发送缓存队列

func Start

func Start()

初始化数据发送服务, 在main函数中调用

func StartKafkaSender

func StartKafkaSender()

将缓存数据队列写入kafka中

Types

type PolymetricResp

// PolymetricResp groups a message string, two integer counts and a latency
// value.
// NOTE(review): presumably the reply to a poly-metric push; the meaning of
// each field and the unit of Latency are not visible here — confirm against
// the code that produces it.
type PolymetricResp struct {
	Msg        string
	Total      int
	ErrInvalid int
	Latency    int64
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL