From 7f9d17e9b4e65bdccb4a793f831cb00ae3bb32ec Mon Sep 17 00:00:00 2001 From: huolibo Date: Fri, 27 Oct 2023 15:24:26 +0800 Subject: [PATCH] fix: tmq skip meta data --- .../taosdata/jdbc/enums/TmqMessageType.java | 22 +++++++++++++++++++ .../com/taosdata/jdbc/ws/tmq/WSConsumer.java | 5 +++++ .../jdbc/ws/tmq/WSConsumerResultSet.java | 2 +- 3 files changed, 28 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/taosdata/jdbc/enums/TmqMessageType.java diff --git a/src/main/java/com/taosdata/jdbc/enums/TmqMessageType.java b/src/main/java/com/taosdata/jdbc/enums/TmqMessageType.java new file mode 100644 index 00000000..48fff4f1 --- /dev/null +++ b/src/main/java/com/taosdata/jdbc/enums/TmqMessageType.java @@ -0,0 +1,22 @@ +package com.taosdata.jdbc.enums; + +public enum TmqMessageType { + TMQ_RES_INVALID(-1), + + TMQ_RES_DATA(1), + + TMQ_RES_TABLE_META(2), + + TMQ_RES_METADATA(3), + ; + + final int code; + + TmqMessageType(int code) { + this.code = code; + } + + public int getCode() { + return code; + } +} diff --git a/src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumer.java b/src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumer.java index 9adec9bc..baa12cc6 100644 --- a/src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumer.java +++ b/src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumer.java @@ -5,6 +5,7 @@ import com.taosdata.jdbc.TSDBError; import com.taosdata.jdbc.TSDBErrorNumbers; import com.taosdata.jdbc.common.Consumer; +import com.taosdata.jdbc.enums.TmqMessageType; import com.taosdata.jdbc.enums.WSFunction; import com.taosdata.jdbc.tmq.*; import com.taosdata.jdbc.ws.FutureResponse; @@ -113,6 +114,10 @@ public ConsumerRecords poll(Duration timeout, Deserializer deserializer) t if (!pollResp.isHaveMessage()) return ConsumerRecords.emptyRecord(); + if (pollResp.getMessageType() != TmqMessageType.TMQ_RES_DATA.getCode()) { + // TODO handle other message type + return ConsumerRecords.emptyRecord(); + } messageId = pollResp.getMessageId(); ConsumerRecords records = new ConsumerRecords<>(); try (WSConsumerResultSet rs = new WSConsumerResultSet(transport, factory, pollResp.getMessageId(), pollResp.getDatabase())) { diff --git a/src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumerResultSet.java b/src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumerResultSet.java index cca8437f..5e3fa0cf 100644 --- a/src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumerResultSet.java +++ b/src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumerResultSet.java @@ -79,7 +79,7 @@ public boolean next() throws SQLException { Request request = factory.generateFetch(messageId); FetchResp fetchResp = (FetchResp) transport.send(request); if (Code.SUCCESS.getCode() != fetchResp.getCode()) - throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, fetchResp.getMessage()); + throw TSDBError.createSQLException(fetchResp.getCode(), fetchResp.getMessage()); this.reset(); if (fetchResp.isCompleted() || fetchResp.getRows() == 0)