diff --git a/src/main/java/lee/study/down/dispatch/DefaultHttpDownCallback.java b/src/main/java/lee/study/down/dispatch/DefaultHttpDownCallback.java index 9a7a2384..ca0601c0 100644 --- a/src/main/java/lee/study/down/dispatch/DefaultHttpDownCallback.java +++ b/src/main/java/lee/study/down/dispatch/DefaultHttpDownCallback.java @@ -47,7 +47,7 @@ public void onPause(TaskInfo taskInfo) { taskInfo.setStatus(4); for (ChunkInfo chunkInfo : taskInfo.getChunkInfoList()) { synchronized (chunkInfo) { - HttpDownUtil.safeClose(chunkInfo.getChannel(), chunkInfo.getFileChannel()); + HttpDownUtil.safeClose(chunkInfo.getChannel(), chunkInfo); if (chunkInfo.getStatus() != 2) { chunkInfo.setStatus(4); } @@ -112,7 +112,7 @@ public void onDone(TaskInfo taskInfo) { public void onDelete(TaskInfo taskInfo) { for (ChunkInfo chunkInfo : taskInfo.getChunkInfoList()) { synchronized (chunkInfo) { - HttpDownUtil.safeClose(chunkInfo.getChannel(), chunkInfo.getFileChannel()); + HttpDownUtil.safeClose(chunkInfo.getChannel(), chunkInfo); } } synchronized (taskInfo) { diff --git a/src/main/java/lee/study/down/hanndle/HttpDownInitializer.java b/src/main/java/lee/study/down/hanndle/HttpDownInitializer.java index 32a7e08b..fcc53cfc 100644 --- a/src/main/java/lee/study/down/hanndle/HttpDownInitializer.java +++ b/src/main/java/lee/study/down/hanndle/HttpDownInitializer.java @@ -1,6 +1,7 @@ package lee.study.down.hanndle; import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -13,11 +14,14 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; import lee.study.down.HttpDownServer; import lee.study.down.dispatch.HttpDownCallback; import lee.study.down.model.ChunkInfo; import lee.study.down.model.TaskInfo; +import lee.study.down.util.FileUtil; import lee.study.down.util.HttpDownUtil; public class HttpDownInitializer extends ChannelInitializer { @@ -27,7 +31,6 @@ public class HttpDownInitializer extends ChannelInitializer { private ChunkInfo chunkInfo; private HttpDownCallback callback; - private FileChannel fileChannel; private long realContentSize; public HttpDownInitializer(boolean isSsl, TaskInfo taskInfo, ChunkInfo chunkInfo, @@ -52,12 +55,12 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception try { synchronized (chunkInfo) { Boolean close = ctx.channel().attr(HttpDownUtil.CLOSE_ATTR).get(); - if (close!=null&&close==true) { + if (close != null && close == true) { return; } } if (msg instanceof HttpContent) { - if (fileChannel == null || !fileChannel.isOpen()) { + if (chunkInfo.getFileChannel() == null || !chunkInfo.getFileChannel().isOpen()) { return; } HttpContent httpContent = (HttpContent) msg; @@ -65,7 +68,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception int readableBytes = byteBuf.readableBytes(); synchronized (chunkInfo) { if (chunkInfo.getStatus() == 1) { - fileChannel.write(byteBuf.nioBuffer()); + chunkInfo.getMappedBuffer().put(byteBuf.nioBuffer()); //文件已下载大小 chunkInfo.setDownSize(chunkInfo.getDownSize() + readableBytes); } else { @@ -78,7 +81,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception callback.onProgress(taskInfo, chunkInfo); //分段下载完成关闭fileChannel if (chunkInfo.getDownSize() == chunkInfo.getTotalSize()) { - HttpDownUtil.safeClose(ctx.channel(), fileChannel); + HttpDownUtil.safeClose(ctx.channel(), chunkInfo); //分段下载完成回调 chunkInfo.setStatus(2); chunkInfo.setLastTime(System.currentTimeMillis()); @@ -123,10 +126,12 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception "下载响应:" + chunkInfo.getIndex() + "\t" + chunkInfo.getDownSize() + "\t" + httpResponse.headers().get( HttpHeaderNames.CONTENT_RANGE) + "\t" + realContentSize); - fileChannel = new RandomAccessFile(taskInfo.buildTaskFilePath(), "rw").getChannel(); - fileChannel.position(chunkInfo.getOriStartPosition() + chunkInfo.getDownSize()); + FileChannel fileChannel = new RandomAccessFile(taskInfo.buildTaskFilePath(), "rw").getChannel(); + MappedByteBuffer mappedBuffer = fileChannel.map(MapMode.READ_WRITE, + chunkInfo.getOriStartPosition() + chunkInfo.getDownSize(), chunkInfo.getTotalSize()); chunkInfo.setStatus(1); chunkInfo.setFileChannel(fileChannel); + chunkInfo.setMappedBuffer(mappedBuffer); callback.onChunkStart(taskInfo, chunkInfo); } } catch (Exception e) { @@ -162,31 +167,15 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E public static void main(String[] args) throws Exception { String path = "f:/down/test1.txt"; - RandomAccessFile r1 = new RandomAccessFile(path, "rw"); - r1.setLength(1024 * 1024 * 1024 * 1L); - r1.close(); - new Thread(() -> { - try { - FileChannel fileChannel = new RandomAccessFile(path, "rw").getChannel(); - fileChannel.position(1000); - long time = System.currentTimeMillis(); - fileChannel.write(ByteBuffer.wrap(new byte[1024])); - System.out.println("use1:" + (System.currentTimeMillis() - time)); - } catch (Exception e) { - e.printStackTrace(); - } - }).start(); - new Thread(() -> { - try { - FileChannel fileChannel = new RandomAccessFile(path, "rw").getChannel(); - fileChannel.position(4000); - long time = System.currentTimeMillis(); - fileChannel.write(ByteBuffer.wrap(new byte[1024])); - System.out.println("use2:" + (System.currentTimeMillis() - time)); - } catch (Exception e) { - e.printStackTrace(); - } - }).start(); + RandomAccessFile randomAccessFile = new RandomAccessFile(path, "rw"); + randomAccessFile.setLength(100); + randomAccessFile.close(); + FileChannel fileChannel = new RandomAccessFile(path, "rw").getChannel(); + MappedByteBuffer mbb = fileChannel.map(MapMode.READ_WRITE, 0, 5); + mbb.put(new byte[]{1, 2, 3, 4, 5}); + FileUtil.unmap(mbb); + fileChannel.close(); + FileUtil.deleteIfExists(path); } } diff --git a/src/main/java/lee/study/down/model/ChunkInfo.java b/src/main/java/lee/study/down/model/ChunkInfo.java index 7fb1278a..970196fb 100644 --- a/src/main/java/lee/study/down/model/ChunkInfo.java +++ b/src/main/java/lee/study/down/model/ChunkInfo.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import io.netty.channel.Channel; import java.io.Serializable; +import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import lombok.AllArgsConstructor; import lombok.Data; @@ -27,6 +28,8 @@ public class ChunkInfo implements Serializable { private transient volatile Channel channel; @JsonIgnore private transient volatile FileChannel fileChannel; + @JsonIgnore + private transient volatile MappedByteBuffer mappedBuffer; public ChunkInfo() { } diff --git a/src/main/java/lee/study/down/util/FileUtil.java b/src/main/java/lee/study/down/util/FileUtil.java index f073b74d..8bf25d99 100644 --- a/src/main/java/lee/study/down/util/FileUtil.java +++ b/src/main/java/lee/study/down/util/FileUtil.java @@ -3,9 +3,12 @@ import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; +import java.lang.reflect.Method; import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.nio.file.Files; import java.util.Stack; +import sun.nio.ch.FileChannelImpl; public class FileUtil { @@ -111,7 +114,7 @@ public static File createFileSmart(String path) throws IOException { if (file.exists()) { file.delete(); file.createNewFile(); - }else{ + } else { createDirSmart(file.getParent()); file.createNewFile(); } @@ -139,8 +142,14 @@ public static File createDirSmart(String path) throws IOException { return file; } + public static void unmap(MappedByteBuffer mappedBuffer) throws Exception { + Method m = FileChannelImpl.class.getDeclaredMethod("unmap", MappedByteBuffer.class); + m.setAccessible(true); + m.invoke(FileChannelImpl.class, mappedBuffer); + } + public static void main(String[] args) throws Exception { - RandomAccessFile raf2 = new RandomAccessFile("F:\\百度云合并下载研究\\testbbb.txt","rw"); + RandomAccessFile raf2 = new RandomAccessFile("F:\\百度云合并下载研究\\testbbb.txt", "rw"); raf2.setLength(10); raf2.getChannel().position(10).read(ByteBuffer.allocate(5)); System.out.println(raf2.getChannel().position()); diff --git a/src/main/java/lee/study/down/util/HttpDownUtil.java b/src/main/java/lee/study/down/util/HttpDownUtil.java index 83ea4f25..e45eb803 100644 --- a/src/main/java/lee/study/down/util/HttpDownUtil.java +++ b/src/main/java/lee/study/down/util/HttpDownUtil.java @@ -24,6 +24,7 @@ import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; import java.net.URLDecoder; +import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.util.Map.Entry; import java.util.UUID; @@ -41,7 +42,7 @@ public class HttpDownUtil { - //private static final RecvByteBufAllocator RECV_BYTE_BUF_ALLOCATOR = new AdaptiveRecvByteBufAllocator(64,8192,65536); +// private static final RecvByteBufAllocator RECV_BYTE_BUF_ALLOCATOR = new AdaptiveRecvByteBufAllocator(64,8192,65536); public static final AttributeKey CLOSE_ATTR = AttributeKey.newInstance("close"); /** @@ -250,7 +251,7 @@ public static void taskDown(HttpDownInfo httpDownInfo) for (int i = 0; i < taskInfo.getChunkInfoList().size(); i++) { ChunkInfo chunkInfo = taskInfo.getChunkInfoList().get(i); //避免分段下载速度比总的下载速度大太多的问题 - chunkInfo.setStatus(1); +// chunkInfo.setStatus(1); chunkInfo.setStartTime(taskInfo.getStartTime()); chunkDown(httpDownInfo, chunkInfo); } @@ -267,6 +268,7 @@ public static void chunkDown(HttpDownInfo httpDownInfo, ChunkInfo chunkInfo) HttpDownServer.LOGGER.debug( "开始下载:" + chunkInfo.getIndex() + "\t" + chunkInfo.getDownSize()); ChannelFuture cf = HttpDownServer.DOWN_BOOT +// .option(ChannelOption.RCVBUF_ALLOCATOR,RECV_BYTE_BUF_ALLOCATOR) .handler( new HttpDownInitializer(requestProto.getSsl(), taskInfo, chunkInfo, HttpDownServer.CALLBACK)) @@ -314,7 +316,7 @@ public static void retryDown(TaskInfo taskInfo, ChunkInfo chunkInfo) public static void retryDown(TaskInfo taskInfo, ChunkInfo chunkInfo, long downSize) throws Exception { synchronized (chunkInfo) { - safeClose(chunkInfo.getChannel(), chunkInfo.getFileChannel()); + safeClose(chunkInfo.getChannel(), chunkInfo); if (setStatusIfNotDone(chunkInfo, 3)) { if (downSize != -1) { chunkInfo.setDownSize(downSize); @@ -340,7 +342,7 @@ public static void retryDown(TaskInfo taskInfo, ChunkInfo chunkInfo, long downSi public static void continueDown(TaskInfo taskInfo, ChunkInfo chunkInfo) throws Exception { synchronized (chunkInfo) { - safeClose(chunkInfo.getChannel(), chunkInfo.getFileChannel()); + safeClose(chunkInfo.getChannel(), chunkInfo); //避免同时两个重新下载 if (setStatusIfNotDone(chunkInfo, 5)) { //计算后续下载字节 @@ -351,7 +353,7 @@ public static void continueDown(TaskInfo taskInfo, ChunkInfo chunkInfo) } } - public static void safeClose(Channel channel, FileChannel fileChannel) { + public static void safeClose(Channel channel, ChunkInfo chunkInfo) { try { if (channel != null && channel.isOpen()) { channel.attr(CLOSE_ATTR).set(true); @@ -362,6 +364,7 @@ public static void safeClose(Channel channel, FileChannel fileChannel) { HttpDownServer.LOGGER.error("safeClose netty channel", e); } try { + FileChannel fileChannel = chunkInfo.getFileChannel(); if (fileChannel != null && fileChannel.isOpen()) { //关闭旧的下载文件连接 fileChannel.close(); @@ -369,6 +372,15 @@ public static void safeClose(Channel channel, FileChannel fileChannel) { } catch (IOException e) { HttpDownServer.LOGGER.error("safeClose file channel", e); } + try { + MappedByteBuffer mappedBuffer = chunkInfo.getMappedBuffer(); + if (mappedBuffer != null) { + //关闭旧的下载文件连接 + FileUtil.unmap(mappedBuffer); + } + } catch (Exception e) { + HttpDownServer.LOGGER.error("safeClose file mappedBuffer", e); + } } public static boolean setStatusIfNotDone(ChunkInfo chunkInfo, int update) {