nsqd

package
v0.3.7-HA.1.9.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 26, 2019 License: MIT Imports: 46 Imported by: 0

README

nsqd

nsqd is the daemon that receives, queues, and delivers messages to clients.

Read the docs

Documentation

Index

Constants

View Source
const (
	MaxMemReqTimes    = 10
	MaxWaitingDelayed = 100
	MaxDepthReqToEnd  = 1000000
	ZanTestSkip       = 0
	ZanTestUnskip     = 1

	DefaultMaxChDelayedQNum = 10000 * 16
)
View Source
const (
	MinDelayedType      = 1
	ChannelDelayed      = 1
	PubDelayed          = 2
	TransactionDelayed  = 3
	MaxDelayedType      = 4
	TxMaxSize           = 65536
	CompactCntThreshold = 40000
)
View Source
const (
	MsgIDLength         = 16
	MsgTraceIDLength    = 8
	MsgJsonHeaderLength = 2

	MaxAttempts = 4000
)
View Source
const (
	TLSNotRequired = iota
	TLSRequiredExceptHTTP
	TLSRequired
)
View Source
const (
	MAX_TOPIC_PARTITION    = 1023
	HISTORY_STAT_FILE_NAME = ".stat.history.dat"
)
View Source
const (
	FLUSH_DISTANCE = 4
)
View Source
const (
	MAX_NODE_ID = 1024 * 1024
)
View Source
const (
	MAX_POSSIBLE_MSG_SIZE = 1 << 28
)
View Source
const (
	MAX_QUEUE_OFFSET_META_DATA_KEEP = 100
)

Variables

View Source
var (
	ErrMsgNotInFlight                 = errors.New("Message ID not in flight")
	ErrMsgDeferredTooMuch             = errors.New("Too much deferred messages in flight")
	ErrMsgAlreadyInFlight             = errors.New("Message ID already in flight")
	ErrConsumeDisabled                = errors.New("Consume is disabled currently")
	ErrMsgDeferred                    = errors.New("Message is deferred")
	ErrSetConsumeOffsetNotFirstClient = errors.New("consume offset can only be changed by the first consume client")
	ErrNotDiskQueueReader             = errors.New("the consume channel is not disk queue reader")
)
View Source
var (
	ErrReadQueueAlreadyCleaned = errors.New("the queue position has been cleaned")
	ErrConfirmSizeInvalid      = errors.New("Confirm data size invalid.")
	ErrConfirmCntInvalid       = errors.New("Confirm message count invalid.")
	ErrMoveOffsetInvalid       = errors.New("move offset invalid")
	ErrMoveOffsetOverflowed    = errors.New("disk reader move offset overflowed the end")
	ErrOffsetTypeMismatch      = errors.New("offset type mismatch")
	ErrReadQueueCountMissing   = errors.New("read queue count info missing")
	ErrReadEndOfQueue          = errors.New("read to the end of queue")
	ErrInvalidReadable         = errors.New("readable data is invalid")
	ErrReadEndChangeToOld      = errors.New("queue read end change to old without reload")
	ErrExiting                 = errors.New("exiting")
)
View Source
var (
	ErrInvalidOffset     = errors.New("invalid offset")
	ErrNeedFixQueueStart = errors.New("init queue start should be fixed")
)
View Source
var (
	ErrNotSupportedFilter = errors.New("the filter type not supported")
	ErrInvalidFilter      = errors.New("invalid filter rule")
)
{
	"ver":1,
	"filter_ext_key":"xx",
	"filter_data":"filterA",
}
{
	"ver":2,
	"filter_ext_key":"xx",
	"filter_data":"regexp string",
}
{
	"ver":3,
	"filter_ext_key":"xx",
	"filter_data":"glob rule",
}

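`ver` selects the filter type, so that other filter types can be added later. As the examples above show, the currently supported types are equal (ver 1), regexp (ver 2) and glob (ver 3). TODO: maybe support rule chains over multiple ext keys, such as: filter if match_rule1(ext_key1) and match_rule2(ext_key2) or match_rule3(ext_key3).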

View Source
var (
	ErrTopicPartitionMismatch = errors.New("topic partition mismatch")
	ErrTopicNotExist          = errors.New("topic does not exist")
)
View Source
var (
	ErrInvalidMessageID           = errors.New("message id is invalid")
	ErrWriteOffsetMismatch        = errors.New("write offset mismatch")
	ErrOperationInvalidState      = errors.New("the operation is not allowed under current state")
	ErrMessageInvalidDelayedState = errors.New("the message is invalid for delayed")
)
View Source
var (
	CompactThreshold = 1024 * 1024 * 16
)
View Source
var DEFAULT_RETENTION_DAYS = 3
View Source
var EnableDelayedQueue = int32(1)
View Source
var FileNumV2Seq = 999990

Functions

func GetQueueFileName

func GetQueueFileName(dataRoot string, base string, fileNum int64) string

func GetTopicFullName

func GetTopicFullName(topic string, part int) string

func GetTraceIDFromFullMsgID

func GetTraceIDFromFullMsgID(id FullMessageID) uint64

func IsValidDelayedMessage

func IsValidDelayedMessage(m *Message) bool

func MessageHeaderBytes

func MessageHeaderBytes() int

func NewBufioReader

func NewBufioReader(r io.Reader) *bufio.Reader

func NewDBMetaStorage

func NewDBMetaStorage(p string) (*dbMetaStorage, error)

func NewShardedDBMetaStorage

func NewShardedDBMetaStorage(p string) (*shardedDBMetaStorage, error)

func NsqLogger

func NsqLogger() *levellogger.LevelLogger

func PrintMessage

func PrintMessage(m *Message) string

func PrintMessageNoBody

func PrintMessageNoBody(m *Message) string

func PutBufioReader

func PutBufioReader(br *bufio.Reader)

func SetLogger

func SetLogger(log levellogger.Logger)

func SetRemoteMsgTracer

func SetRemoteMsgTracer(remote string)

Types

type BackendOffset

type BackendOffset int64

type BackendQueue

type BackendQueue interface {
	Put([]byte) error
	ReadChan() chan []byte // this is expected to be an *unbuffered* channel
	Close() error
	Delete() error
	Depth() int64
	Empty() error
}

BackendQueue represents the behavior for the secondary message storage system

type BackendQueueEnd

type BackendQueueEnd interface {
	Offset() BackendOffset
	TotalMsgCnt() int64
	IsSame(BackendQueueEnd) bool
}

type BackendQueueOffset

type BackendQueueOffset interface {
	Offset() BackendOffset
}

type BackendQueueReader

type BackendQueueReader interface {
	ConfirmRead(BackendOffset, int64) error
	ResetReadToConfirmed() (BackendQueueEnd, error)
	SkipReadToOffset(BackendOffset, int64) (BackendQueueEnd, error)
	SkipReadToEnd() (BackendQueueEnd, error)
	Close() error
	// left data to be read
	Depth() int64
	DepthSize() int64
	GetQueueReadEnd() BackendQueueEnd
	GetQueueConfirmed() BackendQueueEnd
	Delete() error
	UpdateQueueEnd(BackendQueueEnd, bool) (bool, error)
	TryReadOne() (ReadResult, bool)
}

for channel consumer

type BackendQueueWriter

type BackendQueueWriter interface {
	Put([]byte) (BackendOffset, int32, int64, error)
	Close() error
	Delete() error
	Empty() error
	Flush(bool) error
	GetQueueWriteEnd() BackendQueueEnd
	GetQueueReadStart() BackendQueueEnd
	GetQueueReadEnd() BackendQueueEnd
	RollbackWrite(BackendOffset, uint64) error
	ResetWriteEnd(BackendOffset, int64) error
}

for topic producer

func NewDiskQueueWriter

func NewDiskQueueWriter(name string, dataPath string, maxBytesPerFile int64,
	minMsgSize int32, maxMsgSize int32,
	syncEvery int64) (BackendQueueWriter, error)

func NewDiskQueueWriterForRead

func NewDiskQueueWriterForRead(name string, dataPath string, maxBytesPerFile int64,
	minMsgSize int32, maxMsgSize int32,
	syncEvery int64) (BackendQueueWriter, error)

type Channel

type Channel struct {
	sync.RWMutex

	// stat counters
	EnableTrace     int32
	EnableSlowTrace int32
	Ext             int32
	// contains filtered or unexported fields
}

Channel represents the concrete type for a NSQ channel (and also implements the Queue interface)

There can be multiple channels per topic, each with their own unique set of subscribers (clients).

Channels maintain all client and message metadata, orchestrating in-flight messages, timeouts, requeuing, etc.

func NewChannel

func NewChannel(topicName string, part int, topicOrdered bool, channelName string, chEnd BackendQueueEnd, opt *Options,
	deleteCallback func(*Channel), moreDataCallback func(*Channel), consumeDisabled int32,
	notify INsqdNotify, ext int32, queueStart BackendQueueEnd, metaStorage IMetaStorage) *Channel

NewChannel creates a new instance of the Channel type and returns a pointer

func (*Channel) AddClient

func (c *Channel) AddClient(clientID int64, client Consumer) error

func (*Channel) CleanWaitingRequeueChan

func (c *Channel) CleanWaitingRequeueChan(msg *Message)

If a message is confirmed without going in-flight first, then we should clean the waiting state left by the requeue.

func (*Channel) Close

func (c *Channel) Close() error

Close cleanly closes the Channel

func (*Channel) ConfirmBackendQueue

func (c *Channel) ConfirmBackendQueue(msg *Message) (BackendOffset, int64, bool)

In order not to make the confirm map too large, we need to handle this case: an old message is not confirmed, and we keep all the newer confirmed messages so that they can be confirmed later. The result indicates whether the confirmed offset has changed.

func (*Channel) ConfirmBackendQueueOnSlave

func (c *Channel) ConfirmBackendQueueOnSlave(offset BackendOffset, cnt int64, allowBackward bool) error

func (*Channel) ConfirmDelayedMessage

func (c *Channel) ConfirmDelayedMessage(msg *Message) (BackendOffset, int64, bool)

func (*Channel) ContinueConsumeForOrder

func (c *Channel) ContinueConsumeForOrder()

func (*Channel) Delete

func (c *Channel) Delete() error

Delete empties the channel and closes it

func (*Channel) Depth

func (c *Channel) Depth() int64

func (*Channel) DepthSize

func (c *Channel) DepthSize() int64

func (*Channel) DepthTimestamp

func (c *Channel) DepthTimestamp() int64

func (*Channel) DisableConsume

func (c *Channel) DisableConsume(disable bool)

func (*Channel) Exiting

func (c *Channel) Exiting() bool

Exiting returns a boolean indicating if this channel is closed/exiting

func (*Channel) FinishMessage

func (c *Channel) FinishMessage(clientID int64, clientAddr string,
	id MessageID) (BackendOffset, int64, bool, *Message, error)

func (*Channel) FinishMessageForce

func (c *Channel) FinishMessageForce(clientID int64, clientAddr string,
	id MessageID, forceFin bool) (BackendOffset, int64, bool, *Message, error)

func (*Channel) Flush

func (c *Channel) Flush(fsync bool) error

func (*Channel) GetChannelDebugStats

func (c *Channel) GetChannelDebugStats() string

func (*Channel) GetChannelEnd

func (c *Channel) GetChannelEnd() BackendQueueEnd

func (*Channel) GetChannelWaitingConfirmCnt

func (c *Channel) GetChannelWaitingConfirmCnt() int64

func (*Channel) GetClientMsgChan

func (c *Channel) GetClientMsgChan() chan *Message

func (*Channel) GetClientTagMsgChan

func (c *Channel) GetClientTagMsgChan(tag string) (chan *Message, bool)

Gets the active tag channel, or the default message channel, from the tag channel map.

func (*Channel) GetClients

func (c *Channel) GetClients() map[int64]Consumer

func (*Channel) GetClientsCount

func (c *Channel) GetClientsCount() int

func (*Channel) GetConfirmed

func (c *Channel) GetConfirmed() BackendQueueEnd

func (*Channel) GetConfirmedInterval

func (c *Channel) GetConfirmedInterval() []MsgQueueInterval

func (*Channel) GetConfirmedIntervalLen

func (c *Channel) GetConfirmedIntervalLen() int

func (*Channel) GetDelayedQueue

func (c *Channel) GetDelayedQueue() *DelayQueue

func (*Channel) GetDelayedQueueConsumedDetails

func (c *Channel) GetDelayedQueueConsumedDetails() (int64, RecentKeyList, map[int]uint64, map[string]uint64)

func (*Channel) GetDelayedQueueConsumedState

func (c *Channel) GetDelayedQueueConsumedState() (int64, uint64)

func (*Channel) GetInflightNum

func (c *Channel) GetInflightNum() int

func (*Channel) GetMemDelayedMsgs

func (c *Channel) GetMemDelayedMsgs() []MessageID

func (*Channel) GetName

func (c *Channel) GetName() string

func (*Channel) GetOrCreateClientMsgChannel

func (c *Channel) GetOrCreateClientMsgChannel(tag string) chan *Message

Gets or creates the tag message channel; invoked from protocol_v2.messagePump().

func (*Channel) GetTopicName

func (c *Channel) GetTopicName() string

func (*Channel) GetTopicPart

func (c *Channel) GetTopicPart() int

func (*Channel) IsConfirmed

func (c *Channel) IsConfirmed(msg *Message) bool

func (*Channel) IsConsumeDisabled

func (c *Channel) IsConsumeDisabled() bool

func (*Channel) IsEphemeral

func (c *Channel) IsEphemeral() bool

func (*Channel) IsExt

func (c *Channel) IsExt() bool

func (*Channel) IsOrdered

func (c *Channel) IsOrdered() bool

func (*Channel) IsPaused

func (c *Channel) IsPaused() bool

func (*Channel) IsSkipped

func (c *Channel) IsSkipped() bool

func (*Channel) IsSlowTraced

func (c *Channel) IsSlowTraced() bool

func (*Channel) IsTraced

func (c *Channel) IsTraced() bool

func (*Channel) IsWaitingMoreData

func (c *Channel) IsWaitingMoreData() bool

Waiting for more data means that all messages have been consumed and confirmed. If there are still delayed messages in the channel, the channel is not considered to be waiting for more data.

func (*Channel) IsWaitingMoreDiskData

func (c *Channel) IsWaitingMoreDiskData() bool

Waiting to read more data from disk means that the reader has reached the end of the queue. However, there may still be other messages in memory waiting to be confirmed.

func (*Channel) IsZanTestSkipped

func (c *Channel) IsZanTestSkipped() bool

func (*Channel) Pause

func (c *Channel) Pause() error

func (*Channel) RemoveClient

func (c *Channel) RemoveClient(clientID int64, clientTag string)

RemoveClient removes a client from the Channel's client list

func (*Channel) RemoveTagClientMsgChannel

func (c *Channel) RemoveTagClientMsgChannel(tag string)

func (*Channel) RequeueClientMessages

func (c *Channel) RequeueClientMessages(clientID int64, clientAddr string)

func (*Channel) RequeueMessage

func (c *Channel) RequeueMessage(clientID int64, clientAddr string, id MessageID, timeout time.Duration, byClient bool) error

RequeueMessage requeues a message based on `time.Duration`, ie:

- `timeout` == 0: requeue the message immediately
- `timeout` > 0: asynchronously wait for the specified timeout and then requeue the message

func (*Channel) SetConsumeOffset

func (c *Channel) SetConsumeOffset(offset BackendOffset, cnt int64, force bool) error

func (*Channel) SetDelayedQueue

func (c *Channel) SetDelayedQueue(dq *DelayQueue)

func (*Channel) SetExt

func (c *Channel) SetExt(isExt bool)

func (*Channel) SetOrdered

func (c *Channel) SetOrdered(enable bool)

func (*Channel) SetSlowTrace

func (c *Channel) SetSlowTrace(enable bool)

func (*Channel) SetTrace

func (c *Channel) SetTrace(enable bool)

func (*Channel) ShouldRequeueToEnd

func (c *Channel) ShouldRequeueToEnd(clientID int64, clientAddr string, id MessageID,
	timeout time.Duration, byClient bool) (*Message, bool)

func (*Channel) ShouldWaitDelayed

func (c *Channel) ShouldWaitDelayed(msg *Message) bool

func (*Channel) Skip

func (c *Channel) Skip() error

func (*Channel) SkipZanTest

func (c *Channel) SkipZanTest() error

func (*Channel) StartInFlightTimeout

func (c *Channel) StartInFlightTimeout(msg *Message, client Consumer, clientAddr string, timeout time.Duration) (bool, error)

func (*Channel) TouchMessage

func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout time.Duration) error

TouchMessage resets the timeout for an in-flight message

func (*Channel) TryRefreshChannelEnd

func (c *Channel) TryRefreshChannelEnd()

If some messages are skipped, we should try to refresh the channel end to pick up any possible new messages, since the end is otherwise only updated when a new message comes in for the first time.

func (*Channel) TryWakeupRead

func (c *Channel) TryWakeupRead()

func (*Channel) UnPause

func (c *Channel) UnPause() error

func (*Channel) UnSkip

func (c *Channel) UnSkip() error

func (*Channel) UnskipZanTest

func (c *Channel) UnskipZanTest() error

func (*Channel) UpdateConfirmedInterval

func (c *Channel) UpdateConfirmedInterval(intervals []MsgQueueInterval)

func (*Channel) UpdateQueueEnd

func (c *Channel) UpdateQueueEnd(end BackendQueueEnd, forceReload bool) error

When topic message is put, update the new end of the queue

type ChannelMetaInfo

type ChannelMetaInfo struct {
	Name           string `json:"name"`
	Paused         bool   `json:"paused"`
	Skipped        bool   `json:"skipped"`
	ZanTestSkipped bool   `json:"zanTestSkipped"`
}

func (*ChannelMetaInfo) IsZanTestSkipepd

func (cm *ChannelMetaInfo) IsZanTestSkipepd() bool

type ChannelStats

type ChannelStats struct {
	ChannelName string `json:"channel_name"`
	// message size need to consume
	Depth          int64  `json:"depth"`
	DepthSize      int64  `json:"depth_size"`
	DepthTimestamp string `json:"depth_ts"`
	BackendDepth   int64  `json:"backend_depth"`
	// total size sub past hour on this channel
	HourlySubSize  int64         `json:"hourly_subsize"`
	InFlightCount  int           `json:"in_flight_count"`
	DeferredCount  int           `json:"deferred_count"`
	MessageCount   uint64        `json:"message_count"`
	RequeueCount   uint64        `json:"requeue_count"`
	TimeoutCount   uint64        `json:"timeout_count"`
	Clients        []ClientStats `json:"clients"`
	ClientNum      int64         `json:"client_num"`
	Paused         bool          `json:"paused"`
	Skipped        bool          `json:"skipped"`
	ZanTestSkipped bool          `json:"zan_test_skipped"`

	DelayedQueueCount  uint64 `json:"delayed_queue_count"`
	DelayedQueueRecent string `json:"delayed_queue_recent"`

	E2eProcessingLatency    *quantile.Result `json:"e2e_processing_latency"`
	MsgConsumeLatencyStats  []int64          `json:"msg_consume_latency_stats"`
	MsgDeliveryLatencyStats []int64          `json:"msg_delivery_latency_stats"`
}

func NewChannelStats

func NewChannelStats(c *Channel, clients []ClientStats, clientNum int) ChannelStats

type ChannelStatsInfo

type ChannelStatsInfo struct {
	// 16ms, 32ms, 64ms, 128ms, 256ms, 512ms, 1024ms, 2048ms, 4s, 8s, 16s, above
	MsgConsumeLatencyStats  [12]int64
	MsgDeliveryLatencyStats [12]int64
}

func (*ChannelStatsInfo) GetChannelLatencyStats

func (self *ChannelStatsInfo) GetChannelLatencyStats() []int64

func (*ChannelStatsInfo) GetDeliveryLatencyStats

func (self *ChannelStatsInfo) GetDeliveryLatencyStats() []int64

func (*ChannelStatsInfo) UpdateChannelLatencyStats

func (self *ChannelStatsInfo) UpdateChannelLatencyStats(latencyInMillSec int64)

update message consume latency distribution in millisecond

func (*ChannelStatsInfo) UpdateChannelStats

func (self *ChannelStatsInfo) UpdateChannelStats(latencyInMillSec int64)

func (*ChannelStatsInfo) UpdateDelivery2ACKLatencyStats

func (self *ChannelStatsInfo) UpdateDelivery2ACKLatencyStats(latencyInMillSec int64)

update message consume latency distribution in millisecond

func (*ChannelStatsInfo) UpdateDelivery2ACKStats

func (self *ChannelStatsInfo) UpdateDelivery2ACKStats(latencyInMillSec int64)

type Channels

type Channels []*Channel

func (Channels) Len

func (c Channels) Len() int

func (Channels) Swap

func (c Channels) Swap(i, j int)

type ChannelsByName

type ChannelsByName struct {
	Channels
}

func (ChannelsByName) Less

func (c ChannelsByName) Less(i, j int) bool

type ClientPubStats

type ClientPubStats struct {
	RemoteAddress string `json:"remote_address"`
	UserAgent     string `json:"user_agent"`
	Protocol      string `json:"protocol"`
	PubCount      int64  `json:"pub_count"`
	ErrCount      int64  `json:"err_count"`
	LastPubTs     int64  `json:"last_pub_ts"`
}

func (*ClientPubStats) IncrCounter

func (cps *ClientPubStats) IncrCounter(count int64, hasErr bool)

type ClientStats

type ClientStats struct {
	// TODO: deprecated, remove in 1.0
	Name string `json:"name"`

	ClientID        string `json:"client_id"`
	Hostname        string `json:"hostname"`
	Version         string `json:"version"`
	RemoteAddress   string `json:"remote_address"`
	State           int32  `json:"state"`
	ReadyCount      int64  `json:"ready_count"`
	InFlightCount   int64  `json:"in_flight_count"`
	MessageCount    uint64 `json:"message_count"`
	FinishCount     uint64 `json:"finish_count"`
	RequeueCount    uint64 `json:"requeue_count"`
	TimeoutCount    int64  `json:"timeout_count"`
	DeferredCount   int64  `json:"deferred_count"`
	ConnectTime     int64  `json:"connect_ts"`
	SampleRate      int32  `json:"sample_rate"`
	Deflate         bool   `json:"deflate"`
	Snappy          bool   `json:"snappy"`
	UserAgent       string `json:"user_agent"`
	Authed          bool   `json:"authed,omitempty"`
	AuthIdentity    string `json:"auth_identity,omitempty"`
	AuthIdentityURL string `json:"auth_identity_url,omitempty"`
	DesiredTag      string `json:"desired_tag"`

	TLS                           bool   `json:"tls"`
	CipherSuite                   string `json:"tls_cipher_suite"`
	TLSVersion                    string `json:"tls_version"`
	TLSNegotiatedProtocol         string `json:"tls_negotiated_protocol"`
	TLSNegotiatedProtocolIsMutual bool   `json:"tls_negotiated_protocol_is_mutual"`
}

type ClientV2

type ClientV2 struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
	ReadyCount    int64
	InFlightCount int64
	MessageCount  uint64
	FinishCount   uint64
	RequeueCount  uint64
	TimeoutCount  uint64

	ID int64

	UserAgent string

	// original connection
	net.Conn

	// reading/writing interfaces
	Reader *bufio.Reader
	Writer *bufio.Writer

	State          int32
	ConnectTime    time.Time
	Channel        *Channel
	ReadyStateChan chan int
	// this is only used by notify messagebump to quit
	// and should be closed by the read loop only
	ExitChan chan int

	ClientID string
	Hostname string

	SampleRate int32

	IdentifyEventChan chan identifyEvent
	SubEventChan      chan *Channel

	TLS     int32
	Snappy  int32
	Deflate int32

	LenSlice []byte

	AuthSecret string
	AuthState  *auth.State

	EnableTrace bool

	PubTimeout *time.Timer

	TagMsgChannel chan *Message

	PubStats *ClientPubStats
	// contains filtered or unexported fields
}

func NewClientV2

func NewClientV2(id int64, conn net.Conn, opts *Options, tls *tls.Config) *ClientV2

func (*ClientV2) Auth

func (c *ClientV2) Auth(secret string) error

func (*ClientV2) Empty

func (c *ClientV2) Empty()

func (*ClientV2) Exit

func (c *ClientV2) Exit()

func (*ClientV2) ExtendSupport

func (c *ClientV2) ExtendSupport() bool

func (*ClientV2) FinalClose

func (c *ClientV2) FinalClose()

func (*ClientV2) FinishedMessage

func (c *ClientV2) FinishedMessage()

func (*ClientV2) Flush

func (c *ClientV2) Flush() error

func (*ClientV2) GetDesiredTag

func (c *ClientV2) GetDesiredTag() string

func (*ClientV2) GetHeartbeatInterval

func (c *ClientV2) GetHeartbeatInterval() time.Duration

func (*ClientV2) GetID

func (c *ClientV2) GetID() int64

func (*ClientV2) GetMsgTimeout

func (c *ClientV2) GetMsgTimeout() time.Duration

func (*ClientV2) GetOutputBufferSize

func (c *ClientV2) GetOutputBufferSize() int64

func (*ClientV2) GetOutputBufferTimeout

func (c *ClientV2) GetOutputBufferTimeout() time.Duration

func (*ClientV2) GetTagMsgChannel

func (c *ClientV2) GetTagMsgChannel() chan *Message

func (*ClientV2) HasAuthorizations

func (c *ClientV2) HasAuthorizations() bool

func (*ClientV2) Identify

func (c *ClientV2) Identify(data IdentifyDataV2) error

func (*ClientV2) IncrSubError

func (c *ClientV2) IncrSubError(delta int64)

func (*ClientV2) IsAuthorized

func (c *ClientV2) IsAuthorized(topic, channel string) (bool, error)

func (*ClientV2) IsReadyForMessages

func (c *ClientV2) IsReadyForMessages() bool

func (*ClientV2) LockWrite

func (c *ClientV2) LockWrite()

func (*ClientV2) Pause

func (c *ClientV2) Pause()

func (*ClientV2) QueryAuthd

func (c *ClientV2) QueryAuthd() error

func (*ClientV2) RequeuedMessage

func (c *ClientV2) RequeuedMessage()

func (*ClientV2) SendingMessage

func (c *ClientV2) SendingMessage()

func (*ClientV2) SetDesiredTag

func (c *ClientV2) SetDesiredTag(tagStr string) error

func (*ClientV2) SetExtFilter

func (c *ClientV2) SetExtFilter(filter ExtFilterData)

func (*ClientV2) SetExtendSupport

func (c *ClientV2) SetExtendSupport()

func (*ClientV2) SetHeartbeatInterval

func (c *ClientV2) SetHeartbeatInterval(desiredInterval int) error

func (*ClientV2) SetMsgTimeout

func (c *ClientV2) SetMsgTimeout(msgTimeout int) error

func (*ClientV2) SetOutputBufferSize

func (c *ClientV2) SetOutputBufferSize(desiredSize int) error

func (*ClientV2) SetOutputBufferTimeout

func (c *ClientV2) SetOutputBufferTimeout(desiredTimeout int) error

func (*ClientV2) SetReadyCount

func (c *ClientV2) SetReadyCount(count int64)

func (*ClientV2) SetSampleRate

func (c *ClientV2) SetSampleRate(sampleRate int32) error

func (*ClientV2) SetTagMsgChannel

func (c *ClientV2) SetTagMsgChannel(tagMsgChan chan *Message) error

since only used in messagePump loop, no lock needed

func (*ClientV2) SkipZanTest

func (c *ClientV2) SkipZanTest()

func (*ClientV2) StartClose

func (c *ClientV2) StartClose()

func (*ClientV2) Stats

func (c *ClientV2) Stats() ClientStats

func (*ClientV2) String

func (c *ClientV2) String() string

func (*ClientV2) SwitchToConsumer

func (c *ClientV2) SwitchToConsumer(isEphemeral bool) error

func (*ClientV2) TimedOutMessage

func (c *ClientV2) TimedOutMessage()

func (*ClientV2) UnPause

func (c *ClientV2) UnPause()

func (*ClientV2) UnlockWrite

func (c *ClientV2) UnlockWrite()

func (*ClientV2) UnsetDesiredTag

func (c *ClientV2) UnsetDesiredTag()

func (*ClientV2) UnskipZanTest

func (c *ClientV2) UnskipZanTest()

func (*ClientV2) UpgradeDeflate

func (c *ClientV2) UpgradeDeflate(level int) error

func (*ClientV2) UpgradeSnappy

func (c *ClientV2) UpgradeSnappy() error

func (*ClientV2) UpgradeTLS

func (c *ClientV2) UpgradeTLS() error

type Consumer

type Consumer interface {
	SkipZanTest()
	UnskipZanTest()
	UnPause()
	Pause()
	TimedOutMessage()
	RequeuedMessage()
	FinishedMessage()
	Stats() ClientStats
	Exit()
	Empty()
	String() string
	GetID() int64
}

type DelayQueue

type DelayQueue struct {
	EnableTrace int32
	SyncEvery   int64
	// contains filtered or unexported fields
}

func NewDelayQueue

func NewDelayQueue(topicName string, part int, dataPath string, opt *Options,
	idGen MsgIDGenerator, isExt bool) (*DelayQueue, error)

func NewDelayQueueForRead

func NewDelayQueueForRead(topicName string, part int, dataPath string, opt *Options,
	idGen MsgIDGenerator, isExt bool) (*DelayQueue, error)

func (*DelayQueue) BackupKVStoreTo

func (q *DelayQueue) BackupKVStoreTo(w io.Writer) (int64, error)

func (*DelayQueue) CheckConsistence

func (q *DelayQueue) CheckConsistence() error

func (*DelayQueue) Close

func (q *DelayQueue) Close() error

func (*DelayQueue) ConfirmedMessage

func (q *DelayQueue) ConfirmedMessage(msg *Message) error

func (*DelayQueue) Delete

func (q *DelayQueue) Delete() error

func (*DelayQueue) EmptyDelayedChannel

func (q *DelayQueue) EmptyDelayedChannel(ch string) error

func (*DelayQueue) EmptyDelayedType

func (q *DelayQueue) EmptyDelayedType(dt int) error

func (*DelayQueue) ForceFlush

func (q *DelayQueue) ForceFlush()

func (*DelayQueue) GetChangedTs

func (q *DelayQueue) GetChangedTs() int64

func (*DelayQueue) GetCurrentDelayedCnt

func (q *DelayQueue) GetCurrentDelayedCnt(dt int, channel string) (uint64, error)

func (*DelayQueue) GetDBSize

func (q *DelayQueue) GetDBSize() (int64, error)

func (*DelayQueue) GetDiskQueueSnapshot

func (q *DelayQueue) GetDiskQueueSnapshot() *DiskQueueSnapshot

func (*DelayQueue) GetFullName

func (q *DelayQueue) GetFullName() string

func (*DelayQueue) GetOldestConsumedState

func (q *DelayQueue) GetOldestConsumedState(chList []string, includeOthers bool) (RecentKeyList, map[int]uint64, map[string]uint64)

func (*DelayQueue) GetSyncedOffset

func (q *DelayQueue) GetSyncedOffset() (BackendOffset, error)

func (*DelayQueue) GetTopicName

func (q *DelayQueue) GetTopicName() string

func (*DelayQueue) GetTopicPart

func (q *DelayQueue) GetTopicPart() int

func (*DelayQueue) IsChannelMessageDelayed

func (q *DelayQueue) IsChannelMessageDelayed(msgID MessageID, ch string) bool

func (*DelayQueue) IsDataNeedFix

func (q *DelayQueue) IsDataNeedFix() bool

func (*DelayQueue) IsExt

func (q *DelayQueue) IsExt() bool

func (*DelayQueue) PeekAll

func (q *DelayQueue) PeekAll(results []Message) (int, error)

func (*DelayQueue) PeekRecentChannelTimeout

func (q *DelayQueue) PeekRecentChannelTimeout(now int64, results []Message, ch string) (int, error)

func (*DelayQueue) PeekRecentDelayedPub

func (q *DelayQueue) PeekRecentDelayedPub(now int64, results []Message) (int, error)

func (*DelayQueue) PeekRecentTimeoutWithFilter

func (q *DelayQueue) PeekRecentTimeoutWithFilter(results []Message, peekTs int64, filterType int,
	filterChannel string) (int, error)

func (*DelayQueue) PutDelayMessage

func (q *DelayQueue) PutDelayMessage(m *Message) (MessageID, BackendOffset, int32, BackendQueueEnd, error)

func (*DelayQueue) PutMessageOnReplica

func (q *DelayQueue) PutMessageOnReplica(m *Message, offset BackendOffset, checkSize int64) (BackendQueueEnd, error)

func (*DelayQueue) PutRawDataOnReplica

func (q *DelayQueue) PutRawDataOnReplica(rawData []byte, offset BackendOffset, checkSize int64, msgNum int32) (BackendQueueEnd, error)

func (*DelayQueue) ResetBackendEndNoLock

func (q *DelayQueue) ResetBackendEndNoLock(vend BackendOffset, totalCnt int64) error

func (*DelayQueue) ResetBackendWithQueueStartNoLock

func (q *DelayQueue) ResetBackendWithQueueStartNoLock(queueStartOffset int64, queueStartCnt int64) error

func (*DelayQueue) RestoreKVStoreFrom

func (q *DelayQueue) RestoreKVStoreFrom(body io.Reader) error

func (*DelayQueue) RollbackNoLock

func (q *DelayQueue) RollbackNoLock(vend BackendOffset, diffCnt uint64) error

func (*DelayQueue) SetDataFixState

func (q *DelayQueue) SetDataFixState(needFix bool)

func (*DelayQueue) SetTrace

func (q *DelayQueue) SetTrace(enable bool)

func (*DelayQueue) Stats

func (q *DelayQueue) Stats() string

func (*DelayQueue) TotalDataSize

func (q *DelayQueue) TotalDataSize() int64

func (*DelayQueue) TotalMessageCnt

func (q *DelayQueue) TotalMessageCnt() uint64

func (*DelayQueue) TryCleanOldData

func (q *DelayQueue) TryCleanOldData(retentionSize int64, noRealClean bool, maxCleanOffset BackendOffset) (BackendQueueEnd, error)

func (*DelayQueue) UpdateConsumedState

func (q *DelayQueue) UpdateConsumedState(ts int64, keyList RecentKeyList, cntList map[int]uint64, channelCntList map[string]uint64) error

type DetailStatsInfo

type DetailStatsInfo struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewDetailStatsInfo

func NewDetailStatsInfo(initPubSize int64, historyPath string) *DetailStatsInfo

func (*DetailStatsInfo) BatchUpdateTopicLatencyStats

func (self *DetailStatsInfo) BatchUpdateTopicLatencyStats(latency int64, num int64)

func (*DetailStatsInfo) GetHourlyStats

func (self *DetailStatsInfo) GetHourlyStats() [24]int64

func (*DetailStatsInfo) GetMsgSizeStats

func (self *DetailStatsInfo) GetMsgSizeStats() []int64

func (*DetailStatsInfo) GetMsgWriteLatencyStats

func (self *DetailStatsInfo) GetMsgWriteLatencyStats() []int64

func (*DetailStatsInfo) GetPubClientStats

func (self *DetailStatsInfo) GetPubClientStats() []ClientPubStats

func (*DetailStatsInfo) InitPubClientStats

func (self *DetailStatsInfo) InitPubClientStats(remote string, agent string, protocol string) *ClientPubStats

func (*DetailStatsInfo) LoadHistory

func (self *DetailStatsInfo) LoadHistory(fileName string) error

func (*DetailStatsInfo) RemovePubStats

func (self *DetailStatsInfo) RemovePubStats(remote string, protocol string)

func (*DetailStatsInfo) ResetHistoryInitPub

func (self *DetailStatsInfo) ResetHistoryInitPub(msgSize int64)

func (*DetailStatsInfo) SaveHistory

func (self *DetailStatsInfo) SaveHistory(fileName string) error

func (*DetailStatsInfo) UpdateHistory

func (self *DetailStatsInfo) UpdateHistory(historyList [24]int64)

func (*DetailStatsInfo) UpdateTopicMsgStats

func (self *DetailStatsInfo) UpdateTopicMsgStats(msgSize int64, latency int64)

type DiskQueueSnapshot

type DiskQueueSnapshot struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

note: the message count info is not kept in snapshot

func NewDiskQueueSnapshot

func NewDiskQueueSnapshot(readFrom string, dataPath string, endInfo BackendQueueEnd) *DiskQueueSnapshot

NewDiskQueueSnapshot instantiates a new instance of DiskQueueSnapshot, retrieving metadata from the filesystem and starting the read ahead goroutine

func (*DiskQueueSnapshot) Close

func (d *DiskQueueSnapshot) Close() error

Close cleans up the queue and persists metadata

func (*DiskQueueSnapshot) GetCurrentReadQueueOffset

func (d *DiskQueueSnapshot) GetCurrentReadQueueOffset() BackendQueueOffset

func (*DiskQueueSnapshot) GetQueueReadStart

func (d *DiskQueueSnapshot) GetQueueReadStart() BackendQueueEnd

func (*DiskQueueSnapshot) ReadOne

func (d *DiskQueueSnapshot) ReadOne() ReadResult

ReadOne performs a low-level filesystem read for a single []byte while advancing read positions and rolling files, if necessary.

func (*DiskQueueSnapshot) ReadRaw

func (d *DiskQueueSnapshot) ReadRaw(size int32) ([]byte, error)

func (*DiskQueueSnapshot) ResetSeekTo

func (d *DiskQueueSnapshot) ResetSeekTo(voffset BackendOffset, cnt int64) error

ResetSeekTo allows seeking backward.

func (*DiskQueueSnapshot) SeekTo

func (d *DiskQueueSnapshot) SeekTo(voffset BackendOffset, cnt int64) error

func (*DiskQueueSnapshot) SeekToEnd

func (d *DiskQueueSnapshot) SeekToEnd() error

func (*DiskQueueSnapshot) SetQueueStart

func (d *DiskQueueSnapshot) SetQueueStart(start BackendQueueEnd)

func (*DiskQueueSnapshot) SkipToNext

func (d *DiskQueueSnapshot) SkipToNext() error

func (*DiskQueueSnapshot) UpdateQueueEnd

func (d *DiskQueueSnapshot) UpdateQueueEnd(e BackendQueueEnd)

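Put writes a []byte to the queue. (NOTE(review): this sentence appears to be carried over from a Put method; UpdateQueueEnd takes a BackendQueueEnd, not a []byte — confirm and update the description.)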

type ExtFilterData

type ExtFilterData struct {
	Type           int               `json:"type,omitempty"`
	Inverse        bool              `json:"inverse,omitempty"`
	FilterExtKey   string            `json:"filter_ext_key,omitempty"`
	FilterData     string            `json:"filter_data,omitempty"`
	FilterDataList []MultiFilterData `json:"filter_data_list,omitempty"`
}

type FullMessageID

type FullMessageID [MsgIDLength]byte

FullMessageID is the new full message ID, formed as ID+TraceID; its length is the same as the old ID slice. The TraceID is only used for business-level tracing, while the ID is used internally. To stay compatible with the old format, the attempts field is kept.

type IExtFilter

type IExtFilter interface {
	Match(msg *Message) bool
}

func NewExtFilter

func NewExtFilter(filter ExtFilterData) (IExtFilter, error)

type IMetaStorage

type IMetaStorage interface {
	PersistReader(key string, fsync bool, confirmed diskQueueEndInfo, queueEndInfo diskQueueEndInfo) error
	RetrieveReader(key string) (diskQueueEndInfo, diskQueueEndInfo, error)
	Remove(key string)
	PersistWriter(key string, fsync bool, wend diskQueueEndInfo) error
	RetrieveWriter(key string) (diskQueueEndInfo, error)
	Sync()
	Close()
}

type IMsgTracer

type IMsgTracer interface {
	Start()
	TracePub(topic string, part int, pubMethod string, traceID uint64, msg *Message, diskOffset BackendOffset, currentCnt int64)
	TracePubClient(topic string, part int, traceID uint64, msgID MessageID, diskOffset BackendOffset, clientID string)
	// state will be READ_QUEUE, Start, Req, Fin, Timeout
	TraceSub(topic string, channel string, state string, traceID uint64, msg *Message, clientID string, cost int64)
	IsRemote() bool
}

func GetMsgTracer

func GetMsgTracer() IMsgTracer

func NewRemoteMsgTracer

func NewRemoteMsgTracer(remote string) IMsgTracer

type INsqdNotify

type INsqdNotify interface {
	NotifyDeleteTopic(*Topic)
	NotifyStateChanged(v interface{}, needPersist bool)
	ReqToEnd(*Channel, *Message, time.Duration) error
	NotifyScanDelayed(*Channel)
	PushTopicJob(*Topic, func())
}

type IdentifyDataV2

type IdentifyDataV2 struct {
	ShortID string `json:"short_id"` // TODO: deprecated, remove in 1.0
	LongID  string `json:"long_id"`  // TODO: deprecated, remove in 1.0

	ClientID            string        `json:"client_id"`
	Hostname            string        `json:"hostname"`
	HeartbeatInterval   int           `json:"heartbeat_interval"`
	OutputBufferSize    int           `json:"output_buffer_size"`
	OutputBufferTimeout int           `json:"output_buffer_timeout"`
	FeatureNegotiation  bool          `json:"feature_negotiation"`
	TLSv1               bool          `json:"tls_v1"`
	Deflate             bool          `json:"deflate"`
	DeflateLevel        int           `json:"deflate_level"`
	Snappy              bool          `json:"snappy"`
	SampleRate          int32         `json:"sample_rate"`
	UserAgent           string        `json:"user_agent"`
	MsgTimeout          int           `json:"msg_timeout"`
	DesiredTag          string        `json:"desired_tag,omitempty"`
	ExtendSupport       bool          `json:"extend_support"`
	ExtFilter           ExtFilterData `json:"ext_filter"`
}

type IntervalHash

type IntervalHash struct {
	// contains filtered or unexported fields
}

func NewIntervalHash

func NewIntervalHash() *IntervalHash

func (*IntervalHash) AddOrMerge

func (self *IntervalHash) AddOrMerge(inter QueueInterval) QueueInterval

AddOrMerge returns the merged interval; if there is no overlap, it returns the original interval.

func (*IntervalHash) DeleteInterval

func (self *IntervalHash) DeleteInterval(inter QueueInterval)

func (*IntervalHash) DeleteLower

func (self *IntervalHash) DeleteLower(low int64) int

func (*IntervalHash) DeleteRange

func (self *IntervalHash) DeleteRange(inter QueueInterval)

func (*IntervalHash) IsCompleteOverlap

func (self *IntervalHash) IsCompleteOverlap(inter QueueInterval) bool

func (*IntervalHash) IsLowestAt

func (self *IntervalHash) IsLowestAt(low int64) QueueInterval

func (*IntervalHash) Len

func (self *IntervalHash) Len() int

func (*IntervalHash) Query

func (self *IntervalHash) Query(inter QueueInterval, excludeBoard bool) []QueueInterval

func (*IntervalHash) QueryExist

func (self *IntervalHash) QueryExist(inter QueueInterval, excludeBoard bool) []QueueInterval

func (*IntervalHash) ToIntervalList

func (self *IntervalHash) ToIntervalList() []MsgQueueInterval

func (*IntervalHash) ToString

func (self *IntervalHash) ToString() string

type IntervalSkipList

type IntervalSkipList struct {
	// contains filtered or unexported fields
}

func NewIntervalSkipList

func NewIntervalSkipList() *IntervalSkipList

func (*IntervalSkipList) AddOrMerge

func (self *IntervalSkipList) AddOrMerge(inter QueueInterval) QueueInterval

AddOrMerge returns the merged interval; if there is no overlap, it returns the original interval.

func (*IntervalSkipList) DeleteInterval

func (self *IntervalSkipList) DeleteInterval(inter QueueInterval)

func (*IntervalSkipList) DeleteLower

func (self *IntervalSkipList) DeleteLower(low int64) int

func (*IntervalSkipList) DeleteRange

func (self *IntervalSkipList) DeleteRange(inter QueueInterval)

func (*IntervalSkipList) IsCompleteOverlap

func (self *IntervalSkipList) IsCompleteOverlap(inter QueueInterval) bool

func (*IntervalSkipList) IsLowestAt

func (self *IntervalSkipList) IsLowestAt(low int64) QueueInterval

func (*IntervalSkipList) Len

func (self *IntervalSkipList) Len() int

func (*IntervalSkipList) Query

func (self *IntervalSkipList) Query(inter QueueInterval, excludeBoard bool) []QueueInterval

func (*IntervalSkipList) ToIntervalList

func (self *IntervalSkipList) ToIntervalList() []MsgQueueInterval

func (*IntervalSkipList) ToString

func (self *IntervalSkipList) ToString() string

type IntervalTree

type IntervalTree struct {
	// contains filtered or unexported fields
}

func NewIntervalTree

func NewIntervalTree() *IntervalTree

func (*IntervalTree) AddOrMerge

func (self *IntervalTree) AddOrMerge(inter QueueInterval) QueueInterval

AddOrMerge returns the merged interval; if there is no overlap, it returns the original interval.

func (*IntervalTree) DeleteInterval

func (self *IntervalTree) DeleteInterval(inter QueueInterval)

func (*IntervalTree) DeleteLower

func (self *IntervalTree) DeleteLower(low int64) int

func (*IntervalTree) DeleteRange

func (self *IntervalTree) DeleteRange(inter QueueInterval)

func (*IntervalTree) IsCompleteOverlap

func (self *IntervalTree) IsCompleteOverlap(inter QueueInterval) bool

func (*IntervalTree) IsLowestAt

func (self *IntervalTree) IsLowestAt(low int64) QueueInterval

func (*IntervalTree) Len

func (self *IntervalTree) Len() int

func (*IntervalTree) Query

func (self *IntervalTree) Query(inter QueueInterval, excludeBoard bool) []QueueInterval

func (*IntervalTree) ToIntervalList

func (self *IntervalTree) ToIntervalList() []MsgQueueInterval

func (*IntervalTree) ToString

func (self *IntervalTree) ToString() string

type LogMsgTracer

type LogMsgTracer struct {
	MID string
}

LogMsgTracer just prints the trace log.

func (*LogMsgTracer) IsRemote

func (self *LogMsgTracer) IsRemote() bool

func (*LogMsgTracer) Start

func (self *LogMsgTracer) Start()

func (*LogMsgTracer) TracePub

func (self *LogMsgTracer) TracePub(topic string, part int, pubMethod string, traceID uint64, msg *Message, diskOffset BackendOffset, currentCnt int64)

func (*LogMsgTracer) TracePubClient

func (self *LogMsgTracer) TracePubClient(topic string, part int, traceID uint64, msgID MessageID, diskOffset BackendOffset, clientID string)

func (*LogMsgTracer) TraceSub

func (self *LogMsgTracer) TraceSub(topic string, channel string, state string, traceID uint64, msg *Message, clientID string, cost int64)

type Message added in v0.2.29

type Message struct {
	ID        MessageID
	TraceID   uint64
	Body      []byte
	Timestamp int64
	Attempts  uint16
	ExtBytes  []byte
	ExtVer    ext.ExtVer

	//for backend queue
	Offset      BackendOffset
	RawMoveSize BackendOffset

	// for delayed queue message
	// 1 - delayed message by channel
	// 2 - delayed pub
	//
	DelayedType    int32
	DelayedTs      int64
	DelayedOrigID  MessageID
	DelayedChannel string
	// will be used for delayed pub. (json data to tell different type of delay)
	DelayedData []byte
	// contains filtered or unexported fields
}

func DecodeDelayedMessage

func DecodeDelayedMessage(b []byte, isExt bool) (*Message, error)

func DecodeMessage

func DecodeMessage(b []byte, ext bool) (*Message, error)

func NewMessage added in v0.2.29

func NewMessage(id MessageID, body []byte) *Message

func NewMessageWithExt

func NewMessageWithExt(id MessageID, body []byte, extVer ext.ExtVer, extBytes []byte) *Message

func NewMessageWithTs

func NewMessageWithTs(id MessageID, body []byte, ts int64) *Message

func (*Message) GetClientID

func (m *Message) GetClientID() int64

func (*Message) GetCopy

func (m *Message) GetCopy() *Message

func (*Message) GetFullMsgID

func (m *Message) GetFullMsgID() FullMessageID

func (*Message) IsDeferred

func (m *Message) IsDeferred() bool

func (*Message) WriteDelayedTo

func (m *Message) WriteDelayedTo(w io.Writer, writeExt bool) (int64, error)

func (*Message) WriteTo added in v0.2.29

func (m *Message) WriteTo(w io.Writer, writeExt bool) (int64, error)

func (*Message) WriteToClient

func (m *Message) WriteToClient(w io.Writer, writeExt bool, writeDetail bool) (int64, error)

type MessageID added in v0.2.29

type MessageID uint64

func GetMessageIDFromFullMsgID

func GetMessageIDFromFullMsgID(id FullMessageID) MessageID

type MsgChanData

type MsgChanData struct {
	MsgChan   chan *Message
	ClientCnt int64
}

type MsgIDGenerator

type MsgIDGenerator interface {
	NextID() uint64
	Reset(uint64)
}

type MsgQueueInterval

type MsgQueueInterval struct {
	Start  int64
	End    int64
	EndCnt uint64
}

type MultiFilterData

type MultiFilterData struct {
	FilterExtKey string `json:"filter_ext_key,omitempty"`
	FilterData   string `json:"filter_data,omitempty"`
}

type NSQD

type NSQD struct {
	sync.RWMutex

	MetaNotifyChan       chan interface{}
	OptsNotificationChan chan struct{}
	// contains filtered or unexported fields
}

func New added in v0.3.6

func New(opts *Options) *NSQD

func (*NSQD) CheckMagicCode

func (n *NSQD) CheckMagicCode(name string, partition int, code int64, tryFix bool) (string, error)

func (*NSQD) CleanClientPubStats

func (n *NSQD) CleanClientPubStats(remote string, protocol string)

func (*NSQD) CloseExistingTopic

func (n *NSQD) CloseExistingTopic(topicName string, partition int) error

CloseExistingTopic just closes the topic and removes it from the map, but keeps the data for later.

func (*NSQD) DeleteExistingTopic

func (n *NSQD) DeleteExistingTopic(topicName string, part int) error

DeleteExistingTopic removes a topic only if it exists

func (*NSQD) Exit

func (n *NSQD) Exit()

func (*NSQD) ForceDeleteTopicData

func (n *NSQD) ForceDeleteTopicData(name string, partition int) error

func (*NSQD) GetError added in v0.2.31

func (n *NSQD) GetError() error

func (*NSQD) GetExistingTopic

func (n *NSQD) GetExistingTopic(topicName string, part int) (*Topic, error)

GetExistingTopic gets a topic only if it exists

func (*NSQD) GetHealth added in v0.2.31

func (n *NSQD) GetHealth() string

func (*NSQD) GetOpts

func (n *NSQD) GetOpts() *Options

func (*NSQD) GetStartTime added in v0.3.3

func (n *NSQD) GetStartTime() time.Time

func (*NSQD) GetStats

func (n *NSQD) GetStats(leaderOnly bool, filterClients bool) []TopicStats

func (*NSQD) GetTopic

func (n *NSQD) GetTopic(topicName string, part int, ordered bool) *Topic

GetTopic performs a thread safe operation to return a pointer to a Topic object (potentially new)

func (*NSQD) GetTopicDefaultPart

func (n *NSQD) GetTopicDefaultPart(topicName string) int

func (*NSQD) GetTopicIgnPart

func (n *NSQD) GetTopicIgnPart(topicName string) *Topic

func (*NSQD) GetTopicMapCopy

func (n *NSQD) GetTopicMapCopy() []*Topic

func (*NSQD) GetTopicMapRef

func (n *NSQD) GetTopicMapRef() map[string]map[int]*Topic

Access to the returned map should be protected by the read lock.

func (*NSQD) GetTopicPartitions

func (n *NSQD) GetTopicPartitions(topicName string) map[int]*Topic

func (*NSQD) GetTopicStats

func (n *NSQD) GetTopicStats(leaderOnly bool, topic string) []TopicStats

func (*NSQD) GetTopicStatsWithFilter

func (n *NSQD) GetTopicStatsWithFilter(leaderOnly bool, topic string, filterClients bool) []TopicStats

func (*NSQD) GetTopicWithDisabled

func (n *NSQD) GetTopicWithDisabled(topicName string, part int, ext bool, ordered bool) *Topic

func (*NSQD) GetTopicWithExt

func (n *NSQD) GetTopicWithExt(topicName string, part int, ordered bool) *Topic

func (*NSQD) IsAuthEnabled added in v0.2.29

func (n *NSQD) IsAuthEnabled() bool

func (*NSQD) IsHealthy added in v0.2.31

func (n *NSQD) IsHealthy() bool

func (*NSQD) LoadMetadata

func (n *NSQD) LoadMetadata(disabled int32)

func (*NSQD) NotifyDeleteTopic

func (n *NSQD) NotifyDeleteTopic(t *Topic)

func (*NSQD) NotifyPersistMetadata

func (n *NSQD) NotifyPersistMetadata()

func (*NSQD) NotifyScanDelayed

func (n *NSQD) NotifyScanDelayed(ch *Channel)

func (*NSQD) NotifyStateChanged

func (n *NSQD) NotifyStateChanged(v interface{}, needPersist bool)

func (*NSQD) PushTopicJob

func (n *NSQD) PushTopicJob(t *Topic, job func())

func (*NSQD) ReqToEnd

func (n *NSQD) ReqToEnd(ch *Channel, msg *Message, t time.Duration) error

func (*NSQD) SetHealth added in v0.2.31

func (n *NSQD) SetHealth(err error)

func (*NSQD) SetPubLoop

func (n *NSQD) SetPubLoop(loop func(t *Topic))

func (*NSQD) SetReqToEndCB

func (n *NSQD) SetReqToEndCB(reqToEndCB ReqToEndFunc)

func (*NSQD) SetTopicMagicCode

func (n *NSQD) SetTopicMagicCode(t *Topic, code int64) error

func (*NSQD) Start

func (n *NSQD) Start()

func (*NSQD) SwapOpts

func (n *NSQD) SwapOpts(opts *Options)

func (*NSQD) TriggerOptsNotification

func (n *NSQD) TriggerOptsNotification()

func (*NSQD) UpdateTopicHistoryStats

func (n *NSQD) UpdateTopicHistoryStats()

type Options added in v0.3.6

type Options struct {
	// basic options
	ID                         int64         `flag:"worker-id" cfg:"id"`
	Verbose                    bool          `flag:"verbose"`
	ClusterID                  string        `flag:"cluster-id"`
	ClusterLeadershipAddresses string        `flag:"cluster-leadership-addresses" cfg:"cluster_leadership_addresses"`
	ClusterLeadershipUsername  string        `flag:"cluster-leadership-username" cfg:"cluster_leadership_username"`
	ClusterLeadershipPassword  string        `flag:"cluster-leadership-password" cfg:"cluster_leadership_password"`
	ClusterLeadershipRootDir   string        `flag:"cluster-leadership-root-dir" cfg:"cluster_leadership_root_dir"`
	TCPAddress                 string        `flag:"tcp-address"`
	RPCPort                    string        `flag:"rpc-port"`
	ReverseProxyPort           string        `flag:"reverse-proxy-port"`
	HTTPAddress                string        `flag:"http-address"`
	HTTPSAddress               string        `flag:"https-address"`
	BroadcastAddress           string        `flag:"broadcast-address"`
	BroadcastInterface         string        `flag:"broadcast-interface"`
	NSQLookupdTCPAddresses     []string      `flag:"lookupd-tcp-address" cfg:"nsqlookupd_tcp_addresses"`
	AuthHTTPAddresses          []string      `flag:"auth-http-address" cfg:"auth_http_addresses"`
	LookupPingInterval         time.Duration `flag:"lookup-ping-interval" arg:"5s"`

	// diskqueue options
	DataPath        string        `flag:"data-path"`
	MemQueueSize    int64         `flag:"mem-queue-size"`
	MaxBytesPerFile int64         `flag:"max-bytes-per-file"`
	SyncEvery       int64         `flag:"sync-every"`
	SyncTimeout     time.Duration `flag:"sync-timeout"`

	QueueScanInterval          time.Duration `flag:"queue-scan-interval"`
	QueueScanRefreshInterval   time.Duration `flag:"queue-scan-refresh-interval"`
	QueueScanSelectionCount    int           `flag:"queue-scan-selection-count"`
	QueueScanWorkerPoolMax     int           `flag:"queue-scan-worker-pool-max"`
	QueueTopicJobWorkerPoolMax int           `flag:"queue-topic-job-worker-pool-max"`
	QueueScanDirtyPercent      float64       `flag:"queue-scan-dirty-percent"`

	// msg and command options
	MsgTimeout            time.Duration `flag:"msg-timeout" arg:"60s"`
	MaxMsgTimeout         time.Duration `flag:"max-msg-timeout"`
	MaxMsgSize            int64         `flag:"max-msg-size" deprecated:"max-message-size" cfg:"max_msg_size"`
	MaxBodySize           int64         `flag:"max-body-size"`
	MaxReqTimeout         time.Duration `flag:"max-req-timeout"`
	MaxConfirmWin         int64         `flag:"max-confirm-win"`
	MaxChannelDelayedQNum int64         `flag:"max-channel-delayed-qnum"`
	ClientTimeout         time.Duration
	ReqToEndThreshold     time.Duration `flag:"req-to-end-threshold"`

	// client overridable configuration options
	MaxHeartbeatInterval   time.Duration `flag:"max-heartbeat-interval"`
	MaxRdyCount            int64         `flag:"max-rdy-count"`
	MaxOutputBufferSize    int64         `flag:"max-output-buffer-size"`
	MaxOutputBufferTimeout time.Duration `flag:"max-output-buffer-timeout"`

	// statsd integration
	StatsdAddress  string        `flag:"statsd-address"`
	StatsdPrefix   string        `flag:"statsd-prefix"`
	StatsdProtocol string        `flag:"statsd-protocol"`
	StatsdInterval time.Duration `flag:"statsd-interval" arg:"60s"`
	StatsdMemStats bool          `flag:"statsd-mem-stats"`

	// e2e message latency
	E2EProcessingLatencyWindowTime  time.Duration `flag:"e2e-processing-latency-window-time"`
	E2EProcessingLatencyPercentiles []float64     `flag:"e2e-processing-latency-percentile" cfg:"e2e_processing_latency_percentiles"`

	// TLS config
	TLSCert             string `flag:"tls-cert"`
	TLSKey              string `flag:"tls-key"`
	TLSClientAuthPolicy string `flag:"tls-client-auth-policy"`
	TLSRootCAFile       string `flag:"tls-root-ca-file"`
	TLSRequired         int    `flag:"tls-required"`
	TLSMinVersion       uint16 `flag:"tls-min-version"`

	// compression
	DeflateEnabled  bool `flag:"deflate"`
	MaxDeflateLevel int  `flag:"max-deflate-level"`
	SnappyEnabled   bool `flag:"snappy"`

	LogLevel     int32  `flag:"log-level" cfg:"log_level"`
	LogDir       string `flag:"log-dir" cfg:"log_dir"`
	Logger       levellogger.Logger
	RemoteTracer string `flag:"remote-tracer"`

	RetentionDays         int32 `flag:"retention-days" cfg:"retention_days"`
	RetentionSizePerDay   int64 `flag:"retention-size-per-day" cfg:"retention_size_per_day"`
	StartAsFixMode        bool  `flag:"start-as-fix-mode"`
	AllowExtCompatible    bool  `flag:"allow-ext-compatible" cfg:"allow_ext_compatible"`
	AllowSubExtCompatible bool  `flag:"allow-sub-ext-compatible" cfg:"allow_sub_ext_compatible"`
	AllowZanTestSkip      bool  `flag:"allow-zan-test-skip"`
	DefaultCommitBuf      int32 `flag:"default-commit-buf" cfg:"default_commit_buf"`
	MaxCommitBuf          int32 `flag:"max-commit-buf" cfg:"max_commit_buf"`
	UseFsync              bool  `flag:"use-fsync"`
}

func NewOptions added in v0.3.6

func NewOptions() *Options

func (*Options) DecideBroadcast

func (opts *Options) DecideBroadcast() string

type PubInfo

type PubInfo struct {
	Done       chan struct{}
	MsgBody    []byte
	ExtContent ext.IExtContent
	StartPub   time.Time
	Err        error
}

type PubInfoChan

type PubInfoChan chan *PubInfo

type QueueInterval

type QueueInterval interface {
	Start() int64
	End() int64
	EndCnt() uint64
	augmentedtree.Interval
}

type ReadResult

type ReadResult struct {
	Offset    BackendOffset
	MovedSize BackendOffset
	CurCnt    int64
	Data      []byte
	Err       error
}

type RecentKeyList

type RecentKeyList [][]byte

type RemoteMsgTracer

type RemoteMsgTracer struct {
	// contains filtered or unexported fields
}

RemoteMsgTracer sends the trace info to a remote server every second.

func (*RemoteMsgTracer) IsRemote

func (self *RemoteMsgTracer) IsRemote() bool

func (*RemoteMsgTracer) Start

func (self *RemoteMsgTracer) Start()

func (*RemoteMsgTracer) Stop

func (self *RemoteMsgTracer) Stop()

func (*RemoteMsgTracer) TracePub

func (self *RemoteMsgTracer) TracePub(topic string, part int, pubMethod string, traceID uint64, msg *Message, diskOffset BackendOffset, currentCnt int64)

func (*RemoteMsgTracer) TracePubClient

func (self *RemoteMsgTracer) TracePubClient(topic string, part int, traceID uint64, msgID MessageID, diskOffset BackendOffset, clientID string)

func (*RemoteMsgTracer) TraceSub

func (self *RemoteMsgTracer) TraceSub(topic string, channel string, state string, traceID uint64, msg *Message, clientID string, cost int64)

type ReqToEndFunc

type ReqToEndFunc func(*Channel, *Message, time.Duration) error

type Topic

type Topic struct {
	sync.Mutex

	EnableTrace int32
	// contains filtered or unexported fields
}

func NewTopic

func NewTopic(topicName string, part int, opt *Options,
	writeDisabled int32, metaStorage IMetaStorage,
	notify INsqdNotify, loopFunc func(v *Topic)) *Topic

func NewTopicWithExt

func NewTopicWithExt(topicName string, part int, ext bool, ordered bool, opt *Options,
	writeDisabled int32, metaStorage IMetaStorage,
	notify INsqdNotify, loopFunc func(v *Topic)) *Topic

Topic constructor

func (*Topic) AggregateChannelE2eProcessingLatency

func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile

func (*Topic) BufferPoolGet

func (t *Topic) BufferPoolGet(capacity int) *bytes.Buffer

func (*Topic) BufferPoolPut

func (t *Topic) BufferPoolPut(b *bytes.Buffer)

func (*Topic) Close

func (t *Topic) Close() error

Close persists all outstanding topic data and closes all its channels

func (*Topic) CloseExistingChannel

func (t *Topic) CloseExistingChannel(channelName string, deleteData bool) error

func (*Topic) Delete

func (t *Topic) Delete() error

Delete empties the topic and all its channels and closes

func (*Topic) DeleteExistingChannel

func (t *Topic) DeleteExistingChannel(channelName string) error

DeleteExistingChannel removes a channel from the topic only if it exists

func (*Topic) DisableForSlave

func (t *Topic) DisableForSlave()

func (*Topic) Empty

func (t *Topic) Empty() error

func (*Topic) EnableForMaster

func (t *Topic) EnableForMaster()

func (*Topic) Exiting

func (t *Topic) Exiting() bool

Exiting returns a boolean indicating if this topic is closed/exiting

func (*Topic) ForceFlush

func (t *Topic) ForceFlush()

func (*Topic) ForceFlushForChannels

func (t *Topic) ForceFlushForChannels(wait bool)

func (*Topic) GetChannel

func (t *Topic) GetChannel(channelName string) *Channel

GetChannel performs a thread safe operation to return a pointer to a Channel object (potentially new) for the given Topic

func (*Topic) GetChannelMapCopy

func (t *Topic) GetChannelMapCopy() map[string]*Channel

func (*Topic) GetChannelMeta

func (t *Topic) GetChannelMeta() []ChannelMetaInfo

func (*Topic) GetCommitted

func (t *Topic) GetCommitted() BackendQueueEnd

func (*Topic) GetDelayedQueue

func (t *Topic) GetDelayedQueue() *DelayQueue

func (*Topic) GetDelayedQueueConsumedState

func (t *Topic) GetDelayedQueueConsumedState() (int64, RecentKeyList, map[int]uint64, map[string]uint64)

func (*Topic) GetDetailStats

func (t *Topic) GetDetailStats() *DetailStatsInfo

func (*Topic) GetDiskQueueSnapshot

func (t *Topic) GetDiskQueueSnapshot() *DiskQueueSnapshot

func (*Topic) GetDynamicInfo

func (t *Topic) GetDynamicInfo() TopicDynamicConf

func (*Topic) GetExistingChannel

func (t *Topic) GetExistingChannel(channelName string) (*Channel, error)

func (*Topic) GetFullName

func (t *Topic) GetFullName() string

func (*Topic) GetMagicCode

func (t *Topic) GetMagicCode() int64

should be protected by the topic lock for all partitions

func (*Topic) GetOrCreateDelayedQueueNoLock

func (t *Topic) GetOrCreateDelayedQueueNoLock(idGen MsgIDGenerator) (*DelayQueue, error)

func (*Topic) GetQueueReadStart

func (t *Topic) GetQueueReadStart() int64

func (*Topic) GetTopicChannelDebugStat

func (t *Topic) GetTopicChannelDebugStat(channelName string) string

func (*Topic) GetTopicName

func (t *Topic) GetTopicName() string

func (*Topic) GetTopicPart

func (t *Topic) GetTopicPart() int

func (*Topic) GetWaitChan

func (t *Topic) GetWaitChan() PubInfoChan

func (*Topic) IncrPubFailed

func (t *Topic) IncrPubFailed()

func (*Topic) IsDataNeedFix

func (t *Topic) IsDataNeedFix() bool

func (*Topic) IsExt

func (t *Topic) IsExt() bool

func (*Topic) IsFsync

func (t *Topic) IsFsync() bool

func (*Topic) IsOrdered

func (t *Topic) IsOrdered() bool

func (*Topic) IsWriteDisabled

func (t *Topic) IsWriteDisabled() bool

func (*Topic) LoadChannelMeta

func (t *Topic) LoadChannelMeta() error

func (*Topic) LoadHistoryStats

func (t *Topic) LoadHistoryStats() error

func (*Topic) MarkAsRemoved

func (t *Topic) MarkAsRemoved() (string, error)

func (*Topic) NotifyReloadChannels

func (t *Topic) NotifyReloadChannels()

func (*Topic) PrintCurrentStats

func (t *Topic) PrintCurrentStats()

func (*Topic) PubFailed

func (t *Topic) PubFailed() int64

func (*Topic) PutMessage

PutMessage writes a Message to the queue

func (*Topic) PutMessageNoLock

func (t *Topic) PutMessageNoLock(m *Message) (MessageID, BackendOffset, int32, BackendQueueEnd, error)

func (*Topic) PutMessageOnReplica

func (t *Topic) PutMessageOnReplica(m *Message, offset BackendOffset, checkSize int64) (BackendQueueEnd, error)

func (*Topic) PutMessages

func (t *Topic) PutMessages(msgs []*Message) (MessageID, BackendOffset, int32, int64, BackendQueueEnd, error)

PutMessages writes multiple Messages to the queue

func (*Topic) PutMessagesNoLock

func (t *Topic) PutMessagesNoLock(msgs []*Message) (MessageID, BackendOffset, int32, int64, BackendQueueEnd, error)

func (*Topic) PutMessagesOnReplica

func (t *Topic) PutMessagesOnReplica(msgs []*Message, offset BackendOffset, checkSize int64) (BackendQueueEnd, error)

func (*Topic) PutRawDataOnReplica

func (t *Topic) PutRawDataOnReplica(rawData []byte, offset BackendOffset, checkSize int64, msgNum int32) (BackendQueueEnd, error)

func (*Topic) QuitChan

func (t *Topic) QuitChan() <-chan struct{}

func (*Topic) RemoveChannelMeta

func (t *Topic) RemoveChannelMeta()

func (*Topic) ResetBackendEndNoLock

func (t *Topic) ResetBackendEndNoLock(vend BackendOffset, totalCnt int64) error

func (*Topic) ResetBackendWithQueueStartNoLock

func (t *Topic) ResetBackendWithQueueStartNoLock(queueStartOffset int64, queueStartCnt int64) error

func (*Topic) RollbackNoLock

func (t *Topic) RollbackNoLock(vend BackendOffset, diffCnt uint64) error

func (*Topic) SaveChannelMeta

func (t *Topic) SaveChannelMeta() error

func (*Topic) SaveHistoryStats

func (t *Topic) SaveHistoryStats() error

func (*Topic) SetDataFixState

func (t *Topic) SetDataFixState(needFix bool)

func (*Topic) SetDynamicInfo

func (t *Topic) SetDynamicInfo(dynamicConf TopicDynamicConf, idGen MsgIDGenerator)

func (*Topic) SetMagicCode

func (t *Topic) SetMagicCode(code int64) error

should be protected by the topic lock for all partitions

func (*Topic) SetTrace

func (t *Topic) SetTrace(enable bool)

func (*Topic) TotalDataSize

func (t *Topic) TotalDataSize() int64

func (*Topic) TotalMessageCnt

func (t *Topic) TotalMessageCnt() uint64

func (*Topic) TryCleanOldData

func (t *Topic) TryCleanOldData(retentionSize int64, noRealClean bool, maxCleanOffset BackendOffset) (BackendQueueEnd, error)

Maybe this should return the cleaned offset to allow the commit log to be cleaned.

func (*Topic) TryFixData

func (t *Topic) TryFixData() error

After a crash, some topic metadata needs to be fixed manually.

func (*Topic) UpdateCommittedOffset

func (t *Topic) UpdateCommittedOffset(offset BackendQueueEnd)

Note: multiple writers should be protected by a lock.

func (*Topic) UpdateDelayedQueueConsumedState

func (t *Topic) UpdateDelayedQueueConsumedState(ts int64, keyList RecentKeyList, cntList map[int]uint64, channelCntList map[string]uint64) error

type TopicDynamicConf

type TopicDynamicConf struct {
	AutoCommit   int32
	RetentionDay int32
	SyncEvery    int64
	OrderedMulti bool
	Ext          bool
}

type TopicHistoryStatsInfo

type TopicHistoryStatsInfo struct {
	HourlyPubSize [24]int64
	// contains filtered or unexported fields
}

func (*TopicHistoryStatsInfo) UpdateHourlySize

func (self *TopicHistoryStatsInfo) UpdateHourlySize(curPubSize int64)

The slave should also update the pub size stat, since the slave needs to sync with the leader (which costs write performance).

type TopicMsgStatsInfo

type TopicMsgStatsInfo struct {
	// <100bytes, <1KB, 2KB, 4KB, 8KB, 16KB, 32KB, 64KB, 128KB, 256KB, 512KB, 1MB, 2MB, 4MB
	MsgSizeStats [16]int64
	// <1024us, 2ms, 4ms, 8ms, 16ms, 32ms, 64ms, 128ms, 256ms, 512ms, 1024ms, 2048ms, 4s, 8s
	MsgWriteLatencyStats [16]int64
}

func (*TopicMsgStatsInfo) BatchUpdateMsgLatencyStats

func (self *TopicMsgStatsInfo) BatchUpdateMsgLatencyStats(latency int64, num int64)

func (*TopicMsgStatsInfo) UpdateMsgLatencyStats

func (self *TopicMsgStatsInfo) UpdateMsgLatencyStats(latency int64)

func (*TopicMsgStatsInfo) UpdateMsgSizeStats

func (self *TopicMsgStatsInfo) UpdateMsgSizeStats(msgSize int64)

func (*TopicMsgStatsInfo) UpdateMsgStats

func (self *TopicMsgStatsInfo) UpdateMsgStats(msgSize int64, latency int64)

type TopicStats

type TopicStats struct {
	TopicName            string           `json:"topic_name"`
	TopicFullName        string           `json:"topic_full_name"`
	TopicPartition       string           `json:"topic_partition"`
	Channels             []ChannelStats   `json:"channels"`
	Depth                int64            `json:"depth"`
	BackendDepth         int64            `json:"backend_depth"`
	BackendStart         int64            `json:"backend_start"`
	MessageCount         uint64           `json:"message_count"`
	IsLeader             bool             `json:"is_leader"`
	HourlyPubSize        int64            `json:"hourly_pubsize"`
	Clients              []ClientPubStats `json:"client_pub_stats"`
	MsgSizeStats         []int64          `json:"msg_size_stats"`
	MsgWriteLatencyStats []int64          `json:"msg_write_latency_stats"`
	IsMultiOrdered       bool             `json:"is_multi_ordered"`
	IsExt                bool             `json:"is_ext"`
	StatsdName           string           `json:"statsd_name"`
	PubFailedCnt         int64            `json:"pub_failed_cnt"`

	E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`
}

func NewTopicStats

func NewTopicStats(t *Topic, channels []ChannelStats, filterClients bool) TopicStats

type Topics

type Topics []*Topic

func (Topics) Len

func (t Topics) Len() int

func (Topics) Swap

func (t Topics) Swap(i, j int)

type TopicsByName

type TopicsByName struct {
	Topics
}

func (TopicsByName) Less

func (t TopicsByName) Less(i, j int) bool

type TraceLogItemInfo

type TraceLogItemInfo struct {
	MsgID     uint64 `json:"msgid"`
	TraceID   uint64 `json:"traceid"`
	Topic     string `json:"topic"`
	Channel   string `json:"channel"`
	Timestamp int64  `json:"timestamp"`
	Action    string `json:"action"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL