Documentation ¶
Index ¶
- Constants
- Variables
- func IsComplete(workState int) bool
- func IsPending(err error) bool
- func WorkStateToString(workState int) string
- type BaseWorkUnit
- func (bwu *BaseWorkUnit) CancelContext()
- func (bwu *BaseWorkUnit) Debug(format string, v ...interface{})
- func (bwu *BaseWorkUnit) Error(format string, v ...interface{})
- func (bwu *BaseWorkUnit) ID() string
- func (bwu *BaseWorkUnit) Info(format string, v ...interface{})
- func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string, fs FileSystemer, ...)
- func (bwu *BaseWorkUnit) LastUpdateError() error
- func (bwu *BaseWorkUnit) Load() error
- func (bwu *BaseWorkUnit) MonitorLocalStatus()
- func (bwu *BaseWorkUnit) Release(force bool) error
- func (bwu *BaseWorkUnit) Save() error
- func (bwu *BaseWorkUnit) SetFromParams(_ map[string]string) error
- func (bwu *BaseWorkUnit) Status() *StatusFileData
- func (bwu *BaseWorkUnit) StatusFileName() string
- func (bwu *BaseWorkUnit) StdoutFileName() string
- func (bwu *BaseWorkUnit) UnitDir() string
- func (bwu *BaseWorkUnit) UnredactedStatus() *StatusFileData
- func (bwu *BaseWorkUnit) UpdateBasicStatus(state int, detail string, stdoutSize int64)
- func (bwu *BaseWorkUnit) UpdateFullStatus(statusFunc func(*StatusFileData))
- func (bwu *BaseWorkUnit) Warning(format string, v ...interface{})
- type CommandWorkerCfg
- type FileReadCloser
- type FileSystem
- type FileSystemer
- type FileWriteCloser
- type KubeWorkerCfg
- type NetceptorForWorkceptor
- type NewWorkerFunc
- type RealWatcher
- type STDinReader
- type STDoutWriter
- type SigningKeyPrivateCfg
- type StatusFileData
- func (sfd *StatusFileData) Load(filename string) error
- func (sfd *StatusFileData) Save(filename string) error
- func (sfd *StatusFileData) UpdateBasicStatus(filename string, state int, detail string, stdoutSize int64) error
- func (sfd *StatusFileData) UpdateFullStatus(filename string, statusFunc func(*StatusFileData)) error
- type VerifyingKeyPublicCfg
- type WatcherWrapper
- type WorkUnit
- type Workceptor
- func (w *Workceptor) AllocateRemoteUnit(remoteNode, remoteWorkType, tlsClient, ttl string, signWork bool, ...) (WorkUnit, error)
- func (w *Workceptor) AllocateUnit(workTypeName string, params map[string]string) (WorkUnit, error)
- func (w *Workceptor) CancelUnit(unitID string) error
- func (w *Workceptor) GetResults(ctx context.Context, unitID string, startPos int64) (chan []byte, error)
- func (w *Workceptor) ListKnownUnitIDs() []string
- func (w *Workceptor) RegisterWithControlService(cs *controlsvc.Server) error
- func (w *Workceptor) RegisterWorker(typeName string, newWorkerFunc NewWorkerFunc, verifySignature bool) error
- func (w *Workceptor) ReleaseUnit(unitID string, force bool) error
- func (w *Workceptor) ShouldVerifySignature(workType string, signWork bool) bool
- func (w *Workceptor) StartUnit(unitID string) error
- func (w *Workceptor) UnitStatus(unitID string) (*StatusFileData, error)
- func (w *Workceptor) VerifySignature(signature string) error
- type WorkerConfig
Constants ¶
const ( SuccessWorkSleep = 1 * time.Second // Normal time to wait between checks MaxWorkSleep = 1 * time.Minute // Max time to ever wait between checks )
Work sleep constants.
const ( WorkStatePending = 0 WorkStateRunning = 1 WorkStateSucceeded = 2 WorkStateFailed = 3 WorkStateCanceled = 4 )
Work state constants.
Variables ¶
var ErrImagePullBackOff = fmt.Errorf("container failed to start")
ErrImagePullBackOff is returned when the image for the container in the Pod cannot be pulled.
var ErrPending = fmt.Errorf("operation pending")
ErrPending is returned when an operation hasn't succeeded or failed yet.
var ErrPodCompleted = fmt.Errorf("pod ran to completion")
ErrPodCompleted is returned when pod has already completed before we could attach.
var ErrPodFailed = fmt.Errorf("pod failed to start")
ErrPodFailed is returned when pod has failed before we could attach.
Functions ¶
func IsComplete ¶
func IsComplete(workState int) bool
IsComplete returns true if a given WorkState indicates the job is finished.
func WorkStateToString ¶
func WorkStateToString(workState int) string
WorkStateToString returns a string representation of a WorkState.
Types ¶
type BaseWorkUnit ¶
type BaseWorkUnit struct {
// contains filtered or unexported fields
}
BaseWorkUnit includes data common to all work units, and partially implements the WorkUnit interface.
func (*BaseWorkUnit) CancelContext ¶
func (bwu *BaseWorkUnit) CancelContext()
func (*BaseWorkUnit) Debug ¶
func (bwu *BaseWorkUnit) Debug(format string, v ...interface{})
Debug logs message with unitID prepended.
func (*BaseWorkUnit) Error ¶
func (bwu *BaseWorkUnit) Error(format string, v ...interface{})
Error logs message with unitID prepended.
func (*BaseWorkUnit) ID ¶
func (bwu *BaseWorkUnit) ID() string
ID returns the unique identifier of this work unit.
func (*BaseWorkUnit) Info ¶
func (bwu *BaseWorkUnit) Info(format string, v ...interface{})
Info logs message with unitID prepended.
func (*BaseWorkUnit) Init ¶
func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string, fs FileSystemer, watcher WatcherWrapper)
Init initializes the basic work unit data, in memory only.
func (*BaseWorkUnit) LastUpdateError ¶
func (bwu *BaseWorkUnit) LastUpdateError() error
LastUpdateError returns the last error (including nil) resulting from an UpdateBasicStatus or UpdateFullStatus.
func (*BaseWorkUnit) MonitorLocalStatus ¶
func (bwu *BaseWorkUnit) MonitorLocalStatus()
MonitorLocalStatus watches a unit dir and keeps the in-memory workUnit up to date with status changes.
func (*BaseWorkUnit) Release ¶
func (bwu *BaseWorkUnit) Release(force bool) error
Release releases this unit of work, deleting its files.
func (*BaseWorkUnit) SetFromParams ¶
func (bwu *BaseWorkUnit) SetFromParams(_ map[string]string) error
SetFromParams sets the in-memory state from parameters.
func (*BaseWorkUnit) Status ¶
func (bwu *BaseWorkUnit) Status() *StatusFileData
Status returns a copy of the status currently loaded in memory (use Load to get it from disk).
func (*BaseWorkUnit) StatusFileName ¶
func (bwu *BaseWorkUnit) StatusFileName() string
StatusFileName returns the full path to the status file in the unit dir.
func (*BaseWorkUnit) StdoutFileName ¶
func (bwu *BaseWorkUnit) StdoutFileName() string
StdoutFileName returns the full path to the stdout file in the unit dir.
func (*BaseWorkUnit) UnitDir ¶
func (bwu *BaseWorkUnit) UnitDir() string
UnitDir returns the unit directory of this work unit.
func (*BaseWorkUnit) UnredactedStatus ¶
func (bwu *BaseWorkUnit) UnredactedStatus() *StatusFileData
UnredactedStatus returns a copy of the status currently loaded in memory, including secrets.
func (*BaseWorkUnit) UpdateBasicStatus ¶
func (bwu *BaseWorkUnit) UpdateBasicStatus(state int, detail string, stdoutSize int64)
UpdateBasicStatus atomically updates key fields in the status metadata file. Errors are logged rather than returned. Passing -1 as stdoutSize leaves it unchanged.
func (*BaseWorkUnit) UpdateFullStatus ¶
func (bwu *BaseWorkUnit) UpdateFullStatus(statusFunc func(*StatusFileData))
UpdateFullStatus atomically updates the whole status record. Changes should be made in the callback function. Errors are logged rather than returned.
func (*BaseWorkUnit) Warning ¶
func (bwu *BaseWorkUnit) Warning(format string, v ...interface{})
Warning logs message with unitID prepended.
type CommandWorkerCfg ¶
type CommandWorkerCfg struct { WorkType string `required:"true" description:"Name for this worker type"` Command string `required:"true" description:"Command to run to process units of work"` Params string `description:"Command-line parameters"` AllowRuntimeParams bool `description:"Allow users to add more parameters" default:"false"` VerifySignature bool `description:"Verify a signed work submission" default:"false"` }
CommandWorkerCfg is the cmdline configuration object for a worker that runs a command.
func (CommandWorkerCfg) GetVerifySignature ¶
func (cfg CommandWorkerCfg) GetVerifySignature() bool
func (CommandWorkerCfg) GetWorkType ¶
func (cfg CommandWorkerCfg) GetWorkType() string
func (CommandWorkerCfg) NewWorker ¶
func (cfg CommandWorkerCfg) NewWorker(w *Workceptor, unitID string, workType string) WorkUnit
type FileReadCloser ¶
type FileReadCloser interface { io.ReadCloser }
FileReadCloser wraps io.ReadCloser.
type FileSystem ¶
type FileSystem struct{}
FileSystem represents the real filesystem.
func (FileSystem) RemoveAll ¶
func (FileSystem) RemoveAll(path string) error
RemoveAll removes path and any children it contains.
type FileSystemer ¶
type FileSystemer interface { OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) Stat(name string) (os.FileInfo, error) Open(name string) (*os.File, error) RemoveAll(path string) error }
FileSystemer represents a filesystem.
type FileWriteCloser ¶
type FileWriteCloser interface { io.WriteCloser }
FileWriteCloser wraps io.WriteCloser.
type KubeWorkerCfg ¶
type KubeWorkerCfg struct { WorkType string `required:"true" description:"Name for this worker type"` Namespace string `description:"Kubernetes namespace to create pods in"` Image string `description:"Container image to use for the worker pod"` Command string `description:"Command to run in the container (overrides entrypoint)"` Params string `description:"Command-line parameters to pass to the entrypoint"` AuthMethod string `description:"One of: kubeconfig, incluster" default:"incluster"` KubeConfig string `description:"Kubeconfig filename (for authmethod=kubeconfig)"` Pod string `description:"Pod definition filename, in json or yaml format"` AllowRuntimeAuth bool `description:"Allow passing API parameters at runtime" default:"false"` AllowRuntimeCommand bool `description:"Allow specifying image & command at runtime" default:"false"` AllowRuntimeParams bool `description:"Allow adding command parameters at runtime" default:"false"` AllowRuntimePod bool `description:"Allow passing Pod at runtime" default:"false"` DeletePodOnRestart bool `description:"On restart, delete the pod if in pending state" default:"true"` StreamMethod string `description:"Method for connecting to worker pods: logger or tcp" default:"logger"` VerifySignature bool `description:"Verify a signed work submission" default:"false"` }
KubeWorkerCfg is the cmdline configuration object for a Kubernetes worker plugin.
func (KubeWorkerCfg) GetVerifySignature ¶
func (cfg KubeWorkerCfg) GetVerifySignature() bool
func (KubeWorkerCfg) GetWorkType ¶
func (cfg KubeWorkerCfg) GetWorkType() string
func (KubeWorkerCfg) NewWorker ¶
func (cfg KubeWorkerCfg) NewWorker(w *Workceptor, unitID string, workType string) WorkUnit
NewWorker is a factory to produce worker instances.
func (KubeWorkerCfg) Prepare ¶
func (cfg KubeWorkerCfg) Prepare() error
Prepare inspects the configuration for validity.
type NetceptorForWorkceptor ¶
type NetceptorForWorkceptor interface { NodeID() string AddWorkCommand(typeName string, verifySignature bool) error GetClientTLSConfig(name string, expectedHostName string, expectedHostNameType netceptor.ExpectedHostnameType) (*tls.Config, error) // have a common pkg for types GetLogger() *logger.ReceptorLogger DialContext(ctx context.Context, node string, service string, tlscfg *tls.Config) (*netceptor.Conn, error) // create an interface for Conn }
NetceptorForWorkceptor is an interface to decouple workceptor from netceptor. It includes only the functions that workceptor uses.
type NewWorkerFunc ¶
type NewWorkerFunc func(w *Workceptor, unitID string, workType string) WorkUnit
NewWorkerFunc represents a factory of WorkUnit instances.
type RealWatcher ¶
type RealWatcher struct {
// contains filtered or unexported fields
}
func (*RealWatcher) Add ¶
func (rw *RealWatcher) Add(name string) error
func (*RealWatcher) Close ¶
func (rw *RealWatcher) Close() error
func (*RealWatcher) EventChannel ¶
func (rw *RealWatcher) EventChannel() chan fsnotify.Event
type STDinReader ¶
type STDinReader struct {
// contains filtered or unexported fields
}
STDinReader reads from a stdin file and provides a Done function.
func NewStdinReader ¶
func NewStdinReader(fs FileSystemer, unitdir string) (*STDinReader, error)
NewStdinReader allocates a new stdinReader, which reads from a stdin file and provides a Done function.
func (*STDinReader) Done ¶
func (sr *STDinReader) Done() <-chan struct{}
Done returns a channel that will be closed on error (including EOF) in the reader.
func (*STDinReader) Error ¶
func (sr *STDinReader) Error() error
Error returns the most recent error encountered in the reader.
func (*STDinReader) Read ¶
func (sr *STDinReader) Read(p []byte) (n int, err error)
Read reads data from the stdin file, implementing io.Reader.
func (*STDinReader) SetReader ¶
func (sr *STDinReader) SetReader(reader FileReadCloser)
SetReader sets the reader var.
type STDoutWriter ¶
type STDoutWriter struct {
// contains filtered or unexported fields
}
STDoutWriter writes to a stdout file while also updating the status file.
func NewStdoutWriter ¶
func NewStdoutWriter(fs FileSystemer, unitdir string) (*STDoutWriter, error)
NewStdoutWriter allocates a new stdoutWriter, which writes to both the stdout and status files.
func (*STDoutWriter) SetWriter ¶
func (sw *STDoutWriter) SetWriter(writer FileWriteCloser)
SetWriter sets the writer var.
func (*STDoutWriter) Size ¶
func (sw *STDoutWriter) Size() int64
Size returns the current size of the stdout file.
type SigningKeyPrivateCfg ¶
type SigningKeyPrivateCfg struct { PrivateKey string `description:"Private key to sign work submissions" barevalue:"yes" default:""` TokenExpiration string `description:"Expiration of the signed json web token, e.g. 3h or 3h30m" default:""` }
func (SigningKeyPrivateCfg) Prepare ¶
func (cfg SigningKeyPrivateCfg) Prepare() error
func (SigningKeyPrivateCfg) PrepareSigningKeyPrivateCfg ¶
func (cfg SigningKeyPrivateCfg) PrepareSigningKeyPrivateCfg() (*time.Duration, error)
type StatusFileData ¶
type StatusFileData struct { State int Detail string StdoutSize int64 WorkType string ExtraData interface{} }
StatusFileData is the structure of the JSON data saved to a status file. This struct should only contain value types, except for ExtraData.
func (*StatusFileData) Load ¶
func (sfd *StatusFileData) Load(filename string) error
Load loads status from a file.
func (*StatusFileData) Save ¶
func (sfd *StatusFileData) Save(filename string) error
Save saves status to a file.
func (*StatusFileData) UpdateBasicStatus ¶
func (sfd *StatusFileData) UpdateBasicStatus(filename string, state int, detail string, stdoutSize int64) error
UpdateBasicStatus atomically updates key fields in the status metadata file. Any error encountered is returned to the caller. Passing -1 as stdoutSize leaves it unchanged.
func (*StatusFileData) UpdateFullStatus ¶
func (sfd *StatusFileData) UpdateFullStatus(filename string, statusFunc func(*StatusFileData)) error
UpdateFullStatus atomically updates the status metadata file. Changes should be made in the callback function. Any error encountered is returned to the caller.
type VerifyingKeyPublicCfg ¶
type VerifyingKeyPublicCfg struct {
PublicKey string `description:"Public key to verify signed work submissions" barevalue:"yes" default:""`
}
func (VerifyingKeyPublicCfg) Prepare ¶
func (cfg VerifyingKeyPublicCfg) Prepare() error
func (VerifyingKeyPublicCfg) PrepareVerifyingKeyPublicCfg ¶
func (cfg VerifyingKeyPublicCfg) PrepareVerifyingKeyPublicCfg() error
type WatcherWrapper ¶
type WatcherWrapper interface { Add(name string) error Close() error EventChannel() chan fsnotify.Event }
WatcherWrapper wraps the fsnotify Watcher struct and exposes the Event chan within.
type WorkUnit ¶
type WorkUnit interface { ID() string UnitDir() string StatusFileName() string StdoutFileName() string Save() error Load() error SetFromParams(params map[string]string) error UpdateBasicStatus(state int, detail string, stdoutSize int64) UpdateFullStatus(statusFunc func(*StatusFileData)) LastUpdateError() error Status() *StatusFileData UnredactedStatus() *StatusFileData Start() error Restart() error Cancel() error Release(force bool) error }
WorkUnit represents a local unit of work.
type Workceptor ¶
type Workceptor struct { Cancel context.CancelFunc SigningKey string SigningExpiration time.Duration VerifyingKey string // contains filtered or unexported fields }
Workceptor is the main object that handles unit-of-work management.
var MainInstance *Workceptor
MainInstance is the global instance of Workceptor instantiated by the command-line main() function.
func New ¶
func New(ctx context.Context, nc NetceptorForWorkceptor, dataDir string) (*Workceptor, error)
New constructs a new Workceptor instance.
func (*Workceptor) AllocateRemoteUnit ¶
func (w *Workceptor) AllocateRemoteUnit(remoteNode, remoteWorkType, tlsClient, ttl string, signWork bool, params map[string]string) (WorkUnit, error)
AllocateRemoteUnit creates a new remote work unit and generates a local identifier for it.
func (*Workceptor) AllocateUnit ¶
func (w *Workceptor) AllocateUnit(workTypeName string, params map[string]string) (WorkUnit, error)
AllocateUnit creates a new local work unit and generates an identifier for it.
func (*Workceptor) CancelUnit ¶
func (w *Workceptor) CancelUnit(unitID string) error
CancelUnit cancels a unit of work, killing any processes.
func (*Workceptor) GetResults ¶
func (w *Workceptor) GetResults(ctx context.Context, unitID string, startPos int64) (chan []byte, error)
GetResults returns a live stream of the results of a unit.
func (*Workceptor) ListKnownUnitIDs ¶
func (w *Workceptor) ListKnownUnitIDs() []string
ListKnownUnitIDs returns a slice containing the known unit IDs.
func (*Workceptor) RegisterWithControlService ¶
func (w *Workceptor) RegisterWithControlService(cs *controlsvc.Server) error
RegisterWithControlService registers this workceptor instance with a control service instance.
func (*Workceptor) RegisterWorker ¶
func (w *Workceptor) RegisterWorker(typeName string, newWorkerFunc NewWorkerFunc, verifySignature bool) error
RegisterWorker notifies the Workceptor of a new kind of work that can be done.
func (*Workceptor) ReleaseUnit ¶
func (w *Workceptor) ReleaseUnit(unitID string, force bool) error
ReleaseUnit releases (deletes) resources from a unit of work, including stdout. Release implies Cancel.
func (*Workceptor) ShouldVerifySignature ¶
func (w *Workceptor) ShouldVerifySignature(workType string, signWork bool) bool
func (*Workceptor) StartUnit ¶
func (w *Workceptor) StartUnit(unitID string) error
StartUnit starts a unit of work.
func (*Workceptor) UnitStatus ¶
func (w *Workceptor) UnitStatus(unitID string) (*StatusFileData, error)
UnitStatus returns the state of a unit.
func (*Workceptor) VerifySignature ¶
func (w *Workceptor) VerifySignature(signature string) error
type WorkerConfig ¶
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| mock_workceptor | Package mock_workceptor is a generated GoMock package. |