k8smeta

package
v0.0.0-...-73c8340 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 23, 2024 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
const K8sMetadataUpdateChannel = "K8sUpdates"

K8sMetadataUpdateChannel is the channel where metadata updates are sent.

View Source
const KelvinUpdateTopic = "all"

KelvinUpdateTopic is the topic that all Kelvin updates are sent on.

Variables

View Source
var (
	// MetadataRequestSubscribeTopic is the channel which the listener is subscribed to for metadata requests.
	MetadataRequestSubscribeTopic = messagebus.C2VTopic("MetadataRequest")

	// MissingMetadataRequestTopic is the channel which the listener should listen to missing metadata update requests on.
	MissingMetadataRequestTopic = "MissingMetadataRequests"
)
View Source
var MetadataUpdatesTopic = messagebus.V2CTopic("DurableMetadataUpdates")

MetadataUpdatesTopic is the channel which the listener publishes metadata updates to.

Functions

func GetContainerUpdatesFromPod

func GetContainerUpdatesFromPod(pod *metadatapb.Pod) []*metadatapb.ContainerUpdate

GetContainerUpdatesFromPod gets the container updates for the given pod.

func UpdatePodLabelStore

func UpdatePodLabelStore(update *storepb.K8SResource, pls PodLabelStore) error

UpdatePodLabelStore reads the pod resource update. If the pod is running, we update the store with new labels. If the pod has finished, we delete its labels in the store.

Types

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller listens to any metadata updates from the K8s API and forwards them to a channel where it can be processed.

func NewController

func NewController(namespaces []string, updateCh chan *K8sResourceMessage) (*Controller, error)

NewController creates a new Controller.

func NewControllerWithClientSet

func NewControllerWithClientSet(namespaces []string, updateCh chan *K8sResourceMessage, clientset kubernetes.Interface) (*Controller, error)

NewControllerWithClientSet creates a new Controller using the given Clientset.

func (*Controller) Stop

func (mc *Controller) Stop()

Stop stops all K8s watchers.

type Datastore

type Datastore struct {
	// contains filtered or unexported fields
}

Datastore implements the Store interface on a given Datastore.

func NewDatastore

NewDatastore wraps the datastore in a metadata store.

func (*Datastore) AddFullResourceUpdate

func (m *Datastore) AddFullResourceUpdate(updateVersion int64, resource *storepb.K8SResource) error

AddFullResourceUpdate stores full resource update with the given update version.

func (*Datastore) AddResourceUpdate

func (m *Datastore) AddResourceUpdate(updateVersion int64, resource *storepb.K8SResourceUpdate) error

AddResourceUpdate stores a resource update that is applicable to all topics.

func (*Datastore) AddResourceUpdateForTopic

func (m *Datastore) AddResourceUpdateForTopic(updateVersion int64, topic string, resource *storepb.K8SResourceUpdate) error

AddResourceUpdateForTopic stores the given resource with its associated updateVersion for 24h.

func (*Datastore) DeletePodLabels

func (m *Datastore) DeletePodLabels(namespace string, podName string) error

DeletePodLabels deletes the labels information associated with a pod.

func (*Datastore) FetchFullResourceUpdates

func (m *Datastore) FetchFullResourceUpdates(from int64, to int64) ([]*storepb.K8SResource, error)

FetchFullResourceUpdates gets the full resource updates from the `from` update version, to the `to` update version (exclusive).

func (*Datastore) FetchPodsWithLabelKey

func (m *Datastore) FetchPodsWithLabelKey(namespace string, key string) ([]string, error)

FetchPodsWithLabelKey gets the names of all the pods that have a certain label key.

func (*Datastore) FetchPodsWithLabels

func (m *Datastore) FetchPodsWithLabels(namespace string, labels map[string]string) ([]string, error)

FetchPodsWithLabels gets the names of all the pods whose labels match exactly all the labels provided.

func (*Datastore) FetchResourceUpdates

func (m *Datastore) FetchResourceUpdates(topic string, from int64, to int64) ([]*storepb.K8SResourceUpdate, error)

FetchResourceUpdates gets the resource updates from the `from` update version, to the `to` update version (exclusive).

func (*Datastore) GetUpdateVersion

func (m *Datastore) GetUpdateVersion(topic string) (int64, error)

GetUpdateVersion gets the last update version sent on a topic.

func (*Datastore) GetWithPrefix

func (m *Datastore) GetWithPrefix(prefix string) ([]string, [][]byte, error)

GetWithPrefix gets all keys and values with the given prefix, for debugging purposes.

func (*Datastore) SetPodLabels

func (m *Datastore) SetPodLabels(namespace string, podName string, labels map[string]string) error

SetPodLabels stores the pod labels information. `<namespace>/<labelKey>/<podName>` is the key and `<labelValue>` is the value. The current implementation assumes that the pod resource update contains the ground truth for all the labels on this pod (verified by playing around with the k8s api). If an old label no longer shows up in the current update, it will be deleted from the PodLabelStore.

func (*Datastore) SetUpdateVersion

func (m *Datastore) SetUpdateVersion(topic string, uv int64) error

SetUpdateVersion sets the last update version sent on a topic.

type DeploymentUpdateProcessor

type DeploymentUpdateProcessor struct{}

DeploymentUpdateProcessor is a processor for deployment updates.

func (*DeploymentUpdateProcessor) GetStoredProtos

func (p *DeploymentUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource

GetStoredProtos gets the update protos that should be persisted.

func (*DeploymentUpdateProcessor) GetUpdatesToSend

func (p *DeploymentUpdateProcessor) GetUpdatesToSend(updates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate

GetUpdatesToSend gets the resource updates that should be sent out to the agents, along with the agent IPs that the update should be sent to.

func (*DeploymentUpdateProcessor) IsNodeScoped

func (p *DeploymentUpdateProcessor) IsNodeScoped() bool

IsNodeScoped returns whether this update is scoped to specific nodes, or should be sent to all nodes.

func (*DeploymentUpdateProcessor) SetDeleted

func (p *DeploymentUpdateProcessor) SetDeleted(obj *storepb.K8SResource)

SetDeleted sets the deletion timestamp for the object, if there is none already set.

func (*DeploymentUpdateProcessor) ValidateUpdate

func (p *DeploymentUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool

ValidateUpdate checks that the provided deployment object is valid, and casts it to the correct type.

type EndpointsUpdateProcessor

type EndpointsUpdateProcessor struct{}

EndpointsUpdateProcessor is a processor for endpoints.

func (*EndpointsUpdateProcessor) GetStoredProtos

func (p *EndpointsUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource

GetStoredProtos gets the update protos that should be persisted.

func (*EndpointsUpdateProcessor) GetUpdatesToSend

func (p *EndpointsUpdateProcessor) GetUpdatesToSend(storedUpdates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate

GetUpdatesToSend gets the resource updates that should be sent out to the agents, along with the agent IPs that the update should be sent to.

func (*EndpointsUpdateProcessor) IsNodeScoped

func (p *EndpointsUpdateProcessor) IsNodeScoped() bool

IsNodeScoped returns whether this update is scoped to specific nodes, or should be sent to all nodes.

func (*EndpointsUpdateProcessor) SetDeleted

func (p *EndpointsUpdateProcessor) SetDeleted(obj *storepb.K8SResource)

SetDeleted sets the deletion timestamp for the object, if there is none already set.

func (*EndpointsUpdateProcessor) ValidateUpdate

func (p *EndpointsUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool

ValidateUpdate checks that the provided endpoints object is valid.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler handles any incoming k8s updates. It saves the update to the store for persistence, and also sends the update to the relevant IP channel.

func NewHandler

func NewHandler(updateCh <-chan *K8sResourceMessage, mds Store, pls PodLabelStore, conn *nats.Conn) *Handler

NewHandler creates a new Handler.

func (*Handler) GetPodCIDRs

func (m *Handler) GetPodCIDRs() []string

GetPodCIDRs returns the PodCIDRs for the cluster.

func (*Handler) GetServiceCIDR

func (m *Handler) GetServiceCIDR() string

GetServiceCIDR returns the service CIDR for the current cluster.

func (*Handler) GetUpdatesForIP

func (m *Handler) GetUpdatesForIP(ip string, from int64, to int64) ([]*metadatapb.ResourceUpdate, error)

GetUpdatesForIP gets all known resource updates for the IP in the given range.

func (*Handler) Stop

func (m *Handler) Stop()

Stop stops processing incoming k8s metadata updates.

type K8sResourceMessage

type K8sResourceMessage struct {
	Object     *storepb.K8SResource
	ObjectType string
	EventType  watch.EventType
}

K8sResourceMessage is a message for K8s metadata events/updates.

type MetadataTopicListener

type MetadataTopicListener struct {
	// contains filtered or unexported fields
}

MetadataTopicListener is responsible for listening to and handling messages on the metadata update topic.

func NewMetadataTopicListener

func NewMetadataTopicListener(newMdHandler *Handler, sendMsgFn SendMessageFn) (*MetadataTopicListener, error)

NewMetadataTopicListener creates a new metadata topic listener.

func (*MetadataTopicListener) HandleMessage

func (m *MetadataTopicListener) HandleMessage(msg *nats.Msg) error

HandleMessage handles a message on the agent topic.

func (*MetadataTopicListener) Initialize

func (m *MetadataTopicListener) Initialize() error

Initialize handles any setup that needs to be done.

func (*MetadataTopicListener) ProcessMessage

func (m *MetadataTopicListener) ProcessMessage(msg *nats.Msg) error

ProcessMessage processes a single message in the metadata topic.

func (*MetadataTopicListener) Stop

func (m *MetadataTopicListener) Stop()

Stop stops processing any metadata messages.

type NamespaceUpdateProcessor

type NamespaceUpdateProcessor struct{}

NamespaceUpdateProcessor is a processor for namespaces.

func (*NamespaceUpdateProcessor) GetStoredProtos

func (p *NamespaceUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource

GetStoredProtos gets the update protos that should be persisted.

func (*NamespaceUpdateProcessor) GetUpdatesToSend

func (p *NamespaceUpdateProcessor) GetUpdatesToSend(storedUpdates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate

GetUpdatesToSend gets the resource updates that should be sent out to the agents, along with the agent IPs that the update should be sent to.

func (*NamespaceUpdateProcessor) IsNodeScoped

func (p *NamespaceUpdateProcessor) IsNodeScoped() bool

IsNodeScoped returns whether this update is scoped to specific nodes, or should be sent to all nodes.

func (*NamespaceUpdateProcessor) SetDeleted

func (p *NamespaceUpdateProcessor) SetDeleted(obj *storepb.K8SResource)

SetDeleted sets the deletion timestamp for the object, if there is none already set.

func (*NamespaceUpdateProcessor) ValidateUpdate

func (p *NamespaceUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool

ValidateUpdate checks that the provided namespace object is valid, and casts it to the correct type.

type NodeUpdateProcessor

type NodeUpdateProcessor struct{}

NodeUpdateProcessor is a processor for nodes.

func (*NodeUpdateProcessor) GetStoredProtos

func (p *NodeUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource

GetStoredProtos gets the update protos that should be persisted.

func (*NodeUpdateProcessor) GetUpdatesToSend

func (p *NodeUpdateProcessor) GetUpdatesToSend(updates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate

GetUpdatesToSend gets the resource updates that should be sent out to the agents, along with the agent IPs that the update should be sent to.

func (*NodeUpdateProcessor) IsNodeScoped

func (p *NodeUpdateProcessor) IsNodeScoped() bool

IsNodeScoped returns whether this update is scoped to specific nodes, or should be sent to all nodes.

func (*NodeUpdateProcessor) SetDeleted

func (p *NodeUpdateProcessor) SetDeleted(obj *storepb.K8SResource)

SetDeleted sets the deletion timestamp for the object, if there is none already set.

func (*NodeUpdateProcessor) ValidateUpdate

func (p *NodeUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool

ValidateUpdate checks that the provided node object is valid, and casts it to the correct type.

type OutgoingUpdate

type OutgoingUpdate struct {
	// Update is the ResourceUpdate that should be sent out.
	Update *metadatapb.ResourceUpdate
	// Topics are the channels that the update should be sent to. These are usually IPs, to be sent to specific
	// agents.
	Topics []string
}

OutgoingUpdate is an outgoing resource message that should be sent to NATS on the provided channels.

type PodLabelStore

type PodLabelStore interface {
	// SetPodLabels stores the pod labels information. `<namespace>/<labelKey>/<podName>` is the key and
	// `<labelValue>` is the value.
	SetPodLabels(namespace string, podName string, labels map[string]string) error
	// DeletePodLabels deletes the labels information associated with a pod.
	DeletePodLabels(namespace string, podName string) error
	// FetchPodsWithLabelKey gets the names of all the pods that have a certain label key.
	FetchPodsWithLabelKey(namespace string, key string) ([]string, error)
	// FetchPodsWithLabels gets the names of all the pods whose labels match exactly all the labels provided.
	FetchPodsWithLabels(namespace string, labels map[string]string) ([]string, error)

	// GetWithPrefix gets all keys and values with the given prefix, for debugging purposes.
	GetWithPrefix(prefix string) ([]string, [][]byte, error)
}

PodLabelStore handles storing and fetching data of pods and their associated labels.

type PodUpdateProcessor

type PodUpdateProcessor struct{}

PodUpdateProcessor is a processor for pods.

func (*PodUpdateProcessor) GetStoredProtos

func (p *PodUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource

GetStoredProtos gets the update protos that should be persisted.

func (*PodUpdateProcessor) GetUpdatesToSend

func (p *PodUpdateProcessor) GetUpdatesToSend(storedUpdates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate

GetUpdatesToSend gets the resource updates that should be sent out to the agents, along with the agent IPs that the update should be sent to.

func (*PodUpdateProcessor) IsNodeScoped

func (p *PodUpdateProcessor) IsNodeScoped() bool

IsNodeScoped returns whether this update is scoped to specific nodes, or should be sent to all nodes.

func (*PodUpdateProcessor) SetDeleted

func (p *PodUpdateProcessor) SetDeleted(obj *storepb.K8SResource)

SetDeleted sets the deletion timestamp for the object, if there is none already set.

func (*PodUpdateProcessor) ValidateUpdate

func (p *PodUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool

ValidateUpdate checks that the provided pod object is valid, and casts it to the correct type.

type ProcessorState

type ProcessorState struct {
	// This is a cache of all the leader election messages. This is used to avoid sending excessive
	// endpoints updates.
	LeaderMsgs  map[string]*metadatapb.Endpoints
	ServiceCIDR *net.IPNet // This is the service CIDR block; it is inferred from all observed service IPs.
	PodCIDRs    []string   // The pod CIDRs in the cluster, inferred from each node's reported pod CIDR.
	// A map from node name to its internal IP.
	NodeToIP map[string]string
	// A map from pod name to its IP.
	PodToIP map[string]string
}

ProcessorState is data that should be shared across update processors. It can contain data that needs to be cached across all updates, such as CIDRs or tracking leader election messages.

type ReplicaSetUpdateProcessor

type ReplicaSetUpdateProcessor struct{}

ReplicaSetUpdateProcessor is a processor for replicasets.

func (*ReplicaSetUpdateProcessor) GetStoredProtos

func (p *ReplicaSetUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource

GetStoredProtos gets the update protos that should be persisted.

func (*ReplicaSetUpdateProcessor) GetUpdatesToSend

func (p *ReplicaSetUpdateProcessor) GetUpdatesToSend(updates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate

GetUpdatesToSend gets the resource updates that should be sent out to the agents, along with the agent IPs that the update should be sent to.

func (*ReplicaSetUpdateProcessor) IsNodeScoped

func (p *ReplicaSetUpdateProcessor) IsNodeScoped() bool

IsNodeScoped returns whether this update is scoped to specific nodes, or should be sent to all nodes.

func (*ReplicaSetUpdateProcessor) SetDeleted

func (p *ReplicaSetUpdateProcessor) SetDeleted(obj *storepb.K8SResource)

SetDeleted sets the deletion timestamp for the object, if there is none already set.

func (*ReplicaSetUpdateProcessor) ValidateUpdate

func (p *ReplicaSetUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool

ValidateUpdate checks that the provided replicaset object is valid, and casts it to the correct type.

type SendMessageFn

type SendMessageFn func(string, []byte) error

SendMessageFn is the function the TopicListener uses to publish messages back to NATS.

type ServiceUpdateProcessor

type ServiceUpdateProcessor struct{}

ServiceUpdateProcessor is a processor for services.

func (*ServiceUpdateProcessor) GetStoredProtos

func (p *ServiceUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource

GetStoredProtos gets the update protos that should be persisted.

func (*ServiceUpdateProcessor) GetUpdatesToSend

func (p *ServiceUpdateProcessor) GetUpdatesToSend(storedUpdates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate

GetUpdatesToSend gets the resource updates that should be sent out to the agents, along with the agent IPs that the update should be sent to.

func (*ServiceUpdateProcessor) IsNodeScoped

func (p *ServiceUpdateProcessor) IsNodeScoped() bool

IsNodeScoped returns whether this update is scoped to specific nodes, or should be sent to all nodes.

func (*ServiceUpdateProcessor) SetDeleted

func (p *ServiceUpdateProcessor) SetDeleted(obj *storepb.K8SResource)

SetDeleted sets the deletion timestamp for the object, if there is none already set.

func (*ServiceUpdateProcessor) ValidateUpdate

func (p *ServiceUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool

ValidateUpdate checks that the provided service object is valid, and casts it to the correct type.

type Store

type Store interface {
	// AddResourceUpdateForTopic stores the given resource with its associated updateVersion for 24h.
	AddResourceUpdateForTopic(updateVersion int64, topic string, resource *storepb.K8SResourceUpdate) error
	// AddResourceUpdate stores a resource update that is applicable to all topics.
	AddResourceUpdate(updateVersion int64, resource *storepb.K8SResourceUpdate) error
	// AddFullResourceUpdate stores full resource update with the given update version.
	AddFullResourceUpdate(updateversion int64, resource *storepb.K8SResource) error
	// FetchFullResourceUpdates gets the full resource updates from the `from` update version, to the `to`
	// update version (exclusive).
	FetchFullResourceUpdates(from int64, to int64) ([]*storepb.K8SResource, error)
	// FetchResourceUpdates gets the resource updates from the `from` update version, to the `to`
	// update version (exclusive).
	FetchResourceUpdates(topic string, from int64, to int64) ([]*storepb.K8SResourceUpdate, error)

	// GetUpdateVersion gets the last update version sent on a topic.
	GetUpdateVersion(topic string) (int64, error)
	// SetUpdateVersion sets the last update version sent on a topic.
	SetUpdateVersion(topic string, uv int64) error
}

Store handles storing and fetching any data related to K8s resources.

type StoredUpdate

type StoredUpdate struct {
	// Update is the resource update that should be stored.
	Update *storepb.K8SResource
	// UpdateVersion is the update version of the update.
	UpdateVersion int64
}

StoredUpdate is a K8s resource update that should be persisted with its update version.

type UpdateProcessor

type UpdateProcessor interface {
	// SetDeleted sets the deletion timestamp for the object, if there is none already set.
	SetDeleted(*storepb.K8SResource)
	// ValidateUpdate checks whether the update is valid and should be further processed.
	ValidateUpdate(*storepb.K8SResource, *ProcessorState) bool
	// GetStoredProtos gets the protos that should be persisted in the data store, derived from
	// the given update.
	GetStoredProtos(*storepb.K8SResource) []*storepb.K8SResource
	// IsNodeScoped returns whether this update is scoped to specific nodes, or should be sent to all nodes.
	IsNodeScoped() bool
	// GetUpdatesToSend gets all of the updates that should be sent to the agents, along with the relevant IPs that
	// the updates should be sent to.
	GetUpdatesToSend([]*StoredUpdate, *ProcessorState) []*OutgoingUpdate
}

An UpdateProcessor is responsible for processing an incoming update, such as determining what updates should be persisted and sent to NATS.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL