package improved

v1.12.2 · Published: Nov 8, 2023 · License: MIT · Imports: 14 · Imported by: 0

Documentation

Overview

Package improved provides an "improved" version of the venerable eestream.StripeReader type from 2018.

Index

Constants

This section is empty.

Variables

var (

	// Error is an eestream/improved error.
	Error = errs.Class("eestream/improved")
)

Functions

This section is empty.

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

A Batch is a reference-counted slice of erasure shares. Batches are returned by BatchPool.GetAndClaim with a starting reference count of 1.

func (*Batch) Claim

func (b *Batch) Claim() bool

Claim adds 1 to the batch reference count and returns true if the batch was claimable. See Release.

func (*Batch) Release

func (b *Batch) Release()

Release subtracts 1 from the batch reference count, returning the batch to the pool when it hits zero. Future Claim calls will return false once the counter drops to zero.

func (*Batch) Slice

func (b *Batch) Slice() []byte

Slice returns the batch's underlying memory allocation.

type BatchPool

type BatchPool struct {
	// contains filtered or unexported fields
}

A BatchPool is a sync.Pool that deals with batches of erasure shares, serialized as []byte slices of a fixed size. The fixed size is the largest multiple of the erasure share size that fits in standardBufSize.
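
To get a hypothetical feel for the numbers (standardBufSize is unexported, so the 64 KiB figure here is purely an assumption): with a 1280-byte erasure share size, the pool's batch size would be the largest multiple of 1280 that fits in 65536 bytes.

// Hypothetical arithmetic only: assumes standardBufSize is 64 KiB (65536 bytes).
// 65536 / 1280 = 51 (integer division), and 51 * 1280 = 65280, so each batch
// would hold 51 shares in a 65280-byte buffer.
pool := improved.NewBatchPool(1280)
_ = pool.Size() // 65280 under the assumption above; see Size below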

func NewBatchPool

func NewBatchPool(shareSize int) *BatchPool

NewBatchPool creates a BatchPool with the given erasure share size.

func (*BatchPool) GetAndClaim

func (b *BatchPool) GetAndClaim() *Batch

GetAndClaim returns a batch from the pool. To free the batch, a Release call is needed.
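
A minimal, hypothetical sketch of the intended lifecycle (fill and consume are illustrative callbacks, not part of this package): the producer gets an already-claimed batch, a second consumer claims its own reference, and each side releases when done.

// shareBatch is a hypothetical example of the claim/release lifecycle.
func shareBatch(pool *improved.BatchPool, fill, consume func([]byte)) {
	batch := pool.GetAndClaim() // reference count starts at 1
	fill(batch.Slice())         // write erasure shares into the batch's memory

	if batch.Claim() { // the consumer takes its own reference
		go func() {
			defer batch.Release() // drop the consumer's reference
			consume(batch.Slice())
		}()
	}

	batch.Release() // drop the producer's reference; the pool reclaims at zero
}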

func (*BatchPool) Size

func (b *BatchPool) Size() int

Size returns the buffer size used in this pool.

type ErasureScheme

type ErasureScheme interface {
	// Encode will take 'in' and call 'out' with erasure coded pieces.
	Encode(in []byte, out func(num int, data []byte)) error

	// EncodeSingle will take 'in' with the stripe and fill 'out' with the erasure share for piece 'num'.
	EncodeSingle(in, out []byte, num int) error

	// Decode will take a mapping of available erasure coded piece num -> data,
	// 'in', and append the combined data to 'out', returning it.
	Decode(out []byte, in []infectious.Share) ([]byte, error)

	// Rebuild is a direct call to infectious.Rebuild, which does no error
	// detection and is faster.
	Rebuild(in []infectious.Share, out func(infectious.Share)) error

	// ErasureShareSize is the size of the erasure shares that come from Encode
	// and are passed to Decode.
	ErasureShareSize() int

	// StripeSize is the size of the stripes that are passed to Encode and come
	// from Decode.
	StripeSize() int

	// Encode will generate this many erasure shares and therefore this many pieces.
	TotalCount() int

	// Decode requires at least this many pieces.
	RequiredCount() int
}

ErasureScheme represents the general format of any erasure scheme algorithm. If this interface can be implemented, the rest of this library will work with it.
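
As a rough sketch of how a caller might drive the interface (the helper name and the copy-out of each share are assumptions, not part of this package), the following fans one stripe out into its numbered erasure shares:

// encodeStripe is a hypothetical helper that encodes a single stripe into a
// map of piece number -> erasure share using any ErasureScheme.
func encodeStripe(scheme improved.ErasureScheme, stripe []byte) (map[int][]byte, error) {
	if len(stripe) != scheme.StripeSize() {
		return nil, improved.Error.New("stripe must be exactly %d bytes", scheme.StripeSize())
	}
	shares := make(map[int][]byte, scheme.TotalCount())
	err := scheme.Encode(stripe, func(num int, data []byte) {
		// copy the share out, in case the encoder reuses its buffer
		shares[num] = append([]byte(nil), data...)
	})
	if err != nil {
		return nil, err
	}
	return shares, nil
}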

type PiecesProgress

type PiecesProgress struct {
	// contains filtered or unexported fields
}

PiecesProgress is an interesting concurrency primitive we don't know what to call, but it's kind of like one of those old time clocks that employees would clock in and out of. Imagine that Looney Tunes episode where Sam the Sheepdog and Wile E. Coyote clock in and out.

In our case, PiecesProgress is two dimensional:

  • There is the neededShares vs. total dimension: the number of shares that are necessary, for instance, for Reed-Solomon reconstruction.
  • There is the current watermark of which stripe each specific piece reader is working on.

A bunch of piece readers keep PiecesProgress updated with how many shares they have downloaded within their piece. When a piece reader reports that it has downloaded n more shares, and that report is the one that gives us the neededShares necessary shares at some watermark or stripe, PiecesProgress tells that piece reader to wake up the combining layer.

This data structure is designed to cause these wakeups to happen as little as possible.

func NewPiecesProgress

func NewPiecesProgress(minimum, total int32) *PiecesProgress

NewPiecesProgress constructs a PiecesProgress that needs minimum shares per stripe (the initial neededShares value) out of total shares. PiecesProgress doesn't care how many stripes there are, but it does keep track of which stripe each share reader is on.

func (*PiecesProgress) AcknowledgeNewStripes

func (y *PiecesProgress) AcknowledgeNewStripes()

AcknowledgeNewStripes tells PiecesProgress that the combiner has woken up and new alarms are okay to trigger.

func (*PiecesProgress) IncreaseNeededShares

func (y *PiecesProgress) IncreaseNeededShares() bool

IncreaseNeededShares tells PiecesProgress that going forward, we need more shares per watermark value.

func (*PiecesProgress) NeededShares

func (y *PiecesProgress) NeededShares() int32

NeededShares returns the current value of the number of shares required at a given watermark.

func (*PiecesProgress) PieceSharesReceived

func (y *PiecesProgress) PieceSharesReceived(idx int) int32

PieceSharesReceived returns the current watermark for reader idx.

func (*PiecesProgress) SetStripesNeeded

func (y *PiecesProgress) SetStripesNeeded(required int32)

SetStripesNeeded tells PiecesProgress which stripe is needed next.

func (*PiecesProgress) SharesCompleted

func (y *PiecesProgress) SharesCompleted(idx int, delta int32) bool

SharesCompleted adds some read events to a given index. If SharesCompleted returns true, then the calling reader should wake up the combiner.
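
A hedged sketch of the wakeup protocol described above (the channel and helper name are illustrative assumptions, not package API): a piece reader reports its progress and pokes the combiner only when SharesCompleted says this was the read that completed a stripe.

// reportShares is a hypothetical piece-reader helper: reader idx has just
// finished n more shares; wake the combiner only if this completed a stripe.
func reportShares(progress *improved.PiecesProgress, idx int, n int32, combinerWake chan<- struct{}) {
	if progress.SharesCompleted(idx, n) {
		select {
		case combinerWake <- struct{}{}: // wake the combining layer
		default: // a wakeup is already pending
		}
	}
}

On the combiner side, after waking up it would call AcknowledgeNewStripes so that later completions can trigger a fresh wakeup.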

type StreamingPiece

type StreamingPiece struct {
	// contains filtered or unexported fields
}

A StreamingPiece is an in-memory storage location for a stream of bytes being operated on by a single producer and a single consumer, in atomic units of a given erasure share size. The StreamingPiece type must know its full expected size up front, and it allocates slots for each *BatchPool batch of erasure shares up to that total size. It hydrates these slots on demand and frees them back to the BatchPool as they are consumed.

func NewStreamingPiece

func NewStreamingPiece(shareSize int, totalSize int64, pool *BatchPool) *StreamingPiece

NewStreamingPiece creates a buffer that operates on shares of size shareSize and expects totalSize bytes in total. It uses pool to hydrate and return the buffers in its slots.

func (*StreamingPiece) Err

func (b *StreamingPiece) Err() error

Err returns the last error encountered during reading.

func (*StreamingPiece) MarkCompleted

func (b *StreamingPiece) MarkCompleted(sharesCompleted int)

MarkCompleted tells the StreamingPiece to return some internal batches back to the BatchPool, since we don't need them anymore. It will assume that none of the first sharesCompleted units will be asked for again.

func (*StreamingPiece) ReadShare

func (b *StreamingPiece) ReadShare(shareIdx int) (data []byte, release func(), err error)

ReadShare returns a byte slice referencing the data of the share at index shareIdx in this buffer. Note that shareIdx is not the Reed Solomon Share Number, since every share in this buffer has the same Reed Solomon Share Number. If the share at shareIdx cannot be returned, ReadShare returns an error, which may be a read error determined by ReadSharesFrom. The release callback must be called when the caller is done reading the share.

func (*StreamingPiece) ReadSharesFrom

func (b *StreamingPiece) ReadSharesFrom(r io.Reader) (shareCount int, done bool)

ReadSharesFrom calls r.Read() once and returns the number of full shares that are newly complete as a result of that call. If r.Read() returns an error or io.EOF, or if no more data is expected, done will be true. The read error, if any, is available from Err() or ReadShare().
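
A rough sketch of the single-producer, single-consumer loop this type is designed for (consumePiece and handleShare are illustrative names, and the exact interplay of release and MarkCompleted is an assumption):

// consumePiece is a hypothetical loop: pull bytes from r into the piece
// and hand each newly completed share to handleShare.
func consumePiece(r io.Reader, piece *improved.StreamingPiece, handleShare func([]byte)) error {
	nextShare := 0
	for {
		newShares, done := piece.ReadSharesFrom(r)
		for i := 0; i < newShares; i++ {
			data, release, err := piece.ReadShare(nextShare)
			if err != nil {
				return err // read errors also surface via Err()
			}
			handleShare(data)
			release()
			nextShare++
		}
		piece.MarkCompleted(nextShare) // the first nextShare shares won't be needed again
		if done {
			return piece.Err()
		}
	}
}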

type StripeReader

type StripeReader struct {
	// contains filtered or unexported fields
}

StripeReader reads from a collection of piece io.ReadClosers in parallel, recombining them into a single stream using an ErasureScheme.

func New

func New(readers map[int]io.ReadCloser, scheme ErasureScheme, totalStripes int,
	errorDetection bool) *StripeReader

New makes a new StripeReader using the provided map of share number to io.ReadClosers, an ErasureScheme, the total number of stripes in the stream, and whether or not to use the Erasure Scheme's error detection.

func (*StripeReader) Close

func (s *StripeReader) Close() error

Close does *not* close the readers it received in the constructor. Close does *not* wait for reader goroutines to shut down. See CloseAndWait if you want other behavior. Close mimics the older eestream.StripeReader behavior.

func (*StripeReader) CloseAndWait

func (s *StripeReader) CloseAndWait() error

CloseAndWait closes all readers and waits for all goroutines.

func (*StripeReader) ReadStripes

func (s *StripeReader) ReadStripes(ctx context.Context, nextStripe int64, out []byte) (_ []byte, count int, err error)

ReadStripes returns 1 or more stripes. out is overwritten.
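
Putting the pieces together, a hedged sketch of decoding a whole stream (readAll and handleStripes are illustrative; reusing the returned slice as the next out buffer is an assumption based on "out is overwritten"):

// readAll is a hypothetical driver: it reads every stripe and passes the
// decoded bytes to handleStripes.
func readAll(ctx context.Context, readers map[int]io.ReadCloser, scheme improved.ErasureScheme,
	totalStripes int, handleStripes func([]byte)) error {
	sr := improved.New(readers, scheme, totalStripes, true) // with error detection
	defer func() { _ = sr.CloseAndWait() }()

	var buf []byte
	for nextStripe := int64(0); nextStripe < int64(totalStripes); {
		out, count, err := sr.ReadStripes(ctx, nextStripe, buf)
		if err != nil {
			return err
		}
		handleStripes(out) // count stripes of scheme.StripeSize() bytes each
		buf = out          // reuse the overwritten buffer on the next call
		nextStripe += int64(count)
	}
	return nil
}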
