Skip to content

Add number of throttled query in CLI response #5655

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/internals/ingest-v2.md
Original file line number Diff line number Diff line change
@@ -4,7 +4,11 @@ Ingest V2 is the latest ingestion API that is designed to be more efficient and

## Architecture

Just like ingest V1, the new ingest uses [`mrecordlog`](https://github.com/quickwit-oss/mrecordlog) to persist ingested documents that are waiting to be indexed. But unlike V1, which always persists the documents locally on the node that receives them, ingest V2 can dynamically distribute them into WAL units called _shards_. The assigned shard can be local or on another indexer. The control plane is in charge of distributing the shards to balance the indexing work as well as possible across all indexer nodes. The progress within each shard is not tracked as an index metadata checkpoint anymore but in a dedicated metastore `shards` table.
Just like ingest V1, the new ingest uses [`mrecordlog`](https://github.com/quickwit-oss/mrecordlog) to persist ingested documents that are waiting to be indexed. But unlike V1, which always persists the documents locally on the node that receives them, ingest V2 can dynamically distribute them into WAL units called _shards_. Here are a few key behaviors of this new mechanism:
- When an indexer receives a document for ingestion, the assigned shard can be local or on another indexer.
- The control plane is in charge of distributing the shards to balance the indexing work as well as possible across all indexer nodes.
- Each shard has a throughput limit (5MB). If the ingest rate on an index is becoming greater than the cumulated throughput of all its shards, the control plane schedules the creation of new shards. Note that when the cumulated throughput is exceeded on an index, the ingest API returns "too many requests" errors until the new shards are effectively created.
- The progress within each shard is tracked in a dedicated metastore `shards` table (instead of the index metadata checkpoint like for other sources).

In the future, the shard based ingest will also be capable of writing a replica for each shard, thus ensuring a high durability of the documents that are waiting to be indexed (durability of the indexed documents is guarantied by the object store).

@@ -33,3 +37,4 @@ See [full configuration example](https://github.com/quickwit-oss/quickwit/blob/m
- `ingest_api.replication_factor`, not working yet
- ingest V1 always writes to the WAL of the node receiving the request, V2 potentially forwards it to another node, dynamically assigned by the control plane to distribute the indexing work more evenly.
- ingest V2 parses and validates input documents synchronously. Schema and JSON formatting errors are returned in the ingest response (for ingest V1 those errors were available in the server logs only).
- ingest V2 returns transient 429 (too many requests) errors when the ingestion rate is too fast
2 changes: 1 addition & 1 deletion docs/operating/upgrades.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
title: Version 0.7 upgrade
title: Version upgrade
sidebar_position: 4
---

7 changes: 7 additions & 0 deletions quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
@@ -1071,6 +1071,13 @@ pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> {
println!("└ document: {}", failure.document);
}
}
if response.num_too_many_requests > 0 {
println!("Retried request counts:");
println!(
" 429 (too many requests) = {}",
response.num_too_many_requests
);
}
Ok(())
}

Original file line number Diff line number Diff line change
@@ -34,14 +34,12 @@ use quickwit_proto::jaeger::storage::v1::span_reader_plugin_client::SpanReaderPl
use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_client::LogsServiceClient;
use quickwit_proto::opentelemetry::proto::collector::trace::v1::trace_service_client::TraceServiceClient;
use quickwit_proto::types::NodeId;
use quickwit_rest_client::models::IngestSource;
use quickwit_rest_client::models::{CumulatedIngestResponse, IngestSource};
use quickwit_rest_client::rest_client::{
CommitType, QuickwitClient, QuickwitClientBuilder, DEFAULT_BASE_URL,
};
use quickwit_serve::tcp_listener::for_tests::TestTcpListenerResolver;
use quickwit_serve::{
serve_quickwit, ListSplitsQueryParams, RestIngestResponse, SearchRequestQueryString,
};
use quickwit_serve::{serve_quickwit, ListSplitsQueryParams, SearchRequestQueryString};
use quickwit_storage::StorageResolver;
use reqwest::Url;
use serde_json::Value;
@@ -243,7 +241,7 @@ pub(crate) async fn ingest(
index_id: &str,
ingest_source: IngestSource,
commit_type: CommitType,
) -> anyhow::Result<RestIngestResponse> {
) -> anyhow::Result<CumulatedIngestResponse> {
let resp = client
.ingest(index_id, ingest_source, None, None, commit_type)
.await?;
106 changes: 92 additions & 14 deletions quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;
use std::time::{Duration, Instant};

use futures_util::FutureExt;
use itertools::Itertools;
@@ -23,9 +23,9 @@ use quickwit_indexing::actors::INDEXING_DIR_NAME;
use quickwit_metastore::SplitState;
use quickwit_proto::ingest::ParseFailureReason;
use quickwit_rest_client::error::{ApiError, Error};
use quickwit_rest_client::models::IngestSource;
use quickwit_rest_client::models::{CumulatedIngestResponse, IngestSource};
use quickwit_rest_client::rest_client::CommitType;
use quickwit_serve::{ListSplitsQueryParams, RestIngestResponse, RestParseFailure};
use quickwit_serve::{ListSplitsQueryParams, RestParseFailure, SearchRequestQueryString};
use serde_json::json;

use crate::ingest_json;
@@ -306,11 +306,11 @@ async fn test_ingest_v2_happy_path() {
.unwrap();
assert_eq!(
ingest_resp,
RestIngestResponse {
CumulatedIngestResponse {
num_docs_for_processing: 1,
num_ingested_docs: Some(1),
num_rejected_docs: Some(0),
parse_failures: None,
..Default::default()
},
);

@@ -332,6 +332,83 @@ async fn test_ingest_v2_happy_path() {
sandbox.shutdown().await.unwrap();
}

#[tokio::test]
async fn test_ingest_v2_high_throughput() {
let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await;
let index_id = "test_high_throughput";
let index_config = format!(
r#"
version: 0.8
index_id: {index_id}
doc_mapping:
field_mappings:
- name: body
type: text
indexing_settings:
commit_timeout_secs: 1
"#
);
sandbox
.rest_client(QuickwitService::Indexer)
.indexes()
.create(index_config, ConfigFormat::Yaml, false)
.await
.unwrap();

let body_size = 20 * 1000 * 1000;
let line = json!({"body": "my dummy repeated payload"}).to_string();
let num_docs = body_size / line.len();
let body = std::iter::repeat_n(&line, num_docs).join("\n");
let ingest_resp = sandbox
.rest_client(QuickwitService::Indexer)
.ingest(
index_id,
IngestSource::Str(body),
// TODO: when using the default 10MiB batch size, we get persist
// timeouts with code 500 on some lower performance machines (e.g.
// Github runners). We should investigate why this happens exactly.
Some(5_000_000),
Copy link
Collaborator Author

@rdettai rdettai Feb 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guilload I didn't find a good explanation for why this timeout occur here in the persist

let persist_result = tokio::time::timeout(
PERSIST_REQUEST_TIMEOUT,
ingester.persist(persist_request),
)
.await
.unwrap_or_else(|_| {
let message = format!(
"persist request timed out after {} seconds",
PERSIST_REQUEST_TIMEOUT.as_secs()
);
Err(IngestV2Error::Timeout(message))
});

Persisting 10MB should not take 6 sec, even on a slow system and in debug mode, should it?

None,
CommitType::Auto,
)
.await
.unwrap();
assert_eq!(ingest_resp.num_docs_for_processing, num_docs as u64);
assert_eq!(ingest_resp.num_ingested_docs, Some(num_docs as u64));
assert_eq!(ingest_resp.num_rejected_docs, Some(0));
// num_too_many_requests might actually be > 0

let searcher_client = sandbox.rest_client(QuickwitService::Searcher);
// wait for the docs to be indexed
let start_time = Instant::now();
loop {
let res = searcher_client
.search(
index_id,
SearchRequestQueryString {
query: "*".to_string(),
..Default::default()
},
)
.await;
if let Ok(success_resp) = res {
if success_resp.num_hits == num_docs as u64 {
break;
}
}
if start_time.elapsed() > Duration::from_secs(20) {
panic!(
"didn't manage to index {} docs in {:?}",
num_docs,
start_time.elapsed()
);
}
tokio::time::sleep(Duration::from_secs(1)).await;
}

sandbox.shutdown().await.unwrap();
}

#[tokio::test]
async fn test_commit_force() {
let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await;
@@ -372,11 +449,11 @@ async fn test_commit_force() {
.unwrap();
assert_eq!(
ingest_resp,
RestIngestResponse {
CumulatedIngestResponse {
num_docs_for_processing: 1,
num_ingested_docs: Some(1),
num_rejected_docs: Some(0),
parse_failures: None,
..Default::default()
},
);

@@ -452,20 +529,20 @@ async fn test_commit_wait_for() {
let (ingest_resp_1, ingest_resp_2) = tokio::join!(ingest_1_fut, ingest_2_fut);
assert_eq!(
ingest_resp_1,
RestIngestResponse {
CumulatedIngestResponse {
num_docs_for_processing: 1,
num_ingested_docs: Some(1),
num_rejected_docs: Some(0),
parse_failures: None,
..Default::default()
},
);
assert_eq!(
ingest_resp_2,
RestIngestResponse {
CumulatedIngestResponse {
num_docs_for_processing: 1,
num_ingested_docs: Some(1),
num_rejected_docs: Some(0),
parse_failures: None,
..Default::default()
},
);

@@ -523,11 +600,11 @@ async fn test_commit_auto() {
.unwrap();
assert_eq!(
ingest_resp,
RestIngestResponse {
CumulatedIngestResponse {
num_docs_for_processing: 1,
num_ingested_docs: Some(1),
num_rejected_docs: Some(0),
parse_failures: None,
..Default::default()
},
);

@@ -577,7 +654,7 @@ async fn test_detailed_ingest_response() {

assert_eq!(
ingest_resp,
RestIngestResponse {
CumulatedIngestResponse {
num_docs_for_processing: 2,
num_ingested_docs: Some(1),
num_rejected_docs: Some(1),
@@ -586,6 +663,7 @@ async fn test_detailed_ingest_response() {
message: "failed to parse JSON document".to_string(),
reason: ParseFailureReason::InvalidJson,
}]),
..Default::default()
},
);
sandbox.shutdown().await.unwrap();
90 changes: 90 additions & 0 deletions quickwit/quickwit-rest-client/src/models.rs
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
use std::path::PathBuf;
use std::time::Duration;

use quickwit_serve::{RestIngestResponse, RestParseFailure};
use reqwest::StatusCode;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
@@ -94,6 +95,43 @@ pub enum IngestSource {
Stdin,
}

#[derive(Debug, PartialEq, Default)]
pub struct CumulatedIngestResponse {
pub num_docs_for_processing: u64,
pub num_ingested_docs: Option<u64>,
pub num_rejected_docs: Option<u64>,
pub parse_failures: Option<Vec<RestParseFailure>>,
pub num_too_many_requests: u64,
}

impl CumulatedIngestResponse {
/// Aggregates ingest counts and errors.
pub fn merge(self, other: RestIngestResponse) -> Self {
Self {
num_docs_for_processing: self.num_docs_for_processing + other.num_docs_for_processing,
num_ingested_docs: apply_op(self.num_ingested_docs, other.num_ingested_docs, |a, b| {
a + b
}),
num_rejected_docs: apply_op(self.num_rejected_docs, other.num_rejected_docs, |a, b| {
a + b
}),
parse_failures: apply_op(self.parse_failures, other.parse_failures, |a, b| {
a.into_iter().chain(b).collect()
}),
num_too_many_requests: self.num_too_many_requests,
}
}
Comment on lines +109 to +123
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this back here as it makes more sense than in the API model because accumulating responses is quite specific to the rest client.

}

fn apply_op<T>(a: Option<T>, b: Option<T>, f: impl Fn(T, T) -> T) -> Option<T> {
match (a, b) {
(Some(a), Some(b)) => Some(f(a, b)),
(Some(a), None) => Some(a),
(None, Some(b)) => Some(b),
(None, None) => None,
}
}

/// A structure that represent a timeout. Unlike Duration it can also represent an infinite or no
/// timeout value.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Debug)]
@@ -149,3 +187,55 @@ impl Timeout {
}
}
}

#[cfg(test)]
mod test {
use quickwit_proto::ingest::ParseFailureReason;

use super::*;

#[test]
fn test_merge_responses() {
let mut merged_response = CumulatedIngestResponse::default();
let response1 = RestIngestResponse {
num_docs_for_processing: 10,
num_ingested_docs: Some(5),
num_rejected_docs: Some(2),
parse_failures: Some(vec![RestParseFailure {
message: "error1".to_string(),
document: "doc1".to_string(),
reason: ParseFailureReason::InvalidJson,
}]),
};
let response2 = RestIngestResponse {
num_docs_for_processing: 15,
num_ingested_docs: Some(10),
num_rejected_docs: Some(3),
parse_failures: Some(vec![RestParseFailure {
message: "error2".to_string(),
document: "doc2".to_string(),
reason: ParseFailureReason::InvalidJson,
}]),
};
merged_response = merged_response.merge(response1);
merged_response = merged_response.merge(response2);
assert_eq!(merged_response.num_docs_for_processing, 25);
assert_eq!(merged_response.num_ingested_docs.unwrap(), 15);
assert_eq!(merged_response.num_rejected_docs.unwrap(), 5);
assert_eq!(
merged_response.parse_failures.unwrap(),
vec![
RestParseFailure {
message: "error1".to_string(),
document: "doc1".to_string(),
reason: ParseFailureReason::InvalidJson,
},
RestParseFailure {
message: "error2".to_string(),
document: "doc2".to_string(),
reason: ParseFailureReason::InvalidJson,
}
]
);
}
}
Loading