Skip to content

Commit

Permalink
bugfix: create embeddings on non receiving thread
Browse files Browse the repository at this point in the history
  • Loading branch information
cdxker authored and skeptrunedev committed Oct 15, 2024
1 parent 84c7039 commit 1d3bc06
Showing 1 changed file with 141 additions and 136 deletions.
277 changes: 141 additions & 136 deletions server/src/operators/model_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
get_env,
handlers::chunk_handler::{FullTextBoost, SemanticBoost},
};
use actix_web::web;
use murmur3::murmur3_32;
use openai_dive::v1::resources::embedding::EmbeddingInput;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -130,72 +131,71 @@ pub async fn get_dense_vector(
truncate: true,
};

let embeddings_resp_a = ureq::post(&format!(
"{}/embeddings?api-version=2023-05-15",
embedding_base_url
))
.set("Authorization", &format!("Bearer {}", &embedding_api_key))
.set("api-key", &embedding_api_key)
.set("Content-Type", "application/json")
.send_json(serde_json::to_value(parameters).unwrap())
.map_err(|e| {
ServiceError::InternalServerError(format!(
"Could not get embeddings from server: {:?}, {:?}",
e,
e.to_string()
let resp = web::block(move || {
let embeddings_resp_a = ureq::post(&format!(
"{}/embeddings?api-version=2023-05-15",
embedding_base_url
))
})?;

let embeddings_resp = embeddings_resp_a
.into_json::<DenseEmbedData>()
.map_err(|err| {
.set("Authorization", &format!("Bearer {}", &embedding_api_key))
.set("api-key", &embedding_api_key)
.set("Content-Type", "application/json")
.send_json(serde_json::to_value(parameters).unwrap())
.map_err(|e| {
ServiceError::InternalServerError(format!(
"Failed to format response from embeddings server {:?}",
err
"Could not get embeddings from server: {:?}, {:?}",
e,
e.to_string()
))
})?;

let mut vectors: Vec<Vec<f32>> = embeddings_resp.to_vec();
if vectors.iter().any(|x| x.is_empty()) {
return Err(ServiceError::InternalServerError(
"Embedding server responded with Base64 and that is not currently supported for embeddings".to_owned(),
));
}

if let Some(semantic_boost) = semantic_boost {
let distance_factor = semantic_boost.distance_factor;
let boost_vector = match vectors.pop() {
Some(v) => v,
None => {
return Err(ServiceError::InternalServerError(
"No dense embedding returned from server for boost_vector".to_owned(),
))
}
};
let embedding_vector = match vectors.pop() {
Some(v) => v,
None => {
return Err(ServiceError::InternalServerError(
"No dense embedding returned from server for embedding_vector".to_owned(),
let embeddings_resp = embeddings_resp_a
.into_json::<DenseEmbedData>()
.map_err(|err| {
ServiceError::InternalServerError(format!(
"Failed to format response from embeddings server {:?}",
err
))
}
};
})?;

return Ok(embedding_vector
.iter()
.zip(boost_vector)
.map(|(vec_elem, boost_vec_elem)| vec_elem + distance_factor * boost_vec_elem)
.collect());
}
let mut vectors = embeddings_resp.to_vec();
if let Some(semantic_boost) = semantic_boost {
let distance_factor = semantic_boost.distance_factor;
let boost_vector = match vectors.pop() {
Some(v) => v,
None => {
return Err(ServiceError::InternalServerError(
"No dense embedding returned from server for boost_vector".to_owned(),
))
}
};
let embedding_vector = match vectors.pop() {
Some(v) => v,
None => {
return Err(ServiceError::InternalServerError(
"No dense embedding returned from server for embedding_vector".to_owned(),
))
}
};

transaction.finish();
return Ok(embedding_vector
.iter()
.zip(boost_vector)
.map(|(vec_elem, boost_vec_elem)| vec_elem + distance_factor * boost_vec_elem)
.collect());
}

match vectors.first() {
Some(v) => Ok(v.clone()),
None => Err(ServiceError::InternalServerError(
"No dense embeddings returned from server".to_owned(),
)),
}
match vectors.first() {
Some(v) => Ok(v.clone()),
None => Err(ServiceError::InternalServerError(
"No dense embeddings returned from server".to_owned(),
)),
}
})
.await
.map_err(|err| ServiceError::BadRequest(format!("Thread error {:?}", err)))?;

transaction.finish();
resp
}

#[tracing::instrument]
Expand Down Expand Up @@ -232,92 +232,97 @@ pub async fn get_sparse_vector(
}

let embedding_server_call = format!("{}/embed_sparse", server_origin);
let embed_type_string = embed_type.to_owned();

let mut sparse_vectors = ureq::post(&embedding_server_call)
.set("Content-Type", "application/json")
.set(
"Authorization",
&format!(
"Bearer {}",
get_env!("OPENAI_API_KEY", "OPENAI_API should be set")
),
)
.send_json(CustomSparseEmbedData {
inputs,
encode_type: embed_type.to_string(),
truncate: true,
})
.map_err(|err| {
log::error!(
"Failed parsing response from custom embedding server {:?}",
err
);
ServiceError::BadRequest(format!("Failed making call to server {:?}", err))
})?
.into_json::<Vec<Vec<SpladeIndicies>>>()
.map_err(|_e| {
log::error!(
"Failed parsing response from custom embedding server {:?}",
_e
);
ServiceError::BadRequest(
"Failed parsing response from custom embedding server".to_string(),
web::block(move || {
let mut sparse_vectors = ureq::post(&embedding_server_call)
.set("Content-Type", "application/json")
.set(
"Authorization",
&format!(
"Bearer {}",
get_env!("OPENAI_API_KEY", "OPENAI_API should be set")
),
)
})?;
.send_json(CustomSparseEmbedData {
inputs,
encode_type: embed_type_string,
truncate: true,
})
.map_err(|err| {
log::error!(
"Failed parsing response from custom embedding server {:?}",
err
);
ServiceError::BadRequest(format!("Failed making call to server {:?}", err))
})?
.into_json::<Vec<Vec<SpladeIndicies>>>()
.map_err(|_e| {
log::error!(
"Failed parsing response from custom embedding server {:?}",
_e
);
ServiceError::BadRequest(
"Failed parsing response from custom embedding server".to_string(),
)
})?;

if let Some(fulltext_boost) = fulltext_boost {
let boost_amt = fulltext_boost.boost_factor;
let boost_vector = match sparse_vectors.pop() {
Some(v) => v,
None => {
return Err(ServiceError::InternalServerError(
"No sparse vector returned from server for boost_vector".to_owned(),
))
}
};
let query_vector = match sparse_vectors.pop() {
Some(v) => v,
None => {
return Err(ServiceError::InternalServerError(
"No sparse vector returned from server for embedding_vector".to_owned(),
))
}
};
if let Some(fulltext_boost) = fulltext_boost {
let boost_amt = fulltext_boost.boost_factor;
let boost_vector = match sparse_vectors.pop() {
Some(v) => v,
None => {
return Err(ServiceError::InternalServerError(
"No sparse vector returned from server for boost_vector".to_owned(),
))
}
};
let query_vector = match sparse_vectors.pop() {
Some(v) => v,
None => {
return Err(ServiceError::InternalServerError(
"No sparse vector returned from server for embedding_vector".to_owned(),
))
}
};

let boosted_query_vector = query_vector
.iter()
.map(|splade_indice| {
if boost_vector
.iter()
.any(|boost_splade_indice| boost_splade_indice.index == splade_indice.index)
{
SpladeIndicies {
index: splade_indice.index,
value: splade_indice.value * (boost_amt as f32),
}
.into_tuple()
} else {
SpladeIndicies {
index: splade_indice.index,
value: splade_indice.value,
let boosted_query_vector = query_vector
.iter()
.map(|splade_indice| {
if boost_vector
.iter()
.any(|boost_splade_indice| boost_splade_indice.index == splade_indice.index)
{
SpladeIndicies {
index: splade_indice.index,
value: splade_indice.value * (boost_amt as f32),
}
.into_tuple()
} else {
SpladeIndicies {
index: splade_indice.index,
value: splade_indice.value,
}
.into_tuple()
}
.into_tuple()
}
})
.collect();
})
.collect();

return Ok(boosted_query_vector);
}
return Ok(boosted_query_vector);
}

match sparse_vectors.first() {
Some(v) => Ok(v
.iter()
.map(|splade_idx| (*splade_idx).into_tuple())
.collect()),
None => Err(ServiceError::InternalServerError(
"No sparse embeddings returned from server".to_owned(),
)),
}
match sparse_vectors.first() {
Some(v) => Ok(v
.iter()
.map(|splade_idx| (*splade_idx).into_tuple())
.collect()),
None => Err(ServiceError::InternalServerError(
"No sparse embeddings returned from server".to_owned(),
)),
}
})
.await
.map_err(|err| ServiceError::BadRequest(format!("Thread error {:?}", err)))?
}

#[tracing::instrument]
Expand Down

0 comments on commit 1d3bc06

Please sign in to comment.