Documentation ¶
Index ¶
- Constants
- Variables
- type Compute
- type ComputeConfig
- type ComputeConfigParams
- type ExecutorsFactory
- type ExecutorsFactoryFunc
- type Node
- type NodeConfig
- type NodeDependencyInjector
- type PublishersFactory
- type PublishersFactoryFunc
- type Requester
- type RequesterConfig
- type RequesterConfigParams
- type StandardExecutorsFactory
- type StandardPublishersFactory
- type StandardStorageProvidersFactory
- type StandardVerifiersFactory
- type StorageProvidersFactory
- type StorageProvidersFactoryFunc
- type VerifiersFactory
- type VerifiersFactoryFunc
Constants ¶
View Source
const DefaultNodeInfoPublisherInterval = 30 * time.Second
View Source
const JobEventsTopic = "bacalhau-job-events"
View Source
const NodeInfoTopic = "bacalhau-node-info"
Variables ¶
View Source
var DefaultComputeConfig = ComputeConfigParams{ PhysicalResourcesProvider: system.NewPhysicalCapacityProvider(), DefaultJobResourceLimits: model.ResourceUsageData{ CPU: 0.1, Memory: 100 * 1024 * 1024, }, ExecutorBufferBackoffDuration: 50 * time.Millisecond, JobNegotiationTimeout: 3 * time.Minute, MinJobExecutionTimeout: 500 * time.Millisecond, MaxJobExecutionTimeout: 60 * time.Minute, DefaultJobExecutionTimeout: 10 * time.Minute, LogRunningExecutionsInterval: 10 * time.Second, }
View Source
var DefaultRequesterConfig = RequesterConfigParams{ MinJobExecutionTimeout: 0 * time.Second, DefaultJobExecutionTimeout: 30 * time.Minute, HousekeepingBackgroundTaskInterval: 30 * time.Second, NodeRankRandomnessRange: 10, MinBacalhauVersion: model.BuildVersionInfo{ Major: "0", Minor: "3", GitVersion: "v0.3.20", }, }
Functions ¶
This section is empty.
Types ¶
type Compute ¶
type Compute struct {
	// Visible for testing
	LocalEndpoint compute.Endpoint
	Capacity capacity.Tracker
	ExecutionStore store.ExecutionStore
	Executors executor.ExecutorProvider
	// contains filtered or unexported fields
}
func NewComputeNode ¶
func NewComputeNode( ctx context.Context, cleanupManager *system.CleanupManager, host host.Host, apiServer *publicapi.APIServer, config ComputeConfig, simulatorNodeID string, simulatorRequestHandler *simulator.RequestHandler, storages storage.StorageProvider, executors executor.ExecutorProvider, verifiers verifier.VerifierProvider, publishers publisher.PublisherProvider) (*Compute, error)
func (*Compute) RegisterLocalComputeCallback ¶
type ComputeConfig ¶
type ComputeConfig struct {
	// Capacity config
	TotalResourceLimits model.ResourceUsageData
	QueueResourceLimits model.ResourceUsageData
	JobResourceLimits model.ResourceUsageData
	DefaultJobResourceLimits model.ResourceUsageData
	IgnorePhysicalResourceLimits bool
	// How long the buffer would backoff before polling the queue again for new jobs
	ExecutorBufferBackoffDuration time.Duration
	// JobNegotiationTimeout default timeout value to hold a bid for a job
	JobNegotiationTimeout time.Duration
	// MinJobExecutionTimeout default value for the minimum execution timeout this compute node supports. Jobs with
	// lower timeout requirements will not be bid on.
	MinJobExecutionTimeout time.Duration
	// MaxJobExecutionTimeout default value for the maximum execution timeout this compute node supports. Jobs with
	// higher timeout requirements will not be bid on.
	MaxJobExecutionTimeout time.Duration
	// DefaultJobExecutionTimeout default value for the execution timeout this compute node will assign to jobs with
	// no timeout requirement defined.
	DefaultJobExecutionTimeout time.Duration
	// JobExecutionTimeoutClientIDBypassList is the list of clients that are allowed to bypass the job execution timeout
	// check.
	JobExecutionTimeoutClientIDBypassList []string
	// Bid strategies config
	JobSelectionPolicy model.JobSelectionPolicy
	// logging running executions
	LogRunningExecutionsInterval time.Duration
	SimulatorConfig model.SimulatorConfigCompute
}
func NewComputeConfigWith ¶
func NewComputeConfigWith(params ComputeConfigParams) (config ComputeConfig)
func NewComputeConfigWithDefaults ¶
func NewComputeConfigWithDefaults() ComputeConfig
type ComputeConfigParams ¶
type ComputeConfigParams struct {
	// Capacity config
	TotalResourceLimits model.ResourceUsageData
	QueueResourceLimits model.ResourceUsageData
	JobResourceLimits model.ResourceUsageData
	DefaultJobResourceLimits model.ResourceUsageData
	PhysicalResourcesProvider capacity.Provider
	IgnorePhysicalResourceLimits bool
	ExecutorBufferBackoffDuration time.Duration
	// Timeout config
	JobNegotiationTimeout time.Duration
	MinJobExecutionTimeout time.Duration
	MaxJobExecutionTimeout time.Duration
	DefaultJobExecutionTimeout time.Duration
	JobExecutionTimeoutClientIDBypassList []string
	// Bid strategies config
	JobSelectionPolicy model.JobSelectionPolicy
	// logging running executions
	LogRunningExecutionsInterval time.Duration
	SimulatorConfig model.SimulatorConfigCompute
}
type ExecutorsFactory ¶
type ExecutorsFactory interface {
Get(ctx context.Context, nodeConfig NodeConfig) (executor.ExecutorProvider, error)
}
type ExecutorsFactoryFunc ¶
type ExecutorsFactoryFunc func(ctx context.Context, nodeConfig NodeConfig) (executor.ExecutorProvider, error)
func (ExecutorsFactoryFunc) Get ¶
func (f ExecutorsFactoryFunc) Get(ctx context.Context, nodeConfig NodeConfig) (executor.ExecutorProvider, error)
type Node ¶
type Node struct {
	// Visible for testing
	APIServer *publicapi.APIServer
	ComputeNode *Compute
	RequesterNode *Requester
	NodeInfoStore routing.NodeInfoStore
	CleanupManager *system.CleanupManager
	IPFSClient ipfs.Client
	Host host.Host
}
func NewNode ¶
func NewNode( ctx context.Context, config NodeConfig, injector NodeDependencyInjector) (*Node, error)
func NewStandardNode ¶
func NewStandardNode( ctx context.Context, config NodeConfig) (*Node, error)
func (*Node) IsComputeNode ¶
IsComputeNode returns true if the node is a compute node
func (*Node) IsRequesterNode ¶
IsRequesterNode returns true if the node is a requester node
type NodeConfig ¶
type NodeConfig struct { IPFSClient ipfs.Client CleanupManager *system.CleanupManager JobStore jobstore.Store Host host.Host FilecoinUnsealedPath string EstuaryAPIKey string HostAddress string APIPort int ComputeConfig ComputeConfig RequesterNodeConfig RequesterConfig APIServerConfig publicapi.APIServerConfig LotusConfig *filecoinlotus.PublisherConfig SimulatorNodeID string IsRequesterNode bool IsComputeNode bool Labels map[string]string NodeInfoPublisherInterval time.Duration }
Node configuration
type NodeDependencyInjector ¶
type NodeDependencyInjector struct { StorageProvidersFactory StorageProvidersFactory ExecutorsFactory ExecutorsFactory VerifiersFactory VerifiersFactory PublishersFactory PublishersFactory }
Lazy node dependency injector that generates instances of different components on demand, based on the configuration provided.
func NewStandardNodeDependencyInjector ¶
func NewStandardNodeDependencyInjector() NodeDependencyInjector
type PublishersFactory ¶
type PublishersFactory interface { Get(ctx context.Context, nodeConfig NodeConfig) (publisher.PublisherProvider, error) }
type PublishersFactoryFunc ¶
type PublishersFactoryFunc func( ctx context.Context, nodeConfig NodeConfig) (publisher.PublisherProvider, error)
func (PublishersFactoryFunc) Get ¶
func (f PublishersFactoryFunc) Get( ctx context.Context, nodeConfig NodeConfig) (publisher.PublisherProvider, error)
type Requester ¶
type Requester struct {
	// Visible for testing
	Endpoint requester.Endpoint
	JobStore jobstore.Store
	// contains filtered or unexported fields
}
func NewRequesterNode ¶
func NewRequesterNode( ctx context.Context, cleanupManager *system.CleanupManager, host host.Host, apiServer *publicapi.APIServer, config RequesterConfig, jobStore jobstore.Store, simulatorNodeID string, simulatorRequestHandler *simulator.RequestHandler, verifiers verifier.VerifierProvider, storageProviders storage.StorageProvider, gossipSub *libp2p_pubsub.PubSub, nodeInfoStore routing.NodeInfoStore, ) (*Requester, error)
func (*Requester) RegisterLocalComputeEndpoint ¶
type RequesterConfig ¶
type RequesterConfig struct {
	// MinJobExecutionTimeout requester will replace any job execution timeout that is less than this
	// value with DefaultJobExecutionTimeout.
	MinJobExecutionTimeout time.Duration
	// DefaultJobExecutionTimeout default value for running, verifying and publishing job results,
	// if the user didn't define one in the spec
	DefaultJobExecutionTimeout time.Duration
	// HousekeepingBackgroundTaskInterval background task interval that periodically checks for expired states
	HousekeepingBackgroundTaskInterval time.Duration
	// NodeRankRandomnessRange defines the range of randomness used to rank nodes
	NodeRankRandomnessRange int
	JobSelectionPolicy model.JobSelectionPolicy
	SimulatorConfig model.SimulatorConfigRequester
	// minimum version of compute nodes that the requester will accept and route jobs to
	MinBacalhauVersion model.BuildVersionInfo
}
func NewRequesterConfigWith ¶
func NewRequesterConfigWith(params RequesterConfigParams) (config RequesterConfig)
func NewRequesterConfigWithDefaults ¶
func NewRequesterConfigWithDefaults() RequesterConfig
type RequesterConfigParams ¶
type RequesterConfigParams struct {
	// Timeout config
	MinJobExecutionTimeout time.Duration
	DefaultJobExecutionTimeout time.Duration
	HousekeepingBackgroundTaskInterval time.Duration
	NodeRankRandomnessRange int
	JobSelectionPolicy model.JobSelectionPolicy
	SimulatorConfig model.SimulatorConfigRequester
	// minimum version of compute nodes that the requester will accept and route jobs to
	MinBacalhauVersion model.BuildVersionInfo
}
type StandardExecutorsFactory ¶
type StandardExecutorsFactory struct{}
func NewStandardExecutorsFactory ¶
func NewStandardExecutorsFactory() *StandardExecutorsFactory
func (*StandardExecutorsFactory) Get ¶
func (f *StandardExecutorsFactory) Get( ctx context.Context, nodeConfig NodeConfig) (executor.ExecutorProvider, error)
type StandardPublishersFactory ¶
type StandardPublishersFactory struct{}
func NewStandardPublishersFactory ¶
func NewStandardPublishersFactory() *StandardPublishersFactory
func (*StandardPublishersFactory) Get ¶
func (f *StandardPublishersFactory) Get( ctx context.Context, nodeConfig NodeConfig) (publisher.PublisherProvider, error)
type StandardStorageProvidersFactory ¶
type StandardStorageProvidersFactory struct{}
Standard implementations used in production and when testing production behavior.
func NewStandardStorageProvidersFactory ¶
func NewStandardStorageProvidersFactory() *StandardStorageProvidersFactory
func (*StandardStorageProvidersFactory) Get ¶
func (f *StandardStorageProvidersFactory) Get( ctx context.Context, nodeConfig NodeConfig) (storage.StorageProvider, error)
type StandardVerifiersFactory ¶
type StandardVerifiersFactory struct{}
func NewStandardVerifiersFactory ¶
func NewStandardVerifiersFactory() *StandardVerifiersFactory
func (*StandardVerifiersFactory) Get ¶
func (f *StandardVerifiersFactory) Get( ctx context.Context, nodeConfig NodeConfig) (verifier.VerifierProvider, error)
type StorageProvidersFactory ¶
type StorageProvidersFactory interface {
Get(ctx context.Context, nodeConfig NodeConfig) (storage.StorageProvider, error)
}
Interfaces to inject dependencies into the stack
type StorageProvidersFactoryFunc ¶
type StorageProvidersFactoryFunc func( ctx context.Context, nodeConfig NodeConfig) (storage.StorageProvider, error)
Functions that implement the factories for easier creation of new implementations
func (StorageProvidersFactoryFunc) Get ¶
func (f StorageProvidersFactoryFunc) Get( ctx context.Context, nodeConfig NodeConfig) (storage.StorageProvider, error)
type VerifiersFactory ¶
type VerifiersFactory interface { Get(ctx context.Context, nodeConfig NodeConfig) (verifier.VerifierProvider, error) }
type VerifiersFactoryFunc ¶
type VerifiersFactoryFunc func( ctx context.Context, nodeConfig NodeConfig) (verifier.VerifierProvider, error)
func (VerifiersFactoryFunc) Get ¶
func (f VerifiersFactoryFunc) Get( ctx context.Context, nodeConfig NodeConfig) (verifier.VerifierProvider, error)
Click to show internal directories.
Click to hide internal directories.