nclprotocol

package nclprotocol (version v1.6.0-rc3; not the latest version of its module)
Published: Dec 11, 2024 · License: Apache-2.0 · Imports: 12 · Imported by: 0

README

NCL Protocol Documentation

The NCL (NATS Client Library) Protocol manages reliable bidirectional communication between compute nodes and orchestrators in the Bacalhau network. It provides ordered async message delivery, connection health monitoring, and automatic recovery from failures.

Table of Contents

  1. Definitions & Key Concepts
  2. Architecture Overview
  3. Message Sequencing
  4. Connection Lifecycle
  5. Message Contracts
  6. Communication Flows
  7. Component Dependencies
  8. Configuration
  9. Glossary

Definitions & Key Concepts

Events and Messages
  • Event: An immutable record of a state change in the local system
  • Message: A communication packet sent between nodes derived from events
  • Sequence Number: A monotonically increasing identifier for ordering events and messages
Node Information
  • Node ID: Unique identifier for each compute node
  • Resources: Computational resources such as CPU, memory, and GPU
  • Available Capacity: Currently free resources on a node
  • Queue Used Capacity: Resources allocated to queued jobs
Connection States
  • Disconnected: No active connection, no message processing
  • Connecting: Attempting to establish connection
  • Connected: Active message processing and health monitoring

Transitions between states occur based on:

  • Successful/failed handshakes
  • Missing heartbeats
  • Network failures
  • Explicit disconnection
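The transition rules above can be modeled as a small state machine. This is a minimal sketch: only the three state names come from the package's `ConnectionState` constants; the `transitionEvent` type, its values, and `nextState` are hypothetical, and the package's real transition logic may differ.

```go
package main

import "fmt"

// ConnectionState mirrors the three states described above.
type ConnectionState int

const (
	Disconnected ConnectionState = iota
	Connecting
	Connected
)

// String gives this sketch's text form; the package's own output may differ.
func (c ConnectionState) String() string {
	switch c {
	case Disconnected:
		return "Disconnected"
	case Connecting:
		return "Connecting"
	case Connected:
		return "Connected"
	default:
		return fmt.Sprintf("ConnectionState(%d)", int(c))
	}
}

// transitionEvent is a hypothetical enumeration of the triggers listed above.
type transitionEvent int

const (
	startConnect       transitionEvent = iota // begin a handshake attempt
	handshakeOK                               // handshake accepted
	handshakeFailed                           // handshake rejected or timed out
	heartbeatsMissed                          // too many missed heartbeats
	networkFailure                            // NATS connection or publish failure
	explicitDisconnect                        // caller closed the connection
)

// nextState returns the state after applying ev to cur. Events that do not
// apply in the current state leave it unchanged.
func nextState(cur ConnectionState, ev transitionEvent) ConnectionState {
	switch ev {
	case explicitDisconnect, networkFailure, heartbeatsMissed:
		return Disconnected
	case startConnect:
		if cur == Disconnected {
			return Connecting
		}
	case handshakeOK:
		if cur == Connecting {
			return Connected
		}
	case handshakeFailed:
		if cur == Connecting {
			return Disconnected
		}
	}
	return cur
}

func main() {
	s := Disconnected
	for _, ev := range []transitionEvent{startConnect, handshakeOK, heartbeatsMissed} {
		s = nextState(s, ev)
		fmt.Println(s)
	}
}
```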

Architecture Overview

The protocol consists of two main planes:

Control Plane
  • Handles connection establishment and health monitoring
  • Manages periodic heartbeats and node info updates
  • Maintains connection state and health metrics
  • Handles checkpointing for recovery
Data Plane
  • Provides reliable, ordered message delivery
  • Manages event watching and dispatching
  • Tracks message sequences for both sides
  • Handles recovery from network failures
NATS Subject Structure
bacalhau.global.compute.<nodeID>.in.msgs  - Messages to compute node
bacalhau.global.compute.<nodeID>.out.msgs - Messages from compute node
bacalhau.global.compute.<nodeID>.out.ctrl - Control messages from compute
bacalhau.global.compute.*.out.ctrl       - Global control channel (wildcard matching every node's out.ctrl subject)
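The subject templates above can be expanded with simple string formatting. A minimal sketch: the helper names below are hypothetical, and the package's own `NatsSubject*` functions may build their strings differently.

```go
package main

import "fmt"

// subjectPrefix is the common prefix of the subjects listed above.
const subjectPrefix = "bacalhau.global.compute"

// Hypothetical helpers expanding the subject templates above.
func computeInMsgs(nodeID string) string {
	return fmt.Sprintf("%s.%s.in.msgs", subjectPrefix, nodeID)
}

func computeOutMsgs(nodeID string) string {
	return fmt.Sprintf("%s.%s.out.msgs", subjectPrefix, nodeID)
}

func computeOutCtrl(nodeID string) string {
	return fmt.Sprintf("%s.%s.out.ctrl", subjectPrefix, nodeID)
}

// globalCtrl uses the NATS single-token wildcard '*' so one subscription
// matches the control subject of every compute node.
func globalCtrl() string { return computeOutCtrl("*") }

func main() {
	fmt.Println(computeInMsgs("node-1"))
	fmt.Println(globalCtrl())
}
```

Because `*` matches exactly one subject token, the wildcard subscription only lines up with per-node subjects when node IDs contain no `.` characters.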

Message Sequencing

Overview

The NCL protocol integrates with a local event watcher system to decouple event processing from message delivery. Each node maintains its own ordered ledger of events that the protocol watches and selectively publishes. This decoupling provides several benefits:

  • Clean separation between business logic and message transport
  • Reliable local event ordering
  • Simple checkpointing and recovery
  • Built-in replay capabilities
Event Flow Architecture
Local Event Store          NCL Protocol              Remote Node
┌──────────────┐    ┌─────────────────────┐    ┌──────────────┐
│              │    │  1. Watch Events     │    │              │
│  Ordered     │◄───┤  2. Filter Relevant  │    │              │
│  Event       │    │  3. Create Messages  │───►│   Receive    │
│  Ledger      │    │  4. Track Sequences  │    │   Process    │
│              │    │  5. Checkpoint       │    │              │
└──────────────┘    └─────────────────────┘    └──────────────┘
Key Components
  1. Event Store

    • Maintains ordered sequence of all local events
    • Each event has unique monotonic sequence number
    • Supports seeking and replay from any position
  2. Event Watcher

    • Watches event store for new entries
    • Filters events relevant for transport
    • Supports resuming from checkpoint
  3. Message Dispatcher

    • Creates messages from events
    • Manages reliable delivery
    • Tracks publish acknowledgments
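The watch, filter, create, track, and checkpoint steps can be sketched as a single loop over an in-memory ledger. This is an illustration under stated assumptions: `event`, `message`, and `dispatch` are hypothetical simplifications, and the sketch checkpoints every N events for determinism, whereas the protocol checkpoints on a time interval (`CheckpointInterval`).

```go
package main

import "fmt"

// event is a simplified stand-in for a watcher event.
type event struct {
	SeqNum uint64
	Type   string
}

// message is a simplified stand-in for an outgoing message envelope.
type message struct {
	SeqNum  uint64
	Payload string
}

// dispatch walks the ordered ledger starting after fromSeq, converts relevant
// events into messages, publishes them, and records a checkpoint every
// checkpointEvery events (must be positive). It returns the last processed
// sequence number and the last checkpoint.
func dispatch(ledger []event, fromSeq uint64, checkpointEvery int,
	create func(event) *message, publish func(message) error) (uint64, uint64, error) {
	lastSeq, checkpoint := fromSeq, fromSeq
	processed := 0
	for _, ev := range ledger {
		if ev.SeqNum <= fromSeq {
			continue // already handled before the checkpoint
		}
		// A nil message means the event is not relevant for transport.
		if msg := create(ev); msg != nil {
			if err := publish(*msg); err != nil {
				return lastSeq, checkpoint, err // caller resumes from checkpoint
			}
		}
		lastSeq = ev.SeqNum // filtered events still advance the ledger position
		processed++
		if processed%checkpointEvery == 0 {
			checkpoint = lastSeq
		}
	}
	return lastSeq, checkpoint, nil
}

func main() {
	ledger := []event{{1, "execution"}, {2, "internal"}, {3, "execution"}}
	create := func(ev event) *message {
		if ev.Type != "execution" {
			return nil
		}
		return &message{SeqNum: ev.SeqNum, Payload: ev.Type}
	}
	var sent []message
	publish := func(m message) error { sent = append(sent, m); return nil }
	last, cp, err := dispatch(ledger, 0, 2, create, publish)
	fmt.Println(len(sent), last, cp, err)
}
```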

Connection Lifecycle

Initial Connection
  1. Handshake

    • Compute node initiates connection by sending HandshakeRequest
    • Includes node info, start time, and last processed sequence number
    • Orchestrator validates request and accepts/rejects connection
    • On acceptance, orchestrator creates dedicated data plane for node
  2. Data Plane Setup

    • Both sides establish message subscriptions
    • Create ordered publishers for reliable delivery
    • Initialize event watchers and dispatchers
    • Set up sequence tracking
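On the orchestrator side, the handshake steps above amount to: validate the request, (re)create the node's data plane, and report where each side should resume. A minimal sketch, assuming a hypothetical allow-list as the validation rule and reducing `NodeInfo` to a node ID; the real validation and data plane setup are not shown here.

```go
package main

import (
	"fmt"
	"time"
)

// Simplified versions of the handshake contracts; NodeInfo is reduced to an ID.
type HandshakeRequest struct {
	NodeID                 string
	StartTime              time.Time
	LastOrchestratorSeqNum uint64
}

type HandshakeResponse struct {
	Accepted          bool
	Reason            string
	LastComputeSeqNum uint64
}

// orchestrator is a hypothetical stand-in holding per-node sequence state.
type orchestrator struct {
	allowed      map[string]bool   // hypothetical validation rule
	lastComputes map[string]uint64 // last compute-side seq processed per node
	resumeFrom   map[string]uint64 // orchestrator seq each node's data plane resumes after
}

func (o *orchestrator) handleHandshake(req HandshakeRequest) HandshakeResponse {
	if !o.allowed[req.NodeID] {
		return HandshakeResponse{Accepted: false, Reason: "node not allowed"}
	}
	// Stand-in for creating the node's data plane: its dispatcher resumes
	// after the last orchestrator message the compute node processed.
	o.resumeFrom[req.NodeID] = req.LastOrchestratorSeqNum
	// Tell the compute node where the orchestrator left off.
	return HandshakeResponse{Accepted: true, LastComputeSeqNum: o.lastComputes[req.NodeID]}
}

func main() {
	o := &orchestrator{
		allowed:      map[string]bool{"node-1": true},
		lastComputes: map[string]uint64{"node-1": 42},
		resumeFrom:   map[string]uint64{},
	}
	resp := o.handleHandshake(HandshakeRequest{NodeID: "node-1", StartTime: time.Now(), LastOrchestratorSeqNum: 7})
	fmt.Printf("%+v resume=%d\n", resp, o.resumeFrom["node-1"])
}
```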
Ongoing Communication
  1. Health Monitoring

    • Compute nodes send periodic heartbeats
    • Include current capacity and last processed sequence
    • Orchestrator tracks node health and connection state
    • Missing heartbeats trigger disconnection
  2. Node Info Updates

    • Compute nodes send updates when configuration changes
    • Includes updated capacity, features, labels
    • Orchestrator maintains current node state
  3. Message Flow

    • Data flows through separate control/data subjects
    • Messages include sequence numbers for ordering
    • Both sides track processed sequences
    • Failed deliveries trigger automatic recovery
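The health rule above can be reduced to a single comparison using the defaults from the Configuration section (`HeartbeatInterval` 15s, `HeartbeatMissFactor` 5). A minimal sketch, assuming a node is treated as disconnected once more than interval × miss factor (75s by default) has elapsed since its last heartbeat; `nodeHealthy` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// Defaults from the Configuration section.
const (
	heartbeatInterval   = 15 * time.Second
	heartbeatMissFactor = 5
)

// nodeHealthy reports whether sinceLast, the time elapsed since a node's last
// heartbeat, is still within the tolerated window of interval*missFactor.
func nodeHealthy(sinceLast, interval time.Duration, missFactor int) bool {
	return sinceLast <= interval*time.Duration(missFactor)
}

func main() {
	fmt.Println(nodeHealthy(60*time.Second, heartbeatInterval, heartbeatMissFactor)) // true
	fmt.Println(nodeHealthy(90*time.Second, heartbeatInterval, heartbeatMissFactor)) // false
}
```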

Message Contracts

Handshake Messages
// Request sent by compute node to initiate connection
HandshakeRequest {
    NodeInfo: models.NodeInfo
    StartTime: Time
    LastOrchestratorSeqNum: uint64
}

// Response from orchestrator
HandshakeResponse {
    Accepted: boolean
    Reason: string          // Only set if not accepted
    LastComputeSeqNum: uint64
}
Heartbeat Messages
// Periodic heartbeat from compute node
HeartbeatRequest {
    NodeID: string
    AvailableCapacity: Resources
    QueueUsedCapacity: Resources
    LastOrchestratorSeqNum: uint64
}

// Acknowledgment from orchestrator
HeartbeatResponse {
    LastComputeSeqNum: uint64
}
Node Info Update Messages
// Node info update notification
UpdateNodeInfoRequest {
    NodeInfo: models.NodeInfo  // Same structure as in HandshakeRequest
}

UpdateNodeInfoResponse {
    Accepted: boolean
    Reason: string     // Only set if not accepted
}
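The contracts above map naturally onto Go structs. This sketch is illustrative only: `Resources` is a hypothetical simplification, and JSON is used purely to demonstrate a round trip; this section does not specify the wire encoding. The `omitempty` tag reflects the note that `Reason` is only set when a request is rejected.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Resources is a hypothetical simplification of the resource type.
type Resources struct {
	CPU    float64
	Memory uint64
	GPU    uint64
}

type HeartbeatRequest struct {
	NodeID                 string
	AvailableCapacity      Resources
	QueueUsedCapacity      Resources
	LastOrchestratorSeqNum uint64
}

type HeartbeatResponse struct {
	LastComputeSeqNum uint64
}

type UpdateNodeInfoResponse struct {
	Accepted bool
	Reason   string `json:",omitempty"` // only set when Accepted is false
}

// roundTrip encodes and decodes a heartbeat to show the fields survive.
func roundTrip(req HeartbeatRequest) (HeartbeatRequest, error) {
	var out HeartbeatRequest
	data, err := json.Marshal(req)
	if err != nil {
		return out, err
	}
	err = json.Unmarshal(data, &out)
	return out, err
}

func encodeUpdateResponse(r UpdateNodeInfoResponse) (string, error) {
	data, err := json.Marshal(r)
	return string(data), err
}

func main() {
	hb := HeartbeatRequest{NodeID: "node-1", AvailableCapacity: Resources{CPU: 2, Memory: 4 << 30}, LastOrchestratorSeqNum: 17}
	decoded, err := roundTrip(hb)
	fmt.Println(decoded.NodeID, decoded.LastOrchestratorSeqNum, err)
	accepted, _ := encodeUpdateResponse(UpdateNodeInfoResponse{Accepted: true})
	fmt.Println(accepted) // {"Accepted":true}
}
```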

Communication Flows

Initial Connection and Handshake

The following sequence shows the initial connection establishment between a compute node and the orchestrator:

sequenceDiagram
    participant C as Compute Node
    participant O as Orchestrator

    Note over C,O: Connection Establishment
    C->>O: HandshakeRequest(NodeInfo, StartTime, LastSeqNum)
    
    Note over O: Validate Node
    alt Valid Node
        O->>O: Create Data Plane
        O->>O: Setup Message Handlers
        O-->>C: HandshakeResponse(Accepted=true, LastSeqNum)
        
        Note over C: Setup Data Plane
        C->>C: Start Control Plane
        C->>C: Initialize Data Plane
        
        Note over C,O: Begin Regular Communication
        C->>O: Initial Heartbeat
        O-->>C: HeartbeatResponse
    else Invalid Node
        O-->>C: HandshakeResponse(Accepted=false, Reason)
        Note over C: Retry with backoff
    end
Regular Operation Flow

The following sequence shows the ongoing communication pattern between a compute node and the orchestrator, including periodic health checks and configuration updates:

sequenceDiagram
    participant C as Compute Node
    participant O as Orchestrator

    rect rgb(200, 230, 200)
        Note over C,O: Periodic Health Monitoring
        loop Every HeartbeatInterval
            C->>O: HeartbeatRequest(NodeID, Capacity, LastSeqNum)
            O-->>C: HeartbeatResponse(LastSeqNum)
        end
    end

    rect rgb(230, 200, 200)
        Note over C,O: Node Info Updates
        C->>C: Detect Config Change
        C->>O: UpdateNodeInfoRequest(NewNodeInfo)
        O-->>C: UpdateNodeInfoResponse(Accepted)
    end

    rect rgb(200, 200, 230)
        Note over C,O: Data Plane Messages
        O->>C: Execution Messages (with SeqNum)
        C->>O: Result Messages (with SeqNum)
        Note over C,O: Both track sequence numbers
    end

During regular operation:

  • Heartbeats occur every HeartbeatInterval (default 15s)
  • Configuration changes trigger immediate updates
  • Data plane messages flow continuously in both directions
  • Both sides maintain sequence tracking and acknowledgments
Failure Recovery Flow

The protocol recovers from failures by detecting them on both sides, reconnecting with exponential backoff, and resuming from the last checkpoint:

sequenceDiagram
    participant C as Compute Node
    participant O as Orchestrator

    rect rgb(240, 200, 200)
        Note over C,O: Network Failure
        C->>O: HeartbeatRequest
        C-xO: Connection Lost
        
        Note over C: Detect Missing Response
        C->>C: Mark Disconnected
        C->>C: Stop Data Plane
        
        Note over O: Detect Missing Heartbeats
        O->>O: Mark Node Disconnected
        O->>O: Cleanup Node Resources
    end

    rect rgb(200, 240, 200)
        Note over C,O: Recovery
        loop Until Connected
            Note over C: Exponential Backoff
            C->>O: HandshakeRequest(LastSeqNum)
            O-->>C: HandshakeResponse(Accepted)
        end

        Note over C,O: Resume from Last Checkpoint
        Note over C: Restart Data Plane
        Note over O: Recreate Node Resources
    end
Failure Detection
  • Missing heartbeats beyond threshold
  • NATS connection failures
  • Message publish failures
Recovery Process
  1. Both sides independently detect failure
  2. Clean up existing resources
  3. Compute node initiates reconnection
  4. Resume from last checkpoint:
    • Load last checkpoint sequence
    • Resume event watching
    • Rebuild publish state
    • Resend pending messages
  5. Continue normal operation

This process ensures:

  • No events are lost
  • Messages remain ordered
  • Efficient recovery: only events after the last checkpoint are replayed
  • At-least-once delivery
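Because pending messages are resent from the last checkpoint, a receiver under at-least-once delivery can see the same sequence number twice. The sketch below shows one way a receiver could absorb those duplicates by skipping sequence numbers it has already processed. This deduplication step is an illustrative assumption, not a documented part of the protocol; `receiver` and `resendFrom` are hypothetical.

```go
package main

import "fmt"

// receiver applies messages in order and ignores any sequence number it has
// already processed (an assumed dedup layer over at-least-once delivery).
type receiver struct {
	lastSeq uint64
	applied []uint64
}

// handle returns true if seq was applied, false if it was a duplicate.
func (r *receiver) handle(seq uint64) bool {
	if seq <= r.lastSeq {
		return false // duplicate from a resend after reconnect
	}
	r.applied = append(r.applied, seq)
	r.lastSeq = seq
	return true
}

// resendFrom returns the sequence numbers a sender would replay after
// reconnecting: everything after its last checkpoint.
func resendFrom(sent []uint64, checkpoint uint64) []uint64 {
	var out []uint64
	for _, s := range sent {
		if s > checkpoint {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	sent := []uint64{1, 2, 3, 4}
	r := &receiver{}
	for _, s := range sent {
		r.handle(s)
	}
	// The connection drops; the sender's last checkpoint was 2, so 3 and 4 are resent.
	for _, s := range resendFrom(sent, 2) {
		r.handle(s)
	}
	fmt.Println(r.applied) // [1 2 3 4]
}
```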

Component Dependencies

Compute Node Components:
ConnectionManager
├── ControlPlane
│   ├── NodeInfoProvider
│   │   └── Monitors node state changes
│   ├── MessageHandler
│   │   └── Processes control messages
│   └── Checkpointer
│       └── Saves progress state
└── DataPlane
    ├── LogStreamServer
    │   └── Handles job output streaming
    ├── MessageHandler
    │   └── Processes execution messages
    ├── MessageCreator
    │   └── Formats outgoing messages
    └── EventStore
        └── Tracks execution events
Orchestrator Components:
ComputeManager
├── NodeManager
│   ├── Tracks node states
│   └── Manages node lifecycle
├── MessageHandler
│   └── Processes node messages
├── MessageCreatorFactory
│   └── Creates per-node message handlers
└── DataPlane (per node)
    ├── Subscriber
    │   └── Handles incoming messages
    ├── Publisher
    │   └── Sends ordered messages
    └── Dispatcher
        └── Watches and sends events

Configuration

Connection Management
  • HeartbeatInterval: How often compute nodes send heartbeats (default: 15s)
  • HeartbeatMissFactor: Number of missed heartbeats before disconnection (default: 5)
  • NodeInfoUpdateInterval: How often node info updates are checked (default: 60s)
  • RequestTimeout: Timeout for individual requests (default: 10s)
Recovery Settings
  • ReconnectInterval: Base interval between reconnection attempts (default: 10s)
  • BaseRetryInterval: Initial retry delay after failure (default: 5s)
  • MaxRetryInterval: Maximum retry delay (default: 5m)
Data Plane Settings
  • CheckpointInterval: How often sequence progress is saved (default: 30s)
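The retry settings above suggest a capped exponential backoff. A minimal sketch using the `BaseRetryInterval` (5s) and `MaxRetryInterval` (5m) defaults; the exact formula, any jitter, and how `ReconnectInterval` interacts with these values are not specified in this document, so `retryDelay` is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// Defaults from the Recovery Settings above.
const (
	baseRetryInterval = 5 * time.Second
	maxRetryInterval  = 5 * time.Minute
)

// retryDelay returns the delay before reconnection attempt n (0-based),
// doubling from base and capping at limit. Jitter is omitted for clarity.
func retryDelay(attempt int, base, limit time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt && d < limit; i++ {
		d *= 2
	}
	if d > limit {
		return limit
	}
	return d
}

func main() {
	for attempt := 0; attempt < 8; attempt++ {
		fmt.Println(attempt, retryDelay(attempt, baseRetryInterval, maxRetryInterval))
	}
}
```

With these defaults the delays run 5s, 10s, 20s, 40s, 1m20s, 2m40s, then stay at the 5m cap.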

Glossary

  • Checkpoint: A saved position in the event sequence used for recovery
  • Handshake: Initial connection protocol between compute node and orchestrator
  • Heartbeat: Periodic health check message from compute node to orchestrator
  • Node Info: Current state and capabilities of a compute node
  • Sequence Number: Monotonically increasing identifier used for message ordering

Documentation

Overview

Package nclprotocol provides the NCL protocol's shared types, constants, and helpers, along with GoMock-generated mocks of its interfaces.

Index

Constants

const (
	KeySeqNum = "Bacalhau-SeqNum"
)
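The name `KeySeqNum` suggests a metadata key carrying a message's sequence number, though this page does not say so explicitly. A minimal sketch under that assumption, using a plain `map[string]string` in place of the real envelope metadata type; `setSeqNum` and `getSeqNum` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strconv"
)

// KeySeqNum matches the constant above.
const KeySeqNum = "Bacalhau-SeqNum"

// setSeqNum stores seq as a decimal string under KeySeqNum (hypothetical helper).
func setSeqNum(meta map[string]string, seq uint64) {
	meta[KeySeqNum] = strconv.FormatUint(seq, 10)
}

// getSeqNum reads and parses the sequence number (hypothetical helper).
func getSeqNum(meta map[string]string) (uint64, error) {
	v, ok := meta[KeySeqNum]
	if !ok {
		return 0, fmt.Errorf("missing %s metadata", KeySeqNum)
	}
	return strconv.ParseUint(v, 10, 64)
}

func main() {
	meta := map[string]string{}
	setSeqNum(meta, 42)
	seq, err := getSeqNum(meta)
	fmt.Println(meta[KeySeqNum], seq, err) // 42 42 <nil>
}
```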

Variables

This section is empty.

Functions

func CreateMessageRegistry

func CreateMessageRegistry() (*envelope.Registry, error)

CreateMessageRegistry creates a new payload registry.

func GenerateMsgID

func GenerateMsgID(event watcher.Event) string

GenerateMsgID generates a message ID for the given watcher event.

func MustCreateMessageRegistry

func MustCreateMessageRegistry() *envelope.Registry

MustCreateMessageRegistry is like CreateMessageRegistry but panics if the registry cannot be created.

func NatsSubjectComputeInMsgs

func NatsSubjectComputeInMsgs(computeNodeID string) string

func NatsSubjectComputeOutCtrl

func NatsSubjectComputeOutCtrl(computeNodeID string) string

func NatsSubjectComputeOutMsgs

func NatsSubjectComputeOutMsgs(computeNodeID string) string

func NatsSubjectOrchestratorInCtrl

func NatsSubjectOrchestratorInCtrl() string

func NatsSubjectOrchestratorInMsgs

func NatsSubjectOrchestratorInMsgs(computeNodeID string) string

func NatsSubjectOrchestratorOutMsgs

func NatsSubjectOrchestratorOutMsgs(computeNodeID string) string

Types

type Checkpointer

type Checkpointer interface {
	Checkpoint(ctx context.Context, name string, sequenceNumber uint64) error
	GetCheckpoint(ctx context.Context, name string) (uint64, error)
}

type ConnectionHealth

type ConnectionHealth struct {
	StartTime               time.Time
	LastSuccessfulHeartbeat time.Time
	LastSuccessfulUpdate    time.Time
	CurrentState            ConnectionState
	ConsecutiveFailures     int
	LastError               error
	ConnectedSince          time.Time
}

type ConnectionState

type ConnectionState int

ConnectionState represents the current state of a connection

const (
	Disconnected ConnectionState = iota
	Connecting
	Connected
)

func (ConnectionState) String

func (c ConnectionState) String() string

String returns the string representation of the connection state

type ConnectionStateHandler

type ConnectionStateHandler func(ConnectionState)

ConnectionStateHandler is called when connection state changes
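One way to drive `ConnectionStateHandler` callbacks is a small holder that stores the current state and notifies handlers only on actual changes. A minimal sketch: `stateHolder`, `OnStateChange`, and `Set` are hypothetical names, not the package's API.

```go
package main

import (
	"fmt"
	"sync"
)

type ConnectionState int

const (
	Disconnected ConnectionState = iota
	Connecting
	Connected
)

// ConnectionStateHandler mirrors the function type above.
type ConnectionStateHandler func(ConnectionState)

// stateHolder stores the current state and notifies registered handlers
// only when the state actually changes (hypothetical helper).
type stateHolder struct {
	mu       sync.Mutex
	state    ConnectionState
	handlers []ConnectionStateHandler
}

func (h *stateHolder) OnStateChange(fn ConnectionStateHandler) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.handlers = append(h.handlers, fn)
}

func (h *stateHolder) Set(s ConnectionState) {
	h.mu.Lock()
	if h.state == s {
		h.mu.Unlock()
		return // unchanged: no notification
	}
	h.state = s
	handlers := append([]ConnectionStateHandler(nil), h.handlers...)
	h.mu.Unlock()
	// Call handlers outside the lock so they may safely call back into the holder.
	for _, fn := range handlers {
		fn(s)
	}
}

func main() {
	var h stateHolder
	h.OnStateChange(func(s ConnectionState) { fmt.Println("state ->", int(s)) })
	h.Set(Connecting)
	h.Set(Connecting) // no-op
	h.Set(Connected)
}
```

Notifying outside the lock avoids deadlocks when a handler calls back into the holder, at the cost that two concurrent `Set` calls may deliver their notifications in either order.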

type MessageCreator

type MessageCreator interface {
	// CreateMessage converts a watcher event into a message envelope.
	// Returns nil if no message should be published for this event.
	// Any error will halt event processing.
	CreateMessage(event watcher.Event) (*envelope.Message, error)
}

MessageCreator defines how events from the watcher are converted into messages for publishing. This is the primary extension point for customizing transport behavior.
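A `MessageCreator` typically switches on the event's payload, returns a message for relevant events, returns nil to skip the rest, and returns an error only when processing should halt. A minimal sketch over simplified stand-ins for `watcher.Event` and `envelope.Message`; the payload types, `executionCreator`, and placing the sequence number in metadata under `"Bacalhau-SeqNum"` are all assumptions.

```go
package main

import "fmt"

// Simplified stand-ins for watcher.Event and envelope.Message.
type Event struct {
	SeqNum uint64
	Object any
}

type Message struct {
	Metadata map[string]string
	Payload  any
}

// MessageCreator has the same shape as the interface above, over the simplified types.
type MessageCreator interface {
	CreateMessage(event Event) (*Message, error)
}

// Hypothetical event payloads.
type ExecutionUpdate struct{ ExecutionID, State string }
type localOnly struct{}

// executionCreator forwards ExecutionUpdate events and skips everything else.
type executionCreator struct{}

func (executionCreator) CreateMessage(ev Event) (*Message, error) {
	switch obj := ev.Object.(type) {
	case ExecutionUpdate:
		if obj.ExecutionID == "" {
			// Per the contract above, an error halts event processing.
			return nil, fmt.Errorf("event %d: empty execution ID", ev.SeqNum)
		}
		return &Message{
			// Assumption: sequence number carried as metadata.
			Metadata: map[string]string{"Bacalhau-SeqNum": fmt.Sprint(ev.SeqNum)},
			Payload:  obj,
		}, nil
	default:
		return nil, nil // not relevant for transport: publish nothing
	}
}

var _ MessageCreator = executionCreator{}

func main() {
	c := executionCreator{}
	m, err := c.CreateMessage(Event{SeqNum: 5, Object: ExecutionUpdate{ExecutionID: "e-1", State: "Running"}})
	fmt.Println(m.Metadata["Bacalhau-SeqNum"], err) // 5 <nil>
	m, err = c.CreateMessage(Event{SeqNum: 6, Object: localOnly{}})
	fmt.Println(m == nil, err) // true <nil>
}
```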

type MessageCreatorFactory

type MessageCreatorFactory interface {
	CreateMessageCreator(ctx context.Context, nodeID string) (MessageCreator, error)
}

type MockCheckpointer

type MockCheckpointer struct {
	// contains filtered or unexported fields
}

MockCheckpointer is a mock of Checkpointer interface.

func NewMockCheckpointer

func NewMockCheckpointer(ctrl *gomock.Controller) *MockCheckpointer

NewMockCheckpointer creates a new mock instance.

func (*MockCheckpointer) Checkpoint

func (m *MockCheckpointer) Checkpoint(ctx context.Context, name string, sequenceNumber uint64) error

Checkpoint mocks base method.

func (*MockCheckpointer) EXPECT

func (m *MockCheckpointer) EXPECT() *MockCheckpointerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockCheckpointer) GetCheckpoint

func (m *MockCheckpointer) GetCheckpoint(ctx context.Context, name string) (uint64, error)

GetCheckpoint mocks base method.

type MockCheckpointerMockRecorder

type MockCheckpointerMockRecorder struct {
	// contains filtered or unexported fields
}

MockCheckpointerMockRecorder is the mock recorder for MockCheckpointer.

func (*MockCheckpointerMockRecorder) Checkpoint

func (mr *MockCheckpointerMockRecorder) Checkpoint(ctx, name, sequenceNumber interface{}) *gomock.Call

Checkpoint indicates an expected call of Checkpoint.

func (*MockCheckpointerMockRecorder) GetCheckpoint

func (mr *MockCheckpointerMockRecorder) GetCheckpoint(ctx, name interface{}) *gomock.Call

GetCheckpoint indicates an expected call of GetCheckpoint.

type MockMessageCreator

type MockMessageCreator struct {
	// contains filtered or unexported fields
}

MockMessageCreator is a mock of MessageCreator interface.

func NewMockMessageCreator

func NewMockMessageCreator(ctrl *gomock.Controller) *MockMessageCreator

NewMockMessageCreator creates a new mock instance.

func (*MockMessageCreator) CreateMessage

func (m *MockMessageCreator) CreateMessage(event watcher.Event) (*envelope.Message, error)

CreateMessage mocks base method.

func (*MockMessageCreator) EXPECT

func (m *MockMessageCreator) EXPECT() *MockMessageCreatorMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

type MockMessageCreatorFactory

type MockMessageCreatorFactory struct {
	// contains filtered or unexported fields
}

MockMessageCreatorFactory is a mock of MessageCreatorFactory interface.

func NewMockMessageCreatorFactory

func NewMockMessageCreatorFactory(ctrl *gomock.Controller) *MockMessageCreatorFactory

NewMockMessageCreatorFactory creates a new mock instance.

func (*MockMessageCreatorFactory) CreateMessageCreator

func (m *MockMessageCreatorFactory) CreateMessageCreator(ctx context.Context, nodeID string) (MessageCreator, error)

CreateMessageCreator mocks base method.

func (*MockMessageCreatorFactory) EXPECT

func (m *MockMessageCreatorFactory) EXPECT() *MockMessageCreatorFactoryMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

type MockMessageCreatorFactoryMockRecorder

type MockMessageCreatorFactoryMockRecorder struct {
	// contains filtered or unexported fields
}

MockMessageCreatorFactoryMockRecorder is the mock recorder for MockMessageCreatorFactory.

func (*MockMessageCreatorFactoryMockRecorder) CreateMessageCreator

func (mr *MockMessageCreatorFactoryMockRecorder) CreateMessageCreator(ctx, nodeID interface{}) *gomock.Call

CreateMessageCreator indicates an expected call of CreateMessageCreator.

type MockMessageCreatorMockRecorder

type MockMessageCreatorMockRecorder struct {
	// contains filtered or unexported fields
}

MockMessageCreatorMockRecorder is the mock recorder for MockMessageCreator.

func (*MockMessageCreatorMockRecorder) CreateMessage

func (mr *MockMessageCreatorMockRecorder) CreateMessage(event interface{}) *gomock.Call

CreateMessage indicates an expected call of CreateMessage.

type SequenceTracker

type SequenceTracker struct {
	// contains filtered or unexported fields
}

SequenceTracker tracks the last successfully processed message sequence number. Used by connection managers to checkpoint progress and resume message processing after restarts. Thread-safe through atomic operations.

func NewSequenceTracker

func NewSequenceTracker() *SequenceTracker

NewSequenceTracker creates a new sequence tracker starting at sequence 0

func (*SequenceTracker) GetLastSeqNum

func (s *SequenceTracker) GetLastSeqNum() uint64

GetLastSeqNum returns the last processed sequence number atomically

func (*SequenceTracker) OnProcessed

func (s *SequenceTracker) OnProcessed(ctx context.Context, message *envelope.Message)

OnProcessed implements ncl.ProcessingNotifier to track message sequence numbers. Called after each successful message processing operation.

func (*SequenceTracker) UpdateLastSeqNum

func (s *SequenceTracker) UpdateLastSeqNum(seqNum uint64)

UpdateLastSeqNum updates the latest processed sequence number atomically

func (*SequenceTracker) WithLastSeqNum

func (s *SequenceTracker) WithLastSeqNum(seqNum uint64) *SequenceTracker

WithLastSeqNum sets the initial sequence number for resuming processing
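The tracker's documented behavior (atomic reads and writes, start at 0, optional resume point) can be sketched with `atomic.Uint64` (Go 1.19+). This is a simplified illustration, not the package's implementation: `sequenceTracker` is hypothetical, and its `OnProcessed` takes a bare sequence number instead of an `*envelope.Message`.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sequenceTracker is a simplified, hypothetical sketch of SequenceTracker.
type sequenceTracker struct {
	lastSeqNum atomic.Uint64 // zero value starts at sequence 0
}

func newSequenceTracker() *sequenceTracker { return &sequenceTracker{} }

// WithLastSeqNum sets the resume point and returns the tracker for chaining.
func (s *sequenceTracker) WithLastSeqNum(seq uint64) *sequenceTracker {
	s.lastSeqNum.Store(seq)
	return s
}

func (s *sequenceTracker) UpdateLastSeqNum(seq uint64) { s.lastSeqNum.Store(seq) }

func (s *sequenceTracker) GetLastSeqNum() uint64 { return s.lastSeqNum.Load() }

// OnProcessed records seq after a message was handled successfully.
func (s *sequenceTracker) OnProcessed(seq uint64) { s.UpdateLastSeqNum(seq) }

func main() {
	t := newSequenceTracker().WithLastSeqNum(10) // resume after checkpoint 10
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for seq := uint64(11); seq <= 15; seq++ {
			t.OnProcessed(seq)
		}
	}()
	wg.Wait()
	fmt.Println(t.GetLastSeqNum()) // 15
}
```

A plain `Store` is enough when messages are processed in order; if updates could arrive out of order, a `CompareAndSwap` loop that only moves the value forward would be the safer choice.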
