worker

package
v0.0.0-...-5db00e0
Warning

This package is not in the latest version of its module.

Published: Nov 16, 2017 License: MIT Imports: 20 Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DefaultWorker

type DefaultWorker struct {
	Conf       config.Worker
	Mapper     *FileMapper
	Store      storage.Storage
	TaskReader TaskReader
	Event      *events.TaskWriter
}

DefaultWorker is the default task worker, which follows a basic, sequential process of task initialization, execution, finalization, and logging.

func (*DefaultWorker) Close

func (r *DefaultWorker) Close() error

Close cleans up worker resources, e.g. closing the event writers.

func (*DefaultWorker) Run

func (r *DefaultWorker) Run(pctx context.Context)

Run runs the Worker.

TODO: document the behavior of a slow consumer of task log updates.

type DockerCommand

type DockerCommand struct {
	Image           string
	Command         []string
	Volumes         []Volume
	Workdir         string
	ContainerName   string
	RemoveContainer bool
	Env             map[string]string
	Stdin           io.Reader
	Stdout          io.Writer
	Stderr          io.Writer
	Event           *events.ExecutorWriter
}

DockerCommand is responsible for configuring and running a docker container.

func (DockerCommand) Run

func (dcmd DockerCommand) Run() error

Run runs the Docker command and blocks until done.
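As a rough illustration of how the fields above could translate into a container invocation, the sketch below builds a `docker run` argument list from a simplified, hypothetical copy of the struct. The flags (`-i`, `--rm`, `-e`, `--name`, `-w`, `-v`) are standard Docker CLI flags, but whether DockerCommand.Run uses exactly these flags, or this order, is an assumption. Stdin, Stdout, Stderr, and Event are left out.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// volume and dockerCommand are hypothetical, simplified mirrors of the
// package's Volume and DockerCommand types, declared locally for this sketch.
type volume struct {
	HostPath      string
	ContainerPath string
	Readonly      bool
}

type dockerCommand struct {
	Image           string
	Command         []string
	Volumes         []volume
	Workdir         string
	ContainerName   string
	RemoveContainer bool
	Env             map[string]string
}

// args builds the argument list for "docker run". The flag selection and
// ordering are assumptions, not the package's actual behavior.
func (d dockerCommand) args() []string {
	args := []string{"run", "-i"}
	if d.RemoveContainer {
		args = append(args, "--rm")
	}
	// Sort the environment keys so the output is deterministic.
	keys := make([]string, 0, len(d.Env))
	for k := range d.Env {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		args = append(args, "-e", k+"="+d.Env[k])
	}
	if d.ContainerName != "" {
		args = append(args, "--name", d.ContainerName)
	}
	if d.Workdir != "" {
		args = append(args, "-w", d.Workdir)
	}
	for _, v := range d.Volumes {
		mode := "rw"
		if v.Readonly {
			mode = "ro"
		}
		args = append(args, "-v", v.HostPath+":"+v.ContainerPath+":"+mode)
	}
	args = append(args, d.Image)
	return append(args, d.Command...)
}

// String renders the full command line for display.
func (d dockerCommand) String() string {
	return "docker " + strings.Join(d.args(), " ")
}

func main() {
	d := dockerCommand{
		Image:           "alpine",
		Command:         []string{"echo", "hi"},
		Volumes:         []volume{{HostPath: "/tmp/mapped_files/data", ContainerPath: "/data", Readonly: true}},
		Workdir:         "/data",
		ContainerName:   "task-1",
		RemoveContainer: true,
		Env:             map[string]string{"A": "1"},
	}
	fmt.Println(d)
}
```

The argument list could then be handed to `exec.Command("docker", args...)` from `os/exec`, whose `Run` method also blocks until the process exits.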

func (DockerCommand) Stop

func (dcmd DockerCommand) Stop() error

Stop stops the container.

type FileMapper

type FileMapper struct {
	Volumes []Volume
	Inputs  []*tes.Input
	Outputs []*tes.Output
	// contains filtered or unexported fields
}

FileMapper is responsible for mapping paths into a working directory on the worker's host file system.

Every task needs its own directory to work in. When a file is downloaded for a task, it needs to be stored in the task's working directory. The same applies to task outputs, uploads, stdin/stdout/stderr, etc. FileMapper helps the worker engine manage all these paths.

func NewFileMapper

func NewFileMapper(dir string) *FileMapper

NewFileMapper returns a new FileMapper, which maps files into the given base directory.

func (*FileMapper) AddInput

func (mapper *FileMapper) AddInput(input *tes.Input) error

AddInput adds an input to the mapped files for the given tes.Input. A copy of the tes.Input will be added to mapper.Inputs, with the "Path" field updated to the mapped host path.

If the path can't be mapped, an error is returned.

func (*FileMapper) AddOutput

func (mapper *FileMapper) AddOutput(output *tes.Output) error

AddOutput adds an output to the mapped files for the given tes.Output. A copy of the tes.Output will be added to mapper.Outputs, with the "Path" field updated to the mapped host path.

If the path can't be mapped, an error is returned.

func (*FileMapper) AddTmpVolume

func (mapper *FileMapper) AddTmpVolume(mountPoint string) error

AddTmpVolume creates a directory on the host based on the declared path in the container and adds it to mapper.Volumes.

If the path can't be mapped, an error is returned.

func (*FileMapper) AddVolume

func (mapper *FileMapper) AddVolume(hostPath string, mountPoint string, readonly bool) error

AddVolume adds a mapped volume to the mapper. A corresponding Volume record is added to mapper.Volumes.

If the volume paths are invalid or can't be mapped, an error is returned.

func (*FileMapper) ContainerPath

func (mapper *FileMapper) ContainerPath(src string) string

ContainerPath returns an unmapped path.

The mapper's base dir is stripped from the path. For example, if the mapper is configured with a base dir of "/tmp/mapped_files", then mapper.ContainerPath("/tmp/mapped_files/home/ubuntu/myfile") will return "/home/ubuntu/myfile".
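The stripping behavior described above can be sketched with the standard library alone. The function `containerPath` below is a hypothetical stand-in, not the package's implementation. In particular, returning paths outside the base dir unchanged is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// containerPath is a hypothetical stand-in for FileMapper.ContainerPath.
// It strips base from src only when src lies strictly inside base.
// Anything else is returned unchanged (an assumption for this sketch).
func containerPath(base, src string) string {
	base = strings.TrimSuffix(base, "/")
	rest := strings.TrimPrefix(src, base)
	// Require a separator right after the prefix, so that a sibling such as
	// "/tmp/mapped_files2" is not mistaken for a child of "/tmp/mapped_files".
	if rest != src && strings.HasPrefix(rest, "/") {
		return rest
	}
	return src
}

func main() {
	fmt.Println(containerPath("/tmp/mapped_files", "/tmp/mapped_files/home/ubuntu/myfile"))
	fmt.Println(containerPath("/tmp/mapped_files", "/tmp/mapped_files2/other"))
}
```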

func (*FileMapper) CreateHostFile

func (mapper *FileMapper) CreateHostFile(src string) (*os.File, error)

CreateHostFile creates a file on the host file system at a mapped path. "src" is an unmapped path. This function will handle mapping the path.

This function calls os.Create.

If the path can't be mapped or the file can't be created, an error is returned.
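A minimal sketch of the map-then-create flow, using hypothetical helper names. Creating missing parent directories with os.MkdirAll is an assumption about what a caller would need, not documented behavior, and the subpath check is omitted for brevity.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// createHostFile is a hypothetical helper, not the package's implementation.
// It maps src under base, creates missing parent directories (an assumption),
// and then calls os.Create on the mapped path.
func createHostFile(base, src string) (*os.File, error) {
	mapped := filepath.Join(base, src)
	if err := os.MkdirAll(filepath.Dir(mapped), 0o755); err != nil {
		return nil, err
	}
	return os.Create(mapped)
}

// createInScratchDir creates a mapped file inside a temporary base dir and
// returns the file's path relative to that base dir.
func createInScratchDir() (string, error) {
	base, err := os.MkdirTemp("", "mapped_files")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(base)

	f, err := createHostFile(base, "/home/ubuntu/stdout")
	if err != nil {
		return "", err
	}
	defer f.Close()

	if _, err := f.WriteString("hello\n"); err != nil {
		return "", err
	}
	return filepath.Rel(base, f.Name())
}

func main() {
	rel, err := createInScratchDir()
	fmt.Println(rel, err)
}
```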

func (*FileMapper) HostPath

func (mapper *FileMapper) HostPath(src string) (string, error)

HostPath returns a mapped path.

The path is concatenated to the mapper's base dir. For example, if the mapper is configured with a base dir of "/tmp/mapped_files", then mapper.HostPath("/home/ubuntu/myfile") will return "/tmp/mapped_files/home/ubuntu/myfile".

The mapped path is required to be a subpath of the mapper's base directory. For example, mapper.HostPath("../../foo") should fail with an error.
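The mapping and the subpath requirement can be illustrated with path/filepath. The function `hostPath` below is a hypothetical stand-in written for this page. The package's own implementation may check subpaths differently (for example via IsSubpath).

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// hostPath is a hypothetical stand-in for FileMapper.HostPath. It joins src
// onto base and rejects any result that falls outside base.
func hostPath(base, src string) (string, error) {
	// filepath.Join cleans its result, so ".." elements are resolved here.
	mapped := filepath.Join(base, src)

	// If the relative path from base to mapped starts with "..", the mapped
	// path has escaped the base directory.
	rel, err := filepath.Rel(base, mapped)
	if err != nil {
		return "", err
	}
	if rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return "", fmt.Errorf("%q maps outside of base dir %q", src, base)
	}
	return mapped, nil
}

func main() {
	p, err := hostPath("/tmp/mapped_files", "/home/ubuntu/myfile")
	fmt.Println(p, err)

	_, err = hostPath("/tmp/mapped_files", "../../foo")
	fmt.Println(err)
}
```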

func (*FileMapper) IsSubpath

func (mapper *FileMapper) IsSubpath(p string, base string) bool

IsSubpath returns true if the given path "p" is a subpath of "base".

func (*FileMapper) MapTask

func (mapper *FileMapper) MapTask(task *tes.Task) error

MapTask adds all the volumes, inputs, and outputs in the given Task to the FileMapper.

func (*FileMapper) OpenHostFile

func (mapper *FileMapper) OpenHostFile(src string) (*os.File, error)

OpenHostFile opens a file on the host file system at a mapped path. "src" is an unmapped path. This function will handle mapping the path.

This function calls os.Open.

If the path can't be mapped or the file can't be opened, an error is returned.

type GenericTaskReader

type GenericTaskReader struct {
	// contains filtered or unexported fields
}

GenericTaskReader provides read access to tasks.

func NewGenericTaskReader

func NewGenericTaskReader(get func(ctx context.Context, in *tes.GetTaskRequest) (*tes.Task, error), taskID string) *GenericTaskReader

NewGenericTaskReader returns a new generic task reader.
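The constructor takes a getter function and a task ID, which suggests a reader that delegates each lookup to the getter. The sketch below shows that pattern with local, simplified stand-ins for the tes types. All names here are hypothetical, and deriving State from the result of Task is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Simplified local stand-ins for tes.State, tes.Task, and tes.GetTaskRequest.
type state string

type task struct {
	ID    string
	State state
}

type getTaskRequest struct{ ID string }

// getFunc mirrors the shape of the getter passed to NewGenericTaskReader.
type getFunc func(ctx context.Context, in *getTaskRequest) (*task, error)

// genericReader is a hypothetical reader that delegates to a getter.
type genericReader struct {
	get    getFunc
	taskID string
}

func newGenericReader(get getFunc, taskID string) *genericReader {
	return &genericReader{get: get, taskID: taskID}
}

// Task fetches the task descriptor through the getter.
func (r *genericReader) Task() (*task, error) {
	return r.get(context.Background(), &getTaskRequest{ID: r.taskID})
}

// State fetches the task and returns only its state (an assumption).
func (r *genericReader) State() (state, error) {
	t, err := r.Task()
	if err != nil {
		return "", err
	}
	return t.State, nil
}

// mapGetter returns a getter backed by an in-memory map, useful in tests.
func mapGetter(tasks map[string]*task) getFunc {
	return func(_ context.Context, in *getTaskRequest) (*task, error) {
		t, ok := tasks[in.ID]
		if !ok {
			return nil, errors.New("task not found: " + in.ID)
		}
		return t, nil
	}
}

func main() {
	tasks := map[string]*task{"task-1": {ID: "task-1", State: "RUNNING"}}
	r := newGenericReader(mapGetter(tasks), "task-1")
	s, err := r.State()
	fmt.Println(s, err)
}
```

Because the getter is an ordinary function value, the same reader can sit in front of a database, an RPC client, or an in-memory map like the one above.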

func (*GenericTaskReader) State

func (r *GenericTaskReader) State() (tes.State, error)

State returns the current state of the task.

func (*GenericTaskReader) Task

func (r *GenericTaskReader) Task() (*tes.Task, error)

Task returns the task descriptor.

type NoopWorker

type NoopWorker struct{}

NoopWorker is useful during testing, when a Worker that doesn't do anything is needed.

func (NoopWorker) Close

func (NoopWorker) Close() error

Close is a noop.

func (NoopWorker) Run

func (NoopWorker) Run(context.Context)

Run doesn't do anything; it's an empty function.

type RPCTaskReader

type RPCTaskReader struct {
	// contains filtered or unexported fields
}

RPCTaskReader provides read access to tasks from the funnel server over gRPC.

func NewRPCTaskReader

func NewRPCTaskReader(conf config.RPC, taskID string) (*RPCTaskReader, error)

NewRPCTaskReader returns a new RPC-based task reader.

func (*RPCTaskReader) State

func (r *RPCTaskReader) State() (tes.State, error)

State returns the current state of the task.

func (*RPCTaskReader) Task

func (r *RPCTaskReader) Task() (*tes.Task, error)

Task returns the task descriptor.

type TaskReader

type TaskReader interface {
	Task() (*tes.Task, error)
	State() (tes.State, error)
}

TaskReader is a type which reads task information during task execution.

type Volume

type Volume struct {
	// The path in tes worker.
	HostPath string
	// The path in Docker.
	ContainerPath string
	Readonly      bool
}

Volume represents a volume mounted into a Docker container. It records HostPath (the path on the host file system), ContainerPath (the path on the container file system), and whether the volume is read-only.

type Worker

type Worker interface {
	Run(context.Context)
	Close() error
}

Worker is a type which runs a task.
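To show how a caller might drive this interface, here is a small sketch with a hypothetical `stepWorker` that walks through a fixed list of steps and stops early if its context is canceled. The interface is redeclared locally so the example is self-contained. The step names only echo the sequential process described for DefaultWorker and do not reflect its actual implementation.

```go
package main

import (
	"context"
	"fmt"
)

// worker is a local copy of the Worker interface shown above.
type worker interface {
	Run(context.Context)
	Close() error
}

// stepWorker is a hypothetical worker that records each step it completes.
type stepWorker struct {
	steps  []string
	done   []string
	closed bool
}

// Run processes the steps in order and returns early once ctx is canceled.
func (w *stepWorker) Run(ctx context.Context) {
	for _, s := range w.steps {
		if ctx.Err() != nil {
			return
		}
		w.done = append(w.done, s)
	}
}

// Close marks the worker as closed. A real worker would release resources.
func (w *stepWorker) Close() error {
	w.closed = true
	return nil
}

// runAndClose runs the worker to completion and then closes it.
func runAndClose(w worker) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	w.Run(ctx)
	return w.Close()
}

func main() {
	w := &stepWorker{steps: []string{"initialize", "execute", "finalize"}}
	err := runAndClose(w)
	fmt.Println(w.done, w.closed, err)
}
```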
