dataplane

package
v0.0.0-...-363c35e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2024 License: MIT Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CurrentBufferFormatVersion = "0.3.0"

	BufferStatusComplete BufferStatus = "complete"
	BufferStatusFailed   BufferStatus = "failed"

	HashChainHeader  = "x-ms-meta-cumulative_hash_chain"
	ContentMD5Header = "Content-MD5"

	StartMetadataBlobName = ".bufferstart"
	EndMetadataBlobName   = ".bufferend"
)
View Source
const (
	DefaultReadDop  = 32
	MaxRetries      = 6
	ResponseTimeout = 100 * time.Second
)
View Source
const (
	DefaultWriteDop              = 16
	DefaultBlockSize             = 4 * 1024 * 1024
	EncodedHashChainInitialValue = "MDAwMDAwMDAwMDAwMDAwMA=="
)
View Source
const CurrentSasVersion = "0.1.0"

Variables

View Source
var (
	ErrNotFound          = errors.New("not found")
	ErrBufferFailedState = errors.New("the buffer is in a permanently failed state")
)
View Source
var (
	ErrInvalidSas          = errors.New("the SAS token is not valid")
	ErrSasActionNotAllowed = errors.New("the requested action is not permissed with the given SAS token")
)
View Source
var (
	DefaultFlushInterval = 1 * time.Second
)
View Source
var (
	ErrAccessStringNotUri = errors.New("the buffer access string is invalid. It must be a URI or the path of a file whose contents is a URI")
)

Functions

func AddCommonBlobRequestHeaders

func AddCommonBlobRequestHeaders(header http.Header)

func DownloadBlob

func DownloadBlob(ctx context.Context, httpClient *retryablehttp.Client, blobUri string, waitForBlob *atomic.Bool, blobNumber *int64, finalBlobNumber *atomic.Int64) (*readData, error)

func Gen

func Gen(byteCount int64, outputWriter io.Writer) error

func GetFreePort

func GetFreePort() (port int, err error)

func GetUriFromAccessString

func GetUriFromAccessString(accessString string) (*url.URL, error)

func NewSshTunnelPool

func NewSshTunnelPool(ctx context.Context, sshParams client.SshParams, count int) *sshTunnelPool

func Read

func Read(ctx context.Context, uri *url.URL, outputWriter io.Writer, options ...ReadOption) error

func RelayInputServer

func RelayInputServer(
	ctx context.Context,
	listeners []net.Listener,
	bufferId string,
	outputWriter io.Writer,
	validateSignatureFunc ValidateSignatureFunc,
) error

func RelayOutputServer

func RelayOutputServer(
	ctx context.Context,
	listeners []net.Listener,
	containerId string,
	inputReaderChan <-chan io.ReadCloser,
	errorChan <-chan error,
	validateSignatureFunc ValidateSignatureFunc,
) error

func ValidateSas

func ValidateSas(containerId string, action SasAction, queryString url.Values, validateSignature ValidateSignatureFunc) error

func Write

func Write(ctx context.Context, uri *url.URL, inputReader io.Reader, options ...WriteOption) error

If invalidHashChain is set to true, the value of the hash chain attached to the blob will always be the initial value. This should only be set for testing.

Types

type BufferBlob

type BufferBlob struct {
	BlobNumber int64
	Contents   []byte
	Error      error

	// For Writing
	PreviousCumulativeHash chan string
	CurrentCumulativeHash  chan string

	// For Reading
	EncodedMD5Hash      string
	EncodedMD5ChainHash string
}

type BufferEndMetadata

type BufferEndMetadata struct {
	Status BufferStatus `json:"status"`
}

type BufferStartMetadata

type BufferStartMetadata struct {
	Version string `json:"version"`
}

type BufferStatus

type BufferStatus string

type Container

type Container struct {
	*url.URL
}

func (*Container) GetBlobUri

func (c *Container) GetBlobUri(blobNumber int64) string

func (*Container) GetContainerName

func (c *Container) GetContainerName() string

func (*Container) GetEndMetadataUri

func (c *Container) GetEndMetadataUri() string

func (*Container) GetStartMetadataUri

func (c *Container) GetStartMetadataUri() string

func (*Container) SupportsRelay

func (c *Container) SupportsRelay() bool

type MergedContext

type MergedContext struct {
	context.Context // The context that is used for deadlines and cancellation
	// contains filtered or unexported fields
}

func (*MergedContext) Value

func (c *MergedContext) Value(key any) any

type PartiallyBufferedReader

type PartiallyBufferedReader struct {
	io.Reader
	// contains filtered or unexported fields
}

PartiallyBufferedReader is an io.Reader that stores the first N bytes read from the underlying reader, as they are read, so that it can be rewound and read again, provided that no more than N bytes have been read.

func NewPartiallyBufferedReader

func NewPartiallyBufferedReader(r io.Reader, capacity int) *PartiallyBufferedReader

func (*PartiallyBufferedReader) Read

func (r *PartiallyBufferedReader) Read(p []byte) (n int, err error)

func (*PartiallyBufferedReader) Rewind

func (r *PartiallyBufferedReader) Rewind() error

type ReadOption

type ReadOption func(o *readOptions)

func WithReadDop

func WithReadDop(dop int) ReadOption

func WithReadHttpClient

func WithReadHttpClient(httpClient *retryablehttp.Client) ReadOption

type ReaderWithMetrics

type ReaderWithMetrics struct {
	// contains filtered or unexported fields
}

func (*ReaderWithMetrics) Read

func (c *ReaderWithMetrics) Read(p []byte) (n int, err error)

type SasAction

type SasAction int
const (
	SasActionRead   SasAction = iota
	SasActionCreate SasAction = iota
)

type TransferMetrics

type TransferMetrics struct {
	// contains filtered or unexported fields
}

func NewTransferMetrics

func NewTransferMetrics(ctx context.Context) *TransferMetrics

func (*TransferMetrics) Stop

func (ts *TransferMetrics) Stop()

func (*TransferMetrics) Update

func (ts *TransferMetrics) Update(byteCount uint64, completedBuffers uint64)

type ValidateSignatureFunc

type ValidateSignatureFunc func(data []byte, signature []byte) bool

func CreateSignatureValidationFunc

func CreateSignatureValidationFunc(primarySigningPublicKeyPath, secondarySigningPublicKeyPath string) (ValidateSignatureFunc, error)

type WriteOption

type WriteOption func(o *writeOptions)

func WithWriteBlockSize

func WithWriteBlockSize(blockSize int) WriteOption

func WithWriteDop

func WithWriteDop(dop int) WriteOption

func WithWriteFlushInterval

func WithWriteFlushInterval(flushInterval time.Duration) WriteOption

func WithWriteHttpClient

func WithWriteHttpClient(httpClient *retryablehttp.Client) WriteOption

func WithWriteMetadataEndWriteTimeout

func WithWriteMetadataEndWriteTimeout(timeout time.Duration) WriteOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL