Skip to content

Commit 1f4361d

Browse files
committed
[new] [#477] Add gzip-wrapping packer
1 parent 5500995 commit 1f4361d

File tree

5 files changed

+282
-32
lines changed

5 files changed

+282
-32
lines changed
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
(ns taoensso.sente.packers.gzip
2+
{:author "Peter Taoussanis (@ptaoussanis)"}
3+
(:require
4+
[taoensso.truss :as truss]
5+
[taoensso.sente.packers.impl.gzip :as impl]
6+
[taoensso.sente.interfaces :as i]))
7+
8+
#?(:clj
   (defn- ba->str
     "Decodes the given byte[] as a UTF-8 string."
     ^String [^bytes ba]
     (String. ba java.nio.charset.StandardCharsets/UTF_8)))

#?(:clj
   (defn- str->ba
     "Encodes the given string as a UTF-8 byte[]."
     ^bytes [^String s]
     (.getBytes s java.nio.charset.StandardCharsets/UTF_8)))
10+
11+
#?(:cljs
   ;; Encoder instance is created once and shared by all calls
   (let [utf8-encoder (js/TextEncoder.)]
     (defn- str->u8s
       "Encodes the given string as a UTF-8 Uint8Array."
       [^string s]
       (.encode utf8-encoder s))))

#?(:cljs
   ;; Decoder instance is created once and shared by all calls
   (let [utf8-decoder (js/TextDecoder. "utf-8")]
     (defn- u8s->str
       "Decodes the given UTF-8 Uint8Array as a string."
       [^js u8s]
       (.decode utf8-decoder u8s))))
13+
14+
(defn wrap-packer
  "Experimental, please test carefully and report any issues!

  Given an existing Sente `packer`, returns a new packer that gzips the
  wrapped packer's output on `pack`, and gunzips input before handing it
  to the wrapped packer on `unpack`.

  Cljs: throws at construction time unless both `js/CompressionStream`
  and `js/DecompressionStream` exist.

  Options:
    `:binary?` - true  iff `packer` takes+returns platform bytes,
                 false iff `packer` takes+returns platform strings
                 (strings are then transcoded as UTF-8)."

  [packer {:keys [binary?]}]
  #?(:clj
     (let [;; Conversions between raw bytes and whatever the wrapped packer uses
           bytes->inner (if binary? identity ba->str)
           inner->bytes (if binary? identity str->ba)]

       (reify i/IPacker2
         (unpack [_ ws? ba cb]
           (i/unpack packer ws? (bytes->inner (impl/gunzip ba)) cb))

         (pack [_ ws? x cb]
           (i/pack packer ws? x
             (fn [{:keys [value error]}]
               (cb
                 (if error
                   {:error error}
                   {:value (impl/gzip (inner->bytes value))})))))))

     :cljs
     (do
       (when-not (exists? js/CompressionStream)   (truss/ex-info! "CompressionStream not supported"))
       (when-not (exists? js/DecompressionStream) (truss/ex-info! "DecompressionStream not supported"))

       (let [;; Conversions between Uint8Array and whatever the wrapped packer uses
             inner->u8s (if binary? impl/as-u8s str->u8s)
             u8s->inner (if binary? identity    u8s->str)]

         (reify i/IPacker2
           (unpack [_ ws? packed cb]
             (let [on-u8s (fn [u8s] (i/unpack packer ws? (u8s->inner u8s) cb))
                   on-err (fn [err] (cb {:error err}))]
               (-> (impl/gunzip (impl/as-u8s packed)) ; Decompress -> u8s promise
                   (.then  on-u8s)
                   (.catch on-err))))

           (pack [_ ws? cljs-val cb]
             (i/pack packer ws? cljs-val
               (fn [{:keys [error value]}] ; Serialize -> text/bin value
                 (if error
                   (cb {:error error})
                   (let [on-u8s (fn [u8s] (cb {:value u8s}))
                         on-err (fn [err] (cb {:error err}))]
                     (-> (impl/gzip (inner->u8s value)) ; Compress -> u8s promise
                         (.then  on-u8s)
                         (.catch on-err))))))))))))
61+
62+
(comment
63+
(let [p (wrap-packer (taoensso.sente.packers.edn/get-packer) {:binary? false})]
64+
(i/pack p :ws [:chsk/ws-ping "foo"]
65+
(fn [{packed :value}]
66+
(i/unpack p :ws packed
67+
(fn [{clj :value}] clj))))))
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
(ns taoensso.sente.packers.impl.gzip)
2+
3+
;; 1-byte flag written at index 0 of every output of `gzip`,
;; and inspected by `gunzip` to decide whether the body is compressed.
(def ^:private ^:const prefix-raw  (byte 0)) ; Body follows as-is
(def ^:private ^:const prefix-gzip (byte 1)) ; Body follows gzip-compressed

;; Inputs shorter than this many bytes are passed through uncompressed by `gzip`
(def ^:private ^:const min-bytes 1024)
6+
7+
(defn- prefix
  "Returns a new byte[] of length (inc (alength ba)): a 1-byte flag
  followed by a copy of `ba`. The flag is `prefix-gzip` when `gzip?`
  is truthy, otherwise `prefix-raw`."
  ^bytes [gzip? ^bytes ba]
  (let [n      (alength ba)
        flag   (if gzip? prefix-gzip prefix-raw)
        result (byte-array (inc n))]
    (aset-byte result 0 flag)
    (System/arraycopy ba 0 result 1 n)
    result))
15+
16+
(defn gzip
  "Uncompressed byte[] -> flag-prefixed byte[].

  Inputs of at least `min-bytes` are gzip-compressed and prefixed with
  `prefix-gzip`. Shorter inputs are copied verbatim and prefixed with
  `prefix-raw`."
  ^bytes [^bytes ba]
  (let [n (alength ba)]
    (if (>= n min-bytes)
      (with-open [out (java.io.ByteArrayOutputStream.)]
        (.write out (int prefix-gzip)) ; Flag byte precedes the gzip stream
        ;; Closing the gzip stream writes the gzip trailer into `out`
        (with-open [gz (java.util.zip.GZIPOutputStream. out)]
          (.write gz ba 0 n))
        (.toByteArray out))
      (prefix false ba))))
27+
28+
(defn gunzip
  "Flag-prefixed byte[] (as produced by `gzip`) -> uncompressed byte[].

  Reads the flag at index 0: when it equals `prefix-gzip` the remaining
  bytes are gunzipped, otherwise the remaining bytes are returned as a copy."
  ^bytes [^bytes ba]
  (let [n        (alength ba)
        gzipped? (== (bit-and 0xFF (aget ba 0)) prefix-gzip)]
    (if gzipped?
      (with-open [in  (java.io.ByteArrayInputStream. ba 1 (dec n)) ; Skip flag byte
                  gz  (java.util.zip.GZIPInputStream. in)
                  out (java.io.ByteArrayOutputStream.)]
        (.transferTo gz out)
        (.toByteArray out))
      (java.util.Arrays/copyOfRange ba 1 n))))
40+
41+
(comment
42+
(defn bn [len] (byte-array (take len (cycle (range 128)))))
43+
44+
(let [ba-sm (bn 128)
45+
ba-lg (bn 2048)]
46+
(taoensso.encore/qb 1e4 ; [3.6 280.32]
47+
(gunzip (gzip ba-sm))
48+
(gunzip (gzip ba-lg))))
49+
50+
(take 16 (gunzip (gzip (bn 128))))
51+
(take 16 (gunzip (gzip (bn 2048)))))
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
(ns taoensso.sente.packers.impl.gzip)
2+
3+
;; 1-byte flag written at index 0 of every output of `gzip`,
;; and inspected by `gunzip` to decide whether the body is compressed.
(def ^:private ^:const prefix-raw  0) ; Body follows as-is
(def ^:private ^:const prefix-gzip 1) ; Body follows gzip-compressed

;; Inputs shorter than this many bytes are passed through uncompressed by `gzip`
(def ^:private ^:const min-bytes 1024)
6+
7+
(defn- type-name
  "Returns a best-effort, human-readable type name string for `x`.

  Tries in order: the constructor's non-blank `name`, its non-blank
  `displayName`, `goog/typeOf`, the printed constructor, \"<unknown>\".

  Safe for nil/undefined input: these have no constructor, so property
  access on the constructor is skipped rather than throwing a TypeError."
  [x]
  (let [ctor      (type x) ; nil when x is nil/undefined
        non-blank (fn [s] (when (and (string? s) (not= s "")) s))]
    (or
      (when (some? ctor)
        (or
          (non-blank (.-name        ctor))
          (non-blank (.-displayName ctor))))
      (goog/typeOf x)
      (try (pr-str ctor) (catch :default _ nil))
      "<unknown>")))
13+
14+
(defn ^:no-doc as-u8s
  "#{Uint8Array ArrayBuffer DataView} -> Uint8Array

  Implementation detail, but public since it's called from other
  namespaces (private-var access from another ns is flagged by the
  Cljs compiler).

  A Uint8Array is returned as-is. An ArrayBuffer or DataView is wrapped in
  a Uint8Array view over the same underlying buffer (no copy); for a
  DataView the view's byte offset and byte length are preserved.

  Throws ex-info for any other input type."
  ^js [input]
  (cond
    (instance? js/Uint8Array  input) input
    (instance? js/ArrayBuffer input) (js/Uint8Array. input)
    (instance? js/DataView    input) (js/Uint8Array. (.-buffer     input)
                                                     (.-byteOffset input)
                                                     (.-byteLength input))
    :else
    (throw
      (ex-info "Unexpected input type"
        {:type (type-name input),
         :expected '#{Uint8Array ArrayBuffer DataView}}))))
28+
29+
(defn- prefix
  "Returns a new Uint8Array one element longer than `u8s`: a 1-byte flag
  followed by a copy of `u8s`. The flag is `prefix-gzip` when `gzip?` is
  truthy, otherwise `prefix-raw`."
  [gzip? ^js u8s]
  (let [flag   (if gzip? prefix-gzip prefix-raw)
        result (js/Uint8Array. (inc (.-length u8s)))]
    (aset result 0 flag)
    (.set result u8s 1) ; Copy body in after the flag
    result))
36+
37+
(defn- stream->u8s
  "Wraps the given readable in a `js/Response`, reads it fully as an
  ArrayBuffer, and returns a promise of a Uint8Array over that buffer."
  [readable]
  (let [response (js/Response. readable)]
    (.then (.arrayBuffer response)
      (fn [array-buffer] (js/Uint8Array. array-buffer)))))
38+
39+
(defn gzip
  "Uncompressed Uint8Array -> flag-prefixed Uint8Array Promise.

  Inputs of at least `min-bytes` elements are gzip-compressed via
  `js/CompressionStream` and prefixed with `prefix-gzip`. Shorter inputs
  are copied verbatim and prefixed with `prefix-raw`.

  The returned promise rejects if writing to, closing, or reading from
  the compression stream fails."
  [^js u8s]
  (if (< (.-length u8s) min-bytes)
    (js/Promise.resolve (prefix false u8s))
    (let [cs     (js/CompressionStream. "gzip")
          writer (.getWriter (.-writable cs))

          ;; Start draining the readable side BEFORE writing. Per the
          ;; Streams standard the readable side of a transform stream has
          ;; a default high-water mark of 0, so a write may stay pending
          ;; until something reads. Waiting for the write to resolve before
          ;; reading (as done previously) can therefore never settle.
          output_ (stream->u8s (.-readable cs))
          input_  (-> (.write writer u8s) (.then (fn [] (.close writer))))]

      ;; Wait on both sides so that write/close failures also reject the result
      (-> (js/Promise.all #js [output_ input_])
          (.then (fn [results] (prefix true (aget results 0))))))))
50+
51+
(defn gunzip
  "Flag-prefixed Uint8Array (as produced by `gzip`) -> uncompressed
  Uint8Array Promise.

  Reads the flag at index 0: when it equals `prefix-gzip` the remaining
  bytes are gunzipped via `js/DecompressionStream`, otherwise the remaining
  bytes are returned as a subarray view (no copy) of the input.

  The returned promise rejects if writing to, closing, or reading from
  the decompression stream fails (e.g. on corrupt input)."
  [^js u8s]
  (let [flag (aget u8s 0)
        body (.subarray u8s 1)]
    (if-not (== flag prefix-gzip)
      (js/Promise.resolve body)
      (let [ds     (js/DecompressionStream. "gzip")
            writer (.getWriter (.-writable ds))

            ;; Start draining the readable side BEFORE writing. Per the
            ;; Streams standard the readable side of a transform stream has
            ;; a default high-water mark of 0, so a write may stay pending
            ;; until something reads. Waiting for the write to resolve
            ;; before reading (as done previously) can therefore never settle.
            output_ (stream->u8s (.-readable ds))
            input_  (-> (.write writer body) (.then (fn [] (.close writer))))]

        ;; Wait on both sides so that write/close failures also reject the result
        (-> (js/Promise.all #js [output_ input_])
            (.then (fn [results] (aget results 0))))))))

src/taoensso/sente/packers/msgpack.cljc

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,17 @@
66
[taoensso.sente.interfaces :as i]))
77

88
(defn get-packer
9-
"Returns Sente packer that uses the binary MessagePack
9+
"Experimental, please test carefully and report any issues!
10+
11+
Returns Sente packer that uses the binary MessagePack
1012
format, Ref. <https://msgpack.org/index.html>.
1113
1214
Clj/s MessagePack implementation adapted from
13-
<https://github.com/rosejn/msgpack-cljc>.
14-
15-
Experimental, please test carefully and report any issues!"
15+
<https://github.com/rosejn/msgpack-cljc>."
1616
[]
1717
(reify i/IPacker2
18-
(pack [_ _ws? x cb-fn] (cb-fn {:value (msgpack/pack x)}))
19-
(unpack [_ _ws? ba cb-fn] (cb-fn {:value (msgpack/unpack ba)}))))
18+
(pack [_ _ws? x cb] (cb {:value (msgpack/pack x)}))
19+
(unpack [_ _ws? in cb] (cb {:value (msgpack/unpack in)}))))
2020

2121
(comment
2222
(let [p (get-packer)]

test/taoensso/sente_tests.cljc

Lines changed: 96 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,26 @@
1818
[taoensso.encore :as enc]
1919

2020
;; :cljs cannot compile taoensso.sente under :nodejs target
21-
#?(:clj [taoensso.sente :as sente])
21+
#?(:clj [taoensso.sente :as sente])
2222

2323
[taoensso.sente.interfaces :as i]
2424
[taoensso.sente.packers.edn :as ep]
2525
[taoensso.sente.packers.transit :as tp]
26-
[taoensso.sente.packers.msgpack :as mp])
27-
28-
#?(:cljs
29-
(:require-macros
30-
[taoensso.sente-tests :refer [get-bench-data]])))
26+
[taoensso.sente.packers.msgpack :as mp]
27+
[taoensso.sente.packers.gzip :as gz]))
3128

3229
(comment
3330
(remove-ns 'taoensso.sente-tests)
3431
(test/run-tests 'taoensso.sente-tests))
3532

3633
;;;;
3734

38-
(defn get-bench-data []
35+
(def ^:const timeout-msecs 4000)
36+
(def ^:const bench-laps 1e2)
37+
38+
;;;; Test data
39+
40+
(defn get-test-data []
3941
#_{:a :A :b :B :c "foo", :v (vec (range 128)), :s (set (range 128))}
4042
{:ids (vec (repeatedly 1024 (fn [] (rand-int 32767))))
4143
:data
@@ -63,29 +65,97 @@
6365
:submit-date (str (enc/now-inst))
6466
:display-name (enc/uuid-str)
6567
:user nil
68+
:uuid (random-uuid)
6669
:a-vector-of-integers (vec (repeatedly 64 (fn [] (rand-int 128))))
6770
:children-count (rand-int 1024)
6871
:a-usually-absent-string nil})))})
6972

70-
(defn packed-len [x]
71-
#?(:cljs (if (instance? js/Uint8Array x) (.-length x) (count x))
72-
:clj (count x)))
73-
74-
(defn bench1 [packer laps data]
75-
{:time (enc/qb laps (i/pack packer nil data (fn [x] (i/unpack packer nil (get x :value) (fn [y] (get y :value))))))
76-
:size (packed-len (i/pack packer nil data (fn [x] (get x :value))))})
77-
78-
(deftest bench-packers
79-
(let [laps 1e2
80-
bd (get-bench-data)
81-
ep (ep/get-packer)
82-
tp (tp/get-packer)
83-
mp (mp/get-packer)]
84-
85-
(println "Benching comparison:"
86-
{:ep (bench1 ep laps bd)
87-
:tp (bench1 tp laps bd)
88-
:mp (bench1 mp laps bd)})))
73+
(def td1 (get-test-data))
74+
(def td2 (get-test-data))
75+
76+
;;;; Packers
77+
78+
(defn roundtrip
  "Packs `data` with `packer`, then unpacks the packed `:value` with the
  same packer.

  3-arity: calls `cb-fn` with the `:value` of the unpack result.
  2-arity: returns Clj/s promise of that same value."
  ([data packer cb-fn]
   (let [on-unpacked (fn [unpacked] (cb-fn (get unpacked :value)))
         on-packed   (fn [packed]   (i/unpack packer nil (get packed :value) on-unpacked))]
     (i/pack packer nil data on-packed)))

  ([data packer]
   #?(:clj
      (let [result_ (promise)]
        (roundtrip data packer (fn [v] (deliver result_ v)))
        result_)

      :cljs
      (js/Promise.
        (fn [resolve _reject]
          (roundtrip data packer (fn [v] (resolve v))))))))
89+
90+
(comment @(roundtrip :data (ep/get-packer)))
91+
92+
(defn test-promise
  "Runs test-fn `f` against the eventual value of Clj/s promise `p`.

  Clj:  blocks up to `timeout-msecs`, then calls `f` with the promise's
        value, or with `:timeout` if it wasn't delivered in time.
  Cljs: async test. Calls `f` with the resolved value or rejection error,
        or with `:timeout` if `p` hasn't settled within `timeout-msecs`.
        Once the timeout has fired the test is over, so a late settlement
        of `p` is ignored rather than running `f` again outside the test."
  [p f] ; f is test-fn
  #?(:clj (f (deref p timeout-msecs :timeout))
     :cljs
     (test/async done
       (let [;; `done` may be reached via both timer and promise chain: run it once only
             done        (let [run?_ (atom false)] (fn [] (when (compare-and-set! run?_ false true) (done))))
             timed-out?_ (atom false)
             f*          (fn [v] (when-not @timed-out?_ (f v))) ; No-op after timeout
             timer
             (js/setTimeout
               (fn []
                 (reset! timed-out?_ true)
                 (f :timeout)
                 (done))
               timeout-msecs)]
         (-> p
           (.then    (fn [v] (js/clearTimeout timer) (f* v)))
           (.catch   (fn [e] (js/clearTimeout timer) (f* e)))
           (.finally (fn [ ] (js/clearTimeout timer) (done))))))))
102+
103+
(do
104+
(deftest test-packer-edn (test-promise (roundtrip td1 (ep/get-packer)) #(is (= td1 %))))
105+
(deftest test-packer-transit (test-promise (roundtrip td1 (tp/get-packer)) #(is (= td1 %))))
106+
(deftest test-packer-msgpack (test-promise (roundtrip td1 (mp/get-packer)) #(is (= td1 %))))
107+
(deftest test-packer-gz+edn (test-promise (roundtrip td1 (gz/wrap-packer (ep/get-packer) {:binary? false})) #(is (= td1 %))))
108+
(deftest test-packer-gz+transit (test-promise (roundtrip td1 (gz/wrap-packer (tp/get-packer) {:binary? false})) #(is (= td1 %))))
109+
(deftest test-packer-gz+msgpack (test-promise (roundtrip td1 (gz/wrap-packer (mp/get-packer) {:binary? true})) #(is (= td1 %)))))
110+
111+
;;;; Benching
112+
113+
(defn packed-len
  "Returns the length of packed output `x`: `.-length` for a Cljs
  Uint8Array, otherwise `(count x)`."
  [x]
  #?(:clj  (count x)
     :cljs (if (instance? js/Uint8Array x)
             (.-length x)
             (count    x))))
114+
115+
(defn bench1
  "Benchmarks `packer` against `data`: measures the packed size once, then
  times `bench-laps` sequential pack+unpack roundtrips.

  Returns Clj/s promise of {:size <packed-len>, :msecs <elapsed>}."
  [data packer]
  #?(:clj
     (let [size-promise (promise)
           _            (i/pack packer nil data
                          (fn [packed] (deliver size-promise (packed-len (get packed :value)))))
           size         (deref size-promise timeout-msecs -1) ; -1 => pack timed out
           t0           (enc/now-nano)]

       ;; Each lap blocks until its roundtrip completes (or times out)
       (dotimes [_ bench-laps]
         (deref (roundtrip data packer) timeout-msecs :timeout))

       (let [msecs (enc/round (/ (- (enc/now-nano) t0) 1e6))]
         ;; Results are already known, so hand back a pre-delivered promise
         (doto (promise) (deliver {:size size, :msecs msecs}))))

     :cljs
     (let [elapsed-msecs (fn [t0] (enc/round (/ (- (enc/now-nano) t0) 1e6)))
           packed-promise
           (js/Promise.
             (fn [resolve _]
               (i/pack packer nil data (fn [packed] (resolve (get packed :value))))))]

       (.then packed-promise
         (fn [packed]
           (let [size (packed-len packed)
                 t0   (enc/now-nano)]

             ;; Laps are chained so that each starts only after the previous completes
             ((fn run-lap [i]
                (if (< i bench-laps)
                  (.then (roundtrip data packer) (fn [_] (run-lap (inc i))))
                  (js/Promise.resolve {:size size, :msecs (elapsed-msecs t0)})))
              0)))))))
140+
141+
(def benched_ (atom {}))
142+
(defn >benched [id] (fn [val] (swap! benched_ assoc id val)))
143+
144+
(test/use-fixtures :once
145+
(taoensso.encore/test-fixtures
146+
{:after
147+
(fn []
148+
(println "Benchmarks:")
149+
(doseq [[id results] (sort @benched_)]
150+
(println (str id ": " results))))}))
151+
152+
(do
153+
(deftest bench-packer-edn (test-promise (bench1 td1 (ep/get-packer)) (>benched :edn)))
154+
(deftest bench-packer-transit (test-promise (bench1 td1 (tp/get-packer)) (>benched :transit)))
155+
(deftest bench-packer-msgpack (test-promise (bench1 td1 (mp/get-packer)) (>benched :msg-pack)))
156+
(deftest bench-packer-gz+edn (test-promise (bench1 td1 (gz/wrap-packer (ep/get-packer) {:binary? false})) (>benched :edn+gz)))
157+
(deftest bench-packer-gz+transit (test-promise (bench1 td1 (gz/wrap-packer (tp/get-packer) {:binary? false})) (>benched :transit+gz)))
158+
(deftest bench-packer-gz+msgpack (test-promise (bench1 td1 (gz/wrap-packer (mp/get-packer) {:binary? true})) (>benched :msg-pack+gz))))
89159

90160
;;;;
91161

0 commit comments

Comments
 (0)