Documentation ¶
Index ¶
- Constants
- func SemaSize(id string) int
- func TypeCmp(a, b reflect.Value) error
- type Engine
- func (obj *Engine) Abort() error
- func (obj *Engine) Apply(fn func(*pgraph.Graph) error) error
- func (obj *Engine) AutoEdge() error
- func (obj *Engine) AutoGroup(ag engine.AutoGrouper) error
- func (obj *Engine) BadTimestamps(vertex pgraph.Vertex) []pgraph.Vertex
- func (obj *Engine) Close() error
- func (obj *Engine) Commit() error
- func (obj *Engine) Graph() *pgraph.Graph
- func (obj *Engine) Init() error
- func (obj *Engine) Load(newGraph *pgraph.Graph) error
- func (obj *Engine) OKTimestamp(vertex pgraph.Vertex) bool
- func (obj *Engine) Pause(fastPause bool) error
- func (obj *Engine) Process(vertex pgraph.Vertex) error
- func (obj *Engine) RefreshPending(vertex pgraph.Vertex) bool
- func (obj *Engine) Resume() error
- func (obj *Engine) ReversalList() (map[string]string, error)
- func (obj *Engine) Reversals() error
- func (obj *Engine) SendRecv(res engine.RecvableRes) (map[string]bool, error)
- func (obj *Engine) SetDownstreamRefresh(vertex pgraph.Vertex, b bool)
- func (obj *Engine) SetFastPause()
- func (obj *Engine) SetUpstreamRefresh(vertex pgraph.Vertex, b bool)
- func (obj *Engine) Validate() error
- func (obj *Engine) Worker(vertex pgraph.Vertex) error
- type State
- func (obj *State) Close() error
- func (obj *State) Init() error
- func (obj *State) Pause() error
- func (obj *State) Poke()
- func (obj *State) Resume()
- func (obj *State) ReversalClose() error
- func (obj *State) ReversalDelete() error
- func (obj *State) ReversalInit() error
- func (obj *State) ReversalWrite(str string, overwrite bool) error
Constants ¶
const (
	// ReverseFile is the file name in the resource state dir where any
	// reversal information is stored.
	ReverseFile = "reverse"

	// ReversePerm is the permissions mode used to create the ReverseFile.
	ReversePerm = 0600
)
const SemaSep = ":"
SemaSep is the trailing separator to split the semaphore id from the size.
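To make the role of the separator concrete, the following sketch splits an id at its last separator and reads the size that follows. The function semaSize is a hypothetical stand-in for SemaSize, and the default of 1 for a missing or invalid size is an assumption, not something this page states.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// semaSep mirrors the SemaSep constant documented above.
const semaSep = ":"

// semaSize is a hypothetical stand-in for SemaSize. It returns the integer
// after the last separator, or 1 (an assumed default) when there is no
// separator or the suffix is not a positive integer.
func semaSize(id string) int {
	i := strings.LastIndex(id, semaSep)
	if i < 0 {
		return 1
	}
	n, err := strconv.Atoi(id[i+len(semaSep):])
	if err != nil || n < 1 {
		return 1
	}
	return n
}

func main() {
	for _, id := range []string{"some_id", "hello:42", ":13"} {
		fmt.Printf("%q -> %d\n", id, semaSize(id))
	}
}
```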
const (
	// StateDir is the name of the sub directory where all the local
	// resource state is stored.
	StateDir = "state"
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Engine ¶
type Engine struct {
	Program  string
	Hostname string
	World    engine.World

	// Prefix is a unique directory prefix which can be used. It should be
	// created if needed.
	Prefix    string
	Converger *converger.Coordinator

	Debug bool
	Logf  func(format string, v ...interface{})
	// contains filtered or unexported fields
}
Engine encapsulates a generic graph and manages its operations.
func (*Engine) Abort ¶
Abort the pending graph and any work in progress on it. After this call you may Load a new graph.
func (*Engine) Apply ¶
Apply a function to the pending graph. You must pass in a function which will receive this graph as input, and return an error if something does not succeed.
func (*Engine) AutoGroup ¶
func (obj *Engine) AutoGroup(ag engine.AutoGrouper) error
AutoGroup runs the auto grouping on the loaded graph.
func (*Engine) BadTimestamps ¶
BadTimestamps returns the list of vertices that are causing our timestamp to be bad.
func (*Engine) Commit ¶
Commit runs a graph sync and swaps the loaded graph with the current one. If it errors, then the running graph wasn't changed. It is recommended that you pause the engine before running this, and resume it after you're done.
func (*Engine) Init ¶
Init initializes the internal structures and starts the graph running. If the struct does not validate, or if it cannot initialize, then this errors. Initially it will contain an empty graph.
func (*Engine) Load ¶
Load a new graph into the engine. Offline graph operations will be performed on this graph. To switch it to the active graph, and run it, use Commit.
func (*Engine) OKTimestamp ¶
OKTimestamp returns true if this vertex can run right now.
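The relationship between BadTimestamps and OKTimestamp can be sketched as follows. The comparison rule used here is an assumption for illustration only: a vertex is taken to be runnable when every vertex it depends on carries a strictly newer timestamp. This page does not state the engine's actual rule, and the vertex type is hypothetical.

```go
package main

import "fmt"

// vertex is a hypothetical stand-in carrying a timestamp and the vertices it
// depends on (its incoming neighbours).
type vertex struct {
	name      string
	timestamp int64
	deps      []*vertex
}

// badTimestamps returns the dependencies that block v under the assumed rule:
// a dependency blocks v unless its timestamp is strictly newer than v's.
func badTimestamps(v *vertex) []*vertex {
	var bad []*vertex
	for _, d := range v.deps {
		if v.timestamp >= d.timestamp {
			bad = append(bad, d)
		}
	}
	return bad
}

// okTimestamp reports whether no dependency blocks v.
func okTimestamp(v *vertex) bool {
	return len(badTimestamps(v)) == 0
}

func main() {
	pkg := &vertex{name: "pkg", timestamp: 5}
	svc := &vertex{name: "svc", timestamp: 3, deps: []*vertex{pkg}}
	fmt.Println(okTimestamp(svc)) // prints "true"

	svc.timestamp = 5
	fmt.Println(okTimestamp(svc), badTimestamps(svc)[0].name) // prints "false pkg"
}
```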
func (*Engine) Process ¶
Process is the primary function to execute a particular vertex in the graph.
func (*Engine) RefreshPending ¶
RefreshPending determines if any previous nodes have a refresh pending here. If this is true, it means I am expected to apply a refresh when I next run.
func (*Engine) Resume ¶
Resume runs the currently active graph. It also un-pauses the graph if it was paused. Very little that is interesting should happen here. It all happens in the Commit method. After Commit, new things are already started, but we still need to Resume any pre-existing resources.
func (*Engine) ReversalList ¶
ReversalList returns all the available pending reversal data on this host. It can then be decoded by whatever method is appropriate.
func (*Engine) Reversals ¶
Reversals adds the reversals onto the loaded graph. This should happen last, and before Commit.
func (*Engine) SendRecv ¶
SendRecv pulls the sent values into the receive slots. It is called by the receiver and must be given as input the full resource struct to receive on. It applies the loaded values to the resource.
func (*Engine) SetDownstreamRefresh ¶
SetDownstreamRefresh sets the refresh value to any downstream vertices.
func (*Engine) SetFastPause ¶
func (obj *Engine) SetFastPause()
SetFastPause puts the graph into fast pause mode. This is usually done via the argument to the Pause command, but this method can be used if a pause was already started, and you'd like subsequent parts to pause quickly. Once in fast pause mode for a given pause action, you cannot switch to regular pause. This is because once you've started a fast pause, some dependencies might have been skipped when fast pausing, and future resources might have missed a poke. In general this is only called when you're trying to hurry up the exit. XXX: Not implemented
func (*Engine) SetUpstreamRefresh ¶
SetUpstreamRefresh sets the refresh value to any upstream vertices.
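One way to picture RefreshPending together with the two setters is to treat the refresh value as a flag on each edge. That storage choice, and the edge type and function names below, are assumptions of this sketch rather than something this page confirms.

```go
package main

import "fmt"

// edge is a hypothetical directed edge carrying a refresh flag.
type edge struct {
	from, to string
	refresh  bool
}

// refreshPending reports whether any edge pointing at vertex has its refresh
// flag set.
func refreshPending(edges []*edge, vertex string) bool {
	for _, e := range edges {
		if e.to == vertex && e.refresh {
			return true
		}
	}
	return false
}

// setDownstreamRefresh sets the flag on every edge leaving vertex.
func setDownstreamRefresh(edges []*edge, vertex string, b bool) {
	for _, e := range edges {
		if e.from == vertex {
			e.refresh = b
		}
	}
}

// setUpstreamRefresh sets the flag on every edge arriving at vertex.
func setUpstreamRefresh(edges []*edge, vertex string, b bool) {
	for _, e := range edges {
		if e.to == vertex {
			e.refresh = b
		}
	}
}

func main() {
	edges := []*edge{{from: "file", to: "svc"}, {from: "svc", to: "probe"}}

	setDownstreamRefresh(edges, "file", true)
	fmt.Println(refreshPending(edges, "svc"), refreshPending(edges, "probe")) // prints "true false"

	// Once svc has applied the refresh, it clears its incoming flags.
	setUpstreamRefresh(edges, "svc", false)
	fmt.Println(refreshPending(edges, "svc")) // prints "false"
}
```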
func (*Engine) Validate ¶
Validate validates the pending graph to ensure it is appropriate for the engine. This should be called before Commit to avoid any surprises there! This prevents an error on Commit which could cause an engine shutdown.
func (*Engine) Worker ¶
Worker is the common run frontend of the vertex. It handles all of the retry and retry delay common code, and ultimately returns the final status of this vertex execution. This function cannot be "re-run" for the same vertex. The retry mechanism stuff happens inside of this. To actually "re-run" you need to remove the vertex and build a new one. The engine guarantees that we do not allow CheckApply to run while we are paused. That is enforced here.
type State ¶
type State struct {
	// Graph is a pointer to the graph that this vertex is part of.
	Graph *pgraph.Graph

	// Vertex is the pointer in the graph that this state corresponds to. It
	// can be converted to a `Res` if necessary.
	// TODO: should this be passed in on Init instead?
	Vertex pgraph.Vertex

	Program  string
	Hostname string
	World    engine.World

	// Prefix is a unique directory prefix which can be used. It should be
	// created if needed.
	Prefix string

	// Debug turns on additional output and behaviours.
	Debug bool

	// Logf is the logging function that should be used to display messages.
	Logf func(format string, v ...interface{})
	// contains filtered or unexported fields
}
State stores some state about the resource it is mapped to.
func (*State) Close ¶
Close shuts down and performs any cleanup. This is most akin to a "post" or cleanup command as the initiator for closing a vertex happens in graph sync.
func (*State) Pause ¶
Pause pauses this resource. It should not be called on any already paused resource. It will block until the resource pauses with an acknowledgment, or until an exit for that resource is seen. If the latter happens it will error. It is NOT thread-safe with the Resume() method so only call either one at a time.
func (*State) Poke ¶
func (obj *State) Poke()
Poke sends a notification on the poke channel. This channel is used to notify the Worker to run the Process/CheckApply when it can. This is used when there is a need to schedule or reschedule some work which got postponed or dropped. This doesn't contain any internal synchronization primitives or wait groups, so callers are expected to make sure that they don't leave any of these running by the time the Worker() shuts down.
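A common Go idiom for this kind of notification is a coalescing, non-blocking send on a buffered channel, sketched below. This is a generic technique shown for illustration; whether the engine's Poke blocks or coalesces is not stated on this page, and the state type and pokeChan field here are hypothetical.

```go
package main

import "fmt"

// state is a hypothetical stand-in holding only a poke channel.
type state struct {
	pokeChan chan struct{} // capacity 1, so at most one poke is pending
}

func newState() *state {
	return &state{pokeChan: make(chan struct{}, 1)}
}

// poke queues a notification without blocking. If one is already pending,
// the extra poke is dropped, since a single wake-up is enough.
func (s *state) poke() {
	select {
	case s.pokeChan <- struct{}{}:
	default:
	}
}

func main() {
	s := newState()
	s.poke()
	s.poke()
	s.poke()
	fmt.Println(len(s.pokeChan)) // prints "1"

	<-s.pokeChan // the worker side consumes the single pending poke
	fmt.Println(len(s.pokeChan)) // prints "0"
}
```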
func (*State) Resume ¶
func (obj *State) Resume()
Resume unpauses this resource. It can be safely called on a brand-new resource that has just started running without incident. It is NOT thread-safe with the Pause() method, so only call either one at a time.
func (*State) ReversalClose ¶
ReversalClose performs the reversal shutdown steps if necessary for this resource.
func (*State) ReversalDelete ¶
ReversalDelete removes the reversal state information for this resource.
func (*State) ReversalInit ¶
ReversalInit performs the reversal initialization steps if necessary for this resource.