sqspoller

package module
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2020 License: MIT Imports: 7 Imported by: 0

README

kinluek

SQS-Poller

SQS-Poller is a simple queue polling framework, designed specifically to work with AWS SQS.

Installation

  1. Install sqspoller:
$ go get -u github.com/kinluek/sqspoller
  1. Import code:
import "github.com/kinluek/sqspoller"

Features

  • Timeouts
  • Polling Intervals
  • Graceful Shutdowns
  • Middleware
  • Simple Message Delete Methods

Quick Start

// example.go
package main

import (
	"context"
	"fmt"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/kinluek/sqspoller"
	"log"
	"time"
)

func main() {

	// create SQS client.
	sess := session.Must(session.NewSession())
	sqsClient := sqs.New(sess)

	// use client to create default Poller instance.
	poller := sqspoller.Default(sqsClient)

	// supply polling parameters.
	poller.ReceiveMessageParams(&sqs.ReceiveMessageInput{
		MaxNumberOfMessages: aws.Int64(1),
		QueueUrl:            aws.String("https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"),
	})

	// configure poll interval and handler timeout
	poller.SetPollInterval(30 * time.Second)
	poller.SetHandlerTimeout(120 * time.Second)

	// supply handler to handle new messages
	poller.Handle(func(ctx context.Context, client *sqs.SQS, msgOutput *sqspoller.MessageOutput, err error) error {

		// check errors returned from polling the queue.
		if err != nil {
			return err
		}
		msg := msgOutput.Messages[0]

		// do work on message
		fmt.Println("GOT MESSAGE: ", msg)

		// delete message from queue
		if _, err := msg.Delete(); err != nil {
			return err
		}
		return nil
	})

	// Run poller.
	if err := poller.Run(); err != nil {
		log.Fatal(err)
	}
}

Using Middleware

func main() {
	poller := sqspoller.New(sqsClient)

	// IgnoreEmptyResponses stops empty message outputs from reaching the core handler
	// and therefore the user can guarantee that there will be at least one message in
	// the message output.
	poller.Use(sqspoller.IgnoreEmptyResponses())

	// Tracking adds tracking values to the context object which can be retrieved using
	// sqspoller.CtxKey.
	poller.Use(sqspoller.Tracking())

	// supply polling parameters.
	poller.ReceiveMessageParams(&sqs.ReceiveMessageInput{
		MaxNumberOfMessages: aws.Int64(1),
		QueueUrl:            aws.String("https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"),
	})

	// supply handler to handle new messages
	poller.Handle(func(ctx context.Context, client *sqs.SQS, msgOutput *sqspoller.MessageOutput, err error) error {
		// check errors returned from polling the queue.
		if err != nil {
			return err
		}
		msg := msgOutput.Messages[0]

		// get tracking values provided by Tracking middleware.
		v, ok := ctx.Value(sqspoller.CtxKey).(*sqspoller.CtxTackingValue)
		if !ok {
			return errors.New("tracking middleware should have provided traced ID and receive time")
		}

		fmt.Println(v.TraceID, v.Now)

		// delete message from queue
		if _, err := msg.Delete(); err != nil {
			return err
		}
		return nil
	})
	
	if err := poller.Run(); err != nil {
		log.Fatal(err)
	}
}

Shutdown

When shutting down the poller, there are three different modes of shutdown to choose from.

ShutdownNow
 poller.ShutdownNow()

The ShutdownNow method cancels the context object immediately and exits the Run() function. It does not wait for any jobs to finish handling before exiting.

ShutdownGracefully
 poller.ShutdownGracefully()

The ShutdownGracefully method waits for the handler to finish handling the current message before cancelling the context object and exiting the Run() function. If the handler is blocked then ShutdownGracefully will not exit.

ShutdownAfter
 poller.ShutdownAfter(30*time.Second)

The ShutdownAfter method attempts to shutdown gracefully within the given time, if the handler cannot complete it's current job within the given time, then the context object is cancelled at that time allowing the Run() function to exit.

If the timeout happens before the poller can shutdown gracefully then ShutdownAfter returns error, ErrShutdownGraceful.

func main() {
	poller := sqspoller.Default(sqsClient)

	poller.ReceiveMessageParams(&sqs.ReceiveMessageInput{
		MaxNumberOfMessages: aws.Int64(1),
		QueueUrl:            aws.String("https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"),
	})

	poller.Handle(Handler)

	// run poller in a separate goroutine and wait for errors on channel
	pollerErrors := make(chan error, 1)
	go func() {
		pollerErrors <- poller.Run()
	}()

	// listen for shutdown signals
	shutdown := make(chan os.Signal, 1)
	signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)

	select {
	case err := <-pollerErrors:
		log.Fatal(err)
	case <-shutdown:
		if err := poller.ShutdownAfter(30 * time.Second); err != nil {
			log.Fatal(err)
		}
	}
}

Testing

Tests in the sqspoller_test.go file require that docker is installed and running on your machine as the tests spin up local SQS containers to test against.

Documentation

Overview

package sqspoller is a simple queue polling framework, designed specifically to work with AWS SQS.

Index

Constants

View Source
const CtxKey ctxKey = 1

CtxKey should be used to access the values on the context object of type *CtxTackingValue.

This can only be used if the Tracking outerMiddleware has been used. The Poller returned by Default() comes with this outerMiddleware installed.

Variables

View Source
var (
	ErrNoHandler              = errors.New("ErrNoHandler: no handler set on poller instance")
	ErrNoReceiveMessageParams = errors.New("ErrNoReceiveMessageParams: no ReceiveMessage parameters have been set")
	ErrHandlerTimeout         = errors.New("ErrHandlerTimeout: handler took to long to process message")
	ErrShutdownNow            = errors.New("ErrShutdownNow: poller was suddenly shutdown")
	ErrShutdownGraceful       = errors.New("ErrShutdownGraceful: poller could not shutdown gracefully in time")
	ErrAlreadyShuttingDown    = errors.New("ErrAlreadyShuttingDown: poller is already in the process of shutting down")
	ErrAlreadyRunning         = errors.New("ErrAlreadyShuttingDown: poller is already running")
	ErrIntegrityIssue         = errors.New("ErrIntegrityIssue: unknown integrity issue")
)

Functions

This section is empty.

Types

type CtxTackingValue added in v0.2.0

type CtxTackingValue struct {
	TraceID string
	Now     time.Time
}

CtxTackingValue represents the values stored on the context object about the message response which is passed down through the handler function and outerMiddleware.

This can only be used if the Tracking outerMiddleware has been used. The Poller returned by Default() comes with this outerMiddleware installed.

type Handler

type Handler func(ctx context.Context, client *sqs.SQS, msgOutput *MessageOutput, err error) error

Handler is a function which handles the incoming SQS message.

When making Handlers to be used by the Poller, make sure the error value is checked first, before any business logic code, unless you have created an error checking outerMiddleware that wraps the core Handler.

If the error is non-nil, it will be of type *awserr.Error which is returned from a failed receive message request from SQS.

The sqs Client used to instantiate the poller will also be made available to allow the user to perform standard sqs operations.

type Message

type Message struct {
	*sqs.Message
	// contains filtered or unexported fields
}

Message is an individual message, contained within a MessageOutput, it provides methods to remove itself from the SQS queue.

func (*Message) Delete

func (m *Message) Delete() (*sqs.DeleteMessageOutput, error)

Delete removes the message from the queue, permanently.

type MessageOutput

type MessageOutput struct {
	*sqs.ReceiveMessageOutput
	Messages []*Message
	// contains filtered or unexported fields
}

MessageOutput is contains the SQS ReceiveMessageOutput and is passed down to the Handler when the Poller is running.

type Middleware

type Middleware func(Handler) Handler

Middleware is a function which that wraps a Handler to add functionality before or after the Handler code.

func HandlerTimeout added in v0.3.0

func HandlerTimeout(t time.Duration) Middleware

HandlerTimeout takes a timeout duration and returns ErrHandlerTimeout if the handler cannot process the message within that time. The user can then use other outerMiddleware to check for ErrHandlerTimeout and decide whether to exit or move onto the next poll request.

func IgnoreEmptyResponses

func IgnoreEmptyResponses() Middleware

IgnoreEmptyResponses stops the data from being passed down to the inner handler, if there is no message to be handled.

func Tracking added in v0.2.0

func Tracking() Middleware

Tracking adds tracking information to the context object for each message output received from the queue. The information can be accessed on the context object by using the CtxKey constant and returns a *CtxTackingValue object, containing a traceID and receive time.

type Poller

type Poller struct {

	// Time to wait for handler to process message, if handler function
	// takes longer than this to return, then the program is exited.
	HandlerTimeout time.Duration

	// Time interval between each poll request. After a poll request
	// has been made and response has been handled, the poller will
	// wait for this amount of time before making the next call.
	PollInterval time.Duration

	// Holds the time of the last poll request that was made. This can
	// be checked periodically, to confirm the Poller is running as expected.
	LastPollTime time.Time
	// contains filtered or unexported fields
}

Poller is an instance of the polling framework, it contains the SQS client and provides a simple API for polling an SQS queue.

func Default

func Default(sqsSvc *sqs.SQS) *Poller

Default creates a new instance of the SQS Poller from an instance of sqs.SQS. It also comes set up with the recommend outerMiddleware plugged in.

func New

func New(sqsSvc *sqs.SQS) *Poller

New creates a new instance of the SQS Poller from an instance of sqs.SQS.

func (*Poller) Handle

func (p *Poller) Handle(handler Handler, middleware ...Middleware)

Handle attaches a Handler to the Poller instance, if a Handler already exists on the Poller instance, it will be replaced. The Middleware supplied to Handle will be applied first before any global middleware which are set by Use().

func (*Poller) ReceiveMessageParams added in v0.3.0

func (p *Poller) ReceiveMessageParams(input *sqs.ReceiveMessageInput, opts ...request.Option)

ReceiveMessageParams accepts the same parameters as the SQS ReceiveMessage method. It configures how the poller receives new messages, the parameters must be set before the Poller is run.

func (*Poller) Run

func (p *Poller) Run() error

Run starts the poller, the poller will continuously poll SQS until an error is returned, or explicitly told to shutdown.

func (*Poller) SetHandlerTimeout added in v0.2.0

func (p *Poller) SetHandlerTimeout(t time.Duration)

SetHandlerTimeout lets the user set the timeout for handling a message, if the handler function cannot finish execution within this time frame, the Handler will return ErrHandlerTimeout. The error can be caught and handled by custom middleware, so the user can choose to move onto the next poll request if they so wish.

func (*Poller) SetPollInterval added in v0.2.0

func (p *Poller) SetPollInterval(t time.Duration)

SetPollInterval lets the user set the time interval between poll requests.

func (*Poller) ShutdownAfter

func (p *Poller) ShutdownAfter(t time.Duration) error

ShutdownAfter will attempt to shutdown gracefully, if graceful shutdown cannot be achieved within the given time frame, the Poller will exit, potentially leaking unhandled resources.

func (*Poller) ShutdownGracefully

func (p *Poller) ShutdownGracefully() error

ShutdownGracefully gracefully shuts down the poller.

func (*Poller) ShutdownNow

func (p *Poller) ShutdownNow() error

ShutdownNow shuts down the Poller instantly, potentially leaking unhandled resources.

func (*Poller) Use

func (p *Poller) Use(middleware ...Middleware)

Use attaches global outerMiddleware to the Poller instance which will wrap any Handler and Handler specific outerMiddleware.

Directories

Path Synopsis
example
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL