Documentation ¶
Index ¶
- Constants
- func ClientIP(ctx context.Context) net.IP
- func WithClientIP(ctx context.Context, ip net.IP) context.Context
- type Cache
- type CacheStats
- type Command
- type ConsulConfig
- type ConsulEngine
- func (e *ConsulEngine) CheckHealth(ctx context.Context) error
- func (e *ConsulEngine) Close() (err error)
- func (e *ConsulEngine) LookupChannels(ctx context.Context, topic string) (channels []string, err error)
- func (e *ConsulEngine) LookupInfo(ctx context.Context) (info EngineInfo, err error)
- func (e *ConsulEngine) LookupNodes(ctx context.Context) ([]NodeInfo2, error)
- func (e *ConsulEngine) LookupProducers(ctx context.Context, topic string) (producers []NodeInfo, err error)
- func (e *ConsulEngine) LookupTopics(ctx context.Context) (topics []string, err error)
- func (e *ConsulEngine) RegisterNode(ctx context.Context, node NodeInfo) (n Node, err error)
- func (e *ConsulEngine) TombstoneTopic(ctx context.Context, node NodeInfo, topic string) (err error)
- type ConsulNode
- func (n *ConsulNode) Info() NodeInfo
- func (n *ConsulNode) Ping(ctx context.Context) error
- func (n *ConsulNode) RegisterChannel(ctx context.Context, topic string, channel string) error
- func (n *ConsulNode) RegisterTopic(ctx context.Context, topic string) error
- func (n *ConsulNode) String() string
- func (n *ConsulNode) Unregister(ctx context.Context) error
- func (n *ConsulNode) UnregisterChannel(ctx context.Context, topic string, channel string) error
- func (n *ConsulNode) UnregisterTopic(ctx context.Context, topic string) error
- type ConsulRegistry
- type Engine
- type EngineInfo
- type Error
- type HTTPHandler
- type Identify
- type LocalConfig
- type LocalEngine
- func (e *LocalEngine) CheckHealth(ctx context.Context) (err error)
- func (e *LocalEngine) Close() error
- func (e *LocalEngine) LookupChannels(ctx context.Context, topic string) (channels []string, err error)
- func (e *LocalEngine) LookupInfo(ctx context.Context) (info EngineInfo, err error)
- func (e *LocalEngine) LookupNodes(ctx context.Context) (nodes []NodeInfo2, err error)
- func (e *LocalEngine) LookupProducers(ctx context.Context, topic string) (producers []NodeInfo, err error)
- func (e *LocalEngine) LookupTopics(ctx context.Context) (topics []string, err error)
- func (e *LocalEngine) RegisterNode(ctx context.Context, node NodeInfo) (Node, error)
- func (e *LocalEngine) TombstoneTopic(ctx context.Context, node NodeInfo, topic string) error
- type LocalNode
- func (n *LocalNode) Info() NodeInfo
- func (n *LocalNode) Ping(ctx context.Context) error
- func (n *LocalNode) RegisterChannel(ctx context.Context, topic string, channel string) error
- func (n *LocalNode) RegisterTopic(ctx context.Context, topic string) error
- func (n *LocalNode) String() string
- func (n *LocalNode) Unregister(ctx context.Context) error
- func (n *LocalNode) UnregisterChannel(ctx context.Context, topic string, channel string) error
- func (n *LocalNode) UnregisterTopic(ctx context.Context, topic string) error
- type LocalRegistry
- type Node
- type NodeInfo
- type NodeInfo2
- type OK
- type Ping
- type ProxyEngine
- func (p *ProxyEngine) CheckHealth(ctx context.Context) (err error)
- func (p *ProxyEngine) Close() error
- func (p *ProxyEngine) LookupChannels(ctx context.Context, topic string) (channels []string, err error)
- func (p *ProxyEngine) LookupInfo(ctx context.Context) (info EngineInfo, err error)
- func (p *ProxyEngine) LookupNodes(ctx context.Context) (nodes []NodeInfo2, err error)
- func (p *ProxyEngine) LookupProducers(ctx context.Context, topic string) (nodes []NodeInfo, err error)
- func (p *ProxyEngine) LookupTopics(ctx context.Context) (topics []string, err error)
- func (p *ProxyEngine) RegisterNode(ctx context.Context, node NodeInfo) (Node, error)
- func (p *ProxyEngine) TombstoneTopic(ctx context.Context, node NodeInfo, topic string) (err error)
- type ProxyNode
- type RawResponse
- type Register
- type Registry
- type Response
- type Subnet
- type SubnetTopology
- type TCPHandler
- type Topology
- type Unregister
Constants ¶
const ( // DefaultConsulAddress is the default address at which a consul agent is // expected to be available for consul engines. DefaultConsulAddress = "localhost:8500" // DefaultConsulNamespace is the key namespace used by default by the consul // engine. DefaultConsulNamespace = "nsqlookup" )
const ( ErrInvalid = "E_INVALID" ErrBadTopic = "E_BAD_TOPIC" ErrBadChannel = "E_BAD_CHANNEL" ErrBadBody = "E_BAD_BODY" ErrBadProtocol = "E_BAD_PROTOCOL" )
const ( // DefaultTcpAddress is the default address used for TCP connections. DefaultTcpAddress = "localhost:4160" // DefaultHttpAddress is the default address used for HTTP requests. DefaultHttpAddress = "localhost:4161" // DefaultReadTimeout is the maximum duration used by default waiting // for commands. DefaultReadTimeout = 1 * time.Minute // DefaultReadTimeout is the maximum duration used by default for write // operations. DefaultWriteTimeout = 1 * time.Second // DefaultEngineTimeout is the maximum duration used by default for engine // operations. DefaultEngineTimeout = 1 * time.Second )
const ( // DefaultLocalEngineNodeTimeout is the maximum amount of time an idle node // will be kept by default in a local engine. DefaultLocalEngineNodeTimeout = 2 * DefaultReadTimeout // DefaultLocalEngineTombstoneTimeout is the maximum amount of time a // tombstone rule is kept by default in a local engine. DefaultLocalEngineTombstoneTimeout = DefaultLocalEngineNodeTimeout )
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Cache ¶ added in v1.3.1
type Cache struct { // Base registry to cache services for. This field must not be nil. Registry Registry // Minimum and maximum TTLs applied to cache entries. MinTTL time.Duration MaxTTL time.Duration // Maximum size of the cache (in bytes). Defaults to 1 MB. MaxBytes int64 // contains filtered or unexported fields }
Cache provides the implementation of an in-memory caching layer for a service registry.
When used as a resolver, the cache uses a load balancing strategy to return a different address on every call to Resolve.
Cache implements both the Registry and Resolver interfaces, which means they are safe to use concurrently from multiple goroutines.
Cache values must not be copied after being used.
func (*Cache) Lookup ¶ added in v1.3.1
func (c *Cache) Lookup(ctx context.Context, name string, tags ...string) ([]string, time.Duration, error)
Lookup satisfies the Registry interface.
func (*Cache) Stats ¶ added in v1.3.1
func (c *Cache) Stats() CacheStats
Stats takes a snapshot of the current utilization statistics of the cache.
Note that because cache is safe to use concurrently from multiple goroutines, cache statistics are eventually consistent and a snapshot may not reflect the effect of concurrent utilization of the cache.
type CacheStats ¶ added in v1.3.1
type CacheStats struct { Bytes int64 `metric:"services.cache.bytes" type:"gauge"` Size int64 `metric:"services.cache.size" type:"gauge"` Hits int64 `metric:"services.cache.hits" type:"counter"` Misses int64 `metric:"services.cache.misses" type:"counter"` Evictions int64 `metric:"services.cache.evictions" type:"counter"` }
CacheStats exposes internal statistics on service cache utilization.
type Command ¶
type Command interface { // Name returns the name of the command. Name() string // Write outputs the command to w. Write(w *bufio.Writer) error }
The Command interface is implemented by all types representing client commands sent to nsqlookup servers.
type ConsulConfig ¶
type ConsulConfig struct { // The address at which the consul agent is exposing its HTTP API. Address string // The namespace that the engine will be working on within the consul // key/value store. Namespace string // NodeTImeout is the maximum amount of time a node is allowed to be idle // before it gets evicted. NodeTimeout time.Duration // TomstoneTimeout is the amount of time after which a tombstone set on a // topic is evisted. TombstoneTimeout time.Duration // Transport used by the engine's HTTP client, the default transport is used // if none is provided. Transport http.RoundTripper }
The ConsulConfig structure is used to configure consul engines.
type ConsulEngine ¶
type ConsulEngine struct {
// contains filtered or unexported fields
}
ConsulEngine are objects that provide the implementation of a nsqlookup engine backed by a consul infrastructure.
func NewConsulEngine ¶
func NewConsulEngine(config ConsulConfig) *ConsulEngine
NewConsulEngine creates and return a new engine configured with config.
func (*ConsulEngine) CheckHealth ¶
func (e *ConsulEngine) CheckHealth(ctx context.Context) error
func (*ConsulEngine) Close ¶
func (e *ConsulEngine) Close() (err error)
func (*ConsulEngine) LookupChannels ¶
func (*ConsulEngine) LookupInfo ¶
func (e *ConsulEngine) LookupInfo(ctx context.Context) (info EngineInfo, err error)
func (*ConsulEngine) LookupNodes ¶
func (e *ConsulEngine) LookupNodes(ctx context.Context) ([]NodeInfo2, error)
func (*ConsulEngine) LookupProducers ¶
func (*ConsulEngine) LookupTopics ¶
func (e *ConsulEngine) LookupTopics(ctx context.Context) (topics []string, err error)
func (*ConsulEngine) RegisterNode ¶
func (*ConsulEngine) TombstoneTopic ¶
type ConsulNode ¶
type ConsulNode struct {
// contains filtered or unexported fields
}
func (*ConsulNode) Info ¶
func (n *ConsulNode) Info() NodeInfo
func (*ConsulNode) RegisterChannel ¶
func (*ConsulNode) RegisterTopic ¶
func (n *ConsulNode) RegisterTopic(ctx context.Context, topic string) error
func (*ConsulNode) String ¶
func (n *ConsulNode) String() string
func (*ConsulNode) Unregister ¶
func (n *ConsulNode) Unregister(ctx context.Context) error
func (*ConsulNode) UnregisterChannel ¶
func (*ConsulNode) UnregisterTopic ¶
func (n *ConsulNode) UnregisterTopic(ctx context.Context, topic string) error
type ConsulRegistry ¶ added in v1.3.0
type ConsulRegistry struct { Address string TTL time.Duration Transport http.RoundTripper }
ConsulRegistry implements a service registry which discovers services from a consul catalog.
type Engine ¶
type Engine interface { // Close should release all internal state maintained by the engine, it is // called when the nsqlookup server using the engine is shutting down. Close() error // RegisterNode is called by nsqlookup servers when a new node is attempting // to register. RegisterNode(ctx context.Context, node NodeInfo) (Node, error) // TombstoneTopic marks topic as tombstoned on node. TombstoneTopic(ctx context.Context, node NodeInfo, topic string) error // LookupNodes must return a list of of all nodes registered on the engine. LookupNodes(ctx context.Context) ([]NodeInfo2, error) // LookupProducers must return a list of all nodes for which topic has been // registered on the engine and were not tombstoned. LookupProducers(ctx context.Context, topic string) ([]NodeInfo, error) // LookupTopics must return a list of all topics registered on the engine. LookupTopics(ctx context.Context) ([]string, error) // LookupChannels must return a list of all channels registered for topic on // the engine. LookupChannels(ctx context.Context, topic string) ([]string, error) // LookupInfo must return information about the engine. LookupInfo(ctx context.Context) (EngineInfo, error) // CheckHealth is called by nsqlookup servers to evaluate the health of the // engine. CheckHealth(ctx context.Context) error }
The Engine interface must be implemented by types that are intended to be used to power nsqlookup servers.
Each method of the engine accepts a context as first argument which may be used to cancel or set a deadline on the operation. This is useful for engines that work we storage services accessed over the network. The context may be nil.
type EngineInfo ¶
type EngineInfo struct { // Type of the engine. Type string `json:"type"` // Version represents the version of the nsqlookup engine. Version string `json:"version"` }
The EngineInfo structure carries information about a nsqlookup engine.
type HTTPHandler ¶
type HTTPHandler struct { // Engine must not be nil and has to be set to the engine that will be used // by the handler to respond to http requests. Engine Engine // EngineTimeout should be set to the maximum duration allowed for engine // operations. EngineTimeout time.Duration // List of user agents to enable zone-awareness for. ZoneAwareAgents []string }
The HTTPHandler satisfies the http.Handler interface and provides the implementation of the nsqlookup http API.
func (HTTPHandler) ServeHTTP ¶
func (h HTTPHandler) ServeHTTP(res http.ResponseWriter, req *http.Request)
type LocalConfig ¶
type LocalConfig struct { // NodeTimeout is the maximum amount of time an idle node will be kept in a // local engine. NodeTimeout time.Duration // TombstoneTimeout is the maximum amount of time a tombstone rule is kept // in a local engine. TombstoneTimeout time.Duration }
The LocalConfig structure is used to configure local nsqlookup engines.
type LocalEngine ¶
type LocalEngine struct {
// contains filtered or unexported fields
}
LocalEngine is a nsqlookup engine that maintain its state in memory.
This is an implementation of the default behavior of nsqlookup servers as provided by the standard implementation, where no state is shared between instances of nsqlookup and the state is disarded when the server goes away.
func NewLocalEngine ¶
func NewLocalEngine(config LocalConfig) *LocalEngine
NewLocalEngine creates and returns an instance of LocalEngine configured with config.
func (*LocalEngine) CheckHealth ¶
func (e *LocalEngine) CheckHealth(ctx context.Context) (err error)
func (*LocalEngine) Close ¶
func (e *LocalEngine) Close() error
func (*LocalEngine) LookupChannels ¶
func (*LocalEngine) LookupInfo ¶
func (e *LocalEngine) LookupInfo(ctx context.Context) (info EngineInfo, err error)
func (*LocalEngine) LookupNodes ¶
func (e *LocalEngine) LookupNodes(ctx context.Context) (nodes []NodeInfo2, err error)
func (*LocalEngine) LookupProducers ¶
func (*LocalEngine) LookupTopics ¶
func (e *LocalEngine) LookupTopics(ctx context.Context) (topics []string, err error)
func (*LocalEngine) RegisterNode ¶
func (*LocalEngine) TombstoneTopic ¶
type LocalNode ¶
type LocalNode struct {
// contains filtered or unexported fields
}
func (*LocalNode) RegisterChannel ¶
func (*LocalNode) RegisterTopic ¶
func (*LocalNode) UnregisterChannel ¶
type LocalRegistry ¶ added in v1.3.0
LocalRegistry is an implementation of a immutable set of services. This type is mostly useful for testing purposes.
type Node ¶
type Node interface { // Info should return the info given to RegisterNode when the node was // created. Info() NodeInfo // Ping is called by nsqlookup servers when a registered node sends a // ping command to inform that it is still alive. Ping(ctx context.Context) error // Unregister is called by nsqlookup servers when a node that had // previously registered is going away. Unregister(ctx context.Context) error // RegisterTopic is called by nsqlookup servers when topic is being // registered on node. RegisterTopic(ctx context.Context, topic string) error // UnregisterTopic is called by nsqlookup servers when topic is being // unregistered from node. UnregisterTopic(ctx context.Context, topic string) error // RegisterChannel is called by nsqlookup servers when channel from topic is // being registered on node. RegisterChannel(ctx context.Context, topic string, channel string) error // UnregisterChannel is called by nsqlookup servers when channel from topic // is being unregistered from node. UnregisterChannel(ctx context.Context, topic string, channel string) error }
The Node interface is used to represent a single node registered within a nsqlookup engine.
type NodeInfo ¶
type NodeInfo struct { // RemoteAddress is the address that the node connected from. RemoteAddress string `json:"remote_address"` // Hostname of the nsqd node. Hostname string `json:"hostname"` // BroadcastAddress is the address advertized by the nsqd node. BroadcastAddress string `json:"broadcast_address"` // TcpPort is the port on which the nsqd node is listening for incoming TCP // connections. TcpPort int `json:"tcp_port"` // HttpPort is the port on which the nsqd node accepts HTTP requests. HttpPort int `json:"http_port"` // Version represents the version of nsqd ran by the node. Version string `json:"version"` }
The NodeInfo structure carries information about a node referenced by a nsqlookup server.
type NodeInfo2 ¶
type NodeInfo2 struct { // RemoteAddress is the address that the node connected from. RemoteAddress string `json:"remote_address"` // Hostname of the nsqd node. Hostname string `json:"hostname"` // BroadcastAddress is the address advertized by the nsqd node. BroadcastAddress string `json:"broadcast_address"` // TcpPort is the port on which the nsqd node is listening for incoming TCP // connections. TcpPort int `json:"tcp_port"` // HttpPort is the port on which the nsqd node accepts HTTP requests. HttpPort int `json:"http_port"` // Version represents the version of nsqd ran by the node. Version string `json:"version"` // Tombstones has items set to true if the topic at the matching index has // been tomstoned. Tombstones []bool `json:"tombstones"` // Topics is the list of topic hosted by the node. Topics []string `json:"topics"` }
The NodeInfo2 structure carries information about a node referenced by a nsqlookup server.
The type is very similar to NodeInfo, but adds a list of tombstones for a node, and a list of topics. The tombstones list carries booleans that tell whether the topic at the matching index has been tombstoned on the node.
type ProxyEngine ¶
type ProxyEngine struct { Transport http.RoundTripper Topology Topology Registry Registry // Name of the nsqlookupd server, defaults to "nsqlookupd". Nsqlookupd string // List of topics for which the proxy applies zone restrictions of consumers // and producers. // // The value may be a magic ["*"] to indicate that the proxy should apply // zone awareness to all topics. ZoneAwareTopics []string }
A ProxyEngine implements the Engine interface an is intended to be used as a frontend to a set of standard nsqlookupd servers to expose them as if they were a single entity.
func (*ProxyEngine) CheckHealth ¶
func (p *ProxyEngine) CheckHealth(ctx context.Context) (err error)
func (*ProxyEngine) Close ¶
func (p *ProxyEngine) Close() error
func (*ProxyEngine) LookupChannels ¶
func (*ProxyEngine) LookupInfo ¶
func (p *ProxyEngine) LookupInfo(ctx context.Context) (info EngineInfo, err error)
func (*ProxyEngine) LookupNodes ¶
func (p *ProxyEngine) LookupNodes(ctx context.Context) (nodes []NodeInfo2, err error)
func (*ProxyEngine) LookupProducers ¶
func (*ProxyEngine) LookupTopics ¶
func (p *ProxyEngine) LookupTopics(ctx context.Context) (topics []string, err error)
func (*ProxyEngine) RegisterNode ¶
func (*ProxyEngine) TombstoneTopic ¶
type RawResponse ¶
type RawResponse []byte
RawResponse is a pre-serialized byte buffer that implements the Response interface.
func (RawResponse) Status ¶
func (RawResponse) Status() string
Status returns the status of the response.
type Registry ¶ added in v1.3.0
type Registry interface { // Lookup returns a set of addresses at which services with the given name // can be reached. // // An arbitrary list of tags can be passed to the method to narrow down the // result set to services matching this set of tags. No tags means to do no // filtering. // // The method also returns a TTL representing how long the result is valid // for. A zero TTL means that the caller should not reuse the result. // // The returned list of addresses must not be retained by implementations of // the Registry interface. The caller becomes the owner of the value after // the method returned. // // A non-nil error is returned when the lookup cannot be completed. // // The context can be used to asynchronously cancel the query when it // involves blocking operations. Lookup(ctx context.Context, name string, tags ...string) (addrs []string, ttl time.Duration, err error) }
type Response ¶
type Response interface { // Status returns the status of the response. Status() string // Write outputs the response to w. Write(w *bufio.Writer) error }
The Response interface is implemented by all types representing nsqlookup server responses.
type Subnet ¶ added in v1.3.0
Subnet represents a network subnet, which is made of a CIDR for the range of IP addresses it contains, and a logical zone name.
type SubnetTopology ¶ added in v1.3.0
type SubnetTopology []Subnet
SubnetTopology is an implementation of the Topology interface working on a static list of subnets.
func (SubnetTopology) LookupIPZone ¶ added in v1.3.0
type TCPHandler ¶
type TCPHandler struct { // Engine must not be nil and has to be set to the engine that will be used // by the handler to register the connections it serves. Engine Engine // The Info field should be set to provide information to the connections // about the discovery endpoint they're connected to. Info NodeInfo // ReadTimeout is the maximum amount of time the handler will allow its // connections to be idle before closing them. ReadTimeout time.Duration // WriteTimeout is the maximum amount of time the handler will take to send // responses to its connections. WriteTimeout time.Duration // EngineTimeout is the maximum amount of time the handler gives to // operations done on the engine. EngineTimeout time.Duration }
The DiscoverHandler type provides the implementation of a connection handler that speaks the nsqlookupd discovery protocol and provides an interface to a nsqlookup engine.
type Topology ¶ added in v1.3.0
Toppology is an interface abstracting the discovery of network topology.
type Unregister ¶
func (Unregister) Name ¶
func (c Unregister) Name() string