Skip to content
This repository has been archived by the owner on Jul 30, 2024. It is now read-only.

Commit

Permalink
用NIO Mapper的方式写入文件,大幅度提升效率
Browse files Browse the repository at this point in the history
  • Loading branch information
monkeyWie committed Jan 3, 2018
1 parent cc17820 commit 89e9bd5
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void onPause(TaskInfo taskInfo) {
taskInfo.setStatus(4);
for (ChunkInfo chunkInfo : taskInfo.getChunkInfoList()) {
synchronized (chunkInfo) {
HttpDownUtil.safeClose(chunkInfo.getChannel(), chunkInfo.getFileChannel());
HttpDownUtil.safeClose(chunkInfo.getChannel(), chunkInfo);
if (chunkInfo.getStatus() != 2) {
chunkInfo.setStatus(4);
}
Expand Down Expand Up @@ -112,7 +112,7 @@ public void onDone(TaskInfo taskInfo) {
public void onDelete(TaskInfo taskInfo) {
for (ChunkInfo chunkInfo : taskInfo.getChunkInfoList()) {
synchronized (chunkInfo) {
HttpDownUtil.safeClose(chunkInfo.getChannel(), chunkInfo.getFileChannel());
HttpDownUtil.safeClose(chunkInfo.getChannel(), chunkInfo);
}
}
synchronized (taskInfo) {
Expand Down
53 changes: 21 additions & 32 deletions src/main/java/lee/study/down/hanndle/HttpDownInitializer.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lee.study.down.hanndle;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
Expand All @@ -13,11 +14,14 @@
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import lee.study.down.HttpDownServer;
import lee.study.down.dispatch.HttpDownCallback;
import lee.study.down.model.ChunkInfo;
import lee.study.down.model.TaskInfo;
import lee.study.down.util.FileUtil;
import lee.study.down.util.HttpDownUtil;

public class HttpDownInitializer extends ChannelInitializer {
Expand All @@ -27,7 +31,6 @@ public class HttpDownInitializer extends ChannelInitializer {
private ChunkInfo chunkInfo;
private HttpDownCallback callback;

private FileChannel fileChannel;
private long realContentSize;

public HttpDownInitializer(boolean isSsl, TaskInfo taskInfo, ChunkInfo chunkInfo,
Expand All @@ -52,20 +55,20 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
try {
synchronized (chunkInfo) {
Boolean close = ctx.channel().attr(HttpDownUtil.CLOSE_ATTR).get();
if (close!=null&&close==true) {
if (close != null && close == true) {
return;
}
}
if (msg instanceof HttpContent) {
if (fileChannel == null || !fileChannel.isOpen()) {
if (chunkInfo.getFileChannel() == null || !chunkInfo.getFileChannel().isOpen()) {
return;
}
HttpContent httpContent = (HttpContent) msg;
ByteBuf byteBuf = httpContent.content();
int readableBytes = byteBuf.readableBytes();
synchronized (chunkInfo) {
if (chunkInfo.getStatus() == 1) {
fileChannel.write(byteBuf.nioBuffer());
chunkInfo.getMappedBuffer().put(byteBuf.nioBuffer());
//文件已下载大小
chunkInfo.setDownSize(chunkInfo.getDownSize() + readableBytes);
} else {
Expand All @@ -78,7 +81,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
callback.onProgress(taskInfo, chunkInfo);
//分段下载完成关闭fileChannel
if (chunkInfo.getDownSize() == chunkInfo.getTotalSize()) {
HttpDownUtil.safeClose(ctx.channel(), fileChannel);
HttpDownUtil.safeClose(ctx.channel(), chunkInfo);
//分段下载完成回调
chunkInfo.setStatus(2);
chunkInfo.setLastTime(System.currentTimeMillis());
Expand Down Expand Up @@ -123,10 +126,12 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
"下载响应:" + chunkInfo.getIndex() + "\t" + chunkInfo.getDownSize() + "\t"
+ httpResponse.headers().get(
HttpHeaderNames.CONTENT_RANGE) + "\t" + realContentSize);
fileChannel = new RandomAccessFile(taskInfo.buildTaskFilePath(), "rw").getChannel();
fileChannel.position(chunkInfo.getOriStartPosition() + chunkInfo.getDownSize());
FileChannel fileChannel = new RandomAccessFile(taskInfo.buildTaskFilePath(), "rw").getChannel();
MappedByteBuffer mappedBuffer = fileChannel.map(MapMode.READ_WRITE,
chunkInfo.getOriStartPosition() + chunkInfo.getDownSize(), chunkInfo.getTotalSize());
chunkInfo.setStatus(1);
chunkInfo.setFileChannel(fileChannel);
chunkInfo.setMappedBuffer(mappedBuffer);
callback.onChunkStart(taskInfo, chunkInfo);
}
} catch (Exception e) {
Expand Down Expand Up @@ -162,31 +167,15 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E

public static void main(String[] args) throws Exception {
String path = "f:/down/test1.txt";
RandomAccessFile r1 = new RandomAccessFile(path, "rw");
r1.setLength(1024 * 1024 * 1024 * 1L);
r1.close();
new Thread(() -> {
try {
FileChannel fileChannel = new RandomAccessFile(path, "rw").getChannel();
fileChannel.position(1000);
long time = System.currentTimeMillis();
fileChannel.write(ByteBuffer.wrap(new byte[1024]));
System.out.println("use1:" + (System.currentTimeMillis() - time));
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
FileChannel fileChannel = new RandomAccessFile(path, "rw").getChannel();
fileChannel.position(4000);
long time = System.currentTimeMillis();
fileChannel.write(ByteBuffer.wrap(new byte[1024]));
System.out.println("use2:" + (System.currentTimeMillis() - time));
} catch (Exception e) {
e.printStackTrace();
}
}).start();
RandomAccessFile randomAccessFile = new RandomAccessFile(path, "rw");
randomAccessFile.setLength(100);
randomAccessFile.close();
FileChannel fileChannel = new RandomAccessFile(path, "rw").getChannel();
MappedByteBuffer mbb = fileChannel.map(MapMode.READ_WRITE, 0, 5);
mbb.put(new byte[]{1, 2, 3, 4, 5});
FileUtil.unmap(mbb);
fileChannel.close();
FileUtil.deleteIfExists(path);
}

}
3 changes: 3 additions & 0 deletions src/main/java/lee/study/down/model/ChunkInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.netty.channel.Channel;
import java.io.Serializable;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import lombok.AllArgsConstructor;
import lombok.Data;
Expand All @@ -27,6 +28,8 @@ public class ChunkInfo implements Serializable {
private transient volatile Channel channel;
@JsonIgnore
private transient volatile FileChannel fileChannel;
@JsonIgnore
private transient volatile MappedByteBuffer mappedBuffer;

public ChunkInfo() {
}
Expand Down
13 changes: 11 additions & 2 deletions src/main/java/lee/study/down/util/FileUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.file.Files;
import java.util.Stack;
import sun.nio.ch.FileChannelImpl;

public class FileUtil {

Expand Down Expand Up @@ -111,7 +114,7 @@ public static File createFileSmart(String path) throws IOException {
if (file.exists()) {
file.delete();
file.createNewFile();
}else{
} else {
createDirSmart(file.getParent());
file.createNewFile();
}
Expand Down Expand Up @@ -139,8 +142,14 @@ public static File createDirSmart(String path) throws IOException {
return file;
}

public static void unmap(MappedByteBuffer mappedBuffer) throws Exception {
Method m = FileChannelImpl.class.getDeclaredMethod("unmap", MappedByteBuffer.class);
m.setAccessible(true);
m.invoke(FileChannelImpl.class, mappedBuffer);
}

public static void main(String[] args) throws Exception {
RandomAccessFile raf2 = new RandomAccessFile("F:\\百度云合并下载研究\\testbbb.txt","rw");
RandomAccessFile raf2 = new RandomAccessFile("F:\\百度云合并下载研究\\testbbb.txt", "rw");
raf2.setLength(10);
raf2.getChannel().position(10).read(ByteBuffer.allocate(5));
System.out.println(raf2.getChannel().position());
Expand Down
22 changes: 17 additions & 5 deletions src/main/java/lee/study/down/util/HttpDownUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Map.Entry;
import java.util.UUID;
Expand All @@ -41,7 +42,7 @@

public class HttpDownUtil {

//private static final RecvByteBufAllocator RECV_BYTE_BUF_ALLOCATOR = new AdaptiveRecvByteBufAllocator(64,8192,65536);
// private static final RecvByteBufAllocator RECV_BYTE_BUF_ALLOCATOR = new AdaptiveRecvByteBufAllocator(64,8192,65536);
public static final AttributeKey<Boolean> CLOSE_ATTR = AttributeKey.newInstance("close");

/**
Expand Down Expand Up @@ -250,7 +251,7 @@ public static void taskDown(HttpDownInfo httpDownInfo)
for (int i = 0; i < taskInfo.getChunkInfoList().size(); i++) {
ChunkInfo chunkInfo = taskInfo.getChunkInfoList().get(i);
//避免分段下载速度比总的下载速度大太多的问题
chunkInfo.setStatus(1);
// chunkInfo.setStatus(1);
chunkInfo.setStartTime(taskInfo.getStartTime());
chunkDown(httpDownInfo, chunkInfo);
}
Expand All @@ -267,6 +268,7 @@ public static void chunkDown(HttpDownInfo httpDownInfo, ChunkInfo chunkInfo)
HttpDownServer.LOGGER.debug(
"开始下载:" + chunkInfo.getIndex() + "\t" + chunkInfo.getDownSize());
ChannelFuture cf = HttpDownServer.DOWN_BOOT
// .option(ChannelOption.RCVBUF_ALLOCATOR,RECV_BYTE_BUF_ALLOCATOR)
.handler(
new HttpDownInitializer(requestProto.getSsl(), taskInfo, chunkInfo,
HttpDownServer.CALLBACK))
Expand Down Expand Up @@ -314,7 +316,7 @@ public static void retryDown(TaskInfo taskInfo, ChunkInfo chunkInfo)
public static void retryDown(TaskInfo taskInfo, ChunkInfo chunkInfo, long downSize)
throws Exception {
synchronized (chunkInfo) {
safeClose(chunkInfo.getChannel(), chunkInfo.getFileChannel());
safeClose(chunkInfo.getChannel(), chunkInfo);
if (setStatusIfNotDone(chunkInfo, 3)) {
if (downSize != -1) {
chunkInfo.setDownSize(downSize);
Expand All @@ -340,7 +342,7 @@ public static void retryDown(TaskInfo taskInfo, ChunkInfo chunkInfo, long downSi
public static void continueDown(TaskInfo taskInfo, ChunkInfo chunkInfo)
throws Exception {
synchronized (chunkInfo) {
safeClose(chunkInfo.getChannel(), chunkInfo.getFileChannel());
safeClose(chunkInfo.getChannel(), chunkInfo);
//避免同时两个重新下载
if (setStatusIfNotDone(chunkInfo, 5)) {
//计算后续下载字节
Expand All @@ -351,7 +353,7 @@ public static void continueDown(TaskInfo taskInfo, ChunkInfo chunkInfo)
}
}

public static void safeClose(Channel channel, FileChannel fileChannel) {
public static void safeClose(Channel channel, ChunkInfo chunkInfo) {
try {
if (channel != null && channel.isOpen()) {
channel.attr(CLOSE_ATTR).set(true);
Expand All @@ -362,13 +364,23 @@ public static void safeClose(Channel channel, FileChannel fileChannel) {
HttpDownServer.LOGGER.error("safeClose netty channel", e);
}
try {
FileChannel fileChannel = chunkInfo.getFileChannel();
if (fileChannel != null && fileChannel.isOpen()) {
//关闭旧的下载文件连接
fileChannel.close();
}
} catch (IOException e) {
HttpDownServer.LOGGER.error("safeClose file channel", e);
}
try {
MappedByteBuffer mappedBuffer = chunkInfo.getMappedBuffer();
if (mappedBuffer != null) {
//关闭旧的下载文件连接
FileUtil.unmap(mappedBuffer);
}
} catch (Exception e) {
HttpDownServer.LOGGER.error("safeClose file mappedBuffer", e);
}
}

public static boolean setStatusIfNotDone(ChunkInfo chunkInfo, int update) {
Expand Down

0 comments on commit 89e9bd5

Please sign in to comment.