From 3a811ace5a52797c82677119821d3ee5213b74ea Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Mon, 16 Dec 2024 14:00:24 +0200 Subject: [PATCH] refactor results transformations --- .../cubejs-backend-native/src/node_export.rs | 35 +-- .../src/cubestore_result_transform.rs | 266 +++++++++++------- rust/cubeorchestrator/src/lib.rs | 2 +- .../src/{types.rs => transport.rs} | 53 ---- 4 files changed, 164 insertions(+), 192 deletions(-) rename rust/cubeorchestrator/src/{types.rs => transport.rs} (86%) diff --git a/packages/cubejs-backend-native/src/node_export.rs b/packages/cubejs-backend-native/src/node_export.rs index ae578437bb28a..fd768be8c53d0 100644 --- a/packages/cubejs-backend-native/src/node_export.rs +++ b/packages/cubejs-backend-native/src/node_export.rs @@ -36,12 +36,10 @@ use cubeorchestrator::cubestore_message_parser::CubeStoreResult; use cubesql::{telemetry::ReportingLogger, CubeError}; use cubeorchestrator::cubestore_result_transform::{ - get_final_cubestore_result, get_final_cubestore_result_array, get_final_cubestore_result_multi, - transform_data, -}; -use cubeorchestrator::types::{ - RequestResultArray, RequestResultData, RequestResultDataMulti, TransformDataRequest, + get_final_cubestore_result_array, RequestResultArray, RequestResultData, + RequestResultDataMulti, TransformedData, }; +use cubeorchestrator::transport::TransformDataRequest; use neon::prelude::*; use neon::types::buffer::TypedArray; @@ -567,20 +565,7 @@ fn transform_query_data(mut cx: FunctionContext) -> JsResult { Err(err) => return Err(anyhow::Error::from(err)), }; - let alias_to_member_name_map = &request_data.alias_to_member_name_map; - let annotation = &request_data.annotation; - let query = &request_data.query; - let query_type = &request_data.query_type.unwrap_or_default(); - let res_type = &request_data.res_type; - - let transformed = transform_data( - alias_to_member_name_map, - annotation, - &cube_store_result, - query, - query_type, - res_type.clone(), - )?; + let transformed = TransformedData::transform(&request_data, &cube_store_result)?; match serde_json::to_string(&transformed) { Ok(json) => Ok(json), @@ -622,11 +607,7 @@ fn final_cubestore_result(mut cx: FunctionContext) -> JsResult { Err(err) => return Err(anyhow::Error::from(err)), }; - get_final_cubestore_result( - &transform_request_data, - &cube_store_result, - &mut result_data, - )?; + result_data.prepare_results(&transform_request_data, &cube_store_result)?; match serde_json::to_string(&result_data) { Ok(json) => Ok(json), @@ -783,11 +764,7 @@ fn final_cubestore_result_multi(mut cx: FunctionContext) -> JsResult Err(err) => return Err(anyhow::Error::from(err)), }; - get_final_cubestore_result_multi( - &transform_requests, - &cube_store_results, - &mut result_data, - )?; + result_data.prepare_results(&transform_requests, &cube_store_results)?; match serde_json::to_string(&result_data) { Ok(json) => Ok(json), diff --git a/rust/cubeorchestrator/src/cubestore_result_transform.rs b/rust/cubeorchestrator/src/cubestore_result_transform.rs index c35c00d3922da..82fc4757c2f76 100644 --- a/rust/cubeorchestrator/src/cubestore_result_transform.rs +++ b/rust/cubeorchestrator/src/cubestore_result_transform.rs @@ -1,15 +1,16 @@ use crate::{ cubestore_message_parser::CubeStoreResult, - types::{ - ConfigItem, MembersMap, NormalizedQuery, QueryTimeDimension, QueryType, RequestResultData, - RequestResultDataMulti, ResultType, TransformDataRequest, TransformedData, - BLENDING_QUERY_KEY_PREFIX, BLENDING_QUERY_RES_SEPARATOR, COMPARE_DATE_RANGE_FIELD, - COMPARE_DATE_RANGE_SEPARATOR, MEMBER_SEPARATOR, + transport::{ + ConfigItem, MembersMap, NormalizedQuery, QueryTimeDimension, QueryType, ResultType, + TransformDataRequest, BLENDING_QUERY_KEY_PREFIX, BLENDING_QUERY_RES_SEPARATOR, + COMPARE_DATE_RANGE_FIELD, COMPARE_DATE_RANGE_SEPARATOR, MEMBER_SEPARATOR, }, }; use anyhow::{bail, Context, Result}; use chrono::{DateTime, SecondsFormat}; use itertools::multizip; +use serde::{Deserialize, Serialize}; +use serde_json::Value; use std::{ collections::{HashMap, HashSet}, sync::Arc, @@ -284,63 +285,6 @@ pub fn get_vanilla_row( Ok(row) } -/// Transforms queried data array to the output format. -pub fn transform_data( - alias_to_member_name_map: &HashMap, - annotation: &HashMap, - data: &CubeStoreResult, - query: &NormalizedQuery, - query_type: &QueryType, - res_type: Option, -) -> Result { - let members_to_alias_map = get_members( - query_type, - query, - data, - alias_to_member_name_map, - annotation, - )?; - let members: Vec = members_to_alias_map.keys().cloned().collect(); - - match res_type { - Some(ResultType::Compact) => { - let dataset: Vec<_> = data - .rows - .iter() - .map(|row| { - get_compact_row( - &members_to_alias_map, - annotation, - query_type, - &members, - query.time_dimensions.as_ref(), - row, - &data.columns_pos, - ) - }) - .collect::>>()?; - Ok(TransformedData::Compact { members, dataset }) - } - _ => { - let dataset: Vec<_> = data - .rows - .iter() - .map(|row| { - get_vanilla_row( - alias_to_member_name_map, - annotation, - query_type, - query, - row, - &data.columns_pos, - ) - }) - .collect::>>()?; - Ok(TransformedData::Vanilla(dataset)) - } - } -} - /// Helper to get a list if unique granularities from normalized queries pub fn get_query_granularities(queries: &[&NormalizedQuery]) -> Vec { queries @@ -414,31 +358,6 @@ pub fn get_pivot_query( Ok(pivot_query) } -pub fn get_final_cubestore_result( - request_data: &TransformDataRequest, - cube_store_result: &CubeStoreResult, - result_data: &mut RequestResultData, -) -> Result<()> { - let alias_to_member_name_map = &request_data.alias_to_member_name_map; - let annotation = &request_data.annotation; - let query = &request_data.query; - let query_type = &request_data.query_type.clone().unwrap_or_default(); - let res_type = &request_data.res_type; - - let transformed = transform_data( - alias_to_member_name_map, - annotation, - cube_store_result, - query, - query_type, - res_type.clone(), - )?; - - result_data.data = Some(transformed); - - Ok(()) -} - pub fn get_final_cubestore_result_array( transform_requests: &[TransformDataRequest], cube_store_results: &[Arc], @@ -449,35 +368,164 @@ pub fn get_final_cubestore_result_array( cube_store_results.iter(), result_data.iter_mut(), )) { - get_final_cubestore_result(transform_data, cube_store_result, result)?; + result.prepare_results(transform_data, cube_store_result)?; } Ok(()) } -pub fn get_final_cubestore_result_multi( - request_data: &[TransformDataRequest], - cube_store_result: &[Arc], - result_data: &mut RequestResultDataMulti, -) -> Result<()> { - for (transform_data, cube_store_result, result) in multizip(( - request_data.iter(), - cube_store_result.iter(), - result_data.results.iter_mut(), - )) { - get_final_cubestore_result(transform_data, cube_store_result, result)?; +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub enum TransformedData { + Compact { + members: Vec, + dataset: Vec>, + }, + Vanilla(Vec>), +} + +impl TransformedData { + /// Transforms queried data array to the output format. + pub fn transform( + request_data: &TransformDataRequest, + cube_store_result: &CubeStoreResult, + ) -> Result { + let alias_to_member_name_map = &request_data.alias_to_member_name_map; + let annotation = &request_data.annotation; + let query = &request_data.query; + let query_type = &request_data.query_type.clone().unwrap_or_default(); + let res_type = request_data.res_type.clone(); + + let members_to_alias_map = get_members( + &query_type, + query, + cube_store_result, + alias_to_member_name_map, + annotation, + )?; + let members: Vec = members_to_alias_map.keys().cloned().collect(); + + match res_type { + Some(ResultType::Compact) => { + let dataset: Vec<_> = cube_store_result + .rows + .iter() + .map(|row| { + get_compact_row( + &members_to_alias_map, + annotation, + &query_type, + &members, + query.time_dimensions.as_ref(), + row, + &cube_store_result.columns_pos, + ) + }) + .collect::>>()?; + Ok(TransformedData::Compact { members, dataset }) + } + _ => { + let dataset: Vec<_> = cube_store_result + .rows + .iter() + .map(|row| { + get_vanilla_row( + alias_to_member_name_map, + annotation, + &query_type, + query, + row, + &cube_store_result.columns_pos, + ) + }) + .collect::>>()?; + Ok(TransformedData::Vanilla(dataset)) + } + } } +} - let normalized_queries = result_data - .results - .iter() - .map(|result| &result.query) - .collect::>(); +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RequestResultDataMulti { + #[serde(rename = "queryType")] + pub query_type: QueryType, + pub results: Vec, + #[serde(rename = "pivotQuery")] + pub pivot_query: Option, + #[serde(rename = "slowQuery")] + pub slow_query: bool, +} - result_data.pivot_query = Option::from(get_pivot_query( - &result_data.query_type, - &normalized_queries, - )?); +impl RequestResultDataMulti { + /// Processes multiple results and populates the final `RequestResultDataMulti` structure + /// which is sent to the client. + pub fn prepare_results( + &mut self, + request_data: &[TransformDataRequest], + cube_store_result: &[Arc], + ) -> Result<()> { + for (transform_data, cube_store_result, result) in multizip(( + request_data.iter(), + cube_store_result.iter(), + self.results.iter_mut(), + )) { + result.prepare_results(transform_data, cube_store_result)?; + } - Ok(()) + let normalized_queries = self + .results + .iter() + .map(|result| &result.query) + .collect::>(); + + self.pivot_query = Some(get_pivot_query(&self.query_type, &normalized_queries)?); + + Ok(()) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RequestResultData { + pub query: NormalizedQuery, + #[serde(rename = "lastRefreshTime")] + pub last_refresh_time: Option, + #[serde(rename = "refreshKeyValues")] + pub refresh_key_values: Option, + #[serde(rename = "usedPreAggregations")] + pub used_pre_aggregations: Option, + #[serde(rename = "transformedQuery")] + pub transformed_query: Option, + #[serde(rename = "requestId")] + pub request_id: Option, + pub annotation: HashMap>, + #[serde(rename = "dataSource")] + pub data_source: String, + #[serde(rename = "dbType")] + pub db_type: String, + #[serde(rename = "extDbType")] + pub ext_db_type: Option, + pub external: bool, + #[serde(rename = "slowQuery")] + pub slow_query: bool, + pub total: Option, + pub data: Option, +} + +impl RequestResultData { + /// Populates the `RequestResultData` structure with the transformed Query result. + pub fn prepare_results( + &mut self, + request_data: &TransformDataRequest, + cube_store_result: &CubeStoreResult, + ) -> Result<()> { + let transformed = TransformedData::transform(request_data, cube_store_result)?; + self.data = Some(transformed); + + Ok(()) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RequestResultArray { + pub results: Vec, } diff --git a/rust/cubeorchestrator/src/lib.rs b/rust/cubeorchestrator/src/lib.rs index f445d3cfcead5..6532689df15d9 100644 --- a/rust/cubeorchestrator/src/lib.rs +++ b/rust/cubeorchestrator/src/lib.rs @@ -1,3 +1,3 @@ pub mod cubestore_message_parser; pub mod cubestore_result_transform; -pub mod types; +pub mod transport; diff --git a/rust/cubeorchestrator/src/types.rs b/rust/cubeorchestrator/src/transport.rs similarity index 86% rename from rust/cubeorchestrator/src/types.rs rename to rust/cubeorchestrator/src/transport.rs index eeeada3e3b7ce..08033e984326b 100644 --- a/rust/cubeorchestrator/src/types.rs +++ b/rust/cubeorchestrator/src/transport.rs @@ -315,16 +315,6 @@ pub struct NormalizedQuery { pub query_type: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(untagged)] -pub enum TransformedData { - Compact { - members: Vec, - dataset: Vec>, - }, - Vanilla(Vec>), -} - #[derive(Debug, Clone, Deserialize)] pub struct TransformDataRequest { #[serde(rename = "aliasToMemberNameMap")] @@ -336,46 +326,3 @@ pub struct TransformDataRequest { #[serde(rename = "resType")] pub res_type: Option, } - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RequestResultData { - pub query: NormalizedQuery, - #[serde(rename = "lastRefreshTime")] - pub last_refresh_time: Option, - #[serde(rename = "refreshKeyValues")] - pub refresh_key_values: Option, - #[serde(rename = "usedPreAggregations")] - pub used_pre_aggregations: Option, - #[serde(rename = "transformedQuery")] - pub transformed_query: Option, - #[serde(rename = "requestId")] - pub request_id: Option, - pub annotation: HashMap>, - #[serde(rename = "dataSource")] - pub data_source: String, - #[serde(rename = "dbType")] - pub db_type: String, - #[serde(rename = "extDbType")] - pub ext_db_type: Option, - pub external: bool, - #[serde(rename = "slowQuery")] - pub slow_query: bool, - pub total: Option, - pub data: Option, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RequestResultDataMulti { - #[serde(rename = "queryType")] - pub query_type: QueryType, - pub results: Vec, - #[serde(rename = "pivotQuery")] - pub pivot_query: Option, - #[serde(rename = "slowQuery")] - pub slow_query: bool, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RequestResultArray { - pub results: Vec, -}