Documentation ¶
Overview ¶
Example (Handler) ¶
tele := tel.NewNull()
ctx := tele.Ctx()

conn, _ := nats.Connect("example.com")
nConn := New(conn, WithTel(tele))

// legacy backport
cbLegacy := func(ctx context.Context, sub string, data []byte) ([]byte, error) {
	return nil, nil
}

cb := func(ctx context.Context, msg *nats.Msg) error {
	return nil
}

_, _ = nConn.QueueSubscribeMW("sub", "queue", cbLegacy)
_, _ = nConn.QueueSubscribeMW("sub2", "queue", cbLegacy)

// sub
nConn.Subscribe("sub", cb)
nConn.QueueSubscribe("sub", "xxx", cb)

// pub
nConn.PublishWithContext(ctx, "sub", []byte("HELLO"))
nConn.PublishMsgWithContext(ctx, &nats.Msg{})
nConn.PublishRequestWithContext(ctx, "sub", "reply", []byte("HELLO"))
nConn.RequestWithContext(ctx, "sub", []byte("HELLO"))
nConn.RequestMsgWithContext(ctx, &nats.Msg{})
Output:
Index ¶
- Constants
- func SemVersion() string
- func Version() string
- type CommonPublish
- func (c *CommonPublish) PublishMsgWithContext(ctx context.Context, msg *nats.Msg) error
- func (c *CommonPublish) PublishRequestWithContext(_ context.Context, subj, reply string, data []byte) error
- func (c *CommonPublish) PublishWithContext(ctx context.Context, subj string, data []byte) error
- func (c *CommonPublish) RequestMsgWithContext(ctx context.Context, msg *nats.Msg) (*nats.Msg, error)
- func (c *CommonPublish) RequestWithContext(ctx context.Context, subj string, data []byte) (*nats.Msg, error)
- type ConnContext
- func (c *ConnContext) BuildWrappedHandler(cb MsgHandler) nats.MsgHandler
- func (c *ConnContext) Conn() *nats.Conn
- func (c ConnContext) DefaultMiddleware() []Middleware
- func (c ConnContext) DefaultPubMiddleware() []PubMiddleware
- func (c *ConnContext) JetStream(opts ...nats.JSOpt) (*JetStreamContext, error)
- func (c ConnContext) Middleware() []Middleware
- func (c ConnContext) PubMiddleware() []PubMiddleware
- func (c *ConnContext) QueueSubscribe(subj, queue string, cb MsgHandler) (*nats.Subscription, error)
- func (c *ConnContext) QueueSubscribeMW(subj, queue string, next PostFn) (*nats.Subscription, error)
- func (c *ConnContext) Subscribe(subj string, cb MsgHandler) (*nats.Subscription, error)
- func (c *ConnContext) SubscribeMW(subj string, cb PostFn) (*nats.Subscription, error)
- type Core
- type JetStreamContext
- func (c JetStreamContext) DefaultMiddleware() []Middleware
- func (c JetStreamContext) DefaultPubMiddleware() []PubMiddleware
- func (j *JetStreamContext) JS() nats.JetStreamContext
- func (c JetStreamContext) Middleware() []Middleware
- func (c JetStreamContext) PubMiddleware() []PubMiddleware
- func (j *JetStreamContext) PullSubscribe(subj, durable string, opts ...nats.SubOpt) (*nats.Subscription, error)
- func (j *JetStreamContext) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
- func (j *JetStreamContext) Subscribe(subj string, cb MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
- type Logs
- type Middleware
- type MsgHandler
- type NameFn
- type Option
- type PostFn
- type PostHook
- type PubMetric
- func (p *PubMetric) PublishMsgWithContext(ctx context.Context, msg *nats.Msg) (err error)
- func (p *PubMetric) PublishRequestWithContext(ctx context.Context, subj, reply string, data []byte) (err error)
- func (p *PubMetric) PublishWithContext(ctx context.Context, subj string, data []byte) (err error)
- func (p *PubMetric) RequestMsgWithContext(ctx context.Context, msg *nats.Msg) (resp *nats.Msg, err error)
- func (p *PubMetric) RequestWithContext(ctx context.Context, subj string, data []byte) (resp *nats.Msg, err error)
- type PubMiddleware
- type PubTrace
- func (p *PubTrace) PublishMsgWithContext(cxt context.Context, msg *nats.Msg) (err error)
- func (p *PubTrace) PublishRequestWithContext(ctx context.Context, subj, reply string, data []byte) error
- func (p *PubTrace) PublishWithContext(cxt context.Context, subj string, data []byte) (err error)
- func (p *PubTrace) RequestMsgWithContext(cxt context.Context, msg *nats.Msg) (resp *nats.Msg, err error)
- func (p *PubTrace) RequestWithContext(ctx context.Context, subj string, data []byte) (*nats.Msg, error)
- type Publish
- type Recovery
- type SubMetrics
- type SubscriptionStatMetric
- type Tracer
Examples ¶
Constants ¶
const ( Subject = attribute.Key("subject") IsError = attribute.Key("error") Kind = attribute.Key("kind_of") )
Attribute keys that can be added to a span.
const ( KindSub = "sub" KindPub = "pub" KindRequest = "request" KindRespond = "respond" )
const (
	Count         = "nats.count"          // Incoming request count total
	ContentLength = "nats.content_length" // Incoming request bytes total
	Latency       = "nats.duration"       // Incoming end to end duration, microseconds

	SubscriptionsPendingCount = "nats.subscriptions.pending.msgs"
	SubscriptionsPendingBytes = "nats.subscriptions.pending.bytes"
	SubscriptionsDroppedMsgs  = "nats.subscriptions.dropped.count"
	SubscriptionCountMsgs     = "nats.subscriptions.send.count"
)
Server NATS metrics
Variables ¶
This section is empty.
Functions ¶
func SemVersion ¶
func SemVersion() string
SemVersion is the semantic version to be supplied to tracer/meter creation.
Types ¶
type CommonPublish ¶ added in v1.3.0
type CommonPublish struct {
*nats.Conn
}
func NewCommonPublish ¶ added in v1.3.0
func NewCommonPublish(conn *nats.Conn) *CommonPublish
func (*CommonPublish) PublishMsgWithContext ¶ added in v1.3.0
func (c *CommonPublish) PublishMsgWithContext(ctx context.Context, msg *nats.Msg) error
PublishMsgWithContext publishes the Msg structure, which includes the Subject, an optional Reply and an optional Data field.
func (*CommonPublish) PublishRequestWithContext ¶ added in v1.3.0
func (c *CommonPublish) PublishRequestWithContext(_ context.Context, subj, reply string, data []byte) error
PublishRequestWithContext will perform a Publish() expecting a response on the reply subject. Use Request() for automatically waiting for a response inline.
func (*CommonPublish) PublishWithContext ¶ added in v1.3.0
PublishWithContext publishes the data argument to the given subject. The data argument is left untouched and needs to be correctly interpreted on the receiver.
func (*CommonPublish) RequestMsgWithContext ¶ added in v1.3.0
func (c *CommonPublish) RequestMsgWithContext(ctx context.Context, msg *nats.Msg) (*nats.Msg, error)
RequestMsgWithContext takes a context and a Msg, and performs a request expecting a single response.
func (*CommonPublish) RequestWithContext ¶ added in v1.3.0
func (c *CommonPublish) RequestWithContext(ctx context.Context, subj string, data []byte) (*nats.Msg, error)
RequestWithContext will send a request payload and deliver the response message, or an error, including a timeout if no message was received properly.
type ConnContext ¶ added in v1.3.0
ConnContext is a wrapper for nats.ConnContext, aka the middleware (mw) connection approach.
Features: Expose subscription stats via function overwrite
func New ¶
func New(conn *nats.Conn, opts ...Option) *ConnContext
New wraps nats Core connection with middleware functionality
func (*ConnContext) BuildWrappedHandler ¶ added in v1.3.0
func (c *ConnContext) BuildWrappedHandler(cb MsgHandler) nats.MsgHandler
BuildWrappedHandler allows you to create your own mw, for example for batch processing.
func (*ConnContext) Conn ¶ added in v1.3.0
func (c *ConnContext) Conn() *nats.Conn
Conn unwrap connection
func (ConnContext) DefaultMiddleware ¶ added in v1.3.0
func (c ConnContext) DefaultMiddleware() []Middleware
func (ConnContext) DefaultPubMiddleware ¶ added in v1.3.0
func (c ConnContext) DefaultPubMiddleware() []PubMiddleware
func (*ConnContext) JetStream ¶ added in v1.3.0
func (c *ConnContext) JetStream(opts ...nats.JSOpt) (*JetStreamContext, error)
JetStream returns a JetStreamContext wrapper for consumer
func (ConnContext) Middleware ¶ added in v1.3.0
func (c ConnContext) Middleware() []Middleware
func (ConnContext) PubMiddleware ¶ added in v1.3.0
func (c ConnContext) PubMiddleware() []PubMiddleware
func (*ConnContext) QueueSubscribe ¶ added in v1.3.0
func (c *ConnContext) QueueSubscribe(subj, queue string, cb MsgHandler) (*nats.Subscription, error)
QueueSubscribe creates an asynchronous queue subscriber on the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message asynchronously.
func (*ConnContext) QueueSubscribeMW ¶ added in v1.3.0
func (c *ConnContext) QueueSubscribeMW(subj, queue string, next PostFn) (*nats.Subscription, error)
QueueSubscribeMW is the legacy queue subscription variant that takes a mw callback function (PostFn).
Deprecated: kept only for backward compatibility with the legacy PostFn.
func (*ConnContext) Subscribe ¶ added in v1.3.0
func (c *ConnContext) Subscribe(subj string, cb MsgHandler) (*nats.Subscription, error)
Subscribe will express interest in the given subject. The subject can have wildcards. There are two types of wildcards: * for partial, and > for full. A subscription on subject time.*.east would receive messages sent to time.us.east and time.eu.east. A subscription on subject time.us.> would receive messages sent to time.us.east and time.us.east.atlanta, while time.us.* would only match time.us.east since it can't match more than one token. Messages will be delivered to the associated MsgHandler.
func (*ConnContext) SubscribeMW ¶ added in v1.3.0
func (c *ConnContext) SubscribeMW(subj string, cb PostFn) (*nats.Subscription, error)
SubscribeMW is a backward-compatible function for the previous mw approach.
Deprecated: kept only for backward compatibility with the legacy PostFn.
type Core ¶ added in v1.3.0
type Core struct {
// contains filtered or unexported fields
}
Core features for context
func (Core) DefaultMiddleware ¶ added in v1.3.0
func (c Core) DefaultMiddleware() []Middleware
func (Core) DefaultPubMiddleware ¶ added in v1.3.0
func (c Core) DefaultPubMiddleware() []PubMiddleware
func (Core) Middleware ¶ added in v1.3.0
func (c Core) Middleware() []Middleware
func (Core) PubMiddleware ¶ added in v1.3.0
func (c Core) PubMiddleware() []PubMiddleware
type JetStreamContext ¶ added in v1.3.0
type JetStreamContext struct { *Core // contains filtered or unexported fields }
func (JetStreamContext) DefaultMiddleware ¶ added in v1.3.0
func (c JetStreamContext) DefaultMiddleware() []Middleware
func (JetStreamContext) DefaultPubMiddleware ¶ added in v1.3.0
func (c JetStreamContext) DefaultPubMiddleware() []PubMiddleware
func (*JetStreamContext) JS ¶ added in v1.3.0
func (j *JetStreamContext) JS() nats.JetStreamContext
JS unwrap
func (JetStreamContext) Middleware ¶ added in v1.3.0
func (c JetStreamContext) Middleware() []Middleware
func (JetStreamContext) PubMiddleware ¶ added in v1.3.0
func (c JetStreamContext) PubMiddleware() []PubMiddleware
func (*JetStreamContext) PullSubscribe ¶ added in v1.3.0
func (j *JetStreamContext) PullSubscribe(subj, durable string, opts ...nats.SubOpt) (*nats.Subscription, error)
PullSubscribe creates a Subscription that can fetch messages. See important note in Subscribe(). Additionally, for an ephemeral pull consumer, the "durable" value must be set to an empty string.
This only wraps the Subscription in order to gather stats.
func (*JetStreamContext) QueueSubscribe ¶ added in v1.3.0
func (j *JetStreamContext) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
QueueSubscribe creates a Subscription with a queue group. If no optional durable name nor binding options are specified, the queue name will be used as a durable name. See important note in Subscribe()
func (*JetStreamContext) Subscribe ¶ added in v1.3.0
func (j *JetStreamContext) Subscribe(subj string, cb MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
Subscribe creates an async Subscription for JetStream. The stream and consumer names can be provided with the nats.Bind() option. For creating an ephemeral (where the consumer name is picked by the server), you can provide the stream name with nats.BindStream(). If no stream name is specified, the library will attempt to figure out which stream the subscription is for. See important notes below for more details.
IMPORTANT NOTES: * If none of the options Bind() nor Durable() are specified, the library will send a request to the server to create an ephemeral JetStream consumer, which will be deleted after an Unsubscribe() or Drain(), or automatically by the server after a short period of time after the NATS subscription is gone. * If Durable() option is specified, the library will attempt to lookup a JetStream consumer with this name, and if found, will bind to it and not attempt to delete it. However, if not found, the library will send a request to create such durable JetStream consumer. Note that the library will delete the JetStream consumer after an Unsubscribe() or Drain() only if it created the durable consumer while subscribing. If the durable consumer already existed prior to subscribing it won't be deleted. * If Bind() option is provided, the library will attempt to lookup the consumer with the given name, and if successful, bind to it. If the lookup fails, then the Subscribe() call will return an error.
type Logs ¶ added in v1.3.0
type Logs struct {
// contains filtered or unexported fields
}
Logs dump some payload
func (Logs) DefaultMiddleware ¶ added in v1.3.0
func (c Logs) DefaultMiddleware() []Middleware
func (Logs) DefaultPubMiddleware ¶ added in v1.3.0
func (c Logs) DefaultPubMiddleware() []PubMiddleware
func (Logs) Middleware ¶ added in v1.3.0
func (c Logs) Middleware() []Middleware
func (Logs) PubMiddleware ¶ added in v1.3.0
func (c Logs) PubMiddleware() []PubMiddleware
type Middleware ¶ added in v1.3.0
type Middleware interface {
// contains filtered or unexported methods
}
type MsgHandler ¶ added in v1.3.0
MsgHandler is our desired way to handle subscriptions. The ctx allows the function to continue traces or pass log attachments. The returned error allows middleware to understand what happened inside the handler, so that it can behave differently depending on the outcome.
type NameFn ¶ added in v1.3.0
type NameFn func(msg *nats.Msg) string
NameFn operation name description
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option allows configuration of this package's wrappers; options are passed to constructors such as New and NewSubscriptionStatMetrics.
func WithDumpPayloadOnError ¶ added in v1.2.3
WithDumpPayloadOnError write dump request and response on faults
Default: true
func WithDumpRequest ¶ added in v1.2.3
WithDumpRequest dumps the request as plain text to the log and trace. I guess we can go further and add an option for encoding requests.
func WithDumpResponse ¶ added in v1.2.3
WithDumpResponse dump response as plain text to log and trace
func WithNameFunction ¶ added in v1.3.0
func WithPostHook ¶
WithPostHook sets a (single) hook where you can perform a post-handle operation with the data provided by the handler.
Deprecated: legacy usage only.
type PostFn ¶
PostFn is a callback function which gets a new instance of tele inside ctx, along with the message subject and data.
Deprecated: legacy function, but we still use it via the conn wrapper (QueueSubscribeMW or SubscribeMW) just for backward compatibility.
type PubMetric ¶ added in v1.3.0
type PubMetric struct {
// contains filtered or unexported fields
}
PubMetric handle publish and request metrics gathering implementing PubMiddleware
func NewPubMetric ¶ added in v1.3.0
func NewPubMetric(m *metrics) *PubMetric
func (*PubMetric) PublishMsgWithContext ¶ added in v1.3.0
func (*PubMetric) PublishRequestWithContext ¶ added in v1.3.0
func (*PubMetric) PublishWithContext ¶ added in v1.3.0
func (*PubMetric) RequestMsgWithContext ¶ added in v1.3.0
type PubMiddleware ¶ added in v1.3.0
type PubMiddleware interface { Publish // contains filtered or unexported methods }
type PubTrace ¶ added in v1.3.0
type PubTrace struct {
// contains filtered or unexported fields
}
PubTrace handle trace handling implementing PubMiddleware
func NewPubTrace ¶ added in v1.3.0
func (*PubTrace) PublishMsgWithContext ¶ added in v1.3.0
func (*PubTrace) PublishRequestWithContext ¶ added in v1.3.0
func (p *PubTrace) PublishRequestWithContext(ctx context.Context, subj, reply string, data []byte) error
PublishRequestWithContext we just use PublishMsgWithContext to handle publish with reply option
func (*PubTrace) PublishWithContext ¶ added in v1.3.0
func (*PubTrace) RequestMsgWithContext ¶ added in v1.3.0
type Publish ¶ added in v1.3.0
type Publish interface { PublishWithContext(ctx context.Context, subj string, data []byte) error PublishMsgWithContext(ctx context.Context, msg *nats.Msg) error PublishRequestWithContext(ctx context.Context, subj, reply string, data []byte) error RequestWithContext(ctx context.Context, subj string, data []byte) (*nats.Msg, error) RequestMsgWithContext(ctx context.Context, msg *nats.Msg) (*nats.Msg, error) }
type Recovery ¶ added in v1.3.0
type Recovery struct{}
func NewRecovery ¶ added in v1.3.0
func NewRecovery() *Recovery
type SubMetrics ¶ added in v1.3.0
type SubMetrics struct {
// contains filtered or unexported fields
}
SubMetrics implements the Middleware interface.
func NewMetrics ¶ added in v1.3.0
func NewMetrics(m *metrics) *SubMetrics
type SubscriptionStatMetric ¶ added in v1.3.0
type SubscriptionStatMetric struct {
// contains filtered or unexported fields
}
SubscriptionStatMetric is a hook that provides important subscription statistics.
func NewSubscriptionStatMetrics ¶ added in v1.3.0
func NewSubscriptionStatMetrics(opts ...Option) (*SubscriptionStatMetric, error)
func (*SubscriptionStatMetric) Hook ¶ added in v1.3.0
func (s *SubscriptionStatMetric) Hook(sub *nats.Subscription, err error) (*nats.Subscription, error)
func (*SubscriptionStatMetric) Register ¶ added in v1.3.0
func (s *SubscriptionStatMetric) Register(sub ...*nats.Subscription)