fetch

package
v0.0.0-...-b000f17 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2024 License: NIST-PD-fallback Imports: 28 Imported by: 0

README

ndn-dpdk/app/fetch

This package is the congestion aware fetcher, used in the traffic generator. It implements a consumer that follows the TCP CUBIC congestion control algorithm, simulating traffic patterns similar to bulk file transfer. It requires at least one thread, running the FetchThread_Run function.

Fetch Task Definition

TaskDef defines a fetch task that retrieves one segmented object. A segmented object is a group of NDN packets, which have a common name prefix and have SegmentNameComponent as the last component. The TaskDef contains these fields:

  • Prefix: a name prefix except the last SegmentNameComponent.
    • Importantly, if you are retrieving from the file server, this field must end with the VersionNameComponent.
  • InterestLifetime
  • HopLimit
  • SegmentRange: retrieve a consecutive subset of the available segments.
    • If the fetcher encounters a Data packet whose FinalBlockId equals its last name component, the fetching will terminate at this segment, even if the upper bound of SegmentRange has not been reached.

Normally, a fetch task generates traffic similar to bulk file transfer, in which contents of the received packets are discarded. It is however possible to write the received payload into a file. In this case, the TaskDef additionally contains these fields:

  • Filename: output file name.
  • FileSize: total file size.
  • SegmentLen: the payload length in every segment; the last segment may be shorter.

Fetcher and its Workers

A worker is a thread running the FetchThread_Run function. It can simultaneously process zero or more fetch tasks, which are arranged in an RCU-protected linked list. It has an io_uring handle in order to write payload to files when requested.

A TaskContext stores information of an ongoing fetch task, which can be initialized from a TaskDef. It includes a taskSlot (aka C.FetchTask) used by C code, along with several Go objects. It is responsible for opening and closing the file, if the TaskDef requests to write payload to a file. Each taskSlot has an index number that used as the PIT token for its Interests, which allows the reply Data packets to come back to the same taskSlot.

FetchLogic contained with the taskSlot implements the algorithmic part of the fetch procedure. It includes an RTT estimator, a CUBIC-like congestion control implementation, and a retransmission queue. It makes decisions on when to transmit an Interest for a certain segment number, and gets notified about when the Data arrives with or without a congestion mark. Nack packets are not considered in the congestion aware fetcher.

Fetcher is the top level. It controls one or more workers and owns one or more task slots. A incoming TaskDef is placed into an unused task slot, and then added to the worker with the least number of ongoing fetch tasks.

Documentation

Overview

Package fetch retrieves segmented objects.

Index

Constants

This section is empty.

Variables

View Source
var (
	GqlConfigInput     *graphql.InputObject
	GqlTaskDefInput    *graphql.InputObject
	GqlTaskDefType     *graphql.Object
	GqlTaskContextType *gqlserver.NodeType[*TaskContext]
	GqlFetcherType     *gqlserver.NodeType[*Fetcher]
)

GraphQL types.

View Source
var GqlRetrieveByFaceID func(id iface.ID) *Fetcher

GqlRetrieveByFaceID returns *Fetcher associated with a face. It is assigned during package tg initialization.

Functions

This section is empty.

Types

type Config

type Config struct {
	TaskSlotConfig

	// NThreads is the number of worker threads.
	// Each worker thread can serve multiple fetch tasks.
	NThreads int `json:"nThreads,omitempty"`

	// NTasks is the number of task slots.
	// Each task retrieves one segmented object and has independent congestion control.
	NTasks int `json:"nTasks,omitempty"`
}

Config contains Fetcher configuration.

func (*Config) Validate

func (cfg *Config) Validate() error

Validate applies defaults and validates the configuration.

type Counters

type Counters struct {
	Elapsed   time.Duration  `json:"elapsed" gqldesc:"Duration since start fetching."`
	Finished  *time.Duration `json:"finished" gqldesc:"Duration between start and finish; null if not finished."`
	LastRtt   time.Duration  `json:"lastRtt" gqldesc:"Last RTT sample."`
	SRtt      time.Duration  `json:"sRtt" gqldesc:"Smoothed RTT."`
	Rto       time.Duration  `json:"rto" gqldesc:"RTO."`
	Cwnd      int            `json:"cwnd" gqldesc:"Congestion window."`
	NInFlight uint32         `json:"nInFlight" gqldesc:"Currently in-flight Interests."`
	NTxRetx   uint64         `json:"nTxRetx" gqldesc:"Retransmitted Interests."`
	NRxData   uint64         `json:"nRxData" gqldesc:"Data satisfying pending Interests."`
}

Counters contains counters of Logic.

func (Counters) String

func (cnt Counters) String() string

type Fetcher

type Fetcher struct {
	// contains filtered or unexported fields
}

Fetcher controls worker threads and task slots on a face.

func New

func New(face iface.Face, cfg Config) (*Fetcher, error)

New creates a Fetcher.

func (*Fetcher) Close

func (fetcher *Fetcher) Close() error

Close deallocates data structures.

func (*Fetcher) ConnectRxQueues

func (fetcher *Fetcher) ConnectRxQueues(demuxD, demuxN *iface.InputDemux)

ConnectRxQueues connects Data InputDemux to RxQueues. Nack InputDemux is set to drop packets because fetcher does not support Nacks.

func (*Fetcher) Face

func (fetcher *Fetcher) Face() iface.Face

Face returns the face.

func (*Fetcher) Fetch

func (fetcher *Fetcher) Fetch(d TaskDef) (task *TaskContext, e error)

Fetch starts a fetch task.

func (*Fetcher) Launch

func (fetcher *Fetcher) Launch()

Launch launches all worker threads.

func (*Fetcher) Reset

func (fetcher *Fetcher) Reset()

Reset aborts all tasks and stops all worker threads.

func (*Fetcher) Stop

func (fetcher *Fetcher) Stop() error

Stop stops all worker threads.

func (*Fetcher) Tasks

func (fetcher *Fetcher) Tasks() (list []*TaskContext)

Tasks returns running fetch tasks.

func (*Fetcher) Workers

func (fetcher *Fetcher) Workers() []ealthread.ThreadWithRole

Workers returns worker threads.

type Logic

type Logic C.FetchLogic

Logic implements fetcher congestion control and scheduling logic.

func (*Logic) Close

func (fl *Logic) Close() error

Close deallocates data structures.

func (*Logic) Counters

func (fl *Logic) Counters() (cnt Counters)

Counters retrieves counters.

func (*Logic) Finished

func (fl *Logic) Finished() bool

Finished determines if all segments have been fetched.

func (*Logic) Init

func (fl *Logic) Init(winCapacity int, socket eal.NumaSocket)

Init initializes the logic and allocates data structures.

func (*Logic) Reset

func (fl *Logic) Reset(r segmented.SegmentRange)

Reset resets this to initial state.

type TaskContext

type TaskContext struct {
	// contains filtered or unexported fields
}

TaskContext provides contextual information about an active fetch task.

func (*TaskContext) Counters

func (task *TaskContext) Counters() Counters

Counters returns congestion control and scheduling counters.

func (*TaskContext) Finished

func (task *TaskContext) Finished() bool

Finished determines if all segments have been fetched.

func (*TaskContext) Stop

func (task *TaskContext) Stop()

Stop aborts/stops the fetch task. This should be called even if the fetch task has succeeded.

type TaskDef

type TaskDef struct {
	// InterestTemplateConfig contains the name prefix, InterestLifetime, etc.
	//
	// The fetcher neither retrieves metadata nor performs version discovery.
	// If the content is published with version component, it should appear in the name prefix.
	//
	// CanBePrefix and MustBeFresh are not normally used, but they may be included for benchmarking purpose.
	ndni.InterestTemplateConfig

	// SegmentRange specifies range of segment numbers.
	// If writing to a file, SegmentEnd must be explicitly specified.
	segmented.SegmentRange

	// Filename is the output file name.
	// If omitted, payload is not written to a file.
	Filename string `json:"filename,omitempty"`

	// FileSize is total payload length.
	// This is only relevant when writing to a file.
	// If set, the file will be truncated to this size after fetching is completed.
	FileSize *int64 `json:"fileSize"`

	// SegmentLen is the payload length in each segment.
	// This is only needed when writing to a file.
	// If any segment has incorrect Content TLV-LENGTH, the output file would not contain correct payload.
	SegmentLen int `json:"segmentLen,omitempty"`
}

TaskDef defines a fetch task that retrieves one segmented object.

type TaskSlotConfig

type TaskSlotConfig struct {
	// RxQueue configures the RX queue of Data packets going to each task slot.
	// CoDel cannot be used in these queues.
	RxQueue iface.PktQueueConfig `json:"rxQueue,omitempty"`

	// WindowCapacity is the maximum distance between lower and upper bounds of segment numbers in an ongoing fetch logic.
	WindowCapacity int `json:"windowCapacity,omitempty"`
}

TaskSlotConfig contains task slot configuration.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL