Documentation ¶
Index ¶
- Constants
- Variables
- func IsArray(arr []*SyncClient, cli *SyncClient) (int, bool)
- func NewNATSOptions() *server.Options
- func ProcessConfigFile(configFile string, opts *Options) error
- type Channelsz
- type Channelz
- type Clientsz
- type Clientz
- type ClusteringOptions
- type Options
- type Serverz
- type StanServer
- type State
- type Storez
- type Subscriptionz
- type SyncClient
- type TopicGroup
- func (t *TopicGroup) Add(client *SyncClient, isAuto bool)
- func (t *TopicGroup) AddClient(clientId, topic, group, host string, port int, isAuto bool)
- func (t *TopicGroup) DelClient(clientId, topic string, isAuto bool)
- func (t *TopicGroup) Delete(client *SyncClient, isAuto bool)
- func (t *TopicGroup) GetClient(clientID string, subject string) *SyncClient
- func (t *TopicGroup) GetClientGroup(client *SyncClient) []*SyncClient
- func (t *TopicGroup) GetGroupClientsByGroup(group, subject string) []*SyncClient
- func (t *TopicGroup) ToJson() string
Constants ¶
const ( RootPath = "/streaming" ServerPath = RootPath + "/serverz" StorePath = RootPath + "/storez" ClientsPath = RootPath + "/clientsz" ChannelsPath = RootPath + "/channelsz" IsFTActivePath = RootPath + "/isFTActive" )
Routes for the monitoring pages
const ( // VERSION is the current version for the NATS Streaming server. VERSION = "0.18.0" DefaultClusterID = "test-cluster" DefaultDiscoverPrefix = "_STAN.discover" DefaultPubPrefix = "_STAN.pub" DefaultSubPrefix = "_STAN.sub" DefaultSubClosePrefix = "_STAN.subclose" DefaultUnSubPrefix = "_STAN.unsub" DefaultClosePrefix = "_STAN.close" DefaultRequestTopic = "_STAN.client_join_topic" DefaultStoreType = stores.TypeMemory DefaultGroupLimit = 5 // DefaultHeartBeatInterval is the interval at which server sends heartbeat to a client DefaultHeartBeatInterval = 30 * time.Second // DefaultClientHBTimeout is how long server waits for a heartbeat response DefaultClientHBTimeout = 10 * time.Second // DefaultMaxFailedHeartBeats is the number of failed heartbeats before server closes // the client connection (total = (heartbeat interval + heartbeat timeout) * (fail count + 1)) DefaultMaxFailedHeartBeats = int((5 * time.Minute) / DefaultHeartBeatInterval) // DefaultIOBatchSize is the maximum number of messages to accumulate before flushing a store. DefaultIOBatchSize = 1024 // DefaultIOSleepTime is the duration (in micro-seconds) the server waits for more messages // before starting processing. Set to 0 (or negative) to disable the wait. DefaultIOSleepTime = int64(0) // DefaultLogCacheSize is the number of Raft log entries to cache in memory // to reduce disk IO. DefaultLogCacheSize = 512 // DefaultLogSnapshots is the number of Raft log snapshots to retain. DefaultLogSnapshots = 2 // DefaultTrailingLogs is the number of log entries to leave after a // snapshot and compaction. DefaultTrailingLogs = 10240 )
Server defaults.
Variables ¶
var ( ErrInvalidSubject = errors.New("stan: invalid subject") ErrInvalidStart = errors.New("stan: invalid start position") ErrInvalidSub = errors.New("stan: invalid subscription") ErrInvalidClient = errors.New("stan: clientID already registered") ErrMissingClient = errors.New("stan: clientID missing") ErrInvalidClientID = errors.New("stan: invalid clientID: only alphanumeric and `-` or `_` characters allowed") ErrInvalidAckWait = errors.New("stan: invalid ack wait time, should be >= 1s") ErrInvalidMaxInflight = errors.New("stan: invalid MaxInflight, should be >= 1") ErrInvalidConnReq = errors.New("stan: invalid connection request") ErrInvalidPubReq = errors.New("stan: invalid publish request") ErrInvalidSubReq = errors.New("stan: invalid subscription request") ErrInvalidUnsubReq = errors.New("stan: invalid unsubscribe request") ErrInvalidCloseReq = errors.New("stan: invalid close request") ErrDupDurable = errors.New("stan: duplicate durable registration") ErrInvalidDurName = errors.New("stan: durable name of a durable queue subscriber can't contain the character ':'") ErrUnknownClient = errors.New("stan: unknown clientID") ErrNoChannel = errors.New("stan: no configured channel") ErrClusteredRestart = errors.New("stan: cannot restart server in clustered mode if it was not previously clustered") ErrChanDelInProgress = errors.New("stan: channel is being deleted") )
Errors.
var DefaultNatsServerOptions = server.Options{ Host: "127.0.0.1", Port: 4222, NoLog: true, NoSigs: true, }
DefaultNatsServerOptions are default options for the NATS server
var ErrTimeout = errors.New("natslog: read timeout")
ErrTimeout reports a read timeout error
Functions ¶
func IsArray ¶
func IsArray(arr []*SyncClient, cli *SyncClient) (int, bool)
IsArray checks whether the given client is in the array.
func NewNATSOptions ¶
NewNATSOptions returns a new instance of (NATS) Options. This is needed if one wants to configure specific NATS options before starting a NATS Streaming Server (with RunServerWithOpts()).
func ProcessConfigFile ¶
ProcessConfigFile parses the configuration file `configFile` and updates the given Streaming options `opts`.
Types ¶
type Channelsz ¶
type Channelsz struct { ClusterID string `json:"cluster_id"` ServerID string `json:"server_id"` Now time.Time `json:"now"` Offset int `json:"offset"` Limit int `json:"limit"` Count int `json:"count"` Total int `json:"total"` Names []string `json:"names,omitempty"` Channels []*Channelz `json:"channels,omitempty"` }
Channelsz lists the names of all NATS Streaming channels
type Channelz ¶
type Channelz struct { Name string `json:"name"` Msgs int `json:"msgs"` Bytes uint64 `json:"bytes"` FirstSeq uint64 `json:"first_seq"` LastSeq uint64 `json:"last_seq"` Subscriptions []*Subscriptionz `json:"subscriptions,omitempty"` }
Channelz describes a NATS Streaming Channel
type Clientsz ¶
type Clientsz struct { ClusterID string `json:"cluster_id"` ServerID string `json:"server_id"` Now time.Time `json:"now"` Offset int `json:"offset"` Limit int `json:"limit"` Count int `json:"count"` Total int `json:"total"` Clients []*Clientz `json:"clients"` }
Clientsz lists the client connections
type Clientz ¶
type Clientz struct { ID string `json:"id"` HBInbox string `json:"hb_inbox"` Subscriptions map[string][]*Subscriptionz `json:"subscriptions,omitempty"` }
Clientz describes a NATS Streaming Client connection
type ClusteringOptions ¶
type ClusteringOptions struct { Clustered bool // Run the server in a clustered configuration. NodeID string // ID of the node within the cluster. Bootstrap bool // Bootstrap the cluster as a seed node if there is no existing state. Peers []string // List of cluster peer node IDs to bootstrap cluster state. RaftLogPath string // Path to Raft log store directory. LogCacheSize int // Number of Raft log entries to cache in memory to reduce disk IO. LogSnapshots int // Number of Raft log snapshots to retain. TrailingLogs int64 // Number of logs left after a snapshot. Sync bool // Do a file sync after every write to the Raft log and message store. RaftLogging bool // Enable logging of Raft library (disabled by default since really verbose). // When a node processes a snapshot (either on startup or if falling behind) and it is // not in phase with the message store's state, it is required to reconcile its state // with the current leader. If it is unable, the node will fail to start or exit. // If all nodes are starting and there is no way to have a leader at this point, // then if this boolean is set to true, then the node will attempt to reconcile but // if it can't it will still proceed. ProceedOnRestoreFailure bool // These will be set to some sane defaults. Change only if experiencing raft issues. RaftHeartbeatTimeout time.Duration RaftElectionTimeout time.Duration RaftLeaseTimeout time.Duration RaftCommitTimeout time.Duration }
ClusteringOptions contains STAN Server options related to clustering.
type Options ¶
type Options struct { ID string DiscoverPrefix string StoreType string FilestoreDir string FileStoreOpts stores.FileStoreOptions SQLStoreOpts stores.SQLStoreOptions stores.StoreLimits // Store limits (MaxChannels, etc..) EnableLogging bool // Enables logging CustomLogger logger.Logger // Server will start with the provided logger Trace bool // Verbose trace Debug bool // Debug trace HandleSignals bool // Should the server setup a signal handler (for Ctrl+C, etc...) Secure bool // Create a TLS enabled connection ClientCert string // Client Certificate for TLS ClientKey string // Client Key for TLS ClientCA string // Client CAs for TLS TLSSkipVerify bool // Skips the server's certificate chain and host name verification (Insecure!) TLSServerName string // Used to verify the hostname returned in the server certificate IOBatchSize int // Maximum number of messages collected from clients before starting their processing. IOSleepTime int64 // Duration (in micro-seconds) the server waits for more messages to fill up a batch. NATSServerURL string // URL for external NATS Server to connect to. If empty, NATS Server is embedded. NATSCredentials string // Credentials file for connecting to external NATS Server. ClientHBInterval time.Duration // Interval at which server sends heartbeat to a client. ClientHBTimeout time.Duration // How long server waits for a heartbeat response. ClientHBFailCount int // Number of failed heartbeats before server closes client connection. FTGroupName string // Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore. Partitioning bool // Specify if server only accepts messages/subscriptions on channels defined in StoreLimits. SyslogName string // Optional name for the syslog (useful on Windows when running several servers as a service) Encrypt bool // Specify if server should encrypt messages payload when storing them EncryptionCipher string // Cipher used for encryption. 
Supported are "AES" and "CHACHA". If none is specified, defaults to AES on platforms with Intel processors, CHACHA otherwise. EncryptionKey []byte // Encryption key. The environment NATS_STREAMING_ENCRYPTION_KEY takes precedence and is the preferred way to provide the key. Clustering ClusteringOptions NATSClientOpts []nats.Option GroupLimit int Sctp bool }
Options for NATS Streaming Server
func ConfigureOptions ¶
func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp, printTLSHelp func()) (*Options, *natsd.Options, error)
ConfigureOptions accepts a flag set and augments it with NATS Streaming Server specific flags. It then invokes the corresponding function from NATS Server. On success, Streaming and NATS options structures are returned configured based on the selected flags and/or configuration files. The command line options take precedence over the ones in the configuration files.
func GetDefaultOptions ¶
func GetDefaultOptions() (o *Options)
GetDefaultOptions returns default options for the NATS Streaming Server
type Serverz ¶
type Serverz struct { ClusterID string `json:"cluster_id"` ServerID string `json:"server_id"` Version string `json:"version"` GoVersion string `json:"go"` State string `json:"state"` Role string `json:"role,omitempty"` Now time.Time `json:"now"` Start time.Time `json:"start_time"` Uptime string `json:"uptime"` Clients int `json:"clients"` Subscriptions int `json:"subscriptions"` Channels int `json:"channels"` TotalMsgs int `json:"total_msgs"` TotalBytes uint64 `json:"total_bytes"` InMsgs int64 `json:"in_msgs"` InBytes int64 `json:"in_bytes"` OutMsgs int64 `json:"out_msgs"` OutBytes int64 `json:"out_bytes"` OpenFDs int `json:"open_fds,omitempty"` MaxFDs int `json:"max_fds,omitempty"` }
Serverz describes the NATS Streaming Server
type StanServer ¶
type StanServer struct { // Add Topic Group Map TopicGroupSnapshot *TopicGroup // contains filtered or unexported fields }
StanServer structure represents the NATS Streaming Server
func Run ¶
func Run(sOpts *Options, nOpts *natsd.Options) (*StanServer, error)
Run starts the NATS Streaming server. This wrapper function allows Windows to add a hook for running NATS Streaming as a service.
func RunServer ¶
func RunServer(ID string) (*StanServer, error)
RunServer will start up an embedded NATS Streaming Server and a nats-server to support it.
func RunServerWithOpts ¶
func RunServerWithOpts(stanOpts *Options, natsOpts *server.Options) (newServer *StanServer, returnedError error)
RunServerWithOpts allows you to run a NATS Streaming Server with full control on the Streaming and NATS Server configuration.
func (*StanServer) ClusterID ¶
func (s *StanServer) ClusterID() string
ClusterID returns the NATS Streaming Server's cluster ID.
func (*StanServer) LastError ¶
func (s *StanServer) LastError() error
LastError returns the last fatal error the server experienced.
func (*StanServer) Shutdown ¶
func (s *StanServer) Shutdown()
Shutdown will close our NATS connection and shut down any embedded NATS server.
func (*StanServer) State ¶
func (s *StanServer) State() State
State returns the state of this server.
type Storez ¶
type Storez struct { ClusterID string `json:"cluster_id"` ServerID string `json:"server_id"` Now time.Time `json:"now"` Type string `json:"type"` Limits stores.StoreLimits `json:"limits"` TotalMsgs int `json:"total_msgs"` TotalBytes uint64 `json:"total_bytes"` }
Storez describes the NATS Streaming Store
type Subscriptionz ¶
type Subscriptionz struct { ClientID string `json:"client_id"` Inbox string `json:"inbox"` AckInbox string `json:"ack_inbox"` DurableName string `json:"durable_name,omitempty"` QueueName string `json:"queue_name,omitempty"` IsDurable bool `json:"is_durable"` IsOffline bool `json:"is_offline"` MaxInflight int `json:"max_inflight"` AckWait int `json:"ack_wait"` LastSent uint64 `json:"last_sent"` PendingCount int `json:"pending_count"` IsStalled bool `json:"is_stalled"` }
Subscriptionz describes a NATS Streaming Subscription
type SyncClient ¶
type TopicGroup ¶
type TopicGroup struct { Limit int `json:"limit"` Clients map[string][][]*SyncClient `json:"clients"` Snapshot map[string][]int `json:"snapshot"` }
func (*TopicGroup) Add ¶
func (t *TopicGroup) Add(client *SyncClient, isAuto bool)
Add adds the client to the array.
func (*TopicGroup) AddClient ¶
func (t *TopicGroup) AddClient(clientId, topic, group, host string, port int, isAuto bool)
AddClient adds the client info to the array.
func (*TopicGroup) DelClient ¶
func (t *TopicGroup) DelClient(clientId, topic string, isAuto bool)
DelClient deletes the client info from the array.
func (*TopicGroup) Delete ¶
func (t *TopicGroup) Delete(client *SyncClient, isAuto bool)
Delete deletes the client from the array.
func (*TopicGroup) GetClient ¶
func (t *TopicGroup) GetClient(clientID string, subject string) *SyncClient
func (*TopicGroup) GetClientGroup ¶
func (t *TopicGroup) GetClientGroup(client *SyncClient) []*SyncClient
GetClientGroup gets the group's clients for a single client.
func (*TopicGroup) GetGroupClientsByGroup ¶
func (t *TopicGroup) GetGroupClientsByGroup(group, subject string) []*SyncClient
GetGroupClientsByGroup gets the group's clients by group name and topic.