sqs

package
v0.0.0-...-5e471cc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2024 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AWSStringDataType = "String"
	AWSNumberDataType = "Number"
	AWSBinaryDataType = "Binary"
)
View Source
const NoSleep time.Duration = -1
View Source
const UUIDAttribute = "_watermill_message_uuid"

Variables

This section is empty.

Functions

func GenerateCreateQueueInputDefault

func GenerateCreateQueueInputDefault(ctx context.Context, queueName QueueName, attrs QueueConfigAttributes) (*sqs.CreateQueueInput, error)

func GenerateDeleteMessageInputDefault

func GenerateDeleteMessageInputDefault(ctx context.Context, queueURL QueueURL, receiptHandle *string) (*sqs.DeleteMessageInput, error)

func GenerateGetQueueUrlInputDefault

func GenerateGetQueueUrlInputDefault(ctx context.Context, topic string) (*sqs.GetQueueUrlInput, error)

func GenerateReceiveMessageInputDefault

func GenerateReceiveMessageInputDefault(ctx context.Context, queueURL QueueURL) (*sqs.ReceiveMessageInput, error)

func GenerateSendMessageInputDefault

func GenerateSendMessageInputDefault(ctx context.Context, queueURL QueueURL, msg *types.Message) (*sqs.SendMessageInput, error)

Types

type DefaultMarshalerUnmarshaler

type DefaultMarshalerUnmarshaler struct{}

func (DefaultMarshalerUnmarshaler) Marshal

func (DefaultMarshalerUnmarshaler) Unmarshal

type DefaultRmMongoMarshalerUnmarshaler

type DefaultRmMongoMarshalerUnmarshaler struct {
	MongoCollection *mongo.Collection
}

func (DefaultRmMongoMarshalerUnmarshaler) Marshal

func (DefaultRmMongoMarshalerUnmarshaler) Unmarshal

type GenerateCreateQueueInputFunc

type GenerateCreateQueueInputFunc func(ctx context.Context, queueName QueueName, attrs QueueConfigAttributes) (*sqs.CreateQueueInput, error)

type GenerateDeleteMessageInputFunc

type GenerateDeleteMessageInputFunc func(ctx context.Context, queueURL QueueURL, receiptHandle *string) (*sqs.DeleteMessageInput, error)

type GenerateGetQueueUrlInputFunc

type GenerateGetQueueUrlInputFunc func(ctx context.Context, topic string) (*sqs.GetQueueUrlInput, error)

type GenerateQueueUrlResolver

type GenerateQueueUrlResolver struct {
	AwsRegion    string
	AwsAccountID string
}

GenerateQueueUrlResolver is a resolver that generates queue URL based on AWS region and account ID.

func (GenerateQueueUrlResolver) ResolveQueueUrl

type GenerateReceiveMessageInputFunc

type GenerateReceiveMessageInputFunc func(ctx context.Context, queueURL QueueURL) (*sqs.ReceiveMessageInput, error)

type GenerateSendMessageInputFunc

type GenerateSendMessageInputFunc func(ctx context.Context, queueURL QueueURL, msg *types.Message) (*sqs.SendMessageInput, error)

type GetQueueUrlByNameUrlResolver

type GetQueueUrlByNameUrlResolver struct {
	// contains filtered or unexported fields
}

GetQueueUrlByNameUrlResolver resolves the queue URL by calling the AWS API. The topic name passed to Publisher.Publish or Subscriber.Subscribe is mapped to the queue name.

By default, queue URL is cached. It can be changed with GetQueueUrlByNameUrlResolverConfig.

func (*GetQueueUrlByNameUrlResolver) ResolveQueueUrl

type GetQueueUrlByNameUrlResolverConfig

type GetQueueUrlByNameUrlResolverConfig struct {
	// DoNotCacheQueues disables caching of queue URLs.
	DoNotCacheQueues bool

	// GenerateGetQueueUrlInput generates *sqs.GetQueueUrlInput for AWS SDK.
	GenerateGetQueueUrlInput GenerateGetQueueUrlInputFunc
}

type Marshaler

type Marshaler interface {
	Marshal(msg *message.Message) (*types.Message, error)
}

type OverrideEndpointResolver

type OverrideEndpointResolver struct {
	Endpoint transport.Endpoint
}

OverrideEndpointResolver is a custom endpoint resolver that always returns the same endpoint. It can be used to use AWS emulators like Localstack.

For example:

import (
	"net/url"

	"github.com/ThreeDotsLabs/watermill-aws/sqs"
	amazonsqs "github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/smithy-go/transport"
)

pub, err := sqs.NewPublisher(sqs.PublisherConfig{
		AWSConfig: cfg,
		Marshaler: sqs.DefaultMarshalerUnmarshaler{},
		OptFns: []func(*amazonsqs.Options){
			amazonsqs.WithEndpointResolverV2(sqs.OverrideEndpointResolver{
				Endpoint: transport.Endpoint{
					URI: url.URL{Scheme: "http", Host: "localstack:4566"},
				},
			}),
		},
	}, logger)

func (OverrideEndpointResolver) ResolveEndpoint

func (o OverrideEndpointResolver) ResolveEndpoint(ctx context.Context, params sqs.EndpointParameters) (transport.Endpoint, error)

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)

func (*Publisher) Close

func (p *Publisher) Close() error

func (*Publisher) GetQueueArn

func (p *Publisher) GetQueueArn(ctx context.Context, url *QueueURL) (*QueueArn, error)

func (*Publisher) GetQueueUrl

func (p *Publisher) GetQueueUrl(ctx context.Context, topic string, createIfNotExists bool) (QueueName, QueueURL, error)

func (*Publisher) Publish

func (p *Publisher) Publish(topic string, messages ...*message.Message) error

type PublisherConfig

type PublisherConfig struct {
	// AWSConfig is the AWS configuration.
	AWSConfig aws.Config

	MongoConfig *mongo.Database

	// OptFns are options for the SQS client.
	OptFns []func(*sqs.Options)

	// CreateQueueConfig is a struct that holds the attributes of an SQS queue.
	CreateQueueConfig QueueConfigAttributes

	// DoNotCreateQueueIfNotExists disables creating the queue if it does not exist.
	DoNotCreateQueueIfNotExists bool

	// QueueUrlResolver resolves the queue URL.
	QueueUrlResolver QueueUrlResolver

	// GenerateSendMessageInput generates *sqs.SendMessageInput for AWS SDK.
	GenerateSendMessageInput GenerateSendMessageInputFunc

	// GenerateCreateQueueInput generates *sqs.CreateQueueInput for AWS SDK.
	GenerateCreateQueueInput GenerateCreateQueueInputFunc

	Marshaler Marshaler
}

func (*PublisherConfig) Validate

func (c *PublisherConfig) Validate() error

type QueueArn

type QueueArn string

QueueArn is an ARN of the queue.

type QueueConfigAttributes

type QueueConfigAttributes struct {
	// DelaySeconds – The length of time, in seconds, for which the delivery of all messages in the queue is delayed.
	// Valid values: An integer from 0 to 900 (15 minutes). Default: 0.
	DelaySeconds string `json:"DelaySeconds,omitempty"`

	// MaximumMessageSize – The limit of how many bytes a message can contain before Amazon SQS rejects it.
	// Valid values: An integer from 1,024 bytes (1 KiB) up to 262,144 bytes (256 KiB). Default: 262,144 (256 KiB).
	MaximumMessageSize string `json:"MaximumMessageSize,omitempty"`

	// MessageRetentionPeriod – The length of time, in seconds, for which Amazon SQS retains a message.
	// Valid values: An integer representing seconds, from 60 (1 minute) to 1,209,600 (14 days).
	// Default: 345,600 (4 days).
	//
	// When you change a queue's attributes, the change can take up to 60 seconds for most of the attributes to
	// propagate throughout the Amazon SQS system. Changes made to the MessageRetentionPeriod attribute can take up to
	// 15 minutes and will impact existing messages in the queue potentially causing them to be expired and deleted if
	// the MessageRetentionPeriod is reduced below the age of existing messages.
	MessageRetentionPeriod string `json:"MessageRetentionPeriod,omitempty"`

	// Policy – The queue's policy. A valid AWS policy. For more information about policy structure, see
	// [Overview of AWS IAM Policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies.html) in the
	// AWS Identity and Access Management User Guide.
	Policy string `json:"Policy,omitempty"`

	// ReceiveMessageWaitTimeSeconds – The length of time, in seconds, for which a ReceiveMessage action waits for a
	// message to arrive.
	// Valid values: An integer from 0 to 20 (seconds). Default: 0.
	ReceiveMessageWaitTimeSeconds string `json:"ReceiveMessageWaitTimeSeconds,omitempty"`

	// RedrivePolicy – The string that includes the parameters for the dead-letter queue functionality of the source
	// queue as a JSON object. The parameters are as follows:
	RedrivePolicy string `json:"RedrivePolicy,omitempty"`

	// DeadLetterTargetArn – The Amazon Resource Name (ARN) of the dead-letter queue to which Amazon SQS moves
	// messages after the value of maxReceiveCount is exceeded.
	DeadLetterTargetArn string `json:"deadLetterTargetArn,omitempty"`

	// MaxReceiveCount – The number of times a message is delivered to the source queue before being moved to the
	// dead-letter queue. Default: 10. When the ReceiveCount for a message exceeds the maxReceiveCount for a queue,
	// Amazon SQS moves the message to the dead-letter-queue.
	MaxReceiveCount string `json:"maxReceiveCount,omitempty"`

	// VisibilityTimeout – The visibility timeout for the queue, in seconds.
	// Valid values: An integer from 0 to 43,200 (12 hours). Default: 30.
	//
	// For more information about the visibility timeout, see
	// [Visibility Timeout](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html) in the Amazon SQS Developer Guide.
	VisibilityTimeout string `json:"VisibilityTimeout,omitempty"`

	// KmsMasterKeyId – The ID of an AWS managed customer master key (CMK) for Amazon SQS or a custom CMK.
	// For more information, see [Key Terms](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-server-side-encryption.html#sqs-sse-key-terms).
	//
	// While the alias of the AWS-managed CMK for Amazon SQS is always
	// alias/aws/sqs, the alias of a custom CMK can, for example, be alias/MyAlias.
	// For more examples, see [KeyId](https://docs.aws.amazon.com/kms/latest/APIReference/API_DescribeKey.html#API_DescribeKey_RequestParameters)
	// in the AWS Key Management Service API Reference.
	KmsMasterKeyId string `json:"KmsMasterKeyId,omitempty"`

	// KmsDataKeyReusePeriodSeconds – The length of time, in seconds, for which Amazon SQS can reuse a [data key](https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#data-keys)
	// to encrypt or decrypt messages before calling AWS KMS again.
	// An integer representing seconds, between 60 seconds (1 minute) and 86,400 seconds (24 hours).
	// Default: 300 (5 minutes).
	//
	// A shorter time period provides better security but results in more calls to KMS which might incur charges after Free Tier.
	// For more information, see [How Does the Data Key Reuse Period Work?](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-server-side-encryption.html#sqs-how-does-the-data-key-reuse-period-work).
	KmsDataKeyReusePeriodSeconds string `json:"KmsDataKeyReusePeriodSeconds,omitempty"`

	// SqsManagedSseEnabled – Enables server-side queue encryption using SQS owned encryption keys.
	// Only one server-side encryption option is supported per queue (for example, [SSE-KMS](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-sse-existing-queue.html) or [SSE-SQS](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-sqs-sse-queue.html)).
	SqsManagedSseEnabled string `json:"SqsManagedSseEnabled,omitempty"`

	// FifoQueue - Designates a queue as FIFO. Valid values: true, false. Default: false.
	FifoQueue QueueConfigAttributesBool `json:"FifoQueue,omitempty"`

	// If you don't provide a MessageDeduplicationId and the queue doesn't have ContentBasedDeduplication set, the
	// action fails with an error.
	// If the queue has ContentBasedDeduplication set, your MessageDeduplicationId overrides the generated one.
	// When ContentBasedDeduplication is in effect, messages with identical content sent within the deduplication
	// interval are treated as duplicates and only one copy of the message is delivered.
	//
	// If you send one message with ContentBasedDeduplication enabled and then another message with a
	// MessageDeduplicationId that is the same as the one generated for the first MessageDeduplicationId, the two
	// messages are treated as duplicates and only one copy of the message is delivered.
	ContentBasedDeduplication QueueConfigAttributesBool `json:"ContentBasedDeduplication,omitempty"`

	// DeduplicationScope – Specifies whether message deduplication occurs at the message group or queue level.
	// Valid values are messageGroup and queue.
	//
	// Applies only to [high throughput for FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/high-throughput-fifo.html).
	DeduplicationScope string `json:"DeduplicationScope,omitempty"`

	// FifoThroughputLimit – Specifies whether the FIFO queue throughput quota applies to the entire queue or per message group.
	// Valid values are perQueue and perMessageGroupId.
	// The perMessageGroupId value is allowed only when the value for DeduplicationScope is messageGroup.
	// Applies only to [high throughput for FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/high-throughput-fifo.html).
	FifoThroughputLimit string `json:"FifoThroughputLimit,omitempty"`

	// CustomAttributes is a map of custom attributes that are not mapped to the struct fields.
	CustomAttributes map[string]string `json:"-"`
}

QueueConfigAttributes is a struct that holds the attributes of an SQS queue. https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html

func (QueueConfigAttributes) Attributes

func (q QueueConfigAttributes) Attributes() (map[string]string, error)

type QueueConfigAttributesBool

type QueueConfigAttributesBool bool

QueueConfigAttributesBool is a custom type for bool values in QueueConfigAttributes that supports marshaling to string.

func (QueueConfigAttributesBool) MarshalText

func (q QueueConfigAttributesBool) MarshalText() ([]byte, error)

type QueueName

type QueueName string

QueueName is a name of the queue.

type QueueURL

type QueueURL string

QueueURL is a URL of the queue.

type QueueUrlResolver

type QueueUrlResolver interface {
	ResolveQueueUrl(ctx context.Context, params ResolveQueueUrlParams) (QueueUrlResolverResult, error)
}

QueueUrlResolver resolves queue URL by topic passed to Publisher.Publish, Subscriber.Subscribe.

type QueueUrlResolverResult

type QueueUrlResolverResult struct {
	QueueName QueueName

	// QueueURL is not present if queue doesn't exist.
	QueueURL *QueueURL

	// Exists says if queue exists.
	// May be nil, if resolver doesn't have information about queue existence.
	Exists *bool
}

type ResolveQueueUrlParams

type ResolveQueueUrlParams struct {
	// Topic passed to Publisher.Publish, Subscriber.Subscribe, etc.
	// It may be mapped to a different name by QueueUrlResolver.
	Topic     string
	SqsClient *sqs.Client
	Logger    watermill.LoggerAdapter
}

type RmMongoMarshaler

type RmMongoMarshaler interface {
	Marshal(msg *message.Message) (*types.Message, error)
}

type RmMongoUnmarshaler

type RmMongoUnmarshaler interface {
	Unmarshal(msg *types.Message) (*message.Message, error)
}

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)

func (*Subscriber) Close

func (s *Subscriber) Close() error

func (*Subscriber) GetQueueArn

func (s *Subscriber) GetQueueArn(ctx context.Context, url *QueueURL) (*QueueArn, error)

func (*Subscriber) GetQueueUrl

func (s *Subscriber) GetQueueUrl(ctx context.Context, topic string) (*QueueURL, error)

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

func (*Subscriber) SubscribeInitialize

func (s *Subscriber) SubscribeInitialize(topic string) error

func (*Subscriber) SubscribeInitializeWithContext

func (s *Subscriber) SubscribeInitializeWithContext(ctx context.Context, topic string) error

type SubscriberConfig

type SubscriberConfig struct {
	// AWSConfig is the AWS configuration.
	AWSConfig aws.Config

	MongoDb *mongo.Database

	// OptFns are options for the SQS client.
	OptFns []func(*sqs.Options)

	// DoNotCreateQueueIfNotExists disables creating the queue if it does not exist.
	DoNotCreateQueueIfNotExists bool

	// QueueUrlResolver resolves the queue name to the queue URL.
	QueueUrlResolver QueueUrlResolver

	// ReconnectRetrySleep is the time to sleep between reconnect attempts.
	ReconnectRetrySleep time.Duration

	// QueueConfigAttributes is a struct that holds the attributes of an SQS queue.
	QueueConfigAttributes QueueConfigAttributes

	// GenerateCreateQueueInput generates *sqs.CreateQueueInput for AWS SDK.
	GenerateCreateQueueInput GenerateCreateQueueInputFunc

	// GenerateReceiveMessageInput generates *sqs.ReceiveMessageInput for AWS SDK.
	GenerateReceiveMessageInput GenerateReceiveMessageInputFunc

	// GenerateDeleteMessageInput generates *sqs.DeleteMessageInput for AWS SDK.
	GenerateDeleteMessageInput GenerateDeleteMessageInputFunc

	Unmarshaler Unmarshaler
}

func (*SubscriberConfig) SetDefaults

func (c *SubscriberConfig) SetDefaults()

func (SubscriberConfig) Validate

func (c SubscriberConfig) Validate() error

type TransparentUrlResolver

type TransparentUrlResolver struct{}

TransparentUrlResolver is a resolver that uses the topic passed to Publisher.Publish or Subscriber.Subscribe as the queue URL. In this case, you should pass the queue URL as the topic.

func (TransparentUrlResolver) ResolveQueueUrl

func (p TransparentUrlResolver) ResolveQueueUrl(ctx context.Context, params ResolveQueueUrlParams) (res QueueUrlResolverResult, err error)

type Unmarshaler

type Unmarshaler interface {
	Unmarshal(msg *types.Message) (*message.Message, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL