Documentation ¶
Types ¶
type ComputeManager ¶
type ComputeManager struct {
// contains filtered or unexported fields
}
ComputeManager handles the lifecycle and state management of all compute nodes connected to this orchestrator. It is responsible for:
- Processing compute node handshakes and connections
- Managing individual node data planes
- Coordinating message flow between orchestrator and compute nodes
- Tracking node health and connection state
func NewComputeManager ¶
func NewComputeManager(cfg Config) (*ComputeManager, error)
NewComputeManager creates a new compute manager with the given configuration. The manager must be started with Start() before it begins processing connections.
func (*ComputeManager) Start ¶
func (cm *ComputeManager) Start(ctx context.Context) error
Start initializes the manager and begins processing compute node connections. This includes:
1. Creating NATS connection
2. Setting up control plane responder
3. Registering message handlers
4. Setting up node state change handling
func (*ComputeManager) Stop ¶
func (cm *ComputeManager) Stop(ctx context.Context) error
Stop gracefully shuts down the manager and all compute node connections. It ensures proper cleanup by:
1. Stopping the control plane responder
2. Stopping all data planes
3. Waiting for background goroutines to complete
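The Start/Stop contract above can be sketched with standard-library primitives only. This is a minimal illustration, not the package's implementation: `sketchManager`, `record`, and `runLifecycle` are hypothetical names, and the NATS connection, responder, and data planes are replaced by simple step records so that the shutdown order is observable.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// sketchManager is a hypothetical stand-in for ComputeManager. It only
// records the order of shutdown steps and never talks to NATS.
type sketchManager struct {
	mu     sync.Mutex
	steps  []string
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

// record appends a step name under the mutex.
func (m *sketchManager) record(step string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.steps = append(m.steps, step)
}

// Start launches one background goroutine that runs until Stop cancels it.
func (m *sketchManager) Start(ctx context.Context) error {
	ctx, m.cancel = context.WithCancel(ctx)
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		<-ctx.Done() // placeholder for a real background loop
	}()
	return nil
}

// Stop follows the documented order: responder, data planes, then goroutines.
// It gives up waiting if the caller's context ends first.
func (m *sketchManager) Stop(ctx context.Context) error {
	m.record("stop responder")
	m.record("stop data planes")
	if m.cancel != nil {
		m.cancel()
	}

	done := make(chan struct{})
	go func() {
		m.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		m.record("goroutines finished")
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runLifecycle starts and stops a sketchManager and returns the recorded steps.
func runLifecycle() ([]string, error) {
	m := &sketchManager{}
	if err := m.Start(context.Background()); err != nil {
		return nil, err
	}
	if err := m.Stop(context.Background()); err != nil {
		return nil, err
	}
	return m.steps, nil
}

func main() {
	steps, err := runLifecycle()
	fmt.Println(steps, err)
}
```

Passing a context with a deadline to Stop bounds how long the caller waits for background goroutines; whether the real manager behaves the same way on timeout is not stated in this documentation.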
type Config ¶
type Config struct {
	// NodeID uniquely identifies this orchestrator instance
	NodeID string

	// ClientFactory creates NATS clients for transport connections
	ClientFactory natsutil.ClientFactory

	// NodeManager handles compute node lifecycle and state management
	NodeManager nodes.Manager

	// Message serialization and type registration
	MessageRegistry   *envelope.Registry         // Registry of message types for serialization
	MessageSerializer envelope.MessageSerializer // Handles message envelope serialization

	// Control plane timeouts and intervals
	HeartbeatTimeout      time.Duration // Maximum time to wait for node heartbeat before considering it disconnected
	NodeCleanupInterval   time.Duration // How often to check for and cleanup disconnected nodes
	RequestHandlerTimeout time.Duration // Timeout for handling individual control plane requests

	// Data plane configuration
	DataPlaneMessageHandler        ncl.MessageHandler                // Handles incoming messages from compute nodes
	DataPlaneMessageCreatorFactory nclprotocol.MessageCreatorFactory // Creates message creators for outgoing messages
	EventStore                     watcher.EventStore                // Store for watching and dispatching events
	DispatcherConfig               dispatcher.Config                 // Configuration for the event dispatcher
}
Config defines the configuration for the orchestrator's transport layer. It contains settings for both the control plane (node management, heartbeats) and data plane (message handling, event dispatching) components.
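To make the role of HeartbeatTimeout concrete, here is a minimal sketch of the kind of check a periodic cleanup pass (run every NodeCleanupInterval) could perform. The helpers `heartbeatExpired`, `expiredNodes`, and `demoExpired` are hypothetical and not part of this package, and the strict `>` comparison is an assumption; the package's actual bookkeeping lives behind NodeManager and is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// heartbeatExpired reports whether a node whose last heartbeat arrived at
// lastSeen should be treated as disconnected at time now.
// Assumption: a node expires only once strictly more than timeout has passed.
func heartbeatExpired(lastSeen, now time.Time, timeout time.Duration) bool {
	return now.Sub(lastSeen) > timeout
}

// expiredNodes returns the sorted IDs of nodes whose heartbeat has expired.
func expiredNodes(lastSeen map[string]time.Time, now time.Time, timeout time.Duration) []string {
	var ids []string
	for id, seen := range lastSeen {
		if heartbeatExpired(seen, now, timeout) {
			ids = append(ids, id)
		}
	}
	sort.Strings(ids)
	return ids
}

// demoExpired evaluates a fixed scenario with a 30-second timeout:
// node-a was seen 10 seconds ago, node-b 90 seconds ago.
func demoExpired() []string {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	lastSeen := map[string]time.Time{
		"node-a": now.Add(-10 * time.Second),
		"node-b": now.Add(-90 * time.Second),
	}
	return expiredNodes(lastSeen, now, 30*time.Second)
}

func main() {
	fmt.Println(demoExpired()) // prints [node-b]
}
```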
func DefaultConfig ¶
func DefaultConfig() Config
type DataPlane ¶
type DataPlane struct {
// contains filtered or unexported fields
}
DataPlane manages the message flow between orchestrator and a single compute node. It handles:
- Reliable message delivery through ordered publisher
- Sequence tracking for both incoming and outgoing messages
- Event watching and dispatching
Each DataPlane instance corresponds to one compute node connection.
func NewDataPlane ¶
func NewDataPlane(config DataPlaneConfig) (*DataPlane, error)
NewDataPlane creates a new DataPlane instance for a compute node.
func (*DataPlane) GetLastProcessedSequence ¶
GetLastProcessedSequence returns the last sequence number processed from incoming messages from this compute node.
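The idea of tracking the last processed incoming sequence number can be illustrated with a small, self-contained sketch. `sequenceTracker`, `markProcessed`, and `lastProcessed` are hypothetical names, and the rule that stale or duplicate sequence numbers are ignored is an assumption made for the example; the documentation does not say how DataPlane treats them.

```go
package main

import (
	"fmt"
	"sync"
)

// sequenceTracker is a hypothetical helper that remembers the highest
// sequence number processed so far. It is safe for concurrent use.
type sequenceTracker struct {
	mu   sync.RWMutex
	last uint64
}

// markProcessed records seq if it is newer than the last recorded value.
// It returns false for duplicates and out-of-order (older) sequence numbers.
func (t *sequenceTracker) markProcessed(seq uint64) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if seq <= t.last {
		return false
	}
	t.last = seq
	return true
}

// lastProcessed returns the highest sequence number recorded so far.
func (t *sequenceTracker) lastProcessed() uint64 {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.last
}

func main() {
	tracker := &sequenceTracker{}
	for _, seq := range []uint64{1, 2, 2, 5, 3} {
		fmt.Println(seq, tracker.markProcessed(seq))
	}
	fmt.Println("last:", tracker.lastProcessed())
}
```

A value like this is what a reconnecting node and the orchestrator could exchange to resume from the right position, which is presumably why DataPlaneConfig carries a starting sequence number.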
type DataPlaneConfig ¶
type DataPlaneConfig struct {
	NodeID string     // ID of the compute node this data plane serves
	Client *nats.Conn // NATS connection

	// Message handling
	MessageHandler        ncl.MessageHandler
	MessageCreatorFactory nclprotocol.MessageCreatorFactory
	MessageRegistry       *envelope.Registry
	MessageSerializer     envelope.MessageSerializer

	// Event tracking
	EventStore  watcher.EventStore
	StartSeqNum uint64 // Initial sequence for event watching

	// Dispatcher settings
	DispatcherConfig dispatcher.Config
}
DataPlaneConfig defines the configuration for a DataPlane instance. Each config corresponds to a single compute node connection.
func (*DataPlaneConfig) Validate ¶
func (c *DataPlaneConfig) Validate() error
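The documentation does not list which fields Validate checks. As a rough sketch of the usual pattern, the example below validates a hypothetical `sketchConfig` with two required fields; the struct, the choice of required fields, and the error text are all assumptions for illustration and do not describe the real DataPlaneConfig.Validate.

```go
package main

import (
	"fmt"
	"strings"
)

// sketchConfig is a hypothetical, reduced stand-in for DataPlaneConfig.
type sketchConfig struct {
	NodeID         string
	MessageHandler func(msg string) error // stand-in for ncl.MessageHandler
	StartSeqNum    uint64                 // zero is a valid starting point here
}

// Validate reports every missing required field in a single error.
func (c *sketchConfig) Validate() error {
	var missing []string
	if strings.TrimSpace(c.NodeID) == "" {
		missing = append(missing, "NodeID")
	}
	if c.MessageHandler == nil {
		missing = append(missing, "MessageHandler")
	}
	if len(missing) > 0 {
		return fmt.Errorf("invalid config: missing %s", strings.Join(missing, ", "))
	}
	return nil
}

func main() {
	bad := &sketchConfig{}
	fmt.Println(bad.Validate())

	good := &sketchConfig{
		NodeID:         "node-1",
		MessageHandler: func(string) error { return nil },
	}
	fmt.Println(good.Validate())
}
```

Reporting all missing fields at once, rather than stopping at the first, tends to shorten the fix-and-retry loop when a config is assembled from several sources.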