Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/ts 4109 #134

Merged
merged 2 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class WSConsumer<V> implements Consumer<V> {
private Transport transport;
private ConsumerParam param;
private TMQRequestFactory factory;
private long lastCommitTime = 0;
private long messageId = 0L;

@Override
Expand Down Expand Up @@ -71,8 +72,7 @@ public void subscribe(Collection<String> topics) throws SQLException {
, param.getClientId()
, param.getOffsetRest()
, topics.toArray(new String[0])
, String.valueOf(param.isAutoCommit())
, param.getAutoCommitInterval()
, String.valueOf(false)
, param.getMsgWithTableName()
);
SubscribeResp response = (SubscribeResp) transport.send(request);
Expand Down Expand Up @@ -105,6 +105,14 @@ public Set<String> subscription() throws SQLException {

@Override
public ConsumerRecords<V> poll(Duration timeout, Deserializer<V> deserializer) throws SQLException {
if (param.isAutoCommit() && (0 != messageId)) {
long now = System.currentTimeMillis();
if (now - lastCommitTime > param.getAutoCommitInterval()) {
commitSync();
lastCommitTime = now;
}
}

Request request = factory.generatePoll(timeout.toMillis());
PollResp pollResp = (PollResp) transport.send(request);
if (Code.SUCCESS.getCode() != pollResp.getCode()) {
Expand Down Expand Up @@ -140,8 +148,9 @@ public ConsumerRecords<V> poll(Duration timeout, Deserializer<V> deserializer) t
public synchronized void commitSync() throws SQLException {
if (0 != messageId) {
CommitResp commitResp = (CommitResp) transport.send(factory.generateCommit(messageId));
if (Code.SUCCESS.getCode() != commitResp.getCode())
if (Code.SUCCESS.getCode() != commitResp.getCode()) {
throw new SQLException("consumer commit error. code: (0x" + Integer.toHexString(commitResp.getCode()) + "), message: " + commitResp.getMessage());
}
messageId = 0;
}
}
Expand Down
13 changes: 9 additions & 4 deletions src/main/java/com/taosdata/jdbc/ws/tmq/entity/ConsumerParam.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.taosdata.jdbc.ws.tmq.entity;

import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.TSDBErrorNumbers;
import com.taosdata.jdbc.rs.ConnectionParam;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.utils.StringUtils;
Expand All @@ -14,7 +16,7 @@ public class ConsumerParam {
private String clientId;
private String offsetRest;
private final boolean autoCommit;
private String autoCommitInterval;
private long autoCommitInterval;
private String msgWithTableName;

public ConsumerParam(Properties properties) throws SQLException {
Expand All @@ -35,7 +37,10 @@ public ConsumerParam(Properties properties) throws SQLException {
groupId = properties.getProperty(TMQConstants.GROUP_ID);
clientId = properties.getProperty(TMQConstants.CLIENT_ID);
offsetRest = properties.getProperty(TMQConstants.AUTO_OFFSET_RESET);
autoCommitInterval = properties.getProperty(TMQConstants.AUTO_COMMIT_INTERVAL);
autoCommitInterval = Long.parseLong(properties.getProperty(TMQConstants.AUTO_COMMIT_INTERVAL, "5000"));
if (autoCommitInterval < 0){
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_TMQ_CONF_ERROR, "autoCommitInterval must be greater than 0");
}
msgWithTableName = properties.getProperty(TMQConstants.MSG_WITH_TABLE_NAME);
}

Expand Down Expand Up @@ -75,11 +80,11 @@ public boolean isAutoCommit() {
return autoCommit;
}

public String getAutoCommitInterval() {
public long getAutoCommitInterval() {
return autoCommitInterval;
}

public void setAutoCommitInterval(String autoCommitInterval) {
public void setAutoCommitInterval(long autoCommitInterval) {
this.autoCommitInterval = autoCommitInterval;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public TMQRequestFactory() {

public Request generateSubscribe(String user, String password, String db, String groupId,
String clientId, String offsetRest, String[] topics
, String enableAutoCommit, String autoCommitIntervalMs, String withTableName) {
, String enableAutoCommit, String withTableName) {
long reqId = this.getId(ConsumerAction.SUBSCRIBE.getAction());

SubscribeReq subscribeReq = new SubscribeReq();
Expand All @@ -39,7 +39,6 @@ public Request generateSubscribe(String user, String password, String db, String
subscribeReq.setOffsetRest(offsetRest);
subscribeReq.setTopics(topics);
subscribeReq.setAutoCommit(enableAutoCommit);
subscribeReq.setAutoCommitIntervalMs(autoCommitIntervalMs);
subscribeReq.setWithTableName(withTableName);
return new Request(ConsumerAction.SUBSCRIBE.getAction(), subscribeReq);
}
Expand Down
101 changes: 53 additions & 48 deletions src/test/java/com/taosdata/jdbc/ws/WSConsumerAutoCommitTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.taosdata.jdbc.ws;

import com.alibaba.fastjson.JSON;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.tmq.*;
import com.taosdata.jdbc.utils.SpecifyAddress;
Expand All @@ -11,6 +12,8 @@
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class WSConsumerAutoCommitTest {
Expand All @@ -21,34 +24,73 @@ public class WSConsumerAutoCommitTest {

private static Connection connection;

private static ScheduledExecutorService scheduledExecutorService;
private static int count = 0;
private volatile boolean stop = false;

@Test
public void TestWithBean() throws Exception {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r);
t.setName("topic-thread-" + t.getId());
return t;
});

scheduledExecutorService.scheduleWithFixedDelay(() -> {
try {
if (!stop) {
Statement statement = connection.createStatement();
String sql = String.format("insert into %s.ct0 (ts, c1) values (now, %s)", dbName, count);
statement.executeUpdate(sql);
count++;
}
} catch (SQLException e) {
System.out.println(e.getMessage());
}
}, 20, 10, TimeUnit.MILLISECONDS);

Properties properties = new Properties();
properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, host + ":6041");
properties.setProperty(TMQConstants.CONNECT_TYPE, "ws");
properties.setProperty(TMQConstants.CONNECT_USER, "root");
properties.setProperty(TMQConstants.CONNECT_PASS, "taosdata");
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "false");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
properties.setProperty(TMQConstants.AUTO_COMMIT_INTERVAL, "30");
properties.setProperty(TMQConstants.GROUP_ID, "withBean");
properties.setProperty(TMQConstants.AUTO_OFFSET_RESET, "earliest");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taosdata.jdbc.ws.WSConsumerAutoCommitTest$BeanDeserializer");

Timestamp ts = null;
int last = 0;
try (TaosConsumer<Bean> consumer = new TaosConsumer<>(properties);) {
consumer.subscribe(Collections.singletonList(topic));
for (int i = 0; i < 2; i++) {
ConsumerRecords<Bean> consumerRecords = consumer.poll(Duration.ofMillis(100));

for (int i = 0; i < 5; i++) {
ConsumerRecords<Bean> consumerRecords = consumer.poll(Duration.ofMillis(10));
for (ConsumerRecord<Bean> r : consumerRecords) {
Bean bean = r.value();
if (ts == null) {
ts = bean.getTs();
} else {
assert ts.equals(bean.getTs());
}
last = bean.getC1();
}
}
Thread.sleep(30);
//this poll will commit msg last received
consumer.poll(Duration.ofMillis(10));

}

try (TaosConsumer<Bean> consumer = new TaosConsumer<>(properties);) {
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<Bean> consumerRecords = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<Bean> r : consumerRecords) {
Bean bean = r.value();
//new msg value will bigger than last received
assert (bean.getC1() > last);
break;
}
consumer.unsubscribe();
}
stop = true;
scheduledExecutorService.shutdown();
}

@BeforeClass
Expand All @@ -68,10 +110,9 @@ public static void before() throws SQLException {
statement.executeUpdate("create database if not exists " + dbName + " WAL_RETENTION_PERIOD 3650");
statement.executeUpdate("use " + dbName);
statement.executeUpdate("create stable if not exists " + superTable
+ " (ts timestamp, c1 int, c2 float, c3 nchar(10), c4 binary(10), c5 bool) tags(t1 int)");
+ " (ts timestamp, c1 int) tags(t1 int)");
statement.executeUpdate("create table if not exists ct0 using " + superTable + " tags(1000)");
statement.executeUpdate("insert into " + dbName + ".ct0 (ts) values (now)");
statement.executeUpdate("create topic if not exists " + topic + " as select ts, c1, c2, c3, c4, c5, t1 from ct0");
statement.executeUpdate("create topic if not exists " + topic + " as select ts, c1, t1 from ct0");
}
}

Expand All @@ -90,11 +131,7 @@ static class BeanDeserializer extends ReferenceDeserializer<Bean> {
static class Bean {
private Timestamp ts;
private Integer c1;
private Float c2;
private String c3;
private byte[] c4;
private Integer t1;
private Boolean c5;

public Timestamp getTs() {
return ts;
Expand All @@ -112,44 +149,12 @@ public void setC1(Integer c1) {
this.c1 = c1;
}

public Float getC2() {
return c2;
}

public void setC2(Float c2) {
this.c2 = c2;
}

public String getC3() {
return c3;
}

public void setC3(String c3) {
this.c3 = c3;
}

public byte[] getC4() {
return c4;
}

public void setC4(byte[] c4) {
this.c4 = c4;
}

public Integer getT1() {
return t1;
}

public void setT1(Integer t1) {
this.t1 = t1;
}

public Boolean getC5() {
return c5;
}

public void setC5(Boolean c5) {
this.c5 = c5;
}
}
}
1 change: 1 addition & 0 deletions src/test/java/com/taosdata/jdbc/ws/WSConsumerNullTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public static void before() throws SQLException {
// properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties);
statement = connection.createStatement();
statement.execute("drop topic if exists topic_ws_bean_type");
statement.execute("drop database if exists " + dbName);
statement.execute("create database if not exists " + dbName + " WAL_RETENTION_PERIOD 3650");
statement.execute("use " + dbName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void testGenerateSubscribe() {
String[] topics = {"topic_1", "topic_2"};
Request request = factory.generateSubscribe("root", "taosdata", "test", "gId",
"cId", "offset", topics
, null, null, null);
, null, null);
JSONObject jsonObject = JSONObject.parseObject(request.toString());
SubscribeReq req = JSON.toJavaObject((JSON) JSON.toJSON(jsonObject.get("args")), SubscribeReq.class);
Assert.assertEquals(1, req.getReqId());
Expand Down
Loading