submit

package
v1.2.0-beta.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 27, 2023 License: LGPL-3.0 Imports: 22 Imported by: 0

Documentation

Overview

Package submit permits the submission of a job to the DeepSquare Grid.

USAGE:

dps submit [command options] <job.yaml>

OPTIONS:

Submit Settings:

--affinities key<value [ --affinities key<value ]  Affinities flag. Used to filter the clusters. Format: key<value, `key<=value`, `key=value`, `key>=value`, `key>value`, `key!=value` or `key:in:value`.

--credits value                                    Allocates a number of credits. Unit is 1e18. The value is a float and is not precise. (default: 0)

--credits-wei value                                Allocates a number of credits. Unit is wei. The value is a big int.

--exit-on-job-exit, -e                             Exit once the job has finished, returning a non-zero exit code on error. (default: false)
--job-name value                                   The job name.
--no-timestamp, --no-ts                            Hide timestamp. (default: false)

--uses key=value [ --uses key=value ]              Uses flag. Used to filter the clusters. Format: key=value

--watch, -w                                        Watch logs after submitting the job (default: false)

Index

Constants

This section is empty.

Variables

View Source
// Command is the submit subcommand used to submit jobs.
//
// It expects the path to a job YAML file as its first argument and requires
// credits to have been set through --credits or --credits-wei. The action:
//
//  1. builds a client and a watcher from the package-level connection settings,
//  2. reads and unmarshals the job file,
//  3. converts the --uses and --affinities flags into labels and affinities,
//  4. raises the allowance by the allocated credits,
//  5. submits the job.
//
// Without --watch it prints the job ID and returns. With --watch it waits for
// the job to be running (or finished) and streams the logs to stdout until
// the logging stream is closed. With --exit-on-job-exit, a background
// goroutine terminates the process once the job has finished.
var Command = cli.Command{
	Name:      "submit",
	Usage:     "Quickly submit a job.",
	ArgsUsage: "<job.yaml>",
	Flags:     flags,
	Action: func(cCtx *cli.Context) error {
		if cCtx.NArg() < 1 {
			return errors.New("missing arguments")
		}
		if credits == nil {
			return errors.New("missing --credits or --credits-wei parameter")
		}
		jobPath := cCtx.Args().First()
		ctx := cCtx.Context
		pk, err := crypto.HexToECDSA(ethHexPK)
		if err != nil {
			return err
		}
		client, err := deepsquare.NewClient(ctx, &deepsquare.ClientConfig{
			MetaschedulerAddress: common.HexToAddress(metaschedulerSmartContract),
			RPCEndpoint:          ethEndpointRPC,
			SBatchEndpoint:       sbatchEndpoint,
			LoggerEndpoint:       loggerEndpoint,
			UserPrivateKey:       pk,
		})
		if err != nil {
			return err
		}
		defer func() {
			if err := client.Close(); err != nil {
				internallog.I.Error("failed to close client", zap.Error(err))
			}
		}()
		watcher, err := deepsquare.NewWatcher(ctx, &deepsquare.WatcherConfig{
			MetaschedulerAddress: common.HexToAddress(metaschedulerSmartContract),
			RPCEndpoint:          ethEndpointRPC,
			WSEndpoint:           ethEndpointWS,
			UserPrivateKey:       pk,
		})
		if err != nil {
			return err
		}
		defer func() {
			if err := watcher.Close(); err != nil {
				internallog.I.Error("failed to close watcher", zap.Error(err))
			}
		}()
		dat, err := os.ReadFile(jobPath)
		if err != nil {
			return err
		}
		var job sbatch.Job
		if err := yaml.Unmarshal(dat, &job); err != nil {
			return err
		}

		// The job name is passed as a fixed 32-byte array; copy truncates
		// longer names and leaves shorter ones zero-padded.
		var jobNameB [32]byte
		copy(jobNameB[:], jobName)

		// Entries without a "=" separator are skipped.
		usesLabels := make([]types.Label, 0, len(uses.Value()))
		for _, use := range uses.Value() {
			if key, value, ok := strings.Cut(use, "="); ok {
				usesLabels = append(usesLabels, types.Label{
					Key:   key,
					Value: value,
				})
			}
		}

		affinities := make([]types.Affinity, 0, len(affinitiesSlice.Value()))
		for _, affinity := range affinitiesSlice.Value() {
			k, v, op, err := parseKeyValueOperator(affinity)
			if err != nil {
				internallog.I.Error(
					"failed to parse",
					zap.String("affinity", affinity),
					zap.Error(err),
				)
				return err
			}

			// The operator is encoded on two bytes: ":in:" becomes "in",
			// "=" is normalized to "==", and any other operator is copied
			// as-is (a one-byte operator leaves the second byte zero).
			var opB [2]byte
			switch op {
			case ":in:":
				opB = [2]byte{'i', 'n'}
			case "=", "==":
				opB = [2]byte{'=', '='}
			default:
				copy(opB[:], op)
			}
			affinities = append(affinities, types.Affinity{
				Label: metaschedulerabi.Label{
					Key:   k,
					Value: v,
				},
				Op: opB,
			})
		}

		// Raise the current allowance by the credits allocated to this job.
		curr, err := client.GetAllowance(ctx)
		if err != nil {
			return err
		}
		if err = client.SetAllowance(ctx, curr.Add(curr, credits)); err != nil {
			return err
		}

		if !watch {
			jobID, err := client.SubmitJob(
				ctx,
				&job,
				credits,
				jobNameB,
				types.WithUse(usesLabels...),
				types.WithAffinity(affinities...),
			)
			if err != nil {
				return err
			}

			jobIDBig := new(big.Int).SetBytes(jobID[:])
			fmt.Printf("job %s submitted\n", jobIDBig.String())
			return nil
		}

		// Subscribe before submitting so that no transition of the new job
		// is emitted before the subscription exists.
		transitions := make(chan types.JobTransition, 1)
		sub, err := watcher.SubscribeEvents(ctx, types.FilterJobTransition(transitions))
		if err != nil {
			return err
		}
		defer sub.Unsubscribe()

		// Pass the same options as the non-watch path: affinities must be
		// honoured when --watch is set too.
		jobID, err := client.SubmitJob(
			ctx,
			&job,
			credits,
			jobNameB,
			types.WithUse(usesLabels...),
			types.WithAffinity(affinities...),
		)
		if err != nil {
			return err
		}

		jobIDBig := new(big.Int).SetBytes(jobID[:])

		fmt.Printf("---Waiting for job %s to be running...---\n", jobIDBig.String())
		_, err = waitUntilJobRunningOrFinished(sub, transitions, jobID)
		if err != nil {
			fmt.Printf("---Watching transitions has unexpectedly closed---\n%s\n", err)
			return err
		}

		if exitOnJobExit {
			// This goroutine always ends by calling os.Exit, so the deferred
			// cleanups of the enclosing function do not run on that path.
			go func() {
				status, err := waitUntilJobFinished(sub, transitions, jobID)
				if err != nil {
					fmt.Printf("---Watching transitions has unexpectedly closed---\n%s\n", err)
					os.Exit(1)
				}
				info, err := client.GetJob(ctx, jobID)
				if err != nil {
					// Without job info, derive the exit code from the final status.
					fmt.Printf("failed to fetch job info: %s\n", err)
					switch status {
					case metascheduler.JobStatusFinished:
						os.Exit(0)
					case metascheduler.JobStatusCancelled:
						os.Exit(130)
					case metascheduler.JobStatusFailed, metascheduler.JobStatusPanicked:
						os.Exit(1)
					case metascheduler.JobStatusOutOfCredits:
						os.Exit(143)
					default:
						// Never fall through to read the exit code from a
						// job that could not be fetched.
						os.Exit(1)
					}
				}
				// NOTE(review): the division by 256 suggests ExitCode holds a
				// wait-status style value with the code in the high byte — confirm.
				os.Exit(int(info.ExitCode / 256))
			}()
		}

		stream, err := client.WatchLogs(ctx, jobID)
		if err != nil {
			fmt.Printf("---Watching logs has unexpectedly failed---\n%s\n", err)
			return err
		}
		defer func() {
			// Best-effort close: the stream is being abandoned anyway.
			_ = stream.CloseSend()
		}()
		for {
			req, err := stream.Recv()
			if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
				fmt.Println("---Connection to logging server closed---")
				return nil
			}
			if err != nil {
				fmt.Printf("---Connection to logging server closed unexpectedly---\n%s\n", err)
				return err
			}
			clean := forbiddenReplacer.Replace(string(req.GetData()))
			if noTimestamp {
				fmt.Printf("%s\n", clean)
			} else {
				// The timestamp is interpreted as nanoseconds since the Unix epoch.
				fmt.Printf("%s:\t%s\n", time.Unix(0, req.GetTimestamp()), clean)
			}
		}
	},
}

Command is the submit subcommand used to submit jobs.

Functions

This section is empty.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL