Skip to content

Commit

Permalink
Merge pull request #134 from taosdata/feat/TS-4109
Browse files Browse the repository at this point in the history
Feat/ts 4109
  • Loading branch information
sheyanjie-qq authored Dec 21, 2023
2 parents d303343 + 6784553 commit 12d01fa
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 58 deletions.
15 changes: 12 additions & 3 deletions src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class WSConsumer<V> implements Consumer<V> {
private Transport transport;
private ConsumerParam param;
private TMQRequestFactory factory;
private long lastCommitTime = 0;
private long messageId = 0L;

@Override
Expand Down Expand Up @@ -71,8 +72,7 @@ public void subscribe(Collection<String> topics) throws SQLException {
, param.getClientId()
, param.getOffsetRest()
, topics.toArray(new String[0])
, String.valueOf(param.isAutoCommit())
, param.getAutoCommitInterval()
, String.valueOf(false)
, param.getMsgWithTableName()
);
SubscribeResp response = (SubscribeResp) transport.send(request);
Expand Down Expand Up @@ -105,6 +105,14 @@ public Set<String> subscription() throws SQLException {

@Override
public ConsumerRecords<V> poll(Duration timeout, Deserializer<V> deserializer) throws SQLException {
if (param.isAutoCommit() && (0 != messageId)) {
long now = System.currentTimeMillis();
if (now - lastCommitTime > param.getAutoCommitInterval()) {
commitSync();
lastCommitTime = now;
}
}

Request request = factory.generatePoll(timeout.toMillis());
PollResp pollResp = (PollResp) transport.send(request);
if (Code.SUCCESS.getCode() != pollResp.getCode()) {
Expand Down Expand Up @@ -140,8 +148,9 @@ public ConsumerRecords<V> poll(Duration timeout, Deserializer<V> deserializer) t
public synchronized void commitSync() throws SQLException {
if (0 != messageId) {
CommitResp commitResp = (CommitResp) transport.send(factory.generateCommit(messageId));
if (Code.SUCCESS.getCode() != commitResp.getCode())
if (Code.SUCCESS.getCode() != commitResp.getCode()) {
throw new SQLException("consumer commit error. code: (0x" + Integer.toHexString(commitResp.getCode()) + "), message: " + commitResp.getMessage());
}
messageId = 0;
}
}
Expand Down
13 changes: 9 additions & 4 deletions src/main/java/com/taosdata/jdbc/ws/tmq/entity/ConsumerParam.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.taosdata.jdbc.ws.tmq.entity;

import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.TSDBErrorNumbers;
import com.taosdata.jdbc.rs.ConnectionParam;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.utils.StringUtils;
Expand All @@ -14,7 +16,7 @@ public class ConsumerParam {
private String clientId;
private String offsetRest;
private final boolean autoCommit;
private String autoCommitInterval;
private long autoCommitInterval;
private String msgWithTableName;

public ConsumerParam(Properties properties) throws SQLException {
Expand All @@ -35,7 +37,10 @@ public ConsumerParam(Properties properties) throws SQLException {
groupId = properties.getProperty(TMQConstants.GROUP_ID);
clientId = properties.getProperty(TMQConstants.CLIENT_ID);
offsetRest = properties.getProperty(TMQConstants.AUTO_OFFSET_RESET);
autoCommitInterval = properties.getProperty(TMQConstants.AUTO_COMMIT_INTERVAL);
autoCommitInterval = Long.parseLong(properties.getProperty(TMQConstants.AUTO_COMMIT_INTERVAL, "5000"));
if (autoCommitInterval < 0){
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_TMQ_CONF_ERROR, "autoCommitInterval must be greater than 0");
}
msgWithTableName = properties.getProperty(TMQConstants.MSG_WITH_TABLE_NAME);
}

Expand Down Expand Up @@ -75,11 +80,11 @@ public boolean isAutoCommit() {
return autoCommit;
}

public String getAutoCommitInterval() {
public long getAutoCommitInterval() {
return autoCommitInterval;
}

public void setAutoCommitInterval(String autoCommitInterval) {
public void setAutoCommitInterval(long autoCommitInterval) {
this.autoCommitInterval = autoCommitInterval;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public TMQRequestFactory() {

public Request generateSubscribe(String user, String password, String db, String groupId,
String clientId, String offsetRest, String[] topics
, String enableAutoCommit, String autoCommitIntervalMs, String withTableName) {
, String enableAutoCommit, String withTableName) {
long reqId = this.getId(ConsumerAction.SUBSCRIBE.getAction());

SubscribeReq subscribeReq = new SubscribeReq();
Expand All @@ -39,7 +39,6 @@ public Request generateSubscribe(String user, String password, String db, String
subscribeReq.setOffsetRest(offsetRest);
subscribeReq.setTopics(topics);
subscribeReq.setAutoCommit(enableAutoCommit);
subscribeReq.setAutoCommitIntervalMs(autoCommitIntervalMs);
subscribeReq.setWithTableName(withTableName);
return new Request(ConsumerAction.SUBSCRIBE.getAction(), subscribeReq);
}
Expand Down
101 changes: 53 additions & 48 deletions src/test/java/com/taosdata/jdbc/ws/WSConsumerAutoCommitTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.taosdata.jdbc.ws;

import com.alibaba.fastjson.JSON;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.tmq.*;
import com.taosdata.jdbc.utils.SpecifyAddress;
Expand All @@ -11,6 +12,8 @@
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class WSConsumerAutoCommitTest {
Expand All @@ -21,34 +24,73 @@ public class WSConsumerAutoCommitTest {

private static Connection connection;

private static ScheduledExecutorService scheduledExecutorService;
private static int count = 0;
private volatile boolean stop = false;

@Test
public void TestWithBean() throws Exception {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r);
t.setName("topic-thread-" + t.getId());
return t;
});

scheduledExecutorService.scheduleWithFixedDelay(() -> {
try {
if (!stop) {
Statement statement = connection.createStatement();
String sql = String.format("insert into %s.ct0 (ts, c1) values (now, %s)", dbName, count);
statement.executeUpdate(sql);
count++;
}
} catch (SQLException e) {
System.out.println(e.getMessage());
}
}, 20, 10, TimeUnit.MILLISECONDS);

Properties properties = new Properties();
properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, host + ":6041");
properties.setProperty(TMQConstants.CONNECT_TYPE, "ws");
properties.setProperty(TMQConstants.CONNECT_USER, "root");
properties.setProperty(TMQConstants.CONNECT_PASS, "taosdata");
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "false");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
properties.setProperty(TMQConstants.AUTO_COMMIT_INTERVAL, "30");
properties.setProperty(TMQConstants.GROUP_ID, "withBean");
properties.setProperty(TMQConstants.AUTO_OFFSET_RESET, "earliest");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taosdata.jdbc.ws.WSConsumerAutoCommitTest$BeanDeserializer");

Timestamp ts = null;
int last = 0;
try (TaosConsumer<Bean> consumer = new TaosConsumer<>(properties);) {
consumer.subscribe(Collections.singletonList(topic));
for (int i = 0; i < 2; i++) {
ConsumerRecords<Bean> consumerRecords = consumer.poll(Duration.ofMillis(100));

for (int i = 0; i < 5; i++) {
ConsumerRecords<Bean> consumerRecords = consumer.poll(Duration.ofMillis(10));
for (ConsumerRecord<Bean> r : consumerRecords) {
Bean bean = r.value();
if (ts == null) {
ts = bean.getTs();
} else {
assert ts.equals(bean.getTs());
}
last = bean.getC1();
}
}
Thread.sleep(30);
//this poll will commit msg last received
consumer.poll(Duration.ofMillis(10));

}

try (TaosConsumer<Bean> consumer = new TaosConsumer<>(properties);) {
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<Bean> consumerRecords = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<Bean> r : consumerRecords) {
Bean bean = r.value();
//new msg value will bigger than last received
assert (bean.getC1() > last);
break;
}
consumer.unsubscribe();
}
stop = true;
scheduledExecutorService.shutdown();
}

@BeforeClass
Expand All @@ -68,10 +110,9 @@ public static void before() throws SQLException {
statement.executeUpdate("create database if not exists " + dbName + " WAL_RETENTION_PERIOD 3650");
statement.executeUpdate("use " + dbName);
statement.executeUpdate("create stable if not exists " + superTable
+ " (ts timestamp, c1 int, c2 float, c3 nchar(10), c4 binary(10), c5 bool) tags(t1 int)");
+ " (ts timestamp, c1 int) tags(t1 int)");
statement.executeUpdate("create table if not exists ct0 using " + superTable + " tags(1000)");
statement.executeUpdate("insert into " + dbName + ".ct0 (ts) values (now)");
statement.executeUpdate("create topic if not exists " + topic + " as select ts, c1, c2, c3, c4, c5, t1 from ct0");
statement.executeUpdate("create topic if not exists " + topic + " as select ts, c1, t1 from ct0");
}
}

Expand All @@ -90,11 +131,7 @@ static class BeanDeserializer extends ReferenceDeserializer<Bean> {
static class Bean {
private Timestamp ts;
private Integer c1;
private Float c2;
private String c3;
private byte[] c4;
private Integer t1;
private Boolean c5;

public Timestamp getTs() {
return ts;
Expand All @@ -112,44 +149,12 @@ public void setC1(Integer c1) {
this.c1 = c1;
}

public Float getC2() {
return c2;
}

public void setC2(Float c2) {
this.c2 = c2;
}

public String getC3() {
return c3;
}

public void setC3(String c3) {
this.c3 = c3;
}

public byte[] getC4() {
return c4;
}

public void setC4(byte[] c4) {
this.c4 = c4;
}

public Integer getT1() {
return t1;
}

public void setT1(Integer t1) {
this.t1 = t1;
}

public Boolean getC5() {
return c5;
}

public void setC5(Boolean c5) {
this.c5 = c5;
}
}
}
1 change: 1 addition & 0 deletions src/test/java/com/taosdata/jdbc/ws/WSConsumerNullTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public static void before() throws SQLException {
// properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties);
statement = connection.createStatement();
statement.execute("drop topic if exists topic_ws_bean_type");
statement.execute("drop database if exists " + dbName);
statement.execute("create database if not exists " + dbName + " WAL_RETENTION_PERIOD 3650");
statement.execute("use " + dbName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void testGenerateSubscribe() {
String[] topics = {"topic_1", "topic_2"};
Request request = factory.generateSubscribe("root", "taosdata", "test", "gId",
"cId", "offset", topics
, null, null, null);
, null, null);
JSONObject jsonObject = JSONObject.parseObject(request.toString());
SubscribeReq req = JSON.toJavaObject((JSON) JSON.toJSON(jsonObject.get("args")), SubscribeReq.class);
Assert.assertEquals(1, req.getReqId());
Expand Down

0 comments on commit 12d01fa

Please sign in to comment.