Documentation ¶
Overview ¶
Package nodes also includes generated GoMock mocks (MockLookup, MockManager, MockStore) for its interfaces.
Package nodes provides node lifecycle and health management for distributed compute clusters.
The package implements a node manager that handles node registration, health monitoring, state tracking, and resource management. It maintains both in-memory state for fast access and persistent storage for durability.
Key features:
- Node lifecycle management (registration, approval/rejection, deletion)
- Health monitoring via heartbeats
- Connection state tracking
- Resource capacity tracking
- Event notifications for state changes
Basic usage:
manager, err := nodes.NewManager(nodes.ManagerParams{
	Store:                 store,
	NodeDisconnectedAfter: 5 * time.Minute,
})
if err != nil {
	return err
}

if err := manager.Start(ctx); err != nil {
	return err
}

// Register connection state handler
manager.OnConnectionStateChange(func(event NodeConnectionEvent) {
	log.Printf("Node %s changed from %s to %s", event.NodeID, event.Previous, event.Current)
})
Index ¶
- Constants
- func NewErrConcurrentModification() bacerrors.Error
- func NewErrHandshakeRequired(nodeID string) bacerrors.Error
- func NewErrMultipleNodesFound(nodeIDPrefix string, matchingNodeIDs []string) bacerrors.Error
- func NewErrNodeAlreadyApproved(nodeID string) bacerrors.Error
- func NewErrNodeAlreadyRejected(nodeID string) bacerrors.Error
- func NewErrNodeNotFound(nodeID string) bacerrors.Error
- type ConnectionStateChangeHandler
- type ExtendedHeartbeatRequest
- type Lookup
- type Manager
- type ManagerParams
- type MockLookup
- func (m *MockLookup) EXPECT() *MockLookupMockRecorder
- func (m *MockLookup) Get(ctx context.Context, nodeID string) (models.NodeState, error)
- func (m *MockLookup) GetByPrefix(ctx context.Context, prefix string) (models.NodeState, error)
- func (m *MockLookup) List(ctx context.Context, filters ...NodeStateFilter) ([]models.NodeState, error)
- type MockLookupMockRecorder
- type MockManager
- func (m *MockManager) ApproveNode(ctx context.Context, nodeID string) error
- func (m *MockManager) DeleteNode(ctx context.Context, nodeID string) error
- func (m *MockManager) EXPECT() *MockManagerMockRecorder
- func (m *MockManager) Get(ctx context.Context, nodeID string) (models.NodeState, error)
- func (m *MockManager) GetByPrefix(ctx context.Context, prefix string) (models.NodeState, error)
- func (m *MockManager) Handshake(ctx context.Context, request messages.HandshakeRequest) (messages.HandshakeResponse, error)
- func (m *MockManager) Heartbeat(ctx context.Context, request ExtendedHeartbeatRequest) (messages.HeartbeatResponse, error)
- func (m *MockManager) List(ctx context.Context, filters ...NodeStateFilter) ([]models.NodeState, error)
- func (m *MockManager) OnConnectionStateChange(handler ConnectionStateChangeHandler)
- func (m *MockManager) RejectNode(ctx context.Context, nodeID string) error
- func (m *MockManager) Running() bool
- func (m *MockManager) Start(ctx context.Context) error
- func (m *MockManager) Stop(ctx context.Context) error
- func (m *MockManager) UpdateNodeInfo(ctx context.Context, request messages.UpdateNodeInfoRequest) (messages.UpdateNodeInfoResponse, error)
- type MockManagerMockRecorder
- func (mr *MockManagerMockRecorder) ApproveNode(ctx, nodeID interface{}) *gomock.Call
- func (mr *MockManagerMockRecorder) DeleteNode(ctx, nodeID interface{}) *gomock.Call
- func (mr *MockManagerMockRecorder) Get(ctx, nodeID interface{}) *gomock.Call
- func (mr *MockManagerMockRecorder) GetByPrefix(ctx, prefix interface{}) *gomock.Call
- func (mr *MockManagerMockRecorder) Handshake(ctx, request interface{}) *gomock.Call
- func (mr *MockManagerMockRecorder) Heartbeat(ctx, request interface{}) *gomock.Call
- func (mr *MockManagerMockRecorder) List(ctx interface{}, filters ...interface{}) *gomock.Call
- func (mr *MockManagerMockRecorder) OnConnectionStateChange(handler interface{}) *gomock.Call
- func (mr *MockManagerMockRecorder) RejectNode(ctx, nodeID interface{}) *gomock.Call
- func (mr *MockManagerMockRecorder) Running() *gomock.Call
- func (mr *MockManagerMockRecorder) Start(ctx interface{}) *gomock.Call
- func (mr *MockManagerMockRecorder) Stop(ctx interface{}) *gomock.Call
- func (mr *MockManagerMockRecorder) UpdateNodeInfo(ctx, request interface{}) *gomock.Call
- type MockStore
- func (m *MockStore) Delete(ctx context.Context, nodeID string) error
- func (m *MockStore) EXPECT() *MockStoreMockRecorder
- func (m *MockStore) Get(ctx context.Context, nodeID string) (models.NodeState, error)
- func (m *MockStore) GetByPrefix(ctx context.Context, prefix string) (models.NodeState, error)
- func (m *MockStore) List(ctx context.Context, filters ...NodeStateFilter) ([]models.NodeState, error)
- func (m *MockStore) Put(ctx context.Context, nodeInfo models.NodeState) error
- type MockStoreMockRecorder
- func (mr *MockStoreMockRecorder) Delete(ctx, nodeID interface{}) *gomock.Call
- func (mr *MockStoreMockRecorder) Get(ctx, nodeID interface{}) *gomock.Call
- func (mr *MockStoreMockRecorder) GetByPrefix(ctx, prefix interface{}) *gomock.Call
- func (mr *MockStoreMockRecorder) List(ctx interface{}, filters ...interface{}) *gomock.Call
- func (mr *MockStoreMockRecorder) Put(ctx, nodeInfo interface{}) *gomock.Call
- type NodeConnectionEvent
- type NodeStateFilter
- type Store
Constants ¶
Variables ¶
This section is empty.
Functions ¶
func NewErrConcurrentModification ¶
NewErrConcurrentModification returns a standardized error for concurrent update conflicts
func NewErrHandshakeRequired ¶
NewErrHandshakeRequired returns a standardized error for when a handshake is required
func NewErrMultipleNodesFound ¶
NewErrMultipleNodesFound returns a standardized error for when multiple nodes match a prefix
func NewErrNodeAlreadyApproved ¶
NewErrNodeAlreadyApproved returns a standardized error for when a node is already approved
func NewErrNodeAlreadyRejected ¶
NewErrNodeAlreadyRejected returns a standardized error for when a node is already rejected
func NewErrNodeNotFound ¶
NewErrNodeNotFound returns a standardized error for when a node is not found
Types ¶
type ConnectionStateChangeHandler ¶
type ConnectionStateChangeHandler func(NodeConnectionEvent)
ConnectionStateChangeHandler defines a function type for handling connection state changes.
type ExtendedHeartbeatRequest ¶
type ExtendedHeartbeatRequest struct {
	messages.HeartbeatRequest

	// LastComputeSeqNum is the last processed compute message sequence
	LastComputeSeqNum uint64
}
ExtendedHeartbeatRequest represents a heartbeat message with additional metadata.
type Lookup ¶
type Lookup interface {
	// Get retrieves a node's state by exact ID.
	Get(ctx context.Context, nodeID string) (models.NodeState, error)

	// GetByPrefix retrieves a node's state by ID prefix.
	GetByPrefix(ctx context.Context, prefix string) (models.NodeState, error)

	// List returns all nodes matching the given filters.
	List(ctx context.Context, filters ...NodeStateFilter) ([]models.NodeState, error)
}
type Manager ¶
type Manager interface {
	// Start initializes the manager and begins background tasks.
	// It loads existing node states and starts health monitoring.
	Start(ctx context.Context) error

	// Stop gracefully shuts down the manager and its background tasks.
	// It ensures state is persisted before stopping.
	Stop(ctx context.Context) error

	// Running returns whether the manager is currently active.
	Running() bool

	// Handshake handles initial node registration or reconnection.
	// It validates the node and establishes its initial state.
	Handshake(ctx context.Context, request messages.HandshakeRequest) (messages.HandshakeResponse, error)

	// UpdateNodeInfo updates a node's information and capabilities.
	// The node must be registered and not rejected.
	UpdateNodeInfo(ctx context.Context, request messages.UpdateNodeInfoRequest) (messages.UpdateNodeInfoResponse, error)

	// Heartbeat processes a node's heartbeat message and updates its state.
	// It returns the last known sequence numbers for synchronization.
	Heartbeat(ctx context.Context, request ExtendedHeartbeatRequest) (messages.HeartbeatResponse, error)

	// ApproveNode approves a node for cluster participation.
	// Returns error if node is already approved or not found.
	ApproveNode(ctx context.Context, nodeID string) error

	// RejectNode rejects a node from cluster participation.
	// Returns error if node is already rejected or not found.
	RejectNode(ctx context.Context, nodeID string) error

	// DeleteNode removes a node from the cluster.
	// Returns error if node is not found.
	DeleteNode(ctx context.Context, nodeID string) error

	// OnConnectionStateChange registers a handler for node connection state changes.
	OnConnectionStateChange(handler ConnectionStateChangeHandler)

	Lookup
}
Manager defines the interface for node lifecycle and health management. It provides operations for node registration, state updates, and queries.
func NewManager ¶
func NewManager(params ManagerParams) (Manager, error)
NewManager creates a new nodesManager with the given configuration. It initializes the manager but does not start background tasks - call Start() for that.
type ManagerParams ¶
type ManagerParams struct {
	// Store provides persistent storage for node states
	Store Store

	// Clock is the time source (defaults to real clock if nil)
	Clock clock.Clock

	// NodeDisconnectedAfter is how long to wait before marking nodes as disconnected
	NodeDisconnectedAfter time.Duration

	// HealthCheckFrequency is how often to check node health (optional)
	HealthCheckFrequency time.Duration

	// ManualApproval determines if nodes require manual approval
	ManualApproval bool

	// PersistInterval is how often to persist state changes (optional)
	PersistInterval time.Duration

	// PersistTimeout is the timeout for persistence operations (optional)
	PersistTimeout time.Duration

	// ShutdownTimeout is the timeout for graceful shutdown (optional)
	ShutdownTimeout time.Duration

	// EventStore provides storage for events so that node manager can assign
	// new nodes with latest sequence number in the store
	EventStore watcher.EventStore
}
ManagerParams holds configuration for creating a new node manager.
type MockLookup ¶
type MockLookup struct {
// contains filtered or unexported fields
}
MockLookup is a mock of Lookup interface.
func NewMockLookup ¶
func NewMockLookup(ctrl *gomock.Controller) *MockLookup
NewMockLookup creates a new mock instance.
func (*MockLookup) EXPECT ¶
func (m *MockLookup) EXPECT() *MockLookupMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockLookup) GetByPrefix ¶
func (m *MockLookup) GetByPrefix(ctx context.Context, prefix string) (models.NodeState, error)
GetByPrefix mocks base method.
func (*MockLookup) List ¶
func (m *MockLookup) List(ctx context.Context, filters ...NodeStateFilter) ([]models.NodeState, error)
List mocks base method.
type MockLookupMockRecorder ¶
type MockLookupMockRecorder struct {
// contains filtered or unexported fields
}
MockLookupMockRecorder is the mock recorder for MockLookup.
func (*MockLookupMockRecorder) Get ¶
func (mr *MockLookupMockRecorder) Get(ctx, nodeID interface{}) *gomock.Call
Get indicates an expected call of Get.
func (*MockLookupMockRecorder) GetByPrefix ¶
func (mr *MockLookupMockRecorder) GetByPrefix(ctx, prefix interface{}) *gomock.Call
GetByPrefix indicates an expected call of GetByPrefix.
func (*MockLookupMockRecorder) List ¶
func (mr *MockLookupMockRecorder) List(ctx interface{}, filters ...interface{}) *gomock.Call
List indicates an expected call of List.
type MockManager ¶
type MockManager struct {
// contains filtered or unexported fields
}
MockManager is a mock of Manager interface.
func NewMockManager ¶
func NewMockManager(ctrl *gomock.Controller) *MockManager
NewMockManager creates a new mock instance.
func (*MockManager) ApproveNode ¶
func (m *MockManager) ApproveNode(ctx context.Context, nodeID string) error
ApproveNode mocks base method.
func (*MockManager) DeleteNode ¶
func (m *MockManager) DeleteNode(ctx context.Context, nodeID string) error
DeleteNode mocks base method.
func (*MockManager) EXPECT ¶
func (m *MockManager) EXPECT() *MockManagerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockManager) GetByPrefix ¶
func (m *MockManager) GetByPrefix(ctx context.Context, prefix string) (models.NodeState, error)
GetByPrefix mocks base method.
func (*MockManager) Handshake ¶
func (m *MockManager) Handshake(ctx context.Context, request messages.HandshakeRequest) (messages.HandshakeResponse, error)
Handshake mocks base method.
func (*MockManager) Heartbeat ¶
func (m *MockManager) Heartbeat(ctx context.Context, request ExtendedHeartbeatRequest) (messages.HeartbeatResponse, error)
Heartbeat mocks base method.
func (*MockManager) List ¶
func (m *MockManager) List(ctx context.Context, filters ...NodeStateFilter) ([]models.NodeState, error)
List mocks base method.
func (*MockManager) OnConnectionStateChange ¶
func (m *MockManager) OnConnectionStateChange(handler ConnectionStateChangeHandler)
OnConnectionStateChange mocks base method.
func (*MockManager) RejectNode ¶
func (m *MockManager) RejectNode(ctx context.Context, nodeID string) error
RejectNode mocks base method.
func (*MockManager) Start ¶
func (m *MockManager) Start(ctx context.Context) error
Start mocks base method.
func (*MockManager) Stop ¶
func (m *MockManager) Stop(ctx context.Context) error
Stop mocks base method.
func (*MockManager) UpdateNodeInfo ¶
func (m *MockManager) UpdateNodeInfo(ctx context.Context, request messages.UpdateNodeInfoRequest) (messages.UpdateNodeInfoResponse, error)
UpdateNodeInfo mocks base method.
type MockManagerMockRecorder ¶
type MockManagerMockRecorder struct {
// contains filtered or unexported fields
}
MockManagerMockRecorder is the mock recorder for MockManager.
func (*MockManagerMockRecorder) ApproveNode ¶
func (mr *MockManagerMockRecorder) ApproveNode(ctx, nodeID interface{}) *gomock.Call
ApproveNode indicates an expected call of ApproveNode.
func (*MockManagerMockRecorder) DeleteNode ¶
func (mr *MockManagerMockRecorder) DeleteNode(ctx, nodeID interface{}) *gomock.Call
DeleteNode indicates an expected call of DeleteNode.
func (*MockManagerMockRecorder) Get ¶
func (mr *MockManagerMockRecorder) Get(ctx, nodeID interface{}) *gomock.Call
Get indicates an expected call of Get.
func (*MockManagerMockRecorder) GetByPrefix ¶
func (mr *MockManagerMockRecorder) GetByPrefix(ctx, prefix interface{}) *gomock.Call
GetByPrefix indicates an expected call of GetByPrefix.
func (*MockManagerMockRecorder) Handshake ¶
func (mr *MockManagerMockRecorder) Handshake(ctx, request interface{}) *gomock.Call
Handshake indicates an expected call of Handshake.
func (*MockManagerMockRecorder) Heartbeat ¶
func (mr *MockManagerMockRecorder) Heartbeat(ctx, request interface{}) *gomock.Call
Heartbeat indicates an expected call of Heartbeat.
func (*MockManagerMockRecorder) List ¶
func (mr *MockManagerMockRecorder) List(ctx interface{}, filters ...interface{}) *gomock.Call
List indicates an expected call of List.
func (*MockManagerMockRecorder) OnConnectionStateChange ¶
func (mr *MockManagerMockRecorder) OnConnectionStateChange(handler interface{}) *gomock.Call
OnConnectionStateChange indicates an expected call of OnConnectionStateChange.
func (*MockManagerMockRecorder) RejectNode ¶
func (mr *MockManagerMockRecorder) RejectNode(ctx, nodeID interface{}) *gomock.Call
RejectNode indicates an expected call of RejectNode.
func (*MockManagerMockRecorder) Running ¶
func (mr *MockManagerMockRecorder) Running() *gomock.Call
Running indicates an expected call of Running.
func (*MockManagerMockRecorder) Start ¶
func (mr *MockManagerMockRecorder) Start(ctx interface{}) *gomock.Call
Start indicates an expected call of Start.
func (*MockManagerMockRecorder) Stop ¶
func (mr *MockManagerMockRecorder) Stop(ctx interface{}) *gomock.Call
Stop indicates an expected call of Stop.
func (*MockManagerMockRecorder) UpdateNodeInfo ¶
func (mr *MockManagerMockRecorder) UpdateNodeInfo(ctx, request interface{}) *gomock.Call
UpdateNodeInfo indicates an expected call of UpdateNodeInfo.
type MockStore ¶
type MockStore struct {
// contains filtered or unexported fields
}
MockStore is a mock of Store interface.
func NewMockStore ¶
func NewMockStore(ctrl *gomock.Controller) *MockStore
NewMockStore creates a new mock instance.
func (*MockStore) EXPECT ¶
func (m *MockStore) EXPECT() *MockStoreMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockStore) GetByPrefix ¶
func (m *MockStore) GetByPrefix(ctx context.Context, prefix string) (models.NodeState, error)
GetByPrefix mocks base method.
type MockStoreMockRecorder ¶
type MockStoreMockRecorder struct {
// contains filtered or unexported fields
}
MockStoreMockRecorder is the mock recorder for MockStore.
func (*MockStoreMockRecorder) Delete ¶
func (mr *MockStoreMockRecorder) Delete(ctx, nodeID interface{}) *gomock.Call
Delete indicates an expected call of Delete.
func (*MockStoreMockRecorder) Get ¶
func (mr *MockStoreMockRecorder) Get(ctx, nodeID interface{}) *gomock.Call
Get indicates an expected call of Get.
func (*MockStoreMockRecorder) GetByPrefix ¶
func (mr *MockStoreMockRecorder) GetByPrefix(ctx, prefix interface{}) *gomock.Call
GetByPrefix indicates an expected call of GetByPrefix.
func (*MockStoreMockRecorder) List ¶
func (mr *MockStoreMockRecorder) List(ctx interface{}, filters ...interface{}) *gomock.Call
List indicates an expected call of List.
func (*MockStoreMockRecorder) Put ¶
func (mr *MockStoreMockRecorder) Put(ctx, nodeInfo interface{}) *gomock.Call
Put indicates an expected call of Put.
type NodeConnectionEvent ¶
type NodeConnectionEvent struct {
	// NodeID is the identifier of the node whose state changed
	NodeID string

	// Previous is the previous connection state
	Previous models.NodeConnectionState

	// Current is the new connection state
	Current models.NodeConnectionState

	// Timestamp is when the state change occurred
	Timestamp time.Time
}
NodeConnectionEvent represents a change in a node's connection state.
type NodeStateFilter ¶
NodeStateFilter defines a function type for filtering node states.