consistence

package
v0.3.7-HA.1.5.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2017 License: MIT Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DEFAULT_COMMIT_BUF_SIZE is 1024, written as a power of two.
	// NOTE(review): the unit (bytes vs. log entries) is not visible here — confirm.
	DEFAULT_COMMIT_BUF_SIZE = 1 << 10
	// MAX_INCR_ID_BIT is 50.
	// NOTE(review): presumably the bit width of the incrementing part of an ID — confirm.
	MAX_INCR_ID_BIT = 50
)
View Source
// String error codes. Unlike the Err* variables declared with errors.New or
// NewCoordErr, these are plain untyped string constants, not error values.
// NOTE(review): presumably returned to clients when a request reaches a node
// that is not the leader or is not writable — confirm against callers.
const (
	ErrFailedOnNotLeader   = "E_FAILED_ON_NOT_LEADER"
	ErrFailedOnNotWritable = "E_FAILED_ON_NOT_WRITABLE"
)
View Source
// Thresholds and weights.
// NOTE(review): presumably used when computing node load factors for
// balancing — confirm against the users of these constants.
const (
	// RATIO_BETWEEN_LEADER_FOLLOWER is 0.7.
	RATIO_BETWEEN_LEADER_FOLLOWER = 0.7

	// HIGHEST_PUB_QPS_LEVEL is 100.
	HIGHEST_PUB_QPS_LEVEL = 100
	// HIGHEST_LEFT_CONSUME_MB_SIZE is 51200 (50 * 1024).
	// NOTE(review): the name suggests the unit is MB — confirm.
	HIGHEST_LEFT_CONSUME_MB_SIZE = 50 << 10
	// HIGHEST_LEFT_DATA_MB_SIZE is 204800 (200 * 1024).
	// NOTE(review): the name suggests the unit is MB — confirm.
	HIGHEST_LEFT_DATA_MB_SIZE = 200 << 10
)
View Source
// Event kinds, numbered by iota: EVENT_WATCH_TOPIC_L_CREATE is 0 and
// EVENT_WATCH_TOPIC_L_DELETE is 1. Reordering the entries changes the values.
// NOTE(review): presumably reported by a topic-leader watch — confirm.
const (
	EVENT_WATCH_TOPIC_L_CREATE = iota
	EVENT_WATCH_TOPIC_L_DELETE
)
View Source
// Upper bounds for retries, log pulling and retention. All are untyped
// integer constants.
const (
	// MAX_WRITE_RETRY is 10.
	MAX_WRITE_RETRY = 10
	// MAX_CATCHUP_RETRY is 5.
	MAX_CATCHUP_RETRY = 5
	// MAX_LOG_PULL is 10000.
	MAX_LOG_PULL = 10000
	// MAX_LOG_PULL_BYTES is 32 MiB (1024 * 1024 * 32).
	MAX_LOG_PULL_BYTES = 32 << 20
	// MAX_TOPIC_RETENTION_SIZE_PER_DAY is 4 GiB (1024 * 1024 * 1024 * 4).
	// The value does not fit in a 32-bit int.
	MAX_TOPIC_RETENTION_SIZE_PER_DAY = 4 << 30
	// MAX_CATCHUP_RUNNING is 3.
	MAX_CATCHUP_RUNNING = 3
)
View Source
// RPC timeouts, all typed as time.Duration.
const (
	// RPC_TIMEOUT is 5 seconds.
	RPC_TIMEOUT = 5 * time.Second
	// RPC_TIMEOUT_SHORT is 3 seconds.
	RPC_TIMEOUT_SHORT = 3 * time.Second
	// RPC_TIMEOUT_FOR_LOOKUP is 3 seconds.
	RPC_TIMEOUT_FOR_LOOKUP = 3 * time.Second
)
View Source
// Upper bounds (untyped integer constants).
// NOTE(review): presumably validated against topic meta parameters such as
// those passed to ChangeTopicMetaParam — confirm against callers.
const (
	MAX_PARTITION_NUM  = 255
	MAX_SYNC_EVERY     = 5000
	MAX_RETENTION_DAYS = 60
)
View Source
// Fixed name strings.
// NOTE(review): presumably the path segments used to lay out cluster metadata
// keys in etcd (root, topics, nodes, leader sessions) — confirm against the
// code that builds the key paths. Changing any value would change the keys.
const (
	NSQ_ROOT_DIR               = "NSQMetaData"
	NSQ_TOPIC_DIR              = "Topics"
	NSQ_TOPIC_META             = "TopicMeta"
	NSQ_TOPIC_REPLICA_INFO     = "ReplicaInfo"
	NSQ_TOPIC_LEADER_SESSION   = "LeaderSession"
	NSQ_NODE_DIR               = "NsqdNodes"
	NSQ_LOOKUPD_DIR            = "NsqlookupdInfo"
	NSQ_LOOKUPD_NODE_DIR       = "NsqlookupdNodes"
	NSQ_LOOKUPD_LEADER_SESSION = "LookupdLeaderSession"
)
View Source
const (
	// ETCD_LOCK_NSQ_NAMESPACE is the string "nsq".
	// NOTE(review): presumably the namespace prefix for etcd lock keys — confirm.
	ETCD_LOCK_NSQ_NAMESPACE = "nsq"
)
View Source
const (
	// ETCD_TTL is 15.
	// NOTE(review): presumably a TTL in seconds for etcd keys — confirm the unit.
	ETCD_TTL = 15
)

Variables

View Source
// Commit-log errors, each a distinct value created with errors.New.
// The messages of ErrCommitLogEOF, ErrCommitLogOutofBound and
// ErrCommitLogLessThanSegmentStart are reused (via Error()) to build the
// ErrTopicCommitLog* coordinator errors, so editing those messages also
// changes the coordinator error messages.
var (
	ErrCommitLogWrongID              = errors.New("commit log id is wrong")
	ErrCommitLogWrongLastID          = errors.New("commit log last id should no less than log id")
	ErrCommitLogIDNotFound           = errors.New("commit log id is not found")
	ErrCommitLogSearchNotFound       = errors.New("search commit log data not found")
	ErrCommitLogOutofBound           = errors.New("commit log offset is out of bound")
	ErrCommitLogEOF                  = errors.New("commit log end of file")
	ErrCommitLogOffsetInvalid        = errors.New("commit log offset is invalid")
	ErrCommitLogPartitionExceed      = errors.New("commit log partition id is exceeded")
	ErrCommitLogSearchFailed         = errors.New("commit log data search failed")
	ErrCommitLogSegmentSizeInvalid   = errors.New("commit log segment size is invalid")
	ErrCommitLogLessThanSegmentStart = errors.New("commit log read index is less than segment start")
	ErrCommitLogCleanKeepMin         = errors.New("commit log clean should keep some data")
)
View Source
// Coordinator errors. Each value is a *CoordErr built by NewCoordErr (message
// plus CoordErrType) or NewCoordErrWithCode (which also sets an ErrRPCRetCode).
// They are package-level pointers, so every user shares the same instance.
var (
	ErrTopicInfoNotFound = NewCoordErr("topic info not found", CoordClusterErr)

	// Errors that carry an explicit RPC return code.
	// NOTE(review): ErrClusterChanged is constructed with RpcNoErr as its code —
	// confirm that is intentional.
	ErrNotTopicLeader                     = NewCoordErrWithCode("not topic leader", CoordClusterNoRetryWriteErr, RpcErrNotTopicLeader)
	ErrEpochMismatch                      = NewCoordErrWithCode("commit epoch not match", CoordElectionTmpErr, RpcErrEpochMismatch)
	ErrEpochLessThanCurrent               = NewCoordErrWithCode("epoch should be increased", CoordElectionErr, RpcErrEpochLessThanCurrent)
	ErrWriteQuorumFailed                  = NewCoordErrWithCode("write to quorum failed.", CoordElectionTmpErr, RpcErrWriteQuorumFailed)
	ErrCommitLogIDDup                     = NewCoordErrWithCode("commit id duplicated", CoordElectionErr, RpcErrCommitLogIDDup)
	ErrMissingTopicLeaderSession          = NewCoordErrWithCode("missing topic leader session", CoordElectionErr, RpcErrMissingTopicLeaderSession)
	ErrLeaderSessionMismatch              = NewCoordErrWithCode("leader session mismatch", CoordElectionTmpErr, RpcErrLeaderSessionMismatch)
	ErrWriteDisabled                      = NewCoordErrWithCode("write is disabled on the topic", CoordElectionErr, RpcErrWriteDisabled)
	ErrLeavingISRWait                     = NewCoordErrWithCode("leaving isr need wait.", CoordElectionTmpErr, RpcErrLeavingISRWait)
	ErrTopicCoordExistingAndMismatch      = NewCoordErrWithCode("topic coordinator existing with a different partition", CoordClusterErr, RpcErrTopicCoordExistingAndMismatch)
	ErrTopicCoordTmpConflicted            = NewCoordErrWithCode("topic coordinator is conflicted temporally", CoordClusterErr, RpcErrTopicCoordConflicted)
	ErrTopicLeaderChanged                 = NewCoordErrWithCode("topic leader changed", CoordElectionTmpErr, RpcErrTopicLeaderChanged)
	ErrTopicCommitLogEOF                  = NewCoordErrWithCode(ErrCommitLogEOF.Error(), CoordCommonErr, RpcErrCommitLogEOF)
	ErrTopicCommitLogOutofBound           = NewCoordErrWithCode(ErrCommitLogOutofBound.Error(), CoordCommonErr, RpcErrCommitLogOutofBound)
	ErrTopicCommitLogLessThanSegmentStart = NewCoordErrWithCode(ErrCommitLogLessThanSegmentStart.Error(), CoordCommonErr, RpcErrCommitLogLessThanSegmentStart)
	ErrTopicCommitLogNotConsistent        = NewCoordErrWithCode("topic commit log is not consistent", CoordClusterErr, RpcCommonErr)
	ErrMissingTopicCoord                  = NewCoordErrWithCode("missing topic coordinator", CoordClusterErr, RpcErrMissingTopicCoord)
	ErrTopicLoading                       = NewCoordErrWithCode("topic is still loading data", CoordLocalTmpErr, RpcErrTopicLoading)
	ErrTopicExiting                       = NewCoordErr("topic coordinator is exiting", CoordLocalTmpErr)
	ErrTopicExitingOnSlave                = NewCoordErr("topic coordinator is exiting on slave", CoordTmpErr)
	ErrTopicCoordStateInvalid             = NewCoordErrWithCode("invalid coordinator state", CoordClusterErr, RpcErrTopicCoordStateInvalid)
	ErrTopicSlaveInvalid                  = NewCoordErrWithCode("topic slave has some invalid state", CoordSlaveErr, RpcErrSlaveStateInvalid)
	ErrTopicLeaderSessionInvalid          = NewCoordErrWithCode("topic leader session is invalid", CoordElectionTmpErr, RpcCommonErr)
	ErrTopicWriteOnNonISR                 = NewCoordErrWithCode("topic write on a node not in ISR", CoordTmpErr, RpcErrWriteOnNonISR)
	ErrTopicISRNotEnough                  = NewCoordErrWithCode("topic isr nodes not enough", CoordTmpErr, RpcCommonErr)
	ErrClusterChanged                     = NewCoordErrWithCode("cluster changed ", CoordTmpErr, RpcNoErr)

	// Errors of type CoordCommonErr with no explicit RPC code.
	ErrPubArgError                = NewCoordErr("pub argument error", CoordCommonErr)
	ErrTopicNotRelated            = NewCoordErr("topic not related to me", CoordCommonErr)
	ErrTopicCatchupAlreadyRunning = NewCoordErr("topic is already running catchup", CoordCommonErr)
	ErrTopicArgError              = NewCoordErr("topic argument error", CoordCommonErr)
	ErrOperationExpired           = NewCoordErr("operation has expired since wait too long", CoordCommonErr)
	ErrCatchupRunningBusy         = NewCoordErr("too much running catchup", CoordCommonErr)

	// Errors about local state; most are CoordLocalErr, while
	// ErrLocalFallBehind and ErrLocalForwardThanLeader are CoordElectionErr.
	ErrMissingTopicLog                     = NewCoordErr("missing topic log ", CoordLocalErr)
	ErrLocalTopicPartitionMismatch         = NewCoordErr("local topic partition not match", CoordLocalErr)
	ErrLocalFallBehind                     = NewCoordErr("local data fall behind", CoordElectionErr)
	ErrLocalForwardThanLeader              = NewCoordErr("local data is more than leader", CoordElectionErr)
	ErrLocalSetChannelOffsetNotFirstClient = NewCoordErr("failed to set channel offset since not first client", CoordLocalErr)
	ErrLocalMissingTopic                   = NewCoordErr("local topic missing", CoordLocalErr)
	ErrLocalNotReadyForWrite               = NewCoordErr("local topic is not ready for write.", CoordLocalErr)
	ErrLocalInitTopicFailed                = NewCoordErr("local topic init failed", CoordLocalErr)
	ErrLocalInitTopicCoordFailed           = NewCoordErr("topic coordinator init failed", CoordLocalErr)
	ErrLocalTopicDataCorrupt               = NewCoordErr("local topic data corrupt", CoordLocalErr)
)
View Source
// Balance-related errors, each created with errors.New.
// NOTE(review): presumably returned by the data placement / balance code —
// confirm against callers.
var (
	ErrBalanceNodeUnavailable     = errors.New("can not find a node to be balanced")
	ErrNodeIsExcludedForTopicData = errors.New("destination node is excluded for topic")
	ErrClusterBalanceRunning      = errors.New("another balance is running, should wait")
)
View Source
// Leader-session and key errors, each created with errors.New.
// The leadership interfaces document that GetTopicLeaderSession should return
// ErrLeaderSessionNotExist when no leadership information exists.
var (
	ErrLeaderSessionAlreadyExist = errors.New("leader session already exist")
	ErrLeaderSessionNotExist     = errors.New("session not exist")
	ErrKeyAlreadyExist           = errors.New("Key already exist")
	ErrKeyNotFound               = errors.New("Key not found")
)
View Source
// Errors in two forms: the first group are plain error values from
// errors.New; the second group are *CoordErr values from NewCoordErr, each
// tagged with a CoordErrType.
// NOTE(review): presumably the lookup-coordinator side errors — confirm.
var (
	ErrAlreadyExist         = errors.New("already exist")
	ErrTopicNotCreated      = errors.New("topic is not created")
	ErrWaitingLeaderRelease = errors.New("leader session is still alive")
	ErrNotNsqLookupLeader   = errors.New("Not nsqlookup leader")
	ErrClusterUnstable      = errors.New("the cluster is unstable")

	ErrLeaderNodeLost           = NewCoordErr("leader node is lost", CoordTmpErr)
	ErrNodeNotFound             = NewCoordErr("node not found", CoordCommonErr)
	ErrLeaderElectionFail       = NewCoordErr("Leader election failed.", CoordElectionTmpErr)
	ErrNoLeaderCanBeElected     = NewCoordErr("No leader can be elected", CoordElectionTmpErr)
	ErrNodeUnavailable          = NewCoordErr("No node is available for topic", CoordTmpErr)
	ErrJoinISRInvalid           = NewCoordErr("Join ISR failed", CoordCommonErr)
	ErrJoinISRTimeout           = NewCoordErr("Join ISR timeout", CoordCommonErr)
	ErrWaitingJoinISR           = NewCoordErr("The topic is waiting node to join isr", CoordCommonErr)
	ErrLeaderSessionNotReleased = NewCoordErr("The topic leader session is not released", CoordElectionTmpErr)
	ErrTopicISRCatchupEnough    = NewCoordErr("the topic isr and catchup nodes are enough", CoordTmpErr)
	ErrClusterNodeRemoving      = NewCoordErr("the node is mark as removed", CoordTmpErr)
	ErrTopicNodeConflict        = NewCoordErr("the topic node info is conflicted", CoordElectionErr)
	ErrLeadershipServerUnstable = NewCoordErr("the leadership server is unstable", CoordTmpErr)
)
View Source
// LOGROTATE_NUM defaults to 2000000. It is a variable, not a constant, so it
// can be reassigned at runtime.
// NOTE(review): presumably the number of commit-log entries per segment
// before rotating — confirm.
var LOGROTATE_NUM = 2000000
View Source
// MIN_KEEP_LOG_ITEM defaults to 1000. It is a variable, not a constant, so it
// can be reassigned at runtime.
// NOTE(review): presumably the minimum number of log items kept when cleaning
// the commit log (compare ErrCommitLogCleanKeepMin) — confirm.
var MIN_KEEP_LOG_ITEM = 1000
View Source
// MaxRetryWait is a time.Duration defaulting to 3 seconds. It is a variable,
// so it can be reassigned at runtime.
// NOTE(review): presumably the upper bound on the wait between retries — confirm.
var MaxRetryWait = 3 * time.Second

Functions

func AddCounter

func AddCounter(name string) uint32

func CheckKeyIfExist

func CheckKeyIfExist(err error) bool

func DecodeMessagesFromRaw

func DecodeMessagesFromRaw(data []byte, msgs []*nsqd.Message, tmpbuf []byte) ([]*nsqd.Message, error)

func ExtractRpcAddrFromID

func ExtractRpcAddrFromID(nid string) string

func FilterList

func FilterList(l []string, filter []string) []string

func FindSlice

func FindSlice(in []string, e string) int

func GenNsqLookupNodeID

func GenNsqLookupNodeID(n *NsqLookupdNodeInfo, extra string) string

func GenNsqdNodeID

func GenNsqdNodeID(n *NsqdNodeInfo, extra string) string

func GetLogDataSize

func GetLogDataSize() int

func GetNextLogOffset

func GetNextLogOffset(cur int64) int64

func GetPartitionFromMsgID

func GetPartitionFromMsgID(id int64) int

func GetPrevLogOffset

func GetPrevLogOffset(cur int64) int64

func GetTopicPartitionBasePath

func GetTopicPartitionBasePath(rootPath string, topic string, partition int) string

func GetTopicPartitionFileName

func GetTopicPartitionFileName(topic string, partition int, suffix string) string

func GetTopicPartitionLogPath

func GetTopicPartitionLogPath(basepath, t string, p int) string

func IncCounter

func IncCounter(id uint32)

func IncCounterBy

func IncCounterBy(id uint32, amount uint64)

func IsEtcdNodeExist

func IsEtcdNodeExist(err error) bool

func IsEtcdNotFile

func IsEtcdNotFile(err error) bool

func MergeList

func MergeList(l []string, r []string) []string

func NewEtcdClient

func NewEtcdClient(etcdHost string) *etcd.Client

func NewNsqdCoordGRpcServer

func NewNsqdCoordGRpcServer(coord *NsqdCoordinator, rootPath string) *nsqdCoordGRpcServer

func RetryWithTimeout

func RetryWithTimeout(fn func() error) error

func SetCoordLogLevel

func SetCoordLogLevel(level int32)

func SetCoordLogger

func SetCoordLogger(log levellogger.Logger, level int32)

func SetEtcdLogger

func SetEtcdLogger(log etcdlock.Logger, level int32)

Types

type By

// By is a function type that compares two NodeTopicStats and returns a bool.
// Its Sort method takes a []NodeTopicStats.
// NOTE(review): presumably a "less" predicate defining the sort order — confirm.
type By func(l, r *NodeTopicStats) bool

func (By) Sort

func (by By) Sort(statList []NodeTopicStats)

type CatchupStat

// CatchupStat is a JSON-serializable record with the keys "hostname",
// "node_id" and "progress".
// NOTE(review): presumably describes one node that is catching up on a topic;
// the unit/range of Progress is not visible here — confirm.
type CatchupStat struct {
	HostName string `json:"hostname"`
	NodeID   string `json:"node_id"`
	Progress int    `json:"progress"`
}

type ChannelConsumeMgr

// ChannelConsumeMgr embeds a sync.Mutex, so Lock and Unlock are promoted onto
// it and values must not be copied after first use. Its remaining fields are
// unexported and not shown in this listing.
type ChannelConsumeMgr struct {
	sync.Mutex
	// contains filtered or unexported fields
}

type ChannelConsumerOffset

// ChannelConsumerOffset holds two int64 values and two flags.
// NOTE(review): presumably VOffset/VCnt are a channel's virtual consume offset
// and message count, Flush requests a flush, and AllowBackward permits moving
// the offset backwards — none of this is visible here; confirm against users.
type ChannelConsumerOffset struct {
	VOffset       int64
	VCnt          int64
	Flush         bool
	AllowBackward bool
}

type CntComparator

// CntComparator is an int64-based type whose value-receiver methods
// (SearchEndBoundary, LessThanLeftBoundary, GreatThanRightBoundary) match the
// method set of ICommitLogComparator.
// NOTE(review): presumably compares against CommitLogData.MsgCnt — confirm.
type CntComparator int64

func (CntComparator) GreatThanRightBoundary

func (self CntComparator) GreatThanRightBoundary(l *CommitLogData) bool

func (CntComparator) LessThanLeftBoundary

func (self CntComparator) LessThanLeftBoundary(l *CommitLogData) bool

func (CntComparator) SearchEndBoundary

func (self CntComparator) SearchEndBoundary() int64

type CommitLogData

// CommitLogData is one commit-log record. A record may cover a single message
// or a batch of messages.
type CommitLogData struct {
	LogID int64
	// Epoch is the epoch of the topic leader.
	Epoch EpochType
	// LastMsgLogID equals LogID for a single message; for a batch it is the
	// log ID of the last message in the batch.
	LastMsgLogID int64
	MsgOffset    int64
	// MsgSize is the size of the batch of messages.
	MsgSize int32
	// MsgCnt is the total message count from the beginning, not just the
	// count for this batch.
	MsgCnt int64
	// MsgNum is the number of messages in the current commit.
	MsgNum int32
}

type CommonCoordErr

// CommonCoordErr wraps CoordErr by embedding it. Unlike CoordErr, which
// deliberately does not implement the error interface, *CommonCoordErr has an
// Error() string method.
type CommonCoordErr struct {
	CoordErr
}

func (*CommonCoordErr) Error

func (self *CommonCoordErr) Error() string

type ConsistentStore

// ConsistentStore is a string key/value store abstraction.
type ConsistentStore interface {
	// WriteKey stores value under key.
	WriteKey(key, value string) error
	// ReadKey returns the value stored under key.
	ReadKey(key string) (string, error)
	// ListKey returns a list of strings for key.
	// NOTE(review): presumably the child keys under key — confirm.
	ListKey(key string) ([]string, error)
}

type CoordErr

// CoordErr is the coordinator error value: a message, an RPC return code and
// an error category. It is created by NewCoordErr and NewCoordErrWithCode and
// does not itself implement the error interface; ToErrorType returns an error.
type CoordErr struct {
	// ErrMsg is the error message.
	ErrMsg string
	// ErrCode is the RPC return code (see ErrRPCRetCode).
	ErrCode ErrRPCRetCode
	// ErrType is the error category (see CoordErrType).
	ErrType CoordErrType
}

Note: gorpc treats values of type error specially, so CoordErr must not implement the error interface when it is used as an RPC response type.

func NewCoordErr

func NewCoordErr(msg string, etype CoordErrType) *CoordErr

func NewCoordErrWithCode

func NewCoordErrWithCode(msg string, etype CoordErrType, code ErrRPCRetCode) *CoordErr

func (*CoordErr) CanRetryWrite

func (self *CoordErr) CanRetryWrite(retryTimes int) bool

func (*CoordErr) HasError

func (self *CoordErr) HasError() bool

func (*CoordErr) IsEqual

func (self *CoordErr) IsEqual(other *CoordErr) bool

func (*CoordErr) IsLocalErr

func (self *CoordErr) IsLocalErr() bool

func (*CoordErr) IsNetErr

func (self *CoordErr) IsNetErr() bool

func (*CoordErr) String

func (self *CoordErr) String() string

func (*CoordErr) ToErrorType

func (self *CoordErr) ToErrorType() error

type CoordErrStats

// CoordErrStats embeds a sync.Mutex together with CoordErrStatsData, so the
// counters and the lock live in one value; it must not be copied after first
// use. GetCopy returns a *CoordErrStatsData.
// NOTE(review): presumably the mutex guards the embedded counters — confirm.
type CoordErrStats struct {
	sync.Mutex
	CoordErrStatsData
}

func (*CoordErrStats) GetCopy

func (self *CoordErrStats) GetCopy() *CoordErrStatsData

type CoordErrStatsData

// CoordErrStatsData holds int64 counters per error category plus a map for
// everything else. The fields carry no JSON tags, so encoding/json uses the Go
// field names as keys.
type CoordErrStatsData struct {
	WriteEpochError        int64
	WriteNotLeaderError    int64
	WriteQuorumError       int64
	WriteBusyError         int64
	RpcCheckFailed         int64
	LeadershipError        int64
	TopicCoordMissingError int64
	LocalErr               int64
	// OtherCoordErrs maps a string to a count.
	// NOTE(review): presumably keyed by error message — confirm.
	OtherCoordErrs map[string]int64
}

type CoordErrType

// CoordErrType is the category stored in CoordErr.ErrType.
type CoordErrType int

// CoordErrType values, numbered by iota from CoordNoErr = 0 through
// CoordClusterNoRetryWriteErr = 10. The order defines the numeric values, so
// inserting or reordering entries changes them.
const (
	CoordNoErr CoordErrType = iota
	CoordCommonErr
	CoordNetErr
	CoordElectionErr
	CoordElectionTmpErr
	CoordClusterErr
	CoordSlaveErr
	CoordLocalErr
	CoordLocalTmpErr
	CoordTmpErr
	CoordClusterNoRetryWriteErr
)

type CoordStats

// CoordStats bundles RPC connection stats, error counters and per-topic
// coordinator stats for JSON output.
type CoordStats struct {
	RpcStats *gorpc.ConnStats `json:"rpc_stats"`
	// ErrStats has no JSON tag, so it is encoded under the key "ErrStats"
	// rather than a snake_case name like its siblings.
	ErrStats        CoordErrStatsData
	TopicCoordStats []TopicCoordStat `json:"topic_coord_stats"`
}

type DataPlacement

// DataPlacement is an opaque type created by NewDataPlacement from a
// *NsqLookupCoordinator. It exposes DoBalance and SetBalanceInterval.
type DataPlacement struct {
	// contains filtered or unexported fields
}

func NewDataPlacement

func NewDataPlacement(coord *NsqLookupCoordinator) *DataPlacement

func (*DataPlacement) DoBalance

func (self *DataPlacement) DoBalance(monitorChan chan struct{})

func (*DataPlacement) SetBalanceInterval

func (self *DataPlacement) SetBalanceInterval(start int, end int)

type EpochType

// EpochType is the int64-based type used for epochs throughout the package,
// e.g. CommitLogData.Epoch and the oldGen parameters of the leadership
// interfaces.
type EpochType int64

type ErrRPCRetCode

// ErrRPCRetCode is the return code stored in CoordErr.ErrCode.
type ErrRPCRetCode int

// Generic codes: RpcNoErr is 0 and RpcCommonErr is 1.
const (
	RpcNoErr ErrRPCRetCode = iota
	RpcCommonErr
)

// Specific codes, numbered by iota starting at 10: RpcErrLeavingISRWait is 10
// and RpcErrWriteOnNonISR is 31. The order defines the numeric values, so new
// codes should be appended rather than inserted.
// NOTE(review): presumably these values cross the wire between nodes, which
// would make renumbering a compatibility break — confirm.
const (
	RpcErrLeavingISRWait ErrRPCRetCode = iota + 10
	RpcErrNotTopicLeader
	RpcErrNoLeader
	RpcErrEpochMismatch
	RpcErrEpochLessThanCurrent
	RpcErrWriteQuorumFailed
	RpcErrCommitLogIDDup
	RpcErrCommitLogEOF
	RpcErrCommitLogOutofBound
	RpcErrCommitLogLessThanSegmentStart
	RpcErrMissingTopicLeaderSession
	RpcErrLeaderSessionMismatch
	RpcErrWriteDisabled
	RpcErrTopicNotExist
	RpcErrMissingTopicCoord
	RpcErrTopicCoordExistingAndMismatch
	RpcErrTopicCoordConflicted
	RpcErrTopicLeaderChanged
	RpcErrTopicLoading
	RpcErrSlaveStateInvalid
	RpcErrTopicCoordStateInvalid
	RpcErrWriteOnNonISR
)

type ICommitLogComparator

// ICommitLogComparator is the comparison interface over CommitLogData.
// CntComparator, MsgIDComparator, MsgOffsetComparator and
// *MsgTimestampComparator declare methods with these signatures.
// NOTE(review): presumably drives a search over the commit log — confirm.
type ICommitLogComparator interface {
	// SearchEndBoundary returns an int64 boundary value.
	SearchEndBoundary() int64
	// LessThanLeftBoundary reports a comparison result against l.
	LessThanLeftBoundary(l *CommitLogData) bool
	// GreatThanRightBoundary reports a comparison result against l.
	GreatThanRightBoundary(l *CommitLogData) bool
}

type INsqlookupRemoteProxy

// INsqlookupRemoteProxy is the client-side view of a remote nsqlookup
// coordinator. NewNsqLookupRpcClient returns a value of this interface, and
// *NsqLookupRpcClient declares methods with these signatures.
type INsqlookupRemoteProxy interface {
	// Connection management.
	RemoteAddr() string
	Reconnect() error
	Close()
	// Requests that report their outcome as a *CoordErr.
	RequestJoinCatchup(topic string, partition int, nid string) *CoordErr
	RequestJoinTopicISR(topic string, partition int, nid string) *CoordErr
	ReadyForTopicISR(topic string, partition int, nid string, leaderSession *TopicLeaderSession, joinISRSession string) *CoordErr
	RequestLeaveFromISR(topic string, partition int, nid string) *CoordErr
	RequestLeaveFromISRByLeader(topic string, partition int, nid string, leaderSession *TopicLeaderSession) *CoordErr
	// These two return nothing, so a failure is not reported to the caller.
	RequestNotifyNewTopicInfo(topic string, partition int, nid string)
	RequestCheckTopicConsistence(topic string, partition int)
}

func NewNsqLookupRpcClient

func NewNsqLookupRpcClient(addr string, timeout time.Duration) (INsqlookupRemoteProxy, error)

type ISRStat

// ISRStat is a JSON-serializable record with the keys "hostname" and
// "node_id".
// NOTE(review): presumably identifies one node in a topic's ISR — confirm.
type ISRStat struct {
	HostName string `json:"hostname"`
	NodeID   string `json:"node_id"`
}

type IntHeap

// IntHeap is a slice of ints with Len, Less, Swap, Push and Pop methods, the
// method set that container/heap.Interface requires. Push and Pop use pointer
// receivers; the other three use value receivers.
type IntHeap []int

An IntHeap is a min-heap of ints.

func (IntHeap) Len

func (h IntHeap) Len() int

func (IntHeap) Less

func (h IntHeap) Less(i, j int) bool

func (*IntHeap) Pop

func (h *IntHeap) Pop() interface{}

func (*IntHeap) Push

func (h *IntHeap) Push(x interface{})

func (IntHeap) Swap

func (h IntHeap) Swap(i, j int)

type JoinISRState

// JoinISRState embeds a sync.Mutex, so it must not be copied after first use.
// Its remaining fields are unexported and not shown in this listing.
type JoinISRState struct {
	sync.Mutex
	// contains filtered or unexported fields
}

type LogStartInfo

// LogStartInfo describes where the commit log starts.
type LogStartInfo struct {
	SegmentStartCount int64
	SegmentStartIndex int64
	// SegmentStartOffset is the start offset within the segment file at
	// SegmentStartIndex. It allows the commit log to start in the middle of
	// a segment file.
	SegmentStartOffset int64
}

type MasterChanInfo

// MasterChanInfo is an opaque type; all of its fields are unexported and no
// constructor or methods for it appear in this listing.
type MasterChanInfo struct {
	// contains filtered or unexported fields
}

type MsgIDComparator

// MsgIDComparator is an int64-based type whose value-receiver methods match
// the method set of ICommitLogComparator.
// NOTE(review): presumably compares against a commit-log message ID — confirm.
type MsgIDComparator int64

func (MsgIDComparator) GreatThanRightBoundary

func (self MsgIDComparator) GreatThanRightBoundary(l *CommitLogData) bool

func (MsgIDComparator) LessThanLeftBoundary

func (self MsgIDComparator) LessThanLeftBoundary(l *CommitLogData) bool

func (MsgIDComparator) SearchEndBoundary

func (self MsgIDComparator) SearchEndBoundary() int64

type MsgOffsetComparator

// MsgOffsetComparator is an int64-based type whose value-receiver methods
// match the method set of ICommitLogComparator.
// NOTE(review): presumably compares against CommitLogData.MsgOffset — confirm.
type MsgOffsetComparator int64

func (MsgOffsetComparator) GreatThanRightBoundary

func (self MsgOffsetComparator) GreatThanRightBoundary(l *CommitLogData) bool

func (MsgOffsetComparator) LessThanLeftBoundary

func (self MsgOffsetComparator) LessThanLeftBoundary(l *CommitLogData) bool

func (MsgOffsetComparator) SearchEndBoundary

func (self MsgOffsetComparator) SearchEndBoundary() int64

type MsgTimestampComparator

// MsgTimestampComparator is an opaque struct whose pointer-receiver methods
// match the method set of ICommitLogComparator, so only
// *MsgTimestampComparator can be used where that interface is expected.
type MsgTimestampComparator struct {
	// contains filtered or unexported fields
}

func (*MsgTimestampComparator) GreatThanRightBoundary

func (self *MsgTimestampComparator) GreatThanRightBoundary(l *CommitLogData) bool

func (*MsgTimestampComparator) LessThanLeftBoundary

func (self *MsgTimestampComparator) LessThanLeftBoundary(l *CommitLogData) bool

func (*MsgTimestampComparator) SearchEndBoundary

func (self *MsgTimestampComparator) SearchEndBoundary() int64

type NSQDLeadership

// NSQDLeadership is the leadership-store interface declared in terms of nsqd
// node data: node registration, topic leader acquisition and release, and
// lookups of topic info and leader sessions.
type NSQDLeadership interface {
	InitClusterID(id string)
	RegisterNsqd(nodeData *NsqdNodeInfo) error // update
	UnregisterNsqd(nodeData *NsqdNodeInfo) error
	// AcquireTopicLeader tries to create the topic leadership key. There is no
	// need to retry if the key already exists.
	AcquireTopicLeader(topic string, partition int, nodeData *NsqdNodeInfo, epoch EpochType) error
	// ReleaseTopicLeader releases the session key using the acquired session.
	// Implementations should check the current session epoch to avoid
	// releasing a session that has since changed.
	ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error
	// GetAllLookupdNodes returns all registered lookup nodes.
	GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)
	// WatchLookupdLeader gets the newest lookup leader and watches it for
	// changes.
	WatchLookupdLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{}) error
	GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error)
	// GetTopicLeaderSession gets the leadership information. If it does not
	// exist, ErrLeaderSessionNotExist should be returned as the error.
	GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error)
}

type NSQLookupdLeadership

// NSQLookupdLeadership is the leadership-store interface declared in terms of
// lookupd node data: lookupd registration and leader election, nsqd node
// discovery, and topic meta/replica management. *NsqLookupdEtcdMgr declares
// methods with these names, and NsqLookupCoordinator.SetLeadershipMgr accepts
// a value of this interface.
type NSQLookupdLeadership interface {
	InitClusterID(id string)
	Register(value *NsqLookupdNodeInfo) error
	Unregister(value *NsqLookupdNodeInfo) error
	Stop()
	// GetClusterEpoch returns the modify index of the cluster root.
	GetClusterEpoch() (EpochType, error)
	GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error) // add
	AcquireAndWatchLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{})
	CheckIfLeader(session string) bool
	UpdateLookupEpoch(oldGen EpochType) (EpochType, error)

	GetNsqdNodes() ([]NsqdNodeInfo, error)
	// WatchNsqdNodes watches the cluster's nsqd nodes. It should deliver the
	// newest node list the first time.
	WatchNsqdNodes(nsqds chan []NsqdNodeInfo, stop chan struct{})
	// ScanTopics gets the info for all topics. Implementations should cache
	// the newest result to improve performance.
	ScanTopics() ([]TopicPartitionMetaInfo, error)
	// GetTopicInfo should return both the topic meta info and the replica
	// info for the topic partition. The epoch should be updated in the
	// returned value.
	GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error)
	// CreateTopic creates the topic and writes the meta info to the topic
	// meta node.
	CreateTopic(topic string, meta *TopicMetaInfo) error
	// CreateTopicPartition creates the topic partition path.
	CreateTopicPartition(topic string, partition int) error
	IsExistTopic(topic string) (bool, error)
	IsExistTopicPartition(topic string, partition int) (bool, error)
	// GetTopicMetaInfo gets the topic meta info only.
	GetTopicMetaInfo(topic string) (TopicMetaInfo, EpochType, error)
	UpdateTopicMetaInfo(topic string, meta *TopicMetaInfo, oldGen EpochType) error
	DeleteTopic(topic string, partition int) error
	DeleteWholeTopic(topic string) error
	// UpdateTopicNodeInfo updates the replica info (leader, ISR, epoch) for a
	// partition.
	// Note: the update should be a check-and-set to avoid unexpected
	// overrides. The epoch in topicInfo should be updated to the new epoch.
	// If the topic partition replica info node does not exist, it should be
	// created only once.
	UpdateTopicNodeInfo(topic string, partition int, topicInfo *TopicPartitionReplicaInfo, oldGen EpochType) error
	// GetTopicLeaderSession gets the leadership information. If it does not
	// exist, ErrLeaderSessionNotExist should be returned as the error.
	GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error)
	// WatchTopicLeader watches for any leadership lock change across all topic
	// partitions. It should return the token used later by release.
	WatchTopicLeader(leader chan *TopicLeaderSession, stop chan struct{}) error
	// ReleaseTopicLeader may only be called by the leader lookup. Normally the
	// nsqd node is notified to release the session itself; the lookup node
	// should release it only when the nsqd node is lost.
	ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error
}

The leader lock session must be checked before making any modification to etcd. All returned values should be copies, so that callers cannot modify internal state.

type NodeTopicStats

// NodeTopicStats holds per-node topic statistics. NewNodeTopicStats creates
// one from a node ID, a capacity and a CPU count.
// NOTE(review): the map keys are presumably topic full names (the accessor
// methods take a topicFullName string) — confirm.
type NodeTopicStats struct {
	NodeID string
	// ChannelDepthData is the data (in MB) that still needs to be consumed on
	// the leader, over all channels in the topic.
	ChannelDepthData map[string]int64
	// TopicLeaderDataSize and TopicTotalDataSize are the data left on disk,
	// in MB.
	TopicLeaderDataSize map[string]int64
	TopicTotalDataSize  map[string]int64
	NodeCPUs            int

	// TopicHourlyPubDataList holds the publish stats for the past 24 hours,
	// as a fixed array of 24 values per key.
	TopicHourlyPubDataList map[string][24]int64
	ChannelNum             map[string]int
	ChannelList            map[string][]string
}

func NewNodeTopicStats

func NewNodeTopicStats(nid string, cap int, cpus int) *NodeTopicStats

func (*NodeTopicStats) GetMostBusyAndIdleTopicWriteLevel

func (self *NodeTopicStats) GetMostBusyAndIdleTopicWriteLevel(leaderOnly bool) (string, string, float64, float64)

func (*NodeTopicStats) GetNodeAvgReadLevel

func (self *NodeTopicStats) GetNodeAvgReadLevel() float64

func (*NodeTopicStats) GetNodeAvgWriteLevel

func (self *NodeTopicStats) GetNodeAvgWriteLevel() float64

func (*NodeTopicStats) GetNodeLeaderLoadFactor

func (self *NodeTopicStats) GetNodeLeaderLoadFactor() float64

func (*NodeTopicStats) GetNodeLoadFactor

func (self *NodeTopicStats) GetNodeLoadFactor() (float64, float64)

func (*NodeTopicStats) GetNodePeakLevelList

func (self *NodeTopicStats) GetNodePeakLevelList() []int64

func (*NodeTopicStats) GetSortedTopicWriteLevel

func (self *NodeTopicStats) GetSortedTopicWriteLevel(leaderOnly bool) topicLFListT

func (*NodeTopicStats) GetTopicAvgWriteLevel

func (self *NodeTopicStats) GetTopicAvgWriteLevel(topicFullName string) float64

func (*NodeTopicStats) GetTopicLeaderLoadFactor

func (self *NodeTopicStats) GetTopicLeaderLoadFactor(topicFullName string) float64

func (*NodeTopicStats) GetTopicLoadFactor

func (self *NodeTopicStats) GetTopicLoadFactor(topicFullName string) float64

func (*NodeTopicStats) GetTopicPeakLevel

func (self *NodeTopicStats) GetTopicPeakLevel(topic TopicPartitionID) float64

func (*NodeTopicStats) LeaderLessLoader

func (self *NodeTopicStats) LeaderLessLoader(other *NodeTopicStats) bool

func (*NodeTopicStats) SlaveLessLoader

func (self *NodeTopicStats) SlaveLessLoader(other *NodeTopicStats) bool

type NsqLookupCoordRpcServer

// NsqLookupCoordRpcServer is an opaque type created by
// NewNsqLookupCoordRpcServer from a *NsqLookupCoordinator. Its methods each
// take a request struct and return a *CoordErr.
type NsqLookupCoordRpcServer struct {
	// contains filtered or unexported fields
}

func NewNsqLookupCoordRpcServer

func NewNsqLookupCoordRpcServer(coord *NsqLookupCoordinator) *NsqLookupCoordRpcServer

func (*NsqLookupCoordRpcServer) ReadyForTopicISR

func (self *NsqLookupCoordRpcServer) ReadyForTopicISR(req *RpcReadyForISR) *CoordErr

func (*NsqLookupCoordRpcServer) RequestCheckTopicConsistence

func (self *NsqLookupCoordRpcServer) RequestCheckTopicConsistence(req *RpcReqCheckTopic) *CoordErr

func (*NsqLookupCoordRpcServer) RequestJoinCatchup

func (self *NsqLookupCoordRpcServer) RequestJoinCatchup(req *RpcReqJoinCatchup) *CoordErr

func (*NsqLookupCoordRpcServer) RequestJoinTopicISR

func (self *NsqLookupCoordRpcServer) RequestJoinTopicISR(req *RpcReqJoinISR) *CoordErr

func (*NsqLookupCoordRpcServer) RequestLeaveFromISR

func (self *NsqLookupCoordRpcServer) RequestLeaveFromISR(req *RpcReqLeaveFromISR) *CoordErr

func (*NsqLookupCoordRpcServer) RequestLeaveFromISRByLeader

func (self *NsqLookupCoordRpcServer) RequestLeaveFromISRByLeader(req *RpcReqLeaveFromISRByLeader) *CoordErr

func (*NsqLookupCoordRpcServer) RequestNotifyNewTopicInfo

func (self *NsqLookupCoordRpcServer) RequestNotifyNewTopicInfo(req *RpcReqNewTopicInfo) *CoordErr

type NsqLookupCoordinator

// NsqLookupCoordinator is an opaque type created by NewNsqLookupCoordinator
// from a cluster name, a *NsqLookupdNodeInfo and *Options. It has Start and
// Stop methods, and SetLeadershipMgr to supply an NSQLookupdLeadership.
type NsqLookupCoordinator struct {
	// contains filtered or unexported fields
}

func NewNsqLookupCoordinator

func NewNsqLookupCoordinator(cluster string, n *NsqLookupdNodeInfo, opts *Options) *NsqLookupCoordinator

func (*NsqLookupCoordinator) ChangeTopicMetaParam

func (self *NsqLookupCoordinator) ChangeTopicMetaParam(topic string,
	newSyncEvery int, newRetentionDay int, newReplicator int) error

func (*NsqLookupCoordinator) CreateTopic

func (self *NsqLookupCoordinator) CreateTopic(topic string, meta TopicMetaInfo) error

func (*NsqLookupCoordinator) DeleteTopic

func (self *NsqLookupCoordinator) DeleteTopic(topic string, partition string) error

func (*NsqLookupCoordinator) DeleteTopicForce

func (self *NsqLookupCoordinator) DeleteTopicForce(topic string, partition string) error

func (*NsqLookupCoordinator) ExpandTopicPartition

func (self *NsqLookupCoordinator) ExpandTopicPartition(topic string, newPartitionNum int) error

func (*NsqLookupCoordinator) GetAllLookupdNodes

func (self *NsqLookupCoordinator) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)

func (*NsqLookupCoordinator) GetClusterNodeLoadFactor

func (self *NsqLookupCoordinator) GetClusterNodeLoadFactor() (map[string]float64, map[string]float64)

func (*NsqLookupCoordinator) GetLookupLeader

func (self *NsqLookupCoordinator) GetLookupLeader() NsqLookupdNodeInfo

func (*NsqLookupCoordinator) GetTopicLeaderNodes

func (self *NsqLookupCoordinator) GetTopicLeaderNodes(topicName string) (map[string]string, error)

func (*NsqLookupCoordinator) GetTopicMetaInfo

func (self *NsqLookupCoordinator) GetTopicMetaInfo(topicName string) (TopicMetaInfo, error)

func (*NsqLookupCoordinator) IsClusterStable

func (self *NsqLookupCoordinator) IsClusterStable() bool

func (*NsqLookupCoordinator) IsMineLeader

func (self *NsqLookupCoordinator) IsMineLeader() bool

func (*NsqLookupCoordinator) IsTopicLeader

func (self *NsqLookupCoordinator) IsTopicLeader(topic string, part int, nid string) bool

func (*NsqLookupCoordinator) MarkNodeAsRemoving

func (self *NsqLookupCoordinator) MarkNodeAsRemoving(nid string) error

func (*NsqLookupCoordinator) MoveTopicPartitionDataByManual

func (self *NsqLookupCoordinator) MoveTopicPartitionDataByManual(topicName string,
	partitionID int, moveLeader bool, fromNode string, toNode string) error

func (*NsqLookupCoordinator) SetClusterUpgradeState

func (self *NsqLookupCoordinator) SetClusterUpgradeState(upgrading bool) error

func (*NsqLookupCoordinator) SetLeadershipMgr

func (self *NsqLookupCoordinator) SetLeadershipMgr(l NSQLookupdLeadership)

func (*NsqLookupCoordinator) Start

func (self *NsqLookupCoordinator) Start() error

Start initializes the coordinator and registers it with the leader server.

func (*NsqLookupCoordinator) Stop

func (self *NsqLookupCoordinator) Stop()

type NsqLookupRpcClient

// NsqLookupRpcClient declares methods matching INsqlookupRemoteProxy, plus
// CallWithRetry and ShouldRemoved. It embeds a sync.Mutex, so it must not be
// copied after first use; its remaining fields are unexported.
type NsqLookupRpcClient struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*NsqLookupRpcClient) CallWithRetry

func (self *NsqLookupRpcClient) CallWithRetry(method string, arg interface{}) (interface{}, error)

func (*NsqLookupRpcClient) Close

func (self *NsqLookupRpcClient) Close()

func (*NsqLookupRpcClient) ReadyForTopicISR

func (self *NsqLookupRpcClient) ReadyForTopicISR(topic string, partition int, nid string, leaderSession *TopicLeaderSession, joinISRSession string) *CoordErr

func (*NsqLookupRpcClient) Reconnect

func (self *NsqLookupRpcClient) Reconnect() error

func (*NsqLookupRpcClient) RemoteAddr

func (self *NsqLookupRpcClient) RemoteAddr() string

func (*NsqLookupRpcClient) RequestCheckTopicConsistence

func (self *NsqLookupRpcClient) RequestCheckTopicConsistence(topic string, partition int)

func (*NsqLookupRpcClient) RequestJoinCatchup

func (self *NsqLookupRpcClient) RequestJoinCatchup(topic string, partition int, nid string) *CoordErr

func (*NsqLookupRpcClient) RequestJoinTopicISR

func (self *NsqLookupRpcClient) RequestJoinTopicISR(topic string, partition int, nid string) *CoordErr

func (*NsqLookupRpcClient) RequestLeaveFromISR

func (self *NsqLookupRpcClient) RequestLeaveFromISR(topic string, partition int, nid string) *CoordErr

func (*NsqLookupRpcClient) RequestLeaveFromISRByLeader

func (self *NsqLookupRpcClient) RequestLeaveFromISRByLeader(topic string, partition int, nid string, leaderSession *TopicLeaderSession) *CoordErr

func (*NsqLookupRpcClient) RequestNotifyNewTopicInfo

func (self *NsqLookupRpcClient) RequestNotifyNewTopicInfo(topic string, partition int, nid string)

func (*NsqLookupRpcClient) ShouldRemoved

func (self *NsqLookupRpcClient) ShouldRemoved() bool

type NsqLookupdEtcdMgr

// NsqLookupdEtcdMgr is an opaque type created by NewNsqLookupdEtcdMgr from a
// host string. Its methods share names and signatures with those of
// NSQLookupdLeadership.
// NOTE(review): presumably the etcd-backed implementation of that interface —
// confirm that the full method set is implemented.
type NsqLookupdEtcdMgr struct {
	// contains filtered or unexported fields
}

func NewNsqLookupdEtcdMgr

func NewNsqLookupdEtcdMgr(host string) *NsqLookupdEtcdMgr

func (*NsqLookupdEtcdMgr) AcquireAndWatchLeader

func (self *NsqLookupdEtcdMgr) AcquireAndWatchLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{})

func (*NsqLookupdEtcdMgr) CheckIfLeader

func (self *NsqLookupdEtcdMgr) CheckIfLeader(session string) bool

func (*NsqLookupdEtcdMgr) CreateTopic

func (self *NsqLookupdEtcdMgr) CreateTopic(topic string, meta *TopicMetaInfo) error

func (*NsqLookupdEtcdMgr) CreateTopicPartition

func (self *NsqLookupdEtcdMgr) CreateTopicPartition(topic string, partition int) error

func (*NsqLookupdEtcdMgr) DeleteTopic

func (self *NsqLookupdEtcdMgr) DeleteTopic(topic string, partition int) error

func (*NsqLookupdEtcdMgr) DeleteWholeTopic

func (self *NsqLookupdEtcdMgr) DeleteWholeTopic(topic string) error

func (*NsqLookupdEtcdMgr) GetAllLookupdNodes

func (self *NsqLookupdEtcdMgr) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)

func (*NsqLookupdEtcdMgr) GetClusterEpoch

func (self *NsqLookupdEtcdMgr) GetClusterEpoch() (EpochType, error)

func (*NsqLookupdEtcdMgr) GetNsqdNodes

func (self *NsqLookupdEtcdMgr) GetNsqdNodes() ([]NsqdNodeInfo, error)

func (*NsqLookupdEtcdMgr) GetTopicInfo

func (self *NsqLookupdEtcdMgr) GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error)

func (*NsqLookupdEtcdMgr) GetTopicLeaderSession

func (self *NsqLookupdEtcdMgr) GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error)

func (*NsqLookupdEtcdMgr) GetTopicMetaInfo

func (self *NsqLookupdEtcdMgr) GetTopicMetaInfo(topic string) (TopicMetaInfo, EpochType, error)

func (*NsqLookupdEtcdMgr) InitClusterID

func (self *NsqLookupdEtcdMgr) InitClusterID(id string)

func (*NsqLookupdEtcdMgr) IsExistTopic

func (self *NsqLookupdEtcdMgr) IsExistTopic(topic string) (bool, error)

func (*NsqLookupdEtcdMgr) IsExistTopicPartition

func (self *NsqLookupdEtcdMgr) IsExistTopicPartition(topic string, partitionNum int) (bool, error)

func (*NsqLookupdEtcdMgr) Register

func (self *NsqLookupdEtcdMgr) Register(value *NsqLookupdNodeInfo) error

func (*NsqLookupdEtcdMgr) ReleaseTopicLeader

func (self *NsqLookupdEtcdMgr) ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error

func (*NsqLookupdEtcdMgr) ScanTopics

func (self *NsqLookupdEtcdMgr) ScanTopics() ([]TopicPartitionMetaInfo, error)

func (*NsqLookupdEtcdMgr) Stop

func (self *NsqLookupdEtcdMgr) Stop()

func (*NsqLookupdEtcdMgr) Unregister

func (self *NsqLookupdEtcdMgr) Unregister(value *NsqLookupdNodeInfo) error

func (*NsqLookupdEtcdMgr) UpdateLookupEpoch

func (self *NsqLookupdEtcdMgr) UpdateLookupEpoch(oldGen EpochType) (EpochType, error)

func (*NsqLookupdEtcdMgr) UpdateTopicMetaInfo

func (self *NsqLookupdEtcdMgr) UpdateTopicMetaInfo(topic string, meta *TopicMetaInfo, oldGen EpochType) error

func (*NsqLookupdEtcdMgr) UpdateTopicNodeInfo

func (self *NsqLookupdEtcdMgr) UpdateTopicNodeInfo(topic string, partition int, topicInfo *TopicPartitionReplicaInfo, oldGen EpochType) error

func (*NsqLookupdEtcdMgr) WatchNsqdNodes

func (self *NsqLookupdEtcdMgr) WatchNsqdNodes(nsqds chan []NsqdNodeInfo, stop chan struct{})

func (*NsqLookupdEtcdMgr) WatchTopicLeader

func (self *NsqLookupdEtcdMgr) WatchTopicLeader(leader chan *TopicLeaderSession, stop chan struct{}) error

Note: callers may want to run this in its own goroutine, e.g. go WatchTopicLeader(...).

type NsqLookupdNodeInfo

type NsqLookupdNodeInfo struct {
	ID       string
	NodeIP   string
	TcpPort  string
	HttpPort string
	RpcPort  string
	Epoch    EpochType
}

func (*NsqLookupdNodeInfo) GetID

func (self *NsqLookupdNodeInfo) GetID() string

type NsqdCoordRpcServer

type NsqdCoordRpcServer struct {
	// contains filtered or unexported fields
}

func NewNsqdCoordRpcServer

func NewNsqdCoordRpcServer(coord *NsqdCoordinator, rootPath string) *NsqdCoordRpcServer

func (*NsqdCoordRpcServer) DeleteChannel

func (self *NsqdCoordRpcServer) DeleteChannel(info *RpcChannelOffsetArg) *CoordErr

func (*NsqdCoordRpcServer) DeleteNsqdTopic

func (self *NsqdCoordRpcServer) DeleteNsqdTopic(rpcTopicReq *RpcAdminTopicInfo) *CoordErr

func (*NsqdCoordRpcServer) DisableTopicWrite

func (self *NsqdCoordRpcServer) DisableTopicWrite(rpcTopicReq *RpcAdminTopicInfo) *CoordErr

func (*NsqdCoordRpcServer) EnableTopicWrite

func (self *NsqdCoordRpcServer) EnableTopicWrite(rpcTopicReq *RpcAdminTopicInfo) *CoordErr

func (*NsqdCoordRpcServer) GetCommitLogFromOffset

func (self *NsqdCoordRpcServer) GetCommitLogFromOffset(req *RpcCommitLogReq) *RpcCommitLogRsp

Returns the log data starting from the given offset. If the offset is larger than the local one, the last local log data is returned instead.

func (*NsqdCoordRpcServer) GetFullSyncInfo

func (self *NsqdCoordRpcServer) GetFullSyncInfo(req *RpcGetFullSyncInfoReq) (*RpcGetFullSyncInfoRsp, error)

func (*NsqdCoordRpcServer) GetLastCommitLogID

func (self *NsqdCoordRpcServer) GetLastCommitLogID(req *RpcCommitLogReq) (int64, error)

func (*NsqdCoordRpcServer) GetTopicStats

func (self *NsqdCoordRpcServer) GetTopicStats(topic string) *NodeTopicStats

func (*NsqdCoordRpcServer) IsTopicWriteDisabled

func (self *NsqdCoordRpcServer) IsTopicWriteDisabled(rpcTopicReq *RpcAdminTopicInfo) bool

func (*NsqdCoordRpcServer) NotifyAcquireTopicLeader

func (self *NsqdCoordRpcServer) NotifyAcquireTopicLeader(rpcTopicReq *RpcAcquireTopicLeaderReq) *CoordErr

func (*NsqdCoordRpcServer) NotifyReleaseTopicLeader

func (self *NsqdCoordRpcServer) NotifyReleaseTopicLeader(rpcTopicReq *RpcReleaseTopicLeaderReq) *CoordErr

func (*NsqdCoordRpcServer) NotifyTopicLeaderSession

func (self *NsqdCoordRpcServer) NotifyTopicLeaderSession(rpcTopicReq *RpcTopicLeaderSession) *CoordErr

func (*NsqdCoordRpcServer) PullCommitLogsAndData

func (self *NsqdCoordRpcServer) PullCommitLogsAndData(req *RpcPullCommitLogsReq) (*RpcPullCommitLogsRsp, error)

func (*NsqdCoordRpcServer) PutMessage

func (self *NsqdCoordRpcServer) PutMessage(info *RpcPutMessage) *CoordErr

Received from the leader.

func (*NsqdCoordRpcServer) PutMessages

func (self *NsqdCoordRpcServer) PutMessages(info *RpcPutMessages) *CoordErr

func (*NsqdCoordRpcServer) TestRpcCoordErr

func (self *NsqdCoordRpcServer) TestRpcCoordErr(req *RpcTestReq) *CoordErr

func (*NsqdCoordRpcServer) TestRpcError

func (self *NsqdCoordRpcServer) TestRpcError(req *RpcTestReq) *RpcTestRsp

func (*NsqdCoordRpcServer) TestRpcTimeout

func (self *NsqdCoordRpcServer) TestRpcTimeout() error

func (*NsqdCoordRpcServer) TriggerLookupChanged

func (self *NsqdCoordRpcServer) TriggerLookupChanged() error

func (*NsqdCoordRpcServer) UpdateChannelOffset

func (self *NsqdCoordRpcServer) UpdateChannelOffset(info *RpcChannelOffsetArg) *CoordErr

func (*NsqdCoordRpcServer) UpdateTopicInfo

func (self *NsqdCoordRpcServer) UpdateTopicInfo(rpcTopicReq *RpcAdminTopicInfo) *CoordErr

type NsqdCoordinator

type NsqdCoordinator struct {
	// contains filtered or unexported fields
}

func NewNsqdCoordinator

func NewNsqdCoordinator(cluster, ip, tcpport, rpcport, extraID string, rootPath string, nsqd *nsqd.NSQD) *NsqdCoordinator

func (*NsqdCoordinator) DeleteChannel

func (self *NsqdCoordinator) DeleteChannel(topic *nsqd.Topic, channelName string) error

func (*NsqdCoordinator) FinishMessageToCluster

func (self *NsqdCoordinator) FinishMessageToCluster(channel *nsqd.Channel, clientID int64, clientAddr string, msgID nsqd.MessageID) error

func (*NsqdCoordinator) GetAllLookupdNodes

func (self *NsqdCoordinator) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)

func (*NsqdCoordinator) GetCurrentLookupd

func (self *NsqdCoordinator) GetCurrentLookupd() NsqLookupdNodeInfo

func (*NsqdCoordinator) GetMasterTopicCoordData

func (self *NsqdCoordinator) GetMasterTopicCoordData(topic string) (int, *coordData, error)

Because only one master is allowed on the same topic, it can be retrieved by topic name alone.

func (*NsqdCoordinator) GetMyID

func (self *NsqdCoordinator) GetMyID() string

func (*NsqdCoordinator) IsMineLeaderForTopic

func (self *NsqdCoordinator) IsMineLeaderForTopic(topic string, part int) bool

func (*NsqdCoordinator) PutMessageToCluster

func (self *NsqdCoordinator) PutMessageToCluster(topic *nsqd.Topic,
	body []byte, traceID uint64) (nsqd.MessageID, nsqd.BackendOffset, int32, nsqd.BackendQueueEnd, error)

func (*NsqdCoordinator) PutMessagesToCluster

func (self *NsqdCoordinator) PutMessagesToCluster(topic *nsqd.Topic,
	msgs []*nsqd.Message) (nsqd.MessageID, nsqd.BackendOffset, int32, error)

func (*NsqdCoordinator) SearchLogByMsgCnt

func (self *NsqdCoordinator) SearchLogByMsgCnt(topic string, part int, count int64) (*CommitLogData, int64, int64, error)

func (*NsqdCoordinator) SearchLogByMsgID

func (self *NsqdCoordinator) SearchLogByMsgID(topic string, part int, msgID int64) (*CommitLogData, int64, int64, error)

func (*NsqdCoordinator) SearchLogByMsgOffset

func (self *NsqdCoordinator) SearchLogByMsgOffset(topic string, part int, offset int64) (*CommitLogData, int64, int64, error)

func (*NsqdCoordinator) SearchLogByMsgTimestamp

func (self *NsqdCoordinator) SearchLogByMsgTimestamp(topic string, part int, ts_sec int64) (*CommitLogData, int64, int64, error)

Returns the matched log data, the exact offset to which the reader should be reset, and the total count before that offset.

func (*NsqdCoordinator) SetChannelConsumeOffsetToCluster

func (self *NsqdCoordinator) SetChannelConsumeOffsetToCluster(ch *nsqd.Channel, queueOffset int64, cnt int64, force bool) error

func (*NsqdCoordinator) SetLeadershipMgr

func (self *NsqdCoordinator) SetLeadershipMgr(l NSQDLeadership)

func (*NsqdCoordinator) Start

func (self *NsqdCoordinator) Start() error

func (*NsqdCoordinator) Stats

func (self *NsqdCoordinator) Stats(topic string, part int) *CoordStats

func (*NsqdCoordinator) Stop

func (self *NsqdCoordinator) Stop()

type NsqdEtcdMgr

type NsqdEtcdMgr struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewNsqdEtcdMgr

func NewNsqdEtcdMgr(host string) *NsqdEtcdMgr

func (*NsqdEtcdMgr) AcquireTopicLeader

func (self *NsqdEtcdMgr) AcquireTopicLeader(topic string, partition int, nodeData *NsqdNodeInfo, epoch EpochType) error

func (*NsqdEtcdMgr) GetAllLookupdNodes

func (self *NsqdEtcdMgr) GetAllLookupdNodes() ([]NsqLookupdNodeInfo, error)

func (*NsqdEtcdMgr) GetTopicInfo

func (self *NsqdEtcdMgr) GetTopicInfo(topic string, partition int) (*TopicPartitionMetaInfo, error)

func (*NsqdEtcdMgr) GetTopicLeaderSession

func (self *NsqdEtcdMgr) GetTopicLeaderSession(topic string, partition int) (*TopicLeaderSession, error)

func (*NsqdEtcdMgr) InitClusterID

func (self *NsqdEtcdMgr) InitClusterID(id string)

func (*NsqdEtcdMgr) RegisterNsqd

func (self *NsqdEtcdMgr) RegisterNsqd(nodeData *NsqdNodeInfo) error

func (*NsqdEtcdMgr) ReleaseTopicLeader

func (self *NsqdEtcdMgr) ReleaseTopicLeader(topic string, partition int, session *TopicLeaderSession) error

func (*NsqdEtcdMgr) UnregisterNsqd

func (self *NsqdEtcdMgr) UnregisterNsqd(nodeData *NsqdNodeInfo) error

func (*NsqdEtcdMgr) WatchLookupdLeader

func (self *NsqdEtcdMgr) WatchLookupdLeader(leader chan *NsqLookupdNodeInfo, stop chan struct{}) error

type NsqdNodeInfo

type NsqdNodeInfo struct {
	ID      string
	NodeIP  string
	TcpPort string
	RpcPort string
}

func (*NsqdNodeInfo) GetID

func (self *NsqdNodeInfo) GetID() string

type NsqdNodeLoadFactor

type NsqdNodeLoadFactor struct {
	// contains filtered or unexported fields
}

type NsqdRpcClient

type NsqdRpcClient struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewNsqdRpcClient

func NewNsqdRpcClient(addr string, timeout time.Duration) (*NsqdRpcClient, error)

func (*NsqdRpcClient) CallFast

func (self *NsqdRpcClient) CallFast(method string, arg interface{}) (interface{}, error)

func (*NsqdRpcClient) CallRpcTest

func (self *NsqdRpcClient) CallRpcTest(data string) (string, *CoordErr)

func (*NsqdRpcClient) CallRpcTestCoordErr

func (self *NsqdRpcClient) CallRpcTestCoordErr(data string) *CoordErr

func (*NsqdRpcClient) CallRpcTesttimeout

func (self *NsqdRpcClient) CallRpcTesttimeout(data string) error

func (*NsqdRpcClient) CallWithRetry

func (self *NsqdRpcClient) CallWithRetry(method string, arg interface{}) (interface{}, error)

func (*NsqdRpcClient) Close

func (self *NsqdRpcClient) Close()

func (*NsqdRpcClient) DeleteChannel

func (self *NsqdRpcClient) DeleteChannel(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, channel string) *CoordErr

func (*NsqdRpcClient) DeleteNsqdTopic

func (self *NsqdRpcClient) DeleteNsqdTopic(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr

func (*NsqdRpcClient) DisableTopicWrite

func (self *NsqdRpcClient) DisableTopicWrite(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr

func (*NsqdRpcClient) DisableTopicWriteFast

func (self *NsqdRpcClient) DisableTopicWriteFast(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr

func (*NsqdRpcClient) EnableTopicWrite

func (self *NsqdRpcClient) EnableTopicWrite(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr

func (*NsqdRpcClient) GetCommitLogFromOffset

func (self *NsqdRpcClient) GetCommitLogFromOffset(topicInfo *TopicPartitionMetaInfo, logCountNumIndex int64,
	logIndex int64, offset int64) (bool, int64, int64, int64, CommitLogData, *CoordErr)

func (*NsqdRpcClient) GetFullSyncInfo

func (self *NsqdRpcClient) GetFullSyncInfo(topic string, partition int) (*LogStartInfo, *CommitLogData, error)

func (*NsqdRpcClient) GetLastCommitLogID

func (self *NsqdRpcClient) GetLastCommitLogID(topicInfo *TopicPartitionMetaInfo) (int64, *CoordErr)

func (*NsqdRpcClient) GetTopicStats

func (self *NsqdRpcClient) GetTopicStats(topic string) (*NodeTopicStats, error)

func (*NsqdRpcClient) IsTopicWriteDisabled

func (self *NsqdRpcClient) IsTopicWriteDisabled(topicInfo *TopicPartitionMetaInfo) bool

func (*NsqdRpcClient) NotifyAcquireTopicLeader

func (self *NsqdRpcClient) NotifyAcquireTopicLeader(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr

func (*NsqdRpcClient) NotifyReleaseTopicLeader

func (self *NsqdRpcClient) NotifyReleaseTopicLeader(epoch EpochType, topicInfo *TopicPartitionMetaInfo, leaderSessionEpoch EpochType) *CoordErr

func (*NsqdRpcClient) NotifyTopicLeaderSession

func (self *NsqdRpcClient) NotifyTopicLeaderSession(epoch EpochType, topicInfo *TopicPartitionMetaInfo, leaderSession *TopicLeaderSession, joinSession string) *CoordErr

func (*NsqdRpcClient) NotifyUpdateChannelOffset

func (self *NsqdRpcClient) NotifyUpdateChannelOffset(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, channel string, offset ChannelConsumerOffset) *CoordErr

func (*NsqdRpcClient) PullCommitLogsAndData

func (self *NsqdRpcClient) PullCommitLogsAndData(topic string, partition int, logCountNumIndex int64,
	logIndex int64, startOffset int64, num int) ([]CommitLogData, [][]byte, error)

func (*NsqdRpcClient) PutMessage

func (self *NsqdRpcClient) PutMessage(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, log CommitLogData, message *nsqd.Message) *CoordErr

func (*NsqdRpcClient) PutMessages

func (self *NsqdRpcClient) PutMessages(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, log CommitLogData, messages []*nsqd.Message) *CoordErr

func (*NsqdRpcClient) Reconnect

func (self *NsqdRpcClient) Reconnect() error

func (*NsqdRpcClient) ShouldRemoved

func (self *NsqdRpcClient) ShouldRemoved() bool

func (*NsqdRpcClient) TriggerLookupChanged

func (self *NsqdRpcClient) TriggerLookupChanged() error

func (*NsqdRpcClient) UpdateChannelOffset

func (self *NsqdRpcClient) UpdateChannelOffset(leaderSession *TopicLeaderSession, info *TopicPartitionMetaInfo, channel string, offset ChannelConsumerOffset) *CoordErr

func (*NsqdRpcClient) UpdateTopicInfo

func (self *NsqdRpcClient) UpdateTopicInfo(epoch EpochType, topicInfo *TopicPartitionMetaInfo) *CoordErr

type Options

type Options struct {
	BalanceStart int
	BalanceEnd   int
}

type RpcAcquireTopicLeaderReq

type RpcAcquireTopicLeaderReq struct {
	RpcTopicData
	LeaderNodeID string
	LookupdEpoch EpochType
}

type RpcAdminTopicInfo

type RpcAdminTopicInfo struct {
	TopicPartitionMetaInfo
	LookupdEpoch EpochType
	DisableWrite bool
}

type RpcChannelOffsetArg

type RpcChannelOffsetArg struct {
	RpcTopicData
	Channel string
	// position file + file offset
	ChannelOffset ChannelConsumerOffset
}

type RpcCommitLogReq

type RpcCommitLogReq struct {
	RpcTopicData
	LogOffset        int64
	LogStartIndex    int64
	LogCountNumIndex int64
	UseCountIndex    bool
}

type RpcCommitLogRsp

type RpcCommitLogRsp struct {
	LogStartIndex    int64
	LogOffset        int64
	LogData          CommitLogData
	ErrInfo          CoordErr
	LogCountNumIndex int64
	UseCountIndex    bool
}

type RpcFailedInfo

type RpcFailedInfo struct {
	// contains filtered or unexported fields
}

type RpcGetFullSyncInfoReq

type RpcGetFullSyncInfoReq struct {
	RpcTopicData
}

type RpcGetFullSyncInfoRsp

type RpcGetFullSyncInfoRsp struct {
	FirstLogData CommitLogData
	StartInfo    LogStartInfo
}

type RpcLookupReqBase

type RpcLookupReqBase struct {
	TopicName      string
	TopicPartition int
	NodeID         string
}

type RpcPullCommitLogsReq

type RpcPullCommitLogsReq struct {
	RpcTopicData
	StartLogOffset   int64
	LogMaxNum        int
	StartIndexCnt    int64
	LogCountNumIndex int64
	UseCountIndex    bool
}

type RpcPullCommitLogsRsp

type RpcPullCommitLogsRsp struct {
	Logs     []CommitLogData
	DataList [][]byte
}

type RpcPutMessage

type RpcPutMessage struct {
	RpcTopicData
	LogData      CommitLogData
	TopicMessage *nsqd.Message
}

type RpcPutMessages

type RpcPutMessages struct {
	RpcTopicData
	LogData       CommitLogData
	TopicMessages []*nsqd.Message
}

type RpcReadyForISR

type RpcReadyForISR struct {
	RpcLookupReqBase
	LeaderSession  TopicLeaderSession
	JoinISRSession string
}

type RpcReleaseTopicLeaderReq

type RpcReleaseTopicLeaderReq struct {
	RpcTopicData
	LeaderNodeID string
	LookupdEpoch EpochType
}

type RpcReqCheckTopic

type RpcReqCheckTopic struct {
	RpcLookupReqBase
}

type RpcReqJoinCatchup

type RpcReqJoinCatchup struct {
	RpcLookupReqBase
}

type RpcReqJoinISR

type RpcReqJoinISR struct {
	RpcLookupReqBase
}

type RpcReqLeaveFromISR

type RpcReqLeaveFromISR struct {
	RpcLookupReqBase
}

type RpcReqLeaveFromISRByLeader

type RpcReqLeaveFromISRByLeader struct {
	RpcLookupReqBase
	LeaderSession TopicLeaderSession
}

type RpcReqNewTopicInfo

type RpcReqNewTopicInfo struct {
	RpcLookupReqBase
}

type RpcRspJoinISR

type RpcRspJoinISR struct {
	CoordErr
	JoinISRSession string
}

type RpcTestReq

type RpcTestReq struct {
	Data string
}

type RpcTestRsp

type RpcTestRsp struct {
	RspData string
	RetErr  *CoordErr
}

type RpcTopicData

type RpcTopicData struct {
	TopicName               string
	TopicPartition          int
	Epoch                   EpochType
	TopicWriteEpoch         EpochType
	TopicLeaderSessionEpoch EpochType
	TopicLeaderSession      string
	TopicLeader             string
}

type RpcTopicLeaderSession

type RpcTopicLeaderSession struct {
	RpcTopicData
	LeaderNode   NsqdNodeInfo
	LookupdEpoch EpochType
	JoinSession  string
}

type SlaveAsyncWriteResult

type SlaveAsyncWriteResult struct {
	// contains filtered or unexported fields
}

func (*SlaveAsyncWriteResult) GetResult

func (self *SlaveAsyncWriteResult) GetResult() *CoordErr

func (*SlaveAsyncWriteResult) Wait

func (self *SlaveAsyncWriteResult) Wait()

type SortableStrings

type SortableStrings []string

func (SortableStrings) Len

func (s SortableStrings) Len() int

func (SortableStrings) Less

func (s SortableStrings) Less(l, r int) bool

func (SortableStrings) Swap

func (s SortableStrings) Swap(l, r int)

type StatsSorter

type StatsSorter struct {
	// contains filtered or unexported fields
}

func (*StatsSorter) Len

func (s *StatsSorter) Len() int

func (*StatsSorter) Less

func (s *StatsSorter) Less(i, j int) bool

func (*StatsSorter) Swap

func (s *StatsSorter) Swap(i, j int)

type TopicCatchupInfo

type TopicCatchupInfo struct {
	CatchupList []string
}

type TopicChannelsInfo

type TopicChannelsInfo struct {
	Channels []string
}

type TopicCommitLogMgr

type TopicCommitLogMgr struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func InitTopicCommitLogMgr

func InitTopicCommitLogMgr(t string, p int, basepath string, commitBufSize int) (*TopicCommitLogMgr, error)

func (*TopicCommitLogMgr) AppendCommitLog

func (self *TopicCommitLogMgr) AppendCommitLog(l *CommitLogData, slave bool) error

func (*TopicCommitLogMgr) CleanOldData

func (self *TopicCommitLogMgr) CleanOldData(fileIndex int64, fileOffset int64) error

func (*TopicCommitLogMgr) Close

func (self *TopicCommitLogMgr) Close()

func (*TopicCommitLogMgr) ConvertToCountIndex

func (self *TopicCommitLogMgr) ConvertToCountIndex(start int64, offset int64) (int64, error)

func (*TopicCommitLogMgr) ConvertToOffsetIndex

func (self *TopicCommitLogMgr) ConvertToOffsetIndex(countIndex int64) (int64, int64, error)

func (*TopicCommitLogMgr) Delete

func (self *TopicCommitLogMgr) Delete()

func (*TopicCommitLogMgr) FlushCommitLogs

func (self *TopicCommitLogMgr) FlushCommitLogs()

func (*TopicCommitLogMgr) GetCommitLogFromOffsetV2

func (self *TopicCommitLogMgr) GetCommitLogFromOffsetV2(start int64, offset int64) (*CommitLogData, error)

func (*TopicCommitLogMgr) GetCommitLogsV2

func (self *TopicCommitLogMgr) GetCommitLogsV2(startIndex int64, startOffset int64, num int) ([]CommitLogData, error)

func (*TopicCommitLogMgr) GetCurrentEnd

func (self *TopicCommitLogMgr) GetCurrentEnd() (int64, int64)

func (*TopicCommitLogMgr) GetCurrentStart

func (self *TopicCommitLogMgr) GetCurrentStart() int64

func (*TopicCommitLogMgr) GetLastCommitLogDataOnSegment

func (self *TopicCommitLogMgr) GetLastCommitLogDataOnSegment(index int64) (int64, *CommitLogData, error)

func (*TopicCommitLogMgr) GetLastCommitLogID

func (self *TopicCommitLogMgr) GetLastCommitLogID() int64

func (*TopicCommitLogMgr) GetLastCommitLogOffsetV2

func (self *TopicCommitLogMgr) GetLastCommitLogOffsetV2() (int64, int64, *CommitLogData, error)

This returns the log data of the last commit.

func (*TopicCommitLogMgr) GetLogStartInfo

func (self *TopicCommitLogMgr) GetLogStartInfo() (*LogStartInfo, *CommitLogData, error)

func (*TopicCommitLogMgr) IsCommitted

func (self *TopicCommitLogMgr) IsCommitted(id int64) bool

func (*TopicCommitLogMgr) MoveTo

func (self *TopicCommitLogMgr) MoveTo(newBase string) error

func (*TopicCommitLogMgr) NextID

func (self *TopicCommitLogMgr) NextID() uint64

func (*TopicCommitLogMgr) Reopen

func (self *TopicCommitLogMgr) Reopen() error

func (*TopicCommitLogMgr) Reset

func (self *TopicCommitLogMgr) Reset(id uint64)

Resets the next ID.

func (*TopicCommitLogMgr) ResetLogWithStart

func (self *TopicCommitLogMgr) ResetLogWithStart(newStart LogStartInfo) error

func (*TopicCommitLogMgr) SearchLogDataByComparator

func (self *TopicCommitLogMgr) SearchLogDataByComparator(comp ICommitLogComparator) (int64, int64, *CommitLogData, error)

func (*TopicCommitLogMgr) SearchLogDataByMsgCnt

func (self *TopicCommitLogMgr) SearchLogDataByMsgCnt(searchCnt int64) (int64, int64, *CommitLogData, error)

func (*TopicCommitLogMgr) SearchLogDataByMsgID

func (self *TopicCommitLogMgr) SearchLogDataByMsgID(searchMsgID int64) (int64, int64, *CommitLogData, error)

func (*TopicCommitLogMgr) SearchLogDataByMsgOffset

func (self *TopicCommitLogMgr) SearchLogDataByMsgOffset(searchMsgOffset int64) (int64, int64, *CommitLogData, error)

func (*TopicCommitLogMgr) TruncateToOffsetV2

func (self *TopicCommitLogMgr) TruncateToOffsetV2(startIndex int64, offset int64) (*CommitLogData, error)

type TopicCoordStat

type TopicCoordStat struct {
	Node         string        `json:"node"`
	Name         string        `json:"name"`
	Partition    int           `json:"partition"`
	ISRStats     []ISRStat     `json:"isr_stats"`
	CatchupStats []CatchupStat `json:"catchup_stats"`
}

type TopicCoordinator

type TopicCoordinator struct {
	// contains filtered or unexported fields
}

func NewTopicCoordinator

func NewTopicCoordinator(name string, partition int, basepath string, syncEvery int) (*TopicCoordinator, error)

func (*TopicCoordinator) DeleteNoWriteLock

func (self *TopicCoordinator) DeleteNoWriteLock(removeData bool)

func (*TopicCoordinator) DeleteWithLock

func (self *TopicCoordinator) DeleteWithLock(removeData bool)

func (*TopicCoordinator) DisableWrite

func (self *TopicCoordinator) DisableWrite(disable bool)

func (*TopicCoordinator) Exiting

func (self *TopicCoordinator) Exiting()

func (TopicCoordinator) GetCopy

func (self TopicCoordinator) GetCopy() *coordData

func (*TopicCoordinator) GetData

func (self *TopicCoordinator) GetData() *coordData

func (TopicCoordinator) GetLeader

func (self TopicCoordinator) GetLeader() string

func (TopicCoordinator) GetLeaderSession

func (self TopicCoordinator) GetLeaderSession() string

func (TopicCoordinator) GetLeaderSessionEpoch

func (self TopicCoordinator) GetLeaderSessionEpoch() EpochType

func (TopicCoordinator) GetLeaderSessionID

func (self TopicCoordinator) GetLeaderSessionID() string

func (TopicCoordinator) GetTopicEpochForWrite

func (self TopicCoordinator) GetTopicEpochForWrite() EpochType

func (*TopicCoordinator) IsExiting

func (self *TopicCoordinator) IsExiting() bool

func (TopicCoordinator) IsForceLeave

func (self TopicCoordinator) IsForceLeave() bool

func (TopicCoordinator) IsISRReadyForWrite

func (self TopicCoordinator) IsISRReadyForWrite(myID string) bool

func (TopicCoordinator) IsMineISR

func (self TopicCoordinator) IsMineISR(id string) bool

func (TopicCoordinator) IsMineLeaderSessionReady

func (self TopicCoordinator) IsMineLeaderSessionReady(id string) bool

func (*TopicCoordinator) IsWriteDisabled

func (self *TopicCoordinator) IsWriteDisabled() bool

func (TopicCoordinator) SetForceLeave

func (self TopicCoordinator) SetForceLeave(leave bool)

type TopicLeaderSession

type TopicLeaderSession struct {
	Topic       string
	Partition   int
	LeaderNode  *NsqdNodeInfo
	Session     string
	LeaderEpoch EpochType
}

func (*TopicLeaderSession) IsSame

func (self *TopicLeaderSession) IsSame(other *TopicLeaderSession) bool

type TopicMetaInfo

type TopicMetaInfo struct {
	PartitionNum int
	Replica      int
	// the suggested load factor for each topic partition.
	SuggestLF int
	// other options
	SyncEvery int
	// used to verify the data when a topic with the same name is created, deleted, and then created again
	MagicCode int64
	// the retention days for the data
	RetentionDay int32
	// used for ordered multi partition topic
	// allow multi partitions on the same node
	OrderedMulti bool
}

type TopicNameInfo

type TopicNameInfo struct {
	TopicName      string
	TopicPartition int
}

type TopicPartitionID

type TopicPartitionID struct {
	TopicName      string
	TopicPartition int
}

func (*TopicPartitionID) String

func (self *TopicPartitionID) String() string

type TopicPartitionMetaInfo

type TopicPartitionMetaInfo struct {
	Name      string
	Partition int
	TopicMetaInfo
	TopicPartitionReplicaInfo
}

func (*TopicPartitionMetaInfo) GetTopicDesp

func (self *TopicPartitionMetaInfo) GetTopicDesp() string

type TopicPartitionReplicaInfo

type TopicPartitionReplicaInfo struct {
	Leader      string
	ISR         []string
	CatchupList []string
	Channels    []string
	// this is only used for write operations
	// if this changes during a write, the current write should be aborted
	EpochForWrite EpochType
	Epoch         EpochType
}

type TopicReplicasInfo

type TopicReplicasInfo struct {
	Leader string
	ISR    []string
}

type WatchTopicLeaderInfo

type WatchTopicLeaderInfo struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis
Package coordgrpc is a generated protocol buffer package.
Package coordgrpc is a generated protocol buffer package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL