diff --git a/packages/cubejs-backend-native/src/node_export.rs b/packages/cubejs-backend-native/src/node_export.rs index 3c86121c3c2c7..ae578437bb28a 100644 --- a/packages/cubejs-backend-native/src/node_export.rs +++ b/packages/cubejs-backend-native/src/node_export.rs @@ -9,10 +9,6 @@ use futures::StreamExt; use serde_json::Map; use tokio::sync::Semaphore; -use std::net::SocketAddr; -use std::rc::Rc; -use std::str::FromStr; -use std::sync::Arc; use crate::auth::{NativeAuthContext, NodeBridgeAuthService}; use crate::channel::call_js_fn; use crate::config::{NodeConfiguration, NodeConfigurationFactoryOptions, NodeCubeServices}; @@ -30,6 +26,10 @@ use cubenativeutils::wrappers::serializer::NativeDeserialize; use cubenativeutils::wrappers::NativeContextHolder; use cubesqlplanner::cube_bridge::base_query_options::NativeBaseQueryOptions; use cubesqlplanner::planner::base_query::BaseQuery; +use std::net::SocketAddr; +use std::rc::Rc; +use std::str::FromStr; +use std::sync::Arc; use cubeorchestrator::cubestore_message_parser::CubeStoreResult; @@ -520,11 +520,9 @@ fn parse_cubestore_ws_result_message(mut cx: FunctionContext) -> JsResult Ok(cx.boxed(Arc::new(result))), - Err(err) => cx.throw_error(err.to_string()), - } + .promise(move |mut cx, res| match res { + Ok(result) => Ok(cx.boxed(Arc::new(result))), + Err(err) => cx.throw_error(err.to_string()), }); Ok(promise) @@ -589,18 +587,16 @@ fn transform_query_data(mut cx: FunctionContext) -> JsResult { Err(err) => Err(anyhow::Error::from(err)), } }) - .promise(move |mut cx, json_data| { - match json_data { - Ok(json_data) => { - let js_string = cx.string(json_data); + .promise(move |mut cx, json_data| match json_data { + Ok(json_data) => { + let js_string = cx.string(json_data); - let js_result = cx.empty_object(); - js_result.set(&mut cx, "result", js_string)?; + let js_result = cx.empty_object(); + js_result.set(&mut cx, "result", js_string)?; - Ok(js_result) - } - Err(err) => cx.throw_error(err.to_string()), + Ok(js_result) } + Err(err) => cx.throw_error(err.to_string()), }); Ok(promise) @@ -620,7 +616,8 @@ fn final_cubestore_result(mut cx: FunctionContext) -> JsResult { Err(err) => return Err(anyhow::Error::from(err)), }; - let mut result_data = match serde_json::from_str::(&result_data_str) { + let mut result_data = match serde_json::from_str::(&result_data_str) + { Ok(data) => data, Err(err) => return Err(anyhow::Error::from(err)), }; @@ -636,21 +633,19 @@ fn final_cubestore_result(mut cx: FunctionContext) -> JsResult { Err(err) => Err(anyhow::Error::from(err)), } }) - .promise(move |mut cx, json_string| { - match json_string { - Ok(json_string) => { - let json_bytes = json_string.as_bytes(); - - let mut js_buffer = cx.array_buffer(json_bytes.len())?; - { - let buffer = js_buffer.as_mut_slice(&mut cx); - buffer.copy_from_slice(json_bytes); - } - - Ok(js_buffer) + .promise(move |mut cx, json_string| match json_string { + Ok(json_string) => { + let json_bytes = json_string.as_bytes(); + + let mut js_buffer = cx.array_buffer(json_bytes.len())?; + { + let buffer = js_buffer.as_mut_slice(&mut cx); + buffer.copy_from_slice(json_bytes); } - Err(err) => cx.throw_error(err.to_string()), + + Ok(js_buffer) } + Err(err) => cx.throw_error(err.to_string()), }); Ok(promise) @@ -672,10 +667,7 @@ fn final_cubestore_result_array(mut cx: FunctionContext) -> JsResult let cube_store_results_boxed: Vec>>> = cube_store_array .to_vec(&mut cx)? .into_iter() - .map(|js_value| { - js_value - .downcast_or_throw::>, _>(&mut cx) - }) + .map(|js_value| js_value.downcast_or_throw::>, _>(&mut cx)) .collect::>()?; let cube_store_results: Vec> = cube_store_results_boxed .iter() @@ -697,22 +689,22 @@ fn final_cubestore_result_array(mut cx: FunctionContext) -> JsResult .task(move || { let transform_requests: Vec = transform_request_strings .into_iter() - .map(|req_str| { - match serde_json::from_str::(&req_str) { + .map( + |req_str| match serde_json::from_str::(&req_str) { Ok(request) => Ok(request), Err(err) => Err(anyhow::Error::from(err)), - } - }) + }, + ) .collect::>()?; let mut request_results: Vec = request_result_strings .into_iter() - .map(|req_str| { - match serde_json::from_str::(&req_str) { + .map( + |req_str| match serde_json::from_str::(&req_str) { Ok(request) => Ok(request), Err(err) => Err(anyhow::Error::from(err)), - } - }) + }, + ) .collect::>()?; get_final_cubestore_result_array( @@ -730,21 +722,19 @@ fn final_cubestore_result_array(mut cx: FunctionContext) -> JsResult Err(err) => Err(anyhow::Error::from(err)), } }) - .promise(move |mut cx, json_data| { - match json_data { - Ok(json_data) => { - let json_bytes = json_data.as_bytes(); - - let mut js_buffer = cx.array_buffer(json_bytes.len())?; - { - let buffer = js_buffer.as_mut_slice(&mut cx); - buffer.copy_from_slice(json_bytes); - } - - Ok(js_buffer) + .promise(move |mut cx, json_data| match json_data { + Ok(json_data) => { + let json_bytes = json_data.as_bytes(); + + let mut js_buffer = cx.array_buffer(json_bytes.len())?; + { + let buffer = js_buffer.as_mut_slice(&mut cx); + buffer.copy_from_slice(json_bytes); } - Err(err) => cx.throw_error(err.to_string()), + + Ok(js_buffer) } + Err(err) => cx.throw_error(err.to_string()), }); Ok(promise) @@ -766,10 +756,7 @@ fn final_cubestore_result_multi(mut cx: FunctionContext) -> JsResult let cube_store_results_boxed: Vec>>> = cube_store_array .to_vec(&mut cx)? .into_iter() - .map(|js_value| { - js_value - .downcast_or_throw::>, _>(&mut cx) - }) + .map(|js_value| js_value.downcast_or_throw::>, _>(&mut cx)) .collect::>()?; let cube_store_results: Vec> = cube_store_results_boxed .iter() @@ -782,41 +769,44 @@ fn final_cubestore_result_multi(mut cx: FunctionContext) -> JsResult .task(move || { let transform_requests: Vec = transform_request_strings .into_iter() - .map(|req_str| { - match serde_json::from_str::(&req_str) { + .map( + |req_str| match serde_json::from_str::(&req_str) { Ok(request) => Ok(request), Err(err) => Err(anyhow::Error::from(err)), - } - }) + }, + ) .collect::>()?; - let mut result_data = match serde_json::from_str::(&result_data_str) { - Ok(data) => data, - Err(err) => return Err(anyhow::Error::from(err)), - }; + let mut result_data = + match serde_json::from_str::(&result_data_str) { + Ok(data) => data, + Err(err) => return Err(anyhow::Error::from(err)), + }; - get_final_cubestore_result_multi(&transform_requests, &cube_store_results, &mut result_data)?; + get_final_cubestore_result_multi( + &transform_requests, + &cube_store_results, + &mut result_data, + )?; match serde_json::to_string(&result_data) { Ok(json) => Ok(json), Err(err) => Err(anyhow::Error::from(err)), } }) - .promise(move |mut cx, json_data| { - match json_data { - Ok(json_data) => { - let json_bytes = json_data.as_bytes(); - - let mut js_buffer = cx.array_buffer(json_bytes.len())?; - { - let buffer = js_buffer.as_mut_slice(&mut cx); - buffer.copy_from_slice(json_bytes); - } - - Ok(js_buffer) + .promise(move |mut cx, json_data| match json_data { + Ok(json_data) => { + let json_bytes = json_data.as_bytes(); + + let mut js_buffer = cx.array_buffer(json_bytes.len())?; + { + let buffer = js_buffer.as_mut_slice(&mut cx); + buffer.copy_from_slice(json_bytes); } - Err(err) => cx.throw_error(err.to_string()), + + Ok(js_buffer) } + Err(err) => cx.throw_error(err.to_string()), }); Ok(promise) diff --git a/rust/cubeorchestrator/src/cubestore_result_transform.rs b/rust/cubeorchestrator/src/cubestore_result_transform.rs index a8e72b0484713..c35c00d3922da 100644 --- a/rust/cubeorchestrator/src/cubestore_result_transform.rs +++ b/rust/cubeorchestrator/src/cubestore_result_transform.rs @@ -10,8 +10,10 @@ use crate::{ use anyhow::{bail, Context, Result}; use chrono::{DateTime, SecondsFormat}; use itertools::multizip; -use std::collections::{HashMap, HashSet}; -use std::sync::Arc; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; /// Transform specified `value` with specified `type` to the network protocol type. pub fn transform_value(value: String, type_: &str) -> String {