diff --git a/mycat2/src/main/java/io/mycat/monitor/MycatSQLLogMonitorImpl.java b/mycat2/src/main/java/io/mycat/monitor/MycatSQLLogMonitorImpl.java index 56b7850b5..994cd753a 100644 --- a/mycat2/src/main/java/io/mycat/monitor/MycatSQLLogMonitorImpl.java +++ b/mycat2/src/main/java/io/mycat/monitor/MycatSQLLogMonitorImpl.java @@ -1,9 +1,12 @@ package io.mycat.monitor; +import io.mycat.IOExecutor; +import io.mycat.MetaClusterCurrent; import io.mycat.config.MonitorConfig; import io.mycat.config.SqlLogConfig; import io.mycat.config.TimerConfig; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServerRequest; @@ -24,6 +27,7 @@ public class MycatSQLLogMonitorImpl extends MycatSQLLogMonitor { public static final String SHOW_DB_MONITOR_URL = "/ShowDbMonitor"; public static final String SHOW_RW_MONITOR_URL = "/ShowRwMonitor"; public static final String QUERY_SQL_LOG = "/QuerySqlLog"; + @SneakyThrows public MycatSQLLogMonitorImpl(long workerId, MonitorConfig monitorConfig, Vertx vertx) { super(workerId); @@ -71,8 +75,15 @@ private void initBaseMonitor(Vertx vertx) { TimeUnit timeUnit = TimeUnit.valueOf(instanceMonitorConfig.getTimeUnit()); vertx.setTimer(timeUnit.toMillis(instanceMonitorConfig.getInitialDelay()), event -> vertx.setPeriodic(timeUnit.toMillis(instanceMonitorConfig.getPeriod()), event1 -> { - instanceSnapshot = InstanceEntry.snapshot(); - InstanceEntry.reset(); + IOExecutor ioExecutor = MetaClusterCurrent.wrapper(IOExecutor.class); + ioExecutor.executeBlocking((Handler>) promise -> { + try { + instanceSnapshot = InstanceEntry.snapshot(); + InstanceEntry.reset(); + }finally { + promise.tryComplete(); + } + }); })); } @@ -83,8 +94,15 @@ private void initBaseMonitor(Vertx vertx) { long readWriteRatioMillis = timeUnit.toMillis(clusterConfig.getPeriod()); vertx.setTimer(timeUnit.toMillis(clusterConfig.getInitialDelay()), event -> vertx.setPeriodic(readWriteRatioMillis, event12 -> { - rwEntryMapSnapshot = RWEntry.snapshot(); - RWEntry.reset(); + IOExecutor ioExecutor = MetaClusterCurrent.wrapper(IOExecutor.class); + ioExecutor.executeBlocking((Handler>) promise -> { + try { + rwEntryMapSnapshot = RWEntry.snapshot(); + RWEntry.reset(); + }finally { + promise.tryComplete(); + } + }); })); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -93,8 +111,15 @@ private void initBaseMonitor(Vertx vertx) { TimeUnit timeUnit = TimeUnit.valueOf(databaseInstanceMonitor.getTimeUnit()); long databaseInstanceMonitorMillis = timeUnit.toMillis(databaseInstanceMonitor.getPeriod()); vertx.setTimer(timeUnit.toMillis(databaseInstanceMonitor.getInitialDelay()), event -> vertx.setPeriodic(databaseInstanceMonitorMillis, event12 -> { - databaseInstanceMapSnapshot = DatabaseInstanceEntry.snapshot(); - DatabaseInstanceEntry.reset(); + IOExecutor ioExecutor = MetaClusterCurrent.wrapper(IOExecutor.class); + ioExecutor.executeBlocking((Handler>) promise -> { + try { + databaseInstanceMapSnapshot = DatabaseInstanceEntry.snapshot(); + DatabaseInstanceEntry.reset(); + }finally { + promise.tryComplete(); + } + }); })); } } diff --git a/mycat2/src/main/java/io/mycat/vertx/VertxMycatServer.java b/mycat2/src/main/java/io/mycat/vertx/VertxMycatServer.java index a3ec8bb4f..d19dbd2e6 100644 --- a/mycat2/src/main/java/io/mycat/vertx/VertxMycatServer.java +++ b/mycat2/src/main/java/io/mycat/vertx/VertxMycatServer.java @@ -134,6 +134,7 @@ public void addSession(VertxSession vertxSession) { socket.closeHandler(event -> { String message = "session:{} is closing:{}"; LOGGER.info(message, vertxSession); + sessions.remove(vertxSession); }); sessions.add(vertxSession); }