Documentation ¶
Index ¶
- Constants
- Variables
- func GetNodeInfoPublishConfig() routing.NodeInfoPublisherIntervalConfig
- type Compute
- type ComputeConfig
- type ComputeConfigParams
- type ExecutorsFactory
- type ExecutorsFactoryFunc
- type FeatureConfig
- type JobSelectionPolicy
- type Node
- type NodeConfig
- type NodeDependencyInjector
- type PublishersFactory
- type PublishersFactoryFunc
- type Requester
- type RequesterConfig
- type RequesterConfigParams
- type StorageProvidersFactory
- type StorageProvidersFactoryFunc
Constants ¶
View Source
const JobInfoTopic = "bacalhau-job-info"
View Source
const NodeInfoTopic = "bacalhau-node-info"
Variables ¶
View Source
var DefaultComputeConfig = ComputeConfigParams{ PhysicalResourcesProvider: compute_system.NewPhysicalCapacityProvider(), DefaultJobResourceLimits: models.Resources{ CPU: 0.1, Memory: 100 * 1024 * 1024, }, JobNegotiationTimeout: 3 * time.Minute, MinJobExecutionTimeout: 500 * time.Millisecond, MaxJobExecutionTimeout: model.NoJobTimeout, DefaultJobExecutionTimeout: model.NoJobTimeout, LogRunningExecutionsInterval: 10 * time.Second, JobSelectionPolicy: NewDefaultJobSelectionPolicy(), }
View Source
var DefaultNodeInfoPublishConfig = routing.NodeInfoPublisherIntervalConfig{ Interval: 30 * time.Second, EagerPublishInterval: 5 * time.Second, EagerPublishDuration: 30 * time.Second, }
View Source
var DefaultRequesterConfig = RequesterConfigParams{ JobDefaults: transformer.JobDefaults{ ExecutionTimeout: model.NoJobTimeout, }, HousekeepingBackgroundTaskInterval: 30 * time.Second, NodeRankRandomnessRange: 5, OverAskForBidsFactor: 3, MinBacalhauVersion: models.BuildVersionInfo{ Major: "1", Minor: "0", GitVersion: "v1.0.4", }, EvalBrokerVisibilityTimeout: 60 * time.Second, EvalBrokerInitialRetryDelay: 1 * time.Second, EvalBrokerSubsequentRetryDelay: 30 * time.Second, EvalBrokerMaxRetryCount: 10, WorkerCount: runtime.NumCPU(), WorkerEvalDequeueTimeout: 5 * time.Second, WorkerEvalDequeueBaseBackoff: 1 * time.Second, WorkerEvalDequeueMaxBackoff: 30 * time.Second, S3PreSignedURLDisabled: false, S3PreSignedURLExpiration: 30 * time.Minute, }
View Source
var TestNodeInfoPublishConfig = routing.NodeInfoPublisherIntervalConfig{ Interval: 30 * time.Second, EagerPublishInterval: 10 * time.Millisecond, EagerPublishDuration: 5 * time.Second, }
TestNodeInfoPublishConfig speeds up node announcements for tests
View Source
var TestRequesterConfig = RequesterConfigParams{ JobDefaults: transformer.JobDefaults{ ExecutionTimeout: 30 * time.Second, }, HousekeepingBackgroundTaskInterval: 30 * time.Second, NodeRankRandomnessRange: 5, OverAskForBidsFactor: 3, MinBacalhauVersion: models.BuildVersionInfo{ Major: "1", Minor: "0", GitVersion: "v1.0.4", }, EvalBrokerVisibilityTimeout: 5 * time.Second, EvalBrokerInitialRetryDelay: 100 * time.Millisecond, EvalBrokerSubsequentRetryDelay: 100 * time.Millisecond, EvalBrokerMaxRetryCount: 3, WorkerCount: 3, WorkerEvalDequeueTimeout: 200 * time.Millisecond, WorkerEvalDequeueBaseBackoff: 20 * time.Millisecond, WorkerEvalDequeueMaxBackoff: 200 * time.Millisecond, S3PreSignedURLDisabled: false, S3PreSignedURLExpiration: 30 * time.Minute, }
Functions ¶
func GetNodeInfoPublishConfig ¶ added in v1.0.4
func GetNodeInfoPublishConfig() routing.NodeInfoPublisherIntervalConfig
Types ¶
type Compute ¶
type Compute struct { // Visible for testing ID string LocalEndpoint compute.Endpoint Capacity capacity.Tracker ExecutionStore store.ExecutionStore Executors executor.ExecutorProvider Storages storage.StorageProvider LogServer *logstream.LogStreamServer Bidder compute.Bidder // contains filtered or unexported fields }
func NewComputeNode ¶
func NewComputeNode( ctx context.Context, cleanupManager *system.CleanupManager, host host.Host, apiServer *publicapi.Server, config ComputeConfig, storages storage.StorageProvider, executors executor.ExecutorProvider, publishers publisher.PublisherProvider, fsRepo *repo.FsRepo, ) (*Compute, error)
func (*Compute) RegisterLocalComputeCallback ¶
type ComputeConfig ¶
type ComputeConfig struct { // Capacity config TotalResourceLimits models.Resources QueueResourceLimits models.Resources JobResourceLimits models.Resources DefaultJobResourceLimits models.Resources IgnorePhysicalResourceLimits bool // JobNegotiationTimeout default timeout value to hold a bid for a job JobNegotiationTimeout time.Duration // MinJobExecutionTimeout default value for the minimum execution timeout this compute node supports. Jobs with // lower timeout requirements will not be bid on. MinJobExecutionTimeout time.Duration // MaxJobExecutionTimeout default value for the maximum execution timeout this compute node supports. Jobs with // higher timeout requirements will not be bid on. MaxJobExecutionTimeout time.Duration // DefaultJobExecutionTimeout default value for the execution timeout this compute node will assign to jobs with // no timeout requirement defined. DefaultJobExecutionTimeout time.Duration // JobExecutionTimeoutClientIDBypassList is the list of clients that are allowed to bypass the job execution timeout // check. JobExecutionTimeoutClientIDBypassList []string // Bid strategies config JobSelectionPolicy JobSelectionPolicy // logging running executions LogRunningExecutionsInterval time.Duration FailureInjectionConfig model.FailureInjectionComputeConfig BidSemanticStrategy bidstrategy.SemanticBidStrategy BidResourceStrategy bidstrategy.ResourceBidStrategy ExecutionStore store.ExecutionStore }
func NewComputeConfigWith ¶
func NewComputeConfigWith(params ComputeConfigParams) (ComputeConfig, error)
func NewComputeConfigWithDefaults ¶
func NewComputeConfigWithDefaults() (ComputeConfig, error)
type ComputeConfigParams ¶
type ComputeConfigParams struct { // Capacity config TotalResourceLimits models.Resources QueueResourceLimits models.Resources JobResourceLimits models.Resources DefaultJobResourceLimits models.Resources PhysicalResourcesProvider capacity.Provider IgnorePhysicalResourceLimits bool // Timeout config JobNegotiationTimeout time.Duration MinJobExecutionTimeout time.Duration MaxJobExecutionTimeout time.Duration DefaultJobExecutionTimeout time.Duration JobExecutionTimeoutClientIDBypassList []string // Bid strategies config JobSelectionPolicy JobSelectionPolicy // logging running executions LogRunningExecutionsInterval time.Duration FailureInjectionConfig model.FailureInjectionComputeConfig BidSemanticStrategy bidstrategy.SemanticBidStrategy BidResourceStrategy bidstrategy.ResourceBidStrategy }
type ExecutorsFactory ¶
type ExecutorsFactory interface {
Get(ctx context.Context, nodeConfig NodeConfig) (executor.ExecutorProvider, error)
}
func NewPluginExecutorFactory ¶ added in v1.0.4
func NewPluginExecutorFactory() ExecutorsFactory
func NewStandardExecutorsFactory ¶
func NewStandardExecutorsFactory() ExecutorsFactory
type ExecutorsFactoryFunc ¶
type ExecutorsFactoryFunc func( ctx context.Context, nodeConfig NodeConfig, ) (executor.ExecutorProvider, error)
func (ExecutorsFactoryFunc) Get ¶
func (f ExecutorsFactoryFunc) Get( ctx context.Context, nodeConfig NodeConfig, ) (executor.ExecutorProvider, error)
type FeatureConfig ¶ added in v0.3.29
type JobSelectionPolicy ¶ added in v1.0.4
type JobSelectionPolicy struct { // this describes if we should run a job based on // where the data is located - i.e. if the data is "local" // or if the data is "anywhere" Locality semantic.JobSelectionDataLocality `json:"locality"` // should we reject jobs that don't specify any data // the default is "accept" RejectStatelessJobs bool `json:"reject_stateless_jobs"` // should we accept jobs that specify networking // the default is "reject" AcceptNetworkedJobs bool `json:"accept_networked_jobs"` // external hooks that decide if we should take on the job or not // if either of these are given they will override the data locality settings ProbeHTTP string `json:"probe_http,omitempty"` ProbeExec string `json:"probe_exec,omitempty"` }
JobSelectionPolicy describe the rules for how a compute node selects an incoming job
func NewDefaultJobSelectionPolicy ¶ added in v1.0.4
func NewDefaultJobSelectionPolicy() JobSelectionPolicy
type Node ¶
type Node struct { // Visible for testing APIServer *publicapi.Server ComputeNode *Compute RequesterNode *Requester NodeInfoStore routing.NodeInfoStore CleanupManager *system.CleanupManager IPFSClient ipfs.Client Host host.Host }
func (*Node) IsComputeNode ¶
IsComputeNode returns true if the node is a compute node
func (*Node) IsRequesterNode ¶
IsRequesterNode returns true if the node is a requester node
type NodeConfig ¶
type NodeConfig struct { IPFSClient ipfs.Client CleanupManager *system.CleanupManager Host host.Host HostAddress string APIPort uint16 RequesterAutoCert string RequesterAutoCertCache string RequesterTLSCertificateFile string RequesterTLSKeyFile string DisabledFeatures FeatureConfig ComputeConfig ComputeConfig RequesterNodeConfig RequesterConfig APIServerConfig publicapi.Config IsRequesterNode bool IsComputeNode bool Labels map[string]string NodeInfoPublisherInterval routing.NodeInfoPublisherIntervalConfig DependencyInjector NodeDependencyInjector AllowListedLocalPaths []string FsRepo *repo.FsRepo }
Node configuration
type NodeDependencyInjector ¶
type NodeDependencyInjector struct { StorageProvidersFactory StorageProvidersFactory ExecutorsFactory ExecutorsFactory PublishersFactory PublishersFactory }
Lazy node dependency injector that generate instances of different components on demand and based on the configuration provided.
func NewExecutorPluginNodeDependencyInjector ¶ added in v1.0.4
func NewExecutorPluginNodeDependencyInjector() NodeDependencyInjector
func NewStandardNodeDependencyInjector ¶
func NewStandardNodeDependencyInjector() NodeDependencyInjector
type PublishersFactory ¶
type PublishersFactory interface {
Get(ctx context.Context, nodeConfig NodeConfig) (publisher.PublisherProvider, error)
}
func NewStandardPublishersFactory ¶
func NewStandardPublishersFactory() PublishersFactory
type PublishersFactoryFunc ¶
type PublishersFactoryFunc func(ctx context.Context, nodeConfig NodeConfig) (publisher.PublisherProvider, error)
func (PublishersFactoryFunc) Get ¶
func (f PublishersFactoryFunc) Get(ctx context.Context, nodeConfig NodeConfig) (publisher.PublisherProvider, error)
type Requester ¶
type Requester struct { // Visible for testing Endpoint requester.Endpoint JobStore jobstore.Store NodeDiscoverer orchestrator.NodeDiscoverer // contains filtered or unexported fields }
func NewRequesterNode ¶
func NewRequesterNode( ctx context.Context, host host.Host, apiServer *publicapi.Server, requesterConfig RequesterConfig, storageProviders storage.StorageProvider, nodeInfoStore routing.NodeInfoStore, gossipSub *libp2p_pubsub.PubSub, fsRepo *repo.FsRepo, ) (*Requester, error)
func (*Requester) RegisterLocalComputeEndpoint ¶
type RequesterConfig ¶
type RequesterConfig struct {
RequesterConfigParams
}
func NewRequesterConfigWith ¶
func NewRequesterConfigWith(params RequesterConfigParams) (RequesterConfig, error)
func NewRequesterConfigWithDefaults ¶
func NewRequesterConfigWithDefaults() (RequesterConfig, error)
type RequesterConfigParams ¶
type RequesterConfigParams struct { JobDefaults transformer.JobDefaults HousekeepingBackgroundTaskInterval time.Duration NodeRankRandomnessRange int OverAskForBidsFactor uint JobSelectionPolicy JobSelectionPolicy ExternalValidatorWebhook *url.URL FailureInjectionConfig model.FailureInjectionRequesterConfig // minimum version of compute nodes that the requester will accept and route jobs to MinBacalhauVersion models.BuildVersionInfo RetryStrategy orchestrator.RetryStrategy // evaluation broker config EvalBrokerVisibilityTimeout time.Duration EvalBrokerInitialRetryDelay time.Duration EvalBrokerSubsequentRetryDelay time.Duration EvalBrokerMaxRetryCount int // worker config WorkerCount int WorkerEvalDequeueTimeout time.Duration WorkerEvalDequeueBaseBackoff time.Duration WorkerEvalDequeueMaxBackoff time.Duration S3PreSignedURLDisabled bool S3PreSignedURLExpiration time.Duration }
type StorageProvidersFactory ¶
type StorageProvidersFactory interface {
Get(ctx context.Context, nodeConfig NodeConfig) (storage.StorageProvider, error)
}
Interfaces to inject dependencies into the stack
func NewStandardStorageProvidersFactory ¶
func NewStandardStorageProvidersFactory() StorageProvidersFactory
Standard implementations used in prod and when testing prod behavior
type StorageProvidersFactoryFunc ¶
type StorageProvidersFactoryFunc func(ctx context.Context, nodeConfig NodeConfig) (storage.StorageProvider, error)
Functions that implement the factories for easier creation of new implementations
func (StorageProvidersFactoryFunc) Get ¶
func (f StorageProvidersFactoryFunc) Get(ctx context.Context, nodeConfig NodeConfig) (storage.StorageProvider, error)
Click to show internal directories.
Click to hide internal directories.