Documentation ¶
Overview ¶
Added by vincent.zhang for sending to Kafka (2017-09-25).
Index ¶
- Constants
- Variables
- func DestroyConnPools()
- func Push2GraphSendQueue(items []*cmodel.MetaData)
- func Push2JudgeSendQueue(items []*cmodel.MetaData)
- func Push2KafkaLogSendQueue(items []*LogMetricItem)
- func Push2KafkaSendQueue(items []*cmodel.MetaData)
- func Push2TsdbSendQueue(items []*cmodel.MetaData)
- func Start()
- type KafkaItem
- type KafkaProducer
- type KafkaProducerPool
- type LogMetricItem
Constants ¶
View Source
const (
	// DefaultProcCronPeriod is the tick period of the proc (statistics) cron: 5s.
	// NOTE(review): the original comment claimed a 1s default, contradicting the value.
	DefaultProcCronPeriod = time.Duration(5) * time.Second
	// DefaultLogCronPeriod is the tick period of the log cron: 3600s (1h).
	// NOTE(review): the original comment claimed a 300s default, contradicting the value.
	DefaultLogCronPeriod = time.Duration(3600) * time.Second
)
View Source
const (
	// DefaultSendQueueMaxSize is the maximum capacity of each send buffer
	// queue: 102,400 items (the original "10.24w" comment uses the Chinese
	// unit 万 = 10,000).
	DefaultSendQueueMaxSize = 102400
)
View Source
const (
	// DefaultSendTaskSleepInterval is the sleep interval between send-task
	// rounds; defaults to 50ms.
	DefaultSendTaskSleepInterval = time.Millisecond * 50
)
send
Variables ¶
View Source
var ( JudgeNodeRing *rings.ConsistentHashNodeRing GraphNodeRing *rings.ConsistentHashNodeRing )
Consistent-hash rings over the service nodes: pk -> node.
View Source
var ( TsdbQueue *nlist.SafeListLimited JudgeQueues = make(map[string]*nlist.SafeListLimited) GraphQueues = make(map[string]*nlist.SafeListLimited) //added by vincent.zhang for sending to kafka KafkaQueue *nlist.SafeListLimited KafkaLogQueue *nlist.SafeListLimited // added by qimin.xu KafkaFilterQueues = make(map[string]*nlist.SafeListLimited) )
Send buffer queues: node -> queue_of_data.
View Source
var ( JudgeConnPools *backend.SafeRpcConnPools TsdbConnPoolHelper *backend.TsdbConnPoolHelper GraphConnPools *backend.SafeRpcConnPools )
Connection pools: node_address -> connection_pool.
View Source
var (
	MinStep int // minimum reporting period, in seconds
)
Default parameters.
Functions ¶
func DestroyConnPools ¶
func DestroyConnPools()
func Push2GraphSendQueue ¶
Pushes data into the send buffer queue of one Graph node; which Graph node receives it is decided by consistent hashing.
func Push2JudgeSendQueue ¶
Pushes data into the send buffer queue of one Judge node; which Judge node receives it is decided by consistent hashing.
func Push2KafkaLogSendQueue ¶
func Push2KafkaLogSendQueue(items []*LogMetricItem)
Pushes raw data into the Kafka log send buffer queue.
Types ¶
type KafkaItem ¶
// KafkaItem is a metric data point in the layout used for publishing to Kafka.
type KafkaItem struct {
	Endpoint  string  `json:"endpoint"`
	Metric    string  `json:"metric"`
	Tags      string  `json:"tags"`
	Timestamp int64   `json:"timestamp"`
	Value     float64 `json:"value"`
	DsType    string  `json:"dstype"`
	Step      int     `json:"step"`
}
Added by vincent.zhang for sending to Kafka; converts a data point into the Kafka message format.
func (*KafkaItem) KafkaString ¶
type KafkaProducer ¶
// KafkaProducer is a single producer handle used to publish messages to
// Kafka; see NewKafkaProducer, Run, LogRun, Close and AsyncClose.
// NOTE(review): this is a godoc rendering — the struct's unexported fields
// are elided from this view, so its internals cannot be documented here.
type KafkaProducer struct {
	// contains filtered or unexported fields
}
func NewKafkaProducer ¶
func NewKafkaProducer(name string, address []string) (*KafkaProducer, error)
func (*KafkaProducer) AsyncClose ¶
func (this *KafkaProducer) AsyncClose()
func (*KafkaProducer) Close ¶
func (this *KafkaProducer) Close() error
func (*KafkaProducer) LogRun ¶
func (this *KafkaProducer) LogRun()
Added by vincent.zhang for sending string logs to Kafka.
func (*KafkaProducer) Name ¶
func (this *KafkaProducer) Name() string
func (*KafkaProducer) Run ¶
func (this *KafkaProducer) Run()
type KafkaProducerPool ¶
type KafkaProducerPool struct { sync.RWMutex Name string Address []string MaxProducers int32 MaxIdle int32 Cnt int64 New func(name string, address []string) (*KafkaProducer, error) // contains filtered or unexported fields }
KafkaProducerPool is a pool of Kafka producers.
func NewKafkaProducerPool ¶
func NewKafkaProducerPool(name string, address []string, maxProducers int32, maxIdle int32) *KafkaProducerPool
func (*KafkaProducerPool) Destroy ¶
func (this *KafkaProducerPool) Destroy()
func (*KafkaProducerPool) Fetch ¶
func (this *KafkaProducerPool) Fetch() (*KafkaProducer, error)
func (*KafkaProducerPool) ForceClose ¶
func (this *KafkaProducerPool) ForceClose(producer *KafkaProducer)
func (*KafkaProducerPool) Proc ¶
func (this *KafkaProducerPool) Proc() string
func (*KafkaProducerPool) Release ¶
func (this *KafkaProducerPool) Release(producer *KafkaProducer)
type LogMetricItem ¶
// LogMetricItem is a raw log metric entry in the layout used for publishing
// to the Kafka log queue.
type LogMetricItem struct {
	Endpoint  string            `json:"endpoint"`
	Metric    string            `json:"metric"`
	Tags      map[string]string `json:"tags"`
	Timestamp int64             `json:"timestamp"`
	Value     string            `json:"value"`
	Step      int               `json:"step"`
}
func (*LogMetricItem) KafkaString ¶
func (this *LogMetricItem) KafkaString() (s string)
Click to show internal directories.
Click to hide internal directories.