Documentation ¶
Index ¶
- Constants
- Variables
- type Queue
- func (l *Queue) Advance() error
- func (l *Queue) Append(b []byte) error
- func (l *Queue) Close() error
- func (l *Queue) Current() ([]byte, error)
- func (l *Queue) Dir() string
- func (l *Queue) DiskUsage() int64
- func (l *Queue) Empty() bool
- func (l *Queue) LastModified() (time.Time, error)
- func (l *Queue) NewScanner() (Scanner, error)
- func (l *Queue) Open() error
- func (l *Queue) PeekN(n int) ([][]byte, error)
- func (l *Queue) Position() (*QueuePos, error)
- func (l *Queue) PurgeOlderThan(when time.Time) error
- func (l *Queue) Remove() error
- func (l *Queue) RemoveSegments() error
- func (l *Queue) SetMaxSegmentSize(size int64) error
- func (l *Queue) SetMaxSize(maxSize int64) error
- func (l *Queue) TotalBytes() int64
- func (l *Queue) TotalSegments() int
- func (l *Queue) WithLogger(log *zap.Logger)
- type QueuePos
- type Scanner
- type SharedCount
Constants ¶
const (
DefaultSegmentSize = 10 * 1024 * 1024
)
const MaxWritesPending = 1024
MaxWritesPending is the number of writes that can be pending at any given time.
Variables ¶
var ( ErrNotOpen = fmt.Errorf("queue not open") ErrQueueFull = fmt.Errorf("queue is full") ErrQueueBlocked = fmt.Errorf("queue is blocked") ErrSegmentFull = fmt.Errorf("segment is full") )
Possible errors returned by a queue.
Functions ¶
This section is empty.
Types ¶
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is a bounded, disk-backed, append-only type that combines Queue and log semantics. byte slices can be appended and read back in-order. The Queue maintains a pointer to the current Head byte slice and can re-read from the Head until it has been advanced.
Internally, the Queue writes byte slices to multiple segment files so that disk space can be reclaimed. When a segment file is larger than the max segment size, a new file is created. Segments are removed after their Head pointer has advanced past the last entry. The first segment is the head, and the last segment is the tail. Reads are from the head segment and writes tail segment.
queues can have a max size configured such that when the size of all segments on disk exceeds the size, write will fail.
┌─────┐ │Head │ ├─────┘ │ ▼ ┌─────────────────┐ ┌─────────────────┐┌─────────────────┐ │Segment 1 - 10MB │ │Segment 2 - 10MB ││Segment 3 - 10MB │ └─────────────────┘ └─────────────────┘└─────────────────┘ ▲ │ │ ┌─────┐ │Tail │ └─────┘
func NewQueue ¶
func NewQueue(dir string, maxSize int64, maxSegmentSize int64, queueTotalSize *SharedCount, depth int, verifyBlockFn func([]byte) error) (*Queue, error)
NewQueue create a Queue that will store segments in dir and that will consume no more than maxSize on disk.
func (*Queue) DiskUsage ¶ added in v2.2.0
DiskUsage returns the total size on disk used by the Queue.
func (*Queue) LastModified ¶
LastModified returns the last time the queue was modified.
func (*Queue) NewScanner ¶
func (*Queue) Remove ¶
Remove removes all underlying file-based resources for the queue. It is an error to call this on an open queue.
func (*Queue) RemoveSegments ¶
RemoveSegments removes all segments for the queue. It is an error to call this on an open queue.
func (*Queue) SetMaxSegmentSize ¶
SetMaxSegmentSize updates the max segment size for new and existing (tail) segments.
The new segment size must be less than half the current max queue size, otherwise an error will be returned.
func (*Queue) SetMaxSize ¶
SetMaxSize updates the max queue size to the passed-in value.
Max queue size must be at least twice the current max segment size, otherwise an error will be returned.
If the new value is smaller than the amount of data currently in the queue, writes will be rejected until the queue drains to below the new maximum.
func (*Queue) TotalBytes ¶
TotalBytes returns the number of bytes of data remaining in the queue.
func (*Queue) TotalSegments ¶
TotalSegments determines how many segments the current Queue is utilising. Empty segments at the end of the Queue are not counted.
func (*Queue) WithLogger ¶
WithLogger sets the internal logger to the logger passed in.
type Scanner ¶
type Scanner interface { // Next returns the current block and advances the scanner to the next block. Next() bool // Err returns any non io.EOF error as a result of calling the Next function. Err() error // Bytes returns the most recent block generated by a call to Next. A new buffer // is generated with each call to Next, so the buffer may be retained by the caller. Bytes() []byte // Advance moves the head pointer to the next byte slice in the queue. // Advance is guaranteed to make forward progress and is idempotent. Advance() (int64, error) }
type SharedCount ¶
type SharedCount struct {
// contains filtered or unexported fields
}
SharedCount manages an integer value, which can be read/written concurrently.
func (*SharedCount) Add ¶
func (sc *SharedCount) Add(delta int64)
Add adds delta to the counter value.
func (*SharedCount) Value ¶
func (sc *SharedCount) Value() int64
Value returns the current value value.