Documentation ¶
Overview ¶
Package sqsconsumer enables easy and efficient message processing from an SQS queue.
Consumers read from queues in batches and run a handler func for each message. This package does not manage a retry limit, so use the SQS Dead Letter Queue facility to bound redelivery. You can run another consumer to handle messages that end up in the Dead Letter Queue.
SQS ¶
SQS provides at-least-once delivery with no guarantee of message ordering. When a message is received, a visibility timeout starts; if the message has not been deleted by the time the timeout expires, it will be delivered again. Long-running message handlers must extend the timeout periodically to ensure that they retain exclusivity on the message, and they must explicitly delete messages that were successfully consumed to avoid redelivery.
To read more about how SQS works, check the SQS documentation at https://aws.amazon.com/documentation/sqs/
Middleware ¶
Visibility timeout extension and deleting messages after successful handling are implemented as handler middleware. See github.com/Wattpad/sqsconsumer/middleware for details on these and other middleware layers available.
Use ¶
See the example directory for a demonstration of use.
Index ¶
- Variables
- func NewBatchDeleter(ctx context.Context, wg *sync.WaitGroup, s *SQSService, ...) chan<- *sqs.Message
- func NewBatchVisibilityExtender(ctx context.Context, s *SQSService, ticker <-chan time.Time, ...) chan<- *sqs.Message
- func NoopLogger(_ string, _ ...interface{})
- func SetupQueue(svc SQSAPI, name string) (*string, error)
- type AWSConfigOption
- type Consumer
- type MessageHandlerFunc
- type RunOption
- type SQSAPI
- type SQSService
Constants ¶
This section is empty.
Variables ¶
var (
ErrShutdownChannelClosed = errors.New("shutDown channel is already closed")
)
Functions ¶
func NewBatchDeleter ¶
func NewBatchDeleter(ctx context.Context, wg *sync.WaitGroup, s *SQSService, every, drainTimeout time.Duration) chan<- *sqs.Message
NewBatchDeleter starts a batch deleter routine that deletes messages after they are sent to the returned channel
func NewBatchVisibilityExtender ¶
func NewBatchVisibilityExtender(ctx context.Context, s *SQSService, ticker <-chan time.Time, extensionSecs int64, pending []*sqs.Message) chan<- *sqs.Message
NewBatchVisibilityExtender starts a batch visibility extender routine that extends visibility on messages until they are sent to the returned channel
func NoopLogger ¶
func NoopLogger(_ string, _ ...interface{})
Types ¶
type AWSConfigOption ¶
func OptAWSRegion ¶
func OptAWSRegion(region string) AWSConfigOption
type Consumer ¶
type Consumer struct {
    Logger                           func(string, ...interface{})
    WaitSeconds                      int64
    ReceiveVisibilityTimoutSeconds   int64
    ExtendVisibilityTimeoutBySeconds int64
    ExtendVisibilityTimeoutEvery     time.Duration
    DeleteMessageAccumulatorTimeout  time.Duration
    DeleteMessageDrainTimeout        time.Duration
    // contains filtered or unexported fields
}
Consumer is an SQS queue consumer
func NewConsumer ¶
func NewConsumer(s *SQSService, handler MessageHandlerFunc) *Consumer
NewConsumer creates a Consumer that uses the given SQSService to connect and invokes the handler for each message received.
func (*Consumer) Run ¶
Run starts the Consumer, stopping it when the given context is canceled. To shut down without canceling the context, and to allow in-flight messages to drain, use the WithShutdownChan RunOption.
If the context is canceled, the returned error is the context's error. If the optional shutDown channel is closed before Run is called, the returned error is ErrShutdownChannelClosed. If in-flight messages drain to completion after shutdown, the returned error is nil.
type MessageHandlerFunc ¶
MessageHandlerFunc is the interface that users of this library should implement. It will be called once per message and should return an error if there was a problem processing the message. Note that Consumer ignores the error, but it is necessary for some middleware to know whether handling was successful or not.
type RunOption ¶
type RunOption func(o *runOpts)
func WithShutdownChan ¶
func WithShutdownChan(shutDown <-chan struct{}) RunOption
WithShutdownChan accepts a channel that will gracefully shut down the consumer when it is closed. The consumer will stop receiving messages from SQS and will return from the Run method once in-flight handlers have completed.
The channel must be closed for shutdown to occur. Sending a value down the channel will not shut down the consumer.
The Run method's context.Context may be canceled during this time to abort pending operations early. This is done co-operatively and requires the consumer's handler func to honour the context cancelation.
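The difference between closing a channel and sending on it can be seen with Go's two-value receive, which reports whether the channel is still open. The sketch below shows one way a receiver can ignore sent values and react only to close; waitForClose is a hypothetical helper, and this is not necessarily how the consumer checks its shutDown channel.

```go
package main

import "fmt"

// waitForClose (hypothetical) blocks until ch is closed, ignoring any values
// sent on it, and returns how many values were ignored.
func waitForClose(ch <-chan struct{}) int {
	ignored := 0
	for {
		if _, ok := <-ch; !ok {
			return ignored // ok is false only once the channel is closed and drained
		}
		ignored++
	}
}

func main() {
	shutDown := make(chan struct{}, 1)
	shutDown <- struct{}{} // a sent value does not count as shutdown
	close(shutDown)        // closing is what signals shutdown
	fmt.Println("values ignored before close:", waitForClose(shutDown))
}
```

A closed channel also unblocks every receiver at once, whereas a sent value is delivered to only one of them, which makes close the natural broadcast signal.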
type SQSAPI ¶
type SQSAPI interface {
    ChangeMessageVisibilityBatch(*sqs.ChangeMessageVisibilityBatchInput) (*sqs.ChangeMessageVisibilityBatchOutput, error)
    CreateQueue(*sqs.CreateQueueInput) (*sqs.CreateQueueOutput, error)
    DeleteMessageBatch(*sqs.DeleteMessageBatchInput) (*sqs.DeleteMessageBatchOutput, error)
    GetQueueUrl(*sqs.GetQueueUrlInput) (*sqs.GetQueueUrlOutput, error)
    ReceiveMessage(*sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error)
}
SQSAPI is the part of the AWS SQS API which is used by the sqsconsumer package.
type SQSService ¶
SQSService links an SQS client with a queue URL.
func NewSQSService ¶
func NewSQSService(queueName string, svc SQSAPI) (*SQSService, error)
NewSQSService takes the SQS client as an SQSAPI argument so that the client can be mocked and the library tested locally.
func SQSServiceForQueue ¶
func SQSServiceForQueue(queueName string, opts ...AWSConfigOption) (*SQSService, error)
SQSServiceForQueue creates an AWS SQS client configured for the given region and gets or creates a queue with the given name.