Skip to content

Commit 74b0a44

Browse files
committed
fix p0
1 parent 37e6e17 commit 74b0a44

File tree

3 files changed

+11
-4
lines changed

3 files changed

+11
-4
lines changed

be/src/http/action/stream_load.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
429429
if (ctx->is_chunked_transfer) {
430430
pipe = std::make_shared<io::StreamLoadPipe>(
431431
io::kMaxPipeBufferedBytes /* max_buffered_bytes */);
432+
pipe->set_is_chunked_transfer(true);
432433
} else {
433434
pipe = std::make_shared<io::StreamLoadPipe>(
434435
io::kMaxPipeBufferedBytes /* max_buffered_bytes */,

be/src/io/fs/stream_load_pipe.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,11 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
8585

8686
size_t current_capacity();
8787

88-
int64_t total_length() const { return _total_length; }
88+
bool is_chunked_transfer() const { return _is_chunked_transfer; }
89+
90+
void set_is_chunked_transfer(bool is_chunked_transfer) {
91+
_is_chunked_transfer = is_chunked_transfer;
92+
}
8993

9094
protected:
9195
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
@@ -121,6 +125,10 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
121125

122126
// no use, only for compatibility with the `Path` interface
123127
Path _path = "";
128+
129+
// When importing JSON data and using chunked transfer encoding,
130+
// the data needs to be completely read before it can be parsed.
131+
bool _is_chunked_transfer = false;
124132
};
125133
} // namespace io
126134
} // namespace doris

be/src/vec/exec/format/json/new_json_reader.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,12 +1178,10 @@ Status NewJsonReader::_read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* fi
11781178
size_t* read_size) {
11791179
auto* stream_load_pipe = dynamic_cast<io::StreamLoadPipe*>(_file_reader.get());
11801180

1181-
// With the total length known, the complete data can now be retrieved
1182-
if (stream_load_pipe->total_length() != -1) {
1181+
if (!stream_load_pipe->is_chunked_transfer()) {
11831182
return stream_load_pipe->read_one_message(file_buf, read_size);
11841183
}
11851184

1186-
// The total_length is -1, which means that the data arrives in a stream and the length is unknown.
11871185
// StreamLoadPipe::read_one_message only reads a portion of the data when stream loading with a chunked transfer HTTP request.
11881186
// Need to read all the data before performing JSON parsing.
11891187
uint64_t buffer_size = 1024 * 1024;

0 commit comments

Comments
 (0)