Documentation ¶
Index ¶
- type ChannelStats
- type ChannelStatsByHost
- type ChannelStatsList
- type ClientStats
- type ClientStatsList
- type ClientsByHost
- type ClusterInfo
- func (c *ClusterInfo) CreateTopicChannel(topicName string, channelName string, lookupdHTTPAddrs []string) error
- func (c *ClusterInfo) DeleteChannel(topicName string, channelName string, lookupdHTTPAddrs []string, ...) error
- func (c *ClusterInfo) DeleteTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error
- func (c *ClusterInfo) EmptyChannel(topicName string, channelName string, lookupdHTTPAddrs []string, ...) error
- func (c *ClusterInfo) EmptyTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error
- func (c *ClusterInfo) GetLookupdProducers(lookupdHTTPAddrs []string) (Producers, error)
- func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []string) ([]string, error)
- func (c *ClusterInfo) GetLookupdTopicProducers(topic string, lookupdHTTPAddrs []string) (Producers, error)
- func (c *ClusterInfo) GetLookupdTopics(lookupdHTTPAddrs []string) ([]string, error)
- func (c *ClusterInfo) GetNSQDProducers(nsqdHTTPAddrs []string) (Producers, error)
- func (c *ClusterInfo) GetNSQDStats(producers Producers, selectedTopic string, selectedChannel string, ...) ([]*TopicStats, map[string]*ChannelStats, error)
- func (c *ClusterInfo) GetNSQDTopicProducers(topic string, nsqdHTTPAddrs []string) (Producers, error)
- func (c *ClusterInfo) GetNSQDTopics(nsqdHTTPAddrs []string) ([]string, error)
- func (c *ClusterInfo) GetProducers(lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) (Producers, error)
- func (c *ClusterInfo) GetTopicProducers(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) (Producers, error)
- func (c *ClusterInfo) GetVersion(addr string) (semver.Version, error)
- func (c *ClusterInfo) PauseChannel(topicName string, channelName string, lookupdHTTPAddrs []string, ...) error
- func (c *ClusterInfo) PauseTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error
- func (c *ClusterInfo) TombstoneNodeForTopic(topic string, node string, lookupdHTTPAddrs []string) error
- func (c *ClusterInfo) UnPauseChannel(topicName string, channelName string, lookupdHTTPAddrs []string, ...) error
- func (c *ClusterInfo) UnPauseTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error
- type ErrList
- type PartialErr
- type Producer
- type ProducerTopic
- type ProducerTopics
- type Producers
- type ProducersByHost
- type TopicStats
- type TopicStatsByHost
- type TopicStatsList
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChannelStats ¶
type ChannelStats struct { Node string `json:"node"` Hostname string `json:"hostname"` TopicName string `json:"topic_name"` ChannelName string `json:"channel_name"` Depth int64 `json:"depth"` MemoryDepth int64 `json:"memory_depth"` BackendDepth int64 `json:"backend_depth"` InFlightCount int64 `json:"in_flight_count"` DeferredCount int64 `json:"deferred_count"` RequeueCount int64 `json:"requeue_count"` TimeoutCount int64 `json:"timeout_count"` MessageCount int64 `json:"message_count"` ClientCount int `json:"client_count"` Selected bool `json:"-"` NodeStats []*ChannelStats `json:"nodes"` Clients []*ClientStats `json:"clients"` Paused bool `json:"paused"` E2eProcessingLatency *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"` }
func (*ChannelStats) Add ¶
func (c *ChannelStats) Add(a *ChannelStats)
type ChannelStatsByHost ¶
type ChannelStatsByHost struct {
ChannelStatsList
}
func (ChannelStatsByHost) Less ¶
func (c ChannelStatsByHost) Less(i, j int) bool
type ChannelStatsList ¶
type ChannelStatsList []*ChannelStats
func (ChannelStatsList) Len ¶
func (c ChannelStatsList) Len() int
func (ChannelStatsList) Swap ¶
func (c ChannelStatsList) Swap(i, j int)
type ClientStats ¶
type ClientStats struct { Node string `json:"node"` RemoteAddress string `json:"remote_address"` Version string `json:"version"` ClientID string `json:"client_id"` Hostname string `json:"hostname"` UserAgent string `json:"user_agent"` ConnectTs int64 `json:"connect_ts"` ConnectedDuration time.Duration `json:"connected"` InFlightCount int `json:"in_flight_count"` ReadyCount int `json:"ready_count"` FinishCount int64 `json:"finish_count"` RequeueCount int64 `json:"requeue_count"` MessageCount int64 `json:"message_count"` SampleRate int32 `json:"sample_rate"` Deflate bool `json:"deflate"` Snappy bool `json:"snappy"` Authed bool `json:"authed"` AuthIdentity string `json:"auth_identity"` AuthIdentityURL string `json:"auth_identity_url"` TLS bool `json:"tls"` CipherSuite string `json:"tls_cipher_suite"` TLSVersion string `json:"tls_version"` TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"` TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"` }
func (*ClientStats) HasSampleRate ¶
func (s *ClientStats) HasSampleRate() bool
func (*ClientStats) HasUserAgent ¶
func (s *ClientStats) HasUserAgent() bool
func (*ClientStats) UnmarshalJSON ¶
func (s *ClientStats) UnmarshalJSON(b []byte) error
UnmarshalJSON implements json.Unmarshaler and postprocesses ConnectedDuration
type ClientStatsList ¶
type ClientStatsList []*ClientStats
func (ClientStatsList) Len ¶
func (c ClientStatsList) Len() int
func (ClientStatsList) Swap ¶
func (c ClientStatsList) Swap(i, j int)
type ClientsByHost ¶
type ClientsByHost struct {
ClientStatsList
}
func (ClientsByHost) Less ¶
func (c ClientsByHost) Less(i, j int) bool
type ClusterInfo ¶
type ClusterInfo struct {
// contains filtered or unexported fields
}
func New ¶
func New(log lg.AppLogFunc, client *http_api.Client) *ClusterInfo
func (*ClusterInfo) CreateTopicChannel ¶
func (c *ClusterInfo) CreateTopicChannel(topicName string, channelName string, lookupdHTTPAddrs []string) error
func (*ClusterInfo) DeleteChannel ¶
func (*ClusterInfo) DeleteTopic ¶
func (c *ClusterInfo) DeleteTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error
func (*ClusterInfo) EmptyChannel ¶
func (*ClusterInfo) EmptyTopic ¶
func (c *ClusterInfo) EmptyTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error
func (*ClusterInfo) GetLookupdProducers ¶
func (c *ClusterInfo) GetLookupdProducers(lookupdHTTPAddrs []string) (Producers, error)
GetLookupdProducers returns Producers of all the nsqd connected to the given lookupds
func (*ClusterInfo) GetLookupdTopicChannels ¶
func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []string) ([]string, error)
GetLookupdTopicChannels returns a []string containing a union of all the channels from all the given lookupd for the given topic
func (*ClusterInfo) GetLookupdTopicProducers ¶
func (c *ClusterInfo) GetLookupdTopicProducers(topic string, lookupdHTTPAddrs []string) (Producers, error)
GetLookupdTopicProducers returns Producers of all the nsqd for a given topic by unioning the nodes returned from the given lookupd
func (*ClusterInfo) GetLookupdTopics ¶
func (c *ClusterInfo) GetLookupdTopics(lookupdHTTPAddrs []string) ([]string, error)
GetLookupdTopics returns a []string containing a union of all the topics from all the given nsqlookupd
func (*ClusterInfo) GetNSQDProducers ¶
func (c *ClusterInfo) GetNSQDProducers(nsqdHTTPAddrs []string) (Producers, error)
GetNSQDProducers returns Producers of all the given nsqd
func (*ClusterInfo) GetNSQDStats ¶
func (c *ClusterInfo) GetNSQDStats(producers Producers, selectedTopic string, selectedChannel string, includeClients bool) ([]*TopicStats, map[string]*ChannelStats, error)
GetNSQDStats returns aggregate topic and channel stats from the given Producers
If selectedChannel is empty, this will return stats for topic/channel. If selectedTopic is empty, this will return stats for *all* topic/channels. If includeClients is false, this will *not* return client stats for channels, and the ChannelStats map will be keyed by topic + ':' + channel.
func (*ClusterInfo) GetNSQDTopicProducers ¶
func (c *ClusterInfo) GetNSQDTopicProducers(topic string, nsqdHTTPAddrs []string) (Producers, error)
GetNSQDTopicProducers returns Producers containing the addresses of all the nsqd that produce the given topic
func (*ClusterInfo) GetNSQDTopics ¶
func (c *ClusterInfo) GetNSQDTopics(nsqdHTTPAddrs []string) ([]string, error)
GetNSQDTopics returns a []string containing all the topics produced by the given nsqd
func (*ClusterInfo) GetProducers ¶
func (c *ClusterInfo) GetProducers(lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) (Producers, error)
func (*ClusterInfo) GetTopicProducers ¶
func (*ClusterInfo) GetVersion ¶
func (c *ClusterInfo) GetVersion(addr string) (semver.Version, error)
GetVersion returns a semver.Version object by querying /info
func (*ClusterInfo) PauseChannel ¶
func (*ClusterInfo) PauseTopic ¶
func (c *ClusterInfo) PauseTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error
func (*ClusterInfo) TombstoneNodeForTopic ¶
func (c *ClusterInfo) TombstoneNodeForTopic(topic string, node string, lookupdHTTPAddrs []string) error
TombstoneNodeForTopic tombstones the given node for the given topic on all the given nsqlookupd and deletes the topic from the node
func (*ClusterInfo) UnPauseChannel ¶
func (*ClusterInfo) UnPauseTopic ¶
func (c *ClusterInfo) UnPauseTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error
type PartialErr ¶
type Producer ¶
type Producer struct { RemoteAddresses []string `json:"remote_addresses"` RemoteAddress string `json:"remote_address"` Hostname string `json:"hostname"` BroadcastAddress string `json:"broadcast_address"` TCPPort int `json:"tcp_port"` HTTPPort int `json:"http_port"` Version string `json:"version"` VersionObj semver.Version `json:"-"` Topics ProducerTopics `json:"topics"` OutOfDate bool `json:"out_of_date"` }
func (*Producer) HTTPAddress ¶
func (*Producer) IsInconsistent ¶
IsInconsistent checks for cases where an unexpected number of nsqd connections are reporting the same information to nsqlookupd (i.e., multiple instances are using the same broadcast address), or cases where some nsqd are not reporting to all nsqlookupd.
func (*Producer) TCPAddress ¶
func (*Producer) UnmarshalJSON ¶
UnmarshalJSON implements json.Unmarshaler and postprocesses ProducerTopics and VersionObj
type ProducerTopic ¶
type ProducerTopics ¶
type ProducerTopics []ProducerTopic
func (ProducerTopics) Len ¶
func (pt ProducerTopics) Len() int
func (ProducerTopics) Less ¶
func (pt ProducerTopics) Less(i, j int) bool
func (ProducerTopics) Swap ¶
func (pt ProducerTopics) Swap(i, j int)
type ProducersByHost ¶
type ProducersByHost struct {
Producers
}
func (ProducersByHost) Less ¶
func (c ProducersByHost) Less(i, j int) bool
type TopicStats ¶
type TopicStats struct { Node string `json:"node"` Hostname string `json:"hostname"` TopicName string `json:"topic_name"` Depth int64 `json:"depth"` MemoryDepth int64 `json:"memory_depth"` BackendDepth int64 `json:"backend_depth"` MessageCount int64 `json:"message_count"` NodeStats []*TopicStats `json:"nodes"` Channels []*ChannelStats `json:"channels"` Paused bool `json:"paused"` E2eProcessingLatency *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"` }
func (*TopicStats) Add ¶
func (t *TopicStats) Add(a *TopicStats)
type TopicStatsByHost ¶
type TopicStatsByHost struct {
TopicStatsList
}
func (TopicStatsByHost) Less ¶
func (c TopicStatsByHost) Less(i, j int) bool
type TopicStatsList ¶
type TopicStatsList []*TopicStats
func (TopicStatsList) Len ¶
func (t TopicStatsList) Len() int
func (TopicStatsList) Swap ¶
func (t TopicStatsList) Swap(i, j int)