simqueue

package
v0.0.0-...-d1bef67 Latest
Warning

This package is not in the latest version of its module.
Published: Apr 17, 2024 License: AGPL-3.0 Imports: 13 Imported by: 0

Documentation

Overview

Package simqueue is a queue implementation that uses Redis as a backend.

The queue uses one sorted set in Redis to store items. It implements a priority queue with the rules described below.

Usage:

  1. Create a new queue instance with `NewRedisQueue`.
  2. Start the processing loop with `StartProcessLoop`.
  3. Push items to the queue with `Push`.
  4. Update the queue with the current block number regularly using `UpdateBlock`. The queue does not update the block number automatically.
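The usage steps can be sketched against the `Queue` interface documented further down. This is only an illustration: `fakeQueue`, `runQueue`, and `demoCalls` are hypothetical names, and the fake just records calls so the snippet compiles without Redis. With the real package you would pass the value returned by `NewRedisQueue` instead.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Local copies of the documented types so the sketch is self-contained.
type QueueItemInfo struct{ Retries int }

type ProcessFunc func(ctx context.Context, data []byte, info QueueItemInfo) error

type Queue interface {
	UpdateBlock(block uint64) error
	Push(ctx context.Context, data []byte, highPriority bool, minTargetBlock, maxTargetBlock uint64) error
	StartProcessLoop(ctx context.Context, workers []ProcessFunc) *sync.WaitGroup
}

// fakeQueue is a hypothetical stand-in that only records the calls it receives.
type fakeQueue struct {
	mu    sync.Mutex
	calls []string
}

func (f *fakeQueue) record(s string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.calls = append(f.calls, s)
}

func (f *fakeQueue) UpdateBlock(block uint64) error {
	f.record(fmt.Sprintf("UpdateBlock(%d)", block))
	return nil
}

func (f *fakeQueue) Push(_ context.Context, data []byte, highPriority bool, minTargetBlock, maxTargetBlock uint64) error {
	f.record(fmt.Sprintf("Push(%q, %t, %d, %d)", data, highPriority, minTargetBlock, maxTargetBlock))
	return nil
}

func (f *fakeQueue) StartProcessLoop(_ context.Context, workers []ProcessFunc) *sync.WaitGroup {
	f.record(fmt.Sprintf("StartProcessLoop(%d workers)", len(workers)))
	return &sync.WaitGroup{} // the fake spawns nothing, so there is nothing to wait for
}

// runQueue walks through the documented usage steps.
func runQueue(ctx context.Context, q Queue, currentBlock uint64) error {
	worker := func(ctx context.Context, data []byte, info QueueItemInfo) error { return nil }

	// Start the processing loop with a single worker.
	wg := q.StartProcessLoop(ctx, []ProcessFunc{worker})

	// The caller is responsible for telling the queue the current block.
	if err := q.UpdateBlock(currentBlock); err != nil {
		return err
	}

	// Push a low-priority item targeting the next two blocks.
	if err := q.Push(ctx, []byte("payload"), false, currentBlock+1, currentBlock+2); err != nil {
		return err
	}

	// With a real queue this blocks until ctx is cancelled and the workers exit.
	wg.Wait()
	return nil
}

// demoCalls runs the flow against the fake and returns the recorded calls.
func demoCalls() []string {
	q := &fakeQueue{}
	if err := runQueue(context.Background(), q, 100); err != nil {
		panic(err)
	}
	return q.calls
}

func main() {
	for _, c := range demoCalls() {
		fmt.Println(c)
	}
}
```

In a long-running service, the `UpdateBlock` call would sit in its own loop fed by new block notifications rather than being called once.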

NOTE: The queue is not 100% reliable.

There is a small chance that an item is lost when the worker that claimed it crashes or loses its connection to the network.

The impact is limited because workers do not hold more items than they are processing, so the maximum number of items that can be lost in a catastrophic event equals the number of workers. See the shutdown section below for how to avoid loss on a normal shutdown.

Queue submission:

  1. The client pushes an item to the queue, specifying:
     • the target block number range in which the item should be processed.
     • whether the item is high priority or not.
  2. The queue stores the item in a sorted set with the score being the minimal target block number.
  3. If the queue is full, the item is discarded and `ErrQueueFull` is returned. There is a limit on the number of items in the queue.
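How the four `MaxQueued*` limits in `RedisQueueConfig` apply to a pushed item is not spelled out in this overview. One plausible reading, sketched below purely as an assumption, is that an item whose minimal target block has already been reached counts as "processable" and any other item counts as "unprocessable", with a separate limit per priority. `limits`, `limitFor`, and `checkCapacity` are hypothetical names; the numbers are copied from `DefaultQueueConfig`.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrQueueFull mirrors the documented sentinel error.
var ErrQueueFull = errors.New("queue is full")

// limits is a local copy of the four capacity fields of RedisQueueConfig.
type limits struct {
	ProcessableLowPrio    uint64
	ProcessableHighPrio   uint64
	UnprocessableLowPrio  uint64
	UnprocessableHighPrio uint64
}

// Values taken from DefaultQueueConfig.
var defaultLimits = limits{
	ProcessableLowPrio:    1024,
	ProcessableHighPrio:   1024 * 2,
	UnprocessableLowPrio:  1024 * 3,
	UnprocessableHighPrio: 1024 * 4,
}

// limitFor picks the limit for an item.
// Assumption: an item is "processable" once its minimal target block
// is not in the future relative to the current block.
func limitFor(l limits, highPriority bool, minTargetBlock, currentBlock uint64) uint64 {
	processable := minTargetBlock <= currentBlock
	switch {
	case processable && highPriority:
		return l.ProcessableHighPrio
	case processable:
		return l.ProcessableLowPrio
	case highPriority:
		return l.UnprocessableHighPrio
	default:
		return l.UnprocessableLowPrio
	}
}

// checkCapacity rejects a push when the relevant bucket is already full.
// Assumption: queued is the number of items currently counted against limit.
func checkCapacity(queued, limit uint64) error {
	if queued >= limit {
		return ErrQueueFull
	}
	return nil
}

func main() {
	limit := limitFor(defaultLimits, false, 11, 10)
	fmt.Println(limit, checkCapacity(limit, limit))
}
```

Callers of `Push` should be prepared to handle `ErrQueueFull` in any case, for example by dropping the item or reporting the rejection upstream.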

Queue processing:

  1. The queue is processed in a loop by a number of workers in parallel. The number of workers is determined by the number of `ProcessFunc` functions passed to `StartProcessLoop`. Each worker works on one item at a time, so to fully saturate a worker node that can process multiple items in parallel, start multiple workers for the same node.

  2. The queue is processed in the following way:
     * The worker pops the next item. The order of items is determined by the following rules:
       * Items with a lower target block number are processed first. If the target block number is not reached yet, the item is rescheduled.
       * If the target block number is the same, priority is determined lexicographically in the following order:
         + high priority
         + number of retries while processing this item
         + time of submission
         + max target block number
         + payload data itself
     * The worker calls the `ProcessFunc` function with the payload data.
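The ordering rules can be illustrated with an in-memory comparator. This is a sketch, not the Redis-backed implementation: the `item` struct and `less` function are hypothetical, and it assumes every tie-breaker after "high priority first" sorts in ascending order (fewer retries first, earlier submission first, and so on), which the overview does not state explicitly.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// item is a hypothetical in-memory view of a queued entry.
type item struct {
	MinTargetBlock uint64
	HighPriority   bool
	Retries        int
	SubmittedAt    int64 // assumed representation: unix nanoseconds
	MaxTargetBlock uint64
	Payload        []byte
}

// less reports whether a should be processed before b.
func less(a, b item) bool {
	if a.MinTargetBlock != b.MinTargetBlock {
		return a.MinTargetBlock < b.MinTargetBlock // lower target block first
	}
	if a.HighPriority != b.HighPriority {
		return a.HighPriority // high priority first
	}
	if a.Retries != b.Retries {
		return a.Retries < b.Retries // assumption: fewer retries first
	}
	if a.SubmittedAt != b.SubmittedAt {
		return a.SubmittedAt < b.SubmittedAt // assumption: earlier submission first
	}
	if a.MaxTargetBlock != b.MaxTargetBlock {
		return a.MaxTargetBlock < b.MaxTargetBlock
	}
	return bytes.Compare(a.Payload, b.Payload) < 0 // payload bytes as final tie-breaker
}

// sortedPayloads sorts items by less and returns their payloads in order.
func sortedPayloads(items []item) []string {
	sort.SliceStable(items, func(i, j int) bool { return less(items[i], items[j]) })
	out := make([]string, len(items))
	for i, it := range items {
		out[i] = string(it.Payload)
	}
	return out
}

// demoOrder sorts a small example set.
func demoOrder() []string {
	return sortedPayloads([]item{
		{MinTargetBlock: 11, HighPriority: true, Payload: []byte("d")},
		{MinTargetBlock: 10, Retries: 1, Payload: []byte("c")},
		{MinTargetBlock: 10, HighPriority: true, Payload: []byte("a")},
		{MinTargetBlock: 10, Payload: []byte("b")},
	})
}

func main() {
	fmt.Println(demoOrder()) // prints [a b c d]
}
```

Note that the high-priority item targeting block 11 still sorts after every item targeting block 10, because the target block is compared first.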

    The `ProcessFunc` function is responsible for processing the item.
     * It should return `nil` if the item was processed successfully. If the item should be retried on the next block, it should return `ErrProcessScheduleNextBlock`. If the item should be retried on the same block (the worker is faulty), it should return `ErrProcessWorkerError`. If the item should not be retried at all, it should return `ErrProcessUnrecoverable`.
     * If `ProcessFunc` returns `ErrProcessScheduleNextBlock` or `ErrProcessWorkerError`, the item is rescheduled, but at most `maxRetries` times. Rescheduling after a worker error (for example, one of the nodes in the cluster is down) puts the item back in the queue so that it is processed by (hopefully) another worker. `maxRetries` prevents an infinite loop in case of a bug. Rescheduling for the next block covers the case where a bundle fails now but may still succeed on the next block.
     * There is an exponential backoff between retries for the worker, so a worker that constantly fails to process items gets less and less work.
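The retry rules can be summarized as a small decision function on the queue side. The following is a rough sketch under stated assumptions: `decide`, `backoff`, the `action` values, and the backoff base and ceiling are all hypothetical, and treating an unknown error as non-retryable is a guess rather than documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local copies of the documented sentinel errors.
var (
	ErrProcessScheduleNextBlock = errors.New("try to schedule item for the next block")
	ErrProcessWorkerError       = errors.New("worker error, retry processing on another worker")
	ErrProcessUnrecoverable     = errors.New("unrecoverable error, item will not be retried")
)

type action string

const (
	actionDone      action = "done"
	actionSameBlock action = "requeue-same-block"
	actionNextBlock action = "requeue-next-block"
	actionDrop      action = "drop"
)

// decide maps a ProcessFunc result to what happens to the item.
func decide(err error, retries, maxRetries int) action {
	switch {
	case err == nil:
		return actionDone
	case errors.Is(err, ErrProcessUnrecoverable):
		return actionDrop
	case retries >= maxRetries:
		return actionDrop // retry budget exhausted
	case errors.Is(err, ErrProcessScheduleNextBlock):
		return actionNextBlock
	case errors.Is(err, ErrProcessWorkerError):
		return actionSameBlock
	default:
		return actionDrop // assumption: unknown errors are not retried
	}
}

// backoff doubles the wait for every consecutive failure of a worker,
// starting at base and never exceeding ceiling.
func backoff(failures int, base, ceiling time.Duration) time.Duration {
	if failures <= 0 {
		return 0
	}
	d := base
	for i := 1; i < failures; i++ {
		d *= 2
		if d >= ceiling {
			return ceiling
		}
	}
	if d > ceiling {
		return ceiling
	}
	return d
}

// Hypothetical backoff parameters used only for the demo.
const (
	demoBase    = 100 * time.Millisecond
	demoCeiling = 2 * time.Second
)

// backoffMillis returns the demo backoff in milliseconds.
func backoffMillis(failures int) int64 {
	return backoff(failures, demoBase, demoCeiling).Milliseconds()
}

func main() {
	fmt.Println(decide(ErrProcessWorkerError, 0, 30))
	fmt.Println(decide(ErrProcessScheduleNextBlock, 30, 30))
	fmt.Println(backoffMillis(1), backoffMillis(3), backoffMillis(10))
}
```

The backoff applies to the worker rather than to the item, so a requeued item can be picked up immediately by a healthy worker while the failing one waits.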

Queue shutdown:

  1. Workers can be shut down by cancelling the context passed to `StartProcessLoop`.
  2. The `sync.WaitGroup` returned from `StartProcessLoop` can be used to wait for all workers to finish processing.
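A graceful shutdown therefore follows the usual cancel-then-wait pattern. In the sketch below, `startLoop` is a hypothetical stand-in for `StartProcessLoop` whose workers do nothing but wait for cancellation; only the shape of the shutdown sequence is the point.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// startLoop is a hypothetical stand-in for StartProcessLoop: it spawns n
// goroutines that run until ctx is cancelled and returns a WaitGroup
// that is released once all of them have exited.
func startLoop(ctx context.Context, n int, finished *int32) *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-ctx.Done() // a real worker would keep processing items until this fires
			atomic.AddInt32(finished, 1)
		}()
	}
	return wg
}

// shutdownDemo starts n workers, cancels them, and waits for all to exit.
func shutdownDemo(n int) int32 {
	ctx, cancel := context.WithCancel(context.Background())
	var finished int32

	wg := startLoop(ctx, n, &finished)

	cancel()  // step 1: signal shutdown through the context
	wg.Wait() // step 2: wait for every worker to finish

	return atomic.LoadInt32(&finished)
}

func main() {
	fmt.Println("workers finished:", shutdownDemo(4))
}
```

Waiting on the `WaitGroup` before the process exits gives workers the chance to finish the item they currently hold, which is how loss on normal shutdown is avoided.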

Index

Constants

This section is empty.

Variables

var (
	ErrBlockNumberIncorrect = errors.New("block number is invalid")
	ErrStaleItem            = errors.New("item is stale")
	ErrQueueFull            = errors.New("queue is full")
	ErrMaxRetriesReached    = errors.New("max retries reached")
	ErrNoNextBlock          = errors.New("failed to requeue item, no next block available")
	ErrRequeueFailed        = errors.New("item requeue failed")
)
var (
	// ErrProcessScheduleNextBlock is returned by ProcessFunc if item should be retried on the next block.
	ErrProcessScheduleNextBlock = errors.New("try to schedule item for the next block")
	// ErrProcessWorkerError is returned by ProcessFunc if item should be retried on the same block by a different worker.
	ErrProcessWorkerError = errors.New("worker error, retry processing on another worker")
	// ErrProcessUnrecoverable is returned by ProcessFunc if item should not be retried.
	// For example, if an item was canceled or error is unrecoverable.
	ErrProcessUnrecoverable = errors.New("unrecoverable error, item will not be retried")
)

Errors returned by ProcessFunc.
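On the worker side, a `ProcessFunc` typically translates its own failures into one of these sentinels. The sketch below is illustrative only: `simulate`, `errNodeDown`, and `errReverted` are hypothetical, and the sentinels are returned unwrapped because this page does not say whether the queue matches them with `errors.Is` or by direct comparison.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local copies of the documented type and sentinel errors.
type QueueItemInfo struct{ Retries int }

var (
	ErrProcessScheduleNextBlock = errors.New("try to schedule item for the next block")
	ErrProcessWorkerError       = errors.New("worker error, retry processing on another worker")
	ErrProcessUnrecoverable     = errors.New("unrecoverable error, item will not be retried")
)

// Hypothetical failure modes of a downstream simulation node.
var (
	errNodeDown = errors.New("simulation node unreachable")
	errReverted = errors.New("bundle reverted on this block")
)

// simulate is a hypothetical stand-in for the real work.
func simulate(data []byte) error {
	switch string(data) {
	case "node-down":
		return errNodeDown
	case "revert":
		return errReverted
	case "":
		return errors.New("empty payload")
	}
	return nil
}

// process has the ProcessFunc signature and maps failures to sentinels.
func process(ctx context.Context, data []byte, info QueueItemInfo) error {
	err := simulate(data)
	switch {
	case err == nil:
		return nil
	case errors.Is(err, errNodeDown):
		return ErrProcessWorkerError // this worker is faulty, let another one try
	case errors.Is(err, errReverted):
		return ErrProcessScheduleNextBlock // may still succeed on the next block
	default:
		return ErrProcessUnrecoverable // retrying would not help
	}
}

// classify is a small helper that runs process on a string payload.
func classify(payload string) error {
	return process(context.Background(), []byte(payload), QueueItemInfo{})
}

func main() {
	for _, p := range []string{"ok", "node-down", "revert", ""} {
		fmt.Printf("%q -> %v\n", p, classify(p))
	}
}
```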

var DefaultQueueConfig = RedisQueueConfig{
	MaxRetries:                          30,
	MaxQueuedProcessableItemsLowPrio:    1024,
	MaxQueuedProcessableItemsHighPrio:   1024 * 2,
	MaxQueuedUnprocessableItemsLowPrio:  1024 * 3,
	MaxQueuedUnprocessableItemsHighPrio: 1024 * 4,
	WorkerTimeout:                       4 * time.Second,
}

Functions

This section is empty.

Types

type ProcessFunc

type ProcessFunc func(ctx context.Context, data []byte, info QueueItemInfo) error

func MultipleWorkers

func MultipleWorkers(processFunc ProcessFunc, n int, limit rate.Limit, burst int) []ProcessFunc

MultipleWorkers creates n workers that are rate limited by limit. The use case is to have multiple workers per simulation node. ProcessFunc must be safe for concurrent use.
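The concurrency requirement matters because the same function ends up being called from several goroutines at once. The sketch below shows a `ProcessFunc` with shared state guarded by atomic operations. `replicate` is a hypothetical, simplified stand-in for `MultipleWorkers` that omits rate limiting entirely, and `counter` is an illustrative type.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// Local copies of the documented types.
type QueueItemInfo struct{ Retries int }

type ProcessFunc func(ctx context.Context, data []byte, info QueueItemInfo) error

// counter holds state shared by all workers; it is updated atomically.
type counter struct{ processed int64 }

// process has the ProcessFunc signature and is safe for concurrent use.
func (c *counter) process(ctx context.Context, data []byte, info QueueItemInfo) error {
	atomic.AddInt64(&c.processed, 1)
	return nil
}

// replicate returns the same function n times.
// It is a simplified stand-in for MultipleWorkers with no rate limiting.
func replicate(fn ProcessFunc, n int) []ProcessFunc {
	out := make([]ProcessFunc, n)
	for i := range out {
		out[i] = fn
	}
	return out
}

// runConcurrently calls each worker callsPerWorker times from its own
// goroutine and returns the total number of processed items.
func runConcurrently(n, callsPerWorker int) int64 {
	c := &counter{}
	workers := replicate(c.process, n)

	var wg sync.WaitGroup
	for _, w := range workers {
		wg.Add(1)
		go func(w ProcessFunc) {
			defer wg.Done()
			for i := 0; i < callsPerWorker; i++ {
				_ = w(context.Background(), []byte("x"), QueueItemInfo{})
			}
		}(w)
	}
	wg.Wait()
	return atomic.LoadInt64(&c.processed)
}

func main() {
	fmt.Println(runConcurrently(8, 100)) // prints 800
}
```

With a plain `c.processed++` instead of the atomic add, the total could come out lower than expected and the race detector would flag the function.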

type Queue

type Queue interface {
	UpdateBlock(block uint64) error
	Push(ctx context.Context, data []byte, highPriority bool, minTargetBlock, maxTargetBlock uint64) error
	StartProcessLoop(ctx context.Context, workers []ProcessFunc) *sync.WaitGroup
}

type QueueItemInfo

type QueueItemInfo struct {
	// Number of times this item was retried before the success.
	Retries int
}

type RedisQueue

type RedisQueue struct {
	Config RedisQueueConfig
	// contains filtered or unexported fields
}

func NewRedisQueue

func NewRedisQueue(log *zap.Logger, red *redis.Client, queueName string) *RedisQueue

func (*RedisQueue) CleanQueues

func (s *RedisQueue) CleanQueues(ctx context.Context) error

CleanQueues cleans all data in Redis associated with the given queue. NOTE: this is a slow and dangerous operation and should only be used for testing.

func (*RedisQueue) Push

func (s *RedisQueue) Push(ctx context.Context, data []byte, highPriority bool, minTargetBlock, maxTargetBlock uint64) error

func (*RedisQueue) StartProcessLoop

func (s *RedisQueue) StartProcessLoop(ctx context.Context, workers []ProcessFunc) *sync.WaitGroup

StartProcessLoop starts a loop that processes items from the queue. It spawns a goroutine for each worker. ctx can be used to signal shutdown. The returned wait group allows for graceful shutdown.

func (*RedisQueue) UpdateBlock

func (s *RedisQueue) UpdateBlock(block uint64) error
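Since the queue does not track the chain head by itself, the caller needs some loop that forwards new block numbers to `UpdateBlock`. A minimal sketch, assuming block numbers arrive on a channel: `feedBlocks`, `blockUpdater`, and `recorder` are hypothetical names, and stopping on the first `UpdateBlock` error is a design choice of this example rather than documented behaviour.

```go
package main

import (
	"context"
	"fmt"
)

// blockUpdater is the subset of the Queue interface this loop needs.
type blockUpdater interface {
	UpdateBlock(block uint64) error
}

// feedBlocks forwards block numbers to the queue until the channel is
// closed or ctx is cancelled.
func feedBlocks(ctx context.Context, q blockUpdater, blocks <-chan uint64) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case b, ok := <-blocks:
			if !ok {
				return nil // source of block numbers was closed
			}
			if err := q.UpdateBlock(b); err != nil {
				return fmt.Errorf("update block %d: %w", b, err)
			}
		}
	}
}

// recorder is a hypothetical stand-in for the queue that remembers updates.
type recorder struct{ seen []uint64 }

func (r *recorder) UpdateBlock(b uint64) error {
	r.seen = append(r.seen, b)
	return nil
}

// feedDemo pushes three block numbers through feedBlocks.
func feedDemo() []uint64 {
	blocks := make(chan uint64, 3)
	blocks <- 100
	blocks <- 101
	blocks <- 102
	close(blocks)

	r := &recorder{}
	if err := feedBlocks(context.Background(), r, blocks); err != nil {
		panic(err)
	}
	return r.seen
}

func main() {
	fmt.Println(feedDemo()) // prints [100 101 102]
}
```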

type RedisQueueConfig

type RedisQueueConfig struct {
	// MaxRetries is the maximum number of simulations for a single item.
	MaxRetries uint16
	// MaxQueuedProcessableItemsLowPrio is the maximum number of items that can be queued for immediate processing (low prio).
	MaxQueuedProcessableItemsLowPrio uint64
	// MaxQueuedProcessableItemsHighPrio is the maximum number of items that can be queued for immediate processing (high prio).
	MaxQueuedProcessableItemsHighPrio uint64
	// MaxQueuedUnprocessableItemsLowPrio is the maximum number of items that can be queued for processing in the future (low prio).
	MaxQueuedUnprocessableItemsLowPrio uint64
	// MaxQueuedUnprocessableItemsHighPrio is the maximum number of items that can be queued for processing in the future (high prio).
	MaxQueuedUnprocessableItemsHighPrio uint64
	// WorkerTimeout is the maximum time a worker can process an item.
	WorkerTimeout time.Duration
}

RedisQueueConfig is the configuration for RedisQueue. See DefaultQueueConfig for the default values. Can be loaded from environment variables using ConfigFromEnv.

func ConfigFromEnv

func ConfigFromEnv() (RedisQueueConfig, error)

ConfigFromEnv loads `simqueue` config from the environment:

  • `SIMQUEUE_MAX_RETRIES`
  • `SIMQUEUE_MAX_QUEUED_PROCESSABLE_ITEMS_LOW_PRIO`
  • `SIMQUEUE_MAX_QUEUED_PROCESSABLE_ITEMS_HIGH_PRIO`
  • `SIMQUEUE_MAX_QUEUED_UNPROCESSABLE_ITEMS_LOW_PRIO`
  • `SIMQUEUE_MAX_QUEUED_UNPROCESSABLE_ITEMS_HIGH_PRIO`
  • `SIMQUEUE_WORKER_TIMEOUT_MS`
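To make the variable names concrete, here is a rough sketch of how two of them might be parsed. It is not the package's implementation: `lookupUint`, `loadSubset`, and `demoEnv` are hypothetical helpers, and falling back to the `DefaultQueueConfig` values when a variable is unset is an assumption. The `_MS` suffix is read as milliseconds.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// lookupUint reads an unsigned integer of the given bit size from the
// environment, returning def when the variable is unset or empty.
func lookupUint(name string, def uint64, bits int) (uint64, error) {
	raw, ok := os.LookupEnv(name)
	if !ok || raw == "" {
		return def, nil
	}
	v, err := strconv.ParseUint(raw, 10, bits)
	if err != nil {
		return 0, fmt.Errorf("%s: %w", name, err)
	}
	return v, nil
}

// loadSubset loads MaxRetries and WorkerTimeout.
// Assumption: unset variables fall back to the DefaultQueueConfig values.
func loadSubset() (uint16, time.Duration, error) {
	retries, err := lookupUint("SIMQUEUE_MAX_RETRIES", 30, 16)
	if err != nil {
		return 0, 0, err
	}
	ms, err := lookupUint("SIMQUEUE_WORKER_TIMEOUT_MS", 4000, 64)
	if err != nil {
		return 0, 0, err
	}
	return uint16(retries), time.Duration(ms) * time.Millisecond, nil
}

// demoEnv sets both variables and returns the parsed values as plain numbers.
func demoEnv() (int, int64) {
	os.Setenv("SIMQUEUE_MAX_RETRIES", "5")
	os.Setenv("SIMQUEUE_WORKER_TIMEOUT_MS", "250")
	retries, timeout, err := loadSubset()
	if err != nil {
		panic(err)
	}
	return int(retries), timeout.Milliseconds()
}

func main() {
	retries, ms := demoEnv()
	fmt.Println(retries, ms) // prints 5 250
}
```

In practice you would call `ConfigFromEnv` itself and assign the result to the exported `Config` field of `RedisQueue`.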
