job

package
v0.2.9-rc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2024 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var Module = module.Descriptor{
	Kind: "job",
	Dependencies: map[string]string{
		driver.KeyKubeDependency: kubernetes.Module.Kind,
	},
	Actions: []module.ActionDesc{
		{
			Name:        module.CreateAction,
			Description: "Creates a new Kube job.",
		},
		{
			Name:        driver.SuspendAction,
			Description: "Suspend the kube Job.",
		},
		{
			Name:        driver.StartAction,
			Description: "Start the kube Job.",
		},
		{
			Name:        module.DeleteAction,
			Description: "Delete the kube Job.",
		},
	},
	DriverFactory: func(confJSON json.RawMessage) (module.Driver, error) {
		conf := defaultDriverConf
		if err := json.Unmarshal(confJSON, &conf); err != nil {
			return nil, err
		} else if err := validator.TaggedStruct(conf); err != nil {
			return nil, err
		}
		return &driver.Driver{
			Conf: conf,
			CreateJob: func(ctx context.Context, conf kube.Config, j *job.Job) error {
				kubeCl, err := kube.NewClient(ctx, conf)
				if err != nil {
					return errors.ErrInternal.WithMsgf("failed to create new kube client on job driver").WithCausef(err.Error())
				}
				processor, err := kubeCl.GetJobProcessor(j)
				if err != nil {
					return err
				}
				return processor.SubmitJob()
			},
			SuspendJob: func(ctx context.Context, conf kube.Config, j *job.Job) error {
				kubeCl, err := kube.NewClient(ctx, conf)
				if err != nil {
					return errors.ErrInternal.WithMsgf("failed to suspend the job").WithCausef(err.Error())
				}
				processor, err := kubeCl.GetJobProcessor(j)
				if err != nil {
					return err
				}
				return processor.UpdateJob(true)
			},
			DeleteJob: func(ctx context.Context, conf kube.Config, j *job.Job) error {
				kubeCl, err := kube.NewClient(ctx, conf)
				if err != nil {
					return errors.ErrInternal.WithMsgf("failed to delete the job").WithCausef(err.Error())
				}
				processor, err := kubeCl.GetJobProcessor(j)
				if err != nil {
					return err
				}
				return processor.DeleteJob()
			},
			StartJob: func(ctx context.Context, conf kube.Config, j *job.Job) error {
				kubeCl, err := kube.NewClient(ctx, conf)
				if err != nil {
					return errors.ErrInternal.WithMsgf("failed to start the job").WithCausef(err.Error())
				}
				processor, err := kubeCl.GetJobProcessor(j)
				if err != nil {
					return err
				}
				return processor.UpdateJob(false)
			},
			GetJobPods: func(ctx context.Context, kubeConf kube.Config, j *job.Job, labels map[string]string) ([]kube.Pod, error) {
				kubeCl, err := kube.NewClient(ctx, kubeConf)
				if err != nil {
					return nil, errors.ErrInternal.WithMsgf("failed to create new kube client on driver").WithCausef(err.Error())
				}
				return kubeCl.GetPodDetails(ctx, j.Namespace, labels, func(pod v1.Pod) bool {

					return true
				})
			},
			StreamLogs: func(ctx context.Context, kubeConf kube.Config, j *job.Job, filter map[string]string) (<-chan module.LogChunk, error) {
				kubeCl, err := kube.NewClient(ctx, kubeConf)
				if err != nil {
					return nil, errors.ErrInternal.WithMsgf("failed to create new kube client on firehose driver Log").WithCausef(err.Error())
				}

				logs, err := kubeCl.StreamLogs(ctx, j.Namespace, filter)
				if err != nil {
					return nil, err
				}

				mappedLogs := make(chan module.LogChunk)
				go func() {
					defer close(mappedLogs)
					for {
						select {
						case log, ok := <-logs:
							if !ok {
								return
							}
							mappedLogs <- module.LogChunk{Data: log.Data, Labels: log.Labels}
						case <-ctx.Done():
							return
						}
					}
				}()
				return mappedLogs, err
			},
		}, nil
	},
}

Functions

This section is empty.

Types

This section is empty.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL