Documentation ¶
Index ¶
- Constants
- type AppLogFunc
- type Interface
- type LogLevel
- type Options
- type RepairProcessFunc
- type StringSlice
- type WALTimeRollQueue
- func (w *WALTimeRollQueue) Close()
- func (w *WALTimeRollQueue) DeleteForezenBefore(t int64)
- func (w *WALTimeRollQueue) DeleteRepairs()
- func (w *WALTimeRollQueue) FinishRepaired() bool
- func (w *WALTimeRollQueue) GetLeftOverDelQueueNames() []string
- func (w *WALTimeRollQueue) GetLeftOverRepairQueueNames() ([]string, error)
- func (w *WALTimeRollQueue) GetNextRollTime() int64
- func (w *WALTimeRollQueue) GetNowActiveQueueName() string
- func (w *WALTimeRollQueue) GetNowActiveRepairQueueName() string
- func (w *WALTimeRollQueue) GetRepairQueueNames() []string
- func (w *WALTimeRollQueue) Put(msg []byte) error
- func (w *WALTimeRollQueue) ReadMsg() ([]byte, bool)
- func (w *WALTimeRollQueue) ResetRepairs()
- func (w *WALTimeRollQueue) Roll()
- func (w *WALTimeRollQueue) SetRepairProcessFunc(rpf func(msg []byte) bool)
- func (w *WALTimeRollQueue) Start() error
- func (w *WALTimeRollQueue) Stats() *WALTimeRollQueueStats
- type WALTimeRollQueueI
- type WALTimeRollQueueStats
Constants ¶
View Source
const ( DEBUG = LogLevel(1) INFO = LogLevel(2) WARN = LogLevel(3) ERROR = LogLevel(4) FATAL = LogLevel(5) )
View Source
const ( DefaultName = "timerollqueue" DefaultDataPath = "." DefaultMaxBytesPerFile = 100 * 1024 * 1024 DefaultMinMsgSize = 0 DefaultMaxMsgSize = 4 * 1024 * 1024 DefaultSyncEvery = 500 DefaultSyncTimeout = 2 * time.Second DefaultRollTimeSpanSecond = 2 * 3600 DefaultRotationTimeSecond = 4 * 3600 DefaultBackoffDuration = 100 * time.Millisecond DefaultLimiterBatch = 100000 DefaultLimiterDuration = 0 // 默认不去做流控了,实在需要在应用层自己设置呗 QueueMetaSuffix = ".diskqueue.meta.dat" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AppLogFunc ¶
type Interface ¶
type Interface interface { GetName() string Put([]byte) error // 这个是阻塞的读 ReadChan() <-chan []byte // this is expected to be an *unbuffered* channel // 提供非阻塞读 ReadNoBlock() ([]byte, bool) Close() error // 判断是否已经读完当前队列里面的消息 ReadEnd() bool // 清空某个队列相关所有的相关信息 Empty() error ResetReadMetaData() error }
func New ¶
func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface
New instantiates an instance of diskQueue, retrieving metadata from the filesystem and starting the read ahead goroutine 创建磁盘队列 参数:队列名,数据路径,每个文件最大字节数,消息消息大小,最大消息大小,每多少个请求同步一次,同步耗时
type Options ¶
type Options struct { Name string `json:"WALTimeRollQueue.Name"` DataPath string `json:"WALTimeRollQueue.DataPath"` MaxBytesPerFile int64 `json:"WALTimeRollQueue.MaxBytesPerFile"` MinMsgSize int32 `json:"WALTimeRollQueue.MinMsgSize"` MaxMsgSize int32 `json:"WALTimeRollQueue.MaxMsgSize"` SyncEvery int64 `json:"WALTimeRollQueue.SyncEvery"` SyncTimeout time.Duration `json:"WALTimeRollQueue.SyncTimeout"` RollTimeSpanSecond int64 `json:"WALTimeRollQueue.RollTimeSpanSecond"` // 配置单位为s RotationTimeSecond int64 `json:"WALTimeRollQueue.RotationTimeSecond"` // 单位为s BackoffDuration time.Duration `json:"WALTimeRollQueue.BackoffDuration"` LimiterBatch int `json:"WALTimeRollQueue.LimiterBatch"` LimiterDuration time.Duration `json:"WALTimeRollQueue.LimiterDuration"` }
func DefaultOption ¶
func DefaultOption() *Options
type StringSlice ¶
type StringSlice []string
排序从大到小
func (StringSlice) Len ¶
func (s StringSlice) Len() int
func (StringSlice) Less ¶
func (s StringSlice) Less(i, j int) bool
func (StringSlice) Swap ¶
func (s StringSlice) Swap(i, j int)
type WALTimeRollQueue ¶
type WALTimeRollQueue struct { // 每次roll的时候要加锁 sync.RWMutex // instantiation time metadata // 队列名字 Name string // contains filtered or unexported fields }
func (*WALTimeRollQueue) Close ¶
func (w *WALTimeRollQueue) Close()
func (*WALTimeRollQueue) DeleteForezenBefore ¶
func (w *WALTimeRollQueue) DeleteForezenBefore(t int64)
func (*WALTimeRollQueue) DeleteRepairs ¶
func (w *WALTimeRollQueue) DeleteRepairs()
func (*WALTimeRollQueue) FinishRepaired ¶ added in v1.0.3
func (w *WALTimeRollQueue) FinishRepaired() bool
func (*WALTimeRollQueue) GetLeftOverDelQueueNames ¶ added in v1.2.1
func (w *WALTimeRollQueue) GetLeftOverDelQueueNames() []string
func (*WALTimeRollQueue) GetLeftOverRepairQueueNames ¶ added in v1.1.4
func (w *WALTimeRollQueue) GetLeftOverRepairQueueNames() ([]string, error)
func (*WALTimeRollQueue) GetNextRollTime ¶
func (w *WALTimeRollQueue) GetNextRollTime() int64
func (*WALTimeRollQueue) GetNowActiveQueueName ¶
func (w *WALTimeRollQueue) GetNowActiveQueueName() string
func (*WALTimeRollQueue) GetNowActiveRepairQueueName ¶ added in v1.1.9
func (w *WALTimeRollQueue) GetNowActiveRepairQueueName() string
func (*WALTimeRollQueue) GetRepairQueueNames ¶ added in v1.2.0
func (w *WALTimeRollQueue) GetRepairQueueNames() []string
func (*WALTimeRollQueue) Put ¶
func (w *WALTimeRollQueue) Put(msg []byte) error
func (*WALTimeRollQueue) ReadMsg ¶
func (w *WALTimeRollQueue) ReadMsg() ([]byte, bool)
func (*WALTimeRollQueue) ResetRepairs ¶
func (w *WALTimeRollQueue) ResetRepairs()
func (*WALTimeRollQueue) SetRepairProcessFunc ¶ added in v1.0.3
func (w *WALTimeRollQueue) SetRepairProcessFunc(rpf func(msg []byte) bool)
func (*WALTimeRollQueue) Start ¶
func (w *WALTimeRollQueue) Start() error
func (*WALTimeRollQueue) Stats ¶ added in v1.1.4
func (w *WALTimeRollQueue) Stats() *WALTimeRollQueueStats
type WALTimeRollQueueI ¶
type WALTimeRollQueueI interface { // 生产消息 Put([]byte) error // 启动, 一旦启动会开启一个新的activeQueue,并且从磁盘中获取所有的队列,将其放入repair列表 Start() error ReadMsg() ([]byte, bool) // 关闭:关闭activeQueue并且同步磁盘 Close() Stats() *WALTimeRollQueueStats SetRepairProcessFunc(rpf func(msg []byte) bool) // 是否修复完成 FinishRepaired() bool // 删除某个时间戳之前的冷冻队列 DeleteForezenBefore(t int64) // 删除repair队列 DeleteRepairs() }
func NewTimeRollQueue ¶
func NewTimeRollQueue(log AppLogFunc, options *Options) WALTimeRollQueueI
type WALTimeRollQueueStats ¶ added in v1.1.4
type WALTimeRollQueueStats struct { Name string `json:"Name"` ForezenQueuesNum int `json:"ForezenQueuesNum"` RepairQueueNamesNum int `json:"RepairQueueNamesNum"` LeftOverRepairQueueNum int `json:"LeftOverRepairQueueNum"` LeftOverDelQueueNum int `json:"LeftOverDelQueueNum"` RepairCount int64 `json:"RepairCount"` RepairFinished bool `json:"RepairFinished"` }
Click to show internal directories.
Click to hide internal directories.