Documentation ¶
Index ¶
- Constants
- Variables
- func GetQueueFileName(dataRoot string, base string, fileNum int64) string
- func GetTopicFullName(topic string, part int) string
- func GetTraceIDFromFullMsgID(id FullMessageID) uint64
- func IsValidDelayedMessage(m *Message) bool
- func MessageHeaderBytes() int
- func NewBufioReader(r io.Reader) *bufio.Reader
- func NsqLogger() *levellogger.LevelLogger
- func PrintMessage(m *Message) string
- func PutBufioReader(br *bufio.Reader)
- func SetLogger(log levellogger.Logger)
- func SetRemoteMsgTracer(remote string)
- type BackendOffset
- type BackendQueue
- type BackendQueueEnd
- type BackendQueueOffset
- type BackendQueueReader
- type BackendQueueWriter
- type Channel
- func (c *Channel) AddClient(clientID int64, client Consumer) error
- func (c *Channel) CleanWaitingRequeueChan(msg *Message)
- func (c *Channel) Close() error
- func (c *Channel) ConfirmBackendQueue(msg *Message) (BackendOffset, int64, bool)
- func (c *Channel) ConfirmBackendQueueOnSlave(offset BackendOffset, cnt int64, allowBackward bool) error
- func (c *Channel) ConfirmDelayedMessage(msg *Message) (BackendOffset, int64, bool)
- func (c *Channel) ContinueConsumeForOrder()
- func (c *Channel) Delete() error
- func (c *Channel) Depth() int64
- func (c *Channel) DepthSize() int64
- func (c *Channel) DepthTimestamp() int64
- func (c *Channel) DisableConsume(disable bool)
- func (c *Channel) Exiting() bool
- func (c *Channel) FinishMessage(clientID int64, clientAddr string, id MessageID) (BackendOffset, int64, bool, *Message, error)
- func (c *Channel) FinishMessageForce(clientID int64, clientAddr string, id MessageID, forceFin bool) (BackendOffset, int64, bool, *Message, error)
- func (c *Channel) GetChannelDebugStats() string
- func (c *Channel) GetChannelEnd() BackendQueueEnd
- func (c *Channel) GetChannelWaitingConfirmCnt() int64
- func (c *Channel) GetClientMsgChan() chan *Message
- func (c *Channel) GetClientTagMsgChan(tag string) (chan *Message, bool)
- func (c *Channel) GetClients() map[int64]Consumer
- func (c *Channel) GetClientsCount() int
- func (c *Channel) GetConfirmed() BackendQueueEnd
- func (c *Channel) GetConfirmedInterval() []MsgQueueInterval
- func (c *Channel) GetConfirmedIntervalLen() int
- func (c *Channel) GetDelayedQueue() *DelayQueue
- func (c *Channel) GetDelayedQueueConsumedState() (RecentKeyList, map[int]uint64, map[string]uint64)
- func (c *Channel) GetInflightNum() int
- func (c *Channel) GetMemDelayedMsgs() []MessageID
- func (c *Channel) GetName() string
- func (c *Channel) GetOrCreateClientMsgChannel(tag string) chan *Message
- func (c *Channel) GetTopicName() string
- func (c *Channel) GetTopicPart() int
- func (c *Channel) IsConfirmed(msg *Message) bool
- func (c *Channel) IsConsumeDisabled() bool
- func (c *Channel) IsEphemeral() bool
- func (c *Channel) IsExt() bool
- func (c *Channel) IsOrdered() bool
- func (c *Channel) IsPaused() bool
- func (c *Channel) IsSkipped() bool
- func (c *Channel) IsTraced() bool
- func (c *Channel) IsWaitingMoreData() bool
- func (c *Channel) Pause() error
- func (c *Channel) RemoveClient(clientID int64, clientTag string)
- func (c *Channel) RemoveTagClientMsgChannel(tag string)
- func (c *Channel) RequeueClientMessages(clientID int64, clientAddr string)
- func (c *Channel) RequeueMessage(clientID int64, clientAddr string, id MessageID, timeout time.Duration, ...) error
- func (c *Channel) SetConsumeOffset(offset BackendOffset, cnt int64, force bool) error
- func (c *Channel) SetDelayedQueue(dq *DelayQueue)
- func (c *Channel) SetExt(isExt bool)
- func (c *Channel) SetOrdered(enable bool)
- func (c *Channel) SetTrace(enable bool)
- func (c *Channel) ShouldRequeueToEnd(clientID int64, clientAddr string, id MessageID, timeout time.Duration, ...) (*Message, bool)
- func (c *Channel) ShouldWaitDelayed(msg *Message) bool
- func (c *Channel) Skip() error
- func (c *Channel) StartInFlightTimeout(msg *Message, client Consumer, clientAddr string, timeout time.Duration) (bool, error)
- func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout time.Duration) error
- func (c *Channel) TryWakeupRead()
- func (c *Channel) UnPause() error
- func (c *Channel) UnSkip() error
- func (c *Channel) UpdateConfirmedInterval(intervals []MsgQueueInterval)
- func (c *Channel) UpdateQueueEnd(end BackendQueueEnd, forceReload bool) error
- type ChannelMetaInfo
- type ChannelStats
- type ChannelStatsInfo
- func (self *ChannelStatsInfo) GetChannelLatencyStats() []int64
- func (self *ChannelStatsInfo) GetDeliveryLatencyStats() []int64
- func (self *ChannelStatsInfo) UpdateChannelLatencyStats(latencyInMillSec int64)
- func (self *ChannelStatsInfo) UpdateChannelStats(latencyInMillSec int64)
- func (self *ChannelStatsInfo) UpdateDeliveryLatencyStats(latencyInMillSec int64)
- func (self *ChannelStatsInfo) UpdateDeliveryStats(latencyInMillSec int64)
- type Channels
- type ChannelsByName
- type ClientPubStats
- type ClientStats
- type ClientV2
- func (c *ClientV2) Auth(secret string) error
- func (c *ClientV2) Empty()
- func (c *ClientV2) Exit()
- func (c *ClientV2) ExtendSupport() bool
- func (c *ClientV2) FinalClose()
- func (c *ClientV2) FinishedMessage()
- func (c *ClientV2) Flush() error
- func (c *ClientV2) GetDesiredTag() string
- func (c *ClientV2) GetHeartbeatInterval() time.Duration
- func (c *ClientV2) GetID() int64
- func (c *ClientV2) GetMsgTimeout() time.Duration
- func (c *ClientV2) GetOutputBufferSize() int64
- func (c *ClientV2) GetOutputBufferTimeout() time.Duration
- func (c *ClientV2) GetTagMsgChannel() chan *Message
- func (c *ClientV2) HasAuthorizations() bool
- func (c *ClientV2) Identify(data IdentifyDataV2) error
- func (c *ClientV2) IncrSubError(delta int64)
- func (c *ClientV2) IsAuthorized(topic, channel string) (bool, error)
- func (c *ClientV2) IsReadyForMessages() bool
- func (c *ClientV2) LockWrite()
- func (c *ClientV2) Pause()
- func (c *ClientV2) QueryAuthd() error
- func (c *ClientV2) RequeuedMessage()
- func (c *ClientV2) SendingMessage()
- func (c *ClientV2) SetDesiredTag(tagStr string) error
- func (c *ClientV2) SetExtFilter(filter ExtFilterData)
- func (c *ClientV2) SetExtendSupport()
- func (c *ClientV2) SetHeartbeatInterval(desiredInterval int) error
- func (c *ClientV2) SetMsgTimeout(msgTimeout int) error
- func (c *ClientV2) SetOutputBufferSize(desiredSize int) error
- func (c *ClientV2) SetOutputBufferTimeout(desiredTimeout int) error
- func (c *ClientV2) SetReadyCount(count int64)
- func (c *ClientV2) SetSampleRate(sampleRate int32) error
- func (c *ClientV2) SetTagMsgChannel(tagMsgChan chan *Message) error
- func (c *ClientV2) StartClose()
- func (c *ClientV2) Stats() ClientStats
- func (c *ClientV2) String() string
- func (c *ClientV2) TimedOutMessage()
- func (c *ClientV2) UnPause()
- func (c *ClientV2) UnlockWrite()
- func (c *ClientV2) UnsetDesiredTag()
- func (c *ClientV2) UpgradeDeflate(level int) error
- func (c *ClientV2) UpgradeSnappy() error
- func (c *ClientV2) UpgradeTLS() error
- type Consumer
- type DelayQueue
- func (q *DelayQueue) BackupKVStoreTo(w io.Writer) (int64, error)
- func (q *DelayQueue) CheckConsistence() error
- func (q *DelayQueue) Close() error
- func (q *DelayQueue) ConfirmedMessage(msg *Message) error
- func (q *DelayQueue) Delete() error
- func (q *DelayQueue) EmptyDelayedChannel(ch string) error
- func (q *DelayQueue) EmptyDelayedType(dt int) error
- func (q *DelayQueue) ForceFlush()
- func (q *DelayQueue) GetCurrentDelayedCnt(dt int, channel string) (uint64, error)
- func (q *DelayQueue) GetDBSize() (int64, error)
- func (q *DelayQueue) GetDiskQueueSnapshot() *DiskQueueSnapshot
- func (q *DelayQueue) GetFullName() string
- func (q *DelayQueue) GetOldestConsumedState(chList []string, includeOthers bool) (RecentKeyList, map[int]uint64, map[string]uint64)
- func (q *DelayQueue) GetSyncedOffset() (BackendOffset, error)
- func (q *DelayQueue) GetTopicName() string
- func (q *DelayQueue) GetTopicPart() int
- func (q *DelayQueue) IsChannelMessageDelayed(msgID MessageID, ch string) bool
- func (q *DelayQueue) IsDataNeedFix() bool
- func (q *DelayQueue) IsExt() bool
- func (q *DelayQueue) PeekAll(results []Message) (int, error)
- func (q *DelayQueue) PeekRecentChannelTimeout(now int64, results []Message, ch string) (int, error)
- func (q *DelayQueue) PeekRecentDelayedPub(now int64, results []Message) (int, error)
- func (q *DelayQueue) PeekRecentTimeoutWithFilter(results []Message, peekTs int64, filterType int, filterChannel string) (int, error)
- func (q *DelayQueue) PutDelayMessage(m *Message) (MessageID, BackendOffset, int32, BackendQueueEnd, error)
- func (q *DelayQueue) PutMessageOnReplica(m *Message, offset BackendOffset, checkSize int64) (BackendQueueEnd, error)
- func (q *DelayQueue) PutRawDataOnReplica(rawData []byte, offset BackendOffset, checkSize int64, msgNum int32) (BackendQueueEnd, error)
- func (q *DelayQueue) ResetBackendEndNoLock(vend BackendOffset, totalCnt int64) error
- func (q *DelayQueue) ResetBackendWithQueueStartNoLock(queueStartOffset int64, queueStartCnt int64) error
- func (q *DelayQueue) RestoreKVStoreFrom(body io.Reader) error
- func (q *DelayQueue) RollbackNoLock(vend BackendOffset, diffCnt uint64) error
- func (q *DelayQueue) SetDataFixState(needFix bool)
- func (q *DelayQueue) SetTrace(enable bool)
- func (q *DelayQueue) TotalDataSize() int64
- func (q *DelayQueue) TotalMessageCnt() uint64
- func (q *DelayQueue) TryCleanOldData(retentionSize int64, noRealClean bool, maxCleanOffset BackendOffset) (BackendQueueEnd, error)
- func (q *DelayQueue) UpdateConsumedState(keyList RecentKeyList, cntList map[int]uint64, ...) error
- type DetailStatsInfo
- func (self *DetailStatsInfo) GetHourlyStats() [24]int64
- func (self *DetailStatsInfo) GetMsgSizeStats() []int64
- func (self *DetailStatsInfo) GetMsgWriteLatencyStats() []int64
- func (self *DetailStatsInfo) GetPubClientStats() []ClientPubStats
- func (self *DetailStatsInfo) LoadHistory(fileName string) error
- func (self *DetailStatsInfo) RemovePubStats(remote string, protocol string)
- func (self *DetailStatsInfo) ResetHistoryInitPub(msgSize int64)
- func (self *DetailStatsInfo) SaveHistory(fileName string) error
- func (self *DetailStatsInfo) UpdateHistory(historyList [24]int64)
- func (self *DetailStatsInfo) UpdatePubClientStats(remote string, agent string, protocol string, count int64, hasErr bool)
- func (self *DetailStatsInfo) UpdateTopicMsgStats(msgSize int64, latency int64)
- type DiskQueueSnapshot
- func (d *DiskQueueSnapshot) Close() error
- func (d *DiskQueueSnapshot) GetCurrentReadQueueOffset() BackendQueueOffset
- func (d *DiskQueueSnapshot) GetQueueReadStart() BackendQueueEnd
- func (d *DiskQueueSnapshot) ReadOne() ReadResult
- func (d *DiskQueueSnapshot) ReadRaw(size int32) ([]byte, error)
- func (d *DiskQueueSnapshot) ResetSeekTo(voffset BackendOffset) error
- func (d *DiskQueueSnapshot) SeekTo(voffset BackendOffset) error
- func (d *DiskQueueSnapshot) SeekToEnd() error
- func (d *DiskQueueSnapshot) SetQueueStart(start BackendQueueEnd)
- func (d *DiskQueueSnapshot) SkipToNext() error
- func (d *DiskQueueSnapshot) UpdateQueueEnd(e BackendQueueEnd)
- type ExtFilterData
- type FullMessageID
- type IExtFilter
- type IMsgTracer
- type INsqdNotify
- type IdentifyDataV2
- type IntervalHash
- func (self *IntervalHash) AddOrMerge(inter QueueInterval) QueueInterval
- func (self *IntervalHash) DeleteInterval(inter QueueInterval)
- func (self *IntervalHash) DeleteLower(low int64) int
- func (self *IntervalHash) DeleteRange(inter QueueInterval)
- func (self *IntervalHash) IsCompleteOverlap(inter QueueInterval) bool
- func (self *IntervalHash) IsLowestAt(low int64) QueueInterval
- func (self *IntervalHash) Len() int
- func (self *IntervalHash) Query(inter QueueInterval, excludeBoard bool) []QueueInterval
- func (self *IntervalHash) QueryExist(inter QueueInterval, excludeBoard bool) []QueueInterval
- func (self *IntervalHash) ToIntervalList() []MsgQueueInterval
- func (self *IntervalHash) ToString() string
- type IntervalSkipList
- func (self *IntervalSkipList) AddOrMerge(inter QueueInterval) QueueInterval
- func (self *IntervalSkipList) DeleteInterval(inter QueueInterval)
- func (self *IntervalSkipList) DeleteLower(low int64) int
- func (self *IntervalSkipList) DeleteRange(inter QueueInterval)
- func (self *IntervalSkipList) IsCompleteOverlap(inter QueueInterval) bool
- func (self *IntervalSkipList) IsLowestAt(low int64) QueueInterval
- func (self *IntervalSkipList) Len() int
- func (self *IntervalSkipList) Query(inter QueueInterval, excludeBoard bool) []QueueInterval
- func (self *IntervalSkipList) ToIntervalList() []MsgQueueInterval
- func (self *IntervalSkipList) ToString() string
- type IntervalTree
- func (self *IntervalTree) AddOrMerge(inter QueueInterval) QueueInterval
- func (self *IntervalTree) DeleteInterval(inter QueueInterval)
- func (self *IntervalTree) DeleteLower(low int64) int
- func (self *IntervalTree) DeleteRange(inter QueueInterval)
- func (self *IntervalTree) IsCompleteOverlap(inter QueueInterval) bool
- func (self *IntervalTree) IsLowestAt(low int64) QueueInterval
- func (self *IntervalTree) Len() int
- func (self *IntervalTree) Query(inter QueueInterval, excludeBoard bool) []QueueInterval
- func (self *IntervalTree) ToIntervalList() []MsgQueueInterval
- func (self *IntervalTree) ToString() string
- type LogMsgTracer
- func (self *LogMsgTracer) Start()
- func (self *LogMsgTracer) TracePub(topic string, part int, pubMethod string, traceID uint64, msg *Message, ...)
- func (self *LogMsgTracer) TracePubClient(topic string, part int, traceID uint64, msgID MessageID, ...)
- func (self *LogMsgTracer) TraceSub(topic string, channel string, state string, traceID uint64, msg *Message, ...)
- type Message
- func DecodeDelayedMessage(b []byte, isExt bool) (*Message, error)
- func DecodeMessage(b []byte, ext bool) (*Message, error)
- func NewMessage(id MessageID, body []byte) *Message
- func NewMessageWithExt(id MessageID, body []byte, extVer ext.ExtVer, extBytes []byte) *Message
- func NewMessageWithTs(id MessageID, body []byte, ts int64) *Message
- func (m *Message) GetClientID() int64
- func (m *Message) GetCopy() *Message
- func (m *Message) GetFullMsgID() FullMessageID
- func (m *Message) IsDeferred() bool
- func (m *Message) WriteDelayedTo(w io.Writer, writeExt bool) (int64, error)
- func (m *Message) WriteTo(w io.Writer, writeExt bool) (int64, error)
- func (m *Message) WriteToClient(w io.Writer, writeExt bool, writeDetail bool) (int64, error)
- type MessageID
- type MsgChanData
- type MsgIDGenerator
- type MsgQueueInterval
- type MultiFilterData
- type NSQD
- func (n *NSQD) CheckMagicCode(name string, partition int, code int64, tryFix bool) (string, error)
- func (n *NSQD) CleanClientPubStats(remote string, protocol string)
- func (n *NSQD) CloseExistingTopic(topicName string, partition int) error
- func (n *NSQD) DeleteExistingTopic(topicName string, part int) error
- func (n *NSQD) Exit()
- func (n *NSQD) ForceDeleteTopicData(name string, partition int) error
- func (n *NSQD) GetError() error
- func (n *NSQD) GetExistingTopic(topicName string, part int) (*Topic, error)
- func (n *NSQD) GetHealth() string
- func (n *NSQD) GetOpts() *Options
- func (n *NSQD) GetStartTime() time.Time
- func (n *NSQD) GetStats(leaderOnly bool, filterClients bool) []TopicStats
- func (n *NSQD) GetTopic(topicName string, part int) *Topic
- func (n *NSQD) GetTopicDefaultPart(topicName string) int
- func (n *NSQD) GetTopicIgnPart(topicName string) *Topic
- func (n *NSQD) GetTopicMapCopy() map[string]map[int]*Topic
- func (n *NSQD) GetTopicMapRef() map[string]map[int]*Topic
- func (n *NSQD) GetTopicPartitions(topicName string) map[int]*Topic
- func (n *NSQD) GetTopicStats(leaderOnly bool, topic string) []TopicStats
- func (n *NSQD) GetTopicStatsWithFilter(leaderOnly bool, topic string, filterClients bool) []TopicStats
- func (n *NSQD) GetTopicWithDisabled(topicName string, part int, ext bool) *Topic
- func (n *NSQD) GetTopicWithExt(topicName string, part int) *Topic
- func (n *NSQD) IsAuthEnabled() bool
- func (n *NSQD) IsHealthy() bool
- func (n *NSQD) LoadMetadata(disabled int32)
- func (n *NSQD) NotifyDeleteTopic(t *Topic)
- func (n *NSQD) NotifyPersistMetadata()
- func (n *NSQD) NotifyScanDelayed(ch *Channel)
- func (n *NSQD) NotifyStateChanged(v interface{}, needPersist bool)
- func (n *NSQD) ReqToEnd(ch *Channel, msg *Message, t time.Duration) error
- func (n *NSQD) SetHealth(err error)
- func (n *NSQD) SetPubLoop(loop func(t *Topic))
- func (n *NSQD) SetReqToEndCB(reqToEndCB ReqToEndFunc)
- func (n *NSQD) SetTopicMagicCode(t *Topic, code int64) error
- func (n *NSQD) Start()
- func (n *NSQD) SwapOpts(opts *Options)
- func (n *NSQD) TriggerOptsNotification()
- func (n *NSQD) UpdateTopicHistoryStats()
- type Options
- type PubInfo
- type PubInfoChan
- type QueueInterval
- type ReadResult
- type RecentKeyList
- type RemoteMsgTracer
- func (self *RemoteMsgTracer) Start()
- func (self *RemoteMsgTracer) Stop()
- func (self *RemoteMsgTracer) TracePub(topic string, part int, pubMethod string, traceID uint64, msg *Message, ...)
- func (self *RemoteMsgTracer) TracePubClient(topic string, part int, traceID uint64, msgID MessageID, ...)
- func (self *RemoteMsgTracer) TraceSub(topic string, channel string, state string, traceID uint64, msg *Message, ...)
- type ReqToEndFunc
- type Topic
- func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile
- func (t *Topic) BufferPoolGet(capacity int) *bytes.Buffer
- func (t *Topic) BufferPoolPut(b *bytes.Buffer)
- func (t *Topic) Close() error
- func (t *Topic) CloseExistingChannel(channelName string, deleteData bool) error
- func (t *Topic) Delete() error
- func (t *Topic) DeleteExistingChannel(channelName string) error
- func (t *Topic) DisableForSlave()
- func (t *Topic) Empty() error
- func (t *Topic) EnableForMaster()
- func (t *Topic) Exiting() bool
- func (t *Topic) ForceFlush()
- func (t *Topic) ForceFlushForChannels()
- func (t *Topic) GetChannel(channelName string) *Channel
- func (t *Topic) GetChannelMapCopy() map[string]*Channel
- func (t *Topic) GetChannelMeta() []ChannelMetaInfo
- func (t *Topic) GetCommitted() BackendQueueEnd
- func (t *Topic) GetDelayedQueue() *DelayQueue
- func (t *Topic) GetDelayedQueueConsumedState() (RecentKeyList, map[int]uint64, map[string]uint64)
- func (t *Topic) GetDetailStats() *DetailStatsInfo
- func (t *Topic) GetDiskQueueSnapshot() *DiskQueueSnapshot
- func (t *Topic) GetDynamicInfo() TopicDynamicConf
- func (t *Topic) GetExistingChannel(channelName string) (*Channel, error)
- func (t *Topic) GetFullName() string
- func (t *Topic) GetMagicCode() int64
- func (t *Topic) GetOrCreateDelayedQueueNoLock(idGen MsgIDGenerator) (*DelayQueue, error)
- func (t *Topic) GetQueueReadStart() int64
- func (t *Topic) GetTopicChannelDebugStat(channelName string) string
- func (t *Topic) GetTopicName() string
- func (t *Topic) GetTopicPart() int
- func (t *Topic) GetWaitChan() PubInfoChan
- func (t *Topic) IsDataNeedFix() bool
- func (t *Topic) IsExt() bool
- func (t *Topic) IsOrdered() bool
- func (t *Topic) IsWriteDisabled() bool
- func (t *Topic) LoadChannelMeta() error
- func (t *Topic) LoadHistoryStats() error
- func (t *Topic) MarkAsRemoved() (string, error)
- func (t *Topic) NotifyReloadChannels()
- func (t *Topic) PrintCurrentStats()
- func (t *Topic) PutMessage(m *Message) (MessageID, BackendOffset, int32, BackendQueueEnd, error)
- func (t *Topic) PutMessageNoLock(m *Message) (MessageID, BackendOffset, int32, BackendQueueEnd, error)
- func (t *Topic) PutMessageOnReplica(m *Message, offset BackendOffset, checkSize int64) (BackendQueueEnd, error)
- func (t *Topic) PutMessages(msgs []*Message) (MessageID, BackendOffset, int32, int64, BackendQueueEnd, error)
- func (t *Topic) PutMessagesNoLock(msgs []*Message) (MessageID, BackendOffset, int32, int64, BackendQueueEnd, error)
- func (t *Topic) PutMessagesOnReplica(msgs []*Message, offset BackendOffset, checkSize int64) (BackendQueueEnd, error)
- func (t *Topic) PutRawDataOnReplica(rawData []byte, offset BackendOffset, checkSize int64, msgNum int32) (BackendQueueEnd, error)
- func (t *Topic) QuitChan() <-chan struct{}
- func (t *Topic) RemoveChannelMeta()
- func (t *Topic) ResetBackendEndNoLock(vend BackendOffset, totalCnt int64) error
- func (t *Topic) ResetBackendWithQueueStartNoLock(queueStartOffset int64, queueStartCnt int64) error
- func (t *Topic) RollbackNoLock(vend BackendOffset, diffCnt uint64) error
- func (t *Topic) SaveChannelMeta() error
- func (t *Topic) SaveHistoryStats() error
- func (t *Topic) SetDataFixState(needFix bool)
- func (t *Topic) SetDynamicInfo(dynamicConf TopicDynamicConf, idGen MsgIDGenerator)
- func (t *Topic) SetMagicCode(code int64) error
- func (t *Topic) SetTrace(enable bool)
- func (t *Topic) TotalDataSize() int64
- func (t *Topic) TotalMessageCnt() uint64
- func (t *Topic) TryCleanOldData(retentionSize int64, noRealClean bool, maxCleanOffset BackendOffset) (BackendQueueEnd, error)
- func (t *Topic) UpdateCommittedOffset(offset BackendQueueEnd)
- func (t *Topic) UpdateDelayedQueueConsumedState(keyList RecentKeyList, cntList map[int]uint64, ...) error
- type TopicDynamicConf
- type TopicHistoryStatsInfo
- type TopicMsgStatsInfo
- type TopicStats
- type Topics
- type TopicsByName
- type TraceLogItemInfo
Constants ¶
const ( MAX_MEM_REQ_TIMES = 10 MaxWaitingDelayed = 100 )
const ( MinDelayedType = 1 ChannelDelayed = 1 PubDelayed = 2 TransactionDelayed = 3 MaxDelayedType = 4 TxMaxSize = 65536 CompactCntThreshold = 20000 )
const ( MsgIDLength = 16 MsgTraceIDLength = 8 MsgJsonHeaderLength = 2 )
const ( TLSNotRequired = iota TLSRequiredExceptHTTP TLSRequired )
const ( MAX_TOPIC_PARTITION = 1023 HISTORY_STAT_FILE_NAME = ".stat.history.dat" )
const (
FLUSH_DISTANCE = 4
)
const (
MAX_NODE_ID = 1024 * 1024
)
const (
MAX_POSSIBLE_MSG_SIZE = 1 << 28
)
const (
MAX_QUEUE_OFFSET_META_DATA_KEEP = 100
)
Variables ¶
var ( ErrMsgNotInFlight = errors.New("Message ID not in flight") ErrMsgDeferredTooMuch = errors.New("Too much deferred messages in flight") ErrMsgAlreadyInFlight = errors.New("Message ID already in flight") ErrConsumeDisabled = errors.New("Consume is disabled currently") ErrMsgDeferred = errors.New("Message is deferred") ErrSetConsumeOffsetNotFirstClient = errors.New("consume offset can only be changed by the first consume client") ErrNotDiskQueueReader = errors.New("the consume channel is not disk queue reader") )
var ( ErrReadQueueAlreadyCleaned = errors.New("the queue position has been cleaned") ErrConfirmSizeInvalid = errors.New("Confirm data size invalid.") ErrConfirmCntInvalid = errors.New("Confirm message count invalid.") ErrMoveOffsetInvalid = errors.New("move offset invalid") ErrOffsetTypeMismatch = errors.New("offset type mismatch") ErrReadQueueCountMissing = errors.New("read queue count info missing") ErrReadEndOfQueue = errors.New("read to the end of queue") ErrInvalidReadable = errors.New("readable data is invalid") ErrReadEndChangeToOld = errors.New("queue read end change to old without reload") ErrExiting = errors.New("exiting") )
var ( ErrInvalidOffset = errors.New("invalid offset") ErrNeedFixQueueStart = errors.New("init queue start should be fixed") )
var ( ErrNotSupportedFilter = errors.New("the filter type not supported") ErrInvalidFilter = errors.New("invalid filter rule") )
{ "ver":1, "filter_ext_key":"xx", "filter_data":"filterA", }
{ "ver":2, "filter_ext_key":"xx", "filter_data":"regexp string", }
{ "ver":3, "filter_ext_key":"xx", "filter_data":"glob rule", }
ver selects the filter type, so that other filter types can be added later. Currently supported: equal (ver 1), regexp (ver 2), glob (ver 3). TODO: maybe support multi ext key rule chains, such as: filter if match_rule1(ext_key1) and match_rule2(ext_key2) or match_rule3(ext_key3)
var ( ErrTopicPartitionMismatch = errors.New("topic partition mismatch") ErrTopicNotExist = errors.New("topic does not exist") )
var ( ErrInvalidMessageID = errors.New("message id is invalid") ErrWriteOffsetMismatch = errors.New("write offset mismatch") ErrOperationInvalidState = errors.New("the operation is not allowed under current state") ErrMessageInvalidDelayedState = errors.New("the message is invalid for delayed") )
var (
CompactThreshold = 1024 * 1024 * 16
)
var DEFAULT_RETENTION_DAYS = 7
var EnableDelayedQueue = int32(1)
Functions ¶
func GetTraceIDFromFullMsgID ¶
func GetTraceIDFromFullMsgID(id FullMessageID) uint64
func NsqLogger ¶
func NsqLogger() *levellogger.LevelLogger
func SetLogger ¶
func SetLogger(log levellogger.Logger)
Types ¶
type BackendQueue ¶
type BackendQueue interface { Put([]byte) error ReadChan() chan []byte // this is expected to be an *unbuffered* channel Close() error Delete() error Depth() int64 Empty() error }
BackendQueue represents the behavior for the secondary message storage system
type BackendQueueEnd ¶
type BackendQueueEnd interface { Offset() BackendOffset TotalMsgCnt() int64 IsSame(BackendQueueEnd) bool }
type BackendQueueOffset ¶
type BackendQueueOffset interface {
Offset() BackendOffset
}
type BackendQueueReader ¶
type BackendQueueReader interface { ConfirmRead(BackendOffset, int64) error ResetReadToConfirmed() (BackendQueueEnd, error) SkipReadToOffset(BackendOffset, int64) (BackendQueueEnd, error) SkipReadToEnd() (BackendQueueEnd, error) Close() error // left data to be read Depth() int64 DepthSize() int64 GetQueueReadEnd() BackendQueueEnd GetQueueConfirmed() BackendQueueEnd Delete() error UpdateQueueEnd(BackendQueueEnd, bool) (bool, error) TryReadOne() (ReadResult, bool) }
for channel consumer
type BackendQueueWriter ¶
type BackendQueueWriter interface { Put([]byte) (BackendOffset, int32, int64, error) Close() error Delete() error Empty() error Flush() error GetQueueWriteEnd() BackendQueueEnd GetQueueReadStart() BackendQueueEnd GetQueueReadEnd() BackendQueueEnd RollbackWrite(BackendOffset, uint64) error ResetWriteEnd(BackendOffset, int64) error }
for topic producer
func NewDiskQueueWriter ¶
type Channel ¶
type Channel struct { sync.RWMutex // stat counters EnableTrace int32 Ext int32 // contains filtered or unexported fields }
Channel represents the concrete type for a NSQ channel (and also implements the Queue interface)
There can be multiple channels per topic, each with their own unique set of subscribers (clients).
Channels maintain all client and message metadata, orchestrating in-flight messages, timeouts, requeuing, etc.
func NewChannel ¶
func NewChannel(topicName string, part int, channelName string, chEnd BackendQueueEnd, opt *Options, deleteCallback func(*Channel), consumeDisabled int32, notify INsqdNotify, ext int32) *Channel
NewChannel creates a new instance of the Channel type and returns a pointer
func (*Channel) CleanWaitingRequeueChan ¶
If a message is confirmed without first going in flight, we should clean up the waiting state left by its requeue.
func (*Channel) ConfirmBackendQueue ¶
func (c *Channel) ConfirmBackendQueue(msg *Message) (BackendOffset, int64, bool)
In order not to make the confirm map too large, we need to handle this case: an old message is not confirmed, and we keep all the newer confirmed messages so we can confirm them later. The return value indicates whether the confirmed offset has changed.
func (*Channel) ConfirmBackendQueueOnSlave ¶
func (c *Channel) ConfirmBackendQueueOnSlave(offset BackendOffset, cnt int64, allowBackward bool) error
func (*Channel) ConfirmDelayedMessage ¶
func (c *Channel) ConfirmDelayedMessage(msg *Message) (BackendOffset, int64, bool)
func (*Channel) Exiting ¶
Exiting returns a boolean indicating if this channel is closed/exiting
func (*Channel) FinishMessage ¶
func (*Channel) FinishMessageForce ¶
func (*Channel) GetChannelEnd ¶
func (c *Channel) GetChannelEnd() BackendQueueEnd
func (*Channel) GetClientTagMsgChan ¶
Get the active tag channel, or the default message channel, from the tag channel map.
func (*Channel) GetConfirmed ¶
func (c *Channel) GetConfirmed() BackendQueueEnd
func (*Channel) GetConfirmedInterval ¶
func (c *Channel) GetConfirmedInterval() []MsgQueueInterval
func (*Channel) GetDelayedQueue ¶
func (c *Channel) GetDelayedQueue() *DelayQueue
func (*Channel) GetDelayedQueueConsumedState ¶
func (*Channel) GetOrCreateClientMsgChannel ¶
Get or create the tag message channel; invoked from protocol_v2.messagePump().
func (*Channel) RemoveClient ¶
RemoveClient removes a client from the Channel's client list
func (*Channel) RequeueClientMessages ¶
func (*Channel) RequeueMessage ¶
func (c *Channel) RequeueMessage(clientID int64, clientAddr string, id MessageID, timeout time.Duration, byClient bool) error
RequeueMessage requeues a message based on the `timeout` duration, i.e.:
`timeout` == 0 - requeue the message immediately
`timeout` > 0 - asynchronously wait for the specified timeout and then requeue the message
func (*Channel) SetConsumeOffset ¶
func (c *Channel) SetConsumeOffset(offset BackendOffset, cnt int64, force bool) error
func (*Channel) SetDelayedQueue ¶
func (c *Channel) SetDelayedQueue(dq *DelayQueue)
func (*Channel) ShouldRequeueToEnd ¶
func (*Channel) StartInFlightTimeout ¶
func (*Channel) TouchMessage ¶
TouchMessage resets the timeout for an in-flight message
func (*Channel) UpdateConfirmedInterval ¶
func (c *Channel) UpdateConfirmedInterval(intervals []MsgQueueInterval)
func (*Channel) UpdateQueueEnd ¶
func (c *Channel) UpdateQueueEnd(end BackendQueueEnd, forceReload bool) error
When a topic message is put, update the new end of the queue.
type ChannelMetaInfo ¶
type ChannelStats ¶
type ChannelStats struct { ChannelName string `json:"channel_name"` // message size need to consume Depth int64 `json:"depth"` DepthSize int64 `json:"depth_size"` DepthTimestamp string `json:"depth_ts"` BackendDepth int64 `json:"backend_depth"` // total size sub past hour on this channel HourlySubSize int64 `json:"hourly_subsize"` InFlightCount int `json:"in_flight_count"` DeferredCount int `json:"deferred_count"` MessageCount uint64 `json:"message_count"` RequeueCount uint64 `json:"requeue_count"` TimeoutCount uint64 `json:"timeout_count"` Clients []ClientStats `json:"clients"` ClientNum int64 `json:"client_num"` Paused bool `json:"paused"` Skipped bool `json:"skipped"` DelayedQueueCount uint64 `json:"delayed_queue_count"` DelayedQueueRecent string `json:"delayed_queue_recent"` E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"` MsgConsumeLatencyStats []int64 `json:"msg_consume_latency_stats"` MsgDeliveryLatencyStats []int64 `json:"msg_delivery_latency_stats"` }
func NewChannelStats ¶
func NewChannelStats(c *Channel, clients []ClientStats, clientNum int) ChannelStats
type ChannelStatsInfo ¶
type ChannelStatsInfo struct { // 16ms, 32ms, 64ms, 128ms, 256ms, 512ms, 1024ms, 2048ms, 4s, 8s, 16s, above MsgConsumeLatencyStats [12]int64 MsgDeliveryLatencyStats [12]int64 }
func (*ChannelStatsInfo) GetChannelLatencyStats ¶
func (self *ChannelStatsInfo) GetChannelLatencyStats() []int64
func (*ChannelStatsInfo) GetDeliveryLatencyStats ¶
func (self *ChannelStatsInfo) GetDeliveryLatencyStats() []int64
func (*ChannelStatsInfo) UpdateChannelLatencyStats ¶
func (self *ChannelStatsInfo) UpdateChannelLatencyStats(latencyInMillSec int64)
Update the message consume latency distribution, in milliseconds.
func (*ChannelStatsInfo) UpdateChannelStats ¶
func (self *ChannelStatsInfo) UpdateChannelStats(latencyInMillSec int64)
func (*ChannelStatsInfo) UpdateDeliveryLatencyStats ¶
func (self *ChannelStatsInfo) UpdateDeliveryLatencyStats(latencyInMillSec int64)
Update the message delivery latency distribution, in milliseconds.
func (*ChannelStatsInfo) UpdateDeliveryStats ¶
func (self *ChannelStatsInfo) UpdateDeliveryStats(latencyInMillSec int64)
type ChannelsByName ¶
type ChannelsByName struct {
Channels
}
func (ChannelsByName) Less ¶
func (c ChannelsByName) Less(i, j int) bool
type ClientPubStats ¶
type ClientStats ¶
type ClientStats struct { // TODO: deprecated, remove in 1.0 Name string `json:"name"` ClientID string `json:"client_id"` Hostname string `json:"hostname"` Version string `json:"version"` RemoteAddress string `json:"remote_address"` State int32 `json:"state"` ReadyCount int64 `json:"ready_count"` InFlightCount int64 `json:"in_flight_count"` MessageCount uint64 `json:"message_count"` FinishCount uint64 `json:"finish_count"` RequeueCount uint64 `json:"requeue_count"` TimeoutCount int64 `json:"timeout_count"` DeferredCount int64 `json:"deferred_count"` ConnectTime int64 `json:"connect_ts"` SampleRate int32 `json:"sample_rate"` Deflate bool `json:"deflate"` Snappy bool `json:"snappy"` UserAgent string `json:"user_agent"` Authed bool `json:"authed,omitempty"` AuthIdentity string `json:"auth_identity,omitempty"` AuthIdentityURL string `json:"auth_identity_url,omitempty"` DesiredTag string `json:"desired_tag"` TLS bool `json:"tls"` CipherSuite string `json:"tls_cipher_suite"` TLSVersion string `json:"tls_version"` TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"` TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"` }
type ClientV2 ¶
type ClientV2 struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms ReadyCount int64 InFlightCount int64 MessageCount uint64 FinishCount uint64 RequeueCount uint64 TimeoutCount uint64 ID int64 UserAgent string // original connection net.Conn // reading/writing interfaces Reader *bufio.Reader Writer *bufio.Writer State int32 ConnectTime time.Time Channel *Channel ReadyStateChan chan int // this is only used by notify messagebump to quit // and should be closed by the read loop only ExitChan chan int ClientID string Hostname string SampleRate int32 IdentifyEventChan chan identifyEvent SubEventChan chan *Channel TLS int32 Snappy int32 Deflate int32 LenSlice []byte AuthSecret string AuthState *auth.State EnableTrace bool PubTimeout *time.Timer TagMsgChannel chan *Message // contains filtered or unexported fields }
func NewClientV2 ¶
func (*ClientV2) Identify ¶
func (c *ClientV2) Identify(data IdentifyDataV2) error
func (*ClientV2) IsAuthorized ¶
func (*ClientV2) SetExtFilter ¶
func (c *ClientV2) SetExtFilter(filter ExtFilterData)
func (*ClientV2) SetHeartbeatInterval ¶
func (*ClientV2) SetOutputBufferSize ¶
func (*ClientV2) SetOutputBufferTimeout ¶
func (*ClientV2) SetTagMsgChannel ¶
No lock is needed, since this is only used in the messagePump loop.
func (*ClientV2) Stats ¶
func (c *ClientV2) Stats() ClientStats
type Consumer ¶
type Consumer interface { UnPause() Pause() TimedOutMessage() RequeuedMessage() FinishedMessage() Stats() ClientStats Exit() Empty() String() string GetID() int64 }
type DelayQueue ¶
type DelayQueue struct { EnableTrace int32 SyncEvery int64 // contains filtered or unexported fields }
func NewDelayQueue ¶
func NewDelayQueue(topicName string, part int, dataPath string, opt *Options, idGen MsgIDGenerator, isExt bool) (*DelayQueue, error)
func NewDelayQueueForRead ¶
func NewDelayQueueForRead(topicName string, part int, dataPath string, opt *Options, idGen MsgIDGenerator, isExt bool) (*DelayQueue, error)
func (*DelayQueue) BackupKVStoreTo ¶
func (q *DelayQueue) BackupKVStoreTo(w io.Writer) (int64, error)
func (*DelayQueue) CheckConsistence ¶
func (q *DelayQueue) CheckConsistence() error
func (*DelayQueue) Close ¶
func (q *DelayQueue) Close() error
func (*DelayQueue) ConfirmedMessage ¶
func (q *DelayQueue) ConfirmedMessage(msg *Message) error
func (*DelayQueue) Delete ¶
func (q *DelayQueue) Delete() error
func (*DelayQueue) EmptyDelayedChannel ¶
func (q *DelayQueue) EmptyDelayedChannel(ch string) error
func (*DelayQueue) EmptyDelayedType ¶
func (q *DelayQueue) EmptyDelayedType(dt int) error
func (*DelayQueue) ForceFlush ¶
func (q *DelayQueue) ForceFlush()
func (*DelayQueue) GetCurrentDelayedCnt ¶
func (q *DelayQueue) GetCurrentDelayedCnt(dt int, channel string) (uint64, error)
func (*DelayQueue) GetDBSize ¶
func (q *DelayQueue) GetDBSize() (int64, error)
func (*DelayQueue) GetDiskQueueSnapshot ¶
func (q *DelayQueue) GetDiskQueueSnapshot() *DiskQueueSnapshot
func (*DelayQueue) GetFullName ¶
func (q *DelayQueue) GetFullName() string
func (*DelayQueue) GetOldestConsumedState ¶
func (q *DelayQueue) GetOldestConsumedState(chList []string, includeOthers bool) (RecentKeyList, map[int]uint64, map[string]uint64)
func (*DelayQueue) GetSyncedOffset ¶
func (q *DelayQueue) GetSyncedOffset() (BackendOffset, error)
func (*DelayQueue) GetTopicName ¶
func (q *DelayQueue) GetTopicName() string
func (*DelayQueue) GetTopicPart ¶
func (q *DelayQueue) GetTopicPart() int
func (*DelayQueue) IsChannelMessageDelayed ¶
func (q *DelayQueue) IsChannelMessageDelayed(msgID MessageID, ch string) bool
func (*DelayQueue) IsDataNeedFix ¶
func (q *DelayQueue) IsDataNeedFix() bool
func (*DelayQueue) IsExt ¶
func (q *DelayQueue) IsExt() bool
func (*DelayQueue) PeekAll ¶
func (q *DelayQueue) PeekAll(results []Message) (int, error)
func (*DelayQueue) PeekRecentChannelTimeout ¶
func (*DelayQueue) PeekRecentDelayedPub ¶
func (q *DelayQueue) PeekRecentDelayedPub(now int64, results []Message) (int, error)
func (*DelayQueue) PeekRecentTimeoutWithFilter ¶
func (*DelayQueue) PutDelayMessage ¶
func (q *DelayQueue) PutDelayMessage(m *Message) (MessageID, BackendOffset, int32, BackendQueueEnd, error)
func (*DelayQueue) PutMessageOnReplica ¶
func (q *DelayQueue) PutMessageOnReplica(m *Message, offset BackendOffset, checkSize int64) (BackendQueueEnd, error)
func (*DelayQueue) PutRawDataOnReplica ¶
func (q *DelayQueue) PutRawDataOnReplica(rawData []byte, offset BackendOffset, checkSize int64, msgNum int32) (BackendQueueEnd, error)
func (*DelayQueue) ResetBackendEndNoLock ¶
func (q *DelayQueue) ResetBackendEndNoLock(vend BackendOffset, totalCnt int64) error
func (*DelayQueue) ResetBackendWithQueueStartNoLock ¶
func (q *DelayQueue) ResetBackendWithQueueStartNoLock(queueStartOffset int64, queueStartCnt int64) error
func (*DelayQueue) RestoreKVStoreFrom ¶
func (q *DelayQueue) RestoreKVStoreFrom(body io.Reader) error
func (*DelayQueue) RollbackNoLock ¶
func (q *DelayQueue) RollbackNoLock(vend BackendOffset, diffCnt uint64) error
func (*DelayQueue) SetDataFixState ¶
func (q *DelayQueue) SetDataFixState(needFix bool)
func (*DelayQueue) SetTrace ¶
func (q *DelayQueue) SetTrace(enable bool)
func (*DelayQueue) TotalDataSize ¶
func (q *DelayQueue) TotalDataSize() int64
func (*DelayQueue) TotalMessageCnt ¶
func (q *DelayQueue) TotalMessageCnt() uint64
func (*DelayQueue) TryCleanOldData ¶
func (q *DelayQueue) TryCleanOldData(retentionSize int64, noRealClean bool, maxCleanOffset BackendOffset) (BackendQueueEnd, error)
func (*DelayQueue) UpdateConsumedState ¶
func (q *DelayQueue) UpdateConsumedState(keyList RecentKeyList, cntList map[int]uint64, channelCntList map[string]uint64) error
type DetailStatsInfo ¶
func NewDetailStatsInfo ¶
func NewDetailStatsInfo(initPubSize int64, historyPath string) *DetailStatsInfo
func (*DetailStatsInfo) GetHourlyStats ¶
func (self *DetailStatsInfo) GetHourlyStats() [24]int64
func (*DetailStatsInfo) GetMsgSizeStats ¶
func (self *DetailStatsInfo) GetMsgSizeStats() []int64
func (*DetailStatsInfo) GetMsgWriteLatencyStats ¶
func (self *DetailStatsInfo) GetMsgWriteLatencyStats() []int64
func (*DetailStatsInfo) GetPubClientStats ¶
func (self *DetailStatsInfo) GetPubClientStats() []ClientPubStats
func (*DetailStatsInfo) LoadHistory ¶
func (self *DetailStatsInfo) LoadHistory(fileName string) error
func (*DetailStatsInfo) RemovePubStats ¶
func (self *DetailStatsInfo) RemovePubStats(remote string, protocol string)
func (*DetailStatsInfo) ResetHistoryInitPub ¶
func (self *DetailStatsInfo) ResetHistoryInitPub(msgSize int64)
func (*DetailStatsInfo) SaveHistory ¶
func (self *DetailStatsInfo) SaveHistory(fileName string) error
func (*DetailStatsInfo) UpdateHistory ¶
func (self *DetailStatsInfo) UpdateHistory(historyList [24]int64)
func (*DetailStatsInfo) UpdatePubClientStats ¶
func (*DetailStatsInfo) UpdateTopicMsgStats ¶
func (self *DetailStatsInfo) UpdateTopicMsgStats(msgSize int64, latency int64)
type DiskQueueSnapshot ¶
note: the message count info is not kept in snapshot
func NewDiskQueueSnapshot ¶
func NewDiskQueueSnapshot(readFrom string, dataPath string, endInfo BackendQueueEnd) *DiskQueueSnapshot
NewDiskQueueSnapshot instantiates a new instance of DiskQueueSnapshot, retrieving metadata from the filesystem and starting the read-ahead goroutine.
func (*DiskQueueSnapshot) Close ¶
func (d *DiskQueueSnapshot) Close() error
Close cleans up the queue and persists metadata
func (*DiskQueueSnapshot) GetCurrentReadQueueOffset ¶
func (d *DiskQueueSnapshot) GetCurrentReadQueueOffset() BackendQueueOffset
func (*DiskQueueSnapshot) GetQueueReadStart ¶
func (d *DiskQueueSnapshot) GetQueueReadStart() BackendQueueEnd
func (*DiskQueueSnapshot) ReadOne ¶
func (d *DiskQueueSnapshot) ReadOne() ReadResult
ReadOne performs a low-level filesystem read for a single []byte while advancing read positions and rolling files, if necessary.
func (*DiskQueueSnapshot) ReadRaw ¶
func (d *DiskQueueSnapshot) ReadRaw(size int32) ([]byte, error)
func (*DiskQueueSnapshot) ResetSeekTo ¶
func (d *DiskQueueSnapshot) ResetSeekTo(voffset BackendOffset) error
this can allow backward seek
func (*DiskQueueSnapshot) SeekTo ¶
func (d *DiskQueueSnapshot) SeekTo(voffset BackendOffset) error
func (*DiskQueueSnapshot) SeekToEnd ¶
func (d *DiskQueueSnapshot) SeekToEnd() error
func (*DiskQueueSnapshot) SetQueueStart ¶
func (d *DiskQueueSnapshot) SetQueueStart(start BackendQueueEnd)
func (*DiskQueueSnapshot) SkipToNext ¶
func (d *DiskQueueSnapshot) SkipToNext() error
func (*DiskQueueSnapshot) UpdateQueueEnd ¶
func (d *DiskQueueSnapshot) UpdateQueueEnd(e BackendQueueEnd)
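UpdateQueueEnd updates the queue end used by the snapshot to e. (The text previously shown here, "Put writes a []byte to the queue", describes a different method.)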
type ExtFilterData ¶
type ExtFilterData struct { Type int `json:"type,omitempty"` FilterExtKey string `json:"filter_ext_key,omitempty"` FilterData string `json:"filter_data,omitempty"` FilterDataList []MultiFilterData `json:"filter_data_list,omitempty"` }
type FullMessageID ¶
type FullMessageID [MsgIDLength]byte
The new full message ID is ID+TraceID, and its length is the same as the old ID slice. The TraceID is only used for business-level tracing, while the ID is used internally. To stay compatible with the old format, the attempts field is kept.
type IExtFilter ¶
func NewExtFilter ¶
func NewExtFilter(filter ExtFilterData) (IExtFilter, error)
type IMsgTracer ¶
type IMsgTracer interface { Start() TracePub(topic string, part int, pubMethod string, traceID uint64, msg *Message, diskOffset BackendOffset, currentCnt int64) TracePubClient(topic string, part int, traceID uint64, msgID MessageID, diskOffset BackendOffset, clientID string) // state will be READ_QUEUE, Start, Req, Fin, Timeout TraceSub(topic string, channel string, state string, traceID uint64, msg *Message, clientID string) }
func GetMsgTracer ¶
func GetMsgTracer() IMsgTracer
func NewRemoteMsgTracer ¶
func NewRemoteMsgTracer(remote string) IMsgTracer
type INsqdNotify ¶
type IdentifyDataV2 ¶
type IdentifyDataV2 struct { ShortID string `json:"short_id"` // TODO: deprecated, remove in 1.0 LongID string `json:"long_id"` // TODO: deprecated, remove in 1.0 ClientID string `json:"client_id"` Hostname string `json:"hostname"` HeartbeatInterval int `json:"heartbeat_interval"` OutputBufferSize int `json:"output_buffer_size"` OutputBufferTimeout int `json:"output_buffer_timeout"` FeatureNegotiation bool `json:"feature_negotiation"` TLSv1 bool `json:"tls_v1"` Deflate bool `json:"deflate"` DeflateLevel int `json:"deflate_level"` Snappy bool `json:"snappy"` SampleRate int32 `json:"sample_rate"` UserAgent string `json:"user_agent"` MsgTimeout int `json:"msg_timeout"` DesiredTag string `json:"desired_tag,omitempty"` ExtendSupport bool `json:"extend_support"` ExtFilter ExtFilterData `json:"ext_filter"` }
type IntervalHash ¶
type IntervalHash struct {
// contains filtered or unexported fields
}
func NewIntervalHash ¶
func NewIntervalHash() *IntervalHash
func (*IntervalHash) AddOrMerge ¶
func (self *IntervalHash) AddOrMerge(inter QueueInterval) QueueInterval
Returns the merged interval; if there is no overlap, returns the original interval.
func (*IntervalHash) DeleteInterval ¶
func (self *IntervalHash) DeleteInterval(inter QueueInterval)
func (*IntervalHash) DeleteLower ¶
func (self *IntervalHash) DeleteLower(low int64) int
func (*IntervalHash) DeleteRange ¶
func (self *IntervalHash) DeleteRange(inter QueueInterval)
func (*IntervalHash) IsCompleteOverlap ¶
func (self *IntervalHash) IsCompleteOverlap(inter QueueInterval) bool
func (*IntervalHash) IsLowestAt ¶
func (self *IntervalHash) IsLowestAt(low int64) QueueInterval
func (*IntervalHash) Len ¶
func (self *IntervalHash) Len() int
func (*IntervalHash) Query ¶
func (self *IntervalHash) Query(inter QueueInterval, excludeBoard bool) []QueueInterval
func (*IntervalHash) QueryExist ¶
func (self *IntervalHash) QueryExist(inter QueueInterval, excludeBoard bool) []QueueInterval
func (*IntervalHash) ToIntervalList ¶
func (self *IntervalHash) ToIntervalList() []MsgQueueInterval
func (*IntervalHash) ToString ¶
func (self *IntervalHash) ToString() string
type IntervalSkipList ¶
type IntervalSkipList struct {
// contains filtered or unexported fields
}
func NewIntervalSkipList ¶
func NewIntervalSkipList() *IntervalSkipList
func (*IntervalSkipList) AddOrMerge ¶
func (self *IntervalSkipList) AddOrMerge(inter QueueInterval) QueueInterval
Returns the merged interval; if there is no overlap, returns the original interval.
func (*IntervalSkipList) DeleteInterval ¶
func (self *IntervalSkipList) DeleteInterval(inter QueueInterval)
func (*IntervalSkipList) DeleteLower ¶
func (self *IntervalSkipList) DeleteLower(low int64) int
func (*IntervalSkipList) DeleteRange ¶
func (self *IntervalSkipList) DeleteRange(inter QueueInterval)
func (*IntervalSkipList) IsCompleteOverlap ¶
func (self *IntervalSkipList) IsCompleteOverlap(inter QueueInterval) bool
func (*IntervalSkipList) IsLowestAt ¶
func (self *IntervalSkipList) IsLowestAt(low int64) QueueInterval
func (*IntervalSkipList) Len ¶
func (self *IntervalSkipList) Len() int
func (*IntervalSkipList) Query ¶
func (self *IntervalSkipList) Query(inter QueueInterval, excludeBoard bool) []QueueInterval
func (*IntervalSkipList) ToIntervalList ¶
func (self *IntervalSkipList) ToIntervalList() []MsgQueueInterval
func (*IntervalSkipList) ToString ¶
func (self *IntervalSkipList) ToString() string
type IntervalTree ¶
type IntervalTree struct {
// contains filtered or unexported fields
}
func NewIntervalTree ¶
func NewIntervalTree() *IntervalTree
func (*IntervalTree) AddOrMerge ¶
func (self *IntervalTree) AddOrMerge(inter QueueInterval) QueueInterval
Returns the merged interval; if there is no overlap, returns the original interval.
func (*IntervalTree) DeleteInterval ¶
func (self *IntervalTree) DeleteInterval(inter QueueInterval)
func (*IntervalTree) DeleteLower ¶
func (self *IntervalTree) DeleteLower(low int64) int
func (*IntervalTree) DeleteRange ¶
func (self *IntervalTree) DeleteRange(inter QueueInterval)
func (*IntervalTree) IsCompleteOverlap ¶
func (self *IntervalTree) IsCompleteOverlap(inter QueueInterval) bool
func (*IntervalTree) IsLowestAt ¶
func (self *IntervalTree) IsLowestAt(low int64) QueueInterval
func (*IntervalTree) Len ¶
func (self *IntervalTree) Len() int
func (*IntervalTree) Query ¶
func (self *IntervalTree) Query(inter QueueInterval, excludeBoard bool) []QueueInterval
func (*IntervalTree) ToIntervalList ¶
func (self *IntervalTree) ToIntervalList() []MsgQueueInterval
func (*IntervalTree) ToString ¶
func (self *IntervalTree) ToString() string
type LogMsgTracer ¶
type LogMsgTracer struct {
MID string
}
just print the trace log
func (*LogMsgTracer) Start ¶
func (self *LogMsgTracer) Start()
func (*LogMsgTracer) TracePub ¶
func (self *LogMsgTracer) TracePub(topic string, part int, pubMethod string, traceID uint64, msg *Message, diskOffset BackendOffset, currentCnt int64)
func (*LogMsgTracer) TracePubClient ¶
func (self *LogMsgTracer) TracePubClient(topic string, part int, traceID uint64, msgID MessageID, diskOffset BackendOffset, clientID string)
type Message ¶ added in v0.2.29
type Message struct { ID MessageID TraceID uint64 Body []byte Timestamp int64 Attempts uint16 ExtBytes []byte ExtVer ext.ExtVer //for backend queue Offset BackendOffset RawMoveSize BackendOffset // for delayed queue message // 1 - delayed message by channel // 2 - delayed pub // DelayedType int32 DelayedTs int64 DelayedOrigID MessageID DelayedChannel string // will be used for delayed pub. (json data to tell different type of delay) DelayedData []byte // contains filtered or unexported fields }
func NewMessageWithExt ¶
func (*Message) GetFullMsgID ¶
func (m *Message) GetFullMsgID() FullMessageID
func (*Message) WriteDelayedTo ¶
func (*Message) WriteTo ¶ added in v0.2.29
type MessageID ¶ added in v0.2.29
type MessageID uint64
func GetMessageIDFromFullMsgID ¶
func GetMessageIDFromFullMsgID(id FullMessageID) MessageID
type MultiFilterData ¶
type NSQD ¶
type NSQD struct { sync.RWMutex MetaNotifyChan chan interface{} OptsNotificationChan chan struct{} // contains filtered or unexported fields }
func (*NSQD) CheckMagicCode ¶
func (*NSQD) CleanClientPubStats ¶
func (*NSQD) CloseExistingTopic ¶
This only closes the topic and removes it from the map; the data is kept for later.
func (*NSQD) DeleteExistingTopic ¶
DeleteExistingTopic removes a topic only if it exists
func (*NSQD) ForceDeleteTopicData ¶
func (*NSQD) GetExistingTopic ¶
GetExistingTopic gets a topic only if it exists
func (*NSQD) GetStats ¶
func (n *NSQD) GetStats(leaderOnly bool, filterClients bool) []TopicStats
func (*NSQD) GetTopic ¶
GetTopic performs a thread safe operation to return a pointer to a Topic object (potentially new)
func (*NSQD) GetTopicMapRef ¶
should be protected by read lock
func (*NSQD) GetTopicPartitions ¶
func (*NSQD) GetTopicStats ¶
func (n *NSQD) GetTopicStats(leaderOnly bool, topic string) []TopicStats
func (*NSQD) GetTopicStatsWithFilter ¶
func (n *NSQD) GetTopicStatsWithFilter(leaderOnly bool, topic string, filterClients bool) []TopicStats
func (*NSQD) GetTopicWithDisabled ¶
func (*NSQD) NotifyStateChanged ¶
func (*NSQD) SetReqToEndCB ¶
func (n *NSQD) SetReqToEndCB(reqToEndCB ReqToEndFunc)
type Options ¶ added in v0.3.6
type Options struct { // basic options ID int64 `flag:"worker-id" cfg:"id"` Verbose bool `flag:"verbose"` ClusterID string `flag:"cluster-id"` ClusterLeadershipAddresses string `flag:"cluster-leadership-addresses" cfg:"cluster_leadership_addresses"` TCPAddress string `flag:"tcp-address"` RPCPort string `flag:"rpc-port"` ReverseProxyPort string `flag:"reverse-proxy-port"` HTTPAddress string `flag:"http-address"` HTTPSAddress string `flag:"https-address"` BroadcastAddress string `flag:"broadcast-address"` BroadcastInterface string `flag:"broadcast-interface"` NSQLookupdTCPAddresses []string `flag:"lookupd-tcp-address" cfg:"nsqlookupd_tcp_addresses"` AuthHTTPAddresses []string `flag:"auth-http-address" cfg:"auth_http_addresses"` LookupPingInterval time.Duration `flag:"lookup-ping-interval" arg:"5s"` // diskqueue options DataPath string `flag:"data-path"` MemQueueSize int64 `flag:"mem-queue-size"` MaxBytesPerFile int64 `flag:"max-bytes-per-file"` SyncEvery int64 `flag:"sync-every"` SyncTimeout time.Duration `flag:"sync-timeout"` QueueScanInterval time.Duration `flag:"queue-scan-interval"` QueueScanRefreshInterval time.Duration `flag:"queue-scan-refresh-interval"` QueueScanSelectionCount int `flag:"queue-scan-selection-count"` QueueScanWorkerPoolMax int `flag:"queue-scan-worker-pool-max"` QueueScanDirtyPercent float64 `flag:"queue-scan-dirty-percent"` // msg and command options MsgTimeout time.Duration `flag:"msg-timeout" arg:"60s"` MaxMsgTimeout time.Duration `flag:"max-msg-timeout"` MaxMsgSize int64 `flag:"max-msg-size" deprecated:"max-message-size" cfg:"max_msg_size"` MaxBodySize int64 `flag:"max-body-size"` MaxReqTimeout time.Duration `flag:"max-req-timeout"` MaxConfirmWin int64 `flag:"max-confirm-win"` ClientTimeout time.Duration ReqToEndThreshold time.Duration `flag:"req-to-end-threshold"` // client overridable configuration options MaxHeartbeatInterval time.Duration `flag:"max-heartbeat-interval"` MaxRdyCount int64 `flag:"max-rdy-count"` MaxOutputBufferSize int64 
`flag:"max-output-buffer-size"` MaxOutputBufferTimeout time.Duration `flag:"max-output-buffer-timeout"` // statsd integration StatsdAddress string `flag:"statsd-address"` StatsdPrefix string `flag:"statsd-prefix"` StatsdProtocol string `flag:"statsd-protocol"` StatsdInterval time.Duration `flag:"statsd-interval" arg:"60s"` StatsdMemStats bool `flag:"statsd-mem-stats"` // e2e message latency E2EProcessingLatencyWindowTime time.Duration `flag:"e2e-processing-latency-window-time"` E2EProcessingLatencyPercentiles []float64 `flag:"e2e-processing-latency-percentile" cfg:"e2e_processing_latency_percentiles"` // TLS config TLSCert string `flag:"tls-cert"` TLSKey string `flag:"tls-key"` TLSClientAuthPolicy string `flag:"tls-client-auth-policy"` TLSRootCAFile string `flag:"tls-root-ca-file"` TLSRequired int `flag:"tls-required"` TLSMinVersion uint16 `flag:"tls-min-version"` // compression DeflateEnabled bool `flag:"deflate"` MaxDeflateLevel int `flag:"max-deflate-level"` SnappyEnabled bool `flag:"snappy"` LogLevel int32 `flag:"log-level" cfg:"log_level"` LogDir string `flag:"log-dir" cfg:"log_dir"` Logger levellogger.Logger RemoteTracer string `flag:"remote-tracer"` RetentionDays int32 `flag:"retention-days" cfg:"retention_days"` RetentionSizePerDay int64 `flag:"retention-size-per-day" cfg:"retention_size_per_day"` StartAsFixMode bool `flag:"start-as-fix-mode"` AllowExtCompatible bool `flag:"allow-ext-compatible" cfg:"allow_ext_compatible"` AllowSubExtCompatible bool `flag:"allow-sub-ext-compatible" cfg:"allow_sub_ext_compatible"` }
type PubInfo ¶
type QueueInterval ¶
type QueueInterval interface { Start() int64 End() int64 EndCnt() uint64 augmentedtree.Interval }
type ReadResult ¶
type ReadResult struct { Offset BackendOffset MovedSize BackendOffset CurCnt int64 Data []byte Err error }
type RemoteMsgTracer ¶
type RemoteMsgTracer struct {
// contains filtered or unexported fields
}
This tracer sends the trace info to the remote server every second.
func (*RemoteMsgTracer) Start ¶
func (self *RemoteMsgTracer) Start()
func (*RemoteMsgTracer) Stop ¶
func (self *RemoteMsgTracer) Stop()
func (*RemoteMsgTracer) TracePub ¶
func (self *RemoteMsgTracer) TracePub(topic string, part int, pubMethod string, traceID uint64, msg *Message, diskOffset BackendOffset, currentCnt int64)
func (*RemoteMsgTracer) TracePubClient ¶
func (self *RemoteMsgTracer) TracePubClient(topic string, part int, traceID uint64, msgID MessageID, diskOffset BackendOffset, clientID string)
type Topic ¶
func NewTopic ¶
func NewTopicWithExt ¶
func NewTopicWithExt(topicName string, part int, ext bool, opt *Options, writeDisabled int32, notify INsqdNotify, loopFunc func(v *Topic)) *Topic
Topic constructor
func (*Topic) AggregateChannelE2eProcessingLatency ¶
func (*Topic) Close ¶
Close persists all outstanding topic data and closes all its channels
func (*Topic) CloseExistingChannel ¶
func (*Topic) Delete ¶
Delete empties the topic and all its channels and closes
func (*Topic) DeleteExistingChannel ¶
DeleteExistingChannel removes a channel from the topic only if it exists
func (*Topic) Exiting ¶
Exiting returns a boolean indicating if this topic is closed/exiting
func (*Topic) GetChannel ¶
GetChannel performs a thread safe operation to return a pointer to a Channel object (potentially new) for the given Topic
func (*Topic) GetChannelMeta ¶
func (t *Topic) GetChannelMeta() []ChannelMetaInfo
func (*Topic) GetCommitted ¶
func (t *Topic) GetCommitted() BackendQueueEnd
func (*Topic) GetDelayedQueue ¶
func (t *Topic) GetDelayedQueue() *DelayQueue
func (*Topic) GetDelayedQueueConsumedState ¶
func (*Topic) GetDetailStats ¶
func (t *Topic) GetDetailStats() *DetailStatsInfo
func (*Topic) GetDiskQueueSnapshot ¶
func (t *Topic) GetDiskQueueSnapshot() *DiskQueueSnapshot
func (*Topic) GetDynamicInfo ¶
func (t *Topic) GetDynamicInfo() TopicDynamicConf
func (*Topic) GetExistingChannel ¶
func (*Topic) GetMagicCode ¶
should be protected by the topic lock for all partitions
func (*Topic) GetOrCreateDelayedQueueNoLock ¶
func (t *Topic) GetOrCreateDelayedQueueNoLock(idGen MsgIDGenerator) (*DelayQueue, error)
func (*Topic) GetTopicChannelDebugStat ¶
func (*Topic) GetWaitChan ¶
func (t *Topic) GetWaitChan() PubInfoChan
func (*Topic) PutMessage ¶
func (t *Topic) PutMessage(m *Message) (MessageID, BackendOffset, int32, BackendQueueEnd, error)
PutMessage writes a Message to the queue
func (*Topic) PutMessageNoLock ¶
func (t *Topic) PutMessageNoLock(m *Message) (MessageID, BackendOffset, int32, BackendQueueEnd, error)
func (*Topic) PutMessageOnReplica ¶
func (t *Topic) PutMessageOnReplica(m *Message, offset BackendOffset, checkSize int64) (BackendQueueEnd, error)
func (*Topic) PutMessages ¶
func (t *Topic) PutMessages(msgs []*Message) (MessageID, BackendOffset, int32, int64, BackendQueueEnd, error)
PutMessages writes multiple Messages to the queue
func (*Topic) PutMessagesNoLock ¶
func (t *Topic) PutMessagesNoLock(msgs []*Message) (MessageID, BackendOffset, int32, int64, BackendQueueEnd, error)
func (*Topic) PutMessagesOnReplica ¶
func (t *Topic) PutMessagesOnReplica(msgs []*Message, offset BackendOffset, checkSize int64) (BackendQueueEnd, error)
func (*Topic) PutRawDataOnReplica ¶
func (t *Topic) PutRawDataOnReplica(rawData []byte, offset BackendOffset, checkSize int64, msgNum int32) (BackendQueueEnd, error)
func (*Topic) ResetBackendEndNoLock ¶
func (t *Topic) ResetBackendEndNoLock(vend BackendOffset, totalCnt int64) error
func (*Topic) ResetBackendWithQueueStartNoLock ¶
func (*Topic) RollbackNoLock ¶
func (t *Topic) RollbackNoLock(vend BackendOffset, diffCnt uint64) error
func (*Topic) SetDynamicInfo ¶
func (t *Topic) SetDynamicInfo(dynamicConf TopicDynamicConf, idGen MsgIDGenerator)
func (*Topic) SetMagicCode ¶
should be protected by the topic lock for all partitions
func (*Topic) TryCleanOldData ¶
func (t *Topic) TryCleanOldData(retentionSize int64, noRealClean bool, maxCleanOffset BackendOffset) (BackendQueueEnd, error)
Maybe this should return the cleaned offset, to allow the commit log to be cleaned.
func (*Topic) UpdateCommittedOffset ¶
func (t *Topic) UpdateCommittedOffset(offset BackendQueueEnd)
Note: multiple writers should be protected by a lock.
type TopicDynamicConf ¶
type TopicHistoryStatsInfo ¶
type TopicHistoryStatsInfo struct { HourlyPubSize [24]int64 // contains filtered or unexported fields }
func (*TopicHistoryStatsInfo) UpdateHourlySize ¶
func (self *TopicHistoryStatsInfo) UpdateHourlySize(curPubSize int64)
The slave should also update the pub size stat, since the slave needs to sync with the leader (which will cost some write performance).
type TopicMsgStatsInfo ¶
type TopicMsgStatsInfo struct { // <100bytes, <1KB, 2KB, 4KB, 8KB, 16KB, 32KB, 64KB, 128KB, 256KB, 512KB, 1MB, 2MB, 4MB MsgSizeStats [16]int64 // <1024us, 2ms, 4ms, 8ms, 16ms, 32ms, 64ms, 128ms, 256ms, 512ms, 1024ms, 2048ms, 4s, 8s MsgWriteLatencyStats [16]int64 }
func (*TopicMsgStatsInfo) UpdateMsgLatencyStats ¶
func (self *TopicMsgStatsInfo) UpdateMsgLatencyStats(latency int64)
func (*TopicMsgStatsInfo) UpdateMsgSizeStats ¶
func (self *TopicMsgStatsInfo) UpdateMsgSizeStats(msgSize int64)
func (*TopicMsgStatsInfo) UpdateMsgStats ¶
func (self *TopicMsgStatsInfo) UpdateMsgStats(msgSize int64, latency int64)
type TopicStats ¶
type TopicStats struct { TopicName string `json:"topic_name"` TopicFullName string `json:"topic_full_name"` TopicPartition string `json:"topic_partition"` Channels []ChannelStats `json:"channels"` Depth int64 `json:"depth"` BackendDepth int64 `json:"backend_depth"` BackendStart int64 `json:"backend_start"` MessageCount uint64 `json:"message_count"` IsLeader bool `json:"is_leader"` HourlyPubSize int64 `json:"hourly_pubsize"` Clients []ClientPubStats `json:"client_pub_stats"` MsgSizeStats []int64 `json:"msg_size_stats"` MsgWriteLatencyStats []int64 `json:"msg_write_latency_stats"` IsMultiOrdered bool `json:"is_multi_ordered"` IsExt bool `json:"is_ext"` StatsdName string `json:"statsd_name"` E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"` }
func NewTopicStats ¶
func NewTopicStats(t *Topic, channels []ChannelStats, filterClients bool) TopicStats