Documentation ¶
Overview ¶
Package simqueue is a queue implementation that uses Redis as a backend.
The queue uses a single sorted set in Redis to store items. It implements a priority queue with the rules described below.
Usage:
1. Create a new queue instance with `NewRedisQueue`.
2. Start the processing loop with `StartProcessLoop`.
3. Push items to the queue with `Push`.
4. Update the queue with the current block number regularly by calling `UpdateBlock`. The queue does not update the block number automatically.
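Step 4 of the usage is easy to forget, so here is a minimal sketch of a polling loop that keeps a queue's block number current. It does not use the package itself: `update` stands in for `(*RedisQueue).UpdateBlock`, while `fetch`, `runBlockUpdater`, and `simulate` are hypothetical names introduced only for this illustration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runBlockUpdater polls fetch every interval and forwards each new block
// number to update until ctx is cancelled. In real code update would be the
// queue's UpdateBlock method; fetch is a hypothetical block source.
func runBlockUpdater(
	ctx context.Context,
	interval time.Duration,
	fetch func(context.Context) (uint64, error),
	update func(uint64) error,
) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	var last uint64
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		// select picks at random when both channels are ready, so re-check.
		if ctx.Err() != nil {
			return
		}
		block, err := fetch(ctx)
		if err != nil || block <= last {
			continue // transient error or no new block yet
		}
		if err := update(block); err != nil {
			continue // last stays unchanged, so this block is tried again
		}
		last = block
	}
}

// simulate drives runBlockUpdater with a fake block source that advances by
// one on every poll and stops after n successful updates.
func simulate(n int) []uint64 {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	head := uint64(100)
	fetch := func(context.Context) (uint64, error) { head++; return head, nil }

	var seen []uint64
	update := func(b uint64) error {
		seen = append(seen, b)
		if len(seen) == n {
			cancel()
		}
		return nil
	}
	runBlockUpdater(ctx, time.Millisecond, fetch, update)
	return seen
}

func main() {
	fmt.Println(simulate(3)) // [101 102 103]
}
```

Running the updater in its own goroutine next to the processing loop, with the same context, lets both stop together on shutdown.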
NOTE: Queue is not 100% reliable.
There is a small chance that an item is lost when the worker that claimed it crashes or loses its network connection. The impact is limited because workers never hold more items than they are processing, so the maximum number of items that can be lost in a catastrophic event equals the number of workers. See the queue shutdown section below for how to avoid losses during a normal shutdown.
Queue submission:
1. The client pushes an item to the queue, specifying:
- the target block number range in which the item should be processed.
- whether the item is high priority or not.
2. The queue stores the item in a sorted set, with the minimal target block number as the score.
3. The number of items in the queue is limited. If the queue is full, the item is discarded and `ErrQueueFull` is returned.
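The submission rules above can be illustrated with a small in-memory stand-in for the sorted set. This is only a sketch under simplifying assumptions: `memQueue`, `item`, and `push` are hypothetical names, `errQueueFull` is a local copy of `ErrQueueFull`, and a single size limit replaces the separate limits found in `RedisQueueConfig`.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// errQueueFull is a local stand-in for the package's ErrQueueFull.
var errQueueFull = errors.New("queue is full")

// item is a hypothetical in-memory representation of a queued entry.
type item struct {
	data         []byte
	minBlock     uint64 // plays the role of the sorted-set score
	maxBlock     uint64
	highPriority bool
}

// memQueue mimics a sorted set ordered by score with a size cap.
type memQueue struct {
	limit int
	items []item // kept in ascending minBlock order
}

// push stores the item at the position given by its score, or rejects it
// with errQueueFull when the cap is reached.
func (q *memQueue) push(data []byte, highPriority bool, minBlock, maxBlock uint64) error {
	if len(q.items) >= q.limit {
		return errQueueFull // the item is discarded, not queued
	}
	// First position whose score is greater than the new score.
	i := sort.Search(len(q.items), func(i int) bool {
		return q.items[i].minBlock > minBlock
	})
	q.items = append(q.items, item{})
	copy(q.items[i+1:], q.items[i:])
	q.items[i] = item{data: data, minBlock: minBlock, maxBlock: maxBlock, highPriority: highPriority}
	return nil
}

func main() {
	q := &memQueue{limit: 2}
	fmt.Println(q.push([]byte("b"), false, 12, 14)) // <nil>
	fmt.Println(q.push([]byte("a"), true, 10, 11))  // <nil>
	fmt.Println(q.push([]byte("c"), false, 11, 11)) // queue is full
	fmt.Println(string(q.items[0].data))            // a
}
```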
Queue processing:
The queue is processed in a loop by a number of workers running in parallel. The number of workers is determined by the number of `ProcessFunc` functions passed to `StartProcessLoop`. Each worker processes one item at a time, so to fully saturate a worker node that can process multiple items in parallel, start multiple workers for that node.
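As a rough illustration of starting multiple workers per node, the sketch below builds a worker slice in which each node's function appears once per concurrent slot. The types are local stand-ins for `ProcessFunc` and `QueueItemInfo`, and `workersForNode` is a hypothetical helper; the package's own `MultipleWorkers` serves a similar purpose and adds rate limiting.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins for the package's QueueItemInfo and ProcessFunc types.
type queueItemInfo struct{ Retries int }

type processFunc func(ctx context.Context, data []byte, info queueItemInfo) error

// workersForNode returns n copies of p, one per item the node can process
// concurrently. p must be safe for concurrent use.
func workersForNode(p processFunc, n int) []processFunc {
	workers := make([]processFunc, n)
	for i := range workers {
		workers[i] = p
	}
	return workers
}

func main() {
	// A placeholder process function that always succeeds.
	simulate := func(ctx context.Context, data []byte, info queueItemInfo) error {
		return nil
	}

	var workers []processFunc
	workers = append(workers, workersForNode(simulate, 4)...) // node A: 4 slots
	workers = append(workers, workersForNode(simulate, 2)...) // node B: 2 slots
	fmt.Println(len(workers))                                 // 6
}
```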
The queue is processed in the following way:
* The worker pops the next item. The order of items is determined by the following rules:
  * Items with a lower target block number are processed first. If the target block number has not been reached yet, the item is rescheduled.
  * If the target block number is the same, priority is determined lexicographically in the following order:
    + high priority
    + number of retries while processing this item
    + time of submission
    + max target block number
    + payload data itself
* The worker calls the `ProcessFunc` function with the payload data.
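The lexicographic ordering can be sketched as a plain comparison function. The field list follows the rules above, but the direction of each tie-breaker (fewer retries first, earlier submission first, lower max block first) is an assumption of this sketch, as are the names `queuedItem`, `less`, and `popOrder`.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// queuedItem is a hypothetical flat view of the fields that define order.
type queuedItem struct {
	minBlock     uint64
	highPriority bool
	retries      int
	submittedAt  int64 // unix nanoseconds
	maxBlock     uint64
	payload      []byte
}

// less reports whether a should be popped before b. Fields are compared one
// by one; a later field only matters when all earlier ones are equal.
// The direction of each tie-breaker is an assumption, not taken from the package.
func less(a, b queuedItem) bool {
	if a.minBlock != b.minBlock {
		return a.minBlock < b.minBlock
	}
	if a.highPriority != b.highPriority {
		return a.highPriority // high priority first
	}
	if a.retries != b.retries {
		return a.retries < b.retries
	}
	if a.submittedAt != b.submittedAt {
		return a.submittedAt < b.submittedAt
	}
	if a.maxBlock != b.maxBlock {
		return a.maxBlock < b.maxBlock
	}
	return bytes.Compare(a.payload, b.payload) < 0
}

// popOrder returns the payloads in the order a worker would receive them.
func popOrder(items []queuedItem) []string {
	sorted := append([]queuedItem(nil), items...)
	sort.Slice(sorted, func(i, j int) bool { return less(sorted[i], sorted[j]) })
	out := make([]string, len(sorted))
	for i, it := range sorted {
		out[i] = string(it.payload)
	}
	return out
}

func main() {
	items := []queuedItem{
		{minBlock: 11, highPriority: true, payload: []byte("later-block")},
		{minBlock: 10, submittedAt: 1, payload: []byte("low-prio")},
		{minBlock: 10, highPriority: true, retries: 1, payload: []byte("high-retried")},
		{minBlock: 10, highPriority: true, payload: []byte("high-fresh")},
	}
	fmt.Println(popOrder(items)) // [high-fresh high-retried low-prio later-block]
}
```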
The `ProcessFunc` function is responsible for processing the item.
* It should return `nil` if the item was processed successfully. If the item should be retried on the next block, it should return `ErrProcessScheduleNextBlock`. If the item should be retried on the same block (the worker is faulty), it should return `ErrProcessWorkerError`.
* If `ProcessFunc` returns `ErrProcessScheduleNextBlock` or `ErrProcessWorkerError`, the item is rescheduled, but at most `maxRetries` times. If it returns `ErrProcessUnrecoverable`, the item is not retried. Rescheduling after a worker error (for example, one of the nodes in the cluster is down) puts the item back in the queue so that it can be processed by (hopefully) another worker. The `maxRetries` limit prevents an infinite loop in case of a bug. Rescheduling for the next block is needed when a bundle fails on the current block but may still succeed on the next one.
* There is an exponential backoff between retries for a worker, so a worker that constantly fails to process items gets less and less work.
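The rescheduling rules and the backoff can be summarized in a short sketch. It is not the package's implementation: the sentinel errors are local stand-ins, `decide` and `backoff` are hypothetical helpers, and two behaviours are assumptions made here, namely that an item is dropped when the next block would exceed its max target block and that any unrecognized error is treated as not retryable. The backoff base and cap are illustrative values as well.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local stand-ins for the package's sentinel errors.
var (
	errProcessScheduleNextBlock = errors.New("try to schedule item for the next block")
	errProcessWorkerError       = errors.New("worker error, retry processing on another worker")
)

type action int

const (
	actionDone action = iota
	actionRetrySameBlock
	actionRetryNextBlock
	actionDrop
)

func (a action) String() string {
	return [...]string{"done", "retry-same-block", "retry-next-block", "drop"}[a]
}

// decide maps the result of a process function to the queue's next step.
func decide(err error, retries, maxRetries int, currentBlock, maxBlock uint64) action {
	switch {
	case err == nil:
		return actionDone
	case retries >= maxRetries:
		return actionDrop // guards against infinite retry loops
	case errors.Is(err, errProcessWorkerError):
		return actionRetrySameBlock
	case errors.Is(err, errProcessScheduleNextBlock):
		if currentBlock+1 > maxBlock {
			return actionDrop // assumption: no next block inside the target range
		}
		return actionRetryNextBlock
	default:
		return actionDrop // assumption: other errors are not retried
	}
}

// backoff returns how long a worker pauses after a number of consecutive
// failures: base doubled once per failure, capped at max.
func backoff(failures int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < failures && d < max; i++ {
		d *= 2
	}
	if d > max {
		d = max
	}
	return d
}

func main() {
	fmt.Println(decide(nil, 0, 30, 100, 102))                         // done
	fmt.Println(decide(errProcessWorkerError, 1, 30, 100, 102))       // retry-same-block
	fmt.Println(decide(errProcessScheduleNextBlock, 1, 30, 100, 102)) // retry-next-block
	fmt.Println(decide(errProcessScheduleNextBlock, 1, 30, 102, 102)) // drop
	fmt.Println(backoff(3, 10*time.Millisecond, time.Second))         // 80ms
}
```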
Queue shutdown:
1. Workers can be shut down by cancelling the context passed to `StartProcessLoop`.
2. The `sync.WaitGroup` returned from `StartProcessLoop` can be used to wait for all workers to finish processing.
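The shutdown sequence can be sketched without Redis. Here `startLoop` is a stand-in that only mirrors the shape of `StartProcessLoop` (one goroutine per worker, a `*sync.WaitGroup` as the result); `shutdown` and `runAndStop` are hypothetical helpers, and the timeout on the wait is an addition of this sketch rather than something the package provides.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// startLoop mirrors the shape of StartProcessLoop: it spawns one goroutine
// per worker and returns a WaitGroup that is done when all of them exit.
func startLoop(ctx context.Context, workers []func(context.Context)) *sync.WaitGroup {
	var wg sync.WaitGroup
	for _, w := range workers {
		wg.Add(1)
		go func(w func(context.Context)) {
			defer wg.Done()
			for ctx.Err() == nil {
				w(ctx) // the item in hand is finished before ctx is checked again
			}
		}(w)
	}
	return &wg
}

// shutdown cancels the workers and waits up to timeout for them to finish.
// It reports whether all workers stopped in time.
func shutdown(cancel context.CancelFunc, wg *sync.WaitGroup, timeout time.Duration) bool {
	cancel()
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

// runAndStop starts n fake workers, lets them run briefly, and shuts down.
func runAndStop(n int) (stopped bool, processed int64) {
	ctx, cancel := context.WithCancel(context.Background())
	var count int64
	work := func(context.Context) {
		time.Sleep(time.Millisecond) // pretend to process one item
		atomic.AddInt64(&count, 1)
	}
	workers := make([]func(context.Context), n)
	for i := range workers {
		workers[i] = work
	}
	wg := startLoop(ctx, workers)
	time.Sleep(20 * time.Millisecond)
	stopped = shutdown(cancel, wg, time.Second)
	return stopped, atomic.LoadInt64(&count)
}

func main() {
	stopped, processed := runAndStop(3)
	fmt.Println("stopped cleanly:", stopped, "items processed:", processed)
}
```

Waiting on the group before the process exits is what lets each worker finish the item it already claimed.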
Index ¶
- Variables
- type ProcessFunc
- type Queue
- type QueueItemInfo
- type RedisQueue
- func (s *RedisQueue) CleanQueues(ctx context.Context) error
- func (s *RedisQueue) Push(ctx context.Context, data []byte, highPriority bool, ...) error
- func (s *RedisQueue) StartProcessLoop(ctx context.Context, workers []ProcessFunc) *sync.WaitGroup
- func (s *RedisQueue) UpdateBlock(block uint64) error
- type RedisQueueConfig
Constants ¶
This section is empty.
Variables ¶
var (
	ErrBlockNumberIncorrect = errors.New("block number is invalid")
	ErrStaleItem            = errors.New("item is stale")
	ErrQueueFull            = errors.New("queue is full")
	ErrMaxRetriesReached    = errors.New("max retries reached")
	ErrNoNextBlock          = errors.New("failed to requeue item, no next block available")
	ErrRequeueFailed        = errors.New("item requeue failed")
)
var (
	// ErrProcessScheduleNextBlock is returned by ProcessFunc if item should be retried on the next block.
	ErrProcessScheduleNextBlock = errors.New("try to schedule item for the next block")
	// ErrProcessWorkerError is returned by ProcessFunc if item should be retried on the same block by a different worker.
	ErrProcessWorkerError = errors.New("worker error, retry processing on another worker")
	// ErrProcessUnrecoverable is returned by ProcessFunc if item should not be retried.
	// For example, if an item was canceled or error is unrecoverable.
	ErrProcessUnrecoverable = errors.New("unrecoverable error, item will not be retried")
)
Errors returned by ProcessFunc.
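From the worker's side, a process function mostly translates its own failures into one of these three errors. The sketch below shows one way to do that. The sentinel errors and `queueItemInfo` are local stand-ins for the package's declarations, and `errNodeUnreachable`, `errBundleReverted`, `classify`, and `newProcessFunc` are hypothetical names. The sentinels are returned unwrapped because the source does not say whether the queue matches wrapped errors.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins mirroring the package's declarations.
var (
	errProcessScheduleNextBlock = errors.New("try to schedule item for the next block")
	errProcessWorkerError       = errors.New("worker error, retry processing on another worker")
	errProcessUnrecoverable     = errors.New("unrecoverable error, item will not be retried")
)

type queueItemInfo struct{ Retries int }

// Hypothetical failures reported by a simulation backend.
var (
	errNodeUnreachable = errors.New("node unreachable")
	errBundleReverted  = errors.New("bundle reverted")
)

// classify translates a backend failure into the error the queue expects.
func classify(err error) error {
	switch {
	case err == nil:
		return nil
	case errors.Is(err, errNodeUnreachable):
		return errProcessWorkerError // this worker is faulty, let another one try
	case errors.Is(err, errBundleReverted):
		return errProcessScheduleNextBlock // may still succeed on the next block
	default:
		return errProcessUnrecoverable
	}
}

// newProcessFunc wraps a hypothetical simulate call in a function that has
// the same shape as ProcessFunc.
func newProcessFunc(
	simulate func(ctx context.Context, data []byte) error,
) func(context.Context, []byte, queueItemInfo) error {
	return func(ctx context.Context, data []byte, info queueItemInfo) error {
		return classify(simulate(ctx, data))
	}
}

func main() {
	process := newProcessFunc(func(ctx context.Context, data []byte) error {
		return fmt.Errorf("simulating %q: %w", data, errBundleReverted)
	})
	err := process(context.Background(), []byte("bundle"), queueItemInfo{Retries: 2})
	fmt.Println(err) // try to schedule item for the next block
}
```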
var DefaultQueueConfig = RedisQueueConfig{
	MaxRetries:                          30,
	MaxQueuedProcessableItemsLowPrio:    1024,
	MaxQueuedProcessableItemsHighPrio:   1024 * 2,
	MaxQueuedUnprocessableItemsLowPrio:  1024 * 3,
	MaxQueuedUnprocessableItemsHighPrio: 1024 * 4,
	WorkerTimeout:                       4 * time.Second,
}
Functions ¶
This section is empty.
Types ¶
type ProcessFunc ¶
type ProcessFunc func(ctx context.Context, data []byte, info QueueItemInfo) error
func MultipleWorkers ¶
func MultipleWorkers(processFunc ProcessFunc, n int, limit rate.Limit, burst int) []ProcessFunc
MultipleWorkers creates n workers that are rate limited by limit. The use case is running multiple workers per simulation node. The given ProcessFunc must be safe for concurrent use.
type QueueItemInfo ¶
type QueueItemInfo struct {
	// Number of times this item was retried before the success.
	Retries int
}
type RedisQueue ¶
type RedisQueue struct {
	Config RedisQueueConfig
	// contains filtered or unexported fields
}
func NewRedisQueue ¶
func NewRedisQueue(log *zap.Logger, red *redis.Client, queueName string) *RedisQueue
func (*RedisQueue) CleanQueues ¶
func (s *RedisQueue) CleanQueues(ctx context.Context) error
CleanQueues cleans all data in redis associated with the given queue. NOTE: this is a slow and dangerous operation that should only be used for testing.
func (*RedisQueue) StartProcessLoop ¶
func (s *RedisQueue) StartProcessLoop(ctx context.Context, workers []ProcessFunc) *sync.WaitGroup
StartProcessLoop starts a loop that processes items from the queue. It spawns a goroutine for each worker. ctx can be used to signal shutdown. The returned wait group allows for graceful shutdown.
func (*RedisQueue) UpdateBlock ¶
func (s *RedisQueue) UpdateBlock(block uint64) error
type RedisQueueConfig ¶
type RedisQueueConfig struct {
	// MaxRetries is the maximum number of simulations for a single item.
	MaxRetries uint16
	// MaxQueuedProcessableItemsLowPrio is the maximum number of items that can be queued for immediate processing (low prio).
	MaxQueuedProcessableItemsLowPrio uint64
	// MaxQueuedProcessableItemsHighPrio is the maximum number of items that can be queued for immediate processing (high prio).
	MaxQueuedProcessableItemsHighPrio uint64
	// MaxQueuedUnprocessableItemsLowPrio is the maximum number of items that can be queued for processing in the future (low prio).
	MaxQueuedUnprocessableItemsLowPrio uint64
	// MaxQueuedUnprocessableItemsHighPrio is the maximum number of items that can be queued for processing in the future (high prio).
	MaxQueuedUnprocessableItemsHighPrio uint64
	// WorkerTimeout is the maximum time a worker can process an item.
	WorkerTimeout time.Duration
}
RedisQueueConfig is the configuration for RedisQueue. See DefaultQueueConfig for the default values. Can be loaded from environment variables using ConfigFromEnv.
func ConfigFromEnv ¶
func ConfigFromEnv() (RedisQueueConfig, error)
ConfigFromEnv loads `simqueue` config from environment.
- `SIMQUEUE_MAX_RETRIES`
- `SIMQUEUE_MAX_QUEUED_PROCESSABLE_ITEMS_LOW_PRIO`
- `SIMQUEUE_MAX_QUEUED_PROCESSABLE_ITEMS_HIGH_PRIO`
- `SIMQUEUE_MAX_QUEUED_UNPROCESSABLE_ITEMS_LOW_PRIO`
- `SIMQUEUE_MAX_QUEUED_UNPROCESSABLE_ITEMS_HIGH_PRIO`
- `SIMQUEUE_WORKER_TIMEOUT_MS`
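The `_MS` suffix suggests that the worker timeout is given in milliseconds. As an illustration only, the sketch below shows how such a value could be turned into a `time.Duration`. `durationFromMS` is a hypothetical helper, not the package's code, and falling back to a default when the variable is unset is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// durationFromMS parses raw as a whole number of milliseconds. An empty
// string yields def; this fallback is an assumption, not documented behaviour.
func durationFromMS(raw string, def time.Duration) (time.Duration, error) {
	if raw == "" {
		return def, nil
	}
	// 32 bits keep the multiplication below well inside the int64 range.
	ms, err := strconv.ParseUint(raw, 10, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid millisecond value %q: %w", raw, err)
	}
	return time.Duration(ms) * time.Millisecond, nil
}

func main() {
	os.Setenv("SIMQUEUE_WORKER_TIMEOUT_MS", "2500")
	d, err := durationFromMS(os.Getenv("SIMQUEUE_WORKER_TIMEOUT_MS"), 4*time.Second)
	fmt.Println(d, err) // 2.5s <nil>
}
```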