node

package
Version: v1.5.0-dev14
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 2, 2024 License: Apache-2.0 Imports: 80 Imported by: 2

Documentation

Index

Constants

View Source
// HeartbeatTopicFormat is the fmt format string for the topic that carries
// compute-node heartbeats. The %s verb is presumably filled with the compute
// node's ID — confirm against callers.
const HeartbeatTopicFormat = "bacalhau.global.compute.%s.out.heartbeat"

Variables

View Source
// DefaultRequesterConfig is the baseline RequesterConfigParams for requester
// nodes. RequesterConfigParams fields that are not listed here (for example
// JobDefaults, JobSelectionPolicy, RetryStrategy, JobStore and
// SchedulerQueueBackoff) keep their Go zero values.
var DefaultRequesterConfig = RequesterConfigParams{
	HousekeepingBackgroundTaskInterval: 30 * time.Second,
	HousekeepingTimeoutBuffer:          2 * time.Minute,
	NodeRankRandomnessRange:            5,
	OverAskForBidsFactor:               3,

	// Minimum version of compute nodes that the requester will accept and
	// route jobs to.
	MinBacalhauVersion: models.BuildVersionInfo{
		Major: "1", Minor: "0", GitVersion: "v1.0.4",
	},

	EvalBrokerVisibilityTimeout:    60 * time.Second,
	EvalBrokerInitialRetryDelay:    1 * time.Second,
	EvalBrokerSubsequentRetryDelay: 30 * time.Second,
	EvalBrokerMaxRetryCount:        10,

	// Worker count follows the number of logical CPUs reported by
	// runtime.NumCPU when this package-level variable is initialized.
	WorkerCount:                  runtime.NumCPU(),
	WorkerEvalDequeueTimeout:     5 * time.Second,
	WorkerEvalDequeueBaseBackoff: 1 * time.Second,
	WorkerEvalDequeueMaxBackoff:  30 * time.Second,

	NodeOverSubscriptionFactor: 1.5,

	S3PreSignedURLDisabled:   false,
	S3PreSignedURLExpiration: 30 * time.Minute,

	TranslationEnabled: false,

	ControlPlaneSettings: legacy_types.RequesterControlPlaneConfig{
		HeartbeatCheckFrequency: legacy_types.Duration(30 * time.Second),
		NodeDisconnectedAfter:   legacy_types.Duration(30 * time.Second),
	},

	// NOTE: RequesterConfigParams marks NodeInfoStoreTTL as currently unused.
	NodeInfoStoreTTL:     10 * time.Minute,
	// Newly joining nodes start out APPROVED.
	DefaultApprovalState: models.NodeMembership.APPROVED,
}
View Source
// TestRequesterConfig is a RequesterConfigParams variant for tests. Compared
// with DefaultRequesterConfig it uses:
//   - a much shorter HousekeepingTimeoutBuffer;
//   - shorter evaluation-broker and worker timings;
//   - fewer broker retries;
//   - a fixed WorkerCount of 3.
//
// This is presumably to keep tests fast and deterministic. NodeInfoStoreTTL
// is not set here and therefore stays zero.
var TestRequesterConfig = RequesterConfigParams{
	HousekeepingBackgroundTaskInterval: 30 * time.Second,
	HousekeepingTimeoutBuffer:          100 * time.Millisecond,
	NodeRankRandomnessRange:            5,
	OverAskForBidsFactor:               3,

	// Minimum version of compute nodes that the requester will accept and
	// route jobs to.
	MinBacalhauVersion: models.BuildVersionInfo{
		Major: "1", Minor: "0", GitVersion: "v1.0.4",
	},

	EvalBrokerVisibilityTimeout:    5 * time.Second,
	EvalBrokerInitialRetryDelay:    100 * time.Millisecond,
	EvalBrokerSubsequentRetryDelay: 100 * time.Millisecond,
	EvalBrokerMaxRetryCount:        3,

	// Fixed worker count, independent of the host's CPU count.
	WorkerCount:                  3,
	WorkerEvalDequeueTimeout:     200 * time.Millisecond,
	WorkerEvalDequeueBaseBackoff: 20 * time.Millisecond,
	WorkerEvalDequeueMaxBackoff:  200 * time.Millisecond,

	NodeOverSubscriptionFactor: 1.5,

	TranslationEnabled: false,

	S3PreSignedURLDisabled:   false,
	S3PreSignedURLExpiration: 30 * time.Minute,

	ControlPlaneSettings: legacy_types.RequesterControlPlaneConfig{
		HeartbeatCheckFrequency: legacy_types.Duration(30 * time.Second),
		NodeDisconnectedAfter:   legacy_types.Duration(30 * time.Second),
	},

	// Newly joining nodes start out APPROVED.
	DefaultApprovalState: models.NodeMembership.APPROVED,
}

Functions

func CreateMessageSerDeRegistry added in v1.5.0

func CreateMessageSerDeRegistry() (*ncl.MessageSerDeRegistry, error)

CreateMessageSerDeRegistry creates a new message serializer/deserializer (SerDe) registry.

func NewBidder added in v1.3.1

func NewBidder(
	config ComputeConfig,
	publishers publisher.PublisherProvider,
	storages storage.StorageProvider,
	executors executor.ExecutorProvider,
	runningCapacityTracker capacity.Tracker,
	nodeID string,
	executionStore store.ExecutionStore,
	computeCallback compute.Callback,
	bufferRunner *compute.ExecutorBuffer,
	apiServer *publicapi.Server,
	calculator capacity.UsageCalculator,
) compute.Bidder

Types

type AuthenticatorsFactory added in v1.2.1

type AuthenticatorsFactory = Factory[authn.Authenticator]

func NewStandardAuthenticatorsFactory added in v1.2.1

func NewStandardAuthenticatorsFactory(userKey *baccrypto.UserKey) AuthenticatorsFactory

type AuthenticatorsFactoryFunc added in v1.2.1

type AuthenticatorsFactoryFunc = FactoryFunc[authn.Authenticator]

type Compute

// Compute groups the components that make up a compute node. It is created by
// NewComputeNode and cleaned up with its Cleanup method.
type Compute struct {
	// Visible for testing
	ID               string
	LocalEndpoint    compute.Endpoint
	Capacity         capacity.Tracker
	ExecutionStore   store.ExecutionStore
	Executors        executor.ExecutorProvider
	Storages         storage.StorageProvider
	Publishers       publisher.PublisherProvider
	Bidder           compute.Bidder
	ManagementClient *compute.ManagementClient
	// contains filtered or unexported fields
}

func NewComputeNode

func NewComputeNode(
	ctx context.Context,
	nodeID string,
	apiServer *publicapi.Server,
	cfg types.Bacalhau,
	config ComputeConfig,
	storages storage.StorageProvider,
	executors executor.ExecutorProvider,
	publishers publisher.PublisherProvider,
	natsConn *nats.Conn,
	computeCallback compute.Callback,
	managementProxy compute.ManagementEndpoint,
	configuredLabels map[string]string,
	messageSerDeRegistry *ncl.MessageSerDeRegistry,
) (*Compute, error)

func (*Compute) Cleanup added in v1.2.2

func (c *Compute) Cleanup(ctx context.Context)

type ComputeConfig

// ComputeConfig is the compute-node configuration produced from a
// ComputeConfigParams by NewComputeConfigWith (or NewComputeConfigWithDefaults).
// Its Validate method checks it.
type ComputeConfig struct {
	// Capacity config
	TotalResourceLimits          models.Resources
	JobResourceLimits            models.Resources
	DefaultJobResourceLimits     models.Resources
	// Marked deprecated on ComputeConfigParams; still used by several tests.
	IgnorePhysicalResourceLimits bool

	// JobNegotiationTimeout is the default time to hold a bid for a job.
	JobNegotiationTimeout time.Duration
	// MinJobExecutionTimeout is the minimum execution timeout this compute node
	// supports. Jobs with lower timeout requirements will not be bid on.
	MinJobExecutionTimeout time.Duration
	// MaxJobExecutionTimeout is the maximum execution timeout this compute node
	// supports. Jobs with higher timeout requirements will not be bid on.
	MaxJobExecutionTimeout time.Duration
	// DefaultJobExecutionTimeout is the execution timeout this compute node
	// assigns to jobs that do not define a timeout requirement.
	DefaultJobExecutionTimeout time.Duration

	// Bid strategies config
	JobSelectionPolicy JobSelectionPolicy

	// logging running executions
	LogRunningExecutionsInterval time.Duration

	// How many messages to buffer in the log stream channel
	LogStreamBufferSize int

	FailureInjectionConfig models.FailureInjectionComputeConfig

	// NOTE(review): presumably optional overrides for the semantic/resource
	// bid strategies — confirm how a nil value is handled.
	BidSemanticStrategy bidstrategy.SemanticBidStrategy

	BidResourceStrategy bidstrategy.ResourceBidStrategy

	ExecutionStore store.ExecutionStore

	LocalPublisher types.LocalPublisher

	// Compute-side control plane settings (legacy config type).
	ControlPlaneSettings legacy_types.ComputeControlPlaneConfig
}

func NewComputeConfigWith

func NewComputeConfigWith(executionDir string, params ComputeConfigParams) (ComputeConfig, error)

func NewComputeConfigWithDefaults

func NewComputeConfigWithDefaults(executionDir string) (ComputeConfig, error)

func (*ComputeConfig) Validate

func (c *ComputeConfig) Validate() error

type ComputeConfigParams

// ComputeConfigParams are the inputs to NewComputeConfigWith; a default set can
// be obtained from NewDefaultComputeParam. Apart from PhysicalResourcesProvider,
// which has no counterpart in ComputeConfig, the fields mirror ComputeConfig.
type ComputeConfigParams struct {
	// Capacity config
	TotalResourceLimits       models.Resources
	JobResourceLimits         models.Resources
	DefaultJobResourceLimits  models.Resources
	// NOTE(review): presumably used to discover the machine's physical
	// resources when deriving/limiting TotalResourceLimits — confirm.
	PhysicalResourcesProvider capacity.Provider
	// Deprecated: this feature is no longer supported, but still used in several tests and thus preserved
	IgnorePhysicalResourceLimits bool

	// Timeout config
	JobNegotiationTimeout      time.Duration
	MinJobExecutionTimeout     time.Duration
	MaxJobExecutionTimeout     time.Duration
	DefaultJobExecutionTimeout time.Duration

	// Bid strategies config
	JobSelectionPolicy JobSelectionPolicy

	// logging running executions
	LogRunningExecutionsInterval time.Duration

	// How many messages to buffer in the log stream channel
	LogStreamBufferSize int

	FailureInjectionConfig models.FailureInjectionComputeConfig

	BidSemanticStrategy bidstrategy.SemanticBidStrategy
	BidResourceStrategy bidstrategy.ResourceBidStrategy

	ExecutionStore store.ExecutionStore

	LocalPublisher types.LocalPublisher

	// Compute-side control plane settings (legacy config type).
	ControlPlaneSettings legacy_types.ComputeControlPlaneConfig
}

func NewDefaultComputeParam added in v1.3.2

func NewDefaultComputeParam(storagePath string) ComputeConfigParams

type ConfigLabelsProvider added in v1.2.1

// ConfigLabelsProvider returns labels through its GetLabels method. Presumably
// these are the labels configured for the node (cf. NodeConfig.Labels) —
// confirm against its constructor.
type ConfigLabelsProvider struct {
	// contains filtered or unexported fields
}

func (*ConfigLabelsProvider) GetLabels added in v1.2.1

func (p *ConfigLabelsProvider) GetLabels(context.Context) map[string]string

type ExecutorsFactory

type ExecutorsFactory = Factory[executor.Executor]

func NewPluginExecutorFactory added in v1.0.4

func NewPluginExecutorFactory(pluginPath string) ExecutorsFactory

func NewStandardExecutorsFactory

func NewStandardExecutorsFactory(cfg types.EngineConfig) ExecutorsFactory

type ExecutorsFactoryFunc

type ExecutorsFactoryFunc = FactoryFunc[executor.Executor]

type Factory added in v1.2.1

// Factory builds a provider.Provider[P] for the given NodeConfig, returning an
// error if the provider cannot be created. NodeDependencyInjector uses
// Factory instances to supply storages, executors, publishers and
// authenticators to a node.
type Factory[P provider.Providable] interface {
	Get(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error)
}

Factory is the interface used to inject dependencies into the node stack.

type FactoryFunc added in v1.2.1

// FactoryFunc lets an ordinary function with the same signature as Factory.Get
// be used as a Factory; its Get method presumably just calls the function.
type FactoryFunc[P provider.Providable] func(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error)

FactoryFunc adapts an ordinary function to the Factory interface, making it easier to create new implementations.

func (FactoryFunc[P]) Get added in v1.2.1

func (f FactoryFunc[P]) Get(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error)

type FeatureConfig added in v0.3.29

// FeatureConfig lists engine, publisher and storage names. NodeConfig uses it
// as DisabledFeatures, i.e. the components a node should not enable.
type FeatureConfig struct {
	Engines    []string
	Publishers []string
	Storages   []string
}

type JobSelectionPolicy added in v1.0.4

// JobSelectionPolicy describes the rules a compute node uses to decide whether
// to take on an incoming job. NewDefaultJobSelectionPolicy returns the default.
type JobSelectionPolicy struct {
	// this describes if we should run a job based on
	// where the data is located - i.e. if the data is "local"
	// or if the data is "anywhere"
	Locality semantic.JobSelectionDataLocality `json:"locality"`
	// should we reject jobs that don't specify any data
	// the default is "accept"
	RejectStatelessJobs bool `json:"reject_stateless_jobs"`
	// should we accept jobs that specify networking
	// the default is "reject"
	AcceptNetworkedJobs bool `json:"accept_networked_jobs"`
	// external hooks that decide if we should take on the job or not
	// if either of these are given they will override the data locality settings
	// NOTE(review): ProbeHTTP is presumably a URL to query and ProbeExec a
	// command to run — confirm against the bid strategy that consumes them.
	ProbeHTTP string `json:"probe_http,omitempty"`
	ProbeExec string `json:"probe_exec,omitempty"`
}

JobSelectionPolicy describes the rules a compute node uses to decide whether to select an incoming job.

func NewDefaultJobSelectionPolicy added in v1.0.4

func NewDefaultJobSelectionPolicy() JobSelectionPolicy

type NetworkConfig added in v1.2.1

// NetworkConfig holds a node's NATS networking settings. It covers both how
// compute nodes reach requesters and how requester nodes cluster with each
// other. Its Validate method checks it.
type NetworkConfig struct {
	// NATS config for requesters to be reachable by compute nodes
	Host              string
	Port              int
	AdvertisedAddress string
	// NOTE(review): presumably the orchestrator URLs that compute nodes
	// connect to (see AuthSecret below) — confirm.
	Orchestrators     []string

	// Storage directory for NATS features that require it
	StoreDir string

	// AuthSecret is a secret string that clients must use to connect. NATS servers
	// must supply this config, while clients can also supply it as the user part
	// of their Orchestrator URL.
	AuthSecret string

	// NATS config for requester nodes to connect with each other
	ClusterName              string
	ClusterPort              int
	ClusterAdvertisedAddress string

	// When using NATS, never set this value unless you are connecting multiple requester
	// nodes together. This should never reference this current running instance (e.g.
	// don't use localhost).
	ClusterPeers []string
}

func (*NetworkConfig) Validate added in v1.2.1

func (c *NetworkConfig) Validate() error

type Node

// Node bundles a node's API server, its compute and/or requester components,
// and its cleanup manager. It is created by NewNode and started with Start.
// ComputeNode and RequesterNode are presumably nil when the node does not run
// that role — see IsComputeNode / IsRequesterNode.
type Node struct {
	// Visible for testing
	ID             string
	APIServer      *publicapi.Server
	ComputeNode    *Compute
	RequesterNode  *Requester
	CleanupManager *system.CleanupManager
}

func NewNode

func NewNode(
	ctx context.Context,
	bacalhauConfig types.Bacalhau,
	config NodeConfig,
	fsr *repo.FsRepo,
) (*Node, error)

func (*Node) IsComputeNode

func (n *Node) IsComputeNode() bool

IsComputeNode returns true if the node is a compute node

func (*Node) IsRequesterNode

func (n *Node) IsRequesterNode() bool

IsRequesterNode returns true if the node is a requester node

func (*Node) Start

func (n *Node) Start(ctx context.Context) error

type NodeConfig

// NodeConfig holds the configuration used by NewNode to construct a Node. Its
// Validate method checks it.
type NodeConfig struct {
	NodeID                      string
	CleanupManager              *system.CleanupManager
	HostAddress                 string
	APIPort                     uint16
	// NOTE(review): presumably TLS/auto-cert options for the requester API
	// server — confirm against NewNode.
	RequesterAutoCert           string
	RequesterAutoCertCache      string
	RequesterTLSCertificateFile string
	RequesterTLSKeyFile         string
	RequesterSelfSign           bool
	// Engines, publishers and storages that this node should not enable.
	DisabledFeatures            FeatureConfig
	ComputeConfig               ComputeConfig
	RequesterNodeConfig         RequesterConfig
	APIServerConfig             publicapi.Config
	AuthConfig                  types.AuthConfig
	// Which roles (requester and/or compute) this node runs.
	IsRequesterNode             bool
	IsComputeNode               bool
	Labels                      map[string]string
	// Factories used to build storages, executors, publishers and authenticators.
	DependencyInjector          NodeDependencyInjector
	AllowListedLocalPaths       []string
	// NATS networking settings.
	NetworkConfig               NetworkConfig
}

NodeConfig holds the configuration for a node.

func (*NodeConfig) Validate added in v1.2.1

func (c *NodeConfig) Validate() error

type NodeDependencyInjector

// NodeDependencyInjector holds the factories a node uses to lazily create its
// storage providers, executors, publishers and authenticators on demand,
// based on the node's configuration.
type NodeDependencyInjector struct {
	StorageProvidersFactory StorageProvidersFactory
	ExecutorsFactory        ExecutorsFactory
	PublishersFactory       PublishersFactory
	AuthenticatorsFactory   AuthenticatorsFactory
}

NodeDependencyInjector is a lazy dependency injector that generates instances of the node's components on demand, based on the provided configuration.

func NewExecutorPluginNodeDependencyInjector added in v1.0.4

func NewExecutorPluginNodeDependencyInjector(
	cfg types.Bacalhau,
	userKey *baccrypto.UserKey,
	pluginPath string,
) NodeDependencyInjector

func NewStandardNodeDependencyInjector

func NewStandardNodeDependencyInjector(cfg types.Bacalhau, userKey *baccrypto.UserKey) NodeDependencyInjector

type PublishersFactory

type PublishersFactory = Factory[publisher.Publisher]

func NewStandardPublishersFactory

func NewStandardPublishersFactory(cfg types.Bacalhau) PublishersFactory

type PublishersFactoryFunc

type PublishersFactoryFunc = FactoryFunc[publisher.Publisher]

type Requester

// Requester groups the components of a requester (orchestrator) node. It is
// created by NewRequesterNode.
type Requester struct {
	// Visible for testing
	Endpoint *orchestrator.BaseEndpoint
	JobStore jobstore.Store
	// We need a reference to the node info store until libp2p is removed
	NodeInfoStore  routing.NodeInfoStore
	NodeDiscoverer orchestrator.NodeDiscoverer
	// contains filtered or unexported fields
}

func NewRequesterNode

func NewRequesterNode(
	ctx context.Context,
	nodeID string,
	apiServer *publicapi.Server,
	nodeConfig NodeConfig,
	metricsConfig legacy_types.MetricsConfig,
	requesterConfig RequesterConfig,
	transportLayer *nats_transport.NATSTransport,
	computeProxy compute.Endpoint,
	messageSerDeRegistry *ncl.MessageSerDeRegistry,
	fsr *repo.FsRepo,
) (*Requester, error)

type RequesterConfig

// RequesterConfig is the requester-node configuration. It embeds
// RequesterConfigParams, so every parameter is accessible directly on it.
// Build one with NewRequesterConfigWith or NewRequesterConfigWithDefaults.
type RequesterConfig struct {
	RequesterConfigParams
}

func NewRequesterConfigWith

func NewRequesterConfigWith(params RequesterConfigParams) (RequesterConfig, error)

func NewRequesterConfigWithDefaults

func NewRequesterConfigWithDefaults() (RequesterConfig, error)

type RequesterConfigParams

// RequesterConfigParams holds the tunable parameters of a requester node.
// DefaultRequesterConfig and TestRequesterConfig provide ready-made values,
// and NewRequesterConfigWith builds a RequesterConfig from them.
type RequesterConfigParams struct {
	JobDefaults types.JobDefaults

	HousekeepingBackgroundTaskInterval time.Duration
	HousekeepingTimeoutBuffer          time.Duration
	NodeRankRandomnessRange            int
	OverAskForBidsFactor               uint
	JobSelectionPolicy                 JobSelectionPolicy
	ExternalValidatorWebhook           *url.URL
	FailureInjectionConfig             models.FailureInjectionRequesterConfig

	// minimum version of compute nodes that the requester will accept and route jobs to
	MinBacalhauVersion models.BuildVersionInfo

	RetryStrategy orchestrator.RetryStrategy

	// evaluation broker config
	EvalBrokerVisibilityTimeout    time.Duration
	EvalBrokerInitialRetryDelay    time.Duration
	EvalBrokerSubsequentRetryDelay time.Duration
	EvalBrokerMaxRetryCount        int

	// worker config
	WorkerCount                  int
	WorkerEvalDequeueTimeout     time.Duration
	WorkerEvalDequeueBaseBackoff time.Duration
	WorkerEvalDequeueMaxBackoff  time.Duration

	// scheduler config
	SchedulerQueueBackoff      time.Duration
	NodeOverSubscriptionFactor float64

	// Should the orchestrator attempt to translate jobs?
	TranslationEnabled bool

	S3PreSignedURLDisabled   bool
	S3PreSignedURLExpiration time.Duration

	// NOTE(review): presumably an optional pre-built job store; confirm how a
	// nil value is handled by NewRequesterNode.
	JobStore jobstore.Store
	// TODO(review): we don't use this field for anything, maybe we can delete it
	NodeInfoStoreTTL time.Duration

	// DefaultApprovalState is the membership state that newly joining nodes
	// start in. Both DefaultRequesterConfig and TestRequesterConfig use
	// APPROVED. An option to use PENDING is planned for production, or for
	// when operators are ready to control node approval.
	DefaultApprovalState models.NodeMembershipState

	ControlPlaneSettings legacy_types.RequesterControlPlaneConfig
}

type RuntimeLabelsProvider added in v1.2.1

// RuntimeLabelsProvider is a stateless (field-less) label provider whose
// GetLabels method implements models.LabelsProvider. It presumably reports
// labels derived from the runtime environment — confirm.
type RuntimeLabelsProvider struct{}

func (*RuntimeLabelsProvider) GetLabels added in v1.2.1

GetLabels implements models.LabelsProvider.

type StorageProvidersFactory

type StorageProvidersFactory = Factory[storage.Storage]

func NewStandardStorageProvidersFactory

func NewStandardStorageProvidersFactory(cfg types.Bacalhau) StorageProvidersFactory

Standard implementations, used in production and when testing production behavior.

type StorageProvidersFactoryFunc

type StorageProvidersFactoryFunc = FactoryFunc[storage.Storage]

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL