Skip to content
Open

[test] #58218

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 199 additions & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,8 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para
}
} else {
if (!query_ctx) {
LOG(INFO) << "[AI_CHECK]: Creating new QueryContext for query_id: "
<< print_id(query_id);
RETURN_IF_ERROR(_query_ctx_map.apply_if_not_exists(
query_id, query_ctx,
[&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map)
Expand Down Expand Up @@ -738,14 +740,211 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para

query_ctx->query_globals = params.query_globals;

LOG(INFO) << "[AI_CHECK]: TPipelineFragmentParams details:";
LOG(INFO) << " protocol_version=" << params.protocol_version;
LOG(INFO) << " query_id=" << params.query_id;
if (params.__isset.fragment_id) {
LOG(INFO) << " fragment_id=" << params.fragment_id;
}
LOG(INFO) << " per_exch_num_senders size="
<< params.per_exch_num_senders.size();
if (params.__isset.desc_tbl) {
LOG(INFO) << " desc_tbl is set";
}
if (params.__isset.resource_info) {
LOG(INFO) << " resource_info is set";
query_ctx->user = params.resource_info.user;
query_ctx->group = params.resource_info.group;
query_ctx->set_rsc_info = true;
}
LOG(INFO) << " destinations size=" << params.destinations.size();
if (params.__isset.num_senders) {
LOG(INFO) << " num_senders=" << params.num_senders;
}
if (params.__isset.send_query_statistics_with_every_batch) {
LOG(INFO) << " send_query_statistics_with_every_batch="
<< params.send_query_statistics_with_every_batch;
}
if (params.__isset.coord) {
LOG(INFO) << " coord is set";
}
if (params.__isset.query_globals) {
LOG(INFO) << " query_globals is set";
}
if (params.__isset.query_options) {
LOG(INFO) << " query_options is set";
}
if (params.__isset.import_label) {
LOG(INFO) << " import_label=" << params.import_label;
}
if (params.__isset.db_name) {
LOG(INFO) << " db_name=" << params.db_name;
}
if (params.__isset.load_job_id) {
LOG(INFO) << " load_job_id=" << params.load_job_id;
}
if (params.__isset.load_error_hub_info) {
LOG(INFO) << " load_error_hub_info is set";
}
if (params.__isset.fragment_num_on_host) {
LOG(INFO) << " fragment_num_on_host=" << params.fragment_num_on_host;
}
if (params.__isset.backend_id) {
LOG(INFO) << " backend_id=" << params.backend_id;
}
if (params.__isset.need_wait_execution_trigger) {
LOG(INFO) << " need_wait_execution_trigger="
<< params.need_wait_execution_trigger;
}
if (params.__isset.instances_sharing_hash_table) {
LOG(INFO) << " instances_sharing_hash_table size="
<< params.instances_sharing_hash_table.size();
}
if (params.__isset.is_simplified_param) {
LOG(INFO) << " is_simplified_param=" << params.is_simplified_param;
}
if (params.__isset.global_dict) {
LOG(INFO) << " global_dict is set";
}
if (params.__isset.fragment) {
LOG(INFO) << " fragment is set";
}
LOG(INFO) << " local_params size=" << params.local_params.size();
if (params.__isset.workload_groups) {
LOG(INFO) << " workload_groups size=" << params.workload_groups.size();
}
if (params.__isset.txn_conf) {
LOG(INFO) << " txn_conf is set";
}
if (params.__isset.table_name) {
LOG(INFO) << " table_name=" << params.table_name;
}
if (params.__isset.file_scan_params) {
LOG(INFO)
<< " file_scan_params size=" << params.file_scan_params.size();
}
if (params.__isset.group_commit) {
LOG(INFO) << " group_commit=" << params.group_commit;
}
if (params.__isset.load_stream_per_node) {
LOG(INFO) << " load_stream_per_node=" << params.load_stream_per_node;
}
if (params.__isset.total_load_streams) {
LOG(INFO) << " total_load_streams=" << params.total_load_streams;
}
if (params.__isset.num_local_sink) {
LOG(INFO) << " num_local_sink=" << params.num_local_sink;
}
if (params.__isset.num_buckets) {
LOG(INFO) << " num_buckets=" << params.num_buckets;
}
if (params.__isset.bucket_seq_to_instance_idx) {
LOG(INFO) << " bucket_seq_to_instance_idx size="
<< params.bucket_seq_to_instance_idx.size();
}
if (params.__isset.per_node_shared_scans) {
LOG(INFO) << " per_node_shared_scans size="
<< params.per_node_shared_scans.size();
}
if (params.__isset.parallel_instances) {
LOG(INFO) << " parallel_instances=" << params.parallel_instances;
}
if (params.__isset.total_instances) {
LOG(INFO) << " total_instances=" << params.total_instances;
}
if (params.__isset.shuffle_idx_to_instance_idx) {
LOG(INFO) << " shuffle_idx_to_instance_idx size="
<< params.shuffle_idx_to_instance_idx.size();
}
if (params.__isset.is_nereids) {
LOG(INFO) << "[AI_CHECK]: is_nereids=" << params.is_nereids;
}
if (params.__isset.wal_id) {
LOG(INFO) << " wal_id=" << params.wal_id;
}
if (params.__isset.content_length) {
LOG(INFO) << " content_length=" << params.content_length;
}
if (params.__isset.current_connect_fe) {
LOG(INFO) << " current_connect_fe is set";
}
if (params.__isset.topn_filter_source_node_ids) {
LOG(INFO) << " topn_filter_source_node_ids size="
<< params.topn_filter_source_node_ids.size();
}
if (params.__isset.is_mow_table) {
LOG(INFO) << " is_mow_table=" << params.is_mow_table;
}

// Log Nereids-specific fields
if (params.__isset.is_nereids) {
LOG(INFO)
<< "[AI_CHECK][NEREIDS] Nereids-specific fields: "
<< "fragment_id="
<< (params.__isset.fragment_id
? std::to_string(params.fragment_id)
: "NOT_SET")
<< ", backend_id="
<< (params.__isset.backend_id
? std::to_string(params.backend_id)
: "NOT_SET")
<< ", fragment_num_on_host="
<< (params.__isset.fragment_num_on_host
? std::to_string(params.fragment_num_on_host)
: "NOT_SET")
<< ", need_wait_execution_trigger="
<< (params.__isset.need_wait_execution_trigger
? (params.need_wait_execution_trigger ? "true"
: "false")
: "NOT_SET")
<< ", total_instances="
<< (params.__isset.total_instances
? std::to_string(params.total_instances)
: "NOT_SET")
<< ", bucket_seq_to_instance_idx="
<< (params.__isset.bucket_seq_to_instance_idx
? ("size=" +
std::to_string(params.bucket_seq_to_instance_idx
.size()))
: "NOT_SET")
<< ", shuffle_idx_to_instance_idx="
<< (params.__isset.shuffle_idx_to_instance_idx
? ("size=" +
std::to_string(params.shuffle_idx_to_instance_idx
.size()))
: "NOT_SET")
<< ", file_scan_params="
<< (params.__isset.file_scan_params
? ("size=" +
std::to_string(params.file_scan_params.size()))
: "NOT_SET")
<< ", num_buckets="
<< (params.__isset.num_buckets
? std::to_string(params.num_buckets)
: "NOT_SET")
<< ", parallel_instances="
<< (params.__isset.parallel_instances
? std::to_string(params.parallel_instances)
: "NOT_SET")
<< ", ai_resources="
<< (params.__isset.ai_resources
? ("size=" +
std::to_string(params.ai_resources.size()))
: "NOT_SET");
}

if (params.__isset.resource_info) {
query_ctx->user = params.resource_info.user;
query_ctx->group = params.resource_info.group;
query_ctx->set_rsc_info = true;
}

if (params.__isset.ai_resources) {
LOG(INFO) << "[AI_CHECK]: AI resources count: "
<< params.ai_resources.size();
query_ctx->set_ai_resources(params.ai_resources);
} else {
LOG(INFO) << "[AI_CHECK]: No AI resources set in fragment params.";
}

RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
Expand Down
7 changes: 7 additions & 0 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,21 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
}

void set_ai_resources(std::map<std::string, TAIResource> ai_resources) {
for (const auto& [resource_name, ai_resource] : ai_resources) {
LOG(INFO) << "AI Resource: name=" << resource_name
<< ", model=" << ai_resource.model_name
<< ", api_key=" << (ai_resource.__isset.api_key ? "[SET]" : "[NOT_SET]");
}
_ai_resources =
std::make_unique<std::map<std::string, TAIResource>>(std::move(ai_resources));
}

const std::map<std::string, TAIResource>& get_ai_resources() const {
if (_ai_resources == nullptr) {
LOG(INFO) << "[AI_CHECK]: AI resources not set in QueryContext.";
throw Status::InternalError("AI resources not found");
}
LOG(INFO) << "[AI_CHECK]: Returning AI resources from QueryContext.";
return *_ai_resources;
}

Expand Down
Loading