nodes

package
v1.6.0 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 16, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package nodes provides node lifecycle and health management for distributed compute clusters. It also contains generated GoMock mocks (MockLookup, MockManager, MockStore) for its interfaces.

The package implements a node manager that handles node registration, health monitoring, state tracking, and resource management. It maintains both in-memory state for fast access and persistent storage for durability.

Key features:

  • Node lifecycle management (registration, approval/rejection, deletion)
  • Health monitoring via heartbeats
  • Connection state tracking
  • Resource capacity tracking
  • Event notifications for state changes
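
The overview mentions in-memory state backed by persistent storage but does not describe the mechanism. One common way to combine the two is dirty tracking with a periodic flush, sketched below. The cache type, its methods, and the persist callback are all hypothetical names for illustration and are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnavailable simulates a failing storage backend in this sketch.
var errUnavailable = errors.New("storage unavailable")

// cache is a hypothetical in-memory state table that remembers which
// entries have changed since they were last persisted.
type cache struct {
	states map[string]string
	dirty  map[string]bool
}

func newCache() *cache {
	return &cache{states: map[string]string{}, dirty: map[string]bool{}}
}

// update changes the in-memory state and marks the entry dirty.
func (c *cache) update(id, state string) {
	c.states[id] = state
	c.dirty[id] = true
}

// flush passes every dirty entry to persist and returns how many succeeded.
// Entries whose write fails stay dirty so that a later flush retries them.
func (c *cache) flush(persist func(id, state string) error) int {
	persisted := 0
	for id := range c.dirty {
		if err := persist(id, c.states[id]); err != nil {
			continue
		}
		delete(c.dirty, id)
		persisted++
	}
	return persisted
}

func main() {
	c := newCache()
	c.update("node-1", "CONNECTED")
	c.update("node-2", "DISCONNECTED")

	failNode2 := func(id, state string) error {
		if id == "node-2" {
			return errUnavailable
		}
		return nil
	}
	fmt.Println("persisted:", c.flush(failNode2), "still dirty:", len(c.dirty))
}
```

Reads stay fast because they only touch the maps, while durability depends on how often flush runs, which is the trade-off a setting such as PersistInterval would presumably control.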

Basic usage:

manager, err := nodes.NewManager(nodes.ManagerParams{
    Store: store,
    NodeDisconnectedAfter: 5 * time.Minute,
})
if err != nil {
    return err
}

if err := manager.Start(ctx); err != nil {
    return err
}

// Register connection state handler
manager.OnConnectionStateChange(func(event nodes.NodeConnectionEvent) {
    log.Printf("Node %s changed from %s to %s",
        event.NodeID, event.Previous, event.Current)
})

Constants

const (
	MultipleNodesFound bacerrors.ErrorCode = "MultipleNodesFound"
	ConflictNodeState  bacerrors.ErrorCode = "ConflictNodeState"
	HandshakeRequired  bacerrors.ErrorCode = "HandshakeRequired"
	ConcurrentUpdate   bacerrors.ErrorCode = "ConcurrentUpdate"
)
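
Error codes like these are usually matched by value rather than by message text. The sketch below illustrates that pattern with local stand-ins: ErrorCode, codedError, and hasCode are hypothetical and do not reflect the actual bacerrors API, which this page does not show.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrorCode is a local stand-in for bacerrors.ErrorCode.
type ErrorCode string

const (
	MultipleNodesFound ErrorCode = "MultipleNodesFound"
	ConcurrentUpdate   ErrorCode = "ConcurrentUpdate"
)

// codedError is a hypothetical error type that carries a code.
type codedError struct {
	code ErrorCode
	msg  string
}

func (e *codedError) Error() string { return fmt.Sprintf("%s: %s", e.code, e.msg) }

// hasCode reports whether the first codedError in err's chain
// carries the given code.
func hasCode(err error, code ErrorCode) bool {
	var ce *codedError
	return errors.As(err, &ce) && ce.code == code
}

func main() {
	base := &codedError{code: ConcurrentUpdate, msg: "node state changed during update"}
	wrapped := fmt.Errorf("approve node: %w", base)

	fmt.Println(hasCode(wrapped, ConcurrentUpdate))
	fmt.Println(hasCode(wrapped, MultipleNodesFound))
}
```

Matching on the code keeps callers working even if the human-readable message changes or the error gets wrapped along the way.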

Variables

This section is empty.

Functions

func NewErrConcurrentModification

func NewErrConcurrentModification() bacerrors.Error

NewErrConcurrentModification returns a standardized error for concurrent update conflicts.

func NewErrHandshakeRequired

func NewErrHandshakeRequired(nodeID string) bacerrors.Error

NewErrHandshakeRequired returns a standardized error for when a handshake is required.

func NewErrMultipleNodesFound

func NewErrMultipleNodesFound(nodeIDPrefix string, matchingNodeIDs []string) bacerrors.Error

NewErrMultipleNodesFound returns a standardized error for when multiple nodes match a prefix.

func NewErrNodeAlreadyApproved

func NewErrNodeAlreadyApproved(nodeID string) bacerrors.Error

NewErrNodeAlreadyApproved returns a standardized error for when a node is already approved.

func NewErrNodeAlreadyRejected

func NewErrNodeAlreadyRejected(nodeID string) bacerrors.Error

NewErrNodeAlreadyRejected returns a standardized error for when a node is already rejected.

func NewErrNodeNotFound

func NewErrNodeNotFound(nodeID string) bacerrors.Error

NewErrNodeNotFound returns a standardized error for when a node is not found.

Types

type ConnectionStateChangeHandler

type ConnectionStateChangeHandler func(NodeConnectionEvent)

ConnectionStateChangeHandler defines a function type for handling connection state changes.
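
How the manager stores and invokes handlers is not documented here. A minimal sketch of one plausible approach is a mutex-guarded slice that is copied before dispatch. The notifier type and its notify method are hypothetical, NodeConnectionEvent is a trimmed local stand-in that uses plain strings for the states, and synchronous in-order delivery is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// NodeConnectionEvent is a trimmed local stand-in for the documented type.
type NodeConnectionEvent struct {
	NodeID   string
	Previous string
	Current  string
}

// ConnectionStateChangeHandler mirrors the documented function type.
type ConnectionStateChangeHandler func(NodeConnectionEvent)

// notifier is a hypothetical registry that fans events out to handlers.
type notifier struct {
	mu       sync.RWMutex
	handlers []ConnectionStateChangeHandler
}

// OnConnectionStateChange appends a handler to the registry.
func (n *notifier) OnConnectionStateChange(h ConnectionStateChangeHandler) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.handlers = append(n.handlers, h)
}

// notify calls every registered handler in registration order. The slice is
// copied first so that handlers run without the lock held.
func (n *notifier) notify(ev NodeConnectionEvent) {
	n.mu.RLock()
	handlers := append([]ConnectionStateChangeHandler(nil), n.handlers...)
	n.mu.RUnlock()
	for _, h := range handlers {
		h(ev)
	}
}

func main() {
	var n notifier
	n.OnConnectionStateChange(func(ev NodeConnectionEvent) {
		fmt.Printf("node %s: %s -> %s\n", ev.NodeID, ev.Previous, ev.Current)
	})
	n.notify(NodeConnectionEvent{NodeID: "node-1", Previous: "CONNECTED", Current: "DISCONNECTED"})
}
```

Releasing the lock before calling handlers lets a handler register further handlers without deadlocking.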

type ExtendedHeartbeatRequest

type ExtendedHeartbeatRequest struct {
	messages.HeartbeatRequest

	// LastComputeSeqNum is the last processed compute message sequence
	LastComputeSeqNum uint64
}

ExtendedHeartbeatRequest represents a heartbeat message with additional metadata.
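
Because messages.HeartbeatRequest is embedded, its fields are promoted and can be read directly on the extended request. The sketch below shows that with a stand-in: the NodeID field on HeartbeatRequest is an assumption made for illustration, since the real type's fields do not appear on this page, and describe is a hypothetical helper.

```go
package main

import "fmt"

// HeartbeatRequest is a hypothetical stand-in for messages.HeartbeatRequest.
// Its NodeID field is assumed for this example.
type HeartbeatRequest struct {
	NodeID string
}

// ExtendedHeartbeatRequest mirrors the documented shape: an embedded
// request plus the last processed compute sequence number.
type ExtendedHeartbeatRequest struct {
	HeartbeatRequest
	LastComputeSeqNum uint64
}

// describe reads the promoted NodeID field without naming the embedded struct.
func describe(r ExtendedHeartbeatRequest) string {
	return fmt.Sprintf("%s@%d", r.NodeID, r.LastComputeSeqNum)
}

func main() {
	req := ExtendedHeartbeatRequest{
		HeartbeatRequest:  HeartbeatRequest{NodeID: "node-1"},
		LastComputeSeqNum: 42,
	}
	fmt.Println(describe(req))
}
```

Note that the embedded struct still has to be named explicitly in a composite literal, even though its fields are promoted for reads.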

type ExtendedShutdownNoticeRequest

type ExtendedShutdownNoticeRequest struct {
	messages.ShutdownNoticeRequest

	// LastComputeSeqNum is the last processed compute message sequence
	LastComputeSeqNum uint64
}

ExtendedShutdownNoticeRequest represents a shutdown message with additional metadata.

type Lookup

type Lookup interface {
	// Get retrieves a node's state by exact ID.
	Get(ctx context.Context, nodeID string) (models.NodeState, error)

	// GetByPrefix retrieves a node's state by ID prefix.
	GetByPrefix(ctx context.Context, prefix string) (models.NodeState, error)

	// List returns all nodes matching the given filters.
	List(ctx context.Context, filters ...NodeStateFilter) ([]models.NodeState, error)
}
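
The following is a minimal map-backed sketch of the prefix and filter behavior, written under stated assumptions: GetByPrefix fails unless exactly one ID matches, and List combines filters with AND semantics. Neither rule is confirmed by this page. NodeState is a trimmed stand-in for models.NodeState, and memLookup, errNotFound, and errMultiple are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strings"
)

// NodeState is a trimmed stand-in for models.NodeState.
type NodeState struct {
	ID        string
	Connected bool
}

// NodeStateFilter mirrors the documented function type.
type NodeStateFilter func(NodeState) bool

var (
	errNotFound = errors.New("node not found")
	errMultiple = errors.New("multiple nodes found")
)

// memLookup is a hypothetical map-backed lookup.
type memLookup struct{ nodes map[string]NodeState }

// GetByPrefix succeeds only when exactly one ID starts with prefix.
func (l memLookup) GetByPrefix(_ context.Context, prefix string) (NodeState, error) {
	var matches []string
	for id := range l.nodes {
		if strings.HasPrefix(id, prefix) {
			matches = append(matches, id)
		}
	}
	switch len(matches) {
	case 0:
		return NodeState{}, errNotFound
	case 1:
		return l.nodes[matches[0]], nil
	}
	sort.Strings(matches)
	return NodeState{}, fmt.Errorf("%w: %v", errMultiple, matches)
}

// List returns the nodes that pass every filter, sorted by ID.
func (l memLookup) List(_ context.Context, filters ...NodeStateFilter) ([]NodeState, error) {
	var out []NodeState
outer:
	for _, n := range l.nodes {
		for _, f := range filters {
			if !f(n) {
				continue outer
			}
		}
		out = append(out, n)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].ID < out[j].ID })
	return out, nil
}

func main() {
	ctx := context.Background()
	l := memLookup{nodes: map[string]NodeState{
		"n-abc1": {ID: "n-abc1", Connected: true},
		"n-abd2": {ID: "n-abd2"},
	}}

	_, err := l.GetByPrefix(ctx, "n-ab") // ambiguous prefix
	fmt.Println(err)

	node, _ := l.GetByPrefix(ctx, "n-abc") // unique prefix
	fmt.Println(node.ID)

	connected, _ := l.List(ctx, func(n NodeState) bool { return n.Connected })
	fmt.Println(len(connected))
}
```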

type Manager

type Manager interface {
	// Start initializes the manager and begins background tasks.
	// It loads existing node states and starts health monitoring.
	Start(ctx context.Context) error

	// Stop gracefully shuts down the manager and its background tasks.
	// It ensures state is persisted before stopping.
	Stop(ctx context.Context) error

	// Running returns whether the manager is currently active.
	Running() bool

	// Handshake handles initial node registration or reconnection.
	// It validates the node and establishes its initial state.
	Handshake(ctx context.Context, request messages.HandshakeRequest) (messages.HandshakeResponse, error)

	// UpdateNodeInfo updates a node's information and capabilities.
	// The node must be registered and not rejected.
	UpdateNodeInfo(ctx context.Context, request messages.UpdateNodeInfoRequest) (messages.UpdateNodeInfoResponse, error)

	// ShutdownNotice handles a node's graceful shutdown notification.
	// It updates sequence numbers and marks the node as cleanly disconnected.
	ShutdownNotice(ctx context.Context, request ExtendedShutdownNoticeRequest) (messages.ShutdownNoticeResponse, error)

	// Heartbeat processes a node's heartbeat message and updates its state.
	// It returns the last known sequence numbers for synchronization.
	Heartbeat(ctx context.Context, request ExtendedHeartbeatRequest) (messages.HeartbeatResponse, error)

	// ApproveNode approves a node for cluster participation.
	// Returns error if node is already approved or not found.
	ApproveNode(ctx context.Context, nodeID string) error

	// RejectNode rejects a node from cluster participation.
	// Returns error if node is already rejected or not found.
	RejectNode(ctx context.Context, nodeID string) error

	// DeleteNode removes a node from the cluster.
	// Returns error if node is not found.
	DeleteNode(ctx context.Context, nodeID string) error

	// OnConnectionStateChange registers a handler for node connection state changes.
	OnConnectionStateChange(handler ConnectionStateChangeHandler)

	Lookup
}

Manager defines the interface for node lifecycle and health management. It provides operations for node registration, state updates, and queries.
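
The error rules stated for ApproveNode and RejectNode can be sketched as a small state table. This is an illustration under assumptions, not the package's implementation: the registry type, the approval states, and the sentinel errors are hypothetical, and allowing a rejected node to be approved later is inferred only from the method comments.

```go
package main

import (
	"errors"
	"fmt"
)

// approval is a hypothetical approval state for one node.
type approval int

const (
	pending approval = iota
	approved
	rejected
)

var (
	errNotFound        = errors.New("node not found")
	errAlreadyApproved = errors.New("node already approved")
	errAlreadyRejected = errors.New("node already rejected")
)

// registry is a hypothetical in-memory table of approval states.
type registry map[string]approval

// Approve fails if the node is unknown or already approved.
func (r registry) Approve(id string) error {
	s, ok := r[id]
	if !ok {
		return errNotFound
	}
	if s == approved {
		return errAlreadyApproved
	}
	r[id] = approved
	return nil
}

// Reject fails if the node is unknown or already rejected.
func (r registry) Reject(id string) error {
	s, ok := r[id]
	if !ok {
		return errNotFound
	}
	if s == rejected {
		return errAlreadyRejected
	}
	r[id] = rejected
	return nil
}

func main() {
	r := registry{"node-1": pending}
	fmt.Println(r.Approve("node-1")) // first approval succeeds
	fmt.Println(r.Approve("node-1")) // repeating it is an error
	fmt.Println(r.Reject("node-2"))  // unknown node
}
```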

func NewManager

func NewManager(params ManagerParams) (Manager, error)

NewManager creates a new node manager with the given configuration. It initializes the manager but does not start background tasks; call Start for that.

type ManagerParams

type ManagerParams struct {
	// Store provides persistent storage for node states
	Store Store

	// Clock is the time source (defaults to real clock if nil)
	Clock clock.Clock

	// NodeInfoProvider provides node information for self registration
	NodeInfoProvider models.NodeInfoProvider

	// NodeDisconnectedAfter is how long to wait before marking nodes as disconnected
	NodeDisconnectedAfter time.Duration

	// HealthCheckFrequency is how often to check node health (optional)
	HealthCheckFrequency time.Duration

	// ManualApproval determines if nodes require manual approval
	ManualApproval bool

	// PersistInterval is how often to persist state changes (optional)
	PersistInterval time.Duration

	// PersistTimeout is the timeout for persistence operations (optional)
	PersistTimeout time.Duration

	// ShutdownTimeout is the timeout for graceful shutdown (optional)
	ShutdownTimeout time.Duration

	// EventStore provides storage for events so that the node manager can
	// assign new nodes the latest sequence number in the store
	EventStore watcher.EventStore
}

ManagerParams holds configuration for creating a new node manager.
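
Several fields are marked optional, which suggests the constructor fills in defaults for zero values. A minimal sketch of that pattern follows. The params type is a trimmed stand-in, withDefaults is a hypothetical helper, the default durations are placeholders chosen for illustration rather than the package's actual values, and treating NodeDisconnectedAfter as required is an assumption based on it not being marked optional.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// params is a trimmed stand-in for ManagerParams.
type params struct {
	NodeDisconnectedAfter time.Duration
	HealthCheckFrequency  time.Duration
	PersistInterval       time.Duration
}

// Placeholder defaults for illustration only; not the package's real values.
const (
	exampleHealthCheckFrequency = 30 * time.Second
	examplePersistInterval      = 5 * time.Minute
)

// withDefaults validates the field assumed to be required and fills
// unset optional fields with the placeholder defaults.
func withDefaults(p params) (params, error) {
	if p.NodeDisconnectedAfter <= 0 {
		return p, errors.New("NodeDisconnectedAfter must be positive")
	}
	if p.HealthCheckFrequency == 0 {
		p.HealthCheckFrequency = exampleHealthCheckFrequency
	}
	if p.PersistInterval == 0 {
		p.PersistInterval = examplePersistInterval
	}
	return p, nil
}

func main() {
	p, err := withDefaults(params{NodeDisconnectedAfter: 5 * time.Minute})
	if err != nil {
		fmt.Println("invalid params:", err)
		return
	}
	fmt.Println(p.HealthCheckFrequency, p.PersistInterval)
}
```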

type MockLookup

type MockLookup struct {
	// contains filtered or unexported fields
}

MockLookup is a mock of Lookup interface.

func NewMockLookup

func NewMockLookup(ctrl *gomock.Controller) *MockLookup

NewMockLookup creates a new mock instance.

func (*MockLookup) EXPECT

func (m *MockLookup) EXPECT() *MockLookupMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockLookup) Get

func (m *MockLookup) Get(ctx context.Context, nodeID string) (models.NodeState, error)

Get mocks base method.

func (*MockLookup) GetByPrefix

func (m *MockLookup) GetByPrefix(ctx context.Context, prefix string) (models.NodeState, error)

GetByPrefix mocks base method.

func (*MockLookup) List

func (m *MockLookup) List(ctx context.Context, filters ...NodeStateFilter) ([]models.NodeState, error)

List mocks base method.

type MockLookupMockRecorder

type MockLookupMockRecorder struct {
	// contains filtered or unexported fields
}

MockLookupMockRecorder is the mock recorder for MockLookup.

func (*MockLookupMockRecorder) Get

func (mr *MockLookupMockRecorder) Get(ctx, nodeID interface{}) *gomock.Call

Get indicates an expected call of Get.

func (*MockLookupMockRecorder) GetByPrefix

func (mr *MockLookupMockRecorder) GetByPrefix(ctx, prefix interface{}) *gomock.Call

GetByPrefix indicates an expected call of GetByPrefix.

func (*MockLookupMockRecorder) List

func (mr *MockLookupMockRecorder) List(ctx interface{}, filters ...interface{}) *gomock.Call

List indicates an expected call of List.

type MockManager

type MockManager struct {
	// contains filtered or unexported fields
}

MockManager is a mock of Manager interface.

func NewMockManager

func NewMockManager(ctrl *gomock.Controller) *MockManager

NewMockManager creates a new mock instance.

func (*MockManager) ApproveNode

func (m *MockManager) ApproveNode(ctx context.Context, nodeID string) error

ApproveNode mocks base method.

func (*MockManager) DeleteNode

func (m *MockManager) DeleteNode(ctx context.Context, nodeID string) error

DeleteNode mocks base method.

func (*MockManager) EXPECT

func (m *MockManager) EXPECT() *MockManagerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockManager) Get

func (m *MockManager) Get(ctx context.Context, nodeID string) (models.NodeState, error)

Get mocks base method.

func (*MockManager) GetByPrefix

func (m *MockManager) GetByPrefix(ctx context.Context, prefix string) (models.NodeState, error)

GetByPrefix mocks base method.

func (*MockManager) Handshake

Handshake mocks base method.

func (*MockManager) Heartbeat

Heartbeat mocks base method.

func (*MockManager) List

func (m *MockManager) List(ctx context.Context, filters ...NodeStateFilter) ([]models.NodeState, error)

List mocks base method.

func (*MockManager) OnConnectionStateChange

func (m *MockManager) OnConnectionStateChange(handler ConnectionStateChangeHandler)

OnConnectionStateChange mocks base method.

func (*MockManager) RejectNode

func (m *MockManager) RejectNode(ctx context.Context, nodeID string) error

RejectNode mocks base method.

func (*MockManager) Running

func (m *MockManager) Running() bool

Running mocks base method.

func (*MockManager) Start

func (m *MockManager) Start(ctx context.Context) error

Start mocks base method.

func (*MockManager) Stop

func (m *MockManager) Stop(ctx context.Context) error

Stop mocks base method.

func (*MockManager) UpdateNodeInfo

UpdateNodeInfo mocks base method.

type MockManagerMockRecorder

type MockManagerMockRecorder struct {
	// contains filtered or unexported fields
}

MockManagerMockRecorder is the mock recorder for MockManager.

func (*MockManagerMockRecorder) ApproveNode

func (mr *MockManagerMockRecorder) ApproveNode(ctx, nodeID interface{}) *gomock.Call

ApproveNode indicates an expected call of ApproveNode.

func (*MockManagerMockRecorder) DeleteNode

func (mr *MockManagerMockRecorder) DeleteNode(ctx, nodeID interface{}) *gomock.Call

DeleteNode indicates an expected call of DeleteNode.

func (*MockManagerMockRecorder) Get

func (mr *MockManagerMockRecorder) Get(ctx, nodeID interface{}) *gomock.Call

Get indicates an expected call of Get.

func (*MockManagerMockRecorder) GetByPrefix

func (mr *MockManagerMockRecorder) GetByPrefix(ctx, prefix interface{}) *gomock.Call

GetByPrefix indicates an expected call of GetByPrefix.

func (*MockManagerMockRecorder) Handshake

func (mr *MockManagerMockRecorder) Handshake(ctx, request interface{}) *gomock.Call

Handshake indicates an expected call of Handshake.

func (*MockManagerMockRecorder) Heartbeat

func (mr *MockManagerMockRecorder) Heartbeat(ctx, request interface{}) *gomock.Call

Heartbeat indicates an expected call of Heartbeat.

func (*MockManagerMockRecorder) List

func (mr *MockManagerMockRecorder) List(ctx interface{}, filters ...interface{}) *gomock.Call

List indicates an expected call of List.

func (*MockManagerMockRecorder) OnConnectionStateChange

func (mr *MockManagerMockRecorder) OnConnectionStateChange(handler interface{}) *gomock.Call

OnConnectionStateChange indicates an expected call of OnConnectionStateChange.

func (*MockManagerMockRecorder) RejectNode

func (mr *MockManagerMockRecorder) RejectNode(ctx, nodeID interface{}) *gomock.Call

RejectNode indicates an expected call of RejectNode.

func (*MockManagerMockRecorder) Running

func (mr *MockManagerMockRecorder) Running() *gomock.Call

Running indicates an expected call of Running.

func (*MockManagerMockRecorder) Start

func (mr *MockManagerMockRecorder) Start(ctx interface{}) *gomock.Call

Start indicates an expected call of Start.

func (*MockManagerMockRecorder) Stop

func (mr *MockManagerMockRecorder) Stop(ctx interface{}) *gomock.Call

Stop indicates an expected call of Stop.

func (*MockManagerMockRecorder) UpdateNodeInfo

func (mr *MockManagerMockRecorder) UpdateNodeInfo(ctx, request interface{}) *gomock.Call

UpdateNodeInfo indicates an expected call of UpdateNodeInfo.

type MockStore

type MockStore struct {
	// contains filtered or unexported fields
}

MockStore is a mock of Store interface.

func NewMockStore

func NewMockStore(ctrl *gomock.Controller) *MockStore

NewMockStore creates a new mock instance.

func (*MockStore) Delete

func (m *MockStore) Delete(ctx context.Context, nodeID string) error

Delete mocks base method.

func (*MockStore) EXPECT

func (m *MockStore) EXPECT() *MockStoreMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockStore) Get

func (m *MockStore) Get(ctx context.Context, nodeID string) (models.NodeState, error)

Get mocks base method.

func (*MockStore) GetByPrefix

func (m *MockStore) GetByPrefix(ctx context.Context, prefix string) (models.NodeState, error)

GetByPrefix mocks base method.

func (*MockStore) List

func (m *MockStore) List(ctx context.Context, filters ...NodeStateFilter) ([]models.NodeState, error)

List mocks base method.

func (*MockStore) Put

func (m *MockStore) Put(ctx context.Context, nodeInfo models.NodeState) error

Put mocks base method.

type MockStoreMockRecorder

type MockStoreMockRecorder struct {
	// contains filtered or unexported fields
}

MockStoreMockRecorder is the mock recorder for MockStore.

func (*MockStoreMockRecorder) Delete

func (mr *MockStoreMockRecorder) Delete(ctx, nodeID interface{}) *gomock.Call

Delete indicates an expected call of Delete.

func (*MockStoreMockRecorder) Get

func (mr *MockStoreMockRecorder) Get(ctx, nodeID interface{}) *gomock.Call

Get indicates an expected call of Get.

func (*MockStoreMockRecorder) GetByPrefix

func (mr *MockStoreMockRecorder) GetByPrefix(ctx, prefix interface{}) *gomock.Call

GetByPrefix indicates an expected call of GetByPrefix.

func (*MockStoreMockRecorder) List

func (mr *MockStoreMockRecorder) List(ctx interface{}, filters ...interface{}) *gomock.Call

List indicates an expected call of List.

func (*MockStoreMockRecorder) Put

func (mr *MockStoreMockRecorder) Put(ctx, nodeInfo interface{}) *gomock.Call

Put indicates an expected call of Put.

type NodeConnectionEvent

type NodeConnectionEvent struct {
	// NodeID is the identifier of the node whose state changed
	NodeID string

	// Previous is the previous connection state
	Previous models.NodeConnectionState

	// Current is the new connection state
	Current models.NodeConnectionState

	// Timestamp is when the state change occurred
	Timestamp time.Time
}

NodeConnectionEvent represents a change in a node's connection state.
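
To show how such an event might be produced, the sketch below runs a single health-check pass that compares each node's last heartbeat with a timeout, in the spirit of NodeDisconnectedAfter. The tracked type and checkHealth function are hypothetical, and the ConnState values are stand-ins because the members of models.NodeConnectionState are not listed on this page.

```go
package main

import (
	"fmt"
	"time"
)

// ConnState is a local stand-in for models.NodeConnectionState.
type ConnState string

const (
	Connected    ConnState = "CONNECTED"
	Disconnected ConnState = "DISCONNECTED"
)

// NodeConnectionEvent mirrors the documented struct using the stand-in state.
type NodeConnectionEvent struct {
	NodeID    string
	Previous  ConnState
	Current   ConnState
	Timestamp time.Time
}

// tracked is hypothetical per-node bookkeeping.
type tracked struct {
	state         ConnState
	lastHeartbeat time.Time
}

// checkHealth marks connected nodes whose last heartbeat is older than
// disconnectAfter as disconnected and returns one event per transition.
func checkHealth(nodes map[string]*tracked, now time.Time, disconnectAfter time.Duration) []NodeConnectionEvent {
	var events []NodeConnectionEvent
	for id, n := range nodes {
		if n.state == Connected && now.Sub(n.lastHeartbeat) > disconnectAfter {
			events = append(events, NodeConnectionEvent{
				NodeID:    id,
				Previous:  n.state,
				Current:   Disconnected,
				Timestamp: now,
			})
			n.state = Disconnected
		}
	}
	return events
}

func main() {
	now := time.Now()
	nodes := map[string]*tracked{
		"node-1": {state: Connected, lastHeartbeat: now.Add(-10 * time.Minute)},
		"node-2": {state: Connected, lastHeartbeat: now.Add(-1 * time.Minute)},
	}
	for _, ev := range checkHealth(nodes, now, 5*time.Minute) {
		fmt.Printf("node %s changed from %s to %s\n", ev.NodeID, ev.Previous, ev.Current)
	}
}
```

Passing the current time in as a parameter, rather than calling time.Now inside the check, keeps the logic testable with a fake clock.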

type NodeStateFilter

type NodeStateFilter func(models.NodeState) bool

NodeStateFilter defines a function type for filtering node states.
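
Since a filter is just a predicate function, filters compose with ordinary higher-order functions. In the sketch below, NodeState is a trimmed stand-in for models.NodeState with assumed boolean fields, and allOf, not, isApproved, and isConnected are hypothetical helpers rather than package exports.

```go
package main

import "fmt"

// NodeState is a trimmed stand-in for models.NodeState; the boolean
// fields are assumptions made for this example.
type NodeState struct {
	ID        string
	Approved  bool
	Connected bool
}

// NodeStateFilter mirrors the documented function type.
type NodeStateFilter func(NodeState) bool

// allOf returns a filter that passes only when every given filter passes.
// With no filters it passes everything.
func allOf(filters ...NodeStateFilter) NodeStateFilter {
	return func(n NodeState) bool {
		for _, f := range filters {
			if !f(n) {
				return false
			}
		}
		return true
	}
}

// not inverts a filter.
func not(f NodeStateFilter) NodeStateFilter {
	return func(n NodeState) bool { return !f(n) }
}

func isApproved(n NodeState) bool  { return n.Approved }
func isConnected(n NodeState) bool { return n.Connected }

func main() {
	approvedButOffline := allOf(isApproved, not(isConnected))

	fmt.Println(approvedButOffline(NodeState{ID: "node-1", Approved: true}))
	fmt.Println(approvedButOffline(NodeState{ID: "node-2", Approved: true, Connected: true}))
}
```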

type Store

type Store interface {
	Lookup

	// Put stores a node's state.
	Put(ctx context.Context, state models.NodeState) error

	// Delete removes a node's state.
	Delete(ctx context.Context, nodeID string) error
}

Store defines the interface for persistent node state storage.
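
For tests that do not need GoMock expectations, a hand-written in-memory fake can be simpler. The sketch below covers Put, Get, and Delete only and leaves out GetByPrefix and List for brevity. NodeState is a trimmed stand-in for models.NodeState, memStore and errNotFound are hypothetical names, and returning an error when deleting an unknown node is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// NodeState is a trimmed stand-in for models.NodeState.
type NodeState struct {
	ID        string
	Connected bool
}

var errNotFound = errors.New("node not found")

// memStore is a hypothetical map-backed store guarded by a mutex.
type memStore struct {
	mu    sync.RWMutex
	nodes map[string]NodeState
}

func newMemStore() *memStore {
	return &memStore{nodes: make(map[string]NodeState)}
}

// Put inserts or overwrites the state stored under state.ID.
func (s *memStore) Put(_ context.Context, state NodeState) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.nodes[state.ID] = state
	return nil
}

// Get returns the state stored under nodeID, or errNotFound.
func (s *memStore) Get(_ context.Context, nodeID string) (NodeState, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	state, ok := s.nodes[nodeID]
	if !ok {
		return NodeState{}, errNotFound
	}
	return state, nil
}

// Delete removes the state stored under nodeID, or returns errNotFound.
func (s *memStore) Delete(_ context.Context, nodeID string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.nodes[nodeID]; !ok {
		return errNotFound
	}
	delete(s.nodes, nodeID)
	return nil
}

func main() {
	ctx := context.Background()
	store := newMemStore()

	_ = store.Put(ctx, NodeState{ID: "node-1", Connected: true})
	state, err := store.Get(ctx, "node-1")
	fmt.Println(state.ID, state.Connected, err)

	fmt.Println(store.Delete(ctx, "node-1"))
	fmt.Println(store.Delete(ctx, "node-1")) // second delete finds nothing
}
```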
