Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ Status PartitionedAggSinkLocalState::revoke_memory(
status = Status::InternalError(
"fault_inject partitioned_agg_sink "
"revoke_memory canceled");
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status);
state->get_query_ctx()->cancel(status);
return status;
});
Defer defer {[&]() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
auto st = Status::InternalError(
"fault_inject partitioned_agg_source "
"merge spill data canceled");
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, st);
state->get_query_ctx()->cancel(st);
return st;
});

Expand Down
13 changes: 7 additions & 6 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,12 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
return Status::OK();
};

auto exception_catch_func = [query_id, spill_func]() {
auto exception_catch_func = [query_id, state, spill_func]() {
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel", {
auto status = Status::InternalError(
"fault_inject partitioned_hash_join_probe "
"spill_probe_blocks canceled");
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status);
state->get_query_ctx()->cancel(status);
return status;
});

Expand Down Expand Up @@ -347,12 +347,13 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
return status;
};

auto exception_catch_func = [read_func, query_id]() {
auto exception_catch_func = [read_func, state, query_id]() {
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel", {
auto status = Status::InternalError(
"fault_inject partitioned_hash_join_probe "
"recover_build_blocks canceled");
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status);

state->get_query_ctx()->cancel(status);
return status;
});

Expand Down Expand Up @@ -451,12 +452,12 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
return st;
};

auto exception_catch_func = [read_func, query_id]() {
auto exception_catch_func = [read_func, state, query_id]() {
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_probe_blocks_cancel", {
auto status = Status::InternalError(
"fault_inject partitioned_hash_join_probe "
"recover_probe_blocks canceled");
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status);
state->get_query_ctx()->cancel(status);
return status;
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,12 +368,12 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(

SpillSinkRunnable spill_runnable(
state, nullptr, operator_profile(),
[this, query_id] {
[this, state, query_id] {
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
auto status = Status::InternalError(
"fault_inject partitioned_hash_join_sink "
"revoke_memory canceled");
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status);
state->get_query_ctx()->cancel(status);
return status;
});
SCOPED_TIMER(_spill_build_timer);
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/spill_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,12 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
return Status::OK();
};

auto exception_catch_func = [query_id, spill_func]() {
auto exception_catch_func = [query_id, state, spill_func]() {
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", {
auto status = Status::InternalError(
"fault_inject spill_sort_sink "
"revoke_memory canceled");
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status);
state->get_query_ctx()->cancel(status);
return status;
});

Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
return;
}
}
SCOPED_ATTACH_TASK(query_ctx->resource_ctx());
query_ctx->cancel(reason);
remove_query_context(query_id);
LOG(INFO) << "Query " << print_id(query_id)
Expand Down
15 changes: 0 additions & 15 deletions be/src/runtime/thread_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,19 +122,4 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
ThreadLocalHandle::del_thread_local_if_count_is_zero();
}

AddThreadMemTrackerConsumerByHook::AddThreadMemTrackerConsumerByHook(
const std::shared_ptr<MemTracker>& mem_tracker)
: _mem_tracker(mem_tracker) {
ThreadLocalHandle::create_thread_local_if_not_exits();
DCHECK(mem_tracker != nullptr);
use_mem_hook = true;
thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get());
}

AddThreadMemTrackerConsumerByHook::~AddThreadMemTrackerConsumerByHook() {
thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
use_mem_hook = false;
ThreadLocalHandle::del_thread_local_if_count_is_zero();
}

} // namespace doris
20 changes: 1 addition & 19 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,6 @@
#define SCOPED_PEAK_MEM(peak_mem) \
auto VARNAME_LINENUM(scope_peak_mem) = doris::ScopedPeakMem(peak_mem)

// Count a code segment memory (memory malloc - memory free) to MemTracker.
// Compared to count `scope_mem`, MemTracker is easier to observe from the outside and is thread-safe.
// Usage example: std::unique_ptr<MemTracker> tracker = std::make_unique<MemTracker>("first_tracker");
// { SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(_mem_tracker.get()); xxx; xxx; }
#define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \
auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumerByHook(mem_tracker)

#define SCOPED_SKIP_MEMORY_CHECK() \
auto VARNAME_LINENUM(scope_skip_memory_check) = doris::ScopeSkipMemoryCheck()

Expand Down Expand Up @@ -154,8 +147,6 @@ static std::string NO_THREAD_CONTEXT_MSG =
// Is true after ThreadContext construction.
inline thread_local bool pthread_context_ptr_init = false;
inline thread_local constinit ThreadContext* thread_context_ptr = nullptr;
// use mem hook to consume thread mem tracker.
inline thread_local bool use_mem_hook = false;

// The thread context saves some info about a working thread.
// 2 required info:
Expand Down Expand Up @@ -383,15 +374,6 @@ class AddThreadMemTrackerConsumer {
bool _need_pop = false;
};

class AddThreadMemTrackerConsumerByHook {
public:
explicit AddThreadMemTrackerConsumerByHook(const std::shared_ptr<MemTracker>& mem_tracker);
~AddThreadMemTrackerConsumerByHook();

private:
std::shared_ptr<MemTracker> _mem_tracker;
};

class ScopeSkipMemoryCheck {
public:
explicit ScopeSkipMemoryCheck() {
Expand All @@ -409,7 +391,7 @@ class ScopeSkipMemoryCheck {
// must call create_thread_local_if_not_exits() before use thread_context().
#define CONSUME_THREAD_MEM_TRACKER(size) \
do { \
if (size == 0 || doris::use_mem_hook) { \
if (size == 0) { \
break; \
} \
if (doris::pthread_context_ptr_init) { \
Expand Down