|
16 | 16 | brotli = None
|
17 | 17 |
|
18 | 18 |
|
19 |
| -AVAILABLE_DECOMPRESSORS = {"gzip", "deflate"} |
| 19 | +def decompress_deflate(body): |
| 20 | + try: |
| 21 | + return zlib.decompress(body) |
| 22 | + except zlib.error: |
| 23 | + # Assume the response was already decompressed |
| 24 | + return body |
| 25 | + |
| 26 | + |
| 27 | +def decompress_gzip(body): |
| 28 | + # To (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16. |
| 29 | + try: |
| 30 | + return zlib.decompress(body, zlib.MAX_WBITS | 16) |
| 31 | + except zlib.error: |
| 32 | + # Assume the response was already decompressed |
| 33 | + return body |
| 34 | + |
| 35 | + |
| 36 | +AVAILABLE_DECOMPRESSORS = { |
| 37 | + "deflate": decompress_deflate, |
| 38 | + "gzip": decompress_gzip, |
| 39 | +} |
| 40 | + |
20 | 41 | if brotli is not None:
|
21 |
| - AVAILABLE_DECOMPRESSORS.add("br") |
| 42 | + |
| 43 | + def decompress_brotli(body): |
| 44 | + try: |
| 45 | + return brotli.decompress(body) |
| 46 | + except brotli.error: |
| 47 | + # Assume the response was already decompressed |
| 48 | + return body |
| 49 | + |
| 50 | + AVAILABLE_DECOMPRESSORS["br"] = decompress_brotli |
22 | 51 |
|
23 | 52 |
|
24 | 53 | def replace_headers(request, replacements):
|
@@ -157,45 +186,23 @@ def decode_response(response):
|
157 | 186 | 3. update content-length header to decompressed length
|
158 | 187 | """
|
159 | 188 |
|
160 |
| - def is_decompressable(headers): |
161 |
| - encoding = headers.get("content-encoding", []) |
162 |
| - return encoding and encoding[0] in AVAILABLE_DECOMPRESSORS |
163 |
| - |
164 |
| - def decompress_body(body, encoding): |
165 |
| - """Returns decompressed body according to encoding using zlib. |
166 |
| - to (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16 |
167 |
| - """ |
168 |
| - if not body: |
169 |
| - return "" |
170 |
| - if encoding == "gzip": |
171 |
| - try: |
172 |
| - return zlib.decompress(body, zlib.MAX_WBITS | 16) |
173 |
| - except zlib.error: |
174 |
| - return body # assumes that the data was already decompressed |
175 |
| - elif encoding == 'deflate': |
176 |
| - try: |
177 |
| - return zlib.decompress(body) |
178 |
| - except zlib.error: |
179 |
| - return body # assumes that the data was already decompressed |
180 |
| - else: # encoding == 'br' |
181 |
| - try: |
182 |
| - return brotli.decompress(body) |
183 |
| - except brotli.error: |
184 |
| - return body # assumes that the data was already decompressed |
185 |
| - |
186 |
| - |
187 | 189 | # Deepcopy here in case `headers` contain objects that could
|
188 | 190 | # be mutated by a shallow copy and corrupt the real response.
|
189 | 191 | response = copy.deepcopy(response)
|
190 | 192 | headers = CaseInsensitiveDict(response["headers"])
|
191 |
| - if is_decompressable(headers): |
192 |
| - encoding = headers["content-encoding"][0] |
193 |
| - headers["content-encoding"].remove(encoding) |
194 |
| - if not headers["content-encoding"]: |
195 |
| - del headers["content-encoding"] |
196 |
| - |
197 |
| - new_body = decompress_body(response["body"]["string"], encoding) |
198 |
| - response["body"]["string"] = new_body |
199 |
| - headers["content-length"] = [str(len(new_body))] |
200 |
| - response["headers"] = dict(headers) |
| 193 | + content_encoding = headers.get("content-encoding") |
| 194 | + if not content_encoding: |
| 195 | + return response |
| 196 | + decompressor = AVAILABLE_DECOMPRESSORS.get(content_encoding[0]) |
| 197 | + if not decompressor: |
| 198 | + return response |
| 199 | + |
| 200 | + headers["content-encoding"].remove(content_encoding[0]) |
| 201 | + if not headers["content-encoding"]: |
| 202 | + del headers["content-encoding"] |
| 203 | + |
| 204 | + new_body = decompressor(response["body"]["string"]) |
| 205 | + response["body"]["string"] = new_body |
| 206 | + headers["content-length"] = [str(len(new_body))] |
| 207 | + response["headers"] = dict(headers) |
201 | 208 | return response
|
0 commit comments