Documentation ¶
Overview ¶
Example (Consumer) ¶
exchangeName := "my-exchange" queueName := "my-queue" url := "amqp://guest:guest@localhost:5672/" sensor := instana.NewSensor("rabbitmq-client") c, err := amqp.Dial(url) failOnError(err, "Could not connect to the server") defer c.Close() ch, err := c.Channel() failOnError(err, "Could not acquire the channel") defer ch.Close() err = ch.ExchangeDeclare(exchangeName, "fanout", true, false, false, false, nil) failOnError(err, "Could not declare the exchange") q, err := ch.QueueDeclare(queueName, false, false, true, false, nil) failOnError(err, "Could not declare queue") err = ch.QueueBind(q.Name, "", exchangeName, false, nil) failOnError(err, "Could not bind the queue to the exchange") instaCh := instaamqp091.WrapChannel(sensor, ch, url) // Use the Instana `Consume` method with the same arguments as the original `Consume` method. msgs, err := instaCh.Consume(q.Name, "", true, false, false, false, nil) failOnError(err, "Could not consume messages") for d := range msgs { fmt.Println("Got a message:", string(d.Body)) }
Output:
Example (Publisher) ¶
exchangeName := "my-exchange" url := "amqp://guest:guest@localhost:5672/" // Create the Instana sensor sensor := instana.NewSensor("rabbitmq-client") c, err := amqp.Dial(url) failOnError(err, "Could not connect to the server") defer c.Close() ch, err := c.Channel() failOnError(err, "Could not acquire the channel") defer ch.Close() err = ch.ExchangeDeclare(exchangeName, "fanout", true, false, false, false, nil) failOnError(err, "Could not declare the exchange") // There must be a new entry span per publish call. // In the most common cases, creating an entry span manually is not needed, as the entry span is originated from an // incoming HTTP client call. entrySpan := sensor.Tracer().StartSpan("my-publishing-method") ext.SpanKind.Set(entrySpan, ext.SpanKindRPCServerEnum) instaCh := instaamqp091.WrapChannel(sensor, ch, url) // Use the Instana `Publish` method with the same arguments as the original `Publish` method, with the additional // entrySpan argument. err = instaCh.Publish(entrySpan, exchangeName, "", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte("My published message"), }) failOnError(err, "Error publishing the message") entrySpan.Finish()
Output:
Index ¶
Examples ¶
Constants ¶
View Source
const Version = "0.21.0"
Version is the instrumentation module semantic version
Variables ¶
This section is empty.
Functions ¶
func SpanContextFromConsumerMessage ¶
func SpanContextFromConsumerMessage(d amqp.Delivery, sensor instana.TracerLogger) (ot.SpanContext, bool)
SpanContextFromConsumerMessage extracts the tracing context from amqp.Delivery#Headers
Types ¶
type AmqpChannel ¶
type AmqpChannel struct {
// contains filtered or unexported fields
}
AmqpChannel is a wrapper around the amqp.Channel object and contains all the relevant information to be tracked
func WrapChannel ¶
func WrapChannel(sensor instana.TracerLogger, ch PubCons, serverUrl string) *AmqpChannel
WrapChannel returns the AmqpChannel, which is Instana's wrapper around amqp.Channel
func (AmqpChannel) Consume ¶
func (c AmqpChannel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
Consume replaces the original amqp.Channel.Consume method in order to collect the relevant data to be tracked
func (AmqpChannel) Publish ¶
func (c AmqpChannel) Publish(entrySpan ot.Span, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
Publish replaces the original amqp.Channel.Publish method in order to collect the relevant data to be tracked
type PubCons ¶
type PubCons interface { Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) }
PubCons contains all methods that we want to instrument from the amqp library
Click to show internal directories.
Click to hide internal directories.