Skip to content

Commit

Permalink
Merge pull request #341 from sthaha/fix-infinite-recon
Browse files Browse the repository at this point in the history
fix(controller): update status only if needed
  • Loading branch information
sthaha authored Jan 16, 2024
2 parents 7b99b64 + 00a6b48 commit 9debe42
Showing 1 changed file with 91 additions and 22 deletions.
113 changes: 91 additions & 22 deletions pkg/controllers/kepler_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (r KeplerInternalReconciler) getInternal(ctx context.Context, req ctrl.Requ
func (r KeplerInternalReconciler) updateStatus(ctx context.Context, req ctrl.Request, recErr error) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {

logger := r.logger.WithValues("keplerinternal", req.Name)
ki, _ := r.getInternal(ctx, req)
// may be deleted
if ki == nil || !ki.GetDeletionTimestamp().IsZero() {
Expand All @@ -160,13 +161,16 @@ func (r KeplerInternalReconciler) updateStatus(ctx context.Context, req ctrl.Req
return nil
}

ki.Status = v1alpha1.KeplerInternalStatus{
Exporter: v1alpha1.ExporterStatus{
Conditions: []v1alpha1.Condition{},
},
// sanitize the conditions so that all types are present and the order is predictable
ki.Status.Exporter.Conditions = sanitizeConditions(ki.Status.Exporter.Conditions)

updated := r.updateReconciledStatus(ctx, ki, recErr) ||
r.updateAvailableStatus(ctx, ki, recErr)

if !updated {
logger.V(6).Info("no changes to existing status; skipping update")
return nil
}
r.updateReconciledStatus(ctx, ki, recErr)
r.updateAvailableStatus(ctx, ki, recErr)

now := metav1.Now()
for i := range ki.Status.Exporter.Conditions {
Expand All @@ -178,12 +182,47 @@ func (r KeplerInternalReconciler) updateStatus(ctx context.Context, req ctrl.Req
})
}

func (r KeplerInternalReconciler) updateReconciledStatus(ctx context.Context, ki *v1alpha1.KeplerInternal, recErr error) {
func sanitizeConditions(conditions []v1alpha1.Condition) []v1alpha1.Condition {
required := map[v1alpha1.ConditionType]bool{
v1alpha1.Reconciled: false,
v1alpha1.Available: false,
}

if len(conditions) == len(required) {
return conditions
}

if len(conditions) == 0 {
return []v1alpha1.Condition{{
Type: v1alpha1.Reconciled,
Status: v1alpha1.ConditionFalse,
}, {
Type: v1alpha1.Available,
Status: v1alpha1.ConditionFalse,
}}
}

for _, c := range conditions {
required[c.Type] = true
}

for t, present := range required {
if !present {
conditions = append(conditions, v1alpha1.Condition{
Type: t,
Status: v1alpha1.ConditionFalse,
})
}
}
return conditions
}

func (r KeplerInternalReconciler) updateReconciledStatus(ctx context.Context, ki *v1alpha1.KeplerInternal, recErr error) bool {

reconciled := v1alpha1.Condition{
Type: v1alpha1.Reconciled,
ObservedGeneration: ki.Generation,
Status: v1alpha1.ConditionTrue,
ObservedGeneration: ki.Generation,
Reason: v1alpha1.ReconcileComplete,
Message: "Reconcile succeeded",
}
Expand All @@ -194,16 +233,46 @@ func (r KeplerInternalReconciler) updateReconciledStatus(ctx context.Context, ki
reconciled.Message = recErr.Error()
}

ki.Status.Exporter.Conditions = append(ki.Status.Exporter.Conditions, reconciled)
return updateCondition(ki.Status.Exporter.Conditions, reconciled)
}

func findCondition(conditions []v1alpha1.Condition, t v1alpha1.ConditionType) *v1alpha1.Condition {
for i, c := range conditions {
if c.Type == t {
return &conditions[i]
}
}
return nil
}

// returns true if the condition has been updated
func updateCondition(conditions []v1alpha1.Condition, latest v1alpha1.Condition) bool {

old := findCondition(conditions, latest.Type)
if old == nil {
panic("old condition not found; this should never happen after sanitizeConditions")
}

if old.ObservedGeneration == latest.ObservedGeneration &&
old.Status == latest.Status &&
old.Reason == latest.Reason &&
old.Message == latest.Message {
return false
}

old.ObservedGeneration = latest.ObservedGeneration
old.Status = latest.Status
old.Reason = latest.Reason
old.Message = latest.Message
return true
}

func (r KeplerInternalReconciler) updateAvailableStatus(ctx context.Context, ki *v1alpha1.KeplerInternal, recErr error) {
func (r KeplerInternalReconciler) updateAvailableStatus(ctx context.Context, ki *v1alpha1.KeplerInternal, recErr error) bool {
// get daemonset owned by kepler
dset := appsv1.DaemonSet{}
key := types.NamespacedName{Name: ki.DaemonsetName(), Namespace: ki.Namespace()}
if err := r.Client.Get(ctx, key, &dset); err != nil {
ki.Status.Exporter.Conditions = append(ki.Status.Exporter.Conditions, availableConditionForGetError(err))
return
return updateCondition(ki.Status.Exporter.Conditions, availableConditionForGetError(err))
}

ds := dset.Status
Expand All @@ -219,39 +288,39 @@ func (r KeplerInternalReconciler) updateAvailableStatus(ctx context.Context, ki
if recErr == nil {
c.ObservedGeneration = ki.Generation
}
ki.Status.Exporter.Conditions = append(ki.Status.Exporter.Conditions, c)

updated := updateCondition(ki.Status.Exporter.Conditions, c)

// update estimator status
estimatorStatus := &v1alpha1.EstimatorStatus{
estimatorStatus := v1alpha1.EstimatorStatus{
Status: v1alpha1.DeploymentNotInstalled,
}
if ki.Spec.Estimator != nil && len(dset.Spec.Template.Spec.Containers) > 1 {
// estimator enabled and has a sidecar container
estimatorStatus.Status = v1alpha1.DeploymentNotReady
if ds.NumberReady == ds.DesiredNumberScheduled {
estimatorStatus.Status = v1alpha1.DeploymentRunning
} else {
estimatorStatus.Status = v1alpha1.DeploymentNotReady
}
}
ki.Status.Estimator = *estimatorStatus

ki.Status.Estimator = estimatorStatus

// update model server status
modelServerStatus := &v1alpha1.ModelServerStatus{
modelServerStatus := v1alpha1.ModelServerStatus{
Status: v1alpha1.DeploymentNotInstalled,
}
if ki.Spec.ModelServer != nil {
key = types.NamespacedName{Name: ki.ModelServerDeploymentName(), Namespace: ki.Namespace()}
deploy := appsv1.Deployment{}
if err := r.Client.Get(ctx, key, &deploy); err == nil {
modelServerStatus.Status = v1alpha1.DeploymentNotReady
if deploy.Status.ReadyReplicas > 0 {
modelServerStatus.Status = v1alpha1.DeploymentRunning
} else {
modelServerStatus.Status = v1alpha1.DeploymentNotReady
}
}
}
ki.Status.ModelServer = *modelServerStatus

ki.Status.ModelServer = modelServerStatus
return updated
}

func availableConditionForGetError(err error) v1alpha1.Condition {
Expand Down

0 comments on commit 9debe42

Please sign in to comment.