queue

package
v0.0.0-...-b8a1a0e
Warning

This package is not in the latest version of its module.

Published: Jan 31, 2024 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

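A multi-queue implementation indexed by hash key.

Modeled on the PriorityQueue in github.com/Workiva/go-datastructures. The main difference is that PriorityQueue uses priority to decide whether an item is placed at the head or the tail of the queue, whereas this Queue is a fixed-length queue: new data overwrites old data, and no priority comparison takes place.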
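The fixed-length, overwrite-on-full behavior described above can be illustrated with a small ring buffer. This is only a sketch: the `ring` type and its methods are hypothetical names, it is not the package's OverwriteQueue, and it assumes that the oldest pending element is the one that gets overwritten.

```go
package main

// ring is a hypothetical fixed-capacity queue used for illustration only.
type ring struct {
	buf         []interface{}
	head        int    // index of the oldest pending element
	n           int    // number of pending elements
	overwritten uint64 // how many elements were dropped to make room
}

func newRing(size int) *ring {
	return &ring{buf: make([]interface{}, size)}
}

// put appends x. When the ring is full, the oldest element is dropped first
// (assumption: "new data overwrites old data" means oldest-first).
func (r *ring) put(x interface{}) {
	if r.n == len(r.buf) {
		r.head = (r.head + 1) % len(r.buf)
		r.n--
		r.overwritten++
	}
	r.buf[(r.head+r.n)%len(r.buf)] = x
	r.n++
}

// get removes and returns the oldest element; ok is false when the ring is empty.
func (r *ring) get() (x interface{}, ok bool) {
	if r.n == 0 {
		return nil, false
	}
	x = r.buf[r.head]
	r.buf[r.head] = nil // drop the reference so it can be collected
	r.head = (r.head + 1) % len(r.buf)
	r.n--
	return x, true
}
```

With a capacity of 2, putting 1, 2, 3 leaves 2 and 3 pending and records one overwrite, which is roughly what the Overwritten and Pending fields of Counter appear intended to track.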


Constants

const MAX_BATCH_GET_SIZE = 1 << 16
const MAX_QUEUE_COUNT = 16

Variables

var OverflowError = errors.New("Requested size is larger than capacity")

Functions

This section is empty.

Types

type Counter

type Counter struct {
	In          uint64 `statsd:"in,count"`
	Out         uint64 `statsd:"out,count"`
	Overwritten uint64 `statsd:"overwritten,count"`
	Pending     uint64 `statsd:"pending,gauge"`
}

type FixedMultiQueue

type FixedMultiQueue []*OverwriteQueue

func NewOverwriteQueues

func NewOverwriteQueues(module string, count uint8, queueSize int, options ...Option) FixedMultiQueue

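count and queueSize should be powers of 2 so that no modulo computation is needed; if they are not, they are implicitly converted to powers of 2 during construction. A HashKey should map into the range of count; otherwise the MultiQueue uses only its low-order bits.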
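The reason for requiring powers of 2 is that an index can then be taken with a bit mask instead of a remainder. A minimal sketch follows; `ceilPow2` and `queueIndex` are hypothetical helpers, and rounding up (rather than down) is an assumption, since the documentation does not say in which direction the implicit conversion goes.

```go
package main

// ceilPow2 returns the smallest power of two that is >= n, for n >= 1.
// Rounding up is an assumption made for this sketch.
func ceilPow2(n int) int {
	p := 1
	for p < n {
		p <<= 1
	}
	return p
}

// queueIndex picks a queue for key using a mask. count must be a power of two,
// in which case key&(count-1) equals key%count and keeps only the low bits.
func queueIndex(key uint8, count int) int {
	return int(key) & (count - 1)
}
```

For example, with count 4 a key of 9 selects queue 1, because only the two low-order bits of the key are kept.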

func (FixedMultiQueue) Close

func (q FixedMultiQueue) Close() error

func (FixedMultiQueue) Get

func (q FixedMultiQueue) Get(key HashKey) interface{}

func (FixedMultiQueue) Gets

func (q FixedMultiQueue) Gets(key HashKey, output []interface{}) int

func (FixedMultiQueue) Len

func (q FixedMultiQueue) Len(key HashKey) int

func (FixedMultiQueue) Put

func (q FixedMultiQueue) Put(key HashKey, items ...interface{}) error

func (FixedMultiQueue) Puts

func (q FixedMultiQueue) Puts(keys []HashKey, items []interface{}) error

type HashKey

type HashKey = uint8

type MultiQueueReader

type MultiQueueReader interface {
	Get(HashKey) interface{}
	Gets(HashKey, []interface{}) int
	Len(HashKey) int
	Close() error
}

type MultiQueueWriter

type MultiQueueWriter interface {
	Put(HashKey, ...interface{}) error
	Puts([]HashKey, []interface{}) error
	Len(HashKey) int
	Close() error
}

type Option

type Option = interface{}

type OptionFlushIndicator

type OptionFlushIndicator = time.Duration // interval at which nil is put into the queue as a flush indicator
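A consumer of a queue configured this way would presumably see nil values mixed in with real items and could treat each nil as a cue to flush whatever it has batched. The sketch below shows that consumer-side pattern on a plain slice; `splitOnFlush` is a hypothetical helper and not part of the package.

```go
package main

// splitOnFlush walks a stream of items in which nil acts as a flush indicator.
// It returns the batches that were flushed and the items still pending
// after the last indicator. Hypothetical helper for illustration.
func splitOnFlush(stream []interface{}) (flushed [][]interface{}, pending []interface{}) {
	for _, x := range stream {
		if x == nil { // flush indicator: emit the current batch, if any
			if len(pending) > 0 {
				flushed = append(flushed, pending)
				pending = nil
			}
			continue
		}
		pending = append(pending, x)
	}
	return flushed, pending
}
```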

type OptionModule

type OptionModule = string

type OptionRelease

type OptionRelease = func(x interface{})

type OptionStatsOption

type OptionStatsOption = stats.Option
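Because Option is an alias for interface{} and each option type is an alias for an ordinary Go type, a constructor can only tell options apart by their dynamic type. One plausible way to do that is a type switch, sketched below. The `config` struct, `applyOptions`, and `demoConfig` are hypothetical names; how the package actually processes its options is not shown in this documentation, and stats.Option is left out because its definition is not part of this page.

```go
package main

import "time"

// config collects the recognized options. Hypothetical, for illustration.
type config struct {
	module  string
	flush   time.Duration
	release func(x interface{})
}

// applyOptions dispatches on the dynamic type of each option.
// Unrecognized types are silently ignored in this sketch.
func applyOptions(options ...interface{}) config {
	var c config
	for _, o := range options {
		switch v := o.(type) {
		case string: // same type as OptionModule
			c.module = v
		case time.Duration: // same type as OptionFlushIndicator
			c.flush = v
		case func(x interface{}): // same type as OptionRelease
			c.release = v
		}
	}
	return c
}

// demoConfig shows a call with one option of each kind.
func demoConfig() config {
	return applyOptions("ingest", 5*time.Second, func(x interface{}) {})
}
```

One consequence of using aliases rather than defined types is that any plain string or time.Duration passed in the options list is indistinguishable from an OptionModule or OptionFlushIndicator.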

type OverwriteQueue

type OverwriteQueue struct {
	utils.Closable
	sync.Mutex
	// contains filtered or unexported fields
}

func NewOverwriteQueue

func NewOverwriteQueue(name string, size int, options ...Option) *OverwriteQueue

func (*OverwriteQueue) Get

func (q *OverwriteQueue) Get() interface{}

Gets a single element from the queue. Blocks while the queue is empty.
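Blocking until an element is available is commonly built from a mutex and a condition variable. The following is a minimal sketch of that pattern using sync.Cond; `blockingQueue` and its methods are hypothetical, and the package's own waiting mechanism may differ.

```go
package main

import "sync"

// blockingQueue is a hypothetical unbounded queue whose get blocks when empty.
type blockingQueue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []interface{}
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// put appends x and wakes one waiting reader.
func (q *blockingQueue) put(x interface{}) {
	q.mu.Lock()
	q.items = append(q.items, x)
	q.mu.Unlock()
	q.cond.Signal()
}

// get waits until at least one element is pending, then removes and returns it.
func (q *blockingQueue) get() interface{} {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 { // loop guards against spurious wakeups
		q.cond.Wait()
	}
	x := q.items[0]
	q.items = q.items[1:]
	return x
}
```

A reader that calls get before any writer has called put simply parks until the first put arrives.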

func (*OverwriteQueue) GetCounter

func (q *OverwriteQueue) GetCounter() interface{}

func (*OverwriteQueue) Gets

func (q *OverwriteQueue) Gets(output []interface{}) int

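Gets multiple elements from the queue. The slice passed in is overwritten, and the call blocks while the queue is empty. The number of elements written is bounded by the slice's length, not its capacity.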
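The distinction between length and capacity matters when the output slice is reused. The sketch below mimics the fill step with the built-in copy, which also stops at the destination's length; `drain` is a hypothetical, non-blocking stand-in and not the package's Gets.

```go
package main

// drain copies up to len(output) pending items into output, removes them
// from pending, and returns how many were copied. Hypothetical helper.
func drain(pending *[]interface{}, output []interface{}) int {
	n := copy(output, *pending) // copy is bounded by len(output), not cap(output)
	*pending = (*pending)[n:]
	return n
}
```

A slice created with make([]interface{}, 0, 64) therefore receives nothing, while make([]interface{}, 64) can receive up to 64 elements; after a call, only the first n entries of the output are meaningful.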

func (*OverwriteQueue) Init

func (q *OverwriteQueue) Init(name string, size int, options ...Option)

func (*OverwriteQueue) Len

func (q *OverwriteQueue) Len() int

Returns the number of elements in the queue that are waiting to be processed.

func (*OverwriteQueue) Put

func (q *OverwriteQueue) Put(items ...interface{}) error

Puts one or more elements into the queue. Take care not to exceed Size, and do not pass an empty list.
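The two constraints above can be checked on the caller's side before calling Put. The sketch below is a hypothetical pre-check: `checkPut`, `errOverflow`, and `errEmpty` are illustrative names, the overflow message mirrors the package's OverflowError, and how the package itself reacts to an empty list is not documented here.

```go
package main

import "errors"

var (
	// errOverflow mirrors the message of the package's OverflowError.
	errOverflow = errors.New("requested size is larger than capacity")
	// errEmpty is a hypothetical error for an empty item list.
	errEmpty = errors.New("no items to put")
)

// checkPut validates a batch against a queue capacity before it is put.
func checkPut(capacity int, items ...interface{}) error {
	if len(items) == 0 {
		return errEmpty
	}
	if len(items) > capacity {
		return errOverflow
	}
	return nil
}
```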

type QueueReader

type QueueReader interface {
	Get() interface{}
	Gets([]interface{}) int
	Len() int
	Close() error
}

type QueueWriter

type QueueWriter interface {
	Put(...interface{}) error
	Len() int
	Close() error
}
