Documentation
¶
Overview ¶
Example (Handler) ¶
tele := tel.NewNull() ctx := tele.Ctx() conn, _ := nats.Connect("example.com") nConn := New(WithTel(tele)).Use(conn) // legacy backport cbLegacy := func(ctx context.Context, sub string, data []byte) ([]byte, error) { return nil, nil } cb := func(ctx context.Context, msg *nats.Msg) error { return nil } _, _ = nConn.QueueSubscribeMW("sub", "queue", cbLegacy) _, _ = nConn.QueueSubscribeMW("sub2", "queue", cbLegacy) // sub _, _ = nConn.Subscribe("sub", cb) _, _ = nConn.QueueSubscribe("sub", "xxx", cb) // sync sub with wrap ourHandler := nConn.BuildWrappedHandler(func(ctx context.Context, msg *nats.Msg) error { // perform respond with possible error returning return msg.Respond([]byte("HELLO")) }) ch := make(chan *nats.Msg) _, _ = nConn.QueueSubscribeSyncWithChan("sub", "queue", ch) for msg := range ch { ourHandler(msg) } // pub _ = nConn.PublishWithContext(ctx, "sub", []byte("HELLO")) _ = nConn.PublishMsgWithContext(ctx, &nats.Msg{}) _ = nConn.PublishRequestWithContext(ctx, "sub", "reply", []byte("HELLO")) _, _ = nConn.RequestWithContext(ctx, "sub", []byte("HELLO")) _, _ = nConn.RequestMsgWithContext(ctx, &nats.Msg{})
Output:
Index ¶
- Constants
- Variables
- func ExtractAttributes(msg *nats.Msg, kind string, additional bool) []attribute.KeyValue
- func ReplyFn(ctx context.Context, msg *nats.Msg, data []byte) error
- func SemVersion() string
- func Version() string
- func WrapKindOfContext(ctx context.Context, kindOf string) context.Context
- type CommonPublish
- func (c *CommonPublish) PublishMsgWithContext(ctx context.Context, msg *nats.Msg) error
- func (c *CommonPublish) PublishRequestWithContext(ctx context.Context, subj, reply string, data []byte) error
- func (c *CommonPublish) PublishWithContext(ctx context.Context, subj string, data []byte) error
- func (c *CommonPublish) RequestMsgWithContext(ccx context.Context, m *nats.Msg) (res *nats.Msg, err error)
- func (c *CommonPublish) RequestWithContext(ctx context.Context, subj string, data []byte) (*nats.Msg, error)
- type CommonSubscribe
- func (c *CommonSubscribe) BuildWrappedHandler(next MsgHandler) nats.MsgHandler
- func (c CommonSubscribe) DefaultMiddleware() []Middleware
- func (c *CommonSubscribe) QueueSubscribe(subj, queue string, cb MsgHandler) (*nats.Subscription, error)
- func (c *CommonSubscribe) QueueSubscribeMW(subj, queue string, next PostFn) (*nats.Subscription, error)
- func (c *CommonSubscribe) QueueSubscribeSyncWithChan(subj, queue string, ch chan *nats.Msg) (*nats.Subscription, error)
- func (c *CommonSubscribe) Subscribe(subj string, cb MsgHandler) (*nats.Subscription, error)
- func (c *CommonSubscribe) SubscribeMW(subj string, cb PostFn) (*nats.Subscription, error)
- type ConnContext
- type Core
- type Interceptor
- type JetStreamContext
- func (c JetStreamContext) DefaultMiddleware() []Middleware
- func (j *JetStreamContext) JS() nats.JetStreamContext
- func (j *JetStreamContext) PullSubscribe(subj, durable string, opts ...nats.SubOpt) (*nats.Subscription, error)
- func (j *JetStreamContext) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
- func (j *JetStreamContext) Subscribe(subj string, cb MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
- type Logs
- type Middleware
- type MsgHandler
- type NameFn
- type Option
- func WithDisableDefaultMiddleware() Option
- func WithDump(enable bool) Option
- func WithDumpPayloadOnError(enable bool) Option
- func WithNameFunction(fn NameFn) Option
- func WithPostHook(cb PostHook) Option
- func WithPubMiddleware(list ...Middleware) Option
- func WithReply(inject bool) Option
- func WithSubMiddleware(list ...Middleware) Option
- func WithTel(t tel.Telemetry) Option
- type PostFn
- type PostHook
- type Publish
- type Recovery
- type SubMetrics
- type Subscriber
- type SubscriptionStatMetric
- type Tracer
Examples ¶
Constants ¶
const ( KindKey = "kind_of" PayloadKey = "payload" )
const ( Subject = attribute.Key("subject") Reply = attribute.Key("reply") IsError = attribute.Key("error") Kind = attribute.Key(KindKey) Duration = attribute.Key("duration") )
Attribute keys that can be added to a span.
const ( KindUnk = "UNK" KindSub = "SUB" KindPub = "PUB" KindRequest = "REQUEST" KindRespond = "RESPOND" KindReply = "REPLY" )
const ( Count = "nats.count" // Incoming request count total ContentLength = "nats.content_length" // Incoming request bytes total Latency = "nats.duration" // Incoming end to end duration, microseconds SubscriptionsPendingCount = "nats.subscriptions.pending.msgs" SubscriptionsPendingBytes = "nats.subscriptions.pending.bytes" SubscriptionsDroppedMsgs = "nats.subscriptions.dropped.count" SubscriptionCountMsgs = "nats.subscriptions.send.count" )
Server NATS metrics
Variables ¶
var ErrMultipleMiddleWare = errors.New("not allow create multiple instances")
Functions ¶
func ExtractAttributes ¶ added in v2.0.2
ExtractAttributes ... @additional - handle business cases
func ReplyFn ¶ added in v2.0.3
ReplyFn is a reply helper that sends a reply wrapped with trace information.
func SemVersion ¶
func SemVersion() string
SemVersion is the semantic version to be supplied to tracer/meter creation.
Types ¶
type CommonPublish ¶
type CommonPublish struct { Conn natsPublisher // contains filtered or unexported fields }
func NewCommonPublish ¶
func NewCommonPublish(conn *nats.Conn, interceptor Interceptor) *CommonPublish
NewCommonPublish creates an instance of the wrapper publisher.
func (*CommonPublish) PublishMsgWithContext ¶
func (c *CommonPublish) PublishMsgWithContext(ctx context.Context, msg *nats.Msg) error
PublishMsgWithContext publishes the Msg structure, which includes the Subject, an optional Reply and an optional Data field.
func (*CommonPublish) PublishRequestWithContext ¶
func (c *CommonPublish) PublishRequestWithContext(ctx context.Context, subj, reply string, data []byte) error
PublishRequestWithContext will perform a Publish() expecting a response on the reply subject. Use Request() for automatically waiting for a response inline.
func (*CommonPublish) PublishWithContext ¶
PublishWithContext publishes the data argument to the given subject. The data argument is left untouched and needs to be correctly interpreted on the receiver.
func (*CommonPublish) RequestMsgWithContext ¶
func (c *CommonPublish) RequestMsgWithContext(ccx context.Context, m *nats.Msg) (res *nats.Msg, err error)
RequestMsgWithContext takes a context and a message (carrying the subject and payload) and sends a request expecting a single response.
func (*CommonPublish) RequestWithContext ¶
func (c *CommonPublish) RequestWithContext(ctx context.Context, subj string, data []byte) (*nats.Msg, error)
RequestWithContext will send a request payload and deliver the response message, or an error, including a timeout if no message was received properly.
type CommonSubscribe ¶ added in v2.0.2
type CommonSubscribe struct { *Core // contains filtered or unexported fields }
func NewCommonSubscriber ¶ added in v2.0.2
func NewCommonSubscriber(conn *nats.Conn, core *Core) *CommonSubscribe
func (*CommonSubscribe) BuildWrappedHandler ¶ added in v2.0.2
func (c *CommonSubscribe) BuildWrappedHandler(next MsgHandler) nats.MsgHandler
BuildWrappedHandler allows creating your own middleware-wrapped handler, e.g. for batch processing.
func (CommonSubscribe) DefaultMiddleware ¶ added in v2.0.2
func (c CommonSubscribe) DefaultMiddleware() []Middleware
DefaultMiddleware subInter interceptor
func (*CommonSubscribe) QueueSubscribe ¶ added in v2.0.2
func (c *CommonSubscribe) QueueSubscribe(subj, queue string, cb MsgHandler) (*nats.Subscription, error)
QueueSubscribe creates an asynchronous queue subscriber on the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message asynchronously.
func (*CommonSubscribe) QueueSubscribeMW ¶ added in v2.0.2
func (c *CommonSubscribe) QueueSubscribeMW(subj, queue string, next PostFn) (*nats.Subscription, error)
QueueSubscribeMW subscribes with a legacy middleware callback function. Deprecated: kept only for backward compatibility with the legacy PostFn.
func (*CommonSubscribe) QueueSubscribeSyncWithChan ¶ added in v2.0.2
func (c *CommonSubscribe) QueueSubscribeSyncWithChan(subj, queue string, ch chan *nats.Msg) (*nats.Subscription, error)
QueueSubscribeSyncWithChan will express interest in the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message, which will be placed on the channel. You should not close the channel until sub.Unsubscribe() has been called.
NOTE: of the middleware, only the subscription hook is performed.
func (*CommonSubscribe) Subscribe ¶ added in v2.0.2
func (c *CommonSubscribe) Subscribe(subj string, cb MsgHandler) (*nats.Subscription, error)
Subscribe will express interest in the given subject. The subject can have wildcards. There are two types of wildcards: * for partial, and > for full. A subscription on subject time.*.east would receive messages sent to time.us.east and time.eu.east. A subscription on subject time.us.> would receive messages sent to time.us.east and time.us.east.atlanta, while time.us.* would only match time.us.east since it can't match more than one token. Messages will be delivered to the associated MsgHandler.
func (*CommonSubscribe) SubscribeMW ¶ added in v2.0.2
func (c *CommonSubscribe) SubscribeMW(subj string, cb PostFn) (*nats.Subscription, error)
SubscribeMW is a backward-compatible function for the previous middleware approach. Deprecated: kept only for backward compatibility with the legacy PostFn.
type ConnContext ¶
type ConnContext struct { Publish Subscriber *Core // contains filtered or unexported fields }
ConnContext is a wrapper for nats.ConnContext, aka the middleware connection approach.
Features: Expose subscription stats via function overwrite
func (ConnContext) DefaultMiddleware ¶
func (c ConnContext) DefaultMiddleware() []Middleware
DefaultMiddleware subInter interceptor
func (*ConnContext) JetStream ¶
func (c *ConnContext) JetStream(opts ...nats.JSOpt) (*JetStreamContext, error)
JetStream returns a JetStreamContext wrapper for consumer
type Core ¶
type Core struct {
// contains filtered or unexported fields
}
Core features for context
func (Core) DefaultMiddleware ¶
func (c Core) DefaultMiddleware() []Middleware
DefaultMiddleware subInter interceptor
func (*Core) Use ¶ added in v2.0.1
func (c *Core) Use(conn *nats.Conn) *ConnContext
Use connection with subMiddleware
type Interceptor ¶ added in v2.0.2
type Interceptor func(next MsgHandler) MsgHandler
Interceptor ...
func ChainInterceptor ¶ added in v2.0.2
func ChainInterceptor(interceptors ...Interceptor) Interceptor
func MiddlewareChain ¶ added in v2.0.2
func MiddlewareChain(mw ...Middleware) Interceptor
MiddlewareChain - MsgHandler decorator with subMiddleware
type JetStreamContext ¶
type JetStreamContext struct { *Core // contains filtered or unexported fields }
func (JetStreamContext) DefaultMiddleware ¶
func (c JetStreamContext) DefaultMiddleware() []Middleware
DefaultMiddleware subInter interceptor
func (*JetStreamContext) PullSubscribe ¶
func (j *JetStreamContext) PullSubscribe(subj, durable string, opts ...nats.SubOpt) (*nats.Subscription, error)
PullSubscribe creates a Subscription that can fetch messages. See important note in Subscribe(). Additionally, for an ephemeral pull consumer, the "durable" value must be set to an empty string.
Only wraps the Subscription to gather stats.
func (*JetStreamContext) QueueSubscribe ¶
func (j *JetStreamContext) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
QueueSubscribe creates a Subscription with a queue group. If no optional durable name nor binding options are specified, the queue name will be used as a durable name. See important note in Subscribe()
func (*JetStreamContext) Subscribe ¶
func (j *JetStreamContext) Subscribe(subj string, cb MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
Subscribe creates an async Subscription for JetStream. The stream and consumer names can be provided with the nats.Bind() option. For creating an ephemeral (where the consumer name is picked by the server), you can provide the stream name with nats.BindStream(). If no stream name is specified, the library will attempt to figure out which stream the subscription is for. See important notes below for more details.
IMPORTANT NOTES: * If none of the options Bind() nor Durable() are specified, the library will send a request to the server to create an ephemeral JetStream consumer, which will be deleted after an Unsubscribe() or Drain(), or automatically by the server after a short period of time after the NATS subscription is gone. * If Durable() option is specified, the library will attempt to lookup a JetStream consumer with this name, and if found, will bind to it and not attempt to delete it. However, if not found, the library will send a request to create such durable JetStream consumer. Note that the library will delete the JetStream consumer after an Unsubscribe() or Drain() only if it created the durable consumer while subscribing. If the durable consumer already existed prior to subscribing it won't be deleted. * If Bind() option is provided, the library will attempt to lookup the consumer with the given name, and if successful, bind to it. If the lookup fails, then the Subscribe() call will return an error.
type Middleware ¶
type Middleware interface {
// contains filtered or unexported methods
}
type MsgHandler ¶
MsgHandler is our desired way to handle subscriptions. The ctx allows the function to continue traces or pass log attachments. The returned error allows subMiddleware to understand what has happened in the system, so that it can react differently.
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option allows configuration of this package's NATS middleware (see the With... functions below).
func WithDisableDefaultMiddleware ¶ added in v2.0.2
func WithDisableDefaultMiddleware() Option
WithDisableDefaultMiddleware disable default middleware usage
func WithDump ¶ added in v2.0.2
WithDump dumps the request as plain text to the log and trace. We could go further and add an option for encoding requests.
func WithDumpPayloadOnError ¶
WithDumpPayloadOnError writes a dump of the request and response on faults.
Default: true
func WithNameFunction ¶
func WithPostHook ¶
WithPostHook sets a hook (only one) where you can perform post-handle operations with the data provided by the handler. Deprecated: legacy usage only.
func WithPubMiddleware ¶ added in v2.0.2
func WithPubMiddleware(list ...Middleware) Option
WithPubMiddleware for publish
func WithReply ¶
WithReply extends the middleware to automatically send a reply to NATS requests, if they ask for one, with the data provided. @inject - wrap the nats.Msg handler with OTEL propagation data - extends traces, baggage, etc. Deprecated: legacy usage only.
func WithSubMiddleware ¶ added in v2.0.2
func WithSubMiddleware(list ...Middleware) Option
WithSubMiddleware for subscriptions
type PostFn ¶
PostFn is a callback function which gets a new instance of tele inside ctx, plus the msg subject and data. Deprecated: legacy function, but we still use it via the conn wrappers QueueSubscribeMW and SubscribeMW, just for backward compatibility.
type Publish ¶
type Publish interface { PublishWithContext(ctx context.Context, subj string, data []byte) error PublishMsgWithContext(ctx context.Context, msg *nats.Msg) error PublishRequestWithContext(ctx context.Context, subj, reply string, data []byte) error RequestWithContext(ctx context.Context, subj string, data []byte) (*nats.Msg, error) RequestMsgWithContext(ctx context.Context, msg *nats.Msg) (*nats.Msg, error) }
type SubMetrics ¶
type SubMetrics struct {
// contains filtered or unexported fields
}
SubMetrics implements the Middleware interface.
func NewMetrics ¶
func NewMetrics(m *metrics) *SubMetrics
type Subscriber ¶ added in v2.0.2
type Subscriber interface { Subscribe(subj string, cb MsgHandler) (*nats.Subscription, error) QueueSubscribe(subj, queue string, cb MsgHandler) (*nats.Subscription, error) QueueSubscribeMW(subj, queue string, next PostFn) (*nats.Subscription, error) SubscribeMW(subj string, cb PostFn) (*nats.Subscription, error) QueueSubscribeSyncWithChan(subj, queue string, ch chan *nats.Msg) (*nats.Subscription, error) BuildWrappedHandler(next MsgHandler) nats.MsgHandler }
type SubscriptionStatMetric ¶
type SubscriptionStatMetric struct {
// contains filtered or unexported fields
}
SubscriptionStatMetric is a hook that provides important subscription statistics.
func NewSubscriptionStatMetrics ¶
func NewSubscriptionStatMetrics(opts ...Option) (*SubscriptionStatMetric, error)
func (*SubscriptionStatMetric) Hook ¶
func (s *SubscriptionStatMetric) Hook(sub *nats.Subscription, err error) (*nats.Subscription, error)
func (*SubscriptionStatMetric) Register ¶
func (s *SubscriptionStatMetric) Register(sub ...*nats.Subscription)