Skip to content

Commit 3ce4014

Browse files
authored
ARROW-17688: [C++][Java][FlightRPC] Substrait, transaction, cancellation for Flight SQL (apache#13492)
"[VOTE] Substrait for Flight SQL" https://lists.apache.org/thread/3k3np6314dwb0n7n1hrfwony5fcy7kzl Authored-by: David Li <[email protected]> Signed-off-by: David Li <[email protected]>
1 parent d571e93 commit 3ce4014

40 files changed

+4445
-380
lines changed

cpp/src/arrow/flight/integration_tests/flight_integration_test.cc

+4
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ TEST(FlightIntegration, Middleware) { ASSERT_OK(RunScenario("middleware")); }
5555

5656
TEST(FlightIntegration, FlightSql) { ASSERT_OK(RunScenario("flight_sql")); }
5757

58+
TEST(FlightIntegration, FlightSqlExtension) {
59+
ASSERT_OK(RunScenario("flight_sql:extension"));
60+
}
61+
5862
} // namespace integration_tests
5963
} // namespace flight
6064
} // namespace arrow

cpp/src/arrow/flight/integration_tests/test_integration.cc

+597-59
Large diffs are not rendered by default.

cpp/src/arrow/flight/server.cc

+3-1
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,9 @@ RecordBatchStream::RecordBatchStream(const std::shared_ptr<RecordBatchReader>& r
353353
impl_.reset(new RecordBatchStreamImpl(reader, options));
354354
}
355355

356-
RecordBatchStream::~RecordBatchStream() {}
356+
RecordBatchStream::~RecordBatchStream() {
357+
ARROW_WARN_NOT_OK(impl_->Close(), "Failed to close FlightDataStream");
358+
}
357359

358360
Status RecordBatchStream::Close() { return impl_->Close(); }
359361

cpp/src/arrow/flight/sql/CMakeLists.txt

+26-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,11 @@ if(ARROW_BUILD_TESTS OR ARROW_BUILD_EXAMPLES)
8989
example/sqlite_statement_batch_reader.cc
9090
example/sqlite_server.cc
9191
example/sqlite_tables_schema_batch_reader.cc)
92+
9293
set(ARROW_FLIGHT_SQL_TEST_SRCS server_test.cc)
94+
set(ARROW_FLIGHT_SQL_TEST_LIBS ${SQLite3_LIBRARIES})
95+
set(ARROW_FLIGHT_SQL_ACERO_SRCS example/acero_server.cc)
96+
9397
if(NOT MSVC AND NOT MINGW)
9498
# ARROW-16902: getting Protobuf generated code to have all the
9599
# proper dllexport/dllimport declarations is difficult, since
@@ -98,13 +102,34 @@ if(ARROW_BUILD_TESTS OR ARROW_BUILD_EXAMPLES)
98102
list(APPEND ARROW_FLIGHT_SQL_TEST_SRCS client_test.cc)
99103
endif()
100104

105+
if(ARROW_COMPUTE
106+
AND ARROW_PARQUET
107+
AND ARROW_SUBSTRAIT)
108+
list(APPEND ARROW_FLIGHT_SQL_TEST_SRCS ${ARROW_FLIGHT_SQL_ACERO_SRCS} acero_test.cc)
109+
if(ARROW_FLIGHT_TEST_LINKAGE STREQUAL "static")
110+
list(APPEND ARROW_FLIGHT_SQL_TEST_LIBS arrow_substrait_static)
111+
else()
112+
list(APPEND ARROW_FLIGHT_SQL_TEST_LIBS arrow_substrait_shared)
113+
endif()
114+
115+
if(ARROW_BUILD_EXAMPLES)
116+
add_executable(acero-flight-sql-server ${ARROW_FLIGHT_SQL_ACERO_SRCS}
117+
example/acero_main.cc)
118+
target_link_libraries(acero-flight-sql-server
119+
PRIVATE ${ARROW_FLIGHT_SQL_TEST_LINK_LIBS}
120+
${ARROW_FLIGHT_SQL_TEST_LIBS} ${GFLAGS_LIBRARIES})
121+
endif()
122+
endif()
123+
101124
add_arrow_test(flight_sql_test
102125
SOURCES
103126
${ARROW_FLIGHT_SQL_TEST_SRCS}
104127
${ARROW_FLIGHT_SQL_TEST_SERVER_SRCS}
105128
STATIC_LINK_LIBS
106129
${ARROW_FLIGHT_SQL_TEST_LINK_LIBS}
107-
${SQLite3_LIBRARIES}
130+
${ARROW_FLIGHT_SQL_TEST_LIBS}
131+
EXTRA_INCLUDES
132+
"${CMAKE_CURRENT_BINARY_DIR}/../"
108133
LABELS
109134
"arrow_flight_sql")
110135

+239
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
/// Integration test using the Acero backend
19+
20+
#include <memory>
21+
#include <sstream>
22+
23+
#include <gmock/gmock.h>
24+
#include <gtest/gtest.h>
25+
26+
#include "arrow/array.h"
27+
#include "arrow/engine/substrait/util.h"
28+
#include "arrow/flight/server.h"
29+
#include "arrow/flight/sql/client.h"
30+
#include "arrow/flight/sql/example/acero_server.h"
31+
#include "arrow/flight/sql/types.h"
32+
#include "arrow/flight/types.h"
33+
#include "arrow/stl_iterator.h"
34+
#include "arrow/table.h"
35+
#include "arrow/testing/gtest_util.h"
36+
#include "arrow/type_fwd.h"
37+
#include "arrow/util/checked_cast.h"
38+
39+
namespace arrow {
40+
namespace flight {
41+
namespace sql {
42+
43+
using arrow::internal::checked_cast;
44+
45+
class TestAcero : public ::testing::Test {
46+
public:
47+
void SetUp() override {
48+
ASSERT_OK_AND_ASSIGN(auto location, Location::ForGrpcTcp("localhost", 0));
49+
flight::FlightServerOptions options(location);
50+
51+
ASSERT_OK_AND_ASSIGN(server_, acero_example::MakeAceroServer());
52+
ASSERT_OK(server_->Init(options));
53+
54+
ASSERT_OK_AND_ASSIGN(auto client, FlightClient::Connect(server_->location()));
55+
client_.reset(new FlightSqlClient(std::move(client)));
56+
}
57+
58+
void TearDown() override {
59+
ASSERT_OK(client_->Close());
60+
ASSERT_OK(server_->Shutdown());
61+
}
62+
63+
protected:
64+
std::unique_ptr<FlightSqlClient> client_;
65+
std::unique_ptr<FlightServerBase> server_;
66+
};
67+
68+
arrow::Result<std::shared_ptr<Buffer>> MakeSubstraitPlan() {
69+
ARROW_ASSIGN_OR_RAISE(std::string dir_string,
70+
arrow::internal::GetEnvVar("PARQUET_TEST_DATA"));
71+
ARROW_ASSIGN_OR_RAISE(auto dir,
72+
arrow::internal::PlatformFilename::FromString(dir_string));
73+
ARROW_ASSIGN_OR_RAISE(auto filename, dir.Join("binary.parquet"));
74+
std::string uri = std::string("file://") + filename.ToString();
75+
76+
// TODO(ARROW-17229): we should use a RootRel here
77+
std::string json_plan = R"({
78+
"relations": [
79+
{
80+
"rel": {
81+
"read": {
82+
"base_schema": {
83+
"struct": {
84+
"types": [
85+
{"binary": {}}
86+
]
87+
},
88+
"names": [
89+
"foo"
90+
]
91+
},
92+
"local_files": {
93+
"items": [
94+
{
95+
"uri_file": "URI_PLACEHOLDER",
96+
"parquet": {}
97+
}
98+
]
99+
}
100+
}
101+
}
102+
}
103+
]
104+
})";
105+
std::string uri_placeholder = "URI_PLACEHOLDER";
106+
json_plan.replace(json_plan.find(uri_placeholder), uri_placeholder.size(), uri);
107+
return engine::SerializeJsonPlan(json_plan);
108+
}
109+
110+
TEST_F(TestAcero, GetSqlInfo) {
111+
FlightCallOptions call_options;
112+
std::vector<int> sql_info_codes = {
113+
SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_SUBSTRAIT,
114+
SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_TRANSACTION,
115+
};
116+
ASSERT_OK_AND_ASSIGN(auto flight_info,
117+
client_->GetSqlInfo(call_options, sql_info_codes));
118+
ASSERT_OK_AND_ASSIGN(auto reader,
119+
client_->DoGet(call_options, flight_info->endpoints()[0].ticket));
120+
ASSERT_OK_AND_ASSIGN(auto results, reader->ToTable());
121+
ASSERT_OK_AND_ASSIGN(auto batch, results->CombineChunksToBatch());
122+
ASSERT_EQ(2, results->num_rows());
123+
std::vector<std::pair<uint32_t, SqlInfoResult>> info;
124+
const auto& ids = checked_cast<const UInt32Array&>(*batch->column(0));
125+
const auto& values = checked_cast<const DenseUnionArray&>(*batch->column(1));
126+
for (int64_t i = 0; i < batch->num_rows(); i++) {
127+
ASSERT_OK_AND_ASSIGN(auto scalar, values.GetScalar(i));
128+
if (ids.Value(i) == SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_SUBSTRAIT) {
129+
ASSERT_EQ(*checked_cast<const DenseUnionScalar&>(*scalar).value,
130+
BooleanScalar(true));
131+
} else if (ids.Value(i) == SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_TRANSACTION) {
132+
ASSERT_EQ(
133+
*checked_cast<const DenseUnionScalar&>(*scalar).value,
134+
Int32Scalar(
135+
SqlInfoOptions::SqlSupportedTransaction::SQL_SUPPORTED_TRANSACTION_NONE));
136+
} else {
137+
FAIL() << "Unexpected info value: " << ids.Value(i);
138+
}
139+
}
140+
}
141+
142+
TEST_F(TestAcero, Scan) {
143+
#ifdef _WIN32
144+
GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
145+
#endif
146+
147+
FlightCallOptions call_options;
148+
ASSERT_OK_AND_ASSIGN(auto serialized_plan, MakeSubstraitPlan());
149+
150+
SubstraitPlan plan{serialized_plan->ToString(), /*version=*/"0.6.0"};
151+
ASSERT_OK_AND_ASSIGN(std::unique_ptr<FlightInfo> info,
152+
client_->ExecuteSubstrait(call_options, plan));
153+
ipc::DictionaryMemo memo;
154+
ASSERT_OK_AND_ASSIGN(auto schema, info->GetSchema(&memo));
155+
// TODO(ARROW-17229): the scanner "special" fields are still included, strip them
156+
// manually
157+
auto fixed_schema = arrow::schema({schema->fields()[0]});
158+
ASSERT_NO_FATAL_FAILURE(
159+
AssertSchemaEqual(fixed_schema, arrow::schema({field("foo", binary())})));
160+
161+
ASSERT_EQ(1, info->endpoints().size());
162+
ASSERT_EQ(0, info->endpoints()[0].locations.size());
163+
ASSERT_OK_AND_ASSIGN(auto reader,
164+
client_->DoGet(call_options, info->endpoints()[0].ticket));
165+
ASSERT_OK_AND_ASSIGN(auto reader_schema, reader->GetSchema());
166+
ASSERT_NO_FATAL_FAILURE(AssertSchemaEqual(schema, reader_schema));
167+
ASSERT_OK_AND_ASSIGN(auto table, reader->ToTable());
168+
ASSERT_GT(table->num_rows(), 0);
169+
}
170+
171+
TEST_F(TestAcero, Update) {
172+
#ifdef _WIN32
173+
GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
174+
#endif
175+
176+
FlightCallOptions call_options;
177+
ASSERT_OK_AND_ASSIGN(auto serialized_plan, MakeSubstraitPlan());
178+
SubstraitPlan plan{serialized_plan->ToString(), /*version=*/"0.6.0"};
179+
EXPECT_RAISES_WITH_MESSAGE_THAT(NotImplemented,
180+
::testing::HasSubstr("Updates are unsupported"),
181+
client_->ExecuteSubstraitUpdate(call_options, plan));
182+
}
183+
184+
TEST_F(TestAcero, Prepare) {
185+
#ifdef _WIN32
186+
GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
187+
#endif
188+
189+
FlightCallOptions call_options;
190+
ASSERT_OK_AND_ASSIGN(auto serialized_plan, MakeSubstraitPlan());
191+
SubstraitPlan plan{serialized_plan->ToString(), /*version=*/"0.6.0"};
192+
ASSERT_OK_AND_ASSIGN(auto prepared_statement,
193+
client_->PrepareSubstrait(call_options, plan));
194+
ASSERT_NE(prepared_statement->dataset_schema(), nullptr);
195+
ASSERT_EQ(prepared_statement->parameter_schema(), nullptr);
196+
197+
auto fixed_schema = arrow::schema({prepared_statement->dataset_schema()->fields()[0]});
198+
ASSERT_NO_FATAL_FAILURE(
199+
AssertSchemaEqual(fixed_schema, arrow::schema({field("foo", binary())})));
200+
201+
EXPECT_RAISES_WITH_MESSAGE_THAT(NotImplemented,
202+
::testing::HasSubstr("Updates are unsupported"),
203+
prepared_statement->ExecuteUpdate());
204+
205+
ASSERT_OK_AND_ASSIGN(std::unique_ptr<FlightInfo> info, prepared_statement->Execute());
206+
ASSERT_EQ(1, info->endpoints().size());
207+
ASSERT_EQ(0, info->endpoints()[0].locations.size());
208+
ASSERT_OK_AND_ASSIGN(auto reader,
209+
client_->DoGet(call_options, info->endpoints()[0].ticket));
210+
ASSERT_OK_AND_ASSIGN(auto reader_schema, reader->GetSchema());
211+
ASSERT_NO_FATAL_FAILURE(
212+
AssertSchemaEqual(prepared_statement->dataset_schema(), reader_schema));
213+
ASSERT_OK_AND_ASSIGN(auto table, reader->ToTable());
214+
ASSERT_GT(table->num_rows(), 0);
215+
216+
ASSERT_OK(prepared_statement->Close());
217+
}
218+
219+
TEST_F(TestAcero, Transactions) {
220+
#ifdef _WIN32
221+
GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
222+
#endif
223+
224+
FlightCallOptions call_options;
225+
ASSERT_OK_AND_ASSIGN(auto serialized_plan, MakeSubstraitPlan());
226+
Transaction handle("fake-id");
227+
SubstraitPlan plan{serialized_plan->ToString(), /*version=*/"0.6.0"};
228+
229+
EXPECT_RAISES_WITH_MESSAGE_THAT(NotImplemented,
230+
::testing::HasSubstr("Transactions are unsupported"),
231+
client_->ExecuteSubstrait(call_options, plan, handle));
232+
EXPECT_RAISES_WITH_MESSAGE_THAT(NotImplemented,
233+
::testing::HasSubstr("Transactions are unsupported"),
234+
client_->PrepareSubstrait(call_options, plan, handle));
235+
}
236+
237+
} // namespace sql
238+
} // namespace flight
239+
} // namespace arrow

0 commit comments

Comments
 (0)