Documentation ¶
Index ¶
- Variables
- func CalcPieceSize(dataSize int64, scheme ErasureScheme) int64
- func Decode(rrs map[int]ranger.Ranger, es ErasureScheme, mbm int, forceErrorDetection bool) (ranger.Ranger, error)
- func DecodeReaders2(ctx context.Context, cancel func(), rs map[int]io.ReadCloser, es ErasureScheme, ...) io.ReadCloser
- func EncodeReader2(ctx context.Context, r io.Reader, rs RedundancyStrategy) (_ []io.ReadCloser, err error)
- func NewFEC(k, n int) (*infectious.FEC, error)
- type Batch
- type BatchPool
- type EncodedRanger
- type ErasureScheme
- type PiecesProgress
- func (y *PiecesProgress) AcknowledgeNewStripes()
- func (y *PiecesProgress) IncreaseNeededShares() bool
- func (y *PiecesProgress) NeededShares() int32
- func (y *PiecesProgress) PieceSharesReceived(idx int) int32
- func (y *PiecesProgress) ProgressSnapshot(out []int32) []int32
- func (y *PiecesProgress) SetStripesNeeded(required int32)
- func (y *PiecesProgress) SharesCompleted(idx int, delta int32) bool
- type RedundancyStrategy
- type Share
- type StreamingPiece
- type StripeReader
Constants ¶
This section is empty.
Variables ¶
var (
	// Error is the default eestream errs class.
	Error = errs.Class("eestream")

	// ErrInactive is the class of errors returned when a stream is inactive
	// and should be restarted.
	ErrInactive = errs.Class("quiescence")
)
Functions ¶
func CalcPieceSize ¶
func CalcPieceSize(dataSize int64, scheme ErasureScheme) int64
CalcPieceSize calculates the size of an individual piece that results from erasure coding data of dataSize with the given ErasureScheme.
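A minimal usage sketch, assuming the storj.io/uplink/private/eestream import path; the 29-of-80 parameters and 1 KiB share size are arbitrary illustrative numbers, not package defaults:

package main

import (
	"fmt"

	"storj.io/uplink/private/eestream" // assumed import path
)

func main() {
	fec, err := eestream.NewFEC(29, 80) // k = 29 required, n = 80 total pieces
	if err != nil {
		panic(err)
	}
	scheme := eestream.NewRSScheme(fec, 1024) // 1 KiB erasure shares
	// Size of each of the 80 pieces produced for 10 MiB of input data.
	fmt.Println(eestream.CalcPieceSize(10*1024*1024, scheme))
}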
func Decode ¶
func Decode(rrs map[int]ranger.Ranger, es ErasureScheme, mbm int, forceErrorDetection bool) (ranger.Ranger, error)
Decode takes a map of Rangers and an ErasureScheme and returns a combined Ranger.
rrs is a map of erasure piece numbers to erasure piece rangers. mbm is the maximum memory (in bytes) to be allocated for read buffers; if set to 0, the minimum possible memory will be used. If forceErrorDetection is set to true, k+1 pieces will always be required for decoding, so corrupted pieces can be detected.
func DecodeReaders2 ¶ added in v1.6.0
func DecodeReaders2(ctx context.Context, cancel func(), rs map[int]io.ReadCloser, es ErasureScheme, expectedSize int64, mbm int, forceErrorDetection bool) io.ReadCloser
DecodeReaders2 takes a map of readers and an ErasureScheme returning a combined Reader.
rs is a map of erasure piece numbers to erasure piece streams. expectedSize is the number of bytes expected to be returned by the Reader. mbm is the maximum memory (in bytes) to be allocated for read buffers; if set to 0, the minimum possible memory will be used. If forceErrorDetection is set to true, k+1 pieces will always be required for decoding, so corrupted pieces can be detected.
func EncodeReader2 ¶ added in v1.6.0
func EncodeReader2(ctx context.Context, r io.Reader, rs RedundancyStrategy) (_ []io.ReadCloser, err error)
EncodeReader2 takes a Reader and a RedundancyStrategy and returns a slice of io.ReadClosers.
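A hedged round-trip sketch pairing EncodeReader2 with DecodeReaders2. The tiny 2-of-4 scheme and 256-byte share size are arbitrary assumptions, the slice index of the encoded readers is assumed to be the piece number, and the input is assumed to already be a whole number of stripes long (callers are assumed to pad before encoding):

import (
	"bytes"
	"context"
	"io"

	"storj.io/uplink/private/eestream" // assumed import path
)

func roundTrip(ctx context.Context, data []byte) ([]byte, error) {
	fec, err := eestream.NewFEC(2, 4)
	if err != nil {
		return nil, err
	}
	scheme := eestream.NewRSScheme(fec, 256) // stripe size is 2*256 = 512 here
	strategy, err := eestream.NewRedundancyStrategy(scheme, 0, 0) // 0 resets both thresholds to TotalCount
	if err != nil {
		return nil, err
	}
	// Assumption: len(data) is a multiple of scheme.StripeSize().
	pieces, err := eestream.EncodeReader2(ctx, bytes.NewReader(data), strategy)
	if err != nil {
		return nil, err
	}
	rs := make(map[int]io.ReadCloser, len(pieces))
	for num, piece := range pieces {
		rs[num] = piece // assumption: slice index == erasure piece number
	}
	ctx, cancel := context.WithCancel(ctx)
	decoded := eestream.DecodeReaders2(ctx, cancel, rs, scheme, int64(len(data)), 0, false)
	defer func() { _ = decoded.Close() }()
	return io.ReadAll(decoded)
}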
func NewFEC ¶ added in v1.12.2
func NewFEC(k, n int) (*infectious.FEC, error)
NewFEC creates a *FEC using k required pieces and n total pieces. Encoding data with this *FEC will generate n pieces, and decoding data requires k uncorrupted pieces. During decode, when more than k pieces exist, corrupted data can be detected and recovered from.
Types ¶
type Batch ¶ added in v1.13.0
type Batch struct {
// contains filtered or unexported fields
}
A Batch is a reference counted slice of erasure shares. Batches are returned by BatchPool.GetAndClaim with a starting reference count of 1.
func (*Batch) Claim ¶ added in v1.13.0
func (b *Batch) Claim() bool
Claim adds 1 to the batch reference count and returns true if the batch was claimable. See Release.
type BatchPool ¶ added in v1.13.0
type BatchPool struct {
// contains filtered or unexported fields
}
A BatchPool is a sync.Pool that deals with batches of erasure shares, serialized as []byte slices of a fixed size. The fixed size is the largest multiple of the erasure share size that fits in standardBufSize.
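The fixed size can be sketched as follows; standardBufSize is unexported, so the 32 KiB constant here is only an assumed stand-in:

// assumedStandardBufSize stands in for the package's unexported standardBufSize.
const assumedStandardBufSize = 32 * 1024

// fixedBatchSize illustrates the sizing rule: the largest multiple of the
// erasure share size that fits in the standard buffer size.
func fixedBatchSize(shareSize int) int {
	return (assumedStandardBufSize / shareSize) * shareSize
}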
func NewBatchPool ¶ added in v1.13.0
func NewBatchPool(shareSize int) *BatchPool
NewBatchPool creates a BatchPool with the given erasure share size.
func (*BatchPool) GetAndClaim ¶ added in v1.13.0
func (b *BatchPool) GetAndClaim() *Batch
GetAndClaim returns a batch from the pool. To free the batch, a Release() call is needed.
type EncodedRanger ¶
type EncodedRanger struct {
// contains filtered or unexported fields
}
EncodedRanger will take an existing Ranger and provide a means to get multiple Ranged sub-Readers. EncodedRanger does not match the normal Ranger interface.
func NewEncodedRanger ¶
func NewEncodedRanger(rr ranger.Ranger, rs RedundancyStrategy) (*EncodedRanger, error)
NewEncodedRanger from the given Ranger and RedundancyStrategy. See NewRedundancyStrategy for the repair and optimal thresholds.
func (*EncodedRanger) OutputSize ¶
func (er *EncodedRanger) OutputSize() int64
OutputSize is like Ranger.Size, but returns the size of each erasure encoded piece that comes out.
func (*EncodedRanger) Range ¶
func (er *EncodedRanger) Range(ctx context.Context, offset, length int64) (_ []io.ReadCloser, err error)
Range is like Ranger.Range, but returns a slice of Readers.
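A hedged usage sketch; ranger.ByteRanger comes from storj.io/common/ranger, the 2-of-4 scheme is arbitrary, and the offset and length are assumed to address the pre-encoding data, with the returned readers covering the stripes that encompass that range:

import (
	"context"
	"io"

	"storj.io/common/ranger" // assumed import path
	"storj.io/uplink/private/eestream"
)

func rangeEncoded(ctx context.Context, data []byte) error {
	fec, err := eestream.NewFEC(2, 4)
	if err != nil {
		return err
	}
	strategy, err := eestream.NewRedundancyStrategy(eestream.NewRSScheme(fec, 256), 0, 0)
	if err != nil {
		return err
	}
	er, err := eestream.NewEncodedRanger(ranger.ByteRanger(data), strategy)
	if err != nil {
		return err
	}
	// Request the first half of the underlying data; each returned reader
	// yields the corresponding portion of one encoded piece.
	readers, err := er.Range(ctx, 0, int64(len(data))/2)
	if err != nil {
		return err
	}
	for _, r := range readers {
		_, _ = io.Copy(io.Discard, r)
		_ = r.Close()
	}
	return nil
}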
type ErasureScheme ¶
type ErasureScheme interface {
	// Encode will take 'in' and call 'out' with erasure coded pieces.
	Encode(in []byte, out func(num int, data []byte)) error

	// EncodeSingle will take 'in' with the stripe and fill 'out' with the
	// erasure share for piece 'num'.
	EncodeSingle(in, out []byte, num int) error

	// Decode will take a mapping of available erasure coded piece num -> data,
	// 'in', and append the combined data to 'out', returning it.
	Decode(out []byte, in []infectious.Share) ([]byte, error)

	// Rebuild is a direct call to infectious.Rebuild, which does no error
	// detection and is faster.
	Rebuild(in []infectious.Share, out func(infectious.Share)) error

	// ErasureShareSize is the size of the erasure shares that come from Encode
	// and are passed to Decode.
	ErasureShareSize() int

	// StripeSize is the size of the stripes that are passed to Encode and come
	// from Decode.
	StripeSize() int

	// Encode will generate this many erasure shares and therefore this many
	// pieces.
	TotalCount() int

	// Decode requires at least this many pieces.
	RequiredCount() int
}
ErasureScheme represents the general format of any erasure scheme algorithm. If this interface can be implemented, the rest of this library will work with it.
func NewRSScheme ¶
func NewRSScheme(fc *infectious.FEC, erasureShareSize int) ErasureScheme
NewRSScheme returns a Reed-Solomon-based ErasureScheme.
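For a Reed-Solomon scheme, one stripe decodes from RequiredCount erasure shares, so the sizes relate as in this sketch; the equality in the comment is an assumption consistent with the interface documentation above, and the 4-of-8 numbers are arbitrary:

package main

import (
	"fmt"

	"storj.io/uplink/private/eestream" // assumed import path
)

func main() {
	fec, err := eestream.NewFEC(4, 8)
	if err != nil {
		panic(err)
	}
	s := eestream.NewRSScheme(fec, 512)
	// Assumption: s.StripeSize() == s.RequiredCount()*s.ErasureShareSize(),
	// i.e. one 2048-byte stripe encodes into 8 shares of 512 bytes each.
	fmt.Println(s.StripeSize(), s.ErasureShareSize(), s.RequiredCount(), s.TotalCount())
}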
func NewUnsafeRSScheme ¶
func NewUnsafeRSScheme(fc *infectious.FEC, erasureShareSize int) ErasureScheme
NewUnsafeRSScheme returns a Reed-Solomon-based ErasureScheme without error detection.
type PiecesProgress ¶ added in v1.13.0
type PiecesProgress struct {
// contains filtered or unexported fields
}
PiecesProgress is an interesting concurrency primitive we don't know what to call, but it's kind of like one of those old time clocks that employees would clock in and out of. Imagine that Looney Tunes episode where Sam the Sheepdog and Ralph Wolf clock in and out.
In our case, PiecesProgress is two dimensional:
- There is the neededShares vs. total dimension: the number of shares necessary for Reed-Solomon reconstruction, for instance, out of the total number of shares.
- There is the current watermark of which stripe each specific piece reader is working on.
A bunch of piece readers keep PiecesProgress updated with how many shares they have downloaded within their piece. When a piece reader reports that it has downloaded n shares, and that report is what brings the share count at some watermark (stripe) up to neededShares, PiecesProgress tells that piece reader to wake up the combining layer.
This data structure is designed to make these wakeups happen as rarely as possible.
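The overall pattern looks roughly like the sketch below. The channel-based wakeup and the download helper are illustrative assumptions; the package wires this machinery up internally:

import "storj.io/uplink/private/eestream" // assumed import path

// pieceReaderLoop sketches one piece reader: it reports newly downloaded
// shares and wakes the combiner only when its own report crosses the
// threshold. download is a hypothetical stand-in for the network read.
func pieceReaderLoop(progress *eestream.PiecesProgress, idx int, download func() int, wake chan<- struct{}) {
	for {
		n := download() // hypothetical: number of shares just received
		if n == 0 {
			return
		}
		if progress.SharesCompleted(idx, int32(n)) {
			select {
			case wake <- struct{}{}: // wake the combining layer
			default: // a wakeup is already pending
			}
		}
	}
}

// combinerLoop sketches the combining layer acknowledging wakeups.
func combinerLoop(progress *eestream.PiecesProgress, wake <-chan struct{}) {
	for range wake {
		progress.AcknowledgeNewStripes() // new alarms may trigger again
		// ... combine every stripe that now has NeededShares() shares ...
	}
}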
func NewPiecesProgress ¶ added in v1.13.0
func NewPiecesProgress(minimum, total int32) *PiecesProgress
NewPiecesProgress constructs a PiecesProgress that requires minimum necessary shares per stripe, out of total shares. PiecesProgress doesn't care how many stripes there are, but will keep track of which stripe each share reader is on.
func (*PiecesProgress) AcknowledgeNewStripes ¶ added in v1.13.0
func (y *PiecesProgress) AcknowledgeNewStripes()
AcknowledgeNewStripes tells PiecesProgress that the combiner has woken up and new alarms are okay to trigger.
func (*PiecesProgress) IncreaseNeededShares ¶ added in v1.13.0
func (y *PiecesProgress) IncreaseNeededShares() bool
IncreaseNeededShares tells PiecesProgress that going forward, we need more shares per watermark value.
func (*PiecesProgress) NeededShares ¶ added in v1.13.0
func (y *PiecesProgress) NeededShares() int32
NeededShares returns the current number of shares required at a given watermark.
func (*PiecesProgress) PieceSharesReceived ¶ added in v1.13.0
func (y *PiecesProgress) PieceSharesReceived(idx int) int32
PieceSharesReceived returns the current watermark for reader idx.
func (*PiecesProgress) ProgressSnapshot ¶ added in v1.13.0
func (y *PiecesProgress) ProgressSnapshot(out []int32) []int32
ProgressSnapshot returns a snapshot of the current progress. No locks are held, so in the presence of concurrent mutations it doesn't represent a single point in time.
func (*PiecesProgress) SetStripesNeeded ¶ added in v1.13.0
func (y *PiecesProgress) SetStripesNeeded(required int32)
SetStripesNeeded tells PiecesProgress which stripe is needed next.
func (*PiecesProgress) SharesCompleted ¶ added in v1.13.0
func (y *PiecesProgress) SharesCompleted(idx int, delta int32) bool
SharesCompleted adds delta newly completed shares for reader idx. If SharesCompleted returns true, the calling reader should wake up the combiner.
type RedundancyStrategy ¶
type RedundancyStrategy struct {
	ErasureScheme
	// contains filtered or unexported fields
}
RedundancyStrategy is an ErasureScheme with repair and optimal thresholds.
func NewRedundancyStrategy ¶
func NewRedundancyStrategy(es ErasureScheme, repairThreshold, optimalThreshold int) (RedundancyStrategy, error)
NewRedundancyStrategy from the given ErasureScheme, repair and optimal thresholds.
repairThreshold is the minimum repair threshold (see RepairThreshold); if set to 0, it will be reset to the TotalCount of the ErasureScheme. optimalThreshold is the optimal threshold (see OptimalThreshold); if set to 0, it will likewise be reset to the TotalCount of the ErasureScheme.
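A worked sketch with arbitrary assumed numbers: a 29-of-80 scheme where repair triggers below 35 available pieces and 80 available pieces is optimal:

package main

import (
	"fmt"

	"storj.io/uplink/private/eestream" // assumed import path
)

func main() {
	fec, err := eestream.NewFEC(29, 80)
	if err != nil {
		panic(err)
	}
	rs, err := eestream.NewRedundancyStrategy(eestream.NewRSScheme(fec, 1024), 35, 80)
	if err != nil {
		panic(err)
	}
	// 29 pieces reconstruct the data; below 35 available pieces the data
	// must be repaired; at 80 no repair is needed.
	fmt.Println(rs.RequiredCount(), rs.RepairThreshold(), rs.OptimalThreshold())
}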
func NewRedundancyStrategyFromProto ¶
func NewRedundancyStrategyFromProto(scheme *pb.RedundancyScheme) (RedundancyStrategy, error)
NewRedundancyStrategyFromProto creates new RedundancyStrategy from the given RedundancyScheme protobuf.
func NewRedundancyStrategyFromStorj ¶
func NewRedundancyStrategyFromStorj(scheme storj.RedundancyScheme) (RedundancyStrategy, error)
NewRedundancyStrategyFromStorj creates new RedundancyStrategy from the given storj.RedundancyScheme.
func (*RedundancyStrategy) OptimalThreshold ¶
func (rs *RedundancyStrategy) OptimalThreshold() int
OptimalThreshold is the number of available erasure pieces above which there is no need for the data to be repaired.
func (*RedundancyStrategy) RepairThreshold ¶
func (rs *RedundancyStrategy) RepairThreshold() int
RepairThreshold is the number of available erasure pieces below which the data must be repaired to avoid loss.
type Share ¶ added in v1.12.2
type Share = infectious.Share
A Share represents a piece of the FEC-encoded data.
type StreamingPiece ¶ added in v1.13.0
type StreamingPiece struct {
// contains filtered or unexported fields
}
A StreamingPiece is an in-memory storage location for a stream of bytes being operated on by a single producer and a single consumer in atomic units of a given erasure share size. The StreamingPiece type must know its full expected size up front, and allocates slots for each *BatchPool batch of erasure shares up to that total size. It will hydrate these slots on demand and free them back to the BatchPool as they are consumed.
func NewStreamingPiece ¶ added in v1.13.0
func NewStreamingPiece(shareSize int, totalSize int64, pool *BatchPool) *StreamingPiece
NewStreamingPiece creates a buffer that operates in units of size shareSize, with a total size of totalSize bytes. It uses pool to hydrate and return buffers in its slots.
func (*StreamingPiece) Err ¶ added in v1.13.0
func (b *StreamingPiece) Err() error
Err returns the last error encountered during reading.
func (*StreamingPiece) MarkCompleted ¶ added in v1.13.0
func (b *StreamingPiece) MarkCompleted(sharesCompleted int)
MarkCompleted tells the StreamingPiece to return some internal batches back to the BatchPool, since we don't need them anymore. It will assume that none of the first sharesCompleted units will be asked for again.
func (*StreamingPiece) ReadShare ¶ added in v1.13.0
func (b *StreamingPiece) ReadShare(shareIdx int) (data []byte, release func(), err error)
ReadShare returns a byte slice referencing the data for the share at index shareIdx. Note that shareIdx is not the Reed-Solomon share number, since all shares in this buffer have the same Reed-Solomon share number. If the share at shareIdx cannot be returned, ReadShare returns an error, which may be a read error encountered by ReadSharesFrom. The release callback must be called when the caller is done reading the share.
func (*StreamingPiece) ReadSharesFrom ¶ added in v1.13.0
func (b *StreamingPiece) ReadSharesFrom(r io.Reader) (shareCount int, done bool)
ReadSharesFrom calls r.Read() once and returns the number of full shares that are newly complete as a result of the call. done is true if r.Read() returned an error or io.EOF, or if no more data is expected. The read error, if any, is available from Err() or ReadShare().
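The intended consumption loop looks roughly like this hedged sketch; consume is a hypothetical stand-in for whatever processes each share, and treating io.EOF from Err() as a clean end of stream is an assumption:

import (
	"errors"
	"io"

	"storj.io/uplink/private/eestream" // assumed import path
)

func drainPiece(r io.Reader, shareSize int, totalSize int64, consume func([]byte)) error {
	pool := eestream.NewBatchPool(shareSize)
	piece := eestream.NewStreamingPiece(shareSize, totalSize, pool)
	next := 0 // index of the next share to hand to the consumer
	for {
		n, done := piece.ReadSharesFrom(r)
		for i := 0; i < n; i++ {
			data, release, err := piece.ReadShare(next)
			if err != nil {
				return err
			}
			consume(data) // hypothetical consumer
			release()     // required once the share is done being read
			next++
		}
		piece.MarkCompleted(next) // earlier shares won't be requested again
		if done {
			// Assumption: Err may report io.EOF at a clean end of stream.
			if err := piece.Err(); err != nil && !errors.Is(err, io.EOF) {
				return err
			}
			return nil
		}
	}
}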
type StripeReader ¶
type StripeReader struct {
// contains filtered or unexported fields
}
StripeReader reads from a collection of piece io.ReadClosers in parallel, recombining them into a single stream using an ErasureScheme.
func NewStripeReader ¶
func NewStripeReader(readers map[int]io.ReadCloser, scheme ErasureScheme, totalStripes int, errorDetection bool) *StripeReader
NewStripeReader makes a new StripeReader using the provided map of share number to io.ReadClosers, an ErasureScheme, the total number of stripes in the stream, and whether or not to use the ErasureScheme's error detection.
func (*StripeReader) Close ¶
func (s *StripeReader) Close() error
Close does *not* close the readers it received in the constructor. Close does *not* wait for reader goroutines to shut down. See CloseAndWait if you want other behavior. Close mimics the older eestream.StripeReader behavior.
func (*StripeReader) CloseAndWait ¶ added in v1.13.0
func (s *StripeReader) CloseAndWait() error
CloseAndWait closes all readers and waits for all goroutines.
func (*StripeReader) ReadStripes ¶ added in v1.12.2
func (s *StripeReader) ReadStripes(ctx context.Context, nextStripe int64, out []byte) (_ []byte, count int, err error)
ReadStripes returns 1 or more stripes. out is overwritten.
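A hedged read-loop sketch; interpreting nextStripe as the index of the next stripe to decode and reusing out across calls are assumptions based on the signature and the note that out is overwritten:

import (
	"context"
	"io"

	"storj.io/uplink/private/eestream" // assumed import path
)

func readAllStripes(ctx context.Context, readers map[int]io.ReadCloser, scheme eestream.ErasureScheme, totalStripes int) ([]byte, error) {
	sr := eestream.NewStripeReader(readers, scheme, totalStripes, true)
	defer func() { _ = sr.CloseAndWait() }()

	var result, out []byte
	for next := int64(0); next < int64(totalStripes); {
		data, count, err := sr.ReadStripes(ctx, next, out)
		if err != nil {
			return nil, err
		}
		result = append(result, data...)
		next += int64(count)
		out = data[:0] // safe to reuse: ReadStripes overwrites out
	}
	return result, nil
}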