README
¶
apm-kafkago
Elastic APM Go agent support for segmentio/kafka-go client.
This package wraps segementio/kafka-go reader and writer to add the traceparent header to the message header and create a new transaction for each message.
Usage as Consumer
import(
"github.com/segmentio/kafka-go"
"github.com/sohaibomr/apm-kafkago"
)
kReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{kafkaBrokers},
Topic: topic,
GroupID: "payment-group",
})
reader := apmkafkago.WrapReader(kReader) // use this reader to read messages, this will parse the Traceparent header from the message header and create a new transaction for each message
Usage as Producer
import(
"github.com/segmentio/kafka-go"
"github.com/sohaibomr/apm-kafkago"
)
kWriter := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{kafkaBrokers},
Topic: topic,
})
writer := apmkafkago.WrapWriter(kWriter) // use this writer to write messages, this will add the Traceparent header to the message header
How will it look on the APM Dashboard ?
Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Reader ¶
type Reader struct {
R *kafka.Reader
}
Reader is a wrapper around kafka.Read
func WrapReader ¶
func WrapReader(r *kafka.Reader) *Reader
WrapReader returns a new Reader wraper around kafka.Reader
func (*Reader) ReadMessage ¶
ReadMessage reads a message from kafka
func (*Reader) ReadMessageTx ¶
func (r *Reader) ReadMessageTx(ctx context.Context) (msg kafka.Message, tx *apm.Transaction, err error)
ReadMessageTx reads a message from kafka and returns apm transaction to be used in a transactional context call transaction.end at the end of processing kafka message
type Writer ¶
type Writer struct {
W *kafka.Writer
}
Writer is a wrapper around kafka.Writer
func WrapWriter ¶
func WrapWriter(w *kafka.Writer) *Writer
WrapWriter returns a new Writer wraper around kafka.Writer
Click to show internal directories.
Click to hide internal directories.