Documentation ¶
Overview ¶
Package awssnssqs provides an implementation of pubsub that uses AWS SNS (Simple Notification Service) and SQS (Simple Queue Service).
URLs ¶
For pubsub.OpenTopic and pubsub.OpenSubscription, awssnssqs registers for the schemes "awssns" and "awssqs" respectively. The default URL opener will use an AWS session with the default credentials and configuration; see https://docs.aws.amazon.com/sdk-for-go/api/aws/session/ for more details. To customize the URL opener, or for more details on the URL format, see URLOpener. See https://github.com/eliben/gocdkx/concepts/urls/ for background information.
Message Delivery Semantics ¶
AWS SNS and SQS combine to support at-least-once semantics; applications must call Message.Ack after processing a message, or it will be redelivered. See https://godoc.org/github.com/eliben/gocdkx/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more background.
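To make the redelivery rule concrete, here is a toy in-memory model of at-least-once delivery. It is only a sketch: `toyQueue`, `message`, and `deliver` are hypothetical names invented for illustration and are not part of awssnssqs or pubsub, and real SQS redelivers after a visibility timeout rather than on the next call.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a received pubsub message.
type message struct {
	Body string
	ack  func()
}

// Ack marks the message as processed so the queue stops delivering it.
func (m *message) Ack() { m.ack() }

// toyQueue models at-least-once delivery: a message stays pending,
// and is handed out again, until it has been acked.
type toyQueue struct {
	pending map[int]string
	order   []int
}

func newToyQueue(bodies ...string) *toyQueue {
	q := &toyQueue{pending: map[int]string{}}
	for i, b := range bodies {
		q.pending[i] = b
		q.order = append(q.order, i)
	}
	return q
}

// deliver returns every message that has not been acked yet, in order.
func (q *toyQueue) deliver() []*message {
	var out []*message
	for _, id := range q.order {
		body, ok := q.pending[id]
		if !ok {
			continue // already acked
		}
		id := id
		out = append(out, &message{
			Body: body,
			ack:  func() { delete(q.pending, id) },
		})
	}
	return out
}

func main() {
	q := newToyQueue("a", "b")

	// First pass: "a" is acked, "b" is processed but never acked.
	for _, m := range q.deliver() {
		if m.Body == "a" {
			m.Ack()
		}
	}

	// Second pass: only the unacked message comes back.
	for _, m := range q.deliver() {
		fmt.Println("redelivered:", m.Body)
		m.Ack()
	}
	fmt.Println("remaining:", len(q.deliver()))
}
```

Because an unacked message can arrive more than once, message handlers are usually written to be idempotent.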
Escaping ¶
Go CDK supports all UTF-8 strings; to make this work with providers lacking full UTF-8 support, strings must be escaped (during writes) and unescaped (during reads). The following escapes are required for awssnssqs:
- Metadata keys: Characters other than "a-zA-Z0-9_-.", and additionally "." when it's at the start of the key or the previous character was ".", are escaped using "__0x<hex>__". These characters were determined by experimentation.
- Metadata values: Escaped using URL encoding.
- Message body: AWS SNS and SQS support only UTF-8 strings. See the BodyBase64Encoding enum in TopicOptions for strategies for sending non-UTF-8 message bodies. By default, non-UTF-8 message bodies are base64 encoded.
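The metadata rules above can be sketched with the standard library alone. This is an illustration, not the package's implementation: `escapeMetadataKey`, `needsEscape`, and `escapeMetadataValue` are hypothetical names, the lowercase hex code point inside "__0x<hex>__" is an assumption, and `url.PathEscape` stands in for whichever URL-encoding variant the package actually uses.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// needsEscape reports whether the rune at index i must be escaped under the
// metadata key rule: anything outside a-zA-Z0-9_-. is escaped, and "." is
// also escaped at the start of the key or directly after another ".".
func needsEscape(runes []rune, i int) bool {
	r := runes[i]
	switch {
	case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9',
		r == '_', r == '-':
		return false
	case r == '.':
		return i == 0 || runes[i-1] == '.'
	}
	return true
}

// escapeMetadataKey rewrites each offending rune as "__0x<hex>__".
// Assumption: <hex> is the lowercase hex value of the rune's code point.
func escapeMetadataKey(key string) string {
	runes := []rune(key)
	var b strings.Builder
	for i, r := range runes {
		if needsEscape(runes, i) {
			fmt.Fprintf(&b, "__%#x__", r)
		} else {
			b.WriteRune(r)
		}
	}
	return b.String()
}

// escapeMetadataValue URL-encodes a metadata value.
// Assumption: path-style escaping; the package may use a different variant.
func escapeMetadataValue(v string) string {
	return url.PathEscape(v)
}

func main() {
	for _, k := range []string{"my key", ".hidden", "a..b", "plain-Key_1.x"} {
		fmt.Printf("%q -> %q\n", k, escapeMetadataKey(k))
	}
	fmt.Printf("%q -> %q\n", "a b/c", escapeMetadataValue("a b/c"))
}
```

Reads would apply the inverse transformations, so callers of the Go CDK API see the original strings.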
As ¶
awssnssqs exposes the following types for As:
- Topic: *sns.SNS
- Subscription: *sqs.SQS
- Message: *sqs.Message
- Message.BeforeSend: *sns.PublishInput
- Error: awserr.Error
Example (OpenSubscription) ¶
package main

import (
	"context"
	"log"

	"github.com/eliben/gocdkx/pubsub"
	// The blank import registers the "awssns" and "awssqs" URL schemes.
	_ "github.com/eliben/gocdkx/pubsub/awssnssqs"
)

func main() {
	// This example is used in https://github.com/eliben/gocdkx/howto/pubsub/subscribe/#sns

	// Variables set up elsewhere:
	ctx := context.Background()

	// OpenSubscription creates a *pubsub.Subscription from a URL.
	// This URL will open the subscription with the URL
	// "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue".
	subscription, err := pubsub.OpenSubscription(ctx,
		"awssqs://sqs.us-east-2.amazonaws.com/123456789012/"+
			"MyQueue?region=us-east-2")
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}
Example (OpenTopic) ¶
package main

import (
	"context"
	"log"

	"github.com/eliben/gocdkx/pubsub"
	// The blank import registers the "awssns" and "awssqs" URL schemes.
	_ "github.com/eliben/gocdkx/pubsub/awssnssqs"
)

func main() {
	// This example is used in https://github.com/eliben/gocdkx/howto/pubsub/publish/#sns

	// Variables set up elsewhere:
	ctx := context.Background()

	const topicARN = "arn:aws:sns:us-east-2:123456789012:MyTopic"
	topic, err := pubsub.OpenTopic(ctx, "awssns://"+topicARN+"?region=us-east-2")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}
Index ¶
- Constants
- Variables
- func OpenSubscription(ctx context.Context, sess client.ConfigProvider, qURL string, ...) *pubsub.Subscription
- func OpenTopic(ctx context.Context, sess client.ConfigProvider, topicARN string, ...) *pubsub.Topic
- type BodyBase64Encoding
- type SubscriptionOptions
- type TopicOptions
- type URLOpener
Constants ¶
const SNSScheme = "awssns"
SNSScheme is the URL scheme under which awssnssqs registers its URLOpener for pubsub.OpenTopic on pubsub.DefaultMux.
const SQSScheme = "awssqs"
SQSScheme is the URL scheme under which awssnssqs registers its URLOpener for pubsub.OpenSubscription on pubsub.DefaultMux.
Variables ¶
var Set = wire.NewSet(
	SubscriptionOptions{},
	TopicOptions{},
	URLOpener{},
)
Set holds Wire providers for this package.
Functions ¶
func OpenSubscription ¶
func OpenSubscription(ctx context.Context, sess client.ConfigProvider, qURL string, opts *SubscriptionOptions) *pubsub.Subscription
OpenSubscription opens a subscription based on AWS SQS for the given SQS queue URL. The queue is assumed to be subscribed to some SNS topic, though there is no check for this.
Example ¶
package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/eliben/gocdkx/pubsub/awssnssqs"
)

func main() {
	// This example is used in https://github.com/eliben/gocdkx/howto/pubsub/subscribe/#sns-ctor

	// Variables set up elsewhere:
	ctx := context.Background()

	// Establish an AWS session.
	// See https://docs.aws.amazon.com/sdk-for-go/api/aws/session/ for more info.
	// The region must match the region for "MyQueue".
	sess, err := session.NewSession(&aws.Config{
		Region: aws.String("us-east-2"),
	})
	if err != nil {
		log.Fatal(err)
	}

	// Construct a *pubsub.Subscription.
	// https://docs.aws.amazon.com/sdk-for-net/v2/developer-guide/QueueURL.html
	const queueURL = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
	subscription := awssnssqs.OpenSubscription(ctx, sess, queueURL, nil)
	defer subscription.Shutdown(ctx)
}
func OpenTopic ¶
func OpenTopic(ctx context.Context, sess client.ConfigProvider, topicARN string, opts *TopicOptions) *pubsub.Topic
OpenTopic opens a topic that sends to the SNS topic with the given Amazon Resource Name (ARN).
Example ¶
package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/eliben/gocdkx/pubsub/awssnssqs"
)

func main() {
	// This example is used in https://github.com/eliben/gocdkx/howto/pubsub/publish/#sns-ctor

	// Variables set up elsewhere:
	ctx := context.Background()

	// Establish an AWS session.
	// See https://docs.aws.amazon.com/sdk-for-go/api/aws/session/ for more info.
	// The region must match the region for "MyTopic".
	sess, err := session.NewSession(&aws.Config{
		Region: aws.String("us-east-2"),
	})
	if err != nil {
		log.Fatal(err)
	}

	// Create a *pubsub.Topic.
	const topicARN = "arn:aws:sns:us-east-2:123456789012:MyTopic"
	topic := awssnssqs.OpenTopic(ctx, sess, topicARN, nil)
	defer topic.Shutdown(ctx)
}
Types ¶
type BodyBase64Encoding ¶
type BodyBase64Encoding int
BodyBase64Encoding is an enum of strategies for when to base64 encode message bodies.
const (
	// NonUTF8Only means that message bodies that are valid UTF-8 encodings are
	// sent as-is. Invalid UTF-8 message bodies are base64 encoded, and a
	// MessageAttribute with key "base64encoded" is added to the message.
	// When receiving messages, the "base64encoded" attribute is used to determine
	// whether to base64 decode, and is then filtered out.
	NonUTF8Only BodyBase64Encoding = 0
	// Always means that all message bodies are base64 encoded.
	// A MessageAttribute with key "base64encoded" is added to the message.
	// When receiving messages, the "base64encoded" attribute is used to determine
	// whether to base64 decode, and is then filtered out.
	Always BodyBase64Encoding = 1
	// Never means that message bodies are never base64 encoded. Non-UTF-8
	// bytes in message bodies may be modified by SNS/SQS.
	Never BodyBase64Encoding = 2
)
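The three strategies can be illustrated with a standard-library sketch. Everything here is hypothetical scaffolding rather than the package's code: `bodyEncoding`, `encodeBody`, and `decodeBody` are local stand-ins, the boolean result models whether a "base64encoded" attribute would accompany the message, and the use of standard (not URL-safe) base64 is an assumption.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"unicode/utf8"
)

// bodyEncoding is a local stand-in for BodyBase64Encoding.
type bodyEncoding int

const (
	nonUTF8Only bodyEncoding = iota // encode only invalid UTF-8 bodies
	always                          // encode every body
	never                           // send bodies unchanged
)

// encodeBody returns the string to send and whether a "base64encoded"
// attribute would need to be attached to the message.
func encodeBody(body []byte, mode bodyEncoding) (string, bool) {
	switch mode {
	case always:
		return base64.StdEncoding.EncodeToString(body), true
	case never:
		return string(body), false
	default: // nonUTF8Only
		if utf8.Valid(body) {
			return string(body), false
		}
		return base64.StdEncoding.EncodeToString(body), true
	}
}

// decodeBody reverses encodeBody on the receiving side, using the
// attribute flag to decide whether to base64 decode.
func decodeBody(s string, encoded bool) ([]byte, error) {
	if !encoded {
		return []byte(s), nil
	}
	return base64.StdEncoding.DecodeString(s)
}

func main() {
	for _, body := range [][]byte{[]byte("hello"), {0xff, 0xfe}} {
		s, encoded := encodeBody(body, nonUTF8Only)
		fmt.Printf("sent %q, base64encoded=%v\n", s, encoded)
	}
}
```

With the default strategy, text payloads stay readable in the AWS console, while binary payloads still survive the round trip.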
type SubscriptionOptions ¶
type SubscriptionOptions struct{}
SubscriptionOptions will contain configuration for subscriptions.
type TopicOptions ¶
type TopicOptions struct {
	// BodyBase64Encoding determines when message bodies are base64 encoded.
	// The default is NonUTF8Only.
	BodyBase64Encoding BodyBase64Encoding
}
TopicOptions contains configuration options for topics.
type URLOpener ¶
type URLOpener struct {
	// ConfigProvider configures the connection to AWS.
	ConfigProvider client.ConfigProvider

	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}
URLOpener opens AWS SNS/SQS URLs like "awssns://sns-topic-arn" for topics or "awssqs://sqs-queue-url" for subscriptions.
For topics, the URL's host+path is used as the topic Amazon Resource Name (ARN).
For subscriptions, the URL's host+path is prefixed with "https://" to create the queue URL.
See github.com/eliben/gocdkx/aws/ConfigFromURLParams for supported query parameters that affect the default AWS session.
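The subscription mapping described above (host+path prefixed with "https://") can be sketched with net/url. This is a minimal illustration and not the URLOpener's own code: `queueURLFromOpenerURL` is a hypothetical helper, and reading "region" straight from the query string is a simplification of what ConfigFromURLParams handles.

```go
package main

import (
	"fmt"
	"log"
	"net/url"
)

// queueURLFromOpenerURL is a hypothetical helper that derives the SQS queue
// URL and the region parameter from an "awssqs://" URL.
func queueURLFromOpenerURL(raw string) (string, string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", err
	}
	if u.Scheme != "awssqs" {
		return "", "", fmt.Errorf("unexpected scheme %q, want \"awssqs\"", u.Scheme)
	}
	// Host+path, prefixed with "https://", forms the queue URL.
	queueURL := "https://" + u.Host + u.Path
	return queueURL, u.Query().Get("region"), nil
}

func main() {
	queueURL, region, err := queueURLFromOpenerURL(
		"awssqs://sqs.us-east-2.amazonaws.com/123456789012/MyQueue?region=us-east-2")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(queueURL)
	fmt.Println(region)
}
```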
func (*URLOpener) OpenSubscriptionURL ¶
func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)
OpenSubscriptionURL opens a pubsub.Subscription based on u.