diff --git a/common/src/main/java/com/emc/mongoose/common/concurrent/NamingThreadFactory.java b/common/src/main/java/com/emc/mongoose/common/concurrent/NamingThreadFactory.java deleted file mode 100644 index 143356f9b2..0000000000 --- a/common/src/main/java/com/emc/mongoose/common/concurrent/NamingThreadFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.emc.mongoose.common.concurrent; - -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; - -/** - Created by kurila on 19.07.16. - */ -public class NamingThreadFactory - implements ThreadFactory { - // - protected final AtomicInteger threadNumber = new AtomicInteger(0); - protected final String threadNamePrefix; - protected final boolean daemonFlag; - // - public NamingThreadFactory(final String threadNamePrefix) { - this.threadNamePrefix = threadNamePrefix; - this.daemonFlag = false; - } - // - public NamingThreadFactory(final String threadNamePrefix, final boolean daemonFlag) { - this.threadNamePrefix = threadNamePrefix; - this.daemonFlag = daemonFlag; - } - // - @Override - public Thread newThread(final Runnable task) { - final Thread t = new Thread(task, threadNamePrefix + "#" + threadNumber.incrementAndGet()); - t.setDaemon(daemonFlag); - return t; - } - // - @Override - public final String toString() { - return threadNamePrefix; - } -} diff --git a/load/monitor/src/main/java/com/emc/mongoose/load/monitor/BasicLoadMonitor.java b/load/monitor/src/main/java/com/emc/mongoose/load/monitor/BasicLoadMonitor.java index a0bcae9fba..b70cd14557 100644 --- a/load/monitor/src/main/java/com/emc/mongoose/load/monitor/BasicLoadMonitor.java +++ b/load/monitor/src/main/java/com/emc/mongoose/load/monitor/BasicLoadMonitor.java @@ -3,7 +3,7 @@ import com.emc.mongoose.common.api.SizeInBytes; import com.emc.mongoose.common.concurrent.ThreadUtil; import com.emc.mongoose.model.DaemonBase; -import com.emc.mongoose.common.concurrent.NamingThreadFactory; +import com.emc.mongoose.ui.log.NamingThreadFactory; import com.emc.mongoose.common.concurrent.Throttle; import com.emc.mongoose.load.monitor.metrics.ExtResultsXmlLogMessage; import com.emc.mongoose.load.monitor.metrics.IoTraceCsvBatchLogMessage; @@ -40,7 +40,6 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.core.config.Order; import java.io.IOException; import java.rmi.NoSuchObjectException; @@ -498,7 +497,8 @@ protected void doShutdown() throws IllegalStateException { final ExecutorService shutdownExecutor = Executors.newFixedThreadPool( - ThreadUtil.getHardwareConcurrencyLevel() + ThreadUtil.getHardwareConcurrencyLevel(), + new NamingThreadFactory("shutdownWorker", true) ); for(final LoadGenerator nextGenerator : driversMap.keySet()) { @@ -603,7 +603,8 @@ protected void doInterrupt() throws IllegalStateException { final ExecutorService interruptExecutor = Executors.newFixedThreadPool( - ThreadUtil.getHardwareConcurrencyLevel() + ThreadUtil.getHardwareConcurrencyLevel(), + new NamingThreadFactory("interruptWorker", true) ); for(final LoadGenerator nextGenerator : driversMap.keySet()) { @@ -656,7 +657,8 @@ protected final void doClose() throws IOException { final ExecutorService ioResultsGetAndApplyExecutor = Executors.newFixedThreadPool( - ThreadUtil.getHardwareConcurrencyLevel() + ThreadUtil.getHardwareConcurrencyLevel(), + new NamingThreadFactory("ioResultsGetAndApplyWorker", true) ); for(final LoadGenerator generator : driversMap.keySet()) { @@ -673,30 +675,32 @@ protected final void doClose() LOG.debug( Markers.MSG, "{}: the driver \"{}\" returned {} final I/O results to process", - getName(), driver, finalResults + getName(), driver.toString(), finalResults.size() ); processIoResults( - finalResults, finalResultsCount, circularityMap.get(generator) + finalResults, finalResultsCount, + circularityMap.get(generator) ); } } } catch(final Throwable cause) { LogUtil.exception( LOG, Level.WARN, cause, - "{}: failed to process the final results for the driver {}", getName(), - driver + "{}: failed to process the final results for the driver {}", + getName(), driver.toString() ); } try { driver.close(); LOG.debug( - Markers.MSG, "{}: the storage driver \"{}\" has been closed", getName(), - driver + Markers.MSG, "{}: the storage driver \"{}\" has been closed", + getName(), driver.toString() ); } catch(final IOException e) { LogUtil.exception( - LOG, Level.WARN, e, "{}: failed to close the driver {}", getName(), driver + LOG, Level.WARN, e, "{}: failed to close the driver {}", getName(), + driver.toString() ); } } diff --git a/run/src/main/java/com/emc/mongoose/run/scenario/CommandJob.java b/run/src/main/java/com/emc/mongoose/run/scenario/CommandJob.java index 24d07b4d82..50ec00a011 100644 --- a/run/src/main/java/com/emc/mongoose/run/scenario/CommandJob.java +++ b/run/src/main/java/com/emc/mongoose/run/scenario/CommandJob.java @@ -1,6 +1,6 @@ package com.emc.mongoose.run.scenario; -import com.emc.mongoose.common.concurrent.NamingThreadFactory; +import com.emc.mongoose.ui.log.NamingThreadFactory; import com.emc.mongoose.ui.config.Config; import com.emc.mongoose.ui.log.LogUtil; import static com.emc.mongoose.ui.log.LogUtil.BLUE; diff --git a/run/src/main/java/com/emc/mongoose/run/scenario/JobBase.java b/run/src/main/java/com/emc/mongoose/run/scenario/JobBase.java index c495125fcf..45e8ab9932 100644 --- a/run/src/main/java/com/emc/mongoose/run/scenario/JobBase.java +++ b/run/src/main/java/com/emc/mongoose/run/scenario/JobBase.java @@ -4,6 +4,11 @@ import static com.emc.mongoose.ui.config.Config.LoadConfig.JobConfig; import static com.emc.mongoose.common.Constants.KEY_JOB_NAME; +import com.emc.mongoose.ui.log.LogUtil; +import com.emc.mongoose.ui.log.Markers; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.ThreadContext; /** @@ -12,6 +17,8 @@ public abstract class JobBase implements Job { + private static final Logger LOG = LogManager.getLogger(); + protected final Config localConfig; protected JobBase(final Config appConfig) { @@ -26,15 +33,20 @@ public final Config getConfig() { @Override public void run() { final JobConfig jobConfig = localConfig.getLoadConfig().getJobConfig(); - String jobName = jobConfig.getName(); - if(jobName == null) { - jobName = ThreadContext.get(KEY_JOB_NAME); - jobConfig.setName(jobName); - } else { - ThreadContext.put(KEY_JOB_NAME, jobName); - } - if(jobName == null) { - throw new IllegalStateException("Job name is not set"); + try { + String jobName = jobConfig.getName(); + if(jobName == null) { + jobName = ThreadContext.get(KEY_JOB_NAME); + if(jobName == null) { + LOG.fatal(Markers.ERR, "Job name is not set"); + } else { + jobConfig.setName(jobName); + } + } else { + ThreadContext.put(KEY_JOB_NAME, jobName); + } + } catch(final Throwable t) { + LogUtil.exception(LOG, Level.ERROR, t, "Unexpected failure"); } } } diff --git a/run/src/main/java/com/emc/mongoose/run/scenario/JsonScenario.java b/run/src/main/java/com/emc/mongoose/run/scenario/JsonScenario.java index 03034202ed..0cf5e26cd4 100644 --- a/run/src/main/java/com/emc/mongoose/run/scenario/JsonScenario.java +++ b/run/src/main/java/com/emc/mongoose/run/scenario/JsonScenario.java @@ -188,6 +188,7 @@ protected final synchronized boolean append(final Job job) { // @Override public final void run() { + LOG.info(Markers.MSG, "Scenario start"); super.run(); LOG.info(Markers.MSG, "Scenario end"); } diff --git a/run/src/main/java/com/emc/mongoose/run/scenario/ParallelJob.java b/run/src/main/java/com/emc/mongoose/run/scenario/ParallelJob.java index ca00f451a9..cf67074a04 100644 --- a/run/src/main/java/com/emc/mongoose/run/scenario/ParallelJob.java +++ b/run/src/main/java/com/emc/mongoose/run/scenario/ParallelJob.java @@ -1,6 +1,6 @@ package com.emc.mongoose.run.scenario; -import com.emc.mongoose.common.concurrent.NamingThreadFactory; +import com.emc.mongoose.ui.log.NamingThreadFactory; import com.emc.mongoose.ui.config.Config; import com.emc.mongoose.ui.log.Markers; import org.apache.logging.log4j.LogManager; @@ -28,14 +28,16 @@ protected ParallelJob(final Config appConfig, final Map subTree) public final synchronized void run() { super.run(); - + final ExecutorService parallelJobsExecutor = Executors.newFixedThreadPool( childJobs.size(), new NamingThreadFactory("jobWorker" + hashCode(), true) ); for(final Job subJob : childJobs) { parallelJobsExecutor.submit(subJob); } - LOG.debug(Markers.MSG, "{}: started {} child jobs", toString(), childJobs.size()); + LOG.info( + Markers.MSG, "{}: execute {} child jobs in parallel", toString(), childJobs.size() + ); parallelJobsExecutor.shutdown(); final long limitTime = localConfig.getLoadConfig().getLimitConfig().getTime(); @@ -45,12 +47,15 @@ public final synchronized void run() { } else { parallelJobsExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); } - LOG.debug(Markers.MSG, "{}: {} child jobs done", toString(), childJobs.size()); } catch(final InterruptedException e) { LOG.debug(Markers.MSG, "{}: interrupted the child jobs execution", toString()); } finally { parallelJobsExecutor.shutdownNow(); } + LOG.info( + Markers.MSG, "{}: finished parallel execution of {} child jobs", toString(), + childJobs.size() + ); } // @Override diff --git a/run/src/main/java/com/emc/mongoose/run/scenario/SequentialJob.java b/run/src/main/java/com/emc/mongoose/run/scenario/SequentialJob.java index 9e378c96a3..1bf47c5375 100644 --- a/run/src/main/java/com/emc/mongoose/run/scenario/SequentialJob.java +++ b/run/src/main/java/com/emc/mongoose/run/scenario/SequentialJob.java @@ -29,12 +29,14 @@ public String toString() { @Override public synchronized void run() { super.run(); - LOG.debug(Markers.MSG, "{}: start {} child jobs", toString(), childJobs.size()); + LOG.info( + Markers.MSG, "{}: execute {} child jobs sequentially", toString(), childJobs.size() + ); for(final Job subJob : childJobs) { LOG.debug(Markers.MSG, "{}: child job \"{}\" start", toString(), subJob.toString()); subJob.run(); LOG.debug(Markers.MSG, "{}: child job \"{}\" is done", toString(), subJob.toString()); } - LOG.debug(Markers.MSG, "{}: end", toString()); + LOG.info(Markers.MSG, "{}: finished the sequential execution of {} child jobs", toString()); } } diff --git a/scenario/misc/sanity.json b/scenario/misc/sanity.json index fb11a0b142..4015e4d34b 100644 --- a/scenario/misc/sanity.json +++ b/scenario/misc/sanity.json @@ -5,11 +5,6 @@ "output" : { "path" : "/default" } - }, - "storage" : { - "driver" : { - "remote" : true - } } }, "jobs" : [ diff --git a/storage/driver/net/base/src/main/java/com/emc/mongoose/storage/driver/net/base/NetStorageDriverBase.java b/storage/driver/net/base/src/main/java/com/emc/mongoose/storage/driver/net/base/NetStorageDriverBase.java index 486dbd7ca3..7f2ac11404 100644 --- a/storage/driver/net/base/src/main/java/com/emc/mongoose/storage/driver/net/base/NetStorageDriverBase.java +++ b/storage/driver/net/base/src/main/java/com/emc/mongoose/storage/driver/net/base/NetStorageDriverBase.java @@ -1,7 +1,7 @@ package com.emc.mongoose.storage.driver.net.base; import com.emc.mongoose.common.api.SizeInBytes; -import com.emc.mongoose.common.concurrent.NamingThreadFactory; +import com.emc.mongoose.ui.log.NamingThreadFactory; import com.emc.mongoose.common.concurrent.ThreadUtil; import com.emc.mongoose.common.net.ssl.SslContext; import com.emc.mongoose.common.io.Input; diff --git a/storage/driver/nio/base/src/main/java/com/emc/mongoose/storage/driver/nio/base/NioStorageDriverBase.java b/storage/driver/nio/base/src/main/java/com/emc/mongoose/storage/driver/nio/base/NioStorageDriverBase.java index ece6c91cac..abe98b7248 100644 --- a/storage/driver/nio/base/src/main/java/com/emc/mongoose/storage/driver/nio/base/NioStorageDriverBase.java +++ b/storage/driver/nio/base/src/main/java/com/emc/mongoose/storage/driver/nio/base/NioStorageDriverBase.java @@ -1,6 +1,6 @@ package com.emc.mongoose.storage.driver.nio.base; -import com.emc.mongoose.common.concurrent.NamingThreadFactory; +import com.emc.mongoose.ui.log.NamingThreadFactory; import com.emc.mongoose.common.concurrent.ThreadUtil; import com.emc.mongoose.model.io.task.IoTask; import static com.emc.mongoose.model.io.task.IoTask.IoResult; diff --git a/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/base/BasicStorageMockClient.java b/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/base/BasicStorageMockClient.java index 26f3b175a4..7fbfbfe1e3 100644 --- a/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/base/BasicStorageMockClient.java +++ b/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/base/BasicStorageMockClient.java @@ -12,6 +12,7 @@ import com.emc.mongoose.storage.mock.impl.remote.MDns; import com.emc.mongoose.ui.log.LogUtil; import com.emc.mongoose.ui.log.Markers; +import com.emc.mongoose.ui.log.NamingThreadFactory; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -64,6 +65,7 @@ public BasicStorageMockClient(final ContentSource contentSrc, final JmDNS jmDns) this.executor = new ThreadPoolExecutor( ThreadUtil.getHardwareConcurrencyLevel(), ThreadUtil.getHardwareConcurrencyLevel(), 0, TimeUnit.DAYS, new ArrayBlockingQueue<>(TaskSequencer.DEFAULT_TASK_QUEUE_SIZE_LIMIT), + new NamingThreadFactory("storageMockClientWorker", true), (r, e) -> LOG.error("Task {} rejected", r.toString()) ) { @Override diff --git a/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/http/Nagaina.java b/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/http/Nagaina.java index 43dc8deade..4255e03bce 100644 --- a/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/http/Nagaina.java +++ b/storage/mock/src/main/java/com/emc/mongoose/storage/mock/impl/http/Nagaina.java @@ -1,6 +1,6 @@ package com.emc.mongoose.storage.mock.impl.http; -import com.emc.mongoose.common.concurrent.NamingThreadFactory; +import com.emc.mongoose.ui.log.NamingThreadFactory; import com.emc.mongoose.common.concurrent.ThreadUtil; import com.emc.mongoose.common.net.ssl.SslContext; import com.emc.mongoose.model.data.ContentSource; @@ -30,6 +30,7 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.ThreadContext; import java.io.IOException; import java.util.List; diff --git a/ui/src/main/java/com/emc/mongoose/ui/log/NamingThreadFactory.java b/ui/src/main/java/com/emc/mongoose/ui/log/NamingThreadFactory.java new file mode 100644 index 0000000000..3be1479bb1 --- /dev/null +++ b/ui/src/main/java/com/emc/mongoose/ui/log/NamingThreadFactory.java @@ -0,0 +1,75 @@ +package com.emc.mongoose.ui.log; + +import org.apache.logging.log4j.ThreadContext; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.Map; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + Created by kurila on 19.07.16. + */ +public class NamingThreadFactory +implements ThreadFactory { + + protected static final UncaughtExceptionHandler exceptionHandler = (t, e) -> { + synchronized(System.err) { + System.err.println("Uncaught exception in the thread \"" + t.getName() + "\":"); + e.printStackTrace(System.err); + } + }; + + protected final AtomicInteger threadNumber = new AtomicInteger(0); + protected final String threadNamePrefix; + protected final boolean daemonFlag; + protected final Map context; + + public NamingThreadFactory(final String threadNamePrefix) { + this.threadNamePrefix = threadNamePrefix; + this.daemonFlag = false; + this.context = ThreadContext.getContext(); + } + + public NamingThreadFactory( + final String threadNamePrefix, final boolean daemonFlag + ) { + this.threadNamePrefix = threadNamePrefix; + this.daemonFlag = daemonFlag; + this.context = ThreadContext.getContext(); + } + + private static final class ContextAwareThread + extends Thread { + + private final Map context; + + private ContextAwareThread( + final Runnable task, final String name, final Map context + ) { + super(task, name); + this.context = context; + } + + @Override + public final void run() { + ThreadContext.putAll(context); + super.run(); + } + } + + @Override + public Thread newThread(final Runnable task) { + final Thread t = new ContextAwareThread( + task, threadNamePrefix + "#" + threadNumber.incrementAndGet(), context + ); + t.setDaemon(daemonFlag); + t.setUncaughtExceptionHandler(exceptionHandler); + return t; + } + + @Override + public final String toString() { + return threadNamePrefix; + } +}