Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] consider offline cores in BE (backport #52728) #52742

Merged
merged 1 commit into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 78 additions & 40 deletions be/src/util/cpu_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ int CpuInfo::max_num_numa_nodes_;
std::unique_ptr<int[]> CpuInfo::core_to_numa_node_;
std::vector<vector<int>> CpuInfo::numa_node_to_cores_;
std::vector<size_t> CpuInfo::cpuset_cores_;
std::set<size_t> CpuInfo::offline_cores_;
std::vector<int> CpuInfo::numa_node_core_idx_;

static struct {
Expand Down Expand Up @@ -161,6 +162,7 @@ void CpuInfo::init() {
if (num_cores > 0) {
num_cores_ = num_cores;
}
_init_offline_cores();
_init_num_cores_with_cgroup();
if (num_cores_ <= 0) {
num_cores_ = 1;
Expand Down Expand Up @@ -229,6 +231,40 @@ void CpuInfo::_init_numa() {
_init_numa_node_to_cores();
}

std::vector<size_t> CpuInfo::parse_cpus(const std::string& cpus_str) {
std::vector<size_t> cpuids;
std::vector<std::string> fields = strings::Split(cpus_str, ",", strings::SkipWhitespace());
for (const auto& field : fields) {
StringParser::ParseResult result;
if (field.find('-') == std::string::npos) {
auto cpu_id = StringParser::string_to_int<int32_t>(field.data(), field.size(), &result);
if (result == StringParser::PARSE_SUCCESS) {
cpuids.emplace_back(cpu_id);
}
continue;
}

std::vector<std::string> pair = strings::Split(field, "-", strings::SkipWhitespace());
if (pair.size() != 2) {
continue;
}
std::string& start_str = pair[0];
std::string& end_str = pair[1];
auto start = StringParser::string_to_int<int32_t>(start_str.data(), start_str.size(), &result);
if (result != StringParser::PARSE_SUCCESS) {
continue;
}
auto end = StringParser::string_to_int<int32_t>(end_str.data(), end_str.size(), &result);
if (result != StringParser::PARSE_SUCCESS) {
continue;
}
for (int i = start; i <= end; i++) {
cpuids.emplace_back(i);
}
}
return cpuids;
}

void CpuInfo::_init_num_cores_with_cgroup() {
bool running_in_docker = fs::path_exist("/.dockerenv");
if (!running_in_docker) {
Expand All @@ -240,40 +276,6 @@ void CpuInfo::_init_num_cores_with_cgroup() {
return;
}

auto parse_cpusets = [](const std::string& cpuset_str) {
std::vector<size_t> cpuids;
std::vector<std::string> fields = strings::Split(cpuset_str, ",", strings::SkipWhitespace());
for (const auto& field : fields) {
StringParser::ParseResult result;
if (field.find('-') == std::string::npos) {
auto cpu_id = StringParser::string_to_int<int32_t>(field.data(), field.size(), &result);
if (result == StringParser::PARSE_SUCCESS) {
cpuids.emplace_back(cpu_id);
}
continue;
}

std::vector<std::string> pair = strings::Split(field, "-", strings::SkipWhitespace());
if (pair.size() != 2) {
continue;
}
std::string& start_str = pair[0];
std::string& end_str = pair[1];
auto start = StringParser::string_to_int<int32_t>(start_str.data(), start_str.size(), &result);
if (result != StringParser::PARSE_SUCCESS) {
continue;
}
auto end = StringParser::string_to_int<int32_t>(end_str.data(), end_str.size(), &result);
if (result != StringParser::PARSE_SUCCESS) {
continue;
}
for (int i = start; i <= end; i++) {
cpuids.emplace_back(i);
}
}
return cpuids;
};

std::string cfs_period_us_str;
std::string cfs_quota_us_str;
std::string cpuset_str;
Expand Down Expand Up @@ -323,7 +325,8 @@ void CpuInfo::_init_num_cores_with_cgroup() {
int32_t cpuset_num_cores = num_cores_;
if (!cpuset_str.empty() &&
std::any_of(cpuset_str.begin(), cpuset_str.end(), [](char c) { return !std::isspace(c); })) {
cpuset_cores_ = parse_cpusets(cpuset_str);
cpuset_cores_ = parse_cpus(cpuset_str);
std::erase_if(cpuset_cores_, [&](const size_t core) { return offline_cores_.contains(core); });
cpuset_num_cores = cpuset_cores_.size();
is_cgroup_with_cpuset_ = true;
}
Expand All @@ -346,6 +349,17 @@ void CpuInfo::_init_numa_node_to_cores() {
}
}

void CpuInfo::_init_offline_cores() {
offline_cores_.clear();
std::string offline_cores_str;
if (!FileUtil::read_whole_content("/sys/devices/system/cpu/offline", offline_cores_str)) {
return;
}

std::vector<size_t> offline_cores = parse_cpus(offline_cores_str);
offline_cores_.insert(offline_cores.begin(), offline_cores.end());
}

int CpuInfo::get_current_core() {
// sched_getcpu() is not supported on some old kernels/glibcs (like the versions that
// shipped with CentOS 5). In that case just pretend we're always running on CPU 0
Expand Down Expand Up @@ -429,18 +443,42 @@ std::string CpuInfo::debug_string() {
stream << " " << core << "->" << core_to_numa_node_[core] << " |";
}
stream << std::endl;

auto print_cores = [&stream](const std::string& title, const auto& cores) {
stream << " " << title << ": ";
if (cores.empty()) {
stream << "None";
} else {
bool is_first = true;
for (const int core : cores) {
if (!is_first) {
stream << ",";
}
is_first = false;
stream << core;
}
}
stream << std::endl;
};

print_cores("Cores from CGroup CPUSET", cpuset_cores_);
print_cores("Offline Cores", offline_cores_);

return stream.str();
}

std::vector<size_t> CpuInfo::get_core_ids() {
std::vector<size_t> core_ids;
if (!cpuset_cores_.empty()) {
return cpuset_cores_;
core_ids = cpuset_cores_;
} else {
for (const auto& core_ids_of_node : numa_node_to_cores_) {
core_ids.insert(core_ids.end(), core_ids_of_node.begin(), core_ids_of_node.end());
}
}

std::vector<size_t> core_ids;
for (const auto& core_ids_of_node : numa_node_to_cores_) {
core_ids.insert(core_ids.end(), core_ids_of_node.begin(), core_ids_of_node.end());
}
std::erase_if(core_ids, [&](const size_t core) { return offline_cores_.contains(core); });

return core_ids;
}

Expand Down
8 changes: 8 additions & 0 deletions be/src/util/cpu_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <boost/cstdint.hpp>
#include <memory>
#include <set>
#include <string>
#include <vector>

Expand Down Expand Up @@ -87,6 +88,9 @@ class CpuInfo {
static bool is_cgroup_with_cpuset() { return is_cgroup_with_cpuset_; }
static bool is_cgroup_with_cpu_quota() { return is_cgroup_with_cpu_quota_; }

/// Parse a string-formatted cpus in the format "0-3,5,7-9" and return the parsed core IDs.
static std::vector<size_t> parse_cpus(const std::string& cpus_str);

private:
/// Initialize NUMA-related state - called from Init();
static void _init_numa();
Expand All @@ -98,6 +102,9 @@ class CpuInfo {
/// 'core_to_numa_node_'. Called from InitNuma();
static void _init_numa_node_to_cores();

/// Initialize 'core_to_numa_node_' from `/sys/devices/system/cpu/offline`.
static void _init_offline_cores();

/// Populates the arguments with information about this machine's caches.
/// The values returned are not reliable in some environments, e.g. RHEL5 on EC2, so
/// so we will keep this as a private method.
Expand All @@ -123,6 +130,7 @@ class CpuInfo {
/// belonging to that NUMA node.
static std::vector<std::vector<int>> numa_node_to_cores_;
static std::vector<size_t> cpuset_cores_;
static std::set<size_t> offline_cores_;

/// Array with 'max_num_cores_' entries, each of which is the index of that core in its
/// NUMA node.
Expand Down
30 changes: 30 additions & 0 deletions be/test/gutil/cpu_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include <gtest/gtest.h>

#include "util/cpu_info.h"

namespace starrocks {

TEST(CpuInfoTest, hardware_support) {
Expand Down Expand Up @@ -44,4 +46,32 @@ TEST(CpuInfoTest, hardware_support) {
#endif
}

TEST(CpuInfoTest, parse_cpus) {
auto assert_cpu_equals = [](std::vector<size_t>& cpus, std::vector<size_t>& expected_cpus) {
ASSERT_EQ(expected_cpus.size(), cpus.size());
std::ranges::sort(cpus);
std::ranges::sort(expected_cpus);
for (size_t i = 0; i < cpus.size(); ++i) {
EXPECT_EQ(expected_cpus[i], cpus[i]);
}
};

{
std::vector<size_t> cpus = CpuInfo::parse_cpus("0-3,5,7,9-10");
std::vector<size_t> expected_cpus = {0, 1, 2, 3, 5, 7, 9, 10};
assert_cpu_equals(cpus, expected_cpus);
}

{
const std::vector<size_t> cpus = CpuInfo::parse_cpus("");
EXPECT_TRUE(cpus.empty());
}

{
std::vector<size_t> cpus = CpuInfo::parse_cpus("abc,1-,2-abc,3-5,,8");
std::vector<size_t> expected_cpus = {3, 4, 5, 8};
assert_cpu_equals(cpus, expected_cpus);
}
}

} // namespace starrocks
Loading