Skip to content

Commit fa7b16c

Browse files
committed
Return async results as a stream
In an async query, success sequence and success partial rows are put on the channel row by row. This means that the consumer can be unaware of the batching that takes place between the driver and RethinkDB.
1 parent fc23866 commit fa7b16c

File tree

3 files changed

+35
-29
lines changed

3 files changed

+35
-29
lines changed

src/rethinkdb/net.clj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@
150150
(defn add-token [conn query]
151151
(let [token (:token (swap! (:conn conn) update-in [:token] inc))
152152
query (assoc query :token token)]
153+
(assert (nil? (get-in @conn [:pending token])))
153154
(swap! (:conn conn) assoc-in [:pending token] query)
154155
query))
155156

@@ -176,6 +177,7 @@
176177
(let [{:keys [async?]} query
177178
{:keys [initial-query-chan]} @conn
178179
stream (s/stream)]
180+
(log/debug "Init query" query)
179181
(async/go (async/>! initial-query-chan
180182
(assoc query :result stream)))
181183
(if async?

test-resources/logback-test.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@
88
<root level="ERROR">
99
<appender-ref ref="STDOUT"/>
1010
</root>
11-
<logger name="rethinkdb.net" level="INFO"/>
11+
<logger name="rethinkdb.net" level="DEBUG"/>
1212
</configuration>

test/rethinkdb/async_test.clj

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@
1010
(use-fixtures :each utils/setup-each)
1111
(use-fixtures :once utils/setup-once)
1212

13+
(def pokemon [{:national_no 25 :name "Pikachu"}
14+
{:national_no 26 :name "Raichu"}])
15+
(def changefeed-pokemon (map #(hash-map :new_val %) pokemon))
16+
17+
1318
(deftest always-return-async
1419
(let [root-logger ^Logger (LoggerFactory/getLogger "rethinkdb.net")
1520
level (.getLevel root-logger)]
@@ -22,31 +27,30 @@
2227
(.setLevel root-logger level)))
2328

2429
(deftest async-results
25-
(let [conn (r/connect :async? true :db utils/test-db)
26-
pokemon [{:national_no 25 :name "Pikachu"}
27-
{:national_no 26 :name "Raichu"}]]
28-
(are [expected query] (= (->> (r/run query conn)
29-
(async/into [])
30-
(async/<!!))
31-
expected)
32-
;; Insert (success atom)
33-
[{:deleted 0
34-
:errors 0
35-
:inserted 2
36-
:replaced 0
37-
:skipped 0
38-
:unchanged 0}]
39-
(-> (r/table utils/test-table)
40-
(r/insert pokemon))
41-
42-
;; Success sequence
43-
pokemon (-> (r/table utils/test-table))
44-
45-
;; Success atom
46-
[pokemon] (-> (r/table utils/test-table) (r/order-by :name))
47-
48-
;; Changefeed
49-
(map #(hash-map :new_val %) pokemon)
50-
(-> (r/table utils/test-table)
51-
(r/changes {:include-initial true})
52-
(r/limit 2)))))
30+
(with-open [conn (r/connect :async? true :db utils/test-db)]
31+
(testing "Shape of async results"
32+
(are [expected query] (= (->> (r/run query conn)
33+
(async/into [])
34+
(async/<!!))
35+
expected)
36+
;; Insert (success atom)
37+
[{:deleted 0
38+
:errors 0
39+
:inserted 2
40+
:replaced 0
41+
:skipped 0
42+
:unchanged 0}]
43+
(-> (r/table utils/test-table)
44+
(r/insert pokemon))
45+
46+
;; Success sequence
47+
pokemon (-> (r/table utils/test-table))
48+
49+
;; Success atom
50+
[pokemon] (-> (r/table utils/test-table) (r/order-by :name))
51+
52+
;; Changefeed
53+
changefeed-pokemon
54+
(-> (r/table utils/test-table)
55+
(r/changes {:include-initial true})
56+
(r/limit 2))))))

0 commit comments

Comments
 (0)