sns

package
v0.0.0-...-5e471cc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2024 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const MessageDeduplicationIdMetadataField = "MessageDeduplicationId"
View Source
const MessageGroupIdMetadataField = "MessageGroupId"

Variables

This section is empty.

Functions

func GenerateCreateTopicInputDefault

func GenerateCreateTopicInputDefault(ctx context.Context, topic TopicName, attrs ConfigAttributes) (sns.CreateTopicInput, error)

func GenerateQueueAccessPolicyDefault

func GenerateQueueAccessPolicyDefault(ctx context.Context, params GenerateQueueAccessPolicyParams) (map[string]any, error)

func GenerateSqsQueueNameEqualToTopicName

func GenerateSqsQueueNameEqualToTopicName(ctx context.Context, snsTopic TopicArn) (string, error)

func GenerateSubscribeInputDefault

func GenerateSubscribeInputDefault(ctx context.Context, params GenerateSubscribeInputParams) (*sns.SubscribeInput, error)

Types

type ConfigAttributes

type ConfigAttributes struct {
	// DeliveryPolicy – The policy that defines how Amazon SNS retries failed
	// deliveries to HTTP/S endpoints.
	DeliveryPolicy string `json:"DeliveryPolicy,omitempty"`

	// DisplayName – The display name to use for a topic with SMS subscriptions.
	DisplayName string `json:"DisplayName,omitempty"`

	// Policy – The policy that defines who can access your topic. By default, only
	// the topic owner can publish or subscribe to the topic.
	Policy string `json:"Policy,omitempty"`

	// SignatureVersion – The signature version corresponds to the hashing
	// algorithm used while creating the signature of the notifications, subscription
	// confirmations, or unsubscribe confirmation messages sent by Amazon SNS. By
	// default, SignatureVersion is set to 1 .
	SignatureVersion string `json:"SignatureVersion,omitempty"`

	// TracingConfig – Tracing mode of an Amazon SNS topic.
	// By default TracingConfig is set to PassThrough , and the topic passes through the tracing
	// header it receives from an Amazon SNS publisher to its subscriptions. If set to
	// Active , Amazon SNS will vend X-Ray segment data to topic owner account if the
	// sampled flag in the tracing header is true. This is only supported on standard
	// topics.
	TracingConfig string `json:"TracingConfig,omitempty"`

	// KmsMasterKeyId – The ID of an Amazon Web Services managed customer master
	// key (CMK) for Amazon SNS or a custom CMK. For more information, see [Key Terms]. For
	// more examples, see [KeyId]in the Key Management Service API Reference.
	//
	// Applies only to server-side encryption.
	KmsMasterKeyId string `json:"KmsMasterKeyId,omitempty"`

	// FifoTopic – Set to true to create a FIFO topic.
	FifoTopic string `json:"FifoTopic,omitempty"`

	// ArchivePolicy – Adds or updates an inline policy document to archive  messages stored in the specified
	// Amazon SNS topic.
	ArchivePolicy string `json:"ArchivePolicy,omitempty"`

	// BeginningArchiveTime – The earliest starting point at which a message in the
	// topic’s archive can be replayed from. This point in time is based on the
	// configured message retention period set by the topic’s message archiving policy.
	BeginningArchiveTime string `json:"BeginningArchiveTime,omitempty"`

	// ContentBasedDeduplication – Enables content-based deduplication for FIFO topics.
	//
	// By default, ContentBasedDeduplication is set to false . If you create a FIFO
	// topic and this attribute is false , you must specify a value for the
	// MessageDeduplicationId parameter for the [Publish]action.
	//
	// When you set ContentBasedDeduplication to true , Amazon SNS uses a SHA-256
	// hash to generate the MessageDeduplicationId using the body of the message (but
	// not the attributes of the message).
	ContentBasedDeduplication string `json:"ContentBasedDeduplication,omitempty"`

	// CustomAttributes is a map of custom attributes that are not mapped to the struct fields.
	CustomAttributes map[string]string `json:"-"`
}

ConfigAttributes is a struct that holds the attributes of an SNS topic

func (ConfigAttributes) Attributes

func (s ConfigAttributes) Attributes() (map[string]string, error)

type DefaultMarshalerUnmarshaler

type DefaultMarshalerUnmarshaler struct{}

func (DefaultMarshalerUnmarshaler) Marshal

type GenerateArnTopicResolver

type GenerateArnTopicResolver struct {
	// contains filtered or unexported fields
}

GenerateArnTopicResolver is a TopicResolver that generates ARN for the topic using the provided account ID and region.

func NewGenerateArnTopicResolver

func NewGenerateArnTopicResolver(accountID string, region string) (*GenerateArnTopicResolver, error)

func (GenerateArnTopicResolver) ResolveTopic

func (g GenerateArnTopicResolver) ResolveTopic(ctx context.Context, topic string) (snsTopic TopicArn, err error)

type GenerateCreateTopicInputFunc

type GenerateCreateTopicInputFunc func(ctx context.Context, topic TopicName, attrs ConfigAttributes) (sns.CreateTopicInput, error)

type GenerateQueueAccessPolicyFn

type GenerateQueueAccessPolicyFn func(ctx context.Context, params GenerateQueueAccessPolicyParams) (map[string]any, error)

type GenerateQueueAccessPolicyParams

type GenerateQueueAccessPolicyParams struct {
	SqsQueueArn sqs.QueueArn
	SnsTopicArn TopicArn
	SqsURL      sqs.QueueURL
}

type GenerateSqsQueueNameFn

type GenerateSqsQueueNameFn func(ctx context.Context, snsTopic TopicArn) (string, error)

type GenerateSubscribeInputFn

type GenerateSubscribeInputFn func(ctx context.Context, params GenerateSubscribeInputParams) (*sns.SubscribeInput, error)

type GenerateSubscribeInputParams

type GenerateSubscribeInputParams struct {
	SqsTopic string

	SnsTopicArn TopicArn
	SqsQueueArn sqs.QueueArn
}

type Marshaler

type Marshaler interface {
	Marshal(topicArn TopicArn, msg *message.Message) *sns.PublishInput
}

type OverrideEndpointResolver

type OverrideEndpointResolver struct {
	Endpoint transport.Endpoint
}

OverrideEndpointResolver is a custom endpoint resolver that always returns the same endpoint. It can be used to use AWS emulators like Localstack.

For example: import (

"github.com/ThreeDotsLabs/watermill-amazonsns/sns"
amazonsns "github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/smithy-go/transport"

)

pub, err := sns.NewPublisher(sns.PublisherConfig{
		AWSConfig: cfg,
		Marshaler: sns.DefaultMarshalerUnmarshaler{},
		OptFns: []func(*amazonsns.Options){
			amazonsns.WithEndpointResolverV2(sns.OverrideEndpointResolver{
				Endpoint: transport.Endpoint{
					URI: url.URL{Scheme: "http", Host: "localstack:4566"},
				},
			}),
		},
	}, logger)

func (OverrideEndpointResolver) ResolveEndpoint

func (o OverrideEndpointResolver) ResolveEndpoint(ctx context.Context, params sns.EndpointParameters) (transport.Endpoint, error)

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)

func (*Publisher) Close

func (p *Publisher) Close() error

func (*Publisher) CreateTopic

func (p *Publisher) CreateTopic(ctx context.Context, topic string) (string, error)

func (*Publisher) Publish

func (p *Publisher) Publish(topic string, messages ...*message.Message) error

type PublisherConfig

type PublisherConfig struct {
	// AWSConfig is the AWS configuration.
	AWSConfig aws.Config

	// OptFns are options for the SNS client.
	OptFns []func(*sns.Options)

	// ConfigAttributes is a struct that holds the attributes of an SNS topic.
	CreateTopicConfig ConfigAttributes

	// DoNotCreateTopicIfNotExists disables creating the topic if it does not exist.
	DoNotCreateTopicIfNotExists bool

	// TopicResolver is a function that resolves the topic name to the topic ARN.
	TopicResolver TopicResolver

	// GenerateCreateTopicInput generates the input for the CreateTopic operation.
	GenerateCreateTopicInput GenerateCreateTopicInputFunc

	// Marshaler is a marshaler that marshals the message to the SNS input.
	Marshaler Marshaler
}

func (*PublisherConfig) Validate

func (c *PublisherConfig) Validate() error

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(
	config SubscriberConfig,
	sqsConfig sqs.SubscriberConfig,
	logger watermill.LoggerAdapter,
) (*Subscriber, error)

func (*Subscriber) Close

func (s *Subscriber) Close() error

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

func (*Subscriber) SubscribeInitialize

func (s *Subscriber) SubscribeInitialize(topic string) error

SubscribeInitialize initializes SNS subscription for given topic. It creates SQS queue and subscribes it to SNS topic.

func (*Subscriber) SubscribeInitializeWithContext

func (s *Subscriber) SubscribeInitializeWithContext(ctx context.Context, topic string) error

SubscribeInitializeWithContext initializes SNS subscription for given topic. It creates SQS queue and subscribes it to SNS topic.

type SubscriberConfig

type SubscriberConfig struct {
	// AWSConfig is the AWS configuration.
	AWSConfig aws.Config

	// OptFns are options for the SNS client.
	OptFns []func(*sns.Options)

	// TopicResolver is a function that resolves the topic name to the topic ARN.
	TopicResolver TopicResolver

	// GenerateSqsQueueName generates the name of the SQS queue for the SNS subscription.
	GenerateSqsQueueName GenerateSqsQueueNameFn

	// GenerateSubscribeInput generates the input for the Subscribe operation.
	GenerateSubscribeInput GenerateSubscribeInputFn

	// GenerateQueueAccessPolicy generates the access policy for the SQS queue.
	GenerateQueueAccessPolicy GenerateQueueAccessPolicyFn

	// DoNotCreateSqsSubscription disables creating the SQS subscription.
	DoNotCreateSqsSubscription bool

	// DoNotSetQueueAccessPolicy disables setting the queue access policy.
	// Described in AWS docs: https://docs.aws.amazon.com/sns/latest/dg/subscribe-sqs-queue-to-sns-topic.html#SendMessageToSQS.sqs.permissions
	// Creating access policy requires "sqs:SetQueueAttributes" permission.
	DoNotSetQueueAccessPolicy bool
}

func (*SubscriberConfig) SetDefaults

func (c *SubscriberConfig) SetDefaults()

func (*SubscriberConfig) Validate

func (c *SubscriberConfig) Validate() error

type TopicArn

type TopicArn string

TopicArn is an ARN of the SNS topic

func GenerateTopicArn

func GenerateTopicArn(region, accountID, topic string) (TopicArn, error)

GenerateTopicArn generates an ARN for the SNS topic based on the region, accountID and topic name.

type TopicName

type TopicName string

TopicName is a name of the SNS topic

func ExtractTopicNameFromTopicArn

func ExtractTopicNameFromTopicArn(topicArn TopicArn) (TopicName, error)

ExtractTopicNameFromTopicArn extracts the topic name from the topic ARN.

type TopicResolver

type TopicResolver interface {
	ResolveTopic(ctx context.Context, topic string) (snsTopic TopicArn, err error)
}

TopicResolver resolves topic name to topic ARN by topic passed to Publisher.Publish, Subscriber.Subscribe.

type TransparentTopicResolver

type TransparentTopicResolver struct{}

TransparentTopicResolver is a TopicResolver that passes the topic as is.

func (TransparentTopicResolver) ResolveTopic

func (a TransparentTopicResolver) ResolveTopic(ctx context.Context, topic string) (snsTopic TopicArn, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL