awssnssqs

package
v0.35.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 8, 2023 License: Apache-2.0 Imports: 29 Imported by: 56

Documentation

Overview

Package awssnssqs provides two implementations of pubsub.Topic, one that sends messages to AWS SNS (Simple Notification Service), and one that sends messages to SQS (Simple Queuing Service). It also provides an implementation of pubsub.Subscription that receives messages from SQS.

URLs

For pubsub.OpenTopic, awssnssqs registers for the scheme "awssns" for an SNS topic, and "awssqs" for an SQS topic. For pubsub.OpenSubscription, it registers for the scheme "awssqs".

The default URL opener will use an AWS session with the default credentials and configuration; see https://docs.aws.amazon.com/sdk-for-go/api/aws/session/ for more details. To customize the URL opener, or for more details on the URL format, see URLOpener. See https://gocloud.dev/concepts/urls/ for background information.

Message Delivery Semantics

AWS SQS supports at-least-once semantics; applications must call Message.Ack after processing a message, or it will be redelivered. See https://godoc.org/gocloud.dev/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more background.

Escaping

Go CDK supports all UTF-8 strings; to make this work with services lacking full UTF-8 support, strings must be escaped (during writes) and unescaped (during reads). The following escapes are required for awssnssqs:

  • Metadata keys: Characters other than "a-zA-z0-9_-.", and additionally "." when it's at the start of the key or the previous character was ".", are escaped using "__0x<hex>__". These characters were determined by experimentation.
  • Metadata values: Escaped using URL encoding.
  • Message body: AWS SNS/SQS only supports UTF-8 strings. See the BodyBase64Encoding enum in TopicOptions for strategies on how to send non-UTF-8 message bodies. By default, non-UTF-8 message bodies are base64 encoded.

As

awssnssqs exposes the following types for As:

  • Topic: (V1) *sns.SNS for OpenSNSTopic, *sqs.SQS for OpenSQSTopic; (V2) *snsv2.Client for OpenSNSTopicV2, *sqsv2.Client for OpenSQSTopicV2
  • Subscription: (V1) *sqs.SQS; (V2) *sqsv2.Client
  • Message: (V1) *sqs.Message; (V2) sqstypesv2.Message
  • Message.BeforeSend: (V1) *sns.PublishBatchRequestEntry or *sns.PublishInput (deprecated) for OpenSNSTopic, *sqs.SendMessageBatchRequestEntry or *sqs.SendMessageInput (deprecated) for OpenSQSTopic; (V2) *snsv2.PublishBatchRequestEntry or *snsv2.PublishInput (deprecated) for OpenSNSTopicV2, *sqstypesv2.SendMessageBatchRequestEntry for OpenSQSTopicV2
  • Message.AfterSend: (V1) sns.PublishBatchResultEntry or *sns.PublishOutput (deprecated) for OpenSNSTopic, *sqs.SendMessageBatchResultEntry for OpenSQSTopic; (V2) snstypesv2.PublishBatchResultEntry or *snsv2.PublishOutput (deprecated) for OpenSNSTopicV2, sqstypesv2.SendMessageBatchResultEntry for OpenSQSTopicV2
  • Error: (V1) awserr.Error, (V2) any error type returned by the service, notably smithy.APIError
Example (OpenSNSTopicFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/awssnssqs"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	const topicARN = "arn:aws:sns:us-east-2:123456789012:mytopic"
	// Note the 3 slashes; ARNs have multiple colons and therefore aren't valid
	// as hostnames in the URL.
	topic, err := pubsub.OpenTopic(ctx, "awssns:///"+topicARN+"?region=us-east-2")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}
Output:

Example (OpenSQSTopicFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/awssnssqs"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// https://docs.aws.amazon.com/sdk-for-net/v2/developer-guide/QueueURL.html
	const queueURL = "sqs.us-east-2.amazonaws.com/123456789012/myqueue"
	topic, err := pubsub.OpenTopic(ctx, "awssqs://"+queueURL+"?region=us-east-2")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}
Output:

Example (OpenSubscriptionFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, add a blank import: _ "gocloud.dev/pubsub/awssnssqs"
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
	// This URL will open the subscription with the URL
	// "https://sqs.us-east-2.amazonaws.com/123456789012/myqueue".
	subscription, err := pubsub.OpenSubscription(ctx,
		"awssqs://sqs.us-east-2.amazonaws.com/123456789012/"+
			"myqueue?region=us-east-2")
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}
Output:

Index

Examples

Constants

View Source
const SNSScheme = "awssns"

SNSScheme is the URL scheme for pubsub.OpenTopic (for an SNS topic) that awssnssqs registers its URLOpeners under on pubsub.DefaultMux.

View Source
const SQSScheme = "awssqs"

SQSScheme is the URL scheme for pubsub.OpenTopic (for an SQS topic) and for pubsub.OpenSubscription that awssnssqs registers its URLOpeners under on pubsub.DefaultMux.

Variables

View Source
var Set = wire.NewSet(
	wire.Struct(new(URLOpener), "ConfigProvider"),
)

Set holds Wire providers for this package.

Functions

func OpenSNSTopic added in v0.14.0

func OpenSNSTopic(ctx context.Context, sess client.ConfigProvider, topicARN string, opts *TopicOptions) *pubsub.Topic

OpenSNSTopic opens a topic that sends to the SNS topic with the given Amazon Resource Name (ARN).

Example
package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"gocloud.dev/pubsub/awssnssqs"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// Establish an AWS session.
	// See https://docs.aws.amazon.com/sdk-for-go/api/aws/session/ for more info.
	// The region must match the region for the SNS topic "mytopic".
	sess, err := session.NewSession(&aws.Config{
		Region: aws.String("us-east-2"),
	})
	if err != nil {
		log.Fatal(err)
	}

	// Create a *pubsub.Topic.
	const topicARN = "arn:aws:sns:us-east-2:123456789012:mytopic"
	topic := awssnssqs.OpenSNSTopic(ctx, sess, topicARN, nil)
	defer topic.Shutdown(ctx)
}
Output:

func OpenSNSTopicV2 added in v0.25.0

func OpenSNSTopicV2(ctx context.Context, client *snsv2.Client, topicARN string, opts *TopicOptions) *pubsub.Topic

OpenSNSTopicV2 opens a topic that sends to the SNS topic with the given Amazon Resource Name (ARN), using AWS SDK V2.

func OpenSQSTopic added in v0.14.0

func OpenSQSTopic(ctx context.Context, sess client.ConfigProvider, qURL string, opts *TopicOptions) *pubsub.Topic

OpenSQSTopic opens a topic that sends to the SQS topic with the given SQS queue URL.

Example
package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"gocloud.dev/pubsub/awssnssqs"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// Establish an AWS session.
	// See https://docs.aws.amazon.com/sdk-for-go/api/aws/session/ for more info.
	// The region must match the region for the SQS queue "myqueue".
	sess, err := session.NewSession(&aws.Config{
		Region: aws.String("us-east-2"),
	})
	if err != nil {
		log.Fatal(err)
	}

	// Create a *pubsub.Topic.
	const queueURL = "https://sqs.us-east-2.amazonaws.com/123456789012/myqueue"
	topic := awssnssqs.OpenSQSTopic(ctx, sess, queueURL, nil)
	defer topic.Shutdown(ctx)
}
Output:

func OpenSQSTopicV2 added in v0.25.0

func OpenSQSTopicV2(ctx context.Context, client *sqsv2.Client, qURL string, opts *TopicOptions) *pubsub.Topic

OpenSQSTopicV2 opens a topic that sends to the SQS topic with the given SQS queue URL, using AWS SDK V2.

func OpenSubscription

func OpenSubscription(ctx context.Context, sess client.ConfigProvider, qURL string, opts *SubscriptionOptions) *pubsub.Subscription

OpenSubscription opens a subscription based on AWS SQS for the given SQS queue URL. The queue is assumed to be subscribed to some SNS topic, though there is no check for this.

Example
package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"gocloud.dev/pubsub/awssnssqs"
)

func main() {
	// PRAGMA: This example is used on gocloud.dev; PRAGMA comments adjust how it is shown and can be ignored.
	// PRAGMA: On gocloud.dev, hide lines until the next blank line.
	ctx := context.Background()

	// Establish an AWS session.
	// See https://docs.aws.amazon.com/sdk-for-go/api/aws/session/ for more info.
	// The region must match the region for "MyQueue".
	sess, err := session.NewSession(&aws.Config{
		Region: aws.String("us-east-2"),
	})
	if err != nil {
		log.Fatal(err)
	}

	// Construct a *pubsub.Subscription.
	// https://docs.aws.amazon.com/sdk-for-net/v2/developer-guide/QueueURL.html
	const queueURL = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
	subscription := awssnssqs.OpenSubscription(ctx, sess, queueURL, nil)
	defer subscription.Shutdown(ctx)
}
Output:

func OpenSubscriptionV2 added in v0.25.0

func OpenSubscriptionV2(ctx context.Context, client *sqsv2.Client, qURL string, opts *SubscriptionOptions) *pubsub.Subscription

OpenSubscriptionV2 opens a subscription based on AWS SQS for the given SQS queue URL, using AWS SDK V2. The queue is assumed to be subscribed to some SNS topic, though there is no check for this.

func OpenTopic

func OpenTopic(ctx context.Context, sess client.ConfigProvider, topicARN string, opts *TopicOptions) *pubsub.Topic

OpenTopic is a shortcut for OpenSNSTopic, provided for backwards compatibility.

Types

type BodyBase64Encoding

type BodyBase64Encoding int

BodyBase64Encoding is an enum of strategies for when to base64 message bodies.

const (
	// NonUTF8Only means that message bodies that are valid UTF-8 encodings are
	// sent as-is. Invalid UTF-8 message bodies are base64 encoded, and a
	// MessageAttribute with key "base64encoded" is added to the message.
	// When receiving messages, the "base64encoded" attribute is used to determine
	// whether to base64 decode, and is then filtered out.
	NonUTF8Only BodyBase64Encoding = 0
	// Always means that all message bodies are base64 encoded.
	// A MessageAttribute with key "base64encoded" is added to the message.
	// When receiving messages, the "base64encoded" attribute is used to determine
	// whether to base64 decode, and is then filtered out.
	Always BodyBase64Encoding = 1
	// Never means that message bodies are never base64 encoded. Non-UTF-8
	// bytes in message bodies may be modified by SNS/SQS.
	Never BodyBase64Encoding = 2
)

type SubscriptionOptions

type SubscriptionOptions struct {
	// Raw determines how the Subscription will process message bodies.
	//
	// If the subscription is expected to process messages sent directly to
	// SQS, or messages from SNS topics configured to use "raw" delivery,
	// set this to true. Message bodies will be passed through untouched.
	//
	// If false, the Subscription will use best-effort heuristics to
	// identify whether message bodies are raw or SNS JSON; this may be
	// inefficient for raw messages.
	//
	// See https://aws.amazon.com/sns/faqs/#Raw_message_delivery.
	Raw bool

	// NackLazy determines what Nack does.
	//
	// By default, Nack uses ChangeMessageVisibility to set the VisibilityTimeout
	// for the nacked message to 0, so that it will be redelivered immediately.
	// Set NackLazy to true to bypass this behavior; Nack will do nothing,
	// and the message will be redelivered after the existing VisibilityTimeout
	// expires (defaults to 30s, but can be configured per queue).
	//
	// See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html.
	NackLazy bool

	// WaitTime passed to ReceiveMessage to enable long polling.
	// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html#sqs-long-polling.
	// Note that a non-zero WaitTime can delay delivery of messages
	// by up to that duration.
	WaitTime time.Duration

	// ReceiveBatcherOptions adds constraints to the default batching done for receives.
	ReceiveBatcherOptions batcher.Options

	// AckBatcherOptions adds constraints to the default batching done for acks.
	AckBatcherOptions batcher.Options
}

SubscriptionOptions will contain configuration for subscriptions.

type TopicOptions

type TopicOptions struct {
	// BodyBase64Encoding determines when message bodies are base64 encoded.
	// The default is NonUTF8Only.
	BodyBase64Encoding BodyBase64Encoding

	// BatcherOptions adds constraints to the default batching done for sends.
	BatcherOptions batcher.Options
}

TopicOptions contains configuration options for topics.

type URLOpener

type URLOpener struct {
	// UseV2 indicates whether the AWS SDK V2 should be used.
	UseV2 bool

	// ConfigProvider configures the connection to AWS.
	// It must be set to a non-nil value if UseV2 is false.
	ConfigProvider client.ConfigProvider

	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}

URLOpener opens AWS SNS/SQS URLs like "awssns:///sns-topic-arn" for SNS topics or "awssqs://sqs-queue-url" for SQS topics and subscriptions.

For SNS topics, the URL's host+path is used as the topic Amazon Resource Name (ARN). Since ARNs have ":" in them, and ":" precedes a port in URL hostnames, leave the host blank and put the ARN in the path (e.g., "awssns:///arn:aws:service:region:accountid:resourceType/resourcePath").

For SQS topics and subscriptions, the URL's host+path is prefixed with "https://" to create the queue URL.

Use "awssdk=v1" to force using AWS SDK v1, "awssdk=v2" to force using AWS SDK v2, or anything else to accept the default.

For V1, see gocloud.dev/aws/ConfigFromURLParams for supported query parameters for overriding the aws.Session from the URL. For V2, see gocloud.dev/aws/V2ConfigFromURLParams.

In addition, the following query parameters are supported:

  • raw (for "awssqs" Subscriptions only): sets SubscriberOptions.Raw. The value must be parseable by `strconv.ParseBool`.
  • nacklazy (for "awssqs" Subscriptions only): sets SubscriberOptions.NackLazy. The value must be parseable by `strconv.ParseBool`.
  • waittime: sets SubscriberOptions.WaitTime, in time.ParseDuration formats.

See gocloud.dev/aws/ConfigFromURLParams for other query parameters that affect the default AWS session.

func (*URLOpener) OpenSubscriptionURL

func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)

OpenSubscriptionURL opens a pubsub.Subscription based on u.

func (*URLOpener) OpenTopicURL

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on u.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL