Documentation ¶
Overview ¶
Package msgqueue implements a SQS & IronMQ client with rate limiting and call once.
Example (CustomRateLimit) ¶
package main import ( "fmt" "time" "github.com/go-msgqueue/msgqueue" "github.com/go-msgqueue/msgqueue/memqueue" ) type RateLimitError string func (e RateLimitError) Error() string { return string(e) } func (RateLimitError) Delay() time.Duration { return 3 * time.Second } func main() { start := time.Now() q := memqueue.NewQueue(&msgqueue.Options{ Handler: func() error { fmt.Println("retried in", timeSince(start)) return RateLimitError("calm down") }, RetryLimit: 2, MinBackoff: time.Millisecond, }) defer q.Close() q.Processor().Stop() q.Call() q.Processor().ProcessAll() }
Output: retried in 0s retried in 3s
Example (MaxWorkers) ¶
package main import ( "fmt" "math" "time" "github.com/go-redis/redis" "github.com/go-msgqueue/msgqueue" "github.com/go-msgqueue/msgqueue/memqueue" ) func redisRing() *redis.Ring { ring := redis.NewRing(&redis.RingOptions{ Addrs: map[string]string{"0": ":6379"}, PoolSize: 100, }) err := ring.FlushDb().Err() if err != nil { panic(err) } return ring } func timeSince(start time.Time) time.Duration { secs := float64(time.Since(start)) / float64(time.Second) return time.Duration(math.Floor(secs)) * time.Second } func main() { start := time.Now() q := memqueue.NewQueue(&msgqueue.Options{ Handler: func() { fmt.Println(timeSince(start)) time.Sleep(time.Second) }, Redis: redisRing(), WorkerLimit: 1, }) for i := 0; i < 3; i++ { q.Call() } // Close queue to make sure all messages are processed. _ = q.Close() }
Output: 0s 1s 2s
Example (MessageDelay) ¶
package main import ( "fmt" "math" "time" "github.com/go-msgqueue/msgqueue" "github.com/go-msgqueue/msgqueue/memqueue" ) func timeSince(start time.Time) time.Duration { secs := float64(time.Since(start)) / float64(time.Second) return time.Duration(math.Floor(secs)) * time.Second } func main() { start := time.Now() q := memqueue.NewQueue(&msgqueue.Options{ Handler: func() { fmt.Println("processed with delay", timeSince(start)) }, }) defer q.Close() q.Processor().Stop() msg := msgqueue.NewMessage() msg.Delay = time.Second q.Add(msg) q.Processor().ProcessAll() }
Output: processed with delay 1s
Example (Once) ¶
package main import ( "fmt" "time" "github.com/go-redis/redis" "golang.org/x/time/rate" "github.com/go-msgqueue/msgqueue" "github.com/go-msgqueue/msgqueue/memqueue" ) func redisRing() *redis.Ring { ring := redis.NewRing(&redis.RingOptions{ Addrs: map[string]string{"0": ":6379"}, PoolSize: 100, }) err := ring.FlushDb().Err() if err != nil { panic(err) } return ring } func main() { q := memqueue.NewQueue(&msgqueue.Options{ Handler: func(name string) { fmt.Println("hello", name) }, Redis: redisRing(), RateLimit: rate.Every(time.Second), }) for _, name := range []string{"world", "adele"} { for i := 0; i < 10; i++ { // Call once in a second. q.CallOnce(time.Second, name) } } // Close queue to make sure all messages are processed. _ = q.Close() }
Output: hello world hello adele
Example (RateLimit) ¶
package main import ( "fmt" "math" "time" "github.com/go-redis/redis" "golang.org/x/time/rate" "github.com/go-msgqueue/msgqueue" "github.com/go-msgqueue/msgqueue/memqueue" ) func redisRing() *redis.Ring { ring := redis.NewRing(&redis.RingOptions{ Addrs: map[string]string{"0": ":6379"}, PoolSize: 100, }) err := ring.FlushDb().Err() if err != nil { panic(err) } return ring } func timeSinceCeil(start time.Time) time.Duration { secs := float64(time.Since(start)) / float64(time.Second) return time.Duration(math.Ceil(secs)) * time.Second } func main() { start := time.Now() q := memqueue.NewQueue(&msgqueue.Options{ Handler: func() { fmt.Println(timeSinceCeil(start)) }, Redis: redisRing(), RateLimit: rate.Every(time.Second), }) for i := 0; i < 5; i++ { q.Call() } // Close queue to make sure all messages are processed. _ = q.Close() }
Output: 1s 1s 2s 3s 4s
Example (RetryOnError) ¶
package main import ( "errors" "fmt" "math" "time" "github.com/go-msgqueue/msgqueue" "github.com/go-msgqueue/msgqueue/memqueue" ) func timeSince(start time.Time) time.Duration { secs := float64(time.Since(start)) / float64(time.Second) return time.Duration(math.Floor(secs)) * time.Second } func main() { start := time.Now() q := memqueue.NewQueue(&msgqueue.Options{ Handler: func() error { fmt.Println("retried in", timeSince(start)) return errors.New("fake error") }, RetryLimit: 3, MinBackoff: time.Second, }) defer q.Close() q.Processor().Stop() q.Call() q.Processor().ProcessAll() }
Output: retried in 0s retried in 1s retried in 3s
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ErrDuplicate = errors.New("queue: message with such name already exists")
Functions ¶
This section is empty.
Types ¶
type Handler ¶
func NewHandler ¶
func NewHandler(fn interface{}) Handler
type HandlerFunc ¶
func (HandlerFunc) HandleMessage ¶
func (fn HandlerFunc) HandleMessage(msg *Message) error
type Message ¶
type Message struct { // SQS/IronMQ message id. Id string // Optional name for the message. Messages with the same name // are processed only once. Name string // Delay specifies the duration the queue must wait // before executing the message. Delay time.Duration // Function args passed to the handler. Args []interface{} // Text representation of the Args. Body string // SQS/IronMQ reservation id that is used to release/delete the message.. ReservationId string // The number of times the message has been reserved or released. ReservedCount int }
Message is used to create and retrieve messages from a queue.
func NewMessage ¶
func NewMessage(args ...interface{}) *Message
func (*Message) MarshalArgs ¶
func (*Message) SetDelayName ¶
SetDelayName sets delay and generates message name from the args.
type Options ¶
type Options struct { // Queue name. Name string // Queue group name. GroupName string // Function called to process a message. Handler interface{} // Function called to process failed message. FallbackHandler interface{} // Number of goroutines processing messages. WorkerNumber int // Global limit of concurrently running workers. Overrides WorkerNumber. WorkerLimit int // Size of the buffer where reserved messages are stored. BufferSize int // Time after which the reserved message is returned to the queue. ReservationTimeout time.Duration // Number of tries/releases after which the message fails permanently // and is deleted. RetryLimit int // Minimum time between retries. MinBackoff time.Duration // Processing rate limit. RateLimit rate.Limit // Redis client that is used for storing metadata. Redis Redis // Optional storage interface. The default is to use Redis. Storage Storage // Optional rate limiter interface. The default is to use Redis. RateLimiter RateLimiter // contains filtered or unexported fields }
type RateLimiter ¶
type Redis ¶
type Redis interface { Del(keys ...string) *redis.IntCmd SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd SAdd(key string, members ...interface{}) *redis.IntCmd SMembers(key string) *redis.StringSliceCmd Pipelined(func(pipe redis.Pipeliner) error) ([]redis.Cmder, error) Eval(script string, keys []string, args ...interface{}) *redis.Cmd Publish(channel, message string) *redis.IntCmd }
Click to show internal directories.
Click to hide internal directories.