Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,7 @@ Status BlockChanger::change_block(vectorized::Block* ref_block,
RETURN_IF_ERROR(ctx->prepare(state.get(), row_desc));
RETURN_IF_ERROR(ctx->open(state.get()));

RETURN_IF_ERROR(
vectorized::VExprContext::filter_block(ctx.get(), ref_block, ref_block->columns()));
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(ctx.get(), ref_block));
}

const int row_num = cast_set<int>(ref_block->rows());
Expand Down
64 changes: 35 additions & 29 deletions be/src/vec/exprs/vexpr_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "runtime/thread_context.h"
#include "udf/udf.h"
#include "util/simd/bits.h"
#include "vec/columns/column.h"
#include "vec/columns/column_const.h"
#include "vec/core/column_numbers.h"
#include "vec/core/column_with_type_and_name.h"
Expand Down Expand Up @@ -73,12 +74,20 @@ Status VExprContext::execute(vectorized::Block* block, int* result_column_id) {
return st;
}

Status VExprContext::execute(Block* block, ColumnPtr& result_column) {
Status VExprContext::execute(const Block* block, ColumnPtr& result_column) {
Status st;
RETURN_IF_CATCH_EXCEPTION({ st = _root->execute_column(this, block, result_column); });
return st;
}

DataTypePtr VExprContext::execute_type(const Block* block) {
return _root->execute_type(block);
}

[[nodiscard]] const std::string& VExprContext::expr_name() const {
return _root->expr_name();
}

bool VExprContext::is_blockable() const {
return _root->is_blockable();
}
Expand Down Expand Up @@ -159,15 +168,16 @@ bool VExprContext::all_expr_inverted_index_evaluated() {
return _index_context->has_index_result_for_expr(_root.get());
}

Status VExprContext::filter_block(VExprContext* vexpr_ctx, Block* block, size_t column_to_keep) {
Status VExprContext::filter_block(VExprContext* vexpr_ctx, Block* block) {
if (vexpr_ctx == nullptr || block->rows() == 0) {
return Status::OK();
}
int result_column_id = -1;
size_t origin_size = block->allocated_bytes();
RETURN_IF_ERROR(vexpr_ctx->execute(block, &result_column_id));
vexpr_ctx->_memory_usage = (block->allocated_bytes() - origin_size);
return Block::filter_block(block, result_column_id, column_to_keep);
ColumnPtr filter_column;
RETURN_IF_ERROR(vexpr_ctx->execute(block, filter_column));
size_t filter_column_id = block->columns();
block->insert({filter_column, vexpr_ctx->execute_type(block), "filter_column"});
vexpr_ctx->_memory_usage = filter_column->allocated_bytes();
return Block::filter_block(block, filter_column_id, filter_column_id);
}

Status VExprContext::filter_block(const VExprContextSPtrs& expr_contexts, Block* block,
Expand All @@ -192,7 +202,7 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
// TODO: Performance Optimization
Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
const std::vector<IColumn::Filter*>* filters,
bool accept_null, Block* block,
bool accept_null, const Block* block,
IColumn::Filter* result_filter, bool* can_filter_all) {
size_t rows = block->rows();
DCHECK_EQ(result_filter->size(), rows);
Expand All @@ -201,9 +211,8 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
for (const auto& ctx : ctxs) {
// Statistics are only required when an rf wrapper exists in the expr.
bool is_rf_wrapper = ctx->root()->is_rf_wrapper();
int result_column_id = -1;
RETURN_IF_ERROR(ctx->execute(block, &result_column_id));
ColumnPtr& filter_column = block->get_by_position(result_column_id).column;
ColumnPtr filter_column;
RETURN_IF_ERROR(ctx->execute(block, filter_column));
if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
size_t column_size = nullable_column->size();
if (column_size == 0) {
Expand Down Expand Up @@ -297,7 +306,7 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
return Status::OK();
}

Status VExprContext::execute_conjuncts(const VExprContextSPtrs& conjuncts, Block* block,
Status VExprContext::execute_conjuncts(const VExprContextSPtrs& conjuncts, const Block* block,
ColumnUInt8& null_map, IColumn::Filter& filter) {
const auto& rows = block->rows();
if (rows == 0) {
Expand All @@ -312,10 +321,9 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& conjuncts, Block
auto* final_filter_ptr = filter.data();

for (const auto& conjunct : conjuncts) {
int result_column_id = -1;
RETURN_IF_ERROR(conjunct->execute(block, &result_column_id));
auto [filter_column, is_const] =
unpack_if_const(block->get_by_position(result_column_id).column);
ColumnPtr result_column;
RETURN_IF_ERROR(conjunct->execute(block, result_column));
auto [filter_column, is_const] = unpack_if_const(result_column);
const auto* nullable_column = assert_cast<const ColumnNullable*>(filter_column.get());
if (!is_const) {
const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
Expand Down Expand Up @@ -426,24 +434,22 @@ Status VExprContext::get_output_block_after_execute_exprs(
const VExprContextSPtrs& output_vexpr_ctxs, const Block& input_block, Block* output_block,
bool do_projection) {
auto rows = input_block.rows();
vectorized::Block tmp_block(input_block.get_columns_with_type_and_name());
vectorized::ColumnsWithTypeAndName result_columns;
_reset_memory_usage(output_vexpr_ctxs);

for (const auto& vexpr_ctx : output_vexpr_ctxs) {
int result_column_id = -1;
int origin_columns = tmp_block.columns();
size_t origin_usage = tmp_block.allocated_bytes();
RETURN_IF_ERROR(vexpr_ctx->execute(&tmp_block, &result_column_id));
DCHECK(result_column_id != -1);

vexpr_ctx->_memory_usage = tmp_block.allocated_bytes() - origin_usage;
const auto& col = tmp_block.get_by_position(result_column_id);
if (do_projection && origin_columns <= result_column_id) {
result_columns.emplace_back(col.column->clone_resized(rows), col.type, col.name);
vexpr_ctx->_memory_usage += result_columns.back().column->allocated_bytes();
ColumnPtr result_column;
RETURN_IF_ERROR(vexpr_ctx->execute(&input_block, result_column));

auto type = vexpr_ctx->execute_type(&input_block);
const auto& name = vexpr_ctx->expr_name();

vexpr_ctx->_memory_usage += result_column->allocated_bytes();
if (do_projection) {
result_columns.emplace_back(result_column->clone_resized(rows), type, name);

} else {
result_columns.emplace_back(tmp_block.get_by_position(result_column_id));
result_columns.emplace_back(result_column, type, name);
}
}
*output_block = {result_columns};
Expand Down
13 changes: 7 additions & 6 deletions be/src/vec/exprs/vexpr_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ class VExprContext {
[[nodiscard]] Status open(RuntimeState* state);
[[nodiscard]] Status clone(RuntimeState* state, VExprContextSPtr& new_ctx);
[[nodiscard]] Status execute(Block* block, int* result_column_id);
[[nodiscard]] Status execute(Block* block, ColumnPtr& result_column);
[[nodiscard]] Status execute(const Block* block, ColumnPtr& result_column);
[[nodiscard]] DataTypePtr execute_type(const Block* block);
[[nodiscard]] const std::string& expr_name() const;
[[nodiscard]] bool is_blockable() const;

VExprSPtr root() { return _root; }
Expand Down Expand Up @@ -208,20 +210,19 @@ class VExprContext {

bool all_expr_inverted_index_evaluated();

[[nodiscard]] static Status filter_block(VExprContext* vexpr_ctx, Block* block,
size_t column_to_keep);
[[nodiscard]] static Status filter_block(VExprContext* vexpr_ctx, Block* block);

[[nodiscard]] static Status filter_block(const VExprContextSPtrs& expr_contexts, Block* block,
size_t column_to_keep);

[[nodiscard]] static Status execute_conjuncts(const VExprContextSPtrs& ctxs,
const std::vector<IColumn::Filter*>* filters,
bool accept_null, Block* block,
bool accept_null, const Block* block,
IColumn::Filter* result_filter,
bool* can_filter_all);

[[nodiscard]] static Status execute_conjuncts(const VExprContextSPtrs& conjuncts, Block* block,
ColumnUInt8& null_map,
[[nodiscard]] static Status execute_conjuncts(const VExprContextSPtrs& conjuncts,
const Block* block, ColumnUInt8& null_map,
IColumn::Filter& result_filter);

static Status execute_conjuncts(const VExprContextSPtrs& ctxs,
Expand Down