orchestrator

package
v1.6.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 12, 2025 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Overview

Package orchestrator provides transport layer implementation for orchestrator nodes using the legacy bprotocol over NATS. This package will be deprecated in future releases in favor of a new transport implementation.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// NodeID uniquely identifies this orchestrator node in the cluster
	NodeID string

	//NatsConn is the NATS connection to use for communication
	NatsConn *nats.Conn

	// NodeManager handles node discovery, tracking, and health monitoring
	NodeManager nodes.Manager

	// EventStore provides access to the event log for dispatching updates
	EventStore watcher.EventStore

	// ProtocolRouter determines message routing based on node protocol support
	ProtocolRouter *watchers.ProtocolRouter

	// Callback handles responses from compute nodes (bids, results, etc.)
	Callback *orchestrator.Callback
}

Config defines the configuration and dependencies required to set up the transport layer for an orchestrator node.

type ConnectionManager

type ConnectionManager struct {
	// contains filtered or unexported fields
}

ConnectionManager coordinates all transport-related components for an orchestrator node, including: - NATS connection management - Heartbeat monitoring - Node management endpoints - Event dispatching to compute nodes

func NewConnectionManager

func NewConnectionManager(config Config) (*ConnectionManager, error)

NewConnectionManager creates a new ConnectionManager with the given configuration. The manager will not be active until Start is called.

func (*ConnectionManager) Start

func (cm *ConnectionManager) Start(ctx context.Context) error

Start initializes all transport components in the following order: 1. Sets up heartbeat monitoring 2. Initializes management endpoints for compute node registration 3. Creates compute proxy for job distribution 4. Sets up callback handling for compute responses 5. Starts event dispatching

If any step fails, all initialized components are cleaned up via Stop.

func (*ConnectionManager) Stop

func (cm *ConnectionManager) Stop(ctx context.Context)

Stop gracefully shuts down all transport components in the reverse order of initialization. Any errors during shutdown are logged but not returned as they cannot be meaningfully handled at this point.

The order of shutdown is important to prevent message loss: 1. Stop event dispatcher to prevent new messages to compute nodes 2. Stop heartbeat subscriber to prevent false node state updates

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(manager nodes.Manager) *Server

func (*Server) HandleMessage

func (h *Server) HandleMessage(ctx context.Context, message *envelope.Message) error

HandleMessage processes NCL messages and routes them to the appropriate handler

func (*Server) Heartbeat

func (h *Server) Heartbeat(ctx context.Context, request legacy.Heartbeat) error

func (*Server) Register

func (h *Server) Register(ctx context.Context, request legacy.RegisterRequest) (*legacy.RegisterResponse, error)

Register handles compute node registration requests

func (*Server) ShouldProcess

func (h *Server) ShouldProcess(ctx context.Context, message *envelope.Message) bool

func (*Server) UpdateInfo

func (h *Server) UpdateInfo(ctx context.Context, request legacy.UpdateInfoRequest) (*legacy.UpdateInfoResponse, error)

UpdateInfo handles compute node info update requests

func (*Server) UpdateResources

UpdateResources stores the latest resource information for a node and forwards it to the manager

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL