Documentation ¶
Index ¶
- Constants
- Variables
- type Job
- type Manager
- func (m *Manager) OutputStream(ctx context.Context, username string, jobID string) (<-chan []byte, error)
- func (m *Manager) Start(ctx context.Context, username string, cmd string, args ...string) (string, error)
- func (m *Manager) Status(ctx context.Context, username string, jobID string) (jogv1.Status, error)
- func (m *Manager) Stop(ctx context.Context, username string, jobID string) error
- type OutputStreamer
- type OutputStreamerOption
Constants ¶
const CommandWaitDelay = 10 * time.Second
CommandWaitDelay is the amount of time to wait for a canceled Job to shut down before sending a SIGKILL
Variables ¶
var ErrJobNotFound = fmt.Errorf("job not found")
var ErrOutputStreamerClosed = errors.New("output streamer is closed")
Functions ¶
This section is empty.
Types ¶
type Job ¶
type Job struct {
// contains filtered or unexported fields
}
func StartNewJob ¶
func StartNewJob(shutdownCtx context.Context, cgroupFD int, name string, args ...string) (*Job, error)
StartNewJob creates a new job, starts it, and returns a reference to it If the underlying cmd.Start() call fails, an error is returned as well as a nil pointer to ensure that the job is thrown away. This ensures that callers cannot call exported methods on jobs that cannot be started.
func (*Job) OutputStream ¶
OutputStream returns a channel that streams the output of the job
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager is a job manager that keeps track of jobs by username and jobID. It also holds a context that the server uses to stop all jobs when during shut down
func NewManager ¶
NewManager creates a new Manager
func (*Manager) OutputStream ¶
func (*Manager) Start ¶
func (m *Manager) Start(ctx context.Context, username string, cmd string, args ...string) (string, error)
Start starts a new job and returns the jobID
func (*Manager) Status ¶
Status gets the status of a job Because stop signals are eventually respected, the internal state of a job process may not yet be reflected in the status. Eventually consistency is guaranteed, though, and delays mostly depend on the CommandWaitDelay constant in the job package.
type OutputStreamer ¶
type OutputStreamer struct {
// contains filtered or unexported fields
}
A OutputStreamer is an io.Writer that collects data written to it and fans it out to clients who want to read that data as a stream. Callers of NewStream() are provided a channel that will receive all data written since the streamer was created.
When the context passed to NewStream() is canceled, the channel will be closed immediately without writing any further data.
OutputStreamer also implements the io.Closer interface. Closing an OutputStreamer means that we don't expect any more data to be written to it. After an OutputStreamer instance is closed, any calls to Write() will return an error. And channels returned from NewStream() will be closed after all data has been written to them.
func NewOutputStreamer ¶
func NewOutputStreamer(options ...OutputStreamerOption) *OutputStreamer
func (*OutputStreamer) CloseWriter ¶
func (o *OutputStreamer) CloseWriter()
func (*OutputStreamer) NewStream ¶
func (o *OutputStreamer) NewStream(ctx context.Context) <-chan []byte
NewStream returns a channel that will receive all data written to the OutputStreamer. When a job is running and writing data to the OutputStreamer, the channel will receive data in chunks of, at most, streamMessageSize bytes.
The reader is configured to check for new data at least once per second. When there is new data, it catches up to the end of stream without waiting.
When the job exits, the OutputStreamer is closed to writes, but the data remains available to NewStream() callers until the server is shutdown.
func (*OutputStreamer) Next ¶
func (o *OutputStreamer) Next(index int) []byte
Next returns the next chunk of data to be read from the OutputStreamer. Note: no copies of the data are made, so the caller should not modify the returned slice. This design enables large output buffers to be read by many clients without incurring the cost of copying the data.
type OutputStreamerOption ¶
type OutputStreamerOption func(*OutputStreamer)
func WithStreamMessageSize ¶
func WithStreamMessageSize(size int) OutputStreamerOption