Documentation ¶
Index ¶
- type DefaultWorker
- type DockerCommand
- type FileMapper
- func (mapper *FileMapper) AddInput(input *tes.Input) error
- func (mapper *FileMapper) AddOutput(output *tes.Output) error
- func (mapper *FileMapper) AddTmpVolume(mountPoint string) error
- func (mapper *FileMapper) AddVolume(hostPath string, mountPoint string, readonly bool) error
- func (mapper *FileMapper) ContainerPath(src string) string
- func (mapper *FileMapper) CreateHostFile(src string) (*os.File, error)
- func (mapper *FileMapper) HostPath(src string) (string, error)
- func (mapper *FileMapper) IsSubpath(p string, base string) bool
- func (mapper *FileMapper) MapTask(task *tes.Task) error
- func (mapper *FileMapper) OpenHostFile(src string) (*os.File, error)
- type GenericTaskReader
- type NoopWorker
- type RPCTaskReader
- type TaskReader
- type Volume
- type Worker
Types ¶
type DefaultWorker ¶
type DefaultWorker struct {
	Conf       config.Worker
	Mapper     *FileMapper
	Store      storage.Storage
	TaskReader TaskReader
	Event      *events.TaskWriter
}
DefaultWorker is the default task worker, which follows a basic, sequential process of task initialization, execution, finalization, and logging.
func (*DefaultWorker) Close ¶
func (r *DefaultWorker) Close() error
Close cleans up worker resources, e.g. closing the event writers.
func (*DefaultWorker) Run ¶
func (r *DefaultWorker) Run(pctx context.Context)
Run runs the Worker. TODO document behavior of slow consumer of task log updates
type DockerCommand ¶
type DockerCommand struct {
	Image           string
	Command         []string
	Volumes         []Volume
	Workdir         string
	ContainerName   string
	RemoveContainer bool
	Env             map[string]string
	Stdin           io.Reader
	Stdout          io.Writer
	Stderr          io.Writer
	Event           *events.ExecutorWriter
}
DockerCommand is responsible for configuring and running a docker container.
func (DockerCommand) Run ¶
func (dcmd DockerCommand) Run() error
Run runs the Docker command and blocks until done.
type FileMapper ¶
type FileMapper struct {
	Volumes []Volume
	Inputs  []*tes.Input
	Outputs []*tes.Output
	// contains filtered or unexported fields
}
FileMapper is responsible for mapping paths into a working directory on the worker's host file system.
Every task needs its own directory to work in. When a file is downloaded for a task, it needs to be stored in the task's working directory. The same applies to task outputs, uploads, stdin/stdout/stderr, etc. FileMapper helps the worker engine manage all these paths.
func NewFileMapper ¶
func NewFileMapper(dir string) *FileMapper
NewFileMapper returns a new FileMapper, which maps files into the given base directory.
func (*FileMapper) AddInput ¶
func (mapper *FileMapper) AddInput(input *tes.Input) error
AddInput adds an input to the mapped files for the given tes.Input. A copy of the tes.Input will be added to mapper.Inputs, with the "Path" field updated to the mapped host path.
If the path can't be mapped, an error is returned.
func (*FileMapper) AddOutput ¶
func (mapper *FileMapper) AddOutput(output *tes.Output) error
AddOutput adds an output to the mapped files for the given tes.Output. A copy of the tes.Output will be added to mapper.Outputs, with the "Path" field updated to the mapped host path.
If the path can't be mapped, an error is returned.
func (*FileMapper) AddTmpVolume ¶
func (mapper *FileMapper) AddTmpVolume(mountPoint string) error
AddTmpVolume creates a directory on the host based on the declared path in the container and adds it to mapper.Volumes.
If the path can't be mapped, an error is returned.
func (*FileMapper) AddVolume ¶
func (mapper *FileMapper) AddVolume(hostPath string, mountPoint string, readonly bool) error
AddVolume adds a mapped volume to the mapper. A corresponding Volume record is added to mapper.Volumes.
If the volume paths are invalid or can't be mapped, an error is returned.
func (*FileMapper) ContainerPath ¶
func (mapper *FileMapper) ContainerPath(src string) string
ContainerPath returns an unmapped path.
The mapper's base dir is stripped from the path. For example, if the mapper is configured with a base dir of "/tmp/mapped_files", then mapper.ContainerPath("/tmp/mapped_files/home/ubuntu/myfile") will return "/home/ubuntu/myfile".
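As a rough illustration of the prefix stripping described here, the sketch below uses a hypothetical free function `containerPath` that takes the base directory explicitly. It is an assumption about the behavior, not the package's actual code.

```go
package main

import (
	"fmt"
	"strings"
)

// containerPath is a hypothetical stand-in for FileMapper.ContainerPath.
// It strips the base directory prefix from a mapped host path.
func containerPath(base, src string) string {
	return strings.TrimPrefix(src, base)
}

func main() {
	p := containerPath("/tmp/mapped_files", "/tmp/mapped_files/home/ubuntu/myfile")
	fmt.Println(p) // prints "/home/ubuntu/myfile"
}
```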
func (*FileMapper) CreateHostFile ¶
func (mapper *FileMapper) CreateHostFile(src string) (*os.File, error)
CreateHostFile creates a file on the host file system at a mapped path. "src" is an unmapped path. This function will handle mapping the path.
This function calls os.Create.
If the path can't be mapped or the file can't be created, an error is returned.
func (*FileMapper) HostPath ¶
func (mapper *FileMapper) HostPath(src string) (string, error)
HostPath returns a mapped path.
The path is concatenated to the mapper's base dir. For example, if the mapper is configured with a base dir of "/tmp/mapped_files", then mapper.HostPath("/home/ubuntu/myfile") will return "/tmp/mapped_files/home/ubuntu/myfile".
The mapped path is required to be a subpath of the mapper's base directory. e.g. mapper.HostPath("../../foo") should fail with an error.
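The join-then-validate behavior described above can be sketched as follows. The `hostPath` function is a hypothetical stand-in that takes the base directory as an argument; the real method may normalize or validate paths differently. The sketch uses the slash-based `path` package, so it assumes Unix-style paths.

```go
package main

import (
	"errors"
	"fmt"
	"path"
	"strings"
)

// hostPath is a hypothetical stand-in for FileMapper.HostPath.
// It joins src onto base and rejects results that escape base.
func hostPath(base, src string) (string, error) {
	base = path.Clean(base)
	// path.Join cleans the result, which resolves any ".." elements.
	mapped := path.Join(base, src)
	if mapped != base && !strings.HasPrefix(mapped, base+"/") {
		return "", errors.New("mapped path is not a subpath of the base dir: " + src)
	}
	return mapped, nil
}

func main() {
	p, err := hostPath("/tmp/mapped_files", "/home/ubuntu/myfile")
	fmt.Println(p, err)

	// "../../foo" resolves to "/foo", which lies outside the base dir.
	_, err = hostPath("/tmp/mapped_files", "../../foo")
	fmt.Println(err != nil)
}
```

Checking the prefix against `base+"/"` rather than `base` alone avoids accepting a sibling directory such as "/tmp/mapped_files_2".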
func (*FileMapper) IsSubpath ¶
func (mapper *FileMapper) IsSubpath(p string, base string) bool
IsSubpath returns true if the given path "p" is a subpath of "base".
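One common way to implement such a check in Go is with `filepath.Rel`: if the relative path from "base" to "p" does not climb upward, "p" lies inside "base". The sketch below is an assumption about how this could work, not the package's code. The hypothetical `isSubpath` function also treats "base" as a subpath of itself, which the real method may or may not do.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// isSubpath is a hypothetical stand-in for FileMapper.IsSubpath.
// It reports whether p is located at or below base.
func isSubpath(p, base string) bool {
	rel, err := filepath.Rel(base, p)
	if err != nil {
		return false
	}
	// A relative path that is ".." or starts with "../" leaves base.
	return rel != ".." && !strings.HasPrefix(rel, ".."+string(filepath.Separator))
}

func main() {
	fmt.Println(isSubpath("/tmp/mapped_files/home", "/tmp/mapped_files"))
	fmt.Println(isSubpath("/tmp/other", "/tmp/mapped_files"))
}
```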
func (*FileMapper) MapTask ¶
func (mapper *FileMapper) MapTask(task *tes.Task) error
MapTask adds all the volumes, inputs, and outputs in the given Task to the FileMapper.
func (*FileMapper) OpenHostFile ¶
func (mapper *FileMapper) OpenHostFile(src string) (*os.File, error)
OpenHostFile opens a file on the host file system at a mapped path. "src" is an unmapped path. This function will handle mapping the path.
This function calls os.Open.
If the path can't be mapped or the file can't be opened, an error is returned.
type GenericTaskReader ¶
type GenericTaskReader struct {
// contains filtered or unexported fields
}
GenericTaskReader provides read access to tasks.
func NewGenericTaskReader ¶
func NewGenericTaskReader(get func(ctx context.Context, in *tes.GetTaskRequest) (*tes.Task, error), taskID string) *GenericTaskReader
NewGenericTaskReader returns a new generic task reader.
type NoopWorker ¶
type NoopWorker struct{}
NoopWorker is a Worker that doesn't do anything. It is useful during testing, when a Worker is required but no task should actually be run.
func (NoopWorker) Run ¶
func (NoopWorker) Run(context.Context)
Run doesn't do anything; it's an empty function.
type RPCTaskReader ¶
type RPCTaskReader struct {
// contains filtered or unexported fields
}
RPCTaskReader provides read access to tasks from the funnel server over gRPC.
func NewRPCTaskReader ¶
func NewRPCTaskReader(conf config.RPC, taskID string) (*RPCTaskReader, error)
NewRPCTaskReader returns a new RPC-based task reader.
type TaskReader ¶
TaskReader is a type which reads task information during task execution.
type Volume ¶
type Volume struct {
	// The path in tes worker.
	HostPath string
	// The path in Docker.
	ContainerPath string
	Readonly      bool
}
Volume represents a volume mounted into a docker container. It records HostPath (the path on the host file system), ContainerPath (the path on the container file system), and whether the volume is read-only.
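The three fields line up with Docker's bind-mount syntax, `-v host:container[:ro|rw]`. The sketch below shows one way a volume could be rendered as such an argument. The local `volume` type and the `bindArg` helper are hypothetical names for illustration, and nothing here is confirmed to match how the package formats its mounts.

```go
package main

import "fmt"

// volume mirrors the fields of the Volume type documented above.
// It is a local, hypothetical copy used only for this illustration.
type volume struct {
	HostPath      string
	ContainerPath string
	Readonly      bool
}

// bindArg formats a volume as the value of a `docker run -v` flag.
func bindArg(v volume) string {
	mode := "rw"
	if v.Readonly {
		mode = "ro"
	}
	return v.HostPath + ":" + v.ContainerPath + ":" + mode
}

func main() {
	v := volume{
		HostPath:      "/tmp/mapped_files/inputs",
		ContainerPath: "/inputs",
		Readonly:      true,
	}
	fmt.Println("-v", bindArg(v))
}
```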