Documentation ¶
Overview ¶
Package dsort provides distributed massively parallel resharding for very large datasets; it exposes APIs for distributed archive file shuffling.
- Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
Index ¶
- Constants
- func InitManagers(db kvdb.Driver)
- func ProxyAbortSortHandler(w http.ResponseWriter, r *http.Request)
- func ProxyGetHandler(w http.ResponseWriter, r *http.Request)
- func ProxyRemoveSortHandler(w http.ResponseWriter, r *http.Request)
- func ProxyStartSortHandler(w http.ResponseWriter, r *http.Request, parsedRS *ParsedRequestSpec)
- func RegisterNode(smapOwner meta.Sowner, bmdOwner meta.Bowner, snode *meta.Snode, ...)
- func TargetHandler(w http.ResponseWriter, r *http.Request)
- type CreationPhaseMetadata
- type DetailedStats
- type JobInfo
- type LocalExtraction
- type Manager
- type ManagerGroup
- func (mg *ManagerGroup) AbortAll(err error)
- func (mg *ManagerGroup) Add(managerUUID string) (*Manager, error)
- func (mg *ManagerGroup) Get(managerUUID string, ap ...bool) (*Manager, bool)
- func (mg *ManagerGroup) List(descRegex *regexp.Regexp, onlyActive bool) []JobInfo
- func (mg *ManagerGroup) Remove(managerUUID string) error
- type MetaSorting
- type Metrics
- type ParsedRequestSpec
- type PhaseInfo
- type RemoteResponse
- type RequestSpec
- type ShardCreation
- type SortAlgorithm
- type ThroughputStats
- type TimeStats
Constants ¶
const (
    ExtractionPhase = "extraction"
    SortingPhase    = "sorting"
    CreationPhase   = "creation"
)
const (
    SortKindAlphanumeric = "alphanumeric" // sort the records (decreasing or increasing)
    SortKindNone         = "none"         // none, used for resharding
    SortKindMD5          = "md5"
    SortKindShuffle      = "shuffle"      // shuffle randomly, can be used with seed to get reproducible results
    SortKindContent      = "content"      // sort by content of given file
)
const DSortName = "dsort"
const (
DSorterGeneralType = "dsort_general"
)
const (
DSorterMemType = "dsort_mem"
)
const PrefixJobID = "srt-"
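For illustration, a minimal sketch of validating a user-supplied algorithm kind against the exported sort-kind constants. The import path and the helper validateSortKind are assumptions for this example, not part of the package API.

package main

import (
    "fmt"

    "github.com/NVIDIA/aistore/dsort" // assumed import path
)

// validateSortKind is a hypothetical helper: it accepts only the sort kinds
// exported by the dsort package and rejects everything else.
func validateSortKind(kind string) error {
    switch kind {
    case dsort.SortKindAlphanumeric, // increasing or decreasing record order
        dsort.SortKindNone,    // no sorting, plain resharding
        dsort.SortKindMD5,     // order by MD5
        dsort.SortKindShuffle, // random order, reproducible with a seed
        dsort.SortKindContent: // order by the content of a given file
        return nil
    default:
        return fmt.Errorf("unsupported sort kind %q", kind)
    }
}

func main() {
    fmt.Println(validateSortKind(dsort.SortKindShuffle)) // <nil>
    fmt.Println(validateSortKind("bogus"))               // error
}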
Variables ¶
This section is empty.
Functions ¶
func InitManagers ¶
func InitManagers(db kvdb.Driver)
func ProxyAbortSortHandler ¶
func ProxyAbortSortHandler(w http.ResponseWriter, r *http.Request)
DELETE /v1/sort/abort
func ProxyGetHandler ¶
func ProxyGetHandler(w http.ResponseWriter, r *http.Request)
func ProxyRemoveSortHandler ¶
func ProxyRemoveSortHandler(w http.ResponseWriter, r *http.Request)
DELETE /v1/sort
func ProxyStartSortHandler ¶
func ProxyStartSortHandler(w http.ResponseWriter, r *http.Request, parsedRS *ParsedRequestSpec)
POST /v1/sort
func RegisterNode ¶
func RegisterNode(smapOwner meta.Sowner, bmdOwner meta.Bowner, snode *meta.Snode, ...)
func TargetHandler ¶
func TargetHandler(w http.ResponseWriter, r *http.Request)
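A minimal sketch of how the proxy-side HTTP handlers in this section could be wired into a standard net/http mux. The routing, the JSON decoding of RequestSpec, and the import path are assumptions for illustration; the actual AIStore proxy does its own request parsing and dispatch.

package main

import (
    "encoding/json"
    "net/http"

    "github.com/NVIDIA/aistore/dsort" // assumed import path
)

func main() {
    mux := http.NewServeMux()

    // DELETE /v1/sort/abort -> abort a running dsort job.
    mux.HandleFunc("/v1/sort/abort", dsort.ProxyAbortSortHandler)

    // /v1/sort handles job start (POST), status (GET), and removal (DELETE).
    mux.HandleFunc("/v1/sort", func(w http.ResponseWriter, r *http.Request) {
        switch r.Method {
        case http.MethodGet:
            dsort.ProxyGetHandler(w, r)
        case http.MethodDelete:
            dsort.ProxyRemoveSortHandler(w, r)
        case http.MethodPost:
            // Decode the user-facing RequestSpec, parse it, and start the job.
            var rs dsort.RequestSpec
            if err := json.NewDecoder(r.Body).Decode(&rs); err != nil {
                http.Error(w, err.Error(), http.StatusBadRequest)
                return
            }
            parsed, err := rs.Parse()
            if err != nil {
                http.Error(w, err.Error(), http.StatusBadRequest)
                return
            }
            dsort.ProxyStartSortHandler(w, r, parsed)
        default:
            http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
        }
    })

    http.ListenAndServe(":8080", mux) // error ignored; illustration only
}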
Types ¶
type CreationPhaseMetadata ¶
type CreationPhaseMetadata struct {
    Shards    []*extract.Shard          `msg:"shards"`
    SendOrder map[string]*extract.Shard `msg:"send_order"`
}
func (*CreationPhaseMetadata) DecodeMsg ¶
func (z *CreationPhaseMetadata) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable
func (*CreationPhaseMetadata) EncodeMsg ¶
func (z *CreationPhaseMetadata) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*CreationPhaseMetadata) Msgsize ¶
func (z *CreationPhaseMetadata) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
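A minimal sketch of a MessagePack round trip using the generated Encodable/Decodable methods above with the github.com/tinylib/msgp/msgp Reader and Writer. The dsort import path is an assumption for illustration.

package main

import (
    "bytes"
    "fmt"

    "github.com/tinylib/msgp/msgp"

    "github.com/NVIDIA/aistore/dsort" // assumed import path
)

func main() {
    src := &dsort.CreationPhaseMetadata{} // an empty value is enough for the round trip

    // Encode: Msgsize gives an upper bound, useful for pre-sizing the buffer.
    buf := bytes.NewBuffer(make([]byte, 0, src.Msgsize()))
    w := msgp.NewWriter(buf)
    if err := src.EncodeMsg(w); err != nil {
        panic(err)
    }
    if err := w.Flush(); err != nil { // the writer is buffered; flush before reading
        panic(err)
    }

    // Decode back into a fresh value.
    var dst dsort.CreationPhaseMetadata
    if err := dst.DecodeMsg(msgp.NewReader(buf)); err != nil {
        panic(err)
    }
    fmt.Printf("decoded %d shards\n", len(dst.Shards))
}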
type DetailedStats ¶
type DetailedStats struct {
    *TimeStats
    *ThroughputStats
}
DetailedStats contains time and throughput statistics.
type JobInfo ¶
type JobInfo struct {
    ID                string        `json:"id"`
    Description       string        `json:"description"`
    StartedTime       time.Time     `json:"started_time,omitempty"`
    FinishTime        time.Time     `json:"finish_time,omitempty"`
    ExtractedDuration time.Duration `json:"started_meta_sorting,omitempty"`
    SortingDuration   time.Duration `json:"started_shard_creation,omitempty"`
    CreationDuration  time.Duration `json:"finished_shard_creation,omitempty"`
    Aborted           bool          `json:"aborted"`
    Archived          bool          `json:"archived"`
}
JobInfo contains the stats that represent a single DSort run in a job listing.
func (*JobInfo) IsFinished ¶
type LocalExtraction ¶
type LocalExtraction struct {
    PhaseInfo
    // TotalCnt is the total number of shards DSort has to process.
    TotalCnt int64 `json:"total_count,string"`
    // ExtractedCnt is the number of shards extracted so far. At the end, the
    // number should be roughly equal to TotalCnt/#Targets.
    ExtractedCnt int64 `json:"extracted_count,string"`
    // ExtractedSize is the uncompressed size of the shards extracted so far.
    ExtractedSize int64 `json:"extracted_size,string"`
    // ExtractedRecordCnt is the number of records extracted from all shards.
    ExtractedRecordCnt int64 `json:"extracted_record_count,string"`
    // ExtractedToDiskCnt is the number of shards extracted to disk. To compute
    // the number of shards extracted to memory, subtract it from ExtractedCnt.
    ExtractedToDiskCnt int64 `json:"extracted_to_disk_count,string"`
    // ExtractedToDiskSize is the uncompressed size of the shards extracted to disk so far.
    ExtractedToDiskSize int64 `json:"extracted_to_disk_size,string"`
    // ShardExtractionStats describes time statistics for single-shard extraction.
    ShardExtractionStats *DetailedStats `json:"single_shard_stats,omitempty"`
}
LocalExtraction contains metrics for the first phase of DSort (extraction).
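For illustration, a small helper (hypothetical, not part of the package) that turns the extraction counters above into a completion percentage:

package main

import (
    "fmt"

    "github.com/NVIDIA/aistore/dsort" // assumed import path
)

// extractionProgress reports how far the extraction phase has progressed,
// based on the documented TotalCnt and ExtractedCnt counters.
func extractionProgress(le *dsort.LocalExtraction) float64 {
    if le == nil || le.TotalCnt == 0 {
        return 0
    }
    return 100 * float64(le.ExtractedCnt) / float64(le.TotalCnt)
}

func main() {
    le := &dsort.LocalExtraction{TotalCnt: 200, ExtractedCnt: 50}
    fmt.Printf("extraction: %.1f%% (%d of %d shards, %d to disk)\n",
        extractionProgress(le), le.ExtractedCnt, le.TotalCnt, le.ExtractedToDiskCnt)
}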
type Manager ¶
type Manager struct {
    // Fields with json tags are the only fields which are persisted
    // to disk once the dSort is finished.
    ManagerUUID string   `json:"manager_uuid"`
    Metrics     *Metrics `json:"metrics"`
    // contains filtered or unexported fields
}
Manager maintains all the state required for a single run of a distributed archive file shuffle.
func (*Manager) ListenSmapChanged ¶
func (m *Manager) ListenSmapChanged()
type ManagerGroup ¶
type ManagerGroup struct {
// contains filtered or unexported fields
}
ManagerGroup abstracts multiple dsort managers into a single struct.
var Managers *ManagerGroup
func NewManagerGroup ¶
func NewManagerGroup(db kvdb.Driver, skipHk bool) *ManagerGroup
NewManagerGroup returns a new, initialized manager group.
func (*ManagerGroup) AbortAll ¶
func (mg *ManagerGroup) AbortAll(err error)
func (*ManagerGroup) Add ¶
func (mg *ManagerGroup) Add(managerUUID string) (*Manager, error)
Add adds a new, non-initialized manager with the given managerUUID to the manager group. The returned manager is locked; it is the caller's responsibility to unlock it. Returns an error when a manager with the specified managerUUID already exists.
func (*ManagerGroup) Get ¶
func (mg *ManagerGroup) Get(managerUUID string, ap ...bool) (*Manager, bool)
Get returns the manager with the given managerUUID. When the manager does not exist in memory and the caller requested a persisted lookup, it searches persistent storage and returns the manager if found. The second return value is false if the manager does not exist, true otherwise.
func (*ManagerGroup) List ¶
func (mg *ManagerGroup) List(descRegex *regexp.Regexp, onlyActive bool) []JobInfo
func (*ManagerGroup) Remove ¶
func (mg *ManagerGroup) Remove(managerUUID string) error
Remove removes the manager with the given managerUUID from history. Used to reduce clutter. Fails if the process hasn't been cleaned up yet.
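A minimal usage sketch of ManagerGroup, assuming a kvdb.Driver instance (db) is obtained elsewhere in AIStore; the job UUID, the regular expression, and the import paths are illustrative.

package dsortexample

import (
    "fmt"
    "regexp"

    "github.com/NVIDIA/aistore/cmn/kvdb" // assumed import path
    "github.com/NVIDIA/aistore/dsort"    // assumed import path
)

func run(db kvdb.Driver) error {
    mg := dsort.NewManagerGroup(db, false /*skipHk*/)

    // Add a new (non-initialized) manager; per the docs above, the returned
    // manager is locked and must be unlocked by the caller.
    m, err := mg.Add("srt-example-uuid")
    if err != nil {
        return err
    }
    _ = m

    // Look the manager up again; the extra boolean requests a persisted lookup.
    if _, exists := mg.Get("srt-example-uuid", true); !exists {
        return fmt.Errorf("manager not found")
    }

    // List only active jobs whose description matches the regex.
    for _, job := range mg.List(regexp.MustCompile(`nightly`), true /*onlyActive*/) {
        fmt.Println(job.ID, job.Description)
    }
    return nil
}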
type MetaSorting ¶
type MetaSorting struct {
    PhaseInfo
    // SentStats describes time statistics about sending records to another target.
    SentStats *TimeStats `json:"sent_stats,omitempty"`
    // RecvStats describes time statistics about receiving records from another target.
    RecvStats *TimeStats `json:"recv_stats,omitempty"`
}
MetaSorting contains metrics for the second phase of DSort (meta sorting).
type Metrics ¶
type Metrics struct {
    Extraction *LocalExtraction `json:"local_extraction,omitempty"`
    Sorting    *MetaSorting     `json:"meta_sorting,omitempty"`
    Creation   *ShardCreation   `json:"shard_creation,omitempty"`

    // Aborted specifies if the DSort has been aborted or not.
    Aborted atomic.Bool `json:"aborted,omitempty"`
    // Archived specifies if the DSort has been archived to persistent storage.
    Archived atomic.Bool `json:"archived,omitempty"`

    // Description of the job.
    Description string `json:"description,omitempty"`

    // Warnings which were produced during the job.
    Warnings []string `json:"warnings,omitempty"`
    // Errors which happened during the job.
    Errors []string `json:"errors,omitempty"`
    // contains filtered or unexported fields
}
Metrics is the general struct that contains all stats about a DSort run.
func (*Metrics) ElapsedTime ¶
type ParsedRequestSpec ¶
type ParsedRequestSpec struct {
    Bck                 cmn.Bck               `json:"bck"`
    Description         string                `json:"description"`
    OutputBck           cmn.Bck               `json:"output_bck"`
    Extension           string                `json:"extension"`
    OutputShardSize     int64                 `json:"output_shard_size,string"`
    InputFormat         *parsedInputTemplate  `json:"input_format"`
    OutputFormat        *parsedOutputTemplate `json:"output_format"`
    Algorithm           *SortAlgorithm        `json:"algorithm"`
    OrderFileURL        string                `json:"order_file"`
    OrderFileSep        string                `json:"order_file_sep"`
    MaxMemUsage         cos.ParsedQuantity    `json:"max_mem_usage"`
    TargetOrderSalt     []byte                `json:"target_order_salt"`
    ExtractConcMaxLimit int                   `json:"extract_concurrency_max_limit"`
    CreateConcMaxLimit  int                   `json:"create_concurrency_max_limit"`
    StreamMultiplier    int                   `json:"stream_multiplier"` // TODO: should be removed
    ExtendedMetrics     bool                  `json:"extended_metrics"`

    // debug
    DSorterType string `json:"dsorter_type"`
    DryRun      bool   `json:"dry_run"`

    cmn.DSortConf
}
type PhaseInfo ¶
type PhaseInfo struct {
    Start time.Time `json:"started_time"`
    End   time.Time `json:"end_time"`
    // Elapsed is the time (in seconds) from the start to the given point in time,
    // or to the end once the phase has finished.
    Elapsed time.Duration `json:"elapsed"`
    // Running specifies if the phase is in progress.
    Running bool `json:"running"`
    // Finished specifies if the phase has finished. If both Running and Finished
    // are false, the phase has not started yet.
    Finished bool `json:"finished"`
    // contains filtered or unexported fields
}
PhaseInfo contains general stats and state for a given phase. It is the base struct that the actual phase structs extend.
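For illustration, a hypothetical helper that maps the Running/Finished flags documented above onto a human-readable phase state:

package main

import (
    "fmt"

    "github.com/NVIDIA/aistore/dsort" // assumed import path
)

// phaseState follows the PhaseInfo docs: Running means in progress, Finished
// means done, and neither means the phase has not started yet.
func phaseState(p *dsort.PhaseInfo) string {
    switch {
    case p.Finished:
        return fmt.Sprintf("finished in %v", p.Elapsed)
    case p.Running:
        return "running"
    default:
        return "not started"
    }
}

func main() {
    fmt.Println(phaseState(&dsort.PhaseInfo{Running: true})) // running
}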
type RemoteResponse ¶
type RemoteResponse struct {
    Record    *extract.Record    `msg:"r"`
    RecordObj *extract.RecordObj `msg:"o"`
}
func (*RemoteResponse) DecodeMsg ¶
func (z *RemoteResponse) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable
func (*RemoteResponse) EncodeMsg ¶
func (z *RemoteResponse) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*RemoteResponse) Msgsize ¶
func (z *RemoteResponse) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
type RequestSpec ¶
type RequestSpec struct {
    // Required
    Bck             cmn.Bck `json:"bck" yaml:"bck"`
    Extension       string  `json:"extension" yaml:"extension"`
    InputFormat     string  `json:"input_format" yaml:"input_format"`
    OutputFormat    string  `json:"output_format" yaml:"output_format"`
    OutputShardSize string  `json:"output_shard_size" yaml:"output_shard_size"`

    // Optional
    Description string `json:"description" yaml:"description"`
    // Default: same as `bck` field
    OutputBck cmn.Bck `json:"output_bck" yaml:"output_bck"`
    // Default: alphanumeric, increasing
    Algorithm SortAlgorithm `json:"algorithm" yaml:"algorithm"`
    // Default: ""
    OrderFileURL string `json:"order_file" yaml:"order_file"`
    // Default: "\t"
    OrderFileSep string `json:"order_file_sep" yaml:"order_file_sep"`
    // Default: "80%"
    MaxMemUsage string `json:"max_mem_usage" yaml:"max_mem_usage"`
    // Default: calcMaxLimit()
    ExtractConcMaxLimit int `json:"extract_concurrency_max_limit" yaml:"extract_concurrency_max_limit"`
    // Default: calcMaxLimit()
    CreateConcMaxLimit int `json:"create_concurrency_max_limit" yaml:"create_concurrency_max_limit"`
    // Default: bundle.Multiplier
    StreamMultiplier int `json:"stream_multiplier" yaml:"stream_multiplier"`
    // Default: false
    ExtendedMetrics bool `json:"extended_metrics" yaml:"extended_metrics"`

    // debug
    DSorterType string `json:"dsorter_type"`
    DryRun      bool   `json:"dry_run"` // Default: false

    cmn.DSortConf
}
RequestSpec defines the user specification for requests to the endpoint /v1/sort.
func (*RequestSpec) Parse ¶
func (rs *RequestSpec) Parse() (*ParsedRequestSpec, error)
Parse returns a non-nil error if the RequestSpec is invalid. When the RequestSpec is valid, it parses all the fields, sets the values, and returns a ParsedRequestSpec.
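A minimal sketch of constructing a RequestSpec and parsing it. The bucket name, provider string, and shard templates are made-up values, the cmn.Bck fields shown are assumptions about that struct, and the import paths are illustrative.

package main

import (
    "fmt"

    "github.com/NVIDIA/aistore/cmn"   // assumed import path
    "github.com/NVIDIA/aistore/dsort" // assumed import path
)

func main() {
    spec := dsort.RequestSpec{
        // Required
        Bck:             cmn.Bck{Name: "src-shards", Provider: "ais"}, // assumed cmn.Bck fields
        Extension:       ".tar",
        InputFormat:     "shard-{0..999}",
        OutputFormat:    "sorted-shard-{0000..9999}",
        OutputShardSize: "120MB",

        // Optional
        Description: "example: alphanumeric resharding",
        MaxMemUsage: "60%",
        Algorithm:   dsort.SortAlgorithm{}, // zero value -> default: alphanumeric, increasing
    }

    parsed, err := spec.Parse() // validates the spec and fills in defaults
    if err != nil {
        fmt.Println("invalid spec:", err)
        return
    }
    fmt.Println("parsed output shard size:", parsed.OutputShardSize)
}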
type ShardCreation ¶
type ShardCreation struct {
    PhaseInfo
    // ToCreate specifies the number of shards that have to be created in this phase.
    ToCreate int64 `json:"to_create,string"`
    // CreatedCnt specifies the number of shards created so far.
    // Should match ToCreate when the phase is finished.
    CreatedCnt int64 `json:"created_count,string"`
    // MovedShardCnt specifies the number of shards that have migrated from this
    // target to another target in the cluster. Applies only when dealing with
    // compressed data: sometimes it is faster to create a shard on a specific
    // target and send it over (rather than creating it on the destination target).
    MovedShardCnt int64 `json:"moved_shard_count,string"`
    // RequestStats describes time statistics about requests to other targets.
    RequestStats *TimeStats `json:"req_stats,omitempty"`
    // ResponseStats describes time statistics about responses to other targets.
    ResponseStats *TimeStats `json:"resp_stats,omitempty"`
    // LocalSendStats describes time statistics about sending record content to other targets.
    LocalSendStats *DetailedStats `json:"local_send_stats,omitempty"`
    // LocalRecvStats describes time statistics about receiving record content from other targets.
    LocalRecvStats *DetailedStats `json:"local_recv_stats,omitempty"`
    // ShardCreationStats describes time statistics about single-shard creation.
    ShardCreationStats *DetailedStats `json:"single_shard_stats,omitempty"`
}
ShardCreation contains metrics for the third and last phase of DSort (shard creation).
type SortAlgorithm ¶
type ThroughputStats ¶
type ThroughputStats struct {
    MinTp int64 `json:"min_throughput,string"`
    MaxTp int64 `json:"max_throughput,string"`
    AvgTp int64 `json:"avg_throughput,string"`
    // contains filtered or unexported fields
}
ThroughputStats contains throughput statistics for a specific task.
type TimeStats ¶
type TimeStats struct {
    // Total contains the total number of milliseconds spent on the specific task.
    Total int64 `json:"total_ms,string"`
    // Count contains the number of times the specific task was triggered.
    Count int64 `json:"count,string"`
    MinMs int64 `json:"min_ms,string"`
    MaxMs int64 `json:"max_ms,string"`
    AvgMs int64 `json:"avg_ms,string"`
}
TimeStats contains statistics about the time spent on a specific task. It tracks min, max, and avg times.
Source Files ¶
Directories ¶
Path | Synopsis
---|---
extract | Package extract provides functions for working with compressed files.
filetype | Package filetype provides the implementation of custom content file type for dsort.