Documentation ¶
Overview ¶
Package queue implements a specialized queue system for Gitea.
There are two major kinds of concepts:
* The "base queue": channel, level, redis:
- They have the same abstraction, the same interface, and they are tested by the same testing code.
- The dummy(immediate) queue is special, it's not a real queue, it's only used as a no-op queue or a testing queue.
* The WorkerPoolQueue: it uses the "base queue" to provide "worker pool" function.
- It calls the "handler" to process the data in the base queue.
- Its "Push" function doesn't block forever, it will return an error if the queue is full after the timeout.
A queue can be "simple" or "unique". A unique queue will try to avoid duplicate items. Unique queue's "Has" function can be used to check whether an item is already in the queue, although it's not 100% reliable due to there is no proper transaction support. Simple queue's "Has" function always returns "has=false".
The HandlerFuncT function is called by the WorkerPoolQueue to process the data in the base queue. If the handler returns "unhandled" items, they will be re-queued to the base queue after a slight delay, in case the item processor (eg: document indexer) is not available.
Index ¶
- Variables
- type BaseConfig
- type HandlerFuncT
- type ManagedWorkerPoolQueue
- type Manager
- type WorkerPoolQueue
- func CreateSimpleQueue[T any](name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T]
- func CreateUniqueQueue[T any](name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T]
- func NewWorkerPoolQueueBySetting[T any](name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], ...) (*WorkerPoolQueue[T], error)
- func (q *WorkerPoolQueue[T]) FlushWithContext(ctx context.Context, timeout time.Duration) (err error)
- func (q *WorkerPoolQueue[T]) GetItemTypeName() string
- func (q *WorkerPoolQueue[T]) GetName() string
- func (q *WorkerPoolQueue[T]) GetQueueItemNumber() int
- func (q *WorkerPoolQueue[T]) GetType() string
- func (q *WorkerPoolQueue[T]) GetWorkerActiveNumber() int
- func (q *WorkerPoolQueue[T]) GetWorkerMaxNumber() int
- func (q *WorkerPoolQueue[T]) GetWorkerNumber() int
- func (q *WorkerPoolQueue[T]) Has(data T) (bool, error)
- func (q *WorkerPoolQueue[T]) Push(data T) error
- func (q *WorkerPoolQueue[T]) RemoveAllItems(ctx context.Context) error
- func (q *WorkerPoolQueue[T]) Run(atShutdown, atTerminate func(func()))
- func (q *WorkerPoolQueue[T]) SetWorkerMaxNumber(num int)
- func (q *WorkerPoolQueue[T]) ShutdownWait(timeout time.Duration)
Constants ¶
This section is empty.
Variables ¶
var ErrAlreadyInQueue = util.NewAlreadyExistErrorf("already in queue")
Functions ¶
This section is empty.
Types ¶
type BaseConfig ¶
type HandlerFuncT ¶
type HandlerFuncT[T any] func(...T) (unhandled []T)
type ManagedWorkerPoolQueue ¶
type ManagedWorkerPoolQueue interface { GetName() string GetType() string GetItemTypeName() string GetWorkerNumber() int GetWorkerActiveNumber() int GetWorkerMaxNumber() int SetWorkerMaxNumber(num int) GetQueueItemNumber() int // FlushWithContext tries to make the handler process all items in the queue synchronously. // It is for testing purpose only. It's not designed to be used in a cluster. FlushWithContext(ctx context.Context, timeout time.Duration) error // RemoveAllItems removes all items in the base queue (on-the-fly items are not affected) RemoveAllItems(ctx context.Context) error }
type Manager ¶
type Manager struct { Queues map[int64]ManagedWorkerPoolQueue // contains filtered or unexported fields }
Manager is a manager for the queues created by "CreateXxxQueue" functions, these queues are called "managed queues".
func GetManager ¶
func GetManager() *Manager
func (*Manager) AddManagedQueue ¶
func (m *Manager) AddManagedQueue(managed ManagedWorkerPoolQueue)
func (*Manager) FlushAll ¶
FlushAll tries to make all managed queues process all items synchronously, until timeout or the queue is empty. It is for testing purpose only. It's not designed to be used in a cluster.
func (*Manager) GetManagedQueue ¶
func (m *Manager) GetManagedQueue(qid int64) ManagedWorkerPoolQueue
func (*Manager) ManagedQueues ¶
func (m *Manager) ManagedQueues() map[int64]ManagedWorkerPoolQueue
type WorkerPoolQueue ¶
type WorkerPoolQueue[T any] struct { // contains filtered or unexported fields }
WorkerPoolQueue is a queue that uses a pool of workers to process items It can use different underlying (base) queue types
func CreateSimpleQueue ¶
func CreateSimpleQueue[T any](name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T]
CreateSimpleQueue creates a simple queue from global setting config provider by name
func CreateUniqueQueue ¶
func CreateUniqueQueue[T any](name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T]
CreateUniqueQueue creates a unique queue from global setting config provider by name
func NewWorkerPoolQueueBySetting ¶
func NewWorkerPoolQueueBySetting[T any](name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) (*WorkerPoolQueue[T], error)
func (*WorkerPoolQueue[T]) FlushWithContext ¶
func (*WorkerPoolQueue[T]) GetItemTypeName ¶
func (q *WorkerPoolQueue[T]) GetItemTypeName() string
func (*WorkerPoolQueue[T]) GetName ¶
func (q *WorkerPoolQueue[T]) GetName() string
func (*WorkerPoolQueue[T]) GetQueueItemNumber ¶
func (q *WorkerPoolQueue[T]) GetQueueItemNumber() int
func (*WorkerPoolQueue[T]) GetType ¶
func (q *WorkerPoolQueue[T]) GetType() string
func (*WorkerPoolQueue[T]) GetWorkerActiveNumber ¶
func (q *WorkerPoolQueue[T]) GetWorkerActiveNumber() int
func (*WorkerPoolQueue[T]) GetWorkerMaxNumber ¶
func (q *WorkerPoolQueue[T]) GetWorkerMaxNumber() int
func (*WorkerPoolQueue[T]) GetWorkerNumber ¶
func (q *WorkerPoolQueue[T]) GetWorkerNumber() int
func (*WorkerPoolQueue[T]) Has ¶
func (q *WorkerPoolQueue[T]) Has(data T) (bool, error)
Has only works for unique queues. Keep in mind that this check may not be reliable (due to lacking of proper transaction support) There could be a small chance that duplicate items appear in the queue
func (*WorkerPoolQueue[T]) Push ¶
func (q *WorkerPoolQueue[T]) Push(data T) error
Push adds an item to the queue, it may block for a while and then returns an error if the queue is full
func (*WorkerPoolQueue[T]) RemoveAllItems ¶
func (q *WorkerPoolQueue[T]) RemoveAllItems(ctx context.Context) error
RemoveAllItems removes all items in the baes queue
func (*WorkerPoolQueue[T]) Run ¶
func (q *WorkerPoolQueue[T]) Run(atShutdown, atTerminate func(func()))
func (*WorkerPoolQueue[T]) SetWorkerMaxNumber ¶
func (q *WorkerPoolQueue[T]) SetWorkerMaxNumber(num int)
func (*WorkerPoolQueue[T]) ShutdownWait ¶
func (q *WorkerPoolQueue[T]) ShutdownWait(timeout time.Duration)
ShutdownWait shuts down the queue, waits for all workers to finish their jobs, and pushes the unhandled items back to the base queue It waits for all workers (handlers) to finish their jobs, in case some buggy handlers would hang forever, a reasonable timeout is needed