node

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 29, 2024 License: Apache-2.0 Imports: 44 Imported by: 5

Documentation

Index

Constants

View Source
// Default values for the node configuration. Most of these populate DefaultConfig.
const (
	// DefaultTopic is the only topic listed in DefaultConfig.Topics.
	DefaultTopic                   = "blockless/b7s/general"
	// DefaultHealthInterval is the default for Config.HealthInterval (how often the health ping is emitted).
	DefaultHealthInterval          = 1 * time.Minute
	// DefaultRollCallTimeout is the default for Config.RollCallTimeout (how long to wait for roll call responses).
	DefaultRollCallTimeout         = 5 * time.Second
	// DefaultExecutionTimeout is the default for Config.ExecutionTimeout (how long the head node waits for worker results).
	DefaultExecutionTimeout        = 20 * time.Second
	// DefaultClusterFormationTimeout is the default for Config.ClusterFormationTimeout (how long to wait for a cluster to form).
	DefaultClusterFormationTimeout = 10 * time.Second
	// DefaultConcurrency is the default for Config.Concurrency (number of requests processed in parallel).
	DefaultConcurrency             = 10

	// ClusterAddressTTL is not referenced by DefaultConfig.
	// NOTE(review): presumably the TTL for addresses of cluster peers — usage is not visible here, confirm.
	ClusterAddressTTL = 30 * time.Minute

	// DefaultConsensusAlgorithm is the default for Config.DefaultConsensus.
	DefaultConsensusAlgorithm = consensus.Raft

	// DefaultAttributeLoadingSetting is the default for Config.LoadAttributes; attribute loading is off by default.
	DefaultAttributeLoadingSetting = false
)

Variables

View Source
// Counters lists the counter definitions (metric name and help text) declared by this package.
// The metric name constants are unexported and defined elsewhere in the package.
var Counters = []prometheus.CounterDefinition{
	{
		Name: rollCallsPublishedMetric,
		Help: "Number of roll calls this node issued.",
	},
	{
		Name: rollCallsSeenMetric,
		Help: "Number of roll calls seen by the node.",
	},
	{
		Name: rollCallsAppliedMetric,
		Help: "Number of roll calls this node applied to.",
	},
	{
		Name: messagesProcessedMetric,
		Help: "Number of messages this node processed.",
	},
	{
		Name: messagesProcessedOkMetric,
		Help: "Number of messages successfully processed by the node.",
	},
	{
		Name: messagesProcessedErrMetric,
		Help: "Number of messages processed with an error.",
	},
	{
		Name: functionExecutionsMetric,
		Help: "Number of function executions.",
	},
	{
		Name: subscriptionsMetric,
		Help: "Number of topics this node subscribes to.",
	},
	{
		Name: directMessagesMetric,
		Help: "Number of direct messages this node received.",
	},
	{
		Name: topicMessagesMetric,
		Help: "Number of topic messages this node received.",
	},
	{
		Name: messagesSentMetric,
		Help: "Number of messages sent.",
	},
	{
		Name: messagesPublishedMetric,
		Help: "Number of messages published.",
	},
}
View Source
// DefaultConfig represents the default settings for the node.
// It configures a worker node subscribed to DefaultTopic and uses a no-op metadata provider.
// Execute and Workspace are not set here and keep their zero values.
var DefaultConfig = Config{
	Role:                    blockless.WorkerNode,
	Topics:                  []string{DefaultTopic},
	HealthInterval:          DefaultHealthInterval,
	RollCallTimeout:         DefaultRollCallTimeout,
	Concurrency:             DefaultConcurrency,
	ExecutionTimeout:        DefaultExecutionTimeout,
	ClusterFormationTimeout: DefaultClusterFormationTimeout,
	DefaultConsensus:        DefaultConsensusAlgorithm,
	LoadAttributes:          DefaultAttributeLoadingSetting,
	MetadataProvider:        metadata.NewNoopProvider(),
}

DefaultConfig represents the default settings for the node.

View Source
var (
	// ErrUnsupportedMessage is a sentinel error; callers can test for it with errors.Is.
	// NOTE(review): presumably returned when the node receives a message type it cannot handle — confirm at the call sites.
	ErrUnsupportedMessage = errors.New("unsupported message")
)
View Source
// Gauges lists the gauge definitions (metric name and help text) declared by this package.
var Gauges = []prometheus.GaugeDefinition{
	{
		Name: nodeInfoMetric,
		Help: "Information about the b7s node.",
	},
}

Functions

This section is empty.

Types

type Config

// Config represents the Node configuration.
// Option functions receive a *Config and modify individual fields.
type Config struct {
	Role                    blockless.NodeRole // Node role.
	Topics                  []string           // Topics to subscribe to.
	Execute                 blockless.Executor // Executor to use for running functions.
	HealthInterval          time.Duration      // How often the node emits the health ping.
	RollCallTimeout         time.Duration      // How long to wait for roll call responses.
	Concurrency             uint               // How many requests the node should process in parallel.
	ExecutionTimeout        time.Duration      // How long the head node waits for worker nodes to send their execution results.
	ClusterFormationTimeout time.Duration      // How long to wait for the nodes to form a cluster for an execution.
	Workspace               string             // Directory where we can store files needed for execution.
	DefaultConsensus        consensus.Type     // Default consensus algorithm to use.
	LoadAttributes          bool               // Whether the node should try to load its attributes from IPFS.
	MetadataProvider        metadata.Provider  // Metadata provider for the node.
}

Config represents the Node configuration.

type FStore

// FStore is the function store passed to New. It installs functions,
// reports whether a function is installed, and syncs existing installations.
type FStore interface {
	// Install will install a function based on the address and CID.
	Install(ctx context.Context, address string, cid string) error

	// IsInstalled reports whether the function with the given CID is installed.
	IsInstalled(cid string) (bool, error)

	// TODO: Refactor the sync code - move the logic outside of the package
	// Sync will ensure function installations are correct, redownloading functions if needed.
	// NOTE(review): haltOnError presumably makes Sync stop at the first failure instead of continuing — confirm against the implementation.
	Sync(ctx context.Context, haltOnError bool) error
}

FStore provides function installation, installation checks, and synchronization of installed functions.

type Node

// Node has no exported fields; create one with New and use its methods.
type Node struct {
	// contains filtered or unexported fields
}

Node is the entity that actually provides the main Blockless node functionality. It listens for messages coming from the wire and processes them. Depending on the node role, which is determined on construction, it may process messages in different ways. For example, upon receiving a message requesting execution of a Blockless function, a Worker Node will use the `Execute` component to fulfill the execution request. On the other hand, a Head Node will issue a roll call and eventually delegate the execution to the chosen Worker Node.

func New

func New(log zerolog.Logger, host *host.Host, store blockless.PeerStore, fstore FStore, options ...Option) (*Node, error)

New creates a new Node.

func (*Node) ExecuteFunction

func (n *Node) ExecuteFunction(ctx context.Context, req execute.Request, subgroup string) (codes.Code, string, execute.ResultMap, execute.Cluster, error)

ExecuteFunction can be used to start function execution. At the moment this is used by the API server to start execution on the head node.

func (*Node) ExecutionResult

func (n *Node) ExecutionResult(id string) (execute.ResultMap, bool)

ExecutionResult fetches the execution result from the node cache.

func (*Node) HealthPing

func (n *Node) HealthPing(ctx context.Context)

HealthPing will run a long-running loop, publishing a health signal until cancelled.

func (*Node) ID

func (n *Node) ID() string

ID returns the ID of this node.

func (*Node) PublishFunctionInstall

func (n *Node) PublishFunctionInstall(ctx context.Context, uri string, cid string, subgroup string) error

PublishFunctionInstall publishes a function install message.

func (*Node) Run

func (n *Node) Run(ctx context.Context) error

Run will start the main loop for the node.

func (*Node) ValidateConfig

func (n *Node) ValidateConfig() error

ValidateConfig checks if the given configuration is correct.

type Option

// Option is a function that modifies a Config in place; New accepts any number of them.
type Option func(*Config)

Option can be used to set Node configuration options.

func WithAttributeLoading added in v0.4.0

func WithAttributeLoading(b bool) Option

WithAttributeLoading specifies whether the node should try to load its attribute data from IPFS.

func WithClusterFormationTimeout

func WithClusterFormationTimeout(d time.Duration) Option

WithClusterFormationTimeout specifies how long the head node waits for worker nodes to form a consensus cluster.

func WithConcurrency

func WithConcurrency(n uint) Option

WithConcurrency specifies how many requests the node should process in parallel.

func WithDefaultConsensus added in v0.3.0

func WithDefaultConsensus(c consensus.Type) Option

WithDefaultConsensus specifies the consensus algorithm to use, if not specified in the request.

func WithExecutionTimeout

func WithExecutionTimeout(d time.Duration) Option

WithExecutionTimeout specifies how long the head node waits for worker nodes to send their execution results.

func WithExecutor

func WithExecutor(execute blockless.Executor) Option

WithExecutor specifies the executor to be used for running Blockless functions.

func WithHealthInterval

func WithHealthInterval(d time.Duration) Option

WithHealthInterval specifies how often we should emit the health signal.

func WithMetadataProvider added in v0.6.4

func WithMetadataProvider(p metadata.Provider) Option

WithMetadataProvider sets the metadata provider for the node.

func WithRole

func WithRole(role blockless.NodeRole) Option

WithRole specifies the role for the node.

func WithRollCallTimeout

func WithRollCallTimeout(d time.Duration) Option

WithRollCallTimeout specifies how long the node waits for roll call responses.

func WithTopics added in v0.4.6

func WithTopics(topics []string) Option

WithTopics specifies the p2p topics to which the node should subscribe.

func WithWorkspace

func WithWorkspace(path string) Option

WithWorkspace specifies the workspace that the node can use for file storage.

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL