Documentation ¶
Index ¶
- Constants
- Variables
- func CreateMessageSerDeRegistry() (*ncl.MessageSerDeRegistry, error)
- func NewBidder(config ComputeConfig, publishers publisher.PublisherProvider, ...) compute.Bidder
- type AuthenticatorsFactory
- type AuthenticatorsFactoryFunc
- type Compute
- type ComputeConfig
- type ComputeConfigParams
- type ConfigLabelsProvider
- type ExecutorsFactory
- type ExecutorsFactoryFunc
- type Factory
- type FactoryFunc
- type FeatureConfig
- type JobSelectionPolicy
- type NetworkConfig
- type Node
- type NodeConfig
- type NodeDependencyInjector
- type PublishersFactory
- type PublishersFactoryFunc
- type Requester
- type RequesterConfig
- type RequesterConfigParams
- type RuntimeLabelsProvider
- type StorageProvidersFactory
- type StorageProvidersFactoryFunc
Constants ¶
View Source
const HeartbeatTopicFormat = "bacalhau.global.compute.%s.out.heartbeat"
Variables ¶
View Source
var DefaultRequesterConfig = RequesterConfigParams{ HousekeepingBackgroundTaskInterval: 30 * time.Second, HousekeepingTimeoutBuffer: 2 * time.Minute, NodeRankRandomnessRange: 5, OverAskForBidsFactor: 3, MinBacalhauVersion: models.BuildVersionInfo{ Major: "1", Minor: "0", GitVersion: "v1.0.4", }, EvalBrokerVisibilityTimeout: 60 * time.Second, EvalBrokerInitialRetryDelay: 1 * time.Second, EvalBrokerSubsequentRetryDelay: 30 * time.Second, EvalBrokerMaxRetryCount: 10, WorkerCount: runtime.NumCPU(), WorkerEvalDequeueTimeout: 5 * time.Second, WorkerEvalDequeueBaseBackoff: 1 * time.Second, WorkerEvalDequeueMaxBackoff: 30 * time.Second, NodeOverSubscriptionFactor: 1.5, S3PreSignedURLDisabled: false, S3PreSignedURLExpiration: 30 * time.Minute, TranslationEnabled: false, ControlPlaneSettings: legacy_types.RequesterControlPlaneConfig{ HeartbeatCheckFrequency: legacy_types.Duration(30 * time.Second), NodeDisconnectedAfter: legacy_types.Duration(30 * time.Second), }, NodeInfoStoreTTL: 10 * time.Minute, DefaultApprovalState: models.NodeMembership.APPROVED, }
View Source
var TestRequesterConfig = RequesterConfigParams{ HousekeepingBackgroundTaskInterval: 30 * time.Second, HousekeepingTimeoutBuffer: 100 * time.Millisecond, NodeRankRandomnessRange: 5, OverAskForBidsFactor: 3, MinBacalhauVersion: models.BuildVersionInfo{ Major: "1", Minor: "0", GitVersion: "v1.0.4", }, EvalBrokerVisibilityTimeout: 5 * time.Second, EvalBrokerInitialRetryDelay: 100 * time.Millisecond, EvalBrokerSubsequentRetryDelay: 100 * time.Millisecond, EvalBrokerMaxRetryCount: 3, WorkerCount: 3, WorkerEvalDequeueTimeout: 200 * time.Millisecond, WorkerEvalDequeueBaseBackoff: 20 * time.Millisecond, WorkerEvalDequeueMaxBackoff: 200 * time.Millisecond, NodeOverSubscriptionFactor: 1.5, TranslationEnabled: false, S3PreSignedURLDisabled: false, S3PreSignedURLExpiration: 30 * time.Minute, ControlPlaneSettings: legacy_types.RequesterControlPlaneConfig{ HeartbeatCheckFrequency: legacy_types.Duration(30 * time.Second), NodeDisconnectedAfter: legacy_types.Duration(30 * time.Second), }, DefaultApprovalState: models.NodeMembership.APPROVED, }
Functions ¶
func CreateMessageSerDeRegistry ¶ added in v1.5.0
func CreateMessageSerDeRegistry() (*ncl.MessageSerDeRegistry, error)
CreateMessageSerDeRegistry creates a new payload registry.
func NewBidder ¶ added in v1.3.1
func NewBidder( config ComputeConfig, publishers publisher.PublisherProvider, storages storage.StorageProvider, executors executor.ExecutorProvider, runningCapacityTracker capacity.Tracker, nodeID string, executionStore store.ExecutionStore, computeCallback compute.Callback, bufferRunner *compute.ExecutorBuffer, apiServer *publicapi.Server, calculator capacity.UsageCalculator, ) compute.Bidder
Types ¶
type AuthenticatorsFactory ¶ added in v1.2.1
type AuthenticatorsFactory = Factory[authn.Authenticator]
func NewStandardAuthenticatorsFactory ¶ added in v1.2.1
func NewStandardAuthenticatorsFactory(userKey *baccrypto.UserKey) AuthenticatorsFactory
type AuthenticatorsFactoryFunc ¶ added in v1.2.1
type AuthenticatorsFactoryFunc = FactoryFunc[authn.Authenticator]
type Compute ¶
type Compute struct { // Visible for testing ID string LocalEndpoint compute.Endpoint Capacity capacity.Tracker ExecutionStore store.ExecutionStore Executors executor.ExecutorProvider Storages storage.StorageProvider Publishers publisher.PublisherProvider Bidder compute.Bidder ManagementClient *compute.ManagementClient // contains filtered or unexported fields }
func NewComputeNode ¶
func NewComputeNode( ctx context.Context, nodeID string, apiServer *publicapi.Server, cfg types.Bacalhau, config ComputeConfig, storages storage.StorageProvider, executors executor.ExecutorProvider, publishers publisher.PublisherProvider, natsConn *nats.Conn, computeCallback compute.Callback, managementProxy compute.ManagementEndpoint, configuredLabels map[string]string, messageSerDeRegistry *ncl.MessageSerDeRegistry, ) (*Compute, error)
type ComputeConfig ¶
type ComputeConfig struct { // Capacity config TotalResourceLimits models.Resources JobResourceLimits models.Resources DefaultJobResourceLimits models.Resources IgnorePhysicalResourceLimits bool // JobNegotiationTimeout default timeout value to hold a bid for a job JobNegotiationTimeout time.Duration // MinJobExecutionTimeout default value for the minimum execution timeout this compute node supports. Jobs with // lower timeout requirements will not be bid on. MinJobExecutionTimeout time.Duration // MaxJobExecutionTimeout default value for the maximum execution timeout this compute node supports. Jobs with // higher timeout requirements will not be bid on. MaxJobExecutionTimeout time.Duration // DefaultJobExecutionTimeout default value for the execution timeout this compute node will assign to jobs with // no timeout requirement defined. DefaultJobExecutionTimeout time.Duration // Bid strategies config JobSelectionPolicy JobSelectionPolicy // logging running executions LogRunningExecutionsInterval time.Duration // How many messages to buffer in the log stream channel LogStreamBufferSize int FailureInjectionConfig models.FailureInjectionComputeConfig BidSemanticStrategy bidstrategy.SemanticBidStrategy BidResourceStrategy bidstrategy.ResourceBidStrategy ExecutionStore store.ExecutionStore LocalPublisher types.LocalPublisher ControlPlaneSettings legacy_types.ComputeControlPlaneConfig }
func NewComputeConfigWith ¶
func NewComputeConfigWith(executionDir string, params ComputeConfigParams) (ComputeConfig, error)
func NewComputeConfigWithDefaults ¶
func NewComputeConfigWithDefaults(executionDir string) (ComputeConfig, error)
func (*ComputeConfig) Validate ¶
func (c *ComputeConfig) Validate() error
type ComputeConfigParams ¶
type ComputeConfigParams struct { // Capacity config TotalResourceLimits models.Resources JobResourceLimits models.Resources DefaultJobResourceLimits models.Resources PhysicalResourcesProvider capacity.Provider // Deprecated: this feature is no longer supported, but still used in several tests and thus preserved IgnorePhysicalResourceLimits bool // Timeout config JobNegotiationTimeout time.Duration MinJobExecutionTimeout time.Duration MaxJobExecutionTimeout time.Duration DefaultJobExecutionTimeout time.Duration // Bid strategies config JobSelectionPolicy JobSelectionPolicy // logging running executions LogRunningExecutionsInterval time.Duration // How many messages to buffer in the log stream channel LogStreamBufferSize int FailureInjectionConfig models.FailureInjectionComputeConfig BidSemanticStrategy bidstrategy.SemanticBidStrategy BidResourceStrategy bidstrategy.ResourceBidStrategy ExecutionStore store.ExecutionStore LocalPublisher types.LocalPublisher ControlPlaneSettings legacy_types.ComputeControlPlaneConfig }
func NewDefaultComputeParam ¶ added in v1.3.2
func NewDefaultComputeParam(storagePath string) ComputeConfigParams
type ConfigLabelsProvider ¶ added in v1.2.1
type ConfigLabelsProvider struct {
// contains filtered or unexported fields
}
type ExecutorsFactory ¶
func NewPluginExecutorFactory ¶ added in v1.0.4
func NewPluginExecutorFactory(pluginPath string) ExecutorsFactory
func NewStandardExecutorsFactory ¶
func NewStandardExecutorsFactory(cfg types.EngineConfig) ExecutorsFactory
type ExecutorsFactoryFunc ¶
type ExecutorsFactoryFunc = FactoryFunc[executor.Executor]
type Factory ¶ added in v1.2.1
type Factory[P provider.Providable] interface { Get(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error) }
Interfaces to inject dependencies into the stack
type FactoryFunc ¶ added in v1.2.1
type FactoryFunc[P provider.Providable] func(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error)
Functions that implement the factories for easier creation of new implementations
func (FactoryFunc[P]) Get ¶ added in v1.2.1
func (f FactoryFunc[P]) Get(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error)
type FeatureConfig ¶ added in v0.3.29
type JobSelectionPolicy ¶ added in v1.0.4
type JobSelectionPolicy struct { // this describes if we should run a job based on // where the data is located - i.e. if the data is "local" // or if the data is "anywhere" Locality semantic.JobSelectionDataLocality `json:"locality"` // should we reject jobs that don't specify any data // the default is "accept" RejectStatelessJobs bool `json:"reject_stateless_jobs"` // should we accept jobs that specify networking // the default is "reject" AcceptNetworkedJobs bool `json:"accept_networked_jobs"` // external hooks that decide if we should take on the job or not // if either of these are given they will override the data locality settings ProbeHTTP string `json:"probe_http,omitempty"` ProbeExec string `json:"probe_exec,omitempty"` }
JobSelectionPolicy describe the rules for how a compute node selects an incoming job
func NewDefaultJobSelectionPolicy ¶ added in v1.0.4
func NewDefaultJobSelectionPolicy() JobSelectionPolicy
type NetworkConfig ¶ added in v1.2.1
type NetworkConfig struct { // NATS config for requesters to be reachable by compute nodes Port int AdvertisedAddress string Orchestrators []string // Storage directory for NATS features that require it StoreDir string // AuthSecret is a secret string that clients must use to connect. NATS servers // must supply this config, while clients can also supply it as the user part // of their Orchestrator URL. AuthSecret string // NATS config for requester nodes to connect with each other ClusterName string ClusterPort int ClusterAdvertisedAddress string // When using NATS, never set this value unless you are connecting multiple requester // nodes together. This should never reference this current running instance (e.g. // don't use localhost). ClusterPeers []string }
func (*NetworkConfig) Validate ¶ added in v1.2.1
func (c *NetworkConfig) Validate() error
type Node ¶
type Node struct { // Visible for testing ID string APIServer *publicapi.Server ComputeNode *Compute RequesterNode *Requester CleanupManager *system.CleanupManager }
func (*Node) IsComputeNode ¶
IsComputeNode returns true if the node is a compute node
func (*Node) IsRequesterNode ¶
IsRequesterNode returns true if the node is a requester node
type NodeConfig ¶
type NodeConfig struct { NodeID string CleanupManager *system.CleanupManager HostAddress string APIPort uint16 RequesterAutoCert string RequesterAutoCertCache string RequesterTLSCertificateFile string RequesterTLSKeyFile string RequesterSelfSign bool DisabledFeatures FeatureConfig ComputeConfig ComputeConfig RequesterNodeConfig RequesterConfig APIServerConfig publicapi.Config AuthConfig types.AuthConfig IsRequesterNode bool IsComputeNode bool Labels map[string]string DependencyInjector NodeDependencyInjector AllowListedLocalPaths []string NetworkConfig NetworkConfig }
Node configuration
func (*NodeConfig) Validate ¶ added in v1.2.1
func (c *NodeConfig) Validate() error
type NodeDependencyInjector ¶
type NodeDependencyInjector struct { StorageProvidersFactory StorageProvidersFactory ExecutorsFactory ExecutorsFactory PublishersFactory PublishersFactory AuthenticatorsFactory AuthenticatorsFactory }
Lazy node dependency injector that generate instances of different components on demand and based on the configuration provided.
func NewExecutorPluginNodeDependencyInjector ¶ added in v1.0.4
func NewStandardNodeDependencyInjector ¶
func NewStandardNodeDependencyInjector(cfg types.Bacalhau, userKey *baccrypto.UserKey) NodeDependencyInjector
type PublishersFactory ¶
func NewStandardPublishersFactory ¶
func NewStandardPublishersFactory(cfg types.Bacalhau) PublishersFactory
type PublishersFactoryFunc ¶
type PublishersFactoryFunc = FactoryFunc[publisher.Publisher]
type Requester ¶
type Requester struct { // Visible for testing Endpoint *orchestrator.BaseEndpoint JobStore jobstore.Store // We need a reference to the node info store until libp2p is removed NodeInfoStore routing.NodeInfoStore NodeDiscoverer orchestrator.NodeDiscoverer // contains filtered or unexported fields }
func NewRequesterNode ¶
func NewRequesterNode( ctx context.Context, nodeID string, apiServer *publicapi.Server, nodeConfig NodeConfig, metricsConfig legacy_types.MetricsConfig, requesterConfig RequesterConfig, transportLayer *nats_transport.NATSTransport, computeProxy compute.Endpoint, messageSerDeRegistry *ncl.MessageSerDeRegistry, fsr *repo.FsRepo, ) (*Requester, error)
type RequesterConfig ¶
type RequesterConfig struct {
RequesterConfigParams
}
func NewRequesterConfigWith ¶
func NewRequesterConfigWith(params RequesterConfigParams) (RequesterConfig, error)
func NewRequesterConfigWithDefaults ¶
func NewRequesterConfigWithDefaults() (RequesterConfig, error)
type RequesterConfigParams ¶
type RequesterConfigParams struct { JobDefaults types.JobDefaults HousekeepingBackgroundTaskInterval time.Duration HousekeepingTimeoutBuffer time.Duration NodeRankRandomnessRange int OverAskForBidsFactor uint JobSelectionPolicy JobSelectionPolicy ExternalValidatorWebhook *url.URL FailureInjectionConfig models.FailureInjectionRequesterConfig // minimum version of compute nodes that the requester will accept and route jobs to MinBacalhauVersion models.BuildVersionInfo RetryStrategy orchestrator.RetryStrategy // evaluation broker config EvalBrokerVisibilityTimeout time.Duration EvalBrokerInitialRetryDelay time.Duration EvalBrokerSubsequentRetryDelay time.Duration EvalBrokerMaxRetryCount int // worker config WorkerCount int WorkerEvalDequeueTimeout time.Duration WorkerEvalDequeueBaseBackoff time.Duration WorkerEvalDequeueMaxBackoff time.Duration // scheduler config SchedulerQueueBackoff time.Duration NodeOverSubscriptionFactor float64 // Should the orchestrator attempt to translate jobs? TranslationEnabled bool S3PreSignedURLDisabled bool S3PreSignedURLExpiration time.Duration JobStore jobstore.Store // TODO(review): we don't use this field for anything, maybe we can delete it NodeInfoStoreTTL time.Duration // When new nodes join the cluster, what state do they have? By default, APPROVED, and // for tests, APPROVED. We will provide an option to set this to PENDING for production // or for when operators are ready to control node approval. DefaultApprovalState models.NodeMembershipState ControlPlaneSettings legacy_types.RequesterControlPlaneConfig }
type RuntimeLabelsProvider ¶ added in v1.2.1
type RuntimeLabelsProvider struct{}
type StorageProvidersFactory ¶
func NewStandardStorageProvidersFactory ¶
func NewStandardStorageProvidersFactory(cfg types.Bacalhau) StorageProvidersFactory
Standard implementations used in prod and when testing prod behavior
type StorageProvidersFactoryFunc ¶
type StorageProvidersFactoryFunc = FactoryFunc[storage.Storage]
Source Files ¶
Click to show internal directories.
Click to hide internal directories.