Documentation
Overview
Package compute provides a transport layer implementation for compute nodes using the legacy bprotocol over NATS. This package will be deprecated in a future release in favor of a new transport implementation.
Types
type Config
type Config struct {
	// NodeID uniquely identifies this compute node in the cluster
	NodeID string

	// ClientFactory creates NATS client connections with the appropriate settings
	ClientFactory natsutil.ClientFactory

	// NodeInfoProvider supplies current node information for registration and updates
	NodeInfoProvider models.NodeInfoProvider

	// HeartbeatConfig controls heartbeat timing and behavior
	HeartbeatConfig types.Heartbeat

	// ComputeEndpoint handles incoming compute requests
	ComputeEndpoint compute.Endpoint

	// EventStore provides access to the event log for dispatching updates
	EventStore watcher.EventStore
}
Config defines the configuration and dependencies required to set up the transport layer for a compute node.
type ConnectionManager
type ConnectionManager struct {
	// contains filtered or unexported fields
}
ConnectionManager coordinates all transport-related components for a compute node, including:
- NATS connection management
- Heartbeat publishing
- Node registration and updates
- Event dispatching to orchestrator
func NewConnectionManager
func NewConnectionManager(config Config) (*ConnectionManager, error)
NewConnectionManager creates a new ConnectionManager with the given configuration. The manager will not be active until Start is called.
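A caller-side view of this lifecycle can be sketched as follows. The sketch does not import the real package: `lifecycle` only mirrors the documented `Start` and `Stop` method signatures, and `fakeManager`, `runWith`, and `lifecycleDemo` are hypothetical names introduced for illustration. Stopping with a fresh, time-limited context is a design choice of this sketch, not something the package documentation prescribes.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// lifecycle mirrors the Start and Stop signatures documented for
// *ConnectionManager. It is a local stand-in, not the real type.
type lifecycle interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context)
}

// fakeManager is a hypothetical implementation that records calls.
type fakeManager struct {
	failStart bool
	calls     []string
}

func (f *fakeManager) Start(ctx context.Context) error {
	f.calls = append(f.calls, "start")
	if f.failStart {
		return errors.New("simulated start failure")
	}
	return nil
}

func (f *fakeManager) Stop(ctx context.Context) {
	f.calls = append(f.calls, "stop")
}

// runWith starts m, runs work, and then stops m. Stop receives a fresh
// context with a timeout so shutdown can proceed even if ctx is cancelled.
func runWith(ctx context.Context, m lifecycle, work func(context.Context) error) error {
	if err := m.Start(ctx); err != nil {
		// The manager never became active, so Stop is not called here.
		return fmt.Errorf("start transport: %w", err)
	}
	defer func() {
		stopCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		m.Stop(stopCtx)
	}()
	return work(ctx)
}

// lifecycleDemo runs the sketch and returns the recorded call order.
func lifecycleDemo(failStart bool) ([]string, error) {
	m := &fakeManager{failStart: failStart}
	err := runWith(context.Background(), m, func(context.Context) error {
		m.calls = append(m.calls, "work")
		return nil
	})
	return m.calls, err
}

func main() {
	fmt.Println(lifecycleDemo(false))
	fmt.Println(lifecycleDemo(true))
}
```

With a real manager, the value returned by NewConnectionManager would take the place of `fakeManager`.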
func (*ConnectionManager) Start
func (cm *ConnectionManager) Start(ctx context.Context) error
Start initializes all transport components in the following order:
1. Establishes NATS connection
2. Sets up compute request handling
3. Initializes heartbeat publishing
4. Registers with orchestrator
5. Starts event dispatching
If any step fails, all initialized components are cleaned up via Stop.
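The start-then-clean-up behaviour described above might look roughly like the sketch below. It is not the package's implementation: `step`, `startAll`, and `rollbackDemo` are hypothetical names, and rolling back only the steps that already started is an assumption of this sketch (the documentation says only that cleanup goes through Stop).

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// step is a hypothetical stand-in for one transport component.
type step struct {
	name  string
	start func(ctx context.Context) error
	stop  func(ctx context.Context)
}

// startAll runs each step in order. If a step fails, every step that
// already started is stopped in reverse order and the error is returned.
func startAll(ctx context.Context, steps []step) error {
	for i, s := range steps {
		if err := s.start(ctx); err != nil {
			for j := i - 1; j >= 0; j-- {
				steps[j].stop(ctx)
			}
			return fmt.Errorf("starting %s: %w", s.name, err)
		}
	}
	return nil
}

// rollbackDemo fails on the third step and returns the recorded events.
func rollbackDemo() ([]string, error) {
	var events []string
	mk := func(name string, fail bool) step {
		return step{
			name: name,
			start: func(context.Context) error {
				if fail {
					return errors.New("simulated failure")
				}
				events = append(events, "start "+name)
				return nil
			},
			stop: func(context.Context) {
				events = append(events, "stop "+name)
			},
		}
	}
	steps := []step{
		mk("nats connection", false),
		mk("heartbeat", false),
		mk("registration", true), // this step fails
		mk("event dispatcher", false),
	}
	err := startAll(context.Background(), steps)
	return events, err
}

func main() {
	events, err := rollbackDemo()
	fmt.Println(events)
	fmt.Println(err)
}
```

In this run the event dispatcher never starts, and the two components that did start are stopped in reverse order before the error is returned.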
func (*ConnectionManager) Stop
func (cm *ConnectionManager) Stop(ctx context.Context)
Stop gracefully shuts down all transport components in the reverse order of initialization. Any errors during shutdown are logged but not returned as they cannot be meaningfully handled at this point.
The order of shutdown is important to prevent message loss:
1. Stop event dispatcher to prevent new messages
2. Stop management client to prevent new registrations
3. Stop heartbeat client to prevent false positive disconnections
4. Close NATS connection
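The shutdown order above, together with the log-and-continue error handling, can be sketched as follows. `component`, `stopAll`, and `shutdownDemo` are hypothetical names, and the logger is passed in as a plain function so the sketch stays self-contained. The real package may structure this differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// component is a hypothetical stand-in for a stoppable transport part.
type component struct {
	name string
	stop func(ctx context.Context) error
}

// stopAll stops components in the reverse of their start order. Errors
// are handed to logf and never returned, so one failure does not prevent
// the remaining components from being stopped.
func stopAll(ctx context.Context, started []component, logf func(string, ...any)) {
	for i := len(started) - 1; i >= 0; i-- {
		c := started[i]
		if err := c.stop(ctx); err != nil {
			logf("stopping %s: %v", c.name, err)
		}
	}
}

// shutdownDemo returns the order in which components were stopped and
// the messages that were logged along the way.
func shutdownDemo() ([]string, []string) {
	var order, logged []string
	mk := func(name string, err error) component {
		return component{name: name, stop: func(context.Context) error {
			order = append(order, name)
			return err
		}}
	}
	// Listed in start order; stopAll walks the slice backwards.
	started := []component{
		mk("nats connection", nil),
		mk("heartbeat client", nil),
		mk("management client", errors.New("simulated stop failure")),
		mk("event dispatcher", nil),
	}
	logf := func(format string, args ...any) {
		logged = append(logged, fmt.Sprintf(format, args...))
	}
	stopAll(context.Background(), started, logf)
	return order, logged
}

func main() {
	order, logged := shutdownDemo()
	fmt.Println(order)
	fmt.Println(logged)
}
```

Even though the management client reports an error here, the heartbeat client and the NATS connection are still stopped afterwards.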
type HeartbeatClient
type HeartbeatClient struct {
	// contains filtered or unexported fields
}
func NewHeartbeatClient
func NewHeartbeatClient(nodeID string, publisher ncl.Publisher) (*HeartbeatClient, error)
func (*HeartbeatClient) SendHeartbeat
func (h *HeartbeatClient) SendHeartbeat(ctx context.Context, sequence uint64) error
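SendHeartbeat has no description in this documentation, so the following is only a guess at how a caller might drive it. It assumes the caller owns a monotonically increasing sequence counter and sends one heartbeat per tick. `heartbeatSender` mirrors the documented method signature, while `recorder`, `heartbeatLoop`, and `heartbeatDemo` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// heartbeatSender mirrors the documented SendHeartbeat signature.
type heartbeatSender interface {
	SendHeartbeat(ctx context.Context, sequence uint64) error
}

// recorder is a hypothetical sender that records sequence numbers and
// cancels the loop after limit heartbeats.
type recorder struct {
	seen   []uint64
	limit  int
	cancel context.CancelFunc
}

func (r *recorder) SendHeartbeat(ctx context.Context, sequence uint64) error {
	r.seen = append(r.seen, sequence)
	if len(r.seen) >= r.limit {
		r.cancel()
	}
	return nil
}

// heartbeatLoop sends one heartbeat per tick with an increasing sequence
// number until ctx is done or a send fails.
func heartbeatLoop(ctx context.Context, s heartbeatSender, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	var seq uint64
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// Re-check ctx so no heartbeat is sent after cancellation,
			// even if a tick and the cancellation are ready together.
			if err := ctx.Err(); err != nil {
				return err
			}
			seq++
			if err := s.SendHeartbeat(ctx, seq); err != nil {
				return err
			}
		}
	}
}

// heartbeatDemo runs the loop until three heartbeats have been sent and
// reports whether the loop ended because of cancellation.
func heartbeatDemo() ([]uint64, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	r := &recorder{limit: 3, cancel: cancel}
	err := heartbeatLoop(ctx, r, time.Millisecond)
	return r.seen, err == context.Canceled
}

func main() {
	seen, canceled := heartbeatDemo()
	fmt.Println(seen, canceled)
}
```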
type ManagementClient
type ManagementClient struct {
// contains filtered or unexported fields
}
ManagementClient calls management functions on the requester node via the NATS transport. Once Start is called, it periodically sends the requester node an update with the latest node info for this node.
func NewManagementClient
func NewManagementClient(params *ManagementClientParams) *ManagementClient
func (*ManagementClient) RegisterNode
func (m *ManagementClient) RegisterNode(ctx context.Context) error
RegisterNode sends a registration request to the requester node. On successful registration, a sentinel file is created to record that this node is registered. If the sentinel file is present, the node knows it is already registered. If it is not present, the node attempts to register again, expecting the requester node to gracefully handle any previous registrations.
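The sentinel-file pattern can be illustrated with the sketch below. The file name, its location, and the helpers `registerOnce` and `sentinelDemo` are all hypothetical. How RegisterNode actually stores and checks its sentinel is not described in this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// registerOnce calls register unless the sentinel file already exists,
// and creates the sentinel after a successful registration. It reports
// whether register was called.
func registerOnce(sentinelPath string, register func() error) (bool, error) {
	if _, err := os.Stat(sentinelPath); err == nil {
		return false, nil // sentinel present: already registered
	} else if !errors.Is(err, os.ErrNotExist) {
		return false, fmt.Errorf("checking sentinel: %w", err)
	}
	if err := register(); err != nil {
		return true, fmt.Errorf("registering: %w", err)
	}
	f, err := os.Create(sentinelPath)
	if err != nil {
		return true, fmt.Errorf("creating sentinel: %w", err)
	}
	return true, f.Close()
}

// sentinelDemo attempts registration twice against a temporary directory
// and returns how many times the register callback actually ran.
func sentinelDemo() (int, error) {
	dir, err := os.MkdirTemp("", "sentinel-demo")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)

	calls := 0
	register := func() error {
		calls++
		return nil
	}
	path := filepath.Join(dir, "registered") // hypothetical file name
	for i := 0; i < 2; i++ {
		if _, err := registerOnce(path, register); err != nil {
			return calls, err
		}
	}
	return calls, nil
}

func main() {
	fmt.Println(sentinelDemo())
}
```

If the sentinel cannot be written after a successful registration, the next attempt registers again, which is why the requester is expected to tolerate repeated registrations.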
func (*ManagementClient) Start
func (m *ManagementClient) Start(ctx context.Context)
func (*ManagementClient) Stop
func (m *ManagementClient) Stop()
type ManagementClientParams
type ManagementClientParams struct {
	NodeInfoProvider models.NodeInfoProvider
	ManagementProxy  bprotocol.ManagementEndpoint
	HeartbeatClient  *HeartbeatClient
	HeartbeatConfig  types.Heartbeat
}