qringbuf

v0.0.0-...-95833b0
Note: this package is not in the latest version of its module.

Published: Aug 26, 2024 License: Apache-2.0, MIT Imports: 7 Imported by: 1

README

q(uantized)ringbuf(fer)

A thread-safe variant of a classic circular buffer with a double-twist

qringbuf is a circular buffer variant, similar to (but not a derivative of) Bip Buffer and spsc-bip-buffer/bbqueue. It provides a concurrency-friendly, zero-copy abstraction of io.ReadAtLeast() over a pre-allocated ring-buffer, populated asynchronously by a standalone goroutine. Refer to the implementation-notes diagrams to get a quick overview of how this works in practice.

This library is primarily designed for processing a series of arbitrary streams, each consisting of variable-length records. Refer to the Examples and to the StartFill(…) / NextRegion(…) combo for a crash course in usage patterns.

Documentation

https://pkg.go.dev/github.com/ipfs/go-qringbuf

Lead Maintainer

Peter Rabbitson

License

SPDX-License-Identifier: Apache-2.0 OR MIT

Documentation

Overview

Package qringbuf provides a concurrency-friendly, zero-copy abstraction of io.ReadAtLeast(…) over a pre-allocated ring-buffer, populated asynchronously by a standalone goroutine. It is primarily designed for processing a series of consecutive sub-streams from a single io.Reader, each sub-stream in turn consisting of variable-length records.

The buffer object DOES NOT ASSUME exclusive ownership of the supplied io.Reader, never reads more than instructed by an argument to StartFill(…), and exposes a standard sync.Mutex interface, which allows pausing all operations whenever exclusive access to the underlying Reader is needed.

Examples

In all cases below, the background "collector" goroutine reading from the enclosed someIoReader into the ring buffer is guaranteed to:

  • never overwrite the buffer portion backing the latest result of NextRegion(…)
  • never overwrite any buffer portion backing a Reserve()d (Sub)Region

In code the basic usage looks roughly like this (error/flow handling elided):

qrb, initErr := qringbuf.NewFromReader(someIoReader, qringbuf.Config{…})
…
var nextSubstreamSize int64
for {
	sizeErr := binary.Read(
		someIoReader,
		binary.BigEndian,
		&nextSubstreamSize,
	)
	…
	startErr := qrb.StartFill(nextSubstreamSize)
	…
	var available, processed int
	for {
		// Reevaluate available and processed from the *previous* round,
		// indicating how much has NOT been processed, and needs re-serving
		// Note: one MUST advance the stream by at least a single byte
		reg, streamErr := qrb.NextRegion(available - processed)

		// io.EOF only means the collector stopped: there could be up to
		// BufferSize() bytes remaining in the buffer. Loop until reg is nil
		if streamErr != nil && streamErr != io.EOF {
			return streamErr
		} else if reg == nil {
			break
		}

		// Work with region, processing all or just a portion of the data
		available = reg.Size()
		processed = frobnicate(reg.Bytes(), …)
	}
}

In addition one can operate over individual (sub)regions with "fearless concurrency":

…
var available, processed int
for {
	// Reevaluate available and processed from the *previous* round,
	// indicating how much has NOT been processed, and needs re-serving
	// Note: one MUST advance the stream by at least a single byte
	reg, streamErr := qrb.NextRegion(available - processed)

	// io.EOF only means the collector stopped: there could be up to
	// BufferSize() bytes remaining in the buffer. Loop until reg is nil
	if streamErr != nil && streamErr != io.EOF {
		return streamErr
	} else if reg == nil {
		break
	}

	available = reg.Size()
	if available > 256 {
		reg = reg.SubRegion(0, 256)
	}

	reg.Reserve()
	processed = reg.Size()

	go func() {
		frobnicate(reg.Bytes(), …)
		reg.Release()
	}()
}
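The loop above does not show how to wait for the spawned workers, for example before returning from the enclosing function. One conventional approach is sync.WaitGroup. The sketch below applies it to a plain byte slice split into pieces of at most 256 bytes, mirroring the SubRegion(0, 256) step; the byte sum merely stands in for the elided frobnicate(…). With qringbuf each goroutine would additionally call Release() on its reserved *Region when done.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const maxChunk = 256 // mirrors reg.SubRegion(0, 256) in the loop above

// processConcurrently hands out chunks of at most maxChunk bytes to
// goroutines and blocks until all of them have finished.
func processConcurrently(data []byte) (chunks int, sum uint64) {
	var wg sync.WaitGroup
	var total atomic.Uint64
	for len(data) > 0 {
		n := len(data)
		if n > maxChunk {
			n = maxChunk
		}
		chunk := data[:n]
		data = data[n:]
		chunks++

		wg.Add(1)
		go func() {
			defer wg.Done() // with qringbuf, reg.Release() belongs here too
			var s uint64
			for _, b := range chunk {
				s += uint64(b)
			}
			total.Add(s)
		}()
	}
	wg.Wait()
	return chunks, total.Load()
}

func main() {
	data := make([]byte, 600)
	for i := range data {
		data[i] = 1
	}
	fmt.Println(processConcurrently(data)) // 3 600
}
```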

Implementation notes

The specific technical guarantees made by an object of this package are:

  • Memory for incoming data is allocated only at construction time, never during streaming
  • StartFill(…) spawns one (and only one) goroutine (the collector), which terminates when:
      ◦ it reaches readLimit, if one was supplied to StartFill(…)
      ◦ it receives any error from the wrapped reader (including io.EOF or os.ErrClosed)
  • Every call to NextRegion(…) blocks until it can return one of:
      ◦ ( *Region object representing a contiguous slice of at least MinRegion bytes, nil )
      ◦ ( *Region object representing all remaining data, any underlying error including io.EOF )
      ◦ ( nil when there is no data remaining in the buffer, any underlying error including io.EOF )
  • *Region.Bytes() is always a slice of the underlying buffer; no data copying takes place
  • Data backing a *Region is guaranteed to remain available (not be overwritten), provided either:
      ◦ NextRegion(…) has not been called again, which would allow further writes into the buffer, or
      ◦ *Region.Reserve() was invoked, which blocks writes until a subsequent *Region.Release()

Unlike io.ReadAtLeast(…), errors from the underlying reader are always surfaced through NextRegion(…). As with the Read(…) method of the standard io.Reader interface, an error can be returned together with a result. One should always check whether the *Region return value is nil first, before processing the error. See the documentation of io.Reader for an extended discussion.

Changes to the NextRegion(…) "emitter" and collector positions are protected by a mutex on the qringbuf object. Calls modifying the buffer state block until this lock can be obtained. The same mutex is exposed as part of the API, so one can pause the collector when a direct read and/or skip on the underlying io.Reader is needed.

The *Region.{Reserve/Release}() functionality does not use the mutex, ensuring that an asynchronous Release() call cannot be affected by the current state of the buffer. Reservation tracking is implemented as an atomically modified list of reservation counts, one int32 per SectorSize bytes of the buffer.

The reservation system explicitly allows "recursive locking": you can hold an arbitrary number of reservations over a sector by repeatedly creating SubRegion(…) objects. Care must be taken to release every single reservation obtained previously, otherwise the collector will remain blocked forever.
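A minimal sketch of the reservation scheme described above, assuming nothing beyond what the text states: one int32 counter per SectorSize bytes, modified with sync/atomic and never guarded by the mutex, with nested reservations simply stacking on the counter. The type and method names are hypothetical; the real qringbuf internals are unexported and may differ.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// reservations is a hypothetical occupancy table: one counter per sector.
type reservations struct {
	sectorSize int
	counts     []int32
}

func newReservations(bufferSize, sectorSize int) *reservations {
	n := (bufferSize + sectorSize - 1) / sectorSize // round up
	return &reservations{sectorSize: sectorSize, counts: make([]int32, n)}
}

// sectors returns the inclusive range of sector indexes covering
// buffer bytes [offset, offset+length). length must be > 0.
func (r *reservations) sectors(offset, length int) (first, last int) {
	return offset / r.sectorSize, (offset + length - 1) / r.sectorSize
}

func (r *reservations) reserve(offset, length int) {
	first, last := r.sectors(offset, length)
	for i := first; i <= last; i++ {
		atomic.AddInt32(&r.counts[i], 1) // recursive reservations just stack
	}
}

func (r *reservations) release(offset, length int) {
	first, last := r.sectors(offset, length)
	for i := first; i <= last; i++ {
		if atomic.AddInt32(&r.counts[i], -1) < 0 {
			panic("release without matching reserve")
		}
	}
}

// writable reports whether a collector could overwrite [offset, offset+length).
func (r *reservations) writable(offset, length int) bool {
	first, last := r.sectors(offset, length)
	for i := first; i <= last; i++ {
		if atomic.LoadInt32(&r.counts[i]) != 0 {
			return false
		}
	}
	return true
}

func main() {
	res := newReservations(64, 4) // BufferSize 64, SectorSize 4
	res.reserve(18, 4)            // bytes 18..21, as in step ③ of the diagrams below
	fmt.Println(res.writable(0, 16), res.writable(16, 8)) // true false
	res.release(18, 4)
	fmt.Println(res.writable(16, 8)) // true
}
```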

What follows is an illustration of a contrived lifecycle of a hypothetical qringbuf object initialized with:

qringbuf.Config{
	BufferSize: 64,
	MinRegion:  16,
	MinRead:    8,
	MaxCopy:    24,
}

Note that for brevity THE DIAGRAMS BELOW ARE DECIDEDLY NOT REPRESENTATIVE of a typical lifecycle. Normally BufferSize is an order of magnitude larger than MinRegion and MaxCopy, and the time spent waiting and copying data is insignificant in relation to all other possible states. Also outstanding async reservations typically trail the emitter very closely, so after a wrap the collector is virtually never blocked, contrary to what is depicted below. Instead the diagrams merely demonstrate the choices this library makes dealing with the "tricky parts" of maintaining the illusion of an arbitrary stream of contiguous bytes.

† C is the collector position: the *end* of the most recent read from the underlying io.Reader
  E is the emitter position: the *start* of the most recently returned NextRegion(…)
  W is the last value of "E" before a wrap took place, 0 otherwise

⓪ Buffer initialized, StartFill(…) is called.
     ╆━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╅
     C=0                                                             ┃
     E=0                                                             ┃
     ╄━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╃
     |0        |10       |20       |30       |40       |50       |60

① NextRegion(0) is blocked until MinRegion of 16 is available,
   fill in progress
     ╆━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╅
     cccccccccc|C=10                                                 ┃
     E=0                                                             ┃
     ╄━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╃
     |0        |10       |20       |30       |40       |50       |60

② NextRegion(0) returned the first 30 bytes when it could,
   collector keeps reading further
     ╆━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╅
     eeeeeeeeeeeeeeeeeeeeeeeeeeeeeecccccccccc|C=40                   ┃
     E=0==========================<                                  ┃
     ╄━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╃
     |0        |10       |20       |30       |40       |50       |60

③ User reserves subRegion 18~21 for async workers, recycles last 6 of the
   30 bytes, NextRegion(6) returns 17 bytes available at the time, 23 total.
   Collector keeps reading, until it can no longer satisfy MinRead
     ╆━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╅
     ┋                 RRRR  eeeeeeeeeeeeeeeeeeeeeeecccccccccccc|C=59┃
     ┋                 RRRR  E=24==================<                 ┃
     ╄━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╃
     |0        |10       |20       |30       |40       |50       |60

④ User recycles last 6 bytes, NextRegion(6) serves the remaining 18 bytes
   Collector now can satisfy MaxCopy, and copies everything over,
   repositioning the emitter index. It then blocks, as it can't write
   past the not-yet released reservation.
     ╆━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╅
     W=41wwwwwwwwwwwwww|C=18                  eeeeeeeeeeeeeeeeee|    ┃
     E=0               RRRR                   W=41=============<|    ┃
     ╄━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╃
     |0        |10       |20       |30       |40       |50       |60

⑤ The async job finishes, reservation is released, collector can now
   advance further, and blocks again as NextRegion(…) has not been called
   meaning the last 18 bytes are still being processed.
     ╆━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╅
     W=41wwwwwwwwwwwwwwccccccccccccccccccccccc|C=41|eeeeeeeeeeee|    ┃
     E=0                                      W=41=============<|    ┃
     ╄━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╃
     |0        |10       |20       |30       |40       |50       |60

⑥ User recycles 4 bytes, NextRegion(4) serves available 27 bytes, and
   the cycle continues from the top until error or EOF
     ╆━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╅
     ┋             wwwwccccccccccccccccccccccc|C=41                  ┃
     ┋             E=14======================<|                      ┃
     ╄━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╃
     |0        |10       |20       |30       |40       |50       |60
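The emitter arithmetic in steps ③, ④ and ⑥ can be checked with a few lines of Go. The sketch assumes only what the diagrams state: a NextRegion(k) call starts k bytes before the end of the previous region and, absent a wrap, is served everything up to the collector position C. nextRegionBounds is a hypothetical helper; wrap handling and the very first NextRegion(0) call are not modeled.

```go
package main

import "fmt"

// nextRegionBounds returns where the next region starts and how many bytes
// it covers, given the previous region, the requested remainder and the
// collector position c at the time of the call.
func nextRegionBounds(prevStart, prevSize, remainder, c int) (start, size int) {
	if remainder < 0 || remainder >= prevSize {
		panic("NextRegion must advance the stream by at least one byte")
	}
	start = prevStart + prevSize - remainder
	return start, c - start
}

func main() {
	// Step ③: previous region 0..29, recycle 6; the 23-byte total implies C=47
	fmt.Println(nextRegionBounds(0, 30, 6, 47)) // 24 23
	// Step ④: previous region 24..46, recycle 6, C=59
	fmt.Println(nextRegionBounds(24, 23, 6, 59)) // 41 18
	// Step ⑥: after the wrap the 18 bytes sit at 0..17, recycle 4, C=41
	fmt.Println(nextRegionBounds(0, 18, 4, 41)) // 14 27
}
```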


Types

type Config

type Config struct {
	MinRegion   int // [1:…] Any result of NextRegion(…) is guaranteed to be at least that many bytes long, except at EOF
	MinRead     int // [1:MinRegion] Do not read data from io.Reader until buffer has space to store that many bytes
	BufferSize  int // [MinRead+2*MinRegion:…] Size of the allocated buffer in bytes
	MaxCopy     int // [MinRegion:BufferSize/2] Delay "wrap around" until amount of data to copy falls under this threshold
	SectorSize  int // [1:BufferSize/3] Size of each occupancy sector for *Region.{Reserve|Release}() tracking
	Stats       *Stats
	TrackTiming bool // When a Stats structure is provided, record timing of operations in addition to counts
}

Config is the structure of options expected at initialization time.
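The bracketed ranges in the field comments can be read as constraints between fields. The sketch below transcribes them into a hypothetical checkConfig function over a local copy of the size-related fields (Stats and TrackTiming omitted). Reading each [lo:hi] as inclusive on both ends and "…" as unbounded is an assumption; NewFromReader(…) remains the authority on which configurations it accepts.

```go
package main

import (
	"fmt"
	"math"
)

// config mirrors the size-related fields of qringbuf.Config.
type config struct {
	MinRegion, MinRead, BufferSize, MaxCopy, SectorSize int
}

// checkConfig is a hypothetical transcription of the documented ranges,
// reading each [lo:hi] as inclusive and "…" as unbounded.
func checkConfig(c config) error {
	rules := []struct {
		name      string
		v, lo, hi int
	}{
		{"MinRegion", c.MinRegion, 1, math.MaxInt},
		{"MinRead", c.MinRead, 1, c.MinRegion},
		{"BufferSize", c.BufferSize, c.MinRead + 2*c.MinRegion, math.MaxInt},
		{"MaxCopy", c.MaxCopy, c.MinRegion, c.BufferSize / 2},
		{"SectorSize", c.SectorSize, 1, c.BufferSize / 3},
	}
	for _, r := range rules {
		if r.v < r.lo || r.v > r.hi {
			return fmt.Errorf("%s=%d outside [%d:%d]", r.name, r.v, r.lo, r.hi)
		}
	}
	return nil
}

func main() {
	// Values from the implementation-notes example; that example leaves
	// SectorSize unset, so 4 is an arbitrary choice here.
	c := config{MinRegion: 16, MinRead: 8, BufferSize: 64, MaxCopy: 24, SectorSize: 4}
	fmt.Println(checkConfig(c)) // <nil>
}
```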

type QuantizedRingBuffer

type QuantizedRingBuffer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewFromReader

func NewFromReader(
	reader io.Reader,
	cfg Config,
) (*QuantizedRingBuffer, error)

func (*QuantizedRingBuffer) Buffered

func (qrb *QuantizedRingBuffer) Buffered() int

Buffered returns the current amount of data already read from the underlying reader, but not yet served via NextRegion(…). It is primarily useful for informative error messages:

if err == io.ErrUnexpectedEOF {
	return fmt.Errorf(
		"unexpected end of stream after %d bytes (stream expected to be %d bytes long)",
		(totalProcessedSoFar + int64(qrb.Buffered())),
		expectedStreamLengthPassedToStartFill,
	)
}

NOTE: if the collector is active as per IsCollectorRunning(), you *MUST* Lock() the qringbuf object before calling Buffered(). Otherwise you will race with the collector changing internal position indexes as it continues doing its job.

func (*QuantizedRingBuffer) IsCollectorRunning

func (qrb *QuantizedRingBuffer) IsCollectorRunning() bool

IsCollectorRunning returns a boolean indicating whether a collector goroutine currently exists. Only useful for debugging purposes.

func (*QuantizedRingBuffer) NextRegion

func (qrb *QuantizedRingBuffer) NextRegion(regionRemainder int) (*Region, error)

NextRegion returns a *Region object representing a portion of the underlying stream. One can explicitly request overlapping *Region objects by supplying the number of bytes to "step back" (use 0 for "just give me what's next"). This functionality is especially useful when processing variable-length records where the only information you have is the maximum size of a record. By initializing your qringbuf with MinRegion equal to this maximum value, you are guaranteed never to experience a "short read".

Every call to NextRegion(…) blocks until it can return:

◦ ( *Region object representing a contiguous slice of at least MinRegion bytes, nil )
◦ ( *Region object representing all remaining data, any underlying error including io.EOF )
◦ ( nil when there is no data remaining in buffer, any underlying error including io.EOF )

Any error encountered on the underlying io.Reader, including io.EOF, is returned on every subsequent NextRegion(…) call, often combined with a *Region object representing data still remaining in the ring buffer. The interface follows the model of io.Reader, requiring the user to not only check for errors, but also observe whether data was made available. See the Examples for the typical loop-termination condition, and the error handling discussion at https://pkg.go.dev/io?tab=doc#Reader (2nd and 3rd paragraphs).

Each call to NextRegion(…) must advance the stream by at least a single byte: calling NextRegion(…) with regionRemainder equal to or larger than Size() of the last *Region results in log.Panic().
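To illustrate the "step back" argument, the sketch below parses hypothetical records made of a one-byte length followed by that many payload bytes, so no record exceeds 256 bytes. Given a window of bytes (what reg.Bytes() would return), consumeRecords parses every complete record and reports how many bytes it used; the unparsed tail is the regionRemainder to pass to the next NextRegion(…) call. The record format and helper name are assumptions made for illustration.

```go
package main

import "fmt"

// consumeRecords parses complete records (1 length byte + payload) from
// window and returns the payloads plus the number of bytes consumed.
// A trailing partial record is left for the next, overlapping region.
func consumeRecords(window []byte) (records []string, processed int) {
	for processed < len(window) {
		size := int(window[processed])
		end := processed + 1 + size
		if end > len(window) {
			break // incomplete record: re-serve it via regionRemainder
		}
		records = append(records, string(window[processed+1:end]))
		processed = end
	}
	return records, processed
}

func main() {
	window := []byte{3, 'f', 'o', 'o', 2, 'h', 'i', 5, 'p', 'a'}
	recs, processed := consumeRecords(window)
	remainder := len(window) - processed
	fmt.Println(recs, processed, remainder) // [foo hi] 7 3
}
```

If MinRegion is at least the maximum record size (256 bytes in this hypothetical format), every region short of EOF holds at least one complete record, so processed is at least 1 and the remainder stays below Size(), as the rule above requires.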

func (*QuantizedRingBuffer) StartFill

func (qrb *QuantizedRingBuffer) StartFill(readLimit int64) error

StartFill is the method kicking off the background "collector" goroutine, which then asynchronously writes bytes from the underlying io.Reader into the buffer. This method must be called at the start of every stream-cycle.

Note that a typical use-case of this library is processing a single io.Reader as a series of multiple consecutive sub-streams, using the same qringbuf object and buffer allocation. All one needs is the ability to determine the length of each sub-stream in advance. See the Examples at the start of this document for a complete pseudo-program illustrating this workflow.

No direct control over the collector goroutine is exposed: the collector keeps trying to read into the buffer until it reaches the given readLimit or the underlying io.Reader returns an error. Closing the io.Reader results in os.ErrClosed, and thus terminates the collector.

If the supplied readLimit is 0, the collector will continue reading into the buffer until the underlying io.Reader returns io.EOF. If readLimit is a non-zero value, the collector will read exactly that many bytes before shutting down. If the underlying reader returns io.EOF before readLimit is reached, NextRegion(…) will return io.ErrUnexpectedEOF.

If any error other than io.EOF has been encountered, you will not be able to restart a collector via StartFill(). The only way to recover is to allocate a new qringbuf object.

type Region

type Region struct {
	// contains filtered or unexported fields
}

Region is an object representing a part of the buffer. Initially a *Region is obtained by calling NextRegion(…), but then one can subdivide a *Region object into smaller portions via SubRegion(…).

func (*Region) Bytes

func (r *Region) Bytes() []byte

Bytes returns a slice of the underlying ring buffer. One should take care to copy or finish using the returned slice before making another call to NextRegion(…). If Reserve() has been invoked, the slice is guaranteed to remain intact until the corresponding Release() call.
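Because Bytes() aliases the ring buffer rather than copying it, data that must outlive the next NextRegion(…) call (without a Reserve()) has to be copied out. The sketch below imitates that with a plain reused slice standing in for the ring buffer, and simulates the collector overwriting it with copy(…); snapshot is a hypothetical helper.

```go
package main

import "fmt"

// snapshot returns both an aliasing view of buf[:n] and an independent copy.
func snapshot(buf []byte, n int) (view, kept []byte) {
	view = buf[:n]                      // like reg.Bytes(): shares memory
	kept = append([]byte(nil), view...) // survives later writes to buf
	return view, kept
}

func main() {
	ring := []byte("first-record")
	view, kept := snapshot(ring, 5)
	copy(ring, "later-bytes!") // simulate the collector reusing the memory
	fmt.Printf("%s %s\n", view, kept) // later first
}
```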

func (*Region) Release

func (r *Region) Release()

Release marks the buffer area backing this *Region object as free after a previous Reserve() call. Calling Release() more than once on the same object, or before Reserve() has been called, results in log.Panic().

func (*Region) Reserve

func (r *Region) Reserve()

Reserve marks the buffer area backing this *Region object as a "no-write" zone, until the corresponding Release() has been called. Losing a Reserve()d *Region object before calling Release() will lead to a deadlock: neither NextRegion(…) nor the collector will be able to proceed beyond the point now-forever reserved. Calling Reserve() more than once on the same object results in log.Panic().

func (*Region) Size

func (r *Region) Size() int

Size returns the size of the region in bytes. It is equivalent to (but cheaper than):

len(r.Bytes())

func (*Region) SubRegion

func (r *Region) SubRegion(offset, length int) *Region

SubRegion is analogous to re-slicing. Supplying offset/length values that cause an out-of-bounds re-slice results in log.Panic(). As a special case, one can "clone" a *Region object via:

clone := region.SubRegion(0, region.Size())

type Stats

type Stats struct {
	ReadCalls                int64 `json:"readCalls"`
	CollectorYields          int64 `json:"collectorYields"`
	CollectorWaitNanoseconds int64 `json:"collectorWaitNanoseconds"`
	NextRegionCalls          int64 `json:"nextRegionCalls"`
	EmitterYields            int64 `json:"emitterYields"`
	EmitterWaitNanoseconds   int64 `json:"emitterWaitNanoseconds"`
}

Stats is a simple struct of counters. When (optionally) supplied as part of the constructor options, its fields will be incremented through the course of the qringbuf object lifetime. In order to obtain a consistent read of the stats values, either the collector should have terminated, or you should obtain a Lock() before accessing the structure. Note that timing collection is disabled by default, due to the non-trivial cost of time.Now(). Set Config.TrackTiming to true to enable it.
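Since Stats carries JSON struct tags, encoding/json is a natural way to report it. The sketch below uses a local copy of the struct and a hypothetical holder type whose embedded sync.Mutex plays the role of the one on QuantizedRingBuffer: the counters are copied while the lock is held and encoded afterwards, so the snapshot is consistent. The counter values are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// Stats is a local copy of qringbuf.Stats, including its JSON tags.
type Stats struct {
	ReadCalls                int64 `json:"readCalls"`
	CollectorYields          int64 `json:"collectorYields"`
	CollectorWaitNanoseconds int64 `json:"collectorWaitNanoseconds"`
	NextRegionCalls          int64 `json:"nextRegionCalls"`
	EmitterYields            int64 `json:"emitterYields"`
	EmitterWaitNanoseconds   int64 `json:"emitterWaitNanoseconds"`
}

// holder is a hypothetical stand-in for a *QuantizedRingBuffer.
type holder struct {
	sync.Mutex
	stats *Stats
}

// snapshotJSON copies the counters under the lock, then encodes the copy.
func snapshotJSON(h *holder) (string, error) {
	h.Lock()
	snap := *h.stats
	h.Unlock()
	out, err := json.Marshal(snap)
	return string(out), err
}

func main() {
	h := &holder{stats: &Stats{ReadCalls: 3, NextRegionCalls: 5}}
	s, err := snapshotJSON(h)
	fmt.Println(s, err)
}
```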
