rate

package
v1.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2024 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package rate contains rate limiting strategies for asynq.Handler(s).

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Semaphore

type Semaphore struct {
	// contains filtered or unexported fields
}

Semaphore is a distributed counting semaphore which can be used to set maxTokens across multiple asynq servers.

func NewSemaphore

func NewSemaphore(rco asynq.RedisConnOpt, scope string, maxTokens int) *Semaphore

NewSemaphore creates a counting Semaphore for the given scope with the given number of tokens.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/awanganddong/asynq"
	"github.com/awanganddong/asynq/x/rate"
)

type RateLimitError struct {
	RetryIn time.Duration
}

func (e *RateLimitError) Error() string {
	return fmt.Sprintf("rate limited (retry in  %v)", e.RetryIn)
}

func main() {
	redisConnOpt := asynq.RedisClientOpt{Addr: ":6379"}
	sema := rate.NewSemaphore(redisConnOpt, "my_queue", 10)
	// call sema.Close() when appropriate

	_ = asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error {
		ok, err := sema.Acquire(ctx)
		if err != nil {
			return err
		}
		if !ok {
			return &RateLimitError{RetryIn: 30 * time.Second}
		}

		// Make sure to release the token once we're done.
		defer sema.Release(ctx)

		// Process task
		return nil
	})
}
Output:

func (*Semaphore) Acquire

func (s *Semaphore) Acquire(ctx context.Context) (bool, error)

Acquire attempts to acquire a token from the semaphore. - Returns (true, nil), iff semaphore key exists and current value is less than maxTokens - Returns (false, nil) when token cannot be acquired - Returns (false, error) otherwise

The context.Context passed to Acquire must have a deadline set, this ensures that token is released if the job goroutine crashes and does not call Release.

func (*Semaphore) Close

func (s *Semaphore) Close() error

Close closes the connection to redis.

func (*Semaphore) Release

func (s *Semaphore) Release(ctx context.Context) error

Release will release the token on the counting semaphore.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL