Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(controller): update status only if needed #341

Merged
merged 1 commit into from
Jan 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 91 additions & 22 deletions pkg/controllers/kepler_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (r KeplerInternalReconciler) getInternal(ctx context.Context, req ctrl.Requ
func (r KeplerInternalReconciler) updateStatus(ctx context.Context, req ctrl.Request, recErr error) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {

logger := r.logger.WithValues("keplerinternal", req.Name)
ki, _ := r.getInternal(ctx, req)
// may be deleted
if ki == nil || !ki.GetDeletionTimestamp().IsZero() {
Expand All @@ -160,13 +161,16 @@ func (r KeplerInternalReconciler) updateStatus(ctx context.Context, req ctrl.Req
return nil
}

ki.Status = v1alpha1.KeplerInternalStatus{
Exporter: v1alpha1.ExporterStatus{
Conditions: []v1alpha1.Condition{},
},
// sanitize the conditions so that all types are present and the order is predictable
ki.Status.Exporter.Conditions = sanitizeConditions(ki.Status.Exporter.Conditions)

updated := r.updateReconciledStatus(ctx, ki, recErr) ||
r.updateAvailableStatus(ctx, ki, recErr)

if !updated {
logger.V(6).Info("no changes to existing status; skipping update")
return nil
}
r.updateReconciledStatus(ctx, ki, recErr)
r.updateAvailableStatus(ctx, ki, recErr)

now := metav1.Now()
for i := range ki.Status.Exporter.Conditions {
Expand All @@ -178,12 +182,47 @@ func (r KeplerInternalReconciler) updateStatus(ctx context.Context, req ctrl.Req
})
}

func (r KeplerInternalReconciler) updateReconciledStatus(ctx context.Context, ki *v1alpha1.KeplerInternal, recErr error) {
func sanitizeConditions(conditions []v1alpha1.Condition) []v1alpha1.Condition {
required := map[v1alpha1.ConditionType]bool{
v1alpha1.Reconciled: false,
v1alpha1.Available: false,
}

if len(conditions) == len(required) {
return conditions
}

if len(conditions) == 0 {
return []v1alpha1.Condition{{
Type: v1alpha1.Reconciled,
Status: v1alpha1.ConditionFalse,
}, {
Type: v1alpha1.Available,
Status: v1alpha1.ConditionFalse,
}}
}

for _, c := range conditions {
required[c.Type] = true
}

for t, present := range required {
if !present {
conditions = append(conditions, v1alpha1.Condition{
Type: t,
Status: v1alpha1.ConditionFalse,
})
}
}
return conditions
}

func (r KeplerInternalReconciler) updateReconciledStatus(ctx context.Context, ki *v1alpha1.KeplerInternal, recErr error) bool {

reconciled := v1alpha1.Condition{
Type: v1alpha1.Reconciled,
ObservedGeneration: ki.Generation,
Status: v1alpha1.ConditionTrue,
ObservedGeneration: ki.Generation,
Reason: v1alpha1.ReconcileComplete,
Message: "Reconcile succeeded",
}
Expand All @@ -194,16 +233,46 @@ func (r KeplerInternalReconciler) updateReconciledStatus(ctx context.Context, ki
reconciled.Message = recErr.Error()
}

ki.Status.Exporter.Conditions = append(ki.Status.Exporter.Conditions, reconciled)
return updateCondition(ki.Status.Exporter.Conditions, reconciled)
}

func findCondition(conditions []v1alpha1.Condition, t v1alpha1.ConditionType) *v1alpha1.Condition {
for i, c := range conditions {
if c.Type == t {
return &conditions[i]
}
}
return nil
}

// returns true if the condition has been updated
func updateCondition(conditions []v1alpha1.Condition, latest v1alpha1.Condition) bool {

old := findCondition(conditions, latest.Type)
if old == nil {
panic("old condition not found; this should never happen after sanitizeConditions")
}

if old.ObservedGeneration == latest.ObservedGeneration &&
old.Status == latest.Status &&
old.Reason == latest.Reason &&
old.Message == latest.Message {
return false
}

old.ObservedGeneration = latest.ObservedGeneration
old.Status = latest.Status
old.Reason = latest.Reason
old.Message = latest.Message
return true
}

func (r KeplerInternalReconciler) updateAvailableStatus(ctx context.Context, ki *v1alpha1.KeplerInternal, recErr error) {
func (r KeplerInternalReconciler) updateAvailableStatus(ctx context.Context, ki *v1alpha1.KeplerInternal, recErr error) bool {
// get daemonset owned by kepler
dset := appsv1.DaemonSet{}
key := types.NamespacedName{Name: ki.DaemonsetName(), Namespace: ki.Namespace()}
if err := r.Client.Get(ctx, key, &dset); err != nil {
ki.Status.Exporter.Conditions = append(ki.Status.Exporter.Conditions, availableConditionForGetError(err))
return
return updateCondition(ki.Status.Exporter.Conditions, availableConditionForGetError(err))
}

ds := dset.Status
Expand All @@ -219,39 +288,39 @@ func (r KeplerInternalReconciler) updateAvailableStatus(ctx context.Context, ki
if recErr == nil {
c.ObservedGeneration = ki.Generation
}
ki.Status.Exporter.Conditions = append(ki.Status.Exporter.Conditions, c)

updated := updateCondition(ki.Status.Exporter.Conditions, c)

// update estimator status
estimatorStatus := &v1alpha1.EstimatorStatus{
estimatorStatus := v1alpha1.EstimatorStatus{
Status: v1alpha1.DeploymentNotInstalled,
}
if ki.Spec.Estimator != nil && len(dset.Spec.Template.Spec.Containers) > 1 {
// estimator enabled and has a sidecar container
estimatorStatus.Status = v1alpha1.DeploymentNotReady
if ds.NumberReady == ds.DesiredNumberScheduled {
estimatorStatus.Status = v1alpha1.DeploymentRunning
} else {
estimatorStatus.Status = v1alpha1.DeploymentNotReady
}
}
ki.Status.Estimator = *estimatorStatus

ki.Status.Estimator = estimatorStatus

// update model server status
modelServerStatus := &v1alpha1.ModelServerStatus{
modelServerStatus := v1alpha1.ModelServerStatus{
Status: v1alpha1.DeploymentNotInstalled,
}
if ki.Spec.ModelServer != nil {
key = types.NamespacedName{Name: ki.ModelServerDeploymentName(), Namespace: ki.Namespace()}
deploy := appsv1.Deployment{}
if err := r.Client.Get(ctx, key, &deploy); err == nil {
modelServerStatus.Status = v1alpha1.DeploymentNotReady
if deploy.Status.ReadyReplicas > 0 {
modelServerStatus.Status = v1alpha1.DeploymentRunning
} else {
modelServerStatus.Status = v1alpha1.DeploymentNotReady
}
}
}
ki.Status.ModelServer = *modelServerStatus

ki.Status.ModelServer = modelServerStatus
return updated
}

func availableConditionForGetError(err error) v1alpha1.Condition {
Expand Down
Loading