orchestrator

package
v1.6.0-rc1
Warning

This package is not in the latest version of its module.

Published: Dec 11, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ComputeManager

type ComputeManager struct {
	// contains filtered or unexported fields
}

ComputeManager handles the lifecycle and state management of all compute nodes connected to this orchestrator. It is responsible for:
- Processing compute node handshakes and connections
- Managing individual node data planes
- Coordinating message flow between orchestrator and compute nodes
- Tracking node health and connection state

func NewComputeManager

func NewComputeManager(cfg Config) (*ComputeManager, error)

NewComputeManager creates a new compute manager with the given configuration. The manager must be started with Start() before it begins processing connections.

func (*ComputeManager) Start

func (cm *ComputeManager) Start(ctx context.Context) error

Start initializes the manager and begins processing compute node connections. This includes:
1. Creating NATS connection
2. Setting up control plane responder
3. Registering message handlers
4. Setting up node state change handling

func (*ComputeManager) Stop

func (cm *ComputeManager) Stop(ctx context.Context) error

Stop gracefully shuts down the manager and all compute node connections. It ensures proper cleanup by:
1. Stopping the control plane responder
2. Stopping all data planes
3. Waiting for background goroutines to complete

type Config

type Config struct {
	// NodeID uniquely identifies this orchestrator instance
	NodeID string

	// ClientFactory creates NATS clients for transport connections
	ClientFactory natsutil.ClientFactory

	// NodeManager handles compute node lifecycle and state management
	NodeManager nodes.Manager

	// Message serialization and type registration
	MessageRegistry   *envelope.Registry         // Registry of message types for serialization
	MessageSerializer envelope.MessageSerializer // Handles message envelope serialization

	// Control plane timeouts and intervals
	HeartbeatTimeout      time.Duration // Maximum time to wait for node heartbeat before considering it disconnected
	NodeCleanupInterval   time.Duration // How often to check for and cleanup disconnected nodes
	RequestHandlerTimeout time.Duration // Timeout for handling individual control plane requests

	// Data plane configuration
	DataPlaneMessageHandler        ncl.MessageHandler                // Handles incoming messages from compute nodes
	DataPlaneMessageCreatorFactory nclprotocol.MessageCreatorFactory // Creates message creators for outgoing messages
	EventStore                     watcher.EventStore                // Store for watching and dispatching events
	DispatcherConfig               dispatcher.Config                 // Configuration for the event dispatcher
}

Config defines the configuration for the orchestrator's transport layer. It contains settings for both the control plane (node management, heartbeats) and data plane (message handling, event dispatching) components.

func DefaultConfig

func DefaultConfig() Config

func (*Config) Validate

func (c *Config) Validate() error

Validate checks if the configuration is valid by verifying:
- Required fields are set
- Timeouts and intervals are positive
- Component configurations are valid

type DataPlane

type DataPlane struct {
	// contains filtered or unexported fields
}

DataPlane manages the message flow between orchestrator and a single compute node. It handles:
- Reliable message delivery through ordered publisher
- Sequence tracking for both incoming and outgoing messages
- Event watching and dispatching

Each DataPlane instance corresponds to one compute node connection.

func NewDataPlane

func NewDataPlane(config DataPlaneConfig) (*DataPlane, error)

NewDataPlane creates a new DataPlane instance for a compute node.

func (*DataPlane) GetLastProcessedSequence

func (dp *DataPlane) GetLastProcessedSequence() uint64

GetLastProcessedSequence returns the sequence number of the last incoming message processed from this compute node.

func (*DataPlane) Start

func (dp *DataPlane) Start(ctx context.Context) error

Start initializes and begins data plane operations. This includes:
1. Creating subscriber for compute node messages
2. Creating ordered publisher for reliable delivery
3. Setting up event watching and dispatching
4. Starting all components in correct order

func (*DataPlane) Stop

func (dp *DataPlane) Stop(ctx context.Context) error

Stop gracefully shuts down all data plane operations. It ensures proper cleanup of resources by stopping components in the correct order: dispatcher -> subscriber -> publisher.

type DataPlaneConfig

type DataPlaneConfig struct {
	NodeID string     // ID of the compute node this data plane serves
	Client *nats.Conn // NATS connection

	// Message handling
	MessageHandler        ncl.MessageHandler
	MessageCreatorFactory nclprotocol.MessageCreatorFactory
	MessageRegistry       *envelope.Registry
	MessageSerializer     envelope.MessageSerializer

	// Event tracking
	EventStore  watcher.EventStore
	StartSeqNum uint64 // Initial sequence for event watching

	// Dispatcher settings
	DispatcherConfig dispatcher.Config
}

DataPlaneConfig defines the configuration for a DataPlane instance. Each config corresponds to a single compute node connection.

func (*DataPlaneConfig) Validate

func (c *DataPlaneConfig) Validate() error
