dsort

package
v1.3.25 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 6, 2024 License: MIT Imports: 46 Imported by: 4

Documentation

Overview

Package dsort provides distributed massively parallel resharding for very large datasets.

  • Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.

msgp -file <path to dsort/manager_types.go> -tests=false -marshal=false -unexported
Code generated by the command above; see docs/msgp.md. DO NOT EDIT.

Package dsort provides APIs for distributed archive file shuffling.

  • Copyright (c) 2018-2021, NVIDIA CORPORATION. All rights reserved.

Constants

const (
	Alphanumeric = "alphanumeric" // string comparison (decreasing or increasing)
	None         = "none"         // none (used for resharding)
	MD5          = "md5"          // compare md5(name)
	Shuffle      = "shuffle"      // random shuffle (use with the same seed to reproduce)
	Content      = "content"      // extract (int, string, float) from a given file, and compare
)
const (
	ExtractionPhase = "extraction"
	SortingPhase    = "sorting"
	CreationPhase   = "creation"
)
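
The metrics types further down describe extraction as the first phase, sorting as the second, and creation as the third and last. A minimal sketch of that ordering follows; `phaseOrder` and `nextPhase` are hypothetical names introduced for illustration and are not part of the package.

```go
package main

import "fmt"

// Phase names copied from the constants above.
const (
	ExtractionPhase = "extraction"
	SortingPhase    = "sorting"
	CreationPhase   = "creation"
)

// phaseOrder is an assumption of this sketch: it encodes the
// first/second/third ordering stated in the metrics type docs.
var phaseOrder = []string{ExtractionPhase, SortingPhase, CreationPhase}

// nextPhase is a hypothetical helper. It returns the phase that follows cur,
// or false when cur is the last phase or is not a known phase.
func nextPhase(cur string) (string, bool) {
	for i, p := range phaseOrder {
		if p == cur && i+1 < len(phaseOrder) {
			return phaseOrder[i+1], true
		}
	}
	return "", false
}

func main() {
	// Walk the phases in order, starting from extraction.
	for p, ok := ExtractionPhase, true; ok; p, ok = nextPhase(p) {
		fmt.Println(p)
	}
}
```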
const DefaultExt = archive.ExtTar // default shard extension/format/MIME when spec's input_extension is empty
const (
	GeneralType = "dsort_general"
)
const (
	MemType = "dsort_mem"
)
const PrefixJobID = "srt-"

Variables

This section is empty.

Functions

func PabortHandler added in v1.3.19

func PabortHandler(w http.ResponseWriter, r *http.Request)

DELETE /v1/sort/abort

func PgetHandler added in v1.3.19

func PgetHandler(w http.ResponseWriter, r *http.Request)

GET /v1/sort

func Pinit added in v1.3.19

func Pinit(si core.Node, config *cmn.Config)

func PremoveHandler added in v1.3.19

func PremoveHandler(w http.ResponseWriter, r *http.Request)

DELETE /v1/sort

func PstartHandler added in v1.3.19

func PstartHandler(w http.ResponseWriter, r *http.Request, parsc *ParsedReq)

POST /v1/sort

func TargetHandler

func TargetHandler(w http.ResponseWriter, r *http.Request)

[METHOD] /v1/sort

func Tinit added in v1.3.19

func Tinit(tstats stats.Tracker, db kvdb.Driver, config *cmn.Config)
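
For quick reference, the proxy-side handlers above and their documented endpoints can be collected in one table. This is only a sketch that restates the listing; the `route` type and the `lookup` helper are hypothetical and do not reflect how the package actually dispatches requests.

```go
package main

import (
	"fmt"
	"net/http"
)

// route pairs an HTTP method and path with the handler documented for it.
type route struct {
	method  string
	path    string
	handler string
}

// proxyRoutes restates the method/path comments from the function list above.
var proxyRoutes = []route{
	{http.MethodPost, "/v1/sort", "PstartHandler"},
	{http.MethodGet, "/v1/sort", "PgetHandler"},
	{http.MethodDelete, "/v1/sort/abort", "PabortHandler"},
	{http.MethodDelete, "/v1/sort", "PremoveHandler"},
}

// lookup is a hypothetical helper: it returns the documented handler name
// for the given method and path, or "" when there is no match.
func lookup(method, path string) string {
	for _, r := range proxyRoutes {
		if r.method == method && r.path == path {
			return r.handler
		}
	}
	return ""
}

func main() {
	for _, r := range proxyRoutes {
		fmt.Printf("%-6s %-15s %s\n", r.method, r.path, r.handler)
	}
}
```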

Types

type Algorithm added in v1.3.19

type Algorithm struct {
	// one of the `algorithms` above
	Kind string `json:"kind"`

	// used with two sorting alg-s: Alphanumeric and Content
	Decreasing bool `json:"decreasing"`

	// when sort is a random shuffle
	Seed string `json:"seed"`

	// usage: exclusively for Content sorting
	// e.g.: ".cls" containing sorting key for each record (sample) - see next
	// NOTE: not to be confused with shards "input_extension"
	Ext string `json:"extension"`

	// ditto: Content only
	// `shard.contentKeyTypes` enum values: {"int", "string", "float" }
	ContentKeyType string `json:"content_key_type"`
}
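
The field comments above suggest how an `algorithm` section of a request might be serialized. The sketch below uses a local mirror of the struct (same JSON tags) so that it compiles without the module; the seed value `"1234"` and the helper name `encode` are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Kind values copied from the constants listed earlier.
const (
	Shuffle = "shuffle"
	Content = "content"
)

// algorithm is a local mirror of dsort.Algorithm with identical JSON tags.
type algorithm struct {
	Kind           string `json:"kind"`
	Decreasing     bool   `json:"decreasing"`
	Seed           string `json:"seed"`
	Ext            string `json:"extension"`
	ContentKeyType string `json:"content_key_type"`
}

// encode is a hypothetical helper that marshals the spec to a JSON string.
func encode(a algorithm) string {
	b, err := json.Marshal(a)
	if err != nil {
		panic(err) // cannot happen for this plain struct
	}
	return string(b)
}

func main() {
	// Content sort: the key comes from each record's ".cls" file, parsed as int.
	byClass := algorithm{Kind: Content, Ext: ".cls", ContentKeyType: "int", Decreasing: true}
	fmt.Println(encode(byClass))

	// Random shuffle: reuse the same seed to reproduce the order.
	shuffled := algorithm{Kind: Shuffle, Seed: "1234"}
	fmt.Println(encode(shuffled))
}
```

Because none of the tags use `omitempty`, unset fields still appear in the output as empty strings or `false`.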

type CreationPhaseMetadata

type CreationPhaseMetadata struct {
	Shards    []*shard.Shard          `msg:"shards"`
	SendOrder map[string]*shard.Shard `msg:"send_order"`
}

func (*CreationPhaseMetadata) DecodeMsg

func (z *CreationPhaseMetadata) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*CreationPhaseMetadata) EncodeMsg

func (z *CreationPhaseMetadata) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*CreationPhaseMetadata) Msgsize

func (z *CreationPhaseMetadata) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

type JobInfo

type JobInfo struct {
	ID                string        `json:"id"` // job ID == xact ID (aka managerUUID)
	SrcBck            cmn.Bck       `json:"src-bck"`
	DstBck            cmn.Bck       `json:"dst-bck"`
	StartedTime       time.Time     `json:"started_time,omitempty"`
	FinishTime        time.Time     `json:"finish_time,omitempty"`
	ExtractedDuration time.Duration `json:"started_meta_sorting,omitempty"`
	SortingDuration   time.Duration `json:"started_shard_creation,omitempty"`
	CreationDuration  time.Duration `json:"finished_shard_creation,omitempty"`
	Objs              int64         `json:"loc-objs,string"`  // locally processed
	Bytes             int64         `json:"loc-bytes,string"` //
	Metrics           *Metrics
	Aborted           bool `json:"aborted"`
	Archived          bool `json:"archived"`
}

JobInfo contains the stats that represent a Dsort run when jobs are listed.
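
Note the `,string` option on the `Objs` and `Bytes` tags: with Go's `encoding/json`, these int64 counters travel as quoted JSON strings. A minimal sketch using a local mirror of just those two fields (the type `jobCounters`, the helper `decodeCounters`, and the sample values are hypothetical):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jobCounters mirrors two JobInfo fields together with their JSON tags.
type jobCounters struct {
	Objs  int64 `json:"loc-objs,string"`
	Bytes int64 `json:"loc-bytes,string"`
}

// decodeCounters is a hypothetical helper that parses a JSON payload.
func decodeCounters(payload string) (jobCounters, error) {
	var c jobCounters
	err := json.Unmarshal([]byte(payload), &c)
	return c, err
}

func main() {
	// Quoted numbers decode into int64 fields.
	c, err := decodeCounters(`{"loc-objs":"128","loc-bytes":"1048576"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(c.Objs, c.Bytes)

	// An unquoted number is rejected for a ",string" field.
	_, err = decodeCounters(`{"loc-objs":128}`)
	fmt.Println(err != nil)
}
```

Clients written in other languages need to account for this quoting when reading the counters.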

func (*JobInfo) Aggregate

func (j *JobInfo) Aggregate(other *JobInfo)

func (*JobInfo) IsFinished

func (j *JobInfo) IsFinished() bool

func (*JobInfo) IsRunning

func (j *JobInfo) IsRunning() bool

type LocalExtraction

type LocalExtraction struct {

	// TotalCnt is the number of shards Dsort has to process in total.
	TotalCnt int64 `json:"total_count,string"`
	// ExtractedCnt is the cumulative number of extracted shards. In the
	// end, this should be roughly equal to TotalCnt/#Targets.
	ExtractedCnt int64 `json:"extracted_count,string"`
	// ExtractedSize is uncompressed size of extracted shards.
	ExtractedSize int64 `json:"extracted_size,string"`
	// ExtractedRecordCnt - number of records extracted from all shards.
	ExtractedRecordCnt int64 `json:"extracted_record_count,string"`
	// ExtractedToDiskCnt describes the number of shards extracted to disk. To
	// compute the number of shards extracted to memory, subtract it from
	// ExtractedCnt.
	ExtractedToDiskCnt int64 `json:"extracted_to_disk_count,string"`
	// ExtractedToDiskSize - uncompressed size of shards extracted to disk.
	ExtractedToDiskSize int64 `json:"extracted_to_disk_size,string"`
	// contains filtered or unexported fields
}

LocalExtraction contains metrics for the first phase of Dsort.
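
The field comments above imply two simple derived values: the number of shards extracted to memory, and the rough per-target share of the total. A sketch under those stated relationships, using a trimmed local mirror of the struct (the helper names and sample numbers are hypothetical):

```go
package main

import "fmt"

// localExtraction mirrors the counters needed for this sketch.
type localExtraction struct {
	TotalCnt           int64
	ExtractedCnt       int64
	ExtractedToDiskCnt int64
}

// extractedToMemory follows the field comment: subtract the to-disk count
// from the cumulative extracted count.
func extractedToMemory(m localExtraction) int64 {
	return m.ExtractedCnt - m.ExtractedToDiskCnt
}

// expectedPerTarget is the rough TotalCnt/#Targets figure that ExtractedCnt
// should approach on each target. It returns 0 for a non-positive target count.
func expectedPerTarget(m localExtraction, targets int64) int64 {
	if targets <= 0 {
		return 0
	}
	return m.TotalCnt / targets
}

func main() {
	m := localExtraction{TotalCnt: 1000, ExtractedCnt: 250, ExtractedToDiskCnt: 40}
	fmt.Println("to memory:", extractedToMemory(m))
	fmt.Println("expected per target:", expectedPerTarget(m, 4))
}
```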

type Manager

type Manager struct {
	// tagged fields are the only fields persisted once dsort finishes
	ManagerUUID string         `json:"manager_uuid"`
	Metrics     *Metrics       `json:"metrics"`
	Pars        *parsedReqSpec `json:"pars"`
	// contains filtered or unexported fields
}

Manager maintains all the state required for a single run of a distributed archive file shuffle.

func (*Manager) String

func (m *Manager) String() string

type MetaSorting

type MetaSorting struct {

	// SentStats - time statistics about records sent to another target
	SentStats *TimeStats `json:"sent_stats,omitempty"`
	// RecvStats - time statistics about records received from another target
	RecvStats *TimeStats `json:"recv_stats,omitempty"`
	// contains filtered or unexported fields
}

MetaSorting contains metrics for the second phase of Dsort.

type Metrics

type Metrics struct {
	Extraction *LocalExtraction `json:"local_extraction,omitempty"`
	Sorting    *MetaSorting     `json:"meta_sorting,omitempty"`
	Creation   *ShardCreation   `json:"shard_creation,omitempty"`

	// job description
	Description string `json:"description,omitempty"`

	// warnings during the run
	Warnings []string `json:"warnings,omitempty"`
	// errors, if any
	Errors []string `json:"errors,omitempty"`

	// has been aborted
	Aborted atomic.Bool `json:"aborted,omitempty"`
	// has been archived to persistent storage
	Archived atomic.Bool `json:"archived,omitempty"`
}

Metrics is the general struct that contains all stats about a Dsort run.

func (*Metrics) ElapsedTime

func (m *Metrics) ElapsedTime() time.Duration

func (*Metrics) ToJobInfo

func (m *Metrics) ToJobInfo(id string, pars *parsedReqSpec) JobInfo

type ParsedReq added in v1.3.19

type ParsedReq struct {
	InputBck  cmn.Bck
	OutputBck cmn.Bck
	// contains filtered or unexported fields
}

type RemoteResponse

type RemoteResponse struct {
	Record    *shard.Record    `msg:"r"`
	RecordObj *shard.RecordObj `msg:"o"`
}

func (*RemoteResponse) DecodeMsg

func (z *RemoteResponse) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*RemoteResponse) EncodeMsg

func (z *RemoteResponse) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*RemoteResponse) Msgsize

func (z *RemoteResponse) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

type RequestSpec

type RequestSpec struct {
	// Required
	InputBck        cmn.Bck       `json:"input_bck" yaml:"input_bck"`
	InputFormat     apc.ListRange `json:"input_format" yaml:"input_format"`
	OutputFormat    string        `json:"output_format" yaml:"output_format"`
	OutputShardSize string        `json:"output_shard_size" yaml:"output_shard_size"`

	// Desirable
	InputExtension string `json:"input_extension" yaml:"input_extension"`

	// Optional
	// Default: InputExtension
	OutputExtension string `json:"output_extension" yaml:"output_extension"`
	// Default: ""
	Description string `json:"description" yaml:"description"`
	// Default: same as `input_bck` field
	OutputBck cmn.Bck `json:"output_bck" yaml:"output_bck"`
	// Default: alphanumeric, increasing
	Algorithm Algorithm `json:"algorithm" yaml:"algorithm"`
	// Default: ""
	EKMFileURL string `json:"ekm_file" yaml:"ekm_file"`
	// Default: "\t"
	EKMFileSep string `json:"ekm_file_sep" yaml:"ekm_file_sep"`
	// Default: "80%"
	MaxMemUsage string `json:"max_mem_usage" yaml:"max_mem_usage"`
	// Default: calcMaxLimit()
	ExtractConcMaxLimit int `json:"extract_concurrency_max_limit" yaml:"extract_concurrency_max_limit"`
	// Default: calcMaxLimit()
	CreateConcMaxLimit int `json:"create_concurrency_max_limit" yaml:"create_concurrency_max_limit"`

	// debug
	DsorterType string `json:"dsorter_type"`
	DryRun      bool   `json:"dry_run"` // Default: false

	Config cmn.DsortConf
}

RequestSpec defines the user specification for requests to the endpoint /v1/sort.
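
The `Default:` comments in the struct above can be read as a small defaulting routine. The sketch below applies only the defaults that are stated literally; it is not the package's `ParseCtx` logic. The type `requestSpec`, the helper `applyDefaults`, and the value `".tar"` for the default extension (the listing only says `archive.ExtTar`) are assumptions.

```go
package main

import "fmt"

// defaultExt stands in for dsort.DefaultExt (archive.ExtTar).
// The literal ".tar" is an assumption of this sketch.
const defaultExt = ".tar"

// requestSpec mirrors a subset of RequestSpec, flattened for illustration.
type requestSpec struct {
	InputExtension  string
	OutputExtension string
	AlgorithmKind   string
	EKMFileSep      string
	MaxMemUsage     string
}

// applyDefaults is a hypothetical helper that fills empty fields with the
// defaults documented in the struct comments.
func applyDefaults(rs requestSpec) requestSpec {
	if rs.InputExtension == "" {
		rs.InputExtension = defaultExt
	}
	if rs.OutputExtension == "" {
		rs.OutputExtension = rs.InputExtension // Default: InputExtension
	}
	if rs.AlgorithmKind == "" {
		rs.AlgorithmKind = "alphanumeric" // Default: alphanumeric, increasing
	}
	if rs.EKMFileSep == "" {
		rs.EKMFileSep = "\t"
	}
	if rs.MaxMemUsage == "" {
		rs.MaxMemUsage = "80%"
	}
	return rs
}

func main() {
	rs := applyDefaults(requestSpec{InputExtension: ".tgz"})
	fmt.Printf("%+v\n", rs)
}
```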

func (*RequestSpec) ParseCtx added in v1.3.19

func (rs *RequestSpec) ParseCtx() (*ParsedReq, error)

type ShardCreation

type ShardCreation struct {

	// ToCreate - number of shards to be created in this phase.
	ToCreate int64 `json:"to_create,string"`
	// CreatedCnt - number of shards created so far.
	// Should match ToCreate when the phase finishes.
	CreatedCnt int64 `json:"created_count,string"`
	// MovedShardCnt specifies the number of shards that have migrated from this
	// to another target. Applies only when dealing with compressed
	// data. Sometimes, rather than creating at the destination, it is faster
	// to create a shard on a specific target and send it over (to the destination).
	MovedShardCnt int64 `json:"moved_shard_count,string"`
	// RequestStats - time statistics: requests to other targets.
	RequestStats *TimeStats `json:"req_stats,omitempty"`
	// ResponseStats - time statistics: responses to other targets.
	ResponseStats *TimeStats `json:"resp_stats,omitempty"`
	// contains filtered or unexported fields
}

ShardCreation contains metrics for the third and last phase of Dsort.
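
Since `CreatedCnt` should match `ToCreate` when the phase finishes, the two counters give a natural progress figure. A minimal sketch with a trimmed local mirror of the struct; the helper `creationProgress` is hypothetical and not part of the package.

```go
package main

import "fmt"

// shardCreation mirrors the two counters used in this sketch.
type shardCreation struct {
	ToCreate   int64
	CreatedCnt int64
}

// creationProgress returns the completed share of the creation phase in
// percent. It returns 0 when ToCreate is not yet known (zero or negative).
func creationProgress(m shardCreation) float64 {
	if m.ToCreate <= 0 {
		return 0
	}
	return float64(m.CreatedCnt) / float64(m.ToCreate) * 100
}

func main() {
	m := shardCreation{ToCreate: 8, CreatedCnt: 2}
	fmt.Printf("%.1f%% of shards created\n", creationProgress(m))
}
```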

type TimeStats

type TimeStats struct {
	// Total contains the total number of milliseconds spent on a
	// specific task.
	Total int64 `json:"total_ms,string"`
	// Count contains the number of times the specific task was triggered.
	Count int64 `json:"count,string"`
	MinMs int64 `json:"min_ms,string"`
	MaxMs int64 `json:"max_ms,string"`
	AvgMs int64 `json:"avg_ms,string"`
}

TimeStats contains statistics about time spent on a specific task. It calculates min, max and avg times.
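
One way such running statistics could be maintained is sketched below. The listing does not show how the package updates these fields, so the `observe` method and its integer-average rounding are assumptions; only the field names come from the struct above.

```go
package main

import "fmt"

// timeStats mirrors the exported fields of TimeStats.
type timeStats struct {
	Total int64
	Count int64
	MinMs int64
	MaxMs int64
	AvgMs int64
}

// observe is a hypothetical update step: it records one task duration
// (in milliseconds) and refreshes min, max, and the integer average.
func (ts *timeStats) observe(ms int64) {
	if ts.Count == 0 || ms < ts.MinMs {
		ts.MinMs = ms
	}
	if ms > ts.MaxMs {
		ts.MaxMs = ms
	}
	ts.Total += ms
	ts.Count++
	ts.AvgMs = ts.Total / ts.Count // integer division truncates
}

func main() {
	var ts timeStats
	for _, ms := range []int64{10, 30, 20} {
		ts.observe(ms)
	}
	fmt.Printf("%+v\n", ts)
}
```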

Directories

Path Synopsis
Package ct provides additional dsort-specific content types
Package shard provides Extract(shard), Create(shard), and associated methods across all supported archival formats (see cmn/archive/mime.go)
