Skip to content
This repository has been archived by the owner on Jul 30, 2024. It is now read-only.

Commit

Permalink
下载同步问题处理
Browse files Browse the repository at this point in the history
  • Loading branch information
monkeyWie committed Jan 3, 2018
1 parent 4fb9a1b commit cc17820
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ public void onDone(TaskInfo taskInfo) {
HttpDownServer.LOGGER.error("call onDone:" + e);
}
WsUtil.sendMsg();
//检查是否为损坏文件
}

@Override
Expand Down
42 changes: 33 additions & 9 deletions src/main/java/lee/study/down/hanndle/HttpDownInitializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import lee.study.down.HttpDownServer;
import lee.study.down.dispatch.HttpDownCallback;
import lee.study.down.model.ChunkInfo;
import lee.study.down.model.TaskInfo;
import lee.study.down.util.FileUtil;
import lee.study.down.util.HttpDownUtil;

public class HttpDownInitializer extends ChannelInitializer {
Expand Down Expand Up @@ -50,6 +50,12 @@ protected void initChannel(Channel ch) throws Exception {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
synchronized (chunkInfo) {
Boolean close = ctx.channel().attr(HttpDownUtil.CLOSE_ATTR).get();
if (close!=null&&close==true) {
return;
}
}
if (msg instanceof HttpContent) {
if (fileChannel == null || !fileChannel.isOpen()) {
return;
Expand Down Expand Up @@ -155,14 +161,32 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
}

public static void main(String[] args) throws Exception {
RandomAccessFile r1 = new RandomAccessFile("f:/down/test1.txt", "rw");
RandomAccessFile r2 = new RandomAccessFile("f:/down/test2.txt", "rw");
r1.setLength(1024 * 1024 * 16);
r1.write(new byte[]{1, 3, 6, 3, 1, 6, 5, 9, 6, 5, 7});
// r1.write(new byte[]{2,6,3,3,9,7,4,7,8});
r2.setLength(1024 * 1024 * 16);
// r2.write(new byte[]{1,3,6,3,1,6,5,9,6,5,7});
r2.write(new byte[]{2, 6, 3, 3, 9, 7, 4, 7, 8});
String path = "f:/down/test1.txt";
RandomAccessFile r1 = new RandomAccessFile(path, "rw");
r1.setLength(1024 * 1024 * 1024 * 1L);
r1.close();
new Thread(() -> {
try {
FileChannel fileChannel = new RandomAccessFile(path, "rw").getChannel();
fileChannel.position(1000);
long time = System.currentTimeMillis();
fileChannel.write(ByteBuffer.wrap(new byte[1024]));
System.out.println("use1:" + (System.currentTimeMillis() - time));
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
FileChannel fileChannel = new RandomAccessFile(path, "rw").getChannel();
fileChannel.position(4000);
long time = System.currentTimeMillis();
fileChannel.write(ByteBuffer.wrap(new byte[1024]));
System.out.println("use2:" + (System.currentTimeMillis() - time));
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}

}
16 changes: 10 additions & 6 deletions src/main/java/lee/study/down/util/HttpDownUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.ssl.SslContext;
import java.io.File;
import io.netty.util.AttributeKey;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Map.Entry;
import java.util.UUID;
Expand All @@ -42,6 +41,9 @@

public class HttpDownUtil {

//private static final RecvByteBufAllocator RECV_BYTE_BUF_ALLOCATOR = new AdaptiveRecvByteBufAllocator(64,8192,65536);
public static final AttributeKey<Boolean> CLOSE_ATTR = AttributeKey.newInstance("close");

/**
* 检测请求头是否存在
*/
Expand Down Expand Up @@ -164,7 +166,7 @@ public void channelRead(ChannelHandlerContext ctx0, Object msg0)
content.content().writeBytes(requestInfo.content());
cf.channel().writeAndFlush(content);
}
}else{
} else {
cdl.countDown();
}
});
Expand Down Expand Up @@ -236,8 +238,9 @@ public static void taskDown(HttpDownInfo httpDownInfo)
try {
FileUtil.deleteIfExists(taskInfo.buildTaskFilePath());
try (
RandomAccessFile randomAccessFile = new RandomAccessFile(taskInfo.buildTaskFilePath(),"rw")
){
RandomAccessFile randomAccessFile = new RandomAccessFile(taskInfo.buildTaskFilePath(),
"rw")
) {
randomAccessFile.setLength(taskInfo.getTotalSize());
}
//文件下载开始回调
Expand Down Expand Up @@ -351,6 +354,7 @@ public static void continueDown(TaskInfo taskInfo, ChunkInfo chunkInfo)
public static void safeClose(Channel channel, FileChannel fileChannel) {
try {
if (channel != null && channel.isOpen()) {
channel.attr(CLOSE_ATTR).set(true);
//关闭旧的下载连接
channel.close();
}
Expand All @@ -368,7 +372,7 @@ public static void safeClose(Channel channel, FileChannel fileChannel) {
}

public static boolean setStatusIfNotDone(ChunkInfo chunkInfo, int update) {
if (chunkInfo.getStatus() != 2) {
if (chunkInfo.getStatus() != 2 && chunkInfo.getStatus() != update) {
chunkInfo.setStatus(update);
return true;
}
Expand Down

0 comments on commit cc17820

Please sign in to comment.