Documentation ¶
Overview ¶
Package sublist is a routing mechanism to handle subject distribution and provides a facility to match subjects from published messages to interested subscribers. Subscribers can have wildcard subjects to match multiple published subjects.
Index ¶
- Constants
- Variables
- func Debugf(format string, v ...interface{})
- func Errorf(format string, v ...interface{})
- func Fatalf(format string, v ...interface{})
- func GenTLSConfig(tc *TLSConfigOpts) (*tls.Config, error)
- func IsValidLiteralSubject(subject string) bool
- func IsValidSubject(subject string) bool
- func Noticef(format string, v ...interface{})
- func PrintAndDie(msg string)
- func PrintServerAndExit()
- func PrintTLSHelpAndDie()
- func RemoveSelfReference(clusterPort int, routes []*url.URL) ([]*url.URL, error)
- func ResponseHandler(w http.ResponseWriter, r *http.Request, data []byte)
- func RoutesFromStr(routesStr string) []*url.URL
- func Tracef(format string, v ...interface{})
- type Auth
- type ClientAuth
- type ConnInfo
- type Connz
- type Info
- type Logger
- type Options
- type Pair
- type Pairs
- type Permissions
- type RouteInfo
- type RouteType
- type Routez
- type Server
- func (s *Server) AcceptLoop(clr chan struct{})
- func (s *Server) Addr() net.Addr
- func (s *Server) GetListenEndpoint() string
- func (s *Server) GetRouteListenEndpoint() string
- func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request)
- func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request)
- func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request)
- func (s *Server) HandleStacksz(w http.ResponseWriter, r *http.Request)
- func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request)
- func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request)
- func (s *Server) ID() string
- func (s *Server) NumClients() int
- func (s *Server) NumRemotes() int
- func (s *Server) NumRoutes() int
- func (s *Server) NumSubscriptions() uint32
- func (s *Server) SetClientAuthMethod(authMethod Auth)
- func (s *Server) SetLogger(logger Logger, debugFlag, traceFlag bool)
- func (s *Server) SetRouteAuthMethod(authMethod Auth)
- func (s *Server) Shutdown()
- func (s *Server) Start()
- func (s *Server) StartHTTPMonitoring()
- func (s *Server) StartHTTPSMonitoring()
- func (s *Server) StartProfiler()
- func (s *Server) StartRouting(clientListenReady chan struct{})
- type SortOpt
- type Sublist
- type SublistResult
- type SublistStats
- type Subsz
- type TLSConfigOpts
- type User
- type Varz
Constants ¶
const ( // CLIENT is an end user. CLIENT = iota // ROUTER is another router in the cluster. ROUTER )
Type of client connection.
const ( // Original Client protocol from 2009. // http://nats.io/documentation/internals/nats-protocol/ ClientProtoZero = iota // This signals a client can receive more then the original INFO block. // This can be used to update clients on other cluster members, etc. ClientProtoInfo )
const ( // VERSION is the current version for the server. VERSION = "0.9.2" // DEFAULT_PORT is the default port for client connections. DEFAULT_PORT = 4222 // RANDOM_PORT is the value for port that, when supplied, will cause the // server to listen on a randomly-chosen available port. The resolved port // is available via the Addr() method. RANDOM_PORT = -1 // DEFAULT_HOST defaults to all interfaces. DEFAULT_HOST = "0.0.0.0" // MAX_CONTROL_LINE_SIZE is the maximum allowed protocol control line size. // 1k should be plenty since payloads sans connect string are separate MAX_CONTROL_LINE_SIZE = 1024 // MAX_PAYLOAD_SIZE is the maximum allowed payload size. Should be using // something different if > 1MB payloads are needed. MAX_PAYLOAD_SIZE = (1024 * 1024) // MAX_PENDING_SIZE is the maximum outbound size (in bytes) per client. MAX_PENDING_SIZE = (10 * 1024 * 1024) // DEFAULT_MAX_CONNECTIONS is the default maximum connections allowed. DEFAULT_MAX_CONNECTIONS = (64 * 1024) // TLS_TIMEOUT is the TLS wait time. TLS_TIMEOUT = 500 * time.Millisecond // AUTH_TIMEOUT is the authorization wait time. AUTH_TIMEOUT = 2 * TLS_TIMEOUT // DEFAULT_PING_INTERVAL is how often pings are sent to clients and routes. DEFAULT_PING_INTERVAL = 2 * time.Minute // DEFAULT_PING_MAX_OUT is maximum allowed pings outstanding before disconnect. DEFAULT_PING_MAX_OUT = 2 // CR_LF string CR_LF = "\r\n" // LEN_CR_LF hold onto the computed size. LEN_CR_LF = len(CR_LF) // DEFAULT_FLUSH_DEADLINE is the write/flush deadlines. DEFAULT_FLUSH_DEADLINE = 2 * time.Second // DEFAULT_HTTP_PORT is the default monitoring port. DEFAULT_HTTP_PORT = 8222 // ACCEPT_MIN_SLEEP is the minimum acceptable sleep times on temporary errors. ACCEPT_MIN_SLEEP = 10 * time.Millisecond // ACCEPT_MAX_SLEEP is the maximum acceptable sleep times on temporary errors ACCEPT_MAX_SLEEP = 1 * time.Second // DEFAULT_ROUTE_CONNECT Route solicitation intervals. DEFAULT_ROUTE_CONNECT = 1 * time.Second // DEFAULT_ROUTE_RECONNECT Route reconnect intervals. DEFAULT_ROUTE_RECONNECT = 1 * time.Second // DEFAULT_ROUTE_DIAL Route dial timeout. DEFAULT_ROUTE_DIAL = 1 * time.Second // PROTO_SNIPPET_SIZE is the default size of proto to print on parse errors. PROTO_SNIPPET_SIZE = 32 // MAX_MSG_ARGS Maximum possible number of arguments from MSG proto. MAX_MSG_ARGS = 4 // MAX_PUB_ARGS Maximum possible number of arguments from PUB proto. MAX_PUB_ARGS = 3 )
const ( OP_START = iota OP_PLUS OP_PLUS_O OP_PLUS_OK OP_MINUS OP_MINUS_E OP_MINUS_ER OP_MINUS_ERR OP_MINUS_ERR_SPC MINUS_ERR_ARG OP_C OP_CO OP_CON OP_CONN OP_CONNE OP_CONNEC OP_CONNECT CONNECT_ARG OP_P OP_PU OP_PUB OP_PUB_SPC PUB_ARG OP_PI OP_PIN OP_PING OP_PO OP_PON OP_PONG MSG_PAYLOAD MSG_END OP_S OP_SU OP_SUB OP_SUB_SPC SUB_ARG OP_U OP_UN OP_UNS OP_UNSU OP_UNSUB OP_UNSUB_SPC UNSUB_ARG OP_M OP_MS OP_MSG OP_MSG_SPC MSG_ARG OP_I OP_IN OP_INF OP_INFO INFO_ARG )
Parser constants
const ( ConProto = "CONNECT %s" + _CRLF_ InfoProto = "INFO %s" + _CRLF_ )
Route protocol constants
const ( RSID = "RSID" QRSID = "QRSID" RSID_CID_INDEX = 1 RSID_SID_INDEX = 2 EXPECTED_MATCHES = 3 )
FIXME(dlc) - Make these reserved and reject if they come in as a sid from a client connection. Route constants
const ( RootPath = "/" VarzPath = "/varz" ConnzPath = "/connz" RoutezPath = "/routez" SubszPath = "/subsz" StackszPath = "/stacksz" )
HTTP endpoints
const DefaultConnListSize = 1024
DefaultConnListSize is the default size of the connection list.
Variables ¶
var ( // ErrConnectionClosed represents an error condition on a closed connection. ErrConnectionClosed = errors.New("Connection Closed") // ErrAuthorization represents an error condition on failed authorization. ErrAuthorization = errors.New("Authorization Error") // ErrAuthTimeout represents an error condition on failed authorization due to timeout. ErrAuthTimeout = errors.New("Authorization Timeout") // ErrMaxPayload represents an error condition when the payload is too big. ErrMaxPayload = errors.New("Maximum Payload Exceeded") // ErrMaxControlLine represents an error condition when the control line is too big. ErrMaxControlLine = errors.New("Maximum Control Line Exceeded") // ErrReservedPublishSubject represents an error condition when sending to a reserved subject, e.g. _SYS.> ErrReservedPublishSubject = errors.New("Reserved Internal Subject") // ErrBadClientProtocol signals a client requested an invalud client protocol. ErrBadClientProtocol = errors.New("Invalid Client Protocol") )
var ( ErrInvalidSubject = errors.New("sublist: Invalid Subject") ErrNotFound = errors.New("sublist: No Matches Found") )
Sublist related errors
Functions ¶
func GenTLSConfig ¶ added in v0.7.0
func GenTLSConfig(tc *TLSConfigOpts) (*tls.Config, error)
GenTLSConfig loads TLS related configuration parameters.
func IsValidLiteralSubject ¶ added in v0.8.0
IsValidLiteralSubject returns true if a subject is valid and literal (no wildcards), false otherwise
func IsValidSubject ¶ added in v0.9.2
IsValidSubject returns true if a subject is valid, false otherwise
func Noticef ¶ added in v0.6.0
func Noticef(format string, v ...interface{})
Noticef logs a notice statement
func PrintAndDie ¶
func PrintAndDie(msg string)
PrintAndDie is exported for access in other packages.
func PrintServerAndExit ¶
func PrintServerAndExit()
PrintServerAndExit will print our version and exit.
func PrintTLSHelpAndDie ¶ added in v0.8.0
func PrintTLSHelpAndDie()
PrintTLSHelpAndDie prints TLS usage and exits.
func RemoveSelfReference ¶ added in v0.5.6
RemoveSelfReference removes this server from an array of routes
func ResponseHandler ¶ added in v0.6.6
func ResponseHandler(w http.ResponseWriter, r *http.Request, data []byte)
ResponseHandler handles responses for monitoring routes
func RoutesFromStr ¶ added in v0.6.2
RoutesFromStr parses route URLs from a string
Types ¶
type Auth ¶ added in v0.6.0
type Auth interface { // Check if a client is authorized to connect Check(c ClientAuth) bool }
Auth is an interface for implementing authentication
type ClientAuth ¶ added in v0.6.0
type ClientAuth interface { // Get options associated with a client GetOpts() *clientOpts // Optionally map a user after auth. RegisterUser(*User) }
ClientAuth is an interface for client authentication
type ConnInfo ¶
type ConnInfo struct { Cid uint64 `json:"cid"` IP string `json:"ip"` Port int `json:"port"` Start time.Time `json:"start"` LastActivity time.Time `json:"last_activity"` Uptime string `json:"uptime"` Idle string `json:"idle"` Pending int `json:"pending_bytes"` InMsgs int64 `json:"in_msgs"` OutMsgs int64 `json:"out_msgs"` InBytes int64 `json:"in_bytes"` OutBytes int64 `json:"out_bytes"` NumSubs uint32 `json:"subscriptions"` Name string `json:"name,omitempty"` Lang string `json:"lang,omitempty"` Version string `json:"version,omitempty"` TLSVersion string `json:"tls_version,omitempty"` TLSCipher string `json:"tls_cipher_suite,omitempty"` AuthorizedUser string `json:"authorized_user,omitempty"` Subs []string `json:"subscriptions_list,omitempty"` }
ConnInfo has detailed information on a per connection basis.
type Connz ¶
type Connz struct { Now time.Time `json:"now"` NumConns int `json:"num_connections"` Total int `json:"total"` Offset int `json:"offset"` Limit int `json:"limit"` Conns []ConnInfo `json:"connections"` }
Connz represents detailed information on current client connections.
type Info ¶
type Info struct { ID string `json:"server_id"` Version string `json:"version"` GoVersion string `json:"go"` Host string `json:"host"` Port int `json:"port"` AuthRequired bool `json:"auth_required"` SSLRequired bool `json:"ssl_required"` // DEPRECATED: ssl json used for older clients TLSRequired bool `json:"tls_required"` TLSVerify bool `json:"tls_verify"` MaxPayload int `json:"max_payload"` IP string `json:"ip,omitempty"` ClientConnectURLs []string `json:"connect_urls,omitempty"` // Contains URLs a client can connect to. // contains filtered or unexported fields }
Info is the information sent to clients to help them understand information about this server.
type Logger ¶ added in v0.6.0
type Logger interface { // Log a notice statement Noticef(format string, v ...interface{}) // Log a fatal error Fatalf(format string, v ...interface{}) // Log an error Errorf(format string, v ...interface{}) // Log a debug statement Debugf(format string, v ...interface{}) // Log a trace statement Tracef(format string, v ...interface{}) }
Logger interface of the NATS Server
type Options ¶
type Options struct { Host string `json:"addr"` Port int `json:"port"` Trace bool `json:"-"` Debug bool `json:"-"` NoLog bool `json:"-"` NoSigs bool `json:"-"` Logtime bool `json:"-"` MaxConn int `json:"max_connections"` Users []*User `json:"-"` Username string `json:"-"` Password string `json:"-"` Authorization string `json:"-"` PingInterval time.Duration `json:"ping_interval"` MaxPingsOut int `json:"ping_max"` HTTPHost string `json:"http_host"` HTTPPort int `json:"http_port"` HTTPSPort int `json:"https_port"` AuthTimeout float64 `json:"auth_timeout"` MaxControlLine int `json:"max_control_line"` MaxPayload int `json:"max_payload"` MaxPending int `json:"max_pending_size"` ClusterHost string `json:"addr"` ClusterPort int `json:"cluster_port"` ClusterUsername string `json:"-"` ClusterPassword string `json:"-"` ClusterAuthTimeout float64 `json:"auth_timeout"` ClusterTLSTimeout float64 `json:"-"` ClusterTLSConfig *tls.Config `json:"-"` ClusterListenStr string `json:"-"` ProfPort int `json:"-"` PidFile string `json:"-"` LogFile string `json:"-"` Syslog bool `json:"-"` RemoteSyslog string `json:"-"` Routes []*url.URL `json:"-"` RoutesStr string `json:"-"` TLSTimeout float64 `json:"tls_timeout"` TLS bool `json:"-"` TLSVerify bool `json:"-"` TLSCert string `json:"-"` TLSKey string `json:"-"` TLSCaCert string `json:"-"` TLSConfig *tls.Config `json:"-"` }
Options block for gnatsd server.
func MergeOptions ¶
MergeOptions will merge two options giving preference to the flagOpts if the item is present.
func ProcessConfigFile ¶
ProcessConfigFile processes a configuration file. FIXME(dlc): Hacky
type Pair ¶ added in v0.6.2
type Pair struct { Key *client Val int64 }
Pair type is internally used.
type Permissions ¶ added in v0.9.2
Authorization are the allowed subjects on a per publish or subscribe basis.
type RouteInfo ¶ added in v0.6.0
type RouteInfo struct { Rid uint64 `json:"rid"` RemoteID string `json:"remote_id"` DidSolicit bool `json:"did_solicit"` IsConfigured bool `json:"is_configured"` IP string `json:"ip"` Port int `json:"port"` Pending int `json:"pending_size"` InMsgs int64 `json:"in_msgs"` OutMsgs int64 `json:"out_msgs"` InBytes int64 `json:"in_bytes"` OutBytes int64 `json:"out_bytes"` NumSubs uint32 `json:"subscriptions"` Subs []string `json:"subscriptions_list,omitempty"` }
RouteInfo has detailed information on a per connection basis.
type Routez ¶ added in v0.6.0
type Routez struct { Now time.Time `json:"now"` NumRoutes int `json:"num_routes"` Routes []*RouteInfo `json:"routes"` }
Routez represents detailed information on current client connections.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is our main struct.
func (*Server) AcceptLoop ¶
func (s *Server) AcceptLoop(clr chan struct{})
AcceptLoop is exported for easier testing.
func (*Server) Addr ¶ added in v0.5.4
Addr will return the net.Addr object for the current listener.
func (*Server) GetListenEndpoint ¶ added in v0.8.0
GetListenEndpoint will return a string of the form host:port suitable for a connect. Will return empty string if the server is not ready to accept client connections.
func (*Server) GetRouteListenEndpoint ¶ added in v0.8.0
GetRouteListenEndpoint will return a string of the form host:port suitable for a connect. Will return empty string if the server is not configured for routing or not ready to accept route connections.
func (*Server) HandleConnz ¶
func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request)
HandleConnz process HTTP requests for connection information.
func (*Server) HandleRoot ¶ added in v0.6.2
func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request)
HandleRoot will show basic info and links to others handlers.
func (*Server) HandleRoutez ¶ added in v0.6.0
func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request)
HandleRoutez process HTTP requests for route information.
func (*Server) HandleStacksz ¶ added in v0.8.1
func (s *Server) HandleStacksz(w http.ResponseWriter, r *http.Request)
HandleStacksz processes HTTP requests for getting stacks
func (*Server) HandleSubsz ¶ added in v0.6.0
func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request)
HandleSubsz processes HTTP requests for subjects stats.
func (*Server) HandleVarz ¶
func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request)
HandleVarz will process HTTP requests for server information.
func (*Server) NumClients ¶
NumClients will report the number of registered clients.
func (*Server) NumRemotes ¶
NumRemotes will report number of registered remotes.
func (*Server) NumSubscriptions ¶
NumSubscriptions will report how many subscriptions are active.
func (*Server) SetClientAuthMethod ¶ added in v0.8.0
SetClientAuthMethod sets the authentication method for clients.
func (*Server) SetRouteAuthMethod ¶ added in v0.8.0
SetRouteAuthMethod sets the authentication method for routes.
func (*Server) Shutdown ¶
func (s *Server) Shutdown()
Shutdown will shutdown the server instance by kicking out the AcceptLoop and closing all associated clients.
func (*Server) Start ¶
func (s *Server) Start()
Start up the server, this will block. Start via a Go routine if needed.
func (*Server) StartHTTPMonitoring ¶
func (s *Server) StartHTTPMonitoring()
StartHTTPMonitoring will enable the HTTP monitoring port.
func (*Server) StartHTTPSMonitoring ¶ added in v0.7.0
func (s *Server) StartHTTPSMonitoring()
StartHTTPSMonitoring will enable the HTTPS monitoring port.
func (*Server) StartProfiler ¶
func (s *Server) StartProfiler()
StartProfiler is called to enable dynamic profiling.
func (*Server) StartRouting ¶
func (s *Server) StartRouting(clientListenReady chan struct{})
StartRouting will start the accept loop on the cluster host:port and will actively try to connect to listed routes.
type SortOpt ¶ added in v0.6.2
type SortOpt string
SortOpt is a helper type to sort by ConnInfo values
type Sublist ¶ added in v0.8.0
A Sublist stores and efficiently retrieves subscriptions.
func (*Sublist) CacheCount ¶ added in v0.8.0
CacheCount returns the number of result sets in the cache.
func (*Sublist) Match ¶ added in v0.8.0
func (s *Sublist) Match(subject string) *SublistResult
Match will match all entries to the literal subject. It will return a set of results for both normal and queue subscribers.
func (*Sublist) Stats ¶ added in v0.8.0
func (s *Sublist) Stats() *SublistStats
Stats will return a stats structure for the current state.
type SublistResult ¶ added in v0.8.0
type SublistResult struct {
// contains filtered or unexported fields
}
A result structure better optimized for queue subs.
type SublistStats ¶ added in v0.8.0
type SublistStats struct { NumSubs uint32 `json:"num_subscriptions"` NumCache uint32 `json:"num_cache"` NumInserts uint64 `json:"num_inserts"` NumRemoves uint64 `json:"num_removes"` NumMatches uint64 `json:"num_matches"` CacheHitRate float64 `json:"cache_hit_rate"` MaxFanout uint32 `json:"max_fanout"` AvgFanout float64 `json:"avg_fanout"` }
Public stats for the sublist
type Subsz ¶ added in v0.6.0
type Subsz struct {
*SublistStats
}
Subsz represents detail information on current connections.
type TLSConfigOpts ¶ added in v0.7.0
type TLSConfigOpts struct { CertFile string KeyFile string CaFile string Verify bool Timeout float64 Ciphers []uint16 }
TLSConfigOpts holds the parsed tls config information, used with flag parsing
type User ¶ added in v0.8.1
type User struct { Username string `json:"user"` Password string `json:"password"` Permissions *Permissions `json:"permissions"` }
For multiple accounts/users.
type Varz ¶
type Varz struct { *Info *Options Port int `json:"port"` MaxPayload int `json:"max_payload"` Start time.Time `json:"start"` Now time.Time `json:"now"` Uptime string `json:"uptime"` Mem int64 `json:"mem"` Cores int `json:"cores"` CPU float64 `json:"cpu"` Connections int `json:"connections"` TotalConnections uint64 `json:"total_connections"` Routes int `json:"routes"` Remotes int `json:"remotes"` InMsgs int64 `json:"in_msgs"` OutMsgs int64 `json:"out_msgs"` InBytes int64 `json:"in_bytes"` OutBytes int64 `json:"out_bytes"` SlowConsumers int64 `json:"slow_consumers"` Subscriptions uint32 `json:"subscriptions"` HTTPReqStats map[string]uint64 `json:"http_req_stats"` }
Varz will output server information on the monitoring port at /varz.