agent

package
v0.0.0-...-e20880f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2025 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrNoComputedSchemas = errors.New("Could not find any computed schemas")

ErrNoComputedSchemas is an error indicating the lack of computedSchemas.

Functions

This section is empty.

Types

type CIDRInfoProvider

type CIDRInfoProvider interface {
	GetServiceCIDR() string
	GetPodCIDRs() []string
}

CIDRInfoProvider is an interface that provides CIDRInfo for a given agent.

type Datastore

type Datastore struct {
	// contains filtered or unexported fields
}

Datastore implements the Store interface on a given Datastore.

func NewDatastore

func NewDatastore(ds datastore.MultiGetterSetterDeleterCloser, expiryDuration time.Duration) *Datastore

NewDatastore wraps the datastore in a Store.

func (*Datastore) CreateAgent

func (a *Datastore) CreateAgent(agentID uuid.UUID, agt *agentpb.Agent) error

CreateAgent creates a new agent.

func (*Datastore) DeleteAgent

func (a *Datastore) DeleteAgent(agentID uuid.UUID) error

DeleteAgent deletes the agent with the given ID.

func (*Datastore) GetASID

func (a *Datastore) GetASID() (uint32, error)

GetASID gets the next assignable ASID.

func (*Datastore) GetAgent

func (a *Datastore) GetAgent(agentID uuid.UUID) (*agentpb.Agent, error)

GetAgent gets the agent info for the agent with the given id.

func (*Datastore) GetAgentIDForHostnamePair

func (a *Datastore) GetAgentIDForHostnamePair(hnPair *HostnameIPPair) (string, error)

GetAgentIDForHostnamePair gets the agent for the given hostnamePair, if it exists.

func (*Datastore) GetAgentIDFromPodName

func (a *Datastore) GetAgentIDFromPodName(podName string) (string, error)

GetAgentIDFromPodName gets the agent ID for the agent with the given name.

func (*Datastore) GetAgents

func (a *Datastore) GetAgents() ([]*agentpb.Agent, error)

GetAgents gets all of the current active agents.

func (*Datastore) GetAgentsDataInfo

func (a *Datastore) GetAgentsDataInfo() (map[uuid.UUID]*messagespb.AgentDataInfo, error)

GetAgentsDataInfo returns all of the information about data tables that each agent has.

func (*Datastore) GetComputedSchema

func (a *Datastore) GetComputedSchema() (*storepb.ComputedSchema, error)

GetComputedSchema returns the raw CombinedComputedSchema.

func (*Datastore) GetProcesses

func (a *Datastore) GetProcesses(upids []*types.UInt128) ([]*metadatapb.ProcessInfo, error)

GetProcesses gets the process infos for the given process upids.

func (*Datastore) PruneComputedSchema

func (a *Datastore) PruneComputedSchema() error

PruneComputedSchema cleans any dead agents from the computed schema. This is a temporary fix, to address a larger consistency and race-condition problem that will be addressed by the upcoming extensive refactor of the metadata service.

func (*Datastore) UpdateAgent

func (a *Datastore) UpdateAgent(agentID uuid.UUID, agt *agentpb.Agent) error

UpdateAgent updates the agent info for the agent with the given ID.

func (*Datastore) UpdateAgentDataInfo

func (a *Datastore) UpdateAgentDataInfo(agentID uuid.UUID, dataInfo *messagespb.AgentDataInfo) error

UpdateAgentDataInfo updates the information about data tables that a particular agent has.

func (*Datastore) UpdateProcesses

func (a *Datastore) UpdateProcesses(processes []*metadatapb.ProcessInfo) error

UpdateProcesses updates the given processes in the metadata store.

func (*Datastore) UpdateSchemas

func (a *Datastore) UpdateSchemas(agentID uuid.UUID, schemas []*storepb.TableInfo) error

UpdateSchemas updates the given schemas in the metadata store.

type HostnameIPPair

type HostnameIPPair struct {
	Hostname string
	IP       string
}

HostnameIPPair is a unique identifier for a K8s node.

type Manager

type Manager interface {
	// RegisterAgent registers a new agent.
	RegisterAgent(info *agentpb.Agent) (uint32, error)

	// UpdateHeartbeat updates the agent heartbeat with the current time.
	UpdateHeartbeat(agentID uuid.UUID) error

	// DeleteAgent deletes the agent.
	DeleteAgent(uuid.UUID) error

	// GetActiveAgents gets all of the current active agents.
	GetActiveAgents() ([]*agentpb.Agent, error)

	MessageAgents(agentIDs []uuid.UUID, msg []byte) error
	MessageActiveAgents(msg []byte) error

	ApplyAgentUpdate(update *Update) error

	// NewAgentUpdateCursor creates a unique ID for an agent update tracking cursor.
	// It, when used with GetAgentUpdates, can be used by clients of the agent manager
	// to get the initial agent state and track updates as deltas to that state.
	NewAgentUpdateCursor() uuid.UUID

	// DeleteAgentUpdateCursor deletes a cursor from the Manager so that it no longer
	// tracks updates.
	DeleteAgentUpdateCursor(cursorID uuid.UUID)

	// GetAgentUpdates returns all of the updates that have occurred for agents since
	// the last invocation of GetAgentUpdates. If GetAgentUpdates has never been called for
	// a given cursorID, the full initial state will be read first.
	GetAgentUpdates(cursorID uuid.UUID) ([]*metadata_servicepb.AgentUpdate, *storepb.ComputedSchema, error)

	// UpdateConfig updates the config for the specified agent.
	UpdateConfig(string, string, string, string) error

	// GetComputedSchema gets the computed schemas
	GetComputedSchema() (*storepb.ComputedSchema, error)
	// GetAgentIDForHostnamePair gets the agent for the given hostnamePair, if it exists.
	GetAgentIDForHostnamePair(hnPair *HostnameIPPair) (string, error)

	// GetServiceCIDR returns the service CIDR for the current cluster.
	GetServiceCIDR() string
	// GetPodCIDRs returns the PodCIDRs for the cluster.
	GetPodCIDRs() []string
}

Manager handles any agent updates and requests.

type ManagerImpl

type ManagerImpl struct {
	// contains filtered or unexported fields
}

ManagerImpl is an implementation for Manager which talks to the metadata store.

func NewManager

func NewManager(agtStore Store, cidr CIDRInfoProvider, conn *nats.Conn) *ManagerImpl

NewManager creates a new agent manager. TODO (vihang/michelle): Figure out a better solution than passing in the k8s controller. We need the cidr to get CIDR info right now.

func (*ManagerImpl) ApplyAgentUpdate

func (m *ManagerImpl) ApplyAgentUpdate(update *Update) error

ApplyAgentUpdate updates the metadata store with the information from the agent update.

func (*ManagerImpl) DeleteAgent

func (m *ManagerImpl) DeleteAgent(agentID uuid.UUID) error

DeleteAgent deletes the agent with the given ID.

func (*ManagerImpl) DeleteAgentUpdateCursor

func (m *ManagerImpl) DeleteAgentUpdateCursor(cursorID uuid.UUID)

DeleteAgentUpdateCursor deletes a created cursor so that it no longer needs to keep track of agent updates when it's not used anymore.

func (*ManagerImpl) GetActiveAgents

func (m *ManagerImpl) GetActiveAgents() ([]*agentpb.Agent, error)

GetActiveAgents gets all of the current active agents.

func (*ManagerImpl) GetAgentIDForHostnamePair

func (m *ManagerImpl) GetAgentIDForHostnamePair(hnPair *HostnameIPPair) (string, error)

GetAgentIDForHostnamePair gets the agent for the given hostnamePair, if it exists.

func (*ManagerImpl) GetAgentUpdates

func (m *ManagerImpl) GetAgentUpdates(cursorID uuid.UUID) ([]*metadata_servicepb.AgentUpdate,
	*storepb.ComputedSchema, error)

GetAgentUpdates returns the latest agent status since the last call to GetAgentUpdates(). If the input cursor has never read the initial state before, the full initial agent state is read out. Afterwards, the changes to the agent state are read out as a delta to the previous state.

func (*ManagerImpl) GetComputedSchema

func (m *ManagerImpl) GetComputedSchema() (*storepb.ComputedSchema, error)

GetComputedSchema gets the computed schemas.

func (*ManagerImpl) GetPodCIDRs

func (m *ManagerImpl) GetPodCIDRs() []string

GetPodCIDRs returns the PodCIDRs for the cluster.

func (*ManagerImpl) GetServiceCIDR

func (m *ManagerImpl) GetServiceCIDR() string

GetServiceCIDR returns the service CIDR for the current cluster.

func (*ManagerImpl) MessageActiveAgents

func (m *ManagerImpl) MessageActiveAgents(msg []byte) error

MessageActiveAgents sends the message to all active agents.

func (*ManagerImpl) MessageAgents

func (m *ManagerImpl) MessageAgents(agentIDs []uuid.UUID, msg []byte) error

MessageAgents sends the message to the given agentIDs.

func (*ManagerImpl) NewAgentUpdateCursor

func (m *ManagerImpl) NewAgentUpdateCursor() uuid.UUID

NewAgentUpdateCursor creates a new cursor that keeps track of agent state over time.

func (*ManagerImpl) RegisterAgent

func (m *ManagerImpl) RegisterAgent(agent *agentpb.Agent) (uint32, error)

RegisterAgent creates a new agent.

func (*ManagerImpl) UpdateConfig

func (m *ManagerImpl) UpdateConfig(ns string, podName string, key string, value string) error

UpdateConfig updates the config key and value for the specified agent.

func (*ManagerImpl) UpdateHeartbeat

func (m *ManagerImpl) UpdateHeartbeat(agentID uuid.UUID) error

UpdateHeartbeat updates the agent heartbeat with the current time.

type Store

type Store interface {
	CreateAgent(agentID uuid.UUID, a *agentpb.Agent) error
	GetAgent(agentID uuid.UUID) (*agentpb.Agent, error)
	UpdateAgent(agentID uuid.UUID, a *agentpb.Agent) error
	DeleteAgent(agentID uuid.UUID) error

	GetAgents() ([]*agentpb.Agent, error)

	GetASID() (uint32, error)
	GetAgentIDFromPodName(podName string) (string, error)

	GetAgentsDataInfo() (map[uuid.UUID]*messagespb.AgentDataInfo, error)
	UpdateAgentDataInfo(agentID uuid.UUID, dataInfo *messagespb.AgentDataInfo) error

	GetComputedSchema() (*storepb.ComputedSchema, error)
	UpdateSchemas(agentID uuid.UUID, schemas []*storepb.TableInfo) error
	PruneComputedSchema() error

	GetProcesses(upids []*types.UInt128) ([]*metadatapb.ProcessInfo, error)
	UpdateProcesses(processes []*metadatapb.ProcessInfo) error

	GetAgentIDForHostnamePair(hnPair *HostnameIPPair) (string, error)
}

Store is the interface that a persistent datastore needs to implement for tracking agent data.

type Update

type Update struct {
	UpdateInfo *messagespb.AgentUpdateInfo
	AgentID    uuid.UUID
}

Update describes the update info for a given agent.

Directories

Path Synopsis
Package mock_agent is a generated GoMock package.
Package mock_agent is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL