Documentation ¶
Index ¶
- Constants
- Variables
- func FetchData(start, end int64, consolFun, endpoint, counter string, step int) ([]*dataobj.RRDData, error)
- func GenQParam(start, end int64, consolFunc, endpoint, counter string, step int) dataobj.TsdbQueryParam
- func Init(cfg MigrateSection)
- func Push2NewTsdbSendQueue(item *dataobj.TsdbItem)
- func Push2OldTsdbSendQueue(item *dataobj.TsdbItem)
- func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err error)
- func RebuildConsistentHashRing(hashRing *ConsistentHashRing, nodes []string, replicas int)
- func StartMigrate()
- type ConnPools
- type ConsistentHashRing
- type MigrateSection
- type QueueFilter
- type RpcClient
Constants ¶
View Source
const (
DefaultSendQueueMaxSize = 102400 //10.24w
)
View Source
const (
DefaultSendTaskSleepInterval = time.Millisecond * 50 //默认睡眠间隔为50ms
)
send
Variables ¶
View Source
var ( Config MigrateSection QueueCheck = QueueFilter{Data: make(map[interface{}]struct{})} TsdbQueues = make(map[string]*list.SafeListLimited) NewTsdbQueues = make(map[string]*list.SafeListLimited) RRDFileQueues = make(map[string]*list.SafeListLimited) // 服务节点的一致性哈希环 pk -> node TsdbNodeRing *ConsistentHashRing NewTsdbNodeRing *ConsistentHashRing // 连接池 node_address -> connection_pool TsdbConnPools *ConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)} NewTsdbConnPools *ConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)} )
Functions ¶
func GenQParam ¶
func GenQParam(start, end int64, consolFunc, endpoint, counter string, step int) dataobj.TsdbQueryParam
func Init ¶
func Init(cfg MigrateSection)
func Push2NewTsdbSendQueue ¶
func Push2OldTsdbSendQueue ¶
将数据 打入 某个Tsdb的发送缓存队列, 具体是哪一个Tsdb 由一致性哈希 决定
func QueryOne ¶
func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err error)
func RebuildConsistentHashRing ¶
func RebuildConsistentHashRing(hashRing *ConsistentHashRing, nodes []string, replicas int)
func StartMigrate ¶
func StartMigrate()
Types ¶
type ConnPools ¶
type ConnPools struct { sync.RWMutex M map[string]*pool.ConnPool MaxConns int MaxIdle int ConnTimeout int CallTimeout int }
每个后端backend对应一个ConnPool
func CreateConnPools ¶
type ConsistentHashRing ¶
func NewConsistentHashRing ¶
func NewConsistentHashRing(replicas int32, nodes []string) *ConsistentHashRing
func (*ConsistentHashRing) GetNode ¶
func (this *ConsistentHashRing) GetNode(pk string) (string, error)
func (*ConsistentHashRing) GetRing ¶
func (this *ConsistentHashRing) GetRing() *consistent.Consistent
func (*ConsistentHashRing) Set ¶
func (this *ConsistentHashRing) Set(r *consistent.Consistent)
type MigrateSection ¶
type MigrateSection struct { Batch int `yaml:"batch"` Concurrency int `yaml:"concurrency"` //number of multiple worker per node Enabled bool `yaml:"enabled"` Replicas int `yaml:"replicas"` OldCluster map[string]string `yaml:"oldCluster"` NewCluster map[string]string `yaml:"newCluster"` MaxConns int `yaml:"maxConns"` MaxIdle int `yaml:"maxIdle"` ConnTimeout int `yaml:"connTimeout"` CallTimeout int `yaml:"callTimeout"` }
type QueueFilter ¶
func (*QueueFilter) Exists ¶
func (q *QueueFilter) Exists(key interface{}) bool
func (*QueueFilter) Set ¶
func (q *QueueFilter) Set(key interface{})
Click to show internal directories.
Click to hide internal directories.