cmdgc

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 20, 2024 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Command implements the "gc" subcommand, which deletes objects from the
// store that are not reachable from any known root.
//
// The command runs in two phases:
//
// Mark: all root keys are listed and a set of reachable keys is built. Each
// root key is itself recorded as reachable. If a root records an index key,
// the cached index is loaded and used as-is. Otherwise the file tree of the
// root is scanned and every file key and data key found is recorded. If
// gcFlags.RequireIndex is set, a root without a cached index is an error.
//
// Sweep: the keys of the store are listed concurrently, one task per seed
// returned by shuffledSeeds, with concurrency bounded by gcFlags.Tasks. Any
// key not present in one of the collected indices is deleted. If
// gcFlags.Limit is positive, the sweep is cancelled after that duration and
// the partial results are reported as a success.
//
// It is an error if the store has no roots (unless -force is set) or if the
// store contains no objects at all.
var Command = &command.C{
	Name: "gc",
	Help: `Garbage-collect objects not reachable from known roots.

If no roots are defined, an error is reported without making any changes
unless -force is set. This avoids accidentally deleting everything in a
store without roots.
`,

	SetFlags: command.Flags(flax.MustBind, &gcFlags),

	Run: command.Adapt(func(env *command.Env) error {
		cfg := env.Config.(*config.Settings)
		return cfg.WithStore(env.Context(), func(s config.CAS) error {
			// Collect the keys of every root in the store.
			var keys []string
			if err := s.Roots().List(env.Context(), "", func(key string) error {
				keys = append(keys, key)
				return nil
			}); err != nil {
				return fmt.Errorf("listing roots: %w", err)
			}

			// With no roots, nothing is reachable and a sweep would delete
			// everything; require explicit confirmation via -force.
			if len(keys) == 0 {
				if !gcFlags.Force {
					return errors.New("there are no root keys defined")
				}
				fmt.Fprint(env, `>> WARNING <<
* No root keys found!
* Proceeding with collection anyway because -force is set

`)
			}

			n, err := s.Len(env.Context())
			if err != nil {
				return err
			}
			if n == 0 {
				return errors.New("the store is empty")
			}

			// idxs holds every index consulted during the sweep: the cached
			// indices loaded from roots plus idx, which is populated here.
			var idxs []*index.Index
			idx := index.New(int(n), &index.Options{FalsePositiveRate: 0.01})
			fmt.Fprintf(env, "Begin GC of %d objects from %d roots\n", n, len(keys))
			dprintf(env, "Roots: %s\n", wrap(keys, 90, "  ", ", "))

			// Mark phase: record everything reachable from each root.
			for _, key := range keys {
				rp, err := root.Open(env.Context(), s.Roots(), key)
				if err != nil {
					return fmt.Errorf("opening %q: %w", key, err)
				}
				idx.Add(key)

				// Prefer a cached index when the root has one; it stands in
				// for a full scan of the root's file tree.
				if rp.IndexKey != "" {
					rpi, err := config.LoadIndex(env.Context(), s, rp.IndexKey)
					if err != nil {
						return err
					}
					idxs = append(idxs, rpi)
					idx.Add(rp.IndexKey)
					dprintf(env, "Loaded cached index for %q (%s)\n", key, config.FormatKey(rp.IndexKey))
					continue
				}

				if gcFlags.RequireIndex {
					return fmt.Errorf("missing required index for %q", key)
				}

				rf, err := rp.File(env.Context(), s)
				if err != nil {
					return fmt.Errorf("opening %q: %w", rp.FileKey, err)
				}
				idx.Add(rp.FileKey)

				dprintf(env, "Scanning data reachable from %q (%s)...\n",
					config.PrintableKey(key), config.FormatKey(rp.FileKey))
				scanned := mapset.New[string]()
				start := time.Now()
				if err := rf.Scan(env.Context(), func(si file.ScanItem) bool {
					key := si.Key()
					if scanned.Has(key) {
						// Already recorded during this root's scan.
						return false
					}
					scanned.Add(key)
					idx.Add(key)
					for _, dkey := range si.Data().Keys() {
						idx.Add(dkey)
					}
					return true
				}); err != nil {
					return fmt.Errorf("scanning %q: %w", key, err)
				}
				dprintf(env, "Finished scanning %d objects [%v elapsed]\n",
					idx.Len(), time.Since(start).Truncate(10*time.Millisecond))
			}
			idxs = append(idxs, idx)

			// Sweep phase. The context carries a cancellation cause so that a
			// sweep stopped by the time limit can be told apart from a real
			// failure.
			ctx, cancel := context.WithCancelCause(env.Context())
			defer cancel(nil)
			if gcFlags.Limit > 0 {
				limit := time.AfterFunc(gcFlags.Limit, func() { cancel(errSweepLimit) })
				// Release the timer if the sweep finishes before the limit.
				defer limit.Stop()
				fmt.Fprintf(env, "Begin sweep over %d objects (limit %v)...\n", n, gcFlags.Limit)
			} else {
				fmt.Fprintf(env, "Begin sweep over %d objects...\n", n)
			}
			g, run := taskgroup.New(taskgroup.Listen(cancel)).Limit(gcFlags.Tasks)

			start := time.Now()
			var numKeep, numDrop atomic.Int64
			pb := pbar.New(env, n).Start()
			for _, p := range shuffledSeeds() {
				pfx := string([]byte{p})
				run(func() error {
					return s.List(ctx, pfx, func(key string) error {
						// Stop once the listing moves past this task's prefix.
						if !strings.HasPrefix(key, pfx) {
							return blob.ErrStopListing
						}
						pb.Add(1)
						for _, idx := range idxs {
							if idx.Has(key) {
								numKeep.Add(1)
								return nil
							}
						}

						// Count a key as dropped only once its deletion has
						// succeeded, so the summary reflects what was removed.
						// Deletion is best-effort: failures are logged (except
						// cancellation, which is reported after the sweep) and
						// the sweep continues.
						if err := s.Delete(ctx, key); err == nil {
							pb.SetMeta(numDrop.Add(1))
						} else if !errors.Is(err, context.Canceled) {
							log.Printf("WARNING: delete key %s: %v", config.FormatKey(key), err)
						}
						return nil
					})
				})
			}
			serr := g.Wait()
			pb.Stop()
			fmt.Fprintln(env, " *")
			if serr != nil {
				// Reaching the sweep limit is an expected way to stop early.
				if !errors.Is(context.Cause(ctx), errSweepLimit) {
					return fmt.Errorf("sweeping failed: %w", serr)
				}
				fmt.Fprintln(env, "(sweep limit reached)")
			}
			fmt.Fprintf(env, "GC complete: keep %d, drop %d [%v elapsed]\n",
				numKeep.Load(), numDrop.Load(), time.Since(start).Truncate(10*time.Millisecond))
			return nil
		})
	}),
}

Functions

This section is empty.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL