workqueue

Introduction

WorkQueue is a high-performance, thread-safe, and memory-efficient Go library for managing work queues. It offers a variety of queue implementations such as Queue, DelayingQueue, PriorityQueue, and RateLimitingQueue, each tailored for specific use cases and performance needs. The library's design is simple, user-friendly, and platform-independent, making it suitable for a broad spectrum of applications and environments.

After several iterations and real-world usage, we've gathered valuable user feedback and insights. This led to a complete redesign and optimization of WorkQueue's architecture and underlying code in the new version (v2), significantly enhancing its robustness, reliability, and security.

Why Use WorkQueue(v2)

The WorkQueue(v2) has undergone a comprehensive architectural revamp, greatly improving its robustness and reliability. This redesign enables the library to manage demanding workloads with increased stability, making it ideal for both simple task queues and complex workflows. By utilizing advanced algorithms and optimized data structures, WorkQueue(v2) provides superior performance, efficiently managing larger task volumes, reducing latency, and increasing throughput.

WorkQueue(v2) offers a diverse set of queue implementations to cater to different needs, including standard task management, delayed execution, task prioritization, and rate-limited processing. This flexibility allows you to select the most suitable tool for your specific use case, ensuring optimal performance and functionality. With its cross-platform design, WorkQueue(v2) guarantees consistent behavior and performance across various operating systems, making it a versatile solution for different environments.

The development of WorkQueue(v2) has been heavily influenced by user feedback and real-world usage, resulting in a library that better meets the needs of its users. By addressing user-reported issues and incorporating feature requests, WorkQueue(v2) offers a more refined and user-centric experience.

Choosing WorkQueue(v2) for your application or project could be a great decision. :)

Advantages

  • User-Friendly: The intuitive design ensures easy usage, allowing users of varying skill levels to quickly become proficient.

  • No External Dependencies: The system operates independently, without the need for additional software or libraries, reducing compatibility issues and simplifying deployment.

  • High Performance: The system is optimized for speed and efficiency, swiftly handling tasks to enhance productivity and scalability.

  • Minimal Memory Usage: The design utilizes minimal system resources, ensuring smooth operation even on devices with limited hardware capabilities, and freeing up memory for other applications.

  • Thread-Safe: The system supports multi-threading, allowing for concurrent operations without the risk of data corruption or interference, providing a stable environment for multiple users or processes.

  • Supports Action Callback Functions: The system can execute predefined functions in response to specific events, enhancing interactivity, customization, and responsiveness.

  • Cross-Platform Compatibility: The system operates seamlessly across different operating systems and devices, providing flexibility for diverse user environments.

Installation

go get github.com/shengyanli1982/workqueue/v2

Benchmark

The following benchmark results demonstrate the performance of the WorkQueue library.

1. STL

1.1. List

When a linked list is modified, only the pointers of the affected nodes change; unlike a dynamic array, elements do not need to be copied or reallocated. Over extended periods of use, linked lists can therefore be more memory-efficient than dynamic arrays.

Direct performance

$ go test -benchmem -run=^$ -bench ^BenchmarkList* .
goos: darwin
goarch: amd64
pkg: github.com/shengyanli1982/workqueue/v2/internal/container/list
cpu: Intel(R) Xeon(R) CPU E5-2643 v2 @ 3.50GHz
BenchmarkList_PushBack-12        	186905107	         6.447 ns/op	       0 B/op	       0 allocs/op
BenchmarkList_PushFront-12       	157372052	         7.701 ns/op	       0 B/op	       0 allocs/op
BenchmarkList_PopBack-12         	179555846	         6.645 ns/op	       0 B/op	       0 allocs/op
BenchmarkList_PopFront-12        	180030582	         6.989 ns/op	       0 B/op	       0 allocs/op
BenchmarkList_InsertBefore-12    	189274771	         6.406 ns/op	       0 B/op	       0 allocs/op
BenchmarkList_InsertAfter-12     	160078981	         6.490 ns/op	       0 B/op	       0 allocs/op
BenchmarkList_Remove-12          	183250782	         6.440 ns/op	       0 B/op	       0 allocs/op
BenchmarkList_MoveToFront-12     	146021263	         7.837 ns/op	       0 B/op	       0 allocs/op
BenchmarkList_MoveToBack-12      	141336429	         8.589 ns/op	       0 B/op	       0 allocs/op
BenchmarkList_Swap-12            	100000000	         10.47 ns/op	       0 B/op	       0 allocs/op

Compare with the standard library

Both the standard library and this project employ the same algorithm, leading to comparable performance. However, the list in this project provides additional features compared to the standard library. Furthermore, the list node uses sync.Pool to minimize memory allocation. Therefore, under high concurrency, the performance of the project's list may surpass that of the standard library.
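
The snippet below illustrates the general pooling idea with sync.Pool. It is a simplified sketch, not the library's actual internal node type, and only demonstrates how reusing nodes avoids repeated allocations.

package main

import (
	"fmt"
	"sync"
)

// node stands in for a doubly linked list node; the real internal type differs.
type node struct {
	prev, next *node
	value      interface{}
}

// nodePool recycles node objects so repeated push/pop cycles avoid new allocations.
var nodePool = sync.Pool{
	New: func() interface{} { return &node{} },
}

func main() {
	// Acquire a node from the pool instead of allocating a fresh one.
	n := nodePool.Get().(*node)
	n.value = "task"
	fmt.Println(n.value)

	// Reset the node and return it to the pool for later reuse.
	*n = node{}
	nodePool.Put(n)
}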

$ go test -benchmem -run=^$ -bench ^BenchmarkCompare* .
goos: darwin
goarch: amd64
pkg: github.com/shengyanli1982/workqueue/v2/internal/container/list
cpu: Intel(R) Xeon(R) CPU E5-2643 v2 @ 3.50GHz
BenchmarkCompareGoStdList_PushBack-12        	 8256513	       129.4 ns/op	      56 B/op	       1 allocs/op
BenchmarkCompareGoStdList_PushFront-12       	 9448060	       115.5 ns/op	      55 B/op	       1 allocs/op
BenchmarkCompareGoStdList_PopBack-12         	178923963	        23.60 ns/op	       0 B/op	       0 allocs/op
BenchmarkCompareGoStdList_PopFront-12        	33846044	        46.40 ns/op	       0 B/op	       0 allocs/op
BenchmarkCompareGoStdList_InsertBefore-12    	12046944	        93.53 ns/op	      55 B/op	       1 allocs/op
BenchmarkCompareGoStdList_InsertAfter-12     	11364718	        94.52 ns/op	      55 B/op	       1 allocs/op
BenchmarkCompareWQList_PushBack-12           	11582172	       109.7 ns/op	      55 B/op	       1 allocs/op
BenchmarkCompareWQList_PushFront-12          	10893723	        92.67 ns/op	      55 B/op	       1 allocs/op
BenchmarkCompareWQList_PopBack-12            	181593789	         6.841 ns/op	       0 B/op	       0 allocs/op
BenchmarkCompareWQList_PopFront-12           	179179370	         7.057 ns/op	       0 B/op	       0 allocs/op
BenchmarkCompareWQList_InsertBefore-12       	 9302694	       116.5 ns/op	      55 B/op	       1 allocs/op
BenchmarkCompareWQList_InsertAfter-12        	10237197	       117.7 ns/op	      55 B/op	       1 allocs/op

1.2. Heap

Prior to version v2.1.3, this project used the Insertion Sort algorithm to order elements within the heap. Starting from version v2.1.3, it uses a Red-Black Tree instead. With its O(log n) time complexity, the Red-Black Tree outperforms Insertion Sort in most scenarios.

1.2.1. Insertion Sort algorithm

Direct performance

The project uses the Insertion Sort algorithm to order elements in the heap. Inserting into an already sorted sequence costs O(n) per element. In this project, a list stores the heap's elements: each element is appended to the end of the list and then moved into its sorted position, as sketched below.
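
The following sketch shows that insertion step on a plain slice of priorities; the library itself works on its internal linked list, so this is only an illustration of the technique.

package main

import "fmt"

// insertSorted appends a value and shifts it left until the slice is sorted
// again, which is the insertion-sort step described above.
func insertSorted(s []int64, v int64) []int64 {
	s = append(s, v)
	i := len(s) - 1
	for i > 0 && s[i-1] > v {
		s[i] = s[i-1] // shift larger elements one position to the right
		i--
	}
	s[i] = v
	return s
}

func main() {
	priorities := []int64{10, 20, 30}
	priorities = insertSorted(priorities, 25)
	fmt.Println(priorities) // [10 20 25 30]
}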

$ go test -benchmem -run=^$ -bench ^BenchmarkHeap* .
goos: darwin
goarch: amd64
pkg: github.com/shengyanli1982/workqueue/v2/internal/container/heap
cpu: Intel(R) Xeon(R) CPU E5-2643 v2 @ 3.50GHz
BenchmarkHeap_Push-12      	  115560	    123634 ns/op	       0 B/op	       0 allocs/op
BenchmarkHeap_Pop-12       	176871700	     10.66 ns/op	       0 B/op	       0 allocs/op
BenchmarkHeap_Remove-12    	1000000000	     1.217 ns/op	       0 B/op	       0 allocs/op

Compare with the standard library

The heap in this project uses the Insertion Sort algorithm to order elements, while the standard library implements its heap with the container/heap package. The standard library's operations run in O(log n), whereas this project's insertion runs in O(n), so the project's heap is slower here. However, this is due to the difference in algorithms, so a direct comparison may not be entirely fair.

[!TIP]

The Insertion Sort algorithm can provide a stable and consistent sorting, unlike the binary heap. If you have any better suggestions, please feel free to share.

$ go test -benchmem -run=^$ -bench ^BenchmarkCompare* .
goos: darwin
goarch: amd64
pkg: github.com/shengyanli1982/workqueue/v2/internal/container/heap
cpu: Intel(R) Xeon(R) CPU E5-2643 v2 @ 3.50GHz
BenchmarkCompareGoStdHeap_Push-12    	 4552110	       278.9 ns/op	      92 B/op	       1 allocs/op
BenchmarkCompareGoStdHeap_Pop-12     	 3726718	       362.9 ns/op	       0 B/op	       0 allocs/op
BenchmarkCompareWQHeap_Push-12       	  109158	    121247 ns/op	      48 B/op	       1 allocs/op
BenchmarkCompareWQHeap_Pop-12        	174782917	        15.10 ns/op	       0 B/op	       0 allocs/op

1.2.2. Red-Black Tree algorithm

Direct performance

The project uses a Red-Black Tree to order elements in the heap, with a time complexity of O(log n). Elements are stored directly in the tree, which keeps them ordered as they are inserted.

$ go test -benchmem -run=^$ -bench ^BenchmarkHeap* .
goos: darwin
goarch: amd64
pkg: github.com/shengyanli1982/workqueue/v2/internal/container/heap
cpu: Intel(R) Xeon(R) CPU E5-2643 v2 @ 3.50GHz
BenchmarkHeap_Push-12      	 5630415	       257.5 ns/op	       0 B/op	       0 allocs/op
BenchmarkHeap_Pop-12       	16859534	       117.4 ns/op	       0 B/op	       0 allocs/op
BenchmarkHeap_Remove-12    	148432172	         8.197 ns/op	       0 B/op	       0 allocs/op

Compare with the standard library

The heap in this project uses a Red-Black Tree to order elements, while the standard library implements its heap with the container/heap package. Both have O(log n) time complexity, so their asymptotic performance is the same, but the project's ordering is more stable and consistent than the standard library's.

$ go test -benchmem -run=^$ -bench ^BenchmarkCompare* .
goos: darwin
goarch: amd64
pkg: github.com/shengyanli1982/workqueue/v2/internal/container/heap
cpu: Intel(R) Xeon(R) CPU E5-2643 v2 @ 3.50GHz
BenchmarkCompareGoStdHeap_Push-12    	 4368770	       283.3 ns/op	     110 B/op	       1 allocs/op
BenchmarkCompareGoStdHeap_Pop-12     	 3745934	       357.6 ns/op	       0 B/op	       0 allocs/op
BenchmarkCompareWQHeap_Push-12       	 4252489	       350.2 ns/op	      64 B/op	       1 allocs/op
BenchmarkCompareWQHeap_Pop-12        	15759519	       116.7 ns/op	       0 B/op	       0 allocs/op

Struct Memory Alignment

In essence, memory alignment enhances performance, minimizes CPU cycles, reduces power usage, boosts stability, and ensures predictable behavior. This is why it's considered a best practice to align data in memory, especially on contemporary 64-bit CPUs.

Node struct alignment:

---- Fields in struct ----
+----+----------------+-----------+-----------+
| ID |   FIELDTYPE    | FIELDNAME | FIELDSIZE |
+----+----------------+-----------+-----------+
| A  | unsafe.Pointer | parentRef | 8         |
| B  | int64          | Priority  | 8         |
| C  | int64          | Color     | 8         |
| D  | *list.Node     | Left      | 8         |
| E  | *list.Node     | Right     | 8         |
| F  | *list.Node     | Parent    | 8         |
| G  | interface {}   | Value     | 16        |
+----+----------------+-----------+-----------+
---- Memory layout ----
|A|A|A|A|A|A|A|A|
|B|B|B|B|B|B|B|B|
|C|C|C|C|C|C|C|C|
|D|D|D|D|D|D|D|D|
|E|E|E|E|E|E|E|E|
|F|F|F|F|F|F|F|F|
|G|G|G|G|G|G|G|G|
|G|G|G|G|G|G|G|G|

total cost: 64 Bytes.
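
The effect of field ordering can be checked with unsafe.Sizeof. The structs below are hypothetical and unrelated to the library; they only demonstrate why grouping same-sized fields reduces padding.

package main

import (
	"fmt"
	"unsafe"
)

// padded interleaves 1-byte and 8-byte fields, forcing the compiler to insert padding.
type padded struct {
	a bool
	b int64
	c bool
}

// packed places the 8-byte field first, so only trailing padding remains.
type packed struct {
	b int64
	a bool
	c bool
}

func main() {
	fmt.Println(unsafe.Sizeof(padded{})) // 24 bytes on 64-bit platforms
	fmt.Println(unsafe.Sizeof(packed{})) // 16 bytes on 64-bit platforms
}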

2. Queues

Here are the benchmark results for all queues in the WorkQueue library.

[!NOTE]

The RateLimitingQueue is quite slow due to its use of bucket-based rate limiting. It's not recommended for high-performance scenarios.

$ go test -benchmem -run=^$ -bench ^Benchmark* .
goos: darwin
goarch: amd64
pkg: github.com/shengyanli1982/workqueue/v2
cpu: Intel(R) Xeon(R) CPU E5-2643 v2 @ 3.50GHz
BenchmarkDelayingQueue_Put-12                             4635976           255.6 ns/op        72 B/op          1 allocs/op
BenchmarkDelayingQueue_PutWithDelay-12                    1635588           784.7 ns/op        71 B/op          1 allocs/op
BenchmarkDelayingQueue_Get-12                            24795136            47.53 ns/op       21 B/op          0 allocs/op
BenchmarkDelayingQueue_PutAndGet-12                      15995890            75.25 ns/op        7 B/op          0 allocs/op
BenchmarkDelayingQueue_PutWithDelayAndGet-12              1731825           664.3 ns/op        29 B/op          1 allocs/op
BenchmarkPriorityQueue_Put-12                             3030818           433.5 ns/op        71 B/op          1 allocs/op
BenchmarkPriorityQueue_PutWithPriority-12                 2937105           452.0 ns/op        71 B/op          1 allocs/op
BenchmarkPriorityQueue_Get-12                            11245106           134.3 ns/op        23 B/op          0 allocs/op
BenchmarkPriorityQueue_PutAndGet-12                      12962031            92.24 ns/op        7 B/op          0 allocs/op
BenchmarkPriorityQueue_PutWithPriorityAndGet-12          14543769            83.70 ns/op        7 B/op          0 allocs/op
BenchmarkQueue_Put-12                                     6102608           206.1 ns/op        71 B/op          1 allocs/op
BenchmarkQueue_Get-12                                    30304675            45.30 ns/op       17 B/op          0 allocs/op
BenchmarkQueue_PutAndGet-12                              17171174            71.83 ns/op        7 B/op          0 allocs/op
BenchmarkQueue_Idempotent_Put-12                          1573570           706.9 ns/op       136 B/op          3 allocs/op
BenchmarkQueue_Idempotent_Get-12                          2275533           534.4 ns/op       105 B/op          0 allocs/op
BenchmarkQueue_Idempotent_PutAndGet-12                    2551188           494.5 ns/op        75 B/op          1 allocs/op
BenchmarkRateLimitingQueue_Put-12                         5852602           214.0 ns/op        71 B/op          1 allocs/op
BenchmarkRateLimitingQueue_PutWithLimited-12              1412991           852.6 ns/op       135 B/op          2 allocs/op
BenchmarkRateLimitingQueue_Get-12                        28186063            49.60 ns/op       19 B/op          0 allocs/op
BenchmarkRateLimitingQueue_PutAndGet-12                  15600679            75.69 ns/op        7 B/op          0 allocs/op
BenchmarkRateLimitingQueue_PutWithLimitedAndGet-12        1395084           855.5 ns/op       135 B/op          2 allocs/op

Quick Start

For more examples on how to use WorkQueue, please refer to the examples directory.

1. Queue

The Queue is a simple FIFO (First In, First Out) queue that serves as the base for all other queues in this project. It maintains a dirty set and a processing set to keep track of the queue's state.

The dirty set contains items that have been added to the queue but have not yet been processed. The processing set contains items that are currently being processed.

[!IMPORTANT]

If you create a new queue with the WithValueIdempotent configuration, the queue automatically removes duplicate items: if you put the same item into the queue multiple times, only one instance of that item is kept.

However, when idempotency is enabled, the values passed to the PutXXX functions must be usable as keys of a Go map. If a value is not comparable, such as a slice, map, or function, the program may panic. To solve this problem, you can use WithSetCreator to supply a custom set that can handle such values, as sketched below.
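
As a rough illustration, the sketch below registers a custom set that keys every item by its fmt.Sprint representation, so non-comparable values such as slices can still be tracked. It is only a sketch of the idea, not the library's built-in set, and the key scheme is an assumption you would adapt to your own data.

package main

import (
	"fmt"

	wkq "github.com/shengyanli1982/workqueue/v2"
)

// stringKeySet is a minimal Set implementation that keys every item by its
// fmt.Sprint representation (illustrative only).
type stringKeySet struct {
	items map[string]interface{}
}

func newStringKeySet() wkq.Set {
	return &stringKeySet{items: make(map[string]interface{})}
}

func (s *stringKeySet) key(item interface{}) string { return fmt.Sprint(item) }

func (s *stringKeySet) Add(item interface{})    { s.items[s.key(item)] = item }
func (s *stringKeySet) Remove(item interface{}) { delete(s.items, s.key(item)) }
func (s *stringKeySet) Contains(item interface{}) bool {
	_, ok := s.items[s.key(item)]
	return ok
}
func (s *stringKeySet) List() []interface{} {
	values := make([]interface{}, 0, len(s.items))
	for _, v := range s.items {
		values = append(values, v)
	}
	return values
}
func (s *stringKeySet) Len() int { return len(s.items) }
func (s *stringKeySet) Cleanup() { s.items = make(map[string]interface{}) }

func main() {
	// Plug the custom set into an idempotent queue.
	config := wkq.NewQueueConfig().WithValueIdempotent().WithSetCreator(newStringKeySet)
	queue := wkq.NewQueue(config)

	// A slice is not comparable, so the default map-based set may panic on it;
	// the custom set stores it under its string key instead.
	_ = queue.Put([]int{1, 2, 3})
	fmt.Println(queue.Len())

	queue.Shutdown()
}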

Config

The Queue has several configuration options that can be set when creating a queue.

  • WithCallback: Sets callback functions.
  • WithValueIdempotent: Enables item idempotency for the queue.
  • WithSetCreator: Sets the creator function for the queue's internal set.
Methods
  • Shutdown: Terminates the queue, preventing it from accepting new tasks.
  • IsClosed: Checks if the queue is closed, returns a boolean.
  • Len: Returns the number of elements in the queue.
  • Values: Returns all elements in the queue as a slice.
  • Range: Iterates over all elements in the queue.
  • Put: Adds an element to the queue.
  • Get: Retrieves an element from the queue.
  • Done: Notifies the queue that an element has been processed.

[!NOTE]

The Done function is only used when the queue is created with the WithValueIdempotent option. If you don't use this option, you don't need to call this function.

Callbacks
  • OnPut: Invoked when an item is added to the queue.
  • OnGet: Invoked when an item is retrieved from the queue.
  • OnDone: Invoked when an item has been processed.
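
For example, a minimal callback that just logs each event might look like the following sketch and be registered through WithCallback; the loggingCallback type is illustrative, not part of the library.

package main

import (
	"fmt"

	wkq "github.com/shengyanli1982/workqueue/v2"
)

// loggingCallback logs each lifecycle event of the queue.
type loggingCallback struct{}

func (loggingCallback) OnPut(value interface{})  { fmt.Println("put:", value) }
func (loggingCallback) OnGet(value interface{})  { fmt.Println("get:", value) }
func (loggingCallback) OnDone(value interface{}) { fmt.Println("done:", value) }

func main() {
	// Register the callback when building the queue configuration.
	config := wkq.NewQueueConfig().WithCallback(loggingCallback{})
	queue := wkq.NewQueue(config)

	_ = queue.Put("hello")
	element, _ := queue.Get()
	queue.Done(element)

	queue.Shutdown()
}
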
Example
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"

	wkq "github.com/shengyanli1982/workqueue/v2"
)

// The consumer function gets elements from the queue and processes them
func consumer(queue wkq.Queue, wg *sync.WaitGroup) {
	// When the function returns, call wg.Done() to notify the WaitGroup that a task has been completed
	defer wg.Done()

	// Infinite loop until the function returns
	for {
		// Get an element from the queue
		element, err := queue.Get()

		// If an error occurs when getting the element, handle the error
		if err != nil {
			// If the error is not because the queue is empty, print the error and return
			if !errors.Is(err, wkq.ErrQueueIsEmpty) {
				fmt.Println(err)
				return
			} else {
				// If the error is because the queue is empty, continue the loop
				continue
			}
		}

		// Print the obtained element
		fmt.Println("> get element:", element)

		// Mark the element as done, 'Done' is required after 'Get'
		queue.Done(element)
	}
}

func main() {
	// Create a WaitGroup to wait for all goroutines to complete
	wg := sync.WaitGroup{}

	// Create a new queue
	queue := wkq.NewQueue(nil)

	// Increase the counter of the WaitGroup
	wg.Add(1)

	// Start a new goroutine to run the consumer function
	go consumer(queue, &wg)

	// Put "hello" into the queue
	_ = queue.Put("hello")

	// Put "world" into the queue
	_ = queue.Put("world")

	// Wait for a second to give the consumer a chance to process the elements in the queue
	time.Sleep(time.Second)

	// Shut down the queue
	queue.Shutdown()

	// Wait for all goroutines to complete
	wg.Wait()
}

Result

$ go run demo.go
> get element: hello
> get element: world
queue is shutting down

2. Delaying Queue

The Delaying Queue is a queue that supports delayed execution. It builds upon the Queue and uses a Heap to manage the expiration times of the elements. When you add an element to the queue, you can specify a delay time. The elements are then sorted by this delay time and executed after the specified delay has passed.

[!TIP]

When the Heap is empty or its first element is not yet due, the Delaying Queue waits one heartbeat interval before checking the Heap again for an element that can be processed. This means there may be a slight deviation in the actual delay of an element: the actual minimum delay time is the element's delay time + 300 ms.

If precise timing is important for your project, you may consider using the kairos project I wrote.

Configuration

The Delaying Queue inherits the configuration of the Queue.

  • WithCallback: Sets callback functions.
Methods

The Delaying Queue inherits the methods of the Queue. Additionally, it introduces the following methods:

  • PutWithDelay: Adds an element to the queue with a specified delay.
  • HeapRange: Iterates over all elements in the heap.
Callbacks

The Delaying Queue inherits the callbacks of the Queue. Additionally, it introduces the following callbacks:

  • OnDelay: Invoked when an element is added to the queue with a specified delay.
  • OnPullError: Invoked when an error occurs while pulling an element from the heap to the queue.
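
A minimal sketch of a delaying-queue callback is shown below. It embeds the library's no-op callback implementation so only the delay-related hooks need to be overridden; the exact parameter type accepted by WithCallback on the delaying queue configuration is assumed here to be DelayingQueueCallback.

package main

import (
	"fmt"

	wkq "github.com/shengyanli1982/workqueue/v2"
)

// delayCallback embeds the no-op callback and overrides only the delay hooks.
type delayCallback struct {
	wkq.DelayingQueueCallback
}

func (delayCallback) OnDelay(value interface{}, delay int64) {
	fmt.Printf("delayed %v by %d ms\n", value, delay)
}

func (delayCallback) OnPullError(value interface{}, reason error) {
	fmt.Printf("failed to move %v from the heap: %v\n", value, reason)
}

func main() {
	cb := delayCallback{DelayingQueueCallback: wkq.NewNopDelayingQueueCallbackImpl()}
	config := wkq.NewDelayingQueueConfig().WithCallback(cb)
	queue := wkq.NewDelayingQueue(config)
	defer queue.Shutdown()

	_ = queue.PutWithDelay("task", 100)
}
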
Example
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"

	wkq "github.com/shengyanli1982/workqueue/v2"
)

// The consumer function gets elements from the queue and processes them
func consumer(queue wkq.Queue, wg *sync.WaitGroup) {
	// When the function returns, call wg.Done() to notify the WaitGroup that a task has been completed
	defer wg.Done()

	// Infinite loop until the function returns
	for {
		// Get an element from the queue
		element, err := queue.Get()

		// If an error occurs when getting the element, handle the error
		if err != nil {
			// If the error is not because the queue is empty, print the error and return
			if !errors.Is(err, wkq.ErrQueueIsEmpty) {
				fmt.Println(err)
				return
			} else {
				// If the error is because the queue is empty, continue the loop
				continue
			}
		}

		// Print the obtained element
		fmt.Println("> get element:", element)

		// Mark the element as done, 'Done' is required after 'Get'
		queue.Done(element)
	}
}

func main() {
	// Create a WaitGroup to wait for all goroutines to complete
	wg := sync.WaitGroup{}

	// Create a new delaying queue
	queue := wkq.NewDelayingQueue(nil)

	// Increase the counter of the WaitGroup
	wg.Add(1)

	// Start a new goroutine to run the consumer function
	go consumer(queue, &wg)

	// Put "delay 1" into the queue and set its delay time to 200 milliseconds
	_ = queue.PutWithDelay("delay 1", 200)

	// Put "delay 2" into the queue and set its delay time to 100 milliseconds
	_ = queue.PutWithDelay("delay 2", 100)

	// Put "hello" into the queue
	_ = queue.Put("hello")

	// Put "world" into the queue
	_ = queue.Put("world")

	// Wait for a second to give the consumer a chance to process the elements in the queue
	time.Sleep(time.Second)

	// Shut down the queue
	queue.Shutdown()

	// Wait for all goroutines to complete
	wg.Wait()
}

Result

$ go run demo.go
> get element: hello
> get element: world
> get element: delay 2
> get element: delay 1
queue is shutting down

3. Priority Queue

The Priority Queue is a queue that facilitates prioritized execution. It is constructed on the foundation of the Queue and employs a Heap to manage the priorities of the elements. In the Priority Queue, elements are sorted according to their priorities. Both Queue and Heap utilize the same element structure and storage.

When you add an element to the queue, you can designate its priority. The elements are subsequently sorted and executed based on these priorities. However, if one element has a very low priority while others keep arriving with much higher priorities, the low-priority element may never be executed. Exercise caution!

Configuration

The Priority Queue inherits the configuration of the Queue.

  • WithCallback: Sets callback functions.
Methods

The Priority Queue inherits the methods of the Queue. Additionally, it provides the following methods:

  • PutWithPriority: Adds an element to the queue with a specified priority.
  • Put: Adds an element to the queue with a default priority (value is 0).
Callbacks

The Priority Queue inherits the callbacks of the Queue. Additionally, it provides the following callback:

  • OnPriority: Invoked when an element is added to the queue with a specified priority.

[!TIP]

Note that in the Priority Queue, when an element is added, the OnPut callback is not triggered. Instead, the OnPriority callback is exclusively invoked.

Example
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"

	wkq "github.com/shengyanli1982/workqueue/v2"
)

// The consumer function gets elements from the queue and processes them
func consumer(queue wkq.Queue, wg *sync.WaitGroup) {
	// When the function returns, call wg.Done() to notify the WaitGroup that a task has been completed
	defer wg.Done()

	// Infinite loop until the function returns
	for {
		// Get an element from the queue
		element, err := queue.Get()

		// If an error occurs when getting the element, handle the error
		if err != nil {
			// If the error is not because the queue is empty, print the error and return
			if !errors.Is(err, wkq.ErrQueueIsEmpty) {
				fmt.Println(err)
				return
			} else {
				// If the error is because the queue is empty, continue the loop
				continue
			}
		}

		// Print the obtained element
		fmt.Println("> get element:", element)

		// Mark the element as done, 'Done' is required after 'Get'
		queue.Done(element)
	}
}

func main() {
	// Create a WaitGroup to wait for all goroutines to complete
	wg := sync.WaitGroup{}

	// Create a new priority queue
	queue := wkq.NewPriorityQueue(nil)

	// Increase the counter of the WaitGroup
	wg.Add(1)

	// Start a new goroutine to run the consumer function
	go consumer(queue, &wg)

	// Put "priority 1" into the queue and set its priority to 200
	_ = queue.PutWithPriority("priority 1", 200)

	// Put "priority 2" into the queue and set its priority to 100
	_ = queue.PutWithPriority("priority 2", 100)

	// Put "hello" into the queue
	_ = queue.Put("hello")

	// Put "world" into the queue
	_ = queue.Put("world")

	// Wait for a second to give the consumer a chance to process the elements in the queue
	time.Sleep(time.Second)

	// Shut down the queue
	queue.Shutdown()

	// Wait for all goroutines to complete
	wg.Wait()
}

Result

$ go run demo.go
> get element: hello
> get element: world
> get element: priority 2
> get element: priority 1
queue is shutting down

4. RateLimiting Queue

The RateLimiting Queue is a queue that supports rate-limited execution. It is built on top of the Delaying Queue. When adding an element to the queue, you can specify the rate limit, and the element will be processed according to this rate limit.

[!TIP]

The default rate limiter is the Nop strategy, which applies no limiting. You can define your own rate-limiting algorithm by implementing the Limiter interface; the project also provides a token bucket algorithm as a Limiter implementation. A custom limiter is sketched below.
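
For illustration, the sketch below implements a fixed-interval Limiter and plugs it in through WithLimiter. It is a simplified stand-in for a real rate limiter, not part of the library.

package main

import (
	"sync"
	"time"

	wkq "github.com/shengyanli1982/workqueue/v2"
)

// intervalLimiter spaces elements a fixed interval apart.
type intervalLimiter struct {
	mu       sync.Mutex
	interval time.Duration
	last     time.Time
}

func (l *intervalLimiter) When(value interface{}) time.Duration {
	l.mu.Lock()
	defer l.mu.Unlock()

	now := time.Now()
	// Schedule this element one interval after the previously scheduled one.
	next := l.last.Add(l.interval)
	if next.Before(now) {
		next = now
	}
	l.last = next
	return next.Sub(now)
}

func main() {
	// Use the custom limiter instead of the default Nop strategy.
	config := wkq.NewRateLimitingQueueConfig().WithLimiter(&intervalLimiter{interval: 100 * time.Millisecond})
	queue := wkq.NewRateLimitingQueue(config)
	defer queue.Shutdown()

	// A consumer goroutine (as in the example below) would drain the queue.
	for i := 0; i < 5; i++ {
		_ = queue.PutWithLimited(i)
	}
}
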

Config

The RateLimiting Queue inherits the configuration of the Delaying Queue.

  • WithCallback: Sets callback functions.
  • WithLimiter: Sets the rate limiter for the queue.
Methods

The RateLimiting Queue inherits the methods of the Delaying Queue. Additionally, it has the following method:

  • PutWithLimited: Adds an element to the queue. The delay time of the element is determined by the limiter.
Callbacks

The RateLimiting Queue inherits the callbacks of the Delaying Queue. Additionally, it has the following callback:

  • OnLimited: Invoked when an element is added to the queue by PutWithLimited.
Example
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"

	wkq "github.com/shengyanli1982/workqueue/v2"
)

// The consumer function gets elements from the queue and processes them
func consumer(queue wkq.Queue, wg *sync.WaitGroup) {
	// When the function returns, call wg.Done() to notify the WaitGroup that a task has been completed
	defer wg.Done()

	// Infinite loop until the function returns
	for {
		// Get an element from the queue
		element, err := queue.Get()

		// If an error occurs when getting the element, handle the error
		if err != nil {
			// If the error is not because the queue is empty, print the error and return
			if !errors.Is(err, wkq.ErrQueueIsEmpty) {
				fmt.Println(err)
				return
			} else {
				// If the error is because the queue is empty, continue the loop
				continue
			}
		}

		// Print the obtained element
		fmt.Println("> get element:", element)

		// Mark the element as done, 'Done' is required after 'Get'
		queue.Done(element)
	}
}

func main() {
	// Create a WaitGroup to wait for all goroutines to complete
	wg := sync.WaitGroup{}

	// Create a new bucket rate limiter; the parameters are the fill rate and the burst size
	limiter := wkq.NewBucketRateLimiterImpl(5, 1)

	// Create a new rate limiting queue configuration and set its limiter
	config := wkq.NewRateLimitingQueueConfig().WithLimiter(limiter)

	// Create a new rate limiting queue with the configuration
	queue := wkq.NewRateLimitingQueue(config)

	// Increase the counter of the WaitGroup
	wg.Add(1)

	// Start a new goroutine to run the consumer function
	go consumer(queue, &wg)

	// Put "delay 1" into the queue and set its delay time to 200 milliseconds
	_ = queue.PutWithDelay("delay 1", 200)

	// Put "delay 2" into the queue and set its delay time to 100 milliseconds
	_ = queue.PutWithDelay("delay 2", 100)

	// Put "hello" into the queue
	_ = queue.Put("hello")

	// Put "world" into the queue
	_ = queue.Put("world")

	// Put "limited" elements into the queue to trigger rate limiting
	for i := 0; i < 10; i++ {
		go func(i int) {
			_ = queue.PutWithLimited(fmt.Sprintf("limited %d", i))
		}(i)
	}

	// Wait for a second to give the consumer a chance to process the elements in the queue
	time.Sleep(time.Second)

	// Shut down the queue
	queue.Shutdown()

	// Wait for all goroutines to complete
	wg.Wait()
}

Result

$ go run demo.go
> get element: hello
> get element: world
> get element: delay 2
> get element: delay 1
> get element: limited 9
> get element: limited 6
> get element: limited 7
> get element: limited 8
> get element: limited 3
> get element: limited 2
> get element: limited 0
> get element: limited 5
> get element: limited 1
> get element: limited 4
queue is shutting down

Documentation

Constants

const (
	// PRIORITY_SLOWEST defines the slowest priority, which is the maximum int64
	PRIORITY_SLOWEST = math.MaxInt64

	// PRIORITY_LOW defines low priority, which is the maximum int32
	PRIORITY_LOW = math.MaxInt32

	// PRIORITY_NORMAL defines normal priority, which is 0
	PRIORITY_NORMAL = 0

	// PRIORITY_HIGH defines high priority, which is the minimum int32
	PRIORITY_HIGH = math.MinInt32

	// PRIORITY_FASTEST defines the fastest priority, which is the minimum int64
	PRIORITY_FASTEST = math.MinInt64
)
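
Lower values mean higher priority: PRIORITY_FASTEST elements are served first and PRIORITY_SLOWEST last. A minimal sketch of using the constants with a priority queue (the element values here are hypothetical):

package main

import wkq "github.com/shengyanli1982/workqueue/v2"

func main() {
	// "urgent" is dequeued before "routine", which is dequeued before "background".
	queue := wkq.NewPriorityQueue(nil)
	defer queue.Shutdown()

	_ = queue.PutWithPriority("urgent", wkq.PRIORITY_HIGH)
	_ = queue.PutWithPriority("routine", wkq.PRIORITY_NORMAL)
	_ = queue.PutWithPriority("background", wkq.PRIORITY_LOW)
}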

Variables

var ErrElementAlreadyExist = errors.New("element already exist")

ErrElementAlreadyExist is an error indicating that the element already exists.

var ErrElementIsNil = errors.New("element is nil")

ErrElementIsNil is an error indicating that the element is nil.

var ErrQueueIsClosed = errors.New("queue is shutting down")

ErrQueueIsClosed is an error indicating that the queue is shutting down.

var ErrQueueIsEmpty = errors.New("queue is empty")

ErrQueueIsEmpty is an error indicating that the queue is empty.

Functions

func NewNopDelayingQueueCallbackImpl

func NewNopDelayingQueueCallbackImpl() *delayingQueueCallbackImpl

The NewNopDelayingQueueCallbackImpl function creates and returns a new instance of DelayingQueueCallbackImpl.

func NewNopPriorityQueueCallbackImpl

func NewNopPriorityQueueCallbackImpl() *priorityQueueCallbackImpl

The NewNopPriorityQueueCallbackImpl function creates and returns a new instance of PriorityQueueCallbackImpl.

func NewNopQueueCallbackImpl

func NewNopQueueCallbackImpl() *queueCallbackImpl

The NewNopQueueCallbackImpl function creates and returns a new instance of QueueCallbackImpl.

func NewNopRateLimitingQueueCallbackImpl

func NewNopRateLimitingQueueCallbackImpl() *ratelimitingQueueCallbackImpl

The NewNopRateLimitingQueueCallbackImpl function creates and returns a new instance of RateLimitingQueueCallbackImpl.

Types

type DelayingQueue

type DelayingQueue = interface {
	Queue

	// The PutWithDelay method is used to put an element into the queue with delay.
	PutWithDelay(value interface{}, delay int64) error

	// The HeapRange method is used to traverse all elements in the sorted heap.
	HeapRange(fn func(value interface{}, delay int64) bool)
}

The DelayingQueue interface inherits from the Queue interface and adds a PutWithDelay method to put an element into the queue with delay.

func NewDelayingQueue

func NewDelayingQueue(config *DelayingQueueConfig) DelayingQueue

The NewDelayingQueue function is used to create a new DelayingQueue.

type DelayingQueueCallback

type DelayingQueueCallback = interface {
	QueueCallback

	// The OnDelay method is called when an element is put into the queue with delay.
	OnDelay(value interface{}, delay int64)

	// The OnPullError method is called when an error occurs while getting an element from the queue.
	OnPullError(value interface{}, reason error)
}

The DelayingQueueCallback interface inherits from the QueueCallback interface and adds OnDelay and OnPullError methods.

type DelayingQueueConfig

type DelayingQueueConfig struct {
	// QueueConfig is the configuration of the queue
	QueueConfig
	// contains filtered or unexported fields
}

The DelayingQueueConfig struct is used for configuring the delaying queue.

func NewDelayingQueueConfig

func NewDelayingQueueConfig() *DelayingQueueConfig

The NewDelayingQueueConfig function is used to create a new DelayingQueueConfig.

func (*DelayingQueueConfig) WithCallback

The WithCallback method is used to set the callback function of the delaying queue.

type Limiter

type Limiter = interface {
	// The When method is used to get the time when the element should be put into the queue.
	When(value interface{}) time.Duration
}

The Limiter interface defines the basic operations that a limiter should have.

func NewBucketRateLimiterImpl

func NewBucketRateLimiterImpl(r float64, burst int64) Limiter

The NewBucketRateLimiterImpl function creates and returns a new instance of BucketRateLimiterImpl.

func NewNopRateLimiterImpl

func NewNopRateLimiterImpl() Limiter

The NewNopRateLimiterImpl function creates and returns a new instance of NopRateLimiterImpl.

type NewSetFunc added in v2.2.3

type NewSetFunc = func() Set

NewSetFunc is a function type that returns an instance of Set.

type PriorityQueue

type PriorityQueue = interface {
	Queue

	// The PutWithPriority method is used to put an element into the queue with priority.
	PutWithPriority(value interface{}, priority int64) error

	// The HeapRange method is used to traverse all elements in the sorted heap.
	HeapRange(fn func(value interface{}, delay int64) bool)
}

The PriorityQueue interface inherits from the Queue interface and adds a PutWithPriority method to put an element into the queue with priority.

func NewPriorityQueue

func NewPriorityQueue(config *PriorityQueueConfig) PriorityQueue

The NewPriorityQueue function is used to create a new PriorityQueue.

type PriorityQueueCallback

type PriorityQueueCallback = interface {
	QueueCallback

	// The OnPriority method is called when an element is put into the queue with priority.
	OnPriority(value interface{}, priority int64)
}

The PriorityQueueCallback interface inherits from the QueueCallback interface and adds the OnPriority method.

type PriorityQueueConfig

type PriorityQueueConfig struct {
	// QueueConfig is the configuration of the queue
	QueueConfig
	// contains filtered or unexported fields
}

The PriorityQueueConfig struct is used for configuring the priority queue.

func NewPriorityQueueConfig

func NewPriorityQueueConfig() *PriorityQueueConfig

The NewPriorityQueueConfig function is used to create a new PriorityQueueConfig.

func (*PriorityQueueConfig) WithCallback

The WithCallback method is used to set the callback function of the priority queue.

type Queue

type Queue = interface {
	// The Put method is used to put an element into the queue.
	Put(value interface{}) error

	// The Get method is used to get an element from the queue.
	Get() (value interface{}, err error)

	// The Done method is used to mark the element as done.
	Done(value interface{})

	// The Len method is used to get the length of the queue.
	Len() int

	// The Values method is used to get all the elements in the queue.
	Values() []interface{}

	// The Range method is used to traverse all elements in the queue.
	Range(fn func(value interface{}) bool)

	// The Shutdown method is used to shut down the queue.
	Shutdown()

	// The IsClosed method is used to check if the queue is closed.
	IsClosed() bool
}

The Queue interface defines the basic operations that a queue should have.

func NewQueue

func NewQueue(config *QueueConfig) Queue

The NewQueue function creates and returns a new instance of QueueImpl.

type QueueCallback

type QueueCallback = interface {
	// The OnPut method is called when an element is put into the queue.
	OnPut(value interface{})

	// The OnGet method is called when an element is retrieved from the queue.
	OnGet(value interface{})

	// The OnDone method is called when the element is done processing.
	OnDone(value interface{})
}

The QueueCallback interface defines the basic operations that a queue callback should have.

type QueueConfig

type QueueConfig struct {
	// contains filtered or unexported fields
}

QueueConfig is a struct used for configuring the queue.

func NewQueueConfig

func NewQueueConfig() *QueueConfig

The NewQueueConfig function is used to create a new instance of QueueConfig.

func (*QueueConfig) WithCallback

func (c *QueueConfig) WithCallback(cb QueueCallback) *QueueConfig

The WithCallback method is used to set the callback function of the queue.

func (*QueueConfig) WithSetCreator added in v2.2.3

func (c *QueueConfig) WithSetCreator(fn NewSetFunc) *QueueConfig

The WithSetCreator method is used to set the set creation function.

func (*QueueConfig) WithValueIdempotent

func (c *QueueConfig) WithValueIdempotent() *QueueConfig

The WithValueIdempotent method is used to make the elements in the queue idempotent.

type RateLimitingQueue

type RateLimitingQueue = interface {
	DelayingQueue

	// The PutWithLimited method is used to put an element into the queue with rate limiting.
	PutWithLimited(value interface{}) error
}

The RateLimitingQueue interface inherits from the DelayingQueue interface and adds a PutWithLimited method to put an element into the queue with rate limiting.

func NewRateLimitingQueue

func NewRateLimitingQueue(config *RateLimitingQueueConfig) RateLimitingQueue

The NewRateLimitingQueue function is used to create a new RateLimitingQueue.

type RateLimitingQueueCallback

type RateLimitingQueueCallback = interface {
	DelayingQueueCallback

	// The OnLimited method is called when an element is put into the queue with rate limiting.
	OnLimited(value interface{})
}

The RateLimitingQueueCallback interface inherits from the DelayingQueueCallback interface and adds the OnLimited method.

type RateLimitingQueueConfig

type RateLimitingQueueConfig struct {
	// DelayingQueueConfig is the configuration of the delaying queue
	DelayingQueueConfig
	// contains filtered or unexported fields
}

The RateLimitingQueueConfig struct is used for configuring the rate limiting queue.

func NewRateLimitingQueueConfig

func NewRateLimitingQueueConfig() *RateLimitingQueueConfig

The NewRateLimitingQueueConfig function is used to create a new RateLimitingQueueConfig.

func (*RateLimitingQueueConfig) WithCallback

The WithCallback method is used to set the callback function of the rate limiting queue.

func (*RateLimitingQueueConfig) WithLimiter

func (c *RateLimitingQueueConfig) WithLimiter(limiter Limiter) *RateLimitingQueueConfig

The WithLimiter method is used to set the rate limiter.

type Set added in v2.2.3

type Set = interface {
	// The Add method is used to add an element to the set
	Add(item interface{})

	// The Remove method is used to remove an element from the set
	Remove(item interface{})

	// The Contains method is used to check whether an element is in the set. If it is, true is returned; otherwise, false is returned
	Contains(item interface{}) bool

	// The List method is used to return a list of all elements in the set
	List() []interface{}

	// The Len method is used to return the number of elements in the set
	Len() int

	// The Cleanup method is used to clean up the set, removing all elements
	Cleanup()
}

Set is an interface that defines a set of methods for operating on a set.
