Documentation ¶
Overview ¶
Package ocsqs provides a drop in replacement for your exisitng SQS client providing methods for persisiting and creating spans from SQS message attributes.
client := ocsqs.New(sqs.New(session))
Raw Message Delivery ¶
SNS allows you to create subscriptions with RawMessageDelivery enabled. If you have connected your SQS queue to an SNS topic with RawMessageDelivery you must create the ocsqs.SQS client also with RawMessageDelivery enabled as this affects how span contexts are retrieved from message attributes.
client := ocsqs.New(sqs.New(session), ocsqs.WithRawMessageDelivery())
Rember to set the MessageAttributeNames field on ReceiveMessageInput to All to ensure message attributes are added to the message:
msgs, err := sqs.ReceiveMessage(&sqs.ReceiveMessageInput{ QueueUrl: aws.String("your-queue-url"), MessageAttributeNames: []*string{aws.String("All")}, })
Index ¶
- func DefaultFormatSpanName(msg *sqs.Message) string
- func GetMessageAttributes(msg *sqs.Message) map[string]*sqs.MessageAttributeValue
- func SendMessageInputWithSpan(ctx context.Context, in *sqs.SendMessageInput, opts ...Option) *sqs.SendMessageInput
- func SpanFromContext(ctx context.Context) (trace.SpanContext, bool)
- func StartSpan(ctx context.Context, msg *sqs.Message, opts ...Option) (context.Context, *trace.Span)
- func WithContext(ctx context.Context, msg *sqs.Message, opts ...Option) context.Context
- type FormatSpanNameFunc
- type GetStartOptionsFunc
- type Option
- type Options
- type SQS
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DefaultFormatSpanName ¶
DefaultFormatSpanName formats a span name according to the given SQS message.
func GetMessageAttributes ¶
func GetMessageAttributes(msg *sqs.Message) map[string]*sqs.MessageAttributeValue
GetMessageAttributes returns message attributes from an SQS message
func SendMessageInputWithSpan ¶
func SendMessageInputWithSpan(ctx context.Context, in *sqs.SendMessageInput, opts ...Option) *sqs.SendMessageInput
SendMessageInputWithSpan adds span data to message input to propagate spans being send through SQS directly.
func SpanFromContext ¶
func SpanFromContext(ctx context.Context) (trace.SpanContext, bool)
SpanFromContext will return a span context from context
func StartSpan ¶
func StartSpan(ctx context.Context, msg *sqs.Message, opts ...Option) (context.Context, *trace.Span)
StartSpan starts a span from an SQS Message
Example ¶
package main import ( "context" "encoding/json" "fmt" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/sqs" "go.krak3n.codes/ocaws/ocawstest" "go.krak3n.codes/ocaws/ocsqs" ) func main() { attr, _ := json.Marshal(map[string]map[string]string{ b3.TraceIDKey: map[string]string{ "Value": ocawstest.DefaultTraceID.String(), }, b3.SpanIDKey: map[string]string{ "Value": ocawstest.DefaultSpanID.String(), }, b3.SpanSampledKey: map[string]string{ "Value": "0", }, }) body, _ := json.Marshal(map[string]json.RawMessage{ "MessageAttributes": attr, }) msg := &sqs.Message{ Body: aws.String(string(body)), } ctx := context.Background() ctx, span := ocsqs.StartSpan(ctx, msg) defer span.End() if span != nil { sc := span.SpanContext() fmt.Println("TraceID:", sc.TraceID.String()) fmt.Println("SpanID:", sc.SpanID.String()) fmt.Println("Span Sampled:", sc.IsSampled()) } }
Output: TraceID: 616263646566676869676b6c6d6e6f71 SpanID: 6162636465666768 Span Sampled: false
Example (With_raw_message_delivery) ¶
package main import ( "context" "fmt" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/sqs" "go.krak3n.codes/ocaws/ocawstest" "go.krak3n.codes/ocaws/ocsqs" ) func main() { // Create a message with trace attributes, publish a message via SNS or SQS msg := &sqs.Message{ MessageAttributes: map[string]*sqs.MessageAttributeValue{ b3.TraceIDKey: { DataType: aws.String("String"), StringValue: aws.String(ocawstest.DefaultTraceID.String()), }, b3.SpanIDKey: { DataType: aws.String("String"), StringValue: aws.String(ocawstest.DefaultSpanID.String()), }, b3.SpanSampledKey: { DataType: aws.String("String"), StringValue: aws.String("0"), }, }, } ctx := context.Background() ctx, span := ocsqs.StartSpan(ctx, msg) defer span.End() if span != nil { sc := span.SpanContext() fmt.Println("TraceID:", sc.TraceID.String()) fmt.Println("SpanID:", sc.SpanID.String()) fmt.Println("Span Sampled:", sc.IsSampled()) } }
Output: TraceID: 616263646566676869676b6c6d6e6f71 SpanID: 6162636465666768 Span Sampled: false
func WithContext ¶
WithContext will create a new span context and place it on the given context from a message. This is useful if you wish to defer the starting of a span
Example ¶
package main import ( "context" "fmt" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/sqs" "go.krak3n.codes/ocaws/ocawstest" "go.krak3n.codes/ocaws/ocsqs" ) func main() { // Create a message with trace attributes, publish a message via SNS or SQS msg := &sqs.Message{ MessageAttributes: map[string]*sqs.MessageAttributeValue{ b3.TraceIDKey: { DataType: aws.String("String"), StringValue: aws.String(ocawstest.DefaultTraceID.String()), }, b3.SpanIDKey: { DataType: aws.String("String"), StringValue: aws.String(ocawstest.DefaultSpanID.String()), }, b3.SpanSampledKey: { DataType: aws.String("String"), StringValue: aws.String("0"), }, }, } ctx := context.Background() sc, ok := ocsqs.SpanFromContext(ocsqs.WithContext(ctx, msg)) if ok { fmt.Println("TraceID:", sc.TraceID.String()) fmt.Println("SpanID:", sc.SpanID.String()) fmt.Println("Span Sampled:", sc.IsSampled()) } }
Output: TraceID: 616263646566676869676b6c6d6e6f71 SpanID: 6162636465666768 Span Sampled: false
Types ¶
type FormatSpanNameFunc ¶
A FormatSpanNameFunc formats a span name from the sqs message
type GetStartOptionsFunc ¶
type GetStartOptionsFunc func(*sqs.Message) trace.StartOptions
A GetStartOptionsFunc returns start options on message by message basis
type Option ¶
type Option func(*Options)
Option overrides default Options configuration
func WithFormatSpanName ¶
func WithFormatSpanName(fn FormatSpanNameFunc) Option
WithFormatSpanName sets the SQS clients formant name func
func WithGetStartOptions ¶
func WithGetStartOptions(fn GetStartOptionsFunc) Option
WithGetStartOptions sets the SQS clients GetStartOptions func
func WithPropagator ¶
func WithPropagator(p propagation.Propagator) Option
WithPropagator sets the clients propagator
func WithStartOptions ¶
func WithStartOptions(s trace.StartOptions) Option
WithStartOptions sets the clients StartOptions
type Options ¶
type Options struct { // Propagator defines how traces will be propagated, if not specified this // will be B3 Propagator propagation.Propagator // StartOptions are applied to the span started by this Handler around each // message. // StartOptions.SpanKind will always be set to trace.SpanKindServer // for spans started by this transport. StartOptions trace.StartOptions // GetStartOptions allows to set start options per message. If set, // StartOptions is going to be ignored. GetStartOptions GetStartOptionsFunc // FormatSpanName formats the span name based on the given sqs.Message. See // DefaultFormatSpanName for the default format FormatSpanName FormatSpanNameFunc }
type SQS ¶
SQS provides methods for sending messages with trace attributes and starting spans from messages. It embeds the SQS client allowing this to be used as a drop in replacement.
func New ¶
New constructs a new SQS client with default configuration values. Use Option functions to customize configuration. By default the propagator used is B3.
func (*SQS) SendMessageWithContext ¶
func (s *SQS) SendMessageWithContext(ctx aws.Context, input *sqs.SendMessageInput, opts ...request.Option) (*sqs.SendMessageOutput, error)
SendMessageWithContext shadows the sqs clients SendMessageWithContext adding trace span data to the send message input
Example ¶
package main import ( "context" "fmt" "log" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" "go.krak3n.codes/ocaws" "go.krak3n.codes/ocaws/ocsqs" "go.krak3n.codes/ocaws/propagation/b3" "go.opencensus.io/trace" ) var sess *session.Session func main() { ctx, span := trace.StartSpan(context.Background(), "sqs/ExampleSQS_SendMessageWithContext") defer span.End() // Create SNS Client c := ocsqs.New(sqs.New(sess)) // Create Topic q, err := c.CreateQueue(&sqs.CreateQueueInput{ QueueName: aws.String("foo"), }) if err != nil { log.Fatal(err) } // Publish message with span context message attributes in := &sqs.SendMessageInput{ QueueUrl: q.QueueUrl, MessageBody: aws.String(`{"foo":"bar"}`), } if _, err := c.SendMessageWithContext(ctx, in); err != nil { log.Fatal(err) } fmt.Println("TraceID:", *in.MessageAttributes[b3.TraceIDKey].StringValue) fmt.Println("SpanID:", *in.MessageAttributes[b3.SpanIDKey].StringValue) fmt.Println("Span Sampled:", *in.MessageAttributes[b3.SpanSampledKey].StringValue) fmt.Println("Trace Queue URL:", *in.MessageAttributes[ocaws.TraceQueueURL].StringValue) }
Output: TraceID: 616263646566676869676b6c6d6e6f71 SpanID: 6162636465666768 Span Sampled: 0 Trace Queue URL: http://localhost:4576/queue/foo