msgqueue

package module
v1.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 13, 2017 License: BSD-2-Clause Imports: 12 Imported by: 0

README

SQS & IronMQ clients with rate limiting and call once Build Status

Installation

go get -u github.com/go-msgqueue/msgqueue

Features

  • SQS, IronMQ, and in-memory clients.
  • Queue processor can be run on separate server.
  • Rate limiting.
  • Call once.
  • Automatic retries with exponential backoffs.
  • Automatic pausing when all messages in queue fail.
  • Fallback handler for processing failed messages.
  • Processed messages are deleted in batches.
  • Statistics.

Design overview

go-msgqueue is a thin wrapper for SQS and IronMQ clients that uses Redis to implement rate limiting and call once semantic.

go-msgqueue consists of following packages:

  • memqueue - in memory queue that can be used for local unit testing.
  • azsqs - Amazon SQS client.
  • ironmq - IronMQ client.
  • processor - queue processor that works with memqueue, azsqs, and ironmq.

rate limiting is implemented in the processor package using go-redis rate. Call once is implemented in the clients by checking if key that consists of message name exists in Redis database.

API overview

import "github.com/go-msgqueue/msgqueue"
import "github.com/go-redis/redis"
import timerate "golang.org/x/time/rate"

// Create in-memory queue that prints greetings.
q := memqueue.NewQueue(&msgqueue.Options{
    // Handler is automatically retried on error.
    Handler: func(name string) error {
        fmt.Println("Hello", name)
        return nil
    },

    RateLimit: timerate.Every(time.Second),

    // Redis is only needed for rate limiting and call once.
    Redis: redis.NewClient(&redis.Options{
        Addr: ":6379",
    }),
})

// Invoke handler with arguments.
q.Call("World")

// Same using Message API.
q.Add(msgqueue.NewMessage("World"))

// Say "Hello World" with 1 hour delay.
msg := msgqueue.NewMessage("World")
msg.Delay = time.Hour
q.Add(msg)

// Say "Hello World" only once.
for i := 0; i < 100; i++ {
    msg := msgqueue.NewMessage("hello")
    msg.Name = "hello-world"
    q.Add(msg)
}

// Say "Hello World" only once with 1 hour delay.
for i := 0; i < 100; i++ {
    msg := msgqueue.NewMessage("hello")
    msg.Name = "hello-world"
    msg.Delay = time.Hour
    q.Add(msg)
}

// Same using CallOnce.
for i := 0; i < 100; i++ {
    q.CallOnce(time.Hour, "hello")
}

// Say "Hello World" for Europe region only once with 1 hour delay.
for i := 0; i < 100; i++ {
    msg := msgqueue.NewMessage("hello")
    msg.SetDelayName(delay, "europe") // set delay & autogenerate message name
    q.Add(msg)
}

SQS & IronMQ & in-memory queues

SQS, IronMQ, and memqueue share the same API and can be used interchangeably.

SQS

azsqs package uses Amazon Simple Queue Service as queue backend.

import "github.com/go-msgqueue/msgqueue"
import "github.com/go-msgqueue/msgqueue/azsqs"
import "github.com/aws/aws-sdk-go/service/sqs"

awsAccountId := "123456789"
q := azsqs.NewQueue(awsSQS(), awsAccountId, &msgqueue.Options{
    Name: "sqs-queue-name",
    Handler: func(name string) error {
        fmt.Println("Hello", name)
        return nil
    },
})

// Add message.
q.Call("World")

// Start processing queue.
p := q.Processor()
p.Start()

// Stop processing.
p.Stop()
IronMQ

ironmq package uses IronMQ as queue backend.

import "github.com/go-msgqueue/msgqueue"
import "github.com/go-msgqueue/msgqueue/ironmq"
import "github.com/iron-io/iron_go3/mq"

q := ironmq.NewQueue(mq.New("ironmq-queue-name"), &msgqueue.Options{
    Handler: func(name string) error {
        fmt.Println("Hello", name)
        return nil
    },
})

// Add message.
q.Call("World")

// Start processing queue.
p := q.Processor()
p.Start()

// Stop processing.
p.Stop()
In-memory

memqueue is in-memory queue backend implementation primarily useful for local development / unit testing. Unlike SQS and IronMQ it has running queue processor by default.

import "github.com/go-msgqueue/msgqueue"

q := memqueue.NewQueue(&msgqueue.Options{
    Handler: func(name string) error {
        fmt.Println("Hello", name)
        return nil
    },
})

// Stop processor if you don't need it.
p := q.Processor()
p.Stop()

// Process one message.
err := p.ProcessOne()

// Process all buffered messages.
err := p.ProcessAll()

Custom message delay

If error returned by handler implements Delay() time.Duration that delay is used to postpone message processing.

type RateLimitError string

func (e RateLimitError) Error() string {
    return string(e)
}

func (RateLimitError) Delay() time.Duration {
    return time.Hour
}

func handler() error {
    return RateLimitError("calm down")
}

q := memqueue.NewQueue(&msgqueue.Options{
    Handler: handler,
})

Documentation

Overview

Package msgqueue implements a SQS & IronMQ client with rate limiting and call once.

Example (CustomRateLimit)
package main

import (
	"fmt"
	"time"

	"github.com/go-msgqueue/msgqueue"
	"github.com/go-msgqueue/msgqueue/memqueue"
)

type RateLimitError string

func (e RateLimitError) Error() string {
	return string(e)
}

func (RateLimitError) Delay() time.Duration {
	return 3 * time.Second
}

func main() {
	start := time.Now()
	q := memqueue.NewQueue(&msgqueue.Options{
		Handler: func() error {
			fmt.Println("retried in", timeSince(start))
			return RateLimitError("calm down")
		},
		RetryLimit: 2,
		MinBackoff: time.Millisecond,
	})
	defer q.Close()
	q.Processor().Stop()

	q.Call()
	q.Processor().ProcessAll()
}
Output:

retried in 0s
retried in 3s
Example (MaxWorkers)
package main

import (
	"fmt"
	"math"
	"time"

	"github.com/go-msgqueue/msgqueue"
	"github.com/go-msgqueue/msgqueue/memqueue"

	"github.com/go-redis/redis"
)

func redisRing() *redis.Ring {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs:    map[string]string{"0": ":6379"},
		PoolSize: 100,
	})
	err := ring.FlushDb().Err()
	if err != nil {
		panic(err)
	}
	return ring
}

func timeSince(start time.Time) time.Duration {
	secs := float64(time.Since(start)) / float64(time.Second)
	return time.Duration(math.Floor(secs)) * time.Second
}

func main() {
	start := time.Now()
	q := memqueue.NewQueue(&msgqueue.Options{
		Handler: func() {
			fmt.Println(timeSince(start))
			time.Sleep(time.Second)
		},
		Redis:      redisRing(),
		MaxWorkers: 1,
	})

	for i := 0; i < 3; i++ {
		q.Call()
	}

	// Close queue to make sure all messages are processed.
	_ = q.Close()

}
Output:

0s
1s
2s
Example (MessageDelay)
package main

import (
	"fmt"
	"math"
	"time"

	"github.com/go-msgqueue/msgqueue"
	"github.com/go-msgqueue/msgqueue/memqueue"
)

func timeSince(start time.Time) time.Duration {
	secs := float64(time.Since(start)) / float64(time.Second)
	return time.Duration(math.Floor(secs)) * time.Second
}

func main() {
	start := time.Now()
	q := memqueue.NewQueue(&msgqueue.Options{
		Handler: func() {
			fmt.Println("processed with delay", timeSince(start))
		},
	})
	defer q.Close()
	q.Processor().Stop()

	msg := msgqueue.NewMessage()
	msg.Delay = time.Second
	q.Add(msg)
	q.Processor().ProcessAll()
}
Output:

processed with delay 1s
Example (Once)
package main

import (
	"fmt"
	"time"

	"github.com/go-msgqueue/msgqueue"
	"github.com/go-msgqueue/msgqueue/memqueue"

	"github.com/go-redis/redis"
	timerate "golang.org/x/time/rate"
)

func redisRing() *redis.Ring {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs:    map[string]string{"0": ":6379"},
		PoolSize: 100,
	})
	err := ring.FlushDb().Err()
	if err != nil {
		panic(err)
	}
	return ring
}

func main() {
	q := memqueue.NewQueue(&msgqueue.Options{
		Handler: func(name string) {
			fmt.Println("hello", name)
		},
		Redis:     redisRing(),
		RateLimit: timerate.Every(time.Second),
	})

	for _, name := range []string{"world", "adele"} {
		for i := 0; i < 10; i++ {
			// Call once in a second.
			q.CallOnce(time.Second, name)
		}
	}

	// Close queue to make sure all messages are processed.
	_ = q.Close()

}
Output:

hello world
hello adele
Example (RateLimit)
package main

import (
	"fmt"
	"math"
	"time"

	"github.com/go-msgqueue/msgqueue"
	"github.com/go-msgqueue/msgqueue/memqueue"

	"github.com/go-redis/redis"
	timerate "golang.org/x/time/rate"
)

func redisRing() *redis.Ring {
	ring := redis.NewRing(&redis.RingOptions{
		Addrs:    map[string]string{"0": ":6379"},
		PoolSize: 100,
	})
	err := ring.FlushDb().Err()
	if err != nil {
		panic(err)
	}
	return ring
}

func timeSince(start time.Time) time.Duration {
	secs := float64(time.Since(start)) / float64(time.Second)
	return time.Duration(math.Floor(secs)) * time.Second
}

func main() {
	start := time.Now()
	q := memqueue.NewQueue(&msgqueue.Options{
		Handler: func() {
			fmt.Println(timeSince(start))
		},
		Redis:     redisRing(),
		RateLimit: timerate.Every(time.Second),
	})

	for i := 0; i < 3; i++ {
		q.Call()
	}

	// Close queue to make sure all messages are processed.
	_ = q.Close()

}
Output:

0s
0s
1s
Example (RetryOnError)
package main

import (
	"errors"
	"fmt"
	"math"
	"time"

	"github.com/go-msgqueue/msgqueue"
	"github.com/go-msgqueue/msgqueue/memqueue"
)

func timeSince(start time.Time) time.Duration {
	secs := float64(time.Since(start)) / float64(time.Second)
	return time.Duration(math.Floor(secs)) * time.Second
}

func main() {
	start := time.Now()
	q := memqueue.NewQueue(&msgqueue.Options{
		Handler: func() error {
			fmt.Println("retried in", timeSince(start))
			return errors.New("fake error")
		},
		RetryLimit: 3,
		MinBackoff: time.Second,
	})
	defer q.Close()
	q.Processor().Stop()

	q.Call()
	q.Processor().ProcessAll()
}
Output:

retried in 0s
retried in 1s
retried in 3s

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrDuplicate = errors.New("queue: message with such name already exists")

Functions

This section is empty.

Types

type Handler

type Handler interface {
	HandleMessage(msg *Message) error
}

func NewHandler

func NewHandler(fn interface{}) Handler

type HandlerFunc

type HandlerFunc func(*Message) error

func (HandlerFunc) HandleMessage

func (fn HandlerFunc) HandleMessage(msg *Message) error

type Message

type Message struct {
	// SQS/IronMQ message id.
	Id string

	// Optional name for the message. Messages with the same name
	// are processed only once.
	Name string

	// Delay specifies the duration the queue must wait
	// before executing the message.
	Delay time.Duration

	// Function args passed to the handler.
	Args []interface{}

	// Text representation of the Args.
	Body string

	// SQS/IronMQ reservation id that is used to release/delete the message..
	ReservationId string

	// The number of times the message has been reserved or released.
	ReservedCount int
}

Message is used to create and retrieve messages from a queue.

func NewMessage

func NewMessage(args ...interface{}) *Message

func (*Message) MarshalArgs

func (m *Message) MarshalArgs() (string, error)

func (*Message) SetDelayName

func (m *Message) SetDelayName(delay time.Duration, args ...interface{})

SetDelayName sets delay and generates message name from the args.

func (*Message) String

func (m *Message) String() string

type Options

type Options struct {
	// Queue name.
	Name string
	// Queue group name.
	GroupName string

	// Function called to process a message.
	Handler interface{}
	// Function called to process failed message.
	FallbackHandler interface{}

	// Number of goroutines processing messages.
	WorkerNumber int
	// Global max number of workers which overrides WorkerNumber.
	MaxWorkers int

	// Size of the buffer where reserved messages are stored.
	BufferSize int

	// Time after which the reserved message is returned to the queue.
	ReservationTimeout time.Duration

	// Number of tries/releases after which the message fails permanently
	// and is deleted.
	RetryLimit int

	// Minimum time between retries.
	MinBackoff time.Duration

	// Processing rate limit.
	RateLimit timerate.Limit

	// Redis client that is used for storing metadata.
	Redis Redis

	// Optional storage interface. The default is to use Redis.
	Storage Storage

	// Optional rate limiter interface. The default is to use Redis.
	RateLimiter RateLimiter
	// contains filtered or unexported fields
}

func (*Options) Init

func (opt *Options) Init()

type RateLimiter

type RateLimiter interface {
	AllowRate(name string, limit timerate.Limit) (delay time.Duration, allow bool)
}

type Redis

type Redis interface {
	Del(keys ...string) *redis.IntCmd
	SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
	SAdd(key string, members ...interface{}) *redis.IntCmd
	SMembers(key string) *redis.StringSliceCmd
	Pipelined(func(pipe *redis.Pipeline) error) ([]redis.Cmder, error)
	Eval(script string, keys []string, args ...interface{}) *redis.Cmd
	Publish(channel, message string) *redis.IntCmd
}

type Storage

type Storage interface {
	Exists(key string) bool
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL