Skip to content

Commit 437d1ee

Browse files
committed
[new] [#477] Add gzip-wrapping packer
1 parent 5500995 commit 437d1ee

File tree

7 files changed

+324
-75
lines changed

7 files changed

+324
-75
lines changed

example-project/src/example/dynamic_packer.cljc

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
(ns example.dynamic-packer
2-
"A dynamic Sente packer that can dynamically switch
3-
between #{:edn/txt :edn/bin :transit :msgpack} modes.
4-
5-
Handy for testing, you wouldn't normally need/want
6-
something like this in production!"
2+
"A dynamic Sente packer that can dynamically switch between a variety
3+
of underlying packers. Handy for testing, though you wouldn't normally
4+
need/want something like this in production!"
75
(:require
86
[taoensso.encore :as encore]
97
[taoensso.sente :as sente]
108
[taoensso.sente.interfaces :as i]
119
[taoensso.sente.packers.transit]
12-
[taoensso.sente.packers.msgpack]))
10+
[taoensso.sente.packers.msgpack]
11+
[taoensso.sente.packers.gzip]))
1312

1413
(defonce mode_ (atom :edn/txt))
1514

@@ -31,16 +30,18 @@
3130
(pack [_ ws? clj cb-fn] (cb-fn {:value (str->bytes (encore/pr-edn clj))}))
3231
(unpack [_ ws? packed cb-fn] (cb-fn {:value (encore/read-edn (bytes->str packed))})))
3332

34-
tp (taoensso.sente.packers.transit/get-packer)
35-
mp (taoensso.sente.packers.msgpack/get-packer)
33+
tp (taoensso.sente.packers.transit/get-packer)
34+
mp (taoensso.sente.packers.msgpack/get-packer)
35+
mp+gz (taoensso.sente.packers.gzip/wrap-packer mp {:binary? true})
3636

3737
get-packer
3838
(fn []
3939
(case @mode_
40-
:edn/txt ep
41-
:edn/bin bp
42-
:transit tp
43-
:msgpack mp))]
40+
:edn/txt ep
41+
:edn/bin bp
42+
:transit tp
43+
:msgpack mp
44+
:msgpack+gz mp+gz))]
4445

4546
(reify i/IPacker2
4647
(pack [_ ws? clj cb-fn] (i/pack (get-packer) ws? clj cb-fn))

example-project/src/example/server.clj

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -281,10 +281,11 @@
281281
(let [old-mode @example.dynamic-packer/mode_
282282
new-mode
283283
(case old-mode
284-
:edn/txt :edn/bin
285-
:edn/bin :transit
286-
:transit :msgpack
287-
:msgpack :edn/txt)]
284+
:edn/txt :edn/bin
285+
:edn/bin :transit
286+
:transit :msgpack
287+
:msgpack :msgpack+gz
288+
:msgpack+gz :edn/txt)]
288289

289290
(tel/log! (str "Changing packer mode: " old-mode " -> " new-mode))
290291
(?reply-fn [old-mode new-mode])
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
(ns taoensso.sente.packers.gzip
2+
{:author "Peter Taoussanis (@ptaoussanis)"}
3+
(:require
4+
[taoensso.truss :as truss]
5+
[taoensso.sente.packers.impl.gzip :as impl]
6+
[taoensso.sente.interfaces :as i]))
7+
8+
#?(:clj (defn- ba->str ^String [^bytes ba] (String. ba java.nio.charset.StandardCharsets/UTF_8)))
9+
#?(:clj (defn- str->ba ^bytes [^String s] (.getBytes s java.nio.charset.StandardCharsets/UTF_8)))
10+
11+
#?(:cljs (let [encoder (js/TextEncoder.)] (defn- str->u8s [^string s] (.encode encoder s))))
12+
#?(:cljs (let [decoder (js/TextDecoder. "utf-8")] (defn- u8s->str [^js u8s] (.decode decoder u8s))))
13+
14+
(defn wrap-packer
15+
"Experimental, please test carefully and report any issues!
16+
17+
Returns Sente packer that wraps another with gzip compression.
18+
Needs `js/CompressionStream` browser support.
19+
20+
If `packer` takes+returns platform bytes: `binary?` should be true.
21+
If `packer` takes+returns platform strings: `binary?` should be false."
22+
23+
[packer {:keys [binary?]}]
24+
#?(:clj
25+
(reify i/IPacker2
26+
(unpack [_ ws? ba cb]
27+
(if binary?
28+
(i/unpack packer ws? (impl/gunzip ba) cb)
29+
(i/unpack packer ws? (ba->str (impl/gunzip ba)) cb)))
30+
31+
(pack [_ ws? x cb]
32+
(i/pack packer ws? x
33+
(fn [{:keys [value error]}]
34+
(if error
35+
(cb {:error error})
36+
(cb {:value (impl/gzip (if binary? value (str->ba value)))}))))))
37+
38+
:cljs
39+
(do
40+
(when-not (exists? js/CompressionStream) (truss/ex-info! "CompressionStream not supported"))
41+
(when-not (exists? js/DecompressionStream) (truss/ex-info! "DecompressionStream not supported"))
42+
(reify i/IPacker2
43+
(unpack [_ ws? packed cb]
44+
(->
45+
(impl/gunzip (impl/as-u8s packed)) ; Decompress -> u8s promise
46+
(.then
47+
(fn [u8s]
48+
(if binary?
49+
(i/unpack packer ws? u8s cb)
50+
(i/unpack packer ws? (u8s->str u8s) cb))))
51+
(.catch (fn [err] (cb {:error err})))))
52+
53+
(pack [_ ws? cljs-val cb]
54+
(i/pack packer ws? cljs-val
55+
(fn [{:keys [error value]}] ; Serialize -> text/bin value
56+
(if error
57+
(cb {:error error})
58+
(-> (impl/gzip (if binary? (impl/as-u8s value) (str->u8s value))) ; Compress -> u8s promise
59+
(.then (fn [u8s] (cb {:value u8s})))
60+
(.catch (fn [err] (cb {:error err}))))))))))))
61+
62+
(comment
63+
(let [p (wrap-packer (taoensso.sente.packers.edn/get-packer) {:binary? false})]
64+
(i/pack p :ws [:chsk/ws-ping "foo"]
65+
(fn [{packed :value}]
66+
(i/unpack p :ws packed
67+
(fn [{clj :value}] clj))))))
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
(ns taoensso.sente.packers.impl.gzip)
2+
3+
(def ^:private ^:const prefix-raw (byte 0x00))
4+
(def ^:private ^:const prefix-gzip (byte 0x01))
5+
(def ^:private ^:const min-bytes 1024)
6+
7+
(defn- prefix
8+
"Returns given byte[] with added flag prefix."
9+
^bytes [gzip? ^bytes ba]
10+
(let [len (alength ba)
11+
out (byte-array (inc len))]
12+
(aset-byte out 0 (if gzip? prefix-gzip prefix-raw))
13+
(System/arraycopy ba 0 out 1 len)
14+
(do out)))
15+
16+
(defn gzip
17+
"Uncompressed byte[] -> compressed byte[]."
18+
^bytes [^bytes ba]
19+
(let [len (alength ba)]
20+
(if (< len min-bytes)
21+
(prefix false ba)
22+
(with-open [baos (java.io.ByteArrayOutputStream.)]
23+
(.write baos (int prefix-gzip))
24+
(with-open [gos (java.util.zip.GZIPOutputStream. baos)]
25+
(.write gos ba 0 len))
26+
(.toByteArray baos)))))
27+
28+
(defn gunzip
29+
"Compressed byte[] -> uncompressed byte[]."
30+
^bytes [^bytes ba]
31+
(let [len (alength ba)
32+
flag (bit-and 0xFF (aget ba 0))]
33+
(if-not (== flag prefix-gzip)
34+
(java.util.Arrays/copyOfRange ba 1 len)
35+
(with-open [bais (java.io.ByteArrayInputStream. ba 1 (dec len))
36+
gis (java.util.zip.GZIPInputStream. bais)
37+
baos (java.io.ByteArrayOutputStream.)]
38+
(.transferTo gis baos)
39+
(.toByteArray baos)))))
40+
41+
(comment
42+
(defn bn [len] (byte-array (take len (cycle (range 128)))))
43+
44+
(let [ba-sm (bn 128)
45+
ba-lg (bn 2048)]
46+
(taoensso.encore/qb 1e4 ; [3.6 280.32]
47+
(gunzip (gzip ba-sm))
48+
(gunzip (gzip ba-lg))))
49+
50+
(take 16 (gunzip (gzip (bn 128))))
51+
(take 16 (gunzip (gzip (bn 2048)))))
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
(ns taoensso.sente.packers.impl.gzip)
2+
3+
(def ^:private ^:const prefix-raw 0x00)
4+
(def ^:private ^:const prefix-gzip 0x01)
5+
(def ^:private ^:const min-bytes 1024)
6+
7+
(defn- type-name [x]
8+
(let [ctor (type x)]
9+
(or
10+
(.-name ctor) (.-displayName ctor) (goog/typeOf x)
11+
(try (pr-str ctor) (catch :default _ nil))
12+
"<unknown>")))
13+
14+
(defn- as-u8s
15+
"#{Uint8Array ArrayBuffer DataView} -> Uint8Array"
16+
^js [input]
17+
(cond
18+
(instance? js/Uint8Array input) input
19+
(instance? js/ArrayBuffer input) (js/Uint8Array. input)
20+
(instance? js/DataView input) (js/Uint8Array. (.-buffer input)
21+
(.-byteOffset input)
22+
(.-byteLength input))
23+
:else
24+
(throw
25+
(ex-info "Unexpected input type"
26+
{:type (type-name input),
27+
:expected '#{Uint8Array ArrayBuffer DataView}}))))
28+
29+
(defn- prefix
30+
"Returns given Uint8Array with added flag prefix."
31+
[gzip? ^js u8s]
32+
(let [out (js/Uint8Array. (inc (.-length u8s)))]
33+
(aset out 0 (if gzip? prefix-gzip prefix-raw))
34+
(.set out u8s 1)
35+
(do out)))
36+
37+
(defn- stream->u8s [readable] (-> (js/Response. readable) (.arrayBuffer) (.then #(js/Uint8Array. %))))
38+
39+
(defn gzip
40+
"Uncompressed Uint8Array -> compressed Uint8Array Promise."
41+
[^js u8s]
42+
(if (< (.-length u8s) min-bytes)
43+
(js/Promise.resolve (prefix false u8s))
44+
(let [cs (js/CompressionStream. "gzip")
45+
writer (.getWriter (.-writable cs))]
46+
(->
47+
(.write writer u8s)
48+
(.then (fn [] (.close writer) (stream->u8s (.-readable cs))))
49+
(.then (fn [u8s] (prefix true u8s)))))))
50+
51+
(defn gunzip
52+
"Compressed Uint8Array -> uncompressed Uint8Array Promise."
53+
[^js u8s]
54+
(let [flag (aget u8s 0)
55+
body (.subarray u8s 1)]
56+
(if-not (== flag prefix-gzip)
57+
(js/Promise.resolve body)
58+
(let [ds (js/DecompressionStream. "gzip")
59+
writer (.getWriter (.-writable ds))]
60+
(->
61+
(.write writer body)
62+
(.then (fn [] (.close writer) (stream->u8s (.-readable ds)))))))))

src/taoensso/sente/packers/msgpack.cljc

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,17 @@
66
[taoensso.sente.interfaces :as i]))
77

88
(defn get-packer
9-
"Returns Sente packer that uses the binary MessagePack
9+
"Experimental, please test carefully and report any issues!
10+
11+
Returns Sente packer that uses the binary MessagePack
1012
format, Ref. <https://msgpack.org/index.html>.
1113
1214
Clj/s MessagePack implementation adapted from
13-
<https://github.com/rosejn/msgpack-cljc>.
14-
15-
Experimental, please test carefully and report any issues!"
15+
<https://github.com/rosejn/msgpack-cljc>."
1616
[]
1717
(reify i/IPacker2
18-
(pack [_ _ws? x cb-fn] (cb-fn {:value (msgpack/pack x)}))
19-
(unpack [_ _ws? ba cb-fn] (cb-fn {:value (msgpack/unpack ba)}))))
18+
(pack [_ _ws? x cb] (cb {:value (msgpack/pack x)}))
19+
(unpack [_ _ws? in cb] (cb {:value (msgpack/unpack in)}))))
2020

2121
(comment
2222
(let [p (get-packer)]

0 commit comments

Comments
 (0)