node

package
v0.5.0
Warning: This package is not in the latest version of its module.
Published: Dec 30, 2024 License: Apache-2.0 Imports: 42 Imported by: 0

README

node

Table of Contents

  1. Description
  2. Structure and Organisation
  3. Class Diagram
  4. Functionality
  5. Data Types
  6. Testing
  7. Proposed Functionality/Requirements
  8. References

Specification

proposed Description

This package is responsible for creating the Node object, the main actor that resides on the machine for as long as DMS is running. The Node is created when DMS is onboarded.

The Node is responsible for:

  • Communicating with other actors (nodes and allocations) via messages. This includes sending bid requests, bids, invocations, job status, etc.

  • Checking used and free resources before creating allocations

  • Continuous monitoring of the machine

Structure and Organisation

Here is a quick overview of the contents of this package:

  • README: This file, aimed at developers who wish to use and modify DMS functionality.
Class Diagram

The class diagram for the node package is shown below.

node Class Diagram (rendered from the PlantUML source file below):
!$rootUrlGitlab = "https://gitlab.com/nunet/device-management-service/-/raw/main"
!$packageRelativePath = "/dms/node"
!$packageUrlGitlab = $rootUrlGitlab + $packageRelativePath
 
!include $packageUrlGitlab/specs/class_diagram.puml
Functionality

TBD

Note: the functionality of DMS is currently being developed. See the proposed section for the suggested design of interfaces and methods.

Data Types

TBD

Note: the functionality of DMS is currently being developed. See the proposed section for the suggested data types.

Testing

proposed Refer to the *_test.go files for unit tests of the different functionalities.

Proposed Functionality / Requirements
List of issues

All issues related to the implementation of the dms package can be found below. These include any proposals for modifications to the package or for new functionality needed to cover the requirements of other packages.

Interfaces & Methods
proposed Node_interface
type Node_interface interface {
	// Extends Actor interface from types package;
	// which implements message passing logic

	// Somewhat similar to the libp2p.Host interface in the current implementation
	// (however, the network.libp2p package is too low-level for this interface
	// in the new architecture)
	
	types.Actor

	// extends the Orchestrator interface from the orchestrator sub-package
	dms.orchestrator.Orchestrator

	// extends the Vertex interface, thus connecting the Actor model with the graph computing model
	dms.graph.Vertex_interface

	// each node will hold a structure with the allocations running on it;
	// the Allocation type is specified in the jobs package
	getAllocation(allocationID dms.jobs.Allocation.allocationID) dms.jobs.Allocation

	// allocations will be short-lived objects (depending on the requirements of a job pertaining to that allocation)
	// but (ideally) all allocations that have ever been initiated on a node
	// need to be logged into the local database
	checkAllocationStatus()
	
	// routes message to the allocation of the job that is running on the machine
	// routeToAllocation()

	// the methods below relate (mostly) to COMPUTE PROVIDER functionality;
	// setters and getters are not included in the global dms class diagram
	// to keep it compact

	// benchmark Capability of the machine on which the Node is running
	benchmarkCapability()

	// save benchmarked Capability into persistent data store for further retrieval
	setRegisteredCapability()

	// get all registered capability of the node
	getRegisteredCapability()

	// set available Capability, given the registered Capability and the new allocation
	setAvailableCapability()

	// return currently available capability of the node
	getAvailableCapability()

	// reserve resources for a job
	lockCapability(dms.jobs.Pod, dms.Capability)

	// get all locked Capabilities
	getLockedCapabilities()

	// set preferences of a node in the form of Capability Comparator
	setPreferences()

	getPreferences() dms.orchestrator.CapabilityComparator

	// below methods are related to SERVICE PROVIDER functionality (mostly)
	getRegisteredBids(bidRequestID types.ID) []dms.orchestrator.Bid

	// start a new allocation
	startAllocation(dms.orchestrator.Invocation) 

}

getAllocation method retrieves an Allocation on the machine based on the provided AllocationID.

checkAllocationStatus method will retrieve the status of an Allocation.

routeToAllocation method will route a message to the Allocation of the job that is running on the machine.

benchmarkCapability method will perform machine benchmarking.

setRegisteredCapability method will record the benchmarked Capability of the machine into a persistent data store for retrieval and usage (mostly in job orchestration functionality).

getRegisteredCapability method will retrieve the benchmarked Capability of the machine from the persistent data store.

setAvailableCapability method changes the available capability of the machine when resources are locked.

getAvailableCapability method will return the currently available capability of the node.

lockCapability method will lock a certain amount of resources for a job. This can happen during bid submission, but it must happen once the job is accepted and before invocation.

getLockedCapabilities method retrieves the locked capabilities of the machine.

setPreferences method sets the preferences of a node as dms.orchestrator.CapabilityComparator.

getPreferences method retrieves the node preferences as dms.orchestrator.CapabilityComparator.

getRegisteredBids method retrieves the list of bids received for a job.

startAllocation method will create an allocation based on the invocation received.

Data types
proposed dms.node.Node

An initial data model for Node is defined below.

type Node struct {
	// unique identifier
	id dms.node.nodeID

	// unique mailbox
	mailbox types.Mailbox
	
	// access to the local events database on the node
	db db.LocalDatabaseCollector // a type for issuing queries to the local database

	// registered capability will be a slowly changing value
	// most probably it will not change for the whole lifecycle of a DMS
	// therefore it will be written into the persistent database
	// however it is good to have it in the cache, since it may be used often
	registeredCapability dms.Capability

	// available capability can be always calculated / recalculated
	// by: (1) taking the value of registered Capability (from persistent database)
	// (2) adding all capabilities of current Allocations of the node and locked capabilities
	// available capability will be (1) - (2)..
	// it can also be extracted via database queries
	// let's assume the current available capability will always be cached as a variable too,
	// readily available for the Node interface
	availableCapability dms.Capability

	// part of the machine that is reserved for future jobs
	// it is the difference between registered capability and available capability
	// it can be calculated readily, but it is good to have it cached too
	// for orchestration purposes
	lockedCapabilities map[dms.jobs.Job]dms.Capability

	// each node has its preferences encoded into CapabilityComparator type
	// which will be used directly in the functions which compare job requirements with availableCapabilities
	// note that Capabilities include price
	// these types and variables are introduced in order to create ability for each node
	// to autonomously 'choose' best options as per individually defined preferences
	// the initial implementation based on these types could be pretty trivial
	// but we want this to be extendable to unlimited complexity of behaviors
	preferences dms.orchestrator.CapabilityComparator

	allocations []dms.jobs.AllocationID // list of all allocations running on the node

	// indices of activity
	// we may need to have more indexes
	// note, that all information in these indexes in principle
	// should be re-constructable from the local database 
	jobIndex dms.jobs.JobIndex // index of all accepted job posts

	// index of all sent/received bid requests
	// (note that a Node can both receive and send bids depending on the mode)
	// (whether to implement this as one or more data structures is left to the developers)
	bidRequestsSentIndex dms.orchestrator.BidRequestsSentIndex

	bidRequestsReceivedIndex dms.orchestrator.BidRequestsReceivedIndex

	// index of all received/sent bids
	// (note that a DMS can both receive and send bids, depending on the role
	// it takes in the orchestration: compute provider or service provider;
	// both, however, are available via the same dms package
	// and may even be combined in the future)
	bidsSentIndex dms.orchestrator.BidsSentIndex

	bidsReceivedIndex dms.orchestrator.BidsReceivedIndex

}

proposed dms.node.NodeID
type NodeID struct {
	// ID is a unique identifier created by DMS
	ID types.ID.UUID

	// CID is a content identifier that can be used in the DHT or otherwise
	CID types.ID.CID

	// libp2p peer ID
	PeerID string

	// DID is the decentralized identifier that can be used for authentication and authorization
	DID string
}
References

Documentation

Index

Constants

const (
	DefaultContextName = "dms"
	UserContextName    = "user"
	KeystoreDir        = "key/"
	CapstoreDir        = "cap/"
)
const (
	NewDeploymentBehavior = "/dms/node/deployment/new"

	// Minimum time for deployment
	MinDeploymentTime = time.Minute - time.Second

	RestoreDeadlineCommitting   = 1 * time.Minute
	RestoreDeadlineProvisioning = 1 * time.Minute
	RestoreDeadlineRunning      = 5 * time.Minute
)
const (
	PeersListBehavior    = "/dms/node/peers/list"
	PeerAddrInfoBehavior = "/dms/node/peers/self"
	PeerPingBehavior     = "/dms/node/peers/ping"
	PeerDHTBehavior      = "/dms/node/peers/dht"
	PeerConnectBehavior  = "/dms/node/peers/connect"
	PeerScoreBehavior    = "/dms/node/peers/score"

	OnboardBehavior       = "/dms/node/onboarding/onboard"
	OffboardBehavior      = "/dms/node/onboarding/offboard"
	OnboardStatusBehavior = "/dms/node/onboarding/status"

	ContainerStartBehavior = "/dms/node/container/start"
	ContainerStopBehavior  = "/dms/node/container/stop"
	ContainerListBehavior  = "/dms/node/container/list"

	VMStartBehavior = "/dms/node/vm/start/custom"
	VMStopBehavior  = "/dms/node/vm/stop"
	VMListBehavior  = "/dms/node/vm/list"

	DeploymentListBehavior     = "/dms/node/deployment/list"
	DeploymentStatusBehavior   = "/dms/node/deployment/status"
	DeploymentLogsBehavior     = "/dms/node/deployment/logs"
	DeploymentManifestBehavior = "/dms/node/deployment/manifest"
	DeploymentShutdownBehavior = "/dms/node/deployment/shutdown"

	ResourcesAllocatedBehavior = "/dms/node/resources/allocated"
	ResourcesFreeBehavior      = "/dms/node/resources/free"
	ResourcesOnboardedBehavior = "/dms/node/resources/onboarded"

	HardwareSpecBehavior  = "/dms/node/hardware/spec"
	HardwareUsageBehavior = "/dms/node/hardware/usage"

	LoggerConfigBehavior      = "/dms/node/logger/config"
	RestartAllocationBehavior = "/dms/node/allocation/restart"
	StopAllocationBehavior    = "/dms/node/allocation/stop"

	CapListBehavior   = "/dms/cap/list"
	CapAnchorBehavior = "/dms/cap/anchor"
)
const (
	PublicHelloBehavior    = "/public/hello"
	PublicStatusBehavior   = "/public/status"
	BroadcastHelloBehavior = "/broadcast/hello"
	BroadcastHelloTopic    = "/nunet/hello"
)

Variables

var (
	ErrTODO                 = errors.New("TODO")
	ErrDeploymentNotFound   = errors.New("deployment not found")
	ErrDeploymentNotRunning = errors.New("deployment not running")
)

Functions

func CreateTrustContextFromKeyStore

func CreateTrustContextFromKeyStore(afs afero.Afero, contextKey string, cfg *config.Config) (did.TrustContext, crypto.PrivKey, error)

func IsLedgerContext

func IsLedgerContext(context string) bool

func LedgerContext

func LedgerContext(context string) string

func LoadCapabilityContext

func LoadCapabilityContext(trustCtx did.TrustContext, name string, cfg *config.Config) (ucan.CapabilityContext, error)

func SaveCapabilityContext

func SaveCapabilityContext(capCtx ucan.CapabilityContext, cfg *config.Config) error

Types

type CapAnchorRequest

type CapAnchorRequest struct {
	Root    []did.DID
	Require ucan.TokenList
	Provide ucan.TokenList
	Revoke  ucan.TokenList
}

type CapAnchorResponse

type CapAnchorResponse struct {
	OK    bool
	Error string
}

type CapListRequest

type CapListRequest struct {
	Context string
}

type CapListResponse

type CapListResponse struct {
	OK      bool
	Error   string
	Roots   []did.DID
	Require ucan.TokenList
	Provide ucan.TokenList
	Revoke  ucan.TokenList
}

type CustomVMStartRequest

type CustomVMStartRequest struct {
	Execution types.ExecutionRequest
}

type CustomVMStartResponse

type CustomVMStartResponse struct {
	Error string
}

type DeploymentListResponse

type DeploymentListResponse struct {
	Deployments map[string]string
}

type DeploymentLogsRequest

type DeploymentLogsRequest struct {
	EnsembleID     string
	AllocationName string
}

type DeploymentLogsResponse

type DeploymentLogsResponse struct {
	LogsWrittenTo string
	Error         string
}

type DeploymentManifestRequest

type DeploymentManifestRequest struct {
	ID string
}

type DeploymentManifestResponse

type DeploymentManifestResponse struct {
	Manifest jobs.EnsembleManifest
	Error    string
}

type DeploymentShutdownRequest

type DeploymentShutdownRequest struct {
	ID string
}

type DeploymentShutdownResponse

type DeploymentShutdownResponse struct {
	OK    bool
	Error string
}

type DeploymentStatusRequest

type DeploymentStatusRequest struct {
	ID string
}

type DeploymentStatusResponse

type DeploymentStatusResponse struct {
	Status string
	Error  string
}

type HelloResponse

type HelloResponse struct {
	DID did.DID
}

type HostGeolocation

type HostGeolocation struct {
	HostContinent string
	HostCountry   string
	HostCity      string
}

type ListVMResponse

type ListVMResponse struct {
	Error         string
	VMS           []types.ExecutionListItem
	ExecutionType jobs.AllocationExecutor
}

type LoggerConfigRequest

type LoggerConfigRequest struct {
	Interval       int    `json:"interval,omitempty"`
	URL            string `json:"url,omitempty"`
	Level          string `json:"level,omitempty"`
	APIKey         string `json:"api_key,omitempty"`
	APMURL         string `json:"apm_url,omitempty"`
	ElasticEnabled *bool  `json:"elastic_enabled,omitempty"`
}

type LoggerConfigResponse

type LoggerConfigResponse struct {
	Error string `json:"error,omitempty"`
	OK    bool
}

type NewDeploymentRequest

type NewDeploymentRequest struct {
	Ensemble job_types.EnsembleConfig
}

type NewDeploymentResponse

type NewDeploymentResponse struct {
	Status     string
	EnsembleID string `json:",omitempty"`
	Error      string `json:",omitempty"`
}

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node is the structure that holds the node's dependencies.

func New

func New(cfg config.Config, fs afero.Afero,
	onboarder *onboarding.Onboarding,
	rootCap ucan.CapabilityContext,
	hostID string, net network.Network,
	resourceManager types.ResourceManager,
	scheduler *bt.Scheduler,
	hardware types.HardwareManager,
	orchestratorRepo repositories.OrchestratorView,
	geoip types.GeoIPLocator, hostLocation HostGeolocation, portConfig PortConfig,
	contractRepo repositories.Contract,
) (*Node, error)

New creates a new node and attaches an actor to it.

func (*Node) ExecutorAvailable

func (n *Node) ExecutorAvailable(execType jobs.AllocationExecutor) bool

ExecutorAvailable returns the availability of a specific executor.

func (*Node) GetAllocation

func (n *Node) GetAllocation(id string) (*jobs.Allocation, error)

GetAllocation gets an allocation by id.

func (*Node) GetAllocations

func (n *Node) GetAllocations() []*jobs.Allocation

GetAllocations returns a list of allocations in the node.

func (*Node) GetBidRequests

func (n *Node) GetBidRequests() []jobs.BidRequest

GetBidRequests returns the bid requests for the node.

func (*Node) ResourceManager

func (n *Node) ResourceManager() types.ResourceManager

func (*Node) Start

func (n *Node) Start() error

Start starts the node.

func (*Node) Stop

func (n *Node) Stop() error

Stop stops the node.

type OffboardRequest

type OffboardRequest struct{}

type OffboardResponse

type OffboardResponse struct {
	Success bool   `json:"success"`
	Error   string `json:"error,omitempty"`
}

type OnboardRequest

type OnboardRequest struct {
	NoGPU  bool
	GPUs   string
	Config types.OnboardingConfig
}

type OnboardResponse

type OnboardResponse struct {
	Success bool                   `json:"success"`
	Error   string                 `json:"error,omitempty"`
	Config  types.OnboardingConfig `json:"config,omitempty"`
}

type OnboardStatusResponse

type OnboardStatusResponse struct {
	Onboarded bool   `json:"onboarded"`
	Error     string `json:"error,omitempty"`
}

type PeerAddrInfoResponse

type PeerAddrInfoResponse struct {
	ID      string `json:"id"`
	Address string `json:"listen_addr"`
}

type PeerConnectRequest

type PeerConnectRequest struct {
	Address string
}

type PeerConnectResponse

type PeerConnectResponse struct {
	Status string
	Error  string
}

type PeerDHTResponse

type PeerDHTResponse struct {
	Peers []kbucket.PeerInfo
}

type PeerScoreResponse

type PeerScoreResponse struct {
	Score map[string]*network.PeerScoreSnapshot
}

type PeersListResponse

type PeersListResponse struct {
	Peers []peer.ID
}

type PingRequest

type PingRequest struct {
	Host string
}

type PingResponse

type PingResponse struct {
	Error string
	RTT   int64
}

type PortAllocator

type PortAllocator struct {
	// contains filtered or unexported fields
}

PortAllocator keeps track of port allocations and manages state.

func NewPortAllocator

func NewPortAllocator(config PortConfig) *PortAllocator

NewPortAllocator initializes a new PortAllocator with a PortConfig.

func (*PortAllocator) AllocatePorts

func (pa *PortAllocator) AllocatePorts(allocationID string, ports []int) error

AllocatePorts allocates the requested ports, associating them with an allocation. If it's not possible to allocate one of the ports, an error is returned and no ports are allocated.

func (*PortAllocator) AllocateRandom

func (pa *PortAllocator) AllocateRandom(allocationID string, numPorts int) ([]int, error)

AllocateRandom allocates the requested number of ports and associates them with the allocation ID.

func (*PortAllocator) Allocated

func (pa *PortAllocator) Allocated(ports []int) bool

Allocated checks if the given ports are already allocated.

func (*PortAllocator) GetAllocation

func (pa *PortAllocator) GetAllocation(allocationID string) ([]int, error)

GetAllocation returns the allocated ports for a specific allocation ID.

func (*PortAllocator) PortsAvailable

func (pa *PortAllocator) PortsAvailable(numPorts int) bool

func (*PortAllocator) Release

func (pa *PortAllocator) Release(allocationID string)

Release releases the ports associated with the allocation ID.

type PortConfig

type PortConfig struct {
	AvailableRangeFrom int
	AvailableRangeTo   int
}

type PublicStatusResponse

type PublicStatusResponse struct {
	Status    string
	Resources types.Resources
}

type VMStopRequest

type VMStopRequest struct {
	ExecutionID   string
	ExecutionType jobs.AllocationExecutor
}

type VMStopResponse

type VMStopResponse struct {
	Error string
}
