node

package
v1.1.7-rc1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2023 License: Apache-2.0 Imports: 65 Imported by: 2

Documentation

Index

Constants

View Source
// JobInfoTopic is the string "bacalhau-job-info".
// NOTE(review): presumably the pubsub topic name used for job info messages — confirm at the use site.
const JobInfoTopic = "bacalhau-job-info"
View Source
// NodeInfoTopic is the string "bacalhau-node-info".
// NOTE(review): presumably the pubsub topic name used for node info messages — confirm at the use site.
const NodeInfoTopic = "bacalhau-node-info"

Variables

View Source
// DefaultComputeConfig is a package-level ComputeConfigParams value. Both
// function calls in the literal run once, when the package is initialized.
// NOTE(review): presumably the parameters NewComputeConfigWithDefaults starts
// from — confirm against its implementation.
var DefaultComputeConfig = ComputeConfigParams{
	PhysicalResourcesProvider: compute_system.NewPhysicalCapacityProvider(),
	// Resource limits stored under DefaultJobResourceLimits: 0.1 CPU and
	// 100 * 1024 * 1024 = 104857600 memory units.
	// NOTE(review): memory is presumably in bytes (100 MiB) — confirm in models.Resources.
	DefaultJobResourceLimits: models.Resources{
		CPU:    0.1,
		Memory: 100 * 1024 * 1024,
	},

	// Timeout settings. The max and default execution timeouts are both
	// model.NoJobTimeout.
	// NOTE(review): NoJobTimeout presumably means "no limit" — confirm in package model.
	JobNegotiationTimeout:      3 * time.Minute,
	MinJobExecutionTimeout:     500 * time.Millisecond,
	MaxJobExecutionTimeout:     model.NoJobTimeout,
	DefaultJobExecutionTimeout: model.NoJobTimeout,

	LogRunningExecutionsInterval: 10 * time.Second,
	JobSelectionPolicy:           NewDefaultJobSelectionPolicy(),
}
View Source
// DefaultNodeInfoPublishConfig is a package-level
// routing.NodeInfoPublisherIntervalConfig with a 30s Interval, a 5s
// EagerPublishInterval and a 30s EagerPublishDuration.
// NOTE(review): presumably the publisher uses the eager interval for the
// eager duration before switching to Interval — confirm in package routing.
var DefaultNodeInfoPublishConfig = routing.NodeInfoPublisherIntervalConfig{
	Interval:             30 * time.Second,
	EagerPublishInterval: 5 * time.Second,
	EagerPublishDuration: 30 * time.Second,
}
View Source
// DefaultRequesterConfig is a package-level RequesterConfigParams value.
// NOTE(review): presumably the parameters NewRequesterConfigWithDefaults
// starts from — confirm against its implementation.
var DefaultRequesterConfig = RequesterConfigParams{
	// Job defaults: the execution timeout is model.NoJobTimeout.
	JobDefaults: transformer.JobDefaults{
		ExecutionTimeout: model.NoJobTimeout,
	},

	HousekeepingBackgroundTaskInterval: 30 * time.Second,
	NodeRankRandomnessRange:            5,
	OverAskForBidsFactor:               3,

	// Version value stored under MinBacalhauVersion (v1.0.4).
	MinBacalhauVersion: models.BuildVersionInfo{
		Major: "1", Minor: "0", GitVersion: "v1.0.4",
	},

	// Evaluation broker settings.
	EvalBrokerVisibilityTimeout:    60 * time.Second,
	EvalBrokerInitialRetryDelay:    1 * time.Second,
	EvalBrokerSubsequentRetryDelay: 30 * time.Second,
	EvalBrokerMaxRetryCount:        10,

	// Worker settings. WorkerCount is the value runtime.NumCPU() returns
	// when this package-level variable is initialized.
	WorkerCount:                  runtime.NumCPU(),
	WorkerEvalDequeueTimeout:     5 * time.Second,
	WorkerEvalDequeueBaseBackoff: 1 * time.Second,
	WorkerEvalDequeueMaxBackoff:  30 * time.Second,

	// S3 pre-signed URL settings: not disabled, 30 minute expiration.
	S3PreSignedURLDisabled:   false,
	S3PreSignedURLExpiration: 30 * time.Minute,
}
View Source
// TestNodeInfoPublishConfig is the node-info publish configuration for
// tests. Compared with DefaultNodeInfoPublishConfig it keeps the 30s Interval
// but uses a 10ms EagerPublishInterval (instead of 5s) and a 5s
// EagerPublishDuration (instead of 30s).
var TestNodeInfoPublishConfig = routing.NodeInfoPublisherIntervalConfig{
	Interval:             30 * time.Second,
	EagerPublishInterval: 10 * time.Millisecond,
	EagerPublishDuration: 5 * time.Second,
}

TestNodeInfoPublishConfig speeds up node announcements for tests

View Source
// TestRequesterConfig is a package-level RequesterConfigParams value with
// shorter timings than DefaultRequesterConfig: a 30s default execution
// timeout, sub-second retry/dequeue delays, 3 retries and a fixed 3 workers.
// NOTE(review): the name suggests it is intended for tests only — confirm usage.
var TestRequesterConfig = RequesterConfigParams{
	// Job defaults: a finite 30s execution timeout.
	JobDefaults: transformer.JobDefaults{
		ExecutionTimeout: 30 * time.Second,
	},
	HousekeepingBackgroundTaskInterval: 30 * time.Second,
	NodeRankRandomnessRange:            5,
	OverAskForBidsFactor:               3,

	// Same version value as DefaultRequesterConfig (v1.0.4).
	MinBacalhauVersion: models.BuildVersionInfo{
		Major: "1", Minor: "0", GitVersion: "v1.0.4",
	},

	// Evaluation broker settings.
	EvalBrokerVisibilityTimeout:    5 * time.Second,
	EvalBrokerInitialRetryDelay:    100 * time.Millisecond,
	EvalBrokerSubsequentRetryDelay: 100 * time.Millisecond,
	EvalBrokerMaxRetryCount:        3,

	// Worker settings: a fixed count rather than runtime.NumCPU().
	WorkerCount:                  3,
	WorkerEvalDequeueTimeout:     200 * time.Millisecond,
	WorkerEvalDequeueBaseBackoff: 20 * time.Millisecond,
	WorkerEvalDequeueMaxBackoff:  200 * time.Millisecond,

	// S3 pre-signed URL settings: identical to DefaultRequesterConfig.
	S3PreSignedURLDisabled:   false,
	S3PreSignedURLExpiration: 30 * time.Minute,
}

Functions

func GetNodeInfoPublishConfig added in v1.0.4

func GetNodeInfoPublishConfig() routing.NodeInfoPublisherIntervalConfig

Types

type Compute

// Compute is the type returned by NewComputeNode. Its exported fields hold
// the compute-side components (endpoint, capacity tracker, execution store,
// executor and storage providers, log stream server, bidder).
type Compute struct {
	// Visible for testing
	ID             string
	LocalEndpoint  compute.Endpoint
	Capacity       capacity.Tracker
	ExecutionStore store.ExecutionStore
	Executors      executor.ExecutorProvider
	Storages       storage.StorageProvider
	LogServer      *logstream.LogStreamServer
	Bidder         compute.Bidder
	// contains filtered or unexported fields
}

func NewComputeNode

func NewComputeNode(
	ctx context.Context,
	cleanupManager *system.CleanupManager,
	host host.Host,
	apiServer *publicapi.Server,
	config ComputeConfig,
	storagePath string,
	storages storage.StorageProvider,
	executors executor.ExecutorProvider,
	publishers publisher.PublisherProvider,
	fsRepo *repo.FsRepo,
) (*Compute, error)

func (*Compute) RegisterLocalComputeCallback

func (c *Compute) RegisterLocalComputeCallback(callback compute.Callback)

type ComputeConfig

// ComputeConfig is the compute node configuration. It is the type returned
// by NewComputeConfigWith and NewComputeConfigWithDefaults, and the type of
// the config parameter of NewComputeNode.
type ComputeConfig struct {
	// Capacity config
	TotalResourceLimits          models.Resources
	QueueResourceLimits          models.Resources
	JobResourceLimits            models.Resources
	DefaultJobResourceLimits     models.Resources
	IgnorePhysicalResourceLimits bool

	// JobNegotiationTimeout is the default timeout for holding a bid for a job.
	JobNegotiationTimeout time.Duration
	// MinJobExecutionTimeout is the default value for the minimum execution timeout this compute node supports.
	// Jobs with lower timeout requirements will not be bid on.
	MinJobExecutionTimeout time.Duration
	// MaxJobExecutionTimeout is the default value for the maximum execution timeout this compute node supports.
	// Jobs with higher timeout requirements will not be bid on.
	MaxJobExecutionTimeout time.Duration
	// DefaultJobExecutionTimeout is the default value for the execution timeout this compute node will assign
	// to jobs with no timeout requirement defined.
	DefaultJobExecutionTimeout time.Duration

	// JobExecutionTimeoutClientIDBypassList is the list of clients that are allowed to bypass the job execution
	// timeout check.
	JobExecutionTimeoutClientIDBypassList []string

	// Bid strategies config
	JobSelectionPolicy JobSelectionPolicy

	// Interval setting for logging running executions.
	LogRunningExecutionsInterval time.Duration

	FailureInjectionConfig model.FailureInjectionComputeConfig

	BidSemanticStrategy bidstrategy.SemanticBidStrategy

	BidResourceStrategy bidstrategy.ResourceBidStrategy

	ExecutionStore store.ExecutionStore
}

func NewComputeConfigWith

func NewComputeConfigWith(params ComputeConfigParams) (ComputeConfig, error)

func NewComputeConfigWithDefaults

func NewComputeConfigWithDefaults() (ComputeConfig, error)

type ComputeConfigParams

// ComputeConfigParams is the parameter type accepted by NewComputeConfigWith,
// which returns a ComputeConfig. Compared with ComputeConfig it additionally
// carries PhysicalResourcesProvider and has no ExecutionStore field.
type ComputeConfigParams struct {
	// Capacity config
	TotalResourceLimits          models.Resources
	QueueResourceLimits          models.Resources
	JobResourceLimits            models.Resources
	DefaultJobResourceLimits     models.Resources
	PhysicalResourcesProvider    capacity.Provider
	IgnorePhysicalResourceLimits bool

	// Timeout config
	JobNegotiationTimeout      time.Duration
	MinJobExecutionTimeout     time.Duration
	MaxJobExecutionTimeout     time.Duration
	DefaultJobExecutionTimeout time.Duration

	// NOTE(review): presumably client IDs exempt from the execution timeout
	// check, as on ComputeConfig — confirm.
	JobExecutionTimeoutClientIDBypassList []string

	// Bid strategies config
	JobSelectionPolicy JobSelectionPolicy

	// Interval setting for logging running executions.
	LogRunningExecutionsInterval time.Duration

	FailureInjectionConfig model.FailureInjectionComputeConfig

	BidSemanticStrategy bidstrategy.SemanticBidStrategy

	BidResourceStrategy bidstrategy.ResourceBidStrategy
}

type ExecutorsFactory

// ExecutorsFactory is implemented by types that can produce an
// executor.ExecutorProvider from a NodeConfig.
type ExecutorsFactory interface {
	// Get returns the executor provider for nodeConfig, or an error.
	Get(ctx context.Context, nodeConfig NodeConfig) (executor.ExecutorProvider, error)
}

func NewPluginExecutorFactory added in v1.0.4

func NewPluginExecutorFactory() ExecutorsFactory

func NewStandardExecutorsFactory

func NewStandardExecutorsFactory() ExecutorsFactory

type ExecutorsFactoryFunc

// ExecutorsFactoryFunc is a function type with the same parameters and
// results as ExecutorsFactory.Get. The type also declares a Get method.
// NOTE(review): presumably Get calls the function itself so that a plain
// function satisfies ExecutorsFactory — confirm in the implementation.
type ExecutorsFactoryFunc func(
	ctx context.Context,
	nodeConfig NodeConfig,
) (executor.ExecutorProvider, error)

func (ExecutorsFactoryFunc) Get

type FeatureConfig added in v0.3.29

// FeatureConfig holds three string lists: engines, publishers and storages.
// NodeConfig uses it as the type of its DisabledFeatures field.
// NOTE(review): the entries are presumably feature/type names — confirm the
// expected values at the use site.
type FeatureConfig struct {
	Engines    []string
	Publishers []string
	Storages   []string
}

type JobSelectionPolicy added in v1.0.4

// JobSelectionPolicy describes the rules for how a compute node selects an
// incoming job. All fields carry JSON tags; the two probe fields are omitted
// from JSON output when empty.
type JobSelectionPolicy struct {
	// Locality describes whether we should run a job based on where its
	// data is located, i.e. whether the data is "local" or "anywhere".
	Locality semantic.JobSelectionDataLocality `json:"locality"`
	// RejectStatelessJobs controls whether jobs that don't specify any
	// data are rejected. The default is "accept".
	RejectStatelessJobs bool `json:"reject_stateless_jobs"`
	// AcceptNetworkedJobs controls whether jobs that specify networking
	// are accepted. The default is "reject".
	AcceptNetworkedJobs bool `json:"accept_networked_jobs"`
	// ProbeHTTP and ProbeExec are external hooks that decide whether we
	// should take on the job. If either is given, it overrides the data
	// locality settings.
	ProbeHTTP string `json:"probe_http,omitempty"`
	ProbeExec string `json:"probe_exec,omitempty"`
}

JobSelectionPolicy describes the rules for how a compute node selects an incoming job.

func NewDefaultJobSelectionPolicy added in v1.0.4

func NewDefaultJobSelectionPolicy() JobSelectionPolicy

type Node

// Node is the type returned by NewNode. It holds the API server, the compute
// and requester components, the node info store, the cleanup manager, the
// IPFS client and the host.
// NOTE(review): ComputeNode / RequesterNode are presumably nil when the node
// does not have that role (see IsComputeNode / IsRequesterNode) — confirm.
type Node struct {
	// Visible for testing
	APIServer      *publicapi.Server
	ComputeNode    *Compute
	RequesterNode  *Requester
	NodeInfoStore  routing.NodeInfoStore
	CleanupManager *system.CleanupManager
	IPFSClient     ipfs.Client
	Host           host.Host
}

func NewNode

func NewNode(
	ctx context.Context,
	config NodeConfig) (*Node, error)

func (*Node) IsComputeNode

func (n *Node) IsComputeNode() bool

IsComputeNode returns true if the node is a compute node

func (*Node) IsRequesterNode

func (n *Node) IsRequesterNode() bool

IsRequesterNode returns true if the node is a requester node

func (*Node) Start

func (n *Node) Start(ctx context.Context) error

type NodeConfig

// NodeConfig is the node configuration. It is the config parameter of
// NewNode and the nodeConfig parameter of the Get method on the
// ExecutorsFactory, PublishersFactory and StorageProvidersFactory interfaces.
//
// It bundles externally supplied dependencies (IPFSClient, CleanupManager,
// Host, FsRepo), network and TLS settings, the compute and requester
// configurations, the role flags IsRequesterNode / IsComputeNode, and the
// DependencyInjector that provides the three factories.
type NodeConfig struct {
	IPFSClient                  ipfs.Client
	CleanupManager              *system.CleanupManager
	Host                        host.Host
	HostAddress                 string
	APIPort                     uint16
	RequesterAutoCert           string
	RequesterAutoCertCache      string
	RequesterTLSCertificateFile string
	RequesterTLSKeyFile         string
	DisabledFeatures            FeatureConfig
	ComputeConfig               ComputeConfig
	RequesterNodeConfig         RequesterConfig
	APIServerConfig             publicapi.Config
	IsRequesterNode             bool
	IsComputeNode               bool
	Labels                      map[string]string
	NodeInfoPublisherInterval   routing.NodeInfoPublisherIntervalConfig
	DependencyInjector          NodeDependencyInjector
	AllowListedLocalPaths       []string
	WebUI                       bool

	FsRepo *repo.FsRepo
}

Node configuration

type NodeDependencyInjector

// NodeDependencyInjector groups the three factories (storage providers,
// executors, publishers). Each factory's Get method takes a NodeConfig and
// returns the corresponding provider. NodeConfig carries a value of this
// type in its DependencyInjector field.
type NodeDependencyInjector struct {
	StorageProvidersFactory StorageProvidersFactory
	ExecutorsFactory        ExecutorsFactory
	PublishersFactory       PublishersFactory
}

Lazy node dependency injector that generates instances of different components on demand, based on the configuration provided.

func NewExecutorPluginNodeDependencyInjector added in v1.0.4

func NewExecutorPluginNodeDependencyInjector() NodeDependencyInjector

func NewStandardNodeDependencyInjector

func NewStandardNodeDependencyInjector() NodeDependencyInjector

type PublishersFactory

// PublishersFactory is implemented by types that can produce a
// publisher.PublisherProvider from a NodeConfig.
type PublishersFactory interface {
	// Get returns the publisher provider for nodeConfig, or an error.
	Get(ctx context.Context, nodeConfig NodeConfig) (publisher.PublisherProvider, error)
}

func NewStandardPublishersFactory

func NewStandardPublishersFactory() PublishersFactory

type PublishersFactoryFunc

// PublishersFactoryFunc is a function type with the same parameters and
// results as PublishersFactory.Get. The type also declares a Get method.
// NOTE(review): presumably Get calls the function itself so that a plain
// function satisfies PublishersFactory — confirm in the implementation.
type PublishersFactoryFunc func(ctx context.Context, nodeConfig NodeConfig) (publisher.PublisherProvider, error)

func (PublishersFactoryFunc) Get

type Requester

// Requester is the type returned by NewRequesterNode. Its exported fields
// hold the requester endpoint, the job store and the node discoverer.
type Requester struct {
	// Visible for testing
	Endpoint       requester.Endpoint
	JobStore       jobstore.Store
	NodeDiscoverer orchestrator.NodeDiscoverer
	// contains filtered or unexported fields
}

func NewRequesterNode

func NewRequesterNode(
	ctx context.Context,
	host host.Host,
	apiServer *publicapi.Server,
	requesterConfig RequesterConfig,
	storageProviders storage.StorageProvider,
	nodeInfoStore routing.NodeInfoStore,
	gossipSub *libp2p_pubsub.PubSub,
	fsRepo *repo.FsRepo,
) (*Requester, error)

func (*Requester) RegisterLocalComputeEndpoint

func (r *Requester) RegisterLocalComputeEndpoint(endpoint compute.Endpoint)

type RequesterConfig

// RequesterConfig is the requester node configuration returned by
// NewRequesterConfigWith and NewRequesterConfigWithDefaults. It embeds
// RequesterConfigParams, so all of that type's fields are promoted onto it.
type RequesterConfig struct {
	RequesterConfigParams
}

func NewRequesterConfigWith

func NewRequesterConfigWith(params RequesterConfigParams) (RequesterConfig, error)

func NewRequesterConfigWithDefaults

func NewRequesterConfigWithDefaults() (RequesterConfig, error)

type RequesterConfigParams

// RequesterConfigParams is the parameter type accepted by
// NewRequesterConfigWith, and it is embedded in RequesterConfig.
type RequesterConfigParams struct {
	// JobDefaults holds default job settings (see transformer.JobDefaults).
	JobDefaults transformer.JobDefaults

	HousekeepingBackgroundTaskInterval time.Duration
	NodeRankRandomnessRange            int
	OverAskForBidsFactor               uint
	JobSelectionPolicy                 JobSelectionPolicy
	ExternalValidatorWebhook           *url.URL
	FailureInjectionConfig             model.FailureInjectionRequesterConfig

	// MinBacalhauVersion is the minimum version of compute nodes that the
	// requester will accept and route jobs to.
	MinBacalhauVersion models.BuildVersionInfo

	RetryStrategy orchestrator.RetryStrategy

	// Evaluation broker config
	EvalBrokerVisibilityTimeout    time.Duration
	EvalBrokerInitialRetryDelay    time.Duration
	EvalBrokerSubsequentRetryDelay time.Duration
	EvalBrokerMaxRetryCount        int

	// Worker config
	WorkerCount                  int
	WorkerEvalDequeueTimeout     time.Duration
	WorkerEvalDequeueBaseBackoff time.Duration
	WorkerEvalDequeueMaxBackoff  time.Duration

	// S3 pre-signed URL config: a disable flag and an expiration duration.
	S3PreSignedURLDisabled   bool
	S3PreSignedURLExpiration time.Duration
}

type StorageProvidersFactory

// StorageProvidersFactory is implemented by types that can produce a
// storage.StorageProvider from a NodeConfig.
type StorageProvidersFactory interface {
	// Get returns the storage provider for nodeConfig, or an error.
	Get(ctx context.Context, nodeConfig NodeConfig) (storage.StorageProvider, error)
}

Interfaces to inject dependencies into the stack

func NewStandardStorageProvidersFactory

func NewStandardStorageProvidersFactory() StorageProvidersFactory

Standard implementations used in prod and when testing prod behavior

type StorageProvidersFactoryFunc

// StorageProvidersFactoryFunc is a function type with the same parameters
// and results as StorageProvidersFactory.Get. The type also declares a Get method.
// NOTE(review): presumably Get calls the function itself so that a plain
// function satisfies StorageProvidersFactory — confirm in the implementation.
type StorageProvidersFactoryFunc func(ctx context.Context, nodeConfig NodeConfig) (storage.StorageProvider, error)

Functions that implement the factories for easier creation of new implementations

func (StorageProvidersFactoryFunc) Get

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL