Documentation ¶
Index ¶
- Constants
- Variables
- func AddCounter(name string) uint32
- func CheckKeyIfExist(err error) bool
- func DecodeMessagesFromRaw(data []byte, msgs []*nsqd.Message, tmpbuf []byte) ([]*nsqd.Message, error)
- func ExtractRpcAddrFromID(nid string) string
- func FilterList(l []string, filter []string) []string
- func FindSlice(in []string, e string) int
- func GenNsqLookupNodeID(n *NsqLookupdNodeInfo, extra string) string
- func GenNsqdNodeID(n *NsqdNodeInfo, extra string) string
- func GetLogDataSize() int
- func GetNextLogOffset(cur int64) int64
- func GetPartitionFromMsgID(id int64) int
- func GetPrevLogOffset(cur int64) int64
- func GetTopicPartitionBasePath(rootPath string, topic string, partition int) string
- func GetTopicPartitionFileName(topic string, partition int, suffix string) string
- func GetTopicPartitionLogPath(basepath, t string, p int) string
- func IncCounter(id uint32)
- func IncCounterBy(id uint32, amount uint64)
- func IsEtcdNodeExist(err error) bool
- func IsEtcdNotFile(err error) bool
- func MergeList(l []string, r []string) []string
- func NewEtcdClient(etcdHost string) *etcd.Client
- func NewNsqdCoordGRpcServer(coord *NsqdCoordinator, rootPath string) *nsqdCoordGRpcServer
- func RetryWithTimeout(fn func() error) error
- func SetCoordLogLevel(level int32)
- func SetCoordLogger(log levellogger.Logger, level int32)
- func SetEtcdLogger(log etcdlock.Logger, level int32)
- type By
- type CatchupStat
- type ChannelConsumeMgr
- type ChannelConsumerOffset
- type CntComparator
- type CommitLogData
- type CommonCoordErr
- type ConsistentStore
- type CoordErr
- func (self *CoordErr) CanRetryWrite(retryTimes int) bool
- func (self *CoordErr) HasError() bool
- func (self *CoordErr) IsEqual(other *CoordErr) bool
- func (self *CoordErr) IsLocalErr() bool
- func (self *CoordErr) IsNetErr() bool
- func (self *CoordErr) String() string
- func (self *CoordErr) ToErrorType() error
- type CoordErrStats
- type CoordErrStatsData
- type CoordErrType
- type CoordStats
- type DataPlacement
- type EpochType
- type ErrRPCRetCode
- type ICommitLogComparator
- type INsqlookupRemoteProxy
- type ISRStat
- type IntHeap
- type JoinISRState
- type LogStartInfo
- type MasterChanInfo
- type MsgIDComparator
- type MsgOffsetComparator
- type MsgTimestampComparator
- type NSQDLeadership
- type NSQLookupdLeadership
- type NodeTopicStats
- func (self *NodeTopicStats) GetMostBusyAndIdleTopicWriteLevel(leaderOnly bool) (string, string, float64, float64)
- func (self *NodeTopicStats) GetNodeAvgReadLevel() float64
- func (self *NodeTopicStats) GetNodeAvgWriteLevel() float64
- func (self *NodeTopicStats) GetNodeLeaderLoadFactor() float64
- func (self *NodeTopicStats) GetNodeLoadFactor() (float64, float64)
- func (self *NodeTopicStats) GetNodePeakLevelList() []int64
- func (self *NodeTopicStats) GetSortedTopicWriteLevel(leaderOnly bool) topicLFListT
- func (self *NodeTopicStats) GetTopicAvgWriteLevel(topicFullName string) float64
- func (self *NodeTopicStats) GetTopicLeaderLoadFactor(topicFullName string) float64
- func (self *NodeTopicStats) GetTopicLoadFactor(topicFullName string) float64
- func (self *NodeTopicStats) GetTopicPeakLevel(topic TopicPartitionID) float64
- func (self *NodeTopicStats) LeaderLessLoader(other *NodeTopicStats) bool
- func (self *NodeTopicStats) SlaveLessLoader(other *NodeTopicStats) bool
- type NsqLookupCoordRpcServer
- func (self *NsqLookupCoordRpcServer) ReadyForTopicISR(req *RpcReadyForISR) *CoordErr
- func (self *NsqLookupCoordRpcServer) RequestCheckTopicConsistence(req *RpcReqCheckTopic) *CoordErr
- func (self *NsqLookupCoordRpcServer) RequestJoinCatchup(req *RpcReqJoinCatchup) *CoordErr
- func (self *NsqLookupCoordRpcServer) RequestJoinTopicISR(req *RpcReqJoinISR) *CoordErr
- func (self *NsqLookupCoordRpcServer) RequestLeaveFromISR(req *RpcReqLeaveFromISR) *CoordErr
- func (self *NsqLookupCoordRpcServer) RequestLeaveFromISRByLeader(req *RpcReqLeaveFromISRByLeader) *CoordErr
- func (self *NsqLookupCoordRpcServer) RequestNotifyNewTopicInfo(req *RpcReqNewTopicInfo) *CoordErr
- type NsqLookupCoordinator
- func (self *NsqLookupCoordinator) ChangeTopicMetaParam(topic string, newSyncEvery int, newRetentionDay int, newReplicator int) error
- func (self *NsqLookupCoordinator) CreateTopic(topic string, meta TopicMetaInfo) error
- func (self *NsqLookupCoordinator) DeleteTopic(topic string, partition string) error
- func (self *NsqLookupCoordinator) DeleteTopicForce(topic string, partition string) error
- func (self *NsqLookupCoordinator) ExpandTopicPartition(topic string, newPartitionNum int) error
- func (self *NsqLookupCoordinator) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)
- func (self *NsqLookupCoordinator) GetClusterNodeLoadFactor() (map[string]float64, map[string]float64)
- func (self *NsqLookupCoordinator) GetLookupLeader() NsqLookupdNodeInfo
- func (self *NsqLookupCoordinator) GetTopicLeaderNodes(topicName string) (map[string]string, error)
- func (self *NsqLookupCoordinator) GetTopicMetaInfo(topicName string) (TopicMetaInfo, error)
- func (self *NsqLookupCoordinator) IsClusterStable() bool
- func (self *NsqLookupCoordinator) IsMineLeader() bool
- func (self *NsqLookupCoordinator) IsTopicLeader(topic string, part int, nid string) bool
- func (self *NsqLookupCoordinator) MarkNodeAsRemoving(nid string) error
- func (self *NsqLookupCoordinator) MoveTopicPartitionDataByManual(topicName string, partitionID int, moveLeader bool, fromNode string, ...) error
- func (self *NsqLookupCoordinator) SetClusterUpgradeState(upgrading bool) error
- func (self *NsqLookupCoordinator) SetLeadershipMgr(l NSQLookupdLeadership)
- func (self *NsqLookupCoordinator) Start() error
- func (self *NsqLookupCoordinator) Stop()
- type NsqLookupRpcClient
- func (self *NsqLookupRpcClient) CallWithRetry(method string, arg interface{}) (interface{}, error)
- func (self *NsqLookupRpcClient) Close()
- func (self *NsqLookupRpcClient) ReadyForTopicISR(topic string, partition int, nid string, leaderSession *TopicLeaderSession, ...) *CoordErr
- func (self *NsqLookupRpcClient) Reconnect() error
- func (self *NsqLookupRpcClient) RemoteAddr() string
- func (self *NsqLookupRpcClient) RequestCheckTopicConsistence(topic string, partition int)
- func (self *NsqLookupRpcClient) RequestJoinCatchup(topic string, partition int, nid string) *CoordErr
- func (self *NsqLookupRpcClient) RequestJoinTopicISR(topic string, partition int, nid string) *CoordErr
- func (self *NsqLookupRpcClient) RequestLeaveFromISR(topic string, partition int, nid string) *CoordErr
- func (self *NsqLookupRpcClient) RequestLeaveFromISRByLeader(topic string, partition int, nid string, leaderSession *TopicLeaderSession) *CoordErr
- func (self *NsqLookupRpcClient) RequestNotifyNewTopicInfo(topic string, partition int, nid string)
- func (self *NsqLookupRpcClient) ShouldRemoved() bool
- type NsqLookupdEtcdMgr
- func (self *NsqLookupdEtcdMgr) AcquireAndWatchLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{})
- func (self *NsqLookupdEtcdMgr) CheckIfLeader(session string) bool
- func (self *NsqLookupdEtcdMgr) CreateTopic(topic string, meta *TopicMetaInfo) error
- func (self *NsqLookupdEtcdMgr) CreateTopicPartition(topic string, partition int) error
- func (self *NsqLookupdEtcdMgr) DeleteTopic(topic string, partition int) error
- func (self *NsqLookupdEtcdMgr) DeleteWholeTopic(topic string) error
- func (self *NsqLookupdEtcdMgr) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)
- func (self *NsqLookupdEtcdMgr) GetClusterEpoch() (EpochType, error)
- func (self *NsqLookupdEtcdMgr) GetNsqdNodes() ([]NsqdNodeInfo, error)
- func (self *NsqLookupdEtcdMgr) GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error)
- func (self *NsqLookupdEtcdMgr) GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error)
- func (self *NsqLookupdEtcdMgr) GetTopicMetaInfo(topic string) (TopicMetaInfo, EpochType, error)
- func (self *NsqLookupdEtcdMgr) InitClusterID(id string)
- func (self *NsqLookupdEtcdMgr) IsExistTopic(topic string) (bool, error)
- func (self *NsqLookupdEtcdMgr) IsExistTopicPartition(topic string, partitionNum int) (bool, error)
- func (self *NsqLookupdEtcdMgr) Register(value *NsqLookupdNodeInfo) error
- func (self *NsqLookupdEtcdMgr) ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error
- func (self *NsqLookupdEtcdMgr) ScanTopics() ([]TopicPartitionMetaInfo, error)
- func (self *NsqLookupdEtcdMgr) Stop()
- func (self *NsqLookupdEtcdMgr) Unregister(value *NsqLookupdNodeInfo) error
- func (self *NsqLookupdEtcdMgr) UpdateLookupEpoch(oldGen EpochType) (EpochType, error)
- func (self *NsqLookupdEtcdMgr) UpdateTopicMetaInfo(topic string, meta *TopicMetaInfo, oldGen EpochType) error
- func (self *NsqLookupdEtcdMgr) UpdateTopicNodeInfo(topic string, partition int, topicInfo *TopicPartitionReplicaInfo, ...) error
- func (self *NsqLookupdEtcdMgr) WatchNsqdNodes(nsqds chan []NsqdNodeInfo, stop chan struct{})
- func (self *NsqLookupdEtcdMgr) WatchTopicLeader(leader chan *TopicLeaderSession, stop chan struct{}) error
- type NsqLookupdNodeInfo
- type NsqdCoordRpcServer
- func (self *NsqdCoordRpcServer) DeleteChannel(info *RpcChannelOffsetArg) *CoordErr
- func (self *NsqdCoordRpcServer) DeleteNsqdTopic(rpcTopicReq *RpcAdminTopicInfo) *CoordErr
- func (self *NsqdCoordRpcServer) DisableTopicWrite(rpcTopicReq *RpcAdminTopicInfo) *CoordErr
- func (self *NsqdCoordRpcServer) EnableTopicWrite(rpcTopicReq *RpcAdminTopicInfo) *CoordErr
- func (self *NsqdCoordRpcServer) GetCommitLogFromOffset(req *RpcCommitLogReq) *RpcCommitLogRsp
- func (self *NsqdCoordRpcServer) GetFullSyncInfo(req *RpcGetFullSyncInfoReq) (*RpcGetFullSyncInfoRsp, error)
- func (self *NsqdCoordRpcServer) GetLastCommitLogID(req *RpcCommitLogReq) (int64, error)
- func (self *NsqdCoordRpcServer) GetTopicStats(topic string) *NodeTopicStats
- func (self *NsqdCoordRpcServer) IsTopicWriteDisabled(rpcTopicReq *RpcAdminTopicInfo) bool
- func (self *NsqdCoordRpcServer) NotifyAcquireTopicLeader(rpcTopicReq *RpcAcquireTopicLeaderReq) *CoordErr
- func (self *NsqdCoordRpcServer) NotifyReleaseTopicLeader(rpcTopicReq *RpcReleaseTopicLeaderReq) *CoordErr
- func (self *NsqdCoordRpcServer) NotifyTopicLeaderSession(rpcTopicReq *RpcTopicLeaderSession) *CoordErr
- func (self *NsqdCoordRpcServer) PullCommitLogsAndData(req *RpcPullCommitLogsReq) (*RpcPullCommitLogsRsp, error)
- func (self *NsqdCoordRpcServer) PutMessage(info *RpcPutMessage) *CoordErr
- func (self *NsqdCoordRpcServer) PutMessages(info *RpcPutMessages) *CoordErr
- func (self *NsqdCoordRpcServer) TestRpcCoordErr(req *RpcTestReq) *CoordErr
- func (self *NsqdCoordRpcServer) TestRpcError(req *RpcTestReq) *RpcTestRsp
- func (self *NsqdCoordRpcServer) TestRpcTimeout() error
- func (self *NsqdCoordRpcServer) TriggerLookupChanged() error
- func (self *NsqdCoordRpcServer) UpdateChannelOffset(info *RpcChannelOffsetArg) *CoordErr
- func (self *NsqdCoordRpcServer) UpdateTopicInfo(rpcTopicReq *RpcAdminTopicInfo) *CoordErr
- type NsqdCoordinator
- func (self *NsqdCoordinator) DeleteChannel(topic *nsqd.Topic, channelName string) error
- func (self *NsqdCoordinator) FinishMessageToCluster(channel *nsqd.Channel, clientID int64, clientAddr string, msgID nsqd.MessageID) error
- func (self *NsqdCoordinator) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)
- func (self *NsqdCoordinator) GetCurrentLookupd() NsqLookupdNodeInfo
- func (self *NsqdCoordinator) GetMasterTopicCoordData(topic string) (int, *coordData, error)
- func (self *NsqdCoordinator) GetMyID() string
- func (self *NsqdCoordinator) IsMineLeaderForTopic(topic string, part int) bool
- func (self *NsqdCoordinator) PutMessageToCluster(topic *nsqd.Topic, body []byte, traceID uint64) (nsqd.MessageID, nsqd.BackendOffset, int32, nsqd.BackendQueueEnd, error)
- func (self *NsqdCoordinator) PutMessagesToCluster(topic *nsqd.Topic, msgs []*nsqd.Message) (nsqd.MessageID, nsqd.BackendOffset, int32, error)
- func (self *NsqdCoordinator) SearchLogByMsgCnt(topic string, part int, count int64) (*CommitLogData, int64, int64, error)
- func (self *NsqdCoordinator) SearchLogByMsgID(topic string, part int, msgID int64) (*CommitLogData, int64, int64, error)
- func (self *NsqdCoordinator) SearchLogByMsgOffset(topic string, part int, offset int64) (*CommitLogData, int64, int64, error)
- func (self *NsqdCoordinator) SearchLogByMsgTimestamp(topic string, part int, ts_sec int64) (*CommitLogData, int64, int64, error)
- func (self *NsqdCoordinator) SetChannelConsumeOffsetToCluster(ch *nsqd.Channel, queueOffset int64, cnt int64, force bool) error
- func (self *NsqdCoordinator) SetLeadershipMgr(l NSQDLeadership)
- func (self *NsqdCoordinator) Start() error
- func (self *NsqdCoordinator) Stats(topic string, part int) *CoordStats
- func (self *NsqdCoordinator) Stop()
- type NsqdEtcdMgr
- func (self *NsqdEtcdMgr) AcquireTopicLeader(topic string, partition int, nodeData *NsqdNodeInfo, epoch EpochType) error
- func (self *NsqdEtcdMgr) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)
- func (self *NsqdEtcdMgr) GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error)
- func (self *NsqdEtcdMgr) GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error)
- func (self *NsqdEtcdMgr) InitClusterID(id string)
- func (self *NsqdEtcdMgr) RegisterNsqd(nodeData *NsqdNodeInfo) error
- func (self *NsqdEtcdMgr) ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error
- func (self *NsqdEtcdMgr) UnregisterNsqd(nodeData *NsqdNodeInfo) error
- func (self *NsqdEtcdMgr) WatchLookupdLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{}) error
- type NsqdNodeInfo
- type NsqdNodeLoadFactor
- type NsqdRpcClient
- func (self *NsqdRpcClient) CallFast(method string, arg interface{}) (interface{}, error)
- func (self *NsqdRpcClient) CallRpcTest(data string) (string, *CoordErr)
- func (self *NsqdRpcClient) CallRpcTestCoordErr(data string) *CoordErr
- func (self *NsqdRpcClient) CallRpcTesttimeout(data string) error
- func (self *NsqdRpcClient) CallWithRetry(method string, arg interface{}) (interface{}, error)
- func (self *NsqdRpcClient) Close()
- func (self *NsqdRpcClient) DeleteChannel(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, ...) *CoordErr
- func (self *NsqdRpcClient) DeleteNsqdTopic(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
- func (self *NsqdRpcClient) DisableTopicWrite(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
- func (self *NsqdRpcClient) DisableTopicWriteFast(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
- func (self *NsqdRpcClient) EnableTopicWrite(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
- func (self *NsqdRpcClient) GetCommitLogFromOffset(topicInfo *TopicPartitionMetaInfo, logCountNumIndex int64, logIndex int64, ...) (bool, int64, int64, int64, CommitLogData, *CoordErr)
- func (self *NsqdRpcClient) GetFullSyncInfo(topic string, partition int) (*LogStartInfo, *CommitLogData, error)
- func (self *NsqdRpcClient) GetLastCommitLogID(topicInfo *TopicPartitionMetaInfo) (int64, *CoordErr)
- func (self *NsqdRpcClient) GetTopicStats(topic string) (*NodeTopicStats, error)
- func (self *NsqdRpcClient) IsTopicWriteDisabled(topicInfo *TopicPartitionMetaInfo) bool
- func (self *NsqdRpcClient) NotifyAcquireTopicLeader(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
- func (self *NsqdRpcClient) NotifyReleaseTopicLeader(epoch EpochType, topicInfo *TopicPartitionMetaInfo, ...) *CoordErr
- func (self *NsqdRpcClient) NotifyTopicLeaderSession(epoch EpochType, topicInfo *TopicPartitionMetaInfo, ...) *CoordErr
- func (self *NsqdRpcClient) NotifyUpdateChannelOffset(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, ...) *CoordErr
- func (self *NsqdRpcClient) PullCommitLogsAndData(topic string, partition int, logCountNumIndex int64, logIndex int64, ...) ([]CommitLogData, [][]byte, error)
- func (self *NsqdRpcClient) PutMessage(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, ...) *CoordErr
- func (self *NsqdRpcClient) PutMessages(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, ...) *CoordErr
- func (self *NsqdRpcClient) Reconnect() error
- func (self *NsqdRpcClient) ShouldRemoved() bool
- func (self *NsqdRpcClient) TriggerLookupChanged() error
- func (self *NsqdRpcClient) UpdateChannelOffset(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, ...) *CoordErr
- func (self *NsqdRpcClient) UpdateTopicInfo(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
- type Options
- type RpcAcquireTopicLeaderReq
- type RpcAdminTopicInfo
- type RpcChannelOffsetArg
- type RpcCommitLogReq
- type RpcCommitLogRsp
- type RpcFailedInfo
- type RpcGetFullSyncInfoReq
- type RpcGetFullSyncInfoRsp
- type RpcLookupReqBase
- type RpcPullCommitLogsReq
- type RpcPullCommitLogsRsp
- type RpcPutMessage
- type RpcPutMessages
- type RpcReadyForISR
- type RpcReleaseTopicLeaderReq
- type RpcReqCheckTopic
- type RpcReqJoinCatchup
- type RpcReqJoinISR
- type RpcReqLeaveFromISR
- type RpcReqLeaveFromISRByLeader
- type RpcReqNewTopicInfo
- type RpcRspJoinISR
- type RpcTestReq
- type RpcTestRsp
- type RpcTopicData
- type RpcTopicLeaderSession
- type SlaveAsyncWriteResult
- type SortableStrings
- type StatsSorter
- type TopicCatchupInfo
- type TopicChannelsInfo
- type TopicCommitLogMgr
- func (self *TopicCommitLogMgr) AppendCommitLog(l *CommitLogData, slave bool) error
- func (self *TopicCommitLogMgr) CleanOldData(fileIndex int64, fileOffset int64) error
- func (self *TopicCommitLogMgr) Close()
- func (self *TopicCommitLogMgr) ConvertToCountIndex(start int64, offset int64) (int64, error)
- func (self *TopicCommitLogMgr) ConvertToOffsetIndex(countIndex int64) (int64, int64, error)
- func (self *TopicCommitLogMgr) Delete()
- func (self *TopicCommitLogMgr) FlushCommitLogs()
- func (self *TopicCommitLogMgr) GetCommitLogFromOffsetV2(start int64, offset int64) (*CommitLogData, error)
- func (self *TopicCommitLogMgr) GetCommitLogsV2(startIndex int64, startOffset int64, num int) ([]CommitLogData, error)
- func (self *TopicCommitLogMgr) GetCurrentEnd() (int64, int64)
- func (self *TopicCommitLogMgr) GetCurrentStart() int64
- func (self *TopicCommitLogMgr) GetLastCommitLogDataOnSegment(index int64) (int64, *CommitLogData, error)
- func (self *TopicCommitLogMgr) GetLastCommitLogID() int64
- func (self *TopicCommitLogMgr) GetLastCommitLogOffsetV2() (int64, int64, *CommitLogData, error)
- func (self *TopicCommitLogMgr) GetLogStartInfo() (*LogStartInfo, *CommitLogData, error)
- func (self *TopicCommitLogMgr) IsCommitted(id int64) bool
- func (self *TopicCommitLogMgr) MoveTo(newBase string) error
- func (self *TopicCommitLogMgr) NextID() uint64
- func (self *TopicCommitLogMgr) Reopen() error
- func (self *TopicCommitLogMgr) Reset(id uint64)
- func (self *TopicCommitLogMgr) ResetLogWithStart(newStart LogStartInfo) error
- func (self *TopicCommitLogMgr) SearchLogDataByComparator(comp ICommitLogComparator) (int64, int64, *CommitLogData, error)
- func (self *TopicCommitLogMgr) SearchLogDataByMsgCnt(searchCnt int64) (int64, int64, *CommitLogData, error)
- func (self *TopicCommitLogMgr) SearchLogDataByMsgID(searchMsgID int64) (int64, int64, *CommitLogData, error)
- func (self *TopicCommitLogMgr) SearchLogDataByMsgOffset(searchMsgOffset int64) (int64, int64, *CommitLogData, error)
- func (self *TopicCommitLogMgr) TruncateToOffsetV2(startIndex int64, offset int64) (*CommitLogData, error)
- type TopicCoordStat
- type TopicCoordinator
- func (self *TopicCoordinator) DeleteNoWriteLock(removeData bool)
- func (self *TopicCoordinator) DeleteWithLock(removeData bool)
- func (self *TopicCoordinator) DisableWrite(disable bool)
- func (self *TopicCoordinator) Exiting()
- func (self TopicCoordinator) GetCopy() *coordData
- func (self *TopicCoordinator) GetData() *coordData
- func (self TopicCoordinator) GetLeader() string
- func (self TopicCoordinator) GetLeaderSession() string
- func (self TopicCoordinator) GetLeaderSessionEpoch() EpochType
- func (self TopicCoordinator) GetLeaderSessionID() string
- func (self TopicCoordinator) GetTopicEpochForWrite() EpochType
- func (self *TopicCoordinator) IsExiting() bool
- func (self TopicCoordinator) IsForceLeave() bool
- func (self TopicCoordinator) IsISRReadyForWrite(myID string) bool
- func (self TopicCoordinator) IsMineISR(id string) bool
- func (self TopicCoordinator) IsMineLeaderSessionReady(id string) bool
- func (self *TopicCoordinator) IsWriteDisabled() bool
- func (self TopicCoordinator) SetForceLeave(leave bool)
- type TopicLeaderSession
- type TopicMetaInfo
- type TopicNameInfo
- type TopicPartitionID
- type TopicPartitionMetaInfo
- type TopicPartitionReplicaInfo
- type TopicReplicasInfo
- type WatchTopicLeaderInfo
Constants ¶
View Source
const ( DEFAULT_COMMIT_BUF_SIZE = 1024 MAX_INCR_ID_BIT = 50 )
View Source
const ( ErrFailedOnNotLeader = "E_FAILED_ON_NOT_LEADER" ErrFailedOnNotWritable = "E_FAILED_ON_NOT_WRITABLE" )
View Source
const ( RATIO_BETWEEN_LEADER_FOLLOWER = 0.7 HIGHEST_PUB_QPS_LEVEL = 100 HIGHEST_LEFT_CONSUME_MB_SIZE = 50 * 1024 HIGHEST_LEFT_DATA_MB_SIZE = 200 * 1024 )
View Source
const ( EVENT_WATCH_TOPIC_L_CREATE = iota EVENT_WATCH_TOPIC_L_DELETE )
View Source
const ( MAX_WRITE_RETRY = 10 MAX_CATCHUP_RETRY = 5 MAX_LOG_PULL = 10000 MAX_LOG_PULL_BYTES = 1024 * 1024 * 32 MAX_TOPIC_RETENTION_SIZE_PER_DAY = 1024 * 1024 * 1024 * 4 MAX_CATCHUP_RUNNING = 3 )
View Source
const ( RPC_TIMEOUT = time.Duration(time.Second * 5) RPC_TIMEOUT_SHORT = time.Duration(time.Second * 3) RPC_TIMEOUT_FOR_LOOKUP = time.Duration(time.Second * 3) )
View Source
const ( MAX_PARTITION_NUM = 255 MAX_SYNC_EVERY = 5000 MAX_RETENTION_DAYS = 60 )
View Source
const ( NSQ_ROOT_DIR = "NSQMetaData" NSQ_TOPIC_DIR = "Topics" NSQ_TOPIC_META = "TopicMeta" NSQ_TOPIC_REPLICA_INFO = "ReplicaInfo" NSQ_TOPIC_LEADER_SESSION = "LeaderSession" NSQ_NODE_DIR = "NsqdNodes" NSQ_LOOKUPD_DIR = "NsqlookupdInfo" NSQ_LOOKUPD_NODE_DIR = "NsqlookupdNodes" NSQ_LOOKUPD_LEADER_SESSION = "LookupdLeaderSession" )
View Source
const (
ETCD_LOCK_NSQ_NAMESPACE = "nsq"
)
View Source
const (
ETCD_TTL = 15
)
Variables ¶
View Source
var ( ErrCommitLogWrongID = errors.New("commit log id is wrong") ErrCommitLogWrongLastID = errors.New("commit log last id should no less than log id") ErrCommitLogIDNotFound = errors.New("commit log id is not found") ErrCommitLogSearchNotFound = errors.New("search commit log data not found") ErrCommitLogOutofBound = errors.New("commit log offset is out of bound") ErrCommitLogEOF = errors.New("commit log end of file") ErrCommitLogOffsetInvalid = errors.New("commit log offset is invalid") ErrCommitLogPartitionExceed = errors.New("commit log partition id is exceeded") ErrCommitLogSearchFailed = errors.New("commit log data search failed") ErrCommitLogSegmentSizeInvalid = errors.New("commit log segment size is invalid") ErrCommitLogLessThanSegmentStart = errors.New("commit log read index is less than segment start") ErrCommitLogCleanKeepMin = errors.New("commit log clean should keep some data") )
View Source
var ( ErrTopicInfoNotFound = NewCoordErr("topic info not found", CoordClusterErr) ErrNotTopicLeader = NewCoordErrWithCode("not topic leader", CoordClusterNoRetryWriteErr, RpcErrNotTopicLeader) ErrEpochMismatch = NewCoordErrWithCode("commit epoch not match", CoordElectionTmpErr, RpcErrEpochMismatch) ErrEpochLessThanCurrent = NewCoordErrWithCode("epoch should be increased", CoordElectionErr, RpcErrEpochLessThanCurrent) ErrWriteQuorumFailed = NewCoordErrWithCode("write to quorum failed.", CoordElectionTmpErr, RpcErrWriteQuorumFailed) ErrCommitLogIDDup = NewCoordErrWithCode("commit id duplicated", CoordElectionErr, RpcErrCommitLogIDDup) ErrMissingTopicLeaderSession = NewCoordErrWithCode("missing topic leader session", CoordElectionErr, RpcErrMissingTopicLeaderSession) ErrLeaderSessionMismatch = NewCoordErrWithCode("leader session mismatch", CoordElectionTmpErr, RpcErrLeaderSessionMismatch) ErrWriteDisabled = NewCoordErrWithCode("write is disabled on the topic", CoordElectionErr, RpcErrWriteDisabled) ErrLeavingISRWait = NewCoordErrWithCode("leaving isr need wait.", CoordElectionTmpErr, RpcErrLeavingISRWait) ErrTopicCoordExistingAndMismatch = NewCoordErrWithCode("topic coordinator existing with a different partition", CoordClusterErr, RpcErrTopicCoordExistingAndMismatch) ErrTopicCoordTmpConflicted = NewCoordErrWithCode("topic coordinator is conflicted temporally", CoordClusterErr, RpcErrTopicCoordConflicted) ErrTopicLeaderChanged = NewCoordErrWithCode("topic leader changed", CoordElectionTmpErr, RpcErrTopicLeaderChanged) ErrTopicCommitLogEOF = NewCoordErrWithCode(ErrCommitLogEOF.Error(), CoordCommonErr, RpcErrCommitLogEOF) ErrTopicCommitLogOutofBound = NewCoordErrWithCode(ErrCommitLogOutofBound.Error(), CoordCommonErr, RpcErrCommitLogOutofBound) ErrTopicCommitLogLessThanSegmentStart = NewCoordErrWithCode(ErrCommitLogLessThanSegmentStart.Error(), CoordCommonErr, RpcErrCommitLogLessThanSegmentStart) ErrTopicCommitLogNotConsistent = NewCoordErrWithCode("topic commit log is not consistent", CoordClusterErr, RpcCommonErr) ErrMissingTopicCoord = NewCoordErrWithCode("missing topic coordinator", CoordClusterErr, RpcErrMissingTopicCoord) ErrTopicLoading = NewCoordErrWithCode("topic is still loading data", CoordLocalTmpErr, RpcErrTopicLoading) ErrTopicExiting = NewCoordErr("topic coordinator is exiting", CoordLocalTmpErr) ErrTopicExitingOnSlave = NewCoordErr("topic coordinator is exiting on slave", CoordTmpErr) ErrTopicCoordStateInvalid = NewCoordErrWithCode("invalid coordinator state", CoordClusterErr, RpcErrTopicCoordStateInvalid) ErrTopicSlaveInvalid = NewCoordErrWithCode("topic slave has some invalid state", CoordSlaveErr, RpcErrSlaveStateInvalid) ErrTopicLeaderSessionInvalid = NewCoordErrWithCode("topic leader session is invalid", CoordElectionTmpErr, RpcCommonErr) ErrTopicWriteOnNonISR = NewCoordErrWithCode("topic write on a node not in ISR", CoordTmpErr, RpcErrWriteOnNonISR) ErrTopicISRNotEnough = NewCoordErrWithCode("topic isr nodes not enough", CoordTmpErr, RpcCommonErr) ErrClusterChanged = NewCoordErrWithCode("cluster changed ", CoordTmpErr, RpcNoErr) ErrPubArgError = NewCoordErr("pub argument error", CoordCommonErr) ErrTopicNotRelated = NewCoordErr("topic not related to me", CoordCommonErr) ErrTopicCatchupAlreadyRunning = NewCoordErr("topic is already running catchup", CoordCommonErr) ErrTopicArgError = NewCoordErr("topic argument error", CoordCommonErr) ErrOperationExpired = NewCoordErr("operation has expired since wait too long", CoordCommonErr) ErrCatchupRunningBusy = NewCoordErr("too much running catchup", CoordCommonErr) ErrMissingTopicLog = NewCoordErr("missing topic log ", CoordLocalErr) ErrLocalTopicPartitionMismatch = NewCoordErr("local topic partition not match", CoordLocalErr) ErrLocalFallBehind = NewCoordErr("local data fall behind", CoordElectionErr) ErrLocalForwardThanLeader = NewCoordErr("local data is more than leader", CoordElectionErr) ErrLocalSetChannelOffsetNotFirstClient = NewCoordErr("failed to set channel offset since not first client", CoordLocalErr) ErrLocalMissingTopic = NewCoordErr("local topic missing", CoordLocalErr) ErrLocalNotReadyForWrite = NewCoordErr("local topic is not ready for write.", CoordLocalErr) ErrLocalInitTopicFailed = NewCoordErr("local topic init failed", CoordLocalErr) ErrLocalInitTopicCoordFailed = NewCoordErr("topic coordinator init failed", CoordLocalErr) ErrLocalTopicDataCorrupt = NewCoordErr("local topic data corrupt", CoordLocalErr) )
View Source
var ( ErrNodeIsExcludedForTopicData = errors.New("destination node is excluded for topic") ErrClusterBalanceRunning = errors.New("another balance is running, should wait") )
View Source
var ( ErrLeaderSessionAlreadyExist = errors.New("leader session already exist") ErrLeaderSessionNotExist = errors.New("session not exist") ErrKeyAlreadyExist = errors.New("Key already exist") ErrKeyNotFound = errors.New("Key not found") )
View Source
var ( ErrAlreadyExist = errors.New("already exist") ErrTopicNotCreated = errors.New("topic is not created") ErrWaitingLeaderRelease = errors.New("leader session is still alive") ErrNotNsqLookupLeader = errors.New("Not nsqlookup leader") ErrClusterUnstable = errors.New("the cluster is unstable") ErrLeaderNodeLost = NewCoordErr("leader node is lost", CoordTmpErr) ErrNodeNotFound = NewCoordErr("node not found", CoordCommonErr) ErrLeaderElectionFail = NewCoordErr("Leader election failed.", CoordElectionTmpErr) ErrNoLeaderCanBeElected = NewCoordErr("No leader can be elected", CoordElectionTmpErr) ErrJoinISRInvalid = NewCoordErr("Join ISR failed", CoordCommonErr) ErrJoinISRTimeout = NewCoordErr("Join ISR timeout", CoordCommonErr) ErrWaitingJoinISR = NewCoordErr("The topic is waiting node to join isr", CoordCommonErr) ErrLeaderSessionNotReleased = NewCoordErr("The topic leader session is not released", CoordElectionTmpErr) ErrTopicISRCatchupEnough = NewCoordErr("the topic isr and catchup nodes are enough", CoordTmpErr) ErrClusterNodeRemoving = NewCoordErr("the node is mark as removed", CoordTmpErr) ErrTopicNodeConflict = NewCoordErr("the topic node info is conflicted", CoordElectionErr) ErrLeadershipServerUnstable = NewCoordErr("the leadership server is unstable", CoordTmpErr) )
View Source
var LOGROTATE_NUM = 2000000
View Source
var MIN_KEEP_LOG_ITEM = 1000
Functions ¶
func DecodeMessagesFromRaw ¶
func GenNsqLookupNodeID ¶
func GenNsqLookupNodeID(n *NsqLookupdNodeInfo, extra string) string
func GenNsqdNodeID ¶
func GenNsqdNodeID(n *NsqdNodeInfo, extra string) string
func GetTopicPartitionBasePath ¶
func GetTopicPartitionFileName ¶
func NewNsqdCoordGRpcServer ¶
func NewNsqdCoordGRpcServer(coord *NsqdCoordinator, rootPath string) *nsqdCoordGRpcServer
func SetCoordLogger ¶
func SetCoordLogger(log levellogger.Logger, level int32)
Types ¶
type By ¶
type By func(l, r *NodeTopicStats) bool
func (By) Sort ¶
func (by By) Sort(statList []NodeTopicStats)
type CatchupStat ¶
type ChannelConsumeMgr ¶
type ChannelConsumerOffset ¶
type CntComparator ¶
type CntComparator int64
func (CntComparator) GreatThanRightBoundary ¶
func (self CntComparator) GreatThanRightBoundary(l *CommitLogData) bool
func (CntComparator) LessThanLeftBoundary ¶
func (self CntComparator) LessThanLeftBoundary(l *CommitLogData) bool
func (CntComparator) SearchEndBoundary ¶
func (self CntComparator) SearchEndBoundary() int64
type CommitLogData ¶
type CommitLogData struct { LogID int64 // epoch for the topic leader Epoch EpochType // if single message, this should be the same with logid // for multiple messages, this would be the logid for the last message in batch LastMsgLogID int64 MsgOffset int64 // size for batch messages MsgSize int32 // the total message count for all from begin, not only this batch MsgCnt int64 // the message number for current commit MsgNum int32 }
type CommonCoordErr ¶
type CommonCoordErr struct {
CoordErr
}
func (*CommonCoordErr) Error ¶
func (self *CommonCoordErr) Error() string
type ConsistentStore ¶
type CoordErr ¶
type CoordErr struct { ErrMsg string ErrCode ErrRPCRetCode ErrType CoordErrType }
note: since the gorpc will treat error type as special, we should not implement the error interface for CoordErr as response type
func NewCoordErr ¶
func NewCoordErr(msg string, etype CoordErrType) *CoordErr
func NewCoordErrWithCode ¶
func NewCoordErrWithCode(msg string, etype CoordErrType, code ErrRPCRetCode) *CoordErr
type CoordErrStats ¶
type CoordErrStats struct { sync.Mutex CoordErrStatsData }
func (*CoordErrStats) GetCopy ¶
func (self *CoordErrStats) GetCopy() *CoordErrStatsData
type CoordErrStatsData ¶
type CoordErrType ¶
type CoordErrType int
const ( CoordNoErr CoordErrType = iota CoordCommonErr CoordNetErr CoordElectionErr CoordElectionTmpErr CoordClusterErr CoordSlaveErr CoordLocalErr CoordLocalTmpErr CoordTmpErr CoordClusterNoRetryWriteErr )
type CoordStats ¶
type CoordStats struct { RpcStats *gorpc.ConnStats `json:"rpc_stats"` ErrStats CoordErrStatsData TopicCoordStats []TopicCoordStat `json:"topic_coord_stats"` }
type DataPlacement ¶
type DataPlacement struct {
// contains filtered or unexported fields
}
func NewDataPlacement ¶
func NewDataPlacement(coord *NsqLookupCoordinator) *DataPlacement
func (*DataPlacement) DoBalance ¶
func (self *DataPlacement) DoBalance(monitorChan chan struct{})
func (*DataPlacement) SetBalanceInterval ¶
func (self *DataPlacement) SetBalanceInterval(start int, end int)
type ErrRPCRetCode ¶
type ErrRPCRetCode int
const ( RpcNoErr ErrRPCRetCode = iota RpcCommonErr )
const ( RpcErrLeavingISRWait ErrRPCRetCode = iota + 10 RpcErrNotTopicLeader RpcErrNoLeader RpcErrEpochMismatch RpcErrEpochLessThanCurrent RpcErrWriteQuorumFailed RpcErrCommitLogIDDup RpcErrCommitLogEOF RpcErrCommitLogOutofBound RpcErrCommitLogLessThanSegmentStart RpcErrMissingTopicLeaderSession RpcErrLeaderSessionMismatch RpcErrWriteDisabled RpcErrTopicNotExist RpcErrMissingTopicCoord RpcErrTopicCoordExistingAndMismatch RpcErrTopicCoordConflicted RpcErrTopicLeaderChanged RpcErrTopicLoading RpcErrSlaveStateInvalid RpcErrTopicCoordStateInvalid RpcErrWriteOnNonISR )
type ICommitLogComparator ¶
type ICommitLogComparator interface { SearchEndBoundary() int64 LessThanLeftBoundary(l *CommitLogData) bool GreatThanRightBoundary(l *CommitLogData) bool }
type INsqlookupRemoteProxy ¶
type INsqlookupRemoteProxy interface { RemoteAddr() string Reconnect() error Close() RequestJoinCatchup(topic string, partition int, nid string) *CoordErr RequestJoinTopicISR(topic string, partition int, nid string) *CoordErr ReadyForTopicISR(topic string, partition int, nid string, leaderSession *TopicLeaderSession, joinISRSession string) *CoordErr RequestLeaveFromISR(topic string, partition int, nid string) *CoordErr RequestLeaveFromISRByLeader(topic string, partition int, nid string, leaderSession *TopicLeaderSession) *CoordErr RequestNotifyNewTopicInfo(topic string, partition int, nid string) RequestCheckTopicConsistence(topic string, partition int) }
type ISRStat ¶
type JoinISRState ¶
type LogStartInfo ¶
type MasterChanInfo ¶
type MasterChanInfo struct {
// contains filtered or unexported fields
}
type MsgIDComparator ¶
type MsgIDComparator int64
func (MsgIDComparator) GreatThanRightBoundary ¶
func (self MsgIDComparator) GreatThanRightBoundary(l *CommitLogData) bool
func (MsgIDComparator) LessThanLeftBoundary ¶
func (self MsgIDComparator) LessThanLeftBoundary(l *CommitLogData) bool
func (MsgIDComparator) SearchEndBoundary ¶
func (self MsgIDComparator) SearchEndBoundary() int64
type MsgOffsetComparator ¶
type MsgOffsetComparator int64
func (MsgOffsetComparator) GreatThanRightBoundary ¶
func (self MsgOffsetComparator) GreatThanRightBoundary(l *CommitLogData) bool
func (MsgOffsetComparator) LessThanLeftBoundary ¶
func (self MsgOffsetComparator) LessThanLeftBoundary(l *CommitLogData) bool
func (MsgOffsetComparator) SearchEndBoundary ¶
func (self MsgOffsetComparator) SearchEndBoundary() int64
type MsgTimestampComparator ¶
type MsgTimestampComparator struct {
// contains filtered or unexported fields
}
func (*MsgTimestampComparator) GreatThanRightBoundary ¶
func (self *MsgTimestampComparator) GreatThanRightBoundary(l *CommitLogData) bool
func (*MsgTimestampComparator) LessThanLeftBoundary ¶
func (self *MsgTimestampComparator) LessThanLeftBoundary(l *CommitLogData) bool
func (*MsgTimestampComparator) SearchEndBoundary ¶
func (self *MsgTimestampComparator) SearchEndBoundary() int64
type NSQDLeadership ¶
type NSQDLeadership interface { InitClusterID(id string) RegisterNsqd(nodeData *NsqdNodeInfo) error // update UnregisterNsqd(nodeData *NsqdNodeInfo) error // try create the topic leadership key and no need to retry if the key already exist AcquireTopicLeader(topic string, partition int, nodeData *NsqdNodeInfo, epoch EpochType) error // release the session key using the acquired session. should check current session epoch // to avoid release the changed session ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error // all registered lookup nodes. GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error) // get the newest lookup leader and watch the change of it. WatchLookupdLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{}) error GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error) // get leadership information, if not exist should return ErrLeaderSessionNotExist as error GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error) }
type NSQLookupdLeadership ¶
type NSQLookupdLeadership interface { InitClusterID(id string) Register(value *NsqLookupdNodeInfo) error Unregister(value *NsqLookupdNodeInfo) error Stop() // the cluster root modify index GetClusterEpoch() (EpochType, error) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error) // add AcquireAndWatchLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{}) CheckIfLeader(session string) bool UpdateLookupEpoch(oldGen EpochType) (EpochType, error) GetNsqdNodes() ([]NsqdNodeInfo, error) // watching the cluster nsqd node, should return the newest for the first time. WatchNsqdNodes(nsqds chan []NsqdNodeInfo, stop chan struct{}) // get all topics info, should cache the newest to improve performance. ScanTopics() ([]TopicPartitionMetaInfo, error) // should return both the meta info for topic and the replica info for topic partition // epoch should be updated while return GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error) // create and write the meta info to topic meta node CreateTopic(topic string, meta *TopicMetaInfo) error // create topic partition path CreateTopicPartition(topic string, partition int) error IsExistTopic(topic string) (bool, error) IsExistTopicPartition(topic string, partition int) (bool, error) // get topic meta info only GetTopicMetaInfo(topic string) (TopicMetaInfo, EpochType, error) UpdateTopicMetaInfo(topic string, meta *TopicMetaInfo, oldGen EpochType) error DeleteTopic(topic string, partition int) error DeleteWholeTopic(topic string) error // // update the replica info about leader, isr, epoch for partition // Note: update should do check-and-set to avoid unexpected override. // the epoch in topicInfo should be updated to the new epoch // if no topic partition replica info node should create only once. UpdateTopicNodeInfo(topic string, partition int, topicInfo *TopicPartitionReplicaInfo, oldGen EpochType) error // get leadership information, if not exist should return ErrLeaderSessionNotExist as error GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error) // watch any leadership lock change for all topic partitions, should return the token used later by release. WatchTopicLeader(leader chan *TopicLeaderSession, stop chan struct{}) error // only leader lookup can do the release, normally notify the nsqd node do the release by itself. // lookup node should release only when the nsqd is lost ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error }
We need check leader lock session before do any modify to etcd. Make sure all returned value should be copied to avoid modify by outside.
type NodeTopicStats ¶
type NodeTopicStats struct { NodeID string // the data (MB) need to be consumed on the leader for all channels in the topic. ChannelDepthData map[string]int64 // the data left on disk. unit: MB TopicLeaderDataSize map[string]int64 TopicTotalDataSize map[string]int64 NodeCPUs int // the pub stat for past 24hr TopicHourlyPubDataList map[string][24]int64 ChannelNum map[string]int ChannelList map[string][]string }
func NewNodeTopicStats ¶
func NewNodeTopicStats(nid string, cap int, cpus int) *NodeTopicStats
func (*NodeTopicStats) GetMostBusyAndIdleTopicWriteLevel ¶
func (*NodeTopicStats) GetNodeAvgReadLevel ¶
func (self *NodeTopicStats) GetNodeAvgReadLevel() float64
func (*NodeTopicStats) GetNodeAvgWriteLevel ¶
func (self *NodeTopicStats) GetNodeAvgWriteLevel() float64
func (*NodeTopicStats) GetNodeLeaderLoadFactor ¶
func (self *NodeTopicStats) GetNodeLeaderLoadFactor() float64
func (*NodeTopicStats) GetNodeLoadFactor ¶
func (self *NodeTopicStats) GetNodeLoadFactor() (float64, float64)
func (*NodeTopicStats) GetNodePeakLevelList ¶
func (self *NodeTopicStats) GetNodePeakLevelList() []int64
func (*NodeTopicStats) GetSortedTopicWriteLevel ¶
func (self *NodeTopicStats) GetSortedTopicWriteLevel(leaderOnly bool) topicLFListT
func (*NodeTopicStats) GetTopicAvgWriteLevel ¶
func (self *NodeTopicStats) GetTopicAvgWriteLevel(topicFullName string) float64
func (*NodeTopicStats) GetTopicLeaderLoadFactor ¶
func (self *NodeTopicStats) GetTopicLeaderLoadFactor(topicFullName string) float64
func (*NodeTopicStats) GetTopicLoadFactor ¶
func (self *NodeTopicStats) GetTopicLoadFactor(topicFullName string) float64
func (*NodeTopicStats) GetTopicPeakLevel ¶
func (self *NodeTopicStats) GetTopicPeakLevel(topic TopicPartitionID) float64
func (*NodeTopicStats) LeaderLessLoader ¶
func (self *NodeTopicStats) LeaderLessLoader(other *NodeTopicStats) bool
func (*NodeTopicStats) SlaveLessLoader ¶
func (self *NodeTopicStats) SlaveLessLoader(other *NodeTopicStats) bool
type NsqLookupCoordRpcServer ¶
type NsqLookupCoordRpcServer struct {
// contains filtered or unexported fields
}
func NewNsqLookupCoordRpcServer ¶
func NewNsqLookupCoordRpcServer(coord *NsqLookupCoordinator) *NsqLookupCoordRpcServer
func (*NsqLookupCoordRpcServer) ReadyForTopicISR ¶
func (self *NsqLookupCoordRpcServer) ReadyForTopicISR(req *RpcReadyForISR) *CoordErr
func (*NsqLookupCoordRpcServer) RequestCheckTopicConsistence ¶
func (self *NsqLookupCoordRpcServer) RequestCheckTopicConsistence(req *RpcReqCheckTopic) *CoordErr
func (*NsqLookupCoordRpcServer) RequestJoinCatchup ¶
func (self *NsqLookupCoordRpcServer) RequestJoinCatchup(req *RpcReqJoinCatchup) *CoordErr
func (*NsqLookupCoordRpcServer) RequestJoinTopicISR ¶
func (self *NsqLookupCoordRpcServer) RequestJoinTopicISR(req *RpcReqJoinISR) *CoordErr
func (*NsqLookupCoordRpcServer) RequestLeaveFromISR ¶
func (self *NsqLookupCoordRpcServer) RequestLeaveFromISR(req *RpcReqLeaveFromISR) *CoordErr
func (*NsqLookupCoordRpcServer) RequestLeaveFromISRByLeader ¶
func (self *NsqLookupCoordRpcServer) RequestLeaveFromISRByLeader(req *RpcReqLeaveFromISRByLeader) *CoordErr
func (*NsqLookupCoordRpcServer) RequestNotifyNewTopicInfo ¶
func (self *NsqLookupCoordRpcServer) RequestNotifyNewTopicInfo(req *RpcReqNewTopicInfo) *CoordErr
type NsqLookupCoordinator ¶
type NsqLookupCoordinator struct {
// contains filtered or unexported fields
}
func NewNsqLookupCoordinator ¶
func NewNsqLookupCoordinator(cluster string, n *NsqLookupdNodeInfo, opts *Options) *NsqLookupCoordinator
func (*NsqLookupCoordinator) ChangeTopicMetaParam ¶
func (*NsqLookupCoordinator) CreateTopic ¶
func (self *NsqLookupCoordinator) CreateTopic(topic string, meta TopicMetaInfo) error
func (*NsqLookupCoordinator) DeleteTopic ¶
func (self *NsqLookupCoordinator) DeleteTopic(topic string, partition string) error
func (*NsqLookupCoordinator) DeleteTopicForce ¶
func (self *NsqLookupCoordinator) DeleteTopicForce(topic string, partition string) error
func (*NsqLookupCoordinator) ExpandTopicPartition ¶
func (self *NsqLookupCoordinator) ExpandTopicPartition(topic string, newPartitionNum int) error
func (*NsqLookupCoordinator) GetAllLookupdNodes ¶
func (self *NsqLookupCoordinator) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)
func (*NsqLookupCoordinator) GetClusterNodeLoadFactor ¶
func (self *NsqLookupCoordinator) GetClusterNodeLoadFactor() (map[string]float64, map[string]float64)
func (*NsqLookupCoordinator) GetLookupLeader ¶
func (self *NsqLookupCoordinator) GetLookupLeader() NsqLookupdNodeInfo
func (*NsqLookupCoordinator) GetTopicLeaderNodes ¶
func (self *NsqLookupCoordinator) GetTopicLeaderNodes(topicName string) (map[string]string, error)
func (*NsqLookupCoordinator) GetTopicMetaInfo ¶
func (self *NsqLookupCoordinator) GetTopicMetaInfo(topicName string) (TopicMetaInfo, error)
func (*NsqLookupCoordinator) IsClusterStable ¶
func (self *NsqLookupCoordinator) IsClusterStable() bool
func (*NsqLookupCoordinator) IsMineLeader ¶
func (self *NsqLookupCoordinator) IsMineLeader() bool
func (*NsqLookupCoordinator) IsTopicLeader ¶
func (self *NsqLookupCoordinator) IsTopicLeader(topic string, part int, nid string) bool
func (*NsqLookupCoordinator) MarkNodeAsRemoving ¶
func (self *NsqLookupCoordinator) MarkNodeAsRemoving(nid string) error
func (*NsqLookupCoordinator) MoveTopicPartitionDataByManual ¶
func (*NsqLookupCoordinator) SetClusterUpgradeState ¶
func (self *NsqLookupCoordinator) SetClusterUpgradeState(upgrading bool) error
func (*NsqLookupCoordinator) SetLeadershipMgr ¶
func (self *NsqLookupCoordinator) SetLeadershipMgr(l NSQLookupdLeadership)
func (*NsqLookupCoordinator) Start ¶
func (self *NsqLookupCoordinator) Start() error
init and register to leader server
func (*NsqLookupCoordinator) Stop ¶
func (self *NsqLookupCoordinator) Stop()
type NsqLookupRpcClient ¶
func (*NsqLookupRpcClient) CallWithRetry ¶
func (self *NsqLookupRpcClient) CallWithRetry(method string, arg interface{}) (interface{}, error)
func (*NsqLookupRpcClient) Close ¶
func (self *NsqLookupRpcClient) Close()
func (*NsqLookupRpcClient) ReadyForTopicISR ¶
func (self *NsqLookupRpcClient) ReadyForTopicISR(topic string, partition int, nid string, leaderSession *TopicLeaderSession, joinISRSession string) *CoordErr
func (*NsqLookupRpcClient) Reconnect ¶
func (self *NsqLookupRpcClient) Reconnect() error
func (*NsqLookupRpcClient) RemoteAddr ¶
func (self *NsqLookupRpcClient) RemoteAddr() string
func (*NsqLookupRpcClient) RequestCheckTopicConsistence ¶
func (self *NsqLookupRpcClient) RequestCheckTopicConsistence(topic string, partition int)
func (*NsqLookupRpcClient) RequestJoinCatchup ¶
func (self *NsqLookupRpcClient) RequestJoinCatchup(topic string, partition int, nid string) *CoordErr
func (*NsqLookupRpcClient) RequestJoinTopicISR ¶
func (self *NsqLookupRpcClient) RequestJoinTopicISR(topic string, partition int, nid string) *CoordErr
func (*NsqLookupRpcClient) RequestLeaveFromISR ¶
func (self *NsqLookupRpcClient) RequestLeaveFromISR(topic string, partition int, nid string) *CoordErr
func (*NsqLookupRpcClient) RequestLeaveFromISRByLeader ¶
func (self *NsqLookupRpcClient) RequestLeaveFromISRByLeader(topic string, partition int, nid string, leaderSession *TopicLeaderSession) *CoordErr
func (*NsqLookupRpcClient) RequestNotifyNewTopicInfo ¶
func (self *NsqLookupRpcClient) RequestNotifyNewTopicInfo(topic string, partition int, nid string)
func (*NsqLookupRpcClient) ShouldRemoved ¶
func (self *NsqLookupRpcClient) ShouldRemoved() bool
type NsqLookupdEtcdMgr ¶
type NsqLookupdEtcdMgr struct {
// contains filtered or unexported fields
}
func NewNsqLookupdEtcdMgr ¶
func NewNsqLookupdEtcdMgr(host string) *NsqLookupdEtcdMgr
func (*NsqLookupdEtcdMgr) AcquireAndWatchLeader ¶
func (self *NsqLookupdEtcdMgr) AcquireAndWatchLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{})
func (*NsqLookupdEtcdMgr) CheckIfLeader ¶
func (self *NsqLookupdEtcdMgr) CheckIfLeader(session string) bool
func (*NsqLookupdEtcdMgr) CreateTopic ¶
func (self *NsqLookupdEtcdMgr) CreateTopic(topic string, meta *TopicMetaInfo) error
func (*NsqLookupdEtcdMgr) CreateTopicPartition ¶
func (self *NsqLookupdEtcdMgr) CreateTopicPartition(topic string, partition int) error
func (*NsqLookupdEtcdMgr) DeleteTopic ¶
func (self *NsqLookupdEtcdMgr) DeleteTopic(topic string, partition int) error
func (*NsqLookupdEtcdMgr) DeleteWholeTopic ¶
func (self *NsqLookupdEtcdMgr) DeleteWholeTopic(topic string) error
func (*NsqLookupdEtcdMgr) GetAllLookupdNodes ¶
func (self *NsqLookupdEtcdMgr) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)
func (*NsqLookupdEtcdMgr) GetClusterEpoch ¶
func (self *NsqLookupdEtcdMgr) GetClusterEpoch() (EpochType, error)
func (*NsqLookupdEtcdMgr) GetNsqdNodes ¶
func (self *NsqLookupdEtcdMgr) GetNsqdNodes() ([]NsqdNodeInfo, error)
func (*NsqLookupdEtcdMgr) GetTopicInfo ¶
func (self *NsqLookupdEtcdMgr) GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error)
func (*NsqLookupdEtcdMgr) GetTopicLeaderSession ¶
func (self *NsqLookupdEtcdMgr) GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error)
func (*NsqLookupdEtcdMgr) GetTopicMetaInfo ¶
func (self *NsqLookupdEtcdMgr) GetTopicMetaInfo(topic string) (TopicMetaInfo, EpochType, error)
func (*NsqLookupdEtcdMgr) InitClusterID ¶
func (self *NsqLookupdEtcdMgr) InitClusterID(id string)
func (*NsqLookupdEtcdMgr) IsExistTopic ¶
func (self *NsqLookupdEtcdMgr) IsExistTopic(topic string) (bool, error)
func (*NsqLookupdEtcdMgr) IsExistTopicPartition ¶
func (self *NsqLookupdEtcdMgr) IsExistTopicPartition(topic string, partitionNum int) (bool, error)
func (*NsqLookupdEtcdMgr) Register ¶
func (self *NsqLookupdEtcdMgr) Register(value *NsqLookupdNodeInfo) error
func (*NsqLookupdEtcdMgr) ReleaseTopicLeader ¶
func (self *NsqLookupdEtcdMgr) ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error
func (*NsqLookupdEtcdMgr) ScanTopics ¶
func (self *NsqLookupdEtcdMgr) ScanTopics() ([]TopicPartitionMetaInfo, error)
func (*NsqLookupdEtcdMgr) Stop ¶
func (self *NsqLookupdEtcdMgr) Stop()
func (*NsqLookupdEtcdMgr) Unregister ¶
func (self *NsqLookupdEtcdMgr) Unregister(value *NsqLookupdNodeInfo) error
func (*NsqLookupdEtcdMgr) UpdateLookupEpoch ¶
func (self *NsqLookupdEtcdMgr) UpdateLookupEpoch(oldGen EpochType) (EpochType, error)
func (*NsqLookupdEtcdMgr) UpdateTopicMetaInfo ¶
func (self *NsqLookupdEtcdMgr) UpdateTopicMetaInfo(topic string, meta *TopicMetaInfo, oldGen EpochType) error
func (*NsqLookupdEtcdMgr) UpdateTopicNodeInfo ¶
func (self *NsqLookupdEtcdMgr) UpdateTopicNodeInfo(topic string, partition int, topicInfo *TopicPartitionReplicaInfo, oldGen EpochType) error
func (*NsqLookupdEtcdMgr) WatchNsqdNodes ¶
func (self *NsqLookupdEtcdMgr) WatchNsqdNodes(nsqds chan []NsqdNodeInfo, stop chan struct{})
func (*NsqLookupdEtcdMgr) WatchTopicLeader ¶
func (self *NsqLookupdEtcdMgr) WatchTopicLeader(leader chan *TopicLeaderSession, stop chan struct{}) error
maybe use: go WatchTopicLeader()...
type NsqLookupdNodeInfo ¶
type NsqLookupdNodeInfo struct { ID string NodeIP string TcpPort string HttpPort string RpcPort string Epoch EpochType }
func (*NsqLookupdNodeInfo) GetID ¶
func (self *NsqLookupdNodeInfo) GetID() string
type NsqdCoordRpcServer ¶
type NsqdCoordRpcServer struct {
// contains filtered or unexported fields
}
func NewNsqdCoordRpcServer ¶
func NewNsqdCoordRpcServer(coord *NsqdCoordinator, rootPath string) *NsqdCoordRpcServer
func (*NsqdCoordRpcServer) DeleteChannel ¶
func (self *NsqdCoordRpcServer) DeleteChannel(info *RpcChannelOffsetArg) *CoordErr
func (*NsqdCoordRpcServer) DeleteNsqdTopic ¶
func (self *NsqdCoordRpcServer) DeleteNsqdTopic(rpcTopicReq *RpcAdminTopicInfo) *CoordErr
func (*NsqdCoordRpcServer) DisableTopicWrite ¶
func (self *NsqdCoordRpcServer) DisableTopicWrite(rpcTopicReq *RpcAdminTopicInfo) *CoordErr
func (*NsqdCoordRpcServer) EnableTopicWrite ¶
func (self *NsqdCoordRpcServer) EnableTopicWrite(rpcTopicReq *RpcAdminTopicInfo) *CoordErr
func (*NsqdCoordRpcServer) GetCommitLogFromOffset ¶
func (self *NsqdCoordRpcServer) GetCommitLogFromOffset(req *RpcCommitLogReq) *RpcCommitLogRsp
return the logdata from offset, if the offset is larger than local, then return the last logdata on local.
func (*NsqdCoordRpcServer) GetFullSyncInfo ¶
func (self *NsqdCoordRpcServer) GetFullSyncInfo(req *RpcGetFullSyncInfoReq) (*RpcGetFullSyncInfoRsp, error)
func (*NsqdCoordRpcServer) GetLastCommitLogID ¶
func (self *NsqdCoordRpcServer) GetLastCommitLogID(req *RpcCommitLogReq) (int64, error)
func (*NsqdCoordRpcServer) GetTopicStats ¶
func (self *NsqdCoordRpcServer) GetTopicStats(topic string) *NodeTopicStats
func (*NsqdCoordRpcServer) IsTopicWriteDisabled ¶
func (self *NsqdCoordRpcServer) IsTopicWriteDisabled(rpcTopicReq *RpcAdminTopicInfo) bool
func (*NsqdCoordRpcServer) NotifyAcquireTopicLeader ¶
func (self *NsqdCoordRpcServer) NotifyAcquireTopicLeader(rpcTopicReq *RpcAcquireTopicLeaderReq) *CoordErr
func (*NsqdCoordRpcServer) NotifyReleaseTopicLeader ¶
func (self *NsqdCoordRpcServer) NotifyReleaseTopicLeader(rpcTopicReq *RpcReleaseTopicLeaderReq) *CoordErr
func (*NsqdCoordRpcServer) NotifyTopicLeaderSession ¶
func (self *NsqdCoordRpcServer) NotifyTopicLeaderSession(rpcTopicReq *RpcTopicLeaderSession) *CoordErr
func (*NsqdCoordRpcServer) PullCommitLogsAndData ¶
func (self *NsqdCoordRpcServer) PullCommitLogsAndData(req *RpcPullCommitLogsReq) (*RpcPullCommitLogsRsp, error)
func (*NsqdCoordRpcServer) PutMessage ¶
func (self *NsqdCoordRpcServer) PutMessage(info *RpcPutMessage) *CoordErr
receive from leader
func (*NsqdCoordRpcServer) PutMessages ¶
func (self *NsqdCoordRpcServer) PutMessages(info *RpcPutMessages) *CoordErr
func (*NsqdCoordRpcServer) TestRpcCoordErr ¶
func (self *NsqdCoordRpcServer) TestRpcCoordErr(req *RpcTestReq) *CoordErr
func (*NsqdCoordRpcServer) TestRpcError ¶
func (self *NsqdCoordRpcServer) TestRpcError(req *RpcTestReq) *RpcTestRsp
func (*NsqdCoordRpcServer) TestRpcTimeout ¶
func (self *NsqdCoordRpcServer) TestRpcTimeout() error
func (*NsqdCoordRpcServer) TriggerLookupChanged ¶
func (self *NsqdCoordRpcServer) TriggerLookupChanged() error
func (*NsqdCoordRpcServer) UpdateChannelOffset ¶
func (self *NsqdCoordRpcServer) UpdateChannelOffset(info *RpcChannelOffsetArg) *CoordErr
func (*NsqdCoordRpcServer) UpdateTopicInfo ¶
func (self *NsqdCoordRpcServer) UpdateTopicInfo(rpcTopicReq *RpcAdminTopicInfo) *CoordErr
type NsqdCoordinator ¶
type NsqdCoordinator struct {
// contains filtered or unexported fields
}
func NewNsqdCoordinator ¶
func NewNsqdCoordinator(cluster, ip, tcpport, rpcport, extraID string, rootPath string, nsqd *nsqd.NSQD) *NsqdCoordinator
func (*NsqdCoordinator) DeleteChannel ¶
func (self *NsqdCoordinator) DeleteChannel(topic *nsqd.Topic, channelName string) error
func (*NsqdCoordinator) FinishMessageToCluster ¶
func (*NsqdCoordinator) GetAllLookupdNodes ¶
func (self *NsqdCoordinator) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)
func (*NsqdCoordinator) GetCurrentLookupd ¶
func (self *NsqdCoordinator) GetCurrentLookupd() NsqLookupdNodeInfo
func (*NsqdCoordinator) GetMasterTopicCoordData ¶
func (self *NsqdCoordinator) GetMasterTopicCoordData(topic string) (int, *coordData, error)
since only one master is allowed on the same topic, we can get it.
func (*NsqdCoordinator) GetMyID ¶
func (self *NsqdCoordinator) GetMyID() string
func (*NsqdCoordinator) IsMineLeaderForTopic ¶
func (self *NsqdCoordinator) IsMineLeaderForTopic(topic string, part int) bool
func (*NsqdCoordinator) PutMessageToCluster ¶
func (self *NsqdCoordinator) PutMessageToCluster(topic *nsqd.Topic, body []byte, traceID uint64) (nsqd.MessageID, nsqd.BackendOffset, int32, nsqd.BackendQueueEnd, error)
func (*NsqdCoordinator) PutMessagesToCluster ¶
func (*NsqdCoordinator) SearchLogByMsgCnt ¶
func (self *NsqdCoordinator) SearchLogByMsgCnt(topic string, part int, count int64) (*CommitLogData, int64, int64, error)
func (*NsqdCoordinator) SearchLogByMsgID ¶
func (self *NsqdCoordinator) SearchLogByMsgID(topic string, part int, msgID int64) (*CommitLogData, int64, int64, error)
func (*NsqdCoordinator) SearchLogByMsgOffset ¶
func (self *NsqdCoordinator) SearchLogByMsgOffset(topic string, part int, offset int64) (*CommitLogData, int64, int64, error)
func (*NsqdCoordinator) SearchLogByMsgTimestamp ¶
func (self *NsqdCoordinator) SearchLogByMsgTimestamp(topic string, part int, ts_sec int64) (*CommitLogData, int64, int64, error)
return the searched log data and the exact offset the reader should be reset and the total count before the offset.
func (*NsqdCoordinator) SetChannelConsumeOffsetToCluster ¶
func (*NsqdCoordinator) SetLeadershipMgr ¶
func (self *NsqdCoordinator) SetLeadershipMgr(l NSQDLeadership)
func (*NsqdCoordinator) Start ¶
func (self *NsqdCoordinator) Start() error
func (*NsqdCoordinator) Stats ¶
func (self *NsqdCoordinator) Stats(topic string, part int) *CoordStats
func (*NsqdCoordinator) Stop ¶
func (self *NsqdCoordinator) Stop()
type NsqdEtcdMgr ¶
func NewNsqdEtcdMgr ¶
func NewNsqdEtcdMgr(host string) *NsqdEtcdMgr
func (*NsqdEtcdMgr) AcquireTopicLeader ¶
func (self *NsqdEtcdMgr) AcquireTopicLeader(topic string, partition int, nodeData *NsqdNodeInfo, epoch EpochType) error
func (*NsqdEtcdMgr) GetAllLookupdNodes ¶
func (self *NsqdEtcdMgr) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)
func (*NsqdEtcdMgr) GetTopicInfo ¶
func (self *NsqdEtcdMgr) GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error)
func (*NsqdEtcdMgr) GetTopicLeaderSession ¶
func (self *NsqdEtcdMgr) GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error)
func (*NsqdEtcdMgr) InitClusterID ¶
func (self *NsqdEtcdMgr) InitClusterID(id string)
func (*NsqdEtcdMgr) RegisterNsqd ¶
func (self *NsqdEtcdMgr) RegisterNsqd(nodeData *NsqdNodeInfo) error
func (*NsqdEtcdMgr) ReleaseTopicLeader ¶
func (self *NsqdEtcdMgr) ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error
func (*NsqdEtcdMgr) UnregisterNsqd ¶
func (self *NsqdEtcdMgr) UnregisterNsqd(nodeData *NsqdNodeInfo) error
func (*NsqdEtcdMgr) WatchLookupdLeader ¶
func (self *NsqdEtcdMgr) WatchLookupdLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{}) error
type NsqdNodeInfo ¶
func (*NsqdNodeInfo) GetID ¶
func (self *NsqdNodeInfo) GetID() string
type NsqdNodeLoadFactor ¶
type NsqdNodeLoadFactor struct {
// contains filtered or unexported fields
}
type NsqdRpcClient ¶
func NewNsqdRpcClient ¶
func NewNsqdRpcClient(addr string, timeout time.Duration) (*NsqdRpcClient, error)
func (*NsqdRpcClient) CallFast ¶
func (self *NsqdRpcClient) CallFast(method string, arg interface{}) (interface{}, error)
func (*NsqdRpcClient) CallRpcTest ¶
func (self *NsqdRpcClient) CallRpcTest(data string) (string, *CoordErr)
func (*NsqdRpcClient) CallRpcTestCoordErr ¶
func (self *NsqdRpcClient) CallRpcTestCoordErr(data string) *CoordErr
func (*NsqdRpcClient) CallRpcTesttimeout ¶
func (self *NsqdRpcClient) CallRpcTesttimeout(data string) error
func (*NsqdRpcClient) CallWithRetry ¶
func (self *NsqdRpcClient) CallWithRetry(method string, arg interface{}) (interface{}, error)
func (*NsqdRpcClient) Close ¶
func (self *NsqdRpcClient) Close()
func (*NsqdRpcClient) DeleteChannel ¶
func (self *NsqdRpcClient) DeleteChannel(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, channel string) *CoordErr
func (*NsqdRpcClient) DeleteNsqdTopic ¶
func (self *NsqdRpcClient) DeleteNsqdTopic(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
func (*NsqdRpcClient) DisableTopicWrite ¶
func (self *NsqdRpcClient) DisableTopicWrite(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
func (*NsqdRpcClient) DisableTopicWriteFast ¶
func (self *NsqdRpcClient) DisableTopicWriteFast(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
func (*NsqdRpcClient) EnableTopicWrite ¶
func (self *NsqdRpcClient) EnableTopicWrite(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
func (*NsqdRpcClient) GetCommitLogFromOffset ¶
func (self *NsqdRpcClient) GetCommitLogFromOffset(topicInfo *TopicPartitionMetaInfo, logCountNumIndex int64, logIndex int64, offset int64) (bool, int64, int64, int64, CommitLogData, *CoordErr)
func (*NsqdRpcClient) GetFullSyncInfo ¶
func (self *NsqdRpcClient) GetFullSyncInfo(topic string, partition int) (*LogStartInfo, *CommitLogData, error)
func (*NsqdRpcClient) GetLastCommitLogID ¶
func (self *NsqdRpcClient) GetLastCommitLogID(topicInfo *TopicPartitionMetaInfo) (int64, *CoordErr)
func (*NsqdRpcClient) GetTopicStats ¶
func (self *NsqdRpcClient) GetTopicStats(topic string) (*NodeTopicStats, error)
func (*NsqdRpcClient) IsTopicWriteDisabled ¶
func (self *NsqdRpcClient) IsTopicWriteDisabled(topicInfo *TopicPartitionMetaInfo) bool
func (*NsqdRpcClient) NotifyAcquireTopicLeader ¶
func (self *NsqdRpcClient) NotifyAcquireTopicLeader(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
func (*NsqdRpcClient) NotifyReleaseTopicLeader ¶
func (self *NsqdRpcClient) NotifyReleaseTopicLeader(epoch EpochType, topicInfo *TopicPartitionMetaInfo, leaderSessionEpoch EpochType) *CoordErr
func (*NsqdRpcClient) NotifyTopicLeaderSession ¶
func (self *NsqdRpcClient) NotifyTopicLeaderSession(epoch EpochType, topicInfo *TopicPartitionMetaInfo, leaderSession *TopicLeaderSession, joinSession string) *CoordErr
func (*NsqdRpcClient) NotifyUpdateChannelOffset ¶
func (self *NsqdRpcClient) NotifyUpdateChannelOffset(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, channel string, offset ChannelConsumerOffset) *CoordErr
func (*NsqdRpcClient) PullCommitLogsAndData ¶
func (self *NsqdRpcClient) PullCommitLogsAndData(topic string, partition int, logCountNumIndex int64, logIndex int64, startOffset int64, num int) ([]CommitLogData, [][]byte, error)
func (*NsqdRpcClient) PutMessage ¶
func (self *NsqdRpcClient) PutMessage(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, log CommitLogData, message *nsqd.Message) *CoordErr
func (*NsqdRpcClient) PutMessages ¶
func (self *NsqdRpcClient) PutMessages(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, log CommitLogData, messages []*nsqd.Message) *CoordErr
func (*NsqdRpcClient) Reconnect ¶
func (self *NsqdRpcClient) Reconnect() error
func (*NsqdRpcClient) ShouldRemoved ¶
func (self *NsqdRpcClient) ShouldRemoved() bool
func (*NsqdRpcClient) TriggerLookupChanged ¶
func (self *NsqdRpcClient) TriggerLookupChanged() error
func (*NsqdRpcClient) UpdateChannelOffset ¶
func (self *NsqdRpcClient) UpdateChannelOffset(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, channel string, offset ChannelConsumerOffset) *CoordErr
func (*NsqdRpcClient) UpdateTopicInfo ¶
func (self *NsqdRpcClient) UpdateTopicInfo(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
type RpcAcquireTopicLeaderReq ¶
type RpcAcquireTopicLeaderReq struct { RpcTopicData LeaderNodeID string LookupdEpoch EpochType }
type RpcAdminTopicInfo ¶
type RpcAdminTopicInfo struct { TopicPartitionMetaInfo LookupdEpoch EpochType DisableWrite bool }
type RpcChannelOffsetArg ¶
type RpcChannelOffsetArg struct { RpcTopicData Channel string // position file + file offset ChannelOffset ChannelConsumerOffset }
type RpcCommitLogReq ¶
type RpcCommitLogReq struct { RpcTopicData LogOffset int64 LogStartIndex int64 LogCountNumIndex int64 UseCountIndex bool }
type RpcCommitLogRsp ¶
type RpcFailedInfo ¶
type RpcFailedInfo struct {
// contains filtered or unexported fields
}
type RpcGetFullSyncInfoReq ¶
type RpcGetFullSyncInfoReq struct {
RpcTopicData
}
type RpcGetFullSyncInfoRsp ¶
type RpcGetFullSyncInfoRsp struct { FirstLogData CommitLogData StartInfo LogStartInfo }
type RpcLookupReqBase ¶
type RpcPullCommitLogsReq ¶
type RpcPullCommitLogsRsp ¶
type RpcPullCommitLogsRsp struct { Logs []CommitLogData DataList [][]byte }
type RpcPutMessage ¶
type RpcPutMessage struct { RpcTopicData LogData CommitLogData TopicMessage *nsqd.Message }
type RpcPutMessages ¶
type RpcPutMessages struct { RpcTopicData LogData CommitLogData TopicMessages []*nsqd.Message }
type RpcReadyForISR ¶
type RpcReadyForISR struct { RpcLookupReqBase LeaderSession TopicLeaderSession JoinISRSession string }
type RpcReleaseTopicLeaderReq ¶
type RpcReleaseTopicLeaderReq struct { RpcTopicData LeaderNodeID string LookupdEpoch EpochType }
type RpcReqCheckTopic ¶
type RpcReqCheckTopic struct {
RpcLookupReqBase
}
type RpcReqJoinCatchup ¶
type RpcReqJoinCatchup struct {
RpcLookupReqBase
}
type RpcReqJoinISR ¶
type RpcReqJoinISR struct {
RpcLookupReqBase
}
type RpcReqLeaveFromISR ¶
type RpcReqLeaveFromISR struct {
RpcLookupReqBase
}
type RpcReqLeaveFromISRByLeader ¶
type RpcReqLeaveFromISRByLeader struct { RpcLookupReqBase LeaderSession TopicLeaderSession }
type RpcReqNewTopicInfo ¶
type RpcReqNewTopicInfo struct {
RpcLookupReqBase
}
type RpcTopicData ¶
type RpcTopicLeaderSession ¶
type RpcTopicLeaderSession struct { RpcTopicData LeaderNode NsqdNodeInfo LookupdEpoch EpochType JoinSession string }
type SlaveAsyncWriteResult ¶
type SlaveAsyncWriteResult struct {
// contains filtered or unexported fields
}
func (*SlaveAsyncWriteResult) GetResult ¶
func (self *SlaveAsyncWriteResult) GetResult() *CoordErr
func (*SlaveAsyncWriteResult) Wait ¶
func (self *SlaveAsyncWriteResult) Wait()
type SortableStrings ¶
type SortableStrings []string
func (SortableStrings) Len ¶
func (s SortableStrings) Len() int
func (SortableStrings) Less ¶
func (s SortableStrings) Less(l, r int) bool
func (SortableStrings) Swap ¶
func (s SortableStrings) Swap(l, r int)
type StatsSorter ¶
type StatsSorter struct {
// contains filtered or unexported fields
}
func (*StatsSorter) Len ¶
func (s *StatsSorter) Len() int
func (*StatsSorter) Less ¶
func (s *StatsSorter) Less(i, j int) bool
func (*StatsSorter) Swap ¶
func (s *StatsSorter) Swap(i, j int)
type TopicCommitLogMgr ¶
func InitTopicCommitLogMgr ¶
func (*TopicCommitLogMgr) AppendCommitLog ¶
func (self *TopicCommitLogMgr) AppendCommitLog(l *CommitLogData, slave bool) error
func (*TopicCommitLogMgr) CleanOldData ¶
func (self *TopicCommitLogMgr) CleanOldData(fileIndex int64, fileOffset int64) error
func (*TopicCommitLogMgr) Close ¶
func (self *TopicCommitLogMgr) Close()
func (*TopicCommitLogMgr) ConvertToCountIndex ¶
func (self *TopicCommitLogMgr) ConvertToCountIndex(start int64, offset int64) (int64, error)
func (*TopicCommitLogMgr) ConvertToOffsetIndex ¶
func (self *TopicCommitLogMgr) ConvertToOffsetIndex(countIndex int64) (int64, int64, error)
func (*TopicCommitLogMgr) Delete ¶
func (self *TopicCommitLogMgr) Delete()
func (*TopicCommitLogMgr) FlushCommitLogs ¶
func (self *TopicCommitLogMgr) FlushCommitLogs()
func (*TopicCommitLogMgr) GetCommitLogFromOffsetV2 ¶
func (self *TopicCommitLogMgr) GetCommitLogFromOffsetV2(start int64, offset int64) (*CommitLogData, error)
func (*TopicCommitLogMgr) GetCommitLogsV2 ¶
func (self *TopicCommitLogMgr) GetCommitLogsV2(startIndex int64, startOffset int64, num int) ([]CommitLogData, error)
func (*TopicCommitLogMgr) GetCurrentEnd ¶
func (self *TopicCommitLogMgr) GetCurrentEnd() (int64, int64)
func (*TopicCommitLogMgr) GetCurrentStart ¶
func (self *TopicCommitLogMgr) GetCurrentStart() int64
func (*TopicCommitLogMgr) GetLastCommitLogDataOnSegment ¶
func (self *TopicCommitLogMgr) GetLastCommitLogDataOnSegment(index int64) (int64, *CommitLogData, error)
func (*TopicCommitLogMgr) GetLastCommitLogID ¶
func (self *TopicCommitLogMgr) GetLastCommitLogID() int64
func (*TopicCommitLogMgr) GetLastCommitLogOffsetV2 ¶
func (self *TopicCommitLogMgr) GetLastCommitLogOffsetV2() (int64, int64, *CommitLogData, error)
this will get the log data of last commit
func (*TopicCommitLogMgr) GetLogStartInfo ¶
func (self *TopicCommitLogMgr) GetLogStartInfo() (*LogStartInfo, *CommitLogData, error)
func (*TopicCommitLogMgr) IsCommitted ¶
func (self *TopicCommitLogMgr) IsCommitted(id int64) bool
func (*TopicCommitLogMgr) MoveTo ¶
func (self *TopicCommitLogMgr) MoveTo(newBase string) error
func (*TopicCommitLogMgr) NextID ¶
func (self *TopicCommitLogMgr) NextID() uint64
func (*TopicCommitLogMgr) Reopen ¶
func (self *TopicCommitLogMgr) Reopen() error
func (*TopicCommitLogMgr) ResetLogWithStart ¶
func (self *TopicCommitLogMgr) ResetLogWithStart(newStart LogStartInfo) error
func (*TopicCommitLogMgr) SearchLogDataByComparator ¶
func (self *TopicCommitLogMgr) SearchLogDataByComparator(comp ICommitLogComparator) (int64, int64, *CommitLogData, error)
func (*TopicCommitLogMgr) SearchLogDataByMsgCnt ¶
func (self *TopicCommitLogMgr) SearchLogDataByMsgCnt(searchCnt int64) (int64, int64, *CommitLogData, error)
func (*TopicCommitLogMgr) SearchLogDataByMsgID ¶
func (self *TopicCommitLogMgr) SearchLogDataByMsgID(searchMsgID int64) (int64, int64, *CommitLogData, error)
func (*TopicCommitLogMgr) SearchLogDataByMsgOffset ¶
func (self *TopicCommitLogMgr) SearchLogDataByMsgOffset(searchMsgOffset int64) (int64, int64, *CommitLogData, error)
func (*TopicCommitLogMgr) TruncateToOffsetV2 ¶
func (self *TopicCommitLogMgr) TruncateToOffsetV2(startIndex int64, offset int64) (*CommitLogData, error)
type TopicCoordStat ¶
type TopicCoordStat struct { Node string `json:"node"` Name string `json:"name"` Partition int `json:"partition"` ISRStats []ISRStat `json:"isr_stats"` CatchupStats []CatchupStat `json:"catchup_stats"` }
type TopicCoordinator ¶
type TopicCoordinator struct {
// contains filtered or unexported fields
}
func NewTopicCoordinator ¶
func (*TopicCoordinator) DeleteNoWriteLock ¶
func (self *TopicCoordinator) DeleteNoWriteLock(removeData bool)
func (*TopicCoordinator) DeleteWithLock ¶
func (self *TopicCoordinator) DeleteWithLock(removeData bool)
func (*TopicCoordinator) DisableWrite ¶
func (self *TopicCoordinator) DisableWrite(disable bool)
func (*TopicCoordinator) Exiting ¶
func (self *TopicCoordinator) Exiting()
func (TopicCoordinator) GetCopy ¶
func (self TopicCoordinator) GetCopy() *coordData
func (*TopicCoordinator) GetData ¶
func (self *TopicCoordinator) GetData() *coordData
func (TopicCoordinator) GetLeaderSessionEpoch ¶
func (self TopicCoordinator) GetLeaderSessionEpoch() EpochType
func (TopicCoordinator) GetLeaderSessionID ¶
func (self TopicCoordinator) GetLeaderSessionID() string
func (TopicCoordinator) GetTopicEpochForWrite ¶
func (self TopicCoordinator) GetTopicEpochForWrite() EpochType
func (*TopicCoordinator) IsExiting ¶
func (self *TopicCoordinator) IsExiting() bool
func (TopicCoordinator) IsISRReadyForWrite ¶
func (TopicCoordinator) IsMineLeaderSessionReady ¶
func (*TopicCoordinator) IsWriteDisabled ¶
func (self *TopicCoordinator) IsWriteDisabled() bool
type TopicLeaderSession ¶
type TopicLeaderSession struct { Topic string Partition int LeaderNode *NsqdNodeInfo Session string LeaderEpoch EpochType }
func (*TopicLeaderSession) IsSame ¶
func (self *TopicLeaderSession) IsSame(other *TopicLeaderSession) bool
type TopicMetaInfo ¶
type TopicMetaInfo struct { PartitionNum int Replica int // the suggest load factor for each topic partition. SuggestLF int // other options SyncEvery int // to verify the data of the create -> delete -> create with same topic MagicCode int64 // the retention days for the data RetentionDay int32 // used for ordered multi partition topic // allow multi partitions on the same node OrderedMulti bool }
type TopicPartitionID ¶
func (*TopicPartitionID) String ¶
func (self *TopicPartitionID) String() string
type TopicPartitionMetaInfo ¶
type TopicPartitionMetaInfo struct { Name string Partition int TopicMetaInfo TopicPartitionReplicaInfo }
func (*TopicPartitionMetaInfo) GetTopicDesp ¶
func (self *TopicPartitionMetaInfo) GetTopicDesp() string
type TopicPartitionReplicaInfo ¶
type WatchTopicLeaderInfo ¶
type WatchTopicLeaderInfo struct {
// contains filtered or unexported fields
}
Source Files ¶
- commitlog.go
- common.go
- coord_grpc_server.go
- coordinator_rpc.go
- coordinator_stats.go
- data_placement_mgr.go
- leadership.go
- logger.go
- nsq_lookupd_etcd.go
- nsqd_coordinator.go
- nsqd_coordinator_cluster_write.go
- nsqd_coordinator_rpc_helper.go
- nsqd_node_etcd.go
- nsqd_rpc_client.go
- nsqlookup_coord_api.go
- nsqlookup_coord_rpc.go
- nsqlookup_coord_rpc_helper.go
- nsqlookup_coordinator.go
- nsqlookup_rpc_client.go
- struct.go
- topic_coordinator.go
- utils.go
Click to show internal directories.
Click to hide internal directories.