sqsworker

package
v2.0.5
Published: Apr 13, 2021 License: LGPL-3.0 Imports: 8 Imported by: 0

README

sqs-worker

Forked from: https://github.com/Stashchenko/sqs-worker

A worker service that reads job messages from an SQS queue and processes them concurrently.

Required config fields:

  • QueueURL - The URL of the SQS queue to read from.

Optionally, the following config variables can be provided:

  • MaxNumberOfMessage - The maximum number of messages returned by a single SQS ReceiveMessage call. Change it to control how many messages are read from SQS at once (SQS accepts values from 1 to 10). By default, 10 messages are read.
  • WaitTimeSecond - The duration (in seconds) for which the call waits for a message to arrive in the queue before returning. If a message is available, the call returns sooner than WaitTimeSeconds.
  • VisibilityTimeout - The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request.
  • WorkerSize - The maximum number of parallel workers.
  • Logger - Logs to stdout by default. Any logger with a logrus-style interface (Debug, Info, Error) is supported.
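
As a rough illustration of the optional fields, the sketch below fills unset values with the defaults documented on the Config type (10 messages, 20 s wait time, 20 s visibility timeout, 1 worker). The Config struct here is a local stand-in that mirrors the documented fields, and withDefaults is a hypothetical helper. How the package itself applies defaults is not shown in this README.

```go
package main

import "fmt"

// Config is a local stand-in mirroring the documented fields.
// It is NOT imported from the package; Logger is omitted for brevity.
type Config struct {
	MaxNumberOfMessage int64
	QueueURL           string
	WaitTimeSecond     int64
	VisibilityTimeout  int64
	WorkerSize         int64
}

// withDefaults is a hypothetical helper: it replaces zero-valued optional
// fields with the documented defaults and leaves explicit values untouched.
func withDefaults(c Config) Config {
	if c.MaxNumberOfMessage == 0 {
		c.MaxNumberOfMessage = 10
	}
	if c.WaitTimeSecond == 0 {
		c.WaitTimeSecond = 20
	}
	if c.VisibilityTimeout == 0 {
		c.VisibilityTimeout = 20
	}
	if c.WorkerSize == 0 {
		c.WorkerSize = 1
	}
	return c
}

func main() {
	c := withDefaults(Config{
		QueueURL:   "http://localhost:4566/queue/task-queue",
		WorkerSize: 3, // explicit value, kept as-is
	})
	fmt.Printf("%+v\n", c)
}
```

Only QueueURL and WorkerSize are set by the caller here; the remaining fields fall back to their defaults.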

Usage

See example/main.go

Run the SQS service (localstack) from Docker Hub:
docker run --rm \
 	-p 4566:4566 \
 	-e SERVICES=sqs \
 	--name sqs \
 	localstack/localstack
Create the SQS Queue:
aws --endpoint http://localhost:4566 \
	sqs create-queue \
	--queue-name task-queue \
	--attributes '{"ContentBasedDeduplication":"true"}'
Check the SQS Queue:
aws --endpoint http://localhost:4566 sqs list-queues
Add messages to the SQS Queue:
aws --endpoint http://localhost:4566 \
	sqs send-message \
	--queue-url http://localhost:4566/queue/task-queue \
	--message-body '{"msg": "hello"}'

Alternatively, run sh ./gen_messages.sh to generate 100 messages and send them to the queue.

Example of graceful shutdown
    2020/05/20 15:02:36 [INFO] queue: job Message queue starting
    2020/05/20 15:02:36 [INFO] worker 0: starting
    2020/05/20 15:02:36 [INFO] worker 2: starting
    2020/05/20 15:02:36 [INFO] worker 1: starting
    ...
    2020/05/20 15:03:13 [DEBUG] worker 0: getting message from queue: 7a29e48e-cf9e-4e6b-a9aa-f31e6600f5e0
    {'msg': 'hello46'}
    2020/05/20 15:03:13 [DEBUG] worker 0: deleted message from queue: 7a29e48e-cf9e-4e6b-a9aa-f31e6600f5e0
    2020/05/20 15:03:13 [DEBUG] worker 0: processed job in: 9.999398ms
    2020/05/20 15:03:14 [DEBUG] worker 2: getting message from queue: 16d0df57-4a77-4a88-b83a-8f5eee2c4312
    {'msg': 'hello47'}
    2020/05/20 15:03:14 [DEBUG] worker 2: deleted message from queue: 16d0df57-4a77-4a88-b83a-8f5eee2c4312
    2020/05/20 15:03:14 [DEBUG] worker 2: processed job in: 12.541419ms

    #Stop via ^C

    Stopping sqs workers gracefully...
    2020/05/20 15:03:14 [DEBUG] queue: Stopping polling because a context kill signal was sent
    2020/05/20 15:03:14 [INFO] queue: job Message queue quitting.
    2020/05/20 15:03:14 [INFO] worker 0: quitting.
    2020/05/20 15:03:14 [INFO] worker 2: quitting.
    2020/05/20 15:03:14 [DEBUG] worker 1: getting message from queue: 5b1a52b0-34b6-4b63-9eb5-96d70db355c0
    {'msg': 'hello48'}
    2020/05/20 15:03:15 [DEBUG] worker 1: deleted message from queue: 5b1a52b0-34b6-4b63-9eb5-96d70db355c0
    2020/05/20 15:03:15 [DEBUG] worker 1: processed job in: 11.258625ms
    2020/05/20 15:03:15 [INFO] worker 1: quitting.

    Process finished with exit code 0

Documentation

Types

type Config

type Config struct {
	// Maximum number of messages fetched per receive call. Default: 10.
	MaxNumberOfMessage int64

	// SQS queue URL.
	QueueURL string

	// Wait time in seconds. Default: 20.
	// The duration (in seconds) for which the call waits for a message to arrive in the queue before returning.
	// If a message is available, the call returns sooner than WaitTimeSeconds.
	// If no messages are available and the wait time expires, the call returns successfully with an empty list of messages.
	// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html
	WaitTimeSecond int64

	// Message visibility timeout in seconds. Default: 20.
	// The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request.
	// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html
	VisibilityTimeout int64

	// Number of workers. Default: 1.
	WorkerSize int64

	// Logger
	Logger Logger
}

Config holds the worker configuration.

type Handler

type Handler interface {
	HandleMessage(msg *sqs.Message) error
}

Handler interface

type HandlerFunc

type HandlerFunc func(msg *sqs.Message) error

HandlerFunc is used to define the Handler that is run for each message.

func (HandlerFunc) HandleMessage

func (f HandlerFunc) HandleMessage(msg *sqs.Message) error

HandleMessage wraps a function for handling sqs messages
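
The Handler/HandlerFunc pair follows the same adapter idea as http.HandlerFunc in the standard library: an ordinary function is converted to a type that satisfies the interface. The sketch below shows the pattern with local copies of the types. Message is a stand-in for sqs.Message from the AWS SDK (only the Body field is mirrored), the method body `return f(msg)` is an assumption about the implementation, and printBody is a hypothetical handler.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a local stand-in for sqs.Message so that the sketch
// compiles without external dependencies.
type Message struct {
	Body *string
}

// Handler and HandlerFunc are local copies of the documented types,
// rewritten against the stand-in Message.
type Handler interface {
	HandleMessage(msg *Message) error
}

type HandlerFunc func(msg *Message) error

// HandleMessage calls f(msg). Assumed implementation, not taken from the source.
func (f HandlerFunc) HandleMessage(msg *Message) error { return f(msg) }

// printBody is a hypothetical handler that prints the message body.
func printBody(msg *Message) error {
	if msg == nil || msg.Body == nil {
		return errors.New("empty message")
	}
	fmt.Println(*msg.Body)
	return nil
}

func main() {
	// The conversion HandlerFunc(printBody) turns the function into a Handler.
	var h Handler = HandlerFunc(printBody)

	body := `{"msg": "hello"}`
	if err := h.HandleMessage(&Message{Body: &body}); err != nil {
		fmt.Println("error:", err)
	}
}
```

With the real package, the same conversion would presumably be what gets passed to Run.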

type Logger

type Logger interface {
	Debug(args ...interface{})
	Info(args ...interface{})
	Error(args ...interface{})
}

Logger interface

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

A Worker is an individual processor of jobs from the job channel.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

A WorkerPool provides a collection of workers, and access to their lifecycle.

func NewWorkerPool

func NewWorkerPool(ctx context.Context, conf *Config, sqsClient sqsiface.SQSAPI) *WorkerPool

NewWorkerPool creates a new instance of the worker pool and creates all the workers in the pool. The workers are spun off in their own goroutines, and the WorkerPool's wait group is used to know when all workers have completed their work and exited.

func (*WorkerPool) Run

func (w *WorkerPool) Run(handler Handler)

func (*WorkerPool) WaitForWorkersDone

func (w *WorkerPool) WaitForWorkersDone()

WaitForWorkersDone waits for all workers to complete their work and exit.
