Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ConsumerElasticApmInterceptor = func(captureErrors bool, exitOnPanic bool) consumer.UnaryInterceptorFunc { opts := serverOptions{ tracer: apm.DefaultTracer(), } return func(next consumer.UnaryFunc) consumer.UnaryFunc { return func(ctx context.Context, req consumer.MessageRequest) (resp consumer.ConfirmationType, err error) { if !opts.tracer.Recording() { return next(ctx, req) } spec := req.Spec() requestName := fmt.Sprintf("%v with stream %v", spec.ConsumerName, spec.ConsumerQueue) tx, ctx := startTransaction(ctx, opts.tracer, requestName, req.Header(), spec.Version) defer tx.End() defer func() { r := recover() if r != nil { e := opts.tracer.Recovered(r) e.SetTransaction(tx) tx.Context.SetFramework(frameworkName, spec.Version) e.Handled = false e.Send() zerolog.Ctx(ctx).Error().Msgf("panic: %+v", r) if exitOnPanic { panic(r) } return } setTransactionResult(tx, resp) if err != nil && captureErrors { logError(err, ctx) } }() resp, err = next(ctx, req) return resp, err } } }
View Source
var PublisherElasticApmInterceptor = func() publisher.UnaryPublisherInterceptorFunc { return func(next publisher.UnaryPublisherFunc) publisher.UnaryPublisherFunc { return func(ctx context.Context, req publisher.AnyEvent) { if apmTx := apm.TransactionFromContext(ctx); apmTx != nil { req.SetHeader(elasticTraceparentHeader, apmhttp.FormatTraceparentHeader(apmTx.TraceContext())) span := apmTx.StartSpan( fmt.Sprintf("publish event to %v", req.GetDestination()), "eventsourcing", apm.SpanFromContext(ctx), ) span.Context.SetLabel("destination", req.GetDestination()) span.Context.SetServiceTarget(apm.ServiceTargetSpanContext{ Name: req.GetDestinationType(), Type: req.GetDestination(), }) defer span.End() } next(ctx, req) } } }
Functions ¶
This section is empty.
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.