Skip to content

Commit

Permalink
issue #903 resolution
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey Kurilov committed Jan 7, 2017
1 parent 94b0e47 commit 0894c5f
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package com.emc.mongoose.load.generator;

import com.emc.mongoose.common.api.SizeInBytes;
import com.emc.mongoose.common.concurrent.DaemonBase;
import com.emc.mongoose.common.concurrent.Throttle;
import com.emc.mongoose.common.io.Output;
import com.emc.mongoose.common.io.ConstantStringInput;
import com.emc.mongoose.common.exception.UserShootHisFootException;
import static com.emc.mongoose.common.Constants.BATCH_SIZE;
import static com.emc.mongoose.model.io.task.IoTask.IoResult;

import com.emc.mongoose.model.io.IoType;
import com.emc.mongoose.ui.log.LogUtil;
import com.emc.mongoose.ui.log.Markers;
import com.emc.mongoose.common.io.Input;
Expand Down Expand Up @@ -44,6 +47,7 @@ public class BasicLoadGenerator<I extends Item, O extends IoTask<I, R>, R extend
private volatile Output<O> ioTaskOutput;

private final Input<I> itemInput;
private final SizeInBytes avgItemSize;
private final Input<String> dstPathInput;
private final Thread worker;
private final long countLimit;
Expand All @@ -56,12 +60,13 @@ public class BasicLoadGenerator<I extends Item, O extends IoTask<I, R>, R extend

@SuppressWarnings("unchecked")
public BasicLoadGenerator(
final Input<I> itemInput, final Input<String> dstPathInput,
final IoTaskBuilder<I, O, R> ioTaskBuilder, final long countLimit,
final int maxItemQueueSize, final boolean isCircular
final Input<I> itemInput, final SizeInBytes avgItemSize,
final Input<String> dstPathInput, final IoTaskBuilder<I, O, R> ioTaskBuilder,
final long countLimit, final int maxItemQueueSize, final boolean isCircular
) throws UserShootHisFootException {

this.itemInput = itemInput;
this.avgItemSize = avgItemSize;
this.dstPathInput = dstPathInput;
this.ioTaskBuilder = ioTaskBuilder;
this.countLimit = countLimit > 0 ? countLimit : Long.MAX_VALUE;
Expand Down Expand Up @@ -98,6 +103,16 @@ public final long getGeneratedIoTasksCount() {
return generatedIoTaskCount;
}

@Override
public final SizeInBytes getAvgItemSize() {
return avgItemSize;
}

@Override
public final IoType getIoType() {
return ioTaskBuilder.getIoType();
}

private final class GeneratorTask
implements Runnable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ public T build()
dstPathInput = getDstPathInput(ioType);

return (T) new BasicLoadGenerator<>(
itemInput, dstPathInput, ioTaskBuilder, countLimit, maxQueueSize, isCircular
itemInput, avgItemSize, dstPathInput, ioTaskBuilder, countLimit, maxQueueSize,
isCircular
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.emc.mongoose.load.monitor;

import com.emc.mongoose.common.api.SizeInBytes;
import com.emc.mongoose.common.concurrent.DaemonBase;
import com.emc.mongoose.common.concurrent.NamingThreadFactory;
import com.emc.mongoose.common.concurrent.Throttle;
Expand Down Expand Up @@ -77,6 +78,7 @@ public class BasicLoadMonitor<I extends Item, O extends IoTask<I, R>, R extends
private final Int2ObjectMap<IoStats> ioStats = new Int2ObjectOpenHashMap<>();
private final Int2ObjectMap<IoStats> medIoStats = new Int2ObjectOpenHashMap<>();
private volatile Int2ObjectMap<IoStats.Snapshot> lastStats = new Int2ObjectOpenHashMap<>();
private final Int2ObjectMap<SizeInBytes> itemSizeMap = new Int2ObjectOpenHashMap<>();
private final LongAdder counterResults = new LongAdder();
private volatile Output<String> itemInfoOutput;
private final Int2IntMap concurrencyMap;
Expand Down Expand Up @@ -184,6 +186,7 @@ public BasicLoadMonitor(
ioStats.put(
ioTypeCode, new BasicIoStats(IoType.values()[ioTypeCode].name(), metricsPeriodSec)
);
itemSizeMap.put(nextGenerator.getIoType().ordinal(), nextGenerator.getAvgItemSize());
}
this.isAnyCircular = anyCircularFlag;
if(isAnyCircular) {
Expand Down Expand Up @@ -641,7 +644,9 @@ protected final void doClose()
);
LOG.info(
Markers.METRICS_EXT_RESULTS,
new ExtResultsXmlLogMessage(name, lastStats, concurrencyMap, driversCountMap)
new ExtResultsXmlLogMessage(
name, lastStats, itemSizeMap, concurrencyMap, driversCountMap
)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@ public final class ExtResultsXmlLogMessage

private final String jobName;
private final Int2ObjectMap<Snapshot> snapshots;
private final Int2ObjectMap<SizeInBytes> itemSizeMap;
private final Int2IntMap concurrencyMap;
private final Int2IntMap driversCountMap;

public ExtResultsXmlLogMessage(
final String jobName, final Int2ObjectMap<Snapshot> snapshots,
final Int2IntMap concurrencyMap, final Int2IntMap driversCountMap
final Int2ObjectMap<SizeInBytes> itemSizeMap, final Int2IntMap concurrencyMap,
final Int2IntMap driversCountMap
) {
this.jobName = jobName;
this.snapshots = snapshots;
this.itemSizeMap = itemSizeMap;
this.concurrencyMap = concurrencyMap;
this.driversCountMap = driversCountMap;
}
Expand All @@ -46,12 +49,14 @@ public ExtResultsXmlLogMessage(
public final void formatTo(final StringBuilder buffer) {

Snapshot snapshot;
SizeInBytes itemSize;
int concurrency;
int driversCount;

for(final int ioTypeCode : snapshots.keySet()) {

snapshot = snapshots.get(ioTypeCode);
itemSize = itemSizeMap.get(ioTypeCode);
concurrency = concurrencyMap.get(ioTypeCode);
driversCount = driversCountMap.get(ioTypeCode);

Expand All @@ -71,10 +76,7 @@ public final void formatTo(final StringBuilder buffer) {
buffer.append("clients=\"").append(driversCount).append("\" ");
buffer.append("error=\"").append(snapshot.getFailCount()).append("\" ");
buffer.append("runtime=\"").append(((float) elapsedTimeMillis) / 1000).append("\" ");
final long itemCount = snapshot.getSuccCount();
final long byteCount = snapshot.getByteCount();
final String avgSize = SizeInBytes.formatFixedSize(itemCount > 0 ? byteCount / itemCount : 0);
buffer.append("filesize=\"").append(avgSize).append("\" ");
buffer.append("filesize=\"").append(itemSize == null ? 0 : itemSize.toString()).append("\" ");
buffer.append("tps=\"").append(snapshot.getSuccRateMean()).append("\" tps_unit=\"Fileps\" ");
buffer.append("bw=\"").append(snapshot.getByteRateMean() / MIB).append("\" bw_unit=\"MBps\" ");
buffer.append("latency=\"").append(snapshot.getLatencyAvg()).append("\" latency_unit=\"us\" ");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.emc.mongoose.model.load;

import com.emc.mongoose.common.api.SizeInBytes;
import com.emc.mongoose.common.concurrent.Daemon;
import com.emc.mongoose.common.concurrent.Throttle;
import com.emc.mongoose.common.io.Output;
import com.emc.mongoose.model.io.IoType;
import com.emc.mongoose.model.io.task.IoTask;
import static com.emc.mongoose.model.io.task.IoTask.IoResult;
import com.emc.mongoose.model.item.Item;
Expand All @@ -20,4 +22,8 @@ public interface LoadGenerator<I extends Item, O extends IoTask<I, R>, R extends
void setOutput(final Output<O> ioTaskOutput);

long getGeneratedIoTasksCount();

SizeInBytes getAvgItemSize();

IoType getIoType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,23 +109,11 @@ protected StorageDriverBase(
useDataLatencyResult = traceConfig.getDataLatency();
useTransferSizeResult = traceConfig.getTransferSize();

DISPATCH_INBOUND_TASKS.put(this, new IoTasksDispatch(metricsConfig.getPeriod()));
DISPATCH_INBOUND_TASKS.put(this, new IoTasksDispatch());
}

private final class IoTasksDispatch
implements Runnable {

private final long metricsPeriodNanoSec;

private IoTasksDispatch(final long metricsPeriodSec) {
this.metricsPeriodNanoSec = TimeUnit.SECONDS.toNanos(
metricsPeriodSec > 0 ? metricsPeriodSec : Long.MAX_VALUE
);
}

private long prevNanoTimeStamp = -1;
private long nextNanoTimeStamp;

@Override
public final void run() {
int n;
Expand All @@ -147,12 +135,6 @@ public final void run() {
toString()
);
}

nextNanoTimeStamp = System.nanoTime();
if(nextNanoTimeStamp - prevNanoTimeStamp > metricsPeriodNanoSec) {
outputCurrentMetrics();
prevNanoTimeStamp = nextNanoTimeStamp;
}
}
}

Expand Down

0 comments on commit 0894c5f

Please sign in to comment.