Documentation ¶
Index ¶
- Variables
- func GetNodeInfoPublishConfig() routing.NodeInfoPublisherIntervalConfig
- type AuthenticatorsFactory
- type AuthenticatorsFactoryFunc
- type Compute
- type ComputeConfig
- type ComputeConfigParams
- type ConfigLabelsProvider
- type ExecutorsFactory
- type ExecutorsFactoryFunc
- type Factory
- type FactoryFunc
- type FeatureConfig
- type JobSelectionPolicy
- type NetworkConfig
- type Node
- type NodeConfig
- type NodeDependencyInjector
- type PublishersFactory
- type PublishersFactoryFunc
- type Requester
- type RequesterConfig
- type RequesterConfigParams
- type RuntimeLabelsProvider
- type StorageProvidersFactory
- type StorageProvidersFactoryFunc
Constants ¶
This section is empty.
Variables ¶
View Source
var DefaultComputeConfig = ComputeConfigParams{ PhysicalResourcesProvider: compute_system.NewPhysicalCapacityProvider(), DefaultJobResourceLimits: models.Resources{ CPU: 0.1, Memory: 100 * 1024 * 1024, }, JobNegotiationTimeout: 3 * time.Minute, MinJobExecutionTimeout: 500 * time.Millisecond, MaxJobExecutionTimeout: model.NoJobTimeout, DefaultJobExecutionTimeout: model.NoJobTimeout, LogRunningExecutionsInterval: 10 * time.Second, JobSelectionPolicy: NewDefaultJobSelectionPolicy(), LocalPublisher: types.LocalPublisherConfig{ Directory: path.Join(config.GetStoragePath(), "bacalhau-local-publisher"), }, }
View Source
var DefaultNodeInfoPublishConfig = routing.NodeInfoPublisherIntervalConfig{ Interval: 30 * time.Second, EagerPublishInterval: 5 * time.Second, EagerPublishDuration: 30 * time.Second, }
View Source
var DefaultRequesterConfig = RequesterConfigParams{ JobDefaults: transformer.JobDefaults{ ExecutionTimeout: model.NoJobTimeout, }, HousekeepingBackgroundTaskInterval: 30 * time.Second, NodeRankRandomnessRange: 5, OverAskForBidsFactor: 3, MinBacalhauVersion: models.BuildVersionInfo{ Major: "1", Minor: "0", GitVersion: "v1.0.4", }, EvalBrokerVisibilityTimeout: 60 * time.Second, EvalBrokerInitialRetryDelay: 1 * time.Second, EvalBrokerSubsequentRetryDelay: 30 * time.Second, EvalBrokerMaxRetryCount: 10, WorkerCount: runtime.NumCPU(), WorkerEvalDequeueTimeout: 5 * time.Second, WorkerEvalDequeueBaseBackoff: 1 * time.Second, WorkerEvalDequeueMaxBackoff: 30 * time.Second, S3PreSignedURLDisabled: false, S3PreSignedURLExpiration: 30 * time.Minute, TranslationEnabled: false, }
View Source
var TestNodeInfoPublishConfig = routing.NodeInfoPublisherIntervalConfig{ Interval: 30 * time.Second, EagerPublishInterval: 10 * time.Millisecond, EagerPublishDuration: 5 * time.Second, }
TestNodeInfoPublishConfig speeds up node announcements for tests
View Source
var TestRequesterConfig = RequesterConfigParams{ JobDefaults: transformer.JobDefaults{ ExecutionTimeout: 30 * time.Second, }, HousekeepingBackgroundTaskInterval: 30 * time.Second, NodeRankRandomnessRange: 5, OverAskForBidsFactor: 3, MinBacalhauVersion: models.BuildVersionInfo{ Major: "1", Minor: "0", GitVersion: "v1.0.4", }, EvalBrokerVisibilityTimeout: 5 * time.Second, EvalBrokerInitialRetryDelay: 100 * time.Millisecond, EvalBrokerSubsequentRetryDelay: 100 * time.Millisecond, EvalBrokerMaxRetryCount: 3, WorkerCount: 3, WorkerEvalDequeueTimeout: 200 * time.Millisecond, WorkerEvalDequeueBaseBackoff: 20 * time.Millisecond, WorkerEvalDequeueMaxBackoff: 200 * time.Millisecond, TranslationEnabled: false, S3PreSignedURLDisabled: false, S3PreSignedURLExpiration: 30 * time.Minute, }
Functions ¶
func GetNodeInfoPublishConfig ¶ added in v1.0.4
func GetNodeInfoPublishConfig() routing.NodeInfoPublisherIntervalConfig
Types ¶
type AuthenticatorsFactory ¶ added in v1.2.1
type AuthenticatorsFactory = Factory[authn.Authenticator]
func NewStandardAuthenticatorsFactory ¶ added in v1.2.1
func NewStandardAuthenticatorsFactory() AuthenticatorsFactory
type AuthenticatorsFactoryFunc ¶ added in v1.2.1
type AuthenticatorsFactoryFunc = FactoryFunc[authn.Authenticator]
type Compute ¶
type Compute struct { // Visible for testing ID string LocalEndpoint compute.Endpoint Capacity capacity.Tracker ExecutionStore store.ExecutionStore Executors executor.ExecutorProvider Storages storage.StorageProvider Bidder compute.Bidder // contains filtered or unexported fields }
func NewComputeNode ¶
func NewComputeNode( ctx context.Context, nodeID string, cleanupManager *system.CleanupManager, apiServer *publicapi.Server, config ComputeConfig, storagePath string, storages storage.StorageProvider, executors executor.ExecutorProvider, publishers publisher.PublisherProvider, computeCallback compute.Callback, ) (*Compute, error)
type ComputeConfig ¶
type ComputeConfig struct { // Capacity config TotalResourceLimits models.Resources QueueResourceLimits models.Resources JobResourceLimits models.Resources DefaultJobResourceLimits models.Resources IgnorePhysicalResourceLimits bool // JobNegotiationTimeout default timeout value to hold a bid for a job JobNegotiationTimeout time.Duration // MinJobExecutionTimeout default value for the minimum execution timeout this compute node supports. Jobs with // lower timeout requirements will not be bid on. MinJobExecutionTimeout time.Duration // MaxJobExecutionTimeout default value for the maximum execution timeout this compute node supports. Jobs with // higher timeout requirements will not be bid on. MaxJobExecutionTimeout time.Duration // DefaultJobExecutionTimeout default value for the execution timeout this compute node will assign to jobs with // no timeout requirement defined. DefaultJobExecutionTimeout time.Duration // JobExecutionTimeoutClientIDBypassList is the list of clients that are allowed to bypass the job execution timeout // check. JobExecutionTimeoutClientIDBypassList []string // Bid strategies config JobSelectionPolicy JobSelectionPolicy // logging running executions LogRunningExecutionsInterval time.Duration // How many messages to buffer in the log stream channel LogStreamBufferSize int FailureInjectionConfig model.FailureInjectionComputeConfig BidSemanticStrategy bidstrategy.SemanticBidStrategy BidResourceStrategy bidstrategy.ResourceBidStrategy ExecutionStore store.ExecutionStore LocalPublisher types.LocalPublisherConfig }
func NewComputeConfigWith ¶
func NewComputeConfigWith(params ComputeConfigParams) (ComputeConfig, error)
func NewComputeConfigWithDefaults ¶
func NewComputeConfigWithDefaults() (ComputeConfig, error)
type ComputeConfigParams ¶
type ComputeConfigParams struct { // Capacity config TotalResourceLimits models.Resources QueueResourceLimits models.Resources JobResourceLimits models.Resources DefaultJobResourceLimits models.Resources PhysicalResourcesProvider capacity.Provider IgnorePhysicalResourceLimits bool // Timeout config JobNegotiationTimeout time.Duration MinJobExecutionTimeout time.Duration MaxJobExecutionTimeout time.Duration DefaultJobExecutionTimeout time.Duration JobExecutionTimeoutClientIDBypassList []string // Bid strategies config JobSelectionPolicy JobSelectionPolicy // logging running executions LogRunningExecutionsInterval time.Duration // How many messages to buffer in the log stream channel LogStreamBufferSize int FailureInjectionConfig model.FailureInjectionComputeConfig BidSemanticStrategy bidstrategy.SemanticBidStrategy BidResourceStrategy bidstrategy.ResourceBidStrategy ExecutionStore store.ExecutionStore LocalPublisher types.LocalPublisherConfig }
type ConfigLabelsProvider ¶ added in v1.2.1
type ConfigLabelsProvider struct {
// contains filtered or unexported fields
}
type ExecutorsFactory ¶
func NewPluginExecutorFactory ¶ added in v1.0.4
func NewPluginExecutorFactory() ExecutorsFactory
func NewStandardExecutorsFactory ¶
func NewStandardExecutorsFactory() ExecutorsFactory
type ExecutorsFactoryFunc ¶
type ExecutorsFactoryFunc = FactoryFunc[executor.Executor]
type Factory ¶ added in v1.2.1
type Factory[P provider.Providable] interface { Get(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error) }
Interfaces to inject dependencies into the stack
type FactoryFunc ¶ added in v1.2.1
type FactoryFunc[P provider.Providable] func(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error)
Functions that implement the factories for easier creation of new implementations
func (FactoryFunc[P]) Get ¶ added in v1.2.1
func (f FactoryFunc[P]) Get(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error)
type FeatureConfig ¶ added in v0.3.29
type JobSelectionPolicy ¶ added in v1.0.4
type JobSelectionPolicy struct { // this describes if we should run a job based on // where the data is located - i.e. if the data is "local" // or if the data is "anywhere" Locality semantic.JobSelectionDataLocality `json:"locality"` // should we reject jobs that don't specify any data // the default is "accept" RejectStatelessJobs bool `json:"reject_stateless_jobs"` // should we accept jobs that specify networking // the default is "reject" AcceptNetworkedJobs bool `json:"accept_networked_jobs"` // external hooks that decide if we should take on the job or not // if either of these are given they will override the data locality settings ProbeHTTP string `json:"probe_http,omitempty"` ProbeExec string `json:"probe_exec,omitempty"` }
JobSelectionPolicy describe the rules for how a compute node selects an incoming job
func NewDefaultJobSelectionPolicy ¶ added in v1.0.4
func NewDefaultJobSelectionPolicy() JobSelectionPolicy
type NetworkConfig ¶ added in v1.2.1
type NetworkConfig struct { Type string Libp2pHost host.Host // only set if using libp2p transport, nil otherwise ReconnectDelay time.Duration // NATS config for requesters to be reachable by compute nodes Port int AdvertisedAddress string Orchestrators []string // Storage directory for NATS features that require it StoreDir string // AuthSecret is a secret string that clients must use to connect. It is // only used by NATS servers; clients should supply the auth secret as the // user part of their Orchestrator URL. AuthSecret string // NATS config for requester nodes to connect with each other ClusterName string ClusterPort int ClusterAdvertisedAddress string ClusterPeers []string }
func (*NetworkConfig) Validate ¶ added in v1.2.1
func (c *NetworkConfig) Validate() error
type Node ¶
type Node struct { // Visible for testing ID string APIServer *publicapi.Server ComputeNode *Compute RequesterNode *Requester CleanupManager *system.CleanupManager IPFSClient ipfs.Client Libp2pHost host.Host // only set if using libp2p transport, nil otherwise }
func (*Node) IsComputeNode ¶
IsComputeNode returns true if the node is a compute node
func (*Node) IsRequesterNode ¶
IsRequesterNode returns true if the node is a requester node
type NodeConfig ¶
type NodeConfig struct { NodeID string IPFSClient ipfs.Client CleanupManager *system.CleanupManager HostAddress string APIPort uint16 RequesterAutoCert string RequesterAutoCertCache string RequesterTLSCertificateFile string RequesterTLSKeyFile string DisabledFeatures FeatureConfig ComputeConfig ComputeConfig RequesterNodeConfig RequesterConfig APIServerConfig publicapi.Config AuthConfig types.AuthConfig IsRequesterNode bool IsComputeNode bool Labels map[string]string NodeInfoPublisherInterval routing.NodeInfoPublisherIntervalConfig DependencyInjector NodeDependencyInjector AllowListedLocalPaths []string NodeInfoStoreTTL time.Duration NetworkConfig NetworkConfig }
Node configuration
func (*NodeConfig) Validate ¶ added in v1.2.1
func (c *NodeConfig) Validate() error
type NodeDependencyInjector ¶
type NodeDependencyInjector struct { StorageProvidersFactory StorageProvidersFactory ExecutorsFactory ExecutorsFactory PublishersFactory PublishersFactory AuthenticatorsFactory AuthenticatorsFactory }
Lazy node dependency injector that generate instances of different components on demand and based on the configuration provided.
func NewExecutorPluginNodeDependencyInjector ¶ added in v1.0.4
func NewExecutorPluginNodeDependencyInjector() NodeDependencyInjector
func NewStandardNodeDependencyInjector ¶
func NewStandardNodeDependencyInjector() NodeDependencyInjector
type PublishersFactory ¶
func NewStandardPublishersFactory ¶
func NewStandardPublishersFactory() PublishersFactory
type PublishersFactoryFunc ¶
type PublishersFactoryFunc = FactoryFunc[publisher.Publisher]
type Requester ¶
type Requester struct { // Visible for testing Endpoint requester.Endpoint EndpointV2 *orchestrator.BaseEndpoint JobStore jobstore.Store NodeInfoStore routing.NodeInfoStore NodeDiscoverer orchestrator.NodeDiscoverer // contains filtered or unexported fields }
func NewRequesterNode ¶
type RequesterConfig ¶
type RequesterConfig struct {
RequesterConfigParams
}
func NewRequesterConfigWith ¶
func NewRequesterConfigWith(params RequesterConfigParams) (RequesterConfig, error)
func NewRequesterConfigWithDefaults ¶
func NewRequesterConfigWithDefaults() (RequesterConfig, error)
type RequesterConfigParams ¶
type RequesterConfigParams struct { JobDefaults transformer.JobDefaults HousekeepingBackgroundTaskInterval time.Duration NodeRankRandomnessRange int OverAskForBidsFactor uint JobSelectionPolicy JobSelectionPolicy ExternalValidatorWebhook *url.URL FailureInjectionConfig model.FailureInjectionRequesterConfig // minimum version of compute nodes that the requester will accept and route jobs to MinBacalhauVersion models.BuildVersionInfo RetryStrategy orchestrator.RetryStrategy // evaluation broker config EvalBrokerVisibilityTimeout time.Duration EvalBrokerInitialRetryDelay time.Duration EvalBrokerSubsequentRetryDelay time.Duration EvalBrokerMaxRetryCount int // worker config WorkerCount int WorkerEvalDequeueTimeout time.Duration WorkerEvalDequeueBaseBackoff time.Duration WorkerEvalDequeueMaxBackoff time.Duration // Should the orchestrator attempt to translate jobs? TranslationEnabled bool S3PreSignedURLDisabled bool S3PreSignedURLExpiration time.Duration JobStore jobstore.Store DefaultPublisher string }
type RuntimeLabelsProvider ¶ added in v1.2.1
type RuntimeLabelsProvider struct{}
type StorageProvidersFactory ¶
func NewStandardStorageProvidersFactory ¶
func NewStandardStorageProvidersFactory() StorageProvidersFactory
Standard implementations used in prod and when testing prod behavior
type StorageProvidersFactoryFunc ¶
type StorageProvidersFactoryFunc = FactoryFunc[storage.Storage]
Source Files ¶
Click to show internal directories.
Click to hide internal directories.