node

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 31, 2023 License: Apache-2.0 Imports: 57 Imported by: 2

Documentation

Index

Constants

View Source
const DefaultNodeInfoPublisherInterval = 30 * time.Second
View Source
const JobEventsTopic = "bacalhau-job-events"
View Source
const NodeInfoTopic = "bacalhau-node-info"

Variables

View Source
var DefaultComputeConfig = ComputeConfigParams{
	PhysicalResourcesProvider: system.NewPhysicalCapacityProvider(),
	DefaultJobResourceLimits: model.ResourceUsageData{
		CPU:    0.1,
		Memory: 100 * 1024 * 1024,
	},
	ExecutorBufferBackoffDuration: 50 * time.Millisecond,

	JobNegotiationTimeout:      3 * time.Minute,
	MinJobExecutionTimeout:     500 * time.Millisecond,
	MaxJobExecutionTimeout:     60 * time.Minute,
	DefaultJobExecutionTimeout: 10 * time.Minute,

	LogRunningExecutionsInterval: 10 * time.Second,
}
View Source
var DefaultRequesterConfig = RequesterConfigParams{
	MinJobExecutionTimeout:     0 * time.Second,
	DefaultJobExecutionTimeout: 30 * time.Minute,

	HousekeepingBackgroundTaskInterval: 30 * time.Second,
	NodeRankRandomnessRange:            5,
	OverAskForBidsFactor:               3,

	MinBacalhauVersion: model.BuildVersionInfo{
		Major: "0", Minor: "3", GitVersion: "v0.3.26",
	},
}

Functions

This section is empty.

Types

type Compute

type Compute struct {
	// Visible for testing
	ID             string
	LocalEndpoint  compute.Endpoint
	Capacity       capacity.Tracker
	ExecutionStore store.ExecutionStore
	Executors      executor.ExecutorProvider
	LogServer      *logstream.LogStreamServer
	Bidder         compute.Bidder
	// contains filtered or unexported fields
}

func NewComputeNode

func NewComputeNode(
	ctx context.Context,
	cleanupManager *system.CleanupManager,
	host host.Host,
	apiServer *publicapi.APIServer,
	config ComputeConfig,
	simulatorNodeID string,
	simulatorRequestHandler *simulator.RequestHandler,
	storages storage.StorageProvider,
	executors executor.ExecutorProvider,
	verifiers verifier.VerifierProvider,
	publishers publisher.PublisherProvider) (*Compute, error)

func (*Compute) RegisterLocalComputeCallback

func (c *Compute) RegisterLocalComputeCallback(callback compute.Callback)

type ComputeConfig

type ComputeConfig struct {
	// Capacity config
	TotalResourceLimits          model.ResourceUsageData
	QueueResourceLimits          model.ResourceUsageData
	JobResourceLimits            model.ResourceUsageData
	DefaultJobResourceLimits     model.ResourceUsageData
	IgnorePhysicalResourceLimits bool

	// How long the buffer would backoff before polling the queue again for new jobs
	ExecutorBufferBackoffDuration time.Duration

	// JobNegotiationTimeout default timeout value to hold a bid for a job
	JobNegotiationTimeout time.Duration
	// MinJobExecutionTimeout default value for the minimum execution timeout this compute node supports. Jobs with
	// lower timeout requirements will not be bid on.
	MinJobExecutionTimeout time.Duration
	// MaxJobExecutionTimeout default value for the maximum execution timeout this compute node supports. Jobs with
	// higher timeout requirements will not be bid on.
	MaxJobExecutionTimeout time.Duration
	// DefaultJobExecutionTimeout default value for the execution timeout this compute node will assign to jobs with
	// no timeout requirement defined.
	DefaultJobExecutionTimeout time.Duration

	// JobExecutionTimeoutClientIDBypassList is the list of clients that are allowed to bypass the job execution timeout
	// check.
	JobExecutionTimeoutClientIDBypassList []string

	// Bid strategies config
	JobSelectionPolicy model.JobSelectionPolicy

	// logging running executions
	LogRunningExecutionsInterval time.Duration

	SimulatorConfig model.SimulatorConfigCompute

	BidSemanticStrategy bidstrategy.SemanticBidStrategy

	BidResourceStrategy bidstrategy.ResourceBidStrategy

	ExecutionStore store.ExecutionStore
}

func NewComputeConfigWith

func NewComputeConfigWith(params ComputeConfigParams) (config ComputeConfig)

func NewComputeConfigWithDefaults

func NewComputeConfigWithDefaults() ComputeConfig

type ComputeConfigParams

type ComputeConfigParams struct {
	// Capacity config
	TotalResourceLimits          model.ResourceUsageData
	QueueResourceLimits          model.ResourceUsageData
	JobResourceLimits            model.ResourceUsageData
	DefaultJobResourceLimits     model.ResourceUsageData
	PhysicalResourcesProvider    capacity.Provider
	IgnorePhysicalResourceLimits bool

	ExecutorBufferBackoffDuration time.Duration

	// Timeout config
	JobNegotiationTimeout      time.Duration
	MinJobExecutionTimeout     time.Duration
	MaxJobExecutionTimeout     time.Duration
	DefaultJobExecutionTimeout time.Duration

	JobExecutionTimeoutClientIDBypassList []string

	// Bid strategies config
	JobSelectionPolicy model.JobSelectionPolicy

	// logging running executions
	LogRunningExecutionsInterval time.Duration

	SimulatorConfig model.SimulatorConfigCompute

	BidSemanticStrategy bidstrategy.SemanticBidStrategy

	BidResourceStrategy bidstrategy.ResourceBidStrategy
}

type ExecutorsFactory

type ExecutorsFactory interface {
	Get(ctx context.Context, nodeConfig NodeConfig, storages storage.StorageProvider) (executor.ExecutorProvider, error)
}

func NewStandardExecutorsFactory

func NewStandardExecutorsFactory() ExecutorsFactory

type ExecutorsFactoryFunc

type ExecutorsFactoryFunc func(
	ctx context.Context,
	nodeConfig NodeConfig,
	storages storage.StorageProvider,
) (executor.ExecutorProvider, error)

func (ExecutorsFactoryFunc) Get

type FeatureConfig added in v0.3.29

type FeatureConfig struct {
	Engines    []model.Engine
	Verifiers  []model.Verifier
	Publishers []model.Publisher
	Storages   []model.StorageSourceType
}

type Node

type Node struct {
	// Visible for testing
	APIServer      *publicapi.APIServer
	ComputeNode    *Compute
	RequesterNode  *Requester
	NodeInfoStore  routing.NodeInfoStore
	CleanupManager *system.CleanupManager
	IPFSClient     ipfs.Client
	Host           host.Host
}

func NewNode

func NewNode(
	ctx context.Context,
	config NodeConfig) (*Node, error)

func (*Node) IsComputeNode

func (n *Node) IsComputeNode() bool

IsComputeNode returns true if the node is a compute node

func (*Node) IsRequesterNode

func (n *Node) IsRequesterNode() bool

IsRequesterNode returns true if the node is a requester node

func (*Node) Start

func (n *Node) Start(ctx context.Context) error

type NodeConfig

type NodeConfig struct {
	IPFSClient                ipfs.Client
	CleanupManager            *system.CleanupManager
	JobStore                  jobstore.Store
	Host                      host.Host
	FilecoinUnsealedPath      string
	EstuaryAPIKey             string
	HostAddress               string
	APIPort                   uint16
	DisabledFeatures          FeatureConfig
	ComputeConfig             ComputeConfig
	RequesterNodeConfig       RequesterConfig
	APIServerConfig           publicapi.APIServerConfig
	LotusConfig               *filecoinlotus.PublisherConfig
	SimulatorNodeID           string
	IsRequesterNode           bool
	IsComputeNode             bool
	Labels                    map[string]string
	NodeInfoPublisherInterval time.Duration
	DependencyInjector        NodeDependencyInjector
	AllowListedLocalPaths     []string
}

Node configuration

type NodeDependencyInjector

type NodeDependencyInjector struct {
	StorageProvidersFactory StorageProvidersFactory
	ExecutorsFactory        ExecutorsFactory
	VerifiersFactory        VerifiersFactory
	PublishersFactory       PublishersFactory
}

Lazy node dependency injector that generate instances of different components on demand and based on the configuration provided.

func NewStandardNodeDependencyInjector

func NewStandardNodeDependencyInjector() NodeDependencyInjector

type PublishersFactory

type PublishersFactory interface {
	Get(ctx context.Context, nodeConfig NodeConfig) (publisher.PublisherProvider, error)
}

func NewStandardPublishersFactory

func NewStandardPublishersFactory() PublishersFactory

type PublishersFactoryFunc

type PublishersFactoryFunc func(ctx context.Context, nodeConfig NodeConfig) (publisher.PublisherProvider, error)

func (PublishersFactoryFunc) Get

type Requester

type Requester struct {
	// Visible for testing
	Endpoint       requester.Endpoint
	JobStore       jobstore.Store
	NodeDiscoverer requester.NodeDiscoverer
	// contains filtered or unexported fields
}

func NewRequesterNode

func NewRequesterNode(
	ctx context.Context,
	cleanupManager *system.CleanupManager,
	host host.Host,
	apiServer *publicapi.APIServer,
	config RequesterConfig,
	jobStore jobstore.Store,
	simulatorNodeID string,
	simulatorRequestHandler *simulator.RequestHandler,
	verifiers verifier.VerifierProvider,
	storageProviders storage.StorageProvider,
	gossipSub *libp2p_pubsub.PubSub,
	nodeInfoStore routing.NodeInfoStore,
) (*Requester, error)

func (*Requester) RegisterLocalComputeEndpoint

func (r *Requester) RegisterLocalComputeEndpoint(endpoint compute.Endpoint)

type RequesterConfig

type RequesterConfig struct {
	// MinJobExecutionTimeout requester will replace any job execution timeout that is less than this
	// value with DefaultJobExecutionTimeout.
	MinJobExecutionTimeout time.Duration
	// DefaultJobExecutionTimeout default value for running, verifying and publishing job results,
	// if the user didn't define one in the spec
	DefaultJobExecutionTimeout time.Duration

	// HousekeepingBackgroundTaskInterval background task interval that periodically checks for expired states
	HousekeepingBackgroundTaskInterval time.Duration
	// NodeRankRandomnessRange defines the range of randomness used to rank nodes
	NodeRankRandomnessRange  int
	OverAskForBidsFactor     int
	JobSelectionPolicy       model.JobSelectionPolicy
	ExternalValidatorWebhook *url.URL
	SimulatorConfig          model.SimulatorConfigRequester

	// minimum version of compute nodes that the requester will accept and route jobs to
	MinBacalhauVersion model.BuildVersionInfo

	RetryStrategy requester.RetryStrategy
}

func NewRequesterConfigWith

func NewRequesterConfigWith(params RequesterConfigParams) (config RequesterConfig)

func NewRequesterConfigWithDefaults

func NewRequesterConfigWithDefaults() RequesterConfig

type RequesterConfigParams

type RequesterConfigParams struct {
	// Timeout config
	MinJobExecutionTimeout     time.Duration
	DefaultJobExecutionTimeout time.Duration

	HousekeepingBackgroundTaskInterval time.Duration
	NodeRankRandomnessRange            int
	OverAskForBidsFactor               int
	JobSelectionPolicy                 model.JobSelectionPolicy
	ExternalValidatorWebhook           *url.URL
	SimulatorConfig                    model.SimulatorConfigRequester

	// minimum version of compute nodes that the requester will accept and route jobs to
	MinBacalhauVersion model.BuildVersionInfo

	RetryStrategy requester.RetryStrategy
}

type StorageProvidersFactory

type StorageProvidersFactory interface {
	Get(ctx context.Context, nodeConfig NodeConfig) (storage.StorageProvider, error)
}

Interfaces to inject dependencies into the stack

func NewStandardStorageProvidersFactory

func NewStandardStorageProvidersFactory() StorageProvidersFactory

Standard implementations used in prod and when testing prod behavior

type StorageProvidersFactoryFunc

type StorageProvidersFactoryFunc func(ctx context.Context, nodeConfig NodeConfig) (storage.StorageProvider, error)

Functions that implement the factories for easier creation of new implementations

func (StorageProvidersFactoryFunc) Get

type VerifiersFactory

type VerifiersFactory interface {
	Get(ctx context.Context, nodeConfig NodeConfig, publishers publisher.PublisherProvider) (verifier.VerifierProvider, error)
}

func NewStandardVerifiersFactory

func NewStandardVerifiersFactory() VerifiersFactory

type VerifiersFactoryFunc

type VerifiersFactoryFunc func(
	ctx context.Context,
	nodeConfig NodeConfig,
	publishers publisher.PublisherProvider,
) (verifier.VerifierProvider, error)

func (VerifiersFactoryFunc) Get

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL