durablequeue

package
v2.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 16, 2022 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultSegmentSize = 10 * 1024 * 1024
)
View Source
const MaxWritesPending = 1024

MaxWritesPending is the number of writes that can be pending at any given time.

Variables

View Source
var (
	ErrNotOpen      = fmt.Errorf("queue not open")
	ErrQueueFull    = fmt.Errorf("queue is full")
	ErrQueueBlocked = fmt.Errorf("queue is blocked")
	ErrSegmentFull  = fmt.Errorf("segment is full")
)

Possible errors returned by a queue.

Functions

This section is empty.

Types

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is a bounded, disk-backed, append-only type that combines Queue and log semantics. byte slices can be appended and read back in-order. The Queue maintains a pointer to the current Head byte slice and can re-read from the Head until it has been advanced.

Internally, the Queue writes byte slices to multiple segment files so that disk space can be reclaimed. When a segment file is larger than the max segment size, a new file is created. Segments are removed after their Head pointer has advanced past the last entry. The first segment is the head, and the last segment is the tail. Reads are from the head segment and writes tail segment.

queues can have a max size configured such that when the size of all segments on disk exceeds the size, write will fail.

┌─────┐
│Head │
├─────┘
│
▼
┌─────────────────┐ ┌─────────────────┐┌─────────────────┐
│Segment 1 - 10MB │ │Segment 2 - 10MB ││Segment 3 - 10MB │
└─────────────────┘ └─────────────────┘└─────────────────┘
                                                         ▲
                                                         │
                                                         │
                                                    ┌─────┐
                                                    │Tail │
                                                    └─────┘

func NewQueue

func NewQueue(dir string, maxSize int64, maxSegmentSize int64, queueTotalSize *SharedCount, depth int, verifyBlockFn func([]byte) error) (*Queue, error)

NewQueue create a Queue that will store segments in dir and that will consume no more than maxSize on disk.

func (*Queue) Advance

func (l *Queue) Advance() error

Advance moves the Head point to the next byte slice in the queue.

func (*Queue) Append

func (l *Queue) Append(b []byte) error

Append appends a byte slice to the end of the queue.

func (*Queue) Close

func (l *Queue) Close() error

Close stops the queue for reading and writing.

func (*Queue) Current

func (l *Queue) Current() ([]byte, error)

Current returns the current byte slice at the Head of the queue.

func (*Queue) Dir added in v2.2.0

func (l *Queue) Dir() string

Dir returns the directory associated with the queue.

func (*Queue) DiskUsage added in v2.2.0

func (l *Queue) DiskUsage() int64

DiskUsage returns the total size on disk used by the Queue.

func (*Queue) Empty

func (l *Queue) Empty() bool

Empty returns whether the queue's underlying segments are empty.

func (*Queue) LastModified

func (l *Queue) LastModified() (time.Time, error)

LastModified returns the last time the queue was modified.

func (*Queue) NewScanner

func (l *Queue) NewScanner() (Scanner, error)

func (*Queue) Open

func (l *Queue) Open() error

Open opens the queue for reading and writing.

func (*Queue) PeekN

func (l *Queue) PeekN(n int) ([][]byte, error)

Peek returns the next n byte slices at the Head of the queue.

func (*Queue) Position

func (l *Queue) Position() (*QueuePos, error)

func (*Queue) PurgeOlderThan

func (l *Queue) PurgeOlderThan(when time.Time) error

func (*Queue) Remove

func (l *Queue) Remove() error

Remove removes all underlying file-based resources for the queue. It is an error to call this on an open queue.

func (*Queue) RemoveSegments

func (l *Queue) RemoveSegments() error

RemoveSegments removes all segments for the queue. It is an error to call this on an open queue.

func (*Queue) SetMaxSegmentSize

func (l *Queue) SetMaxSegmentSize(size int64) error

SetMaxSegmentSize updates the max segment size for new and existing (tail) segments.

The new segment size must be less than half the current max queue size, otherwise an error will be returned.

func (*Queue) SetMaxSize

func (l *Queue) SetMaxSize(maxSize int64) error

SetMaxSize updates the max queue size to the passed-in value.

Max queue size must be at least twice the current max segment size, otherwise an error will be returned.

If the new value is smaller than the amount of data currently in the queue, writes will be rejected until the queue drains to below the new maximum.

func (*Queue) TotalBytes

func (l *Queue) TotalBytes() int64

TotalBytes returns the number of bytes of data remaining in the queue.

func (*Queue) TotalSegments

func (l *Queue) TotalSegments() int

TotalSegments determines how many segments the current Queue is utilising. Empty segments at the end of the Queue are not counted.

func (*Queue) WithLogger

func (l *Queue) WithLogger(log *zap.Logger)

WithLogger sets the internal logger to the logger passed in.

type QueuePos

type QueuePos struct {
	Head string
	Tail string
}

type Scanner

type Scanner interface {
	// Next returns the current block and advances the scanner to the next block.
	Next() bool

	// Err returns any non io.EOF error as a result of calling the Next function.
	Err() error

	// Bytes returns the most recent block generated by a call to Next. A new buffer
	// is generated with each call to Next, so the buffer may be retained by the caller.
	Bytes() []byte

	// Advance moves the head pointer to the next byte slice in the queue.
	// Advance is guaranteed to make forward progress and is idempotent.
	Advance() (int64, error)
}

type SharedCount

type SharedCount struct {
	// contains filtered or unexported fields
}

SharedCount manages an integer value, which can be read/written concurrently.

func (*SharedCount) Add

func (sc *SharedCount) Add(delta int64)

Add adds delta to the counter value.

func (*SharedCount) Value

func (sc *SharedCount) Value() int64

Value returns the current value value.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL