From eb1d2a9dfcec489eac17f4f34190df45b7873d66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Drvo=C5=A1t=C4=9Bp?= Date: Mon, 25 Mar 2024 17:46:33 +0100 Subject: [PATCH] Introduce `connections_lock` to protect `Server.connections` (#1161) * Introduce `connections_lock` for mutating `Server.connections` * Version bump --- Project.toml | 2 +- src/Servers.jl | 37 +++++++++++++++++++++++++------------ 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/Project.toml b/Project.toml index de51fe64..f2c9cbed 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "HTTP" uuid = "cd3eb016-35fb-5094-929b-558a96fad6f3" authors = ["Jacob Quinn", "contributors: https://github.com/JuliaWeb/HTTP.jl/graphs/contributors"] -version = "1.10.4" +version = "1.10.5" [deps] Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" diff --git a/src/Servers.jl b/src/Servers.jl index 30d7c133..89f2939f 100644 --- a/src/Servers.jl +++ b/src/Servers.jl @@ -118,6 +118,9 @@ struct Server{L <: Listener} connections::Set{Connection} # server listenandserve loop task task::Task + # Protects the connections Set which is mutated in the listenloop + # while potentially being accessed by the close method at the same time + connections_lock::ReentrantLock end port(s::Server) = Int(s.listener.addr.port) @@ -127,8 +130,10 @@ Base.wait(s::Server) = wait(s.task) function forceclose(s::Server) shutdown(s.on_shutdown) close(s.listener) - for c in s.connections - close(c) + Base.@lock s.connections_lock begin + for c in s.connections + close(c) + end end return wait(s.task) end @@ -166,14 +171,19 @@ function Base.close(s::Server) shutdown(s.on_shutdown) close(s.listener) # first pass to mark or request connections to close - for c in s.connections - requestclose!(c) + Base.@lock s.connections_lock begin + for c in s.connections + requestclose!(c) + end end # second pass to wait for connections to close # we wait for connections to empty because as # connections close themselves, they are removed # from our connections Set - while !isempty(s.connections) + while true + Base.@lock s.connections_lock begin + isempty(s.connections) && break + end sleep(0.5 + rand() * 0.1) end return wait(s.task) @@ -346,25 +356,28 @@ function listen!(f, listener::Listener; access_log::Union{Function,Nothing}=nothing, verbose=false, kw...) conns = Set{Connection}() + conns_lock = ReentrantLock() ready_to_accept = Threads.Event() if verbose > 0 tsk = @_spawn_interactive LoggingExtras.withlevel(Logging.Debug; verbosity=verbose) do - listenloop(f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, verbose) + listenloop(f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, conns_lock, verbose) end else - tsk = @_spawn_interactive listenloop(f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, verbose) + tsk = @_spawn_interactive listenloop(f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, conns_lock, verbose) end # wait until the listenloop enters the loop wait(ready_to_accept) - return Server(listener, on_shutdown, conns, tsk) + return Server(listener, on_shutdown, conns, tsk, conns_lock) end """" Main server loop. Accepts new tcp connections and spawns async tasks to handle them." """ -function listenloop(f, listener, conns, tcpisvalid, - max_connections, readtimeout, access_log, ready_to_accept, verbose) +function listenloop( + f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, + conns_lock, verbose +) sem = Base.Semaphore(max_connections) verbose >= 0 && @infov 1 "Listening on: $(listener.hostname):$(listener.hostport), thread id: $(Threads.threadid())" notify(ready_to_accept) @@ -382,13 +395,13 @@ function listenloop(f, listener, conns, tcpisvalid, end conn = Connection(io) conn.state = IDLE - push!(conns, conn) + Base.@lock conns_lock push!(conns, conn) conn.host, conn.port = listener.hostname, listener.hostport @async try handle_connection(f, conn, listener, readtimeout, access_log) finally # handle_connection is in charge of closing the underlying io - delete!(conns, conn) + Base.@lock conns_lock delete!(conns, conn) Base.release(sem) end catch e