Documentation ¶
Index ¶
- Constants
- Variables
- func GetContainerUpdatesFromPod(pod *metadatapb.Pod) []*metadatapb.ContainerUpdate
- func UpdatePodLabelStore(update *storepb.K8SResource, pls PodLabelStore) error
- type Controller
- type Datastore
- func (m *Datastore) AddFullResourceUpdate(updateVersion int64, resource *storepb.K8SResource) error
- func (m *Datastore) AddResourceUpdate(updateVersion int64, resource *storepb.K8SResourceUpdate) error
- func (m *Datastore) AddResourceUpdateForTopic(updateVersion int64, topic string, resource *storepb.K8SResourceUpdate) error
- func (m *Datastore) DeletePodLabels(namespace string, podName string) error
- func (m *Datastore) FetchFullResourceUpdates(from int64, to int64) ([]*storepb.K8SResource, error)
- func (m *Datastore) FetchPodsWithLabelKey(namespace string, key string) ([]string, error)
- func (m *Datastore) FetchPodsWithLabels(namespace string, labels map[string]string) ([]string, error)
- func (m *Datastore) FetchResourceUpdates(topic string, from int64, to int64) ([]*storepb.K8SResourceUpdate, error)
- func (m *Datastore) GetUpdateVersion(topic string) (int64, error)
- func (m *Datastore) GetWithPrefix(prefix string) ([]string, [][]byte, error)
- func (m *Datastore) SetPodLabels(namespace string, podName string, labels map[string]string) error
- func (m *Datastore) SetUpdateVersion(topic string, uv int64) error
- type DeploymentUpdateProcessor
- func (p *DeploymentUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource
- func (p *DeploymentUpdateProcessor) GetUpdatesToSend(updates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate
- func (p *DeploymentUpdateProcessor) IsNodeScoped() bool
- func (p *DeploymentUpdateProcessor) SetDeleted(obj *storepb.K8SResource)
- func (p *DeploymentUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool
- type EndpointsUpdateProcessor
- func (p *EndpointsUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource
- func (p *EndpointsUpdateProcessor) GetUpdatesToSend(storedUpdates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate
- func (p *EndpointsUpdateProcessor) IsNodeScoped() bool
- func (p *EndpointsUpdateProcessor) SetDeleted(obj *storepb.K8SResource)
- func (p *EndpointsUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool
- type Handler
- type K8sResourceMessage
- type MetadataTopicListener
- type NamespaceUpdateProcessor
- func (p *NamespaceUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource
- func (p *NamespaceUpdateProcessor) GetUpdatesToSend(storedUpdates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate
- func (p *NamespaceUpdateProcessor) IsNodeScoped() bool
- func (p *NamespaceUpdateProcessor) SetDeleted(obj *storepb.K8SResource)
- func (p *NamespaceUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool
- type NodeUpdateProcessor
- func (p *NodeUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource
- func (p *NodeUpdateProcessor) GetUpdatesToSend(updates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate
- func (p *NodeUpdateProcessor) IsNodeScoped() bool
- func (p *NodeUpdateProcessor) SetDeleted(obj *storepb.K8SResource)
- func (p *NodeUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool
- type OutgoingUpdate
- type PodLabelStore
- type PodUpdateProcessor
- func (p *PodUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource
- func (p *PodUpdateProcessor) GetUpdatesToSend(storedUpdates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate
- func (p *PodUpdateProcessor) IsNodeScoped() bool
- func (p *PodUpdateProcessor) SetDeleted(obj *storepb.K8SResource)
- func (p *PodUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool
- type ProcessorState
- type ReplicaSetUpdateProcessor
- func (p *ReplicaSetUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource
- func (p *ReplicaSetUpdateProcessor) GetUpdatesToSend(updates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate
- func (p *ReplicaSetUpdateProcessor) IsNodeScoped() bool
- func (p *ReplicaSetUpdateProcessor) SetDeleted(obj *storepb.K8SResource)
- func (p *ReplicaSetUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool
- type SendMessageFn
- type ServiceUpdateProcessor
- func (p *ServiceUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource
- func (p *ServiceUpdateProcessor) GetUpdatesToSend(storedUpdates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate
- func (p *ServiceUpdateProcessor) IsNodeScoped() bool
- func (p *ServiceUpdateProcessor) SetDeleted(obj *storepb.K8SResource)
- func (p *ServiceUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool
- type Store
- type StoredUpdate
- type UpdateProcessor
Constants ¶
const K8sMetadataUpdateChannel = "K8sUpdates"
K8sMetadataUpdateChannel is the channel where metadata updates are sent.
const KelvinUpdateTopic = "all"
KelvinUpdateTopic is the topic that all kelvins updates are sent on.
Variables ¶
var ( // MetadataRequestSubscribeTopic is the channel which the listener is subscribed to for metadata requests. MetadataRequestSubscribeTopic = messagebus.C2VTopic("MetadataRequest") // MissingMetadataRequestTopic is the channel which the listener should listen to missing metadata update requests on. MissingMetadataRequestTopic = "MissingMetadataRequests" )
var MetadataUpdatesTopic = messagebus.V2CTopic("DurableMetadataUpdates")
MetadataUpdatesTopic is the channel which the listener publishes metadata updates to.
Functions ¶
func GetContainerUpdatesFromPod ¶
func GetContainerUpdatesFromPod(pod *metadatapb.Pod) []*metadatapb.ContainerUpdate
GetContainerUpdatesFromPod gets the container updates for the given pod.
func UpdatePodLabelStore ¶
func UpdatePodLabelStore(update *storepb.K8SResource, pls PodLabelStore) error
UpdatePodLabelStore reads the pod resource update. If the pod is running, we update the store with new labels. If the pod has finished, we delete its labels in the store.
Types ¶
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
Controller listens to any metadata updates from the K8s API and forwards them to a channel where it can be processed.
func NewController ¶
func NewController(namespaces []string, updateCh chan *K8sResourceMessage) (*Controller, error)
NewController creates a new Controller.
func NewControllerWithClientSet ¶
func NewControllerWithClientSet(namespaces []string, updateCh chan *K8sResourceMessage, clientset kubernetes.Interface) (*Controller, error)
NewControllerWithClientSet creates a new Controller using the given Clientset.
type Datastore ¶
type Datastore struct {
// contains filtered or unexported fields
}
Datastore implements the Store interface on a given Datastore.
func NewDatastore ¶
func NewDatastore(ds datastore.MultiGetterSetterDeleterCloser) *Datastore
NewDatastore wraps the datastore in a metadata store.
func (*Datastore) AddFullResourceUpdate ¶
func (m *Datastore) AddFullResourceUpdate(updateVersion int64, resource *storepb.K8SResource) error
AddFullResourceUpdate stores full resource update with the given update version.
func (*Datastore) AddResourceUpdate ¶
func (m *Datastore) AddResourceUpdate(updateVersion int64, resource *storepb.K8SResourceUpdate) error
AddResourceUpdate stores a resource update that is applicable to all topics.
func (*Datastore) AddResourceUpdateForTopic ¶
func (m *Datastore) AddResourceUpdateForTopic(updateVersion int64, topic string, resource *storepb.K8SResourceUpdate) error
AddResourceUpdateForTopic stores the given resource with its associated updateVersion for 24h.
func (*Datastore) DeletePodLabels ¶
DeletePodLabels deletes the labels information associated with a pod.
func (*Datastore) FetchFullResourceUpdates ¶
FetchFullResourceUpdates gets the full resource updates from the `from` update version, to the `to` update version (exclusive).
func (*Datastore) FetchPodsWithLabelKey ¶
FetchPodsWithLabelKey gets the names of all the pods that has a certain label key.
func (*Datastore) FetchPodsWithLabels ¶
func (m *Datastore) FetchPodsWithLabels(namespace string, labels map[string]string) ([]string, error)
FetchPodsWithLabels gets the names of all the pods whose labels match exactly all the labels provided.
func (*Datastore) FetchResourceUpdates ¶
func (m *Datastore) FetchResourceUpdates(topic string, from int64, to int64) ([]*storepb.K8SResourceUpdate, error)
FetchResourceUpdates gets the resource updates from the `from` update version, to the `to` update version (exclusive).
func (*Datastore) GetUpdateVersion ¶
GetUpdateVersion gets the last update version sent on a topic.
func (*Datastore) GetWithPrefix ¶
GetWithPrefix gets all keys and values with the given prefix, for debugging purposes.
func (*Datastore) SetPodLabels ¶
SetPodLabels stores the pod labels information. `<namespace>/<labelKey>/<podName>` is the key and `<labelValue>` is the value. The current implementation assumes that the pod resource update contains the ground truth for all the labels on this pod (verified by playing around with the k8s api). If an old label no longer shows up in the current update, it will be deleted from the PodLabelStore.
type DeploymentUpdateProcessor ¶
type DeploymentUpdateProcessor struct{}
DeploymentUpdateProcessor is a processor for deployment updates.
func (*DeploymentUpdateProcessor) GetStoredProtos ¶
func (p *DeploymentUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource
GetStoredProtos gets the update protos that should be persisted.
func (*DeploymentUpdateProcessor) GetUpdatesToSend ¶
func (p *DeploymentUpdateProcessor) GetUpdatesToSend(updates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate
GetUpdatesToSend gets the resource updates that should be sent out to the agents, along with the agent IPs that the update should be sent to.
func (*DeploymentUpdateProcessor) IsNodeScoped ¶
func (p *DeploymentUpdateProcessor) IsNodeScoped() bool
IsNodeScoped returns whether this update is scoped to specific nodes, or should be sent to all nodes.
func (*DeploymentUpdateProcessor) SetDeleted ¶
func (p *DeploymentUpdateProcessor) SetDeleted(obj *storepb.K8SResource)
SetDeleted sets the deletion timestamp for the object, if there is none already set.
func (*DeploymentUpdateProcessor) ValidateUpdate ¶
func (p *DeploymentUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool
ValidateUpdate checks that the provided service object is valid, and casts it to the correct type.
type EndpointsUpdateProcessor ¶
type EndpointsUpdateProcessor struct{}
EndpointsUpdateProcessor is a processor for endpoints.
func (*EndpointsUpdateProcessor) GetStoredProtos ¶
func (p *EndpointsUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource
GetStoredProtos gets the update protos that should be persisted.
func (*EndpointsUpdateProcessor) GetUpdatesToSend ¶
func (p *EndpointsUpdateProcessor) GetUpdatesToSend(storedUpdates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate
GetUpdatesToSend gets the resource updates that should be sent out to the agents, along with the agent IPs that the update should be sent to.
func (*EndpointsUpdateProcessor) IsNodeScoped ¶
func (p *EndpointsUpdateProcessor) IsNodeScoped() bool
IsNodeScoped returns whether this update is scoped to specific nodes, or should be sent to all nodes.
func (*EndpointsUpdateProcessor) SetDeleted ¶
func (p *EndpointsUpdateProcessor) SetDeleted(obj *storepb.K8SResource)
SetDeleted sets the deletion timestamp for the object, if there is none already set.
func (*EndpointsUpdateProcessor) ValidateUpdate ¶
func (p *EndpointsUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool
ValidateUpdate checks that the provided endpoints object is valid.
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler handles any incoming k8s updates. It saves the update to the store for persistence, and also sends the update to the relevant IP channel.
func NewHandler ¶
func NewHandler(updateCh <-chan *K8sResourceMessage, mds Store, pls PodLabelStore, conn *nats.Conn) *Handler
NewHandler creates a new Handler.
func (*Handler) GetPodCIDRs ¶
GetPodCIDRs returns the PodCIDRs for the cluster.
func (*Handler) GetServiceCIDR ¶
GetServiceCIDR returns the service CIDR for the current cluster.
func (*Handler) GetUpdatesForIP ¶
func (m *Handler) GetUpdatesForIP(ip string, from int64, to int64) ([]*metadatapb.ResourceUpdate, error)
GetUpdatesForIP gets all known resource updates for the IP in the given range.
type K8sResourceMessage ¶
type K8sResourceMessage struct { Object *storepb.K8SResource ObjectType string EventType watch.EventType }
K8sResourceMessage is a message for K8s metadata events/updates.
type MetadataTopicListener ¶
type MetadataTopicListener struct {
// contains filtered or unexported fields
}
MetadataTopicListener is responsible for listening to and handling messages on the metadata update topic.
func NewMetadataTopicListener ¶
func NewMetadataTopicListener(newMdHandler *Handler, sendMsgFn SendMessageFn) (*MetadataTopicListener, error)
NewMetadataTopicListener creates a new metadata topic listener.
func (*MetadataTopicListener) HandleMessage ¶
func (m *MetadataTopicListener) HandleMessage(msg *nats.Msg) error
HandleMessage handles a message on the agent topic.
func (*MetadataTopicListener) Initialize ¶
func (m *MetadataTopicListener) Initialize() error
Initialize handles any setup that needs to be done.
func (*MetadataTopicListener) ProcessMessage ¶
func (m *MetadataTopicListener) ProcessMessage(msg *nats.Msg) error
ProcessMessage processes a single message in the metadata topic.
func (*MetadataTopicListener) Stop ¶
func (m *MetadataTopicListener) Stop()
Stop stops processing any metadata messagespb.
type NamespaceUpdateProcessor ¶
type NamespaceUpdateProcessor struct{}
NamespaceUpdateProcessor is a processor for namespaces.
func (*NamespaceUpdateProcessor) GetStoredProtos ¶
func (p *NamespaceUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource
GetStoredProtos gets the update protos that should be persisted.
func (*NamespaceUpdateProcessor) GetUpdatesToSend ¶
func (p *NamespaceUpdateProcessor) GetUpdatesToSend(storedUpdates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate
GetUpdatesToSend gets the resource updates that should be sent out to the agents, along with the agent IPs that the update should be sent to.
func (*NamespaceUpdateProcessor) IsNodeScoped ¶
func (p *NamespaceUpdateProcessor) IsNodeScoped() bool
IsNodeScoped returns whether this update is scoped to specific nodes, or should be sent to all nodes.
func (*NamespaceUpdateProcessor) SetDeleted ¶
func (p *NamespaceUpdateProcessor) SetDeleted(obj *storepb.K8SResource)
SetDeleted sets the deletion timestamp for the object, if there is none already set.
func (*NamespaceUpdateProcessor) ValidateUpdate ¶
func (p *NamespaceUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool
ValidateUpdate checks that the provided namespace object is valid, and casts it to the correct type.
type NodeUpdateProcessor ¶
type NodeUpdateProcessor struct{}
NodeUpdateProcessor is a processor for nodes.
func (*NodeUpdateProcessor) GetStoredProtos ¶
func (p *NodeUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource
GetStoredProtos gets the update protos that should be persisted.
func (*NodeUpdateProcessor) GetUpdatesToSend ¶
func (p *NodeUpdateProcessor) GetUpdatesToSend(updates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate
GetUpdatesToSend gets the resource updates that should be sent out to the agents, along with the agent IPs that the update should be sent to.
func (*NodeUpdateProcessor) IsNodeScoped ¶
func (p *NodeUpdateProcessor) IsNodeScoped() bool
IsNodeScoped returns whether this update is scoped to specific nodes, or should be sent to all nodes.
func (*NodeUpdateProcessor) SetDeleted ¶
func (p *NodeUpdateProcessor) SetDeleted(obj *storepb.K8SResource)
SetDeleted sets the deletion timestamp for the object, if there is none already set.
func (*NodeUpdateProcessor) ValidateUpdate ¶
func (p *NodeUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool
ValidateUpdate checks that the provided service object is valid, and casts it to the correct type.
type OutgoingUpdate ¶
type OutgoingUpdate struct { // Update is the ResourceUpdate that should be sent out. Update *metadatapb.ResourceUpdate // Topics are the channels that the update should be sent to. These are usually IPs, to be sent to specific // agents. Topics []string }
OutgoingUpdate is an outgoing resource message that should be sent to NATS on the provided channels.
type PodLabelStore ¶
type PodLabelStore interface { // SetPodLabels stores the pod labels information. `<namespace>/<labelKey>/<podName>` is the key and // `<labelValue>` is the value. SetPodLabels(namespace string, podName string, labels map[string]string) error // DeletePodLabels deletes the labels information associated with a pod. DeletePodLabels(namespace string, podName string) error // FetchPodsWithLabelKey gets the names of all the pods that has a certain label key. FetchPodsWithLabelKey(namespace string, key string) ([]string, error) // FetchPodsWithLabels gets the names of all the pods whose labels match exactly all the labels provided. FetchPodsWithLabels(namespace string, labels map[string]string) ([]string, error) // GetWithPrefix gets all keys and values with the given prefix, for debugging purposes. GetWithPrefix(prefix string) ([]string, [][]byte, error) }
PodLabelStore handles storing and fetching data of pods and their associated labels.
type PodUpdateProcessor ¶
type PodUpdateProcessor struct{}
PodUpdateProcessor is a processor for pods.
func (*PodUpdateProcessor) GetStoredProtos ¶
func (p *PodUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource
GetStoredProtos gets the update protos that should be persisted.
func (*PodUpdateProcessor) GetUpdatesToSend ¶
func (p *PodUpdateProcessor) GetUpdatesToSend(storedUpdates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate
GetUpdatesToSend gets the resource updates that should be sent out to the agents, along with the agent IPs that the update should be sent to.
func (*PodUpdateProcessor) IsNodeScoped ¶
func (p *PodUpdateProcessor) IsNodeScoped() bool
IsNodeScoped returns whether this update is scoped to specific nodes, or should be sent to all nodes.
func (*PodUpdateProcessor) SetDeleted ¶
func (p *PodUpdateProcessor) SetDeleted(obj *storepb.K8SResource)
SetDeleted sets the deletion timestamp for the object, if there is none already set.
func (*PodUpdateProcessor) ValidateUpdate ¶
func (p *PodUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool
ValidateUpdate checks that the provided pod object is valid, and casts it to the correct type.
type ProcessorState ¶
type ProcessorState struct { // This is a cache of all the leader election message. This is used to avoid sending excessive // endpoints updates. LeaderMsgs map[string]*metadatapb.Endpoints ServiceCIDR *net.IPNet // This is the service CIDR block; it is inferred from all observed service IPs. PodCIDRs []string // The pod CIDRs in the cluster, inferred from each node's reported pod CIDR. // A map from node name to its internal IP. NodeToIP map[string]string // A map from pod name to its IP. PodToIP map[string]string }
ProcessorState is data that should be shared across update processors. It can contain that needs to be cached across all updates, such as CIDRs or tracking leader election messages.
type ReplicaSetUpdateProcessor ¶
type ReplicaSetUpdateProcessor struct{}
ReplicaSetUpdateProcessor is a processor for replicasets.
func (*ReplicaSetUpdateProcessor) GetStoredProtos ¶
func (p *ReplicaSetUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource
GetStoredProtos gets the update protos that should be persisted.
func (*ReplicaSetUpdateProcessor) GetUpdatesToSend ¶
func (p *ReplicaSetUpdateProcessor) GetUpdatesToSend(updates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate
GetUpdatesToSend gets the resource updates that should be sent out to the agents, along with the agent IPs that the update should be sent to.
func (*ReplicaSetUpdateProcessor) IsNodeScoped ¶
func (p *ReplicaSetUpdateProcessor) IsNodeScoped() bool
IsNodeScoped returns whether this update is scoped to specific nodes, or should be sent to all nodes.
func (*ReplicaSetUpdateProcessor) SetDeleted ¶
func (p *ReplicaSetUpdateProcessor) SetDeleted(obj *storepb.K8SResource)
SetDeleted sets the deletion timestamp for the object, if there is none already set.
func (*ReplicaSetUpdateProcessor) ValidateUpdate ¶
func (p *ReplicaSetUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool
ValidateUpdate checks that the provided service object is valid, and casts it to the correct type.
type SendMessageFn ¶
SendMessageFn is the function the TopicListener uses to publish messages back to NATS.
type ServiceUpdateProcessor ¶
type ServiceUpdateProcessor struct{}
ServiceUpdateProcessor is a processor for services.
func (*ServiceUpdateProcessor) GetStoredProtos ¶
func (p *ServiceUpdateProcessor) GetStoredProtos(obj *storepb.K8SResource) []*storepb.K8SResource
GetStoredProtos gets the update protos that should be persisted.
func (*ServiceUpdateProcessor) GetUpdatesToSend ¶
func (p *ServiceUpdateProcessor) GetUpdatesToSend(storedUpdates []*StoredUpdate, state *ProcessorState) []*OutgoingUpdate
GetUpdatesToSend gets the resource updates that should be sent out to the agents, along with the agent IPs that the update should be sent to.
func (*ServiceUpdateProcessor) IsNodeScoped ¶
func (p *ServiceUpdateProcessor) IsNodeScoped() bool
IsNodeScoped returns whether this update is scoped to specific nodes, or should be sent to all nodes.
func (*ServiceUpdateProcessor) SetDeleted ¶
func (p *ServiceUpdateProcessor) SetDeleted(obj *storepb.K8SResource)
SetDeleted sets the deletion timestamp for the object, if there is none already set.
func (*ServiceUpdateProcessor) ValidateUpdate ¶
func (p *ServiceUpdateProcessor) ValidateUpdate(obj *storepb.K8SResource, state *ProcessorState) bool
ValidateUpdate checks that the provided service object is valid, and casts it to the correct type.
type Store ¶
type Store interface { // AddResourceUpdateForTopic stores the given resource with its associated updateVersion for 24h. AddResourceUpdateForTopic(updateVersion int64, topic string, resource *storepb.K8SResourceUpdate) error // AddResourceUpdate stores a resource update that is applicable to all topics. AddResourceUpdate(updateVersion int64, resource *storepb.K8SResourceUpdate) error // AddFullResourceUpdate stores full resource update with the given update version. AddFullResourceUpdate(updateversion int64, resource *storepb.K8SResource) error // FetchFullResourceUpdates gets the full resource updates from the `from` update version, to the `to` // update version (exclusive). FetchFullResourceUpdates(from int64, to int64) ([]*storepb.K8SResource, error) // FetchResourceUpdates gets the resource updates from the `from` update version, to the `to` // update version (exclusive). FetchResourceUpdates(topic string, from int64, to int64) ([]*storepb.K8SResourceUpdate, error) // GetUpdateVersion gets the last update version sent on a topic. GetUpdateVersion(topic string) (int64, error) // SetUpdateVersion sets the last update version sent on a topic. SetUpdateVersion(topic string, uv int64) error }
Store handles storing and fetching any data related to K8s resources.
type StoredUpdate ¶
type StoredUpdate struct { // Update is the resource update that should be stored. Update *storepb.K8SResource // UpdateVersion is the update version of the update. UpdateVersion int64 }
StoredUpdate is a K8s resource update that should be persisted with its update version.
type UpdateProcessor ¶
type UpdateProcessor interface { // SetDeleted sets the deletion timestamp for the object, if there is none already set. SetDeleted(*storepb.K8SResource) // ValidateUpdate checks whether the update is valid and should be further processed. ValidateUpdate(*storepb.K8SResource, *ProcessorState) bool // GetStoredProtos gets the protos that should be persisted in the data store, derived from // the given update. GetStoredProtos(*storepb.K8SResource) []*storepb.K8SResource // IsNodeScoped returns whether this update is scoped to specific nodes, or should be sent to all nodes. IsNodeScoped() bool // GetUpdatesToSend gets all of the updates that should be sent to the agents, along with the relevant IPs that // the updates should be sent to. GetUpdatesToSend([]*StoredUpdate, *ProcessorState) []*OutgoingUpdate }
An UpdateProcessor is responsible for processing an incoming update, such as determining what updates should be persisted and sent to NATS.