diff --git a/src/main/java/com/alipay/oceanbase/rpc/direct_load/ObDirectLoadConnection.java b/src/main/java/com/alipay/oceanbase/rpc/direct_load/ObDirectLoadConnection.java index 23ce986e..3e524b74 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/direct_load/ObDirectLoadConnection.java +++ b/src/main/java/com/alipay/oceanbase/rpc/direct_load/ObDirectLoadConnection.java @@ -52,6 +52,8 @@ public class ObDirectLoadConnection { private long heartBeatTimeout = 0; private long heartBeatInterval = 0; + private long connectTimeout = 0; + private boolean isInited = false; private boolean isClosed = false; @@ -137,6 +139,8 @@ private void fillParams(Builder builder) throws ObDirectLoadException { heartBeatInterval = builder.heartBeatInterval; writeConnectionNum = builder.writeConnectionNum; + + connectTimeout = builder.connectTimeout; } private void initCheck() throws ObDirectLoadException { @@ -167,6 +171,7 @@ private void initCheck() throws ObDirectLoadException { "Param 'heartBeatInterval' must not be greater than or equal to Param 'heartBeatTimeout', heartBeatTimeout:" + heartBeatTimeout + ", heartBeatInterval:" + heartBeatInterval); } + ObDirectLoadUtil.checkPositive(connectTimeout, "connectTimeout", logger); } private void initProtocol() throws ObDirectLoadException { @@ -178,6 +183,8 @@ private void initProtocol() throws ObDirectLoadException { Properties properties = new Properties(); properties.setProperty(Property.SERVER_CONNECTION_POOL_SIZE.getKey(), String.valueOf(1)); + properties.setProperty(Property.RPC_CONNECT_TIMEOUT.getKey(), + String.valueOf(connectTimeout)); table = new ObTable.Builder(ip, port) .setLoginInfo(tenantName, userName, password, databaseName) .setProperties(properties).build(); @@ -276,6 +283,8 @@ public static final class Builder { private static final long MAX_HEART_BEAT_TIMEOUT = 1L * 365 * 24 * 3600 * 1000; // 1year + private long connectTimeout = 1000; + Builder(ObDirectLoadConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; } @@ -306,12 +315,17 @@ public Builder setHeartBeatInfo(long heartBeatTimeout, long heartBeatInterval) { return this; } + public Builder setConnectTimeout(long connectTimeout) { + this.connectTimeout = Math.min(connectTimeout, (long) Integer.MAX_VALUE); + return this; + } + public String toString() { return String .format( - "{ip:\"%s\", port:%d, tenantName:\"%s\", userName:\"%s\", databaseName:\"%s\", writeConnectionNum:%d, heartBeatTimeout:%d, heartBeatInterval:%d}", + "{ip:\"%s\", port:%d, tenantName:\"%s\", userName:\"%s\", databaseName:\"%s\", writeConnectionNum:%d, heartBeatTimeout:%d, heartBeatInterval:%d, connectTimeout:%d}", ip, port, tenantName, userName, databaseName, writeConnectionNum, - heartBeatTimeout, heartBeatInterval); + heartBeatTimeout, heartBeatInterval, connectTimeout); } public ObDirectLoadConnection build() throws ObDirectLoadException { @@ -370,6 +384,8 @@ private void initTables() throws ObDirectLoadException { Properties properties = new Properties(); properties .setProperty(Property.SERVER_CONNECTION_POOL_SIZE.getKey(), String.valueOf(1)); + properties.setProperty(Property.RPC_CONNECT_TIMEOUT.getKey(), + String.valueOf(connection.connectTimeout)); properties.setProperty(Property.RPC_EXECUTE_TIMEOUT.getKey(), String.valueOf(timeoutMillis)); properties.setProperty(Property.RPC_OPERATION_TIMEOUT.getKey(),