Skip to content

Commit

Permalink
fix clang format
Browse files Browse the repository at this point in the history
Signed-off-by: edwinhzhang <[email protected]>
  • Loading branch information
zhangheihei committed Oct 28, 2024
1 parent 4509859 commit b9dd5b5
Showing 1 changed file with 14 additions and 29 deletions.
43 changes: 14 additions & 29 deletions be/src/http/action/update_config_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,8 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str
return Status::InvalidArgument(
"This parameter is mutable when the Event-based Compaction Framework is enabled.");
}
Status st = StorageEngine::instance()->compaction_manager()->update_max_threads(
return StorageEngine::instance()->compaction_manager()->update_max_threads(
config::max_compaction_concurrency);
return st;
});
_config_callback.emplace("flush_thread_num_per_store", [&]() -> Status {
const size_t dir_cnt = StorageEngine::instance()->get_stores().size();
Expand All @@ -149,9 +148,8 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str
return st1;
});
_config_callback.emplace("lake_flush_thread_num_per_store", [&]() -> Status {
Status st = StorageEngine::instance()->lake_memtable_flush_executor()->update_max_threads(
return StorageEngine::instance()->lake_memtable_flush_executor()->update_max_threads(
MemTableFlushExecutor::calc_max_threads_for_lake_table(StorageEngine::instance()->get_stores()));
return st;
});
_config_callback.emplace("update_compaction_num_threads_per_disk", [&]() -> Status {
StorageEngine::instance()->increase_update_compaction_thread(
Expand All @@ -163,8 +161,7 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str
StorageEngine::instance()->update_manager()->get_pindex_compaction_mgr();
if (mgr != nullptr) {
const int max_pk_index_compaction_thread_cnt = std::max(1, config::pindex_major_compaction_num_threads);
Status st = mgr->update_max_threads(max_pk_index_compaction_thread_cnt);
return st;
return mgr->update_max_threads(max_pk_index_compaction_thread_cnt);
}
return Status::OK();
});
Expand All @@ -179,30 +176,26 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str
});
_config_callback.emplace("dictionary_cache_refresh_threadpool_size", [&]() -> Status {
if (_exec_env->dictionary_cache_pool() != nullptr) {
Status st = _exec_env->dictionary_cache_pool()->update_max_threads(
return _exec_env->dictionary_cache_pool()->update_max_threads(
config::dictionary_cache_refresh_threadpool_size);
return st;
}
return Status::OK();
});
_config_callback.emplace("transaction_publish_version_worker_count", [&]() -> Status {
auto thread_pool = ExecEnv::GetInstance()->agent_server()->get_thread_pool(TTaskType::PUBLISH_VERSION);
Status st = thread_pool->update_max_threads(
return thread_pool->update_max_threads(
std::max(MIN_TRANSACTION_PUBLISH_WORKER_COUNT, config::transaction_publish_version_worker_count));
return st;
});
_config_callback.emplace("transaction_publish_version_thread_pool_num_min", [&]() -> Status {
auto thread_pool = ExecEnv::GetInstance()->agent_server()->get_thread_pool(TTaskType::PUBLISH_VERSION);
Status st = thread_pool->update_min_threads(std::max(
return thread_pool->update_min_threads(std::max(
MIN_TRANSACTION_PUBLISH_WORKER_COUNT, config::transaction_publish_version_thread_pool_num_min));
return st;
});
_config_callback.emplace("parallel_clone_task_per_path", [&]() -> Status {
_exec_env->agent_server()->update_max_thread_by_type(TTaskType::CLONE,
config::parallel_clone_task_per_path);
return Status::OK();
});

_config_callback.emplace("make_snapshot_worker_count", [&]() -> Status {
_exec_env->agent_server()->update_max_thread_by_type(TTaskType::MAKE_SNAPSHOT,
config::make_snapshot_worker_count);
Expand Down Expand Up @@ -257,39 +250,33 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str
if (config::transaction_apply_worker_count > 0) {
max_thread_cnt = config::transaction_apply_worker_count;
}
Status st = StorageEngine::instance()->update_manager()->apply_thread_pool()->update_max_threads(
return StorageEngine::instance()->update_manager()->apply_thread_pool()->update_max_threads(
max_thread_cnt);
return st;
});
_config_callback.emplace("transaction_apply_thread_pool_num_min", [&]() -> Status {
int min_thread_cnt = config::transaction_apply_thread_pool_num_min;
Status st = StorageEngine::instance()->update_manager()->apply_thread_pool()->update_min_threads(
return StorageEngine::instance()->update_manager()->apply_thread_pool()->update_min_threads(
min_thread_cnt);
return st;
});
_config_callback.emplace("get_pindex_worker_count", [&]() -> Status {
int max_thread_cnt = CpuInfo::num_cores();
if (config::get_pindex_worker_count > 0) {
max_thread_cnt = config::get_pindex_worker_count;
}
Status st = StorageEngine::instance()->update_manager()->get_pindex_thread_pool()->update_max_threads(
return StorageEngine::instance()->update_manager()->get_pindex_thread_pool()->update_max_threads(
max_thread_cnt);
return st;
});
_config_callback.emplace("drop_tablet_worker_count", [&]() -> Status {
auto thread_pool = ExecEnv::GetInstance()->agent_server()->get_thread_pool(TTaskType::DROP);
Status st = thread_pool->update_max_threads(config::drop_tablet_worker_count);
return st;
return thread_pool->update_max_threads(config::drop_tablet_worker_count);
});
_config_callback.emplace("make_snapshot_worker_count", [&]() -> Status {
auto thread_pool = ExecEnv::GetInstance()->agent_server()->get_thread_pool(TTaskType::MAKE_SNAPSHOT);
Status st = thread_pool->update_max_threads(config::make_snapshot_worker_count);
return st;
return thread_pool->update_max_threads(config::make_snapshot_worker_count);
});
_config_callback.emplace("release_snapshot_worker_count", [&]() -> Status {
auto thread_pool = ExecEnv::GetInstance()->agent_server()->get_thread_pool(TTaskType::RELEASE_SNAPSHOT);
Status st = thread_pool->update_max_threads(config::release_snapshot_worker_count);
return st;
return thread_pool->update_max_threads(config::release_snapshot_worker_count);
});
_config_callback.emplace("pipeline_connector_scan_thread_num_per_cpu", [&]() -> Status {
LOG(INFO) << "set pipeline_connector_scan_thread_num_per_cpu:"
Expand All @@ -309,15 +296,13 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str
_config_callback.emplace("create_tablet_worker_count", [&]() -> Status {
LOG(INFO) << "set create_tablet_worker_count:" << config::create_tablet_worker_count;
auto thread_pool = ExecEnv::GetInstance()->agent_server()->get_thread_pool(TTaskType::CREATE);
Status st = thread_pool->update_max_threads(config::create_tablet_worker_count);
return st;
return thread_pool->update_max_threads(config::create_tablet_worker_count);
});
_config_callback.emplace("number_tablet_writer_threads", [&]() -> Status {
LOG(INFO) << "set number_tablet_writer_threads:" << config::number_tablet_writer_threads;
bthreads::ThreadPoolExecutor* executor = static_cast<bthreads::ThreadPoolExecutor*>(
StorageEngine::instance()->async_delta_writer_executor());
Status st = executor->get_thread_pool()->update_max_threads(config::number_tablet_writer_threads);
return st;
return executor->get_thread_pool()->update_max_threads(config::number_tablet_writer_threads);
});
_config_callback.emplace("compact_threads", [&]() -> Status {
auto tablet_manager = _exec_env->lake_tablet_manager();
Expand Down

0 comments on commit b9dd5b5

Please sign in to comment.