Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat]: Packed reader multi files test #161

Merged
merged 3 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ on:
jobs:
test:
name: Test
runs-on: ubuntu-latest
runs-on: ubuntu-22.04

steps:
- name: Checkout code
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/cpp-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ on:
- '!go/**'
jobs:
unittest:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3

Expand Down
16 changes: 9 additions & 7 deletions cpp/include/milvus-storage/common/macro.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,18 @@ namespace milvus_storage {
#undef RETURN_NOT_OK
#define RETURN_NOT_OK(status) \
do { \
if (!(status).ok()) { \
return status; \
auto _s = (status); \
if (!_s.ok()) { \
return _s; \
} \
} while (false)

#define RETURN_ARROW_NOT_OK(status) \
do { \
if (!(status).ok()) { \
return Status::ArrowError((status).ToString()); \
} \
#define RETURN_ARROW_NOT_OK(status) \
do { \
auto _s = (status); \
if (!_s.ok()) { \
return Status::ArrowError((_s).ToString()); \
} \
} while (false)

#define RETURN_ARROW_NOT_OK_WITH_PREFIX(msg, staus) \
Expand Down
8 changes: 7 additions & 1 deletion cpp/include/milvus-storage/packed/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& file_path,
const std::shared_ptr<arrow::Schema> schema,
const std::set<int>& needed_columns,
std::set<int>& needed_columns,
const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);

std::shared_ptr<arrow::Schema> schema() const override;
Expand All @@ -53,6 +53,12 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
arrow::Status Close() override;

private:
void initialize(arrow::fs::FileSystem& fs,
const std::string& file_path,
const std::shared_ptr<arrow::Schema> schema,
std::set<int>& needed_columns,
const int64_t buffer_size);

Status initializeColumnOffsets(arrow::fs::FileSystem& fs, const std::set<int>& needed_columns, size_t num_fields);
// Advance buffer to fill the expected buffer size
arrow::Status advanceBuffer();
Expand Down
26 changes: 24 additions & 2 deletions cpp/include/milvus-storage/packed/reader_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,37 @@ typedef void* CPackedReader;
typedef void* CArrowArray;
typedef void* CArrowSchema;

int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out);

/**
* @brief Open a packed reader to read needed columns in the specified path.
*
* @param path The root path of the packed files to read.
* @param schema The original schema of data.
* @param buffer_size The max buffer size of the packed reader.
* @param needed_columns Array of column indices to read. If it is empty
*        (num_needed_columns == 0), all columns of the schema will be read.
* @param num_needed_columns Number of entries in the needed_columns array; the
*        implementation reads exactly this many ints from needed_columns.
* @param c_packed_reader The output pointer of the packed reader. On success it
*        receives a reader handle that the caller owns.
*        NOTE(review): presumably the handle must be released with CloseReader —
*        confirm against the implementation.
* @return 0 on success.
*         NOTE(review): the failure code(s) are not visible here — confirm
*         against the implementation.
*/
int NewPackedReader(const char* path,
struct ArrowSchema* schema,
const int64_t buffer_size,
int* needed_columns,
int num_needed_columns,
CPackedReader* c_packed_reader);

/**
* @brief Read the next record batch from the packed reader.
* By default, the maximum return batch is 1024 rows.
*
* @param c_packed_reader The packed reader to read.
* @param out_array The output pointer of the arrow array.
* @param out_schema The output pointer of the arrow schema.
* @return An int status code.
*         NOTE(review): presumably 0 on success and non-zero on failure, like
*         NewPackedReader — confirm against the implementation. How end-of-stream
*         is reported is not visible from this declaration.
*/
int ReadNext(CPackedReader c_packed_reader, CArrowArray* out_array, CArrowSchema* out_schema);

/**
* @brief Close the packed reader and release the resources.
*
* @param c_packed_reader The packed reader to close.
* @return An int status code.
*         NOTE(review): presumably 0 on success — confirm against the
*         implementation. Whether the handle may be reused after this call is
*         not visible from this declaration.
*/
int CloseReader(CPackedReader c_packed_reader);

#ifdef __cplusplus
Expand Down
2 changes: 0 additions & 2 deletions cpp/src/packed/column_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
#include <arrow/table.h>
#include "common/status.h"

using namespace std;

namespace milvus_storage {

ColumnGroup::ColumnGroup(GroupId group_id, const std::vector<int>& origin_column_indices)
Expand Down
15 changes: 14 additions & 1 deletion cpp/src/packed/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace milvus_storage {
PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& file_path,
const std::shared_ptr<arrow::Schema> schema,
const std::set<int>& needed_columns,
std::set<int>& needed_columns,
const int64_t buffer_size)
: file_path_(file_path),
schema_(schema),
Expand All @@ -40,6 +40,19 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
row_limit_(0),
absolute_row_position_(0),
read_count_(0) {
initialize(fs, file_path_, schema_, needed_columns, buffer_size);
}

void PackedRecordBatchReader::initialize(arrow::fs::FileSystem& fs,
const std::string& file_path,
const std::shared_ptr<arrow::Schema> schema,
std::set<int>& needed_columns,
const int64_t buffer_size) {
if (needed_columns.empty()) {
for (int i = 0; i < schema->num_fields(); i++) {
needed_columns.insert(i);
}
}
auto status = initializeColumnOffsets(fs, needed_columns, schema->num_fields());
if (!status.ok()) {
throw std::runtime_error(status.ToString());
Expand Down
39 changes: 8 additions & 31 deletions cpp/src/packed/reader_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,11 @@
#include <arrow/status.h>
#include <memory>

int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out) {
auto truePath = std::string(path);
auto factory = std::make_shared<milvus_storage::FileSystemFactory>();
auto conf = milvus_storage::StorageConfig();
conf.uri = "file:///tmp/";
auto r = factory->BuildFileSystem(conf, &truePath);
if (!r.ok()) {
LOG_STORAGE_ERROR_ << "Error building filesystem: " << path;
return -2;
}
auto trueFs = r.value();
auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
std::set<int> needed_columns;
for (int i = 0; i < trueSchema->num_fields(); i++) {
needed_columns.emplace(i);
}
auto reader =
std::make_shared<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema, needed_columns, buffer_size);
auto status = ExportRecordBatchReader(reader, out);
if (!status.ok()) {
LOG_STORAGE_ERROR_ << "Error exporting record batch reader" << status.ToString();
return static_cast<int>(status.code());
}
return 0;
}

int NewPackedReader(const char* path,
struct ArrowSchema* schema,
const int64_t buffer_size,
int* needed_columns,
int num_needed_columns,
CPackedReader* c_packed_reader) {
try {
auto truePath = std::string(path);
Expand All @@ -60,12 +36,13 @@ int NewPackedReader(const char* path,
conf.uri = "file:///tmp/";
auto trueFs = factory->BuildFileSystem(conf, &truePath).value();
auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
std::set<int> needed_columns;
for (int i = 0; i < trueSchema->num_fields(); i++) {
needed_columns.emplace(i);
std::set<int> trueNeededColumns;
for (int i = 0; i < num_needed_columns; i++) {
trueNeededColumns.insert(needed_columns[i]);
}
auto reader = std::make_unique<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema, needed_columns,
buffer_size);

auto reader = std::make_unique<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema,
trueNeededColumns, buffer_size);
*c_packed_reader = reader.release();
return 0;
} catch (std::exception& e) {
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/packed/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,13 @@ Status PackedRecordBatchWriter::writeWithSplitIndex(const std::shared_ptr<arrow:
// Flush column groups until there's enough room for the new column groups
// to ensure that memory usage stays strictly below the limit
while (current_memory_usage_ + next_batch_size >= memory_limit_ && !max_heap_.empty()) {
LOG_STORAGE_DEBUG_ << "Current memory usage: " << current_memory_usage_
LOG_STORAGE_DEBUG_ << "Current memory usage: " << current_memory_usage_ / 1024 / 1024 << " MB, "
<< ", flushing column group: " << max_heap_.top().first;
auto max_group = max_heap_.top();
max_heap_.pop();
current_memory_usage_ -= max_group.second;

ColumnGroupWriter* writer = group_writers_[max_group.first].get();
max_heap_.pop();
RETURN_NOT_OK(writer->Flush());
}

Expand Down
4 changes: 0 additions & 4 deletions cpp/test/packed/packed_integration_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ TEST_F(PackedIntegrationTest, TestOneFile) {
}
EXPECT_TRUE(writer.Close().ok());

std::vector<std::string> paths = {file_path_ + "/0"};

std::set<int> needed_columns = {0, 1, 2};

PackedRecordBatchReader pr(*fs_, file_path_, schema_, needed_columns, reader_memory_);
Expand All @@ -47,8 +45,6 @@ TEST_F(PackedIntegrationTest, TestSplitColumnGroup) {
}
EXPECT_TRUE(writer.Close().ok());

std::vector<std::string> paths = {file_path_ + "/0", file_path_ + "/1"};

std::set<int> needed_columns = {0, 1, 2};

PackedRecordBatchReader pr(*fs_, file_path_, schema_, needed_columns, reader_memory_);
Expand Down
2 changes: 1 addition & 1 deletion go/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ test:
LD_LIBRARY_PATH=$(MILVUS_STORAGE_LD_DIR):$$LD_LIBRARY_PATH \
CGO_CFLAGS="$(CPPFLAGS)" \
CGO_LDFLAGS="$(LDFLAGS) -lmilvus-storage" \
go test -count=1 -timeout 30s ./...
go test -count=1 -timeout 30s ./... -gcflags "all=-N -l" -o gdb/

proto:
mkdir -p proto/manifest_proto
Expand Down
27 changes: 27 additions & 0 deletions go/packed/packed_option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2023 Zilliz
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package packed

// packedReaderOption carries the optional settings used when creating a
// packed reader.
type packedReaderOption struct {
// needColumns lists the indices of the columns to read. The C API documents
// that an empty list means all columns are read.
needColumns []int
}

// WithNeededColumns sets the indices of the columns the reader should read.
// The slice is stored as-is (not copied), so later changes the caller makes to
// its elements are visible through the option.
func (opt *packedReaderOption) WithNeededColumns(columns []int) {
opt.needColumns = columns
}

// NewPackedReaderOption returns a pointer to a zero-valued packedReaderOption,
// i.e. one with no needed columns set.
func NewPackedReaderOption() *packedReaderOption {
	option := new(packedReaderOption)
	return option
}
9 changes: 7 additions & 2 deletions go/packed/packed_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/apache/arrow/go/v12/arrow/cdata"
)

func newPackedReader(path string, schema *arrow.Schema, bufferSize int) (*PackedReader, error) {
func newPackedReader(path string, schema *arrow.Schema, bufferSize int, opt *packedReaderOption) (*PackedReader, error) {
var cas cdata.CArrowSchema
cdata.ExportArrowSchema(schema, &cas)
cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
Expand All @@ -42,7 +42,12 @@ func newPackedReader(path string, schema *arrow.Schema, bufferSize int) (*Packed
cBufferSize := C.int64_t(bufferSize)

var cPackedReader C.CPackedReader
status := C.NewPackedReader(cPath, cSchema, cBufferSize, &cPackedReader)
cNeedColumns := (*C.int)(C.malloc(C.size_t(len(opt.needColumns)) * C.size_t(unsafe.Sizeof(C.int(0)))))
for i, col := range opt.needColumns {
(*[1<<31 - 1]C.int)(unsafe.Pointer(cNeedColumns))[i] = C.int(col)
}
cNumNeededColumns := C.int(len(opt.needColumns))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is too much for my small heart. Can we make it simpler? The needed columns can be just a bitset with several words.

status := C.NewPackedReader(cPath, cSchema, cBufferSize, cNeedColumns, cNumNeededColumns, &cPackedReader)
if status != 0 {
return nil, errors.New(fmt.Sprintf("failed to new packed reader: %s, status: %d", path, status))
}
Expand Down
Loading