msgqueue

package module
v1.3.6
Published: Aug 8, 2017 License: BSD-2-Clause Imports: 18 Imported by: 0

README

Golang task/job queue with in-memory, SQS, and IronMQ backends

Installation

go get -u github.com/go-msgqueue/msgqueue

Features

  • SQS, IronMQ, and in-memory backends.
  • Queue processor can be run on a separate server.
  • Rate limiting.
  • Global limit of workers.
  • Call once.
  • Automatic retries with exponential backoffs.
  • Automatic pausing when all messages in queue fail.
  • Fallback handler for processing failed messages.
  • Message batching. It is used in SQS and IronMQ backends to add/delete messages in batches.
  • Statistics.

Design overview

go-msgqueue is a thin wrapper around the SQS and IronMQ clients that uses Redis to implement rate limiting and call-once semantics.

go-msgqueue consists of the following components:

  • memqueue - in-memory queue that can be used for local unit testing.
  • azsqs - Amazon SQS backend.
  • ironmq - IronMQ backend.
  • Manager - provides common interface for creating new queues.
  • Processor - queue processor that works with memqueue, azsqs, and ironmq.

Rate limiting is implemented in the processor using redis_rate. Call once is implemented in the clients by checking whether the message name already exists in the Redis database.

API overview

import "fmt"
import "time"

import "github.com/go-msgqueue/msgqueue"
import "github.com/go-msgqueue/msgqueue/memqueue"
import "github.com/go-redis/redis"
import "golang.org/x/time/rate"

// Create in-memory queue that prints greetings.
q := memqueue.NewQueue(&msgqueue.Options{
    // Handler is automatically retried on error.
    Handler: func(name string) error {
        fmt.Println("Hello", name)
        return nil
    },

    RateLimit: rate.Every(time.Second),

    // Redis is only needed for rate limiting and call once.
    Redis: redis.NewClient(&redis.Options{
        Addr: ":6379",
    }),
})

// Invoke handler with arguments.
q.Call("World")

// Same using Message API.
q.Add(msgqueue.NewMessage("World"))

// Say "Hello World" with 1 hour delay.
msg := msgqueue.NewMessage("World")
msg.Delay = time.Hour
q.Add(msg)

// Say "Hello World" once.
for i := 0; i < 100; i++ {
    msg := msgqueue.NewMessage("World")
    msg.Name = "hello-world"
    q.Add(msg)
}

// Say "Hello World" once with 1 hour delay.
for i := 0; i < 100; i++ {
    msg := msgqueue.NewMessage("World")
    msg.Name = "hello-world"
    msg.Delay = time.Hour
    q.Add(msg)
}

// Say "Hello World" once in an hour.
for i := 0; i < 100; i++ {
    q.CallOnce(time.Hour, "World")
}

// Say "Hello World" for Europe region once in an hour.
for i := 0; i < 100; i++ {
    msg := msgqueue.NewMessage("World")
    msg.SetDelayName(time.Hour, "europe") // set delay and autogenerate message name
    q.Add(msg)
}

SQS, IronMQ, and in-memory queues

SQS, IronMQ, and memqueue share the same API and can be used interchangeably.

SQS

The azsqs package uses Amazon Simple Queue Service as the queue backend.

import "github.com/go-msgqueue/msgqueue"
import "github.com/go-msgqueue/msgqueue/azsqs"
import "github.com/aws/aws-sdk-go/service/sqs"

// Create queue.
awsAccountId := "123456789"
q := azsqs.NewQueue(awsSQS(), awsAccountId, &msgqueue.Options{
    Name: "sqs-queue-name",
    Handler: func(name string) error {
        fmt.Println("Hello", name)
        return nil
    },
})

// Same using Manager.
man := azsqs.NewManager(awsSQS(), awsAccountId)
q = man.NewQueue(&msgqueue.Options{...})

// Add message.
q.Call("World")

// Start processing queue.
p := q.Processor()
p.Start()

// Stop processing.
p.Stop()

IronMQ

The ironmq package uses IronMQ as the queue backend.

import "github.com/go-msgqueue/msgqueue"
import "github.com/go-msgqueue/msgqueue/ironmq"
import iron_config "github.com/iron-io/iron_go3/config"
import "github.com/iron-io/iron_go3/mq"

// Create queue.
q := ironmq.NewQueue(mq.New("ironmq-queue-name"), &msgqueue.Options{
    Handler: func(name string) error {
        fmt.Println("Hello", name)
        return nil
    },
})

// Same using Manager.
cfg := iron_config.Config("iron_mq")
man := ironmq.NewManager(&cfg)
q = man.NewQueue(&msgqueue.Options{...})

// Add message.
q.Call("World")

// Start processing queue.
p := q.Processor()
p.Start()

// Stop processing.
p.Stop()

In-memory

memqueue is an in-memory queue backend that is primarily useful for local development and unit testing. Unlike SQS and IronMQ, it has a running queue processor by default.

import "github.com/go-msgqueue/msgqueue"
import "github.com/go-msgqueue/msgqueue/memqueue"

// Create queue.
q := memqueue.NewQueue(&msgqueue.Options{
    Handler: func(name string) error {
        fmt.Println("Hello", name)
        return nil
    },
})

// Same using Manager.
man := memqueue.NewManager()
q = man.NewQueue(&msgqueue.Options{...})

// Stop processor if you don't need it.
p := q.Processor()
p.Stop()

// Process one message.
err := p.ProcessOne()

// Process all buffered messages.
err = p.ProcessAll()

Custom message delay

If the error returned by the handler implements Delay() time.Duration, that delay is used to postpone processing of the message.

type RateLimitError string

func (e RateLimitError) Error() string {
    return string(e)
}

func (RateLimitError) Delay() time.Duration {
    return time.Hour
}

func handler() error {
    return RateLimitError("calm down")
}

q := memqueue.NewQueue(&msgqueue.Options{
    Handler: handler,
})

Stats

You can log local queue stats using the following code:

func LogQueueStats(q msgqueue.Queue) {
    p := q.Processor()
    opt := p.Options()

    var old *msgqueue.ProcessorStats
    for range time.Tick(3 * time.Second) {
        st := p.Stats()
        if st == nil {
            break
        }

        if old != nil && st.Processed == old.Processed &&
            st.Fails == old.Fails &&
            st.Retries == old.Retries {
            continue
        }
        old = st

        glog.Infof(
            "%s: buffered=%d/%d in_flight=%d/%d "+
                "processed=%d fails=%d retries=%d "+
                "avg_dur=%s min_dur=%s max_dur=%s",
            q, st.Buffered, opt.BufferSize, st.InFlight, opt.WorkerNumber,
            st.Processed, st.Fails, st.Retries,
            st.AvgDuration, st.MinDuration, st.MaxDuration,
        )
    }
}

go LogQueueStats(myqueue)

This logs output similar to the following:

Memqueue<Name=v1-production-notices-add>: buffered=0/1000 in_flight=3/16 processed=16183872 fails=0 retries=0 avg_dur=44.8ms min_dur=100µs max_dur=5.102s
Memqueue<Name=v1-production-notices-add>: buffered=0/1000 in_flight=8/16 processed=16184022 fails=0 retries=0 avg_dur=42ms min_dur=100µs max_dur=5.102s

Documentation

Overview

Package msgqueue implements a task/job queue with in-memory, SQS, and IronMQ backends.

go-msgqueue is a thin wrapper around the SQS and IronMQ clients that uses Redis to implement rate limiting and call-once semantics.

go-msgqueue consists of the following components:

  • memqueue - in-memory queue that can be used for local unit testing.
  • azsqs - Amazon SQS backend.
  • ironmq - IronMQ backend.
  • Manager - provides common interface for creating new queues.
  • Processor - queue processor that works with memqueue, azsqs, and ironmq.

Rate limiting is implemented in the processor using https://github.com/go-redis/redis_rate. Call once is implemented in the clients by checking whether the message name already exists in the Redis database.

Example (CustomRateLimit)
package main

import (
	"fmt"
	"time"

	"github.com/go-msgqueue/msgqueue"
	"github.com/go-msgqueue/msgqueue/memqueue"
)

type RateLimitError string

func (e RateLimitError) Error() string {
	return string(e)
}

func (RateLimitError) Delay() time.Duration {
	return 3 * time.Second
}

func main() {
	start := time.Now()
	q := memqueue.NewQueue(&msgqueue.Options{
		Handler: func() error {
			fmt.Println("retried in", timeSince(start))
			return RateLimitError("calm down")
		},
		RetryLimit: 2,
		MinBackoff: time.Millisecond,
	})

	q.Call()

	// Wait for all messages to be processed.
	_ = q.Close()

}
Output:

retried in 0s
retried in 3s

Example (MaxWorkers)
start := time.Now()
q := memqueue.NewQueue(&msgqueue.Options{
	Handler: func() {
		fmt.Println(timeSince(start))
		time.Sleep(time.Second)
	},
	Redis:       redisRing(),
	WorkerLimit: 1,
})

for i := 0; i < 3; i++ {
	q.Call()
}

// Wait for all messages to be processed.
_ = q.Close()
Output:

0s
1s
2s

Example (MessageDelay)
package main

import (
	"fmt"
	"math"
	"time"

	"github.com/go-msgqueue/msgqueue"
	"github.com/go-msgqueue/msgqueue/memqueue"
)

func timeSince(start time.Time) time.Duration {
	secs := float64(time.Since(start)) / float64(time.Second)
	return time.Duration(math.Floor(secs)) * time.Second
}

func main() {
	start := time.Now()
	q := memqueue.NewQueue(&msgqueue.Options{
		Handler: func() {
			fmt.Println("processed with delay", timeSince(start))
		},
	})

	msg := msgqueue.NewMessage()
	msg.Delay = time.Second
	q.Add(msg)

	// Wait for all messages to be processed.
	_ = q.Close()

}
Output:

processed with delay 1s

Example (Once)
q := memqueue.NewQueue(&msgqueue.Options{
	Handler: func(name string) {
		fmt.Println("hello", name)
	},
	Redis:     redisRing(),
	RateLimit: rate.Every(time.Second),
})

for i := 0; i < 10; i++ {
	// Call once in a second.
	q.CallOnce(time.Second, "world")
}

// Wait for all messages to be processed.
_ = q.Close()
Output:

hello world

Example (RateLimit)
start := time.Now()
q := memqueue.NewQueue(&msgqueue.Options{
	Handler:   func() {},
	Redis:     redisRing(),
	RateLimit: rate.Every(time.Second),
})

const n = 5
for i := 0; i < n; i++ {
	q.Call()
}

// Wait for all messages to be processed.
_ = q.Close()

fmt.Printf("%d msg/s", timeSinceCeil(start)/time.Second/n)
Output:

1 msg/s

Example (RetryOnError)
package main

import (
	"errors"
	"fmt"
	"math"
	"time"

	"github.com/go-msgqueue/msgqueue"
	"github.com/go-msgqueue/msgqueue/memqueue"
)

func timeSince(start time.Time) time.Duration {
	secs := float64(time.Since(start)) / float64(time.Second)
	return time.Duration(math.Floor(secs)) * time.Second
}

func main() {
	start := time.Now()
	q := memqueue.NewQueue(&msgqueue.Options{
		Handler: func() error {
			fmt.Println("retried in", timeSince(start))
			return errors.New("fake error")
		},
		RetryLimit: 3,
		MinBackoff: time.Second,
	})

	q.Call()

	// Wait for all messages to be processed.
	_ = q.Close()

}
Output:

retried in 0s
retried in 1s
retried in 3s

Variables

var ErrDuplicate = errors.New("queue: message with such name already exists")

Functions

func SetLogger added in v1.3.0

func SetLogger(logger *log.Logger)

Types

type Batcher added in v1.3.0

type Batcher struct {
	// contains filtered or unexported fields
}

Batcher collects messages for later batch processing.

func NewBatcher added in v1.3.0

func NewBatcher(p *Processor, opt *BatcherOptions) *Batcher

func (*Batcher) Add added in v1.3.0

func (b *Batcher) Add(msg *Message) error

func (*Batcher) Close added in v1.3.0

func (b *Batcher) Close() error

func (*Batcher) SetSync added in v1.3.0

func (b *Batcher) SetSync(v bool)

type BatcherOptions added in v1.3.0

type BatcherOptions struct {
	Worker   func([]*Message) error
	Splitter func([]*Message) ([]*Message, []*Message)

	RetryLimit int
	Timeout    time.Duration
}
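
The source does not document how Worker and Splitter interact. One plausible reading, shown here purely as an assumption, is that the splitter carves the next batch off the pending messages and the worker receives that batch. The sketch uses strings instead of *Message, and splitBySize and flush are hypothetical helpers.

```go
package main

import "fmt"

// splitBySize is a hypothetical splitter: it returns at most max items as
// the next batch and everything else as the remainder.
func splitBySize(items []string, max int) (batch, rest []string) {
	if max < 1 {
		max = 1 // guard so that every call makes progress
	}
	if len(items) <= max {
		return items, nil
	}
	return items[:max], items[max:]
}

// flush feeds items to worker one batch at a time and stops at the first
// error returned by the worker.
func flush(items []string, max int, worker func([]string) error) error {
	for len(items) > 0 {
		var batch []string
		batch, items = splitBySize(items, max)
		if err := worker(batch); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	items := []string{"a", "b", "c", "d", "e"}
	_ = flush(items, 2, func(batch []string) error {
		fmt.Println(batch) // [a b], then [c d], then [e]
		return nil
	})
}
```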

type Delayer added in v1.3.0

type Delayer interface {
	Delay() time.Duration
}

type Handler

type Handler interface {
	HandleMessage(msg *Message) error
}

func NewHandler

func NewHandler(fn interface{}) Handler

type HandlerFunc

type HandlerFunc func(*Message) error

func (HandlerFunc) HandleMessage

func (fn HandlerFunc) HandleMessage(msg *Message) error

type Manager added in v1.3.0

type Manager interface {
	NewQueue(*Options) Queue
	Queues() []Queue
}

Manager is an interface that abstracts creation of new queues. It is implemented in subpackages memqueue, azsqs, and ironmq.

type Message

type Message struct {
	// SQS/IronMQ message id.
	Id string

	// Optional name for the message. Messages with the same name
	// are processed only once.
	Name string

	// Delay specifies the duration the queue must wait
	// before executing the message.
	Delay time.Duration

	// Function args passed to the handler.
	Args []interface{}

	// Text representation of the Args.
	Body string

	// SQS/IronMQ reservation id that is used to release/delete the message.
	ReservationId string

	// The number of times the message has been reserved or released.
	ReservedCount int

	Err error
	// contains filtered or unexported fields
}

Message is used to create and retrieve messages from a queue.

func NewMessage

func NewMessage(args ...interface{}) *Message

func (*Message) GetBody added in v1.3.0

func (m *Message) GetBody() (string, error)

func (*Message) SetDelayName

func (m *Message) SetDelayName(delay time.Duration, args ...interface{})

SetDelayName sets delay and generates message name from the args.

func (*Message) String

func (m *Message) String() string

type Options

type Options struct {
	// Queue name.
	Name string
	// Queue group name.
	GroupName string

	// Function called to process a message.
	Handler interface{}
	// Function called to process failed message.
	FallbackHandler interface{}

	// Number of worker goroutines processing messages.
	// Default is 4 * number of CPUs.
	WorkerNumber int
	// Global limit of concurrently running workers. Overrides WorkerNumber.
	WorkerLimit int

	// Size of the buffer where reserved messages are stored.
	// Default is the same as WorkerNumber.
	BufferSize int

	// Number of messages reserved in the queue in 1 request.
	// Default is 10.
	ReservationSize int
	// Time after which the reserved message is returned to the queue.
	ReservationTimeout time.Duration
	// Time that a long polling receive call waits for a message to become
	// available before returning an empty response.
	// Default is 5 seconds.
	WaitTimeout time.Duration

	// Number of tries/releases after which the message fails permanently
	// and is deleted.
	RetryLimit int
	// Minimum backoff time between retries.
	MinBackoff time.Duration
	// Maximum backoff time between retries.
	MaxBackoff time.Duration

	// Number of consecutive failures after which queue processing is paused.
	// Default is 100 failures.
	PauseErrorsThreshold int

	// Processing rate limit.
	RateLimit rate.Limit

	// Redis client that is used for storing metadata.
	Redis Redis

	// Optional storage interface. The default is to use Redis.
	Storage Storage

	// Optional rate limiter interface. The default is to use Redis.
	RateLimiter RateLimiter
	// contains filtered or unexported fields
}

func (*Options) Init

func (opt *Options) Init()

type Processor added in v1.3.0

type Processor struct {
	// contains filtered or unexported fields
}

Processor reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.

func NewProcessor added in v1.3.0

func NewProcessor(q Queue, opt *Options) *Processor

NewProcessor creates a new Processor for the queue using the provided processing options.

func StartProcessor added in v1.3.0

func StartProcessor(q Queue, opt *Options) *Processor

StartProcessor creates a new Processor and starts it.

func (*Processor) Add added in v1.3.0

func (p *Processor) Add(msg *Message) error

func (*Processor) Options added in v1.3.0

func (p *Processor) Options() *Options

func (*Processor) Process added in v1.3.0

func (p *Processor) Process(msg *Message) error

Process is a low-level API that processes a message, bypassing the internal queue.

func (*Processor) ProcessAll added in v1.3.0

func (p *Processor) ProcessAll() error

ProcessAll starts workers to process messages in the queue and then stops them when all messages are processed.

func (*Processor) ProcessOne added in v1.3.0

func (p *Processor) ProcessOne() error

ProcessOne processes at most one message in the queue.

func (*Processor) Purge added in v1.3.0

func (p *Processor) Purge() error

Purge discards messages from the internal queue.

func (*Processor) Put added in v1.3.6

func (p *Processor) Put(msg *Message) error

func (*Processor) Queue added in v1.3.0

func (p *Processor) Queue() Queue

func (*Processor) Start added in v1.3.0

func (p *Processor) Start() error

Start starts processing messages in the queue.

func (*Processor) Stats added in v1.3.0

func (p *Processor) Stats() *ProcessorStats

Stats returns processor stats.

func (*Processor) Stop added in v1.3.0

func (p *Processor) Stop() error

Stop is StopTimeout with a 30-second timeout.

func (*Processor) StopTimeout added in v1.3.0

func (p *Processor) StopTimeout(timeout time.Duration) error

StopTimeout waits up to the timeout duration for workers to finish processing their current messages and then stops the workers.

func (*Processor) String added in v1.3.0

func (p *Processor) String() string

type ProcessorStats added in v1.3.0

type ProcessorStats struct {
	Buffered    uint32
	InFlight    uint32
	Processed   uint32
	Retries     uint32
	Fails       uint32
	AvgDuration time.Duration
	MinDuration time.Duration
	MaxDuration time.Duration
}

type Queue added in v1.3.0

type Queue interface {
	Name() string
	Processor() *Processor
	Add(msg *Message) error
	Call(args ...interface{}) error
	CallOnce(dur time.Duration, args ...interface{}) error
	ReserveN(n int) ([]*Message, error)
	Release(*Message) error
	Delete(msg *Message) error
	Purge() error
	Close() error
	CloseTimeout(timeout time.Duration) error
}

type RateLimiter

type RateLimiter interface {
	AllowRate(name string, limit rate.Limit) (delay time.Duration, allow bool)
}

type Redis

type Redis interface {
	Del(keys ...string) *redis.IntCmd
	SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
	SAdd(key string, members ...interface{}) *redis.IntCmd
	SMembers(key string) *redis.StringSliceCmd
	Pipelined(func(pipe redis.Pipeliner) error) ([]redis.Cmder, error)
	Eval(script string, keys []string, args ...interface{}) *redis.Cmd
	Publish(channel, message string) *redis.IntCmd
}

type Storage

type Storage interface {
	Exists(key string) bool
}
