Documentation ¶
Overview ¶
Package graph contains the actual implementation of the resource graph engine that runs the graph of resources in real-time. This package has the algorithm that runs all the graph transitions.
Index ¶
- Constants
- func SemaSize(id string) int
- func SendRecv(res engine.RecvableRes, fn RecvFn) (map[engine.RecvableRes]map[string]*engine.Send, error)
- func TypeCmp(a, b reflect.Value) error
- func UpdatedStrings(updated map[engine.RecvableRes]map[string]*engine.Send) []string
- type Engine
- func (obj *Engine) Abort() error
- func (obj *Engine) Apply(fn func(*pgraph.Graph) error) error
- func (obj *Engine) AutoEdge() error
- func (obj *Engine) AutoGroup(ag engine.AutoGrouper) error
- func (obj *Engine) BadTimestamps(vertex pgraph.Vertex) []pgraph.Vertex
- func (obj *Engine) Commit() error
- func (obj *Engine) Graph() *pgraph.Graph
- func (obj *Engine) Init() error
- func (obj *Engine) Load(newGraph *pgraph.Graph) error
- func (obj *Engine) OKTimestamp(vertex pgraph.Vertex) bool
- func (obj *Engine) Pause(fastPause bool) error
- func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error
- func (obj *Engine) RefreshPending(vertex pgraph.Vertex) bool
- func (obj *Engine) Resume() error
- func (obj *Engine) ReversalList() (map[string]string, error)
- func (obj *Engine) Reversals() error
- func (obj *Engine) SetDownstreamRefresh(vertex pgraph.Vertex, b bool)
- func (obj *Engine) SetFastPause()
- func (obj *Engine) SetUpstreamRefresh(vertex pgraph.Vertex, b bool)
- func (obj *Engine) Shutdown() error
- func (obj *Engine) Validate() error
- func (obj *Engine) Worker(vertex pgraph.Vertex) error
- type RecvFn
- type State
- func (obj *State) Cleanup() error
- func (obj *State) Init() error
- func (obj *State) Pause() error
- func (obj *State) Poke()
- func (obj *State) Resume()
- func (obj *State) ReversalCleanup() error
- func (obj *State) ReversalDelete() error
- func (obj *State) ReversalInit() error
- func (obj *State) ReversalWrite(str string, overwrite bool) error
Constants ¶
const ( // ReverseFile is the file name in the resource state dir where any // reversal information is stored. ReverseFile = "reverse" // ReversePerm is the permissions mode used to create the ReverseFile. ReversePerm = 0600 )
const SemaSep = ":"
SemaSep is the trailing separator to split the semaphore id from the size.
const ( // StateDir is the name of the sub directory where all the local // resource state is stored. StateDir = "state" )
Variables ¶
This section is empty.
Functions ¶
func SemaSize ¶
SemaSize returns the size integer associated with the semaphore id. It defaults to 1 if not found.
func SendRecv ¶
func SendRecv(res engine.RecvableRes, fn RecvFn) (map[engine.RecvableRes]map[string]*engine.Send, error)
SendRecv pulls in the sent values into the receive slots. It is called by the receiver and must be given as input the full resource struct to receive on. It applies the loaded values to the resource. It is called recursively, as it recurses into any grouped resources found within the first receiver. It returns a map of resource pointer, to resource field key, to changed boolean.
func TypeCmp ¶
TypeCmp compares two reflect values to see if they are the same Kind. It can look into a ptr Kind to see if the underlying pair of ptr's can TypeCmp too!
func UpdatedStrings ¶
UpdatedStrings returns a list of strings showing what was updated after a Send/Recv run returned the updated datastructure. This is useful for logs.
Types ¶
type Engine ¶
type Engine struct { Program string Version string Hostname string Converger *converger.Coordinator Local *local.API World engine.World // Prefix is a unique directory prefix which can be used. It should be // created if needed. Prefix string Debug bool Logf func(format string, v ...interface{}) // contains filtered or unexported fields }
Engine encapsulates a generic graph and manages its operations.
func (*Engine) Abort ¶
Abort the pending graph and any work in progress on it. After this call you may Load a new graph.
func (*Engine) Apply ¶
Apply a function to the pending graph. You must pass in a function which will receive this graph as input, and return an error if something does not succeed.
func (*Engine) AutoGroup ¶
func (obj *Engine) AutoGroup(ag engine.AutoGrouper) error
AutoGroup runs the auto grouping on the loaded graph.
func (*Engine) BadTimestamps ¶
BadTimestamps returns the list of vertices that are causing our timestamp to be bad.
func (*Engine) Commit ¶
Commit runs a graph sync and swaps the loaded graph with the current one. If it errors, then the running graph wasn't changed. It is recommended that you pause the engine before running this, and resume it after you're done.
func (*Engine) Init ¶
Init initializes the internal structures and starts this the graph running. If the struct does not validate, or it cannot initialize, then this errors. Initially it will contain an empty graph.
func (*Engine) Load ¶
Load a new graph into the engine. Offline graph operations will be performed on this graph. To switch it to the active graph, and run it, use Commit.
func (*Engine) OKTimestamp ¶
OKTimestamp returns true if this vertex can run right now.
func (*Engine) Process ¶
Process is the primary function to execute a particular vertex in the graph.
func (*Engine) RefreshPending ¶
RefreshPending determines if any previous nodes have a refresh pending here. If this is true, it means I am expected to apply a refresh when I next run.
func (*Engine) Resume ¶
Resume runs the currently active graph. It also un-pauses the graph if it was paused. Very little that is interesting should happen here. It all happens in the Commit method. After Commit, new things are already started, but we still need to Resume any pre-existing resources. Do not call this concurrently with the Pause method.
func (*Engine) ReversalList ¶
ReversalList returns all the available pending reversal data on this host. It can then be decoded by whatever method is appropriate for.
func (*Engine) Reversals ¶
Reversals adds the reversals onto the loaded graph. This should happen last, and before Commit.
func (*Engine) SetDownstreamRefresh ¶
SetDownstreamRefresh sets the refresh value to any downstream vertices.
func (*Engine) SetFastPause ¶
func (obj *Engine) SetFastPause()
SetFastPause puts the graph into fast pause mode. This is usually done via the argument to the Pause command, but this method can be used if a pause was already started, and you'd like subsequent parts to pause quickly. Once in fast pause mode for a given pause action, you cannot switch to regular pause. This is because once you've started a fast pause, some dependencies might have been skipped when fast pausing, and future resources might have missed a poke. In general this is only called when you're trying to hurry up the exit. XXX: Not implemented
func (*Engine) SetUpstreamRefresh ¶
SetUpstreamRefresh sets the refresh value to any upstream vertices.
func (*Engine) Shutdown ¶
Shutdown the engine. Engine must be already paused before this is run. It is actually just a Load of an empty graph and a Commit. It waits for all the resources to exit before returning.
func (*Engine) Validate ¶
Validate validates the pending graph to ensure it is appropriate for the engine. This should be called before Commit to avoid any surprises there! This prevents an error on Commit which could cause an engine shutdown.
func (*Engine) Worker ¶
Worker is the common run frontend of the vertex. It handles all of the retry and retry delay common code, and ultimately returns the final status of this vertex execution. This function cannot be "re-run" for the same vertex. The retry mechanism stuff happens inside of this. To actually "re-run" you need to remove the vertex and build a new one. The engine guarantees that we do not allow CheckApply to run while we are paused. That is enforced here.
type RecvFn ¶
RecvFn represents a custom Recv function which can be used in place of the stock, built-in one. This is needed if we want to receive from a different resource data source than our own. (Only for special occasions of course!)
type State ¶
type State struct { // Graph is a pointer to the graph that this vertex is part of. Graph *pgraph.Graph // Vertex is the pointer in the graph that this state corresponds to. It // can be converted to a `Res` if necessary. // TODO: should this be passed in on Init instead? Vertex pgraph.Vertex Program string Version string Hostname string Local *local.API World engine.World // Prefix is a unique directory prefix which can be used. It should be // created if needed. Prefix string // Debug turns on additional output and behaviours. Debug bool // Logf is the logging function that should be used to display messages. Logf func(format string, v ...interface{}) // contains filtered or unexported fields }
State stores some state about the resource it is mapped to.
func (*State) Cleanup ¶
Cleanup shuts down and performs any cleanup. This is most akin to a "post" or cleanup command as the initiator for closing a vertex happens in graph sync.
func (*State) Pause ¶
Pause pauses this resource. It must not be called on any already paused resource. It will block until the resource pauses with an acknowledgment, or until an exit for that resource is seen. If the latter happens it will error. It must not be called concurrently with either the Resume() method or itself, so only call these one at a time and alternate between the two.
func (*State) Poke ¶
func (obj *State) Poke()
Poke sends a notification on the poke channel. This channel is used to notify the Worker to run the Process/CheckApply when it can. This is used when there is a need to schedule or reschedule some work which got postponed or dropped. This doesn't contain any internal synchronization primitives or wait groups, callers are expected to make sure that they don't leave any of these running by the time the Worker() shuts down.
func (*State) Resume ¶
func (obj *State) Resume()
Resume unpauses this resource. It can be safely called once on a brand-new resource that has just started running, without incident. It must not be called concurrently with either the Pause() method or itself, so only call these one at a time and alternate between the two.
func (*State) ReversalCleanup ¶
ReversalCleanup performs the reversal shutdown steps if necessary for this resource.
func (*State) ReversalDelete ¶
ReversalDelete removes the reversal state information for this resource.
func (*State) ReversalInit ¶
ReversalInit performs the reversal initialization steps if necessary for this resource.