nsqlookup

package
v1.2.8
Published: Dec 3, 2021 License: MIT, MIT Imports: 27 Imported by: 0

Documentation

Constants

const (
	// DefaultConsulAddress is the default address at which a consul agent is
	// expected to be available for consul engines.
	DefaultConsulAddress = "localhost:8500"

	// DefaultConsulNamespace is the key namespace used by default by the consul
	// engine.
	DefaultConsulNamespace = "nsqlookup"
)
const (
	ErrInvalid     = "E_INVALID"
	ErrBadTopic    = "E_BAD_TOPIC"
	ErrBadChannel  = "E_BAD_CHANNEL"
	ErrBadBody     = "E_BAD_BODY"
	ErrBadProtocol = "E_BAD_PROTOCOL"
)
const (
	// DefaultTcpAddress is the default address used for TCP connections.
	DefaultTcpAddress = "localhost:4160"

	// DefaultHttpAddress is the default address used for HTTP requests.
	DefaultHttpAddress = "localhost:4161"

	// DefaultReadTimeout is the maximum duration used by default waiting
	// for commands.
	DefaultReadTimeout = 1 * time.Minute

	// DefaultWriteTimeout is the maximum duration used by default for write
	// operations.
	DefaultWriteTimeout = 1 * time.Second

	// DefaultEngineTimeout is the maximum duration used by default for engine
	// operations.
	DefaultEngineTimeout = 1 * time.Second
)
const (
	// DefaultLocalEngineNodeTimeout is the maximum amount of time an idle node
	// will be kept by default in a local engine.
	DefaultLocalEngineNodeTimeout = 2 * DefaultReadTimeout

	// DefaultLocalEngineTombstoneTimeout is the maximum amount of time a
	// tombstone rule is kept by default in a local engine.
	DefaultLocalEngineTombstoneTimeout = DefaultLocalEngineNodeTimeout
)

Variables

This section is empty.

Functions

func ClientIP added in v1.2.8

func ClientIP(ctx context.Context) net.IP

ClientIP returns the client IP embedded in the context, or nil if none was found.

func WithClientIP added in v1.2.8

func WithClientIP(ctx context.Context, ip net.IP) context.Context

WithClientIP returns a context which carries the given client IP.

Types

type Cache added in v1.2.8

type Cache struct {
	// Base registry to cache services for. This field must not be nil.
	Registry Registry

	// Minimum and maximum TTLs applied to cache entries.
	MinTTL time.Duration
	MaxTTL time.Duration

	// Maximum size of the cache (in bytes). Defaults to 1 MB.
	MaxBytes int64
	// contains filtered or unexported fields
}

Cache provides the implementation of an in-memory caching layer for a service registry.

When used as a resolver, the cache uses a load balancing strategy to return a different address on every call to Resolve.

Cache implements both the Registry and Resolver interfaces, and is safe to use concurrently from multiple goroutines.

Cache values must not be copied after being used.
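
The page does not say which load balancing strategy Resolve uses. As an illustration only, the sketch below assumes plain round-robin over a fixed address list; the `roundRobin` type and its `resolve` method are hypothetical and are not part of nsqlookup.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// roundRobin is a hypothetical balancer over a fixed list of addresses.
// Round-robin is an assumption; the Cache may use a different strategy.
type roundRobin struct {
	addrs []string
	next  uint64 // incremented atomically so resolve is safe for concurrent use
}

// resolve returns the next address in rotation, so consecutive calls return
// different addresses whenever more than one is available.
func (rr *roundRobin) resolve() (string, error) {
	if len(rr.addrs) == 0 {
		return "", errors.New("no addresses available")
	}
	n := atomic.AddUint64(&rr.next, 1) - 1
	return rr.addrs[n%uint64(len(rr.addrs))], nil
}

func main() {
	rr := &roundRobin{addrs: []string{"10.0.0.1:4161", "10.0.0.2:4161"}}
	for i := 0; i < 3; i++ {
		addr, _ := rr.resolve()
		fmt.Println(addr)
	}
}
```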

func (*Cache) Lookup added in v1.2.8

func (c *Cache) Lookup(ctx context.Context, name string, tags ...string) ([]string, time.Duration, error)

Lookup satisfies the Registry interface.

func (*Cache) Resolve added in v1.2.8

func (c *Cache) Resolve(ctx context.Context, name string) (string, error)

Resolve satisfies the Resolver interface.

func (*Cache) Stats added in v1.2.8

func (c *Cache) Stats() CacheStats

Stats takes a snapshot of the current utilization statistics of the cache.

Note that because the cache is safe to use concurrently from multiple goroutines, cache statistics are eventually consistent and a snapshot may not reflect the effect of concurrent utilization of the cache.

type CacheStats added in v1.2.8

type CacheStats struct {
	Bytes     int64 `metric:"services.cache.bytes"     type:"gauge"`
	Size      int64 `metric:"services.cache.size"      type:"gauge"`
	Hits      int64 `metric:"services.cache.hits"      type:"counter"`
	Misses    int64 `metric:"services.cache.misses"    type:"counter"`
	Evictions int64 `metric:"services.cache.evictions" type:"counter"`
}

CacheStats exposes internal statistics on service cache utilization.

type Command

type Command interface {
	// Name returns the name of the command.
	Name() string

	// Write outputs the command to w.
	Write(w *bufio.Writer) error
}

The Command interface is implemented by all types representing client commands sent to nsqlookup servers.
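
To make the shape of the interface concrete, here is a minimal sketch of a type satisfying it. The `command` interface is a local copy of the one above, and the `ping` type and `encode` helper are hypothetical. The wire form `PING` followed by a newline is assumed from the NSQ protocol, not taken from this page.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// command is a local copy of the Command interface shown above.
type command interface {
	Name() string
	Write(w *bufio.Writer) error
}

// ping is a hypothetical command with no arguments.
type ping struct{}

func (ping) Name() string { return "PING" }

// Write outputs the command name followed by a newline (assumed wire form).
func (c ping) Write(w *bufio.Writer) error {
	_, err := w.WriteString(c.Name() + "\n")
	return err
}

// encode serializes cmd into a string. The buffered writer must be flushed
// before the bytes reach the underlying buffer.
func encode(cmd command) (string, error) {
	var buf bytes.Buffer
	w := bufio.NewWriter(&buf)
	if err := cmd.Write(w); err != nil {
		return "", err
	}
	if err := w.Flush(); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	s, err := encode(ping{})
	fmt.Printf("%q %v\n", s, err)
}
```

Because Write takes a *bufio.Writer, callers are responsible for flushing once a command has been written.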

func ReadCommand

func ReadCommand(r *bufio.Reader) (cmd Command, err error)

ReadCommand reads cmd from r, or returns an error if no commands could be read.

type ConsulConfig

type ConsulConfig struct {
	// The address at which the consul agent is exposing its HTTP API.
	Address string

	// The namespace that the engine will be working on within the consul
	// key/value store.
	Namespace string

	// NodeTimeout is the maximum amount of time a node is allowed to be idle
	// before it gets evicted.
	NodeTimeout time.Duration

	// TombstoneTimeout is the amount of time after which a tombstone set on a
	// topic is evicted.
	TombstoneTimeout time.Duration

	// Transport used by the engine's HTTP client, the default transport is used
	// if none is provided.
	Transport http.RoundTripper
}

The ConsulConfig structure is used to configure consul engines.

type ConsulEngine

type ConsulEngine struct {
	// contains filtered or unexported fields
}

A ConsulEngine provides the implementation of a nsqlookup engine backed by a consul infrastructure.

func NewConsulEngine

func NewConsulEngine(config ConsulConfig) *ConsulEngine

NewConsulEngine creates and returns a new engine configured with config.

func (*ConsulEngine) CheckHealth

func (e *ConsulEngine) CheckHealth(ctx context.Context) error

func (*ConsulEngine) Close

func (e *ConsulEngine) Close() (err error)

func (*ConsulEngine) LookupChannels

func (e *ConsulEngine) LookupChannels(ctx context.Context, topic string) (channels []string, err error)

func (*ConsulEngine) LookupInfo

func (e *ConsulEngine) LookupInfo(ctx context.Context) (info EngineInfo, err error)

func (*ConsulEngine) LookupNodes

func (e *ConsulEngine) LookupNodes(ctx context.Context) ([]NodeInfo2, error)

func (*ConsulEngine) LookupProducers

func (e *ConsulEngine) LookupProducers(ctx context.Context, topic string) (producers []NodeInfo, err error)

func (*ConsulEngine) LookupTopics

func (e *ConsulEngine) LookupTopics(ctx context.Context) (topics []string, err error)

func (*ConsulEngine) RegisterNode

func (e *ConsulEngine) RegisterNode(ctx context.Context, node NodeInfo) (n Node, err error)

func (*ConsulEngine) TombstoneTopic

func (e *ConsulEngine) TombstoneTopic(ctx context.Context, node NodeInfo, topic string) (err error)

type ConsulNode

type ConsulNode struct {
	// contains filtered or unexported fields
}

func (*ConsulNode) Info

func (n *ConsulNode) Info() NodeInfo

func (*ConsulNode) Ping

func (n *ConsulNode) Ping(ctx context.Context) error

func (*ConsulNode) RegisterChannel

func (n *ConsulNode) RegisterChannel(ctx context.Context, topic string, channel string) error

func (*ConsulNode) RegisterTopic

func (n *ConsulNode) RegisterTopic(ctx context.Context, topic string) error

func (*ConsulNode) String

func (n *ConsulNode) String() string

func (*ConsulNode) Unregister

func (n *ConsulNode) Unregister(ctx context.Context) error

func (*ConsulNode) UnregisterChannel

func (n *ConsulNode) UnregisterChannel(ctx context.Context, topic string, channel string) error

func (*ConsulNode) UnregisterTopic

func (n *ConsulNode) UnregisterTopic(ctx context.Context, topic string) error

type ConsulRegistry added in v1.2.8

type ConsulRegistry struct {
	Address   string
	TTL       time.Duration
	Transport http.RoundTripper
}

ConsulRegistry implements a service registry which discovers services from a consul catalog.

func (*ConsulRegistry) Lookup added in v1.2.8

func (r *ConsulRegistry) Lookup(ctx context.Context, service string, tags ...string) (addrs []string, ttl time.Duration, err error)

type Engine

type Engine interface {
	// Close should release all internal state maintained by the engine, it is
	// called when the nsqlookup server using the engine is shutting down.
	Close() error

	// RegisterNode is called by nsqlookup servers when a new node is attempting
	// to register.
	RegisterNode(ctx context.Context, node NodeInfo) (Node, error)

	// TombstoneTopic marks topic as tombstoned on node.
	TombstoneTopic(ctx context.Context, node NodeInfo, topic string) error

	// LookupNodes must return a list of all nodes registered on the engine.
	LookupNodes(ctx context.Context) ([]NodeInfo2, error)

	// LookupProducers must return a list of all nodes for which topic has been
	// registered on the engine and were not tombstoned.
	LookupProducers(ctx context.Context, topic string) ([]NodeInfo, error)

	// LookupTopics must return a list of all topics registered on the engine.
	LookupTopics(ctx context.Context) ([]string, error)

	// LookupChannels must return a list of all channels registered for topic on
	// the engine.
	LookupChannels(ctx context.Context, topic string) ([]string, error)

	// LookupInfo must return information about the engine.
	LookupInfo(ctx context.Context) (EngineInfo, error)

	// CheckHealth is called by nsqlookup servers to evaluate the health of the
	// engine.
	CheckHealth(ctx context.Context) error
}

The Engine interface must be implemented by types that are intended to be used to power nsqlookup servers.

Each method of the engine accepts a context as its first argument, which may be used to cancel or set a deadline on the operation. This is useful for engines that work with storage services accessed over the network. The context may be nil.

type EngineInfo

type EngineInfo struct {
	// Type of the engine.
	Type string `json:"type"`

	// Version represents the version of the nsqlookup engine.
	Version string `json:"version"`
}

The EngineInfo structure carries information about a nsqlookup engine.

type Error

type Error struct {
	Code   string
	Reason string
}

func (Error) Error

func (e Error) Error() string

func (Error) Status

func (e Error) Status() string

func (Error) Write

func (e Error) Write(w *bufio.Writer) (err error)

type HTTPHandler

type HTTPHandler struct {
	// Engine must not be nil and has to be set to the engine that will be used
	// by the handler to respond to http requests.
	Engine Engine

	// EngineTimeout should be set to the maximum duration allowed for engine
	// operations.
	EngineTimeout time.Duration

	// List of user agents to enable zone-awareness for.
	ZoneAwareAgents []string
}

The HTTPHandler satisfies the http.Handler interface and provides the implementation of the nsqlookup http API.

func (HTTPHandler) ServeHTTP

func (h HTTPHandler) ServeHTTP(res http.ResponseWriter, req *http.Request)

type Identify

type Identify struct {
	Info NodeInfo
}

func (Identify) Name

func (c Identify) Name() string

func (Identify) Write

func (c Identify) Write(w *bufio.Writer) (err error)

type LocalConfig

type LocalConfig struct {
	// NodeTimeout is the maximum amount of time an idle node will be kept in a
	// local engine.
	NodeTimeout time.Duration

	// TombstoneTimeout is the maximum amount of time a tombstone rule is kept
	// in a local engine.
	TombstoneTimeout time.Duration
}

The LocalConfig structure is used to configure local nsqlookup engines.

type LocalEngine

type LocalEngine struct {
	// contains filtered or unexported fields
}

LocalEngine is a nsqlookup engine that maintains its state in memory.

This is an implementation of the default behavior of nsqlookup servers as provided by the standard implementation, where no state is shared between instances of nsqlookup and the state is discarded when the server goes away.

func NewLocalEngine

func NewLocalEngine(config LocalConfig) *LocalEngine

NewLocalEngine creates and returns an instance of LocalEngine configured with config.

func (*LocalEngine) CheckHealth

func (e *LocalEngine) CheckHealth(ctx context.Context) (err error)

func (*LocalEngine) Close

func (e *LocalEngine) Close() error

func (*LocalEngine) LookupChannels

func (e *LocalEngine) LookupChannels(ctx context.Context, topic string) (channels []string, err error)

func (*LocalEngine) LookupInfo

func (e *LocalEngine) LookupInfo(ctx context.Context) (info EngineInfo, err error)

func (*LocalEngine) LookupNodes

func (e *LocalEngine) LookupNodes(ctx context.Context) (nodes []NodeInfo2, err error)

func (*LocalEngine) LookupProducers

func (e *LocalEngine) LookupProducers(ctx context.Context, topic string) (producers []NodeInfo, err error)

func (*LocalEngine) LookupTopics

func (e *LocalEngine) LookupTopics(ctx context.Context) (topics []string, err error)

func (*LocalEngine) RegisterNode

func (e *LocalEngine) RegisterNode(ctx context.Context, node NodeInfo) (Node, error)

func (*LocalEngine) TombstoneTopic

func (e *LocalEngine) TombstoneTopic(ctx context.Context, node NodeInfo, topic string) error

type LocalNode

type LocalNode struct {
	// contains filtered or unexported fields
}

func (*LocalNode) Info

func (n *LocalNode) Info() NodeInfo

func (*LocalNode) Ping

func (n *LocalNode) Ping(ctx context.Context) error

func (*LocalNode) RegisterChannel

func (n *LocalNode) RegisterChannel(ctx context.Context, topic string, channel string) error

func (*LocalNode) RegisterTopic

func (n *LocalNode) RegisterTopic(ctx context.Context, topic string) error

func (*LocalNode) String

func (n *LocalNode) String() string

func (*LocalNode) Unregister

func (n *LocalNode) Unregister(ctx context.Context) error

func (*LocalNode) UnregisterChannel

func (n *LocalNode) UnregisterChannel(ctx context.Context, topic string, channel string) error

func (*LocalNode) UnregisterTopic

func (n *LocalNode) UnregisterTopic(ctx context.Context, topic string) error

type LocalRegistry added in v1.2.8

type LocalRegistry map[string][]string

LocalRegistry is an implementation of an immutable set of services. This type is mostly useful for testing purposes.

func (LocalRegistry) Lookup added in v1.2.8

func (r LocalRegistry) Lookup(ctx context.Context, service string, tags ...string) (addrs []string, ttl time.Duration, err error)

type Node

type Node interface {
	// Info should return the info given to RegisterNode when the node was
	// created.
	Info() NodeInfo

	// Ping is called by nsqlookup servers when a registered node sends a
	// ping command to inform that it is still alive.
	Ping(ctx context.Context) error

	// Unregister is called by nsqlookup servers when a node that had
	// previously registered is going away.
	Unregister(ctx context.Context) error

	// RegisterTopic is called by nsqlookup servers when topic is being
	// registered on node.
	RegisterTopic(ctx context.Context, topic string) error

	// UnregisterTopic is called by nsqlookup servers when topic is being
	// unregistered from node.
	UnregisterTopic(ctx context.Context, topic string) error

	// RegisterChannel is called by nsqlookup servers when channel from topic is
	// being registered on node.
	RegisterChannel(ctx context.Context, topic string, channel string) error

	// UnregisterChannel is called by nsqlookup servers when channel from topic
	// is being unregistered from node.
	UnregisterChannel(ctx context.Context, topic string, channel string) error
}

The Node interface is used to represent a single node registered within a nsqlookup engine.

type NodeInfo

type NodeInfo struct {
	// RemoteAddress is the address that the node connected from.
	RemoteAddress string `json:"remote_address"`

	// Hostname of the nsqd node.
	Hostname string `json:"hostname"`

	// BroadcastAddress is the address advertised by the nsqd node.
	BroadcastAddress string `json:"broadcast_address"`

	// TcpPort is the port on which the nsqd node is listening for incoming TCP
	// connections.
	TcpPort int `json:"tcp_port"`

	// HttpPort is the port on which the nsqd node accepts HTTP requests.
	HttpPort int `json:"http_port"`

	// Version represents the version of nsqd run by the node.
	Version string `json:"version"`
}

The NodeInfo structure carries information about a node referenced by a nsqlookup server.

func (NodeInfo) String

func (info NodeInfo) String() string

String returns a human-readable representation of the node info.

type NodeInfo2

type NodeInfo2 struct {
	// RemoteAddress is the address that the node connected from.
	RemoteAddress string `json:"remote_address"`

	// Hostname of the nsqd node.
	Hostname string `json:"hostname"`

	// BroadcastAddress is the address advertised by the nsqd node.
	BroadcastAddress string `json:"broadcast_address"`

	// TcpPort is the port on which the nsqd node is listening for incoming TCP
	// connections.
	TcpPort int `json:"tcp_port"`

	// HttpPort is the port on which the nsqd node accepts HTTP requests.
	HttpPort int `json:"http_port"`

	// Version represents the version of nsqd run by the node.
	Version string `json:"version"`

	// Tombstones has items set to true if the topic at the matching index has
	// been tombstoned.
	Tombstones []bool `json:"tombstones"`

	// Topics is the list of topics hosted by the node.
	Topics []string `json:"topics"`
}

The NodeInfo2 structure carries information about a node referenced by a nsqlookup server.

The type is very similar to NodeInfo, but adds a list of tombstones for a node, and a list of topics. The tombstones list carries booleans that tell whether the topic at the matching index has been tombstoned on the node.
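
Since Topics and Tombstones are parallel lists matched by index, reading them usually means walking both together. The sketch below uses a local struct with just those two fields; `nodeTopics` and `liveTopics` are hypothetical names, and treating a missing tombstone flag as "not tombstoned" is an assumption.

```go
package main

import "fmt"

// nodeTopics is a local subset of NodeInfo2 holding only the two parallel lists.
type nodeTopics struct {
	Topics     []string
	Tombstones []bool
}

// liveTopics returns the topics whose tombstone flag is not set.
func liveTopics(n nodeTopics) []string {
	var live []string
	for i, topic := range n.Topics {
		// Assumption: if Tombstones is shorter than Topics, a missing flag
		// is treated as "not tombstoned".
		if i < len(n.Tombstones) && n.Tombstones[i] {
			continue
		}
		live = append(live, topic)
	}
	return live
}

func main() {
	n := nodeTopics{
		Topics:     []string{"events", "logs", "metrics"},
		Tombstones: []bool{false, true, false},
	}
	fmt.Println(liveTopics(n))
}
```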

func (NodeInfo2) String

func (info NodeInfo2) String() string

String returns a human-readable representation of the node info.

type OK

type OK struct {
}

func (OK) Status

func (OK) Status() string

func (OK) Write

func (OK) Write(w *bufio.Writer) error

type Ping

type Ping struct {
}

func (Ping) Name

func (c Ping) Name() string

func (Ping) Write

func (c Ping) Write(w *bufio.Writer) (err error)

type ProxyEngine

type ProxyEngine struct {
	Transport http.RoundTripper
	Topology  Topology
	Registry  Registry
	// Name of the nsqlookupd server, defaults to "nsqlookupd".
	Nsqlookupd string
	// List of topics for which the proxy applies zone restrictions of consumers
	// and producers.
	//
	// The value may be a magic ["*"] to indicate that the proxy should apply
	// zone awareness to all topics.
	ZoneAwareTopics []string
}

A ProxyEngine implements the Engine interface and is intended to be used as a frontend to a set of standard nsqlookupd servers to expose them as if they were a single entity.
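
The ZoneAwareTopics field accepts either an explicit topic list or the single-element wildcard ["*"]. How the proxy matches topics internally is not documented here; the sketch below assumes exact string matching plus the wildcard, and `isZoneAware` is a hypothetical helper rather than a package function.

```go
package main

import "fmt"

// isZoneAware reports whether topic is covered by zoneAwareTopics.
// Assumption: topics are compared by exact string equality, and a list made
// of the single element "*" covers every topic.
func isZoneAware(zoneAwareTopics []string, topic string) bool {
	if len(zoneAwareTopics) == 1 && zoneAwareTopics[0] == "*" {
		return true
	}
	for _, t := range zoneAwareTopics {
		if t == topic {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(isZoneAware([]string{"*"}, "events"))
	fmt.Println(isZoneAware([]string{"logs"}, "events"))
}
```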

func (*ProxyEngine) CheckHealth

func (p *ProxyEngine) CheckHealth(ctx context.Context) (err error)

func (*ProxyEngine) Close

func (p *ProxyEngine) Close() error

func (*ProxyEngine) LookupChannels

func (p *ProxyEngine) LookupChannels(ctx context.Context, topic string) (channels []string, err error)

func (*ProxyEngine) LookupInfo

func (p *ProxyEngine) LookupInfo(ctx context.Context) (info EngineInfo, err error)

func (*ProxyEngine) LookupNodes

func (p *ProxyEngine) LookupNodes(ctx context.Context) (nodes []NodeInfo2, err error)

func (*ProxyEngine) LookupProducers

func (p *ProxyEngine) LookupProducers(ctx context.Context, topic string) (nodes []NodeInfo, err error)

func (*ProxyEngine) LookupTopics

func (p *ProxyEngine) LookupTopics(ctx context.Context) (topics []string, err error)

func (*ProxyEngine) RegisterNode

func (p *ProxyEngine) RegisterNode(ctx context.Context, node NodeInfo) (Node, error)

func (*ProxyEngine) TombstoneTopic

func (p *ProxyEngine) TombstoneTopic(ctx context.Context, node NodeInfo, topic string) (err error)

type ProxyNode

type ProxyNode struct {
	// contains filtered or unexported fields
}

type RawResponse

type RawResponse []byte

RawResponse is a pre-serialized byte buffer that implements the Response interface.

func (RawResponse) Status

func (RawResponse) Status() string

Status returns the status of the response.

func (RawResponse) Write

func (r RawResponse) Write(w *bufio.Writer) error

Write outputs the response to w.

type Register

type Register struct {
	Topic   string
	Channel string
}

func (Register) Name

func (c Register) Name() string

func (Register) Write

func (c Register) Write(w *bufio.Writer) (err error)

type Registry added in v1.2.8

type Registry interface {
	// Lookup returns a set of addresses at which services with the given name
	// can be reached.
	//
	// An arbitrary list of tags can be passed to the method to narrow down the
	// result set to services matching this set of tags. No tags means to do no
	// filtering.
	//
	// The method also returns a TTL representing how long the result is valid
	// for. A zero TTL means that the caller should not reuse the result.
	//
	// The returned list of addresses must not be retained by implementations of
	// the Registry interface. The caller becomes the owner of the value after
	// the method returned.
	//
	// A non-nil error is returned when the lookup cannot be completed.
	//
	// The context can be used to asynchronously cancel the query when it
	// involves blocking operations.
	Lookup(ctx context.Context, name string, tags ...string) (addrs []string, ttl time.Duration, err error)
}

type Response

type Response interface {
	// Status returns the status of the response.
	Status() string

	// Write outputs the response to w.
	Write(w *bufio.Writer) error
}

The Response interface is implemented by all types representing nsqlookup server responses.

func ReadResponse

func ReadResponse(r *bufio.Reader) (res Response, err error)

ReadResponse reads res from r, or returns an error if no responses could be read.

type Subnet added in v1.2.8

type Subnet struct {
	CIDR *net.IPNet
	Zone string
}

Subnet represents a network subnet, which is made of a CIDR for the range of IP addresses it contains, and a logical zone name.

type SubnetTopology added in v1.2.8

type SubnetTopology []Subnet

SubnetTopology is an implementation of the Topology interface working on a static list of subnets.

func (SubnetTopology) LookupIPZone added in v1.2.8

func (topology SubnetTopology) LookupIPZone(ctx context.Context, ip net.IP) (zone string, err error)

type TCPHandler

type TCPHandler struct {
	// Engine must not be nil and has to be set to the engine that will be used
	// by the handler to register the connections it serves.
	Engine Engine

	// The Info field should be set to provide information to the connections
	// about the discovery endpoint they're connected to.
	Info NodeInfo

	// ReadTimeout is the maximum amount of time the handler will allow its
	// connections to be idle before closing them.
	ReadTimeout time.Duration

	// WriteTimeout is the maximum amount of time the handler will take to send
	// responses to its connections.
	WriteTimeout time.Duration

	// EngineTimeout is the maximum amount of time the handler gives to
	// operations done on the engine.
	EngineTimeout time.Duration
}

The TCPHandler type provides the implementation of a connection handler that speaks the nsqlookupd discovery protocol and provides an interface to a nsqlookup engine.

func (TCPHandler) ServeConn

func (h TCPHandler) ServeConn(ctx context.Context, conn net.Conn)

ServeConn takes ownership of the conn object and starts servicing the commands that the client sends to the discovery handler.

type Topology added in v1.2.8

type Topology interface {
	LookupIPZone(ctx context.Context, ip net.IP) (zone string, err error)
}

Topology is an interface abstracting the discovery of network topology.

type Unregister

type Unregister struct {
	Topic   string
	Channel string
}

func (Unregister) Name

func (c Unregister) Name() string

func (Unregister) Write

func (c Unregister) Write(w *bufio.Writer) (err error)
