Documentation ¶
Overview ¶
A Go client for the NATS messaging system (https://nats.io).
Index ¶
- Constants
- Variables
- func NewInbox() string
- func RegisterEncoder(encType string, enc Encoder)
- type Conn
- func (nc *Conn) AuthRequired() bool
- func (nc *Conn) Buffered() (int, error)
- func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error)
- func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error)
- func (nc *Conn) Close()
- func (nc *Conn) ConnectedServerId() string
- func (nc *Conn) ConnectedUrl() string
- func (nc *Conn) DiscoveredServers() []string
- func (nc *Conn) Flush() error
- func (nc *Conn) FlushTimeout(timeout time.Duration) (err error)
- func (nc *Conn) IsClosed() bool
- func (nc *Conn) IsConnected() bool
- func (nc *Conn) IsReconnecting() bool
- func (nc *Conn) LastError() error
- func (nc *Conn) MaxPayload() int64
- func (nc *Conn) Publish(subj string, data []byte) error
- func (nc *Conn) PublishMsg(m *Msg) error
- func (nc *Conn) PublishRequest(subj, reply string, data []byte) error
- func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error)
- func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error)
- func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error)
- func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error)
- func (nc *Conn) Servers() []string
- func (nc *Conn) SetClosedHandler(cb ConnHandler)
- func (nc *Conn) SetDisconnectHandler(dcb ConnHandler)
- func (nc *Conn) SetErrorHandler(cb ErrHandler)
- func (nc *Conn) SetReconnectHandler(rcb ConnHandler)
- func (nc *Conn) Stats() Statistics
- func (nc *Conn) Status() Status
- func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error)
- func (nc *Conn) SubscribeSync(subj string) (*Subscription, error)
- func (nc *Conn) TLSRequired() bool
- type ConnHandler
- type EncodedConn
- func (c *EncodedConn) BindRecvChan(subject string, channel interface{}) (*Subscription, error)
- func (c *EncodedConn) BindRecvQueueChan(subject, queue string, channel interface{}) (*Subscription, error)
- func (c *EncodedConn) BindSendChan(subject string, channel interface{}) error
- func (c *EncodedConn) Close()
- func (c *EncodedConn) Flush() error
- func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error)
- func (c *EncodedConn) LastError() error
- func (c *EncodedConn) Publish(subject string, v interface{}) error
- func (c *EncodedConn) PublishRequest(subject, reply string, v interface{}) error
- func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error)
- func (c *EncodedConn) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error
- func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error)
- type Encoder
- type ErrHandler
- type Handler
- type Msg
- type MsgHandler
- type Option
- func ClientCert(certFile, keyFile string) Option
- func ClosedHandler(cb ConnHandler) Option
- func Dialer(dialer *net.Dialer) Option
- func DisconnectHandler(cb ConnHandler) Option
- func DontRandomize() Option
- func ErrorHandler(cb ErrHandler) Option
- func MaxReconnects(max int) Option
- func Name(name string) Option
- func NoReconnect() Option
- func ReconnectHandler(cb ConnHandler) Option
- func ReconnectWait(t time.Duration) Option
- func RootCAs(file ...string) Option
- func Secure(tls ...*tls.Config) Option
- func Timeout(t time.Duration) Option
- func Token(token string) Option
- func UserInfo(user, password string) Option
- type Options
- type Statistics
- type Status
- type Subscription
- func (s *Subscription) AutoUnsubscribe(max int) error
- func (s *Subscription) ClearMaxPending() error
- func (s *Subscription) Delivered() (int64, error)
- func (s *Subscription) Dropped() (int, error)
- func (s *Subscription) IsValid() bool
- func (s *Subscription) MaxPending() (int, int, error)
- func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error)
- func (s *Subscription) Pending() (int, int, error)
- func (s *Subscription) PendingLimits() (int, int, error)
- func (s *Subscription) QueuedMsgs() (int, error)
- func (s *Subscription) SetPendingLimits(msgLimit, bytesLimit int) error
- func (s *Subscription) Type() SubscriptionType
- func (s *Subscription) Unsubscribe() error
- type SubscriptionType
Examples ¶
- Conn.Close
- Conn.Flush
- Conn.FlushTimeout
- Conn.Publish
- Conn.PublishMsg
- Conn.QueueSubscribe
- Conn.Request
- Conn.Subscribe
- Conn.SubscribeSync
- Connect
- EncodedConn.BindRecvChan
- EncodedConn.BindSendChan
- EncodedConn.Publish
- EncodedConn.Subscribe
- NewEncodedConn
- Subscription.AutoUnsubscribe
- Subscription.NextMsg
- Subscription.Unsubscribe
Constants ¶
const ( JSON_ENCODER = "json" GOB_ENCODER = "gob" DEFAULT_ENCODER = "default" )
Indexed names into the registered Encoders.
const ( Version = "1.2.2" DefaultURL = "nats://localhost:4222" DefaultPort = 4222 DefaultMaxReconnect = 60 DefaultReconnectWait = 2 * time.Second DefaultTimeout = 2 * time.Second DefaultPingInterval = 2 * time.Minute DefaultMaxPingOut = 2 DefaultMaxChanLen = 8192 // 8k DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB RequestChanLen = 8 LangString = "go" )
Default Constants
const ( DISCONNECTED = Status(iota) CONNECTED CLOSED RECONNECTING CONNECTING )
const ( AsyncSubscription = SubscriptionType(iota) SyncSubscription ChanSubscription NilSubscription )
The different types of subscriptions.
const ( DefaultSubPendingMsgsLimit = 65536 DefaultSubPendingBytesLimit = 65536 * 1024 )
Pending Limits
const ( OP_START = iota OP_PLUS OP_PLUS_O OP_PLUS_OK OP_MINUS OP_MINUS_E OP_MINUS_ER OP_MINUS_ERR OP_MINUS_ERR_SPC MINUS_ERR_ARG OP_M OP_MS OP_MSG OP_MSG_SPC MSG_ARG MSG_PAYLOAD MSG_END OP_P OP_PI OP_PIN OP_PING OP_PO OP_PON OP_PONG OP_I OP_IN OP_INF OP_INFO OP_INFO_SPC INFO_ARG )
const InboxPrefix = "_INBOX."
InboxPrefix is the prefix for all inbox subjects.
const MAX_CONTROL_LINE_SIZE = 1024
const PERMISSIONS_ERR = "permissions violation"
PERMISSIONS_ERR is for when nats server subject authorization has failed.
const STALE_CONNECTION = "stale connection"
STALE_CONNECTION is for detection and proper handling of stale connections.
Variables ¶
var ( ErrConnectionClosed = errors.New("nats: connection closed") ErrSecureConnRequired = errors.New("nats: secure connection required") ErrSecureConnWanted = errors.New("nats: secure connection not available") ErrBadSubscription = errors.New("nats: invalid subscription") ErrTypeSubscription = errors.New("nats: invalid subscription type") ErrBadSubject = errors.New("nats: invalid subject") ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped") ErrTimeout = errors.New("nats: timeout") ErrBadTimeout = errors.New("nats: timeout invalid") ErrAuthorization = errors.New("nats: authorization violation") ErrNoServers = errors.New("nats: no servers available for connection") ErrJsonParse = errors.New("nats: connect message, json parse error") ErrChanArg = errors.New("nats: argument needs to be a channel type") ErrMaxPayload = errors.New("nats: maximum payload exceeded") ErrMaxMessages = errors.New("nats: maximum messages delivered") ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription") ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed") ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received") ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded") ErrInvalidConnection = errors.New("nats: invalid connection") ErrInvalidMsg = errors.New("nats: invalid message or message nil") ErrInvalidArg = errors.New("nats: invalid argument") ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION) )
Errors
var DefaultOptions = Options{ AllowReconnect: true, MaxReconnect: DefaultMaxReconnect, ReconnectWait: DefaultReconnectWait, Timeout: DefaultTimeout, PingInterval: DefaultPingInterval, MaxPingsOut: DefaultMaxPingOut, SubChanLen: DefaultMaxChanLen, ReconnectBufSize: DefaultReconnectBufSize, Dialer: &net.Dialer{ Timeout: DefaultTimeout, }, }
Functions ¶
func NewInbox ¶
func NewInbox() string
NewInbox will return an inbox string which can be used for directed replies from subscribers. These are guaranteed to be unique, but can be shared and subscribed to by others.
func RegisterEncoder ¶
RegisterEncoder will register the encType with the given Encoder. Useful for customization.
Types ¶
type Conn ¶
type Conn struct { Statistics Opts Options // contains filtered or unexported fields }
A Conn represents a bare connection to a nats-server. It can send and receive []byte payloads.
func Connect ¶
Connect will attempt to connect to the NATS system. The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222 Comma separated arrays are also supported, e.g. urlA, urlB. Options start with the defaults but can be overridden.
Example ¶
Shows different ways to create a Conn
package main import ( "time" "github.com/nats-io/nats" ) func main() { nc, _ := nats.Connect(nats.DefaultURL) nc.Close() nc, _ = nats.Connect("nats://derek:secretpassword@demo.nats.io:4222") nc.Close() nc, _ = nats.Connect("tls://derek:secretpassword@demo.nats.io:4443") nc.Close() opts := nats.Options{ AllowReconnect: true, MaxReconnect: 10, ReconnectWait: 5 * time.Second, Timeout: 1 * time.Second, } nc, _ = opts.Connect() nc.Close() }
Output:
func (*Conn) AuthRequired ¶ added in v1.2.0
AuthRequired will return if the connected server requires authorization.
func (*Conn) Buffered ¶ added in v1.1.2
Buffered will return the number of bytes buffered to be sent to the server. FIXME(dlc) take into account disconnected state.
func (*Conn) ChanQueueSubscribe ¶ added in v1.2.0
func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error)
ChanQueueSubscribe will place all messages received on the channel. You should not close the channel until sub.Unsubscribe() has been called.
func (*Conn) ChanSubscribe ¶ added in v1.2.0
func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error)
ChanSubscribe will place all messages received on the channel. You should not close the channel until sub.Unsubscribe() has been called.
func (*Conn) Close ¶
func (nc *Conn) Close()
Close will close the connection to the server. This call will release all blocking calls, such as Flush() and NextMsg()
Example ¶
package main import ( "github.com/nats-io/nats" ) func main() { nc, _ := nats.Connect(nats.DefaultURL) nc.Close() }
Output:
func (*Conn) ConnectedServerId ¶ added in v1.0.5
ConnectedServerId reports the connected server's Id.
func (*Conn) DiscoveredServers ¶ added in v1.2.2
DiscoveredServers returns only the server urls that have been discovered after a connection has been established. If authentication is enabled, use UserInfo or Token when connecting with these urls.
func (*Conn) Flush ¶
Flush will perform a round trip to the server and return when it receives the internal reply.
Example ¶
package main import ( "github.com/nats-io/nats" ) func main() { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")} for i := 0; i < 1000; i++ { nc.PublishMsg(msg) } err := nc.Flush() if err == nil { // Everything has been processed by the server for nc *Conn. } }
Output:
func (*Conn) FlushTimeout ¶
FlushTimeout allows a Flush operation to have an associated timeout.
Example ¶
package main import ( "time" "github.com/nats-io/nats" ) func main() { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")} for i := 0; i < 1000; i++ { nc.PublishMsg(msg) } // Only wait for up to 1 second for Flush err := nc.FlushTimeout(1 * time.Second) if err == nil { // Everything has been processed by the server for nc *Conn. } }
Output:
func (*Conn) IsConnected ¶ added in v1.2.2
IsConnected tests if a Conn is connected.
func (*Conn) IsReconnecting ¶
IsReconnecting tests if a Conn is reconnecting.
func (*Conn) LastError ¶
LastError reports the last error encountered via the connection. It can be used reliably within ClosedCB, for example, to find out why the connection was closed.
func (*Conn) MaxPayload ¶ added in v1.1.2
MaxPayload returns the size limit that a message payload can have. This is set by the server configuration and delivered to the client upon connect.
func (*Conn) Publish ¶
Publish publishes the data argument to the given subject. The data argument is left untouched and needs to be correctly interpreted on the receiver.
Example ¶
package main import ( "github.com/nats-io/nats" ) func main() { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() nc.Publish("foo", []byte("Hello World!")) }
Output:
func (*Conn) PublishMsg ¶
PublishMsg publishes the Msg structure, which includes the Subject, an optional Reply and an optional Data field.
Example ¶
package main import ( "github.com/nats-io/nats" ) func main() { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")} nc.PublishMsg(msg) }
Output:
func (*Conn) PublishRequest ¶
PublishRequest will perform a Publish() expecting a response on the reply subject. Use Request() for automatically waiting for a response inline.
func (*Conn) QueueSubscribe ¶
func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error)
QueueSubscribe creates an asynchronous queue subscriber on the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message asynchronously.
Example ¶
package main import ( "github.com/nats-io/nats" ) func main() { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() received := 0 nc.QueueSubscribe("foo", "worker_group", func(_ *nats.Msg) { received++ }) }
Output:
func (*Conn) QueueSubscribeSync ¶
func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error)
QueueSubscribeSync creates a synchronous queue subscriber on the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message synchronously.
func (*Conn) QueueSubscribeSyncWithChan ¶ added in v1.2.0
func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error)
QueueSubscribeSyncWithChan is syntactic sugar for ChanQueueSubscribe(subject, group, ch).
func (*Conn) Request ¶
Request will create an Inbox and perform a Request() call with the Inbox reply and return the first reply received. This is optimized for the case of multiple responses.
Example ¶
package main import ( "time" "github.com/nats-io/nats" ) func main() { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() nc.Subscribe("foo", func(m *nats.Msg) { nc.Publish(m.Reply, []byte("I will help you")) }) nc.Request("foo", []byte("help"), 50*time.Millisecond) }
Output:
func (*Conn) Servers ¶ added in v1.2.2
Servers returns the list of known server urls, including additional servers discovered after a connection has been established. If authentication is enabled, use UserInfo or Token when connecting with these urls.
func (*Conn) SetClosedHandler ¶ added in v1.2.0
func (nc *Conn) SetClosedHandler(cb ConnHandler)
SetClosedHandler will set the closed event handler.
func (*Conn) SetDisconnectHandler ¶ added in v1.2.0
func (nc *Conn) SetDisconnectHandler(dcb ConnHandler)
SetDisconnectHandler will set the disconnect event handler.
func (*Conn) SetErrorHandler ¶ added in v1.2.0
func (nc *Conn) SetErrorHandler(cb ErrHandler)
SetErrorHandler will set the async error handler.
func (*Conn) SetReconnectHandler ¶ added in v1.2.0
func (nc *Conn) SetReconnectHandler(rcb ConnHandler)
SetReconnectHandler will set the reconnect event handler.
func (*Conn) Stats ¶
func (nc *Conn) Stats() Statistics
Stats will return a race safe copy of the Statistics section for the connection.
func (*Conn) Subscribe ¶
func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error)
Subscribe will express interest in the given subject. The subject can have wildcards (partial:*, full:>). Messages will be delivered to the associated MsgHandler. If no MsgHandler is given, the subscription is a synchronous subscription and can be polled via Subscription.NextMsg().
Example ¶
This Example shows an asynchronous subscriber.
package main import ( "fmt" "github.com/nats-io/nats" ) func main() { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() nc.Subscribe("foo", func(m *nats.Msg) { fmt.Printf("Received a message: %s\n", string(m.Data)) }) }
Output:
func (*Conn) SubscribeSync ¶
func (nc *Conn) SubscribeSync(subj string) (*Subscription, error)
SubscribeSync is syntactic sugar for Subscribe(subject, nil).
Example ¶
This Example shows a synchronous subscriber.
package main import ( "fmt" "time" "github.com/nats-io/nats" ) func main() { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() sub, _ := nc.SubscribeSync("foo") m, err := sub.NextMsg(1 * time.Second) if err == nil { fmt.Printf("Received a message: %s\n", string(m.Data)) } else { fmt.Println("NextMsg timed out.") } }
Output:
func (*Conn) TLSRequired ¶ added in v1.2.0
TLSRequired will return if the connected server requires TLS connections.
type ConnHandler ¶
type ConnHandler func(*Conn)
ConnHandler is used for asynchronous events such as disconnected and closed connections.
type EncodedConn ¶
EncodedConn are the preferred way to interface with NATS. They wrap a bare connection to a nats server and have an extendable encoder system that will encode and decode messages from raw Go types.
func NewEncodedConn ¶
func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error)
NewEncodedConn will wrap an existing Connection and utilize the appropriate registered encoder.
Example ¶
Shows how to wrap a Conn into an EncodedConn
package main import ( "github.com/nats-io/nats" ) func main() { nc, _ := nats.Connect(nats.DefaultURL) c, _ := nats.NewEncodedConn(nc, "json") c.Close() }
Output:
func (*EncodedConn) BindRecvChan ¶
func (c *EncodedConn) BindRecvChan(subject string, channel interface{}) (*Subscription, error)
BindRecvChan binds a channel for receive operations from NATS.
Example ¶
BindRecvChan() allows binding of a Go channel to a nats subject for subscribe operations. The Encoder attached to the EncodedConn will be used for un-marshalling.
package main import ( "fmt" "github.com/nats-io/nats" ) func main() { nc, _ := nats.Connect(nats.DefaultURL) c, _ := nats.NewEncodedConn(nc, "json") defer c.Close() type person struct { Name string Address string Age int } ch := make(chan *person) c.BindRecvChan("hello", ch) me := &person{Name: "derek", Age: 22, Address: "85 Second St"} c.Publish("hello", me) // Receive the publish directly on a channel who := <-ch fmt.Printf("%v says hello!\n", who) }
Output:
func (*EncodedConn) BindRecvQueueChan ¶
func (c *EncodedConn) BindRecvQueueChan(subject, queue string, channel interface{}) (*Subscription, error)
BindRecvQueueChan binds a channel for queue-based receive operations from NATS.
func (*EncodedConn) BindSendChan ¶
func (c *EncodedConn) BindSendChan(subject string, channel interface{}) error
BindSendChan binds a channel for send operations to NATS.
Example ¶
BindSendChan() allows binding of a Go channel to a nats subject for publish operations. The Encoder attached to the EncodedConn will be used for marshalling.
package main import ( "github.com/nats-io/nats" ) func main() { nc, _ := nats.Connect(nats.DefaultURL) c, _ := nats.NewEncodedConn(nc, "json") defer c.Close() type person struct { Name string Address string Age int } ch := make(chan *person) c.BindSendChan("hello", ch) me := &person{Name: "derek", Age: 22, Address: "85 Second St"} ch <- me }
Output:
func (*EncodedConn) Close ¶
func (c *EncodedConn) Close()
Close will close the connection to the server. This call will release all blocking calls, such as Flush(), etc.
func (*EncodedConn) Flush ¶
func (c *EncodedConn) Flush() error
Flush will perform a round trip to the server and return when it receives the internal reply.
func (*EncodedConn) FlushTimeout ¶
func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error)
FlushTimeout allows a Flush operation to have an associated timeout.
func (*EncodedConn) LastError ¶
func (c *EncodedConn) LastError() error
LastError reports the last error encountered via the Connection.
func (*EncodedConn) Publish ¶
func (c *EncodedConn) Publish(subject string, v interface{}) error
Publish publishes the data argument to the given subject. The data argument will be encoded using the associated encoder.
Example ¶
EncodedConn can publish virtually anything just by passing it in. The encoder will be used to properly encode the raw Go type
package main import ( "github.com/nats-io/nats" ) func main() { nc, _ := nats.Connect(nats.DefaultURL) c, _ := nats.NewEncodedConn(nc, "json") defer c.Close() type person struct { Name string Address string Age int } me := &person{Name: "derek", Age: 22, Address: "85 Second St"} c.Publish("hello", me) }
Output:
func (*EncodedConn) PublishRequest ¶
func (c *EncodedConn) PublishRequest(subject, reply string, v interface{}) error
PublishRequest will perform a Publish() expecting a response on the reply subject. Use Request() for automatically waiting for a response inline.
func (*EncodedConn) QueueSubscribe ¶
func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error)
QueueSubscribe will create a queue subscription on the given subject and process incoming messages using the specified Handler. The Handler should be a func that matches a signature from the description of Handler from above.
func (*EncodedConn) Request ¶
func (c *EncodedConn) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error
Request will create an Inbox and perform a Request() call with the Inbox reply for the data v. A response will be decoded into vPtr.
func (*EncodedConn) Subscribe ¶
func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error)
Subscribe will create a subscription on the given subject and process incoming messages using the specified Handler. The Handler should be a func that matches a signature from the description of Handler from above.
Example ¶
EncodedConn's subscribers will automatically decode the wire data into the requested Go type using the Decode() method of the registered Encoder. The callback signature can also vary to include additional data, such as subject and reply subjects.
package main import ( "fmt" "github.com/nats-io/nats" ) func main() { nc, _ := nats.Connect(nats.DefaultURL) c, _ := nats.NewEncodedConn(nc, "json") defer c.Close() type person struct { Name string Address string Age int } c.Subscribe("hello", func(p *person) { fmt.Printf("Received a person! %+v\n", p) }) c.Subscribe("hello", func(subj, reply string, p *person) { fmt.Printf("Received a person on subject %s! %+v\n", subj, p) }) me := &person{Name: "derek", Age: 22, Address: "85 Second St"} c.Publish("hello", me) }
Output:
type Encoder ¶
type Encoder interface { Encode(subject string, v interface{}) ([]byte, error) Decode(subject string, data []byte, vPtr interface{}) error }
Encoder interface is for all registered encoders.
func EncoderForType ¶
EncoderForType will return the registered Encoder for the encType.
type ErrHandler ¶
type ErrHandler func(*Conn, *Subscription, error)
ErrHandler is used to process asynchronous errors encountered while processing inbound messages.
type Handler ¶
type Handler interface{}
Handler is a specific callback used for Subscribe. It is generalized to an interface{}, but we will discover its format and arguments at runtime and perform the correct callback, including de-marshalling JSON strings back into the appropriate struct based on the signature of the Handler.
Handlers are expected to have one of four signatures.
type person struct { Name string `json:"name,omitempty"` Age uint `json:"age,omitempty"` } handler := func(m *Msg) handler := func(p *person) handler := func(subject string, o *obj) handler := func(subject, reply string, o *obj)
These forms allow a callback to request a raw Msg ptr, where the processing of the message from the wire is untouched, or to process a JSON representation and demarshal it into the given struct, e.g. person. There are also variants where the callback wants either the subject, or the subject and the reply subject.
type Msg ¶
type Msg struct { Subject string Reply string Data []byte Sub *Subscription // contains filtered or unexported fields }
Msg is a structure used by Subscribers and PublishMsg().
type MsgHandler ¶
type MsgHandler func(msg *Msg)
MsgHandler is a callback function that processes messages delivered to asynchronous subscribers.
type Option ¶ added in v1.2.0
Option is a function on the options for a connection.
func ClientCert ¶ added in v1.2.0
ClientCert is a helper option to provide the client certificate from a file. If Secure is not already set this will set it as well
func ClosedHandler ¶ added in v1.2.0
func ClosedHandler(cb ConnHandler) Option
ClosedHandler is an Option to set the closed handler.
func Dialer ¶ added in v1.2.2
Dialer is an Option to set the dialer which will be used when attempting to establish a connection.
func DisconnectHandler ¶ added in v1.2.0
func DisconnectHandler(cb ConnHandler) Option
DisconnectHandler is an Option to set the disconnected handler.
func DontRandomize ¶ added in v1.2.0
func DontRandomize() Option
DontRandomize is an Option to turn off randomizing the server pool.
func ErrorHandler ¶ added in v1.2.0
func ErrorHandler(cb ErrHandler) Option
ErrorHandler is an Option to set the async error handler.
func MaxReconnects ¶ added in v1.2.0
MaxReconnects is an Option to set the maximum number of reconnect attempts.
func NoReconnect ¶ added in v1.2.0
func NoReconnect() Option
NoReconnect is an Option to turn off reconnect behavior.
func ReconnectHandler ¶ added in v1.2.0
func ReconnectHandler(cb ConnHandler) Option
ReconnectHandler is an Option to set the reconnected handler.
func ReconnectWait ¶ added in v1.2.0
ReconnectWait is an Option to set the wait time between reconnect attempts.
func RootCAs ¶ added in v1.2.0
RootCAs is a helper option to provide the RootCAs pool from a list of filenames. If Secure is not already set this will set it as well.
func Secure ¶ added in v1.2.0
Secure is an Option to enable TLS secure connections that skip server verification by default. Pass a TLS Configuration for proper TLS.
type Options ¶
type Options struct { Url string Servers []string NoRandomize bool Name string Verbose bool Pedantic bool Secure bool TLSConfig *tls.Config AllowReconnect bool MaxReconnect int ReconnectWait time.Duration Timeout time.Duration PingInterval time.Duration // disabled if 0 or negative MaxPingsOut int ClosedCB ConnHandler DisconnectedCB ConnHandler ReconnectedCB ConnHandler AsyncErrorCB ErrHandler // Size of the backing bufio buffer during reconnect. Once this // has been exhausted publish operations will error. ReconnectBufSize int // The size of the buffered channel used between the socket // Go routine and the message delivery for SyncSubscriptions. // NOTE: This does not affect AsyncSubscriptions which are // dictated by PendingLimits() SubChanLen int User string Password string Token string // Dialer allows users setting a custom Dialer Dialer *net.Dialer }
Options can be used to create a customized connection.
type Statistics ¶
type Statistics struct { InMsgs uint64 OutMsgs uint64 InBytes uint64 OutBytes uint64 Reconnects uint64 }
Tracks various stats received and sent on this connection, including counts for messages and bytes.
type Subscription ¶
type Subscription struct { // Subject that represents this subscription. This can be different // than the received subject inside a Msg if this is a wildcard. Subject string // Optional queue group name. If present, all subscriptions with the // same name will form a distributed queue, and each message will // only be processed by one member of the group. Queue string // contains filtered or unexported fields }
A Subscription represents interest in a given subject.
func (*Subscription) AutoUnsubscribe ¶
func (s *Subscription) AutoUnsubscribe(max int) error
AutoUnsubscribe will issue an automatic Unsubscribe that is processed by the server when max messages have been received. This can be useful when sending a request to an unknown number of subscribers. Request() uses this functionality.
Example ¶
package main import ( "fmt" "github.com/nats-io/nats" ) func main() { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() received, wanted, total := 0, 10, 100 sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) { received++ }) sub.AutoUnsubscribe(wanted) for i := 0; i < total; i++ { nc.Publish("foo", []byte("Hello")) } nc.Flush() fmt.Printf("Received = %d", received) }
Output:
func (*Subscription) ClearMaxPending ¶ added in v1.2.0
func (s *Subscription) ClearMaxPending() error
ClearMaxPending resets the maximums seen so far.
func (*Subscription) Delivered ¶ added in v1.2.0
func (s *Subscription) Delivered() (int64, error)
Delivered returns the number of delivered messages for this subscription.
func (*Subscription) Dropped ¶ added in v1.2.0
func (s *Subscription) Dropped() (int, error)
Dropped returns the number of known dropped messages for this subscription. This will correspond to messages dropped by violations of PendingLimits. If the server declares the connection a SlowConsumer, this number may not be valid.
func (*Subscription) IsValid ¶
func (s *Subscription) IsValid() bool
IsValid returns a boolean indicating whether the subscription is still active. This will return false if the subscription has already been closed.
func (*Subscription) MaxPending ¶ added in v1.2.0
func (s *Subscription) MaxPending() (int, int, error)
MaxPending returns the maximum number of queued messages and queued bytes seen so far.
func (*Subscription) NextMsg ¶
func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error)
NextMsg() will return the next message available to a synchronous subscriber or block until one is available. A timeout can be used to return when no message has been delivered.
Example ¶
package main import ( "fmt" "time" "github.com/nats-io/nats" ) func main() { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() sub, _ := nc.SubscribeSync("foo") m, err := sub.NextMsg(1 * time.Second) if err == nil { fmt.Printf("Received a message: %s\n", string(m.Data)) } else { fmt.Println("NextMsg timed out.") } }
Output:
func (*Subscription) Pending ¶ added in v1.2.0
func (s *Subscription) Pending() (int, int, error)
Pending returns the number of queued messages and queued bytes in the client for this subscription.
func (*Subscription) PendingLimits ¶ added in v1.2.0
func (s *Subscription) PendingLimits() (int, int, error)
PendingLimits returns the current limits for this subscription. If no error is returned, a negative value indicates that the given metric is not limited.
func (*Subscription) QueuedMsgs ¶ added in v1.1.2
func (s *Subscription) QueuedMsgs() (int, error)
QueuedMsgs returns the number of queued messages in the client for this subscription. Deprecated: Use Pending() instead.
func (*Subscription) SetPendingLimits ¶ added in v1.2.0
func (s *Subscription) SetPendingLimits(msgLimit, bytesLimit int) error
SetPendingLimits sets the limits for pending msgs and bytes for this subscription. Zero is not allowed. Any negative value means that the given metric is not limited.
func (*Subscription) Type ¶ added in v1.2.0
func (s *Subscription) Type() SubscriptionType
Type returns the type of Subscription.
func (*Subscription) Unsubscribe ¶
func (s *Subscription) Unsubscribe() error
Unsubscribe will remove interest in the given subject.
Example ¶
package main import ( "github.com/nats-io/nats" ) func main() { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() sub, _ := nc.SubscribeSync("foo") // ... sub.Unsubscribe() }
Output:
type SubscriptionType ¶ added in v1.2.0
type SubscriptionType int
SubscriptionType is the type of the Subscription.