Documentation ¶
Overview ¶
The kates package is a library for writing kubernetes extensions. The library provides a number of capabilities:
- Consistent bootstrap of multiple resources
- Graceful Load Shedding via Coalesced Events
- Read/write coherence
- Grouping
- Works well with typed (e.g. corev1.Pod) and untyped (e.g. map[string]interface{}) representations of k8s resources.
It does not provide codegen or admission controllers; for those, use kubebuilder.
Comparison to other libraries:
- higher level, simpler, and more idiomatic than client-go
- lower level (and more flexible) than operator-sdk or kubebuilder
Constructing a Client ¶
The primary entrypoint for the kates library is the Client type. A Client is constructed by passing in the ClientConfig struct with a path to a kubeconfig file:
client, err := NewClient(ClientConfig{Kubeconfig: "path/to/kubeconfig"}) // or NewClient(ClientConfig{}) for defaults
Creating, Modifying, and Deleting Resources ¶
A client can be used to Create, Update, and/or Delete any kubernetes resource. Each of the "CRUD" methods will, upon success, store an updated copy of the resource into the object referenced by the last argument. This will typically be different than the value you supplied if e.g. the server defaults fields, updates the resource version, assigns UIDs, etc.
    var result kates.Pod
    err = client.Create(ctx, &kates.Pod{...}, &result)
    err = client.Update(ctx, result, &result)
    err = client.UpdateStatus(ctx, result, &result)
    err = client.Delete(ctx, result, &result)
You can pass both typed and untyped values into the APIs. The only requirement is that the values you pass will json.Marshal to and json.Unmarshal from something that looks like a kubernetes resource:
    pod := kates.Pod{...}
    err := client.Create(ctx, &pod, &pod)
    // -or-
    pod := map[string]interface{}{"kind": "Pod", ...}
    err := client.Create(ctx, &pod, &pod)
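The JSON requirement above can be illustrated without a cluster. The following is a minimal sketch using only the standard library, not the kates implementation: the pod type and the convert helper are hypothetical names introduced here to show how a typed and an untyped value with the same JSON shape can be exchanged.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pod is a hypothetical, heavily trimmed stand-in for a typed resource such
// as kates.Pod. It is an assumption made for this sketch.
type pod struct {
	Kind     string `json:"kind"`
	Metadata struct {
		Name string `json:"name"`
	} `json:"metadata"`
}

// convert copies src into dst by marshaling to JSON and back, so any two
// values with compatible JSON shapes can be exchanged.
func convert(src, dst interface{}) error {
	data, err := json.Marshal(src)
	if err != nil {
		return err
	}
	return json.Unmarshal(data, dst)
}

func main() {
	untyped := map[string]interface{}{
		"kind":     "Pod",
		"metadata": map[string]interface{}{"name": "web"},
	}
	var typed pod
	if err := convert(untyped, &typed); err != nil {
		panic(err)
	}
	fmt.Println(typed.Kind, typed.Metadata.Name)
}
```

Because the conversion goes through JSON, any field that lacks a JSON representation in either type is silently dropped, which is worth keeping in mind when mixing representations.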
Watching Resources ¶
The client can be used to watch sets of multiple related resources. This is accomplished via the Accumulator type. An accumulator tracks events coming from the API server for the indicated resources, and merges those events with any locally initiated changes made via the client in order to maintain a snapshot that is coherent.
You can construct an Accumulator by invoking the Client's Watch method:
accumulator := client.Watch(ctx, Query{Name: "Services", Kind: "svc"}, Query{Name: "Deployments", Kind: "deploy"})
The Accumulator will bootstrap a complete list of each supplied Query, and then provide continuing notifications if any of the resources change. Notifications that the initial bootstrap is complete as well as notifications of any subsequent changes are indicated by sending an empty struct down the Accumulator.Changed() channel:
<-accumulator.Changed() // Wait until all Queries have been initialized.
The Accumulator provides access to the values it tracks via the Accumulator.Update(&snapshot) method. The Update() method expects a pointer to a snapshot that is defined by the caller. The caller must supply a pointer to a struct with fields that match the names of the Query structs used to create the Accumulator. The types of the snapshot fields are free to be anything that will json.Unmarshal from a slice of kubernetes resources:
    // initialize an empty snapshot
    snapshot := struct {
        Services    []*kates.Service
        Deployments []*kates.Deployment
    }{}
    accumulator.Update(&snapshot)
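How Query names line up with snapshot fields can be sketched with encoding/json alone. This is an illustration under assumptions rather than the Accumulator's actual code: the fill helper and the trimmed service type are hypothetical, and the input map stands in for the lists an Accumulator would hold per Query name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// service is a hypothetical, trimmed stand-in for kates.Service.
type service struct {
	Metadata struct {
		Name string `json:"name"`
	} `json:"metadata"`
}

// fill copies named lists of untyped resources into a caller-defined snapshot
// struct. encoding/json matches each map key to the struct field of the same
// name, so a list keyed "Services" lands in a field named Services.
func fill(byName map[string][]map[string]interface{}, snapshot interface{}) error {
	data, err := json.Marshal(byName)
	if err != nil {
		return err
	}
	return json.Unmarshal(data, snapshot)
}

func main() {
	lists := map[string][]map[string]interface{}{
		"Services": {
			{"kind": "Service", "metadata": map[string]interface{}{"name": "api"}},
		},
		"Deployments": {},
	}
	// Field types may be typed or untyped, as long as they unmarshal from a
	// JSON array of resources.
	snapshot := struct {
		Services    []*service
		Deployments []map[string]interface{}
	}{}
	if err := fill(lists, &snapshot); err != nil {
		panic(err)
	}
	fmt.Println(len(snapshot.Services), snapshot.Services[0].Metadata.Name)
}
```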
The first call to Update will populate the snapshot with the bootstrapped values. At this point any startup logic can be performed with confidence that the snapshot represents a complete and recent view of cluster state:
// perform any startup logic ...
The same APIs can then be used to watch for and reconcile subsequent changes:
    // reconcile ongoing changes
    for {
        select {
        case <-accumulator.Changed():
            wasChanged := accumulator.Update(&snapshot)
            if wasChanged {
                reconcile(snapshot)
            }
        case <-ctx.Done():
            return // a bare break here would only exit the select, not the loop
        }
    }
The Accumulator will provide read/write coherence for any changes made using the client from which the Accumulator was created. This means that any snapshot produced by the Accumulator is guaranteed to include all the Create, Update, UpdateStatus, and/or Delete operations that were performed using the client. (The underlying client-go CRUD and Watch operations do not provide this guarantee, so a straightforward reconcile operation will often end up creating duplicate objects and/or performing updates on stale versions.)
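The idea behind read/write coherence can be modeled in a few lines. The sketch below is a toy model and not how kates is implemented: every name in it is hypothetical, it uses an integer version where real resourceVersions are opaque strings, and it ignores deletes. It shows one way a snapshot can overlay local writes on delayed watch state.

```go
package main

import "fmt"

// object is a toy resource. The integer Version is an assumption that keeps
// the comparison simple; real resourceVersions are opaque strings.
type object struct {
	Name    string
	Version int
	Value   string
}

// store merges what the watch has delivered with writes made locally.
type store struct {
	watched map[string]object // state as seen through the (delayed) watch
	written map[string]object // local writes the watch has not caught up with
}

func newStore() *store {
	return &store{watched: map[string]object{}, written: map[string]object{}}
}

// recordWrite remembers the server's response to a local write.
func (s *store) recordWrite(o object) {
	if cur, ok := s.watched[o.Name]; ok && cur.Version >= o.Version {
		return // the watch already has this write or something newer
	}
	s.written[o.Name] = o
}

// recordWatch applies a watch event and drops any local write it has caught up with.
func (s *store) recordWatch(o object) {
	s.watched[o.Name] = o
	if w, ok := s.written[o.Name]; ok && o.Version >= w.Version {
		delete(s.written, o.Name)
	}
}

// snapshot returns the watched state overlaid with newer local writes.
func (s *store) snapshot() map[string]object {
	out := make(map[string]object, len(s.watched))
	for name, o := range s.watched {
		out[name] = o
	}
	for name, o := range s.written {
		out[name] = o
	}
	return out
}

func main() {
	s := newStore()
	s.recordWatch(object{Name: "cfg", Version: 1, Value: "old"})
	s.recordWrite(object{Name: "cfg", Version: 2, Value: "new"}) // local Update
	s.recordWatch(object{Name: "cfg", Version: 1, Value: "old"}) // stale event arrives late
	fmt.Println(s.snapshot()["cfg"].Value)                       // prints "new"
}
```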
Event Coalescing for Graceful Load Shedding ¶
The Accumulator automatically coalesces events behind the scenes in order to facilitate graceful load shedding for situations where the reconcile operation takes a long time relative to the number of incoming events. This allows the Accumulator.Update(&snapshot) method to provide a guarantee that when it returns the snapshot will contain the most recent view of cluster state regardless of how slowly and infrequently we read from the Accumulator.Changed() channel:
    snapshot := Snapshot{}
    for {
        <-accumulator.Changed()
        wasChanged := accumulator.Update(&snapshot) // Guaranteed to return the most recent view of cluster state.
        if wasChanged {
            slowReconcile(&snapshot)
        }
    }
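One common way to get this kind of coalescing in Go is a channel with a buffer of one and a non-blocking send. The sketch below shows that general technique; whether the Accumulator uses exactly this mechanism is an assumption, and the notifier type is a hypothetical name.

```go
package main

import "fmt"

// notifier coalesces any number of change events into at most one pending
// notification. All names here are hypothetical.
type notifier struct {
	changed chan struct{}
}

func newNotifier() *notifier {
	// A buffer of one lets a single notification wait for a slow reader.
	return &notifier{changed: make(chan struct{}, 1)}
}

// notify records that something changed. If a notification is already
// pending, the new event is merged into it instead of queueing behind it.
func (n *notifier) notify() {
	select {
	case n.changed <- struct{}{}:
	default:
	}
}

func main() {
	n := newNotifier()
	for i := 0; i < 1000; i++ {
		n.notify() // a burst of events while the reader is busy
	}
	<-n.changed // the slow reader wakes up once for the whole burst
	fmt.Println("pending after one read:", len(n.changed))
}
```

The reader never falls behind by more than one notification, so the cost of a slow reconcile is bounded by the reconcile itself rather than by the number of events that arrived in the meantime.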
Index ¶
- Constants
- Variables
- func ByName(objs interface{}, target interface{})
- func HasOwnerReference(owner, other Object) bool
- func InCluster() bool
- func MergeUpdate(target *Unstructured, source *Unstructured)
- func NewRESTMapper(config *ConfigFlags) (meta.RESTMapper, discovery.CachedDiscoveryInterface, error)
- func SetOwnerReferences(owner Object, objects ...Object)
- type APIResource
- type Accumulator
- type Client
- func (c *Client) Create(ctx context.Context, resource interface{}, target interface{}) error
- func (c *Client) Delete(ctx context.Context, resource interface{}, target interface{}) error
- func (c *Client) DynamicInterface() dynamic.Interface
- func (c *Client) Get(ctx context.Context, resource interface{}, target interface{}) error
- func (c *Client) InvalidateCache()
- func (c *Client) List(ctx context.Context, query Query, target interface{}) error
- func (c *Client) Patch(ctx context.Context, resource interface{}, pt PatchType, data []byte, ...) error
- func (c *Client) PodLogs(ctx context.Context, pod *Pod, options *PodLogOptions, events chan<- LogEvent) error
- func (c *Client) ServerPreferredResources() ([]APIResource, error)
- func (c *Client) ServerResources() ([]APIResource, error)
- func (c *Client) ServerVersion() (*VersionInfo, error)
- func (c *Client) Update(ctx context.Context, resource interface{}, target interface{}) error
- func (c *Client) UpdateStatus(ctx context.Context, resource interface{}, target interface{}) error
- func (c *Client) Upsert(ctx context.Context, resource interface{}, source interface{}, ...) error
- func (c *Client) WaitFor(ctx context.Context, kindOrResource string)
- func (c *Client) Watch(ctx context.Context, queries ...Query) *Accumulator
- type ClientConfig
- type ClusterRole
- type ClusterRoleBinding
- type ConfigFlags
- type ConfigMap
- type Container
- type ContainerMetrics
- type CreateOptions
- type CustomResourceDefinition
- type DeleteOptions
- type Delta
- type DeltaType
- type Deployment
- type EndpointAddress
- type EndpointPort
- type EndpointSubset
- type Endpoints
- type EnvVar
- type Event
- type GetOptions
- type Ingress
- type IngressClass
- type IntOrString
- type LabelSelector
- type LabelSet
- type ListOptions
- type LocalObjectReference
- type LogEvent
- type Namespace
- type Node
- type NodeMetrics
- type Object
- type ObjectMeta
- type ObjectReference
- type PatchOptions
- type PatchType
- type PersistentVolumeClaim
- type PersistentVolumeClaimVolumeSource
- type Pod
- type PodCondition
- type PodLogOptions
- type PodMetrics
- type PodSpec
- type PodTemplateSpec
- type Protocol
- type Quantity
- type Query
- type ReplicaSet
- type ResourceList
- type ResourceRequirements
- type Role
- type RoleBinding
- type Secret
- type SecurityContext
- type Selector
- type Service
- type ServiceAccount
- type ServicePort
- type ServiceSpec
- type StatefulSet
- type Time
- type Toleration
- type TolerationOperator
- type TypeMeta
- type Unstructured
- type UpdateOptions
- type Validator
- type VersionInfo
- type Volume
- type VolumeMount
- type VolumeSource
Constants ¶
const NodeUnreachablePodReason = "NodeLost" // k8s.io/kubernetes/pkg/util/node.NodeUnreachablePodReason
const ResourceCPU = corev1.ResourceCPU
const ResourceMemory = corev1.ResourceMemory
const SecretTypeServiceAccountToken = corev1.SecretTypeServiceAccountToken
const SecretTypeTLS = corev1.SecretTypeTLS
Variables ¶
var (
    JSONPatchType           = types.JSONPatchType
    MergePatchType          = types.MergePatchType
    StrategicMergePatchType = types.StrategicMergePatchType
    ApplyPatchType          = types.ApplyPatchType
)
var ConditionTrue = xv1.ConditionTrue
var CoreConditionTrue = corev1.ConditionTrue
var Established = xv1.Established
var Int = intstr.Int
var IsConflict = apierrors.IsConflict
var IsNotFound = apierrors.IsNotFound
var MustParseQuantity = resource.MustParse
var NamesAccepted = xv1.NamesAccepted
var NamespaceAll = metav1.NamespaceAll
var NamespaceNone = metav1.NamespaceNone
var NewConfigFlags = genericclioptions.NewConfigFlags
var ParseSelector = labels.Parse
var PodFailed = corev1.PodFailed
var PodReady = corev1.PodReady
var PodSucceeded = corev1.PodSucceeded
var ProtocolSCTP = corev1.ProtocolSCTP
var ProtocolTCP = corev1.ProtocolTCP
var ProtocolUDP = corev1.ProtocolUDP
var ServiceTypeClusterIP = corev1.ServiceTypeClusterIP
var ServiceTypeLoadBalancer = corev1.ServiceTypeLoadBalancer
var TolerationOpEqual = corev1.TolerationOpEqual
var TolerationOpExists = corev1.TolerationOpExists
Functions ¶
func HasOwnerReference ¶
func HasOwnerReference(owner, other Object) bool
func InCluster ¶
func InCluster() bool
The InCluster function returns true if the process is running inside a kubernetes cluster, and false if it is running outside the cluster. This is determined by heuristics, but they are exactly the heuristics that client-go uses. The code is copied from client-go/tools/clientcmd/client_config.go because it is not publicly invocable in its original place, and it should be re-copied if the original code changes.
func MergeUpdate ¶
func MergeUpdate(target *Unstructured, source *Unstructured)
func NewRESTMapper ¶
func NewRESTMapper(config *ConfigFlags) (meta.RESTMapper, discovery.CachedDiscoveryInterface, error)
func SetOwnerReferences ¶
func SetOwnerReferences(owner Object, objects ...Object)
Types ¶
type APIResource ¶
type APIResource = metav1.APIResource
type Accumulator ¶
type Accumulator struct {
// contains filtered or unexported fields
}
The Accumulator struct is used to efficiently maintain an in-memory copy of kubernetes resources present in a cluster in a form that is easy for business logic to process. It functions as a bridge between delta-based kubernetes watches on individual Kinds and the complete/consistent set of objects on which business logic needs to operate. In that sense it accumulates both multiple kinds of kubernetes resources into a single snapshot, as well as accumulating deltas on individual objects into relevant sets of objects.
The Goals/Requirements below are based heavily on the needs of Ambassador as they have evolved over the years. The canonical deployment/replicaset controller examples typically operate on a single resource and render it into another (deployment -> N replicasets, replicaset -> N pods). Ambassador's controller logic has some additional requirements:
Complete knowledge of resources in a cluster. Because many thousands of Mappings are ultimately assembled into a single envoy configuration responsible for ingress into the cluster, the consequences of producing an envoy configuration when you know about only half of those Mappings are catastrophic (you are black-holing half your traffic).
Complete knowledge of multiple resources. Instead of having one self-contained input like a deployment or a replicaset, Ambassador's business logic has many inputs, and the consequence of producing an envoy configuration without knowledge of *all* of those inputs is equally catastrophic, e.g. it's no use knowing about all the Mappings if you don't know about any of the Hosts yet.
Goals/Requirements:
Bootstrap of a single Kind: the Accumulator will ensure that all pre-existing resources of that Kind have been loaded into memory prior to triggering any notifications. This guarantees we will never trigger business logic on an egregiously incomplete view of the cluster (e.g. when 500 out of 1000 Mappings have been loaded) and makes it safe for the business logic to assume complete knowledge of the cluster.
When multiple Kinds are needed by a controller, the Accumulator will not notify the controller until all the Kinds have been fully bootstrapped.
Graceful load shedding: When the rate of change of resources is very fast, the API and implementation are structured so that individual object deltas get coalesced into a single snapshot update. This prevents excessively triggering business logic to process an entire snapshot for each individual object change that occurs.
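The multi-Kind bootstrap requirement amounts to a gate that opens only once every Kind has finished its initial List. The following is a minimal sketch of such a gate with hypothetical names. It is an assumption about the shape of the logic, not the Accumulator's implementation.

```go
package main

import "fmt"

// bootstrap tracks which kinds have not yet finished their initial List.
type bootstrap struct {
	pending map[string]bool
}

func newBootstrap(kinds ...string) *bootstrap {
	b := &bootstrap{pending: map[string]bool{}}
	for _, k := range kinds {
		b.pending[k] = true
	}
	return b
}

// listed marks kind as fully loaded and reports whether every kind is now
// loaded, i.e. whether it is safe to notify the controller.
func (b *bootstrap) listed(kind string) bool {
	delete(b.pending, kind)
	return len(b.pending) == 0
}

func main() {
	b := newBootstrap("Mapping", "Host")
	fmt.Println(b.listed("Mapping")) // false: Hosts are still loading
	fmt.Println(b.listed("Host"))    // true: the snapshot is complete
}
```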
func (*Accumulator) Changed ¶
func (a *Accumulator) Changed() chan struct{}
func (*Accumulator) FilteredUpdate ¶
func (a *Accumulator) FilteredUpdate(target interface{}, deltas *[]*Delta, predicate func(*Unstructured) bool) bool
The FilteredUpdate method updates the target snapshot with only those resources for which "predicate" returns true. The predicate is called only when objects are added or updated; it is not called again for objects that have not changed. The predicate must not modify its argument.
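The calling pattern described for the predicate can be sketched by caching its verdict per object version. This is an illustration with hypothetical names and a toy resource type. The real method operates on *Unstructured values, and its internals may differ.

```go
package main

import "fmt"

// resource is a toy object; Version changes whenever the object changes.
type resource struct {
	Name    string
	Version int
}

// filter remembers the predicate's verdict per object version so that the
// predicate runs only for objects that were added or updated.
type filter struct {
	predicate func(resource) bool
	seen      map[string]int  // name -> version last evaluated
	keep      map[string]bool // name -> cached verdict
}

func newFilter(p func(resource) bool) *filter {
	return &filter{predicate: p, seen: map[string]int{}, keep: map[string]bool{}}
}

// apply returns the resources accepted by the predicate.
func (f *filter) apply(all []resource) []resource {
	var out []resource
	for _, r := range all {
		if v, ok := f.seen[r.Name]; !ok || v != r.Version {
			f.seen[r.Name] = r.Version
			f.keep[r.Name] = f.predicate(r)
		}
		if f.keep[r.Name] {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	calls := 0
	f := newFilter(func(r resource) bool { calls++; return r.Name != "skip" })
	objs := []resource{{"a", 1}, {"skip", 1}}
	f.apply(objs)
	objs[0].Version = 2 // only "a" changes
	kept := f.apply(objs)
	fmt.Println(len(kept), calls) // 1 kept, 3 predicate calls in total
}
```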
func (*Accumulator) Update ¶
func (a *Accumulator) Update(target interface{}) bool
func (*Accumulator) UpdateWithDeltas ¶
func (a *Accumulator) UpdateWithDeltas(target interface{}, deltas *[]*Delta) bool
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
The Client struct provides an interface to interact with the kubernetes api-server. You can think of it like a programmatic version of the familiar kubectl command line tool. In fact, a goal of these APIs is that, where possible, your knowledge of kubectl should translate well into using these APIs. It provides a golang-friendly way to perform basic CRUD and Watch operations on kubernetes resources, as well as providing some additional capabilities.
Differences from kubectl:
- You can also use a Client to update the status of a resource.
- The Client struct cannot perform an apply operation.
- The Client provides Read/write coherence (more about this below).
- The Client provides load shedding via event coalescing for watches.
- The Client provides bootstrapping of multiple watches.
The biggest difference from kubectl (and also from using client-go directly) is the Read/Write coherence it provides. Kubernetes Watches are inherently asynchronous. This means that if a kubernetes resource is modified at time T0, a client won't find out about it until some later time T1. It is normally difficult to notice this since the delay may be quite small. However, if you are writing a controller that uses watches in combination with modifying the resources it is watching, the delay is big enough that a program will often be "notified" with versions of resources that do not include updates made by the program itself. This even happens when a program has a lock and is guaranteed to be the only process modifying a given resource. Needless to say, programming against an API like this can make for some brain-twisting logic. The Client struct allows for much simpler code by tracking what changes have been made locally and updating all Watch results with the most recent version of an object, thereby providing the guarantee that your Watch results will *always* include any changes you have made via the Client performing the watch.
Additionally, the Accumulator API provides two key pieces of watch related functionality:
By coalescing multiple updates behind the scenes, the Accumulator API provides a natural form of load shedding if a user of the API cannot keep up with every single update.
The Accumulator API is guaranteed to bootstrap (i.e. perform an initial List operation) on all watches prior to notifying the user that resources are available to process.
func NewClient ¶
func NewClient(options ClientConfig) (*Client, error)
The NewClient function constructs a new client with the supplied ClientConfig.
func NewClientFromConfigFlags ¶
func NewClientFromConfigFlags(config *ConfigFlags) (*Client, error)
func (*Client) DynamicInterface ¶
func (c *Client) DynamicInterface() dynamic.Interface
DynamicInterface is an accessor method for the k8s dynamic client.
func (*Client) InvalidateCache ¶
func (c *Client) InvalidateCache()
func (*Client) PodLogs ¶
func (c *Client) PodLogs(ctx context.Context, pod *Pod, options *PodLogOptions, events chan<- LogEvent) error
The PodLogs method accesses the log output of a given pod by sending LogEvent structs down the supplied channel. The LogEvent struct is designed to hold enough information that it is feasible to invoke PodLogs multiple times with a single channel in order to multiplex log output from many pods, e.g.:
    events := make(chan LogEvent)
    client.PodLogs(ctx, pod1, options, events)
    client.PodLogs(ctx, pod2, options, events)
    client.PodLogs(ctx, pod3, options, events)
    for event := range events {
        fmt.Printf("%s: %s: %s", event.PodID, event.Timestamp, event.Output)
    }
The above code will print log output from all 3 pods.
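The multiplexing pattern can be tried without a cluster. In the sketch below, streamLogs is a hypothetical stand-in for PodLogs and logEvent mirrors only a few LogEvent fields. The point is the fan-in: every event carries its pod identity, and the consumer's range loop ends once all streams have reported Closed and the channel is closed.

```go
package main

import (
	"fmt"
	"sync"
)

// logEvent mirrors the PodID, Output, and Closed fields of LogEvent.
type logEvent struct {
	PodID  string
	Output string
	Closed bool
}

// streamLogs is a hypothetical stand-in for PodLogs: it sends each line to the
// shared channel, tagged with the pod it came from, then a final Closed event.
func streamLogs(podID string, lines []string, events chan<- logEvent, wg *sync.WaitGroup) {
	go func() {
		defer wg.Done()
		for _, line := range lines {
			events <- logEvent{PodID: podID, Output: line}
		}
		events <- logEvent{PodID: podID, Closed: true}
	}()
}

// collect runs one stream per pod and gathers the output by pod.
func collect(pods map[string][]string) map[string][]string {
	events := make(chan logEvent)
	var wg sync.WaitGroup
	wg.Add(len(pods))
	for id, lines := range pods {
		streamLogs(id, lines, events, &wg)
	}
	// Close the channel once every stream is done so the range loop ends.
	go func() { wg.Wait(); close(events) }()

	out := map[string][]string{}
	for ev := range events {
		if !ev.Closed {
			out[ev.PodID] = append(out[ev.PodID], ev.Output)
		}
	}
	return out
}

func main() {
	got := collect(map[string][]string{"pod1": {"a", "b"}, "pod2": {"c"}})
	fmt.Println(len(got["pod1"]), len(got["pod2"]))
}
```

Lines from different pods may interleave in any order, but lines from a single pod keep their relative order because each stream sends sequentially.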
func (*Client) ServerPreferredResources ¶
func (c *Client) ServerPreferredResources() ([]APIResource, error)
ServerPreferredResources returns the list of resource types that the server supports.
If a resource type supports multiple versions, then *only* the preferred version is returned. Use ServerResources to return a list that includes all versions.
func (*Client) ServerResources ¶
func (c *Client) ServerResources() ([]APIResource, error)
ServerResources returns the list of resource types that the server supports.
If a resource type supports multiple versions, then a list entry for *each* version is returned. Use ServerPreferredResources to return a list that includes just the preferred version.
func (*Client) ServerVersion ¶
func (c *Client) ServerVersion() (*VersionInfo, error)
The ServerVersion() method returns a struct with information about the kubernetes api-server version.
func (*Client) UpdateStatus ¶
func (c *Client) UpdateStatus(ctx context.Context, resource interface{}, target interface{}) error
type ClientConfig ¶
The ClientConfig struct holds all the parameters and configuration that can be passed upon construction of a new Client.
type ClusterRole ¶
type ClusterRole = rbacv1.ClusterRole
type ClusterRoleBinding ¶
type ClusterRoleBinding = rbacv1.ClusterRoleBinding
type ConfigFlags ¶
type ConfigFlags = genericclioptions.ConfigFlags
type ContainerMetrics ¶
type ContainerMetrics = metrics.ContainerMetrics
type CreateOptions ¶
type CreateOptions = metav1.CreateOptions
type CustomResourceDefinition ¶
type CustomResourceDefinition = xv1.CustomResourceDefinition
type DeleteOptions ¶
type DeleteOptions = metav1.DeleteOptions
type Delta ¶
type Delta struct {
    TypeMeta   `json:""`
    ObjectMeta `json:"metadata,omitempty"`
    DeltaType  DeltaType `json:"deltaType"`
}
func NewDelta ¶
func NewDelta(deltaType DeltaType, obj *Unstructured) *Delta
func NewDeltaFromObject ¶
type DeltaType ¶
type DeltaType int
func (DeltaType) MarshalJSON ¶
func (*DeltaType) UnmarshalJSON ¶
type Deployment ¶
type Deployment = appsv1.Deployment
type EndpointAddress ¶
type EndpointAddress = corev1.EndpointAddress
type EndpointPort ¶
type EndpointPort = corev1.EndpointPort
type EndpointSubset ¶
type EndpointSubset = corev1.EndpointSubset
type GetOptions ¶
type GetOptions = metav1.GetOptions
type Ingress ¶
type Ingress = netv1beta1.Ingress
type IngressClass ¶
type IngressClass = netv1beta1.IngressClass
type IntOrString ¶
type IntOrString = intstr.IntOrString
type LabelSelector ¶
type LabelSelector = metav1.LabelSelector
type ListOptions ¶
type ListOptions = metav1.ListOptions
type LocalObjectReference ¶
type LocalObjectReference = corev1.LocalObjectReference
type LogEvent ¶
type LogEvent struct {
    // The PodID field indicates what pod the log output is associated with.
    PodID string `json:"podID"`
    // The Timestamp field indicates when the log output was created.
    Timestamp string `json:"timestamp"`
    // The Output field contains log output from the pod.
    Output string `json:"output,omitempty"`
    // The Closed field is true if the supply of log events from the given pod was terminated. This does not
    // necessarily mean there is no more log data.
    Closed bool
    // The Error field contains error information if the log events were terminated due to an error.
    Error error `json:"error,omitempty"`
}
The LogEvent struct is used to communicate log output from a pod. It includes PodID and Timestamp information so that LogEvents from different pods can be interleaved without losing information about total ordering and/or pod identity.
type NodeMetrics ¶
type NodeMetrics = metrics.NodeMetrics
type Object ¶
func NewObjectFromUnstructured ¶
func NewObjectFromUnstructured(unstructured *Unstructured) (Object, error)
NewObjectFromUnstructured will construct a new specialized object based on the runtime schema ambassador uses. This guarantees any types defined by or used by ambassador will be constructed as the proper golang type.
func ParseManifests ¶
type ObjectMeta ¶
type ObjectMeta = metav1.ObjectMeta
type ObjectReference ¶
type ObjectReference = corev1.ObjectReference
type PatchOptions ¶
type PatchOptions = metav1.PatchOptions
type PersistentVolumeClaim ¶
type PersistentVolumeClaim = corev1.PersistentVolumeClaim
type PersistentVolumeClaimVolumeSource ¶
type PersistentVolumeClaimVolumeSource = corev1.PersistentVolumeClaimVolumeSource
type PodCondition ¶
type PodCondition = corev1.PodCondition
type PodLogOptions ¶
type PodLogOptions = corev1.PodLogOptions
type PodMetrics ¶
type PodMetrics = metrics.PodMetrics
type PodTemplateSpec ¶
type PodTemplateSpec = corev1.PodTemplateSpec
type Query ¶
type Query struct {
    // The Name field holds the name of the Query. This is used by
    // Watch to determine how multiple queries are unmarshaled by
    // Accumulator.Update(). This is ignored for List.
    Name string
    // The Kind field indicates what sort of resource is being queried.
    Kind string
    // The Namespace field holds the namespace to Query.
    Namespace string
    // The FieldSelector field holds a string in selector syntax
    // that is used to filter results based on field values. The
    // only field values supported are metadata.name and
    // metadata.namespace. This is only supported for List.
    FieldSelector string
    // The LabelSelector field holds a string in selector syntax
    // that is used to filter results based on label values.
    LabelSelector string
}
A Query holds all the information needed to List or Watch a set of kubernetes resources.
type ReplicaSet ¶
type ReplicaSet = appsv1.ReplicaSet
type ResourceList ¶
type ResourceList = corev1.ResourceList
type ResourceRequirements ¶
type ResourceRequirements = corev1.ResourceRequirements
type RoleBinding ¶
type RoleBinding = rbacv1.RoleBinding
type SecurityContext ¶
type SecurityContext = corev1.SecurityContext
type ServiceAccount ¶
type ServiceAccount = corev1.ServiceAccount
type ServicePort ¶
type ServicePort = corev1.ServicePort
type ServiceSpec ¶
type ServiceSpec = corev1.ServiceSpec
type StatefulSet ¶
type StatefulSet = appsv1.StatefulSet
type Toleration ¶
type Toleration = corev1.Toleration
type TolerationOperator ¶
type TolerationOperator = corev1.TolerationOperator
type Unstructured ¶
type Unstructured = unstructured.Unstructured
func NewUnstructured ¶
func NewUnstructured(kind, version string) *Unstructured
func NewUnstructuredFromObject ¶
func NewUnstructuredFromObject(obj Object) (result *Unstructured, err error)
Convert a potentially typed Object to an *Unstructured object.
type UpdateOptions ¶
type UpdateOptions = metav1.UpdateOptions
type Validator ¶
type Validator struct {
// contains filtered or unexported fields
}
A Validator may be used in concert with a Client to perform validation of freeform jsonish data structures as kubernetes CRDs.
func NewValidator ¶
The NewValidator constructor returns a *Validator that uses the provided *Client to fetch CustomResourceDefinitions from kubernetes on demand as needed to validate data passed to the Validator.Validate() method.
func (*Validator) Validate ¶
The Validate method validates the supplied jsonish object as a kubernetes CRD instance.
If the supplied object is *not* a CRD instance but instead a regular kubernetes instance, the Validate method will assume that the supplied object is valid.
If the supplied object is not a valid kubernetes resource at all, the Validate method will return an error.
Typically the Validate method will perform only local operations. However, the first time an instance of a given Kind is supplied, the Validator needs to query the cluster to figure out if it is a CRD and, if so, to fetch the schema needed to perform validation. All subsequent Validate() calls for that Kind will be local.
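The lookup-once-per-Kind behavior described above can be sketched as a small cache. Everything in this example is hypothetical, including the schema type and the fetch callback that stands in for a cluster query. It illustrates the caching pattern only and makes no claim about how the Validator stores or applies CRD schemas.

```go
package main

import (
	"errors"
	"fmt"
)

// schema is a hypothetical validation function for one Kind; nil means the
// Kind is not a CRD and every instance is accepted.
type schema func(obj map[string]interface{}) error

// validator caches one schema lookup per Kind.
type validator struct {
	fetch  func(kind string) schema // stands in for a query against the cluster
	byKind map[string]schema
}

func newValidator(fetch func(string) schema) *validator {
	return &validator{fetch: fetch, byKind: map[string]schema{}}
}

func (v *validator) validate(obj map[string]interface{}) error {
	kind, ok := obj["kind"].(string)
	if !ok || kind == "" {
		return errors.New("not a kubernetes resource: missing kind")
	}
	s, cached := v.byKind[kind]
	if !cached {
		s = v.fetch(kind) // only the first instance of a Kind reaches the cluster
		v.byKind[kind] = s
	}
	if s == nil {
		return nil // regular (non-CRD) resource: assumed valid
	}
	return s(obj)
}

func main() {
	fetches := 0
	v := newValidator(func(kind string) schema {
		fetches++
		if kind != "Mapping" {
			return nil
		}
		return func(obj map[string]interface{}) error {
			if _, ok := obj["spec"]; !ok {
				return errors.New("spec is required")
			}
			return nil
		}
	})
	fmt.Println(v.validate(map[string]interface{}{"kind": "Mapping"}))
	fmt.Println(v.validate(map[string]interface{}{"kind": "Mapping", "spec": map[string]interface{}{}}))
	fmt.Println("cluster lookups:", fetches)
}
```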
type VersionInfo ¶
type VolumeMount ¶
type VolumeMount = corev1.VolumeMount
type VolumeSource ¶
type VolumeSource = corev1.VolumeSource