Skip to content

Commit f059d14

Browse files
committed
[Feature](catalog) Prune nested columns for hive/iceberg parquet and orc reader.
1 parent 5840ca2 commit f059d14

File tree

72 files changed

+8905
-126
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+8905
-126
lines changed

.gitmodules

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
[submodule "contrib/apache-orc"]
2121
path = contrib/apache-orc
2222
url = https://github.com/apache/doris-thirdparty.git
23-
branch = orc
23+
branch = cq_nested_column_prune_external_table
2424
[submodule "doris-faiss"]
2525
path = contrib/faiss
2626
url = https://github.com/apache/doris-thirdparty.git

be/src/runtime/descriptors.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,11 @@ SlotDescriptor::SlotDescriptor(const TSlotDescriptor& tdesc)
6363
_is_materialized(tdesc.isMaterialized && tdesc.need_materialize),
6464
_is_key(tdesc.is_key),
6565
_column_paths(tdesc.column_paths),
66-
_all_access_paths(tdesc.__isset.all_access_paths ? tdesc.all_access_paths
67-
: TColumnAccessPaths {}),
68-
_predicate_access_paths(tdesc.__isset.predicate_access_paths
69-
? tdesc.predicate_access_paths
70-
: TColumnAccessPaths {}),
66+
_all_column_access_paths(tdesc.__isset.all_access_paths ? tdesc.all_access_paths
67+
: TColumnAccessPaths {}),
68+
_predicate_column_access_paths(tdesc.__isset.predicate_access_paths
69+
? tdesc.predicate_access_paths
70+
: TColumnAccessPaths {}),
7171
_is_auto_increment(tdesc.__isset.is_auto_increment ? tdesc.is_auto_increment : false),
7272
_col_default_value(tdesc.__isset.col_default_value ? tdesc.col_default_value : "") {
7373
if (tdesc.__isset.virtual_column_expr) {

be/src/runtime/descriptors.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,10 @@ class SlotDescriptor {
8282
bool is_key() const { return _is_key; }
8383
const std::vector<std::string>& column_paths() const { return _column_paths; };
8484

85-
const TColumnAccessPaths& all_access_paths() const { return _all_access_paths; }
86-
const TColumnAccessPaths& predicate_access_paths() const { return _predicate_access_paths; }
85+
const TColumnAccessPaths& all_column_access_paths() const { return _all_column_access_paths; }
86+
const TColumnAccessPaths& predicate_column_access_paths() const {
87+
return _predicate_column_access_paths;
88+
}
8789

8890
bool is_auto_increment() const { return _is_auto_increment; }
8991

@@ -98,6 +100,10 @@ class SlotDescriptor {
98100
return virtual_column_expr;
99101
}
100102

103+
void set_is_predicate(bool is_predicate) { _is_predicate = is_predicate; }
104+
105+
bool is_predicate() const { return _is_predicate; }
106+
101107
private:
102108
friend class DescriptorTbl;
103109
friend class TupleDescriptor;
@@ -131,14 +137,16 @@ class SlotDescriptor {
131137
const bool _is_key;
132138
const std::vector<std::string> _column_paths;
133139

134-
const TColumnAccessPaths _all_access_paths;
135-
const TColumnAccessPaths _predicate_access_paths;
140+
const TColumnAccessPaths _all_column_access_paths;
141+
const TColumnAccessPaths _predicate_column_access_paths;
136142

137143
const bool _is_auto_increment;
138144
const std::string _col_default_value;
139145

140146
std::shared_ptr<doris::TExpr> virtual_column_expr = nullptr;
141147

148+
bool _is_predicate = false;
149+
142150
SlotDescriptor(const TSlotDescriptor& tdesc);
143151
SlotDescriptor(const PSlotDescriptor& pdesc);
144152
MOCK_DEFINE(SlotDescriptor();)

be/src/vec/exec/format/orc/vorc_reader.cpp

Lines changed: 288 additions & 23 deletions
Large diffs are not rendered by default.

be/src/vec/exec/format/orc/vorc_reader.h

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,12 @@ struct LazyReadContext {
112112
std::unordered_map<std::string, VExprContextSPtr> predicate_missing_columns;
113113
// lazy read missing columns or all missing columns
114114
std::unordered_map<std::string, VExprContextSPtr> missing_columns;
115+
116+
std::vector<std::string> partial_predicate_columns;
117+
118+
// Record the number of rows filled in filter phase for lazy materialization
119+
// This is used to check if a column was already processed in filter phase
120+
size_t filter_phase_rows = 0;
115121
};
116122

117123
class OrcReader : public GenericReader {
@@ -158,7 +164,9 @@ class OrcReader : public GenericReader {
158164
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
159165
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
160166
std::shared_ptr<TableSchemaChangeHelper::Node> table_info_node_ptr =
161-
TableSchemaChangeHelper::ConstNode::get_instance());
167+
TableSchemaChangeHelper::ConstNode::get_instance(),
168+
const std::set<uint64_t>& column_ids = {},
169+
const std::set<uint64_t>& filter_column_ids = {});
162170

163171
Status set_fill_columns(
164172
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
@@ -195,7 +203,8 @@ class OrcReader : public GenericReader {
195203
std::unordered_map<std::string, orc::StringDictionary*>& column_name_to_dict_map,
196204
bool* is_stripe_filtered);
197205

198-
static DataTypePtr convert_to_doris_type(const orc::Type* orc_type);
206+
DataTypePtr convert_to_doris_type(const orc::Type* orc_type);
207+
199208
static std::string get_field_name_lower_case(const orc::Type* orc_type, int pos);
200209

201210
void set_row_id_column_iterator(
@@ -630,7 +639,8 @@ class OrcReader : public GenericReader {
630639
size_t _batch_size;
631640
int64_t _range_start_offset;
632641
int64_t _range_size;
633-
const std::string& _ctz;
642+
// const std::string& _ctz;
643+
std::string _ctz;
634644

635645
int32_t _offset_days = 0;
636646
cctz::time_zone _time_zone;
@@ -653,6 +663,12 @@ class OrcReader : public GenericReader {
653663
// file column name to orc type
654664
std::unordered_map<std::string, const orc::Type*> _type_map;
655665

666+
// Optimized type conversion support
667+
// column ID to file original type mapping for handling partial column selection
668+
std::unordered_map<uint64_t, const orc::Type*> _column_id_to_file_type;
669+
// Keep reference to file root type for complete type information
670+
const orc::Type* _file_root_type = nullptr;
671+
656672
std::unique_ptr<ORCFileInputStream> _file_input_stream;
657673
Statistics _statistics;
658674
OrcProfile _orc_profile;
@@ -673,6 +689,8 @@ class OrcReader : public GenericReader {
673689
std::vector<DecimalScaleParams> _decimal_scale_params;
674690
size_t _decimal_scale_params_index;
675691

692+
std::set<uint64_t> _column_ids;
693+
std::set<uint64_t> _filter_column_ids;
676694
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
677695
bool _is_acid = false;
678696
std::unique_ptr<IColumn::Filter> _filter;

be/src/vec/exec/format/parquet/schema_desc.cpp

Lines changed: 76 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaEleme
176176
parse_physical_field(t_schema, false, child);
177177

178178
node_field->name = t_schema.name;
179+
node_field->lower_case_name = to_lower(t_schema.name);
179180
node_field->data_type = std::make_shared<DataTypeArray>(make_nullable(child->data_type));
180181
_next_schema_pos = curr_pos + 1;
181182
node_field->field_id = t_schema.__isset.field_id ? t_schema.field_id : -1;
@@ -193,8 +194,10 @@ Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaEleme
193194
void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physical_schema,
194195
bool is_nullable, FieldSchema* physical_field) {
195196
physical_field->name = physical_schema.name;
197+
physical_field->lower_case_name = to_lower(physical_field->name);
196198
physical_field->parquet_schema = physical_schema;
197199
physical_field->physical_type = physical_schema.type;
200+
physical_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
198201
_physical_fields.push_back(physical_field);
199202
physical_field->physical_column_index = cast_set<int>(_physical_fields.size() - 1);
200203
auto type = get_doris_type(physical_schema, is_nullable);
@@ -393,6 +396,8 @@ Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElem
393396
RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, struct_field));
394397

395398
group_field->name = group_schema.name;
399+
group_field->lower_case_name = to_lower(group_field->name);
400+
group_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
396401
group_field->data_type =
397402
std::make_shared<DataTypeArray>(make_nullable(struct_field->data_type));
398403
group_field->field_id = group_schema.__isset.field_id ? group_schema.field_id : -1;
@@ -461,6 +466,8 @@ Status FieldDescriptor::parse_list_field(const std::vector<tparquet::SchemaEleme
461466
}
462467

463468
list_field->name = first_level.name;
469+
list_field->lower_case_name = to_lower(first_level.name);
470+
list_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
464471
list_field->data_type =
465472
std::make_shared<DataTypeArray>(make_nullable(list_field->children[0].data_type));
466473
if (is_optional) {
@@ -519,21 +526,24 @@ Status FieldDescriptor::parse_map_field(const std::vector<tparquet::SchemaElemen
519526
map_field->repetition_level++;
520527
map_field->definition_level++;
521528

522-
map_field->children.resize(1);
529+
// Directly create key and value children instead of intermediate key_value node
530+
map_field->children.resize(2);
523531
// map is a repeated node, we should set the `repeated_parent_def_level` of its children as `definition_level`
524532
set_child_node_level(map_field, map_field->definition_level);
525-
auto map_kv_field = &map_field->children[0];
526-
// produce MAP<STRUCT<KEY, VALUE>>
527-
RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos + 1, map_kv_field));
533+
534+
auto key_field = &map_field->children[0];
535+
auto value_field = &map_field->children[1];
536+
537+
// Parse key and value fields directly from the key_value group's children
538+
_next_schema_pos = curr_pos + 2; // Skip key_value group, go directly to key
539+
RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, key_field));
540+
RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, value_field));
528541

529542
map_field->name = map_schema.name;
530-
map_field->data_type = std::make_shared<DataTypeMap>(
531-
make_nullable(assert_cast<const DataTypeStruct*>(
532-
remove_nullable(map_kv_field->data_type).get())
533-
->get_element(0)),
534-
make_nullable(assert_cast<const DataTypeStruct*>(
535-
remove_nullable(map_kv_field->data_type).get())
536-
->get_element(1)));
543+
map_field->lower_case_name = to_lower(map_field->name);
544+
map_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
545+
map_field->data_type = std::make_shared<DataTypeMap>(make_nullable(key_field->data_type),
546+
make_nullable(value_field->data_type));
537547
if (is_optional) {
538548
map_field->data_type = make_nullable(map_field->data_type);
539549
}
@@ -558,6 +568,8 @@ Status FieldDescriptor::parse_struct_field(const std::vector<tparquet::SchemaEle
558568
RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, &struct_field->children[i]));
559569
}
560570
struct_field->name = struct_schema.name;
571+
struct_field->lower_case_name = to_lower(struct_field->name);
572+
struct_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
561573

562574
struct_field->field_id = struct_schema.__isset.field_id ? struct_schema.field_id : -1;
563575
DataTypes res_data_types;
@@ -610,6 +622,59 @@ std::string FieldDescriptor::debug_string() const {
610622
ss << "]";
611623
return ss.str();
612624
}
625+
626+
void FieldDescriptor::assign_ids() {
627+
uint64_t next_id = 1;
628+
for (auto& field : _fields) {
629+
field.assign_ids(next_id);
630+
}
631+
}
632+
633+
const FieldSchema* FieldDescriptor::find_column_by_id(uint64_t column_id) const {
634+
for (const auto& field : _fields) {
635+
if (auto result = field.find_column_by_id(column_id)) {
636+
return result;
637+
}
638+
}
639+
return nullptr;
640+
}
641+
642+
void FieldSchema::assign_ids(uint64_t& next_id) {
643+
column_id = next_id++;
644+
645+
for (auto& child : children) {
646+
child.assign_ids(next_id);
647+
}
648+
649+
max_column_id = next_id - 1;
650+
}
651+
652+
const FieldSchema* FieldSchema::find_column_by_id(uint64_t target_id) const {
653+
if (column_id == target_id) {
654+
return this;
655+
}
656+
657+
for (const auto& child : children) {
658+
if (auto result = child.find_column_by_id(target_id)) {
659+
return result;
660+
}
661+
}
662+
663+
return nullptr;
664+
}
665+
666+
uint64_t FieldSchema::get_column_id() const {
667+
return column_id;
668+
}
669+
670+
void FieldSchema::set_column_id(uint64_t id) {
671+
column_id = id;
672+
}
673+
674+
uint64_t FieldSchema::get_max_column_id() const {
675+
return max_column_id;
676+
}
677+
613678
#include "common/compile_check_end.h"
614679

615680
} // namespace doris::vectorized

be/src/vec/exec/format/parquet/schema_desc.h

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,13 @@
3636

3737
namespace doris::vectorized {
3838
#include "common/compile_check_begin.h"
39+
40+
// Constant for unassigned column IDs
41+
constexpr uint64_t UNASSIGNED_COLUMN_ID = UINT64_MAX;
42+
3943
struct FieldSchema {
4044
std::string name;
45+
std::string lower_case_name; // for hms column name case insensitive match
4146
// the referenced parquet schema element
4247
tparquet::SchemaElement parquet_schema;
4348

@@ -56,12 +61,22 @@ struct FieldSchema {
5661
//For UInt8 -> Int16,UInt16 -> Int32,UInt32 -> Int64,UInt64 -> Int128.
5762
bool is_type_compatibility = false;
5863

59-
FieldSchema() : data_type(std::make_shared<DataTypeNothing>()) {}
64+
FieldSchema()
65+
: data_type(std::make_shared<DataTypeNothing>()), column_id(UNASSIGNED_COLUMN_ID) {}
6066
~FieldSchema() = default;
6167
FieldSchema(const FieldSchema& fieldSchema) = default;
6268
std::string debug_string() const;
6369

6470
int32_t field_id = -1;
71+
uint64_t column_id = UNASSIGNED_COLUMN_ID;
72+
uint64_t max_column_id = 0; // Maximum column ID for this field and its children
73+
74+
// Column ID assignment and lookup methods
75+
void assign_ids(uint64_t& next_id);
76+
const FieldSchema* find_column_by_id(uint64_t target_id) const;
77+
uint64_t get_column_id() const;
78+
void set_column_id(uint64_t id);
79+
uint64_t get_max_column_id() const;
6580
};
6681

6782
class FieldDescriptor {
@@ -133,6 +148,19 @@ class FieldDescriptor {
133148
int32_t size() const { return cast_set<int32_t>(_fields.size()); }
134149

135150
const std::vector<FieldSchema>& get_fields_schema() const { return _fields; }
151+
152+
/**
153+
* Assign stable column IDs to schema fields.
154+
*
155+
* This uses an ORC-compatible encoding so that the results of
156+
* create_column_ids() are consistent across formats. IDs start from 1
157+
* and are assigned in a pre-order traversal (parent before children).
158+
* After calling this, each FieldSchema will have column_id and
159+
* max_column_id populated.
160+
*/
161+
void assign_ids();
162+
163+
const FieldSchema* find_column_by_id(uint64_t column_id) const;
136164
};
137165
#include "common/compile_check_end.h"
138166

0 commit comments

Comments
 (0)