Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var AddCmd = &cli.Command{ Name: "add", Usage: "Add a new data source to the dataset", Subcommands: underscore.Map(underscore.Filter(fs.Registry, func(r *fs.RegInfo) bool { return !slices.Contains([]string{"crypt", "memory", "tardigrade"}, r.Prefix) }), func(r *fs.RegInfo) *cli.Command { ws := model.Ready cmd := datasource.OptionsToCLIFlags(r) cmd.Flags = append(cmd.Flags, &cli.BoolFlag{ Name: "delete-after-export", Usage: "[Dangerous] Delete the files of the dataset after exporting it to CAR files. ", Category: "Data Preparation Options", }, &cli.DurationFlag{ Name: "rescan-interval", Usage: "Automatically rescan the source directory when this interval has passed from last successful scan", Category: "Data Preparation Options", DefaultText: "disabled", }, &cli.GenericFlag{ Name: "scanning-state", Usage: "set the initial scanning state", Category: "Data Preparation Options", DefaultText: "ready", Value: &ws, }) cmd.Action = func(c *cli.Context) error { datasetName := c.Args().Get(0) path := c.Args().Get(1) db, closer, err := database.OpenFromCLI(c) if err != nil { return err } defer closer.Close() db = db.WithContext(c.Context) dataset, err := database.FindDatasetByName(db, datasetName) if err != nil { return handler.InvalidParameterError{Err: err} } if path == "" { return handler.NewInvalidParameterErr("path is required") } if r.Prefix == "local" { path, err = filepath.Abs(path) if err != nil { return errors.Wrap(err, "failed to get absolute path") } } deleteAfterExport := c.Bool("delete-after-export") result := map[string]string{} for _, flag := range c.Command.Flags { flagName := flag.Names()[0] if strings.HasPrefix(flagName, r.Prefix) && c.String(flagName) != "" { optionName := strings.SplitN(strings.ReplaceAll(flagName, "-", "_"), "_", 2)[1] reg, err := fs.Find(r.Prefix) if err != nil { return errors.Wrap(err, "failed to find fs") } option, err := underscore.Find(reg.Options, func(o fs.Option) bool { return o.Name == optionName }) if err != nil { return errors.Wrap(err, "failed to find option") } result[option.Name] = c.String(flagName) } } source := model.Source{ DatasetID: dataset.ID, Type: r.Prefix, Path: path, Metadata: model.Metadata(result), ScanIntervalSeconds: 0, ScanningState: ws, DeleteAfterExport: deleteAfterExport, DagGenState: model.Created, } handler, err := datasource.DefaultHandlerResolver{}.Resolve(c.Context, source) if err != nil { return errors.Wrap(err, "failed to resolve handler") } _, err = handler.List(c.Context, "") if err != nil { return errors.Wrap(err, "failed to check source") } err = database.DoRetry(c.Context, func() error { return db.Transaction(func(db *gorm.DB) error { err := db.Create(&source).Error if err != nil { return errors.Wrap(err, "failed to create source") } dir := model.Directory{ Name: path, SourceID: source.ID, } err = db.Create(&dir).Error if err != nil { return errors.Wrap(err, "failed to create directory") } return nil }) }) if err != nil { return cli.Exit(errors.Wrap(err, "failed to add source").Error(), 1) } cliutil.PrintToConsole(source, c.Bool("json"), exclude) return nil } return cmd }), }
View Source
var CheckCmd = &cli.Command{ Name: "check", Usage: "Check a data source by listing its entries. This is not to list prepared files but a direct listing of the data source to verify datasource connection", ArgsUsage: "<source_id> [sub_path]", Description: "This command will list entries in a data source under <sub_path>. " + "If <sub_path> is not provided, it will use the root directory", Action: func(c *cli.Context) error { db, closer, err := database.OpenFromCLI(c) if err != nil { return err } defer closer.Close() entries, err := datasource.CheckSourceHandler( c.Context, db, c.Args().Get(0), datasource.CheckSourceRequest{ Path: c.Args().Get(1), }, ) if err != nil { return err } cliutil.PrintToConsole(entries, c.Bool("json"), nil) return nil }, }
View Source
var DagGenCmd = &cli.Command{ Name: "daggen", Usage: "Generate and export the DAG which represents the full folder structure of the data source", ArgsUsage: "<source_id>", Description: "This step is required for:\n" + " 1. Lookup and download files or folders using unixfs path\n" + " 2. Retrieve file that are splited across multiple pieces/deals", Action: func(c *cli.Context) error { db, closer, err := database.OpenFromCLI(c) if err != nil { return err } defer closer.Close() source, err := datasource.DagGenHandler(c.Context, db, c.Args().Get(0)) if err != nil { return err } cliutil.PrintToConsole(source, c.Bool("json"), exclude) return nil }, }
View Source
var ListCmd = &cli.Command{ Name: "list", Usage: "List all sources", Flags: []cli.Flag{ &cli.StringFlag{ Name: "dataset", Usage: "Filter by dataset name", }, }, Action: func(c *cli.Context) error { db, closer, err := database.OpenFromCLI(c) if err != nil { return err } defer closer.Close() datasetName := c.String("dataset") sources, err := datasource.ListSourcesByDatasetHandler( c.Context, db, datasetName, ) if err != nil { return err } cliutil.PrintToConsole(sources, c.Bool("json"), exclude) return nil }, }
View Source
var RemoveCmd = &cli.Command{ Name: "remove", Usage: "Remove a data source", ArgsUsage: "<source_id>", Flags: []cli.Flag{cliutil.ReallyDotItFlag}, Action: func(c *cli.Context) error { if err := cliutil.HandleReallyDoIt(c); err != nil { return err } db, closer, err := database.OpenFromCLI(c) if err != nil { return err } defer closer.Close() return datasource.RemoveSourceHandler( c.Context, db, c.Args().Get(0), ) }, }
View Source
var RepackCmd = &cli.Command{ Name: "repack", Usage: "Retry packing a packjob or all errored packjobs of a data source", ArgsUsage: "<source_id> or --pack-job-id <pack_job_id>", Flags: []cli.Flag{ &cli.Uint64Flag{ Name: "pack-job-id", Usage: "Pack job ID to retry packing", }, }, Action: func(c *cli.Context) error { db, closer, err := database.OpenFromCLI(c) if err != nil { return err } defer closer.Close() var packJobID *uint64 if c.IsSet("pack-job-id") { c2 := c.Uint64("pack-job-id") packJobID = &c2 } packJobs, err := datasource.RepackHandler( c.Context, db, c.Args().Get(0), datasource.RepackRequest{ PackJobID: packJobID, }, ) if err != nil { return err } cliutil.PrintToConsole(packJobs, c.Bool("json"), nil) return nil }, }
View Source
var RescanCmd = &cli.Command{ Name: "rescan", Usage: "Rescan a data source", ArgsUsage: "<source_id>", Description: "This command will clear any error of a data source scanning work and rescan it", Action: func(c *cli.Context) error { db, closer, err := database.OpenFromCLI(c) if err != nil { return err } defer closer.Close() source, err := datasource.RescanSourceHandler( c.Context, db, c.Args().Get(0), ) if err != nil { return err } cliutil.PrintToConsole(source, c.Bool("json"), exclude) return nil }, }
View Source
var StatusCmd = &cli.Command{ Name: "status", Usage: "Get the data preparation summary of a data source", ArgsUsage: "<source_id>", Action: func(c *cli.Context) error { db, closer, err := database.OpenFromCLI(c) if err != nil { return err } defer closer.Close() result, err := datasource.GetSourceStatusHandler( c.Context, db, c.Args().Get(0), ) if err != nil { return err } if c.Bool("json") { objJSON, err := json.MarshalIndent(result, "", " ") if err != nil { return err } fmt.Println(string(objJSON)) return nil } fmt.Println("Pack jobs by state:") cliutil.PrintToConsole(result.PackJobSummary, c.Bool("json"), nil) fmt.Println("Files by state:") cliutil.PrintToConsole(result.FileSummary, c.Bool("json"), nil) if len(result.FailedPackJobs) > 0 { fmt.Println("Failed pack jobs:") cliutil.PrintToConsole(result.FailedPackJobs, c.Bool("json"), nil) } return nil }, }
View Source
var UpdateCmd = &cli.Command{ Name: "update", Usage: "Update the config options of a source", ArgsUsage: "<source_id>", Flags: func() []cli.Flag { var flags []cli.Flag for _, cmd := range AddCmd.Subcommands { cmdFlags := underscore.Map(cmd.Flags, func(flag cli.Flag) cli.Flag { stringFlag, ok := flag.(*cli.StringFlag) if !ok { return flag } stringFlag.Required = false stringFlag.Category = "Options for " + cmd.Name stringFlag.Aliases = nil return stringFlag }) flags = append(flags, cmdFlags...) } keys := make(map[string]cli.Flag) newFlags := make([]cli.Flag, 0) for _, flag := range flags { if _, ok := keys[flag.Names()[0]]; !ok { keys[flag.Names()[0]] = flag newFlags = append(newFlags, flag) } } return newFlags }(), Action: func(c *cli.Context) error { db, closer, err := database.OpenFromCLI(c) if err != nil { return err } defer closer.Close() config := map[string]any{} for _, name := range c.LocalFlagNames() { if c.IsSet(name) { if name == "delete-after-export" { b := c.Bool(name) config["deleteAfterExport"] = b continue } if name == "rescan-interval" { d := c.String(name) config["rescanInterval"] = d continue } value := c.String(name) config[name] = value } } source, err := datasource.UpdateSourceHandler( c.Context, db, c.Args().Get(0), config, ) if err != nil { return err } cliutil.PrintToConsole(source, c.Bool("json"), nil) return nil }, }
Functions ¶
This section is empty.
Types ¶
This section is empty.
Source Files ¶
Click to show internal directories.
Click to hide internal directories.