Documentation ¶
Overview ¶
Package submit permits the submission of a job to the DeepSquare Grid.
USAGE:
deepsquaretui submit [command options] <job.yaml>
OPTIONS:
Submit Settings:
--affinities key<value [ --affinities key<value ]  Affinities flag. Used to filter the clusters. Format: `key<value`, `key<=value`, `key=value`, `key>=value`, `key>value`, `key!=value`
--credits value  Allocate a number of credits. Unit is 1e18. Is a float and is not precise. (default: 0)
--credits-wei value  Allocate a number of credits. Unit is wei. Is a big int.
--exit-on-job-exit, -e  Exit after the job has finished, with a non-zero exit code on error. (default: false)
--job-name value  The job name.
--no-timestamp, --no-ts  Hide timestamp. (default: false)
--uses key=value [ --uses key=value ]  Uses flag. Used to filter the clusters. Format: `key=value`
--watch, -w  Watch logs after submitting the job. (default: false)
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var Command = cli.Command{ Name: "submit", Usage: "Quickly submit a job.", ArgsUsage: "<job.yaml>", Flags: flags, Action: func(cCtx *cli.Context) error { if cCtx.NArg() < 1 { return errors.New("missing arguments") } if credits == nil { return errors.New("missing --credits or --credits-wei parameter") } jobPath := cCtx.Args().First() ctx := cCtx.Context pk, err := crypto.HexToECDSA(ethHexPK) if err != nil { return err } client, err := deepsquare.NewClient(ctx, &deepsquare.ClientConfig{ MetaschedulerAddress: common.HexToAddress(metaschedulerSmartContract), RPCEndpoint: ethEndpointRPC, SBatchEndpoint: sbatchEndpoint, LoggerEndpoint: loggerEndpoint, UserPrivateKey: pk, }) if err != nil { return err } defer func() { if err := client.Close(); err != nil { internallog.I.Error("failed to close client", zap.Error(err)) } }() watcher, err := deepsquare.NewWatcher(ctx, &deepsquare.WatcherConfig{ MetaschedulerAddress: common.HexToAddress(metaschedulerSmartContract), RPCEndpoint: ethEndpointRPC, WSEndpoint: ethEndpointWS, UserPrivateKey: pk, }) if err != nil { return err } defer func() { if err := watcher.Close(); err != nil { internallog.I.Error("failed to close watcher", zap.Error(err)) } }() dat, err := os.ReadFile(jobPath) if err != nil { return err } var job sbatch.Job if err := yaml.Unmarshal(dat, &job); err != nil { return err } var jobNameB [32]byte copy(jobNameB[:], jobName) usesLabels := make([]types.Label, 0, len(uses.Value())) for _, use := range uses.Value() { if key, value, ok := strings.Cut(use, "="); ok { usesLabels = append(usesLabels, types.Label{ Key: key, Value: value, }) } } affinities := make([]types.Affinity, 0, len(affinitiesSlice.Value())) for _, affinity := range affinitiesSlice.Value() { k, v, op, err := parseKeyValueOperator(affinity) if err != nil { internallog.I.Error( "failed to parse", zap.String("affinity", affinity), zap.Error(err), ) return err } var opB [2]byte copy(opB[:], op) affinities = append(affinities, types.Affinity{ Label: 
metaschedulerabi.Label{ Key: k, Value: v, }, Op: opB, }) } curr, err := client.GetAllowance(ctx) if err != nil { return err } if err = client.SetAllowance(ctx, curr.Add(curr, credits)); err != nil { return err } if !watch { jobID, err := client.SubmitJob( ctx, &job, credits, jobNameB, types.WithUse(usesLabels...), types.WithAffinity(affinities...), ) if err != nil { return err } fmt.Printf("job %s submitted\n", hexutil.Encode(jobID[:])) return nil } transitions := make(chan types.JobTransition, 1) sub, err := watcher.SubscribeEvents(ctx, types.FilterJobTransition(transitions)) if err != nil { return err } defer sub.Unsubscribe() jobID, err := client.SubmitJob(ctx, &job, credits, jobNameB, types.WithUse(usesLabels...)) if err != nil { return err } fmt.Println("---Waiting for job to be running...---") _, err = waitUntilJobRunningOrFinished(sub, transitions, jobID) if err != nil { fmt.Printf("---Watching transitions has unexpectedly closed---\n%s\n", err) return err } if exitOnJobExit { go func() { status, err := waitUntilJobFinished(sub, transitions, jobID) if err != nil { fmt.Printf("---Watching transitions has unexpectedly closed---\n%s\n", err) os.Exit(1) } switch status { case metascheduler.JobStatusFinished: os.Exit(0) case metascheduler.JobStatusCancelled: os.Exit(130) case metascheduler.JobStatusFailed, metascheduler.JobStatusPanicked: os.Exit(1) case metascheduler.JobStatusOutOfCredits: os.Exit(143) } }() } stream, err := client.WatchLogs(ctx, jobID) if err != nil { fmt.Printf("---Watching logs has unexpectedly failed---\n%s\n", err) return err } defer func() { _ = stream.CloseSend() }() for { req, err := stream.Recv() if err == io.EOF || errors.Is(err, context.Canceled) { fmt.Println("---Connection to logging server closed---") return nil } if err != nil { fmt.Printf("---Connection to logging server closed unexpectedly---\n%s\n", err) return err } clean := forbiddenReplacer.Replace(string(req.GetData())) if noTimestamp { fmt.Printf("%s\n", clean) } else { 
fmt.Printf("%s:\t%s\n", time.Unix(0, req.GetTimestamp()), clean) } } }, }
Command is the submit subcommand used to submit jobs.
Functions ¶
This section is empty.
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.