README ¶
Golang task/job queue with in-memory, SQS, IronMQ backends
Installation
go get -u github.com/go-msgqueue/msgqueue
Features
- SQS, IronMQ, and in-memory backends.
- Queue processor can be run on separate server.
- Automatically scaling number of goroutines used to fetch and process messages.
- Rate limiting.
- Global limit of workers.
- Call once.
- Automatic retries with exponential backoffs.
- Automatic pausing when all messages in queue fail.
- Fallback handler for processing failed messages.
- Message batching. It is used in SQS and IronMQ backends to add/delete messages in batches.
- Message compression using Snappy compressor via
Options.Compress
. - Statistics.
Design overview
go-msgqueue is a thin wrapper for SQS and IronMQ clients that uses Redis to implement rate limiting and call once semantic.
go-msgqueue consists of following components:
- memqueue - in memory queue that can be used for local unit testing.
- azsqs - Amazon SQS backend.
- ironmq - IronMQ backend.
- Manager - provides common interface for creating new queues.
- Processor - queue processor that works with memqueue, azsqs, and ironmq.
rate limiting is implemented in the processor package using redis_rate. Call once is implemented in clients by checking if message name exists in Redis database.
API overview
import "github.com/go-msgqueue/msgqueue"
import "github.com/go-redis/redis"
import "golang.org/x/time/rate"
// Create in-memory queue that prints greetings.
q := memqueue.NewQueue(&msgqueue.Options{
// Handler is automatically retried on error.
Handler: func(name string) error {
fmt.Println("Hello", name)
return nil
},
RateLimit: rate.Every(time.Second),
// Redis is only needed for rate limiting and call once.
Redis: redis.NewClient(&redis.Options{
Addr: ":6379",
}),
})
// Invoke handler with arguments.
q.Call("World")
// Same using Message API.
q.Add(msgqueue.NewMessage("World"))
// Say "Hello World" with 1 hour delay.
msg := msgqueue.NewMessage("World")
msg.Delay = time.Hour
q.Add(msg)
// Say "Hello World" once.
for i := 0; i < 100; i++ {
msg := msgqueue.NewMessage("hello")
msg.Name = "hello-world"
q.Add(msg)
}
// Say "Hello World" once with 1 hour delay.
for i := 0; i < 100; i++ {
msg := msgqueue.NewMessage("hello")
msg.Name = "hello-world"
msg.Delay = time.Hour
q.Add(msg)
}
// Say "Hello World" once in an hour.
for i := 0; i < 100; i++ {
q.CallOnce(time.Hour, "hello")
}
// Say "Hello World" for Europe region once in an hour.
for i := 0; i < 100; i++ {
msg := msgqueue.NewMessage("hello")
msg.SetDelayName(delay, "europe") // set delay and autogenerate message name
q.Add(msg)
}
SQS, IronMQ, and in-memory queues
SQS, IronMQ, and memqueue share the same API and can be used interchangeably.
SQS
azsqs package uses Amazon Simple Queue Service as queue backend.
import "github.com/go-msgqueue/msgqueue"
import "github.com/go-msgqueue/msgqueue/azsqs"
import "github.com/aws/aws-sdk-go/service/sqs"
// Create queue.
awsAccountId := "123456789"
q := azsqs.NewQueue(awsSQS(), awsAccountId, &msgqueue.Options{
Name: "sqs-queue-name",
Handler: func(name string) error {
fmt.Println("Hello", name)
return nil
},
})
// Same using Manager.
man := azsqs.NewManager(awsSQS(), accountId)
q := man.NewQueue(&msgqueue.Options{...})
// Add message.
q.Call("World")
// Start processing queue.
p := q.Processor()
p.Start()
// Stop processing.
p.Stop()
IronMQ
ironmq package uses IronMQ as queue backend.
import "github.com/go-msgqueue/msgqueue"
import "github.com/go-msgqueue/msgqueue/ironmq"
import "github.com/iron-io/iron_go3/mq"
// Create queue.
q := ironmq.NewQueue(mq.New("ironmq-queue-name"), &msgqueue.Options{
Handler: func(name string) error {
fmt.Println("Hello", name)
return nil
},
})
// Same using manager.
cfg := iron_config.Config("iron_mq")
man := ironmq.NewManager(&cfg)
q := man.NewQueue(&msgqueue.Options{...})
// Add message.
q.Call("World")
// Start processing queue.
p := q.Processor()
p.Start()
// Stop processing.
p.Stop()
In-memory
memqueue is in-memory queue backend implementation primarily useful for local development / unit testing. Unlike SQS and IronMQ it has running queue processor by default.
import "github.com/go-msgqueue/msgqueue"
// Create queue.
q := memqueue.NewQueue(&msgqueue.Options{
Handler: func(name string) error {
fmt.Println("Hello", name)
return nil
},
})
// Same using Manager.
man := memqueue.NewManager()
q := man.NewQueue(&msgqueue.Options{...})
// Stop processor if you don't need it.
p := q.Processor()
p.Stop()
// Process one message.
err := p.ProcessOne()
// Process all buffered messages.
err := p.ProcessAll()
Custom message delay
If error returned by handler implements Delay() time.Duration
that delay is used to postpone message processing.
type RateLimitError string
func (e RateLimitError) Error() string {
return string(e)
}
func (RateLimitError) Delay() time.Duration {
return time.Hour
}
func handler() error {
return RateLimitError("calm down")
}
q := memqueue.NewQueue(&msgqueue.Options{
Handler: handler,
})
Stats
You can log local queue stats using following code:
func LogQueueStats(q msgqueue.Queue) {
p := q.Processor()
opt := p.Options()
var old *msgqueue.ProcessorStats
for _ = range time.Tick(3 * time.Second) {
st := p.Stats()
if st == nil {
break
}
if old != nil && st.Processed == old.Processed &&
st.Fails == old.Fails &&
st.Retries == old.Retries {
continue
}
old = st
glog.Infof(
"%s: buffered=%d/%d in_flight=%d/%d "+
"processed=%d fails=%d retries=%d "+
"avg_dur=%s min_dur=%s max_dur=%s",
q, st.Buffered, opt.BufferSize, st.InFlight, opt.WorkerNumber,
st.Processed, st.Fails, st.Retries,
st.AvgDuration, st.MinDuration, st.MaxDuration,
)
}
}
go LogQueueStats(myqueue)
which will log something like this
Memqueue<Name=v1-production-notices-add>: buffered=0/1000 in_flight=3/16 processed=16183872 fails=0 retries=0 avg_dur=44.8ms min_dur=100µs max_dur=5.102s
Memqueue<Name=v1-production-notices-add>: buffered=0/1000 in_flight=8/16 processed=16184022 fails=0 retries=0 avg_dur=42ms min_dur=100µs max_dur=5.102s
Documentation ¶
Overview ¶
Package msgqueue implements task/job queue with in-memory, SQS, IronMQ backends.
go-msgqueue is a thin wrapper for SQS and IronMQ clients that uses Redis to implement rate limiting and call once semantic.
go-msgqueue consists of following components:
- memqueue - in memory queue that can be used for local unit testing.
- azsqs - Amazon SQS backend.
- ironmq - IronMQ backend.
- Manager - provides common interface for creating new queues.
- Processor - queue processor that works with memqueue, azsqs, and ironmq.
rate limiting is implemented in the processor package using https://github.com/go-redis/redis_rate. Call once is implemented in clients by checking if message name exists in Redis database.
Example (Once) ¶
Output: hello world
Example (RateLimit) ¶
Output: 1 msg/s
Index ¶
- Variables
- func SetLogger(logger *log.Logger)
- type Batcher
- type BatcherOptions
- type Delayer
- type Handler
- type HandlerFunc
- type Manager
- type Message
- type Options
- type Processor
- func (p *Processor) Add(msg *Message) error
- func (p *Processor) Len() int
- func (p *Processor) Options() *Options
- func (p *Processor) Process(msg *Message) error
- func (p *Processor) ProcessAll() error
- func (p *Processor) ProcessOne() error
- func (p *Processor) Purge() error
- func (p *Processor) Put(msg *Message)
- func (p *Processor) Queue() Queue
- func (p *Processor) Start() error
- func (p *Processor) Stats() *ProcessorStats
- func (p *Processor) Stop() error
- func (p *Processor) StopTimeout(timeout time.Duration) error
- func (p *Processor) String() string
- type ProcessorStats
- type Queue
- type RateLimiter
- type Redis
- type Storage
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrDuplicate = errors.New("queue: message with such name already exists")
ErrDuplicate is returned when adding duplicate message to the queue.
Functions ¶
Types ¶
type Batcher ¶ added in v1.3.0
type Batcher struct {
// contains filtered or unexported fields
}
Batcher collects messages for later batch processing.
func NewBatcher ¶ added in v1.3.0
func NewBatcher(p *Processor, opt *BatcherOptions) *Batcher
type BatcherOptions ¶ added in v1.3.0
type HandlerFunc ¶
func (HandlerFunc) HandleMessage ¶
func (fn HandlerFunc) HandleMessage(msg *Message) error
type Manager ¶ added in v1.3.0
Manager is an interface that abstracts creation of new queues. It is implemented in subpackages memqueue, azsqs, and ironmq.
type Message ¶
type Message struct { // SQS/IronMQ message id. Id string // Optional name for the message. Messages with the same name // are processed only once. Name string // Delay specifies the duration the queue must wait // before executing the message. Delay time.Duration // Function args passed to the handler. Args []interface{} // Text representation of the Args. Body string // SQS/IronMQ reservation id that is used to release/delete the message.. ReservationId string // The number of times the message has been reserved or released. ReservedCount int Err error }
Message is used to create and retrieve messages from a queue.
func NewMessage ¶
func NewMessage(args ...interface{}) *Message
func (*Message) EncodeBody ¶ added in v1.3.9
func (*Message) SetDelayName ¶
SetDelayName sets delay and generates message name from the args.
type Options ¶
type Options struct { // Queue name. Name string // Queue group name. GroupName string // Function called to process a message. Handler interface{} // Function called to process failed message. FallbackHandler interface{} // Maximum number of goroutines processing messages. // Default is 32 * number of CPUs. MaxWorkers int // Global limit of concurrently running workers across all servers. // Overrides MaxWorkers. WorkerLimit int // Maximum number of goroutines fetching messages. // Default is 4 * number of CPUs. MaxFetchers int // Compress data before sending to the queue. Compress bool // Number of messages reserved by a fetcher in the queue in one request. // Default is 10 messages. ReservationSize int // Time after which the reserved message is returned to the queue. // Default is 5 minutes. ReservationTimeout time.Duration // Time that a long polling receive call waits for a message to become // available before returning an empty response. // Default is 10 seconds. WaitTimeout time.Duration // Size of the buffer where reserved messages are stored. // Default is the same as ReservationSize. BufferSize int // Number of tries/releases after which the message fails permanently // and is deleted. // Default is 64 retries. RetryLimit int // Minimum backoff time between retries. // Default is 30 seconds. MinBackoff time.Duration // Maximum backoff time between retries. // Default is 30 minutes. MaxBackoff time.Duration // Number of consecutive failures after which queue processing is paused. // Default is 100 failures. PauseErrorsThreshold int // Processing rate limit. RateLimit rate.Limit // Optional rate limiter interface. The default is to use Redis. RateLimiter RateLimiter // Redis client that is used for storing metadata. Redis Redis // Optional storage interface. The default is to use Redis. Storage Storage // contains filtered or unexported fields }
type Processor ¶ added in v1.3.0
type Processor struct {
// contains filtered or unexported fields
}
Processor reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.
func NewProcessor ¶ added in v1.3.0
New creates new Processor for the queue using provided processing options.
func StartProcessor ¶ added in v1.3.0
Starts creates new Processor and starts it.
func (*Processor) Process ¶ added in v1.3.0
Process is low-level API to process message bypassing the internal queue.
func (*Processor) ProcessAll ¶ added in v1.3.0
ProcessAll starts workers to process messages in the queue and then stops them when all messages are processed.
func (*Processor) ProcessOne ¶ added in v1.3.0
ProcessOne processes at most one message in the queue.
func (*Processor) Stats ¶ added in v1.3.0
func (p *Processor) Stats() *ProcessorStats
Stats returns processor stats.
func (*Processor) StopTimeout ¶ added in v1.3.0
StopTimeout waits workers for timeout duration to finish processing current messages and stops workers.
type ProcessorStats ¶ added in v1.3.0
type Queue ¶ added in v1.3.0
type Queue interface { Name() string Processor() *Processor Add(msg *Message) error Call(args ...interface{}) error CallOnce(dur time.Duration, args ...interface{}) error Len() (int, error) ReserveN(n int, reservationTimeout time.Duration, waitTimeout time.Duration) ([]*Message, error) Release(*Message) error Delete(msg *Message) error Purge() error Close() error CloseTimeout(timeout time.Duration) error }
type RateLimiter ¶
type Redis ¶
type Redis interface { Del(keys ...string) *redis.IntCmd SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd SAdd(key string, members ...interface{}) *redis.IntCmd SMembers(key string) *redis.StringSliceCmd Pipelined(func(pipe redis.Pipeliner) error) ([]redis.Cmder, error) Eval(script string, keys []string, args ...interface{}) *redis.Cmd EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd ScriptExists(scripts ...string) *redis.BoolSliceCmd ScriptLoad(script string) *redis.StringCmd Publish(channel string, message interface{}) *redis.IntCmd }