Documentation ¶
Overview ¶
description: Utility to perform master election/failover using etcd.
Index ¶
- Constants
- Variables
- func AddCounter(name string) uint32
- func CheckKeyIfExist(err error) bool
- func DisableClusterWrite(disableType int)
- func ExtractRpcAddrFromID(nid string) string
- func FilterList(l []string, filter []string) []string
- func FindSlice(in []string, e string) int
- func FindSliceList(inList [][]string, e string) int
- func GenNsqLookupNodeID(n *NsqLookupdNodeInfo, extra string) string
- func GenNsqdNodeID(n *NsqdNodeInfo, extra string) string
- func GetLogDataSize() int
- func GetNextLogOffset(cur int64) int64
- func GetPartitionFromMsgID(id int64) int
- func GetPrevLogOffset(cur int64) int64
- func GetTopicFullName(n string, pid int) string
- func GetTopicPartitionBasePath(rootPath string, topic string, partition int) string
- func GetTopicPartitionFileName(topic string, partition int, suffix string) string
- func GetTopicPartitionLogPath(basepath, t string, p int) string
- func IncCounter(id uint32)
- func IncCounterBy(id uint32, amount uint64)
- func IsAllClusterWriteDisabled() bool
- func IsClusterWriteDisabledForOrdered() bool
- func IsEtcdNodeExist(err error) bool
- func IsEtcdNotFile(err error) bool
- func IsEtcdNotReachable(err error) bool
- func IsEtcdWatchExpired(err error) bool
- func MergeList(l []string, r []string) []string
- func NewNsqdCoordGRpcServer(coord *NsqdCoordinator, rootPath string) *nsqdCoordGRpcServer
- func RetryWithTimeout(fn func() error) error
- func SetCoordLogLevel(level int32)
- func SetCoordLogger(log levellogger.Logger, level int32)
- type By
- type CatchupStat
- type ChannelConsumeMgr
- type ChannelConsumerOffset
- type CntComparator
- type CommitLogData
- type CommonCoordErr
- type ConsistentStore
- type CoordErr
- func (self *CoordErr) CanRetryWrite(retryTimes int) bool
- func (self *CoordErr) HasError() bool
- func (self *CoordErr) IsEqual(other *CoordErr) bool
- func (self *CoordErr) IsLocalErr() bool
- func (self *CoordErr) IsNetErr() bool
- func (self *CoordErr) String() string
- func (self *CoordErr) ToErrorType() error
- type CoordErrStats
- type CoordErrStatsData
- type CoordErrType
- type CoordStats
- type DataPlacement
- type EVENT_TYPE
- type EpochType
- type ErrRPCRetCode
- type EtcdClient
- func (self *EtcdClient) CompareAndDelete(key string, prevValue string, prevIndex uint64) (*client.Response, error)
- func (self *EtcdClient) CompareAndSwap(key string, value string, ttl uint64, prevValue string, prevIndex uint64) (*client.Response, error)
- func (self *EtcdClient) Create(key string, value string, ttl uint64) (*client.Response, error)
- func (self *EtcdClient) CreateDir(key string, ttl uint64) (*client.Response, error)
- func (self *EtcdClient) CreateInOrder(dir string, value string, ttl uint64) (*client.Response, error)
- func (self *EtcdClient) Delete(key string, recursive bool) (*client.Response, error)
- func (self *EtcdClient) Get(key string, sort, recursive bool) (*client.Response, error)
- func (self *EtcdClient) GetNewest(key string, sort, recursive bool) (*client.Response, error)
- func (self *EtcdClient) Set(key string, value string, ttl uint64) (*client.Response, error)
- func (self *EtcdClient) SetWithTTL(key string, ttl uint64) (*client.Response, error)
- func (self *EtcdClient) Update(key string, value string, ttl uint64) (*client.Response, error)
- func (self *EtcdClient) Watch(key string, waitIndex uint64, recursive bool) client.Watcher
- type EtcdLock
- type ICommitLogComparator
- type ILocalLogQueue
- type INsqlookupRemoteProxy
- type ISRStat
- type IntHeap
- type JoinISRState
- type LFListT
- type LogStartInfo
- type Master
- type MasterChanInfo
- type MasterEvent
- type MsgIDComparator
- type MsgOffsetComparator
- type MsgTimestampComparator
- type NSQDLeadership
- type NSQLookupdLeadership
- type NodeTopicStats
- func (nts *NodeTopicStats) GetMostBusyAndIdleTopicWriteLevel(leaderOnly bool) (string, string, float64, float64)
- func (nts *NodeTopicStats) GetNodeAvgReadLevel() float64
- func (nts *NodeTopicStats) GetNodeAvgWriteLevel() float64
- func (nts *NodeTopicStats) GetNodeLeaderLoadFactor() float64
- func (nts *NodeTopicStats) GetNodeLoadFactor() (float64, float64)
- func (nts *NodeTopicStats) GetNodePeakLevelList() []int64
- func (nts *NodeTopicStats) GetSortedTopicWriteLevel(leaderOnly bool) LFListT
- func (nts *NodeTopicStats) GetTopicAvgWriteLevel(topicFullName string) float64
- func (nts *NodeTopicStats) GetTopicLeaderLoadFactor(topicFullName string) float64
- func (nts *NodeTopicStats) GetTopicLoadFactor(topicFullName string) float64
- func (nts *NodeTopicStats) GetTopicPeakLevel(topic TopicPartitionID) float64
- func (nts *NodeTopicStats) LeaderLessLoader(other *NodeTopicStats) bool
- func (nts *NodeTopicStats) SlaveLessLoader(other *NodeTopicStats) bool
- type NsqLookupCoordRpcServer
- func (nlcoord *NsqLookupCoordRpcServer) ReadyForTopicISR(req *RpcReadyForISR) *CoordErr
- func (nlcoord *NsqLookupCoordRpcServer) RequestCheckTopicConsistence(req *RpcReqCheckTopic) *CoordErr
- func (nlcoord *NsqLookupCoordRpcServer) RequestJoinCatchup(req *RpcReqJoinCatchup) *CoordErr
- func (nlcoord *NsqLookupCoordRpcServer) RequestJoinTopicISR(req *RpcReqJoinISR) *CoordErr
- func (nlcoord *NsqLookupCoordRpcServer) RequestLeaveFromISR(req *RpcReqLeaveFromISR) *CoordErr
- func (nlcoord *NsqLookupCoordRpcServer) RequestLeaveFromISRByLeader(req *RpcReqLeaveFromISRByLeader) *CoordErr
- func (nlcoord *NsqLookupCoordRpcServer) RequestNotifyNewTopicInfo(req *RpcReqNewTopicInfo) *CoordErr
- type NsqLookupCoordinator
- func (nlcoord *NsqLookupCoordinator) ChangeTopicMetaParam(topic string, newSyncEvery int, newRetentionDay int, newReplicator int, ...) error
- func (nlcoord *NsqLookupCoordinator) CreateTopic(topic string, meta TopicMetaInfo) error
- func (nlcoord *NsqLookupCoordinator) DeleteTopic(topic string, partition string) error
- func (nlcoord *NsqLookupCoordinator) DeleteTopicForce(topic string, partition string) error
- func (nlcoord *NsqLookupCoordinator) ExpandTopicPartition(topic string, newPartitionNum int) error
- func (nlcoord *NsqLookupCoordinator) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)
- func (nlcoord *NsqLookupCoordinator) GetClusterNodeLoadFactor() (map[string]float64, map[string]float64)
- func (nlcoord *NsqLookupCoordinator) GetClusterTopNTopics(n int) LFListT
- func (nlcoord *NsqLookupCoordinator) GetLookupLeader() NsqLookupdNodeInfo
- func (nlcoord *NsqLookupCoordinator) GetTopicLeaderNodes(topicName string) (map[string]string, error)
- func (nlcoord *NsqLookupCoordinator) GetTopicMetaInfo(topicName string) (TopicMetaInfo, error)
- func (nlcoord *NsqLookupCoordinator) GetTopicsMetaInfoMap(topics []string) (map[string]TopicMetaInfo, error)
- func (nlcoord *NsqLookupCoordinator) IsClusterStable() bool
- func (nlcoord *NsqLookupCoordinator) IsMineLeader() bool
- func (nlcoord *NsqLookupCoordinator) IsTopicLeader(topic string, part int, nid string) bool
- func (nlcoord *NsqLookupCoordinator) MarkNodeAsRemoving(nid string) error
- func (nlcoord *NsqLookupCoordinator) MoveTopicPartitionDataByManual(topicName string, partitionID int, moveLeader bool, fromNode string, ...) error
- func (nlcoord *NsqLookupCoordinator) SetClusterUpgradeState(upgrading bool) error
- func (nlcoord *NsqLookupCoordinator) SetLeadershipMgr(l NSQLookupdLeadership)
- func (nlcoord *NsqLookupCoordinator) SetTopNBalance(enable bool) error
- func (nlcoord *NsqLookupCoordinator) Start() error
- func (nlcoord *NsqLookupCoordinator) Stop()
- type NsqLookupRpcClient
- func (nlrpc *NsqLookupRpcClient) CallFast(method string, arg interface{}) (interface{}, error)
- func (nlrpc *NsqLookupRpcClient) CallWithRetry(method string, arg interface{}) (interface{}, error)
- func (nlrpc *NsqLookupRpcClient) Close()
- func (nlrpc *NsqLookupRpcClient) ReadyForTopicISR(topic string, partition int, nid string, leaderSession *TopicLeaderSession, ...) *CoordErr
- func (nlrpc *NsqLookupRpcClient) Reconnect() error
- func (nlrpc *NsqLookupRpcClient) RemoteAddr() string
- func (nlrpc *NsqLookupRpcClient) RequestCheckTopicConsistence(topic string, partition int)
- func (nlrpc *NsqLookupRpcClient) RequestJoinCatchup(topic string, partition int, nid string) *CoordErr
- func (nlrpc *NsqLookupRpcClient) RequestJoinTopicISR(topic string, partition int, nid string) *CoordErr
- func (nlrpc *NsqLookupRpcClient) RequestLeaveFromISR(topic string, partition int, nid string) *CoordErr
- func (nlrpc *NsqLookupRpcClient) RequestLeaveFromISRByLeader(topic string, partition int, nid string, leaderSession *TopicLeaderSession) *CoordErr
- func (nlrpc *NsqLookupRpcClient) RequestLeaveFromISRFast(topic string, partition int, nid string) *CoordErr
- func (nlrpc *NsqLookupRpcClient) RequestNotifyNewTopicInfo(topic string, partition int, nid string)
- func (nlrpc *NsqLookupRpcClient) ShouldRemoved() bool
- type NsqLookupdEtcdMgr
- func (self *NsqLookupdEtcdMgr) AcquireAndWatchLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{})
- func (self *NsqLookupdEtcdMgr) CheckIfLeader(session string) bool
- func (self *NsqLookupdEtcdMgr) CreateTopic(topic string, meta *TopicMetaInfo) error
- func (self *NsqLookupdEtcdMgr) CreateTopicPartition(topic string, partition int) error
- func (self *NsqLookupdEtcdMgr) DeleteTopic(topic string, partition int) error
- func (self *NsqLookupdEtcdMgr) DeleteWholeTopic(topic string) error
- func (self *NsqLookupdEtcdMgr) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)
- func (self *NsqLookupdEtcdMgr) GetAllTopicMetas() (map[string]TopicMetaInfo, error)
- func (self *NsqLookupdEtcdMgr) GetClusterEpoch() (EpochType, error)
- func (self *NsqLookupdEtcdMgr) GetNsqdNodes() ([]NsqdNodeInfo, error)
- func (self *NsqLookupdEtcdMgr) GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error)
- func (self *NsqLookupdEtcdMgr) GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error)
- func (self *NsqLookupdEtcdMgr) GetTopicMetaInfo(topic string) (TopicMetaInfo, EpochType, error)
- func (self *NsqLookupdEtcdMgr) GetTopicMetaInfoTryCache(topic string) (TopicMetaInfo, error)
- func (self *NsqLookupdEtcdMgr) GetTopicsMetaInfoMap(topics []string) (map[string]TopicMetaInfo, error)
- func (self *NsqLookupdEtcdMgr) InitClusterID(id string)
- func (self *NsqLookupdEtcdMgr) IsExistTopic(topic string) (bool, error)
- func (self *NsqLookupdEtcdMgr) IsExistTopicPartition(topic string, partitionNum int) (bool, error)
- func (self *NsqLookupdEtcdMgr) Register(value *NsqLookupdNodeInfo) error
- func (self *NsqLookupdEtcdMgr) ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error
- func (self *NsqLookupdEtcdMgr) ScanTopics() ([]TopicPartitionMetaInfo, error)
- func (self *NsqLookupdEtcdMgr) Stop()
- func (self *NsqLookupdEtcdMgr) Unregister(value *NsqLookupdNodeInfo) error
- func (self *NsqLookupdEtcdMgr) UpdateLookupEpoch(oldGen EpochType) (EpochType, error)
- func (self *NsqLookupdEtcdMgr) UpdateTopicMetaInfo(topic string, meta *TopicMetaInfo, oldGen EpochType) error
- func (self *NsqLookupdEtcdMgr) UpdateTopicNodeInfo(topic string, partition int, topicInfo *TopicPartitionReplicaInfo, ...) error
- func (self *NsqLookupdEtcdMgr) WatchNsqdNodes(nsqds chan []NsqdNodeInfo, stop chan struct{})
- type NsqLookupdNodeInfo
- type NsqdCoordRpcServer
- func (self *NsqdCoordRpcServer) DeleteChannel(info *RpcChannelOffsetArg) *CoordErr
- func (self *NsqdCoordRpcServer) DeleteNsqdTopic(rpcTopicReq *RpcAdminTopicInfo) *CoordErr
- func (self *NsqdCoordRpcServer) DisableTopicWrite(rpcTopicReq *RpcAdminTopicInfo) *CoordErr
- func (self *NsqdCoordRpcServer) EnableTopicWrite(rpcTopicReq *RpcAdminTopicInfo) *CoordErr
- func (self *NsqdCoordRpcServer) GetBackupedDelayedQueue(req *RpcGetBackupedDQReq) (*RpcGetBackupedDQRsp, error)
- func (self *NsqdCoordRpcServer) GetCommitLogFromOffset(req *RpcCommitLogReq) *RpcCommitLogRsp
- func (self *NsqdCoordRpcServer) GetDelayedQueueCommitLogFromOffset(req *RpcCommitLogReq) *RpcCommitLogRsp
- func (self *NsqdCoordRpcServer) GetDelayedQueueFullSyncInfo(req *RpcGetFullSyncInfoReq) (*RpcGetFullSyncInfoRsp, error)
- func (self *NsqdCoordRpcServer) GetFullSyncInfo(req *RpcGetFullSyncInfoReq) (*RpcGetFullSyncInfoRsp, error)
- func (self *NsqdCoordRpcServer) GetLastCommitLogID(req *RpcCommitLogReq) (int64, error)
- func (self *NsqdCoordRpcServer) GetLastDelayedQueueCommitLogID(req *RpcCommitLogReq) (int64, error)
- func (self *NsqdCoordRpcServer) GetNodeInfo(req *RpcNodeInfoReq) (*RpcNodeInfoRsp, error)
- func (self *NsqdCoordRpcServer) GetTopicStats(topic string) *NodeTopicStats
- func (self *NsqdCoordRpcServer) IsTopicWriteDisabled(rpcTopicReq *RpcAdminTopicInfo) bool
- func (self *NsqdCoordRpcServer) NotifyAcquireTopicLeader(rpcTopicReq *RpcAcquireTopicLeaderReq) *CoordErr
- func (self *NsqdCoordRpcServer) NotifyReleaseTopicLeader(rpcTopicReq *RpcReleaseTopicLeaderReq) *CoordErr
- func (self *NsqdCoordRpcServer) NotifyTopicLeaderSession(rpcTopicReq *RpcTopicLeaderSession) *CoordErr
- func (self *NsqdCoordRpcServer) PullCommitLogsAndData(req *RpcPullCommitLogsReq) (*RpcPullCommitLogsRsp, error)
- func (self *NsqdCoordRpcServer) PullDelayedQueueCommitLogsAndData(req *RpcPullCommitLogsReq) (*RpcPullCommitLogsRsp, error)
- func (self *NsqdCoordRpcServer) PutDelayedMessage(info *RpcPutMessage) *CoordErr
- func (self *NsqdCoordRpcServer) PutMessage(info *RpcPutMessage) *CoordErr
- func (self *NsqdCoordRpcServer) PutMessages(info *RpcPutMessages) *CoordErr
- func (self *NsqdCoordRpcServer) TestRpcCoordErr(req *RpcTestReq) *CoordErr
- func (self *NsqdCoordRpcServer) TestRpcError(req *RpcTestReq) *RpcTestRsp
- func (self *NsqdCoordRpcServer) TestRpcTimeout() error
- func (self *NsqdCoordRpcServer) TriggerLookupChanged() error
- func (self *NsqdCoordRpcServer) UpdateChannelList(info *RpcChannelListArg) *CoordErr
- func (self *NsqdCoordRpcServer) UpdateChannelOffset(info *RpcChannelOffsetArg) *CoordErr
- func (self *NsqdCoordRpcServer) UpdateChannelState(state *RpcChannelState) *CoordErr
- func (self *NsqdCoordRpcServer) UpdateDelayedQueueState(info *RpcConfirmedDelayedCursor) *CoordErr
- func (self *NsqdCoordRpcServer) UpdateTopicInfo(rpcTopicReq *RpcAdminTopicInfo) *CoordErr
- type NsqdCoordinator
- func (ncoord *NsqdCoordinator) DeleteChannel(topic *nsqd.Topic, channelName string) error
- func (ncoord *NsqdCoordinator) EmptyChannelDelayedStateToCluster(channel *nsqd.Channel) error
- func (ncoord *NsqdCoordinator) FinishMessageToCluster(channel *nsqd.Channel, clientID int64, clientAddr string, msgID nsqd.MessageID) error
- func (ncoord *NsqdCoordinator) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)
- func (ncoord *NsqdCoordinator) GetCurrentLookupd() NsqLookupdNodeInfo
- func (ncoord *NsqdCoordinator) GetMasterTopicCoordData(topic string) (int, *coordData, error)
- func (ncoord *NsqdCoordinator) GetMyID() string
- func (ncoord *NsqdCoordinator) GreedyCleanTopicOldData(localTopic *nsqd.Topic) error
- func (ncoord *NsqdCoordinator) IsMineConsumeLeaderForTopic(topic string, part int) bool
- func (ncoord *NsqdCoordinator) IsMineLeaderForTopic(topic string, part int) bool
- func (ncoord *NsqdCoordinator) PutDelayedMessageToCluster(topic *nsqd.Topic, msg *nsqd.Message) (nsqd.MessageID, nsqd.BackendOffset, int32, nsqd.BackendQueueEnd, error)
- func (ncoord *NsqdCoordinator) PutMessageBodyToCluster(topic *nsqd.Topic, body []byte, traceID uint64) (nsqd.MessageID, nsqd.BackendOffset, int32, nsqd.BackendQueueEnd, error)
- func (ncoord *NsqdCoordinator) PutMessageToCluster(topic *nsqd.Topic, msg *nsqd.Message) (nsqd.MessageID, nsqd.BackendOffset, int32, nsqd.BackendQueueEnd, error)
- func (ncoord *NsqdCoordinator) PutMessagesToCluster(topic *nsqd.Topic, msgs []*nsqd.Message) (nsqd.MessageID, nsqd.BackendOffset, int32, error)
- func (ncoord *NsqdCoordinator) SearchLogByMsgCnt(topic string, part int, count int64) (*CommitLogData, int64, int64, error)
- func (ncoord *NsqdCoordinator) SearchLogByMsgID(topic string, part int, msgID int64) (*CommitLogData, int64, int64, error)
- func (ncoord *NsqdCoordinator) SearchLogByMsgOffset(topic string, part int, offset int64) (*CommitLogData, int64, int64, error)
- func (ncoord *NsqdCoordinator) SearchLogByMsgTimestamp(topic string, part int, ts_sec int64) (*CommitLogData, int64, int64, error)
- func (ncoord *NsqdCoordinator) SetChannelConsumeOffsetToCluster(ch *nsqd.Channel, queueOffset int64, cnt int64, force bool) error
- func (ncoord *NsqdCoordinator) SetLeadershipMgr(l NSQDLeadership)
- func (ncoord *NsqdCoordinator) Start() error
- func (ncoord *NsqdCoordinator) Stats(topic string, part int) *CoordStats
- func (ncoord *NsqdCoordinator) Stop()
- func (ncoord *NsqdCoordinator) SyncTopicChannels(topicName string, part int) error
- func (ncoord *NsqdCoordinator) TryFixLocalTopic(topic string, pid int) error
- func (ncoord *NsqdCoordinator) UpdateChannelStateToCluster(channel *nsqd.Channel, paused int, skipped int, zanTestSkipped int) error
- type NsqdEtcdMgr
- func (nem *NsqdEtcdMgr) AcquireTopicLeader(topic string, partition int, nodeData *NsqdNodeInfo, epoch EpochType) error
- func (nem *NsqdEtcdMgr) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)
- func (nem *NsqdEtcdMgr) GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error)
- func (nem *NsqdEtcdMgr) GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error)
- func (nem *NsqdEtcdMgr) InitClusterID(id string)
- func (nem *NsqdEtcdMgr) IsTopicRealDeleted(topic string) (bool, error)
- func (nem *NsqdEtcdMgr) RegisterNsqd(nodeData *NsqdNodeInfo) error
- func (nem *NsqdEtcdMgr) ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error
- func (nem *NsqdEtcdMgr) UnregisterNsqd(nodeData *NsqdNodeInfo) error
- func (nem *NsqdEtcdMgr) WatchLookupdLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{}) error
- type NsqdNodeInfo
- type NsqdNodeLoadFactor
- type NsqdRpcClient
- func (nrpc *NsqdRpcClient) CallFast(method string, arg interface{}) (interface{}, error)
- func (nrpc *NsqdRpcClient) CallRpcTest(data string) (string, *CoordErr)
- func (nrpc *NsqdRpcClient) CallRpcTestCoordErr(data string) *CoordErr
- func (nrpc *NsqdRpcClient) CallRpcTesttimeout(data string) error
- func (nrpc *NsqdRpcClient) CallWithRetry(method string, arg interface{}) (interface{}, error)
- func (nrpc *NsqdRpcClient) Close()
- func (nrpc *NsqdRpcClient) DeleteChannel(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, ...) *CoordErr
- func (nrpc *NsqdRpcClient) DeleteNsqdTopic(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
- func (nrpc *NsqdRpcClient) DisableTopicWrite(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
- func (nrpc *NsqdRpcClient) DisableTopicWriteFast(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
- func (nrpc *NsqdRpcClient) EnableTopicWrite(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
- func (nrpc *NsqdRpcClient) GetBackupedDelayedQueue(topic string, partition int) (io.Reader, error)
- func (nrpc *NsqdRpcClient) GetCommitLogFromOffset(topicInfo *TopicPartitionMetaInfo, logCountNumIndex int64, logIndex int64, ...) (bool, int64, int64, int64, CommitLogData, *CoordErr)
- func (nrpc *NsqdRpcClient) GetFullSyncInfo(topic string, partition int, fromDelayed bool) (*LogStartInfo, *CommitLogData, error)
- func (nrpc *NsqdRpcClient) GetLastCommitLogID(topicInfo *TopicPartitionMetaInfo) (int64, *CoordErr)
- func (nrpc *NsqdRpcClient) GetLastDelayedQueueCommitLogID(topicInfo *TopicPartitionMetaInfo) (int64, *CoordErr)
- func (nrpc *NsqdRpcClient) GetNodeInfo(nid string) (*NsqdNodeInfo, error)
- func (nrpc *NsqdRpcClient) GetTopicStats(topic string) (*NodeTopicStats, error)
- func (nrpc *NsqdRpcClient) IsTopicWriteDisabled(topicInfo *TopicPartitionMetaInfo) bool
- func (nrpc *NsqdRpcClient) NotifyAcquireTopicLeader(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
- func (nrpc *NsqdRpcClient) NotifyChannelList(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, ...) *CoordErr
- func (nrpc *NsqdRpcClient) NotifyReleaseTopicLeader(epoch EpochType, topicInfo *TopicPartitionMetaInfo, ...) *CoordErr
- func (nrpc *NsqdRpcClient) NotifyTopicLeaderSession(epoch EpochType, topicInfo *TopicPartitionMetaInfo, ...) *CoordErr
- func (nrpc *NsqdRpcClient) NotifyUpdateChannelOffset(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, ...) *CoordErr
- func (nrpc *NsqdRpcClient) PullCommitLogsAndData(topic string, partition int, logCountNumIndex int64, logIndex int64, ...) ([]CommitLogData, [][]byte, error)
- func (nrpc *NsqdRpcClient) PutDelayedMessage(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, ...) *CoordErr
- func (nrpc *NsqdRpcClient) PutMessage(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, ...) *CoordErr
- func (nrpc *NsqdRpcClient) PutMessages(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, ...) *CoordErr
- func (nrpc *NsqdRpcClient) Reconnect() error
- func (nrpc *NsqdRpcClient) ShouldRemoved() bool
- func (nrpc *NsqdRpcClient) TriggerLookupChanged() error
- func (nrpc *NsqdRpcClient) UpdateChannelList(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, ...) *CoordErr
- func (nrpc *NsqdRpcClient) UpdateChannelOffset(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, ...) *CoordErr
- func (nrpc *NsqdRpcClient) UpdateChannelState(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, ...) *CoordErr
- func (nrpc *NsqdRpcClient) UpdateDelayedQueueState(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, ch string, ...) *CoordErr
- func (nrpc *NsqdRpcClient) UpdateTopicInfo(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
- type Options
- type RpcAcquireTopicLeaderReq
- type RpcAdminTopicInfo
- type RpcChannelListArg
- type RpcChannelOffsetArg
- type RpcChannelState
- type RpcCommitLogReq
- type RpcCommitLogRsp
- type RpcConfirmedDelayedCursor
- type RpcFailedInfo
- type RpcGetBackupedDQReq
- type RpcGetBackupedDQRsp
- type RpcGetFullSyncInfoReq
- type RpcGetFullSyncInfoRsp
- type RpcLookupReqBase
- type RpcNodeInfoReq
- type RpcNodeInfoRsp
- type RpcPullCommitLogsReq
- type RpcPullCommitLogsRsp
- type RpcPutMessage
- type RpcPutMessages
- type RpcReadyForISR
- type RpcReleaseTopicLeaderReq
- type RpcReqCheckTopic
- type RpcReqJoinCatchup
- type RpcReqJoinISR
- type RpcReqLeaveFromISR
- type RpcReqLeaveFromISRByLeader
- type RpcReqNewTopicInfo
- type RpcRspJoinISR
- type RpcTestReq
- type RpcTestRsp
- type RpcTopicData
- type RpcTopicLeaderSession
- type SlaveAsyncWriteResult
- type SortableStrings
- type StatsSorter
- type TopicCatchupInfo
- type TopicChannelsInfo
- type TopicCommitLogMgr
- func (self *TopicCommitLogMgr) AppendCommitLog(l *CommitLogData, slave bool) error
- func (self *TopicCommitLogMgr) AppendCommitLogWithSync(l *CommitLogData, slave bool, useFsync bool) error
- func (self *TopicCommitLogMgr) CleanOldData(fileIndex int64, fileOffset int64) error
- func (self *TopicCommitLogMgr) Close()
- func (self *TopicCommitLogMgr) ConvertToCountIndex(start int64, offset int64) (int64, error)
- func (self *TopicCommitLogMgr) ConvertToOffsetIndex(countIndex int64) (int64, int64, error)
- func (self *TopicCommitLogMgr) Delete()
- func (self *TopicCommitLogMgr) FlushCommitLogs()
- func (self *TopicCommitLogMgr) GetCommitLogFromOffsetV2(start int64, offset int64) (*CommitLogData, error)
- func (self *TopicCommitLogMgr) GetCommitLogsV2(startIndex int64, startOffset int64, num int) ([]CommitLogData, error)
- func (self *TopicCommitLogMgr) GetCurrentEnd() (int64, int64)
- func (self *TopicCommitLogMgr) GetCurrentStart() int64
- func (self *TopicCommitLogMgr) GetLastCommitLogDataOnSegment(index int64) (int64, *CommitLogData, error)
- func (self *TopicCommitLogMgr) GetLastCommitLogID() int64
- func (self *TopicCommitLogMgr) GetLastCommitLogOffsetV2() (int64, int64, *CommitLogData, error)
- func (self *TopicCommitLogMgr) GetLogStartInfo() (*LogStartInfo, *CommitLogData, error)
- func (self *TopicCommitLogMgr) IsCommitted(id int64) bool
- func (self *TopicCommitLogMgr) MoveTo(newBase string) error
- func (self *TopicCommitLogMgr) NextID() uint64
- func (self *TopicCommitLogMgr) Reopen() error
- func (self *TopicCommitLogMgr) Reset(id uint64)
- func (self *TopicCommitLogMgr) ResetLogWithStart(newStart LogStartInfo) error
- func (self *TopicCommitLogMgr) SearchLogDataByComparator(comp ICommitLogComparator) (int64, int64, *CommitLogData, error)
- func (self *TopicCommitLogMgr) SearchLogDataByMsgCnt(searchCnt int64) (int64, int64, *CommitLogData, error)
- func (self *TopicCommitLogMgr) SearchLogDataByMsgID(searchMsgID int64) (int64, int64, *CommitLogData, error)
- func (self *TopicCommitLogMgr) SearchLogDataByMsgOffset(searchMsgOffset int64) (int64, int64, *CommitLogData, error)
- func (self *TopicCommitLogMgr) TruncateToOffsetV2(startIndex int64, offset int64) (*CommitLogData, error)
- type TopicCoordStat
- type TopicCoordinator
- func (tc *TopicCoordinator) DeleteNoWriteLock(removeData bool)
- func (tc *TopicCoordinator) DeleteWithLock(removeData bool)
- func (tc *TopicCoordinator) DisableWrite(disable bool)
- func (tc *TopicCoordinator) Exiting()
- func (cd TopicCoordinator) GetCopy() *coordData
- func (tc *TopicCoordinator) GetData() *coordData
- func (tc *TopicCoordinator) GetDelayedQueueLogMgr() (*TopicCommitLogMgr, error)
- func (cd TopicCoordinator) GetLeader() string
- func (cd TopicCoordinator) GetLeaderSession() string
- func (cd TopicCoordinator) GetLeaderSessionEpoch() EpochType
- func (cd TopicCoordinator) GetLeaderSessionID() string
- func (cd TopicCoordinator) GetTopicEpochForWrite() EpochType
- func (tc *TopicCoordinator) IsExiting() bool
- func (cd TopicCoordinator) IsForceLeave() bool
- func (cd TopicCoordinator) IsISRReadyForWrite(myID string) bool
- func (cd TopicCoordinator) IsMineISR(id string) bool
- func (cd TopicCoordinator) IsMineLeaderSessionReady(id string) bool
- func (tc *TopicCoordinator) IsWriteDisabled() bool
- func (cd TopicCoordinator) SetForceLeave(leave bool)
- type TopicLeaderSession
- type TopicMetaInfo
- type TopicNameInfo
- type TopicPartitionID
- type TopicPartitionMetaInfo
- type TopicPartitionReplicaInfo
- type TopicReplicasInfo
- type WrapChannelConsumerOffset
Constants ¶
const ( ErrFailedOnNotLeader = "E_FAILED_ON_NOT_LEADER" ErrFailedOnNotWritable = "E_FAILED_ON_NOT_WRITABLE" )
const ( RATIO_BETWEEN_LEADER_FOLLOWER = 0.7 HIGHEST_PUB_QPS_LEVEL = 100 HIGHEST_LEFT_CONSUME_MB_SIZE = 50 * 1024 HIGHEST_LEFT_DATA_MB_SIZE = 200 * 1024 )
const ( ErrCodeEtcdNotReachable = 501 ErrCodeUnhandledHTTPStatus = 502 )
const ( NoClusterWriteDisable = 0 ClusterWriteDisabledForOrdered = 1 ClusterWriteDisabledForAll = 2 )
const ( EVENT_WATCH_TOPIC_L_CREATE = iota EVENT_WATCH_TOPIC_L_DELETE )
const ( MAX_WRITE_RETRY = 10 MAX_CATCHUP_RETRY = 5 MAX_LOG_PULL = 10000 MAX_LOG_PULL_BYTES = 1024 * 1024 * 32 MAX_CATCHUP_RUNNING = 8 API_BACKUP_DELAYED_QUEUE_DB = "/delayqueue/backupto" )
const ( RPC_TIMEOUT = time.Duration(time.Second * 5) RPC_TIMEOUT_SHORT = time.Duration(time.Second * 3) RPC_TIMEOUT_FOR_LOOKUP = time.Duration(time.Second * 3) )
const ( MAX_PARTITION_NUM = 255 MAX_SYNC_EVERY = 4000 MAX_RETENTION_DAYS = 60 )
const ( NSQ_TOPIC_DIR = "Topics" NSQ_TOPIC_META = "TopicMeta" NSQ_TOPIC_REPLICA_INFO = "ReplicaInfo" NSQ_TOPIC_LEADER_SESSION = "LeaderSession" NSQ_NODE_DIR = "NsqdNodes" NSQ_LOOKUPD_DIR = "NsqlookupdInfo" NSQ_LOOKUPD_NODE_DIR = "NsqlookupdNodes" NSQ_LOOKUPD_LEADER_SESSION = "LookupdLeaderSession" )
const (
ETCD_LOCK_NSQ_NAMESPACE = "nsq"
)
const (
ETCD_TTL = 30
)
const (
MAX_INCR_ID_BIT = 50
)
const (
RETRY_SLEEP = 200
)
Variables ¶
var ( ErrCommitLogWrongID = errors.New("commit log id is wrong") ErrCommitLogWrongLastID = errors.New("commit log last id should no less than log id") ErrCommitLogIDNotFound = errors.New("commit log id is not found") ErrCommitLogSearchNotFound = errors.New("search commit log data not found") ErrCommitLogOutofBound = errors.New("commit log offset is out of bound") ErrCommitLogEOF = errors.New("commit log end of file") ErrCommitLogOffsetInvalid = errors.New("commit log offset is invalid") ErrCommitLogPartitionExceed = errors.New("commit log partition id is exceeded") ErrCommitLogSearchFailed = errors.New("commit log data search failed") ErrCommitLogSegmentSizeInvalid = errors.New("commit log segment size is invalid") ErrCommitLogLessThanSegmentStart = errors.New("commit log read index is less than segment start") ErrCommitLogCleanKeepMin = errors.New("commit log clean should keep some data") )
var ( ErrTopicInfoNotFound = NewCoordErr("topic info not found", CoordClusterErr) ErrNotTopicLeader = NewCoordErrWithCode("not topic leader", CoordClusterNoRetryWriteErr, RpcErrNotTopicLeader) ErrEpochMismatch = NewCoordErrWithCode("commit epoch not match", CoordElectionTmpErr, RpcErrEpochMismatch) ErrEpochLessThanCurrent = NewCoordErrWithCode("epoch should be increased", CoordElectionErr, RpcErrEpochLessThanCurrent) ErrWriteQuorumFailed = NewCoordErrWithCode("write to quorum failed.", CoordElectionTmpErr, RpcErrWriteQuorumFailed) ErrCommitLogIDDup = NewCoordErrWithCode("commit id duplicated", CoordElectionErr, RpcErrCommitLogIDDup) ErrMissingTopicLeaderSession = NewCoordErrWithCode("missing topic leader session", CoordElectionErr, RpcErrMissingTopicLeaderSession) ErrLeaderSessionMismatch = NewCoordErrWithCode("leader session mismatch", CoordElectionTmpErr, RpcErrLeaderSessionMismatch) ErrWriteDisabled = NewCoordErrWithCode("write is disabled on the topic", CoordElectionErr, RpcErrWriteDisabled) ErrLeavingISRWait = NewCoordErrWithCode("leaving isr need wait.", CoordElectionTmpErr, RpcErrLeavingISRWait) ErrTopicCoordExistingAndMismatch = NewCoordErrWithCode("topic coordinator existing with a different partition", CoordClusterErr, RpcErrTopicCoordExistingAndMismatch) ErrTopicCoordTmpConflicted = NewCoordErrWithCode("topic coordinator is conflicted temporally", CoordClusterErr, RpcErrTopicCoordConflicted) ErrTopicLeaderChanged = NewCoordErrWithCode("topic leader changed", CoordElectionTmpErr, RpcErrTopicLeaderChanged) ErrTopicCommitLogEOF = NewCoordErrWithCode(ErrCommitLogEOF.Error(), CoordCommonErr, RpcErrCommitLogEOF) ErrTopicCommitLogOutofBound = NewCoordErrWithCode(ErrCommitLogOutofBound.Error(), CoordCommonErr, RpcErrCommitLogOutofBound) ErrTopicCommitLogLessThanSegmentStart = NewCoordErrWithCode(ErrCommitLogLessThanSegmentStart.Error(), CoordCommonErr, RpcErrCommitLogLessThanSegmentStart) ErrTopicCommitLogNotConsistent = NewCoordErrWithCode("topic commit log is not consistent", CoordClusterErr, RpcCommonErr) ErrMissingTopicCoord = NewCoordErrWithCode("missing topic coordinator", CoordClusterErr, RpcErrMissingTopicCoord) ErrTopicLoading = NewCoordErrWithCode("topic is still loading data", CoordLocalTmpErr, RpcErrTopicLoading) ErrTopicExiting = NewCoordErr("topic coordinator is exiting", CoordLocalTmpErr) ErrTopicExitingOnSlave = NewCoordErr("topic coordinator is exiting on slave", CoordTmpErr) ErrTopicCoordStateInvalid = NewCoordErrWithCode("invalid coordinator state", CoordClusterErr, RpcErrTopicCoordStateInvalid) ErrTopicSlaveInvalid = NewCoordErrWithCode("topic slave has some invalid state", CoordSlaveErr, RpcErrSlaveStateInvalid) ErrTopicLeaderSessionInvalid = NewCoordErrWithCode("topic leader session is invalid", CoordElectionTmpErr, RpcCommonErr) ErrTopicWriteOnNonISR = NewCoordErrWithCode("topic write on a node not in ISR", CoordTmpErr, RpcErrWriteOnNonISR) ErrTopicISRNotEnough = NewCoordErrWithCode("topic isr nodes not enough", CoordTmpErr, RpcCommonErr) ErrClusterChanged = NewCoordErrWithCode("cluster changed ", CoordTmpErr, RpcNoErr) ErrTopicMissingDelayedLog = NewCoordErrWithCode("topic missing delayed queue log", CoordLocalErr, RpcNoErr) ErrRpcMethodUnknown = NewCoordErrWithCode("rpc method unknown", CoordClusterErr, RpcCommonErr) ErrPubArgError = NewCoordErr("pub argument error", CoordCommonErr) ErrTopicNotRelated = NewCoordErr("topic not related to me", CoordCommonErr) ErrTopicCatchupAlreadyRunning = NewCoordErr("topic is already running catchup", CoordCommonErr) ErrTopicArgError = NewCoordErr("topic argument error", CoordCommonErr) ErrOperationExpired = NewCoordErr("operation has expired since wait too long", CoordCommonErr) ErrCatchupRunningBusy = NewCoordErr("too much running catchup", CoordCommonErr) ErrMissingTopicLog = NewCoordErr("missing topic log ", CoordLocalErr) ErrLocalTopicPartitionMismatch = NewCoordErr("local topic partition not match", CoordLocalErr) ErrLocalFallBehind = NewCoordErr("local data fall behind", CoordElectionErr) ErrLocalForwardThanLeader = NewCoordErr("local data is more than leader", CoordElectionErr) ErrLocalSetChannelOffsetNotFirstClient = NewCoordErr("failed to set channel offset since not first client", CoordLocalErr) ErrLocalMissingTopic = NewCoordErr("local topic missing", CoordLocalErr) ErrLocalNotReadyForWrite = NewCoordErr("local topic is not ready for write.", CoordLocalErr) ErrLocalInitTopicFailed = NewCoordErr("local topic init failed", CoordLocalErr) ErrLocalInitTopicCoordFailed = NewCoordErr("topic coordinator init failed", CoordLocalErr) ErrLocalTopicDataCorrupt = NewCoordErr("local topic data corrupt", CoordLocalErr) ErrLocalChannelPauseFailed = NewCoordErr("local channel pause/unpause failed", CoordLocalErr) ErrLocalChannelSkipFailed = NewCoordErr("local channel skip/unskip failed", CoordLocalErr) ErrLocalChannelSkipZanTestFailed = NewCoordErr("local channel skip/unskip zan test failed", CoordLocalErr) ErrLocalDelayedQueueMissing = NewCoordErr("local delayed queue is missing", CoordLocalErr) )
var ( ErrNodeIsExcludedForTopicData = errors.New("destination node is excluded for topic") ErrClusterBalanceRunning = errors.New("another balance is running, should wait") )
var ( ErrLeaderSessionAlreadyExist = errors.New("leader session already exist") ErrLeaderSessionNotExist = errors.New("session not exist") ErrKeyAlreadyExist = errors.New("Key already exist") ErrKeyNotFound = errors.New("Key not found") )
var ( MaxRetryWait = time.Second * 3 ForceFixLeaderData = false MaxTopicRetentionSizePerDay = int64(1024 * 1024 * 1024 * 16) )
var ( ErrAlreadyExist = errors.New("already exist") ErrTopicNotCreated = errors.New("topic is not created") ErrWaitingLeaderRelease = errors.New("leader session is still alive") ErrNotNsqLookupLeader = errors.New("Not nsqlookup leader") ErrClusterUnstable = errors.New("the cluster is unstable") ErrLeaderNodeLost = NewCoordErr("leader node is lost", CoordTmpErr) ErrNodeNotFound = NewCoordErr("node not found", CoordCommonErr) ErrLeaderElectionFail = NewCoordErr("Leader election failed.", CoordElectionTmpErr) ErrNoLeaderCanBeElected = NewCoordErr("No leader can be elected", CoordElectionTmpErr) ErrJoinISRInvalid = NewCoordErr("Join ISR failed", CoordCommonErr) ErrJoinISRTimeout = NewCoordErr("Join ISR timeout", CoordCommonErr) ErrWaitingJoinISR = NewCoordErr("The topic is waiting node to join isr", CoordCommonErr) ErrLeaderSessionNotReleased = NewCoordErr("The topic leader session is not released", CoordElectionTmpErr) ErrTopicISRCatchupEnough = NewCoordErr("the topic isr and catchup nodes are enough", CoordTmpErr) ErrClusterNodeRemoving = NewCoordErr("the node is mark as removed", CoordTmpErr) ErrTopicNodeConflict = NewCoordErr("the topic node info is conflicted", CoordElectionErr) ErrLeadershipServerUnstable = NewCoordErr("the leadership server is unstable", CoordTmpErr) )
var DEFAULT_COMMIT_BUF_SIZE = 64
var LOGROTATE_NUM = 500000
var MAX_COMMIT_BUF_SIZE = 1024
var MIN_KEEP_LOG_ITEM = 1000
var NSQ_ROOT_DIR = "NSQMetaData"
This is the default value.
Functions ¶
func GenNsqLookupNodeID ¶
func GenNsqLookupNodeID(n *NsqLookupdNodeInfo, extra string) string
func GenNsqdNodeID ¶
func GenNsqdNodeID(n *NsqdNodeInfo, extra string) string
func GetTopicPartitionBasePath ¶
func GetTopicPartitionFileName ¶
func NewNsqdCoordGRpcServer ¶
func NewNsqdCoordGRpcServer(coord *NsqdCoordinator, rootPath string) *nsqdCoordGRpcServer
func SetCoordLogger ¶
func SetCoordLogger(log levellogger.Logger, level int32)
Types ¶
type By ¶
type By func(l, r *NodeTopicStats) bool
func (By) Sort ¶
func (by By) Sort(statList []NodeTopicStats)
type CatchupStat ¶
type ChannelConsumeMgr ¶
func (*ChannelConsumeMgr) Clear ¶
func (cc *ChannelConsumeMgr) Clear()
func (*ChannelConsumeMgr) Get ¶
func (cc *ChannelConsumeMgr) Get(ch string) (ChannelConsumerOffset, bool)
func (*ChannelConsumeMgr) GetSyncedChs ¶
func (cc *ChannelConsumeMgr) GetSyncedChs() []string
func (*ChannelConsumeMgr) Update ¶
func (cc *ChannelConsumeMgr) Update(ch string, cco ChannelConsumerOffset)
func (*ChannelConsumeMgr) UpdateSyncedChs ¶
func (cc *ChannelConsumeMgr) UpdateSyncedChs(names []string)
type ChannelConsumerOffset ¶
type ChannelConsumerOffset struct { VOffset int64 VCnt int64 Flush bool AllowBackward bool ConfirmedInterval []nsqd.MsgQueueInterval NeedUpdateConfirmed bool }
func (*ChannelConsumerOffset) IsSame ¶
func (cco *ChannelConsumerOffset) IsSame(other *ChannelConsumerOffset) bool
type CntComparator ¶
type CntComparator int64
func (CntComparator) GreatThanRightBoundary ¶
func (self CntComparator) GreatThanRightBoundary(l *CommitLogData) bool
func (CntComparator) LessThanLeftBoundary ¶
func (self CntComparator) LessThanLeftBoundary(l *CommitLogData) bool
func (CntComparator) SearchEndBoundary ¶
func (self CntComparator) SearchEndBoundary() int64
type CommitLogData ¶
type CommitLogData struct { LogID int64 // epoch for the topic leader Epoch EpochType // if single message, this should be the same with logid // for multiple messages, this would be the logid for the last message in batch LastMsgLogID int64 MsgOffset int64 // size for batch messages MsgSize int32 // the total message count for all from begin, not only this batch MsgCnt int64 // the message number for current commit MsgNum int32 }
type CommonCoordErr ¶
type CommonCoordErr struct {
CoordErr
}
func (*CommonCoordErr) Error ¶
func (self *CommonCoordErr) Error() string
type ConsistentStore ¶
type CoordErr ¶
type CoordErr struct { ErrMsg string ErrCode ErrRPCRetCode ErrType CoordErrType }
Note: since gorpc treats the error type specially, we should not implement the error interface for CoordErr when it is used as a response type.
func NewCoordErr ¶
func NewCoordErr(msg string, etype CoordErrType) *CoordErr
func NewCoordErrWithCode ¶
func NewCoordErrWithCode(msg string, etype CoordErrType, code ErrRPCRetCode) *CoordErr
type CoordErrStats ¶
type CoordErrStats struct { sync.Mutex CoordErrStatsData }
func (*CoordErrStats) GetCopy ¶
func (self *CoordErrStats) GetCopy() *CoordErrStatsData
type CoordErrStatsData ¶
type CoordErrType ¶
type CoordErrType int
const ( CoordNoErr CoordErrType = iota CoordCommonErr CoordNetErr CoordElectionErr CoordElectionTmpErr CoordClusterErr CoordSlaveErr CoordLocalErr CoordLocalTmpErr CoordTmpErr CoordClusterNoRetryWriteErr )
type CoordStats ¶
type CoordStats struct { RpcStats *gorpc.ConnStats `json:"rpc_stats"` ErrStats CoordErrStatsData TopicCoordStats []TopicCoordStat `json:"topic_coord_stats"` }
type DataPlacement ¶
type DataPlacement struct {
// contains filtered or unexported fields
}
func NewDataPlacement ¶
func NewDataPlacement(coord *NsqLookupCoordinator) *DataPlacement
func (*DataPlacement) DoBalance ¶
func (dpm *DataPlacement) DoBalance(monitorChan chan struct{})
func (*DataPlacement) SetBalanceInterval ¶
func (dpm *DataPlacement) SetBalanceInterval(start int, end int)
type EVENT_TYPE ¶
type EVENT_TYPE int
const ( MASTER_ADD EVENT_TYPE = iota MASTER_DELETE MASTER_MODIFY MASTER_ERROR )
type ErrRPCRetCode ¶
type ErrRPCRetCode int
const ( RpcNoErr ErrRPCRetCode = iota RpcCommonErr )
const ( RpcErrLeavingISRWait ErrRPCRetCode = iota + 10 RpcErrNotTopicLeader RpcErrNoLeader RpcErrEpochMismatch RpcErrEpochLessThanCurrent RpcErrWriteQuorumFailed RpcErrCommitLogIDDup RpcErrCommitLogEOF RpcErrCommitLogOutofBound RpcErrCommitLogLessThanSegmentStart RpcErrMissingTopicLeaderSession RpcErrLeaderSessionMismatch RpcErrWriteDisabled RpcErrTopicNotExist RpcErrMissingTopicCoord RpcErrTopicCoordExistingAndMismatch RpcErrTopicCoordConflicted RpcErrTopicLeaderChanged RpcErrTopicLoading RpcErrSlaveStateInvalid RpcErrTopicCoordStateInvalid RpcErrWriteOnNonISR )
type EtcdClient ¶
type EtcdClient struct {
// contains filtered or unexported fields
}
func NewEClient ¶
func NewEClient(host, userName, pwd string) (*EtcdClient, error)
func (*EtcdClient) CompareAndDelete ¶
func (*EtcdClient) CompareAndSwap ¶
func (*EtcdClient) Create ¶
func (*EtcdClient) CreateDir ¶
func (*EtcdClient) CreateInOrder ¶
func (*EtcdClient) Delete ¶
func (*EtcdClient) Get ¶
func (*EtcdClient) GetNewest ¶
func (*EtcdClient) Set ¶
func (*EtcdClient) SetWithTTL ¶
func (*EtcdClient) Update ¶
type EtcdLock ¶
func (*EtcdLock) GetEventsChan ¶
func (self *EtcdLock) GetEventsChan() <-chan *MasterEvent
type ICommitLogComparator ¶
type ICommitLogComparator interface { SearchEndBoundary() int64 LessThanLeftBoundary(l *CommitLogData) bool GreatThanRightBoundary(l *CommitLogData) bool }
type ILocalLogQueue ¶
type ILocalLogQueue interface { IsDataNeedFix() bool SetDataFixState(bool) ForceFlush() ResetBackendEndNoLock(nsqd.BackendOffset, int64) error ResetBackendWithQueueStartNoLock(int64, int64) error GetDiskQueueSnapshot() *nsqd.DiskQueueSnapshot TotalMessageCnt() uint64 TotalDataSize() int64 PutRawDataOnReplica(rawData []byte, offset nsqd.BackendOffset, checkSize int64, msgNum int32) (nsqd.BackendQueueEnd, error) PutMessageOnReplica(msgs *nsqd.Message, offset nsqd.BackendOffset, checkSize int64) (nsqd.BackendQueueEnd, error) TryCleanOldData(retentionSize int64, noRealClean bool, maxCleanOffset nsqd.BackendOffset) (nsqd.BackendQueueEnd, error) }
type INsqlookupRemoteProxy ¶
type INsqlookupRemoteProxy interface { RemoteAddr() string Reconnect() error Close() RequestJoinCatchup(topic string, partition int, nid string) *CoordErr RequestJoinTopicISR(topic string, partition int, nid string) *CoordErr ReadyForTopicISR(topic string, partition int, nid string, leaderSession *TopicLeaderSession, joinISRSession string) *CoordErr RequestLeaveFromISR(topic string, partition int, nid string) *CoordErr RequestLeaveFromISRByLeader(topic string, partition int, nid string, leaderSession *TopicLeaderSession) *CoordErr RequestNotifyNewTopicInfo(topic string, partition int, nid string) RequestCheckTopicConsistence(topic string, partition int) }
type ISRStat ¶
type JoinISRState ¶
type LFListT ¶
type LFListT []loadFactorInfo
type LogStartInfo ¶
type Master ¶
type Master interface { Start() Stop() GetEventsChan() <-chan *MasterEvent GetKey() string GetMaster() string TryAcquire() (ret error) }
func NewMaster ¶
func NewMaster(etcdClient *EtcdClient, name, value string, ttl uint64) Master
type MasterChanInfo ¶
type MasterChanInfo struct {
// contains filtered or unexported fields
}
type MasterEvent ¶
type MasterEvent struct { Type EVENT_TYPE Master string ModifiedIndex uint64 }
type MsgIDComparator ¶
type MsgIDComparator int64
func (MsgIDComparator) GreatThanRightBoundary ¶
func (self MsgIDComparator) GreatThanRightBoundary(l *CommitLogData) bool
func (MsgIDComparator) LessThanLeftBoundary ¶
func (self MsgIDComparator) LessThanLeftBoundary(l *CommitLogData) bool
func (MsgIDComparator) SearchEndBoundary ¶
func (self MsgIDComparator) SearchEndBoundary() int64
type MsgOffsetComparator ¶
type MsgOffsetComparator int64
func (MsgOffsetComparator) GreatThanRightBoundary ¶
func (self MsgOffsetComparator) GreatThanRightBoundary(l *CommitLogData) bool
func (MsgOffsetComparator) LessThanLeftBoundary ¶
func (self MsgOffsetComparator) LessThanLeftBoundary(l *CommitLogData) bool
func (MsgOffsetComparator) SearchEndBoundary ¶
func (self MsgOffsetComparator) SearchEndBoundary() int64
type MsgTimestampComparator ¶
type MsgTimestampComparator struct {
// contains filtered or unexported fields
}
func (*MsgTimestampComparator) GreatThanRightBoundary ¶
func (ncoord *MsgTimestampComparator) GreatThanRightBoundary(l *CommitLogData) bool
func (*MsgTimestampComparator) LessThanLeftBoundary ¶
func (ncoord *MsgTimestampComparator) LessThanLeftBoundary(l *CommitLogData) bool
func (*MsgTimestampComparator) SearchEndBoundary ¶
func (ncoord *MsgTimestampComparator) SearchEndBoundary() int64
type NSQDLeadership ¶
type NSQDLeadership interface { InitClusterID(id string) RegisterNsqd(nodeData *NsqdNodeInfo) error // update UnregisterNsqd(nodeData *NsqdNodeInfo) error // try create the topic leadership key and no need to retry if the key already exist AcquireTopicLeader(topic string, partition int, nodeData *NsqdNodeInfo, epoch EpochType) error // release the session key using the acquired session. should check current session epoch // to avoid release the changed session ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error // all registered lookup nodes. GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error) // get the newest lookup leader and watch the change of it. WatchLookupdLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{}) error GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error) // get leadership information, if not exist should return ErrLeaderSessionNotExist as error GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error) IsTopicRealDeleted(topic string) (bool, error) }
type NSQLookupdLeadership ¶
type NSQLookupdLeadership interface { InitClusterID(id string) Register(value *NsqLookupdNodeInfo) error Unregister(value *NsqLookupdNodeInfo) error Stop() // the cluster root modify index GetClusterEpoch() (EpochType, error) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error) // add AcquireAndWatchLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{}) CheckIfLeader(session string) bool UpdateLookupEpoch(oldGen EpochType) (EpochType, error) GetNsqdNodes() ([]NsqdNodeInfo, error) // watching the cluster nsqd node, should return the newest for the first time. WatchNsqdNodes(nsqds chan []NsqdNodeInfo, stop chan struct{}) // get all topics info, should cache the newest to improve performance. ScanTopics() ([]TopicPartitionMetaInfo, error) GetAllTopicMetas() (map[string]TopicMetaInfo, error) // should return both the meta info for topic and the replica info for topic partition // epoch should be updated while return GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error) // create and write the meta info to topic meta node CreateTopic(topic string, meta *TopicMetaInfo) error // create topic partition path CreateTopicPartition(topic string, partition int) error IsExistTopic(topic string) (bool, error) IsExistTopicPartition(topic string, partition int) (bool, error) // get topic meta info only GetTopicMetaInfo(topic string) (TopicMetaInfo, EpochType, error) GetTopicMetaInfoTryCache(topic string) (TopicMetaInfo, error) UpdateTopicMetaInfo(topic string, meta *TopicMetaInfo, oldGen EpochType) error DeleteTopic(topic string, partition int) error DeleteWholeTopic(topic string) error // // update the replica info about leader, isr, epoch for partition // Note: update should do check-and-set to avoid unexpected override. // the epoch in topicInfo should be updated to the new epoch // if no topic partition replica info node should create only once. 
UpdateTopicNodeInfo(topic string, partition int, topicInfo *TopicPartitionReplicaInfo, oldGen EpochType) error // get leadership information, if not exist should return ErrLeaderSessionNotExist as error GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error) // watch any leadership lock change for all topic partitions, should return the token used later by release. //WatchTopicLeader(leader chan *TopicLeaderSession, stop chan struct{}) error // only leader lookup can do the release, normally notify the nsqd node do the release by itself. // lookup node should release only when the nsqd is lost ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error // get topic meta info map with passing topics slice GetTopicsMetaInfoMap(topics []string) (map[string]TopicMetaInfo, error) }
We need to check the leader lock session before making any modification to etcd. Make sure all returned values are copied to avoid modification from outside.
type NodeTopicStats ¶
type NodeTopicStats struct { NodeID string // the data (MB) need to be consumed on the leader for all channels in the topic. ChannelDepthData map[string]int64 // the data left on disk. unit: MB TopicLeaderDataSize map[string]int64 TopicTotalDataSize map[string]int64 NodeCPUs int // the pub stat for past 24hr TopicHourlyPubDataList map[string][24]int64 ChannelNum map[string]int ChannelList map[string][]string ChannelMetas map[string][]nsqd.ChannelMetaInfo ChannelOffsets map[string][]WrapChannelConsumerOffset }
func NewNodeTopicStats ¶
func NewNodeTopicStats(nid string, cap int, cpus int) *NodeTopicStats
func (*NodeTopicStats) GetMostBusyAndIdleTopicWriteLevel ¶
func (*NodeTopicStats) GetNodeAvgReadLevel ¶
func (nts *NodeTopicStats) GetNodeAvgReadLevel() float64
func (*NodeTopicStats) GetNodeAvgWriteLevel ¶
func (nts *NodeTopicStats) GetNodeAvgWriteLevel() float64
func (*NodeTopicStats) GetNodeLeaderLoadFactor ¶
func (nts *NodeTopicStats) GetNodeLeaderLoadFactor() float64
func (*NodeTopicStats) GetNodeLoadFactor ¶
func (nts *NodeTopicStats) GetNodeLoadFactor() (float64, float64)
func (*NodeTopicStats) GetNodePeakLevelList ¶
func (nts *NodeTopicStats) GetNodePeakLevelList() []int64
func (*NodeTopicStats) GetSortedTopicWriteLevel ¶
func (nts *NodeTopicStats) GetSortedTopicWriteLevel(leaderOnly bool) LFListT
func (*NodeTopicStats) GetTopicAvgWriteLevel ¶
func (nts *NodeTopicStats) GetTopicAvgWriteLevel(topicFullName string) float64
func (*NodeTopicStats) GetTopicLeaderLoadFactor ¶
func (nts *NodeTopicStats) GetTopicLeaderLoadFactor(topicFullName string) float64
func (*NodeTopicStats) GetTopicLoadFactor ¶
func (nts *NodeTopicStats) GetTopicLoadFactor(topicFullName string) float64
func (*NodeTopicStats) GetTopicPeakLevel ¶
func (nts *NodeTopicStats) GetTopicPeakLevel(topic TopicPartitionID) float64
func (*NodeTopicStats) LeaderLessLoader ¶
func (nts *NodeTopicStats) LeaderLessLoader(other *NodeTopicStats) bool
func (*NodeTopicStats) SlaveLessLoader ¶
func (nts *NodeTopicStats) SlaveLessLoader(other *NodeTopicStats) bool
type NsqLookupCoordRpcServer ¶
type NsqLookupCoordRpcServer struct {
// contains filtered or unexported fields
}
func NewNsqLookupCoordRpcServer ¶
func NewNsqLookupCoordRpcServer(coord *NsqLookupCoordinator) *NsqLookupCoordRpcServer
func (*NsqLookupCoordRpcServer) ReadyForTopicISR ¶
func (nlcoord *NsqLookupCoordRpcServer) ReadyForTopicISR(req *RpcReadyForISR) *CoordErr
func (*NsqLookupCoordRpcServer) RequestCheckTopicConsistence ¶
func (nlcoord *NsqLookupCoordRpcServer) RequestCheckTopicConsistence(req *RpcReqCheckTopic) *CoordErr
func (*NsqLookupCoordRpcServer) RequestJoinCatchup ¶
func (nlcoord *NsqLookupCoordRpcServer) RequestJoinCatchup(req *RpcReqJoinCatchup) *CoordErr
func (*NsqLookupCoordRpcServer) RequestJoinTopicISR ¶
func (nlcoord *NsqLookupCoordRpcServer) RequestJoinTopicISR(req *RpcReqJoinISR) *CoordErr
func (*NsqLookupCoordRpcServer) RequestLeaveFromISR ¶
func (nlcoord *NsqLookupCoordRpcServer) RequestLeaveFromISR(req *RpcReqLeaveFromISR) *CoordErr
func (*NsqLookupCoordRpcServer) RequestLeaveFromISRByLeader ¶
func (nlcoord *NsqLookupCoordRpcServer) RequestLeaveFromISRByLeader(req *RpcReqLeaveFromISRByLeader) *CoordErr
func (*NsqLookupCoordRpcServer) RequestNotifyNewTopicInfo ¶
func (nlcoord *NsqLookupCoordRpcServer) RequestNotifyNewTopicInfo(req *RpcReqNewTopicInfo) *CoordErr
type NsqLookupCoordinator ¶
type NsqLookupCoordinator struct {
// contains filtered or unexported fields
}
The nsqlookup coordinator is used to coordinate the topic leader and ISR; all changes to the leader or ISR are handled by the nsqlookup coordinator. In a cluster, only one nsqlookup coordinator is the leader, and it handles the events for cluster changes.
func NewNsqLookupCoordinator ¶
func NewNsqLookupCoordinator(cluster string, n *NsqLookupdNodeInfo, opts *Options) *NsqLookupCoordinator
func (*NsqLookupCoordinator) ChangeTopicMetaParam ¶
func (*NsqLookupCoordinator) CreateTopic ¶
func (nlcoord *NsqLookupCoordinator) CreateTopic(topic string, meta TopicMetaInfo) error
func (*NsqLookupCoordinator) DeleteTopic ¶
func (nlcoord *NsqLookupCoordinator) DeleteTopic(topic string, partition string) error
func (*NsqLookupCoordinator) DeleteTopicForce ¶
func (nlcoord *NsqLookupCoordinator) DeleteTopicForce(topic string, partition string) error
func (*NsqLookupCoordinator) ExpandTopicPartition ¶
func (nlcoord *NsqLookupCoordinator) ExpandTopicPartition(topic string, newPartitionNum int) error
func (*NsqLookupCoordinator) GetAllLookupdNodes ¶
func (nlcoord *NsqLookupCoordinator) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)
func (*NsqLookupCoordinator) GetClusterNodeLoadFactor ¶
func (nlcoord *NsqLookupCoordinator) GetClusterNodeLoadFactor() (map[string]float64, map[string]float64)
func (*NsqLookupCoordinator) GetClusterTopNTopics ¶
func (nlcoord *NsqLookupCoordinator) GetClusterTopNTopics(n int) LFListT
func (*NsqLookupCoordinator) GetLookupLeader ¶
func (nlcoord *NsqLookupCoordinator) GetLookupLeader() NsqLookupdNodeInfo
func (*NsqLookupCoordinator) GetTopicLeaderNodes ¶
func (nlcoord *NsqLookupCoordinator) GetTopicLeaderNodes(topicName string) (map[string]string, error)
func (*NsqLookupCoordinator) GetTopicMetaInfo ¶
func (nlcoord *NsqLookupCoordinator) GetTopicMetaInfo(topicName string) (TopicMetaInfo, error)
func (*NsqLookupCoordinator) GetTopicsMetaInfoMap ¶
func (nlcoord *NsqLookupCoordinator) GetTopicsMetaInfoMap(topics []string) (map[string]TopicMetaInfo, error)
func (*NsqLookupCoordinator) IsClusterStable ¶
func (nlcoord *NsqLookupCoordinator) IsClusterStable() bool
func (*NsqLookupCoordinator) IsMineLeader ¶
func (nlcoord *NsqLookupCoordinator) IsMineLeader() bool
func (*NsqLookupCoordinator) IsTopicLeader ¶
func (nlcoord *NsqLookupCoordinator) IsTopicLeader(topic string, part int, nid string) bool
func (*NsqLookupCoordinator) MarkNodeAsRemoving ¶
func (nlcoord *NsqLookupCoordinator) MarkNodeAsRemoving(nid string) error
func (*NsqLookupCoordinator) MoveTopicPartitionDataByManual ¶
func (*NsqLookupCoordinator) SetClusterUpgradeState ¶
func (nlcoord *NsqLookupCoordinator) SetClusterUpgradeState(upgrading bool) error
func (*NsqLookupCoordinator) SetLeadershipMgr ¶
func (nlcoord *NsqLookupCoordinator) SetLeadershipMgr(l NSQLookupdLeadership)
func (*NsqLookupCoordinator) SetTopNBalance ¶
func (nlcoord *NsqLookupCoordinator) SetTopNBalance(enable bool) error
func (*NsqLookupCoordinator) Start ¶
func (nlcoord *NsqLookupCoordinator) Start() error
Initializes and registers to the leader server.
func (*NsqLookupCoordinator) Stop ¶
func (nlcoord *NsqLookupCoordinator) Stop()
type NsqLookupRpcClient ¶
func (*NsqLookupRpcClient) CallFast ¶
func (nlrpc *NsqLookupRpcClient) CallFast(method string, arg interface{}) (interface{}, error)
func (*NsqLookupRpcClient) CallWithRetry ¶
func (nlrpc *NsqLookupRpcClient) CallWithRetry(method string, arg interface{}) (interface{}, error)
func (*NsqLookupRpcClient) Close ¶
func (nlrpc *NsqLookupRpcClient) Close()
func (*NsqLookupRpcClient) ReadyForTopicISR ¶
func (nlrpc *NsqLookupRpcClient) ReadyForTopicISR(topic string, partition int, nid string, leaderSession *TopicLeaderSession, joinISRSession string) *CoordErr
func (*NsqLookupRpcClient) Reconnect ¶
func (nlrpc *NsqLookupRpcClient) Reconnect() error
func (*NsqLookupRpcClient) RemoteAddr ¶
func (nlrpc *NsqLookupRpcClient) RemoteAddr() string
func (*NsqLookupRpcClient) RequestCheckTopicConsistence ¶
func (nlrpc *NsqLookupRpcClient) RequestCheckTopicConsistence(topic string, partition int)
func (*NsqLookupRpcClient) RequestJoinCatchup ¶
func (nlrpc *NsqLookupRpcClient) RequestJoinCatchup(topic string, partition int, nid string) *CoordErr
func (*NsqLookupRpcClient) RequestJoinTopicISR ¶
func (nlrpc *NsqLookupRpcClient) RequestJoinTopicISR(topic string, partition int, nid string) *CoordErr
func (*NsqLookupRpcClient) RequestLeaveFromISR ¶
func (nlrpc *NsqLookupRpcClient) RequestLeaveFromISR(topic string, partition int, nid string) *CoordErr
func (*NsqLookupRpcClient) RequestLeaveFromISRByLeader ¶
func (nlrpc *NsqLookupRpcClient) RequestLeaveFromISRByLeader(topic string, partition int, nid string, leaderSession *TopicLeaderSession) *CoordErr
func (*NsqLookupRpcClient) RequestLeaveFromISRFast ¶
func (nlrpc *NsqLookupRpcClient) RequestLeaveFromISRFast(topic string, partition int, nid string) *CoordErr
func (*NsqLookupRpcClient) RequestNotifyNewTopicInfo ¶
func (nlrpc *NsqLookupRpcClient) RequestNotifyNewTopicInfo(topic string, partition int, nid string)
func (*NsqLookupRpcClient) ShouldRemoved ¶
func (nlrpc *NsqLookupRpcClient) ShouldRemoved() bool
type NsqLookupdEtcdMgr ¶
type NsqLookupdEtcdMgr struct {
// contains filtered or unexported fields
}
func NewNsqLookupdEtcdMgr ¶
func NewNsqLookupdEtcdMgr(host, username, pwd string) (*NsqLookupdEtcdMgr, error)
func (*NsqLookupdEtcdMgr) AcquireAndWatchLeader ¶
func (self *NsqLookupdEtcdMgr) AcquireAndWatchLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{})
func (*NsqLookupdEtcdMgr) CheckIfLeader ¶
func (self *NsqLookupdEtcdMgr) CheckIfLeader(session string) bool
func (*NsqLookupdEtcdMgr) CreateTopic ¶
func (self *NsqLookupdEtcdMgr) CreateTopic(topic string, meta *TopicMetaInfo) error
func (*NsqLookupdEtcdMgr) CreateTopicPartition ¶
func (self *NsqLookupdEtcdMgr) CreateTopicPartition(topic string, partition int) error
func (*NsqLookupdEtcdMgr) DeleteTopic ¶
func (self *NsqLookupdEtcdMgr) DeleteTopic(topic string, partition int) error
func (*NsqLookupdEtcdMgr) DeleteWholeTopic ¶
func (self *NsqLookupdEtcdMgr) DeleteWholeTopic(topic string) error
func (*NsqLookupdEtcdMgr) GetAllLookupdNodes ¶
func (self *NsqLookupdEtcdMgr) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)
TODO: cache this to improve performance
func (*NsqLookupdEtcdMgr) GetAllTopicMetas ¶
func (self *NsqLookupdEtcdMgr) GetAllTopicMetas() (map[string]TopicMetaInfo, error)
func (*NsqLookupdEtcdMgr) GetClusterEpoch ¶
func (self *NsqLookupdEtcdMgr) GetClusterEpoch() (EpochType, error)
func (*NsqLookupdEtcdMgr) GetNsqdNodes ¶
func (self *NsqLookupdEtcdMgr) GetNsqdNodes() ([]NsqdNodeInfo, error)
func (*NsqLookupdEtcdMgr) GetTopicInfo ¶
func (self *NsqLookupdEtcdMgr) GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error)
func (*NsqLookupdEtcdMgr) GetTopicLeaderSession ¶
func (self *NsqLookupdEtcdMgr) GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error)
func (*NsqLookupdEtcdMgr) GetTopicMetaInfo ¶
func (self *NsqLookupdEtcdMgr) GetTopicMetaInfo(topic string) (TopicMetaInfo, EpochType, error)
func (*NsqLookupdEtcdMgr) GetTopicMetaInfoTryCache ¶
func (self *NsqLookupdEtcdMgr) GetTopicMetaInfoTryCache(topic string) (TopicMetaInfo, error)
func (*NsqLookupdEtcdMgr) GetTopicsMetaInfoMap ¶
func (self *NsqLookupdEtcdMgr) GetTopicsMetaInfoMap(topics []string) (map[string]TopicMetaInfo, error)
func (*NsqLookupdEtcdMgr) InitClusterID ¶
func (self *NsqLookupdEtcdMgr) InitClusterID(id string)
func (*NsqLookupdEtcdMgr) IsExistTopic ¶
func (self *NsqLookupdEtcdMgr) IsExistTopic(topic string) (bool, error)
func (*NsqLookupdEtcdMgr) IsExistTopicPartition ¶
func (self *NsqLookupdEtcdMgr) IsExistTopicPartition(topic string, partitionNum int) (bool, error)
func (*NsqLookupdEtcdMgr) Register ¶
func (self *NsqLookupdEtcdMgr) Register(value *NsqLookupdNodeInfo) error
func (*NsqLookupdEtcdMgr) ReleaseTopicLeader ¶
func (self *NsqLookupdEtcdMgr) ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error
func (*NsqLookupdEtcdMgr) ScanTopics ¶
func (self *NsqLookupdEtcdMgr) ScanTopics() ([]TopicPartitionMetaInfo, error)
func (*NsqLookupdEtcdMgr) Stop ¶
func (self *NsqLookupdEtcdMgr) Stop()
func (*NsqLookupdEtcdMgr) Unregister ¶
func (self *NsqLookupdEtcdMgr) Unregister(value *NsqLookupdNodeInfo) error
func (*NsqLookupdEtcdMgr) UpdateLookupEpoch ¶
func (self *NsqLookupdEtcdMgr) UpdateLookupEpoch(oldGen EpochType) (EpochType, error)
func (*NsqLookupdEtcdMgr) UpdateTopicMetaInfo ¶
func (self *NsqLookupdEtcdMgr) UpdateTopicMetaInfo(topic string, meta *TopicMetaInfo, oldGen EpochType) error
func (*NsqLookupdEtcdMgr) UpdateTopicNodeInfo ¶
func (self *NsqLookupdEtcdMgr) UpdateTopicNodeInfo(topic string, partition int, topicInfo *TopicPartitionReplicaInfo, oldGen EpochType) error
func (*NsqLookupdEtcdMgr) WatchNsqdNodes ¶
func (self *NsqLookupdEtcdMgr) WatchNsqdNodes(nsqds chan []NsqdNodeInfo, stop chan struct{})
type NsqLookupdNodeInfo ¶
type NsqLookupdNodeInfo struct { ID string NodeIP string TcpPort string HttpPort string RpcPort string Epoch EpochType }
func (*NsqLookupdNodeInfo) GetID ¶
func (self *NsqLookupdNodeInfo) GetID() string
type NsqdCoordRpcServer ¶
type NsqdCoordRpcServer struct {
// contains filtered or unexported fields
}
func NewNsqdCoordRpcServer ¶
func NewNsqdCoordRpcServer(coord *NsqdCoordinator, rootPath string) *NsqdCoordRpcServer
func (*NsqdCoordRpcServer) DeleteChannel ¶
func (self *NsqdCoordRpcServer) DeleteChannel(info *RpcChannelOffsetArg) *CoordErr
func (*NsqdCoordRpcServer) DeleteNsqdTopic ¶
func (self *NsqdCoordRpcServer) DeleteNsqdTopic(rpcTopicReq *RpcAdminTopicInfo) *CoordErr
func (*NsqdCoordRpcServer) DisableTopicWrite ¶
func (self *NsqdCoordRpcServer) DisableTopicWrite(rpcTopicReq *RpcAdminTopicInfo) *CoordErr
func (*NsqdCoordRpcServer) EnableTopicWrite ¶
func (self *NsqdCoordRpcServer) EnableTopicWrite(rpcTopicReq *RpcAdminTopicInfo) *CoordErr
func (*NsqdCoordRpcServer) GetBackupedDelayedQueue ¶
func (self *NsqdCoordRpcServer) GetBackupedDelayedQueue(req *RpcGetBackupedDQReq) (*RpcGetBackupedDQRsp, error)
func (*NsqdCoordRpcServer) GetCommitLogFromOffset ¶
func (self *NsqdCoordRpcServer) GetCommitLogFromOffset(req *RpcCommitLogReq) *RpcCommitLogRsp
Returns the log data from the given offset; if the offset is larger than the local one, returns the last log data on local.
func (*NsqdCoordRpcServer) GetDelayedQueueCommitLogFromOffset ¶
func (self *NsqdCoordRpcServer) GetDelayedQueueCommitLogFromOffset(req *RpcCommitLogReq) *RpcCommitLogRsp
func (*NsqdCoordRpcServer) GetDelayedQueueFullSyncInfo ¶
func (self *NsqdCoordRpcServer) GetDelayedQueueFullSyncInfo(req *RpcGetFullSyncInfoReq) (*RpcGetFullSyncInfoRsp, error)
func (*NsqdCoordRpcServer) GetFullSyncInfo ¶
func (self *NsqdCoordRpcServer) GetFullSyncInfo(req *RpcGetFullSyncInfoReq) (*RpcGetFullSyncInfoRsp, error)
func (*NsqdCoordRpcServer) GetLastCommitLogID ¶
func (self *NsqdCoordRpcServer) GetLastCommitLogID(req *RpcCommitLogReq) (int64, error)
func (*NsqdCoordRpcServer) GetLastDelayedQueueCommitLogID ¶
func (self *NsqdCoordRpcServer) GetLastDelayedQueueCommitLogID(req *RpcCommitLogReq) (int64, error)
func (*NsqdCoordRpcServer) GetNodeInfo ¶
func (self *NsqdCoordRpcServer) GetNodeInfo(req *RpcNodeInfoReq) (*RpcNodeInfoRsp, error)
func (*NsqdCoordRpcServer) GetTopicStats ¶
func (self *NsqdCoordRpcServer) GetTopicStats(topic string) *NodeTopicStats
func (*NsqdCoordRpcServer) IsTopicWriteDisabled ¶
func (self *NsqdCoordRpcServer) IsTopicWriteDisabled(rpcTopicReq *RpcAdminTopicInfo) bool
func (*NsqdCoordRpcServer) NotifyAcquireTopicLeader ¶
func (self *NsqdCoordRpcServer) NotifyAcquireTopicLeader(rpcTopicReq *RpcAcquireTopicLeaderReq) *CoordErr
func (*NsqdCoordRpcServer) NotifyReleaseTopicLeader ¶
func (self *NsqdCoordRpcServer) NotifyReleaseTopicLeader(rpcTopicReq *RpcReleaseTopicLeaderReq) *CoordErr
func (*NsqdCoordRpcServer) NotifyTopicLeaderSession ¶
func (self *NsqdCoordRpcServer) NotifyTopicLeaderSession(rpcTopicReq *RpcTopicLeaderSession) *CoordErr
func (*NsqdCoordRpcServer) PullCommitLogsAndData ¶
func (self *NsqdCoordRpcServer) PullCommitLogsAndData(req *RpcPullCommitLogsReq) (*RpcPullCommitLogsRsp, error)
func (*NsqdCoordRpcServer) PullDelayedQueueCommitLogsAndData ¶
func (self *NsqdCoordRpcServer) PullDelayedQueueCommitLogsAndData(req *RpcPullCommitLogsReq) (*RpcPullCommitLogsRsp, error)
func (*NsqdCoordRpcServer) PutDelayedMessage ¶
func (self *NsqdCoordRpcServer) PutDelayedMessage(info *RpcPutMessage) *CoordErr
receive from leader
func (*NsqdCoordRpcServer) PutMessage ¶
func (self *NsqdCoordRpcServer) PutMessage(info *RpcPutMessage) *CoordErr
receive from leader
func (*NsqdCoordRpcServer) PutMessages ¶
func (self *NsqdCoordRpcServer) PutMessages(info *RpcPutMessages) *CoordErr
func (*NsqdCoordRpcServer) TestRpcCoordErr ¶
func (self *NsqdCoordRpcServer) TestRpcCoordErr(req *RpcTestReq) *CoordErr
func (*NsqdCoordRpcServer) TestRpcError ¶
func (self *NsqdCoordRpcServer) TestRpcError(req *RpcTestReq) *RpcTestRsp
func (*NsqdCoordRpcServer) TestRpcTimeout ¶
func (self *NsqdCoordRpcServer) TestRpcTimeout() error
func (*NsqdCoordRpcServer) TriggerLookupChanged ¶
func (self *NsqdCoordRpcServer) TriggerLookupChanged() error
func (*NsqdCoordRpcServer) UpdateChannelList ¶
func (self *NsqdCoordRpcServer) UpdateChannelList(info *RpcChannelListArg) *CoordErr
func (*NsqdCoordRpcServer) UpdateChannelOffset ¶
func (self *NsqdCoordRpcServer) UpdateChannelOffset(info *RpcChannelOffsetArg) *CoordErr
func (*NsqdCoordRpcServer) UpdateChannelState ¶
func (self *NsqdCoordRpcServer) UpdateChannelState(state *RpcChannelState) *CoordErr
func (*NsqdCoordRpcServer) UpdateDelayedQueueState ¶
func (self *NsqdCoordRpcServer) UpdateDelayedQueueState(info *RpcConfirmedDelayedCursor) *CoordErr
func (*NsqdCoordRpcServer) UpdateTopicInfo ¶
func (self *NsqdCoordRpcServer) UpdateTopicInfo(rpcTopicReq *RpcAdminTopicInfo) *CoordErr
type NsqdCoordinator ¶
type NsqdCoordinator struct {
// contains filtered or unexported fields
}
func NewNsqdCoordinator ¶
func NewNsqdCoordinator(cluster, ip, tcpport, rpcport, httpport, extraID string, rootPath string, nsqd *nsqd.NSQD) *NsqdCoordinator
func (*NsqdCoordinator) DeleteChannel ¶
func (ncoord *NsqdCoordinator) DeleteChannel(topic *nsqd.Topic, channelName string) error
func (*NsqdCoordinator) EmptyChannelDelayedStateToCluster ¶
func (ncoord *NsqdCoordinator) EmptyChannelDelayedStateToCluster(channel *nsqd.Channel) error
func (*NsqdCoordinator) FinishMessageToCluster ¶
func (*NsqdCoordinator) GetAllLookupdNodes ¶
func (ncoord *NsqdCoordinator) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)
func (*NsqdCoordinator) GetCurrentLookupd ¶
func (ncoord *NsqdCoordinator) GetCurrentLookupd() NsqLookupdNodeInfo
func (*NsqdCoordinator) GetMasterTopicCoordData ¶
func (ncoord *NsqdCoordinator) GetMasterTopicCoordData(topic string) (int, *coordData, error)
Since only one master is allowed on the same topic, the master's coordinator data can be looked up by topic name alone.
func (*NsqdCoordinator) GetMyID ¶
func (ncoord *NsqdCoordinator) GetMyID() string
func (*NsqdCoordinator) GreedyCleanTopicOldData ¶
func (ncoord *NsqdCoordinator) GreedyCleanTopicOldData(localTopic *nsqd.Topic) error
func (*NsqdCoordinator) IsMineConsumeLeaderForTopic ¶
func (ncoord *NsqdCoordinator) IsMineConsumeLeaderForTopic(topic string, part int) bool
func (*NsqdCoordinator) IsMineLeaderForTopic ¶
func (ncoord *NsqdCoordinator) IsMineLeaderForTopic(topic string, part int) bool
func (*NsqdCoordinator) PutDelayedMessageToCluster ¶
func (ncoord *NsqdCoordinator) PutDelayedMessageToCluster(topic *nsqd.Topic, msg *nsqd.Message) (nsqd.MessageID, nsqd.BackendOffset, int32, nsqd.BackendQueueEnd, error)
func (*NsqdCoordinator) PutMessageBodyToCluster ¶
func (ncoord *NsqdCoordinator) PutMessageBodyToCluster(topic *nsqd.Topic, body []byte, traceID uint64) (nsqd.MessageID, nsqd.BackendOffset, int32, nsqd.BackendQueueEnd, error)
func (*NsqdCoordinator) PutMessageToCluster ¶
func (ncoord *NsqdCoordinator) PutMessageToCluster(topic *nsqd.Topic, msg *nsqd.Message) (nsqd.MessageID, nsqd.BackendOffset, int32, nsqd.BackendQueueEnd, error)
func (*NsqdCoordinator) PutMessagesToCluster ¶
func (*NsqdCoordinator) SearchLogByMsgCnt ¶
func (ncoord *NsqdCoordinator) SearchLogByMsgCnt(topic string, part int, count int64) (*CommitLogData, int64, int64, error)
func (*NsqdCoordinator) SearchLogByMsgID ¶
func (ncoord *NsqdCoordinator) SearchLogByMsgID(topic string, part int, msgID int64) (*CommitLogData, int64, int64, error)
func (*NsqdCoordinator) SearchLogByMsgOffset ¶
func (ncoord *NsqdCoordinator) SearchLogByMsgOffset(topic string, part int, offset int64) (*CommitLogData, int64, int64, error)
Searches the commit log for the message at the given on-disk message data offset.
func (*NsqdCoordinator) SearchLogByMsgTimestamp ¶
func (ncoord *NsqdCoordinator) SearchLogByMsgTimestamp(topic string, part int, ts_sec int64) (*CommitLogData, int64, int64, error)
Returns the matching log data, the exact offset to which the reader should be reset, and the total count before that offset.
func (*NsqdCoordinator) SetChannelConsumeOffsetToCluster ¶
func (*NsqdCoordinator) SetLeadershipMgr ¶
func (ncoord *NsqdCoordinator) SetLeadershipMgr(l NSQDLeadership)
func (*NsqdCoordinator) Start ¶
func (ncoord *NsqdCoordinator) Start() error
func (*NsqdCoordinator) Stats ¶
func (ncoord *NsqdCoordinator) Stats(topic string, part int) *CoordStats
func (*NsqdCoordinator) Stop ¶
func (ncoord *NsqdCoordinator) Stop()
func (*NsqdCoordinator) SyncTopicChannels ¶
func (ncoord *NsqdCoordinator) SyncTopicChannels(topicName string, part int) error
func (*NsqdCoordinator) TryFixLocalTopic ¶
func (ncoord *NsqdCoordinator) TryFixLocalTopic(topic string, pid int) error
type NsqdEtcdMgr ¶
func NewNsqdEtcdMgr ¶
func NewNsqdEtcdMgr(host, username, pwd string) (*NsqdEtcdMgr, error)
func (*NsqdEtcdMgr) AcquireTopicLeader ¶
func (nem *NsqdEtcdMgr) AcquireTopicLeader(topic string, partition int, nodeData *NsqdNodeInfo, epoch EpochType) error
func (*NsqdEtcdMgr) GetAllLookupdNodes ¶
func (nem *NsqdEtcdMgr) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)
func (*NsqdEtcdMgr) GetTopicInfo ¶
func (nem *NsqdEtcdMgr) GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error)
func (*NsqdEtcdMgr) GetTopicLeaderSession ¶
func (nem *NsqdEtcdMgr) GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error)
func (*NsqdEtcdMgr) InitClusterID ¶
func (nem *NsqdEtcdMgr) InitClusterID(id string)
func (*NsqdEtcdMgr) IsTopicRealDeleted ¶
func (nem *NsqdEtcdMgr) IsTopicRealDeleted(topic string) (bool, error)
func (*NsqdEtcdMgr) RegisterNsqd ¶
func (nem *NsqdEtcdMgr) RegisterNsqd(nodeData *NsqdNodeInfo) error
func (*NsqdEtcdMgr) ReleaseTopicLeader ¶
func (nem *NsqdEtcdMgr) ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error
func (*NsqdEtcdMgr) UnregisterNsqd ¶
func (nem *NsqdEtcdMgr) UnregisterNsqd(nodeData *NsqdNodeInfo) error
func (*NsqdEtcdMgr) WatchLookupdLeader ¶
func (nem *NsqdEtcdMgr) WatchLookupdLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{}) error
type NsqdNodeInfo ¶
func (*NsqdNodeInfo) GetID ¶
func (self *NsqdNodeInfo) GetID() string
type NsqdNodeLoadFactor ¶
type NsqdNodeLoadFactor struct {
// contains filtered or unexported fields
}
type NsqdRpcClient ¶
func NewNsqdRpcClient ¶
func NewNsqdRpcClient(addr string, timeout time.Duration) (*NsqdRpcClient, error)
func (*NsqdRpcClient) CallFast ¶
func (nrpc *NsqdRpcClient) CallFast(method string, arg interface{}) (interface{}, error)
func (*NsqdRpcClient) CallRpcTest ¶
func (nrpc *NsqdRpcClient) CallRpcTest(data string) (string, *CoordErr)
func (*NsqdRpcClient) CallRpcTestCoordErr ¶
func (nrpc *NsqdRpcClient) CallRpcTestCoordErr(data string) *CoordErr
func (*NsqdRpcClient) CallRpcTesttimeout ¶
func (nrpc *NsqdRpcClient) CallRpcTesttimeout(data string) error
func (*NsqdRpcClient) CallWithRetry ¶
func (nrpc *NsqdRpcClient) CallWithRetry(method string, arg interface{}) (interface{}, error)
func (*NsqdRpcClient) Close ¶
func (nrpc *NsqdRpcClient) Close()
func (*NsqdRpcClient) DeleteChannel ¶
func (nrpc *NsqdRpcClient) DeleteChannel(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, channel string) *CoordErr
func (*NsqdRpcClient) DeleteNsqdTopic ¶
func (nrpc *NsqdRpcClient) DeleteNsqdTopic(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
func (*NsqdRpcClient) DisableTopicWrite ¶
func (nrpc *NsqdRpcClient) DisableTopicWrite(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
func (*NsqdRpcClient) DisableTopicWriteFast ¶
func (nrpc *NsqdRpcClient) DisableTopicWriteFast(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
func (*NsqdRpcClient) EnableTopicWrite ¶
func (nrpc *NsqdRpcClient) EnableTopicWrite(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
func (*NsqdRpcClient) GetBackupedDelayedQueue ¶
func (*NsqdRpcClient) GetCommitLogFromOffset ¶
func (nrpc *NsqdRpcClient) GetCommitLogFromOffset(topicInfo *TopicPartitionMetaInfo, logCountNumIndex int64, logIndex int64, offset int64, fromDelayedQueue bool) (bool, int64, int64, int64, CommitLogData, *CoordErr)
func (*NsqdRpcClient) GetFullSyncInfo ¶
func (nrpc *NsqdRpcClient) GetFullSyncInfo(topic string, partition int, fromDelayed bool) (*LogStartInfo, *CommitLogData, error)
func (*NsqdRpcClient) GetLastCommitLogID ¶
func (nrpc *NsqdRpcClient) GetLastCommitLogID(topicInfo *TopicPartitionMetaInfo) (int64, *CoordErr)
func (*NsqdRpcClient) GetLastDelayedQueueCommitLogID ¶
func (nrpc *NsqdRpcClient) GetLastDelayedQueueCommitLogID(topicInfo *TopicPartitionMetaInfo) (int64, *CoordErr)
func (*NsqdRpcClient) GetNodeInfo ¶
func (nrpc *NsqdRpcClient) GetNodeInfo(nid string) (*NsqdNodeInfo, error)
func (*NsqdRpcClient) GetTopicStats ¶
func (nrpc *NsqdRpcClient) GetTopicStats(topic string) (*NodeTopicStats, error)
func (*NsqdRpcClient) IsTopicWriteDisabled ¶
func (nrpc *NsqdRpcClient) IsTopicWriteDisabled(topicInfo *TopicPartitionMetaInfo) bool
func (*NsqdRpcClient) NotifyAcquireTopicLeader ¶
func (nrpc *NsqdRpcClient) NotifyAcquireTopicLeader(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
func (*NsqdRpcClient) NotifyChannelList ¶
func (nrpc *NsqdRpcClient) NotifyChannelList(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, chList []string) *CoordErr
func (*NsqdRpcClient) NotifyReleaseTopicLeader ¶
func (nrpc *NsqdRpcClient) NotifyReleaseTopicLeader(epoch EpochType, topicInfo *TopicPartitionMetaInfo, leaderSessionEpoch EpochType, leaderSession string) *CoordErr
func (*NsqdRpcClient) NotifyTopicLeaderSession ¶
func (nrpc *NsqdRpcClient) NotifyTopicLeaderSession(epoch EpochType, topicInfo *TopicPartitionMetaInfo, leaderSession *TopicLeaderSession, joinSession string) *CoordErr
func (*NsqdRpcClient) NotifyUpdateChannelOffset ¶
func (nrpc *NsqdRpcClient) NotifyUpdateChannelOffset(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, channel string, offset ChannelConsumerOffset) *CoordErr
func (*NsqdRpcClient) PullCommitLogsAndData ¶
func (*NsqdRpcClient) PutDelayedMessage ¶
func (nrpc *NsqdRpcClient) PutDelayedMessage(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, log CommitLogData, message *nsqd.Message) *CoordErr
func (*NsqdRpcClient) PutMessage ¶
func (nrpc *NsqdRpcClient) PutMessage(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, log CommitLogData, message *nsqd.Message) *CoordErr
func (*NsqdRpcClient) PutMessages ¶
func (nrpc *NsqdRpcClient) PutMessages(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, log CommitLogData, messages []*nsqd.Message) *CoordErr
func (*NsqdRpcClient) Reconnect ¶
func (nrpc *NsqdRpcClient) Reconnect() error
func (*NsqdRpcClient) ShouldRemoved ¶
func (nrpc *NsqdRpcClient) ShouldRemoved() bool
func (*NsqdRpcClient) TriggerLookupChanged ¶
func (nrpc *NsqdRpcClient) TriggerLookupChanged() error
func (*NsqdRpcClient) UpdateChannelList ¶
func (nrpc *NsqdRpcClient) UpdateChannelList(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, chList []string) *CoordErr
func (*NsqdRpcClient) UpdateChannelOffset ¶
func (nrpc *NsqdRpcClient) UpdateChannelOffset(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, channel string, offset ChannelConsumerOffset) *CoordErr
func (*NsqdRpcClient) UpdateChannelState ¶
func (nrpc *NsqdRpcClient) UpdateChannelState(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, channel string, paused int, skipped int, zanTestSkipped int) *CoordErr
func (*NsqdRpcClient) UpdateDelayedQueueState ¶
func (nrpc *NsqdRpcClient) UpdateDelayedQueueState(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, ch string, ts int64, cursorList [][]byte, cntList map[int]uint64, channelCntList map[string]uint64, wait bool) *CoordErr
func (*NsqdRpcClient) UpdateTopicInfo ¶
func (nrpc *NsqdRpcClient) UpdateTopicInfo(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr
type RpcAcquireTopicLeaderReq ¶
type RpcAcquireTopicLeaderReq struct { RpcTopicData LeaderNodeID string LookupdEpoch EpochType }
type RpcAdminTopicInfo ¶
type RpcAdminTopicInfo struct { TopicPartitionMetaInfo LookupdEpoch EpochType DisableWrite bool }
type RpcChannelListArg ¶
type RpcChannelListArg struct { RpcTopicData ChannelList []string }
type RpcChannelOffsetArg ¶
type RpcChannelOffsetArg struct {
	RpcTopicData
	Channel string
	// position file + file offset
	ChannelOffset ChannelConsumerOffset
}
type RpcChannelState ¶
type RpcChannelState struct { RpcTopicData Channel string Paused int Skipped int ZanTestSkipped int }
type RpcCommitLogReq ¶
type RpcCommitLogReq struct { RpcTopicData LogOffset int64 LogStartIndex int64 LogCountNumIndex int64 UseCountIndex bool }
type RpcCommitLogRsp ¶
type RpcConfirmedDelayedCursor ¶
type RpcFailedInfo ¶
type RpcFailedInfo struct {
// contains filtered or unexported fields
}
type RpcGetBackupedDQReq ¶
type RpcGetBackupedDQReq struct {
RpcTopicData
}
type RpcGetFullSyncInfoReq ¶
type RpcGetFullSyncInfoReq struct {
RpcTopicData
}
type RpcGetFullSyncInfoRsp ¶
type RpcGetFullSyncInfoRsp struct { FirstLogData CommitLogData StartInfo LogStartInfo }
type RpcLookupReqBase ¶
type RpcNodeInfoRsp ¶
type RpcPullCommitLogsReq ¶
type RpcPullCommitLogsRsp ¶
type RpcPullCommitLogsRsp struct { Logs []CommitLogData DataList [][]byte }
type RpcPutMessage ¶
type RpcPutMessage struct { RpcTopicData LogData CommitLogData TopicMessage *nsqd.Message TopicRawMessage []byte }
type RpcPutMessages ¶
type RpcPutMessages struct {
	RpcTopicData
	LogData       CommitLogData
	TopicMessages []*nsqd.Message
	// raw message data will include the size header
	TopicRawMessage []byte
}
type RpcReadyForISR ¶
type RpcReadyForISR struct { RpcLookupReqBase LeaderSession TopicLeaderSession JoinISRSession string }
type RpcReleaseTopicLeaderReq ¶
type RpcReleaseTopicLeaderReq struct { RpcTopicData LeaderNodeID string LookupdEpoch EpochType }
type RpcReqCheckTopic ¶
type RpcReqCheckTopic struct {
RpcLookupReqBase
}
type RpcReqJoinCatchup ¶
type RpcReqJoinCatchup struct {
RpcLookupReqBase
}
type RpcReqJoinISR ¶
type RpcReqJoinISR struct {
RpcLookupReqBase
}
type RpcReqLeaveFromISR ¶
type RpcReqLeaveFromISR struct {
RpcLookupReqBase
}
type RpcReqLeaveFromISRByLeader ¶
type RpcReqLeaveFromISRByLeader struct { RpcLookupReqBase LeaderSession TopicLeaderSession }
type RpcReqNewTopicInfo ¶
type RpcReqNewTopicInfo struct {
RpcLookupReqBase
}
type RpcTopicData ¶
type RpcTopicLeaderSession ¶
type RpcTopicLeaderSession struct { RpcTopicData LeaderNode NsqdNodeInfo LookupdEpoch EpochType JoinSession string }
type SlaveAsyncWriteResult ¶
type SlaveAsyncWriteResult struct {
// contains filtered or unexported fields
}
func (*SlaveAsyncWriteResult) GetResult ¶
func (self *SlaveAsyncWriteResult) GetResult() *CoordErr
func (*SlaveAsyncWriteResult) Wait ¶
func (self *SlaveAsyncWriteResult) Wait()
type SortableStrings ¶
type SortableStrings []string
func (SortableStrings) Len ¶
func (s SortableStrings) Len() int
func (SortableStrings) Less ¶
func (s SortableStrings) Less(l, r int) bool
func (SortableStrings) Swap ¶
func (s SortableStrings) Swap(l, r int)
type StatsSorter ¶
type StatsSorter struct {
// contains filtered or unexported fields
}
func (*StatsSorter) Len ¶
func (s *StatsSorter) Len() int
func (*StatsSorter) Less ¶
func (s *StatsSorter) Less(i, j int) bool
func (*StatsSorter) Swap ¶
func (s *StatsSorter) Swap(i, j int)
type TopicCommitLogMgr ¶
func InitTopicCommitLogMgr ¶
func InitTopicCommitLogMgrWithFixMode ¶
func (*TopicCommitLogMgr) AppendCommitLog ¶
func (self *TopicCommitLogMgr) AppendCommitLog(l *CommitLogData, slave bool) error
func (*TopicCommitLogMgr) AppendCommitLogWithSync ¶
func (self *TopicCommitLogMgr) AppendCommitLogWithSync(l *CommitLogData, slave bool, useFsync bool) error
func (*TopicCommitLogMgr) CleanOldData ¶
func (self *TopicCommitLogMgr) CleanOldData(fileIndex int64, fileOffset int64) error
func (*TopicCommitLogMgr) Close ¶
func (self *TopicCommitLogMgr) Close()
func (*TopicCommitLogMgr) ConvertToCountIndex ¶
func (self *TopicCommitLogMgr) ConvertToCountIndex(start int64, offset int64) (int64, error)
func (*TopicCommitLogMgr) ConvertToOffsetIndex ¶
func (self *TopicCommitLogMgr) ConvertToOffsetIndex(countIndex int64) (int64, int64, error)
func (*TopicCommitLogMgr) Delete ¶
func (self *TopicCommitLogMgr) Delete()
func (*TopicCommitLogMgr) FlushCommitLogs ¶
func (self *TopicCommitLogMgr) FlushCommitLogs()
func (*TopicCommitLogMgr) GetCommitLogFromOffsetV2 ¶
func (self *TopicCommitLogMgr) GetCommitLogFromOffsetV2(start int64, offset int64) (*CommitLogData, error)
func (*TopicCommitLogMgr) GetCommitLogsV2 ¶
func (self *TopicCommitLogMgr) GetCommitLogsV2(startIndex int64, startOffset int64, num int) ([]CommitLogData, error)
func (*TopicCommitLogMgr) GetCurrentEnd ¶
func (self *TopicCommitLogMgr) GetCurrentEnd() (int64, int64)
func (*TopicCommitLogMgr) GetCurrentStart ¶
func (self *TopicCommitLogMgr) GetCurrentStart() int64
func (*TopicCommitLogMgr) GetLastCommitLogDataOnSegment ¶
func (self *TopicCommitLogMgr) GetLastCommitLogDataOnSegment(index int64) (int64, *CommitLogData, error)
func (*TopicCommitLogMgr) GetLastCommitLogID ¶
func (self *TopicCommitLogMgr) GetLastCommitLogID() int64
func (*TopicCommitLogMgr) GetLastCommitLogOffsetV2 ¶
func (self *TopicCommitLogMgr) GetLastCommitLogOffsetV2() (int64, int64, *CommitLogData, error)
Returns the log data of the last commit.
func (*TopicCommitLogMgr) GetLogStartInfo ¶
func (self *TopicCommitLogMgr) GetLogStartInfo() (*LogStartInfo, *CommitLogData, error)
func (*TopicCommitLogMgr) IsCommitted ¶
func (self *TopicCommitLogMgr) IsCommitted(id int64) bool
func (*TopicCommitLogMgr) MoveTo ¶
func (self *TopicCommitLogMgr) MoveTo(newBase string) error
func (*TopicCommitLogMgr) NextID ¶
func (self *TopicCommitLogMgr) NextID() uint64
func (*TopicCommitLogMgr) Reopen ¶
func (self *TopicCommitLogMgr) Reopen() error
func (*TopicCommitLogMgr) ResetLogWithStart ¶
func (self *TopicCommitLogMgr) ResetLogWithStart(newStart LogStartInfo) error
func (*TopicCommitLogMgr) SearchLogDataByComparator ¶
func (self *TopicCommitLogMgr) SearchLogDataByComparator(comp ICommitLogComparator) (int64, int64, *CommitLogData, error)
func (*TopicCommitLogMgr) SearchLogDataByMsgCnt ¶
func (self *TopicCommitLogMgr) SearchLogDataByMsgCnt(searchCnt int64) (int64, int64, *CommitLogData, error)
func (*TopicCommitLogMgr) SearchLogDataByMsgID ¶
func (self *TopicCommitLogMgr) SearchLogDataByMsgID(searchMsgID int64) (int64, int64, *CommitLogData, error)
func (*TopicCommitLogMgr) SearchLogDataByMsgOffset ¶
func (self *TopicCommitLogMgr) SearchLogDataByMsgOffset(searchMsgOffset int64) (int64, int64, *CommitLogData, error)
func (*TopicCommitLogMgr) TruncateToOffsetV2 ¶
func (self *TopicCommitLogMgr) TruncateToOffsetV2(startIndex int64, offset int64) (*CommitLogData, error)
type TopicCoordStat ¶
type TopicCoordStat struct { Node string `json:"node"` Name string `json:"name"` Partition int `json:"partition"` ISRStats []ISRStat `json:"isr_stats"` CatchupStats []CatchupStat `json:"catchup_stats"` }
type TopicCoordinator ¶
type TopicCoordinator struct {
// contains filtered or unexported fields
}
func NewTopicCoordinator ¶
func NewTopicCoordinatorWithFixMode ¶
func (*TopicCoordinator) DeleteNoWriteLock ¶
func (tc *TopicCoordinator) DeleteNoWriteLock(removeData bool)
func (*TopicCoordinator) DeleteWithLock ¶
func (tc *TopicCoordinator) DeleteWithLock(removeData bool)
func (*TopicCoordinator) DisableWrite ¶
func (tc *TopicCoordinator) DisableWrite(disable bool)
func (*TopicCoordinator) Exiting ¶
func (tc *TopicCoordinator) Exiting()
func (TopicCoordinator) GetCopy ¶
func (cd TopicCoordinator) GetCopy() *coordData
func (*TopicCoordinator) GetData ¶
func (tc *TopicCoordinator) GetData() *coordData
func (*TopicCoordinator) GetDelayedQueueLogMgr ¶
func (tc *TopicCoordinator) GetDelayedQueueLogMgr() (*TopicCommitLogMgr, error)
func (TopicCoordinator) GetLeaderSessionEpoch ¶
func (cd TopicCoordinator) GetLeaderSessionEpoch() EpochType
func (TopicCoordinator) GetTopicEpochForWrite ¶
func (cd TopicCoordinator) GetTopicEpochForWrite() EpochType
func (*TopicCoordinator) IsExiting ¶
func (tc *TopicCoordinator) IsExiting() bool
func (TopicCoordinator) IsISRReadyForWrite ¶
func (TopicCoordinator) IsMineLeaderSessionReady ¶
func (*TopicCoordinator) IsWriteDisabled ¶
func (tc *TopicCoordinator) IsWriteDisabled() bool
type TopicLeaderSession ¶
type TopicLeaderSession struct { Topic string Partition int LeaderNode *NsqdNodeInfo Session string LeaderEpoch EpochType }
func (*TopicLeaderSession) IsSame ¶
func (self *TopicLeaderSession) IsSame(other *TopicLeaderSession) bool
type TopicMetaInfo ¶
type TopicMetaInfo struct {
	PartitionNum int
	Replica      int
	// the suggested load factor for each topic partition.
	SuggestLF int
	// other options
	SyncEvery int
	// used to verify the data when a topic with the same name is created -> deleted -> created again
	MagicCode int64
	// the retention days for the data
	RetentionDay int32
	// used for ordered multi-partition topics:
	// allows multiple partitions on the same node
	OrderedMulti bool
	// used for message ext
	Ext bool
}
type TopicPartitionID ¶
func (*TopicPartitionID) String ¶
func (ncoord *TopicPartitionID) String() string
type TopicPartitionMetaInfo ¶
type TopicPartitionMetaInfo struct { Name string Partition int TopicMetaInfo TopicPartitionReplicaInfo }
func (*TopicPartitionMetaInfo) Copy ¶
func (self *TopicPartitionMetaInfo) Copy() *TopicPartitionMetaInfo
func (*TopicPartitionMetaInfo) GetTopicDesp ¶
func (self *TopicPartitionMetaInfo) GetTopicDesp() string
type TopicPartitionReplicaInfo ¶
type TopicPartitionReplicaInfo struct {
	Leader      string
	ISR         []string
	CatchupList []string
	Channels    []string
	// this is only used for write operations;
	// if it changes during a write, the current write should be aborted
	EpochForWrite EpochType
	Epoch         EpochType
}
func (*TopicPartitionReplicaInfo) Copy ¶
func (self *TopicPartitionReplicaInfo) Copy() *TopicPartitionReplicaInfo
type WrapChannelConsumerOffset ¶
type WrapChannelConsumerOffset struct { Name string ChannelConsumerOffset }
Source Files ¶
- commitlog.go
- common.go
- coord_grpc_server.go
- coordinator_rpc.go
- coordinator_stats.go
- data_placement_mgr.go
- etcd_client.go
- etcd_master_lock.go
- etcd_utils.go
- leadership.go
- logger.go
- nsq_lookupd_etcd.go
- nsqd_coordinator.go
- nsqd_coordinator_cluster_write.go
- nsqd_coordinator_rpc_helper.go
- nsqd_node_etcd.go
- nsqd_rpc_client.go
- nsqlookup_coord_api.go
- nsqlookup_coord_rpc.go
- nsqlookup_coord_rpc_helper.go
- nsqlookup_coordinator.go
- nsqlookup_rpc_client.go
- struct.go
- topic_coordinator.go