sqshandler

v1.0.0-beta
Published: Sep 20, 2023 License: MIT Imports: 15 Imported by: 0

README

go-lambda-sqs-handler

A lightweight wrapper for handling SQS Events inside Lambda functions, in Go.

Description

Dealing with single events inside Lambda functions is usually pretty straightforward, as the event is either handled successfully or not.

Dealing with multiple messages inside a single event, however, usually proves harder. What if only one message couldn't be handled? Should I return an error? What if many of them couldn't, but not all? Do I delete them manually from the queue? If I set up a DLQ, does that mean that all messages that I couldn't handle will keep returning to the queue again and again until they finally go away? What if I only want some of them to go away immediately, while others are retried?

To alleviate these issues, which many developers face when building Lambda functions with SQS triggers, go-lambda-sqs-handler aims to reduce the complexity of dealing with infrastructure-related problems so you can focus on the logic that matters.

Features

Simplified handling of different scenarios using a work-based abstraction.

This package works by wrapping an events.SQSEvent and shifting focus to individual messages instead, which will be handled by your customized Worker. The Result returned by Worker.Work() is then processed individually for each message, which brings us to the second point:

Extended error handling to accommodate both transient and non-transient scenarios.

By defining different Status properties, we can differentiate between which messages should be retried and which should not.

Server returned a 503 Unavailable? Return this message to the queue with an exponential Backoff.

Malformed message payload? Send it straight to a pre-configured DLQ to avoid queue-poisoning.

Improved usage of Lambda execution times by handling multiple messages in parallel.

Make use of idle threads during I/O operations by handling other messages in the batch.
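
As a rough illustration of the idea above, the sketch below handles every message of a batch in its own goroutine and waits for all of them to finish. It is not the package's implementation: the message and result types, the work function and the handleBatch helper are hypothetical stand-ins so that the example needs only the standard library.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// message and result are hypothetical stand-ins for events.SQSMessage and
// sqshandler.Result, so the sketch needs only the standard library.
type message struct{ ID, Body string }

type result struct {
	ID     string
	Status string
}

// work simulates an I/O-bound call; the sleep stands in for network latency.
func work(m message) result {
	time.Sleep(50 * time.Millisecond)
	return result{ID: m.ID, Status: "SUCCESS"}
}

// handleBatch runs work for every message in its own goroutine and collects
// the results in the same order as the input.
func handleBatch(batch []message) []result {
	results := make([]result, len(batch))
	var wg sync.WaitGroup
	for i, m := range batch {
		wg.Add(1)
		go func(i int, m message) {
			defer wg.Done()
			results[i] = work(m) // each goroutine writes only to its own slot
		}(i, m)
	}
	wg.Wait()
	return results
}

func main() {
	batch := []message{{"1", "a"}, {"2", "b"}, {"3", "c"}}
	start := time.Now()
	results := handleBatch(batch)
	fmt.Println(len(results), "messages handled in", time.Since(start).Round(10*time.Millisecond))
}
```

Because the simulated calls overlap, the whole batch should take roughly as long as one call rather than the sum of all of them.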

Requirements

Make sure you have:

  • A DLQ set on your SQS queue
  • A Lambda function connected to an SQS trigger
  • The property Report batch item failures set to Yes on said trigger
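
The last requirement matters because, with Report batch item failures enabled, Lambda reads the function's response to decide which messages of the batch stay in the queue. The sketch below shows the JSON shape of such a response. The batchItemFailure and batchResponse types and the failedResponse helper are local stand-ins introduced for illustration; in a real function the equivalent types are events.SQSBatchItemFailure and events.SQSEventResponse.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// batchItemFailure and batchResponse are local stand-ins that mirror the JSON
// shape of a partial batch response; a real function would use the types
// from the aws-lambda-go events package instead.
type batchItemFailure struct {
	ItemIdentifier string `json:"itemIdentifier"`
}

type batchResponse struct {
	BatchItemFailures []batchItemFailure `json:"batchItemFailures"`
}

// failedResponse lists the IDs of the messages that could not be handled.
func failedResponse(failedIDs []string) batchResponse {
	resp := batchResponse{BatchItemFailures: []batchItemFailure{}}
	for _, id := range failedIDs {
		resp.BatchItemFailures = append(resp.BatchItemFailures, batchItemFailure{ItemIdentifier: id})
	}
	return resp
}

func main() {
	out, err := json.Marshal(failedResponse([]string{"msg-2"}))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"batchItemFailures":[{"itemIdentifier":"msg-2"}]}
}
```

Messages whose IDs are not listed are treated as handled, so only the reported ones become visible in the queue again.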

Installation

Run go get github.com/ram-sa/go-lambda-sqs-handler

Then add import "github.com/ram-sa/go-lambda-sqs-handler" to your function handler

Example

package main

import (
	"context"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	sqshandler "github.com/ram-sa/go-lambda-sqs-handler"
)

// MyWorker implements the sqshandler.Worker interface.
type MyWorker struct {
	// Set up your worker as needed
}

// Work is the function that will be called by the Handler for processing a message.
// parseTheMessageBody, doSomeStuff, sendItToSomeone and isTransient are
// placeholders for your own logic.
func (w MyWorker) Work(c context.Context, m events.SQSMessage) sqshandler.Result {
	body := parseTheMessageBody(m.Body)
	doSomeStuff(body)
	err := sendItToSomeone(body)
	// Did everything go well?
	if isTransient(err) {
		// Message will return to the queue with an exponential backoff.
		return sqshandler.Result{
			Message: &m, Status: sqshandler.Retry, Error: err,
		}
	}
	// Everything went great, message acknowledged!
	return sqshandler.Result{
		Message: &m, Status: sqshandler.Success,
	}
}

func main() {
	lambda.Start(handleRequest)
}

func handleRequest(ctx context.Context, sqsEvent events.SQSEvent) (events.SQSEventResponse, error) {
	// Create a new handler with default configuration
	handler := sqshandler.New(ctx)
	// Initialize your worker as needed
	worker := MyWorker{}
	return handler.HandleEvent(&sqsEvent, worker)
}

For more information, check the docs at https://pkg.go.dev/github.com/ram-sa/go-lambda-sqs-handler

Documentation

Overview

Package sqshandler implements a


Constants

const (
	DefaultIniTimeout = 5
	DefaultMaxTimeout = 300
	DefaultMultiplier = 2.5
	DefaultRandFactor = 0.3
)

Default values for BackOff.

const SQSMaxVisibility = 43200

Variables

This section is empty.

Functions

This section is empty.

Types

type BackOff

type BackOff struct {
	InitTimeoutSec, MaxTimeoutSec uint16
	Multiplier, RandFactor        float64
}

BackOff defines values used for calculating a message's exponential backoff in case of a transient failure by altering its visibility timeout. Each retry attempt will take exponentially longer based on the number of delivery attempts (attribute `ApproximateReceiveCount`) until the message is either delivered or sent to a DLQ, according to the following parameters:

InitTimeoutSec

Defines the initial timeout value for the message, in seconds, ranging from 0 to 43200 (12h, the maximum value accepted by AWS).

MaxTimeoutSec

Defines the maximum timeout value for the message, in seconds, ranging from 0 to 43200 (12h, the maximum value accepted by AWS). Note that this does not include jitter ranges, unless the final value exceeds 43200.

For example, if MaxTimeoutSec is set to 6, and RandFactor is set to 0.5, the final timeout value can be any integer from 3 to 9. However if MaxTimeoutSec is set to 43200, the values will range from 21600 to 43200 (instead of 64800).

Multiplier

Defines the scaling factor of the exponential function. Note that the resulting values will be rounded down, as AWS only accepts positive integer values. Setting this value to 1 will linearize the backoff curve.

RandFactor

Adds a jitter factor to the function by making it so that the final timeout value ranges in [interval * (1 ± RandFactor)], rounded down. Setting this value to 0 disables it.

Example

For InitTimeoutSec = 5, MaxTimeoutSec = 300, Multiplier = 2.5 and RandFactor = 0.2 (note that DefaultRandFactor is listed above as 0.3):

Delivery	Timeout		Timeout		Timeout
#		(Raw)		(NoJitter)	(WithJitter)

1		5   		5   		[4, 6]
2		12.5		12  		[9, 14]
3		31.25		31  		[24, 37]
4		78.125		78  		[62, 93]
5		195.3125	195 		[156, 234]
6		488.28125	300 		[240, 360]
7		1220.7031	300 		[240, 360]

Based on `https://github.com/cenkalti/backoff/blob/v4/exponential.go`.

For more information about message visibility, see: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html.
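
The arithmetic described above can be sketched as follows. This is one reading of the documentation rather than the package's actual code: the backOff struct mirrors the documented fields, while timeoutRange and sqsMaxVisibility are names made up for the example. It assumes the raw value is InitTimeoutSec multiplied by Multiplier once per previous delivery, capped at MaxTimeoutSec and rounded down, with the jitter bounds computed from that rounded value.

```go
package main

import (
	"fmt"
	"math"
)

// sqsMaxVisibility mirrors the documented SQSMaxVisibility constant (12h).
const sqsMaxVisibility = 43200

// backOff mirrors the fields of the documented BackOff type.
type backOff struct {
	InitTimeoutSec, MaxTimeoutSec uint16
	Multiplier, RandFactor        float64
}

// timeoutRange returns the timeout without jitter and the jitter bounds, in
// seconds, for a 1-based delivery attempt. Hypothetical helper: an assumed
// interpretation of the docs, not the package's implementation.
func timeoutRange(b backOff, attempt int) (base, lo, hi int) {
	raw := float64(b.InitTimeoutSec)
	maxT := float64(b.MaxTimeoutSec)
	// Scale once per previous delivery, stopping early once the cap is hit.
	for i := 1; i < attempt && raw < maxT; i++ {
		raw *= b.Multiplier
	}
	capped := math.Floor(math.Min(raw, maxT))
	lo = int(math.Floor(capped * (1 - b.RandFactor)))
	hi = int(math.Floor(capped * (1 + b.RandFactor)))
	if hi > sqsMaxVisibility {
		hi = sqsMaxVisibility // AWS rejects visibility timeouts above 12h
	}
	return int(capped), lo, hi
}

func main() {
	b := backOff{InitTimeoutSec: 5, MaxTimeoutSec: 300, Multiplier: 2.5, RandFactor: 0.2}
	for attempt := 1; attempt <= 7; attempt++ {
		base, lo, hi := timeoutRange(b, attempt)
		fmt.Printf("attempt %d: %d [%d, %d]\n", attempt, base, lo, hi)
	}
}
```

Under these assumptions the sketch yields the same ranges as the table for a RandFactor of 0.2. The actual timeout would then be a value picked from within the jitter bounds.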

func NewBackOff

func NewBackOff() BackOff

NewBackOff creates an instance of BackOff using default values.

type Handler

type Handler struct {
	BackOff       BackOff
	Context       context.Context
	FailureDlqURL string
	SQSClient     SQSClient
}

func New

func New(c context.Context) *Handler

New creates an instance of Handler with default values for exponential backoff on retries, no DLQ for failed messages and an sqs.Client with default configurations.

func (*Handler) HandleEvent

func (b *Handler) HandleEvent(event *events.SQSEvent, worker Worker) (events.SQSEventResponse, error)

HandleEvent

type Result

type Result struct {
	Message *events.SQSMessage `validate:"required"`
	Status  Status             `validate:"oneof=FAILURE RETRY SKIP SUCCESS"`
	Error   error
}

type SQSClient

type SQSClient interface {
	ChangeMessageVisibility(context.Context, *sqs.ChangeMessageVisibilityInput, ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityOutput, error)
	SendMessage(context.Context, *sqs.SendMessageInput, ...func(*sqs.Options)) (*sqs.SendMessageOutput, error)
	DeleteMessage(context.Context, *sqs.DeleteMessageInput, ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
}

SQSClient is an interface that enables mocking of the SQS client, usually for testing purposes.

type Status

type Status string
const (
	Failure Status = "FAILURE"
	Retry   Status = "RETRY"
	Skip    Status = "SKIP"
	Success Status = "SUCCESS"
)
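
One way a Worker might choose between these values is to classify the error it received. The sketch below is only an illustration, under the assumption drawn from the README that Retry suits transient failures and Failure suits messages that should not be retried. The status type mirrors Status locally, and classify, errUnavailable and errMalformed are hypothetical names that are not part of the package. Skip is left out because its semantics are not described here.

```go
package main

import (
	"errors"
	"fmt"
)

// status mirrors the documented Status type so the sketch is self-contained.
type status string

const (
	failure status = "FAILURE"
	retry   status = "RETRY"
	success status = "SUCCESS"
)

// Hypothetical sentinel errors a worker might define for its own logic.
var (
	errUnavailable = errors.New("service unavailable")
	errMalformed   = errors.New("malformed payload")
)

// classify maps an error to a status: no error means success, a transient
// error means retry, and anything else is treated as a permanent failure.
func classify(err error) status {
	switch {
	case err == nil:
		return success
	case errors.Is(err, errUnavailable):
		return retry
	default:
		return failure
	}
}

func main() {
	wrapped := fmt.Errorf("calling upstream: %w", errUnavailable)
	fmt.Println(classify(nil))          // SUCCESS
	fmt.Println(classify(wrapped))      // RETRY
	fmt.Println(classify(errMalformed)) // FAILURE
}
```

Treating unknown errors as permanent failures is a design choice of this sketch; a worker could just as well default to retrying them.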

type Worker

type Worker interface {
	Work(context.Context, events.SQSMessage) Result
}
