Documentation ¶
Index ¶
- Constants
- Variables
- type Config
- type FStore
- type Node
- func New(log zerolog.Logger, host *host.Host, store blockless.PeerStore, fstore FStore, options ...Option) (*Node, error)
- func (n *Node) ExecuteFunction(ctx context.Context, req execute.Request, subgroup string) (codes.Code, string, execute.ResultMap, execute.Cluster, error)
- func (n *Node) ExecutionResult(id string) (execute.ResultMap, bool)
- func (n *Node) HealthPing(ctx context.Context)
- func (n *Node) ID() string
- func (n *Node) PublishFunctionInstall(ctx context.Context, uri string, cid string, subgroup string) error
- func (n *Node) Run(ctx context.Context) error
- func (n *Node) ValidateConfig() error
- type Option
- func WithAttributeLoading(b bool) Option
- func WithClusterFormationTimeout(d time.Duration) Option
- func WithConcurrency(n uint) Option
- func WithDefaultConsensus(c consensus.Type) Option
- func WithExecutionTimeout(d time.Duration) Option
- func WithExecutor(execute blockless.Executor) Option
- func WithHealthInterval(d time.Duration) Option
- func WithMetadataProvider(p metadata.Provider) Option
- func WithRole(role blockless.NodeRole) Option
- func WithRollCallTimeout(d time.Duration) Option
- func WithTopics(topics []string) Option
- func WithWorkspace(path string) Option
Constants ¶
const (
	DefaultTopic                   = "blockless/b7s/general"
	DefaultHealthInterval          = 1 * time.Minute
	DefaultRollCallTimeout         = 5 * time.Second
	DefaultExecutionTimeout        = 20 * time.Second
	DefaultClusterFormationTimeout = 10 * time.Second
	DefaultConcurrency             = 10
	ClusterAddressTTL              = 30 * time.Minute
	DefaultConsensusAlgorithm      = consensus.Raft
	DefaultAttributeLoadingSetting = false
)
Variables ¶
var Counters = []prometheus.CounterDefinition{
{
Name: rollCallsPublishedMetric,
Help: "Number of roll calls this node issued.",
},
{
Name: rollCallsSeenMetric,
Help: "Number of roll calls seen by the node.",
},
{
Name: rollCallsAppliedMetric,
Help: "Number of roll calls this node applied to.",
},
{
Name: messagesProcessedMetric,
Help: "Number of messages this node processed.",
},
{
Name: messagesProcessedOkMetric,
Help: "Number of messages successfully processed by the node.",
},
{
Name: messagesProcessedErrMetric,
Help: "Number of messages processed with an error.",
},
{
Name: functionExecutionsMetric,
Help: "Number of function executions.",
},
{
Name: subscriptionsMetric,
Help: "Number of topics this node subscribes to.",
},
{
Name: directMessagesMetric,
Help: "Number of direct messages this node received.",
},
{
Name: topicMessagesMetric,
Help: "Number of topic messages this node received.",
},
{
Name: messagesSentMetric,
Help: "Number of messages sent.",
},
{
Name: messagesPublishedMetric,
Help: "Number of messages published.",
},
}
var DefaultConfig = Config{
	Role:                    blockless.WorkerNode,
	Topics:                  []string{DefaultTopic},
	HealthInterval:          DefaultHealthInterval,
	RollCallTimeout:         DefaultRollCallTimeout,
	Concurrency:             DefaultConcurrency,
	ExecutionTimeout:        DefaultExecutionTimeout,
	ClusterFormationTimeout: DefaultClusterFormationTimeout,
	DefaultConsensus:        DefaultConsensusAlgorithm,
	LoadAttributes:          DefaultAttributeLoadingSetting,
	MetadataProvider:        metadata.NewNoopProvider(),
}
DefaultConfig represents the default settings for the node.
var (
ErrUnsupportedMessage = errors.New("unsupported message")
)
var Gauges = []prometheus.GaugeDefinition{
{
Name: nodeInfoMetric,
Help: "Information about the b7s node.",
},
}
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	Role                    blockless.NodeRole // Node role.
	Topics                  []string           // Topics to subscribe to.
	Execute                 blockless.Executor // Executor to use for running functions.
	HealthInterval          time.Duration      // How often should we emit the health ping.
	RollCallTimeout         time.Duration      // How long do we wait for roll call responses.
	Concurrency             uint               // How many requests should the node process in parallel.
	ExecutionTimeout        time.Duration      // How long does the head node wait for worker nodes to send their execution results.
	ClusterFormationTimeout time.Duration      // How long do we wait for the nodes to form a cluster for an execution.
	Workspace               string             // Directory where we can store files needed for execution.
	DefaultConsensus        consensus.Type     // Default consensus algorithm to use.
	LoadAttributes          bool               // Node should try to load its attributes from IPFS.
	MetadataProvider        metadata.Provider  // Metadata provider for the node
}
Config represents the Node configuration.
type FStore ¶
type FStore interface {
	// Install will install a function based on the address and CID.
	Install(ctx context.Context, address string, cid string) error

	// IsInstalled returns info if the function is installed or not.
	IsInstalled(cid string) (bool, error)

	// TODO: Refactor the sync code - move the logic outside of the package
	// Sync will ensure function installations are correct, redownloading functions if needed.
	Sync(ctx context.Context, haltOnError bool) error
}
FStore provides retrieval of function manifest.
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node is the entity that actually provides the main Blockless node functionality. It listens for messages coming from the wire and processes them. Depending on the node role, which is determined on construction, it may process messages in different ways. For example, upon receiving a message requesting execution of a Blockless function, a Worker Node will use the `Execute` component to fulfill the execution request. On the other hand, a Head Node will issue a roll call and eventually delegate the execution to the chosen Worker Node.
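The role-dependent behaviour described above can be pictured as a simple dispatch on the node role. The following is only a conceptual sketch: `role`, `headNode`, `workerNode`, and `handleExecute` are hypothetical names for this example, and the package's actual message handling is not shown in this documentation.

```go
package main

import "fmt"

// role is a hypothetical stand-in for blockless.NodeRole.
type role int

const (
	headNode role = iota
	workerNode
)

// handleExecute describes what a node of the given role would do with an
// execution request, following the prose description of Node.
func handleExecute(r role, functionID string) string {
	switch r {
	case workerNode:
		// A worker runs the function itself through its executor.
		return "worker: executing " + functionID + " locally"
	case headNode:
		// A head node finds a worker via roll call and delegates to it.
		return "head: issuing roll call for " + functionID
	default:
		return "unknown role"
	}
}

func main() {
	fmt.Println(handleExecute(workerNode, "example-function"))
	fmt.Println(handleExecute(headNode, "example-function"))
}
```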
func New ¶
func New(log zerolog.Logger, host *host.Host, store blockless.PeerStore, fstore FStore, options ...Option) (*Node, error)
New creates a new Node.
func (*Node) ExecuteFunction ¶
func (n *Node) ExecuteFunction(ctx context.Context, req execute.Request, subgroup string) (codes.Code, string, execute.ResultMap, execute.Cluster, error)
ExecuteFunction can be used to start function execution. At the moment this is used by the API server to start execution on the head node.
func (*Node) ExecutionResult ¶
func (n *Node) ExecutionResult(id string) (execute.ResultMap, bool)
ExecutionResult fetches the execution result from the node cache.
func (*Node) HealthPing ¶
func (n *Node) HealthPing(ctx context.Context)
HealthPing runs a long-running loop, publishing a health signal until the context is cancelled.
func (*Node) PublishFunctionInstall ¶
func (n *Node) PublishFunctionInstall(ctx context.Context, uri string, cid string, subgroup string) error
PublishFunctionInstall publishes a function install message.
func (*Node) ValidateConfig ¶
func (n *Node) ValidateConfig() error
ValidateConfig checks if the given configuration is correct.
type Option ¶
type Option func(*Config)
Option can be used to set Node configuration options.
func WithAttributeLoading ¶ added in v0.4.0
func WithAttributeLoading(b bool) Option
WithAttributeLoading specifies whether the node should try to load its attribute data from IPFS.
func WithClusterFormationTimeout ¶
func WithClusterFormationTimeout(d time.Duration) Option
WithClusterFormationTimeout specifies how long the head node waits for worker nodes to form a consensus cluster.
func WithConcurrency ¶
func WithConcurrency(n uint) Option
WithConcurrency specifies how many requests the node should process in parallel.
func WithDefaultConsensus ¶ added in v0.3.0
func WithDefaultConsensus(c consensus.Type) Option
WithDefaultConsensus specifies the consensus algorithm to use, if not specified in the request.
func WithExecutionTimeout ¶
func WithExecutionTimeout(d time.Duration) Option
WithExecutionTimeout specifies how long the head node waits for worker nodes to send their execution results.
func WithExecutor ¶
func WithExecutor(execute blockless.Executor) Option
WithExecutor specifies the executor to be used for running Blockless functions.
func WithHealthInterval ¶
func WithHealthInterval(d time.Duration) Option
WithHealthInterval specifies how often the node should emit the health signal.
func WithMetadataProvider ¶ added in v0.6.4
func WithMetadataProvider(p metadata.Provider) Option
WithMetadataProvider sets the metadata provider for the node.
func WithRollCallTimeout ¶
func WithRollCallTimeout(d time.Duration) Option
WithRollCallTimeout specifies how long the node waits for roll call responses.
func WithTopics ¶ added in v0.4.6
func WithTopics(topics []string) Option
WithTopics specifies the p2p topics to which the node should subscribe.
func WithWorkspace ¶
func WithWorkspace(path string) Option
WithWorkspace specifies the workspace that the node can use for file storage.
Source Files ¶
- attributes.go
- cluster.go
- config.go
- consensus.go
- execute.go
- execution_results.go
- fstore.go
- handlers.go
- head_execute.go
- health.go
- install.go
- message.go
- node.go
- notifiee.go
- params.go
- pipeline.go
- process.go
- queue.go
- rest.go
- roll_call.go
- run.go
- subgroups.go
- sync.go
- telemetry.go
- telemetry_params.go
- worker_execute.go