analytics

package
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2022 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConvertToRecommendationRule added in v0.7.0

func ConvertToRecommendationRule(analytics *analysisv1alph1.Analytics) *analysisv1alph1.RecommendationRule

func UpsertRecommendationRule added in v0.7.0

func UpsertRecommendationRule(recommendationRule *analysisv1alph1.RecommendationRule, client client.Client) error

Types

type Controller

type Controller struct {
	client.Client
}

func (*Controller) Reconcile

func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

func (*Controller) SetupWithManager

func (c *Controller) SetupWithManager(mgr ctrl.Manager) error
func (c *Controller) analyze(ctx context.Context, analytics *analysisv1alph1.Analytics) bool {
	newStatus := analytics.Status.DeepCopy()

	identities, err := c.getIdentities(ctx, analytics)
	if err != nil {
		c.Recorder.Event(analytics, corev1.EventTypeNormal, "FailedSelectResource", err.Error())
		msg := fmt.Sprintf("Failed to get idenitities, Analytics %s error %v", klog.KObj(analytics), err)
		klog.Errorf(msg)
		setReadyCondition(newStatus, metav1.ConditionFalse, "FailedSelectResource", msg)
		c.UpdateStatus(ctx, analytics, newStatus)
		return false
	}

	timeNow := metav1.Now()

	// if the first mission start time is last round, reset currMissions here
	currMissions := newStatus.Recommendations
	if currMissions != nil && len(currMissions) > 0 {
		firstMissionStartTime := currMissions[0].LastStartTime
		if firstMissionStartTime.IsZero() {
			currMissions = nil
		} else {
			planingTime := firstMissionStartTime.Add(time.Duration(*analytics.Spec.CompletionStrategy.PeriodSeconds) * time.Second)
			if time.Now().After(planingTime) {
				currMissions = nil // reset missions to trigger creation for missions
			}
		}
	}

	if currMissions == nil {
		// create recommendation missions for this round
		for _, id := range identities {
			currMissions = append(currMissions, analysisv1alph1.RecommendationMission{
				TargetRef: corev1.ObjectReference{Kind: id.Kind, APIVersion: id.APIVersion, Namespace: id.Namespace, Name: id.Name},
			})
		}
	}

	var currRecommendations []*analysisv1alph1.Recommendation
	labelSet := labels.Set{}
	labelSet[known.AnalyticsUidLabel] = string(analytics.UID)
	currRecommendations, err = c.recommLister.Recommendations(analytics.Namespace).List(labels.SelectorFromSet(labelSet))
	if err != nil {
		c.Recorder.Event(analytics, corev1.EventTypeNormal, "FailedSelectResource", err.Error())
		msg := fmt.Sprintf("Failed to get recomendations, Analytics %s error %v", klog.KObj(analytics), err)
		klog.Errorf(msg)
		setReadyCondition(newStatus, metav1.ConditionFalse, "FailedSelectResource", msg)
		c.UpdateStatus(ctx, analytics, newStatus)
		return false
	}

	if klog.V(6).Enabled() {
		// Print identities
		for k, id := range identities {
			klog.V(6).InfoS("identities", "analytics", klog.KObj(analytics), "key", k, "apiVersion", id.APIVersion, "kind", id.Kind, "namespace", id.Namespace, "name", id.Name)
		}
	}

	maxConcurrency := 10
	executionIndex := -1
	var concurrency int
	for index, mission := range currMissions {
		if mission.LastStartTime != nil {
			continue
		}
		if executionIndex == -1 {
			executionIndex = index
		}
		if concurrency < maxConcurrency {
			concurrency++
		}
	}

	wg := sync.WaitGroup{}
	wg.Add(concurrency)
	for index := executionIndex; index < len(currMissions) && index < concurrency+executionIndex; index++ {
		var existingRecommendation *analysisv1alph1.Recommendation
		for _, r := range currRecommendations {
			if reflect.DeepEqual(currMissions[index].TargetRef, r.Spec.TargetRef) {
				existingRecommendation = r
				break
			}
		}

		go c.executeMission(ctx, &wg, analytics, identities, &currMissions[index], existingRecommendation, timeNow)
	}

	wg.Wait()

	finished := false
	if executionIndex+concurrency == len(currMissions) || len(currMissions) == 0 {
		finished = true
	}

	if finished {
		newStatus.LastUpdateTime = &timeNow

		// clean orphan recommendations
		for _, recommendation := range currRecommendations {
			exist := false
			for _, mission := range currMissions {
				if recommendation.UID == mission.UID {
					exist = true
					break
				}
			}

			if !exist {
				err = c.Client.Delete(ctx, recommendation)
				if err != nil {
					klog.ErrorS(err, "Failed to delete recommendation.", "recommendation", klog.KObj(recommendation))
				} else {
					klog.Infof("Deleted orphan recommendation %v.", klog.KObj(recommendation))
				}
			}
		}

	}

	newStatus.Recommendations = currMissions
	setReadyCondition(newStatus, metav1.ConditionTrue, "AnalyticsReady", "Analytics is ready")

	c.UpdateStatus(ctx, analytics, newStatus)
	return finished
}

func (c *Controller) CreateRecommendationObject(ctx context.Context, analytics *analysisv1alph1.Analytics,

	target corev1.ObjectReference, id ObjectIdentity) *analysisv1alph1.Recommendation {

	recommendation := &analysisv1alph1.Recommendation{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: fmt.Sprintf("%s-%s-", analytics.Name, strings.ToLower(string(analytics.Spec.Type))),
			Namespace:    analytics.Namespace,
			OwnerReferences: []metav1.OwnerReference{
				*newOwnerRef(analytics),
			},
			Labels: id.Labels,
		},
		Spec: analysisv1alph1.RecommendationSpec{
			TargetRef: target,
			Type:      analytics.Spec.Type,
		},
	}

	if recommendation.Labels == nil {
		recommendation.Labels = map[string]string{}
	}
	recommendation.Labels[known.AnalyticsNameLabel] = analytics.Name
	recommendation.Labels[known.AnalyticsUidLabel] = string(analytics.UID)
	recommendation.Labels[known.AnalyticsTypeLabel] = string(analytics.Spec.Type)

	return recommendation
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL