Skip to content

Commit ada3074

Browse files
authored
[bugfix](memory) should count memory when cancel query is called (#58252)
*** Current BE git commitID: 4945aa0 *** *** SIGABRT unknown detail explain (@0x3c54) received by PID 15444 (TID 16396 OR 0x7b2e9f39b700) from PID 15444; stack trace: *** 0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /root/doris/be/src/common/signal_handler.h:420 1# 0x00007F3421D1D420 in /lib/x86_64-linux-gnu/libpthread.so.0 2# raise at ../sysdeps/unix/sysv/linux/raise.c:51 3# abort at /build/glibc-SzIz7B/glibc-2.31/stdlib/abort.c:81 4# 0x000055A425A57985 in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be 5# 0x000055A425A4923A in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be 6# google::LogMessage::SendToLog() in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be 7# google::LogMessage::Flush() in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be 8# google::LogMessageFatal::~LogMessageFatal() in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be 9# doris::Status doris::Status::FatalError<true, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&>(std::basic_string_view<char, std::char_traits<char> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) at /root/doris/be/src/common/status.h:467 10# doris::io::LocalFileReader::read_at_impl(unsigned long, doris::Slice, unsigned long*, doris::io::IOContext const*) at /root/doris/be/src/io/fs/local_file_reader.cpp:151 11# doris::io::FileReader::read_at(unsigned long, doris::Slice, unsigned long*, doris::io::IOContext const*) at /root/doris/be/src/io/fs/file_reader.cpp:34 12# doris::io::S3FileSystem::upload_impl(std::filesystem::__cxx11::path const&, std::filesystem::__cxx11::path const&) at /root/doris/be/src/io/fs/s3_file_system.cpp:339 13# doris::io::RemoteFileSystem::upload(std::filesystem::__cxx11::path const&, std::filesystem::__cxx11::path const&) at /root/doris/be/src/io/fs/remote_file_system.cpp:34 14# doris::RuntimeState::get_error_log_file_path[abi:cxx11]() at /root/doris/be/src/runtime/runtime_state.cpp:418 15# doris::pipeline::PipelineFragmentContext::get_load_error_url[abi:cxx11]() at /root/doris/be/src/pipeline/pipeline_fragment_context.cpp:1806 16# doris::pipeline::PipelineFragmentContext::cancel(doris::Status) at /root/doris/be/src/pipeline/pipeline_fragment_context.cpp:202 17# doris::QueryContext::cancel_all_pipeline_context(doris::Status const&, int) in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be 18# doris::QueryContext::cancel(doris::Status, int) in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be 19# doris::FragmentMgr::cancel_query(doris::TUniqueId, doris::Status) at /root/doris/be/src/runtime/fragment_mgr.cpp:915 20# std::_Function_handler<void (), doris::PInternalService::cancel_plan_fragment(google::protobuf::RpcController*, doris::PCancelPlanFragmentRequest const*, doris::PCancelPlanFragmentResult*, google::protobuf::Closure*)::$_0>::_M_invoke(std::_Any_data const&) at /usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:292 21# doris::WorkThreadPool<false>::work_thread(int) in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be 22# execute_native_thread_routine in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be 23# asan_thread_start(void*) in /home/work/unlimit_teamcity/TeamCity/Agents/20251120181904agent_172.16.0.6_1/work/60183217f6ee2a9c/output/be/lib/doris_be ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [ ] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. eg: apache/doris-website#1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
1 parent a5f36a1 commit ada3074

8 files changed

+15
-46
lines changed

be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ Status PartitionedAggSinkLocalState::revoke_memory(
441441
status = Status::InternalError(
442442
"fault_inject partitioned_agg_sink "
443443
"revoke_memory canceled");
444-
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status);
444+
state->get_query_ctx()->cancel(status);
445445
return status;
446446
});
447447
Defer defer {[&]() {

be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
302302
auto st = Status::InternalError(
303303
"fault_inject partitioned_agg_source "
304304
"merge spill data canceled");
305-
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, st);
305+
state->get_query_ctx()->cancel(st);
306306
return st;
307307
});
308308

be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -231,12 +231,12 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
231231
return Status::OK();
232232
};
233233

234-
auto exception_catch_func = [query_id, spill_func]() {
234+
auto exception_catch_func = [query_id, state, spill_func]() {
235235
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel", {
236236
auto status = Status::InternalError(
237237
"fault_inject partitioned_hash_join_probe "
238238
"spill_probe_blocks canceled");
239-
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status);
239+
state->get_query_ctx()->cancel(status);
240240
return status;
241241
});
242242

@@ -347,12 +347,13 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
347347
return status;
348348
};
349349

350-
auto exception_catch_func = [read_func, query_id]() {
350+
auto exception_catch_func = [read_func, state, query_id]() {
351351
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel", {
352352
auto status = Status::InternalError(
353353
"fault_inject partitioned_hash_join_probe "
354354
"recover_build_blocks canceled");
355-
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status);
355+
356+
state->get_query_ctx()->cancel(status);
356357
return status;
357358
});
358359

@@ -451,12 +452,12 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
451452
return st;
452453
};
453454

454-
auto exception_catch_func = [read_func, query_id]() {
455+
auto exception_catch_func = [read_func, state, query_id]() {
455456
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_probe_blocks_cancel", {
456457
auto status = Status::InternalError(
457458
"fault_inject partitioned_hash_join_probe "
458459
"recover_probe_blocks canceled");
459-
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status);
460+
state->get_query_ctx()->cancel(status);
460461
return status;
461462
});
462463

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,12 +368,12 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
368368

369369
SpillSinkRunnable spill_runnable(
370370
state, nullptr, operator_profile(),
371-
[this, query_id] {
371+
[this, state, query_id] {
372372
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
373373
auto status = Status::InternalError(
374374
"fault_inject partitioned_hash_join_sink "
375375
"revoke_memory canceled");
376-
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status);
376+
state->get_query_ctx()->cancel(status);
377377
return status;
378378
});
379379
SCOPED_TIMER(_spill_build_timer);

be/src/pipeline/exec/spill_sort_sink_operator.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,12 +264,12 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
264264
return Status::OK();
265265
};
266266

267-
auto exception_catch_func = [query_id, spill_func]() {
267+
auto exception_catch_func = [query_id, state, spill_func]() {
268268
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", {
269269
auto status = Status::InternalError(
270270
"fault_inject spill_sort_sink "
271271
"revoke_memory canceled");
272-
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status);
272+
state->get_query_ctx()->cancel(status);
273273
return status;
274274
});
275275

be/src/runtime/fragment_mgr.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -912,6 +912,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
912912
return;
913913
}
914914
}
915+
SCOPED_ATTACH_TASK(query_ctx->resource_ctx());
915916
query_ctx->cancel(reason);
916917
remove_query_context(query_id);
917918
LOG(INFO) << "Query " << print_id(query_id)

be/src/runtime/thread_context.cpp

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -122,19 +122,4 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
122122
ThreadLocalHandle::del_thread_local_if_count_is_zero();
123123
}
124124

125-
AddThreadMemTrackerConsumerByHook::AddThreadMemTrackerConsumerByHook(
126-
const std::shared_ptr<MemTracker>& mem_tracker)
127-
: _mem_tracker(mem_tracker) {
128-
ThreadLocalHandle::create_thread_local_if_not_exits();
129-
DCHECK(mem_tracker != nullptr);
130-
use_mem_hook = true;
131-
thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get());
132-
}
133-
134-
AddThreadMemTrackerConsumerByHook::~AddThreadMemTrackerConsumerByHook() {
135-
thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
136-
use_mem_hook = false;
137-
ThreadLocalHandle::del_thread_local_if_count_is_zero();
138-
}
139-
140125
} // namespace doris

be/src/runtime/thread_context.h

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,6 @@
8080
#define SCOPED_PEAK_MEM(peak_mem) \
8181
auto VARNAME_LINENUM(scope_peak_mem) = doris::ScopedPeakMem(peak_mem)
8282

83-
// Count a code segment memory (memory malloc - memory free) to MemTracker.
84-
// Compared to count `scope_mem`, MemTracker is easier to observe from the outside and is thread-safe.
85-
// Usage example: std::unique_ptr<MemTracker> tracker = std::make_unique<MemTracker>("first_tracker");
86-
// { SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(_mem_tracker.get()); xxx; xxx; }
87-
#define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \
88-
auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumerByHook(mem_tracker)
89-
9083
#define SCOPED_SKIP_MEMORY_CHECK() \
9184
auto VARNAME_LINENUM(scope_skip_memory_check) = doris::ScopeSkipMemoryCheck()
9285

@@ -154,8 +147,6 @@ static std::string NO_THREAD_CONTEXT_MSG =
154147
// Is true after ThreadContext construction.
155148
inline thread_local bool pthread_context_ptr_init = false;
156149
inline thread_local constinit ThreadContext* thread_context_ptr = nullptr;
157-
// use mem hook to consume thread mem tracker.
158-
inline thread_local bool use_mem_hook = false;
159150

160151
// The thread context saves some info about a working thread.
161152
// 2 required info:
@@ -383,15 +374,6 @@ class AddThreadMemTrackerConsumer {
383374
bool _need_pop = false;
384375
};
385376

386-
class AddThreadMemTrackerConsumerByHook {
387-
public:
388-
explicit AddThreadMemTrackerConsumerByHook(const std::shared_ptr<MemTracker>& mem_tracker);
389-
~AddThreadMemTrackerConsumerByHook();
390-
391-
private:
392-
std::shared_ptr<MemTracker> _mem_tracker;
393-
};
394-
395377
class ScopeSkipMemoryCheck {
396378
public:
397379
explicit ScopeSkipMemoryCheck() {
@@ -409,7 +391,7 @@ class ScopeSkipMemoryCheck {
409391
// must call create_thread_local_if_not_exits() before use thread_context().
410392
#define CONSUME_THREAD_MEM_TRACKER(size) \
411393
do { \
412-
if (size == 0 || doris::use_mem_hook) { \
394+
if (size == 0) { \
413395
break; \
414396
} \
415397
if (doris::pthread_context_ptr_init) { \

0 commit comments

Comments
 (0)