Skip to content

Commit

Permalink
feat: add mandatory dep check; dequeue when app is initialized (#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
hysyeah authored Jan 9, 2025
1 parent 1d67494 commit 022eb24
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 21 deletions.
9 changes: 5 additions & 4 deletions controllers/application_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,10 @@ func (r *ApplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if err != nil {
if apierrors.IsNotFound(err) {
// check if a new deployment created or not
ctrl.Log.Info("create app from deployment watching", "name", validAppObject.GetName(), "namespace", validAppObject.GetNamespace())
ctrl.Log.Info("create app from deployment watching", "name", validAppObject.GetName(), "namespace", validAppObject.GetNamespace(), "appname", name)
err = r.createApplication(ctx, req, validAppObject, name)
if err != nil {
ctrl.Log.Info("create app failed", "appname", name, "err", err)
return ctrl.Result{}, err
}
continue
Expand Down Expand Up @@ -328,11 +329,11 @@ func (r *ApplicationReconciler) createApplication(ctx context.Context, req ctrl.
}
servicePortsMap, err := r.getAppPorts(ctx, deployment, isMultiApp)
if err != nil {
klog.Errorf("failed to get app ports err=%v", err)
klog.Warningf("get app ports err=%v", err)
}
tailScaleACLs, err := r.getAppACLs(deployment)
if err != nil {
klog.Errorf("failed to get app tailscale acls err=%v", err)
klog.Warningf("get app tailscale acls err=%v", err)
}

var appid string
Expand Down Expand Up @@ -392,7 +393,7 @@ func (r *ApplicationReconciler) createApplication(ctx context.Context, req ctrl.
app.Status.UpdateTime = &now

// set startedTime when app first become running
klog.Infof("createApplication: appState: %v", app.Status.State)
klog.Infof("createApplication:name: %s, appState: %v", app.Spec.Name, app.Status.State)
klog.Infof("createApplication,startedTime: %v", appCopy.Status.StartedTime.IsZero())
if app.Status.State == appv1alpha1.AppRunning.String() && appCopy.Status.StartedTime.IsZero() {
app.Status.StartedTime = &now
Expand Down
17 changes: 11 additions & 6 deletions controllers/appmgr_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

appv1alpha1 "bytetrade.io/web3os/app-service/api/app.bytetrade.io/v1alpha1"
"bytetrade.io/web3os/app-service/pkg/apiserver"
"bytetrade.io/web3os/app-service/pkg/apiserver/api"
"bytetrade.io/web3os/app-service/pkg/appinstaller"
"bytetrade.io/web3os/app-service/pkg/constants"
"bytetrade.io/web3os/app-service/pkg/generated/clientset/versioned"
Expand Down Expand Up @@ -136,15 +137,18 @@ func (r *ApplicationManagerController) Reconcile(ctx context.Context, req ctrl.R
func (r *ApplicationManagerController) reconcile(instance *appv1alpha1.ApplicationManager) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var err error

defer func() {
delete(manager, instance.Name)
req := reconcile.Request{NamespacedName: types.NamespacedName{
Namespace: instance.Spec.AppOwner,
}}
task.WQueue.(*task.Type).SetCompleted(req)
if !errors.Is(err, api.ErrLaunchFailed) {
req := reconcile.Request{NamespacedName: types.NamespacedName{
Namespace: instance.Spec.AppOwner,
}}
task.WQueue.(*task.Type).SetCompleted(req)
}

}()
var err error
klog.Infof("Start to perform operate=%s appName=%s", instance.Status.OpType, instance.Spec.AppName)

var curAppMgr appv1alpha1.ApplicationManager
Expand Down Expand Up @@ -312,7 +316,8 @@ func (r *ApplicationManagerController) install(ctx context.Context, appMgr *appv
var ops *appinstaller.HelmOps
defer func() {
if err != nil {
if err.Error() == "canceled" {
if errors.Is(err, api.ErrStartUpFailed) || errors.Is(err, api.ErrLaunchFailed) {
klog.Infof("app=%s installation is canceled", appMgr.Spec.AppName)
return
}
state := appv1alpha1.Failed
Expand Down
2 changes: 2 additions & 0 deletions pkg/apiserver/api/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ var (
// ErrResourceNotFound indicates that a resource is not found.
ErrResourceNotFound = errors.New("resource not found")
ErrGPUNodeNotFound = errors.New("no available gpu node found")
ErrStartUpFailed = errors.New("app started up failed")
ErrLaunchFailed = errors.New("app launched failed")
)
6 changes: 6 additions & 0 deletions pkg/apiserver/handler_installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ func (h *Handler) install(req *restful.Request, resp *restful.Response) {
api.HandleBadRequest(resp, req, err)
return
}
unSatisfiedDeps, _ := CheckDependencies(req.Request.Context(), appConfig.Dependencies, h.ctrlClient, owner, true)
if len(unSatisfiedDeps) > 0 {
api.HandleBadRequest(resp, req, FormatDependencyError(unSatisfiedDeps))
return
}

err = utils.CheckTailScaleACLs(appConfig.TailScaleACLs)
if err != nil {
api.HandleError(resp, req, err)
Expand Down
38 changes: 36 additions & 2 deletions pkg/apiserver/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,13 @@ func CheckDependencies(ctx context.Context, deps []appinstaller.Dependency, ctrl
}
}
if dep.Type == constants.DependencyTypeApp {
if !set.Has(dep.Name) {
if !set.Has(dep.Name) && dep.Mandatory {
unSatisfiedDeps = append(unSatisfiedDeps, dep)
if !checkAll {
return unSatisfiedDeps, fmt.Errorf("dependency application %s not existed", dep.Name)
}
}
if !utils.MatchVersion(appToVersion[dep.Name], dep.Version) {
if !utils.MatchVersion(appToVersion[dep.Name], dep.Version) && dep.Mandatory {
unSatisfiedDeps = append(unSatisfiedDeps, dep)
if !checkAll {
return unSatisfiedDeps, fmt.Errorf("%s version: %s not match dependency %s", dep.Name, appToVersion[dep.Name], dep.Version)
Expand Down Expand Up @@ -640,3 +640,37 @@ func CheckMiddlewareRequirement(ctx context.Context, kubeConfig *rest.Config, mi
}
return true, nil
}

func FormatDependencyError(deps []appinstaller.Dependency) error {
var systemDeps, appDeps []string

for _, dep := range deps {
depInfo := fmt.Sprintf("%s version=%s",
dep.Name, dep.Version)

if dep.Type == "system" {
systemDeps = append(systemDeps, depInfo)
} else if dep.Type == "application" {
appDeps = append(appDeps, depInfo)
}
}

var errMsg strings.Builder
errMsg.WriteString("Missing dependencies:\n")

if len(systemDeps) > 0 {
errMsg.WriteString("\nSystem Dependencies:\n")
for _, dep := range systemDeps {
errMsg.WriteString(fmt.Sprintf("- %s\n", dep))
}
}

if len(appDeps) > 0 {
errMsg.WriteString("\nApplication Dependencies:\n")
for _, dep := range appDeps {
errMsg.WriteString(fmt.Sprintf("- %s\n", dep))
}
}

return fmt.Errorf(errMsg.String())
}
3 changes: 2 additions & 1 deletion pkg/appinstaller/appcfg_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ type Dependency struct {
Name string `yaml:"name" json:"name"`
Version string `yaml:"version" json:"version"`
// dependency type: system, application.
Type string `yaml:"type" json:"type"`
Type string `yaml:"type" json:"type"`
Mandatory bool `json:"mandatory"`
}

type Options struct {
Expand Down
27 changes: 19 additions & 8 deletions pkg/appinstaller/helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
"time"

appv1alpha1 "bytetrade.io/web3os/app-service/api/app.bytetrade.io/v1alpha1"
"bytetrade.io/web3os/app-service/pkg/apiserver/api"
"bytetrade.io/web3os/app-service/pkg/client/clientset"
"bytetrade.io/web3os/app-service/pkg/constants"
"bytetrade.io/web3os/app-service/pkg/generated/clientset/versioned"
"bytetrade.io/web3os/app-service/pkg/helm"
"bytetrade.io/web3os/app-service/pkg/kubesphere"
"bytetrade.io/web3os/app-service/pkg/tapr"
"bytetrade.io/web3os/app-service/pkg/task"
"bytetrade.io/web3os/app-service/pkg/users/userspace"
userspacev1 "bytetrade.io/web3os/app-service/pkg/users/userspace/v1"
"bytetrade.io/web3os/app-service/pkg/utils"
Expand All @@ -39,6 +41,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

var (
Expand Down Expand Up @@ -146,13 +149,13 @@ func (h *HelmOps) Install() error {
return err
}
}
ok := h.waitForLaunch()
ok, err := h.waitForLaunch()

if !ok {
// install operation has been canceled, so to uninstall it.
h.Uninstall()
//return context.Canceled
return errors.New("canceled")
return err
}
klog.Infof("app: %s launched success", h.app.AppName)

Expand Down Expand Up @@ -830,11 +833,11 @@ func (h *HelmOps) upgrade() error {
return err
}

ok := h.waitForLaunch()
ok, err := h.waitForLaunch()
if !ok {
// canceled
h.rollBack()

return err
}

return nil
Expand Down Expand Up @@ -941,11 +944,19 @@ func (h *HelmOps) createOIDCClient(values map[string]interface{}, userZone, name
return nil
}

func (h *HelmOps) waitForLaunch() bool {
func (h *HelmOps) waitForLaunch() (bool, error) {
ok := h.waitForStartUp()
if !ok {
return false
return false, api.ErrStartUpFailed
}

req := reconcile.Request{NamespacedName: types.NamespacedName{
Namespace: h.app.OwnerName,
}}
task.WQueue.(*task.Type).SetCompleted(req)

klog.Infof("dequeue username:%s,appname:%s", h.app.OwnerName, h.app.AppName)

timer := time.NewTicker(2 * time.Second)
entrances := h.app.Entrances
entranceCount := len(entrances)
Expand All @@ -961,12 +972,12 @@ func (h *HelmOps) waitForLaunch() bool {
}
}
if entranceCount == count {
return true
return true, nil
}

case <-h.ctx.Done():
klog.Infof("Waiting for launch canceled appName=%s", h.app.AppName)
return false
return false, api.ErrLaunchFailed
}
}
}
Expand Down

0 comments on commit 022eb24

Please sign in to comment.