datasource

package
v0.3.2
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 5, 2023 License: Apache-2.0, MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// AddCmd is the "add" command. It exposes one subcommand per entry of
// fs.Registry, skipping the "crypt", "memory" and "tardigrade" prefixes.
//
// Every subcommand takes two positional arguments, the dataset name and the
// source path, plus the backend flags produced by datasource.OptionsToCLIFlags
// and three data preparation flags: delete-after-export, rescan-interval and
// scanning-state. The action verifies the source by listing its root and then
// stores the source together with its root directory in a single transaction.
var AddCmd = &cli.Command{
	Name:  "add",
	Usage: "Add a new data source to the dataset",
	Subcommands: underscore.Map(underscore.Filter(fs.Registry, func(r *fs.RegInfo) bool {
		return !slices.Contains([]string{"crypt", "memory", "tardigrade"}, r.Prefix)
	}), func(r *fs.RegInfo) *cli.Command {
		// ws backs the scanning-state flag of this subcommand: parsing the
		// flag writes into it and the action reads it back when building the
		// source.
		ws := model.Ready
		cmd := datasource.OptionsToCLIFlags(r)
		cmd.Flags = append(cmd.Flags, &cli.BoolFlag{
			Name:     "delete-after-export",
			Usage:    "[Dangerous] Delete the files of the dataset after exporting it to CAR files. ",
			Category: "Data Preparation Options",
		}, &cli.DurationFlag{
			Name:        "rescan-interval",
			Usage:       "Automatically rescan the source directory when this interval has passed from last successful scan",
			Category:    "Data Preparation Options",
			DefaultText: "disabled",
		}, &cli.GenericFlag{
			Name:        "scanning-state",
			Usage:       "set the initial scanning state",
			Category:    "Data Preparation Options",
			DefaultText: "ready",
			Value:       &ws,
		})
		cmd.Action = func(c *cli.Context) error {
			datasetName := c.Args().Get(0)
			path := c.Args().Get(1)

			// Reject invalid input before opening the database.
			if path == "" {
				return handler.NewInvalidParameterErr("path is required")
			}
			rescanInterval := c.Duration("rescan-interval")
			if rescanInterval < 0 {
				return handler.NewInvalidParameterErr("rescan-interval must not be negative")
			}

			db, closer, err := database.OpenFromCLI(c)
			if err != nil {
				return err
			}
			defer closer.Close()
			db = db.WithContext(c.Context)
			dataset, err := database.FindDatasetByName(db, datasetName)
			if err != nil {
				return handler.InvalidParameterError{Err: err}
			}
			if r.Prefix == "local" {
				// Local sources are stored by absolute path so the record does
				// not depend on the working directory of this invocation.
				path, err = filepath.Abs(path)
				if err != nil {
					return errors.Wrap(err, "failed to get absolute path")
				}
			}
			deleteAfterExport := c.Bool("delete-after-export")

			// The backend registration does not change between flags, so it is
			// looked up once instead of once per matching flag.
			reg, err := fs.Find(r.Prefix)
			if err != nil {
				return errors.Wrap(err, "failed to find fs")
			}

			// Collect every non-empty backend flag as an option keyed by the
			// backend's own option name.
			result := map[string]string{}
			for _, flag := range c.Command.Flags {
				flagName := flag.Names()[0]
				if !strings.HasPrefix(flagName, r.Prefix) {
					continue
				}
				value := c.String(flagName)
				if value == "" {
					continue
				}
				// Drop the leading "<prefix>" segment. A name without any
				// separator cannot refer to a backend option, so it is skipped
				// rather than indexed out of range.
				_, optionName, ok := strings.Cut(strings.ReplaceAll(flagName, "-", "_"), "_")
				if !ok {
					continue
				}
				option, err := underscore.Find(reg.Options, func(o fs.Option) bool {
					return o.Name == optionName
				})
				if err != nil {
					return errors.Wrap(err, "failed to find option")
				}
				result[option.Name] = value
			}

			source := model.Source{
				DatasetID: dataset.ID,
				Type:      r.Prefix,
				Path:      path,
				Metadata:  model.Metadata(result),
				// Honor --rescan-interval; an unset flag yields zero, which
				// matches the previous hard-coded value.
				// NOTE(review): assumes ScanIntervalSeconds is a uint64 - confirm against model.Source.
				ScanIntervalSeconds: uint64(rescanInterval.Seconds()),
				ScanningState:       ws,
				DeleteAfterExport:   deleteAfterExport,
				DagGenState:         model.Created,
			}

			// Named sourceHandler so the handler package is not shadowed.
			sourceHandler, err := datasource.DefaultHandlerResolver{}.Resolve(c.Context, source)
			if err != nil {
				return errors.Wrap(err, "failed to resolve handler")
			}

			// Listing the root proves the source is reachable before anything
			// is persisted.
			_, err = sourceHandler.List(c.Context, "")
			if err != nil {
				return errors.Wrap(err, "failed to check source")
			}

			err = database.DoRetry(c.Context, func() error {
				return db.Transaction(func(db *gorm.DB) error {
					err := db.Create(&source).Error
					if err != nil {
						return errors.Wrap(err, "failed to create source")
					}
					dir := model.Directory{
						Name:     path,
						SourceID: source.ID,
					}
					err = db.Create(&dir).Error
					if err != nil {
						return errors.Wrap(err, "failed to create directory")
					}
					return nil
				})
			})
			if err != nil {
				return cli.Exit(errors.Wrap(err, "failed to add source").Error(), 1)
			}

			cliutil.PrintToConsole(source, c.Bool("json"), exclude)
			return nil
		}
		return cmd
	}),
}
View Source
// CheckCmd is the "check" command. It passes the source ID (first argument)
// and an optional sub path (second argument) to datasource.CheckSourceHandler
// and prints the returned entries.
var CheckCmd = &cli.Command{
	Name:      "check",
	Usage:     "Check a data source by listing its entries. This is not to list prepared files but a direct listing of the data source to verify datasource connection",
	ArgsUsage: "<source_id> [sub_path]",
	Description: "This command will list entries in a data source under <sub_path>. " +
		"If <sub_path> is not provided, it will use the root directory",
	Action: func(c *cli.Context) error {
		db, dbCloser, err := database.OpenFromCLI(c)
		if err != nil {
			return err
		}
		defer dbCloser.Close()

		args := c.Args()
		sourceID := args.Get(0)
		request := datasource.CheckSourceRequest{Path: args.Get(1)}

		entries, err := datasource.CheckSourceHandler(c.Context, db, sourceID, request)
		if err != nil {
			return err
		}

		cliutil.PrintToConsole(entries, c.Bool("json"), nil)
		return nil
	},
}
View Source
// DagGenCmd is the "daggen" command. It calls datasource.DagGenHandler with
// the source ID given as the first argument and prints the returned source.
var DagGenCmd = &cli.Command{
	Name:      "daggen",
	Usage:     "Generate and export the DAG which represents the full folder structure of the data source",
	ArgsUsage: "<source_id>",
	Description: "This step is required for:\n" +
		"  1. Lookup and download files or folders using unixfs path\n" +
		"  2. Retrieve file that are splited across multiple pieces/deals",
	Action: func(c *cli.Context) error {
		db, dbCloser, err := database.OpenFromCLI(c)
		if err != nil {
			return err
		}
		defer dbCloser.Close()

		sourceID := c.Args().Get(0)
		source, err := datasource.DagGenHandler(c.Context, db, sourceID)
		if err != nil {
			return err
		}

		cliutil.PrintToConsole(source, c.Bool("json"), exclude)
		return nil
	},
}
View Source
// ListCmd is the "list" command. It prints the sources returned by
// datasource.ListSourcesByDatasetHandler for the value of the --dataset flag.
var ListCmd = &cli.Command{
	Name:  "list",
	Usage: "List all sources",
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:  "dataset",
			Usage: "Filter by dataset name",
		},
	},
	Action: func(c *cli.Context) error {
		db, dbCloser, err := database.OpenFromCLI(c)
		if err != nil {
			return err
		}
		defer dbCloser.Close()

		// The flag value is forwarded as-is, including the empty string when
		// --dataset is not given.
		sources, err := datasource.ListSourcesByDatasetHandler(c.Context, db, c.String("dataset"))
		if err != nil {
			return err
		}

		cliutil.PrintToConsole(sources, c.Bool("json"), exclude)
		return nil
	},
}
View Source
// RemoveCmd is the "remove" command. It runs cliutil.HandleReallyDoIt first
// and, only when that succeeds, calls datasource.RemoveSourceHandler with the
// source ID given as the first argument.
var RemoveCmd = &cli.Command{
	Name:      "remove",
	Usage:     "Remove a data source",
	ArgsUsage: "<source_id>",
	Flags:     []cli.Flag{cliutil.ReallyDotItFlag},
	Action: func(c *cli.Context) error {
		// The confirmation check runs before the database is opened.
		confirmErr := cliutil.HandleReallyDoIt(c)
		if confirmErr != nil {
			return confirmErr
		}

		db, dbCloser, err := database.OpenFromCLI(c)
		if err != nil {
			return err
		}
		defer dbCloser.Close()

		sourceID := c.Args().Get(0)
		return datasource.RemoveSourceHandler(c.Context, db, sourceID)
	},
}
View Source
// RepackCmd is the "repack" command. It calls datasource.RepackHandler with
// the source ID given as the first argument and, when --pack-job-id was set,
// a pointer to that pack job ID, then prints the returned pack jobs.
var RepackCmd = &cli.Command{
	Name:      "repack",
	Usage:     "Retry packing a packjob or all errored packjobs of a data source",
	ArgsUsage: "<source_id> or --pack-job-id <pack_job_id>",
	Flags: []cli.Flag{
		&cli.Uint64Flag{
			Name:  "pack-job-id",
			Usage: "Pack job ID to retry packing",
		},
	},
	Action: func(c *cli.Context) error {
		db, dbCloser, err := database.OpenFromCLI(c)
		if err != nil {
			return err
		}
		defer dbCloser.Close()

		// PackJobID stays nil unless the flag was given explicitly, so an
		// explicit value can be told apart from an absent one.
		request := datasource.RepackRequest{}
		if c.IsSet("pack-job-id") {
			id := c.Uint64("pack-job-id")
			request.PackJobID = &id
		}

		packJobs, err := datasource.RepackHandler(c.Context, db, c.Args().Get(0), request)
		if err != nil {
			return err
		}

		cliutil.PrintToConsole(packJobs, c.Bool("json"), nil)
		return nil
	},
}
View Source
// RescanCmd is the "rescan" command. It calls datasource.RescanSourceHandler
// with the source ID given as the first argument and prints the returned
// source.
var RescanCmd = &cli.Command{
	Name:        "rescan",
	Usage:       "Rescan a data source",
	ArgsUsage:   "<source_id>",
	Description: "This command will clear any error of a data source scanning work and rescan it",
	Action: func(c *cli.Context) error {
		db, dbCloser, err := database.OpenFromCLI(c)
		if err != nil {
			return err
		}
		defer dbCloser.Close()

		sourceID := c.Args().Get(0)
		source, err := datasource.RescanSourceHandler(c.Context, db, sourceID)
		if err != nil {
			return err
		}

		cliutil.PrintToConsole(source, c.Bool("json"), exclude)
		return nil
	},
}
View Source
// StatusCmd is the "status" command. It fetches the status of the source
// given as the first argument through datasource.GetSourceStatusHandler.
// With "json" set the whole result is printed as indented JSON; otherwise the
// pack job summary, the file summary and, if there are any, the failed pack
// jobs are printed as separate sections.
var StatusCmd = &cli.Command{
	Name:      "status",
	Usage:     "Get the data preparation summary of a data source",
	ArgsUsage: "<source_id>",
	Action: func(c *cli.Context) error {
		db, dbCloser, err := database.OpenFromCLI(c)
		if err != nil {
			return err
		}
		defer dbCloser.Close()

		sourceID := c.Args().Get(0)
		result, err := datasource.GetSourceStatusHandler(c.Context, db, sourceID)
		if err != nil {
			return err
		}

		asJSON := c.Bool("json")
		if asJSON {
			// JSON mode emits a single document instead of the sections below.
			encoded, err := json.MarshalIndent(result, "", "  ")
			if err != nil {
				return err
			}
			fmt.Println(string(encoded))
			return nil
		}

		fmt.Println("Pack jobs by state:")
		cliutil.PrintToConsole(result.PackJobSummary, asJSON, nil)

		fmt.Println("Files by state:")
		cliutil.PrintToConsole(result.FileSummary, asJSON, nil)

		if len(result.FailedPackJobs) == 0 {
			return nil
		}
		fmt.Println("Failed pack jobs:")
		cliutil.PrintToConsole(result.FailedPackJobs, asJSON, nil)
		return nil
	},
}
View Source
// UpdateCmd is the "update" command. Its flag set is the union of the flags
// of every AddCmd subcommand, de-duplicated by primary flag name with the
// first occurrence winning. String flags are made optional, grouped under
// "Options for <subcommand name>" and stripped of their aliases.
//
// The action collects every local flag that was set into a config map and
// passes it, with the source ID given as the first argument, to
// datasource.UpdateSourceHandler.
var UpdateCmd = &cli.Command{
	Name:      "update",
	Usage:     "Update the config options of a source",
	ArgsUsage: "<source_id>",
	Flags: func() []cli.Flag {
		var flags []cli.Flag
		for _, cmd := range AddCmd.Subcommands {
			cmdFlags := underscore.Map(cmd.Flags, func(flag cli.Flag) cli.Flag {
				stringFlag, ok := flag.(*cli.StringFlag)
				if !ok {
					return flag
				}
				// Adjust a copy: the pointer is shared with the add
				// subcommand, and changing it in place would also clear
				// Required and Aliases and rewrite Category on "add".
				updateFlag := *stringFlag
				updateFlag.Required = false
				updateFlag.Category = "Options for " + cmd.Name
				updateFlag.Aliases = nil
				return &updateFlag
			})
			flags = append(flags, cmdFlags...)
		}

		// The flags appended to every add subcommand would otherwise be
		// registered once per subcommand; keep the first flag seen per name.
		seen := make(map[string]struct{}, len(flags))
		uniqueFlags := make([]cli.Flag, 0, len(flags))
		for _, flag := range flags {
			name := flag.Names()[0]
			if _, dup := seen[name]; dup {
				continue
			}
			seen[name] = struct{}{}
			uniqueFlags = append(uniqueFlags, flag)
		}
		return uniqueFlags
	}(),
	Action: func(c *cli.Context) error {
		db, closer, err := database.OpenFromCLI(c)
		if err != nil {
			return err
		}
		defer closer.Close()

		// Only flags that were set are forwarded. The two data preparation
		// flags are sent under camelCase keys; every other flag keeps its CLI
		// name as the key.
		config := map[string]any{}
		for _, name := range c.LocalFlagNames() {
			if !c.IsSet(name) {
				continue
			}
			switch name {
			case "delete-after-export":
				config["deleteAfterExport"] = c.Bool(name)
			case "rescan-interval":
				config["rescanInterval"] = c.String(name)
			default:
				config[name] = c.String(name)
			}
		}

		source, err := datasource.UpdateSourceHandler(
			c.Context,
			db,
			c.Args().Get(0),
			config,
		)
		if err != nil {
			return err
		}

		cliutil.PrintToConsole(source, c.Bool("json"), nil)
		return nil
	},
}

Functions

This section is empty.

Types

This section is empty.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL