Documentation ¶
Index ¶
- Constants
- Variables
- func AttachQueueForwardingPolicy(ctx context.Context, svc *sqs.Client, queueURL, queueARN string, ...) error
- func CreateQueue(ctx context.Context, svc *sqs.Client, queueName string) (string, error)
- func CreateTopic(ctx context.Context, svc *sns.Client, topicName string) (string, error)
- func DeleteQueue(ctx context.Context, svc *sqs.Client, queueURL string) error
- func DeleteTopic(ctx context.Context, svc *sns.Client, topicARN string) error
- func ForwardingPolicy(queueARN string, topicARNs ...string) string
- func GetQueueARN(ctx context.Context, svc *sqs.Client, queueURL string) (string, error)
- func Must(err error)
- func MustGetResource(s string, err error) string
- func RedrivePolicy(deadLetterQueueARN string, maxReceiveCount int) string
- func SetQueueAttributes(ctx context.Context, svc *sqs.Client, queueURL string, ...) error
- func Subscribe(ctx context.Context, svc *sns.Client, topicARN, queueARN string) (string, error)
- func Unsubscribe(ctx context.Context, svc *sns.Client, subscriptionARN string) error
- func WithAck(cfg AckConfig) func(s *Subscriber)
- func WithAckWaitTime(ackWaitTime time.Duration) func(s *Subscriber)
- func WithMaxMessages(maxMessages int) func(s *Subscriber)
- func WithStorage(storage pubsub.SchedulerStorage) func(s *Subscriber)
- func WithStorageThreshold(threshold time.Duration) func(s *Subscriber)
- func WithVisibilityTimeout(visibilityTimeout int) func(s *Subscriber)
- func WithWaitTime(waitTime int) func(s *Subscriber)
- type AckConfig
- type Publisher
- type SNSPublisher
- type SQSPublisher
- type SQSSchedulerStorage
- type Subscriber
Constants ¶
const ( QueueForwardingPolicyAttribute = "Policy" QueueRedrivePolicyAttribute = "RedrivePolicy" )
const (
MaxChangeVisibilityDelay = 12 * time.Hour
)
Variables ¶
var ( ErrSubscriberStopped = errors.New("subscriber stopped") ErrAcknowledgement = errors.New("cannot ack message") ErrNAcknowledgement = errors.New("cannot nack message") ErrChangeVisibility = errors.New("cannot change message visibility") ErrReSchedule = errors.New("cannot re-schedule message") ErrAlreadyStarted = errors.New("already started") ErrAlreadyStopped = errors.New("already stopped") ErrMissingConfig = errors.New("missing configuration") )
var ErrAsyncNAckNotSupported = errors.New("NAck on async strategy is not supported")
var ErrQueueNotFound = errors.New("could not find queue URL")
var ErrTopicNotFound = errors.New("could not find topic ARN")
var ErrTopicOrQueueNotFound = errors.New("could not find neither topic ARN nor queue URL")
Functions ¶
func AttachQueueForwardingPolicy ¶
func AttachQueueForwardingPolicy(ctx context.Context, svc *sqs.Client, queueURL, queueARN string, topicARNs ...string) error
AttachQueueForwardingPolicy attaches a queue policy that enables a topic to send messages to it.
func CreateQueue ¶
CreateQueue creates an SQS queue. Returns the QueueURL.
func CreateTopic ¶
CreateTopic creates an SNS topic.
func DeleteQueue ¶
DeleteQueue deletes a queue.
func DeleteTopic ¶
DeleteTopic deletes a topic.
func ForwardingPolicy ¶ added in v0.4.0
ForwardingPolicy generates the forwarding policy for a queue to be able to receive messages from the given topics.
func GetQueueARN ¶
GetQueueARN gets the queue ARN.
func MustGetResource ¶
MustGetResource will panic if the creation of an AWS resource has failed.
func RedrivePolicy ¶ added in v0.4.0
RedrivePolicy returns the string to use for the redrive policy attribute of a queue.
func SetQueueAttributes ¶ added in v0.4.0
func SetQueueAttributes(ctx context.Context, svc *sqs.Client, queueURL string, attributes map[string]string) error
SetQueueAttributes sets the queue attributes.
func Unsubscribe ¶
Unsubscribe removes the subscription of the topic.
func WithAck ¶ added in v0.2.1
func WithAck(cfg AckConfig) func(s *Subscriber)
WithAck configures the acknowledgements behaviour
func WithAckWaitTime ¶ added in v0.4.1
func WithAckWaitTime(ackWaitTime time.Duration) func(s *Subscriber)
WithAckWaitTime indicates how much time the subscriber should wait for all the messages in the batch to be acknowledged before requesting a new batch. Ideally this time should be greater than the message visibility timeout, either the one configured specifically for this subscriber or the queue default.
func WithMaxMessages ¶ added in v0.4.0
func WithMaxMessages(maxMessages int) func(s *Subscriber)
WithMaxMessages configures the number of messages to retrieve per request. If max messages <= 0 or > 10 the default will be used (10 messages).
func WithStorage ¶ added in v0.6.0
func WithStorage(storage pubsub.SchedulerStorage) func(s *Subscriber)
WithStorage sets an optional storage that can be used to re-schedule the message beyond the maximum message visibility in SQS (12 hours).
func WithStorageThreshold ¶ added in v0.6.0
func WithStorageThreshold(threshold time.Duration) func(s *Subscriber)
WithStorageThreshold sets the threshold above which the storage will be used when changing the message visibility.
func WithVisibilityTimeout ¶ added in v0.4.0
func WithVisibilityTimeout(visibilityTimeout int) func(s *Subscriber)
WithVisibilityTimeout configures the time that the retrieved messages will be hidden from subsequent retrieve requests. If visibilityTimeout <= 0 the queue's default will be used. If it's greater than the 12 hours maximum, the maximum will be used: 43200s.
func WithWaitTime ¶ added in v0.4.0
func WithWaitTime(waitTime int) func(s *Subscriber)
WithWaitTime configures the time to wait during long polling for new messages in the queue until the request is cancelled.
Types ¶
type AckConfig ¶
type AckConfig struct { // Async will ack on the message asynchronously returning // immediately with success. // // Errors will be reported in the next consuming cycle. // // When the subscriber closes, it will wait until all // acknowledge operations finish, reporting any errors. Async bool // BatchSize will indicate to buffer acknowledgements // until a certain amount of messages are pending. // // Batching acknowledgements creates // // Calling Ack on the message will return success, and // the errors will be reported when consuming new messages // // When the subscriber closes, it will wait until all // acknowledge operations finish. BatchSize int // ChangeVisibilityOnNack when true, the message visibility // will be reset to zero so the message is redelivered again // immediately. It doesn't support batching. ChangeVisibilityOnNack bool // FlushEvery indicates how often the messages should be // acknowledged even if the batch is not full yet. // // This value has no effect if batching is not enabled. FlushEvery time.Duration }
AckConfig configures the acknowledgements behaviour.
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher is an SNS+SQS publisher.
func NewPublisher ¶ added in v0.6.0
NewPublisher creates a new SNS+SQS publisher.
func (*Publisher) AddResource ¶ added in v0.9.1
type SNSPublisher ¶ added in v0.4.2
type SNSPublisher struct {
// contains filtered or unexported fields
}
SNSPublisher is an SNS publisher.
func NewSNSPublisher ¶ added in v0.2.1
func NewSNSPublisher(sns *sns.Client, topicARNs map[string]string) *SNSPublisher
NewSNSPublisher creates a new SNS publisher.
type SQSPublisher ¶ added in v0.4.2
type SQSPublisher struct {
// contains filtered or unexported fields
}
SQSPublisher is a publisher that publishes directly to queues.
func NewSQSDirectPublisher ¶ added in v0.4.2
func NewSQSDirectPublisher(sqs *sqs.Client) *SQSPublisher
NewSQSDirectPublisher creates a new SQS publisher without any queue alias.
func NewSQSPublisher ¶ added in v0.4.2
func NewSQSPublisher(sqs *sqs.Client, queueURLs map[string]string) *SQSPublisher
NewSQSPublisher creates a new SQS publisher with a custom map for queue URLs.
type SQSSchedulerStorage ¶ added in v0.5.0
type SQSSchedulerStorage struct { *SQSPublisher pubsub.SchedulerStorage }
SQSSchedulerStorage is a publisher that can publish directly to queues.
func NewSQSSchedulerStorage ¶ added in v0.5.0
func NewSQSSchedulerStorage(pub *SQSPublisher, storage pubsub.SchedulerStorage) *SQSSchedulerStorage
NewSQSSchedulerStorage creates a new hybrid SQS publisher + scheduler storage
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber for AWS SQS.
func NewSQSSubscriber ¶ added in v0.2.1
func NewSQSSubscriber(sqs sqsSvc, queueURL string, opts ...func(s *Subscriber)) *Subscriber
NewSQSSubscriber creates a new SQS subscriber.