flowrate

package
v0.2.0
Published: Aug 16, 2022 | License: GPL-3.0, BSD-3-Clause | Imports: 6 | Imported by: 0

README

Based on https://github.com/mxk/go-flowrate.

Modified to allow seeking and to calculate total progress.

Optionally, it can also skip rate limiting on the first pass over the data. This is necessary when transferring files over unencrypted HTTP, because in that case the AWS SDK reads the whole payload to calculate its checksum before it initiates the request. This only happens when the user passes --endpoint-url with an http:// endpoint.

Here's a list of the calls that will occur:

  1. The AWS SDK first checks if the stream is seekable, and seeks to the current position to get the start position (which will always be position 0 for shrimp). https://github.com/aws/smithy-go/blob/ec5b67b07969f689d60b7773c2bef00cc047cd7e/transport/http/request.go#L113
  2. It then seeks to the end to calculate the content length. https://github.com/aws/smithy-go/blob/ec5b67b07969f689d60b7773c2bef00cc047cd7e/transport/http/request.go#L61
  3. It then seeks to the beginning. https://github.com/aws/smithy-go/blob/ec5b67b07969f689d60b7773c2bef00cc047cd7e/transport/http/request.go#L72
  4. It then reads the stream and calculates the checksum of the data. https://github.com/aws/aws-sdk-go-v2/blob/af489c33fe60dd0b21b8ab9ffc700b9204dd4bab/aws/signer/v4/middleware.go#L138-L152
  5. It then seeks to the beginning again. https://github.com/aws/smithy-go/blob/ec5b67b07969f689d60b7773c2bef00cc047cd7e/transport/http/request.go#L91
  6. It then reads the data and sends it over the network as the request payload.

In summary, the following calls are made (error handling omitted):

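streamStartPos, _ := stream.Seek(0, io.SeekCurrent) // 1. remember the start position
endOffset, _ := stream.Seek(0, io.SeekEnd)          // 2. seek to the end to get the content length
stream.Seek(streamStartPos, io.SeekStart)           // 3. rewind
io.Copy(hash, stream)                               // 4. first pass: checksum only, not rate limited
stream.Seek(streamStartPos, io.SeekStart)           // 5. rewind again
// 6. the data is then read again and sent over the network; rate limiting only starts here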
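
The call sequence summarized above can be reproduced against an in-memory stream to see why the payload is read twice. This is a minimal sketch using only the standard library; passCounter and replay are hypothetical names introduced for illustration and are not part of this package or the AWS SDK.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
	"io"
)

// passCounter is a hypothetical wrapper (not part of this package) that
// records how many bytes are read in each pass over a seekable stream.
type passCounter struct {
	rs      io.ReadSeeker
	passes  []int64 // bytes read in each pass
	newPass bool    // true when the next Read starts a new pass
}

func (c *passCounter) Read(p []byte) (int, error) {
	n, err := c.rs.Read(p)
	if n > 0 {
		if c.newPass {
			c.passes = append(c.passes, 0)
			c.newPass = false
		}
		c.passes[len(c.passes)-1] += int64(n)
	}
	return n, err
}

func (c *passCounter) Seek(offset int64, whence int) (int64, error) {
	pos, err := c.rs.Seek(offset, whence)
	if err == nil && whence == io.SeekStart {
		c.newPass = true // a rewind begins a new pass
	}
	return pos, err
}

// replay performs the same seek/read sequence as the summary and returns
// the content length and the number of bytes read in each pass.
func replay(data []byte) (int64, []int64, error) {
	stream := &passCounter{rs: bytes.NewReader(data), newPass: true}
	start, err := stream.Seek(0, io.SeekCurrent)
	if err != nil {
		return 0, nil, err
	}
	end, err := stream.Seek(0, io.SeekEnd)
	if err != nil {
		return 0, nil, err
	}
	if _, err := stream.Seek(start, io.SeekStart); err != nil {
		return 0, nil, err
	}
	if _, err := io.Copy(sha256.New(), stream); err != nil { // pass 1: checksum
		return 0, nil, err
	}
	if _, err := stream.Seek(start, io.SeekStart); err != nil {
		return 0, nil, err
	}
	if _, err := io.Copy(io.Discard, stream); err != nil { // pass 2: stands in for the upload
		return 0, nil, err
	}
	return end - start, stream.passes, nil
}

func main() {
	length, passes, err := replay([]byte("hello world"))
	fmt.Println(length, passes, err) // prints "11 [11 11] <nil>"
}
```

Both passes read the full payload, so a limiter that throttled every read would also slow down the checksum pass. Skipping the first pass leaves only the network transfer rate limited.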

Documentation

Overview

Package flowrate provides the tools for monitoring and limiting the flow rate of an arbitrary data stream.

Index

Constants

This section is empty.

Variables

var ErrLimit = errors.New("flowrate: flow rate limit exceeded")

ErrLimit is returned by the Writer when a non-blocking write is short due to the transfer rate limit.

Functions

This section is empty.

Types

type Limiter

type Limiter interface {
	Done() int64
	Status() Status
	SetTransferSize(bytes int64)
	SetLimit(new int64) (old int64)
	SetBlocking(new bool) (old bool)
}

Limiter is implemented by the Reader and Writer to provide a consistent interface for monitoring and controlling data transfer.

type Monitor

type Monitor struct {
	// contains filtered or unexported fields
}

Monitor monitors and limits the transfer rate of a data stream.

func New

func New(sampleRate, windowSize time.Duration) *Monitor

New creates a new flow control monitor. Instantaneous transfer rate is measured and updated for each sampleRate interval. windowSize determines the weight of each sample in the exponential moving average (EMA) calculation. The exact formulas are:

sampleTime = currentTime - prevSampleTime
sampleRate = byteCount / sampleTime
weight     = 1 - exp(-sampleTime/windowSize)
newRate    = weight*sampleRate + (1-weight)*oldRate

The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s, respectively.

func (*Monitor) Done

func (m *Monitor) Done() int64

Done marks the transfer as finished and prevents any further updates or limiting. Instantaneous and current transfer rates drop to 0. Update, IO, and Limit methods become NOOPs. It returns the total number of bytes transferred.

func (*Monitor) IO

func (m *Monitor) IO(n int, err error) (int, error)

IO is a convenience method intended to wrap io.Reader and io.Writer method execution. It calls m.Update(n) and then returns (n, err) unmodified.

func (*Monitor) Limit

func (m *Monitor) Limit(want int, rate int64, block bool) (n int)

Limit restricts the instantaneous (per-sample) data flow to rate bytes per second. It returns the maximum number of bytes (0 <= n <= want) that may be transferred immediately without exceeding the limit. If block == true, the call blocks until n > 0. want is returned unmodified if want < 1, rate < 1, or the transfer is inactive (after a call to Done).

At least one byte is always allowed to be transferred in any given sampling period. Thus, if the sampling rate is 100ms, the lowest achievable flow rate is 10 bytes per second.

For usage examples, see the implementation of Reader and Writer in io.go.

func (*Monitor) SetTotal

func (m *Monitor) SetTotal(prevBytes, totalBytes int64)

SetTotal specifies information to calculate total progress.

func (*Monitor) SetTransferSize

func (m *Monitor) SetTransferSize(bytes int64)

SetTransferSize specifies the total size of the data transfer, which allows the Monitor to calculate the overall progress and time to completion.

func (*Monitor) Status

func (m *Monitor) Status() Status

Status returns current transfer status information. The returned value becomes static after a call to Done.

func (*Monitor) Update

func (m *Monitor) Update(n int) int

Update records the transfer of n bytes and returns n. It should be called after each Read/Write operation, even if n is 0.

type Percent

type Percent uint32

Percent represents a percentage in increments of 1/1000th of a percent.

func (Percent) Float

func (p Percent) Float() float64

func (Percent) String

func (p Percent) String() string

type Reader

type Reader struct {
	io.ReadSeeker // Data source
	*Monitor      // Flow control monitor
	// contains filtered or unexported fields
}

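Reader implements io.ReadCloser with a restriction on the rate of data transfer. Because it embeds an io.ReadSeeker and provides a Seek method, it can also be used where an io.ReadSeeker is required.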

func NewReader

func NewReader(r io.ReadSeeker, limit int64, skipFirstPass bool) *Reader

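NewReader restricts all Read operations on r to limit bytes per second. If skipFirstPass is true, rate limiting is skipped on the first pass over the data (see the README for when this is needed).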

func (*Reader) Close

func (r *Reader) Close() error

Close closes the underlying reader if it implements the io.Closer interface.

func (*Reader) Read

func (r *Reader) Read(p []byte) (n int, err error)

Read reads up to len(p) bytes into p without exceeding the current transfer rate limit. It returns (0, nil) immediately if r is non-blocking and no new bytes can be read at this time.

func (*Reader) Seek

func (r *Reader) Seek(offset int64, whence int) (n int64, err error)

func (*Reader) SetBlocking

func (r *Reader) SetBlocking(new bool) (old bool)

SetBlocking changes the blocking behavior and returns the previous setting. A Read call on a non-blocking reader returns immediately if no additional bytes may be read at this time due to the rate limit.

func (*Reader) SetLimit

func (r *Reader) SetLimit(new int64) (old int64)

SetLimit changes the transfer rate limit to new bytes per second and returns the previous setting.

type Status

type Status struct {
	Active   bool          // Flag indicating an active transfer
	Start    time.Time     // Transfer start time
	Duration time.Duration // Time period covered by the statistics
	Idle     time.Duration // Time since the last transfer of at least 1 byte
	Bytes    int64         // Total number of bytes transferred
	Samples  int64         // Total number of samples taken
	InstRate int64         // Instantaneous transfer rate
	CurRate  int64         // Current transfer rate (EMA of InstRate)
	AvgRate  int64         // Average transfer rate (Bytes / Duration)
	PeakRate int64         // Maximum instantaneous transfer rate
	BytesRem int64         // Number of bytes remaining in the transfer
	TimeRem  time.Duration // Estimated time to completion
	Progress Percent       // Overall transfer progress

	TotalBytes    int64         // Total number of bytes transferred
	TotalBytesRem int64         // Number of bytes remaining in the transfer
	TotalTimeRem  time.Duration // Estimated time to completion
	TotalProgress Percent       // Overall transfer progress
}

Status represents the current Monitor status. All transfer rates are in bytes per second rounded to the nearest byte.

type Writer

type Writer struct {
	io.Writer // Data destination
	*Monitor  // Flow control monitor
	// contains filtered or unexported fields
}

Writer implements io.WriteCloser with a restriction on the rate of data transfer.

func NewWriter

func NewWriter(w io.Writer, limit int64) *Writer

NewWriter restricts all Write operations on w to limit bytes per second. The transfer rate and the default blocking behavior (true) can be changed directly on the returned *Writer.

func (*Writer) Close

func (w *Writer) Close() error

Close closes the underlying writer if it implements the io.Closer interface.

func (*Writer) SetBlocking

func (w *Writer) SetBlocking(new bool) (old bool)

SetBlocking changes the blocking behavior and returns the previous setting. A Write call on a non-blocking writer returns as soon as no additional bytes may be written at this time due to the rate limit.

func (*Writer) SetLimit

func (w *Writer) SetLimit(new int64) (old int64)

SetLimit changes the transfer rate limit to new bytes per second and returns the previous setting.

func (*Writer) Write

func (w *Writer) Write(p []byte) (n int, err error)

Write writes len(p) bytes from p to the underlying data stream without exceeding the current transfer rate limit. It returns (n, ErrLimit) if w is non-blocking and no additional bytes can be written at this time.
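
A caller of a non-blocking writer has to handle the short write itself. The following is a minimal sketch of such a retry loop, assuming stand-ins for the real types: errLimit, burstWriter, and writeAll are hypothetical names, and burstWriter only imitates the (n, ErrLimit) contract described above without measuring time.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// errLimit plays the role of the package's ErrLimit in this sketch.
var errLimit = errors.New("flow rate limit exceeded")

// burstWriter is a hypothetical non-blocking writer that accepts at most
// burst bytes per call and reports a short write with errLimit.
type burstWriter struct {
	buf   bytes.Buffer
	burst int
}

func (w *burstWriter) Write(p []byte) (int, error) {
	if len(p) <= w.burst {
		return w.buf.Write(p)
	}
	n, _ := w.buf.Write(p[:w.burst])
	return n, errLimit
}

// writeAll keeps writing the unwritten remainder of p. On errLimit it calls
// wait and retries; any other error is returned. It reports how many Write
// calls were needed.
func writeAll(w io.Writer, p []byte, wait func()) (int, error) {
	calls := 0
	for len(p) > 0 {
		n, err := w.Write(p)
		calls++
		p = p[n:]
		if errors.Is(err, errLimit) {
			wait() // a real caller would sleep or do other work here
			continue
		}
		if err != nil {
			return calls, err
		}
	}
	return calls, nil
}

func main() {
	w := &burstWriter{burst: 4}
	calls, err := writeAll(w, []byte("0123456789"), func() {})
	fmt.Println(calls, w.buf.String(), err) // prints "3 0123456789 <nil>"
}
```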
