nats

package module
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 25, 2022 License: GPL-3.0 Imports: 21 Imported by: 0

README

NATS module

Overview

Observability stack for NATS microservice services. Because of NATS not supported any middleware we was forced to reinvent by ourselves, and we had been chosen just wrap nats.Conn as one simplest way to do it.

Unfortunately, we were not able to perform composition with original nats.Con structure, and we found that it could bring users in confusions since not full functions are covers.

Middleware concept allowed to decompose business logic in small mw components. Thus, we offer replacement of MsgHandler with context and error return.

How to start

go get github.com/tel-io/instrumentation/middleware/nats 
// create connection to nats
con, _ := nats.Connect(addr)

// wrap it 
mw := natsmw.New(con, natsmw.WithTel(t))

Features

  • Decorated instance has near legacy signature
  • Build-IN Trace, Logs, Metrics, Recovery middlewares
  • NATS Core fully supported functionality: async sub, pub, request, reply
  • NATS JetStream: partial support
  • Grafana Dashboard covered sub/pub,request,reply
  • *nats.Subscription all wrapped function return attached subscription watcher who scrap metrics
Consumer
NATS Core

SubscribeMW and QueueSubscribeMW function just backport compatibility for our previous version where signature was:

type PostFn func(ctx context.Context, sub string, data []byte) ([]byte, error)
  • Subscribe
  • QueueSubscribe

NOTE:

Example:

func main(){
    // create nats connection
    con, _ := nats.Connect(addr)
    // create our mw from nats connection
    mw := natsmw.New(con, natsmw.WithTel(t))
    // we have handler already used our new-brand handler
    ourHandler := func(ctx context.Context, msg *nats.Msg) error {
        // perform respond with possible error returning
        return msg.Respond([]byte("HELLO"))
    }
	
    // subscribe to queue via our middleware with our `ourHandler`
    subscribe, _ := mw.QueueSubscribe("nats.demo", "consumer", ourHandler)
	
	// or without queue
    subscribe, _ := mw.Subscribe("nats.demo",  ourHandler)
}
JetStream

You should create JetStream from our wrapper For subscription, we covered Subscribe and QueueSubscribe as push stack, which quite less popular as it provide not optimized for horizontal scale

Here is example:

func main(){
	// create nats connection
    con, _ := nats.Connect(addr)
    // create our mw from nats connection
    mw := natsmw.New(con, natsmw.WithTel(t))
	
    // we have handler already used our new-brand handler
    ourHandler := func(ctx context.Context, msg *nats.Msg) error {
        _ = msg.Ack()
        return nil
    }

	// create wrapped js instance
    js, _ := mw.JetStream()
	
    // simple subscription with our handler to js
    handler := js.Subscribe("nats.demo",  ourHandler)
	
	// subscription with queue to js
    handler := js.QueueSubscribe("nats.demo",  "consumer", ourHandler)
}
BuildWrappedHandler

BuildWrappedHandler feature allow wrap any native function with middleware stack, allow to build middleware handler for function which not covered.

Here is example with jetstream:

func main(){
    // some context, it could be signal closer or just stub
    ccx := context.TODO()
    
    // create tel instance with ccx provided ccx context
    t, closer := tel.New(ccx, tel.DefaultDebugConfig())
    defer closer()

	// create nats connection
    con, _ := nats.Connect(addr)
    // create our mw from nats connection
    mw := natsmw.New(con, natsmw.WithTel(t))
	
    // we have handler already used our new-brand handler
    ourHandler := func(ctx context.Context, msg *nats.Msg) error {
        _ = msg.Ack()
        return nil
    }
    
    // create wrapped js instance
    js, _ := mw.JetStream()

    // but PullSubscribe don't process any handler, but we would like observe this process
    sub, _ := js.PullSubscribe("stream.demo", "PULL")
	
    // so we just create function which func(msg *nats.Msg) but would be processed with our `ourHandler` processor
    handler := mw.BuildWrappedHandler(ourHandler)

    for{
        msgs, _ := v.Fetch(100)
        for _, msg := range msgs {
            //and here we process it
            handler(msg)
    }
}
Producer

All our produced function receive ctx and name has WithContext suffix.

WARNING! Context should contain tel context, as most important feature is to continue span of traces

NATS Core

Here is example:

func main(){
	// some context, it could be signal closer or just stub
	ccx := context.TODO()
	
	// create tel instance with ccx provided ccx context
    t, closer := tel.New(ccx, tel.DefaultDebugConfig())
    defer closer()

	// wrap ccx with context
    ctx := tel.WithContext(ccx, t)
	
	// create nats connection
    con, _ := nats.Connect(addr)
	
    // create our mw from nats connection
    mw := natsmw.New(con, natsmw.WithTel(t))
	
	// send just with subject and body
    _ = con.PublishWithContext(ctx, "nats.test", []byte("HELLO_WORLD"))
	
	// or with native nats.Msg 
    _ = con.PublishMsgWithContext(ctx, &nats.Msg{Subject:"nats.test", Data: []byte("HELLO_WORLD"})	
	
	// or none-blocking request - assume that reply would be provided further
	_ = con.PublishRequestWithContext(ctx, "nats.test", "nats.reply", []byte("HELLO_WORLD"))

	// request with reply
    reply, _ = con.RequestWithContext(ctx, "nats.test",  []byte("HELLO_WORLD"))

    // request with reply via nats.Msg
    reply, _ = con.RequestWithContext(ctx, &nats.Msg{Subject:"nats.test", Data: []byte("HELLO_WORLD"})
}
JetStream

There is no helper yet

Documentation

Overview

Example (Handler)
tele := tel.NewNull()
ctx := tele.Ctx()

conn, _ := nats.Connect("example.com")
nConn := New(conn, WithTel(tele))

// legacy backport
cbLegacy := func(ctx context.Context, sub string, data []byte) ([]byte, error) {
	return nil, nil
}

cb := func(ctx context.Context, msg *nats.Msg) error {
	return nil
}

_, _ = nConn.QueueSubscribeMW("sub", "queue", cbLegacy)
_, _ = nConn.QueueSubscribeMW("sub2", "queue", cbLegacy)

// sub
nConn.Subscribe("sub", cb)
nConn.QueueSubscribe("sub", "xxx", cb)

// pub
nConn.PublishWithContext(ctx, "sub", []byte("HELLO"))
nConn.PublishMsgWithContext(ctx, &nats.Msg{})
nConn.PublishRequestWithContext(ctx, "sub", "reply", []byte("HELLO"))
nConn.RequestWithContext(ctx, "sub", []byte("HELLO"))
nConn.RequestMsgWithContext(ctx, &nats.Msg{})
Output:

Index

Examples

Constants

View Source
const (
	Subject = attribute.Key("subject")
	IsError = attribute.Key("error")
	Kind    = attribute.Key("kind_of")
)

Attribute keys that can be added to a span.

View Source
const (
	KindSub     = "sub"
	KindPub     = "pub"
	KindRequest = "request"
	KindRespond = "respond"
)
View Source
const (
	Count         = "nats.count"          // Incoming request count total
	ContentLength = "nats.content_length" // Incoming request bytes total
	Latency       = "nats.duration"       // Incoming end to end duration, microseconds

	SubscriptionsPendingCount = "nats.subscriptions.pending.msgs"
	SubscriptionsPendingBytes = "nats.subscriptions.pending.bytes"
	SubscriptionsDroppedMsgs  = "nats.subscriptions.dropped.count"
	SubscriptionCountMsgs     = "nats.subscriptions.send.count"
)

Server NATS metrics

Variables

This section is empty.

Functions

func SemVersion

func SemVersion() string

SemVersion is the semantic version to be supplied to tracer/meter creation.

func Version

func Version() string

Version is the current release version of the otelnats instrumentation.

Types

type CommonPublish added in v1.3.0

type CommonPublish struct {
	*nats.Conn
}

func NewCommonPublish added in v1.3.0

func NewCommonPublish(conn *nats.Conn) *CommonPublish

func (*CommonPublish) PublishMsgWithContext added in v1.3.0

func (c *CommonPublish) PublishMsgWithContext(ctx context.Context, msg *nats.Msg) error

PublishMsgWithContext publishes the Msg structure, which includes the Subject, an optional Reply and an optional Data field.

func (*CommonPublish) PublishRequestWithContext added in v1.3.0

func (c *CommonPublish) PublishRequestWithContext(_ context.Context, subj, reply string, data []byte) error

PublishRequestWithContext will perform a Publish() expecting a response on the reply subject. Use Request() for automatically waiting for a response inline.

func (*CommonPublish) PublishWithContext added in v1.3.0

func (c *CommonPublish) PublishWithContext(ctx context.Context, subj string, data []byte) error

PublishWithContext publishes the data argument to the given subject. The data argument is left untouched and needs to be correctly interpreted on the receiver.

func (*CommonPublish) RequestMsgWithContext added in v1.3.0

func (c *CommonPublish) RequestMsgWithContext(ctx context.Context, msg *nats.Msg) (*nats.Msg, error)

RequestMsgWithContext takes a context, a subject and payload in bytes and request expecting a single response.

func (*CommonPublish) RequestWithContext added in v1.3.0

func (c *CommonPublish) RequestWithContext(ctx context.Context, subj string, data []byte) (*nats.Msg, error)

RequestWithContext will send a request payload and deliver the response message, or an error, including a timeout if no message was received properly.

type ConnContext added in v1.3.0

type ConnContext struct {
	Publish

	*Core
	// contains filtered or unexported fields
}

ConnContext wrapper for nats.ConnContext aks mw connection approach

Features: Expose subscription stats via function overwrite

func New

func New(conn *nats.Conn, opts ...Option) *ConnContext

New wraps nats Core connection with middleware functionality

func (*ConnContext) BuildWrappedHandler added in v1.3.0

func (c *ConnContext) BuildWrappedHandler(cb MsgHandler) nats.MsgHandler

BuildWrappedHandler allow to create own mw, for bach processing for example or so on

func (*ConnContext) Conn added in v1.3.0

func (c *ConnContext) Conn() *nats.Conn

Conn unwrap connection

func (ConnContext) DefaultMiddleware added in v1.3.0

func (c ConnContext) DefaultMiddleware() []Middleware

func (ConnContext) DefaultPubMiddleware added in v1.3.0

func (c ConnContext) DefaultPubMiddleware() []PubMiddleware

func (*ConnContext) JetStream added in v1.3.0

func (c *ConnContext) JetStream(opts ...nats.JSOpt) (*JetStreamContext, error)

JetStream returns a JetStreamContext wrapper for consumer

func (ConnContext) Middleware added in v1.3.0

func (c ConnContext) Middleware() []Middleware

func (ConnContext) PubMiddleware added in v1.3.0

func (c ConnContext) PubMiddleware() []PubMiddleware

func (*ConnContext) QueueSubscribe added in v1.3.0

func (c *ConnContext) QueueSubscribe(subj, queue string, cb MsgHandler) (*nats.Subscription, error)

QueueSubscribe creates an asynchronous queue subscriber on the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message asynchronously.

func (*ConnContext) QueueSubscribeMW added in v1.3.0

func (c *ConnContext) QueueSubscribeMW(subj, queue string, next PostFn) (*nats.Subscription, error)

QueueSubscribeMW mw callback function, just legacy Deprecated: just backport compatibility for PostFn legacy

func (*ConnContext) Subscribe added in v1.3.0

func (c *ConnContext) Subscribe(subj string, cb MsgHandler) (*nats.Subscription, error)

Subscribe will express interest in the given subject. The subject can have wildcards. There are two type of wildcards: * for partial, and > for full. A subscription on subject time.*.east would receive messages sent to time.us.east and time.eu.east. A subscription on subject time.us.> would receive messages sent to time.us.east and time.us.east.atlanta, while time.us.* would only match time.us.east since it can't match more than one token. Messages will be delivered to the associated MsgHandler.

func (*ConnContext) SubscribeMW added in v1.3.0

func (c *ConnContext) SubscribeMW(subj string, cb PostFn) (*nats.Subscription, error)

SubscribeMW backport compatible function for previous mw approach Deprecated: just backport compatibility for PostFn legacy

type Core added in v1.3.0

type Core struct {
	// contains filtered or unexported fields
}

Core features for context

func (Core) DefaultMiddleware added in v1.3.0

func (c Core) DefaultMiddleware() []Middleware

func (Core) DefaultPubMiddleware added in v1.3.0

func (c Core) DefaultPubMiddleware() []PubMiddleware

func (Core) Middleware added in v1.3.0

func (c Core) Middleware() []Middleware

func (Core) PubMiddleware added in v1.3.0

func (c Core) PubMiddleware() []PubMiddleware

type JetStreamContext added in v1.3.0

type JetStreamContext struct {
	*Core
	// contains filtered or unexported fields
}

func (JetStreamContext) DefaultMiddleware added in v1.3.0

func (c JetStreamContext) DefaultMiddleware() []Middleware

func (JetStreamContext) DefaultPubMiddleware added in v1.3.0

func (c JetStreamContext) DefaultPubMiddleware() []PubMiddleware

func (*JetStreamContext) JS added in v1.3.0

func (j *JetStreamContext) JS() nats.JetStreamContext

JS unwrap

func (JetStreamContext) Middleware added in v1.3.0

func (c JetStreamContext) Middleware() []Middleware

func (JetStreamContext) PubMiddleware added in v1.3.0

func (c JetStreamContext) PubMiddleware() []PubMiddleware

func (*JetStreamContext) PullSubscribe added in v1.3.0

func (j *JetStreamContext) PullSubscribe(subj, durable string, opts ...nats.SubOpt) (*nats.Subscription, error)

PullSubscribe creates a Subscription that can fetch messages. See important note in Subscribe(). Additionally, for an ephemeral pull consumer, the "durable" value must be set to an empty string.

Only wrap Subscription for gather stats

func (*JetStreamContext) QueueSubscribe added in v1.3.0

func (j *JetStreamContext) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)

QueueSubscribe creates a Subscription with a queue group. If no optional durable name nor binding options are specified, the queue name will be used as a durable name. See important note in Subscribe()

func (*JetStreamContext) Subscribe added in v1.3.0

func (j *JetStreamContext) Subscribe(subj string, cb MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)

Subscribe creates an async Subscription for JetStream. The stream and consumer names can be provided with the nats.Bind() option. For creating an ephemeral (where the consumer name is picked by the server), you can provide the stream name with nats.BindStream(). If no stream name is specified, the library will attempt to figure out which stream the subscription is for. See important notes below for more details.

IMPORTANT NOTES: * If none of the options Bind() nor Durable() are specified, the library will send a request to the server to create an ephemeral JetStream consumer, which will be deleted after an Unsubscribe() or Drain(), or automatically by the server after a short period of time after the NATS subscription is gone. * If Durable() option is specified, the library will attempt to lookup a JetStream consumer with this name, and if found, will bind to it and not attempt to delete it. However, if not found, the library will send a request to create such durable JetStream consumer. Note that the library will delete the JetStream consumer after an Unsubscribe() or Drain() only if it created the durable consumer while subscribing. If the durable consumer already existed prior to subscribing it won't be deleted. * If Bind() option is provided, the library will attempt to lookup the consumer with the given name, and if successful, bind to it. If the lookup fails, then the Subscribe() call will return an error.

type Logs added in v1.3.0

type Logs struct {
	// contains filtered or unexported fields
}

Logs dump some payload

func NewLogs added in v1.3.0

func NewLogs(cfg *config) *Logs

func (Logs) DefaultMiddleware added in v1.3.0

func (c Logs) DefaultMiddleware() []Middleware

func (Logs) DefaultPubMiddleware added in v1.3.0

func (c Logs) DefaultPubMiddleware() []PubMiddleware

func (Logs) Middleware added in v1.3.0

func (c Logs) Middleware() []Middleware

func (Logs) PubMiddleware added in v1.3.0

func (c Logs) PubMiddleware() []PubMiddleware

type Middleware added in v1.3.0

type Middleware interface {
	// contains filtered or unexported methods
}

type MsgHandler added in v1.3.0

type MsgHandler func(ctx context.Context, msg *nats.Msg) error

MsgHandler our desired way to handle subscriptions ctx allow inside function continue traces or pass log attachment error return allow middleware to understand behaviour of system what has gone here, and it could change differently

type NameFn added in v1.3.0

type NameFn func(msg *nats.Msg) string

NameFn operation name description

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option allows configuration of the httptrace Extract() and Inject() functions.

func WithDumpPayloadOnError added in v1.2.3

func WithDumpPayloadOnError(enable bool) Option

WithDumpPayloadOnError write dump request and response on faults

Default: true

func WithDumpRequest added in v1.2.3

func WithDumpRequest(enable bool) Option

WithDumpRequest dump request as plain text to log and trace i guess we can go further and perform option with encoding requests

func WithDumpResponse added in v1.2.3

func WithDumpResponse(enable bool) Option

WithDumpResponse dump response as plain text to log and trace

func WithNameFunction added in v1.3.0

func WithNameFunction(fn NameFn) Option

func WithPostHook

func WithPostHook(cb PostHook) Option

WithPostHook set (only one) where you can perform post handle operation with data provided by handler Deprecated: legacy usage only

func WithReply

func WithReply(inject bool) Option

WithReply extend mw with automatically sending reply on nats requests if they ask with data provided @inject - wrap nats.Msg handler with OTEL propagation data - extend traces, baggage and etc. Deprecated: legacy usage only

func WithTel

func WithTel(t tel.Telemetry) Option

WithTel in some cases we should put another version

type PostFn

type PostFn func(ctx context.Context, sub string, data []byte) ([]byte, error)

PostFn callback function which got new instance of tele inside ctx and msg sub + data Deprecated: legacy function, but we use it via conn wrapper: QueueSubscribeMW or SubscribeMW just for backport compatibility

type PostHook

type PostHook func(ctx context.Context, msg *nats.Msg, data []byte) error

type PubMetric added in v1.3.0

type PubMetric struct {
	// contains filtered or unexported fields
}

PubMetric handle publish and request metrics gathering implementing PubMiddleware

func NewPubMetric added in v1.3.0

func NewPubMetric(m *metrics) *PubMetric

func (*PubMetric) PublishMsgWithContext added in v1.3.0

func (p *PubMetric) PublishMsgWithContext(ctx context.Context, msg *nats.Msg) (err error)

func (*PubMetric) PublishRequestWithContext added in v1.3.0

func (p *PubMetric) PublishRequestWithContext(ctx context.Context, subj, reply string, data []byte) (err error)

func (*PubMetric) PublishWithContext added in v1.3.0

func (p *PubMetric) PublishWithContext(ctx context.Context, subj string, data []byte) (err error)

func (*PubMetric) RequestMsgWithContext added in v1.3.0

func (p *PubMetric) RequestMsgWithContext(ctx context.Context, msg *nats.Msg) (resp *nats.Msg, err error)

func (*PubMetric) RequestWithContext added in v1.3.0

func (p *PubMetric) RequestWithContext(ctx context.Context, subj string, data []byte) (resp *nats.Msg, err error)

type PubMiddleware added in v1.3.0

type PubMiddleware interface {
	Publish
	// contains filtered or unexported methods
}

type PubTrace added in v1.3.0

type PubTrace struct {
	// contains filtered or unexported fields
}

PubTrace handle trace handling implementing PubMiddleware

func NewPubTrace added in v1.3.0

func NewPubTrace(fn NameFn) *PubTrace

func (*PubTrace) PublishMsgWithContext added in v1.3.0

func (p *PubTrace) PublishMsgWithContext(cxt context.Context, msg *nats.Msg) (err error)

func (*PubTrace) PublishRequestWithContext added in v1.3.0

func (p *PubTrace) PublishRequestWithContext(ctx context.Context, subj, reply string, data []byte) error

PublishRequestWithContext we just use PublishMsgWithContext to handle publish with reply option

func (*PubTrace) PublishWithContext added in v1.3.0

func (p *PubTrace) PublishWithContext(cxt context.Context, subj string, data []byte) (err error)

func (*PubTrace) RequestMsgWithContext added in v1.3.0

func (p *PubTrace) RequestMsgWithContext(cxt context.Context, msg *nats.Msg) (resp *nats.Msg, err error)

func (*PubTrace) RequestWithContext added in v1.3.0

func (p *PubTrace) RequestWithContext(ctx context.Context, subj string, data []byte) (*nats.Msg, error)

type Publish added in v1.3.0

type Publish interface {
	PublishWithContext(ctx context.Context, subj string, data []byte) error
	PublishMsgWithContext(ctx context.Context, msg *nats.Msg) error
	PublishRequestWithContext(ctx context.Context, subj, reply string, data []byte) error
	RequestWithContext(ctx context.Context, subj string, data []byte) (*nats.Msg, error)
	RequestMsgWithContext(ctx context.Context, msg *nats.Msg) (*nats.Msg, error)
}

type Recovery added in v1.3.0

type Recovery struct{}

func NewRecovery added in v1.3.0

func NewRecovery() *Recovery

type SubMetrics added in v1.3.0

type SubMetrics struct {
	// contains filtered or unexported fields
}

SubMetrics implement Middleware interface

func NewMetrics added in v1.3.0

func NewMetrics(m *metrics) *SubMetrics

type SubscriptionStatMetric added in v1.3.0

type SubscriptionStatMetric struct {
	// contains filtered or unexported fields
}

SubscriptionStatMetric hook provide important subscription statistics

func NewSubscriptionStatMetrics added in v1.3.0

func NewSubscriptionStatMetrics(opts ...Option) (*SubscriptionStatMetric, error)

func (*SubscriptionStatMetric) Hook added in v1.3.0

func (s *SubscriptionStatMetric) Hook(sub *nats.Subscription, err error) (*nats.Subscription, error)

func (*SubscriptionStatMetric) Register added in v1.3.0

func (s *SubscriptionStatMetric) Register(sub ...*nats.Subscription)

type Tracer added in v1.3.0

type Tracer struct {
	// contains filtered or unexported fields
}

Tracer for subscribers implementing Middleware

func NewTracer added in v1.3.0

func NewTracer(fn NameFn) *Tracer

Directories

Path Synopsis
example module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL