Documentation
¶
Index ¶
- Constants
- Variables
- func ConfigureLogger(stanOpts *Options, natsOpts *natsd.Options)
- func Debugf(format string, v ...interface{})
- func Errorf(format string, v ...interface{})
- func Fatalf(format string, v ...interface{})
- func Noticef(format string, v ...interface{})
- func ProcessConfigFile(configFile string, opts *Options) error
- func RemoveLogger()
- func Tracef(format string, v ...interface{})
- type Channelsz
- type Channelz
- type Clientsz
- type Clientz
- type Options
- type Serverz
- type StanServer
- type State
- type Storez
- type Subscriptionz
Constants ¶
const ( RootPath = "/streaming" ServerPath = RootPath + "/serverz" StorePath = RootPath + "/storez" ClientsPath = RootPath + "/clientsz" ChannelsPath = RootPath + "/channelsz" )
Routes for the monitoring pages
const ( // VERSION is the current version for the NATS Streaming server. VERSION = "0.5.0" DefaultClusterID = "test-cluster" DefaultDiscoverPrefix = "_STAN.discover" DefaultPubPrefix = "_STAN.pub" DefaultSubPrefix = "_STAN.sub" DefaultSubClosePrefix = "_STAN.subclose" DefaultUnSubPrefix = "_STAN.unsub" DefaultClosePrefix = "_STAN.close" DefaultStoreType = stores.TypeMemory // DefaultHeartBeatInterval is the interval at which server sends heartbeat to a client DefaultHeartBeatInterval = 30 * time.Second // DefaultClientHBTimeout is how long server waits for a heartbeat response DefaultClientHBTimeout = 10 * time.Second // DefaultMaxFailedHeartBeats is the number of failed heartbeats before server closes // the client connection (total= (heartbeat interval + heartbeat timeout) * (fail count + 1) DefaultMaxFailedHeartBeats = int((5 * time.Minute) / DefaultHeartBeatInterval) // DefaultIOBatchSize is the maximum number of messages to accumulate before flushing a store. DefaultIOBatchSize = 1024 // DefaultIOSleepTime is the duration (in micro-seconds) the server waits for more messages // before starting processing. Set to 0 (or negative) to disable the wait. DefaultIOSleepTime = int64(0) )
Server defaults.
const LogPrefix = "STREAM: "
LogPrefix is prefixed to all NATS Streaming log messages
Variables ¶
var ( ErrInvalidSubject = errors.New("stan: invalid subject") ErrInvalidStart = errors.New("stan: invalid start position") ErrInvalidSub = errors.New("stan: invalid subscription") ErrInvalidClient = errors.New("stan: clientID already registered") ErrMissingClient = errors.New("stan: clientID missing") ErrInvalidClientID = errors.New("stan: invalid clientID: only alphanumeric and `-` or `_` characters allowed") ErrInvalidAckWait = errors.New("stan: invalid ack wait time, should be >= 1s") ErrInvalidMaxInflight = errors.New("stan: invalid MaxInflight, should be >= 1") ErrInvalidConnReq = errors.New("stan: invalid connection request") ErrInvalidPubReq = errors.New("stan: invalid publish request") ErrInvalidSubReq = errors.New("stan: invalid subscription request") ErrInvalidUnsubReq = errors.New("stan: invalid unsubscribe request") ErrInvalidCloseReq = errors.New("stan: invalid close request") ErrDupDurable = errors.New("stan: duplicate durable registration") ErrInvalidDurName = errors.New("stan: durable name of a durable queue subscriber can't contain the character ':'") ErrUnknownClient = errors.New("stan: unknown clientID") ErrNoChannel = errors.New("stan: no configured channel") )
Errors.
var DefaultNatsServerOptions = server.Options{ Host: "localhost", Port: 4222, NoLog: true, NoSigs: true, }
DefaultNatsServerOptions are default options for the NATS server
Functions ¶
func ConfigureLogger ¶
ConfigureLogger configures logging for STAN and the embedded NATS server based on options passed.
func ProcessConfigFile ¶ added in v0.3.0
ProcessConfigFile parses the configuration file `configFile` and updates the given Streaming options `opts`.
func RemoveLogger ¶
func RemoveLogger()
RemoveLogger clears the logger instance and debug/trace flags. Used for testing.
Types ¶
type Channelsz ¶ added in v0.5.0
type Channelsz struct { ClusterID string `json:"cluster_id"` ServerID string `json:"server_id"` Now time.Time `json:"now"` Offset int `json:"offset"` Limit int `json:"limit"` Count int `json:"count"` Total int `json:"total"` Names []string `json:"names,omitempty"` Channels []*Channelz `json:"channels,omitempty"` }
Channelsz lists the names of all NATS Streaming channels
type Channelz ¶ added in v0.5.0
type Channelz struct { Name string `json:"name"` Msgs int `json:"msgs"` Bytes uint64 `json:"bytes"` FirstSeq uint64 `json:"first_seq"` LastSeq uint64 `json:"last_seq"` Subscriptions []*Subscriptionz `json:"subscriptions,omitempty"` }
Channelz describes a NATS Streaming Channel
type Clientsz ¶ added in v0.5.0
type Clientsz struct { ClusterID string `json:"cluster_id"` ServerID string `json:"server_id"` Now time.Time `json:"now"` Offset int `json:"offset"` Limit int `json:"limit"` Count int `json:"count"` Total int `json:"total"` Clients []*Clientz `json:"clients"` }
Clientsz lists the client connections
type Clientz ¶ added in v0.5.0
type Clientz struct { ID string `json:"id"` HBInbox string `json:"hb_inbox"` Subscriptions map[string][]*Subscriptionz `json:"subscriptions,omitempty"` }
Clientz describes a NATS Streaming Client connection
type Options ¶
type Options struct { ID string DiscoverPrefix string StoreType string FilestoreDir string FileStoreOpts stores.FileStoreOptions stores.StoreLimits // Store limits (MaxChannels, etc..) Trace bool // Verbose trace Debug bool // Debug trace HandleSignals bool // Should the server setup a signal handler (for Ctrl+C, etc...) Secure bool // Create a TLS enabled connection w/o server verification ClientCert string // Client Certificate for TLS ClientKey string // Client Key for TLS ClientCA string // Client CAs for TLS IOBatchSize int // Maximum number of messages collected from clients before starting their processing. IOSleepTime int64 // Duration (in micro-seconds) the server waits for more message to fill up a batch. NATSServerURL string // URL for external NATS Server to connect to. If empty, NATS Server is embedded. ClientHBInterval time.Duration // Interval at which server sends heartbeat to a client. ClientHBTimeout time.Duration // How long server waits for a heartbeat response. ClientHBFailCount int // Number of failed heartbeats before server closes client connection. AckSubsPoolSize int // Number of internal subscriptions handling incoming ACKs (0 means one per client's subscription). FTGroupName string // Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore. Partitioning bool // Specify if server only accepts messages/subscriptions on channels defined in StoreLimits. }
Options for STAN Server
func GetDefaultOptions ¶
func GetDefaultOptions() (o *Options)
GetDefaultOptions returns default options for the STAN server
type Serverz ¶ added in v0.5.0
type Serverz struct { ClusterID string `json:"cluster_id"` ServerID string `json:"server_id"` Version string `json:"version"` GoVersion string `json:"go"` State string `json:"state"` Now time.Time `json:"now"` Start time.Time `json:"start_time"` Uptime string `json:"uptime"` Clients int `json:"clients"` Subscriptions int `json:"subscriptions"` Channels int `json:"channels"` TotalMsgs int `json:"total_msgs"` TotalBytes uint64 `json:"total_bytes"` }
Serverz describes the NATS Streaming Server
type StanServer ¶
type StanServer struct {
// contains filtered or unexported fields
}
StanServer structure represents the STAN server
func RunServer ¶
func RunServer(ID string) (*StanServer, error)
RunServer will start up an embedded STAN server and a nats-server to support it.
func RunServerWithOpts ¶
func RunServerWithOpts(stanOpts *Options, natsOpts *server.Options) (newServer *StanServer, returnedError error)
RunServerWithOpts will start up an embedded STAN server and a nats-server to support it.
func (*StanServer) ClusterID ¶
func (s *StanServer) ClusterID() string
ClusterID returns the STAN Server's ID.
func (*StanServer) LastError ¶ added in v0.4.0
func (s *StanServer) LastError() error
LastError returns the last fatal error the server experienced.
func (*StanServer) Shutdown ¶
func (s *StanServer) Shutdown()
Shutdown will close our NATS connection and shut down any embedded NATS server.
func (*StanServer) State ¶ added in v0.4.0
func (s *StanServer) State() State
State returns the state of this server.
type Storez ¶ added in v0.5.0
type Storez struct { ClusterID string `json:"cluster_id"` ServerID string `json:"server_id"` Now time.Time `json:"now"` Type string `json:"type"` Limits stores.StoreLimits `json:"limits"` TotalMsgs int `json:"total_msgs"` TotalBytes uint64 `json:"total_bytes"` }
Storez describes the NATS Streaming Store
type Subscriptionz ¶ added in v0.5.0
type Subscriptionz struct { Inbox string `json:"inbox"` AckInbox string `json:"ack_inbox"` DurableName string `json:"durable_name,omitempty"` QueueName string `json:"queue_name,omitempty"` IsDurable bool `json:"is_durable"` MaxInflight int `json:"max_inflight"` AckWait int `json:"ack_wait"` LastSent uint64 `json:"last_sent"` PendingCount int `json:"pending_count"` IsStalled bool `json:"is_stalled"` }
Subscriptionz describes a NATS Streaming Subscription