Documentation
Overview
Package qringbuf provides a concurrency-friendly, zero-copy abstraction of io.ReadAtLeast(…) over a pre-allocated ring buffer, populated asynchronously by a standalone goroutine. It is primarily designed for processing a series of consecutive sub-streams from a single io.Reader, each sub-stream in turn composed of variable-length records.
The buffer object DOES NOT ASSUME exclusive ownership of the supplied io.Reader, never reads more than instructed by the argument to StartFill(…), and exposes a standard sync.Mutex interface, allowing all operations to be paused when exclusive access to the underlying io.Reader is needed.
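For comparison, this is the copying pattern that the package abstracts over. A stdlib-only sketch (the helpers `readChunks` and `demo` are hypothetical and do not use qringbuf): every io.ReadAtLeast(…) call copies into a caller-owned buffer that must be copied out again before the next call, and a partial final chunk surfaces as io.ErrUnexpectedEOF.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// readChunks repeatedly calls io.ReadAtLeast, which copies at least minBytes
// from r into the caller-owned buf on every successful call.
func readChunks(r io.Reader, buf []byte, minBytes int) ([]string, error) {
	var chunks []string
	for {
		n, err := io.ReadAtLeast(r, buf, minBytes)
		if n > 0 {
			// buf is overwritten by the next call, so the bytes must be copied out.
			chunks = append(chunks, string(buf[:n]))
		}
		if errors.Is(err, io.EOF) {
			return chunks, nil // no bytes at all were left: clean end of stream
		}
		if err != nil {
			return chunks, err // e.g. io.ErrUnexpectedEOF for a short final chunk
		}
	}
}

// demo feeds an in-memory string through readChunks with a 4-byte buffer.
func demo(input string) ([]string, error) {
	return readChunks(strings.NewReader(input), make([]byte, 4), 3)
}

func main() {
	fmt.Println(demo("abcdefgh"))
	fmt.Println(demo("abcdefghij"))
}
```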
Examples
In all cases below the background "collector" goroutine reading from the enclosed someIoReader into the ring buffer is guaranteed to:
- never overwrite the buffer portion backing the latest result of NextRegion(…)
- never overwrite any buffer portion backing a Reserve()d (Sub)Region
In code the basic usage looks roughly like this (error/flow handling elided):
	qrb, initErr := qringbuf.NewFromReader(someIoReader, qringbuf.Config{…})
	…
	var nextSubstreamSize int64
	for {
		sizeErr := binary.Read(
			someIoReader,
			binary.BigEndian,
			&nextSubstreamSize,
		)
		…
		startErr := qrb.StartFill(nextSubstreamSize)
		…
		var available, processed int
		for {
			// Reevaluate available and processed from the *previous* round,
			// indicating how much has NOT been processed, and needs re-serving
			// Note: one MUST advance the stream by at least a single byte
			reg, streamErr := qrb.NextRegion(available - processed)

			// io.EOF only means the collector stopped: there could be up to
			// Config.BufferSize bytes remaining in the buffer. Loop until reg is nil
			if streamErr != nil && streamErr != io.EOF {
				return streamErr
			} else if reg == nil {
				break
			}

			// Work with region, processing all or just a portion of the data
			available = reg.Size()
			processed = frobnicate(reg.Bytes(), …)
		}
	}
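The loop above assumes each sub-stream is preceded by its length as a big-endian int64. A stdlib-only sketch of that framing, useful for producing test input; `writeSubstreams`, `readSubstreams` and `demo` are hypothetical helpers, and the reading side uses io.LimitReader plus io.ReadAll (which allocates and copies) purely as a baseline, not as a stand-in for qringbuf.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// writeSubstreams frames each payload as a big-endian int64 length followed by its bytes.
func writeSubstreams(w io.Writer, payloads ...[]byte) error {
	for _, p := range payloads {
		if err := binary.Write(w, binary.BigEndian, int64(len(p))); err != nil {
			return err
		}
		if _, err := w.Write(p); err != nil {
			return err
		}
	}
	return nil
}

// readSubstreams undoes the framing. Unlike qringbuf it copies every
// sub-stream into a freshly allocated slice.
func readSubstreams(r io.Reader) ([][]byte, error) {
	var out [][]byte
	for {
		var size int64
		if err := binary.Read(r, binary.BigEndian, &size); err != nil {
			if errors.Is(err, io.EOF) {
				return out, nil // no further length prefix: clean end of input
			}
			return out, err
		}
		// LimitReader ensures we never consume bytes of the next sub-stream.
		body, err := io.ReadAll(io.LimitReader(r, size))
		if err != nil {
			return out, err
		}
		if int64(len(body)) != size {
			return out, io.ErrUnexpectedEOF // sub-stream shorter than announced
		}
		out = append(out, body)
	}
}

// demo round-trips two sub-streams; dropBytes truncates the encoded input.
func demo(dropBytes int) ([][]byte, error) {
	var buf bytes.Buffer
	if err := writeSubstreams(&buf, []byte("abc"), []byte("de")); err != nil {
		return nil, err
	}
	buf.Truncate(buf.Len() - dropBytes)
	return readSubstreams(&buf)
}

func main() {
	subs, err := demo(0)
	fmt.Printf("%q %v\n", subs, err)
	_, err = demo(1)
	fmt.Println(err)
}
```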
In addition one can operate over individual (sub)regions with "fearless concurrency":
	…
	var available, processed int
	for {
		// Reevaluate available and processed from the *previous* round,
		// indicating how much has NOT been processed, and needs re-serving
		// Note: one MUST advance the stream by at least a single byte
		reg, streamErr := qrb.NextRegion(available - processed)

		// io.EOF only means the collector stopped: there could be up to
		// Config.BufferSize bytes remaining in the buffer. Loop until reg is nil
		if streamErr != nil && streamErr != io.EOF {
			return streamErr
		} else if reg == nil {
			break
		}

		// Hand out at most 256 bytes per round, re-serving the rest next time
		available = reg.Size()
		if available > 256 {
			reg = reg.SubRegion(0, 256)
		}

		// Reserve before handing off: the collector will not overwrite
		// this area until the worker calls Release()
		reg.Reserve()
		processed = reg.Size()
		go func(r *qringbuf.Region) {
			defer r.Release()
			frobnicate(r.Bytes(), …)
		}(reg)
	}
Implementation notes
The specific technical guarantees made by an object of this package are:
- Memory for incoming data is allocated only at construction time, never during streaming
- StartFill(…) spawns one (and only one) goroutine (the collector), which terminates when:
    ◦ It reaches readLimit, if one was supplied to StartFill(…)
    ◦ It receives any error from the wrapped reader (including io.EOF or os.ErrClosed)
- Every call to NextRegion(…) blocks until it can return:
    ◦ ( *Region object representing a contiguous slice of at least MinRegion bytes, nil )
    ◦ ( *Region object representing all remaining data, any underlying error including io.EOF )
    ◦ ( nil when there is no data remaining in buffer, any underlying error including io.EOF )
- *Region.Bytes() is always a slice of the underlying buffer: no data copying takes place
- Data backing a *Region is guaranteed to remain available (not be overwritten) as long as either:
    ◦ NextRegion(…) has not been called again, which would allow further writes into the buffer
    ◦ *Region.Reserve() was invoked, which blocks writes until a subsequent *Region.Release()
Unlike io.ReadAtLeast(…), errors from the underlying reader are always made available via NextRegion(…). As with the standard io.Reader Read(…) semantics, an error can be returned together with a result. One should always check whether the *Region return value is nil first, before processing the error. See the documentation of io.Reader for an extended discussion.
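The same "data first, then error" discipline applies to any io.Reader. A stdlib-only sketch: testing/iotest.DataErrReader returns the final bytes together with io.EOF, so the hypothetical `consume` keeps them, while the equally hypothetical (and deliberately wrong) `consumeErrFirst` silently drops them.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
	"testing/iotest"
)

// consume processes the n > 0 bytes of every Read before looking at the error.
func consume(r io.Reader) (string, error) {
	var sb strings.Builder
	buf := make([]byte, 4)
	for {
		n, err := r.Read(buf)
		if n > 0 {
			sb.Write(buf[:n]) // handle the data first...
		}
		if errors.Is(err, io.EOF) {
			return sb.String(), nil // ...then treat io.EOF as the normal end
		}
		if err != nil {
			return sb.String(), err
		}
	}
}

// consumeErrFirst is deliberately wrong: it discards data returned alongside an error.
func consumeErrFirst(r io.Reader) string {
	var sb strings.Builder
	buf := make([]byte, 4)
	for {
		n, err := r.Read(buf)
		if err != nil {
			return sb.String() // the n bytes delivered with err are lost here
		}
		sb.Write(buf[:n])
	}
}

// newSource returns a reader that delivers its last bytes together with io.EOF.
func newSource() io.Reader {
	return iotest.DataErrReader(strings.NewReader("hello, world"))
}

func main() {
	fmt.Println(consume(newSource()))
	fmt.Println(consumeErrFirst(newSource()))
}
```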
Changes to the NextRegion(…) "emitter" and collector positions are protected by a mutex on the qringbuf object. Calls modifying the buffer state will block until this lock can be obtained. The same mutex is exposed as part of the API, so one can pause the collector whenever a direct read from, or skip on, the underlying io.Reader is needed.
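To illustrate why exposing the mutex is enough to borrow the reader, here is a toy model, not the package's implementation: a hypothetical `collector` type embeds sync.Mutex, and its goroutine reads only while holding the lock, so a caller holding Lock() can read from the same io.Reader directly without racing it. No bytes are lost or read twice; they simply end up split between the two consumers.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
	"sync"
)

// collector is a hypothetical stand-in: the embedded sync.Mutex provides
// Lock()/Unlock(), and the background goroutine only touches the shared
// reader while holding that lock.
type collector struct {
	sync.Mutex
	src  io.Reader
	sink bytes.Buffer // guarded by the embedded mutex
	done chan struct{}
}

func newCollector(src io.Reader) *collector {
	c := &collector{src: src, done: make(chan struct{})}
	go func() {
		defer close(c.done)
		buf := make([]byte, 2)
		for {
			c.Lock()
			n, err := c.src.Read(buf)
			c.sink.Write(buf[:n])
			c.Unlock()
			if err != nil {
				return // io.EOF or any other error ends the collector
			}
		}
	}()
	return c
}

// demo pauses the collector, reads a few bytes directly, then lets it resume.
func demo() (collected, direct int) {
	const input = "0123456789abcdef"
	c := newCollector(strings.NewReader(input))

	c.Lock() // the collector cannot read while we hold the lock
	p := make([]byte, 3)
	n, _ := c.src.Read(p)
	c.Unlock()

	<-c.done // wait until the collector has hit io.EOF
	return c.sink.Len(), n
}

func main() {
	collected, direct := demo()
	fmt.Println(collected, direct)
}
```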
The *Region.{Reserve/Release}() functionality does not use the mutex, ensuring that an asynchronous Release() call cannot be affected by the current state of the buffer. Reservation tracking is implemented as an atomically modified list of reservation counts, one int32 per SectorSize bytes of the buffer.
The reservation system explicitly allows "recursive locking": you can hold an arbitrary number of reservations over a sector by repeatedly creating SubRegion(…) objects. Care must be taken to release every single reservation obtained previously, otherwise the collector will remain blocked forever.
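A minimal sketch of the per-sector counting described above, assuming a sector covers SectorSize consecutive bytes starting at offset 0: the hypothetical `sectors` type keeps one int32 per sector and touches it only through sync/atomic, so releasing never needs the buffer mutex. Reserving a region and then a sub-region over the same sector shows why every reservation must be released individually before the area becomes writable again.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// sectors counts outstanding reservations per sectorSize-byte sector.
type sectors struct {
	size   int
	counts []int32
}

func newSectors(bufferSize, sectorSize int) *sectors {
	n := (bufferSize + sectorSize - 1) / sectorSize // round up to cover the whole buffer
	return &sectors{size: sectorSize, counts: make([]int32, n)}
}

// span returns the first and last sector touched by the byte range [start, end).
func (s *sectors) span(start, end int) (int, int) {
	return start / s.size, (end - 1) / s.size
}

func (s *sectors) reserve(start, end int) {
	first, last := s.span(start, end)
	for i := first; i <= last; i++ {
		atomic.AddInt32(&s.counts[i], 1)
	}
}

func (s *sectors) release(start, end int) {
	first, last := s.span(start, end)
	for i := first; i <= last; i++ {
		if atomic.AddInt32(&s.counts[i], -1) < 0 {
			panic("release without a matching reserve")
		}
	}
}

// writable reports whether no sector in [start, end) is reserved. Each count is
// loaded atomically, but the loop is not a single snapshot across sectors.
func (s *sectors) writable(start, end int) bool {
	first, last := s.span(start, end)
	for i := first; i <= last; i++ {
		if atomic.LoadInt32(&s.counts[i]) != 0 {
			return false
		}
	}
	return true
}

func main() {
	s := newSectors(64, 8) // BufferSize from the illustration; SectorSize 8 is an assumption
	s.reserve(18, 22)      // region 18~21, as in diagram ③
	s.reserve(19, 21)      // a sub-region over the same sector
	s.release(19, 21)
	fmt.Println(s.writable(16, 24)) // false: the outer reservation is still held
	s.release(18, 22)
	fmt.Println(s.writable(16, 24)) // true
}
```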
What follows is an illustration of a contrived lifecycle of a hypothetical qringbuf object initialized with:
	qringbuf.Config{
		BufferSize: 64,
		MinRegion:  16,
		MinRead:    8,
		MaxCopy:    24,
	}
Note that for brevity THE DIAGRAMS BELOW ARE DECIDEDLY NOT REPRESENTATIVE of a typical lifecycle. Normally BufferSize is an order of magnitude larger than MinRegion and MaxCopy, and the time spent waiting and copying data is insignificant in relation to all other possible states. Also outstanding async reservations typically trail the emitter very closely, so after a wrap the collector is virtually never blocked, contrary to what is depicted below. Instead the diagrams merely demonstrate the choices this library makes dealing with the "tricky parts" of maintaining the illusion of an arbitrary stream of contiguous bytes.
	† C is the collector position: the *end* of the most recent read from the underlying io.Reader
	  E is the emitter position: the *start* of the most recently returned NextRegion(…)
	  W is the last value of "E" before a wrap took place, 0 otherwise

	⓪ Buffer initialized, StartFill(…) is called.
	╆━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╅
	C=0                                                             ┃
	E=0                                                             ┃
	╄━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╃
	|0        |10       |20       |30       |40       |50       |60

	① NextRegion(0) is blocked until MinRegion of 16 is available, fill in progress
	╆━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╅
	cccccccccc|C=10                                                 ┃
	E=0                                                             ┃
	╄━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╃
	|0        |10       |20       |30       |40       |50       |60

	② NextRegion(0) returned the first 30 bytes when it could, collector keeps reading further
	╆━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╅
	eeeeeeeeeeeeeeeeeeeeeeeeeeeeeecccccccccc|C=40                   ┃
	E=0==========================<                                  ┃
	╄━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╃
	|0        |10       |20       |30       |40       |50       |60

	③ User reserves subRegion 18~21 for async workers, recycles last 6 of the 30 bytes,
	   NextRegion(6) returns 17 bytes available at the time, 23 total.
	   Collector keeps reading, until it can no longer satisfy MinRead
	╆━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╅
	┋                 RRRR  eeeeeeeeeeeeeeeeeeeeeeecccccccccccc|C=59┃
	┋                 RRRR  E=24==================<                 ┃
	╄━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╃
	|0        |10       |20       |30       |40       |50       |60

	④ User recycles last 6 bytes, NextRegion(6) serves the remaining 18 bytes.
	   Collector now can satisfy MaxCopy, and copies everything over, repositioning the emitter index.
	   It then blocks, as it can't write past the not-yet released reservation.
	╆━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╅
	W=41wwwwwwwwwwwwww|C=18                  eeeeeeeeeeeeeeeeee|    ┃
	E=0               RRRR                   W=41=============<|    ┃
	╄━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╃
	|0        |10       |20       |30       |40       |50       |60

	⑤ The async job finishes, reservation is released, collector can now advance further,
	   and blocks again as NextRegion(…) has not been called, meaning the last 18 bytes are still being processed.
	╆━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╅
	W=41wwwwwwwwwwwwwwccccccccccccccccccccccc|C=41|eeeeeeeeeeee|    ┃
	E=0                                      W=41=============<|    ┃
	╄━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╃
	|0        |10       |20       |30       |40       |50       |60

	⑥ User recycles 4 bytes, NextRegion(4) serves available 27 bytes, and the cycle continues from the top until error or EOF
	╆━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╅
	┋             wwwwccccccccccccccccccccccc|C=41                  ┃
	┋             E=14======================<|                      ┃
	╄━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╃
	|0        |10       |20       |30       |40       |50       |60
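The choices in ③ and ④ can be reduced to two comparisons. A simplified model inferred from the diagrams only (the hypothetical `wrapDecision` ignores reservations, and assumes the MaxCopy bound is inclusive, which the documentation does not state): the collector stops appending once fewer than MinRead bytes remain at the end of the buffer, and it copies the unconsumed tail, measured from E, to offset 0 once that tail is at most MaxCopy bytes long.

```go
package main

import "fmt"

type decision string

const (
	keepReading    decision = "keep reading"     // at least MinRead bytes free at the end
	waitForEmitter decision = "wait for emitter" // tail still too large to copy
	copyAndWrap    decision = "copy and wrap"    // move the unconsumed tail to offset 0
)

// wrapDecision models the collector's choice from collector position c and
// emitter position e. It deliberately ignores reservations.
func wrapDecision(c, e, bufferSize, minRead, maxCopy int) decision {
	if bufferSize-c >= minRead {
		return keepReading
	}
	if c-e > maxCopy {
		return waitForEmitter
	}
	return copyAndWrap
}

func main() {
	// Values from the illustration: BufferSize 64, MinRead 8, MaxCopy 24.
	fmt.Println(wrapDecision(40, 0, 64, 8, 24))  // state ②
	fmt.Println(wrapDecision(59, 24, 64, 8, 24)) // state ③
	fmt.Println(wrapDecision(59, 41, 64, 8, 24)) // state ④
}
```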
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type Config
type Config struct {
	MinRegion  int // [1:…] Any result of NextRegion(…) is guaranteed to be at least that many bytes long, except at EOF
	MinRead    int // [1:MinRegion] Do not read data from io.Reader until buffer has space to store that many bytes
	BufferSize int // [MinRead+2*MinRegion:…] Size of the allocated buffer in bytes
	MaxCopy    int // [MinRegion:BufferSize/2] Delay "wrap around" until amount of data to copy falls under this threshold
	SectorSize int // [1:BufferSize/3] Size of each occupancy sector for *Region.{Reserve|Release}() tracking

	Stats       *Stats
	TrackTiming bool // When a Stats structure is provided, record timing of operations in addition to counts
}
Config is the structure of options expected at initialization time.
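The bracketed ranges in the field comments can be checked mechanically. A sketch against a hypothetical local mirror of the numeric fields (it does not call the package, whose own validation is not documented here), reading each [low:high] as inclusive. The SectorSize of 8 paired with the illustration's values is also an assumption, since that Config leaves SectorSize unset.

```go
package main

import "fmt"

// config mirrors the numeric fields of qringbuf.Config for illustration only.
type config struct {
	MinRegion, MinRead, BufferSize, MaxCopy, SectorSize int
}

// check applies the documented ranges, read as inclusive bounds.
func check(c config) error {
	switch {
	case c.MinRegion < 1:
		return fmt.Errorf("MinRegion %d must be at least 1", c.MinRegion)
	case c.MinRead < 1 || c.MinRead > c.MinRegion:
		return fmt.Errorf("MinRead %d must be within [1:%d]", c.MinRead, c.MinRegion)
	case c.BufferSize < c.MinRead+2*c.MinRegion:
		return fmt.Errorf("BufferSize %d must be at least %d", c.BufferSize, c.MinRead+2*c.MinRegion)
	case c.MaxCopy < c.MinRegion || c.MaxCopy > c.BufferSize/2:
		return fmt.Errorf("MaxCopy %d must be within [%d:%d]", c.MaxCopy, c.MinRegion, c.BufferSize/2)
	case c.SectorSize < 1 || c.SectorSize > c.BufferSize/3:
		return fmt.Errorf("SectorSize %d must be within [1:%d]", c.SectorSize, c.BufferSize/3)
	}
	return nil
}

func main() {
	fmt.Println(check(config{MinRegion: 16, MinRead: 8, BufferSize: 64, MaxCopy: 24, SectorSize: 8}))
	fmt.Println(check(config{MinRegion: 16, MinRead: 8, BufferSize: 32, MaxCopy: 16, SectorSize: 8}))
}
```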
type QuantizedRingBuffer
func NewFromReader
func NewFromReader(
	reader io.Reader,
	cfg Config,
) (*QuantizedRingBuffer, error)
func (*QuantizedRingBuffer) Buffered
func (qrb *QuantizedRingBuffer) Buffered() int
Buffered returns the current amount of data already read from the underlying reader, but not yet served via NextRegion(…). It is primarily useful for informative error messages:
	if err == io.ErrUnexpectedEOF {
		return fmt.Errorf(
			"unexpected end of stream after %d bytes (stream expected to be %d bytes long)",
			(totalProcessedSoFar + int64(qrb.Buffered())),
			expectedStreamLengthPassedToStartFill,
		)
	}
NOTE: if the collector is active as per IsCollectorRunning(), you *MUST* Lock() the qringbuf object before calling Buffered(). Otherwise you will race with the collector changing internal position indexes as it continues doing its job.
func (*QuantizedRingBuffer) IsCollectorRunning
func (qrb *QuantizedRingBuffer) IsCollectorRunning() bool
IsCollectorRunning returns a boolean indicating whether a collector goroutine currently exists. Only useful for debugging purposes.
func (*QuantizedRingBuffer) NextRegion
func (qrb *QuantizedRingBuffer) NextRegion(regionRemainder int) (*Region, error)
NextRegion returns a *Region object representing a portion of the underlying stream. One can explicitly request overlapping *Region objects by supplying the number of bytes to "step back" (use 0 for "just give me what's next"). This functionality is especially useful when processing variable-length records where the only information you have is the maximum size of a record. By initializing your qringbuf with MinRegion equal to this maximum value, you are guaranteed never to experience a "short read".
Every call to NextRegion(…) blocks until it can return:
    ◦ ( *Region object representing a contiguous slice of at least MinRegion bytes, nil )
    ◦ ( *Region object representing all remaining data, any underlying error including io.EOF )
    ◦ ( nil when there is no data remaining in buffer, any underlying error including io.EOF )
Any error encountered on the underlying io.Reader, including io.EOF, is returned on every subsequent NextRegion(…) call, often combined with a *Region object representing data still remaining in the ring buffer. The interface follows the model of io.Reader, requiring the user not only to check for errors, but also to observe whether data was made available. See the Examples for the typical loop-termination condition, and the error handling discussion at https://pkg.go.dev/io?tab=doc#Reader (2nd and 3rd paragraphs).
Each call to NextRegion(…) must advance the stream by at least a single byte: calling NextRegion(…) with a regionRemainder equal to or larger than the Size() of the last *Region results in log.Panic().
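The step-back arithmetic can be modelled without any concurrency. A toy, single-goroutine sketch over an in-memory slice (the `toyStream` type and `panics` helper are hypothetical, and the model ignores MinRegion, buffer wrapping and the collector): each call starts regionRemainder bytes before the end of the previous region, and panics when it would not advance the stream, mirroring the rule above.

```go
package main

import "fmt"

// toyStream serves windows of at most window bytes from data.
type toyStream struct {
	data   []byte
	window int
	start  int // offset of the previously returned region
	last   int // size of the previously returned region (0 before the first call)
}

func (s *toyStream) next(regionRemainder int) []byte {
	// Mirrors the documented rule: each call must advance by at least one byte.
	if regionRemainder < 0 || (regionRemainder > 0 && regionRemainder >= s.last) {
		panic("regionRemainder must be smaller than the previous region")
	}
	s.start += s.last - regionRemainder // skip the processed part, re-serve the rest
	if s.start >= len(s.data) {
		s.last = 0
		return nil // the real NextRegion(…) pairs nil with an error such as io.EOF
	}
	end := s.start + s.window
	if end > len(s.data) {
		end = len(s.data)
	}
	s.last = end - s.start
	return s.data[s.start:end]
}

// panics reports whether f panicked.
func panics(f func()) (did bool) {
	defer func() { did = recover() != nil }()
	f()
	return false
}

func main() {
	s := &toyStream{data: []byte("0123456789"), window: 4}
	fmt.Printf("%s\n", s.next(0)) // 0123
	fmt.Printf("%s\n", s.next(1)) // 3456: the last byte of the previous region is re-served
	fmt.Printf("%s\n", s.next(0)) // 789
	fmt.Println(s.next(0) == nil) // true
}
```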
func (*QuantizedRingBuffer) StartFill
func (qrb *QuantizedRingBuffer) StartFill(readLimit int64) error
StartFill is the method kicking off the background "collector" goroutine, which then asynchronously writes bytes from the underlying io.Reader into the buffer. This method must be called at the start of every stream-cycle.
Note that a typical use-case of this library is processing a single io.Reader as a series of multiple consecutive sub-streams, using the same qringbuf object and buffer allocation. All one needs is the ability to determine the length of each sub-stream in advance. See the Examples at the start of this document for a complete pseudo-program illustrating this workflow.
No direct control over the collector goroutine is exposed: the collector will keep trying to read into the buffer until reaching the given readLimit or until the underlying io.Reader returns an error. Closing the io.Reader will result in os.ErrClosed, and thus terminate the collector.
If the supplied readLimit is 0, the collector will continue reading into the buffer until the underlying io.Reader returns io.EOF. If readLimit is non-zero, the collector will read exactly that many bytes before shutting down. If the underlying reader returns io.EOF before readLimit is reached, NextRegion(…) will return io.ErrUnexpectedEOF.
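The readLimit rules closely match what io.Copy and io.CopyN already do. A stdlib-only analogy, not the collector itself: the hypothetical `fill` drains the source when readLimit is 0, otherwise copies exactly readLimit bytes without reading any further, and reports a source that ends early as io.ErrUnexpectedEOF.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"strings"
)

// fill copies from src to dst following the documented readLimit rules.
func fill(dst io.Writer, src io.Reader, readLimit int64) (int64, error) {
	if readLimit == 0 {
		// No limit: io.Copy runs until io.EOF, which it reports as a nil error.
		return io.Copy(dst, src)
	}
	// CopyN never reads more than readLimit bytes from src.
	n, err := io.CopyN(dst, src, readLimit)
	if errors.Is(err, io.EOF) {
		// CopyN signals a source that ended early with io.EOF.
		return n, io.ErrUnexpectedEOF
	}
	return n, err
}

// demo returns what was copied and how many bytes were left unread in the source.
func demo(input string, readLimit int64) (copied string, unread int, err error) {
	src := strings.NewReader(input)
	var dst bytes.Buffer
	_, err = fill(&dst, src, readLimit)
	return dst.String(), src.Len(), err
}

func main() {
	fmt.Println(demo("abcdef", 0))
	fmt.Println(demo("abcdef", 4))
	fmt.Println(demo("ab", 4))
}
```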
If any error other than io.EOF has been encountered, you will not be able to restart a collector via StartFill(). The only way to recover is to allocate a new qringbuf object.
type Region
type Region struct {
// contains filtered or unexported fields
}
Region is an object representing a part of the buffer. Initially a *Region is obtained by calling NextRegion(…), but then one can subdivide a *Region object into smaller portions via SubRegion(…).
func (*Region) Bytes
Bytes returns a slice of the underlying ring buffer. Take care to copy, or finish using, the returned slice before making another call to NextRegion(…). If Reserve() has been invoked, the slice is guaranteed to remain intact until the corresponding Release() call.
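Because Bytes() aliases the ring buffer, a slice kept past the next NextRegion(…) can change underneath you. A plain-slice demonstration with no qringbuf involved (`ring` simply stands in for the reused buffer, and `demo` is hypothetical) shows the difference between keeping a view and keeping a copy.

```go
package main

import "fmt"

func demo() (view, kept string) {
	ring := []byte("AAAA") // stands in for the ring buffer
	region := ring[0:4]    // like Region.Bytes(): a view, not a copy

	saved := region                          // retains the view only
	copied := append([]byte(nil), region...) // independent copy

	copy(ring, "BBBB") // the writer reuses the same memory

	return string(saved), string(copied)
}

func main() {
	view, kept := demo()
	fmt.Println(view, kept) // BBBB AAAA
}
```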
func (*Region) Release
func (r *Region) Release()
Release marks the buffer area backing this *Region object as free after a previous Reserve() call. Calling Release() more than once on the same object, or before Reserve() has been called, results in log.Panic().
func (*Region) Reserve
func (r *Region) Reserve()
Reserve marks the buffer area backing this *Region object as a "no-write" zone until the corresponding Release() has been called. Losing a Reserve()d *Region object before calling Release() will lead to a deadlock: neither NextRegion(…) nor the collector will be able to proceed beyond the now forever-reserved point. Calling Reserve() more than once on the same object results in log.Panic().
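Pairing each Reserve() with a deferred Release() inside the worker goroutine is a simple way to avoid the deadlock described above. A self-contained sketch with a hypothetical `region` type that only counts outstanding reservations (it is not *qringbuf.Region and does not reproduce the log.Panic() checks); once all workers finish, the count is back to zero.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// region is a hypothetical stand-in that records outstanding reservations
// in a shared counter, the way a collector would observe them.
type region struct {
	data        []byte
	outstanding *int32
}

func (r region) Reserve() { atomic.AddInt32(r.outstanding, 1) }
func (r region) Release() { atomic.AddInt32(r.outstanding, -1) }

// processAll hands each chunk to its own goroutine, reserving it beforehand.
func processAll(chunks [][]byte) (total int64, outstanding int32) {
	var wg sync.WaitGroup
	for _, c := range chunks {
		reg := region{data: c, outstanding: &outstanding}
		reg.Reserve() // reserve before the region leaves this goroutine
		wg.Add(1)
		go func(r region) {
			defer wg.Done()
			defer r.Release() // runs on every return path of the worker
			atomic.AddInt64(&total, int64(len(r.data)))
		}(reg)
	}
	wg.Wait()
	return total, atomic.LoadInt32(&outstanding)
}

func main() {
	total, outstanding := processAll([][]byte{[]byte("ab"), []byte("cde"), []byte("f")})
	fmt.Println(total, outstanding)
}
```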
func (*Region) Size
Size returns the size of the region in bytes. It is equivalent to (but cheaper than):
	len(r.Bytes())
type Stats
type Stats struct {
	ReadCalls                int64 `json:"readCalls"`
	CollectorYields          int64 `json:"collectorYields"`
	CollectorWaitNanoseconds int64 `json:"collectorWaitNanoseconds"`
	NextRegionCalls          int64 `json:"nextRegionCalls"`
	EmitterYields            int64 `json:"emitterYields"`
	EmitterWaitNanoseconds   int64 `json:"emitterWaitNanoseconds"`
}
Stats is a simple struct of counters. When (optionally) supplied as part of the constructor options, its fields will be incremented through the course of the qringbuf object lifetime. In order to obtain a consistent read of the stats values, either the collector should have terminated, or you should obtain a Lock() before accessing the structure. Note that collecting timings is disabled by default, due to the non-trivial cost of time.Now(). Toggle the boolean Config.TrackTiming to enable.
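The json tags make it easy to log a snapshot of the counters. A sketch using a local copy of the struct (field names and tags copied from the definition above; `snapshotJSON` is hypothetical and the values are made up). With the real package, take Lock() or wait for the collector to terminate before reading the fields, as described above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stats copies the field names and json tags of qringbuf.Stats.
type stats struct {
	ReadCalls                int64 `json:"readCalls"`
	CollectorYields          int64 `json:"collectorYields"`
	CollectorWaitNanoseconds int64 `json:"collectorWaitNanoseconds"`
	NextRegionCalls          int64 `json:"nextRegionCalls"`
	EmitterYields            int64 `json:"emitterYields"`
	EmitterWaitNanoseconds   int64 `json:"emitterWaitNanoseconds"`
}

// snapshotJSON encodes the counters; fields appear in struct order.
func snapshotJSON(s stats) (string, error) {
	b, err := json.Marshal(s)
	return string(b), err
}

func main() {
	out, err := snapshotJSON(stats{ReadCalls: 3, NextRegionCalls: 5})
	fmt.Println(out, err)
}
```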