clusterinfo

package
v0.3.7-HA.1.6.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 19, 2018 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var INDEX = int32(0)

Functions

This section is empty.

Types

type CatchupStat

type CatchupStat struct {
	HostName string `json:"hostname"`
	NodeID   string `json:"node_id"`
	Progress int    `json:"progress"`
}

type ChannelStats

type ChannelStats struct {
	Node                    string                                  `json:"node"`
	Hostname                string                                  `json:"hostname"`
	TopicName               string                                  `json:"topic_name"`
	TopicPartition          string                                  `json:"topic_partition"`
	StatsdName              string                                  `json:"statsd_name"`
	ChannelName             string                                  `json:"channel_name"`
	Depth                   int64                                   `json:"depth"`
	DepthSize               int64                                   `json:"depth_size"`
	DepthTimestamp          string                                  `json:"depth_ts"`
	MemoryDepth             int64                                   `json:"memory_depth"`
	BackendDepth            int64                                   `json:"backend_depth"`
	InFlightCount           int64                                   `json:"in_flight_count"`
	DeferredCount           int64                                   `json:"deferred_count"`
	RequeueCount            int64                                   `json:"requeue_count"`
	TimeoutCount            int64                                   `json:"timeout_count"`
	MessageCount            int64                                   `json:"message_count"`
	DelayedQueueCount       uint64                                  `json:"delayed_queue_count"`
	DelayedQueueRecent      string                                  `json:"delayed_queue_recent"`
	ClientCount             int                                     `json:"-"`
	Selected                bool                                    `json:"-"`
	NodeStats               []*ChannelStats                         `json:"nodes"`
	Clients                 []*ClientStats                          `json:"clients"`
	Paused                  bool                                    `json:"paused"`
	Skipped                 bool                                    `json:"skipped"`
	IsMultiOrdered          bool                                    `json:"is_multi_ordered"`
	IsExt                   bool                                    `json:"is_ext"`
	MsgConsumeLatencyStats  []int64                                 `json:"msg_consume_latency_stats"`
	MsgDeliveryLatencyStats []int64                                 `json:"msg_delivery_latency_stats"`
	E2eProcessingLatency    *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"`
}

func (*ChannelStats) Add

func (c *ChannelStats) Add(a *ChannelStats)

type ChannelStatsByPartAndHost

type ChannelStatsByPartAndHost struct {
	ChannelStatsList
}

func (ChannelStatsByPartAndHost) Less

func (c ChannelStatsByPartAndHost) Less(i, j int) bool

type ChannelStatsList

type ChannelStatsList []*ChannelStats

func (ChannelStatsList) Len

func (c ChannelStatsList) Len() int

func (ChannelStatsList) Swap

func (c ChannelStatsList) Swap(i, j int)

type ClientPubStats

type ClientPubStats struct {
	RemoteAddress string `json:"remote_address"`
	UserAgent     string `json:"user_agent"`
	Protocol      string `json:"protocol"`
	PubCount      int64  `json:"pub_count"`
	ErrCount      int64  `json:"err_count"`
	LastPubTs     int64  `json:"last_pub_ts"`
}

type ClientStats

type ClientStats struct {
	Node              string        `json:"node"`
	RemoteAddress     string        `json:"remote_address"`
	Name              string        `json:"name"` // TODO: deprecated, remove in 1.0
	Version           string        `json:"version"`
	ClientID          string        `json:"client_id"`
	Hostname          string        `json:"hostname"`
	UserAgent         string        `json:"user_agent"`
	ConnectTs         int64         `json:"connect_ts"`
	ConnectedDuration time.Duration `json:"connected"`
	InFlightCount     int           `json:"in_flight_count"`
	ReadyCount        int           `json:"ready_count"`
	FinishCount       int64         `json:"finish_count"`
	RequeueCount      int64         `json:"requeue_count"`
	MessageCount      int64         `json:"message_count"`
	TimeoutCount      int64         `json:"timeout_count"`
	DeferredCount     int64         `json:"deferred_count"`
	SampleRate        int32         `json:"sample_rate"`
	Deflate           bool          `json:"deflate"`
	Snappy            bool          `json:"snappy"`
	Authed            bool          `json:"authed"`
	AuthIdentity      string        `json:"auth_identity"`
	AuthIdentityURL   string        `json:"auth_identity_url"`

	DesiredTag string `json:"desired_tag"`

	TLS                           bool   `json:"tls"`
	CipherSuite                   string `json:"tls_cipher_suite"`
	TLSVersion                    string `json:"tls_version"`
	TLSNegotiatedProtocol         string `json:"tls_negotiated_protocol"`
	TLSNegotiatedProtocolIsMutual bool   `json:"tls_negotiated_protocol_is_mutual"`
}

func (*ClientStats) HasSampleRate

func (c *ClientStats) HasSampleRate() bool

func (*ClientStats) HasUserAgent

func (c *ClientStats) HasUserAgent() bool

func (*ClientStats) UnmarshalJSON

func (s *ClientStats) UnmarshalJSON(b []byte) error

UnmarshalJSON implements json.Unmarshaler and postprocesses ConnectedDuration

type ClientStatsList

type ClientStatsList []*ClientStats

func (ClientStatsList) Len

func (c ClientStatsList) Len() int

func (ClientStatsList) Swap

func (c ClientStatsList) Swap(i, j int)

type ClientsByHost

type ClientsByHost struct {
	ClientStatsList
}

func (ClientsByHost) Less

func (c ClientsByHost) Less(i, j int) bool

type ClusterInfo

type ClusterInfo struct {
	// contains filtered or unexported fields
}

func New

func New(log logger, client *http_api.Client) *ClusterInfo

func (*ClusterInfo) CreateTopic

func (c *ClusterInfo) CreateTopic(topicName string, partitionNum int, replica int, syncDisk int,
	retentionDays string, orderedmulti string, ext string, lookupdHTTPAddrs []string) error

func (*ClusterInfo) CreateTopicChannel

func (c *ClusterInfo) CreateTopicChannel(topicName string, channelName string, lookupdHTTPAddrs []string) error

func (*ClusterInfo) CreateTopicChannelAfterTopicCreation

func (c *ClusterInfo) CreateTopicChannelAfterTopicCreation(topicName string, channelName string, lookupdHTTPAddrs []string, partitionNum int) error

func (*ClusterInfo) DeleteChannel

func (c *ClusterInfo) DeleteChannel(topicName string, channelName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) DeleteTopic

func (c *ClusterInfo) DeleteTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

this will delete all partitions of topic on all nsqd node.

func (*ClusterInfo) EmptyChannel

func (c *ClusterInfo) EmptyChannel(topicName string, channelName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) EmptyTopic

func (c *ClusterInfo) EmptyTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) GetClusterInfo

func (c *ClusterInfo) GetClusterInfo(lookupdAdresses []string) (*ClusterNodeInfo, error)

func (*ClusterInfo) GetLookupdProducers

func (c *ClusterInfo) GetLookupdProducers(lookupdHTTPAddrs []string) (Producers, error)

GetLookupdProducers returns Producers of all the nsqd connected to the given lookupds

func (*ClusterInfo) GetLookupdTopicChannels

func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []string) ([]string, error)

GetLookupdTopicChannels returns a []string containing a union of all the channels from all the given lookupd for the given topic

func (*ClusterInfo) GetLookupdTopicProducers

func (c *ClusterInfo) GetLookupdTopicProducers(topic string, lookupdHTTPAddrs []string) (Producers, map[string]Producers, error)

GetLookupdTopicProducers returns Producers of all the nsqd for a given topic by unioning the nodes returned from the given lookupd

func (*ClusterInfo) GetLookupdTopics

func (c *ClusterInfo) GetLookupdTopics(lookupdHTTPAddrs []string) ([]string, error)

GetLookupdTopics returns a []string containing a union of all the topics from all the given nsqlookupd

func (*ClusterInfo) GetLookupdTopicsMeta

func (c *ClusterInfo) GetLookupdTopicsMeta(lookupdHTTPAddrs []string, metaInfo bool) ([]*TopicInfo, error)

func (*ClusterInfo) GetNSQDAllMessageHistoryStats

func (c *ClusterInfo) GetNSQDAllMessageHistoryStats(producers Producers) (map[string]int64, error)

func (*ClusterInfo) GetNSQDCoordStats

func (c *ClusterInfo) GetNSQDCoordStats(producers Producers, selectedTopic string, part string) (*CoordStats, error)

func (*ClusterInfo) GetNSQDMessageByID

func (c *ClusterInfo) GetNSQDMessageByID(p Producer, selectedTopic string,
	part string, msgID int64) (string, int64, error)

func (*ClusterInfo) GetNSQDMessageHistoryStats

func (c *ClusterInfo) GetNSQDMessageHistoryStats(nsqdHTTPAddr string, selectedTopic string, par string) ([]int64, error)

func (*ClusterInfo) GetNSQDProducers

func (c *ClusterInfo) GetNSQDProducers(nsqdHTTPAddrs []string) (Producers, error)

GetNSQDProducers returns Producers of all the given nsqd

func (*ClusterInfo) GetNSQDStats

func (c *ClusterInfo) GetNSQDStats(producers Producers, selectedTopic string, sortBy string, leaderOnly bool) ([]*TopicStats, map[string]*ChannelStats, error)

GetNSQDStats returns aggregate topic and channel stats from the given Producers

if selectedTopic is empty, this will return stats for *all* topic/channels and the ChannelStats dict will be keyed by topic + ':' + channel

func (*ClusterInfo) GetNSQDTopicProducers

func (c *ClusterInfo) GetNSQDTopicProducers(topic string, nsqdHTTPAddrs []string) (Producers, error)

GetNSQDTopicProducers returns Producers containing the addresses of all the nsqd that produce the given topic

func (*ClusterInfo) GetNSQDTopics

func (c *ClusterInfo) GetNSQDTopics(nsqdHTTPAddrs []string) ([]*TopicInfo, error)

GetNSQDTopics returns a []string containing all the topics produced by the given nsqd

func (*ClusterInfo) GetProducers

func (c *ClusterInfo) GetProducers(lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) (Producers, error)

func (*ClusterInfo) GetTopicProducers

func (c *ClusterInfo) GetTopicProducers(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) (Producers, map[string]Producers, error)

func (*ClusterInfo) GetVersion

func (c *ClusterInfo) GetVersion(addr string) (semver.Version, error)

GetVersion returns a semver.Version object by querying /info

func (*ClusterInfo) ListAllLookupdNodes

func (c *ClusterInfo) ListAllLookupdNodes(lookupdHTTPAddrs []string) (*LookupdNodes, error)

func (*ClusterInfo) PauseChannel

func (c *ClusterInfo) PauseChannel(topicName string, channelName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) PauseTopic

func (c *ClusterInfo) PauseTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) ResetChannel

func (c *ClusterInfo) ResetChannel(topicName string, channelName string, lookupdHTTPAddrs []string, resetBy string) error

func (*ClusterInfo) SkipChannel

func (c *ClusterInfo) SkipChannel(topicName string, channelName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) TombstoneNodeForTopic

func (c *ClusterInfo) TombstoneNodeForTopic(topic string, node string, lookupdHTTPAddrs []string) error

TombstoneNodeForTopic tombstones the given node for the given topic on all the given nsqlookupd and deletes the topic from the node

func (*ClusterInfo) UnPauseChannel

func (c *ClusterInfo) UnPauseChannel(topicName string, channelName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) UnPauseTopic

func (c *ClusterInfo) UnPauseTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) UnSkipChannel

func (c *ClusterInfo) UnSkipChannel(topicName string, channelName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

type ClusterNodeInfo

type ClusterNodeInfo struct {
	Stable       bool        `json:"stable"`
	NodeStatList []*NodeStat `json:"node_stat_list"`
}

type CoordStats

type CoordStats struct {
	RpcStats        *gorpc.ConnStats `json:"rpc_stats"`
	TopicCoordStats []TopicCoordStat `json:"topic_coord_stats"`
}

type ErrList

type ErrList []error

func (ErrList) Error

func (l ErrList) Error() string

func (ErrList) Errors

func (l ErrList) Errors() []error

type ISRStat

type ISRStat struct {
	HostName string `json:"hostname"`
	NodeID   string `json:"node_id"`
}

type LookupPeer

type LookupPeer struct {
	Info peerInfo
	// contains filtered or unexported fields
}

lookupPeer is a low-level type for connecting/reading/writing to nsqlookupd

A lookupPeer instance is designed to connect lazily to nsqlookupd and reconnect gracefully (i.e. it is all handled by the library). Clients can simply use the Command interface to perform a round-trip.

func NewLookupPeer

func NewLookupPeer(addr string, maxBodySize int64, l levellogger.Logger, connectCallback func(*LookupPeer)) *LookupPeer

newLookupPeer creates a new lookupPeer instance connecting to the supplied address.

The supplied connectCallback will be called *every* time the instance connects.

func (*LookupPeer) Close

func (lp *LookupPeer) Close() error

Close implements the io.Closer interface

func (*LookupPeer) Command

func (lp *LookupPeer) Command(cmd *nsq.Command) ([]byte, error)

Command performs a round-trip for the specified Command.

It will lazily connect to nsqlookupd and gracefully handle reconnecting in the event of a failure.

It returns the response from nsqlookupd as []byte

func (*LookupPeer) Connect

func (lp *LookupPeer) Connect() error

Connect will Dial the specified address, with timeouts

func (*LookupPeer) Read

func (lp *LookupPeer) Read(data []byte) (int, error)

Read implements the io.Reader interface, adding deadlines

func (*LookupPeer) String

func (lp *LookupPeer) String() string

String returns the specified address

func (*LookupPeer) Write

func (lp *LookupPeer) Write(data []byte) (int, error)

Write implements the io.Writer interface, adding deadlines

type LookupdNodes

type LookupdNodes struct {
	LeaderNode NsqLookupdNodeInfo   `json:"lookupdleader"`
	AllNodes   []NsqLookupdNodeInfo `json:"lookupdnodes"`
}

type MessageHistoryStat

type MessageHistoryStat []int64

type NodeHourlyPubsize

type NodeHourlyPubsize struct {
	TopicName      string `json:"topic_name"`
	TopicPartition string `json:"topic_partition"`
	HourlyPubSize  int64  `json:"hourly_pub_size"`
}

type NodeStat

type NodeStat struct {
	HostName         string  `json:"hostname"`
	BroadcastAddress string  `json:"broadcast_address"`
	TCPPort          int     `json:"tcp_port"`
	HTTPPort         int     `json:"http_port"`
	LeaderLoadFactor float64 `json:"leader_load_factor"`
	NodeLoadFactor   float64 `json:"node_load_factor"`
}

type NsqLookupdNodeInfo

type NsqLookupdNodeInfo struct {
	ID       string
	NodeIP   string
	TcpPort  string
	HttpPort string
	RpcPort  string
}

type PartialErr

type PartialErr interface {
	error
	Errors() []error
}

type Producer

type Producer struct {
	RemoteAddresses  []string       `json:"remote_addresses"`
	RemoteAddress    string         `json:"remote_address"`
	Hostname         string         `json:"hostname"`
	BroadcastAddress string         `json:"broadcast_address"`
	TCPPort          int            `json:"tcp_port"`
	HTTPPort         int            `json:"http_port"`
	Version          string         `json:"version"`
	VersionObj       semver.Version `json:"-"`
	Topics           ProducerTopics `json:"topics"`
	OutOfDate        bool           `json:"out_of_date"`
}

func (*Producer) Address

func (p *Producer) Address() string

func (*Producer) HTTPAddress

func (p *Producer) HTTPAddress() string

func (*Producer) IsInconsistent

func (p *Producer) IsInconsistent(numLookupd int) bool

IsInconsistent checks for cases where an unexpected number of nsqd connections are reporting the same information to nsqlookupd (ie: multiple instances are using the same broadcast address), or cases where some nsqd are not reporting to all nsqlookupd.

func (*Producer) TCPAddress

func (p *Producer) TCPAddress() string

func (*Producer) UnmarshalJSON

func (p *Producer) UnmarshalJSON(b []byte) error

UnmarshalJSON implements json.Unmarshaler and postprocesses of ProducerTopics and VersionObj

type ProducerTopic

type ProducerTopic struct {
	Topic      string `json:"topic"`
	Tombstoned bool   `json:"tombstoned"`
}

type ProducerTopics

type ProducerTopics []ProducerTopic

func (ProducerTopics) Len

func (pt ProducerTopics) Len() int

func (ProducerTopics) Less

func (pt ProducerTopics) Less(i, j int) bool

func (ProducerTopics) Swap

func (pt ProducerTopics) Swap(i, j int)

type Producers

type Producers []*Producer

func (Producers) HTTPAddrs

func (t Producers) HTTPAddrs() []string

func (Producers) Len

func (t Producers) Len() int

func (Producers) Search

func (t Producers) Search(needle string) *Producer

func (Producers) Swap

func (t Producers) Swap(i, j int)

type ProducersByHost

type ProducersByHost struct {
	Producers
}

func (ProducersByHost) Less

func (c ProducersByHost) Less(i, j int) bool

type TopicCoordStat

type TopicCoordStat struct {
	Node         string        `json:"node"`
	Name         string        `json:"name"`
	Partition    int           `json:"partition"`
	ISRStats     []ISRStat     `json:"isr_stats"`
	CatchupStats []CatchupStat `json:"catchup_stats"`
}

type TopicInfo

type TopicInfo struct {
	TopicName  string `json:"topic_name"`
	ExtSupport bool   `json:"extend_support"`
	Ordered    bool   `json:"ordered"`
}

type TopicInfoSortByName

type TopicInfoSortByName []*TopicInfo

func (TopicInfoSortByName) Len

func (c TopicInfoSortByName) Len() int

func (TopicInfoSortByName) Less

func (c TopicInfoSortByName) Less(i, j int) bool

func (TopicInfoSortByName) Swap

func (c TopicInfoSortByName) Swap(i, j int)

type TopicMsgStatsInfo

type TopicMsgStatsInfo struct {
	// <100bytes, <1KB, 2KB, 4KB, 8KB, 16KB, 32KB, 64KB, 128KB, 256KB, 512KB, 1MB, 2MB, 4MB
	MsgSizeStats [16]int64
	// <1024us, 2ms, 4ms, 8ms, 16ms, 32ms, 64ms, 128ms, 256ms, 512ms, 1024ms, 2048ms, 4s, 8s
	MsgWriteLatencyStats [16]int64
}

type TopicStats

type TopicStats struct {
	Node           string        `json:"node"`
	Hostname       string        `json:"hostname"`
	TopicName      string        `json:"topic_name"`
	TopicPartition string        `json:"topic_partition"`
	StatsdName     string        `json:"statsd_name"`
	IsLeader       bool          `json:"is_leader"`
	IsMultiOrdered bool          `json:"is_multi_ordered"`
	IsExt          bool          `json:"is_ext"`
	SyncingNum     int           `json:"syncing_num"`
	ISRStats       []ISRStat     `json:"isr_stats"`
	CatchupStats   []CatchupStat `json:"catchup_stats"`
	Depth          int64         `json:"depth"`
	MemoryDepth    int64         `json:"memory_depth"`
	// the queue maybe auto cleaned, so the start means the queue oldest offset.
	BackendStart           int64            `json:"backend_start"`
	BackendDepth           int64            `json:"backend_depth"`
	MessageCount           int64            `json:"message_count"`
	NodeStats              []*TopicStats    `json:"nodes"`
	Channels               []*ChannelStats  `json:"channels"`
	TotalChannelDepth      int64            `json:"total_channel_depth"`
	Paused                 bool             `json:"paused"`
	HourlyPubSize          int64            `json:"hourly_pubsize"`
	PartitionHourlyPubSize []int64          `json:"partition_hourly_pubsize"`
	Clients                []ClientPubStats `json:"client_pub_stats"`
	MessageSizeStats       [16]int64        `json:"msg_size_stats"`
	MessageLatencyStats    [16]int64        `json:"msg_write_latency_stats"`

	E2eProcessingLatency *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"`
}

func (*TopicStats) Add

func (t *TopicStats) Add(a *TopicStats)

type TopicStatsByChannelDepth

type TopicStatsByChannelDepth struct {
	TopicStatsList
}

func (TopicStatsByChannelDepth) Less

func (c TopicStatsByChannelDepth) Less(i, j int) bool

type TopicStatsByHourlyPubsize

type TopicStatsByHourlyPubsize struct {
	TopicStatsList
}

func (TopicStatsByHourlyPubsize) Less

func (c TopicStatsByHourlyPubsize) Less(i, j int) bool

type TopicStatsByMessageCount

type TopicStatsByMessageCount struct {
	TopicStatsList
}

func (TopicStatsByMessageCount) Less

func (c TopicStatsByMessageCount) Less(i, j int) bool

type TopicStatsByPartitionAndHost

type TopicStatsByPartitionAndHost struct {
	TopicStatsList
}

func (TopicStatsByPartitionAndHost) Less

func (c TopicStatsByPartitionAndHost) Less(i, j int) bool

type TopicStatsList

type TopicStatsList []*TopicStats

func (TopicStatsList) Len

func (t TopicStatsList) Len() int

func (TopicStatsList) Swap

func (t TopicStatsList) Swap(i, j int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL