Skip to content

Commit

Permalink
Merge pull request #203 from taosdata/main
Browse files Browse the repository at this point in the history
merge from main
  • Loading branch information
sheyanjie-qq authored Dec 2, 2024
2 parents 51ba2ef + 95c1efb commit 75f7afd
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 33 deletions.
17 changes: 15 additions & 2 deletions .github/workflows/version3.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
geos-config --version
- name: install TDengine
run: cd TDengine && mkdir debug && cd debug && cmake .. -DBUILD_JDBC=false -DBUILD_TOOLS=false -DBUILD_HTTP=false && make && sudo make install
run: cd TDengine && mkdir debug && cd debug && cmake .. -DBUILD_JDBC=false -DBUILD_TOOLS=false -DBUILD_HTTP=false -DBUILD_DEPENDENCY_TESTS=false && make && sudo make install

- name: shell
run: |
Expand Down Expand Up @@ -69,4 +69,17 @@ jobs:
- name: Test
env:
TDENGINE_CLOUD_URL: ${{ secrets.TDENGINE_CLOUD_URL }}
run: mvn -B clean test --file pom.xml
run: mvn -B clean verify --file pom.xml

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: target/site/jacoco/jacoco.xml
flags: unittests
name: codecov-umbrella
fail_ci_if_error: false
verbose: true



14 changes: 10 additions & 4 deletions deploy-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.4.0</version>
<version>3.4.1</version>
<packaging>jar</packaging>

<name>JDBCDriver</name>
Expand Down Expand Up @@ -41,6 +41,7 @@
<product.version>3.0.0.0</product.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<jackson.version>2.18.0</jackson.version>
</properties>

<dependencies>
Expand All @@ -57,9 +58,14 @@
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
Expand Down
48 changes: 46 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.4.0</version>
<version>3.4.1</version>
<packaging>jar</packaging>
<name>JDBCDriver</name>
<url>https://github.com/taosdata/taos-connector-jdbc</url>
Expand Down Expand Up @@ -95,6 +95,50 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.12</version>
<executions>
<!-- Prepare JaCoCo agent before tests run -->
<execution>
<id>prepare-agent</id>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
<!-- Generate JaCoCo report after tests run -->
<execution>
<id>report</id>
<goals>
<goal>report</goal>
</goals>

<phase>verify</phase>
</execution>
<!-- Optionally check coverage thresholds -->
<execution>
<id>check</id>
<goals>
<goal>check</goal>
</goals>
<configuration>
<rules>
<rule>
<element>BUNDLE</element>
<limits>
<limit>
<counter>INSTRUCTION</counter>
<value>COVEREDRATIO</value>
<minimum>0.60</minimum>
</limit>
</limits>
</rule>
</rules>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
Expand Down Expand Up @@ -132,7 +176,6 @@
<version>3.0.0-M6</version>
<configuration>
<forkMode>pertest</forkMode>
<argLine>${maven.test.jvmargs}</argLine>
<includes>
<include>**/*Test.java</include>
</includes>
Expand All @@ -151,6 +194,7 @@
<exclude>**/TimeZoneTest.java</exclude>
</excludes>
<testFailureIgnore>false</testFailureIgnore>
<argLine>${argLine}</argLine>
</configuration>
</plugin>
</plugins>
Expand Down
86 changes: 61 additions & 25 deletions src/main/java/com/taosdata/jdbc/ws/AbstractWSResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public abstract class AbstractWSResultSet extends AbstractResultSet {
BlockingQueue<BlockData> blockingQueueOut = new LinkedBlockingQueue<>(CACHE_SIZE);
ThreadPoolExecutor backFetchExecutor;
ForkJoinPool dataHandleExecutor = getForkJoinPool();

private int fetchBlockNum = 0;
private final int START_BACKEND_FETCH_BLOCK_NUM = 3;
protected AbstractWSResultSet(Statement statement, Transport transport,
QueryResp response, String database) throws SQLException {
this.statement = statement;
Expand All @@ -57,7 +60,9 @@ protected AbstractWSResultSet(Statement statement, Transport transport,
}
this.metaData = new RestfulResultSetMetaData(database, fields);
this.timestampPrecision = response.getPrecision();
}

private void startBackendFetch(){
backFetchExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
backFetchExecutor.submit(() -> {
try {
Expand Down Expand Up @@ -126,26 +131,55 @@ public boolean next() throws SQLException {
return true;
}

BlockData blockData;
try {
blockData = blockingQueueOut.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "FETCH DATA INTERRUPTED");
}
fetchBlockNum++;
if (fetchBlockNum > START_BACKEND_FETCH_BLOCK_NUM) {
if (backFetchExecutor == null) {
startBackendFetch();
}
BlockData blockData;
try {
blockData = blockingQueueOut.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "FETCH DATA INTERRUPTED");
}

if (blockData.getReturnCode() != Code.SUCCESS.getCode()){
throw TSDBError.createSQLException(blockData.getReturnCode(), "FETCH DATA ERROR");
}
this.reset();
if (blockData.isCompleted()){
this.isCompleted = true;
return false;
if (blockData.getReturnCode() != Code.SUCCESS.getCode()) {
throw TSDBError.createSQLException(blockData.getReturnCode(), "FETCH DATA ERROR");
}
this.reset();
if (blockData.isCompleted()) {
this.isCompleted = true;
return false;
}
blockData.waitTillOK();
this.result = blockData.getData();
this.numOfRows = blockData.getNumOfRows();
} else {

byte[] version = {1, 0};
FetchBlockNewResp resp = (FetchBlockNewResp) transport.send(Action.FETCH_BLOCK_NEW.getAction(),
reqId, queryId, 7, version);
resp.init();

if (Code.SUCCESS.getCode() != resp.getCode()) {
throw TSDBError.createSQLException(resp.getCode(), "FETCH DATA ERROR");
}
this.reset();
BlockData blockData = BlockData.getEmptyBlockData(fields);

if (resp.isCompleted() || isClosed) {
blockData.setCompleted(true);
return false;
}

blockData.setBuffer(resp.getBuffer());
blockData.handleData();

this.result = blockData.getData();
this.numOfRows = blockData.getNumOfRows();
}
blockData.waitTillOK();

this.result = blockData.getData();
this.numOfRows = blockData.getNumOfRows();
return true;
}

Expand All @@ -156,15 +190,17 @@ public void close() throws SQLException {
this.isClosed = true;

// wait backFetchExecutor to finish
while (backFetchExecutor.getActiveCount() != 0) {
try {
Thread.sleep(1);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
if (backFetchExecutor != null) {
while (backFetchExecutor.getActiveCount() != 0) {
try {
Thread.sleep(1);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
if (!backFetchExecutor.isShutdown()) {
backFetchExecutor.shutdown();
}
}
if (!backFetchExecutor.isShutdown()){
backFetchExecutor.shutdown();
}

if (result != null && !result.isEmpty() && !isCompleted) {
Expand Down
71 changes: 71 additions & 0 deletions src/test/java/com/taosdata/jdbc/ws/WSBigQueryTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.taosdata.jdbc.ws;

import com.taosdata.jdbc.utils.SpecifyAddress;
import org.junit.*;

import java.sql.*;
import java.util.AbstractCollection;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;

@FixMethodOrder
public class WSBigQueryTest {
String host = "127.0.0.1";
String db_name = "ws_prepare";
String tableName = "wpt";
String superTable = "wpt_st";
Connection connection;


@Test
public void testExecuteBatchInsert() throws SQLException {
String sql = "insert into " + db_name + "." + tableName + " (ts, c1) values(?, ?)";
PreparedStatement statement = connection.prepareStatement(sql);
HashSet<Object> collect = new HashSet<>();

for (int i = 0; i < 20000; i++) {
collect.add(i);
statement.setTimestamp(1, new Timestamp(System.currentTimeMillis() + i));
statement.setInt(2, i);
statement.addBatch();
}
statement.executeBatch();

String sql1 = "select * from " + db_name + "." + tableName;
statement = connection.prepareStatement(sql1);
boolean b = statement.execute();
Assert.assertTrue(b);
ResultSet resultSet = statement.getResultSet();
while (resultSet.next()) {
Assert.assertTrue(collect.contains(resultSet.getInt(2)));
}
statement.close();
}

@Before
public void before() throws SQLException {
String url = SpecifyAddress.getInstance().getRestWithoutUrl();
if (url == null) {
url = "jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata&batchfetch=true";
} else {
url += "?user=root&password=taosdata&batchfetch=true";
}
Properties properties = new Properties();
connection = DriverManager.getConnection(url, properties);
Statement statement = connection.createStatement();
statement.execute("drop database if exists " + db_name);
statement.execute("create database " + db_name);
statement.execute("use " + db_name);
statement.execute("create table if not exists " + db_name + "." + tableName + " (ts timestamp, c1 int)");
statement.close();
}

@After
public void after() throws SQLException {
try (Statement statement = connection.createStatement()) {
statement.execute("drop database if exists " + db_name);
}
connection.close();
}
}

0 comments on commit 75f7afd

Please sign in to comment.