Documentation ¶
Overview ¶
A Go client for the NATS messaging system (https://nats.io).
Index ¶
- Constants
- Variables
- func DecodeObjectDigest(data string) ([]byte, error)
- func GetObjectDigestValue(data hash.Hash) string
- func NewInbox() string
- func RegisterEncoder(encType string, enc Encoder) (deprecated)
- type APIError
- type APIStats
- type AccountInfo
- type AccountLimits
- type AckOpt
- type AckPolicy
- type AckWait
- type AuthTokenHandler
- type ClientTrace
- type ClusterInfo
- type Conn
- func (nc *Conn) AuthRequired() bool
- func (nc *Conn) Barrier(f func()) error
- func (nc *Conn) Buffered() (int, error)
- func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error)
- func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error)
- func (nc *Conn) Close()
- func (nc *Conn) ClosedHandler() ConnHandler
- func (nc *Conn) ConnectedAddr() string
- func (nc *Conn) ConnectedClusterName() string
- func (nc *Conn) ConnectedServerId() string
- func (nc *Conn) ConnectedServerName() string
- func (nc *Conn) ConnectedServerVersion() string
- func (nc *Conn) ConnectedUrl() string
- func (nc *Conn) ConnectedUrlRedacted() string
- func (nc *Conn) DisconnectErrHandler() ConnErrHandler
- func (nc *Conn) DiscoveredServers() []string
- func (nc *Conn) DiscoveredServersHandler() ConnHandler
- func (nc *Conn) Drain() error
- func (nc *Conn) ErrorHandler() ErrHandler
- func (nc *Conn) Flush() error
- func (nc *Conn) FlushTimeout(timeout time.Duration) (err error)
- func (nc *Conn) FlushWithContext(ctx context.Context) error
- func (nc *Conn) ForceReconnect() error
- func (nc *Conn) GetClientID() (uint64, error)
- func (nc *Conn) GetClientIP() (net.IP, error)
- func (nc *Conn) HeadersSupported() bool
- func (nc *Conn) IsClosed() bool
- func (nc *Conn) IsConnected() bool
- func (nc *Conn) IsDraining() bool
- func (nc *Conn) IsReconnecting() bool
- func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error)
- func (nc *Conn) LastError() error
- func (nc *Conn) MaxPayload() int64
- func (nc *Conn) NewInbox() string
- func (nc *Conn) NewRespInbox() string
- func (nc *Conn) NumSubscriptions() int
- func (nc *Conn) Publish(subj string, data []byte) error
- func (nc *Conn) PublishMsg(m *Msg) error
- func (nc *Conn) PublishRequest(subj, reply string, data []byte) error
- func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error)
- func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error)
- func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error)
- func (nc *Conn) RTT() (time.Duration, error)
- func (nc *Conn) ReconnectHandler() ConnHandler
- func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error)
- func (nc *Conn) RequestMsg(msg *Msg, timeout time.Duration) (*Msg, error)
- func (nc *Conn) RequestMsgWithContext(ctx context.Context, msg *Msg) (*Msg, error)
- func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error)
- func (nc *Conn) Servers() []string
- func (nc *Conn) SetClosedHandler(cb ConnHandler)
- func (nc *Conn) SetDisconnectErrHandler(dcb ConnErrHandler)
- func (nc *Conn) SetDisconnectHandler(dcb ConnHandler)
- func (nc *Conn) SetDiscoveredServersHandler(dscb ConnHandler)
- func (nc *Conn) SetErrorHandler(cb ErrHandler)
- func (nc *Conn) SetReconnectHandler(rcb ConnHandler)
- func (nc *Conn) Stats() Statistics
- func (nc *Conn) Status() Status
- func (nc *Conn) StatusChanged(statuses ...Status) chan Status
- func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error)
- func (nc *Conn) SubscribeSync(subj string) (*Subscription, error)
- func (nc *Conn) TLSConnectionState() (tls.ConnectionState, error)
- func (nc *Conn) TLSRequired() bool
- type ConnErrHandler
- type ConnHandler
- type ConsumerConfig
- type ConsumerInfo
- type ContextOpt
- type CustomDialer
- type DeleteMarkersOlderThan
- type DeleteOpt
- type DeliverPolicy
- type DiscardPolicy
- type EncodedConn (deprecated)
- func (c *EncodedConn) BindRecvChan(subject string, channel any) (*Subscription, error) (deprecated)
- func (c *EncodedConn) BindRecvQueueChan(subject, queue string, channel any) (*Subscription, error) (deprecated)
- func (c *EncodedConn) BindSendChan(subject string, channel any) error (deprecated)
- func (c *EncodedConn) Close() (deprecated)
- func (c *EncodedConn) Drain() error (deprecated)
- func (c *EncodedConn) Flush() error (deprecated)
- func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error) (deprecated)
- func (c *EncodedConn) LastError() error (deprecated)
- func (c *EncodedConn) Publish(subject string, v any) error (deprecated)
- func (c *EncodedConn) PublishRequest(subject, reply string, v any) error (deprecated)
- func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error) (deprecated)
- func (c *EncodedConn) Request(subject string, v any, vPtr any, timeout time.Duration) error (deprecated)
- func (c *EncodedConn) RequestWithContext(ctx context.Context, subject string, v any, vPtr any) error (deprecated)
- func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error) (deprecated)
- type Encoder (deprecated)
- type ErrConsumerSequenceMismatch
- type ErrHandler
- type ErrorCode
- type ExternalStream
- type GetObjectInfoOpt
- type GetObjectOpt
- type Handler (deprecated)
- type Header
- type InProcessConnProvider
- type JSOpt
- func APIPrefix(pre string) JSOpt
- func DirectGet() JSOpt
- func DirectGetNext(subject string) JSOpt
- func Domain(domain string) JSOpt
- func PublishAsyncErrHandler(cb MsgErrHandler) JSOpt
- func PublishAsyncMaxPending(max int) JSOpt
- func StreamListFilter(subject string) JSOpt
- func UseLegacyDurableConsumers() JSOpt
- type JetStream
- type JetStreamContext
- type JetStreamError
- type JetStreamManager
- type KeyLister
- type KeyValue
- type KeyValueBucketStatus
- func (s *KeyValueBucketStatus) BackingStore() string
- func (s *KeyValueBucketStatus) Bucket() string
- func (s *KeyValueBucketStatus) Bytes() uint64
- func (s *KeyValueBucketStatus) History() int64
- func (s *KeyValueBucketStatus) IsCompressed() bool
- func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo
- func (s *KeyValueBucketStatus) TTL() time.Duration
- func (s *KeyValueBucketStatus) Values() uint64
- type KeyValueConfig
- type KeyValueEntry
- type KeyValueManager
- type KeyValueOp
- type KeyValueStatus
- type KeyWatcher
- type ListObjectsOpt
- type MaxWait
- type MessageBatch
- type Msg
- func (m *Msg) Ack(opts ...AckOpt) error
- func (m *Msg) AckSync(opts ...AckOpt) error
- func (m *Msg) Equal(msg *Msg) bool
- func (m *Msg) InProgress(opts ...AckOpt) error
- func (m *Msg) Metadata() (*MsgMetadata, error)
- func (m *Msg) Nak(opts ...AckOpt) error
- func (m *Msg) NakWithDelay(delay time.Duration, opts ...AckOpt) error
- func (m *Msg) Respond(data []byte) error
- func (m *Msg) RespondMsg(msg *Msg) error
- func (m *Msg) Size() int
- func (m *Msg) Term(opts ...AckOpt) error
- type MsgErrHandler
- type MsgHandler
- type MsgMetadata
- type ObjectBucketStatus
- func (s *ObjectBucketStatus) BackingStore() string
- func (s *ObjectBucketStatus) Bucket() string
- func (s *ObjectBucketStatus) Description() string
- func (s *ObjectBucketStatus) IsCompressed() bool
- func (s *ObjectBucketStatus) Metadata() map[string]string
- func (s *ObjectBucketStatus) Replicas() int
- func (s *ObjectBucketStatus) Sealed() bool
- func (s *ObjectBucketStatus) Size() uint64
- func (s *ObjectBucketStatus) Storage() StorageType
- func (s *ObjectBucketStatus) StreamInfo() *StreamInfo
- func (s *ObjectBucketStatus) TTL() time.Duration
- type ObjectInfo
- type ObjectLink
- type ObjectMeta
- type ObjectMetaOptions
- type ObjectOpt
- type ObjectResult
- type ObjectStore
- type ObjectStoreConfig
- type ObjectStoreManager
- type ObjectStoreStatus
- type ObjectWatcher
- type Option
- func ClientCert(certFile, keyFile string) Option
- func ClientTLSConfig(certCB TLSCertHandler, rootCAsCB RootCAsHandler) Option
- func ClosedHandler(cb ConnHandler) Option
- func Compression(enabled bool) Option
- func ConnectHandler(cb ConnHandler) Option
- func CustomInboxPrefix(p string) Option
- func CustomReconnectDelay(cb ReconnectDelayHandler) Option
- func Dialer(dialer *net.Dialer) Option
- func DisconnectErrHandler(cb ConnErrHandler) Option
- func DisconnectHandler(cb ConnHandler) Option
- func DiscoveredServersHandler(cb ConnHandler) Option
- func DontRandomize() Option
- func DrainTimeout(t time.Duration) Option
- func ErrorHandler(cb ErrHandler) Option
- func FlusherTimeout(t time.Duration) Option
- func IgnoreAuthErrorAbort() Option
- func InProcessServer(server InProcessConnProvider) Option
- func LameDuckModeHandler(cb ConnHandler) Option
- func MaxPingsOutstanding(max int) Option
- func MaxReconnects(max int) Option
- func Name(name string) Option
- func Nkey(pubKey string, sigCB SignatureHandler) Option
- func NkeyOptionFromSeed(seedFile string) (Option, error)
- func NoCallbacksAfterClientClose() Option
- func NoEcho() Option
- func NoReconnect() Option
- func PingInterval(t time.Duration) Option
- func ProxyPath(path string) Option
- func ReconnectBufSize(size int) Option
- func ReconnectHandler(cb ConnHandler) Option
- func ReconnectJitter(jitter, jitterForTLS time.Duration) Option
- func ReconnectWait(t time.Duration) Option
- func RetryOnFailedConnect(retry bool) Option
- func RootCAs(file ...string) Option
- func Secure(tls ...*tls.Config) Option
- func SetCustomDialer(dialer CustomDialer) Option
- func SkipHostLookup() Option
- func SyncQueueLen(max int) Option
- func TLSHandshakeFirst() Option
- func Timeout(t time.Duration) Option
- func Token(token string) Option
- func TokenHandler(cb AuthTokenHandler) Option
- func UseOldRequestStyle() Option
- func UserCredentials(userOrChainedFile string, seedFiles ...string) Option
- func UserInfo(user, password string) Option
- func UserInfoHandler(cb UserInfoCB) Option
- func UserJWT(userCB UserJWTHandler, sigCB SignatureHandler) Option
- func UserJWTAndSeed(jwt string, seed string) Option
- type Options
- type PeerInfo
- type Placement
- type PubAck
- type PubAckFuture
- type PubOpt
- func ExpectLastMsgId(id string) PubOpt
- func ExpectLastSequence(seq uint64) PubOpt
- func ExpectLastSequencePerSubject(seq uint64) PubOpt
- func ExpectStream(stream string) PubOpt
- func MsgId(id string) PubOpt
- func RetryAttempts(num int) PubOpt
- func RetryWait(dur time.Duration) PubOpt
- func StallWait(ttl time.Duration) PubOpt
- type PullHeartbeat
- type PullMaxBytes
- type PullOpt
- type PurgeOpt
- type RawStreamMsg
- type RePublish
- type ReconnectDelayHandler
- type ReplayPolicy
- type RetentionPolicy
- type RootCAsHandler
- type SequenceInfo
- type SequencePair
- type SignatureHandler
- type Statistics
- type Status
- type StorageType
- type StoreCompression
- type StreamAlternate
- type StreamConfig
- type StreamConsumerLimits
- type StreamInfo
- type StreamInfoRequest
- type StreamPurgeRequest
- type StreamSource
- type StreamSourceInfo
- type StreamState
- type SubOpt
- func AckAll() SubOpt
- func AckExplicit() SubOpt
- func AckNone() SubOpt
- func BackOff(backOff []time.Duration) SubOpt
- func Bind(stream, consumer string) SubOpt
- func BindStream(stream string) SubOpt
- func ConsumerFilterSubjects(subjects ...string) SubOpt
- func ConsumerMemoryStorage() SubOpt
- func ConsumerName(name string) SubOpt
- func ConsumerReplicas(replicas int) SubOpt
- func DeliverAll() SubOpt
- func DeliverLast() SubOpt
- func DeliverLastPerSubject() SubOpt
- func DeliverNew() SubOpt
- func DeliverSubject(subject string) SubOpt
- func Description(description string) SubOpt
- func Durable(consumer string) SubOpt
- func EnableFlowControl() SubOpt
- func HeadersOnly() SubOpt
- func IdleHeartbeat(duration time.Duration) SubOpt
- func InactiveThreshold(threshold time.Duration) SubOpt
- func ManualAck() SubOpt
- func MaxAckPending(n int) SubOpt
- func MaxDeliver(n int) SubOpt
- func MaxRequestBatch(max int) SubOpt
- func MaxRequestExpires(max time.Duration) SubOpt
- func MaxRequestMaxBytes(bytes int) SubOpt
- func OrderedConsumer() SubOpt
- func PullMaxWaiting(n int) SubOpt
- func RateLimit(n uint64) SubOpt
- func ReplayInstant() SubOpt
- func ReplayOriginal() SubOpt
- func SkipConsumerLookup() SubOpt
- func StartSequence(seq uint64) SubOpt
- func StartTime(startTime time.Time) SubOpt
- type SubStatus
- type SubjectTransformConfig
- type Subscription
- func (s *Subscription) AutoUnsubscribe(max int) error
- func (s *Subscription) ClearMaxPending() error
- func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error)
- func (s *Subscription) Delivered() (int64, error)
- func (s *Subscription) Drain() error
- func (s *Subscription) Dropped() (int, error)
- func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error)
- func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, error)
- func (sub *Subscription) InitialConsumerPending() (uint64, error)
- func (s *Subscription) IsDraining() bool
- func (s *Subscription) IsValid() bool
- func (s *Subscription) MaxPending() (int, int, error)
- func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error)
- func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error)
- func (s *Subscription) Pending() (int, int, error)
- func (s *Subscription) PendingLimits() (int, int, error)
- func (s *Subscription) QueuedMsgs() (int, error) (deprecated)
- func (s *Subscription) SetClosedHandler(handler func(subject string))
- func (s *Subscription) SetPendingLimits(msgLimit, bytesLimit int) error
- func (s *Subscription) StatusChanged(statuses ...SubStatus) <-chan SubStatus
- func (s *Subscription) Type() SubscriptionType
- func (s *Subscription) Unsubscribe() error
- type SubscriptionType
- type TLSCertHandler
- type Tier
- type UserInfoCB
- type UserJWTHandler
- type WatchOpt
Examples ¶
- AckOpt
- AckWait
- Conn.Close
- Conn.Flush
- Conn.FlushTimeout
- Conn.ForceReconnect
- Conn.Publish
- Conn.PublishMsg
- Conn.QueueSubscribe
- Conn.Request
- Conn.Subscribe
- Conn.SubscribeSync
- Connect
- Context
- CustomDialer
- JSOpt
- JetStream
- JetStreamContext
- JetStreamManager
- MaxWait
- Msg.AckSync
- Msg.Metadata
- PubOpt
- PullOpt
- SubOpt
- Subscription.AutoUnsubscribe
- Subscription.NextMsg
- Subscription.Unsubscribe
Constants ¶
const ( JSON_ENCODER = "json" GOB_ENCODER = "gob" DEFAULT_ENCODER = "default" )
Indexed names into the Registered Encoders.
const ( // Default time wait between retries on Publish iff err is NoResponders. DefaultPubRetryWait = 250 * time.Millisecond // Default number of retries DefaultPubRetryAttempts = 2 )
Request API subjects for JetStream.
const ( MsgIdHdr = "Nats-Msg-Id" ExpectedStreamHdr = "Nats-Expected-Stream" ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence" ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence" ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id" MsgRollup = "Nats-Rollup" )
Headers for published messages.
const ( JSStream = "Nats-Stream" JSSequence = "Nats-Sequence" JSTimeStamp = "Nats-Time-Stamp" JSSubject = "Nats-Subject" JSLastSequence = "Nats-Last-Sequence" )
Headers for republished messages and direct gets.
const ( MsgRollupSubject = "sub" MsgRollupAll = "all" )
Rollups, can be subject only or all messages.
const ( KeyValueMaxHistory = 64 AllKeys = ">" )
Used to watch all keys.
const ( Version = "1.37.0" DefaultURL = "nats://127.0.0.1:4222" DefaultPort = 4222 DefaultMaxReconnect = 60 DefaultReconnectWait = 2 * time.Second DefaultReconnectJitter = 100 * time.Millisecond DefaultReconnectJitterTLS = time.Second DefaultTimeout = 2 * time.Second DefaultPingInterval = 2 * time.Minute DefaultMaxPingOut = 2 DefaultMaxChanLen = 64 * 1024 // 64k DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB RequestChanLen = 8 DefaultDrainTimeout = 30 * time.Second DefaultFlusherTimeout = time.Minute LangString = "go" )
Default Constants
const ( // STALE_CONNECTION is for detection and proper handling of stale connections. STALE_CONNECTION = "stale connection" // PERMISSIONS_ERR is for when nats server subject authorization has failed. PERMISSIONS_ERR = "permissions violation" // AUTHORIZATION_ERR is for when nats server user authorization has failed. AUTHORIZATION_ERR = "authorization violation" // AUTHENTICATION_EXPIRED_ERR is for when nats server user authorization has expired. AUTHENTICATION_EXPIRED_ERR = "user authentication expired" // AUTHENTICATION_REVOKED_ERR is for when user authorization has been revoked. AUTHENTICATION_REVOKED_ERR = "user authentication revoked" // ACCOUNT_AUTHENTICATION_EXPIRED_ERR is for when nats server account authorization has expired. ACCOUNT_AUTHENTICATION_EXPIRED_ERR = "account authentication expired" // MAX_CONNECTIONS_ERR is for when nats server denies the connection due to server max_connections limit MAX_CONNECTIONS_ERR = "maximum connections exceeded" // MAX_SUBSCRIPTIONS_ERR is for when nats server denies the connection due to server subscriptions limit MAX_SUBSCRIPTIONS_ERR = "maximum subscriptions exceeded" )
const ( DISCONNECTED = Status(iota) CONNECTED CLOSED RECONNECTING CONNECTING DRAINING_SUBS DRAINING_PUBS )
const ( SubscriptionActive = SubStatus(iota) SubscriptionDraining SubscriptionClosed SubscriptionSlowConsumer )
const ( AsyncSubscription = SubscriptionType(iota) SyncSubscription ChanSubscription NilSubscription PullSubscription )
The different subscription types.
const ( // DefaultSubPendingMsgsLimit will be 512k msgs. DefaultSubPendingMsgsLimit = 512 * 1024 // DefaultSubPendingBytesLimit is 64MB DefaultSubPendingBytesLimit = 64 * 1024 * 1024 )
Pending Limits
const ( OP_START = iota OP_PLUS OP_PLUS_O OP_PLUS_OK OP_MINUS OP_MINUS_E OP_MINUS_ER OP_MINUS_ERR OP_MINUS_ERR_SPC MINUS_ERR_ARG OP_M OP_MS OP_MSG OP_MSG_SPC MSG_ARG MSG_PAYLOAD MSG_END OP_H OP_P OP_PI OP_PIN OP_PING OP_PO OP_PON OP_PONG OP_I OP_IN OP_INF OP_INFO OP_INFO_SPC INFO_ARG )
const (
InboxPrefix = "_INBOX."
)
InboxPrefix is the prefix for all inbox subjects.
const MAX_CONTROL_LINE_SIZE = 4096
const MsgSize = "Nats-Msg-Size"
MsgSize is a header that will be part of a consumer's delivered message if HeadersOnly requested.
Variables ¶
var ( ErrKeyValueConfigRequired = errors.New("nats: config required") ErrInvalidBucketName = errors.New("nats: invalid bucket name") ErrInvalidKey = errors.New("nats: invalid key") ErrBucketNotFound = errors.New("nats: bucket not found") ErrBadBucket = errors.New("nats: bucket not valid key-value store") ErrKeyNotFound = errors.New("nats: key not found") ErrKeyDeleted = errors.New("nats: key was deleted") ErrHistoryToLarge = errors.New("nats: history limited to a max of 64") ErrNoKeysFound = errors.New("nats: no keys found") )
Errors
var ( ErrConnectionClosed = errors.New("nats: connection closed") ErrConnectionDraining = errors.New("nats: connection draining") ErrDrainTimeout = errors.New("nats: draining connection timed out") ErrConnectionReconnecting = errors.New("nats: connection reconnecting") ErrSecureConnRequired = errors.New("nats: secure connection required") ErrSecureConnWanted = errors.New("nats: secure connection not available") ErrBadSubscription = errors.New("nats: invalid subscription") ErrTypeSubscription = errors.New("nats: invalid subscription type") ErrBadSubject = errors.New("nats: invalid subject") ErrBadQueueName = errors.New("nats: invalid queue name") ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped") ErrTimeout = errors.New("nats: timeout") ErrBadTimeout = errors.New("nats: timeout invalid") ErrAuthorization = errors.New("nats: authorization violation") ErrAuthExpired = errors.New("nats: authentication expired") ErrAuthRevoked = errors.New("nats: authentication revoked") ErrAccountAuthExpired = errors.New("nats: account authentication expired") ErrNoServers = errors.New("nats: no servers available for connection") ErrJsonParse = errors.New("nats: connect message, json parse error") ErrChanArg = errors.New("nats: argument needs to be a channel type") ErrMaxPayload = errors.New("nats: maximum payload exceeded") ErrMaxMessages = errors.New("nats: maximum messages delivered") ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription") ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed") ErrClientCertOrRootCAsRequired = errors.New("nats: at least one of certCB or rootCAsCB must be set") ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received") ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded") ErrInvalidConnection = errors.New("nats: invalid connection") ErrInvalidMsg = errors.New("nats: invalid message or message nil") ErrInvalidArg = errors.New("nats: invalid argument") ErrInvalidContext = errors.New("nats: invalid context") ErrNoDeadlineContext = errors.New("nats: context requires a deadline") ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server") ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server") ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler") ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler") ErrNoUserCB = errors.New("nats: user callback not defined") ErrNkeyAndUser = errors.New("nats: user callback and nkey defined") ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server") ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION) ErrTokenAlreadySet = errors.New("nats: token and token handler both set") ErrUserInfoAlreadySet = errors.New("nats: cannot set user info callback and user/pass") ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection") ErrMsgNoReply = errors.New("nats: message does not have a reply") ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server") ErrDisconnected = errors.New("nats: server is disconnected") ErrHeadersNotSupported = errors.New("nats: headers not supported by this server") ErrBadHeaderMsg = errors.New("nats: message could not decode headers") ErrNoResponders = errors.New("nats: no responders available for request") ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded") ErrConnectionNotTLS = errors.New("nats: connection is not tls") ErrMaxSubscriptionsExceeded = errors.New("nats: server maximum subscriptions exceeded") )
Errors
var ( ErrObjectConfigRequired = errors.New("nats: object-store config required") ErrBadObjectMeta = errors.New("nats: object-store meta information invalid") ErrObjectNotFound = errors.New("nats: object not found") ErrInvalidStoreName = errors.New("nats: invalid object-store name") ErrDigestMismatch = errors.New("nats: received a corrupt object, digests do not match") ErrInvalidDigestFormat = errors.New("nats: object digest hash has invalid format") ErrNoObjectsFound = errors.New("nats: no objects found") ErrObjectAlreadyExists = errors.New("nats: an object already exists with that name") ErrNameRequired = errors.New("nats: name is required") ErrNeeds262 = errors.New("nats: object-store requires at least server version 2.6.2") ErrLinkNotAllowed = errors.New("nats: link cannot be set when putting the object in bucket") ErrObjectRequired = errors.New("nats: object required") ErrNoLinkToDeleted = errors.New("nats: not allowed to link to a deleted object") ErrNoLinkToLink = errors.New("nats: not allowed to link to another link") ErrCantGetBucket = errors.New("nats: invalid Get, object is a link to a bucket") ErrBucketRequired = errors.New("nats: bucket required") ErrBucketMalformed = errors.New("nats: bucket malformed") ErrUpdateMetaDeleted = errors.New("nats: cannot update meta for a deleted object") )
var DefaultOptions = GetDefaultOptions()
Deprecated: Use GetDefaultOptions() instead. DefaultOptions is not safe for use by multiple clients. For details see #308.
Functions ¶
func DecodeObjectDigest ¶
DecodeObjectDigest decodes base64 hash
func GetObjectDigestValue ¶
GetObjectDigestValue calculates the base64 value of hashed data
func NewInbox ¶
func NewInbox() string
NewInbox will return an inbox string which can be used for directed replies from subscribers. These are guaranteed to be unique, but can be shared and subscribed to by others.
func RegisterEncoder
deprecated
Types ¶
type APIError ¶
type APIError struct { Code int `json:"code"` ErrorCode ErrorCode `json:"err_code"` Description string `json:"description,omitempty"` }
APIError is included in all API responses if there was an error.
type AccountInfo ¶
type AccountInfo struct { Tier Domain string `json:"domain"` API APIStats `json:"api"` Tiers map[string]Tier `json:"tiers"` }
AccountInfo contains info about the JetStream usage from the current account.
type AccountLimits ¶
type AccountLimits struct { MaxMemory int64 `json:"max_memory"` MaxStore int64 `json:"max_storage"` MaxStreams int `json:"max_streams"` MaxConsumers int `json:"max_consumers"` MaxAckPending int `json:"max_ack_pending"` MemoryMaxStreamBytes int64 `json:"memory_max_stream_bytes"` StoreMaxStreamBytes int64 `json:"storage_max_stream_bytes"` MaxBytesRequired bool `json:"max_bytes_required"` }
AccountLimits includes the JetStream limits of the current account.
type AckOpt ¶
type AckOpt interface {
// contains filtered or unexported methods
}
AckOpt are the options that can be passed when acknowledging a message.
Example ¶
AckOpt are the options that can be passed when acknowledging a message.
nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } // Create JetStream context to produce/consume messages that will be persisted. js, err := nc.JetStream() if err != nil { log.Fatal(err) } // Create stream to persist messages published on 'foo'. js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) // Publish is synchronous by default, and waits for a PubAck response. js.Publish("foo", []byte("Hello JS!")) sub, _ := js.SubscribeSync("foo") msg, _ := sub.NextMsg(2 * time.Second) // Ack and wait for 2 seconds msg.InProgress(nats.AckWait(2)) // Using a context. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() msg.Ack(nats.Context(ctx))
Output:
type AckPolicy ¶
type AckPolicy int
AckPolicy determines how the consumer should acknowledge delivered messages.
func (AckPolicy) MarshalJSON ¶
func (*AckPolicy) UnmarshalJSON ¶
type AckWait ¶
AckWait sets the maximum amount of time we will wait for an ack.
Example ¶
nc, _ := nats.Connect("localhost") js, _ := nc.JetStream() // Set custom timeout for a JetStream API request. js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) // Wait for an ack response for 2 seconds. js.Publish("foo", []byte("Hello JS!"), nats.AckWait(2*time.Second)) // Create consumer on 'foo' subject that waits for an ack for 10s, // after which the message will be delivered. sub, _ := js.SubscribeSync("foo", nats.AckWait(10*time.Second)) msg, _ := sub.NextMsg(2 * time.Second) // Wait for ack of ack for 2s. msg.AckSync(nats.AckWait(2 * time.Second))
Output:
type AuthTokenHandler ¶
type AuthTokenHandler func() string
AuthTokenHandler is used to generate a new token.
type ClientTrace ¶
type ClientTrace struct { RequestSent func(subj string, payload []byte) ResponseReceived func(subj string, payload []byte, hdr Header) }
ClientTrace can be used to trace API interactions for the JetStream Context.
type ClusterInfo ¶
type ClusterInfo struct { Name string `json:"name,omitempty"` Leader string `json:"leader,omitempty"` Replicas []*PeerInfo `json:"replicas,omitempty"` }
ClusterInfo shows information about the underlying set of servers that make up the stream or consumer.
type Conn ¶
type Conn struct { // Keep all members for which we use atomic at the beginning of the // struct and make sure they are all 64bits (or use padding if necessary). // atomic.* functions crash on 32bit machines if operand is not aligned // at 64bit. See https://github.com/golang/go/issues/599 Statistics // Opts holds the configuration of the Conn. // Modifying the configuration of a running Conn is a race. Opts Options // contains filtered or unexported fields }
A Conn represents a bare connection to a nats-server. It can send and receive []byte payloads. The connection is safe to use in multiple goroutines concurrently.
func Connect ¶
Connect will attempt to connect to the NATS system. The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222 Comma separated arrays are also supported, e.g. urlA, urlB. Options start with the defaults but can be overridden. To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
Example ¶
Shows different ways to create a Conn.
nc, _ := nats.Connect("demo.nats.io") nc.Close() nc, _ = nats.Connect("nats://derek:secretpassword@demo.nats.io:4222") nc.Close() nc, _ = nats.Connect("tls://derek:secretpassword@demo.nats.io:4443") nc.Close() opts := nats.Options{ AllowReconnect: true, MaxReconnect: 10, ReconnectWait: 5 * time.Second, Timeout: 1 * time.Second, } nc, _ = opts.Connect() nc.Close()
Output:
func (*Conn) AuthRequired ¶
AuthRequired will return if the connected server requires authorization.
func (*Conn) Barrier ¶
Barrier schedules the given function `f` to all registered asynchronous subscriptions. Only the last subscription to see this barrier will invoke the function. If no subscription is registered at the time of this call, `f()` is invoked right away. ErrConnectionClosed is returned if the connection is closed prior to the call.
func (*Conn) Buffered ¶
Buffered will return the number of bytes buffered to be sent to the server. FIXME(dlc) take into account disconnected state.
func (*Conn) ChanQueueSubscribe ¶
func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error)
ChanQueueSubscribe will express interest in the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message, which will be placed on the channel. You should not close the channel until sub.Unsubscribe() has been called. Note: This is the same as QueueSubscribeSyncWithChan.
func (*Conn) ChanSubscribe ¶
func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error)
ChanSubscribe will express interest in the given subject and place all messages received on the channel. You should not close the channel until sub.Unsubscribe() has been called.
func (*Conn) Close ¶
func (nc *Conn) Close()
Close will close the connection to the server. This call will release all blocking calls, such as Flush() and NextMsg()
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) nc.Close()
Output:
func (*Conn) ClosedHandler ¶
func (nc *Conn) ClosedHandler() ConnHandler
ClosedHandler will return the closed event handler.
func (*Conn) ConnectedAddr ¶
ConnectedAddr returns the connected server's IP
func (*Conn) ConnectedClusterName ¶
ConnectedClusterName reports the connected server's cluster name if any
func (*Conn) ConnectedServerId ¶
ConnectedServerId reports the connected server's Id
func (*Conn) ConnectedServerName ¶
ConnectedServerName reports the connected server's name
func (*Conn) ConnectedServerVersion ¶
ConnectedServerVersion reports the connected server's version as a string
func (*Conn) ConnectedUrl ¶
ConnectedUrl reports the connected server's URL
func (*Conn) ConnectedUrlRedacted ¶
ConnectedUrlRedacted reports the connected server's URL with passwords redacted
func (*Conn) DisconnectErrHandler ¶
func (nc *Conn) DisconnectErrHandler() ConnErrHandler
DisconnectErrHandler will return the disconnect event handler.
func (*Conn) DiscoveredServers ¶
DiscoveredServers returns only the server urls that have been discovered after a connection has been established. If authentication is enabled, use UserInfo or Token when connecting with these urls.
func (*Conn) DiscoveredServersHandler ¶
func (nc *Conn) DiscoveredServersHandler() ConnHandler
DiscoveredServersHandler will return the discovered servers handler.
func (*Conn) Drain ¶
Drain will put a connection into a drain state. All subscriptions will immediately be put into a drain state. Upon completion, the publishers will be drained and can not publish any additional messages. Upon draining of the publishers, the connection will be closed. Use the ClosedCB option to know when the connection has moved from draining to closed.
See note in Subscription.Drain for JetStream subscriptions.
func (*Conn) ErrorHandler ¶
func (nc *Conn) ErrorHandler() ErrHandler
ErrorHandler will return the async error handler.
func (*Conn) Flush ¶
Flush will perform a round trip to the server and return when it receives the internal reply.
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")} for i := 0; i < 1000; i++ { nc.PublishMsg(msg) } err := nc.Flush() if err == nil { // Everything has been processed by the server for nc *Conn. }
Output:
func (*Conn) FlushTimeout ¶
FlushTimeout allows a Flush operation to have an associated timeout.
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")} for i := 0; i < 1000; i++ { nc.PublishMsg(msg) } // Only wait for up to 1 second for Flush err := nc.FlushTimeout(1 * time.Second) if err == nil { // Everything has been processed by the server for nc *Conn. }
Output:
func (*Conn) FlushWithContext ¶
FlushWithContext will allow a context to control the duration of a Flush() call. This context should be non-nil and should have a deadline set. We will return an error if none is present.
func (*Conn) ForceReconnect ¶
ForceReconnect forces a reconnect attempt to the server. This is a non-blocking call and will start the reconnect process without waiting for it to complete.
If the connection is already in the process of reconnecting, this call will force an immediate reconnect attempt (bypassing the current reconnect delay).
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() nc.Subscribe("foo", func(m *nats.Msg) { fmt.Printf("Received a message: %s\n", string(m.Data)) }) // Reconnect to the server. // the subscription will be recreated after the reconnect. nc.ForceReconnect()
Output:
func (*Conn) GetClientID ¶
GetClientID returns the client ID assigned by the server to which the client is currently connected. Note that the value may change if the client reconnects. This function returns ErrClientIDNotSupported if the server is of a version prior to 1.2.0.
func (*Conn) GetClientIP ¶
GetClientIP returns the client IP as known by the server. Supported as of server version 2.1.6.
func (*Conn) HeadersSupported ¶
HeadersSupported will return if the server supports headers
func (*Conn) IsConnected ¶
IsConnected tests if a Conn is connected.
func (*Conn) IsDraining ¶
IsDraining tests if a Conn is in the draining state.
func (*Conn) IsReconnecting ¶
IsReconnecting tests if a Conn is reconnecting.
func (*Conn) JetStream ¶
func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error)
JetStream returns a JetStreamContext for messaging and stream management. Errors are only returned if inconsistent options are provided.
NOTE: JetStreamContext is part of legacy API. Users are encouraged to switch to the new JetStream API for enhanced capabilities and simplified API. Please refer to the `jetstream` package. See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md
func (*Conn) LastError ¶
LastError reports the last error encountered via the connection. It can be used reliably within ClosedCB, for example, to find out the reason why the connection was closed.
func (*Conn) MaxPayload ¶
MaxPayload returns the size limit that a message payload can have. This is set by the server configuration and delivered to the client upon connect.
func (*Conn) NewRespInbox ¶
NewRespInbox is the new format used for _INBOX.
func (*Conn) NumSubscriptions ¶
NumSubscriptions returns active number of subscriptions.
func (*Conn) Publish ¶
Publish publishes the data argument to the given subject. The data argument is left untouched and needs to be correctly interpreted on the receiver.
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() nc.Publish("foo", []byte("Hello World!"))
Output:
func (*Conn) PublishMsg ¶
PublishMsg publishes the Msg structure, which includes the Subject, an optional Reply and an optional Data field.
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")} nc.PublishMsg(msg)
Output:
func (*Conn) PublishRequest ¶
PublishRequest will perform a Publish() expecting a response on the reply subject. Use Request() for automatically waiting for a response inline.
func (*Conn) QueueSubscribe ¶
func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error)
QueueSubscribe creates an asynchronous queue subscriber on the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message asynchronously.
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() received := 0 nc.QueueSubscribe("foo", "worker_group", func(_ *nats.Msg) { received++ })
Output:
func (*Conn) QueueSubscribeSync ¶
func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error)
QueueSubscribeSync creates a synchronous queue subscriber on the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message synchronously using Subscription.NextMsg().
func (*Conn) QueueSubscribeSyncWithChan ¶
func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error)
QueueSubscribeSyncWithChan will express interest in the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message, which will be placed on the channel. You should not close the channel until sub.Unsubscribe() has been called. Note: This is the same as ChanQueueSubscribe.
func (*Conn) ReconnectHandler ¶
func (nc *Conn) ReconnectHandler() ConnHandler
ReconnectHandler will return the reconnect event handler.
func (*Conn) Request ¶
Request will send a request payload and deliver the response message, or an error, including a timeout if no message was received properly.
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() nc.Subscribe("foo", func(m *nats.Msg) { nc.Publish(m.Reply, []byte("I will help you")) }) nc.Request("foo", []byte("help"), 50*time.Millisecond)
Output:
func (*Conn) RequestMsg ¶
RequestMsg will send a request payload including optional headers and deliver the response message, or an error, including a timeout if no message was received properly.
func (*Conn) RequestMsgWithContext ¶
RequestMsgWithContext takes a context, a subject and payload in bytes, and performs a request expecting a single response.
func (*Conn) RequestWithContext ¶
RequestWithContext takes a context, a subject and payload in bytes, and performs a request expecting a single response.
func (*Conn) Servers ¶
Servers returns the list of known server urls, including additional servers discovered after a connection has been established. If authentication is enabled, use UserInfo or Token when connecting with these urls.
func (*Conn) SetClosedHandler ¶
func (nc *Conn) SetClosedHandler(cb ConnHandler)
SetClosedHandler will set the closed event handler.
func (*Conn) SetDisconnectErrHandler ¶
func (nc *Conn) SetDisconnectErrHandler(dcb ConnErrHandler)
SetDisconnectErrHandler will set the disconnect event handler.
func (*Conn) SetDisconnectHandler ¶
func (nc *Conn) SetDisconnectHandler(dcb ConnHandler)
SetDisconnectHandler will set the disconnect event handler. Deprecated: Use SetDisconnectErrHandler
func (*Conn) SetDiscoveredServersHandler ¶
func (nc *Conn) SetDiscoveredServersHandler(dscb ConnHandler)
SetDiscoveredServersHandler will set the discovered servers handler.
func (*Conn) SetErrorHandler ¶
func (nc *Conn) SetErrorHandler(cb ErrHandler)
SetErrorHandler will set the async error handler.
func (*Conn) SetReconnectHandler ¶
func (nc *Conn) SetReconnectHandler(rcb ConnHandler)
SetReconnectHandler will set the reconnect event handler.
func (*Conn) Stats ¶
func (nc *Conn) Stats() Statistics
Stats will return a race safe copy of the Statistics section for the connection.
func (*Conn) StatusChanged ¶
StatusChanged returns a channel on which given list of connection status changes will be reported. If no statuses are provided, defaults will be used: CONNECTED, RECONNECTING, DISCONNECTED, CLOSED.
func (*Conn) Subscribe ¶
func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error)
Subscribe will express interest in the given subject. The subject can have wildcards. There are two types of wildcards: * for partial, and > for full. A subscription on subject time.*.east would receive messages sent to time.us.east and time.eu.east. A subscription on subject time.us.> would receive messages sent to time.us.east and time.us.east.atlanta, while time.us.* would only match time.us.east since it can't match more than one token. Messages will be delivered to the associated MsgHandler.
Example ¶
This Example shows an asynchronous subscriber.
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() nc.Subscribe("foo", func(m *nats.Msg) { fmt.Printf("Received a message: %s\n", string(m.Data)) })
Output:
func (*Conn) SubscribeSync ¶
func (nc *Conn) SubscribeSync(subj string) (*Subscription, error)
SubscribeSync will express interest on the given subject. Messages will be received synchronously using Subscription.NextMsg().
Example ¶
This Example shows a synchronous subscriber.
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() sub, _ := nc.SubscribeSync("foo") m, err := sub.NextMsg(1 * time.Second) if err == nil { fmt.Printf("Received a message: %s\n", string(m.Data)) } else { fmt.Println("NextMsg timed out.") }
Output:
func (*Conn) TLSConnectionState ¶
func (nc *Conn) TLSConnectionState() (tls.ConnectionState, error)
TLSConnectionState retrieves the state of the TLS connection to the server
func (*Conn) TLSRequired ¶
TLSRequired will return if the connected server requires TLS connections.
type ConnErrHandler ¶
ConnErrHandler is used to process asynchronous events like disconnected connection with the error (if any).
type ConnHandler ¶
type ConnHandler func(*Conn)
ConnHandler is used for asynchronous events such as disconnected and closed connections.
type ConsumerConfig ¶
type ConsumerConfig struct { Durable string `json:"durable_name,omitempty"` Name string `json:"name,omitempty"` Description string `json:"description,omitempty"` DeliverPolicy DeliverPolicy `json:"deliver_policy"` OptStartSeq uint64 `json:"opt_start_seq,omitempty"` OptStartTime *time.Time `json:"opt_start_time,omitempty"` AckPolicy AckPolicy `json:"ack_policy"` AckWait time.Duration `json:"ack_wait,omitempty"` MaxDeliver int `json:"max_deliver,omitempty"` BackOff []time.Duration `json:"backoff,omitempty"` FilterSubject string `json:"filter_subject,omitempty"` FilterSubjects []string `json:"filter_subjects,omitempty"` ReplayPolicy ReplayPolicy `json:"replay_policy"` RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec SampleFrequency string `json:"sample_freq,omitempty"` MaxWaiting int `json:"max_waiting,omitempty"` MaxAckPending int `json:"max_ack_pending,omitempty"` FlowControl bool `json:"flow_control,omitempty"` Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` HeadersOnly bool `json:"headers_only,omitempty"` // Pull based options. MaxRequestBatch int `json:"max_batch,omitempty"` MaxRequestExpires time.Duration `json:"max_expires,omitempty"` MaxRequestMaxBytes int `json:"max_bytes,omitempty"` // Push based consumers. DeliverSubject string `json:"deliver_subject,omitempty"` DeliverGroup string `json:"deliver_group,omitempty"` // Inactivity threshold. InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` // Generally inherited by parent stream and other markers, now can be configured directly. Replicas int `json:"num_replicas"` // Force memory storage. MemoryStorage bool `json:"mem_storage,omitempty"` // Metadata is additional metadata for the Consumer. // Keys starting with `_nats` are reserved. // NOTE: Metadata requires nats-server v2.10.0+ Metadata map[string]string `json:"metadata,omitempty"` }
ConsumerConfig is the configuration of a JetStream consumer.
type ConsumerInfo ¶
type ConsumerInfo struct { Stream string `json:"stream_name"` Name string `json:"name"` Created time.Time `json:"created"` Config ConsumerConfig `json:"config"` Delivered SequenceInfo `json:"delivered"` AckFloor SequenceInfo `json:"ack_floor"` NumAckPending int `json:"num_ack_pending"` NumRedelivered int `json:"num_redelivered"` NumWaiting int `json:"num_waiting"` NumPending uint64 `json:"num_pending"` Cluster *ClusterInfo `json:"cluster,omitempty"` PushBound bool `json:"push_bound,omitempty"` }
ConsumerInfo is the info from a JetStream consumer.
type ContextOpt ¶
ContextOpt is an option used to set a context.Context.
func Context ¶
func Context(ctx context.Context) ContextOpt
Context returns an option that can be used to configure a context for APIs that are context aware such as those part of the JetStream interface.
Example ¶
nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } js, _ := nc.JetStream() // Base context ctx, cancel := context.WithCancel(context.Background()) defer cancel() // nats.Context option implements context.Context interface, so can be used // to create a new context from top level one. nctx := nats.Context(ctx) // JetStreamManager functions all can use context. js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }, nctx) // Custom context with timeout tctx, tcancel := context.WithTimeout(nctx, 2*time.Second) defer tcancel() // Set a timeout for publishing using context. deadlineCtx := nats.Context(tctx) js.Publish("foo", []byte("Hello JS!"), deadlineCtx) sub, _ := js.SubscribeSync("foo") msg, _ := sub.NextMsgWithContext(deadlineCtx) // Acks can also use a context to await for a response. msg.Ack(deadlineCtx)
Output:
type CustomDialer ¶
CustomDialer can be used to specify any dialer, not necessarily a *net.Dialer. A CustomDialer may also implement `SkipTLSHandshake() bool` in order to skip the TLS handshake in case not required.
Example ¶
// Given the following CustomDialer implementation: // // type skipTLSDialer struct { // dialer *net.Dialer // skipTLS bool // } // // func (sd *skipTLSDialer) Dial(network, address string) (net.Conn, error) { // return sd.dialer.Dial(network, address) // } // // func (sd *skipTLSDialer) SkipTLSHandshake() bool { // return true // } // sd := &skipTLSDialer{dialer: &net.Dialer{Timeout: 2 * time.Second}, skipTLS: true} nc, _ := nats.Connect("demo.nats.io", nats.SetCustomDialer(sd)) defer nc.Close()
Output:
type DeleteMarkersOlderThan ¶
DeleteMarkersOlderThan indicates that delete or purge markers older than that will be deleted as part of PurgeDeletes() operation, otherwise, only the data will be removed but markers that are recent will be kept. Note that if no option is specified, the default is 30 minutes. You can set this option to a negative value to instruct to always remove the markers, regardless of their age.
type DeleteOpt ¶
type DeleteOpt interface {
// contains filtered or unexported methods
}
func LastRevision ¶
LastRevision deletes if the latest revision matches.
type DeliverPolicy ¶
type DeliverPolicy int
DeliverPolicy determines how the consumer should select the first message to deliver.
const ( // DeliverAllPolicy starts delivering messages from the very beginning of a // stream. This is the default. DeliverAllPolicy DeliverPolicy = iota // DeliverLastPolicy will start the consumer with the last sequence // received. DeliverLastPolicy // DeliverNewPolicy will only deliver new messages that are sent after the // consumer is created. DeliverNewPolicy // DeliverByStartSequencePolicy will deliver messages starting from a given // sequence. DeliverByStartSequencePolicy // DeliverByStartTimePolicy will deliver messages starting from a given // time. DeliverByStartTimePolicy // DeliverLastPerSubjectPolicy will start the consumer with the last message // for all subjects received. DeliverLastPerSubjectPolicy )
func (DeliverPolicy) MarshalJSON ¶
func (p DeliverPolicy) MarshalJSON() ([]byte, error)
func (*DeliverPolicy) UnmarshalJSON ¶
func (p *DeliverPolicy) UnmarshalJSON(data []byte) error
type DiscardPolicy ¶
type DiscardPolicy int
DiscardPolicy determines how to proceed when limits of messages or bytes are reached.
const ( // DiscardOld will remove older messages to return to the limits. This is // the default. DiscardOld DiscardPolicy = iota // DiscardNew will fail to store new messages. DiscardNew )
func (DiscardPolicy) MarshalJSON ¶
func (dp DiscardPolicy) MarshalJSON() ([]byte, error)
func (DiscardPolicy) String ¶
func (dp DiscardPolicy) String() string
func (*DiscardPolicy) UnmarshalJSON ¶
func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error
type EncodedConn
deprecated
EncodedConn are the preferred way to interface with NATS. They wrap a bare connection to a nats server and have an extendable encoder system that will encode and decode messages from raw Go types.
Deprecated: Encoded connections are no longer supported.
func NewEncodedConn
deprecated
func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error)
NewEncodedConn will wrap an existing Connection and utilize the appropriate registered encoder.
Deprecated: Encoded connections are no longer supported.
func (*EncodedConn) BindRecvChan
deprecated
func (c *EncodedConn) BindRecvChan(subject string, channel any) (*Subscription, error)
BindRecvChan binds a channel for receive operations from NATS.
Deprecated: Encoded connections are no longer supported.
func (*EncodedConn) BindRecvQueueChan
deprecated
func (c *EncodedConn) BindRecvQueueChan(subject, queue string, channel any) (*Subscription, error)
BindRecvQueueChan binds a channel for queue-based receive operations from NATS.
Deprecated: Encoded connections are no longer supported.
func (*EncodedConn) BindSendChan
deprecated
func (c *EncodedConn) BindSendChan(subject string, channel any) error
BindSendChan binds a channel for send operations to NATS.
Deprecated: Encoded connections are no longer supported.
func (*EncodedConn) Close
deprecated
func (c *EncodedConn) Close()
Close will close the connection to the server. This call will release all blocking calls, such as Flush(), etc.
Deprecated: Encoded connections are no longer supported.
func (*EncodedConn) Drain
deprecated
func (c *EncodedConn) Drain() error
Drain will put a connection into a drain state. All subscriptions will immediately be put into a drain state. Upon completion, the publishers will be drained and can not publish any additional messages. Upon draining of the publishers, the connection will be closed. Use the ClosedCB() option to know when the connection has moved from draining to closed.
Deprecated: Encoded connections are no longer supported.
func (*EncodedConn) Flush
deprecated
func (c *EncodedConn) Flush() error
Flush will perform a round trip to the server and return when it receives the internal reply.
Deprecated: Encoded connections are no longer supported.
func (*EncodedConn) FlushTimeout
deprecated
func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error)
FlushTimeout allows a Flush operation to have an associated timeout.
Deprecated: Encoded connections are no longer supported.
func (*EncodedConn) LastError
deprecated
func (c *EncodedConn) LastError() error
LastError reports the last error encountered via the Connection.
Deprecated: Encoded connections are no longer supported.
func (*EncodedConn) Publish
deprecated
func (c *EncodedConn) Publish(subject string, v any) error
Publish publishes the data argument to the given subject. The data argument will be encoded using the associated encoder.
Deprecated: Encoded connections are no longer supported.
func (*EncodedConn) PublishRequest
deprecated
func (c *EncodedConn) PublishRequest(subject, reply string, v any) error
PublishRequest will perform a Publish() expecting a response on the reply subject. Use Request() for automatically waiting for a response inline.
Deprecated: Encoded connections are no longer supported.
func (*EncodedConn) QueueSubscribe
deprecated
func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error)
QueueSubscribe will create a queue subscription on the given subject and process incoming messages using the specified Handler. The Handler should be a func that matches a signature from the description of Handler from above.
Deprecated: Encoded connections are no longer supported.
func (*EncodedConn) Request
deprecated
func (*EncodedConn) RequestWithContext
deprecated
func (c *EncodedConn) RequestWithContext(ctx context.Context, subject string, v any, vPtr any) error
RequestWithContext will create an Inbox and perform a Request using the provided cancellation context with the Inbox reply for the data v. A response will be decoded into the vPtr last parameter.
Deprecated: Encoded connections are no longer supported.
func (*EncodedConn) Subscribe
deprecated
func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error)
Subscribe will create a subscription on the given subject and process incoming messages using the specified Handler. The Handler should be a func that matches a signature from the description of Handler from above.
Deprecated: Encoded connections are no longer supported.
type ErrConsumerSequenceMismatch ¶
type ErrConsumerSequenceMismatch struct { // StreamResumeSequence is the stream sequence from where the consumer // should resume consuming from the stream. StreamResumeSequence uint64 // ConsumerSequence is the sequence of the consumer that is behind. ConsumerSequence uint64 // LastConsumerSequence is the sequence of the consumer when the heartbeat // was received. LastConsumerSequence uint64 }
ErrConsumerSequenceMismatch represents an error from a consumer that received a Heartbeat including sequence different to the one expected from the view of the client.
func (*ErrConsumerSequenceMismatch) Error ¶
func (ecs *ErrConsumerSequenceMismatch) Error() string
type ErrHandler ¶
type ErrHandler func(*Conn, *Subscription, error)
ErrHandler is used to process asynchronous errors encountered while processing inbound messages.
type ErrorCode ¶
type ErrorCode uint16
ErrorCode represents JetStream error codes returned by the API.
const ( JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039 JSErrCodeJetStreamNotEnabled ErrorCode = 10076 JSErrCodeInsufficientResourcesErr ErrorCode = 10023 JSErrCodeStreamNotFound ErrorCode = 10059 JSErrCodeStreamNameInUse ErrorCode = 10058 JSErrCodeConsumerNotFound ErrorCode = 10014 JSErrCodeConsumerNameExists ErrorCode = 10013 JSErrCodeConsumerAlreadyExists ErrorCode = 10105 JSErrCodeDuplicateFilterSubjects ErrorCode = 10136 JSErrCodeOverlappingFilterSubjects ErrorCode = 10138 JSErrCodeConsumerEmptyFilter ErrorCode = 10139 JSErrCodeMessageNotFound ErrorCode = 10037 JSErrCodeBadRequest ErrorCode = 10003 JSStreamInvalidConfig ErrorCode = 10052 JSErrCodeStreamWrongLastSequence ErrorCode = 10071 )
type ExternalStream ¶
type ExternalStream struct { APIPrefix string `json:"api"` DeliverPrefix string `json:"deliver,omitempty"` }
ExternalStream allows you to qualify access to a stream source in another account.
type GetObjectInfoOpt ¶
type GetObjectInfoOpt interface {
// contains filtered or unexported methods
}
func GetObjectInfoShowDeleted ¶
func GetObjectInfoShowDeleted() GetObjectInfoOpt
GetObjectInfoShowDeleted makes GetInfo() return object if it was marked as deleted.
type GetObjectOpt ¶
type GetObjectOpt interface {
// contains filtered or unexported methods
}
func GetObjectShowDeleted ¶
func GetObjectShowDeleted() GetObjectOpt
GetObjectShowDeleted makes Get() return object if it was marked as deleted.
type Handler
deprecated
type Handler any
Handler is a specific callback used for Subscribe. It is generalized to an any, but we will discover its format and arguments at runtime and perform the correct callback, including demarshaling encoded data back into the appropriate struct based on the signature of the Handler.
Handlers are expected to have one of four signatures.
type person struct { Name string `json:"name,omitempty"` Age uint `json:"age,omitempty"` } handler := func(m *Msg) handler := func(p *person) handler := func(subject string, o *obj) handler := func(subject, reply string, o *obj)
These forms allow a callback to request a raw Msg ptr, where the processing of the message from the wire is untouched. Process a JSON representation and demarshal it into the given struct, e.g. person. There are also variants where the callback wants either the subject, or the subject and the reply subject.
Deprecated: Encoded connections are no longer supported.
type Header ¶
Header represents the optional Header for a NATS message, based on the implementation of http.Header.
func DecodeHeadersMsg ¶
DecodeHeadersMsg will decode the headers.
func (Header) Add ¶
Add adds the key, value pair to the header. It is case-sensitive and appends to any existing values associated with key.
type InProcessConnProvider ¶
type JSOpt ¶
type JSOpt interface {
// contains filtered or unexported methods
}
JSOpt configures a JetStreamContext.
Example ¶
A JetStream context can be configured with a default timeout using nats.MaxWait or with a custom API prefix in case of using an imported JetStream from another account.
nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } // Use the JetStream context to manage streams and consumers (with nats.APIPrefix JSOpt) js, err := nc.JetStream(nats.APIPrefix("dlc"), nats.MaxWait(5*time.Second)) if err != nil { log.Fatal(err) } sub, _ := js.SubscribeSync("foo") js.Publish("foo", []byte("Hello JS!")) sub.NextMsg(2 * time.Second)
Output:
func DirectGet ¶
func DirectGet() JSOpt
DirectGet is an option that can be used to make GetMsg() or GetLastMsg() retrieve message directly from a group of servers (leader and replicas) if the stream was created with the AllowDirect option.
func DirectGetNext ¶
DirectGetNext is an option that can be used to make GetMsg() retrieve message directly from a group of servers (leader and replicas) if the stream was created with the AllowDirect option. The server will find the next message matching the filter `subject` starting at the start sequence (argument in GetMsg()). The filter `subject` can be a wildcard.
func PublishAsyncErrHandler ¶
func PublishAsyncErrHandler(cb MsgErrHandler) JSOpt
PublishAsyncErrHandler sets the error handler for async publishes in JetStream.
func PublishAsyncMaxPending ¶
PublishAsyncMaxPending sets the maximum outstanding async publishes that can be inflight at one time.
func StreamListFilter ¶
StreamListFilter is an option that can be used to configure `StreamsInfo()` and `StreamNames()` requests. It allows filtering the returned streams by subject associated with each stream. Wildcards can be used. For example, `StreamListFilter(FOO.*.A)` will return all streams which have at least one subject matching the provided pattern (e.g. FOO.TEST.A).
func UseLegacyDurableConsumers ¶
func UseLegacyDurableConsumers() JSOpt
UseLegacyDurableConsumers makes JetStream use the legacy (pre nats-server v2.9.0) subjects for consumer creation. If this option is used when creating JetStreamContext, $JS.API.CONSUMER.DURABLE.CREATE.<stream>.<consumer> will be used to create a consumer with Durable provided, rather than $JS.API.CONSUMER.CREATE.<stream>.<consumer>.
type JetStream ¶
type JetStream interface {
	// Publish publishes a message to JetStream.
	Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error)

	// PublishMsg publishes a Msg to JetStream.
	PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error)

	// PublishAsync publishes a message to JetStream and returns a PubAckFuture.
	// The data should not be changed until the PubAckFuture has been processed.
	PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error)

	// PublishMsgAsync publishes a Msg to JetStream and returns a PubAckFuture.
	// The message should not be changed until the PubAckFuture has been processed.
	PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error)

	// PublishAsyncPending returns the number of async publishes outstanding for this context.
	PublishAsyncPending() int

	// PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd.
	PublishAsyncComplete() <-chan struct{}

	// CleanupPublisher will clean up the publishing side of JetStreamContext.
	//
	// This will unsubscribe from the internal reply subject if needed.
	// All pending async publishes will fail with ErrJetStreamPublisherClosed.
	//
	// If an error handler was provided, it will be called for each pending async
	// publish and PublishAsyncComplete will be closed.
	//
	// After completion, the JetStreamContext is still usable: the internal
	// subscription will be recreated on the next publish, but the acks from
	// previous publishes will be lost.
	CleanupPublisher()

	// Subscribe creates an async Subscription for JetStream.
	// The stream and consumer names can be provided with the nats.Bind() option.
	// To create an ephemeral consumer (where the consumer name is picked by the
	// server), you can provide the stream name with nats.BindStream().
	// If no stream name is specified, the library will attempt to figure out which
	// stream the subscription is for. See important notes below for more details.
	//
	// IMPORTANT NOTES:
	// * If neither the Bind() nor the Durable() option is specified, the library will
	// send a request to the server to create an ephemeral JetStream consumer,
	// which will be deleted after an Unsubscribe() or Drain(), or automatically
	// by the server after a short period of time after the NATS subscription is
	// gone.
	// * If the Durable() option is specified, the library will attempt to look up a
	// JetStream consumer with this name, and if found, will bind to it and not
	// attempt to delete it. However, if not found, the library will send a request
	// to create such a durable JetStream consumer. Note that the library will delete
	// the JetStream consumer after an Unsubscribe() or Drain() only if it
	// created the durable consumer while subscribing. If the durable consumer
	// already existed prior to subscribing it won't be deleted.
	// * If the Bind() option is provided, the library will attempt to look up the
	// consumer with the given name, and if successful, bind to it. If the lookup fails,
	// then the Subscribe() call will return an error.
	Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)

	// SubscribeSync creates a Subscription that can be used to process messages synchronously.
	// See the important notes in Subscribe().
	SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)

	// ChanSubscribe creates a channel-based Subscription.
	// See the important notes in Subscribe().
	ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)

	// ChanQueueSubscribe creates a channel-based Subscription with a queue group.
	// See the notes in QueueSubscribe() and Subscribe().
	ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)

	// QueueSubscribe creates a Subscription with a queue group.
	// If neither a durable name nor a binding option is specified, the queue name
	// will be used as the durable name.
	// See the important notes in Subscribe().
	QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)

	// QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
	// See the notes in QueueSubscribe() and Subscribe().
	QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error)

	// PullSubscribe creates a Subscription that can fetch messages.
	// See the important notes in Subscribe(). Additionally, for an ephemeral pull
	// consumer, the "durable" value must be set to an empty string.
	PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error)
}
JetStream allows persistent messaging through JetStream.
NOTE: JetStream is part of legacy API. Users are encouraged to switch to the new JetStream API for enhanced capabilities and simplified API. Please refer to the `jetstream` package. See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md
Example ¶
nc, err := nats.Connect("localhost")
if err != nil {
	log.Fatal(err)
}

// Use the JetStream context to produce and consume messages
// that have been persisted.
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
if err != nil {
	log.Fatal(err)
}

// NOTE: from here on, returned errors are ignored to keep the example short.
js.AddStream(&nats.StreamConfig{
	Name:     "FOO",
	Subjects: []string{"foo"},
})

js.Publish("foo", []byte("Hello JS!"))

// Publish messages asynchronously.
for i := 0; i < 500; i++ {
	js.PublishAsync("foo", []byte("Hello JS Async!"))
}
// Wait until all async publishes are acked, or give up after 5 seconds.
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
	fmt.Println("Did not resolve in time")
}

// Create async consumer on subject 'foo'. Async subscribers
// ack a message once exiting the callback.
js.Subscribe("foo", func(msg *nats.Msg) {
	meta, _ := msg.Metadata()
	fmt.Printf("Stream Sequence : %v\n", meta.Sequence.Stream)
	fmt.Printf("Consumer Sequence: %v\n", meta.Sequence.Consumer)
})

// Async subscriber with manual acks.
js.Subscribe("foo", func(msg *nats.Msg) {
	msg.Ack()
}, nats.ManualAck())

// Async queue subscription where members load balance the
// received messages together.
// If no consumer name is specified, either with the nats.Bind()
// or nats.Durable() options, the queue name is used as the
// durable name (that is, as if you were passing the
// nats.Durable(<queue group name>) option).
// It is recommended to use nats.Bind() or nats.Durable()
// and preferably create the JetStream consumer beforehand
// (using js.AddConsumer) so that the JS consumer is not
// deleted on an Unsubscribe() or Drain() when the member
// that created the consumer goes away first.
// Check Godoc for the QueueSubscribe() API for more details.
js.QueueSubscribe("foo", "group", func(msg *nats.Msg) {
	msg.Ack()
}, nats.ManualAck())

// Subscriber to consume messages synchronously.
sub, _ := js.SubscribeSync("foo")
msg, _ := sub.NextMsg(2 * time.Second)
msg.Ack()

// We can add a member to the group, with this member using
// the synchronous version of the QueueSubscribe.
sub, _ = js.QueueSubscribeSync("foo", "group")
msg, _ = sub.NextMsg(2 * time.Second)
msg.Ack()

// ChanSubscribe delivers messages on a caller-provided channel.
msgCh := make(chan *nats.Msg, 8192)
sub, _ = js.ChanSubscribe("foo", msgCh)

// Wait up to one second for a message to arrive on the channel.
select {
case msg := <-msgCh:
	fmt.Println("[Received]", msg)
case <-time.After(1 * time.Second):
}

// Create Pull based consumer with maximum 128 inflight.
sub, _ = js.PullSubscribe("foo", "wq", nats.PullMaxWaiting(128))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Keep fetching batches of up to 10 messages until the context expires.
for {
	select {
	case <-ctx.Done():
		return
	default:
	}

	msgs, _ := sub.Fetch(10, nats.Context(ctx))
	for _, msg := range msgs {
		msg.Ack()
	}
}
Output:
type JetStreamContext ¶
type JetStreamContext interface {
	// JetStream provides publishing and subscribing.
	JetStream
	// JetStreamManager provides stream and consumer management.
	JetStreamManager
	// KeyValueManager provides management of KeyValue stores.
	KeyValueManager
	// ObjectStoreManager is declared outside this section of the file;
	// presumably it manages object stores (confirm against its declaration).
	ObjectStoreManager
}
JetStreamContext allows JetStream messaging and stream management.
NOTE: JetStreamContext is part of legacy API. Users are encouraged to switch to the new JetStream API for enhanced capabilities and simplified API. Please refer to the `jetstream` package. See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md
Example ¶
A JetStreamContext is the composition of the JetStream, JetStreamManager, KeyValueManager and ObjectStoreManager interfaces. If you only need to publish and consume messages, you can create a context that uses only the JetStream interface.
// Errors are ignored throughout to keep the example short.
nc, _ := nats.Connect("localhost")

// The same nc.JetStream() call is assigned to three different interface
// types below; the static type decides which methods are available.
var js nats.JetStream
var jsm nats.JetStreamManager
var jsctx nats.JetStreamContext

// JetStream that can publish/subscribe but cannot manage streams.
js, _ = nc.JetStream()
js.Publish("foo", []byte("hello"))

// JetStream context that can manage streams/consumers but cannot produce messages.
jsm, _ = nc.JetStream()
jsm.AddStream(&nats.StreamConfig{Name: "FOO"})

// JetStream context that can both manage streams/consumers
// as well as publish/subscribe.
jsctx, _ = nc.JetStream()
jsctx.AddStream(&nats.StreamConfig{Name: "BAR"})
jsctx.Publish("bar", []byte("hello world"))
Output:
type JetStreamError ¶
JetStreamError is an error result that happens when using JetStream. In case of a client-side error, `APIError()` returns nil.
var (
	// ErrJetStreamNotEnabled is an error returned when JetStream is not enabled.
	//
	// Note: This error will not be returned in clustered mode, even if each
	// server in the cluster does not have JetStream enabled. In clustered mode,
	// requests will time out instead.
	ErrJetStreamNotEnabled JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeJetStreamNotEnabled, Description: "jetstream not enabled", Code: 503}}

	// ErrJetStreamNotEnabledForAccount is an error returned when JetStream is not enabled for an account.
	ErrJetStreamNotEnabledForAccount JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeJetStreamNotEnabledForAccount, Description: "jetstream not enabled for account", Code: 503}}

	// ErrStreamNotFound is an error returned when a stream with the given name does not exist.
	ErrStreamNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNotFound, Description: "stream not found", Code: 404}}

	// ErrStreamNameAlreadyInUse is returned when a stream with the given name already exists and has a different configuration.
	ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}}

	// ErrStreamSubjectTransformNotSupported is returned when the connected nats-server version does not support setting
	// the stream subject transform. If this error is returned when executing AddStream(), the stream with invalid
	// configuration was already created in the server.
	ErrStreamSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"}

	// ErrStreamSourceSubjectTransformNotSupported is returned when the connected nats-server version does not support setting
	// the stream source subject transform. If this error is returned when executing AddStream(), the stream with invalid
	// configuration was already created in the server.
	//
	// NOTE(review): the message is identical to the one used by
	// ErrStreamSubjectTransformNotSupported; confirm whether it should mention
	// the stream source instead.
	ErrStreamSourceSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"}

	// ErrStreamSourceNotSupported is returned when the connected nats-server version does not support setting
	// the stream sources. If this error is returned when executing AddStream(), the stream with invalid
	// configuration was already created in the server.
	ErrStreamSourceNotSupported JetStreamError = &jsError{message: "stream sourcing is not supported by nats-server"}

	// ErrStreamSourceMultipleSubjectTransformsNotSupported is returned when the connected nats-server version does not support
	// stream sourcing with multiple subject transforms. If this error is returned when executing AddStream(), the stream
	// with invalid configuration was already created in the server.
	ErrStreamSourceMultipleSubjectTransformsNotSupported JetStreamError = &jsError{message: "stream sourcing with multiple subject transforms not supported by nats-server"}

	// ErrConsumerNotFound is an error returned when a consumer with the given name does not exist.
	ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}}

	// ErrMsgNotFound is returned when a message with the provided sequence number does not exist.
	ErrMsgNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMessageNotFound, Description: "message not found", Code: 404}}

	// ErrBadRequest is returned when an invalid request is sent to the JetStream API.
	ErrBadRequest JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeBadRequest, Description: "bad request", Code: 400}}

	// ErrDuplicateFilterSubjects is returned when both FilterSubject and FilterSubjects are specified when creating a consumer.
	ErrDuplicateFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeDuplicateFilterSubjects, Description: "consumer cannot have both FilterSubject and FilterSubjects specified", Code: 500}}

	// ErrOverlappingFilterSubjects is returned when filter subjects overlap when creating a consumer.
	ErrOverlappingFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeOverlappingFilterSubjects, Description: "consumer subject filters cannot overlap", Code: 500}}

	// ErrEmptyFilter is returned when a filter in FilterSubjects is empty.
	ErrEmptyFilter JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerEmptyFilter, Description: "consumer filter in FilterSubjects cannot be empty", Code: 500}}

	// ErrConsumerNameAlreadyInUse is an error returned when a consumer with the given name already exists.
	ErrConsumerNameAlreadyInUse JetStreamError = &jsError{message: "consumer name already in use"}

	// ErrConsumerNotActive is an error returned when the consumer is not active.
	ErrConsumerNotActive JetStreamError = &jsError{message: "consumer not active"}

	// ErrInvalidJSAck is returned when the JetStream ack from a message publish is invalid.
	ErrInvalidJSAck JetStreamError = &jsError{message: "invalid jetstream publish response"}

	// ErrStreamConfigRequired is returned when an empty stream configuration is supplied to add/update stream.
	ErrStreamConfigRequired JetStreamError = &jsError{message: "stream configuration is required"}

	// ErrStreamNameRequired is returned when the provided stream name is empty.
	ErrStreamNameRequired JetStreamError = &jsError{message: "stream name is required"}

	// ErrConsumerNameRequired is returned when the provided consumer durable name is empty.
	ErrConsumerNameRequired JetStreamError = &jsError{message: "consumer name is required"}

	// ErrConsumerMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting
	// multiple filter subjects with filter_subjects field. If this error is returned when executing AddConsumer(), the consumer with invalid
	// configuration was already created in the server.
	ErrConsumerMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "multiple consumer filter subjects not supported by nats-server"}

	// ErrConsumerConfigRequired is returned when an empty consumer configuration is supplied to add/update consumer.
	ErrConsumerConfigRequired JetStreamError = &jsError{message: "consumer configuration is required"}

	// ErrPullSubscribeToPushConsumer is returned when attempting to use PullSubscribe on a push consumer.
	ErrPullSubscribeToPushConsumer JetStreamError = &jsError{message: "cannot pull subscribe to push based consumer"}

	// ErrPullSubscribeRequired is returned when attempting to bind to a pull consumer
	// with a subscribe method other than PullSubscribe.
	ErrPullSubscribeRequired JetStreamError = &jsError{message: "must use pull subscribe to bind to pull based consumer"}

	// ErrMsgAlreadyAckd is returned when attempting to acknowledge a message more than once.
	ErrMsgAlreadyAckd JetStreamError = &jsError{message: "message was already acknowledged"}

	// ErrNoStreamResponse is returned when there is no response from the stream (e.g. no responders error).
	ErrNoStreamResponse JetStreamError = &jsError{message: "no response from stream"}

	// ErrNotJSMessage is returned when attempting to get metadata from a non-JetStream message.
	ErrNotJSMessage JetStreamError = &jsError{message: "not a jetstream message"}

	// ErrInvalidStreamName is returned when the provided stream name is invalid (contains '.' or ' ').
	ErrInvalidStreamName JetStreamError = &jsError{message: "invalid stream name"}

	// ErrInvalidConsumerName is returned when the provided consumer name is invalid (contains '.' or ' ').
	ErrInvalidConsumerName JetStreamError = &jsError{message: "invalid consumer name"}

	// ErrInvalidFilterSubject is returned when the provided filter subject is invalid.
	ErrInvalidFilterSubject JetStreamError = &jsError{message: "invalid filter subject"}

	// ErrNoMatchingStream is returned when stream lookup by subject is unsuccessful.
	ErrNoMatchingStream JetStreamError = &jsError{message: "no stream matches subject"}

	// ErrSubjectMismatch is returned when the provided subject does not match the consumer's filter subject.
	ErrSubjectMismatch JetStreamError = &jsError{message: "subject does not match consumer"}

	// ErrContextAndTimeout is returned when attempting to use both context and timeout.
	ErrContextAndTimeout JetStreamError = &jsError{message: "context and timeout can not both be set"}

	// ErrCantAckIfConsumerAckNone is returned when attempting to ack a message for a consumer with AckNone policy set.
	ErrCantAckIfConsumerAckNone JetStreamError = &jsError{message: "cannot acknowledge a message for a consumer with AckNone policy"}

	// ErrConsumerDeleted is returned when attempting to send a pull request to a consumer which does not exist.
	ErrConsumerDeleted JetStreamError = &jsError{message: "consumer deleted"}

	// ErrConsumerLeadershipChanged is returned when pending requests are no longer valid after leadership has changed.
	ErrConsumerLeadershipChanged JetStreamError = &jsError{message: "Leadership Changed"}

	// ErrNoHeartbeat is returned when no heartbeat is received from the server when sending requests with a pull consumer.
	ErrNoHeartbeat JetStreamError = &jsError{message: "no heartbeat received"}

	// ErrSubscriptionClosed is returned when attempting to send a pull request to a closed subscription.
	ErrSubscriptionClosed JetStreamError = &jsError{message: "subscription closed"}

	// ErrJetStreamPublisherClosed is returned for each unfinished ack future when JetStream.CleanupPublisher is called.
	ErrJetStreamPublisherClosed JetStreamError = &jsError{message: "jetstream context closed"}

	// Deprecated: ErrInvalidDurableName is no longer returned and will be removed in future releases.
	// Use ErrInvalidConsumerName instead.
	ErrInvalidDurableName = errors.New("nats: invalid durable name")
)
var (
	// ErrKeyExists is a JetStreamError with the message "key exists". It wraps
	// an APIError carrying JSErrCodeStreamWrongLastSequence and code 400.
	// NOTE(review): presumably returned when creating a key that already
	// exists (see KeyValue.Create) - confirm against the implementation.
	ErrKeyExists JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamWrongLastSequence, Code: 400}, message: "key exists"}
)
type JetStreamManager ¶
type JetStreamManager interface {
	// AddStream creates a stream.
	AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)

	// UpdateStream updates a stream.
	UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)

	// DeleteStream deletes a stream.
	DeleteStream(name string, opts ...JSOpt) error

	// StreamInfo retrieves information from a stream.
	StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error)

	// PurgeStream purges a stream's messages.
	PurgeStream(name string, opts ...JSOpt) error

	// StreamsInfo can be used to retrieve a list of StreamInfo objects.
	//
	// Deprecated: Use Streams() instead.
	StreamsInfo(opts ...JSOpt) <-chan *StreamInfo

	// Streams can be used to retrieve a list of StreamInfo objects.
	Streams(opts ...JSOpt) <-chan *StreamInfo

	// StreamNames is used to retrieve a list of Stream names.
	StreamNames(opts ...JSOpt) <-chan string

	// GetMsg retrieves a raw stream message stored in JetStream by sequence number.
	// Use options nats.DirectGet() or nats.DirectGetNext() to trigger retrieval
	// directly from a distributed group of servers (leader and replicas).
	// The stream must have been created/updated with the AllowDirect boolean.
	GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error)

	// GetLastMsg retrieves the last raw stream message stored in JetStream by subject.
	// Use option nats.DirectGet() to trigger retrieval
	// directly from a distributed group of servers (leader and replicas).
	// The stream must have been created/updated with the AllowDirect boolean.
	GetLastMsg(name, subject string, opts ...JSOpt) (*RawStreamMsg, error)

	// DeleteMsg deletes a message from a stream. The message is marked as erased, but its value is not overwritten.
	DeleteMsg(name string, seq uint64, opts ...JSOpt) error

	// SecureDeleteMsg deletes a message from a stream. The deleted message is overwritten with random data.
	// As a result, this operation is slower than DeleteMsg().
	SecureDeleteMsg(name string, seq uint64, opts ...JSOpt) error

	// AddConsumer adds a consumer to a stream.
	// If the consumer already exists, and the configuration is the same, it
	// will return the existing consumer.
	// If the consumer already exists, and the configuration is different, it
	// will return ErrConsumerNameAlreadyInUse.
	AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)

	// UpdateConsumer updates an existing consumer.
	UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)

	// DeleteConsumer deletes a consumer.
	DeleteConsumer(stream, consumer string, opts ...JSOpt) error

	// ConsumerInfo retrieves information of a consumer from a stream.
	ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error)

	// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
	//
	// Deprecated: Use Consumers() instead.
	ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo

	// Consumers is used to retrieve a list of ConsumerInfo objects.
	Consumers(stream string, opts ...JSOpt) <-chan *ConsumerInfo

	// ConsumerNames is used to retrieve a list of Consumer names.
	ConsumerNames(stream string, opts ...JSOpt) <-chan string

	// AccountInfo retrieves info about the JetStream usage from an account.
	AccountInfo(opts ...JSOpt) (*AccountInfo, error)

	// StreamNameBySubject returns the name of a stream matching the given subject.
	StreamNameBySubject(string, ...JSOpt) (string, error)
}
JetStreamManager manages JetStream Streams and Consumers.
Example ¶
// Errors are ignored throughout to keep the example short.
nc, _ := nats.Connect("localhost")

js, _ := nc.JetStream()

// Create a stream
js.AddStream(&nats.StreamConfig{
	Name:     "FOO",
	Subjects: []string{"foo"},
	MaxBytes: 1024,
})

// Update a stream
js.UpdateStream(&nats.StreamConfig{
	Name:     "FOO",
	MaxBytes: 2048,
})

// Create a durable consumer
js.AddConsumer("FOO", &nats.ConsumerConfig{
	Durable: "BAR",
})

// Get information about all streams (with Context JSOpt).
// Streams() replaces the deprecated StreamsInfo().
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for info := range js.Streams(nats.Context(ctx)) {
	fmt.Println("stream name:", info.Config.Name)
}

// Get information about all consumers (with MaxWait JSOpt).
// Consumers() replaces the deprecated ConsumersInfo().
for info := range js.Consumers("FOO", nats.MaxWait(10*time.Second)) {
	fmt.Println("consumer name:", info.Name)
}

// Delete a consumer
js.DeleteConsumer("FOO", "BAR")

// Delete a stream
js.DeleteStream("FOO")
Output:
type KeyValue ¶
type KeyValue interface {
	// Get returns the latest value for the key.
	Get(key string) (entry KeyValueEntry, err error)

	// GetRevision returns a specific revision value for the key.
	GetRevision(key string, revision uint64) (entry KeyValueEntry, err error)

	// Put will place the new value for the key into the store.
	Put(key string, value []byte) (revision uint64, err error)

	// PutString will place the string for the key into the store.
	PutString(key string, value string) (revision uint64, err error)

	// Create will add the key/value pair if and only if it does not exist.
	Create(key string, value []byte) (revision uint64, err error)

	// Update will update the value if and only if the latest revision matches.
	// Update also resets the TTL associated with the key (if any).
	Update(key string, value []byte, last uint64) (revision uint64, err error)

	// Delete will place a delete marker and leave all revisions.
	Delete(key string, opts ...DeleteOpt) error

	// Purge will place a delete marker and remove all previous revisions.
	Purge(key string, opts ...DeleteOpt) error

	// Watch for any updates to keys that match the keys argument which could include wildcards.
	// Watch will send a nil entry when it has received all initial values.
	Watch(keys string, opts ...WatchOpt) (KeyWatcher, error)

	// WatchAll will watch for updates to all keys; updates are delivered
	// through the returned KeyWatcher.
	WatchAll(opts ...WatchOpt) (KeyWatcher, error)

	// Keys will return all keys.
	//
	// Deprecated: Use ListKeys instead to avoid memory issues.
	Keys(opts ...WatchOpt) ([]string, error)

	// ListKeys will return all keys in a channel.
	ListKeys(opts ...WatchOpt) (KeyLister, error)

	// History will return all historical values for the key.
	History(key string, opts ...WatchOpt) ([]KeyValueEntry, error)

	// Bucket returns the current bucket name.
	Bucket() string

	// PurgeDeletes will remove all current delete markers.
	PurgeDeletes(opts ...PurgeOpt) error

	// Status retrieves the status and configuration of a bucket.
	Status() (KeyValueStatus, error)
}
KeyValue contains methods to operate on a KeyValue store.
type KeyValueBucketStatus ¶
type KeyValueBucketStatus struct {
// contains filtered or unexported fields
}
KeyValueBucketStatus represents status of a Bucket, implements KeyValueStatus
func (*KeyValueBucketStatus) BackingStore ¶
func (s *KeyValueBucketStatus) BackingStore() string
BackingStore indicates what technology is used for storage of the bucket
func (*KeyValueBucketStatus) Bucket ¶
func (s *KeyValueBucketStatus) Bucket() string
Bucket is the name of the bucket
func (*KeyValueBucketStatus) Bytes ¶
func (s *KeyValueBucketStatus) Bytes() uint64
Bytes is the size of the stream
func (*KeyValueBucketStatus) History ¶
func (s *KeyValueBucketStatus) History() int64
History returns the configured history kept per key
func (*KeyValueBucketStatus) IsCompressed ¶
func (s *KeyValueBucketStatus) IsCompressed() bool
IsCompressed indicates if the data is compressed on disk
func (*KeyValueBucketStatus) StreamInfo ¶
func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo
StreamInfo is the stream info retrieved to create the status
func (*KeyValueBucketStatus) TTL ¶
func (s *KeyValueBucketStatus) TTL() time.Duration
TTL is how long the bucket keeps values for
func (*KeyValueBucketStatus) Values ¶
func (s *KeyValueBucketStatus) Values() uint64
Values is how many messages are in the bucket, including historical values
type KeyValueConfig ¶
type KeyValueConfig struct {
	// Bucket is the name of the bucket.
	Bucket string `json:"bucket"`
	// Description is an optional description of the bucket.
	Description string `json:"description,omitempty"`
	// MaxValueSize is the maximum value size (presumably in bytes - confirm).
	MaxValueSize int32 `json:"max_value_size,omitempty"`
	// History is presumably the number of historical values kept per key
	// (cf. KeyValueStatus.History) - confirm.
	History uint8 `json:"history,omitempty"`
	// TTL is presumably how long the bucket keeps values
	// (cf. KeyValueStatus.TTL) - confirm.
	TTL time.Duration `json:"ttl,omitempty"`
	// MaxBytes is the maximum size of the bucket (presumably in bytes - confirm).
	MaxBytes int64 `json:"max_bytes,omitempty"`
	// Storage selects the storage type used for the bucket.
	Storage StorageType `json:"storage,omitempty"`
	// Replicas is the number of replicas (serialized as "num_replicas").
	Replicas int `json:"num_replicas,omitempty"`
	// Placement, RePublish, Mirror and Sources are optional; their types are
	// declared outside this section. NOTE(review): presumably passed through
	// to the underlying stream configuration - confirm.
	Placement *Placement      `json:"placement,omitempty"`
	RePublish *RePublish      `json:"republish,omitempty"`
	Mirror    *StreamSource   `json:"mirror,omitempty"`
	Sources   []*StreamSource `json:"sources,omitempty"`

	// Enable underlying stream compression.
	// NOTE: Compression is supported for nats-server 2.10.0+
	Compression bool `json:"compression,omitempty"`
}
KeyValueConfig is for configuring a KeyValue store.
type KeyValueEntry ¶
type KeyValueEntry interface {
	// Bucket is the bucket the data was loaded from.
	Bucket() string
	// Key is the key that was retrieved.
	Key() string
	// Value is the retrieved value.
	Value() []byte
	// Revision is a unique sequence for this value.
	Revision() uint64
	// Created is the time the data was put in the bucket.
	Created() time.Time
	// Delta is the distance from the latest value.
	Delta() uint64
	// Operation returns the kind of operation that produced this entry:
	// KeyValuePut, KeyValueDelete or KeyValuePurge.
	Operation() KeyValueOp
}
KeyValueEntry is a retrieved entry for Get or List or Watch.
type KeyValueManager ¶
type KeyValueManager interface {
	// KeyValue will look up and bind to an existing KeyValue store.
	KeyValue(bucket string) (KeyValue, error)
	// CreateKeyValue will create a KeyValue store with the given configuration.
	CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error)
	// DeleteKeyValue will delete this KeyValue store (JetStream stream).
	DeleteKeyValue(bucket string) error
	// KeyValueStoreNames is used to retrieve a list of key value store names.
	KeyValueStoreNames() <-chan string
	// KeyValueStores is used to retrieve a list of key value store statuses.
	KeyValueStores() <-chan KeyValueStatus
}
KeyValueManager is used to manage KeyValue stores.
type KeyValueOp ¶
// KeyValueOp identifies the kind of operation reported by
// KeyValueEntry.Operation.
type KeyValueOp uint8

const (
	// KeyValuePut is the zero value of KeyValueOp and marks a put.
	KeyValuePut KeyValueOp = iota
	// KeyValueDelete marks a delete.
	KeyValueDelete
	// KeyValuePurge marks a purge.
	KeyValuePurge
)
func (KeyValueOp) String ¶
func (op KeyValueOp) String() string
type KeyValueStatus ¶
type KeyValueStatus interface {
	// Bucket is the name of the bucket.
	Bucket() string

	// Values is how many messages are in the bucket, including historical values.
	Values() uint64

	// History returns the configured history kept per key.
	History() int64

	// TTL is how long the bucket keeps values for.
	TTL() time.Duration

	// BackingStore indicates what technology is used for storage of the bucket.
	BackingStore() string

	// Bytes returns the size in bytes of the bucket.
	Bytes() uint64

	// IsCompressed indicates if the data is compressed on disk.
	IsCompressed() bool
}
KeyValueStatus is run-time status about a Key-Value bucket
type KeyWatcher ¶
type KeyWatcher interface {
	// Context returns the watcher context, optionally provided via the nats.Context option.
	Context() context.Context
	// Updates returns a channel to read any updates to entries.
	Updates() <-chan KeyValueEntry
	// Stop will stop this watcher.
	Stop() error
}
KeyWatcher is what is returned when doing a watch.
type ListObjectsOpt ¶
type ListObjectsOpt interface {
// contains filtered or unexported methods
}
func ListObjectsShowDeleted ¶
func ListObjectsShowDeleted() ListObjectsOpt
ListObjectsShowDeleted makes ListObjects() return deleted objects.
type MaxWait ¶
MaxWait sets the maximum amount of time we will wait for a response.
Example ¶
// Errors are ignored throughout to keep the example short.
nc, _ := nats.Connect("localhost")

// Set default timeout for JetStream API requests,
// following requests will inherit this timeout.
js, _ := nc.JetStream(nats.MaxWait(3 * time.Second))

// Set custom timeout for a JetStream API request.
js.AddStream(&nats.StreamConfig{
	Name:     "FOO",
	Subjects: []string{"foo"},
}, nats.MaxWait(2*time.Second))

sub, _ := js.PullSubscribe("foo", "my-durable-name")

// Fetch using the default timeout of 3 seconds.
msgs, _ := sub.Fetch(1)

// Set custom timeout for a pull batch request.
msgs, _ = sub.Fetch(1, nats.MaxWait(2*time.Second))

// Acknowledge every message returned by the last Fetch.
for _, msg := range msgs {
	msg.Ack()
}
Output:
type MessageBatch ¶
type MessageBatch interface {
	// Messages returns a channel on which the fetched messages will be delivered.
	Messages() <-chan *Msg

	// Error returns an error encountered when fetching messages.
	Error() error

	// Done signals end of execution.
	Done() <-chan struct{}
}
MessageBatch provides methods to retrieve messages consumed using [Subscription.FetchBatch].
type Msg ¶
type Msg struct { Subject string Reply string Header Header Data []byte Sub *Subscription // contains filtered or unexported fields }
Msg represents a message delivered by NATS. This structure is used by Subscribers and PublishMsg().
Types of Acknowledgements ¶
When using JetStream, there are multiple ways to ack a Msg:
// Acknowledge that a message has been processed.
msg.Ack()

// Negatively acknowledge a message.
msg.Nak()

// Terminate a message so that it is not redelivered further.
msg.Term()

// Signal the server that the message is being worked on and reset the redelivery timer.
msg.InProgress()
func (*Msg) Ack ¶
Ack acknowledges a message. This tells the server that the message was successfully processed and it can move on to the next message.
func (*Msg) AckSync ¶
AckSync is the synchronous version of Ack. This indicates successful message processing.
Example ¶
// Errors are ignored throughout to keep the example short.
nc, _ := nats.Connect("localhost")
js, _ := nc.JetStream()

// Create a stream to subscribe to.
js.AddStream(&nats.StreamConfig{
	Name:     "FOO",
	Subjects: []string{"foo"},
})

sub, _ := js.SubscribeSync("foo")
msg, _ := sub.NextMsg(2 * time.Second)

// Wait for ack of an ack.
msg.AckSync()
Output:
func (*Msg) InProgress ¶
InProgress tells the server that this message is being worked on. It resets the redelivery timer on the server.
func (*Msg) Metadata ¶
func (m *Msg) Metadata() (*MsgMetadata, error)
Metadata retrieves the metadata from a JetStream message. This method will return an error for non-JetStream Msgs.
Example ¶
When a message has been delivered by JetStream, it will be possible to access some of its metadata such as sequence numbers.
// Errors are ignored throughout to keep the example short.
nc, _ := nats.Connect("localhost")
js, _ := nc.JetStream()

// Create a stream to publish to and consume from.
js.AddStream(&nats.StreamConfig{
	Name:     "FOO",
	Subjects: []string{"foo"},
})
js.Publish("foo", []byte("hello"))

sub, _ := js.SubscribeSync("foo")
msg, _ := sub.NextMsg(2 * time.Second)

// Retrieve the JetStream metadata attached to the message.
meta, _ := msg.Metadata()

// Stream and Consumer sequences.
fmt.Printf("Stream seq: %s:%d, Consumer seq: %s:%d\n", meta.Stream, meta.Sequence.Stream, meta.Consumer, meta.Sequence.Consumer)
fmt.Printf("Pending: %d\n", meta.NumPending)
// NumDelivered is the delivery count, so it gets its own label rather than
// a second "Pending".
fmt.Printf("Delivered: %d\n", meta.NumDelivered)
Output:
func (*Msg) Nak ¶
Nak negatively acknowledges a message. This tells the server to redeliver the message. You can configure the number of redeliveries by passing nats.MaxDeliver when you Subscribe. The default is infinite redeliveries.
func (*Msg) NakWithDelay ¶
NakWithDelay negatively acknowledges a message. This tells the server to redeliver the message after the given `delay` duration. You can configure the number of redeliveries by passing nats.MaxDeliver when you Subscribe. The default is infinite redeliveries.
func (*Msg) Respond ¶
Respond allows a convenient way to respond to requests in service based subscriptions.
func (*Msg) RespondMsg ¶
RespondMsg allows a convenient way to respond to requests in service based subscriptions that might include headers.
type MsgErrHandler ¶
MsgErrHandler is used to process asynchronous errors from JetStream PublishAsync. It will return the original message sent to the server for possible retransmitting and the error encountered.
type MsgHandler ¶
// MsgHandler is the callback signature used by asynchronous subscriptions;
// it receives each delivered message as msg.
type MsgHandler func(msg *Msg)
MsgHandler is a callback function that processes messages delivered to asynchronous subscribers.
type MsgMetadata ¶
type MsgMetadata struct {
	// Sequence holds the stream and consumer sequence numbers of the message.
	Sequence SequencePair
	// NumDelivered is presumably how many times the message has been delivered - confirm.
	NumDelivered uint64
	// NumPending is presumably how many messages are still pending for the consumer - confirm.
	NumPending uint64
	// Timestamp is presumably when the message was stored in the stream - confirm.
	Timestamp time.Time
	// Stream is the name of the stream the message came from.
	Stream string
	// Consumer is the name of the consumer the message was delivered to.
	Consumer string
	// Domain is presumably the JetStream domain, if any - confirm.
	Domain string
}
MsgMetadata is the JetStream metadata associated with received messages.
type ObjectBucketStatus ¶
type ObjectBucketStatus struct {
// contains filtered or unexported fields
}
ObjectBucketStatus represents status of a Bucket, implements ObjectStoreStatus
func (*ObjectBucketStatus) BackingStore ¶
func (s *ObjectBucketStatus) BackingStore() string
BackingStore indicates what technology is used for storage of the bucket
func (*ObjectBucketStatus) Bucket ¶
func (s *ObjectBucketStatus) Bucket() string
Bucket is the name of the bucket
func (*ObjectBucketStatus) Description ¶
func (s *ObjectBucketStatus) Description() string
Description is the description supplied when creating the bucket
func (*ObjectBucketStatus) IsCompressed ¶
func (s *ObjectBucketStatus) IsCompressed() bool
IsCompressed indicates if the data is compressed on disk
func (*ObjectBucketStatus) Metadata ¶
func (s *ObjectBucketStatus) Metadata() map[string]string
Metadata is the metadata supplied when creating the bucket
func (*ObjectBucketStatus) Replicas ¶
func (s *ObjectBucketStatus) Replicas() int
Replicas indicates how many storage replicas are kept for the data in the bucket
func (*ObjectBucketStatus) Sealed ¶
func (s *ObjectBucketStatus) Sealed() bool
Sealed indicates the stream is sealed and cannot be modified in any way
func (*ObjectBucketStatus) Size ¶
func (s *ObjectBucketStatus) Size() uint64
Size is the combined size of all data in the bucket including metadata, in bytes
func (*ObjectBucketStatus) Storage ¶
func (s *ObjectBucketStatus) Storage() StorageType
Storage indicates the underlying JetStream storage technology used to store data
func (*ObjectBucketStatus) StreamInfo ¶
func (s *ObjectBucketStatus) StreamInfo() *StreamInfo
StreamInfo is the stream info retrieved to create the status
func (*ObjectBucketStatus) TTL ¶
func (s *ObjectBucketStatus) TTL() time.Duration
TTL indicates how long objects are kept in the bucket
type ObjectInfo ¶
type ObjectInfo struct { ObjectMeta Bucket string `json:"bucket"` NUID string `json:"nuid"` Size uint64 `json:"size"` ModTime time.Time `json:"mtime"` Chunks uint32 `json:"chunks"` Digest string `json:"digest,omitempty"` Deleted bool `json:"deleted,omitempty"` }
ObjectInfo is meta plus instance information.
type ObjectLink ¶
type ObjectLink struct { // Bucket is the name of the other object store. Bucket string `json:"bucket"` // Name can be used to link to a single object. // If empty means this is a link to the whole store, like a directory. Name string `json:"name,omitempty"` }
ObjectLink is used to embed links to other buckets and objects.
type ObjectMeta ¶
type ObjectMeta struct { Name string `json:"name"` Description string `json:"description,omitempty"` Headers Header `json:"headers,omitempty"` Metadata map[string]string `json:"metadata,omitempty"` // Optional options. Opts *ObjectMetaOptions `json:"options,omitempty"` }
ObjectMeta is high level information about an object.
type ObjectMetaOptions ¶
type ObjectMetaOptions struct { Link *ObjectLink `json:"link,omitempty"` ChunkSize uint32 `json:"max_chunk_size,omitempty"` }
ObjectMetaOptions
type ObjectResult ¶
type ObjectResult interface { io.ReadCloser Info() (*ObjectInfo, error) Error() error }
ObjectResult will return the underlying stream info and also be an io.ReadCloser.
type ObjectStore ¶
type ObjectStore interface { // Put will place the contents from the reader into a new object. Put(obj *ObjectMeta, reader io.Reader, opts ...ObjectOpt) (*ObjectInfo, error) // Get will pull the named object from the object store. Get(name string, opts ...GetObjectOpt) (ObjectResult, error) // PutBytes is convenience function to put a byte slice into this object store. PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error) // GetBytes is a convenience function to pull an object from this object store and return it as a byte slice. GetBytes(name string, opts ...GetObjectOpt) ([]byte, error) // PutString is convenience function to put a string into this object store. PutString(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error) // GetString is a convenience function to pull an object from this object store and return it as a string. GetString(name string, opts ...GetObjectOpt) (string, error) // PutFile is convenience function to put a file into this object store. PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error) // GetFile is a convenience function to pull an object from this object store and place it in a file. GetFile(name, file string, opts ...GetObjectOpt) error // GetInfo will retrieve the current information for the object. GetInfo(name string, opts ...GetObjectInfoOpt) (*ObjectInfo, error) // UpdateMeta will update the metadata for the object. UpdateMeta(name string, meta *ObjectMeta) error // Delete will delete the named object. Delete(name string) error // AddLink will add a link to another object. AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) // AddBucketLink will add a link to another object store. AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, error) // Seal will seal the object store, no further modifications will be allowed. Seal() error // Watch for changes in the underlying store and receive meta information updates. 
Watch(opts ...WatchOpt) (ObjectWatcher, error) // List will list all the objects in this store. List(opts ...ListObjectsOpt) ([]*ObjectInfo, error) // Status retrieves run-time status about the backing store of the bucket. Status() (ObjectStoreStatus, error) }
ObjectStore is a blob store capable of storing large objects efficiently in JetStream streams
type ObjectStoreConfig ¶
type ObjectStoreConfig struct { Bucket string `json:"bucket"` Description string `json:"description,omitempty"` TTL time.Duration `json:"max_age,omitempty"` MaxBytes int64 `json:"max_bytes,omitempty"` Storage StorageType `json:"storage,omitempty"` Replicas int `json:"num_replicas,omitempty"` Placement *Placement `json:"placement,omitempty"` // Bucket-specific metadata // NOTE: Metadata requires nats-server v2.10.0+ Metadata map[string]string `json:"metadata,omitempty"` // Enable underlying stream compression. // NOTE: Compression is supported for nats-server 2.10.0+ Compression bool `json:"compression,omitempty"` }
ObjectStoreConfig is the config for the object store.
type ObjectStoreManager ¶
type ObjectStoreManager interface { // ObjectStore will look up and bind to an existing object store instance. ObjectStore(bucket string) (ObjectStore, error) // CreateObjectStore will create an object store. CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) // DeleteObjectStore will delete the underlying stream for the named object. DeleteObjectStore(bucket string) error // ObjectStoreNames is used to retrieve a list of bucket names ObjectStoreNames(opts ...ObjectOpt) <-chan string // ObjectStores is used to retrieve a list of bucket statuses ObjectStores(opts ...ObjectOpt) <-chan ObjectStoreStatus }
ObjectStoreManager creates, loads and deletes Object Stores
type ObjectStoreStatus ¶
type ObjectStoreStatus interface { // Bucket is the name of the bucket Bucket() string // Description is the description supplied when creating the bucket Description() string // TTL indicates how long objects are kept in the bucket TTL() time.Duration // Storage indicates the underlying JetStream storage technology used to store data Storage() StorageType // Replicas indicates how many storage replicas are kept for the data in the bucket Replicas() int // Sealed indicates the stream is sealed and cannot be modified in any way Sealed() bool // Size is the combined size of all data in the bucket including metadata, in bytes Size() uint64 // BackingStore provides details about the underlying storage BackingStore() string // Metadata is the user supplied metadata for the bucket Metadata() map[string]string // IsCompressed indicates if the data is compressed on disk IsCompressed() bool }
type ObjectWatcher ¶
type ObjectWatcher interface { // Updates returns a channel to read any updates to entries. Updates() <-chan *ObjectInfo // Stop will stop this watcher. Stop() error }
ObjectWatcher is what is returned when doing a watch.
type Option ¶
Option is a function on the options for a connection.
func ClientCert ¶
ClientCert is a helper option to provide the client certificate from a file. If Secure is not already set this will set it as well.
func ClientTLSConfig ¶
func ClientTLSConfig(certCB TLSCertHandler, rootCAsCB RootCAsHandler) Option
ClientTLSConfig is an Option to set the TLS configuration for secure connections. It can be used to e.g. set TLS config with cert and root CAs from memory. For simple use case of loading cert and CAs from file, ClientCert and RootCAs options are more convenient. If Secure is not already set this will set it as well.
func ClosedHandler ¶
func ClosedHandler(cb ConnHandler) Option
ClosedHandler is an Option to set the closed handler.
func Compression ¶
Compression is an Option to indicate if this connection supports compression. Currently only supported for Websocket connections.
func ConnectHandler ¶
func ConnectHandler(cb ConnHandler) Option
ConnectHandler is an Option to set the connected handler.
func CustomInboxPrefix ¶
CustomInboxPrefix configures the request + reply inbox prefix
func CustomReconnectDelay ¶
func CustomReconnectDelay(cb ReconnectDelayHandler) Option
CustomReconnectDelay is an Option to set the CustomReconnectDelayCB option. See CustomReconnectDelayCB Option for more details.
func Dialer ¶
Dialer is an Option to set the dialer which will be used when attempting to establish a connection. Deprecated: Should use CustomDialer instead.
func DisconnectErrHandler ¶
func DisconnectErrHandler(cb ConnErrHandler) Option
DisconnectErrHandler is an Option to set the disconnected error handler.
func DisconnectHandler ¶
func DisconnectHandler(cb ConnHandler) Option
DisconnectHandler is an Option to set the disconnected handler. Deprecated: Use DisconnectErrHandler.
func DiscoveredServersHandler ¶
func DiscoveredServersHandler(cb ConnHandler) Option
DiscoveredServersHandler is an Option to set the new servers handler.
func DontRandomize ¶
func DontRandomize() Option
DontRandomize is an Option to turn off randomizing the server pool.
func DrainTimeout ¶
DrainTimeout is an Option to set the timeout for draining a connection. Defaults to 30s.
func ErrorHandler ¶
func ErrorHandler(cb ErrHandler) Option
ErrorHandler is an Option to set the async error handler.
func FlusherTimeout ¶
FlusherTimeout is an Option to set the write (and flush) timeout on a connection.
func IgnoreAuthErrorAbort ¶
func IgnoreAuthErrorAbort() Option
IgnoreAuthErrorAbort opts out of the default connect behavior of aborting subsequent reconnect attempts if server returns the same auth error twice.
func InProcessServer ¶
func InProcessServer(server InProcessConnProvider) Option
InProcessServer is an Option that will try to establish a direct connection to a NATS server running within the process instead of dialing via TCP.
func LameDuckModeHandler ¶
func LameDuckModeHandler(cb ConnHandler) Option
LameDuckModeHandler sets the callback to invoke when the server notifies the connection that it entered lame duck mode, that is, going to gradually disconnect all its connections before shutting down. This is often used in deployments when upgrading NATS Servers.
func MaxPingsOutstanding ¶
MaxPingsOutstanding is an Option to set the maximum number of ping requests that can go unanswered by the server before closing the connection. Defaults to 2.
func MaxReconnects ¶
MaxReconnects is an Option to set the maximum number of reconnect attempts. If negative, it will never stop trying to reconnect. Defaults to 60.
func Nkey ¶
func Nkey(pubKey string, sigCB SignatureHandler) Option
Nkey will set the public Nkey and the signature callback to sign the server nonce.
func NkeyOptionFromSeed ¶
NkeyOptionFromSeed will load an nkey pair from a seed file. It will return the NKey Option and will handle signing of nonce challenges from the server. It will take care to not hold keys in memory and to wipe memory.
func NoCallbacksAfterClientClose ¶
func NoCallbacksAfterClientClose() Option
NoCallbacksAfterClientClose is an Option to disable callbacks when user code calls Close(). If close is initiated by any other condition, callbacks if any will be invoked.
func NoEcho ¶
func NoEcho() Option
NoEcho is an Option to turn off messages echoing back from a server. Note this is supported on servers >= version 1.2. Proto 1 or greater.
func NoReconnect ¶
func NoReconnect() Option
NoReconnect is an Option to turn off reconnect behavior.
func PingInterval ¶
PingInterval is an Option to set the period for client ping commands. Defaults to 2m.
func ProxyPath ¶
ProxyPath is an option for websocket connections that adds a path to connections url. This is useful when connecting to NATS behind a proxy.
func ReconnectBufSize ¶
ReconnectBufSize sets the buffer size of messages kept while busy reconnecting. Defaults to 8388608 bytes (8MB). It can be disabled by setting it to -1.
func ReconnectHandler ¶
func ReconnectHandler(cb ConnHandler) Option
ReconnectHandler is an Option to set the reconnected handler.
func ReconnectJitter ¶
ReconnectJitter is an Option to set the upper bound of a random delay added to ReconnectWait. Defaults to 100ms for non-TLS connections and 1s for TLS connections, respectively.
func ReconnectWait ¶
ReconnectWait is an Option to set the wait time between reconnect attempts. Defaults to 2s.
func RetryOnFailedConnect ¶
RetryOnFailedConnect sets the connection in reconnecting state right away if it can't connect to a server in the initial set. See RetryOnFailedConnect option for more details.
func RootCAs ¶
RootCAs is a helper option to provide the RootCAs pool from a list of filenames. If Secure is not already set this will set it as well.
func Secure ¶
Secure is an Option to enable TLS secure connections that skip server verification by default. Pass a TLS Configuration for proper TLS. A TLS Configuration using InsecureSkipVerify should NOT be used in a production setting.
func SetCustomDialer ¶
func SetCustomDialer(dialer CustomDialer) Option
SetCustomDialer is an Option to set a custom dialer which will be used when attempting to establish a connection. If both Dialer and CustomDialer are specified, CustomDialer takes precedence.
func SkipHostLookup ¶
func SkipHostLookup() Option
SkipHostLookup is an Option to skip the host lookup when connecting to a server.
func SyncQueueLen ¶
SyncQueueLen will set the maximum queue len for the internal channel used for SubscribeSync(). Defaults to 65536.
func TLSHandshakeFirst ¶
func TLSHandshakeFirst() Option
TLSHandshakeFirst is an Option to perform the TLS handshake first, that is before receiving the INFO protocol. This requires the server to also be configured with such option, otherwise the connection will fail.
func Token ¶
Token is an Option to set the token to use when a token is not included directly in the URLs and when a token handler is not provided.
func TokenHandler ¶
func TokenHandler(cb AuthTokenHandler) Option
TokenHandler is an Option to set the token handler to use when a token is not included directly in the URLs and when a token is not set.
func UseOldRequestStyle ¶
func UseOldRequestStyle() Option
UseOldRequestStyle is an Option to force usage of the old Request style.
func UserCredentials ¶
UserCredentials is a convenience function that takes a filename for a user's JWT and a filename for the user's private Nkey seed.
func UserInfo ¶
UserInfo is an Option to set the username and password to use when not included directly in the URLs.
func UserInfoHandler ¶
func UserInfoHandler(cb UserInfoCB) Option
func UserJWT ¶
func UserJWT(userCB UserJWTHandler, sigCB SignatureHandler) Option
UserJWT will set the callbacks to retrieve the user's JWT and the signature callback to sign the server nonce. This and the Nkey option are mutually exclusive.
func UserJWTAndSeed ¶
UserJWTAndSeed is a convenience function that takes the JWT and seed values as strings.
type Options ¶
type Options struct { // Url represents a single NATS server url to which the client // will be connecting. If the Servers option is also set, it // then becomes the first server in the Servers array. Url string // InProcessServer represents a NATS server running within the // same process. If this is set then we will attempt to connect // to the server directly rather than using external TCP conns. InProcessServer InProcessConnProvider // Servers is a configured set of servers which this client // will use when attempting to connect. Servers []string // NoRandomize configures whether we will randomize the // server pool. NoRandomize bool // NoEcho configures whether the server will echo back messages // that are sent on this connection if we also have matching subscriptions. // Note this is supported on servers >= version 1.2. Proto 1 or greater. NoEcho bool // Name is an optional name label which will be sent to the server // on CONNECT to identify the client. Name string // Verbose signals the server to send an OK ack for commands // successfully processed by the server. Verbose bool // Pedantic signals the server whether it should be doing further // validation of subjects. Pedantic bool // Secure enables TLS secure connections that skip server // verification by default. NOT RECOMMENDED. Secure bool // TLSConfig is a custom TLS configuration to use for secure // transports. TLSConfig *tls.Config // TLSCertCB is used to fetch and return custom tls certificate. TLSCertCB TLSCertHandler // TLSHandshakeFirst is used to instruct the library perform // the TLS handshake right after the connect and before receiving // the INFO protocol from the server. If this option is enabled // but the server is not configured to perform the TLS handshake // first, the connection will fail. TLSHandshakeFirst bool // RootCAsCB is used to fetch and return a set of root certificate // authorities that clients use when verifying server certificates. 
RootCAsCB RootCAsHandler // AllowReconnect enables reconnection logic to be used when we // encounter a disconnect from the current server. AllowReconnect bool // MaxReconnect sets the number of reconnect attempts that will be // tried before giving up. If negative, then it will never give up // trying to reconnect. // Defaults to 60. MaxReconnect int // ReconnectWait sets the time to backoff after attempting a reconnect // to a server that we were already connected to previously. // Defaults to 2s. ReconnectWait time.Duration // CustomReconnectDelayCB is invoked after the library tried every // URL in the server list and failed to reconnect. It passes to the // user the current number of attempts. This function returns the // amount of time the library will sleep before attempting to reconnect // again. It is strongly recommended that this value contains some // jitter to prevent all connections to attempt reconnecting at the same time. CustomReconnectDelayCB ReconnectDelayHandler // ReconnectJitter sets the upper bound for a random delay added to // ReconnectWait during a reconnect when no TLS is used. // Defaults to 100ms. ReconnectJitter time.Duration // ReconnectJitterTLS sets the upper bound for a random delay added to // ReconnectWait during a reconnect when TLS is used. // Defaults to 1s. ReconnectJitterTLS time.Duration // Timeout sets the timeout for a Dial operation on a connection. // Defaults to 2s. Timeout time.Duration // DrainTimeout sets the timeout for a Drain Operation to complete. // Defaults to 30s. DrainTimeout time.Duration // FlusherTimeout is the maximum time to wait for write operations // to the underlying connection to complete (including the flusher loop). // Defaults to 1m. FlusherTimeout time.Duration // PingInterval is the period at which the client will be sending ping // commands to the server, disabled if 0 or negative. // Defaults to 2m. 
PingInterval time.Duration // MaxPingsOut is the maximum number of pending ping commands that can // be awaiting a response before raising an ErrStaleConnection error. // Defaults to 2. MaxPingsOut int // ClosedCB sets the closed handler that is called when a client will // no longer be connected. ClosedCB ConnHandler // DisconnectedCB sets the disconnected handler that is called // whenever the connection is disconnected. // Will not be called if DisconnectedErrCB is set // Deprecated. Use DisconnectedErrCB which passes error that caused // the disconnect event. DisconnectedCB ConnHandler // DisconnectedErrCB sets the disconnected error handler that is called // whenever the connection is disconnected. // Disconnected error could be nil, for instance when user explicitly closes the connection. // DisconnectedCB will not be called if DisconnectedErrCB is set DisconnectedErrCB ConnErrHandler // ConnectedCB sets the connected handler called when the initial connection // is established. It is not invoked on successful reconnects - for reconnections, // use ReconnectedCB. ConnectedCB can be used in conjunction with RetryOnFailedConnect // to detect whether the initial connect was successful. ConnectedCB ConnHandler // ReconnectedCB sets the reconnected handler called whenever // the connection is successfully reconnected. ReconnectedCB ConnHandler // DiscoveredServersCB sets the callback that is invoked whenever a new // server has joined the cluster. DiscoveredServersCB ConnHandler // AsyncErrorCB sets the async error handler (e.g. slow consumer errors) AsyncErrorCB ErrHandler // ReconnectBufSize is the size of the backing bufio during reconnect. // Once this has been exhausted publish operations will return an error. // Defaults to 8388608 bytes (8MB). ReconnectBufSize int // SubChanLen is the size of the buffered channel used between the socket // Go routine and the message delivery for SyncSubscriptions. 
// NOTE: This does not affect AsyncSubscriptions which are // dictated by PendingLimits() // Defaults to 65536. SubChanLen int // UserJWT sets the callback handler that will fetch a user's JWT. UserJWT UserJWTHandler // Nkey sets the public nkey that will be used to authenticate // when connecting to the server. UserJWT and Nkey are mutually exclusive // and if defined, UserJWT will take precedence. Nkey string // SignatureCB designates the function used to sign the nonce // presented from the server. SignatureCB SignatureHandler // User sets the username to be used when connecting to the server. User string // Password sets the password to be used when connecting to a server. Password string // UserInfo sets the callback handler that will fetch the username and password. UserInfo UserInfoCB // Token sets the token to be used when connecting to a server. Token string // TokenHandler designates the function used to generate the token to be used when connecting to a server. TokenHandler AuthTokenHandler // Dialer allows a custom net.Dialer when forming connections. // Deprecated: should use CustomDialer instead. Dialer *net.Dialer // CustomDialer allows to specify a custom dialer (not necessarily // a *net.Dialer). CustomDialer CustomDialer // UseOldRequestStyle forces the old method of Requests that utilize // a new Inbox and a new Subscription for each request. UseOldRequestStyle bool // NoCallbacksAfterClientClose allows preventing the invocation of // callbacks after Close() is called. Client won't receive notifications // when Close is invoked by user code. Default is to invoke the callbacks. NoCallbacksAfterClientClose bool // LameDuckModeHandler sets the callback to invoke when the server notifies // the connection that it entered lame duck mode, that is, going to // gradually disconnect all its connections before shutting down. This is // often used in deployments when upgrading NATS Servers. 
LameDuckModeHandler ConnHandler // RetryOnFailedConnect sets the connection in reconnecting state right // away if it can't connect to a server in the initial set. The // MaxReconnect and ReconnectWait options are used for this process, // similarly to when an established connection is disconnected. // If a ReconnectHandler is set, it will be invoked on the first // successful reconnect attempt (if the initial connect fails), // and if a ClosedHandler is set, it will be invoked if // it fails to connect (after exhausting the MaxReconnect attempts). RetryOnFailedConnect bool // For websocket connections, indicates to the server that the connection // supports compression. If the server does too, then data will be compressed. Compression bool // For websocket connections, adds a path to connections url. // This is useful when connecting to NATS behind a proxy. ProxyPath string // InboxPrefix allows the default _INBOX prefix to be customized InboxPrefix string // IgnoreAuthErrorAbort - if set to true, client opts out of the default connect behavior of aborting // subsequent reconnect attempts if server returns the same auth error twice (regardless of reconnect policy). IgnoreAuthErrorAbort bool // SkipHostLookup skips the DNS lookup for the server hostname. SkipHostLookup bool }
Options can be used to create a customized connection.
func GetDefaultOptions ¶
func GetDefaultOptions() Options
GetDefaultOptions returns default configuration options for the client.
type PeerInfo ¶
type PeerInfo struct { Name string `json:"name"` Current bool `json:"current"` Offline bool `json:"offline,omitempty"` Active time.Duration `json:"active"` Lag uint64 `json:"lag,omitempty"` }
PeerInfo shows information about all the peers in the cluster that are supporting the stream or consumer.
type PubAck ¶
type PubAck struct { Stream string `json:"stream"` Sequence uint64 `json:"seq"` Duplicate bool `json:"duplicate,omitempty"` Domain string `json:"domain,omitempty"` }
PubAck is an ack received after successfully publishing a message.
type PubAckFuture ¶
type PubAckFuture interface { // Ok returns a receive only channel that can be used to get a PubAck. Ok() <-chan *PubAck // Err returns a receive only channel that can be used to get the error from an async publish. Err() <-chan error // Msg returns the message that was sent to the server. Msg() *Msg }
PubAckFuture is a future for a PubAck.
type PubOpt ¶
type PubOpt interface {
// contains filtered or unexported methods
}
PubOpt configures options for publishing JetStream messages.
Example ¶
nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } // Create JetStream context to produce/consumer messages that will be persisted. js, err := nc.JetStream() if err != nil { log.Fatal(err) } // Create stream to persist messages published on 'foo'. js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) // Publish is synchronous by default, and waits for a PubAck response. js.Publish("foo", []byte("Hello JS!")) // Publish with a custom timeout. js.Publish("foo", []byte("Hello JS!"), nats.AckWait(500*time.Millisecond)) // Publish with a context. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() js.Publish("foo", []byte("Hello JS!"), nats.Context(ctx)) // Publish and assert the expected stream name. js.Publish("foo", []byte("Hello JS!"), nats.ExpectStream("FOO")) // Publish and assert the last sequence. js.Publish("foo", []byte("Hello JS!"), nats.ExpectLastSequence(5)) // Publish and tag the message with an ID. js.Publish("foo", []byte("Hello JS!"), nats.MsgId("foo:6")) // Publish and assert the last msg ID. js.Publish("foo", []byte("Hello JS!"), nats.ExpectLastMsgId("foo:6"))
Output:
func ExpectLastMsgId ¶
ExpectLastMsgId sets the expected last msgId in the response from the publish.
func ExpectLastSequence ¶
ExpectLastSequence sets the expected sequence in the response from the publish.
func ExpectLastSequencePerSubject ¶
ExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish.
func ExpectStream ¶
ExpectStream sets the expected stream to respond from the publish.
func RetryAttempts ¶
RetryAttempts sets the number of retry attempts when ErrNoResponders is encountered.
type PullHeartbeat ¶
type PullMaxBytes ¶
type PullMaxBytes int
PullMaxBytes defines the max bytes allowed for a fetch request.
type PullOpt ¶
type PullOpt interface {
// contains filtered or unexported methods
}
PullOpt are the options that can be passed when pulling a batch of messages.
Example ¶
nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } // Create JetStream context to produce/consumer messages that will be persisted. js, err := nc.JetStream() if err != nil { log.Fatal(err) } // Create stream to persist messages published on 'foo'. js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) // Publish is synchronous by default, and waits for a PubAck response. js.Publish("foo", []byte("Hello JS!")) sub, _ := js.PullSubscribe("foo", "wq") // Pull one message, msgs, _ := sub.Fetch(1, nats.MaxWait(2*time.Second)) for _, msg := range msgs { msg.Ack() } // Using a context to timeout waiting for a message. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() msgs, _ = sub.Fetch(1, nats.Context(ctx)) for _, msg := range msgs { msg.Ack() }
Output:
type RawStreamMsg ¶
type RawStreamMsg struct { Subject string Sequence uint64 Header Header Data []byte Time time.Time }
RawStreamMsg is a raw message stored in JetStream.
type RePublish ¶
type RePublish struct { Source string `json:"src,omitempty"` Destination string `json:"dest"` HeadersOnly bool `json:"headers_only,omitempty"` }
RePublish is for republishing messages once committed to a stream. The original subject is remapped from the subject pattern to the destination pattern.
type ReconnectDelayHandler ¶
ReconnectDelayHandler is used to get from the user the desired delay the library should pause before attempting to reconnect again. Note that this is invoked after the library tried the whole list of URLs and failed to reconnect.
type ReplayPolicy ¶
type ReplayPolicy int
ReplayPolicy determines how the consumer should replay messages it already has queued in the stream.
const ( // ReplayInstantPolicy will replay messages as fast as possible. ReplayInstantPolicy ReplayPolicy = iota // ReplayOriginalPolicy will maintain the same timing as the messages were received. ReplayOriginalPolicy )
func (ReplayPolicy) MarshalJSON ¶
func (p ReplayPolicy) MarshalJSON() ([]byte, error)
func (*ReplayPolicy) UnmarshalJSON ¶
func (p *ReplayPolicy) UnmarshalJSON(data []byte) error
type RetentionPolicy ¶
type RetentionPolicy int
RetentionPolicy determines how messages in a set are retained.
const ( // LimitsPolicy (default) means that messages are retained until any given limit is reached. // This could be one of MaxMsgs, MaxBytes, or MaxAge. LimitsPolicy RetentionPolicy = iota // InterestPolicy specifies that when all known observables have acknowledged a message it can be removed. InterestPolicy // WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed. WorkQueuePolicy )
func (RetentionPolicy) MarshalJSON ¶
func (rp RetentionPolicy) MarshalJSON() ([]byte, error)
func (RetentionPolicy) String ¶
func (rp RetentionPolicy) String() string
func (*RetentionPolicy) UnmarshalJSON ¶
func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error
type RootCAsHandler ¶
RootCAsHandler is used to fetch and return a set of root certificate authorities that clients use when verifying server certificates.
type SequenceInfo ¶
type SequenceInfo struct { Consumer uint64 `json:"consumer_seq"` Stream uint64 `json:"stream_seq"` Last *time.Time `json:"last_active,omitempty"` }
SequenceInfo has both the consumer and the stream sequence and last activity.
type SequencePair ¶
type SequencePair struct { Consumer uint64 `json:"consumer_seq"` Stream uint64 `json:"stream_seq"` }
SequencePair includes the consumer and stream sequence info from a JetStream consumer.
type SignatureHandler ¶
SignatureHandler is used to sign a nonce from the server while authenticating with nkeys. The user should sign the nonce and return the raw signature. The client will base64 encode this to send to the server.
type Statistics ¶
type Statistics struct { InMsgs uint64 OutMsgs uint64 InBytes uint64 OutBytes uint64 Reconnects uint64 }
Tracks various stats received and sent on this connection, including counts for messages and bytes.
type StorageType ¶
type StorageType int
StorageType determines how messages are stored for retention.
const ( // FileStorage specifies on disk storage. It's the default. FileStorage StorageType = iota // MemoryStorage specifies in memory only. MemoryStorage )
func (StorageType) MarshalJSON ¶
func (st StorageType) MarshalJSON() ([]byte, error)
func (StorageType) String ¶
func (st StorageType) String() string
func (*StorageType) UnmarshalJSON ¶
func (st *StorageType) UnmarshalJSON(data []byte) error
type StoreCompression ¶
type StoreCompression uint8
const ( NoCompression StoreCompression = iota S2Compression )
func (StoreCompression) MarshalJSON ¶
func (alg StoreCompression) MarshalJSON() ([]byte, error)
func (StoreCompression) String ¶
func (alg StoreCompression) String() string
func (*StoreCompression) UnmarshalJSON ¶
func (alg *StoreCompression) UnmarshalJSON(b []byte) error
type StreamAlternate ¶
type StreamAlternate struct { Name string `json:"name"` Domain string `json:"domain,omitempty"` Cluster string `json:"cluster"` }
StreamAlternate is an alternate stream represented by a mirror.
type StreamConfig ¶
type StreamConfig struct { // Name is the name of the stream. It is required and must be unique // across the JetStream account. // // Name Names cannot contain whitespace, ., *, >, path separators // (forward or backwards slash), and non-printable characters. Name string `json:"name"` // Description is an optional description of the stream. Description string `json:"description,omitempty"` // Subjects is a list of subjects that the stream is listening on. // Wildcards are supported. Subjects cannot be set if the stream is // created as a mirror. Subjects []string `json:"subjects,omitempty"` // Retention defines the message retention policy for the stream. // Defaults to LimitsPolicy. Retention RetentionPolicy `json:"retention"` // MaxConsumers specifies the maximum number of consumers allowed for // the stream. MaxConsumers int `json:"max_consumers"` // MaxMsgs is the maximum number of messages the stream will store. // After reaching the limit, stream adheres to the discard policy. // If not set, server default is -1 (unlimited). MaxMsgs int64 `json:"max_msgs"` // MaxBytes is the maximum total size of messages the stream will store. // After reaching the limit, stream adheres to the discard policy. // If not set, server default is -1 (unlimited). MaxBytes int64 `json:"max_bytes"` // Discard defines the policy for handling messages when the stream // reaches its limits in terms of number of messages or total bytes. Discard DiscardPolicy `json:"discard"` // DiscardNewPerSubject is a flag to enable discarding new messages per // subject when limits are reached. Requires DiscardPolicy to be // DiscardNew and the MaxMsgsPerSubject to be set. DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"` // MaxAge is the maximum age of messages that the stream will retain. MaxAge time.Duration `json:"max_age"` // MaxMsgsPerSubject is the maximum number of messages per subject that // the stream will retain. 
MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"` // MaxMsgSize is the maximum size of any single message in the stream. MaxMsgSize int32 `json:"max_msg_size,omitempty"` // Storage specifies the type of storage backend used for the stream // (file or memory). Storage StorageType `json:"storage"` // Replicas is the number of stream replicas in clustered JetStream. // Defaults to 1, maximum is 5. Replicas int `json:"num_replicas"` // NoAck is a flag to disable acknowledging messages received by this // stream. // // If set to true, publish methods from the JetStream client will not // work as expected, since they rely on acknowledgements. Core NATS // publish methods should be used instead. Note that this will make // message delivery less reliable. NoAck bool `json:"no_ack,omitempty"` // Duplicates is the window within which to track duplicate messages. // If not set, server default is 2 minutes. Duplicates time.Duration `json:"duplicate_window,omitempty"` // Placement is used to declare where the stream should be placed via // tags and/or an explicit cluster name. Placement *Placement `json:"placement,omitempty"` // Mirror defines the configuration for mirroring another stream. Mirror *StreamSource `json:"mirror,omitempty"` // Sources is a list of other streams this stream sources messages from. Sources []*StreamSource `json:"sources,omitempty"` // Sealed streams do not allow messages to be published or deleted via limits or API, // sealed streams can not be unsealed via configuration update. Can only // be set on already created streams via the Update API. Sealed bool `json:"sealed,omitempty"` // DenyDelete restricts the ability to delete messages from a stream via // the API. Defaults to false. DenyDelete bool `json:"deny_delete,omitempty"` // DenyPurge restricts the ability to purge messages from a stream via // the API. Defaults to false. 
DenyPurge bool `json:"deny_purge,omitempty"` // AllowRollup allows the use of the Nats-Rollup header to replace all // contents of a stream, or subject in a stream, with a single new // message. AllowRollup bool `json:"allow_rollup_hdrs,omitempty"` // Compression specifies the message storage compression algorithm. // Defaults to NoCompression. Compression StoreCompression `json:"compression"` // FirstSeq is the initial sequence number of the first message in the // stream. FirstSeq uint64 `json:"first_seq,omitempty"` // SubjectTransform allows applying a transformation to matching // messages' subjects. SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"` // RePublish allows immediate republishing a message to the configured // subject after it's stored. RePublish *RePublish `json:"republish,omitempty"` // AllowDirect enables direct access to individual messages using direct // get API. Defaults to false. AllowDirect bool `json:"allow_direct"` // MirrorDirect enables direct access to individual messages from the // origin stream using direct get API. Defaults to false. MirrorDirect bool `json:"mirror_direct"` // ConsumerLimits defines limits of certain values that consumers can // set, defaults for those who don't set these settings ConsumerLimits StreamConsumerLimits `json:"consumer_limits,omitempty"` // Metadata is a set of application-defined key-value pairs for // associating metadata on the stream. This feature requires nats-server // v2.10.0 or later. Metadata map[string]string `json:"metadata,omitempty"` // Template identifies the template that manages the Stream. Deprecated: // This feature is no longer supported. Template string `json:"template_owner,omitempty"` }
StreamConfig will determine the properties for a stream. There are sensible defaults for most. If no subjects are given the name will be used as the only subject.
type StreamConsumerLimits ¶
type StreamConsumerLimits struct { InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` MaxAckPending int `json:"max_ack_pending,omitempty"` }
StreamConsumerLimits are the limits for a consumer on a stream. These can be overridden on a per consumer basis.
type StreamInfo ¶
type StreamInfo struct { Config StreamConfig `json:"config"` Created time.Time `json:"created"` State StreamState `json:"state"` Cluster *ClusterInfo `json:"cluster,omitempty"` Mirror *StreamSourceInfo `json:"mirror,omitempty"` Sources []*StreamSourceInfo `json:"sources,omitempty"` Alternates []*StreamAlternate `json:"alternates,omitempty"` }
StreamInfo shows config and current state for this stream.
type StreamInfoRequest ¶
type StreamInfoRequest struct { // DeletedDetails when true includes information about deleted messages DeletedDetails bool `json:"deleted_details,omitempty"` // SubjectsFilter when set, returns information on the matched subjects SubjectsFilter string `json:"subjects_filter,omitempty"` // contains filtered or unexported fields }
StreamInfoRequest contains additional options controlling what a stream info request returns.
type StreamPurgeRequest ¶
type StreamPurgeRequest struct { // Purge up to but not including sequence. Sequence uint64 `json:"seq,omitempty"` // Subject to match against messages for the purge command. Subject string `json:"filter,omitempty"` // Number of messages to keep. Keep uint64 `json:"keep,omitempty"` }
StreamPurgeRequest is optional request information to the purge API.
type StreamSource ¶
type StreamSource struct { Name string `json:"name"` OptStartSeq uint64 `json:"opt_start_seq,omitempty"` OptStartTime *time.Time `json:"opt_start_time,omitempty"` FilterSubject string `json:"filter_subject,omitempty"` SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` External *ExternalStream `json:"external,omitempty"` Domain string `json:"-"` }
StreamSource dictates how streams can source from other streams.
type StreamSourceInfo ¶
type StreamSourceInfo struct { Name string `json:"name"` Lag uint64 `json:"lag"` Active time.Duration `json:"active"` External *ExternalStream `json:"external"` Error *APIError `json:"error"` FilterSubject string `json:"filter_subject,omitempty"` SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` }
StreamSourceInfo shows information about an upstream stream source.
type StreamState ¶
type StreamState struct { Msgs uint64 `json:"messages"` Bytes uint64 `json:"bytes"` FirstSeq uint64 `json:"first_seq"` FirstTime time.Time `json:"first_ts"` LastSeq uint64 `json:"last_seq"` LastTime time.Time `json:"last_ts"` Consumers int `json:"consumer_count"` Deleted []uint64 `json:"deleted"` NumDeleted int `json:"num_deleted"` NumSubjects uint64 `json:"num_subjects"` Subjects map[string]uint64 `json:"subjects"` }
StreamState is information about the given stream.
type SubOpt ¶
type SubOpt interface {
// contains filtered or unexported methods
}
SubOpt configures options for subscribing to JetStream consumers.
Example ¶
nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } // Create JetStream context to produce/consumer messages that will be persisted. js, err := nc.JetStream() if err != nil { log.Fatal(err) } // Auto-ack each individual message. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }) // Auto-ack current sequence and all below. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.AckAll()) // Auto-ack each individual message. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.AckExplicit()) // Acks are not required. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.AckNone()) // Manually acknowledge messages. js.Subscribe("foo", func(msg *nats.Msg) { msg.Ack() }, nats.ManualAck()) // Bind to an existing stream. sub, _ := js.SubscribeSync("origin", nats.BindStream("m1")) msg, _ := sub.NextMsg(2 * time.Second) msg.Ack() // Deliver all messages from the beginning. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.DeliverAll()) // Deliver messages starting from the last one. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.DeliverLast()) // Deliver only new messages that arrive after subscription. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.DeliverNew()) // Create durable consumer FOO, if it doesn't exist. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.Durable("FOO")) // Create consumer on Foo with flow control and heartbeats. js.SubscribeSync("foo", // Redeliver after 30s nats.AckWait(30*time.Second), // Redeliver only once nats.MaxDeliver(1), // Activate Flow control algorithm from the server. 
nats.EnableFlowControl(), // Track heartbeats from the server for missed sequences. nats.IdleHeartbeat(500*time.Millisecond), ) // Set the allowable number of outstanding acks. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.MaxAckPending(5)) // Set the number of redeliveries for a message. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.MaxDeliver(5)) // Set the number the max inflight pull requests. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.PullMaxWaiting(5)) // Set the number the max inflight pull requests. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.PullMaxWaiting(5)) // Set the rate limit on a push consumer. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.RateLimit(1024)) // Replay messages at original speed, instead of as fast as possible. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.ReplayOriginal()) // Start delivering messages at a given sequence. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.StartSequence(10)) // Start delivering messages at a given time. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.StartTime(time.Now().Add(-2*time.Hour))) // Start delivering messages with delay based on BackOff array of time durations. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.ManualAck(), nats.MaxDeliver(2), nats.BackOff([]time.Duration{50 * time.Millisecond, 250 * time.Millisecond})) // Set consumer replicas count for a durable while subscribing. 
js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.Durable("FOO"), nats.ConsumerReplicas(1)) // Force memory storage while subscribing. js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.Durable("FOO"), nats.ConsumerMemoryStorage()) // Skip consumer lookup when using explicit consumer name js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.Durable("FOO"), nats.SkipConsumerLookup()) // Use multiple subject filters. js.Subscribe("", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.Durable("FOO"), nats.ConsumerFilterSubjects("foo", "bar"), nats.BindStream("test_stream"))
Output:
func AckAll ¶
func AckAll() SubOpt
With AckAll, acking a sequence number implicitly acks all sequences below it as well.
func BackOff ¶
BackOff is an array of time durations that represent the time to delay based on delivery count.
func Bind ¶
Bind binds a subscription to an existing consumer from a stream without attempting to create. The first argument is the stream name and the second argument will be the consumer name.
func BindStream ¶
BindStream binds a consumer to a stream explicitly based on a name. When a stream name is not specified, the library uses the subscribe subject as a way to find the stream name. It is done by making a request to the server to get the list of stream names that have a filter for this subject. If the returned list contains a single stream, then this stream name will be used, otherwise the `ErrNoMatchingStream` is returned. To avoid the stream lookup, provide the stream name with this function. See also `Bind()`.
func ConsumerFilterSubjects ¶
ConsumerFilterSubjects can be used to set multiple subject filters on the consumer. It has to be used in conjunction with nats.BindStream and with empty 'subject' parameter.
func ConsumerMemoryStorage ¶
func ConsumerMemoryStorage() SubOpt
ConsumerMemoryStorage sets the memory storage to true for a consumer.
func ConsumerName ¶
ConsumerName sets the name for a consumer.
func ConsumerReplicas ¶
ConsumerReplicas sets the number of replicas for a consumer.
func DeliverAll ¶
func DeliverAll() SubOpt
DeliverAll will configure a Consumer to receive all the messages from a Stream.
func DeliverLast ¶
func DeliverLast() SubOpt
DeliverLast configures a Consumer to receive messages starting with the latest one.
func DeliverLastPerSubject ¶
func DeliverLastPerSubject() SubOpt
DeliverLastPerSubject configures a Consumer to receive messages starting with the latest one for each filtered subject.
func DeliverNew ¶
func DeliverNew() SubOpt
DeliverNew configures a Consumer to receive messages published after the subscription.
func DeliverSubject ¶
DeliverSubject specifies the JetStream consumer deliver subject.
This option is used only in situations where the consumer does not exist and a creation request is sent to the server. If not provided, an inbox will be selected. If a consumer exists, then the NATS subscription will be created on the JetStream consumer's DeliverSubject, not necessarily this subject.
func Description ¶
Description will set the description for the created consumer.
func Durable ¶
Durable defines the consumer name for JetStream durable subscribers. This function will return ErrInvalidConsumerName if the name contains any dot ".".
func EnableFlowControl ¶
func EnableFlowControl() SubOpt
EnableFlowControl enables flow control for a push based consumer.
func HeadersOnly ¶
func HeadersOnly() SubOpt
HeadersOnly() will instruct the consumer to only deliver headers and no payloads.
func IdleHeartbeat ¶
IdleHeartbeat enables push based consumers to have idle heartbeats delivered. For pull consumers, idle heartbeat has to be set on each [Fetch] call.
func InactiveThreshold ¶
InactiveThreshold indicates how long the server should keep a consumer after detecting a lack of activity. In NATS Server 2.8.4 and earlier, this option only applies to ephemeral consumers. In NATS Server 2.9.0 and later, this option applies to both ephemeral and durable consumers, allowing durable consumers to also be deleted automatically after the inactivity threshold has passed.
func ManualAck ¶
func ManualAck() SubOpt
ManualAck disables auto ack functionality for async subscriptions.
func MaxAckPending ¶
MaxAckPending sets the number of outstanding acks that are allowed before message delivery is halted.
func MaxDeliver ¶
MaxDeliver sets the number of redeliveries for a message.
func MaxRequestBatch ¶
MaxRequestBatch sets the maximum pull consumer batch size that a Fetch() can request.
func MaxRequestExpires ¶
MaxRequestExpires sets the maximum pull consumer request expiration that a Fetch() can request (using the Fetch's timeout value).
func MaxRequestMaxBytes ¶
MaxRequestMaxBytes sets the maximum pull consumer request bytes that a Fetch() can receive.
func OrderedConsumer ¶
func OrderedConsumer() SubOpt
OrderedConsumer will create a FIFO direct/ephemeral consumer for in order delivery of messages. There are no redeliveries and no acks, and flow control and heartbeats will be added but will be taken care of without additional client code.
func PullMaxWaiting ¶
PullMaxWaiting defines the max inflight pull requests.
func ReplayInstant ¶
func ReplayInstant() SubOpt
ReplayInstant replays the messages as fast as possible.
func ReplayOriginal ¶
func ReplayOriginal() SubOpt
ReplayOriginal replays the messages at the original speed.
func SkipConsumerLookup ¶
func SkipConsumerLookup() SubOpt
SkipConsumerLookup will omit looking up consumer when Bind, Durable or ConsumerName are provided.
NOTE: This setting may cause an existing consumer to be overwritten. Also, because consumer lookup is skipped, all consumer options like AckPolicy, DeliverSubject etc. need to be provided even if consumer already exists.
func StartSequence ¶
StartSequence configures a Consumer to receive messages from a start sequence.
type SubjectTransformConfig ¶
type SubjectTransformConfig struct { Source string `json:"src,omitempty"` Destination string `json:"dest"` }
SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received.
type Subscription ¶
type Subscription struct { // Subject that represents this subscription. This can be different // than the received subject inside a Msg if this is a wildcard. Subject string // Optional queue group name. If present, all subscriptions with the // same name will form a distributed queue, and each message will // only be processed by one member of the group. Queue string // contains filtered or unexported fields }
Subscription represents interest in a given subject.
func (*Subscription) AutoUnsubscribe ¶
func (s *Subscription) AutoUnsubscribe(max int) error
AutoUnsubscribe will issue an automatic Unsubscribe that is processed by the server when max messages have been received. This can be useful when sending a request to an unknown number of subscribers.
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() received, wanted, total := 0, 10, 100 sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) { received++ }) sub.AutoUnsubscribe(wanted) for i := 0; i < total; i++ { nc.Publish("foo", []byte("Hello")) } nc.Flush() fmt.Printf("Received = %d", received)
Output:
func (*Subscription) ClearMaxPending ¶
func (s *Subscription) ClearMaxPending() error
ClearMaxPending resets the maximums seen so far.
func (*Subscription) ConsumerInfo ¶
func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error)
func (*Subscription) Delivered ¶
func (s *Subscription) Delivered() (int64, error)
Delivered returns the number of delivered messages for this subscription.
func (*Subscription) Drain ¶
func (s *Subscription) Drain() error
Drain will remove interest but continue callbacks until all messages have been processed.
For a JetStream subscription, if the library has created the JetStream consumer, the library will send a DeleteConsumer request to the server when the Drain operation completes. If a failure occurs when deleting the JetStream consumer, an error will be reported to the asynchronous error callback. If you do not wish the JetStream consumer to be automatically deleted, ensure that the consumer is not created by the library, which means create the consumer with AddConsumer and bind to this consumer.
func (*Subscription) Dropped ¶
func (s *Subscription) Dropped() (int, error)
Dropped returns the number of known dropped messages for this subscription. This will correspond to messages dropped by violations of PendingLimits. If the server declares the connection a SlowConsumer, this number may not be valid.
func (*Subscription) Fetch ¶
func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error)
Fetch pulls a batch of messages from a stream for a pull consumer.
func (*Subscription) FetchBatch ¶
func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, error)
FetchBatch pulls a batch of messages from a stream for a pull consumer. Unlike Subscription.Fetch, it is non-blocking and returns a MessageBatch, allowing incoming messages to be retrieved from a channel. The returned channel is always closed after all messages for a batch have been delivered by the server, so it is safe to iterate over it using range.
To avoid using default JetStream timeout as fetch expiry time, use nats.MaxWait or nats.Context (with deadline set).
This method will not return an error in case of pull request expiry (even if there are no messages). Any other error encountered when receiving messages will cause FetchBatch to stop receiving new messages.
func (*Subscription) InitialConsumerPending ¶
func (sub *Subscription) InitialConsumerPending() (uint64, error)
InitialConsumerPending returns the number of messages pending to be delivered to the consumer when the subscription was created.
func (*Subscription) IsDraining ¶
func (s *Subscription) IsDraining() bool
IsDraining returns a boolean indicating whether the subscription is being drained. This will return false if the subscription has already been closed.
func (*Subscription) IsValid ¶
func (s *Subscription) IsValid() bool
IsValid returns a boolean indicating whether the subscription is still active. This will return false if the subscription has already been closed.
func (*Subscription) MaxPending ¶
func (s *Subscription) MaxPending() (int, int, error)
MaxPending returns the maximum number of queued messages and queued bytes seen so far.
func (*Subscription) NextMsg ¶
func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error)
NextMsg will return the next message available to a synchronous subscriber or block until one is available. An error is returned if the subscription is invalid (ErrBadSubscription), the connection is closed (ErrConnectionClosed), the timeout is reached (ErrTimeout), or if there were no responders (ErrNoResponders) when used in the context of a request/reply.
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() sub, _ := nc.SubscribeSync("foo") m, err := sub.NextMsg(1 * time.Second) if err == nil { fmt.Printf("Received a message: %s\n", string(m.Data)) } else { fmt.Println("NextMsg timed out.") }
Output:
func (*Subscription) NextMsgWithContext ¶
func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error)
NextMsgWithContext takes a context and returns the next message available to a synchronous subscriber, blocking until it is delivered or context gets canceled.
func (*Subscription) Pending ¶
func (s *Subscription) Pending() (int, int, error)
Pending returns the number of queued messages and queued bytes in the client for this subscription.
func (*Subscription) PendingLimits ¶
func (s *Subscription) PendingLimits() (int, int, error)
PendingLimits returns the current limits for this subscription. If no error is returned, a negative value indicates that the given metric is not limited.
func (*Subscription) QueuedMsgs
deprecated
func (s *Subscription) QueuedMsgs() (int, error)
QueuedMsgs returns the number of queued messages in the client for this subscription.
Deprecated: Use Pending()
func (*Subscription) SetClosedHandler ¶
func (s *Subscription) SetClosedHandler(handler func(subject string))
SetClosedHandler will set the closed handler for when a subscription is closed (either unsubscribed or drained).
func (*Subscription) SetPendingLimits ¶
func (s *Subscription) SetPendingLimits(msgLimit, bytesLimit int) error
SetPendingLimits sets the limits for pending msgs and bytes for this subscription. Zero is not allowed. Any negative value means that the given metric is not limited.
func (*Subscription) StatusChanged ¶
func (s *Subscription) StatusChanged(statuses ...SubStatus) <-chan SubStatus
StatusChanged returns a channel on which given list of subscription status changes will be sent. If no status is provided, all status changes will be sent. Available statuses are SubscriptionActive, SubscriptionDraining, SubscriptionClosed, and SubscriptionSlowConsumer. The returned channel will be closed when the subscription is closed.
func (*Subscription) Type ¶
func (s *Subscription) Type() SubscriptionType
Type returns the type of Subscription.
func (*Subscription) Unsubscribe ¶
func (s *Subscription) Unsubscribe() error
Unsubscribe will remove interest in the given subject.
For a JetStream subscription, if the library has created the JetStream consumer, it will send a DeleteConsumer request to the server (if the unsubscribe itself was successful). If the delete operation fails, the error will be returned. If you do not wish the JetStream consumer to be automatically deleted, ensure that the consumer is not created by the library, which means create the consumer with AddConsumer and bind to this consumer (using the nats.Bind() option).
Example ¶
nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() sub, _ := nc.SubscribeSync("foo") // ... sub.Unsubscribe()
Output:
type TLSCertHandler ¶
type TLSCertHandler func() (tls.Certificate, error)
TLSCertHandler is used to fetch and return tls certificate.
type UserInfoCB ¶
UserInfoCB is used to pass the username and password when establishing connection.
type UserJWTHandler ¶
UserJWTHandler is used to fetch and return the account signed JWT for this user.
type WatchOpt ¶
type WatchOpt interface {
// contains filtered or unexported methods
}
func IgnoreDeletes ¶
func IgnoreDeletes() WatchOpt
IgnoreDeletes will have the key watcher not pass any deleted keys.
func IncludeHistory ¶
func IncludeHistory() WatchOpt
IncludeHistory instructs the key watcher to include historical values as well.
func MetaOnly ¶
func MetaOnly() WatchOpt
MetaOnly instructs the key watcher to retrieve only the entry metadata, not the entry value.
func UpdatesOnly ¶
func UpdatesOnly() WatchOpt
UpdatesOnly instructs the key watcher to only include updates on values (without latest values when started).