Skip to content

Commit 5afc8f8

Browse files
committed
reads of closed channels (would have to be ports atm) send nil to transform exactly once then boot the chan out of the read pool
1 parent 8500143 commit 5afc8f8

File tree

1 file changed

+13
-11
lines changed
  • src/main/clojure/clojure/core/async/flow

1 file changed

+13
-11
lines changed

src/main/clojure/clojure/core/async/flow/impl.clj

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,9 @@
242242
io-id (zipmap (concat (vals ins) (vals outs)) (concat (keys ins) (keys outs)))
243243
control (::flow/control ins)
244244
;;TODO rotate/randomize after control per normal alts?
245-
read-chans (into [control] (-> ins (dissoc ::flow/control) vals))
245+
read-chans (vec (-> ins (dissoc ::flow/control) vals))
246246
run
247-
#(loop [status :paused, state state, count 0]
247+
#(loop [status :paused, state state, count 0, read-chans read-chans]
248248
(let [pong (fn []
249249
(let [pins (dissoc ins ::flow/control)
250250
pouts (dissoc outs ::flow/error ::flow/report)]
@@ -254,34 +254,36 @@
254254
:ins (zipmap (keys pins) (map chan->data (vals pins)))
255255
:outs (zipmap (keys pouts) (map chan->data (vals pouts)))})))
256256
handle-command (partial handle-command pid pong)
257-
[nstatus nstate count]
257+
[nstatus nstate count read-chans]
258258
(try
259259
(if (= status :paused)
260260
(let [nstatus (handle-command status (async/<!! control))
261261
nstate (handle-transition transition status nstatus state)]
262-
[nstatus nstate count])
262+
[nstatus nstate count read-chans])
263263
;;:running
264-
(let [[msg c] (async/alts!! read-chans :priority true)
264+
(let [[msg c] (async/alts!! (into [control] read-chans) :priority true)
265265
cid (io-id c)]
266266
(if (= c control)
267267
(let [nstatus (handle-command status msg)
268268
nstate (handle-transition transition status nstatus state)]
269-
[nstatus nstate count])
269+
[nstatus nstate count read-chans])
270270
(try
271271
(let [[nstate outputs] (transform state cid msg)
272272
[nstatus nstate]
273273
(send-outputs status nstate outputs outs
274274
resolver control handle-command transition)]
275-
[nstatus nstate (inc count)])
275+
[nstatus nstate (inc count) (if (some? msg)
276+
read-chans
277+
(vec (remove #{c} read-chans)))])
276278
(catch Throwable ex
277279
(async/>!! (outs ::flow/error)
278280
#::flow{:pid pid, :status status, :state state,
279281
:count (inc count), :cid cid, :msg msg :op :step, :ex ex})
280-
[status state count])))))
282+
[status state count read-chans])))))
281283
(catch Throwable ex
282284
(async/>!! (outs ::flow/error)
283-
#::flow{:pid pid, :status status, :state state, :count (inc count), :ex ex})
284-
[status state count]))]
285+
#::flow{:pid pid, :status status, :state state, :count (inc count), :ex ex})
286+
[status state count read-chans]))]
285287
(when-not (= nstatus :exit) ;;fall out
286-
(recur nstatus nstate (long count)))))]
288+
(recur nstatus nstate (long count) read-chans))))]
287289
((futurize run {:exec exs})))))))

0 commit comments

Comments
 (0)