Documentation ¶
Overview ¶
Package run contains machinery for working with runnable tasks and objects which control tasks.
Index ¶
- Variables
- func IsCompilerError(err error) bool
- func RunAsync(t Task) <-chan error
- func RunWait(t Task) error
- type ArgParser
- type CompilerError
- type Controller
- type DelegatingExecutor
- type ExecCommandTask
- type Executor
- type ExecutorOption
- type ExecutorOptions
- type ExecutorStatus
- type NoRunnerForKind
- type PackagedRequest
- type PairContext
- type ProcessOptions
- type QueuedExecutor
- func (x *QueuedExecutor) CompleteTaskStatus(stat *metrics.TaskStatus)
- func (x *QueuedExecutor) CompleteUsageLimits(stat *metrics.UsageLimits)
- func (x *QueuedExecutor) Exec(task Task) error
- func (x *QueuedExecutor) ExecAsync(task Task) <-chan error
- func (x *QueuedExecutor) SetUsageLimits(cfg *metrics.UsageLimits)
- type RequestManager
- type Resizer
- type ResizerManager
- type ResultOptions
- type SchedulerClientStream
- type StoreAddFunc
- type Task
- type TaskOption
- func InPlace(inPlace bool) TaskOption
- func WithArgs(args []string) TaskOption
- func WithContext(ctx context.Context) TaskOption
- func WithEnv(env []string) TaskOption
- func WithLog(lg *zap.SugaredLogger) TaskOption
- func WithOutputStreams(stdout, stderr io.Writer) TaskOption
- func WithOutputVar(v interface{}) TaskOption
- func WithOutputWriter(w io.Writer) TaskOption
- func WithStdin(stdin io.Reader) TaskOption
- func WithUidGid(uid, gid uint32) TaskOption
- func WithWorkDir(dir string) TaskOption
- type TaskOptions
- type ToolchainController
- type ToolchainRunnerStore
- type UpstreamQueue
- type WorkerPool
- type WorkerPoolOption
- type WorkerPoolOptions
Constants ¶
This section is empty.
Variables ¶
var ErrNoAgentsRetry = errors.New("No agents available to handle the request; retrying")
var ErrNoAgentsRunLocal = errors.New("No agents available to handle the request; running locally")
var (
ErrUnsupportedTask = errors.New("Task not supported")
)
Functions ¶
func IsCompilerError ¶
func RunAsync ¶
RunAsync runs the given task and returns a channel that will eventually receive the task's error value, after which the channel is closed. Tasks are responsible for their own cancellation. If a task should be canceled, it should take the necessary actions and return the error context.Canceled.
Types ¶
type ArgParser ¶
ArgParser is a high-level interface that represents an object capable of parsing a set of command-line arguments, and determining whether the associated request can be run remotely based on the arguments given. The concrete object should manage its own data, but should not perform any parsing until the Parse function is called. CanRunRemote will always be called after Parse.
type CompilerError ¶
type CompilerError struct {
// contains filtered or unexported fields
}
func NewCompilerError ¶
func NewCompilerError(text string) *CompilerError
func (*CompilerError) Error ¶
func (e *CompilerError) Error() string
type Controller ¶
type Controller interface { // With transforms the Controller into a ToolchainController by providing // it with a concrete toolchain instance. With(*types.Toolchain) ToolchainController }
Controller represents an object that can control requests for a particular toolchain, when provided with a concrete instance of such a toolchain with parameters set correctly for the host.
type DelegatingExecutor ¶
type DelegatingExecutor struct {
// contains filtered or unexported fields
}
DelegatingExecutor is an executor that does not run a worker pool, runs all tasks as soon as possible, and is always available. It will report that all of its tasks are Delegated, and will not report counts for queued or running tasks.
func NewDelegatingExecutor ¶
func NewDelegatingExecutor() *DelegatingExecutor
func (*DelegatingExecutor) CompleteTaskStatus ¶
func (x *DelegatingExecutor) CompleteTaskStatus(stat *metrics.TaskStatus)
func (*DelegatingExecutor) CompleteUsageLimits ¶
func (x *DelegatingExecutor) CompleteUsageLimits(stat *metrics.UsageLimits)
func (*DelegatingExecutor) Exec ¶
func (x *DelegatingExecutor) Exec(task Task) error
func (*DelegatingExecutor) ExecAsync ¶
func (x *DelegatingExecutor) ExecAsync(task Task) <-chan error
type ExecCommandTask ¶
type ExecCommandTask struct { TaskOptions util.NullableError // contains filtered or unexported fields }
func (*ExecCommandTask) Run ¶
func (t *ExecCommandTask) Run()
type Executor ¶
type Executor interface { metrics.UsageLimitsCompleter metrics.TaskStatusCompleter Exec(task Task) error }
An Executor is an object which runs tasks.
type ExecutorOption ¶
type ExecutorOption func(*ExecutorOptions)
func WithUsageLimits ¶
func WithUsageLimits(cfg *metrics.UsageLimits) ExecutorOption
type ExecutorOptions ¶
type ExecutorOptions struct {
UsageLimits *metrics.UsageLimits
}
func (*ExecutorOptions) Apply ¶
func (o *ExecutorOptions) Apply(opts ...ExecutorOption)
type ExecutorStatus ¶
type ExecutorStatus int
type NoRunnerForKind ¶
type NoRunnerForKind struct{}
func (NoRunnerForKind) Error ¶
func (e NoRunnerForKind) Error() string
type PackagedRequest ¶
type PackagedRequest struct { util.NullableError // contains filtered or unexported fields }
PackagedRequest is a runnable closure which can invoke a RequestManager's Process method with the provided arguments when desired.
func PackageRequest ¶
func PackageRequest( rm RequestManager, ctx PairContext, request interface{}, ) PackagedRequest
PackageRequest creates a PackagedRequest object and returns it. It does not run the request.
func (*PackagedRequest) Response ¶
func (pr *PackagedRequest) Response() chan interface{}
func (*PackagedRequest) Run ¶
func (pr *PackagedRequest) Run()
Run invokes the PackagedRequest by calling the packaged RequestManager's Process method with the arguments given at the time of its creation.
type PairContext ¶
PairContext is a pair of client and server contexts.
func (PairContext) Done ¶
func (pc PairContext) Done() <-chan struct{}
func (PairContext) Err ¶
func (pc PairContext) Err() error
func (PairContext) Value ¶
func (pc PairContext) Value(key interface{}) interface{}
type ProcessOptions ¶
type QueuedExecutor ¶
type QueuedExecutor struct { ExecutorOptions // contains filtered or unexported fields }
func NewQueuedExecutor ¶
func NewQueuedExecutor(opts ...ExecutorOption) *QueuedExecutor
func (*QueuedExecutor) CompleteTaskStatus ¶
func (x *QueuedExecutor) CompleteTaskStatus(stat *metrics.TaskStatus)
func (*QueuedExecutor) CompleteUsageLimits ¶
func (x *QueuedExecutor) CompleteUsageLimits(stat *metrics.UsageLimits)
func (*QueuedExecutor) Exec ¶
func (x *QueuedExecutor) Exec(task Task) error
func (*QueuedExecutor) ExecAsync ¶
func (x *QueuedExecutor) ExecAsync(task Task) <-chan error
func (*QueuedExecutor) SetUsageLimits ¶
func (x *QueuedExecutor) SetUsageLimits(cfg *metrics.UsageLimits)
type RequestManager ¶
type RequestManager interface { // Process consumes the request and blocks until it is complete, returning // a matching response object and an error. Errors returned from this function // do not indicate that the request has failed (where "failure" is specific to and // defined by the request itself); rather, they indicate that the request // could not be completed, either due to a network error, an internal error, // or a similar issue. Responses should encode success or failure within // the response type itself. Process(ctx PairContext, request interface{}) (response interface{}, err error) }
RequestManager represents an entity that is responsible for the entire lifecycle of a request (right now either a RunRequest or CompileRequest) by creating and running tasks.
type Resizer ¶
type Resizer interface { // Resize should set the target number of contained resources to the given // value. It should block until the resize operation is complete. Resize(int64) }
A Resizer is an object that is able to dynamically resize its contained resources.
type ResizerManager ¶
type ResizerManager interface {
Manage(Resizer)
}
A ResizerManager is an object that can "take ownership" of a Resizer and be expected to manage its resource count.
type ResultOptions ¶
type SchedulerClientStream ¶
type SchedulerClientStream interface { LoadNewStream(types.Scheduler_StreamOutgoingTasksClient) Compile(*types.CompileRequest) (*types.CompileResponse, error) }
type StoreAddFunc ¶
type StoreAddFunc func(*ToolchainRunnerStore)
type Task ¶
type Task interface { // Run will run the task. It will block until the task is completed. Run() // Err will return the task's error value once it has completed. // If called before Run() returns, it should panic. Err() error }
Task represents a single runnable action.
func NewExecCommandTask ¶
func NewExecCommandTask(tc *types.Toolchain, opts ...TaskOption) Task
type TaskOption ¶
type TaskOption func(*TaskOptions)
func InPlace ¶
func InPlace(inPlace bool) TaskOption
func WithArgs ¶
func WithArgs(args []string) TaskOption
func WithContext ¶
func WithContext(ctx context.Context) TaskOption
func WithEnv ¶
func WithEnv(env []string) TaskOption
func WithLog ¶
func WithLog(lg *zap.SugaredLogger) TaskOption
func WithOutputStreams ¶
func WithOutputStreams(stdout, stderr io.Writer) TaskOption
func WithOutputVar ¶
func WithOutputVar(v interface{}) TaskOption
func WithOutputWriter ¶
func WithOutputWriter(w io.Writer) TaskOption
func WithStdin ¶
func WithStdin(stdin io.Reader) TaskOption
func WithUidGid ¶
func WithUidGid(uid, gid uint32) TaskOption
func WithWorkDir ¶
func WithWorkDir(dir string) TaskOption
type TaskOptions ¶
type TaskOptions struct { ProcessOptions ResultOptions Context context.Context Log *zap.SugaredLogger }
func (*TaskOptions) Apply ¶
func (o *TaskOptions) Apply(opts ...TaskOption)
type ToolchainController ¶
type ToolchainController interface { // RunLocal returns a RequestManager which can handle a request locally // without dispatching any or all of its tasks to a remote agent. RunLocal(ArgParser) RequestManager // SendRemote returns a RequestManager which will dispatch some or all of // its tasks to be processed by a remote agent. This RequestManager is // only responsible for local tasks associated with the request (if any), // and sending/waiting on tasks using the provided client. SendRemote(ArgParser, SchedulerClientStream) RequestManager // RecvRemote returns a RequestManager which will run its tasks locally // under the assumption that it is running them on behalf of a consumer // somewhere else on the network. RecvRemote() RequestManager // NewArgParser returns a new concrete ArgParser for this toolchain. NewArgParser(ctx context.Context, args []string) ArgParser }
A ToolchainController is an object capable of managing the entire lifecycle of requests for a given toolchain.
type ToolchainRunnerStore ¶
type ToolchainRunnerStore struct {
// contains filtered or unexported fields
}
func NewToolchainRunnerStore ¶
func NewToolchainRunnerStore() *ToolchainRunnerStore
func (*ToolchainRunnerStore) Add ¶
func (s *ToolchainRunnerStore) Add( kind types.ToolchainKind, runner ToolchainController, )
func (*ToolchainRunnerStore) Get ¶
func (s *ToolchainRunnerStore) Get( kind types.ToolchainKind, ) (ToolchainController, error)
type UpstreamQueue ¶
func SingularQueue ¶
func SingularQueue(ch chan Task) UpstreamQueue
type WorkerPool ¶
type WorkerPool struct { *util.PauseController // contains filtered or unexported fields }
WorkerPool is a dynamic pool of worker goroutines which run on a shared task queue. The number of workers can be changed at any time, and the stream itself can be paused and unpaused, which can be used to temporarily stop/start all workers.
func NewWorkerPool ¶
func NewWorkerPool(taskQueue UpstreamQueue, opts ...WorkerPoolOption) *WorkerPool
NewWorkerPool creates a new WorkerPool with the provided task queue.
func (*WorkerPool) Resize ¶
func (wp *WorkerPool) Resize(count int64)
Resize sets the target number of workers that should be running in the pool. When decreasing the number of workers, only workers which are not currently running a task will be stopped. If all workers are busy, the pool will stop the next available workers when they have finished their current task.
func (*WorkerPool) Size ¶
func (wp *WorkerPool) Size() int64
type WorkerPoolOption ¶
type WorkerPoolOption func(*WorkerPoolOptions)
func DefaultPaused ¶
func DefaultPaused() WorkerPoolOption
DefaultPaused indicates the worker pool should start in the paused state. To avoid race conditions, use this option rather than starting the pool and immediately pausing it.
func WithRunner ¶
func WithRunner(f func(Task)) WorkerPoolOption
WithRunner sets the function that will be run by a worker when processing a task.
type WorkerPoolOptions ¶
type WorkerPoolOptions struct {
// contains filtered or unexported fields
}
func (*WorkerPoolOptions) Apply ¶
func (o *WorkerPoolOptions) Apply(opts ...WorkerPoolOption)