Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] support del files and cloud native index recover for shared-data pk table #52707

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions be/src/storage/lake/lake_primary_key_recover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "column/column.h"
#include "common/constexpr.h"
#include "fs/fs_util.h"
#include "fs/key_cache.h"
#include "storage/chunk_helper.h"
#include "storage/lake/meta_file.h"
#include "storage/lake/tablet.h"
Expand All @@ -28,11 +29,12 @@
namespace starrocks::lake {

Status LakePrimaryKeyRecover::pre_cleanup() {
// 1. reset delvec in metadata and clean delvec in builder
// 1. reset delvec & sst in metadata and clean delvec in builder
// TODO reclaim delvec files
_metadata->clear_delvec_meta();
_metadata->clear_sstable_meta();
// 2. reset primary index
_tablet->update_mgr()->try_remove_primary_index_cache(_tablet->id());
RETURN_ERROR_IF_FALSE(_tablet->update_mgr()->try_remove_primary_index_cache(_tablet->id()));
DataDir* data_dir = StorageEngine::instance()->get_persistent_index_store(_tablet->id());
if (data_dir != nullptr) {
// clear local persistent index's index meta from rocksdb, and index files.
Expand Down Expand Up @@ -84,8 +86,32 @@ Status LakePrimaryKeyRecover::rowset_iterator(
if (!res.ok()) {
return res.status();
}
// iterator for each segment
auto& itrs = res.value();
RETURN_IF_ERROR(handler(itrs, {}, {}, rowset->id()));
// iterator for each del files
std::vector<std::unique_ptr<RandomAccessFile>> del_rfs;
for (int del_idx = 0; del_idx < rowset->metadata().del_files_size(); ++del_idx) {
const auto& del = rowset->metadata().del_files(del_idx);
if (del.origin_rowset_id() != rowset->id()) {
// if del file is not origin from this rowset, skip it
// because that means this rowset is generated by compaction
break;
}
RandomAccessFileOptions ropts;
if (!del.encryption_meta().empty()) {
ASSIGN_OR_RETURN(ropts.encryption_info,
KeyCache::instance().unwrap_encryption_meta(del.encryption_meta()));
}
ASSIGN_OR_RETURN(auto read_file, fs::new_random_access_file(ropts, _tablet->tablet_mgr()->del_location(
_metadata->id(), del.name())));
del_rfs.push_back(std::move(read_file));
}
// position of del files in itrs
std::vector<uint32_t> delidxs;
for (uint32_t i = itrs.size(); i < itrs.size() + del_rfs.size(); ++i) {
delidxs.push_back(i);
}
RETURN_IF_ERROR(handler(itrs, del_rfs, delidxs, rowset->id()));
}
return Status::OK();
}
luohaha marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
7 changes: 7 additions & 0 deletions be/src/storage/lake/meta_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,13 @@ Status MetaFileBuilder::update_num_del_stat(const std::map<uint32_t, size_t>& se
segment_id_to_rowset[mutable_rowset->id() + j] = mutable_rowset;
}
}
// For test purpose, we can set recover flag to test recover mode.
Status test_status = Status::OK();
TEST_SYNC_POINT_CALLBACK("update_num_del_stat", &test_status);
if (!test_status.ok()) {
set_recover_flag(RecoverFlag::RECOVER_WITHOUT_PUBLISH);
return test_status;
}
for (const auto& each : segment_id_to_add_dels) {
if (segment_id_to_rowset.count(each.first) == 0) {
// Maybe happen when primary index is in error state.
Expand Down
7 changes: 7 additions & 0 deletions be/src/storage/lake/update_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,13 @@ Status UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrite& op_writ
size_t cur_old = old_del_vec->cardinality();
size_t cur_add = new_delete.second.size();
size_t cur_new = new_del_vecs[idx].second->cardinality();
// For test purpose, we can set recover flag to test recover mode.
Status test_status = Status::OK();
TEST_SYNC_POINT_CALLBACK("delvec_inconsistent", &test_status);
if (!test_status.ok()) {
builder->set_recover_flag(RecoverFlag::RECOVER_WITH_PUBLISH);
return test_status;
}
if (cur_old + cur_add != cur_new) {
// should not happen, data inconsistent
std::string error_msg = strings::Substitute(
Expand Down
195 changes: 188 additions & 7 deletions be/test/storage/lake/primary_key_publish_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,17 @@ TEST_P(LakePrimaryKeyPublishTest, test_recover) {
auto [chunk0, indexes] = gen_data_and_index(kChunkSize, 0, true, true);
auto version = 1;
auto tablet_id = _tablet_metadata->id();
bool ingest_failure = true;
std::string sync_point = "lake_index_load.1";
SyncPoint::GetInstance()->SetCallBack(sync_point, [&](void* arg) {
if (ingest_failure) {
*(Status*)arg = Status::AlreadyExist("ut_test");
ingest_failure = false;
} else {
ingest_failure = true;
}
});
SyncPoint::GetInstance()->EnableProcessing();
for (int i = 0; i < 3; i++) {
int64_t txn_id = next_id();
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
Expand All @@ -687,33 +698,203 @@ TEST_P(LakePrimaryKeyPublishTest, test_recover) {
ASSERT_OK(delta_writer->write(*chunk0, indexes.data(), indexes.size()));
ASSERT_OK(delta_writer->finish_with_txnlog());
delta_writer->close();
std::string sync_point = "lake_index_load.1";
// Publish version
int retry_time = 0;
ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id));
EXPECT_TRUE(_update_mgr->TEST_check_primary_index_cache_ref(tablet_id, 1));
EXPECT_TRUE(_update_mgr->try_remove_primary_index_cache(tablet_id));
version++;
}
SyncPoint::GetInstance()->ClearCallBack(sync_point);
SyncPoint::GetInstance()->DisableProcessing();
ASSERT_EQ(kChunkSize, read_rows(tablet_id, version));
ASSIGN_OR_ABORT(auto new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
EXPECT_EQ(new_tablet_metadata->rowsets_size(), 3);
ASSERT_EQ(kChunkSize, read_rows(tablet_id, version));
if (GetParam().enable_persistent_index && GetParam().persistent_index_type == PersistentIndexTypePB::LOCAL) {
check_local_persistent_index_meta(tablet_id, version);
}
config::enable_primary_key_recover = false;
}

// Verifies that publish succeeds via primary-key recovery when the failure is
// injected at different sync points.
//
// 30 loads of the same chunk are written and published. For each load one of
// three sync points ("lake_index_load.1", "update_num_del_stat",
// "delvec_inconsistent") is armed in round-robin order. The armed callback
// writes an AlreadyExist status into the Status* it receives on its first
// invocation, leaves the status untouched on the next one, and keeps
// alternating. Every publish is still expected to return OK, and the final
// tablet must expose exactly kChunkSize rows spread over 30 rowsets.
TEST_P(LakePrimaryKeyPublishTest, test_recover_with_multi_reason) {
    config::enable_primary_key_recover = true;
    auto [chunk0, indexes] = gen_data_and_index(kChunkSize, 0, true, true);
    auto version = 1;
    auto tablet_id = _tablet_metadata->id();
    // NOTE(review): presumably a tiny L0 limit forces the persistent index to
    // flush on almost every load -- confirm against the index implementation.
    auto old_l0_max_mem_usage = config::l0_max_mem_usage;
    config::l0_max_mem_usage = 10;
    std::vector<std::string> sync_points = {"lake_index_load.1", "update_num_del_stat", "delvec_inconsistent"};
    for (int i = 0; i < 30; i++) {
        // Rotate through the injection points so each one is hit 10 times.
        std::string sync_point = sync_points[i % sync_points.size()];
        // Re-armed for every load: the first callback invocation fails.
        bool ingest_failure = true;
        SyncPoint::GetInstance()->SetCallBack(sync_point, [&](void* arg) {
            if (ingest_failure) {
                *static_cast<Status*>(arg) = Status::AlreadyExist("ut_test");
                ingest_failure = false;
            } else {
                ingest_failure = true;
            }
        });
        SyncPoint::GetInstance()->EnableProcessing();
        int64_t txn_id = next_id();
        ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
                                                   .set_tablet_manager(_tablet_mgr.get())
                                                   .set_tablet_id(tablet_id)
                                                   .set_txn_id(txn_id)
                                                   .set_partition_id(_partition_id)
                                                   .set_mem_tracker(_mem_tracker.get())
                                                   .set_schema_id(_tablet_schema->id())
                                                   .build());
        ASSERT_OK(delta_writer->open());
        ASSERT_OK(delta_writer->write(*chunk0, indexes.data(), indexes.size()));
        ASSERT_OK(delta_writer->finish_with_txnlog());
        delta_writer->close();
        // Publish version
        ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
        // Disarm the injection point before inspecting the caches.
        SyncPoint::GetInstance()->ClearCallBack(sync_point);
        SyncPoint::GetInstance()->DisableProcessing();
        EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id));
        EXPECT_TRUE(_update_mgr->TEST_check_primary_index_cache_ref(tablet_id, 1));
        EXPECT_TRUE(_update_mgr->try_remove_primary_index_cache(tablet_id));
        version++;
    }
    config::l0_max_mem_usage = old_l0_max_mem_usage;
    ASSERT_EQ(kChunkSize, read_rows(tablet_id, version));
    ASSIGN_OR_ABORT(auto new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
    // One rowset per published load.
    EXPECT_EQ(new_tablet_metadata->rowsets_size(), 30);
    ASSERT_EQ(kChunkSize, read_rows(tablet_id, version));
    if (GetParam().enable_persistent_index && GetParam().persistent_index_type == PersistentIndexTypePB::LOCAL) {
        check_local_persistent_index_meta(tablet_id, version);
    }
    config::enable_primary_key_recover = false;
}

// Recovery test with alternating upsert and delete loads.
//
// Six loads are published against the same key range: even-numbered loads
// write the upsert chunk, odd-numbered loads write the delete chunk. The
// "lake_index_load.1" sync point callback writes an AlreadyExist status on
// every other invocation (starting with the first). Every publish must still
// succeed, the tablet must end up empty, and only the odd (delete) rowsets may
// carry a del file.
TEST_P(LakePrimaryKeyPublishTest, test_recover_with_dels) {
    config::enable_primary_key_recover = true;
    auto [upsert_chunk, upsert_indexes] = gen_data_and_index(kChunkSize, 0, true, true);
    auto [delete_chunk, delete_indexes] = gen_data_and_index(kChunkSize, 0, true, false);
    auto version = 1;
    auto tablet_id = _tablet_metadata->id();

    // Failure injection: flip the flag on every call, fail when it was set.
    std::string sync_point = "lake_index_load.1";
    bool ingest_failure = true;
    SyncPoint::GetInstance()->SetCallBack(sync_point, [&](void* arg) {
        const bool fail_this_call = ingest_failure;
        ingest_failure = !ingest_failure;
        if (fail_this_call) {
            *(Status*)arg = Status::AlreadyExist("ut_test");
        }
    });
    SyncPoint::GetInstance()->EnableProcessing();

    for (int i = 0; i < 6; i++) {
        int64_t txn_id = next_id();
        ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
                                                   .set_tablet_manager(_tablet_mgr.get())
                                                   .set_tablet_id(tablet_id)
                                                   .set_txn_id(txn_id)
                                                   .set_partition_id(_partition_id)
                                                   .set_mem_tracker(_mem_tracker.get())
                                                   .set_schema_id(_tablet_schema->id())
                                                   .set_slot_descriptors(&_slot_pointers)
                                                   .build());
        ASSERT_OK(delta_writer->open());
        const bool is_delete_load = (i % 2 != 0);
        if (is_delete_load) {
            // delete
            ASSERT_OK(delta_writer->write(*delete_chunk, delete_indexes.data(), delete_indexes.size()));
        } else {
            // upsert
            ASSERT_OK(delta_writer->write(*upsert_chunk, upsert_indexes.data(), upsert_indexes.size()));
        }
        ASSERT_OK(delta_writer->finish_with_txnlog());
        delta_writer->close();

        // Publish version
        const auto next_version = version + 1;
        ASSERT_OK(publish_single_version(tablet_id, next_version, txn_id).status());
        EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id));
        EXPECT_TRUE(_update_mgr->TEST_check_primary_index_cache_ref(tablet_id, 1));
        EXPECT_TRUE(_update_mgr->try_remove_primary_index_cache(tablet_id));
        version = next_version;
    }

    SyncPoint::GetInstance()->ClearCallBack(sync_point);
    SyncPoint::GetInstance()->DisableProcessing();

    // The last load is a delete of every key, so nothing should be readable.
    ASSERT_EQ(0, read_rows(tablet_id, version));
    ASSIGN_OR_ABORT(auto new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
    EXPECT_EQ(new_tablet_metadata->rowsets_size(), 6);
    // Upsert rowsets (even) have no del file; delete rowsets (odd) have one.
    for (int rowset_idx = 0; rowset_idx < 6; ++rowset_idx) {
        EXPECT_EQ(new_tablet_metadata->rowsets(rowset_idx).del_files_size(), rowset_idx % 2);
    }
    ASSERT_EQ(0, read_rows(tablet_id, version));

    const bool uses_local_persistent_index = GetParam().enable_persistent_index &&
                                             GetParam().persistent_index_type == PersistentIndexTypePB::LOCAL;
    if (uses_local_persistent_index) {
        check_local_persistent_index_meta(tablet_id, version);
    }
    config::enable_primary_key_recover = false;
}

// Recovery test where a single load mixes one upsert with several deletes.
//
// Load i (0..5) writes the upsert chunk once and then the delete chunk i
// times; the last load (i == 5) appends one more upsert write. The write
// buffer size is set to 1 while writing and restored afterwards.
// NOTE(review): presumably the 1-byte buffer makes every write flush into its
// own segment/del file -- confirm against DeltaWriter.
// The "lake_index_load.1" sync point callback writes an AlreadyExist status on
// every other invocation (starting with the first). Every publish must still
// succeed, the tablet must end up with zero readable rows, and rowset i must
// record exactly i del files.
TEST_P(LakePrimaryKeyPublishTest, test_recover_with_dels2) {
    config::enable_primary_key_recover = true;
    auto [upsert_chunk, upsert_indexes] = gen_data_and_index(kChunkSize, 0, true, true);
    auto [delete_chunk, delete_indexes] = gen_data_and_index(kChunkSize, 0, true, false);
    auto version = 1;
    auto tablet_id = _tablet_metadata->id();

    // Failure injection: flip the flag on every call, fail when it was set.
    bool ingest_failure = true;
    std::string sync_point = "lake_index_load.1";
    SyncPoint::GetInstance()->SetCallBack(sync_point, [&](void* arg) {
        const bool fail_this_call = ingest_failure;
        ingest_failure = !ingest_failure;
        if (fail_this_call) {
            *(Status*)arg = Status::AlreadyExist("ut_test");
        }
    });
    SyncPoint::GetInstance()->EnableProcessing();

    for (int i = 0; i < 6; i++) {
        int64_t txn_id = next_id();
        ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
                                                   .set_tablet_manager(_tablet_mgr.get())
                                                   .set_tablet_id(tablet_id)
                                                   .set_txn_id(txn_id)
                                                   .set_partition_id(_partition_id)
                                                   .set_mem_tracker(_mem_tracker.get())
                                                   .set_schema_id(_tablet_schema->id())
                                                   .set_slot_descriptors(&_slot_pointers)
                                                   .build());
        ASSERT_OK(delta_writer->open());

        // Shrink the write buffer for the duration of this load's writes.
        const int64_t old_size = config::write_buffer_size;
        config::write_buffer_size = 1;
        // upsert
        ASSERT_OK(delta_writer->write(*upsert_chunk, upsert_indexes.data(), upsert_indexes.size()));
        // delete: one write per preceding load index
        for (int remaining = i; remaining > 0; --remaining) {
            ASSERT_OK(delta_writer->write(*delete_chunk, delete_indexes.data(), delete_indexes.size()));
        }
        const bool is_last_load = (i == 5);
        if (is_last_load) {
            ASSERT_OK(delta_writer->write(*upsert_chunk, upsert_indexes.data(), upsert_indexes.size()));
        }
        ASSERT_OK(delta_writer->finish_with_txnlog());
        config::write_buffer_size = old_size;
        delta_writer->close();

        // Publish version
        const auto next_version = version + 1;
        ASSERT_OK(publish_single_version(tablet_id, next_version, txn_id).status());
        EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id));
        EXPECT_TRUE(_update_mgr->TEST_check_primary_index_cache_ref(tablet_id, 1));
        EXPECT_TRUE(_update_mgr->try_remove_primary_index_cache(tablet_id));
        version = next_version;
    }

    SyncPoint::GetInstance()->ClearCallBack(sync_point);
    SyncPoint::GetInstance()->DisableProcessing();

    ASSERT_EQ(0, read_rows(tablet_id, version));
    ASSIGN_OR_ABORT(auto new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
    EXPECT_EQ(new_tablet_metadata->rowsets_size(), 6);
    // Rowset i was produced by a load containing i delete writes.
    for (int rowset_idx = 0; rowset_idx < 6; ++rowset_idx) {
        EXPECT_EQ(new_tablet_metadata->rowsets(rowset_idx).del_files_size(), rowset_idx);
    }
    ASSERT_EQ(0, read_rows(tablet_id, version));

    const bool uses_local_persistent_index = GetParam().enable_persistent_index &&
                                             GetParam().persistent_index_type == PersistentIndexTypePB::LOCAL;
    if (uses_local_persistent_index) {
        check_local_persistent_index_meta(tablet_id, version);
    }
    config::enable_primary_key_recover = false;
}

TEST_P(LakePrimaryKeyPublishTest, test_write_rebuild_persistent_index) {
if (!GetParam().enable_persistent_index) {
// only test persistent index
Expand Down
Loading