Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion be/src/http/action/debug_point_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ void BaseDebugPointAction::handle(HttpRequest* req) {
"Disable debug points. please check config::enable_debug_points");
}
std::string result = status.to_json();
LOG(INFO) << "handle request result:" << result;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this code deleted?

if (status.ok()) {
HttpChannel::send_reply(req, HttpStatus::OK, result);
} else {
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ struct IteratorRowRef;
namespace segment_v2 {
struct SubstreamIterator;
}

class StorageReadOptions {
public:
struct KeyRange {
Expand Down Expand Up @@ -143,6 +142,9 @@ class StorageReadOptions {
std::map<ColumnId, size_t> vir_cid_to_idx_in_block;
std::map<size_t, vectorized::DataTypePtr> vir_col_idx_to_type;

std::map<int32_t, TColumnAccessPaths> all_access_paths;
std::map<int32_t, TColumnAccessPaths> predicate_access_paths;

std::shared_ptr<vectorized::ScoreRuntime> score_runtime;
CollectionStatisticsPtr collection_statistics;

Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
_read_options.remaining_conjunct_roots = _read_context->remaining_conjunct_roots;
_read_options.common_expr_ctxs_push_down = _read_context->common_expr_ctxs_push_down;
_read_options.virtual_column_exprs = _read_context->virtual_column_exprs;

_read_options.all_access_paths = _read_context->all_access_paths;
_read_options.predicate_access_paths = _read_context->predicate_access_paths;

_read_options.ann_topn_runtime = _read_context->ann_topn_runtime;
_read_options.vir_cid_to_idx_in_block = _read_context->vir_cid_to_idx_in_block;
_read_options.vir_col_idx_to_type = _read_context->vir_col_idx_to_type;
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/rowset/rowset_reader_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_READER_CONTEXT_H
#define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_READER_CONTEXT_H

#include <vector>

#include "io/io_common.h"
#include "olap/column_predicate.h"
#include "olap/olap_common.h"
Expand Down Expand Up @@ -90,6 +92,9 @@ struct RowsetReaderContext {
std::map<ColumnId, size_t> vir_cid_to_idx_in_block;
std::map<size_t, vectorized::DataTypePtr> vir_col_idx_to_type;

std::map<int32_t, TColumnAccessPaths> all_access_paths;
std::map<int32_t, TColumnAccessPaths> predicate_access_paths;

std::shared_ptr<vectorized::ScoreRuntime> score_runtime;
CollectionStatisticsPtr collection_statistics;
std::shared_ptr<segment_v2::AnnTopNRuntime> ann_topn_runtime;
Expand Down
361 changes: 351 additions & 10 deletions be/src/olap/rowset/segment_v2/column_reader.cpp

Large diffs are not rendered by default.

70 changes: 69 additions & 1 deletion be/src/olap/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/segment_v2.pb.h>
#include <sys/types.h>

Expand Down Expand Up @@ -69,6 +70,8 @@ class FileReader;
struct Slice;
struct StringRef;

using TColumnAccessPaths = std::vector<TColumnAccessPath>;

namespace segment_v2 {

class EncodingInfo;
Expand Down Expand Up @@ -271,7 +274,6 @@ class ColumnReader : public MetadataAdder<ColumnReader>,
Status _calculate_row_ranges(const std::vector<uint32_t>& page_indexes, RowRanges* row_ranges,
const ColumnIteratorOptions& iter_opts);

private:
int64_t _meta_length;
FieldType _meta_type;
FieldType _meta_children_column_type;
Expand Down Expand Up @@ -366,8 +368,54 @@ class ColumnIterator {

virtual bool is_all_dict_encoding() const { return false; }

// Default access-path handling for iterators without sub-columns.
//
// If any predicate access path is supplied, the iterator is marked as
// READING_FOR_PREDICATE. `all_access_paths` is not inspected by this default
// implementation. Always returns Status::OK().
//
// The Map/Struct/Array file column iterators declare their own overrides.
virtual Status set_access_paths(const TColumnAccessPaths& all_access_paths,
                                const TColumnAccessPaths& predicate_access_paths) {
    const bool used_by_predicate = !predicate_access_paths.empty();
    if (used_by_predicate) {
        _reading_flag = ReadingFlag::READING_FOR_PREDICATE;
    }
    return Status::OK();
}

// Stores the name of the column this iterator reads. Segment::new_column_iterator
// sets it before applying access paths.
void set_column_name(const std::string& column_name) { _column_name = column_name; }

// Returns the column name previously stored by set_column_name() (empty by default).
const std::string& column_name() const { return _column_name; }

// Reading state of a column iterator.
//
// Several access paths may target the same column and disagree with each other,
// so the state is modelled as an ordered flag:
//
//   NORMAL_READING        - default; the column should be read.
//   SKIP_READING          - the column should not be read.
//   NEED_TO_READ          - the column must be read.
//   READING_FOR_PREDICATE - the column is required for predicate evaluation.
//
// The numeric order matters: set_reading_flag() only ever moves to a flag with a
// larger value. Example: if one path marks column A as NEED_TO_READ and another
// path says A can be skipped, A stays NEED_TO_READ and is not downgraded to
// SKIP_READING.
enum class ReadingFlag : int {
    NORMAL_READING = 0,
    SKIP_READING = 1,
    NEED_TO_READ = 2,
    READING_FOR_PREDICATE = 3
};
// Raises the reading flag to `flag` if it ranks higher than the current one.
// Requests that rank lower than or equal to the current flag are ignored, so a
// flag can never be downgraded (e.g. NEED_TO_READ never becomes SKIP_READING).
void set_reading_flag(ReadingFlag flag) {
    const int requested = static_cast<int>(flag);
    const int current = static_cast<int>(_reading_flag);
    if (requested <= current) {
        return;
    }
    _reading_flag = flag;
}

// Returns the current reading flag (NORMAL_READING unless it has been raised).
ReadingFlag reading_flag() const { return _reading_flag; }

// Raises the reading flag to at least NEED_TO_READ (a higher flag is left untouched).
// Virtual: the Map/Struct/Array file column iterators declare their own overrides.
virtual void set_need_to_read() { set_reading_flag(ReadingFlag::NEED_TO_READ); }

// Hook invoked after set_access_paths() (see Segment::new_column_iterator).
// The default implementation does nothing; the Map/Struct/Array file column
// iterators declare overrides.
// NOTE(review): overrides presumably drop child iterators that were pruned by the
// access paths - confirm against column_reader.cpp.
virtual void remove_pruned_sub_iterators() {}

protected:
Result<TColumnAccessPaths> _get_sub_access_paths(const TColumnAccessPaths& access_paths);
ColumnIteratorOptions _opts;

ReadingFlag _reading_flag {ReadingFlag::NORMAL_READING};
std::string _column_name;
};

// This iterator is used to read column data from file
Expand Down Expand Up @@ -504,6 +552,13 @@ class MapFileColumnIterator final : public ColumnIterator {
return _offsets_iterator->get_current_ordinal();
}

Status set_access_paths(const TColumnAccessPaths& all_access_paths,
const TColumnAccessPaths& predicate_access_paths) override;

void set_need_to_read() override;

void remove_pruned_sub_iterators() override;

private:
std::shared_ptr<ColumnReader> _map_reader = nullptr;
ColumnIteratorUPtr _null_iterator;
Expand Down Expand Up @@ -533,6 +588,13 @@ class StructFileColumnIterator final : public ColumnIterator {
return _sub_column_iterators[0]->get_current_ordinal();
}

Status set_access_paths(const TColumnAccessPaths& all_access_paths,
const TColumnAccessPaths& predicate_access_paths) override;

void set_need_to_read() override;

void remove_pruned_sub_iterators() override;

private:
std::shared_ptr<ColumnReader> _struct_reader = nullptr;
ColumnIteratorUPtr _null_iterator;
Expand Down Expand Up @@ -561,6 +623,12 @@ class ArrayFileColumnIterator final : public ColumnIterator {
return _offset_iterator->get_current_ordinal();
}

Status set_access_paths(const TColumnAccessPaths& all_access_paths,
const TColumnAccessPaths& predicate_access_paths) override;
void set_need_to_read() override;

void remove_pruned_sub_iterators() override;

private:
std::shared_ptr<ColumnReader> _array_reader = nullptr;
std::unique_ptr<OffsetFileColumnIterator> _offset_iterator;
Expand Down
16 changes: 16 additions & 0 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

#include "olap/rowset/segment_v2/segment.h"

#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/olap_file.pb.h>
#include <gen_cpp/segment_v2.pb.h>

#include <cstring>
#include <memory>
#include <sstream>
#include <utility>

#include "cloud/config.h"
Expand Down Expand Up @@ -765,6 +767,20 @@ Status Segment::new_column_iterator(const TabletColumn& tablet_column,
sparse_column_cache_ptr));
} else {
RETURN_IF_ERROR(reader->new_iterator(iter, &tablet_column, opt));
if (opt->all_access_paths.contains(unique_id) ||
opt->predicate_access_paths.contains(unique_id)) {
const auto& all_access_paths = opt->all_access_paths.contains(unique_id)
? opt->all_access_paths.at(unique_id)
: TColumnAccessPaths {};
const auto& predicate_access_paths = opt->predicate_access_paths.contains(unique_id)
? opt->predicate_access_paths.at(unique_id)
: TColumnAccessPaths {};

// set column name to apply access paths.
(*iter)->set_column_name(tablet_column.name());
RETURN_IF_ERROR((*iter)->set_access_paths(all_access_paths, predicate_access_paths));
(*iter)->remove_pruned_sub_iterators();
}
}

if (config::enable_column_type_check && !tablet_column.has_path_info() &&
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/tablet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) {
_reader_context.ann_topn_runtime = read_params.ann_topn_runtime;

_reader_context.condition_cache_digest = read_params.condition_cache_digest;
_reader_context.all_access_paths = read_params.all_access_paths;
_reader_context.predicate_access_paths = read_params.predicate_access_paths;

return Status::OK();
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <stddef.h>
Expand Down Expand Up @@ -146,6 +147,9 @@ class TabletReader {
// slots that cast may be eliminated in storage layer
std::map<std::string, vectorized::DataTypePtr> target_cast_type_for_variants;

std::map<int32_t, TColumnAccessPaths> all_access_paths;
std::map<int32_t, TColumnAccessPaths> predicate_access_paths;

std::vector<RowSetSplits> rs_splits;
// For unique key table with merge-on-write
DeleteBitmapPtr delete_bitmap = nullptr;
Expand Down
19 changes: 19 additions & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1684,6 +1684,14 @@ vectorized::Block TabletSchema::create_block(
tablet_columns_need_convert_null->find(cid) !=
tablet_columns_need_convert_null->end());
auto data_type = vectorized::DataTypeFactory::instance().create_data_type(col, is_nullable);
if (col.type() == FieldType::OLAP_FIELD_TYPE_STRUCT ||
col.type() == FieldType::OLAP_FIELD_TYPE_MAP ||
col.type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
if (_pruned_columns_data_type.contains(col.unique_id())) {
data_type = _pruned_columns_data_type.at(col.unique_id());
}
}

if (_vir_col_idx_to_unique_id.contains(cid)) {
block.insert({vectorized::ColumnNothing::create(0), data_type, col.name()});
VLOG_DEBUG << fmt::format(
Expand All @@ -1703,7 +1711,13 @@ vectorized::Block TabletSchema::create_block(bool ignore_dropped_col) const {
if (ignore_dropped_col && is_dropped_column(*col)) {
continue;
}

auto data_type = vectorized::DataTypeFactory::instance().create_data_type(*col);
if (col->type() == FieldType::OLAP_FIELD_TYPE_STRUCT) {
if (_pruned_columns_data_type.contains(col->unique_id())) {
data_type = _pruned_columns_data_type.at(col->unique_id());
}
}
block.insert({data_type->create_column(), data_type, col->name()});
}
return block;
Expand All @@ -1714,6 +1728,11 @@ vectorized::Block TabletSchema::create_block_by_cids(const std::vector<uint32_t>
for (const auto& cid : cids) {
const auto& col = *_cols[cid];
auto data_type = vectorized::DataTypeFactory::instance().create_data_type(col);
if (col.type() == FieldType::OLAP_FIELD_TYPE_STRUCT) {
if (_pruned_columns_data_type.contains(col.unique_id())) {
data_type = _pruned_columns_data_type.at(col.unique_id());
}
}
block.insert({data_type->create_column(), data_type, col.name()});
}
return block;
Expand Down
10 changes: 10 additions & 0 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/memory/lru_cache_policy.h"
#include "udf/udf.h"
#include "util/debug_points.h"
#include "util/string_parser.hpp"
#include "util/string_util.h"
Expand Down Expand Up @@ -682,6 +683,14 @@ class TabletSchema : public MetadataAdder<TabletSchema> {
return 0;
}

// Registers the pruned data type for the column identified by `col_unique_id`,
// replacing any type registered earlier for the same id. The create_block*
// helpers consult this map to substitute the column's data type.
void add_pruned_columns_data_type(int32_t col_unique_id, vectorized::DataTypePtr data_type) {
    _pruned_columns_data_type.insert_or_assign(col_unique_id, std::move(data_type));
}

// Drops every pruned data type registered via add_pruned_columns_data_type().
void clear_pruned_columns_data_type() { _pruned_columns_data_type.clear(); }

// Returns true when at least one pruned data type is currently registered.
bool has_pruned_columns() const { return !_pruned_columns_data_type.empty(); }

private:
friend bool operator==(const TabletSchema& a, const TabletSchema& b);
friend bool operator!=(const TabletSchema& a, const TabletSchema& b);
Expand Down Expand Up @@ -752,6 +761,7 @@ class TabletSchema : public MetadataAdder<TabletSchema> {
bool _enable_variant_flatten_nested = false;

std::map<size_t, int32_t> _vir_col_idx_to_unique_id;
std::map<int32_t, vectorized::DataTypePtr> _pruned_columns_data_type;

// value: extracted path set and sparse path set
std::unordered_map<int32_t, PathsSetInfo> _path_set_info_map;
Expand Down
58 changes: 57 additions & 1 deletion be/src/runtime/descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ SlotDescriptor::SlotDescriptor(const TSlotDescriptor& tdesc)
_is_materialized(tdesc.isMaterialized && tdesc.need_materialize),
_is_key(tdesc.is_key),
_column_paths(tdesc.column_paths),
_all_access_paths(tdesc.__isset.all_access_paths ? tdesc.all_access_paths
: TColumnAccessPaths {}),
_predicate_access_paths(tdesc.__isset.predicate_access_paths
? tdesc.predicate_access_paths
: TColumnAccessPaths {}),
_is_auto_increment(tdesc.__isset.is_auto_increment ? tdesc.is_auto_increment : false),
_col_default_value(tdesc.__isset.col_default_value ? tdesc.col_default_value : "") {
if (tdesc.__isset.virtual_column_expr) {
Expand Down Expand Up @@ -98,7 +103,31 @@ SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc)
_is_materialized(pdesc.is_materialized()),
_is_key(pdesc.is_key()),
_column_paths(pdesc.column_paths().begin(), pdesc.column_paths().end()),
_is_auto_increment(pdesc.is_auto_increment()) {}
_is_auto_increment(pdesc.is_auto_increment()) {
// Converts a protobuf PColumnAccessPath into the equivalent thrift TColumnAccessPath.
//
// The path type is carried over with an enum-to-enum static_cast.
// NOTE(review): this relies on PAccessPathType and TAccessPathType::type using the
// same numeric values - confirm against the .proto / .thrift definitions.
//
// Each optional sub-path (data / meta) is copied only when present in the protobuf
// message, and the matching thrift __isset bit is set so the field counts as present.
auto convert_to_thrift_column_access_path = [](const PColumnAccessPath& pb_path) {
    TColumnAccessPath thrift_path;
    thrift_path.type = static_cast<TAccessPathType::type>(pb_path.type());
    if (pb_path.has_data_access_path()) {
        thrift_path.__isset.data_access_path = true;
        for (const auto& segment : pb_path.data_access_path().path()) {
            thrift_path.data_access_path.path.push_back(segment);
        }
    }
    if (pb_path.has_meta_access_path()) {
        thrift_path.__isset.meta_access_path = true;
        for (const auto& segment : pb_path.meta_access_path().path()) {
            thrift_path.meta_access_path.path.push_back(segment);
        }
    }
    return thrift_path;
};
for (const auto& pb_path : pdesc.all_access_paths()) {
_all_access_paths.push_back(convert_to_thrift_column_access_path(pb_path));
}
for (const auto& pb_path : pdesc.predicate_access_paths()) {
_predicate_access_paths.push_back(convert_to_thrift_column_access_path(pb_path));
}
}

#ifdef BE_TEST
SlotDescriptor::SlotDescriptor()
Expand Down Expand Up @@ -132,6 +161,33 @@ void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) const {
for (const std::string& path : _column_paths) {
pslot->add_column_paths(path);
}
// Fills `pb_path` with the protobuf representation of the thrift `thrift_path`.
//
// `pb_path` is cleared first, so any previous content is discarded. Each optional
// sub-path (data / meta) is written only when its thrift __isset bit is set.
auto convert_to_protobuf_column_access_path = [](const TColumnAccessPath& thrift_path,
                                                 doris::PColumnAccessPath* pb_path) {
    pb_path->Clear();
    // Enum-to-enum conversion between the thrift and protobuf access-path types.
    // NOTE(review): relies on both enums sharing numeric values - confirm against
    // the .proto / .thrift definitions.
    pb_path->set_type(static_cast<PAccessPathType>(thrift_path.type));
    if (thrift_path.__isset.data_access_path) {
        auto* pb_data = pb_path->mutable_data_access_path();
        pb_data->Clear();
        for (const auto& segment : thrift_path.data_access_path.path) {
            pb_data->add_path(segment);
        }
    }
    if (thrift_path.__isset.meta_access_path) {
        auto* pb_meta = pb_path->mutable_meta_access_path();
        pb_meta->Clear();
        for (const auto& segment : thrift_path.meta_access_path.path) {
            pb_meta->add_path(segment);
        }
    }
};
for (const auto& path : _all_access_paths) {
auto* pb_path = pslot->add_all_access_paths();
convert_to_protobuf_column_access_path(path, pb_path);
}
for (const auto& path : _predicate_access_paths) {
auto* pb_path = pslot->add_predicate_access_paths();
convert_to_protobuf_column_access_path(path, pb_path);
}
}

vectorized::DataTypePtr SlotDescriptor::get_data_type_ptr() const {
Expand Down
Loading
Loading