Documentation ¶
Index ¶
- Constants
- Variables
- func GetQueueFileName(dataRoot string, base string, fileNum int64) string
- func GetTopicFullName(topic string, part int) string
- func GetTraceIDFromFullMsgID(id FullMessageID) uint64
- func MessageHeaderBytes() int
- func NewBufioReader(r io.Reader) *bufio.Reader
- func NsqLogger() *levellogger.LevelLogger
- func PutBufioReader(br *bufio.Reader)
- func SetRemoteMsgTracer(remote string)
- type BackendOffset
- type BackendQueue
- type BackendQueueEnd
- type BackendQueueOffset
- type BackendQueueReader
- type BackendQueueWriter
- type Channel
- func (c *Channel) AddClient(clientID int64, client Consumer) error
- func (c *Channel) Close() error
- func (c *Channel) ConfirmBackendQueue(msg *Message) (BackendOffset, int64, bool, error)
- func (c *Channel) ConfirmBackendQueueOnSlave(offset BackendOffset, cnt int64, allowBackward bool) error
- func (c *Channel) ContinueConsumeForOrder()
- func (c *Channel) Delete() error
- func (c *Channel) Depth() int64
- func (c *Channel) DepthSize() int64
- func (c *Channel) DepthTimestamp() int64
- func (c *Channel) DisableConsume(disable bool)
- func (c *Channel) Exiting() bool
- func (c *Channel) FinishMessage(clientID int64, clientAddr string, id MessageID) (BackendOffset, int64, bool, error)
- func (c *Channel) GetChannelDebugStats() string
- func (c *Channel) GetChannelEnd() BackendQueueEnd
- func (c *Channel) GetClientMsgChan() chan *Message
- func (c *Channel) GetClients() map[int64]Consumer
- func (c *Channel) GetClientsCount() int
- func (c *Channel) GetConfirmed() BackendQueueEnd
- func (c *Channel) GetInflightNum() int
- func (c *Channel) GetName() string
- func (c *Channel) GetTopicName() string
- func (c *Channel) GetTopicPart() int
- func (c *Channel) IsConfirmed(msg *Message) bool
- func (c *Channel) IsConsumeDisabled() bool
- func (c *Channel) IsEphemeral() bool
- func (c *Channel) IsOrdered() bool
- func (c *Channel) IsPaused() bool
- func (c *Channel) IsTraced() bool
- func (c *Channel) IsWaitingMoreData() bool
- func (c *Channel) Pause() error
- func (c *Channel) RemoveClient(clientID int64)
- func (c *Channel) RequeueClientMessages(clientID int64, clientAddr string)
- func (c *Channel) RequeueMessage(clientID int64, clientAddr string, id MessageID, timeout time.Duration, ...) error
- func (c *Channel) SetConsumeOffset(offset BackendOffset, cnt int64, force bool) error
- func (c *Channel) SetOrdered(enable bool)
- func (c *Channel) SetTrace(enable bool)
- func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, clientAddr string, timeout time.Duration) error
- func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout time.Duration) error
- func (c *Channel) TryWakeupRead()
- func (c *Channel) UnPause() error
- func (c *Channel) UpdateQueueEnd(end BackendQueueEnd, forceReload bool) error
- type ChannelMetaInfo
- type ChannelStats
- type Channels
- type ChannelsByName
- type ClientPubStats
- type ClientStats
- type ClientV2
- func (c *ClientV2) Auth(secret string) error
- func (c *ClientV2) Empty()
- func (c *ClientV2) Exit()
- func (c *ClientV2) FinalClose()
- func (c *ClientV2) FinishedMessage()
- func (c *ClientV2) Flush() error
- func (c *ClientV2) HasAuthorizations() bool
- func (c *ClientV2) Identify(data IdentifyDataV2) error
- func (c *ClientV2) IncrSubError(delta int64)
- func (c *ClientV2) IsAuthorized(topic, channel string) (bool, error)
- func (c *ClientV2) IsReadyForMessages() bool
- func (c *ClientV2) LockRead()
- func (c *ClientV2) LockWrite()
- func (c *ClientV2) Pause()
- func (c *ClientV2) QueryAuthd() error
- func (c *ClientV2) RequeuedMessage(delayed bool)
- func (c *ClientV2) SendingMessage()
- func (c *ClientV2) SetHeartbeatInterval(desiredInterval int) error
- func (c *ClientV2) SetMsgTimeout(msgTimeout int) error
- func (c *ClientV2) SetOutputBufferSize(desiredSize int) error
- func (c *ClientV2) SetOutputBufferTimeout(desiredTimeout int) error
- func (c *ClientV2) SetReadyCount(count int64)
- func (c *ClientV2) SetSampleRate(sampleRate int32) error
- func (c *ClientV2) StartClose()
- func (c *ClientV2) Stats() ClientStats
- func (c *ClientV2) String() string
- func (c *ClientV2) TimedOutMessage(isDefer bool)
- func (c *ClientV2) UnPause()
- func (c *ClientV2) UnlockRead()
- func (c *ClientV2) UnlockWrite()
- func (c *ClientV2) UpgradeDeflate(level int) error
- func (c *ClientV2) UpgradeSnappy() error
- func (c *ClientV2) UpgradeTLS() error
- type Consumer
- type DetailStatsInfo
- func (self *DetailStatsInfo) GetHourlyStats() [24]int64
- func (self *DetailStatsInfo) GetMsgSizeStats() []int64
- func (self *DetailStatsInfo) GetMsgWriteLatencyStats() []int64
- func (self *DetailStatsInfo) GetPubClientStats() []ClientPubStats
- func (self *DetailStatsInfo) LoadHistory(fileName string) error
- func (self *DetailStatsInfo) RemovePubStats(remote string, protocol string)
- func (self *DetailStatsInfo) ResetHistoryInitPub(msgSize int64)
- func (self *DetailStatsInfo) SaveHistory(fileName string) error
- func (self *DetailStatsInfo) UpdateHistory(historyList [24]int64)
- func (self *DetailStatsInfo) UpdatePubClientStats(remote string, agent string, protocol string, count int64, hasErr bool)
- func (self *DetailStatsInfo) UpdateTopicMsgStats(msgSize int64, latency int64)
- type DiskQueueSnapshot
- func (d *DiskQueueSnapshot) Close() error
- func (d *DiskQueueSnapshot) GetCurrentReadQueueOffset() BackendQueueOffset
- func (d *DiskQueueSnapshot) GetQueueReadStart() BackendQueueEnd
- func (d *DiskQueueSnapshot) ReadOne() ReadResult
- func (d *DiskQueueSnapshot) ReadRaw(size int32) ([]byte, error)
- func (d *DiskQueueSnapshot) ResetSeekTo(voffset BackendOffset) error
- func (d *DiskQueueSnapshot) SeekTo(voffset BackendOffset) error
- func (d *DiskQueueSnapshot) SeekToEnd() error
- func (d *DiskQueueSnapshot) SetQueueStart(start BackendQueueEnd)
- func (d *DiskQueueSnapshot) SkipToNext() error
- func (d *DiskQueueSnapshot) UpdateQueueEnd(e BackendQueueEnd)
- type FullMessageID
- type IMsgTracer
- type IdentifyDataV2
- type LogMsgTracer
- type Message
- type MessageID
- type MsgIDGenerator
- type NSQD
- func (n *NSQD) CheckMagicCode(name string, partition int, code int64, tryFix bool) (string, error)
- func (n *NSQD) CleanClientPubStats(remote string, protocol string)
- func (n *NSQD) CloseExistingTopic(topicName string, partition int) error
- func (n *NSQD) DeleteExistingTopic(topicName string, part int) error
- func (n *NSQD) Exit()
- func (n *NSQD) ForceDeleteTopicData(name string, partition int) error
- func (n *NSQD) GetError() error
- func (n *NSQD) GetExistingTopic(topicName string, part int) (*Topic, error)
- func (n *NSQD) GetHealth() string
- func (n *NSQD) GetOpts() *Options
- func (n *NSQD) GetStartTime() time.Time
- func (n *NSQD) GetStats(leaderOnly bool) []TopicStats
- func (n *NSQD) GetTopic(topicName string, part int) *Topic
- func (n *NSQD) GetTopicDefaultPart(topicName string) int
- func (n *NSQD) GetTopicIgnPart(topicName string) *Topic
- func (n *NSQD) GetTopicMapCopy() map[string]map[int]*Topic
- func (n *NSQD) GetTopicMapRef() map[string]map[int]*Topic
- func (n *NSQD) GetTopicPartitions(topicName string) map[int]*Topic
- func (n *NSQD) GetTopicStats(topic string) []TopicStats
- func (n *NSQD) GetTopicWithDisabled(topicName string, part int) *Topic
- func (n *NSQD) IsAuthEnabled() bool
- func (n *NSQD) IsHealthy() bool
- func (n *NSQD) LoadMetadata(disabled int32)
- func (n *NSQD) Notify(v interface{})
- func (n *NSQD) PersistMetadata(currentTopicMap map[string]map[int]*Topic) error
- func (n *NSQD) SetHealth(err error)
- func (n *NSQD) SetPubLoop(loop func(t *Topic))
- func (n *NSQD) SetTopicMagicCode(t *Topic, code int64) error
- func (n *NSQD) Start()
- func (n *NSQD) SwapOpts(opts *Options)
- func (n *NSQD) TriggerOptsNotification()
- func (n *NSQD) UpdateTopicHistoryStats()
- type Options
- type PubInfo
- type PubInfoChan
- type ReadResult
- type RemoteMsgTracer
- func (self *RemoteMsgTracer) Start()
- func (self *RemoteMsgTracer) Stop()
- func (self *RemoteMsgTracer) TracePub(topic string, traceID uint64, msg *Message, diskOffset BackendOffset, ...)
- func (self *RemoteMsgTracer) TraceSub(topic string, channel string, state string, traceID uint64, msg *Message, ...)
- type Topic
- func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile
- func (t *Topic) BufferPoolGet(capacity int) *bytes.Buffer
- func (t *Topic) BufferPoolPut(b *bytes.Buffer)
- func (t *Topic) Close() error
- func (t *Topic) Delete() error
- func (t *Topic) DeleteExistingChannel(channelName string) error
- func (t *Topic) DisableForSlave()
- func (t *Topic) Empty() error
- func (t *Topic) EnableForMaster()
- func (t *Topic) Exiting() bool
- func (t *Topic) ForceFlush()
- func (t *Topic) GetChannel(channelName string) *Channel
- func (t *Topic) GetChannelMapCopy() map[string]*Channel
- func (t *Topic) GetCommitted() BackendQueueEnd
- func (t *Topic) GetDetailStats() *DetailStatsInfo
- func (t *Topic) GetDiskQueueSnapshot() *DiskQueueSnapshot
- func (t *Topic) GetDynamicInfo() TopicDynamicConf
- func (t *Topic) GetExistingChannel(channelName string) (*Channel, error)
- func (t *Topic) GetFullName() string
- func (t *Topic) GetMagicCode() int64
- func (t *Topic) GetMsgGenerator() MsgIDGenerator
- func (t *Topic) GetQueueReadStart() int64
- func (t *Topic) GetTopicChannelDebugStat(channelName string) string
- func (t *Topic) GetTopicName() string
- func (t *Topic) GetTopicPart() int
- func (t *Topic) GetWaitChan() PubInfoChan
- func (t *Topic) IsDataNeedFix() bool
- func (t *Topic) IsWriteDisabled() bool
- func (t *Topic) LoadChannelMeta() error
- func (t *Topic) LoadHistoryStats() error
- func (t *Topic) MarkAsRemoved() (string, error)
- func (t *Topic) NotifyReloadChannels()
- func (t *Topic) PrintCurrentStats()
- func (t *Topic) PutMessage(m *Message) (MessageID, BackendOffset, int32, BackendQueueEnd, error)
- func (t *Topic) PutMessageNoLock(m *Message) (MessageID, BackendOffset, int32, BackendQueueEnd, error)
- func (t *Topic) PutMessageOnReplica(m *Message, offset BackendOffset) (BackendQueueEnd, error)
- func (t *Topic) PutMessages(msgs []*Message) (MessageID, BackendOffset, int32, int64, BackendQueueEnd, error)
- func (t *Topic) PutMessagesNoLock(msgs []*Message) (MessageID, BackendOffset, int32, int64, BackendQueueEnd, error)
- func (t *Topic) PutMessagesOnReplica(msgs []*Message, offset BackendOffset) (BackendQueueEnd, error)
- func (t *Topic) QuitChan() <-chan struct{}
- func (t *Topic) RemoveChannelMeta()
- func (t *Topic) ResetBackendEndNoLock(vend BackendOffset, totalCnt int64) error
- func (t *Topic) ResetBackendWithQueueStartNoLock(queueStartOffset int64, queueStartCnt int64) error
- func (t *Topic) RollbackNoLock(vend BackendOffset, diffCnt uint64) error
- func (t *Topic) SaveChannelMeta() error
- func (t *Topic) SaveHistoryStats() error
- func (t *Topic) SetDataFixState(needFix bool)
- func (t *Topic) SetDynamicInfo(dynamicConf TopicDynamicConf, idGen MsgIDGenerator)
- func (t *Topic) SetMagicCode(code int64) error
- func (t *Topic) SetMsgGenerator(idGen MsgIDGenerator)
- func (t *Topic) SetTrace(enable bool)
- func (t *Topic) TotalDataSize() int64
- func (t *Topic) TotalMessageCnt() uint64
- func (t *Topic) TryCleanOldData(retentionSize int64, noRealClean bool, maxCleanOffset BackendOffset) (BackendQueueEnd, error)
- func (t *Topic) UpdateCommittedOffset(offset BackendQueueEnd)
- type TopicDynamicConf
- type TopicHistoryStatsInfo
- type TopicMsgStatsInfo
- type TopicStats
- type Topics
- type TopicsByName
- type TraceLogItemInfo
Constants ¶
const ( MsgIDLength = 16 MsgTraceIDLength = 8 )
const ( TLSNotRequired = iota TLSRequiredExceptHTTP TLSRequired )
const ( MAX_TOPIC_PARTITION = 1023 HISTORY_STAT_FILE_NAME = ".stat.history.dat" )
const (
MAX_NODE_ID = 1024 * 1024
)
const (
MAX_POSSIBLE_MSG_SIZE = 1 << 28
)
const (
MAX_QUEUE_OFFSET_META_DATA_KEEP = 100
)
Variables ¶
var ( ErrMsgNotInFlight = errors.New("Message ID not in flight") ErrMsgAlreadyInFlight = errors.New("Message ID already in flight") ErrConsumeDisabled = errors.New("Consume is disabled currently") ErrMsgDeferred = errors.New("Message is deferred") ErrSetConsumeOffsetNotFirstClient = errors.New("consume offset can only be changed by the first consume client") ErrNotDiskQueueReader = errors.New("the consume channel is not disk queue reader") )
var ( ErrReadQueueAlreadyCleaned = errors.New("the queue position has been cleaned") ErrConfirmSizeInvalid = errors.New("Confirm data size invalid.") ErrConfirmCntInvalid = errors.New("Confirm message count invalid.") ErrMoveOffsetInvalid = errors.New("move offset invalid") ErrOffsetTypeMismatch = errors.New("offset type mismatch") ErrReadQueueCountMissing = errors.New("read queue count info missing") ErrReadEndOfQueue = errors.New("read to the end of queue") ErrInvalidReadable = errors.New("readable data is invalid") ErrExiting = errors.New("exiting") )
var ( ErrInvalidOffset = errors.New("invalid offset") ErrNeedFixQueueStart = errors.New("init queue start should be fixed") )
var ( ErrTopicPartitionMismatch = errors.New("topic partition mismatch") ErrTopicNotExist = errors.New("topic does not exist") )
var ( ErrInvalidMessageID = errors.New("message id is invalid") ErrWriteOffsetMismatch = errors.New("write offset mismatch") ErrOperationInvalidState = errors.New("the operation is not allowed under current state") )
var DEFAULT_RETENTION_DAYS = 7
Functions ¶
func GetTraceIDFromFullMsgID ¶
func GetTraceIDFromFullMsgID(id FullMessageID) uint64
func NsqLogger ¶
func NsqLogger() *levellogger.LevelLogger
Types ¶
type BackendQueue ¶
type BackendQueue interface { Put([]byte) error ReadChan() chan []byte // this is expected to be an *unbuffered* channel Close() error Delete() error Depth() int64 Empty() error }
BackendQueue represents the behavior for the secondary message storage system
type BackendQueueEnd ¶
type BackendQueueEnd interface { Offset() BackendOffset TotalMsgCnt() int64 IsSame(BackendQueueEnd) bool }
type BackendQueueOffset ¶
type BackendQueueOffset interface {
Offset() BackendOffset
}
type BackendQueueReader ¶
type BackendQueueReader interface { ConfirmRead(BackendOffset, int64) error ResetReadToConfirmed() (BackendQueueEnd, error) SkipReadToOffset(BackendOffset, int64) (BackendQueueEnd, error) SkipReadToEnd() (BackendQueueEnd, error) Close() error // left data to be read Depth() int64 DepthSize() int64 GetQueueReadEnd() BackendQueueEnd GetQueueConfirmed() BackendQueueEnd Delete() error UpdateQueueEnd(BackendQueueEnd, bool) (bool, error) TryReadOne() (ReadResult, bool) }
for channel consumer
type BackendQueueWriter ¶
type BackendQueueWriter interface { Put([]byte) (BackendOffset, int32, int64, error) Close() error Delete() error Empty() error Flush() error GetQueueWriteEnd() BackendQueueEnd GetQueueReadEnd() BackendQueueEnd RollbackWrite(BackendOffset, uint64) error ResetWriteEnd(BackendOffset, int64) error }
for topic producer
type Channel ¶
type Channel struct { sync.RWMutex // stat counters EnableTrace int32 // contains filtered or unexported fields }
Channel represents the concrete type for a NSQ channel (and also implements the Queue interface)
There can be multiple channels per topic, each with there own unique set of subscribers (clients).
Channels maintain all client and message metadata, orchestrating in-flight messages, timeouts, requeuing, etc.
func NewChannel ¶
func NewChannel(topicName string, part int, channelName string, chEnd BackendQueueEnd, opt *Options, deleteCallback func(*Channel), consumeDisabled int32, notify func(v interface{})) *Channel
NewChannel creates a new instance of the Channel type and returns a pointer
func (*Channel) AddClient ¶
AddClient adds a client to the Channel's client list
func (*Channel) ConfirmBackendQueue ¶
in order not to make the confirm map too large, we need handle this case: a old message is not confirmed, and we keep all the newer confirmed messages so we can confirm later. indicated weather the confirmed offset is changed
func (*Channel) ConfirmBackendQueueOnSlave ¶
func (c *Channel) ConfirmBackendQueueOnSlave(offset BackendOffset, cnt int64, allowBackward bool) error
func (*Channel) Exiting ¶
Exiting returns a boolean indicating if this channel is closed/exiting
func (*Channel) FinishMessage ¶
func (c *Channel) FinishMessage(clientID int64, clientAddr string, id MessageID) (BackendOffset, int64, bool, error)
FinishMessage successfully discards an in-flight message
func (*Channel) GetChannelEnd ¶
func (c *Channel) GetChannelEnd() BackendQueueEnd
func (*Channel) GetConfirmed ¶
func (c *Channel) GetConfirmed() BackendQueueEnd
func (*Channel) RemoveClient ¶
RemoveClient removes a client from the Channel's client list
func (*Channel) RequeueClientMessages ¶
func (*Channel) RequeueMessage ¶
func (c *Channel) RequeueMessage(clientID int64, clientAddr string, id MessageID, timeout time.Duration, byClient bool) error
RequeueMessage requeues a message based on `time.Duration`, ie:
`timeoutMs` == 0 - requeue a message immediately `timeoutMs` > 0 - asynchronously wait for the specified timeout
and requeue a message
func (*Channel) SetConsumeOffset ¶
func (c *Channel) SetConsumeOffset(offset BackendOffset, cnt int64, force bool) error
func (*Channel) StartInFlightTimeout ¶
func (*Channel) TouchMessage ¶
TouchMessage resets the timeout for an in-flight message
func (*Channel) UpdateQueueEnd ¶
func (c *Channel) UpdateQueueEnd(end BackendQueueEnd, forceReload bool) error
When topic message is put, update the new end of the queue
type ChannelMetaInfo ¶
type ChannelStats ¶
type ChannelStats struct { ChannelName string `json:"channel_name"` // message size need to consume Depth int64 `json:"depth"` DepthSize int64 `json:"depth_size"` DepthTimestamp string `json:"depth_ts"` BackendDepth int64 `json:"backend_depth"` // total size sub past hour on this channel HourlySubSize int64 `json:"hourly_subsize"` InFlightCount int `json:"in_flight_count"` DeferredCount int `json:"deferred_count"` MessageCount uint64 `json:"message_count"` RequeueCount uint64 `json:"requeue_count"` TimeoutCount uint64 `json:"timeout_count"` Clients []ClientStats `json:"clients"` Paused bool `json:"paused"` E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"` }
func NewChannelStats ¶
func NewChannelStats(c *Channel, clients []ClientStats) ChannelStats
type ChannelsByName ¶
type ChannelsByName struct {
Channels
}
func (ChannelsByName) Less ¶
func (c ChannelsByName) Less(i, j int) bool
type ClientPubStats ¶
type ClientStats ¶
type ClientStats struct { // TODO: deprecated, remove in 1.0 Name string `json:"name"` ClientID string `json:"client_id"` Hostname string `json:"hostname"` Version string `json:"version"` RemoteAddress string `json:"remote_address"` State int32 `json:"state"` ReadyCount int64 `json:"ready_count"` InFlightCount int64 `json:"in_flight_count"` MessageCount uint64 `json:"message_count"` FinishCount uint64 `json:"finish_count"` RequeueCount uint64 `json:"requeue_count"` TimeoutCount int64 `json:"timeout_count"` DeferredCount int64 `json:"deferred_count"` ConnectTime int64 `json:"connect_ts"` SampleRate int32 `json:"sample_rate"` Deflate bool `json:"deflate"` Snappy bool `json:"snappy"` UserAgent string `json:"user_agent"` Authed bool `json:"authed,omitempty"` AuthIdentity string `json:"auth_identity,omitempty"` AuthIdentityURL string `json:"auth_identity_url,omitempty"` TLS bool `json:"tls"` CipherSuite string `json:"tls_cipher_suite"` TLSVersion string `json:"tls_version"` TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"` TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"` }
type ClientV2 ¶
type ClientV2 struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms ReadyCount int64 InFlightCount int64 MessageCount uint64 FinishCount uint64 RequeueCount uint64 TimeoutCount int64 DeferredCount int64 ID int64 UserAgent string // original connection net.Conn // reading/writing interfaces Reader *bufio.Reader Writer *bufio.Writer OutputBufferSize int OutputBufferTimeout time.Duration HeartbeatInterval time.Duration MsgTimeout time.Duration State int32 ConnectTime time.Time Channel *Channel ReadyStateChan chan int // this is only used by notify messagebump to quit // and should be closed by the read loop only ExitChan chan int ClientID string Hostname string SampleRate int32 IdentifyEventChan chan identifyEvent SubEventChan chan *Channel TLS int32 Snappy int32 Deflate int32 LenSlice []byte AuthSecret string AuthState *auth.State EnableTrace bool PubTimeout *time.Timer // contains filtered or unexported fields }
func NewClientV2 ¶
func (*ClientV2) Identify ¶
func (c *ClientV2) Identify(data IdentifyDataV2) error
func (*ClientV2) IsAuthorized ¶
func (*ClientV2) SetHeartbeatInterval ¶
func (*ClientV2) SetOutputBufferSize ¶
func (*ClientV2) SetOutputBufferTimeout ¶
func (*ClientV2) Stats ¶
func (c *ClientV2) Stats() ClientStats
type Consumer ¶
type Consumer interface { UnPause() Pause() TimedOutMessage(delayed bool) Stats() ClientStats Exit() Empty() String() string }
type DetailStatsInfo ¶
func NewDetailStatsInfo ¶
func NewDetailStatsInfo(initPubSize int64, historyPath string) *DetailStatsInfo
func (*DetailStatsInfo) GetHourlyStats ¶
func (self *DetailStatsInfo) GetHourlyStats() [24]int64
func (*DetailStatsInfo) GetMsgSizeStats ¶
func (self *DetailStatsInfo) GetMsgSizeStats() []int64
func (*DetailStatsInfo) GetMsgWriteLatencyStats ¶
func (self *DetailStatsInfo) GetMsgWriteLatencyStats() []int64
func (*DetailStatsInfo) GetPubClientStats ¶
func (self *DetailStatsInfo) GetPubClientStats() []ClientPubStats
func (*DetailStatsInfo) LoadHistory ¶
func (self *DetailStatsInfo) LoadHistory(fileName string) error
func (*DetailStatsInfo) RemovePubStats ¶
func (self *DetailStatsInfo) RemovePubStats(remote string, protocol string)
func (*DetailStatsInfo) ResetHistoryInitPub ¶
func (self *DetailStatsInfo) ResetHistoryInitPub(msgSize int64)
func (*DetailStatsInfo) SaveHistory ¶
func (self *DetailStatsInfo) SaveHistory(fileName string) error
func (*DetailStatsInfo) UpdateHistory ¶
func (self *DetailStatsInfo) UpdateHistory(historyList [24]int64)
func (*DetailStatsInfo) UpdatePubClientStats ¶
func (*DetailStatsInfo) UpdateTopicMsgStats ¶
func (self *DetailStatsInfo) UpdateTopicMsgStats(msgSize int64, latency int64)
type DiskQueueSnapshot ¶
note: the message count info is not kept in snapshot
func NewDiskQueueSnapshot ¶
func NewDiskQueueSnapshot(readFrom string, dataPath string, endInfo BackendQueueEnd) *DiskQueueSnapshot
newDiskQueue instantiates a new instance of DiskQueueSnapshot, retrieving metadata from the filesystem and starting the read ahead goroutine
func (*DiskQueueSnapshot) Close ¶
func (d *DiskQueueSnapshot) Close() error
Close cleans up the queue and persists metadata
func (*DiskQueueSnapshot) GetCurrentReadQueueOffset ¶
func (d *DiskQueueSnapshot) GetCurrentReadQueueOffset() BackendQueueOffset
func (*DiskQueueSnapshot) GetQueueReadStart ¶
func (d *DiskQueueSnapshot) GetQueueReadStart() BackendQueueEnd
func (*DiskQueueSnapshot) ReadOne ¶
func (d *DiskQueueSnapshot) ReadOne() ReadResult
readOne performs a low level filesystem read for a single []byte while advancing read positions and rolling files, if necessary
func (*DiskQueueSnapshot) ReadRaw ¶
func (d *DiskQueueSnapshot) ReadRaw(size int32) ([]byte, error)
func (*DiskQueueSnapshot) ResetSeekTo ¶
func (d *DiskQueueSnapshot) ResetSeekTo(voffset BackendOffset) error
this can allow backward seek
func (*DiskQueueSnapshot) SeekTo ¶
func (d *DiskQueueSnapshot) SeekTo(voffset BackendOffset) error
func (*DiskQueueSnapshot) SeekToEnd ¶
func (d *DiskQueueSnapshot) SeekToEnd() error
func (*DiskQueueSnapshot) SetQueueStart ¶
func (d *DiskQueueSnapshot) SetQueueStart(start BackendQueueEnd)
func (*DiskQueueSnapshot) SkipToNext ¶
func (d *DiskQueueSnapshot) SkipToNext() error
func (*DiskQueueSnapshot) UpdateQueueEnd ¶
func (d *DiskQueueSnapshot) UpdateQueueEnd(e BackendQueueEnd)
Put writes a []byte to the queue
type FullMessageID ¶
type FullMessageID [MsgIDLength]byte
the new message total id will be ID+TraceID, the length is same with old id slice, the traceid only used for trace for business, the ID is used for internal. In order to be compatible with old format, we keep the attempts field.
type IMsgTracer ¶
type IMsgTracer interface { Start() TracePub(topic string, traceID uint64, msg *Message, diskOffset BackendOffset, currentCnt int64) // state will be READ_QUEUE, Start, Req, Fin, Timeout TraceSub(topic string, channel string, state string, traceID uint64, msg *Message, clientID string) }
func NewRemoteMsgTracer ¶
func NewRemoteMsgTracer(remote string) IMsgTracer
type IdentifyDataV2 ¶
type IdentifyDataV2 struct { ShortID string `json:"short_id"` // TODO: deprecated, remove in 1.0 LongID string `json:"long_id"` // TODO: deprecated, remove in 1.0 ClientID string `json:"client_id"` Hostname string `json:"hostname"` HeartbeatInterval int `json:"heartbeat_interval"` OutputBufferSize int `json:"output_buffer_size"` OutputBufferTimeout int `json:"output_buffer_timeout"` FeatureNegotiation bool `json:"feature_negotiation"` TLSv1 bool `json:"tls_v1"` Deflate bool `json:"deflate"` DeflateLevel int `json:"deflate_level"` Snappy bool `json:"snappy"` SampleRate int32 `json:"sample_rate"` UserAgent string `json:"user_agent"` MsgTimeout int `json:"msg_timeout"` }
type LogMsgTracer ¶
type LogMsgTracer struct {
MID string
}
just print the trace log
func (*LogMsgTracer) Start ¶
func (self *LogMsgTracer) Start()
func (*LogMsgTracer) TracePub ¶
func (self *LogMsgTracer) TracePub(topic string, traceID uint64, msg *Message, diskOffset BackendOffset, currentCnt int64)
type Message ¶ added in v0.2.29
type Message struct { ID MessageID TraceID uint64 Body []byte Timestamp int64 Attempts uint16 // contains filtered or unexported fields }
func (*Message) GetFullMsgID ¶
func (m *Message) GetFullMsgID() FullMessageID
type MessageID ¶ added in v0.2.29
type MessageID uint64
func GetMessageIDFromFullMsgID ¶
func GetMessageIDFromFullMsgID(id FullMessageID) MessageID
type NSQD ¶
type NSQD struct { sync.RWMutex MetaNotifyChan chan interface{} OptsNotificationChan chan struct{} // contains filtered or unexported fields }
func (*NSQD) CheckMagicCode ¶
func (*NSQD) CleanClientPubStats ¶
func (*NSQD) CloseExistingTopic ¶
this just close the topic and remove from map, but keep the data for later.
func (*NSQD) DeleteExistingTopic ¶
DeleteExistingTopic removes a topic only if it exists
func (*NSQD) ForceDeleteTopicData ¶
func (*NSQD) GetExistingTopic ¶
GetExistingTopic gets a topic only if it exists
func (*NSQD) GetStats ¶
func (n *NSQD) GetStats(leaderOnly bool) []TopicStats
func (*NSQD) GetTopic ¶
GetTopic performs a thread safe operation to return a pointer to a Topic object (potentially new)
func (*NSQD) GetTopicMapRef ¶
should be protected by read lock
func (*NSQD) GetTopicPartitions ¶
func (*NSQD) GetTopicStats ¶
func (n *NSQD) GetTopicStats(topic string) []TopicStats
func (*NSQD) GetTopicWithDisabled ¶
func (*NSQD) PersistMetadata ¶
type Options ¶ added in v0.3.6
type Options struct { // basic options ID int64 `flag:"worker-id" cfg:"id"` Verbose bool `flag:"verbose"` ClusterID string `flag:"cluster-id"` ClusterLeadershipAddresses string `flag:"cluster-leadership-addresses" cfg:"cluster_leadership_addresses"` TCPAddress string `flag:"tcp-address"` RPCPort string `flag:"rpc-port"` ReverseProxyPort string `flag:"reverse-proxy-port"` HTTPAddress string `flag:"http-address"` HTTPSAddress string `flag:"https-address"` BroadcastAddress string `flag:"broadcast-address"` BroadcastInterface string `flag:"broadcast-interface"` NSQLookupdTCPAddresses []string `flag:"lookupd-tcp-address" cfg:"nsqlookupd_tcp_addresses"` AuthHTTPAddresses []string `flag:"auth-http-address" cfg:"auth_http_addresses"` LookupPingInterval time.Duration `flag:"lookup-ping-interval" arg:"5s"` // diskqueue options DataPath string `flag:"data-path"` MemQueueSize int64 `flag:"mem-queue-size"` MaxBytesPerFile int64 `flag:"max-bytes-per-file"` SyncEvery int64 `flag:"sync-every"` SyncTimeout time.Duration `flag:"sync-timeout"` QueueScanInterval time.Duration QueueScanRefreshInterval time.Duration QueueScanSelectionCount int QueueScanWorkerPoolMax int QueueScanDirtyPercent float64 // msg and command options MsgTimeout time.Duration `flag:"msg-timeout" arg:"60s"` MaxMsgTimeout time.Duration `flag:"max-msg-timeout"` MaxMsgSize int64 `flag:"max-msg-size" deprecated:"max-message-size" cfg:"max_msg_size"` MaxBodySize int64 `flag:"max-body-size"` MaxReqTimeout time.Duration `flag:"max-req-timeout"` MaxConfirmWin int64 `flag:"max-confirm-win"` ClientTimeout time.Duration // client overridable configuration options MaxHeartbeatInterval time.Duration `flag:"max-heartbeat-interval"` MaxRdyCount int64 `flag:"max-rdy-count"` MaxOutputBufferSize int64 `flag:"max-output-buffer-size"` MaxOutputBufferTimeout time.Duration `flag:"max-output-buffer-timeout"` // statsd integration StatsdAddress string `flag:"statsd-address"` StatsdPrefix string `flag:"statsd-prefix"` StatsdProtocol string `flag:"statsd-protocol"` StatsdInterval time.Duration `flag:"statsd-interval" arg:"60s"` StatsdMemStats bool `flag:"statsd-mem-stats"` // e2e message latency E2EProcessingLatencyWindowTime time.Duration `flag:"e2e-processing-latency-window-time"` E2EProcessingLatencyPercentiles []float64 `flag:"e2e-processing-latency-percentile" cfg:"e2e_processing_latency_percentiles"` // TLS config TLSCert string `flag:"tls-cert"` TLSKey string `flag:"tls-key"` TLSClientAuthPolicy string `flag:"tls-client-auth-policy"` TLSRootCAFile string `flag:"tls-root-ca-file"` TLSRequired int `flag:"tls-required"` TLSMinVersion uint16 `flag:"tls-min-version"` // compression DeflateEnabled bool `flag:"deflate"` MaxDeflateLevel int `flag:"max-deflate-level"` SnappyEnabled bool `flag:"snappy"` LogLevel int32 `flag:"log-level" cfg:"log_level"` LogDir string `flag:"log-dir" cfg:"log_dir"` Logger levellogger.Logger RemoteTracer string `flag:"remote-tracer"` RetentionDays int32 `flag:"retention-days" cfg:"retention_days"` }
type PubInfo ¶
type ReadResult ¶
type ReadResult struct { Offset BackendOffset MovedSize BackendOffset CurCnt int64 Data []byte Err error }
type RemoteMsgTracer ¶
type RemoteMsgTracer struct {
// contains filtered or unexported fields
}
this tracer will send the trace info to remote server for each seconds
func (*RemoteMsgTracer) Start ¶
func (self *RemoteMsgTracer) Start()
func (*RemoteMsgTracer) Stop ¶
func (self *RemoteMsgTracer) Stop()
func (*RemoteMsgTracer) TracePub ¶
func (self *RemoteMsgTracer) TracePub(topic string, traceID uint64, msg *Message, diskOffset BackendOffset, currentCnt int64)
type Topic ¶
func NewTopic ¶
func NewTopic(topicName string, part int, opt *Options, deleteCallback func(*Topic), writeDisabled int32, notify func(v interface{}), loopFunc func(v *Topic)) *Topic
Topic constructor
func (*Topic) AggregateChannelE2eProcessingLatency ¶
func (*Topic) Close ¶
Close persists all outstanding topic data and closes all its channels
func (*Topic) Delete ¶
Delete empties the topic and all its channels and closes
func (*Topic) DeleteExistingChannel ¶
DeleteExistingChannel removes a channel from the topic only if it exists
func (*Topic) Exiting ¶
Exiting returns a boolean indicating if this topic is closed/exiting
func (*Topic) GetChannel ¶
GetChannel performs a thread safe operation to return a pointer to a Channel object (potentially new) for the given Topic
func (*Topic) GetChannelMapCopy ¶
should be protected by read lock outside
func (*Topic) GetCommitted ¶
func (t *Topic) GetCommitted() BackendQueueEnd
func (*Topic) GetDetailStats ¶
func (t *Topic) GetDetailStats() *DetailStatsInfo
func (*Topic) GetDiskQueueSnapshot ¶
func (t *Topic) GetDiskQueueSnapshot() *DiskQueueSnapshot
func (*Topic) GetDynamicInfo ¶
func (t *Topic) GetDynamicInfo() TopicDynamicConf
func (*Topic) GetExistingChannel ¶
func (*Topic) GetMagicCode ¶
should be protected by the topic lock for all partitions
func (*Topic) GetMsgGenerator ¶
func (t *Topic) GetMsgGenerator() MsgIDGenerator
func (*Topic) GetTopicChannelDebugStat ¶
func (*Topic) GetWaitChan ¶
func (t *Topic) GetWaitChan() PubInfoChan
func (*Topic) PutMessage ¶
func (t *Topic) PutMessage(m *Message) (MessageID, BackendOffset, int32, BackendQueueEnd, error)
PutMessage writes a Message to the queue
func (*Topic) PutMessageNoLock ¶
func (t *Topic) PutMessageNoLock(m *Message) (MessageID, BackendOffset, int32, BackendQueueEnd, error)
func (*Topic) PutMessageOnReplica ¶
func (t *Topic) PutMessageOnReplica(m *Message, offset BackendOffset) (BackendQueueEnd, error)
func (*Topic) PutMessages ¶
func (t *Topic) PutMessages(msgs []*Message) (MessageID, BackendOffset, int32, int64, BackendQueueEnd, error)
PutMessages writes multiple Messages to the queue
func (*Topic) PutMessagesNoLock ¶
func (t *Topic) PutMessagesNoLock(msgs []*Message) (MessageID, BackendOffset, int32, int64, BackendQueueEnd, error)
func (*Topic) PutMessagesOnReplica ¶
func (t *Topic) PutMessagesOnReplica(msgs []*Message, offset BackendOffset) (BackendQueueEnd, error)
func (*Topic) ResetBackendEndNoLock ¶
func (t *Topic) ResetBackendEndNoLock(vend BackendOffset, totalCnt int64) error
func (*Topic) ResetBackendWithQueueStartNoLock ¶
func (*Topic) RollbackNoLock ¶
func (t *Topic) RollbackNoLock(vend BackendOffset, diffCnt uint64) error
func (*Topic) SetDynamicInfo ¶
func (t *Topic) SetDynamicInfo(dynamicConf TopicDynamicConf, idGen MsgIDGenerator)
func (*Topic) SetMagicCode ¶
should be protected by the topic lock for all partitions
func (*Topic) SetMsgGenerator ¶
func (t *Topic) SetMsgGenerator(idGen MsgIDGenerator)
func (*Topic) TryCleanOldData ¶
func (t *Topic) TryCleanOldData(retentionSize int64, noRealClean bool, maxCleanOffset BackendOffset) (BackendQueueEnd, error)
maybe should return the cleaned offset to allow commit log clean
func (*Topic) UpdateCommittedOffset ¶
func (t *Topic) UpdateCommittedOffset(offset BackendQueueEnd)
note: multiple writer should be protected by lock
type TopicDynamicConf ¶
type TopicHistoryStatsInfo ¶
type TopicHistoryStatsInfo struct { HourlyPubSize [24]int64 // contains filtered or unexported fields }
func (*TopicHistoryStatsInfo) UpdateHourlySize ¶
func (self *TopicHistoryStatsInfo) UpdateHourlySize(curPubSize int64)
the slave should also update the pub size stat, since the slave need sync with leader (which will cost the write performance)
type TopicMsgStatsInfo ¶
type TopicMsgStatsInfo struct { // <100bytes, <1KB, 2KB, 4KB, 8KB, 16KB, 32KB, 64KB, 128KB, 256KB, 512KB, 1MB, 2MB, 4MB MsgSizeStats [16]int64 // <1024us, 2ms, 4ms, 8ms, 16ms, 32ms, 64ms, 128ms, 256ms, 512ms, 1024ms, 2048ms, 4s, 8s MsgWriteLatencyStats [16]int64 }
func (*TopicMsgStatsInfo) UpdateMsgLatencyStats ¶
func (self *TopicMsgStatsInfo) UpdateMsgLatencyStats(latency int64)
func (*TopicMsgStatsInfo) UpdateMsgSizeStats ¶
func (self *TopicMsgStatsInfo) UpdateMsgSizeStats(msgSize int64)
func (*TopicMsgStatsInfo) UpdateMsgStats ¶
func (self *TopicMsgStatsInfo) UpdateMsgStats(msgSize int64, latency int64)
type TopicStats ¶
type TopicStats struct { TopicName string `json:"topic_name"` TopicFullName string `json:"topic_full_name"` TopicPartition string `json:"topic_partition"` Channels []ChannelStats `json:"channels"` Depth int64 `json:"depth"` BackendDepth int64 `json:"backend_depth"` BackendStart int64 `json:"backend_start"` MessageCount uint64 `json:"message_count"` IsLeader bool `json:"is_leader"` HourlyPubSize int64 `json:"hourly_pubsize"` Clients []ClientPubStats `json:"client_pub_stats"` MsgSizeStats []int64 `json:"msg_size_stats"` MsgWriteLatencyStats []int64 `json:"msg_write_latency_stats"` E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"` }
func NewTopicStats ¶
func NewTopicStats(t *Topic, channels []ChannelStats) TopicStats