Documentation ¶
Index ¶
- Variables
- type CIDRInfoProvider
- type Datastore
- func (a *Datastore) CreateAgent(agentID uuid.UUID, agt *agentpb.Agent) error
- func (a *Datastore) DeleteAgent(agentID uuid.UUID) error
- func (a *Datastore) GetASID() (uint32, error)
- func (a *Datastore) GetAgent(agentID uuid.UUID) (*agentpb.Agent, error)
- func (a *Datastore) GetAgentIDForHostnamePair(hnPair *HostnameIPPair) (string, error)
- func (a *Datastore) GetAgentIDFromPodName(podName string) (string, error)
- func (a *Datastore) GetAgents() ([]*agentpb.Agent, error)
- func (a *Datastore) GetAgentsDataInfo() (map[uuid.UUID]*messagespb.AgentDataInfo, error)
- func (a *Datastore) GetComputedSchema() (*storepb.ComputedSchema, error)
- func (a *Datastore) GetProcesses(upids []*types.UInt128) ([]*metadatapb.ProcessInfo, error)
- func (a *Datastore) PruneComputedSchema() error
- func (a *Datastore) UpdateAgent(agentID uuid.UUID, agt *agentpb.Agent) error
- func (a *Datastore) UpdateAgentDataInfo(agentID uuid.UUID, dataInfo *messagespb.AgentDataInfo) error
- func (a *Datastore) UpdateProcesses(processes []*metadatapb.ProcessInfo) error
- func (a *Datastore) UpdateSchemas(agentID uuid.UUID, schemas []*storepb.TableInfo) error
- type HostnameIPPair
- type Manager
- type ManagerImpl
- func (m *ManagerImpl) ApplyAgentUpdate(update *Update) error
- func (m *ManagerImpl) DeleteAgent(agentID uuid.UUID) error
- func (m *ManagerImpl) DeleteAgentUpdateCursor(cursorID uuid.UUID)
- func (m *ManagerImpl) GetActiveAgents() ([]*agentpb.Agent, error)
- func (m *ManagerImpl) GetAgentIDForHostnamePair(hnPair *HostnameIPPair) (string, error)
- func (m *ManagerImpl) GetAgentUpdates(cursorID uuid.UUID) ([]*metadata_servicepb.AgentUpdate, *storepb.ComputedSchema, error)
- func (m *ManagerImpl) GetComputedSchema() (*storepb.ComputedSchema, error)
- func (m *ManagerImpl) GetPodCIDRs() []string
- func (m *ManagerImpl) GetServiceCIDR() string
- func (m *ManagerImpl) MessageActiveAgents(msg []byte) error
- func (m *ManagerImpl) MessageAgents(agentIDs []uuid.UUID, msg []byte) error
- func (m *ManagerImpl) NewAgentUpdateCursor() uuid.UUID
- func (m *ManagerImpl) RegisterAgent(agent *agentpb.Agent) (uint32, error)
- func (m *ManagerImpl) UpdateConfig(ns string, podName string, key string, value string) error
- func (m *ManagerImpl) UpdateHeartbeat(agentID uuid.UUID) error
- type Store
- type Update
Constants ¶
This section is empty.
Variables ¶
var ErrNoComputedSchemas = errors.New("Could not find any computed schemas")
ErrNoComputedSchemas is an error indicating the lack of computedSchemas.
Functions ¶
This section is empty.
Types ¶
type CIDRInfoProvider ¶
CIDRInfoProvider is an interface that provides CIDRInfo for a given agent.
type Datastore ¶
type Datastore struct {
// contains filtered or unexported fields
}
Datastore implements the Store interface on a given Datastore.
func NewDatastore ¶
func NewDatastore(ds datastore.MultiGetterSetterDeleterCloser, expiryDuration time.Duration) *Datastore
NewDatastore wraps the datastore in a Store.
func (*Datastore) CreateAgent ¶
CreateAgent creates a new agent.
func (*Datastore) DeleteAgent ¶
DeleteAgent deletes the agent with the given ID.
func (*Datastore) GetAgentIDForHostnamePair ¶
func (a *Datastore) GetAgentIDForHostnamePair(hnPair *HostnameIPPair) (string, error)
GetAgentIDForHostnamePair gets the agent for the given hostnamePair, if it exists.
func (*Datastore) GetAgentIDFromPodName ¶
GetAgentIDFromPodName gets the agent ID for the agent with the given pod name.
func (*Datastore) GetAgentsDataInfo ¶
func (a *Datastore) GetAgentsDataInfo() (map[uuid.UUID]*messagespb.AgentDataInfo, error)
GetAgentsDataInfo returns all of the information about data tables that each agent has.
func (*Datastore) GetComputedSchema ¶
func (a *Datastore) GetComputedSchema() (*storepb.ComputedSchema, error)
GetComputedSchema returns the raw CombinedComputedSchema.
func (*Datastore) GetProcesses ¶
func (a *Datastore) GetProcesses(upids []*types.UInt128) ([]*metadatapb.ProcessInfo, error)
GetProcesses gets the process infos for the given process upids.
func (*Datastore) PruneComputedSchema ¶
PruneComputedSchema cleans any dead agents from the computed schema. This is a temporary fix, to address a larger consistency and race-condition problem that will be addressed by the upcoming extensive refactor of the metadata service.
func (*Datastore) UpdateAgent ¶
UpdateAgent updates the agent info for the agent with the given ID.
func (*Datastore) UpdateAgentDataInfo ¶
func (a *Datastore) UpdateAgentDataInfo(agentID uuid.UUID, dataInfo *messagespb.AgentDataInfo) error
UpdateAgentDataInfo updates the information about data tables that a particular agent has.
func (*Datastore) UpdateProcesses ¶
func (a *Datastore) UpdateProcesses(processes []*metadatapb.ProcessInfo) error
UpdateProcesses updates the given processes in the metadata store.
type HostnameIPPair ¶
HostnameIPPair is a unique identifier for a K8s node.
type Manager ¶
type Manager interface { // RegisterAgent registers a new agent. RegisterAgent(info *agentpb.Agent) (uint32, error) // UpdateHeartbeat updates the agent heartbeat with the current time. UpdateHeartbeat(agentID uuid.UUID) error // Delete agent deletes the agent. DeleteAgent(uuid.UUID) error // GetActiveAgents gets all of the current active agents. GetActiveAgents() ([]*agentpb.Agent, error) MessageAgents(agentIDs []uuid.UUID, msg []byte) error MessageActiveAgents(msg []byte) error ApplyAgentUpdate(update *Update) error // NewAgentUpdateCursor creates a unique ID for an agent update tracking cursor. // It, when used with GetAgentUpdates, can be used by clients of the agent manager // to get the initial agent state and track updates as deltas to that state. NewAgentUpdateCursor() uuid.UUID // DeleteAgentUpdateCursor deletes a cursor from the Manager so that it no longer // tracks updates. DeleteAgentUpdateCursor(cursorID uuid.UUID) // GetAgentUpdates returns all of the updates that have occurred for agents since // the last invocation of GetAgentUpdates. If GetAgentUpdates has never been called for // a given cursorID, the full initial state will be read first. GetAgentUpdates(cursorID uuid.UUID) ([]*metadata_servicepb.AgentUpdate, *storepb.ComputedSchema, error) // UpdateConfig updates the config for the specified agent. UpdateConfig(string, string, string, string) error // GetComputedSchema gets the computed schemas GetComputedSchema() (*storepb.ComputedSchema, error) // GetAgentIDForHostnamePair gets the agent for the given hostnamePair, if it exists. GetAgentIDForHostnamePair(hnPair *HostnameIPPair) (string, error) // GetServiceCIDR returns the service CIDR for the current cluster. GetServiceCIDR() string // GetPodCIDRs returns the PodCIDRs for the cluster. GetPodCIDRs() []string }
Manager handles any agent updates and requests.
type ManagerImpl ¶
type ManagerImpl struct {
// contains filtered or unexported fields
}
ManagerImpl is an implementation for Manager which talks to the metadata store.
func NewManager ¶
func NewManager(agtStore Store, cidr CIDRInfoProvider, conn *nats.Conn) *ManagerImpl
NewManager creates a new agent manager. TODO (vihang/michelle): Figure out a better solution than passing in the k8s controller. We need the cidr to get CIDR info right now.
func (*ManagerImpl) ApplyAgentUpdate ¶
func (m *ManagerImpl) ApplyAgentUpdate(update *Update) error
ApplyAgentUpdate updates the metadata store with the information from the agent update.
func (*ManagerImpl) DeleteAgent ¶
func (m *ManagerImpl) DeleteAgent(agentID uuid.UUID) error
DeleteAgent deletes the agent with the given ID.
func (*ManagerImpl) DeleteAgentUpdateCursor ¶
func (m *ManagerImpl) DeleteAgentUpdateCursor(cursorID uuid.UUID)
DeleteAgentUpdateCursor deletes a created cursor so that it no longer needs to keep track of agent updates when it's not used anymore.
func (*ManagerImpl) GetActiveAgents ¶
func (m *ManagerImpl) GetActiveAgents() ([]*agentpb.Agent, error)
GetActiveAgents gets all of the current active agents.
func (*ManagerImpl) GetAgentIDForHostnamePair ¶
func (m *ManagerImpl) GetAgentIDForHostnamePair(hnPair *HostnameIPPair) (string, error)
GetAgentIDForHostnamePair gets the agent for the given hostnamePair, if it exists.
func (*ManagerImpl) GetAgentUpdates ¶
func (m *ManagerImpl) GetAgentUpdates(cursorID uuid.UUID) ([]*metadata_servicepb.AgentUpdate, *storepb.ComputedSchema, error)
GetAgentUpdates returns the latest agent status since the last call to GetAgentUpdates(). If the input cursor has never read the initial state before, the full initial agent state is read out. Afterwards, the changes to the agent state are read out as a delta to the previous state.
func (*ManagerImpl) GetComputedSchema ¶
func (m *ManagerImpl) GetComputedSchema() (*storepb.ComputedSchema, error)
GetComputedSchema gets the computed schemas.
func (*ManagerImpl) GetPodCIDRs ¶
func (m *ManagerImpl) GetPodCIDRs() []string
GetPodCIDRs returns the PodCIDRs for the cluster.
func (*ManagerImpl) GetServiceCIDR ¶
func (m *ManagerImpl) GetServiceCIDR() string
GetServiceCIDR returns the service CIDR for the current cluster.
func (*ManagerImpl) MessageActiveAgents ¶
func (m *ManagerImpl) MessageActiveAgents(msg []byte) error
MessageActiveAgents sends the message to all active agents.
func (*ManagerImpl) MessageAgents ¶
func (m *ManagerImpl) MessageAgents(agentIDs []uuid.UUID, msg []byte) error
MessageAgents sends the message to the given agentIDs.
func (*ManagerImpl) NewAgentUpdateCursor ¶
func (m *ManagerImpl) NewAgentUpdateCursor() uuid.UUID
NewAgentUpdateCursor creates a new cursor that keeps track of agent state over time.
func (*ManagerImpl) RegisterAgent ¶
func (m *ManagerImpl) RegisterAgent(agent *agentpb.Agent) (uint32, error)
RegisterAgent creates a new agent.
func (*ManagerImpl) UpdateConfig ¶
UpdateConfig updates the config key and value for the specified agent.
func (*ManagerImpl) UpdateHeartbeat ¶
func (m *ManagerImpl) UpdateHeartbeat(agentID uuid.UUID) error
UpdateHeartbeat updates the agent heartbeat with the current time.
type Store ¶
type Store interface { CreateAgent(agentID uuid.UUID, a *agentpb.Agent) error GetAgent(agentID uuid.UUID) (*agentpb.Agent, error) UpdateAgent(agentID uuid.UUID, a *agentpb.Agent) error DeleteAgent(agentID uuid.UUID) error GetAgents() ([]*agentpb.Agent, error) GetASID() (uint32, error) GetAgentIDFromPodName(podName string) (string, error) GetAgentsDataInfo() (map[uuid.UUID]*messagespb.AgentDataInfo, error) UpdateAgentDataInfo(agentID uuid.UUID, dataInfo *messagespb.AgentDataInfo) error GetComputedSchema() (*storepb.ComputedSchema, error) UpdateSchemas(agentID uuid.UUID, schemas []*storepb.TableInfo) error PruneComputedSchema() error GetProcesses(upids []*types.UInt128) ([]*metadatapb.ProcessInfo, error) UpdateProcesses(processes []*metadatapb.ProcessInfo) error GetAgentIDForHostnamePair(hnPair *HostnameIPPair) (string, error) }
Store is the interface that a persistent datastore needs to implement for tracking agent data.
type Update ¶
type Update struct { UpdateInfo *messagespb.AgentUpdateInfo AgentID uuid.UUID }
Update describes the update info for a given agent.