instaamqp091

package module
v0.21.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2025 License: MIT Imports: 6 Imported by: 0

README

Instana instrumentation for amqp091-go

This module contains instrumentation code for RabbitMQ clients written with amqp091-go.

GoDoc

Installation

To add the module to your go.mod file, run the following command in your project directory:

$ go get github.com/instana/go-sensor/instrumentation/instaamqp091

Usage

instaamqp091 offers a function, WrapChannel, that wraps an amqp.Channel and returns an instaamqp091.AmqpChannel instance. This Instana object provides instrumented versions of the amqp.Channel.Publish and amqp.Channel.Consume methods, which trace the messages being sent and received.

For any other amqp.Channel method, use the original amqp.Channel instance as usual.

A publisher example:

import (
	instana "github.com/instana/go-sensor"
	"github.com/instana/go-sensor/instrumentation/instaamqp091"
	"github.com/opentracing/opentracing-go/ext"
	amqp "github.com/rabbitmq/amqp091-go"
)

// Example_publisher shows how to publish a traced message to a RabbitMQ fanout
// exchange through the Instana wrapper around amqp.Channel.
//
// failOnError is a helper defined outside this snippet; it is presumably
// expected to abort the example when err is non-nil.
func Example_publisher() {
	exchangeName := "my-exchange"
	url := "amqp://guest:guest@localhost:5672/"

	// Create the Instana sensor
	sensor := instana.NewSensor("rabbitmq-client")

	c, err := amqp.Dial(url)
	failOnError(err, "Could not connect to the server")
	defer c.Close()

	ch, err := c.Channel()
	failOnError(err, "Could not acquire the channel")
	defer ch.Close()

	err = ch.ExchangeDeclare(exchangeName, "fanout", true, false, false, false, nil)
	failOnError(err, "Could not declare the exchange")

	// There must be a new entry span per publish call.
	// In the most common cases, creating an entry span manually is not needed, because the entry span
	// originates from an incoming HTTP call.
	entrySpan := sensor.Tracer().StartSpan("my-publishing-method")
	ext.SpanKind.Set(entrySpan, ext.SpanKindRPCServerEnum)

	// Wrap the original amqp.Channel so that its Publish and Consume methods are instrumented by Instana.
	instaCh := instaamqp091.WrapChannel(sensor, ch, url)

	// Use the Instana `Publish` method with the same arguments as the original `Publish` method, with the additional
	// `entrySpan` argument. That's it!
	err = instaCh.Publish(entrySpan, exchangeName, "", false, false, amqp.Publishing{
		ContentType: "text/plain",
		// A plain conversion is enough: the message is a constant string, so no formatting is needed.
		Body: []byte("My published message"),
	})

	failOnError(err, "Error publishing the message")
	entrySpan.Finish()
}

A consumer example:

import (
	"fmt"

	instana "github.com/instana/go-sensor"
	"github.com/instana/go-sensor/instrumentation/instaamqp091"
	"github.com/opentracing/opentracing-go/ext"
	amqp "github.com/rabbitmq/amqp091-go"
)

// Example_consumer shows how to consume traced messages from a RabbitMQ queue
// through the Instana wrapper around amqp.Channel.
//
// failOnError is a helper defined outside this snippet; it is presumably
// expected to abort the example when err is non-nil.
func Example_consumer() {
	exchangeName := "my-exchange"
	queueName := "my-queue"
	url := "amqp://guest:guest@localhost:5672/"

	// Create the Instana sensor
	sensor := instana.NewSensor("rabbitmq-client")

	c, err := amqp.Dial(url)
	failOnError(err, "Could not connect to the server")
	defer c.Close()

	ch, err := c.Channel()
	failOnError(err, "Could not acquire the channel")
	defer ch.Close()

	err = ch.ExchangeDeclare(exchangeName, "fanout", true, false, false, false, nil)
	failOnError(err, "Could not declare the exchange")

	q, err := ch.QueueDeclare(queueName, false, false, true, false, nil)
	failOnError(err, "Could not declare queue")

	err = ch.QueueBind(q.Name, "", exchangeName, false, nil)
	failOnError(err, "Could not bind the queue to the exchange")

	// Wrap the original amqp.Channel so that its Publish and Consume methods are instrumented by Instana.
	instaCh := instaamqp091.WrapChannel(sensor, ch, url)

	// Use the Instana `Consume` method with the same arguments as the original `Consume` method.
	msgs, err := instaCh.Consume(q.Name, "", true, false, false, false, nil)
	failOnError(err, "Could not consume messages")

	// Range over the deliveries directly instead of parking on a channel that is never signalled:
	// the loop blocks while deliveries keep arriving and ends once msgs is closed, so the function
	// can return and the deferred Close calls above get to run.
	for d := range msgs {
		fmt.Println("Got a message:", string(d.Body))
	}
}

See the instaamqp091 package documentation for detailed examples.

Documentation

Overview

Example (Consumer)
exchangeName := "my-exchange"
queueName := "my-queue"
url := "amqp://guest:guest@localhost:5672/"

sensor := instana.NewSensor("rabbitmq-client")

c, err := amqp.Dial(url)
failOnError(err, "Could not connect to the server")
defer c.Close()

ch, err := c.Channel()
failOnError(err, "Could not acquire the channel")
defer ch.Close()

err = ch.ExchangeDeclare(exchangeName, "fanout", true, false, false, false, nil)
failOnError(err, "Could not declare the exchange")

q, err := ch.QueueDeclare(queueName, false, false, true, false, nil)
failOnError(err, "Could not declare queue")

err = ch.QueueBind(q.Name, "", exchangeName, false, nil)
failOnError(err, "Could not bind the queue to the exchange")

instaCh := instaamqp091.WrapChannel(sensor, ch, url)

// Use the Instana `Consume` method with the same arguments as the original `Consume` method.
msgs, err := instaCh.Consume(q.Name, "", true, false, false, false, nil)
failOnError(err, "Could not consume messages")

for d := range msgs {
	fmt.Println("Got a message:", string(d.Body))
}
Output:

Example (Publisher)
exchangeName := "my-exchange"
url := "amqp://guest:guest@localhost:5672/"

// Create the Instana sensor
sensor := instana.NewSensor("rabbitmq-client")

c, err := amqp.Dial(url)
failOnError(err, "Could not connect to the server")
defer c.Close()

ch, err := c.Channel()
failOnError(err, "Could not acquire the channel")
defer ch.Close()

err = ch.ExchangeDeclare(exchangeName, "fanout", true, false, false, false, nil)
failOnError(err, "Could not declare the exchange")

// There must be a new entry span per publish call.
// In the most common cases, creating an entry span manually is not needed, as the entry span is originated from an
// incoming HTTP client call.
entrySpan := sensor.Tracer().StartSpan("my-publishing-method")
ext.SpanKind.Set(entrySpan, ext.SpanKindRPCServerEnum)

instaCh := instaamqp091.WrapChannel(sensor, ch, url)

// Use the Instana `Publish` method with the same arguments as the original `Publish` method, with the additional
// entrySpan argument.
err = instaCh.Publish(entrySpan, exchangeName, "", false, false, amqp.Publishing{
	ContentType: "text/plain",
	Body:        []byte("My published message"),
})

failOnError(err, "Error publishing the message")
entrySpan.Finish()
Output:

Index

Examples

Constants

View Source
const Version = "0.21.0"

Version is the instrumentation module semantic version

Variables

This section is empty.

Functions

func SpanContextFromConsumerMessage

func SpanContextFromConsumerMessage(d amqp.Delivery, sensor instana.TracerLogger) (ot.SpanContext, bool)

SpanContextFromConsumerMessage extracts the tracing context from the Headers field of an amqp.Delivery.

Types

type AmqpChannel

type AmqpChannel struct {
	// contains filtered or unexported fields
}

AmqpChannel is a wrapper around the amqp.Channel object and contains all the relevant information to be tracked

func WrapChannel

func WrapChannel(sensor instana.TracerLogger, ch PubCons, serverUrl string) *AmqpChannel

WrapChannel returns the AmqpChannel, which is Instana's wrapper around amqp.Channel

func (AmqpChannel) Consume

func (c AmqpChannel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)

Consume replaces the original amqp.Channel.Consume method in order to collect the relevant data to be tracked

func (AmqpChannel) Publish

func (c AmqpChannel) Publish(entrySpan ot.Span, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error

Publish replaces the original amqp.Channel.Publish method in order to collect the relevant data to be tracked

type PubCons

// PubCons contains all methods that we want to instrument from the amqp library.
// WrapChannel accepts any value implementing it; the examples on this page pass
// the channel obtained from an amqp connection.
type PubCons interface {
	// Publish is the uninstrumented publish call. AmqpChannel.Publish takes the
	// same arguments preceded by an additional entry span.
	Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
	// Consume is the uninstrumented consume call. AmqpChannel.Consume has the
	// same parameter list and result types.
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
}

PubCons contains all methods that we want to instrument from the amqp library

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL