sqsch

v0.0.0-...-a1067e7
Published: Jul 8, 2019 License: MIT Imports: 9 Imported by: 0

README

sqs-receive-channel

This package (package sqsch) provides a channel-oriented interface for processing messages from an SQS queue.

SQS charges per API request. This package implements Horizontal Scaling and Action Batching as recommended by the SQS docs to make efficient use of API calls. This allows you to write applications that can handle both high throughput and long idle periods effectively.

  • Fetches as many messages (ReceiveMessage) as the receive channel's buffer can fit
    • 0 (full): no requests are issued
    • 1 to 10: a single request is issued
    • More than 10: multiple requests are issued concurrently, and all must complete before the loop can continue
  • Uses SQS long polling to reduce requests when no messages are available
  • Deletes messages in batches
    • When 10 messages are enqueued for deletion via the delete channel (the maximum batch size)
    • After Delete.Interval (e.g. 1s), regardless of whether any other messages can be deleted in the batch
    • Will issue as many concurrent batch deletion requests as specified in Delete.Concurrency (default: 1)

Usage

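import (
  "context"
  "fmt"
  "time"

  "github.com/aws/aws-sdk-go/aws"
  sqsch "github.com/bendrucker/sqs-receive-channel"
)

func main() {
  // Field names follow the Options type documented below.
  // Options.SQS and the queue URL (Receive.RecieveMessageInput) are omitted here for brevity.
  receive, deletes, errs := sqsch.Start(context.TODO(), sqsch.Options{
    Receive: sqsch.ReceiveOptions{BufferSize: 10},
    Delete:  sqsch.DeleteOptions{Interval: 1 * time.Second},
  })

  // Report errors in the background.
  go func() {
    for err := range errs {
      fmt.Println(err)
    }
  }()

  // Process messages in main so the program does not exit immediately.
  for message := range receive {
    fmt.Println(aws.StringValue(message.Body))

    // SQS DeleteMessageBatch API will be called after:
    // a) 1 second or b) 9 more messages are processed
    deletes <- message
  }
}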

Example

The following example illustrates the API calls made by this package in a "bursty" application. This scenario envisions a queue that is mostly idle, but receives large numbers of messages on occasion.

  • Configuration: Receive.BufferSize=10, Delete.Interval=1s
  • Queue is idle for 55 seconds
  • 10 messages become available
  • All messages are read from the receive channel immediately
  • All messages are handled and sent to the delete channel within 100ms

The following API requests will be performed in the first 60s:

  • 4 calls to ReceiveMessage (2 that return empty, 1 that returns 10 messages, and then 1 that remains open at 1:00)
  • 1 call to DeleteMessageBatch, containing 10 receipt handles

Compared to a naive approach (1 request per message, short polling), this can reduce requests to the SQS API by a considerable margin (1-3 orders of magnitude). Results will vary depending on throughput.

Documentation

Constants

const (
	// MaxLongPollDuration is the maximum duration for SQS long polling
	// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html
	MaxLongPollDuration = 20 * time.Second

	// MaxBatchSize is the largest number of messages that can be processed in a batch SQS request.
	// It applies to ReceiveMessage (MaxNumberOfMessages) and DeleteMessageBatch (DeleteMessageBatchRequestEntry.N)
	// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html
	// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessageBatch.html
	MaxBatchSize = 10
)

Variables

This section is empty.

Functions

func Start

func Start(ctx context.Context, options Options) (
	<-chan *sqs.Message,
	chan<- *sqs.Message,
	<-chan error,
)

Start allocates channels, begins receiving, and begins processing deletes. It returns, in order, the receive channel, the delete channel, and the errors channel.

Types

type BatchDeleteError

type BatchDeleteError struct {
	Code          string
	Message       string
	ReceiptHandle string
}

BatchDeleteError represents an error returned from SQS in response to a DeleteMessageBatch request

func (*BatchDeleteError) Error

func (err *BatchDeleteError) Error() string

type DeleteOptions

type DeleteOptions struct {
	Interval    time.Duration
	Concurrency int
}

DeleteOptions configures deletion of messages from SQS

func (*DeleteOptions) Defaults

func (do *DeleteOptions) Defaults()

Defaults sets default values

type Dispatch

type Dispatch struct {
	Options Options
	// contains filtered or unexported fields
}

Dispatch provides methods for processing SQS messages via channels

func (*Dispatch) BatchDeletes

func (d *Dispatch) BatchDeletes(deletes <-chan *sqs.Message) <-chan []*sqs.DeleteMessageBatchRequestEntry

BatchDeletes buffers messages received on the delete channel, batching according to the Delete.Interval and the MaxBatchSize

func (*Dispatch) Delete

func (d *Dispatch) Delete(ctx context.Context)

Delete processes messages received on the delete channel until the supplied context is canceled. It batches messages with BatchDeletes and calls the SQS DeleteMessageBatch API to delete them. If the DeleteMessageBatchOutput contains failures, it sends one error per failure to the errors channel.

func (*Dispatch) QueueURL

func (d *Dispatch) QueueURL() *string

QueueURL returns the SQS Queue URL specified with Options.Receive.ReceiveMessageInput

func (*Dispatch) Receive

func (d *Dispatch) Receive(ctx context.Context)

Receive runs a loop that receives messages from SQS until the supplied context is canceled. On each iteration it checks the available space in the receive channel's buffer, fetches up to that number of messages from SQS, and sends them to the receive channel. If the receive buffer is full, it continues looping until capacity is detected. Because SQS bills per API request, the loop relies on ReceiveMessageInput.WaitTimeSeconds to block for up to 20 seconds when no messages are available. An idle queue therefore costs about 3 requests per minute instead of hundreds.

func (*Dispatch) ReceiveCapacity

func (d *Dispatch) ReceiveCapacity() int

ReceiveCapacity returns the available space in the receive channel's buffer. This is used to determine how many ReceiveMessage requests to issue and how many messages (count) are requested in each.

type Options

type Options struct {
	Receive ReceiveOptions
	Delete  DeleteOptions

	SQS sqsiface.SQSAPI
}

Options represents the user-configurable options for a Dispatch

func (*Options) Defaults

func (o *Options) Defaults()

Defaults sets default values

type ReceiveOptions

type ReceiveOptions struct {
	BufferSize          int
	RecieveMessageInput *sqs.ReceiveMessageInput
}

ReceiveOptions configures receiving of messages from SQS

func (*ReceiveOptions) Defaults

func (ro *ReceiveOptions) Defaults()

Defaults sets default values

Directories

Path Synopsis
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.
pkg
