Documentation ¶
Index ¶
- Constants
- Variables
- func FetchData(start, end int64, consolFun, endpoint, counter string, step int) ([]*dataobj.RRDData, error)
- func GenQParam(start, end int64, consolFunc, endpoint, counter string, step int) dataobj.TsdbQueryParam
- func Init(cfg MigrateSection)
- func Push2NewTsdbSendQueue(item *dataobj.TsdbItem)
- func Push2OldTsdbSendQueue(item *dataobj.TsdbItem)
- func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err error)
- func RebuildConsistentHashRing(hashRing *ConsistentHashRing, nodes []string, replicas int)
- func StartMigrate()
- type ConsistentHashRing
- type MigrateSection
- type QueueFilter
Constants ¶
View Source
const (
DefaultSendQueueMaxSize = 102400 // 102,400 (written "10.24w", i.e. 10.24 × 10,000)
)
View Source
const (
DefaultSendTaskSleepInterval = time.Millisecond * 50 // default sleep interval is 50ms
)
send
Variables ¶
View Source
var (
	Config     MigrateSection
	QueueCheck = QueueFilter{Data: make(map[string]struct{})}

	TsdbQueues    = make(map[string]*list.SafeListLimited)
	NewTsdbQueues = make(map[string]*list.SafeListLimited)
	RRDFileQueues = make(map[string]*list.SafeListLimited)

	// Consistent hash ring of service nodes: pk -> node
	TsdbNodeRing    *ConsistentHashRing
	NewTsdbNodeRing *ConsistentHashRing

	// Connection pools: node_address -> connection_pool
	TsdbConnPools    *pools.ConnPools
	NewTsdbConnPools *pools.ConnPools
)
Functions ¶
func GenQParam ¶
func GenQParam(start, end int64, consolFunc, endpoint, counter string, step int) dataobj.TsdbQueryParam
func Init ¶
func Init(cfg MigrateSection)
func Push2NewTsdbSendQueue ¶
func Push2NewTsdbSendQueue(item *dataobj.TsdbItem)
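func Push2OldTsdbSendQueue ¶
func Push2OldTsdbSendQueue(item *dataobj.TsdbItem)
Pushes the data into the send buffer queue of one Tsdb; which Tsdb receives it is decided by consistent hashing.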
func QueryOne ¶
func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err error)
func RebuildConsistentHashRing ¶
func RebuildConsistentHashRing(hashRing *ConsistentHashRing, nodes []string, replicas int)
func StartMigrate ¶
func StartMigrate()
Types ¶
type ConsistentHashRing ¶
func NewConsistentHashRing ¶
func NewConsistentHashRing(replicas int32, nodes []string) *ConsistentHashRing
func (*ConsistentHashRing) GetNode ¶
func (c *ConsistentHashRing) GetNode(pk string) (string, error)
func (*ConsistentHashRing) GetRing ¶
func (c *ConsistentHashRing) GetRing() *consistent.Consistent
func (*ConsistentHashRing) Set ¶
func (c *ConsistentHashRing) Set(r *consistent.Consistent)
type MigrateSection ¶
type MigrateSection struct {
	Batch       int               `yaml:"batch"`
	Concurrency int               `yaml:"concurrency"` // number of workers per node
	Enabled     bool              `yaml:"enabled"`
	Replicas    int               `yaml:"replicas"`
	OldCluster  map[string]string `yaml:"oldCluster"`
	NewCluster  map[string]string `yaml:"newCluster"`
	MaxConns    int               `yaml:"maxConns"`
	MaxIdle     int               `yaml:"maxIdle"`
	ConnTimeout int               `yaml:"connTimeout"`
	CallTimeout int               `yaml:"callTimeout"`
}
type QueueFilter ¶
func (*QueueFilter) Exists ¶
func (q *QueueFilter) Exists(key string) bool
func (*QueueFilter) Set ¶
func (q *QueueFilter) Set(key string)
Click to show internal directories.
Click to hide internal directories.