nats

package module
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 31, 2022 License: GPL-3.0 Imports: 17 Imported by: 0

README

NATS module

How to start

go get github.com/tel-io/instrumentation/middleware/nats 
Consumer
import (
    "github.com/tel-io/tel/v2"
    "github.com/tel-io/instrumentation/middleware/nats"
)

func sub(){
    mw := natsmw.New(natsmw.WithTel(t))
	
    subscribe, err := con.Subscribe("nats.demo", mw.Handler(func (ctx context.Context, sub string, data []byte) ([]byte, error) {
    // send as reply
        return nil, []byte("HELLO WORLD")
    }))
    
    crash, err := con.Subscribe("nats.crash", mw.Handler(func (ctx context.Context, sub string, data []byte) ([]byte, error) {
        time.Sleep(time.Millisecond)
        panic("some panic")
        return nil, nil
    }))
    
    someErr, err := con.Subscribe("nats.err", mw.Handler(func (ctx context.Context, sub string, data []byte) ([]byte, error) {
        time.Sleep(time.Millisecond)
    return nil, fmt.Errorf("some errro")
    }))
}
Producer

ToDo

Documentation

Overview

Example (Handler)
cb := func(ctx context.Context, sub string, data []byte) ([]byte, error) {
	return nil, nil
}

tele := tel.NewNull()
mw := New(WithReply(true), WithTel(tele))

conn, _ := nats.Connect("example.com")

_, _ = conn.QueueSubscribe("sub", "queue", mw.Handler(cb))
_, _ = conn.QueueSubscribe("sub2", "queue", mw.Handler(cb))
Output:

Index

Examples

Constants

View Source
const (
	Subject       = attribute.Key("nats.subject")
	IsError       = attribute.Key("nats.code")
	ReadBytesKey  = attribute.Key("nats.read_bytes")  // if anything was read from the request body, the total number of bytes read
	WroteBytesKey = attribute.Key("nats.wrote_bytes") // if anything was written to the response writer, the total number of bytes written
)

Attribute keys that can be added to a span.

View Source
const (
	RequestCount          = "nats.consumer.request_count"           // Incoming request count total
	RequestContentLength  = "nats.consumer.request_content_length"  // Incoming request bytes total
	ResponseContentLength = "nats.consumer.response_content_length" // Incoming response bytes total
	ServerLatency         = "nats.consumer.duration"                // Incoming end to end duration, microseconds
)

Server HTTP metrics

Variables

This section is empty.

Functions

func SemVersion

func SemVersion() string

SemVersion is the semantic version to be supplied to tracer/meter creation.

func Version

func Version() string

Version is the current release version of the otelnats instrumentation.

Types

type MiddleWare

type MiddleWare struct {
	// contains filtered or unexported fields
}

MiddleWare helper

func New

func New(opts ...Option) *MiddleWare

New nats middleware

func (*MiddleWare) Handler

func (n *MiddleWare) Handler(next PostFn) func(*nats.Msg)

Handler is entry point perform recovery, debug logging and perform tracing

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option allows configuration of the httptrace Extract() and Inject() functions.

func WithPostHook

func WithPostHook(cb PostHook) Option

WithPostHook set (only one) where you can perform post handle operation with data provided by handler

func WithReply

func WithReply(inject bool) Option

WithReply extend mw with automatically sending reply on nats requests if they ask with data provided @inject - wrap nats.Msg handler with OTEL propagation data - extend traces, baggage and etc.

func WithTel

func WithTel(t tel.Telemetry) Option

WithTel in some cases we should put another version

type PostFn

type PostFn func(ctx context.Context, sub string, data []byte) ([]byte, error)

PostFn callback function which got new instance of tele inside ctx and msg sub + data

type PostHook

type PostHook func(ctx context.Context, msg *nats.Msg, data []byte) error

Directories

Path Synopsis
example module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL