ez

package
v0.5.14 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2024 License: Apache-2.0, MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// PrepCmd implements the "ez-prep" command, which prepares a dataset from a
// single local path end to end with minimal configuration.
//
// The action:
//   - opens a SQLite database (a timestamped file by default, or in-memory
//     when --database-file is explicitly set to an empty string);
//   - creates a "source" storage for <path> and, unless --output-dir is an
//     empty string, an "output" storage for CAR files;
//   - creates a preparation named "preparation";
//   - runs the scan, pack and DAG-generation stages to completion;
//   - prints the resulting pieces.
var PrepCmd = &cli.Command{
	Name:      "ez-prep",
	Category:  "Utility",
	Before:    cliutil.CheckNArgs,
	ArgsUsage: "<path>",
	Usage:     "Prepare a dataset from a local path",
	Description: "This command can be used to prepare a dataset from a local path with minimum configurable parameters.\n" +
		"For more advanced usage, please use the subcommands under `storage` and `data-prep`.\n" +
		"You can also use this command for benchmarking with in-memory database and inline preparation, e.g.\n" +
		"  mkdir dataset\n" +
		"  truncate -s 1024G dataset/1T.bin\n" +
		"  singularity ez-prep --output-dir '' --database-file '' -j $(($(nproc) / 4 + 1)) ./dataset",
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:    "max-size",
			Aliases: []string{"M"},
			Usage:   "Maximum size of the CAR files to be created",
			Value:   "31.5GiB",
		},
		&cli.StringFlag{
			Name:    "output-dir",
			Aliases: []string{"o"},
			Usage:   "Output directory for CAR files. To use inline preparation, use an empty string",
			Value:   "./cars",
		},
		&cli.IntFlag{
			Name:    "concurrency",
			Aliases: []string{"j"},
			Usage:   "Concurrency for packing",
			Value:   1,
		},
		&cli.StringFlag{
			Name:    "database-file",
			Aliases: []string{"f"},
			Usage:   "The database file to store the metadata. To use in memory database, use an empty string.",
			// The actual default file name embeds the Unix timestamp of the run
			// (see the Action below), not a dataset name.
			DefaultText: "./ezprep-<timestamp>.db",
		},
	},
	Action: func(c *cli.Context) error {
		path := c.Args().Get(0)
		if path == "" {
			return errors.New("path is required")
		}

		// If --database-file is explicitly set to "", use an in-memory database.
		// If the flag is not given at all, fall back to a timestamped file in
		// the current directory.
		databaseFile := c.String("database-file")
		if databaseFile == "" {
			if c.IsSet("database-file") {
				databaseFile = "file::memory:"
			} else {
				databaseFile = fmt.Sprintf("./ezprep-%d.db", time.Now().Unix())
			}
		}
		var err error
		if !strings.HasPrefix(databaseFile, "file::memory") {
			databaseFile, err = filepath.Abs(databaseFile)
			if err != nil {
				return errors.Wrap(err, "failed to get absolute path")
			}
		}
		db, closer, err := database.OpenWithLogger("sqlite:" + databaseFile)
		if err != nil {
			return errors.Wrapf(err, "failed to open database %s", databaseFile)
		}
		// Best-effort close on exit; a close error cannot change the outcome here.
		defer closer.Close()

		err = admin.Default.InitHandler(c.Context, db)
		if err != nil {
			return errors.WithStack(err)
		}

		// An empty output directory means inline preparation: no output
		// storage is registered, so the preparation gets no output storages.
		outputDir := c.String("output-dir")
		var outputStorages []string
		if outputDir != "" {
			err = os.MkdirAll(outputDir, 0755)
			if err != nil {
				return errors.Wrap(err, "failed to create output directory")
			}

			_, err = storage.Default.CreateStorageHandler(c.Context, db, "local", storage.CreateRequest{
				Name: "output",
				Path: outputDir,
			})
			if err != nil {
				return errors.Wrap(err, "failed to create output storage")
			}
			outputStorages = []string{"output"}
		}

		_, err = storage.Default.CreateStorageHandler(c.Context, db, "local", storage.CreateRequest{
			Name: "source",
			Path: path,
		})
		if err != nil {
			return errors.Wrap(err, "failed to create source storage")
		}

		_, err = dataprep.Default.CreatePreparationHandler(c.Context, db, dataprep.CreateRequest{
			SourceStorages: []string{"source"},
			OutputStorages: outputStorages,
			MaxSizeStr:     c.String("max-size"),
			Name:           "preparation",
		})
		if err != nil {
			return errors.Wrap(err, "failed to create preparation")
		}

		// runWorker runs a single dataset-worker stage until its jobs are done.
		// Every stage is configured to exit on completion and on the first error.
		runWorker := func(cfg datasetworker.Config) error {
			cfg.ExitOnComplete = true
			cfg.ExitOnError = true
			worker := datasetworker.NewWorker(db, cfg)
			return worker.Run(c.Context)
		}

		_, err = job.Default.StartScanHandler(c.Context, db, "preparation", "source")
		if err != nil {
			return errors.Wrap(err, "failed to start scan")
		}
		err = runWorker(datasetworker.Config{Concurrency: 1, EnableScan: true})
		if err != nil {
			return errors.Wrap(err, "failed to run dataset worker for scanning")
		}

		// Packing is the only stage whose concurrency is user-configurable.
		err = runWorker(datasetworker.Config{Concurrency: c.Int("concurrency"), EnablePack: true})
		if err != nil {
			return errors.Wrap(err, "failed to run dataset worker for packing")
		}

		_, err = job.Default.StartDagGenHandler(c.Context, db, "preparation", "source")
		if err != nil {
			return errors.Wrap(err, "failed to start dag gen")
		}
		err = runWorker(datasetworker.Config{Concurrency: 1, EnableDag: true})
		if err != nil {
			return errors.Wrap(err, "failed to run dataset worker for dag generation")
		}

		pieceLists, err := dataprep.Default.ListPiecesHandler(c.Context, db, "preparation")
		if err != nil {
			return errors.Wrap(err, "failed to list pieces")
		}
		// Without this guard, an empty result would make the index below panic.
		// NOTE(review): presumably one list per attached source; only "source"
		// is attached here, so the first list is the one of interest.
		if len(pieceLists) == 0 {
			return errors.New("no pieces found for preparation")
		}

		cliutil.Print(c, pieceLists[0].Pieces)
		return nil
	},
}

Functions

This section is empty.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL