From aed9088c8a274124ba2b4fdcd94efb6e4879072e Mon Sep 17 00:00:00 2001 From: senmiaoliu Date: Fri, 27 Dec 2024 18:18:41 +0800 Subject: [PATCH] fix query timeout checker leak in chat engine and jdbc engine --- .../chat/operation/ExecuteStatement.scala | 20 +++++++++++++++---- .../jdbc/operation/ExecuteStatement.scala | 17 +++++++++++++--- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ExecuteStatement.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ExecuteStatement.scala index 754a519324f..f5d93d45d70 100644 --- a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ExecuteStatement.scala +++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ExecuteStatement.scala @@ -16,7 +16,9 @@ */ package org.apache.kyuubi.engine.chat.operation -import org.apache.kyuubi.Logging +import java.util.concurrent.RejectedExecutionException + +import org.apache.kyuubi.{KyuubiSQLException, Logging} import org.apache.kyuubi.engine.chat.provider.ChatProvider import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationState} import org.apache.kyuubi.operation.log.OperationLog @@ -41,9 +43,19 @@ class ExecuteStatement( executeStatement() } } - val chatSessionManager = session.sessionManager - val backgroundHandle = chatSessionManager.submitBackgroundOperation(asyncOperation) - setBackgroundHandle(backgroundHandle) + try { + val chatSessionManager = session.sessionManager + val backgroundHandle = chatSessionManager.submitBackgroundOperation(asyncOperation) + setBackgroundHandle(backgroundHandle) + } catch { + case rejected: RejectedExecutionException => + setState(OperationState.ERROR) + val ke = + KyuubiSQLException("Error submitting query in background, query rejected", rejected) + setOperationException(ke) + shutdownTimeoutMonitor() + throw ke + } } else { executeStatement() } diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala index af9e9a10274..d75c7f408cf 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala @@ -17,6 +17,7 @@ package org.apache.kyuubi.engine.jdbc.operation import java.sql.{Connection, Statement, Types} +import java.util.concurrent.RejectedExecutionException import org.apache.kyuubi.{KyuubiSQLException, Logging} import org.apache.kyuubi.engine.jdbc.schema.{Column, Row, Schema} @@ -50,9 +51,19 @@ class ExecuteStatement( executeStatement() } } - val jdbcSessionManager = session.sessionManager - val backgroundHandle = jdbcSessionManager.submitBackgroundOperation(asyncOperation) - setBackgroundHandle(backgroundHandle) + try { + val jdbcSessionManager = session.sessionManager + val backgroundHandle = jdbcSessionManager.submitBackgroundOperation(asyncOperation) + setBackgroundHandle(backgroundHandle) + } catch { + case rejected: RejectedExecutionException => + setState(OperationState.ERROR) + val ke = + KyuubiSQLException("Error submitting query in background, query rejected", rejected) + setOperationException(ke) + shutdownTimeoutMonitor() + throw ke + } } else { executeStatement() }