operator

package
v0.0.0-...-e88f72c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 8, 2024 License: Apache-2.0 Imports: 44 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// FailedCreatePodReason is added in an event and in a replica set condition
	// when a pod for a replica set is failed to be created.
	FailedCreatePodReason = "FailedCreatePod"
	// SuccessfulCreatePodReason is added in an event when a pod for a replica set
	// is successfully created.
	SuccessfulCreatePodReason = "SuccessfulCreatePod"
	// FailedDeletePodReason is added in an event and in a replica set condition
	// when a pod for a replica set is failed to be deleted.
	FailedDeletePodReason = "FailedDeletePod"
	// SuccessfulDeletePodReason is added in an event when a pod for a replica set
	// is successfully deleted.
	SuccessfulDeletePodReason = "SuccessfulDeletePod"
)

Reasons for pod events

View Source
const (
	FailedCreateServiceReason     = "FailedCreateService"
	SuccessfulCreateServiceReason = "SuccessfulCreateService"
	FailedDeleteServiceReason     = "FailedDeleteService"
	SuccessfulDeleteServiceReason = "SuccessfulDeleteService"
)
View Source
const (
	AppNameLabel = "app-name"
	RoleLabel    = "role"

	RoleLeader = "Leader"
)
View Source
const (
	ServiceFormat = "%s.%s.svc"
)

Variables

This section is empty.

Functions

func FilterPodsForReplicaType

func FilterPodsForReplicaType(pods []*v1.Pod, replicaType string) ([]*v1.Pod, error)

FilterPodsForReplicaType returns pods belong to a replicaType.

func FilterServicesForReplicaType

func FilterServicesForReplicaType(services []*v1.Service, replicaType string) ([]*v1.Service, error)

FilterServicesForReplicaType returns service belong to a replicaType.

func GenIndexName

func GenIndexName(appName, r, rt, index string) string

func GenLabels

func GenLabels(app *v1alpha1.FLApp) map[string]string

func GenName

func GenName(appName, r string) string

func GenReplicaName

func GenReplicaName(appName, r, rt string) string

func GetIngressClassName

func GetIngressClassName(app *v1alpha1.FLApp, defaultName string) string

func GetIngressClientAuthSecretNameOrDefault

func GetIngressClientAuthSecretNameOrDefault(app *v1alpha1.FLApp, defaultName string) string

func GetIngressExtraHostSuffix

func GetIngressExtraHostSuffix(app *v1alpha1.FLApp, defaultSuffix string) string

func GetIngressSecretNameOrDefault

func GetIngressSecretNameOrDefault(app *v1alpha1.FLApp, defaultName string) string

func GetPodFromTemplate

func GetPodFromTemplate(template *v1.PodTemplateSpec, parentObject runtime.Object, controllerRef *metav1.OwnerReference) (*v1.Pod, error)

func GetPortFromApp

func GetPortFromApp(app *v1alpha1.FLApp, rtype v1alpha1.FLReplicaType) (int32, error)

GetPortFromApp gets the flapp-port port of tensorflow container.

func GetPortsFromApp

func GetPortsFromApp(app *v1alpha1.FLApp, rtype v1alpha1.FLReplicaType) ([]v1.ContainerPort, error)

GetPortsFromApp gets the ports of tensorflow container.

func IsLeader

func IsLeader(role string) bool

func RecheckDeletionTimestamp

func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() error

Types

type AppEventHandler

type AppEventHandler interface {
	// Called after follower bootstrapped
	Register(context.Context, *v1alpha1.FLApp) error
	// Called after leader finished pairing
	Pair(context.Context, *v1alpha1.FLApp) error
	// Called when leader/follower needs to shutdown peer
	Shutdown(context.Context, *v1alpha1.FLApp) error
	// Called when leader/follower is finished
	Finish(context.Context, *v1alpha1.FLApp) error
	// Received when peer send sync request
	RegisterHandler(ctx context.Context, name string, role string, followerReplicas map[string][]string) (*pb.Status, error)
	// Received when peer send sync callback request
	PairHandler(ctx context.Context, name string, leaderReplicas map[string][]string, followerReplicas map[string][]string) (*pb.Status, error)
	// Received when peer send shutdown request
	ShutdownHandler(ctx context.Context, name string) (*pb.Status, error)
	// Received when peer send finish request
	FinishHandler(ctx context.Context, name string) (*pb.Status, error)
}

func NewAppEventHandlerWithClientTimeout

func NewAppEventHandlerWithClientTimeout(namespace string, crdClient crdclientset.Interface, clientTimeout time.Duration) AppEventHandler

type AppManager

type AppManager interface {
	SyncApp(app *v1alpha1.FLApp, deleting bool) error
}

func NewAppManager

func NewAppManager(
	namespace string,
	recorder record.EventRecorder,
	ingressExtraHostSuffix string,
	ingressSecretName string,
	ingressEnableClientAuth bool,
	ingressClientAuthSecretName string,
	ingressClassName string,
	kubeClient clientset.Interface,
	crdClient crdclientset.Interface,
	appLister crdlisters.FLAppLister,
	configMapLister listerscorev1.ConfigMapLister,
	podLister listerscorev1.PodLister,
	serviceLister listerscorev1.ServiceLister,
	ingressLister listersnetworking.IngressLister,
	secretLister listerscorev1.SecretLister,
	appEventHandler AppEventHandler,
	podCache *podCache,
) AppManager

type BaseControllerRefManager

type BaseControllerRefManager struct {
	Controller metav1.Object
	Selector   labels.Selector

	CanAdoptFunc func() error
	// contains filtered or unexported fields
}

func (*BaseControllerRefManager) CanAdopt

func (m *BaseControllerRefManager) CanAdopt() error

func (*BaseControllerRefManager) ClaimObject

func (m *BaseControllerRefManager) ClaimObject(obj metav1.Object, match func(metav1.Object) bool, adopt, release func(metav1.Object) error) (bool, error)

ClaimObject tries to take ownership of an object for this controller.

It will reconcile the following:

  • Adopt orphans if the match function returns true.
  • Release owned objects if the match function returns false.

A non-nil error is returned if some form of reconciliation was attempted and failed. Usually, controllers should try again later in case reconciliation is still needed.

If the error is nil, either the reconciliation succeeded, or no reconciliation was necessary. The returned boolean indicates whether you now own the object.

No reconciliation will be attempted if the controller is being deleted.

type ClusterSpec

type ClusterSpec struct {
	Services map[v1alpha1.FLReplicaType][]string `json:"clusterSpec"`
}

func NewClusterSpec

func NewClusterSpec(namespace string, app *v1alpha1.FLApp) ClusterSpec

func (ClusterSpec) Marshal

func (cs ClusterSpec) Marshal() ([]byte, error)

type FLController

type FLController struct {
	// contains filtered or unexported fields
}

func NewFLController

func NewFLController(
	namespace string,
	recorder record.EventRecorder,
	resyncInterval int,
	ingressExtraHostSuffix string,
	ingressSecretName string,
	ingressEnableClientAuth bool,
	ingressClientAuthSecretName string,
	ingressClassName string,
	kubeClient clientset.Interface,
	crdClientset crdclientset.Interface,
	kubeSharedInformerFactory informers.SharedInformerFactory,
	crdSharedInformerFactory crdinformers.SharedInformerFactory,
	appEventHandler AppEventHandler,
	stopCh <-chan struct{},
) *FLController

func (*FLController) Start

func (c *FLController) Start(workers int) error

func (*FLController) Stop

func (c *FLController) Stop()

type FakeServiceControl

type FakeServiceControl struct {
	sync.Mutex
	Templates         []v1.Service
	ControllerRefs    []metav1.OwnerReference
	DeleteServiceName []string
	Patches           [][]byte
	Err               error
	CreateLimit       int
	CreateCallCount   int
}

func (*FakeServiceControl) Clear

func (f *FakeServiceControl) Clear()

func (*FakeServiceControl) CreateServices

func (f *FakeServiceControl) CreateServices(ctx context.Context, namespace string, service *v1.Service, object runtime.Object) error

func (*FakeServiceControl) CreateServicesWithControllerRef

func (f *FakeServiceControl) CreateServicesWithControllerRef(ctx context.Context, namespace string, service *v1.Service, object runtime.Object, controllerRef *metav1.OwnerReference) error

func (*FakeServiceControl) DeleteService

func (f *FakeServiceControl) DeleteService(ctx context.Context, namespace string, serviceID string, object runtime.Object) error

func (*FakeServiceControl) PatchService

func (f *FakeServiceControl) PatchService(ctx context.Context, namespace, name string, data []byte) error

type PodControlInterface

type PodControlInterface interface {
	// CreatePods creates new pods according to the spec.
	CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object) error
	// CreatePodsOnNode creates a new pod according to the spec on the specified node,
	// and sets the ControllerRef.
	CreatePodsOnNode(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
	// CreatePodsWithControllerRef creates new pods according to the spec, and sets object as the pod's controller.
	CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
	// DeletePod deletes the pod identified by podID.
	DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error
	// PatchPod patches the pod.
	PatchPod(ctx context.Context, namespace, name string, data []byte) error
}

PodControlInterface is an interface that knows how to add or delete pods created as an interface to allow testing.

type PodControllerRefManager

type PodControllerRefManager struct {
	BaseControllerRefManager
	// contains filtered or unexported fields
}

func NewPodControllerRefManager

func NewPodControllerRefManager(
	podControl PodControlInterface,
	controller metav1.Object,
	selector labels.Selector,
	controllerKind schema.GroupVersionKind,
	canAdopt func() error,
) *PodControllerRefManager

NewPodControllerRefManager returns a PodControllerRefManager that exposes methods to manage the controllerRef of pods.

The CanAdopt() function can be used to perform a potentially expensive check (such as a live GET from the API server) prior to the first adoption. It will only be called (at most once) if an adoption is actually attempted. If CanAdopt() returns a non-nil error, all adoptions will fail.

NOTE: Once CanAdopt() is called, it will not be called again by the same

PodControllerRefManager instance. Create a new instance if it makes
sense to check CanAdopt() again (e.g. in a different sync pass).

func (*PodControllerRefManager) AdoptPod

func (m *PodControllerRefManager) AdoptPod(ctx context.Context, pod *v1.Pod) error

AdoptPod sends a patch to take control of the pod. It returns the error if the patching fails.

func (*PodControllerRefManager) ClaimPods

func (m *PodControllerRefManager) ClaimPods(ctx context.Context, pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error)

ClaimPods tries to take ownership of a list of Pods.

It will reconcile the following:

  • Adopt orphans if the selector matches.
  • Release owned objects if the selector no longer matches.

Optional: If one or more filters are specified, a Pod will only be claimed if all filters return true.

A non-nil error is returned if some form of reconciliation was attempted and failed. Usually, controllers should try again later in case reconciliation is still needed.

If the error is nil, either the reconciliation succeeded, or no reconciliation was necessary. The list of Pods that you now own is returned.

func (*PodControllerRefManager) ReleasePod

func (m *PodControllerRefManager) ReleasePod(ctx context.Context, pod *v1.Pod) error

ReleasePod sends a patch to free the pod from the control of the controller. It returns the error if the patching fails. 404 and 422 errors are ignored.

type RealPodControl

type RealPodControl struct {
	KubeClient clientset.Interface
	Recorder   record.EventRecorder
}

RealPodControl is the default implementation of PodControlInterface.

func (RealPodControl) CreatePods

func (r RealPodControl) CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object) error

func (RealPodControl) CreatePodsOnNode

func (r RealPodControl) CreatePodsOnNode(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error

func (RealPodControl) CreatePodsWithControllerRef

func (r RealPodControl) CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error

func (RealPodControl) DeletePod

func (r RealPodControl) DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error

func (RealPodControl) PatchPod

func (r RealPodControl) PatchPod(ctx context.Context, namespace, name string, data []byte) error

type RealServiceControl

type RealServiceControl struct {
	KubeClient clientset.Interface
	Recorder   record.EventRecorder
}

RealServiceControl is the default implementation of ServiceControlInterface.

func (RealServiceControl) CreateServices

func (r RealServiceControl) CreateServices(ctx context.Context, namespace string, service *v1.Service, object runtime.Object) error

func (RealServiceControl) CreateServicesWithControllerRef

func (r RealServiceControl) CreateServicesWithControllerRef(ctx context.Context, namespace string, service *v1.Service, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error

func (RealServiceControl) DeleteService

func (r RealServiceControl) DeleteService(ctx context.Context, namespace, serviceID string, object runtime.Object) error

DeleteService deletes the service identified by serviceID.

func (RealServiceControl) PatchService

func (r RealServiceControl) PatchService(ctx context.Context, namespace, name string, data []byte) error

type ServiceControlInterface

type ServiceControlInterface interface {
	// CreateServices creates new Services according to the spec.
	CreateServices(ctx context.Context, namespace string, service *v1.Service, object runtime.Object) error
	// CreateServicesWithControllerRef creates new services according to the spec, and sets object as the service's controller.
	CreateServicesWithControllerRef(ctx context.Context, namespace string, service *v1.Service, object runtime.Object, controllerRef *metav1.OwnerReference) error
	// PatchService patches the service.
	PatchService(ctx context.Context, namespace, name string, data []byte) error
	// DeleteService deletes the service identified by serviceID.
	DeleteService(ctx context.Context, namespace, serviceID string, object runtime.Object) error
}

ServiceControlInterface is an interface that knows how to add or delete Services created as an interface to allow testing.

type ServiceControllerRefManager

type ServiceControllerRefManager struct {
	BaseControllerRefManager
	// contains filtered or unexported fields
}

func NewServiceControllerRefManager

func NewServiceControllerRefManager(
	serviceControl ServiceControlInterface,
	ctr metav1.Object,
	selector labels.Selector,
	controllerKind schema.GroupVersionKind,
	canAdopt func() error,
) *ServiceControllerRefManager

NewServiceControllerRefManager returns a ServiceControllerRefManager that exposes methods to manage the controllerRef of services.

The canAdopt() function can be used to perform a potentially expensive check (such as a live GET from the API server) prior to the first adoption. It will only be called (at most once) if an adoption is actually attempted. If canAdopt() returns a non-nil error, all adoptions will fail.

NOTE: Once canAdopt() is called, it will not be called again by the same

ServiceControllerRefManager instance. Create a new instance if it makes
sense to check canAdopt() again (e.g. in a different sync pass).

func (*ServiceControllerRefManager) AdoptService

func (m *ServiceControllerRefManager) AdoptService(ctx context.Context, service *v1.Service) error

AdoptService sends a patch to take control of the service. It returns the error if the patching fails.

func (*ServiceControllerRefManager) ClaimServices

func (m *ServiceControllerRefManager) ClaimServices(ctx context.Context, services []*v1.Service, filters ...func(*v1.Service) bool) ([]*v1.Service, error)

ClaimServices tries to take ownership of a list of Services.

It will reconcile the following:

  • Adopt orphans if the selector matches.
  • Release owned objects if the selector no longer matches.

Optional: If one or more filters are specified, a Service will only be claimed if all filters return true.

A non-nil error is returned if some form of reconciliation was attempted and failed. Usually, controllers should try again later in case reconciliation is still needed.

If the error is nil, either the reconciliation succeeded, or no reconciliation was necessary. The list of Services that you now own is returned.

func (*ServiceControllerRefManager) ReleaseService

func (m *ServiceControllerRefManager) ReleaseService(ctx context.Context, service *v1.Service) error

ReleaseService sends a patch to free the service from the control of the controller. It returns the error if the patching fails. 404 and 422 errors are ignored.

type StatusUpdater

type StatusUpdater interface {
	// Update app state only in app's status
	UpdateAppStateWithRetry(ctx context.Context, app *v1alpha1.FLApp, state v1alpha1.FLState) error
	// Update any field in app's status
	UpdateStatusWithRetry(ctx context.Context, app *v1alpha1.FLApp, updateFunc func(*v1alpha1.FLApp) bool) (*v1alpha1.FLApp, error)
}

func NewAppStatusUpdater

func NewAppStatusUpdater(crdClient crdclientset.Interface, namespace string) StatusUpdater

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL