kates

package
v2.0.2-ea Latest
This package is not in the latest version of its module.

Published: Aug 23, 2021 License: Apache-2.0 Imports: 55 Imported by: 1

Documentation

Overview

The kates package is a library for writing kubernetes extensions. The library provides a number of capabilities:

  • Consistent bootstrap of multiple resources
  • Graceful Load Shedding via Coalesced Events
  • Read/write coherence
  • Grouping
  • Works well with typed (e.g. corev1.Pod) and untyped (e.g. map[string]interface{}) representations of k8s resources.

It does not provide codegen or admission controllers; for those, use kubebuilder.

Comparison to other libraries:

  • higher level, simpler, and more idiomatic than client-go
  • lower level (and more flexible) than operator-sdk or kubebuilder

Constructing a Client

The primary entrypoint for the kates library is the Client type. A Client is constructed by passing in the ClientConfig struct with a path to a kubeconfig file:

client, err := NewClient(ClientConfig{Kubeconfig: "path/to/kubeconfig"}) // or NewClient(ClientConfig{}) for defaults

Creating, Modifying, and Deleting Resources

A client can be used to Create, Update, and/or Delete any kubernetes resource. Each of the "CRUD" methods will, upon success, store an updated copy of the resource into the object referenced by the last argument. This will typically be different than the value you supplied if e.g. the server defaults fields, updates the resource version, assigns UIDs, etc.

var result kates.Pod
err = client.Create(ctx, &kates.Pod{...}, &result)
err = client.Update(ctx, result, &result)
err = client.UpdateStatus(ctx, result, &result)
err = client.Delete(ctx, result, &result)

You can pass both typed and untyped values into the APIs. The only requirement is that the values you pass will json.Marshal to and json.Unmarshal from something that looks like a kubernetes resource:

pod := kates.Pod{...}
err := client.Create(ctx, &pod, &pod)
// -or-
pod := map[string]interface{}{"kind": "Pod", ...}
err := client.Create(ctx, &pod, &pod)
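
The JSON requirement above can be illustrated without a cluster. The following is a minimal sketch using only encoding/json; the miniPod type and the toUntyped helper are hypothetical stand-ins invented for this example and are not part of kates. It shows that a typed value and an untyped map end up as the same generic shape once they pass through JSON:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// miniPod is a hypothetical, heavily trimmed stand-in for a typed resource
// such as kates.Pod; it is not the real type.
type miniPod struct {
	APIVersion string            `json:"apiVersion"`
	Kind       string            `json:"kind"`
	Metadata   map[string]string `json:"metadata"`
}

// toUntyped round-trips any value through JSON into a generic map. This is
// an illustrative helper, not a kates function.
func toUntyped(v interface{}) (map[string]interface{}, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	var out map[string]interface{}
	if err := json.Unmarshal(raw, &out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	typed := miniPod{APIVersion: "v1", Kind: "Pod", Metadata: map[string]string{"name": "web"}}
	untyped := map[string]interface{}{
		"apiVersion": "v1",
		"kind":       "Pod",
		"metadata":   map[string]interface{}{"name": "web"},
	}

	a, err := toUntyped(typed)
	if err != nil {
		panic(err)
	}
	b, err := toUntyped(untyped)
	if err != nil {
		panic(err)
	}
	// Both representations carry the same kind after the round trip.
	fmt.Println(a["kind"], b["kind"])
}
```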

Watching Resources

The client can be used to watch sets of multiple related resources. This is accomplished via the Accumulator type. An accumulator tracks events coming from the API server for the indicated resources, and merges those events with any locally initiated changes made via the client in order to maintain a snapshot that is coherent.

You can construct an Accumulator by invoking the Client's Watch method:

accumulator := client.Watch(ctx,
                            Query{Name: "Services", Kind: "svc"},
                            Query{Name: "Deployments", Kind: "deploy"})

The Accumulator will bootstrap a complete list of each supplied Query, and then provide continuing notifications if any of the resources change. Notifications that the initial bootstrap is complete as well as notifications of any subsequent changes are indicated by sending an empty struct down the Accumulator.Changed() channel:

<-accumulator.Changed() // Wait until all Queries have been initialized.

The Accumulator provides access to the values it tracks via the Accumulator.Update(&snapshot) method. The Update() method expects a pointer to a snapshot that is defined by the caller. The caller must supply a pointer to a struct with fields that match the names of the Query structs used to create the Accumulator. The types of the snapshot fields are free to be anything that will json.Unmarshal from a slice of kubernetes resources:

// initialize an empty snapshot
snapshot := struct {
    Services    []*kates.Service
    Deployments []*kates.Deployment
}{}

accumulator.Update(&snapshot)
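
The link between Query names and snapshot field names can be sketched with encoding/json alone. The resource type and the fillSnapshot helper below are hypothetical and exist only for this illustration; the sketch assumes that lists keyed by query name are unmarshaled into the caller's struct, which is how the text above describes the behavior, and makes no claim about the Accumulator's internals:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// resource is a hypothetical minimal typed resource used only in this sketch.
type resource struct {
	Kind     string `json:"kind"`
	Metadata struct {
		Name string `json:"name"`
	} `json:"metadata"`
}

// fillSnapshot copies lists keyed by query name into target, a pointer to a
// struct whose field names match those keys.
func fillSnapshot(byQueryName map[string][]map[string]interface{}, target interface{}) error {
	raw, err := json.Marshal(byQueryName)
	if err != nil {
		return err
	}
	return json.Unmarshal(raw, target)
}

func main() {
	tracked := map[string][]map[string]interface{}{
		"Services": {
			{"kind": "Service", "metadata": map[string]interface{}{"name": "api"}},
		},
		"Deployments": {
			{"kind": "Deployment", "metadata": map[string]interface{}{"name": "api"}},
			{"kind": "Deployment", "metadata": map[string]interface{}{"name": "worker"}},
		},
	}

	// Field names match the keys above, just as snapshot fields match Query names.
	snapshot := struct {
		Services    []*resource
		Deployments []*resource
	}{}
	if err := fillSnapshot(tracked, &snapshot); err != nil {
		panic(err)
	}
	fmt.Println(len(snapshot.Services), len(snapshot.Deployments))
}
```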

The first call to Update will populate the snapshot with the bootstrapped values. At this point any startup logic can be performed with confidence that the snapshot represents a complete and recent view of cluster state:

// perform any startup logic
...

The same APIs can then be used to watch for and reconcile subsequent changes:

// reconcile ongoing changes
for {
    select {
    case <-accumulator.Changed():
        wasChanged := accumulator.Update(&snapshot)
        if wasChanged {
            reconcile(snapshot)
        }
    case <-ctx.Done():
        return // a bare break here would only exit the select, not the for loop
    }
}

The Accumulator will provide read/write coherence for any changes made using the client from which the Accumulator was created. This means that any snapshot produced by the Accumulator is guaranteed to include all the Create, Update, UpdateStatus, and/or Delete operations that were performed using the client. (The underlying client-go CRUD and Watch operations do not provide this guarantee, so a straightforward reconcile operation will often end up creating duplicate objects and/or performing updates on stale versions.)
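
One way to picture read/write coherence is as an overlay of local writes on top of watched state. The sketch below is a deliberately simplified model, not the kates implementation: the object and coherentStore types are hypothetical, resource versions are treated as opaque strings that are only compared for equality, and deletes are ignored.

```go
package main

import "fmt"

// object is a hypothetical minimal resource: a name, an opaque resource
// version, and a payload.
type object struct {
	Name            string
	ResourceVersion string
	Value           string
}

// coherentStore overlays locally written objects on top of watched objects.
type coherentStore struct {
	watched map[string]object // latest state delivered by the watch
	pending map[string]object // local writes the watch has not yet echoed back
}

func newCoherentStore() *coherentStore {
	return &coherentStore{watched: map[string]object{}, pending: map[string]object{}}
}

// LocalWrite records the server's response to a write made through the client.
func (s *coherentStore) LocalWrite(o object) { s.pending[o.Name] = o }

// WatchEvent records an object delivered by the watch. Once the watch echoes
// back the version that was written locally, the overlay entry is dropped.
func (s *coherentStore) WatchEvent(o object) {
	s.watched[o.Name] = o
	if p, ok := s.pending[o.Name]; ok && p.ResourceVersion == o.ResourceVersion {
		delete(s.pending, o.Name)
	}
}

// Snapshot returns the watched state with pending local writes applied on top.
func (s *coherentStore) Snapshot() map[string]object {
	out := make(map[string]object, len(s.watched))
	for k, v := range s.watched {
		out[k] = v
	}
	for k, v := range s.pending {
		out[k] = v
	}
	return out
}

func main() {
	s := newCoherentStore()
	s.WatchEvent(object{Name: "cfg", ResourceVersion: "1", Value: "old"})
	s.LocalWrite(object{Name: "cfg", ResourceVersion: "2", Value: "new"})
	// A stale event arrives after the local write; the snapshot still shows the write.
	s.WatchEvent(object{Name: "cfg", ResourceVersion: "1", Value: "old"})
	fmt.Println(s.Snapshot()["cfg"].Value)
}
```

A real implementation would also have to handle the watch skipping past the locally written version, as well as deletes; this sketch omits both.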

Event Coalescing for Graceful Load Shedding

The Accumulator automatically coalesces events behind the scenes in order to facilitate graceful load shedding for situations where the reconcile operation takes a long time relative to the number of incoming events. This allows the Accumulator.Update(&snapshot) method to provide a guarantee that when it returns the snapshot will contain the most recent view of cluster state regardless of how slowly and infrequently we read from the Accumulator.Changed() channel:

snapshot := Snapshot{}
for {
    <-accumulator.Changed()
    wasChanged := accumulator.Update(&snapshot) // Guaranteed to return the most recent view of cluster state.
    if wasChanged {
        slowReconcile(&snapshot)
    }
}
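
A common Go idiom for this kind of coalescing is a notification channel of capacity one combined with a non-blocking send. The sketch below shows that idiom in isolation; the coalescer type is hypothetical, an int stands in for the full snapshot, and nothing here is claimed to match how the Accumulator is actually implemented:

```go
package main

import (
	"fmt"
	"sync"
)

// coalescer keeps the latest state plus a capacity-1 notification channel.
type coalescer struct {
	mu      sync.Mutex
	latest  int // stands in for the full snapshot
	changed chan struct{}
}

func newCoalescer() *coalescer {
	return &coalescer{changed: make(chan struct{}, 1)}
}

// Publish stores the newest state and signals the reader without blocking.
// If a signal is already pending, the new event is folded into it.
func (c *coalescer) Publish(v int) {
	c.mu.Lock()
	c.latest = v
	c.mu.Unlock()
	select {
	case c.changed <- struct{}{}:
	default: // a notification is already queued; nothing more to do
	}
}

// Changed returns the notification channel.
func (c *coalescer) Changed() <-chan struct{} { return c.changed }

// Latest returns the most recent state, however many events produced it.
func (c *coalescer) Latest() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.latest
}

func main() {
	c := newCoalescer()
	for i := 1; i <= 1000; i++ { // a burst of events while the reader is busy
		c.Publish(i)
	}
	<-c.Changed() // a single wake-up covers the whole burst
	fmt.Println(c.Latest())
}
```

Because the reader always fetches the latest state after waking, it never processes a backlog of intermediate events, no matter how slowly it drains the channel.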

Constants

const NodeUnreachablePodReason = "NodeLost" // k8s.io/kubernetes/pkg/util/node.NodeUnreachablePodReason
const ResourceCPU = corev1.ResourceCPU
const ResourceMemory = corev1.ResourceMemory
const SecretTypeServiceAccountToken = corev1.SecretTypeServiceAccountToken
const SecretTypeTLS = corev1.SecretTypeTLS

Variables

var (
	JSONPatchType           = types.JSONPatchType
	MergePatchType          = types.MergePatchType
	StrategicMergePatchType = types.StrategicMergePatchType
	ApplyPatchType          = types.ApplyPatchType
)
var ConditionTrue = xv1.ConditionTrue
var CoreConditionTrue = corev1.ConditionTrue
var Established = xv1.Established
var Int = intstr.Int
var IsConflict = apierrors.IsConflict
var IsNotFound = apierrors.IsNotFound
var MustParseQuantity = resource.MustParse
var NamesAccepted = xv1.NamesAccepted
var NamespaceAll = metav1.NamespaceAll
var NamespaceNone = metav1.NamespaceNone
var ParseSelector = labels.Parse
var PodFailed = corev1.PodFailed
var PodReady = corev1.PodReady
var PodSucceeded = corev1.PodSucceeded
var ProtocolSCTP = corev1.ProtocolSCTP
var ProtocolTCP = corev1.ProtocolTCP
var ProtocolUDP = corev1.ProtocolUDP
var ServiceTypeClusterIP = corev1.ServiceTypeClusterIP
var ServiceTypeLoadBalancer = corev1.ServiceTypeLoadBalancer
var TolerationOpEqual = corev1.TolerationOpEqual
var TolerationOpExists = corev1.TolerationOpExists

Functions

func ByName

func ByName(objs interface{}, target interface{})

func HasOwnerReference

func HasOwnerReference(owner, other Object) bool

func InCluster

func InCluster() bool

The InCluster function returns true if the process is running inside a kubernetes cluster, and false if it is running outside the cluster. This is determined by heuristics; however, they are the exact same heuristics that client-go uses. The implementation is copied from client-go/tools/clientcmd/client_config.go, as it is not publicly invocable in its original place. It should be re-copied if the original code changes.

func MergeUpdate

func MergeUpdate(target *Unstructured, source *Unstructured)

func SetOwnerReferences

func SetOwnerReferences(owner Object, objects ...Object)

Types

type APIResource

type APIResource = metav1.APIResource

type Accumulator

type Accumulator struct {
	// contains filtered or unexported fields
}

The Accumulator struct is used to efficiently maintain an in-memory copy of kubernetes resources present in a cluster in a form that is easy for business logic to process. It functions as a bridge between delta-based kubernetes watches on individual Kinds and the complete/consistent set of objects on which business logic needs to operate. In that sense it accumulates both multiple kinds of kubernetes resources into a single snapshot, as well as accumulating deltas on individual objects into relevant sets of objects.

The Goals/Requirements below are based heavily on the needs of Ambassador as they have evolved over the years. A lot of this comes down to the fact that unlike the exemplary deployment/replicaset controller examples which typically operate on a single resource and render it into another (deployment -> N replicasets, replicaset -> N pods), Ambassador's controller logic has some additional requirements:

  1. Complete knowledge of resources in a cluster. Because many thousands of Mappings are ultimately assembled into a single envoy configuration responsible for ingress into the cluster, the consequences of producing an envoy configuration when you e.g. know about only half of those Mappings are catastrophic (you are black-holing half your traffic).

  2. Complete knowledge of multiple resources. Instead of having one self-contained input like a deployment or a replicaset, Ambassador's business logic has many inputs, and the consequence of producing an envoy configuration without knowledge of *all* of those inputs is equally catastrophic, e.g. it's no use knowing about all the Mappings if you don't know about any of the Hosts yet.

Goals/Requirements:

  1. Bootstrap of a single Kind: the Accumulator will ensure that all pre-existing resources of that Kind have been loaded into memory prior to triggering any notifications. This guarantees we will never trigger business logic on an egregiously incomplete view of the cluster (e.g. when 500 out of 1000 Mappings have been loaded) and makes it safe for the business logic to assume complete knowledge of the cluster.

  2. When multiple Kinds are needed by a controller, the Accumulator will not notify the controller until all the Kinds have been fully bootstrapped.

  3. Graceful load shedding: When the rate of change of resources is very fast, the API and implementation are structured so that individual object deltas get coalesced into a single snapshot update. This prevents excessively triggering business logic to process an entire snapshot for each individual object change that occurs.
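
The first two requirements amount to a gate that opens only once every Kind has completed its initial list. A minimal sketch of such a gate follows; the bootstrapGate type and its methods are hypothetical names for this illustration, and the sketch is not safe for concurrent use without additional locking:

```go
package main

import "fmt"

// bootstrapGate tracks which kinds have not yet finished their initial list.
type bootstrapGate struct {
	pending map[string]bool
}

func newBootstrapGate(kinds ...string) *bootstrapGate {
	g := &bootstrapGate{pending: make(map[string]bool)}
	for _, k := range kinds {
		g.pending[k] = true
	}
	return g
}

// MarkListed records that the initial list for kind is complete and reports
// whether every kind is now bootstrapped.
func (g *bootstrapGate) MarkListed(kind string) bool {
	delete(g.pending, kind)
	return g.Ready()
}

// Ready reports whether all kinds have completed their initial list.
func (g *bootstrapGate) Ready() bool { return len(g.pending) == 0 }

func main() {
	gate := newBootstrapGate("Mapping", "Host")
	fmt.Println(gate.MarkListed("Mapping")) // Hosts are still loading, so no notification yet
	fmt.Println(gate.MarkListed("Host"))    // every kind is loaded, so the controller may be notified
}
```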

func (*Accumulator) Changed

func (a *Accumulator) Changed() chan struct{}

func (*Accumulator) FilteredUpdate

func (a *Accumulator) FilteredUpdate(target interface{}, deltas *[]*Delta, predicate func(*Unstructured) bool) bool

The FilteredUpdate method updates the target snapshot with only those resources for which "predicate" returns true. The predicate is only called when objects are added or updated; it is not repeatedly called on objects that have not changed. The predicate must not modify its argument.
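
As an illustration of a predicate that honors the "must not modify its argument" rule, the sketch below filters objects by label. The real predicate receives a *Unstructured; this example uses a plain map[string]interface{} as a stand-in so that it runs without any dependencies, and the hasLabel and filter helpers are hypothetical names:

```go
package main

import "fmt"

// hasLabel returns a predicate that reports whether an object carries the
// given label. It only reads from obj and never mutates it.
func hasLabel(key, value string) func(obj map[string]interface{}) bool {
	return func(obj map[string]interface{}) bool {
		meta, ok := obj["metadata"].(map[string]interface{})
		if !ok {
			return false
		}
		labels, ok := meta["labels"].(map[string]interface{})
		if !ok {
			return false
		}
		return labels[key] == value
	}
}

// filter keeps only the objects for which predicate returns true.
func filter(objs []map[string]interface{}, predicate func(map[string]interface{}) bool) []map[string]interface{} {
	var kept []map[string]interface{}
	for _, o := range objs {
		if predicate(o) {
			kept = append(kept, o)
		}
	}
	return kept
}

func main() {
	objs := []map[string]interface{}{
		{"metadata": map[string]interface{}{"name": "a", "labels": map[string]interface{}{"team": "edge"}}},
		{"metadata": map[string]interface{}{"name": "b"}},
	}
	fmt.Println(len(filter(objs, hasLabel("team", "edge"))))
}
```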

func (*Accumulator) Update

func (a *Accumulator) Update(target interface{}) bool

func (*Accumulator) UpdateWithDeltas

func (a *Accumulator) UpdateWithDeltas(target interface{}, deltas *[]*Delta) bool

type Client

type Client struct {
	// contains filtered or unexported fields
}

The Client struct provides an interface to interact with the kubernetes api-server. You can think of it like a programmatic version of the familiar kubectl command line tool. In fact, a goal of these APIs is that where possible, your knowledge of kubectl should translate well into using these APIs. It provides a golang-friendly way to perform basic CRUD and Watch operations on kubernetes resources, as well as providing some additional capabilities.

Differences from kubectl:

  • You can also use a Client to update the status of a resource.
  • The Client struct cannot perform an apply operation.
  • The Client provides Read/write coherence (more about this below).
  • The Client provides load shedding via event coalescing for watches.
  • The Client provides bootstrapping of multiple watches.

The biggest difference from kubectl (and also from using client-go directly) is the Read/Write coherence it provides. Kubernetes Watches are inherently asynchronous. This means that if a kubernetes resource is modified at time T0, a client won't find out about it until some later time T1. It is normally difficult to notice this since the delay may be quite small; however, if you are writing a controller that uses watches in combination with modifying the resources it is watching, the delay is big enough that a program will often be "notified" with versions of resources that do not include updates made by the program itself. This even happens when a program has a lock and is guaranteed to be the only process modifying a given resource. Needless to say, programming against an API like this can make for some brain-twisting logic. The Client struct allows for much simpler code by tracking what changes have been made locally and updating all Watch results with the most recent version of an object, thereby providing the guarantee that your Watch results will *always* include any changes you have made via the Client performing the watch.

Additionally, the Accumulator API provides two key pieces of watch related functionality:

  1. By coalescing multiple updates behind the scenes, the Accumulator API provides a natural form of load shedding if a user of the API cannot keep up with every single update.

  2. The Accumulator API is guaranteed to bootstrap (i.e. perform an initial List operation) on all watches prior to notifying the user that resources are available to process.

func NewClient

func NewClient(options ClientConfig) (*Client, error)

The NewClient function constructs a new client with the supplied ClientConfig.

func NewClientFromConfigFlags

func NewClientFromConfigFlags(config *ConfigFlags) (*Client, error)

func NewClientFromFlagSet

func NewClientFromFlagSet(flags *pflag.FlagSet) (*Client, error)

func (*Client) Create

func (c *Client) Create(ctx context.Context, resource interface{}, target interface{}) error

func (*Client) Delete

func (c *Client) Delete(ctx context.Context, resource interface{}, target interface{}) error

func (*Client) DynamicInterface

func (c *Client) DynamicInterface() dynamic.Interface

DynamicInterface is an accessor method to the k8s dynamic client

func (*Client) Get

func (c *Client) Get(ctx context.Context, resource interface{}, target interface{}) error

func (*Client) InvalidateCache

func (c *Client) InvalidateCache()

func (*Client) List

func (c *Client) List(ctx context.Context, query Query, target interface{}) error

func (*Client) Patch

func (c *Client) Patch(ctx context.Context, resource interface{}, pt PatchType, data []byte, target interface{}) error

func (*Client) PodLogs

func (c *Client) PodLogs(ctx context.Context, pod *Pod, options *PodLogOptions, events chan<- LogEvent) error

The PodLogs method accesses the log output of a given pod by sending LogEvent structs down the supplied channel. The LogEvent struct is designed to hold enough information that it is feasible to invoke PodLogs multiple times with a single channel in order to multiplex log output from many pods, e.g.:

events := make(chan LogEvent)
client.PodLogs(ctx, pod1, options, events)
client.PodLogs(ctx, pod2, options, events)
client.PodLogs(ctx, pod3, options, events)

for event := range events {
    fmt.Printf("%s: %s: %s", event.PodID, event.Timestamp, event.Output)
}

The above code will print log output from all 3 pods.
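
The multiplexing pattern itself, several producers tagging their output and sharing one channel, can be sketched without a cluster. In the example below, logEvent, streamLogs, and collect are hypothetical stand-ins rather than kates APIs. The sketch closes the shared channel once all producers finish, which is a choice made for this example only; the LogEvent struct carries a Closed field for signaling that a given pod's stream has ended.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// logEvent is a hypothetical, trimmed stand-in for kates.LogEvent.
type logEvent struct {
	PodID  string
	Output string
}

// streamLogs sends each line to events, tagged with podID.
func streamLogs(podID string, lines []string, events chan<- logEvent, wg *sync.WaitGroup) {
	defer wg.Done()
	for _, line := range lines {
		events <- logEvent{PodID: podID, Output: line}
	}
}

// collect fans in lines from every pod onto one channel and regroups them by
// pod ID, which is possible only because each event carries its pod identity.
func collect(logs map[string][]string) map[string][]string {
	events := make(chan logEvent)
	var wg sync.WaitGroup
	for podID, lines := range logs {
		wg.Add(1)
		go streamLogs(podID, lines, events, &wg)
	}
	go func() {
		wg.Wait()
		close(events) // lets the range loop below terminate
	}()

	byPod := make(map[string][]string)
	for ev := range events {
		byPod[ev.PodID] = append(byPod[ev.PodID], ev.Output)
	}
	return byPod
}

func main() {
	byPod := collect(map[string][]string{
		"pod1": {"starting", "ready"},
		"pod2": {"starting"},
		"pod3": {"starting", "failed"},
	})
	ids := make([]string, 0, len(byPod))
	for id := range byPod {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	for _, id := range ids {
		fmt.Println(id, byPod[id])
	}
}
```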

func (*Client) ServerPreferredResources

func (c *Client) ServerPreferredResources() ([]APIResource, error)

ServerPreferredResources returns the list of resource types that the server supports.

If a resource type supports multiple versions, then *only* the preferred version is returned. Use ServerResources to return a list that includes all versions.

func (*Client) ServerResources

func (c *Client) ServerResources() ([]APIResource, error)

ServerResources returns the list of resource types that the server supports.

If a resource type supports multiple versions, then a list entry for *each* version is returned. Use ServerPreferredResources to return a list that includes just the preferred version.

func (*Client) ServerVersion

func (c *Client) ServerVersion() (*VersionInfo, error)

The ServerVersion() method returns a struct with information about the kubernetes api-server version.

func (*Client) Update

func (c *Client) Update(ctx context.Context, resource interface{}, target interface{}) error

func (*Client) UpdateStatus

func (c *Client) UpdateStatus(ctx context.Context, resource interface{}, target interface{}) error

func (*Client) Upsert

func (c *Client) Upsert(ctx context.Context, resource interface{}, source interface{}, target interface{}) error

func (*Client) WaitFor

func (c *Client) WaitFor(ctx context.Context, kindOrResource string)

func (*Client) Watch

func (c *Client) Watch(ctx context.Context, queries ...Query) *Accumulator

type ClientConfig

type ClientConfig struct {
	Kubeconfig string
	Context    string
	Namespace  string
}

The ClientConfig struct holds all the parameters and configuration that can be passed upon construction of a new Client.

type ClusterRole

type ClusterRole = rbacv1.ClusterRole

type ClusterRoleBinding

type ClusterRoleBinding = rbacv1.ClusterRoleBinding

type ConfigFlags

type ConfigFlags = genericclioptions.ConfigFlags

type ConfigMap

type ConfigMap = corev1.ConfigMap

type Container

type Container = corev1.Container

type ContainerMetrics

type ContainerMetrics = metrics.ContainerMetrics

type CreateOptions

type CreateOptions = metav1.CreateOptions

type CustomResourceDefinition

type CustomResourceDefinition = xv1.CustomResourceDefinition

type DeleteOptions

type DeleteOptions = metav1.DeleteOptions

type Delta

type Delta struct {
	TypeMeta   `json:""`
	ObjectMeta `json:"metadata,omitempty"`
	DeltaType  DeltaType `json:"deltaType"`
}

func NewDelta

func NewDelta(deltaType DeltaType, obj *Unstructured) *Delta

func NewDeltaFromObject

func NewDeltaFromObject(deltaType DeltaType, obj Object) *Delta

type DeltaType

type DeltaType int
const (
	ObjectAdd DeltaType = iota
	ObjectUpdate
	ObjectDelete
)

func (DeltaType) MarshalJSON

func (dt DeltaType) MarshalJSON() ([]byte, error)

func (*DeltaType) UnmarshalJSON

func (dt *DeltaType) UnmarshalJSON(b []byte) error

type Deployment

type Deployment = appsv1.Deployment

type EndpointAddress

type EndpointAddress = corev1.EndpointAddress

type EndpointPort

type EndpointPort = corev1.EndpointPort

type EndpointSubset

type EndpointSubset = corev1.EndpointSubset

type Endpoints

type Endpoints = corev1.Endpoints

type EnvVar

type EnvVar = corev1.EnvVar

type Event

type Event = corev1.Event

type GetOptions

type GetOptions = metav1.GetOptions

type Ingress

type Ingress = netv1beta1.Ingress

type IngressClass

type IngressClass = netv1beta1.IngressClass

type IntOrString

type IntOrString = intstr.IntOrString

type LabelSelector

type LabelSelector = metav1.LabelSelector

type LabelSet

type LabelSet = labels.Set

type ListOptions

type ListOptions = metav1.ListOptions

type LocalObjectReference

type LocalObjectReference = corev1.LocalObjectReference

type LogEvent

type LogEvent struct {
	// The PodID field indicates what pod the log output is associated with.
	PodID string `json:"podID"`
	// The Timestamp field indicates when the log output was created.
	Timestamp string `json:"timestamp"`

	// The Output field contains log output from the pod.
	Output string `json:"output,omitempty"`

	// The Closed field is true if the supply of log events from the given pod was terminated. This does not
	// necessarily mean there is no more log data.
	Closed bool
	// The Error field contains error information if the log events were terminated due to an error.
	Error error `json:"error,omitempty"`
}

The LogEvent struct is used to communicate log output from a pod. It includes PodID and Timestamp information so that LogEvents from different pods can be interleaved without losing information about total ordering and/or pod identity.

type Namespace

type Namespace = corev1.Namespace

type Node

type Node = corev1.Node

type NodeMetrics

type NodeMetrics = metrics.NodeMetrics

type Object

type Object interface {
	runtime.Object
	metav1.Object
}

func NewObject

func NewObject(kind, version string) (Object, error)

func NewObjectFromUnstructured

func NewObjectFromUnstructured(unstructured *Unstructured) (Object, error)

NewObjectFromUnstructured will construct a new specialized object based on the runtime schema ambassador uses. This guarantees any types defined by or used by ambassador will be constructed as the proper golang type.

func ParseManifests

func ParseManifests(text string) ([]Object, error)

func ParseManifestsToUnstructured

func ParseManifestsToUnstructured(text string) ([]Object, error)

type ObjectMeta

type ObjectMeta = metav1.ObjectMeta

type ObjectReference

type ObjectReference = corev1.ObjectReference

type PatchOptions

type PatchOptions = metav1.PatchOptions

type PatchType

type PatchType = types.PatchType

type PersistentVolumeClaim

type PersistentVolumeClaim = corev1.PersistentVolumeClaim

type PersistentVolumeClaimVolumeSource

type PersistentVolumeClaimVolumeSource = corev1.PersistentVolumeClaimVolumeSource

type Pod

type Pod = corev1.Pod

type PodCondition

type PodCondition = corev1.PodCondition

type PodLogOptions

type PodLogOptions = corev1.PodLogOptions

type PodMetrics

type PodMetrics = metrics.PodMetrics

type PodSpec

type PodSpec = corev1.PodSpec

type PodTemplateSpec

type PodTemplateSpec = corev1.PodTemplateSpec

type Protocol

type Protocol = corev1.Protocol

type Quantity

type Quantity = resource.Quantity

type Query

type Query struct {
	// The Name field holds the name of the Query. This is used by
	// Watch to determine how multiple queries are unmarshaled by
	// Accumulator.Update(). This is ignored for List.
	Name string
	// The Kind field indicates what sort of resource is being queried.
	Kind string
	// The Namespace field holds the namespace to Query.
	Namespace string
	// The FieldSelector field holds a string in selector syntax
	// that is used to filter results based on field values. The
	// only field values supported are metadata.name and
	// metadata.namespace. This is only supported for List.
	FieldSelector string
	// The LabelSelector field holds a string in selector syntax
	// that is used to filter results based on label values.
	LabelSelector string
}

A Query holds all the information needed to List or Watch a set of kubernetes resources.

type ReplicaSet

type ReplicaSet = appsv1.ReplicaSet

type ResourceList

type ResourceList = corev1.ResourceList

type ResourceRequirements

type ResourceRequirements = corev1.ResourceRequirements

type Role

type Role = rbacv1.Role

type RoleBinding

type RoleBinding = rbacv1.RoleBinding

type Secret

type Secret = corev1.Secret

type SecurityContext

type SecurityContext = corev1.SecurityContext

type Selector

type Selector = labels.Selector

type Service

type Service = corev1.Service

type ServiceAccount

type ServiceAccount = corev1.ServiceAccount

type ServicePort

type ServicePort = corev1.ServicePort

type ServiceSpec

type ServiceSpec = corev1.ServiceSpec

type StatefulSet

type StatefulSet = appsv1.StatefulSet

type Time

type Time = metav1.Time

type Toleration

type Toleration = corev1.Toleration

type TolerationOperator

type TolerationOperator = corev1.TolerationOperator

type TypeMeta

type TypeMeta = metav1.TypeMeta

type Unstructured

type Unstructured = unstructured.Unstructured

func NewUnstructured

func NewUnstructured(kind, version string) *Unstructured

func NewUnstructuredFromObject

func NewUnstructuredFromObject(obj Object) (result *Unstructured, err error)

Convert a potentially typed Object to an *Unstructured object.

type UpdateOptions

type UpdateOptions = metav1.UpdateOptions

type Validator

type Validator struct {
	// contains filtered or unexported fields
}

A Validator may be used in concert with a Client to perform validation of freeform jsonish data structures as kubernetes CRDs.

func NewValidator

func NewValidator(client *Client, staticCRDs []Object) (*Validator, error)

The NewValidator constructor returns a *Validator that uses the provided *Client to fetch CustomResourceDefinitions from kubernetes on demand as needed to validate data passed to the Validator.Validate() method.

func (*Validator) Validate

func (v *Validator) Validate(ctx context.Context, resource interface{}) error

The Validate method validates the supplied jsonish object as a kubernetes CRD instance.

If the supplied object is *not* a CRD instance but instead a regular kubernetes instance, the Validate method will assume that the supplied object is valid.

If the supplied object is not a valid kubernetes resource at all, the Validate method will return an error.

Typically the Validate method will perform only local operations; however, the first time an instance of a given Kind is supplied, the Validator needs to query the cluster to figure out if it is a CRD and if so to fetch the schema needed to perform validation. All subsequent Validate() calls for that Kind will be local.
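
The "first call is remote, later calls are local" behavior is a per-Kind cache. A minimal sketch of that pattern follows; schemaCache, cacheEntry, and the fetch callback are hypothetical, with fetch standing in for the cluster query, and the sketch does not reflect the Validator's actual internals or handle concurrent callers:

```go
package main

import "fmt"

// cacheEntry records what was learned about a Kind on first lookup.
type cacheEntry struct {
	schema string
	isCRD  bool
}

// schemaCache remembers, per Kind, the result of one expensive lookup.
type schemaCache struct {
	fetch   func(kind string) (schema string, isCRD bool) // stands in for a cluster query
	entries map[string]cacheEntry
}

func newSchemaCache(fetch func(kind string) (string, bool)) *schemaCache {
	return &schemaCache{fetch: fetch, entries: make(map[string]cacheEntry)}
}

// Lookup returns the cached entry for kind, calling fetch on first use only.
func (c *schemaCache) Lookup(kind string) cacheEntry {
	if e, ok := c.entries[kind]; ok {
		return e
	}
	schema, isCRD := c.fetch(kind)
	e := cacheEntry{schema: schema, isCRD: isCRD}
	c.entries[kind] = e
	return e
}

func main() {
	remoteCalls := 0
	cache := newSchemaCache(func(kind string) (string, bool) {
		remoteCalls++
		return "schema-for-" + kind, kind == "Mapping"
	})

	cache.Lookup("Mapping")
	cache.Lookup("Mapping") // served from the cache
	cache.Lookup("Pod")
	fmt.Println(remoteCalls)
}
```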

type VersionInfo

type VersionInfo = version.Info

type Volume

type Volume = corev1.Volume

type VolumeMount

type VolumeMount = corev1.VolumeMount

type VolumeSource

type VolumeSource = corev1.VolumeSource
