OpenTelemetry instrumentation for CloudEvents
This package contains the components necessary to instrument CloudEvents clients with OpenTelemetry. The main component is the OTelObservabilityService, which implements the ObservabilityService interface from CloudEvents.
Instrumented CloudEvents HTTP client
If you want a fully instrumented HTTP client, use the helper function in the github.com/cloudevents/sdk-go/observability/opentelemetry/v2 module:
import (
	"context"
	"log"

	otelObs "github.com/cloudevents/sdk-go/observability/opentelemetry/v2/client"
	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/cloudevents/sdk-go/v2/client"
	cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
)

// you can pass the http/client options as usual
c, err := otelObs.NewClientHTTP([]cehttp.Option{}, []client.Option{})
if err != nil {
	log.Fatalf("failed to create client: %v", err)
}
This will produce spans for all outgoing and incoming requests. By default, the spans will have the attributes as defined in keys.go. For more advanced configuration, see the next section.
Advanced configuration
HTTP auto-instrumentation
In order to generate spans when sending and receiving events, it's necessary to configure the HTTP client from cloudevents with OpenTelemetry instrumentation. The client has two potentially interesting points for instrumentation:
- Outgoing requests
- Incoming requests (via StartReceiver)
To cover both, we can use the HTTP auto-instrumentation package from OpenTelemetry (otelhttp):
p, err := cloudevents.NewHTTP(
	// instrument outgoing requests
	cloudevents.WithRoundTripper(otelhttp.NewTransport(http.DefaultTransport)),
	// instrument incoming requests
	cloudevents.WithMiddleware(func(next http.Handler) http.Handler {
		return otelhttp.NewHandler(next, "receive")
	}),
)
The otelhttp.NewTransport will ensure that spans are generated for each outgoing request, and that the traceparent header is properly propagated. The otelhttp.NewHandler will take care of incoming requests, reading the traceparent header and continuing the trace with a new span.
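To make the propagation step concrete, here is a minimal, standard-library-only sketch of what "injecting on the way out and reading on the way in" means for the traceparent header. It is not how otelhttp is implemented: headerInjector, handlerTransport, and roundTripTraceparent are hypothetical names, the header value is fixed instead of being derived from a span, and the request is served in-process rather than over the network.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// headerInjector is a hypothetical RoundTripper that adds a fixed traceparent
// header to every outgoing request before delegating to the wrapped transport.
type headerInjector struct {
	next        http.RoundTripper
	traceparent string
}

func (h headerInjector) RoundTrip(req *http.Request) (*http.Response, error) {
	// A RoundTripper must not modify the caller's request, so work on a clone.
	clone := req.Clone(req.Context())
	clone.Header.Set("traceparent", h.traceparent)
	return h.next.RoundTrip(clone)
}

// handlerTransport is a hypothetical RoundTripper that serves the request
// in-process with the given handler, so the sketch needs no network access.
type handlerTransport struct{ handler http.Handler }

func (t handlerTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	rec := httptest.NewRecorder()
	t.handler.ServeHTTP(rec, req)
	return rec.Result(), nil
}

// roundTripTraceparent sends one request through the injector and returns the
// traceparent value observed by the receiving handler.
func roundTripTraceparent(traceparent string) (string, error) {
	var seen string
	receiver := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// The receiving side reads the header to continue the trace.
		seen = r.Header.Get("traceparent")
		w.WriteHeader(http.StatusNoContent)
	})

	client := &http.Client{Transport: headerInjector{
		next:        handlerTransport{handler: receiver},
		traceparent: traceparent,
	}}
	resp, err := client.Get("http://receiver.invalid/")
	if err != nil {
		return "", err
	}
	resp.Body.Close()
	return seen, nil
}

func main() {
	got, err := roundTripTraceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
	if err != nil {
		panic(err)
	}
	fmt.Println("receiver saw traceparent:", got)
}
```

In a real setup the sending side derives the header from the span in the request context, and the receiving side starts a child span from it.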
This already gives some observability "out-of-the-box", but the generated spans only contain the common HTTP attributes defined in the HTTP semantic conventions. In addition, if you use a protocol other than HTTP, propagation will not work and spans will not be generated automatically unless an auto-instrumentation library exists for that protocol.
Because of this, CloudEvents offers the ObservabilityService interface, which is used to generate spans independently of the chosen protocol. The next section shows how to configure the CloudEvents client to use it.
Using the OTelObservabilityService
The most basic way to configure the CloudEvents client to use the OTelObservabilityService is:
c, err := cloudevents.NewClient(p, client.WithObservabilityService(otelObs.NewOTelObservabilityService()))
With the above configuration, the spans generated by the OTelObservabilityService will have:

| Property | Value |
| --- | --- |
| Span name | cloudevents.client.[event type] [operation name], where [operation name] follows the OpenTelemetry semantic conventions for messaging systems |
| Span attributes | The attributes as defined in keys.go |
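As a quick illustration of the default span name pattern, the sketch below builds a name from an event type and an operation name. The helper defaultSpanName is hypothetical and is not the function the package uses internally. The event type and the "send" operation are example values only.

```go
package main

import "fmt"

// defaultSpanName mirrors the documented pattern
// "cloudevents.client.[event type] [operation name]".
// It is an illustrative helper, not part of the package.
func defaultSpanName(eventType, operation string) string {
	return fmt.Sprintf("cloudevents.client.%s %s", eventType, operation)
}

func main() {
	// Example values; real operation names come from the messaging
	// semantic conventions.
	fmt.Println(defaultSpanName("com.example.order.created", "send"))
}
```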
If you require different span names or extra attributes, you can pass multiple OTelObservabilityServiceOption options when creating the observability service:
// nameFormatter builds a custom span name from the event type
nameFormatter := func(e *cloudevents.Event) string {
	return "my.custom.name." + e.Context.GetType()
}

// attributesGetter returns extra attributes to add to each span
attributesGetter := func(*cloudevents.Event) []attribute.KeyValue {
	return []attribute.KeyValue{
		attribute.String("my-attr", "some-value"),
	}
}

// create the obs service with custom span names and attributes
// (named obsService to avoid shadowing the standard os package)
obsService := otelObs.NewOTelObservabilityService(
	otelObs.WithSpanNameFormatter(nameFormatter),
	otelObs.WithSpanAttributesGetter(attributesGetter),
)

c, err := cloudevents.NewClient(p, client.WithObservabilityService(obsService))
Note: The nameFormatter and attributesGetter functions will be called on each span creation. Avoid doing any heavy processing in them.
Extra types
This package also contains extra types and helper functions that are useful when you need to access or set the tracecontext in a more "low-level" way.
They let you inject (write) the tracecontext into an event and extract (read) it from an event. This is particularly useful when dealing with code that has no notion of a "request" or a context, for example long-running background processes polling from a queue.
Note: To learn more about propagation, take a look at the Propagators API specification.
Manually extracting/injecting tracecontext from the event
When working with distributed systems, it can be difficult to achieve proper context propagation. For example, a long-running process listening to a topic does not have a "context" concept like an HTTP server receiving requests does.
Note: The OpenTelemetry community is always creating new auto-instrumentation integrations for popular libraries and frameworks. The OpenTelemetry registry lists the integrations that are available. As support increases, new auto-instrumentation libraries can be integrated into the CloudEvents SDK.
For this case, it might be useful to inject the tracecontext inside the event before sending it to a queue. Later, the process can extract it and continue the trace normally. For that we can use the InjectDistributedTracingExtension and ExtractDistributedTracingExtension helper functions.
func sendEventToQueue(ctx context.Context, event cloudevents.Event) {
	// assuming this function is properly instrumented,
	// the ctx contains the current span

	// Before sending the event to the queue
	// we can inject the tracecontext into the event as a DistributedTracingExtension
	otelObs.InjectDistributedTracingExtension(ctx, event)
}

func handleEvent(e cloudevents.Event) {
	// here in our long-running process, we don't have a "context"
	// if we have the tracecontext in the event, we can
	// re-create the context with it and continue the trace:
	ctx := otelObs.ExtractDistributedTracingExtension(context.Background(), e)

	// ctx now has the tracecontext from the moment when the event was sent.
	// All subsequent requests made with this context will be part of the trace.
	c, err := otelObs.NewClientHTTP([]cehttp.Option{}, []client.Option{})
	if err != nil {
		log.Printf("failed to create client: %v", err)
		return
	}

	ctx = cloudevents.ContextWithTarget(ctx, "my-other-cloudevents-app")
	if result := c.Send(ctx, e); cloudevents.IsUndelivered(result) {
		log.Printf("failed to send event: %v", result)
	}
}
Because we used the context that was re-created from the event, the call to my-other-cloudevents-app will be correlated with the initial span. If my-other-cloudevents-app is also instrumented and itself makes more calls, these will also be part of the trace.
Most use cases are covered by the InjectDistributedTracingExtension and ExtractDistributedTracingExtension helper functions.
CloudEventCarrier
The CloudEventCarrier is an implementation of the OpenTelemetry TextMapCarrier. Its purpose is to carry the tracecontext that is later used by propagators.
CloudEventCarrier exposes the DistributedTracingExtension, which is populated by the propagator. It works similarly to the HeaderCarrier, which allows getting and setting the traceparent header.
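To show the shape of such a carrier, here is a minimal, standard-library-only sketch. The textMapCarrier interface restates the three methods of OpenTelemetry's propagation.TextMapCarrier (Get, Set, Keys). The tracingExtension and extensionCarrier types are hypothetical stand-ins, not the package's actual DistributedTracingExtension or CloudEventCarrier, and main plays the role a propagator would normally play.

```go
package main

import "fmt"

// textMapCarrier restates the methods of OpenTelemetry's
// propagation.TextMapCarrier so the sketch needs no external modules.
type textMapCarrier interface {
	Get(key string) string
	Set(key, value string)
	Keys() []string
}

// tracingExtension is a hypothetical stand-in for the tracing extension
// that holds the two W3C tracecontext values.
type tracingExtension struct {
	TraceParent string
	TraceState  string
}

// extensionCarrier is a hypothetical carrier that stores the values written
// by a propagator in a tracingExtension instead of in HTTP headers.
type extensionCarrier struct {
	Extension *tracingExtension
}

// Compile-time check that extensionCarrier satisfies the interface.
var _ textMapCarrier = extensionCarrier{}

func (c extensionCarrier) Get(key string) string {
	switch key {
	case "traceparent":
		return c.Extension.TraceParent
	case "tracestate":
		return c.Extension.TraceState
	}
	return ""
}

func (c extensionCarrier) Set(key, value string) {
	switch key {
	case "traceparent":
		c.Extension.TraceParent = value
	case "tracestate":
		c.Extension.TraceState = value
	}
	// Any other key is ignored: only tracecontext values are carried.
}

func (c extensionCarrier) Keys() []string {
	return []string{"traceparent", "tracestate"}
}

func main() {
	carrier := extensionCarrier{Extension: &tracingExtension{}}

	// A propagator's Inject would make calls like these.
	carrier.Set("traceparent", "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
	carrier.Set("tracestate", "congo=t61rcWkgMzE")

	fmt.Println(carrier.Extension.TraceParent)
	fmt.Println(carrier.Extension.TraceState)
}
```

Because the carrier only maps two well-known keys onto struct fields, the populated extension can be read directly after injection.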
It can be used to get access to the "raw" tracecontext values (traceparent and tracestate). One use case is to inject the tracecontext from the context into the carrier, to gain access to the populated DistributedTracingExtension:
type MyEvent struct {
	TraceParent string `json:"traceparent,omitempty"`
	TraceState  string `json:"tracestate,omitempty"`
}

func injectAndReadTraceParentAndState(ctx context.Context, e cloudevents.Event) {
	me := MyEvent{}

	// the propagator from OpenTelemetry
	prop := propagation.TraceContext{}
	carrier := otelObs.NewCloudEventCarrier()

	// Injects (writes) the tracecontext into the CloudEventCarrier.
	// Doing so sets the DistributedTracingExtension fields.
	prop.Inject(ctx, carrier)

	// Here we have "raw" access to the tracecontext data
	// https://www.w3.org/TR/trace-context/
	// e.g. 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
	me.TraceParent = carrier.Extension.TraceParent
	// e.g. congo=t61rcWkgMzE
	me.TraceState = carrier.Extension.TraceState
}
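Once you have the raw traceparent value, it can help to see how it breaks down. The sketch below splits a version 00 traceparent into its four dash-separated fields as described in the W3C Trace Context specification. The traceParent type and parseTraceParent function are hypothetical helpers, not part of this package. The parser only checks field count and lengths, and it treats any non-hex flags as an error. It does not validate that the IDs are hexadecimal or non-zero.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// traceParent holds the four fields of a version 00 traceparent value.
type traceParent struct {
	Version  string // 2 hex characters, "00" for the current format
	TraceID  string // 32 hex characters
	ParentID string // 16 hex characters, the ID of the calling span
	Sampled  bool   // lowest bit of the trace-flags field
}

// parseTraceParent splits a traceparent value into its fields.
// It is a simplified sketch: only field count and lengths are checked.
func parseTraceParent(s string) (traceParent, error) {
	parts := strings.Split(s, "-")
	if len(parts) != 4 {
		return traceParent{}, errors.New("traceparent: expected 4 dash-separated fields")
	}
	if len(parts[0]) != 2 || len(parts[1]) != 32 || len(parts[2]) != 16 || len(parts[3]) != 2 {
		return traceParent{}, errors.New("traceparent: unexpected field length")
	}
	flags, err := strconv.ParseUint(parts[3], 16, 8)
	if err != nil {
		return traceParent{}, fmt.Errorf("traceparent: invalid flags: %w", err)
	}
	return traceParent{
		Version:  parts[0],
		TraceID:  parts[1],
		ParentID: parts[2],
		Sampled:  flags&1 == 1,
	}, nil
}

func main() {
	tp, err := parseTraceParent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
	if err != nil {
		panic(err)
	}
	fmt.Printf("trace=%s parent=%s sampled=%t\n", tp.TraceID, tp.ParentID, tp.Sampled)
}
```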