node

package
v1.5.0-dev15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 3, 2024 License: Apache-2.0 Imports: 76 Imported by: 2

Documentation

Index

Constants

View Source
const HeartbeatTopicFormat = "bacalhau.global.compute.%s.out.heartbeat"

Variables

This section is empty.

Functions

func CreateMessageSerDeRegistry added in v1.5.0

func CreateMessageSerDeRegistry() (*ncl.MessageSerDeRegistry, error)

CreateMessageSerDeRegistry creates a new payload registry.

func NewBidder added in v1.3.1

func NewBidder(
	cfg NodeConfig,
	allocatedResources models.Resources,
	publishers publisher.PublisherProvider,
	storages storage.StorageProvider,
	executors executor.ExecutorProvider,
	executionStore store.ExecutionStore,
	computeCallback compute.Callback,
	bufferRunner *compute.ExecutorBuffer,
	calculator capacity.UsageCalculator,
) compute.Bidder

Types

type AuthenticatorsFactory added in v1.2.1

type AuthenticatorsFactory = Factory[authn.Authenticator]

func NewStandardAuthenticatorsFactory added in v1.2.1

func NewStandardAuthenticatorsFactory(userKey *baccrypto.UserKey) AuthenticatorsFactory

type AuthenticatorsFactoryFunc added in v1.2.1

type AuthenticatorsFactoryFunc = FactoryFunc[authn.Authenticator]

type Compute

type Compute struct {
	// Visible for testing
	ID               string
	LocalEndpoint    compute.Endpoint
	Capacity         capacity.Tracker
	ExecutionStore   store.ExecutionStore
	Executors        executor.ExecutorProvider
	Storages         storage.StorageProvider
	Publishers       publisher.PublisherProvider
	Bidder           compute.Bidder
	ManagementClient *compute.ManagementClient
	// contains filtered or unexported fields
}

func NewComputeNode

func NewComputeNode(
	ctx context.Context,
	cfg NodeConfig,
	apiServer *publicapi.Server,
	natsConn *nats.Conn,
	computeCallback compute.Callback,
	managementProxy compute.ManagementEndpoint,
	messageSerDeRegistry *ncl.MessageSerDeRegistry,
) (*Compute, error)

func (*Compute) Cleanup added in v1.2.2

func (c *Compute) Cleanup(ctx context.Context)

type ConfigLabelsProvider added in v1.2.1

type ConfigLabelsProvider struct {
	// contains filtered or unexported fields
}

func (*ConfigLabelsProvider) GetLabels added in v1.2.1

func (p *ConfigLabelsProvider) GetLabels(context.Context) map[string]string

type ExecutorsFactory

type ExecutorsFactory = Factory[executor.Executor]

func NewPluginExecutorFactory added in v1.0.4

func NewPluginExecutorFactory(pluginPath string) ExecutorsFactory

func NewStandardExecutorsFactory

func NewStandardExecutorsFactory(cfg types.EngineConfig) ExecutorsFactory

type ExecutorsFactoryFunc

type ExecutorsFactoryFunc = FactoryFunc[executor.Executor]

type Factory added in v1.2.1

type Factory[P provider.Providable] interface {
	Get(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error)
}

Interfaces to inject dependencies into the stack

type FactoryFunc added in v1.2.1

type FactoryFunc[P provider.Providable] func(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error)

Functions that implement the factories for easier creation of new implementations

func (FactoryFunc[P]) Get added in v1.2.1

func (f FactoryFunc[P]) Get(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error)

type FeatureConfig added in v0.3.29

type FeatureConfig struct {
	Engines    []string
	Publishers []string
	Storages   []string
}

type MetadataStore added in v1.5.0

type MetadataStore interface {
	ReadLastUpdateCheck() (time.Time, error)
	WriteLastUpdateCheck(time.Time) error
	InstanceID() string
}

type Node

type Node struct {
	// Visible for testing
	ID             string
	APIServer      *publicapi.Server
	ComputeNode    *Compute
	RequesterNode  *Requester
	CleanupManager *system.CleanupManager
}

func NewNode

func NewNode(
	ctx context.Context,
	cfg NodeConfig,
	metadataStore MetadataStore,
) (*Node, error)

func (*Node) IsComputeNode

func (n *Node) IsComputeNode() bool

IsComputeNode returns true if the node is a compute node

func (*Node) IsRequesterNode

func (n *Node) IsRequesterNode() bool

IsRequesterNode returns true if the node is a requester node

func (*Node) Start

func (n *Node) Start(ctx context.Context) error

type NodeConfig

type NodeConfig struct {
	NodeID                 string
	CleanupManager         *system.CleanupManager
	BacalhauConfig         types.Bacalhau
	SystemConfig           SystemConfig
	DependencyInjector     NodeDependencyInjector
	FailureInjectionConfig models.FailureInjectionConfig
}

func (*NodeConfig) Validate added in v1.2.1

func (c *NodeConfig) Validate() error

type NodeDependencyInjector

type NodeDependencyInjector struct {
	StorageProvidersFactory StorageProvidersFactory
	ExecutorsFactory        ExecutorsFactory
	PublishersFactory       PublishersFactory
	AuthenticatorsFactory   AuthenticatorsFactory
}

NodeDependencyInjector Lazy node dependency injector that generate instances of different components on demand and based on the configuration provided.

func NewExecutorPluginNodeDependencyInjector added in v1.0.4

func NewExecutorPluginNodeDependencyInjector(
	cfg types.Bacalhau,
	userKey *baccrypto.UserKey,
	pluginPath string,
) NodeDependencyInjector

func NewStandardNodeDependencyInjector

func NewStandardNodeDependencyInjector(cfg types.Bacalhau, userKey *baccrypto.UserKey) NodeDependencyInjector

type PublishersFactory

type PublishersFactory = Factory[publisher.Publisher]

func NewStandardPublishersFactory

func NewStandardPublishersFactory(cfg types.Bacalhau) PublishersFactory

type PublishersFactoryFunc

type PublishersFactoryFunc = FactoryFunc[publisher.Publisher]

type Requester

type Requester struct {
	// Visible for testing
	Endpoint *orchestrator.BaseEndpoint
	JobStore jobstore.Store
	// We need a reference to the node info store until libp2p is removed
	NodeInfoStore  routing.NodeInfoStore
	NodeDiscoverer orchestrator.NodeDiscoverer
	// contains filtered or unexported fields
}

func NewRequesterNode

func NewRequesterNode(
	ctx context.Context,
	cfg NodeConfig,
	apiServer *publicapi.Server,
	transportLayer *nats_transport.NATSTransport,
	computeProxy compute.Endpoint,
	messageSerDeRegistry *ncl.MessageSerDeRegistry,
	metadataStore MetadataStore,
) (*Requester, error)

type RuntimeLabelsProvider added in v1.2.1

type RuntimeLabelsProvider struct{}

func (*RuntimeLabelsProvider) GetLabels added in v1.2.1

GetLabels implements models.LabelsProvider.

type StorageProvidersFactory

type StorageProvidersFactory = Factory[storage.Storage]

func NewStandardStorageProvidersFactory

func NewStandardStorageProvidersFactory(cfg types.Bacalhau) StorageProvidersFactory

Standard implementations used in prod and when testing prod behavior

type StorageProvidersFactoryFunc

type StorageProvidersFactoryFunc = FactoryFunc[storage.Storage]

type SystemConfig added in v1.5.0

type SystemConfig struct {

	// RetryStrategy overrides the orchestrator's retry strategy for testing purposes
	RetryStrategy orchestrator.RetryStrategy

	// OverSubscriptionFactor overrides the orchestrator's over subscription factor for testing purposes
	OverSubscriptionFactor float64

	// NodeRankRandomnessRange overrides the node's rank randomness range for testing purposes
	NodeRankRandomnessRange int

	// BidSemanticStrategy overrides the node's bid semantic strategy for testing purposes
	BidSemanticStrategy bidstrategy.BidStrategy
	// BidResourceStrategy overrides the node's bid resource strategy for testing purposes
	BidResourceStrategy bidstrategy.BidStrategy

	// TODO: remove compute level resource defaults. This should be handled at the orchestrator,
	//  but we still need to validate the behaviour is a job without resource limits land on a compute node
	DefaultComputeJobResourceLimits models.Resources
}

SystemConfig is node configuration that cannot be specified by the user. They are meant for internal use only and to override the node's behaviour for testing purposes

func DefaultSystemConfig added in v1.5.0

func DefaultSystemConfig() SystemConfig

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL