cluster

package
v0.0.0-...-9750751 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2024 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MaxShards = 1024
	MaxZone   = 5
)

Variables

View Source
var (
	Version uint32 = 0

	ClusterInfo [2]Cluster
)
View Source
var (
	DefaultStatsConfig = StatsConfig{
		TimeoutStatsEnabled:    false,
		RespTimeStatsEnabled:   false,
		MarkdownThreashold:     10,
		MarkdownExpirationBase: 5 * 60,
		EMARespTimeWindowSize:  39,
		TimeoutWindowSize:      5,
		TimeoutWindowUint:      60,
	}
)

Functions

func DisplayZones

func DisplayZones(zones []*Zone, header string)

func Finalize

func Finalize()

func InitShardMgr

func InitShardMgr(ccfg *Cluster, conf *io.OutboundConfig, statscfg *StatsConfig) (err error)

func Initialize

func Initialize(args ...interface{}) (err error)

func IsNewMappingAlg

func IsNewMappingAlg() bool

func IsPrimary

func IsPrimary(shardid uint32, zoneid uint32, numZones uint32) bool

func MatchZones

func MatchZones(zones []*Zone, curr []*Zone) bool

zones: the expected zones; curr: the current cluster info in etcd.

func SetMappingAlg

func SetMappingAlg(algVersion uint32)

func ValidateZones

func ValidateZones(zones []*Zone) bool

Validate shardids are unique and consecutive.

Types

type Cluster

type Cluster struct {
	Config
	Zones            []*Zone
	RedistSingleZone bool // commit redist one zone only
	RedistZoneId     int  // zone selected for commit one zone.
}

func (*Cluster) CreateShardMap

func (c *Cluster) CreateShardMap() *ShardMap

func (*Cluster) Dump

func (c *Cluster) Dump(deep bool)

func (*Cluster) DumpChangeMap

func (c *Cluster) DumpChangeMap(newCluster *Cluster)

func (*Cluster) GetShards

func (c *Cluster) GetShards(zoneid uint32, nodeid uint32) (shards []uint32, err error)

func (*Cluster) IsRedistZone

func (c *Cluster) IsRedistZone(zoneid int) bool

func (*Cluster) Log

func (c *Cluster) Log()

func (*Cluster) MergeWith

func (c *Cluster) MergeWith(other *Cluster) string

func (*Cluster) PopulateFromConfig

func (c *Cluster) PopulateFromConfig() (err error)

used if reading cluster from config file

func (*Cluster) PopulateFromRedist

func (c *Cluster) PopulateFromRedist(currZones []*Zone)

Populate shardmap for a config. currZones is the existing set of zones.

func (*Cluster) Read

func (c *Cluster) Read(r IReader) (version uint32, err error)

func (*Cluster) ReadFromCache

func (c *Cluster) ReadFromCache(cacheName string) (version uint32, err2 error)

func (*Cluster) ReadWithRedistInfo

func (c *Cluster) ReadWithRedistInfo(r IReader) (version uint32, err error)

func (*Cluster) ReadWithRedistNodeShards

func (c *Cluster) ReadWithRedistNodeShards(r IReader) (err error)

func (*Cluster) ReadWithRetry

func (c *Cluster) ReadWithRetry(r IReader, cacheFile string, version uint32) (ver uint32, err error)

func (*Cluster) SetRedistZone

func (c *Cluster) SetRedistZone(zoneid int)

func (*Cluster) Validate

func (c *Cluster) Validate() error

func (*Cluster) Write

func (c *Cluster) Write(w IWriter, version ...uint32) (err error)

func (*Cluster) WriteRedistAbort

func (c *Cluster) WriteRedistAbort(w IWriter) (err error)

func (*Cluster) WriteRedistInfo

func (c *Cluster) WriteRedistInfo(w IWriter, nc *Cluster) (err error)

func (*Cluster) WriteRedistStart

func (c *Cluster) WriteRedistStart(w IWriter, flag bool, zoneid int, src bool, ratelimit int) (err error)

func (*Cluster) WriteToCache

func (c *Cluster) WriteToCache(cachePath string, cacheName string, version uint32, forRedist bool) (err error)

type ClusterCache

type ClusterCache struct {
	Version     uint32
	ForRedist   bool
	ClusterInfo Config
	Zones       []Zone
}

type ClusterStats

type ClusterStats struct {
	MarkdownTable      []bool  // 1 writer: stats go routine, M readers: any request proc
	MarkdownExpiration []int64 // only accessed by stats go routine
	// contains filtered or unexported fields
}

func NewClusterStats

func NewClusterStats(numZones uint32, maxNodesPerZone uint32, conf *StatsConfig) *ClusterStats

func (*ClusterStats) IsMarkeddown

func (c *ClusterStats) IsMarkeddown(zoneid uint32, nodeid uint32) bool

func (*ClusterStats) PrintMarkDown

func (c *ClusterStats) PrintMarkDown()

func (*ClusterStats) PrintStats

func (c *ClusterStats) PrintStats()

func (*ClusterStats) Quit

func (c *ClusterStats) Quit()

func (*ClusterStats) Run

func (c *ClusterStats) Run()

func (*ClusterStats) SendNodeProcState

func (c *ClusterStats) SendNodeProcState(st *ProcStat)

Called by processors, in different goroutines/threads.

type Config

type Config struct {
	AlgVersion uint32
	NumZones   uint32
	NumShards  uint32
	ConnInfo   [][]string

	//SSHosts and SSPorts are used to generate ConnInfo ONLY when ConnInfo not defined
	SSHosts [][]string
	SSPorts []uint16
}

func (*Config) Dump

func (c *Config) Dump()

func (*Config) GetMaxNumHostsPerZone

func (c *Config) GetMaxNumHostsPerZone() int

func (*Config) Read

func (c *Config) Read(repo IConfigRepo) error

func (*Config) Validate

func (c *Config) Validate() error

type Filter

type Filter struct {
	// contains filtered or unexported fields
}

func NewFilter

func NewFilter(numZones int) (filter *Filter)

func (*Filter) ExpandNodes

func (filter *Filter) ExpandNodes(workZones []*Zone, cutoff []int, numShards uint32) (zones []*Zone)

func (*Filter) Get

func (f *Filter) Get(zoneid int, shardid uint32) (nodeid int)

func (*Filter) InitZone

func (f *Filter) InitZone(zone int)

func (*Filter) NumZones

func (f *Filter) NumZones() int

func (*Filter) SetBase

func (f *Filter) SetBase(val int)

type IConfigRepo

type IConfigRepo interface {
	GetClusterConfig(c *Config) error
}

type IReader

type IReader interface {
	// for proxy
	Read(c *Cluster) (version uint32, err error)

	// for storage server
	ReadWithRedistInfo(c *Cluster) (version uint32, err error)

	// for cluster manager
	ReadWithRedistNodeShards(c *Cluster) (err error)
}

type IWriter

type IWriter interface {
	Write(c *Cluster, version ...uint32) (err error)

	// write redistribution info
	WriteRedistInfo(c *Cluster, nc *Cluster) (err error)

	// write redistribution start/stop
	WriteRedistStart(c *Cluster, flag bool, zoneid int, src bool, ratelimit int) (err error)

	WriteRedistAbort(c *Cluster) (err error)

	WriteRedistResume(zoneid int, ratelimit int) (err error)
}

type Node

type Node struct {
	Zoneid          uint32
	Nodeid          uint32
	PrimaryShards   []uint32
	SecondaryShards []uint32
}

Node represents a logical node.

func NewNode

func NewNode(zoneid uint32, nodeid uint32) *Node

func (*Node) GetShards

func (n *Node) GetShards() (shards []uint32)

func (*Node) InitNode

func (n *Node) InitNode(zoneid uint32, nodeid uint32)

func (*Node) Log

func (n *Node) Log()

func (*Node) NodeToString

func (n *Node) NodeToString(priSecDelimiter string, shardDelimiter string) string

func (*Node) Print

func (n *Node) Print(short bool)

func (*Node) StringToNode

func (n *Node) StringToNode(zoneid uint32, nodeid uint32, val string,
	priSecDelimiter string, shardDelimiter string) error

type NodeStat

type NodeStat struct {
	// contains filtered or unexported fields
}

type OutboundSSProcessor

type OutboundSSProcessor struct {
	io.OutboundProcessor
	// contains filtered or unexported fields
}

func (*OutboundSSProcessor) GetNodeInfo

func (p *OutboundSSProcessor) GetNodeInfo() (zoneid int, hostid int)

func (*OutboundSSProcessor) Name

func (p *OutboundSSProcessor) Name() string

func (*OutboundSSProcessor) OnConnectError

func (p *OutboundSSProcessor) OnConnectError(timeTaken time.Duration, connStr string, err error)

func (*OutboundSSProcessor) OnConnectSuccess

func (p *OutboundSSProcessor) OnConnectSuccess(conn io.Conn, connector *io.OutboundConnector, timeTaken time.Duration)

type ProcStat

type ProcStat struct {
	// contains filtered or unexported fields
}

type ShardManager

type ShardManager struct {
	AlgVersion uint32 // default
	// contains filtered or unexported fields
}

func GetShardMgr

func GetShardMgr() *ShardManager

Return shardmgr that is active.

func (*ShardManager) DumpShardMap

func (p *ShardManager) DumpShardMap()

func (*ShardManager) GetBadShardHosts

func (p *ShardManager) GetBadShardHosts(level uint32) (hosts string)

func (*ShardManager) GetConnectivity

func (p *ShardManager) GetConnectivity() (connState [][]int)

func (*ShardManager) GetProcessors

func (p *ShardManager) GetProcessors(partId shard.ID) ([]*OutboundSSProcessor, error)

Used by admin worker

func (*ShardManager) GetProcessorsByKey

func (p *ShardManager) GetProcessorsByKey(key []byte) (shardId shard.ID, procs []*OutboundSSProcessor, err error)

Used by admin worker

func (*ShardManager) GetSSConnectivityStats

func (p *ShardManager) GetSSConnectivityStats() (numOkShards uint32, numBadShards uint32, numWarnShards uint32, numAlertShards uint32)

This function is a little costly; don't call it frequently.

func (*ShardManager) GetSSProcessor

func (p *ShardManager) GetSSProcessor(zoneId int, nodeId int) *OutboundSSProcessor

func (*ShardManager) GetSSProcessors

func (p *ShardManager) GetSSProcessors(key []byte, confNumWrites int, procs []*OutboundSSProcessor, pos []int) (shardId shard.ID, numProcs int)

Used by the request processor. It is the caller's responsibility to make sure cap(procs) >= numZones and cap(pos) >= numZones.

func (*ShardManager) GetShardInfoByKey

func (p *ShardManager) GetShardInfoByKey(key []byte) ([]uint32, []uint32, error)

func (*ShardManager) GetShardMap

func (p *ShardManager) GetShardMap() *ShardMap

func (*ShardManager) IsConnected

func (p *ShardManager) IsConnected(zone int, zoneIndex int) bool

func (*ShardManager) IsConnectivityOk

func (p *ShardManager) IsConnectivityOk() bool

func (*ShardManager) SendStats

func (p *ShardManager) SendStats(zoneid int, hostid int, timeout bool, proctime int64)

func (*ShardManager) Shutdown

func (p *ShardManager) Shutdown(curMgr *ShardManager)

func (*ShardManager) StatsEnabled

func (p *ShardManager) StatsEnabled() bool

func (*ShardManager) WriteProcessorsStats

func (p *ShardManager) WriteProcessorsStats(w goio.Writer)

func (*ShardManager) WriteProcessorsStatsByShards

func (p *ShardManager) WriteProcessorsStatsByShards(w goio.Writer)

type ShardMap

type ShardMap struct {
	// contains filtered or unexported fields
}

func NewShardMap

func NewShardMap(c *Cluster) *ShardMap

func (*ShardMap) Dump

func (m *ShardMap) Dump()

func (*ShardMap) GetNodeId

func (m *ShardMap) GetNodeId(shardid uint32, zoneid uint32) (nodeid uint32, err error)

func (*ShardMap) GetNodes

func (m *ShardMap) GetNodes(id uint32, start_zoneid uint32) ([]uint32, []uint32, error)

func (*ShardMap) LogConnectivity

func (m *ShardMap) LogConnectivity(connState [][]int, id int32, log bool) bool

FIXME: this function needs to be changed because of the shard map change.

func (*ShardMap) Populate

func (m *ShardMap) Populate(c *Cluster)

type ShardMapEntry

type ShardMapEntry struct {
	// contains filtered or unexported fields
}

type StatsConfig

type StatsConfig struct {
	TimeoutStatsEnabled    bool
	RespTimeStatsEnabled   bool
	MarkdownThreashold     uint32
	MarkdownExpirationBase uint32
	EMARespTimeWindowSize  uint32
	TimeoutWindowSize      uint32
	TimeoutWindowUint      uint32
}

type Zone

type Zone struct {
	Zoneid   uint32
	NumNodes uint32
	Nodes    []Node
}

Zone represents a zone: its id, its node count, and the logical nodes it contains.

func NewZoneFromConfig

func NewZoneFromConfig(zoneid uint32, numNodes uint32, numZones uint32, numShards uint32) *Zone

func NewZones

func NewZones(numZones uint32, numShards uint32, cutoff []int) (zones []*Zone)

func (*Zone) Display

func (z *Zone) Display()

func (*Zone) Log

func (z *Zone) Log()

func (*Zone) Print

func (z *Zone) Print()

type ZoneMarkDown

type ZoneMarkDown struct {
	// contains filtered or unexported fields
}

func GetMarkDownObj

func GetMarkDownObj() (obj *ZoneMarkDown)

func (*ZoneMarkDown) CheckMarkDown

func (m *ZoneMarkDown) CheckMarkDown() (markdown bool, zoneid int32)

func (*ZoneMarkDown) MarkDown

func (m *ZoneMarkDown) MarkDown(zoneid int32)

func (*ZoneMarkDown) Reset

func (m *ZoneMarkDown) Reset()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL