Documentation ¶
Index ¶
- Variables
- type CatchupStat
- type ChannelStats
- type ChannelStatsByPartAndHost
- type ChannelStatsList
- type ClientPubStats
- type ClientStats
- type ClientStatsList
- type ClientsByHost
- type ClusterInfo
- func (c *ClusterInfo) CreateTopic(topicName string, partitionNum int, replica int, syncDisk int, ...) error
- func (c *ClusterInfo) CreateTopicChannel(topicName string, channelName string, lookupdHTTPAddrs []string) error
- func (c *ClusterInfo) CreateTopicChannelAfterTopicCreation(topicName string, channelName string, lookupdHTTPAddrs []string, ...) error
- func (c *ClusterInfo) DeleteChannel(topicName string, channelName string, lookupdHTTPAddrs []string, ...) error
- func (c *ClusterInfo) DeleteTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error
- func (c *ClusterInfo) EmptyChannel(topicName string, channelName string, lookupdHTTPAddrs []string, ...) error
- func (c *ClusterInfo) EmptyTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error
- func (c *ClusterInfo) GetClusterInfo(lookupdAdresses []string) (*ClusterNodeInfo, error)
- func (c *ClusterInfo) GetLookupdProducers(lookupdHTTPAddrs []string) (Producers, error)
- func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []string) ([]string, error)
- func (c *ClusterInfo) GetLookupdTopicProducers(topic string, lookupdHTTPAddrs []string) (Producers, map[string]Producers, error)
- func (c *ClusterInfo) GetLookupdTopics(lookupdHTTPAddrs []string) ([]string, error)
- func (c *ClusterInfo) GetLookupdTopicsMeta(lookupdHTTPAddrs []string, metaInfo bool) ([]*TopicInfo, error)
- func (c *ClusterInfo) GetNSQDAllMessageHistoryStats(producers Producers) (map[string]int64, error)
- func (c *ClusterInfo) GetNSQDCoordStats(producers Producers, selectedTopic string, part string) (*CoordStats, error)
- func (c *ClusterInfo) GetNSQDMessageByID(p Producer, selectedTopic string, part string, msgID int64) (string, int64, error)
- func (c *ClusterInfo) GetNSQDMessageHistoryStats(nsqdHTTPAddr string, selectedTopic string, par string) ([]int64, error)
- func (c *ClusterInfo) GetNSQDProducers(nsqdHTTPAddrs []string) (Producers, error)
- func (c *ClusterInfo) GetNSQDStats(producers Producers, selectedTopic string, sortBy string, leaderOnly bool) ([]*TopicStats, map[string]*ChannelStats, error)
- func (c *ClusterInfo) GetNSQDTopicProducers(topic string, nsqdHTTPAddrs []string) (Producers, error)
- func (c *ClusterInfo) GetNSQDTopics(nsqdHTTPAddrs []string) ([]*TopicInfo, error)
- func (c *ClusterInfo) GetProducers(lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) (Producers, error)
- func (c *ClusterInfo) GetTopicProducers(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) (Producers, map[string]Producers, error)
- func (c *ClusterInfo) GetVersion(addr string) (semver.Version, error)
- func (c *ClusterInfo) ListAllLookupdNodes(lookupdHTTPAddrs []string) (*LookupdNodes, error)
- func (c *ClusterInfo) PauseChannel(topicName string, channelName string, lookupdHTTPAddrs []string, ...) error
- func (c *ClusterInfo) PauseTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error
- func (c *ClusterInfo) ResetChannel(topicName string, channelName string, lookupdHTTPAddrs []string, ...) error
- func (c *ClusterInfo) SkipChannel(topicName string, channelName string, lookupdHTTPAddrs []string, ...) error
- func (c *ClusterInfo) TombstoneNodeForTopic(topic string, node string, lookupdHTTPAddrs []string) error
- func (c *ClusterInfo) UnPauseChannel(topicName string, channelName string, lookupdHTTPAddrs []string, ...) error
- func (c *ClusterInfo) UnPauseTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error
- func (c *ClusterInfo) UnSkipChannel(topicName string, channelName string, lookupdHTTPAddrs []string, ...) error
- type ClusterNodeInfo
- type CoordStats
- type ErrList
- type ISRStat
- type LookupPeer
- type LookupdNodes
- type MessageHistoryStat
- type NodeHourlyPubsize
- type NodeStat
- type NsqLookupdNodeInfo
- type PartialErr
- type Producer
- type ProducerTopic
- type ProducerTopics
- type Producers
- type ProducersByHost
- type TopicCoordStat
- type TopicInfo
- type TopicInfoSortByName
- type TopicMsgStatsInfo
- type TopicStats
- type TopicStatsByChannelDepth
- type TopicStatsByHourlyPubsize
- type TopicStatsByMessageCount
- type TopicStatsByPartitionAndHost
- type TopicStatsList
Constants ¶
This section is empty.
Variables ¶
var INDEX = int32(0)
Functions ¶
This section is empty.
Types ¶
type CatchupStat ¶
type ChannelStats ¶
type ChannelStats struct { Node string `json:"node"` Hostname string `json:"hostname"` TopicName string `json:"topic_name"` TopicPartition string `json:"topic_partition"` StatsdName string `json:"statsd_name"` ChannelName string `json:"channel_name"` Depth int64 `json:"depth"` DepthSize int64 `json:"depth_size"` DepthTimestamp string `json:"depth_ts"` MemoryDepth int64 `json:"memory_depth"` BackendDepth int64 `json:"backend_depth"` InFlightCount int64 `json:"in_flight_count"` DeferredCount int64 `json:"deferred_count"` RequeueCount int64 `json:"requeue_count"` TimeoutCount int64 `json:"timeout_count"` MessageCount int64 `json:"message_count"` DelayedQueueCount uint64 `json:"delayed_queue_count"` DelayedQueueRecent string `json:"delayed_queue_recent"` ClientCount int `json:"-"` Selected bool `json:"-"` NodeStats []*ChannelStats `json:"nodes"` Clients []*ClientStats `json:"clients"` Paused bool `json:"paused"` Skipped bool `json:"skipped"` IsMultiOrdered bool `json:"is_multi_ordered"` IsExt bool `json:"is_ext"` MsgConsumeLatencyStats []int64 `json:"msg_consume_latency_stats"` MsgDeliveryLatencyStats []int64 `json:"msg_delivery_latency_stats"` E2eProcessingLatency *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"` //indicate whether current channel is ths only channel under topic OnlyChannel bool `json:"only_channel"` }
func (*ChannelStats) Add ¶
func (c *ChannelStats) Add(a *ChannelStats)
type ChannelStatsByPartAndHost ¶
type ChannelStatsByPartAndHost struct {
ChannelStatsList
}
func (ChannelStatsByPartAndHost) Less ¶
func (c ChannelStatsByPartAndHost) Less(i, j int) bool
type ChannelStatsList ¶
type ChannelStatsList []*ChannelStats
func (ChannelStatsList) Len ¶
func (c ChannelStatsList) Len() int
func (ChannelStatsList) Swap ¶
func (c ChannelStatsList) Swap(i, j int)
type ClientPubStats ¶
type ClientStats ¶
type ClientStats struct { Node string `json:"node"` RemoteAddress string `json:"remote_address"` Name string `json:"name"` // TODO: deprecated, remove in 1.0 Version string `json:"version"` ClientID string `json:"client_id"` Hostname string `json:"hostname"` UserAgent string `json:"user_agent"` ConnectTs int64 `json:"connect_ts"` ConnectedDuration time.Duration `json:"connected"` InFlightCount int `json:"in_flight_count"` ReadyCount int `json:"ready_count"` FinishCount int64 `json:"finish_count"` RequeueCount int64 `json:"requeue_count"` MessageCount int64 `json:"message_count"` TimeoutCount int64 `json:"timeout_count"` DeferredCount int64 `json:"deferred_count"` SampleRate int32 `json:"sample_rate"` Deflate bool `json:"deflate"` Snappy bool `json:"snappy"` Authed bool `json:"authed"` AuthIdentity string `json:"auth_identity"` AuthIdentityURL string `json:"auth_identity_url"` DesiredTag string `json:"desired_tag"` TLS bool `json:"tls"` CipherSuite string `json:"tls_cipher_suite"` TLSVersion string `json:"tls_version"` TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"` TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"` }
func (*ClientStats) HasSampleRate ¶
func (c *ClientStats) HasSampleRate() bool
func (*ClientStats) HasUserAgent ¶
func (c *ClientStats) HasUserAgent() bool
func (*ClientStats) UnmarshalJSON ¶
func (s *ClientStats) UnmarshalJSON(b []byte) error
UnmarshalJSON implements json.Unmarshaler and postprocesses ConnectedDuration
type ClientStatsList ¶
type ClientStatsList []*ClientStats
func (ClientStatsList) Len ¶
func (c ClientStatsList) Len() int
func (ClientStatsList) Swap ¶
func (c ClientStatsList) Swap(i, j int)
type ClientsByHost ¶
type ClientsByHost struct {
ClientStatsList
}
func (ClientsByHost) Less ¶
func (c ClientsByHost) Less(i, j int) bool
type ClusterInfo ¶
type ClusterInfo struct {
// contains filtered or unexported fields
}
func New ¶
func New(log logger, client *http_api.Client) *ClusterInfo
func (*ClusterInfo) CreateTopic ¶
func (*ClusterInfo) CreateTopicChannel ¶
func (c *ClusterInfo) CreateTopicChannel(topicName string, channelName string, lookupdHTTPAddrs []string) error
func (*ClusterInfo) CreateTopicChannelAfterTopicCreation ¶
func (*ClusterInfo) DeleteChannel ¶
func (*ClusterInfo) DeleteTopic ¶
func (c *ClusterInfo) DeleteTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error
this will delete all partitions of topic on all nsqd node.
func (*ClusterInfo) EmptyChannel ¶
func (*ClusterInfo) EmptyTopic ¶
func (c *ClusterInfo) EmptyTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error
func (*ClusterInfo) GetClusterInfo ¶
func (c *ClusterInfo) GetClusterInfo(lookupdAdresses []string) (*ClusterNodeInfo, error)
func (*ClusterInfo) GetLookupdProducers ¶
func (c *ClusterInfo) GetLookupdProducers(lookupdHTTPAddrs []string) (Producers, error)
GetLookupdProducers returns Producers of all the nsqd connected to the given lookupds
func (*ClusterInfo) GetLookupdTopicChannels ¶
func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []string) ([]string, error)
GetLookupdTopicChannels returns a []string containing a union of all the channels from all the given lookupd for the given topic
func (*ClusterInfo) GetLookupdTopicProducers ¶
func (c *ClusterInfo) GetLookupdTopicProducers(topic string, lookupdHTTPAddrs []string) (Producers, map[string]Producers, error)
GetLookupdTopicProducers returns Producers of all the nsqd for a given topic by unioning the nodes returned from the given lookupd
func (*ClusterInfo) GetLookupdTopics ¶
func (c *ClusterInfo) GetLookupdTopics(lookupdHTTPAddrs []string) ([]string, error)
GetLookupdTopics returns a []string containing a union of all the topics from all the given nsqlookupd
func (*ClusterInfo) GetLookupdTopicsMeta ¶
func (c *ClusterInfo) GetLookupdTopicsMeta(lookupdHTTPAddrs []string, metaInfo bool) ([]*TopicInfo, error)
func (*ClusterInfo) GetNSQDAllMessageHistoryStats ¶
func (c *ClusterInfo) GetNSQDAllMessageHistoryStats(producers Producers) (map[string]int64, error)
func (*ClusterInfo) GetNSQDCoordStats ¶
func (c *ClusterInfo) GetNSQDCoordStats(producers Producers, selectedTopic string, part string) (*CoordStats, error)
func (*ClusterInfo) GetNSQDMessageByID ¶
func (*ClusterInfo) GetNSQDMessageHistoryStats ¶
func (*ClusterInfo) GetNSQDProducers ¶
func (c *ClusterInfo) GetNSQDProducers(nsqdHTTPAddrs []string) (Producers, error)
GetNSQDProducers returns Producers of all the given nsqd
func (*ClusterInfo) GetNSQDStats ¶
func (c *ClusterInfo) GetNSQDStats(producers Producers, selectedTopic string, sortBy string, leaderOnly bool) ([]*TopicStats, map[string]*ChannelStats, error)
GetNSQDStats returns aggregate topic and channel stats from the given Producers
if selectedTopic is empty, this will return stats for *all* topic/channels and the ChannelStats dict will be keyed by topic + ':' + channel
func (*ClusterInfo) GetNSQDTopicProducers ¶
func (c *ClusterInfo) GetNSQDTopicProducers(topic string, nsqdHTTPAddrs []string) (Producers, error)
GetNSQDTopicProducers returns Producers containing the addresses of all the nsqd that produce the given topic
func (*ClusterInfo) GetNSQDTopics ¶
func (c *ClusterInfo) GetNSQDTopics(nsqdHTTPAddrs []string) ([]*TopicInfo, error)
GetNSQDTopics returns a []string containing all the topics produced by the given nsqd
func (*ClusterInfo) GetProducers ¶
func (c *ClusterInfo) GetProducers(lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) (Producers, error)
func (*ClusterInfo) GetTopicProducers ¶
func (*ClusterInfo) GetVersion ¶
func (c *ClusterInfo) GetVersion(addr string) (semver.Version, error)
GetVersion returns a semver.Version object by querying /info
func (*ClusterInfo) ListAllLookupdNodes ¶
func (c *ClusterInfo) ListAllLookupdNodes(lookupdHTTPAddrs []string) (*LookupdNodes, error)
func (*ClusterInfo) PauseChannel ¶
func (*ClusterInfo) PauseTopic ¶
func (c *ClusterInfo) PauseTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error
func (*ClusterInfo) ResetChannel ¶
func (*ClusterInfo) SkipChannel ¶
func (*ClusterInfo) TombstoneNodeForTopic ¶
func (c *ClusterInfo) TombstoneNodeForTopic(topic string, node string, lookupdHTTPAddrs []string) error
TombstoneNodeForTopic tombstones the given node for the given topic on all the given nsqlookupd and deletes the topic from the node
func (*ClusterInfo) UnPauseChannel ¶
type ClusterNodeInfo ¶
type CoordStats ¶
type CoordStats struct { RpcStats *gorpc.ConnStats `json:"rpc_stats"` TopicCoordStats []TopicCoordStat `json:"topic_coord_stats"` }
type ISRStat ¶
type LookupPeer ¶
type LookupPeer struct { Info peerInfo // contains filtered or unexported fields }
lookupPeer is a low-level type for connecting/reading/writing to nsqlookupd
A lookupPeer instance is designed to connect lazily to nsqlookupd and reconnect gracefully (i.e. it is all handled by the library). Clients can simply use the Command interface to perform a round-trip.
func NewLookupPeer ¶
func NewLookupPeer(addr string, maxBodySize int64, l levellogger.Logger, connectCallback func(*LookupPeer)) *LookupPeer
newLookupPeer creates a new lookupPeer instance connecting to the supplied address.
The supplied connectCallback will be called *every* time the instance connects.
func (*LookupPeer) Close ¶
func (lp *LookupPeer) Close() error
Close implements the io.Closer interface
func (*LookupPeer) Command ¶
func (lp *LookupPeer) Command(cmd *nsq.Command) ([]byte, error)
Command performs a round-trip for the specified Command.
It will lazily connect to nsqlookupd and gracefully handle reconnecting in the event of a failure.
It returns the response from nsqlookupd as []byte
func (*LookupPeer) Connect ¶
func (lp *LookupPeer) Connect() error
Connect will Dial the specified address, with timeouts
func (*LookupPeer) Read ¶
func (lp *LookupPeer) Read(data []byte) (int, error)
Read implements the io.Reader interface, adding deadlines
func (*LookupPeer) String ¶
func (lp *LookupPeer) String() string
String returns the specified address
func (*LookupPeer) Write ¶
func (lp *LookupPeer) Write(data []byte) (int, error)
Write implements the io.Writer interface, adding deadlines
type LookupdNodes ¶
type LookupdNodes struct { LeaderNode NsqLookupdNodeInfo `json:"lookupdleader"` AllNodes []NsqLookupdNodeInfo `json:"lookupdnodes"` }
type NodeHourlyPubsize ¶
type NodeStat ¶
type NsqLookupdNodeInfo ¶
type Producer ¶
type Producer struct { RemoteAddresses []string `json:"remote_addresses"` RemoteAddress string `json:"remote_address"` Hostname string `json:"hostname"` BroadcastAddress string `json:"broadcast_address"` TCPPort int `json:"tcp_port"` HTTPPort int `json:"http_port"` Version string `json:"version"` VersionObj semver.Version `json:"-"` Topics ProducerTopics `json:"topics"` OutOfDate bool `json:"out_of_date"` }
func (*Producer) IsInconsistent ¶
IsInconsistent checks for cases where an unexpected number of nsqd connections are reporting the same information to nsqlookupd (ie: multiple instances are using the same broadcast address), or cases where some nsqd are not reporting to all nsqlookupd.
type ProducerTopic ¶
type ProducerTopics ¶
type ProducerTopics []ProducerTopic
func (ProducerTopics) Len ¶
func (pt ProducerTopics) Len() int
func (ProducerTopics) Less ¶
func (pt ProducerTopics) Less(i, j int) bool
func (ProducerTopics) Swap ¶
func (pt ProducerTopics) Swap(i, j int)
type ProducersByHost ¶
type ProducersByHost struct {
Producers
}
func (ProducersByHost) Less ¶
func (c ProducersByHost) Less(i, j int) bool
type TopicCoordStat ¶
type TopicCoordStat struct { Node string `json:"node"` Name string `json:"name"` Partition int `json:"partition"` ISRStats []ISRStat `json:"isr_stats"` CatchupStats []CatchupStat `json:"catchup_stats"` }
type TopicInfo ¶
type TopicInfoSortByName ¶
type TopicInfoSortByName []*TopicInfo
func (TopicInfoSortByName) Len ¶
func (c TopicInfoSortByName) Len() int
func (TopicInfoSortByName) Less ¶
func (c TopicInfoSortByName) Less(i, j int) bool
func (TopicInfoSortByName) Swap ¶
func (c TopicInfoSortByName) Swap(i, j int)
type TopicMsgStatsInfo ¶
type TopicStats ¶
type TopicStats struct { Node string `json:"node"` Hostname string `json:"hostname"` TopicName string `json:"topic_name"` TopicPartition string `json:"topic_partition"` StatsdName string `json:"statsd_name"` IsLeader bool `json:"is_leader"` IsMultiOrdered bool `json:"is_multi_ordered"` IsExt bool `json:"is_ext"` SyncingNum int `json:"syncing_num"` ISRStats []ISRStat `json:"isr_stats"` CatchupStats []CatchupStat `json:"catchup_stats"` Depth int64 `json:"depth"` MemoryDepth int64 `json:"memory_depth"` // the queue maybe auto cleaned, so the start means the queue oldest offset. BackendStart int64 `json:"backend_start"` BackendDepth int64 `json:"backend_depth"` MessageCount int64 `json:"message_count"` NodeStats []*TopicStats `json:"nodes"` Channels []*ChannelStats `json:"channels"` TotalChannelDepth int64 `json:"total_channel_depth"` Paused bool `json:"paused"` HourlyPubSize int64 `json:"hourly_pubsize"` PartitionHourlyPubSize []int64 `json:"partition_hourly_pubsize"` Clients []ClientPubStats `json:"client_pub_stats"` MessageSizeStats [16]int64 `json:"msg_size_stats"` MessageLatencyStats [16]int64 `json:"msg_write_latency_stats"` E2eProcessingLatency *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"` }
func (*TopicStats) Add ¶
func (t *TopicStats) Add(a *TopicStats)
type TopicStatsByChannelDepth ¶
type TopicStatsByChannelDepth struct {
TopicStatsList
}
func (TopicStatsByChannelDepth) Less ¶
func (c TopicStatsByChannelDepth) Less(i, j int) bool
type TopicStatsByHourlyPubsize ¶
type TopicStatsByHourlyPubsize struct {
TopicStatsList
}
func (TopicStatsByHourlyPubsize) Less ¶
func (c TopicStatsByHourlyPubsize) Less(i, j int) bool
type TopicStatsByMessageCount ¶
type TopicStatsByMessageCount struct {
TopicStatsList
}
func (TopicStatsByMessageCount) Less ¶
func (c TopicStatsByMessageCount) Less(i, j int) bool
type TopicStatsByPartitionAndHost ¶
type TopicStatsByPartitionAndHost struct {
TopicStatsList
}
func (TopicStatsByPartitionAndHost) Less ¶
func (c TopicStatsByPartitionAndHost) Less(i, j int) bool
type TopicStatsList ¶
type TopicStatsList []*TopicStats
func (TopicStatsList) Len ¶
func (t TopicStatsList) Len() int
func (TopicStatsList) Swap ¶
func (t TopicStatsList) Swap(i, j int)