Decompression Interceptor #4317
base: main
Conversation
Nice job!
I'd just suggest implementing the decompressor in accordance with RFC 9110, so that it can handle the different combinations the standard allows.
I believe the only missing part might be support for multiple encodings on a response.
Co-authored-by: Carlos Fuentes <[email protected]>
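As an illustration of what multi-coding support could look like (the helper name and the set of supported codings below are assumptions, not this PR's code): RFC 9110 allows Content-Encoding to list several codings in the order they were applied, so they have to be decoded in reverse.

const zlib = require('node:zlib')

// Hypothetical helper: "Content-Encoding: gzip, br" means gzip was applied
// first and br last, so the response must be decoded br-first (reverse order).
function createDecompressors (contentEncoding) {
  return contentEncoding
    .split(',')
    .map(coding => coding.trim().toLowerCase())
    .reverse()
    .map(coding => {
      switch (coding) {
        case 'gzip':
        case 'x-gzip':
          return zlib.createGunzip()
        case 'deflate':
          return zlib.createInflate()
        case 'br':
          return zlib.createBrotliDecompress()
        default:
          throw new Error(`unsupported content-encoding: ${coding}`)
      }
    })
}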
@metcoder95 Thanks! All the above have been implemented along with documentation. Let me know if anything else is required.
Nice job! I left a few comments.
Do not forget about documentation and TS types
lib/interceptor/decompress.js
Outdated
if (this.#decompressors.length > 0) {
  const writeResult = this.#decompressors[0].write(chunk)
  if (writeResult === false) {
    console.log('pause')
const { pipeline } = require('node:stream')
const DecoratorHandler = require('../handler/decorator-handler')

class DecompressHandler extends DecoratorHandler {
The whole thing is looking great: you aligned the backpressure from each chunk the handler processes when handling the decompressed response, and you aligned the decompressor stream state with the chunk processing state, though there's still one step missing.
Within the handler there's a controller that also tracks the state of chunk processing; if the controller gets called with pause or resume, the decompressing stream should react to that. See:
undici/lib/handler/unwrap-handler.js
Lines 20 to 29 in a36e299
pause () {
  this.#paused = true
}

resume () {
  if (this.#paused) {
    this.#paused = false
    this[kResume]?.()
  }
}
If we can add testing around it, that would be amazing.
Right on! What are your thoughts on something like this in the decompressor's onResponseStart?
const superPause = controller.pause.bind(controller)
const superResume = controller.resume.bind(controller)
controller.pause = () => {
const result = superPause()
if (this.#decompressors.length > 0) {
this.#decompressors.forEach(d => {
if (!d.readableEnded && !d.destroyed) {
d.pause()
}
})
}
return result
}
controller.resume = () => {
const result = superResume()
if (this.#decompressors.length > 0) {
this.#decompressors.forEach(d => {
if (!d.readableEnded && !d.destroyed) {
d.resume()
}
})
}
return result
}
Happy to add additional tests; however, I'm unsure how to simulate pauses (and backpressure) in the handlers without exposing internals. Open to suggestions!
LGTM, although you can just pause the head of the decompressors, and the rest will stop processing further chunks as they wait for the head to pass more along.
Additionally, you can also pause the resulting stream of pipeline, if available, when the controller gets called (especially as the other streams will continue processing their buffered data).
For the tests, it should be enough to call client.dispatch with a custom request handler. As you receive the controller during onRequestStart, you have the chance to pause and resume as needed.
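A rough sketch of that test shape, for reference; the handler method signatures follow my reading of the controller-based handler API, and the interceptor export and local server here are assumptions rather than this PR's actual wiring.

const { test } = require('node:test')
const assert = require('node:assert')
const { Client } = require('undici')
// Hypothetical import: whatever this PR ends up exporting the interceptor as.
const decompress = require('../lib/interceptor/decompress')

test('decompression reacts to controller pause/resume', async () => {
  // Assumes a local server that responds with a gzip-encoded body.
  const client = new Client('http://localhost:3000').compose(decompress())

  const chunks = []
  await new Promise((resolve, reject) => {
    client.dispatch({ path: '/', method: 'GET' }, {
      onRequestStart (controller) {
        // The controller is available from here on.
      },
      onResponseStart (controller, statusCode, headers) {
        // Drive backpressure by hand: pause immediately, resume on next tick.
        controller.pause()
        setImmediate(() => controller.resume())
      },
      onResponseData (controller, chunk) {
        chunks.push(chunk)
      },
      onResponseEnd (controller, trailers) {
        resolve()
      },
      onResponseError (controller, err) {
        reject(err)
      }
    })
  })

  assert.ok(Buffer.concat(chunks).length > 0)
})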
  controller.resume()
})

pipeline(this.#decompressors, (err) => {
pipeline returns a stream, which you can subscribe to for the chunks of data.
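To make that concrete, a small standalone sketch (the encoding combination is arbitrary and this is not the interceptor's actual code):

const { pipeline } = require('node:stream')
const zlib = require('node:zlib')

// For a response sent with "content-encoding: br, gzip", gzip was applied last,
// so it is decoded first and brotli second. pipeline() wires the streams up,
// reports errors once via the callback, and returns the last stream, which is
// where the fully decoded chunks can be read from.
const decompressors = [zlib.createGunzip(), zlib.createBrotliDecompress()]

const tail = pipeline(decompressors, (err) => {
  if (err) console.error('decompression failed', err)
})

tail.on('data', (chunk) => {
  // chunk here is the fully decompressed payload
})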
This relates to...
Addresses the proposal brought up here, and is related to #4316.
Rationale
This PR implements a decompression interceptor for Undici, as discussed in the linked proposal and issue. The goal is to provide automatic response decompression for request() (matching the behavior of fetch()), reducing boilerplate and improving developer experience when working with compressed HTTP responses.
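For context, a hedged sketch of how such an interceptor might be used; the decompress export and its composition below are assumptions modelled on undici's existing interceptor pattern, not necessarily this PR's final API.

const { Agent, interceptors, request } = require('undici')

async function main () {
  // Hypothetical: compose the decompression interceptor onto a dispatcher the
  // same way existing interceptors (retry, redirect, dump, ...) are composed.
  const dispatcher = new Agent().compose(interceptors.decompress())

  const { body } = await request('http://example.com/resource', {
    dispatcher,
    headers: { 'accept-encoding': 'gzip, br' }
  })

  // With the interceptor in place, the body arrives already decompressed,
  // mirroring what fetch() does out of the box.
  console.log(await body.text())
}

main()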
Changes
- Removes the content-encoding and content-length headers when decompressing
Features
Status