nats

package module
v1.1.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2015 License: MIT Imports: 20 Imported by: 0

README

NATS - Go Client

A Go client for the NATS messaging system.

License MIT ReportCard Build Status GoDoc Coverage Status

Installation

# Go client
go get github.com/nats-io/nats

# Servers

# gnatsd
go get github.com/nats-io/gnatsd

# nats-server (Ruby)
gem install nats

Basic Encoded Usage


nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer c.Close()

// Simple Publisher
c.Publish("foo", "Hello World")

// Simple Async Subscriber
c.Subscribe("foo", func(s string) {
    fmt.Printf("Received a message: %s\n", s)
})

// EncodedConn can Publish any raw Go type using the registered Encoder
type person struct {
     Name     string
     Address  string
     Age      int
}

// Go type Subscriber
c.Subscribe("hello", func(p *person) {
    fmt.Printf("Received a person: %+v\n", p)
})

me := &person{Name: "derek", Age: 22, Address: "585 Howard Street, San Francisco, CA"}

// Go type Publisher
c.Publish("hello", me)

// Unsubscribing
sub, err := c.Subscribe("foo", nil)
...
sub.Unsubscribe()

// Requests
var response string
err := c.Request("help", "help me", &response, 10*time.Millisecond)
if err != nil {
    fmt.Printf("Request failed: %v\n", err)
}

// Replying
c.Subscribe("help", func(subj, reply string, msg string) {
    c.Publish(reply, "I can help!")
})

// Close connection
c.Close();

TLS

// There is a SecureConnect() for NATS, but this by default does NOT verify the server identity, use with caution and only for testing!
nc, err := nats.SecureConnect(secureUrl)

// The better way is to setup the tls.Config as needed and pass via the options.
// To start, set secure boolean on NATS options.
opts := nats.DefaultOptions
opts.Url = url
opts.Secure = true

// For a server with a trusted chain built into the client host, simply designate the server name that is expected.
config := &tls.Config{
    ServerName: opts.Host,
    MinVersion: tls.VersionTLS12,
}

// If you are using a self-signed cert and need to load in the CA.
rootPEM, err := ioutil.ReadFile("./configs/certs/ca.pem")
if err != nil || rootPEM == nil {
    t.Fatalf("failed to read root certificate")
}
pool := x509.NewCertPool()
ok := pool.AppendCertsFromPEM([]byte(rootPEM))
if !ok {
    t.Fatalf("failed to parse root certificate")
}

config := &tls.Config{
    ServerName: opts.Host,
    RootCAs:    pool,
    MinVersion: tls.VersionTLS12,
}

// If the server requires client certificates..
certFile := "./configs/certs/client-cert.pem"
keyFile := "./configs/certs/client-key.pem"
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
    t.Fatalf("error parsing X509 certificate/key pair: %v", err)
}

config := &tls.Config{
    Certificates: []tls.Certificate{cert},
    ServerName:   opts.Host,
    MinVersion:   tls.VersionTLS12,
}

// Now add to the options and connect.
opts.TLSConfig = config

nc, err := opts.Connect()
if err != nil {
	t.Fatalf("Got an error on Connect with Secure Options: %+v\n", err)
}

Using Go Channels (netchan)

nc, _ := nats.Connect(nats.DefaultURL)
ec, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer ec.Close()

type person struct {
     Name     string
     Address  string
     Age      int
}

recvCh := make(chan *person)
ec.BindRecvChan("hello", recvCh)

sendCh := make(chan *person)
ec.BindSendChan("hello", sendCh)

me := &person{Name: "derek", Age: 22, Address: "585 Howard Street"}

// Send via Go channels
sendCh <- me

// Receive via Go channels
who := <- recvCh

Basic Usage


nc, _ := nats.Connect(nats.DefaultURL)

// Simple Publisher
nc.Publish("foo", []byte("Hello World"))

// Simple Async Subscriber
nc.Subscribe("foo", func(m *Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
})

// Simple Sync Subscriber
sub, err := nc.SubscribeSync("foo")
m, err := sub.NextMsg(timeout)

// Unsubscribing
sub, err := nc.Subscribe("foo", nil)
sub.Unsubscribe()

// Requests
msg, err := nc.Request("help", []byte("help me"), 10*time.Millisecond)

// Replies
nc.Subscribe("help", func(m *Msg) {
    nc.Publish(m.Reply, []byte("I can help!"))
})

// Close connection
nc := nats.Connect("nats://localhost:4222")
nc.Close();

Wildcard Subscriptions


// "*" matches any token, at any level of the subject.
nc.Subscribe("foo.*.baz", func(m *Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
})

nc.Subscribe("foo.bar.*", func(m *Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
})

// ">" matches any length of the tail of a subject, and can only be the last token
// E.g. 'foo.>' will match 'foo.bar', 'foo.bar.baz', 'foo.foo.bar.bax.22'
nc.Subscribe("foo.>", func(m *Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
})

// Matches all of the above
nc.Publish("foo.bar.baz", []byte("Hello World"))

Queue Groups

// All subscriptions with the same queue name will form a queue group.
// Each message will be delivered to only one subscriber per queue group,
// using queuing semantics. You can have as many queue groups as you wish.
// Normal subscribers will continue to work as expected.

nc.QueueSubscribe("foo", "job_workers", func(_ *Msg) {
  received += 1;
})

Advanced Usage


// Flush connection to server, returns when all messages have been processed.
nc.Flush()
fmt.Println("All clear!")

// FlushTimeout specifies a timeout value as well.
err := nc.FlushTimeout(1*time.Second)
if err != nil {
    fmt.Println("All clear!")
} else {
    fmt.Println("Flushed timed out!")
}

// Auto-unsubscribe after MAX_WANTED messages received
const MAX_WANTED = 10
sub, err := nc.Subscribe("foo")
sub.AutoUnsubscribe(MAX_WANTED)

// Multiple connections
nc1 := nats.Connect("nats://host1:4222")
nc2 := nats.Connect("nats://host2:4222")

nc1.Subscribe("foo", func(m *Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
})

nc2.Publish("foo", []byte("Hello World!"));

Clustered Usage


var servers = []string{
	"nats://localhost:1222",
	"nats://localhost:1223",
	"nats://localhost:1224",
}

// Setup options to include all servers in the cluster
opts := nats.DefaultOptions
opts.Servers = servers

// Optionally set ReconnectWait and MaxReconnect attempts.
// This example means 10 seconds total per backend.
opts.MaxReconnect = 5
opts.ReconnectWait = (2 * time.Second)

// Optionally disable randomization of the server pool
opts.NoRandomize = true

nc, err := opts.Connect()

// Setup callbacks to be notified on disconnects and reconnects
nc.Opts.DisconnectedCB = func(_ *Conn) {
    fmt.Printf("Got disconnected!\n")
}

// See who we are connected to on reconnect.
nc.Opts.ReconnectedCB = func(nc *Conn) {
    fmt.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
}

License

(The MIT License)

Copyright (c) 2012-2015 Apcera Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Documentation

Overview

A Go client for the NATS messaging system (https://nats.io).

Index

Examples

Constants

View Source
const (
	JSON_ENCODER    = "json"
	GOB_ENCODER     = "gob"
	DEFAULT_ENCODER = "default"
)
View Source
const (
	Version              = "1.1.6"
	DefaultURL           = "nats://localhost:4222"
	DefaultPort          = 4222
	DefaultMaxReconnect  = 60
	DefaultReconnectWait = 2 * time.Second
	DefaultTimeout       = 2 * time.Second
	DefaultPingInterval  = 2 * time.Minute
	DefaultMaxPingOut    = 2
	DefaultMaxChanLen    = 65536
	RequestChanLen       = 4
	LangString           = "go"
)
View Source
const (
	DISCONNECTED = Status(iota)
	CONNECTED
	CLOSED
	RECONNECTING
	CONNECTING
)
View Source
const (
	OP_START = iota
	OP_PLUS
	OP_PLUS_O
	OP_PLUS_OK
	OP_MINUS
	OP_MINUS_E
	OP_MINUS_ER
	OP_MINUS_ERR
	OP_MINUS_ERR_SPC
	MINUS_ERR_ARG
	OP_C
	OP_CO
	OP_CON
	OP_CONN
	OP_CONNE
	OP_CONNEC
	OP_CONNECT
	CONNECT_ARG
	OP_M
	OP_MS
	OP_MSG
	OP_MSG_SPC
	MSG_ARG
	MSG_PAYLOAD
	MSG_END
	OP_P
	OP_PI
	OP_PIN
	OP_PING
	OP_PO
	OP_PON
	OP_PONG
)
View Source
const InboxPrefix = "_INBOX."
View Source
const MAX_CONTROL_LINE_SIZE = 1024
View Source
const STALE_CONNECTION = "Stale Connection"

For detection and proper handling of a Stale Connection

Variables

View Source
var (
	ErrConnectionClosed   = errors.New("nats: Connection Closed")
	ErrSecureConnRequired = errors.New("nats: Secure Connection required")
	ErrSecureConnWanted   = errors.New("nats: Secure Connection not available")
	ErrSecureConnFailed   = errors.New("nats: Secure Connection failed")
	ErrBadSubscription    = errors.New("nats: Invalid Subscription")
	ErrBadSubject         = errors.New("nats: Invalid Subject")
	ErrSlowConsumer       = errors.New("nats: Slow Consumer, messages dropped")
	ErrTimeout            = errors.New("nats: Timeout")
	ErrBadTimeout         = errors.New("nats: Timeout Invalid")
	ErrAuthorization      = errors.New("nats: Authorization Failed")
	ErrNoServers          = errors.New("nats: No servers available for connection")
	ErrJsonParse          = errors.New("nats: Connect message, json parse err")
	ErrChanArg            = errors.New("nats: Argument needs to be a channel type")
	ErrStaleConnection    = errors.New("nats: " + STALE_CONNECTION)
	ErrMaxPayload         = errors.New("nats: Maximum Payload Exceeded")
)
View Source
var DefaultOptions = Options{
	AllowReconnect: true,
	MaxReconnect:   DefaultMaxReconnect,
	ReconnectWait:  DefaultReconnectWait,
	Timeout:        DefaultTimeout,
	PingInterval:   DefaultPingInterval,
	MaxPingsOut:    DefaultMaxPingOut,
	SubChanLen:     DefaultMaxChanLen,
}

Functions

func NewInbox

func NewInbox() string

NewInbox will return an inbox string which can be used for directed replies from subscribers. These are guaranteed to be unique, but can be shared and subscribed to by others.

func RegisterEncoder

func RegisterEncoder(encType string, enc Encoder)

RegisterEncoder will register the encType with the given Encoder. Useful for customization.

Types

type Conn

type Conn struct {
	Statistics

	Opts Options
	// contains filtered or unexported fields
}

A Conn represents a bare connection to a nats-server. It will send and receive []byte payloads.

func Connect

func Connect(url string) (*Conn, error)

Connect will attempt to connect to the NATS server. The url can contain username/password semantics.

Example

Shows different ways to create a Conn

package main

import (
	"time"

	"github.com/nats-io/nats"
)

func main() {

	nc, _ := nats.Connect(nats.DefaultURL)
	nc.Close()

	nc, _ = nats.Connect("nats://derek:secretpassword@nats.apcera.com:4222")
	nc.Close()

	opts := nats.Options{
		AllowReconnect: true,
		MaxReconnect:   10,
		ReconnectWait:  5 * time.Second,
		Timeout:        1 * time.Second,
	}

	nc, _ = opts.Connect()
	nc.Close()
}
Output:

func SecureConnect

func SecureConnect(url string) (*Conn, error)

SecureConnect will attempt to connect to the NATS server using TLS. The url can contain username/password semantics.

func (*Conn) Buffered added in v1.1.2

func (nc *Conn) Buffered() (int, error)

Buffered will return the number of bytes buffered to be sent to the server.

func (*Conn) Close

func (nc *Conn) Close()

Close will close the connection to the server. This call will release all blocking calls, such as Flush() and NextMsg()

Example
package main

import (
	"github.com/nats-io/nats"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	nc.Close()
}
Output:

func (*Conn) ConnectedServerId added in v1.0.5

func (nc *Conn) ConnectedServerId() string

Report the connected server's Id

func (*Conn) ConnectedUrl

func (nc *Conn) ConnectedUrl() string

Report the connected server's Url

func (*Conn) Flush

func (nc *Conn) Flush() error

Flush will perform a round trip to the server and return when it receives the internal reply.

Example
package main

import (
	"github.com/nats-io/nats"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	defer nc.Close()

	msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")}
	for i := 0; i < 1000; i++ {
		nc.PublishMsg(msg)
	}
	err := nc.Flush()
	if err == nil {
		// Everything has been processed by the server for nc *Conn.
	}
}
Output:

func (*Conn) FlushTimeout

func (nc *Conn) FlushTimeout(timeout time.Duration) (err error)

FlushTimeout allows a Flush operation to have an associated timeout.

Example
package main

import (
	"time"

	"github.com/nats-io/nats"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	defer nc.Close()

	msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")}
	for i := 0; i < 1000; i++ {
		nc.PublishMsg(msg)
	}
	// Only wait for up to 1 second for Flush
	err := nc.FlushTimeout(1 * time.Second)
	if err == nil {
		// Everything has been processed by the server for nc *Conn.
	}
}
Output:

func (*Conn) IsClosed

func (nc *Conn) IsClosed() bool

Test if Conn has been closed.

func (*Conn) IsReconnecting

func (nc *Conn) IsReconnecting() bool

Test if Conn is reconnecting.

func (*Conn) LastError

func (nc *Conn) LastError() error

LastError reports the last error encountered via the Connection.

func (*Conn) MaxPayload added in v1.1.2

func (nc *Conn) MaxPayload() int64

MaxPayload returns the size limit that a message payload can have.

func (*Conn) Publish

func (nc *Conn) Publish(subj string, data []byte) error

Publish publishes the data argument to the given subject. The data argument is left untouched and needs to be correctly interpreted on the receiver.

Example
package main

import (
	"github.com/nats-io/nats"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	defer nc.Close()

	nc.Publish("foo", []byte("Hello World!"))
}
Output:

func (*Conn) PublishMsg

func (nc *Conn) PublishMsg(m *Msg) error

PublishMsg publishes the Msg structure, which includes the Subject, an optional Reply and an optional Data field.

Example
package main

import (
	"github.com/nats-io/nats"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	defer nc.Close()

	msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")}
	nc.PublishMsg(msg)
}
Output:

func (*Conn) PublishRequest

func (nc *Conn) PublishRequest(subj, reply string, data []byte) error

PublishRequest will perform a Publish() excpecting a response on the reply subject. Use Request() for automatically waiting for a response inline.

func (*Conn) QueueSubscribe

func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error)

QueueSubscribe creates an asynchronous queue subscriber on the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message asynchronously.

Example
package main

import (
	"github.com/nats-io/nats"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	defer nc.Close()

	received := 0

	nc.QueueSubscribe("foo", "worker_group", func(_ *nats.Msg) {
		received += 1
	})
}
Output:

func (*Conn) QueueSubscribeSync

func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error)

QueueSubscribeSync creates a synchronous queue subscriber on the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message synchronously.

func (*Conn) Request

func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (m *Msg, err error)

Request will create an Inbox and perform a Request() call with the Inbox reply and return the first reply received. This is optimized for the case of multiple responses.

Example
package main

import (
	"time"

	"github.com/nats-io/nats"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	defer nc.Close()

	nc.Subscribe("foo", func(m *nats.Msg) {
		nc.Publish(m.Reply, []byte("I will help you"))
	})
	nc.Request("foo", []byte("help"), 50*time.Millisecond)
}
Output:

func (*Conn) Stats

func (nc *Conn) Stats() Statistics

Stats will return a race safe copy of the Statistics section for the connection.

func (*Conn) Status

func (nc *Conn) Status() Status

Status returns the current state of the connection.

func (*Conn) Subscribe

func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error)

Subscribe will express interest in the given subject. The subject can have wildcards (partial:*, full:>). Messages will be delivered to the associated MsgHandler. If no MsgHandler is given, the subscription is a synchronous subscription and can be polled via Subscription.NextMsg().

Example

This Example shows an asynchronous subscriber.

package main

import (
	"fmt"

	"github.com/nats-io/nats"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	defer nc.Close()

	nc.Subscribe("foo", func(m *nats.Msg) {
		fmt.Printf("Received a message: %s\n", string(m.Data))
	})
}
Output:

func (*Conn) SubscribeSync

func (nc *Conn) SubscribeSync(subj string) (*Subscription, error)

SubscribeSync is syntactic sugar for Subscribe(subject, nil).

Example

This Example shows a synchronous subscriber.

package main

import (
	"fmt"
	"time"

	"github.com/nats-io/nats"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	defer nc.Close()

	sub, _ := nc.SubscribeSync("foo")
	m, err := sub.NextMsg(1 * time.Second)
	if err == nil {
		fmt.Printf("Received a message: %s\n", string(m.Data))
	} else {
		fmt.Println("NextMsg timed out.")
	}
}
Output:

type ConnHandler

type ConnHandler func(*Conn)

ConnHandlers are used for asynchronous events such as disconnected and closed connections.

type EncodedConn

type EncodedConn struct {
	Conn *Conn
	Enc  Encoder
}

EncodedConn are the preferred way to interface with NATS. They wrap a bare connection to a nats server and have an extendable encoder system that will encode and decode messages from raw Go types.

func NewEncodedConn

func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error)

NewEncodedConn will wrap an existing Connection and utilize the appropriate registered encoder.

Example

Shows how to wrap a Conn into an EncodedConn

package main

import (
	"github.com/nats-io/nats"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	c, _ := nats.NewEncodedConn(nc, "json")
	c.Close()
}
Output:

func (*EncodedConn) BindRecvChan

func (c *EncodedConn) BindRecvChan(subject string, channel interface{}) (*Subscription, error)

Bind a channel for receive operations from nats.

Example

BindRecvChan() allows binding of a Go channel to a nats subject for subscribe operations. The Encoder attached to the EncodedConn will be used for un-marshalling.

package main

import (
	"fmt"

	"github.com/nats-io/nats"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	c, _ := nats.NewEncodedConn(nc, "json")
	defer c.Close()

	type person struct {
		Name    string
		Address string
		Age     int
	}

	ch := make(chan *person)
	c.BindRecvChan("hello", ch)

	me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
	c.Publish("hello", me)

	// Receive the publish directly on a channel
	who := <-ch

	fmt.Printf("%v says hello!\n", who)
}
Output:

func (*EncodedConn) BindRecvQueueChan

func (c *EncodedConn) BindRecvQueueChan(subject, queue string, channel interface{}) (*Subscription, error)

Bind a channel for queue-based receive operations from nats.

func (*EncodedConn) BindSendChan

func (c *EncodedConn) BindSendChan(subject string, channel interface{}) error

Bind a channel for send operations to nats.

Example

BindSendChan() allows binding of a Go channel to a nats subject for publish operations. The Encoder attached to the EncodedConn will be used for marshalling.

package main

import (
	"github.com/nats-io/nats"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	c, _ := nats.NewEncodedConn(nc, "json")
	defer c.Close()

	type person struct {
		Name    string
		Address string
		Age     int
	}

	ch := make(chan *person)
	c.BindSendChan("hello", ch)

	me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
	ch <- me
}
Output:

func (*EncodedConn) Close

func (c *EncodedConn) Close()

Close will close the connection to the server. This call will release all blocking calls, such as Flush(), etc.

func (*EncodedConn) Flush

func (c *EncodedConn) Flush() error

Flush will perform a round trip to the server and return when it receives the internal reply.

func (*EncodedConn) FlushTimeout

func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error)

FlushTimeout allows a Flush operation to have an associated timeout.

func (*EncodedConn) LastError

func (c *EncodedConn) LastError() error

LastError reports the last error encountered via the Connection.

func (*EncodedConn) Publish

func (c *EncodedConn) Publish(subject string, v interface{}) error

Publish publishes the data argument to the given subject. The data argument will be encoded using the associated encoder.

Example

EncodedConn can publish virtually anything just by passing it in. The encoder will be used to properly encode the raw Go type

package main

import (
	"github.com/nats-io/nats"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	c, _ := nats.NewEncodedConn(nc, "json")
	defer c.Close()

	type person struct {
		Name    string
		Address string
		Age     int
	}

	me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
	c.Publish("hello", me)
}
Output:

func (*EncodedConn) PublishRequest

func (c *EncodedConn) PublishRequest(subject, reply string, v interface{}) error

PublishRequest will perform a Publish() expecting a response on the reply subject. Use Request() for automatically waiting for a response inline.

func (*EncodedConn) QueueSubscribe

func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error)

QueueSubscribe will create a queue subscription on the given subject and process incoming messages using the specified Handler. The Handler should be a func that matches a signature from the description of Handler from above.

func (*EncodedConn) Request

func (c *EncodedConn) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error

Request will create an Inbox and perform a Request() call with the Inbox reply for the data v. A response will be decoded into the vPtrResponse.

func (*EncodedConn) Subscribe

func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error)

Subscribe will create a subscription on the given subject and process incoming messages using the specified Handler. The Handler should be a func that matches a signature from the description of Handler from above.

Example

EncodedConn's subscribers will automatically decode the wire data into the requested Go type using the Decode() method of the registered Encoder. The callback signature can also vary to include additional data, such as subject and reply subjects.

package main

import (
	"fmt"

	"github.com/nats-io/nats"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	c, _ := nats.NewEncodedConn(nc, "json")
	defer c.Close()

	type person struct {
		Name    string
		Address string
		Age     int
	}

	c.Subscribe("hello", func(p *person) {
		fmt.Printf("Received a person! %+v\n", p)
	})

	c.Subscribe("hello", func(subj, reply string, p *person) {
		fmt.Printf("Received a person on subject %s! %+v\n", subj, p)
	})

	me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
	c.Publish("hello", me)
}
Output:

type Encoder

type Encoder interface {
	Encode(subject string, v interface{}) ([]byte, error)
	Decode(subject string, data []byte, vPtr interface{}) error
}

Encoder interface is for all register encoders

func EncoderForType

func EncoderForType(encType string) Encoder

EncoderForType will return the registered Encoder for the encType.

type ErrHandler

type ErrHandler func(*Conn, *Subscription, error)

ErrHandlers are used to process asynchronous errors encountered while processing inbound messages.

type Handler

type Handler interface{}

Handler is a specific callback used for Subscribe. It is generalized to an interface{}, but we will discover its format and arguments at runtime and perform the correct callback, including de-marshalling JSON strings back into the appropriate struct based on the signature of the Handler.

Handlers are expected to have one of four signatures.

type person struct {
	Name string `json:"name,omitempty"`
	Age  uint   `json:"age,omitempty"`
}

handler := func(m *Msg)
handler := func(p *person)
handler := func(subject string, o *obj)
handler := func(subject, reply string, o *obj)

These forms allow a callback to request a raw Msg ptr, where the processing of the message from the wire is untouched. Process a JSON representation and demarshal it into the given struct, e.g. person. There are also variants where the callback wants either the subject, or the subject and the reply subject.

type Msg

type Msg struct {
	Subject string
	Reply   string
	Data    []byte
	Sub     *Subscription
}

Msg is a structure used by Subscribers and PublishMsg().

type MsgHandler

type MsgHandler func(msg *Msg)

MsgHandler is a callback function that processes messages delivered to asynchronous subscribers.

type Options

type Options struct {
	Url            string
	Servers        []string
	NoRandomize    bool
	Name           string
	Verbose        bool
	Pedantic       bool
	Secure         bool
	TLSConfig      *tls.Config
	AllowReconnect bool
	MaxReconnect   int
	ReconnectWait  time.Duration
	Timeout        time.Duration
	ClosedCB       ConnHandler
	DisconnectedCB ConnHandler
	ReconnectedCB  ConnHandler
	AsyncErrorCB   ErrHandler

	PingInterval time.Duration // disabled if 0 or negative
	MaxPingsOut  int

	// The size of the buffered channel used between the socket
	// Go routine and the message delivery or sync subscription.
	SubChanLen int
}

Options can be used to create a customized Connection.

func (Options) Connect

func (o Options) Connect() (*Conn, error)

Connect will attempt to connect to a NATS server with multiple options.

type Statistics

type Statistics struct {
	InMsgs     uint64
	OutMsgs    uint64
	InBytes    uint64
	OutBytes   uint64
	Reconnects uint64
}

Tracks various stats received and sent on this connection, including counts for messages and bytes.

type Status

type Status int

type Subscription

type Subscription struct {

	// Subject that represents this subscription. This can be different
	// than the received subject inside a Msg if this is a wildcard.
	Subject string

	// Optional queue group name. If present, all subscriptions with the
	// same name will form a distributed queue, and each message will
	// only be processed by one member of the group.
	Queue string
	// contains filtered or unexported fields
}

A Subscription represents interest in a given subject.

func (*Subscription) AutoUnsubscribe

func (s *Subscription) AutoUnsubscribe(max int) error

AutoUnsubscribe will issue an automatic Unsubscribe that is processed by the server when max messages have been received. This can be useful when sending a request to an unknown number of subscribers. Request() uses this functionality.

Example
package main

import (
	"fmt"

	"github.com/nats-io/nats"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	defer nc.Close()

	received, wanted, total := 0, 10, 100

	sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) {
		received += 1
	})
	sub.AutoUnsubscribe(wanted)

	for i := 0; i < total; i++ {
		nc.Publish("foo", []byte("Hello"))
	}
	nc.Flush()

	fmt.Printf("Received = %d", received)
}
Output:

func (*Subscription) IsValid

func (s *Subscription) IsValid() bool

IsValid returns a boolean indicating whether the subscription is still active. This will return false if the subscription has already been closed.

func (*Subscription) NextMsg

func (s *Subscription) NextMsg(timeout time.Duration) (msg *Msg, err error)

NextMsg() will return the next message available to a synchronous subscriber or block until one is available. A timeout can be used to return when no message has been delivered.

Example
package main

import (
	"fmt"
	"time"

	"github.com/nats-io/nats"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	defer nc.Close()

	sub, _ := nc.SubscribeSync("foo")
	m, err := sub.NextMsg(1 * time.Second)
	if err == nil {
		fmt.Printf("Received a message: %s\n", string(m.Data))
	} else {
		fmt.Println("NextMsg timed out.")
	}
}
Output:

func (*Subscription) QueuedMsgs added in v1.1.2

func (s *Subscription) QueuedMsgs() (int, error)

Queued returns the number of queued messages in the client for this subscription.

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe() error

Unsubscribe will remove interest in the given subject.

Example
package main

import (
	"github.com/nats-io/nats"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	defer nc.Close()

	sub, _ := nc.SubscribeSync("foo")
	// ...
	sub.Unsubscribe()
}
Output:

Directories

Path Synopsis
encoders

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL