Documentation ¶
Index ¶
- type Config
- type ConnectionManager
- type ControlPlane
- type ControlPlaneParams
- type DataPlane
- type DataPlaneParams
- type HealthTracker
- func (ht *HealthTracker) GetHealth() nclprotocol.ConnectionHealth
- func (ht *HealthTracker) GetState() nclprotocol.ConnectionState
- func (ht *HealthTracker) HandshakeRequired()
- func (ht *HealthTracker) HeartbeatSuccess()
- func (ht *HealthTracker) IsHandshakeRequired() bool
- func (ht *HealthTracker) MarkConnected()
- func (ht *HealthTracker) MarkConnecting()
- func (ht *HealthTracker) MarkDisconnected(err error)
- func (ht *HealthTracker) UpdateSuccess()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
    NodeID            string
    ClientFactory     nats.ClientFactory
    NodeInfoProvider  models.NodeInfoProvider
    MessageSerializer envelope.MessageSerializer
    MessageRegistry   *envelope.Registry

    // Control plane config
    ReconnectInterval      time.Duration
    HeartbeatInterval      time.Duration
    HeartbeatMissFactor    int
    NodeInfoUpdateInterval time.Duration
    RequestTimeout         time.Duration
    ReconnectBackoff       backoff.Backoff

    // Data plane config
    DataPlaneMessageHandler ncl.MessageHandler         // Handles incoming messages
    DataPlaneMessageCreator nclprotocol.MessageCreator // Creates messages for sending
    EventStore              watcher.EventStore
    DispatcherConfig        dispatcher.Config
    LogStreamServer         logstream.Server

    // Checkpoint config
    Checkpointer       nclprotocol.Checkpointer
    CheckpointInterval time.Duration
    Clock              clock.Clock
}
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a new Config with default values
func (*Config) SetDefaults ¶
func (c *Config) SetDefaults()
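This page does not show what SetDefaults does or which values DefaultConfig returns. A common Go pattern for this pair of functions, sketched below, fills only zero-valued fields so that explicitly set values survive. The sketchConfig type, its helper functions, and every default value here are hypothetical stand-ins for illustration, not the package's real behavior.

```go
package main

import (
	"fmt"
	"time"
)

// sketchConfig is a hypothetical stand-in that borrows a few field names
// from Config. It is not the real type.
type sketchConfig struct {
	HeartbeatInterval   time.Duration
	HeartbeatMissFactor int
	RequestTimeout      time.Duration
}

// defaultSketchConfig returns illustrative values only. The package's real
// defaults are not listed on this page.
func defaultSketchConfig() sketchConfig {
	return sketchConfig{
		HeartbeatInterval:   15 * time.Second,
		HeartbeatMissFactor: 5,
		RequestTimeout:      10 * time.Second,
	}
}

// setDefaults fills every zero-valued field from defaultSketchConfig and
// leaves fields the caller already set untouched.
func (c *sketchConfig) setDefaults() {
	d := defaultSketchConfig()
	if c.HeartbeatInterval == 0 {
		c.HeartbeatInterval = d.HeartbeatInterval
	}
	if c.HeartbeatMissFactor == 0 {
		c.HeartbeatMissFactor = d.HeartbeatMissFactor
	}
	if c.RequestTimeout == 0 {
		c.RequestTimeout = d.RequestTimeout
	}
}

func main() {
	// Only the heartbeat interval is set explicitly; the rest is defaulted.
	cfg := sketchConfig{HeartbeatInterval: time.Second}
	cfg.setDefaults()
	fmt.Println(cfg.HeartbeatInterval, cfg.HeartbeatMissFactor, cfg.RequestTimeout)
}
```

One limitation of this pattern is that a caller cannot deliberately set a field to its zero value, since zero is treated as "unset".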
type ConnectionManager ¶
type ConnectionManager struct {
// contains filtered or unexported fields
}
ConnectionManager handles the lifecycle of a compute node's connection to the orchestrator. It manages the complete connection lifecycle including:
- Initial connection and handshake
- Connection health monitoring
- Automated reconnection with backoff
- Control and data plane management
- Connection state transitions
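The "automated reconnection with backoff" item can be pictured with a small retry loop. This is a minimal sketch that assumes capped exponential backoff; the page does not say which strategy ReconnectBackoff implements. The names backoffDelay, connectWithRetry, waitRecorder, and errUnreachable are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errUnreachable is a hypothetical error used to simulate a failed connect.
var errUnreachable = errors.New("orchestrator unreachable")

// backoffDelay returns the wait after failed attempt number `attempt`
// (0-based): base doubled once per earlier attempt, capped at maxDelay.
func backoffDelay(attempt int, base, maxDelay time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt && d < maxDelay; i++ {
		d *= 2
	}
	if d > maxDelay {
		d = maxDelay
	}
	return d
}

// waitRecorder stands in for time.Sleep: it records requested waits instead
// of blocking, which keeps the example fast and deterministic.
type waitRecorder struct {
	waits []time.Duration
}

func (r *waitRecorder) sleep(d time.Duration) { r.waits = append(r.waits, d) }

// connectWithRetry calls connect until it succeeds or maxAttempts (at least 1)
// is reached, waiting between failures. It returns the number of attempts made.
func connectWithRetry(connect func() error, sleep func(time.Duration),
	maxAttempts int, base, maxDelay time.Duration) (int, error) {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = connect(); err == nil {
			return attempt + 1, nil
		}
		if attempt < maxAttempts-1 {
			sleep(backoffDelay(attempt, base, maxDelay))
		}
	}
	return maxAttempts, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	connect := func() error {
		calls++
		if calls < 4 { // fail three times, then succeed
			return errUnreachable
		}
		return nil
	}
	rec := &waitRecorder{}
	attempts, err := connectWithRetry(connect, rec.sleep, 10, time.Second, 30*time.Second)
	fmt.Println(attempts, err, rec.waits)
}
```

Injecting the sleep function is a design choice for testability; production code would typically pass time.Sleep or select on a context and a timer.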
func NewConnectionManager ¶
func NewConnectionManager(cfg Config) (*ConnectionManager, error)
NewConnectionManager creates a new connection manager with the given configuration. It initializes the manager but does not start any connections - Start() must be called.
func (*ConnectionManager) Close ¶
func (cm *ConnectionManager) Close(ctx context.Context) error
Close gracefully shuts down the connection manager and all its components. It waits for background goroutines to complete or until the context is cancelled.
func (*ConnectionManager) GetHealth ¶
func (cm *ConnectionManager) GetHealth() nclprotocol.ConnectionHealth
GetHealth returns the current health status of the connection including:
- Timestamps of last successful operations
- Current connection state
- Error counts and details
func (*ConnectionManager) OnStateChange ¶
func (cm *ConnectionManager) OnStateChange(handler nclprotocol.ConnectionStateHandler)
OnStateChange registers a new handler to be called when the connection state changes. Handlers are called synchronously when state transitions occur.
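To illustrate what synchronous handler invocation implies, here is a minimal sketch built on stand-in types. The stateNotifier type, the connState values, and the handler signature (a function taking the new state) are assumptions made for this example; the real nclprotocol.ConnectionStateHandler and ConnectionState definitions are not shown on this page.

```go
package main

import (
	"fmt"
	"sync"
)

// connState is a hypothetical stand-in for nclprotocol.ConnectionState.
type connState int

const (
	disconnected connState = iota
	connecting
	connected
)

var stateNames = []string{"disconnected", "connecting", "connected"}

// stateHandler is an assumed handler shape: it receives the new state.
type stateHandler func(state connState)

// stateNotifier keeps the current state and the registered handlers.
type stateNotifier struct {
	mu       sync.Mutex
	state    connState
	handlers []stateHandler
}

// onStateChange registers a handler for future transitions.
func (n *stateNotifier) onStateChange(h stateHandler) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.handlers = append(n.handlers, h)
}

// transition updates the state and, if it changed, calls every handler
// synchronously before returning. Handlers run outside the lock so they may
// register further handlers without deadlocking.
func (n *stateNotifier) transition(next connState) {
	n.mu.Lock()
	if n.state == next {
		n.mu.Unlock()
		return
	}
	n.state = next
	handlers := append([]stateHandler(nil), n.handlers...)
	n.mu.Unlock()

	for _, h := range handlers {
		h(next)
	}
}

func main() {
	n := &stateNotifier{}
	n.onStateChange(func(s connState) {
		fmt.Println("state changed to", stateNames[s])
	})
	n.transition(connecting)
	n.transition(connecting) // no change, so no handler call
	n.transition(connected)
}
```

Because handlers run on the caller's goroutine, a slow handler delays the transition that triggered it; in a design like this one, handlers should return quickly or hand work off to another goroutine.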
type ControlPlane ¶
type ControlPlane struct {
// contains filtered or unexported fields
}
ControlPlane manages the periodic control operations between a compute node and the orchestrator. It is responsible for:
- Sending periodic heartbeats to indicate node health
- Updating node information when changes occur
- Maintaining checkpoints of message processing progress
func NewControlPlane ¶
func NewControlPlane(params ControlPlaneParams) (*ControlPlane, error)
NewControlPlane creates a new ControlPlane instance with the provided parameters. It initializes the control plane but does not start any background operations.
type ControlPlaneParams ¶
type ControlPlaneParams struct {
    Config             Config
    Requester          ncl.Publisher                // For sending control messages
    HealthTracker      *HealthTracker               // For health monitoring
    IncomingSeqTracker *nclprotocol.SequenceTracker // For sequence tracking
    CheckpointName     string                       // For checkpoint identification
}
ControlPlaneParams encapsulates all dependencies needed to create a new ControlPlane
type DataPlane ¶
type DataPlane struct {
    // Core messaging components
    Client     *nats.Conn             // NATS connection for messaging
    Publisher  ncl.OrderedPublisher   // Handles ordered message publishing
    Dispatcher *dispatcher.Dispatcher // Manages event watching and dispatch
    // contains filtered or unexported fields
}
DataPlane manages the data transfer operations between a compute node and the orchestrator. It is responsible for:
- Setting up and managing the log streaming server
- Reliable message publishing through ordered publisher
- Event watching and dispatching
- Maintaining message sequence ordering
func NewDataPlane ¶
func NewDataPlane(params DataPlaneParams) (*DataPlane, error)
NewDataPlane creates a new DataPlane instance with the provided parameters. It initializes the data plane but does not start any operations - Start() must be called.
func (*DataPlane) Start ¶
Start initializes and begins data plane operations. This includes:
1. Setting up the log stream server for job output streaming
2. Creating an ordered publisher for reliable message delivery
3. Setting up event watching and dispatching
4. Starting the dispatcher
Note that the message subscriber and handler are not started here. To avoid message loss, they must be started during the handshake, before the data plane is started.
If any component fails to initialize, cleanup is performed before the error is returned.
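The cleanup-on-failure behavior described for Start can be sketched as an ordered startup with rollback. This is not the package's implementation: the component type and the startAll function are hypothetical, and the real log stream server, publisher, and dispatcher have their own APIs that this page does not show.

```go
package main

import (
	"errors"
	"fmt"
)

// errStartFailed is a hypothetical sentinel error for this sketch.
var errStartFailed = errors.New("component failed to start")

// component is a hypothetical stand-in for a data plane part such as the
// log stream server, the ordered publisher, or the dispatcher.
type component struct {
	name    string
	fail    bool // simulate an initialization failure
	running bool
}

func (c *component) start() error {
	if c.fail {
		return fmt.Errorf("start %s: %w", c.name, errStartFailed)
	}
	c.running = true
	return nil
}

func (c *component) stop() { c.running = false }

// startAll starts components in order. If one fails, the deferred function
// stops those already started, in reverse order, before the error is returned.
func startAll(components []*component) (err error) {
	started := make([]*component, 0, len(components))
	defer func() {
		if err != nil {
			for i := len(started) - 1; i >= 0; i-- {
				started[i].stop()
			}
		}
	}()
	for _, c := range components {
		if err = c.start(); err != nil {
			return err
		}
		started = append(started, c)
	}
	return nil
}

func main() {
	comps := []*component{
		{name: "log stream server"},
		{name: "ordered publisher"},
		{name: "dispatcher", fail: true},
	}
	err := startAll(comps)
	fmt.Println("error:", err)
	for _, c := range comps {
		fmt.Println(c.name, "running:", c.running)
	}
}
```

Stopping in reverse order mirrors the usual assumption that later components depend on earlier ones.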
type DataPlaneParams ¶
type DataPlaneParams struct {
    Config             Config
    Client             *nats.Conn // NATS client connection
    LastReceivedSeqNum uint64     // Initial sequence number for message ordering
}
DataPlaneParams encapsulates the parameters needed to create a new DataPlane
type HealthTracker ¶
type HealthTracker struct {
// contains filtered or unexported fields
}
HealthTracker monitors connection health and maintains status metrics. It is thread-safe and uses an injectable clock so that tests can control time.
func NewHealthTracker ¶
func NewHealthTracker(clock clock.Clock) *HealthTracker
NewHealthTracker creates a new health tracker with the given clock
func (*HealthTracker) GetHealth ¶
func (ht *HealthTracker) GetHealth() nclprotocol.ConnectionHealth
GetHealth returns a copy of current health status
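The three properties stated for HealthTracker (thread safety, an injectable clock, and GetHealth returning a copy) can be illustrated with a small stand-in. Everything below is hypothetical: the clock interface, the fakeClock, the health fields, and the lowercase method names are assumptions for this sketch, not the real HealthTracker or nclprotocol.ConnectionHealth.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// clock is a hypothetical minimal clock interface; the real clock.Clock
// type used by the package is not shown on this page.
type clock interface {
	Now() time.Time
}

// fakeClock lets a test decide what "now" is.
type fakeClock struct{ now time.Time }

func (f *fakeClock) Now() time.Time { return f.now }

// health is a hypothetical stand-in for nclprotocol.ConnectionHealth.
type health struct {
	Connected           bool
	LastHeartbeat       time.Time
	ConsecutiveFailures int
	LastError           error
}

// tracker guards its health record with a mutex so that concurrent
// goroutines can update and read it safely.
type tracker struct {
	mu     sync.RWMutex
	clock  clock
	health health
}

func newTracker(c clock) *tracker { return &tracker{clock: c} }

func (t *tracker) markConnected() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.health.Connected = true
	t.health.ConsecutiveFailures = 0
	t.health.LastError = nil
}

func (t *tracker) markDisconnected(err error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.health.Connected = false
	t.health.ConsecutiveFailures++
	t.health.LastError = err
}

// heartbeatSuccess stamps the heartbeat with the injected clock's time.
func (t *tracker) heartbeatSuccess() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.health.LastHeartbeat = t.clock.Now()
}

// getHealth returns the struct by value, so callers receive a copy and
// cannot mutate the tracker's internal record.
func (t *tracker) getHealth() health {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.health
}

func main() {
	fc := &fakeClock{now: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)}
	tr := newTracker(fc)
	tr.markConnected()
	fc.now = fc.now.Add(30 * time.Second) // advance time without sleeping
	tr.heartbeatSuccess()
	h := tr.getHealth()
	fmt.Println(h.Connected, h.LastHeartbeat.Format(time.RFC3339))
}
```

Returning by value only gives a full copy here because the hypothetical health struct holds no maps or slices; a struct with reference-typed fields would need those copied explicitly.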
func (*HealthTracker) GetState ¶
func (ht *HealthTracker) GetState() nclprotocol.ConnectionState
GetState returns current connection state
func (*HealthTracker) HandshakeRequired ¶
func (ht *HealthTracker) HandshakeRequired()
HandshakeRequired marks that a handshake is required
func (*HealthTracker) HeartbeatSuccess ¶
func (ht *HealthTracker) HeartbeatSuccess()
HeartbeatSuccess records successful heartbeat
func (*HealthTracker) IsHandshakeRequired ¶
func (ht *HealthTracker) IsHandshakeRequired() bool
IsHandshakeRequired returns true if a handshake is required
func (*HealthTracker) MarkConnected ¶
func (ht *HealthTracker) MarkConnected()
MarkConnected updates status when connection is established
func (*HealthTracker) MarkConnecting ¶
func (ht *HealthTracker) MarkConnecting()
MarkConnecting updates status when connection is in progress
func (*HealthTracker) MarkDisconnected ¶
func (ht *HealthTracker) MarkDisconnected(err error)
MarkDisconnected updates status when connection is lost
func (*HealthTracker) UpdateSuccess ¶
func (ht *HealthTracker) UpdateSuccess()
UpdateSuccess records successful node info update