gosqs

Version: v0.0.0-...-07095ca
Published: May 4, 2023 License: MIT Imports: 15 Imported by: 0

README

gosqs

What if sending events to SQS or SNS was as easy as this?

[Image: So easy to send messages!]

What if receiving SQS messages was as easy as this?

[Image: So easy to send messages!]

Yes, it is possible! Check out my blog post that led me to develop this library.

Do you have a simple messaging system and need to scale to hundreds, thousands, millions? I am available for contracts to scale your async messaging system and provide a customized solution for you and your team/company/project! Email me at qhenkart@gmail.com. Let's make magic together!

Better yet, do it yourself for free with this open source project.

GoSQS serves as the messaging interface between AWS-SQS and AWS-SNS services. If there is a feature you would like implemented, please make a Pull Request.

Please refer to the examples to see how to interact with this library. Please make contributions, and post any issues.

Take a look at the Dead Letter Queue and Naming your Queue sections, as they are important for this design pattern.

TODO

  • create better documentation
  • implement internal testing package to avoid dependency
  • create a controller that scales the worker pool up automatically depending on the message count and then kills the workers to avoid costs

Scaling the Consumers

Each SQS consumer is ready to be scaled up as well as scaled out.

  • Scaling Out: You can scale out by creating additional instances with no extra configuration required. The library and SQS are designed for multiple workers reading from the same pool.

  • Scaling Up: Each consumer has a configuration variable, config.WorkerPool. The default is 30, which means 30 goroutines are checking for messages at any given time. You can increase the number of active goroutines by adjusting that value. Make sure to monitor CPU usage to find the right count for your application. For a local or dev environment, reduce this number to 1 to save battery.
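The effect of the pool size can be illustrated without AWS. The following is a minimal sketch using only the standard library; `runPool` and its parameters are hypothetical names for illustration and are not part of gosqs, whose internal pool may be built differently.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool starts `workers` goroutines that drain jobs from a shared channel
// and returns how many jobs were handled. It is a hypothetical stand-in for
// a consumer's worker pool, not gosqs code.
func runPool(workers int, jobs []string, handle func(string)) int {
	if workers < 1 {
		workers = 1 // a pool needs at least one worker to make progress
	}
	queue := make(chan string)
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		handled int
	)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range queue {
				handle(job)
				mu.Lock()
				handled++
				mu.Unlock()
			}
		}()
	}
	for _, job := range jobs {
		queue <- job
	}
	close(queue)
	wg.Wait()
	return handled
}

func main() {
	jobs := []string{"a", "b", "c", "d", "e"}
	// One worker is enough locally; raise the count to process in parallel.
	fmt.Println("handled:", runPool(1, jobs, func(string) {}))
}
```

Every job is handled exactly once regardless of the worker count; only the degree of parallelism changes, which is why CPU usage is the number to watch when tuning.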

Configuring SNS

Configuring SNS is easy: log in to the AWS console and navigate to SNS. Click on Topics in the sidebar, then "Create New Topic". Fill in the name and display name.

  • Make sure to set the topic delivery policy to exponential backoff

Configuring SQS

  1. Navigate to aws-sqs
  2. Choose a queue Name and click on Standard Queue
  3. Click configure, apply optional configurations
  4. Hit Create Queue
  5. On the main page, click on the newly created queue
  6. Click on Queue Actions and at the bottom hit "Subscribe Queue to SNS Topic"
  7. Select the SNS topic from the dropdown provided

Naming your Queue

Queue names supported by this library follow this syntax:

-

SQS Configurations

Default Visibility Timeout

The default visibility timeout is responsible for managing how often a single message gets received by a consumer. A message remains in the queue until it is deleted, and any receipt of the message that does not end in deletion before the Visibility Timeout is hit is considered a "failure". As a result, the Default Visibility Timeout should exceed the maximum possible amount of time it would take for a handler to process and delete a message.

The currently set default is 30 seconds.

Note: the visibility timeout can be extended for an individual message while it is being processed, up to a maximum of 3 x the visibility timeout.
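As a worked example of the arithmetic, the sketch below assumes that each extension adds one more visibility-timeout window on top of the initial one, which matches both the 3 x limit and the "2 extensions (1m30s processing time)" default mentioned in the Config documentation. `maxProcessingSeconds` is a hypothetical helper, not a gosqs function.

```go
package main

import (
	"fmt"
	"time"
)

// maxProcessingSeconds returns the longest time, in seconds, that a handler
// can hold one message, assuming each extension adds one more
// visibility-timeout window on top of the initial one.
func maxProcessingSeconds(visibilityTimeout, extensions int) int {
	if extensions < 0 {
		extensions = 0
	}
	return visibilityTimeout * (extensions + 1)
}

func main() {
	secs := maxProcessingSeconds(30, 2)
	fmt.Println(time.Duration(secs) * time.Second) // 1m30s
}
```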

Message Retention Period

The number of days that the queue will hold on to an unconsumed message before deleting it. Since we will always be consuming, this value is not important. The default is 4 days.

Receive Message Wait Time

The amount of time that the request will hang before returning 0 messages. This field is important because it allows us to use long polling instead of short polling. The default is 0; set it to the maximum of 20 seconds to save on processing and cost. AWS recommends using long polling over short polling.

Custom Attributes

You can add custom attributes to your SQS implementation. These are fields that exist outside of the payload body. A common practice is to include a correlationId or some other tracking ID to follow a message.

Dead Letter Queue Configuration

The following settings activate an automatic reroute to the DLQ upon repeated failure of message processing.

  • Redrive Policy must be checked
  • The Dead Letter Queue name must be provided (a DLQ is just a normal SQS queue)
  • Maximum Receives is the number of times a message can be received but not deleted before it is moved to the DLQ
  • Including a DLQ is an absolute must. Do not run a system without it, or you will be vulnerable to Poison-Pill attacks
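The redrive behaviour described above can be modelled with a toy in-memory queue. This is only an illustration of the counting rule, assuming a message is moved once it has already been received Maximum Receives times without being deleted; `toyQueue` and its methods are hypothetical and do not talk to SQS.

```go
package main

import "fmt"

// toyQueue is an in-memory model of a queue with a redrive policy.
type toyQueue struct {
	maxReceives int
	receives    map[string]int
	inDLQ       map[string]bool
	dlq         []string
}

func newToyQueue(maxReceives int) *toyQueue {
	return &toyQueue{
		maxReceives: maxReceives,
		receives:    map[string]int{},
		inDLQ:       map[string]bool{},
	}
}

// deliver simulates one receive of message id that is never deleted.
// It returns false once the message has been moved to the DLQ.
func (q *toyQueue) deliver(id string) bool {
	if q.receives[id] >= q.maxReceives {
		if !q.inDLQ[id] {
			q.inDLQ[id] = true
			q.dlq = append(q.dlq, id)
		}
		return false
	}
	q.receives[id]++
	return true
}

func main() {
	q := newToyQueue(4)
	for i := 1; i <= 5; i++ {
		fmt.Printf("attempt %d delivered: %v\n", i, q.deliver("poison"))
	}
	fmt.Println("dead letter queue:", q.dlq)
}
```

Without the redrive policy, the same poison message would keep coming back to the workers indefinitely, which is the failure mode the DLQ protects against.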

Consumer Configuration

Custom Middleware

You can add custom middleware to your consumer. Middleware runs through the adapter mechanism before each handler is called, so you can add a logger, modify the context, and so on.

Testing

You can set up a local SNS/SQS emulator using https://github.com/p4tin/goaws. Contributions have been added to this emulator specifically to support this library. The tests also require the emulator to be running; I will eventually set up a CI environment that runs the emulator in a container and runs the tests.

Documentation

Constants

const DataTypeNumber = dataType("Number")

DataTypeNumber represents the Number datatype, use it when creating custom attributes

const DataTypeString = dataType("String")

DataTypeString represents the String datatype, use it when creating custom attributes

Variables

var ErrBodyOverflow = newSQSErr("message surpasses sqs limit of 262144, please truncate body")

ErrBodyOverflow AWS SQS can only hold payloads of 262144 bytes. Messages must either be routed to s3 or truncated
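A caller can guard against this limit before publishing. The sketch below is a standalone illustration using the standard library; `encodeBody`, `maxBodyBytes`, and the local `errBodyOverflow` are hypothetical stand-ins, and whether gosqs compares against the limit in exactly this way is an assumption.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// maxBodyBytes is the payload limit quoted in the ErrBodyOverflow message.
const maxBodyBytes = 262144

// errBodyOverflow is a local stand-in for gosqs.ErrBodyOverflow.
var errBodyOverflow = errors.New("message surpasses sqs limit of 262144, please truncate body")

// encodeBody marshals v to JSON and rejects payloads larger than the limit,
// so oversized bodies can be truncated or routed to S3 by the caller.
func encodeBody(v interface{}) ([]byte, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	if len(b) > maxBodyBytes {
		return nil, errBodyOverflow
	}
	return b, nil
}

func main() {
	small, err := encodeBody(map[string]string{"title": "hello"})
	fmt.Println(len(small), err)

	// json.Marshal base64-encodes []byte, so 300000 raw bytes grow past the limit.
	_, err = encodeBody(make([]byte, 300000))
	fmt.Println(errors.Is(err, errBodyOverflow))
}
```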

var ErrGetMessage = newSQSErr("unable to retrieve message")

ErrGetMessage fires when a request to retrieve messages from sqs fails

var ErrInvalidCreds = newSQSErr("invalid aws credentials")

ErrInvalidCreds invalid credentials

var ErrInvalidVal = newSQSErr("value type does not match specified datatype")

ErrInvalidVal the custom attribute value must match the type of the custom attribute Datatype

var ErrMarshal = newSQSErr("unable to marshal request")

ErrMarshal unable to marshal request

var ErrMessageProcessing = newSQSErr("processing time exceeding limit")

ErrMessageProcessing occurs when a message has exceeded the consumption time limit set by aws SQS

var ErrNoRoute = newSQSErr("message received without a route")

ErrNoRoute message received without a route

var ErrPublish = newSQSErr("message publish failure. Retrying...")

ErrPublish fires when there is an error publishing a message. gosqs will wait 10 seconds and try again, up to the configured retry count
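The retry behaviour can be sketched as a simple loop. This is an illustration only: `publishWithRetry` and the local `errPublish` are hypothetical, the wait is passed in as a parameter so the example runs quickly, and treating the retry count as retries after the first attempt is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errPublish is a local stand-in for gosqs.ErrPublish.
var errPublish = errors.New("message publish failure. Retrying...")

// publishWithRetry calls send until it succeeds or retryCount retries have
// been used, sleeping for wait between attempts. It returns the number of
// attempts made and the last error.
func publishWithRetry(send func() error, retryCount int, wait time.Duration) (int, error) {
	if retryCount < 0 {
		retryCount = 0
	}
	var err error
	for attempt := 1; attempt <= retryCount+1; attempt++ {
		if err = send(); err == nil {
			return attempt, nil
		}
		if attempt <= retryCount {
			time.Sleep(wait)
		}
	}
	return retryCount + 1, err
}

func main() {
	calls := 0
	flaky := func() error {
		calls++
		if calls < 3 {
			return errPublish
		}
		return nil
	}
	// A short wait keeps the demo fast; the documented wait is 10 seconds.
	attempts, err := publishWithRetry(flaky, 5, 10*time.Millisecond)
	fmt.Println(attempts, err)
}
```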

var ErrQueueURL = newSQSErr("undefined queueURL")

ErrQueueURL undefined queueURL

var ErrUnableToDelete = newSQSErr("unable to delete item in queue")

ErrUnableToDelete unable to delete item

var ErrUnableToExtend = newSQSErr("unable to extend message processing time")

ErrUnableToExtend unable to extend message processing time

var ErrUndefinedPublisher = newSQSErr("sqs publisher is undefined")

ErrUndefinedPublisher the sqs publisher is undefined

Functions

func WithDispatcher

func WithDispatcher(ctx context.Context, pub Publisher) context.Context

WithDispatcher sets an adapter to support sending async messages

Types

type Adapter

type Adapter func(Handler) Handler

Adapter implements adapters in the context

func WithMiddleware

func WithMiddleware(f func(ctx context.Context, m Message) error) Adapter

WithMiddleware adds middleware to the consumer service

func WithRecovery

func WithRecovery(recovery func()) Adapter

WithRecovery is an adapter that logs a Panic error and recovers the service from a failed state
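The adapter pattern behind these functions can be shown with local copies of the documented types. The sketch below redeclares `Message`, `Handler`, and `Adapter` so it runs without the library; `withMiddleware`, `wrap`, `event`, and `runDemo` are hypothetical, and the order in which gosqs applies multiple adapters is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented shapes, so the sketch needs no dependency.
type Message interface{ Route() string }
type Handler func(context.Context, Message) error
type Adapter func(Handler) Handler

// event is a minimal Message used only for this demo.
type event string

func (e event) Route() string { return string(e) }

// withMiddleware runs f before the wrapped handler and stops on error.
func withMiddleware(f func(ctx context.Context, m Message) error) Adapter {
	return func(next Handler) Handler {
		return func(ctx context.Context, m Message) error {
			if err := f(ctx, m); err != nil {
				return err
			}
			return next(ctx, m)
		}
	}
}

// wrap applies adapters so that the first one listed runs first.
func wrap(h Handler, adapters ...Adapter) Handler {
	for i := len(adapters) - 1; i >= 0; i-- {
		h = adapters[i](h)
	}
	return h
}

// runDemo returns the order in which middleware and handler were called.
func runDemo() []string {
	var calls []string
	logging := withMiddleware(func(_ context.Context, m Message) error {
		calls = append(calls, "middleware:"+m.Route())
		return nil
	})
	h := wrap(func(_ context.Context, m Message) error {
		calls = append(calls, "handler:"+m.Route())
		return nil
	}, logging)
	_ = h(context.Background(), event("post_published"))
	return calls
}

func main() {
	fmt.Println(runDemo())
}
```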

type Config

type Config struct {
	// a way to provide custom session setup. A default based on key/secret will be used if not provided
	SessionProvider SessionProviderFunc
	// private key to access aws
	Key string
	// secret to access aws
	Secret string
	// region for aws and used for determining the topic ARN
	Region string
	// provided automatically by aws, but must be set for emulators or local testing
	Hostname string
	// account ID of the aws account, used for determining the topic ARN
	AWSAccountID string
	// environment name, used for determining the topic ARN
	Env string
	// prefix of the topic, this is set as a prefix to the environment
	TopicPrefix string
	// optional address of the topic, if this is not provided it will be created using other variables
	TopicARN string
	// optional address of queue, if this is not provided it will be retrieved during setup
	QueueURL string
	// used to extend the allowed processing time of a message
	VisibilityTimeout int
	// used to determine how many attempts exponential backoff should use before logging an error
	RetryCount int
	// defines the total amount of goroutines that can be run by the consumer
	WorkerPool int
	// defines the total number of processing extensions that occur. Each processing extension will double the
	// visibilitytimeout counter, ensuring the handler has more time to process the message. Default is 2 extensions (1m30s processing time)
	// set to 0 to turn off extension processing
	ExtensionLimit *int

	// Add custom attributes to the message. This might be a correlationId or client meta information
	// custom attributes will be viewable on the sqs dashboard as meta data
	Attributes []customAttribute

	// Add a custom logger, the default will be log.Println
	Logger Logger
}

Config defines the gosqs configuration

func (*Config) NewCustomAttribute

func (c *Config) NewCustomAttribute(dataType dataType, title string, value interface{}) error

NewCustomAttribute adds a custom attribute to SNS and SQS messages. This can include correlationIds, logIds, or any additional information you would like separate from the payload body. These attributes can be easily seen from the SQS console.

The datatype must be gosqs.DataTypeNumber or gosqs.DataTypeString, and the value must match the type provided
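The type-matching rule can be sketched with local stand-ins. Everything below is hypothetical (`newAttribute`, `attribute`, and the lowercase constants mirror the documented names but are not the library's code), and the set of Go numeric types accepted for the Number datatype is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

type dataType string

const (
	dataTypeNumber = dataType("Number")
	dataTypeString = dataType("String")
)

// errInvalidVal is a local stand-in for gosqs.ErrInvalidVal.
var errInvalidVal = errors.New("value type does not match specified datatype")

// attribute holds a validated custom attribute in string form.
type attribute struct {
	Title    string
	DataType dataType
	Value    string
}

// newAttribute checks that value matches the declared datatype.
func newAttribute(dt dataType, title string, value interface{}) (attribute, error) {
	switch dt {
	case dataTypeString:
		if s, ok := value.(string); ok {
			return attribute{Title: title, DataType: dt, Value: s}, nil
		}
	case dataTypeNumber:
		switch value.(type) {
		case int, int32, int64, float32, float64:
			return attribute{Title: title, DataType: dt, Value: fmt.Sprint(value)}, nil
		}
	}
	return attribute{}, errInvalidVal
}

func main() {
	fmt.Println(newAttribute(dataTypeString, "correlationId", "abc-123"))
	fmt.Println(newAttribute(dataTypeNumber, "attempt", "three"))
}
```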

type Consumer

type Consumer interface {
	// Consume polls for new messages and if it finds one, decodes it, sends it to the handler and deletes it
	//
	// A message is not considered dequeued until it has been successfully processed and deleted. There is a 30 second
	// delay between receiving a single message and receiving the same message. This delay can be adjusted in the AWS
	// console and can also be extended during operation. If a message is successfully received 4 times but not deleted,
	// it will be considered unprocessable and sent to the DLQ automatically
	//
	// Consume uses long-polling to check and retrieve messages, if it is unable to make a connection, the aws-SDK will use its
	// advanced retrying mechanism (including exponential backoff), if all of the retries fail, then we will wait 10s before
	// trying again.
	//
	// When a new message is received, it runs in a separate go-routine that will handle the full consuming of the message, error reporting
	// and deleting
	Consume()
	// RegisterHandler registers an event listener and an associated handler. If the event matches, the handler will
	// be run
	RegisterHandler(name string, h Handler, adapters ...Adapter)
	// Message serves as the direct messaging capability within the consumer. A worker can send direct messages to other workers
	Message(ctx context.Context, queue, event string, body interface{})
	// MessageSelf serves as the self messaging capability within the consumer, a worker can send messages to itself for continued
	// processing and resiliency
	MessageSelf(ctx context.Context, event string, body interface{})
}

Consumer provides an interface for receiving messages through AWS SQS and SNS
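The relationship between RegisterHandler and a message's route can be sketched as a lookup table keyed by event name. This is a simplified illustration with local stand-in types; `router`, `dispatch`, `demoRouter`, `dispatchRoute`, and `errNoHandler` are hypothetical, and how gosqs treats a message with no matching handler is not stated in this documentation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local copies of the documented shapes, so the sketch needs no dependency.
type Message interface{ Route() string }
type Handler func(context.Context, Message) error

// errNoHandler is a hypothetical error for this sketch only.
var errNoHandler = errors.New("no handler registered for route")

// event is a minimal Message used only for this demo.
type event string

func (e event) Route() string { return string(e) }

// router maps event names to handlers.
type router struct{ handlers map[string]Handler }

func newRouter() *router { return &router{handlers: map[string]Handler{}} }

// RegisterHandler associates an event name with a handler.
func (r *router) RegisterHandler(name string, h Handler) { r.handlers[name] = h }

// dispatch runs the handler whose name matches the message's route.
func (r *router) dispatch(ctx context.Context, m Message) error {
	h, ok := r.handlers[m.Route()]
	if !ok {
		return errNoHandler
	}
	return h(ctx, m)
}

// demoRouter returns a router with one handler registered.
func demoRouter() *router {
	r := newRouter()
	r.RegisterHandler("post_published", func(context.Context, Message) error { return nil })
	return r
}

// dispatchRoute sends a message with the given route through r.
func dispatchRoute(r *router, route string) error {
	return r.dispatch(context.Background(), event(route))
}

func main() {
	r := demoRouter()
	fmt.Println(dispatchRoute(r, "post_published"))
	fmt.Println(dispatchRoute(r, "post_archived"))
}
```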

func NewConsumer

func NewConsumer(c Config, queueName string) (Consumer, error)

NewConsumer creates a new SQS instance and provides a configured consumer interface for receiving and sending messages

type Handler

type Handler func(context.Context, Message) error

Handler provides a standardized handler method, this is the required function composition for event handlers

type Logger

type Logger interface {
	Println(v ...interface{})
}

Logger provides a simple interface to implement your own logging platform or use the default

type Message

type Message interface {
	// Route returns the event name that is used for routing within a worker, e.g. post_published
	Route() string
	// Decode will unmarshal the message into a supplied output using json
	Decode(out interface{}) error
	// DecodeModified is used for decoding the modification message, it will populate the body with the actual message and a
	// map[string]interface{} to view original values from that message
	DecodeModified(out interface{}, changes interface{}) error
	// Attribute will return the custom attribute that was sent with the request.
	Attribute(key string) string
}

Message serves as the message interface for handling the message
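Inside a handler, Decode is typically called with a pointer to the struct the event carries. The sketch below imitates that with a local type backed by encoding/json; `rawMessage`, `post`, and `decodePost` are hypothetical, and the real message envelope used by gosqs may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rawMessage is a local stand-in for a received message.
type rawMessage struct {
	route string
	body  []byte
}

// Route returns the event name used for routing.
func (m rawMessage) Route() string { return m.route }

// Decode unmarshals the JSON body into out.
func (m rawMessage) Decode(out interface{}) error { return json.Unmarshal(m.body, out) }

// post is a hypothetical payload type.
type post struct {
	ID    int    `json:"id"`
	Title string `json:"title"`
}

// decodePost shows what a handler body might do with a message.
func decodePost(body string) (post, error) {
	var p post
	err := rawMessage{route: "post_published", body: []byte(body)}.Decode(&p)
	return p, err
}

func main() {
	p, err := decodePost(`{"id": 7, "title": "hello"}`)
	fmt.Println(p.ID, p.Title, err)
}
```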

type Notifier

type Notifier interface {
	ModelName() string
}

Notifier used for broadcasting messages

type Publisher

type Publisher interface {
	// Create sends a message using a notifier, the modelname will be prepended to the static event, e.g post_created
	Create(n Notifier)
	// Delete sends a message using a notifier, the modelname will be prepended to the static event, e.g post_deleted
	Delete(n Notifier)
	// Update sends a message using a notifier, the modelname will be prepended to the static event, e.g post_updated
	Update(n Notifier)
	// Modify sends a message using a notifier, as a map of changes. The modelname will be prepended to the static event, e.g post_modified
	//
	// a special decoder will need to be used to process these events
	Modify(n Notifier, changes interface{})
	// Dispatch sends a message using a notifier, the modelname will be prepended to the provided event, e.g post_published
	Dispatch(n Notifier, event string)
	// Message sends a direct message to an individual queue, the queueName(receiver) must be provided. The event will be sent
	// as is, no prepending will take place. No other queues will receive this message.
	Message(queue, message string, body interface{})
}

Publisher provides an interface for sending messages through AWS SQS and SNS
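The event names in the comments above follow a modelname-plus-event pattern. A minimal sketch of that naming rule, assuming an underscore separator as suggested by examples such as post_created; `eventName` and `post` are hypothetical and only `Notifier` mirrors the documented interface.

```go
package main

import "fmt"

// Notifier mirrors the documented interface.
type Notifier interface {
	ModelName() string
}

// post is a hypothetical model that can be broadcast.
type post struct{ ID int }

func (post) ModelName() string { return "post" }

// eventName prepends the model name to the event, e.g. post_created.
func eventName(n Notifier, event string) string {
	return n.ModelName() + "_" + event
}

func main() {
	p := post{ID: 1}
	for _, e := range []string{"created", "updated", "deleted", "modified", "published"} {
		fmt.Println(eventName(p, e))
	}
}
```

A consumer would then register handlers under these same names, so the model name returned by ModelName effectively becomes part of the routing contract between publisher and consumer.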

func Dispatcher

func Dispatcher(ctx context.Context) (Publisher, error)

Dispatcher retrieves the sqs dispatcher from the context for sending messages

func MustDispatcher

func MustDispatcher(ctx context.Context) Publisher

MustDispatcher retrieves the sqs dispatcher from the context for sending messages, or panics if the Dispatcher does not exist in the context
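WithDispatcher, Dispatcher, and MustDispatcher together follow the usual pattern of carrying a dependency in a context.Context. The sketch below reproduces that pattern with lowercase local stand-ins; the context key, the `recorder` type, the queue name, and the choice of error returned for a missing publisher are all assumptions rather than the library's implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// publisher is a reduced stand-in for the documented Publisher interface.
type publisher interface {
	Message(queue, event string, body interface{})
}

// ctxKey is an unexported key type, so other packages cannot collide with it.
type ctxKey struct{}

var errUndefinedPublisher = errors.New("sqs publisher is undefined")

// withDispatcher stores pub in the context.
func withDispatcher(ctx context.Context, pub publisher) context.Context {
	return context.WithValue(ctx, ctxKey{}, pub)
}

// dispatcher retrieves the publisher or reports that none was set.
func dispatcher(ctx context.Context) (publisher, error) {
	pub, ok := ctx.Value(ctxKey{}).(publisher)
	if !ok {
		return nil, errUndefinedPublisher
	}
	return pub, nil
}

// mustDispatcher panics when the context carries no publisher.
func mustDispatcher(ctx context.Context) publisher {
	pub, err := dispatcher(ctx)
	if err != nil {
		panic(err)
	}
	return pub
}

// recorder is a fake publisher that remembers what was sent.
type recorder struct{ sent []string }

func (r *recorder) Message(queue, event string, _ interface{}) {
	r.sent = append(r.sent, queue+":"+event)
}

// demo sends one message through a context-carried publisher and also
// looks up a publisher in an empty context.
func demo() (sent []string, missing error) {
	rec := &recorder{}
	ctx := withDispatcher(context.Background(), rec)
	mustDispatcher(ctx).Message("dev-post-worker", "post_published", nil)
	_, missing = dispatcher(context.Background())
	return rec.sent, missing
}

func main() {
	fmt.Println(demo())
}
```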

func NewPublisher

func NewPublisher(c Config) (Publisher, error)

NewPublisher creates a new SQS/SNS publisher instance

type SQSError

type SQSError struct {
	Err string `json:"err"`
	// contains filtered or unexported fields
}

SQSError defines the error handler for the gosqs package. SQSError satisfies the error interface and can be used safely with other error handlers

func (*SQSError) Context

func (e *SQSError) Context(err error) *SQSError

Context is used for creating a new instance of the error with the contextual error attached

func (*SQSError) Error

func (e *SQSError) Error() string

Error is used for implementing the error interface, and for creating a proper error string
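The Context method suggests a pattern where package-level errors stay immutable and each failure gets its own copy with the underlying cause attached. A minimal sketch of that idea, assuming a "message: cause" string format; `sqsError`, its unexported field, and the formatting are hypothetical, since the real type hides its fields.

```go
package main

import (
	"errors"
	"fmt"
)

// sqsError is a local stand-in for gosqs.SQSError.
type sqsError struct {
	Err        string `json:"err"`
	contextErr error
}

// Error implements the error interface, appending the cause when present.
func (e *sqsError) Error() string {
	if e.contextErr == nil {
		return e.Err
	}
	return e.Err + ": " + e.contextErr.Error()
}

// Context returns a new error value carrying err as its cause, leaving the
// receiver unchanged so shared package-level errors are never mutated.
func (e *sqsError) Context(err error) *sqsError {
	return &sqsError{Err: e.Err, contextErr: err}
}

var errGetMessage = &sqsError{Err: "unable to retrieve message"}

func main() {
	var err error = errGetMessage.Context(errors.New("connection reset"))
	fmt.Println(err)
	fmt.Println(errGetMessage) // the package-level value is left untouched
}
```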

type SessionProviderFunc

type SessionProviderFunc func(c Config) (*session.Session, error)

SessionProviderFunc can be used to add custom AWS session setup to the gosqs.Config. Callers simply need to implement this function type and set it as Config.SessionProvider. If Config.SessionProvider is not set (is nil), a default provider based on AWS Key/Secret will be used.
