Documentation ¶
Index ¶
- Constants
- Variables
- func IsComplete(workState int) bool
- func IsPending(err error) bool
- func WorkStateToString(workState int) string
- type BaseWorkUnit
- func (bwu *BaseWorkUnit) ID() string
- func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string)
- func (bwu *BaseWorkUnit) LastUpdateError() error
- func (bwu *BaseWorkUnit) Load() error
- func (bwu *BaseWorkUnit) Release(force bool) error
- func (bwu *BaseWorkUnit) Save() error
- func (bwu *BaseWorkUnit) SetFromParams(params map[string]string) error
- func (bwu *BaseWorkUnit) Status() *StatusFileData
- func (bwu *BaseWorkUnit) StatusFileName() string
- func (bwu *BaseWorkUnit) StdoutFileName() string
- func (bwu *BaseWorkUnit) UnitDir() string
- func (bwu *BaseWorkUnit) UnredactedStatus() *StatusFileData
- func (bwu *BaseWorkUnit) UpdateBasicStatus(state int, detail string, stdoutSize int64)
- func (bwu *BaseWorkUnit) UpdateFullStatus(statusFunc func(*StatusFileData))
- type NewWorkerFunc
- type StatusFileData
- func (sfd *StatusFileData) Load(filename string) error
- func (sfd *StatusFileData) Save(filename string) error
- func (sfd *StatusFileData) UpdateBasicStatus(filename string, state int, detail string, stdoutSize int64) error
- func (sfd *StatusFileData) UpdateFullStatus(filename string, statusFunc func(*StatusFileData)) error
- type WorkUnit
- type Workceptor
- func (w *Workceptor) AllocateRemoteUnit(remoteNode, remoteWorkType, tlsClient, ttl string, signWork bool, ...) (WorkUnit, error)
- func (w *Workceptor) AllocateUnit(workTypeName string, params map[string]string) (WorkUnit, error)
- func (w *Workceptor) CancelUnit(unitID string) error
- func (w *Workceptor) GetResults(ctx context.Context, unitID string, startPos int64) (chan []byte, error)
- func (w *Workceptor) ListKnownUnitIDs() []string
- func (w *Workceptor) RegisterWithControlService(cs *controlsvc.Server) error
- func (w *Workceptor) RegisterWorker(typeName string, newWorkerFunc NewWorkerFunc, verifySignature bool) error
- func (w *Workceptor) ReleaseUnit(unitID string, force bool) error
- func (w *Workceptor) ShouldVerifySignature(workType string, signWork bool) bool
- func (w *Workceptor) StartUnit(unitID string) error
- func (w *Workceptor) UnitStatus(unitID string) (*StatusFileData, error)
- func (w *Workceptor) VerifySignature(signature string) error
Constants ¶
const ( SuccessWorkSleep = 1 * time.Second // Normal time to wait between checks MaxWorkSleep = 1 * time.Minute // Max time to ever wait between checks )
Work sleep constants.
const ( WorkStatePending = 0 WorkStateRunning = 1 WorkStateSucceeded = 2 WorkStateFailed = 3 )
Work state constants.
Variables ¶
var ErrImagePullBackOff = fmt.Errorf("container failed to start")
ErrImagePullBackOff is returned when the image for the container in the Pod cannot be pulled.
var ErrPending = fmt.Errorf("operation pending")
ErrPending is returned when an operation hasn't succeeded or failed yet.
var ErrPodCompleted = fmt.Errorf("pod ran to completion")
ErrPodCompleted is returned when pod has already completed before we could attach.
Functions ¶
func IsComplete ¶
IsComplete returns true if a given WorkState indicates the job is finished.
func WorkStateToString ¶
WorkStateToString returns a string representation of a WorkState.
Types ¶
type BaseWorkUnit ¶
type BaseWorkUnit struct {
// contains filtered or unexported fields
}
BaseWorkUnit includes data common to all work units, and partially implements the WorkUnit interface.
func (*BaseWorkUnit) ID ¶
func (bwu *BaseWorkUnit) ID() string
ID returns the unique identifier of this work unit.
func (*BaseWorkUnit) Init ¶
func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string)
Init initializes the basic work unit data, in memory only.
func (*BaseWorkUnit) LastUpdateError ¶
func (bwu *BaseWorkUnit) LastUpdateError() error
LastUpdateError returns the last error (including nil) resulting from an UpdateBasicStatus or UpdateFullStatus.
func (*BaseWorkUnit) Release ¶
func (bwu *BaseWorkUnit) Release(force bool) error
Release releases this unit of work, deleting its files.
func (*BaseWorkUnit) SetFromParams ¶
func (bwu *BaseWorkUnit) SetFromParams(params map[string]string) error
SetFromParams sets the in-memory state from parameters.
func (*BaseWorkUnit) Status ¶
func (bwu *BaseWorkUnit) Status() *StatusFileData
Status returns a copy of the status currently loaded in memory (use Load to get it from disk).
func (*BaseWorkUnit) StatusFileName ¶
func (bwu *BaseWorkUnit) StatusFileName() string
StatusFileName returns the full path to the status file in the unit dir.
func (*BaseWorkUnit) StdoutFileName ¶
func (bwu *BaseWorkUnit) StdoutFileName() string
StdoutFileName returns the full path to the stdout file in the unit dir.
func (*BaseWorkUnit) UnitDir ¶
func (bwu *BaseWorkUnit) UnitDir() string
UnitDir returns the unit directory of this work unit.
func (*BaseWorkUnit) UnredactedStatus ¶
func (bwu *BaseWorkUnit) UnredactedStatus() *StatusFileData
UnredactedStatus returns a copy of the status currently loaded in memory, including secrets.
func (*BaseWorkUnit) UpdateBasicStatus ¶
func (bwu *BaseWorkUnit) UpdateBasicStatus(state int, detail string, stdoutSize int64)
UpdateBasicStatus atomically updates key fields in the status metadata file. Errors are logged rather than returned. Passing -1 as stdoutSize leaves it unchanged.
func (*BaseWorkUnit) UpdateFullStatus ¶
func (bwu *BaseWorkUnit) UpdateFullStatus(statusFunc func(*StatusFileData))
UpdateFullStatus atomically updates the whole status record. Changes should be made in the callback function. Errors are logged rather than returned.
type NewWorkerFunc ¶
type NewWorkerFunc func(w *Workceptor, unitID string, workType string) WorkUnit
NewWorkerFunc represents a factory of WorkUnit instances.
type StatusFileData ¶
type StatusFileData struct { State int Detail string StdoutSize int64 WorkType string ExtraData interface{} }
StatusFileData is the structure of the JSON data saved to a status file. This struct should only contain value types, except for ExtraData.
func (*StatusFileData) Load ¶
func (sfd *StatusFileData) Load(filename string) error
Load loads status from a file.
func (*StatusFileData) Save ¶
func (sfd *StatusFileData) Save(filename string) error
Save saves status to a file.
func (*StatusFileData) UpdateBasicStatus ¶
func (sfd *StatusFileData) UpdateBasicStatus(filename string, state int, detail string, stdoutSize int64) error
UpdateBasicStatus atomically updates key fields in the status metadata file. Errors are logged rather than returned. Passing -1 as stdoutSize leaves it unchanged.
func (*StatusFileData) UpdateFullStatus ¶
func (sfd *StatusFileData) UpdateFullStatus(filename string, statusFunc func(*StatusFileData)) error
UpdateFullStatus atomically updates the status metadata file. Changes should be made in the callback function. Errors are logged rather than returned.
type WorkUnit ¶
type WorkUnit interface { ID() string UnitDir() string StatusFileName() string StdoutFileName() string Save() error Load() error SetFromParams(params map[string]string) error UpdateBasicStatus(state int, detail string, stdoutSize int64) UpdateFullStatus(statusFunc func(*StatusFileData)) LastUpdateError() error Status() *StatusFileData UnredactedStatus() *StatusFileData Start() error Restart() error Cancel() error Release(force bool) error }
WorkUnit represents a local unit of work.
type Workceptor ¶
type Workceptor struct {
// contains filtered or unexported fields
}
Workceptor is the main object that handles unit-of-work management.
var MainInstance *Workceptor
MainInstance is the global instance of Workceptor instantiated by the command-line main() function.
func (*Workceptor) AllocateRemoteUnit ¶
func (w *Workceptor) AllocateRemoteUnit(remoteNode, remoteWorkType, tlsClient, ttl string, signWork bool, params map[string]string) (WorkUnit, error)
AllocateRemoteUnit creates a new remote work unit and generates a local identifier for it.
func (*Workceptor) AllocateUnit ¶
AllocateUnit creates a new local work unit and generates an identifier for it.
func (*Workceptor) CancelUnit ¶
func (w *Workceptor) CancelUnit(unitID string) error
CancelUnit cancels a unit of work, killing any processes.
func (*Workceptor) GetResults ¶
func (w *Workceptor) GetResults(ctx context.Context, unitID string, startPos int64) (chan []byte, error)
GetResults returns a live stream of the results of a unit.
func (*Workceptor) ListKnownUnitIDs ¶
func (w *Workceptor) ListKnownUnitIDs() []string
ListKnownUnitIDs returns a slice containing the known unit IDs.
func (*Workceptor) RegisterWithControlService ¶
func (w *Workceptor) RegisterWithControlService(cs *controlsvc.Server) error
RegisterWithControlService registers this workceptor instance with a control service instance.
func (*Workceptor) RegisterWorker ¶
func (w *Workceptor) RegisterWorker(typeName string, newWorkerFunc NewWorkerFunc, verifySignature bool) error
RegisterWorker notifies the Workceptor of a new kind of work that can be done.
func (*Workceptor) ReleaseUnit ¶
func (w *Workceptor) ReleaseUnit(unitID string, force bool) error
ReleaseUnit releases (deletes) resources from a unit of work, including stdout. Release implies Cancel.
func (*Workceptor) ShouldVerifySignature ¶
func (w *Workceptor) ShouldVerifySignature(workType string, signWork bool) bool
func (*Workceptor) StartUnit ¶
func (w *Workceptor) StartUnit(unitID string) error
StartUnit starts a unit of work.
func (*Workceptor) UnitStatus ¶
func (w *Workceptor) UnitStatus(unitID string) (*StatusFileData, error)
UnitStatus returns the state of a unit.
func (*Workceptor) VerifySignature ¶
func (w *Workceptor) VerifySignature(signature string) error