Skip to content

Commit

Permalink
refactor results transformations
Browse files Browse the repository at this point in the history
  • Loading branch information
KSDaemon committed Dec 16, 2024
1 parent 08019ce commit 3a811ac
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 192 deletions.
35 changes: 6 additions & 29 deletions packages/cubejs-backend-native/src/node_export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,10 @@ use cubeorchestrator::cubestore_message_parser::CubeStoreResult;
use cubesql::{telemetry::ReportingLogger, CubeError};

use cubeorchestrator::cubestore_result_transform::{
get_final_cubestore_result, get_final_cubestore_result_array, get_final_cubestore_result_multi,
transform_data,
};
use cubeorchestrator::types::{
RequestResultArray, RequestResultData, RequestResultDataMulti, TransformDataRequest,
get_final_cubestore_result_array, RequestResultArray, RequestResultData,
RequestResultDataMulti, TransformedData,
};
use cubeorchestrator::transport::TransformDataRequest;
use neon::prelude::*;
use neon::types::buffer::TypedArray;

Expand Down Expand Up @@ -567,20 +565,7 @@ fn transform_query_data(mut cx: FunctionContext) -> JsResult<JsPromise> {
Err(err) => return Err(anyhow::Error::from(err)),
};

let alias_to_member_name_map = &request_data.alias_to_member_name_map;
let annotation = &request_data.annotation;
let query = &request_data.query;
let query_type = &request_data.query_type.unwrap_or_default();
let res_type = &request_data.res_type;

let transformed = transform_data(
alias_to_member_name_map,
annotation,
&cube_store_result,
query,
query_type,
res_type.clone(),
)?;
let transformed = TransformedData::transform(&request_data, &cube_store_result)?;

match serde_json::to_string(&transformed) {
Ok(json) => Ok(json),
Expand Down Expand Up @@ -622,11 +607,7 @@ fn final_cubestore_result(mut cx: FunctionContext) -> JsResult<JsPromise> {
Err(err) => return Err(anyhow::Error::from(err)),
};

get_final_cubestore_result(
&transform_request_data,
&cube_store_result,
&mut result_data,
)?;
result_data.prepare_results(&transform_request_data, &cube_store_result)?;

match serde_json::to_string(&result_data) {
Ok(json) => Ok(json),
Expand Down Expand Up @@ -783,11 +764,7 @@ fn final_cubestore_result_multi(mut cx: FunctionContext) -> JsResult<JsPromise>
Err(err) => return Err(anyhow::Error::from(err)),
};

get_final_cubestore_result_multi(
&transform_requests,
&cube_store_results,
&mut result_data,
)?;
result_data.prepare_results(&transform_requests, &cube_store_results)?;

match serde_json::to_string(&result_data) {
Ok(json) => Ok(json),
Expand Down
266 changes: 157 additions & 109 deletions rust/cubeorchestrator/src/cubestore_result_transform.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use crate::{
cubestore_message_parser::CubeStoreResult,
types::{
ConfigItem, MembersMap, NormalizedQuery, QueryTimeDimension, QueryType, RequestResultData,
RequestResultDataMulti, ResultType, TransformDataRequest, TransformedData,
BLENDING_QUERY_KEY_PREFIX, BLENDING_QUERY_RES_SEPARATOR, COMPARE_DATE_RANGE_FIELD,
COMPARE_DATE_RANGE_SEPARATOR, MEMBER_SEPARATOR,
transport::{
ConfigItem, MembersMap, NormalizedQuery, QueryTimeDimension, QueryType, ResultType,
TransformDataRequest, BLENDING_QUERY_KEY_PREFIX, BLENDING_QUERY_RES_SEPARATOR,
COMPARE_DATE_RANGE_FIELD, COMPARE_DATE_RANGE_SEPARATOR, MEMBER_SEPARATOR,
},
};
use anyhow::{bail, Context, Result};
use chrono::{DateTime, SecondsFormat};
use itertools::multizip;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
Expand Down Expand Up @@ -284,63 +285,6 @@ pub fn get_vanilla_row(
Ok(row)
}

/// Transforms queried data array to the output format.
pub fn transform_data(
alias_to_member_name_map: &HashMap<String, String>,
annotation: &HashMap<String, ConfigItem>,
data: &CubeStoreResult,
query: &NormalizedQuery,
query_type: &QueryType,
res_type: Option<ResultType>,
) -> Result<TransformedData> {
let members_to_alias_map = get_members(
query_type,
query,
data,
alias_to_member_name_map,
annotation,
)?;
let members: Vec<String> = members_to_alias_map.keys().cloned().collect();

match res_type {
Some(ResultType::Compact) => {
let dataset: Vec<_> = data
.rows
.iter()
.map(|row| {
get_compact_row(
&members_to_alias_map,
annotation,
query_type,
&members,
query.time_dimensions.as_ref(),
row,
&data.columns_pos,
)
})
.collect::<Result<Vec<_>>>()?;
Ok(TransformedData::Compact { members, dataset })
}
_ => {
let dataset: Vec<_> = data
.rows
.iter()
.map(|row| {
get_vanilla_row(
alias_to_member_name_map,
annotation,
query_type,
query,
row,
&data.columns_pos,
)
})
.collect::<Result<Vec<_>>>()?;
Ok(TransformedData::Vanilla(dataset))
}
}
}

/// Helper to get a list if unique granularities from normalized queries
pub fn get_query_granularities(queries: &[&NormalizedQuery]) -> Vec<String> {
queries
Expand Down Expand Up @@ -414,31 +358,6 @@ pub fn get_pivot_query(
Ok(pivot_query)
}

pub fn get_final_cubestore_result(
request_data: &TransformDataRequest,
cube_store_result: &CubeStoreResult,
result_data: &mut RequestResultData,
) -> Result<()> {
let alias_to_member_name_map = &request_data.alias_to_member_name_map;
let annotation = &request_data.annotation;
let query = &request_data.query;
let query_type = &request_data.query_type.clone().unwrap_or_default();
let res_type = &request_data.res_type;

let transformed = transform_data(
alias_to_member_name_map,
annotation,
cube_store_result,
query,
query_type,
res_type.clone(),
)?;

result_data.data = Some(transformed);

Ok(())
}

pub fn get_final_cubestore_result_array(
transform_requests: &[TransformDataRequest],
cube_store_results: &[Arc<CubeStoreResult>],
Expand All @@ -449,35 +368,164 @@ pub fn get_final_cubestore_result_array(
cube_store_results.iter(),
result_data.iter_mut(),
)) {
get_final_cubestore_result(transform_data, cube_store_result, result)?;
result.prepare_results(transform_data, cube_store_result)?;
}

Ok(())
}

pub fn get_final_cubestore_result_multi(
request_data: &[TransformDataRequest],
cube_store_result: &[Arc<CubeStoreResult>],
result_data: &mut RequestResultDataMulti,
) -> Result<()> {
for (transform_data, cube_store_result, result) in multizip((
request_data.iter(),
cube_store_result.iter(),
result_data.results.iter_mut(),
)) {
get_final_cubestore_result(transform_data, cube_store_result, result)?;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum TransformedData {
Compact {
members: Vec<String>,
dataset: Vec<Vec<String>>,
},
Vanilla(Vec<HashMap<String, String>>),
}

impl TransformedData {
/// Transforms queried data array to the output format.
pub fn transform(
request_data: &TransformDataRequest,
cube_store_result: &CubeStoreResult,
) -> Result<Self> {
let alias_to_member_name_map = &request_data.alias_to_member_name_map;
let annotation = &request_data.annotation;
let query = &request_data.query;
let query_type = &request_data.query_type.clone().unwrap_or_default();
let res_type = request_data.res_type.clone();

let members_to_alias_map = get_members(
&query_type,
query,
cube_store_result,
alias_to_member_name_map,
annotation,
)?;
let members: Vec<String> = members_to_alias_map.keys().cloned().collect();

match res_type {
Some(ResultType::Compact) => {
let dataset: Vec<_> = cube_store_result
.rows
.iter()
.map(|row| {
get_compact_row(
&members_to_alias_map,
annotation,
&query_type,
&members,
query.time_dimensions.as_ref(),
row,
&cube_store_result.columns_pos,
)
})
.collect::<Result<Vec<_>>>()?;
Ok(TransformedData::Compact { members, dataset })
}
_ => {
let dataset: Vec<_> = cube_store_result
.rows
.iter()
.map(|row| {
get_vanilla_row(
alias_to_member_name_map,
annotation,
&query_type,
query,
row,
&cube_store_result.columns_pos,
)
})
.collect::<Result<Vec<_>>>()?;
Ok(TransformedData::Vanilla(dataset))
}
}
}
}

let normalized_queries = result_data
.results
.iter()
.map(|result| &result.query)
.collect::<Vec<_>>();
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestResultDataMulti {
#[serde(rename = "queryType")]
pub query_type: QueryType,
pub results: Vec<RequestResultData>,
#[serde(rename = "pivotQuery")]
pub pivot_query: Option<NormalizedQuery>,
#[serde(rename = "slowQuery")]
pub slow_query: bool,
}

result_data.pivot_query = Option::from(get_pivot_query(
&result_data.query_type,
&normalized_queries,
)?);
impl RequestResultDataMulti {
/// Processes multiple results and populates the final `RequestResultDataMulti` structure
/// which is sent to the client.
pub fn prepare_results(
&mut self,
request_data: &[TransformDataRequest],
cube_store_result: &[Arc<CubeStoreResult>],
) -> Result<()> {
for (transform_data, cube_store_result, result) in multizip((
request_data.iter(),
cube_store_result.iter(),
self.results.iter_mut(),
)) {
result.prepare_results(transform_data, cube_store_result)?;
}

Ok(())
let normalized_queries = self
.results
.iter()
.map(|result| &result.query)
.collect::<Vec<_>>();

self.pivot_query = Some(get_pivot_query(&self.query_type, &normalized_queries)?);

Ok(())
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestResultData {
pub query: NormalizedQuery,
#[serde(rename = "lastRefreshTime")]
pub last_refresh_time: Option<String>,
#[serde(rename = "refreshKeyValues")]
pub refresh_key_values: Option<Value>,
#[serde(rename = "usedPreAggregations")]
pub used_pre_aggregations: Option<Value>,
#[serde(rename = "transformedQuery")]
pub transformed_query: Option<Value>,
#[serde(rename = "requestId")]
pub request_id: Option<String>,
pub annotation: HashMap<String, HashMap<String, ConfigItem>>,
#[serde(rename = "dataSource")]
pub data_source: String,
#[serde(rename = "dbType")]
pub db_type: String,
#[serde(rename = "extDbType")]
pub ext_db_type: Option<String>,
pub external: bool,
#[serde(rename = "slowQuery")]
pub slow_query: bool,
pub total: Option<u64>,
pub data: Option<TransformedData>,
}

impl RequestResultData {
/// Populates the `RequestResultData` structure with the transformed Query result.
pub fn prepare_results(
&mut self,
request_data: &TransformDataRequest,
cube_store_result: &CubeStoreResult,
) -> Result<()> {
let transformed = TransformedData::transform(request_data, cube_store_result)?;
self.data = Some(transformed);

Ok(())
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestResultArray {
pub results: Vec<RequestResultData>,
}
2 changes: 1 addition & 1 deletion rust/cubeorchestrator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub mod cubestore_message_parser;
pub mod cubestore_result_transform;
pub mod types;
pub mod transport;
Loading

0 comments on commit 3a811ac

Please sign in to comment.