dslvm

package
v0.29.0
Warning

This package is not in the latest version of its module.

Published: May 20, 2024 License: GPL-3.0 Imports: 19 Imported by: 0

Documentation

Overview

Package dslvm contains low-level code for implementing the measurements DSL.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Start

func Start(ctx context.Context, rtx Runtime, stages ...Stage)

Start starts all the given Stage instances.

func Wait

func Wait(channels ...<-chan Done)

Wait waits until all the given channels are done.
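The following self-contained sketch illustrates one way to wait on several termination channels. It does not import dslvm; `waitAll` is a hypothetical stand-in for Wait, and the sketch assumes that a pipeline signals termination by closing its channel of Done.

```go
package main

import "fmt"

// Done mirrors the package's empty Done type.
type Done struct{}

// waitAll is a hypothetical stand-in for Wait: it blocks until every
// channel has been closed, discarding any value sent before the close.
func waitAll(channels ...<-chan Done) {
	for _, ch := range channels {
		for range ch {
			// Discard values until the channel is closed.
		}
	}
}

func main() {
	a, b := make(chan Done), make(chan Done)
	go func() {
		close(a)
	}()
	go func() {
		b <- Done{}
		close(b)
	}()
	waitAll(a, b)
	fmt.Println("all pipelines terminated")
}
```

Waiting on the channels one after another is enough here because the function only returns once all of them are closed, regardless of the order in which they close.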

Types

type Closer

type Closer interface {
	Close(logger model.Logger) error
}

Closer is something that [DropStage] should explicitly close.

type DNSLookupUDPStage

type DNSLookupUDPStage struct {
	// Domain is the MANDATORY domain to resolve using this DNS resolver.
	Domain string

	// Output is the MANDATORY channel emitting IP addresses. We will close this
	// channel when we have finished streaming the resolved addresses.
	Output chan<- string

	// Resolver is the MANDATORY resolver endpoint (e.g., [::1]:53).
	Resolver string

	// Tags contains OPTIONAL tags for the DNS observations.
	Tags []string
}

DNSLookupUDPStage is a Stage that resolves domain names using a UDP resolver.

func (*DNSLookupUDPStage) Run

func (sx *DNSLookupUDPStage) Run(ctx context.Context, rtx Runtime)

Run resolves Domain using the Do53 endpoint given in Resolver and streams the results on Output, which is closed when we're done.

This function honours the semaphore returned by the Runtime ActiveDNSLookups method and waits until it's given the permission to start a lookup.

type DedupAddrsStage

type DedupAddrsStage struct {
	// Inputs contains the MANDATORY channels from which to read IP addresses. We
	// assume that these channels will be closed when done.
	Inputs []<-chan string

	// Output is the MANDATORY channel where we emit the deduplicated IP addresses. We
	// close this channel when all the Inputs have been closed.
	Output chan<- string
}

DedupAddrsStage is a Stage that deduplicates IP addresses.

func (*DedupAddrsStage) Run

func (sx *DedupAddrsStage) Run(ctx context.Context, rtx Runtime)

Run reads possibly duplicate IP addresses from Inputs and emits deduplicated IP addresses on Output. We close Output when done.
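As a rough illustration of the fan-in and deduplication pattern, here is a self-contained sketch that does not import dslvm. The names `dedup` and `feed` are hypothetical, and emitting each address as soon as it is first seen is an assumption: the actual stage may buffer or order its results differently.

```go
package main

import (
	"fmt"
	"sync"
)

// dedup is a hypothetical stand-in for DedupAddrsStage.Run: it reads from
// all inputs until they are closed, forwards each distinct address once,
// and then closes output.
func dedup(inputs []<-chan string, output chan<- string) {
	defer close(output)

	// Fan in: merge all the inputs into a single channel.
	merged := make(chan string)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan string) {
			defer wg.Done()
			for addr := range in {
				merged <- addr
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(merged)
	}()

	// Forward only the addresses we have not seen before.
	seen := make(map[string]bool)
	for addr := range merged {
		if !seen[addr] {
			seen[addr] = true
			output <- addr
		}
	}
}

// feed returns an already-closed channel preloaded with the given addresses.
func feed(addrs ...string) <-chan string {
	ch := make(chan string, len(addrs))
	for _, a := range addrs {
		ch <- a
	}
	close(ch)
	return ch
}

func main() {
	out := make(chan string)
	go dedup([]<-chan string{feed("1.1.1.1", "8.8.8.8"), feed("8.8.8.8")}, out)
	for addr := range out {
		fmt.Println(addr) // each address is printed once; order may vary
	}
}
```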

type Done

type Done struct{}

Done indicates that a DSL pipeline terminated.

type DropStage

type DropStage[T any] struct {
	// Input contains the MANDATORY channel from which to read instances to drop. We
	// assume that this channel will be closed when done.
	Input <-chan T

	// Output contains the MANDATORY channel closed when Input has been closed.
	Output chan Done
}

DropStage is a Stage that drops its reference to whatever it receives on Input. If the input is a Closer, this stage also makes sure it is closed.

func (*DropStage[T]) Run

func (sx *DropStage[T]) Run(ctx context.Context, rtx Runtime)

Run drops all the input passed to the Input channel and closes Output when done.
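The drop-and-close behaviour can be sketched as follows. This is a self-contained illustration, not the package's code: `drop`, `closer`, and `fakeConn` are hypothetical names, and `closer` simplifies Closer by omitting the logger argument.

```go
package main

import "fmt"

// Done mirrors the package's empty Done type.
type Done struct{}

// closer is a simplified, hypothetical version of Closer (no logger).
type closer interface {
	Close() error
}

// fakeConn counts how many times Close has been called.
type fakeConn struct{ closed *int }

func (c fakeConn) Close() error {
	*c.closed++
	return nil
}

// drop is a hypothetical stand-in for DropStage.Run: it discards every
// value read from input, closes the values that implement closer, and
// closes output once input has been closed.
func drop[T any](input <-chan T, output chan Done) {
	defer close(output)
	for v := range input {
		if c, ok := any(v).(closer); ok {
			_ = c.Close()
		}
	}
}

func main() {
	closed := 0
	input := make(chan fakeConn, 2)
	input <- fakeConn{&closed}
	input <- fakeConn{&closed}
	close(input)

	output := make(chan Done)
	go drop[fakeConn](input, output)
	<-output // returns once drop has closed output
	fmt.Println("closed:", closed)
}
```

The type assertion through `any(v)` is what lets a generic stage close only those inputs that happen to be closers, while still accepting plain values such as strings.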

type GetaddrinfoStage

type GetaddrinfoStage struct {
	// Domain is the MANDATORY domain to resolve using this DNS resolver.
	Domain string

	// Output is the MANDATORY channel emitting IP addresses. We will close this
	// channel when we have finished streaming the resolved addresses.
	Output chan<- string

	// Tags contains OPTIONAL tags for the DNS observations.
	Tags []string
}

GetaddrinfoStage is a Stage that resolves domain names using getaddrinfo.

func (*GetaddrinfoStage) Run

func (sx *GetaddrinfoStage) Run(ctx context.Context, rtx Runtime)

Run resolves a Domain using the getaddrinfo resolver.

This function honours the semaphore returned by the Runtime ActiveDNSLookups method and waits until it's given the permission to start a lookup.

type HTTPConnection

type HTTPConnection interface {
	// AsSingleUseTransport converts the connection to a single-use HTTP transport
	AsSingleUseTransport(logger model.Logger) model.HTTPTransport

	// Closer embeds the Closer interface
	Closer

	// Network returns the network
	Network() string

	// RemoteAddress returns the remote address
	RemoteAddress() string

	// Scheme returns the HTTP scheme for this connection
	Scheme() string

	// TLSNegotiatedProtocol is the protocol negotiated by TLS
	TLSNegotiatedProtocol() string

	// Trace returns the Trace to use
	Trace() Trace
}

HTTPConnection is the connection type expected by *HTTPRoundTripStage.

type HTTPRoundTripStage

type HTTPRoundTripStage[T HTTPConnection] struct {
	// Accept contains the OPTIONAL accept header.
	Accept string

	// AcceptLanguage contains the OPTIONAL accept-language header.
	AcceptLanguage string

	// Host contains the MANDATORY host header.
	Host string

	// Input contains the MANDATORY channel from which to read connections. We
	// assume that this channel will be closed when done.
	Input <-chan T

	// MaxBodySnapshotSize is the OPTIONAL maximum body snapshot size.
	MaxBodySnapshotSize int64

	// Method contains the MANDATORY method.
	Method string

	// Output is the MANDATORY channel emitting [Done]. We will close this
	// channel when the Input channel has been closed.
	Output chan<- Done

	// Referer contains the OPTIONAL referer header.
	Referer string

	// URLPath contains the MANDATORY URL path.
	URLPath string

	// UserAgent contains the OPTIONAL user-agent header.
	UserAgent string
}

HTTPRoundTripStage performs HTTP round trips with connections of type T.

func (*HTTPRoundTripStage[T]) Run

func (sx *HTTPRoundTripStage[T]) Run(ctx context.Context, rtx Runtime)

Run is like [*TCPConnectStage.Run] except that it reads connections from Input and emits [Done] on Output. Each HTTP round trip runs in its own background goroutine. The parallelism is controlled by the Runtime ActiveConnections Semaphore. Note that this code TAKES OWNERSHIP of each connection it reads and closes it at the end of the round trip. While closing the conn, we signal the Runtime ActiveConnections Semaphore to unblock another measurement.

type MakeEndpointsStage

type MakeEndpointsStage struct {
	// Input contains the MANDATORY channel from which to read IP addresses. We
	// assume that this channel will be closed when done.
	Input <-chan string

	// Output is the MANDATORY channel emitting endpoints. We will close this
	// channel when the Input channel has been closed.
	Output chan<- string

	// Port is the MANDATORY port.
	Port string
}

MakeEndpointsStage is a Stage that transforms IP addresses to TCP/UDP endpoints.

func (*MakeEndpointsStage) Run

func (sx *MakeEndpointsStage) Run(ctx context.Context, rtx Runtime)

Run transforms IP addresses to endpoints.
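A minimal sketch of the address-to-endpoint transformation follows. It does not import dslvm; `makeEndpoints` is a hypothetical name, and using the standard library's net.JoinHostPort is an assumption about how the joining could be done (it brackets IPv6 literals, matching the `[::1]:53` style shown elsewhere on this page).

```go
package main

import (
	"fmt"
	"net"
)

// makeEndpoints is a hypothetical stand-in for MakeEndpointsStage.Run: it
// joins each address read from input with port and closes output once
// input has been closed.
func makeEndpoints(input <-chan string, output chan<- string, port string) {
	defer close(output)
	for addr := range input {
		output <- net.JoinHostPort(addr, port)
	}
}

func main() {
	input := make(chan string, 2)
	input <- "93.184.216.34"
	input <- "2001:db8::1"
	close(input)

	output := make(chan string)
	go makeEndpoints(input, output, "443")
	for epnt := range output {
		fmt.Println(epnt)
	}
	// Prints:
	// 93.184.216.34:443
	// [2001:db8::1]:443
}
```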

type Observations

type Observations struct {
	// NetworkEvents contains I/O events.
	NetworkEvents []*model.ArchivalNetworkEvent `json:"network_events"`

	// Queries contains the DNS queries results.
	Queries []*model.ArchivalDNSLookupResult `json:"queries"`

	// Requests contains HTTP request results.
	Requests []*model.ArchivalHTTPRequestResult `json:"requests"`

	// TCPConnect contains the TCP connect results.
	TCPConnect []*model.ArchivalTCPConnectResult `json:"tcp_connect"`

	// TLSHandshakes contains the TLS handshakes results.
	TLSHandshakes []*model.ArchivalTLSOrQUICHandshakeResult `json:"tls_handshakes"`

	// QUICHandshakes contains the QUIC handshakes results.
	QUICHandshakes []*model.ArchivalTLSOrQUICHandshakeResult `json:"quic_handshakes"`
}

Observations is the skeleton shared by most OONI measurements where we group observations by type using standard test keys.

func NewObservations

func NewObservations() *Observations

NewObservations initializes all measurements to empty arrays and returns the Observations skeleton.
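One practical effect of initializing the slices to empty arrays shows up in JSON serialization: encoding/json encodes a nil slice as `null` and an empty slice as `[]`. Whether this is the motivation behind NewObservations is an assumption. The sketch below uses a hypothetical, reduced `observations` type with string elements instead of the model types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// observations is a hypothetical, reduced version of Observations.
type observations struct {
	Queries    []string `json:"queries"`
	TCPConnect []string `json:"tcp_connect"`
}

// newObservations mirrors the documented behaviour of NewObservations by
// initializing every slice to an empty, non-nil value.
func newObservations() *observations {
	return &observations{
		Queries:    []string{},
		TCPConnect: []string{},
	}
}

// mustJSON serializes v and panics on failure.
func mustJSON(v any) string {
	data, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return string(data)
}

func main() {
	fmt.Println(mustJSON(&observations{}))   // {"queries":null,"tcp_connect":null}
	fmt.Println(mustJSON(newObservations())) // {"queries":[],"tcp_connect":[]}
}
```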

type QUICConnection

type QUICConnection struct {
	Conn quic.EarlyConnection
	// contains filtered or unexported fields
}

QUICConnection is a QUIC connection.

func (*QUICConnection) AsSingleUseTransport

func (c *QUICConnection) AsSingleUseTransport(logger model.Logger) model.HTTPTransport

AsSingleUseTransport implements HTTPConnection.

func (*QUICConnection) Close

func (c *QUICConnection) Close(logger model.Logger) error

Close implements HTTPConnection.

func (*QUICConnection) Network

func (c *QUICConnection) Network() string

Network implements HTTPConnection.

func (*QUICConnection) RemoteAddress

func (c *QUICConnection) RemoteAddress() (addr string)

RemoteAddress implements HTTPConnection.

func (*QUICConnection) Scheme

func (c *QUICConnection) Scheme() string

Scheme implements HTTPConnection.

func (*QUICConnection) TLSNegotiatedProtocol

func (c *QUICConnection) TLSNegotiatedProtocol() string

TLSNegotiatedProtocol implements HTTPConnection.

func (*QUICConnection) Trace

func (c *QUICConnection) Trace() Trace

Trace implements HTTPConnection.

type QUICHandshakeStage

type QUICHandshakeStage struct {
	// Input contains the MANDATORY channel from which to read endpoints. We
	// assume that this channel will be closed when done.
	Input <-chan string

	// InsecureSkipVerify OPTIONALLY skips QUIC verification.
	InsecureSkipVerify bool

	// NextProtos OPTIONALLY configures the ALPN.
	NextProtos []string

	// Output is the MANDATORY channel emitting [*QUICConnection]. We will close this
	// channel when the Input channel has been closed.
	Output chan<- *QUICConnection

	// RootCAs OPTIONALLY configures alternative root CAs.
	RootCAs *x509.CertPool

	// ServerName is the MANDATORY server name.
	ServerName string

	// Tags contains OPTIONAL tags to add to the endpoint observations.
	Tags []string
}

QUICHandshakeStage is a Stage that creates *QUICConnection.

func (*QUICHandshakeStage) Run

func (sx *QUICHandshakeStage) Run(ctx context.Context, rtx Runtime)

Run is like [*TCPConnectStage.Run] except that it reads endpoints from Input and emits *QUICConnection on Output. Each QUIC handshake runs in its own background goroutine. The parallelism is controlled by the Runtime ActiveConnections Semaphore, and you MUST arrange for the *QUICConnection to eventually enter a stage that closes it (such as [*DropStage] or [*HTTPRoundTripStage]), so that the code can release that Semaphore and close the conn. We close the conn automatically on failure. On success, it is closed when the *QUICConnection eventually enters a closing stage.

type Runtime

type Runtime interface {
	// ActiveConnections returns the [Semaphore] controlling the
	// maximum number of active connections that we can have.
	ActiveConnections() *Semaphore

	// ActiveDNSLookups returns the [Semaphore] controlling the
	// maximum number of active DNS lookups that we can have.
	ActiveDNSLookups() *Semaphore

	// IDGenerator returns an atomic counter used to generate
	// separate unique IDs for each trace.
	IDGenerator() *atomic.Int64

	// Logger returns the base logger to use.
	Logger() model.Logger

	// NewTrace creates a [Trace] instance. Note that each [Runtime]
	// creates its own [Trace] type. A [Trace] is not guaranteed to collect
	// [*Observations]. For example, [NewMinimalRuntime] creates a [Runtime]
	// that does not collect any [*Observations].
	NewTrace(index int64, zeroTime time.Time, tags ...string) Trace

	// Observations returns the [*Observations] saved so far and clears our
	// internal copy such that the next call to this method only returns
	// the [*Observations] saved since the previous call.
	//
	// You can safely call this method from multiple goroutine contexts.
	Observations() *Observations

	// SaveObservations saves [*Observations] inside the [Runtime]. You can
	// get the saved [*Observations] by calling Observations.
	//
	// You can safely call this method from multiple goroutine contexts.
	SaveObservations(obs ...*Observations)

	// ZeroTime returns the runtime's "zero" time, which is used as the
	// starting point to generate observation's delta times.
	ZeroTime() time.Time
}

Runtime is the runtime in which we execute the DSL.

type Semaphore

type Semaphore struct {
	// contains filtered or unexported fields
}

Semaphore implements a semaphore.

See https://en.wikipedia.org/wiki/Semaphore_(programming).

func NewSemaphore

func NewSemaphore(name string, count int) *Semaphore

NewSemaphore creates a new *Semaphore with the given count of available resources. This function PANICS if the given count of available resources is zero or negative.

func (*Semaphore) Signal

func (sema *Semaphore) Signal()

Signal signals that a resource is now available.

func (*Semaphore) Wait

func (sema *Semaphore) Wait()

Wait waits for a resource to be available.
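A counting semaphore with this Wait/Signal shape can be sketched with a buffered channel. This is an illustration under stated assumptions, not the package's implementation: `semaphore`, `newSemaphore`, and `maxConcurrency` are hypothetical names, and the name argument of NewSemaphore is omitted.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// semaphore is a hypothetical counting semaphore backed by a buffered
// channel holding one token per available resource.
type semaphore struct {
	tokens chan struct{}
}

// newSemaphore panics when count is zero or negative, mirroring the
// documented behaviour of NewSemaphore.
func newSemaphore(count int) *semaphore {
	if count < 1 {
		panic("semaphore: count must be positive")
	}
	sema := &semaphore{tokens: make(chan struct{}, count)}
	for i := 0; i < count; i++ {
		sema.tokens <- struct{}{}
	}
	return sema
}

// Wait blocks until a resource is available and takes it.
func (sema *semaphore) Wait() { <-sema.tokens }

// Signal gives a resource back.
func (sema *semaphore) Signal() { sema.tokens <- struct{}{} }

// maxConcurrency runs n tasks under sema and returns the peak number of
// tasks that held a resource at the same time.
func maxConcurrency(sema *semaphore, n int) int64 {
	var running, peak atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sema.Wait()
			defer sema.Signal()
			cur := running.Add(1)
			for {
				old := peak.Load()
				if cur <= old || peak.CompareAndSwap(old, cur) {
					break
				}
			}
			running.Add(-1)
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	peak := maxConcurrency(newSemaphore(2), 10)
	fmt.Println(peak <= 2) // true
}
```

In this sketch every Signal must be paired with an earlier Wait; an unpaired Signal would block because the token channel is already full.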

type Stage

type Stage interface {
	Run(ctx context.Context, rtx Runtime)
}

Stage is a stage in the DSL graph.

type TCPConnectStage

type TCPConnectStage struct {
	// Input contains the MANDATORY channel from which to read endpoints. We
	// assume that this channel will be closed when done.
	Input <-chan string

	// Output is the MANDATORY channel emitting [*TCPConnection]. We will close this
	// channel when the Input channel has been closed.
	Output chan<- *TCPConnection

	// Tags contains OPTIONAL tags to add to the endpoint observations.
	Tags []string
}

TCPConnectStage is a Stage that creates *TCPConnection.

func (*TCPConnectStage) Run

func (sx *TCPConnectStage) Run(ctx context.Context, rtx Runtime)

Run reads endpoints from Input and streams on the Output channel each *TCPConnection that it could successfully establish. Note that this function honors the Semaphore returned by the Runtime ActiveConnections method, which controls how many connections we can measure in parallel. When given the permission to run, this function spawns a background goroutine that attempts to establish a connection.

The *TCPConnection returned by this stage MUST eventually feed into a stage that closes it (such as [*DropStage] or [*HTTPRoundTripStage]), so that the code can notify the above-mentioned Semaphore and so that we close the open connection. This function will close the Output channel when Input has been closed and there are no pending connection attempts. In case of failure, the code will automatically notify the Semaphore.

type TCPConnection

type TCPConnection struct {
	Conn net.Conn
	// contains filtered or unexported fields
}

TCPConnection is a TCP connection.

func (*TCPConnection) AsSingleUseTransport

func (c *TCPConnection) AsSingleUseTransport(logger model.Logger) model.HTTPTransport

AsSingleUseTransport implements HTTPConnection.

func (*TCPConnection) Close

func (c *TCPConnection) Close(logger model.Logger) error

Close implements HTTPConnection.

func (*TCPConnection) Network

func (c *TCPConnection) Network() string

Network implements HTTPConnection.

func (*TCPConnection) RemoteAddress

func (c *TCPConnection) RemoteAddress() (addr string)

RemoteAddress implements HTTPConnection.

func (*TCPConnection) Scheme

func (c *TCPConnection) Scheme() string

Scheme implements HTTPConnection.

func (*TCPConnection) TLSNegotiatedProtocol

func (c *TCPConnection) TLSNegotiatedProtocol() string

TLSNegotiatedProtocol implements HTTPConnection.

func (*TCPConnection) Trace

func (c *TCPConnection) Trace() Trace

Trace implements HTTPConnection.

type TLSConnection

type TLSConnection struct {
	Conn model.TLSConn
	// contains filtered or unexported fields
}

TLSConnection is a TLS connection.

func (*TLSConnection) AsSingleUseTransport

func (c *TLSConnection) AsSingleUseTransport(logger model.Logger) model.HTTPTransport

AsSingleUseTransport implements HTTPConnection.

func (*TLSConnection) Close

func (c *TLSConnection) Close(logger model.Logger) error

Close implements HTTPConnection.

func (*TLSConnection) Network

func (c *TLSConnection) Network() string

Network implements HTTPConnection.

func (*TLSConnection) RemoteAddress

func (c *TLSConnection) RemoteAddress() (addr string)

RemoteAddress implements HTTPConnection.

func (*TLSConnection) Scheme

func (c *TLSConnection) Scheme() string

Scheme implements HTTPConnection.

func (*TLSConnection) TLSNegotiatedProtocol

func (c *TLSConnection) TLSNegotiatedProtocol() string

TLSNegotiatedProtocol implements HTTPConnection.

func (*TLSConnection) Trace

func (c *TLSConnection) Trace() Trace

Trace implements HTTPConnection.

type TLSHandshakeStage

type TLSHandshakeStage struct {
	// Input contains the MANDATORY channel from which to read [*TCPConnection]. We
	// assume that this channel will be closed when done.
	Input <-chan *TCPConnection

	// InsecureSkipVerify OPTIONALLY skips TLS verification.
	InsecureSkipVerify bool

	// NextProtos OPTIONALLY configures the ALPN.
	NextProtos []string

	// Output is the MANDATORY channel emitting [*TLSConnection]. We will close this
	// channel when the Input channel has been closed.
	Output chan<- *TLSConnection

	// RootCAs OPTIONALLY configures alternative root CAs.
	RootCAs *x509.CertPool

	// ServerName is the MANDATORY server name.
	ServerName string
}

TLSHandshakeStage is a Stage that creates *TLSConnection.

func (*TLSHandshakeStage) Run

func (sx *TLSHandshakeStage) Run(ctx context.Context, rtx Runtime)

Run is like [*TCPConnectStage.Run] except that it reads *TCPConnection from Input and emits *TLSConnection on Output. Each TLS handshake runs in its own background goroutine. The parallelism is controlled by the Runtime ActiveConnections Semaphore, and you MUST arrange for the *TLSConnection to eventually enter a stage that closes it (such as [*DropStage] or [*HTTPRoundTripStage]), so that the code can release that Semaphore and close the conn. Note that this code TAKES OWNERSHIP of the *TCPConnection it reads. We will close these conns automatically on failure. On success, they will be closed when the *TLSConnection wrapping them eventually enters a closing stage.

type TakeNStage

type TakeNStage[T any] struct {
	// Input contains the MANDATORY channel from which to read T. We
	// assume that this channel will be closed when done.
	Input <-chan T

	// N is the maximum number of entries to allow to pass. Any value
	// lower than zero is equivalent to setting this field to zero.
	N int64

	// Output is the MANDATORY channel emitting [T]. We will close this
	// channel when the Input channel has been closed.
	Output chan<- T
}

TakeNStage is a Stage that allows N elements with type T to pass and drops subsequent elements.

func (*TakeNStage[T]) Run

func (sx *TakeNStage[T]) Run(ctx context.Context, rtx Runtime)

Run runs the stage until completion.
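The take-N behaviour can be sketched as below. This is a self-contained illustration with a hypothetical `takeN` function. It assumes that the stage keeps reading Input after N elements have passed so that upstream writers never block; whether the real stage also closes the elements it drops is not covered here.

```go
package main

import "fmt"

// takeN is a hypothetical stand-in for TakeNStage.Run: it forwards at most
// n elements, silently discards the rest, and closes output once input has
// been closed. A negative n behaves like zero.
func takeN[T any](input <-chan T, n int64, output chan<- T) {
	defer close(output)
	var count int64
	for v := range input {
		if count < n {
			count++
			output <- v
		}
		// Keep draining input so that upstream writers never block.
	}
}

func main() {
	input := make(chan int, 5)
	for i := 0; i < 5; i++ {
		input <- i
	}
	close(input)

	output := make(chan int)
	go takeN[int](input, 2, output)
	for v := range output {
		fmt.Println(v) // prints 0, then 1
	}
}
```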

type TeeAddrsStage

type TeeAddrsStage struct {
	// Input is the MANDATORY channel from which we read addresses. We assume
	// this channel is closed when done.
	Input <-chan string

	// Outputs is the MANDATORY list of channels where to duplicate the addresses read
	// from the Input channel. We close all Outputs when done.
	Outputs []chan<- string
}

TeeAddrsStage is a Stage that duplicates the addresses read in Input into each of the channels belonging to [Outputs].

func (*TeeAddrsStage) Run

func (sx *TeeAddrsStage) Run(ctx context.Context, rtx Runtime)

Run duplicates addresses read in Input into all the given Outputs.
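A minimal sketch of the tee pattern follows. It does not import dslvm, and `tee` is a hypothetical name. The sketch writes to the outputs sequentially, so a slow reader on one output delays all the others; the buffered outputs in main avoid that for the example.

```go
package main

import "fmt"

// tee is a hypothetical stand-in for TeeAddrsStage.Run: it copies every
// address read from input to each output and closes all the outputs once
// input has been closed.
func tee(input <-chan string, outputs ...chan<- string) {
	defer func() {
		for _, out := range outputs {
			close(out)
		}
	}()
	for addr := range input {
		for _, out := range outputs {
			out <- addr
		}
	}
}

func main() {
	input := make(chan string, 2)
	input <- "1.1.1.1"
	input <- "8.8.8.8"
	close(input)

	// Buffered outputs let tee run to completion before we read.
	a, b := make(chan string, 2), make(chan string, 2)
	tee(input, a, b)
	for addr := range a {
		fmt.Println("a:", addr)
	}
	for addr := range b {
		fmt.Println("b:", addr)
	}
	// Prints:
	// a: 1.1.1.1
	// a: 8.8.8.8
	// b: 1.1.1.1
	// b: 8.8.8.8
}
```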

type Trace

type Trace interface {
	// CloneBytesReceivedMap returns a clone of the internal bytes received map. The key of the
	// map is a string following the "EPNT_ADDRESS PROTO" pattern where the "EPNT_ADDRESS" contains
	// the endpoint address and "PROTO" is "tcp" or "udp".
	CloneBytesReceivedMap() (out map[string]int64)

	// DNSLookupsFromRoundTrip returns all the DNS lookup results collected so far.
	DNSLookupsFromRoundTrip() (out []*model.ArchivalDNSLookupResult)

	// Index returns the unique index used by this trace.
	Index() int64

	// NewDialerWithoutResolver is equivalent to netxlite.Netx.NewDialerWithoutResolver
	// except that it returns a model.Dialer that uses this trace.
	//
	// Caveat: the dialer wrappers are there to implement the
	// model.MeasuringNetwork interface, but they're not used by this function.
	NewDialerWithoutResolver(dl model.DebugLogger, wrappers ...model.DialerWrapper) model.Dialer

	// NewParallelUDPResolver returns a possibly-trace-aware parallel UDP resolver.
	NewParallelUDPResolver(logger model.DebugLogger, dialer model.Dialer, address string) model.Resolver

	// NewQUICDialerWithoutResolver is equivalent to
	// netxlite.Netx.NewQUICDialerWithoutResolver except that it returns a
	// model.QUICDialer that uses this trace.
	//
	// Caveat: the dialer wrappers are there to implement the
	// model.MeasuringNetwork interface, but they're not used by this function.
	NewQUICDialerWithoutResolver(listener model.UDPListener,
		dl model.DebugLogger, wrappers ...model.QUICDialerWrapper) model.QUICDialer

	// NewTLSHandshakerStdlib is equivalent to netxlite.Netx.NewTLSHandshakerStdlib
	// except that it returns a model.TLSHandshaker that uses this trace.
	NewTLSHandshakerStdlib(dl model.DebugLogger) model.TLSHandshaker

	// NetworkEvents returns all the network events collected so far.
	NetworkEvents() (out []*model.ArchivalNetworkEvent)

	// NewStdlibResolver returns a possibly-trace-aware system resolver.
	NewStdlibResolver(logger model.DebugLogger) model.Resolver

	// NewUDPListener implements model.MeasuringNetwork.
	NewUDPListener() model.UDPListener

	// QUICHandshakes returns all the QUIC handshake results collected so far.
	QUICHandshakes() (out []*model.ArchivalTLSOrQUICHandshakeResult)

	// TCPConnects returns all the TCP connect results collected so far.
	TCPConnects() (out []*model.ArchivalTCPConnectResult)

	// TLSHandshakes returns all the TLS handshake results collected so far.
	TLSHandshakes() (out []*model.ArchivalTLSOrQUICHandshakeResult)

	// Tags returns the trace tags.
	Tags() []string

	// TimeSince is equivalent to Trace.TimeNow().Sub(t0).
	TimeSince(t0 time.Time) time.Duration

	// ZeroTime returns the "zero" time of this trace.
	ZeroTime() time.Time
}

Trace collects *Observations using tracing. Specific implementations of this interface may be engineered to collect no *Observations for efficiency (i.e., when you don't care about collecting *Observations but you still want to use this package).
