Documentation ¶
Overview ¶
Package sqshandler implements a lightweight wrapper for handling SQS Events inside Lambda functions, reducing the complexity of dealing with different and sometimes unexpected conditions to a set of defined results in a work-based abstraction.
Index ¶
Constants ¶
const ( DefaultIniTimeout = 5 DefaultMaxTimeout = 300 DefaultMultiplier = 2.5 DefaultRandFactor = 0.3 )
Default values for BackOff.
const SQSMaxVisibility = 43200
SQS maximum value for message visibility
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BackOff ¶
BackOff defines values used for calculating a message's exponential backoff in case of a transient failure by altering its visibility timeout. Each retry attempt will take exponentially longer based on the amount of delivery attempts (attribute ApproximateReceiveCount) until the message is either delivered or sent to a DLQ, according to the following parameters:
InitTimeoutSec ¶
Defines the initial timeout value for the message, in seconds, ranging from 0 to 43200 (12h, the maximum value accepted by AWS).
MaxTimeoutSec ¶
Defines the maximum timeout value for the message, in seconds, ranging from 0 to 43200 (12h, the maximum value accepted by AWS). Note that this does not include jitter ranges, unless the final value exceeds 43200.
For example, if MaxTimeoutSec is set to 6, and RandFactor is set to 0.5, the final timeout value can be any integer from 3 to 9. However if MaxTimeoutSec is set to 43200, the values will range from 21600 to 43200 (instead of 64800).
Multiplier ¶
Defines the scaling factor of the exponential function. Note that the resulting values will be rounded down, as AWS only accepts positive integer values. Setting this value to 1 will linearize the backoff curve.
RandFactor ¶
Adds a jitter factor to the function by making it so that the final timeout value ranges in [interval * (1 ± RandFactor)], rounded down. Setting this value to 0 disables it.
Example ¶
For the default values 5, 300, 2.5 and 0.2:
D Timeout Timeout Timeout # (Raw) (NoJitter) (WithJitter) 1 5 5 [4, 6] 2 12.5 12 [9, 14] 3 31.25 31 [24, 37] 4 78.125 78 [62, 93] 5 195.3125 195 [156, 234] 6 488.28125 300 [240, 360] 7 1220.7031 300 [240, 360]
Based on `https://github.com/cenkalti/backoff/blob/v4/exponential.go`.
For more information about message visibility, see: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html.
func NewBackOff ¶
func NewBackOff() BackOff
NewBackoff creates an instance of BackOff using default values.
type Handler ¶
type Handler struct { BackOff BackOff Context context.Context FailureDlqURL string SQSClient SQSClient }
Handler functions as a configurable wrapper of an SQS Event, letting you control some of the behaviors related to a message's lifetime inside the queue.
BackOff ¶
BackOff configures the exponential backoff values of retried messages.
Context ¶
The Lambda's context.Context. Note that if this context has an invalid Deadline, such as in a context.TODO, Lambda's default of 15 minutes will be considered by the Handler as the ceiling for execution.
FailureDlqURL ¶
This property is optional.
The URL of a DLQ to which messages with a Status of FAILURE will be sent to. This can be the original queue's own DLQ, or a separate one. Make sure that your Lambda has an execution role with enough permissions to write to said queue.
SQSClient ¶
A properly configured sqs.Client, which will be used by the Handler to interface with the queue.
Configuring parallelism and number of retries ¶
By design, a Handler will process all messages in a batch in parallel. Thus, the degree of parallelism can be controlled by altering the property Batch Size of your Lambda's trigger.
In much the same way, the amount of retries is tied to your queue's Max Receive Count property, which defines how many delivery attempts will be made on a single message until it is moved to a DLQ. For more information, see:
func New ¶
New creates an instance of Handler with default BackOff values on retries, no DLQ set for messages with a Status of FAILURE and a sqs.Client with default configurations, as per the following:
cfg, err := config.LoadDefaultConfig(c) if err != nil { log.Fatalf("unable to load SDK config, %v", err) } return sqs.NewFromConfig(cfg)
func (*Handler) HandleEvent ¶
func (b *Handler) HandleEvent(event *events.SQSEvent, worker Worker) (events.SQSEventResponse, error)
HandleEvent is, as the name implies, the method called for handling an events.SQSEvent. It works by separating the event into processable messages which will then be given to a user defined Worker. After all messages have been processed, a Report will be printed to stdout detailing their Status and any errors that may have occurred.
Handling Messages ¶
[event.SQSMessage]s inside the events.SQSEvent will be delivered as they are to the Worker for processing, in parallel. The Handler will wait for a Status callback for all messages up until 5 seconds before the Lambda's configured timeout. After that, any message which is still being processed will be ignored and returned to the queue with their default VisibilityTimeout value. If any Worker panics during execution, the Handler will consider that it failed to process the message, and assign it such Status itself.
At this point, all possible messages should be in one of the four states defined by Status. Any property that deviates from those 4 will be treated as a [Status.FAILURE], regardless if the message was successfully processed or not, with an appended error describing the issue.
Reporting to the Lambda framework ¶
While the Lambda framework accepts quite a few handler signatures, this Handler requires the use of
func (context.Context, TIn) (TOut, error)
as we will be reporting any messages that we want to retry in a events.SQSEventResponse in order to avoid unnecessary calls to the SQS API.
With this option, the framework behaves differently depending on which kind of response is given after execution. We will be focusing on 3 specific kinds (the complete list can be found here):
- Empty events.SQSEventResponse, no error
- Populated events.SQSEventResponse, no error
- Anything, error
These 3 responses will be utilized in different scenarios, which will vary depending on the Status reported by your Worker:
- If all messages were reported as [Status.SUCCESS], [Status.SKIP] or [Status.FAILURE] response number 1 will be used for the Lambda, signaling the framework that all messages were processed successfully and can be deleted from the queue. Failed messages will be reported as such by the Handler and sent to a DLQ, if one was configured. This response was chosen for failures due to how the framework interprets errors, which will be explained later.
- If any message was reported as [Status.RETRY], then response number 2 will be used, where events.SQSEventResponse will contain the ids of all messages that need reprocessing.
- If the Lambda reached a timeout, then response number 3 will be used, and the Handler will behave a bit differently, due to how the framework deals with errors.
Which brings us to the next point:
Reporting Errors ¶
If any error reaches the Lambda framework, it will consider the execution as having completely failed and any and all messages that were delivered will be returned to the queue with their default VisibilityTimeout.
Thus, even if your Worker returns an error on a [Status.FAILURE], it will not be reported directly to the Lambda framework, as doing so would mean that all messages, including successfully processed ones, would be returned to the queue. These errors will still show up during logging, as they are printed to stdout in the Report.
On a timeout, however, the Handler doesn't know how to proceed with the messages that didn't report back. Thus, the best idea is to return them to the queue as they were, where they will follow the redelivery pattern until the problem is fixed or they are naturally sent to a DLQ. Note, however, that in order to ensure that only these messages will be returned, the Handler will have to MANUALLY DELETE every message with a Status of SUCCESS, SKIP and FAILURE from the queue.
Handler Errors ¶
When dealing with messages, the Handler will perform a few checks and make calls to the SQS API (like SendMessage for sending failures to a DLQ and ChangeMessageVisibility for exponential backoff on retries). If any of these fail, they will be reported at the end of execution. Statuses will try to be preserved as much as possible. Failures will still be removed from the queue in order to avoid queue-poisoning and Retries will still be sent back, only with their VisibilityTimeout unchanged. Note that these behaviors are open to debate, so feel free to reach out with your thoughts on the matter.
type Report ¶ added in v0.9.1
type Report struct { BatchSize int `json:"batchSize"` Success int `json:"success,omitempty"` Skip int `json:"skip,omitempty"` Retry *retryFailureReport `json:"retry,omitempty"` Failure *retryFailureReport `json:"failure,omitempty"` HandlerErrors []errorReport `json:"handlerErrors,omitempty"` }
Report defines a struct used for serializing and printing an execution report of a batch of messages.
BatchSize ¶
A count of how many events.SQSMessage were contained in the events.SQSEvent
Success, Skip ¶
A count of how many messages had a processing Status of said values.
Retry, Failure ¶
A count of how many messages had a processing Status of said values, including any individual errors reported by either the Handler or Worker.
HandlerErrors ¶
A collection of errors that occurred during message handling (changing visibility, sending to a DLQ, etc.).
type SQSClient ¶
type SQSClient interface { ChangeMessageVisibility(context.Context, *sqs.ChangeMessageVisibilityInput, ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityOutput, error) SendMessage(context.Context, *sqs.SendMessageInput, ...func(*sqs.Options)) (*sqs.SendMessageOutput, error) DeleteMessage(context.Context, *sqs.DeleteMessageInput, ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) }
Interface to enable mocking of a SQSClient, usually for testing purposes.
type Status ¶
type Status string
Status defines the four possible states a Worker can report to the Handler. These states are:
SUCCESS ¶
Denotes that the message has been processed successfully and thus can be safely removed from the queue.
SKIP ¶
Denotes that the message is not relevant and has thus been skipped. Functionally identical to SUCCESS, used for reporting purposes.
RETRY ¶
Denotes that the message was not processed successfully due to a transient error. This signals the Handler to return the message to the queue with an updated VisibilityTimeout, following its [Backoff] configuration.
FAILURE ¶
Denotes that the message was not processed successfully due to immutable reasons that cannot be solved by simply retrying the operation. This signals the Handler to send this message to a DLQ, if one was specified during configuration, and then to remove said message from the queue.
type Worker ¶
The Worker interface defines the main method which will be called by the Handler for message processing: [Worker.Work], and should be considered the entry point of your lambda function.
Work ¶
This method will be called once per message by the Handler, which will pass along the Lambda context and the SQSMessage to be worked upon.
IMPORTANT: This means that this method will be called for all messages IN PARALLEL, and should be treated as a Go Routine. Pay special attention if your code makes use of shared memory structs or any other non-thread safe components!
Note that the Lambda context contains its preconfigured timeout, which your Worker should respect. Also note that if a timeout is imminent, the Handler will reserve 5 seconds of total runtime in order to cleanup all previously handled messages. Any Work that does not return before that will be discarded, and its message will be returned to the queue with its DefaultVisibilityTimeout.
After processing a message, the Worker should return the relevant Status and errors that may have occurred.
If a Worker panics during execution, the Handler will consider that processing has failed, and treat the message as if a [Status.FAILURE] was returned.