Documentation
Index
Constants
const ControllerImport = `` /* 512-byte string literal not displayed */
const ControllerIntro = `` /* 217-byte string literal not displayed */
const ControllerMockClock = `
/*
We'll mock out the clock to make it easier to jump around in time while testing,
the "real" clock just calls` + " `" + `time.Now` + "`" + `.
*/
type realClock struct{}
func (_ realClock) Now() time.Time { return time.Now() }
// Clock knows how to get the current time.
// It can be used to fake out timing for testing.
type Clock interface {
Now() time.Time
}
// +kubebuilder:docs-gen:collapse=Clock
/*
Notice that we need a few more RBAC permissions -- since we're creating and
managing jobs now, we'll need permissions for those, which means adding
a couple more [markers](/reference/markers/rbac.md).
*/
`
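To illustrate why the Clock interface exists, a test can swap in a fake implementation and jump around in time. The fakeClock type below is a hypothetical sketch for illustration only; it is not part of the displayed constants.

// fakeClock is a hypothetical test double for the Clock interface above.
type fakeClock struct {
	now time.Time
}

// Now returns the fixed time the test configured instead of the real wall clock.
func (f fakeClock) Now() time.Time { return f.now }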
const ControllerReconcile = `` /* 319-byte string literal not displayed */
const ControllerReconcileLogic = `log := log.FromContext(ctx)
/*
### 1: Load the CronJob by name
We'll fetch the CronJob using our client. All client methods take a
context (to allow for cancellation) as their first argument, and the object
in question as their last. Get is a bit special, in that it takes a
[` + "`" + `NamespacedName` + "`" + `](https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/client?tab=doc#ObjectKey)
as the middle argument (most don't have a middle argument, as we'll see
below).
Many client methods also take variadic options at the end.
*/
var cronJob batchv1.CronJob
if err := r.Get(ctx, req.NamespacedName, &cronJob); err != nil {
log.Error(err, "unable to fetch CronJob")
// we'll ignore not-found errors, since they can't be fixed by an immediate
// requeue (we'll need to wait for a new notification), and we can get them
// on deleted requests.
return ctrl.Result{}, client.IgnoreNotFound(err)
}
/*
### 2: List all active jobs, and update the status
To fully update our status, we'll need to list all child jobs in this namespace that belong to this CronJob.
Similarly to Get, we can use the List method to list the child jobs. Notice that we use variadic options to
set the namespace and field match (which is actually an index lookup that we set up below).
*/
var childJobs kbatch.JobList
if err := r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingFields{jobOwnerKey: req.Name}); err != nil {
log.Error(err, "unable to list child Jobs")
return ctrl.Result{}, err
}
/*
<aside class="note">
<h1>What is this index about?</h1>
<p>The reconciler fetches all jobs owned by the cronjob for the status. As our number of cronjobs increases,
looking these up can become quite slow as we have to filter through all of them. For a more efficient lookup,
these jobs will be indexed locally on the controller's name. A jobOwnerKey field is added to the
cached job objects. This key references the owning controller and functions as the index. Later in this
document we will configure the manager to actually index this field.</p>
</aside>
Once we have all the jobs we own, we'll split them into active, successful,
and failed jobs, keeping track of the most recent run so that we can record it
in status. Remember, status should be able to be reconstituted from the state
of the world, so it's generally not a good idea to read from the status of the
root object. Instead, you should reconstruct it every run. That's what we'll
do here.
We can check if a job is "finished" and whether it succeeded or failed using status
conditions. We'll put that logic in a helper to make our code cleaner.
*/
// find the active list of jobs
var activeJobs []*kbatch.Job
var successfulJobs []*kbatch.Job
var failedJobs []*kbatch.Job
var mostRecentTime *time.Time // find the last run so we can update the status
/*
We consider a job "finished" if it has a "Complete" or "Failed" condition marked as true.
Status conditions allow us to add extensible status information to our objects that other
humans and controllers can examine to check things like completion and health.
*/
isJobFinished := func(job *kbatch.Job) (bool, kbatch.JobConditionType) {
for _, c := range job.Status.Conditions {
if (c.Type == kbatch.JobComplete || c.Type == kbatch.JobFailed) && c.Status == corev1.ConditionTrue {
return true, c.Type
}
}
return false, ""
}
// +kubebuilder:docs-gen:collapse=isJobFinished
/*
We'll use a helper to extract the scheduled time from the annotation that
we added during job creation.
*/
getScheduledTimeForJob := func(job *kbatch.Job) (*time.Time, error) {
timeRaw := job.Annotations[scheduledTimeAnnotation]
if len(timeRaw) == 0 {
return nil, nil
}
timeParsed, err := time.Parse(time.RFC3339, timeRaw)
if err != nil {
return nil, err
}
return &timeParsed, nil
}
// +kubebuilder:docs-gen:collapse=getScheduledTimeForJob
for i, job := range childJobs.Items {
_, finishedType := isJobFinished(&job)
switch finishedType {
case "": // ongoing
activeJobs = append(activeJobs, &childJobs.Items[i])
case kbatch.JobFailed:
failedJobs = append(failedJobs, &childJobs.Items[i])
case kbatch.JobComplete:
successfulJobs = append(successfulJobs, &childJobs.Items[i])
}
// We'll store the launch time in an annotation, so we'll reconstitute that from
// the active jobs themselves.
scheduledTimeForJob, err := getScheduledTimeForJob(&job)
if err != nil {
log.Error(err, "unable to parse schedule time for child job", "job", &job)
continue
}
if scheduledTimeForJob != nil {
if mostRecentTime == nil || mostRecentTime.Before(*scheduledTimeForJob) {
mostRecentTime = scheduledTimeForJob
}
}
}
if mostRecentTime != nil {
cronJob.Status.LastScheduleTime = &metav1.Time{Time: *mostRecentTime}
} else {
cronJob.Status.LastScheduleTime = nil
}
cronJob.Status.Active = nil
for _, activeJob := range activeJobs {
jobRef, err := ref.GetReference(r.Scheme, activeJob)
if err != nil {
log.Error(err, "unable to make reference to active job", "job", activeJob)
continue
}
cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
}
/*
Here, we'll log how many jobs we observed at a slightly higher logging level,
for debugging. Notice how instead of using a format string, we use a fixed message,
and attach key-value pairs with the extra information. This makes it easier to
filter and query log lines.
*/
log.V(1).Info("job count", "active jobs", len(activeJobs), "successful jobs", len(successfulJobs), "failed jobs", len(failedJobs))
/*
Using the data we've gathered, we'll update the status of our CRD.
Just like before, we use our client. To specifically update the status
subresource, we'll use the` + " `" + `Status` + "`" + ` part of the client, with the` + " `" + `Update` + "`" + `
method.
The status subresource ignores changes to spec, so it's less likely to conflict
with any other updates, and can have separate permissions.
*/
if err := r.Status().Update(ctx, &cronJob); err != nil {
log.Error(err, "unable to update CronJob status")
return ctrl.Result{}, err
}
/*
Once we've updated our status, we can move on to ensuring that the status of
the world matches what we want in our spec.
### 3: Clean up old jobs according to the history limit
First, we'll try to clean up old jobs, so that we don't leave too many lying
around.
*/
// NB: deleting these are "best effort" -- if we fail on a particular one,
// we won't requeue just to finish the deleting.
if cronJob.Spec.FailedJobsHistoryLimit != nil {
sort.Slice(failedJobs, func(i, j int) bool {
if failedJobs[i].Status.StartTime == nil {
return failedJobs[j].Status.StartTime != nil
}
return failedJobs[i].Status.StartTime.Before(failedJobs[j].Status.StartTime)
})
for i, job := range failedJobs {
if int32(i) >= int32(len(failedJobs))-*cronJob.Spec.FailedJobsHistoryLimit {
break
}
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
log.Error(err, "unable to delete old failed job", "job", job)
} else {
log.V(0).Info("deleted old failed job", "job", job)
}
}
}
if cronJob.Spec.SuccessfulJobsHistoryLimit != nil {
sort.Slice(successfulJobs, func(i, j int) bool {
if successfulJobs[i].Status.StartTime == nil {
return successfulJobs[j].Status.StartTime != nil
}
return successfulJobs[i].Status.StartTime.Before(successfulJobs[j].Status.StartTime)
})
for i, job := range successfulJobs {
if int32(i) >= int32(len(successfulJobs))-*cronJob.Spec.SuccessfulJobsHistoryLimit {
break
}
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil {
log.Error(err, "unable to delete old successful job", "job", job)
} else {
log.V(0).Info("deleted old successful job", "job", job)
}
}
}
/* ### 4: Check if we're suspended
If this object is suspended, we don't want to run any jobs, so we'll stop now.
This is useful if something's broken with the job we're running and we want to
pause runs to investigate or putz with the cluster, without deleting the object.
*/
if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
log.V(1).Info("cronjob suspended, skipping")
return ctrl.Result{}, nil
}
/*
### 5: Get the next scheduled run
If we're not paused, we'll need to calculate the next scheduled run, and whether
or not we've got a run that we haven't processed yet.
*/
/*
We'll calculate the next scheduled time using our helpful cron library.
We'll start calculating appropriate times from our last run, or the creation
of the CronJob if we can't find a last run.
If there are too many missed runs and we don't have any deadlines set, we'll
bail so that we don't cause issues on controller restarts or wedges.
Otherwise, we'll just return the missed runs (of which we'll just use the latest),
and the next run, so that we can know when it's time to reconcile again.
*/
getNextSchedule := func(cronJob *batchv1.CronJob, now time.Time) (lastMissed time.Time, next time.Time, err error) {
sched, err := cron.ParseStandard(cronJob.Spec.Schedule)
if err != nil {
return time.Time{}, time.Time{}, fmt.Errorf("Unparseable schedule %q: %v", cronJob.Spec.Schedule, err)
}
// for optimization purposes, cheat a bit and start from our last observed run time
// we could reconstitute this here, but there's not much point, since we've
// just updated it.
var earliestTime time.Time
if cronJob.Status.LastScheduleTime != nil {
earliestTime = cronJob.Status.LastScheduleTime.Time
} else {
earliestTime = cronJob.ObjectMeta.CreationTimestamp.Time
}
if cronJob.Spec.StartingDeadlineSeconds != nil {
// controller is not going to schedule anything below this point
schedulingDeadline := now.Add(-time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds))
if schedulingDeadline.After(earliestTime) {
earliestTime = schedulingDeadline
}
}
if earliestTime.After(now) {
return time.Time{}, sched.Next(now), nil
}
starts := 0
for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
lastMissed = t
// An object might miss several starts. For example, if
// controller gets wedged on Friday at 5:01pm when everyone has
// gone home, and someone comes in on Tuesday AM and discovers
// the problem and restarts the controller, then all the hourly
// jobs, more than 80 of them for one hourly scheduledJob, should
// all start running with no further intervention (if the scheduledJob
// allows concurrency and late starts).
//
// However, if there is a bug somewhere, or incorrect clock
// on controller's server or apiservers (for setting creationTimestamp)
// then there could be so many missed start times (it could be off
// by decades or more), that it would eat up all the CPU and memory
// of this controller. In that case, we want to not try to list
// all the missed start times.
starts++
if starts > 100 {
// We can't get the most recent times so just return an empty slice
return time.Time{}, time.Time{}, fmt.Errorf("Too many missed start times (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew.")
}
}
return lastMissed, sched.Next(now), nil
}
// +kubebuilder:docs-gen:collapse=getNextSchedule
// figure out the next times that we need to create
// jobs at (or anything we missed).
missedRun, nextRun, err := getNextSchedule(&cronJob, r.Now())
if err != nil {
log.Error(err, "unable to figure out CronJob schedule")
// we don't really care about requeuing until we get an update that
// fixes the schedule, so don't return an error
return ctrl.Result{}, nil
}
/*
We'll prep our eventual request to requeue until the next job, and then figure
out if we actually need to run.
*/
scheduledResult := ctrl.Result{RequeueAfter: nextRun.Sub(r.Now())} // save this so we can re-use it elsewhere
log = log.WithValues("now", r.Now(), "next run", nextRun)
/*
### 6: Run a new job if it's on schedule, not past the deadline, and not blocked by our concurrency policy
If we've missed a run, and we're still within the deadline to start it, we'll need to run a job.
*/
if missedRun.IsZero() {
log.V(1).Info("no upcoming scheduled times, sleeping until next")
return scheduledResult, nil
}
// make sure we're not too late to start the run
log = log.WithValues("current run", missedRun)
tooLate := false
if cronJob.Spec.StartingDeadlineSeconds != nil {
tooLate = missedRun.Add(time.Duration(*cronJob.Spec.StartingDeadlineSeconds) * time.Second).Before(r.Now())
}
if tooLate {
log.V(1).Info("missed starting deadline for last run, sleeping till next")
// TODO(directxman12): events
return scheduledResult, nil
}
/*
If we actually have to run a job, we'll need to either wait till existing ones finish,
replace the existing ones, or just add new ones. If our information is out of date due
to cache delay, we'll get a requeue when we get up-to-date information.
*/
// figure out how to run this job -- concurrency policy might forbid us from running
// multiple at the same time...
if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(activeJobs) > 0 {
log.V(1).Info("concurrency policy blocks concurrent runs, skipping", "num active", len(activeJobs))
return scheduledResult, nil
}
// ...or instruct us to replace existing ones...
if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
for _, activeJob := range activeJobs {
// we don't care if the job was already deleted
if err := r.Delete(ctx, activeJob, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
log.Error(err, "unable to delete active job", "job", activeJob)
return ctrl.Result{}, err
}
}
}
/*
Once we've figured out what to do with existing jobs, we'll actually create our desired job
*/
/*
We need to construct a job based on our CronJob's template. We'll copy over the spec
from the template and copy some basic object meta.
Then, we'll set the "scheduled time" annotation so that we can reconstitute our
` + "`" + `LastScheduleTime` + "`" + ` field each reconcile.
Finally, we'll need to set an owner reference. This allows the Kubernetes garbage collector
to clean up jobs when we delete the CronJob, and allows controller-runtime to figure out
which cronjob needs to be reconciled when a given job changes (is added, deleted, completes, etc).
*/
constructJobForCronJob := func(cronJob *batchv1.CronJob, scheduledTime time.Time) (*kbatch.Job, error) {
// We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice
name := fmt.Sprintf("%s-%d", cronJob.Name, scheduledTime.Unix())
job := &kbatch.Job{
ObjectMeta: metav1.ObjectMeta{
Labels: make(map[string]string),
Annotations: make(map[string]string),
Name: name,
Namespace: cronJob.Namespace,
},
Spec: *cronJob.Spec.JobTemplate.Spec.DeepCopy(),
}
for k, v := range cronJob.Spec.JobTemplate.Annotations {
job.Annotations[k] = v
}
job.Annotations[scheduledTimeAnnotation] = scheduledTime.Format(time.RFC3339)
for k, v := range cronJob.Spec.JobTemplate.Labels {
job.Labels[k] = v
}
if err := ctrl.SetControllerReference(cronJob, job, r.Scheme); err != nil {
return nil, err
}
return job, nil
}
// +kubebuilder:docs-gen:collapse=constructJobForCronJob
// actually make the job...
job, err := constructJobForCronJob(&cronJob, missedRun)
if err != nil {
log.Error(err, "unable to construct job from template")
// don't bother requeuing until we get a change to the spec
return scheduledResult, nil
}
// ...and create it on the cluster
if err := r.Create(ctx, job); err != nil {
log.Error(err, "unable to create Job for CronJob", "job", job)
return ctrl.Result{}, err
}
log.V(1).Info("created Job for CronJob run", "job", job)
/*
### 7: Requeue when we either see a running job or it's time for the next scheduled run
Finally, we'll return the result that we prepped above, that says we want to requeue
when our next run would need to occur. This is taken as a maximum deadline -- if something
else changes in between, like our job starts or finishes, we get modified, etc, we might
reconcile again sooner.
*/
// we'll requeue once we see the running job, and update our status
return scheduledResult, nil
}
/*
### Setup
Finally, we'll update our setup. In order to allow our reconciler to quickly
look up Jobs by their owner, we'll need an index. We declare an index key that
we can later use with the client as a pseudo-field name, and then describe how to
extract the indexed value from the Job object. The indexer will automatically take
care of namespaces for us, so we just have to extract the owner name if the Job has
a CronJob owner.
Additionally, we'll inform the manager that this controller owns some Jobs, so that it
will automatically call Reconcile on the underlying CronJob when a Job changes, is
deleted, etc.
*/
var (
jobOwnerKey = ".metadata.controller"
apiGVStr = batchv1.GroupVersion.String()
)
`
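The body of the ControllerSetupWithManager constant that follows is not displayed. As a rough, hedged sketch (not the literal constant), the index registration and ownership wiring described above generally look like this; it assumes the reconciler embeds the Clock interface shown earlier and uses the jobOwnerKey and apiGVStr variables declared just above.

func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
	// set up a real clock, since we're not in a test
	if r.Clock == nil {
		r.Clock = realClock{}
	}

	if err := mgr.GetFieldIndexer().IndexField(context.Background(), &kbatch.Job{}, jobOwnerKey, func(rawObj client.Object) []string {
		// grab the Job object and extract the owner...
		job := rawObj.(*kbatch.Job)
		owner := metav1.GetControllerOf(job)
		if owner == nil {
			return nil
		}
		// ...make sure it's a CronJob from our API group...
		if owner.APIVersion != apiGVStr || owner.Kind != "CronJob" {
			return nil
		}
		// ...and if so, index the Job under the owner's name
		return []string{owner.Name}
	}); err != nil {
		return err
	}

	return ctrl.NewControllerManagedBy(mgr).
		For(&batchv1.CronJob{}).
		Owns(&kbatch.Job{}).
		Complete(r)
}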
const ControllerSetupWithManager = `` /* 598-byte string literal not displayed */
const ControllerTest = `/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// +kubebuilder:docs-gen:collapse=Apache License
/*
Ideally, we should have one` + " `" + `<kind>_controller_test.go` + "`" + ` for each scaffolded controller, called from` + " `" + `suite_test.go` + "`" + `.
So, let's write our example test for the CronJob controller (` + "`" + `cronjob_controller_test.go` + "`" + `).
*/
/*
As usual, we start with the necessary imports. We also define some utility variables.
*/
package controller
import (
"context"
"reflect"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
cronjobv1 "tutorial.kubebuilder.io/project/api/v1"
)
// +kubebuilder:docs-gen:collapse=Imports
/*
The first step to writing a simple integration test is to actually create an instance of CronJob you can run tests against.
Note that to create a CronJob, you’ll need to create a stub CronJob struct that contains your CronJob’s specifications,
along with stubs of its required downstream objects.
Without the stubbed Job template spec and the Pod template spec below, the Kubernetes API will not be able to
create the CronJob.
*/
var _ = Describe("CronJob controller", func() {
// Define utility constants for object names and testing timeouts/durations and intervals.
const (
CronjobName = "test-cronjob"
CronjobNamespace = "default"
JobName = "test-job"
timeout = time.Second * 10
duration = time.Second * 10
interval = time.Millisecond * 250
)
Context("When updating CronJob Status", func() {
It("Should increase CronJob Status.Active count when new Jobs are created", func() {
By("By creating a new CronJob")
ctx := context.Background()
cronJob := &cronjobv1.CronJob{
TypeMeta: metav1.TypeMeta{
APIVersion: "batch.tutorial.kubebuilder.io/v1",
Kind: "CronJob",
},
ObjectMeta: metav1.ObjectMeta{
Name: CronjobName,
Namespace: CronjobNamespace,
},
Spec: cronjobv1.CronJobSpec{
Schedule: "1 * * * *",
JobTemplate: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
// For simplicity, we only fill out the required fields.
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
// For simplicity, we only fill out the required fields.
Containers: []v1.Container{
{
Name: "test-container",
Image: "test-image",
},
},
RestartPolicy: v1.RestartPolicyOnFailure,
},
},
},
},
},
}
Expect(k8sClient.Create(ctx, cronJob)).Should(Succeed())
/*
After creating this CronJob, let's check that the CronJob's Spec fields match what we passed in.
Note that, because the k8s apiserver may not have finished creating a CronJob after our` + " `" + `Create()` + "`" + ` call from earlier, we will use Gomega’s Eventually() testing function instead of Expect() to give the apiserver an opportunity to finish creating our CronJob.` + `
` +
"`" + `Eventually()` + "`" + ` will repeatedly run the function provided as an argument every interval seconds until
(a) the function’s output matches what’s expected in the subsequent` + " `" + `Should()` + "`" + ` call, or
(b) the number of attempts * interval period exceeds the provided timeout value.
In the examples below, timeout and interval are Go Duration values of our choosing.
*/
cronjobLookupKey := types.NamespacedName{Name: CronjobName, Namespace: CronjobNamespace}
createdCronjob := &cronjobv1.CronJob{}
// We'll need to retry getting this newly created CronJob, given that creation may not immediately happen.
Eventually(func() bool {
err := k8sClient.Get(ctx, cronjobLookupKey, createdCronjob)
return err == nil
}, timeout, interval).Should(BeTrue())
// Let's make sure our Schedule string value was properly converted/handled.
Expect(createdCronjob.Spec.Schedule).Should(Equal("1 * * * *"))
/*
Now that we've created a CronJob in our test cluster, the next step is to write a test that actually tests our CronJob controller’s behavior.
Let’s test the CronJob controller’s logic responsible for updating CronJob.Status.Active with actively running jobs.
We’ll verify that when a CronJob has a single active downstream Job, its CronJob.Status.Active field contains a reference to this Job.
First, we should get the test CronJob we created earlier, and verify that it currently does not have any active jobs.
We use Gomega's` + " `" + `Consistently()` + "`" + ` check here to ensure that the active job count remains 0 over a duration of time.
*/
By("By checking the CronJob has zero active Jobs")
Consistently(func() (int, error) {
err := k8sClient.Get(ctx, cronjobLookupKey, createdCronjob)
if err != nil {
return -1, err
}
return len(createdCronjob.Status.Active), nil
}, duration, interval).Should(Equal(0))
/*
Next, we actually create a stubbed Job that will belong to our CronJob, as well as its downstream template specs.
We set the Job's status's "Active" count to 2 to simulate the Job running two pods, which means the Job is actively running.
We then take the stubbed Job and set its owner reference to point to our test CronJob.
This ensures that the test Job belongs to, and is tracked by, our test CronJob.
Once that’s done, we create our new Job instance.
*/
By("By creating a new Job")
testJob := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: JobName,
Namespace: CronjobNamespace,
},
Spec: batchv1.JobSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
// For simplicity, we only fill out the required fields.
Containers: []v1.Container{
{
Name: "test-container",
Image: "test-image",
},
},
RestartPolicy: v1.RestartPolicyOnFailure,
},
},
},
}
// Note that your CronJob’s GroupVersionKind is required to set up this owner reference.
kind := reflect.TypeOf(cronjobv1.CronJob{}).Name()
gvk := cronjobv1.GroupVersion.WithKind(kind)
controllerRef := metav1.NewControllerRef(createdCronjob, gvk)
testJob.SetOwnerReferences([]metav1.OwnerReference{*controllerRef})
Expect(k8sClient.Create(ctx, testJob)).Should(Succeed())
// Note that you cannot manage the status values while creating the resource.
// The status field is managed separately to reflect the current state of the resource.
// Therefore, it should be updated using a PATCH or PUT operation after the resource has been created.
// Additionally, it is recommended to use StatusConditions to manage the status. For further information see:
// https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
testJob.Status.Active = 2
Expect(k8sClient.Status().Update(ctx, testJob)).Should(Succeed())
/*
Adding this Job to our test CronJob should trigger our controller’s reconciler logic.
After that, we can write a test that evaluates whether our controller eventually updates our CronJob’s Status field as expected!
*/
By("By checking that the CronJob has one active Job")
Eventually(func() ([]string, error) {
err := k8sClient.Get(ctx, cronjobLookupKey, createdCronjob)
if err != nil {
return nil, err
}
names := []string{}
for _, job := range createdCronjob.Status.Active {
names = append(names, job.Name)
}
return names, nil
}, timeout, interval).Should(ConsistOf(JobName), "should list our active job %s in the active jobs list in status", JobName)
})
})
})
/*
After writing all this code, you can run` + " `" + `go test ./...` + "`" + ` in your` + " `" + `internal/controller/` + "`" + ` directory again to run your new test!
*/
`
const CronjobList = `
// A list of pointers to currently running jobs.
// +optional
Active []corev1.ObjectReference` + " `" + `json:"active,omitempty"` + "`" + `
// Information when was the last time the job was successfully scheduled.
// +optional
LastScheduleTime *metav1.Time` + " `" + `json:"lastScheduleTime,omitempty"` + "`" + `
}
/*
Finally, we have the rest of the boilerplate that we've already discussed.
As previously noted, we don't need to change this, except to mark that
we want a status subresource, so that we behave like built-in kubernetes types.
*/
`
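For reference, the status subresource mentioned above is requested with a marker on the root type. A minimal sketch of the standard scaffold (not one of the constants displayed here; the CronJobSpec and CronJobStatus types come from the same scaffold) looks like this:

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status

// CronJob is the Schema for the cronjobs API
type CronJob struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   CronJobSpec   `json:"spec,omitempty"`
	Status CronJobStatus `json:"status,omitempty"`
}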
const CronjobSample = `` /* 411-byte string literal not displayed */
const CronjobSpecExplaination = `
// +kubebuilder:docs-gen:collapse=Imports
/*
First, let's take a look at our spec. As we discussed before, spec holds
*desired state*, so any "inputs" to our controller go here.
Fundamentally a CronJob needs the following pieces:
- A schedule (the *cron* in CronJob)
- A template for the Job to run (the
*job* in CronJob)
We'll also want a few extras, which will make our users' lives easier:
- A deadline for starting jobs (if we miss this deadline, we'll just wait till
the next scheduled time)
- What to do if multiple jobs would run at once (do we wait? stop the old one? run both?)
- A way to pause the running of a CronJob, in case something's wrong with it
- Limits on old job history
Remember, since we never read our own status, we need to have some other way to
keep track of whether a job has run. We can use at least one old job to do
this.
We'll use several markers (` + "`" + `// +comment` + "`" + `) to specify additional metadata. These
will be used by [controller-tools](https://github.com/kubernetes-sigs/controller-tools) when generating our CRD manifest.
As we'll see in a bit, controller-tools will also use GoDoc to form descriptions for
the fields.
*/
`
const CronjobSpecStruct = `
// +kubebuilder:validation:MinLength=0
// The schedule in Cron format, see https://en.wikipedia.org/wiki/Cron.
Schedule string` + " `" + `json:"schedule"` + "`" + `
// +kubebuilder:validation:Minimum=0
// Optional deadline in seconds for starting the job if it misses scheduled
// time for any reason. Missed job executions will be counted as failed ones.
// +optional
StartingDeadlineSeconds *int64` + " `" + `json:"startingDeadlineSeconds,omitempty"` + "`" + `
// Specifies how to treat concurrent executions of a Job.
// Valid values are:
// - "Allow" (default): allows CronJobs to run concurrently;
// - "Forbid": forbids concurrent runs, skipping next run if previous run hasn't finished yet;
// - "Replace": cancels currently running job and replaces it with a new one
// +optional
ConcurrencyPolicy ConcurrencyPolicy` + " `" + `json:"concurrencyPolicy,omitempty"` + "`" + `
// This flag tells the controller to suspend subsequent executions; it does
// not apply to already started executions. Defaults to false.
// +optional
Suspend *bool` + " `" + `json:"suspend,omitempty"` + "`" + `
// Specifies the job that will be created when executing a CronJob.
JobTemplate batchv1.JobTemplateSpec` + " `" + `json:"jobTemplate"` + "`" + `
// +kubebuilder:validation:Minimum=0
// The number of successful finished jobs to retain.
// This is a pointer to distinguish between explicit zero and not specified.
// +optional
SuccessfulJobsHistoryLimit *int32` + " `" + `json:"successfulJobsHistoryLimit,omitempty"` + "`" + `
// +kubebuilder:validation:Minimum=0
// The number of failed finished jobs to retain.
// This is a pointer to distinguish between explicit zero and not specified.
// +optional
FailedJobsHistoryLimit *int32` + " `" + `json:"failedJobsHistoryLimit,omitempty"` + "`" + `
}
/*
We define a custom type to hold our concurrency policy. It's actually
just a string under the hood, but the type gives extra documentation,
and allows us to attach validation on the type instead of the field,
making the validation more easily reusable.
*/
// ConcurrencyPolicy describes how the job will be handled.
// Only one of the following concurrent policies may be specified.
// If none of the following policies is specified, the default one
// is AllowConcurrent.
// +kubebuilder:validation:Enum=Allow;Forbid;Replace
type ConcurrencyPolicy string
const (
// AllowConcurrent allows CronJobs to run concurrently.
AllowConcurrent ConcurrencyPolicy = "Allow"
// ForbidConcurrent forbids concurrent runs, skipping next run if previous
// hasn't finished yet.
ForbidConcurrent ConcurrencyPolicy = "Forbid"
// ReplaceConcurrent cancels currently running job and replaces it with a new one.
ReplaceConcurrent ConcurrencyPolicy = "Replace"
)
/*
Next, let's design our status, which holds observed state. It contains any information
we want users or other controllers to be able to easily obtain.
We'll keep a list of actively running jobs, as well as the last time that we successfully
ran our job. Notice that we use` + " `" + `metav1.Time` + "`" + ` instead of` + " `" + `time.Time` + "`" + ` to get the stable
serialization, as mentioned above.
*/`
const DefaultKustomization = `` /* 2932-byte string literal not displayed */
const GroupversionIntro = `
// +kubebuilder:docs-gen:collapse=Apache License
/*
First, we have some *package-level* markers that denote that there are
Kubernetes objects in this package, and that this package represents the group
` + "`" + `batch.tutorial.kubebuilder.io` + "`" + `. The` + " `" + `object` + "`" + ` generator makes use of the
former, while the latter is used by the CRD generator to generate the right
metadata for the CRDs it creates from this package.
*/
`
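As a hedged illustration of the package-level markers described above (this is the standard scaffold shape, not a displayed constant):

// Package v1 contains API Schema definitions for the batch v1 API group.
// +kubebuilder:object:generate=true
// +groupName=batch.tutorial.kubebuilder.io
package v1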
const GroupversionSchema = `
/*
Then, we have the commonly useful variables that help us set up our Scheme.
Since we need to use all the types in this package in our controller, it's
helpful (and the convention) to have a convenient method to add all the types to
some other` + " `" + `Scheme` + "`" + `. SchemeBuilder makes this easy for us.
*/`
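A sketch of the Scheme-related variables this constant refers to, assuming the usual imports of k8s.io/apimachinery/pkg/runtime/schema and sigs.k8s.io/controller-runtime/pkg/scheme:

var (
	// GroupVersion is the group version used to register these objects.
	GroupVersion = schema.GroupVersion{Group: "batch.tutorial.kubebuilder.io", Version: "v1"}

	// SchemeBuilder is used to add Go types to the GroupVersionKind scheme.
	SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion}

	// AddToScheme adds the types in this group-version to the given scheme.
	AddToScheme = SchemeBuilder.AddToScheme
)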
const MainBatch = `
// +kubebuilder:docs-gen:collapse=Imports
/*
The first difference to notice is that kubebuilder has added the new API
group's package (` + "`" + `batchv1` + "`" + `) to our scheme. This means that we can use those
objects in our controller.
If we were using any other CRD, we would have to add its scheme in the same way.
Built-in types such as Job have their scheme added by` + " `" + `clientgoscheme` + "`" + `.
*/`
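A sketch of what that scheme registration typically looks like in main.go, assuming the standard scaffold imports (runtime, utilruntime, clientgoscheme, and the tutorial's batchv1 package):

var scheme = runtime.NewScheme()

func init() {
	// built-in types (Job, Pod, ...) come from client-go's scheme
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))

	// our own API group's types
	utilruntime.Must(batchv1.AddToScheme(scheme))
	// +kubebuilder:scaffold:scheme
}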
const MainEnableWebhook = `
/*
We'll also set up webhooks for our type, which we'll talk about next.
We just need to add them to the manager. Since we might want to run
the webhooks separately, or not run them when testing our controller
locally, we'll put them behind an environment variable.
We'll just make sure to set` + " `" + `ENABLE_WEBHOOKS=false` + "`" + ` when we run locally.
*/`
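A sketch of how that environment-variable gate typically appears in main.go; setupLog, mgr, and the SetupWebhookWithManager method are assumed from the standard scaffold rather than shown in the constants here.

if os.Getenv("ENABLE_WEBHOOKS") != "false" {
	// register the CronJob webhooks with the manager unless explicitly disabled
	if err = (&batchv1.CronJob{}).SetupWebhookWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create webhook", "webhook", "CronJob")
		os.Exit(1)
	}
}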
const SuiteTestAddSchema = `` /* 534-byte string literal not displayed */
const SuiteTestCleanup = `` /* 486-byte string literal not displayed */
const SuiteTestDescription = `
/*
One thing that this autogenerated file is missing, however, is a way to actually start your controller.
The code above will set up a client for interacting with your custom Kind,
but will not be able to test your controller behavior.
If you want to test your custom controller logic, you’ll need to add some familiar-looking manager logic
to your BeforeSuite() function, so you can register your custom controller to run on this test cluster.
You may notice that the code below runs your controller with nearly identical logic to your CronJob project’s main.go!
The only difference is that the manager is started in a separate goroutine so it does not block the cleanup of envtest
when you’re done running your tests.
Note that we set up both a "live" k8s client and a separate client from the manager. This is because when making
assertions in tests, you generally want to assert against the live state of the API server. If you use the client
from the manager (` + "`" + `k8sManager.GetClient` + "`" + `), you'd end up asserting against the contents of the cache instead, which is
slower and can introduce flakiness into your tests. We could use the manager's ` + "`" + `APIReader` + "`" + ` to accomplish the same
thing, but that would leave us with two clients in our test assertions and setup (one for reading, one for writing),
and it'd be easy to make mistakes.
Note that we keep the reconciler running against the manager's cache client, though -- we want our controller to
behave as it would in production, and we use features of the cache (like indices) in our controller which aren't
available when talking directly to the API server.
*/
k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme.Scheme,
})
Expect(err).ToNot(HaveOccurred())
err = (&CronJobReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())
go func() {
defer GinkgoRecover()
err = k8sManager.Start(ctx)
Expect(err).ToNot(HaveOccurred(), "failed to run manager")
}()
`
const SuiteTestEnv = `` /* 228-byte string literal not displayed */
const SuiteTestIntro = `
// +kubebuilder:docs-gen:collapse=Apache License
/*
When we created the CronJob API with` + " `" + `kubebuilder create api` + "`" + ` in a [previous chapter](/cronjob-tutorial/new-api.md), Kubebuilder already did some test work for you.
Kubebuilder scaffolded a` + " `" + `internal/controller/suite_test.go` + "`" + ` file that does the bare bones of setting up a test environment.
First, it will contain the necessary imports.
*/
`
const SuiteTestReadCRD = `
/*
First, the envtest cluster is configured to read CRDs from the CRD directory Kubebuilder scaffolds for you.
*/`
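A minimal sketch of the envtest configuration that constant refers to, assuming the default scaffold layout (CRDs generated into config/crd/bases) and the usual envtest, filepath, and Gomega imports inside BeforeSuite:

testEnv = &envtest.Environment{
	// read the CRDs that controller-gen writes into config/crd/bases
	CRDDirectoryPaths:     []string{filepath.Join("..", "..", "config", "crd", "bases")},
	ErrorIfCRDPathMissing: true,
}

cfg, err := testEnv.Start()
Expect(err).NotTo(HaveOccurred())
Expect(cfg).NotTo(BeNil())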
const WebhookIntro = `` /* 557-byte string literal not displayed */
const WebhookMarker = `/*
Notice that we use kubebuilder markers to generate webhook manifests.
This marker is responsible for generating a mutating webhook manifest.
The meaning of each marker can be found [here](/reference/markers/webhook.md).
*/
// +kubebuilder:webhook:path=/mutate-batch-tutorial-kubebuilder-io-v1-cronjob,mutating=true,failurePolicy=fail,groups=batch.tutorial.kubebuilder.io,resources=cronjobs,verbs=create;update,versions=v1,name=mcronjob.kb.io,sideEffects=None,admissionReviewVersions=v1
/*
We use the` + " `" + `webhook.Defaulter` + "`" + ` interface to set defaults on our CRD.
A webhook will automatically be served that calls this defaulting.
The` + " `" + `Default` + "`" + ` method is expected to mutate the receiver, setting the defaults.
*/
`
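For orientation, the WebhookValidate constant below begins inside the body of the Default method. In this tutorial's method-on-type style, the enclosing declaration has roughly this shape (a hedged sketch with the body elided):

// Default implements webhook.Defaulter so a defaulting webhook will be registered for the type.
func (r *CronJob) Default() {
	// defaulting logic (shown in the next constant) goes here
}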
const WebhookValidate = ` cronjoblog.Info("default", "name", r.Name)
if r.Spec.ConcurrencyPolicy == "" {
r.Spec.ConcurrencyPolicy = AllowConcurrent
}
if r.Spec.Suspend == nil {
r.Spec.Suspend = new(bool)
}
if r.Spec.SuccessfulJobsHistoryLimit == nil {
r.Spec.SuccessfulJobsHistoryLimit = new(int32)
*r.Spec.SuccessfulJobsHistoryLimit = 3
}
if r.Spec.FailedJobsHistoryLimit == nil {
r.Spec.FailedJobsHistoryLimit = new(int32)
*r.Spec.FailedJobsHistoryLimit = 1
}
}
/*
This marker is responsible for generating a validating webhook manifest.
*/
// +kubebuilder:webhook:verbs=create;update;delete,path=/validate-batch-tutorial-kubebuilder-io-v1-cronjob,mutating=false,failurePolicy=fail,groups=batch.tutorial.kubebuilder.io,resources=cronjobs,versions=v1,name=vcronjob.kb.io,sideEffects=None,admissionReviewVersions=v1
/*
We can validate our CRD beyond what's possible with declarative
validation. Generally, declarative validation should be sufficient, but
sometimes more advanced use cases call for complex validation.
For instance, we'll see below that we use this to validate a well-formed cron
schedule without making up a long regular expression.
If the` + " `" + `webhook.Validator` + "`" + ` interface is implemented, a webhook will automatically be
served that calls the validation.
The` + " `" + `ValidateCreate` + "`" + `, ` + "`" + `ValidateUpdate` + "`" + `, and` + " `" + `ValidateDelete` + "`" + ` methods are expected
to validate their receiver upon creation, update, and deletion respectively.
We separate out ValidateCreate from ValidateUpdate to allow behavior like making
certain fields immutable, so that they can only be set on creation.
ValidateDelete is also separated from ValidateUpdate to allow different
validation behavior on deletion.
Here, however, we just use the same shared validation for` + " `" + `ValidateCreate` + "`" + ` and
` + "`" + `ValidateUpdate` + "`" + `. And we do nothing in` + " `" + `ValidateDelete` + "`" + `, since we don't need to
validate anything on deletion.
*/
`
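A hedged sketch of the Validator methods described above, delegating to the validateCronJob helper shown in the next constant. Note that the exact signatures depend on the controller-runtime version: older scaffolds return only error (as sketched here), newer ones return (admission.Warnings, error).

// ValidateCreate implements webhook.Validator; the shared validation runs on create.
func (r *CronJob) ValidateCreate() error {
	cronjoblog.Info("validate create", "name", r.Name)
	return r.validateCronJob()
}

// ValidateUpdate implements webhook.Validator; the same shared validation runs on update.
func (r *CronJob) ValidateUpdate(old runtime.Object) error {
	cronjoblog.Info("validate update", "name", r.Name)
	return r.validateCronJob()
}

// ValidateDelete implements webhook.Validator; nothing needs validating on deletion.
func (r *CronJob) ValidateDelete() error {
	cronjoblog.Info("validate delete", "name", r.Name)
	return nil
}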
const WebhookValidateSpec = `
/*
We validate the name and the spec of the CronJob.
*/
func (r *CronJob) validateCronJob() error {
var allErrs field.ErrorList
if err := r.validateCronJobName(); err != nil {
allErrs = append(allErrs, err)
}
if err := r.validateCronJobSpec(); err != nil {
allErrs = append(allErrs, err)
}
if len(allErrs) == 0 {
return nil
}
return apierrors.NewInvalid(
schema.GroupKind{Group: "batch.tutorial.kubebuilder.io", Kind: "CronJob"},
r.Name, allErrs)
}
/*
Some fields are declaratively validated by OpenAPI schema.
You can find kubebuilder validation markers (prefixed
with` + " `" + `// +kubebuilder:validation` + "`" + `) in the
[Designing an API](api-design.md) section.
You can find all of the kubebuilder supported markers for
declaring validation by running` + " `" + `controller-gen crd -w` + "`" + `,
or [here](/reference/markers/crd-validation.md).
*/
func (r *CronJob) validateCronJobSpec() *field.Error {
// The field helpers from the kubernetes API machinery help us return nicely
// structured validation errors.
return validateScheduleFormat(
r.Spec.Schedule,
field.NewPath("spec").Child("schedule"))
}
/*
We'll need to validate the [cron](https://en.wikipedia.org/wiki/Cron) schedule
is well-formatted.
*/
func validateScheduleFormat(schedule string, fldPath *field.Path) *field.Error {
if _, err := cron.ParseStandard(schedule); err != nil {
return field.Invalid(fldPath, schedule, err.Error())
}
return nil
}
/*
Validating the length of a string field can be done declaratively by
the validation schema.
But the` + " `" + `ObjectMeta.Name` + "`" + ` field is defined in a shared package under
the apimachinery repo, so we can't declaratively validate it using
the validation schema.
*/
func (r *CronJob) validateCronJobName() *field.Error {
if len(r.ObjectMeta.Name) > validationutils.DNS1035LabelMaxLength-11 {
// The job name length is 63 characters like all Kubernetes objects
// (which must fit in a DNS subdomain). The cronjob controller appends
// an 11-character suffix to the cronjob (` + "`" + `-$TIMESTAMP` + "`" + `) when creating
// a job. The job name length limit is 63 characters. Therefore cronjob
// names must have length <= 63-11=52. If we don't validate this here,
// then job creation will fail later.
return field.Invalid(field.NewPath("metadata").Child("name"), r.Name, "must be no more than 52 characters")
}
return nil
}
// +kubebuilder:docs-gen:collapse=Validate object name`
Variables
This section is empty.
Functions
This section is empty.
Types
type Sample
type Sample struct {
// contains filtered or unexported fields
}
func (*Sample) CodeGen
func (sp *Sample) CodeGen()
CodeGen is a noop for this sample, just to make generation of all samples more efficient. We may want to refactor `UpdateTutorial` some day to take advantage of a separate call, but it is not necessary.
func (*Sample) GenerateSampleProject
func (sp *Sample) GenerateSampleProject()
func (*Sample) UpdateTutorial
func (sp *Sample) UpdateTutorial()