sender

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2024 License: Apache-2.0, Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

added by vincent.zhang for sending to kafka 2017.09.25

Index

Constants

View Source
// Default periods for ProcCron and LogCron.
const (
	DefaultProcCronPeriod = time.Duration(5) * time.Second    // period of ProcCron: 5s (the original comment claimed a 1s default, which does not match the value)
	DefaultLogCronPeriod  = time.Duration(3600) * time.Second // period of LogCron: 3600s (the original comment claimed a 300s default, which does not match the value)
)
View Source
const (
	DefaultSendQueueMaxSize = 102400 // 102,400, i.e. 10.24 x 10,000 (the "10.24w" of the original note)
)
View Source
const (
	DefaultSendTaskSleepInterval = time.Millisecond * 50 // default sleep interval: 50ms
)

send

Variables

View Source
// Consistent-hash rings of the service nodes, mapping pk -> node.
var (
	JudgeNodeRing *rings.ConsistentHashNodeRing
	GraphNodeRing *rings.ConsistentHashNodeRing
)

服务节点的一致性哈希环 pk -> node

View Source
// Send buffer queues, mapping node -> queue of data.
//
// The map-typed queues are created empty by their initializers; TsdbQueue,
// KafkaQueue and KafkaLogQueue have no initializer in this declaration.
var (
	TsdbQueue   *nlist.SafeListLimited
	JudgeQueues = make(map[string]*nlist.SafeListLimited)
	GraphQueues = make(map[string]*nlist.SafeListLimited)
	//added by vincent.zhang for sending to kafka
	KafkaQueue    *nlist.SafeListLimited
	KafkaLogQueue *nlist.SafeListLimited
	// added by qimin.xu
	KafkaFilterQueues = make(map[string]*nlist.SafeListLimited)
)

发送缓存队列 node -> queue_of_data

View Source
// Connection pools, mapping node address -> connection pool.
var (
	JudgeConnPools     *backend.SafeRpcConnPools
	TsdbConnPoolHelper *backend.TsdbConnPoolHelper
	GraphConnPools     *backend.SafeRpcConnPools
)

连接池 node_address -> connection_pool

View Source
// Default parameters.
var (
	MinStep int // minimum reporting period, in seconds
)

默认参数

Functions

func DestroyConnPools

func DestroyConnPools()

func Push2GraphSendQueue

func Push2GraphSendQueue(items []*cmodel.MetaData)

Pushes the data into the send buffer queue of one Graph node; which Graph node receives it is decided by consistent hashing.

func Push2JudgeSendQueue

func Push2JudgeSendQueue(items []*cmodel.MetaData)

Pushes the data into the send buffer queue of one Judge node; which Judge node receives it is decided by consistent hashing.

func Push2KafkaLogSendQueue

func Push2KafkaLogSendQueue(items []*LogMetricItem)

Pushes the raw data into the Kafka log send buffer queue.

func Push2KafkaSendQueue

func Push2KafkaSendQueue(items []*cmodel.MetaData)

Pushes the raw data into the Kafka send buffer queue.

func Push2TsdbSendQueue

func Push2TsdbSendQueue(items []*cmodel.MetaData)

Pushes the raw data into the TSDB send buffer queue.

func Start

func Start()

Initializes the data-sending service; called from the main function.

Types

type KafkaItem

// KafkaItem is a record converted into the format sent to Kafka
// (added by vincent.zhang for sending to Kafka).
//
// Each field is serialized under the lower-case JSON key given in its tag.
type KafkaItem struct {
	Endpoint  string  `json:"endpoint"`
	Metric    string  `json:"metric"`
	Tags      string  `json:"tags"`
	Timestamp int64   `json:"timestamp"`
	Value     float64 `json:"value"`
	DsType    string  `json:"dstype"`
	Step      int     `json:"step"`
}

Added by vincent.zhang for sending to Kafka. Represents data converted into the Kafka format.

func (*KafkaItem) KafkaString

func (this *KafkaItem) KafkaString() (s string)

type KafkaProducer

type KafkaProducer struct {
	// contains filtered or unexported fields
}

func NewKafkaProducer

func NewKafkaProducer(name string, address []string) (*KafkaProducer, error)

func (*KafkaProducer) AsyncClose

func (this *KafkaProducer) AsyncClose()

func (*KafkaProducer) Close

func (this *KafkaProducer) Close() error

func (*KafkaProducer) LogRun

func (this *KafkaProducer) LogRun()

added by vincent.zhang for sending string log to kafka

func (*KafkaProducer) Name

func (this *KafkaProducer) Name() string

func (*KafkaProducer) Run

func (this *KafkaProducer) Run()

type KafkaProducerPool

// KafkaProducerPool is a pool of KafkaProducer values (see Fetch, Release,
// ForceClose and Destroy). It embeds sync.RWMutex.
//
// NOTE(review): Name, Address, MaxProducers and MaxIdle presumably hold the
// NewKafkaProducerPool arguments of the same names — confirm against the
// constructor, which is not shown in this listing.
type KafkaProducerPool struct {
	sync.RWMutex

	Name         string
	Address      []string
	MaxProducers int32
	MaxIdle      int32
	Cnt          int64

	// New has the same signature as NewKafkaProducer.
	New func(name string, address []string) (*KafkaProducer, error)
	// contains filtered or unexported fields
}

Kafka_Producer_Pool

func NewKafkaProducerPool

func NewKafkaProducerPool(name string, address []string, maxProducers int32, maxIdle int32) *KafkaProducerPool

func (*KafkaProducerPool) Destroy

func (this *KafkaProducerPool) Destroy()

func (*KafkaProducerPool) Fetch

func (this *KafkaProducerPool) Fetch() (*KafkaProducer, error)

func (*KafkaProducerPool) ForceClose

func (this *KafkaProducerPool) ForceClose(producer *KafkaProducer)

func (*KafkaProducerPool) Proc

func (this *KafkaProducerPool) Proc() string

func (*KafkaProducerPool) Release

func (this *KafkaProducerPool) Release(producer *KafkaProducer)

type LogMetricItem

// LogMetricItem is the item type accepted by Push2KafkaLogSendQueue.
//
// Unlike KafkaItem, Tags is a map[string]string and Value is a string, and
// there is no DsType field.
type LogMetricItem struct {
	Endpoint  string            `json:"endpoint"`
	Metric    string            `json:"metric"`
	Tags      map[string]string `json:"tags"`
	Timestamp int64             `json:"timestamp"`
	Value     string            `json:"value"`
	Step      int               `json:"step"`
}

func (*LogMetricItem) KafkaString

func (this *LogMetricItem) KafkaString() (s string)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL