diff --git a/be/src/http/action/update_config_action.cpp b/be/src/http/action/update_config_action.cpp index 3eebf25e3b339..0cc46e0f2db2f 100644 --- a/be/src/http/action/update_config_action.cpp +++ b/be/src/http/action/update_config_action.cpp @@ -131,9 +131,8 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str return Status::InvalidArgument( "This parameter is mutable when the Event-based Compaction Framework is enabled."); } - Status st = StorageEngine::instance()->compaction_manager()->update_max_threads( + return StorageEngine::instance()->compaction_manager()->update_max_threads( config::max_compaction_concurrency); - return st; }); _config_callback.emplace("flush_thread_num_per_store", [&]() -> Status { const size_t dir_cnt = StorageEngine::instance()->get_stores().size(); @@ -149,9 +148,8 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str return st1; }); _config_callback.emplace("lake_flush_thread_num_per_store", [&]() -> Status { - Status st = StorageEngine::instance()->lake_memtable_flush_executor()->update_max_threads( + return StorageEngine::instance()->lake_memtable_flush_executor()->update_max_threads( MemTableFlushExecutor::calc_max_threads_for_lake_table(StorageEngine::instance()->get_stores())); - return st; }); _config_callback.emplace("update_compaction_num_threads_per_disk", [&]() -> Status { StorageEngine::instance()->increase_update_compaction_thread( @@ -163,8 +161,7 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str StorageEngine::instance()->update_manager()->get_pindex_compaction_mgr(); if (mgr != nullptr) { const int max_pk_index_compaction_thread_cnt = std::max(1, config::pindex_major_compaction_num_threads); - Status st = mgr->update_max_threads(max_pk_index_compaction_thread_cnt); - return st; + return mgr->update_max_threads(max_pk_index_compaction_thread_cnt); } return Status::OK(); }); @@ -179,30 +176,26 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str }); _config_callback.emplace("dictionary_cache_refresh_threadpool_size", [&]() -> Status { if (_exec_env->dictionary_cache_pool() != nullptr) { - Status st = _exec_env->dictionary_cache_pool()->update_max_threads( + return _exec_env->dictionary_cache_pool()->update_max_threads( config::dictionary_cache_refresh_threadpool_size); - return st; } return Status::OK(); }); _config_callback.emplace("transaction_publish_version_worker_count", [&]() -> Status { auto thread_pool = ExecEnv::GetInstance()->agent_server()->get_thread_pool(TTaskType::PUBLISH_VERSION); - Status st = thread_pool->update_max_threads( + return thread_pool->update_max_threads( std::max(MIN_TRANSACTION_PUBLISH_WORKER_COUNT, config::transaction_publish_version_worker_count)); - return st; }); _config_callback.emplace("transaction_publish_version_thread_pool_num_min", [&]() -> Status { auto thread_pool = ExecEnv::GetInstance()->agent_server()->get_thread_pool(TTaskType::PUBLISH_VERSION); - Status st = thread_pool->update_min_threads(std::max( + return thread_pool->update_min_threads(std::max( MIN_TRANSACTION_PUBLISH_WORKER_COUNT, config::transaction_publish_version_thread_pool_num_min)); - return st; }); _config_callback.emplace("parallel_clone_task_per_path", [&]() -> Status { _exec_env->agent_server()->update_max_thread_by_type(TTaskType::CLONE, config::parallel_clone_task_per_path); return Status::OK(); }); - _config_callback.emplace("make_snapshot_worker_count", [&]() -> Status { _exec_env->agent_server()->update_max_thread_by_type(TTaskType::MAKE_SNAPSHOT, config::make_snapshot_worker_count); @@ -257,39 +250,33 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str if (config::transaction_apply_worker_count > 0) { max_thread_cnt = config::transaction_apply_worker_count; } - Status st = StorageEngine::instance()->update_manager()->apply_thread_pool()->update_max_threads( + return StorageEngine::instance()->update_manager()->apply_thread_pool()->update_max_threads( max_thread_cnt); - return st; }); _config_callback.emplace("transaction_apply_thread_pool_num_min", [&]() -> Status { int min_thread_cnt = config::transaction_apply_thread_pool_num_min; - Status st = StorageEngine::instance()->update_manager()->apply_thread_pool()->update_min_threads( + return StorageEngine::instance()->update_manager()->apply_thread_pool()->update_min_threads( min_thread_cnt); - return st; }); _config_callback.emplace("get_pindex_worker_count", [&]() -> Status { int max_thread_cnt = CpuInfo::num_cores(); if (config::get_pindex_worker_count > 0) { max_thread_cnt = config::get_pindex_worker_count; } - Status st = StorageEngine::instance()->update_manager()->get_pindex_thread_pool()->update_max_threads( + return StorageEngine::instance()->update_manager()->get_pindex_thread_pool()->update_max_threads( max_thread_cnt); - return st; }); _config_callback.emplace("drop_tablet_worker_count", [&]() -> Status { auto thread_pool = ExecEnv::GetInstance()->agent_server()->get_thread_pool(TTaskType::DROP); - Status st = thread_pool->update_max_threads(config::drop_tablet_worker_count); - return st; + return thread_pool->update_max_threads(config::drop_tablet_worker_count); }); _config_callback.emplace("make_snapshot_worker_count", [&]() -> Status { auto thread_pool = ExecEnv::GetInstance()->agent_server()->get_thread_pool(TTaskType::MAKE_SNAPSHOT); - Status st = thread_pool->update_max_threads(config::make_snapshot_worker_count); - return st; + return thread_pool->update_max_threads(config::make_snapshot_worker_count); }); _config_callback.emplace("release_snapshot_worker_count", [&]() -> Status { auto thread_pool = ExecEnv::GetInstance()->agent_server()->get_thread_pool(TTaskType::RELEASE_SNAPSHOT); - Status st = thread_pool->update_max_threads(config::release_snapshot_worker_count); - return st; + return thread_pool->update_max_threads(config::release_snapshot_worker_count); }); _config_callback.emplace("pipeline_connector_scan_thread_num_per_cpu", [&]() -> Status { LOG(INFO) << "set pipeline_connector_scan_thread_num_per_cpu:" @@ -309,15 +296,13 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str _config_callback.emplace("create_tablet_worker_count", [&]() -> Status { LOG(INFO) << "set create_tablet_worker_count:" << config::create_tablet_worker_count; auto thread_pool = ExecEnv::GetInstance()->agent_server()->get_thread_pool(TTaskType::CREATE); - Status st = thread_pool->update_max_threads(config::create_tablet_worker_count); - return st; + return thread_pool->update_max_threads(config::create_tablet_worker_count); }); _config_callback.emplace("number_tablet_writer_threads", [&]() -> Status { LOG(INFO) << "set number_tablet_writer_threads:" << config::number_tablet_writer_threads; bthreads::ThreadPoolExecutor* executor = static_cast( StorageEngine::instance()->async_delta_writer_executor()); - Status st = executor->get_thread_pool()->update_max_threads(config::number_tablet_writer_threads); - return st; + return executor->get_thread_pool()->update_max_threads(config::number_tablet_writer_threads); }); _config_callback.emplace("compact_threads", [&]() -> Status { auto tablet_manager = _exec_env->lake_tablet_manager();