Documentation ¶
Index ¶
- type BoundedQueue
- func (q *BoundedQueue) Capacity() int
- func (q *BoundedQueue) Produce(item interface{}) bool
- func (q *BoundedQueue) Resize(capacity int) bool
- func (q *BoundedQueue) Size() int
- func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{}))
- func (q *BoundedQueue) StartLengthReporting(reportPeriod time.Duration, gauge metrics.Gauge)
- func (q *BoundedQueue) Stop()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BoundedQueue ¶
type BoundedQueue struct {
// contains filtered or unexported fields
}
BoundedQueue implements a producer-consumer exchange similar to a ring buffer queue, where the queue is bounded and if it fills up due to slow consumers, the new items written by the producer force the earliest items to be dropped. The implementation is actually based on channels, with a special Reaper goroutine that wakes up when the queue is full and consumers the items from the top of the queue until its size drops back to maxSize
func NewBoundedQueue ¶
func NewBoundedQueue(capacity int, onDroppedItem func(item interface{})) *BoundedQueue
NewBoundedQueue constructs the new queue of specified capacity, and with an optional callback for dropped items (e.g. useful to emit metrics).
func (*BoundedQueue) Capacity ¶
func (q *BoundedQueue) Capacity() int
Capacity returns capacity of the queue
func (*BoundedQueue) Produce ¶
func (q *BoundedQueue) Produce(item interface{}) bool
Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow.
func (*BoundedQueue) Resize ¶
func (q *BoundedQueue) Resize(capacity int) bool
Resize changes the capacity of the queue, returning whether the action was successful
func (*BoundedQueue) Size ¶
func (q *BoundedQueue) Size() int
Size returns the current size of the queue
func (*BoundedQueue) StartConsumers ¶
func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{}))
StartConsumers starts a given number of goroutines consuming items from the queue and passing them into the consumer callback.
func (*BoundedQueue) StartLengthReporting ¶
func (q *BoundedQueue) StartLengthReporting(reportPeriod time.Duration, gauge metrics.Gauge)
StartLengthReporting starts a timer-based goroutine that periodically reports current queue length to a given metrics gauge.
func (*BoundedQueue) Stop ¶
func (q *BoundedQueue) Stop()
Stop stops all consumers, as well as the length reporter if started, and releases the items channel. It blocks until all consumers have stopped.