Documentation ¶
Index ¶
- Constants
- Variables
- func NewGUIDFactory(nodeID int64) *guidFactory
- type BackendQueue
- type Channel
- func (c *Channel) AddClient(clientID int64, client Consumer) error
- func (c *Channel) Close() error
- func (c *Channel) Delete() error
- func (c *Channel) Depth() int64
- func (c *Channel) Empty() error
- func (c *Channel) Exiting() bool
- func (c *Channel) FinishMessage(clientID int64, id MessageID) error
- func (c *Channel) IsPaused() bool
- func (c *Channel) Pause() error
- func (c *Channel) PutMessage(m *Message) error
- func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration)
- func (c *Channel) RemoveClient(clientID int64)
- func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error
- func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error
- func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error
- func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout time.Duration) error
- func (c *Channel) UnPause() error
- type ChannelMetadata
- type ChannelStats
- type Channels
- type ChannelsByName
- type Client
- type ClientStats
- type ClientV2Stats
- type Consumer
- type Logger
- type Message
- type MessageID
- type Metadata
- type NSQD
- func (n *NSQD) Context() context.Context
- func (n *NSQD) DeleteExistingTopic(topicName string) error
- func (n *NSQD) Exit()
- func (n *NSQD) GetError() error
- func (n *NSQD) GetExistingTopic(topicName string) (*Topic, error)
- func (n *NSQD) GetHealth() string
- func (n *NSQD) GetMetadata(ephemeral bool) *Metadata
- func (n *NSQD) GetStartTime() time.Time
- func (n *NSQD) GetStats(topic string, channel string, includeClients bool) Stats
- func (n *NSQD) GetTopic(topicName string) *Topic
- func (n *NSQD) IsAuthEnabled() bool
- func (n *NSQD) IsHealthy() bool
- func (n *NSQD) LoadMetadata() error
- func (n *NSQD) Main() error
- func (n *NSQD) Notify(v interface{}, persist bool)
- func (n *NSQD) PersistMetadata() error
- func (n *NSQD) RealHTTPAddr() net.Addr
- func (n *NSQD) RealHTTPSAddr() *net.TCPAddr
- func (n *NSQD) RealTCPAddr() net.Addr
- func (n *NSQD) SetHealth(err error)
- type Options
- type PubCount
- type Stats
- type Topic
- func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile
- func (t *Topic) Close() error
- func (t *Topic) Delete() error
- func (t *Topic) DeleteExistingChannel(channelName string) error
- func (t *Topic) Depth() int64
- func (t *Topic) Empty() error
- func (t *Topic) Exiting() bool
- func (t *Topic) GenerateID() MessageID
- func (t *Topic) GetChannel(channelName string) *Channel
- func (t *Topic) GetExistingChannel(channelName string) (*Channel, error)
- func (t *Topic) IsPaused() bool
- func (t *Topic) Pause() error
- func (t *Topic) PutMessage(m *Message) error
- func (t *Topic) PutMessages(msgs []*Message) error
- func (t *Topic) Start()
- func (t *Topic) UnPause() error
- type TopicMetadata
- type TopicStats
- type Topics
- type TopicsByName
- type Uint64Slice
Constants ¶
const ( LOG_DEBUG = lg.DEBUG LOG_INFO = lg.INFO LOG_WARN = lg.WARN LOG_ERROR = lg.ERROR LOG_FATAL = lg.FATAL )
const ( TLSNotRequired = iota TLSRequiredExceptHTTP TLSRequired )
const (
MsgIDLength = 16
)
Variables ¶
var ErrIDBackwards = errors.New("ID went backward")
var ErrSequenceExpired = errors.New("sequence expired")
var ErrTimeBackwards = errors.New("time has gone backwards")
Functions ¶
func NewGUIDFactory ¶ added in v1.1.0
func NewGUIDFactory(nodeID int64) *guidFactory
Types ¶
type BackendQueue ¶
type BackendQueue interface { Put([]byte) error ReadChan() <-chan []byte // this is expected to be an *unbuffered* channel Close() error Delete() error Depth() int64 Empty() error }
BackendQueue represents the behavior for the secondary message storage system
type Channel ¶
Channel represents the concrete type for an NSQ channel (and also implements the Queue interface)
There can be multiple channels per topic, each with their own unique set of subscribers (clients).
Channels maintain all client and message metadata, orchestrating in-flight messages, timeouts, requeuing, etc.
func NewChannel ¶
func NewChannel(topicName string, channelName string, nsqd *NSQD, deleteCallback func(*Channel)) *Channel
NewChannel creates a new instance of the Channel type and returns a pointer
func (*Channel) FinishMessage ¶
FinishMessage successfully discards an in-flight message
func (*Channel) PutMessage ¶
PutMessage writes a Message to the queue
func (*Channel) PutMessageDeferred ¶ added in v1.1.0
func (*Channel) RemoveClient ¶
RemoveClient removes a client from the Channel's client list
func (*Channel) RequeueMessage ¶
RequeueMessage requeues a message based on the `timeout` argument (a `time.Duration`), i.e.:
- `timeout` == 0 - requeue the message immediately
- `timeout` > 0 - asynchronously wait for the specified timeout and then requeue the message (aka "deferred requeue")
func (*Channel) StartDeferredTimeout ¶
func (*Channel) StartInFlightTimeout ¶
func (*Channel) TouchMessage ¶ added in v0.2.17
TouchMessage resets the timeout for an in-flight message
type ChannelMetadata ¶ added in v1.3.0
ChannelMetadata is the collection of persistent information about a channel.
type ChannelStats ¶ added in v0.2.16
type ChannelStats struct { ChannelName string `json:"channel_name"` Depth int64 `json:"depth"` BackendDepth int64 `json:"backend_depth"` InFlightCount int `json:"in_flight_count"` DeferredCount int `json:"deferred_count"` MessageCount uint64 `json:"message_count"` RequeueCount uint64 `json:"requeue_count"` TimeoutCount uint64 `json:"timeout_count"` ClientCount int `json:"client_count"` Clients []ClientStats `json:"clients"` Paused bool `json:"paused"` E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"` }
func NewChannelStats ¶ added in v0.2.16
func NewChannelStats(c *Channel, clients []ClientStats, clientCount int) ChannelStats
type ChannelsByName ¶
type ChannelsByName struct {
Channels
}
func (ChannelsByName) Less ¶
func (c ChannelsByName) Less(i, j int) bool
type Client ¶ added in v1.2.0
type Client interface { Type() int Stats(string) ClientStats }
type ClientStats ¶
type ClientStats interface {
String() string
}
type ClientV2Stats ¶ added in v1.2.1
type ClientV2Stats struct { ClientID string `json:"client_id"` Hostname string `json:"hostname"` Version string `json:"version"` RemoteAddress string `json:"remote_address"` State int32 `json:"state"` ReadyCount int64 `json:"ready_count"` InFlightCount int64 `json:"in_flight_count"` MessageCount uint64 `json:"message_count"` FinishCount uint64 `json:"finish_count"` RequeueCount uint64 `json:"requeue_count"` ConnectTime int64 `json:"connect_ts"` SampleRate int32 `json:"sample_rate"` Deflate bool `json:"deflate"` Snappy bool `json:"snappy"` UserAgent string `json:"user_agent"` Authed bool `json:"authed,omitempty"` AuthIdentity string `json:"auth_identity,omitempty"` AuthIdentityURL string `json:"auth_identity_url,omitempty"` PubCounts []PubCount `json:"pub_counts,omitempty"` TLS bool `json:"tls"` CipherSuite string `json:"tls_cipher_suite"` TLSVersion string `json:"tls_version"` TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"` TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"` }
func (ClientV2Stats) String ¶ added in v1.2.1
func (s ClientV2Stats) String() string
type Consumer ¶
type Consumer interface { UnPause() Pause() Close() error TimedOutMessage() Stats(string) ClientStats Empty() }
type Message ¶ added in v0.2.29
type Message struct { ID MessageID Body []byte Timestamp int64 Attempts uint16 // contains filtered or unexported fields }
func NewMessage ¶ added in v0.2.29
type MessageID ¶ added in v0.2.29
type MessageID [MsgIDLength]byte
type Metadata ¶ added in v1.3.0
type Metadata struct { Topics []TopicMetadata `json:"topics"` Version string `json:"version"` }
Metadata is the collection of persistent information about the current NSQD.
type NSQD ¶ added in v0.2.25
func (*NSQD) Context ¶ added in v1.2.1
Context returns a context that will be canceled when nsqd initiates the shutdown
func (*NSQD) DeleteExistingTopic ¶ added in v0.2.25
DeleteExistingTopic removes a topic only if it exists
func (*NSQD) GetExistingTopic ¶ added in v0.2.25
GetExistingTopic gets a topic only if it exists
func (*NSQD) GetMetadata ¶ added in v1.3.0
GetMetadata retrieves the current topic and channel set of the NSQ daemon. If the ephemeral flag is set, ephemeral topics are also returned even though these are not saved to disk.
func (*NSQD) GetStartTime ¶ added in v0.3.3
func (*NSQD) GetTopic ¶ added in v0.2.25
GetTopic performs a thread-safe operation to return a pointer to a Topic object (potentially a newly created one)
func (*NSQD) IsAuthEnabled ¶ added in v0.2.29
func (*NSQD) LoadMetadata ¶ added in v0.2.25
func (*NSQD) PersistMetadata ¶ added in v0.2.25
func (*NSQD) RealHTTPAddr ¶ added in v0.3.3
func (*NSQD) RealHTTPSAddr ¶ added in v0.3.6
func (*NSQD) RealTCPAddr ¶ added in v0.3.3
type Options ¶ added in v0.3.6
type Options struct { // basic options ID int64 `flag:"node-id" cfg:"id"` LogLevel lg.LogLevel `flag:"log-level"` LogPrefix string `flag:"log-prefix"` Logger Logger TCPAddress string `flag:"tcp-address"` HTTPAddress string `flag:"http-address"` HTTPSAddress string `flag:"https-address"` BroadcastAddress string `flag:"broadcast-address"` BroadcastTCPPort int `flag:"broadcast-tcp-port"` BroadcastHTTPPort int `flag:"broadcast-http-port"` NSQLookupdTCPAddresses []string `flag:"lookupd-tcp-address" cfg:"nsqlookupd_tcp_addresses"` AuthHTTPAddresses []string `flag:"auth-http-address" cfg:"auth_http_addresses"` HTTPClientConnectTimeout time.Duration `flag:"http-client-connect-timeout" cfg:"http_client_connect_timeout"` HTTPClientRequestTimeout time.Duration `flag:"http-client-request-timeout" cfg:"http_client_request_timeout"` // diskqueue options DataPath string `flag:"data-path"` MemQueueSize int64 `flag:"mem-queue-size"` MaxBytesPerFile int64 `flag:"max-bytes-per-file"` SyncEvery int64 `flag:"sync-every"` SyncTimeout time.Duration `flag:"sync-timeout"` QueueScanInterval time.Duration QueueScanRefreshInterval time.Duration QueueScanSelectionCount int `flag:"queue-scan-selection-count"` QueueScanWorkerPoolMax int `flag:"queue-scan-worker-pool-max"` QueueScanDirtyPercent float64 // msg and command options MsgTimeout time.Duration `flag:"msg-timeout"` MaxMsgTimeout time.Duration `flag:"max-msg-timeout"` MaxMsgSize int64 `flag:"max-msg-size"` MaxBodySize int64 `flag:"max-body-size"` MaxReqTimeout time.Duration `flag:"max-req-timeout"` ClientTimeout time.Duration // client overridable configuration options MaxHeartbeatInterval time.Duration `flag:"max-heartbeat-interval"` MaxRdyCount int64 `flag:"max-rdy-count"` MaxOutputBufferSize int64 `flag:"max-output-buffer-size"` MaxOutputBufferTimeout time.Duration `flag:"max-output-buffer-timeout"` MinOutputBufferTimeout time.Duration `flag:"min-output-buffer-timeout"` OutputBufferTimeout time.Duration `flag:"output-buffer-timeout"` 
MaxChannelConsumers int `flag:"max-channel-consumers"` // statsd integration StatsdAddress string `flag:"statsd-address"` StatsdPrefix string `flag:"statsd-prefix"` StatsdInterval time.Duration `flag:"statsd-interval"` StatsdMemStats bool `flag:"statsd-mem-stats"` StatsdUDPPacketSize int `flag:"statsd-udp-packet-size"` StatsdExcludeEphemeral bool `flag:"statsd-exclude-ephemeral"` // e2e message latency E2EProcessingLatencyWindowTime time.Duration `flag:"e2e-processing-latency-window-time"` E2EProcessingLatencyPercentiles []float64 `flag:"e2e-processing-latency-percentile" cfg:"e2e_processing_latency_percentiles"` // TLS config TLSCert string `flag:"tls-cert"` TLSKey string `flag:"tls-key"` TLSClientAuthPolicy string `flag:"tls-client-auth-policy"` TLSRootCAFile string `flag:"tls-root-ca-file"` TLSRequired int `flag:"tls-required"` TLSMinVersion uint16 `flag:"tls-min-version"` // compression DeflateEnabled bool `flag:"deflate"` MaxDeflateLevel int `flag:"max-deflate-level"` SnappyEnabled bool `flag:"snappy"` }
func NewOptions ¶ added in v0.3.6
func NewOptions() *Options
type Stats ¶ added in v1.2.1
type Stats struct { Topics []TopicStats Producers []ClientStats }
type Topic ¶
func (*Topic) AggregateChannelE2eProcessingLatency ¶ added in v0.2.24
func (*Topic) DeleteExistingChannel ¶
DeleteExistingChannel removes a channel from the topic only if it exists
func (*Topic) GenerateID ¶ added in v1.1.0
func (*Topic) GetChannel ¶
GetChannel performs a thread-safe operation to return a pointer to a Channel object (potentially a newly created one) for the given Topic
func (*Topic) GetExistingChannel ¶
func (*Topic) PutMessage ¶
PutMessage writes a Message to the queue
func (*Topic) PutMessages ¶ added in v0.2.17
PutMessages writes multiple Messages to the queue
type TopicMetadata ¶ added in v1.3.0
type TopicMetadata struct { Name string `json:"name"` Paused bool `json:"paused"` Channels []ChannelMetadata `json:"channels"` }
TopicMetadata is the collection of persistent information about a topic.
type TopicStats ¶ added in v0.2.16
type TopicStats struct { TopicName string `json:"topic_name"` Channels []ChannelStats `json:"channels"` Depth int64 `json:"depth"` BackendDepth int64 `json:"backend_depth"` MessageCount uint64 `json:"message_count"` MessageBytes uint64 `json:"message_bytes"` Paused bool `json:"paused"` E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"` }
func NewTopicStats ¶ added in v0.2.16
func NewTopicStats(t *Topic, channels []ChannelStats) TopicStats
type TopicsByName ¶
type TopicsByName struct {
Topics
}
func (TopicsByName) Less ¶
func (t TopicsByName) Less(i, j int) bool
type Uint64Slice ¶ added in v0.2.22
type Uint64Slice []uint64
func (Uint64Slice) Len ¶ added in v0.2.22
func (s Uint64Slice) Len() int
func (Uint64Slice) Less ¶ added in v0.2.22
func (s Uint64Slice) Less(i, j int) bool
func (Uint64Slice) Swap ¶ added in v0.2.22
func (s Uint64Slice) Swap(i, j int)