Documentation ¶
Index ¶
- Constants
- Variables
- func NewNSQDOptions() *nsqdOptions
- type BackendQueue
- type Channel
- func (c *Channel) AddClient(clientID int64, client Consumer)
- func (c *Channel) Close() error
- func (c *Channel) Delete() error
- func (c *Channel) Depth() int64
- func (c *Channel) Empty() error
- func (c *Channel) Exiting() bool
- func (c *Channel) FinishMessage(clientID int64, id MessageID) error
- func (c *Channel) IsPaused() bool
- func (c *Channel) Pause() error
- func (c *Channel) PutMessage(m *Message) error
- func (c *Channel) RemoveClient(clientID int64)
- func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error
- func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error
- func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error
- func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout time.Duration) error
- func (c *Channel) UnPause() error
- type ChannelStats
- type Channels
- type ChannelsByName
- type ClientStats
- type Consumer
- type Message
- type MessageID
- type NSQD
- func (n *NSQD) DeleteExistingTopic(topicName string) error
- func (n *NSQD) Exit()
- func (n *NSQD) GetError() error
- func (n *NSQD) GetExistingTopic(topicName string) (*Topic, error)
- func (n *NSQD) GetHealth() string
- func (n *NSQD) GetStats() []TopicStats
- func (n *NSQD) GetTopic(topicName string) *Topic
- func (n *NSQD) IsAuthEnabled() bool
- func (n *NSQD) IsHealthy() bool
- func (n *NSQD) LoadMetadata()
- func (n *NSQD) Main()
- func (n *NSQD) Notify(v interface{})
- func (n *NSQD) PersistMetadata() error
- func (n *NSQD) SetHealth(err error)
- type Topic
- func (t *Topic) AggregateChannelE2eProcessingLatency() *util.Quantile
- func (t *Topic) Close() error
- func (t *Topic) Delete() error
- func (t *Topic) DeleteExistingChannel(channelName string) error
- func (t *Topic) Depth() int64
- func (t *Topic) Empty() error
- func (t *Topic) Exiting() bool
- func (t *Topic) GetChannel(channelName string) *Channel
- func (t *Topic) GetExistingChannel(channelName string) (*Channel, error)
- func (t *Topic) IsPaused() bool
- func (t *Topic) Pause() error
- func (t *Topic) PutMessage(m *Message) error
- func (t *Topic) PutMessages(msgs []*Message) error
- func (t *Topic) UnPause() error
- type TopicStats
- type Topics
- type TopicsByName
- type Uint64Slice
Constants ¶
const MsgIDLength = 16
Variables ¶
var ErrSequenceExpired = errors.New("sequence expired")
var ErrTimeBackwards = errors.New("time has gone backwards")
Functions ¶
func NewNSQDOptions ¶
func NewNSQDOptions() *nsqdOptions
Types ¶
type BackendQueue ¶
type BackendQueue interface { Put([]byte) error ReadChan() chan []byte // this is expected to be an *unbuffered* channel Close() error Delete() error Depth() int64 Empty() error }
BackendQueue represents the behavior for the secondary message storage system
type Channel ¶
Channel represents the concrete type for an NSQ channel (and also implements the Queue interface)
There can be multiple channels per topic, each with their own unique set of subscribers (clients).
Channels maintain all client and message metadata, orchestrating in-flight messages, timeouts, requeuing, etc.
func NewChannel ¶
func NewChannel(topicName string, channelName string, ctx *context, deleteCallback func(*Channel)) *Channel
NewChannel creates a new instance of the Channel type and returns a pointer
func (*Channel) FinishMessage ¶
FinishMessage successfully discards an in-flight message
func (*Channel) PutMessage ¶
PutMessage writes a Message to the queue
func (*Channel) RemoveClient ¶
RemoveClient removes a client from the Channel's client list
func (*Channel) RequeueMessage ¶
RequeueMessage requeues a message based on the given `time.Duration` timeout, i.e.:
`timeout` == 0 - requeue the message immediately; `timeout` > 0 - asynchronously wait for the specified timeout
and then requeue the message (aka "deferred requeue")
func (*Channel) StartDeferredTimeout ¶
func (*Channel) StartInFlightTimeout ¶
func (*Channel) TouchMessage ¶
TouchMessage resets the timeout for an in-flight message
type ChannelStats ¶
type ChannelStats struct { ChannelName string `json:"channel_name"` Depth int64 `json:"depth"` BackendDepth int64 `json:"backend_depth"` InFlightCount int `json:"in_flight_count"` DeferredCount int `json:"deferred_count"` MessageCount uint64 `json:"message_count"` RequeueCount uint64 `json:"requeue_count"` TimeoutCount uint64 `json:"timeout_count"` Clients []ClientStats `json:"clients"` Paused bool `json:"paused"` E2eProcessingLatency *util.PercentileResult `json:"e2e_processing_latency"` }
func NewChannelStats ¶
func NewChannelStats(c *Channel, clients []ClientStats) ChannelStats
type ChannelsByName ¶
type ChannelsByName struct {
Channels
}
func (ChannelsByName) Less ¶
func (c ChannelsByName) Less(i, j int) bool
type ClientStats ¶
type ClientStats struct { // TODO: deprecated, remove in 1.0 Name string `json:"name"` ClientID string `json:"client_id"` Hostname string `json:"hostname"` Version string `json:"version"` RemoteAddress string `json:"remote_address"` State int32 `json:"state"` ReadyCount int64 `json:"ready_count"` InFlightCount int64 `json:"in_flight_count"` MessageCount uint64 `json:"message_count"` FinishCount uint64 `json:"finish_count"` RequeueCount uint64 `json:"requeue_count"` ConnectTime int64 `json:"connect_ts"` SampleRate int32 `json:"sample_rate"` Deflate bool `json:"deflate"` Snappy bool `json:"snappy"` UserAgent string `json:"user_agent"` Authed bool `json:"authed,omitempty"` AuthIdentity string `json:"auth_identity,omitempty"` AuthIdentityURL string `json:"auth_identity_url,omitempty"` TLS bool `json:"tls"` CipherSuite string `json:"tls_cipher_suite"` TLSVersion string `json:"tls_version"` TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"` TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"` }
type Consumer ¶
type Consumer interface { UnPause() Pause() Close() error TimedOutMessage() Stats() ClientStats Empty() }
type Message ¶ added in v0.2.29
type Message struct { ID MessageID Body []byte Timestamp int64 Attempts uint16 // contains filtered or unexported fields }
func NewMessage ¶ added in v0.2.29
type MessageID ¶ added in v0.2.29
type MessageID [MsgIDLength]byte
type NSQD ¶
func (*NSQD) DeleteExistingTopic ¶
DeleteExistingTopic removes a topic only if it exists
func (*NSQD) GetExistingTopic ¶
GetExistingTopic gets a topic only if it exists
func (*NSQD) GetStats ¶
func (n *NSQD) GetStats() []TopicStats
func (*NSQD) GetTopic ¶
GetTopic performs a thread-safe operation to return a pointer to a Topic object (potentially new)
func (*NSQD) IsAuthEnabled ¶ added in v0.2.29
func (*NSQD) LoadMetadata ¶
func (n *NSQD) LoadMetadata()
func (*NSQD) PersistMetadata ¶
type Topic ¶
func (*Topic) AggregateChannelE2eProcessingLatency ¶
func (*Topic) DeleteExistingChannel ¶
DeleteExistingChannel removes a channel from the topic only if it exists
func (*Topic) GetChannel ¶
GetChannel performs a thread-safe operation to return a pointer to a Channel object (potentially new) for the given Topic
func (*Topic) GetExistingChannel ¶
func (*Topic) PutMessage ¶
PutMessage writes a Message to the queue
func (*Topic) PutMessages ¶
PutMessages writes multiple Messages to the queue
type TopicStats ¶
type TopicStats struct { TopicName string `json:"topic_name"` Channels []ChannelStats `json:"channels"` Depth int64 `json:"depth"` BackendDepth int64 `json:"backend_depth"` MessageCount uint64 `json:"message_count"` Paused bool `json:"paused"` E2eProcessingLatency *util.PercentileResult `json:"e2e_processing_latency"` }
func NewTopicStats ¶
func NewTopicStats(t *Topic, channels []ChannelStats) TopicStats
type TopicsByName ¶
type TopicsByName struct {
Topics
}
func (TopicsByName) Less ¶
func (t TopicsByName) Less(i, j int) bool
type Uint64Slice ¶
type Uint64Slice []uint64
func (Uint64Slice) Len ¶
func (s Uint64Slice) Len() int
func (Uint64Slice) Less ¶
func (s Uint64Slice) Less(i, j int) bool
func (Uint64Slice) Swap ¶
func (s Uint64Slice) Swap(i, j int)