diff --git a/src/rethinkdb/net.clj b/src/rethinkdb/net.clj index 147c540..93e7a05 100644 --- a/src/rethinkdb/net.clj +++ b/src/rethinkdb/net.clj @@ -13,7 +13,8 @@ [rethinkdb.query-builder :as qb] [rethinkdb.response :refer [parse-response]] [rethinkdb.types :as types]) - (:import [java.io Closeable])) + (:import [java.io Closeable] + [clojure.lang Keyword])) (declare send-continue-query send-stop-query) @@ -54,7 +55,7 @@ (s/stream->seq stream)) java.lang.Iterable (iterator [this] - (.iterator (seq this))) + (.iterator ^Iterable (seq this))) java.util.Collection (toArray [this] (into-array Object this)) @@ -78,7 +79,7 @@ (do (swap! (:conn conn) update-in [:pending token] #(dissoc % :cursor)) (s/put-all! cursor (conj resp ::done))) (do (swap! (:conn conn) update :pending #(dissoc % token)) - (s/put! result resp) + (s/put-all! result resp) (s/close! result))))) (defn append-result [conn token resp] @@ -93,9 +94,10 @@ (defn handle-response [conn token resp] (let [{type :t resp :r etype :e notes :n :as json-resp} resp] + (log/debug "Handling response" token type resp) (case (int type) (1 5) ;; Success atom, server info - (deliver-result conn token (first resp)) + (deliver-result conn token resp) 2 ;; Success sequence (deliver-result conn token resp) @@ -137,7 +139,7 @@ (-> json (json/parse-string-strict true) parse-response))) - (io/decode-channel (:client @conn) query-protocol))) + (io/decode-stream (:client @conn) query-protocol))) (defn add-global-optargs [{:keys [db]} query] @@ -149,6 +151,7 @@ (defn add-token [conn query] (let [token (:token (swap! (:conn conn) update-in [:token] inc)) query (assoc query :token token)] + (assert (nil? (get-in @conn [:pending token]))) (swap! (:conn conn) assoc-in [:pending token] query) query)) @@ -157,12 +160,13 @@ (async/pipeline 1 query-chan (map (partial add-token conn)) initial-query-chan) (async/go-loop [] (when-let [{:keys [term query-type token]} (async/ - + diff --git a/test/rethinkdb/async_test.clj b/test/rethinkdb/async_test.clj new file mode 100644 index 0000000..e61379b --- /dev/null +++ b/test/rethinkdb/async_test.clj @@ -0,0 +1,69 @@ +(ns rethinkdb.async-test + (:require [clojure.test :refer :all] + [clojure.core.async :as async] + [rethinkdb.query :as r] + [rethinkdb.test-utils :as utils]) + (:import (clojure.core.async.impl.protocols ReadPort) + (org.slf4j LoggerFactory) + (ch.qos.logback.classic Logger Level))) + +(use-fixtures :each utils/setup-each) +(use-fixtures :once utils/setup-once) + +(def pokemon [{:national_no 25 :name "Pikachu"} + {:national_no 26 :name "Raichu"} + {:national_no 27 :name "Sandshrew"} + {:national_no 28 :name "Nidoran"}]) +(def changefeed-pokemon (map #(hash-map :new_val %) pokemon)) + + +(deftest always-return-async + (let [root-logger ^Logger (LoggerFactory/getLogger "rethinkdb.net") + level (.getLevel root-logger)] + (.setLevel root-logger Level/OFF) + (with-open [conn (r/connect :async? true)] + (are [query] (instance? ReadPort (r/run query conn)) + (r/db-list) + (-> (r/db :non-existent) (r/table :nope)) + (-> (r/db utils/test-db) (r/table utils/test-table) (r/insert {:a 1})))) + (.setLevel root-logger level))) + +(deftest async-results + (with-open [conn (r/connect :async? true :db utils/test-db)] + (testing "Shape of async results" + (are [expected query] (= (->> (r/run query conn) + (async/into []) + (async/ (r/table utils/test-table) + (r/insert pokemon)) + + ;; Success sequence + pokemon (-> (r/table utils/test-table)) + + ;; Success atom + [pokemon] (-> (r/table utils/test-table) (r/order-by :name)) + + ;; Changefeed + changefeed-pokemon + (-> (r/table utils/test-table) + (r/changes {:include-initial true}) + (r/limit 2)))) + + #_(testing "Closing a changefeed with results" + (let [changefeed (-> (r/table utils/test-table) + (r/changes {:include-initial true}) + (r/run conn))] + (async/go (async/ (r/table test-table) (r/delete {:durability :soft :return-changes false}) (r/run conn)) (test-fn))) (defn setup-once [test-fn] - (with-open [conn (r/connect)] + ;; Add token to help distinguish between setup and test code when tracing responses + (with-open [conn (r/connect :token 4000)] (ensure-db test-db conn) (ensure-table test-table {:primary-key :national_no} conn) (test-fn)