From 1d3bc06c4446278c77f2d694a9bd4c2fa32ad82b Mon Sep 17 00:00:00 2001 From: cdxker Date: Tue, 15 Oct 2024 09:34:18 -0700 Subject: [PATCH] bugfix: create embeddings on non receiving thread --- server/src/operators/model_operator.rs | 277 +++++++++++++------------ 1 file changed, 141 insertions(+), 136 deletions(-) diff --git a/server/src/operators/model_operator.rs b/server/src/operators/model_operator.rs index 9ec42f3e1..f412b5c2d 100644 --- a/server/src/operators/model_operator.rs +++ b/server/src/operators/model_operator.rs @@ -4,6 +4,7 @@ use crate::{ get_env, handlers::chunk_handler::{FullTextBoost, SemanticBoost}, }; +use actix_web::web; use murmur3::murmur3_32; use openai_dive::v1::resources::embedding::EmbeddingInput; use serde::{Deserialize, Serialize}; @@ -130,72 +131,71 @@ pub async fn get_dense_vector( truncate: true, }; - let embeddings_resp_a = ureq::post(&format!( - "{}/embeddings?api-version=2023-05-15", - embedding_base_url - )) - .set("Authorization", &format!("Bearer {}", &embedding_api_key)) - .set("api-key", &embedding_api_key) - .set("Content-Type", "application/json") - .send_json(serde_json::to_value(parameters).unwrap()) - .map_err(|e| { - ServiceError::InternalServerError(format!( - "Could not get embeddings from server: {:?}, {:?}", - e, - e.to_string() + let resp = web::block(move || { + let embeddings_resp_a = ureq::post(&format!( + "{}/embeddings?api-version=2023-05-15", + embedding_base_url )) - })?; - - let embeddings_resp = embeddings_resp_a - .into_json::() - .map_err(|err| { + .set("Authorization", &format!("Bearer {}", &embedding_api_key)) + .set("api-key", &embedding_api_key) + .set("Content-Type", "application/json") + .send_json(serde_json::to_value(parameters).unwrap()) + .map_err(|e| { ServiceError::InternalServerError(format!( - "Failed to format response from embeddings server {:?}", - err + "Could not get embeddings from server: {:?}, {:?}", + e, + e.to_string() )) })?; - let mut vectors: Vec> = embeddings_resp.to_vec(); - if vectors.iter().any(|x| x.is_empty()) { - return Err(ServiceError::InternalServerError( - "Embedding server responded with Base64 and that is not currently supported for embeddings".to_owned(), - )); - } - - if let Some(semantic_boost) = semantic_boost { - let distance_factor = semantic_boost.distance_factor; - let boost_vector = match vectors.pop() { - Some(v) => v, - None => { - return Err(ServiceError::InternalServerError( - "No dense embedding returned from server for boost_vector".to_owned(), - )) - } - }; - let embedding_vector = match vectors.pop() { - Some(v) => v, - None => { - return Err(ServiceError::InternalServerError( - "No dense embedding returned from server for embedding_vector".to_owned(), + let embeddings_resp = embeddings_resp_a + .into_json::() + .map_err(|err| { + ServiceError::InternalServerError(format!( + "Failed to format response from embeddings server {:?}", + err )) - } - }; + })?; - return Ok(embedding_vector - .iter() - .zip(boost_vector) - .map(|(vec_elem, boost_vec_elem)| vec_elem + distance_factor * boost_vec_elem) - .collect()); - } + let mut vectors = embeddings_resp.to_vec(); + if let Some(semantic_boost) = semantic_boost { + let distance_factor = semantic_boost.distance_factor; + let boost_vector = match vectors.pop() { + Some(v) => v, + None => { + return Err(ServiceError::InternalServerError( + "No dense embedding returned from server for boost_vector".to_owned(), + )) + } + }; + let embedding_vector = match vectors.pop() { + Some(v) => v, + None => { + return Err(ServiceError::InternalServerError( + "No dense embedding returned from server for embedding_vector".to_owned(), + )) + } + }; - transaction.finish(); + return Ok(embedding_vector + .iter() + .zip(boost_vector) + .map(|(vec_elem, boost_vec_elem)| vec_elem + distance_factor * boost_vec_elem) + .collect()); + } - match vectors.first() { - Some(v) => Ok(v.clone()), - None => Err(ServiceError::InternalServerError( - "No dense embeddings returned from server".to_owned(), - )), - } + match vectors.first() { + Some(v) => Ok(v.clone()), + None => Err(ServiceError::InternalServerError( + "No dense embeddings returned from server".to_owned(), + )), + } + }) + .await + .map_err(|err| ServiceError::BadRequest(format!("Thread error {:?}", err)))?; + + transaction.finish(); + resp } #[tracing::instrument] @@ -232,92 +232,97 @@ pub async fn get_sparse_vector( } let embedding_server_call = format!("{}/embed_sparse", server_origin); + let embed_type_string = embed_type.to_owned(); - let mut sparse_vectors = ureq::post(&embedding_server_call) - .set("Content-Type", "application/json") - .set( - "Authorization", - &format!( - "Bearer {}", - get_env!("OPENAI_API_KEY", "OPENAI_API should be set") - ), - ) - .send_json(CustomSparseEmbedData { - inputs, - encode_type: embed_type.to_string(), - truncate: true, - }) - .map_err(|err| { - log::error!( - "Failed parsing response from custom embedding server {:?}", - err - ); - ServiceError::BadRequest(format!("Failed making call to server {:?}", err)) - })? - .into_json::>>() - .map_err(|_e| { - log::error!( - "Failed parsing response from custom embedding server {:?}", - _e - ); - ServiceError::BadRequest( - "Failed parsing response from custom embedding server".to_string(), + web::block(move || { + let mut sparse_vectors = ureq::post(&embedding_server_call) + .set("Content-Type", "application/json") + .set( + "Authorization", + &format!( + "Bearer {}", + get_env!("OPENAI_API_KEY", "OPENAI_API should be set") + ), ) - })?; + .send_json(CustomSparseEmbedData { + inputs, + encode_type: embed_type_string, + truncate: true, + }) + .map_err(|err| { + log::error!( + "Failed parsing response from custom embedding server {:?}", + err + ); + ServiceError::BadRequest(format!("Failed making call to server {:?}", err)) + })? + .into_json::>>() + .map_err(|_e| { + log::error!( + "Failed parsing response from custom embedding server {:?}", + _e + ); + ServiceError::BadRequest( + "Failed parsing response from custom embedding server".to_string(), + ) + })?; - if let Some(fulltext_boost) = fulltext_boost { - let boost_amt = fulltext_boost.boost_factor; - let boost_vector = match sparse_vectors.pop() { - Some(v) => v, - None => { - return Err(ServiceError::InternalServerError( - "No sparse vector returned from server for boost_vector".to_owned(), - )) - } - }; - let query_vector = match sparse_vectors.pop() { - Some(v) => v, - None => { - return Err(ServiceError::InternalServerError( - "No sparse vector returned from server for embedding_vector".to_owned(), - )) - } - }; + if let Some(fulltext_boost) = fulltext_boost { + let boost_amt = fulltext_boost.boost_factor; + let boost_vector = match sparse_vectors.pop() { + Some(v) => v, + None => { + return Err(ServiceError::InternalServerError( + "No sparse vector returned from server for boost_vector".to_owned(), + )) + } + }; + let query_vector = match sparse_vectors.pop() { + Some(v) => v, + None => { + return Err(ServiceError::InternalServerError( + "No sparse vector returned from server for embedding_vector".to_owned(), + )) + } + }; - let boosted_query_vector = query_vector - .iter() - .map(|splade_indice| { - if boost_vector - .iter() - .any(|boost_splade_indice| boost_splade_indice.index == splade_indice.index) - { - SpladeIndicies { - index: splade_indice.index, - value: splade_indice.value * (boost_amt as f32), - } - .into_tuple() - } else { - SpladeIndicies { - index: splade_indice.index, - value: splade_indice.value, + let boosted_query_vector = query_vector + .iter() + .map(|splade_indice| { + if boost_vector + .iter() + .any(|boost_splade_indice| boost_splade_indice.index == splade_indice.index) + { + SpladeIndicies { + index: splade_indice.index, + value: splade_indice.value * (boost_amt as f32), + } + .into_tuple() + } else { + SpladeIndicies { + index: splade_indice.index, + value: splade_indice.value, + } + .into_tuple() } - .into_tuple() - } - }) - .collect(); + }) + .collect(); - return Ok(boosted_query_vector); - } + return Ok(boosted_query_vector); + } - match sparse_vectors.first() { - Some(v) => Ok(v - .iter() - .map(|splade_idx| (*splade_idx).into_tuple()) - .collect()), - None => Err(ServiceError::InternalServerError( - "No sparse embeddings returned from server".to_owned(), - )), - } + match sparse_vectors.first() { + Some(v) => Ok(v + .iter() + .map(|splade_idx| (*splade_idx).into_tuple()) + .collect()), + None => Err(ServiceError::InternalServerError( + "No sparse embeddings returned from server".to_owned(), + )), + } + }) + .await + .map_err(|err| ServiceError::BadRequest(format!("Thread error {:?}", err)))? } #[tracing::instrument]