diskqueue

package module
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 10, 2020 License: MIT Imports: 16 Imported by: 0

README

go-diskqueue

Build Status GoDoc GitHub release

A Go package providing a filesystem-backed FIFO queue

Pulled out of https://github.com/nsqio/nsq

说明:基于go-diskqueue的改造版本

  • 去除原先depth属性
  • 去除自动轮转与自动删除的策略,为了适应某些场景需要重新读取队列
  • 增加基于时间轮转的功能,每过一个时间周期,轮转一次队列的实际存储名字
  • 此roll queue将写入记录持续不断地写入进来,并且保证数据至少被消费一遍
  • 如果调用者不手动提交删除操作,则队列不会删除,在重启后,队列里面的数据会被再消费一遍,所以说重复读是大概率事情
  • 此队列的场景是保证数据不丢,而不是保持消费一次

Documentation

Index

Constants

View Source
const (
	DEBUG = LogLevel(1)
	INFO  = LogLevel(2)
	WARN  = LogLevel(3)
	ERROR = LogLevel(4)
	FATAL = LogLevel(5)
)
View Source
const (
	DefaultName               = "timerollqueue"
	DefaultDataPath           = "."
	DefaultMaxBytesPerFile    = 100 * 1024 * 1024
	DefaultMinMsgSize         = 0
	DefaultMaxMsgSize         = 4 * 1024 * 1024
	DefaultSyncEvery          = 500
	DefaultSyncTimeout        = 2 * time.Second
	DefaultRollTimeSpanSecond = 2 * 3600
	DefaultRotationTimeSecond = 4 * 3600
	DefaultBackoffDuration    = 100 * time.Millisecond
	DefaultLimiterBatch       = 100000
	DefaultLimiterDuration    = 0 // 默认不去做流控了,实在需要在应用层自己设置呗
	QueueMetaSuffix           = ".diskqueue.meta.dat"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AppLogFunc

type AppLogFunc func(lvl LogLevel, f string, args ...interface{})

type Interface

type Interface interface {
	GetName() string
	Put([]byte) error
	// 这个是阻塞的读
	ReadChan() <-chan []byte // this is expected to be an *unbuffered* channel
	// 提供非阻塞读
	ReadNoBlock() ([]byte, bool)
	Close() error
	// 判断是否已经读完当前队列里面的消息
	ReadEnd() bool
	// 清空某个队列相关所有的相关信息
	Empty() error
	ResetReadMetaData() error
}

func New

func New(name string, dataPath string, maxBytesPerFile int64,
	minMsgSize int32, maxMsgSize int32,
	syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface

New instantiates an instance of diskQueue, retrieving metadata from the filesystem and starting the read ahead goroutine 创建磁盘队列 参数:队列名,数据路径,每个文件最大字节数,消息消息大小,最大消息大小,每多少个请求同步一次,同步耗时

type LogLevel

type LogLevel int

func (LogLevel) String

func (l LogLevel) String() string

type Options

type Options struct {
	Name               string        `json:"WALTimeRollQueue.Name"`
	DataPath           string        `json:"WALTimeRollQueue.DataPath"`
	MaxBytesPerFile    int64         `json:"WALTimeRollQueue.MaxBytesPerFile"`
	MinMsgSize         int32         `json:"WALTimeRollQueue.MinMsgSize"`
	MaxMsgSize         int32         `json:"WALTimeRollQueue.MaxMsgSize"`
	SyncEvery          int64         `json:"WALTimeRollQueue.SyncEvery"`
	SyncTimeout        time.Duration `json:"WALTimeRollQueue.SyncTimeout"`
	RollTimeSpanSecond int64         `json:"WALTimeRollQueue.RollTimeSpanSecond"` // 配置单位为s
	RotationTimeSecond int64         `json:"WALTimeRollQueue.RotationTimeSecond"` // 单位为s
	BackoffDuration    time.Duration `json:"WALTimeRollQueue.BackoffDuration"`
	LimiterBatch       int           `json:"WALTimeRollQueue.LimiterBatch"`
	LimiterDuration    time.Duration `json:"WALTimeRollQueue.LimiterDuration"`
}

func DefaultOption

func DefaultOption() *Options

type RepairProcessFunc

type RepairProcessFunc func(msg []byte) bool

repair process func

type StringSlice

type StringSlice []string

排序从大到小

func (StringSlice) Len

func (s StringSlice) Len() int

func (StringSlice) Less

func (s StringSlice) Less(i, j int) bool

func (StringSlice) Swap

func (s StringSlice) Swap(i, j int)

type WALTimeRollQueue

type WALTimeRollQueue struct {
	// 每次roll的时候要加锁
	sync.RWMutex

	// instantiation time metadata
	// 队列名字
	Name string
	// contains filtered or unexported fields
}

func (*WALTimeRollQueue) Close

func (w *WALTimeRollQueue) Close()

func (*WALTimeRollQueue) DeleteForezenBefore

func (w *WALTimeRollQueue) DeleteForezenBefore(t int64)

func (*WALTimeRollQueue) DeleteRepairs

func (w *WALTimeRollQueue) DeleteRepairs()

func (*WALTimeRollQueue) FinishRepaired added in v1.0.3

func (w *WALTimeRollQueue) FinishRepaired() bool

func (*WALTimeRollQueue) GetLeftOverDelQueueNames added in v1.2.1

func (w *WALTimeRollQueue) GetLeftOverDelQueueNames() []string

func (*WALTimeRollQueue) GetLeftOverRepairQueueNames added in v1.1.4

func (w *WALTimeRollQueue) GetLeftOverRepairQueueNames() ([]string, error)

func (*WALTimeRollQueue) GetNextRollTime

func (w *WALTimeRollQueue) GetNextRollTime() int64

func (*WALTimeRollQueue) GetNowActiveQueueName

func (w *WALTimeRollQueue) GetNowActiveQueueName() string

func (*WALTimeRollQueue) GetNowActiveRepairQueueName added in v1.1.9

func (w *WALTimeRollQueue) GetNowActiveRepairQueueName() string

func (*WALTimeRollQueue) GetRepairQueueNames added in v1.2.0

func (w *WALTimeRollQueue) GetRepairQueueNames() []string

func (*WALTimeRollQueue) Put

func (w *WALTimeRollQueue) Put(msg []byte) error

func (*WALTimeRollQueue) ReadMsg

func (w *WALTimeRollQueue) ReadMsg() ([]byte, bool)

func (*WALTimeRollQueue) ResetRepairs

func (w *WALTimeRollQueue) ResetRepairs()

func (*WALTimeRollQueue) Roll

func (w *WALTimeRollQueue) Roll()

并发安全的,多个进程同时发起Roll,那么只有一个会成功

func (*WALTimeRollQueue) SetRepairProcessFunc added in v1.0.3

func (w *WALTimeRollQueue) SetRepairProcessFunc(rpf func(msg []byte) bool)

func (*WALTimeRollQueue) Start

func (w *WALTimeRollQueue) Start() error

func (*WALTimeRollQueue) Stats added in v1.1.4

type WALTimeRollQueueI

type WALTimeRollQueueI interface {
	// 生产消息
	Put([]byte) error
	// 启动, 一旦启动会开启一个新的activeQueue,并且从磁盘中获取所有的队列,将其放入repair列表
	Start() error

	ReadMsg() ([]byte, bool)
	// 关闭:关闭activeQueue并且同步磁盘
	Close()

	Stats() *WALTimeRollQueueStats

	SetRepairProcessFunc(rpf func(msg []byte) bool)
	// 是否修复完成
	FinishRepaired() bool
	// 删除某个时间戳之前的冷冻队列
	DeleteForezenBefore(t int64)
	// 删除repair队列
	DeleteRepairs()
}

func NewTimeRollQueue

func NewTimeRollQueue(log AppLogFunc, options *Options) WALTimeRollQueueI

type WALTimeRollQueueStats added in v1.1.4

type WALTimeRollQueueStats struct {
	Name                   string `json:"Name"`
	ForezenQueuesNum       int    `json:"ForezenQueuesNum"`
	RepairQueueNamesNum    int    `json:"RepairQueueNamesNum"`
	LeftOverRepairQueueNum int    `json:"LeftOverRepairQueueNum"`
	LeftOverDelQueueNum    int    `json:"LeftOverDelQueueNum"`
	RepairCount            int64  `json:"RepairCount"`
	RepairFinished         bool   `json:"RepairFinished"`
}

Directories

Path Synopsis
test

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL