v1.1.0 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Aug 19, 2018 License: MIT Imports: 49 Imported by: 0



nsqd is the daemon that receives, queues, and delivers messages to clients.

Read the docs




View Source
const (
View Source
const (
	TLSNotRequired = iota
View Source
const (
	MsgIDLength = 16


View Source
var ErrIDBackwards = errors.New("ID went backward")
View Source
var ErrSequenceExpired = errors.New("sequence expired")
View Source
var ErrTimeBackwards = errors.New("time has gone backwards")


func NewGUIDFactory added in v1.1.0

func NewGUIDFactory(nodeID int64) *guidFactory


type BackendQueue

type BackendQueue interface {
	Put([]byte) error
	ReadChan() chan []byte // this is expected to be an *unbuffered* channel
	Close() error
	Delete() error
	Depth() int64
	Empty() error

BackendQueue represents the behavior for the secondary message storage system

type Channel

type Channel struct {
	// contains filtered or unexported fields

Channel represents the concrete type for a NSQ channel (and also implements the Queue interface)

There can be multiple channels per topic, each with there own unique set of subscribers (clients).

Channels maintain all client and message metadata, orchestrating in-flight messages, timeouts, requeuing, etc.

func NewChannel

func NewChannel(topicName string, channelName string, ctx *context,
	deleteCallback func(*Channel)) *Channel

NewChannel creates a new instance of the Channel type and returns a pointer

func (*Channel) AddClient

func (c *Channel) AddClient(clientID int64, client Consumer)

AddClient adds a client to the Channel's client list

func (*Channel) Close

func (c *Channel) Close() error

Close cleanly closes the Channel

func (*Channel) Delete

func (c *Channel) Delete() error

Delete empties the channel and closes

func (*Channel) Depth

func (c *Channel) Depth() int64

func (*Channel) Empty added in v0.2.17

func (c *Channel) Empty() error

func (*Channel) Exiting

func (c *Channel) Exiting() bool

Exiting returns a boolean indicating if this channel is closed/exiting

func (*Channel) FinishMessage

func (c *Channel) FinishMessage(clientID int64, id MessageID) error

FinishMessage successfully discards an in-flight message

func (*Channel) IsPaused

func (c *Channel) IsPaused() bool

func (*Channel) Pause

func (c *Channel) Pause() error

func (*Channel) PutMessage

func (c *Channel) PutMessage(m *Message) error

PutMessage writes a Message to the queue

func (*Channel) PutMessageDeferred added in v1.1.0

func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration)

func (*Channel) RemoveClient

func (c *Channel) RemoveClient(clientID int64)

RemoveClient removes a client from the Channel's client list

func (*Channel) RequeueMessage

func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error

RequeueMessage requeues a message based on `time.Duration`, ie:

`timeoutMs` == 0 - requeue a message immediately `timeoutMs` > 0 - asynchronously wait for the specified timeout

and requeue a message (aka "deferred requeue")

func (*Channel) StartDeferredTimeout

func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error

func (*Channel) StartInFlightTimeout

func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error

func (*Channel) TouchMessage added in v0.2.17

func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout time.Duration) error

TouchMessage resets the timeout for an in-flight message

func (*Channel) UnPause

func (c *Channel) UnPause() error

type ChannelStats added in v0.2.16

type ChannelStats struct {
	ChannelName   string        `json:"channel_name"`
	Depth         int64         `json:"depth"`
	BackendDepth  int64         `json:"backend_depth"`
	InFlightCount int           `json:"in_flight_count"`
	DeferredCount int           `json:"deferred_count"`
	MessageCount  uint64        `json:"message_count"`
	RequeueCount  uint64        `json:"requeue_count"`
	TimeoutCount  uint64        `json:"timeout_count"`
	Clients       []ClientStats `json:"clients"`
	Paused        bool          `json:"paused"`

	E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`

func NewChannelStats added in v0.2.16

func NewChannelStats(c *Channel, clients []ClientStats) ChannelStats

type Channels

type Channels []*Channel

func (Channels) Len

func (c Channels) Len() int

func (Channels) Swap

func (c Channels) Swap(i, j int)

type ChannelsByName

type ChannelsByName struct {

func (ChannelsByName) Less

func (c ChannelsByName) Less(i, j int) bool

type ClientStats

type ClientStats struct {
	ClientID        string `json:"client_id"`
	Hostname        string `json:"hostname"`
	Version         string `json:"version"`
	RemoteAddress   string `json:"remote_address"`
	State           int32  `json:"state"`
	ReadyCount      int64  `json:"ready_count"`
	InFlightCount   int64  `json:"in_flight_count"`
	MessageCount    uint64 `json:"message_count"`
	FinishCount     uint64 `json:"finish_count"`
	RequeueCount    uint64 `json:"requeue_count"`
	ConnectTime     int64  `json:"connect_ts"`
	SampleRate      int32  `json:"sample_rate"`
	Deflate         bool   `json:"deflate"`
	Snappy          bool   `json:"snappy"`
	UserAgent       string `json:"user_agent"`
	Authed          bool   `json:"authed,omitempty"`
	AuthIdentity    string `json:"auth_identity,omitempty"`
	AuthIdentityURL string `json:"auth_identity_url,omitempty"`

	TLS                           bool   `json:"tls"`
	CipherSuite                   string `json:"tls_cipher_suite"`
	TLSVersion                    string `json:"tls_version"`
	TLSNegotiatedProtocol         string `json:"tls_negotiated_protocol"`
	TLSNegotiatedProtocolIsMutual bool   `json:"tls_negotiated_protocol_is_mutual"`

type Consumer

type Consumer interface {
	Close() error
	Stats() ClientStats

type Logger added in v1.1.0

type Logger lg.Logger

type Message added in v0.2.29

type Message struct {
	ID        MessageID
	Body      []byte
	Timestamp int64
	Attempts  uint16
	// contains filtered or unexported fields

func NewMessage added in v0.2.29

func NewMessage(id MessageID, body []byte) *Message

func (*Message) WriteTo added in v0.2.29

func (m *Message) WriteTo(w io.Writer) (int64, error)

type MessageID added in v0.2.29

type MessageID [MsgIDLength]byte

type NSQD added in v0.2.25

type NSQD struct {
	// contains filtered or unexported fields

func New added in v0.3.6

func New(opts *Options) *NSQD

func (*NSQD) DeleteExistingTopic added in v0.2.25

func (n *NSQD) DeleteExistingTopic(topicName string) error

DeleteExistingTopic removes a topic only if it exists

func (*NSQD) Exit added in v0.2.25

func (n *NSQD) Exit()

func (*NSQD) GetError added in v0.2.31

func (n *NSQD) GetError() error

func (*NSQD) GetExistingTopic added in v0.2.25

func (n *NSQD) GetExistingTopic(topicName string) (*Topic, error)

GetExistingTopic gets a topic only if it exists

func (*NSQD) GetHealth added in v0.2.31

func (n *NSQD) GetHealth() string

func (*NSQD) GetStartTime added in v0.3.3

func (n *NSQD) GetStartTime() time.Time

func (*NSQD) GetStats added in v0.2.28

func (n *NSQD) GetStats(topic string, channel string) []TopicStats

func (*NSQD) GetTopic added in v0.2.25

func (n *NSQD) GetTopic(topicName string) *Topic

GetTopic performs a thread safe operation to return a pointer to a Topic object (potentially new)

func (*NSQD) IsAuthEnabled added in v0.2.29

func (n *NSQD) IsAuthEnabled() bool

func (*NSQD) IsHealthy added in v0.2.31

func (n *NSQD) IsHealthy() bool

func (*NSQD) LoadMetadata added in v0.2.25

func (n *NSQD) LoadMetadata() error

func (*NSQD) Main added in v0.2.25

func (n *NSQD) Main()

func (*NSQD) Notify added in v0.2.25

func (n *NSQD) Notify(v interface{})

func (*NSQD) PersistMetadata added in v0.2.25

func (n *NSQD) PersistMetadata() error

func (*NSQD) RealHTTPAddr added in v0.3.3

func (n *NSQD) RealHTTPAddr() *net.TCPAddr

func (*NSQD) RealHTTPSAddr added in v0.3.6

func (n *NSQD) RealHTTPSAddr() *net.TCPAddr

func (*NSQD) RealTCPAddr added in v0.3.3

func (n *NSQD) RealTCPAddr() *net.TCPAddr

func (*NSQD) SetHealth added in v0.2.31

func (n *NSQD) SetHealth(err error)

type Options added in v0.3.6

type Options struct {
	// basic options
	ID        int64  `flag:"node-id" cfg:"id"`
	LogLevel  string `flag:"log-level"`
	LogPrefix string `flag:"log-prefix"`
	Verbose   bool   `flag:"verbose"` // for backwards compatibility
	Logger    Logger

	TCPAddress               string        `flag:"tcp-address"`
	HTTPAddress              string        `flag:"http-address"`
	HTTPSAddress             string        `flag:"https-address"`
	BroadcastAddress         string        `flag:"broadcast-address"`
	NSQLookupdTCPAddresses   []string      `flag:"lookupd-tcp-address" cfg:"nsqlookupd_tcp_addresses"`
	AuthHTTPAddresses        []string      `flag:"auth-http-address" cfg:"auth_http_addresses"`
	HTTPClientConnectTimeout time.Duration `flag:"http-client-connect-timeout" cfg:"http_client_connect_timeout"`
	HTTPClientRequestTimeout time.Duration `flag:"http-client-request-timeout" cfg:"http_client_request_timeout"`

	// diskqueue options
	DataPath        string        `flag:"data-path"`
	MemQueueSize    int64         `flag:"mem-queue-size"`
	MaxBytesPerFile int64         `flag:"max-bytes-per-file"`
	SyncEvery       int64         `flag:"sync-every"`
	SyncTimeout     time.Duration `flag:"sync-timeout"`

	QueueScanInterval        time.Duration
	QueueScanRefreshInterval time.Duration
	QueueScanSelectionCount  int
	QueueScanWorkerPoolMax   int
	QueueScanDirtyPercent    float64

	// msg and command options
	MsgTimeout    time.Duration `flag:"msg-timeout"`
	MaxMsgTimeout time.Duration `flag:"max-msg-timeout"`
	MaxMsgSize    int64         `flag:"max-msg-size"`
	MaxBodySize   int64         `flag:"max-body-size"`
	MaxReqTimeout time.Duration `flag:"max-req-timeout"`
	ClientTimeout time.Duration

	// client overridable configuration options
	MaxHeartbeatInterval   time.Duration `flag:"max-heartbeat-interval"`
	MaxRdyCount            int64         `flag:"max-rdy-count"`
	MaxOutputBufferSize    int64         `flag:"max-output-buffer-size"`
	MaxOutputBufferTimeout time.Duration `flag:"max-output-buffer-timeout"`

	// statsd integration
	StatsdAddress       string        `flag:"statsd-address"`
	StatsdPrefix        string        `flag:"statsd-prefix"`
	StatsdInterval      time.Duration `flag:"statsd-interval"`
	StatsdMemStats      bool          `flag:"statsd-mem-stats"`
	StatsdUDPPacketSize int           `flag:"statsd-udp-packet-size"`

	// e2e message latency
	E2EProcessingLatencyWindowTime  time.Duration `flag:"e2e-processing-latency-window-time"`
	E2EProcessingLatencyPercentiles []float64     `flag:"e2e-processing-latency-percentile" cfg:"e2e_processing_latency_percentiles"`

	// TLS config
	TLSCert             string `flag:"tls-cert"`
	TLSKey              string `flag:"tls-key"`
	TLSClientAuthPolicy string `flag:"tls-client-auth-policy"`
	TLSRootCAFile       string `flag:"tls-root-ca-file"`
	TLSRequired         int    `flag:"tls-required"`
	TLSMinVersion       uint16 `flag:"tls-min-version"`

	// compression
	DeflateEnabled  bool `flag:"deflate"`
	MaxDeflateLevel int  `flag:"max-deflate-level"`
	SnappyEnabled   bool `flag:"snappy"`
	// contains filtered or unexported fields

func NewOptions added in v0.3.6

func NewOptions() *Options

type Topic

type Topic struct {
	// contains filtered or unexported fields

func NewTopic

func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic

Topic constructor

func (*Topic) AggregateChannelE2eProcessingLatency added in v0.2.24

func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile

func (*Topic) Close

func (t *Topic) Close() error

Close persists all outstanding topic data and closes all its channels

func (*Topic) Delete

func (t *Topic) Delete() error

Delete empties the topic and all its channels and closes

func (*Topic) DeleteExistingChannel

func (t *Topic) DeleteExistingChannel(channelName string) error

DeleteExistingChannel removes a channel from the topic only if it exists

func (*Topic) Depth

func (t *Topic) Depth() int64

func (*Topic) Empty added in v0.2.17

func (t *Topic) Empty() error

func (*Topic) Exiting

func (t *Topic) Exiting() bool

Exiting returns a boolean indicating if this topic is closed/exiting

func (*Topic) GenerateID added in v1.1.0

func (t *Topic) GenerateID() MessageID

func (*Topic) GetChannel

func (t *Topic) GetChannel(channelName string) *Channel

GetChannel performs a thread safe operation to return a pointer to a Channel object (potentially new) for the given Topic

func (*Topic) GetExistingChannel

func (t *Topic) GetExistingChannel(channelName string) (*Channel, error)

func (*Topic) IsPaused added in v0.2.25

func (t *Topic) IsPaused() bool

func (*Topic) Pause added in v0.2.25

func (t *Topic) Pause() error

func (*Topic) PutMessage

func (t *Topic) PutMessage(m *Message) error

PutMessage writes a Message to the queue

func (*Topic) PutMessages added in v0.2.17

func (t *Topic) PutMessages(msgs []*Message) error

PutMessages writes multiple Messages to the queue

func (*Topic) Start added in v1.1.0

func (t *Topic) Start()

func (*Topic) UnPause added in v0.2.25

func (t *Topic) UnPause() error

type TopicStats added in v0.2.16

type TopicStats struct {
	TopicName    string         `json:"topic_name"`
	Channels     []ChannelStats `json:"channels"`
	Depth        int64          `json:"depth"`
	BackendDepth int64          `json:"backend_depth"`
	MessageCount uint64         `json:"message_count"`
	Paused       bool           `json:"paused"`

	E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`

func NewTopicStats added in v0.2.16

func NewTopicStats(t *Topic, channels []ChannelStats) TopicStats

type Topics

type Topics []*Topic

func (Topics) Len

func (t Topics) Len() int

func (Topics) Swap

func (t Topics) Swap(i, j int)

type TopicsByName

type TopicsByName struct {

func (TopicsByName) Less

func (t TopicsByName) Less(i, j int) bool

type Uint64Slice added in v0.2.22

type Uint64Slice []uint64

func (Uint64Slice) Len added in v0.2.22

func (s Uint64Slice) Len() int

func (Uint64Slice) Less added in v0.2.22

func (s Uint64Slice) Less(i, j int) bool

func (Uint64Slice) Swap added in v0.2.22

func (s Uint64Slice) Swap(i, j int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL