Documentation ¶
Overview ¶
Package submit permits the submission of a job to the DeepSquare Grid.
USAGE:
dps submit [command options] <job.yaml>
OPTIONS:
--metascheduler.oracle value Metascheduler Oracle endpoint. (default: "https://meta-scheduler.deepsquare.run") [$METASCHEDULER_ORACLE] DeepSquare Settings: --logger.endpoint value Grid Logger endpoint. (default: "https://grid-logger.deepsquare.run") [$LOGGER_ENDPOINT] --metascheduler.rpc value Metascheduler Avalanche C-Chain JSON-RPC endpoint. (default: "https://testnet.deepsquare.run/rpc") [$METASCHEDULER_RPC] --metascheduler.smart-contract value Metascheduler smart-contract address. [$METASCHEDULER_SMART_CONTRACT] --metascheduler.ws value Metascheduler Avalanche C-Chain WS endpoint. (default: "wss://testnet.deepsquare.run/ws") [$METASCHEDULER_WS] --private-key value An hexadecimal private key for ethereum transactions. [$ETH_PRIVATE_KEY] --sbatch.endpoint value SBatch Service GraphQL endpoint. (default: "https://sbatch.deepsquare.run/graphql") [$SBATCH_ENDPOINT] Submit Settings: --affinities key<value [ --affinities key<value ] Affinities flag. Used to filter the clusters. Format: key<value, `key<=value`, `key=value`, `key>=value`, `key>value`, `key!=value` or `key:in:value` --credits value Allocated a number of credits. Unit is 1e18. Is a float and is not precise. (default: 0) --credits-wei value Allocated a number of credits. Unit is wei. Is a big int. --exit-on-job-exit, -e Exit the job after the job has finished and throw on error. (default: false) --job-name value The job name. --no-timestamp, --no-ts Hide timestamp. (default: false) --uses key=value [ --uses key=value ] Uses flag. Used to filter the clusters. Format: key=value --watch, -w Watch logs after submitting the job (default: false)
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var Command = cli.Command{ Name: "submit", Usage: "Quickly submit a job.", ArgsUsage: "<job.yaml>", Flags: flags, Action: func(cCtx *cli.Context) error { if cCtx.NArg() < 1 { return errors.New("missing arguments") } if credits == nil { return errors.New("missing --credits or --credits-wei parameter") } jobPath := cCtx.Args().First() if jobName == "" { jobName = filepath.Base(strings.TrimSuffix(jobPath, filepath.Ext(jobPath))) } ctx := cCtx.Context pk, err := utils.GetPrivateKey(ethHexPK, ethHexPKPath) if err != nil { return err } client, err := deepsquare.NewClient(ctx, &deepsquare.ClientConfig{ MetaschedulerAddress: common.HexToAddress(metaschedulerSmartContract), RPCEndpoint: ethEndpointRPC, SBatchEndpoint: sbatchEndpoint, LoggerEndpoint: loggerEndpoint, MetaschedulerOracleEndpoint: metaschedulerOracleEndpoint, UserPrivateKey: pk, }) if err != nil { return err } defer func() { if err := client.Close(); err != nil { internallog.I.Error("failed to close client", zap.Error(err)) } }() watcher, err := deepsquare.NewWatcher(ctx, &deepsquare.WatcherConfig{ MetaschedulerAddress: common.HexToAddress(metaschedulerSmartContract), RPCEndpoint: ethEndpointRPC, WSEndpoint: ethEndpointWS, UserPrivateKey: pk, }) if err != nil { return err } defer func() { if err := watcher.Close(); err != nil { internallog.I.Error("failed to close watcher", zap.Error(err)) } }() dat, err := os.ReadFile(jobPath) if err != nil { return err } var j sbatch.Job if err := yaml.Unmarshal(dat, &j); err != nil { return err } var jobNameB [32]byte copy(jobNameB[:], jobName) usesLabels := make([]types.Label, 0, len(uses.Value())) for _, use := range uses.Value() { if key, value, ok := strings.Cut(use, "="); ok { usesLabels = append(usesLabels, types.Label{ Key: key, Value: value, }) } } affinities := make([]types.Affinity, 0, len(affinitiesSlice.Value())) for _, affinity := range affinitiesSlice.Value() { k, v, op, err := parseKeyValueOperator(affinity) if err != nil { internallog.I.Error( "failed to parse", zap.String("affinity", affinity), zap.Error(err), ) return err } var opB [2]byte switch op { case ":in:": opB = [2]byte{'i', 'n'} case "=", "==": opB = [2]byte{'=', '='} default: copy(opB[:], op) } affinities = append(affinities, types.Affinity{ Label: metaschedulerabi.Label{ Key: k, Value: v, }, Op: opB, }) } curr, err := client.AllowanceManager.GetAllowance(ctx) if err != nil { return err } if err = client.AllowanceManager.SetAllowance(ctx, curr.Add(curr, credits)); err != nil { return err } if !watch { jobID, err := client.SubmitJob( ctx, &j, credits, jobNameB, job.WithUse(usesLabels...), job.WithAffinity(affinities...), ) if err != nil { return err } jobIDBig := new(big.Int).SetBytes(jobID[:]) fmt.Printf("job %s submitted\n", jobIDBig.String()) return nil } transitions := make(chan types.JobTransition, 1) sub, err := watcher.SubscribeEvents(ctx, event.FilterJobTransition(transitions)) if err != nil { return err } defer sub.Unsubscribe() jobID, err := client.SubmitJob(ctx, &j, credits, jobNameB, job.WithUse(usesLabels...)) if err != nil { return err } jobIDBig := new(big.Int).SetBytes(jobID[:]) fmt.Printf("---Waiting for job %s to be running...---\n", jobIDBig.String()) var finished = false var allocatedProviderAddress common.Address var provider provider.Detail msOrSchedLen, runningLen := int64(0), int64(0) loop: for { select { case tr := <-transitions: if (allocatedProviderAddress != common.Address{}) { jobs, err := client.GetJobsByProvider(ctx, allocatedProviderAddress) if err != nil { internallog.I.Warn("failed to fetch running jobs info", zap.Error(err)) } msLen, rLen := reduceJobsIntoRunningOrScheduledLens(jobs) if len(jobs) > 1 && msOrSchedLen > 1 && (msOrSchedLen != msLen || runningLen != rLen) { waitingTime, err := computeWaitingTime(jobID, provider, jobs) if err != nil { internallog.I.Fatal("failed to compute waiting time", zap.Error(err)) } fmt.Printf("(%d jobs in provider queue: %d waiting, %d running, wait ~%s)\n", len(jobs), msLen, rLen, waitingTime) } msOrSchedLen, runningLen = msLen, rLen } if bytes.Equal(jobID[:], tr.JobId[:]) { fmt.Printf("(Job is %s)\n", metascheduler.JobStatus(tr.To)) switch metascheduler.JobStatus(tr.To) { case metascheduler.JobStatusMetaScheduled, metascheduler.JobStatusScheduled: job, err := client.GetJob(ctx, jobID) if err != nil { internallog.I.Fatal("failed to fetch job info", zap.Error(err)) } allocatedProviderAddress = job.ProviderAddr jobs, err := client.GetJobsByProvider(ctx, allocatedProviderAddress) if err != nil { internallog.I.Warn("failed to fetch running jobs info", zap.Error(err)) } p, err := client.ProviderManager.GetProvider(ctx, allocatedProviderAddress) if err != nil { internallog.I.Fatal("failed to get provider info", zap.Error(err)) } provider = p msLen, rLen := reduceJobsIntoRunningOrScheduledLens(jobs) if len(jobs) > 1 && msOrSchedLen > 1 && (msOrSchedLen != msLen || runningLen != rLen) { waitingTime, err := computeWaitingTime(jobID, provider, jobs) if err != nil { internallog.I.Fatal("failed to compute waiting time", zap.Error(err)) } fmt.Printf("(%d jobs in provider queue: %d waiting, %d running, wait ~%s)\n", len(jobs), msLen, rLen, waitingTime) } msOrSchedLen, runningLen = msLen, rLen case metascheduler.JobStatusCancelled, metascheduler.JobStatusFailed, metascheduler.JobStatusFinished, metascheduler.JobStatusPanicked, metascheduler.JobStatusOutOfCredits: finished = true break loop case metascheduler.JobStatusRunning: finished = false break loop } } case err := <-sub.Err(): fmt.Printf("---Watching transitions has unexpectedly closed---\n%s\n", err) return err } } if exitOnJobExit { go func() { if !finished { cleanChan := make(chan os.Signal, 1) signal.Notify(cleanChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) go func() { <-cleanChan fmt.Printf("\nWARNING: Your job %s is still running.\n", jobIDBig.String()) os.Exit(1) }() _, err := waitUntilJobFinished(sub, transitions, jobID) if err != nil { fmt.Printf("---Watching transitions has unexpectedly closed---\n%s\n", err) os.Exit(1) } } job, err := client.GetJob(ctx, jobID) if err != nil { internallog.I.Fatal("failed to fetch job info", zap.Error(err)) } os.Exit(int(job.ExitCode / 256)) }() } stream, err := client.WatchLogs(ctx, jobID) if err != nil { fmt.Printf("---Watching logs has unexpectedly failed---\n%s\n", err) return err } defer func() { _ = stream.CloseSend() }() for { req, err := stream.Recv() if err == io.EOF || errors.Is(err, context.Canceled) { fmt.Println("---Connection to logging server closed---") return nil } if err != nil { fmt.Printf("---Connection to logging server closed unexpectedly---\n%s\n", err) return err } if noTimestamp { fmt.Printf("%s\n", string(req.GetData())) } else { fmt.Printf("%s:\t%s\n", time.Unix(0, req.GetTimestamp()), forbiddenReplacer.Replace(string(req.GetData()))) } } }, }
Command is the submit subcommand used to submit jobs.
Functions ¶
This section is empty.
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.