compute

package
v1.6.0
Warning

This package is not in the latest version of its module.

Published: Dec 16, 2024 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Overview

Package compute provides transport layer implementation for compute nodes using the legacy bprotocol over NATS. This package will be deprecated in future releases in favor of a new transport implementation.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// NodeID uniquely identifies this compute node in the cluster
	NodeID string

	// ClientFactory creates NATS client connections with the appropriate settings
	ClientFactory natsutil.ClientFactory

	// NodeInfoProvider supplies current node information for registration and updates
	NodeInfoProvider models.NodeInfoProvider

	// HeartbeatConfig controls heartbeat timing and behavior
	HeartbeatConfig types.Heartbeat

	// ComputeEndpoint handles incoming compute requests
	ComputeEndpoint compute.Endpoint

	// EventStore provides access to the event log for dispatching updates
	EventStore watcher.EventStore
}

Config defines the configuration and dependencies required to set up the transport layer for a compute node.

type ConnectionManager

type ConnectionManager struct {
	// contains filtered or unexported fields
}

ConnectionManager coordinates all transport-related components for a compute node, including:
- NATS connection management
- Heartbeat publishing
- Node registration and updates
- Event dispatching to the orchestrator

func NewConnectionManager

func NewConnectionManager(config Config) (*ConnectionManager, error)

NewConnectionManager creates a new ConnectionManager with the given configuration. The manager will not be active until Start is called.

func (*ConnectionManager) Start

func (cm *ConnectionManager) Start(ctx context.Context) error

Start initializes all transport components in the following order:
1. Establishes NATS connection
2. Sets up compute request handling
3. Initializes heartbeat publishing
4. Registers with orchestrator
5. Starts event dispatching

If any step fails, all initialized components are cleaned up via Stop.

func (*ConnectionManager) Stop

func (cm *ConnectionManager) Stop(ctx context.Context)

Stop gracefully shuts down all transport components in the reverse order of initialization. Any errors during shutdown are logged but not returned as they cannot be meaningfully handled at this point.

The order of shutdown is important to prevent message loss:
1. Stop event dispatcher to prevent new messages
2. Stop management client to prevent new registrations
3. Stop heartbeat client to prevent false positive disconnections
4. Close NATS connection
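The start-then-roll-back behavior described for Start and Stop can be illustrated with a small, self-contained sketch. It does not use this package: the component and manager types, the component names, and runDemo are hypothetical stand-ins, and the real ConnectionManager may track its components differently. The sketch only shows the general pattern of starting in order and, on failure, stopping whatever already started in reverse order.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// component is a hypothetical stand-in for one transport piece
// (NATS connection, heartbeat client, and so on).
type component struct {
	name  string
	start func(ctx context.Context) error
}

// manager starts components in order and remembers which ones came up,
// so that Stop can tear them down in reverse.
type manager struct {
	components []component
	started    []string
	ops        []string // records the order of operations for inspection
}

// Start brings components up in order. If one fails, everything that
// already started is stopped before the error is returned.
func (m *manager) Start(ctx context.Context) error {
	for _, c := range m.components {
		if err := c.start(ctx); err != nil {
			m.Stop(ctx)
			return fmt.Errorf("starting %s: %w", c.name, err)
		}
		m.started = append(m.started, c.name)
		m.ops = append(m.ops, "start "+c.name)
	}
	return nil
}

// Stop shuts down started components in reverse order. It returns nothing,
// mirroring the documented Stop signature; this sketch only records the order.
func (m *manager) Stop(ctx context.Context) {
	for i := len(m.started) - 1; i >= 0; i-- {
		m.ops = append(m.ops, "stop "+m.started[i])
	}
	m.started = nil
}

func ok(context.Context) error { return nil }

// runDemo fails at the (hypothetical) heartbeat step and returns the
// recorded operations together with the startup error.
func runDemo() ([]string, error) {
	m := &manager{components: []component{
		{"nats", ok},
		{"compute-handler", ok},
		{"heartbeat", func(context.Context) error { return errors.New("publisher unavailable") }},
		{"registration", ok},
		{"dispatcher", ok},
	}}
	err := m.Start(context.Background())
	return m.ops, err
}

func main() {
	ops, err := runDemo()
	fmt.Println(ops)
	fmt.Println(err)
}
```

In this sketch the failure at the third step leaves only the first two components to stop, and they stop in the opposite order from which they started.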

type HeartbeatClient

type HeartbeatClient struct {
	// contains filtered or unexported fields
}

func NewHeartbeatClient

func NewHeartbeatClient(nodeID string, publisher ncl.Publisher) (*HeartbeatClient, error)

func (*HeartbeatClient) Close

func (h *HeartbeatClient) Close(ctx context.Context) error

func (*HeartbeatClient) SendHeartbeat

func (h *HeartbeatClient) SendHeartbeat(ctx context.Context, sequence uint64) error

type ManagementClient

type ManagementClient struct {
	// contains filtered or unexported fields
}

ManagementClient calls management functions on the requester node via the NATS transport. Once started with Start, it periodically sends the requester node the latest node info for this node.
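The periodic update behavior can be sketched as a loop that sends fresh node info on every tick until its context is cancelled. This is an illustration only, not the package's implementation: updateLoop, runDemo, and the info and send callbacks are hypothetical names standing in for the node info provider and the management proxy.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// updateLoop sends the latest node info on every tick until ctx is cancelled.
// info and send are hypothetical callbacks, not part of this package.
func updateLoop(ctx context.Context, ticks <-chan time.Time, info func() string, send func(string) error) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticks:
			if err := send(info()); err != nil {
				// A failed update is not fatal here; the next tick tries again.
				fmt.Println("update failed:", err)
			}
		}
	}
}

// runDemo drives the loop with three manual ticks and returns what was sent.
func runDemo() []string {
	ctx, cancel := context.WithCancel(context.Background())
	ticks := make(chan time.Time) // unbuffered: each tick is consumed before the next is sent
	done := make(chan struct{})
	var sent []string
	version := 0

	go func() {
		defer close(done)
		updateLoop(ctx, ticks,
			func() string { version++; return fmt.Sprintf("node-info-v%d", version) },
			func(s string) error { sent = append(sent, s); return nil },
		)
	}()

	for i := 0; i < 3; i++ {
		ticks <- time.Now()
	}
	cancel()
	<-done // the loop has exited, so reading sent is safe
	return sent
}

func main() {
	fmt.Println(runDemo())
}
```

Passing the tick channel in as a parameter keeps the sketch deterministic. In a long-running process the channel would more typically come from a time.Ticker created with the configured interval.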

func NewManagementClient

func NewManagementClient(params *ManagementClientParams) *ManagementClient

func (*ManagementClient) RegisterNode

func (m *ManagementClient) RegisterNode(ctx context.Context) error

RegisterNode sends a registration request to the requester node. If registration succeeds, a sentinel file is created to record that this node is registered. If the sentinel file is present, the node is treated as already registered. If it is not present, the node attempts to register again, expecting the requester node to gracefully handle any previous registrations.
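The sentinel-file check can be sketched as follows. This is a minimal illustration under assumptions: registerOnce, runDemo, the sentinel file name, and the register callback are all hypothetical, and the actual location and contents of the sentinel file are not specified on this page.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// registerOnce calls register only when the sentinel file is absent, and
// creates the sentinel after a successful registration. It reports whether
// a registration request was actually sent.
func registerOnce(sentinel string, register func() error) (bool, error) {
	if _, err := os.Stat(sentinel); err == nil {
		return false, nil // sentinel present: treat the node as already registered
	} else if !errors.Is(err, fs.ErrNotExist) {
		return false, fmt.Errorf("checking sentinel: %w", err)
	}
	if err := register(); err != nil {
		return false, fmt.Errorf("registering node: %w", err)
	}
	if err := os.WriteFile(sentinel, nil, 0o600); err != nil {
		return true, fmt.Errorf("writing sentinel: %w", err)
	}
	return true, nil
}

// runDemo attempts registration twice against a temporary directory and
// returns how many registration requests were sent.
func runDemo() (int, error) {
	dir, err := os.MkdirTemp("", "sentinel-demo")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)
	sentinel := filepath.Join(dir, "registered") // hypothetical file name

	calls := 0
	register := func() error { calls++; return nil }
	for i := 0; i < 2; i++ {
		if _, err := registerOnce(sentinel, register); err != nil {
			return calls, err
		}
	}
	return calls, nil
}

func main() {
	calls, err := runDemo()
	fmt.Println(calls, err)
}
```

If the sentinel is lost, for example because local state was wiped, this pattern sends a second registration, which is why the requester side is expected to tolerate repeated registrations.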

func (*ManagementClient) Start

func (m *ManagementClient) Start(ctx context.Context)

func (*ManagementClient) Stop

func (m *ManagementClient) Stop()

type ManagementClientParams

type ManagementClientParams struct {
	NodeInfoProvider models.NodeInfoProvider
	ManagementProxy  bprotocol.ManagementEndpoint
	HeartbeatClient  *HeartbeatClient
	HeartbeatConfig  types.Heartbeat
}
