Documentation ¶
Overview ¶
Package improved provides an "improved" version of the venerable eestream.StripeReader type from 2018.
Index ¶
- Variables
- type Batch
- type BatchPool
- type ErasureScheme
- type PiecesProgress
- func (y *PiecesProgress) AcknowledgeNewStripes()
- func (y *PiecesProgress) IncreaseNeededShares() bool
- func (y *PiecesProgress) NeededShares() int32
- func (y *PiecesProgress) PieceSharesReceived(idx int) int32
- func (y *PiecesProgress) SetStripesNeeded(required int32)
- func (y *PiecesProgress) SharesCompleted(idx int, delta int32) bool
- type StreamingPiece
- type StripeReader
Constants ¶
This section is empty.
Variables ¶
var (
	// Error is an eestream/improved error.
	Error = errs.Class("eestream/improved")
)
Functions ¶
This section is empty.
Types ¶
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
A Batch is a reference-counted slice of erasure shares. Batches are returned by BatchPool.GetAndClaim with a starting reference count of 1.
func (*Batch) Claim ¶
Claim adds 1 to the batch reference count and returns true if the batch was claimable. See Release.
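The claim/release pattern described above can be sketched as follows. This is a minimal, self-contained model and not the package's implementation: the refBatch type, its fields, and the released callback are hypothetical names, and the Release method is assumed from the "See Release" reference in Claim's documentation.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// refBatch is a hypothetical stand-in for Batch: a byte slice guarded by a
// reference count that starts at 1.
type refBatch struct {
	data     []byte
	refs     atomic.Int32
	released func(*refBatch) // called once, when the count reaches zero
}

func newRefBatch(size int, released func(*refBatch)) *refBatch {
	b := &refBatch{data: make([]byte, size), released: released}
	b.refs.Store(1)
	return b
}

// Claim adds a reference unless the count has already dropped to zero.
func (b *refBatch) Claim() bool {
	for {
		n := b.refs.Load()
		if n <= 0 {
			return false
		}
		if b.refs.CompareAndSwap(n, n+1) {
			return true
		}
	}
}

// Release drops a reference and hands the batch back when none remain.
func (b *refBatch) Release() {
	if b.refs.Add(-1) == 0 && b.released != nil {
		b.released(b)
	}
}

func main() {
	returned := 0
	b := newRefBatch(16, func(*refBatch) { returned++ })

	fmt.Println(b.Claim()) // true: the count goes from 1 to 2
	b.Release()            // count 2 -> 1, batch still live
	b.Release()            // count 1 -> 0, batch handed back
	fmt.Println(returned, b.Claim())
}
```

The compare-and-swap loop in Claim is what keeps a late claimer from reviving a batch that has already been handed back.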
type BatchPool ¶
type BatchPool struct {
// contains filtered or unexported fields
}
A BatchPool is a sync.Pool that deals with batches of erasure shares, serialized as []byte slices of a fixed size. The fixed size is the largest multiple of the erasure share size that fits in standardBufSize.
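As a worked example of the sizing rule, the sketch below rounds a buffer size down to a whole number of erasure shares. The value of standardBufSize is not given here, so assumedBufSize is a made-up figure for illustration, and batchSize is a hypothetical helper rather than a function of this package.

```go
package main

import "fmt"

// assumedBufSize is an illustrative value only; the package's real
// standardBufSize is not documented on this page.
const assumedBufSize = 64 * 1024

// batchSize returns the largest multiple of shareSize that fits in bufSize.
// It returns 0 when shareSize is not positive or exceeds bufSize.
func batchSize(shareSize, bufSize int) int {
	if shareSize <= 0 {
		return 0
	}
	return (bufSize / shareSize) * shareSize
}

func main() {
	fmt.Println(batchSize(256, assumedBufSize))  // 65536: 256 divides the buffer evenly
	fmt.Println(batchSize(1000, assumedBufSize)) // 65000: 65 whole shares fit
}
```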
func NewBatchPool ¶
NewBatchPool creates a BatchPool with the given erasure share size.
func (*BatchPool) GetAndClaim ¶
GetAndClaim returns a batch from the pool, already claimed once. To free the batch, a matching Release call is needed (see Batch.Claim).
type ErasureScheme ¶
type ErasureScheme interface {
	// Encode will take 'in' and call 'out' with erasure coded pieces.
	Encode(in []byte, out func(num int, data []byte)) error

	// EncodeSingle will take 'in' with the stripe and fill 'out' with the
	// erasure share for piece 'num'.
	EncodeSingle(in, out []byte, num int) error

	// Decode will take a mapping of available erasure coded piece num -> data,
	// 'in', and append the combined data to 'out', returning it.
	Decode(out []byte, in []infectious.Share) ([]byte, error)

	// Rebuild is a direct call to infectious.Rebuild, which does no error
	// detection and is faster.
	Rebuild(in []infectious.Share, out func(infectious.Share)) error

	// ErasureShareSize is the size of the erasure shares that come from Encode
	// and are passed to Decode.
	ErasureShareSize() int

	// StripeSize is the size of the stripes that are passed to Encode and come
	// from Decode.
	StripeSize() int

	// Encode will generate this many erasure shares and therefore this many pieces.
	TotalCount() int

	// Decode requires at least this many pieces.
	RequiredCount() int
}
ErasureScheme represents the general format of any erasure scheme algorithm. If this interface can be implemented, the rest of this library will work with it.
type PiecesProgress ¶
type PiecesProgress struct {
// contains filtered or unexported fields
}
PiecesProgress is an interesting concurrency primitive we don't know what to call, but it's kind of like those old time clocks that employees would clock in and out of. Imagine that Looney Tunes episode where Sam the Sheepdog and Wile E. Coyote are clocking in and out.
In our case, PiecesProgress is two dimensional:
- There is the neededShares vs total dimension: the number of shares necessary for Reed-Solomon reconstruction, for instance, out of the total number of shares.
- There is the current watermark: which stripe each piece is currently working on.
A bunch of piece readers will be keeping PiecesProgress updated with how many shares they have downloaded within their piece. When a piece reader reports that it has downloaded n shares, and that report is the one that brings the count up to neededShares at a certain watermark or stripe, PiecesProgress will tell the piece reader to wake up the combining layer.
This data structure is designed to cause these wakeups to happen as little as possible.
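The wake-up behaviour described above can be modelled with a much simpler structure. The sketch below is a mutex-based toy, not the package's implementation: the progress type, its fields, and the rule that a piece counts as ready once it has received at least stripesNeeded shares are all assumptions made for illustration. Only the method names mirror the PiecesProgress API.

```go
package main

import (
	"fmt"
	"sync"
)

// progress is a hypothetical, lock-based model of the two dimensions:
// per-piece watermarks and the number of pieces needed per stripe.
type progress struct {
	mu            sync.Mutex
	received      []int32 // shares downloaded so far, per piece
	neededShares  int32   // pieces that must reach the watermark
	stripesNeeded int32   // watermark the combiner is waiting for
	alarmPending  bool    // true between a wake-up and its acknowledgement
}

func newProgress(minimum, total int32) *progress {
	return &progress{received: make([]int32, total), neededShares: minimum}
}

func (p *progress) SetStripesNeeded(required int32) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.stripesNeeded = required
}

// AcknowledgeNewStripes re-arms the alarm after the combiner has woken up.
func (p *progress) AcknowledgeNewStripes() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.alarmPending = false
}

// SharesCompleted records delta new shares for piece idx and reports whether
// the caller should wake the combiner. At most one wake-up is issued until
// AcknowledgeNewStripes is called.
func (p *progress) SharesCompleted(idx int, delta int32) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.received[idx] += delta
	if p.alarmPending {
		return false
	}
	var ready int32
	for _, n := range p.received {
		if n >= p.stripesNeeded {
			ready++
		}
	}
	if ready >= p.neededShares {
		p.alarmPending = true
		return true
	}
	return false
}

func main() {
	p := newProgress(2, 3) // need 2 of 3 pieces per stripe
	p.SetStripesNeeded(1)
	fmt.Println(p.SharesCompleted(0, 1)) // false: only one piece is ready
	fmt.Println(p.SharesCompleted(1, 1)) // true: second piece reaches the watermark
	fmt.Println(p.SharesCompleted(2, 1)) // false: a wake-up is already pending
}
```

The alarmPending flag is what keeps wake-ups rare in this model: once one reader has been told to wake the combiner, further reports stay silent until the combiner acknowledges.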
func NewPiecesProgress ¶
func NewPiecesProgress(minimum, total int32) *PiecesProgress
NewPiecesProgress constructs a PiecesProgress that initially requires minimum shares per stripe (the starting neededShares value), out of total shares in all. PiecesProgress doesn't care about how many stripes there are but will keep track of which stripe each share reader is on.
func (*PiecesProgress) AcknowledgeNewStripes ¶
func (y *PiecesProgress) AcknowledgeNewStripes()
AcknowledgeNewStripes tells PiecesProgress that the combiner has woken up and new alarms are okay to trigger.
func (*PiecesProgress) IncreaseNeededShares ¶
func (y *PiecesProgress) IncreaseNeededShares() bool
IncreaseNeededShares tells PiecesProgress that going forward, we need more shares per watermark value.
func (*PiecesProgress) NeededShares ¶
func (y *PiecesProgress) NeededShares() int32
NeededShares returns the current value of the number of shares required at a given watermark.
func (*PiecesProgress) PieceSharesReceived ¶
func (y *PiecesProgress) PieceSharesReceived(idx int) int32
PieceSharesReceived returns the current watermark for reader idx.
func (*PiecesProgress) SetStripesNeeded ¶
func (y *PiecesProgress) SetStripesNeeded(required int32)
SetStripesNeeded tells PiecesProgress which stripe watermark, required, is needed next.
func (*PiecesProgress) SharesCompleted ¶
func (y *PiecesProgress) SharesCompleted(idx int, delta int32) bool
SharesCompleted adds some read events to a given index. If SharesCompleted returns true, then the calling reader should wake up the combiner.
type StreamingPiece ¶
type StreamingPiece struct {
// contains filtered or unexported fields
}
A StreamingPiece is an in-memory storage location for a stream of bytes being operated on by a single producer and a single consumer in atomic units of a given erasure share size. The StreamingPiece type must know its full expected size up front, and allocates slots for each BatchPool batch of erasure shares up to that total size. It will hydrate these slots on demand and free them back to the BatchPool as they are consumed.
func NewStreamingPiece ¶
func NewStreamingPiece(shareSize int, totalSize int64, pool *BatchPool) *StreamingPiece
NewStreamingPiece creates a buffer that uses units of size shareSize, with a total size of totalSize bytes. It uses pool to hydrate and return buffers in its slots.
func (*StreamingPiece) Err ¶
func (b *StreamingPiece) Err() error
Err returns the last error encountered during reading.
func (*StreamingPiece) MarkCompleted ¶
func (b *StreamingPiece) MarkCompleted(sharesCompleted int)
MarkCompleted tells the StreamingPiece to return some internal batches back to the BatchPool, since we don't need them anymore. It will assume that none of the first sharesCompleted units will be asked for again.
func (*StreamingPiece) ReadShare ¶
func (b *StreamingPiece) ReadShare(shareIdx int) (data []byte, release func(), err error)
ReadShare returns the byte slice that references the read data in a buffer representing the share with index shareIdx. Note that shareIdx is not the Reed Solomon Share Number, since all shares in this buffer share the same Reed Solomon Share Number. If a share at shareIdx cannot be returned, it will return an error, which may be a read error determined by ReadSharesFrom. The release callback must be called when the caller is done reading from the share.
func (*StreamingPiece) ReadSharesFrom ¶
func (b *StreamingPiece) ReadSharesFrom(r io.Reader) (shareCount int, done bool)
ReadSharesFrom is going to call r.Read() once, and will return the number of full shares that are now newly completely read as a result of this call. If r.Read() returns an error or io.EOF, or no data is expected otherwise, done will be true. The read error if any is available from Err() or ReadShare().
type StripeReader ¶
type StripeReader struct {
// contains filtered or unexported fields
}
StripeReader reads from a collection of piece io.ReadClosers in parallel, recombining them into a single stream using an ErasureScheme.
func New ¶
func New(readers map[int]io.ReadCloser, scheme ErasureScheme, totalStripes int, errorDetection bool) *StripeReader
New makes a new StripeReader using the provided map of share number to io.ReadCloser, an ErasureScheme, the total number of stripes in the stream, and whether or not to use the ErasureScheme's error detection.
func (*StripeReader) Close ¶
func (s *StripeReader) Close() error
Close does *not* close the readers it received in the constructor. Close does *not* wait for reader goroutines to shut down. See CloseAndWait if you want other behavior. Close mimics the older eestream.StripeReader behavior.
func (*StripeReader) CloseAndWait ¶
func (s *StripeReader) CloseAndWait() error
CloseAndWait closes all readers and waits for all goroutines.
func (*StripeReader) ReadStripes ¶
func (s *StripeReader) ReadStripes(ctx context.Context, nextStripe int64, out []byte) (_ []byte, count int, err error)
ReadStripes returns 1 or more stripes. out is overwritten.