clusterinfo

package
v0.3.7-HA.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 11, 2016 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CatchupStat

// CatchupStat is a JSON-tagged record holding a host name, a node ID and an
// integer progress value. It is the element type of the CatchupStats field
// in TopicCoordStat and TopicStats.
// NOTE(review): the unit and range of Progress are not visible here — confirm.
type CatchupStat struct {
	HostName string `json:"hostname"`
	NodeID   string `json:"node_id"`
	Progress int    `json:"progress"`
}

type ChannelStats

// ChannelStats is the JSON-tagged statistics record for a channel. It carries
// node/host/topic/channel names, several int64 depth and message counters,
// nested per-node entries (NodeStats) and the attached client entries
// (Clients). The Add method merges another *ChannelStats into the receiver.
type ChannelStats struct {
	Node           string          `json:"node"`
	Hostname       string          `json:"hostname"`
	TopicName      string          `json:"topic_name"`
	ChannelName    string          `json:"channel_name"`
	Depth          int64           `json:"depth"`
	DepthSize      int64           `json:"depth_size"`
	DepthTimestamp string          `json:"depth_ts"`
	MemoryDepth    int64           `json:"memory_depth"`
	BackendDepth   int64           `json:"backend_depth"`
	InFlightCount  int64           `json:"in_flight_count"`
	DeferredCount  int64           `json:"deferred_count"`
	RequeueCount   int64           `json:"requeue_count"`
	TimeoutCount   int64           `json:"timeout_count"`
	MessageCount   int64           `json:"message_count"`
	// ClientCount and Selected are tagged `json:"-"`, so encoding/json neither
	// reads nor writes them.
	ClientCount    int             `json:"-"`
	Selected       bool            `json:"-"`
	NodeStats      []*ChannelStats `json:"nodes"`
	Clients        []*ClientStats  `json:"clients"`
	Paused         bool            `json:"paused"`

	// E2eProcessingLatency is a pointer and may therefore be nil.
	E2eProcessingLatency *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"`
}

func (*ChannelStats) Add

func (c *ChannelStats) Add(a *ChannelStats)

type ChannelStatsByHost

// ChannelStatsByHost embeds ChannelStatsList, which supplies Len and Swap,
// and adds Less; together the method set matches sort.Interface.
// NOTE(review): the ordering is presumably by host name — confirm against
// the Less implementation, which is not visible here.
type ChannelStatsByHost struct {
	ChannelStatsList
}

func (ChannelStatsByHost) Less

func (c ChannelStatsByHost) Less(i, j int) bool

type ChannelStatsList

// ChannelStatsList is a slice of *ChannelStats. It declares Len and Swap but
// not Less; ChannelStatsByHost embeds it and supplies the ordering.
type ChannelStatsList []*ChannelStats

func (ChannelStatsList) Len

func (c ChannelStatsList) Len() int

func (ChannelStatsList) Swap

func (c ChannelStatsList) Swap(i, j int)

type ClientPubStats

// ClientPubStats is a JSON-tagged record of publish counters for one client.
// It is the element type of TopicStats.Clients (JSON key "client_pub_stats").
// NOTE(review): LastPubTs is presumably a Unix timestamp; its unit is not
// visible here — confirm.
type ClientPubStats struct {
	RemoteAddress string `json:"remote_address"`
	UserAgent     string `json:"user_agent"`
	Protocol      string `json:"protocol"`
	PubCount      int64  `json:"pub_count"`
	ErrCount      int64  `json:"err_count"`
	LastPubTs     int64  `json:"last_pub_ts"`
}

type ClientStats

// ClientStats is the JSON-tagged statistics record for one client connection.
// It is the element type of ChannelStats.Clients. The type has a custom
// UnmarshalJSON that post-processes ConnectedDuration, plus the helpers
// HasSampleRate and HasUserAgent.
type ClientStats struct {
	Node              string        `json:"node"`
	RemoteAddress     string        `json:"remote_address"`
	Name              string        `json:"name"` // TODO: deprecated, remove in 1.0
	Version           string        `json:"version"`
	ClientID          string        `json:"client_id"`
	Hostname          string        `json:"hostname"`
	UserAgent         string        `json:"user_agent"`
	ConnectTs         int64         `json:"connect_ts"`
	// ConnectedDuration is adjusted by UnmarshalJSON after decoding.
	ConnectedDuration time.Duration `json:"connected"`
	InFlightCount     int           `json:"in_flight_count"`
	ReadyCount        int           `json:"ready_count"`
	FinishCount       int64         `json:"finish_count"`
	RequeueCount      int64         `json:"requeue_count"`
	MessageCount      int64         `json:"message_count"`
	SampleRate        int32         `json:"sample_rate"`
	Deflate           bool          `json:"deflate"`
	Snappy            bool          `json:"snappy"`
	Authed            bool          `json:"authed"`
	AuthIdentity      string        `json:"auth_identity"`
	AuthIdentityURL   string        `json:"auth_identity_url"`

	// TLS-related fields, serialized under the "tls*" JSON keys.
	TLS                           bool   `json:"tls"`
	CipherSuite                   string `json:"tls_cipher_suite"`
	TLSVersion                    string `json:"tls_version"`
	TLSNegotiatedProtocol         string `json:"tls_negotiated_protocol"`
	TLSNegotiatedProtocolIsMutual bool   `json:"tls_negotiated_protocol_is_mutual"`
}

func (*ClientStats) HasSampleRate

func (c *ClientStats) HasSampleRate() bool

func (*ClientStats) HasUserAgent

func (c *ClientStats) HasUserAgent() bool

func (*ClientStats) UnmarshalJSON

func (s *ClientStats) UnmarshalJSON(b []byte) error

UnmarshalJSON implements json.Unmarshaler and postprocesses ConnectedDuration

type ClientStatsList

// ClientStatsList is a slice of *ClientStats. It declares Len and Swap but
// not Less; ClientsByHost embeds it and supplies the ordering.
type ClientStatsList []*ClientStats

func (ClientStatsList) Len

func (c ClientStatsList) Len() int

func (ClientStatsList) Swap

func (c ClientStatsList) Swap(i, j int)

type ClientsByHost

// ClientsByHost embeds ClientStatsList, which supplies Len and Swap, and adds
// Less; together the method set matches sort.Interface.
// NOTE(review): the ordering is presumably by host name — confirm against
// the Less implementation, which is not visible here.
type ClientsByHost struct {
	ClientStatsList
}

func (ClientsByHost) Less

func (c ClientsByHost) Less(i, j int) bool

type ClusterInfo

// ClusterInfo is the receiver for the package's topic/channel management and
// statistics-query methods. It has no exported fields; New returns a
// *ClusterInfo given a logger and an *http_api.Client.
type ClusterInfo struct {
	// contains filtered or unexported fields
}

func New

func New(log logger, client *http_api.Client) *ClusterInfo

func (*ClusterInfo) CreateTopic

func (c *ClusterInfo) CreateTopic(topicName string, partitionNum int, replica int, syncDisk int,
	retentionDays string, lookupdHTTPAddrs []string) error

func (*ClusterInfo) CreateTopicChannel

func (c *ClusterInfo) CreateTopicChannel(topicName string, channelName string, lookupdHTTPAddrs []string) error

func (*ClusterInfo) DeleteChannel

func (c *ClusterInfo) DeleteChannel(topicName string, channelName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) DeleteTopic

func (c *ClusterInfo) DeleteTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

DeleteTopic deletes all partitions of the topic on all nsqd nodes.

func (*ClusterInfo) EmptyChannel

func (c *ClusterInfo) EmptyChannel(topicName string, channelName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) EmptyTopic

func (c *ClusterInfo) EmptyTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) GetLookupdProducers

func (c *ClusterInfo) GetLookupdProducers(lookupdHTTPAddrs []string) (Producers, error)

GetLookupdProducers returns Producers of all the nsqd connected to the given lookupds

func (*ClusterInfo) GetLookupdTopicChannels

func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []string) ([]string, error)

GetLookupdTopicChannels returns a []string containing a union of all the channels from all the given lookupd for the given topic

func (*ClusterInfo) GetLookupdTopicProducers

func (c *ClusterInfo) GetLookupdTopicProducers(topic string, lookupdHTTPAddrs []string) (Producers, map[string]Producers, error)

GetLookupdTopicProducers returns Producers of all the nsqd for a given topic by unioning the nodes returned from the given lookupd

func (*ClusterInfo) GetLookupdTopics

func (c *ClusterInfo) GetLookupdTopics(lookupdHTTPAddrs []string) ([]string, error)

GetLookupdTopics returns a []string containing a union of all the topics from all the given nsqlookupd

func (*ClusterInfo) GetNSQDCoordStats

func (c *ClusterInfo) GetNSQDCoordStats(producers Producers, selectedTopic string, part string) (*CoordStats, error)

func (*ClusterInfo) GetNSQDProducers

func (c *ClusterInfo) GetNSQDProducers(nsqdHTTPAddrs []string) (Producers, error)

GetNSQDProducers returns Producers of all the given nsqd

func (*ClusterInfo) GetNSQDStats

func (c *ClusterInfo) GetNSQDStats(producers Producers, selectedTopic string, sortBy string) ([]*TopicStats, map[string]*ChannelStats, error)

GetNSQDStats returns aggregate topic and channel stats from the given Producers

If selectedTopic is empty, this returns stats for *all* topics/channels, and the ChannelStats map is keyed by topic + ':' + channel.

func (*ClusterInfo) GetNSQDTopicProducers

func (c *ClusterInfo) GetNSQDTopicProducers(topic string, nsqdHTTPAddrs []string) (Producers, error)

GetNSQDTopicProducers returns Producers containing the addresses of all the nsqd that produce the given topic

func (*ClusterInfo) GetNSQDTopics

func (c *ClusterInfo) GetNSQDTopics(nsqdHTTPAddrs []string) ([]string, error)

GetNSQDTopics returns a []string containing all the topics produced by the given nsqd

func (*ClusterInfo) GetProducers

func (c *ClusterInfo) GetProducers(lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) (Producers, error)

func (*ClusterInfo) GetTopicProducers

func (c *ClusterInfo) GetTopicProducers(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) (Producers, map[string]Producers, error)

func (*ClusterInfo) GetVersion

func (c *ClusterInfo) GetVersion(addr string) (semver.Version, error)

GetVersion returns a semver.Version object by querying /info

func (*ClusterInfo) ListAllLookupdNodes

func (c *ClusterInfo) ListAllLookupdNodes(lookupdHTTPAddrs []string) (*LookupdNodes, error)

func (*ClusterInfo) PauseChannel

func (c *ClusterInfo) PauseChannel(topicName string, channelName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) PauseTopic

func (c *ClusterInfo) PauseTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) TombstoneNodeForTopic

func (c *ClusterInfo) TombstoneNodeForTopic(topic string, node string, lookupdHTTPAddrs []string) error

TombstoneNodeForTopic tombstones the given node for the given topic on all the given nsqlookupd and deletes the topic from the node

func (*ClusterInfo) UnPauseChannel

func (c *ClusterInfo) UnPauseChannel(topicName string, channelName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) UnPauseTopic

func (c *ClusterInfo) UnPauseTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

type CoordStats

// CoordStats is the result type of (*ClusterInfo).GetNSQDCoordStats. It pairs
// gorpc connection statistics (RpcStats) with a list of TopicCoordStat
// entries.
type CoordStats struct {
	RpcStats        gorpc.ConnStats  `json:"rpc_stats"`
	TopicCoordStats []TopicCoordStat `json:"topic_coord_stats"`
}

type ErrList

// ErrList is a slice of errors. It declares Error() string and
// Errors() []error, which is the method set required by PartialErr.
type ErrList []error

func (ErrList) Error

func (l ErrList) Error() string

func (ErrList) Errors

func (l ErrList) Errors() []error

type ISRStat

// ISRStat is a JSON-tagged record holding a host name and a node ID. It is
// the element type of the ISRStats field in TopicCoordStat and TopicStats.
// NOTE(review): "ISR" presumably stands for in-sync replica — confirm.
type ISRStat struct {
	HostName string `json:"hostname"`
	NodeID   string `json:"node_id"`
}

type LookupPeer

// LookupPeer exposes a single exported field, Info; its remaining fields are
// unexported. Instances are created with NewLookupPeer.
type LookupPeer struct {
	Info peerInfo
	// contains filtered or unexported fields
}

LookupPeer is a low-level type for connecting/reading/writing to nsqlookupd

A LookupPeer instance is designed to connect lazily to nsqlookupd and reconnect gracefully (i.e. it is all handled by the library). Clients can simply use the Command method to perform a round-trip.

func NewLookupPeer

func NewLookupPeer(addr string, maxBodySize int64, l levellogger.Logger, connectCallback func(*LookupPeer)) *LookupPeer

NewLookupPeer creates a new LookupPeer instance connecting to the supplied address.

The supplied connectCallback will be called *every* time the instance connects.

func (*LookupPeer) Close

func (lp *LookupPeer) Close() error

Close implements the io.Closer interface

func (*LookupPeer) Command

func (lp *LookupPeer) Command(cmd *nsq.Command) ([]byte, error)

Command performs a round-trip for the specified Command.

It will lazily connect to nsqlookupd and gracefully handle reconnecting in the event of a failure.

It returns the response from nsqlookupd as []byte

func (*LookupPeer) Connect

func (lp *LookupPeer) Connect() error

Connect will Dial the specified address, with timeouts

func (*LookupPeer) Read

func (lp *LookupPeer) Read(data []byte) (int, error)

Read implements the io.Reader interface, adding deadlines

func (*LookupPeer) String

func (lp *LookupPeer) String() string

String returns the specified address

func (*LookupPeer) Write

func (lp *LookupPeer) Write(data []byte) (int, error)

Write implements the io.Writer interface, adding deadlines

type LookupdNodes

// LookupdNodes is the result type of (*ClusterInfo).ListAllLookupdNodes. It
// holds one leader entry and a slice of all node entries, serialized under
// the "lookupdleader" and "lookupdnodes" JSON keys.
type LookupdNodes struct {
	LeaderNode NsqLookupdNodeInfo   `json:"lookupdleader"`
	AllNodes   []NsqLookupdNodeInfo `json:"lookupdnodes"`
}

type NsqLookupdNodeInfo

// NsqLookupdNodeInfo describes one node by ID, IP and TCP/HTTP/RPC ports, all
// stored as strings. The fields carry no JSON tags, so encoding/json uses the
// Go field names as keys.
type NsqLookupdNodeInfo struct {
	ID       string
	NodeIP   string
	TcpPort  string
	HttpPort string
	RpcPort  string
}

type PartialErr

// PartialErr is an error that additionally exposes a list of underlying
// errors through Errors. ErrList declares both required methods.
type PartialErr interface {
	error
	Errors() []error
}

type Producer

// Producer is the JSON-tagged record for one nsqd node; Producers is a slice
// of *Producer. The type has a custom UnmarshalJSON that post-processes
// Topics and VersionObj, and address helpers (Address, HTTPAddress,
// TCPAddress).
type Producer struct {
	RemoteAddresses  []string       `json:"remote_addresses"`
	RemoteAddress    string         `json:"remote_address"`
	Hostname         string         `json:"hostname"`
	BroadcastAddress string         `json:"broadcast_address"`
	TCPPort          int            `json:"tcp_port"`
	HTTPPort         int            `json:"http_port"`
	Version          string         `json:"version"`
	// VersionObj is tagged `json:"-"` and is filled in by UnmarshalJSON rather
	// than decoded directly.
	VersionObj       semver.Version `json:"-"`
	Topics           ProducerTopics `json:"topics"`
	OutOfDate        bool           `json:"out_of_date"`
}

func (*Producer) Address

func (p *Producer) Address() string

func (*Producer) HTTPAddress

func (p *Producer) HTTPAddress() string

func (*Producer) IsInconsistent

func (p *Producer) IsInconsistent(numLookupd int) bool

IsInconsistent checks for cases where an unexpected number of nsqd connections are reporting the same information to nsqlookupd (ie: multiple instances are using the same broadcast address), or cases where some nsqd are not reporting to all nsqlookupd.

func (*Producer) TCPAddress

func (p *Producer) TCPAddress() string

func (*Producer) UnmarshalJSON

func (p *Producer) UnmarshalJSON(b []byte) error

UnmarshalJSON implements json.Unmarshaler and postprocesses ProducerTopics and VersionObj

type ProducerTopic

// ProducerTopic pairs a topic name with a tombstoned flag. It is the element
// type of ProducerTopics.
type ProducerTopic struct {
	Topic      string `json:"topic"`
	Tombstoned bool   `json:"tombstoned"`
}

type ProducerTopics

// ProducerTopics is a slice of ProducerTopic. It declares Len, Less and Swap,
// which matches sort.Interface. It is the type of Producer.Topics.
type ProducerTopics []ProducerTopic

func (ProducerTopics) Len

func (pt ProducerTopics) Len() int

func (ProducerTopics) Less

func (pt ProducerTopics) Less(i, j int) bool

func (ProducerTopics) Swap

func (pt ProducerTopics) Swap(i, j int)

type Producers

// Producers is a slice of *Producer. It declares Len and Swap (but not Less;
// see ProducersByHost) along with the HTTPAddrs and Search helpers.
type Producers []*Producer

func (Producers) HTTPAddrs

func (t Producers) HTTPAddrs() []string

func (Producers) Len

func (t Producers) Len() int

func (Producers) Search

func (t Producers) Search(needle string) *Producer

func (Producers) Swap

func (t Producers) Swap(i, j int)

type ProducersByHost

// ProducersByHost embeds Producers, which supplies Len and Swap, and adds
// Less; together the method set matches sort.Interface.
// NOTE(review): the ordering is presumably by host name — confirm against
// the Less implementation, which is not visible here.
type ProducersByHost struct {
	Producers
}

func (ProducersByHost) Less

func (c ProducersByHost) Less(i, j int) bool

type TopicCoordStat

// TopicCoordStat is a JSON-tagged record carrying a node, a name, an integer
// partition, and lists of ISRStat and CatchupStat entries. It is the element
// type of CoordStats.TopicCoordStats.
type TopicCoordStat struct {
	Node         string        `json:"node"`
	Name         string        `json:"name"`
	Partition    int           `json:"partition"`
	ISRStats     []ISRStat     `json:"isr_stats"`
	CatchupStats []CatchupStat `json:"catchup_stats"`
}

type TopicStats

// TopicStats is the JSON-tagged statistics record for a topic. It carries
// node/host/topic names, a partition identifier, ISR and catch-up lists,
// depth and message counters, nested per-node entries (NodeStats), the
// topic's channels and per-client publish statistics. The Add method merges
// another *TopicStats into the receiver.
type TopicStats struct {
	Node           string        `json:"node"`
	Hostname       string        `json:"hostname"`
	TopicName      string        `json:"topic_name"`
	TopicPartition string        `json:"topic_partition"`
	IsLeader       bool          `json:"is_leader"`
	SyncingNum     int           `json:"syncing_num"`
	ISRStats       []ISRStat     `json:"isr_stats"`
	CatchupStats   []CatchupStat `json:"catchup_stats"`
	Depth          int64         `json:"depth"`
	MemoryDepth    int64         `json:"memory_depth"`
	// BackendStart is the oldest offset still in the queue. The queue may be
	// cleaned automatically, so the start is the oldest remaining offset.
	BackendStart int64           `json:"backend_start"`
	BackendDepth int64           `json:"backend_depth"`
	MessageCount int64           `json:"message_count"`
	NodeStats    []*TopicStats   `json:"nodes"`
	Channels     []*ChannelStats `json:"channels"`

	Paused        bool             `json:"paused"`
	HourlyPubSize int64            `json:"hourly_pubsize"`
	Clients       []ClientPubStats `json:"client_pub_stats"`

	// E2eProcessingLatency is a pointer and may therefore be nil.
	E2eProcessingLatency *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"`
	// contains filtered or unexported fields
}

func (*TopicStats) Add

func (t *TopicStats) Add(a *TopicStats)

type TopicStatsByChannelDepth

// TopicStatsByChannelDepth embeds TopicStatsList, which supplies Len and
// Swap, and adds Less; together the method set matches sort.Interface.
// NOTE(review): the ordering is presumably by channel depth — confirm against
// the Less implementation, which is not visible here.
type TopicStatsByChannelDepth struct {
	TopicStatsList
}

func (TopicStatsByChannelDepth) Less

func (c TopicStatsByChannelDepth) Less(i, j int) bool

type TopicStatsByPartitionAndHost

// TopicStatsByPartitionAndHost embeds TopicStatsList, which supplies Len and
// Swap, and adds Less; together the method set matches sort.Interface.
// NOTE(review): the ordering is presumably by partition, then host — confirm
// against the Less implementation, which is not visible here.
type TopicStatsByPartitionAndHost struct {
	TopicStatsList
}

func (TopicStatsByPartitionAndHost) Less

func (c TopicStatsByPartitionAndHost) Less(i, j int) bool

type TopicStatsList

// TopicStatsList is a slice of *TopicStats. It declares Len and Swap but not
// Less; TopicStatsByChannelDepth and TopicStatsByPartitionAndHost embed it
// and supply the ordering.
type TopicStatsList []*TopicStats

func (TopicStatsList) Len

func (t TopicStatsList) Len() int

func (TopicStatsList) Swap

func (t TopicStatsList) Swap(i, j int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL