compute

package
v1.6.0
Warning

This package is not in the latest version of its module.

Published: Dec 16, 2024 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	NodeID           string
	ClientFactory    nats.ClientFactory
	NodeInfoProvider models.NodeInfoProvider

	MessageSerializer envelope.MessageSerializer
	MessageRegistry   *envelope.Registry

	// Control plane config
	ReconnectInterval      time.Duration
	HeartbeatInterval      time.Duration
	HeartbeatMissFactor    int
	NodeInfoUpdateInterval time.Duration
	RequestTimeout         time.Duration
	ReconnectBackoff       backoff.Backoff

	// Data plane config
	DataPlaneMessageHandler ncl.MessageHandler         // Handles incoming messages
	DataPlaneMessageCreator nclprotocol.MessageCreator // Creates messages for sending
	EventStore              watcher.EventStore
	DispatcherConfig        dispatcher.Config
	LogStreamServer         logstream.Server

	// Checkpoint config
	Checkpointer       nclprotocol.Checkpointer
	CheckpointInterval time.Duration

	Clock clock.Clock
}

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a new Config with default values

func (*Config) SetDefaults

func (c *Config) SetDefaults()

func (*Config) Validate

func (c *Config) Validate() error

Validate checks if the config is valid
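
The defaults-then-validate flow implied by SetDefaults and Validate can be sketched as below. This is a minimal illustration under stated assumptions: miniConfig is a hypothetical cut-down stand-in for Config, the default values are invented for the example, and the specific checks are guesses at what a validator might enforce, not the package's actual rules.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// miniConfig is a hypothetical, cut-down stand-in for Config.
type miniConfig struct {
	NodeID              string
	HeartbeatInterval   time.Duration
	HeartbeatMissFactor int
	RequestTimeout      time.Duration
}

// setDefaults fills in fields that are still zero-valued.
// The values below are illustrative, not the package's real defaults.
func (c *miniConfig) setDefaults() {
	if c.HeartbeatInterval == 0 {
		c.HeartbeatInterval = 15 * time.Second
	}
	if c.HeartbeatMissFactor == 0 {
		c.HeartbeatMissFactor = 3
	}
	if c.RequestTimeout == 0 {
		c.RequestTimeout = 10 * time.Second
	}
}

// validate collects every problem it finds and returns them joined,
// or nil when the config is usable.
func (c *miniConfig) validate() error {
	var errs []error
	if c.NodeID == "" {
		errs = append(errs, errors.New("NodeID is required"))
	}
	if c.HeartbeatInterval <= 0 {
		errs = append(errs, errors.New("HeartbeatInterval must be positive"))
	}
	if c.HeartbeatMissFactor <= 0 {
		errs = append(errs, errors.New("HeartbeatMissFactor must be positive"))
	}
	if c.RequestTimeout <= 0 {
		errs = append(errs, errors.New("RequestTimeout must be positive"))
	}
	return errors.Join(errs...)
}

func main() {
	cfg := miniConfig{NodeID: "node-1"} // caller sets only what it cares about
	cfg.setDefaults()
	if err := cfg.validate(); err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println("config ok, heartbeat every", cfg.HeartbeatInterval)
}
```

Filling only zero-valued fields lets callers override individual settings while still getting a complete configuration before validation runs.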

type ConnectionManager

type ConnectionManager struct {
	// contains filtered or unexported fields
}

ConnectionManager handles the lifecycle of a compute node's connection to the orchestrator. It manages the complete connection lifecycle including:

  • Initial connection and handshake
  • Connection health monitoring
  • Automated reconnection with backoff
  • Control and data plane management
  • Connection state transitions
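
The reconnection and state-transition behaviour listed above can be sketched roughly as follows. Everything here is hypothetical: the state names, connectWithBackoff, failingDialer, and the doubling backoff are assumptions chosen for illustration, and the real manager delegates its backoff policy to the configured ReconnectBackoff.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// state is a hypothetical stand-in for a connection state.
type state int

const (
	disconnected state = iota
	connecting
	connected
)

// nextDelay doubles the delay up to max: a simple exponential backoff.
func nextDelay(cur, max time.Duration) time.Duration {
	next := cur * 2
	if next > max {
		return max
	}
	return next
}

// connectWithBackoff retries dial until it succeeds or attempts run out.
// It returns the final state and the sequence of states it passed through.
func connectWithBackoff(dial func() error, attempts int, base, max time.Duration,
	sleep func(time.Duration)) (state, []state) {
	trace := []state{disconnected}
	delay := base
	for i := 0; i < attempts; i++ {
		trace = append(trace, connecting)
		if err := dial(); err == nil {
			trace = append(trace, connected)
			return connected, trace
		}
		trace = append(trace, disconnected)
		sleep(delay) // wait before the next attempt
		delay = nextDelay(delay, max)
	}
	return disconnected, trace
}

// failingDialer returns a dial function that fails the first n calls.
func failingDialer(n int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return errors.New("orchestrator unreachable")
		}
		return nil
	}
}

// noSleep skips waiting; handy in tests.
func noSleep(time.Duration) {}

func main() {
	final, trace := connectWithBackoff(failingDialer(2), 5,
		10*time.Millisecond, 40*time.Millisecond, time.Sleep)
	fmt.Println("connected:", final == connected, "transitions:", len(trace)-1)
}
```

Passing the sleep function in as a parameter keeps the retry logic testable without real delays.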

func NewConnectionManager

func NewConnectionManager(cfg Config) (*ConnectionManager, error)

NewConnectionManager creates a new connection manager with the given configuration. It initializes the manager but does not start any connections - Start() must be called.

func (*ConnectionManager) Close

func (cm *ConnectionManager) Close(ctx context.Context) error

Close gracefully shuts down the connection manager and all its components. It waits for background goroutines to complete or until the context is cancelled.
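
One common way to get the "wait for goroutines or until the context is cancelled" behaviour is to pair a sync.WaitGroup with a select on ctx.Done(). The sketch below shows that pattern with a hypothetical worker type; it is not the ConnectionManager's actual implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker is a hypothetical component that owns background goroutines.
type worker struct {
	stop chan struct{}
	wg   sync.WaitGroup
}

func newWorker() *worker { return &worker{stop: make(chan struct{})} }

// start launches one background loop that exits when stop is closed.
func (w *worker) start(tick time.Duration) {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		t := time.NewTicker(tick)
		defer t.Stop()
		for {
			select {
			case <-w.stop:
				return
			case <-t.C:
				// periodic work would go here
			}
		}
	}()
}

// shutdown signals the goroutines to stop, then waits for them to finish
// or for ctx to be cancelled, whichever comes first.
// It must be called at most once, because it closes the stop channel.
func (w *worker) shutdown(ctx context.Context) error {
	close(w.stop)
	done := make(chan struct{})
	go func() {
		w.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// shutdownWithin is a convenience wrapper that bounds shutdown by d.
func shutdownWithin(w *worker, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return w.shutdown(ctx)
}

func main() {
	w := newWorker()
	w.start(10 * time.Millisecond)
	fmt.Println("shutdown error:", shutdownWithin(w, time.Second))
}
```

The caller chooses how long to wait by setting a deadline on the context, so a stuck goroutine cannot block shutdown indefinitely.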

func (*ConnectionManager) GetHealth

func (cm *ConnectionManager) GetHealth() nclprotocol.ConnectionHealth

GetHealth returns the current health status of the connection including:

  • Timestamps of last successful operations
  • Current connection state
  • Error counts and details

func (*ConnectionManager) OnStateChange

func (cm *ConnectionManager) OnStateChange(handler nclprotocol.ConnectionStateHandler)

OnStateChange registers a new handler to be called when the connection state changes. Handlers are called synchronously when state transitions occur.
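
Because handlers run synchronously, a slow handler delays the state transition that triggered it. The sketch below illustrates one way such a registry might work; notifier, connState, and stateHandler are hypothetical names, and the choice to skip notifications when the state is unchanged is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// connState and stateHandler are hypothetical stand-ins for the
// protocol's connection state and handler types.
type connState string

type stateHandler func(connState)

type notifier struct {
	mu       sync.Mutex
	state    connState
	handlers []stateHandler
}

// onStateChange registers a handler for future transitions.
func (n *notifier) onStateChange(h stateHandler) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.handlers = append(n.handlers, h)
}

// transition updates the state and, if it changed, calls each handler
// in registration order on the caller's goroutine.
func (n *notifier) transition(next connState) {
	n.mu.Lock()
	if n.state == next {
		n.mu.Unlock()
		return
	}
	n.state = next
	// Copy the slice so handlers run without holding the lock.
	handlers := append([]stateHandler(nil), n.handlers...)
	n.mu.Unlock()

	for _, h := range handlers {
		h(next)
	}
}

func main() {
	n := &notifier{state: "Disconnected"}
	n.onStateChange(func(s connState) { fmt.Println("state is now", s) })
	n.transition("Connecting")
	n.transition("Connected")
}
```

Releasing the lock before invoking handlers lets a handler call back into the notifier without deadlocking; handlers that do heavy work would typically hand it off to their own goroutine.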

func (*ConnectionManager) Start

func (cm *ConnectionManager) Start(ctx context.Context) error

Start begins the connection management process. It launches background goroutines for:

  • Connection maintenance
  • Heartbeat sending
  • Node info updates

type ControlPlane

type ControlPlane struct {
	// contains filtered or unexported fields
}

ControlPlane manages the periodic control operations between a compute node and the orchestrator. It is responsible for:

  • Sending periodic heartbeats to indicate node health
  • Updating node information when changes occur
  • Maintaining checkpoints of message processing progress

func NewControlPlane

func NewControlPlane(params ControlPlaneParams) (*ControlPlane, error)

NewControlPlane creates a new ControlPlane instance with the provided parameters. It initializes the control plane but does not start any background operations.

func (*ControlPlane) Start

func (cp *ControlPlane) Start(ctx context.Context) error

Start begins the control plane operations. It launches a background goroutine that manages periodic tasks:

  • Heartbeat sending
  • Node info updates
  • Progress checkpointing
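
A single goroutine driving several periodic tasks is usually written as a select over multiple tickers. The sketch below shows that shape under stated assumptions: runControlLoop, periodicTasks, and the intervals are hypothetical, and the real control plane takes its intervals from Config (HeartbeatInterval, NodeInfoUpdateInterval, CheckpointInterval).

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// periodicTasks groups the callbacks a control loop would run.
type periodicTasks struct {
	heartbeat  func()
	nodeInfo   func()
	checkpoint func()
}

// runControlLoop runs each task on its own interval until ctx is done.
// All tasks execute on the calling goroutine, one at a time.
func runControlLoop(ctx context.Context, hbEvery, infoEvery, ckptEvery time.Duration,
	t periodicTasks) {
	hb := time.NewTicker(hbEvery)
	info := time.NewTicker(infoEvery)
	ckpt := time.NewTicker(ckptEvery)
	defer hb.Stop()
	defer info.Stop()
	defer ckpt.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-hb.C:
			t.heartbeat()
		case <-info.C:
			t.nodeInfo()
		case <-ckpt.C:
			t.checkpoint()
		}
	}
}

// demo runs the loop briefly and reports how often each task fired.
func demo() (hb, info, ckpt int) {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	runControlLoop(ctx, 5*time.Millisecond, 20*time.Millisecond, 40*time.Millisecond,
		periodicTasks{
			heartbeat:  func() { hb++ },
			nodeInfo:   func() { info++ },
			checkpoint: func() { ckpt++ },
		})
	return hb, info, ckpt
}

func main() {
	hb, info, ckpt := demo()
	fmt.Println("heartbeats:", hb, "node info updates:", info, "checkpoints:", ckpt)
}
```

Running every task on one goroutine avoids locking between tasks, at the cost that a slow task delays the others.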

func (*ControlPlane) Stop

func (cp *ControlPlane) Stop(ctx context.Context) error

Stop gracefully shuts down the control plane and waits for background operations to complete or until the context is cancelled.

type ControlPlaneParams

type ControlPlaneParams struct {
	Config             Config
	Requester          ncl.Publisher                // For sending control messages
	HealthTracker      *HealthTracker               // For health monitoring
	IncomingSeqTracker *nclprotocol.SequenceTracker // For sequence tracking
	CheckpointName     string                       // For checkpoint identification
}

ControlPlaneParams encapsulates all dependencies needed to create a new ControlPlane

type DataPlane

type DataPlane struct {

	// Core messaging components
	Client     *nats.Conn             // NATS connection for messaging
	Publisher  ncl.OrderedPublisher   // Handles ordered message publishing
	Dispatcher *dispatcher.Dispatcher // Manages event watching and dispatch
	// contains filtered or unexported fields
}

DataPlane manages the data transfer operations between a compute node and the orchestrator. It is responsible for:

  • Setting up and managing the log streaming server
  • Reliable message publishing through ordered publisher
  • Event watching and dispatching
  • Maintaining message sequence ordering

func NewDataPlane

func NewDataPlane(params DataPlaneParams) (*DataPlane, error)

NewDataPlane creates a new DataPlane instance with the provided parameters. It initializes the data plane but does not start any operations - Start() must be called.

func (*DataPlane) IsRunning

func (dp *DataPlane) IsRunning() bool

IsRunning returns true if the data plane is currently running.

func (*DataPlane) Start

func (dp *DataPlane) Start(ctx context.Context) error

Start initializes and begins data plane operations. This includes:

 1. Setting up the log stream server for job output streaming
 2. Creating an ordered publisher for reliable message delivery
 3. Setting up event watching and dispatching
 4. Starting the dispatcher

Note that the message subscriber and handler are not started here. They must be started during the handshake, before the data plane is started, to avoid message loss.

If any component fails to initialize, cleanup is performed before the error is returned.
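
The start-in-order, clean-up-on-failure behaviour described above can be sketched with a deferred rollback. The component type, startAll, and demoStart are hypothetical, and tearing down in reverse start order is an assumption made for this illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// component is a hypothetical unit with paired start and stop steps.
type component struct {
	name  string
	start func() error
	stop  func()
}

// startAll starts components in order. If one fails, the ones already
// started are stopped in reverse order before the error is returned.
func startAll(components []component) (err error) {
	var started []component
	defer func() {
		if err != nil {
			for i := len(started) - 1; i >= 0; i-- {
				started[i].stop()
			}
		}
	}()
	for _, c := range components {
		if err = c.start(); err != nil {
			return fmt.Errorf("starting %s: %w", c.name, err)
		}
		started = append(started, c)
	}
	return nil
}

// demoStart wires four hypothetical components and makes the one named
// failAt fail, returning the recorded start ("+") and stop ("-") events.
func demoStart(failAt string) ([]string, error) {
	var events []string
	names := []string{"logstream", "publisher", "watcher", "dispatcher"}
	var components []component
	for _, name := range names {
		name := name // per-iteration copy for older Go versions
		components = append(components, component{
			name: name,
			start: func() error {
				if name == failAt {
					return errors.New("simulated failure")
				}
				events = append(events, "+"+name)
				return nil
			},
			stop: func() { events = append(events, "-"+name) },
		})
	}
	err := startAll(components)
	return events, err
}

func main() {
	events, err := demoStart("dispatcher")
	fmt.Println(events, err)
}
```

Using a named return value lets the deferred function see whether startup failed, so the rollback logic lives in one place regardless of which step returned the error.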

func (*DataPlane) Stop

func (dp *DataPlane) Stop(ctx context.Context) error

Stop gracefully shuts down all data plane operations. It ensures proper cleanup of resources by:

 1. Stopping the dispatcher
 2. Closing the publisher

Any errors during cleanup are collected and returned.
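
Collecting cleanup errors rather than stopping at the first one can be sketched with errors.Join from the standard library (Go 1.20+). This is an illustration only: stopAll and the sentinel errors are hypothetical, and whether the package itself uses errors.Join is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors for the two cleanup steps.
var (
	errDispatcher = errors.New("dispatcher stop failed")
	errPublisher  = errors.New("publisher close failed")
)

// stopAll runs every stopper in order, even after one fails, and returns
// all failures joined into one error (nil if none failed).
func stopAll(stoppers ...func() error) error {
	var errs []error
	for _, stop := range stoppers {
		if err := stop(); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}

// failedWith reports whether err includes target.
func failedWith(err, target error) bool {
	return errors.Is(err, target)
}

func main() {
	err := stopAll(
		func() error { return errDispatcher }, // 1. stop the dispatcher
		func() error { return errPublisher },  // 2. close the publisher
	)
	fmt.Println(failedWith(err, errDispatcher), failedWith(err, errPublisher))
}
```

Continuing after a failure means a broken dispatcher does not prevent the publisher from being closed, and callers can still inspect each cause with errors.Is.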

type DataPlaneParams

type DataPlaneParams struct {
	Config             Config
	Client             *nats.Conn // NATS client connection
	LastReceivedSeqNum uint64     // Initial sequence number for message ordering
}

DataPlaneParams encapsulates the parameters needed to create a new DataPlane

type HealthTracker

type HealthTracker struct {
	// contains filtered or unexported fields
}

HealthTracker monitors connection health and maintains status metrics. It is thread-safe and uses an injectable clock for testing.

func NewHealthTracker

func NewHealthTracker(clock clock.Clock) *HealthTracker

NewHealthTracker creates a new health tracker with the given clock

func (*HealthTracker) GetHealth

func (ht *HealthTracker) GetHealth() nclprotocol.ConnectionHealth

GetHealth returns a copy of current health status
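
A thread-safe tracker with an injectable clock that hands out copies of its state might look like the sketch below. All names are hypothetical: the health struct fields, the clock interface, and fakeClock are assumptions for illustration and do not mirror nclprotocol.ConnectionHealth or the clock.Clock type used by the package.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// clock is the minimal time source the tracker needs.
type clock interface{ Now() time.Time }

// fakeClock is a manually advanced clock for tests (not thread-safe).
type fakeClock struct{ now time.Time }

func (f *fakeClock) Now() time.Time          { return f.now }
func (f *fakeClock) advance(d time.Duration) { f.now = f.now.Add(d) }

// health is a hypothetical snapshot of connection health.
type health struct {
	State               string
	LastHeartbeat       time.Time
	ConsecutiveFailures int
}

type tracker struct {
	mu     sync.RWMutex
	clock  clock
	health health
}

func newTracker(c clock) *tracker {
	return &tracker{clock: c, health: health{State: "Disconnected"}}
}

func (t *tracker) markConnected() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.health.State = "Connected"
	t.health.ConsecutiveFailures = 0
}

func (t *tracker) markDisconnected() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.health.State = "Disconnected"
	t.health.ConsecutiveFailures++
}

// heartbeatSuccess stamps the heartbeat time using the injected clock.
func (t *tracker) heartbeatSuccess() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.health.LastHeartbeat = t.clock.Now()
}

// getHealth returns the struct by value, so callers get a copy and
// cannot mutate the tracker's internal state.
func (t *tracker) getHealth() health {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.health
}

func main() {
	fc := &fakeClock{now: time.Date(2024, 12, 16, 0, 0, 0, 0, time.UTC)}
	tr := newTracker(fc)
	tr.markConnected()
	fc.advance(5 * time.Second)
	tr.heartbeatSuccess()
	h := tr.getHealth()
	fmt.Println(h.State, h.LastHeartbeat.Format(time.RFC3339))
}
```

Injecting the clock lets tests control timestamps exactly, and returning a value copy means readers never share memory with writers once the lock is released.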

func (*HealthTracker) GetState

func (ht *HealthTracker) GetState() nclprotocol.ConnectionState

GetState returns current connection state

func (*HealthTracker) HandshakeRequired

func (ht *HealthTracker) HandshakeRequired()

HandshakeRequired marks that a handshake is required

func (*HealthTracker) HeartbeatSuccess

func (ht *HealthTracker) HeartbeatSuccess()

HeartbeatSuccess records successful heartbeat

func (*HealthTracker) IsHandshakeRequired

func (ht *HealthTracker) IsHandshakeRequired() bool

IsHandshakeRequired returns true if a handshake is required

func (*HealthTracker) MarkConnected

func (ht *HealthTracker) MarkConnected()

MarkConnected updates status when connection is established

func (*HealthTracker) MarkConnecting

func (ht *HealthTracker) MarkConnecting()

MarkConnecting updates status when connection is in progress

func (*HealthTracker) MarkDisconnected

func (ht *HealthTracker) MarkDisconnected(err error)

MarkDisconnected updates status when connection is lost

func (*HealthTracker) UpdateSuccess

func (ht *HealthTracker) UpdateSuccess()

UpdateSuccess records successful node info update
