diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index 41b4d448178..5db56809179 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -73,6 +73,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { private var expireCleanUpTriggerCacheExecutor: ScheduledExecutorService = _ private var cleanupCanceledAppPodExecutor: ThreadPoolExecutor = _ + private var cleanupConfigMapExecutor: ThreadPoolExecutor = _ private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = { checkKubernetesInfo(kubernetesInfo) @@ -165,6 +166,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { TimeUnit.MILLISECONDS) cleanupCanceledAppPodExecutor = ThreadUtils.newDaemonCachedThreadPool( "cleanup-canceled-app-pod-thread") + cleanupConfigMapExecutor = ThreadUtils.newDaemonCachedThreadPool( + "cleanup-config-map-thread") } override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = { @@ -278,6 +281,48 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { private class SparkEnginePodEventHandler(kubernetesInfo: KubernetesInfo) extends ResourceEventHandler[Pod] { + private def runConfigMapDeletion(pod: Pod, kubernetesInfo: KubernetesInfo): Unit = { + cleanupConfigMapExecutor.submit(new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError { + try { + val volumes = pod.getSpec.getVolumes.asScala + val configMapVolume = volumes.find(_.getConfigMap != null) + .map(_.getConfigMap.getName) + .find(_.contains("spark-exec")) + + configMapVolume match { + case Some(configMapName) => + val kubernetesClient = getOrCreateKubernetesClient(kubernetesInfo) + val statusDetailsList = + kubernetesClient.configMaps().withName(configMapName).delete() + + val deletionSuccessful = + statusDetailsList != null && statusDetailsList.asScala.nonEmpty + if (deletionSuccessful) { + info( + s"[$kubernetesInfo] ConfigMap $configMapName associated with " + + s"pod ${pod.getMetadata.getName} deleted successfully.") + } else { + warn( + s"[$kubernetesInfo] Failed to delete ConfigMap " + + s"$configMapName associated with " + + s"pod ${pod.getMetadata.getName}.") + } + case None => + warn( + s"[$kubernetesInfo] No ConfigMap volume found " + + s"for pod ${pod.getMetadata.getName}.") + } + } catch { + case NonFatal(e) => error( + s"[$kubernetesInfo] Failed to delete ConfigMap associated with " + + s"pod ${pod.getMetadata.getName}", + e) + } + } + }) + } + override def onAdd(pod: Pod): Unit = { if (isSparkEnginePod(pod)) { updateApplicationState(kubernetesInfo, pod) @@ -320,6 +365,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { appStateSource, appStateContainer) } + + runConfigMapDeletion(pod, kubernetesInfo) } }