submit

package
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 13, 2023 License: LGPL-3.0 Imports: 24 Imported by: 0

Documentation

Overview

Package submit permits the submission of a job to the DeepSquare Grid.

USAGE:

dps submit [command options] <job.yaml>

OPTIONS:

Submit Settings:

--affinities key<value [ --affinities key<value ]  Affinities flag. Used to filter the clusters. Format: key<value, `key<=value`, `key=value`, `key>=value`, `key>value`, `key!=value` or `key:in:value`.

--credits value                                    Allocate a number of credits. Unit is 1e18. The value is a float and is not precise. (default: 0)

--credits-wei value                                Allocate a number of credits. Unit is wei. The value is a big int.

--exit-on-job-exit, -e                             Exit the job after the job has finished and throw on error. (default: false)
--job-name value                                   The job name.
--no-timestamp, --no-ts                            Hide timestamp. (default: false)

--uses key=value [ --uses key=value ]              Uses flag. Used to filter the clusters. Format: key=value

--watch, -w                                        Watch logs after submitting the job (default: false)

Index

Constants

This section is empty.

Variables

View Source
// Command is the submit subcommand used to submit jobs.
//
// The action reads the job definition from the YAML file given as the first
// positional argument, raises the allowance by the requested credits and
// submits the job. Without --watch it prints the job ID and returns. With
// --watch it waits for the job to reach a running or terminal state, printing
// each transition, and then streams the job logs until the logging stream
// closes.
var Command = cli.Command{
	Name:      "submit",
	Usage:     "Quickly submit a job.",
	ArgsUsage: "<job.yaml>",
	Flags:     flags,
	Action: func(cCtx *cli.Context) error {
		if cCtx.NArg() < 1 {
			return errors.New("missing arguments")
		}
		// credits is expected to have been filled by either --credits or
		// --credits-wei (presumably during flag parsing; defined elsewhere).
		if credits == nil {
			return errors.New("missing --credits or --credits-wei parameter")
		}
		jobPath := cCtx.Args().First()
		ctx := cCtx.Context
		pk, err := crypto.HexToECDSA(ethHexPK)
		if err != nil {
			return err
		}
		client, err := deepsquare.NewClient(ctx, &deepsquare.ClientConfig{
			MetaschedulerAddress:        common.HexToAddress(metaschedulerSmartContract),
			RPCEndpoint:                 ethEndpointRPC,
			SBatchEndpoint:              sbatchEndpoint,
			LoggerEndpoint:              loggerEndpoint,
			MetaschedulerOracleEndpoint: metaschedulerOracleEndpoint,
			UserPrivateKey:              pk,
		})
		if err != nil {
			return err
		}
		defer func() {
			if err := client.Close(); err != nil {
				internallog.I.Error("failed to close client", zap.Error(err))
			}
		}()
		watcher, err := deepsquare.NewWatcher(ctx, &deepsquare.WatcherConfig{
			MetaschedulerAddress: common.HexToAddress(metaschedulerSmartContract),
			RPCEndpoint:          ethEndpointRPC,
			WSEndpoint:           ethEndpointWS,
			UserPrivateKey:       pk,
		})
		if err != nil {
			return err
		}
		defer func() {
			if err := watcher.Close(); err != nil {
				internallog.I.Error("failed to close watcher", zap.Error(err))
			}
		}()
		dat, err := os.ReadFile(jobPath)
		if err != nil {
			return err
		}
		var job sbatch.Job
		if err := yaml.Unmarshal(dat, &job); err != nil {
			return err
		}

		// The job name is passed as a fixed 32-byte array; copy truncates
		// longer names and leaves shorter ones zero-padded.
		var jobNameB [32]byte
		copy(jobNameB[:], jobName)

		// Entries of --uses that do not contain "=" are skipped.
		usesLabels := make([]types.Label, 0, len(uses.Value()))
		for _, use := range uses.Value() {
			if key, value, ok := strings.Cut(use, "="); ok {
				usesLabels = append(usesLabels, types.Label{
					Key:   key,
					Value: value,
				})
			}
		}

		affinities := make([]types.Affinity, 0, len(affinitiesSlice.Value()))
		for _, affinity := range affinitiesSlice.Value() {
			k, v, op, err := parseKeyValueOperator(affinity)
			if err != nil {
				internallog.I.Error(
					"failed to parse",
					zap.String("affinity", affinity),
					zap.Error(err),
				)
				return err
			}

			// The operator is encoded on two bytes: ":in:" becomes "in",
			// "=" and "==" both become "==", any other operator is copied
			// as-is (at most its first two bytes).
			var opB [2]byte
			switch op {
			case ":in:":
				opB = [2]byte{'i', 'n'}
			case "=", "==":
				opB = [2]byte{'=', '='}
			default:
				copy(opB[:], op)
			}
			affinities = append(affinities, types.Affinity{
				Label: metaschedulerabi.Label{
					Key:   k,
					Value: v,
				},
				Op: opB,
			})
		}

		// Raise the current allowance by the credits allocated to this job.
		curr, err := client.GetAllowance(ctx)
		if err != nil {
			return err
		}
		if err = client.SetAllowance(ctx, curr.Add(curr, credits)); err != nil {
			return err
		}

		if !watch {
			jobID, err := client.SubmitJob(
				ctx,
				&job,
				credits,
				jobNameB,
				types.WithUse(usesLabels...),
				types.WithAffinity(affinities...),
			)
			if err != nil {
				return err
			}

			jobIDBig := new(big.Int).SetBytes(jobID[:])
			fmt.Printf("job %s submitted\n", jobIDBig.String())
			return nil
		}

		// Subscribe before submitting so that transitions emitted right after
		// the submission are delivered on the channel.
		transitions := make(chan types.JobTransition, 1)
		sub, err := watcher.SubscribeEvents(ctx, types.FilterJobTransition(transitions))
		if err != nil {
			return err
		}
		defer sub.Unsubscribe()

		// Submit with the same options as the non-watch path. The affinities
		// were previously omitted here, so --affinities was silently ignored
		// whenever --watch was set.
		jobID, err := client.SubmitJob(
			ctx,
			&job,
			credits,
			jobNameB,
			types.WithUse(usesLabels...),
			types.WithAffinity(affinities...),
		)
		if err != nil {
			return err
		}

		jobIDBig := new(big.Int).SetBytes(jobID[:])

		fmt.Printf("---Waiting for job %s to be running...---\n", jobIDBig.String())
		// finished records whether the wait loop ended on a terminal status
		// (true) or on the running status (false).
		var finished = false
		var allocatedProviderAddress common.Address
		var provider types.ProviderDetail
		// Last observed queue counts, used to print the queue line only when
		// the counts change.
		msOrSchedLen, runningLen := int64(0), int64(0)

	loop:
		for {
			select {
			case tr := <-transitions:
				// Once a provider has been allocated, any transition (not only
				// ours) triggers a refresh of that provider's queue state.
				if (allocatedProviderAddress != common.Address{}) {
					jobs, err := client.GetJobsByProvider(ctx, allocatedProviderAddress)
					if err != nil {
						internallog.I.Warn("failed to fetch running jobs info", zap.Error(err))
					}
					msLen, rLen := reduceJobsIntoRunningOrScheduledLens(jobs)
					if len(jobs) > 1 && msOrSchedLen > 0 && (msOrSchedLen != msLen || runningLen != rLen) {
						waitingTime, err := computeWaitingTime(jobID, provider, jobs)
						if err != nil {
							internallog.I.Fatal("failed to compute waiting time", zap.Error(err))
						}
						fmt.Printf("(%d jobs in provider queue: %d waiting, %d running, wait ~%s)\n", len(jobs), msLen, rLen, waitingTime)
					}
					msOrSchedLen, runningLen = msLen, rLen
				}

				if bytes.Equal(jobID[:], tr.JobId[:]) {
					fmt.Printf("(Job is %s)\n", metascheduler.JobStatus(tr.To))
					switch metascheduler.JobStatus(tr.To) {
					case metascheduler.JobStatusMetaScheduled,
						metascheduler.JobStatusScheduled:

						// The job now has a provider: remember it and its
						// details for the queue reports above.
						job, err := client.GetJob(ctx, jobID)
						if err != nil {
							internallog.I.Fatal("failed to fetch job info", zap.Error(err))
						}
						allocatedProviderAddress = job.ProviderAddr
						jobs, err := client.GetJobsByProvider(ctx, allocatedProviderAddress)
						if err != nil {
							internallog.I.Warn("failed to fetch running jobs info", zap.Error(err))
						}
						p, err := client.GetProvider(ctx, allocatedProviderAddress)
						if err != nil {
							internallog.I.Fatal("failed to get provider info", zap.Error(err))
						}
						provider = p
						msLen, rLen := reduceJobsIntoRunningOrScheduledLens(jobs)
						if len(jobs) > 1 && msOrSchedLen > 0 && (msOrSchedLen != msLen || runningLen != rLen) {
							waitingTime, err := computeWaitingTime(jobID, provider, jobs)
							if err != nil {
								internallog.I.Fatal("failed to compute waiting time", zap.Error(err))
							}
							fmt.Printf("(%d jobs in provider queue: %d waiting, %d running, wait ~%s)\n", len(jobs), msLen, rLen, waitingTime)
						}
						msOrSchedLen, runningLen = msLen, rLen

					case metascheduler.JobStatusCancelled,
						metascheduler.JobStatusFailed,
						metascheduler.JobStatusFinished,
						metascheduler.JobStatusPanicked,
						metascheduler.JobStatusOutOfCredits:
						finished = true
						break loop
					case metascheduler.JobStatusRunning:
						finished = false
						break loop
					}
				}
			case err := <-sub.Err():
				fmt.Printf("---Watching transitions has unexpectedly closed---\n%s\n", err)
				return err
			}
		}

		if exitOnJobExit {
			// Runs concurrently with the log streaming below and terminates
			// the process with os.Exit once the job has finished.
			go func() {
				if !finished {
					// On interrupt/termination, warn that the job keeps
					// running before exiting.
					cleanChan := make(chan os.Signal, 1)
					signal.Notify(cleanChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
					go func() {
						<-cleanChan
						fmt.Printf("\nWARNING: Your job %s is still running.\n", jobIDBig.String())
						os.Exit(1)
					}()

					_, err := waitUntilJobFinished(sub, transitions, jobID)
					if err != nil {
						fmt.Printf("---Watching transitions has unexpectedly closed---\n%s\n", err)
						os.Exit(1)
					}
				}

				job, err := client.GetJob(ctx, jobID)
				if err != nil {
					internallog.I.Fatal("failed to fetch job info", zap.Error(err))
				}
				// NOTE(review): the division by 256 presumably converts a
				// wait-status style value into an exit code — confirm.
				os.Exit(int(job.ExitCode / 256))
			}()
		}

		stream, err := client.WatchLogs(ctx, jobID)
		if err != nil {
			fmt.Printf("---Watching logs has unexpectedly failed---\n%s\n", err)
			return err
		}
		defer func() {
			_ = stream.CloseSend()
		}()
		for {
			req, err := stream.Recv()
			// End of stream and context cancellation are normal terminations.
			if err == io.EOF || errors.Is(err, context.Canceled) {
				fmt.Println("---Connection to logging server closed---")
				return nil
			}
			if err != nil {
				fmt.Printf("---Connection to logging server closed unexpectedly---\n%s\n", err)
				return err
			}
			clean := forbiddenReplacer.Replace(string(req.GetData()))
			if noTimestamp {
				fmt.Printf("%s\n", clean)
			} else {
				// The timestamp is interpreted as nanoseconds since the Unix epoch.
				fmt.Printf("%s:\t%s\n", time.Unix(0, req.GetTimestamp()), clean)
			}
		}
	},
}

Command is the submit subcommand used to submit jobs.

Functions

This section is empty.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL