Skip to content

Commit 51b7244

Browse files
[feat]: file record batch reader (#159)
related: #158 --------- Signed-off-by: shaoting-huang <[email protected]>
1 parent e0473b6 commit 51b7244

File tree

8 files changed

+206
-6
lines changed

8 files changed

+206
-6
lines changed

.github/workflows/ci.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ jobs:
5656

5757
- name: Build c++
5858
run: cd cpp && make
59+
60+
- name: Run cpp tests
61+
run: cd cpp/build/Release/test && ./milvus_test
5962

6063
- name: Run tests
6164
run: cd go && make && make test

cpp/include/milvus-storage/format/parquet/file_reader.h

+53
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,61 @@
1717
#include "format/reader.h"
1818
#include "parquet/arrow/reader.h"
1919
#include "storage/options.h"
20+
#include "common/config.h"
2021
namespace milvus_storage {
2122

23+
class FileRecordBatchReader : public arrow::RecordBatchReader {
24+
public:
25+
/**
26+
* @brief FileRecordBatchReader reads num of row groups starting from row_group_offset with memory constraints.
27+
*
28+
* @param fs The Arrow filesystem interface.
29+
* @param path Path to the Parquet file.
30+
* @param schema Expected schema of the Parquet file.
31+
* @param buffer_size Memory limit for reading row groups.
32+
* @param row_group_offset The starting row group index to read.
33+
* @param row_group_num The number of row groups to read.
34+
*/
35+
FileRecordBatchReader(arrow::fs::FileSystem& fs,
36+
const std::string& path,
37+
const std::shared_ptr<arrow::Schema>& schema,
38+
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE,
39+
const size_t row_group_offset = 0,
40+
const size_t row_group_num = std::numeric_limits<size_t>::max());
41+
42+
/**
43+
* @brief Returns the schema of the Parquet file.
44+
*
45+
* @return A shared pointer to the Arrow schema.
46+
*/
47+
std::shared_ptr<arrow::Schema> schema() const;
48+
49+
/**
50+
* @brief Reads the next record batch from the file.
51+
*
52+
* @param out A shared pointer to the output record batch.
53+
* @return Arrow Status indicating success or failure.
54+
*/
55+
arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* out);
56+
57+
/**
58+
* @brief Closes the reader and releases resources.
59+
*
60+
* @return Arrow Status indicating success or failure.
61+
*/
62+
arrow::Status Close();
63+
64+
private:
65+
std::shared_ptr<arrow::Schema> schema_;
66+
std::unique_ptr<parquet::arrow::FileReader> file_reader_;
67+
size_t current_row_group_ = 0;
68+
size_t read_count_ = 0;
69+
70+
int64_t buffer_size_;
71+
std::vector<size_t> row_group_sizes_;
72+
size_t row_group_offset_;
73+
};
74+
2275
class ParquetFileReader : public Reader {
2376
public:
2477
ParquetFileReader(std::unique_ptr<parquet::arrow::FileReader> reader);

cpp/src/format/parquet/file_reader.cpp

+68
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,84 @@
1717
#include <arrow/record_batch.h>
1818
#include <arrow/table_builder.h>
1919
#include <arrow/type_fwd.h>
20+
#include <arrow/util/key_value_metadata.h>
2021
#include <parquet/type_fwd.h>
2122
#include <iterator>
2223
#include <memory>
2324
#include <utility>
2425
#include <vector>
2526
#include "arrow/table.h"
2627
#include "common/macro.h"
28+
#include "common/serde.h"
29+
#include "common/log.h"
30+
#include "common/arrow_util.h"
2731

2832
namespace milvus_storage {
2933

34+
/**
 * Opens the Parquet file at `path` and records the sizes of the selected row groups.
 *
 * The row group sizes are taken from the ROW_GROUP_SIZE_META_KEY entry of the file's
 * key/value metadata. The selection starts at row_group_offset and covers at most
 * row_group_num row groups, clamped to the number of row groups listed in that entry.
 *
 * Throws std::runtime_error when the file reader cannot be created or the metadata
 * entry is missing, and std::out_of_range when row_group_offset is not a valid index.
 */
FileRecordBatchReader::FileRecordBatchReader(arrow::fs::FileSystem& fs,
                                             const std::string& path,
                                             const std::shared_ptr<arrow::Schema>& schema,
                                             const int64_t buffer_size,
                                             const size_t row_group_offset,
                                             const size_t row_group_num)
    // Initializers follow the member declaration order (buffer_size_ precedes row_group_offset_).
    : schema_(schema), buffer_size_(buffer_size), row_group_offset_(row_group_offset) {
  auto result = MakeArrowFileReader(fs, path);
  if (!result.ok()) {
    LOG_STORAGE_ERROR_ << "Error making file reader:" << result.status().ToString();
    throw std::runtime_error(result.status().ToString());
  }
  file_reader_ = std::move(result.value());

  // The key/value metadata pointer may be null when the file carries no key/value
  // metadata at all; treat that the same way as a missing entry instead of dereferencing it.
  const auto kv_metadata = file_reader_->parquet_reader()->metadata()->key_value_metadata();
  if (kv_metadata == nullptr) {
    LOG_STORAGE_ERROR_ << "Metadata not found in file: " << path;
    throw std::runtime_error("Metadata not found in file: " + path);
  }

  auto metadata = kv_metadata->Get(ROW_GROUP_SIZE_META_KEY);
  if (!metadata.ok()) {
    LOG_STORAGE_ERROR_ << "Metadata not found in file: " << path;
    throw std::runtime_error(metadata.status().ToString());
  }

  auto all_row_group_sizes = PackedMetaSerde::deserialize(metadata.ValueOrDie());
  const size_t total_row_groups = all_row_group_sizes.size();
  if (row_group_offset >= total_row_groups) {
    std::string error_msg =
        "Row group offset exceeds total number of row groups. "
        "Row group offset: " +
        std::to_string(row_group_offset) + ", Total row groups: " + std::to_string(total_row_groups);
    LOG_STORAGE_ERROR_ << error_msg;
    throw std::out_of_range(error_msg);
  }

  // Clamp the count against the remaining row groups rather than computing
  // row_group_offset + row_group_num, which wraps around for the default
  // row_group_num (size_t max) combined with a non-zero offset.
  const size_t selected_count = std::min(row_group_num, total_row_groups - row_group_offset);
  const auto first = all_row_group_sizes.begin() + row_group_offset;
  row_group_sizes_.assign(first, first + selected_count);
}
65+
66+
// Returns the schema stored by the constructor; this is nullptr once Close() has reset it.
std::shared_ptr<arrow::Schema> FileRecordBatchReader::schema() const {
  return schema_;
}
67+
68+
/**
 * Reads the next run of consecutive row groups and returns them as one record batch.
 *
 * Row groups are added to the run while their accumulated recorded size stays within
 * buffer_size_. The first pending row group is always taken, even when it alone exceeds
 * the limit; otherwise an oversized row group would be reported as end-of-stream and
 * every row group after it would be silently dropped.
 *
 * @param out Receives the combined batch, or nullptr when no row groups remain.
 * @return OK on success or at end of stream; otherwise the error from reading the row
 *         groups or from combining the table chunks.
 */
arrow::Status FileRecordBatchReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) {
  // Explicit form of the conversion the original signed/unsigned comparison performed
  // implicitly; a negative buffer_size_ therefore still behaves as a very large limit.
  const size_t budget = static_cast<size_t>(buffer_size_);

  std::vector<int> rgs_to_read;
  size_t planned_size = 0;

  while (current_row_group_ < row_group_sizes_.size()) {
    const size_t rg_size = row_group_sizes_[current_row_group_];
    // Written as a subtraction so that the sum of sizes cannot wrap around.
    const bool fits = planned_size <= budget && rg_size <= budget - planned_size;
    if (!fits && !rgs_to_read.empty()) {
      break;
    }
    // row_group_sizes_ is relative to row_group_offset_; ReadRowGroups expects file-level indices.
    rgs_to_read.push_back(static_cast<int>(current_row_group_ + row_group_offset_));
    planned_size += rg_size;
    ++current_row_group_;
  }

  if (rgs_to_read.empty()) {
    *out = nullptr;
    return arrow::Status::OK();
  }

  std::shared_ptr<arrow::Table> table = nullptr;
  RETURN_NOT_OK(file_reader_->ReadRowGroups(rgs_to_read, &table));

  // Propagate a failure as a Status instead of aborting the process via ValueOrDie().
  auto combined = table->CombineChunksToBatch();
  if (!combined.ok()) {
    return combined.status();
  }
  *out = std::move(combined).ValueOrDie();

  // Keeps the count logged by Close() meaningful.
  ++read_count_;
  return arrow::Status::OK();
}
89+
90+
// Logs the read counter, then drops the recorded row group sizes, the schema and the
// underlying Parquet file reader. Always returns OK.
arrow::Status FileRecordBatchReader::Close() {
  LOG_STORAGE_DEBUG_ << "FileRecordBatchReader closed after reading " << read_count_ << " times.";

  row_group_sizes_.clear();
  schema_.reset();
  file_reader_.reset();

  return arrow::Status::OK();
}
97+
3098
// Takes ownership of the supplied Parquet file reader.
ParquetFileReader::ParquetFileReader(std::unique_ptr<parquet::arrow::FileReader> reader)
    : reader_(std::move(reader)) {}
3199

32100
Result<std::shared_ptr<arrow::RecordBatch>> GetRecordAtOffset(arrow::RecordBatchReader* reader, int64_t offset) {

cpp/src/format/parquet/file_writer.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
#include "filesystem/fs.h"
2121
#include <boost/variant.hpp>
2222
#include "common/config.h"
23-
#include "packed/utils/serde.h"
23+
#include "common/serde.h"
2424
#include "filesystem/s3/multi_part_upload_s3_fs.h"
2525

2626
namespace milvus_storage {

cpp/src/packed/reader.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
#include "common/log.h"
2424
#include "packed/chunk_manager.h"
2525
#include "common/config.h"
26-
#include "packed/utils/serde.h"
26+
#include "common/serde.h"
2727

2828
namespace milvus_storage {
2929

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright 2024 Zilliz
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "../../packed/packed_test_base.h"
16+
#include "format/parquet/file_reader.h"
17+
namespace milvus_storage {
18+
19+
class FileReaderTest : public PackedTestBase {};
20+
21+
// Writes the fixture record batch repeatedly through PackedRecordBatchWriter, then checks
// FileRecordBatchReader's error paths, a full read compared against PackedRecordBatchReader,
// and a read restricted to a single row group.
TEST_F(FileReaderTest, FileRecordBatchReader) {
  int write_count = 100;

  PackedRecordBatchWriter writer(writer_memory_, schema_, *fs_, file_path_, storage_config_, props_);
  for (int written = 0; written < write_count; ++written) {
    EXPECT_TRUE(writer.Write(record_batch_).ok());
  }
  EXPECT_TRUE(writer.Close().ok());

  std::vector<std::shared_ptr<arrow::Field>> expected_fields = {
      arrow::field("int32", arrow::int32()),
      arrow::field("int64", arrow::int64()),
      arrow::field("str", arrow::utf8()),
  };
  auto schema = arrow::schema(expected_fields);

  // Row group offset exceeds the row group range: should throw out_of_range.
  std::string path = file_path_ + "/0";
  EXPECT_THROW(FileRecordBatchReader fr(*fs_, path, schema, reader_memory_, 100), std::out_of_range);

  // File does not exist: should throw runtime_error.
  path = file_path_ + "/file_not_exist";
  EXPECT_THROW(FileRecordBatchReader fr(*fs_, path, schema, reader_memory_), std::runtime_error);

  // Read all row groups.
  path = file_path_ + "/0";
  FileRecordBatchReader fr(*fs_, path, schema, reader_memory_);
  ASSERT_AND_ARROW_ASSIGN(auto fr_table, fr.ToTable());
  ASSERT_STATUS_OK(fr.Close());

  // Read the same file through the packed reader and compare row counts.
  std::set<int> needed_columns = {0, 1, 2};
  std::vector<ColumnOffset> column_offsets = {
      ColumnOffset(0, 0),
      ColumnOffset(0, 1),
      ColumnOffset(0, 2),
  };
  PackedRecordBatchReader pr(*fs_, {path}, schema, column_offsets, needed_columns, reader_memory_);
  ASSERT_AND_ARROW_ASSIGN(auto pr_table, pr.ToTable());
  ASSERT_STATUS_OK(pr.Close());
  ASSERT_EQ(fr_table->num_rows(), pr_table->num_rows());

  // Read only row group 1; it must hold fewer rows than the whole file.
  path = file_path_ + "/0";
  FileRecordBatchReader rgr(*fs_, path, schema, reader_memory_, 1, 1);
  ASSERT_AND_ARROW_ASSIGN(auto rg_table, rgr.ToTable());
  ASSERT_STATUS_OK(rgr.Close());
  ASSERT_GT(fr_table->num_rows(), rg_table->num_rows());
}
69+
70+
} // namespace milvus_storage

go/Makefile

+10-4
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,25 @@ CFLAGS += $(CONAN_CFLAGS)
88
CXXFLAGS += $(CONAN_CXXFLAGS)
99
INCLUDE_DIRS = $(CONAN_INCLUDE_DIRS_ARROW) $(MILVUS_STORAGE_INCLUDE_DIR)
1010
CPPFLAGS = $(addprefix -I, $(INCLUDE_DIRS))
11-
LDFLAGS += $(addprefix -L, $(MILVUS_STORAGE_LD_DIR))
11+
LDFLAGS += $(addprefix -L, $(MILVUS_STORAGE_LD_DIR)) -Wl,-rpath,$(MILVUS_STORAGE_LD_DIR)
1212

1313
.EXPORT_ALL_VARIABLES:
14-
.PHONY: build
14+
.PHONY: build test proto
1515

1616
build:
17+
@echo "CPPFLAGS: $(CPPFLAGS)"
18+
@echo "LDFLAGS: $(LDFLAGS)"
1719
CGO_CFLAGS="$(CPPFLAGS)" CGO_LDFLAGS="$(LDFLAGS) -lmilvus-storage" go build ./...
1820

1921
test:
20-
CGO_CFLAGS="$(CPPFLAGS)" CGO_LDFLAGS="$(LDFLAGS) -Wl,-rpath,$(MILVUS_STORAGE_LD_DIR) -lmilvus-storage" go test -timeout 30s ./...
22+
LD_LIBRARY_PATH=$(MILVUS_STORAGE_LD_DIR):$$LD_LIBRARY_PATH \
23+
CGO_CFLAGS="$(CPPFLAGS)" \
24+
CGO_LDFLAGS="$(LDFLAGS) -lmilvus-storage" \
25+
go test -timeout 30s ./...
26+
2127
proto:
2228
mkdir -p proto/manifest_proto
2329
mkdir -p proto/schema_proto
2430
protoc -I="proto" --go_out=paths=source_relative:./proto/manifest_proto proto/manifest.proto
25-
protoc -I="proto" --go_out=paths=source_relative:./proto/schema_proto proto/storage_schema.proto
2631

32+
protoc -I="proto" --go_out=paths=source_relative:./proto/schema_proto proto/storage_schema.proto

0 commit comments

Comments
 (0)