Documentation ¶
Index ¶
- Constants
- func GenerateCreateQueueInputDefault(ctx context.Context, queueName QueueName, attrs QueueConfigAttributes) (*sqs.CreateQueueInput, error)
- func GenerateDeleteMessageInputDefault(ctx context.Context, queueURL QueueURL, receiptHandle *string) (*sqs.DeleteMessageInput, error)
- func GenerateGetQueueUrlInputDefault(ctx context.Context, topic string) (*sqs.GetQueueUrlInput, error)
- func GenerateReceiveMessageInputDefault(ctx context.Context, queueURL QueueURL) (*sqs.ReceiveMessageInput, error)
- func GenerateSendMessageInputDefault(ctx context.Context, queueURL QueueURL, msg *types.Message) (*sqs.SendMessageInput, error)
- type DefaultMarshalerUnmarshaler
- type DefaultRmMongoMarshalerUnmarshaler
- type GenerateCreateQueueInputFunc
- type GenerateDeleteMessageInputFunc
- type GenerateGetQueueUrlInputFunc
- type GenerateQueueUrlResolver
- type GenerateReceiveMessageInputFunc
- type GenerateSendMessageInputFunc
- type GetQueueUrlByNameUrlResolver
- type GetQueueUrlByNameUrlResolverConfig
- type Marshaler
- type OverrideEndpointResolver
- type Publisher
- func (p *Publisher) Close() error
- func (p *Publisher) GetQueueArn(ctx context.Context, url *QueueURL) (*QueueArn, error)
- func (p *Publisher) GetQueueUrl(ctx context.Context, topic string, createIfNotExists bool) (QueueName, QueueURL, error)
- func (p *Publisher) Publish(topic string, messages ...*message.Message) error
- type PublisherConfig
- type QueueArn
- type QueueConfigAttributes
- type QueueConfigAttributesBool
- type QueueName
- type QueueURL
- type QueueUrlResolver
- type QueueUrlResolverResult
- type ResolveQueueUrlParams
- type RmMongoMarshaler
- type RmMongoUnmarshaler
- type Subscriber
- func (s *Subscriber) Close() error
- func (s *Subscriber) GetQueueArn(ctx context.Context, url *QueueURL) (*QueueArn, error)
- func (s *Subscriber) GetQueueUrl(ctx context.Context, topic string) (*QueueURL, error)
- func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)
- func (s *Subscriber) SubscribeInitialize(topic string) error
- func (s *Subscriber) SubscribeInitializeWithContext(ctx context.Context, topic string) error
- type SubscriberConfig
- type TransparentUrlResolver
- type Unmarshaler
Constants ¶
const ( AWSStringDataType = "String" AWSNumberDataType = "Number" AWSBinaryDataType = "Binary" )
const NoSleep time.Duration = -1
const UUIDAttribute = "_watermill_message_uuid"
Variables ¶
This section is empty.
Functions ¶
func GenerateCreateQueueInputDefault ¶
func GenerateCreateQueueInputDefault(ctx context.Context, queueName QueueName, attrs QueueConfigAttributes) (*sqs.CreateQueueInput, error)
Types ¶
type DefaultMarshalerUnmarshaler ¶
type DefaultMarshalerUnmarshaler struct{}
type DefaultRmMongoMarshalerUnmarshaler ¶
type DefaultRmMongoMarshalerUnmarshaler struct {
MongoCollection *mongo.Collection
}
type GenerateCreateQueueInputFunc ¶
type GenerateCreateQueueInputFunc func(ctx context.Context, queueName QueueName, attrs QueueConfigAttributes) (*sqs.CreateQueueInput, error)
type GenerateQueueUrlResolver ¶
GenerateQueueUrlResolver is a resolver that generates the queue URL from the AWS region and account ID.
func (GenerateQueueUrlResolver) ResolveQueueUrl ¶
func (p GenerateQueueUrlResolver) ResolveQueueUrl(ctx context.Context, params ResolveQueueUrlParams) (QueueUrlResolverResult, error)
type GetQueueUrlByNameUrlResolver ¶
type GetQueueUrlByNameUrlResolver struct {
// contains filtered or unexported fields
}
GetQueueUrlByNameUrlResolver resolves the queue URL by calling the AWS API. The topic name passed to Publisher.Publish or Subscriber.Subscribe is mapped to the queue name.
By default, the queue URL is cached. Caching can be disabled by setting DoNotCacheQueues in GetQueueUrlByNameUrlResolverConfig.
func NewGetQueueUrlByNameUrlResolver ¶
func NewGetQueueUrlByNameUrlResolver( config GetQueueUrlByNameUrlResolverConfig, ) *GetQueueUrlByNameUrlResolver
func (*GetQueueUrlByNameUrlResolver) ResolveQueueUrl ¶
func (p *GetQueueUrlByNameUrlResolver) ResolveQueueUrl(ctx context.Context, params ResolveQueueUrlParams) (res QueueUrlResolverResult, err error)
type GetQueueUrlByNameUrlResolverConfig ¶
type GetQueueUrlByNameUrlResolverConfig struct { // DoNotCacheQueues disables caching of queue URLs. DoNotCacheQueues bool // GenerateGetQueueUrlInput generates *sqs.GetQueueUrlInput for AWS SDK. GenerateGetQueueUrlInput GenerateGetQueueUrlInputFunc }
type OverrideEndpointResolver ¶
type OverrideEndpointResolver struct {
Endpoint transport.Endpoint
}
OverrideEndpointResolver is a custom endpoint resolver that always returns the same endpoint. It is useful for pointing the client at AWS emulators such as LocalStack.
For example:

	import (
		"github.com/ThreeDotsLabs/watermill-aws/sqs"
		amazonsqs "github.com/aws/aws-sdk-go-v2/service/sqs"
		"github.com/aws/smithy-go/transport"
	)

	pub, err := sqs.NewPublisher(sqs.PublisherConfig{
		AWSConfig: cfg,
		Marshaler: sqs.DefaultMarshalerUnmarshaler{},
		OptFns: []func(*amazonsqs.Options){
			amazonsqs.WithEndpointResolverV2(sqs.OverrideEndpointResolver{
				Endpoint: transport.Endpoint{
					URI: url.URL{Scheme: "http", Host: "localstack:4566"},
				},
			}),
		},
	}, logger)
func (OverrideEndpointResolver) ResolveEndpoint ¶
func (o OverrideEndpointResolver) ResolveEndpoint(ctx context.Context, params sqs.EndpointParameters) (transport.Endpoint, error)
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func NewPublisher ¶
func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)
func (*Publisher) GetQueueArn ¶
func (*Publisher) GetQueueUrl ¶
type PublisherConfig ¶
type PublisherConfig struct { // AWSConfig is the AWS configuration. AWSConfig aws.Config MongoConfig *mongo.Database // OptFns are options for the SQS client. OptFns []func(*sqs.Options) // QueueConfigAttributes is a struct that holds the attributes of an SQS queue. CreateQueueConfig QueueConfigAttributes // DoNotCreateQueueIfNotExists disables creating the queue if it does not exist. DoNotCreateQueueIfNotExists bool // QueueUrlResolver is a function that resolves queue URL. QueueUrlResolver QueueUrlResolver // GenerateSendMessageInput generates *sqs.SendMessageInput for AWS SDK. GenerateSendMessageInput GenerateSendMessageInputFunc // GenerateCreateQueueInput generates *sqs.CreateQueueInput for AWS SDK. GenerateCreateQueueInput GenerateCreateQueueInputFunc Marshaler Marshaler }
func (*PublisherConfig) Validate ¶
func (c *PublisherConfig) Validate() error
type QueueConfigAttributes ¶
type QueueConfigAttributes struct { // DelaySeconds – The length of time, in seconds, for which the delivery of all messages in the queue is delayed. // Valid values: An integer from 0 to 900 (15 minutes). Default: 0. DelaySeconds string `json:"DelaySeconds,omitempty"` // MaximumMessageSize – The limit of how many bytes a message can contain before Amazon SQS rejects it. // Valid values: An integer from 1,024 bytes (1 KiB) up to 262,144 bytes (256 KiB). Default: 262,144 (256 KiB). MaximumMessageSize string `json:"MaximumMessageSize,omitempty"` // MessageRetentionPeriod – The length of time, in seconds, for which Amazon SQS retains a message. // Valid values: An integer representing seconds, from 60 (1 minute) to 1,209,600 (14 days). // Default: 345,600 (4 days). // // When you change a queue's attributes, the change can take up to 60 seconds for most of the attributes to // propagate throughout the Amazon SQS system. Changes made to the MessageRetentionPeriod attribute can take up to // 15 minutes and will impact existing messages in the queue potentially causing them to be expired and deleted if // the MessageRetentionPeriod is reduced below the age of existing messages. MessageRetentionPeriod string `json:"MessageRetentionPeriod,omitempty"` // Policy – The queue's policy. A valid AWS policy. For more information about policy structure, see // [Overview of AWS IAM Policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies.html) in the // AWS Identity and Access Management User Guide. Policy string `json:"Policy,omitempty"` // ReceiveMessageWaitTimeSeconds – The length of time, in seconds, for which a ReceiveMessage action waits for a // message to arrive. // Valid values: An integer from 0 to 20 (seconds). Default: 0. ReceiveMessageWaitTimeSeconds string `json:"ReceiveMessageWaitTimeSeconds,omitempty"` // RedrivePolicy – The string that includes the parameters for the dead-letter queue functionality of the source // queue as a JSON object. 
The parameters are as follows: RedrivePolicy string `json:"RedrivePolicy,omitempty"` // DeadLetterTargetArn – The Amazon Resource Name (ARN) of the dead-letter queue to which Amazon SQS moves // messages after the value of maxReceiveCount is exceeded. DeadLetterTargetArn string `json:"deadLetterTargetArn,omitempty"` // MaxReceiveCount – The number of times a message is delivered to the source queue before being moved to the // dead-letter queue. Default: 10. When the ReceiveCount for a message exceeds the maxReceiveCount for a queue, // Amazon SQS moves the message to the dead-letter-queue. MaxReceiveCount string `json:"maxReceiveCount,omitempty"` // VisibilityTimeout – The visibility timeout for the queue, in seconds. // Valid values: An integer from 0 to 43,200 (12 hours). Default: 30. // // For more information about the visibility timeout, see // [Visibility Timeout](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html) in the Amazon SQS Developer Guide. VisibilityTimeout string `json:"VisibilityTimeout,omitempty"` // KmsMasterKeyId – The ID of an AWS managed customer master key (CMK) for Amazon SQS or a custom CMK. // For more information, see [Key Terms](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-server-side-encryption.html#sqs-sse-key-terms). // // While the alias of the AWS-managed CMK for Amazon SQS is always // alias/aws/sqs, the alias of a custom CMK can, for example, be alias/MyAlias. // For more examples, see [KeyId](https://docs.aws.amazon.com/kms/latest/APIReference/API_DescribeKey.html#API_DescribeKey_RequestParameters) // in the AWS Key Management Service API Reference. 
KmsMasterKeyId string `json:"KmsMasterKeyId,omitempty"` // KmsDataKeyReusePeriodSeconds – The length of time, in seconds, for which Amazon SQS can reuse a [data key](https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#data-keys) // to encrypt or decrypt messages before calling AWS KMS again. // An integer representing seconds, between 60 seconds (1 minute) and 86,400 seconds (24 hours). // Default: 300 (5 minutes). // // A shorter time period provides better security but results in more calls to KMS which might incur charges after Free Tier. // For more information, see How Does the [Data Key Reuse Period Work?](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-server-side-encryption.html#sqs-how-does-the-data-key-reuse-period-work). KmsDataKeyReusePeriodSeconds string `json:"KmsDataKeyReusePeriodSeconds,omitempty"` // SqsManagedSseEnabled – Enables server-side queue encryption using SQS owned encryption keys. // Only one server-side encryption option is supported per queue (for example, [SSE-KMS](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-sse-existing-queue.html) or [SSE-SQS](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-sqs-sse-queue.html). SqsManagedSseEnabled string `json:"SqsManagedSseEnabled,omitempty"` // FifoQueue - Designates a queue as FIFO. Valid values: true, false. Default: false. FifoQueue QueueConfigAttributesBool `json:"FifoQueue,omitempty"` // If you don't provide a MessageDeduplicationId and the queue doesn't have ContentBasedDeduplication set, the // action fails with an error. // If the queue has ContentBasedDeduplication set, your MessageDeduplicationId overrides the generated one. // When ContentBasedDeduplication is in effect, messages with identical content sent within the deduplication // interval are treated as duplicates and only one copy of the message is delivered. 
// // If you send one message with ContentBasedDeduplication enabled and then another message with a // MessageDeduplicationId that is the same as the one generated for the first MessageDeduplicationId, the two // messages are treated as duplicates and only one copy of the message is delivered. ContentBasedDeduplication QueueConfigAttributesBool `json:"ContentBasedDeduplication,omitempty"` // DeduplicationScope – Specifies whether message deduplication occurs at the message group or queue level. // Valid values are messageGroup and queue. // // Apply only to [high throughput for FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/high-throughput-fifo.html). DeduplicationScope string `json:"DeduplicationScope,omitempty"` // FifoThroughputLimit – Specifies whether the FIFO queue throughput quota applies to the entire queue or per message group. // Valid values are perQueue and perMessageGroupId. // The perMessageGroupId value is allowed only when the value for DeduplicationScope is messageGroup. // Apply only to [high throughput for FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/high-throughput-fifo.html). FifoThroughputLimit string `json:"FifoThroughputLimit,omitempty"` // CustomAttributes is a map of custom attributes that are not mapped to the struct fields. CustomAttributes map[string]string `json:"-"` }
QueueConfigAttributes is a struct that holds the attributes of an SQS queue. See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html for the AWS reference on these attributes.
func (QueueConfigAttributes) Attributes ¶
func (q QueueConfigAttributes) Attributes() (map[string]string, error)
type QueueConfigAttributesBool ¶
type QueueConfigAttributesBool bool
QueueConfigAttributesBool is a custom type for bool values in QueueConfigAttributes that supports marshaling to string.
func (QueueConfigAttributesBool) MarshalText ¶
func (q QueueConfigAttributesBool) MarshalText() ([]byte, error)
type QueueUrlResolver ¶
type QueueUrlResolver interface {
ResolveQueueUrl(ctx context.Context, params ResolveQueueUrlParams) (QueueUrlResolverResult, error)
}
QueueUrlResolver resolves the queue URL for the topic passed to Publisher.Publish or Subscriber.Subscribe.
type QueueUrlResolverResult ¶
type ResolveQueueUrlParams ¶
type ResolveQueueUrlParams struct { // Topic passed to Publisher.Publish, Subscriber.Subscribe, etc. // It may be mapped to a different name by QueueUrlResolver. Topic string SqsClient *sqs.Client Logger watermill.LoggerAdapter }
type RmMongoMarshaler ¶
type RmMongoUnmarshaler ¶
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
func NewSubscriber ¶
func NewSubscriber(config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)
func (*Subscriber) Close ¶
func (s *Subscriber) Close() error
func (*Subscriber) GetQueueArn ¶
func (*Subscriber) GetQueueUrl ¶
func (*Subscriber) SubscribeInitialize ¶
func (s *Subscriber) SubscribeInitialize(topic string) error
func (*Subscriber) SubscribeInitializeWithContext ¶
func (s *Subscriber) SubscribeInitializeWithContext(ctx context.Context, topic string) error
type SubscriberConfig ¶
type SubscriberConfig struct { // AWSConfig is the AWS configuration. AWSConfig aws.Config MongoDb *mongo.Database // OptFns are options for the SQS client. OptFns []func(*sqs.Options) // DoNotCreateQueueIfNotExists disables creating the queue if it does not exist. DoNotCreateQueueIfNotExists bool // QueueUrlResolver is a function that resolves the queue name to the queue URL. QueueUrlResolver QueueUrlResolver // ReconnectRetrySleep is the time to sleep between reconnect attempts. ReconnectRetrySleep time.Duration // QueueConfigAttributes is a struct that holds the attributes of an SQS queue. QueueConfigAttributes QueueConfigAttributes // GenerateCreateQueueInput generates *sqs.CreateQueueInput for AWS SDK. GenerateCreateQueueInput GenerateCreateQueueInputFunc // GenerateReceiveMessageInput generates *sqs.ReceiveMessageInput for AWS SDK. GenerateReceiveMessageInput GenerateReceiveMessageInputFunc // GenerateDeleteMessageInput generates *sqs.DeleteMessageInput for AWS SDK. GenerateDeleteMessageInput GenerateDeleteMessageInputFunc Unmarshaler Unmarshaler }
func (*SubscriberConfig) SetDefaults ¶
func (c *SubscriberConfig) SetDefaults()
func (SubscriberConfig) Validate ¶
func (c SubscriberConfig) Validate() error
type TransparentUrlResolver ¶
type TransparentUrlResolver struct{}
TransparentUrlResolver is a resolver that uses the topic passed to Publisher.Publish or Subscriber.Subscribe as the queue URL. When using this resolver, pass the queue URL as the topic.
func (TransparentUrlResolver) ResolveQueueUrl ¶
func (p TransparentUrlResolver) ResolveQueueUrl(ctx context.Context, params ResolveQueueUrlParams) (res QueueUrlResolverResult, err error)