From 5e2c665941113ab646feda5f8baa0cb5cacd2bbf Mon Sep 17 00:00:00 2001 From: Andrey Kurilov Date: Tue, 10 Jan 2017 17:10:15 +0300 Subject: [PATCH] periodic storage driver service stdout reporting --- .../mongoose/common/concurrent/Daemon.java | 28 +++++++++ .../load/generator/BasicLoadGenerator.java | 21 ++++--- .../load/monitor/BasicLoadMonitor.java | 54 ++++++++++++---- .../load/monitor/metrics/BasicIoStats.java | 12 ++-- .../monitor/metrics/MetricsCsvLogMessage.java | 4 +- .../metrics/MetricsStdoutLogMessage.java | 4 +- .../com/emc/mongoose/model}/DaemonBase.java | 30 +-------- .../mongoose/model/ServiceTaskDispatcher.java | 47 ++++++++++++++ .../mongoose/model/storage/StorageDriver.java | 19 ++++++ .../{load => }/UniformOptionSelectorTest.java | 2 +- scenario/misc/jira-issue-sltm843.json | 20 ++++++ .../driver/base/CommonDispatchTask.java | 61 ------------------- .../driver/base/StorageDriverBase.java | 54 +++++++--------- .../service/BasicStorageDriverBuilderSvc.java | 4 +- .../service/WrappingStorageDriverSvc.java | 55 ++++++++++++++++- .../impl/base/BasicStorageMockClient.java | 3 +- .../impl/base/BasicStorageMockServer.java | 2 +- .../mock/impl/base/StorageMockBase.java | 2 +- .../storage/mock/impl/http/NagainaNode.java | 2 +- .../java/com/emc/mongoose/ui/log/LogUtil.java | 4 +- 20 files changed, 266 insertions(+), 162 deletions(-) rename {common/src/main/java/com/emc/mongoose/common/concurrent => model/src/main/java/com/emc/mongoose/model}/DaemonBase.java (80%) create mode 100644 model/src/main/java/com/emc/mongoose/model/ServiceTaskDispatcher.java rename model/src/test/java/com/emc/mongoose/model/impl/{load => }/UniformOptionSelectorTest.java (97%) create mode 100644 scenario/misc/jira-issue-sltm843.json delete mode 100644 storage/driver/base/src/main/java/com/emc/mongoose/storage/driver/base/CommonDispatchTask.java diff --git a/common/src/main/java/com/emc/mongoose/common/concurrent/Daemon.java b/common/src/main/java/com/emc/mongoose/common/concurrent/Daemon.java index 5c7814eb51..cb8327fe97 100644 --- a/common/src/main/java/com/emc/mongoose/common/concurrent/Daemon.java +++ b/common/src/main/java/com/emc/mongoose/common/concurrent/Daemon.java @@ -2,6 +2,9 @@ import java.io.Closeable; import java.rmi.RemoteException; +import java.util.ConcurrentModificationException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; /** @@ -10,6 +13,31 @@ public interface Daemon extends Closeable { + Queue UNCLOSED = new ConcurrentLinkedQueue<>(); + + static void closeAll() { + synchronized(UNCLOSED) { + // close all unclosed daemons + for(final Daemon d : UNCLOSED) { + try { + d.close(); + } catch(final IllegalStateException | ConcurrentModificationException ignored) { + } catch(final Throwable t) { + t.printStackTrace(System.err); + } + } + + // wait until the list of the unclosed daemons is empty + while(!UNCLOSED.isEmpty()) { + try { + TimeUnit.SECONDS.sleep(1); + } catch(final InterruptedException e) { + break; + } + } + } + } + enum State { INITIAL, STARTED, SHUTDOWN, INTERRUPTED, CLOSED } diff --git a/load/generator/src/main/java/com/emc/mongoose/load/generator/BasicLoadGenerator.java b/load/generator/src/main/java/com/emc/mongoose/load/generator/BasicLoadGenerator.java index 3057036107..ac71f9a5a5 100644 --- a/load/generator/src/main/java/com/emc/mongoose/load/generator/BasicLoadGenerator.java +++ b/load/generator/src/main/java/com/emc/mongoose/load/generator/BasicLoadGenerator.java @@ -1,7 +1,7 @@ package com.emc.mongoose.load.generator; import com.emc.mongoose.common.api.SizeInBytes; -import com.emc.mongoose.common.concurrent.DaemonBase; +import com.emc.mongoose.model.DaemonBase; import com.emc.mongoose.common.concurrent.Throttle; import com.emc.mongoose.common.io.Output; import com.emc.mongoose.common.io.ConstantStringInput; @@ -122,12 +122,10 @@ private final class GeneratorTask public final void run() { if(ioTaskOutput == null) { - LOG.warn(Markers.ERR, "No load I/O task output set, exiting"); - } - - if(itemInput == null) { - LOG.warn(Markers.MSG, "No item source for the producing, exiting"); - return; + LOG.warn( + Markers.ERR, "{}: no load I/O task output set, exiting", + BasicLoadGenerator.this.toString() + ); } int n = 0, m = 0; @@ -173,8 +171,10 @@ public final void run() { break; } catch(final Exception e) { LogUtil.exception( - LOG, Level.WARN, e, "Failed to read the data items, count = {}, " + - "batch size = {}, batch offset = {}", generatedIoTaskCount, n, m + LOG, Level.WARN, e, + "{}: failed to read the data items, count = {}, batch size = {}, " + + "batch offset = {}", + BasicLoadGenerator.this.toString(), generatedIoTaskCount, n, m ); //e.printStackTrace(System.err); } @@ -182,7 +182,8 @@ public final void run() { } finally { LOG.debug( Markers.MSG, "{}: produced {} items from \"{}\" for the \"{}\"", - Thread.currentThread().getName(), generatedIoTaskCount, itemInput.toString(), this + Thread.currentThread().getName(), generatedIoTaskCount, itemInput.toString(), + BasicLoadGenerator.this.toString() ); try { shutdown(); diff --git a/load/monitor/src/main/java/com/emc/mongoose/load/monitor/BasicLoadMonitor.java b/load/monitor/src/main/java/com/emc/mongoose/load/monitor/BasicLoadMonitor.java index 6107e805e9..9da4fa5478 100644 --- a/load/monitor/src/main/java/com/emc/mongoose/load/monitor/BasicLoadMonitor.java +++ b/load/monitor/src/main/java/com/emc/mongoose/load/monitor/BasicLoadMonitor.java @@ -1,7 +1,7 @@ package com.emc.mongoose.load.monitor; import com.emc.mongoose.common.api.SizeInBytes; -import com.emc.mongoose.common.concurrent.DaemonBase; +import com.emc.mongoose.model.DaemonBase; import com.emc.mongoose.common.concurrent.NamingThreadFactory; import com.emc.mongoose.common.concurrent.Throttle; import com.emc.mongoose.load.monitor.metrics.ExtResultsXmlLogMessage; @@ -498,20 +498,28 @@ protected void doShutdown() try { nextGenerator.interrupt(); + LOG.debug( + Markers.MSG, "{}: load generator \"{}\" shut down", getName(), + nextGenerator.toString() + ); } catch(final RemoteException e) { LogUtil.exception( - LOG, Level.WARN, e, "Failed to interrupt the generator {}", - nextGenerator.toString() + LOG, Level.WARN, e, "{}: failed to interrupt the generator {}", + getName(), nextGenerator.toString() ); } for(final StorageDriver nextDriver : driversMap.get(nextGenerator)) { try { nextDriver.shutdown(); + LOG.debug( + Markers.MSG, "{}: storage driver \"{}\" shut down", getName(), + nextDriver.toString() + ); } catch(final RemoteException e) { LogUtil.exception( - LOG, Level.WARN, e, "Failed to shutdown the driver {}", - nextDriver.toString() + LOG, Level.WARN, e, "failed to shutdown the driver {}", + getName(), nextDriver.toString() ); } } @@ -574,8 +582,8 @@ protected void doInterrupt() nextDriver.interrupt(); } catch(final RemoteException e) { LogUtil.exception( - LOG, Level.DEBUG, e, "Failed to interrupt the driver {}", - nextDriver.toString() + LOG, Level.DEBUG, e, "{}: failed to interrupt the driver {}", + getName(), nextDriver.toString() ); } } @@ -584,10 +592,14 @@ protected void doInterrupt() svcTaskExecutor.shutdownNow(); try { if(!svcTaskExecutor.awaitTermination(1, TimeUnit.SECONDS)) { - LOG.error(Markers.ERR, "Failed to terminate the service tasks in 1 second"); + LOG.error( + Markers.ERR, "{}: failed to terminate the service tasks in 1 second", getName() + ); } - } catch(final InterruptedException ignored) { + } catch(final InterruptedException e) { + assert false; } + LOG.debug(Markers.MSG, "{}: interrupted the load monitor", getName()); } @Override @@ -603,6 +615,11 @@ protected final void doClose() if(finalResults != null) { final int finalResultsCount = finalResults.size(); if(finalResultsCount > 0) { + LOG.debug( + Markers.MSG, + "{}: the driver \"{}\" returned {} final I/O results to process", + getName(), driver, finalResults + ); processIoResults( finalResults, finalResultsCount, circularityMap.get(generator) ); @@ -611,22 +628,33 @@ protected final void doClose() } catch(final Throwable cause) { LogUtil.exception( LOG, Level.WARN, cause, - "Failed to process the final results for the driver {}", driver + "{}: failed to process the final results for the driver {}", getName(), + driver ); } try { driver.close(); + LOG.debug( + Markers.MSG, "{}: the storage driver \"{}\" has been closed", getName(), + driver + ); } catch(final IOException e) { - LogUtil.exception(LOG, Level.WARN, e, "Failed to close the driver {}", driver); + LogUtil.exception( + LOG, Level.WARN, e, "{}: failed to close the driver {}", getName(), driver + ); } } try { generator.close(); + LOG.debug( + Markers.MSG, "{}: the load generator \"{}\" has been closed", getName(), + generator + ); } catch(final IOException e) { LogUtil.exception( - LOG, Level.WARN, e, "Failed to close the generator {}", generator + LOG, Level.WARN, e, "{}: failed to close the generator {}", getName(), generator ); } } @@ -684,8 +712,10 @@ protected final void doClose() if(itemInfoOutput != null) { itemInfoOutput.close(); + LOG.debug(Markers.MSG, "{}: closed the items output", getName()); } UNCLOSED.remove(this); + LOG.debug(Markers.MSG, "{}: closed the load monitor", getName()); } } diff --git a/load/monitor/src/main/java/com/emc/mongoose/load/monitor/metrics/BasicIoStats.java b/load/monitor/src/main/java/com/emc/mongoose/load/monitor/metrics/BasicIoStats.java index 7c119255fd..fa445267c1 100644 --- a/load/monitor/src/main/java/com/emc/mongoose/load/monitor/metrics/BasicIoStats.java +++ b/load/monitor/src/main/java/com/emc/mongoose/load/monitor/metrics/BasicIoStats.java @@ -78,7 +78,7 @@ protected static final class BasicSnapshot // public BasicSnapshot( final long countSucc, final double succRateLast, final long countFail, - final double failRate, final long countByte, final double byteRate, + final double failRateLast, final long countByte, final double byteRateLast, final long startTime, final long elapsedTime, final long sumDur, final long sumLat, final com.codahale.metrics.Snapshot durSnapshot, final com.codahale.metrics.Snapshot latSnapshot @@ -86,9 +86,9 @@ public BasicSnapshot( this.countSucc = countSucc; this.succRateLast = succRateLast; this.countFail = countFail; - this.failRateLast = failRate; + this.failRateLast = failRateLast; this.countByte = countByte; - this.byteRateLast = byteRate; + this.byteRateLast = byteRateLast; this.sumDur = sumDur; this.sumLat = sumLat; this.startTime = startTime; @@ -106,7 +106,7 @@ public final long getSuccCount() { // @Override public final double getSuccRateMean() { - return elapsedTime == 0 ? 0 : 1000 * countSucc / elapsedTime; + return elapsedTime == 0 ? 0 : 1000.0 * countSucc / elapsedTime; } // @Override @@ -121,7 +121,7 @@ public final long getFailCount() { // @Override public final double getFailRateMean() { - return elapsedTime == 0 ? 0 : 1000 * countFail / elapsedTime; + return elapsedTime == 0 ? 0 : 1000.0 * countFail / elapsedTime; } // @Override @@ -136,7 +136,7 @@ public final long getByteCount() { // @Override public final double getByteRateMean() { - return elapsedTime == 0 ? 0 : 1000 * countByte / elapsedTime; + return elapsedTime == 0 ? 0 : 1000.0 * countByte / elapsedTime; } // @Override diff --git a/load/monitor/src/main/java/com/emc/mongoose/load/monitor/metrics/MetricsCsvLogMessage.java b/load/monitor/src/main/java/com/emc/mongoose/load/monitor/metrics/MetricsCsvLogMessage.java index c4c86da1c9..6eb89e1192 100644 --- a/load/monitor/src/main/java/com/emc/mongoose/load/monitor/metrics/MetricsCsvLogMessage.java +++ b/load/monitor/src/main/java/com/emc/mongoose/load/monitor/metrics/MetricsCsvLogMessage.java @@ -71,8 +71,8 @@ public final void formatTo(final StringBuilder strb) { .append(nextSnapshot.getSuccCount()).append(',') .append(nextSnapshot.getFailCount()).append(',') .append(nextSnapshot.getByteCount()).append(',') - .append(TimeUnit.MILLISECONDS.toSeconds(nextSnapshot.getElapsedTime())).append(',') - .append(TimeUnit.MICROSECONDS.toSeconds(nextSnapshot.getDurationSum())).append(',') + .append(nextSnapshot.getElapsedTime() / 1000.0).append(',') + .append(nextSnapshot.getDurationSum() / 1000.0).append(',') .append(nextSnapshot.getSuccRateMean()).append(',') .append(nextSnapshot.getSuccRateLast()).append(',') .append(nextSnapshot.getByteRateMean()).append(',') diff --git a/load/monitor/src/main/java/com/emc/mongoose/load/monitor/metrics/MetricsStdoutLogMessage.java b/load/monitor/src/main/java/com/emc/mongoose/load/monitor/metrics/MetricsStdoutLogMessage.java index a2832506bd..7e2f4b67dd 100644 --- a/load/monitor/src/main/java/com/emc/mongoose/load/monitor/metrics/MetricsStdoutLogMessage.java +++ b/load/monitor/src/main/java/com/emc/mongoose/load/monitor/metrics/MetricsStdoutLogMessage.java @@ -63,7 +63,7 @@ private static void formatSingleSnapshot( .append(concurrency).append('x').append(driversCount) .append(": n=(").append(snapshot.getSuccCount()).append('/') .append(snapshot.getFailCount()).append("); t[s]=(") - .append(formatFixedWidth(snapshot.getElapsedTime() / 1000, 7)).append('/') + .append(formatFixedWidth(snapshot.getElapsedTime() / 1000.0, 7)).append('/') .append(formatFixedWidth(snapshot.getDurationSum() / M, 7)).append("); size=(") .append(formatFixedSize(snapshot.getByteCount())).append("); TP[op/s]=(") .append(formatFixedWidth(snapshot.getSuccRateMean(), 7)).append('/') @@ -97,7 +97,7 @@ private void formatMultiSnapshot(final StringBuilder buffer) { strb.appendFixedWidthPadLeft(snapshot.getFailCount(), 6, ' ').append('|'); strb .appendFixedWidthPadLeft( - formatFixedWidth(snapshot.getElapsedTime() / 1000, 7), 7, ' ' + formatFixedWidth(snapshot.getElapsedTime() / 1000.0, 7), 7, ' ' ) .append('|'); strb.appendFixedWidthPadRight(snapshot.getSuccRateMean(), 8, ' ').append('|'); diff --git a/common/src/main/java/com/emc/mongoose/common/concurrent/DaemonBase.java b/model/src/main/java/com/emc/mongoose/model/DaemonBase.java similarity index 80% rename from common/src/main/java/com/emc/mongoose/common/concurrent/DaemonBase.java rename to model/src/main/java/com/emc/mongoose/model/DaemonBase.java index d84c375ec4..c721ab8200 100644 --- a/common/src/main/java/com/emc/mongoose/common/concurrent/DaemonBase.java +++ b/model/src/main/java/com/emc/mongoose/model/DaemonBase.java @@ -1,10 +1,9 @@ -package com.emc.mongoose.common.concurrent; +package com.emc.mongoose.model; + +import com.emc.mongoose.common.concurrent.Daemon; import java.io.IOException; import java.rmi.RemoteException; -import java.util.ConcurrentModificationException; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static com.emc.mongoose.common.concurrent.Daemon.State.CLOSED; @@ -19,8 +18,6 @@ public abstract class DaemonBase implements Daemon { - protected static final Queue UNCLOSED = new ConcurrentLinkedQueue<>(); - private AtomicReference stateRef = new AtomicReference<>(INITIAL); protected final Object state = new Object(); @@ -122,25 +119,4 @@ public void close() public final boolean isClosed() { return stateRef.get().equals(CLOSED); } - - public synchronized static void closeAll() { - // close all unclosed daemons - for(final Daemon d : UNCLOSED) { - try { - d.close(); - } catch(final IllegalStateException | ConcurrentModificationException ignored) { - } catch(final Throwable t) { - t.printStackTrace(System.err); - } - } - - // wait until the list of the unclosed daemons is empty - while(!UNCLOSED.isEmpty()) { - try { - TimeUnit.SECONDS.sleep(1); - } catch(final InterruptedException e) { - break; - } - } - } } diff --git a/model/src/main/java/com/emc/mongoose/model/ServiceTaskDispatcher.java b/model/src/main/java/com/emc/mongoose/model/ServiceTaskDispatcher.java new file mode 100644 index 0000000000..422d2ec8fb --- /dev/null +++ b/model/src/main/java/com/emc/mongoose/model/ServiceTaskDispatcher.java @@ -0,0 +1,47 @@ +package com.emc.mongoose.model; + +import com.emc.mongoose.common.concurrent.Daemon; + +import java.rmi.RemoteException; +import java.util.Map; + +/** + Created by kurila on 30.11.16. + */ +public final class ServiceTaskDispatcher +implements Runnable { + + private final Map dispatchTasks; + + public ServiceTaskDispatcher(final Map dispatchTasks) { + this.dispatchTasks = dispatchTasks; + } + + @Override + public final void run() { + try { + while(true) { + Runnable nextDispatchTask; + for(final Daemon taskOwner : dispatchTasks.keySet()) { + nextDispatchTask = dispatchTasks.get(taskOwner); + if(nextDispatchTask != null) { + try { + nextDispatchTask.run(); + } catch(final Exception e) { + if( + taskOwner != null && !taskOwner.isInterrupted() && + !taskOwner.isClosed() + ) { + e.printStackTrace(System.err); + } + } + Thread.sleep(1); + } + } + } + } catch(final InterruptedException | RemoteException ignored) { + } finally { + dispatchTasks.clear(); + } + } +} diff --git a/model/src/main/java/com/emc/mongoose/model/storage/StorageDriver.java b/model/src/main/java/com/emc/mongoose/model/storage/StorageDriver.java index 6a0761bc90..792fabab5b 100644 --- a/model/src/main/java/com/emc/mongoose/model/storage/StorageDriver.java +++ b/model/src/main/java/com/emc/mongoose/model/storage/StorageDriver.java @@ -4,6 +4,7 @@ import com.emc.mongoose.common.concurrent.Daemon; import com.emc.mongoose.common.io.Output; import com.emc.mongoose.common.net.ServiceUtil; +import com.emc.mongoose.model.ServiceTaskDispatcher; import com.emc.mongoose.model.io.IoType; import com.emc.mongoose.model.io.task.IoTask; import static com.emc.mongoose.model.io.task.IoTask.IoResult; @@ -14,12 +15,24 @@ import java.rmi.Remote; import java.rmi.RemoteException; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** Created on 11.07.16. */ public interface StorageDriver, R extends IoResult> extends Daemon, Output, Remote { + + Map SVC_TASKS = new ConcurrentHashMap<>(); + Thread SVC_TASKS_WORKER = new Thread( + new ServiceTaskDispatcher(SVC_TASKS), "ioTasksDispatcher" + ) { + { + setDaemon(true); + start(); + } + }; String HOST_ADDR = ServiceUtil.getHostAddr(); int BUFF_SIZE_MIN = 0x1000; @@ -44,6 +57,12 @@ List getResults() int getActiveTaskCount() throws RemoteException; + + long getScheduledTaskCount() + throws RemoteException; + + long getCompletedTaskCount() + throws RemoteException; boolean isIdle() throws RemoteException; diff --git a/model/src/test/java/com/emc/mongoose/model/impl/load/UniformOptionSelectorTest.java b/model/src/test/java/com/emc/mongoose/model/impl/UniformOptionSelectorTest.java similarity index 97% rename from model/src/test/java/com/emc/mongoose/model/impl/load/UniformOptionSelectorTest.java rename to model/src/test/java/com/emc/mongoose/model/impl/UniformOptionSelectorTest.java index 54f644bb7f..c120bbf510 100644 --- a/model/src/test/java/com/emc/mongoose/model/impl/load/UniformOptionSelectorTest.java +++ b/model/src/test/java/com/emc/mongoose/model/impl/UniformOptionSelectorTest.java @@ -1,4 +1,4 @@ -package com.emc.mongoose.model.impl.load; +package com.emc.mongoose.model.impl; import com.emc.mongoose.common.io.Input; import com.emc.mongoose.common.io.UniformOptionSelector; diff --git a/scenario/misc/jira-issue-sltm843.json b/scenario/misc/jira-issue-sltm843.json new file mode 100644 index 0000000000..dbf10b1ee0 --- /dev/null +++ b/scenario/misc/jira-issue-sltm843.json @@ -0,0 +1,20 @@ +{ + "type" : "precondition", + "config" : { + "item" : { + "data" : { + "size" : "100KB" + } + }, + "load" : { + "concurrency" : 40, + "limit" : { + "count" : 100000, + "time" : "10m" + }, + "metrics" : { + "period" : 1 + } + } + } +} diff --git a/storage/driver/base/src/main/java/com/emc/mongoose/storage/driver/base/CommonDispatchTask.java b/storage/driver/base/src/main/java/com/emc/mongoose/storage/driver/base/CommonDispatchTask.java deleted file mode 100644 index d7e8aaf25c..0000000000 --- a/storage/driver/base/src/main/java/com/emc/mongoose/storage/driver/base/CommonDispatchTask.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.emc.mongoose.storage.driver.base; - -import com.emc.mongoose.model.storage.StorageDriver; -import com.emc.mongoose.ui.log.LogUtil; -import com.emc.mongoose.ui.log.Markers; - -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.rmi.RemoteException; -import java.util.Map; - -/** - Created by kurila on 30.11.16. - */ -public final class CommonDispatchTask -implements Runnable { - - private static final Logger LOG = LogManager.getLogger(); - - private final Map dispatchTasks; - - public CommonDispatchTask(final Map dispatchTasks) { - this.dispatchTasks = dispatchTasks; - } - - @Override - public final void run() { - try { - while(true) { - Runnable nextDispatchTask; - for(final StorageDriver taskOwner : dispatchTasks.keySet()) { - nextDispatchTask = dispatchTasks.get(taskOwner); - if(nextDispatchTask != null) { - try { - nextDispatchTask.run(); - } catch(final Exception e) { - if( - taskOwner != null && !taskOwner.isInterrupted() && - !taskOwner.isClosed() - ) { - LogUtil.exception( - LOG, Level.WARN, e, - "Failed to invoke the I/O task dispatching for the \"{}\"", - taskOwner - ); - } - } - Thread.sleep(1); - } - } - } - } catch(final InterruptedException e) { - LOG.debug(Markers.MSG, "Interrupted"); - } catch(final RemoteException ignored) { - } finally { - dispatchTasks.clear(); - } - } -} diff --git a/storage/driver/base/src/main/java/com/emc/mongoose/storage/driver/base/StorageDriverBase.java b/storage/driver/base/src/main/java/com/emc/mongoose/storage/driver/base/StorageDriverBase.java index d43898e7b8..0f9a3ff7e1 100644 --- a/storage/driver/base/src/main/java/com/emc/mongoose/storage/driver/base/StorageDriverBase.java +++ b/storage/driver/base/src/main/java/com/emc/mongoose/storage/driver/base/StorageDriverBase.java @@ -1,6 +1,6 @@ package com.emc.mongoose.storage.driver.base; -import com.emc.mongoose.common.concurrent.DaemonBase; +import com.emc.mongoose.model.DaemonBase; import static com.emc.mongoose.common.Constants.BATCH_SIZE; import static com.emc.mongoose.ui.config.Config.LoadConfig; import static com.emc.mongoose.ui.config.Config.StorageConfig.AuthConfig; @@ -25,10 +25,8 @@ import java.rmi.RemoteException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; @@ -42,16 +40,7 @@ public abstract class StorageDriverBase, implements StorageDriver { private static final Logger LOG = LogManager.getLogger(); - private static final Map DISPATCH_INBOUND_TASKS = new ConcurrentHashMap<>(); - static { - new Thread(new CommonDispatchTask(DISPATCH_INBOUND_TASKS), "ioTasksDispatcher") { - { - setDaemon(true); - start(); - } - }; - } - + private final int queueCapacity; private final BlockingQueue ownTasksQueue; private final BlockingQueue inTasksQueue; @@ -75,9 +64,9 @@ public abstract class StorageDriverBase, private final boolean useRespLatencyResult; private final boolean useDataLatencyResult; private final boolean useTransferSizeResult; - - private final LongAdder inCount = new LongAdder(); - private final LongAdder outCount = new LongAdder(); + + private final LongAdder scheduledTaskCount = new LongAdder(); + private final LongAdder completedTaskCount = new LongAdder(); protected StorageDriverBase( final String jobName, final AuthConfig authConfig, final LoadConfig loadConfig, @@ -109,7 +98,7 @@ protected StorageDriverBase( useDataLatencyResult = traceConfig.getDataLatency(); useTransferSizeResult = traceConfig.getTransferSize(); - DISPATCH_INBOUND_TASKS.put(this, new IoTasksDispatch()); + SVC_TASKS.put(this, new IoTasksDispatch()); } private final class IoTasksDispatch @@ -138,13 +127,6 @@ public final void run() { } } - private void outputCurrentMetrics() { - LOG.info( - Markers.MSG, "{} I/O tasks: scheduled={}, active={}, completed={}", toString(), - inCount.sum(), getActiveTaskCount(), outCount.sum() - ); - } - @Override public final boolean put(final O task) throws IOException { @@ -152,7 +134,7 @@ public final boolean put(final O task) throw new EOFException(); } if(inTasksQueue.offer(task)) { - inCount.increment(); + scheduledTaskCount.increment(); return true; } else { return false; @@ -170,7 +152,7 @@ public final int put(final List tasks, final int from, final int to) { } } final int n = i - from; - inCount.add(n); + scheduledTaskCount.add(n); return n; } @@ -184,7 +166,7 @@ public final int put(final List tasks) { break; } } - inCount.add(n); + scheduledTaskCount.add(n); return n; } @@ -192,6 +174,16 @@ public final int put(final List tasks) { public int getActiveTaskCount() { return concurrencyLevel - concurrencyThrottle.availablePermits(); } + + @Override + public final long getScheduledTaskCount() { + return scheduledTaskCount.sum(); + } + + @Override + public final long getCompletedTaskCount() { + return completedTaskCount.sum(); + } @Override public final boolean isIdle() { @@ -222,7 +214,7 @@ public List getResults() @SuppressWarnings("unchecked") protected final void ioTaskCompleted(final O ioTask) { - outCount.increment(); + completedTaskCount.increment(); try { if(isCircular) { @@ -351,15 +343,15 @@ public void setAuthToken(final String authToken) { @Override protected void doShutdown() { - DISPATCH_INBOUND_TASKS.remove(this); + SVC_TASKS.remove(this); LOG.info(Markers.MSG, "{}: shut down", toString()); } @Override protected void doInterrupt() { try { - if(!concurrencyThrottle.tryAcquire(concurrencyLevel, 1, TimeUnit.SECONDS)) { - LOG.warn(Markers.ERR, "Failed to await the idle state"); + if(!concurrencyThrottle.tryAcquire(concurrencyLevel, 10, TimeUnit.MILLISECONDS)) { + LOG.debug(Markers.MSG, "{}: interrupting while not in the idle state", toString()); } } catch(final InterruptedException e) { LogUtil.exception(LOG, Level.WARN, e, "Failed to await the idle state"); diff --git a/storage/driver/service/src/main/java/com/emc/mongoose/storage/driver/service/BasicStorageDriverBuilderSvc.java b/storage/driver/service/src/main/java/com/emc/mongoose/storage/driver/service/BasicStorageDriverBuilderSvc.java index 626201e409..327874a7bf 100644 --- a/storage/driver/service/src/main/java/com/emc/mongoose/storage/driver/service/BasicStorageDriverBuilderSvc.java +++ b/storage/driver/service/src/main/java/com/emc/mongoose/storage/driver/service/BasicStorageDriverBuilderSvc.java @@ -135,7 +135,9 @@ public final void close() public final String buildRemotely() throws IOException, UserShootHisFootException { final StorageDriver driver = build(); - final T wrapper = (T) new WrappingStorageDriverSvc<>(driver, getContentSource()); + final T wrapper = (T) new WrappingStorageDriverSvc<>( + driver, getContentSource(), getLoadConfig().getMetricsConfig().getPeriod() + ); return wrapper.getName(); } } diff --git a/storage/driver/service/src/main/java/com/emc/mongoose/storage/driver/service/WrappingStorageDriverSvc.java b/storage/driver/service/src/main/java/com/emc/mongoose/storage/driver/service/WrappingStorageDriverSvc.java index 01b368257a..d6dd125055 100644 --- a/storage/driver/service/src/main/java/com/emc/mongoose/storage/driver/service/WrappingStorageDriverSvc.java +++ b/storage/driver/service/src/main/java/com/emc/mongoose/storage/driver/service/WrappingStorageDriverSvc.java @@ -32,16 +32,54 @@ public final class WrappingStorageDriverSvc< implements StorageDriverSvc { private static final Logger LOG = LogManager.getLogger(); - + private final StorageDriver driver; private final ContentSource contentSrc; public WrappingStorageDriverSvc( - final StorageDriver driver, final ContentSource contentSrc + final StorageDriver driver, final ContentSource contentSrc, + final long metricsPeriodSec ) { this.driver = driver; this.contentSrc = contentSrc; LOG.info(Markers.MSG, "Service started: " + ServiceUtil.create(this)); + if(metricsPeriodSec > 0 && metricsPeriodSec < Long.MAX_VALUE) { + SVC_TASKS.put(this, new StateReportingTask(this, metricsPeriodSec)); + } + } + + private final static class StateReportingTask + implements Runnable { + + private final StorageDriverSvc storageDriver; + private final long metricsPeriodNanoSec; + + private long prevNanoTimeStamp; + private long nextNanoTimeStamp; + + public StateReportingTask( + final StorageDriverSvc storageDriver, final long metricsPeriodSec + ) { + this.storageDriver = storageDriver; + this.metricsPeriodNanoSec = TimeUnit.SECONDS.toNanos(metricsPeriodSec); + this.prevNanoTimeStamp = 0; + } + + @Override + public final void run() { + nextNanoTimeStamp = System.nanoTime(); + if(metricsPeriodNanoSec < nextNanoTimeStamp - prevNanoTimeStamp) { + prevNanoTimeStamp = nextNanoTimeStamp; + try { + LOG.info( + Markers.MSG, "{} I/O tasks: scheduled={}, active={}, completed={}", + storageDriver.getName(), storageDriver.getScheduledTaskCount(), + storageDriver.getActiveTaskCount(), storageDriver.getCompletedTaskCount() + ); + } catch(final RemoteException ignored) { + } + } + } } @Override @@ -65,6 +103,7 @@ public final List getResults() @Override public final void close() throws IOException { + SVC_TASKS.remove(this); driver.close(); contentSrc.close(); LOG.info(Markers.MSG, "Service closed: " + ServiceUtil.close(this)); @@ -188,6 +227,18 @@ public final int getActiveTaskCount() throws RemoteException { return driver.getActiveTaskCount(); } + + @Override + public final long getScheduledTaskCount() + throws RemoteException { + return driver.getScheduledTaskCount(); + } + + @Override + public final long getCompletedTaskCount() + throws RemoteException { + return driver.getCompletedTaskCount(); + } @Override public final boolean isIdle() diff --git a/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/base/BasicStorageMockClient.java b/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/base/BasicStorageMockClient.java index a6468ec640..26f3b175a4 100644 --- a/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/base/BasicStorageMockClient.java +++ b/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/base/BasicStorageMockClient.java @@ -1,7 +1,7 @@ package com.emc.mongoose.storage.mock.impl.base; import com.emc.mongoose.common.concurrent.AnyNotNullSharedFutureTaskBase; -import com.emc.mongoose.common.concurrent.DaemonBase; +import com.emc.mongoose.model.DaemonBase; import com.emc.mongoose.common.concurrent.TaskSequencer; import com.emc.mongoose.common.concurrent.ThreadUtil; import com.emc.mongoose.model.data.ContentSource; @@ -40,7 +40,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.LockSupport; import java.util.function.Consumer; import static com.emc.mongoose.storage.mock.impl.http.Nagaina.SVC_NAME; diff --git a/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/base/BasicStorageMockServer.java b/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/base/BasicStorageMockServer.java index 5e4bd24b35..7f479698a6 100644 --- a/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/base/BasicStorageMockServer.java +++ b/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/base/BasicStorageMockServer.java @@ -1,6 +1,6 @@ package com.emc.mongoose.storage.mock.impl.base; -import com.emc.mongoose.common.concurrent.DaemonBase; +import com.emc.mongoose.model.DaemonBase; import com.emc.mongoose.storage.mock.api.MutableDataItemMock; import com.emc.mongoose.storage.mock.api.StorageMock; import com.emc.mongoose.storage.mock.api.StorageMockServer; diff --git a/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/base/StorageMockBase.java b/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/base/StorageMockBase.java index 6ea9168263..1e29d24a22 100644 --- a/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/base/StorageMockBase.java +++ b/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/base/StorageMockBase.java @@ -1,7 +1,7 @@ package com.emc.mongoose.storage.mock.impl.base; import com.emc.mongoose.common.collection.ListingLRUMap; -import com.emc.mongoose.common.concurrent.DaemonBase; +import com.emc.mongoose.model.DaemonBase; import com.emc.mongoose.model.data.ContentSource; import com.emc.mongoose.model.item.ItemFactory; import com.emc.mongoose.model.item.CsvFileItemInput; diff --git a/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/http/NagainaNode.java b/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/http/NagainaNode.java index 887fd13906..c3472ee830 100644 --- a/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/http/NagainaNode.java +++ b/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/http/NagainaNode.java @@ -1,6 +1,6 @@ package com.emc.mongoose.storage.mock.impl.http; -import com.emc.mongoose.common.concurrent.DaemonBase; +import com.emc.mongoose.model.DaemonBase; import com.emc.mongoose.common.exception.OmgDoesNotPerformException; import com.emc.mongoose.common.exception.OmgLookAtMyConsoleException; import com.emc.mongoose.common.net.NetUtil; diff --git a/ui/src/main/java/com/emc/mongoose/ui/log/LogUtil.java b/ui/src/main/java/com/emc/mongoose/ui/log/LogUtil.java index 8d5074d879..551b06c532 100644 --- a/ui/src/main/java/com/emc/mongoose/ui/log/LogUtil.java +++ b/ui/src/main/java/com/emc/mongoose/ui/log/LogUtil.java @@ -1,6 +1,6 @@ package com.emc.mongoose.ui.log; -import com.emc.mongoose.common.concurrent.DaemonBase; +import com.emc.mongoose.common.concurrent.Daemon; import com.emc.mongoose.common.env.PathUtil; import org.apache.logging.log4j.Level; @@ -165,7 +165,7 @@ public final void run() { } // public static void shutdown() { - DaemonBase.closeAll(); + Daemon.closeAll(); // stop the logging LOG_CTX_LOCK.lock(); try {