Documentation ¶
Index ¶
- Constants
- Variables
- func DisplayZones(zones []*Zone, header string)
- func Finalize()
- func InitShardMgr(ccfg *Cluster, conf *io.OutboundConfig, statscfg *StatsConfig) (err error)
- func Initialize(args ...interface{}) (err error)
- func IsNewMappingAlg() bool
- func IsPrimary(shardid uint32, zoneid uint32, numZones uint32) bool
- func MatchZones(zones []*Zone, curr []*Zone) bool
- func SetMappingAlg(algVersion uint32)
- func ValidateZones(zones []*Zone) bool
- type Cluster
- func (c *Cluster) CreateShardMap() *ShardMap
- func (c *Cluster) Dump(deep bool)
- func (c *Cluster) DumpChangeMap(newCluster *Cluster)
- func (c *Cluster) GetShards(zoneid uint32, nodeid uint32) (shards []uint32, err error)
- func (c *Cluster) IsRedistZone(zoneid int) bool
- func (c *Cluster) Log()
- func (c *Cluster) MergeWith(other *Cluster) string
- func (c *Cluster) PopulateFromConfig() (err error)
- func (c *Cluster) PopulateFromRedist(currZones []*Zone)
- func (c *Cluster) Read(r IReader) (version uint32, err error)
- func (c *Cluster) ReadFromCache(cacheName string) (version uint32, err2 error)
- func (c *Cluster) ReadWithRedistInfo(r IReader) (version uint32, err error)
- func (c *Cluster) ReadWithRedistNodeShards(r IReader) (err error)
- func (c *Cluster) ReadWithRetry(r IReader, cacheFile string, version uint32) (ver uint32, err error)
- func (c *Cluster) SetRedistZone(zoneid int)
- func (c *Cluster) Validate() error
- func (c *Cluster) Write(w IWriter, version ...uint32) (err error)
- func (c *Cluster) WriteRedistAbort(w IWriter) (err error)
- func (c *Cluster) WriteRedistInfo(w IWriter, nc *Cluster) (err error)
- func (c *Cluster) WriteRedistStart(w IWriter, flag bool, zoneid int, src bool, ratelimit int) (err error)
- func (c *Cluster) WriteToCache(cachePath string, cacheName string, version uint32, forRedist bool) (err error)
- type ClusterCache
- type ClusterStats
- type Config
- type Filter
- type IConfigRepo
- type IReader
- type IWriter
- type Node
- func (n *Node) GetShards() (shards []uint32)
- func (n *Node) InitNode(zoneid uint32, nodeid uint32)
- func (n *Node) Log()
- func (n *Node) NodeToString(priSecDelimiter string, shardDelimiter string) string
- func (n *Node) Print(short bool)
- func (n *Node) StringToNode(zoneid uint32, nodeid uint32, val string, priSecDelimiter string, ...) error
- type NodeStat
- type OutboundSSProcessor
- func (p *OutboundSSProcessor) GetNodeInfo() (zoneid int, hostid int)
- func (p *OutboundSSProcessor) Name() string
- func (p *OutboundSSProcessor) OnConnectError(timeTaken time.Duration, connStr string, err error)
- func (p *OutboundSSProcessor) OnConnectSuccess(conn io.Conn, connector *io.OutboundConnector, timeTaken time.Duration)
- type ProcStat
- type ShardManager
- func (p *ShardManager) DumpShardMap()
- func (p *ShardManager) GetBadShardHosts(level uint32) (hosts string)
- func (p *ShardManager) GetConnectivity() (connState [][]int)
- func (p *ShardManager) GetProcessors(partId shard.ID) ([]*OutboundSSProcessor, error)
- func (p *ShardManager) GetProcessorsByKey(key []byte) (shardId shard.ID, procs []*OutboundSSProcessor, err error)
- func (p *ShardManager) GetSSConnectivityStats() (numOkShards uint32, numBadShards uint32, numWarnShards uint32, ...)
- func (p *ShardManager) GetSSProcessor(zoneId int, nodeId int) *OutboundSSProcessor
- func (p *ShardManager) GetSSProcessors(key []byte, confNumWrites int, procs []*OutboundSSProcessor, pos []int) (shardId shard.ID, numProcs int)
- func (p *ShardManager) GetShardInfoByKey(key []byte) ([]uint32, []uint32, error)
- func (p *ShardManager) GetShardMap() *ShardMap
- func (p *ShardManager) IsConnected(zone int, zoneIndex int) bool
- func (p *ShardManager) IsConnectivityOk() bool
- func (p *ShardManager) SendStats(zoneid int, hostid int, timeout bool, proctime int64)
- func (p *ShardManager) Shutdown(curMgr *ShardManager)
- func (p *ShardManager) StatsEnabled() bool
- func (p *ShardManager) WriteProcessorsStats(w goio.Writer)
- func (p *ShardManager) WriteProcessorsStatsByShards(w goio.Writer)
- type ShardMap
- func (m *ShardMap) Dump()
- func (m *ShardMap) GetNodeId(shardid uint32, zoneid uint32) (nodeid uint32, err error)
- func (m *ShardMap) GetNodes(id uint32, start_zoneid uint32) ([]uint32, []uint32, error)
- func (m *ShardMap) LogConnectivity(connState [][]int, id int32, log bool) bool
- func (m *ShardMap) Populate(c *Cluster)
- type ShardMapEntry
- type StatsConfig
- type Zone
- type ZoneMarkDown
Constants ¶
View Source
const ( MaxShards = 1024 MaxZone = 5 )
Variables ¶
View Source
var ( Version uint32 = 0 ClusterInfo [2]Cluster )
View Source
var ( DefaultStatsConfig = StatsConfig{ TimeoutStatsEnabled: false, RespTimeStatsEnabled: false, MarkdownThreashold: 10, MarkdownExpirationBase: 5 * 60, EMARespTimeWindowSize: 39, TimeoutWindowSize: 5, TimeoutWindowUint: 60, } )
Functions ¶
func DisplayZones ¶
func InitShardMgr ¶
func InitShardMgr(ccfg *Cluster, conf *io.OutboundConfig, statscfg *StatsConfig) (err error)
func Initialize ¶
func Initialize(args ...interface{}) (err error)
func IsNewMappingAlg ¶
func IsNewMappingAlg() bool
func MatchZones ¶
zones: expected; curr: current cluster info in etcd.
func SetMappingAlg ¶
func SetMappingAlg(algVersion uint32)
func ValidateZones ¶
Validates that shard IDs are unique and consecutive.
Types ¶
type Cluster ¶
type Cluster struct { Config Zones []*Zone RedistSingleZone bool // commit redist one zone only RedistZoneId int // zone selected for commit one zone. }
func (*Cluster) CreateShardMap ¶
func (*Cluster) DumpChangeMap ¶
func (*Cluster) IsRedistZone ¶
func (*Cluster) PopulateFromConfig ¶
Used when reading the cluster from a config file.
func (*Cluster) PopulateFromRedist ¶
Populate shardmap for a config. currZones is the existing zones.
func (*Cluster) ReadFromCache ¶
func (*Cluster) ReadWithRedistInfo ¶
func (*Cluster) ReadWithRedistNodeShards ¶
func (*Cluster) ReadWithRetry ¶
func (*Cluster) SetRedistZone ¶
func (*Cluster) WriteRedistAbort ¶
func (*Cluster) WriteRedistInfo ¶
func (*Cluster) WriteRedistStart ¶
type ClusterCache ¶
type ClusterStats ¶
type ClusterStats struct { MarkdownTable []bool // 1 writer: stats go routine, M readers: any request proc MarkdownExpiration []int64 // only accessed by stats go routine // contains filtered or unexported fields }
func NewClusterStats ¶
func NewClusterStats(numZones uint32, maxNodesPerZone uint32, conf *StatsConfig) *ClusterStats
func (*ClusterStats) IsMarkeddown ¶
func (c *ClusterStats) IsMarkeddown(zoneid uint32, nodeid uint32) bool
func (*ClusterStats) PrintMarkDown ¶
func (c *ClusterStats) PrintMarkDown()
func (*ClusterStats) PrintStats ¶
func (c *ClusterStats) PrintStats()
func (*ClusterStats) Quit ¶
func (c *ClusterStats) Quit()
func (*ClusterStats) Run ¶
func (c *ClusterStats) Run()
func (*ClusterStats) SendNodeProcState ¶
func (c *ClusterStats) SendNodeProcState(st *ProcStat)
Called by processors, in different goroutines/threads.
type Config ¶
type Config struct { AlgVersion uint32 NumZones uint32 NumShards uint32 ConnInfo [][]string //SSHosts and SSPorts are used to generate ConnInfo ONLY when ConnInfo not defined SSHosts [][]string SSPorts []uint16 }
func (*Config) GetMaxNumHostsPerZone ¶
func (*Config) Read ¶
func (c *Config) Read(repo IConfigRepo) error
type Filter ¶
type Filter struct {
// contains filtered or unexported fields
}
func (*Filter) ExpandNodes ¶
type IConfigRepo ¶
type IWriter ¶
type IWriter interface { Write(c *Cluster, version ...uint32) (err error) // write redistribution info WriteRedistInfo(c *Cluster, nc *Cluster) (err error) // write redistribution start/stop WriteRedistStart(c *Cluster, flag bool, zoneid int, src bool, ratelimit int) (err error) WriteRedistAbort(c *Cluster) (err error) WriteRedistResume(zoneid int, ratelimit int) (err error) }
type OutboundSSProcessor ¶
type OutboundSSProcessor struct { io.OutboundProcessor // contains filtered or unexported fields }
func (*OutboundSSProcessor) GetNodeInfo ¶
func (p *OutboundSSProcessor) GetNodeInfo() (zoneid int, hostid int)
func (*OutboundSSProcessor) Name ¶
func (p *OutboundSSProcessor) Name() string
func (*OutboundSSProcessor) OnConnectError ¶
func (p *OutboundSSProcessor) OnConnectError(timeTaken time.Duration, connStr string, err error)
func (*OutboundSSProcessor) OnConnectSuccess ¶
func (p *OutboundSSProcessor) OnConnectSuccess(conn io.Conn, connector *io.OutboundConnector, timeTaken time.Duration)
type ShardManager ¶
type ShardManager struct { AlgVersion uint32 // default // contains filtered or unexported fields }
func (*ShardManager) DumpShardMap ¶
func (p *ShardManager) DumpShardMap()
func (*ShardManager) GetBadShardHosts ¶
func (p *ShardManager) GetBadShardHosts(level uint32) (hosts string)
func (*ShardManager) GetConnectivity ¶
func (p *ShardManager) GetConnectivity() (connState [][]int)
func (*ShardManager) GetProcessors ¶
func (p *ShardManager) GetProcessors(partId shard.ID) ([]*OutboundSSProcessor, error)
Used by admin worker
func (*ShardManager) GetProcessorsByKey ¶
func (p *ShardManager) GetProcessorsByKey(key []byte) (shardId shard.ID, procs []*OutboundSSProcessor, err error)
Used by admin worker
func (*ShardManager) GetSSConnectivityStats ¶
func (p *ShardManager) GetSSConnectivityStats() (numOkShards uint32, numBadShards uint32, numWarnShards uint32, numAlertShards uint32)
This function is a little costly; don't call it frequently!
func (*ShardManager) GetSSProcessor ¶
func (p *ShardManager) GetSSProcessor(zoneId int, nodeId int) *OutboundSSProcessor
func (*ShardManager) GetSSProcessors ¶
func (p *ShardManager) GetSSProcessors(key []byte, confNumWrites int, procs []*OutboundSSProcessor, pos []int) (shardId shard.ID, numProcs int)
Used by the request processor. It is the caller's responsibility to make sure cap(procs) >= numZones and cap(pos) >= numZones.
func (*ShardManager) GetShardInfoByKey ¶
func (p *ShardManager) GetShardInfoByKey(key []byte) ([]uint32, []uint32, error)
func (*ShardManager) GetShardMap ¶
func (p *ShardManager) GetShardMap() *ShardMap
func (*ShardManager) IsConnected ¶
func (p *ShardManager) IsConnected(zone int, zoneIndex int) bool
func (*ShardManager) IsConnectivityOk ¶
func (p *ShardManager) IsConnectivityOk() bool
func (*ShardManager) SendStats ¶
func (p *ShardManager) SendStats(zoneid int, hostid int, timeout bool, proctime int64)
func (*ShardManager) Shutdown ¶
func (p *ShardManager) Shutdown(curMgr *ShardManager)
func (*ShardManager) StatsEnabled ¶
func (p *ShardManager) StatsEnabled() bool
func (*ShardManager) WriteProcessorsStats ¶
func (p *ShardManager) WriteProcessorsStats(w goio.Writer)
func (*ShardManager) WriteProcessorsStatsByShards ¶
func (p *ShardManager) WriteProcessorsStatsByShards(w goio.Writer)
type ShardMap ¶
type ShardMap struct {
// contains filtered or unexported fields
}
func NewShardMap ¶
func (*ShardMap) LogConnectivity ¶
FIXME: this function needs to be changed because of the shard map change.
type ShardMapEntry ¶
type ShardMapEntry struct {
// contains filtered or unexported fields
}
type StatsConfig ¶
type ZoneMarkDown ¶
type ZoneMarkDown struct {
// contains filtered or unexported fields
}
func GetMarkDownObj ¶
func GetMarkDownObj() (obj *ZoneMarkDown)
func (*ZoneMarkDown) CheckMarkDown ¶
func (m *ZoneMarkDown) CheckMarkDown() (markdown bool, zoneid int32)
func (*ZoneMarkDown) MarkDown ¶
func (m *ZoneMarkDown) MarkDown(zoneid int32)
func (*ZoneMarkDown) Reset ¶
func (m *ZoneMarkDown) Reset()
Click to show internal directories.
Click to hide internal directories.