Documentation ¶
Overview ¶
Package process implements background processes, which are listed at the ../processes endpoint.
Index ¶
- Constants
- type ErrorAction
- type Process
- func NewProcess(name string, worker Worker, store storage.Store, scheduleFrequency time.Duration, defaultSettings *pb.Process_Params) *Process
- func (p *Process) AddError(err error, workStatus *pb.Process_Status, state *pb.Process) ErrorAction
- func (p *Process) AddStats(count float64, name string, state *pb.Process)
- func (p *Process) AddWorkError(err error, workName string, state *pb.Process) ErrorAction
- func (p *Process) AddWorkStats(count float64, stat, workName string, state *pb.Process)
- func (p *Process) DefaultSettings() *pb.Process_Params
- func (p *Process) Progress(state *pb.Process) (Progress, error)
- func (p *Process) RegisterWork(workName string, workParams *pb.Process_Params, tx storage.Tx) (_ *pb.Process_Work, ferr error)
- func (p *Process) Run(ctx context.Context)
- func (p *Process) ScheduleFrequency() time.Duration
- func (p *Process) UnregisterWork(workName string, tx storage.Tx) (ferr error)
- func (p *Process) UpdateFlowControl(initialWaitDuration time.Duration, minScheduleFrequency time.Duration) error
- func (p *Process) UpdateSettings(scheduleFrequency time.Duration, settings *pb.Process_Params, tx storage.Tx) (ferr error)
- type Progress
- type Worker
Constants ¶
const (
	// Continue indicates the error was within max error tolerance.
	Continue ErrorAction = "Continue"
	// Abort indicates this error exceeds max error tolerance.
	Abort ErrorAction = "Abort"

	// Completed indicates that execution has terminated normally due to completion of work.
	Completed Progress = "Completed"
	// Updated indicates that the state was updated in storage.
	Updated Progress = "Updated"
	// Merged indicates that the state was merged, then updated in storage.
	Merged Progress = "Merged"
	// Aborted indicates that errors caused execution to prematurely stop (incomplete).
	Aborted Progress = "Aborted"
	// Conflict indicates that the state ownership was taken over by another instance.
	// Unlike Aborted, the Conflict level indicates that any further writes of state
	// to storage should not be attempted.
	Conflict Progress = "Conflict"
	// None indicates that there was no storage update at this time.
	None Progress = "None"
)
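As a rough illustration of how calling code might react to the Progress values above, the sketch below redeclares the Progress type and its constants locally so that it compiles without this package. The helpers canWriteState and isTerminal are hypothetical and not part of the package API; treating Conflict as terminal for the current instance is an assumption drawn from the constant comments.

```go
package main

import "fmt"

// Progress mirrors the package's Progress type locally so the sketch is self-contained.
type Progress string

const (
	Completed Progress = "Completed"
	Updated   Progress = "Updated"
	Merged    Progress = "Merged"
	Aborted   Progress = "Aborted"
	Conflict  Progress = "Conflict"
	None      Progress = "None"
)

// canWriteState is a hypothetical helper. Per the constant comments, only
// Conflict means further writes of state to storage should not be attempted.
func canWriteState(p Progress) bool {
	return p != Conflict
}

// isTerminal is a hypothetical helper reporting whether this instance should
// stop working (assumption: Completed, Aborted, and Conflict all end the run).
func isTerminal(p Progress) bool {
	switch p {
	case Completed, Aborted, Conflict:
		return true
	default:
		return false
	}
}

func main() {
	for _, p := range []Progress{Completed, Updated, Merged, Aborted, Conflict, None} {
		fmt.Printf("%-9s terminal=%v canWrite=%v\n", p, isTerminal(p), canWriteState(p))
	}
}
```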
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ErrorAction ¶
type ErrorAction string
ErrorAction indicates how an AddError or AddWorkError should be handled.
type Process ¶
type Process struct {
// contains filtered or unexported fields
}
Process is a background process that performs work at a scheduled frequency.
func NewProcess ¶
func NewProcess(name string, worker Worker, store storage.Store, scheduleFrequency time.Duration, defaultSettings *pb.Process_Params) *Process
NewProcess creates a new process to perform work of a given name. The process triggers every "scheduleFrequency", and workers report status updates back to the storage layer every "progressFrequency".
- If the process is not found in the storage layer, it will initialize with "defaultSettings".
- scheduleFrequency may be adjusted to meet schedule frequency constraints.
func (*Process) AddError ¶
func (p *Process) AddError(err error, workStatus *pb.Process_Status, state *pb.Process) ErrorAction
AddError adds error state to a given status block. Set "workStatus" to nil if the error is not specific to a work item.
func (*Process) AddStats ¶
func (p *Process) AddStats(count float64, name string, state *pb.Process)
AddStats will increment metrics of a given name within the process status.
func (*Process) AddWorkError ¶
func (p *Process) AddWorkError(err error, workName string, state *pb.Process) ErrorAction
AddWorkError will add error state to a given work item status block as well as the process status block.
func (*Process) AddWorkStats ¶
func (p *Process) AddWorkStats(count float64, stat, workName string, state *pb.Process)
AddWorkStats will increment metrics of a given name within the work item and process status.
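To make the "work item and process" wording concrete, here is a minimal sketch that increments a named counter in two places. The status and state types are simplified, hypothetical stand-ins for the pb.Process messages, and creating a missing work item on demand is an assumption of this sketch rather than documented behavior.

```go
package main

import "fmt"

// status is a hypothetical, simplified status block holding named counters.
type status struct {
	stats map[string]float64
}

// add increments the named counter, allocating the map on first use.
func (s *status) add(name string, count float64) {
	if s.stats == nil {
		s.stats = make(map[string]float64)
	}
	s.stats[name] += count
}

// state is a hypothetical, simplified stand-in for pb.Process.
type state struct {
	process status
	work    map[string]*status
}

// addWorkStats increments stat on the named work item and on the process as a whole.
func addWorkStats(count float64, stat, workName string, st *state) {
	if st.work == nil {
		st.work = make(map[string]*status)
	}
	w, ok := st.work[workName]
	if !ok {
		// Assumption: unknown work items are created on demand.
		w = &status{}
		st.work[workName] = w
	}
	w.add(stat, count)
	st.process.add(stat, count)
}

func main() {
	st := &state{}
	addWorkStats(2, "items", "a", st)
	addWorkStats(3, "items", "b", st)
	fmt.Println(st.work["a"].stats["items"], st.work["b"].stats["items"], st.process.stats["items"]) // prints "2 3 5"
}
```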
func (*Process) DefaultSettings ¶
func (p *Process) DefaultSettings() *pb.Process_Params
DefaultSettings returns the default settings.
func (*Process) Progress ¶
func (p *Process) Progress(state *pb.Process) (Progress, error)
Progress is called by workers after every one or more units of work and may update the underlying state. It returns a Progress value that indicates whether and how an update occurred. Important note: take caution, as maps in the state may have been merged with data from the storage layer. If so, Merged is returned.
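One way a worker loop might call Progress every few units of work is sketched below. The reportFunc type is a hypothetical stand-in for (*Process).Progress, the Progress constants are redeclared locally, and the choice to stop on Conflict or Aborted is an assumption based on the constant comments.

```go
package main

import "fmt"

// Progress mirrors the package's type locally so the sketch compiles on its own.
type Progress string

const (
	Updated  Progress = "Updated"
	Merged   Progress = "Merged"
	Aborted  Progress = "Aborted"
	Conflict Progress = "Conflict"
	None     Progress = "None"
)

// reportFunc is a hypothetical stand-in for a call to (*Process).Progress.
type reportFunc func() (Progress, error)

// doWork performs `units` units of work and reports progress after every
// `every` units. It returns how many reports were made.
func doWork(units, every int, report reportFunc) (int, error) {
	reports := 0
	for i := 1; i <= units; i++ {
		// ... perform one unit of work here ...
		if i%every != 0 {
			continue
		}
		reports++
		p, err := report()
		if err != nil {
			return reports, err
		}
		switch p {
		case Conflict, Aborted:
			// Assumption: stop working once ownership is lost or the run is aborted.
			return reports, nil
		case Merged:
			// Maps in the state may now hold data from storage; re-read them
			// before relying on earlier local values.
		}
	}
	return reports, nil
}

func main() {
	n, err := doWork(10, 3, func() (Progress, error) { return Updated, nil })
	fmt.Println(n, err) // prints "3 <nil>"
}
```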
func (*Process) RegisterWork ¶
func (p *Process) RegisterWork(workName string, workParams *pb.Process_Params, tx storage.Tx) (_ *pb.Process_Work, ferr error)
RegisterWork adds a work item to the state for workers to process.
func (*Process) Run ¶
func (p *Process) Run(ctx context.Context)
Run schedules a background process. Typically this will be on its own goroutine.
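The goroutine pattern can be sketched as follows. The runner type is a hypothetical stand-in for *Process, and the sketch assumes that Run blocks until its context is cancelled, which is not stated in this documentation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runner is a hypothetical stand-in for *Process with a Run method of the same shape.
type runner struct {
	cycles int
}

// Run loops on a short ticker until ctx is cancelled (assumed behavior).
func (r *runner) Run(ctx context.Context) {
	ticker := time.NewTicker(5 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			r.cycles++
		}
	}
}

// startAndStop launches Run on its own goroutine, lets it run for ms
// milliseconds, cancels it, and reports whether Run returned within a second.
func startAndStop(r *runner, ms int) bool {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		defer close(done)
		r.Run(ctx)
	}()
	time.Sleep(time.Duration(ms) * time.Millisecond)
	cancel()
	select {
	case <-done:
		return true
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	r := &runner{}
	fmt.Println("stopped cleanly:", startAndStop(r, 20))
}
```

Closing the done channel when Run returns lets the caller wait for a clean shutdown instead of abandoning the goroutine.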
func (*Process) ScheduleFrequency ¶
func (p *Process) ScheduleFrequency() time.Duration
ScheduleFrequency returns the schedule frequency.
func (*Process) UnregisterWork ¶
func (p *Process) UnregisterWork(workName string, tx storage.Tx) (ferr error)
UnregisterWork (eventually) removes a work item from the active state, and allows cleanup work to be performed.
func (*Process) UpdateFlowControl ¶
func (p *Process) UpdateFlowControl(initialWaitDuration time.Duration, minScheduleFrequency time.Duration) error
UpdateFlowControl alters the settings that govern how the flow of processing is managed. These are advanced settings and should be managed carefully when used outside of tests. Base them on the size of the processing work between updates and the expected total time for each run, with sufficient tolerance for errors and retries to minimize collisions in which two or more workers grab control of the state.
func (*Process) UpdateSettings ¶
func (p *Process) UpdateSettings(scheduleFrequency time.Duration, settings *pb.Process_Params, tx storage.Tx) (ferr error)
UpdateSettings alters resource management settings.
type Worker ¶
type Worker interface {
	// ProcessActiveWork has a worker perform the work needed to process an active work item.
	ProcessActiveWork(ctx context.Context, state *pb.Process, workName string, work *pb.Process_Work, process *Process) error
	// CleanupWork has a worker perform the work needed to clean up a work item that was active previously.
	CleanupWork(ctx context.Context, state *pb.Process, workName string, process *Process) error
	// Wait indicates that the worker should wait for the next active cycle to begin. Return false to exit worker.
	Wait(ctx context.Context, duration time.Duration) bool
}
Worker represents a process that performs work on the set of work items provided.
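A Wait method with the signature above could plausibly be written with a timer and the context's Done channel. The sleeper type below is hypothetical and implements only Wait, not the full Worker interface; returning false on context cancellation is this sketch's reading of "Return false to exit worker".

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sleeper is a hypothetical type that implements only the Wait method.
type sleeper struct{}

// Wait blocks for duration and returns true, or returns false as soon as
// ctx is done so that the worker can exit.
func (sleeper) Wait(ctx context.Context, duration time.Duration) bool {
	timer := time.NewTimer(duration)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}

// waitOutcomes calls Wait once with a live context and once with an
// already-cancelled context.
func waitOutcomes() (live, cancelled bool) {
	var s sleeper
	live = s.Wait(context.Background(), 10*time.Millisecond)

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	cancelled = s.Wait(ctx, time.Hour)
	return live, cancelled
}

func main() {
	live, cancelled := waitOutcomes()
	fmt.Println(live, cancelled) // prints "true false"
}
```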