Documentation ¶
Index ¶
- Constants
- func GenerateCreateTopicInputDefault(ctx context.Context, topic TopicName, attrs ConfigAttributes) (sns.CreateTopicInput, error)
- func GenerateQueueAccessPolicyDefault(ctx context.Context, params GenerateQueueAccessPolicyParams) (map[string]any, error)
- func GenerateSqsQueueNameEqualToTopicName(ctx context.Context, snsTopic TopicArn) (string, error)
- func GenerateSubscribeInputDefault(ctx context.Context, params GenerateSubscribeInputParams) (*sns.SubscribeInput, error)
- type ConfigAttributes
- type DefaultMarshalerUnmarshaler
- type GenerateArnTopicResolver
- type GenerateCreateTopicInputFunc
- type GenerateQueueAccessPolicyFn
- type GenerateQueueAccessPolicyParams
- type GenerateSqsQueueNameFn
- type GenerateSubscribeInputFn
- type GenerateSubscribeInputParams
- type Marshaler
- type OverrideEndpointResolver
- type Publisher
- type PublisherConfig
- type Subscriber
- type SubscriberConfig
- type TopicArn
- type TopicName
- type TopicResolver
- type TransparentTopicResolver
Constants ¶
const MessageDeduplicationIdMetadataField = "MessageDeduplicationId"
const MessageGroupIdMetadataField = "MessageGroupId"
Variables ¶
This section is empty.
Functions ¶
func GenerateCreateTopicInputDefault ¶
func GenerateCreateTopicInputDefault(ctx context.Context, topic TopicName, attrs ConfigAttributes) (sns.CreateTopicInput, error)
func GenerateSubscribeInputDefault ¶
func GenerateSubscribeInputDefault(ctx context.Context, params GenerateSubscribeInputParams) (*sns.SubscribeInput, error)
Types ¶
type ConfigAttributes ¶
type ConfigAttributes struct {
	// DeliveryPolicy – The policy that defines how Amazon SNS retries failed
	// deliveries to HTTP/S endpoints.
	DeliveryPolicy string `json:"DeliveryPolicy,omitempty"`

	// DisplayName – The display name to use for a topic with SMS subscriptions.
	DisplayName string `json:"DisplayName,omitempty"`

	// Policy – The policy that defines who can access your topic. By default, only
	// the topic owner can publish or subscribe to the topic.
	Policy string `json:"Policy,omitempty"`

	// SignatureVersion – The signature version corresponds to the hashing
	// algorithm used while creating the signature of the notifications, subscription
	// confirmations, or unsubscribe confirmation messages sent by Amazon SNS. By
	// default, SignatureVersion is set to 1.
	SignatureVersion string `json:"SignatureVersion,omitempty"`

	// TracingConfig – Tracing mode of an Amazon SNS topic.
	// By default TracingConfig is set to PassThrough, and the topic passes through the tracing
	// header it receives from an Amazon SNS publisher to its subscriptions. If set to
	// Active, Amazon SNS will vend X-Ray segment data to topic owner account if the
	// sampled flag in the tracing header is true. This is only supported on standard
	// topics.
	TracingConfig string `json:"TracingConfig,omitempty"`

	// KmsMasterKeyId – The ID of an Amazon Web Services managed customer master
	// key (CMK) for Amazon SNS or a custom CMK. For more information, see [Key Terms]. For
	// more examples, see [KeyId] in the Key Management Service API Reference.
	//
	// Applies only to server-side encryption.
	KmsMasterKeyId string `json:"KmsMasterKeyId,omitempty"`

	// FifoTopic – Set to true to create a FIFO topic.
	FifoTopic string `json:"FifoTopic,omitempty"`

	// ArchivePolicy – Adds or updates an inline policy document to archive messages stored in the specified
	// Amazon SNS topic.
	ArchivePolicy string `json:"ArchivePolicy,omitempty"`

	// BeginningArchiveTime – The earliest starting point at which a message in the
	// topic’s archive can be replayed from. This point in time is based on the
	// configured message retention period set by the topic’s message archiving policy.
	BeginningArchiveTime string `json:"BeginningArchiveTime,omitempty"`

	// ContentBasedDeduplication – Enables content-based deduplication for FIFO topics.
	//
	// By default, ContentBasedDeduplication is set to false. If you create a FIFO
	// topic and this attribute is false, you must specify a value for the
	// MessageDeduplicationId parameter for the [Publish] action.
	//
	// When you set ContentBasedDeduplication to true, Amazon SNS uses a SHA-256
	// hash to generate the MessageDeduplicationId using the body of the message (but
	// not the attributes of the message).
	ContentBasedDeduplication string `json:"ContentBasedDeduplication,omitempty"`

	// CustomAttributes is a map of custom attributes that are not mapped to the struct fields.
	CustomAttributes map[string]string `json:"-"`
}
ConfigAttributes is a struct that holds the attributes of an SNS topic.
func (ConfigAttributes) Attributes ¶
func (s ConfigAttributes) Attributes() (map[string]string, error)
type DefaultMarshalerUnmarshaler ¶
type DefaultMarshalerUnmarshaler struct{}
func (DefaultMarshalerUnmarshaler) Marshal ¶
func (d DefaultMarshalerUnmarshaler) Marshal(topicArn TopicArn, msg *message.Message) *sns.PublishInput
type GenerateArnTopicResolver ¶
type GenerateArnTopicResolver struct {
// contains filtered or unexported fields
}
GenerateArnTopicResolver is a TopicResolver that generates the ARN for the topic using the provided account ID and region.
func NewGenerateArnTopicResolver ¶
func NewGenerateArnTopicResolver(accountID string, region string) (*GenerateArnTopicResolver, error)
func (GenerateArnTopicResolver) ResolveTopic ¶
type GenerateCreateTopicInputFunc ¶
type GenerateCreateTopicInputFunc func(ctx context.Context, topic TopicName, attrs ConfigAttributes) (sns.CreateTopicInput, error)
type GenerateSqsQueueNameFn ¶
type GenerateSubscribeInputFn ¶
type GenerateSubscribeInputFn func(ctx context.Context, params GenerateSubscribeInputParams) (*sns.SubscribeInput, error)
type Marshaler ¶
type Marshaler interface {
Marshal(topicArn TopicArn, msg *message.Message) *sns.PublishInput
}
type OverrideEndpointResolver ¶
type OverrideEndpointResolver struct {
Endpoint transport.Endpoint
}
OverrideEndpointResolver is a custom endpoint resolver that always returns the same endpoint. It can be used to point the client at AWS emulators such as Localstack.
For example:

	import (
		"net/url"

		"github.com/ThreeDotsLabs/watermill-amazonsns/sns"
		amazonsns "github.com/aws/aws-sdk-go-v2/service/sns"
		"github.com/aws/smithy-go/transport"
	)

	pub, err := sns.NewPublisher(sns.PublisherConfig{
		AWSConfig: cfg,
		Marshaler: sns.DefaultMarshalerUnmarshaler{},
		OptFns: []func(*amazonsns.Options){
			amazonsns.WithEndpointResolverV2(sns.OverrideEndpointResolver{
				Endpoint: transport.Endpoint{
					URI: url.URL{Scheme: "http", Host: "localstack:4566"},
				},
			}),
		},
	}, logger)
func (OverrideEndpointResolver) ResolveEndpoint ¶
func (o OverrideEndpointResolver) ResolveEndpoint(ctx context.Context, params sns.EndpointParameters) (transport.Endpoint, error)
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func NewPublisher ¶
func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)
func (*Publisher) CreateTopic ¶
type PublisherConfig ¶
type PublisherConfig struct {
	// AWSConfig is the AWS configuration.
	AWSConfig aws.Config

	// OptFns are options for the SNS client.
	OptFns []func(*sns.Options)

	// CreateTopicConfig is a struct that holds the attributes of an SNS topic.
	CreateTopicConfig ConfigAttributes

	// DoNotCreateTopicIfNotExists disables creating the topic if it does not exist.
	DoNotCreateTopicIfNotExists bool

	// TopicResolver resolves the topic name to the topic ARN.
	TopicResolver TopicResolver

	// GenerateCreateTopicInput generates the input for the CreateTopic operation.
	GenerateCreateTopicInput GenerateCreateTopicInputFunc

	// Marshaler is a marshaler that marshals the message to the SNS input.
	Marshaler Marshaler
}
func (*PublisherConfig) Validate ¶
func (c *PublisherConfig) Validate() error
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
func NewSubscriber ¶
func NewSubscriber(
	config SubscriberConfig,
	sqsConfig sqs.SubscriberConfig,
	logger watermill.LoggerAdapter,
) (*Subscriber, error)
func (*Subscriber) Close ¶
func (s *Subscriber) Close() error
func (*Subscriber) SubscribeInitialize ¶
func (s *Subscriber) SubscribeInitialize(topic string) error
SubscribeInitialize initializes the SNS subscription for the given topic. It creates an SQS queue and subscribes it to the SNS topic.
func (*Subscriber) SubscribeInitializeWithContext ¶
func (s *Subscriber) SubscribeInitializeWithContext(ctx context.Context, topic string) error
SubscribeInitializeWithContext initializes the SNS subscription for the given topic. It creates an SQS queue and subscribes it to the SNS topic.
type SubscriberConfig ¶
type SubscriberConfig struct {
	// AWSConfig is the AWS configuration.
	AWSConfig aws.Config

	// OptFns are options for the SNS client.
	OptFns []func(*sns.Options)

	// TopicResolver resolves the topic name to the topic ARN.
	TopicResolver TopicResolver

	// GenerateSqsQueueName generates the name of the SQS queue for the SNS subscription.
	GenerateSqsQueueName GenerateSqsQueueNameFn

	// GenerateSubscribeInput generates the input for the Subscribe operation.
	GenerateSubscribeInput GenerateSubscribeInputFn

	// GenerateQueueAccessPolicy generates the access policy for the SQS queue.
	GenerateQueueAccessPolicy GenerateQueueAccessPolicyFn

	// DoNotCreateSqsSubscription disables creating the SQS subscription.
	DoNotCreateSqsSubscription bool

	// DoNotSetQueueAccessPolicy disables setting the queue access policy.
	// Described in AWS docs: https://docs.aws.amazon.com/sns/latest/dg/subscribe-sqs-queue-to-sns-topic.html#SendMessageToSQS.sqs.permissions
	// Creating access policy requires "sqs:SetQueueAttributes" permission.
	DoNotSetQueueAccessPolicy bool
}
func (*SubscriberConfig) SetDefaults ¶
func (c *SubscriberConfig) SetDefaults()
func (*SubscriberConfig) Validate ¶
func (c *SubscriberConfig) Validate() error
type TopicArn ¶
type TopicArn string
TopicArn is the ARN of an SNS topic.
func GenerateTopicArn ¶
GenerateTopicArn generates an ARN for the SNS topic based on the region, accountID and topic name.
type TopicName ¶
type TopicName string
TopicName is the name of an SNS topic.
func ExtractTopicNameFromTopicArn ¶
ExtractTopicNameFromTopicArn extracts the topic name from the topic ARN.
type TopicResolver ¶
type TopicResolver interface {
ResolveTopic(ctx context.Context, topic string) (snsTopic TopicArn, err error)
}
TopicResolver resolves the topic name passed to Publisher.Publish or Subscriber.Subscribe to the topic ARN.
type TransparentTopicResolver ¶
type TransparentTopicResolver struct{}
TransparentTopicResolver is a TopicResolver that passes the topic as is.