migrate

package
v1.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2020 License: Apache-2.0 Imports: 26 Imported by: 3

Documentation

Index

Constants

View Source
const (
	// DefaultSendQueueMaxSize is 102400, i.e. 10.24 × 10,000 (the "w" in the
	// original note abbreviated "wan", ten thousand).
	// NOTE(review): judging by the name this is the default size limit of a
	// send queue; the unit (presumably queued items) is not visible here —
	// confirm at the use site.
	DefaultSendQueueMaxSize = 102400 // 10.24 × 10^4
)
View Source
const (
	// DefaultSendTaskSleepInterval is the default sleep interval: 50ms.
	// NOTE(review): the name suggests it is used by the send task; where
	// exactly it is applied is not visible here — confirm.
	DefaultSendTaskSleepInterval = time.Millisecond * 50 // default sleep interval is 50ms
)

send

Variables

View Source
var (
	// Config is the package-level MigrateSection value; QueueCheck is a
	// QueueFilter initialised with an empty key set.
	// NOTE(review): Config is presumably assigned by Init(cfg) — confirm.
	Config     MigrateSection
	QueueCheck = QueueFilter{Data: make(map[interface{}]struct{})}

	// Queue tables: each maps a string key to a *list.SafeListLimited.
	// NOTE(review): keys are presumably node names — confirm against Init.
	TsdbQueues    = make(map[string]*list.SafeListLimited)
	NewTsdbQueues = make(map[string]*list.SafeListLimited)
	RRDFileQueues = make(map[string]*list.SafeListLimited)
	// Consistent hash rings of the service nodes: pk -> node.
	TsdbNodeRing    *ConsistentHashRing
	NewTsdbNodeRing *ConsistentHashRing

	// Connection pools: node_address -> connection_pool.
	TsdbConnPools    *ConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)}
	NewTsdbConnPools *ConnPools = &ConnPools{M: make(map[string]*pool.ConnPool)}
)

Functions

func FetchData

func FetchData(start, end int64, consolFun, endpoint, counter string, step int) ([]*dataobj.RRDData, error)

func GenQParam

func GenQParam(start, end int64, consolFunc, endpoint, counter string, step int) dataobj.TsdbQueryParam

func Init

func Init(cfg MigrateSection)

func Push2NewTsdbSendQueue

func Push2NewTsdbSendQueue(item *dataobj.TsdbItem)

func Push2OldTsdbSendQueue

func Push2OldTsdbSendQueue(item *dataobj.TsdbItem)

Push2OldTsdbSendQueue pushes the item into the send buffer queue of one Tsdb; which Tsdb receives it is decided by consistent hashing.

func QueryOne

func QueryOne(para dataobj.TsdbQueryParam) (resp *dataobj.TsdbQueryResponse, err error)

func RebuildConsistentHashRing

func RebuildConsistentHashRing(hashRing *ConsistentHashRing, nodes []string, replicas int)

func StartMigrate

func StartMigrate()

Types

type ConnPools

// ConnPools holds one *pool.ConnPool per backend, together with the pool
// settings listed below. It embeds sync.RWMutex.
// NOTE(review): the mutex presumably guards M — confirm in Get/Call.
type ConnPools struct {
	sync.RWMutex
	// M maps a string key to its connection pool; the package-level pool
	// variables describe the mapping as node_address -> connection_pool.
	M           map[string]*pool.ConnPool
	// The four settings below share their names with the parameters of
	// CreateConnPools (maxConns, maxIdle, connTimeout, callTimeout).
	// NOTE(review): the time unit of the two timeouts is not visible here —
	// confirm before changing values.
	MaxConns    int
	MaxIdle     int
	ConnTimeout int
	CallTimeout int
}

Each backend has its own ConnPool.

func CreateConnPools

func CreateConnPools(maxConns, maxIdle, connTimeout, callTimeout int, cluster []string) *ConnPools

func (*ConnPools) Call

func (this *ConnPools) Call(addr, method string, args interface{}, resp interface{}) error

Call sends synchronously: it returns only after the send has completed or timed out.

func (*ConnPools) Get

func (this *ConnPools) Get(address string) (*pool.ConnPool, bool)

type ConsistentHashRing

// ConsistentHashRing is the hash-ring type used by this package. It embeds
// sync.RWMutex; its remaining fields are unexported. GetNode takes a pk
// string and returns a node string, and GetRing returns a
// *consistent.Consistent.
type ConsistentHashRing struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewConsistentHashRing

func NewConsistentHashRing(replicas int32, nodes []string) *ConsistentHashRing

func (*ConsistentHashRing) GetNode

func (this *ConsistentHashRing) GetNode(pk string) (string, error)

func (*ConsistentHashRing) GetRing

func (this *ConsistentHashRing) GetRing() *consistent.Consistent

func (*ConsistentHashRing) Set

type MigrateSection

// MigrateSection is the configuration struct for this package. Every field
// carries a yaml tag naming its key, and Init takes a value of this type.
type MigrateSection struct {
	Batch       int               `yaml:"batch"`
	Concurrency int               `yaml:"concurrency"` // number of workers per node
	Enabled     bool              `yaml:"enabled"`
	Replicas    int               `yaml:"replicas"`
	// NOTE(review): the key/value meaning of the two cluster maps is not
	// visible here (presumably node name -> address) — confirm.
	OldCluster  map[string]string `yaml:"oldCluster"`
	NewCluster  map[string]string `yaml:"newCluster"`
	// The four settings below share their names with the ConnPools fields
	// and the CreateConnPools parameters.
	// NOTE(review): timeout units are not visible here — confirm.
	MaxConns    int               `yaml:"maxConns"`
	MaxIdle     int               `yaml:"maxIdle"`
	ConnTimeout int               `yaml:"connTimeout"`
	CallTimeout int               `yaml:"callTimeout"`
}

type QueueFilter

// QueueFilter pairs a key set (Data) with an embedded sync.RWMutex. Its
// methods are Exists(key) and Set(key).
// NOTE(review): the methods presumably take the lock around Data — confirm
// before accessing Data directly from multiple goroutines.
type QueueFilter struct {
	Data map[interface{}]struct{} // set of keys; the empty-struct values carry no data
	sync.RWMutex
}

func (*QueueFilter) Exists

func (q *QueueFilter) Exists(key interface{}) bool

func (*QueueFilter) Set

func (q *QueueFilter) Set(key interface{})

type RpcClient

// RpcClient is this package's RPC client type; all of its fields are
// unexported. Its methods Call, Close, Closed and Name are declared on the
// value receiver, and Close() error is the method io.Closer requires.
type RpcClient struct {
	// contains filtered or unexported fields
}

RpcClient must implement the io.Closer interface.

func (RpcClient) Call

func (this RpcClient) Call(method string, args interface{}, reply interface{}) error

func (RpcClient) Close

func (this RpcClient) Close() error

func (RpcClient) Closed

func (this RpcClient) Closed() bool

func (RpcClient) Name

func (this RpcClient) Name() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL