streams

package
v0.56.0

Note: this package is not in the latest version of its module.
Published: Jan 6, 2025 License: AGPL-3.0 Imports: 10 Imported by: 2

Documentation

Overview

Package streams provides support for the Web Streams API.

Constants

const (
	// TypeError is thrown when an argument is not of an expected type
	TypeError errorKind = iota + 1

	// RangeError is thrown when an argument is not within the expected range
	RangeError

	// RuntimeError is thrown when an error occurs that was caused by the JS runtime
	// and is not likely caused by the user, but rather the implementation.
	RuntimeError

	// AssertionError is thrown when an assertion fails
	AssertionError

	// NotSupportedError is thrown when a feature is not supported, or not yet implemented
	NotSupportedError
)
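Because the enumeration starts at `iota + 1`, the zero value of `errorKind` corresponds to no declared kind, so an uninitialized kind cannot be mistaken for `TypeError`. The stand-alone sketch below reproduces that pattern with a local copy of the type; the `String` method is a hypothetical helper added for illustration and is not claimed to exist in the package.

```go
package main

import "fmt"

// errorKind is a local, illustrative copy of the enumeration style shown above.
type errorKind int

const (
	TypeError errorKind = iota + 1 // starts at 1, leaving 0 for "no kind set"
	RangeError
	RuntimeError
	AssertionError
	NotSupportedError
)

// String is a hypothetical helper that maps a kind to a readable name.
func (k errorKind) String() string {
	switch k {
	case TypeError:
		return "TypeError"
	case RangeError:
		return "RangeError"
	case RuntimeError:
		return "RuntimeError"
	case AssertionError:
		return "AssertionError"
	case NotSupportedError:
		return "NotSupportedError"
	default:
		return "unknown"
	}
}

func main() {
	var unset errorKind // zero value, never equal to a declared kind
	fmt.Println(unset, TypeError, NotSupportedError)
}
```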
const (
	// ReadableStreamStateReadable indicates that the stream is readable, and that more data may be read from the stream.
	ReadableStreamStateReadable = "readable"

	// ReadableStreamStateClosed indicates that the stream is closed and cannot be read from.
	ReadableStreamStateClosed = "closed"

	// ReadableStreamStateErrored indicates that the stream has been aborted (errored).
	ReadableStreamStateErrored = "errored"
)
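These three states behave like a small state machine: a stream starts out readable and can move once, either to closed or to errored, after which it stays there. The following stand-alone sketch models that rule with local types; `transition` and `errNotReadable` are hypothetical names and do not come from the package.

```go
package main

import (
	"errors"
	"fmt"
)

// state is a local stand-in for the stream state strings listed above.
type state string

const (
	stateReadable state = "readable"
	stateClosed   state = "closed"
	stateErrored  state = "errored"
)

var errNotReadable = errors.New("stream is not readable")

// transition is a hypothetical helper: only a readable stream may change state.
func transition(current, next state) (state, error) {
	if current != stateReadable {
		return current, errNotReadable
	}
	return next, nil
}

func main() {
	s, err := transition(stateReadable, stateClosed)
	fmt.Println(s, err)

	// A closed stream cannot become errored afterwards.
	s, err = transition(s, stateErrored)
	fmt.Println(s, err)
}
```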
const (
	// ReadableStreamTypeBytes indicates that the stream is a byte stream.
	ReadableStreamTypeBytes = "bytes"
)

Variables

This section is empty.

Functions

func NewReadableStreamDefaultControllerObject

func NewReadableStreamDefaultControllerObject(controller *ReadableStreamDefaultController) (*sobek.Object, error)

NewReadableStreamDefaultControllerObject creates a new sobek.Object from a ReadableStreamDefaultController instance.

func NewReadableStreamDefaultReaderObject

func NewReadableStreamDefaultReaderObject(reader *ReadableStreamDefaultReader) (*sobek.Object, error)

NewReadableStreamDefaultReaderObject creates a new sobek.Object from a ReadableStreamDefaultReader instance.

func NewReadableStreamFromReader added in v0.52.0

func NewReadableStreamFromReader(vu modules.VU, reader io.Reader) *sobek.Object

NewReadableStreamFromReader is the equivalent of [NewReadableStreamDefaultReader], but it initializes a new ReadableStream from a given io.Reader in Go code. It is useful in situations where an io.Reader needs to be surfaced to the JS runtime.

func ReadableStreamReaderGenericInitialize

func ReadableStreamReaderGenericInitialize(reader ReadableStreamGenericReader, stream *ReadableStream)

ReadableStreamReaderGenericInitialize implements the specification's ReadableStreamReaderGenericInitialize algorithm.

Types

type BaseReadableStreamReader

type BaseReadableStreamReader struct {
	// contains filtered or unexported fields
}

BaseReadableStreamReader is a base implementation of a readable stream reader.

func (*BaseReadableStreamReader) Cancel

func (reader *BaseReadableStreamReader) Cancel(reason sobek.Value) *sobek.Promise

Cancel returns a sobek.Promise that resolves when the stream is canceled.

func (*BaseReadableStreamReader) GetClosed

func (reader *BaseReadableStreamReader) GetClosed() (p *sobek.Promise, resolve, reject func(any) error)

GetClosed returns the reader's closed promise as well as its resolve and reject functions.

func (*BaseReadableStreamReader) GetStream

func (reader *BaseReadableStreamReader) GetStream() *ReadableStream

GetStream returns the stream that owns this reader.

func (*BaseReadableStreamReader) SetClosed

func (reader *BaseReadableStreamReader) SetClosed(p *sobek.Promise, resolve, reject func(any) error)

SetClosed sets the reader's closed promise as well as its resolve and reject functions.

func (*BaseReadableStreamReader) SetStream

func (reader *BaseReadableStreamReader) SetStream(stream *ReadableStream)

SetStream sets the stream that owns this reader.

type ModuleInstance

type ModuleInstance struct {
	// contains filtered or unexported fields
}

ModuleInstance is the module instance that will be created for each VU.

func (*ModuleInstance) Exports

func (mi *ModuleInstance) Exports() modules.Exports

Exports returns the module exports that will be available in the runtime.

func (*ModuleInstance) NewCountQueuingStrategy

func (mi *ModuleInstance) NewCountQueuingStrategy(call sobek.ConstructorCall) *sobek.Object

NewCountQueuingStrategy is the constructor for the CountQueuingStrategy object.

func (*ModuleInstance) NewReadableStream

func (mi *ModuleInstance) NewReadableStream(call sobek.ConstructorCall) *sobek.Object

NewReadableStream is the constructor for the ReadableStream object.

func (*ModuleInstance) NewReadableStreamDefaultReader

func (mi *ModuleInstance) NewReadableStreamDefaultReader(call sobek.ConstructorCall) *sobek.Object

NewReadableStreamDefaultReader is the constructor for the ReadableStreamDefaultReader object.

type QueueWithSizes

type QueueWithSizes struct {
	Queue          []ValueWithSize
	QueueTotalSize float64
	// contains filtered or unexported fields
}

QueueWithSizes is a queue of values with sizes.

func NewQueueWithSizes

func NewQueueWithSizes(runtime *sobek.Runtime) *QueueWithSizes

NewQueueWithSizes creates a new queue of values with sizes, as described in the specification.

func (*QueueWithSizes) Dequeue

func (q *QueueWithSizes) Dequeue() (sobek.Value, error)

Dequeue removes and returns the first value from the queue.

It implements the DequeueValue abstract operation.

func (*QueueWithSizes) Enqueue

func (q *QueueWithSizes) Enqueue(value sobek.Value, size float64) error

Enqueue adds a value to the queue, and implements the specification's EnqueueValueWithSize abstract operation.

func (*QueueWithSizes) Len

func (q *QueueWithSizes) Len() int

Len returns the length of the queue.

func (*QueueWithSizes) Peek

func (q *QueueWithSizes) Peek() (sobek.Value, error)

Peek returns the first value from the queue without removing it.

It implements the PeekQueueValue abstract operation.

func (*QueueWithSizes) Reset

func (q *QueueWithSizes) Reset()

Reset clears the queue and resets the total size.

It implements the ResetQueue abstract operation.

type ReadRequest

type ReadRequest struct {
	// contains filtered or unexported fields
}

ReadRequest is a struct containing three algorithms to perform in reaction to filling the readable stream's internal queue or changing its state.

type ReadResult

type ReadResult struct {
	Value sobek.Value
	Done  bool
}

ReadResult is the result of a read operation.

It contains the value read from the stream and a boolean indicating whether or not the stream is done. An undefined value indicates that the stream has been closed.
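A consumer typically keeps reading until a result reports that the stream is done. The sketch below shows that loop with a synchronous, slice-backed reader; the real `Read` method returns a promise, so this is only a simplified model, and `readResult`, `sliceReader`, and `drain` are hypothetical names.

```go
package main

import "fmt"

// readResult mirrors the shape of ReadResult with a string in place of sobek.Value.
type readResult struct {
	value string // the empty string plays the role of undefined here
	done  bool
}

// sliceReader is a synchronous stand-in for a stream reader.
type sliceReader struct {
	chunks []string
}

// read returns the next chunk, or a done result once the chunks run out.
func (r *sliceReader) read() readResult {
	if len(r.chunks) == 0 {
		return readResult{done: true}
	}
	v := r.chunks[0]
	r.chunks = r.chunks[1:]
	return readResult{value: v}
}

// drain reads until the reader reports done and collects every value.
func drain(r *sliceReader) []string {
	var out []string
	for {
		res := r.read()
		if res.done {
			return out
		}
		out = append(out, res.value)
	}
}

func main() {
	r := &sliceReader{chunks: []string{"a", "b"}}
	fmt.Println(drain(r))
}
```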

type ReadableStream

type ReadableStream struct {
	// Locked indicates whether the readable stream is locked to a reader.
	Locked bool

	Source *sobek.Object
	// contains filtered or unexported fields
}

ReadableStream is a concrete instance of the general readable stream concept.

It is adaptable to any chunk type, and maintains an internal queue to keep track of data supplied by the underlying source but not yet read by any consumer.

func (*ReadableStream) Cancel

func (stream *ReadableStream) Cancel(reason sobek.Value) *sobek.Promise

Cancel cancels the stream and returns a Promise to the user.

func (*ReadableStream) GetReader

func (stream *ReadableStream) GetReader(options *sobek.Object) sobek.Value

GetReader implements the getReader operation.

func (*ReadableStream) Tee

func (stream *ReadableStream) Tee() sobek.Value

Tee implements the tee operation.

type ReadableStreamController

type ReadableStreamController interface {
	Close()
	Enqueue(chunk sobek.Value)
	Error(err sobek.Value)
	// contains filtered or unexported methods
}

ReadableStreamController is the interface implemented by all readable stream controllers.

It defines both the specification's shared controller and private methods.

type ReadableStreamDefaultController

type ReadableStreamDefaultController struct {
	// contains filtered or unexported fields
}

ReadableStreamDefaultController is the default controller for a ReadableStream. It has methods to control the stream's state and internal queue.

For more details, see the specification.

func (*ReadableStreamDefaultController) Close

func (controller *ReadableStreamDefaultController) Close()

Close closes the stream.

It implements the ReadableStreamDefaultController.close() specification algorithm.

func (*ReadableStreamDefaultController) Enqueue

func (controller *ReadableStreamDefaultController) Enqueue(chunk sobek.Value)

Enqueue enqueues a chunk to the stream's internal queue.

It implements the ReadableStreamDefaultController.enqueue(chunk) specification algorithm.

func (*ReadableStreamDefaultController) Error

func (controller *ReadableStreamDefaultController) Error(err sobek.Value)

Error signals that the stream has been errored, and performs the necessary cleanup steps.

It implements the ReadableStreamDefaultController.error(e) specification algorithm.

type ReadableStreamDefaultReader

type ReadableStreamDefaultReader struct {
	BaseReadableStreamReader
	// contains filtered or unexported fields
}

ReadableStreamDefaultReader represents a default reader designed to be vended by a ReadableStream.

func (*ReadableStreamDefaultReader) Cancel

func (reader *ReadableStreamDefaultReader) Cancel(reason sobek.Value) *sobek.Promise

Cancel returns a sobek.Promise that resolves when the stream is canceled.

Calling this method signals a loss of interest in the stream by a consumer. The supplied reason argument will be given to the underlying source, which may or may not use it.

The `reason` argument is optional, and should hold a human-readable reason for the cancellation. This value may or may not be used.

func (*ReadableStreamDefaultReader) Read

func (reader *ReadableStreamDefaultReader) Read() *sobek.Promise

Read returns a sobek.Promise providing access to the next chunk in the stream's internal queue.

func (*ReadableStreamDefaultReader) ReleaseLock

func (reader *ReadableStreamDefaultReader) ReleaseLock()

ReleaseLock releases the reader's lock on the stream.

If the associated stream is errored when the lock is released, the reader will appear errored in that same way subsequently; otherwise, the reader will appear closed.
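The locking relationship between a stream and its reader can be pictured as follows: acquiring a reader locks the stream, a second acquisition fails while the lock is held, and releasing the lock makes the stream available again. This is a stand-alone sketch with local types and a plain Go error in place of a thrown JS exception; it does not reproduce the package's internals.

```go
package main

import (
	"errors"
	"fmt"
)

type stream struct {
	locked bool
}

type reader struct {
	stream *stream
}

var errLocked = errors.New("stream is already locked to a reader")

// getReader locks the stream to a new reader, or fails if it is already locked.
func (s *stream) getReader() (*reader, error) {
	if s.locked {
		return nil, errLocked
	}
	s.locked = true
	return &reader{stream: s}, nil
}

// releaseLock detaches the reader from its stream; calling it twice is a no-op.
func (r *reader) releaseLock() {
	if r.stream == nil {
		return
	}
	r.stream.locked = false
	r.stream = nil
}

func main() {
	s := &stream{}
	first, _ := s.getReader()
	_, err := s.getReader()
	fmt.Println("second reader error:", err)

	first.releaseLock()
	_, err = s.getReader()
	fmt.Println("after release error:", err)
}
```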

type ReadableStreamGenericReader

type ReadableStreamGenericReader interface {
	// GetStream returns the stream that owns this reader.
	GetStream() *ReadableStream

	// SetStream sets the stream that owns this reader.
	SetStream(stream *ReadableStream)

	// GetClosed returns a [sobek.Promise] that resolves when the stream is closed.
	GetClosed() (p *sobek.Promise, resolve, reject func(any) error)

	// SetClosed sets the [sobek.Promise] that resolves when the stream is closed.
	SetClosed(p *sobek.Promise, resolve, reject func(any) error)

	// Cancel returns a [sobek.Promise] that resolves when the stream is canceled.
	Cancel(reason sobek.Value) *sobek.Promise
}

ReadableStreamGenericReader defines common internal getters/setters and methods that are shared between ReadableStreamDefaultReader and ReadableStreamBYOBReader objects.

It implements the ReadableStreamReaderGeneric mixin from the specification.

Because we are in the context of Sobek, we cannot really define properties the same way as in the spec, so we use getters/setters instead.
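The getter/setter approach combines naturally with struct embedding, which is how ReadableStreamDefaultReader picks up the methods of BaseReadableStreamReader. The reduced sketch below shows the mechanism with local types; `initialize` is a hypothetical, much smaller stand-in for a generic initialization step and only sets the owning stream.

```go
package main

import "fmt"

type stream struct {
	name string
}

// genericReader is a reduced version of the getter/setter interface.
type genericReader interface {
	GetStream() *stream
	SetStream(s *stream)
}

// baseReader holds the shared state and implements the accessors once.
type baseReader struct {
	stream *stream
}

func (b *baseReader) GetStream() *stream  { return b.stream }
func (b *baseReader) SetStream(s *stream) { b.stream = s }

// defaultReader gains the accessors by embedding baseReader.
type defaultReader struct {
	baseReader
}

// initialize works with any reader kind through the interface.
func initialize(r genericReader, s *stream) {
	r.SetStream(s)
}

func main() {
	r := &defaultReader{}
	initialize(r, &stream{name: "demo"})
	fmt.Println(r.GetStream().name)
}
```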

type ReadableStreamReader

type ReadableStreamReader interface {
	ReadableStreamGenericReader

	// Read returns a [sobek.Promise] providing access to the next chunk in the stream's internal queue.
	Read() *sobek.Promise

	// ReleaseLock releases the reader's lock on the stream.
	ReleaseLock()
}

ReadableStreamReader is the interface implemented by all readable stream readers.

type ReadableStreamState

type ReadableStreamState string

ReadableStreamState represents the current state of a ReadableStream.

type ReadableStreamType

type ReadableStreamType = string

ReadableStreamType represents the type of the ReadableStream.

type RootModule

type RootModule struct{}

RootModule is the module that will be registered with the runtime.

func New

func New() *RootModule

New creates a new RootModule instance.

func (*RootModule) NewModuleInstance

func (rm *RootModule) NewModuleInstance(vu modules.VU) modules.Instance

NewModuleInstance creates a new instance of the module for a specific VU.
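The root module and module instance form a small factory pattern: one root value is registered, and it produces a separate instance for each VU so that per-VU state is never shared. The sketch below models this with local types only; `vu` is a hypothetical stand-in for modules.VU, and the export names are assumptions inferred from the constructor names on this page.

```go
package main

import "fmt"

// vu is a hypothetical stand-in for the modules.VU interface.
type vu struct {
	id int
}

type rootModule struct{}

type moduleInstance struct {
	vu *vu
}

func newRootModule() *rootModule { return &rootModule{} }

// newModuleInstance returns a fresh instance bound to a single VU.
func (*rootModule) newModuleInstance(v *vu) *moduleInstance {
	return &moduleInstance{vu: v}
}

// exports lists the names an instance would expose; the names are assumed.
func (mi *moduleInstance) exports() []string {
	return []string{"ReadableStream", "CountQueuingStrategy", "ReadableStreamDefaultReader"}
}

func main() {
	root := newRootModule()
	a := root.newModuleInstance(&vu{id: 1})
	b := root.newModuleInstance(&vu{id: 2})
	fmt.Println(a != b, a.vu.id, b.vu.id, a.exports())
}
```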

type SizeAlgorithm

type SizeAlgorithm = sobek.Callable

SizeAlgorithm is a function that returns the size of a chunk. It is declared as an alias for sobek.Callable.

type UnderlyingSource

type UnderlyingSource struct {
	// Start is called immediately during the creation of a ReadableStream.
	//
	// Typically, this is used to adapt a push source by setting up relevant event listeners.
	// If the setup process is asynchronous, it can return a Promise to signal success or
	// failure; a rejected promise will error the stream.
	Start sobek.Value `json:"start"`

	// Pull is a function that is called whenever the stream's internal queue of chunks
	// becomes not full, i.e. whenever the queue's desired size becomes positive.
	//
	// Generally it will be called repeatedly until the queue reaches its high watermark.
	//
	// This function will not be called until `start()` successfully completes. Additionally,
	// it will only be called repeatedly if it enqueues at least one chunk or fulfills a
	// BYOB request; a no-op `pull` implementation will not be continually called.
	Pull sobek.Value `json:"pull"`

	// Cancel is a function that is called when the stream's or reader's `cancel()` method is
	// called.
	//
	// It takes as its argument the same value as was passed to those methods by the consumer.
	//
	// For all streams, this is generally used to release access to the underlying resource.
	//
	// If the shutdown process is asynchronous, it can return a promise to signal success or
	// failure; the result will be communicated via the return value of the cancel() method
	// that was called. Throwing an exception is treated the same as returning a rejected promise.
	Cancel sobek.Value `json:"cancel"`

	// Type is a string indicating the type of the underlying source.
	Type ReadableStreamType `json:"type"`

	// AutoAllocateChunkSize (optional) is a non-negative integer indicating the size of
	// chunks to allocate when auto-allocating chunks.
	//
	// Can be set to a positive integer to cause the implementation to automatically
	// allocate buffers for the underlying source code to write into. In this case, when
	// a consumer is using a default reader, the stream implementation will automatically
	// allocate an ArrayBuffer of the given size, so that `controller.byobRequest` is always
	// present, as if the consumer was using a BYOB reader.
	AutoAllocateChunkSize null.Int `json:"autoAllocateChunkSize"`
	// contains filtered or unexported fields
}

UnderlyingSource represents the underlying source of a ReadableStream, and defines how the underlying data is pulled from the source.
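The pull behaviour described in the field comments can be approximated with a synchronous loop: pull is invoked while the desired size (high watermark minus queued chunks) is positive, and it is not invoked again after a call that enqueues nothing. The sketch below assumes a count-based strategy where every chunk has size 1. The actual implementation is promise-based and asynchronous, so `controller`, `desiredSize`, and `fill` are hypothetical simplifications.

```go
package main

import "fmt"

// controller is a reduced model of a default controller with a count-based queue.
type controller struct {
	queue         []string
	highWaterMark int
}

// desiredSize is how many more chunks the queue wants before it is full.
func (c *controller) desiredSize() int {
	return c.highWaterMark - len(c.queue)
}

func (c *controller) enqueue(chunk string) {
	c.queue = append(c.queue, chunk)
}

// fill calls pull while the queue wants more data and pull keeps making
// progress. It returns the number of times pull was called.
func fill(c *controller, pull func(*controller)) int {
	calls := 0
	for c.desiredSize() > 0 {
		before := len(c.queue)
		pull(c)
		calls++
		if len(c.queue) == before {
			break // a no-op pull is not called again
		}
	}
	return calls
}

func main() {
	c := &controller{highWaterMark: 3}
	calls := fill(c, func(c *controller) { c.enqueue("chunk") })
	fmt.Println(calls, len(c.queue), c.desiredSize())

	idle := &controller{highWaterMark: 3}
	fmt.Println(fill(idle, func(*controller) {}))
}
```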

func NewUnderlyingSourceFromObject

func NewUnderlyingSourceFromObject(rt *sobek.Runtime, obj *sobek.Object) (UnderlyingSource, error)

NewUnderlyingSourceFromObject creates a new UnderlyingSource from a sobek.Object.

type UnderlyingSourceCancelCallback

type UnderlyingSourceCancelCallback func(reason any) sobek.Value

UnderlyingSourceCancelCallback is a function that is called when the stream's or reader's `cancel()` method is called.

type UnderlyingSourcePullCallback

type UnderlyingSourcePullCallback func(controller *sobek.Object) *sobek.Promise

UnderlyingSourcePullCallback is a function that is called whenever the stream's internal queue of chunks becomes not full, i.e. whenever the queue's desired size becomes positive.

type UnderlyingSourceStartCallback

type UnderlyingSourceStartCallback func(controller *sobek.Object) sobek.Value

UnderlyingSourceStartCallback is a function that is called immediately during the creation of a ReadableStream.

type ValueWithSize

type ValueWithSize struct {
	Value sobek.Value
	Size  float64
}

ValueWithSize holds a value and its corresponding size.

It is used to store values in the queue.
