Skip to content

Commit f51fd09

Browse files
[feat]: CGO packed writer api (#160)
related: #158 1. Put parquet writer properties into storage config 2. Packed writer close() API return column offset mapping. 3. Add CGO and Go API for packed writer --------- Signed-off-by: shaoting-huang <[email protected]>
1 parent 51b7244 commit f51fd09

33 files changed

+695
-275
lines changed

cpp/benchmark/benchmark_packed.cpp

+8-5
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const st
8484

8585
auto paths = std::vector<std::string>{path + "/0", path + "/1"};
8686

87-
// after writing, the column of large_str is in 0th file, and the last int64 columns are in 1st file
87+
// after writing, the pk and the ts are in the first file, and the large str is in the second file
8888
std::vector<std::shared_ptr<arrow::Field>> fields = {
8989
arrow::field("int", arrow::utf8()),
9090
arrow::field("int64", arrow::int32()),
@@ -93,7 +93,7 @@ static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const st
9393
auto schema = arrow::schema(fields);
9494

9595
for (auto _ : st) {
96-
PackedRecordBatchReader pr(*fs, paths, schema, column_offsets, needed_columns, buffer_size);
96+
PackedRecordBatchReader pr(*fs, path, schema, needed_columns, buffer_size);
9797
auto r = arrow::RecordBatch::MakeEmpty(schema);
9898
SKIP_IF_NOT_OK(r.status(), st)
9999
auto rb = r.ValueOrDie();
@@ -107,7 +107,10 @@ static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const st
107107
}
108108
}
109109

110-
static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const std::string& path, size_t buffer_size) {
110+
static void PackedWrite(benchmark::State& st,
111+
std::shared_ptr<arrow::fs::FileSystem> fs,
112+
std::string& path,
113+
size_t buffer_size) {
111114
auto schema = arrow::schema({arrow::field("int32", arrow::int32()), arrow::field("int64", arrow::int64()),
112115
arrow::field("str", arrow::utf8())});
113116
arrow::Int32Builder int_builder;
@@ -134,7 +137,7 @@ static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const s
134137
auto conf = StorageConfig();
135138
conf.use_custom_part_upload_size = true;
136139
conf.part_size = 30 * 1024 * 1024;
137-
PackedRecordBatchWriter writer(buffer_size, schema, *fs, path, conf, *parquet::default_writer_properties());
140+
PackedRecordBatchWriter writer(buffer_size, schema, fs, path, conf);
138141
for (int i = 0; i < 8 * 1024; ++i) {
139142
auto r = writer.Write(record_batch);
140143
if (!r.ok()) {
@@ -153,7 +156,7 @@ std::string PATH = "/tmp/bench/foo";
153156

154157
BENCHMARK_DEFINE_F(S3Fixture, Write32MB)(benchmark::State& st) {
155158
SKIP_IF_NOT_OK(fs_->CreateDir(PATH), st);
156-
PackedWrite(st, fs_.get(), PATH, 22 * 1024 * 1024);
159+
PackedWrite(st, fs_, PATH, 22 * 1024 * 1024);
157160
}
158161
BENCHMARK_REGISTER_F(S3Fixture, Write32MB)->UseRealTime();
159162

cpp/include/milvus-storage/common/config.h

+13-13
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,23 @@
1515
#pragma once
1616

1717
#include <sstream>
18+
#include <parquet/properties.h>
1819

1920
using namespace std;
2021

2122
namespace milvus_storage {
2223

24+
static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 1024 * 1024; // 1 MB
25+
26+
// https://github.com/apache/arrow/blob/6b268f62a8a172249ef35f093009c740c32e1f36/cpp/src/arrow/filesystem/s3fs.cc#L1596
27+
static constexpr int64_t ARROW_PART_UPLOAD_SIZE = 10 * 1024 * 1024; // 10 MB
28+
29+
static constexpr int64_t MIN_BUFFER_SIZE_PER_FILE = DEFAULT_MAX_ROW_GROUP_SIZE + ARROW_PART_UPLOAD_SIZE;
30+
31+
// Default number of rows to read when using ::arrow::RecordBatchReader
32+
static constexpr int64_t DEFAULT_READ_BATCH_SIZE = 1024;
33+
static constexpr int64_t DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024;
34+
2335
struct StorageConfig {
2436
std::string uri = "";
2537
std::string bucket_name = "";
@@ -31,6 +43,7 @@ struct StorageConfig {
3143
std::string region = "";
3244
bool use_custom_part_upload_size = false;
3345
int64_t part_size = 0;
46+
parquet::WriterProperties writer_props = *parquet::default_writer_properties();
3447

3548
std::string ToString() const {
3649
std::stringstream ss;
@@ -42,17 +55,4 @@ struct StorageConfig {
4255
}
4356
};
4457

45-
static constexpr int64_t DEFAULT_MAX_ROW_GROUP_SIZE = 1024 * 1024; // 1 MB
46-
47-
// https://github.com/apache/arrow/blob/6b268f62a8a172249ef35f093009c740c32e1f36/cpp/src/arrow/filesystem/s3fs.cc#L1596
48-
static constexpr int64_t ARROW_PART_UPLOAD_SIZE = 10 * 1024 * 1024; // 10 MB
49-
50-
static constexpr int64_t MIN_BUFFER_SIZE_PER_FILE = DEFAULT_MAX_ROW_GROUP_SIZE + ARROW_PART_UPLOAD_SIZE;
51-
52-
static const std::string ROW_GROUP_SIZE_META_KEY = "row_group_size";
53-
54-
// Default number of rows to read when using ::arrow::RecordBatchReader
55-
static constexpr int64_t DEFAULT_READ_BATCH_SIZE = 1024;
56-
static constexpr int64_t DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024;
57-
5858
} // namespace milvus_storage

cpp/include/milvus-storage/common/path_util.h

+8-4
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@ namespace milvus_storage {
2121

2222
constexpr char kSep = '/';
2323

24-
arrow::Status NotAFile(std::string_view path) {
24+
static inline arrow::Status NotAFile(std::string_view path) {
2525
return arrow::Status::IOError("Not a regular file: " + std::string(path));
2626
}
2727

28-
bool HasTrailingSlash(std::string_view s) { return !s.empty() && s.back() == kSep; }
28+
static inline bool HasTrailingSlash(std::string_view s) { return !s.empty() && s.back() == kSep; }
2929

30-
std::string EnsureTrailingSlash(std::string_view v) {
30+
static inline std::string EnsureTrailingSlash(std::string_view v) {
3131
if (!v.empty() && !HasTrailingSlash(v)) {
3232
// XXX How about "C:" on Windows? We probably don't want to turn it into "C:/"...
3333
// Unless the local filesystem always uses absolute paths
@@ -37,7 +37,7 @@ std::string EnsureTrailingSlash(std::string_view v) {
3737
}
3838
}
3939

40-
std::pair<std::string, std::string> GetAbstractPathParent(const std::string& s) {
40+
static inline std::pair<std::string, std::string> GetAbstractPathParent(const std::string& s) {
4141
// XXX should strip trailing slash?
4242

4343
auto pos = s.find_last_of(kSep);
@@ -48,4 +48,8 @@ std::pair<std::string, std::string> GetAbstractPathParent(const std::string& s)
4848
return {s.substr(0, pos), s.substr(pos + 1)};
4949
}
5050

51+
static inline std::string ConcatenateFilePath(const std::string& parent, const std::string& child) {
52+
return parent + kSep + child;
53+
}
54+
5155
} // namespace milvus_storage

cpp/include/milvus-storage/common/serde.h

+58-2
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,78 @@
1818

1919
namespace milvus_storage {
2020

21+
static const std::string GROUP_DELIMITER = ";";
22+
static const std::string COLUMN_DELIMITER = ",";
23+
static const std::string ROW_GROUP_SIZE_META_KEY = "row_group_size";
24+
static const std::string COLUMN_OFFSETS_META_KEY = "column_offsets";
25+
2126
class PackedMetaSerde {
2227
public:
2328
// Serialize a vector of size_t to a byte array and convert it to a string
24-
static std::string serialize(const std::vector<size_t>& sizes) {
29+
static std::string SerializeRowGroupSizes(const std::vector<size_t>& sizes) {
2530
std::vector<uint8_t> byteArray(sizes.size() * sizeof(size_t));
2631
std::memcpy(byteArray.data(), sizes.data(), byteArray.size());
2732
return std::string(byteArray.begin(), byteArray.end());
2833
}
2934

3035
// Deserialize a string back to a vector of size_t
31-
static std::vector<size_t> deserialize(const std::string& input) {
36+
static std::vector<size_t> DeserializeRowGroupSizes(const std::string& input) {
3237
std::vector<uint8_t> byteArray(input.begin(), input.end());
3338
std::vector<size_t> sizes(byteArray.size() / sizeof(size_t));
3439
std::memcpy(sizes.data(), byteArray.data(), byteArray.size());
3540
return sizes;
3641
}
42+
43+
static std::string SerializeColumnOffsets(const std::vector<std::vector<int>>& column_offsets) {
44+
std::stringstream ss;
45+
for (size_t i = 0; i < column_offsets.size(); ++i) {
46+
if (i > 0) {
47+
ss << GROUP_DELIMITER;
48+
}
49+
50+
for (size_t j = 0; j < column_offsets[i].size(); ++j) {
51+
if (j > 0) {
52+
ss << COLUMN_DELIMITER;
53+
}
54+
ss << column_offsets[i][j];
55+
}
56+
}
57+
58+
auto s = ss.str();
59+
return s;
60+
}
61+
62+
static std::vector<std::vector<int>> DeserializeColumnOffsets(const std::string& input) {
63+
std::vector<std::vector<int>> column_offsets;
64+
65+
size_t group_start = 0;
66+
size_t group_end = input.find(GROUP_DELIMITER);
67+
68+
while (group_start != std::string::npos) {
69+
std::string group = input.substr(group_start, group_end - group_start);
70+
std::vector<int> group_indices;
71+
72+
size_t column_start = 0;
73+
size_t column_end = group.find(COLUMN_DELIMITER);
74+
while (column_start != std::string::npos) {
75+
std::string column = group.substr(column_start, column_end - column_start);
76+
if (!column.empty()) {
77+
group_indices.push_back(std::stoi(column));
78+
}
79+
column_start = (column_end == std::string::npos) ? std::string::npos : column_end + COLUMN_DELIMITER.size();
80+
column_end = group.find(COLUMN_DELIMITER, column_start);
81+
}
82+
83+
if (!group_indices.empty()) {
84+
column_offsets.push_back(group_indices);
85+
}
86+
87+
group_start = (group_end == std::string::npos) ? std::string::npos : group_end + GROUP_DELIMITER.size();
88+
group_end = input.find(GROUP_DELIMITER, group_start);
89+
}
90+
91+
return column_offsets;
92+
}
3793
};
3894

3995
} // namespace milvus_storage

cpp/include/milvus-storage/common/status.h

+5
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ class Status {
3737

3838
static Status WriterError(const std::string& msg) { return Status(kWriterError, msg); }
3939

40+
static Status ReaderError(const std::string& msg) { return Status(kReaderError, msg); }
41+
4042
static Status IOError(const std::string& msg) { return Status(kIOError, msg); }
4143

4244
bool ok() const { return code_ == kOk; }
@@ -51,6 +53,8 @@ class Status {
5153

5254
bool IsWriterError() const { return code_ == kWriterError; }
5355

56+
bool IsReaderError() const { return code_ == kReaderError; }
57+
5458
bool IsIOError() const { return code_ == kIOError; }
5559

5660
std::string ToString() const;
@@ -64,6 +68,7 @@ class Status {
6468
kFileNotFound = 4,
6569
kWriterError = 5,
6670
kIOError = 6,
71+
kReaderError = 7
6772
};
6873

6974
explicit Status(Code code, const std::string& msg = "") : code_(code), msg_(msg) {}

cpp/include/milvus-storage/format/parquet/file_writer.h

+2-10
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
#pragma once
1616

17-
#include <parquet/properties.h>
1817
#include <memory>
1918
#include "arrow/filesystem/filesystem.h"
2019
#include "format/writer.h"
@@ -27,19 +26,11 @@ namespace milvus_storage {
2726

2827
class ParquetFileWriter : public FileWriter {
2928
public:
30-
// with default WriterProperties
3129
ParquetFileWriter(std::shared_ptr<arrow::Schema> schema,
3230
arrow::fs::FileSystem& fs,
3331
const std::string& file_path,
3432
const StorageConfig& storage_config);
3533

36-
// with custom WriterProperties
37-
ParquetFileWriter(std::shared_ptr<arrow::Schema> schema,
38-
arrow::fs::FileSystem& fs,
39-
const std::string& file_path,
40-
const StorageConfig& storage_config,
41-
const parquet::WriterProperties& props);
42-
4334
Status Init() override;
4435

4536
Status Write(const arrow::RecordBatch& record) override;
@@ -49,6 +40,8 @@ class ParquetFileWriter : public FileWriter {
4940
Status WriteRecordBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
5041
const std::vector<size_t>& batch_memory_sizes);
5142

43+
void AppendKVMetadata(const std::string& key, const std::string& value);
44+
5245
int64_t count() override;
5346

5447
Status Close() override;
@@ -61,7 +54,6 @@ class ParquetFileWriter : public FileWriter {
6154

6255
std::unique_ptr<parquet::arrow::FileWriter> writer_;
6356
std::shared_ptr<arrow::KeyValueMetadata> kv_metadata_;
64-
parquet::WriterProperties props_;
6557
int64_t count_ = 0;
6658
int row_group_num_ = 0;
6759
std::vector<size_t> row_group_sizes_;

cpp/include/milvus-storage/packed/chunk_manager.h

+7
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#pragma once
1616

17+
#include "packed/column_group.h"
1718
#include <parquet/arrow/reader.h>
1819
#include <arrow/filesystem/filesystem.h>
1920
#include <arrow/record_batch.h>
@@ -28,7 +29,13 @@ struct ColumnOffset {
2829
int path_index;
2930
int col_index;
3031

32+
ColumnOffset() = default;
33+
3134
ColumnOffset(int path_index, int col_index) : path_index(path_index), col_index(col_index) {}
35+
36+
std::string ToString() {
37+
return "path_index: " + std::to_string(path_index) + ", col_index: " + std::to_string(col_index);
38+
}
3239
};
3340

3441
// record which chunk is in use and its offset in the file

cpp/include/milvus-storage/packed/column_group.h

+3
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@
1414

1515
#pragma once
1616

17+
#include <arrow/type.h>
1718
#include <arrow/record_batch.h>
1819
#include <queue>
1920
#include "common/status.h"
21+
#include <map>
22+
#include <string>
2023

2124
namespace milvus_storage {
2225

cpp/include/milvus-storage/packed/column_group_writer.h

+2-8
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "common/status.h"
2323
#include "packed/column_group.h"
2424
#include "common/config.h"
25+
#include "common/serde.h"
2526

2627
namespace milvus_storage {
2728

@@ -34,16 +35,9 @@ class ColumnGroupWriter {
3435
const StorageConfig& storage_config,
3536
const std::vector<int>& origin_column_indices);
3637

37-
ColumnGroupWriter(GroupId group_id,
38-
std::shared_ptr<arrow::Schema> schema,
39-
arrow::fs::FileSystem& fs,
40-
const std::string& file_path,
41-
const StorageConfig& storage_config,
42-
const parquet::WriterProperties& props,
43-
const std::vector<int>& origin_column_indices);
44-
4538
Status Init();
4639
Status Write(const std::shared_ptr<arrow::RecordBatch>& record);
40+
Status WriteColumnOffsetsMeta(const std::vector<std::vector<int>>& column_offsets);
4741
Status Flush();
4842
Status Close();
4943
GroupId Group_id() const;

cpp/include/milvus-storage/packed/reader.h

+4-8
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,9 @@ using RowOffsetMinHeap =
4040

4141
class PackedRecordBatchReader : public arrow::RecordBatchReader {
4242
public:
43-
// Test only
4443
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
45-
const std::string& path,
44+
const std::string& file_path,
4645
const std::shared_ptr<arrow::Schema> schema,
47-
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);
48-
49-
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
50-
const std::vector<std::string>& paths,
51-
const std::shared_ptr<arrow::Schema> schema,
52-
const std::vector<ColumnOffset>& column_offsets,
5346
const std::set<int>& needed_columns,
5447
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);
5548

@@ -60,6 +53,7 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
6053
arrow::Status Close() override;
6154

6255
private:
56+
Status initializeColumnOffsets(arrow::fs::FileSystem& fs, const std::set<int>& needed_columns, size_t num_fields);
6357
// Advance buffer to fill the expected buffer size
6458
arrow::Status advanceBuffer();
6559
std::vector<const arrow::Array*> collectChunks(int64_t chunksize) const;
@@ -77,7 +71,9 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
7771
std::unique_ptr<ChunkManager> chunk_manager_;
7872
int64_t absolute_row_position_;
7973
std::vector<ColumnOffset> needed_column_offsets_;
74+
std::set<int> needed_paths_;
8075
std::vector<std::vector<size_t>> row_group_sizes_;
76+
const std::string file_path_;
8177
int read_count_;
8278
};
8379

0 commit comments

Comments
 (0)