From d91e7ef295add4469d45c0d09a5a92c36216e78a Mon Sep 17 00:00:00 2001 From: luohaha <18810541851@163.com> Date: Fri, 8 Nov 2024 18:05:59 +0800 Subject: [PATCH] [Enhancement] support del files and cloud native index recover for shared-data pk table Signed-off-by: luohaha <18810541851@163.com> --- .../storage/lake/lake_primary_key_recover.cpp | 32 ++- be/src/storage/lake/meta_file.cpp | 7 + be/src/storage/lake/update_manager.cpp | 7 + .../storage/lake/primary_key_publish_test.cpp | 195 +++++++++++++++++- 4 files changed, 231 insertions(+), 10 deletions(-) diff --git a/be/src/storage/lake/lake_primary_key_recover.cpp b/be/src/storage/lake/lake_primary_key_recover.cpp index a5d9d366124da..2ee6bdadd0874 100644 --- a/be/src/storage/lake/lake_primary_key_recover.cpp +++ b/be/src/storage/lake/lake_primary_key_recover.cpp @@ -17,6 +17,7 @@ #include "column/column.h" #include "common/constexpr.h" #include "fs/fs_util.h" +#include "fs/key_cache.h" #include "storage/chunk_helper.h" #include "storage/lake/meta_file.h" #include "storage/lake/tablet.h" @@ -28,11 +29,12 @@ namespace starrocks::lake { Status LakePrimaryKeyRecover::pre_cleanup() { - // 1. reset delvec in metadata and clean delvec in builder + // 1. reset delvec & sst in metadata and clean delvec in builder // TODO reclaim delvec files _metadata->clear_delvec_meta(); + _metadata->clear_sstable_meta(); // 2. reset primary index - _tablet->update_mgr()->try_remove_primary_index_cache(_tablet->id()); + RETURN_ERROR_IF_FALSE(_tablet->update_mgr()->try_remove_primary_index_cache(_tablet->id())); DataDir* data_dir = StorageEngine::instance()->get_persistent_index_store(_tablet->id()); if (data_dir != nullptr) { // clear local persistent index's index meta from rocksdb, and index files. @@ -84,8 +86,32 @@ Status LakePrimaryKeyRecover::rowset_iterator( if (!res.ok()) { return res.status(); } + // iterator for each segment auto& itrs = res.value(); - RETURN_IF_ERROR(handler(itrs, {}, {}, rowset->id())); + // iterator for each del files + std::vector> del_rfs; + for (int del_idx = 0; del_idx < rowset->metadata().del_files_size(); ++del_idx) { + const auto& del = rowset->metadata().del_files(del_idx); + if (del.origin_rowset_id() != rowset->id()) { + // if del file is not origin from this rowset, skip it + // because that means this rowset is generated by compaction + break; + } + RandomAccessFileOptions ropts; + if (!del.encryption_meta().empty()) { + ASSIGN_OR_RETURN(ropts.encryption_info, + KeyCache::instance().unwrap_encryption_meta(del.encryption_meta())); + } + ASSIGN_OR_RETURN(auto read_file, fs::new_random_access_file(ropts, _tablet->tablet_mgr()->del_location( + _metadata->id(), del.name()))); + del_rfs.push_back(std::move(read_file)); + } + // position of del files in itrs + std::vector delidxs; + for (uint32_t i = itrs.size(); i < itrs.size() + del_rfs.size(); ++i) { + delidxs.push_back(i); + } + RETURN_IF_ERROR(handler(itrs, del_rfs, delidxs, rowset->id())); } return Status::OK(); } diff --git a/be/src/storage/lake/meta_file.cpp b/be/src/storage/lake/meta_file.cpp index 02084a5fadab3..e837cd6674c16 100644 --- a/be/src/storage/lake/meta_file.cpp +++ b/be/src/storage/lake/meta_file.cpp @@ -402,6 +402,13 @@ Status MetaFileBuilder::update_num_del_stat(const std::map& se segment_id_to_rowset[mutable_rowset->id() + j] = mutable_rowset; } } + // For test purpose, we can set recover flag to test recover mode. + Status test_status = Status::OK(); + TEST_SYNC_POINT_CALLBACK("update_num_del_stat", &test_status); + if (!test_status.ok()) { + set_recover_flag(RecoverFlag::RECOVER_WITHOUT_PUBLISH); + return test_status; + } for (const auto& each : segment_id_to_add_dels) { if (segment_id_to_rowset.count(each.first) == 0) { // Maybe happen when primary index is in error state. diff --git a/be/src/storage/lake/update_manager.cpp b/be/src/storage/lake/update_manager.cpp index 21ff88b223df2..1a94e5e230952 100644 --- a/be/src/storage/lake/update_manager.cpp +++ b/be/src/storage/lake/update_manager.cpp @@ -295,6 +295,13 @@ Status UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrite& op_writ size_t cur_old = old_del_vec->cardinality(); size_t cur_add = new_delete.second.size(); size_t cur_new = new_del_vecs[idx].second->cardinality(); + // For test purpose, we can set recover flag to test recover mode. + Status test_status = Status::OK(); + TEST_SYNC_POINT_CALLBACK("delvec_inconsistent", &test_status); + if (!test_status.ok()) { + builder->set_recover_flag(RecoverFlag::RECOVER_WITH_PUBLISH); + return test_status; + } if (cur_old + cur_add != cur_new) { // should not happen, data inconsistent std::string error_msg = strings::Substitute( diff --git a/be/test/storage/lake/primary_key_publish_test.cpp b/be/test/storage/lake/primary_key_publish_test.cpp index e9a89b18a372f..d874a349aca21 100644 --- a/be/test/storage/lake/primary_key_publish_test.cpp +++ b/be/test/storage/lake/primary_key_publish_test.cpp @@ -673,6 +673,17 @@ TEST_P(LakePrimaryKeyPublishTest, test_recover) { auto [chunk0, indexes] = gen_data_and_index(kChunkSize, 0, true, true); auto version = 1; auto tablet_id = _tablet_metadata->id(); + bool ingest_failure = true; + std::string sync_point = "lake_index_load.1"; + SyncPoint::GetInstance()->SetCallBack(sync_point, [&](void* arg) { + if (ingest_failure) { + *(Status*)arg = Status::AlreadyExist("ut_test"); + ingest_failure = false; + } else { + ingest_failure = true; + } + }); + SyncPoint::GetInstance()->EnableProcessing(); for (int i = 0; i < 3; i++) { int64_t txn_id = next_id(); ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder() @@ -687,26 +698,71 @@ TEST_P(LakePrimaryKeyPublishTest, test_recover) { ASSERT_OK(delta_writer->write(*chunk0, indexes.data(), indexes.size())); ASSERT_OK(delta_writer->finish_with_txnlog()); delta_writer->close(); - std::string sync_point = "lake_index_load.1"; // Publish version - int retry_time = 0; + ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status()); + EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id)); + EXPECT_TRUE(_update_mgr->TEST_check_primary_index_cache_ref(tablet_id, 1)); + EXPECT_TRUE(_update_mgr->try_remove_primary_index_cache(tablet_id)); + version++; + } + SyncPoint::GetInstance()->ClearCallBack(sync_point); + SyncPoint::GetInstance()->DisableProcessing(); + ASSERT_EQ(kChunkSize, read_rows(tablet_id, version)); + ASSIGN_OR_ABORT(auto new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version)); + EXPECT_EQ(new_tablet_metadata->rowsets_size(), 3); + ASSERT_EQ(kChunkSize, read_rows(tablet_id, version)); + if (GetParam().enable_persistent_index && GetParam().persistent_index_type == PersistentIndexTypePB::LOCAL) { + check_local_persistent_index_meta(tablet_id, version); + } + config::enable_primary_key_recover = false; +} + +TEST_P(LakePrimaryKeyPublishTest, test_recover_with_multi_reason) { + config::enable_primary_key_recover = true; + auto [chunk0, indexes] = gen_data_and_index(kChunkSize, 0, true, true); + auto version = 1; + auto tablet_id = _tablet_metadata->id(); + auto old_l0_max_mem_usage = config::l0_max_mem_usage; + config::l0_max_mem_usage = 10; + std::vector sync_points = {"lake_index_load.1", "update_num_del_stat", "delvec_inconsistent"}; + for (int i = 0; i < 30; i++) { + std::string sync_point = sync_points[i % sync_points.size()]; + bool ingest_failure = true; SyncPoint::GetInstance()->SetCallBack(sync_point, [&](void* arg) { - if (retry_time < 1) { + if (ingest_failure) { *(Status*)arg = Status::AlreadyExist("ut_test"); - retry_time++; + ingest_failure = false; + } else { + ingest_failure = true; } }); SyncPoint::GetInstance()->EnableProcessing(); + int64_t txn_id = next_id(); + ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder() + .set_tablet_manager(_tablet_mgr.get()) + .set_tablet_id(tablet_id) + .set_txn_id(txn_id) + .set_partition_id(_partition_id) + .set_mem_tracker(_mem_tracker.get()) + .set_schema_id(_tablet_schema->id()) + .build()); + ASSERT_OK(delta_writer->open()); + ASSERT_OK(delta_writer->write(*chunk0, indexes.data(), indexes.size())); + ASSERT_OK(delta_writer->finish_with_txnlog()); + delta_writer->close(); + // Publish version ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status()); - SyncPoint::GetInstance()->ClearCallBack(sync_point); - SyncPoint::GetInstance()->DisableProcessing(); EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id)); EXPECT_TRUE(_update_mgr->TEST_check_primary_index_cache_ref(tablet_id, 1)); + EXPECT_TRUE(_update_mgr->try_remove_primary_index_cache(tablet_id)); version++; + SyncPoint::GetInstance()->ClearCallBack(sync_point); + SyncPoint::GetInstance()->DisableProcessing(); } + config::l0_max_mem_usage = old_l0_max_mem_usage; ASSERT_EQ(kChunkSize, read_rows(tablet_id, version)); ASSIGN_OR_ABORT(auto new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version)); - EXPECT_EQ(new_tablet_metadata->rowsets_size(), 3); + EXPECT_EQ(new_tablet_metadata->rowsets_size(), 30); ASSERT_EQ(kChunkSize, read_rows(tablet_id, version)); if (GetParam().enable_persistent_index && GetParam().persistent_index_type == PersistentIndexTypePB::LOCAL) { check_local_persistent_index_meta(tablet_id, version); @@ -714,6 +770,131 @@ TEST_P(LakePrimaryKeyPublishTest, test_recover) { config::enable_primary_key_recover = false; } +TEST_P(LakePrimaryKeyPublishTest, test_recover_with_dels) { + config::enable_primary_key_recover = true; + auto [chunk0, indexes] = gen_data_and_index(kChunkSize, 0, true, true); + auto [chunk1, indexes1] = gen_data_and_index(kChunkSize, 0, true, false); + auto version = 1; + auto tablet_id = _tablet_metadata->id(); + std::string sync_point = "lake_index_load.1"; + bool ingest_failure = true; + SyncPoint::GetInstance()->SetCallBack(sync_point, [&](void* arg) { + if (ingest_failure) { + *(Status*)arg = Status::AlreadyExist("ut_test"); + ingest_failure = false; + } else { + ingest_failure = true; + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + for (int i = 0; i < 6; i++) { + int64_t txn_id = next_id(); + ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder() + .set_tablet_manager(_tablet_mgr.get()) + .set_tablet_id(tablet_id) + .set_txn_id(txn_id) + .set_partition_id(_partition_id) + .set_mem_tracker(_mem_tracker.get()) + .set_schema_id(_tablet_schema->id()) + .set_slot_descriptors(&_slot_pointers) + .build()); + ASSERT_OK(delta_writer->open()); + if (i % 2 == 0) { + // upsert + ASSERT_OK(delta_writer->write(*chunk0, indexes.data(), indexes.size())); + } else { + // delete + ASSERT_OK(delta_writer->write(*chunk1, indexes1.data(), indexes1.size())); + } + ASSERT_OK(delta_writer->finish_with_txnlog()); + delta_writer->close(); + // Publish version + ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status()); + EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id)); + EXPECT_TRUE(_update_mgr->TEST_check_primary_index_cache_ref(tablet_id, 1)); + EXPECT_TRUE(_update_mgr->try_remove_primary_index_cache(tablet_id)); + version++; + } + SyncPoint::GetInstance()->ClearCallBack(sync_point); + SyncPoint::GetInstance()->DisableProcessing(); + ASSERT_EQ(0, read_rows(tablet_id, version)); + ASSIGN_OR_ABORT(auto new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version)); + EXPECT_EQ(new_tablet_metadata->rowsets_size(), 6); + for (int i = 0; i < 6; i++) { + EXPECT_EQ(new_tablet_metadata->rowsets(i).del_files_size(), i % 2); + } + ASSERT_EQ(0, read_rows(tablet_id, version)); + if (GetParam().enable_persistent_index && GetParam().persistent_index_type == PersistentIndexTypePB::LOCAL) { + check_local_persistent_index_meta(tablet_id, version); + } + config::enable_primary_key_recover = false; +} + +TEST_P(LakePrimaryKeyPublishTest, test_recover_with_dels2) { + config::enable_primary_key_recover = true; + auto [chunk0, indexes] = gen_data_and_index(kChunkSize, 0, true, true); + auto [chunk1, indexes1] = gen_data_and_index(kChunkSize, 0, true, false); + auto version = 1; + auto tablet_id = _tablet_metadata->id(); + bool ingest_failure = true; + std::string sync_point = "lake_index_load.1"; + SyncPoint::GetInstance()->SetCallBack(sync_point, [&](void* arg) { + if (ingest_failure) { + *(Status*)arg = Status::AlreadyExist("ut_test"); + ingest_failure = false; + } else { + ingest_failure = true; + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + for (int i = 0; i < 6; i++) { + int64_t txn_id = next_id(); + ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder() + .set_tablet_manager(_tablet_mgr.get()) + .set_tablet_id(tablet_id) + .set_txn_id(txn_id) + .set_partition_id(_partition_id) + .set_mem_tracker(_mem_tracker.get()) + .set_schema_id(_tablet_schema->id()) + .set_slot_descriptors(&_slot_pointers) + .build()); + ASSERT_OK(delta_writer->open()); + // upsert + const int64_t old_size = config::write_buffer_size; + config::write_buffer_size = 1; + ASSERT_OK(delta_writer->write(*chunk0, indexes.data(), indexes.size())); + // delete + for (int j = 0; j < i; j++) { + ASSERT_OK(delta_writer->write(*chunk1, indexes1.data(), indexes1.size())); + } + if (i == 5) { + ASSERT_OK(delta_writer->write(*chunk0, indexes.data(), indexes.size())); + } + ASSERT_OK(delta_writer->finish_with_txnlog()); + config::write_buffer_size = old_size; + delta_writer->close(); + // Publish version + ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status()); + EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id)); + EXPECT_TRUE(_update_mgr->TEST_check_primary_index_cache_ref(tablet_id, 1)); + EXPECT_TRUE(_update_mgr->try_remove_primary_index_cache(tablet_id)); + version++; + } + SyncPoint::GetInstance()->ClearCallBack(sync_point); + SyncPoint::GetInstance()->DisableProcessing(); + ASSERT_EQ(0, read_rows(tablet_id, version)); + ASSIGN_OR_ABORT(auto new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version)); + EXPECT_EQ(new_tablet_metadata->rowsets_size(), 6); + for (int i = 0; i < 6; i++) { + EXPECT_EQ(new_tablet_metadata->rowsets(i).del_files_size(), i); + } + ASSERT_EQ(0, read_rows(tablet_id, version)); + if (GetParam().enable_persistent_index && GetParam().persistent_index_type == PersistentIndexTypePB::LOCAL) { + check_local_persistent_index_meta(tablet_id, version); + } + config::enable_primary_key_recover = false; +} + TEST_P(LakePrimaryKeyPublishTest, test_write_rebuild_persistent_index) { if (!GetParam().enable_persistent_index) { // only test persistent index