Documentation ¶
Index ¶
- Variables
- type Aggregator
- type ComputeFunc
- type Edge
- type Executor
- type ExecutorCallbacks
- type ExecutorFactory
- type Graph
- func (g *Graph) AddEdge(srcID, dstID string, initValue interface{}) error
- func (g *Graph) AddVertex(id string, initValue interface{})
- func (g *Graph) Aggregator(name string) Aggregator
- func (g *Graph) Aggregators() map[string]Aggregator
- func (g *Graph) BroadcastToNeighbors(v *Vertex, msg message.Message) error
- func (g *Graph) Close() error
- func (g *Graph) RegisterAggregator(name string, aggr Aggregator)
- func (g *Graph) RegisterRelayer(relayer Relayer)
- func (g *Graph) Reset() error
- func (g *Graph) SendMessage(dstID string, msg message.Message) error
- func (g *Graph) Superstep() int
- func (g *Graph) Vertices() map[string]*Vertex
- type GraphConfig
- type Relayer
- type RelayerFunc
- type Vertex
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrUnknownEdgeSource is returned by AddEdge when the source vertex
	// is not present in the graph.
	ErrUnknownEdgeSource = xerrors.New("source vertex is not part of the graph")

	// ErrDestinationIsLocal is returned by Relayer instances to indicate
	// that a message destination is actually owned by the local graph.
	ErrDestinationIsLocal = xerrors.New("message destination is assigned to the local graph")

	// ErrInvalidMessageDestination is returned by calls to SendMessage and
	// BroadcastToNeighbors when the destination cannot be resolved to any
	// (local or remote) vertex.
	ErrInvalidMessageDestination = xerrors.New("invalid message destination")
)
Functions ¶
This section is empty.
Types ¶
type Aggregator ¶
type Aggregator interface {
	// Type returns the type of this aggregator.
	Type() string

	// Set the aggregator to the specified value.
	Set(val interface{})

	// Get the current aggregator value.
	Get() interface{}

	// Aggregate updates the aggregator's value based on the provided value.
	Aggregate(val interface{})

	// Delta returns the change in the aggregator's value since the last
	// call to Delta. The delta values can be used in distributed
	// aggregator use-cases to reduce local, partially-aggregated values
	// into a single value by feeding them into the Aggregate method of a
	// top-level aggregator.
	//
	// For example, in a distributed counter scenario, each node maintains
	// its own local counter instance. At the end of each step, the master
	// node calls Delta on each local counter and aggregates the values
	// to obtain the correct total, which is then pushed back to the workers.
	Delta() interface{}
}
Aggregator is implemented by types that provide concurrent-safe aggregation primitives (e.g. counters, min/max, topN).
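The distributed counter scenario described in the Delta comment can be sketched as follows. This is a minimal illustration, not the package's own implementation: IntCounter is a hypothetical type, the interface is copied locally so the example is self-contained, and the choice to restart delta tracking on Set is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// Aggregator mirrors the interface documented above.
type Aggregator interface {
	Type() string
	Set(val interface{})
	Get() interface{}
	Aggregate(val interface{})
	Delta() interface{}
}

// IntCounter is a hypothetical concurrent-safe counter (not part of the package).
type IntCounter struct {
	mu    sync.Mutex
	value int
	prev  int // value observed at the previous Delta (or Set) call
}

func (c *IntCounter) Type() string { return "IntCounter" }

// Set overwrites the value and, by assumption, restarts delta tracking.
func (c *IntCounter) Set(val interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value = val.(int)
	c.prev = c.value
}

func (c *IntCounter) Get() interface{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.value
}

// Aggregate adds val to the counter.
func (c *IntCounter) Aggregate(val interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value += val.(int)
}

// Delta returns the change since the last Delta (or Set) call.
func (c *IntCounter) Delta() interface{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	d := c.value - c.prev
	c.prev = c.value
	return d
}

func main() {
	workers := []*IntCounter{new(IntCounter), new(IntCounter)}
	var wg sync.WaitGroup
	for _, w := range workers {
		for i := 0; i < 4; i++ {
			wg.Add(1)
			go func(w *IntCounter) {
				defer wg.Done()
				w.Aggregate(1) // concurrent updates on the local counter
			}(w)
		}
	}
	wg.Wait()

	// The master reduces the per-worker deltas into a global total...
	var master Aggregator = new(IntCounter)
	for _, w := range workers {
		master.Aggregate(w.Delta())
	}
	// ...and pushes the total back to every worker.
	for _, w := range workers {
		w.Set(master.Get())
	}
	fmt.Println("total:", master.Get()) // prints "total: 8"
}
```

Because only deltas travel to the master, a worker's contribution is never counted twice even after the global total has been pushed back to it.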
type ComputeFunc ¶
ComputeFunc is a function that a graph instance invokes on each vertex when executing a superstep.
type Edge ¶
type Edge struct {
// contains filtered or unexported fields
}
Edge represents a directed edge in the Graph.
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
Executor wraps a Graph instance and provides an orchestration layer for executing super-steps until an error occurs or an exit condition is met. Users can provide an optional set of callbacks to be executed before and after each super-step.
func NewExecutor ¶
func NewExecutor(g *Graph, cb ExecutorCallbacks) *Executor
NewExecutor returns an Executor instance for graph g that invokes the provided list of callbacks inside each execution loop.
func (*Executor) RunSteps ¶
RunSteps executes at most numStep supersteps unless the context expires, an error occurs or one of the Pre/PostStepKeepRunning callbacks specified at configuration time returns false.
func (*Executor) RunToCompletion ¶
RunToCompletion keeps executing supersteps until the context expires, an error occurs or one of the Pre/PostStepKeepRunning callbacks specified at configuration time returns false.
type ExecutorCallbacks ¶
type ExecutorCallbacks struct {
	// PreStep, if defined, is invoked before running the next superstep.
	// This is a good place to initialize variables, aggregators etc. that
	// will be used for the next superstep.
	PreStep func(ctx context.Context, g *Graph) error

	// PostStep, if defined, is invoked after running a superstep.
	PostStep func(ctx context.Context, g *Graph, activeInStep int) error

	// PostStepKeepRunning, if defined, is invoked after running a superstep
	// to decide whether the stop condition for terminating the run has
	// been met. The number of vertices that were active in the last step
	// is passed as the activeInStep argument.
	PostStepKeepRunning func(ctx context.Context, g *Graph, activeInStep int) (bool, error)
}
ExecutorCallbacks encapsulates a series of callbacks that are invoked by an Executor instance on a graph. All callbacks are optional and will be ignored if not specified.
type ExecutorFactory ¶
type ExecutorFactory func(*Graph, ExecutorCallbacks) *Executor
ExecutorFactory is a function that creates new Executor instances.
type Graph ¶
type Graph struct {
// contains filtered or unexported fields
}
Graph implements a parallel graph processor based on the concepts described in the Pregel paper.
func NewGraph ¶
func NewGraph(cfg GraphConfig) (*Graph, error)
NewGraph creates a new Graph instance using the specified configuration. It is important for callers to invoke Close() on the returned graph instance when they are done using it.
func (*Graph) AddEdge ¶
AddEdge inserts a directed edge from srcID to dstID and annotates it with the specified initValue. By design, edges are owned by their source vertices (destinations can be either local or remote), so srcID must resolve to a local vertex. Otherwise, AddEdge returns ErrUnknownEdgeSource.
func (*Graph) AddVertex ¶
AddVertex inserts a new vertex with the specified id and initial value into the graph. If the vertex already exists, AddVertex overwrites its value with the provided initValue.
func (*Graph) Aggregator ¶
func (g *Graph) Aggregator(name string) Aggregator
Aggregator returns the aggregator with the specified name or nil if the aggregator does not exist.
func (*Graph) Aggregators ¶
func (g *Graph) Aggregators() map[string]Aggregator
Aggregators returns a map of all currently registered aggregators where the key is the aggregator's name.
func (*Graph) BroadcastToNeighbors ¶
BroadcastToNeighbors is a helper function that broadcasts a single message to each neighbor of a particular vertex. Messages are queued for delivery and will be processed by recipients in the next superstep.
func (*Graph) RegisterAggregator ¶
func (g *Graph) RegisterAggregator(name string, aggr Aggregator)
RegisterAggregator adds an aggregator with the specified name into the graph.
func (*Graph) RegisterRelayer ¶
RegisterRelayer configures a Relayer that the graph will invoke when attempting to deliver a message to a vertex that is not known locally but could potentially be owned by a remote graph instance.
func (*Graph) Reset ¶
Reset resets the state of the graph by removing all existing vertices and aggregators and resetting the superstep counter.
func (*Graph) SendMessage ¶
SendMessage attempts to deliver a message to the vertex with the specified destination ID. Messages are queued for delivery and will be processed by recipients in the next superstep.
If the destination ID is not known by this graph, it might still be a valid ID for a vertex that is owned by a remote graph instance. If the client has provided a Relayer when configuring the graph, SendMessage will delegate message delivery to it.
On the other hand, if no Relayer is defined or the configured Relayer returns an ErrDestinationIsLocal error, SendMessage will first check whether an UnknownVertexHandler has been provided at configuration time and invoke it. Otherwise, an ErrInvalidMessageDestination is returned to the caller.
type GraphConfig ¶
type GraphConfig struct {
	// QueueFactory is used by the graph to create message queue instances
	// for each vertex that is added to the graph. If not specified, the
	// default in-memory queue will be used instead.
	QueueFactory message.QueueFactory

	// ComputeFn is the compute function that will be invoked for each graph
	// vertex when executing a superstep. A valid ComputeFunc instance is
	// required for the config to be valid.
	ComputeFn ComputeFunc

	// ComputeWorkers specifies the number of workers to use for invoking
	// the registered ComputeFunc when executing each superstep. If not
	// specified, a single worker will be used.
	ComputeWorkers int
}
GraphConfig encapsulates the configuration options for creating graphs.
type Relayer ¶
type Relayer interface {
	// Relay a message to a vertex that is not known locally. Calls to
	// Relay must return ErrDestinationIsLocal if the provided dst value is
	// not a valid remote destination.
	Relay(dst string, msg message.Message) error
}
Relayer is implemented by types that can relay messages to vertices that are managed by a remote graph instance.
type RelayerFunc ¶
The RelayerFunc type is an adapter to allow the use of ordinary functions as Relayers. If f is a function with the appropriate signature, RelayerFunc(f) is a Relayer that calls f.
type Vertex ¶
type Vertex struct {
// contains filtered or unexported fields
}
Vertex represents a vertex in the Graph.
func (*Vertex) Freeze ¶
func (v *Vertex) Freeze()
Freeze marks the vertex as inactive. Inactive vertices are not processed in subsequent supersteps unless they receive a message, in which case they are re-activated.
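The freeze and re-activation behaviour can be illustrated with a toy model. Everything below is a hypothetical sketch: the Vertex fields, the step function and the string-based inbox are stand-ins that only mimic the documented semantics and do not reflect the package's internals.

```go
package main

import "fmt"

// Vertex is a toy stand-in for the package's Vertex type.
type Vertex struct {
	ID     string
	active bool
	inbox  []string // pending messages for the next superstep
}

// Freeze marks the vertex as inactive.
func (v *Vertex) Freeze() { v.active = false }

// step runs compute on every vertex that is active or has pending messages
// and returns how many vertices were processed.
func step(vertices []*Vertex, compute func(v *Vertex, msgs []string)) int {
	processed := 0
	for _, v := range vertices {
		if !v.active && len(v.inbox) == 0 {
			continue // frozen and nothing to read: skip
		}
		v.active = true // an incoming message re-activates a frozen vertex
		msgs := v.inbox
		v.inbox = nil
		compute(v, msgs)
		processed++
	}
	return processed
}

func main() {
	vs := []*Vertex{{ID: "a", active: true}, {ID: "b", active: true}}
	// This compute function always votes to halt.
	compute := func(v *Vertex, _ []string) { v.Freeze() }

	fmt.Println(step(vs, compute)) // 2: both vertices start out active
	fmt.Println(step(vs, compute)) // 0: both are frozen
	vs[1].inbox = append(vs[1].inbox, "wake up")
	fmt.Println(step(vs, compute)) // 1: only "b" was re-activated
}
```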