From 358b61ebe02dfa8406f83bf58633569b3d0ae7eb Mon Sep 17 00:00:00 2001 From: Hugh Sanderson Date: Thu, 10 Oct 2024 23:11:15 +0800 Subject: [PATCH] Add a non-threaded, non-blocking http implementation --- src/nme/net/HttpLoader.hx | 165 ++++----- src/nme/net/HttpNonBlocking.hx | 617 +++++++++++++++++++++++++++++++++ src/nme/net/URLLoader.hx | 1 + 3 files changed, 702 insertions(+), 81 deletions(-) create mode 100644 src/nme/net/HttpNonBlocking.hx diff --git a/src/nme/net/HttpLoader.hx b/src/nme/net/HttpLoader.hx index ddc819cc3..eb0152ac6 100644 --- a/src/nme/net/HttpLoader.hx +++ b/src/nme/net/HttpLoader.hx @@ -7,7 +7,9 @@ import nme.events.ProgressEvent; import nme.events.HTTPStatusEvent; import nme.utils.ByteArray; import haxe.Http; -#if !js + +#if http_threaded + #if haxe4 import sys.thread.Mutex; import sys.thread.Thread; @@ -15,6 +17,12 @@ import sys.thread.Thread; import cpp.vm.Mutex; import cpp.vm.Thread; #end + +typedef HttpApi = haxe.Http; +#else + +typedef HttpApi = nme.net.HttpNonBlocking; + #end private class OutputWatcher extends haxe.io.BytesOutput @@ -44,7 +52,7 @@ private class OutputWatcher extends haxe.io.BytesOutput class HttpLoader { - #if !js + #if http_threaded static var jobs:ArrayVoid>; static var mutex:Mutex; static var workers = 0; @@ -64,10 +72,9 @@ class HttpLoader public var bytesLoaded(default,null):Int; public var bytesTotal(default,null):Int; public var state(default,null):Int; - var http:Http; - #if !js + var http:HttpApi; + var output:OutputWatcher; - #end public function new(inLoader:URLLoader, inRequest:URLRequest) { @@ -79,9 +86,10 @@ class HttpLoader code = 0; closed = false; - http = new Http(inRequest.url); + http = new HttpApi(inRequest.url); http.onError = onError; http.onStatus = onStatus; + http.onBytes = onComplete; for(header in urlRequest.requestHeaders) http.addHeader(header.name, header.value); @@ -93,10 +101,23 @@ class HttpLoader if (isPost) http.setPostBytes(urlRequest.nmeBytes); - #if wasm - run(); - #elseif !js - runAsync(run); + var output = new OutputWatcher(this); + var isPost = urlRequest.method==URLRequestMethod.POST; + + #if http_threaded + runAsync( () -> { + try + { + http.customRequest(isPost, output); + } + catch(e:Dynamic) + { + if (!closed) + onError(""+e); + } + } ); + #else + http.nonblockingRequest(isPost, output); #end } @@ -112,87 +133,65 @@ class HttpLoader closed = true; } - #if !js - public function run() + function onComplete(bytes:haxe.io.Bytes) { - try - { - var output = new OutputWatcher(this); + bytesLoaded = bytesTotal = bytes.length; - var isPost = urlRequest.method==URLRequestMethod.POST; - http.customRequest(isPost, output); - - if (state!=URLLoader.urlError) + var encoding = http.responseHeaders.get("Content-Encoding"); + if (encoding=="gzip") + { + var decoded = false; + try { - var bytes = output.getBytes(); - - bytesLoaded = bytesTotal = bytes.length; - - var encoding = http.responseHeaders.get("Content-Encoding"); - if (encoding=="gzip") + if (bytes.length>10 && bytes.get(0)==0x1f && bytes.get(1)==0x8b) { - var decoded = false; - try - { - if (bytes.length>10 && bytes.get(0)==0x1f && bytes.get(1)==0x8b) - { - var u = new haxe.zip.Uncompress(15|32); - var tmp = haxe.io.Bytes.alloc(1<<16); - u.setFlushMode(haxe.zip.FlushMode.SYNC); - var b = new haxe.io.BytesBuffer(); - var pos = 0; - while (true) { - var r = u.execute(bytes, pos, tmp, 0); - b.addBytes(tmp, 0, r.write); - pos += r.read; - if (r.done) - break; - } - u.close(); - bytes = b.getBytes(); - decoded = bytes!=null; - } + var u = new haxe.zip.Uncompress(15|32); + var tmp = haxe.io.Bytes.alloc(1<<16); + u.setFlushMode(haxe.zip.FlushMode.SYNC); + var b = new haxe.io.BytesBuffer(); + var pos = 0; + while (true) { + var r = u.execute(bytes, pos, tmp, 0); + b.addBytes(tmp, 0, r.write); + pos += r.read; + if (r.done) + break; } - catch(e:Dynamic) - { - trace(e); - } - - if (!decoded) - onError("Bad GZip data"); - } - - if (urlLoader.dataFormat== URLLoaderDataFormat.BINARY) - { - byteData = ByteArray.fromBytes(bytes); + u.close(); + bytes = b.getBytes(); + decoded = bytes!=null; } - else - { - #if neko - stringData = neko.Lib.stringReference(bytes); - #else - #if haxe4 - stringData = bytes.getString(0, bytes.length, UTF8); - #else - stringData = bytes.getString(0, bytes.length); - #end - #end - } - - state = URLLoader.urlComplete; } - else + catch(e:Dynamic) { - //trace(" -> error"); + trace(e); } + + if (!decoded) + onError("Bad GZip data"); } - catch(e:Dynamic) + + if (urlLoader.dataFormat== URLLoaderDataFormat.BINARY) { - if (!closed) - onError(""+e); + byteData = ByteArray.fromBytes(bytes); } + else + { + #if neko + stringData = neko.Lib.stringReference(bytes); + #else + #if haxe4 + stringData = bytes.getString(0, bytes.length, UTF8); + #else + stringData = bytes.getString(0, bytes.length); + #end + #end + } + + state = URLLoader.urlComplete; } - #end + + public function onBytesLoaded(count:Int) { @@ -220,7 +219,7 @@ class HttpLoader code = inStatus; } - #if !js + #if http_threaded public static function runAsync(job:Void->Void) { if (jobs==null) @@ -258,11 +257,15 @@ class HttpLoader } #end - - public static function pollAll() + inline public function update() { + #if !http_threaded + if (http!=null) + http.update(); + #end } + public function getErrorMessage() return errorMessage; public function getData(): ByteArray return byteData; public function getString(): String return stringData; diff --git a/src/nme/net/HttpNonBlocking.hx b/src/nme/net/HttpNonBlocking.hx new file mode 100644 index 000000000..e5604f97f --- /dev/null +++ b/src/nme/net/HttpNonBlocking.hx @@ -0,0 +1,617 @@ +/* + * Copyright (C)2005-2019 Haxe Foundation + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +package nme.net; + +import haxe.io.BytesOutput; +import haxe.io.BytesBuffer; +import haxe.io.Bytes; +import haxe.io.Input; +import sys.net.Host; +import sys.net.Socket; +import sys.Http; + +import haxe.io.Bytes; +import haxe.io.Output; +import haxe.io.Error; + +typedef IntRef = Array; + +private enum AsyncJob +{ + WriteBytes(b:Bytes, len:Int, doneRef:IntRef); + ReadHeaders(n:BytesBuffer, k:IntRef, s:Bytes, p:IntRef); + ParseHeaders(b:BytesBuffer, api:BytesOutput); + ReadChunk(api:BytesOutput, chunk_re:EReg, buf:Bytes, bufRead:IntRef); + ReadEof(api:BytesOutput, buf:Bytes); + ReadBytes(api:BytesOutput, buf:Bytes, remain:IntRef); +} + +class HttpNonBlocking extends sys.Http { + var sock:sys.net.Socket; + var asyncJobs:Array; + + public var onComplete:Void->Void; + + public function new(url:String) { + super(url); + } + + public function nonblockingRequest(post:Bool, api:BytesOutput, ?inSock:sys.net.Socket, ?method:String) { + this.responseAsString = null; + this.responseBytes = null; + sock = inSock; + var url_regexp = ~/^(https?:\/\/)?([a-zA-Z\.0-9_-]+)(:[0-9]+)?(.*)$/; + if (!url_regexp.match(url)) { + asyncError("Invalid URL"); + return; + } + var secure = (url_regexp.matched(1) == "https://"); + if (sock == null) { + if (secure) { + #if php + sock = new php.net.SslSocket(); + #elseif java + sock = new java.net.SslSocket(); + #elseif python + sock = new python.net.SslSocket(); + #elseif (!no_ssl && (hxssl || hl || cpp || (neko && !(macro || interp) || eval) || (lua && !lua_vanilla))) + sock = new sys.ssl.Socket(); + #elseif (neko || cpp) + throw "Https is only supported with -lib hxssl"; + #else + throw new haxe.exceptions.NotImplementedException("Https support in haxe.Http is not implemented for this target"); + #end + } else { + sock = new Socket(); + } + sock.setTimeout(cnxTimeout); + } + var host = url_regexp.matched(2); + var portString = url_regexp.matched(3); + var request = url_regexp.matched(4); + // ensure path begins with a forward slash + // this is required by original URL specifications and many servers have issues if it's not supplied + // see https://stackoverflow.com/questions/1617058/ok-to-skip-slash-before-query-string + if (request.charAt(0) != "/") { + request = "/" + request; + } + var port = if (portString == null || portString == "") secure ? 443 : 80 else Std.parseInt(portString.substr(1, portString.length - 1)); + + var multipart = (file != null); + var boundary = null; + var uri = null; + if (multipart) { + post = true; + boundary = Std.string(Std.random(1000)) + + Std.string(Std.random(1000)) + + Std.string(Std.random(1000)) + + Std.string(Std.random(1000)); + while (boundary.length < 38) + boundary = "-" + boundary; + var b = new StringBuf(); + for (p in params) { + b.add("--"); + b.add(boundary); + b.add("\r\n"); + b.add('Content-Disposition: form-data; name="'); + b.add(p.name); + b.add('"'); + b.add("\r\n"); + b.add("\r\n"); + b.add(p.value); + b.add("\r\n"); + } + b.add("--"); + b.add(boundary); + b.add("\r\n"); + b.add('Content-Disposition: form-data; name="'); + b.add(file.param); + b.add('"; filename="'); + b.add(file.filename); + b.add('"'); + b.add("\r\n"); + b.add("Content-Type: " + file.mimeType + "\r\n" + "\r\n"); + uri = b.toString(); + } else { + for (p in params) { + if (uri == null) + uri = ""; + else + uri += "&"; + uri += StringTools.urlEncode(p.name) + "=" + StringTools.urlEncode('${p.value}'); + } + } + + var b = new BytesOutput(); + if (method != null) { + b.writeString(method); + b.writeString(" "); + } else if (post) + b.writeString("POST "); + else + b.writeString("GET "); + + if (Http.PROXY != null) { + b.writeString("http://"); + b.writeString(host); + if (port != 80) { + b.writeString(":"); + b.writeString('$port'); + } + } + b.writeString(request); + + if (!post && uri != null) { + if (request.indexOf("?", 0) >= 0) + b.writeString("&"); + else + b.writeString("?"); + b.writeString(uri); + } + b.writeString(" HTTP/1.1\r\nHost: " + host + "\r\n"); + if (postData != null) { + postBytes = Bytes.ofString(postData); + postData = null; + } + if (postBytes != null) + b.writeString("Content-Length: " + postBytes.length + "\r\n"); + else if (post && uri != null) { + if (multipart || !Lambda.exists(headers, function(h) return h.name == "Content-Type")) { + b.writeString("Content-Type: "); + if (multipart) { + b.writeString("multipart/form-data"); + b.writeString("; boundary="); + b.writeString(boundary); + } else + b.writeString("application/x-www-form-urlencoded"); + b.writeString("\r\n"); + } + if (multipart) + b.writeString("Content-Length: " + (uri.length + file.size + boundary.length + 6) + "\r\n"); + else + b.writeString("Content-Length: " + uri.length + "\r\n"); + } + if( !Lambda.exists(headers, function(h) return h.name == "Connection") ) + b.writeString("Connection: close\r\n"); + for (h in headers) { + b.writeString(h.name); + b.writeString(": "); + b.writeString(h.value); + b.writeString("\r\n"); + } + b.writeString("\r\n"); + if (postBytes != null) + b.writeFullBytes(postBytes, 0, postBytes.length); + else if (post && uri != null) + b.writeString(uri); + + + try + { + if (Http.PROXY != null) + sock.connect(new Host(Http.PROXY.host), Http.PROXY.port); + else + sock.connect(new Host(host), port); + + var jobs = new Array(); + + if (multipart) + pushBody(b, file.io, file.size, boundary, jobs) + else + pushBody(b, null, 0, null, jobs); + + var headerBytes = new haxe.io.BytesBuffer(); + jobs.push( ReadHeaders( headerBytes, [4], haxe.io.Bytes.alloc(4), [0]) ); + jobs.push( ParseHeaders(headerBytes, api) ); + //jobs.push( SockClose ); + + sock.setBlocking(false); + asyncJobs = jobs; + + update(); + } + catch (e:Dynamic) + { + asyncError(Std.string(e)); + } + } + + function closeSocket() + { + asyncJobs = null; + if (sock!=null) + { + try { + sock.close(); + } catch(e:Dynamic) { } + sock = null; + } + } + + public function asyncError(e:String) + { + closeSocket(); + onError(e); + } + + + public function asyncComplete(output:BytesOutput) + { + closeSocket(); + success(output.getBytes()); + } + + public function isPending() return asyncJobs!=null && asyncJobs.length>0; + + public function update() + { + while(asyncJobs!=null && asyncJobs[0]!=null) + { + var job = asyncJobs[0]; + switch(job) + { + + case WriteBytes(bytes, len, done): + try + { + while( done[0]>(); + } + var array = responseHeadersSameKey.get(hname); + if (array == null) { + array = new Array(); + array.push(previousValue); + responseHeadersSameKey.set(hname, array); + } + array.push(hval); + } + } + responseHeaders.set(hname, hval); + switch (hname.toLowerCase()) { + case "content-length": + size = Std.parseInt(hval); + case "transfer-encoding": + chunked = (hval.toLowerCase() == "chunked"); + } + } + onStatus(status); + asyncJobs.shift(); + + if (status < 200 || status >= 400) + { + asyncError("Http Error #" + status); + return; + } + + if (chunked) + { + //if ((chunk_size != null || chunk_buf != null)) + // throw "Invalid chunk"; + var chunk_re = ~/^([0-9A-Fa-f]+)[ ]*\r\n/m; + var buf = haxe.io.Bytes.alloc(1024); + chunk_size = null; + chunk_buf = null; + asyncJobs.push( ReadChunk(api, chunk_re, buf, [0]) ); + } + else if (size==null) + { + if (!noShutdown) + sock.shutdown(false, true); + var buf = haxe.io.Bytes.alloc(1024); + asyncJobs.push( ReadEof(api,buf) ); + } + else + { + api.prepare(size); + var s:Int = size; + var buf = haxe.io.Bytes.alloc(s>4096 ? 4096 : s); + asyncJobs.push( ReadBytes(api,buf,[s]) ); + } + + + case ReadChunk(api, chunk_re, buf, bufRead): + try + { + while (true) + { + var bufsize = buf.length; + var remain = bufsize - bufRead[0]; + var len = sock.input.readBytes(buf, bufRead[0], remain); + if (len<1) + return; + bufRead[0] += len; + if (bufRead[0]==bufsize) + { + if (!readChunk(chunk_re, api, buf, bufRead[0])) + break; + bufRead[0] = 0; + } + } + } + catch (e:haxe.io.Error) + { + if (e!=Blocked) + asyncError("Error reading chunk " + e); + return; + } + catch (e:haxe.io.Eof) + { + var chunksDone = false; + if (bufRead[0]>0) + chunksDone = !readChunk(chunk_re, api, buf, bufRead[0]); + + if (!chunksDone) + asyncError( "Transfer aborted" ); + } + if (chunk_size != null || chunk_buf != null) + { + asyncError("Invalid chunk"); + } + else + { + asyncComplete(api); + } + + + case ReadEof(api, buf): + try + { + while (true) + { + var len = sock.input.readBytes(buf, 0, buf.length); + if (len == 0) + return; + api.writeBytes(buf, 0, len); + } + } + catch (e:haxe.io.Eof) + { + asyncComplete(api); + } + catch (e:haxe.io.Error) + { + if (e!=Blocked) + asyncError("Error reading data " + e); + return; + } + catch (e:Dynamic) + { + asyncError("Error reading data " + e); + } + + case ReadBytes(api, buf, remain): + try + { + while (true) + { + var l = remain[0]; + if (l>buf.length) + l = buf.length; + var read = sock.input.readBytes(buf, 0, l); + if (read == 0) + return; + api.writeBytes(buf, 0, read); + remain[0] -= read; + if (remain[0]==0) + { + asyncComplete(api); + return; + } + } + } + catch (e:haxe.io.Eof) + { + asyncError("EOF while reading bytes"); + return; + } + catch (e:haxe.io.Error) + { + if (e!=Blocked) + asyncError("Error reading data " + e); + return; + } + catch (e:Dynamic) + { + asyncError("Error reading bytes " + e); + return; + } + } + } + } + + + + function pushBody(body:Null, fileInput:Null, fileSize:Int, boundary:Null, jobs: Array) { + if (body != null) { + var bytes = body.getBytes(); + jobs.push( WriteBytes(bytes,bytes.length,[0]) ); + //sock.output.writeFullBytes(bytes, 0, bytes.length); + } + if (boundary != null) { + var bufsize = 4096; + var buf = haxe.io.Bytes.alloc(bufsize); + while (fileSize > 0) { + var size = if (fileSize > bufsize) bufsize else fileSize; + var len = 0; + try { + len = fileInput.readBytes(buf, 0, size); + } catch (e:haxe.io.Eof) + break; + jobs.push( WriteBytes(buf,len,[0]) ); + //sock.output.writeFullBytes(buf, 0, len); + fileSize -= len; + } + var str = "\r\n" + "--" + boundary + "--"; + var bytes = Bytes.ofString(str); + jobs.push( WriteBytes(bytes,bytes.length,[0]) ); + + //sock.output.writeString("\r\n"); + //sock.output.writeString("--"); + //sock.output.writeString(boundary); + //sock.output.writeString("--"); + } + } + +} + diff --git a/src/nme/net/URLLoader.hx b/src/nme/net/URLLoader.hx index 282d89015..d84ce7bf9 100644 --- a/src/nme/net/URLLoader.hx +++ b/src/nme/net/URLLoader.hx @@ -347,6 +347,7 @@ class URLLoader extends EventDispatcher #if !no_haxe_http else { + httpLoader.update(); bytesLoaded = httpLoader.bytesLoaded; bytesTotal = httpLoader.bytesTotal; state = httpLoader.state;