Skip to content

Commit

Permalink
Bugfix: fix asan/tsan reported error
Browse files Browse the repository at this point in the history
Signed-off-by: Li Junlin <[email protected]>
  • Loading branch information
L-Xiafeng committed Nov 12, 2024
1 parent 8582f7a commit a649dff
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 22 deletions.
13 changes: 10 additions & 3 deletions src/Craned/CgroupManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,10 +443,17 @@ bool CgroupManager::ReleaseCgroup(uint32_t task_id, uid_t uid) {
return false;
}

auto task_id_set_ptr = it->second.GetExclusivePtr();
bool task_id_set_empty{false};
{
auto task_id_set_ptr = it->second.GetExclusivePtr();

task_id_set_ptr->erase(task_id);
if (task_id_set_ptr->empty()) uid_task_ids_map_ptr->erase(uid);
task_id_set_ptr->erase(task_id);
if (task_id_set_ptr->empty()) {
// In case of destroy a locked lock
task_id_set_empty=true;
}
}
if (task_id_set_empty)uid_task_ids_map_ptr->erase(uid);

return true;
}
Expand Down
4 changes: 2 additions & 2 deletions src/Craned/CtldClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ CtldClient::~CtldClient() {
m_thread_stop_ = true;

CRANE_TRACE("CtldClient is ending. Waiting for the thread to finish.");
m_async_send_thread_.join();
if(m_async_send_thread_.joinable())m_async_send_thread_.join();
}

void CtldClient::InitChannelAndStub(const std::string& server_address) {
Expand Down Expand Up @@ -83,7 +83,7 @@ void CtldClient::OnCraneCtldConnected() {
return;
}
}
} while (retry_time--);
} while (!m_thread_stop_ && retry_time--);

CRANE_ERROR("Failed to register actively.");
}
Expand Down
28 changes: 14 additions & 14 deletions src/Craned/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,6 @@ TaskManager::TaskManager() {
CleanSigchldQueueCb_();
});

// Exit Event
m_exit_event_async_handle_ = m_uvw_loop->resource<uvw::async_handle>();
m_exit_event_async_handle_->on<uvw::async_event>(
[this](const uvw::async_event&, uvw::async_handle&) { ExitEventCb_(); });

// Task Status Change Event
m_task_status_change_async_handle_ =
m_uvw_loop->resource<uvw::async_handle>();
Expand Down Expand Up @@ -178,6 +173,18 @@ TaskManager::TaskManager() {

m_uvw_thread = std::thread([this]() {
util::SetCurrentThreadName("TaskMgrLoopThr");
auto idle_handle = m_uvw_loop->resource<uvw::idle_handle>();
idle_handle->on<uvw::idle_event>(
[this](const uvw::idle_event&, uvw::idle_handle& h) {
if (m_task_cleared) {
h.parent().walk([](auto&& h) { h.close(); });
h.parent().stop();
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
});
if (idle_handle->start() != 0) {
CRANE_ERROR("Failed to start the idle event in TaskManager loop.");
}
m_uvw_loop->run();
});
}
Expand Down Expand Up @@ -398,6 +405,7 @@ void TaskManager::SigchldTimerCb_(ProcSigchldInfo* sigchld_info) {
}

void TaskManager::SigintCb_() {
absl::MutexLock lock_guard(&m_mtx_);
if (!m_is_ending_now_) {
// SIGINT has been sent once. If SIGINT are captured twice, it indicates
// the signal sender can't wait to stop Craned and Craned just send SIGTERM
Expand Down Expand Up @@ -462,18 +470,10 @@ void TaskManager::SigintCb_() {
}
}

void TaskManager::ExitEventCb_() {
CRANE_TRACE("Exit event triggered. Stop event loop.");

// Close all handle
m_uvw_loop->walk([](auto&& h) { h.close(); });
m_uvw_loop->stop();
}

void TaskManager::ActivateShutdownAsync_() {
CRANE_TRACE("Triggering exit event...");
CRANE_ASSERT(m_is_ending_now_ == true);
m_exit_event_async_handle_->send();
m_task_cleared = true;
}

void TaskManager::Wait() {
Expand Down
6 changes: 3 additions & 3 deletions src/Craned/TaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,9 +431,6 @@ class TaskManager {
std::shared_ptr<uvw::async_handle> m_process_sigchld_async_handle_;
ConcurrentQueue<std::unique_ptr<ProcSigchldInfo>> m_sigchld_queue_;

// When this event is triggered, the event loop will exit.
std::shared_ptr<uvw::async_handle> m_exit_event_async_handle_;

std::shared_ptr<uvw::async_handle> m_task_status_change_async_handle_;
ConcurrentQueue<TaskStatusChangeQueueElem> m_task_status_change_queue_;

Expand All @@ -454,6 +451,9 @@ class TaskManager {
// ev_sigchld_cb_ will stop the event loop when there is no task running.
std::atomic_bool m_is_ending_now_{false};

// After m_is_ending_now_ set to true, when all task are cleared, we can exit.
std::atomic_bool m_task_cleared{false};

std::thread m_uvw_thread;

static inline TaskManager* m_instance_ptr_;
Expand Down

0 comments on commit a649dff

Please sign in to comment.