Skip to content

Commit

Permalink
ASYNC-254: direct return of blocking ops, avoiding dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
ghadishayban authored and puredanger committed Dec 17, 2024
1 parent 57b4fca commit 70183b1
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 29 deletions.
19 changes: 9 additions & 10 deletions src/main/clojure/clojure/core/async.clj
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ to catch and handle."
(lock-id [_] 0)
(commit [_] f))))

(defn- on-caller [f]
(with-meta f {:on-caller? true}))

(defn buffer
"Returns a fixed buffer of size n. When full, puts will block/park."
[n]
Expand Down Expand Up @@ -133,7 +136,7 @@ to catch and handle."
namespace docs)."
[port]
(let [p (promise)
ret (impl/take! port (fn-handler (fn [v] (deliver p v))))]
ret (impl/take! port (fn-handler (on-caller #(deliver p %))))]
(if ret
@ret
(deref p))))
Expand All @@ -156,7 +159,7 @@ to catch and handle."
Returns nil."
([port fn1] (take! port fn1 true))
([port fn1 on-caller?]
(let [ret (impl/take! port (fn-handler fn1))]
(let [ret (impl/take! port (fn-handler (if on-caller? (on-caller fn1) fn1)))]
(when ret
(let [val @ret]
(if on-caller?
Expand All @@ -172,7 +175,7 @@ to catch and handle."
namespace docs)."
[port val]
(let [p (promise)
ret (impl/put! port val (fn-handler (fn [open?] (deliver p open?))))]
ret (impl/put! port val (fn-handler (on-caller #(deliver p %))))]
(if ret
@ret
(deref p))))
Expand Down Expand Up @@ -204,7 +207,7 @@ to catch and handle."
true))
([port val fn1] (put! port val fn1 true))
([port val fn1 on-caller?]
(if-let [retb (impl/put! port val (fn-handler fn1))]
(if-let [retb (impl/put! port val (fn-handler (if on-caller? (on-caller fn1) fn1)))]
(let [ret @retb]
(if on-caller?
(fn1 ret)
Expand Down Expand Up @@ -309,7 +312,7 @@ to catch and handle."
namespace docs)."
[ports & opts]
(let [p (promise)
ret (do-alts (partial deliver p) ports (apply hash-map opts))]
ret (do-alts (on-caller #(deliver p %)) ports (apply hash-map opts))]
(if ret
@ret
(deref p))))
Expand Down Expand Up @@ -512,11 +515,7 @@ to catch and handle."
(defn- pipeline*
([n to xf from close? ex-handler type]
(assert (pos? n))
(let [ex-handler (or ex-handler (fn [ex]
(-> (Thread/currentThread)
.getUncaughtExceptionHandler
(.uncaughtException (Thread/currentThread) ex))
nil))
(let [ex-handler (or ex-handler dispatch/ex-handler)
jobs (chan n)
results (chan n)
process (fn [[v p :as job]]
Expand Down
25 changes: 12 additions & 13 deletions src/main/clojure/clojure/core/async/impl/channels.clj
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
(cleanup [_])
(abort [_]))

(defn appm
"returns closure that applies f to arg and has the metadata of f"
[f arg]
(with-meta #(f arg) (meta f)))

(deftype ManyToManyChannel [^LinkedList takes ^LinkedList puts ^Queue buf closed ^Lock mutex add!]
MMC
(cleanup
Expand Down Expand Up @@ -58,7 +63,7 @@
(let [put-cb (and (impl/active? putter) (impl/commit putter))]
(.unlock putter)
(when put-cb
(dispatch/run (fn [] (put-cb true))))
(dispatch/run (appm put-cb true)))
(when (.hasNext iter)
(recur (.next iter)))))))
(.clear puts)
Expand Down Expand Up @@ -97,7 +102,7 @@
(if ret
(let [val (impl/remove! buf)]
(.remove iter)
(recur (conj takers (fn [] (ret val)))))
(recur (conj takers (appm ret val))))
(recur takers))))
takers))]
(if (seq take-cbs)
Expand Down Expand Up @@ -137,7 +142,7 @@
(if (and put-cb take-cb)
(do
(.unlock mutex)
(dispatch/run (fn [] (take-cb val)))
(dispatch/run (appm take-cb val))
(box true))
(if (and buf (not (impl/full? buf)))
(do
Expand Down Expand Up @@ -195,7 +200,7 @@
(abort this))
(.unlock mutex)
(doseq [cb cbs]
(dispatch/run #(cb true)))
(dispatch/run (appm cb true)))
(box val))
(do (.unlock mutex)
nil))
Expand All @@ -221,7 +226,7 @@
(if (and put-cb take-cb)
(do
(.unlock mutex)
(dispatch/run #(put-cb true))
(dispatch/run (appm put-cb true))
(box val))
(if @closed
(do
Expand Down Expand Up @@ -266,22 +271,16 @@
(.unlock taker)
(when take-cb
(let [val (when (and buf (pos? (count buf))) (impl/remove! buf))]
(dispatch/run (fn [] (take-cb val)))))
(dispatch/run (appm take-cb val))))
(.remove iter)
(when (.hasNext iter)
(recur (.next iter)))))))
(when buf (impl/close-buf! buf))
(.unlock mutex)
nil))))

(defn- ex-handler [ex]
(-> (Thread/currentThread)
.getUncaughtExceptionHandler
(.uncaughtException (Thread/currentThread) ex))
nil)

(defn- handle [buf exh t]
(let [else ((or exh ex-handler) t)]
(let [else ((or exh dispatch/ex-handler) t)]
(if (nil? else)
buf
(impl/add! buf else))))
Expand Down
14 changes: 12 additions & 2 deletions src/main/clojure/clojure/core/async/impl/dispatch.clj
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,17 @@
(when (.get ^ThreadLocal in-dispatch)
(throw (IllegalStateException. "Invalid blocking call in dispatch thread"))))

(defn ex-handler
"conveys given Exception to current thread's default uncaught handler. returns nil"
[ex]
(-> (Thread/currentThread)
.getUncaughtExceptionHandler
(.uncaughtException (Thread/currentThread) ex))
nil)

(defn run
"Runs Runnable r in a thread pool thread"
"Runs Runnable r on current thread when :on-caller? meta true, else in a thread pool thread."
[^Runnable r]
(impl/exec @executor r))
(if (-> r meta :on-caller?)
(try (.run r) (catch Throwable t (ex-handler t)))
(impl/exec @executor r)))
8 changes: 4 additions & 4 deletions src/test/clojure/clojure/core/async_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@
"The written value is the value provided to the read callback."))

(deftest take!-on-caller?
(is (apply not= (let [starting-thread (Thread/currentThread)
(is (apply = (let [starting-thread (Thread/currentThread)
test-channel (chan nil)
read-promise (promise)]
(take! test-channel (fn [_] (deliver read-promise (Thread/currentThread))) true)
(>!! test-channel :foo)
[starting-thread @read-promise]))
"When on-caller? requested, but no value is immediately
available, take!'s callback executes on another thread.")
available, take!'s callback executes putter's thread.")
(is (apply = (let [starting-thread (Thread/currentThread)
test-channel (chan nil)
read-promise (promise)]
Expand Down Expand Up @@ -101,14 +101,14 @@
[starting-thread @write-promise]))
"When on-caller? is false, but a reader can consume the value,
put!'s callback executes on a different thread.")
(is (apply not= (let [starting-thread (Thread/currentThread)
(is (apply = (let [starting-thread (Thread/currentThread)
test-channel (chan nil)
write-promise (promise)]
(put! test-channel :foo (fn [_] (deliver write-promise (Thread/currentThread))) true)
(take! test-channel (fn [_] nil))
[starting-thread @write-promise]))
"When on-caller? requested, but no reader can consume the value,
put!'s callback executes on a different thread."))
put!'s callback executes on a taker's thread."))


(deftest limit-async-take!-put!
Expand Down

0 comments on commit 70183b1

Please sign in to comment.