packed reader partial columns read
Signed-off-by: shaoting-huang <[email protected]>
shaoting-huang committed Jan 10, 2025
1 parent f51fd09 commit 321ad6d
Showing 14 changed files with 194 additions and 57 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -13,7 +13,7 @@ on:
 jobs:
   test:
     name: Test
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
 
     steps:
       - name: Checkout code
2 changes: 1 addition & 1 deletion .github/workflows/cpp-ci.yml
@@ -11,7 +11,7 @@ on:
       - '!go/**'
 jobs:
   unittest:
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
     steps:
       - uses: actions/checkout@v3
 
16 changes: 9 additions & 7 deletions cpp/include/milvus-storage/common/macro.h
@@ -25,16 +25,18 @@ namespace milvus_storage {
 #undef RETURN_NOT_OK
 #define RETURN_NOT_OK(status) \
   do {                        \
-    if (!(status).ok()) {     \
-      return status;          \
+    auto _s = (status);       \
+    if (!_s.ok()) {           \
+      return _s;              \
     }                         \
   } while (false)
 
-#define RETURN_ARROW_NOT_OK(status)               \
-  do {                                            \
-    if (!(status).ok()) {                         \
-      return Status::ArrowError((status).ToString()); \
-    }                                             \
+#define RETURN_ARROW_NOT_OK(status)               \
+  do {                                            \
+    auto _s = (status);                           \
+    if (!_s.ok()) {                               \
+      return Status::ArrowError((_s).ToString()); \
+    }                                             \
   } while (false)
 
 #define RETURN_ARROW_NOT_OK_WITH_PREFIX(msg, staus) \
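Context for the change above: a macro argument is expanded textually, so the old RETURN_NOT_OK evaluated `status` twice on the failure path, once in the `ok()` check and again in the `return`. Binding it to a local `_s` guarantees a single evaluation. A minimal, self-contained sketch of the hazard; the `Status` struct and `Consume` helper are illustrative stand-ins, not the library's types:

#include <iostream>

// Stand-in for a status type: ok() reports success.
struct Status {
  bool ok_;
  bool ok() const { return ok_; }
};

// Hypothetical helper with a side effect: every call consumes one unit of work.
Status Consume(int& remaining) {
  --remaining;
  return Status{remaining >= 0};
}

// Old form: the macro argument is evaluated at every mention.
#define RETURN_NOT_OK_OLD(status) \
  do {                            \
    if (!(status).ok()) {         \
      return status;              \
    }                             \
  } while (false)

// New form: bind once, then test and return the same value.
#define RETURN_NOT_OK_NEW(status) \
  do {                            \
    auto _s = (status);           \
    if (!_s.ok()) {               \
      return _s;                  \
    }                             \
  } while (false)

Status DrainOld(int& n) {
  RETURN_NOT_OK_OLD(Consume(n));  // on failure, Consume(n) runs a second time
  return Status{true};
}

Status DrainNew(int& n) {
  RETURN_NOT_OK_NEW(Consume(n));  // Consume(n) runs exactly once
  return Status{true};
}

int main() {
  int a = 0, b = 0;
  DrainOld(a);
  DrainNew(b);
  std::cout << a << " " << b << std::endl;  // prints "-2 -1": the old macro re-ran the call
}

The same single-evaluation rule motivates the RETURN_ARROW_NOT_OK rewrite.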
8 changes: 7 additions & 1 deletion cpp/include/milvus-storage/packed/reader.h
@@ -43,7 +43,7 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
   PackedRecordBatchReader(arrow::fs::FileSystem& fs,
                           const std::string& file_path,
                           const std::shared_ptr<arrow::Schema> schema,
-                          const std::set<int>& needed_columns,
+                          std::set<int>& needed_columns,
                           const int64_t buffer_size = DEFAULT_READ_BUFFER_SIZE);
 
   std::shared_ptr<arrow::Schema> schema() const override;
@@ -53,6 +53,12 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
   arrow::Status Close() override;
 
  private:
+  void initialize(arrow::fs::FileSystem& fs,
+                  const std::string& file_path,
+                  const std::shared_ptr<arrow::Schema> schema,
+                  std::set<int>& needed_columns,
+                  const int64_t buffer_size);
+
   Status initializeColumnOffsets(arrow::fs::FileSystem& fs, const std::set<int>& needed_columns, size_t num_fields);
   // Advance buffer to fill the expected buffer size
   arrow::Status advanceBuffer();
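Note that `needed_columns` is now a mutable reference: the reader.cpp change below expands an empty set in place to select every field, which is why the `const` qualifier was dropped. A usage sketch under assumed wiring; the local filesystem, path, and buffer size are illustrative, not taken from this repository:

#include <arrow/api.h>
#include <arrow/filesystem/localfs.h>
#include <arrow/status.h>
#include <memory>
#include <set>
#include <string>

#include "milvus-storage/packed/reader.h"

// Reads only columns 0 and 2 of a packed dataset rooted at `path`.
arrow::Status ReadSubset(const std::string& path, const std::shared_ptr<arrow::Schema>& schema) {
  arrow::fs::LocalFileSystem fs;
  std::set<int> needed_columns = {0, 2};  // pass {} to read every column

  milvus_storage::PackedRecordBatchReader reader(fs, path, schema, needed_columns,
                                                 16 * 1024 * 1024 /* buffer_size */);

  std::shared_ptr<arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(reader.ReadNext(&batch));
    if (batch == nullptr) break;  // end of stream
    // ... consume the projected batch ...
  }
  return reader.Close();
}

Passing an empty set reads the full schema, matching the default behavior documented for the new C API below.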
26 changes: 24 additions & 2 deletions cpp/include/milvus-storage/packed/reader_c.h
@@ -24,15 +24,37 @@ typedef void* CPackedReader;
 typedef void* CArrowArray;
 typedef void* CArrowSchema;
 
-int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out);
-
+/**
+ * @brief Open a packed reader to read needed columns in the specified path.
+ *
+ * @param path The root path of the packed files to read.
+ * @param schema The original schema of data.
+ * @param buffer_size The max buffer size of the packed reader.
+ * @param needed_columns The columns to read. If it is empty, all columns will be read.
+ * @param c_packed_reader The output pointer of the packed reader.
+ */
 int NewPackedReader(const char* path,
                     struct ArrowSchema* schema,
                     const int64_t buffer_size,
+                    int* needed_columns,
+                    int num_needed_columns,
                     CPackedReader* c_packed_reader);
 
+/**
+ * @brief Read the next record batch from the packed reader.
+ * By default, the maximum return batch is 1024 rows.
+ *
+ * @param c_packed_reader The packed reader to read.
+ * @param out_array The output pointer of the arrow array.
+ * @param out_schema The output pointer of the arrow schema.
+ */
 int ReadNext(CPackedReader c_packed_reader, CArrowArray* out_array, CArrowSchema* out_schema);
 
+/**
+ * @brief Close the packed reader and release the resources.
+ *
+ * @param c_packed_reader The packed reader to close.
+ */
 int CloseReader(CPackedReader c_packed_reader);
 
 #ifdef __cplusplus
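A caller-side sketch of the three entry points above. The path and buffer size are made up, the populated ArrowSchema is assumed to come from the Arrow C data interface, and error handling is abbreviated:

#include <arrow/c/abi.h>  // struct ArrowSchema, struct ArrowArray

#include "milvus-storage/packed/reader_c.h"

// Reads columns 0 and 2 from a packed dataset, one batch at a time.
int read_two_columns(struct ArrowSchema* schema) {
  CPackedReader reader = NULL;
  int needed_columns[] = {0, 2};  // an empty list (count 0) selects every column

  int rc = NewPackedReader("/tmp/packed_data", schema, 16 * 1024 * 1024,
                           needed_columns, 2, &reader);
  if (rc != 0) {
    return rc;
  }

  // Each call yields at most 1024 rows.
  CArrowArray out_array = NULL;
  CArrowSchema out_schema = NULL;
  rc = ReadNext(reader, &out_array, &out_schema);
  // ... import the batch via the Arrow C data interface, loop until drained ...

  return CloseReader(reader);
}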
2 changes: 0 additions & 2 deletions cpp/src/packed/column_group.cpp
@@ -17,8 +17,6 @@
 #include <arrow/table.h>
 #include "common/status.h"
 
-using namespace std;
-
 namespace milvus_storage {
 
 ColumnGroup::ColumnGroup(GroupId group_id, const std::vector<int>& origin_column_indices)
15 changes: 14 additions & 1 deletion cpp/src/packed/reader.cpp
@@ -31,7 +31,7 @@ namespace milvus_storage {
 PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
                                                  const std::string& file_path,
                                                  const std::shared_ptr<arrow::Schema> schema,
-                                                 const std::set<int>& needed_columns,
+                                                 std::set<int>& needed_columns,
                                                  const int64_t buffer_size)
     : file_path_(file_path),
       schema_(schema),
@@ -40,6 +40,19 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
       row_limit_(0),
       absolute_row_position_(0),
       read_count_(0) {
+  initialize(fs, file_path_, schema_, needed_columns, buffer_size);
+}
+
+void PackedRecordBatchReader::initialize(arrow::fs::FileSystem& fs,
+                                         const std::string& file_path,
+                                         const std::shared_ptr<arrow::Schema> schema,
+                                         std::set<int>& needed_columns,
+                                         const int64_t buffer_size) {
+  if (needed_columns.empty()) {
+    for (int i = 0; i < schema->num_fields(); i++) {
+      needed_columns.insert(i);
+    }
+  }
   auto status = initializeColumnOffsets(fs, needed_columns, schema->num_fields());
   if (!status.ok()) {
     throw std::runtime_error(status.ToString());
39 changes: 8 additions & 31 deletions cpp/src/packed/reader_c.cpp
@@ -23,35 +23,11 @@
 #include <arrow/status.h>
 #include <memory>
 
-int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out) {
-  auto truePath = std::string(path);
-  auto factory = std::make_shared<milvus_storage::FileSystemFactory>();
-  auto conf = milvus_storage::StorageConfig();
-  conf.uri = "file:///tmp/";
-  auto r = factory->BuildFileSystem(conf, &truePath);
-  if (!r.ok()) {
-    LOG_STORAGE_ERROR_ << "Error building filesystem: " << path;
-    return -2;
-  }
-  auto trueFs = r.value();
-  auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
-  std::set<int> needed_columns;
-  for (int i = 0; i < trueSchema->num_fields(); i++) {
-    needed_columns.emplace(i);
-  }
-  auto reader =
-      std::make_shared<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema, needed_columns, buffer_size);
-  auto status = ExportRecordBatchReader(reader, out);
-  if (!status.ok()) {
-    LOG_STORAGE_ERROR_ << "Error exporting record batch reader" << status.ToString();
-    return static_cast<int>(status.code());
-  }
-  return 0;
-}
-
 int NewPackedReader(const char* path,
                     struct ArrowSchema* schema,
                     const int64_t buffer_size,
+                    int* needed_columns,
+                    int num_needed_columns,
                     CPackedReader* c_packed_reader) {
   try {
     auto truePath = std::string(path);
@@ -60,12 +60,13 @@ int NewPackedReader(const char* path,
     conf.uri = "file:///tmp/";
     auto trueFs = factory->BuildFileSystem(conf, &truePath).value();
     auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
-    std::set<int> needed_columns;
-    for (int i = 0; i < trueSchema->num_fields(); i++) {
-      needed_columns.emplace(i);
+    std::set<int> trueNeededColumns;
+    for (int i = 0; i < num_needed_columns; i++) {
+      trueNeededColumns.insert(needed_columns[i]);
     }
-    auto reader = std::make_unique<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema, needed_columns,
-                                                                            buffer_size);
-
+    auto reader = std::make_unique<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema,
+                                                                            trueNeededColumns, buffer_size);
     *c_packed_reader = reader.release();
     return 0;
   } catch (std::exception& e) {
4 changes: 2 additions & 2 deletions cpp/src/packed/writer.cpp
@@ -97,13 +97,13 @@ Status PackedRecordBatchWriter::writeWithSplitIndex(const std::shared_ptr<arrow:
   // Flush column groups until there's enough room for the new column groups
   // to ensure that memory usage stays strictly below the limit
   while (current_memory_usage_ + next_batch_size >= memory_limit_ && !max_heap_.empty()) {
-    LOG_STORAGE_DEBUG_ << "Current memory usage: " << current_memory_usage_
+    LOG_STORAGE_DEBUG_ << "Current memory usage: " << current_memory_usage_ / 1024 / 1024 << " MB, "
                        << ", flushing column group: " << max_heap_.top().first;
     auto max_group = max_heap_.top();
+    max_heap_.pop();
     current_memory_usage_ -= max_group.second;
 
     ColumnGroupWriter* writer = group_writers_[max_group.first].get();
-    max_heap_.pop();
     RETURN_NOT_OK(writer->Flush());
   }
 
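For context, this loop frees memory by flushing whichever buffered column group is largest, and pairing `max_heap_.pop()` with `top()` keeps the heap in step with the entry being flushed. A standalone sketch of the eviction policy; the {size, id} pair layout here is a simplification, not the real heap entry type:

#include <cstddef>
#include <cstdio>
#include <queue>
#include <utility>

int main() {
  // {buffered bytes, group id}; ordering by the first element makes this a size-ordered max-heap.
  std::priority_queue<std::pair<std::size_t, int>> max_heap;
  max_heap.push({48, 0});
  max_heap.push({96, 1});
  max_heap.push({32, 2});

  std::size_t current_memory_usage = 48 + 96 + 32;
  const std::size_t memory_limit = 128;
  const std::size_t next_batch_size = 24;

  // Flush the largest group until the next batch fits strictly below the limit.
  while (current_memory_usage + next_batch_size >= memory_limit && !max_heap.empty()) {
    auto max_group = max_heap.top();
    max_heap.pop();  // pop immediately after top(), as in the patch
    current_memory_usage -= max_group.first;
    std::printf("flush group %d (%zu bytes), usage now %zu\n",
                max_group.second, max_group.first, current_memory_usage);
  }
  // prints: flush group 1 (96 bytes), usage now 80
}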
4 changes: 0 additions & 4 deletions cpp/test/packed/packed_integration_test.cpp
@@ -27,8 +27,6 @@ TEST_F(PackedIntegrationTest, TestOneFile) {
   }
   EXPECT_TRUE(writer.Close().ok());
 
-  std::vector<std::string> paths = {file_path_ + "/0"};
-
   std::set<int> needed_columns = {0, 1, 2};
 
   PackedRecordBatchReader pr(*fs_, file_path_, schema_, needed_columns, reader_memory_);
@@ -47,8 +45,6 @@ TEST_F(PackedIntegrationTest, TestSplitColumnGroup) {
   }
   EXPECT_TRUE(writer.Close().ok());
 
-  std::vector<std::string> paths = {file_path_ + "/0", file_path_ + "/1"};
-
   std::set<int> needed_columns = {0, 1, 2};
 
   PackedRecordBatchReader pr(*fs_, file_path_, schema_, needed_columns, reader_memory_);
2 changes: 1 addition & 1 deletion go/Makefile
@@ -22,7 +22,7 @@ test:
 	LD_LIBRARY_PATH=$(MILVUS_STORAGE_LD_DIR):$$LD_LIBRARY_PATH \
 	CGO_CFLAGS="$(CPPFLAGS)" \
 	CGO_LDFLAGS="$(LDFLAGS) -lmilvus-storage" \
-	go test -count=1 -timeout 30s ./...
+	go test -count=1 -timeout 30s ./... -gcflags "all=-N -l" -o gdb/
 
 proto:
 	mkdir -p proto/manifest_proto
27 changes: 27 additions & 0 deletions go/packed/packed_option.go
@@ -0,0 +1,27 @@
+// Copyright 2023 Zilliz
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package packed
+
+type packedReaderOption struct {
+	needColumns []int
+}
+
+func (opt *packedReaderOption) WithNeededColumns(columns []int) {
+	opt.needColumns = columns
+}
+
+func NewPackedReaderOption() *packedReaderOption {
+	return &packedReaderOption{}
+}
9 changes: 7 additions & 2 deletions go/packed/packed_reader.go
@@ -31,7 +31,7 @@ import (
 	"github.com/apache/arrow/go/v12/arrow/cdata"
 )
 
-func newPackedReader(path string, schema *arrow.Schema, bufferSize int) (*PackedReader, error) {
+func newPackedReader(path string, schema *arrow.Schema, bufferSize int, opt *packedReaderOption) (*PackedReader, error) {
 	var cas cdata.CArrowSchema
 	cdata.ExportArrowSchema(schema, &cas)
 	cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
@@ -42,7 +42,12 @@ func newPackedReader(path string, schema *arrow.Schema, bufferSize int) (*PackedReader, error) {
 	cBufferSize := C.int64_t(bufferSize)
 
 	var cPackedReader C.CPackedReader
-	status := C.NewPackedReader(cPath, cSchema, cBufferSize, &cPackedReader)
+	cNeedColumns := (*C.int)(C.malloc(C.size_t(len(opt.needColumns)) * C.size_t(unsafe.Sizeof(C.int(0)))))
+	for i, col := range opt.needColumns {
+		(*[1<<31 - 1]C.int)(unsafe.Pointer(cNeedColumns))[i] = C.int(col)
+	}
+	cNumNeededColumns := C.int(len(opt.needColumns))
+	status := C.NewPackedReader(cPath, cSchema, cBufferSize, cNeedColumns, cNumNeededColumns, &cPackedReader)
 	if status != 0 {
 		return nil, errors.New(fmt.Sprintf("failed to new packed reader: %s, status: %d", path, status))
 	}