node

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 27, 2024 License: Apache-2.0 Imports: 33 Imported by: 1

Documentation

Index

Constants

View Source
const (
	DefaultTopic                   = "blockless/b7s/general"
	DefaultHealthInterval          = 1 * time.Minute
	DefaultRollCallTimeout         = 5 * time.Second
	DefaultExecutionTimeout        = 20 * time.Second
	DefaultClusterFormationTimeout = 10 * time.Second
	DefaultConcurrency             = 10

	DefaultConsensusAlgorithm = consensus.Raft

	DefaultAttributeLoadingSetting = false
)

Variables

View Source
var DefaultConfig = Config{
	Role:                    blockless.WorkerNode,
	Topics:                  []string{DefaultTopic},
	HealthInterval:          DefaultHealthInterval,
	RollCallTimeout:         DefaultRollCallTimeout,
	Concurrency:             DefaultConcurrency,
	ExecutionTimeout:        DefaultExecutionTimeout,
	ClusterFormationTimeout: DefaultClusterFormationTimeout,
	DefaultConsensus:        DefaultConsensusAlgorithm,
	LoadAttributes:          DefaultAttributeLoadingSetting,
}

DefaultConfig represents the default settings for the node.

View Source
var (
	ErrUnsupportedMessage = errors.New("unsupported message")
)

Functions

This section is empty.

Types

type ChanData

type ChanData struct {
	Res        codes.Code        `json:"res,omitempty"`
	FunctionId string            `json:"functionId,omitempty"`
	RequestId  string            `json:"requestId,omitempty"`
	Topic      string            `json:"topic,omitempty"`
	Data       execute.ResultMap `json:"data,omitempty"`
}

type Config

type Config struct {
	Role                    blockless.NodeRole // Node role.
	Topics                  []string           // Topics to subscribe to.
	Execute                 blockless.Executor // Executor to use for running functions.
	HealthInterval          time.Duration      // How often should we emit the health ping.
	RollCallTimeout         time.Duration      // How long do we wait for roll call responses.
	Concurrency             uint               // How many requests should the node process in parallel.
	ExecutionTimeout        time.Duration      // How long does the head node wait for worker nodes to send their execution results.
	ClusterFormationTimeout time.Duration      // How long do we wait for the nodes to form a cluster for an execution.
	Workspace               string             // Directory where we can store files needed for execution.
	DefaultConsensus        consensus.Type     // Default consensus algorithm to use.
	LoadAttributes          bool               // Node should try to load its attributes from IPFS.
}

Config represents the Node configuration.

type FStore

type FStore interface {
	// Install will install a function based on the address and CID.
	Install(address string, cid string) error

	// Installed returns info if the function is installed or not.
	Installed(cid string) (bool, error)

	// InstalledFunction returns the list of CIDs of installed functions.
	InstalledFunctions() ([]string, error)

	// Sync will recheck if function installation is found in local storage, and redownload it if it isn't.
	Sync(cid string) error
}

FStore provides retrieval of function manifest.

type HandlerFunc

type HandlerFunc func(context.Context, peer.ID, []byte) error

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node is the entity that actually provides the main Blockless node functionality. It listens for messages coming from the wire and processes them. Depending on the node role, which is determined on construction, it may process messages in different ways. For example, upon receiving a message requesting execution of a Blockless function, a Worker Node will use the `Execute` component to fulfill the execution request. On the other hand, a Head Node will issue a roll call and eventually delegate the execution to the chosend Worker Node.

func New

func New(log zerolog.Logger, host *host.Host, peerStore PeerStore, fstore FStore, options ...Option) (*Node, error)

New creates a new Node.

func (*Node) CommunicatorAppLayer

func (n *Node) CommunicatorAppLayer() chan []byte

func (*Node) ExecuteFunction

func (n *Node) ExecuteFunction(ctx context.Context, req execute.Request, subgroup string) (codes.Code, string, execute.ResultMap, execute.Cluster, error)

ExecuteFunction can be used to start function execution. At the moment this is used by the API server to start execution on the head node.

func (*Node) ExecutionResult

func (n *Node) ExecutionResult(id string) (execute.Result, bool)

ExecutionResult fetches the execution result from the node cache.

func (*Node) HealthPing

func (n *Node) HealthPing(ctx context.Context)

HealthPing will run a long running loop, publishing health signal until cancelled.

func (*Node) ID

func (n *Node) ID() string

ID returns the ID of this node.

func (*Node) PublishFunctionInstall

func (n *Node) PublishFunctionInstall(ctx context.Context, uri string, cid string, subgroup string) error

PublishFunctionInstall publishes a function install message.

func (*Node) Run

func (n *Node) Run(ctx context.Context) error

Run will start the main loop for the node.

func (*Node) ValidateConfig

func (n *Node) ValidateConfig() error

Validate checks if the given configuration is correct.

type Option

type Option func(*Config)

Option can be used to set Node configuration options.

func WithAttributeLoading

func WithAttributeLoading(b bool) Option

WithAttributeLoading specifies whether node should try to load its attributes data from IPFS.

func WithClusterFormationTimeout

func WithClusterFormationTimeout(d time.Duration) Option

WithClusterFormationTimeout specifies how long does the head node wait for worker nodes to form a consensus cluster.

func WithConcurrency

func WithConcurrency(n uint) Option

WithConcurrency specifies how many requests the node should process in parallel.

func WithDefaultConsensus

func WithDefaultConsensus(c consensus.Type) Option

WithDefaultConsensus specifies the consensus algorithm to use, if not specified in the request.

func WithExecutionTimeout

func WithExecutionTimeout(d time.Duration) Option

WithExecutionTimeout specifies how long does the head node wait for worker nodes to send their execution results.

func WithExecutor

func WithExecutor(execute blockless.Executor) Option

WithExecutor specifies the executor to be used for running Blockless functions

func WithHealthInterval

func WithHealthInterval(d time.Duration) Option

WithHealthInterval specifies how often we should emit the health signal.

func WithRole

func WithRole(role blockless.NodeRole) Option

WithRole specifies the role for the node.

func WithRollCallTimeout

func WithRollCallTimeout(d time.Duration) Option

WithRollCallTimeout specifies how long do we wait for roll call responses.

func WithTopics

func WithTopics(topics []string) Option

WithTopics specifies the p2p topics to which node should subscribe.

func WithWorkspace

func WithWorkspace(path string) Option

WithWorkspace specifies the workspace that the node can use for file storage.

type PeerStore

type PeerStore interface {
	Store(peer.ID, multiaddr.Multiaddr, peer.AddrInfo) error
}

type Store

type Store interface {
	GetRecord(string, interface{}) error
	SetRecord(string, interface{}) error
}

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL