Skip to content

Commit

Permalink
Handle nested OTLP values in attributes and log bodies (#5485)
Browse files Browse the repository at this point in the history
* Handle nested OTLP values in attributes and log bodies

* Fix Python install
  • Loading branch information
guilload authored Oct 14, 2024
1 parent f5da576 commit 4c1e228
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 48 deletions.
9 changes: 5 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Install Ubuntu packages
run: sudo apt-get -y install protobuf-compiler python3 python3-pip
run: sudo apt-get -y install protobuf-compiler python3
- uses: dorny/paths-filter@v3
id: modified
with:
Expand Down Expand Up @@ -86,11 +86,12 @@ jobs:
run: cargo build --features=postgres --bin quickwit
working-directory: ./quickwit
- name: Install python packages
run: sudo pip3 install pyaml requests
if: always() && steps.modified.outputs.rust_src == 'true'
- name: run REST API tests
run: python3 -m venv venv && source venv/bin/activate && pip install pyaml requests
working-directory: ./quickwit/rest-api-tests
- name: Run REST API tests
if: always() && steps.modified.outputs.rust_src == 'true'
run: python3 ./run_tests.py --binary ../target/debug/quickwit
run: source venv/bin/activate && python3 ./run_tests.py --binary ../target/debug/quickwit
working-directory: ./quickwit/rest-api-tests

lints:
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-opentelemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ quickwit-proto = { workspace = true }
[dev-dependencies]
quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-metastore = { workspace = true, features = ["testsuite"] }
quickwit-proto = { workspace = true, features = ["testsuite"] }

[features]
testsuite = []
133 changes: 89 additions & 44 deletions quickwit/quickwit-opentelemetry/src/otlp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use std::collections::HashMap;

use quickwit_common::rate_limited_warn;
use quickwit_config::{validate_identifier, validate_index_id_pattern, INGEST_V2_SOURCE_ID};
use quickwit_ingest::{CommitType, IngestServiceError};
use quickwit_proto::ingest::router::{
Expand Down Expand Up @@ -119,46 +120,54 @@ pub(crate) fn extract_attributes(attributes: Vec<OtlpKeyValue>) -> HashMap<Strin
if let Some(value) = attribute
.value
.and_then(|any_value| any_value.value)
.and_then(to_json_value)
.and_then(oltp_value_to_json_value)
{
attrs.insert(attribute.key, value);
}
}
attrs
}

fn to_json_value(value: OtlpValue) -> Option<JsonValue> {
fn oltp_value_to_json_value(value: OtlpValue) -> Option<JsonValue> {
match value {
OtlpValue::ArrayValue(OtlpArrayValue { values }) => Some(
values
.into_iter()
.flat_map(to_json_value_from_primitive_any_value)
.filter_map(|value| match value.value {
Some(value) => oltp_value_to_json_value(value),
None => None,
})
.collect(),
),
OtlpValue::BoolValue(value) => Some(JsonValue::Bool(value)),
OtlpValue::DoubleValue(value) => JsonNumber::from_f64(value).map(JsonValue::Number),
OtlpValue::IntValue(value) => Some(JsonValue::Number(JsonNumber::from(value))),
OtlpValue::StringValue(value) => Some(JsonValue::String(value)),
OtlpValue::BytesValue(_) | OtlpValue::KvlistValue(_) => {
// These attribute types are not supported for attributes according to the OpenTelemetry
// specification.
OtlpValue::BoolValue(bool_value) => Some(JsonValue::Bool(bool_value)),
OtlpValue::DoubleValue(double_value) => {
JsonNumber::from_f64(double_value).map(JsonValue::Number)
}
OtlpValue::IntValue(int_value) => Some(JsonValue::Number(JsonNumber::from(int_value))),
OtlpValue::KvlistValue(key_values) => {
let mut map = serde_json::Map::with_capacity(key_values.values.len());

for key_value in key_values.values {
if let Some(value) = key_value
.value
.and_then(|any_value| any_value.value)
.and_then(oltp_value_to_json_value)
{
map.insert(key_value.key, value);
}
}
Some(JsonValue::Object(map))
}
OtlpValue::StringValue(string_value) => Some(JsonValue::String(string_value)),
OtlpValue::BytesValue(_) => {
rate_limited_warn!(limit_per_min = 10, "ignoring unsupported OTLP bytes value");
None
}
}
}

fn to_json_value_from_primitive_any_value(any_value: OtlpAnyValue) -> Option<JsonValue> {
match any_value.value {
Some(OtlpValue::BoolValue(value)) => Some(JsonValue::Bool(value)),
Some(OtlpValue::DoubleValue(value)) => JsonNumber::from_f64(value).map(JsonValue::Number),
Some(OtlpValue::IntValue(value)) => Some(JsonValue::Number(JsonNumber::from(value))),
Some(OtlpValue::StringValue(value)) => Some(JsonValue::String(value)),
_ => None,
}
}

pub(crate) fn parse_log_record_body(body: OtlpAnyValue) -> Option<JsonValue> {
body.value.and_then(to_json_value).map(|value| {
body.value.and_then(oltp_value_to_json_value).map(|value| {
if value.is_string() {
let mut map = serde_json::Map::with_capacity(1);
map.insert("message".to_string(), value);
Expand Down Expand Up @@ -211,15 +220,13 @@ pub(crate) fn extract_otel_index_id_from_metadata(
.transpose()
.map_err(|error| {
Status::internal(format!(
"failed to extract index ID from request metadata: {}",
error
"failed to extract index ID from request metadata: {error}",
))
})?
.unwrap_or_else(|| otel_signal.default_index_id());
validate_identifier("index_id", index_id).map_err(|error| {
Status::internal(format!(
"invalid index ID pattern in request metadata: {}",
error
"invalid index ID pattern in request metadata: {error}",
))
})?;
Ok(index_id.to_string())
Expand All @@ -241,14 +248,11 @@ async fn ingest_doc_batch_v2(
commit_type: commit_type.into(),
subrequests: vec![subrequest],
};
let mut response = ingest_router
.ingest(request)
.await
.map_err(IngestServiceError::from)?;
let mut response = ingest_router.ingest(request).await?;
let num_responses = response.successes.len() + response.failures.len();
if num_responses != 1 {
return Err(IngestServiceError::Internal(format!(
"Expected a single failure/success, got {}.",
"expected a single failure or success, got {}",
num_responses
)));
}
Expand All @@ -264,34 +268,75 @@ mod tests {
use quickwit_proto::opentelemetry::proto::common::v1::any_value::{
Value as OtlpValue, Value as OtlpAnyValueValue,
};
use quickwit_proto::opentelemetry::proto::common::v1::ArrayValue as OtlpArrayValue;
use quickwit_proto::opentelemetry::proto::common::v1::{
ArrayValue as OtlpArrayValue, KeyValueList as OtlpKeyValueList,
};
use serde_json::{json, Value as JsonValue};

use super::*;
use crate::otlp::{extract_attributes, parse_log_record_body, to_json_value};
use crate::otlp::{extract_attributes, oltp_value_to_json_value, parse_log_record_body};

#[test]
fn test_to_json_value() {
fn test_oltp_value_to_json_value() {
assert_eq!(
to_json_value(OtlpValue::ArrayValue(OtlpArrayValue { values: Vec::new() })),
oltp_value_to_json_value(OtlpValue::ArrayValue(OtlpArrayValue { values: Vec::new() })),
Some(json!([]))
);
assert_eq!(
to_json_value(OtlpValue::ArrayValue(OtlpArrayValue {
values: vec![OtlpAnyValue {
value: Some(OtlpAnyValueValue::IntValue(1337))
}]
oltp_value_to_json_value(OtlpValue::ArrayValue(OtlpArrayValue {
values: vec![
OtlpAnyValue {
value: Some(OtlpAnyValueValue::IntValue(1337))
},
OtlpAnyValue {
value: Some(OtlpAnyValueValue::StringValue("1337".to_string()))
}
]
})),
Some(json!([1337]))
Some(json!([1337, "1337"]))
);
assert_eq!(
oltp_value_to_json_value(OtlpValue::BoolValue(true)),
Some(json!(true))
);
assert_eq!(to_json_value(OtlpValue::BoolValue(true)), Some(json!(true)));
assert_eq!(
to_json_value(OtlpValue::DoubleValue(12.0)),
oltp_value_to_json_value(OtlpValue::DoubleValue(12.0)),
Some(json!(12.0))
);
assert_eq!(to_json_value(OtlpValue::IntValue(42)), Some(json!(42)));
assert_eq!(
to_json_value(OtlpValue::StringValue("foo".to_string())),
oltp_value_to_json_value(OtlpValue::IntValue(42)),
Some(json!(42))
);
assert_eq!(
oltp_value_to_json_value(OtlpValue::KvlistValue(OtlpKeyValueList {
values: Vec::new()
})),
Some(json!({}))
);
assert_eq!(
oltp_value_to_json_value(OtlpValue::KvlistValue(OtlpKeyValueList {
values: vec![
OtlpKeyValue {
key: "foo".to_string(),
value: Some(OtlpAnyValue {
value: Some(OtlpAnyValueValue::IntValue(1337))
})
},
OtlpKeyValue {
key: "bar".to_string(),
value: Some(OtlpAnyValue {
value: Some(OtlpAnyValueValue::StringValue("1337".to_string()))
})
}
]
})),
Some(json!({
"foo": 1337,
"bar": "1337"
}))
);
assert_eq!(
oltp_value_to_json_value(OtlpValue::StringValue("foo".to_string())),
Some(json!("foo"))
);
}
Expand Down Expand Up @@ -358,7 +403,7 @@ mod tests {
}),
},
];
let expected_attributes = std::collections::HashMap::from_iter([
let expected_attributes = HashMap::from_iter([
("array_key".to_string(), json!([1337])),
("bool_key".to_string(), json!(true)),
("double_key".to_string(), json!(12.0)),
Expand Down

0 comments on commit 4c1e228

Please sign in to comment.