Skip to content

Commit bb62b52

Browse files
committed
fix comment
1 parent 74b0a44 commit bb62b52

File tree

4 files changed

+73
-49
lines changed

4 files changed

+73
-49
lines changed

be/src/vec/exec/format/json/new_json_reader.cpp

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,33 +1178,46 @@ Status NewJsonReader::_read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* fi
11781178
size_t* read_size) {
11791179
auto* stream_load_pipe = dynamic_cast<io::StreamLoadPipe*>(_file_reader.get());
11801180

1181+
// first read: read from the pipe once.
1182+
RETURN_IF_ERROR(stream_load_pipe->read_one_message(file_buf, read_size));
1183+
1184+
// When the file is not chunked, the entire file has already been read.
11811185
if (!stream_load_pipe->is_chunked_transfer()) {
1182-
return stream_load_pipe->read_one_message(file_buf, read_size);
1186+
return Status::OK();
11831187
}
11841188

1185-
// StreamLoadPipe::read_one_message only reads a portion of the data when stream loading with a chunked transfer HTTP request.
1186-
// Need to read all the data before performing JSON parsing.
1187-
uint64_t buffer_size = 1024 * 1024;
1188-
std::vector<uint8_t> buf(buffer_size);
1189-
1189+
std::vector<uint8_t> buf;
11901190
uint64_t cur_size = 0;
1191+
1192+
// second read: continuously read data from the pipe until all data is read.
1193+
std::unique_ptr<uint8_t[]> read_buf;
1194+
size_t read_buf_size = 0;
11911195
while (true) {
1192-
RETURN_IF_ERROR(stream_load_pipe->read_one_message(file_buf, read_size));
1193-
if (*read_size == 0) {
1196+
RETURN_IF_ERROR(stream_load_pipe->read_one_message(&read_buf, &read_buf_size));
1197+
if (read_buf_size == 0) {
11941198
break;
11951199
} else {
1196-
if (cur_size + (*read_size) > buf.size()) {
1197-
buffer_size = 2 * (cur_size + (*read_size));
1198-
buf.resize(buffer_size);
1199-
}
1200-
memcpy(buf.data() + cur_size, file_buf->get(), *read_size);
1201-
cur_size += *read_size;
1200+
buf.insert(buf.end(), read_buf.get(), read_buf.get() + read_buf_size);
1201+
cur_size += read_buf_size;
1202+
read_buf_size = 0;
1203+
read_buf.reset();
12021204
}
12031205
}
12041206

1205-
file_buf->reset(new uint8_t[cur_size]);
1206-
memcpy(file_buf->get(), buf.data(), cur_size);
1207-
*read_size = cur_size;
1207+
// No data is available during the second read.
1208+
if (cur_size == 0) {
1209+
return Status::OK();
1210+
}
1211+
1212+
std::unique_ptr<uint8_t[]> total_buf = std::make_unique<uint8_t[]>(cur_size + *read_size);
1213+
1214+
// copy the data during the first read
1215+
memcpy(total_buf.get(), file_buf->get(), *read_size);
1216+
1217+
// copy the data during the second read
1218+
memcpy(total_buf.get() + *read_size, buf.data(), cur_size);
1219+
*file_buf = std::move(total_buf);
1220+
*read_size += cur_size;
12081221
return Status::OK();
12091222
}
12101223

be/src/vec/exec/format/json/new_json_reader.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ class NewJsonReader : public GenericReader {
136136

137137
Status _read_one_message(std::unique_ptr<uint8_t[]>* file_buf, size_t* read_size);
138138

139+
// StreamLoadPipe::read_one_message only reads a portion of the data when stream loading with a chunked transfer HTTP request.
140+
// Need to read all the data before performing JSON parsing.
139141
Status _read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* file_buf, size_t* read_size);
140142

141143
// simdjson, replace none simdjson function if it is ready

regression-test/data/load_p0/stream_load/test_load_with_transfer_encoding.out

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,6 @@
22
-- !sql --
33
15272
44

5+
-- !sql --
6+
15282
7+

regression-test/suites/load_p0/stream_load/test_load_with_transfer_encoding.groovy

Lines changed: 38 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -70,44 +70,50 @@ suite("test_load_with_transfer_encoding", "p0") {
7070

7171

7272
String db = context.config.getDbNameByFile(context.file)
73-
String url = """${getS3Url()}/regression/load/data/test_load_with_transfer_encoding.json"""
74-
String fileName
7573

76-
HttpClients.createDefault().withCloseable { client ->
77-
def file = new File("${context.config.cacheDataPath}/test_load_with_transfer_encoding.json")
78-
if (file.exists()) {
79-
log.info("Found ${url} in ${file.getAbsolutePath()}");
80-
fileName = file.getAbsolutePath()
81-
return;
82-
}
74+
def load_data = { inputFile, int count ->
75+
String url = """${getS3Url()}/regression/load/data/${inputFile}.json"""
76+
String fileName
77+
78+
HttpClients.createDefault().withCloseable { client ->
79+
def file = new File("${context.config.cacheDataPath}/${inputFile}.json")
80+
if (file.exists()) {
81+
log.info("Found ${url} in ${file.getAbsolutePath()}");
82+
fileName = file.getAbsolutePath()
83+
return;
84+
}
8385

84-
log.info("Start to down data from ${url} to $context.config.cacheDataPath}/");
85-
CloseableHttpResponse resp = client.execute(RequestBuilder.get(url).build())
86-
int code = resp.getStatusLine().getStatusCode()
86+
log.info("Start to down data from ${url} to $context.config.cacheDataPath}/");
87+
CloseableHttpResponse resp = client.execute(RequestBuilder.get(url).build())
88+
int code = resp.getStatusLine().getStatusCode()
8789

88-
if (code != HttpStatus.SC_OK) {
89-
String streamBody = EntityUtils.toString(resp.getEntity())
90-
log.info("Fail to download data ${url}, code: ${code}, body:\n${streamBody}")
91-
throw new IllegalStateException("Get http stream failed, status code is ${code}, body:\n${streamBody}")
90+
if (code != HttpStatus.SC_OK) {
91+
String streamBody = EntityUtils.toString(resp.getEntity())
92+
log.info("Fail to download data ${url}, code: ${code}, body:\n${streamBody}")
93+
throw new IllegalStateException("Get http stream failed, status code is ${code}, body:\n${streamBody}")
94+
}
95+
96+
InputStream httpFileStream = resp.getEntity().getContent()
97+
java.nio.file.Files.copy(httpFileStream, file.toPath(), java.nio.file.StandardCopyOption.REPLACE_EXISTING)
98+
httpFileStream.close()
99+
fileName = file.getAbsolutePath()
100+
log.info("File downloaded to: ${fileName}")
92101
}
93102

94-
InputStream httpFileStream = resp.getEntity().getContent()
95-
java.nio.file.Files.copy(httpFileStream, file.toPath(), java.nio.file.StandardCopyOption.REPLACE_EXISTING)
96-
httpFileStream.close()
97-
fileName = file.getAbsolutePath()
98-
log.info("File downloaded to: ${fileName}")
103+
def command = """curl --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword} -H read_json_by_line:false -H Expect:100-continue -H max_filter_ratio:1 -H strict_mode:false -H strip_outer_array:true -H columns:id,created,creater,deleted,updated,card_id,card_type_id,card_type_name,cash_balance,cashier_id,client_id,cost,creater_name,details,id_name,id_number,last_client_id,login_id,operation_type,place_id,present,present_balance,remark,shift_id,source_type,online_account -H format:json -H Transfer-Encoding:chunked -T ${fileName} -XPUT http://${context.config.feHttpAddress}/api/${db}/${table_name}/_stream_load"""
104+
log.info("stream load: ${command}")
105+
def process = command.execute()
106+
def code = process.waitFor()
107+
def out = process.text
108+
def json = parseJson(out)
109+
log.info("stream load result is:: ${out}".toString())
110+
assertEquals("success", json.Status.toLowerCase())
111+
assertEquals(count, json.NumberLoadedRows)
112+
qt_sql """ select count() from ${table_name} """
99113
}
100114

101-
def command = """curl --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword} -H read_json_by_line:false -H Expect:100-continue -H max_filter_ratio:1 -H strict_mode:false -H strip_outer_array:true -H columns:id,created,creater,deleted,updated,card_id,card_type_id,card_type_name,cash_balance,cashier_id,client_id,cost,creater_name,details,id_name,id_number,last_client_id,login_id,operation_type,place_id,present,present_balance,remark,shift_id,source_type,online_account -H format:json -H Transfer-Encoding:chunked -T ${fileName} -XPUT http://${context.config.feHttpAddress}/api/${db}/${table_name}/_stream_load"""
102-
log.info("stream load: ${command}")
103-
def process = command.execute()
104-
def code = process.waitFor()
105-
def out = process.text
106-
def json = parseJson(out)
107-
log.info("stream load result is:: ${out}".toString())
108-
assertEquals("success", json.Status.toLowerCase())
109-
assertEquals(15272, json.NumberLoadedRows)
110-
111-
qt_sql """ select count() from ${table_name} """
115+
load_data.call("test_load_with_transfer_encoding", 15272)
116+
load_data.call("test_transfer_encoding_small", 10)
117+
112118
}
113119

0 commit comments

Comments (0)