Skip to content
Open

test #58240

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 58 additions & 4 deletions be/src/olap/rowset/segment_v2/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,14 +600,29 @@ uint64_t ScalarColumnWriter::estimate_buffer_size() {
if (is_nullable()) {
size += _null_bitmap_builder->size();
}
size += _ordinal_index_builder->size();
if (_opts.need_zone_map) {
size += estimate_index_size();
return size;
}

uint64_t ScalarColumnWriter::estimate_index_size() {
uint64_t size = 0;
if (_ordinal_index_builder != nullptr) {
size += _ordinal_index_builder->size();
}
if (_opts.need_zone_map && _zone_map_index_builder != nullptr) {
size += _zone_map_index_builder->size();
}
if (_opts.need_bitmap_index) {
if (_opts.need_bitmap_index && _bitmap_index_builder != nullptr) {
size += _bitmap_index_builder->size();
}
if (_opts.need_bloom_filter) {
if (_opts.need_inverted_index) {
for (const auto& builder : _inverted_index_builders) {
if (builder != nullptr) {
size += builder->size();
}
}
}
if (_opts.need_bloom_filter && _bloom_filter_index_builder != nullptr) {
size += _bloom_filter_index_builder->size();
}
return size;
Expand Down Expand Up @@ -869,6 +884,15 @@ uint64_t StructColumnWriter::estimate_buffer_size() {
return size;
}

uint64_t StructColumnWriter::estimate_index_size() {
uint64_t size = 0;
for (auto& column_writer : _sub_column_writers) {
size += column_writer->estimate_index_size();
}
size += is_nullable() ? _null_writer->estimate_index_size() : 0;
return size;
}

Status StructColumnWriter::finish() {
for (auto& column_writer : _sub_column_writers) {
RETURN_IF_ERROR(column_writer->finish());
Expand Down Expand Up @@ -1020,6 +1044,21 @@ uint64_t ArrayColumnWriter::estimate_buffer_size() {
_item_writer->estimate_buffer_size();
}

uint64_t ArrayColumnWriter::estimate_index_size() {
uint64_t size = _offset_writer->estimate_index_size();
// if (is_nullable()) {
// size += _null_writer->estimate_index_size();
// }
// size += _item_writer->estimate_index_size();
// if (_opts.need_inverted_index && _inverted_index_builder != nullptr) {
// size += _inverted_index_builder->size();
// }
// if (_opts.need_ann_index && _ann_index_writer != nullptr) {
// size += _ann_index_writer->size();
// }
return size;
}

Status ArrayColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
size_t num_rows) {
RETURN_IF_ERROR(append_data(ptr, num_rows));
Expand Down Expand Up @@ -1129,6 +1168,21 @@ uint64_t MapColumnWriter::estimate_buffer_size() {
return estimate;
}

uint64_t MapColumnWriter::estimate_index_size() {
uint64_t size = 0;
for (auto& sub_writer : _kv_writers) {
size += sub_writer->estimate_index_size();
}
size += _offsets_writer->estimate_index_size();
if (is_nullable()) {
size += _null_writer->estimate_index_size();
}
if (_index_builder != nullptr) {
size += _index_builder->size();
}
return size;
}

Status MapColumnWriter::finish() {
RETURN_IF_ERROR(_offsets_writer->finish());
if (is_nullable()) {
Expand Down
9 changes: 9 additions & 0 deletions be/src/olap/rowset/segment_v2/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ class ColumnWriter {

virtual uint64_t estimate_buffer_size() = 0;

virtual uint64_t estimate_index_size() { return 0; }

// finish append data
virtual Status finish() = 0;

Expand Down Expand Up @@ -226,6 +228,8 @@ class ScalarColumnWriter : public ColumnWriter {

uint64_t estimate_buffer_size() override;

uint64_t estimate_index_size() override;

// finish append data
Status finish() override;

Expand Down Expand Up @@ -358,6 +362,8 @@ class StructColumnWriter final : public ColumnWriter {

uint64_t estimate_buffer_size() override;

uint64_t estimate_index_size() override;

Status finish() override;
Status write_data() override;
Status write_ordinal_index() override;
Expand Down Expand Up @@ -430,6 +436,8 @@ class ArrayColumnWriter final : public ColumnWriter {

uint64_t estimate_buffer_size() override;

uint64_t estimate_index_size() override;

Status finish() override;
Status write_data() override;
Status write_ordinal_index() override;
Expand Down Expand Up @@ -510,6 +518,7 @@ class MapColumnWriter final : public ColumnWriter {
Status append_data(const uint8_t** ptr, size_t num_rows) override;
Status append_nullable(const uint8_t* null_map, const uint8_t** ptr, size_t num_rows) override;
uint64_t estimate_buffer_size() override;
uint64_t estimate_index_size() override;

Status finish() override;
Status write_data() override;
Expand Down
36 changes: 35 additions & 1 deletion be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,31 @@ Status VerticalSegmentWriter::batch_block(const vectorized::Block* block, size_t
}

Status VerticalSegmentWriter::write_batch() {
auto log_column_writer_mem_usage = [&](const std::string& stage, int64_t cid) {

uint64_t total_size = 0;
uint64_t total_index_size = 0;
uint64_t current_index_size = 0;
for (size_t i = 0; i < _column_writers.size(); ++i) {
const auto& writer = _column_writers[i];
if (writer == nullptr) {
continue;
}
total_size += writer->estimate_buffer_size();
const auto idx_size = writer->estimate_index_size();
total_index_size += idx_size;
if (cid >= 0 && static_cast<int64_t>(i) == cid) {
current_index_size = idx_size;
}
}
LOG(INFO) << fmt::format(
"segment {} write_batch stage={} cid={} column_writers_mem_bytes={} "
"column_writers_index_bytes={}{}",
_segment_id, stage, cid >= 0 ? std::to_string(cid) : "all", total_size,
total_index_size,
cid >= 0 ? fmt::format(" writer_index_mem_bytes={}", current_index_size) : "");
};

if (_opts.rowset_ctx->partial_update_info &&
_opts.rowset_ctx->partial_update_info->is_partial_update() &&
_opts.write_type == DataWriteType::TYPE_DIRECT &&
Expand All @@ -921,6 +946,7 @@ Status VerticalSegmentWriter::write_batch() {
RETURN_IF_ERROR(
_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema));
}
log_column_writer_mem_usage("partial_update_after_create_column_writers", -1);
vectorized::Block full_block;
for (auto& data : _batched_blocks) {
if (is_flexible_partial_update) {
Expand All @@ -929,9 +955,13 @@ Status VerticalSegmentWriter::write_batch() {
RETURN_IF_ERROR(_append_block_with_partial_content(data, full_block));
}
}
for (auto& column_writer : _column_writers) {
log_column_writer_mem_usage("partial_update_after_append_blocks", -1);
for (size_t cid = 0; cid < _column_writers.size(); ++cid) {
auto& column_writer = _column_writers[cid];
RETURN_IF_ERROR(column_writer->finish());
log_column_writer_mem_usage("partial_update_after_finish_column_writer", cid);
RETURN_IF_ERROR(column_writer->write_data());
log_column_writer_mem_usage("partial_update_after_write_data", cid);

auto* column_meta = column_writer->get_column_meta();
column_meta->set_compressed_data_bytes(
Expand All @@ -958,6 +988,7 @@ Status VerticalSegmentWriter::write_batch() {
std::map<uint32_t, vectorized::IOlapColumnDataAccessor*> cid_to_column;
for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema));
log_column_writer_mem_usage("after_create_column_writer", cid);
for (auto& data : _batched_blocks) {
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
data.block, data.row_pos, data.num_rows, std::vector<uint32_t> {cid}));
Expand All @@ -984,13 +1015,16 @@ Status VerticalSegmentWriter::write_batch() {
data.num_rows));
_olap_data_convertor->clear_source_content();
}
log_column_writer_mem_usage("after_append_column_data", cid);
if (_data_dir != nullptr &&
_data_dir->reach_capacity_limit(_column_writers[cid]->estimate_buffer_size())) {
return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed capacity limit.",
_data_dir->path_hash());
}
RETURN_IF_ERROR(_column_writers[cid]->finish());
log_column_writer_mem_usage("after_finish_column_writer", cid);
RETURN_IF_ERROR(_column_writers[cid]->write_data());
log_column_writer_mem_usage("after_write_data", cid);

auto* column_meta = _column_writers[cid]->get_column_meta();
column_meta->set_compressed_data_bytes(
Expand Down