Documentation ¶
Index ¶
Constants ¶
const ( RootPath = "/streaming" ServerPath = RootPath + "/serverz" StorePath = RootPath + "/storez" ClientsPath = RootPath + "/clientsz" ChannelsPath = RootPath + "/channelsz" )
Routes for the monitoring pages
const ( // VERSION is the current version for the NATS Streaming server. VERSION = "0.6.0" DefaultClusterID = "test-cluster" DefaultDiscoverPrefix = "_STAN.discover" DefaultPubPrefix = "_STAN.pub" DefaultSubPrefix = "_STAN.sub" DefaultSubClosePrefix = "_STAN.subclose" DefaultUnSubPrefix = "_STAN.unsub" DefaultClosePrefix = "_STAN.close" DefaultStoreType = stores.TypeMemory // DefaultHeartBeatInterval is the interval at which server sends heartbeat to a client DefaultHeartBeatInterval = 30 * time.Second // DefaultClientHBTimeout is how long server waits for a heartbeat response DefaultClientHBTimeout = 10 * time.Second // DefaultMaxFailedHeartBeats is the number of failed heartbeats before server closes // the client connection (total= (heartbeat interval + heartbeat timeout) * (fail count + 1) DefaultMaxFailedHeartBeats = int((5 * time.Minute) / DefaultHeartBeatInterval) // DefaultIOBatchSize is the maximum number of messages to accumulate before flushing a store. DefaultIOBatchSize = 1024 // DefaultIOSleepTime is the duration (in micro-seconds) the server waits for more messages // before starting processing. Set to 0 (or negative) to disable the wait. DefaultIOSleepTime = int64(0) )
Server defaults.
Variables ¶
var ( ErrInvalidSubject = errors.New("stan: invalid subject") ErrInvalidStart = errors.New("stan: invalid start position") ErrInvalidSub = errors.New("stan: invalid subscription") ErrInvalidClient = errors.New("stan: clientID already registered") ErrMissingClient = errors.New("stan: clientID missing") ErrInvalidClientID = errors.New("stan: invalid clientID: only alphanumeric and `-` or `_` characters allowed") ErrInvalidAckWait = errors.New("stan: invalid ack wait time, should be >= 1s") ErrInvalidMaxInflight = errors.New("stan: invalid MaxInflight, should be >= 1") ErrInvalidConnReq = errors.New("stan: invalid connection request") ErrInvalidPubReq = errors.New("stan: invalid publish request") ErrInvalidSubReq = errors.New("stan: invalid subscription request") ErrInvalidUnsubReq = errors.New("stan: invalid unsubscribe request") ErrInvalidCloseReq = errors.New("stan: invalid close request") ErrDupDurable = errors.New("stan: duplicate durable registration") ErrInvalidDurName = errors.New("stan: durable name of a durable queue subscriber can't contain the character ':'") ErrUnknownClient = errors.New("stan: unknown clientID") ErrNoChannel = errors.New("stan: no configured channel") )
Errors.
var DefaultNatsServerOptions = server.Options{ Host: "localhost", Port: 4222, NoLog: true, NoSigs: true, }
DefaultNatsServerOptions are default options for the NATS server
Functions ¶
func ProcessConfigFile ¶ added in v0.3.0
ProcessConfigFile parses the configuration file `configFile` and updates the given Streaming options `opts`.
Types ¶
type Channelsz ¶ added in v0.5.0
type Channelsz struct { ClusterID string `json:"cluster_id"` ServerID string `json:"server_id"` Now time.Time `json:"now"` Offset int `json:"offset"` Limit int `json:"limit"` Count int `json:"count"` Total int `json:"total"` Names []string `json:"names,omitempty"` Channels []*Channelz `json:"channels,omitempty"` }
Channelsz lists the names of all NATS Streaming channels
type Channelz ¶ added in v0.5.0
type Channelz struct { Name string `json:"name"` Msgs int `json:"msgs"` Bytes uint64 `json:"bytes"` FirstSeq uint64 `json:"first_seq"` LastSeq uint64 `json:"last_seq"` Subscriptions []*Subscriptionz `json:"subscriptions,omitempty"` }
Channelz describes a NATS Streaming Channel
type Clientsz ¶ added in v0.5.0
type Clientsz struct { ClusterID string `json:"cluster_id"` ServerID string `json:"server_id"` Now time.Time `json:"now"` Offset int `json:"offset"` Limit int `json:"limit"` Count int `json:"count"` Total int `json:"total"` Clients []*Clientz `json:"clients"` }
Clientsz lists the client connections
type Clientz ¶ added in v0.5.0
type Clientz struct { ID string `json:"id"` HBInbox string `json:"hb_inbox"` Subscriptions map[string][]*Subscriptionz `json:"subscriptions,omitempty"` }
Clientz describes a NATS Streaming Client connection
type Options ¶
type Options struct { ID string DiscoverPrefix string StoreType string FilestoreDir string FileStoreOpts stores.FileStoreOptions stores.StoreLimits // Store limits (MaxChannels, etc..) EnableLogging bool // Enables logging CustomLogger logger.Logger // Server will start with the provided logger Trace bool // Verbose trace Debug bool // Debug trace HandleSignals bool // Should the server setup a signal handler (for Ctrl+C, etc...) Secure bool // Create a TLS enabled connection w/o server verification ClientCert string // Client Certificate for TLS ClientKey string // Client Key for TLS ClientCA string // Client CAs for TLS IOBatchSize int // Maximum number of messages collected from clients before starting their processing. IOSleepTime int64 // Duration (in micro-seconds) the server waits for more message to fill up a batch. NATSServerURL string // URL for external NATS Server to connect to. If empty, NATS Server is embedded. ClientHBInterval time.Duration // Interval at which server sends heartbeat to a client. ClientHBTimeout time.Duration // How long server waits for a heartbeat response. ClientHBFailCount int // Number of failed heartbeats before server closes client connection. AckSubsPoolSize int // Number of internal subscriptions handling incoming ACKs (0 means one per client's subscription). FTGroupName string // Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore. Partitioning bool // Specify if server only accepts messages/subscriptions on channels defined in StoreLimits. }
Options for STAN Server
func ConfigureOptions ¶ added in v0.6.0
func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp, printTLSHelp func()) (*Options, *natsd.Options, error)
ConfigureOptions accepts a flag set and augments it with NATS Streaming Server specific flags. It then invokes the corresponding function from NATS Server. On success, Streaming and NATS options structures are returned, configured based on the selected flags and/or configuration files. The command line options take precedence over the ones in the configuration files.
func GetDefaultOptions ¶
func GetDefaultOptions() (o *Options)
GetDefaultOptions returns default options for the STAN server
type Serverz ¶ added in v0.5.0
type Serverz struct { ClusterID string `json:"cluster_id"` ServerID string `json:"server_id"` Version string `json:"version"` GoVersion string `json:"go"` State string `json:"state"` Now time.Time `json:"now"` Start time.Time `json:"start_time"` Uptime string `json:"uptime"` Clients int `json:"clients"` Subscriptions int `json:"subscriptions"` Channels int `json:"channels"` TotalMsgs int `json:"total_msgs"` TotalBytes uint64 `json:"total_bytes"` }
Serverz describes the NATS Streaming Server
type StanServer ¶
type StanServer struct {
// contains filtered or unexported fields
}
StanServer structure represents the STAN server
func RunServer ¶
func RunServer(ID string) (*StanServer, error)
RunServer will start up an embedded STAN server and a nats-server to support it.
func RunServerWithOpts ¶
func RunServerWithOpts(stanOpts *Options, natsOpts *server.Options) (newServer *StanServer, returnedError error)
RunServerWithOpts will start up an embedded STAN server and a nats-server to support it.
func (*StanServer) ClusterID ¶
func (s *StanServer) ClusterID() string
ClusterID returns the STAN Server's ID.
func (*StanServer) LastError ¶ added in v0.4.0
func (s *StanServer) LastError() error
LastError returns the last fatal error the server experienced.
func (*StanServer) Shutdown ¶
func (s *StanServer) Shutdown()
Shutdown will close the server's NATS connection and shut down any embedded NATS server.
func (*StanServer) State ¶ added in v0.4.0
func (s *StanServer) State() State
State returns the state of this server.
type Storez ¶ added in v0.5.0
type Storez struct { ClusterID string `json:"cluster_id"` ServerID string `json:"server_id"` Now time.Time `json:"now"` Type string `json:"type"` Limits stores.StoreLimits `json:"limits"` TotalMsgs int `json:"total_msgs"` TotalBytes uint64 `json:"total_bytes"` }
Storez describes the NATS Streaming Store
type Subscriptionz ¶ added in v0.5.0
type Subscriptionz struct { Inbox string `json:"inbox"` AckInbox string `json:"ack_inbox"` DurableName string `json:"durable_name,omitempty"` QueueName string `json:"queue_name,omitempty"` IsDurable bool `json:"is_durable"` IsOffline bool `json:"is_offline"` MaxInflight int `json:"max_inflight"` AckWait int `json:"ack_wait"` LastSent uint64 `json:"last_sent"` PendingCount int `json:"pending_count"` IsStalled bool `json:"is_stalled"` }
Subscriptionz describes a NATS Streaming Subscription