From f64e910ddec09bfbe8f43b3905194671a1fa2396 Mon Sep 17 00:00:00 2001 From: sheyanjie Date: Fri, 24 Nov 2023 18:03:09 +0800 Subject: [PATCH 1/2] auto commit msg by jdbc own --- .../java/com/taosdata/jdbc/ws/tmq/WSConsumer.java | 13 ++++++++++--- .../taosdata/jdbc/ws/tmq/entity/ConsumerParam.java | 13 +++++++++---- .../jdbc/ws/tmq/entity/TMQRequestFactory.java | 3 +-- .../com/taosdata/jdbc/ws/WSConsumerNullTest.java | 1 + .../jdbc/ws/tmq/entity/TMQRequestFactoryTest.java | 2 +- 5 files changed, 22 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumer.java b/src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumer.java index 5f50d732..41dba1ac 100644 --- a/src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumer.java +++ b/src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumer.java @@ -27,6 +27,7 @@ public class WSConsumer implements Consumer { private Transport transport; private ConsumerParam param; private TMQRequestFactory factory; + private long lastCommitTime = 0; private long messageId = 0L; @Override @@ -71,8 +72,7 @@ public void subscribe(Collection topics) throws SQLException { , param.getClientId() , param.getOffsetRest() , topics.toArray(new String[0]) - , String.valueOf(param.isAutoCommit()) - , param.getAutoCommitInterval() + , String.valueOf(false) , param.getMsgWithTableName() ); SubscribeResp response = (SubscribeResp) transport.send(request); @@ -105,6 +105,14 @@ public Set subscription() throws SQLException { @Override public ConsumerRecords poll(Duration timeout, Deserializer deserializer) throws SQLException { + if (param.isAutoCommit() && (0 != messageId)) { + long now = System.currentTimeMillis(); + if (now - lastCommitTime > param.getAutoCommitInterval()) { + commitSync(); + lastCommitTime = now; + } + } + Request request = factory.generatePoll(timeout.toMillis()); PollResp pollResp = (PollResp) transport.send(request); if (Code.SUCCESS.getCode() != pollResp.getCode()) { @@ -142,7 +150,6 @@ public synchronized void commitSync() throws SQLException { CommitResp commitResp = (CommitResp) transport.send(factory.generateCommit(messageId)); if (Code.SUCCESS.getCode() != commitResp.getCode()) throw new SQLException("consumer commit error. code: (0x" + Integer.toHexString(commitResp.getCode()) + "), message: " + commitResp.getMessage()); - messageId = 0; } } diff --git a/src/main/java/com/taosdata/jdbc/ws/tmq/entity/ConsumerParam.java b/src/main/java/com/taosdata/jdbc/ws/tmq/entity/ConsumerParam.java index 051df0f2..cbeb3aa4 100644 --- a/src/main/java/com/taosdata/jdbc/ws/tmq/entity/ConsumerParam.java +++ b/src/main/java/com/taosdata/jdbc/ws/tmq/entity/ConsumerParam.java @@ -1,6 +1,8 @@ package com.taosdata.jdbc.ws.tmq.entity; import com.taosdata.jdbc.TSDBDriver; +import com.taosdata.jdbc.TSDBError; +import com.taosdata.jdbc.TSDBErrorNumbers; import com.taosdata.jdbc.rs.ConnectionParam; import com.taosdata.jdbc.tmq.TMQConstants; import com.taosdata.jdbc.utils.StringUtils; @@ -14,7 +16,7 @@ public class ConsumerParam { private String clientId; private String offsetRest; private final boolean autoCommit; - private String autoCommitInterval; + private long autoCommitInterval; private String msgWithTableName; public ConsumerParam(Properties properties) throws SQLException { @@ -35,7 +37,10 @@ public ConsumerParam(Properties properties) throws SQLException { groupId = properties.getProperty(TMQConstants.GROUP_ID); clientId = properties.getProperty(TMQConstants.CLIENT_ID); offsetRest = properties.getProperty(TMQConstants.AUTO_OFFSET_RESET); - autoCommitInterval = properties.getProperty(TMQConstants.AUTO_COMMIT_INTERVAL); + autoCommitInterval = Long.parseLong(properties.getProperty(TMQConstants.AUTO_COMMIT_INTERVAL, "5000")); + if (autoCommitInterval < 0){ + throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_TMQ_CONF_ERROR, "autoCommitInterval must be greater than 0"); + } msgWithTableName = properties.getProperty(TMQConstants.MSG_WITH_TABLE_NAME); } @@ -75,11 +80,11 @@ public boolean isAutoCommit() { return autoCommit; } - public String getAutoCommitInterval() { + public long getAutoCommitInterval() { return autoCommitInterval; } - public void setAutoCommitInterval(String autoCommitInterval) { + public void setAutoCommitInterval(long autoCommitInterval) { this.autoCommitInterval = autoCommitInterval; } diff --git a/src/main/java/com/taosdata/jdbc/ws/tmq/entity/TMQRequestFactory.java b/src/main/java/com/taosdata/jdbc/ws/tmq/entity/TMQRequestFactory.java index 69999b9f..a63ff3bd 100644 --- a/src/main/java/com/taosdata/jdbc/ws/tmq/entity/TMQRequestFactory.java +++ b/src/main/java/com/taosdata/jdbc/ws/tmq/entity/TMQRequestFactory.java @@ -26,7 +26,7 @@ public TMQRequestFactory() { public Request generateSubscribe(String user, String password, String db, String groupId, String clientId, String offsetRest, String[] topics - , String enableAutoCommit, String autoCommitIntervalMs, String withTableName) { + , String enableAutoCommit, String withTableName) { long reqId = this.getId(ConsumerAction.SUBSCRIBE.getAction()); SubscribeReq subscribeReq = new SubscribeReq(); @@ -39,7 +39,6 @@ public Request generateSubscribe(String user, String password, String db, String subscribeReq.setOffsetRest(offsetRest); subscribeReq.setTopics(topics); subscribeReq.setAutoCommit(enableAutoCommit); - subscribeReq.setAutoCommitIntervalMs(autoCommitIntervalMs); subscribeReq.setWithTableName(withTableName); return new Request(ConsumerAction.SUBSCRIBE.getAction(), subscribeReq); } diff --git a/src/test/java/com/taosdata/jdbc/ws/WSConsumerNullTest.java b/src/test/java/com/taosdata/jdbc/ws/WSConsumerNullTest.java index 4c346e47..84420859 100644 --- a/src/test/java/com/taosdata/jdbc/ws/WSConsumerNullTest.java +++ b/src/test/java/com/taosdata/jdbc/ws/WSConsumerNullTest.java @@ -76,6 +76,7 @@ public static void before() throws SQLException { // properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true"); connection = DriverManager.getConnection(url, properties); statement = connection.createStatement(); + statement.execute("drop topic if exists topic_ws_bean_type"); statement.execute("drop database if exists " + dbName); statement.execute("create database if not exists " + dbName + " WAL_RETENTION_PERIOD 3650"); statement.execute("use " + dbName); diff --git a/src/test/java/com/taosdata/jdbc/ws/tmq/entity/TMQRequestFactoryTest.java b/src/test/java/com/taosdata/jdbc/ws/tmq/entity/TMQRequestFactoryTest.java index 00c6de02..012be581 100644 --- a/src/test/java/com/taosdata/jdbc/ws/tmq/entity/TMQRequestFactoryTest.java +++ b/src/test/java/com/taosdata/jdbc/ws/tmq/entity/TMQRequestFactoryTest.java @@ -22,7 +22,7 @@ public void testGenerateSubscribe() { String[] topics = {"topic_1", "topic_2"}; Request request = factory.generateSubscribe("root", "taosdata", "test", "gId", "cId", "offset", topics - , null, null, null); + , null, null); JSONObject jsonObject = JSONObject.parseObject(request.toString()); SubscribeReq req = JSON.toJavaObject((JSON) JSON.toJSON(jsonObject.get("args")), SubscribeReq.class); Assert.assertEquals(1, req.getReqId()); From 678455301192f92b7940c5e348747a609b196e16 Mon Sep 17 00:00:00 2001 From: sheyanjie Date: Thu, 21 Dec 2023 11:20:03 +0800 Subject: [PATCH 2/2] add auto commit test --- .../com/taosdata/jdbc/ws/tmq/WSConsumer.java | 4 +- .../jdbc/ws/WSConsumerAutoCommitTest.java | 101 +++++++++--------- 2 files changed, 56 insertions(+), 49 deletions(-) diff --git a/src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumer.java b/src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumer.java index 41dba1ac..142bb28b 100644 --- a/src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumer.java +++ b/src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumer.java @@ -148,8 +148,10 @@ public ConsumerRecords poll(Duration timeout, Deserializer deserializer) t public synchronized void commitSync() throws SQLException { if (0 != messageId) { CommitResp commitResp = (CommitResp) transport.send(factory.generateCommit(messageId)); - if (Code.SUCCESS.getCode() != commitResp.getCode()) + if (Code.SUCCESS.getCode() != commitResp.getCode()) { throw new SQLException("consumer commit error. code: (0x" + Integer.toHexString(commitResp.getCode()) + "), message: " + commitResp.getMessage()); + } + messageId = 0; } } diff --git a/src/test/java/com/taosdata/jdbc/ws/WSConsumerAutoCommitTest.java b/src/test/java/com/taosdata/jdbc/ws/WSConsumerAutoCommitTest.java index c6d6cf6a..4c3b1c86 100644 --- a/src/test/java/com/taosdata/jdbc/ws/WSConsumerAutoCommitTest.java +++ b/src/test/java/com/taosdata/jdbc/ws/WSConsumerAutoCommitTest.java @@ -1,5 +1,6 @@ package com.taosdata.jdbc.ws; +import com.alibaba.fastjson.JSON; import com.taosdata.jdbc.TSDBDriver; import com.taosdata.jdbc.tmq.*; import com.taosdata.jdbc.utils.SpecifyAddress; @@ -11,6 +12,8 @@ import java.time.Duration; import java.util.Collections; import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class WSConsumerAutoCommitTest { @@ -21,34 +24,73 @@ public class WSConsumerAutoCommitTest { private static Connection connection; + private static ScheduledExecutorService scheduledExecutorService; + private static int count = 0; + private volatile boolean stop = false; + @Test public void TestWithBean() throws Exception { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r); + t.setName("topic-thread-" + t.getId()); + return t; + }); + + scheduledExecutorService.scheduleWithFixedDelay(() -> { + try { + if (!stop) { + Statement statement = connection.createStatement(); + String sql = String.format("insert into %s.ct0 (ts, c1) values (now, %s)", dbName, count); + statement.executeUpdate(sql); + count++; + } + } catch (SQLException e) { + System.out.println(e.getMessage()); + } + }, 20, 10, TimeUnit.MILLISECONDS); + Properties properties = new Properties(); properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, host + ":6041"); properties.setProperty(TMQConstants.CONNECT_TYPE, "ws"); properties.setProperty(TMQConstants.CONNECT_USER, "root"); properties.setProperty(TMQConstants.CONNECT_PASS, "taosdata"); properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true"); - properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "false"); + properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true"); + properties.setProperty(TMQConstants.AUTO_COMMIT_INTERVAL, "30"); properties.setProperty(TMQConstants.GROUP_ID, "withBean"); + properties.setProperty(TMQConstants.AUTO_OFFSET_RESET, "earliest"); properties.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taosdata.jdbc.ws.WSConsumerAutoCommitTest$BeanDeserializer"); - Timestamp ts = null; + int last = 0; try (TaosConsumer consumer = new TaosConsumer<>(properties);) { consumer.subscribe(Collections.singletonList(topic)); - for (int i = 0; i < 2; i++) { - ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(100)); + + for (int i = 0; i < 5; i++) { + ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(10)); for (ConsumerRecord r : consumerRecords) { Bean bean = r.value(); - if (ts == null) { - ts = bean.getTs(); - } else { - assert ts.equals(bean.getTs()); - } + last = bean.getC1(); } } + Thread.sleep(30); + //this poll will commit msg last received + consumer.poll(Duration.ofMillis(10)); + + } + + try (TaosConsumer consumer = new TaosConsumer<>(properties);) { + consumer.subscribe(Collections.singletonList(topic)); + ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord r : consumerRecords) { + Bean bean = r.value(); + //new msg value will bigger than last received + assert (bean.getC1() > last); + break; + } consumer.unsubscribe(); } + stop = true; + scheduledExecutorService.shutdown(); } @BeforeClass @@ -68,10 +110,9 @@ public static void before() throws SQLException { statement.executeUpdate("create database if not exists " + dbName + " WAL_RETENTION_PERIOD 3650"); statement.executeUpdate("use " + dbName); statement.executeUpdate("create stable if not exists " + superTable - + " (ts timestamp, c1 int, c2 float, c3 nchar(10), c4 binary(10), c5 bool) tags(t1 int)"); + + " (ts timestamp, c1 int) tags(t1 int)"); statement.executeUpdate("create table if not exists ct0 using " + superTable + " tags(1000)"); - statement.executeUpdate("insert into " + dbName + ".ct0 (ts) values (now)"); - statement.executeUpdate("create topic if not exists " + topic + " as select ts, c1, c2, c3, c4, c5, t1 from ct0"); + statement.executeUpdate("create topic if not exists " + topic + " as select ts, c1, t1 from ct0"); } } @@ -90,11 +131,7 @@ static class BeanDeserializer extends ReferenceDeserializer { static class Bean { private Timestamp ts; private Integer c1; - private Float c2; - private String c3; - private byte[] c4; private Integer t1; - private Boolean c5; public Timestamp getTs() { return ts; @@ -112,30 +149,6 @@ public void setC1(Integer c1) { this.c1 = c1; } - public Float getC2() { - return c2; - } - - public void setC2(Float c2) { - this.c2 = c2; - } - - public String getC3() { - return c3; - } - - public void setC3(String c3) { - this.c3 = c3; - } - - public byte[] getC4() { - return c4; - } - - public void setC4(byte[] c4) { - this.c4 = c4; - } - public Integer getT1() { return t1; } @@ -143,13 +156,5 @@ public Integer getT1() { public void setT1(Integer t1) { this.t1 = t1; } - - public Boolean getC5() { - return c5; - } - - public void setC5(Boolean c5) { - this.c5 = c5; - } } }