Documentation ¶
Overview ¶
Package bigqueue is a pure Golang implementation of a big, fast and persistent queue based on memory-mapped files.
- @Author: Malin Xie
- @Description:
- @Date: 2020-08-28 20:06:46
Index ¶
- Constants
- Variables
- func Assert(condition bool, message string, v ...interface{})
- func BytesToInt(b []byte) int64
- func BytesToInt32(b []byte) int32
- func GetFileName(prefix string, suffix string, index int64) string
- func GetFiles(pathname string) (*list.List, error)
- func IntToBytes(n int64) []byte
- func Mod(val int64, bits int) int64
- func PathExists(path string) (bool, error)
- func Printstack()
- func RemoveFiles(pathname string) error
- func Warn(v ...interface{})
- func Warnf(msg string, v ...interface{})
- type DB
- type DBFactory
- type FanOutQueue
- type FileFanoutQueue
- func (q *FileFanoutQueue) Close()
- func (q *FileFanoutQueue) Dequeue(fanoutID int64) (int64, []byte, error)
- func (q *FileFanoutQueue) Enqueue(data []byte) (int64, error)
- func (q *FileFanoutQueue) FreeAllSubscribe()
- func (q *FileFanoutQueue) FreeSubscribe(fanoutID int64)
- func (q *FileFanoutQueue) IsEmpty(fanoutID int64) bool
- func (q *FileFanoutQueue) Open(dir string, queueName string, options *Options) error
- func (q *FileFanoutQueue) Peek(fanoutID int64) (int64, []byte, error)
- func (q *FileFanoutQueue) PeekAll(fanoutID int64) ([][]byte, []int64, error)
- func (q *FileFanoutQueue) PeekPagination(fanoutID int64, page, pagesize uint64) ([][]byte, []int64, error)
- func (q *FileFanoutQueue) Size(fanoutID int64) int64
- func (q *FileFanoutQueue) Skip(fanoutID int64, count int64) error
- func (q *FileFanoutQueue) Status(fanoutID int64) *QueueFilesStatus
- func (q *FileFanoutQueue) Subscribe(fanoutID int64, fn func(int64, []byte, error)) error
- type FileQueue
- func (q *FileQueue) Close() error
- func (q *FileQueue) Dequeue() (int64, []byte, error)
- func (q *FileQueue) Enqueue(data []byte) (int64, error)
- func (q *FileQueue) EnqueueAsync(data []byte, fn func(int64, error))
- func (q *FileQueue) FreeSubscribe()
- func (q *FileQueue) Gc() error
- func (q *FileQueue) IsEmpty() bool
- func (q *FileQueue) Open(dir string, queueName string, options *Options) error
- func (q *FileQueue) Peek() (int64, []byte, error)
- func (q *FileQueue) PeekAll() ([][]byte, []int64, error)
- func (q *FileQueue) PeekPagination(page, pagesize uint64) ([][]byte, []int64, error)
- func (q *FileQueue) Size() int64
- func (q *FileQueue) Skip(count int64) error
- func (q *FileQueue) Status() *QueueFilesStatus
- func (q *FileQueue) Subscribe(fn func(int64, []byte, error)) error
- type IOQueue
- type Options
- type Queue
- type QueueFileInfo
- type QueueFilesStatus
- type QueueFront
Constants ¶
const ( // DefaultDataPageSize data file size DefaultDataPageSize = 128 * 1024 * 1024 // DefaultIndexItemsPerPage items numbers in one page DefaultIndexItemsPerPage = 17 // MaxInt64 max value of int64 MaxInt64 = 0x7fffffffffffffff // IndexFileName file name IndexFileName = "index" // DataFileName file name DataFileName = "data" // MetaFileName file name MetaFileName = "meta_data" // FrontFileName file name FrontFileName = "front_index" Default_Page_Size = 10 )
const (
// FanoutFrontFileName Fanout FrontFileName file name
FanoutFrontFileName = "front_index_"
)
Variables ¶
var ( ErrEnqueueDataNull = errors.New("enqueue data can not be null") ErrIndexOutOfBoundTH = errors.New("index is valid which should between tail and head index") // SubscribeExistErr repeat call Subscriber method ErrSubscribeExistErr = errors.New("Subscriber alread set, can not repeat set") // Subscribe should call after queue Open method ErrSubscribeFailedNoOpenErr = errors.New("Subscriber method only support after queue opened") )
These errors can be returned when opening or calling methods on a DB.
var DefaultOptions = &Options{ DataPageSize: DefaultDataPageSize, indexPageSize: defaultIndexPageSize, IndexItemsPerPage: DefaultIndexItemsPerPage, itemsPerPage: defaultItemsPerPage, AutoGCBySeconds: 0, }
DefaultOptions holds the default options.
Functions ¶
func Assert ¶
Assert will panic with the given formatted message if the given condition is false.
func GetFileName ¶
GetFileName returns the joined file name.
func PathExists ¶
PathExists checks whether the target path exists. It returns true if the path exists, and false otherwise.
func RemoveFiles ¶
RemoveFiles removes all files from the specified directory. Subdirectories are not included.
Types ¶
type DB ¶
type DB struct { // If you want to read the entire database fast, you can set MmapFlag to // syscall.MAP_POPULATE on Linux 2.6.23+ for sequential read-ahead. MmapFlags int InitialMmapSize int // contains filtered or unexported fields }
DB represents a collection of buckets persisted to a file on disk. All data access is performed through transactions which can be obtained through the DB. All the functions on DB will return an ErrDatabaseNotOpen if accessed before Open() is called.
type DBFactory ¶
type DBFactory struct { InitialMmapSize int // contains filtered or unexported fields }
DBFactory is used to manipulate multiple data files by index number.
type FanOutQueue ¶ added in v1.2.4
type FanOutQueue interface { // Open to open target file if failed error returns Open(dir string, queueName string, options *Options) error // IsEmpty Determines whether a queue is empty //fanoutId queue index // return ture if empty, false otherwise IsEmpty(fanoutID int64) bool // Size return avaiable queue size Size(fanoutID int64) int64 // Enqueue Append an item to the queue and return index no // if any error ocurres a non-nil error returned Enqueue(data []byte) (int64, error) // EnqueueAsync Append an item to the queue async way EnqueueAsync(data []byte, fn func(int64, error)) Dequeue(fanoutID int64) (int64, []byte, error) Peek(fanoutID int64) (int64, []byte, error) // To skip deqeue target number of items Skip(fanoutID int64, count int64) error Close() error // Set to asynchous subscribe Subscribe(fanoutID int64, fn func(int64, []byte, error)) error // to free asynchous subscribe FreeSubscribe(fanoutID int64) // FreeAllSubscribe to free all asynchous subscribe FreeAllSubscribe() }
FanOutQueue is a queue that supports the pub-sub feature.
type FileFanoutQueue ¶ added in v1.2.4
type FileFanoutQueue struct {
// contains filtered or unexported fields
}
FileFanoutQueue is the file fanout queue implementation.
func (*FileFanoutQueue) Close ¶ added in v1.2.4
func (q *FileFanoutQueue) Close()
Close frees the resource.
func (*FileFanoutQueue) Dequeue ¶ added in v1.2.4
func (q *FileFanoutQueue) Dequeue(fanoutID int64) (int64, []byte, error)
Dequeue dequeues data from the target fanoutID.
func (*FileFanoutQueue) Enqueue ¶ added in v1.2.4
func (q *FileFanoutQueue) Enqueue(data []byte) (int64, error)
Enqueue appends an item to the queue and returns its index number.
func (*FileFanoutQueue) FreeAllSubscribe ¶ added in v1.2.4
func (q *FileFanoutQueue) FreeAllSubscribe()
FreeAllSubscribe frees all subscribers.
func (*FileFanoutQueue) FreeSubscribe ¶ added in v1.2.4
func (q *FileFanoutQueue) FreeSubscribe(fanoutID int64)
FreeSubscribe frees the subscriber for the target fanout ID.
func (*FileFanoutQueue) IsEmpty ¶ added in v1.2.4
func (q *FileFanoutQueue) IsEmpty(fanoutID int64) bool
IsEmpty tests whether the queue for the target fanoutID is empty.
func (*FileFanoutQueue) Open ¶ added in v1.2.4
func (q *FileFanoutQueue) Open(dir string, queueName string, options *Options) error
Open opens the queue files.
func (*FileFanoutQueue) Peek ¶ added in v1.2.4
func (q *FileFanoutQueue) Peek(fanoutID int64) (int64, []byte, error)
Peek peeks the head item from the target fanoutID.
func (*FileFanoutQueue) PeekAll ¶ added in v1.2.4
func (q *FileFanoutQueue) PeekAll(fanoutID int64) ([][]byte, []int64, error)
PeekAll retrieves all the items from the front of a queue and returns an array of data and an array of indexes.
func (*FileFanoutQueue) PeekPagination ¶ added in v1.2.4
func (q *FileFanoutQueue) PeekPagination(fanoutID int64, page, pagesize uint64) ([][]byte, []int64, error)
PeekPagination peeks data from the queue using pagination.
func (*FileFanoutQueue) Size ¶ added in v1.2.4
func (q *FileFanoutQueue) Size(fanoutID int64) int64
Size returns the available queue size for the target fanoutID.
func (*FileFanoutQueue) Skip ¶ added in v1.2.4
func (q *FileFanoutQueue) Skip(fanoutID int64, count int64) error
Skip skips the target number of items to dequeue.
func (*FileFanoutQueue) Status ¶ added in v1.2.4
func (q *FileFanoutQueue) Status(fanoutID int64) *QueueFilesStatus
Status gets status info from the current queue.
type FileQueue ¶
type FileQueue struct {
// contains filtered or unexported fields
}
FileQueue is a queue implementation based on a memory-mapped file.
func (*FileQueue) EnqueueAsync ¶
EnqueueAsync adds an item to the queue, which increases HeadIndex. Asynchronous mode will call back with the fn function.
func (*FileQueue) FreeSubscribe ¶ added in v1.2.4
func (q *FileQueue) FreeSubscribe()
FreeSubscribe frees the subscriber.
func (*FileQueue) Gc ¶
Gc deletes all used data files to free disk space.
BigQueue persists enqueued data in disk files. These data files remain even after the data in them has been dequeued, so your application is responsible for periodically calling this method to delete all used data files and free disk space.
func (*FileQueue) Peek ¶
Peek retrieves the item at the front of a queue. If the item exists, it is returned with its index ID and item data.
func (*FileQueue) PeekAll ¶ added in v1.2.4
PeekAll retrieves all the items from the front of a queue and returns an array of data and an array of indexes.
func (*FileQueue) PeekPagination ¶ added in v1.2.4
PeekPagination peeks data from the queue using pagination.
func (*FileQueue) Status ¶ added in v1.2.4
func (q *FileQueue) Status() *QueueFilesStatus
Status gets status info from the current queue.
type IOQueue ¶ added in v1.2.4
type IOQueue interface { Queue // Open queue from file io info Open(dir string, queueName string, options *Options) error }
IOQueue is the I/O queue interface that defines all the necessary functions.
type Options ¶
type Options struct { // size in bytes of a data page DataPageSize int // the item count is 1 << IndexItemsPerPage IndexItemsPerPage int // if value > 0 then enable auto gc features and repeat process gc by the specified interval time in seconds. AutoGCBySeconds int // contains filtered or unexported fields }
Options is the options struct.
type Queue ¶
type Queue interface { // Determines whether a queue is empty // return ture if empty, false otherwise IsEmpty() bool // return avaiable queue size Size() int64 // Append an item to the queue and return index no // if any error ocurres a non-nil error returned Enqueue(data []byte) (int64, error) EnqueueAsync(data []byte, fn func(int64, error)) Dequeue() (int64, []byte, error) Peek() (int64, []byte, error) // To skip deqeue target number of items Skip(count int64) error Close() error // Delete all used data files to free disk space. Gc() error // Set to asynchous subscribe Subscribe(fn func(int64, []byte, error)) error // to free asynchous subscribe FreeSubscribe() }
Queue is the interface that defines all the necessary functions.
type QueueFileInfo ¶ added in v1.2.4
QueueFileInfo is the queue file info.
type QueueFilesStatus ¶ added in v1.2.4
type QueueFilesStatus struct { // front index of the big queue, FrontIndex int64 // head index of the array, this is the read write barrier. // readers can only read items before this index, and writes can write this index or after HeadIndex int64 // tail index of the array, // readers can't read items before this tail TailIndex int64 // head index of the data page, this is the to be appended data page index HeadDataPageIndex int64 // head offset of the data page, this is the to be appended data offset HeadDataItemOffset int64 IndexFileList []*QueueFileInfo DataFileList []*QueueFileInfo MetaFileInfo *QueueFileInfo FrontFileInfo *QueueFileInfo }
QueueFilesStatus is the status info of queue files.
type QueueFront ¶ added in v1.2.4
type QueueFront struct {
// contains filtered or unexported fields
}
QueueFront is the queue front struct.