Documentation
¶
Overview ¶
A Go client for the NATS messaging system (https://nats.io).
Index ¶
- Constants
- Variables
- func NewInbox() string
- func RegisterEncoder(encType string, enc Encoder)
- type APIStats
- type AccountInfo
- type AccountLimits
- type AckOpt
- type AckPolicy
- type AckWait
- type AuthTokenHandler
- type ClusterInfo
- type Conn
- func (nc *Conn) AuthRequired() bool
- func (nc *Conn) Barrier(f func()) error
- func (nc *Conn) Buffered() (int, error)
- func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error)
- func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error)
- func (nc *Conn) Close()
- func (nc *Conn) ConnectedAddr() string
- func (nc *Conn) ConnectedClusterName() string
- func (nc *Conn) ConnectedServerId() string
- func (nc *Conn) ConnectedServerName() string
- func (nc *Conn) ConnectedUrl() string
- func (nc *Conn) DiscoveredServers() []string
- func (nc *Conn) Drain() error
- func (nc *Conn) Flush() error
- func (nc *Conn) FlushTimeout(timeout time.Duration) (err error)
- func (nc *Conn) FlushWithContext(ctx context.Context) error
- func (nc *Conn) GetClientID() (uint64, error)
- func (nc *Conn) GetClientIP() (net.IP, error)
- func (nc *Conn) HeadersSupported() bool
- func (nc *Conn) IsClosed() bool
- func (nc *Conn) IsConnected() bool
- func (nc *Conn) IsDraining() bool
- func (nc *Conn) IsReconnecting() bool
- func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error)
- func (nc *Conn) LastError() error
- func (nc *Conn) MaxPayload() int64
- func (nc *Conn) NewRespInbox() string
- func (nc *Conn) NumSubscriptions() int
- func (nc *Conn) Publish(subj string, data []byte) error
- func (nc *Conn) PublishMsg(m *Msg) error
- func (nc *Conn) PublishRequest(subj, reply string, data []byte) error
- func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error)
- func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error)
- func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error)
- func (nc *Conn) RTT() (time.Duration, error)
- func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error)
- func (nc *Conn) RequestMsg(msg *Msg, timeout time.Duration) (*Msg, error)
- func (nc *Conn) RequestMsgWithContext(ctx context.Context, msg *Msg) (*Msg, error)
- func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error)
- func (nc *Conn) Servers() []string
- func (nc *Conn) SetClosedHandler(cb ConnHandler)
- func (nc *Conn) SetDisconnectErrHandler(dcb ConnErrHandler)
- func (nc *Conn) SetDisconnectHandler(dcb ConnHandler)
- func (nc *Conn) SetDiscoveredServersHandler(dscb ConnHandler)
- func (nc *Conn) SetErrorHandler(cb ErrHandler)
- func (nc *Conn) SetReconnectHandler(rcb ConnHandler)
- func (nc *Conn) Stats() Statistics
- func (nc *Conn) Status() Status
- func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error)
- func (nc *Conn) SubscribeSync(subj string) (*Subscription, error)
- func (nc *Conn) TLSRequired() bool
- type ConnErrHandler
- type ConnHandler
- type ConsumerConfig
- type ConsumerInfo
- type ContextOpt
- type CustomDialer
- type DeliverPolicy
- type DiscardPolicy
- type EncodedConn
- func (c *EncodedConn) BindRecvChan(subject string, channel interface{}) (*Subscription, error)
- func (c *EncodedConn) BindRecvQueueChan(subject, queue string, channel interface{}) (*Subscription, error)
- func (c *EncodedConn) BindSendChan(subject string, channel interface{}) error
- func (c *EncodedConn) Close()
- func (c *EncodedConn) Drain() error
- func (c *EncodedConn) Flush() error
- func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error)
- func (c *EncodedConn) LastError() error
- func (c *EncodedConn) Publish(subject string, v interface{}) error
- func (c *EncodedConn) PublishRequest(subject, reply string, v interface{}) error
- func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error)
- func (c *EncodedConn) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error
- func (c *EncodedConn) RequestWithContext(ctx context.Context, subject string, v interface{}, vPtr interface{}) error
- func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error)
- type Encoder
- type ErrConsumerSequenceMismatch
- type ErrHandler
- type ExternalStream
- type Handler
- type Header
- type JSOpt
- type JetStream
- type JetStreamContext
- type JetStreamManager
- type MaxWait
- type Msg
- func (m *Msg) Ack(opts ...AckOpt) error
- func (m *Msg) AckSync(opts ...AckOpt) error
- func (m *Msg) InProgress(opts ...AckOpt) error
- func (m *Msg) Metadata() (*MsgMetadata, error)
- func (m *Msg) Nak(opts ...AckOpt) error
- func (m *Msg) Respond(data []byte) error
- func (m *Msg) RespondMsg(msg *Msg) error
- func (m *Msg) Term(opts ...AckOpt) error
- type MsgErrHandler
- type MsgHandler
- type MsgMetadata
- type Option
- func ClientCert(certFile, keyFile string) Option
- func ClosedHandler(cb ConnHandler) Option
- func Compression(enabled bool) Option
- func CustomInboxPrefix(p string) Option
- func CustomReconnectDelay(cb ReconnectDelayHandler) Option
- func Dialer(dialer *net.Dialer) Option
- func DisconnectErrHandler(cb ConnErrHandler) Option
- func DisconnectHandler(cb ConnHandler) Option
- func DiscoveredServersHandler(cb ConnHandler) Option
- func DontRandomize() Option
- func DrainTimeout(t time.Duration) Option
- func ErrorHandler(cb ErrHandler) Option
- func FlusherTimeout(t time.Duration) Option
- func LameDuckModeHandler(cb ConnHandler) Option
- func MaxPingsOutstanding(max int) Option
- func MaxReconnects(max int) Option
- func Name(name string) Option
- func Nkey(pubKey string, sigCB SignatureHandler) Option
- func NkeyOptionFromSeed(seedFile string) (Option, error)
- func NoCallbacksAfterClientClose() Option
- func NoEcho() Option
- func NoReconnect() Option
- func PingInterval(t time.Duration) Option
- func ReconnectBufSize(size int) Option
- func ReconnectHandler(cb ConnHandler) Option
- func ReconnectJitter(jitter, jitterForTLS time.Duration) Option
- func ReconnectWait(t time.Duration) Option
- func RetryOnFailedConnect(retry bool) Option
- func RootCAs(file ...string) Option
- func Secure(tls ...*tls.Config) Option
- func SetCustomDialer(dialer CustomDialer) Option
- func SyncQueueLen(max int) Option
- func Timeout(t time.Duration) Option
- func Token(token string) Option
- func TokenHandler(cb AuthTokenHandler) Option
- func UseOldRequestStyle() Option
- func UserCredentials(userOrChainedFile string, seedFiles ...string) Option
- func UserInfo(user, password string) Option
- func UserJWT(userCB UserJWTHandler, sigCB SignatureHandler) Option
- type Options
- type PeerInfo
- type Placement
- type PubAck
- type PubAckFuture
- type PubOpt
- type PullOpt
- type RawStreamMsg
- type ReconnectDelayHandler
- type ReplayPolicy
- type RetentionPolicy
- type SequenceInfo
- type SequencePair
- type SignatureHandler
- type Statistics
- type Status
- type StorageType
- type StreamConfig
- type StreamInfo
- type StreamSource
- type StreamSourceInfo
- type StreamState
- type SubOpt
- func AckAll() SubOpt
- func AckExplicit() SubOpt
- func AckNone() SubOpt
- func Bind(stream, consumer string) SubOpt
- func BindStream(stream string) SubOpt
- func DeliverAll() SubOpt
- func DeliverLast() SubOpt
- func DeliverLastPerSubject() SubOpt
- func DeliverNew() SubOpt
- func DeliverSubject(subject string) SubOpt
- func Description(description string) SubOpt
- func Durable(consumer string) SubOpt
- func EnableFlowControl() SubOpt
- func IdleHeartbeat(duration time.Duration) SubOpt
- func ManualAck() SubOpt
- func MaxAckPending(n int) SubOpt
- func MaxDeliver(n int) SubOpt
- func OrderedConsumer() SubOpt
- func PullMaxWaiting(n int) SubOpt
- func RateLimit(n uint64) SubOpt
- func ReplayInstant() SubOpt
- func ReplayOriginal() SubOpt
- func StartSequence(seq uint64) SubOpt
- func StartTime(startTime time.Time) SubOpt
- type Subscription
- func (s *Subscription) AutoUnsubscribe(max int) error
- func (s *Subscription) ClearMaxPending() error
- func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error)
- func (s *Subscription) Delivered() (int64, error)
- func (s *Subscription) Drain() error
- func (s *Subscription) Dropped() (int, error)
- func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error)
- func (s *Subscription) IsValid() bool
- func (s *Subscription) MaxPending() (int, int, error)
- func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error)
- func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error)
- func (s *Subscription) Pending() (int, int, error)
- func (s *Subscription) PendingLimits() (int, int, error)
- func (s *Subscription) QueuedMsgs() (int, error)
- func (s *Subscription) SetPendingLimits(msgLimit, bytesLimit int) error
- func (s *Subscription) Type() SubscriptionType
- func (s *Subscription) Unsubscribe() error
- type SubscriptionType
- type UserJWTHandler
Examples ¶
- AckOpt
- AckWait
- Conn.Close
- Conn.Flush
- Conn.FlushTimeout
- Conn.Publish
- Conn.PublishMsg
- Conn.QueueSubscribe
- Conn.Request
- Conn.Subscribe
- Conn.SubscribeSync
- Connect
- Context
- EncodedConn.BindRecvChan
- EncodedConn.BindSendChan
- EncodedConn.Publish
- EncodedConn.Subscribe
- JSOpt
- JetStream
- JetStreamContext
- JetStreamManager
- MaxWait
- Msg.AckSync
- Msg.Metadata
- NewEncodedConn
- PubOpt
- PullOpt
- SubOpt
- Subscription.AutoUnsubscribe
- Subscription.NextMsg
- Subscription.Unsubscribe
Constants ¶
const ( JSON_ENCODER = "json" GOB_ENCODER = "gob" DEFAULT_ENCODER = "default" )
Indexed names into the Registered Encoders.
const ( MsgIdHdr = "Nats-Msg-Id" ExpectedStreamHdr = "Nats-Expected-Stream" ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence" ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence" ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id" )
Headers for published messages.
const ( Version = "1.12.0" DefaultURL = "nats://127.0.0.1:4222" DefaultPort = 4222 DefaultMaxReconnect = 60 DefaultReconnectWait = 2 * time.Second DefaultReconnectJitter = 100 * time.Millisecond DefaultReconnectJitterTLS = time.Second DefaultTimeout = 2 * time.Second DefaultPingInterval = 2 * time.Minute DefaultMaxPingOut = 2 DefaultMaxChanLen = 64 * 1024 // 64k DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB RequestChanLen = 8 DefaultDrainTimeout = 30 * time.Second LangString = "go" )
Default Constants
const ( // STALE_CONNECTION is for detection and proper handling of stale connections. STALE_CONNECTION = "stale connection" // PERMISSIONS_ERR is for when nats server subject authorization has failed. PERMISSIONS_ERR = "permissions violation" // AUTHORIZATION_ERR is for when nats server user authorization has failed. AUTHORIZATION_ERR = "authorization violation" // AUTHENTICATION_EXPIRED_ERR is for when nats server user authorization has expired. AUTHENTICATION_EXPIRED_ERR = "user authentication expired" // AUTHENTICATION_REVOKED_ERR is for when user authorization has been revoked. AUTHENTICATION_REVOKED_ERR = "user authentication revoked" // ACCOUNT_AUTHENTICATION_EXPIRED_ERR is for when nats server account authorization has expired. ACCOUNT_AUTHENTICATION_EXPIRED_ERR = "account authentication expired" )
const ( DISCONNECTED = Status(iota) CONNECTED CLOSED RECONNECTING CONNECTING DRAINING_SUBS DRAINING_PUBS )
const ( AsyncSubscription = SubscriptionType(iota) SyncSubscription ChanSubscription NilSubscription PullSubscription )
The different types of subscriptions.
const ( // DefaultSubPendingMsgsLimit will be 512k msgs. DefaultSubPendingMsgsLimit = 512 * 1024 // DefaultSubPendingBytesLimit is 64MB DefaultSubPendingBytesLimit = 64 * 1024 * 1024 )
Pending Limits
const ( OP_START = iota OP_PLUS OP_PLUS_O OP_PLUS_OK OP_MINUS OP_MINUS_E OP_MINUS_ER OP_MINUS_ERR OP_MINUS_ERR_SPC MINUS_ERR_ARG OP_M OP_MS OP_MSG OP_MSG_SPC MSG_ARG MSG_PAYLOAD MSG_END OP_H OP_P OP_PI OP_PIN OP_PING OP_PO OP_PON OP_PONG OP_I OP_IN OP_INF OP_INFO OP_INFO_SPC INFO_ARG )
const (
InboxPrefix = "_INBOX."
)
InboxPrefix is the prefix for all inbox subjects.
const MAX_CONTROL_LINE_SIZE = 4096
Variables ¶
var ( ErrConnectionClosed = errors.New("nats: connection closed") ErrConnectionDraining = errors.New("nats: connection draining") ErrDrainTimeout = errors.New("nats: draining connection timed out") ErrConnectionReconnecting = errors.New("nats: connection reconnecting") ErrSecureConnRequired = errors.New("nats: secure connection required") ErrSecureConnWanted = errors.New("nats: secure connection not available") ErrBadSubscription = errors.New("nats: invalid subscription") ErrTypeSubscription = errors.New("nats: invalid subscription type") ErrBadSubject = errors.New("nats: invalid subject") ErrBadQueueName = errors.New("nats: invalid queue name") ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped") ErrTimeout = errors.New("nats: timeout") ErrBadTimeout = errors.New("nats: timeout invalid") ErrAuthorization = errors.New("nats: authorization violation") ErrAuthExpired = errors.New("nats: authentication expired") ErrAuthRevoked = errors.New("nats: authentication revoked") ErrAccountAuthExpired = errors.New("nats: account authentication expired") ErrNoServers = errors.New("nats: no servers available for connection") ErrJsonParse = errors.New("nats: connect message, json parse error") ErrChanArg = errors.New("nats: argument needs to be a channel type") ErrMaxPayload = errors.New("nats: maximum payload exceeded") ErrMaxMessages = errors.New("nats: maximum messages delivered") ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription") ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed") ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received") ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded") ErrInvalidConnection = errors.New("nats: invalid connection") ErrInvalidMsg = errors.New("nats: invalid message or message nil") ErrInvalidArg = errors.New("nats: invalid argument") ErrInvalidContext = errors.New("nats: invalid context") ErrNoDeadlineContext = errors.New("nats: 
context requires a deadline") ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server") ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server") ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler") ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler") ErrNoUserCB = errors.New("nats: user callback not defined") ErrNkeyAndUser = errors.New("nats: user callback and nkey defined") ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server") ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION) ErrTokenAlreadySet = errors.New("nats: token and token handler both set") ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection") ErrMsgNoReply = errors.New("nats: message does not have a reply") ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server") ErrDisconnected = errors.New("nats: server is disconnected") ErrHeadersNotSupported = errors.New("nats: headers not supported by this server") ErrBadHeaderMsg = errors.New("nats: message could not decode headers") ErrNoResponders = errors.New("nats: no responders available for request") ErrNoContextOrTimeout = errors.New("nats: no context or timeout given") ErrPullModeNotAllowed = errors.New("nats: pull based not supported") ErrJetStreamNotEnabled = errors.New("nats: jetstream not enabled") ErrJetStreamBadPre = errors.New("nats: jetstream api prefix not valid") ErrNoStreamResponse = errors.New("nats: no response from stream") ErrNotJSMessage = errors.New("nats: not a jetstream message") ErrInvalidStreamName = errors.New("nats: invalid stream name") ErrInvalidDurableName = errors.New("nats: invalid durable name") ErrNoMatchingStream = errors.New("nats: no stream matches subject") ErrSubjectMismatch = errors.New("nats: subject does not match consumer") ErrContextAndTimeout = errors.New("nats: context and timeout can not both be set") 
ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response") ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported") ErrStreamNameRequired = errors.New("nats: stream name is required") ErrStreamNotFound = errors.New("nats: stream not found") ErrConsumerNotFound = errors.New("nats: consumer not found") ErrConsumerNameRequired = errors.New("nats: consumer name is required") ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required") ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required") ErrDeliverSubjectRequired = errors.New("nats: deliver subject is required") ErrPullSubscribeToPushConsumer = errors.New("nats: cannot pull subscribe to push based consumer") ErrPullSubscribeRequired = errors.New("nats: must use pull subscribe to bind to pull based consumer") ErrConsumerNotActive = errors.New("nats: consumer not active") )
Errors
var DefaultOptions = GetDefaultOptions()
DEPRECATED: Use GetDefaultOptions() instead. DefaultOptions is not safe for use by multiple clients. For details see #308.
Functions ¶
func NewInbox ¶
func NewInbox() string
NewInbox will return an inbox string which can be used for directed replies from subscribers. These are guaranteed to be unique, but can be shared and subscribed to by others.
func RegisterEncoder ¶
RegisterEncoder will register the encType with the given Encoder. Useful for customization.
Types ¶
type AccountInfo ¶
type AccountInfo struct { Memory uint64 `json:"memory"` Store uint64 `json:"storage"` Streams int `json:"streams"` Consumers int `json:"consumers"` Domain string `json:"domain"` API APIStats `json:"api"` Limits AccountLimits `json:"limits"` }
AccountInfo contains info about the JetStream usage from the current account.
type AccountLimits ¶
type AccountLimits struct { MaxMemory int64 `json:"max_memory"` MaxStore int64 `json:"max_storage"` MaxStreams int `json:"max_streams"` MaxConsumers int `json:"max_consumers"` }
AccountLimits includes the JetStream limits of the current account.
type AckOpt ¶
type AckOpt interface {
// contains filtered or unexported methods
}
AckOpt are the options that can be passed when acknowledging a message.
Example ¶
AckOpt are the options that can be passed when acknowledging a message.
nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } // Create JetStream context to produce/consumer messages that will be persisted. js, err := nc.JetStream() if err != nil { log.Fatal(err) } // Create stream to persist messages published on 'foo'. js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) // Publish is synchronous by default, and waits for a PubAck response. js.Publish("foo", []byte("Hello JS!")) sub, _ := js.SubscribeSync("foo") msg, _ := sub.NextMsg(2 * time.Second) // Ack and wait for 2 seconds msg.InProgress(nats.AckWait(2)) // Using a context. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() msg.Ack(nats.Context(ctx))
Output:
type AckPolicy ¶
type AckPolicy int
AckPolicy determines how the consumer should acknowledge delivered messages.
func (AckPolicy) MarshalJSON ¶
func (*AckPolicy) UnmarshalJSON ¶
type AckWait ¶
AckWait sets the maximum amount of time we will wait for an ack.
Example ¶
nc, _ := nats.Connect("localhost") js, _ := nc.JetStream() // Set custom timeout for a JetStream API request. js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) // Wait for an ack response for 2 seconds. js.Publish("foo", []byte("Hello JS!"), nats.AckWait(2*time.Second)) // Create consumer on 'foo' subject that waits for an ack for 10s, // after which the message will be delivered. sub, _ := js.SubscribeSync("foo", nats.AckWait(10*time.Second)) msg, _ := sub.NextMsg(2 * time.Second) // Wait for ack of ack for 2s. msg.AckSync(nats.AckWait(2 * time.Second))
Output:
type AuthTokenHandler ¶
type AuthTokenHandler func() string
AuthTokenHandler is used to generate a new token.
type ClusterInfo ¶
type ClusterInfo struct { Name string `json:"name,omitempty"` Leader string `json:"leader,omitempty"` Replicas []*PeerInfo `json:"replicas,omitempty"` }
ClusterInfo shows information about the underlying set of servers that make up the stream or consumer.
type Conn ¶
type Conn struct { // Keep all members for which we use atomic at the beginning of the // struct and make sure they are all 64bits (or use padding if necessary). // atomic.* functions crash on 32bit machines if operand is not aligned // at 64bit. See https://github.com/golang/go/issues/599 Statistics // Opts holds the configuration of the Conn. // Modifying the configuration of a running Conn is a race. Opts Options // contains filtered or unexported fields }
A Conn represents a bare connection to a nats-server. It can send and receive []byte payloads. The connection is safe to use in multiple goroutines concurrently.
func Connect ¶
Connect will attempt to connect to the NATS system. The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222 Comma separated arrays are also supported, e.g. urlA, urlB. Options start with the defaults but can be overridden. To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
Example ¶
Shows different ways to create a Conn.
nc, _ := nats.Connect("demo.nats.io") nc.Close() nc, _ = nats.Connect("nats://derek:secretpassword@demo.nats.io:4222") nc.Close() nc, _ = nats.Connect("tls://derek:secretpassword@demo.nats.io:4443") nc.Close() opts := nats.Options{ AllowReconnect: true, MaxReconnect: 10, ReconnectWait: 5 * time.Second, Timeout: 1 * time.Second, } nc, _ = opts.Connect() nc.Close()
Output:
func (*Conn) AuthRequired ¶
AuthRequired reports whether the connected server requires authorization.
func (*Conn) Barrier ¶
Barrier schedules the given function `f` to all registered asynchronous subscriptions. Only the last subscription to see this barrier will invoke the function. If no subscription is registered at the time of this call, `f()` is invoked right away. ErrConnectionClosed is returned if the connection is closed prior to the call.
func (*Conn) Buffered ¶
Buffered will return the number of bytes buffered to be sent to the server. FIXME(dlc) take into account disconnected state.
func (*Conn) ChanQueueSubscribe ¶
func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error)
ChanQueueSubscribe will express interest in the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message, which will be placed on the channel. You should not close the channel until sub.Unsubscribe() has been called. Note: This is the same as QueueSubscribeSyncWithChan.
func (*Conn) ChanSubscribe ¶
func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error)
ChanSubscribe will express interest in the given subject and place all messages received on the channel. You should not close the channel until sub.Unsubscribe() has been called.
func (*Conn) Close ¶
func (nc *Conn) Close()
Close will close the connection to the server. This call will release all blocking calls, such as Flush() and NextMsg()
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) nc.Close()
Output:
func (*Conn) ConnectedAddr ¶
ConnectedAddr returns the connected server's IP
func (*Conn) ConnectedClusterName ¶
ConnectedClusterName reports the connected server's cluster name if any
func (*Conn) ConnectedServerId ¶
ConnectedServerId reports the connected server's Id
func (*Conn) ConnectedServerName ¶
ConnectedServerName reports the connected server's name
func (*Conn) ConnectedUrl ¶
ConnectedUrl reports the connected server's URL
func (*Conn) DiscoveredServers ¶
DiscoveredServers returns only the server urls that have been discovered after a connection has been established. If authentication is enabled, use UserInfo or Token when connecting with these urls.
func (*Conn) Drain ¶
Drain will put a connection into a drain state. All subscriptions will immediately be put into a drain state. Upon completion, the publishers will be drained and cannot publish any additional messages. Upon draining of the publishers, the connection will be closed. Use the ClosedHandler() option to know when the connection has moved from draining to closed.
See note in Subscription.Drain for JetStream subscriptions.
func (*Conn) Flush ¶
Flush will perform a round trip to the server and return when it receives the internal reply.
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")} for i := 0; i < 1000; i++ { nc.PublishMsg(msg) } err := nc.Flush() if err == nil { // Everything has been processed by the server for nc *Conn. }
Output:
func (*Conn) FlushTimeout ¶
FlushTimeout allows a Flush operation to have an associated timeout.
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")} for i := 0; i < 1000; i++ { nc.PublishMsg(msg) } // Only wait for up to 1 second for Flush err := nc.FlushTimeout(1 * time.Second) if err == nil { // Everything has been processed by the server for nc *Conn. }
Output:
func (*Conn) FlushWithContext ¶
FlushWithContext will allow a context to control the duration of a Flush() call. This context should be non-nil and should have a deadline set. We will return an error if none is present.
func (*Conn) GetClientID ¶
GetClientID returns the client ID assigned by the server to which the client is currently connected. Note that the value may change if the client reconnects. This function returns ErrClientIDNotSupported if the server is of a version prior to 1.2.0.
func (*Conn) GetClientIP ¶
GetClientIP returns the client IP as known by the server. Supported as of server version 2.1.6.
func (*Conn) HeadersSupported ¶
HeadersSupported reports whether the server supports headers.
func (*Conn) IsConnected ¶
IsConnected tests if a Conn is connected.
func (*Conn) IsDraining ¶
IsDraining tests if a Conn is in the draining state.
func (*Conn) IsReconnecting ¶
IsReconnecting tests if a Conn is reconnecting.
func (*Conn) JetStream ¶
func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error)
JetStream returns a JetStreamContext for messaging and stream management. Errors are only returned if inconsistent options are provided.
func (*Conn) LastError ¶
LastError reports the last error encountered via the connection. It can be used reliably within ClosedCB, for example, to find out the reason why the connection was closed.
func (*Conn) MaxPayload ¶
MaxPayload returns the size limit that a message payload can have. This is set by the server configuration and delivered to the client upon connect.
func (*Conn) NewRespInbox ¶
NewRespInbox is the new format used for _INBOX.
func (*Conn) NumSubscriptions ¶
NumSubscriptions returns the number of active subscriptions.
func (*Conn) Publish ¶
Publish publishes the data argument to the given subject. The data argument is left untouched and needs to be correctly interpreted on the receiver.
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() nc.Publish("foo", []byte("Hello World!"))
Output:
func (*Conn) PublishMsg ¶
PublishMsg publishes the Msg structure, which includes the Subject, an optional Reply and an optional Data field.
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")} nc.PublishMsg(msg)
Output:
func (*Conn) PublishRequest ¶
PublishRequest will perform a Publish() expecting a response on the reply subject. Use Request() for automatically waiting for a response inline.
func (*Conn) QueueSubscribe ¶
func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error)
QueueSubscribe creates an asynchronous queue subscriber on the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message asynchronously.
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() received := 0 nc.QueueSubscribe("foo", "worker_group", func(_ *nats.Msg) { received++ })
Output:
func (*Conn) QueueSubscribeSync ¶
func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error)
QueueSubscribeSync creates a synchronous queue subscriber on the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message synchronously using Subscription.NextMsg().
func (*Conn) QueueSubscribeSyncWithChan ¶
func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error)
QueueSubscribeSyncWithChan will express interest in the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message, which will be placed on the channel. You should not close the channel until sub.Unsubscribe() has been called. Note: This is the same as ChanQueueSubscribe.
func (*Conn) Request ¶
Request will send a request payload and deliver the response message, or an error, including a timeout if no message was received properly.
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() nc.Subscribe("foo", func(m *nats.Msg) { nc.Publish(m.Reply, []byte("I will help you")) }) nc.Request("foo", []byte("help"), 50*time.Millisecond)
Output:
func (*Conn) RequestMsg ¶
RequestMsg will send a request payload including optional headers and deliver the response message, or an error, including a timeout if no message was received properly.
func (*Conn) RequestMsgWithContext ¶
RequestMsgWithContext takes a context and a message, and sends a request expecting a single response.
func (*Conn) RequestWithContext ¶
RequestWithContext takes a context, a subject and a payload in bytes, and sends a request expecting a single response.
func (*Conn) Servers ¶
Servers returns the list of known server urls, including additional servers discovered after a connection has been established. If authentication is enabled, use UserInfo or Token when connecting with these urls.
func (*Conn) SetClosedHandler ¶
func (nc *Conn) SetClosedHandler(cb ConnHandler)
SetClosedHandler will set the closed event handler.
func (*Conn) SetDisconnectErrHandler ¶
func (nc *Conn) SetDisconnectErrHandler(dcb ConnErrHandler)
SetDisconnectErrHandler will set the disconnect event handler.
func (*Conn) SetDisconnectHandler ¶
func (nc *Conn) SetDisconnectHandler(dcb ConnHandler)
SetDisconnectHandler will set the disconnect event handler. DEPRECATED: Use SetDisconnectErrHandler
func (*Conn) SetDiscoveredServersHandler ¶
func (nc *Conn) SetDiscoveredServersHandler(dscb ConnHandler)
SetDiscoveredServersHandler will set the discovered servers handler.
func (*Conn) SetErrorHandler ¶
func (nc *Conn) SetErrorHandler(cb ErrHandler)
SetErrorHandler will set the async error handler.
func (*Conn) SetReconnectHandler ¶
func (nc *Conn) SetReconnectHandler(rcb ConnHandler)
SetReconnectHandler will set the reconnect event handler.
func (*Conn) Stats ¶
func (nc *Conn) Stats() Statistics
Stats will return a race safe copy of the Statistics section for the connection.
func (*Conn) Subscribe ¶
func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error)
Subscribe will express interest in the given subject. The subject can have wildcards (partial:*, full:>). Messages will be delivered to the associated MsgHandler.
Example ¶
This Example shows an asynchronous subscriber.
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() nc.Subscribe("foo", func(m *nats.Msg) { fmt.Printf("Received a message: %s\n", string(m.Data)) })
Output:
func (*Conn) SubscribeSync ¶
func (nc *Conn) SubscribeSync(subj string) (*Subscription, error)
SubscribeSync will express interest on the given subject. Messages will be received synchronously using Subscription.NextMsg().
Example ¶
This Example shows a synchronous subscriber.
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() sub, _ := nc.SubscribeSync("foo") m, err := sub.NextMsg(1 * time.Second) if err == nil { fmt.Printf("Received a message: %s\n", string(m.Data)) } else { fmt.Println("NextMsg timed out.") }
Output:
func (*Conn) TLSRequired ¶
TLSRequired will return whether the connected server requires TLS connections.
type ConnErrHandler ¶
ConnErrHandler is used to process asynchronous events like disconnected connection with the error (if any).
type ConnHandler ¶
type ConnHandler func(*Conn)
ConnHandler is used for asynchronous events such as disconnected and closed connections.
type ConsumerConfig ¶
type ConsumerConfig struct { Durable string `json:"durable_name,omitempty"` Description string `json:"description,omitempty"` DeliverSubject string `json:"deliver_subject,omitempty"` DeliverGroup string `json:"deliver_group,omitempty"` DeliverPolicy DeliverPolicy `json:"deliver_policy"` OptStartSeq uint64 `json:"opt_start_seq,omitempty"` OptStartTime *time.Time `json:"opt_start_time,omitempty"` AckPolicy AckPolicy `json:"ack_policy"` AckWait time.Duration `json:"ack_wait,omitempty"` MaxDeliver int `json:"max_deliver,omitempty"` FilterSubject string `json:"filter_subject,omitempty"` ReplayPolicy ReplayPolicy `json:"replay_policy"` RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec SampleFrequency string `json:"sample_freq,omitempty"` MaxWaiting int `json:"max_waiting,omitempty"` MaxAckPending int `json:"max_ack_pending,omitempty"` FlowControl bool `json:"flow_control,omitempty"` Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` }
ConsumerConfig is the configuration of a JetStream consumer.
type ConsumerInfo ¶
type ConsumerInfo struct { Stream string `json:"stream_name"` Name string `json:"name"` Created time.Time `json:"created"` Config ConsumerConfig `json:"config"` Delivered SequenceInfo `json:"delivered"` AckFloor SequenceInfo `json:"ack_floor"` NumAckPending int `json:"num_ack_pending"` NumRedelivered int `json:"num_redelivered"` NumWaiting int `json:"num_waiting"` NumPending uint64 `json:"num_pending"` Cluster *ClusterInfo `json:"cluster,omitempty"` PushBound bool `json:"push_bound,omitempty"` }
ConsumerInfo is the info from a JetStream consumer.
type ContextOpt ¶
ContextOpt is an option used to set a context.Context.
func Context ¶
func Context(ctx context.Context) ContextOpt
Context returns an option that can be used to configure a context for APIs that are context aware such as those part of the JetStream interface.
Example ¶
nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } js, _ := nc.JetStream() // Base context ctx, cancel := context.WithCancel(context.Background()) defer cancel() // nats.Context option implements context.Context interface, so can be used // to create a new context from top level one. nctx := nats.Context(ctx) // JetStreamManager functions all can use context. js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }, nctx) // Custom context with timeout tctx, tcancel := context.WithTimeout(nctx, 2*time.Second) defer tcancel() // Set a timeout for publishing using context. deadlineCtx := nats.Context(tctx) js.Publish("foo", []byte("Hello JS!"), deadlineCtx) sub, _ := js.SubscribeSync("foo") msg, _ := sub.NextMsgWithContext(deadlineCtx) // Acks can also use a context to await for a response. msg.Ack(deadlineCtx)
Output:
type CustomDialer ¶
CustomDialer can be used to specify any dialer, not necessarily a *net.Dialer.
type DeliverPolicy ¶
type DeliverPolicy int
DeliverPolicy determines how the consumer should select the first message to deliver.
const ( // DeliverAllPolicy starts delivering messages from the very beginning of a // stream. This is the default. DeliverAllPolicy DeliverPolicy = iota // DeliverLastPolicy will start the consumer with the last sequence // received. DeliverLastPolicy // DeliverNewPolicy will only deliver new messages that are sent after the // consumer is created. DeliverNewPolicy // DeliverByStartSequencePolicy will deliver messages starting from a given // sequence. DeliverByStartSequencePolicy // DeliverByStartTimePolicy will deliver messages starting from a given // time. DeliverByStartTimePolicy // DeliverLastPerSubjectPolicy will start the consumer with the last message // for all subjects received. DeliverLastPerSubjectPolicy )
func (DeliverPolicy) MarshalJSON ¶
func (p DeliverPolicy) MarshalJSON() ([]byte, error)
func (*DeliverPolicy) UnmarshalJSON ¶
func (p *DeliverPolicy) UnmarshalJSON(data []byte) error
type DiscardPolicy ¶
type DiscardPolicy int
DiscardPolicy determines how to proceed when limits of messages or bytes are reached.
const ( // DiscardOld will remove older messages to return to the limits. This is // the default. DiscardOld DiscardPolicy = iota //DiscardNew will fail to store new messages. DiscardNew )
func (DiscardPolicy) MarshalJSON ¶
func (dp DiscardPolicy) MarshalJSON() ([]byte, error)
func (DiscardPolicy) String ¶
func (dp DiscardPolicy) String() string
func (*DiscardPolicy) UnmarshalJSON ¶
func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error
type EncodedConn ¶
EncodedConn are the preferred way to interface with NATS. They wrap a bare connection to a nats server and have an extendable encoder system that will encode and decode messages from raw Go types.
func NewEncodedConn ¶
func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error)
NewEncodedConn will wrap an existing Connection and utilize the appropriate registered encoder.
Example ¶
Shows how to wrap a Conn into an EncodedConn
nc, _ := nats.Connect(nats.DefaultURL) c, _ := nats.NewEncodedConn(nc, "json") c.Close()
Output:
func (*EncodedConn) BindRecvChan ¶
func (c *EncodedConn) BindRecvChan(subject string, channel interface{}) (*Subscription, error)
BindRecvChan binds a channel for receive operations from NATS.
Example ¶
BindRecvChan() allows binding of a Go channel to a nats subject for subscribe operations. The Encoder attached to the EncodedConn will be used for un-marshaling.
nc, _ := nats.Connect(nats.DefaultURL) c, _ := nats.NewEncodedConn(nc, "json") defer c.Close() type person struct { Name string Address string Age int } ch := make(chan *person) c.BindRecvChan("hello", ch) me := &person{Name: "derek", Age: 22, Address: "85 Second St"} c.Publish("hello", me) // Receive the publish directly on a channel who := <-ch fmt.Printf("%v says hello!\n", who)
Output:
func (*EncodedConn) BindRecvQueueChan ¶
func (c *EncodedConn) BindRecvQueueChan(subject, queue string, channel interface{}) (*Subscription, error)
BindRecvQueueChan binds a channel for queue-based receive operations from NATS.
func (*EncodedConn) BindSendChan ¶
func (c *EncodedConn) BindSendChan(subject string, channel interface{}) error
BindSendChan binds a channel for send operations to NATS.
Example ¶
BindSendChan() allows binding of a Go channel to a nats subject for publish operations. The Encoder attached to the EncodedConn will be used for marshaling.
nc, _ := nats.Connect(nats.DefaultURL) c, _ := nats.NewEncodedConn(nc, "json") defer c.Close() type person struct { Name string Address string Age int } ch := make(chan *person) c.BindSendChan("hello", ch) me := &person{Name: "derek", Age: 22, Address: "85 Second St"} ch <- me
Output:
func (*EncodedConn) Close ¶
func (c *EncodedConn) Close()
Close will close the connection to the server. This call will release all blocking calls, such as Flush(), etc.
func (*EncodedConn) Drain ¶
func (c *EncodedConn) Drain() error
Drain will put a connection into a drain state. All subscriptions will immediately be put into a drain state. Upon completion, the publishers will be drained and can not publish any additional messages. Upon draining of the publishers, the connection will be closed. Use the ClosedCB() option to know when the connection has moved from draining to closed.
func (*EncodedConn) Flush ¶
func (c *EncodedConn) Flush() error
Flush will perform a round trip to the server and return when it receives the internal reply.
func (*EncodedConn) FlushTimeout ¶
func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error)
FlushTimeout allows a Flush operation to have an associated timeout.
func (*EncodedConn) LastError ¶
func (c *EncodedConn) LastError() error
LastError reports the last error encountered via the Connection.
func (*EncodedConn) Publish ¶
func (c *EncodedConn) Publish(subject string, v interface{}) error
Publish publishes the data argument to the given subject. The data argument will be encoded using the associated encoder.
Example ¶
EncodedConn can publish virtually anything just by passing it in. The encoder will be used to properly encode the raw Go type
nc, _ := nats.Connect(nats.DefaultURL) c, _ := nats.NewEncodedConn(nc, "json") defer c.Close() type person struct { Name string Address string Age int } me := &person{Name: "derek", Age: 22, Address: "85 Second St"} c.Publish("hello", me)
Output:
func (*EncodedConn) PublishRequest ¶
func (c *EncodedConn) PublishRequest(subject, reply string, v interface{}) error
PublishRequest will perform a Publish() expecting a response on the reply subject. Use Request() for automatically waiting for a response inline.
func (*EncodedConn) QueueSubscribe ¶
func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error)
QueueSubscribe will create a queue subscription on the given subject and process incoming messages using the specified Handler. The Handler should be a func that matches a signature from the description of Handler from above.
func (*EncodedConn) Request ¶
func (c *EncodedConn) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error
Request will create an Inbox and perform a Request() call with the Inbox reply for the data v. A response will be decoded into the vPtr Response.
func (*EncodedConn) RequestWithContext ¶
func (c *EncodedConn) RequestWithContext(ctx context.Context, subject string, v interface{}, vPtr interface{}) error
RequestWithContext will create an Inbox and perform a Request using the provided cancellation context with the Inbox reply for the data v. A response will be decoded into the vPtr Response.
func (*EncodedConn) Subscribe ¶
func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error)
Subscribe will create a subscription on the given subject and process incoming messages using the specified Handler. The Handler should be a func that matches a signature from the description of Handler from above.
Example ¶
EncodedConn's subscribers will automatically decode the wire data into the requested Go type using the Decode() method of the registered Encoder. The callback signature can also vary to include additional data, such as subject and reply subjects.
nc, _ := nats.Connect(nats.DefaultURL) c, _ := nats.NewEncodedConn(nc, "json") defer c.Close() type person struct { Name string Address string Age int } c.Subscribe("hello", func(p *person) { fmt.Printf("Received a person! %+v\n", p) }) c.Subscribe("hello", func(subj, reply string, p *person) { fmt.Printf("Received a person on subject %s! %+v\n", subj, p) }) me := &person{Name: "derek", Age: 22, Address: "85 Second St"} c.Publish("hello", me)
Output:
type Encoder ¶
type Encoder interface { Encode(subject string, v interface{}) ([]byte, error) Decode(subject string, data []byte, vPtr interface{}) error }
Encoder interface is for all registered encoders.
func EncoderForType ¶
EncoderForType will return the registered Encoder for the encType.
type ErrConsumerSequenceMismatch ¶
type ErrConsumerSequenceMismatch struct { // StreamResumeSequence is the stream sequence from where the consumer // should resume consuming from the stream. StreamResumeSequence uint64 // ConsumerSequence is the sequence of the consumer that is behind. ConsumerSequence uint64 // LastConsumerSequence is the sequence of the consumer when the heartbeat // was received. LastConsumerSequence uint64 }
ErrConsumerSequenceMismatch represents an error from a consumer that received a Heartbeat including a sequence different from the one expected from the view of the client.
func (*ErrConsumerSequenceMismatch) Error ¶
func (ecs *ErrConsumerSequenceMismatch) Error() string
type ErrHandler ¶
type ErrHandler func(*Conn, *Subscription, error)
ErrHandler is used to process asynchronous errors encountered while processing inbound messages.
type ExternalStream ¶
ExternalStream allows you to qualify access to a stream source in another account.
type Handler ¶
type Handler interface{}
Handler is a specific callback used for Subscribe. It is generalized to an interface{}, but we will discover its format and arguments at runtime and perform the correct callback, including de-marshaling encoded data back into the appropriate struct based on the signature of the Handler.
Handlers are expected to have one of four signatures.
type person struct { Name string `json:"name,omitempty"` Age uint `json:"age,omitempty"` } handler := func(m *Msg) handler := func(p *person) handler := func(subject string, o *obj) handler := func(subject, reply string, o *obj)
These forms allow a callback to request a raw Msg ptr, where the processing of the message from the wire is untouched. Process a JSON representation and demarshal it into the given struct, e.g. person. There are also variants where the callback wants either the subject, or the subject and the reply subject.
type Header ¶
Header represents the optional Header for a NATS message, based on the implementation of http.Header.
func (Header) Add ¶
Add adds the key, value pair to the header. It is case-sensitive and appends to any existing values associated with key.
type JSOpt ¶
type JSOpt interface {
// contains filtered or unexported methods
}
JSOpt configures a JetStreamContext.
Example ¶
A JetStream context can be configured with a default timeout using nats.MaxWait or with a custom API prefix in case of using an imported JetStream from another account.
nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } // Use the JetStream context to manage streams and consumers (with nats.APIPrefix JSOpt) js, err := nc.JetStream(nats.APIPrefix("dlc"), nats.MaxWait(5*time.Second)) if err != nil { log.Fatal(err) } sub, _ := js.SubscribeSync("foo") js.Publish("foo", []byte("Hello JS!")) sub.NextMsg(2 * time.Second)
Output:
func PublishAsyncErrHandler ¶
func PublishAsyncErrHandler(cb MsgErrHandler) JSOpt
PublishAsyncErrHandler sets the error handler for async publishes in JetStream.
func PublishAsyncMaxPending ¶
PublishAsyncMaxPending sets the maximum outstanding async publishes that can be inflight at one time.
type JetStream ¶
type JetStream interface { // Publish publishes a message to JetStream. Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error) // PublishMsg publishes a Msg to JetStream. PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) // PublishAsync publishes a message to JetStream and returns a PubAckFuture. // The data should not be changed until the PubAckFuture has been processed. PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) // PublishMsgAsync publishes a Msg to JetStream and returms a PubAckFuture. // The message should not be changed until the PubAckFuture has been processed. PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) // PublishAsyncPending returns the number of async publishes outstanding for this context. PublishAsyncPending() int // PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd. PublishAsyncComplete() <-chan struct{} // Subscribe creates an async Subscription for JetStream. Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) // SubscribeSync creates a Subscription that can be used to process messages synchronously. SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) // ChanSubscribe creates channel based Subscription. ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) // ChanQueueSubscribe creates channel based Subscription with a queue group. ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) // QueueSubscribe creates a Subscription with a queue group. QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) // QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously. QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) // PullSubscribe creates a Subscription that can fetch messages. 
PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) }
JetStream allows persistent messaging through JetStream.
Example ¶
nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } // Use the JetStream context to produce and consumer messages // that have been persisted. js, err := nc.JetStream(nats.PublishAsyncMaxPending(256)) if err != nil { log.Fatal(err) } js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) js.Publish("foo", []byte("Hello JS!")) // Publish messages asynchronously. for i := 0; i < 500; i++ { js.PublishAsync("foo", []byte("Hello JS Async!")) } select { case <-js.PublishAsyncComplete(): case <-time.After(5 * time.Second): fmt.Println("Did not resolve in time") } // Create async consumer on subject 'foo'. Async subscribers // ack a message once exiting the callback. js.Subscribe("foo", func(msg *nats.Msg) { meta, _ := msg.Metadata() fmt.Printf("Stream Sequence : %v\n", meta.Sequence.Stream) fmt.Printf("Consumer Sequence: %v\n", meta.Sequence.Consumer) }) // Async subscriber with manual acks. js.Subscribe("foo", func(msg *nats.Msg) { msg.Ack() }, nats.ManualAck()) // Async queue subscription where members load balance the // received messages together. // If no consumer name is specified, either with nats.Bind() // or nats.Durable() options, the queue name is used as the // durable name (that is, as if you were passing the // nats.Durable(<queue group name>) option. // It is recommended to use nats.Bind() or nats.Durable() // and preferably create the JetStream consumer beforehand // (using js.AddConsumer) so that the JS consumer is not // deleted on an Unsubscribe() or Drain() when the member // that created the consumer goes away first. // Check Godoc for the QueueSubscribe() API for more details. js.QueueSubscribe("foo", "group", func(msg *nats.Msg) { msg.Ack() }, nats.ManualAck()) // Subscriber to consume messages synchronously. sub, _ := js.SubscribeSync("foo") msg, _ := sub.NextMsg(2 * time.Second) msg.Ack() // We can add a member to the group, with this member using // the synchronous version of the QueueSubscribe. 
sub, _ = js.QueueSubscribeSync("foo", "group") msg, _ = sub.NextMsg(2 * time.Second) msg.Ack() // ChanSubscribe msgCh := make(chan *nats.Msg, 8192) sub, _ = js.ChanSubscribe("foo", msgCh) select { case msg := <-msgCh: fmt.Println("[Received]", msg) case <-time.After(1 * time.Second): } // Create Pull based consumer with maximum 128 inflight. sub, _ = js.PullSubscribe("foo", "wq", nats.PullMaxWaiting(128)) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() for { select { case <-ctx.Done(): return default: } msgs, _ := sub.Fetch(10, nats.Context(ctx)) for _, msg := range msgs { msg.Ack() } }
Output:
type JetStreamContext ¶
type JetStreamContext interface { JetStream JetStreamManager }
JetStreamContext allows JetStream messaging and stream management.
Example ¶
A JetStreamContext is the composition of the JetStream and JetStreamManager interfaces. If you only need to publish/consume messages, you can create a context that only uses the JetStream interface.
nc, _ := nats.Connect("localhost") var js nats.JetStream var jsm nats.JetStreamManager var jsctx nats.JetStreamContext // JetStream that can publish/subscribe but cannot manage streams. js, _ = nc.JetStream() js.Publish("foo", []byte("hello")) // JetStream context that can manage streams/consumers but cannot produce messages. jsm, _ = nc.JetStream() jsm.AddStream(&nats.StreamConfig{Name: "FOO"}) // JetStream context that can both manage streams/consumers // as well as publish/subscribe. jsctx, _ = nc.JetStream() jsctx.AddStream(&nats.StreamConfig{Name: "BAR"}) jsctx.Publish("bar", []byte("hello world"))
Output:
type JetStreamManager ¶
type JetStreamManager interface { // AddStream creates a stream. AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) // UpdateStream updates a stream. UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) // DeleteStream deletes a stream. DeleteStream(name string, opts ...JSOpt) error // StreamInfo retrieves information from a stream. StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) // PurgeStream purges a stream messages. PurgeStream(name string, opts ...JSOpt) error // StreamsInfo can be used to retrieve a list of StreamInfo objects. StreamsInfo(opts ...JSOpt) <-chan *StreamInfo // StreamNames is used to retrieve a list of Stream names. StreamNames(opts ...JSOpt) <-chan string // GetMsg retrieves a raw stream message stored in JetStream by sequence number. GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error) // DeleteMsg erases a message from a stream. DeleteMsg(name string, seq uint64, opts ...JSOpt) error // AddConsumer adds a consumer to a stream. AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) // DeleteConsumer deletes a consumer. DeleteConsumer(stream, consumer string, opts ...JSOpt) error // ConsumerInfo retrieves information of a consumer from a stream. ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error) // ConsumersInfo is used to retrieve a list of ConsumerInfo objects. ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo // ConsumerNames is used to retrieve a list of Consumer names. ConsumerNames(stream string, opts ...JSOpt) <-chan string // AccountInfo retrieves info about the JetStream usage from an account. AccountInfo(opts ...JSOpt) (*AccountInfo, error) }
JetStreamManager manages JetStream Streams and Consumers.
Example ¶
nc, _ := nats.Connect("localhost") js, _ := nc.JetStream() // Create a stream js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, MaxBytes: 1024, }) // Update a stream js.UpdateStream(&nats.StreamConfig{ Name: "FOO", MaxBytes: 2048, }) // Create a druable consumer js.AddConsumer("FOO", &nats.ConsumerConfig{ Durable: "BAR", }) // Get information about all streams (with Context JSOpt) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() for info := range js.StreamsInfo(nats.Context(ctx)) { fmt.Println("stream name:", info.Config.Name) } // Get information about all consumers (with MaxWait JSOpt) for info := range js.ConsumersInfo("FOO", nats.MaxWait(10*time.Second)) { fmt.Println("consumer name:", info.Name) } // Delete a consumer js.DeleteConsumer("FOO", "BAR") // Delete a stream js.DeleteStream("FOO")
Output:
type MaxWait ¶
MaxWait sets the maximum amount of time we will wait for a response.
Example ¶
nc, _ := nats.Connect("localhost") // Set default timeout for JetStream API requests, // following requests will inherit this timeout. js, _ := nc.JetStream(nats.MaxWait(3 * time.Second)) // Set custom timeout for a JetStream API request. js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }, nats.MaxWait(2*time.Second)) sub, _ := js.PullSubscribe("foo", "my-durable-name") // Fetch using the default timeout of 3 seconds. msgs, _ := sub.Fetch(1) // Set custom timeout for a pull batch request. msgs, _ = sub.Fetch(1, nats.MaxWait(2*time.Second)) for _, msg := range msgs { msg.Ack() }
Output:
type Msg ¶
type Msg struct { Subject string Reply string Header Header Data []byte Sub *Subscription // contains filtered or unexported fields }
Msg represents a message delivered by NATS. This structure is used by Subscribers and PublishMsg().
Types of Acknowledgements ¶
When using JetStream, there are multiple ways to ack a Msg:
// Acknowledgement that a message has been processed. msg.Ack() // Negatively acknowledges a message. msg.Nak() // Terminate a message so that it is not redelivered further. msg.Term() // Signal the server that the message is being worked on and reset redelivery timer. msg.InProgress()
func (*Msg) Ack ¶
Ack acknowledges a message. This tells the server that the message was successfully processed and it can move on to the next message.
func (*Msg) AckSync ¶
AckSync is the synchronous version of Ack. This indicates successful message processing.
Example ¶
nc, _ := nats.Connect("localhost") js, _ := nc.JetStream() // Set custom timeout for a JetStream API request. js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) sub, _ := js.SubscribeSync("foo") msg, _ := sub.NextMsg(2 * time.Second) // Wait for ack of an ack. msg.AckSync()
Output:
func (*Msg) InProgress ¶
InProgress tells the server that this message is being worked on. It resets the redelivery timer on the server.
func (*Msg) Metadata ¶
func (m *Msg) Metadata() (*MsgMetadata, error)
Metadata retrieves the metadata from a JetStream message. This method will return an error for non-JetStream Msgs.
Example ¶
When a message has been delivered by JetStream, it will be possible to access some of its metadata such as sequence numbers.
nc, _ := nats.Connect("localhost") js, _ := nc.JetStream() // Set custom timeout for a JetStream API request. js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) js.Publish("foo", []byte("hello")) sub, _ := js.SubscribeSync("foo") msg, _ := sub.NextMsg(2 * time.Second) // meta, _ := msg.Metadata() // Stream and Consumer sequences. fmt.Printf("Stream seq: %s:%d, Consumer seq: %s:%d\n", meta.Stream, meta.Sequence.Stream, meta.Consumer, meta.Sequence.Consumer) fmt.Printf("Pending: %d\n", meta.NumPending) fmt.Printf("Pending: %d\n", meta.NumDelivered)
Output:
func (*Msg) Nak ¶
Nak negatively acknowledges a message. This tells the server to redeliver the message. You can configure the number of redeliveries by passing nats.MaxDeliver when you Subscribe. The default is infinite redeliveries.
func (*Msg) Respond ¶
Respond allows a convenient way to respond to requests in service based subscriptions.
func (*Msg) RespondMsg ¶
RespondMsg allows a convenient way to respond to requests in service based subscriptions that might include headers
type MsgErrHandler ¶
MsgErrHandler is used to process asynchronous errors from JetStream PublishAsync and PublishMsgAsync. It will return the original message sent to the server for possible retransmitting and the error encountered.
type MsgHandler ¶
type MsgHandler func(msg *Msg)
MsgHandler is a callback function that processes messages delivered to asynchronous subscribers.
type MsgMetadata ¶
type MsgMetadata struct { Sequence SequencePair NumDelivered uint64 NumPending uint64 Timestamp time.Time Stream string Consumer string Domain string }
MsgMetadata is the JetStream metadata associated with received messages.
type Option ¶
Option is a function on the options for a connection.
func ClientCert ¶
ClientCert is a helper option to provide the client certificate from a file. If Secure is not already set this will set it as well.
func ClosedHandler ¶
func ClosedHandler(cb ConnHandler) Option
ClosedHandler is an Option to set the closed handler.
func Compression ¶
Compression is an Option to indicate if this connection supports compression. Currently only supported for Websocket connections.
func CustomInboxPrefix ¶
CustomInboxPrefix configures the request + reply inbox prefix
func CustomReconnectDelay ¶
func CustomReconnectDelay(cb ReconnectDelayHandler) Option
CustomReconnectDelay is an Option to set the CustomReconnectDelayCB option. See CustomReconnectDelayCB Option for more details.
func Dialer ¶
Dialer is an Option to set the dialer which will be used when attempting to establish a connection. DEPRECATED: Should use CustomDialer instead.
func DisconnectErrHandler ¶
func DisconnectErrHandler(cb ConnErrHandler) Option
DisconnectErrHandler is an Option to set the disconnected error handler.
func DisconnectHandler ¶
func DisconnectHandler(cb ConnHandler) Option
DisconnectHandler is an Option to set the disconnected handler. DEPRECATED: Use DisconnectErrHandler.
func DiscoveredServersHandler ¶
func DiscoveredServersHandler(cb ConnHandler) Option
DiscoveredServersHandler is an Option to set the new servers handler.
func DontRandomize ¶
func DontRandomize() Option
DontRandomize is an Option to turn off randomizing the server pool.
func DrainTimeout ¶
DrainTimeout is an Option to set the timeout for draining a connection.
func ErrorHandler ¶
func ErrorHandler(cb ErrHandler) Option
ErrorHandler is an Option to set the async error handler.
func FlusherTimeout ¶
FlusherTimeout is an Option to set the write (and flush) timeout on a connection.
func LameDuckModeHandler ¶
func LameDuckModeHandler(cb ConnHandler) Option
LameDuckModeHandler sets the callback to invoke when the server notifies the connection that it entered lame duck mode, that is, going to gradually disconnect all its connections before shutting down. This is often used in deployments when upgrading NATS Servers.
func MaxPingsOutstanding ¶
MaxPingsOutstanding is an Option to set the maximum number of ping requests that can go un-answered by the server before closing the connection.
func MaxReconnects ¶
MaxReconnects is an Option to set the maximum number of reconnect attempts.
func Nkey ¶
func Nkey(pubKey string, sigCB SignatureHandler) Option
Nkey will set the public Nkey and the signature callback to sign the server nonce.
func NkeyOptionFromSeed ¶
NkeyOptionFromSeed will load an nkey pair from a seed file. It will return the NKey Option and will handle signing of nonce challenges from the server. It will take care to not hold keys in memory and to wipe memory.
func NoCallbacksAfterClientClose ¶
func NoCallbacksAfterClientClose() Option
NoCallbacksAfterClientClose is an Option to disable callbacks when user code calls Close(). If close is initiated by any other condition, callbacks if any will be invoked.
func NoEcho ¶
func NoEcho() Option
NoEcho is an Option to turn off messages echoing back from a server. Note this is supported on servers >= version 1.2. Proto 1 or greater.
func NoReconnect ¶
func NoReconnect() Option
NoReconnect is an Option to turn off reconnect behavior.
func PingInterval ¶
PingInterval is an Option to set the period for client ping commands.
func ReconnectBufSize ¶
ReconnectBufSize sets the buffer size of messages kept while busy reconnecting.
func ReconnectHandler ¶
func ReconnectHandler(cb ConnHandler) Option
ReconnectHandler is an Option to set the reconnected handler.
func ReconnectJitter ¶
ReconnectJitter is an Option to set the upper bound of a random delay added to ReconnectWait.
func ReconnectWait ¶
ReconnectWait is an Option to set the wait time between reconnect attempts.
func RetryOnFailedConnect ¶
RetryOnFailedConnect sets the connection in reconnecting state right away if it can't connect to a server in the initial set. See RetryOnFailedConnect option for more details.
func RootCAs ¶
RootCAs is a helper option to provide the RootCAs pool from a list of filenames. If Secure is not already set this will set it as well.
func Secure ¶
Secure is an Option to enable TLS secure connections that skip server verification by default. Pass a TLS Configuration for proper TLS. NOTE: This should NOT be used in a production setting.
func SetCustomDialer ¶
func SetCustomDialer(dialer CustomDialer) Option
SetCustomDialer is an Option to set a custom dialer which will be used when attempting to establish a connection. If both Dialer and CustomDialer are specified, CustomDialer takes precedence.
func SyncQueueLen ¶
SyncQueueLen will set the maximum queue len for the internal channel used for SubscribeSync().
func Token ¶
Token is an Option to set the token to use when a token is not included directly in the URLs and when a token handler is not provided.
func TokenHandler ¶
func TokenHandler(cb AuthTokenHandler) Option
TokenHandler is an Option to set the token handler to use when a token is not included directly in the URLs and when a token is not set.
func UseOldRequestStyle ¶
func UseOldRequestStyle() Option
UseOldRequestStyle is an Option to force usage of the old Request style.
func UserCredentials ¶
UserCredentials is a convenience function that takes a filename for a user's JWT and a filename for the user's private Nkey seed.
func UserInfo ¶
UserInfo is an Option to set the username and password to use when not included directly in the URLs.
func UserJWT ¶
func UserJWT(userCB UserJWTHandler, sigCB SignatureHandler) Option
UserJWT will set the callbacks to retrieve the user's JWT and the signature callback to sign the server nonce. This and the Nkey option are mutually exclusive.
type Options ¶
type Options struct { // Url represents a single NATS server url to which the client // will be connecting. If the Servers option is also set, it // then becomes the first server in the Servers array. Url string // Servers is a configured set of servers which this client // will use when attempting to connect. Servers []string // NoRandomize configures whether we will randomize the // server pool. NoRandomize bool // NoEcho configures whether the server will echo back messages // that are sent on this connection if we also have matching subscriptions. // Note this is supported on servers >= version 1.2. Proto 1 or greater. NoEcho bool // Name is an optional name label which will be sent to the server // on CONNECT to identify the client. Name string // Verbose signals the server to send an OK ack for commands // successfully processed by the server. Verbose bool // Pedantic signals the server whether it should be doing further // validation of subjects. Pedantic bool // Secure enables TLS secure connections that skip server // verification by default. NOT RECOMMENDED. Secure bool // TLSConfig is a custom TLS configuration to use for secure // transports. TLSConfig *tls.Config // AllowReconnect enables reconnection logic to be used when we // encounter a disconnect from the current server. AllowReconnect bool // MaxReconnect sets the number of reconnect attempts that will be // tried before giving up. If negative, then it will never give up // trying to reconnect. MaxReconnect int // ReconnectWait sets the time to backoff after attempting a reconnect // to a server that we were already connected to previously. ReconnectWait time.Duration // CustomReconnectDelayCB is invoked after the library tried every // URL in the server list and failed to reconnect. It passes to the // user the current number of attempts. This function returns the // amount of time the library will sleep before attempting to reconnect // again. 
It is strongly recommended that this value contains some // jitter to prevent all connections to attempt reconnecting at the same time. CustomReconnectDelayCB ReconnectDelayHandler // ReconnectJitter sets the upper bound for a random delay added to // ReconnectWait during a reconnect when no TLS is used. // Note that any jitter is capped with ReconnectJitterMax. ReconnectJitter time.Duration // ReconnectJitterTLS sets the upper bound for a random delay added to // ReconnectWait during a reconnect when TLS is used. // Note that any jitter is capped with ReconnectJitterMax. ReconnectJitterTLS time.Duration // Timeout sets the timeout for a Dial operation on a connection. Timeout time.Duration // DrainTimeout sets the timeout for a Drain Operation to complete. DrainTimeout time.Duration // FlusherTimeout is the maximum time to wait for write operations // to the underlying connection to complete (including the flusher loop). FlusherTimeout time.Duration // PingInterval is the period at which the client will be sending ping // commands to the server, disabled if 0 or negative. PingInterval time.Duration // MaxPingsOut is the maximum number of pending ping commands that can // be awaiting a response before raising an ErrStaleConnection error. MaxPingsOut int // ClosedCB sets the closed handler that is called when a client will // no longer be connected. ClosedCB ConnHandler // DisconnectedCB sets the disconnected handler that is called // whenever the connection is disconnected. // Will not be called if DisconnectedErrCB is set // DEPRECATED. Use DisconnectedErrCB which passes error that caused // the disconnect event. DisconnectedCB ConnHandler // DisconnectedErrCB sets the disconnected error handler that is called // whenever the connection is disconnected. // Disconnected error could be nil, for instance when user explicitly closes the connection. 
// DisconnectedCB will not be called if DisconnectedErrCB is set DisconnectedErrCB ConnErrHandler // ReconnectedCB sets the reconnected handler called whenever // the connection is successfully reconnected. ReconnectedCB ConnHandler // DiscoveredServersCB sets the callback that is invoked whenever a new // server has joined the cluster. DiscoveredServersCB ConnHandler // AsyncErrorCB sets the async error handler (e.g. slow consumer errors) AsyncErrorCB ErrHandler // ReconnectBufSize is the size of the backing bufio during reconnect. // Once this has been exhausted publish operations will return an error. ReconnectBufSize int // SubChanLen is the size of the buffered channel used between the socket // Go routine and the message delivery for SyncSubscriptions. // NOTE: This does not affect AsyncSubscriptions which are // dictated by PendingLimits() SubChanLen int // UserJWT sets the callback handler that will fetch a user's JWT. UserJWT UserJWTHandler // Nkey sets the public nkey that will be used to authenticate // when connecting to the server. UserJWT and Nkey are mutually exclusive // and if defined, UserJWT will take precedence. Nkey string // SignatureCB designates the function used to sign the nonce // presented from the server. SignatureCB SignatureHandler // User sets the username to be used when connecting to the server. User string // Password sets the password to be used when connecting to a server. Password string // Token sets the token to be used when connecting to a server. Token string // TokenHandler designates the function used to generate the token to be used when connecting to a server. TokenHandler AuthTokenHandler // Dialer allows a custom net.Dialer when forming connections. // DEPRECATED: should use CustomDialer instead. Dialer *net.Dialer // CustomDialer allows to specify a custom dialer (not necessarily // a *net.Dialer). 
CustomDialer CustomDialer // UseOldRequestStyle forces the old method of Requests that utilize // a new Inbox and a new Subscription for each request. UseOldRequestStyle bool // NoCallbacksAfterClientClose allows preventing the invocation of // callbacks after Close() is called. Client won't receive notifications // when Close is invoked by user code. Default is to invoke the callbacks. NoCallbacksAfterClientClose bool // LameDuckModeHandler sets the callback to invoke when the server notifies // the connection that it entered lame duck mode, that is, going to // gradually disconnect all its connections before shuting down. This is // often used in deployments when upgrading NATS Servers. LameDuckModeHandler ConnHandler // RetryOnFailedConnect sets the connection in reconnecting state right // away if it can't connect to a server in the initial set. The // MaxReconnect and ReconnectWait options are used for this process, // similarly to when an established connection is disconnected. // If a ReconnectHandler is set, it will be invoked when the connection // is established, and if a ClosedHandler is set, it will be invoked if // it fails to connect (after exhausting the MaxReconnect attempts). RetryOnFailedConnect bool // For websocket connections, indicates to the server that the connection // supports compression. If the server does too, then data will be compressed. Compression bool // InboxPrefix allows the default _INBOX prefix to be customized InboxPrefix string }
Options can be used to create a customized connection.
func GetDefaultOptions ¶
func GetDefaultOptions() Options
GetDefaultOptions returns default configuration options for the client.
type PeerInfo ¶
type PeerInfo struct { Name string `json:"name"` Current bool `json:"current"` Offline bool `json:"offline,omitempty"` Active time.Duration `json:"active"` Lag uint64 `json:"lag,omitempty"` }
PeerInfo shows information about all the peers in the cluster that are supporting the stream or consumer.
type PubAck ¶
type PubAck struct { Stream string `json:"stream"` Sequence uint64 `json:"seq"` Duplicate bool `json:"duplicate,omitempty"` Domain string `json:"domain,omitempty"` }
PubAck is an ack received after successfully publishing a message.
type PubAckFuture ¶
type PubAckFuture interface { // Ok returns a receive only channel that can be used to get a PubAck. Ok() <-chan *PubAck // Err returns a receive only channel that can be used to get the error from an async publish. Err() <-chan error // Msg returns the message that was sent to the server. Msg() *Msg }
PubAckFuture is a future for a PubAck.
type PubOpt ¶
type PubOpt interface {
// contains filtered or unexported methods
}
PubOpt configures options for publishing JetStream messages.
Example ¶
nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } // Create JetStream context to produce/consumer messages that will be persisted. js, err := nc.JetStream() if err != nil { log.Fatal(err) } // Create stream to persist messages published on 'foo'. js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) // Publish is synchronous by default, and waits for a PubAck response. js.Publish("foo", []byte("Hello JS!")) // Publish with a custom timeout. js.Publish("foo", []byte("Hello JS!"), nats.AckWait(500*time.Millisecond)) // Publish with a context. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() js.Publish("foo", []byte("Hello JS!"), nats.Context(ctx)) // Publish and assert the expected stream name. js.Publish("foo", []byte("Hello JS!"), nats.ExpectStream("FOO")) // Publish and assert the last sequence. js.Publish("foo", []byte("Hello JS!"), nats.ExpectLastSequence(5)) // Publish and tag the message with an ID. js.Publish("foo", []byte("Hello JS!"), nats.MsgId("foo:6")) // Publish and assert the last msg ID. js.Publish("foo", []byte("Hello JS!"), nats.ExpectLastMsgId("foo:6"))
Output:
func ExpectLastMsgId ¶
ExpectLastMsgId sets the expected last msgId in the response from the publish.
func ExpectLastSequence ¶
ExpectLastSequence sets the expected sequence in the response from the publish.
func ExpectLastSequencePerSubject ¶
ExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish.
func ExpectStream ¶
ExpectStream sets the name of the stream that is expected to respond to the publish.
type PullOpt ¶
type PullOpt interface {
// contains filtered or unexported methods
}
PullOpt are the options that can be passed when pulling a batch of messages.
Example ¶
nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } // Create JetStream context to produce/consumer messages that will be persisted. js, err := nc.JetStream() if err != nil { log.Fatal(err) } // Create stream to persist messages published on 'foo'. js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) // Publish is synchronous by default, and waits for a PubAck response. js.Publish("foo", []byte("Hello JS!")) sub, _ := js.PullSubscribe("foo", "wq") // Pull one message, msgs, _ := sub.Fetch(1, nats.MaxWait(2*time.Second)) for _, msg := range msgs { msg.Ack() } // Using a context to timeout waiting for a message. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() msgs, _ = sub.Fetch(1, nats.Context(ctx)) for _, msg := range msgs { msg.Ack() }
Output:
type RawStreamMsg ¶
type RawStreamMsg struct { Subject string Sequence uint64 Header Header Data []byte Time time.Time }
RawStreamMsg is a raw message stored in JetStream.
type ReconnectDelayHandler ¶
ReconnectDelayHandler is used to get from the user the desired delay the library should pause before attempting to reconnect again. Note that this is invoked after the library tried the whole list of URLs and failed to reconnect.
type ReplayPolicy ¶
type ReplayPolicy int
ReplayPolicy determines how the consumer should replay messages it already has queued in the stream.
const ( // ReplayInstantPolicy will replay messages as fast as possible. ReplayInstantPolicy ReplayPolicy = iota // ReplayOriginalPolicy will maintain the same timing as the messages were received. ReplayOriginalPolicy )
func (ReplayPolicy) MarshalJSON ¶
func (p ReplayPolicy) MarshalJSON() ([]byte, error)
func (*ReplayPolicy) UnmarshalJSON ¶
func (p *ReplayPolicy) UnmarshalJSON(data []byte) error
type RetentionPolicy ¶
type RetentionPolicy int
RetentionPolicy determines how messages in a set are retained.
const ( // LimitsPolicy (default) means that messages are retained until any given limit is reached. // This could be one of MaxMsgs, MaxBytes, or MaxAge. LimitsPolicy RetentionPolicy = iota // InterestPolicy specifies that when all known observables have acknowledged a message it can be removed. InterestPolicy // WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed. WorkQueuePolicy )
func (RetentionPolicy) MarshalJSON ¶
func (rp RetentionPolicy) MarshalJSON() ([]byte, error)
func (RetentionPolicy) String ¶
func (rp RetentionPolicy) String() string
func (*RetentionPolicy) UnmarshalJSON ¶
func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error
type SequenceInfo ¶
type SequenceInfo struct { Consumer uint64 `json:"consumer_seq"` Stream uint64 `json:"stream_seq"` Last *time.Time `json:"last_active,omitempty"` }
SequenceInfo has both the consumer and the stream sequence and last activity.
type SequencePair ¶
type SequencePair struct { Consumer uint64 `json:"consumer_seq"` Stream uint64 `json:"stream_seq"` }
SequencePair includes the consumer and stream sequence info from a JetStream consumer.
type SignatureHandler ¶
SignatureHandler is used to sign a nonce from the server while authenticating with nkeys. The user should sign the nonce and return the raw signature. The client will base64 encode this to send to the server.
type Statistics ¶
type Statistics struct { InMsgs uint64 OutMsgs uint64 InBytes uint64 OutBytes uint64 Reconnects uint64 }
Statistics tracks various stats received and sent on this connection, including counts for messages and bytes.
type StorageType ¶
type StorageType int
StorageType determines how messages are stored for retention.
const ( // FileStorage specifies on disk storage. It's the default. FileStorage StorageType = iota // MemoryStorage specifies in memory only. MemoryStorage )
func (StorageType) MarshalJSON ¶
func (st StorageType) MarshalJSON() ([]byte, error)
func (StorageType) String ¶
func (st StorageType) String() string
func (*StorageType) UnmarshalJSON ¶
func (st *StorageType) UnmarshalJSON(data []byte) error
type StreamConfig ¶
type StreamConfig struct { Name string `json:"name"` Description string `json:"description,omitempty"` Subjects []string `json:"subjects,omitempty"` Retention RetentionPolicy `json:"retention"` MaxConsumers int `json:"max_consumers"` MaxMsgs int64 `json:"max_msgs"` MaxBytes int64 `json:"max_bytes"` Discard DiscardPolicy `json:"discard"` MaxAge time.Duration `json:"max_age"` MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"` MaxMsgSize int32 `json:"max_msg_size,omitempty"` Storage StorageType `json:"storage"` Replicas int `json:"num_replicas"` NoAck bool `json:"no_ack,omitempty"` Template string `json:"template_owner,omitempty"` Duplicates time.Duration `json:"duplicate_window,omitempty"` Placement *Placement `json:"placement,omitempty"` Mirror *StreamSource `json:"mirror,omitempty"` Sources []*StreamSource `json:"sources,omitempty"` }
StreamConfig will determine the properties for a stream. There are sensible defaults for most. If no subjects are given the name will be used as the only subject.
type StreamInfo ¶
type StreamInfo struct { Config StreamConfig `json:"config"` Created time.Time `json:"created"` State StreamState `json:"state"` Cluster *ClusterInfo `json:"cluster,omitempty"` Mirror *StreamSourceInfo `json:"mirror,omitempty"` Sources []*StreamSourceInfo `json:"sources,omitempty"` }
StreamInfo shows config and current state for this stream.
type StreamSource ¶
type StreamSource struct { Name string `json:"name"` OptStartSeq uint64 `json:"opt_start_seq,omitempty"` OptStartTime *time.Time `json:"opt_start_time,omitempty"` FilterSubject string `json:"filter_subject,omitempty"` External *ExternalStream `json:"external,omitempty"` }
StreamSource dictates how streams can source from other streams.
type StreamSourceInfo ¶
type StreamSourceInfo struct { Name string `json:"name"` Lag uint64 `json:"lag"` Active time.Duration `json:"active"` }
StreamSourceInfo shows information about an upstream stream source.
type StreamState ¶
type StreamState struct { Msgs uint64 `json:"messages"` Bytes uint64 `json:"bytes"` FirstSeq uint64 `json:"first_seq"` FirstTime time.Time `json:"first_ts"` LastSeq uint64 `json:"last_seq"` LastTime time.Time `json:"last_ts"` Consumers int `json:"consumer_count"` }
StreamState is information about the given stream.
type SubOpt ¶
type SubOpt interface {
// contains filtered or unexported methods
}
SubOpt configures options for subscribing to JetStream consumers.
Example ¶
nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } // Create JetStream context to produce/consumer messages that will be persisted. js, err := nc.JetStream() if err != nil { log.Fatal(err) } // Auto-ack each individual message. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }) // Auto-ack current sequence and all below. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.AckAll()) // Auto-ack each individual message. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.AckExplicit()) // Acks are not required. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.AckNone()) // Manually acknowledge messages. js.Subscribe("foo", func(msg *nats.Msg) { msg.Ack() }, nats.ManualAck()) // Bind to an existing stream. sub, _ := js.SubscribeSync("origin", nats.BindStream("m1")) msg, _ := sub.NextMsg(2 * time.Second) msg.Ack() // Deliver all messages from the beginning. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.DeliverAll()) // Deliver messages starting from the last one. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.DeliverLast()) // Deliver only new messages that arrive after subscription. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.DeliverNew()) // Create durable consumer FOO, if it doesn't exist. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.Durable("FOO")) // Create consumer on Foo with flow control and heartbeats. js.SubscribeSync("foo", // Redeliver after 30s nats.AckWait(30*time.Second), // Redeliver only once nats.MaxDeliver(1), // Activate Flow control algorithm from the server. 
nats.EnableFlowControl(), // Track heartbeats from the server fro missed sequences. nats.IdleHeartbeat(500*time.Millisecond), ) // Set the allowable number of outstanding acks. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.MaxAckPending(5)) // Set the number of redeliveries for a message. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.MaxDeliver(5)) // Set the number the max inflight pull requests. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.PullMaxWaiting(5)) // Set the number the max inflight pull requests. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.PullMaxWaiting(5)) // Set the rate limit on a push consumer. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.RateLimit(1024)) // Replay messages at original speed, instead of as fast as possible. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.ReplayOriginal()) // Start delivering messages at a given sequence. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.StartSequence(10)) // Start delivering messages at a given time. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.StartTime(time.Now().Add(-2*time.Hour)))
Output:
func AckAll ¶
func AckAll() SubOpt
AckAll configures the consumer so that acking a sequence number implicitly acks all sequences below it as well.
func Bind ¶
Bind binds a subscription to an existing consumer from a stream without attempting to create. The first argument is the stream name and the second argument will be the consumer name.
func BindStream ¶
BindStream binds a consumer to a stream explicitly based on a name. When a stream name is not specified, the library uses the subscribe subject as a way to find the stream name. It is done by making a request to the server to get the list of stream names that have a filter for this subject. If the returned list contains a single stream, then this stream name will be used, otherwise the `ErrNoMatchingStream` is returned. To avoid the stream lookup, provide the stream name with this function. See also `Bind()`.
func DeliverAll ¶
func DeliverAll() SubOpt
DeliverAll will configure a Consumer to receive all the messages from a Stream.
func DeliverLast ¶
func DeliverLast() SubOpt
DeliverLast configures a Consumer to receive messages starting with the latest one.
func DeliverLastPerSubject ¶
func DeliverLastPerSubject() SubOpt
DeliverLastPerSubject configures a Consumer to receive messages starting with the latest one for each filtered subject.
func DeliverNew ¶
func DeliverNew() SubOpt
DeliverNew configures a Consumer to receive messages published after the subscription.
func DeliverSubject ¶
DeliverSubject specifies the JetStream consumer deliver subject.
This option is used only in situations where the consumer does not exist and a creation request is sent to the server. If not provided, an inbox will be selected. If a consumer exists, then the NATS subscription will be created on the JetStream consumer's DeliverSubject, not necessarily this subject.
func Description ¶
Description will set the description for the created consumer.
func EnableFlowControl ¶
func EnableFlowControl() SubOpt
EnableFlowControl enables flow control for a push based consumer.
func IdleHeartbeat ¶
IdleHeartbeat enables push based consumers to have idle heartbeats delivered.
func ManualAck ¶
func ManualAck() SubOpt
ManualAck disables auto ack functionality for async subscriptions.
func MaxAckPending ¶
MaxAckPending sets the number of outstanding acks that are allowed before message delivery is halted.
func MaxDeliver ¶
MaxDeliver sets the number of redeliveries for a message.
func OrderedConsumer ¶
func OrderedConsumer() SubOpt
OrderedConsumer will create a fifo direct/ephemeral consumer for in order delivery of messages. There are no redeliveries and no acks, and flow control and heartbeats will be added but will be taken care of without additional client code.
func PullMaxWaiting ¶
PullMaxWaiting defines the max inflight pull requests.
func ReplayInstant ¶
func ReplayInstant() SubOpt
ReplayInstant replays the messages as fast as possible.
func ReplayOriginal ¶
func ReplayOriginal() SubOpt
ReplayOriginal replays the messages at the original speed.
func StartSequence ¶
StartSequence configures a Consumer to receive messages from a start sequence.
type Subscription ¶
type Subscription struct { // Subject that represents this subscription. This can be different // than the received subject inside a Msg if this is a wildcard. Subject string // Optional queue group name. If present, all subscriptions with the // same name will form a distributed queue, and each message will // only be processed by one member of the group. Queue string // contains filtered or unexported fields }
Subscription represents interest in a given subject.
func (*Subscription) AutoUnsubscribe ¶
func (s *Subscription) AutoUnsubscribe(max int) error
AutoUnsubscribe will issue an automatic Unsubscribe that is processed by the server when max messages have been received. This can be useful when sending a request to an unknown number of subscribers.
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() received, wanted, total := 0, 10, 100 sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) { received++ }) sub.AutoUnsubscribe(wanted) for i := 0; i < total; i++ { nc.Publish("foo", []byte("Hello")) } nc.Flush() fmt.Printf("Received = %d", received)
Output:
func (*Subscription) ClearMaxPending ¶
func (s *Subscription) ClearMaxPending() error
ClearMaxPending resets the maximums seen so far.
func (*Subscription) ConsumerInfo ¶
func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error)
func (*Subscription) Delivered ¶
func (s *Subscription) Delivered() (int64, error)
Delivered returns the number of delivered messages for this subscription.
func (*Subscription) Drain ¶
func (s *Subscription) Drain() error
Drain will remove interest but continue callbacks until all messages have been processed.
For a JetStream subscription, if the library has created the JetStream consumer, the library will send a DeleteConsumer request to the server when the Drain operation completes. If a failure occurs when deleting the JetStream consumer, an error will be reported to the asynchronous error callback. If you do not wish the JetStream consumer to be automatically deleted, ensure that the consumer is not created by the library, which means create the consumer with AddConsumer and bind to this consumer.
func (*Subscription) Dropped ¶
func (s *Subscription) Dropped() (int, error)
Dropped returns the number of known dropped messages for this subscription. This will correspond to messages dropped by violations of PendingLimits. If the server declares the connection a SlowConsumer, this number may not be valid.
func (*Subscription) Fetch ¶
func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error)
Fetch pulls a batch of messages from a stream for a pull consumer.
func (*Subscription) IsValid ¶
func (s *Subscription) IsValid() bool
IsValid returns a boolean indicating whether the subscription is still active. This will return false if the subscription has already been closed.
func (*Subscription) MaxPending ¶
func (s *Subscription) MaxPending() (int, int, error)
MaxPending returns the maximum number of queued messages and queued bytes seen so far.
func (*Subscription) NextMsg ¶
func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error)
NextMsg will return the next message available to a synchronous subscriber or block until one is available. An error is returned if the subscription is invalid (ErrBadSubscription), the connection is closed (ErrConnectionClosed), the timeout is reached (ErrTimeout), or if there were no responders (ErrNoResponders) when used in the context of a request/reply.
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() sub, _ := nc.SubscribeSync("foo") m, err := sub.NextMsg(1 * time.Second) if err == nil { fmt.Printf("Received a message: %s\n", string(m.Data)) } else { fmt.Println("NextMsg timed out.") }
Output:
func (*Subscription) NextMsgWithContext ¶
func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error)
NextMsgWithContext takes a context and returns the next message available to a synchronous subscriber, blocking until it is delivered or context gets canceled.
func (*Subscription) Pending ¶
func (s *Subscription) Pending() (int, int, error)
Pending returns the number of queued messages and queued bytes in the client for this subscription.
func (*Subscription) PendingLimits ¶
func (s *Subscription) PendingLimits() (int, int, error)
PendingLimits returns the current limits for this subscription. If no error is returned, a negative value indicates that the given metric is not limited.
func (*Subscription) QueuedMsgs ¶
func (s *Subscription) QueuedMsgs() (int, error)
QueuedMsgs returns the number of queued messages in the client for this subscription. DEPRECATED: Use Pending()
func (*Subscription) SetPendingLimits ¶
func (s *Subscription) SetPendingLimits(msgLimit, bytesLimit int) error
SetPendingLimits sets the limits for pending msgs and bytes for this subscription. Zero is not allowed. Any negative value means that the given metric is not limited.
func (*Subscription) Type ¶
func (s *Subscription) Type() SubscriptionType
Type returns the type of Subscription.
func (*Subscription) Unsubscribe ¶
func (s *Subscription) Unsubscribe() error
Unsubscribe will remove interest in the given subject.
For a JetStream subscription, if the library has created the JetStream consumer, it will send a DeleteConsumer request to the server (if the unsubscribe itself was successful). If the delete operation fails, the error will be returned. If you do not wish the JetStream consumer to be automatically deleted, ensure that the consumer is not created by the library, which means create the consumer with AddConsumer and bind to this consumer (using the nats.Bind() option).
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() sub, _ := nc.SubscribeSync("foo") // ... sub.Unsubscribe()
Output:
type UserJWTHandler ¶
UserJWTHandler is used to fetch and return the account signed JWT for this user.