graph

package
v0.0.0-...-3107dfb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2025 License: GPL-3.0 Imports: 23 Imported by: 4

Documentation

Overview

Package graph contains the actual implementation of the resource graph engine that runs the graph of resources in real-time. This package has the algorithm that runs all the graph transitions.

Index

Constants

View Source
const (
	// ReverseFile is the file name in the resource state dir where any
	// reversal information is stored.
	ReverseFile = "reverse"

	// ReversePerm is the permissions mode used to create the ReverseFile.
	ReversePerm = 0600
)
View Source
const SemaSep = ":"

SemaSep is the trailing separator to split the semaphore id from the size.

View Source
const (
	// StateDir is the name of the sub directory where all the local
	// resource state is stored.
	StateDir = "state"
)

Variables

This section is empty.

Functions

func SemaSize

func SemaSize(id string) int

SemaSize returns the size integer associated with the semaphore id. It defaults to 1 if not found.

func SendRecv

func SendRecv(res engine.RecvableRes, fn RecvFn) (map[engine.RecvableRes]map[string]*engine.Send, error)

SendRecv pulls in the sent values into the receive slots. It is called by the receiver and must be given as input the full resource struct to receive on. It applies the loaded values to the resource. It is called recursively, as it recurses into any grouped resources found within the first receiver. It returns a map of resource pointer, to resource field key, to changed boolean.

func TypeCmp

func TypeCmp(a, b reflect.Value) error

TypeCmp compares two reflect values to see if they are the same Kind. It can look into a ptr Kind to see if the underlying pair of ptr's can TypeCmp too!

func UpdatedStrings

func UpdatedStrings(updated map[engine.RecvableRes]map[string]*engine.Send) []string

UpdatedStrings returns a list of strings showing what was updated after a Send/Recv run returned the updated datastructure. This is useful for logs.

Types

type Engine

type Engine struct {
	Program  string
	Version  string
	Hostname string

	Converger *converger.Coordinator
	Local     *local.API
	World     engine.World

	// Prefix is a unique directory prefix which can be used. It should be
	// created if needed.
	Prefix string
	Debug  bool
	Logf   func(format string, v ...interface{})
	// contains filtered or unexported fields
}

Engine encapsulates a generic graph and manages its operations.

func (*Engine) Abort

func (obj *Engine) Abort() error

Abort the pending graph and any work in progress on it. After this call you may Load a new graph.

func (*Engine) Apply

func (obj *Engine) Apply(fn func(*pgraph.Graph) error) error

Apply a function to the pending graph. You must pass in a function which will receive this graph as input, and return an error if something does not succeed.

func (*Engine) AutoEdge

func (obj *Engine) AutoEdge() error

AutoEdge adds the automatic edges to the graph.

func (*Engine) AutoGroup

func (obj *Engine) AutoGroup(ag engine.AutoGrouper) error

AutoGroup runs the auto grouping on the loaded graph.

func (*Engine) BadTimestamps

func (obj *Engine) BadTimestamps(vertex pgraph.Vertex) []pgraph.Vertex

BadTimestamps returns the list of vertices that are causing our timestamp to be bad.

func (*Engine) Commit

func (obj *Engine) Commit() error

Commit runs a graph sync and swaps the loaded graph with the current one. If it errors, then the running graph wasn't changed. It is recommended that you pause the engine before running this, and resume it after you're done.

func (*Engine) Graph

func (obj *Engine) Graph() *pgraph.Graph

Graph returns the running graph.

func (*Engine) Init

func (obj *Engine) Init() error

Init initializes the internal structures and starts this the graph running. If the struct does not validate, or it cannot initialize, then this errors. Initially it will contain an empty graph.

func (*Engine) Load

func (obj *Engine) Load(newGraph *pgraph.Graph) error

Load a new graph into the engine. Offline graph operations will be performed on this graph. To switch it to the active graph, and run it, use Commit.

func (*Engine) OKTimestamp

func (obj *Engine) OKTimestamp(vertex pgraph.Vertex) bool

OKTimestamp returns true if this vertex can run right now.

func (*Engine) Pause

func (obj *Engine) Pause(fastPause bool) error

Pause the active, running graph.

func (*Engine) Process

func (obj *Engine) Process(ctx context.Context, vertex pgraph.Vertex) error

Process is the primary function to execute a particular vertex in the graph.

func (*Engine) RefreshPending

func (obj *Engine) RefreshPending(vertex pgraph.Vertex) bool

RefreshPending determines if any previous nodes have a refresh pending here. If this is true, it means I am expected to apply a refresh when I next run.

func (*Engine) Resume

func (obj *Engine) Resume() error

Resume runs the currently active graph. It also un-pauses the graph if it was paused. Very little that is interesting should happen here. It all happens in the Commit method. After Commit, new things are already started, but we still need to Resume any pre-existing resources. Do not call this concurrently with the Pause method.

func (*Engine) ReversalList

func (obj *Engine) ReversalList() (map[string]string, error)

ReversalList returns all the available pending reversal data on this host. It can then be decoded by whatever method is appropriate for.

func (*Engine) Reversals

func (obj *Engine) Reversals() error

Reversals adds the reversals onto the loaded graph. This should happen last, and before Commit.

func (*Engine) SetDownstreamRefresh

func (obj *Engine) SetDownstreamRefresh(vertex pgraph.Vertex, b bool)

SetDownstreamRefresh sets the refresh value to any downstream vertices.

func (*Engine) SetFastPause

func (obj *Engine) SetFastPause()

SetFastPause puts the graph into fast pause mode. This is usually done via the argument to the Pause command, but this method can be used if a pause was already started, and you'd like subsequent parts to pause quickly. Once in fast pause mode for a given pause action, you cannot switch to regular pause. This is because once you've started a fast pause, some dependencies might have been skipped when fast pausing, and future resources might have missed a poke. In general this is only called when you're trying to hurry up the exit. XXX: Not implemented

func (*Engine) SetUpstreamRefresh

func (obj *Engine) SetUpstreamRefresh(vertex pgraph.Vertex, b bool)

SetUpstreamRefresh sets the refresh value to any upstream vertices.

func (*Engine) Shutdown

func (obj *Engine) Shutdown() error

Shutdown the engine. Engine must be already paused before this is run. It is actually just a Load of an empty graph and a Commit. It waits for all the resources to exit before returning.

func (*Engine) Validate

func (obj *Engine) Validate() error

Validate validates the pending graph to ensure it is appropriate for the engine. This should be called before Commit to avoid any surprises there! This prevents an error on Commit which could cause an engine shutdown.

func (*Engine) Worker

func (obj *Engine) Worker(vertex pgraph.Vertex) error

Worker is the common run frontend of the vertex. It handles all of the retry and retry delay common code, and ultimately returns the final status of this vertex execution. This function cannot be "re-run" for the same vertex. The retry mechanism stuff happens inside of this. To actually "re-run" you need to remove the vertex and build a new one. The engine guarantees that we do not allow CheckApply to run while we are paused. That is enforced here.

type RecvFn

type RecvFn func(engine.RecvableRes) (map[string]*engine.Send, error)

RecvFn represents a custom Recv function which can be used in place of the stock, built-in one. This is needed if we want to receive from a different resource data source than our own. (Only for special occasions of course!)

type State

type State struct {
	// Graph is a pointer to the graph that this vertex is part of.
	Graph *pgraph.Graph

	// Vertex is the pointer in the graph that this state corresponds to. It
	// can be converted to a `Res` if necessary.
	// TODO: should this be passed in on Init instead?
	Vertex pgraph.Vertex

	Program  string
	Version  string
	Hostname string

	Local *local.API
	World engine.World

	// Prefix is a unique directory prefix which can be used. It should be
	// created if needed.
	Prefix string

	// Debug turns on additional output and behaviours.
	Debug bool

	// Logf is the logging function that should be used to display messages.
	Logf func(format string, v ...interface{})
	// contains filtered or unexported fields
}

State stores some state about the resource it is mapped to.

func (*State) Cleanup

func (obj *State) Cleanup() error

Cleanup shuts down and performs any cleanup. This is most akin to a "post" or cleanup command as the initiator for closing a vertex happens in graph sync.

func (*State) Init

func (obj *State) Init() error

Init initializes structures like channels.

func (*State) Pause

func (obj *State) Pause() error

Pause pauses this resource. It must not be called on any already paused resource. It will block until the resource pauses with an acknowledgment, or until an exit for that resource is seen. If the latter happens it will error. It must not be called concurrently with either the Resume() method or itself, so only call these one at a time and alternate between the two.

func (*State) Poke

func (obj *State) Poke()

Poke sends a notification on the poke channel. This channel is used to notify the Worker to run the Process/CheckApply when it can. This is used when there is a need to schedule or reschedule some work which got postponed or dropped. This doesn't contain any internal synchronization primitives or wait groups, callers are expected to make sure that they don't leave any of these running by the time the Worker() shuts down.

func (*State) Resume

func (obj *State) Resume()

Resume unpauses this resource. It can be safely called once on a brand-new resource that has just started running, without incident. It must not be called concurrently with either the Pause() method or itself, so only call these one at a time and alternate between the two.

func (*State) ReversalCleanup

func (obj *State) ReversalCleanup() error

ReversalCleanup performs the reversal shutdown steps if necessary for this resource.

func (*State) ReversalDelete

func (obj *State) ReversalDelete() error

ReversalDelete removes the reversal state information for this resource.

func (*State) ReversalInit

func (obj *State) ReversalInit() error

ReversalInit performs the reversal initialization steps if necessary for this resource.

func (*State) ReversalWrite

func (obj *State) ReversalWrite(str string, overwrite bool) error

ReversalWrite stores the reversal state information for this resource.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL