|
| 1 | +(ns taoensso.msgpack.impl |
| 2 | + (:require [taoensso.msgpack.interfaces :as i :refer [Packable pack-bytes]]) |
| 3 | + (:import |
| 4 | + [taoensso.msgpack.interfaces PackableExt] |
| 5 | + [java.nio ByteBuffer ByteOrder] |
| 6 | + [java.nio.charset StandardCharsets] |
| 7 | + [java.io |
| 8 | + ByteArrayInputStream ByteArrayOutputStream DataInput DataOutput |
| 9 | + DataInputStream DataOutputStream InputStream OutputStream])) |
| 10 | + |
| 11 | +;;;; Utils |
| 12 | + |
| 13 | +(defmacro with-out [[out] & body] |
| 14 | + `(let [baos# (ByteArrayOutputStream.) |
| 15 | + ~out (DataOutputStream. baos#)] |
| 16 | + ~@body |
| 17 | + (.toByteArray baos#))) |
| 18 | + |
| 19 | +(defmacro with-in [[in ba] & body] |
| 20 | + `(let [bais# (ByteArrayInputStream. ~ba) |
| 21 | + ~in (DataInputStream. bais#)] |
| 22 | + ~@body)) |
| 23 | + |
| 24 | +;;;; Packing |
| 25 | + |
| 26 | +(defn- pack-ba [^bytes ba ^DataOutput out] |
| 27 | + (let [len (count ba)] |
| 28 | + (cond |
| 29 | + (<= len 0xff) (do (.writeByte out 0xc4) (.writeByte out len) (.write out ba)) |
| 30 | + (<= len 0xffff) (do (.writeByte out 0xc5) (.writeShort out len) (.write out ba)) |
| 31 | + (<= len 0xffffffff) (do (.writeByte out 0xc6) (.writeInt out len) (.write out ba))))) |
| 32 | + |
| 33 | +(defn- pack-str [^bytes ba ^DataOutput out] |
| 34 | + (let [len (count ba)] |
| 35 | + (cond |
| 36 | + (<= len 0x1f) (do (.writeByte out (bit-or 2r10100000 len)) (.write out ba)) |
| 37 | + (<= len 0xff) (do (.writeByte out 0xd9) (.writeByte out len) (.write out ba)) |
| 38 | + (<= len 0xffff) (do (.writeByte out 0xda) (.writeShort out len) (.write out ba)) |
| 39 | + (<= len 0xffffffff) (do (.writeByte out 0xdb) (.writeInt out len) (.write out ba))))) |
| 40 | + |
| 41 | +(defn- pack-int [^long n ^DataOutput out] |
| 42 | + (if (neg? n) |
| 43 | + (cond |
| 44 | + (>= n -32) (.writeByte out n) ; -fixnum |
| 45 | + (>= n -0x80) (do (.writeByte out 0xd0) (.writeByte out n)) ; int8 |
| 46 | + (>= n -0x8000) (do (.writeByte out 0xd1) (.writeShort out n)) ; int16 |
| 47 | + (>= n -0x80000000) (do (.writeByte out 0xd2) (.writeInt out n)) ; int32 |
| 48 | + :else (do (.writeByte out 0xd3) (.writeLong out n)) ; int64 |
| 49 | + ) |
| 50 | + |
| 51 | + (cond |
| 52 | + (<= n 127) (.writeByte out n) ; +fixnum |
| 53 | + (<= n 0xff) (do (.writeByte out 0xcc) (.writeByte out n)) ; uint8 |
| 54 | + (<= n 0xffff) (do (.writeByte out 0xcd) (.writeShort out n)) ; uint16 |
| 55 | + (<= n 0xffffffff) (do (.writeByte out 0xce) (.writeInt out (unchecked-int n))) ; uint32 |
| 56 | + :else (do (.writeByte out 0xcf) (.writeLong out (unchecked-long n))) ; uint64 |
| 57 | + ))) |
| 58 | + |
| 59 | +(defn- pack-coll [c ^DataOutput out] (reduce (fn [_ el] (pack-bytes el out)) nil c)) |
| 60 | +(defn- pack-kvs [m ^DataOutput out] (reduce-kv (fn [_ k v] (pack-bytes k out) (pack-bytes v out)) nil m)) |
| 61 | +(defn- pack-seq [s ^DataOutput out] |
| 62 | + (let [len (count s)] |
| 63 | + (cond |
| 64 | + (<= len 0xf) (do (.writeByte out (bit-or 2r10010000 len)) (pack-coll s out)) |
| 65 | + (<= len 0xffff) (do (.writeByte out 0xdc) (.writeShort out len) (pack-coll s out)) |
| 66 | + (<= len 0xffffffff) (do (.writeByte out 0xdd) (.writeInt out len) (pack-coll s out))))) |
| 67 | + |
| 68 | +(defn- pack-map [m ^DataOutput out] |
| 69 | + (let [len (count m)] |
| 70 | + (cond |
| 71 | + (<= len 0xf) (do (.writeByte out (bit-or 2r10000000 len)) (pack-kvs m out)) |
| 72 | + (<= len 0xffff) (do (.writeByte out 0xde) (.writeShort out len) (pack-kvs m out)) |
| 73 | + (<= len 0xffffffff) (do (.writeByte out 0xdf) (.writeInt out len) (pack-kvs m out))))) |
| 74 | + |
| 75 | +(extend-protocol Packable |
| 76 | + nil (pack-bytes [_ ^DataOutput out] (.writeByte out 0xc0)) |
| 77 | + java.lang.Boolean (pack-bytes [b ^DataOutput out] (if b (.writeByte out 0xc3) (.writeByte out 0xc2))) |
| 78 | + java.lang.String (pack-bytes [s ^DataOutput out] (pack-str (.getBytes ^String s StandardCharsets/UTF_8) out)) |
| 79 | + ;; |
| 80 | + java.lang.Byte (pack-bytes [n ^DataOutput out] (pack-int n out)) |
| 81 | + java.lang.Short (pack-bytes [n ^DataOutput out] (pack-int n out)) |
| 82 | + java.lang.Integer (pack-bytes [n ^DataOutput out] (pack-int n out)) |
| 83 | + java.lang.Long (pack-bytes [n ^DataOutput out] (pack-int n out)) |
| 84 | + java.math.BigInteger (pack-bytes [n ^DataOutput out] (pack-int (.longValueExact n) out)) |
| 85 | + clojure.lang.BigInt (pack-bytes [n ^DataOutput out] (pack-int (.longValueExact (.toBigInteger n)) out)) |
| 86 | + ;; |
| 87 | + java.lang.Float (pack-bytes [f ^DataOutput out] (do (.writeByte out 0xca) (.writeFloat out f))) |
| 88 | + java.lang.Double (pack-bytes [d ^DataOutput out] (do (.writeByte out 0xcb) (.writeDouble out d))) |
| 89 | + java.math.BigDecimal (pack-bytes [d ^DataOutput out] (pack-bytes (.doubleValue d) out)) |
| 90 | + clojure.lang.Sequential (pack-bytes [s ^DataOutput out] (pack-seq s out)) |
| 91 | + clojure.lang.IPersistentMap (pack-bytes [m ^DataOutput out] (pack-map m out)) |
| 92 | + ;; |
| 93 | + PackableExt |
| 94 | + (pack-bytes [x ^DataOutput out] |
| 95 | + (let [byte-id (.-byte-id x) |
| 96 | + ^bytes ba (.-ba-content x) |
| 97 | + len (alength ba)] |
| 98 | + |
| 99 | + (case len |
| 100 | + 1 (.writeByte out 0xd4) |
| 101 | + 2 (.writeByte out 0xd5) |
| 102 | + 4 (.writeByte out 0xd6) |
| 103 | + 8 (.writeByte out 0xd7) |
| 104 | + 16 (.writeByte out 0xd8) |
| 105 | + (cond |
| 106 | + (<= len 0xff) (do (.writeByte out 0xc7) (.writeByte out len)) |
| 107 | + (<= len 0xffff) (do (.writeByte out 0xc8) (.writeShort out len)) |
| 108 | + (<= len 0xffffffff) (do (.writeByte out 0xc9) (.writeInt out len)))) |
| 109 | + |
| 110 | + (.writeByte out byte-id) |
| 111 | + (.write out ba))) |
| 112 | + |
| 113 | + Object |
| 114 | + (pack-bytes [x ^DataOutput out] |
| 115 | + (pack-bytes |
| 116 | + {:msgpack/unpackable |
| 117 | + {:type (type x) |
| 118 | + :preview |
| 119 | + (try |
| 120 | + (let [out (pr-str x)] (subs out 0 (min 16 (count out)))) |
| 121 | + (catch Throwable _ "<unprintable>"))}} |
| 122 | + out))) |
| 123 | + |
| 124 | +;; Separate for CLJ-1381 |
| 125 | +(extend-protocol Packable (class (into-array Byte [])) (pack-bytes [a ^DataOutput out] (pack-bytes (byte-array a) out))) |
| 126 | +(extend-protocol Packable (class (byte-array 0)) (pack-bytes [ba ^DataOutput out] (pack-ba ba out))) |
| 127 | + |
| 128 | +;;;; Unpacking |
| 129 | + |
| 130 | +(defn- read-u8 [^DataInput in] (.readUnsignedByte in)) |
| 131 | +(defn- read-u16 [^DataInput in] (.readUnsignedShort in)) |
| 132 | +(defn- read-u32 [^DataInput in] (bit-and 0xffffffff (.readInt in))) |
| 133 | +(defn- read-u64 [^DataInput in] (let [n (.readLong in)] (if (neg? n) (.and (BigInteger/valueOf n) (biginteger 0xffffffffffffffffN)) n))) |
| 134 | + |
| 135 | +(defn- read-bytes ^bytes [n ^DataInput in] (let [ba (byte-array n)] (.readFully in ba) ba)) |
| 136 | +(defn- read-str [n ^DataInput in] (let [ba (read-bytes n in)] (String. ba StandardCharsets/UTF_8))) |
| 137 | + |
| 138 | +(declare unpack-1) |
| 139 | +(defn- unpack-n [init n ^DataInput in] (persistent! (reduce (fn [acc _] (conj! acc (unpack-1 in))) (transient init) (range n)))) |
| 140 | +(defn- unpack-map [ n ^DataInput in] (persistent! (reduce (fn [acc _] (assoc! acc (unpack-1 in) (unpack-1 in))) (transient {}) (range n)))) |
| 141 | +(defn- unpack-1 [ ^DataInput in] |
| 142 | + (let [byte-id (.readUnsignedByte in)] |
| 143 | + (case byte-id |
| 144 | + 0xc0 nil |
| 145 | + 0xc2 false |
| 146 | + 0xc3 true |
| 147 | + |
| 148 | + ;; Ints |
| 149 | + 0xcc (read-u8 in) |
| 150 | + 0xcd (read-u16 in) |
| 151 | + 0xce (read-u32 in) |
| 152 | + 0xcf (read-u64 in) |
| 153 | + 0xd0 (.readByte in) |
| 154 | + 0xd1 (.readShort in) |
| 155 | + 0xd2 (.readInt in) |
| 156 | + 0xd3 (.readLong in) |
| 157 | + |
| 158 | + ;; Floats |
| 159 | + 0xca (.readFloat in) |
| 160 | + 0xcb (.readDouble in) |
| 161 | + |
| 162 | + ;; Strings |
| 163 | + 0xd9 (read-str (read-u8 in) in) |
| 164 | + 0xda (read-str (read-u16 in) in) |
| 165 | + 0xdb (read-str (read-u32 in) in) |
| 166 | + |
| 167 | + ;; Byte arrays |
| 168 | + 0xc4 (read-bytes (read-u8 in) in) |
| 169 | + 0xc5 (read-bytes (read-u16 in) in) |
| 170 | + 0xc6 (read-bytes (read-u32 in) in) |
| 171 | + |
| 172 | + ;; Seqs |
| 173 | + 0xdc (unpack-n [] (read-u16 in) in) |
| 174 | + 0xdd (unpack-n [] (read-u32 in) in) |
| 175 | + |
| 176 | + ;; Maps |
| 177 | + 0xde (unpack-map (read-u16 in) in) |
| 178 | + 0xdf (unpack-map (read-u32 in) in) |
| 179 | + |
| 180 | + ;; Extensions |
| 181 | + 0xd4 (i/unpack-ext (.readByte in) (read-bytes 1 in)) |
| 182 | + 0xd5 (i/unpack-ext (.readByte in) (read-bytes 2 in)) |
| 183 | + 0xd6 (i/unpack-ext (.readByte in) (read-bytes 4 in)) |
| 184 | + 0xd7 (i/unpack-ext (.readByte in) (read-bytes 8 in)) |
| 185 | + 0xd8 (i/unpack-ext (.readByte in) (read-bytes 16 in)) |
| 186 | + 0xc7 (let [n (read-u8 in)] (i/unpack-ext (.readByte in) (read-bytes n in))) |
| 187 | + 0xc8 (let [n (read-u16 in)] (i/unpack-ext (.readByte in) (read-bytes n in))) |
| 188 | + 0xc9 (let [n (read-u32 in)] (i/unpack-ext (.readByte in) (read-bytes n in))) |
| 189 | + |
| 190 | + ;; Fix types |
| 191 | + (cond |
| 192 | + (== (bit-and byte-id 2r10000000) 0) (unchecked-byte byte-id) ; +fixnum |
| 193 | + (== (bit-and byte-id 2r11100000) 2r11100000) (unchecked-byte byte-id) ; -fixnum |
| 194 | + (== (bit-and byte-id 2r11100000) 2r10100000) (let [n (bit-and 2r11111 byte-id)] (read-str n in)) ; String |
| 195 | + (== (bit-and byte-id 2r11110000) 2r10010000) (unpack-n [] (bit-and 2r1111 byte-id) in) ; Seq |
| 196 | + (== (bit-and byte-id 2r11110000) 2r10000000) (unpack-map (bit-and 2r1111 byte-id) in) ; Map |
| 197 | + :else (throw (ex-info "Unpack failed: unexpected `byte-id`" {:byte-id byte-id})))))) |
| 198 | + |
| 199 | +(defn pack |
| 200 | + (^bytes [clj] (let [baos (ByteArrayOutputStream.)] (pack clj baos) (.toByteArray baos))) |
| 201 | + ([clj out] |
| 202 | + (cond |
| 203 | + (instance? DataOutput out) (pack-bytes clj out) |
| 204 | + (instance? OutputStream out) (pack-bytes clj (DataOutputStream. ^OutStream out)) |
| 205 | + :else |
| 206 | + (throw |
| 207 | + (ex-info "Pack failed: unexpected `out` type" |
| 208 | + {:given {:value out, :type (type out)} |
| 209 | + :expected '#{DataOutput OutputStream}}))))) |
| 210 | + |
| 211 | +(defn unpack [packed] |
| 212 | + (cond |
| 213 | + (bytes? packed) (unpack-1 (DataInputStream. (ByteArrayInputStream. packed))) |
| 214 | + (instance? DataInput packed) (unpack-1 packed) |
| 215 | + (instance? InputStream packed) (unpack-1 (DataInputStream. packed)) |
| 216 | + (seq? packed) (unpack-1 (DataInputStream. (ByteArrayInputStream. (byte-array packed)))) |
| 217 | + :else |
| 218 | + (throw |
| 219 | + (ex-info "Unpack failed: unexpected `packed` type" |
| 220 | + {:given {:value packed, :type (type packed)} |
| 221 | + :expected '#{bytes DataInput InputStream}})))) |
| 222 | + |
| 223 | +;;;; Built-in extensions |
| 224 | + |
| 225 | +(i/extend-packable 0 clojure.lang.Keyword |
| 226 | + (pack [k] (pack (subs (str k) 1))) |
| 227 | + (unpack [ba] (keyword (unpack ba)))) |
| 228 | + |
| 229 | +(i/extend-packable 1 clojure.lang.Symbol |
| 230 | + (pack [s] (pack (str s))) |
| 231 | + (unpack [ba] (symbol (unpack ba)))) |
| 232 | + |
| 233 | +(i/extend-packable 2 java.lang.Character |
| 234 | + (pack [c] (pack (str c))) |
| 235 | + (unpack [ba] (aget (char-array (unpack ba)) 0))) |
| 236 | + |
| 237 | +(i/extend-packable 3 clojure.lang.Ratio |
| 238 | + (pack [r] (pack [(numerator r) (denominator r)])) |
| 239 | + (unpack [ba] (let [[n d] (unpack ba)] (/ n d)))) |
| 240 | + |
| 241 | +(i/extend-packable 4 clojure.lang.IPersistentSet |
| 242 | + (pack [s] (with-out [out] (pack-seq s out))) |
| 243 | + (unpack [ba] |
| 244 | + (with-in [in ba] |
| 245 | + (let [byte-id (.readUnsignedByte in)] |
| 246 | + (case byte-id |
| 247 | + 0xdc (unpack-n #{} (read-u16 in) in) |
| 248 | + 0xdd (unpack-n #{} (read-u32 in) in) |
| 249 | + (do (unpack-n #{} (bit-and 2r1111 byte-id) in))))))) |
| 250 | + |
| 251 | +(i/extend-packable 5 (class (int-array 0)) |
| 252 | + (pack [ar] |
| 253 | + (let [bb (ByteBuffer/allocate (* 4 (count ar)))] |
| 254 | + (.order bb (ByteOrder/nativeOrder)) |
| 255 | + (areduce ^ints ar i _ nil (.putInt bb (aget ^ints ar i))) |
| 256 | + (.array bb))) |
| 257 | + |
| 258 | + (unpack [ba] |
| 259 | + (let [bb (ByteBuffer/wrap ba) |
| 260 | + _ (.order bb (ByteOrder/nativeOrder)) |
| 261 | + int-bb (.asIntBbfer bb) |
| 262 | + int-ar (int-array (.limit int-bb))] |
| 263 | + (.get int-bb int-ar) |
| 264 | + (do int-ar)))) |
| 265 | + |
| 266 | +(i/extend-packable 6 (class (float-array 0)) |
| 267 | + (pack [ar] |
| 268 | + (let [bb (ByteBuffer/allocate (* 4 (count ar)))] |
| 269 | + (.order bb (ByteOrder/nativeOrder)) |
| 270 | + (areduce ^floats ar idx _ nil (.putFloat bb (aget ^floats ar idx))) |
| 271 | + (.array bb))) |
| 272 | + |
| 273 | + (unpack [ba] |
| 274 | + (let [bb (ByteBuffer/wrap ba) |
| 275 | + _ (.order bb (ByteOrder/nativeOrder)) |
| 276 | + float-bb (.asFloatBbfer bb) |
| 277 | + float-ar (float-array (.limit float-bb))] |
| 278 | + (.get float-bb float-ar) |
| 279 | + (do float-ar)))) |
| 280 | + |
| 281 | +(i/extend-packable 7 (class (double-array 0)) |
| 282 | + (pack [ar] |
| 283 | + (let [bb (ByteBuffer/allocate (* 8 (count ar)))] |
| 284 | + (.order bb (ByteOrder/nativeOrder)) |
| 285 | + (areduce ^doubles ar idx _ nil (.putDouble bb (aget ^doubles ar idx))) |
| 286 | + (.array bb))) |
| 287 | + |
| 288 | + (unpack [ba] |
| 289 | + (let [bb (ByteBuffer/wrap ba) |
| 290 | + _ (.order bb (ByteOrder/nativeOrder)) |
| 291 | + double-bb (.asDoubleBbfer bb) |
| 292 | + double-ar (double-array (.limit double-bb))] |
| 293 | + (.get double-bb double-ar) |
| 294 | + (do double-ar)))) |
| 295 | + |
| 296 | +(defn- instant->ba [^java.time.Instant i] |
| 297 | + (let [bb (ByteBuffer/allocate 12)] |
| 298 | + (.putInt bb (.getNano i)) |
| 299 | + (.putLong bb (.getEpochSecond i)) |
| 300 | + (.array bb))) |
| 301 | + |
| 302 | +(i/extend-packable -1 java.util.Date (pack [d] (instant->ba (.toInstant d)))) |
| 303 | +(i/extend-packable -1 java.time.Instant |
| 304 | + (pack [i] (instant->ba i)) |
| 305 | + (unpack [ba] |
| 306 | + (let [bb (ByteBuffer/wrap ba) |
| 307 | + nanos (.getInt bb) |
| 308 | + secs (.getLong bb)] |
| 309 | + (java.time.Instant/ofEpochSecond secs nanos)))) |
| 310 | + |
| 311 | +(comment |
| 312 | + (require '[taoensso.encore :as enc]) |
| 313 | + (let [x [nil {:a :A :b :B :c "foo", :v (vec (range 128)), :s (set (range 128))}]] |
| 314 | + (enc/qb 1e4 ; [984.51 162.51] |
| 315 | + (enc/read-edn (enc/pr-edn x)) |
| 316 | + (unpack (pack x))))) |
0 commit comments