Skip to content

Commit 7ae6c30

Browse files
authored
Merge pull request #63 from phronmophobic/sse-close-stream
Make sure sse async channel gets closed.
2 parents 4cce875 + 36ddb35 commit 7ae6c30

File tree

2 files changed

+77
-25
lines changed

2 files changed

+77
-25
lines changed

src/wkok/openai_clojure/sse.clj

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
(when on-next
1616
(a/go
1717
(loop []
18-
(let [event (a/<! events)]
18+
(when-let [event (a/<! events)]
1919
(when (not= :done event)
2020
(on-next event)
2121
(recur)))))))
@@ -43,33 +43,39 @@
4343
(defn sse-events
4444
"Returns a core.async channel with events as clojure data structures.
4545
Inspiration from https://gist.github.com/oliyh/2b9b9107e7e7e12d4a60e79a19d056ee"
46-
[{:keys [request params]}]
47-
(let [event-stream ^InputStream (:body (http/request (merge request
46+
[{:keys [request params] :as m}]
47+
(let [close? (:stream/close? params)
48+
event-stream ^InputStream (:body (http/request (merge request
4849
params
4950
{:as :stream})))
5051
buffer-size (calc-buffer-size params)
5152
events (a/chan (a/buffer buffer-size) (map parse-event))]
5253
(a/thread
53-
(loop [byte-coll []]
54-
(let [byte-arr (byte-array (max 1 (.available event-stream)))
55-
bytes-read (.read event-stream byte-arr)]
56-
57-
(if (neg? bytes-read)
58-
59-
;; Input stream closed, exiting read-loop
60-
(.close event-stream)
61-
62-
(let [next-byte-coll (concat byte-coll (seq byte-arr))
63-
data (slurp (byte-array next-byte-coll))]
64-
(if-let [es (not-empty (re-seq event-mask data))]
65-
(if (every? true? (map #(a/>!! events %) es))
66-
(recur (drop (apply + (map #(count (.getBytes ^String %)) es))
67-
next-byte-coll))
68-
69-
;; Output stream closed, exiting read-loop
70-
(.close event-stream))
71-
72-
(recur next-byte-coll)))))))
54+
(try
55+
(loop [byte-coll []]
56+
(let [byte-arr (byte-array (max 1 (.available event-stream)))
57+
bytes-read (.read event-stream byte-arr)]
58+
59+
(if (neg? bytes-read)
60+
61+
;; Input stream closed, exiting read-loop
62+
nil
63+
64+
(let [next-byte-coll (concat byte-coll (seq byte-arr))
65+
data (slurp (byte-array next-byte-coll))]
66+
(if-let [es (not-empty (re-seq event-mask data))]
67+
(if (every? true? (map #(a/>!! events %) es))
68+
(recur (drop (apply + (map #(count (.getBytes ^String %)) es))
69+
next-byte-coll))
70+
71+
;; Output stream closed, exiting read-loop
72+
nil)
73+
74+
(recur next-byte-coll))))))
75+
(finally
76+
(when close?
77+
(a/close! events))
78+
(.close event-stream))))
7379
events))
7480

7581
(defn sse-request

test/wkok/openai_clojure/sse_test.clj

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,20 @@
1111
(defn- generate-events
1212
[data-coll]
1313
(let [events (map #(str "data: " (json/generate-string %)) data-coll)]
14-
(str/join "\n\n" (concat events ["data: [DONE]"]))))
14+
(str/join
15+
(eduction
16+
cat
17+
(map #(str % "\n\n"))
18+
[events ["data: [DONE]"]]))))
1519

1620
(defn- stream-string
1721
[^PipedOutputStream output-stream ^String s]
1822
(future
1923
(doseq [c (seq (.getBytes s))]
2024
(.write output-stream ^int c)
2125
(.flush output-stream)
22-
(Thread/sleep 1))))
26+
(Thread/sleep 1))
27+
(.close output-stream)))
2328

2429
(deftest sse-events-test
2530
(testing "channel can get events"
@@ -33,8 +38,49 @@
3338
(is (= (first test-data)
3439
(a/<!! events)))
3540
(is (= (second test-data)
41+
(a/<!! events)))
42+
(is (= :done
43+
(a/<!! events)))
44+
(is (= nil
45+
(a/poll! events))))))))
46+
47+
(testing "channel events with `stream/close?` parameter"
48+
(let [test-data [{:text "hello"} {:text "world"}]
49+
test-events (generate-events test-data)]
50+
(with-open [output-stream (PipedOutputStream.)
51+
input-stream (PipedInputStream. output-stream)]
52+
(with-redefs [http/request (constantly {:body input-stream})]
53+
(let [events (sse/sse-events {:params {:stream/close? true}})]
54+
(stream-string output-stream test-events)
55+
(is (= (first test-data)
56+
(a/<!! events)))
57+
(is (= (second test-data)
58+
(a/<!! events)))
59+
(is (= :done
60+
(a/<!! events)))
61+
(is (= nil
3662
(a/<!! events))))))))
3763

64+
(testing "channel events with `:on-next`"
65+
(doseq [close? [true false]]
66+
(let [test-data [{:text "hello"} {:text "world"}]
67+
test-events (generate-events test-data)]
68+
(with-open [output-stream (PipedOutputStream.)
69+
input-stream (PipedInputStream. output-stream)]
70+
(with-redefs [http/request (constantly {:body input-stream})]
71+
(let [events (sse/sse-events {:stream/close? close?})
72+
results (promise)
73+
;; accumulate results and fulfill promise when done
74+
on-next (let [acc (volatile! [])]
75+
(fn [x]
76+
(vswap! acc conj x)
77+
(when (= x (last test-data))
78+
(deliver results @acc))))]
79+
(sse/deliver-events events {:on-next on-next})
80+
(stream-string output-stream test-events)
81+
(is (= test-data
82+
@results))))))))
83+
3884
(testing "support multibytes"
3985
(let [test-data [{:text "こんにちは"} {:text "你好"}]
4086
test-events (generate-events test-data)]

0 commit comments

Comments
 (0)