Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey Kurilov committed Jan 15, 2017
1 parent a9c0b5e commit 1903448
Show file tree
Hide file tree
Showing 13 changed files with 133 additions and 73 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.emc.mongoose.common.api.SizeInBytes;
import com.emc.mongoose.common.concurrent.ThreadUtil;
import com.emc.mongoose.model.DaemonBase;
import com.emc.mongoose.common.concurrent.NamingThreadFactory;
import com.emc.mongoose.ui.log.NamingThreadFactory;
import com.emc.mongoose.common.concurrent.Throttle;
import com.emc.mongoose.load.monitor.metrics.ExtResultsXmlLogMessage;
import com.emc.mongoose.load.monitor.metrics.IoTraceCsvBatchLogMessage;
Expand Down Expand Up @@ -40,7 +40,6 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.config.Order;

import java.io.IOException;
import java.rmi.NoSuchObjectException;
Expand Down Expand Up @@ -498,7 +497,8 @@ protected void doShutdown()
throws IllegalStateException {

final ExecutorService shutdownExecutor = Executors.newFixedThreadPool(
ThreadUtil.getHardwareConcurrencyLevel()
ThreadUtil.getHardwareConcurrencyLevel(),
new NamingThreadFactory("shutdownWorker", true)
);

for(final LoadGenerator<I, O, R> nextGenerator : driversMap.keySet()) {
Expand Down Expand Up @@ -603,7 +603,8 @@ protected void doInterrupt()
throws IllegalStateException {

final ExecutorService interruptExecutor = Executors.newFixedThreadPool(
ThreadUtil.getHardwareConcurrencyLevel()
ThreadUtil.getHardwareConcurrencyLevel(),
new NamingThreadFactory("interruptWorker", true)
);

for(final LoadGenerator<I, O, R> nextGenerator : driversMap.keySet()) {
Expand Down Expand Up @@ -656,7 +657,8 @@ protected final void doClose()
throws IOException {

final ExecutorService ioResultsGetAndApplyExecutor = Executors.newFixedThreadPool(
ThreadUtil.getHardwareConcurrencyLevel()
ThreadUtil.getHardwareConcurrencyLevel(),
new NamingThreadFactory("ioResultsGetAndApplyWorker", true)
);

for(final LoadGenerator<I, O, R> generator : driversMap.keySet()) {
Expand All @@ -673,30 +675,32 @@ protected final void doClose()
LOG.debug(
Markers.MSG,
"{}: the driver \"{}\" returned {} final I/O results to process",
getName(), driver, finalResults
getName(), driver.toString(), finalResults.size()
);
processIoResults(
finalResults, finalResultsCount, circularityMap.get(generator)
finalResults, finalResultsCount,
circularityMap.get(generator)
);
}
}
} catch(final Throwable cause) {
LogUtil.exception(
LOG, Level.WARN, cause,
"{}: failed to process the final results for the driver {}", getName(),
driver
"{}: failed to process the final results for the driver {}",
getName(), driver.toString()
);
}

try {
driver.close();
LOG.debug(
Markers.MSG, "{}: the storage driver \"{}\" has been closed", getName(),
driver
Markers.MSG, "{}: the storage driver \"{}\" has been closed",
getName(), driver.toString()
);
} catch(final IOException e) {
LogUtil.exception(
LOG, Level.WARN, e, "{}: failed to close the driver {}", getName(), driver
LOG, Level.WARN, e, "{}: failed to close the driver {}", getName(),
driver.toString()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.emc.mongoose.run.scenario;

import com.emc.mongoose.common.concurrent.NamingThreadFactory;
import com.emc.mongoose.ui.log.NamingThreadFactory;
import com.emc.mongoose.ui.config.Config;
import com.emc.mongoose.ui.log.LogUtil;
import static com.emc.mongoose.ui.log.LogUtil.BLUE;
Expand Down
30 changes: 21 additions & 9 deletions run/src/main/java/com/emc/mongoose/run/scenario/JobBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
import static com.emc.mongoose.ui.config.Config.LoadConfig.JobConfig;
import static com.emc.mongoose.common.Constants.KEY_JOB_NAME;

import com.emc.mongoose.ui.log.LogUtil;
import com.emc.mongoose.ui.log.Markers;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;

/**
Expand All @@ -12,6 +17,8 @@
public abstract class JobBase
implements Job {

private static final Logger LOG = LogManager.getLogger();

protected final Config localConfig;

protected JobBase(final Config appConfig) {
Expand All @@ -26,15 +33,20 @@ public final Config getConfig() {
@Override
public void run() {
final JobConfig jobConfig = localConfig.getLoadConfig().getJobConfig();
String jobName = jobConfig.getName();
if(jobName == null) {
jobName = ThreadContext.get(KEY_JOB_NAME);
jobConfig.setName(jobName);
} else {
ThreadContext.put(KEY_JOB_NAME, jobName);
}
if(jobName == null) {
throw new IllegalStateException("Job name is not set");
try {
String jobName = jobConfig.getName();
if(jobName == null) {
jobName = ThreadContext.get(KEY_JOB_NAME);
if(jobName == null) {
LOG.fatal(Markers.ERR, "Job name is not set");
} else {
jobConfig.setName(jobName);
}
} else {
ThreadContext.put(KEY_JOB_NAME, jobName);
}
} catch(final Throwable t) {
LogUtil.exception(LOG, Level.ERROR, t, "Unexpected failure");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ protected final synchronized boolean append(final Job job) {
//
@Override
public final void run() {
LOG.info(Markers.MSG, "Scenario start");
super.run();
LOG.info(Markers.MSG, "Scenario end");
}
Expand Down
13 changes: 9 additions & 4 deletions run/src/main/java/com/emc/mongoose/run/scenario/ParallelJob.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.emc.mongoose.run.scenario;

import com.emc.mongoose.common.concurrent.NamingThreadFactory;
import com.emc.mongoose.ui.log.NamingThreadFactory;
import com.emc.mongoose.ui.config.Config;
import com.emc.mongoose.ui.log.Markers;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -28,14 +28,16 @@ protected ParallelJob(final Config appConfig, final Map<String, Object> subTree)
public final synchronized void run() {

super.run();

final ExecutorService parallelJobsExecutor = Executors.newFixedThreadPool(
childJobs.size(), new NamingThreadFactory("jobWorker" + hashCode(), true)
);
for(final Job subJob : childJobs) {
parallelJobsExecutor.submit(subJob);
}
LOG.debug(Markers.MSG, "{}: started {} child jobs", toString(), childJobs.size());
LOG.info(
Markers.MSG, "{}: execute {} child jobs in parallel", toString(), childJobs.size()
);
parallelJobsExecutor.shutdown();

final long limitTime = localConfig.getLoadConfig().getLimitConfig().getTime();
Expand All @@ -45,12 +47,15 @@ public final synchronized void run() {
} else {
parallelJobsExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
LOG.debug(Markers.MSG, "{}: {} child jobs done", toString(), childJobs.size());
} catch(final InterruptedException e) {
LOG.debug(Markers.MSG, "{}: interrupted the child jobs execution", toString());
} finally {
parallelJobsExecutor.shutdownNow();
}
LOG.info(
Markers.MSG, "{}: finished parallel execution of {} child jobs", toString(),
childJobs.size()
);
}
//
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ public String toString() {
@Override
public synchronized void run() {
super.run();
LOG.debug(Markers.MSG, "{}: start {} child jobs", toString(), childJobs.size());
LOG.info(
Markers.MSG, "{}: execute {} child jobs sequentially", toString(), childJobs.size()
);
for(final Job subJob : childJobs) {
LOG.debug(Markers.MSG, "{}: child job \"{}\" start", toString(), subJob.toString());
subJob.run();
LOG.debug(Markers.MSG, "{}: child job \"{}\" is done", toString(), subJob.toString());
}
LOG.debug(Markers.MSG, "{}: end", toString());
LOG.info(Markers.MSG, "{}: finished the sequential execution of {} child jobs", toString());
}
}
5 changes: 0 additions & 5 deletions scenario/misc/sanity.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@
"output" : {
"path" : "/default"
}
},
"storage" : {
"driver" : {
"remote" : true
}
}
},
"jobs" : [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.emc.mongoose.storage.driver.net.base;

import com.emc.mongoose.common.api.SizeInBytes;
import com.emc.mongoose.common.concurrent.NamingThreadFactory;
import com.emc.mongoose.ui.log.NamingThreadFactory;
import com.emc.mongoose.common.concurrent.ThreadUtil;
import com.emc.mongoose.common.net.ssl.SslContext;
import com.emc.mongoose.common.io.Input;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.emc.mongoose.storage.driver.nio.base;

import com.emc.mongoose.common.concurrent.NamingThreadFactory;
import com.emc.mongoose.ui.log.NamingThreadFactory;
import com.emc.mongoose.common.concurrent.ThreadUtil;
import com.emc.mongoose.model.io.task.IoTask;
import static com.emc.mongoose.model.io.task.IoTask.IoResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.emc.mongoose.storage.mock.impl.remote.MDns;
import com.emc.mongoose.ui.log.LogUtil;
import com.emc.mongoose.ui.log.Markers;
import com.emc.mongoose.ui.log.NamingThreadFactory;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -64,6 +65,7 @@ public BasicStorageMockClient(final ContentSource contentSrc, final JmDNS jmDns)
this.executor = new ThreadPoolExecutor(
ThreadUtil.getHardwareConcurrencyLevel(), ThreadUtil.getHardwareConcurrencyLevel(),
0, TimeUnit.DAYS, new ArrayBlockingQueue<>(TaskSequencer.DEFAULT_TASK_QUEUE_SIZE_LIMIT),
new NamingThreadFactory("storageMockClientWorker", true),
(r, e) -> LOG.error("Task {} rejected", r.toString())
) {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.emc.mongoose.storage.mock.impl.http;

import com.emc.mongoose.common.concurrent.NamingThreadFactory;
import com.emc.mongoose.ui.log.NamingThreadFactory;
import com.emc.mongoose.common.concurrent.ThreadUtil;
import com.emc.mongoose.common.net.ssl.SslContext;
import com.emc.mongoose.model.data.ContentSource;
Expand Down Expand Up @@ -30,6 +30,7 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;

import java.io.IOException;
import java.util.List;
Expand Down
75 changes: 75 additions & 0 deletions ui/src/main/java/com/emc/mongoose/ui/log/NamingThreadFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.emc.mongoose.ui.log;

import org.apache.logging.log4j.ThreadContext;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
Created by kurila on 19.07.16.
*/
public class NamingThreadFactory
implements ThreadFactory {

protected static final UncaughtExceptionHandler exceptionHandler = (t, e) -> {
synchronized(System.err) {
System.err.println("Uncaught exception in the thread \"" + t.getName() + "\":");
e.printStackTrace(System.err);
}
};

protected final AtomicInteger threadNumber = new AtomicInteger(0);
protected final String threadNamePrefix;
protected final boolean daemonFlag;
protected final Map<String, String> context;

public NamingThreadFactory(final String threadNamePrefix) {
this.threadNamePrefix = threadNamePrefix;
this.daemonFlag = false;
this.context = ThreadContext.getContext();
}

public NamingThreadFactory(
final String threadNamePrefix, final boolean daemonFlag
) {
this.threadNamePrefix = threadNamePrefix;
this.daemonFlag = daemonFlag;
this.context = ThreadContext.getContext();
}

private static final class ContextAwareThread
extends Thread {

private final Map<String, String> context;

private ContextAwareThread(
final Runnable task, final String name, final Map<String, String> context
) {
super(task, name);
this.context = context;
}

@Override
public final void run() {
ThreadContext.putAll(context);
super.run();
}
}

@Override
public Thread newThread(final Runnable task) {
final Thread t = new ContextAwareThread(
task, threadNamePrefix + "#" + threadNumber.incrementAndGet(), context
);
t.setDaemon(daemonFlag);
t.setUncaughtExceptionHandler(exceptionHandler);
return t;
}

@Override
public final String toString() {
return threadNamePrefix;
}
}

0 comments on commit 1903448

Please sign in to comment.