Skip to content

Commit

Permalink
periodic storage driver service stdout reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey Kurilov committed Jan 10, 2017
1 parent 0429b03 commit 5e2c665
Show file tree
Hide file tree
Showing 20 changed files with 266 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import java.io.Closeable;
import java.rmi.RemoteException;
import java.util.ConcurrentModificationException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -10,6 +13,31 @@
public interface Daemon
extends Closeable {

Queue<Daemon> UNCLOSED = new ConcurrentLinkedQueue<>();

static void closeAll() {
synchronized(UNCLOSED) {
// close all unclosed daemons
for(final Daemon d : UNCLOSED) {
try {
d.close();
} catch(final IllegalStateException | ConcurrentModificationException ignored) {
} catch(final Throwable t) {
t.printStackTrace(System.err);
}
}

// wait until the list of the unclosed daemons is empty
while(!UNCLOSED.isEmpty()) {
try {
TimeUnit.SECONDS.sleep(1);
} catch(final InterruptedException e) {
break;
}
}
}
}

enum State {
INITIAL, STARTED, SHUTDOWN, INTERRUPTED, CLOSED
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.emc.mongoose.load.generator;

import com.emc.mongoose.common.api.SizeInBytes;
import com.emc.mongoose.common.concurrent.DaemonBase;
import com.emc.mongoose.model.DaemonBase;
import com.emc.mongoose.common.concurrent.Throttle;
import com.emc.mongoose.common.io.Output;
import com.emc.mongoose.common.io.ConstantStringInput;
Expand Down Expand Up @@ -122,12 +122,10 @@ private final class GeneratorTask
public final void run() {

if(ioTaskOutput == null) {
LOG.warn(Markers.ERR, "No load I/O task output set, exiting");
}

if(itemInput == null) {
LOG.warn(Markers.MSG, "No item source for the producing, exiting");
return;
LOG.warn(
Markers.ERR, "{}: no load I/O task output set, exiting",
BasicLoadGenerator.this.toString()
);
}

int n = 0, m = 0;
Expand Down Expand Up @@ -173,16 +171,19 @@ public final void run() {
break;
} catch(final Exception e) {
LogUtil.exception(
LOG, Level.WARN, e, "Failed to read the data items, count = {}, " +
"batch size = {}, batch offset = {}", generatedIoTaskCount, n, m
LOG, Level.WARN, e,
"{}: failed to read the data items, count = {}, batch size = {}, " +
"batch offset = {}",
BasicLoadGenerator.this.toString(), generatedIoTaskCount, n, m
);
//e.printStackTrace(System.err);
}
}
} finally {
LOG.debug(
Markers.MSG, "{}: produced {} items from \"{}\" for the \"{}\"",
Thread.currentThread().getName(), generatedIoTaskCount, itemInput.toString(), this
Thread.currentThread().getName(), generatedIoTaskCount, itemInput.toString(),
BasicLoadGenerator.this.toString()
);
try {
shutdown();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.emc.mongoose.load.monitor;

import com.emc.mongoose.common.api.SizeInBytes;
import com.emc.mongoose.common.concurrent.DaemonBase;
import com.emc.mongoose.model.DaemonBase;
import com.emc.mongoose.common.concurrent.NamingThreadFactory;
import com.emc.mongoose.common.concurrent.Throttle;
import com.emc.mongoose.load.monitor.metrics.ExtResultsXmlLogMessage;
Expand Down Expand Up @@ -498,20 +498,28 @@ protected void doShutdown()

try {
nextGenerator.interrupt();
LOG.debug(
Markers.MSG, "{}: load generator \"{}\" shut down", getName(),
nextGenerator.toString()
);
} catch(final RemoteException e) {
LogUtil.exception(
LOG, Level.WARN, e, "Failed to interrupt the generator {}",
nextGenerator.toString()
LOG, Level.WARN, e, "{}: failed to interrupt the generator {}",
getName(), nextGenerator.toString()
);
}

for(final StorageDriver<I, O, R> nextDriver : driversMap.get(nextGenerator)) {
try {
nextDriver.shutdown();
LOG.debug(
Markers.MSG, "{}: storage driver \"{}\" shut down", getName(),
nextDriver.toString()
);
} catch(final RemoteException e) {
LogUtil.exception(
LOG, Level.WARN, e, "Failed to shutdown the driver {}",
nextDriver.toString()
LOG, Level.WARN, e, "failed to shutdown the driver {}",
getName(), nextDriver.toString()
);
}
}
Expand Down Expand Up @@ -574,8 +582,8 @@ protected void doInterrupt()
nextDriver.interrupt();
} catch(final RemoteException e) {
LogUtil.exception(
LOG, Level.DEBUG, e, "Failed to interrupt the driver {}",
nextDriver.toString()
LOG, Level.DEBUG, e, "{}: failed to interrupt the driver {}",
getName(), nextDriver.toString()
);
}
}
Expand All @@ -584,10 +592,14 @@ protected void doInterrupt()
svcTaskExecutor.shutdownNow();
try {
if(!svcTaskExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
LOG.error(Markers.ERR, "Failed to terminate the service tasks in 1 second");
LOG.error(
Markers.ERR, "{}: failed to terminate the service tasks in 1 second", getName()
);
}
} catch(final InterruptedException ignored) {
} catch(final InterruptedException e) {
assert false;
}
LOG.debug(Markers.MSG, "{}: interrupted the load monitor", getName());
}

@Override
Expand All @@ -603,6 +615,11 @@ protected final void doClose()
if(finalResults != null) {
final int finalResultsCount = finalResults.size();
if(finalResultsCount > 0) {
LOG.debug(
Markers.MSG,
"{}: the driver \"{}\" returned {} final I/O results to process",
getName(), driver, finalResults
);
processIoResults(
finalResults, finalResultsCount, circularityMap.get(generator)
);
Expand All @@ -611,22 +628,33 @@ protected final void doClose()
} catch(final Throwable cause) {
LogUtil.exception(
LOG, Level.WARN, cause,
"Failed to process the final results for the driver {}", driver
"{}: failed to process the final results for the driver {}", getName(),
driver
);
}

try {
driver.close();
LOG.debug(
Markers.MSG, "{}: the storage driver \"{}\" has been closed", getName(),
driver
);
} catch(final IOException e) {
LogUtil.exception(LOG, Level.WARN, e, "Failed to close the driver {}", driver);
LogUtil.exception(
LOG, Level.WARN, e, "{}: failed to close the driver {}", getName(), driver
);
}
}

try {
generator.close();
LOG.debug(
Markers.MSG, "{}: the load generator \"{}\" has been closed", getName(),
generator
);
} catch(final IOException e) {
LogUtil.exception(
LOG, Level.WARN, e, "Failed to close the generator {}", generator
LOG, Level.WARN, e, "{}: failed to close the generator {}", getName(), generator
);
}
}
Expand Down Expand Up @@ -684,8 +712,10 @@ protected final void doClose()

if(itemInfoOutput != null) {
itemInfoOutput.close();
LOG.debug(Markers.MSG, "{}: closed the items output", getName());
}

UNCLOSED.remove(this);
LOG.debug(Markers.MSG, "{}: closed the load monitor", getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,17 @@ protected static final class BasicSnapshot
//
public BasicSnapshot(
final long countSucc, final double succRateLast, final long countFail,
final double failRate, final long countByte, final double byteRate,
final double failRateLast, final long countByte, final double byteRateLast,
final long startTime, final long elapsedTime, final long sumDur,
final long sumLat, final com.codahale.metrics.Snapshot durSnapshot,
final com.codahale.metrics.Snapshot latSnapshot
) {
this.countSucc = countSucc;
this.succRateLast = succRateLast;
this.countFail = countFail;
this.failRateLast = failRate;
this.failRateLast = failRateLast;
this.countByte = countByte;
this.byteRateLast = byteRate;
this.byteRateLast = byteRateLast;
this.sumDur = sumDur;
this.sumLat = sumLat;
this.startTime = startTime;
Expand All @@ -106,7 +106,7 @@ public final long getSuccCount() {
//
@Override
public final double getSuccRateMean() {
return elapsedTime == 0 ? 0 : 1000 * countSucc / elapsedTime;
return elapsedTime == 0 ? 0 : 1000.0 * countSucc / elapsedTime;
}
//
@Override
Expand All @@ -121,7 +121,7 @@ public final long getFailCount() {
//
@Override
public final double getFailRateMean() {
return elapsedTime == 0 ? 0 : 1000 * countFail / elapsedTime;
return elapsedTime == 0 ? 0 : 1000.0 * countFail / elapsedTime;
}
//
@Override
Expand All @@ -136,7 +136,7 @@ public final long getByteCount() {
//
@Override
public final double getByteRateMean() {
return elapsedTime == 0 ? 0 : 1000 * countByte / elapsedTime;
return elapsedTime == 0 ? 0 : 1000.0 * countByte / elapsedTime;
}
//
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public final void formatTo(final StringBuilder strb) {
.append(nextSnapshot.getSuccCount()).append(',')
.append(nextSnapshot.getFailCount()).append(',')
.append(nextSnapshot.getByteCount()).append(',')
.append(TimeUnit.MILLISECONDS.toSeconds(nextSnapshot.getElapsedTime())).append(',')
.append(TimeUnit.MICROSECONDS.toSeconds(nextSnapshot.getDurationSum())).append(',')
.append(nextSnapshot.getElapsedTime() / 1000.0).append(',')
.append(nextSnapshot.getDurationSum() / 1000.0).append(',')
.append(nextSnapshot.getSuccRateMean()).append(',')
.append(nextSnapshot.getSuccRateLast()).append(',')
.append(nextSnapshot.getByteRateMean()).append(',')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ private static void formatSingleSnapshot(
.append(concurrency).append('x').append(driversCount)
.append(": n=(").append(snapshot.getSuccCount()).append('/')
.append(snapshot.getFailCount()).append("); t[s]=(")
.append(formatFixedWidth(snapshot.getElapsedTime() / 1000, 7)).append('/')
.append(formatFixedWidth(snapshot.getElapsedTime() / 1000.0, 7)).append('/')
.append(formatFixedWidth(snapshot.getDurationSum() / M, 7)).append("); size=(")
.append(formatFixedSize(snapshot.getByteCount())).append("); TP[op/s]=(")
.append(formatFixedWidth(snapshot.getSuccRateMean(), 7)).append('/')
Expand Down Expand Up @@ -97,7 +97,7 @@ private void formatMultiSnapshot(final StringBuilder buffer) {
strb.appendFixedWidthPadLeft(snapshot.getFailCount(), 6, ' ').append('|');
strb
.appendFixedWidthPadLeft(
formatFixedWidth(snapshot.getElapsedTime() / 1000, 7), 7, ' '
formatFixedWidth(snapshot.getElapsedTime() / 1000.0, 7), 7, ' '
)
.append('|');
strb.appendFixedWidthPadRight(snapshot.getSuccRateMean(), 8, ' ').append('|');
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package com.emc.mongoose.common.concurrent;
package com.emc.mongoose.model;

import com.emc.mongoose.common.concurrent.Daemon;

import java.io.IOException;
import java.rmi.RemoteException;
import java.util.ConcurrentModificationException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static com.emc.mongoose.common.concurrent.Daemon.State.CLOSED;
Expand All @@ -19,8 +18,6 @@
public abstract class DaemonBase
implements Daemon {

protected static final Queue<Daemon> UNCLOSED = new ConcurrentLinkedQueue<>();

private AtomicReference<State> stateRef = new AtomicReference<>(INITIAL);
protected final Object state = new Object();

Expand Down Expand Up @@ -122,25 +119,4 @@ public void close()
public final boolean isClosed() {
return stateRef.get().equals(CLOSED);
}

public synchronized static void closeAll() {
// close all unclosed daemons
for(final Daemon d : UNCLOSED) {
try {
d.close();
} catch(final IllegalStateException | ConcurrentModificationException ignored) {
} catch(final Throwable t) {
t.printStackTrace(System.err);
}
}

// wait until the list of the unclosed daemons is empty
while(!UNCLOSED.isEmpty()) {
try {
TimeUnit.SECONDS.sleep(1);
} catch(final InterruptedException e) {
break;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.emc.mongoose.model;

import com.emc.mongoose.common.concurrent.Daemon;

import java.rmi.RemoteException;
import java.util.Map;

/**
Created by kurila on 30.11.16.
*/
public final class ServiceTaskDispatcher
implements Runnable {

private final Map<? extends Daemon, Runnable> dispatchTasks;

public ServiceTaskDispatcher(final Map<? extends Daemon, Runnable> dispatchTasks) {
this.dispatchTasks = dispatchTasks;
}

@Override
public final void run() {
try {
while(true) {
Runnable nextDispatchTask;
for(final Daemon taskOwner : dispatchTasks.keySet()) {
nextDispatchTask = dispatchTasks.get(taskOwner);
if(nextDispatchTask != null) {
try {
nextDispatchTask.run();
} catch(final Exception e) {
if(
taskOwner != null && !taskOwner.isInterrupted() &&
!taskOwner.isClosed()
) {
e.printStackTrace(System.err);
}
}
Thread.sleep(1);
}
}
}
} catch(final InterruptedException | RemoteException ignored) {
} finally {
dispatchTasks.clear();
}
}
}
Loading

0 comments on commit 5e2c665

Please sign in to comment.