Documentation ¶
Overview ¶
Package queue implements a specialized concurrent queue system for Gitea.
Terminology:
1. Item:
- An item can be a simple value, such as an integer, or a more complex structure with multiple fields. Usually an item serves as a task or a message. Sets of items are sent to a queue handler to be processed.
- It's represented as a JSON-marshaled binary slice in the queue.
2. Batch:
- A collection of items that are grouped together for processing. Each worker receives a batch of items.
3. Worker:
- An individual unit of execution designed to process items from the queue. It's a goroutine that calls the Handler.
- Workers get new items through a channel (WorkerPoolQueue is responsible for the distribution).
- Workers operate in parallel. The default maximum number of workers is determined by the setting system.
4. Handler (represented by HandlerFuncT type):
- It's the function responsible for processing items. Each active worker calls it.
- If one or more items are not successfully processed, the handler can return them as "unhandled items". The queue system then returns these unhandled items to the base queue after a brief delay. This mechanism is particularly beneficial when the processing entity (like a document indexer) is temporarily unavailable: it ensures that no item is skipped or lost due to transient failures in the processing mechanism.
5. Base queue:
- Represents the underlying storage mechanism for the queue. There are several implementations:
  - Channel: Uses Go's native channel constructs to manage the queue, suitable for in-memory queuing.
  - LevelDB: Especially useful in persistent queues for single instances.
  - Redis: Suitable for clusters, where we may have multiple nodes.
  - Dummy: This is special. It's not a real queue but an immediate no-op queue, which is useful for tests.
- They all have the same abstraction and the same interface, and they are tested by the same testing code.
6. WorkerPoolQueue:
- It glues everything together, using the "base queue" to provide "worker pool" functionality. It creates new workers if needed and can flush the queue, running all the items synchronously until it finishes.
- Its "Push" function doesn't block forever; it returns an error if the queue is still full after the timeout.
7. Manager:
- It serves as a centralized manager for multiple WorkerPoolQueue instances. Whenever we want to create a new queue, flush, or get a specific queue, we can use it.
A queue can be "simple" or "unique". A unique queue will try to avoid duplicate items. A unique queue's "Has" function can be used to check whether an item is already in the queue, although it's not 100% reliable due to the lack of proper transaction support. A simple queue's "Has" function always returns "has=false".
A WorkerPoolQueue is a generic struct: it can be instantiated for any item type, but each instance only works with the one type it was created for. To process another kind of item, you would have to call the manager to create a new WorkerPoolQueue with a different handler that works with that item type. For example:
func Init() error {
	itemQueue = queue.CreateSimpleQueue(graceful.GetManager().ShutdownContext(), "queue-name", handler)
	...
}

func handler(items ...*mypkg.QueueItem) []*mypkg.QueueItem {
	...
}
Index ¶
- Variables
- type BaseConfig
- type HandlerFuncT
- type ManagedWorkerPoolQueue
- type Manager
- type WorkerPoolQueue
- func CreateSimpleQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T]
- func CreateUniqueQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T]
- func NewWorkerPoolQueueWithContext[T any](ctx context.Context, name string, queueSetting setting.QueueSettings, ...) (*WorkerPoolQueue[T], error)
- func (q *WorkerPoolQueue[T]) Cancel()
- func (q *WorkerPoolQueue[T]) FlushWithContext(ctx context.Context, timeout time.Duration) (err error)
- func (q *WorkerPoolQueue[T]) GetItemTypeName() string
- func (q *WorkerPoolQueue[T]) GetName() string
- func (q *WorkerPoolQueue[T]) GetQueueItemNumber() int
- func (q *WorkerPoolQueue[T]) GetType() string
- func (q *WorkerPoolQueue[T]) GetWorkerActiveNumber() int
- func (q *WorkerPoolQueue[T]) GetWorkerMaxNumber() int
- func (q *WorkerPoolQueue[T]) GetWorkerNumber() int
- func (q *WorkerPoolQueue[T]) Has(data T) (bool, error)
- func (q *WorkerPoolQueue[T]) Push(data T) error
- func (q *WorkerPoolQueue[T]) RemoveAllItems(ctx context.Context) error
- func (q *WorkerPoolQueue[T]) Run()
- func (q *WorkerPoolQueue[T]) SetWorkerMaxNumber(num int)
- func (q *WorkerPoolQueue[T]) ShutdownWait(timeout time.Duration)
Variables ¶
var ErrAlreadyInQueue = util.NewAlreadyExistErrorf("already in queue")
Types ¶
type BaseConfig ¶ added in v1.20.0
type HandlerFuncT ¶ added in v1.20.0
type HandlerFuncT[T any] func(...T) (unhandled []T)
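As a rough illustration of the "unhandled items" contract, the sketch below copies the `HandlerFuncT` declaration locally so that it compiles on its own. The `indexDocs` handler and the `indexerUp` flag are hypothetical; the explicit retry in `main` only imitates what the queue system is described as doing automatically after a brief delay.

```go
package main

import "fmt"

// HandlerFuncT is copied from the type declaration above.
type HandlerFuncT[T any] func(...T) (unhandled []T)

// indexerUp is a hypothetical flag standing in for an external service.
var indexerUp bool

// indexDocs is a hypothetical handler: it returns every item it could not process.
func indexDocs(ids ...int64) (unhandled []int64) {
	for _, id := range ids {
		if !indexerUp {
			// Keep the item so the caller can retry it later.
			unhandled = append(unhandled, id)
			continue
		}
		fmt.Println("indexed", id)
	}
	return unhandled
}

func main() {
	var h HandlerFuncT[int64] = indexDocs

	pending := []int64{10, 11, 12}
	pending = h(pending...)
	fmt.Println("unhandled after first pass:", len(pending))

	indexerUp = true // the service comes back
	pending = h(pending...)
	fmt.Println("unhandled after retry:", len(pending))
}
```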
type ManagedWorkerPoolQueue ¶ added in v1.20.0
type ManagedWorkerPoolQueue interface {
	GetName() string
	GetType() string
	GetItemTypeName() string
	GetWorkerNumber() int
	GetWorkerActiveNumber() int
	GetWorkerMaxNumber() int
	SetWorkerMaxNumber(num int)
	GetQueueItemNumber() int

	// FlushWithContext tries to make the handler process all items in the queue synchronously.
	// It is for testing purpose only. It's not designed to be used in a cluster.
	FlushWithContext(ctx context.Context, timeout time.Duration) error

	// RemoveAllItems removes all items in the base queue (on-the-fly items are not affected)
	RemoveAllItems(ctx context.Context) error
}
type Manager ¶
type Manager struct {
	Queues map[int64]ManagedWorkerPoolQueue
	// contains filtered or unexported fields
}
Manager is a manager for the queues created by the "CreateXxxQueue" functions. These queues are called "managed queues".
func GetManager ¶
func GetManager() *Manager
func (*Manager) AddManagedQueue ¶ added in v1.20.0
func (m *Manager) AddManagedQueue(managed ManagedWorkerPoolQueue)
func (*Manager) FlushAll ¶ added in v1.12.0
FlushAll tries to make all managed queues process all items synchronously, until the timeout expires or the queues are empty. It is for testing purposes only. It's not designed to be used in a cluster.
func (*Manager) GetManagedQueue ¶
func (m *Manager) GetManagedQueue(qid int64) ManagedWorkerPoolQueue
func (*Manager) ManagedQueues ¶
func (m *Manager) ManagedQueues() map[int64]ManagedWorkerPoolQueue
type WorkerPoolQueue ¶ added in v1.20.0
type WorkerPoolQueue[T any] struct {
	// contains filtered or unexported fields
}
WorkerPoolQueue is a queue that uses a pool of workers to process items. It can use different underlying (base) queue types.
func CreateSimpleQueue ¶ added in v1.20.0
func CreateSimpleQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T]
CreateSimpleQueue creates a simple queue by name, using the global setting config provider.
func CreateUniqueQueue ¶ added in v1.12.0
func CreateUniqueQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T]
CreateUniqueQueue creates a unique queue by name, using the global setting config provider.
func NewWorkerPoolQueueWithContext ¶ added in v1.20.0
func NewWorkerPoolQueueWithContext[T any](ctx context.Context, name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) (*WorkerPoolQueue[T], error)
func (*WorkerPoolQueue[T]) Cancel ¶ added in v1.20.0
func (q *WorkerPoolQueue[T]) Cancel()
func (*WorkerPoolQueue[T]) FlushWithContext ¶ added in v1.20.0
func (q *WorkerPoolQueue[T]) FlushWithContext(ctx context.Context, timeout time.Duration) (err error)
func (*WorkerPoolQueue[T]) GetItemTypeName ¶ added in v1.20.0
func (q *WorkerPoolQueue[T]) GetItemTypeName() string
func (*WorkerPoolQueue[T]) GetName ¶ added in v1.20.0
func (q *WorkerPoolQueue[T]) GetName() string
func (*WorkerPoolQueue[T]) GetQueueItemNumber ¶ added in v1.20.0
func (q *WorkerPoolQueue[T]) GetQueueItemNumber() int
func (*WorkerPoolQueue[T]) GetType ¶ added in v1.20.0
func (q *WorkerPoolQueue[T]) GetType() string
func (*WorkerPoolQueue[T]) GetWorkerActiveNumber ¶ added in v1.20.0
func (q *WorkerPoolQueue[T]) GetWorkerActiveNumber() int
func (*WorkerPoolQueue[T]) GetWorkerMaxNumber ¶ added in v1.20.0
func (q *WorkerPoolQueue[T]) GetWorkerMaxNumber() int
func (*WorkerPoolQueue[T]) GetWorkerNumber ¶ added in v1.20.0
func (q *WorkerPoolQueue[T]) GetWorkerNumber() int
func (*WorkerPoolQueue[T]) Has ¶ added in v1.20.0
func (q *WorkerPoolQueue[T]) Has(data T) (bool, error)
Has only works for unique queues. Keep in mind that this check may not be reliable (due to the lack of proper transaction support). There is a small chance that duplicate items appear in the queue.
func (*WorkerPoolQueue[T]) Push ¶ added in v1.20.0
func (q *WorkerPoolQueue[T]) Push(data T) error
Push adds an item to the queue. It may block for a while and then return an error if the queue is full.
func (*WorkerPoolQueue[T]) RemoveAllItems ¶ added in v1.20.0
func (q *WorkerPoolQueue[T]) RemoveAllItems(ctx context.Context) error
RemoveAllItems removes all items in the base queue
func (*WorkerPoolQueue[T]) Run ¶ added in v1.20.0
func (q *WorkerPoolQueue[T]) Run()
func (*WorkerPoolQueue[T]) SetWorkerMaxNumber ¶ added in v1.20.0
func (q *WorkerPoolQueue[T]) SetWorkerMaxNumber(num int)
func (*WorkerPoolQueue[T]) ShutdownWait ¶ added in v1.20.0
func (q *WorkerPoolQueue[T]) ShutdownWait(timeout time.Duration)
ShutdownWait shuts down the queue, waits for all workers (handlers) to finish their jobs, and pushes the unhandled items back to the base queue. Because a buggy handler could hang forever, a reasonable timeout is needed.