consistence

package
v0.3.7-HA.1.9.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 5, 2019 License: MIT Imports: 34 Imported by: 0

Documentation

Overview

description: Utility to perform master election/failover using etcd.

Index

Constants

View Source
const (
	ErrFailedOnNotLeader   = "E_FAILED_ON_NOT_LEADER"
	ErrFailedOnNotWritable = "E_FAILED_ON_NOT_WRITABLE"
)
View Source
const (
	RATIO_BETWEEN_LEADER_FOLLOWER = 0.7

	HIGHEST_PUB_QPS_LEVEL        = 100
	HIGHEST_LEFT_CONSUME_MB_SIZE = 50 * 1024
	HIGHEST_LEFT_DATA_MB_SIZE    = 200 * 1024
)
View Source
const (
	ErrCodeEtcdNotReachable    = 501
	ErrCodeUnhandledHTTPStatus = 502
)
View Source
const (
	NoClusterWriteDisable          = 0
	ClusterWriteDisabledForOrdered = 1
	ClusterWriteDisabledForAll     = 2
)
View Source
const (
	EVENT_WATCH_TOPIC_L_CREATE = iota
	EVENT_WATCH_TOPIC_L_DELETE
)
View Source
const (
	MAX_WRITE_RETRY             = 10
	MAX_CATCHUP_RETRY           = 5
	MAX_LOG_PULL                = 10000
	MAX_LOG_PULL_BYTES          = 1024 * 1024 * 32
	MAX_CATCHUP_RUNNING         = 8
	API_BACKUP_DELAYED_QUEUE_DB = "/delayqueue/backupto"
)
View Source
const (
	RPC_TIMEOUT            = time.Duration(time.Second * 5)
	RPC_TIMEOUT_SHORT      = time.Duration(time.Second * 3)
	RPC_TIMEOUT_FOR_LOOKUP = time.Duration(time.Second * 3)
)
View Source
const (
	MAX_PARTITION_NUM  = 255
	MAX_SYNC_EVERY     = 4000
	MAX_RETENTION_DAYS = 60
)
View Source
const (
	NSQ_TOPIC_DIR              = "Topics"
	NSQ_TOPIC_META             = "TopicMeta"
	NSQ_TOPIC_REPLICA_INFO     = "ReplicaInfo"
	NSQ_TOPIC_LEADER_SESSION   = "LeaderSession"
	NSQ_NODE_DIR               = "NsqdNodes"
	NSQ_LOOKUPD_DIR            = "NsqlookupdInfo"
	NSQ_LOOKUPD_NODE_DIR       = "NsqlookupdNodes"
	NSQ_LOOKUPD_LEADER_SESSION = "LookupdLeaderSession"
)
View Source
const (
	ETCD_LOCK_NSQ_NAMESPACE = "nsq"
)
View Source
const (
	ETCD_TTL = 30
)
View Source
const (
	MAX_INCR_ID_BIT = 50
)
View Source
const (
	RETRY_SLEEP = 200
)

Variables

View Source
var (
	ErrCommitLogWrongID              = errors.New("commit log id is wrong")
	ErrCommitLogWrongLastID          = errors.New("commit log last id should no less than log id")
	ErrCommitLogIDNotFound           = errors.New("commit log id is not found")
	ErrCommitLogSearchNotFound       = errors.New("search commit log data not found")
	ErrCommitLogOutofBound           = errors.New("commit log offset is out of bound")
	ErrCommitLogEOF                  = errors.New("commit log end of file")
	ErrCommitLogOffsetInvalid        = errors.New("commit log offset is invalid")
	ErrCommitLogPartitionExceed      = errors.New("commit log partition id is exceeded")
	ErrCommitLogSearchFailed         = errors.New("commit log data search failed")
	ErrCommitLogSegmentSizeInvalid   = errors.New("commit log segment size is invalid")
	ErrCommitLogLessThanSegmentStart = errors.New("commit log read index is less than segment start")
	ErrCommitLogCleanKeepMin         = errors.New("commit log clean should keep some data")
)
View Source
var (
	ErrTopicInfoNotFound = NewCoordErr("topic info not found", CoordClusterErr)

	ErrNotTopicLeader                     = NewCoordErrWithCode("not topic leader", CoordClusterNoRetryWriteErr, RpcErrNotTopicLeader)
	ErrEpochMismatch                      = NewCoordErrWithCode("commit epoch not match", CoordElectionTmpErr, RpcErrEpochMismatch)
	ErrEpochLessThanCurrent               = NewCoordErrWithCode("epoch should be increased", CoordElectionErr, RpcErrEpochLessThanCurrent)
	ErrWriteQuorumFailed                  = NewCoordErrWithCode("write to quorum failed.", CoordElectionTmpErr, RpcErrWriteQuorumFailed)
	ErrCommitLogIDDup                     = NewCoordErrWithCode("commit id duplicated", CoordElectionErr, RpcErrCommitLogIDDup)
	ErrMissingTopicLeaderSession          = NewCoordErrWithCode("missing topic leader session", CoordElectionErr, RpcErrMissingTopicLeaderSession)
	ErrLeaderSessionMismatch              = NewCoordErrWithCode("leader session mismatch", CoordElectionTmpErr, RpcErrLeaderSessionMismatch)
	ErrWriteDisabled                      = NewCoordErrWithCode("write is disabled on the topic", CoordElectionErr, RpcErrWriteDisabled)
	ErrLeavingISRWait                     = NewCoordErrWithCode("leaving isr need wait.", CoordElectionTmpErr, RpcErrLeavingISRWait)
	ErrTopicCoordExistingAndMismatch      = NewCoordErrWithCode("topic coordinator existing with a different partition", CoordClusterErr, RpcErrTopicCoordExistingAndMismatch)
	ErrTopicCoordTmpConflicted            = NewCoordErrWithCode("topic coordinator is conflicted temporally", CoordClusterErr, RpcErrTopicCoordConflicted)
	ErrTopicLeaderChanged                 = NewCoordErrWithCode("topic leader changed", CoordElectionTmpErr, RpcErrTopicLeaderChanged)
	ErrTopicCommitLogEOF                  = NewCoordErrWithCode(ErrCommitLogEOF.Error(), CoordCommonErr, RpcErrCommitLogEOF)
	ErrTopicCommitLogOutofBound           = NewCoordErrWithCode(ErrCommitLogOutofBound.Error(), CoordCommonErr, RpcErrCommitLogOutofBound)
	ErrTopicCommitLogLessThanSegmentStart = NewCoordErrWithCode(ErrCommitLogLessThanSegmentStart.Error(), CoordCommonErr, RpcErrCommitLogLessThanSegmentStart)
	ErrTopicCommitLogNotConsistent        = NewCoordErrWithCode("topic commit log is not consistent", CoordClusterErr, RpcCommonErr)
	ErrMissingTopicCoord                  = NewCoordErrWithCode("missing topic coordinator", CoordClusterErr, RpcErrMissingTopicCoord)
	ErrTopicLoading                       = NewCoordErrWithCode("topic is still loading data", CoordLocalTmpErr, RpcErrTopicLoading)
	ErrTopicExiting                       = NewCoordErr("topic coordinator is exiting", CoordLocalTmpErr)
	ErrTopicExitingOnSlave                = NewCoordErr("topic coordinator is exiting on slave", CoordTmpErr)
	ErrTopicCoordStateInvalid             = NewCoordErrWithCode("invalid coordinator state", CoordClusterErr, RpcErrTopicCoordStateInvalid)
	ErrTopicSlaveInvalid                  = NewCoordErrWithCode("topic slave has some invalid state", CoordSlaveErr, RpcErrSlaveStateInvalid)
	ErrTopicLeaderSessionInvalid          = NewCoordErrWithCode("topic leader session is invalid", CoordElectionTmpErr, RpcCommonErr)
	ErrTopicWriteOnNonISR                 = NewCoordErrWithCode("topic write on a node not in ISR", CoordTmpErr, RpcErrWriteOnNonISR)
	ErrTopicISRNotEnough                  = NewCoordErrWithCode("topic isr nodes not enough", CoordTmpErr, RpcCommonErr)
	ErrClusterChanged                     = NewCoordErrWithCode("cluster changed ", CoordTmpErr, RpcNoErr)
	ErrTopicMissingDelayedLog             = NewCoordErrWithCode("topic missing delayed queue log", CoordLocalErr, RpcNoErr)

	ErrRpcMethodUnknown           = NewCoordErrWithCode("rpc method unknown", CoordClusterErr, RpcCommonErr)
	ErrPubArgError                = NewCoordErr("pub argument error", CoordCommonErr)
	ErrTopicNotRelated            = NewCoordErr("topic not related to me", CoordCommonErr)
	ErrTopicCatchupAlreadyRunning = NewCoordErr("topic is already running catchup", CoordCommonErr)
	ErrTopicArgError              = NewCoordErr("topic argument error", CoordCommonErr)
	ErrOperationExpired           = NewCoordErr("operation has expired since wait too long", CoordCommonErr)
	ErrCatchupRunningBusy         = NewCoordErr("too much running catchup", CoordCommonErr)

	ErrMissingTopicLog                     = NewCoordErr("missing topic log ", CoordLocalErr)
	ErrLocalTopicPartitionMismatch         = NewCoordErr("local topic partition not match", CoordLocalErr)
	ErrLocalFallBehind                     = NewCoordErr("local data fall behind", CoordElectionErr)
	ErrLocalForwardThanLeader              = NewCoordErr("local data is more than leader", CoordElectionErr)
	ErrLocalSetChannelOffsetNotFirstClient = NewCoordErr("failed to set channel offset since not first client", CoordLocalErr)
	ErrLocalMissingTopic                   = NewCoordErr("local topic missing", CoordLocalErr)
	ErrLocalNotReadyForWrite               = NewCoordErr("local topic is not ready for write.", CoordLocalErr)
	ErrLocalInitTopicFailed                = NewCoordErr("local topic init failed", CoordLocalErr)
	ErrLocalInitTopicCoordFailed           = NewCoordErr("topic coordinator init failed", CoordLocalErr)
	ErrLocalTopicDataCorrupt               = NewCoordErr("local topic data corrupt", CoordLocalErr)
	ErrLocalChannelPauseFailed             = NewCoordErr("local channel pause/unpause failed", CoordLocalErr)
	ErrLocalChannelSkipFailed              = NewCoordErr("local channel skip/unskip failed", CoordLocalErr)
	ErrLocalChannelSkipZanTestFailed       = NewCoordErr("local channel skip/unskip zan test failed", CoordLocalErr)
	ErrLocalDelayedQueueMissing            = NewCoordErr("local delayed queue is missing", CoordLocalErr)
)
View Source
var (
	ErrBalanceNodeUnavailable     = errors.New("can not find a node to be balanced")
	ErrNodeIsExcludedForTopicData = errors.New("destination node is excluded for topic")
	ErrClusterBalanceRunning      = errors.New("another balance is running, should wait")
)
View Source
var (
	ErrLeaderSessionAlreadyExist = errors.New("leader session already exist")
	ErrLeaderSessionNotExist     = errors.New("session not exist")
	ErrKeyAlreadyExist           = errors.New("Key already exist")
	ErrKeyNotFound               = errors.New("Key not found")
)
View Source
var (
	MaxRetryWait                = time.Second * 3
	ForceFixLeaderData          = false
	MaxTopicRetentionSizePerDay = int64(1024 * 1024 * 1024 * 16)
)
View Source
var (
	ErrAlreadyExist         = errors.New("already exist")
	ErrTopicNotCreated      = errors.New("topic is not created")
	ErrWaitingLeaderRelease = errors.New("leader session is still alive")
	ErrNotNsqLookupLeader   = errors.New("Not nsqlookup leader")
	ErrClusterUnstable      = errors.New("the cluster is unstable")

	ErrLeaderNodeLost           = NewCoordErr("leader node is lost", CoordTmpErr)
	ErrNodeNotFound             = NewCoordErr("node not found", CoordCommonErr)
	ErrLeaderElectionFail       = NewCoordErr("Leader election failed.", CoordElectionTmpErr)
	ErrNoLeaderCanBeElected     = NewCoordErr("No leader can be elected", CoordElectionTmpErr)
	ErrNodeUnavailable          = NewCoordErr("No node is available for topic", CoordTmpErr)
	ErrJoinISRInvalid           = NewCoordErr("Join ISR failed", CoordCommonErr)
	ErrJoinISRTimeout           = NewCoordErr("Join ISR timeout", CoordCommonErr)
	ErrWaitingJoinISR           = NewCoordErr("The topic is waiting node to join isr", CoordCommonErr)
	ErrLeaderSessionNotReleased = NewCoordErr("The topic leader session is not released", CoordElectionTmpErr)
	ErrTopicISRCatchupEnough    = NewCoordErr("the topic isr and catchup nodes are enough", CoordTmpErr)
	ErrClusterNodeRemoving      = NewCoordErr("the node is mark as removed", CoordTmpErr)
	ErrTopicNodeConflict        = NewCoordErr("the topic node info is conflicted", CoordElectionErr)
	ErrLeadershipServerUnstable = NewCoordErr("the leadership server is unstable", CoordTmpErr)
)
View Source
var DEFAULT_COMMIT_BUF_SIZE = 64
View Source
var LOGROTATE_NUM = 500000
View Source
var MAX_COMMIT_BUF_SIZE = 1024
View Source
var MIN_KEEP_LOG_ITEM = 1000
View Source
var NSQ_ROOT_DIR = "NSQMetaData"

this is default value

Functions

func AddCounter

func AddCounter(name string) uint32

func CheckKeyIfExist

func CheckKeyIfExist(err error) bool

func DisableClusterWrite

func DisableClusterWrite(disableType int)

func ExtractRpcAddrFromID

func ExtractRpcAddrFromID(nid string) string

func FilterList

func FilterList(l []string, filter []string) []string

func FindSlice

func FindSlice(in []string, e string) int

func FindSliceList

func FindSliceList(inList [][]string, e string) int

func GenNsqLookupNodeID

func GenNsqLookupNodeID(n *NsqLookupdNodeInfo, extra string) string

func GenNsqdNodeID

func GenNsqdNodeID(n *NsqdNodeInfo, extra string) string

func GetLogDataSize

func GetLogDataSize() int

func GetNextLogOffset

func GetNextLogOffset(cur int64) int64

func GetPartitionFromMsgID

func GetPartitionFromMsgID(id int64) int

func GetPrevLogOffset

func GetPrevLogOffset(cur int64) int64

func GetTopicFullName

func GetTopicFullName(n string, pid int) string

func GetTopicPartitionBasePath

func GetTopicPartitionBasePath(rootPath string, topic string, partition int) string

func GetTopicPartitionFileName

func GetTopicPartitionFileName(topic string, partition int, suffix string) string

func GetTopicPartitionLogPath

func GetTopicPartitionLogPath(basepath, t string, p int) string

func IncCounter

func IncCounter(id uint32)

func IncCounterBy

func IncCounterBy(id uint32, amount uint64)

func IsAllClusterWriteDisabled

func IsAllClusterWriteDisabled() bool

func IsClusterWriteDisabledForOrdered

func IsClusterWriteDisabledForOrdered() bool

func IsEtcdNodeExist

func IsEtcdNodeExist(err error) bool

func IsEtcdNotFile

func IsEtcdNotFile(err error) bool

func IsEtcdNotReachable

func IsEtcdNotReachable(err error) bool

func IsEtcdWatchExpired

func IsEtcdWatchExpired(err error) bool

func MergeList

func MergeList(l []string, r []string) []string

func NewNsqdCoordGRpcServer

func NewNsqdCoordGRpcServer(coord *NsqdCoordinator, rootPath string) *nsqdCoordGRpcServer

func RetryWithTimeout

func RetryWithTimeout(fn func() error) error

func SetCoordLogLevel

func SetCoordLogLevel(level int32)

func SetCoordLogger

func SetCoordLogger(log levellogger.Logger, level int32)

Types

type By

type By func(l, r *NodeTopicStats) bool

func (By) Sort

func (by By) Sort(statList []NodeTopicStats)

type CatchupStat

type CatchupStat struct {
	HostName string `json:"hostname"`
	NodeID   string `json:"node_id"`
	Progress int    `json:"progress"`
}

type ChannelConsumeMgr

type ChannelConsumeMgr struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*ChannelConsumeMgr) Clear

func (cc *ChannelConsumeMgr) Clear()

func (*ChannelConsumeMgr) Get

func (*ChannelConsumeMgr) GetSyncedChs

func (cc *ChannelConsumeMgr) GetSyncedChs() []string

func (*ChannelConsumeMgr) Update

func (cc *ChannelConsumeMgr) Update(ch string, cco ChannelConsumerOffset)

func (*ChannelConsumeMgr) UpdateSyncedChs

func (cc *ChannelConsumeMgr) UpdateSyncedChs(names []string)

type ChannelConsumerOffset

type ChannelConsumerOffset struct {
	VOffset             int64
	VCnt                int64
	Flush               bool
	AllowBackward       bool
	ConfirmedInterval   []nsqd.MsgQueueInterval
	NeedUpdateConfirmed bool
}

func (*ChannelConsumerOffset) IsSame

func (cco *ChannelConsumerOffset) IsSame(other *ChannelConsumerOffset) bool

type CntComparator

type CntComparator int64

func (CntComparator) GreatThanRightBoundary

func (self CntComparator) GreatThanRightBoundary(l *CommitLogData) bool

func (CntComparator) LessThanLeftBoundary

func (self CntComparator) LessThanLeftBoundary(l *CommitLogData) bool

func (CntComparator) SearchEndBoundary

func (self CntComparator) SearchEndBoundary() int64

type CommitLogData

type CommitLogData struct {
	LogID int64
	// epoch for the topic leader
	Epoch EpochType
	// if single message, this should be the same with logid
	// for multiple messages, this would be the logid for the last message in batch
	LastMsgLogID int64
	MsgOffset    int64
	// size for batch messages
	MsgSize int32
	// the total message count for all from begin, not only this batch
	MsgCnt int64
	// the message number for current commit
	MsgNum int32
}

type CommonCoordErr

type CommonCoordErr struct {
	CoordErr
}

func (*CommonCoordErr) Error

func (self *CommonCoordErr) Error() string

type ConsistentStore

type ConsistentStore interface {
	WriteKey(key, value string) error
	ReadKey(key string) (string, error)
	ListKey(key string) ([]string, error)
}

type CoordErr

type CoordErr struct {
	ErrMsg  string
	ErrCode ErrRPCRetCode
	ErrType CoordErrType
}

note: since the gorpc will treat error type as special, we should not implement the error interface for CoordErr as response type

func NewCoordErr

func NewCoordErr(msg string, etype CoordErrType) *CoordErr

func NewCoordErrWithCode

func NewCoordErrWithCode(msg string, etype CoordErrType, code ErrRPCRetCode) *CoordErr

func (*CoordErr) CanRetryWrite

func (self *CoordErr) CanRetryWrite(retryTimes int) bool

func (*CoordErr) HasError

func (self *CoordErr) HasError() bool

func (*CoordErr) IsEqual

func (self *CoordErr) IsEqual(other *CoordErr) bool

func (*CoordErr) IsLocalErr

func (self *CoordErr) IsLocalErr() bool

func (*CoordErr) IsNetErr

func (self *CoordErr) IsNetErr() bool

func (*CoordErr) String

func (self *CoordErr) String() string

func (*CoordErr) ToErrorType

func (self *CoordErr) ToErrorType() error

type CoordErrStats

type CoordErrStats struct {
	sync.Mutex
	CoordErrStatsData
}

func (*CoordErrStats) GetCopy

func (self *CoordErrStats) GetCopy() *CoordErrStatsData

type CoordErrStatsData

type CoordErrStatsData struct {
	WriteEpochError        int64
	WriteNotLeaderError    int64
	WriteQuorumError       int64
	WriteBusyError         int64
	RpcCheckFailed         int64
	LeadershipError        int64
	TopicCoordMissingError int64
	LocalErr               int64
	OtherCoordErrs         map[string]int64
}

type CoordErrType

type CoordErrType int
const (
	CoordNoErr CoordErrType = iota
	CoordCommonErr
	CoordNetErr
	CoordElectionErr
	CoordElectionTmpErr
	CoordClusterErr
	CoordSlaveErr
	CoordLocalErr
	CoordLocalTmpErr
	CoordTmpErr
	CoordClusterNoRetryWriteErr
)

type CoordStats

type CoordStats struct {
	RpcStats        *gorpc.ConnStats `json:"rpc_stats"`
	ErrStats        CoordErrStatsData
	TopicCoordStats []TopicCoordStat `json:"topic_coord_stats"`
}

type DataPlacement

type DataPlacement struct {
	// contains filtered or unexported fields
}

func NewDataPlacement

func NewDataPlacement(coord *NsqLookupCoordinator) *DataPlacement

func (*DataPlacement) DoBalance

func (dpm *DataPlacement) DoBalance(monitorChan chan struct{})

func (*DataPlacement) SetBalanceInterval

func (dpm *DataPlacement) SetBalanceInterval(start int, end int)

type EVENT_TYPE

type EVENT_TYPE int
const (
	MASTER_ADD EVENT_TYPE = iota
	MASTER_DELETE
	MASTER_MODIFY
	MASTER_ERROR
)

type EpochType

type EpochType int64

type ErrRPCRetCode

type ErrRPCRetCode int
const (
	RpcNoErr ErrRPCRetCode = iota
	RpcCommonErr
)
const (
	RpcErrLeavingISRWait ErrRPCRetCode = iota + 10
	RpcErrNotTopicLeader
	RpcErrNoLeader
	RpcErrEpochMismatch
	RpcErrEpochLessThanCurrent
	RpcErrWriteQuorumFailed
	RpcErrCommitLogIDDup
	RpcErrCommitLogEOF
	RpcErrCommitLogOutofBound
	RpcErrCommitLogLessThanSegmentStart
	RpcErrMissingTopicLeaderSession
	RpcErrLeaderSessionMismatch
	RpcErrWriteDisabled
	RpcErrTopicNotExist
	RpcErrMissingTopicCoord
	RpcErrTopicCoordExistingAndMismatch
	RpcErrTopicCoordConflicted
	RpcErrTopicLeaderChanged
	RpcErrTopicLoading
	RpcErrSlaveStateInvalid
	RpcErrTopicCoordStateInvalid
	RpcErrWriteOnNonISR
)

type EtcdClient

type EtcdClient struct {
	// contains filtered or unexported fields
}

func NewEClient

func NewEClient(host, userName, pwd string) (*EtcdClient, error)

func (*EtcdClient) CompareAndDelete

func (self *EtcdClient) CompareAndDelete(key string, prevValue string, prevIndex uint64) (*client.Response, error)

func (*EtcdClient) CompareAndSwap

func (self *EtcdClient) CompareAndSwap(key string, value string, ttl uint64, prevValue string, prevIndex uint64) (*client.Response, error)

func (*EtcdClient) Create

func (self *EtcdClient) Create(key string, value string, ttl uint64) (*client.Response, error)

func (*EtcdClient) CreateDir

func (self *EtcdClient) CreateDir(key string, ttl uint64) (*client.Response, error)

func (*EtcdClient) CreateInOrder

func (self *EtcdClient) CreateInOrder(dir string, value string, ttl uint64) (*client.Response, error)

func (*EtcdClient) Delete

func (self *EtcdClient) Delete(key string, recursive bool) (*client.Response, error)

func (*EtcdClient) Get

func (self *EtcdClient) Get(key string, sort, recursive bool) (*client.Response, error)

func (*EtcdClient) GetNewest

func (self *EtcdClient) GetNewest(key string, sort, recursive bool) (*client.Response, error)

func (*EtcdClient) Set

func (self *EtcdClient) Set(key string, value string, ttl uint64) (*client.Response, error)

func (*EtcdClient) SetWithTTL

func (self *EtcdClient) SetWithTTL(key string, ttl uint64) (*client.Response, error)

func (*EtcdClient) Update

func (self *EtcdClient) Update(key string, value string, ttl uint64) (*client.Response, error)

func (*EtcdClient) Watch

func (self *EtcdClient) Watch(key string, waitIndex uint64, recursive bool) client.Watcher

type EtcdLock

type EtcdLock struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*EtcdLock) GetEventsChan

func (self *EtcdLock) GetEventsChan() <-chan *MasterEvent

func (*EtcdLock) GetKey

func (self *EtcdLock) GetKey() string

func (*EtcdLock) GetMaster

func (self *EtcdLock) GetMaster() string

func (*EtcdLock) Start

func (self *EtcdLock) Start()

func (*EtcdLock) Stop

func (self *EtcdLock) Stop()

func (*EtcdLock) TryAcquire

func (self *EtcdLock) TryAcquire() (ret error)

type ICommitLogComparator

type ICommitLogComparator interface {
	SearchEndBoundary() int64
	LessThanLeftBoundary(l *CommitLogData) bool
	GreatThanRightBoundary(l *CommitLogData) bool
}

type ILocalLogQueue

type ILocalLogQueue interface {
	IsDataNeedFix() bool
	SetDataFixState(bool)
	ForceFlush()
	ResetBackendEndNoLock(nsqd.BackendOffset, int64) error
	ResetBackendWithQueueStartNoLock(int64, int64) error
	GetDiskQueueSnapshot() *nsqd.DiskQueueSnapshot
	TotalMessageCnt() uint64
	TotalDataSize() int64

	PutRawDataOnReplica(rawData []byte, offset nsqd.BackendOffset, checkSize int64, msgNum int32) (nsqd.BackendQueueEnd, error)
	PutMessageOnReplica(msgs *nsqd.Message, offset nsqd.BackendOffset, checkSize int64) (nsqd.BackendQueueEnd, error)
	TryCleanOldData(retentionSize int64, noRealClean bool, maxCleanOffset nsqd.BackendOffset) (nsqd.BackendQueueEnd, error)
}

type INsqlookupRemoteProxy

type INsqlookupRemoteProxy interface {
	RemoteAddr() string
	Reconnect() error
	Close()
	RequestJoinCatchup(topic string, partition int, nid string) *CoordErr
	RequestJoinTopicISR(topic string, partition int, nid string) *CoordErr
	ReadyForTopicISR(topic string, partition int, nid string, leaderSession *TopicLeaderSession, joinISRSession string) *CoordErr
	RequestLeaveFromISR(topic string, partition int, nid string) *CoordErr
	RequestLeaveFromISRByLeader(topic string, partition int, nid string, leaderSession *TopicLeaderSession) *CoordErr
	RequestNotifyNewTopicInfo(topic string, partition int, nid string)
	RequestCheckTopicConsistence(topic string, partition int)
}

func NewNsqLookupRpcClient

func NewNsqLookupRpcClient(addr string, timeout time.Duration) (INsqlookupRemoteProxy, error)

type ISRStat

type ISRStat struct {
	HostName string `json:"hostname"`
	NodeID   string `json:"node_id"`
}

type IntHeap

type IntHeap []int

An IntHeap is a min-heap of ints.

func (IntHeap) Len

func (h IntHeap) Len() int

func (IntHeap) Less

func (h IntHeap) Less(i, j int) bool

func (*IntHeap) Pop

func (h *IntHeap) Pop() interface{}

func (*IntHeap) Push

func (h *IntHeap) Push(x interface{})

func (IntHeap) Swap

func (h IntHeap) Swap(i, j int)

type JoinISRState

type JoinISRState struct {
	sync.Mutex
	// contains filtered or unexported fields
}

type LFListT

type LFListT []loadFactorInfo

func (LFListT) Len

func (s LFListT) Len() int

func (LFListT) Less

func (s LFListT) Less(i, j int) bool

func (LFListT) Swap

func (s LFListT) Swap(i, j int)

type LogStartInfo

type LogStartInfo struct {
	SegmentStartCount int64
	SegmentStartIndex int64
	// the start offset in the file number at start index
	// This can allow the commit log start at the middle of the segment file
	SegmentStartOffset int64
}

type Master

type Master interface {
	Start()
	Stop()
	GetEventsChan() <-chan *MasterEvent
	GetKey() string
	GetMaster() string
	TryAcquire() (ret error)
}

func NewMaster

func NewMaster(etcdClient *EtcdClient, name, value string, ttl uint64) Master

type MasterChanInfo

type MasterChanInfo struct {
	// contains filtered or unexported fields
}

type MasterEvent

type MasterEvent struct {
	Type          EVENT_TYPE
	Master        string
	ModifiedIndex uint64
}

type MsgIDComparator

type MsgIDComparator int64

func (MsgIDComparator) GreatThanRightBoundary

func (self MsgIDComparator) GreatThanRightBoundary(l *CommitLogData) bool

func (MsgIDComparator) LessThanLeftBoundary

func (self MsgIDComparator) LessThanLeftBoundary(l *CommitLogData) bool

func (MsgIDComparator) SearchEndBoundary

func (self MsgIDComparator) SearchEndBoundary() int64

type MsgOffsetComparator

type MsgOffsetComparator int64

func (MsgOffsetComparator) GreatThanRightBoundary

func (self MsgOffsetComparator) GreatThanRightBoundary(l *CommitLogData) bool

func (MsgOffsetComparator) LessThanLeftBoundary

func (self MsgOffsetComparator) LessThanLeftBoundary(l *CommitLogData) bool

func (MsgOffsetComparator) SearchEndBoundary

func (self MsgOffsetComparator) SearchEndBoundary() int64

type MsgTimestampComparator

type MsgTimestampComparator struct {
	// contains filtered or unexported fields
}

func (*MsgTimestampComparator) GreatThanRightBoundary

func (ncoord *MsgTimestampComparator) GreatThanRightBoundary(l *CommitLogData) bool

func (*MsgTimestampComparator) LessThanLeftBoundary

func (ncoord *MsgTimestampComparator) LessThanLeftBoundary(l *CommitLogData) bool

func (*MsgTimestampComparator) SearchEndBoundary

func (ncoord *MsgTimestampComparator) SearchEndBoundary() int64

type NSQDLeadership

type NSQDLeadership interface {
	InitClusterID(id string)
	RegisterNsqd(nodeData *NsqdNodeInfo) error // update
	UnregisterNsqd(nodeData *NsqdNodeInfo) error
	// try create the topic leadership key and no need to retry if the key already exist
	AcquireTopicLeader(topic string, partition int, nodeData *NsqdNodeInfo, epoch EpochType) error
	// release the session key using the acquired session. should check current session epoch
	// to avoid release the changed session
	ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error
	// all registered lookup nodes.
	GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)
	// get the newest lookup leader and watch the change of it.
	WatchLookupdLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{}) error
	GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error)
	// get leadership information, if not exist should return ErrLeaderSessionNotExist as error
	GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error)
	IsTopicRealDeleted(topic string) (bool, error)
}

type NSQLookupdLeadership

type NSQLookupdLeadership interface {
	InitClusterID(id string)
	Register(value *NsqLookupdNodeInfo) error
	Unregister(value *NsqLookupdNodeInfo) error
	Stop()
	// the cluster root modify index
	GetClusterEpoch() (EpochType, error)
	GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error) // add
	AcquireAndWatchLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{})
	CheckIfLeader(session string) bool
	UpdateLookupEpoch(oldGen EpochType) (EpochType, error)

	GetNsqdNodes() ([]NsqdNodeInfo, error)
	// watching the cluster nsqd node, should return the newest for the first time.
	WatchNsqdNodes(nsqds chan []NsqdNodeInfo, stop chan struct{})
	// get all topics info, should cache the newest to improve performance.
	ScanTopics() ([]TopicPartitionMetaInfo, error)
	GetAllTopicMetas() (map[string]TopicMetaInfo, error)
	// should return both the meta info for topic and the replica info for topic partition
	// epoch should be updated while return
	GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error)
	// create and write the meta info to topic meta node
	CreateTopic(topic string, meta *TopicMetaInfo) error
	// create topic partition path
	CreateTopicPartition(topic string, partition int) error
	IsExistTopic(topic string) (bool, error)
	IsExistTopicPartition(topic string, partition int) (bool, error)
	// get topic meta info only
	GetTopicMetaInfo(topic string) (TopicMetaInfo, EpochType, error)
	GetTopicMetaInfoTryCache(topic string) (TopicMetaInfo, error)
	UpdateTopicMetaInfo(topic string, meta *TopicMetaInfo, oldGen EpochType) error
	DeleteTopic(topic string, partition int) error
	DeleteWholeTopic(topic string) error
	//
	// update the replica info about leader, isr, epoch for partition
	// Note: update should do check-and-set to avoid unexpected override.
	// the epoch in topicInfo should be updated to the new epoch
	// if no topic partition replica info node should create only once.
	UpdateTopicNodeInfo(topic string, partition int, topicInfo *TopicPartitionReplicaInfo, oldGen EpochType) error
	// get leadership information, if not exist should return ErrLeaderSessionNotExist as error
	GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error)
	// watch any leadership lock change for all topic partitions, should return the token used later by release.
	//WatchTopicLeader(leader chan *TopicLeaderSession, stop chan struct{}) error
	// only leader lookup can do the release, normally notify the nsqd node do the release by itself.
	// lookup node should release only when the nsqd is lost
	ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error
	// get topic meta info map with passing topics slice
	GetTopicsMetaInfoMap(topics []string) (map[string]TopicMetaInfo, error)
}

We need check leader lock session before do any modify to etcd. Make sure all returned value should be copied to avoid modify by outside.

type NodeTopicStats

type NodeTopicStats struct {
	NodeID string
	// the data (MB) need to be consumed on the leader for all channels in the topic.
	ChannelDepthData map[string]int64
	// the data left on disk. unit: MB
	TopicLeaderDataSize map[string]int64
	TopicTotalDataSize  map[string]int64
	NodeCPUs            int

	// the pub stat for past 24hr
	TopicHourlyPubDataList map[string][24]int64
	ChannelNum             map[string]int
	ChannelList            map[string][]string
	ChannelMetas           map[string][]nsqd.ChannelMetaInfo
	ChannelOffsets         map[string][]WrapChannelConsumerOffset
}

func NewNodeTopicStats

func NewNodeTopicStats(nid string, cap int, cpus int) *NodeTopicStats

func (*NodeTopicStats) GetMostBusyAndIdleTopicWriteLevel

func (nts *NodeTopicStats) GetMostBusyAndIdleTopicWriteLevel(leaderOnly bool) (string, string, float64, float64)

func (*NodeTopicStats) GetNodeAvgReadLevel

func (nts *NodeTopicStats) GetNodeAvgReadLevel() float64

func (*NodeTopicStats) GetNodeAvgWriteLevel

func (nts *NodeTopicStats) GetNodeAvgWriteLevel() float64

func (*NodeTopicStats) GetNodeLeaderLoadFactor

func (nts *NodeTopicStats) GetNodeLeaderLoadFactor() float64

func (*NodeTopicStats) GetNodeLoadFactor

func (nts *NodeTopicStats) GetNodeLoadFactor() (float64, float64)

func (*NodeTopicStats) GetNodePeakLevelList

func (nts *NodeTopicStats) GetNodePeakLevelList() []int64

func (*NodeTopicStats) GetSortedTopicWriteLevel

func (nts *NodeTopicStats) GetSortedTopicWriteLevel(leaderOnly bool) LFListT

func (*NodeTopicStats) GetTopicAvgWriteLevel

func (nts *NodeTopicStats) GetTopicAvgWriteLevel(topicFullName string) float64

func (*NodeTopicStats) GetTopicLeaderLoadFactor

func (nts *NodeTopicStats) GetTopicLeaderLoadFactor(topicFullName string) float64

func (*NodeTopicStats) GetTopicLoadFactor

func (nts *NodeTopicStats) GetTopicLoadFactor(topicFullName string) float64

func (*NodeTopicStats) GetTopicPeakLevel

func (nts *NodeTopicStats) GetTopicPeakLevel(topic TopicPartitionID) float64

func (*NodeTopicStats) LeaderLessLoader

func (nts *NodeTopicStats) LeaderLessLoader(other *NodeTopicStats) bool

func (*NodeTopicStats) SlaveLessLoader

func (nts *NodeTopicStats) SlaveLessLoader(other *NodeTopicStats) bool

type NsqLookupCoordRpcServer

type NsqLookupCoordRpcServer struct {
	// contains filtered or unexported fields
}

func NewNsqLookupCoordRpcServer

func NewNsqLookupCoordRpcServer(coord *NsqLookupCoordinator) *NsqLookupCoordRpcServer

func (*NsqLookupCoordRpcServer) ReadyForTopicISR

func (nlcoord *NsqLookupCoordRpcServer) ReadyForTopicISR(req *RpcReadyForISR) *CoordErr

func (*NsqLookupCoordRpcServer) RequestCheckTopicConsistence

func (nlcoord *NsqLookupCoordRpcServer) RequestCheckTopicConsistence(req *RpcReqCheckTopic) *CoordErr

func (*NsqLookupCoordRpcServer) RequestJoinCatchup

func (nlcoord *NsqLookupCoordRpcServer) RequestJoinCatchup(req *RpcReqJoinCatchup) *CoordErr

func (*NsqLookupCoordRpcServer) RequestJoinTopicISR

func (nlcoord *NsqLookupCoordRpcServer) RequestJoinTopicISR(req *RpcReqJoinISR) *CoordErr

func (*NsqLookupCoordRpcServer) RequestLeaveFromISR

func (nlcoord *NsqLookupCoordRpcServer) RequestLeaveFromISR(req *RpcReqLeaveFromISR) *CoordErr

func (*NsqLookupCoordRpcServer) RequestLeaveFromISRByLeader

func (nlcoord *NsqLookupCoordRpcServer) RequestLeaveFromISRByLeader(req *RpcReqLeaveFromISRByLeader) *CoordErr

func (*NsqLookupCoordRpcServer) RequestNotifyNewTopicInfo

func (nlcoord *NsqLookupCoordRpcServer) RequestNotifyNewTopicInfo(req *RpcReqNewTopicInfo) *CoordErr

type NsqLookupCoordinator

type NsqLookupCoordinator struct {
	// contains filtered or unexported fields
}

nsqlookup coordinator is used for the topic leader and isr coordinator, all the changes for leader or isr will be handled by the nsqlookup coordinator. In a cluster, only one nsqlookup coordinator will be the leader which will handle the event for cluster changes.

func NewNsqLookupCoordinator

func NewNsqLookupCoordinator(cluster string, n *NsqLookupdNodeInfo, opts *Options) *NsqLookupCoordinator

func (*NsqLookupCoordinator) ChangeTopicMetaParam

func (nlcoord *NsqLookupCoordinator) ChangeTopicMetaParam(topic string,
	newSyncEvery int, newRetentionDay int, newReplicator int, upgradeExt string) error

func (*NsqLookupCoordinator) CreateTopic

func (nlcoord *NsqLookupCoordinator) CreateTopic(topic string, meta TopicMetaInfo) error

func (*NsqLookupCoordinator) DeleteTopic

func (nlcoord *NsqLookupCoordinator) DeleteTopic(topic string, partition string) error

func (*NsqLookupCoordinator) DeleteTopicForce

func (nlcoord *NsqLookupCoordinator) DeleteTopicForce(topic string, partition string) error

func (*NsqLookupCoordinator) ExpandTopicPartition

func (nlcoord *NsqLookupCoordinator) ExpandTopicPartition(topic string, newPartitionNum int) error

func (*NsqLookupCoordinator) GetAllLookupdNodes

func (nlcoord *NsqLookupCoordinator) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)

func (*NsqLookupCoordinator) GetClusterNodeLoadFactor

func (nlcoord *NsqLookupCoordinator) GetClusterNodeLoadFactor() (map[string]float64, map[string]float64)

func (*NsqLookupCoordinator) GetClusterTopNTopics

func (nlcoord *NsqLookupCoordinator) GetClusterTopNTopics(n int) LFListT

func (*NsqLookupCoordinator) GetLookupLeader

func (nlcoord *NsqLookupCoordinator) GetLookupLeader() NsqLookupdNodeInfo

func (*NsqLookupCoordinator) GetTopicLeaderNodes

func (nlcoord *NsqLookupCoordinator) GetTopicLeaderNodes(topicName string) (map[string]string, error)

func (*NsqLookupCoordinator) GetTopicMetaInfo

func (nlcoord *NsqLookupCoordinator) GetTopicMetaInfo(topicName string) (TopicMetaInfo, error)

func (*NsqLookupCoordinator) GetTopicsMetaInfoMap

func (nlcoord *NsqLookupCoordinator) GetTopicsMetaInfoMap(topics []string) (map[string]TopicMetaInfo, error)

func (*NsqLookupCoordinator) IsClusterStable

func (nlcoord *NsqLookupCoordinator) IsClusterStable() bool

func (*NsqLookupCoordinator) IsMineLeader

func (nlcoord *NsqLookupCoordinator) IsMineLeader() bool

func (*NsqLookupCoordinator) IsTopicLeader

func (nlcoord *NsqLookupCoordinator) IsTopicLeader(topic string, part int, nid string) bool

func (*NsqLookupCoordinator) MarkNodeAsRemoving

func (nlcoord *NsqLookupCoordinator) MarkNodeAsRemoving(nid string) error

func (*NsqLookupCoordinator) MoveTopicPartitionDataByManual

func (nlcoord *NsqLookupCoordinator) MoveTopicPartitionDataByManual(topicName string,
	partitionID int, moveLeader bool, fromNode string, toNode string) error

func (*NsqLookupCoordinator) SetClusterUpgradeState

func (nlcoord *NsqLookupCoordinator) SetClusterUpgradeState(upgrading bool) error

func (*NsqLookupCoordinator) SetLeadershipMgr

func (nlcoord *NsqLookupCoordinator) SetLeadershipMgr(l NSQLookupdLeadership)

func (*NsqLookupCoordinator) SetTopNBalance

func (nlcoord *NsqLookupCoordinator) SetTopNBalance(enable bool) error

func (*NsqLookupCoordinator) Start

func (nlcoord *NsqLookupCoordinator) Start() error

init and register to leader server

func (*NsqLookupCoordinator) Stop

func (nlcoord *NsqLookupCoordinator) Stop()

type NsqLookupRpcClient

type NsqLookupRpcClient struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*NsqLookupRpcClient) CallFast

func (nlrpc *NsqLookupRpcClient) CallFast(method string, arg interface{}) (interface{}, error)

func (*NsqLookupRpcClient) CallWithRetry

func (nlrpc *NsqLookupRpcClient) CallWithRetry(method string, arg interface{}) (interface{}, error)

func (*NsqLookupRpcClient) Close

func (nlrpc *NsqLookupRpcClient) Close()

func (*NsqLookupRpcClient) ReadyForTopicISR

func (nlrpc *NsqLookupRpcClient) ReadyForTopicISR(topic string, partition int, nid string, leaderSession *TopicLeaderSession, joinISRSession string) *CoordErr

func (*NsqLookupRpcClient) Reconnect

func (nlrpc *NsqLookupRpcClient) Reconnect() error

func (*NsqLookupRpcClient) RemoteAddr

func (nlrpc *NsqLookupRpcClient) RemoteAddr() string

func (*NsqLookupRpcClient) RequestCheckTopicConsistence

func (nlrpc *NsqLookupRpcClient) RequestCheckTopicConsistence(topic string, partition int)

func (*NsqLookupRpcClient) RequestJoinCatchup

func (nlrpc *NsqLookupRpcClient) RequestJoinCatchup(topic string, partition int, nid string) *CoordErr

func (*NsqLookupRpcClient) RequestJoinTopicISR

func (nlrpc *NsqLookupRpcClient) RequestJoinTopicISR(topic string, partition int, nid string) *CoordErr

func (*NsqLookupRpcClient) RequestLeaveFromISR

func (nlrpc *NsqLookupRpcClient) RequestLeaveFromISR(topic string, partition int, nid string) *CoordErr

func (*NsqLookupRpcClient) RequestLeaveFromISRByLeader

func (nlrpc *NsqLookupRpcClient) RequestLeaveFromISRByLeader(topic string, partition int, nid string, leaderSession *TopicLeaderSession) *CoordErr

func (*NsqLookupRpcClient) RequestLeaveFromISRFast

func (nlrpc *NsqLookupRpcClient) RequestLeaveFromISRFast(topic string, partition int, nid string) *CoordErr

func (*NsqLookupRpcClient) RequestNotifyNewTopicInfo

func (nlrpc *NsqLookupRpcClient) RequestNotifyNewTopicInfo(topic string, partition int, nid string)

func (*NsqLookupRpcClient) ShouldRemoved

func (nlrpc *NsqLookupRpcClient) ShouldRemoved() bool

type NsqLookupdEtcdMgr

type NsqLookupdEtcdMgr struct {
	// contains filtered or unexported fields
}

func NewNsqLookupdEtcdMgr

func NewNsqLookupdEtcdMgr(host, username, pwd string) (*NsqLookupdEtcdMgr, error)

func (*NsqLookupdEtcdMgr) AcquireAndWatchLeader

func (self *NsqLookupdEtcdMgr) AcquireAndWatchLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{})

func (*NsqLookupdEtcdMgr) CheckIfLeader

func (self *NsqLookupdEtcdMgr) CheckIfLeader(session string) bool

func (*NsqLookupdEtcdMgr) CreateTopic

func (self *NsqLookupdEtcdMgr) CreateTopic(topic string, meta *TopicMetaInfo) error

func (*NsqLookupdEtcdMgr) CreateTopicPartition

func (self *NsqLookupdEtcdMgr) CreateTopicPartition(topic string, partition int) error

func (*NsqLookupdEtcdMgr) DeleteTopic

func (self *NsqLookupdEtcdMgr) DeleteTopic(topic string, partition int) error

func (*NsqLookupdEtcdMgr) DeleteWholeTopic

func (self *NsqLookupdEtcdMgr) DeleteWholeTopic(topic string) error

func (*NsqLookupdEtcdMgr) GetAllLookupdNodes

func (self *NsqLookupdEtcdMgr) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)

TODO: cache this to improve performance

func (*NsqLookupdEtcdMgr) GetAllTopicMetas

func (self *NsqLookupdEtcdMgr) GetAllTopicMetas() (map[string]TopicMetaInfo, error)

func (*NsqLookupdEtcdMgr) GetClusterEpoch

func (self *NsqLookupdEtcdMgr) GetClusterEpoch() (EpochType, error)

func (*NsqLookupdEtcdMgr) GetNsqdNodes

func (self *NsqLookupdEtcdMgr) GetNsqdNodes() ([]NsqdNodeInfo, error)

func (*NsqLookupdEtcdMgr) GetTopicInfo

func (self *NsqLookupdEtcdMgr) GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error)

func (*NsqLookupdEtcdMgr) GetTopicLeaderSession

func (self *NsqLookupdEtcdMgr) GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error)

func (*NsqLookupdEtcdMgr) GetTopicMetaInfo

func (self *NsqLookupdEtcdMgr) GetTopicMetaInfo(topic string) (TopicMetaInfo, EpochType, error)

func (*NsqLookupdEtcdMgr) GetTopicMetaInfoTryCache

func (self *NsqLookupdEtcdMgr) GetTopicMetaInfoTryCache(topic string) (TopicMetaInfo, error)

func (*NsqLookupdEtcdMgr) GetTopicsMetaInfoMap

func (self *NsqLookupdEtcdMgr) GetTopicsMetaInfoMap(topics []string) (map[string]TopicMetaInfo, error)

func (*NsqLookupdEtcdMgr) InitClusterID

func (self *NsqLookupdEtcdMgr) InitClusterID(id string)

func (*NsqLookupdEtcdMgr) IsExistTopic

func (self *NsqLookupdEtcdMgr) IsExistTopic(topic string) (bool, error)

func (*NsqLookupdEtcdMgr) IsExistTopicPartition

func (self *NsqLookupdEtcdMgr) IsExistTopicPartition(topic string, partitionNum int) (bool, error)

func (*NsqLookupdEtcdMgr) Register

func (self *NsqLookupdEtcdMgr) Register(value *NsqLookupdNodeInfo) error

func (*NsqLookupdEtcdMgr) ReleaseTopicLeader

func (self *NsqLookupdEtcdMgr) ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error

func (*NsqLookupdEtcdMgr) ScanTopics

func (self *NsqLookupdEtcdMgr) ScanTopics() ([]TopicPartitionMetaInfo, error)

func (*NsqLookupdEtcdMgr) Stop

func (self *NsqLookupdEtcdMgr) Stop()

func (*NsqLookupdEtcdMgr) Unregister

func (self *NsqLookupdEtcdMgr) Unregister(value *NsqLookupdNodeInfo) error

func (*NsqLookupdEtcdMgr) UpdateLookupEpoch

func (self *NsqLookupdEtcdMgr) UpdateLookupEpoch(oldGen EpochType) (EpochType, error)

func (*NsqLookupdEtcdMgr) UpdateTopicMetaInfo

func (self *NsqLookupdEtcdMgr) UpdateTopicMetaInfo(topic string, meta *TopicMetaInfo, oldGen EpochType) error

func (*NsqLookupdEtcdMgr) UpdateTopicNodeInfo

func (self *NsqLookupdEtcdMgr) UpdateTopicNodeInfo(topic string, partition int, topicInfo *TopicPartitionReplicaInfo, oldGen EpochType) error

func (*NsqLookupdEtcdMgr) WatchNsqdNodes

func (self *NsqLookupdEtcdMgr) WatchNsqdNodes(nsqds chan []NsqdNodeInfo, stop chan struct{})

type NsqLookupdNodeInfo

type NsqLookupdNodeInfo struct {
	ID       string
	NodeIP   string
	TcpPort  string
	HttpPort string
	RpcPort  string
	Epoch    EpochType
}

func (*NsqLookupdNodeInfo) GetID

func (self *NsqLookupdNodeInfo) GetID() string

type NsqdCoordRpcServer

type NsqdCoordRpcServer struct {
	// contains filtered or unexported fields
}

func NewNsqdCoordRpcServer

func NewNsqdCoordRpcServer(coord *NsqdCoordinator, rootPath string) *NsqdCoordRpcServer

func (*NsqdCoordRpcServer) DeleteChannel

func (self *NsqdCoordRpcServer) DeleteChannel(info *RpcChannelOffsetArg) *CoordErr

func (*NsqdCoordRpcServer) DeleteNsqdTopic

func (self *NsqdCoordRpcServer) DeleteNsqdTopic(rpcTopicReq *RpcAdminTopicInfo) *CoordErr

func (*NsqdCoordRpcServer) DisableTopicWrite

func (self *NsqdCoordRpcServer) DisableTopicWrite(rpcTopicReq *RpcAdminTopicInfo) *CoordErr

func (*NsqdCoordRpcServer) EnableTopicWrite

func (self *NsqdCoordRpcServer) EnableTopicWrite(rpcTopicReq *RpcAdminTopicInfo) *CoordErr

func (*NsqdCoordRpcServer) GetBackupedDelayedQueue

func (self *NsqdCoordRpcServer) GetBackupedDelayedQueue(req *RpcGetBackupedDQReq) (*RpcGetBackupedDQRsp, error)

func (*NsqdCoordRpcServer) GetCommitLogFromOffset

func (self *NsqdCoordRpcServer) GetCommitLogFromOffset(req *RpcCommitLogReq) *RpcCommitLogRsp

return the logdata from offset, if the offset is larger than local, then return the last logdata on local.

func (*NsqdCoordRpcServer) GetDelayedQueueCommitLogFromOffset

func (self *NsqdCoordRpcServer) GetDelayedQueueCommitLogFromOffset(req *RpcCommitLogReq) *RpcCommitLogRsp

func (*NsqdCoordRpcServer) GetDelayedQueueFullSyncInfo

func (self *NsqdCoordRpcServer) GetDelayedQueueFullSyncInfo(req *RpcGetFullSyncInfoReq) (*RpcGetFullSyncInfoRsp, error)

func (*NsqdCoordRpcServer) GetFullSyncInfo

func (self *NsqdCoordRpcServer) GetFullSyncInfo(req *RpcGetFullSyncInfoReq) (*RpcGetFullSyncInfoRsp, error)

func (*NsqdCoordRpcServer) GetLastCommitLogID

func (self *NsqdCoordRpcServer) GetLastCommitLogID(req *RpcCommitLogReq) (int64, error)

func (*NsqdCoordRpcServer) GetLastDelayedQueueCommitLogID

func (self *NsqdCoordRpcServer) GetLastDelayedQueueCommitLogID(req *RpcCommitLogReq) (int64, error)

func (*NsqdCoordRpcServer) GetNodeInfo

func (self *NsqdCoordRpcServer) GetNodeInfo(req *RpcNodeInfoReq) (*RpcNodeInfoRsp, error)

func (*NsqdCoordRpcServer) GetTopicStats

func (self *NsqdCoordRpcServer) GetTopicStats(topic string) *NodeTopicStats

func (*NsqdCoordRpcServer) IsTopicWriteDisabled

func (self *NsqdCoordRpcServer) IsTopicWriteDisabled(rpcTopicReq *RpcAdminTopicInfo) bool

func (*NsqdCoordRpcServer) NotifyAcquireTopicLeader

func (self *NsqdCoordRpcServer) NotifyAcquireTopicLeader(rpcTopicReq *RpcAcquireTopicLeaderReq) *CoordErr

func (*NsqdCoordRpcServer) NotifyReleaseTopicLeader

func (self *NsqdCoordRpcServer) NotifyReleaseTopicLeader(rpcTopicReq *RpcReleaseTopicLeaderReq) *CoordErr

func (*NsqdCoordRpcServer) NotifyTopicLeaderSession

func (self *NsqdCoordRpcServer) NotifyTopicLeaderSession(rpcTopicReq *RpcTopicLeaderSession) *CoordErr

func (*NsqdCoordRpcServer) PullCommitLogsAndData

func (self *NsqdCoordRpcServer) PullCommitLogsAndData(req *RpcPullCommitLogsReq) (*RpcPullCommitLogsRsp, error)

func (*NsqdCoordRpcServer) PullDelayedQueueCommitLogsAndData

func (self *NsqdCoordRpcServer) PullDelayedQueueCommitLogsAndData(req *RpcPullCommitLogsReq) (*RpcPullCommitLogsRsp, error)

func (*NsqdCoordRpcServer) PutDelayedMessage

func (self *NsqdCoordRpcServer) PutDelayedMessage(info *RpcPutMessage) *CoordErr

receive from leader

func (*NsqdCoordRpcServer) PutMessage

func (self *NsqdCoordRpcServer) PutMessage(info *RpcPutMessage) *CoordErr

receive from leader

func (*NsqdCoordRpcServer) PutMessages

func (self *NsqdCoordRpcServer) PutMessages(info *RpcPutMessages) *CoordErr

func (*NsqdCoordRpcServer) TestRpcCoordErr

func (self *NsqdCoordRpcServer) TestRpcCoordErr(req *RpcTestReq) *CoordErr

func (*NsqdCoordRpcServer) TestRpcError

func (self *NsqdCoordRpcServer) TestRpcError(req *RpcTestReq) *RpcTestRsp

func (*NsqdCoordRpcServer) TestRpcTimeout

func (self *NsqdCoordRpcServer) TestRpcTimeout() error

func (*NsqdCoordRpcServer) TriggerLookupChanged

func (self *NsqdCoordRpcServer) TriggerLookupChanged() error

func (*NsqdCoordRpcServer) UpdateChannelList

func (self *NsqdCoordRpcServer) UpdateChannelList(info *RpcChannelListArg) *CoordErr

func (*NsqdCoordRpcServer) UpdateChannelOffset

func (self *NsqdCoordRpcServer) UpdateChannelOffset(info *RpcChannelOffsetArg) *CoordErr

func (*NsqdCoordRpcServer) UpdateChannelState

func (self *NsqdCoordRpcServer) UpdateChannelState(state *RpcChannelState) *CoordErr

func (*NsqdCoordRpcServer) UpdateDelayedQueueState

func (self *NsqdCoordRpcServer) UpdateDelayedQueueState(info *RpcConfirmedDelayedCursor) *CoordErr

func (*NsqdCoordRpcServer) UpdateTopicInfo

func (self *NsqdCoordRpcServer) UpdateTopicInfo(rpcTopicReq *RpcAdminTopicInfo) *CoordErr

type NsqdCoordinator

type NsqdCoordinator struct {
	// contains filtered or unexported fields
}

func NewNsqdCoordinator

func NewNsqdCoordinator(cluster, ip, tcpport, rpcport, httpport, extraID string, rootPath string, nsqd *nsqd.NSQD) *NsqdCoordinator

func (*NsqdCoordinator) DeleteChannel

func (ncoord *NsqdCoordinator) DeleteChannel(topic *nsqd.Topic, channelName string) error

func (*NsqdCoordinator) EmptyChannelDelayedStateToCluster

func (ncoord *NsqdCoordinator) EmptyChannelDelayedStateToCluster(channel *nsqd.Channel) error

func (*NsqdCoordinator) FinishMessageToCluster

func (ncoord *NsqdCoordinator) FinishMessageToCluster(channel *nsqd.Channel, clientID int64, clientAddr string, msgID nsqd.MessageID) error

func (*NsqdCoordinator) GetAllLookupdNodes

func (ncoord *NsqdCoordinator) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)

func (*NsqdCoordinator) GetCurrentLookupd

func (ncoord *NsqdCoordinator) GetCurrentLookupd() NsqLookupdNodeInfo

func (*NsqdCoordinator) GetMasterTopicCoordData

func (ncoord *NsqdCoordinator) GetMasterTopicCoordData(topic string) (int, *coordData, error)

since only one master is allowed on the same topic, we can get it.

func (*NsqdCoordinator) GetMyID

func (ncoord *NsqdCoordinator) GetMyID() string

func (*NsqdCoordinator) GreedyCleanTopicOldData

func (ncoord *NsqdCoordinator) GreedyCleanTopicOldData(localTopic *nsqd.Topic) error

func (*NsqdCoordinator) IsMineConsumeLeaderForTopic

func (ncoord *NsqdCoordinator) IsMineConsumeLeaderForTopic(topic string, part int) bool

func (*NsqdCoordinator) IsMineLeaderForTopic

func (ncoord *NsqdCoordinator) IsMineLeaderForTopic(topic string, part int) bool

func (*NsqdCoordinator) PutDelayedMessageToCluster

func (ncoord *NsqdCoordinator) PutDelayedMessageToCluster(topic *nsqd.Topic,
	msg *nsqd.Message) (nsqd.MessageID, nsqd.BackendOffset, int32, nsqd.BackendQueueEnd, error)

func (*NsqdCoordinator) PutMessageBodyToCluster

func (ncoord *NsqdCoordinator) PutMessageBodyToCluster(topic *nsqd.Topic,
	body []byte, traceID uint64) (nsqd.MessageID, nsqd.BackendOffset, int32, nsqd.BackendQueueEnd, error)

func (*NsqdCoordinator) PutMessageToCluster

func (ncoord *NsqdCoordinator) PutMessageToCluster(topic *nsqd.Topic,
	msg *nsqd.Message) (nsqd.MessageID, nsqd.BackendOffset, int32, nsqd.BackendQueueEnd, error)

func (*NsqdCoordinator) PutMessagesToCluster

func (ncoord *NsqdCoordinator) PutMessagesToCluster(topic *nsqd.Topic,
	msgs []*nsqd.Message) (nsqd.MessageID, nsqd.BackendOffset, int32, error)

func (*NsqdCoordinator) SearchLogByMsgCnt

func (ncoord *NsqdCoordinator) SearchLogByMsgCnt(topic string, part int, count int64) (*CommitLogData, int64, int64, error)

func (*NsqdCoordinator) SearchLogByMsgID

func (ncoord *NsqdCoordinator) SearchLogByMsgID(topic string, part int, msgID int64) (*CommitLogData, int64, int64, error)

func (*NsqdCoordinator) SearchLogByMsgOffset

func (ncoord *NsqdCoordinator) SearchLogByMsgOffset(topic string, part int, offset int64) (*CommitLogData, int64, int64, error)

search commitlog for message given by message disk data offset

func (*NsqdCoordinator) SearchLogByMsgTimestamp

func (ncoord *NsqdCoordinator) SearchLogByMsgTimestamp(topic string, part int, ts_sec int64) (*CommitLogData, int64, int64, error)

return the searched log data and the exact offset the reader should be reset and the total count before the offset.

func (*NsqdCoordinator) SetChannelConsumeOffsetToCluster

func (ncoord *NsqdCoordinator) SetChannelConsumeOffsetToCluster(ch *nsqd.Channel, queueOffset int64, cnt int64, force bool) error

func (*NsqdCoordinator) SetLeadershipMgr

func (ncoord *NsqdCoordinator) SetLeadershipMgr(l NSQDLeadership)

func (*NsqdCoordinator) Start

func (ncoord *NsqdCoordinator) Start() error

func (*NsqdCoordinator) Stats

func (ncoord *NsqdCoordinator) Stats(topic string, part int) *CoordStats

func (*NsqdCoordinator) Stop

func (ncoord *NsqdCoordinator) Stop()

func (*NsqdCoordinator) SyncTopicChannels

func (ncoord *NsqdCoordinator) SyncTopicChannels(topicName string, part int) error

func (*NsqdCoordinator) TryFixLocalTopic

func (ncoord *NsqdCoordinator) TryFixLocalTopic(topic string, pid int) error

func (*NsqdCoordinator) UpdateChannelStateToCluster

func (ncoord *NsqdCoordinator) UpdateChannelStateToCluster(channel *nsqd.Channel, paused int, skipped int, zanTestSkipped int) error

type NsqdEtcdMgr

type NsqdEtcdMgr struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewNsqdEtcdMgr

func NewNsqdEtcdMgr(host, username, pwd string) (*NsqdEtcdMgr, error)

func (*NsqdEtcdMgr) AcquireTopicLeader

func (nem *NsqdEtcdMgr) AcquireTopicLeader(topic string, partition int, nodeData *NsqdNodeInfo, epoch EpochType) error

func (*NsqdEtcdMgr) GetAllLookupdNodes

func (nem *NsqdEtcdMgr) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)

func (*NsqdEtcdMgr) GetTopicInfo

func (nem *NsqdEtcdMgr) GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error)

func (*NsqdEtcdMgr) GetTopicLeaderSession

func (nem *NsqdEtcdMgr) GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error)

func (*NsqdEtcdMgr) InitClusterID

func (nem *NsqdEtcdMgr) InitClusterID(id string)

func (*NsqdEtcdMgr) IsTopicRealDeleted

func (nem *NsqdEtcdMgr) IsTopicRealDeleted(topic string) (bool, error)

func (*NsqdEtcdMgr) RegisterNsqd

func (nem *NsqdEtcdMgr) RegisterNsqd(nodeData *NsqdNodeInfo) error

func (*NsqdEtcdMgr) ReleaseTopicLeader

func (nem *NsqdEtcdMgr) ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error

func (*NsqdEtcdMgr) UnregisterNsqd

func (nem *NsqdEtcdMgr) UnregisterNsqd(nodeData *NsqdNodeInfo) error

func (*NsqdEtcdMgr) WatchLookupdLeader

func (nem *NsqdEtcdMgr) WatchLookupdLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{}) error

type NsqdNodeInfo

type NsqdNodeInfo struct {
	ID       string
	NodeIP   string
	TcpPort  string
	RpcPort  string
	HttpPort string
}

func (*NsqdNodeInfo) GetID

func (self *NsqdNodeInfo) GetID() string

type NsqdNodeLoadFactor

type NsqdNodeLoadFactor struct {
	// contains filtered or unexported fields
}

type NsqdRpcClient

type NsqdRpcClient struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewNsqdRpcClient

func NewNsqdRpcClient(addr string, timeout time.Duration) (*NsqdRpcClient, error)

func (*NsqdRpcClient) CallFast

func (nrpc *NsqdRpcClient) CallFast(method string, arg interface{}) (interface{}, error)

func (*NsqdRpcClient) CallRpcTest

func (nrpc *NsqdRpcClient) CallRpcTest(data string) (string, *CoordErr)

func (*NsqdRpcClient) CallRpcTestCoordErr

func (nrpc *NsqdRpcClient) CallRpcTestCoordErr(data string) *CoordErr

func (*NsqdRpcClient) CallRpcTesttimeout

func (nrpc *NsqdRpcClient) CallRpcTesttimeout(data string) error

func (*NsqdRpcClient) CallWithRetry

func (nrpc *NsqdRpcClient) CallWithRetry(method string, arg interface{}) (interface{}, error)

func (*NsqdRpcClient) Close

func (nrpc *NsqdRpcClient) Close()

func (*NsqdRpcClient) DeleteChannel

func (nrpc *NsqdRpcClient) DeleteChannel(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, channel string) *CoordErr

func (*NsqdRpcClient) DeleteNsqdTopic

func (nrpc *NsqdRpcClient) DeleteNsqdTopic(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr

func (*NsqdRpcClient) DisableTopicWrite

func (nrpc *NsqdRpcClient) DisableTopicWrite(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr

func (*NsqdRpcClient) DisableTopicWriteFast

func (nrpc *NsqdRpcClient) DisableTopicWriteFast(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr

func (*NsqdRpcClient) EnableTopicWrite

func (nrpc *NsqdRpcClient) EnableTopicWrite(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr

func (*NsqdRpcClient) GetBackupedDelayedQueue

func (nrpc *NsqdRpcClient) GetBackupedDelayedQueue(topic string, partition int) (io.Reader, error)

func (*NsqdRpcClient) GetCommitLogFromOffset

func (nrpc *NsqdRpcClient) GetCommitLogFromOffset(topicInfo *TopicPartitionMetaInfo, logCountNumIndex int64,
	logIndex int64, offset int64, fromDelayedQueue bool) (bool, int64, int64, int64, CommitLogData, *CoordErr)

func (*NsqdRpcClient) GetFullSyncInfo

func (nrpc *NsqdRpcClient) GetFullSyncInfo(topic string, partition int, fromDelayed bool) (*LogStartInfo, *CommitLogData, error)

func (*NsqdRpcClient) GetLastCommitLogID

func (nrpc *NsqdRpcClient) GetLastCommitLogID(topicInfo *TopicPartitionMetaInfo) (int64, *CoordErr)

func (*NsqdRpcClient) GetLastDelayedQueueCommitLogID

func (nrpc *NsqdRpcClient) GetLastDelayedQueueCommitLogID(topicInfo *TopicPartitionMetaInfo) (int64, *CoordErr)

func (*NsqdRpcClient) GetNodeInfo

func (nrpc *NsqdRpcClient) GetNodeInfo(nid string) (*NsqdNodeInfo, error)

func (*NsqdRpcClient) GetTopicStats

func (nrpc *NsqdRpcClient) GetTopicStats(topic string) (*NodeTopicStats, error)

func (*NsqdRpcClient) IsTopicWriteDisabled

func (nrpc *NsqdRpcClient) IsTopicWriteDisabled(topicInfo *TopicPartitionMetaInfo) bool

func (*NsqdRpcClient) NotifyAcquireTopicLeader

func (nrpc *NsqdRpcClient) NotifyAcquireTopicLeader(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr

func (*NsqdRpcClient) NotifyChannelList

func (nrpc *NsqdRpcClient) NotifyChannelList(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, chList []string) *CoordErr

func (*NsqdRpcClient) NotifyReleaseTopicLeader

func (nrpc *NsqdRpcClient) NotifyReleaseTopicLeader(epoch EpochType, topicInfo *TopicPartitionMetaInfo,
	leaderSessionEpoch EpochType, leaderSession string) *CoordErr

func (*NsqdRpcClient) NotifyTopicLeaderSession

func (nrpc *NsqdRpcClient) NotifyTopicLeaderSession(epoch EpochType, topicInfo *TopicPartitionMetaInfo, leaderSession *TopicLeaderSession, joinSession string) *CoordErr

func (*NsqdRpcClient) NotifyUpdateChannelOffset

func (nrpc *NsqdRpcClient) NotifyUpdateChannelOffset(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, channel string, offset ChannelConsumerOffset) *CoordErr

func (*NsqdRpcClient) PullCommitLogsAndData

func (nrpc *NsqdRpcClient) PullCommitLogsAndData(topic string, partition int, logCountNumIndex int64,
	logIndex int64, startOffset int64, num int, fromDelayed bool) ([]CommitLogData, [][]byte, error)

func (*NsqdRpcClient) PutDelayedMessage

func (nrpc *NsqdRpcClient) PutDelayedMessage(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, log CommitLogData, message *nsqd.Message) *CoordErr

func (*NsqdRpcClient) PutMessage

func (nrpc *NsqdRpcClient) PutMessage(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, log CommitLogData, message *nsqd.Message) *CoordErr

func (*NsqdRpcClient) PutMessages

func (nrpc *NsqdRpcClient) PutMessages(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, log CommitLogData, messages []*nsqd.Message) *CoordErr

func (*NsqdRpcClient) Reconnect

func (nrpc *NsqdRpcClient) Reconnect() error

func (*NsqdRpcClient) ShouldRemoved

func (nrpc *NsqdRpcClient) ShouldRemoved() bool

func (*NsqdRpcClient) TriggerLookupChanged

func (nrpc *NsqdRpcClient) TriggerLookupChanged() error

func (*NsqdRpcClient) UpdateChannelList

func (nrpc *NsqdRpcClient) UpdateChannelList(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, chList []string) *CoordErr

func (*NsqdRpcClient) UpdateChannelOffset

func (nrpc *NsqdRpcClient) UpdateChannelOffset(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, channel string, offset ChannelConsumerOffset) *CoordErr

func (*NsqdRpcClient) UpdateChannelState

func (nrpc *NsqdRpcClient) UpdateChannelState(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, channel string, paused int, skipped int, zanTestSkipped int) *CoordErr

func (*NsqdRpcClient) UpdateDelayedQueueState

func (nrpc *NsqdRpcClient) UpdateDelayedQueueState(leaderSession *TopicLeaderSession,
	info *TopicPartitionMetaInfo, ch string, ts int64, cursorList [][]byte,
	cntList map[int]uint64, channelCntList map[string]uint64, wait bool) *CoordErr

func (*NsqdRpcClient) UpdateTopicInfo

func (nrpc *NsqdRpcClient) UpdateTopicInfo(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr

type Options

type Options struct {
	BalanceStart int
	BalanceEnd   int
}

type RpcAcquireTopicLeaderReq

type RpcAcquireTopicLeaderReq struct {
	RpcTopicData
	LeaderNodeID string
	LookupdEpoch EpochType
}

type RpcAdminTopicInfo

type RpcAdminTopicInfo struct {
	TopicPartitionMetaInfo
	LookupdEpoch EpochType
	DisableWrite bool
}

type RpcChannelListArg

type RpcChannelListArg struct {
	RpcTopicData
	ChannelList []string
}

type RpcChannelOffsetArg

type RpcChannelOffsetArg struct {
	RpcTopicData
	Channel string
	// position file + file offset
	ChannelOffset ChannelConsumerOffset
}

type RpcChannelState

type RpcChannelState struct {
	RpcTopicData
	Channel        string
	Paused         int
	Skipped        int
	ZanTestSkipped int
}

type RpcCommitLogReq

type RpcCommitLogReq struct {
	RpcTopicData
	LogOffset        int64
	LogStartIndex    int64
	LogCountNumIndex int64
	UseCountIndex    bool
}

type RpcCommitLogRsp

type RpcCommitLogRsp struct {
	LogStartIndex    int64
	LogOffset        int64
	LogData          CommitLogData
	ErrInfo          CoordErr
	LogCountNumIndex int64
	UseCountIndex    bool
}

type RpcConfirmedDelayedCursor

type RpcConfirmedDelayedCursor struct {
	RpcTopicData
	UpdatedChannel string
	KeyList        [][]byte
	ChannelCntList map[string]uint64
	OtherCntList   map[int]uint64
	Timestamp      int64
}

type RpcFailedInfo

type RpcFailedInfo struct {
	// contains filtered or unexported fields
}

type RpcGetBackupedDQReq

type RpcGetBackupedDQReq struct {
	RpcTopicData
}

type RpcGetBackupedDQRsp

type RpcGetBackupedDQRsp struct {
	Buffer []byte
}

type RpcGetFullSyncInfoReq

type RpcGetFullSyncInfoReq struct {
	RpcTopicData
}

type RpcGetFullSyncInfoRsp

type RpcGetFullSyncInfoRsp struct {
	FirstLogData CommitLogData
	StartInfo    LogStartInfo
}

type RpcLookupReqBase

type RpcLookupReqBase struct {
	TopicName      string
	TopicPartition int
	NodeID         string
}

type RpcNodeInfoReq

type RpcNodeInfoReq struct {
	NodeID string
}

type RpcNodeInfoRsp

type RpcNodeInfoRsp struct {
	ID       string
	NodeIP   string
	TcpPort  string
	RpcPort  string
	HttpPort string
}

type RpcPullCommitLogsReq

type RpcPullCommitLogsReq struct {
	RpcTopicData
	StartLogOffset   int64
	LogMaxNum        int
	StartIndexCnt    int64
	LogCountNumIndex int64
	UseCountIndex    bool
}

type RpcPullCommitLogsRsp

type RpcPullCommitLogsRsp struct {
	Logs     []CommitLogData
	DataList [][]byte
}

type RpcPutMessage

type RpcPutMessage struct {
	RpcTopicData
	LogData         CommitLogData
	TopicMessage    *nsqd.Message
	TopicRawMessage []byte
}

type RpcPutMessages

type RpcPutMessages struct {
	RpcTopicData
	LogData       CommitLogData
	TopicMessages []*nsqd.Message
	// raw message data will include the size header
	TopicRawMessage []byte
}

type RpcReadyForISR

type RpcReadyForISR struct {
	RpcLookupReqBase
	LeaderSession  TopicLeaderSession
	JoinISRSession string
}

type RpcReleaseTopicLeaderReq

type RpcReleaseTopicLeaderReq struct {
	RpcTopicData
	LeaderNodeID string
	LookupdEpoch EpochType
}

type RpcReqCheckTopic

type RpcReqCheckTopic struct {
	RpcLookupReqBase
}

type RpcReqJoinCatchup

type RpcReqJoinCatchup struct {
	RpcLookupReqBase
}

type RpcReqJoinISR

type RpcReqJoinISR struct {
	RpcLookupReqBase
}

type RpcReqLeaveFromISR

type RpcReqLeaveFromISR struct {
	RpcLookupReqBase
}

type RpcReqLeaveFromISRByLeader

type RpcReqLeaveFromISRByLeader struct {
	RpcLookupReqBase
	LeaderSession TopicLeaderSession
}

type RpcReqNewTopicInfo

type RpcReqNewTopicInfo struct {
	RpcLookupReqBase
}

type RpcRspJoinISR

type RpcRspJoinISR struct {
	CoordErr
	JoinISRSession string
}

type RpcTestReq

type RpcTestReq struct {
	Data string
}

type RpcTestRsp

type RpcTestRsp struct {
	RspData string
	RetErr  *CoordErr
}

type RpcTopicData

type RpcTopicData struct {
	TopicName               string
	TopicPartition          int
	Epoch                   EpochType
	TopicWriteEpoch         EpochType
	TopicLeaderSessionEpoch EpochType
	TopicLeaderSession      string
	TopicLeader             string
}

type RpcTopicLeaderSession

type RpcTopicLeaderSession struct {
	RpcTopicData
	LeaderNode   NsqdNodeInfo
	LookupdEpoch EpochType
	JoinSession  string
}

type SlaveAsyncWriteResult

type SlaveAsyncWriteResult struct {
	// contains filtered or unexported fields
}

func (*SlaveAsyncWriteResult) GetResult

func (self *SlaveAsyncWriteResult) GetResult() *CoordErr

func (*SlaveAsyncWriteResult) Wait

func (self *SlaveAsyncWriteResult) Wait()

type SortableStrings

type SortableStrings []string

func (SortableStrings) Len

func (s SortableStrings) Len() int

func (SortableStrings) Less

func (s SortableStrings) Less(l, r int) bool

func (SortableStrings) Swap

func (s SortableStrings) Swap(l, r int)

type StatsSorter

type StatsSorter struct {
	// contains filtered or unexported fields
}

func (*StatsSorter) Len

func (s *StatsSorter) Len() int

func (*StatsSorter) Less

func (s *StatsSorter) Less(i, j int) bool

func (*StatsSorter) Swap

func (s *StatsSorter) Swap(i, j int)

type TopicCatchupInfo

type TopicCatchupInfo struct {
	CatchupList []string
}

type TopicChannelsInfo

type TopicChannelsInfo struct {
	Channels []string
}

type TopicCommitLogMgr

type TopicCommitLogMgr struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func InitTopicCommitLogMgr

func InitTopicCommitLogMgr(t string, p int, basepath string, commitBufSize int) (*TopicCommitLogMgr, error)

func InitTopicCommitLogMgrWithFixMode

func InitTopicCommitLogMgrWithFixMode(t string, p int, basepath string, commitBufSize int, fixMode bool) (*TopicCommitLogMgr, error)

func (*TopicCommitLogMgr) AppendCommitLog

func (self *TopicCommitLogMgr) AppendCommitLog(l *CommitLogData, slave bool) error

func (*TopicCommitLogMgr) AppendCommitLogWithSync

func (self *TopicCommitLogMgr) AppendCommitLogWithSync(l *CommitLogData, slave bool, useFsync bool) error

func (*TopicCommitLogMgr) CleanOldData

func (self *TopicCommitLogMgr) CleanOldData(fileIndex int64, fileOffset int64) error

func (*TopicCommitLogMgr) Close

func (self *TopicCommitLogMgr) Close()

func (*TopicCommitLogMgr) ConvertToCountIndex

func (self *TopicCommitLogMgr) ConvertToCountIndex(start int64, offset int64) (int64, error)

func (*TopicCommitLogMgr) ConvertToOffsetIndex

func (self *TopicCommitLogMgr) ConvertToOffsetIndex(countIndex int64) (int64, int64, error)

func (*TopicCommitLogMgr) Delete

func (self *TopicCommitLogMgr) Delete()

func (*TopicCommitLogMgr) FlushCommitLogs

func (self *TopicCommitLogMgr) FlushCommitLogs()

func (*TopicCommitLogMgr) GetCommitLogFromOffsetV2

func (self *TopicCommitLogMgr) GetCommitLogFromOffsetV2(start int64, offset int64) (*CommitLogData, error)

func (*TopicCommitLogMgr) GetCommitLogsV2

func (self *TopicCommitLogMgr) GetCommitLogsV2(startIndex int64, startOffset int64, num int) ([]CommitLogData, error)

func (*TopicCommitLogMgr) GetCurrentEnd

func (self *TopicCommitLogMgr) GetCurrentEnd() (int64, int64)

func (*TopicCommitLogMgr) GetCurrentStart

func (self *TopicCommitLogMgr) GetCurrentStart() int64

func (*TopicCommitLogMgr) GetLastCommitLogDataOnSegment

func (self *TopicCommitLogMgr) GetLastCommitLogDataOnSegment(index int64) (int64, *CommitLogData, error)

func (*TopicCommitLogMgr) GetLastCommitLogID

func (self *TopicCommitLogMgr) GetLastCommitLogID() int64

func (*TopicCommitLogMgr) GetLastCommitLogOffsetV2

func (self *TopicCommitLogMgr) GetLastCommitLogOffsetV2() (int64, int64, *CommitLogData, error)

this will get the log data of last commit

func (*TopicCommitLogMgr) GetLogStartInfo

func (self *TopicCommitLogMgr) GetLogStartInfo() (*LogStartInfo, *CommitLogData, error)

func (*TopicCommitLogMgr) IsCommitted

func (self *TopicCommitLogMgr) IsCommitted(id int64) bool

func (*TopicCommitLogMgr) MoveTo

func (self *TopicCommitLogMgr) MoveTo(newBase string) error

func (*TopicCommitLogMgr) NextID

func (self *TopicCommitLogMgr) NextID() uint64

func (*TopicCommitLogMgr) Reopen

func (self *TopicCommitLogMgr) Reopen() error

func (*TopicCommitLogMgr) Reset

func (self *TopicCommitLogMgr) Reset(id uint64)

reset the nextid

func (*TopicCommitLogMgr) ResetLogWithStart

func (self *TopicCommitLogMgr) ResetLogWithStart(newStart LogStartInfo) error

func (*TopicCommitLogMgr) SearchLogDataByComparator

func (self *TopicCommitLogMgr) SearchLogDataByComparator(comp ICommitLogComparator) (int64, int64, *CommitLogData, error)

func (*TopicCommitLogMgr) SearchLogDataByMsgCnt

func (self *TopicCommitLogMgr) SearchLogDataByMsgCnt(searchCnt int64) (int64, int64, *CommitLogData, error)

func (*TopicCommitLogMgr) SearchLogDataByMsgID

func (self *TopicCommitLogMgr) SearchLogDataByMsgID(searchMsgID int64) (int64, int64, *CommitLogData, error)

func (*TopicCommitLogMgr) SearchLogDataByMsgOffset

func (self *TopicCommitLogMgr) SearchLogDataByMsgOffset(searchMsgOffset int64) (int64, int64, *CommitLogData, error)

func (*TopicCommitLogMgr) TruncateToOffsetV2

func (self *TopicCommitLogMgr) TruncateToOffsetV2(startIndex int64, offset int64) (*CommitLogData, error)

type TopicCoordStat

type TopicCoordStat struct {
	Node         string        `json:"node"`
	Name         string        `json:"name"`
	Partition    int           `json:"partition"`
	ISRStats     []ISRStat     `json:"isr_stats"`
	CatchupStats []CatchupStat `json:"catchup_stats"`
}

type TopicCoordinator

type TopicCoordinator struct {
	// contains filtered or unexported fields
}

func NewTopicCoordinator

func NewTopicCoordinator(name string, partition int, basepath string,
	syncEvery int, ordered bool) (*TopicCoordinator, error)

func NewTopicCoordinatorWithFixMode

func NewTopicCoordinatorWithFixMode(name string, partition int, basepath string,
	syncEvery int, ordered bool, forceFix bool) (*TopicCoordinator, error)

func (*TopicCoordinator) DeleteNoWriteLock

func (tc *TopicCoordinator) DeleteNoWriteLock(removeData bool)

func (*TopicCoordinator) DeleteWithLock

func (tc *TopicCoordinator) DeleteWithLock(removeData bool)

func (*TopicCoordinator) DisableWrite

func (tc *TopicCoordinator) DisableWrite(disable bool)

func (*TopicCoordinator) Exiting

func (tc *TopicCoordinator) Exiting()

func (TopicCoordinator) GetCopy

func (cd TopicCoordinator) GetCopy() *coordData

func (*TopicCoordinator) GetData

func (tc *TopicCoordinator) GetData() *coordData

func (*TopicCoordinator) GetDelayedQueueLogMgr

func (tc *TopicCoordinator) GetDelayedQueueLogMgr() (*TopicCommitLogMgr, error)

func (TopicCoordinator) GetLeader

func (cd TopicCoordinator) GetLeader() string

func (TopicCoordinator) GetLeaderSession

func (cd TopicCoordinator) GetLeaderSession() string

func (TopicCoordinator) GetLeaderSessionEpoch

func (cd TopicCoordinator) GetLeaderSessionEpoch() EpochType

func (TopicCoordinator) GetLeaderSessionID

func (cd TopicCoordinator) GetLeaderSessionID() string

func (TopicCoordinator) GetTopicEpochForWrite

func (cd TopicCoordinator) GetTopicEpochForWrite() EpochType

func (*TopicCoordinator) IsExiting

func (tc *TopicCoordinator) IsExiting() bool

func (TopicCoordinator) IsForceLeave

func (cd TopicCoordinator) IsForceLeave() bool

func (TopicCoordinator) IsISRReadyForWrite

func (cd TopicCoordinator) IsISRReadyForWrite(myID string) bool

func (TopicCoordinator) IsMineISR

func (cd TopicCoordinator) IsMineISR(id string) bool

func (TopicCoordinator) IsMineLeaderSessionReady

func (cd TopicCoordinator) IsMineLeaderSessionReady(id string) bool

func (*TopicCoordinator) IsWriteDisabled

func (tc *TopicCoordinator) IsWriteDisabled() bool

func (TopicCoordinator) SetForceLeave

func (cd TopicCoordinator) SetForceLeave(leave bool)

type TopicLeaderSession

type TopicLeaderSession struct {
	Topic       string
	Partition   int
	LeaderNode  *NsqdNodeInfo
	Session     string
	LeaderEpoch EpochType
}

func (*TopicLeaderSession) IsSame

func (self *TopicLeaderSession) IsSame(other *TopicLeaderSession) bool

type TopicMetaInfo

type TopicMetaInfo struct {
	PartitionNum int
	Replica      int
	// the suggest load factor for each topic partition.
	SuggestLF int
	// other options
	SyncEvery int
	// to verify the data of the create -> delete -> create with same topic
	MagicCode int64
	// the retention days for the data
	RetentionDay int32
	// used for ordered multi partition topic
	// allow multi partitions on the same node
	OrderedMulti bool
	//used for message ext
	Ext bool
}

type TopicNameInfo

type TopicNameInfo struct {
	TopicName      string
	TopicPartition int
}

type TopicPartitionID

type TopicPartitionID struct {
	TopicName      string
	TopicPartition int
}

func (*TopicPartitionID) String

func (ncoord *TopicPartitionID) String() string

type TopicPartitionMetaInfo

type TopicPartitionMetaInfo struct {
	Name      string
	Partition int
	TopicMetaInfo
	TopicPartitionReplicaInfo
}

func (*TopicPartitionMetaInfo) Copy

func (*TopicPartitionMetaInfo) GetTopicDesp

func (self *TopicPartitionMetaInfo) GetTopicDesp() string

type TopicPartitionReplicaInfo

type TopicPartitionReplicaInfo struct {
	Leader      string
	ISR         []string
	CatchupList []string
	Channels    []string
	// this is only used for write operation
	// if this changed during write, mean the current write should be abort
	EpochForWrite EpochType
	Epoch         EpochType
}

func (*TopicPartitionReplicaInfo) Copy

type TopicReplicasInfo

type TopicReplicasInfo struct {
	Leader string
	ISR    []string
}

type WrapChannelConsumerOffset

type WrapChannelConsumerOffset struct {
	Name string
	ChannelConsumerOffset
}

Directories

Path Synopsis
Package coordgrpc is a generated protocol buffer package.
Package coordgrpc is a generated protocol buffer package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL