Skip to content

Commit

Permalink
Merge pull request #129 from taosdata/member/huolibo
Browse files Browse the repository at this point in the history
fix: tmq skip meta data
  • Loading branch information
sheyanjie-qq authored Oct 27, 2023
2 parents eb4edc0 + 7f9d17e commit 1567359
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 1 deletion.
22 changes: 22 additions & 0 deletions src/main/java/com/taosdata/jdbc/enums/TmqMessageType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.taosdata.jdbc.enums;

public enum TmqMessageType {
TMQ_RES_INVALID(-1),

TMQ_RES_DATA(1),

TMQ_RES_TABLE_META(2),

TMQ_RES_METADATA(3),
;

final int code;

TmqMessageType(int code) {
this.code = code;
}

public int getCode() {
return code;
}
}
5 changes: 5 additions & 0 deletions src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.TSDBErrorNumbers;
import com.taosdata.jdbc.common.Consumer;
import com.taosdata.jdbc.enums.TmqMessageType;
import com.taosdata.jdbc.enums.WSFunction;
import com.taosdata.jdbc.tmq.*;
import com.taosdata.jdbc.ws.FutureResponse;
Expand Down Expand Up @@ -113,6 +114,10 @@ public ConsumerRecords<V> poll(Duration timeout, Deserializer<V> deserializer) t
if (!pollResp.isHaveMessage())
return ConsumerRecords.emptyRecord();

if (pollResp.getMessageType() != TmqMessageType.TMQ_RES_DATA.getCode()) {
// TODO handle other message type
return ConsumerRecords.emptyRecord();
}
messageId = pollResp.getMessageId();
ConsumerRecords<V> records = new ConsumerRecords<>();
try (WSConsumerResultSet rs = new WSConsumerResultSet(transport, factory, pollResp.getMessageId(), pollResp.getDatabase())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public boolean next() throws SQLException {
Request request = factory.generateFetch(messageId);
FetchResp fetchResp = (FetchResp) transport.send(request);
if (Code.SUCCESS.getCode() != fetchResp.getCode())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, fetchResp.getMessage());
throw TSDBError.createSQLException(fetchResp.getCode(), fetchResp.getMessage());

this.reset();
if (fetchResp.isCompleted() || fetchResp.getRows() == 0)
Expand Down

0 comments on commit 1567359

Please sign in to comment.