Skip to content

Commit 2b5a802

Browse files
sgilmore10kevingurneykou
authored
apacheGH-44922: [MATLAB] Add IPC RecordBatchStreamWriter MATLAB class (apache#44925)
### Rationale for this change

To enable support for the IPC Streaming format in the MATLAB interface, we should add a `RecordBatchStreamWriter` class.

### What changes are included in this PR?

Added `arrow.io.ipc.RecordBatchStreamWriter` class.

**Example Usage:**

```matlab
>> city = ["Boston" "Seattle" "Denver" "Juno" "Anchorage" "Chicago"]';
>> daylength = duration(["15:17:01" "15:59:16" "14:59:14" "19:21:23" "14:18:24" "15:13:39"])';
>> matlabTable = table(city, daylength, VariableNames=["City", "DayLength"]);
>> recordBatch1 = arrow.recordBatch(matlabTable(1:4, :))
>> recordBatch2 = arrow.recordBatch(matlabTable(5:end, :));
>> writer = arrow.io.ipc.RecordBatchStreamWriter("daylight.arrow", recordBatch1.Schema);
>> writer.writeRecordBatch(recordBatch1);
>> writer.writeRecordBatch(recordBatch2);
>> writer.close();
```

### Are these changes tested?

Yes. I parameterized the test cases in `test/arrow/io/ipc/tRecordBatchWriter.m` to test the behavior of both `arrow.io.ipc.RecordBatchFileWriter` and `arrow.io.ipc.RecordBatchStreamWriter`.

### Are there any user-facing changes?

Yes. Users can now use `arrow.io.ipc.RecordBatchStreamWriter` to serialize `RecordBatch`es/`Table`s to the Arrow IPC Streaming format.

### Future Directions

1. apache#44923

* GitHub Issue: apache#44922

Lead-authored-by: Sarah Gilmore <[email protected]>
Co-authored-by: Sarah Gilmore <[email protected]>
Co-authored-by: Kevin Gurney <[email protected]>
Co-authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sarah Gilmore <[email protected]>
1 parent ded148c commit 2b5a802

12 files changed

+387
-145
lines changed

matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_file_writer.cc

Lines changed: 2 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@
1818
#include "arrow/matlab/io/ipc/proxy/record_batch_file_writer.h"
1919
#include "arrow/io/file.h"
2020
#include "arrow/matlab/error/error.h"
21-
#include "arrow/matlab/tabular/proxy/record_batch.h"
2221
#include "arrow/matlab/tabular/proxy/schema.h"
23-
#include "arrow/matlab/tabular/proxy/table.h"
2422
#include "arrow/util/utf8.h"
2523

2624
#include "libmexclass/proxy/ProxyManager.h"
@@ -29,11 +27,7 @@ namespace arrow::matlab::io::ipc::proxy {
2927

3028
RecordBatchFileWriter::RecordBatchFileWriter(
3129
const std::shared_ptr<arrow::ipc::RecordBatchWriter> writer)
32-
: writer{std::move(writer)} {
33-
REGISTER_METHOD(RecordBatchFileWriter, close);
34-
REGISTER_METHOD(RecordBatchFileWriter, writeRecordBatch);
35-
REGISTER_METHOD(RecordBatchFileWriter, writeTable);
36-
}
30+
: RecordBatchWriter(std::move(writer)) {}
3731

3832
libmexclass::proxy::MakeResult RecordBatchFileWriter::make(
3933
const libmexclass::proxy::FunctionArguments& constructor_arguments) {
@@ -65,43 +59,4 @@ libmexclass::proxy::MakeResult RecordBatchFileWriter::make(
6559
return std::make_shared<RecordBatchFileWriterProxy>(std::move(writer));
6660
}
6761

68-
void RecordBatchFileWriter::writeRecordBatch(
69-
libmexclass::proxy::method::Context& context) {
70-
namespace mda = ::matlab::data;
71-
using RecordBatchProxy = ::arrow::matlab::tabular::proxy::RecordBatch;
72-
73-
mda::StructArray opts = context.inputs[0];
74-
const mda::TypedArray<uint64_t> record_batch_proxy_id_mda =
75-
opts[0]["RecordBatchProxyID"];
76-
const uint64_t record_batch_proxy_id = record_batch_proxy_id_mda[0];
77-
78-
auto proxy = libmexclass::proxy::ProxyManager::getProxy(record_batch_proxy_id);
79-
auto record_batch_proxy = std::static_pointer_cast<RecordBatchProxy>(proxy);
80-
auto record_batch = record_batch_proxy->unwrap();
81-
82-
MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT(writer->WriteRecordBatch(*record_batch), context,
83-
error::IPC_RECORD_BATCH_WRITE_FAILED);
84-
}
85-
86-
void RecordBatchFileWriter::writeTable(libmexclass::proxy::method::Context& context) {
87-
namespace mda = ::matlab::data;
88-
using TableProxy = ::arrow::matlab::tabular::proxy::Table;
89-
90-
mda::StructArray opts = context.inputs[0];
91-
const mda::TypedArray<uint64_t> table_proxy_id_mda = opts[0]["TableProxyID"];
92-
const uint64_t table_proxy_id = table_proxy_id_mda[0];
93-
94-
auto proxy = libmexclass::proxy::ProxyManager::getProxy(table_proxy_id);
95-
auto table_proxy = std::static_pointer_cast<TableProxy>(proxy);
96-
auto table = table_proxy->unwrap();
97-
98-
MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT(writer->WriteTable(*table), context,
99-
error::IPC_RECORD_BATCH_WRITE_FAILED);
100-
}
101-
102-
void RecordBatchFileWriter::close(libmexclass::proxy::method::Context& context) {
103-
MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT(writer->Close(), context,
104-
error::IPC_RECORD_BATCH_WRITE_CLOSE_FAILED);
105-
}
106-
107-
} // namespace arrow::matlab::io::ipc::proxy
62+
} // namespace arrow::matlab::io::ipc::proxy

matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_file_writer.h

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,20 @@
1616
// under the License.
1717

1818
#include "arrow/ipc/writer.h"
19+
#include "arrow/matlab/io/ipc/proxy/record_batch_writer.h"
20+
1921
#include "libmexclass/proxy/Proxy.h"
2022

2123
namespace arrow::matlab::io::ipc::proxy {
2224

23-
class RecordBatchFileWriter : public libmexclass::proxy::Proxy {
25+
class RecordBatchFileWriter : public RecordBatchWriter {
2426
public:
2527
RecordBatchFileWriter(std::shared_ptr<arrow::ipc::RecordBatchWriter> writer);
2628

27-
~RecordBatchFileWriter() = default;
29+
virtual ~RecordBatchFileWriter() = default;
2830

2931
static libmexclass::proxy::MakeResult make(
3032
const libmexclass::proxy::FunctionArguments& constructor_arguments);
31-
32-
protected:
33-
std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
34-
35-
void writeRecordBatch(libmexclass::proxy::method::Context& context);
36-
37-
void writeTable(libmexclass::proxy::method::Context& context);
38-
39-
void close(libmexclass::proxy::method::Context& context);
4033
};
4134

42-
} // namespace arrow::matlab::io::ipc::proxy
35+
} // namespace arrow::matlab::io::ipc::proxy
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "arrow/matlab/io/ipc/proxy/record_batch_stream_writer.h"
19+
#include "arrow/io/file.h"
20+
#include "arrow/ipc/writer.h"
21+
#include "arrow/matlab/error/error.h"
22+
#include "arrow/matlab/tabular/proxy/schema.h"
23+
#include "arrow/util/utf8.h"
24+
25+
#include "libmexclass/proxy/ProxyManager.h"
26+
27+
namespace arrow::matlab::io::ipc::proxy {
28+
29+
// Constructs the stream writer proxy around an already-created Arrow IPC
// writer and forwards it to the RecordBatchWriter base class.
//
// The parameter is taken by non-const value, matching the declaration in the
// header. With a top-level `const`, std::move() would yield a const rvalue and
// the shared_ptr would be copied instead of moved.
RecordBatchStreamWriter::RecordBatchStreamWriter(
    std::shared_ptr<arrow::ipc::RecordBatchWriter> writer)
    : RecordBatchWriter(std::move(writer)) {}
32+
33+
// Factory entry point for the stream writer proxy.
//
// Reads the "Filename" and "SchemaProxyID" fields from the first constructor
// argument (a struct array), opens the named file as an output stream, creates
// an Arrow IPC stream writer for the referenced schema, and returns a new
// RecordBatchStreamWriter proxy wrapping that writer.
//
// Each fallible step is wrapped in MATLAB_ASSIGN_OR_ERROR together with the
// error ID to use for that step.
libmexclass::proxy::MakeResult RecordBatchStreamWriter::make(
    const libmexclass::proxy::FunctionArguments& constructor_arguments) {
  namespace mda = ::matlab::data;
  using SchemaProxy = arrow::matlab::tabular::proxy::Schema;
  using RecordBatchStreamWriterProxy =
      arrow::matlab::io::ipc::proxy::RecordBatchStreamWriter;

  const mda::StructArray args = constructor_arguments[0];

  // The filename arrives as UTF-16 and is converted to UTF-8 before it is
  // handed to FileOutputStream::Open.
  const mda::StringArray filename_mda = args[0]["Filename"];
  const auto path_utf16 = std::u16string(filename_mda[0]);
  MATLAB_ASSIGN_OR_ERROR(const auto path_utf8,
                         arrow::util::UTF16StringToUTF8(path_utf16),
                         error::UNICODE_CONVERSION_ERROR_ID);

  // Look up the Schema proxy by ID and extract the schema it wraps.
  const mda::TypedArray<uint64_t> schema_proxy_id_mda = args[0]["SchemaProxyID"];
  auto schema_proxy = std::static_pointer_cast<SchemaProxy>(
      libmexclass::proxy::ProxyManager::getProxy(schema_proxy_id_mda[0]));
  auto schema = schema_proxy->unwrap();

  MATLAB_ASSIGN_OR_ERROR(auto sink, arrow::io::FileOutputStream::Open(path_utf8),
                         error::FAILED_TO_OPEN_FILE_FOR_WRITE);

  MATLAB_ASSIGN_OR_ERROR(auto stream_writer,
                         arrow::ipc::MakeStreamWriter(sink, schema),
                         "arrow:matlab:MakeFailed");

  return std::make_shared<RecordBatchStreamWriterProxy>(std::move(stream_writer));
}
63+
64+
} // namespace arrow::matlab::io::ipc::proxy
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
// Include guard: without it, including this header more than once in a single
// translation unit would redefine RecordBatchStreamWriter.
#pragma once

#include "arrow/ipc/writer.h"
#include "arrow/matlab/io/ipc/proxy/record_batch_writer.h"

#include "libmexclass/proxy/Proxy.h"

namespace arrow::matlab::io::ipc::proxy {

// Proxy class that derives from RecordBatchWriter and adds a static make()
// factory; it declares no additional state or methods of its own.
class RecordBatchStreamWriter : public RecordBatchWriter {
 public:
  // Wraps an already-created Arrow IPC writer.
  RecordBatchStreamWriter(std::shared_ptr<arrow::ipc::RecordBatchWriter> writer);

  virtual ~RecordBatchStreamWriter() = default;

  // Builds a proxy instance from the given constructor arguments.
  static libmexclass::proxy::MakeResult make(
      const libmexclass::proxy::FunctionArguments& constructor_arguments);
};

}  // namespace arrow::matlab::io::ipc::proxy
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "arrow/matlab/io/ipc/proxy/record_batch_writer.h"
19+
#include "arrow/io/file.h"
20+
#include "arrow/matlab/error/error.h"
21+
#include "arrow/matlab/tabular/proxy/record_batch.h"
22+
#include "arrow/matlab/tabular/proxy/schema.h"
23+
#include "arrow/matlab/tabular/proxy/table.h"
24+
25+
#include "libmexclass/proxy/ProxyManager.h"
26+
27+
namespace arrow::matlab::io::ipc::proxy {
28+
29+
// Stores the given Arrow IPC writer and registers the close, writeRecordBatch
// and writeTable methods via REGISTER_METHOD.
//
// The parameter is taken by non-const value, matching the declaration in the
// header. With a top-level `const`, std::move() would yield a const rvalue and
// the shared_ptr would be copied instead of moved.
RecordBatchWriter::RecordBatchWriter(
    std::shared_ptr<arrow::ipc::RecordBatchWriter> writer)
    : writer{std::move(writer)} {
  REGISTER_METHOD(RecordBatchWriter, close);
  REGISTER_METHOD(RecordBatchWriter, writeRecordBatch);
  REGISTER_METHOD(RecordBatchWriter, writeTable);
}
36+
37+
// Writes one record batch through the wrapped Arrow IPC writer.
//
// The first input is a struct array whose "RecordBatchProxyID" field holds the
// ID of the RecordBatch proxy to write. The resulting status is passed to
// MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT with IPC_RECORD_BATCH_WRITE_FAILED.
void RecordBatchWriter::writeRecordBatch(libmexclass::proxy::method::Context& context) {
  namespace mda = ::matlab::data;
  using BatchProxy = ::arrow::matlab::tabular::proxy::RecordBatch;

  mda::StructArray args = context.inputs[0];
  const mda::TypedArray<uint64_t> batch_id_mda = args[0]["RecordBatchProxyID"];
  const uint64_t batch_id = batch_id_mda[0];

  // Resolve the proxy ID to the RecordBatch proxy and extract the batch it wraps.
  auto batch_proxy = std::static_pointer_cast<BatchProxy>(
      libmexclass::proxy::ProxyManager::getProxy(batch_id));
  auto batch = batch_proxy->unwrap();

  MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT(writer->WriteRecordBatch(*batch), context,
                                      error::IPC_RECORD_BATCH_WRITE_FAILED);
}
53+
54+
// Writes a table through the wrapped Arrow IPC writer.
//
// The first input is a struct array whose "TableProxyID" field holds the ID of
// the Table proxy to write. The resulting status is passed to
// MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT with IPC_RECORD_BATCH_WRITE_FAILED.
void RecordBatchWriter::writeTable(libmexclass::proxy::method::Context& context) {
  namespace mda = ::matlab::data;
  using ArrowTableProxy = ::arrow::matlab::tabular::proxy::Table;

  mda::StructArray args = context.inputs[0];
  const mda::TypedArray<uint64_t> table_id_mda = args[0]["TableProxyID"];
  const uint64_t table_id = table_id_mda[0];

  // Resolve the proxy ID to the Table proxy and extract the table it wraps.
  auto arrow_table_proxy = std::static_pointer_cast<ArrowTableProxy>(
      libmexclass::proxy::ProxyManager::getProxy(table_id));
  auto arrow_table = arrow_table_proxy->unwrap();

  MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT(writer->WriteTable(*arrow_table), context,
                                      error::IPC_RECORD_BATCH_WRITE_FAILED);
}
69+
70+
// Closes the wrapped Arrow IPC writer. The resulting status is passed to
// MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT with IPC_RECORD_BATCH_WRITE_CLOSE_FAILED.
void RecordBatchWriter::close(libmexclass::proxy::method::Context& context) {
  const auto close_status = writer->Close();
  MATLAB_ERROR_IF_NOT_OK_WITH_CONTEXT(close_status, context,
                                      error::IPC_RECORD_BATCH_WRITE_CLOSE_FAILED);
}
74+
75+
} // namespace arrow::matlab::io::ipc::proxy
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include "arrow/ipc/writer.h"
21+
#include "libmexclass/proxy/Proxy.h"
22+
23+
namespace arrow::matlab::io::ipc::proxy {
24+
25+
class RecordBatchWriter : public libmexclass::proxy::Proxy {
26+
public:
27+
RecordBatchWriter(std::shared_ptr<arrow::ipc::RecordBatchWriter> writer);
28+
29+
virtual ~RecordBatchWriter() = default;
30+
31+
protected:
32+
std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
33+
34+
void writeRecordBatch(libmexclass::proxy::method::Context& context);
35+
36+
void writeTable(libmexclass::proxy::method::Context& context);
37+
38+
void close(libmexclass::proxy::method::Context& context);
39+
};
40+
41+
} // namespace arrow::matlab::io::ipc::proxy

matlab/src/cpp/arrow/matlab/proxy/factory.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include "arrow/matlab/io/feather/proxy/writer.h"
3737
#include "arrow/matlab/io/ipc/proxy/record_batch_file_reader.h"
3838
#include "arrow/matlab/io/ipc/proxy/record_batch_file_writer.h"
39+
#include "arrow/matlab/io/ipc/proxy/record_batch_stream_writer.h"
3940
#include "arrow/matlab/tabular/proxy/record_batch.h"
4041
#include "arrow/matlab/tabular/proxy/schema.h"
4142
#include "arrow/matlab/tabular/proxy/table.h"
@@ -111,6 +112,8 @@ libmexclass::proxy::MakeResult Factory::make_proxy(
111112
REGISTER_PROXY(arrow.c.proxy.RecordBatchImporter , arrow::matlab::c::proxy::RecordBatchImporter);
112113
REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchFileReader , arrow::matlab::io::ipc::proxy::RecordBatchFileReader);
113114
REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchFileWriter , arrow::matlab::io::ipc::proxy::RecordBatchFileWriter);
115+
REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchStreamWriter , arrow::matlab::io::ipc::proxy::RecordBatchStreamWriter);
116+
114117
// clang-format on
115118

116119
return libmexclass::error::Error{error::UNKNOWN_PROXY_ERROR_ID,

0 commit comments

Comments
 (0)