Documentation ¶
Overview ¶
package bigqueue implements is pure Golang implementation for big, fast and persistent queue based on memory mapped file.
- @Author: Malin Xie
- @Description:
- @Date: 2020-08-28 20:06:46
Index ¶
- Constants
- Variables
- func Assert(condition bool, message string, v ...interface{})
- func BytesToInt(b []byte) int64
- func BytesToInt32(b []byte) int32
- func GetFileName(prefix string, suffix string, index int64) string
- func GetFiles(pathname string) (*list.List, error)
- func IntToBytes(n int64) []byte
- func Mod(val int64, bits int) int64
- func PathExists(path string) (bool, error)
- func Printstack()
- func RemoveFiles(pathname string) error
- func Warn(v ...interface{})
- func Warnf(msg string, v ...interface{})
- type DB
- type DBFactory
- type FanOutQueue
- type FileFanoutQueue
- func (q *FileFanoutQueue) Close()
- func (q *FileFanoutQueue) Dequeue(fanoutID int64) (int64, []byte, error)
- func (q *FileFanoutQueue) Enqueue(data []byte) (int64, error)
- func (q *FileFanoutQueue) FreeAllSubscribe()
- func (q *FileFanoutQueue) FreeSubscribe(fanoutID int64)
- func (q *FileFanoutQueue) IsEmpty(fanoutID int64) bool
- func (q *FileFanoutQueue) Open(dir string, queueName string, options *Options) error
- func (q *FileFanoutQueue) Peek(fanoutID int64) (int64, []byte, error)
- func (q *FileFanoutQueue) PeekAll(fanoutID int64) ([][]byte, []int64, error)
- func (q *FileFanoutQueue) PeekPagination(fanoutID int64, page, pagesize uint64) ([][]byte, []int64, error)
- func (q *FileFanoutQueue) Size(fanoutID int64) int64
- func (q *FileFanoutQueue) Skip(fanoutID int64, count int64) error
- func (q *FileFanoutQueue) Status(fanoutID int64) *QueueFilesStatus
- func (q *FileFanoutQueue) Subscribe(fanoutID int64, fn func(int64, []byte, error)) error
- type FileQueue
- func (q *FileQueue) Close() error
- func (q *FileQueue) Dequeue() (int64, []byte, error)
- func (q *FileQueue) Enqueue(data []byte) (int64, error)
- func (q *FileQueue) EnqueueAsync(data []byte, fn func(int64, error))
- func (q *FileQueue) FreeSubscribe()
- func (q *FileQueue) Gc() error
- func (q *FileQueue) IsEmpty() bool
- func (q *FileQueue) Open(dir string, queueName string, options *Options) error
- func (q *FileQueue) Peek() (int64, []byte, error)
- func (q *FileQueue) PeekAll() ([][]byte, []int64, error)
- func (q *FileQueue) PeekPagination(page, pagesize uint64) ([][]byte, []int64, error)
- func (q *FileQueue) Size() int64
- func (q *FileQueue) Skip(count int64) error
- func (q *FileQueue) Status() *QueueFilesStatus
- func (q *FileQueue) Subscribe(fn func(int64, []byte, error)) error
- type IOQueue
- type Options
- type Queue
- type QueueFileInfo
- type QueueFilesStatus
- type QueueFront
- type RemoteQueue
Constants ¶
const ( // DefaultDataPageSize data file size DefaultDataPageSize = 128 * 1024 * 1024 // DefaultIndexItemsPerPage items numbers in one page DefaultIndexItemsPerPage = 17 // MaxInt64 max value of int64 MaxInt64 = 0x7fffffffffffffff // IndexFileName file name IndexFileName = "index" // DataFileName file name DataFileName = "data" // MetaFileName file name MetaFileName = "meta_data" // FrontFileName file name FrontFileName = "front_index" Default_Page_Size = 10 )
const (
// FanoutFrontFileName Fanout FrontFileName file name
FanoutFrontFileName = "front_index_"
)
Variables ¶
var ( ErrEnqueueDataNull = errors.New("enqueue data can not be null") ErrIndexOutOfBoundTH = errors.New("index is valid which should between tail and head index") // SubscribeExistErr repeat call Subscriber method ErrSubscribeExistErr = errors.New("Subscriber alread set, can not repeat set") // Subscribe should call after queue Open method ErrSubscribeFailedNoOpenErr = errors.New("Subscriber method only support after queue opened") )
These errors can be returned when opening or calling methods on a DB.
var DefaultOptions = &Options{ DataPageSize: DefaultDataPageSize, indexPageSize: defaultIndexPageSize, IndexItemsPerPage: DefaultIndexItemsPerPage, itemsPerPage: defaultItemsPerPage, AutoGCBySeconds: 0, }
DefaultOptions default options
Functions ¶
func Assert ¶
Assert assert will panic with a given formatted message if the given condition is false.
func GetFileName ¶
GetFileName to return joined file name
func PathExists ¶
PathExists to check the target path is exist exist return true otherwise return false
func RemoveFiles ¶
RemoveFiles remove all files from current directory. not include any sub directories
Types ¶
type DB ¶
type DB struct { // If you want to read the entire database fast, you can set MmapFlag to // syscall.MAP_POPULATE on Linux 2.6.23+ for sequential read-ahead. MmapFlags int InitialMmapSize int // contains filtered or unexported fields }
DB represents a collection of buckets persisted to a file on disk. All data access is performed through transactions which can be obtained through the DB. All the functions on DB will return a ErrDatabaseNotOpen if accessed before Open() is called.
type DBFactory ¶
type DBFactory struct { InitialMmapSize int // contains filtered or unexported fields }
DBFactory is used to manupilate mulitple data files by index number
type FanOutQueue ¶
type FanOutQueue interface { // Open to open target file if failed error returns Open(dir string, queueName string, options *Options) error // IsEmpty Determines whether a queue is empty //fanoutId queue index // return ture if empty, false otherwise IsEmpty(fanoutID int64) bool // Size return avaiable queue size Size(fanoutID int64) int64 // Enqueue Append an item to the queue and return index no // if any error ocurres a non-nil error returned Enqueue(data []byte) (int64, error) // EnqueueAsync Append an item to the queue async way EnqueueAsync(data []byte, fn func(int64, error)) Dequeue(fanoutID int64) (int64, []byte, error) Peek(fanoutID int64) (int64, []byte, error) // To skip deqeue target number of items Skip(fanoutID int64, count int64) error Close() error // Set to asynchous subscribe Subscribe(fanoutID int64, fn func(int64, []byte, error)) error // to free asynchous subscribe FreeSubscribe(fanoutID int64) // FreeAllSubscribe to free all asynchous subscribe FreeAllSubscribe() }
FanOutQueue queue supports with pub-sub feature
type FileFanoutQueue ¶
type FileFanoutQueue struct {
// contains filtered or unexported fields
}
FileFanoutQueue file fanout queue implements
func (*FileFanoutQueue) Dequeue ¶
func (q *FileFanoutQueue) Dequeue(fanoutID int64) (int64, []byte, error)
Dequeue dequeue data from target fanoutID
func (*FileFanoutQueue) Enqueue ¶
func (q *FileFanoutQueue) Enqueue(data []byte) (int64, error)
Enqueue Append an item to the queue and return index no
func (*FileFanoutQueue) FreeAllSubscribe ¶
func (q *FileFanoutQueue) FreeAllSubscribe()
FreeAllSubscribe to free all subscriber
func (*FileFanoutQueue) FreeSubscribe ¶
func (q *FileFanoutQueue) FreeSubscribe(fanoutID int64)
FreeSubscribe to free subscriber by target fanout id
func (*FileFanoutQueue) IsEmpty ¶
func (q *FileFanoutQueue) IsEmpty(fanoutID int64) bool
IsEmpty test if target fanoutID is empty
func (*FileFanoutQueue) Open ¶
func (q *FileFanoutQueue) Open(dir string, queueName string, options *Options) error
Open the queue files
func (*FileFanoutQueue) Peek ¶
func (q *FileFanoutQueue) Peek(fanoutID int64) (int64, []byte, error)
Peek peek the head item from target fanoutID
func (*FileFanoutQueue) PeekAll ¶
func (q *FileFanoutQueue) PeekAll(fanoutID int64) ([][]byte, []int64, error)
// PeekAll Retrieves all the items from the front of a queue return array of data and array of index
func (*FileFanoutQueue) PeekPagination ¶
func (q *FileFanoutQueue) PeekPagination(fanoutID int64, page, pagesize uint64) ([][]byte, []int64, error)
PeekPagination to peek data from queue by paing feature.
func (*FileFanoutQueue) Size ¶
func (q *FileFanoutQueue) Size(fanoutID int64) int64
Size return item size with target fanoutID
func (*FileFanoutQueue) Skip ¶
func (q *FileFanoutQueue) Skip(fanoutID int64, count int64) error
Skip To skip deqeue target number of items
func (*FileFanoutQueue) Status ¶
func (q *FileFanoutQueue) Status(fanoutID int64) *QueueFilesStatus
Status get status info from current queue
type FileQueue ¶
type FileQueue struct {
// contains filtered or unexported fields
}
FileQueue queue implements with mapp file
func (*FileQueue) EnqueueAsync ¶
EnqueueAsync adds an item at the queue and HeadIndex will increase Asynchouous mode will call back with fn function
func (*FileQueue) Gc ¶
Gc Delete all used data files to free disk space.
BigQueue will persist enqueued data in disk files, these data files will remain even after the data in them has been dequeued later, so your application is responsible to periodically call this method to delete all used data files and free disk space.
func (*FileQueue) Peek ¶
Peek Retrieves the item at the front of a queue if item exist return with index id, item data
func (*FileQueue) PeekAll ¶
PeekAll Retrieves all the items from the front of a queue return array of data and array of index
func (*FileQueue) PeekPagination ¶
PeekPagination to peek data from queue by paing feature.
func (*FileQueue) Status ¶
func (q *FileQueue) Status() *QueueFilesStatus
Status get status info from current queue
type IOQueue ¶
type IOQueue interface { Queue // Open queue from file io info Open(dir string, queueName string, options *Options) error }
I/O queue inteface to define the all necessary functions
type Options ¶
type Options struct { // size in bytes of a data page DataPageSize int // the item count is 1 << IndexItemsPerPage IndexItemsPerPage int // if value > 0 then enable auto gc features and repeat process gc by the specified interval time in seconds. AutoGCBySeconds int // contains filtered or unexported fields }
Options the options struct
type Queue ¶
type Queue interface { // Determines whether a queue is empty // return ture if empty, false otherwise IsEmpty() bool // return avaiable queue size Size() int64 // Append an item to the queue and return index no // if any error ocurres a non-nil error returned Enqueue(data []byte) (int64, error) EnqueueAsync(data []byte, fn func(int64, error)) Dequeue() (int64, []byte, error) Peek() (int64, []byte, error) // To skip deqeue target number of items Skip(count int64) error Close() error // Delete all used data files to free disk space. Gc() error // Set to asynchous subscribe Subscribe(fn func(int64, []byte, error)) error // to free asynchous subscribe FreeSubscribe() }
Queue inteface to define the all necessary functions
type QueueFileInfo ¶
queue file info
type QueueFilesStatus ¶
type QueueFilesStatus struct { // front index of the big queue, FrontIndex int64 // head index of the array, this is the read write barrier. // readers can only read items before this index, and writes can write this index or after HeadIndex int64 // tail index of the array, // readers can't read items before this tail TailIndex int64 // head index of the data page, this is the to be appended data page index HeadDataPageIndex int64 // head offset of the data page, this is the to be appended data offset HeadDataItemOffset int64 IndexFileList []*QueueFileInfo DataFileList []*QueueFileInfo MetaFileInfo *QueueFileInfo FrontFileInfo *QueueFileInfo }
status info of queue files
type QueueFront ¶
type QueueFront struct {
// contains filtered or unexported fields
}
QueueFront queue front struct
type RemoteQueue ¶
type RemoteQueue interface { Queue // Open queue from remote server Open(serverUrl string, queueName string) }
RemoteQueue remote server queue inteface to define the all necessary functions