migrate

package
v0.0.0-...-b6b44b4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 23, 2020 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultSendQueueMaxSize is 102400, i.e. 10.24 x 10^4 (the original
	// note "10.24w" uses "w" for ten thousand).
	// NOTE(review): presumably the default size limit applied to each send
	// queue — confirm against where the queues are constructed.
	DefaultSendQueueMaxSize = 102400 // 10.24 x 10^4
)
View Source
const (
	// DefaultSendTaskSleepInterval is the default sleep interval: 50ms.
	// NOTE(review): presumably the pause a send task takes between polls of
	// its queue — confirm against the send loop.
	DefaultSendTaskSleepInterval = time.Millisecond * 50 // default sleep interval is 50ms
)

send

Variables

View Source
// Package-level state for the migrate package.
var (
	// Config holds the package's MigrateSection settings.
	// NOTE(review): presumably populated by Init(cfg) — confirm.
	// QueueCheck is a QueueFilter whose Data set starts out empty.
	Config     MigrateSection
	QueueCheck = QueueFilter{Data: make(map[string]struct{})}

	// Send queues, each a map from a string key to a *list.SafeListLimited.
	// All three maps start out empty.
	// NOTE(review): the key is presumably a node name or address, and the
	// Tsdb/NewTsdb prefixes presumably correspond to MigrateSection.OldCluster
	// and MigrateSection.NewCluster — confirm against Init.
	TsdbQueues    = make(map[string]*list.SafeListLimited)
	NewTsdbQueues = make(map[string]*list.SafeListLimited)
	RRDFileQueues = make(map[string]*list.SafeListLimited)
	// Consistent hash rings of the service nodes: pk -> node.
	TsdbNodeRing    *ConsistentHashRing
	NewTsdbNodeRing *ConsistentHashRing

	// Connection pools: node_address -> connection_pool.
	TsdbConnPools    *pools.ConnPools
	NewTsdbConnPools *pools.ConnPools
)

Functions

func FetchData

func FetchData(start, end int64, consolFun, endpoint, counter string, step int) ([]*dataobj.RRDData, error)

func GenQParam

func GenQParam(start, end int64, consolFunc, endpoint, counter string, step int) dataobj.TsdbQueryParam

func Init

func Init(cfg MigrateSection)

func Push2NewTsdbSendQueue

func Push2NewTsdbSendQueue(item *dataobj.TsdbItem)

func Push2OldTsdbSendQueue

func Push2OldTsdbSendQueue(item *dataobj.TsdbItem)

Push2OldTsdbSendQueue pushes the item into the send buffer queue of one Tsdb node; which node is chosen is determined by consistent hashing.

func QueryOne

func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err error)

func RebuildConsistentHashRing

func RebuildConsistentHashRing(hashRing *ConsistentHashRing, nodes []string, replicas int)

func StartMigrate

func StartMigrate()

Types

type ConsistentHashRing

// ConsistentHashRing embeds a sync.RWMutex; its remaining fields are
// unexported and not shown in this listing. Its GetNode method takes a pk
// string and returns a node string or an error.
// NOTE(review): the embedded lock presumably guards the hidden ring state —
// confirm against the method bodies.
type ConsistentHashRing struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewConsistentHashRing

func NewConsistentHashRing(replicas int32, nodes []string) *ConsistentHashRing

func (*ConsistentHashRing) GetNode

func (c *ConsistentHashRing) GetNode(pk string) (string, error)

func (*ConsistentHashRing) GetRing

func (*ConsistentHashRing) Set

type MigrateSection

// MigrateSection is the migration configuration; each field is mapped to a
// YAML key by its struct tag.
// NOTE(review): the units of ConnTimeout and CallTimeout, and the meaning of
// the keys and values in OldCluster and NewCluster, are not visible here —
// confirm against Init before relying on them.
type MigrateSection struct {
	Batch       int               `yaml:"batch"`
	Concurrency int               `yaml:"concurrency"` // number of workers per node
	Enabled     bool              `yaml:"enabled"`
	Replicas    int               `yaml:"replicas"`
	OldCluster  map[string]string `yaml:"oldCluster"`
	NewCluster  map[string]string `yaml:"newCluster"`
	MaxConns    int               `yaml:"maxConns"`
	MaxIdle     int               `yaml:"maxIdle"`
	ConnTimeout int               `yaml:"connTimeout"`
	CallTimeout int               `yaml:"callTimeout"`
}

type QueueFilter

// QueueFilter is a set of string keys (Data) paired with an embedded
// sync.RWMutex. Keys are added with Set and tested with Exists.
// NOTE(review): Set and Exists presumably take the lock around Data; code
// that touches Data directly should hold it as well — confirm against the
// method bodies.
type QueueFilter struct {
	Data map[string]struct{}
	sync.RWMutex
}

func (*QueueFilter) Exists

func (q *QueueFilter) Exists(key string) bool

func (*QueueFilter) Set

func (q *QueueFilter) Set(key string)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL