Documentation ¶
Overview ¶
Package queue provides a fast, ring-buffer queue based on the version suggested by Dariusz Górecki. Using this instead of other, simpler, queue implementations (slice+append or linked list) provides substantial memory and time benefits, and fewer GC pauses.
The queue implemented here is as fast as it is for an additional reason: it is *not* thread-safe.
Index ¶
- Variables
- func NewAutoScaleTaskManager[I any](ctx *TaskManagerContext[I], threshold, minThreshold int) *autoScaleTaskManager[I]
- func NewRestrictedTaskManager[I any](ctx *TaskManagerContext[I], size int) *restrictedTaskManager[I]
- func NewmonoTaskManager[I any](ctx *TaskManagerContext[I]) *monoTaskManager[I]
- func RegisterBuffer(prefix string, meter metric.Meter, buffer Buffer)
- func RegisterTaskManager[I any](prefix string, meter metric.Meter, taskManager TaskManager[I])
- type BlackHole
- type Buffer
- type Channel
- type ConfigFn
- func WithAutoScaleTaskManager[I any](minThreshold, threshold int) ConfigFn[I]
- func WithChannel[I, O any](channel Channel[I], prefix string, meter metric.Meter) ConfigFn[I]
- func WithContext[I, O any](ctx context.Context) ConfigFn[I]
- func WithMonoTaskManager[I any]() ConfigFn[I]
- func WithRestrictedTaskManager[I any](size int) ConfigFn[I]
- func WithUnbufferedChannel[I, O any](channel UnbufferedChannel[I]) ConfigFn[I]
- type DefaultQueue
- type InChannel
- type Native
- type Operation
- type OperationWithResutl
- type OutChannel
- type Pipe
- type Plug
- type Queue
- type QueueChannel
- type ResizingStrategy
- type TaskManager
- type TaskManagerContext
- type UnbufferedChannel
- type UnbufferedInChannel
- type UnbufferedOutChannel
Constants ¶
This section is empty.
Variables ¶
var ( // Eager maximizes responsiveness at the expense of higher resource usage, // which can reduce throughput under certain conditions. // This strategy is meant for worker pools that will operate at a small percentage of their capacity // most of the time and may occasionally receive bursts of tasks. It's the default strategy. Eager = func() ResizingStrategy { return RatedResizer(1) } // Balanced tries to find a balance between responsiveness and throughput. // It's suitable for general purpose worker pools or those // that will operate close to 50% of their capacity most of the time. Balanced = func() ResizingStrategy { return RatedResizer(maxProcs / 2) } // Lazy maximizes throughput at the expense of responsiveness. // This strategy is meant for worker pools that will operate close to their max. capacity most of the time. Lazy = func() ResizingStrategy { return RatedResizer(maxProcs) } )
Preset pool resizing strategies
var ( // TODO: Need to figure out a wau for channel closed logic ErrPipeHasBeanClosed = errors.New("the pipe has already benn closed") )
Errors that are used throughout the Tunny API.
Functions ¶
func NewAutoScaleTaskManager ¶
func NewAutoScaleTaskManager[I any](ctx *TaskManagerContext[I], threshold, minThreshold int) *autoScaleTaskManager[I]
func NewRestrictedTaskManager ¶
func NewRestrictedTaskManager[I any](ctx *TaskManagerContext[I], size int) *restrictedTaskManager[I]
func NewmonoTaskManager ¶
func NewmonoTaskManager[I any](ctx *TaskManagerContext[I]) *monoTaskManager[I]
func RegisterTaskManager ¶
func RegisterTaskManager[I any](prefix string, meter metric.Meter, taskManager TaskManager[I])
Types ¶
type BlackHole ¶
type BlackHole[I any] struct { // contains filtered or unexported fields }
BlackHole implements the InChannel interface and provides an analogue for the "Discard" variable in the ioutil package - it never blocks, and simply discards every value it reads. The number of items discarded in this way is counted and returned from Len.
func NewBlackHole ¶
type Channel ¶
type Channel[T any] interface { UnbufferedChannel[T] Buffer }
type ConfigFn ¶
func WithChannel ¶
func WithMonoTaskManager ¶
func WithUnbufferedChannel ¶
func WithUnbufferedChannel[I, O any](channel UnbufferedChannel[I]) ConfigFn[I]
Will be blocking channels
type DefaultQueue ¶
type DefaultQueue[T any] struct { // contains filtered or unexported fields }
Queue represents a single instance of the queue data structure.
func NewDefaultQueue ¶
func NewDefaultQueue[T any]() *DefaultQueue[T]
New constructs and returns a new Queue.
func NewDefaultQueueWithSize ¶
func NewDefaultQueueWithSize[T any](size int) *DefaultQueue[T]
func (*DefaultQueue[T]) Add ¶
func (q *DefaultQueue[T]) Add(elem T)
Add puts an element on the end of the queue.
func (*DefaultQueue[T]) Get ¶
func (q *DefaultQueue[T]) Get(i int) T
Get returns the element at index i in the queue. If the index is invalid, the call will panic. This method accepts both positive and negative index values. Index 0 refers to the first element, and index -1 refers to the last.
func (*DefaultQueue[T]) Length ¶
func (q *DefaultQueue[T]) Length() int
Length returns the number of elements currently stored in the queue.
func (*DefaultQueue[T]) Peek ¶
func (q *DefaultQueue[T]) Peek() T
Peek returns the element at the head of the queue. This call panics if the queue is empty.
func (*DefaultQueue[T]) Poll ¶
func (q *DefaultQueue[T]) Poll() T
Remove removes and returns the element from the front of the queue. If the queue is empty, the call will panic.
type InChannel ¶
type InChannel[T any] interface { UnbufferedInChannel[T] Buffer }
type Native ¶
type Native[T any] chan T
Native implements the Channel interface by wrapping a native go channel.
type OperationWithResutl ¶
type OutChannel ¶
type OutChannel[T any] interface { UnbufferedOutChannel[T] Buffer }
type Pipe ¶
type Pipe[I, O any] struct { Close context.CancelFunc Returns chan<- O UnbufferedInChannel[I] TaskManager[I] // contains filtered or unexported fields }
func NewPipe ¶
func NewPipe[I, O any](operation OperationWithResutl[I, O], configs ...ConfigFn[I]) *Pipe[I, O]
func (*Pipe[I, O]) InTo ¶
func (p *Pipe[I, O]) InTo(other UnbufferedInChannel[O]) UnbufferedInChannel[O]
type Plug ¶
type Plug[I any] struct { Close context.CancelFunc UnbufferedInChannel[I] TaskManager[I] // contains filtered or unexported fields }
type Queue ¶
type Queue[T any] interface { // Length returns the number of elements currently stored in the queue. Length() int // Peek returns the element at the head of the queue. will retirn nil // if the queue is empty. Peek() T // Poll removes and returns the element from the front of the queue. If the // queue is empty, the retirn nil. Poll() T // Add puts an element on the end of the queue. Add(T) }
type QueueChannel ¶
type QueueChannel[T any] struct { // contains filtered or unexported fields }
QueueChannel implements the Channel interface with an infinite buffer between the input and the output.
func NewQueueChannel ¶
func NewQueueChannel[T any](buffer Queue[T]) *QueueChannel[T]
func NewQueueChannelWithInputSize ¶
func NewQueueChannelWithInputSize[T any](buffer Queue[T], size int) *QueueChannel[T]
func (*QueueChannel[T]) Cap ¶
func (ch *QueueChannel[T]) Cap() int
func (*QueueChannel[T]) Close ¶
func (ch *QueueChannel[T]) Close()
func (*QueueChannel[T]) In ¶
func (ch *QueueChannel[T]) In() chan T
func (*QueueChannel[T]) Len ¶
func (ch *QueueChannel[T]) Len() int
func (*QueueChannel[T]) Out ¶
func (ch *QueueChannel[T]) Out() <-chan T
type ResizingStrategy ¶
func RatedResizer ¶
func RatedResizer(rate int) ResizingStrategy
RatedResizer creates a resizing strategy which can be configured to create workers at a specific rate when the pool has no idle workers. rate: determines the number of tasks to receive before creating an extra worker. A value of 3 can be interpreted as: "Create a new worker every 3 tasks".
type TaskManagerContext ¶
type TaskManagerContext[I any] struct { Operation Operation[I] context.Context UnbufferedChannel[I] }
type UnbufferedChannel ¶
type UnbufferedChannel[T any] interface { UnbufferedInChannel[T] UnbufferedOutChannel[T] }
type UnbufferedInChannel ¶
type UnbufferedInChannel[T any] interface { In() chan T // The writeable end of the channel. Close() // Closes the channel. It is an error to write to In() after calling Close(). }
type UnbufferedOutChannel ¶
type UnbufferedOutChannel[T any] interface { Out() <-chan T // The readable end of the channel. }